Diffstat (limited to 'vendor/google.golang.org/grpc/grpclb.go')
-rw-r--r-- | vendor/google.golang.org/grpc/grpclb.go | 737
1 file changed, 737 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/grpclb.go b/vendor/google.golang.org/grpc/grpclb.go
new file mode 100644
index 0000000..f7b6b7d
--- /dev/null
+++ b/vendor/google.golang.org/grpc/grpclb.go
@@ -0,0 +1,737 @@
/*
 *
 * Copyright 2016 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"errors"
	"fmt"
	"math/rand"
	"net"
	"sync"
	"time"

	"golang.org/x/net/context"
	"google.golang.org/grpc/codes"
	lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/naming"
)

// Client API for the LoadBalancer service.
// Mostly copied from the generated pb.go file to avoid a circular dependency.
type loadBalancerClient struct {
	cc *ClientConn
}
43 | |||
44 | func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...CallOption) (*balanceLoadClientStream, error) { | ||
45 | desc := &StreamDesc{ | ||
46 | StreamName: "BalanceLoad", | ||
47 | ServerStreams: true, | ||
48 | ClientStreams: true, | ||
49 | } | ||
50 | stream, err := NewClientStream(ctx, desc, c.cc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...) | ||
51 | if err != nil { | ||
52 | return nil, err | ||
53 | } | ||
54 | x := &balanceLoadClientStream{stream} | ||
55 | return x, nil | ||
56 | } | ||
57 | |||
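// balanceLoadClientStream adds typed Send and Recv methods on top of the
// underlying ClientStream for the BalanceLoad bidirectional stream.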
type balanceLoadClientStream struct {
	ClientStream
}

func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
	return x.ClientStream.SendMsg(m)
}

func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
	m := new(lbpb.LoadBalanceResponse)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

// NewGRPCLBBalancer creates a grpclb load balancer.
func NewGRPCLBBalancer(r naming.Resolver) Balancer {
	return &balancer{
		r: r,
	}
}

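// remoteBalancerInfo identifies a remote load balancer instance.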
type remoteBalancerInfo struct {
	addr string
	// the server name used for authentication with the remote LB server.
	name string
}

// grpclbAddrInfo consists of the information of a backend server.
type grpclbAddrInfo struct {
	addr      Address
	connected bool
	// dropForRateLimiting indicates whether this particular request should be
	// dropped by the client for rate limiting.
	dropForRateLimiting bool
	// dropForLoadBalancing indicates whether this particular request should be
	// dropped by the client for load balancing.
	dropForLoadBalancing bool
}

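// balancer implements the Balancer interface for grpclb. It watches the name
// resolver for remote balancer addresses, maintains a stream to one remote
// balancer, and hands out the backend addresses that balancer returns in
// round-robin order.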
type balancer struct {
	r        naming.Resolver
	target   string
	mu       sync.Mutex
	seq      int // a sequence number to make sure addrCh does not get stale addresses.
	w        naming.Watcher
	addrCh   chan []Address
	rbs      []remoteBalancerInfo
	addrs    []*grpclbAddrInfo
	next     int
	waitCh   chan struct{}
	done     bool
	expTimer *time.Timer
	rand     *rand.Rand

	clientStats lbpb.ClientStats
}

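// watchAddrUpdates blocks for the next batch of updates from the naming
// watcher, applies them to the list of remote balancer addresses, and
// publishes the refreshed list on ch, replacing any unread previous value.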
func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
	updates, err := w.Next()
	if err != nil {
		grpclog.Warningf("grpclb: failed to get next addr update from watcher: %v", err)
		return err
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.done {
		return ErrClientConnClosing
	}
	for _, update := range updates {
		switch update.Op {
		case naming.Add:
			var exist bool
			for _, v := range b.rbs {
				// TODO: Is the same addr with a different server name a different balancer?
				if update.Addr == v.addr {
					exist = true
					break
				}
			}
			if exist {
				continue
			}
			md, ok := update.Metadata.(*naming.AddrMetadataGRPCLB)
			if !ok {
				// TODO: Revisit the handling here; maybe introduce a fallback mechanism.
				grpclog.Errorf("The name resolution contains unexpected metadata %v", update.Metadata)
				continue
			}
			switch md.AddrType {
			case naming.Backend:
				// TODO: Revisit the handling here; maybe introduce a fallback mechanism.
				grpclog.Errorf("The name resolution does not give grpclb addresses")
				continue
			case naming.GRPCLB:
				b.rbs = append(b.rbs, remoteBalancerInfo{
					addr: update.Addr,
					name: md.ServerName,
				})
			default:
				grpclog.Errorf("Received unknown address type %d", md.AddrType)
				continue
			}
		case naming.Delete:
			for i, v := range b.rbs {
				if update.Addr == v.addr {
					copy(b.rbs[i:], b.rbs[i+1:])
					b.rbs = b.rbs[:len(b.rbs)-1]
					break
				}
			}
		default:
			grpclog.Errorf("Unknown update.Op %v", update.Op)
		}
	}
	// TODO: Fall back to basic round-robin load balancing if the resulting address is
	// not a load balancer.
	select {
	case <-ch:
	default:
	}
	ch <- b.rbs
	return nil
}

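// serverListExpire clears the current server list once its expiration
// interval has elapsed, unless a newer list (a higher sequence number) has
// already been received.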
func (b *balancer) serverListExpire(seq int) {
	b.mu.Lock()
	defer b.mu.Unlock()
	// TODO: gRPC internals do not clear the connections when the server list is stale.
	// This means RPCs will keep using the existing server list until b receives a new
	// server list, even though the current list has expired. Revisit this behavior later.
	if b.done || seq < b.seq {
		return
	}
	b.next = 0
	b.addrs = nil
	// Ask grpc internals to close all the corresponding connections.
	b.addrCh <- nil
}

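// convertDuration translates an lbpb.Duration message into a time.Duration;
// a nil message converts to zero.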
func convertDuration(d *lbpb.Duration) time.Duration {
	if d == nil {
		return 0
	}
	return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
}

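// processServerList turns the server list received from the remote balancer
// into Addresses (attaching each server's lb-token as metadata), installs it
// as the active list, and arms the expiration timer if the list carries an
// expiration interval.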
func (b *balancer) processServerList(l *lbpb.ServerList, seq int) {
	if l == nil {
		return
	}
	servers := l.GetServers()
	expiration := convertDuration(l.GetExpirationInterval())
	var (
		sl    []*grpclbAddrInfo
		addrs []Address
	)
	for _, s := range servers {
		md := metadata.Pairs("lb-token", s.LoadBalanceToken)
		ip := net.IP(s.IpAddress)
		ipStr := ip.String()
		if ip.To4() == nil {
			// Add square brackets to ipv6 addresses; otherwise net.Dial() and
			// net.SplitHostPort() will return a "too many colons" error.
			ipStr = fmt.Sprintf("[%s]", ipStr)
		}
		addr := Address{
			Addr:     fmt.Sprintf("%s:%d", ipStr, s.Port),
			Metadata: &md,
		}
		sl = append(sl, &grpclbAddrInfo{
			addr:                 addr,
			dropForRateLimiting:  s.DropForRateLimiting,
			dropForLoadBalancing: s.DropForLoadBalancing,
		})
		addrs = append(addrs, addr)
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.done || seq < b.seq {
		return
	}
	if len(sl) > 0 {
		// Reset b.next to 0 when replacing the server list.
		b.next = 0
		b.addrs = sl
		b.addrCh <- addrs
		if b.expTimer != nil {
			b.expTimer.Stop()
			b.expTimer = nil
		}
		if expiration > 0 {
			b.expTimer = time.AfterFunc(expiration, func() {
				b.serverListExpire(seq)
			})
		}
	}
}

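// sendLoadReport periodically snapshots and resets the accumulated client
// stats and sends them to the remote balancer, until the stream is done.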
func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
		case <-done:
			return
		}
		b.mu.Lock()
		stats := b.clientStats
		b.clientStats = lbpb.ClientStats{} // Clear the stats.
		b.mu.Unlock()
		t := time.Now()
		stats.Timestamp = &lbpb.Timestamp{
			Seconds: t.Unix(),
			Nanos:   int32(t.Nanosecond()),
		}
		if err := s.Send(&lbpb.LoadBalanceRequest{
			LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
				ClientStats: &stats,
			},
		}); err != nil {
			grpclog.Errorf("grpclb: failed to send load report: %v", err)
			return
		}
	}
}

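// callRemoteBalancer opens a BalanceLoad stream to the remote balancer, sends
// the initial request, starts load reporting if the balancer asks for it, and
// then consumes server list updates until the stream breaks. It reports
// whether the caller should retry.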
func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	stream, err := lbc.BalanceLoad(ctx)
	if err != nil {
		grpclog.Errorf("grpclb: failed to perform RPC to the remote balancer: %v", err)
		return
	}
	b.mu.Lock()
	if b.done {
		b.mu.Unlock()
		return
	}
	b.mu.Unlock()
	initReq := &lbpb.LoadBalanceRequest{
		LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
			InitialRequest: &lbpb.InitialLoadBalanceRequest{
				Name: b.target,
			},
		},
	}
	if err := stream.Send(initReq); err != nil {
		grpclog.Errorf("grpclb: failed to send init request: %v", err)
		// TODO: backoff on retry?
		return true
	}
	reply, err := stream.Recv()
	if err != nil {
		grpclog.Errorf("grpclb: failed to recv init response: %v", err)
		// TODO: backoff on retry?
		return true
	}
	initResp := reply.GetInitialResponse()
	if initResp == nil {
		grpclog.Errorf("grpclb: reply from remote balancer did not include initial response.")
		return
	}
	// TODO: Support delegation.
	if initResp.LoadBalancerDelegate != "" {
		// delegation
		grpclog.Errorf("TODO: Delegation is not supported yet.")
		return
	}
	streamDone := make(chan struct{})
	defer close(streamDone)
	b.mu.Lock()
	b.clientStats = lbpb.ClientStats{} // Clear client stats.
	b.mu.Unlock()
	if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
		go b.sendLoadReport(stream, d, streamDone)
	}
	// Retrieve the server list.
	for {
		reply, err := stream.Recv()
		if err != nil {
			grpclog.Errorf("grpclb: failed to recv server list: %v", err)
			break
		}
		b.mu.Lock()
		if b.done || seq < b.seq {
			b.mu.Unlock()
			return
		}
		b.seq++ // tick when receiving a new list of servers.
		seq = b.seq
		b.mu.Unlock()
		if serverList := reply.GetServerList(); serverList != nil {
			b.processServerList(serverList, seq)
		}
	}
	return true
}

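// Start resolves the target and spawns two goroutines: one watches the
// resolver for remote balancer address updates, and the other maintains a
// connection to a remote balancer, rotating to another balancer address when
// the current connection fails.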
func (b *balancer) Start(target string, config BalancerConfig) error {
	b.rand = rand.New(rand.NewSource(time.Now().Unix()))
	// TODO: Fall back to a basic direct connection if there is no name resolver.
	if b.r == nil {
		return errors.New("there is no name resolver installed")
	}
	b.target = target
	b.mu.Lock()
	if b.done {
		b.mu.Unlock()
		return ErrClientConnClosing
	}
	b.addrCh = make(chan []Address)
	w, err := b.r.Resolve(target)
	if err != nil {
		b.mu.Unlock()
		grpclog.Errorf("grpclb: failed to resolve address: %v, err: %v", target, err)
		return err
	}
	b.w = w
	b.mu.Unlock()
	balancerAddrsCh := make(chan []remoteBalancerInfo, 1)
	// Spawn a goroutine to monitor the name resolution of the remote load balancer.
	go func() {
		for {
			if err := b.watchAddrUpdates(w, balancerAddrsCh); err != nil {
				grpclog.Warningf("grpclb: the naming watcher stopped working: %v", err)
				close(balancerAddrsCh)
				return
			}
		}
	}()
	// Spawn a goroutine to talk to the remote load balancer.
	go func() {
		var (
			cc *ClientConn
			// ccError is closed when there is an error in the current cc.
			// A new rb should be picked from rbs and connected.
			ccError chan struct{}
			rb      *remoteBalancerInfo
			rbs     []remoteBalancerInfo
			rbIdx   int
		)

		defer func() {
			if ccError != nil {
				select {
				case <-ccError:
				default:
					close(ccError)
				}
			}
			if cc != nil {
				cc.Close()
			}
		}()

		for {
			var ok bool
			select {
			case rbs, ok = <-balancerAddrsCh:
				if !ok {
					return
				}
				foundIdx := -1
				if rb != nil {
					for i, trb := range rbs {
						if trb == *rb {
							foundIdx = i
							break
						}
					}
				}
				if foundIdx >= 0 {
					if foundIdx >= 1 {
						// Move the address in use to the beginning of the list.
						b.rbs[0], b.rbs[foundIdx] = b.rbs[foundIdx], b.rbs[0]
						rbIdx = 0
					}
					continue // If found, don't dial a new cc.
				} else if len(rbs) > 0 {
					// Pick a random one from the list, instead of always using the first one.
					if l := len(rbs); l > 1 && rb != nil {
						tmpIdx := b.rand.Intn(l - 1)
						b.rbs[0], b.rbs[tmpIdx] = b.rbs[tmpIdx], b.rbs[0]
					}
					rbIdx = 0
					rb = &rbs[0]
				} else {
					// foundIdx < 0 && len(rbs) <= 0.
					rb = nil
				}
			case <-ccError:
				ccError = nil
				if rbIdx < len(rbs)-1 {
					rbIdx++
					rb = &rbs[rbIdx]
				} else {
					rb = nil
				}
			}

			if rb == nil {
				continue
			}

			if cc != nil {
				cc.Close()
			}
			// Talk to the remote load balancer to get the server list.
			var (
				err   error
				dopts []DialOption
			)
			if creds := config.DialCreds; creds != nil {
				if rb.name != "" {
					if err := creds.OverrideServerName(rb.name); err != nil {
						grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v", err)
						continue
					}
				}
				dopts = append(dopts, WithTransportCredentials(creds))
			} else {
				dopts = append(dopts, WithInsecure())
			}
			if dialer := config.Dialer; dialer != nil {
				// WithDialer takes a different type of function, so we instead use a special DialOption here.
				dopts = append(dopts, func(o *dialOptions) { o.copts.Dialer = dialer })
			}
			ccError = make(chan struct{})
			cc, err = Dial(rb.addr, dopts...)
			if err != nil {
				grpclog.Warningf("grpclb: failed to set up a connection to the remote balancer %v: %v", rb.addr, err)
				close(ccError)
				continue
			}
			b.mu.Lock()
			b.seq++ // tick when getting a new balancer address
			seq := b.seq
			b.next = 0
			b.mu.Unlock()
			go func(cc *ClientConn, ccError chan struct{}) {
				lbc := &loadBalancerClient{cc}
				b.callRemoteBalancer(lbc, seq)
				cc.Close()
				select {
				case <-ccError:
				default:
					close(ccError)
				}
			}(cc, ccError)
		}
	}()
	return nil
}

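// down marks addr as no longer connected so that Get stops handing it out.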
func (b *balancer) down(addr Address, err error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, a := range b.addrs {
		if addr == a.addr {
			a.connected = false
			break
		}
	}
}

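// Up marks addr as connected, wakes up Get() callers blocked waiting for the
// first usable address, and returns a function to be called when the
// connection to addr goes down.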
func (b *balancer) Up(addr Address) func(error) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.done {
		return nil
	}
	var cnt int
	for _, a := range b.addrs {
		if a.addr == addr {
			if a.connected {
				return nil
			}
			a.connected = true
		}
		if a.connected && !a.dropForRateLimiting && !a.dropForLoadBalancing {
			cnt++
		}
	}
	// addr is the only address that is connected. Notify the Get() callers that are blocking.
	if cnt == 1 && b.waitCh != nil {
		close(b.waitCh)
		b.waitCh = nil
	}
	return func(err error) {
		b.down(addr, err)
	}
}

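// Get returns the next connected backend address in round-robin order,
// honoring drop directives from the remote balancer and recording client
// stats. For failfast RPCs it returns an error when no address is usable;
// otherwise it blocks until an address becomes available or ctx expires.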
func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
	var ch chan struct{}
	b.mu.Lock()
	if b.done {
		b.mu.Unlock()
		err = ErrClientConnClosing
		return
	}
	seq := b.seq

	defer func() {
		if err != nil {
			return
		}
		put = func() {
			s, ok := rpcInfoFromContext(ctx)
			if !ok {
				return
			}
			b.mu.Lock()
			defer b.mu.Unlock()
			if b.done || seq < b.seq {
				return
			}
			b.clientStats.NumCallsFinished++
			if !s.bytesSent {
				b.clientStats.NumCallsFinishedWithClientFailedToSend++
			} else if s.bytesReceived {
				b.clientStats.NumCallsFinishedKnownReceived++
			}
		}
	}()

	b.clientStats.NumCallsStarted++
	if len(b.addrs) > 0 {
		if b.next >= len(b.addrs) {
			b.next = 0
		}
		next := b.next
		for {
			a := b.addrs[next]
			next = (next + 1) % len(b.addrs)
			if a.connected {
				if !a.dropForRateLimiting && !a.dropForLoadBalancing {
					addr = a.addr
					b.next = next
					b.mu.Unlock()
					return
				}
				if !opts.BlockingWait {
					b.next = next
					if a.dropForLoadBalancing {
						b.clientStats.NumCallsFinished++
						b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
					} else if a.dropForRateLimiting {
						b.clientStats.NumCallsFinished++
						b.clientStats.NumCallsFinishedWithDropForRateLimiting++
					}
					b.mu.Unlock()
					err = Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr)
					return
				}
			}
			if next == b.next {
				// We have iterated over all possible addresses but none is connected.
				break
			}
		}
	}
	if !opts.BlockingWait {
		if len(b.addrs) == 0 {
			b.clientStats.NumCallsFinished++
			b.clientStats.NumCallsFinishedWithClientFailedToSend++
			b.mu.Unlock()
			err = Errorf(codes.Unavailable, "there is no address available")
			return
		}
		// Return the next addr in b.addrs for a failfast RPC.
		addr = b.addrs[b.next].addr
		b.next++
		b.mu.Unlock()
		return
	}
	// Wait on b.waitCh for non-failfast RPCs.
	if b.waitCh == nil {
		ch = make(chan struct{})
		b.waitCh = ch
	} else {
		ch = b.waitCh
	}
	b.mu.Unlock()
	for {
		select {
		case <-ctx.Done():
			b.mu.Lock()
			b.clientStats.NumCallsFinished++
			b.clientStats.NumCallsFinishedWithClientFailedToSend++
			b.mu.Unlock()
			err = ctx.Err()
			return
		case <-ch:
			b.mu.Lock()
			if b.done {
				b.clientStats.NumCallsFinished++
				b.clientStats.NumCallsFinishedWithClientFailedToSend++
				b.mu.Unlock()
				err = ErrClientConnClosing
				return
			}

			if len(b.addrs) > 0 {
				if b.next >= len(b.addrs) {
					b.next = 0
				}
				next := b.next
				for {
					a := b.addrs[next]
					next = (next + 1) % len(b.addrs)
					if a.connected {
						if !a.dropForRateLimiting && !a.dropForLoadBalancing {
							addr = a.addr
							b.next = next
							b.mu.Unlock()
							return
						}
						if !opts.BlockingWait {
							b.next = next
							if a.dropForLoadBalancing {
								b.clientStats.NumCallsFinished++
								b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
							} else if a.dropForRateLimiting {
								b.clientStats.NumCallsFinished++
								b.clientStats.NumCallsFinishedWithDropForRateLimiting++
							}
							b.mu.Unlock()
							err = Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr)
							return
						}
					}
					if next == b.next {
						// We have iterated over all possible addresses but none is connected.
						break
					}
				}
			}
			// The newly added addr got removed by Down() again.
			if b.waitCh == nil {
				ch = make(chan struct{})
				b.waitCh = ch
			} else {
				ch = b.waitCh
			}
			b.mu.Unlock()
		}
	}
}

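// Notify returns the channel on which gRPC internals receive address list
// updates.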
func (b *balancer) Notify() <-chan []Address {
	return b.addrCh
}

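// Close shuts the balancer down: it stops the expiration timer, wakes any
// blocked Get() callers, closes the notification channel, and closes the
// naming watcher.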
func (b *balancer) Close() error {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.done {
		return errBalancerClosed
	}
	b.done = true
	if b.expTimer != nil {
		b.expTimer.Stop()
	}
	if b.waitCh != nil {
		close(b.waitCh)
	}
	if b.addrCh != nil {
		close(b.addrCh)
	}
	if b.w != nil {
		b.w.Close()
	}
	return nil
}