3 * Copyright 2016 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
29 "golang.org/x/net/context"
30 "google.golang.org/grpc/codes"
31 lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1"
32 "google.golang.org/grpc/grpclog"
33 "google.golang.org/grpc/metadata"
34 "google.golang.org/grpc/naming"
37 // Client API for LoadBalancer service.
38 // Mostly copied from generated pb.go file.
39 // To avoid circular dependency.
40 type loadBalancerClient struct {
44 func (c *loadBalancerClient) BalanceLoad(ctx context.Context, opts ...CallOption) (*balanceLoadClientStream, error) {
46 StreamName: "BalanceLoad",
50 stream, err := NewClientStream(ctx, desc, c.cc, "/grpc.lb.v1.LoadBalancer/BalanceLoad", opts...)
54 x := &balanceLoadClientStream{stream}
58 type balanceLoadClientStream struct {
62 func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
63 return x.ClientStream.SendMsg(m)
66 func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
67 m := new(lbpb.LoadBalanceResponse)
68 if err := x.ClientStream.RecvMsg(m); err != nil {
74 // NewGRPCLBBalancer creates a grpclb load balancer.
75 func NewGRPCLBBalancer(r naming.Resolver) Balancer {
81 type remoteBalancerInfo struct {
83 // the server name used for authentication with the remote LB server.
87 // grpclbAddrInfo consists of the information of a backend server.
88 type grpclbAddrInfo struct {
91 // dropForRateLimiting indicates whether this particular request should be
92 // dropped by the client for rate limiting.
93 dropForRateLimiting bool
94 // dropForLoadBalancing indicates whether this particular request should be
95 // dropped by the client for load balancing.
96 dropForLoadBalancing bool
99 type balancer struct {
103 seq int // a sequence number to make sure addrCh does not get stale addresses.
105 addrCh chan []Address
106 rbs []remoteBalancerInfo
107 addrs []*grpclbAddrInfo
114 clientStats lbpb.ClientStats
117 func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
118 updates, err := w.Next()
120 grpclog.Warningf("grpclb: failed to get next addr update from watcher: %v", err)
126 return ErrClientConnClosing
128 for _, update := range updates {
132 for _, v := range b.rbs {
133 // TODO: Is the same addr with different server name a different balancer?
134 if update.Addr == v.addr {
142 md, ok := update.Metadata.(*naming.AddrMetadataGRPCLB)
144 // TODO: Revisit the handling here and may introduce some fallback mechanism.
145 grpclog.Errorf("The name resolution contains unexpected metadata %v", update.Metadata)
150 // TODO: Revisit the handling here and may introduce some fallback mechanism.
151 grpclog.Errorf("The name resolution does not give grpclb addresses")
154 b.rbs = append(b.rbs, remoteBalancerInfo{
159 grpclog.Errorf("Received unknow address type %d", md.AddrType)
163 for i, v := range b.rbs {
164 if update.Addr == v.addr {
165 copy(b.rbs[i:], b.rbs[i+1:])
166 b.rbs = b.rbs[:len(b.rbs)-1]
171 grpclog.Errorf("Unknown update.Op %v", update.Op)
174 // TODO: Fall back to the basic round-robin load balancing if the resulting address is
175 // not a load balancer.
184 func (b *balancer) serverListExpire(seq int) {
187 // TODO: gRPC internals do not clear the connections when the server list is stale.
188 // This means RPCs will keep using the existing server list until b receives new
189 // server list even though the list is expired. Revisit this behavior later.
190 if b.done || seq < b.seq {
195 // Ask grpc internals to close all the corresponding connections.
199 func convertDuration(d *lbpb.Duration) time.Duration {
203 return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
206 func (b *balancer) processServerList(l *lbpb.ServerList, seq int) {
210 servers := l.GetServers()
211 expiration := convertDuration(l.GetExpirationInterval())
216 for _, s := range servers {
217 md := metadata.Pairs("lb-token", s.LoadBalanceToken)
218 ip := net.IP(s.IpAddress)
221 // Add square brackets to ipv6 addresses, otherwise net.Dial() and
222 // net.SplitHostPort() will return too many colons error.
223 ipStr = fmt.Sprintf("[%s]", ipStr)
226 Addr: fmt.Sprintf("%s:%d", ipStr, s.Port),
229 sl = append(sl, &grpclbAddrInfo{
231 dropForRateLimiting: s.DropForRateLimiting,
232 dropForLoadBalancing: s.DropForLoadBalancing,
234 addrs = append(addrs, addr)
238 if b.done || seq < b.seq {
242 // reset b.next to 0 when replacing the server list.
246 if b.expTimer != nil {
251 b.expTimer = time.AfterFunc(expiration, func() {
252 b.serverListExpire(seq)
259 func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) {
260 ticker := time.NewTicker(interval)
269 stats := b.clientStats
270 b.clientStats = lbpb.ClientStats{} // Clear the stats.
273 stats.Timestamp = &lbpb.Timestamp{
275 Nanos: int32(t.Nanosecond()),
277 if err := s.Send(&lbpb.LoadBalanceRequest{
278 LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
282 grpclog.Errorf("grpclb: failed to send load report: %v", err)
288 func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) {
289 ctx, cancel := context.WithCancel(context.Background())
291 stream, err := lbc.BalanceLoad(ctx)
293 grpclog.Errorf("grpclb: failed to perform RPC to the remote balancer %v", err)
302 initReq := &lbpb.LoadBalanceRequest{
303 LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
304 InitialRequest: &lbpb.InitialLoadBalanceRequest{
309 if err := stream.Send(initReq); err != nil {
310 grpclog.Errorf("grpclb: failed to send init request: %v", err)
311 // TODO: backoff on retry?
314 reply, err := stream.Recv()
316 grpclog.Errorf("grpclb: failed to recv init response: %v", err)
317 // TODO: backoff on retry?
320 initResp := reply.GetInitialResponse()
322 grpclog.Errorf("grpclb: reply from remote balancer did not include initial response.")
325 // TODO: Support delegation.
326 if initResp.LoadBalancerDelegate != "" {
328 grpclog.Errorf("TODO: Delegation is not supported yet.")
331 streamDone := make(chan struct{})
332 defer close(streamDone)
334 b.clientStats = lbpb.ClientStats{} // Clear client stats.
336 if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
337 go b.sendLoadReport(stream, d, streamDone)
339 // Retrieve the server list.
341 reply, err := stream.Recv()
343 grpclog.Errorf("grpclb: failed to recv server list: %v", err)
347 if b.done || seq < b.seq {
351 b.seq++ // tick when receiving a new list of servers.
354 if serverList := reply.GetServerList(); serverList != nil {
355 b.processServerList(serverList, seq)
361 func (b *balancer) Start(target string, config BalancerConfig) error {
362 b.rand = rand.New(rand.NewSource(time.Now().Unix()))
363 // TODO: Fall back to the basic direct connection if there is no name resolver.
365 return errors.New("there is no name resolver installed")
371 return ErrClientConnClosing
373 b.addrCh = make(chan []Address)
374 w, err := b.r.Resolve(target)
377 grpclog.Errorf("grpclb: failed to resolve address: %v, err: %v", target, err)
382 balancerAddrsCh := make(chan []remoteBalancerInfo, 1)
383 // Spawn a goroutine to monitor the name resolution of remote load balancer.
386 if err := b.watchAddrUpdates(w, balancerAddrsCh); err != nil {
387 grpclog.Warningf("grpclb: the naming watcher stops working due to %v.\n", err)
388 close(balancerAddrsCh)
393 // Spawn a goroutine to talk to the remote load balancer.
397 // ccError is closed when there is an error in the current cc.
398 // A new rb should be picked from rbs and connected.
399 ccError chan struct{}
400 rb *remoteBalancerInfo
401 rbs []remoteBalancerInfo
421 case rbs, ok = <-balancerAddrsCh:
427 for i, trb := range rbs {
436 // Move the address in use to the beginning of the list.
437 b.rbs[0], b.rbs[foundIdx] = b.rbs[foundIdx], b.rbs[0]
440 continue // If found, don't dial new cc.
441 } else if len(rbs) > 0 {
442 // Pick a random one from the list, instead of always using the first one.
443 if l := len(rbs); l > 1 && rb != nil {
444 tmpIdx := b.rand.Intn(l - 1)
445 b.rbs[0], b.rbs[tmpIdx] = b.rbs[tmpIdx], b.rbs[0]
450 // foundIdx < 0 && len(rbs) <= 0.
455 if rbIdx < len(rbs)-1 {
470 // Talk to the remote load balancer to get the server list.
475 if creds := config.DialCreds; creds != nil {
477 if err := creds.OverrideServerName(rb.name); err != nil {
478 grpclog.Warningf("grpclb: failed to override the server name in the credentials: %v", err)
482 dopts = append(dopts, WithTransportCredentials(creds))
484 dopts = append(dopts, WithInsecure())
486 if dialer := config.Dialer; dialer != nil {
487 // WithDialer takes a different type of function, so we instead use a special DialOption here.
488 dopts = append(dopts, func(o *dialOptions) { o.copts.Dialer = dialer })
490 ccError = make(chan struct{})
491 cc, err = Dial(rb.addr, dopts...)
493 grpclog.Warningf("grpclb: failed to setup a connection to the remote balancer %v: %v", rb.addr, err)
498 b.seq++ // tick when getting a new balancer address
502 go func(cc *ClientConn, ccError chan struct{}) {
503 lbc := &loadBalancerClient{cc}
504 b.callRemoteBalancer(lbc, seq)
517 func (b *balancer) down(addr Address, err error) {
520 for _, a := range b.addrs {
528 func (b *balancer) Up(addr Address) func(error) {
535 for _, a := range b.addrs {
542 if a.connected && !a.dropForRateLimiting && !a.dropForLoadBalancing {
546 // addr is the only one which is connected. Notify the Get() callers who are blocking.
547 if cnt == 1 && b.waitCh != nil {
551 return func(err error) {
556 func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
561 err = ErrClientConnClosing
571 s, ok := rpcInfoFromContext(ctx)
577 if b.done || seq < b.seq {
580 b.clientStats.NumCallsFinished++
582 b.clientStats.NumCallsFinishedWithClientFailedToSend++
583 } else if s.bytesReceived {
584 b.clientStats.NumCallsFinishedKnownReceived++
589 b.clientStats.NumCallsStarted++
590 if len(b.addrs) > 0 {
591 if b.next >= len(b.addrs) {
597 next = (next + 1) % len(b.addrs)
599 if !a.dropForRateLimiting && !a.dropForLoadBalancing {
605 if !opts.BlockingWait {
607 if a.dropForLoadBalancing {
608 b.clientStats.NumCallsFinished++
609 b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
610 } else if a.dropForRateLimiting {
611 b.clientStats.NumCallsFinished++
612 b.clientStats.NumCallsFinishedWithDropForRateLimiting++
615 err = Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr)
620 // Has iterated all the possible addresses but none is connected.
625 if !opts.BlockingWait {
626 if len(b.addrs) == 0 {
627 b.clientStats.NumCallsFinished++
628 b.clientStats.NumCallsFinishedWithClientFailedToSend++
630 err = Errorf(codes.Unavailable, "there is no address available")
633 // Returns the next addr on b.addrs for a failfast RPC.
634 addr = b.addrs[b.next].addr
639 // Wait on b.waitCh for non-failfast RPCs.
641 ch = make(chan struct{})
651 b.clientStats.NumCallsFinished++
652 b.clientStats.NumCallsFinishedWithClientFailedToSend++
659 b.clientStats.NumCallsFinished++
660 b.clientStats.NumCallsFinishedWithClientFailedToSend++
662 err = ErrClientConnClosing
666 if len(b.addrs) > 0 {
667 if b.next >= len(b.addrs) {
673 next = (next + 1) % len(b.addrs)
675 if !a.dropForRateLimiting && !a.dropForLoadBalancing {
681 if !opts.BlockingWait {
683 if a.dropForLoadBalancing {
684 b.clientStats.NumCallsFinished++
685 b.clientStats.NumCallsFinishedWithDropForLoadBalancing++
686 } else if a.dropForRateLimiting {
687 b.clientStats.NumCallsFinished++
688 b.clientStats.NumCallsFinishedWithDropForRateLimiting++
691 err = Errorf(codes.Unavailable, "drop requests for the addreess %s", a.addr.Addr)
696 // Has iterated all the possible addresses but none is connected.
701 // The newly added addr got removed by Down() again.
703 ch = make(chan struct{})
713 func (b *balancer) Notify() <-chan []Address {
717 func (b *balancer) Close() error {
721 return errBalancerClosed
724 if b.expTimer != nil {