]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | /* |
2 | * | |
3 | * Copyright 2016 gRPC authors. | |
4 | * | |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | * | |
17 | */ | |
18 | ||
19 | package grpc | |
20 | ||
21 | import ( | |
107c1cdb | 22 | "context" |
15c0b25d AP |
23 | "net" |
24 | "sync" | |
25 | ||
15c0b25d AP |
26 | "google.golang.org/grpc/codes" |
27 | "google.golang.org/grpc/credentials" | |
28 | "google.golang.org/grpc/grpclog" | |
29 | "google.golang.org/grpc/naming" | |
107c1cdb | 30 | "google.golang.org/grpc/status" |
15c0b25d AP |
31 | ) |
32 | ||
// Address describes a single server that the client connects to.
//
// Deprecated: please use package balancer.
type Address struct {
	// Addr holds the address of the server to which a connection is made.
	Addr string
	// Metadata carries extra information tied to Addr; it may feed into
	// load balancing decisions.
	Metadata interface{}
}
43 | ||
44 | // BalancerConfig specifies the configurations for Balancer. | |
107c1cdb ND |
45 | // |
46 | // Deprecated: please use package balancer. | |
15c0b25d AP |
47 | type BalancerConfig struct { |
48 | // DialCreds is the transport credential the Balancer implementation can | |
49 | // use to dial to a remote load balancer server. The Balancer implementations | |
50 | // can ignore this if it does not need to talk to another party securely. | |
51 | DialCreds credentials.TransportCredentials | |
52 | // Dialer is the custom dialer the Balancer implementation can use to dial | |
53 | // to a remote load balancer server. The Balancer implementations | |
54 | // can ignore this if it doesn't need to talk to remote balancer. | |
55 | Dialer func(context.Context, string) (net.Conn, error) | |
56 | } | |
57 | ||
// BalancerGetOptions carries the per-call settings for Balancer.Get.
//
// Deprecated: please use package balancer.
type BalancerGetOptions struct {
	// BlockingWait tells Get whether it should block while no connected
	// address is available.
	BlockingWait bool
}
66 | ||
67 | // Balancer chooses network addresses for RPCs. | |
107c1cdb ND |
68 | // |
69 | // Deprecated: please use package balancer. | |
15c0b25d AP |
70 | type Balancer interface { |
71 | // Start does the initialization work to bootstrap a Balancer. For example, | |
72 | // this function may start the name resolution and watch the updates. It will | |
73 | // be called when dialing. | |
74 | Start(target string, config BalancerConfig) error | |
75 | // Up informs the Balancer that gRPC has a connection to the server at | |
76 | // addr. It returns down which is called once the connection to addr gets | |
77 | // lost or closed. | |
78 | // TODO: It is not clear how to construct and take advantage of the meaningful error | |
79 | // parameter for down. Need realistic demands to guide. | |
80 | Up(addr Address) (down func(error)) | |
81 | // Get gets the address of a server for the RPC corresponding to ctx. | |
82 | // i) If it returns a connected address, gRPC internals issues the RPC on the | |
83 | // connection to this address; | |
84 | // ii) If it returns an address on which the connection is under construction | |
85 | // (initiated by Notify(...)) but not connected, gRPC internals | |
86 | // * fails RPC if the RPC is fail-fast and connection is in the TransientFailure or | |
87 | // Shutdown state; | |
88 | // or | |
89 | // * issues RPC on the connection otherwise. | |
90 | // iii) If it returns an address on which the connection does not exist, gRPC | |
91 | // internals treats it as an error and will fail the corresponding RPC. | |
92 | // | |
93 | // Therefore, the following is the recommended rule when writing a custom Balancer. | |
94 | // If opts.BlockingWait is true, it should return a connected address or | |
95 | // block if there is no connected address. It should respect the timeout or | |
96 | // cancellation of ctx when blocking. If opts.BlockingWait is false (for fail-fast | |
97 | // RPCs), it should return an address it has notified via Notify(...) immediately | |
98 | // instead of blocking. | |
99 | // | |
100 | // The function returns put which is called once the rpc has completed or failed. | |
101 | // put can collect and report RPC stats to a remote load balancer. | |
102 | // | |
103 | // This function should only return the errors Balancer cannot recover by itself. | |
104 | // gRPC internals will fail the RPC if an error is returned. | |
105 | Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) | |
106 | // Notify returns a channel that is used by gRPC internals to watch the addresses | |
107 | // gRPC needs to connect. The addresses might be from a name resolver or remote | |
108 | // load balancer. gRPC internals will compare it with the existing connected | |
109 | // addresses. If the address Balancer notified is not in the existing connected | |
110 | // addresses, gRPC starts to connect the address. If an address in the existing | |
111 | // connected addresses is not in the notification list, the corresponding connection | |
112 | // is shutdown gracefully. Otherwise, there are no operations to take. Note that | |
113 | // the Address slice must be the full list of the Addresses which should be connected. | |
114 | // It is NOT delta. | |
115 | Notify() <-chan []Address | |
116 | // Close shuts down the balancer. | |
117 | Close() error | |
118 | } | |
119 | ||
15c0b25d AP |
120 | // RoundRobin returns a Balancer that selects addresses round-robin. It uses r to watch |
121 | // the name resolution updates and updates the addresses available correspondingly. | |
107c1cdb ND |
122 | // |
123 | // Deprecated: please use package balancer/roundrobin. | |
15c0b25d AP |
124 | func RoundRobin(r naming.Resolver) Balancer { |
125 | return &roundRobin{r: r} | |
126 | } | |
127 | ||
128 | type addrInfo struct { | |
129 | addr Address | |
130 | connected bool | |
131 | } | |
132 | ||
133 | type roundRobin struct { | |
134 | r naming.Resolver | |
135 | w naming.Watcher | |
136 | addrs []*addrInfo // all the addresses the client should potentially connect | |
137 | mu sync.Mutex | |
138 | addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to. | |
139 | next int // index of the next address to return for Get() | |
140 | waitCh chan struct{} // the channel to block when there is no connected address available | |
141 | done bool // The Balancer is closed. | |
142 | } | |
143 | ||
144 | func (rr *roundRobin) watchAddrUpdates() error { | |
145 | updates, err := rr.w.Next() | |
146 | if err != nil { | |
147 | grpclog.Warningf("grpc: the naming watcher stops working due to %v.", err) | |
148 | return err | |
149 | } | |
150 | rr.mu.Lock() | |
151 | defer rr.mu.Unlock() | |
152 | for _, update := range updates { | |
153 | addr := Address{ | |
154 | Addr: update.Addr, | |
155 | Metadata: update.Metadata, | |
156 | } | |
157 | switch update.Op { | |
158 | case naming.Add: | |
159 | var exist bool | |
160 | for _, v := range rr.addrs { | |
161 | if addr == v.addr { | |
162 | exist = true | |
163 | grpclog.Infoln("grpc: The name resolver wanted to add an existing address: ", addr) | |
164 | break | |
165 | } | |
166 | } | |
167 | if exist { | |
168 | continue | |
169 | } | |
170 | rr.addrs = append(rr.addrs, &addrInfo{addr: addr}) | |
171 | case naming.Delete: | |
172 | for i, v := range rr.addrs { | |
173 | if addr == v.addr { | |
174 | copy(rr.addrs[i:], rr.addrs[i+1:]) | |
175 | rr.addrs = rr.addrs[:len(rr.addrs)-1] | |
176 | break | |
177 | } | |
178 | } | |
179 | default: | |
180 | grpclog.Errorln("Unknown update.Op ", update.Op) | |
181 | } | |
182 | } | |
183 | // Make a copy of rr.addrs and write it onto rr.addrCh so that gRPC internals gets notified. | |
184 | open := make([]Address, len(rr.addrs)) | |
185 | for i, v := range rr.addrs { | |
186 | open[i] = v.addr | |
187 | } | |
188 | if rr.done { | |
189 | return ErrClientConnClosing | |
190 | } | |
191 | select { | |
192 | case <-rr.addrCh: | |
193 | default: | |
194 | } | |
195 | rr.addrCh <- open | |
196 | return nil | |
197 | } | |
198 | ||
199 | func (rr *roundRobin) Start(target string, config BalancerConfig) error { | |
200 | rr.mu.Lock() | |
201 | defer rr.mu.Unlock() | |
202 | if rr.done { | |
203 | return ErrClientConnClosing | |
204 | } | |
205 | if rr.r == nil { | |
206 | // If there is no name resolver installed, it is not needed to | |
207 | // do name resolution. In this case, target is added into rr.addrs | |
208 | // as the only address available and rr.addrCh stays nil. | |
209 | rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}}) | |
210 | return nil | |
211 | } | |
212 | w, err := rr.r.Resolve(target) | |
213 | if err != nil { | |
214 | return err | |
215 | } | |
216 | rr.w = w | |
217 | rr.addrCh = make(chan []Address, 1) | |
218 | go func() { | |
219 | for { | |
220 | if err := rr.watchAddrUpdates(); err != nil { | |
221 | return | |
222 | } | |
223 | } | |
224 | }() | |
225 | return nil | |
226 | } | |
227 | ||
228 | // Up sets the connected state of addr and sends notification if there are pending | |
229 | // Get() calls. | |
230 | func (rr *roundRobin) Up(addr Address) func(error) { | |
231 | rr.mu.Lock() | |
232 | defer rr.mu.Unlock() | |
233 | var cnt int | |
234 | for _, a := range rr.addrs { | |
235 | if a.addr == addr { | |
236 | if a.connected { | |
237 | return nil | |
238 | } | |
239 | a.connected = true | |
240 | } | |
241 | if a.connected { | |
242 | cnt++ | |
243 | } | |
244 | } | |
245 | // addr is only one which is connected. Notify the Get() callers who are blocking. | |
246 | if cnt == 1 && rr.waitCh != nil { | |
247 | close(rr.waitCh) | |
248 | rr.waitCh = nil | |
249 | } | |
250 | return func(err error) { | |
251 | rr.down(addr, err) | |
252 | } | |
253 | } | |
254 | ||
255 | // down unsets the connected state of addr. | |
256 | func (rr *roundRobin) down(addr Address, err error) { | |
257 | rr.mu.Lock() | |
258 | defer rr.mu.Unlock() | |
259 | for _, a := range rr.addrs { | |
260 | if addr == a.addr { | |
261 | a.connected = false | |
262 | break | |
263 | } | |
264 | } | |
265 | } | |
266 | ||
267 | // Get returns the next addr in the rotation. | |
268 | func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) { | |
269 | var ch chan struct{} | |
270 | rr.mu.Lock() | |
271 | if rr.done { | |
272 | rr.mu.Unlock() | |
273 | err = ErrClientConnClosing | |
274 | return | |
275 | } | |
276 | ||
277 | if len(rr.addrs) > 0 { | |
278 | if rr.next >= len(rr.addrs) { | |
279 | rr.next = 0 | |
280 | } | |
281 | next := rr.next | |
282 | for { | |
283 | a := rr.addrs[next] | |
284 | next = (next + 1) % len(rr.addrs) | |
285 | if a.connected { | |
286 | addr = a.addr | |
287 | rr.next = next | |
288 | rr.mu.Unlock() | |
289 | return | |
290 | } | |
291 | if next == rr.next { | |
292 | // Has iterated all the possible address but none is connected. | |
293 | break | |
294 | } | |
295 | } | |
296 | } | |
297 | if !opts.BlockingWait { | |
298 | if len(rr.addrs) == 0 { | |
299 | rr.mu.Unlock() | |
107c1cdb | 300 | err = status.Errorf(codes.Unavailable, "there is no address available") |
15c0b25d AP |
301 | return |
302 | } | |
303 | // Returns the next addr on rr.addrs for failfast RPCs. | |
304 | addr = rr.addrs[rr.next].addr | |
305 | rr.next++ | |
306 | rr.mu.Unlock() | |
307 | return | |
308 | } | |
309 | // Wait on rr.waitCh for non-failfast RPCs. | |
310 | if rr.waitCh == nil { | |
311 | ch = make(chan struct{}) | |
312 | rr.waitCh = ch | |
313 | } else { | |
314 | ch = rr.waitCh | |
315 | } | |
316 | rr.mu.Unlock() | |
317 | for { | |
318 | select { | |
319 | case <-ctx.Done(): | |
320 | err = ctx.Err() | |
321 | return | |
322 | case <-ch: | |
323 | rr.mu.Lock() | |
324 | if rr.done { | |
325 | rr.mu.Unlock() | |
326 | err = ErrClientConnClosing | |
327 | return | |
328 | } | |
329 | ||
330 | if len(rr.addrs) > 0 { | |
331 | if rr.next >= len(rr.addrs) { | |
332 | rr.next = 0 | |
333 | } | |
334 | next := rr.next | |
335 | for { | |
336 | a := rr.addrs[next] | |
337 | next = (next + 1) % len(rr.addrs) | |
338 | if a.connected { | |
339 | addr = a.addr | |
340 | rr.next = next | |
341 | rr.mu.Unlock() | |
342 | return | |
343 | } | |
344 | if next == rr.next { | |
345 | // Has iterated all the possible address but none is connected. | |
346 | break | |
347 | } | |
348 | } | |
349 | } | |
350 | // The newly added addr got removed by Down() again. | |
351 | if rr.waitCh == nil { | |
352 | ch = make(chan struct{}) | |
353 | rr.waitCh = ch | |
354 | } else { | |
355 | ch = rr.waitCh | |
356 | } | |
357 | rr.mu.Unlock() | |
358 | } | |
359 | } | |
360 | } | |
361 | ||
362 | func (rr *roundRobin) Notify() <-chan []Address { | |
363 | return rr.addrCh | |
364 | } | |
365 | ||
366 | func (rr *roundRobin) Close() error { | |
367 | rr.mu.Lock() | |
368 | defer rr.mu.Unlock() | |
369 | if rr.done { | |
370 | return errBalancerClosed | |
371 | } | |
372 | rr.done = true | |
373 | if rr.w != nil { | |
374 | rr.w.Close() | |
375 | } | |
376 | if rr.waitCh != nil { | |
377 | close(rr.waitCh) | |
378 | rr.waitCh = nil | |
379 | } | |
380 | if rr.addrCh != nil { | |
381 | close(rr.addrCh) | |
382 | } | |
383 | return nil | |
384 | } | |
107c1cdb ND |
385 | |
386 | // pickFirst is used to test multi-addresses in one addrConn in which all addresses share the same addrConn. | |
387 | // It is a wrapper around roundRobin balancer. The logic of all methods works fine because balancer.Get() | |
388 | // returns the only address Up by resetTransport(). | |
389 | type pickFirst struct { | |
390 | *roundRobin | |
391 | } |