]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | /* |
2 | * | |
3 | * Copyright 2016 gRPC authors. | |
4 | * | |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | * | |
17 | */ | |
18 | ||
19 | package grpc | |
20 | ||
21 | import ( | |
22 | "errors" | |
23 | "fmt" | |
24 | "math/rand" | |
25 | "net" | |
26 | "sync" | |
27 | "time" | |
28 | ||
29 | "golang.org/x/net/context" | |
30 | "google.golang.org/grpc/codes" | |
31 | lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1" | |
32 | "google.golang.org/grpc/grpclog" | |
33 | "google.golang.org/grpc/metadata" | |
34 | "google.golang.org/grpc/naming" | |
35 | ) | |
36 | ||
// Client API for LoadBalancer service.
// Mostly copied from generated pb.go file.
// To avoid circular dependency.
//
// loadBalancerClient opens LoadBalancer streams on the ClientConn it wraps.
type loadBalancerClient struct {
	// cc is the connection handed to NewClientStream when a stream is opened.
	cc *ClientConn
}
43 | ||
44 | func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...CallOption) (*balanceLoadClientStream, error) { | |
45 | desc := &StreamDesc{ | |
46 | StreamName: "BalanceLoad", | |
47 | ServerStreams: true, | |
48 | ClientStreams: true, | |
49 | } | |
50 | stream, err := NewClientStream(ctx, desc, c.cc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...) | |
51 | if err != nil { | |
52 | return nil, err | |
53 | } | |
54 | x := &balanceLoadClientStream{stream} | |
55 | return x, nil | |
56 | } | |
57 | ||
// balanceLoadClientStream embeds a ClientStream and adds typed Send and Recv
// helpers for the BalanceLoad request and response messages.
type balanceLoadClientStream struct {
	ClientStream
}
61 | ||
62 | func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error { | |
63 | return x.ClientStream.SendMsg(m) | |
64 | } | |
65 | ||
66 | func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) { | |
67 | m := new(lbpb.LoadBalanceResponse) | |
68 | if err := x.ClientStream.RecvMsg(m); err != nil { | |
69 | return nil, err | |
70 | } | |
71 | return m, nil | |
72 | } | |
73 | ||
74 | // NewGRPCLBBalancer creates a grpclb load balancer. | |
75 | func NewGRPCLBBalancer(r naming.Resolver) Balancer { | |
76 | return &balancer{ | |
77 | r: r, | |
78 | } | |
79 | } | |
80 | ||
// remoteBalancerInfo identifies one remote load balancer learned from name
// resolution.
type remoteBalancerInfo struct {
	// addr is the address that is dialed to reach the remote LB server.
	addr string
	// the server name used for authentication with the remote LB server.
	name string
}
86 | ||
// grpclbAddrInfo consists of the information of a backend server.
type grpclbAddrInfo struct {
	// addr is the backend address, including the lb-token metadata built
	// from the server list entry.
	addr Address
	// connected is set by Up and cleared by the callback that Up returns.
	connected bool
	// dropForRateLimiting indicates whether this particular request should be
	// dropped by the client for rate limiting.
	dropForRateLimiting bool
	// dropForLoadBalancing indicates whether this particular request should be
	// dropped by the client for load balancing.
	dropForLoadBalancing bool
}
98 | ||
// balancer is the grpclb implementation of Balancer. It learns remote load
// balancer addresses from a name resolver, obtains backend server lists from
// a remote balancer, and hands out backend addresses from the latest list.
type balancer struct {
	// r resolves the target passed to Start.
	r naming.Resolver
	// target is the name given to Start; it is sent as the Name of the
	// initial load balance request.
	target string
	// mu serializes access to the balancer's mutable state.
	mu sync.Mutex
	seq int // a sequence number to make sure addrCh does not get stale addresses.
	// w is the watcher returned by r.Resolve; Close closes it.
	w naming.Watcher
	// addrCh is the channel returned by Notify; backend address lists are
	// sent on it.
	addrCh chan []Address
	// rbs is the current list of remote balancers from name resolution.
	rbs []remoteBalancerInfo
	// addrs is the current backend server list.
	addrs []*grpclbAddrInfo
	// next is the index in addrs at which the next Get starts scanning.
	next int
	// waitCh, when non-nil, is closed to wake blocked Get callers.
	waitCh chan struct{}
	// done is set by Close.
	done bool
	// expTimer fires serverListExpire when the current server list expires.
	expTimer *time.Timer
	// rand is created in Start and used when choosing among remote balancers.
	rand *rand.Rand

	// clientStats accumulates call statistics that sendLoadReport sends to
	// the remote balancer and then resets.
	clientStats lbpb.ClientStats
}
116 | ||
117 | func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error { | |
118 | updates, err := w.Next() | |
119 | if err != nil { | |
120 | grpclog.Warningf("grpclb: failed to get next addr update from watcher: %v", err) | |
121 | return err | |
122 | } | |
123 | b.mu.Lock() | |
124 | defer b.mu.Unlock() | |
125 | if b.done { | |
126 | return ErrClientConnClosing | |
127 | } | |
128 | for _, update := range updates { | |
129 | switch update.Op { | |
130 | case naming.Add: | |
131 | var exist bool | |
132 | for _, v := range b.rbs { | |
133 | // TODO: Is the same addr with different server name a different balancer? | |
134 | if update.Addr == v.addr { | |
135 | exist = true | |
136 | break | |
137 | } | |
138 | } | |
139 | if exist { | |
140 | continue | |
141 | } | |
142 | md, ok := update.Metadata.(*naming.AddrMetadataGRPCLB) | |
143 | if !ok { | |
144 | // TODO: Revisit the handling here and may introduce some fallback mechanism. | |
145 | grpclog.Errorf("The name resolution contains unexpected metadata %v", update.Metadata) | |
146 | continue | |
147 | } | |
148 | switch md.AddrType { | |
149 | case naming.Backend: | |
150 | // TODO: Revisit the handling here and may introduce some fallback mechanism. | |
151 | grpclog.Errorf("The name resolution does not give grpclb addresses") | |
152 | continue | |
153 | case naming.GRPCLB: | |
154 | b.rbs = append(b.rbs, remoteBalancerInfo{ | |
155 | addr: update.Addr, | |
156 | name: md.ServerName, | |
157 | }) | |
158 | default: | |
159 | grpclog.Errorf("Received unknow address type %d", md.AddrType) | |
160 | continue | |
161 | } | |
162 | case naming.Delete: | |
163 | for i, v := range b.rbs { | |
164 | if update.Addr == v.addr { | |
165 | copy(b.rbs[i:], b.rbs[i+1:]) | |
166 | b.rbs = b.rbs[:len(b.rbs)-1] | |
167 | break | |
168 | } | |
169 | } | |
170 | default: | |
171 | grpclog.Errorf("Unknown update.Op %v", update.Op) | |
172 | } | |
173 | } | |
174 | // TODO: Fall back to the basic round-robin load balancing if the resulting address is | |
175 | // not a load balancer. | |
176 | select { | |
177 | case <-ch: | |
178 | default: | |
179 | } | |
180 | ch <- b.rbs | |
181 | return nil | |
182 | } | |
183 | ||
184 | func (b *balancer) serverListExpire(seq int) { | |
185 | b.mu.Lock() | |
186 | defer b.mu.Unlock() | |
187 | // TODO: gRPC interanls do not clear the connections when the server list is stale. | |
188 | // This means RPCs will keep using the existing server list until b receives new | |
189 | // server list even though the list is expired. Revisit this behavior later. | |
190 | if b.done || seq < b.seq { | |
191 | return | |
192 | } | |
193 | b.next = 0 | |
194 | b.addrs = nil | |
195 | // Ask grpc internals to close all the corresponding connections. | |
196 | b.addrCh <- nil | |
197 | } | |
198 | ||
199 | func convertDuration(d *lbpb.Duration) time.Duration { | |
200 | if d == nil { | |
201 | return 0 | |
202 | } | |
203 | return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond | |
204 | } | |
205 | ||
206 | func (b *balancer) processServerList(l *lbpb.ServerList, seq int) { | |
207 | if l == nil { | |
208 | return | |
209 | } | |
210 | servers := l.GetServers() | |
211 | expiration := convertDuration(l.GetExpirationInterval()) | |
212 | var ( | |
213 | sl []*grpclbAddrInfo | |
214 | addrs []Address | |
215 | ) | |
216 | for _, s := range servers { | |
217 | md := metadata.Pairs("lb-token", s.LoadBalanceToken) | |
218 | ip := net.IP(s.IpAddress) | |
219 | ipStr := ip.String() | |
220 | if ip.To4() == nil { | |
221 | // Add square brackets to ipv6 addresses, otherwise net.Dial() and | |
222 | // net.SplitHostPort() will return too many colons error. | |
223 | ipStr = fmt.Sprintf("[%s]", ipStr) | |
224 | } | |
225 | addr := Address{ | |
226 | Addr: fmt.Sprintf("%s:%d", ipStr, s.Port), | |
227 | Metadata: &md, | |
228 | } | |
229 | sl = append(sl, &grpclbAddrInfo{ | |
230 | addr: addr, | |
231 | dropForRateLimiting: s.DropForRateLimiting, | |
232 | dropForLoadBalancing: s.DropForLoadBalancing, | |
233 | }) | |
234 | addrs = append(addrs, addr) | |
235 | } | |
236 | b.mu.Lock() | |
237 | defer b.mu.Unlock() | |
238 | if b.done || seq < b.seq { | |
239 | return | |
240 | } | |
241 | if len(sl) > 0 { | |
242 | // reset b.next to 0 when replacing the server list. | |
243 | b.next = 0 | |
244 | b.addrs = sl | |
245 | b.addrCh <- addrs | |
246 | if b.expTimer != nil { | |
247 | b.expTimer.Stop() | |
248 | b.expTimer = nil | |
249 | } | |
250 | if expiration > 0 { | |
251 | b.expTimer = time.AfterFunc(expiration, func() { | |
252 | b.serverListExpire(seq) | |
253 | }) | |
254 | } | |
255 | } | |
256 | return | |
257 | } | |
258 | ||
259 | func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) { | |
260 | ticker := time.NewTicker(interval) | |
261 | defer ticker.Stop() | |
262 | for { | |
263 | select { | |
264 | case <-ticker.C: | |
265 | case <-done: | |
266 | return | |
267 | } | |
268 | b.mu.Lock() | |
269 | stats := b.clientStats | |
270 | b.clientStats = lbpb.ClientStats{} // Clear the stats. | |
271 | b.mu.Unlock() | |
272 | t := time.Now() | |
273 | stats.Timestamp = &lbpb.Timestamp{ | |
274 | Seconds: t.Unix(), | |
275 | Nanos: int32(t.Nanosecond()), | |
276 | } | |
277 | if err := s.Send(&lbpb.LoadBalanceRequest{ | |
278 | LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{ | |
279 | ClientStats: &stats, | |
280 | }, | |
281 | }); err != nil { | |
282 | grpclog.Errorf("grpclb: failed to send load report: %v", err) | |
283 | return | |
284 | } | |
285 | } | |
286 | } | |
287 | ||
288 | func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) { | |
289 | ctx, cancel := context.WithCancel(context.Background()) | |
290 | defer cancel() | |
291 | stream, err := lbc.BalanceLoad(ctx) | |
292 | if err != nil { | |
293 | grpclog.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err) | |
294 | return | |
295 | } | |
296 | b.mu.Lock() | |
297 | if b.done { | |
298 | b.mu.Unlock() | |
299 | return | |
300 | } | |
301 | b.mu.Unlock() | |
302 | initReq := &lbpb.LoadBalanceRequest{ | |
303 | LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{ | |
304 | InitialRequest: &lbpb.InitialLoadBalanceRequest{ | |
305 | Name: b.target, | |
306 | }, | |
307 | }, | |
308 | } | |
309 | if err := stream.Send(initReq); err != nil { | |
310 | grpclog.Errorf("grpclb: failed to send init request: %v", err) | |
311 | // TODO: backoff on retry? | |
312 | return true | |
313 | } | |
314 | reply, err := stream.Recv() | |
315 | if err != nil { | |
316 | grpclog.Errorf("grpclb: failed to recv init response: %v", err) | |
317 | // TODO: backoff on retry? | |
318 | return true | |
319 | } | |
320 | initResp := reply.GetInitialResponse() | |
321 | if initResp == nil { | |
322 | grpclog.Errorf("grpclb: reply from remote balancer did not include initial response.") | |
323 | return | |
324 | } | |
325 | // TODO: Support delegation. | |
326 | if initResp.LoadBalancerDelegate != "" { | |
327 | // delegation | |
328 | grpclog.Errorf("TODO: Delegation is not supported yet.") | |
329 | return | |
330 | } | |
331 | streamDone := make(chan struct{}) | |
332 | defer close(streamDone) | |
333 | b.mu.Lock() | |
334 | b.clientStats = lbpb.ClientStats{} // Clear client stats. | |
335 | b.mu.Unlock() | |
336 | if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 { | |
337 | go b.sendLoadReport(stream, d, streamDone) | |
338 | } | |
339 | // Retrieve the server list. | |
340 | for { | |
341 | reply, err := stream.Recv() | |
342 | if err != nil { | |
343 | grpclog.Errorf("grpclb: failed to recv server list: %v", err) | |
344 | break | |
345 | } | |
346 | b.mu.Lock() | |
347 | if b.done || seq < b.seq { | |
348 | b.mu.Unlock() | |
349 | return | |
350 | } | |
351 | b.seq++ // tick when receiving a new list of servers. | |
352 | seq = b.seq | |
353 | b.mu.Unlock() | |
354 | if serverList := reply.GetServerList(); serverList != nil { | |
355 | b.processServerList(serverList, seq) | |
356 | } | |
357 | } | |
358 | return true | |
359 | } | |
360 | ||
361 | func (b *balancer) Start(target string, config BalancerConfig) error { | |
362 | b.rand = rand.New(rand.NewSource(time.Now().Unix())) | |
363 | // TODO: Fall back to the basic direct connection if there is no name resolver. | |
364 | if b.r == nil { | |
365 | return errors.New("there is no name resolver installed") | |
366 | } | |
367 | b.target = target | |
368 | b.mu.Lock() | |
369 | if b.done { | |
370 | b.mu.Unlock() | |
371 | return ErrClientConnClosing | |
372 | } | |
373 | b.addrCh = make(chan []Address) | |
374 | w, err := b.r.Resolve(target) | |
375 | if err != nil { | |
376 | b.mu.Unlock() | |
377 | grpclog.Errorf("grpclb: failed to resolve address: %v, err: %v", target, err) | |
378 | return err | |
379 | } | |
380 | b.w = w | |
381 | b.mu.Unlock() | |
382 | balancerAddrsCh := make(chan []remoteBalancerInfo, 1) | |
383 | // Spawn a goroutine to monitor the name resolution of remote load balancer. | |
384 | go func() { | |
385 | for { | |
386 | if err := b.watchAddrUpdates(w, balancerAddrsCh); err != nil { | |
387 | grpclog.Warningf("grpclb: the naming watcher stops working due to %v.\n", err) | |
388 | close(balancerAddrsCh) | |
389 | return | |
390 | } | |
391 | } | |
392 | }() | |
393 | // Spawn a goroutine to talk to the remote load balancer. | |
394 | go func() { | |
395 | var ( | |
396 | cc *ClientConn | |
397 | // ccError is closed when there is an error in the current cc. | |
398 | // A new rb should be picked from rbs and connected. | |
399 | ccError chan struct{} | |
400 | rb *remoteBalancerInfo | |
401 | rbs []remoteBalancerInfo | |
402 | rbIdx int | |
403 | ) | |
404 | ||
405 | defer func() { | |
406 | if ccError != nil { | |
407 | select { | |
408 | case <-ccError: | |
409 | default: | |
410 | close(ccError) | |
411 | } | |
412 | } | |
413 | if cc != nil { | |
414 | cc.Close() | |
415 | } | |
416 | }() | |
417 | ||
418 | for { | |
419 | var ok bool | |
420 | select { | |
421 | case rbs, ok = <-balancerAddrsCh: | |
422 | if !ok { | |
423 | return | |
424 | } | |
425 | foundIdx := -1 | |
426 | if rb != nil { | |
427 | for i, trb := range rbs { | |
428 | if trb == *rb { | |
429 | foundIdx = i | |
430 | break | |
431 | } | |
432 | } | |
433 | } | |
434 | if foundIdx >= 0 { | |
435 | if foundIdx >= 1 { | |
436 | // Move the address in use to the beginning of the list. | |
437 | b.rbs[0], b.rbs[foundIdx] = b.rbs[foundIdx], b.rbs[0] | |
438 | rbIdx = 0 | |
439 | } | |
440 | continue // If found, don't dial new cc. | |
441 | } else if len(rbs) > 0 { | |
442 | // Pick a random one from the list, instead of always using the first one. | |
443 | if l := len(rbs); l > 1 && rb != nil { | |
444 | tmpIdx := b.rand.Intn(l - 1) | |
445 | b.rbs[0], b.rbs[tmpIdx] = b.rbs[tmpIdx], b.rbs[0] | |
446 | } | |
447 | rbIdx = 0 | |
448 | rb = &rbs[0] | |
449 | } else { | |
450 | // foundIdx < 0 && len(rbs) <= 0. | |
451 | rb = nil | |
452 | } | |
453 | case <-ccError: | |
454 | ccError = nil | |
455 | if rbIdx < len(rbs)-1 { | |
456 | rbIdx++ | |
457 | rb = &rbs[rbIdx] | |
458 | } else { | |
459 | rb = nil | |
460 | } | |
461 | } | |
462 | ||
463 | if rb == nil { | |
464 | continue | |
465 | } | |
466 | ||
467 | if cc != nil { | |
468 | cc.Close() | |
469 | } | |
470 | // Talk to the remote load balancer to get the server list. | |
471 | var ( | |
472 | err error | |
473 | dopts []DialOption | |
474 | ) | |
475 | if creds := config.DialCreds; creds != nil { | |
476 | if rb.name != "" { | |
477 | if err := creds.OverrideServerName(rb.name); err != nil { | |
478 | grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v", err) | |
479 | continue | |
480 | } | |
481 | } | |
482 | dopts = append(dopts, WithTransportCredentials(creds)) | |
483 | } else { | |
484 | dopts = append(dopts, WithInsecure()) | |
485 | } | |
486 | if dialer := config.Dialer; dialer != nil { | |
487 | // WithDialer takes a different type of function, so we instead use a special DialOption here. | |
488 | dopts = append(dopts, func(o *dialOptions) { o.copts.Dialer = dialer }) | |
489 | } | |
490 | ccError = make(chan struct{}) | |
491 | cc, err = Dial(rb.addr, dopts...) | |
492 | if err != nil { | |
493 | grpclog.Warningf("grpclb: failed to setup a connection to the remote balancer %v: %v", rb.addr, err) | |
494 | close(ccError) | |
495 | continue | |
496 | } | |
497 | b.mu.Lock() | |
498 | b.seq++ // tick when getting a new balancer address | |
499 | seq := b.seq | |
500 | b.next = 0 | |
501 | b.mu.Unlock() | |
502 | go func(cc *ClientConn, ccError chan struct{}) { | |
503 | lbc := &loadBalancerClient{cc} | |
504 | b.callRemoteBalancer(lbc, seq) | |
505 | cc.Close() | |
506 | select { | |
507 | case <-ccError: | |
508 | default: | |
509 | close(ccError) | |
510 | } | |
511 | }(cc, ccError) | |
512 | } | |
513 | }() | |
514 | return nil | |
515 | } | |
516 | ||
517 | func (b *balancer) down(addr Address, err error) { | |
518 | b.mu.Lock() | |
519 | defer b.mu.Unlock() | |
520 | for _, a := range b.addrs { | |
521 | if addr == a.addr { | |
522 | a.connected = false | |
523 | break | |
524 | } | |
525 | } | |
526 | } | |
527 | ||
528 | func (b *balancer) Up(addr Address) func(error) { | |
529 | b.mu.Lock() | |
530 | defer b.mu.Unlock() | |
531 | if b.done { | |
532 | return nil | |
533 | } | |
534 | var cnt int | |
535 | for _, a := range b.addrs { | |
536 | if a.addr == addr { | |
537 | if a.connected { | |
538 | return nil | |
539 | } | |
540 | a.connected = true | |
541 | } | |
542 | if a.connected && !a.dropForRateLimiting && !a.dropForLoadBalancing { | |
543 | cnt++ | |
544 | } | |
545 | } | |
546 | // addr is the only one which is connected. Notify the Get() callers who are blocking. | |
547 | if cnt == 1 && b.waitCh != nil { | |
548 | close(b.waitCh) | |
549 | b.waitCh = nil | |
550 | } | |
551 | return func(err error) { | |
552 | b.down(addr, err) | |
553 | } | |
554 | } | |
555 | ||
// Get picks a backend address for an RPC by scanning b.addrs round-robin
// starting at b.next.
//
// A connected backend that is not marked to drop is returned immediately.
// When opts.BlockingWait is false and the scan hits a connected backend that
// is marked to drop, Get fails with codes.Unavailable; when nothing usable is
// found it returns the address at b.next even though it is not connected, or
// codes.Unavailable if the list is empty. When opts.BlockingWait is true, Get
// waits on b.waitCh until an address becomes usable, the balancer is closed
// (ErrClientConnClosing), or ctx is done (ctx.Err()).
//
// put is set only when err is nil. Calling it records the outcome of the
// call in b.clientStats, unless the balancer has been closed or its sequence
// number has advanced since Get was called.
func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
	var ch chan struct{}
	b.mu.Lock()
	if b.done {
		b.mu.Unlock()
		err = ErrClientConnClosing
		return
	}
	// Remember the sequence number so that put can detect a stale call.
	seq := b.seq

	// The deferred function inspects the named result err on the way out and
	// installs put only for successful picks.
	defer func() {
		if err != nil {
			return
		}
		put = func() {
			s, ok := rpcInfoFromContext(ctx)
			if !ok {
				return
			}
			b.mu.Lock()
			defer b.mu.Unlock()
			if b.done || seq < b.seq {
				return
			}
			b.clientStats.NumCallsFinished++
			if !s.bytesSent {
				b.clientStats.NumCallsFinishedWithClientFailedToSend++
			} else if s.bytesReceived {
				b.clientStats.NumCallsFinishedKnownReceived++
			}
		}
	}()

	b.clientStats.NumCallsStarted++
	if len(b.addrs) > 0 {
		// b.next may point past the end after a previous increment.
		if b.next >= len(b.addrs) {
			b.next = 0
		}
		next := b.next
		for {
			a := b.addrs[next]
			next = (next + 1) % len(b.addrs)
			if a.connected {
				if !a.dropForRateLimiting && !a.dropForLoadBalancing {
					addr = a.addr
					b.next = next
					b.mu.Unlock()
					return
				}
				// The backend is connected but marked to drop: a
				// non-blocking call fails here and the drop is counted.
				if !opts.BlockingWait {
					b.next = next
					if a.dropForLoadBalancing {
						b.clientStats.NumCallsFinished++
						b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
					} else if a.dropForRateLimiting {
						b.clientStats.NumCallsFinished++
						b.clientStats.NumCallsFinishedWithDropForRateLimiting++
					}
					b.mu.Unlock()
					err = Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr)
					return
				}
			}
			if next == b.next {
				// Has iterated all the possible address but none is connected.
				break
			}
		}
	}
	if !opts.BlockingWait {
		if len(b.addrs) == 0 {
			b.clientStats.NumCallsFinished++
			b.clientStats.NumCallsFinishedWithClientFailedToSend++
			b.mu.Unlock()
			err = Errorf(codes.Unavailable, "there is no address available")
			return
		}
		// Returns the next addr on b.addrs for a failfast RPC.
		addr = b.addrs[b.next].addr
		b.next++
		b.mu.Unlock()
		return
	}
	// Wait on b.waitCh for non-failfast RPCs.
	if b.waitCh == nil {
		ch = make(chan struct{})
		b.waitCh = ch
	} else {
		ch = b.waitCh
	}
	b.mu.Unlock()
	for {
		select {
		case <-ctx.Done():
			b.mu.Lock()
			b.clientStats.NumCallsFinished++
			b.clientStats.NumCallsFinishedWithClientFailedToSend++
			b.mu.Unlock()
			err = ctx.Err()
			return
		case <-ch:
			// ch is closed by Up when a backend becomes usable and by Close.
			b.mu.Lock()
			if b.done {
				b.clientStats.NumCallsFinished++
				b.clientStats.NumCallsFinishedWithClientFailedToSend++
				b.mu.Unlock()
				err = ErrClientConnClosing
				return
			}

			if len(b.addrs) > 0 {
				if b.next >= len(b.addrs) {
					b.next = 0
				}
				next := b.next
				for {
					a := b.addrs[next]
					next = (next + 1) % len(b.addrs)
					if a.connected {
						if !a.dropForRateLimiting && !a.dropForLoadBalancing {
							addr = a.addr
							b.next = next
							b.mu.Unlock()
							return
						}
						// NOTE(review): this loop is only reached when
						// opts.BlockingWait is true, because the non-blocking
						// case returned above, so this branch looks
						// unreachable. Confirm before relying on it.
						if !opts.BlockingWait {
							b.next = next
							if a.dropForLoadBalancing {
								b.clientStats.NumCallsFinished++
								b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
							} else if a.dropForRateLimiting {
								b.clientStats.NumCallsFinished++
								b.clientStats.NumCallsFinishedWithDropForRateLimiting++
							}
							b.mu.Unlock()
							err = Errorf(codes.Unavailable, "drop requests for the addreess %s", a.addr.Addr)
							return
						}
					}
					if next == b.next {
						// Has iterated all the possible address but none is connected.
						break
					}
				}
			}
			// The newly added addr got removed by Down() again.
			// Re-arm the wait on the current (or a fresh) wait channel.
			if b.waitCh == nil {
				ch = make(chan struct{})
				b.waitCh = ch
			} else {
				ch = b.waitCh
			}
			b.mu.Unlock()
		}
	}
}
712 | ||
// Notify returns the channel on which the balancer publishes backend address
// lists. The channel is created in Start, so it is nil before Start runs.
func (b *balancer) Notify() <-chan []Address {
	return b.addrCh
}
716 | ||
717 | func (b *balancer) Close() error { | |
718 | b.mu.Lock() | |
719 | defer b.mu.Unlock() | |
720 | if b.done { | |
721 | return errBalancerClosed | |
722 | } | |
723 | b.done = true | |
724 | if b.expTimer != nil { | |
725 | b.expTimer.Stop() | |
726 | } | |
727 | if b.waitCh != nil { | |
728 | close(b.waitCh) | |
729 | } | |
730 | if b.addrCh != nil { | |
731 | close(b.addrCh) | |
732 | } | |
733 | if b.w != nil { | |
734 | b.w.Close() | |
735 | } | |
736 | return nil | |
737 | } |