aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/google.golang.org/grpc/clientconn.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go1859
1 files changed, 1075 insertions, 784 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index e3f6cb1..56d0bf7 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -19,35 +19,72 @@
19package grpc 19package grpc
20 20
21import ( 21import (
22 "context"
22 "errors" 23 "errors"
24 "fmt"
25 "math"
23 "net" 26 "net"
27 "reflect"
24 "strings" 28 "strings"
25 "sync" 29 "sync"
30 "sync/atomic"
26 "time" 31 "time"
27 32
28 "golang.org/x/net/context" 33 "google.golang.org/grpc/balancer"
29 "golang.org/x/net/trace" 34 _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
35 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/connectivity" 36 "google.golang.org/grpc/connectivity"
31 "google.golang.org/grpc/credentials" 37 "google.golang.org/grpc/credentials"
32 "google.golang.org/grpc/grpclog" 38 "google.golang.org/grpc/grpclog"
39 "google.golang.org/grpc/internal/backoff"
40 "google.golang.org/grpc/internal/channelz"
41 "google.golang.org/grpc/internal/envconfig"
42 "google.golang.org/grpc/internal/grpcsync"
43 "google.golang.org/grpc/internal/transport"
33 "google.golang.org/grpc/keepalive" 44 "google.golang.org/grpc/keepalive"
34 "google.golang.org/grpc/stats" 45 "google.golang.org/grpc/metadata"
35 "google.golang.org/grpc/transport" 46 "google.golang.org/grpc/resolver"
47 _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
48 _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
49 "google.golang.org/grpc/status"
50)
51
52const (
53 // minimum time to give a connection to complete
54 minConnectTimeout = 20 * time.Second
55 // must match grpclbName in grpclb/grpclb.go
56 grpclbName = "grpclb"
36) 57)
37 58
38var ( 59var (
39 // ErrClientConnClosing indicates that the operation is illegal because 60 // ErrClientConnClosing indicates that the operation is illegal because
40 // the ClientConn is closing. 61 // the ClientConn is closing.
41 ErrClientConnClosing = errors.New("grpc: the client connection is closing") 62 //
42 // ErrClientConnTimeout indicates that the ClientConn cannot establish the 63 // Deprecated: this error should not be relied upon by users; use the status
43 // underlying connections within the specified timeout. 64 // code of Canceled instead.
44 // DEPRECATED: Please use context.DeadlineExceeded instead. 65 ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
45 ErrClientConnTimeout = errors.New("grpc: timed out when dialing") 66 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
67 errConnDrain = errors.New("grpc: the connection is drained")
68 // errConnClosing indicates that the connection is closing.
69 errConnClosing = errors.New("grpc: the connection is closing")
70 // errBalancerClosed indicates that the balancer is closed.
71 errBalancerClosed = errors.New("grpc: balancer is closed")
72 // We use an accessor so that minConnectTimeout can be
73 // atomically read and updated while testing.
74 getMinConnectTimeout = func() time.Duration {
75 return minConnectTimeout
76 }
77)
46 78
79// The following errors are returned from Dial and DialContext
80var (
47 // errNoTransportSecurity indicates that there is no transport security 81 // errNoTransportSecurity indicates that there is no transport security
48 // being set for ClientConn. Users should either set one or explicitly 82 // being set for ClientConn. Users should either set one or explicitly
49 // call WithInsecure DialOption to disable security. 83 // call WithInsecure DialOption to disable security.
50 errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)") 84 errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
85 // errTransportCredsAndBundle indicates that creds bundle is used together
86 // with other individual Transport Credentials.
87 errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
51 // errTransportCredentialsMissing indicates that users want to transmit security 88 // errTransportCredentialsMissing indicates that users want to transmit security
52 // information (e.g., oauth2 token) which requires secure connection on an insecure 89 // information (e.g., oauth2 token) which requires secure connection on an insecure
53 // connection. 90 // connection.
@@ -55,278 +92,100 @@ var (
55 // errCredentialsConflict indicates that grpc.WithTransportCredentials() 92 // errCredentialsConflict indicates that grpc.WithTransportCredentials()
56 // and grpc.WithInsecure() are both called for a connection. 93 // and grpc.WithInsecure() are both called for a connection.
57 errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)") 94 errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
58 // errNetworkIO indicates that the connection is down due to some network I/O error.
59 errNetworkIO = errors.New("grpc: failed with network I/O error")
60 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
61 errConnDrain = errors.New("grpc: the connection is drained")
62 // errConnClosing indicates that the connection is closing.
63 errConnClosing = errors.New("grpc: the connection is closing")
64 // errConnUnavailable indicates that the connection is unavailable.
65 errConnUnavailable = errors.New("grpc: the connection is unavailable")
66 // errBalancerClosed indicates that the balancer is closed.
67 errBalancerClosed = errors.New("grpc: balancer is closed")
68 // minimum time to give a connection to complete
69 minConnectTimeout = 20 * time.Second
70) 95)
71 96
72// dialOptions configure a Dial call. dialOptions are set by the DialOption
73// values passed to Dial.
74type dialOptions struct {
75 unaryInt UnaryClientInterceptor
76 streamInt StreamClientInterceptor
77 codec Codec
78 cp Compressor
79 dc Decompressor
80 bs backoffStrategy
81 balancer Balancer
82 block bool
83 insecure bool
84 timeout time.Duration
85 scChan <-chan ServiceConfig
86 copts transport.ConnectOptions
87 callOptions []CallOption
88}
89
90const ( 97const (
91 defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4 98 defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
92 defaultClientMaxSendMessageSize = 1024 * 1024 * 4 99 defaultClientMaxSendMessageSize = math.MaxInt32
100 // http2IOBufSize specifies the buffer size for sending frames.
101 defaultWriteBufSize = 32 * 1024
102 defaultReadBufSize = 32 * 1024
93) 103)
94 104
95// DialOption configures how we set up the connection.
96type DialOption func(*dialOptions)
97
98// WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
99// The lower bound for window size is 64K and any value smaller than that will be ignored.
100func WithInitialWindowSize(s int32) DialOption {
101 return func(o *dialOptions) {
102 o.copts.InitialWindowSize = s
103 }
104}
105
106// WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection.
107// The lower bound for window size is 64K and any value smaller than that will be ignored.
108func WithInitialConnWindowSize(s int32) DialOption {
109 return func(o *dialOptions) {
110 o.copts.InitialConnWindowSize = s
111 }
112}
113
114// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
115func WithMaxMsgSize(s int) DialOption {
116 return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
117}
118
119// WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection.
120func WithDefaultCallOptions(cos ...CallOption) DialOption {
121 return func(o *dialOptions) {
122 o.callOptions = append(o.callOptions, cos...)
123 }
124}
125
126// WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
127func WithCodec(c Codec) DialOption {
128 return func(o *dialOptions) {
129 o.codec = c
130 }
131}
132
133// WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
134// compressor.
135func WithCompressor(cp Compressor) DialOption {
136 return func(o *dialOptions) {
137 o.cp = cp
138 }
139}
140
141// WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
142// message decompressor.
143func WithDecompressor(dc Decompressor) DialOption {
144 return func(o *dialOptions) {
145 o.dc = dc
146 }
147}
148
149// WithBalancer returns a DialOption which sets a load balancer.
150func WithBalancer(b Balancer) DialOption {
151 return func(o *dialOptions) {
152 o.balancer = b
153 }
154}
155
156// WithServiceConfig returns a DialOption which has a channel to read the service configuration.
157func WithServiceConfig(c <-chan ServiceConfig) DialOption {
158 return func(o *dialOptions) {
159 o.scChan = c
160 }
161}
162
163// WithBackoffMaxDelay configures the dialer to use the provided maximum delay
164// when backing off after failed connection attempts.
165func WithBackoffMaxDelay(md time.Duration) DialOption {
166 return WithBackoffConfig(BackoffConfig{MaxDelay: md})
167}
168
169// WithBackoffConfig configures the dialer to use the provided backoff
170// parameters after connection failures.
171//
172// Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
173// for use.
174func WithBackoffConfig(b BackoffConfig) DialOption {
175 // Set defaults to ensure that provided BackoffConfig is valid and
176 // unexported fields get default values.
177 setDefaults(&b)
178 return withBackoff(b)
179}
180
181// withBackoff sets the backoff strategy used for retries after a
182// failed connection attempt.
183//
184// This can be exported if arbitrary backoff strategies are allowed by gRPC.
185func withBackoff(bs backoffStrategy) DialOption {
186 return func(o *dialOptions) {
187 o.bs = bs
188 }
189}
190
191// WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
192// connection is up. Without this, Dial returns immediately and connecting the server
193// happens in background.
194func WithBlock() DialOption {
195 return func(o *dialOptions) {
196 o.block = true
197 }
198}
199
200// WithInsecure returns a DialOption which disables transport security for this ClientConn.
201// Note that transport security is required unless WithInsecure is set.
202func WithInsecure() DialOption {
203 return func(o *dialOptions) {
204 o.insecure = true
205 }
206}
207
208// WithTransportCredentials returns a DialOption which configures a
209// connection level security credentials (e.g., TLS/SSL).
210func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
211 return func(o *dialOptions) {
212 o.copts.TransportCredentials = creds
213 }
214}
215
216// WithPerRPCCredentials returns a DialOption which sets
217// credentials and places auth state on each outbound RPC.
218func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
219 return func(o *dialOptions) {
220 o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
221 }
222}
223
224// WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
225// initially. This is valid if and only if WithBlock() is present.
226// Deprecated: use DialContext and context.WithTimeout instead.
227func WithTimeout(d time.Duration) DialOption {
228 return func(o *dialOptions) {
229 o.timeout = d
230 }
231}
232
233// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
234// If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
235// Temporary() method to decide if it should try to reconnect to the network address.
236func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
237 return func(o *dialOptions) {
238 o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
239 if deadline, ok := ctx.Deadline(); ok {
240 return f(addr, deadline.Sub(time.Now()))
241 }
242 return f(addr, 0)
243 }
244 }
245}
246
247// WithStatsHandler returns a DialOption that specifies the stats handler
248// for all the RPCs and underlying network connections in this ClientConn.
249func WithStatsHandler(h stats.Handler) DialOption {
250 return func(o *dialOptions) {
251 o.copts.StatsHandler = h
252 }
253}
254
255// FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors.
256// If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
257// address and won't try to reconnect.
258// The default value of FailOnNonTempDialError is false.
259// This is an EXPERIMENTAL API.
260func FailOnNonTempDialError(f bool) DialOption {
261 return func(o *dialOptions) {
262 o.copts.FailOnNonTempDialError = f
263 }
264}
265
266// WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
267func WithUserAgent(s string) DialOption {
268 return func(o *dialOptions) {
269 o.copts.UserAgent = s
270 }
271}
272
273// WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport.
274func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
275 return func(o *dialOptions) {
276 o.copts.KeepaliveParams = kp
277 }
278}
279
280// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
281func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
282 return func(o *dialOptions) {
283 o.unaryInt = f
284 }
285}
286
287// WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs.
288func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
289 return func(o *dialOptions) {
290 o.streamInt = f
291 }
292}
293
294// WithAuthority returns a DialOption that specifies the value to be used as
295// the :authority pseudo-header. This value only works with WithInsecure and
296// has no effect if TransportCredentials are present.
297func WithAuthority(a string) DialOption {
298 return func(o *dialOptions) {
299 o.copts.Authority = a
300 }
301}
302
303// Dial creates a client connection to the given target. 105// Dial creates a client connection to the given target.
304func Dial(target string, opts ...DialOption) (*ClientConn, error) { 106func Dial(target string, opts ...DialOption) (*ClientConn, error) {
305 return DialContext(context.Background(), target, opts...) 107 return DialContext(context.Background(), target, opts...)
306} 108}
307 109
308// DialContext creates a client connection to the given target. ctx can be used to 110// DialContext creates a client connection to the given target. By default, it's
309// cancel or expire the pending connection. Once this function returns, the 111// a non-blocking dial (the function won't wait for connections to be
310// cancellation and expiration of ctx will be noop. Users should call ClientConn.Close 112// established, and connecting happens in the background). To make it a blocking
311// to terminate all the pending operations after this function returns. 113// dial, use WithBlock() dial option.
114//
115// In the non-blocking case, the ctx does not act against the connection. It
116// only controls the setup steps.
117//
118// In the blocking case, ctx can be used to cancel or expire the pending
119// connection. Once this function returns, the cancellation and expiration of
120// ctx will be noop. Users should call ClientConn.Close to terminate all the
121// pending operations after this function returns.
122//
123// The target name syntax is defined in
124// https://github.com/grpc/grpc/blob/master/doc/naming.md.
125// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
312func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { 126func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
313 cc := &ClientConn{ 127 cc := &ClientConn{
314 target: target, 128 target: target,
315 csMgr: &connectivityStateManager{}, 129 csMgr: &connectivityStateManager{},
316 conns: make(map[Address]*addrConn), 130 conns: make(map[*addrConn]struct{}),
317 } 131 dopts: defaultDialOptions(),
318 cc.csEvltr = &connectivityStateEvaluator{csMgr: cc.csMgr} 132 blockingpicker: newPickerWrapper(),
133 czData: new(channelzData),
134 firstResolveEvent: grpcsync.NewEvent(),
135 }
136 cc.retryThrottler.Store((*retryThrottler)(nil))
319 cc.ctx, cc.cancel = context.WithCancel(context.Background()) 137 cc.ctx, cc.cancel = context.WithCancel(context.Background())
320 138
321 for _, opt := range opts { 139 for _, opt := range opts {
322 opt(&cc.dopts) 140 opt.apply(&cc.dopts)
141 }
142
143 if channelz.IsOn() {
144 if cc.dopts.channelzParentID != 0 {
145 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
146 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
147 Desc: "Channel Created",
148 Severity: channelz.CtINFO,
149 Parent: &channelz.TraceEventDesc{
150 Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
151 Severity: channelz.CtINFO,
152 },
153 })
154 } else {
155 cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
156 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
157 Desc: "Channel Created",
158 Severity: channelz.CtINFO,
159 })
160 }
161 cc.csMgr.channelzID = cc.channelzID
323 } 162 }
163
164 if !cc.dopts.insecure {
165 if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
166 return nil, errNoTransportSecurity
167 }
168 if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
169 return nil, errTransportCredsAndBundle
170 }
171 } else {
172 if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
173 return nil, errCredentialsConflict
174 }
175 for _, cd := range cc.dopts.copts.PerRPCCredentials {
176 if cd.RequireTransportSecurity() {
177 return nil, errTransportCredentialsMissing
178 }
179 }
180 }
181
324 cc.mkp = cc.dopts.copts.KeepaliveParams 182 cc.mkp = cc.dopts.copts.KeepaliveParams
325 183
326 if cc.dopts.copts.Dialer == nil { 184 if cc.dopts.copts.Dialer == nil {
327 cc.dopts.copts.Dialer = newProxyDialer( 185 cc.dopts.copts.Dialer = newProxyDialer(
328 func(ctx context.Context, addr string) (net.Conn, error) { 186 func(ctx context.Context, addr string) (net.Conn, error) {
329 return dialContext(ctx, "tcp", addr) 187 network, addr := parseDialTarget(addr)
188 return (&net.Dialer{}).DialContext(ctx, network, addr)
330 }, 189 },
331 ) 190 )
332 } 191 }
@@ -367,66 +226,41 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
367 default: 226 default:
368 } 227 }
369 } 228 }
370 // Set defaults.
371 if cc.dopts.codec == nil {
372 cc.dopts.codec = protoCodec{}
373 }
374 if cc.dopts.bs == nil { 229 if cc.dopts.bs == nil {
375 cc.dopts.bs = DefaultBackoffConfig 230 cc.dopts.bs = backoff.Exponential{
231 MaxDelay: DefaultBackoffConfig.MaxDelay,
232 }
233 }
234 if cc.dopts.resolverBuilder == nil {
235 // Only try to parse target when resolver builder is not already set.
236 cc.parsedTarget = parseTarget(cc.target)
237 grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
238 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
239 if cc.dopts.resolverBuilder == nil {
240 // If resolver builder is still nil, the parse target's scheme is
241 // not registered. Fallback to default resolver and set Endpoint to
242 // the original unparsed target.
243 grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
244 cc.parsedTarget = resolver.Target{
245 Scheme: resolver.GetDefaultScheme(),
246 Endpoint: target,
247 }
248 cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
249 }
250 } else {
251 cc.parsedTarget = resolver.Target{Endpoint: target}
376 } 252 }
377 creds := cc.dopts.copts.TransportCredentials 253 creds := cc.dopts.copts.TransportCredentials
378 if creds != nil && creds.Info().ServerName != "" { 254 if creds != nil && creds.Info().ServerName != "" {
379 cc.authority = creds.Info().ServerName 255 cc.authority = creds.Info().ServerName
380 } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" { 256 } else if cc.dopts.insecure && cc.dopts.authority != "" {
381 cc.authority = cc.dopts.copts.Authority 257 cc.authority = cc.dopts.authority
382 } else { 258 } else {
383 cc.authority = target 259 // Use endpoint from "scheme://authority/endpoint" as the default
384 } 260 // authority for ClientConn.
385 waitC := make(chan error, 1) 261 cc.authority = cc.parsedTarget.Endpoint
386 go func() {
387 defer close(waitC)
388 if cc.dopts.balancer == nil && cc.sc.LB != nil {
389 cc.dopts.balancer = cc.sc.LB
390 }
391 if cc.dopts.balancer != nil {
392 var credsClone credentials.TransportCredentials
393 if creds != nil {
394 credsClone = creds.Clone()
395 }
396 config := BalancerConfig{
397 DialCreds: credsClone,
398 Dialer: cc.dopts.copts.Dialer,
399 }
400 if err := cc.dopts.balancer.Start(target, config); err != nil {
401 waitC <- err
402 return
403 }
404 ch := cc.dopts.balancer.Notify()
405 if ch != nil {
406 if cc.dopts.block {
407 doneChan := make(chan struct{})
408 go cc.lbWatcher(doneChan)
409 <-doneChan
410 } else {
411 go cc.lbWatcher(nil)
412 }
413 return
414 }
415 }
416 // No balancer, or no resolver within the balancer. Connect directly.
417 if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil {
418 waitC <- err
419 return
420 }
421 }()
422 select {
423 case <-ctx.Done():
424 return nil, ctx.Err()
425 case err := <-waitC:
426 if err != nil {
427 return nil, err
428 }
429 } 262 }
263
430 if cc.dopts.scChan != nil && !scSet { 264 if cc.dopts.scChan != nil && !scSet {
431 // Blocking wait for the initial service config. 265 // Blocking wait for the initial service config.
432 select { 266 select {
@@ -442,55 +276,50 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
442 go cc.scWatcher() 276 go cc.scWatcher()
443 } 277 }
444 278
445 return cc, nil 279 var credsClone credentials.TransportCredentials
446} 280 if creds := cc.dopts.copts.TransportCredentials; creds != nil {
281 credsClone = creds.Clone()
282 }
283 cc.balancerBuildOpts = balancer.BuildOptions{
284 DialCreds: credsClone,
285 CredsBundle: cc.dopts.copts.CredsBundle,
286 Dialer: cc.dopts.copts.Dialer,
287 ChannelzParentID: cc.channelzID,
288 }
447 289
448// connectivityStateEvaluator gets updated by addrConns when their 290 // Build the resolver.
449// states transition, based on which it evaluates the state of 291 rWrapper, err := newCCResolverWrapper(cc)
450// ClientConn. 292 if err != nil {
451// Note: This code will eventually sit in the balancer in the new design. 293 return nil, fmt.Errorf("failed to build resolver: %v", err)
452type connectivityStateEvaluator struct { 294 }
453 csMgr *connectivityStateManager
454 mu sync.Mutex
455 numReady uint64 // Number of addrConns in ready state.
456 numConnecting uint64 // Number of addrConns in connecting state.
457 numTransientFailure uint64 // Number of addrConns in transientFailure.
458}
459 295
460// recordTransition records state change happening in every addrConn and based on 296 cc.mu.Lock()
461// that it evaluates what state the ClientConn is in. 297 cc.resolverWrapper = rWrapper
462// It can only transition between connectivity.Ready, connectivity.Connecting and connectivity.TransientFailure. Other states, 298 cc.mu.Unlock()
463// Idle and connectivity.Shutdown are transitioned into by ClientConn; in the begining of the connection 299 // A blocking dial blocks until the clientConn is ready.
464// before any addrConn is created ClientConn is in idle state. In the end when ClientConn 300 if cc.dopts.block {
465// closes it is in connectivity.Shutdown state. 301 for {
466// TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state. 302 s := cc.GetState()
467func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) { 303 if s == connectivity.Ready {
468 cse.mu.Lock() 304 break
469 defer cse.mu.Unlock() 305 } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
470 306 if err = cc.blockingpicker.connectionError(); err != nil {
471 // Update counters. 307 terr, ok := err.(interface {
472 for idx, state := range []connectivity.State{oldState, newState} { 308 Temporary() bool
473 updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. 309 })
474 switch state { 310 if ok && !terr.Temporary() {
475 case connectivity.Ready: 311 return nil, err
476 cse.numReady += updateVal 312 }
477 case connectivity.Connecting: 313 }
478 cse.numConnecting += updateVal 314 }
479 case connectivity.TransientFailure: 315 if !cc.WaitForStateChange(ctx, s) {
480 cse.numTransientFailure += updateVal 316 // ctx got timeout or canceled.
317 return nil, ctx.Err()
318 }
481 } 319 }
482 } 320 }
483 321
484 // Evaluate. 322 return cc, nil
485 if cse.numReady > 0 {
486 cse.csMgr.updateState(connectivity.Ready)
487 return
488 }
489 if cse.numConnecting > 0 {
490 cse.csMgr.updateState(connectivity.Connecting)
491 return
492 }
493 cse.csMgr.updateState(connectivity.TransientFailure)
494} 323}
495 324
496// connectivityStateManager keeps the connectivity.State of ClientConn. 325// connectivityStateManager keeps the connectivity.State of ClientConn.
@@ -499,6 +328,7 @@ type connectivityStateManager struct {
499 mu sync.Mutex 328 mu sync.Mutex
500 state connectivity.State 329 state connectivity.State
501 notifyChan chan struct{} 330 notifyChan chan struct{}
331 channelzID int64
502} 332}
503 333
504// updateState updates the connectivity.State of ClientConn. 334// updateState updates the connectivity.State of ClientConn.
@@ -514,6 +344,12 @@ func (csm *connectivityStateManager) updateState(state connectivity.State) {
514 return 344 return
515 } 345 }
516 csm.state = state 346 csm.state = state
347 if channelz.IsOn() {
348 channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
349 Desc: fmt.Sprintf("Channel Connectivity change to %v", state),
350 Severity: channelz.CtINFO,
351 })
352 }
517 if csm.notifyChan != nil { 353 if csm.notifyChan != nil {
518 // There are other goroutines waiting on this channel. 354 // There are other goroutines waiting on this channel.
519 close(csm.notifyChan) 355 close(csm.notifyChan)
@@ -541,17 +377,32 @@ type ClientConn struct {
541 ctx context.Context 377 ctx context.Context
542 cancel context.CancelFunc 378 cancel context.CancelFunc
543 379
544 target string 380 target string
545 authority string 381 parsedTarget resolver.Target
546 dopts dialOptions 382 authority string
547 csMgr *connectivityStateManager 383 dopts dialOptions
548 csEvltr *connectivityStateEvaluator // This will eventually be part of balancer. 384 csMgr *connectivityStateManager
549 385
550 mu sync.RWMutex 386 balancerBuildOpts balancer.BuildOptions
551 sc ServiceConfig 387 blockingpicker *pickerWrapper
552 conns map[Address]*addrConn 388
389 mu sync.RWMutex
390 resolverWrapper *ccResolverWrapper
391 sc ServiceConfig
392 scRaw string
393 conns map[*addrConn]struct{}
553 // Keepalive parameter can be updated if a GoAway is received. 394 // Keepalive parameter can be updated if a GoAway is received.
554 mkp keepalive.ClientParameters 395 mkp keepalive.ClientParameters
396 curBalancerName string
397 preBalancerName string // previous balancer name.
398 curAddresses []resolver.Address
399 balancerWrapper *ccBalancerWrapper
400 retryThrottler atomic.Value
401
402 firstResolveEvent *grpcsync.Event
403
404 channelzID int64 // channelz unique identification number
405 czData *channelzData
555} 406}
556 407
557// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or 408// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
@@ -576,65 +427,6 @@ func (cc *ClientConn) GetState() connectivity.State {
576 return cc.csMgr.getState() 427 return cc.csMgr.getState()
577} 428}
578 429
579// lbWatcher watches the Notify channel of the balancer in cc and manages
580// connections accordingly. If doneChan is not nil, it is closed after the
581// first successfull connection is made.
582func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
583 defer func() {
584 // In case channel from cc.dopts.balancer.Notify() gets closed before a
585 // successful connection gets established, don't forget to notify the
586 // caller.
587 if doneChan != nil {
588 close(doneChan)
589 }
590 }()
591
592 for addrs := range cc.dopts.balancer.Notify() {
593 var (
594 add []Address // Addresses need to setup connections.
595 del []*addrConn // Connections need to tear down.
596 )
597 cc.mu.Lock()
598 for _, a := range addrs {
599 if _, ok := cc.conns[a]; !ok {
600 add = append(add, a)
601 }
602 }
603 for k, c := range cc.conns {
604 var keep bool
605 for _, a := range addrs {
606 if k == a {
607 keep = true
608 break
609 }
610 }
611 if !keep {
612 del = append(del, c)
613 delete(cc.conns, c.addr)
614 }
615 }
616 cc.mu.Unlock()
617 for _, a := range add {
618 var err error
619 if doneChan != nil {
620 err = cc.resetAddrConn(a, true, nil)
621 if err == nil {
622 close(doneChan)
623 doneChan = nil
624 }
625 } else {
626 err = cc.resetAddrConn(a, false, nil)
627 }
628 if err != nil {
629 grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
630 }
631 }
632 for _, c := range del {
633 c.tearDown(errConnDrain)
634 }
635 }
636}
637
638func (cc *ClientConn) scWatcher() { 430func (cc *ClientConn) scWatcher() {
639 for { 431 for {
640 select { 432 select {
@@ -646,6 +438,7 @@ func (cc *ClientConn) scWatcher() {
646 // TODO: load balance policy runtime change is ignored. 438 // TODO: load balance policy runtime change is ignored.
647 // We may revist this decision in the future. 439 // We may revist this decision in the future.
648 cc.sc = sc 440 cc.sc = sc
441 cc.scRaw = ""
649 cc.mu.Unlock() 442 cc.mu.Unlock()
650 case <-cc.ctx.Done(): 443 case <-cc.ctx.Done():
651 return 444 return
@@ -653,99 +446,287 @@ func (cc *ClientConn) scWatcher() {
653 } 446 }
654} 447}
655 448
656// resetAddrConn creates an addrConn for addr and adds it to cc.conns. 449// waitForResolvedAddrs blocks until the resolver has provided addresses or the
657// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason. 450// context expires. Returns nil unless the context expires first; otherwise
658// If tearDownErr is nil, errConnDrain will be used instead. 451// returns a status error based on the context.
659// 452func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
660// We should never need to replace an addrConn with a new one. This function is only used 453 // This is on the RPC path, so we use a fast path to avoid the
661// as newAddrConn to create new addrConn. 454 // more-expensive "select" below after the resolver has returned once.
662// TODO rename this function and clean up the code. 455 if cc.firstResolveEvent.HasFired() {
663func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error { 456 return nil
664 ac := &addrConn{
665 cc: cc,
666 addr: addr,
667 dopts: cc.dopts,
668 } 457 }
669 ac.ctx, ac.cancel = context.WithCancel(cc.ctx) 458 select {
670 ac.csEvltr = cc.csEvltr 459 case <-cc.firstResolveEvent.Done():
671 if EnableTracing { 460 return nil
672 ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr) 461 case <-ctx.Done():
462 return status.FromContextError(ctx.Err()).Err()
463 case <-cc.ctx.Done():
464 return ErrClientConnClosing
673 } 465 }
674 if !ac.dopts.insecure { 466}
675 if ac.dopts.copts.TransportCredentials == nil { 467
676 return errNoTransportSecurity 468func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
677 } 469 cc.mu.Lock()
678 } else { 470 defer cc.mu.Unlock()
679 if ac.dopts.copts.TransportCredentials != nil { 471 if cc.conns == nil {
680 return errCredentialsConflict 472 // cc was closed.
473 return
474 }
475
476 if reflect.DeepEqual(cc.curAddresses, addrs) {
477 return
478 }
479
480 cc.curAddresses = addrs
481 cc.firstResolveEvent.Fire()
482
483 if cc.dopts.balancerBuilder == nil {
484 // Only look at balancer types and switch balancer if balancer dial
485 // option is not set.
486 var isGRPCLB bool
487 for _, a := range addrs {
488 if a.Type == resolver.GRPCLB {
489 isGRPCLB = true
490 break
491 }
681 } 492 }
682 for _, cd := range ac.dopts.copts.PerRPCCredentials { 493 var newBalancerName string
683 if cd.RequireTransportSecurity() { 494 if isGRPCLB {
684 return errTransportCredentialsMissing 495 newBalancerName = grpclbName
496 } else {
497 // Address list doesn't contain grpclb address. Try to pick a
498 // non-grpclb balancer.
499 newBalancerName = cc.curBalancerName
500 // If current balancer is grpclb, switch to the previous one.
501 if newBalancerName == grpclbName {
502 newBalancerName = cc.preBalancerName
503 }
504 // The following could be true in two cases:
505 // - the first time handling resolved addresses
506 // (curBalancerName="")
507 // - the first time handling non-grpclb addresses
508 // (curBalancerName="grpclb", preBalancerName="")
509 if newBalancerName == "" {
510 newBalancerName = PickFirstBalancerName
685 } 511 }
686 } 512 }
513 cc.switchBalancer(newBalancerName)
514 } else if cc.balancerWrapper == nil {
515 // Balancer dial option was set, and this is the first time handling
516 // resolved addresses. Build a balancer with dopts.balancerBuilder.
517 cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
687 } 518 }
519
520 cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
521}
522
523// switchBalancer starts the switching from current balancer to the balancer
524// with the given name.
525//
526// It will NOT send the current address list to the new balancer. If needed,
527// caller of this function should send address list to the new balancer after
528// this function returns.
529//
530// Caller must hold cc.mu.
531func (cc *ClientConn) switchBalancer(name string) {
532 if cc.conns == nil {
533 return
534 }
535
536 if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
537 return
538 }
539
540 grpclog.Infof("ClientConn switching balancer to %q", name)
541 if cc.dopts.balancerBuilder != nil {
542 grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
543 return
544 }
545 // TODO(bar switching) change this to two steps: drain and close.
546 // Keep track of sc in wrapper.
547 if cc.balancerWrapper != nil {
548 cc.balancerWrapper.close()
549 }
550
551 builder := balancer.Get(name)
552 // TODO(yuxuanli): If user send a service config that does not contain a valid balancer name, should
553 // we reuse previous one?
554 if channelz.IsOn() {
555 if builder == nil {
556 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
557 Desc: fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
558 Severity: channelz.CtWarning,
559 })
560 } else {
561 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
562 Desc: fmt.Sprintf("Channel switches to new LB policy %q", name),
563 Severity: channelz.CtINFO,
564 })
565 }
566 }
567 if builder == nil {
568 grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
569 builder = newPickfirstBuilder()
570 }
571
572 cc.preBalancerName = cc.curBalancerName
573 cc.curBalancerName = builder.Name()
574 cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
575}
576
577func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
578 cc.mu.Lock()
579 if cc.conns == nil {
580 cc.mu.Unlock()
581 return
582 }
583 // TODO(bar switching) send updates to all balancer wrappers when balancer
584 // gracefully switching is supported.
585 cc.balancerWrapper.handleSubConnStateChange(sc, s)
586 cc.mu.Unlock()
587}
588
589// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
590//
591// Caller needs to make sure len(addrs) > 0.
592func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
593 ac := &addrConn{
594 cc: cc,
595 addrs: addrs,
596 scopts: opts,
597 dopts: cc.dopts,
598 czData: new(channelzData),
599 resetBackoff: make(chan struct{}),
600 }
601 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
688 // Track ac in cc. This needs to be done before any getTransport(...) is called. 602 // Track ac in cc. This needs to be done before any getTransport(...) is called.
689 cc.mu.Lock() 603 cc.mu.Lock()
690 if cc.conns == nil { 604 if cc.conns == nil {
691 cc.mu.Unlock() 605 cc.mu.Unlock()
692 return ErrClientConnClosing 606 return nil, ErrClientConnClosing
607 }
608 if channelz.IsOn() {
609 ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
610 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
611 Desc: "Subchannel Created",
612 Severity: channelz.CtINFO,
613 Parent: &channelz.TraceEventDesc{
614 Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
615 Severity: channelz.CtINFO,
616 },
617 })
693 } 618 }
694 stale := cc.conns[ac.addr] 619 cc.conns[ac] = struct{}{}
695 cc.conns[ac.addr] = ac
696 cc.mu.Unlock() 620 cc.mu.Unlock()
697 if stale != nil { 621 return ac, nil
698 // There is an addrConn alive on ac.addr already. This could be due to 622}
699 // a buggy Balancer that reports duplicated Addresses. 623
700 if tearDownErr == nil { 624// removeAddrConn removes the addrConn in the subConn from clientConn.
701 // tearDownErr is nil if resetAddrConn is called by 625// It also tears down the ac with the given error.
702 // 1) Dial 626func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
703 // 2) lbWatcher 627 cc.mu.Lock()
704 // In both cases, the stale ac should drain, not close. 628 if cc.conns == nil {
705 stale.tearDown(errConnDrain) 629 cc.mu.Unlock()
706 } else { 630 return
707 stale.tearDown(tearDownErr)
708 }
709 } 631 }
710 if block { 632 delete(cc.conns, ac)
711 if err := ac.resetTransport(false); err != nil { 633 cc.mu.Unlock()
712 if err != errConnClosing { 634 ac.tearDown(err)
713 // Tear down ac and delete it from cc.conns. 635}
714 cc.mu.Lock() 636
715 delete(cc.conns, ac.addr) 637func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
716 cc.mu.Unlock() 638 return &channelz.ChannelInternalMetric{
717 ac.tearDown(err) 639 State: cc.GetState(),
718 } 640 Target: cc.target,
719 if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { 641 CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted),
720 return e.Origin() 642 CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded),
721 } 643 CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed),
722 return err 644 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
723 } 645 }
724 // Start to monitor the error status of transport. 646}
725 go ac.transportMonitor() 647
726 } else { 648// Target returns the target string of the ClientConn.
727 // Start a goroutine connecting to the server asynchronously. 649// This is an EXPERIMENTAL API.
728 go func() { 650func (cc *ClientConn) Target() string {
729 if err := ac.resetTransport(false); err != nil { 651 return cc.target
730 grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err) 652}
731 if err != errConnClosing { 653
732 // Keep this ac in cc.conns, to get the reason it's torn down. 654func (cc *ClientConn) incrCallsStarted() {
733 ac.tearDown(err) 655 atomic.AddInt64(&cc.czData.callsStarted, 1)
734 } 656 atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
735 return 657}
736 } 658
737 ac.transportMonitor() 659func (cc *ClientConn) incrCallsSucceeded() {
738 }() 660 atomic.AddInt64(&cc.czData.callsSucceeded, 1)
661}
662
663func (cc *ClientConn) incrCallsFailed() {
664 atomic.AddInt64(&cc.czData.callsFailed, 1)
665}
666
667// connect starts creating a transport.
668// It does nothing if the ac is not IDLE.
669// TODO(bar) Move this to the addrConn section.
670func (ac *addrConn) connect() error {
671 ac.mu.Lock()
672 if ac.state == connectivity.Shutdown {
673 ac.mu.Unlock()
674 return errConnClosing
675 }
676 if ac.state != connectivity.Idle {
677 ac.mu.Unlock()
678 return nil
739 } 679 }
680 ac.updateConnectivityState(connectivity.Connecting)
681 ac.mu.Unlock()
682
683 // Start a goroutine connecting to the server asynchronously.
684 go ac.resetTransport()
740 return nil 685 return nil
741} 686}
742 687
688// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
689//
690// It checks whether current connected address of ac is in the new addrs list.
691// - If true, it updates ac.addrs and returns true. The ac will keep using
692// the existing connection.
693// - If false, it does nothing and returns false.
694func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
695 ac.mu.Lock()
696 defer ac.mu.Unlock()
697 grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
698 if ac.state == connectivity.Shutdown {
699 ac.addrs = addrs
700 return true
701 }
702
703 // Unless we're busy reconnecting already, let's reconnect from the top of
704 // the list.
705 if ac.state != connectivity.Ready {
706 return false
707 }
708
709 var curAddrFound bool
710 for _, a := range addrs {
711 if reflect.DeepEqual(ac.curAddr, a) {
712 curAddrFound = true
713 break
714 }
715 }
716 grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
717 if curAddrFound {
718 ac.addrs = addrs
719 }
720
721 return curAddrFound
722}
723
743// GetMethodConfig gets the method config of the input method. 724// GetMethodConfig gets the method config of the input method.
744// If there's an exact match for input method (i.e. /service/method), we return 725// If there's an exact match for input method (i.e. /service/method), we return
745// the corresponding MethodConfig. 726// the corresponding MethodConfig.
746// If there isn't an exact match for the input method, we look for the default config 727// If there isn't an exact match for the input method, we look for the default config
747// under the service (i.e /service/). If there is a default MethodConfig for 728// under the service (i.e /service/). If there is a default MethodConfig for
748// the serivce, we return it. 729// the service, we return it.
749// Otherwise, we return an empty MethodConfig. 730// Otherwise, we return an empty MethodConfig.
750func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { 731func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
751 // TODO: Avoid the locking here. 732 // TODO: Avoid the locking here.
@@ -754,68 +735,122 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
754 m, ok := cc.sc.Methods[method] 735 m, ok := cc.sc.Methods[method]
755 if !ok { 736 if !ok {
756 i := strings.LastIndex(method, "/") 737 i := strings.LastIndex(method, "/")
757 m, _ = cc.sc.Methods[method[:i+1]] 738 m = cc.sc.Methods[method[:i+1]]
758 } 739 }
759 return m 740 return m
760} 741}
761 742
762func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { 743func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
763 var ( 744 cc.mu.RLock()
764 ac *addrConn 745 defer cc.mu.RUnlock()
765 ok bool 746 return cc.sc.healthCheckConfig
766 put func() 747}
767 ) 748
768 if cc.dopts.balancer == nil { 749func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
769 // If balancer is nil, there should be only one addrConn available. 750 hdr, _ := metadata.FromOutgoingContext(ctx)
770 cc.mu.RLock() 751 t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
771 if cc.conns == nil { 752 FullMethodName: method,
772 cc.mu.RUnlock() 753 Header: hdr,
773 return nil, nil, toRPCErr(ErrClientConnClosing) 754 })
774 } 755 if err != nil {
775 for _, ac = range cc.conns { 756 return nil, nil, toRPCErr(err)
776 // Break after the first iteration to get the first addrConn. 757 }
777 ok = true 758 return t, done, nil
778 break 759}
760
761// handleServiceConfig parses the service config string in JSON format to Go native
762// struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
763func (cc *ClientConn) handleServiceConfig(js string) error {
764 if cc.dopts.disableServiceConfig {
765 return nil
766 }
767 if cc.scRaw == js {
768 return nil
769 }
770 if channelz.IsOn() {
771 channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
772 // The special formatting of \"%s\" instead of %q is to provide nice printing of service config
773 // for human consumption.
774 Desc: fmt.Sprintf("Channel has a new service config \"%s\"", js),
775 Severity: channelz.CtINFO,
776 })
777 }
778 sc, err := parseServiceConfig(js)
779 if err != nil {
780 return err
781 }
782 cc.mu.Lock()
783 // Check if the ClientConn is already closed. Some fields (e.g.
784 // balancerWrapper) are set to nil when closing the ClientConn, and could
785 // cause nil pointer panic if we don't have this check.
786 if cc.conns == nil {
787 cc.mu.Unlock()
788 return nil
789 }
790 cc.scRaw = js
791 cc.sc = sc
792
793 if sc.retryThrottling != nil {
794 newThrottler := &retryThrottler{
795 tokens: sc.retryThrottling.MaxTokens,
796 max: sc.retryThrottling.MaxTokens,
797 thresh: sc.retryThrottling.MaxTokens / 2,
798 ratio: sc.retryThrottling.TokenRatio,
779 } 799 }
780 cc.mu.RUnlock() 800 cc.retryThrottler.Store(newThrottler)
781 } else { 801 } else {
782 var ( 802 cc.retryThrottler.Store((*retryThrottler)(nil))
783 addr Address
784 err error
785 )
786 addr, put, err = cc.dopts.balancer.Get(ctx, opts)
787 if err != nil {
788 return nil, nil, toRPCErr(err)
789 }
790 cc.mu.RLock()
791 if cc.conns == nil {
792 cc.mu.RUnlock()
793 return nil, nil, toRPCErr(ErrClientConnClosing)
794 }
795 ac, ok = cc.conns[addr]
796 cc.mu.RUnlock()
797 } 803 }
798 if !ok { 804
799 if put != nil { 805 if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
800 updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false}) 806 if cc.curBalancerName == grpclbName {
801 put() 807 // If current balancer is grpclb, there's at least one grpclb
808 // balancer address in the resolved list. Don't switch the balancer,
809 // but change the previous balancer name, so if a new resolved
810 // address list doesn't contain grpclb address, balancer will be
811 // switched to *sc.LB.
812 cc.preBalancerName = *sc.LB
813 } else {
814 cc.switchBalancer(*sc.LB)
815 cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
802 } 816 }
803 return nil, nil, errConnClosing
804 } 817 }
805 t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait) 818
806 if err != nil { 819 cc.mu.Unlock()
807 if put != nil { 820 return nil
808 updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false}) 821}
809 put() 822
810 } 823func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
811 return nil, nil, err 824 cc.mu.RLock()
825 r := cc.resolverWrapper
826 cc.mu.RUnlock()
827 if r == nil {
828 return
829 }
830 go r.resolveNow(o)
831}
832
833// ResetConnectBackoff wakes up all subchannels in transient failure and causes
834// them to attempt another connection immediately. It also resets the backoff
835// times used for subsequent attempts regardless of the current state.
836//
837// In general, this function should not be used. Typical service or network
838// outages result in a reasonable client reconnection strategy by default.
839// However, if a previously unavailable network becomes available, this may be
840// used to trigger an immediate reconnect.
841//
842// This API is EXPERIMENTAL.
843func (cc *ClientConn) ResetConnectBackoff() {
844 cc.mu.Lock()
845 defer cc.mu.Unlock()
846 for ac := range cc.conns {
847 ac.resetConnectBackoff()
812 } 848 }
813 return t, put, nil
814} 849}
815 850
816// Close tears down the ClientConn and all underlying connections. 851// Close tears down the ClientConn and all underlying connections.
817func (cc *ClientConn) Close() error { 852func (cc *ClientConn) Close() error {
818 cc.cancel() 853 defer cc.cancel()
819 854
820 cc.mu.Lock() 855 cc.mu.Lock()
821 if cc.conns == nil { 856 if cc.conns == nil {
@@ -825,13 +860,41 @@ func (cc *ClientConn) Close() error {
825 conns := cc.conns 860 conns := cc.conns
826 cc.conns = nil 861 cc.conns = nil
827 cc.csMgr.updateState(connectivity.Shutdown) 862 cc.csMgr.updateState(connectivity.Shutdown)
863
864 rWrapper := cc.resolverWrapper
865 cc.resolverWrapper = nil
866 bWrapper := cc.balancerWrapper
867 cc.balancerWrapper = nil
828 cc.mu.Unlock() 868 cc.mu.Unlock()
829 if cc.dopts.balancer != nil { 869
830 cc.dopts.balancer.Close() 870 cc.blockingpicker.close()
871
872 if rWrapper != nil {
873 rWrapper.close()
831 } 874 }
832 for _, ac := range conns { 875 if bWrapper != nil {
876 bWrapper.close()
877 }
878
879 for ac := range conns {
833 ac.tearDown(ErrClientConnClosing) 880 ac.tearDown(ErrClientConnClosing)
834 } 881 }
882 if channelz.IsOn() {
883 ted := &channelz.TraceEventDesc{
884 Desc: "Channel Deleted",
885 Severity: channelz.CtINFO,
886 }
887 if cc.dopts.channelzParentID != 0 {
888 ted.Parent = &channelz.TraceEventDesc{
889 Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
890 Severity: channelz.CtINFO,
891 }
892 }
893 channelz.AddTraceEvent(cc.channelzID, ted)
894 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
895 // the entity beng deleted, and thus prevent it from being deleted right away.
896 channelz.RemoveEntry(cc.channelzID)
897 }
835 return nil 898 return nil
836} 899}
837 900
@@ -841,29 +904,56 @@ type addrConn struct {
841 cancel context.CancelFunc 904 cancel context.CancelFunc
842 905
843 cc *ClientConn 906 cc *ClientConn
844 addr Address
845 dopts dialOptions 907 dopts dialOptions
846 events trace.EventLog 908 acbw balancer.SubConn
909 scopts balancer.NewSubConnOptions
910
911 // transport is set when there's a viable transport (note: ac state may not be READY as LB channel
912 // health checking may require server to report healthy to set ac to READY), and is reset
913 // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
914 // is received, transport is closed, ac has been torn down).
915 transport transport.ClientTransport // The current transport.
847 916
848 csEvltr *connectivityStateEvaluator 917 mu sync.Mutex
918 curAddr resolver.Address // The current address.
919 addrs []resolver.Address // All addresses that the resolver resolved to.
849 920
850 mu sync.Mutex 921 // Use updateConnectivityState for updating addrConn's connectivity state.
851 state connectivity.State 922 state connectivity.State
852 down func(error) // the handler called when a connection is down. 923
853 // ready is closed and becomes nil when a new transport is up or failed 924 tearDownErr error // The reason this addrConn is torn down.
854 // due to timeout. 925
855 ready chan struct{} 926 backoffIdx int // Needs to be stateful for resetConnectBackoff.
856 transport transport.ClientTransport 927 resetBackoff chan struct{}
857 928
858 // The reason this addrConn is torn down. 929 channelzID int64 // channelz unique identification number.
859 tearDownErr error 930 czData *channelzData
931 healthCheckEnabled bool
932}
933
934// Note: this requires a lock on ac.mu.
935func (ac *addrConn) updateConnectivityState(s connectivity.State) {
936 if ac.state == s {
937 return
938 }
939
940 updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
941 grpclog.Infof(updateMsg)
942 ac.state = s
943 if channelz.IsOn() {
944 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
945 Desc: updateMsg,
946 Severity: channelz.CtINFO,
947 })
948 }
949 ac.cc.handleSubConnStateChange(ac.acbw, s)
860} 950}
861 951
862// adjustParams updates parameters used to create transports upon 952// adjustParams updates parameters used to create transports upon
863// receiving a GoAway. 953// receiving a GoAway.
864func (ac *addrConn) adjustParams(r transport.GoAwayReason) { 954func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
865 switch r { 955 switch r {
866 case transport.TooManyPings: 956 case transport.GoAwayTooManyPings:
867 v := 2 * ac.dopts.copts.KeepaliveParams.Time 957 v := 2 * ac.dopts.copts.KeepaliveParams.Time
868 ac.cc.mu.Lock() 958 ac.cc.mu.Lock()
869 if v > ac.cc.mkp.Time { 959 if v > ac.cc.mkp.Time {
@@ -873,246 +963,359 @@ func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
873 } 963 }
874} 964}
875 965
876// printf records an event in ac's event log, unless ac has been closed. 966func (ac *addrConn) resetTransport() {
877// REQUIRES ac.mu is held. 967 for i := 0; ; i++ {
878func (ac *addrConn) printf(format string, a ...interface{}) { 968 tryNextAddrFromStart := grpcsync.NewEvent()
879 if ac.events != nil {
880 ac.events.Printf(format, a...)
881 }
882}
883 969
884// errorf records an error in ac's event log, unless ac has been closed.
885// REQUIRES ac.mu is held.
886func (ac *addrConn) errorf(format string, a ...interface{}) {
887 if ac.events != nil {
888 ac.events.Errorf(format, a...)
889 }
890}
891
892// resetTransport recreates a transport to the address for ac.
893// For the old transport:
894// - if drain is true, it will be gracefully closed.
895// - otherwise, it will be closed.
896func (ac *addrConn) resetTransport(drain bool) error {
897 ac.mu.Lock()
898 if ac.state == connectivity.Shutdown {
899 ac.mu.Unlock()
900 return errConnClosing
901 }
902 ac.printf("connecting")
903 if ac.down != nil {
904 ac.down(downErrorf(false, true, "%v", errNetworkIO))
905 ac.down = nil
906 }
907 oldState := ac.state
908 ac.state = connectivity.Connecting
909 ac.csEvltr.recordTransition(oldState, ac.state)
910 t := ac.transport
911 ac.transport = nil
912 ac.mu.Unlock()
913 if t != nil && !drain {
914 t.Close()
915 }
916 ac.cc.mu.RLock()
917 ac.dopts.copts.KeepaliveParams = ac.cc.mkp
918 ac.cc.mu.RUnlock()
919 for retries := 0; ; retries++ {
920 ac.mu.Lock() 970 ac.mu.Lock()
921 if ac.state == connectivity.Shutdown { 971 if i > 0 {
922 // ac.tearDown(...) has been invoked. 972 ac.cc.resolveNow(resolver.ResolveNowOption{})
923 ac.mu.Unlock()
924 return errConnClosing
925 } 973 }
974 addrs := ac.addrs
975 backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
926 ac.mu.Unlock() 976 ac.mu.Unlock()
927 sleepTime := ac.dopts.bs.backoff(retries)
928 timeout := minConnectTimeout
929 if timeout < sleepTime {
930 timeout = sleepTime
931 }
932 ctx, cancel := context.WithTimeout(ac.ctx, timeout)
933 connectTime := time.Now()
934 sinfo := transport.TargetInfo{
935 Addr: ac.addr.Addr,
936 Metadata: ac.addr.Metadata,
937 }
938 newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
939 // Don't call cancel in success path due to a race in Go 1.6:
940 // https://github.com/golang/go/issues/15078.
941 if err != nil {
942 cancel()
943 977
944 if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { 978 addrLoop:
945 return err 979 for _, addr := range addrs {
980 ac.mu.Lock()
981
982 if ac.state == connectivity.Shutdown {
983 ac.mu.Unlock()
984 return
985 }
986 ac.updateConnectivityState(connectivity.Connecting)
987 ac.transport = nil
988 ac.mu.Unlock()
989
990 // This will be the duration that dial gets to finish.
991 dialDuration := getMinConnectTimeout()
992 if dialDuration < backoffFor {
993 // Give dial more time as we keep failing to connect.
994 dialDuration = backoffFor
946 } 995 }
947 grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr) 996 connectDeadline := time.Now().Add(dialDuration)
997
948 ac.mu.Lock() 998 ac.mu.Lock()
999 ac.cc.mu.RLock()
1000 ac.dopts.copts.KeepaliveParams = ac.cc.mkp
1001 ac.cc.mu.RUnlock()
1002
949 if ac.state == connectivity.Shutdown { 1003 if ac.state == connectivity.Shutdown {
950 // ac.tearDown(...) has been invoked.
951 ac.mu.Unlock() 1004 ac.mu.Unlock()
952 return errConnClosing 1005 return
953 } 1006 }
954 ac.errorf("transient failure: %v", err) 1007
955 oldState = ac.state 1008 copts := ac.dopts.copts
956 ac.state = connectivity.TransientFailure 1009 if ac.scopts.CredsBundle != nil {
957 ac.csEvltr.recordTransition(oldState, ac.state) 1010 copts.CredsBundle = ac.scopts.CredsBundle
958 if ac.ready != nil {
959 close(ac.ready)
960 ac.ready = nil
961 } 1011 }
1012 hctx, hcancel := context.WithCancel(ac.ctx)
1013 defer hcancel()
962 ac.mu.Unlock() 1014 ac.mu.Unlock()
963 timer := time.NewTimer(sleepTime - time.Since(connectTime)) 1015
964 select { 1016 if channelz.IsOn() {
965 case <-timer.C: 1017 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
966 case <-ac.ctx.Done(): 1018 Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
967 timer.Stop() 1019 Severity: channelz.CtINFO,
968 return ac.ctx.Err() 1020 })
1021 }
1022
1023 reconnect := grpcsync.NewEvent()
1024 prefaceReceived := make(chan struct{})
1025 newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
1026 if err == nil {
1027 ac.mu.Lock()
1028 ac.curAddr = addr
1029 ac.transport = newTr
1030 ac.mu.Unlock()
1031
1032 healthCheckConfig := ac.cc.healthCheckConfig()
1033 // LB channel health checking is only enabled when all the four requirements below are met:
1034 // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
1035 // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
1036 // 3. a service config with non-empty healthCheckConfig field is provided,
1037 // 4. the current load balancer allows it.
1038 healthcheckManagingState := false
1039 if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
1040 if ac.cc.dopts.healthCheckFunc == nil {
1041 // TODO: add a link to the health check doc in the error message.
1042 grpclog.Error("the client side LB channel health check function has not been set.")
1043 } else {
1044 // TODO(deklerk) refactor to just return transport
1045 go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
1046 healthcheckManagingState = true
1047 }
1048 }
1049 if !healthcheckManagingState {
1050 ac.mu.Lock()
1051 ac.updateConnectivityState(connectivity.Ready)
1052 ac.mu.Unlock()
1053 }
1054 } else {
1055 hcancel()
1056 if err == errConnClosing {
1057 return
1058 }
1059
1060 if tryNextAddrFromStart.HasFired() {
1061 break addrLoop
1062 }
1063 continue
1064 }
1065
1066 ac.mu.Lock()
1067 reqHandshake := ac.dopts.reqHandshake
1068 ac.mu.Unlock()
1069
1070 <-reconnect.Done()
1071 hcancel()
1072
1073 if reqHandshake == envconfig.RequireHandshakeHybrid {
1074 // In RequireHandshakeHybrid mode, we must check to see whether
1075 // server preface has arrived yet to decide whether to start
1076 // reconnecting at the top of the list (server preface received)
1077 // or continue with the next addr in the list as if the
1078 // connection were not successful (server preface not received).
1079 select {
1080 case <-prefaceReceived:
1081 // We received a server preface - huzzah! We consider this
1082 // a success and restart from the top of the addr list.
1083 ac.mu.Lock()
1084 ac.backoffIdx = 0
1085 ac.mu.Unlock()
1086 break addrLoop
1087 default:
1088 // Despite having set state to READY, in hybrid mode we
1089 // consider this a failure and continue connecting at the
1090 // next addr in the list.
1091 ac.mu.Lock()
1092 if ac.state == connectivity.Shutdown {
1093 ac.mu.Unlock()
1094 return
1095 }
1096
1097 ac.updateConnectivityState(connectivity.TransientFailure)
1098 ac.mu.Unlock()
1099
1100 if tryNextAddrFromStart.HasFired() {
1101 break addrLoop
1102 }
1103 }
1104 } else {
1105 // In RequireHandshakeOn mode, we would have already waited for
1106 // the server preface, so we consider this a success and restart
1107 // from the top of the addr list. In RequireHandshakeOff mode,
1108 // we don't care to wait for the server preface before
1109 // considering this a success, so we also restart from the top
1110 // of the addr list.
1111 ac.mu.Lock()
1112 ac.backoffIdx = 0
1113 ac.mu.Unlock()
1114 break addrLoop
969 } 1115 }
970 timer.Stop()
971 continue
972 } 1116 }
1117
1118 // After exhausting all addresses, or after need to reconnect after a
1119 // READY, the addrConn enters TRANSIENT_FAILURE.
973 ac.mu.Lock() 1120 ac.mu.Lock()
974 ac.printf("ready")
975 if ac.state == connectivity.Shutdown { 1121 if ac.state == connectivity.Shutdown {
976 // ac.tearDown(...) has been invoked.
977 ac.mu.Unlock() 1122 ac.mu.Unlock()
978 newTransport.Close() 1123 return
979 return errConnClosing
980 }
981 oldState = ac.state
982 ac.state = connectivity.Ready
983 ac.csEvltr.recordTransition(oldState, ac.state)
984 ac.transport = newTransport
985 if ac.ready != nil {
986 close(ac.ready)
987 ac.ready = nil
988 }
989 if ac.cc.dopts.balancer != nil {
990 ac.down = ac.cc.dopts.balancer.Up(ac.addr)
991 } 1124 }
1125 ac.updateConnectivityState(connectivity.TransientFailure)
1126
1127 // Backoff.
1128 b := ac.resetBackoff
1129 timer := time.NewTimer(backoffFor)
1130 acctx := ac.ctx
992 ac.mu.Unlock() 1131 ac.mu.Unlock()
993 return nil 1132
1133 select {
1134 case <-timer.C:
1135 ac.mu.Lock()
1136 ac.backoffIdx++
1137 ac.mu.Unlock()
1138 case <-b:
1139 timer.Stop()
1140 case <-acctx.Done():
1141 timer.Stop()
1142 return
1143 }
994 } 1144 }
995} 1145}
996 1146
997// Run in a goroutine to track the error in transport and create the 1147// createTransport creates a connection to one of the backends in addrs. It
998// new transport if an error happens. It returns when the channel is closing. 1148// sets ac.transport in the success case, or it returns an error if it was
999func (ac *addrConn) transportMonitor() { 1149// unable to successfully create a transport.
1000 for { 1150//
1151// If waitForHandshake is enabled, it blocks until server preface arrives.
1152func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time, reconnect *grpcsync.Event, prefaceReceived chan struct{}) (transport.ClientTransport, error) {
1153 onCloseCalled := make(chan struct{})
1154
1155 target := transport.TargetInfo{
1156 Addr: addr.Addr,
1157 Metadata: addr.Metadata,
1158 Authority: ac.cc.authority,
1159 }
1160
1161 prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now()))
1162
1163 onGoAway := func(r transport.GoAwayReason) {
1001 ac.mu.Lock() 1164 ac.mu.Lock()
1002 t := ac.transport 1165 ac.adjustParams(r)
1003 ac.mu.Unlock() 1166 ac.mu.Unlock()
1004 select { 1167 reconnect.Fire()
1005 // This is needed to detect the teardown when 1168 }
1006 // the addrConn is idle (i.e., no RPC in flight). 1169
1007 case <-ac.ctx.Done(): 1170 onClose := func() {
1008 select { 1171 close(onCloseCalled)
1009 case <-t.Error(): 1172 prefaceTimer.Stop()
1010 t.Close() 1173 reconnect.Fire()
1011 default: 1174 }
1012 } 1175
1013 return 1176 onPrefaceReceipt := func() {
1014 case <-t.GoAway(): 1177 close(prefaceReceived)
1015 ac.adjustParams(t.GetGoAwayReason()) 1178 prefaceTimer.Stop()
1016 // If GoAway happens without any network I/O error, the underlying transport 1179 }
1017 // will be gracefully closed, and a new transport will be created. 1180
1018 // (The transport will be closed when all the pending RPCs finished or failed.) 1181 connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
1019 // If GoAway and some network I/O error happen concurrently, the underlying transport 1182 defer cancel()
1020 // will be closed, and a new transport will be created. 1183 if channelz.IsOn() {
1021 var drain bool 1184 copts.ChannelzParentID = ac.channelzID
1185 }
1186
1187 newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
1188
1189 if err == nil {
1190 if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
1022 select { 1191 select {
1023 case <-t.Error(): 1192 case <-prefaceTimer.C:
1024 default: 1193 // We didn't get the preface in time.
1025 drain = true 1194 newTr.Close()
1026 } 1195 err = errors.New("timed out waiting for server handshake")
1027 if err := ac.resetTransport(drain); err != nil { 1196 case <-prefaceReceived:
1028 grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err) 1197 // We got the preface - huzzah! things are good.
1029 if err != errConnClosing { 1198 case <-onCloseCalled:
1030 // Keep this ac in cc.conns, to get the reason it's torn down. 1199 // The transport has already closed - noop.
1031 ac.tearDown(err) 1200 return nil, errors.New("connection closed")
1032 }
1033 return
1034 } 1201 }
1035 case <-t.Error(): 1202 } else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
1036 select { 1203 go func() {
1037 case <-ac.ctx.Done(): 1204 select {
1038 t.Close() 1205 case <-prefaceTimer.C:
1039 return 1206 // We didn't get the preface in time.
1040 case <-t.GoAway(): 1207 newTr.Close()
1041 ac.adjustParams(t.GetGoAwayReason()) 1208 case <-prefaceReceived:
1042 if err := ac.resetTransport(false); err != nil { 1209 // We got the preface just in the nick of time - huzzah!
1043 grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err) 1210 case <-onCloseCalled:
1044 if err != errConnClosing { 1211 // The transport has already closed - noop.
1045 // Keep this ac in cc.conns, to get the reason it's torn down.
1046 ac.tearDown(err)
1047 }
1048 return
1049 } 1212 }
1050 default: 1213 }()
1051 }
1052 ac.mu.Lock()
1053 if ac.state == connectivity.Shutdown {
1054 // ac has been shutdown.
1055 ac.mu.Unlock()
1056 return
1057 }
1058 oldState := ac.state
1059 ac.state = connectivity.TransientFailure
1060 ac.csEvltr.recordTransition(oldState, ac.state)
1061 ac.mu.Unlock()
1062 if err := ac.resetTransport(false); err != nil {
1063 grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
1064 ac.mu.Lock()
1065 ac.printf("transport exiting: %v", err)
1066 ac.mu.Unlock()
1067 grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
1068 if err != errConnClosing {
1069 // Keep this ac in cc.conns, to get the reason it's torn down.
1070 ac.tearDown(err)
1071 }
1072 return
1073 }
1074 } 1214 }
1075 } 1215 }
1076}
1077 1216
1078// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or 1217 if err != nil {
1079// iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true. 1218 // newTr is either nil, or closed.
1080func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) { 1219 ac.cc.blockingpicker.updateConnectionError(err)
1081 for {
1082 ac.mu.Lock() 1220 ac.mu.Lock()
1083 switch { 1221 if ac.state == connectivity.Shutdown {
1084 case ac.state == connectivity.Shutdown: 1222 // ac.tearDown(...) has been invoked.
1085 if failfast || !hasBalancer {
1086 // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
1087 err := ac.tearDownErr
1088 ac.mu.Unlock()
1089 return nil, err
1090 }
1091 ac.mu.Unlock() 1223 ac.mu.Unlock()
1224
1092 return nil, errConnClosing 1225 return nil, errConnClosing
1093 case ac.state == connectivity.Ready: 1226 }
1094 ct := ac.transport 1227 ac.mu.Unlock()
1095 ac.mu.Unlock() 1228 grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
1096 return ct, nil 1229 return nil, err
1097 case ac.state == connectivity.TransientFailure: 1230 }
1098 if failfast || hasBalancer { 1231
1099 ac.mu.Unlock() 1232 // Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
1100 return nil, errConnUnavailable 1233 ac.mu.Lock()
1234 if ac.state == connectivity.Shutdown {
1235 ac.mu.Unlock()
1236 newTr.Close()
1237 return nil, errConnClosing
1238 }
1239 ac.mu.Unlock()
1240
1241 // Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
1242 ac.mu.Lock()
1243 if ac.state == connectivity.Shutdown {
1244 ac.mu.Unlock()
1245 newTr.Close()
1246 return nil, errConnClosing
1247 }
1248 ac.mu.Unlock()
1249
1250 return newTr, nil
1251}
1252
1253func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
1254 // Set up the health check helper functions
1255 newStream := func() (interface{}, error) {
1256 return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr)
1257 }
1258 firstReady := true
1259 reportHealth := func(ok bool) {
1260 ac.mu.Lock()
1261 defer ac.mu.Unlock()
1262 if ac.transport != newTr {
1263 return
1264 }
1265 if ok {
1266 if firstReady {
1267 firstReady = false
1268 ac.curAddr = addr
1101 } 1269 }
1270 ac.updateConnectivityState(connectivity.Ready)
1271 } else {
1272 ac.updateConnectivityState(connectivity.TransientFailure)
1102 } 1273 }
1103 ready := ac.ready 1274 }
1104 if ready == nil { 1275 err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
1105 ready = make(chan struct{}) 1276 if err != nil {
1106 ac.ready = ready 1277 if status.Code(err) == codes.Unimplemented {
1278 if channelz.IsOn() {
1279 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1280 Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled",
1281 Severity: channelz.CtError,
1282 })
1283 }
1284 grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
1285 } else {
1286 grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
1107 } 1287 }
1288 }
1289}
1290
1291func (ac *addrConn) resetConnectBackoff() {
1292 ac.mu.Lock()
1293 close(ac.resetBackoff)
1294 ac.backoffIdx = 0
1295 ac.resetBackoff = make(chan struct{})
1296 ac.mu.Unlock()
1297}
1298
1299// getReadyTransport returns the transport if ac's state is READY.
1300// Otherwise it returns nil, false.
1301// If ac's state is IDLE, it will trigger ac to connect.
1302func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
1303 ac.mu.Lock()
1304 if ac.state == connectivity.Ready && ac.transport != nil {
1305 t := ac.transport
1108 ac.mu.Unlock() 1306 ac.mu.Unlock()
1109 select { 1307 return t, true
1110 case <-ctx.Done(): 1308 }
1111 return nil, toRPCErr(ctx.Err()) 1309 var idle bool
1112 // Wait until the new transport is ready or failed. 1310 if ac.state == connectivity.Idle {
1113 case <-ready: 1311 idle = true
1114 }
1115 } 1312 }
1313 ac.mu.Unlock()
1314 // Trigger idle ac to connect.
1315 if idle {
1316 ac.connect()
1317 }
1318 return nil, false
1116} 1319}
1117 1320
1118// tearDown starts to tear down the addrConn. 1321// tearDown starts to tear down the addrConn.
@@ -1121,38 +1324,126 @@ func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (trans
1121// tight loop. 1324// tight loop.
1122// tearDown doesn't remove ac from ac.cc.conns. 1325// tearDown doesn't remove ac from ac.cc.conns.
1123func (ac *addrConn) tearDown(err error) { 1326func (ac *addrConn) tearDown(err error) {
1124 ac.cancel()
1125
1126 ac.mu.Lock() 1327 ac.mu.Lock()
1127 defer ac.mu.Unlock() 1328 if ac.state == connectivity.Shutdown {
1128 if ac.down != nil { 1329 ac.mu.Unlock()
1129 ac.down(downErrorf(false, false, "%v", err)) 1330 return
1130 ac.down = nil
1131 } 1331 }
1132 if err == errConnDrain && ac.transport != nil { 1332 curTr := ac.transport
1333 ac.transport = nil
1334 // We have to set the state to Shutdown before anything else to prevent races
1335 // between setting the state and logic that waits on context cancelation / etc.
1336 ac.updateConnectivityState(connectivity.Shutdown)
1337 ac.cancel()
1338 ac.tearDownErr = err
1339 ac.curAddr = resolver.Address{}
1340 if err == errConnDrain && curTr != nil {
1133 // GracefulClose(...) may be executed multiple times when 1341 // GracefulClose(...) may be executed multiple times when
1134 // i) receiving multiple GoAway frames from the server; or 1342 // i) receiving multiple GoAway frames from the server; or
1135 // ii) there are concurrent name resolver/Balancer triggered 1343 // ii) there are concurrent name resolver/Balancer triggered
1136 // address removal and GoAway. 1344 // address removal and GoAway.
1137 ac.transport.GracefulClose() 1345 // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
1346 ac.mu.Unlock()
1347 curTr.GracefulClose()
1348 ac.mu.Lock()
1138 } 1349 }
1139 if ac.state == connectivity.Shutdown { 1350 if channelz.IsOn() {
1140 return 1351 channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
1352 Desc: "Subchannel Deleted",
1353 Severity: channelz.CtINFO,
1354 Parent: &channelz.TraceEventDesc{
1355 Desc: fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
1356 Severity: channelz.CtINFO,
1357 },
1358 })
1359 // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
1360 // the entity beng deleted, and thus prevent it from being deleted right away.
1361 channelz.RemoveEntry(ac.channelzID)
1141 } 1362 }
1142 oldState := ac.state 1363 ac.mu.Unlock()
1143 ac.state = connectivity.Shutdown 1364}
1144 ac.tearDownErr = err 1365
1145 ac.csEvltr.recordTransition(oldState, ac.state) 1366func (ac *addrConn) getState() connectivity.State {
1146 if ac.events != nil { 1367 ac.mu.Lock()
1147 ac.events.Finish() 1368 defer ac.mu.Unlock()
1148 ac.events = nil 1369 return ac.state
1370}
1371
1372func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
1373 ac.mu.Lock()
1374 addr := ac.curAddr.Addr
1375 ac.mu.Unlock()
1376 return &channelz.ChannelInternalMetric{
1377 State: ac.getState(),
1378 Target: addr,
1379 CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted),
1380 CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded),
1381 CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed),
1382 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
1383 }
1384}
1385
1386func (ac *addrConn) incrCallsStarted() {
1387 atomic.AddInt64(&ac.czData.callsStarted, 1)
1388 atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
1389}
1390
1391func (ac *addrConn) incrCallsSucceeded() {
1392 atomic.AddInt64(&ac.czData.callsSucceeded, 1)
1393}
1394
1395func (ac *addrConn) incrCallsFailed() {
1396 atomic.AddInt64(&ac.czData.callsFailed, 1)
1397}
1398
1399type retryThrottler struct {
1400 max float64
1401 thresh float64
1402 ratio float64
1403
1404 mu sync.Mutex
1405 tokens float64 // TODO(dfawley): replace with atomic and remove lock.
1406}
1407
1408// throttle subtracts a retry token from the pool and returns whether a retry
1409// should be throttled (disallowed) based upon the retry throttling policy in
1410// the service config.
1411func (rt *retryThrottler) throttle() bool {
1412 if rt == nil {
1413 return false
1414 }
1415 rt.mu.Lock()
1416 defer rt.mu.Unlock()
1417 rt.tokens--
1418 if rt.tokens < 0 {
1419 rt.tokens = 0
1149 } 1420 }
1150 if ac.ready != nil { 1421 return rt.tokens <= rt.thresh
1151 close(ac.ready) 1422}
1152 ac.ready = nil 1423
1424func (rt *retryThrottler) successfulRPC() {
1425 if rt == nil {
1426 return
1153 } 1427 }
1154 if ac.transport != nil && err != errConnDrain { 1428 rt.mu.Lock()
1155 ac.transport.Close() 1429 defer rt.mu.Unlock()
1430 rt.tokens += rt.ratio
1431 if rt.tokens > rt.max {
1432 rt.tokens = rt.max
1156 } 1433 }
1157 return
1158} 1434}
1435
1436type channelzChannel struct {
1437 cc *ClientConn
1438}
1439
1440func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
1441 return c.cc.channelzMetric()
1442}
1443
1444// ErrClientConnTimeout indicates that the ClientConn cannot establish the
1445// underlying connections within the specified timeout.
1446//
1447// Deprecated: This error is never returned by grpc and should not be
1448// referenced by users.
1449var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")