diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r-- | vendor/google.golang.org/grpc/clientconn.go | 1158 |
1 files changed, 1158 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go new file mode 100644 index 0000000..e3f6cb1 --- /dev/null +++ b/vendor/google.golang.org/grpc/clientconn.go | |||
@@ -0,0 +1,1158 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2014 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | package grpc | ||
20 | |||
21 | import ( | ||
22 | "errors" | ||
23 | "net" | ||
24 | "strings" | ||
25 | "sync" | ||
26 | "time" | ||
27 | |||
28 | "golang.org/x/net/context" | ||
29 | "golang.org/x/net/trace" | ||
30 | "google.golang.org/grpc/connectivity" | ||
31 | "google.golang.org/grpc/credentials" | ||
32 | "google.golang.org/grpc/grpclog" | ||
33 | "google.golang.org/grpc/keepalive" | ||
34 | "google.golang.org/grpc/stats" | ||
35 | "google.golang.org/grpc/transport" | ||
36 | ) | ||
37 | |||
38 | var ( | ||
39 | // ErrClientConnClosing indicates that the operation is illegal because | ||
40 | // the ClientConn is closing. | ||
41 | ErrClientConnClosing = errors.New("grpc: the client connection is closing") | ||
42 | // ErrClientConnTimeout indicates that the ClientConn cannot establish the | ||
43 | // underlying connections within the specified timeout. | ||
44 | // DEPRECATED: Please use context.DeadlineExceeded instead. | ||
45 | ErrClientConnTimeout = errors.New("grpc: timed out when dialing") | ||
46 | |||
47 | // errNoTransportSecurity indicates that there is no transport security | ||
48 | // being set for ClientConn. Users should either set one or explicitly | ||
49 | // call WithInsecure DialOption to disable security. | ||
50 | errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)") | ||
51 | // errTransportCredentialsMissing indicates that users want to transmit security | ||
52 | // information (e.g., oauth2 token) which requires secure connection on an insecure | ||
53 | // connection. | ||
54 | errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)") | ||
55 | // errCredentialsConflict indicates that grpc.WithTransportCredentials() | ||
56 | // and grpc.WithInsecure() are both called for a connection. | ||
57 | errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)") | ||
58 | // errNetworkIO indicates that the connection is down due to some network I/O error. | ||
59 | errNetworkIO = errors.New("grpc: failed with network I/O error") | ||
60 | // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs. | ||
61 | errConnDrain = errors.New("grpc: the connection is drained") | ||
62 | // errConnClosing indicates that the connection is closing. | ||
63 | errConnClosing = errors.New("grpc: the connection is closing") | ||
64 | // errConnUnavailable indicates that the connection is unavailable. | ||
65 | errConnUnavailable = errors.New("grpc: the connection is unavailable") | ||
66 | // errBalancerClosed indicates that the balancer is closed. | ||
67 | errBalancerClosed = errors.New("grpc: balancer is closed") | ||
68 | // minimum time to give a connection to complete | ||
69 | minConnectTimeout = 20 * time.Second | ||
70 | ) | ||
71 | |||
72 | // dialOptions configure a Dial call. dialOptions are set by the DialOption | ||
73 | // values passed to Dial. | ||
74 | type dialOptions struct { | ||
75 | unaryInt UnaryClientInterceptor | ||
76 | streamInt StreamClientInterceptor | ||
77 | codec Codec | ||
78 | cp Compressor | ||
79 | dc Decompressor | ||
80 | bs backoffStrategy | ||
81 | balancer Balancer | ||
82 | block bool | ||
83 | insecure bool | ||
84 | timeout time.Duration | ||
85 | scChan <-chan ServiceConfig | ||
86 | copts transport.ConnectOptions | ||
87 | callOptions []CallOption | ||
88 | } | ||
89 | |||
90 | const ( | ||
91 | defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4 | ||
92 | defaultClientMaxSendMessageSize = 1024 * 1024 * 4 | ||
93 | ) | ||
94 | |||
95 | // DialOption configures how we set up the connection. | ||
96 | type DialOption func(*dialOptions) | ||
97 | |||
98 | // WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream. | ||
99 | // The lower bound for window size is 64K and any value smaller than that will be ignored. | ||
100 | func WithInitialWindowSize(s int32) DialOption { | ||
101 | return func(o *dialOptions) { | ||
102 | o.copts.InitialWindowSize = s | ||
103 | } | ||
104 | } | ||
105 | |||
106 | // WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection. | ||
107 | // The lower bound for window size is 64K and any value smaller than that will be ignored. | ||
108 | func WithInitialConnWindowSize(s int32) DialOption { | ||
109 | return func(o *dialOptions) { | ||
110 | o.copts.InitialConnWindowSize = s | ||
111 | } | ||
112 | } | ||
113 | |||
114 | // WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead. | ||
115 | func WithMaxMsgSize(s int) DialOption { | ||
116 | return WithDefaultCallOptions(MaxCallRecvMsgSize(s)) | ||
117 | } | ||
118 | |||
119 | // WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection. | ||
120 | func WithDefaultCallOptions(cos ...CallOption) DialOption { | ||
121 | return func(o *dialOptions) { | ||
122 | o.callOptions = append(o.callOptions, cos...) | ||
123 | } | ||
124 | } | ||
125 | |||
126 | // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling. | ||
127 | func WithCodec(c Codec) DialOption { | ||
128 | return func(o *dialOptions) { | ||
129 | o.codec = c | ||
130 | } | ||
131 | } | ||
132 | |||
133 | // WithCompressor returns a DialOption which sets a CompressorGenerator for generating message | ||
134 | // compressor. | ||
135 | func WithCompressor(cp Compressor) DialOption { | ||
136 | return func(o *dialOptions) { | ||
137 | o.cp = cp | ||
138 | } | ||
139 | } | ||
140 | |||
141 | // WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating | ||
142 | // message decompressor. | ||
143 | func WithDecompressor(dc Decompressor) DialOption { | ||
144 | return func(o *dialOptions) { | ||
145 | o.dc = dc | ||
146 | } | ||
147 | } | ||
148 | |||
149 | // WithBalancer returns a DialOption which sets a load balancer. | ||
150 | func WithBalancer(b Balancer) DialOption { | ||
151 | return func(o *dialOptions) { | ||
152 | o.balancer = b | ||
153 | } | ||
154 | } | ||
155 | |||
156 | // WithServiceConfig returns a DialOption which has a channel to read the service configuration. | ||
157 | func WithServiceConfig(c <-chan ServiceConfig) DialOption { | ||
158 | return func(o *dialOptions) { | ||
159 | o.scChan = c | ||
160 | } | ||
161 | } | ||
162 | |||
163 | // WithBackoffMaxDelay configures the dialer to use the provided maximum delay | ||
164 | // when backing off after failed connection attempts. | ||
165 | func WithBackoffMaxDelay(md time.Duration) DialOption { | ||
166 | return WithBackoffConfig(BackoffConfig{MaxDelay: md}) | ||
167 | } | ||
168 | |||
169 | // WithBackoffConfig configures the dialer to use the provided backoff | ||
170 | // parameters after connection failures. | ||
171 | // | ||
172 | // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up | ||
173 | // for use. | ||
174 | func WithBackoffConfig(b BackoffConfig) DialOption { | ||
175 | // Set defaults to ensure that provided BackoffConfig is valid and | ||
176 | // unexported fields get default values. | ||
177 | setDefaults(&b) | ||
178 | return withBackoff(b) | ||
179 | } | ||
180 | |||
181 | // withBackoff sets the backoff strategy used for retries after a | ||
182 | // failed connection attempt. | ||
183 | // | ||
184 | // This can be exported if arbitrary backoff strategies are allowed by gRPC. | ||
185 | func withBackoff(bs backoffStrategy) DialOption { | ||
186 | return func(o *dialOptions) { | ||
187 | o.bs = bs | ||
188 | } | ||
189 | } | ||
190 | |||
191 | // WithBlock returns a DialOption which makes caller of Dial blocks until the underlying | ||
192 | // connection is up. Without this, Dial returns immediately and connecting the server | ||
193 | // happens in background. | ||
194 | func WithBlock() DialOption { | ||
195 | return func(o *dialOptions) { | ||
196 | o.block = true | ||
197 | } | ||
198 | } | ||
199 | |||
200 | // WithInsecure returns a DialOption which disables transport security for this ClientConn. | ||
201 | // Note that transport security is required unless WithInsecure is set. | ||
202 | func WithInsecure() DialOption { | ||
203 | return func(o *dialOptions) { | ||
204 | o.insecure = true | ||
205 | } | ||
206 | } | ||
207 | |||
208 | // WithTransportCredentials returns a DialOption which configures a | ||
209 | // connection level security credentials (e.g., TLS/SSL). | ||
210 | func WithTransportCredentials(creds credentials.TransportCredentials) DialOption { | ||
211 | return func(o *dialOptions) { | ||
212 | o.copts.TransportCredentials = creds | ||
213 | } | ||
214 | } | ||
215 | |||
216 | // WithPerRPCCredentials returns a DialOption which sets | ||
217 | // credentials and places auth state on each outbound RPC. | ||
218 | func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption { | ||
219 | return func(o *dialOptions) { | ||
220 | o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds) | ||
221 | } | ||
222 | } | ||
223 | |||
224 | // WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn | ||
225 | // initially. This is valid if and only if WithBlock() is present. | ||
226 | // Deprecated: use DialContext and context.WithTimeout instead. | ||
227 | func WithTimeout(d time.Duration) DialOption { | ||
228 | return func(o *dialOptions) { | ||
229 | o.timeout = d | ||
230 | } | ||
231 | } | ||
232 | |||
233 | // WithDialer returns a DialOption that specifies a function to use for dialing network addresses. | ||
234 | // If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's | ||
235 | // Temporary() method to decide if it should try to reconnect to the network address. | ||
236 | func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption { | ||
237 | return func(o *dialOptions) { | ||
238 | o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) { | ||
239 | if deadline, ok := ctx.Deadline(); ok { | ||
240 | return f(addr, deadline.Sub(time.Now())) | ||
241 | } | ||
242 | return f(addr, 0) | ||
243 | } | ||
244 | } | ||
245 | } | ||
246 | |||
247 | // WithStatsHandler returns a DialOption that specifies the stats handler | ||
248 | // for all the RPCs and underlying network connections in this ClientConn. | ||
249 | func WithStatsHandler(h stats.Handler) DialOption { | ||
250 | return func(o *dialOptions) { | ||
251 | o.copts.StatsHandler = h | ||
252 | } | ||
253 | } | ||
254 | |||
255 | // FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors. | ||
256 | // If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network | ||
257 | // address and won't try to reconnect. | ||
258 | // The default value of FailOnNonTempDialError is false. | ||
259 | // This is an EXPERIMENTAL API. | ||
260 | func FailOnNonTempDialError(f bool) DialOption { | ||
261 | return func(o *dialOptions) { | ||
262 | o.copts.FailOnNonTempDialError = f | ||
263 | } | ||
264 | } | ||
265 | |||
266 | // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs. | ||
267 | func WithUserAgent(s string) DialOption { | ||
268 | return func(o *dialOptions) { | ||
269 | o.copts.UserAgent = s | ||
270 | } | ||
271 | } | ||
272 | |||
273 | // WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport. | ||
274 | func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption { | ||
275 | return func(o *dialOptions) { | ||
276 | o.copts.KeepaliveParams = kp | ||
277 | } | ||
278 | } | ||
279 | |||
280 | // WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs. | ||
281 | func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption { | ||
282 | return func(o *dialOptions) { | ||
283 | o.unaryInt = f | ||
284 | } | ||
285 | } | ||
286 | |||
287 | // WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs. | ||
288 | func WithStreamInterceptor(f StreamClientInterceptor) DialOption { | ||
289 | return func(o *dialOptions) { | ||
290 | o.streamInt = f | ||
291 | } | ||
292 | } | ||
293 | |||
294 | // WithAuthority returns a DialOption that specifies the value to be used as | ||
295 | // the :authority pseudo-header. This value only works with WithInsecure and | ||
296 | // has no effect if TransportCredentials are present. | ||
297 | func WithAuthority(a string) DialOption { | ||
298 | return func(o *dialOptions) { | ||
299 | o.copts.Authority = a | ||
300 | } | ||
301 | } | ||
302 | |||
303 | // Dial creates a client connection to the given target. | ||
304 | func Dial(target string, opts ...DialOption) (*ClientConn, error) { | ||
305 | return DialContext(context.Background(), target, opts...) | ||
306 | } | ||
307 | |||
308 | // DialContext creates a client connection to the given target. ctx can be used to | ||
309 | // cancel or expire the pending connection. Once this function returns, the | ||
310 | // cancellation and expiration of ctx will be noop. Users should call ClientConn.Close | ||
311 | // to terminate all the pending operations after this function returns. | ||
312 | func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { | ||
313 | cc := &ClientConn{ | ||
314 | target: target, | ||
315 | csMgr: &connectivityStateManager{}, | ||
316 | conns: make(map[Address]*addrConn), | ||
317 | } | ||
318 | cc.csEvltr = &connectivityStateEvaluator{csMgr: cc.csMgr} | ||
319 | cc.ctx, cc.cancel = context.WithCancel(context.Background()) | ||
320 | |||
321 | for _, opt := range opts { | ||
322 | opt(&cc.dopts) | ||
323 | } | ||
324 | cc.mkp = cc.dopts.copts.KeepaliveParams | ||
325 | |||
326 | if cc.dopts.copts.Dialer == nil { | ||
327 | cc.dopts.copts.Dialer = newProxyDialer( | ||
328 | func(ctx context.Context, addr string) (net.Conn, error) { | ||
329 | return dialContext(ctx, "tcp", addr) | ||
330 | }, | ||
331 | ) | ||
332 | } | ||
333 | |||
334 | if cc.dopts.copts.UserAgent != "" { | ||
335 | cc.dopts.copts.UserAgent += " " + grpcUA | ||
336 | } else { | ||
337 | cc.dopts.copts.UserAgent = grpcUA | ||
338 | } | ||
339 | |||
340 | if cc.dopts.timeout > 0 { | ||
341 | var cancel context.CancelFunc | ||
342 | ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) | ||
343 | defer cancel() | ||
344 | } | ||
345 | |||
346 | defer func() { | ||
347 | select { | ||
348 | case <-ctx.Done(): | ||
349 | conn, err = nil, ctx.Err() | ||
350 | default: | ||
351 | } | ||
352 | |||
353 | if err != nil { | ||
354 | cc.Close() | ||
355 | } | ||
356 | }() | ||
357 | |||
358 | scSet := false | ||
359 | if cc.dopts.scChan != nil { | ||
360 | // Try to get an initial service config. | ||
361 | select { | ||
362 | case sc, ok := <-cc.dopts.scChan: | ||
363 | if ok { | ||
364 | cc.sc = sc | ||
365 | scSet = true | ||
366 | } | ||
367 | default: | ||
368 | } | ||
369 | } | ||
370 | // Set defaults. | ||
371 | if cc.dopts.codec == nil { | ||
372 | cc.dopts.codec = protoCodec{} | ||
373 | } | ||
374 | if cc.dopts.bs == nil { | ||
375 | cc.dopts.bs = DefaultBackoffConfig | ||
376 | } | ||
377 | creds := cc.dopts.copts.TransportCredentials | ||
378 | if creds != nil && creds.Info().ServerName != "" { | ||
379 | cc.authority = creds.Info().ServerName | ||
380 | } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" { | ||
381 | cc.authority = cc.dopts.copts.Authority | ||
382 | } else { | ||
383 | cc.authority = target | ||
384 | } | ||
385 | waitC := make(chan error, 1) | ||
386 | go func() { | ||
387 | defer close(waitC) | ||
388 | if cc.dopts.balancer == nil && cc.sc.LB != nil { | ||
389 | cc.dopts.balancer = cc.sc.LB | ||
390 | } | ||
391 | if cc.dopts.balancer != nil { | ||
392 | var credsClone credentials.TransportCredentials | ||
393 | if creds != nil { | ||
394 | credsClone = creds.Clone() | ||
395 | } | ||
396 | config := BalancerConfig{ | ||
397 | DialCreds: credsClone, | ||
398 | Dialer: cc.dopts.copts.Dialer, | ||
399 | } | ||
400 | if err := cc.dopts.balancer.Start(target, config); err != nil { | ||
401 | waitC <- err | ||
402 | return | ||
403 | } | ||
404 | ch := cc.dopts.balancer.Notify() | ||
405 | if ch != nil { | ||
406 | if cc.dopts.block { | ||
407 | doneChan := make(chan struct{}) | ||
408 | go cc.lbWatcher(doneChan) | ||
409 | <-doneChan | ||
410 | } else { | ||
411 | go cc.lbWatcher(nil) | ||
412 | } | ||
413 | return | ||
414 | } | ||
415 | } | ||
416 | // No balancer, or no resolver within the balancer. Connect directly. | ||
417 | if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil { | ||
418 | waitC <- err | ||
419 | return | ||
420 | } | ||
421 | }() | ||
422 | select { | ||
423 | case <-ctx.Done(): | ||
424 | return nil, ctx.Err() | ||
425 | case err := <-waitC: | ||
426 | if err != nil { | ||
427 | return nil, err | ||
428 | } | ||
429 | } | ||
430 | if cc.dopts.scChan != nil && !scSet { | ||
431 | // Blocking wait for the initial service config. | ||
432 | select { | ||
433 | case sc, ok := <-cc.dopts.scChan: | ||
434 | if ok { | ||
435 | cc.sc = sc | ||
436 | } | ||
437 | case <-ctx.Done(): | ||
438 | return nil, ctx.Err() | ||
439 | } | ||
440 | } | ||
441 | if cc.dopts.scChan != nil { | ||
442 | go cc.scWatcher() | ||
443 | } | ||
444 | |||
445 | return cc, nil | ||
446 | } | ||
447 | |||
448 | // connectivityStateEvaluator gets updated by addrConns when their | ||
449 | // states transition, based on which it evaluates the state of | ||
450 | // ClientConn. | ||
451 | // Note: This code will eventually sit in the balancer in the new design. | ||
452 | type connectivityStateEvaluator struct { | ||
453 | csMgr *connectivityStateManager | ||
454 | mu sync.Mutex | ||
455 | numReady uint64 // Number of addrConns in ready state. | ||
456 | numConnecting uint64 // Number of addrConns in connecting state. | ||
457 | numTransientFailure uint64 // Number of addrConns in transientFailure. | ||
458 | } | ||
459 | |||
460 | // recordTransition records state change happening in every addrConn and based on | ||
461 | // that it evaluates what state the ClientConn is in. | ||
462 | // It can only transition between connectivity.Ready, connectivity.Connecting and connectivity.TransientFailure. Other states, | ||
463 | // Idle and connectivity.Shutdown are transitioned into by ClientConn; in the begining of the connection | ||
464 | // before any addrConn is created ClientConn is in idle state. In the end when ClientConn | ||
465 | // closes it is in connectivity.Shutdown state. | ||
466 | // TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state. | ||
467 | func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) { | ||
468 | cse.mu.Lock() | ||
469 | defer cse.mu.Unlock() | ||
470 | |||
471 | // Update counters. | ||
472 | for idx, state := range []connectivity.State{oldState, newState} { | ||
473 | updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. | ||
474 | switch state { | ||
475 | case connectivity.Ready: | ||
476 | cse.numReady += updateVal | ||
477 | case connectivity.Connecting: | ||
478 | cse.numConnecting += updateVal | ||
479 | case connectivity.TransientFailure: | ||
480 | cse.numTransientFailure += updateVal | ||
481 | } | ||
482 | } | ||
483 | |||
484 | // Evaluate. | ||
485 | if cse.numReady > 0 { | ||
486 | cse.csMgr.updateState(connectivity.Ready) | ||
487 | return | ||
488 | } | ||
489 | if cse.numConnecting > 0 { | ||
490 | cse.csMgr.updateState(connectivity.Connecting) | ||
491 | return | ||
492 | } | ||
493 | cse.csMgr.updateState(connectivity.TransientFailure) | ||
494 | } | ||
495 | |||
496 | // connectivityStateManager keeps the connectivity.State of ClientConn. | ||
497 | // This struct will eventually be exported so the balancers can access it. | ||
498 | type connectivityStateManager struct { | ||
499 | mu sync.Mutex | ||
500 | state connectivity.State | ||
501 | notifyChan chan struct{} | ||
502 | } | ||
503 | |||
504 | // updateState updates the connectivity.State of ClientConn. | ||
505 | // If there's a change it notifies goroutines waiting on state change to | ||
506 | // happen. | ||
507 | func (csm *connectivityStateManager) updateState(state connectivity.State) { | ||
508 | csm.mu.Lock() | ||
509 | defer csm.mu.Unlock() | ||
510 | if csm.state == connectivity.Shutdown { | ||
511 | return | ||
512 | } | ||
513 | if csm.state == state { | ||
514 | return | ||
515 | } | ||
516 | csm.state = state | ||
517 | if csm.notifyChan != nil { | ||
518 | // There are other goroutines waiting on this channel. | ||
519 | close(csm.notifyChan) | ||
520 | csm.notifyChan = nil | ||
521 | } | ||
522 | } | ||
523 | |||
524 | func (csm *connectivityStateManager) getState() connectivity.State { | ||
525 | csm.mu.Lock() | ||
526 | defer csm.mu.Unlock() | ||
527 | return csm.state | ||
528 | } | ||
529 | |||
530 | func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} { | ||
531 | csm.mu.Lock() | ||
532 | defer csm.mu.Unlock() | ||
533 | if csm.notifyChan == nil { | ||
534 | csm.notifyChan = make(chan struct{}) | ||
535 | } | ||
536 | return csm.notifyChan | ||
537 | } | ||
538 | |||
539 | // ClientConn represents a client connection to an RPC server. | ||
540 | type ClientConn struct { | ||
541 | ctx context.Context | ||
542 | cancel context.CancelFunc | ||
543 | |||
544 | target string | ||
545 | authority string | ||
546 | dopts dialOptions | ||
547 | csMgr *connectivityStateManager | ||
548 | csEvltr *connectivityStateEvaluator // This will eventually be part of balancer. | ||
549 | |||
550 | mu sync.RWMutex | ||
551 | sc ServiceConfig | ||
552 | conns map[Address]*addrConn | ||
553 | // Keepalive parameter can be updated if a GoAway is received. | ||
554 | mkp keepalive.ClientParameters | ||
555 | } | ||
556 | |||
557 | // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or | ||
558 | // ctx expires. A true value is returned in former case and false in latter. | ||
559 | // This is an EXPERIMENTAL API. | ||
560 | func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool { | ||
561 | ch := cc.csMgr.getNotifyChan() | ||
562 | if cc.csMgr.getState() != sourceState { | ||
563 | return true | ||
564 | } | ||
565 | select { | ||
566 | case <-ctx.Done(): | ||
567 | return false | ||
568 | case <-ch: | ||
569 | return true | ||
570 | } | ||
571 | } | ||
572 | |||
573 | // GetState returns the connectivity.State of ClientConn. | ||
574 | // This is an EXPERIMENTAL API. | ||
575 | func (cc *ClientConn) GetState() connectivity.State { | ||
576 | return cc.csMgr.getState() | ||
577 | } | ||
578 | |||
579 | // lbWatcher watches the Notify channel of the balancer in cc and manages | ||
580 | // connections accordingly. If doneChan is not nil, it is closed after the | ||
581 | // first successfull connection is made. | ||
582 | func (cc *ClientConn) lbWatcher(doneChan chan struct{}) { | ||
583 | defer func() { | ||
584 | // In case channel from cc.dopts.balancer.Notify() gets closed before a | ||
585 | // successful connection gets established, don't forget to notify the | ||
586 | // caller. | ||
587 | if doneChan != nil { | ||
588 | close(doneChan) | ||
589 | } | ||
590 | }() | ||
591 | |||
592 | for addrs := range cc.dopts.balancer.Notify() { | ||
593 | var ( | ||
594 | add []Address // Addresses need to setup connections. | ||
595 | del []*addrConn // Connections need to tear down. | ||
596 | ) | ||
597 | cc.mu.Lock() | ||
598 | for _, a := range addrs { | ||
599 | if _, ok := cc.conns[a]; !ok { | ||
600 | add = append(add, a) | ||
601 | } | ||
602 | } | ||
603 | for k, c := range cc.conns { | ||
604 | var keep bool | ||
605 | for _, a := range addrs { | ||
606 | if k == a { | ||
607 | keep = true | ||
608 | break | ||
609 | } | ||
610 | } | ||
611 | if !keep { | ||
612 | del = append(del, c) | ||
613 | delete(cc.conns, c.addr) | ||
614 | } | ||
615 | } | ||
616 | cc.mu.Unlock() | ||
617 | for _, a := range add { | ||
618 | var err error | ||
619 | if doneChan != nil { | ||
620 | err = cc.resetAddrConn(a, true, nil) | ||
621 | if err == nil { | ||
622 | close(doneChan) | ||
623 | doneChan = nil | ||
624 | } | ||
625 | } else { | ||
626 | err = cc.resetAddrConn(a, false, nil) | ||
627 | } | ||
628 | if err != nil { | ||
629 | grpclog.Warningf("Error creating connection to %v. Err: %v", a, err) | ||
630 | } | ||
631 | } | ||
632 | for _, c := range del { | ||
633 | c.tearDown(errConnDrain) | ||
634 | } | ||
635 | } | ||
636 | } | ||
637 | |||
638 | func (cc *ClientConn) scWatcher() { | ||
639 | for { | ||
640 | select { | ||
641 | case sc, ok := <-cc.dopts.scChan: | ||
642 | if !ok { | ||
643 | return | ||
644 | } | ||
645 | cc.mu.Lock() | ||
646 | // TODO: load balance policy runtime change is ignored. | ||
647 | // We may revist this decision in the future. | ||
648 | cc.sc = sc | ||
649 | cc.mu.Unlock() | ||
650 | case <-cc.ctx.Done(): | ||
651 | return | ||
652 | } | ||
653 | } | ||
654 | } | ||
655 | |||
656 | // resetAddrConn creates an addrConn for addr and adds it to cc.conns. | ||
657 | // If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason. | ||
658 | // If tearDownErr is nil, errConnDrain will be used instead. | ||
659 | // | ||
660 | // We should never need to replace an addrConn with a new one. This function is only used | ||
661 | // as newAddrConn to create new addrConn. | ||
662 | // TODO rename this function and clean up the code. | ||
663 | func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error { | ||
664 | ac := &addrConn{ | ||
665 | cc: cc, | ||
666 | addr: addr, | ||
667 | dopts: cc.dopts, | ||
668 | } | ||
669 | ac.ctx, ac.cancel = context.WithCancel(cc.ctx) | ||
670 | ac.csEvltr = cc.csEvltr | ||
671 | if EnableTracing { | ||
672 | ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr) | ||
673 | } | ||
674 | if !ac.dopts.insecure { | ||
675 | if ac.dopts.copts.TransportCredentials == nil { | ||
676 | return errNoTransportSecurity | ||
677 | } | ||
678 | } else { | ||
679 | if ac.dopts.copts.TransportCredentials != nil { | ||
680 | return errCredentialsConflict | ||
681 | } | ||
682 | for _, cd := range ac.dopts.copts.PerRPCCredentials { | ||
683 | if cd.RequireTransportSecurity() { | ||
684 | return errTransportCredentialsMissing | ||
685 | } | ||
686 | } | ||
687 | } | ||
688 | // Track ac in cc. This needs to be done before any getTransport(...) is called. | ||
689 | cc.mu.Lock() | ||
690 | if cc.conns == nil { | ||
691 | cc.mu.Unlock() | ||
692 | return ErrClientConnClosing | ||
693 | } | ||
694 | stale := cc.conns[ac.addr] | ||
695 | cc.conns[ac.addr] = ac | ||
696 | cc.mu.Unlock() | ||
697 | if stale != nil { | ||
698 | // There is an addrConn alive on ac.addr already. This could be due to | ||
699 | // a buggy Balancer that reports duplicated Addresses. | ||
700 | if tearDownErr == nil { | ||
701 | // tearDownErr is nil if resetAddrConn is called by | ||
702 | // 1) Dial | ||
703 | // 2) lbWatcher | ||
704 | // In both cases, the stale ac should drain, not close. | ||
705 | stale.tearDown(errConnDrain) | ||
706 | } else { | ||
707 | stale.tearDown(tearDownErr) | ||
708 | } | ||
709 | } | ||
710 | if block { | ||
711 | if err := ac.resetTransport(false); err != nil { | ||
712 | if err != errConnClosing { | ||
713 | // Tear down ac and delete it from cc.conns. | ||
714 | cc.mu.Lock() | ||
715 | delete(cc.conns, ac.addr) | ||
716 | cc.mu.Unlock() | ||
717 | ac.tearDown(err) | ||
718 | } | ||
719 | if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { | ||
720 | return e.Origin() | ||
721 | } | ||
722 | return err | ||
723 | } | ||
724 | // Start to monitor the error status of transport. | ||
725 | go ac.transportMonitor() | ||
726 | } else { | ||
727 | // Start a goroutine connecting to the server asynchronously. | ||
728 | go func() { | ||
729 | if err := ac.resetTransport(false); err != nil { | ||
730 | grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err) | ||
731 | if err != errConnClosing { | ||
732 | // Keep this ac in cc.conns, to get the reason it's torn down. | ||
733 | ac.tearDown(err) | ||
734 | } | ||
735 | return | ||
736 | } | ||
737 | ac.transportMonitor() | ||
738 | }() | ||
739 | } | ||
740 | return nil | ||
741 | } | ||
742 | |||
743 | // GetMethodConfig gets the method config of the input method. | ||
744 | // If there's an exact match for input method (i.e. /service/method), we return | ||
745 | // the corresponding MethodConfig. | ||
746 | // If there isn't an exact match for the input method, we look for the default config | ||
747 | // under the service (i.e /service/). If there is a default MethodConfig for | ||
748 | // the serivce, we return it. | ||
749 | // Otherwise, we return an empty MethodConfig. | ||
750 | func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { | ||
751 | // TODO: Avoid the locking here. | ||
752 | cc.mu.RLock() | ||
753 | defer cc.mu.RUnlock() | ||
754 | m, ok := cc.sc.Methods[method] | ||
755 | if !ok { | ||
756 | i := strings.LastIndex(method, "/") | ||
757 | m, _ = cc.sc.Methods[method[:i+1]] | ||
758 | } | ||
759 | return m | ||
760 | } | ||
761 | |||
762 | func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { | ||
763 | var ( | ||
764 | ac *addrConn | ||
765 | ok bool | ||
766 | put func() | ||
767 | ) | ||
768 | if cc.dopts.balancer == nil { | ||
769 | // If balancer is nil, there should be only one addrConn available. | ||
770 | cc.mu.RLock() | ||
771 | if cc.conns == nil { | ||
772 | cc.mu.RUnlock() | ||
773 | return nil, nil, toRPCErr(ErrClientConnClosing) | ||
774 | } | ||
775 | for _, ac = range cc.conns { | ||
776 | // Break after the first iteration to get the first addrConn. | ||
777 | ok = true | ||
778 | break | ||
779 | } | ||
780 | cc.mu.RUnlock() | ||
781 | } else { | ||
782 | var ( | ||
783 | addr Address | ||
784 | err error | ||
785 | ) | ||
786 | addr, put, err = cc.dopts.balancer.Get(ctx, opts) | ||
787 | if err != nil { | ||
788 | return nil, nil, toRPCErr(err) | ||
789 | } | ||
790 | cc.mu.RLock() | ||
791 | if cc.conns == nil { | ||
792 | cc.mu.RUnlock() | ||
793 | return nil, nil, toRPCErr(ErrClientConnClosing) | ||
794 | } | ||
795 | ac, ok = cc.conns[addr] | ||
796 | cc.mu.RUnlock() | ||
797 | } | ||
798 | if !ok { | ||
799 | if put != nil { | ||
800 | updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false}) | ||
801 | put() | ||
802 | } | ||
803 | return nil, nil, errConnClosing | ||
804 | } | ||
805 | t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait) | ||
806 | if err != nil { | ||
807 | if put != nil { | ||
808 | updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false}) | ||
809 | put() | ||
810 | } | ||
811 | return nil, nil, err | ||
812 | } | ||
813 | return t, put, nil | ||
814 | } | ||
815 | |||
816 | // Close tears down the ClientConn and all underlying connections. | ||
817 | func (cc *ClientConn) Close() error { | ||
818 | cc.cancel() | ||
819 | |||
820 | cc.mu.Lock() | ||
821 | if cc.conns == nil { | ||
822 | cc.mu.Unlock() | ||
823 | return ErrClientConnClosing | ||
824 | } | ||
825 | conns := cc.conns | ||
826 | cc.conns = nil | ||
827 | cc.csMgr.updateState(connectivity.Shutdown) | ||
828 | cc.mu.Unlock() | ||
829 | if cc.dopts.balancer != nil { | ||
830 | cc.dopts.balancer.Close() | ||
831 | } | ||
832 | for _, ac := range conns { | ||
833 | ac.tearDown(ErrClientConnClosing) | ||
834 | } | ||
835 | return nil | ||
836 | } | ||
837 | |||
838 | // addrConn is a network connection to a given address. | ||
839 | type addrConn struct { | ||
840 | ctx context.Context | ||
841 | cancel context.CancelFunc | ||
842 | |||
843 | cc *ClientConn | ||
844 | addr Address | ||
845 | dopts dialOptions | ||
846 | events trace.EventLog | ||
847 | |||
848 | csEvltr *connectivityStateEvaluator | ||
849 | |||
850 | mu sync.Mutex | ||
851 | state connectivity.State | ||
852 | down func(error) // the handler called when a connection is down. | ||
853 | // ready is closed and becomes nil when a new transport is up or failed | ||
854 | // due to timeout. | ||
855 | ready chan struct{} | ||
856 | transport transport.ClientTransport | ||
857 | |||
858 | // The reason this addrConn is torn down. | ||
859 | tearDownErr error | ||
860 | } | ||
861 | |||
862 | // adjustParams updates parameters used to create transports upon | ||
863 | // receiving a GoAway. | ||
864 | func (ac *addrConn) adjustParams(r transport.GoAwayReason) { | ||
865 | switch r { | ||
866 | case transport.TooManyPings: | ||
867 | v := 2 * ac.dopts.copts.KeepaliveParams.Time | ||
868 | ac.cc.mu.Lock() | ||
869 | if v > ac.cc.mkp.Time { | ||
870 | ac.cc.mkp.Time = v | ||
871 | } | ||
872 | ac.cc.mu.Unlock() | ||
873 | } | ||
874 | } | ||
875 | |||
876 | // printf records an event in ac's event log, unless ac has been closed. | ||
877 | // REQUIRES ac.mu is held. | ||
878 | func (ac *addrConn) printf(format string, a ...interface{}) { | ||
879 | if ac.events != nil { | ||
880 | ac.events.Printf(format, a...) | ||
881 | } | ||
882 | } | ||
883 | |||
884 | // errorf records an error in ac's event log, unless ac has been closed. | ||
885 | // REQUIRES ac.mu is held. | ||
886 | func (ac *addrConn) errorf(format string, a ...interface{}) { | ||
887 | if ac.events != nil { | ||
888 | ac.events.Errorf(format, a...) | ||
889 | } | ||
890 | } | ||
891 | |||
892 | // resetTransport recreates a transport to the address for ac. | ||
893 | // For the old transport: | ||
894 | // - if drain is true, it will be gracefully closed. | ||
895 | // - otherwise, it will be closed. | ||
896 | func (ac *addrConn) resetTransport(drain bool) error { | ||
897 | ac.mu.Lock() | ||
898 | if ac.state == connectivity.Shutdown { | ||
899 | ac.mu.Unlock() | ||
900 | return errConnClosing | ||
901 | } | ||
902 | ac.printf("connecting") | ||
903 | if ac.down != nil { | ||
904 | ac.down(downErrorf(false, true, "%v", errNetworkIO)) | ||
905 | ac.down = nil | ||
906 | } | ||
907 | oldState := ac.state | ||
908 | ac.state = connectivity.Connecting | ||
909 | ac.csEvltr.recordTransition(oldState, ac.state) | ||
910 | t := ac.transport | ||
911 | ac.transport = nil | ||
912 | ac.mu.Unlock() | ||
913 | if t != nil && !drain { | ||
914 | t.Close() | ||
915 | } | ||
916 | ac.cc.mu.RLock() | ||
917 | ac.dopts.copts.KeepaliveParams = ac.cc.mkp | ||
918 | ac.cc.mu.RUnlock() | ||
919 | for retries := 0; ; retries++ { | ||
920 | ac.mu.Lock() | ||
921 | if ac.state == connectivity.Shutdown { | ||
922 | // ac.tearDown(...) has been invoked. | ||
923 | ac.mu.Unlock() | ||
924 | return errConnClosing | ||
925 | } | ||
926 | ac.mu.Unlock() | ||
927 | sleepTime := ac.dopts.bs.backoff(retries) | ||
928 | timeout := minConnectTimeout | ||
929 | if timeout < sleepTime { | ||
930 | timeout = sleepTime | ||
931 | } | ||
932 | ctx, cancel := context.WithTimeout(ac.ctx, timeout) | ||
933 | connectTime := time.Now() | ||
934 | sinfo := transport.TargetInfo{ | ||
935 | Addr: ac.addr.Addr, | ||
936 | Metadata: ac.addr.Metadata, | ||
937 | } | ||
938 | newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts) | ||
939 | // Don't call cancel in success path due to a race in Go 1.6: | ||
940 | // https://github.com/golang/go/issues/15078. | ||
941 | if err != nil { | ||
942 | cancel() | ||
943 | |||
944 | if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { | ||
945 | return err | ||
946 | } | ||
947 | grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr) | ||
948 | ac.mu.Lock() | ||
949 | if ac.state == connectivity.Shutdown { | ||
950 | // ac.tearDown(...) has been invoked. | ||
951 | ac.mu.Unlock() | ||
952 | return errConnClosing | ||
953 | } | ||
954 | ac.errorf("transient failure: %v", err) | ||
955 | oldState = ac.state | ||
956 | ac.state = connectivity.TransientFailure | ||
957 | ac.csEvltr.recordTransition(oldState, ac.state) | ||
958 | if ac.ready != nil { | ||
959 | close(ac.ready) | ||
960 | ac.ready = nil | ||
961 | } | ||
962 | ac.mu.Unlock() | ||
963 | timer := time.NewTimer(sleepTime - time.Since(connectTime)) | ||
964 | select { | ||
965 | case <-timer.C: | ||
966 | case <-ac.ctx.Done(): | ||
967 | timer.Stop() | ||
968 | return ac.ctx.Err() | ||
969 | } | ||
970 | timer.Stop() | ||
971 | continue | ||
972 | } | ||
973 | ac.mu.Lock() | ||
974 | ac.printf("ready") | ||
975 | if ac.state == connectivity.Shutdown { | ||
976 | // ac.tearDown(...) has been invoked. | ||
977 | ac.mu.Unlock() | ||
978 | newTransport.Close() | ||
979 | return errConnClosing | ||
980 | } | ||
981 | oldState = ac.state | ||
982 | ac.state = connectivity.Ready | ||
983 | ac.csEvltr.recordTransition(oldState, ac.state) | ||
984 | ac.transport = newTransport | ||
985 | if ac.ready != nil { | ||
986 | close(ac.ready) | ||
987 | ac.ready = nil | ||
988 | } | ||
989 | if ac.cc.dopts.balancer != nil { | ||
990 | ac.down = ac.cc.dopts.balancer.Up(ac.addr) | ||
991 | } | ||
992 | ac.mu.Unlock() | ||
993 | return nil | ||
994 | } | ||
995 | } | ||
996 | |||
997 | // Run in a goroutine to track the error in transport and create the | ||
998 | // new transport if an error happens. It returns when the channel is closing. | ||
999 | func (ac *addrConn) transportMonitor() { | ||
1000 | for { | ||
1001 | ac.mu.Lock() | ||
1002 | t := ac.transport | ||
1003 | ac.mu.Unlock() | ||
1004 | select { | ||
1005 | // This is needed to detect the teardown when | ||
1006 | // the addrConn is idle (i.e., no RPC in flight). | ||
1007 | case <-ac.ctx.Done(): | ||
1008 | select { | ||
1009 | case <-t.Error(): | ||
1010 | t.Close() | ||
1011 | default: | ||
1012 | } | ||
1013 | return | ||
1014 | case <-t.GoAway(): | ||
1015 | ac.adjustParams(t.GetGoAwayReason()) | ||
1016 | // If GoAway happens without any network I/O error, the underlying transport | ||
1017 | // will be gracefully closed, and a new transport will be created. | ||
1018 | // (The transport will be closed when all the pending RPCs finished or failed.) | ||
1019 | // If GoAway and some network I/O error happen concurrently, the underlying transport | ||
1020 | // will be closed, and a new transport will be created. | ||
1021 | var drain bool | ||
1022 | select { | ||
1023 | case <-t.Error(): | ||
1024 | default: | ||
1025 | drain = true | ||
1026 | } | ||
1027 | if err := ac.resetTransport(drain); err != nil { | ||
1028 | grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err) | ||
1029 | if err != errConnClosing { | ||
1030 | // Keep this ac in cc.conns, to get the reason it's torn down. | ||
1031 | ac.tearDown(err) | ||
1032 | } | ||
1033 | return | ||
1034 | } | ||
1035 | case <-t.Error(): | ||
1036 | select { | ||
1037 | case <-ac.ctx.Done(): | ||
1038 | t.Close() | ||
1039 | return | ||
1040 | case <-t.GoAway(): | ||
1041 | ac.adjustParams(t.GetGoAwayReason()) | ||
1042 | if err := ac.resetTransport(false); err != nil { | ||
1043 | grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err) | ||
1044 | if err != errConnClosing { | ||
1045 | // Keep this ac in cc.conns, to get the reason it's torn down. | ||
1046 | ac.tearDown(err) | ||
1047 | } | ||
1048 | return | ||
1049 | } | ||
1050 | default: | ||
1051 | } | ||
1052 | ac.mu.Lock() | ||
1053 | if ac.state == connectivity.Shutdown { | ||
1054 | // ac has been shutdown. | ||
1055 | ac.mu.Unlock() | ||
1056 | return | ||
1057 | } | ||
1058 | oldState := ac.state | ||
1059 | ac.state = connectivity.TransientFailure | ||
1060 | ac.csEvltr.recordTransition(oldState, ac.state) | ||
1061 | ac.mu.Unlock() | ||
1062 | if err := ac.resetTransport(false); err != nil { | ||
1063 | grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err) | ||
1064 | ac.mu.Lock() | ||
1065 | ac.printf("transport exiting: %v", err) | ||
1066 | ac.mu.Unlock() | ||
1067 | grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err) | ||
1068 | if err != errConnClosing { | ||
1069 | // Keep this ac in cc.conns, to get the reason it's torn down. | ||
1070 | ac.tearDown(err) | ||
1071 | } | ||
1072 | return | ||
1073 | } | ||
1074 | } | ||
1075 | } | ||
1076 | } | ||
1077 | |||
1078 | // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or | ||
1079 | // iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true. | ||
1080 | func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) { | ||
1081 | for { | ||
1082 | ac.mu.Lock() | ||
1083 | switch { | ||
1084 | case ac.state == connectivity.Shutdown: | ||
1085 | if failfast || !hasBalancer { | ||
1086 | // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr. | ||
1087 | err := ac.tearDownErr | ||
1088 | ac.mu.Unlock() | ||
1089 | return nil, err | ||
1090 | } | ||
1091 | ac.mu.Unlock() | ||
1092 | return nil, errConnClosing | ||
1093 | case ac.state == connectivity.Ready: | ||
1094 | ct := ac.transport | ||
1095 | ac.mu.Unlock() | ||
1096 | return ct, nil | ||
1097 | case ac.state == connectivity.TransientFailure: | ||
1098 | if failfast || hasBalancer { | ||
1099 | ac.mu.Unlock() | ||
1100 | return nil, errConnUnavailable | ||
1101 | } | ||
1102 | } | ||
1103 | ready := ac.ready | ||
1104 | if ready == nil { | ||
1105 | ready = make(chan struct{}) | ||
1106 | ac.ready = ready | ||
1107 | } | ||
1108 | ac.mu.Unlock() | ||
1109 | select { | ||
1110 | case <-ctx.Done(): | ||
1111 | return nil, toRPCErr(ctx.Err()) | ||
1112 | // Wait until the new transport is ready or failed. | ||
1113 | case <-ready: | ||
1114 | } | ||
1115 | } | ||
1116 | } | ||
1117 | |||
1118 | // tearDown starts to tear down the addrConn. | ||
1119 | // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in | ||
1120 | // some edge cases (e.g., the caller opens and closes many addrConn's in a | ||
1121 | // tight loop. | ||
1122 | // tearDown doesn't remove ac from ac.cc.conns. | ||
1123 | func (ac *addrConn) tearDown(err error) { | ||
1124 | ac.cancel() | ||
1125 | |||
1126 | ac.mu.Lock() | ||
1127 | defer ac.mu.Unlock() | ||
1128 | if ac.down != nil { | ||
1129 | ac.down(downErrorf(false, false, "%v", err)) | ||
1130 | ac.down = nil | ||
1131 | } | ||
1132 | if err == errConnDrain && ac.transport != nil { | ||
1133 | // GracefulClose(...) may be executed multiple times when | ||
1134 | // i) receiving multiple GoAway frames from the server; or | ||
1135 | // ii) there are concurrent name resolver/Balancer triggered | ||
1136 | // address removal and GoAway. | ||
1137 | ac.transport.GracefulClose() | ||
1138 | } | ||
1139 | if ac.state == connectivity.Shutdown { | ||
1140 | return | ||
1141 | } | ||
1142 | oldState := ac.state | ||
1143 | ac.state = connectivity.Shutdown | ||
1144 | ac.tearDownErr = err | ||
1145 | ac.csEvltr.recordTransition(oldState, ac.state) | ||
1146 | if ac.events != nil { | ||
1147 | ac.events.Finish() | ||
1148 | ac.events = nil | ||
1149 | } | ||
1150 | if ac.ready != nil { | ||
1151 | close(ac.ready) | ||
1152 | ac.ready = nil | ||
1153 | } | ||
1154 | if ac.transport != nil && err != errConnDrain { | ||
1155 | ac.transport.Close() | ||
1156 | } | ||
1157 | return | ||
1158 | } | ||