3 * Copyright 2014 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
28 "golang.org/x/net/context"
29 "golang.org/x/net/trace"
30 "google.golang.org/grpc/connectivity"
31 "google.golang.org/grpc/credentials"
32 "google.golang.org/grpc/grpclog"
33 "google.golang.org/grpc/keepalive"
34 "google.golang.org/grpc/stats"
35 "google.golang.org/grpc/transport"
39 // ErrClientConnClosing indicates that the operation is illegal because
40 // the ClientConn is closing.
41 ErrClientConnClosing = errors.New("grpc: the client connection is closing")
42 // ErrClientConnTimeout indicates that the ClientConn cannot establish the
43 // underlying connections within the specified timeout.
44 // DEPRECATED: Please use context.DeadlineExceeded instead.
45 ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
47 // errNoTransportSecurity indicates that there is no transport security
48 // being set for ClientConn. Users should either set one or explicitly
49 // call WithInsecure DialOption to disable security.
50 errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
51 // errTransportCredentialsMissing indicates that users want to transmit security
52 // information (e.g., oauth2 token) which requires secure connection on an insecure
54 errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
55 // errCredentialsConflict indicates that grpc.WithTransportCredentials()
56 // and grpc.WithInsecure() are both called for a connection.
57 errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
58 // errNetworkIO indicates that the connection is down due to some network I/O error.
59 errNetworkIO = errors.New("grpc: failed with network I/O error")
60 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
61 errConnDrain = errors.New("grpc: the connection is drained")
62 // errConnClosing indicates that the connection is closing.
63 errConnClosing = errors.New("grpc: the connection is closing")
64 // errConnUnavailable indicates that the connection is unavailable.
65 errConnUnavailable = errors.New("grpc: the connection is unavailable")
66 // errBalancerClosed indicates that the balancer is closed.
67 errBalancerClosed = errors.New("grpc: balancer is closed")
68 // minConnectTimeout is the minimum time given to a connection attempt to complete.
69 minConnectTimeout = 20 * time.Second
72 // dialOptions configure a Dial call. dialOptions are set by the DialOption
73 // values passed to Dial.
74 type dialOptions struct {
75 unaryInt UnaryClientInterceptor
76 streamInt StreamClientInterceptor
85 scChan <-chan ServiceConfig
86 copts transport.ConnectOptions
87 callOptions []CallOption
91 defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
92 defaultClientMaxSendMessageSize = 1024 * 1024 * 4
95 // DialOption configures how we set up the connection.
96 type DialOption func(*dialOptions)
98 // WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
99 // The lower bound for window size is 64K and any value smaller than that will be ignored.
100 func WithInitialWindowSize(s int32) DialOption {
101 return func(o *dialOptions) {
102 o.copts.InitialWindowSize = s
106 // WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection.
107 // The lower bound for window size is 64K and any value smaller than that will be ignored.
108 func WithInitialConnWindowSize(s int32) DialOption {
109 return func(o *dialOptions) {
110 o.copts.InitialConnWindowSize = s
114 // WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
115 func WithMaxMsgSize(s int) DialOption {
116 return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
119 // WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection.
120 func WithDefaultCallOptions(cos ...CallOption) DialOption {
121 return func(o *dialOptions) {
122 o.callOptions = append(o.callOptions, cos...)
126 // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
127 func WithCodec(c Codec) DialOption {
128 return func(o *dialOptions) {
133 // WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
135 func WithCompressor(cp Compressor) DialOption {
136 return func(o *dialOptions) {
141 // WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
142 // message decompressor.
143 func WithDecompressor(dc Decompressor) DialOption {
144 return func(o *dialOptions) {
149 // WithBalancer returns a DialOption which sets a load balancer.
150 func WithBalancer(b Balancer) DialOption {
151 return func(o *dialOptions) {
156 // WithServiceConfig returns a DialOption which has a channel to read the service configuration.
157 func WithServiceConfig(c <-chan ServiceConfig) DialOption {
158 return func(o *dialOptions) {
163 // WithBackoffMaxDelay configures the dialer to use the provided maximum delay
164 // when backing off after failed connection attempts.
165 func WithBackoffMaxDelay(md time.Duration) DialOption {
166 return WithBackoffConfig(BackoffConfig{MaxDelay: md})
169 // WithBackoffConfig configures the dialer to use the provided backoff
170 // parameters after connection failures.
172 // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
174 func WithBackoffConfig(b BackoffConfig) DialOption {
175 // Set defaults to ensure that provided BackoffConfig is valid and
176 // unexported fields get default values.
178 return withBackoff(b)
181 // withBackoff sets the backoff strategy used for retries after a
182 // failed connection attempt.
184 // This can be exported if arbitrary backoff strategies are allowed by gRPC.
185 func withBackoff(bs backoffStrategy) DialOption {
186 return func(o *dialOptions) {
191 // WithBlock returns a DialOption which makes the caller of Dial block until the underlying
192 // connection is up. Without this, Dial returns immediately and connecting to the server
193 // happens in the background.
194 func WithBlock() DialOption {
195 return func(o *dialOptions) {
200 // WithInsecure returns a DialOption which disables transport security for this ClientConn.
201 // Note that transport security is required unless WithInsecure is set.
202 func WithInsecure() DialOption {
203 return func(o *dialOptions) {
208 // WithTransportCredentials returns a DialOption which configures a
209 // connection level security credentials (e.g., TLS/SSL).
210 func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
211 return func(o *dialOptions) {
212 o.copts.TransportCredentials = creds
216 // WithPerRPCCredentials returns a DialOption which sets
217 // credentials and places auth state on each outbound RPC.
218 func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
219 return func(o *dialOptions) {
220 o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
224 // WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
225 // initially. This is valid if and only if WithBlock() is present.
226 // Deprecated: use DialContext and context.WithTimeout instead.
227 func WithTimeout(d time.Duration) DialOption {
228 return func(o *dialOptions) {
233 // WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
234 // If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
235 // Temporary() method to decide if it should try to reconnect to the network address.
236 func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
237 return func(o *dialOptions) {
238 o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
239 if deadline, ok := ctx.Deadline(); ok {
240 return f(addr, deadline.Sub(time.Now()))
247 // WithStatsHandler returns a DialOption that specifies the stats handler
248 // for all the RPCs and underlying network connections in this ClientConn.
249 func WithStatsHandler(h stats.Handler) DialOption {
250 return func(o *dialOptions) {
251 o.copts.StatsHandler = h
255 // FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors.
256 // If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
257 // address and won't try to reconnect.
258 // The default value of FailOnNonTempDialError is false.
259 // This is an EXPERIMENTAL API.
260 func FailOnNonTempDialError(f bool) DialOption {
261 return func(o *dialOptions) {
262 o.copts.FailOnNonTempDialError = f
266 // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
267 func WithUserAgent(s string) DialOption {
268 return func(o *dialOptions) {
269 o.copts.UserAgent = s
273 // WithKeepaliveParams returns a DialOption that specifies keepalive parameters for the client transport.
274 func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
275 return func(o *dialOptions) {
276 o.copts.KeepaliveParams = kp
280 // WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
281 func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
282 return func(o *dialOptions) {
287 // WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs.
288 func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
289 return func(o *dialOptions) {
294 // WithAuthority returns a DialOption that specifies the value to be used as
295 // the :authority pseudo-header. This value only works with WithInsecure and
296 // has no effect if TransportCredentials are present.
297 func WithAuthority(a string) DialOption {
298 return func(o *dialOptions) {
299 o.copts.Authority = a
303 // Dial creates a client connection to the given target.
304 func Dial(target string, opts ...DialOption) (*ClientConn, error) {
305 return DialContext(context.Background(), target, opts...)
308 // DialContext creates a client connection to the given target. ctx can be used to
309 // cancel or expire the pending connection. Once this function returns, the
310 // cancellation and expiration of ctx will be a no-op. Users should call ClientConn.Close
311 // to terminate all the pending operations after this function returns.
312 func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
315 csMgr: &connectivityStateManager{},
316 conns: make(map[Address]*addrConn),
318 cc.csEvltr = &connectivityStateEvaluator{csMgr: cc.csMgr}
319 cc.ctx, cc.cancel = context.WithCancel(context.Background())
321 for _, opt := range opts {
324 cc.mkp = cc.dopts.copts.KeepaliveParams
326 if cc.dopts.copts.Dialer == nil {
327 cc.dopts.copts.Dialer = newProxyDialer(
328 func(ctx context.Context, addr string) (net.Conn, error) {
329 return dialContext(ctx, "tcp", addr)
334 if cc.dopts.copts.UserAgent != "" {
335 cc.dopts.copts.UserAgent += " " + grpcUA
337 cc.dopts.copts.UserAgent = grpcUA
340 if cc.dopts.timeout > 0 {
341 var cancel context.CancelFunc
342 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
349 conn, err = nil, ctx.Err()
359 if cc.dopts.scChan != nil {
360 // Try to get an initial service config.
362 case sc, ok := <-cc.dopts.scChan:
371 if cc.dopts.codec == nil {
372 cc.dopts.codec = protoCodec{}
374 if cc.dopts.bs == nil {
375 cc.dopts.bs = DefaultBackoffConfig
377 creds := cc.dopts.copts.TransportCredentials
378 if creds != nil && creds.Info().ServerName != "" {
379 cc.authority = creds.Info().ServerName
380 } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
381 cc.authority = cc.dopts.copts.Authority
383 cc.authority = target
385 waitC := make(chan error, 1)
388 if cc.dopts.balancer == nil && cc.sc.LB != nil {
389 cc.dopts.balancer = cc.sc.LB
391 if cc.dopts.balancer != nil {
392 var credsClone credentials.TransportCredentials
394 credsClone = creds.Clone()
396 config := BalancerConfig{
397 DialCreds: credsClone,
398 Dialer: cc.dopts.copts.Dialer,
400 if err := cc.dopts.balancer.Start(target, config); err != nil {
404 ch := cc.dopts.balancer.Notify()
407 doneChan := make(chan struct{})
408 go cc.lbWatcher(doneChan)
416 // No balancer, or no resolver within the balancer. Connect directly.
417 if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil {
424 return nil, ctx.Err()
430 if cc.dopts.scChan != nil && !scSet {
431 // Blocking wait for the initial service config.
433 case sc, ok := <-cc.dopts.scChan:
438 return nil, ctx.Err()
441 if cc.dopts.scChan != nil {
448 // connectivityStateEvaluator gets updated by addrConns when their
449 // states transition, based on which it evaluates the state of
451 // Note: This code will eventually sit in the balancer in the new design.
452 type connectivityStateEvaluator struct {
453 csMgr *connectivityStateManager
455 numReady uint64 // Number of addrConns in ready state.
456 numConnecting uint64 // Number of addrConns in connecting state.
457 numTransientFailure uint64 // Number of addrConns in transientFailure.
460 // recordTransition records state change happening in every addrConn and based on
461 // that it evaluates what state the ClientConn is in.
462 // It can only transition between connectivity.Ready, connectivity.Connecting and connectivity.TransientFailure. Other states,
463 // Idle and connectivity.Shutdown are transitioned into by ClientConn; in the beginning of the connection
464 // before any addrConn is created ClientConn is in idle state. In the end when ClientConn
465 // closes it is in connectivity.Shutdown state.
466 // TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state.
467 func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) {
469 defer cse.mu.Unlock()
472 for idx, state := range []connectivity.State{oldState, newState} {
473 updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
475 case connectivity.Ready:
476 cse.numReady += updateVal
477 case connectivity.Connecting:
478 cse.numConnecting += updateVal
479 case connectivity.TransientFailure:
480 cse.numTransientFailure += updateVal
485 if cse.numReady > 0 {
486 cse.csMgr.updateState(connectivity.Ready)
489 if cse.numConnecting > 0 {
490 cse.csMgr.updateState(connectivity.Connecting)
493 cse.csMgr.updateState(connectivity.TransientFailure)
496 // connectivityStateManager keeps the connectivity.State of ClientConn.
497 // This struct will eventually be exported so the balancers can access it.
498 type connectivityStateManager struct {
500 state connectivity.State
501 notifyChan chan struct{}
504 // updateState updates the connectivity.State of ClientConn.
505 // If there's a change it notifies goroutines waiting on state change to
507 func (csm *connectivityStateManager) updateState(state connectivity.State) {
509 defer csm.mu.Unlock()
510 if csm.state == connectivity.Shutdown {
513 if csm.state == state {
517 if csm.notifyChan != nil {
518 // There are other goroutines waiting on this channel.
519 close(csm.notifyChan)
524 func (csm *connectivityStateManager) getState() connectivity.State {
526 defer csm.mu.Unlock()
530 func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
532 defer csm.mu.Unlock()
533 if csm.notifyChan == nil {
534 csm.notifyChan = make(chan struct{})
536 return csm.notifyChan
539 // ClientConn represents a client connection to an RPC server.
540 type ClientConn struct {
542 cancel context.CancelFunc
547 csMgr *connectivityStateManager
548 csEvltr *connectivityStateEvaluator // This will eventually be part of balancer.
552 conns map[Address]*addrConn
553 // Keepalive parameter can be updated if a GoAway is received.
554 mkp keepalive.ClientParameters
557 // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
558 // ctx expires. A true value is returned in former case and false in latter.
559 // This is an EXPERIMENTAL API.
560 func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
561 ch := cc.csMgr.getNotifyChan()
562 if cc.csMgr.getState() != sourceState {
573 // GetState returns the connectivity.State of ClientConn.
574 // This is an EXPERIMENTAL API.
575 func (cc *ClientConn) GetState() connectivity.State {
576 return cc.csMgr.getState()
579 // lbWatcher watches the Notify channel of the balancer in cc and manages
580 // connections accordingly. If doneChan is not nil, it is closed after the
581 // first successful connection is made.
582 func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
584 // In case channel from cc.dopts.balancer.Notify() gets closed before a
585 // successful connection gets established, don't forget to notify the
592 for addrs := range cc.dopts.balancer.Notify() {
594 add []Address // Addresses need to setup connections.
595 del []*addrConn // Connections need to tear down.
598 for _, a := range addrs {
599 if _, ok := cc.conns[a]; !ok {
603 for k, c := range cc.conns {
605 for _, a := range addrs {
613 delete(cc.conns, c.addr)
617 for _, a := range add {
620 err = cc.resetAddrConn(a, true, nil)
626 err = cc.resetAddrConn(a, false, nil)
629 grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
632 for _, c := range del {
633 c.tearDown(errConnDrain)
638 func (cc *ClientConn) scWatcher() {
641 case sc, ok := <-cc.dopts.scChan:
646 // TODO: load balance policy runtime change is ignored.
647 // We may revisit this decision in the future.
650 case <-cc.ctx.Done():
656 // resetAddrConn creates an addrConn for addr and adds it to cc.conns.
657 // If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
658 // If tearDownErr is nil, errConnDrain will be used instead.
660 // We should never need to replace an addrConn with a new one. This function is only used
661 // as newAddrConn to create new addrConn.
662 // TODO rename this function and clean up the code.
663 func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error {
669 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
670 ac.csEvltr = cc.csEvltr
672 ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
674 if !ac.dopts.insecure {
675 if ac.dopts.copts.TransportCredentials == nil {
676 return errNoTransportSecurity
679 if ac.dopts.copts.TransportCredentials != nil {
680 return errCredentialsConflict
682 for _, cd := range ac.dopts.copts.PerRPCCredentials {
683 if cd.RequireTransportSecurity() {
684 return errTransportCredentialsMissing
688 // Track ac in cc. This needs to be done before any getTransport(...) is called.
692 return ErrClientConnClosing
694 stale := cc.conns[ac.addr]
695 cc.conns[ac.addr] = ac
698 // There is an addrConn alive on ac.addr already. This could be due to
699 // a buggy Balancer that reports duplicated Addresses.
700 if tearDownErr == nil {
701 // tearDownErr is nil if resetAddrConn is called by
704 // In both cases, the stale ac should drain, not close.
705 stale.tearDown(errConnDrain)
707 stale.tearDown(tearDownErr)
711 if err := ac.resetTransport(false); err != nil {
712 if err != errConnClosing {
713 // Tear down ac and delete it from cc.conns.
715 delete(cc.conns, ac.addr)
719 if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
724 // Start to monitor the error status of transport.
725 go ac.transportMonitor()
727 // Start a goroutine connecting to the server asynchronously.
729 if err := ac.resetTransport(false); err != nil {
730 grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
731 if err != errConnClosing {
732 // Keep this ac in cc.conns, to get the reason it's torn down.
737 ac.transportMonitor()
743 // GetMethodConfig gets the method config of the input method.
744 // If there's an exact match for input method (i.e. /service/method), we return
745 // the corresponding MethodConfig.
746 // If there isn't an exact match for the input method, we look for the default config
747 // under the service (i.e. /service/). If there is a default MethodConfig for
748 // the service, we return it.
749 // Otherwise, we return an empty MethodConfig.
750 func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
751 // TODO: Avoid the locking here.
753 defer cc.mu.RUnlock()
754 m, ok := cc.sc.Methods[method]
756 i := strings.LastIndex(method, "/")
757 m, _ = cc.sc.Methods[method[:i+1]]
762 func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
768 if cc.dopts.balancer == nil {
769 // If balancer is nil, there should be only one addrConn available.
773 return nil, nil, toRPCErr(ErrClientConnClosing)
775 for _, ac = range cc.conns {
776 // Break after the first iteration to get the first addrConn.
786 addr, put, err = cc.dopts.balancer.Get(ctx, opts)
788 return nil, nil, toRPCErr(err)
793 return nil, nil, toRPCErr(ErrClientConnClosing)
795 ac, ok = cc.conns[addr]
800 updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
803 return nil, nil, errConnClosing
805 t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
808 updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
816 // Close tears down the ClientConn and all underlying connections.
817 func (cc *ClientConn) Close() error {
823 return ErrClientConnClosing
827 cc.csMgr.updateState(connectivity.Shutdown)
829 if cc.dopts.balancer != nil {
830 cc.dopts.balancer.Close()
832 for _, ac := range conns {
833 ac.tearDown(ErrClientConnClosing)
838 // addrConn is a network connection to a given address.
839 type addrConn struct {
841 cancel context.CancelFunc
846 events trace.EventLog
848 csEvltr *connectivityStateEvaluator
851 state connectivity.State
852 down func(error) // the handler called when a connection is down.
853 // ready is closed and becomes nil when a new transport is up or failed
856 transport transport.ClientTransport
858 // The reason this addrConn is torn down.
862 // adjustParams updates parameters used to create transports upon
863 // receiving a GoAway.
864 func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
866 case transport.TooManyPings:
867 v := 2 * ac.dopts.copts.KeepaliveParams.Time
869 if v > ac.cc.mkp.Time {
876 // printf records an event in ac's event log, unless ac has been closed.
877 // REQUIRES ac.mu is held.
878 func (ac *addrConn) printf(format string, a ...interface{}) {
879 if ac.events != nil {
880 ac.events.Printf(format, a...)
884 // errorf records an error in ac's event log, unless ac has been closed.
885 // REQUIRES ac.mu is held.
886 func (ac *addrConn) errorf(format string, a ...interface{}) {
887 if ac.events != nil {
888 ac.events.Errorf(format, a...)
892 // resetTransport recreates a transport to the address for ac.
893 // For the old transport:
894 // - if drain is true, it will be gracefully closed.
895 // - otherwise, it will be closed.
896 func (ac *addrConn) resetTransport(drain bool) error {
898 if ac.state == connectivity.Shutdown {
900 return errConnClosing
902 ac.printf("connecting")
904 ac.down(downErrorf(false, true, "%v", errNetworkIO))
908 ac.state = connectivity.Connecting
909 ac.csEvltr.recordTransition(oldState, ac.state)
913 if t != nil && !drain {
917 ac.dopts.copts.KeepaliveParams = ac.cc.mkp
919 for retries := 0; ; retries++ {
921 if ac.state == connectivity.Shutdown {
922 // ac.tearDown(...) has been invoked.
924 return errConnClosing
927 sleepTime := ac.dopts.bs.backoff(retries)
928 timeout := minConnectTimeout
929 if timeout < sleepTime {
932 ctx, cancel := context.WithTimeout(ac.ctx, timeout)
933 connectTime := time.Now()
934 sinfo := transport.TargetInfo{
936 Metadata: ac.addr.Metadata,
938 newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
939 // Don't call cancel in success path due to a race in Go 1.6:
940 // https://github.com/golang/go/issues/15078.
944 if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
947 grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
949 if ac.state == connectivity.Shutdown {
950 // ac.tearDown(...) has been invoked.
952 return errConnClosing
954 ac.errorf("transient failure: %v", err)
956 ac.state = connectivity.TransientFailure
957 ac.csEvltr.recordTransition(oldState, ac.state)
963 timer := time.NewTimer(sleepTime - time.Since(connectTime))
966 case <-ac.ctx.Done():
975 if ac.state == connectivity.Shutdown {
976 // ac.tearDown(...) has been invoked.
979 return errConnClosing
982 ac.state = connectivity.Ready
983 ac.csEvltr.recordTransition(oldState, ac.state)
984 ac.transport = newTransport
989 if ac.cc.dopts.balancer != nil {
990 ac.down = ac.cc.dopts.balancer.Up(ac.addr)
997 // transportMonitor runs in a goroutine to track the error in transport and create the
998 // new transport if an error happens. It returns when the channel is closing.
999 func (ac *addrConn) transportMonitor() {
1005 // This is needed to detect the teardown when
1006 // the addrConn is idle (i.e., no RPC in flight).
1007 case <-ac.ctx.Done():
1015 ac.adjustParams(t.GetGoAwayReason())
1016 // If GoAway happens without any network I/O error, the underlying transport
1017 // will be gracefully closed, and a new transport will be created.
1018 // (The transport will be closed when all the pending RPCs finished or failed.)
1019 // If GoAway and some network I/O error happen concurrently, the underlying transport
1020 // will be closed, and a new transport will be created.
1027 if err := ac.resetTransport(drain); err != nil {
1028 grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
1029 if err != errConnClosing {
1030 // Keep this ac in cc.conns, to get the reason it's torn down.
1037 case <-ac.ctx.Done():
1041 ac.adjustParams(t.GetGoAwayReason())
1042 if err := ac.resetTransport(false); err != nil {
1043 grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
1044 if err != errConnClosing {
1045 // Keep this ac in cc.conns, to get the reason it's torn down.
1053 if ac.state == connectivity.Shutdown {
1054 // ac has been shutdown.
1058 oldState := ac.state
1059 ac.state = connectivity.TransientFailure
1060 ac.csEvltr.recordTransition(oldState, ac.state)
1062 if err := ac.resetTransport(false); err != nil {
1063 grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
1065 ac.printf("transport exiting: %v", err)
1067 grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
1068 if err != errConnClosing {
1069 // Keep this ac in cc.conns, to get the reason it's torn down.
1078 // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
1079 // iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true.
1080 func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
1084 case ac.state == connectivity.Shutdown:
1085 if failfast || !hasBalancer {
1086 // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
1087 err := ac.tearDownErr
1092 return nil, errConnClosing
1093 case ac.state == connectivity.Ready:
1097 case ac.state == connectivity.TransientFailure:
1098 if failfast || hasBalancer {
1100 return nil, errConnUnavailable
1105 ready = make(chan struct{})
1111 return nil, toRPCErr(ctx.Err())
1112 // Wait until the new transport is ready or failed.
1118 // tearDown starts to tear down the addrConn.
1119 // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
1120 // some edge cases (e.g., the caller opens and closes many addrConn's in a
1122 // tearDown doesn't remove ac from ac.cc.conns.
1123 func (ac *addrConn) tearDown(err error) {
1127 defer ac.mu.Unlock()
1129 ac.down(downErrorf(false, false, "%v", err))
1132 if err == errConnDrain && ac.transport != nil {
1133 // GracefulClose(...) may be executed multiple times when
1134 // i) receiving multiple GoAway frames from the server; or
1135 // ii) there are concurrent name resolver/Balancer triggered
1136 // address removal and GoAway.
1137 ac.transport.GracefulClose()
1139 if ac.state == connectivity.Shutdown {
1142 oldState := ac.state
1143 ac.state = connectivity.Shutdown
1144 ac.tearDownErr = err
1145 ac.csEvltr.recordTransition(oldState, ac.state)
1146 if ac.events != nil {
1150 if ac.ready != nil {
1154 if ac.transport != nil && err != errConnDrain {
1155 ac.transport.Close()