aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/google.golang.org/grpc/grpclb.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/grpclb.go')
-rw-r--r--vendor/google.golang.org/grpc/grpclb.go737
1 files changed, 737 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/grpclb.go b/vendor/google.golang.org/grpc/grpclb.go
new file mode 100644
index 0000000..f7b6b7d
--- /dev/null
+++ b/vendor/google.golang.org/grpc/grpclb.go
@@ -0,0 +1,737 @@
1/*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "errors"
23 "fmt"
24 "math/rand"
25 "net"
26 "sync"
27 "time"
28
29 "golang.org/x/net/context"
30 "google.golang.org/grpc/codes"
31 lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1"
32 "google.golang.org/grpc/grpclog"
33 "google.golang.org/grpc/metadata"
34 "google.golang.org/grpc/naming"
35)
36
37// Client API for LoadBalancer service.
38// Mostly copied from generated pb.go file.
39// To avoid circular dependency.
40type loadBalancerClient struct {
41 cc *ClientConn
42}
43
44func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...CallOption) (*balanceLoadClientStream, error) {
45 desc := &StreamDesc{
46 StreamName: "BalanceLoad",
47 ServerStreams: true,
48 ClientStreams: true,
49 }
50 stream, err := NewClientStream(ctx, desc, c.cc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
51 if err != nil {
52 return nil, err
53 }
54 x := &balanceLoadClientStream{stream}
55 return x, nil
56}
57
58type balanceLoadClientStream struct {
59 ClientStream
60}
61
62func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
63 return x.ClientStream.SendMsg(m)
64}
65
66func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
67 m := new(lbpb.LoadBalanceResponse)
68 if err := x.ClientStream.RecvMsg(m); err != nil {
69 return nil, err
70 }
71 return m, nil
72}
73
74// NewGRPCLBBalancer creates a grpclb load balancer.
75func NewGRPCLBBalancer(r naming.Resolver) Balancer {
76 return &balancer{
77 r: r,
78 }
79}
80
81type remoteBalancerInfo struct {
82 addr string
83 // the server name used for authentication with the remote LB server.
84 name string
85}
86
87// grpclbAddrInfo consists of the information of a backend server.
88type grpclbAddrInfo struct {
89 addr Address
90 connected bool
91 // dropForRateLimiting indicates whether this particular request should be
92 // dropped by the client for rate limiting.
93 dropForRateLimiting bool
94 // dropForLoadBalancing indicates whether this particular request should be
95 // dropped by the client for load balancing.
96 dropForLoadBalancing bool
97}
98
99type balancer struct {
100 r naming.Resolver
101 target string
102 mu sync.Mutex
103 seq int // a sequence number to make sure addrCh does not get stale addresses.
104 w naming.Watcher
105 addrCh chan []Address
106 rbs []remoteBalancerInfo
107 addrs []*grpclbAddrInfo
108 next int
109 waitCh chan struct{}
110 done bool
111 expTimer *time.Timer
112 rand *rand.Rand
113
114 clientStats lbpb.ClientStats
115}
116
117func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
118 updates, err := w.Next()
119 if err != nil {
120 grpclog.Warningf("grpclb: failed to get next addr update from watcher: %v", err)
121 return err
122 }
123 b.mu.Lock()
124 defer b.mu.Unlock()
125 if b.done {
126 return ErrClientConnClosing
127 }
128 for _, update := range updates {
129 switch update.Op {
130 case naming.Add:
131 var exist bool
132 for _, v := range b.rbs {
133 // TODO: Is the same addr with different server name a different balancer?
134 if update.Addr == v.addr {
135 exist = true
136 break
137 }
138 }
139 if exist {
140 continue
141 }
142 md, ok := update.Metadata.(*naming.AddrMetadataGRPCLB)
143 if !ok {
144 // TODO: Revisit the handling here and may introduce some fallback mechanism.
145 grpclog.Errorf("The name resolution contains unexpected metadata %v", update.Metadata)
146 continue
147 }
148 switch md.AddrType {
149 case naming.Backend:
150 // TODO: Revisit the handling here and may introduce some fallback mechanism.
151 grpclog.Errorf("The name resolution does not give grpclb addresses")
152 continue
153 case naming.GRPCLB:
154 b.rbs = append(b.rbs, remoteBalancerInfo{
155 addr: update.Addr,
156 name: md.ServerName,
157 })
158 default:
159 grpclog.Errorf("Received unknow address type %d", md.AddrType)
160 continue
161 }
162 case naming.Delete:
163 for i, v := range b.rbs {
164 if update.Addr == v.addr {
165 copy(b.rbs[i:], b.rbs[i+1:])
166 b.rbs = b.rbs[:len(b.rbs)-1]
167 break
168 }
169 }
170 default:
171 grpclog.Errorf("Unknown update.Op %v", update.Op)
172 }
173 }
174 // TODO: Fall back to the basic round-robin load balancing if the resulting address is
175 // not a load balancer.
176 select {
177 case <-ch:
178 default:
179 }
180 ch <- b.rbs
181 return nil
182}
183
184func (b *balancer) serverListExpire(seq int) {
185 b.mu.Lock()
186 defer b.mu.Unlock()
187 // TODO: gRPC interanls do not clear the connections when the server list is stale.
188 // This means RPCs will keep using the existing server list until b receives new
189 // server list even though the list is expired. Revisit this behavior later.
190 if b.done || seq < b.seq {
191 return
192 }
193 b.next = 0
194 b.addrs = nil
195 // Ask grpc internals to close all the corresponding connections.
196 b.addrCh <- nil
197}
198
199func convertDuration(d *lbpb.Duration) time.Duration {
200 if d == nil {
201 return 0
202 }
203 return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
204}
205
206func (b *balancer) processServerList(l *lbpb.ServerList, seq int) {
207 if l == nil {
208 return
209 }
210 servers := l.GetServers()
211 expiration := convertDuration(l.GetExpirationInterval())
212 var (
213 sl []*grpclbAddrInfo
214 addrs []Address
215 )
216 for _, s := range servers {
217 md := metadata.Pairs("lb-token", s.LoadBalanceToken)
218 ip := net.IP(s.IpAddress)
219 ipStr := ip.String()
220 if ip.To4() == nil {
221 // Add square brackets to ipv6 addresses, otherwise net.Dial() and
222 // net.SplitHostPort() will return too many colons error.
223 ipStr = fmt.Sprintf("[%s]", ipStr)
224 }
225 addr := Address{
226 Addr: fmt.Sprintf("%s:%d", ipStr, s.Port),
227 Metadata: &md,
228 }
229 sl = append(sl, &grpclbAddrInfo{
230 addr: addr,
231 dropForRateLimiting: s.DropForRateLimiting,
232 dropForLoadBalancing: s.DropForLoadBalancing,
233 })
234 addrs = append(addrs, addr)
235 }
236 b.mu.Lock()
237 defer b.mu.Unlock()
238 if b.done || seq < b.seq {
239 return
240 }
241 if len(sl) > 0 {
242 // reset b.next to 0 when replacing the server list.
243 b.next = 0
244 b.addrs = sl
245 b.addrCh <- addrs
246 if b.expTimer != nil {
247 b.expTimer.Stop()
248 b.expTimer = nil
249 }
250 if expiration > 0 {
251 b.expTimer = time.AfterFunc(expiration, func() {
252 b.serverListExpire(seq)
253 })
254 }
255 }
256 return
257}
258
259func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) {
260 ticker := time.NewTicker(interval)
261 defer ticker.Stop()
262 for {
263 select {
264 case <-ticker.C:
265 case <-done:
266 return
267 }
268 b.mu.Lock()
269 stats := b.clientStats
270 b.clientStats = lbpb.ClientStats{} // Clear the stats.
271 b.mu.Unlock()
272 t := time.Now()
273 stats.Timestamp = &lbpb.Timestamp{
274 Seconds: t.Unix(),
275 Nanos: int32(t.Nanosecond()),
276 }
277 if err := s.Send(&lbpb.LoadBalanceRequest{
278 LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
279 ClientStats: &stats,
280 },
281 }); err != nil {
282 grpclog.Errorf("grpclb: failed to send load report: %v", err)
283 return
284 }
285 }
286}
287
288func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) {
289 ctx, cancel := context.WithCancel(context.Background())
290 defer cancel()
291 stream, err := lbc.BalanceLoad(ctx)
292 if err != nil {
293 grpclog.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
294 return
295 }
296 b.mu.Lock()
297 if b.done {
298 b.mu.Unlock()
299 return
300 }
301 b.mu.Unlock()
302 initReq := &lbpb.LoadBalanceRequest{
303 LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
304 InitialRequest: &lbpb.InitialLoadBalanceRequest{
305 Name: b.target,
306 },
307 },
308 }
309 if err := stream.Send(initReq); err != nil {
310 grpclog.Errorf("grpclb: failed to send init request: %v", err)
311 // TODO: backoff on retry?
312 return true
313 }
314 reply, err := stream.Recv()
315 if err != nil {
316 grpclog.Errorf("grpclb: failed to recv init response: %v", err)
317 // TODO: backoff on retry?
318 return true
319 }
320 initResp := reply.GetInitialResponse()
321 if initResp == nil {
322 grpclog.Errorf("grpclb: reply from remote balancer did not include initial response.")
323 return
324 }
325 // TODO: Support delegation.
326 if initResp.LoadBalancerDelegate != "" {
327 // delegation
328 grpclog.Errorf("TODO: Delegation is not supported yet.")
329 return
330 }
331 streamDone := make(chan struct{})
332 defer close(streamDone)
333 b.mu.Lock()
334 b.clientStats = lbpb.ClientStats{} // Clear client stats.
335 b.mu.Unlock()
336 if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
337 go b.sendLoadReport(stream, d, streamDone)
338 }
339 // Retrieve the server list.
340 for {
341 reply, err := stream.Recv()
342 if err != nil {
343 grpclog.Errorf("grpclb: failed to recv server list: %v", err)
344 break
345 }
346 b.mu.Lock()
347 if b.done || seq < b.seq {
348 b.mu.Unlock()
349 return
350 }
351 b.seq++ // tick when receiving a new list of servers.
352 seq = b.seq
353 b.mu.Unlock()
354 if serverList := reply.GetServerList(); serverList != nil {
355 b.processServerList(serverList, seq)
356 }
357 }
358 return true
359}
360
361func (b *balancer) Start(target string, config BalancerConfig) error {
362 b.rand = rand.New(rand.NewSource(time.Now().Unix()))
363 // TODO: Fall back to the basic direct connection if there is no name resolver.
364 if b.r == nil {
365 return errors.New("there is no name resolver installed")
366 }
367 b.target = target
368 b.mu.Lock()
369 if b.done {
370 b.mu.Unlock()
371 return ErrClientConnClosing
372 }
373 b.addrCh = make(chan []Address)
374 w, err := b.r.Resolve(target)
375 if err != nil {
376 b.mu.Unlock()
377 grpclog.Errorf("grpclb: failed to resolve address: %v, err: %v", target, err)
378 return err
379 }
380 b.w = w
381 b.mu.Unlock()
382 balancerAddrsCh := make(chan []remoteBalancerInfo, 1)
383 // Spawn a goroutine to monitor the name resolution of remote load balancer.
384 go func() {
385 for {
386 if err := b.watchAddrUpdates(w, balancerAddrsCh); err != nil {
387 grpclog.Warningf("grpclb: the naming watcher stops working due to %v.\n", err)
388 close(balancerAddrsCh)
389 return
390 }
391 }
392 }()
393 // Spawn a goroutine to talk to the remote load balancer.
394 go func() {
395 var (
396 cc *ClientConn
397 // ccError is closed when there is an error in the current cc.
398 // A new rb should be picked from rbs and connected.
399 ccError chan struct{}
400 rb *remoteBalancerInfo
401 rbs []remoteBalancerInfo
402 rbIdx int
403 )
404
405 defer func() {
406 if ccError != nil {
407 select {
408 case <-ccError:
409 default:
410 close(ccError)
411 }
412 }
413 if cc != nil {
414 cc.Close()
415 }
416 }()
417
418 for {
419 var ok bool
420 select {
421 case rbs, ok = <-balancerAddrsCh:
422 if !ok {
423 return
424 }
425 foundIdx := -1
426 if rb != nil {
427 for i, trb := range rbs {
428 if trb == *rb {
429 foundIdx = i
430 break
431 }
432 }
433 }
434 if foundIdx >= 0 {
435 if foundIdx >= 1 {
436 // Move the address in use to the beginning of the list.
437 b.rbs[0], b.rbs[foundIdx] = b.rbs[foundIdx], b.rbs[0]
438 rbIdx = 0
439 }
440 continue // If found, don't dial new cc.
441 } else if len(rbs) > 0 {
442 // Pick a random one from the list, instead of always using the first one.
443 if l := len(rbs); l > 1 && rb != nil {
444 tmpIdx := b.rand.Intn(l - 1)
445 b.rbs[0], b.rbs[tmpIdx] = b.rbs[tmpIdx], b.rbs[0]
446 }
447 rbIdx = 0
448 rb = &rbs[0]
449 } else {
450 // foundIdx < 0 && len(rbs) <= 0.
451 rb = nil
452 }
453 case <-ccError:
454 ccError = nil
455 if rbIdx < len(rbs)-1 {
456 rbIdx++
457 rb = &rbs[rbIdx]
458 } else {
459 rb = nil
460 }
461 }
462
463 if rb == nil {
464 continue
465 }
466
467 if cc != nil {
468 cc.Close()
469 }
470 // Talk to the remote load balancer to get the server list.
471 var (
472 err error
473 dopts []DialOption
474 )
475 if creds := config.DialCreds; creds != nil {
476 if rb.name != "" {
477 if err := creds.OverrideServerName(rb.name); err != nil {
478 grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v", err)
479 continue
480 }
481 }
482 dopts = append(dopts, WithTransportCredentials(creds))
483 } else {
484 dopts = append(dopts, WithInsecure())
485 }
486 if dialer := config.Dialer; dialer != nil {
487 // WithDialer takes a different type of function, so we instead use a special DialOption here.
488 dopts = append(dopts, func(o *dialOptions) { o.copts.Dialer = dialer })
489 }
490 ccError = make(chan struct{})
491 cc, err = Dial(rb.addr, dopts...)
492 if err != nil {
493 grpclog.Warningf("grpclb: failed to setup a connection to the remote balancer %v: %v", rb.addr, err)
494 close(ccError)
495 continue
496 }
497 b.mu.Lock()
498 b.seq++ // tick when getting a new balancer address
499 seq := b.seq
500 b.next = 0
501 b.mu.Unlock()
502 go func(cc *ClientConn, ccError chan struct{}) {
503 lbc := &loadBalancerClient{cc}
504 b.callRemoteBalancer(lbc, seq)
505 cc.Close()
506 select {
507 case <-ccError:
508 default:
509 close(ccError)
510 }
511 }(cc, ccError)
512 }
513 }()
514 return nil
515}
516
517func (b *balancer) down(addr Address, err error) {
518 b.mu.Lock()
519 defer b.mu.Unlock()
520 for _, a := range b.addrs {
521 if addr == a.addr {
522 a.connected = false
523 break
524 }
525 }
526}
527
528func (b *balancer) Up(addr Address) func(error) {
529 b.mu.Lock()
530 defer b.mu.Unlock()
531 if b.done {
532 return nil
533 }
534 var cnt int
535 for _, a := range b.addrs {
536 if a.addr == addr {
537 if a.connected {
538 return nil
539 }
540 a.connected = true
541 }
542 if a.connected && !a.dropForRateLimiting && !a.dropForLoadBalancing {
543 cnt++
544 }
545 }
546 // addr is the only one which is connected. Notify the Get() callers who are blocking.
547 if cnt == 1 && b.waitCh != nil {
548 close(b.waitCh)
549 b.waitCh = nil
550 }
551 return func(err error) {
552 b.down(addr, err)
553 }
554}
555
556func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
557 var ch chan struct{}
558 b.mu.Lock()
559 if b.done {
560 b.mu.Unlock()
561 err = ErrClientConnClosing
562 return
563 }
564 seq := b.seq
565
566 defer func() {
567 if err != nil {
568 return
569 }
570 put = func() {
571 s, ok := rpcInfoFromContext(ctx)
572 if !ok {
573 return
574 }
575 b.mu.Lock()
576 defer b.mu.Unlock()
577 if b.done || seq < b.seq {
578 return
579 }
580 b.clientStats.NumCallsFinished++
581 if !s.bytesSent {
582 b.clientStats.NumCallsFinishedWithClientFailedToSend++
583 } else if s.bytesReceived {
584 b.clientStats.NumCallsFinishedKnownReceived++
585 }
586 }
587 }()
588
589 b.clientStats.NumCallsStarted++
590 if len(b.addrs) > 0 {
591 if b.next >= len(b.addrs) {
592 b.next = 0
593 }
594 next := b.next
595 for {
596 a := b.addrs[next]
597 next = (next + 1) % len(b.addrs)
598 if a.connected {
599 if !a.dropForRateLimiting && !a.dropForLoadBalancing {
600 addr = a.addr
601 b.next = next
602 b.mu.Unlock()
603 return
604 }
605 if !opts.BlockingWait {
606 b.next = next
607 if a.dropForLoadBalancing {
608 b.clientStats.NumCallsFinished++
609 b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
610 } else if a.dropForRateLimiting {
611 b.clientStats.NumCallsFinished++
612 b.clientStats.NumCallsFinishedWithDropForRateLimiting++
613 }
614 b.mu.Unlock()
615 err = Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr)
616 return
617 }
618 }
619 if next == b.next {
620 // Has iterated all the possible address but none is connected.
621 break
622 }
623 }
624 }
625 if !opts.BlockingWait {
626 if len(b.addrs) == 0 {
627 b.clientStats.NumCallsFinished++
628 b.clientStats.NumCallsFinishedWithClientFailedToSend++
629 b.mu.Unlock()
630 err = Errorf(codes.Unavailable, "there is no address available")
631 return
632 }
633 // Returns the next addr on b.addrs for a failfast RPC.
634 addr = b.addrs[b.next].addr
635 b.next++
636 b.mu.Unlock()
637 return
638 }
639 // Wait on b.waitCh for non-failfast RPCs.
640 if b.waitCh == nil {
641 ch = make(chan struct{})
642 b.waitCh = ch
643 } else {
644 ch = b.waitCh
645 }
646 b.mu.Unlock()
647 for {
648 select {
649 case <-ctx.Done():
650 b.mu.Lock()
651 b.clientStats.NumCallsFinished++
652 b.clientStats.NumCallsFinishedWithClientFailedToSend++
653 b.mu.Unlock()
654 err = ctx.Err()
655 return
656 case <-ch:
657 b.mu.Lock()
658 if b.done {
659 b.clientStats.NumCallsFinished++
660 b.clientStats.NumCallsFinishedWithClientFailedToSend++
661 b.mu.Unlock()
662 err = ErrClientConnClosing
663 return
664 }
665
666 if len(b.addrs) > 0 {
667 if b.next >= len(b.addrs) {
668 b.next = 0
669 }
670 next := b.next
671 for {
672 a := b.addrs[next]
673 next = (next + 1) % len(b.addrs)
674 if a.connected {
675 if !a.dropForRateLimiting && !a.dropForLoadBalancing {
676 addr = a.addr
677 b.next = next
678 b.mu.Unlock()
679 return
680 }
681 if !opts.BlockingWait {
682 b.next = next
683 if a.dropForLoadBalancing {
684 b.clientStats.NumCallsFinished++
685 b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
686 } else if a.dropForRateLimiting {
687 b.clientStats.NumCallsFinished++
688 b.clientStats.NumCallsFinishedWithDropForRateLimiting++
689 }
690 b.mu.Unlock()
691 err = Errorf(codes.Unavailable, "drop requests for the addreess %s", a.addr.Addr)
692 return
693 }
694 }
695 if next == b.next {
696 // Has iterated all the possible address but none is connected.
697 break
698 }
699 }
700 }
701 // The newly added addr got removed by Down() again.
702 if b.waitCh == nil {
703 ch = make(chan struct{})
704 b.waitCh = ch
705 } else {
706 ch = b.waitCh
707 }
708 b.mu.Unlock()
709 }
710 }
711}
712
713func (b *balancer) Notify() <-chan []Address {
714 return b.addrCh
715}
716
717func (b *balancer) Close() error {
718 b.mu.Lock()
719 defer b.mu.Unlock()
720 if b.done {
721 return errBalancerClosed
722 }
723 b.done = true
724 if b.expTimer != nil {
725 b.expTimer.Stop()
726 }
727 if b.waitCh != nil {
728 close(b.waitCh)
729 }
730 if b.addrCh != nil {
731 close(b.addrCh)
732 }
733 if b.w != nil {
734 b.w.Close()
735 }
736 return nil
737}