]>
Commit | Line | Data |
---|---|---|
1 | /* | |
2 | * | |
3 | * Copyright 2014 gRPC authors. | |
4 | * | |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | * | |
17 | */ | |
18 | ||
19 | package grpc | |
20 | ||
21 | import ( | |
22 | "errors" | |
23 | "net" | |
24 | "strings" | |
25 | "sync" | |
26 | "time" | |
27 | ||
28 | "golang.org/x/net/context" | |
29 | "golang.org/x/net/trace" | |
30 | "google.golang.org/grpc/connectivity" | |
31 | "google.golang.org/grpc/credentials" | |
32 | "google.golang.org/grpc/grpclog" | |
33 | "google.golang.org/grpc/keepalive" | |
34 | "google.golang.org/grpc/stats" | |
35 | "google.golang.org/grpc/transport" | |
36 | ) | |
37 | ||
var (
	// ErrClientConnClosing indicates that the operation is illegal because
	// the ClientConn is closing.
	ErrClientConnClosing = errors.New("grpc: the client connection is closing")
	// ErrClientConnTimeout indicates that the ClientConn cannot establish the
	// underlying connections within the specified timeout.
	//
	// Deprecated: Please use context.DeadlineExceeded instead.
	ErrClientConnTimeout = errors.New("grpc: timed out when dialing")

	// errNoTransportSecurity indicates that no transport security was set for
	// the ClientConn. Users should either set one or explicitly use the
	// WithInsecure DialOption to disable security.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
	// errTransportCredentialsMissing indicates that users want to transmit
	// security information (e.g., an OAuth2 token) that requires a secure
	// connection over an insecure connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
	// errCredentialsConflict indicates that grpc.WithTransportCredentials()
	// and grpc.WithInsecure() are both used for a connection.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
	// errNetworkIO indicates that the connection is down due to some network I/O error.
	errNetworkIO = errors.New("grpc: failed with network I/O error")
	// errConnDrain indicates that the connection has started draining and
	// does not accept any new RPCs.
	errConnDrain = errors.New("grpc: the connection is drained")
	// errConnClosing indicates that the connection is closing.
	errConnClosing = errors.New("grpc: the connection is closing")
	// errConnUnavailable indicates that the connection is unavailable.
	errConnUnavailable = errors.New("grpc: the connection is unavailable")
	// errBalancerClosed indicates that the balancer is closed.
	errBalancerClosed = errors.New("grpc: balancer is closed")
	// minConnectTimeout is the minimum time to give a connection to complete.
	minConnectTimeout = 20 * time.Second
)
71 | ||
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial; DialContext applies them in order and then fills in
// defaults for some unset fields.
type dialOptions struct {
	unaryInt    UnaryClientInterceptor   // set by WithUnaryInterceptor
	streamInt   StreamClientInterceptor  // set by WithStreamInterceptor
	codec       Codec                    // set by WithCodec; DialContext defaults it to protoCodec{}
	cp          Compressor               // set by WithCompressor
	dc          Decompressor             // set by WithDecompressor
	bs          backoffStrategy          // set via withBackoff; DialContext defaults it to DefaultBackoffConfig
	balancer    Balancer                 // set by WithBalancer; DialContext may fill it from the service config's LB
	block       bool                     // set by WithBlock; connect synchronously during Dial
	insecure    bool                     // set by WithInsecure
	timeout     time.Duration            // set by WithTimeout; when > 0 DialContext bounds its ctx with it
	scChan      <-chan ServiceConfig     // set by WithServiceConfig
	copts       transport.ConnectOptions // transport-level settings filled in by many WithXxx options
	callOptions []CallOption             // appended to by WithDefaultCallOptions (and WithMaxMsgSize)
}
89 | ||
// Default client-side message size limits. 4 << 20 == 4 * 1024 * 1024,
// i.e. 4 MiB when interpreted as a byte count.
const (
	defaultClientMaxReceiveMessageSize = 4 << 20
	defaultClientMaxSendMessageSize    = 4 << 20
)
94 | ||
// DialOption configures how we set up the connection.
// Each DialOption mutates the dialOptions of the ClientConn being built;
// DialContext applies them in the order they are passed.
type DialOption func(*dialOptions)
97 | ||
98 | // WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream. | |
99 | // The lower bound for window size is 64K and any value smaller than that will be ignored. | |
100 | func WithInitialWindowSize(s int32) DialOption { | |
101 | return func(o *dialOptions) { | |
102 | o.copts.InitialWindowSize = s | |
103 | } | |
104 | } | |
105 | ||
106 | // WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection. | |
107 | // The lower bound for window size is 64K and any value smaller than that will be ignored. | |
108 | func WithInitialConnWindowSize(s int32) DialOption { | |
109 | return func(o *dialOptions) { | |
110 | o.copts.InitialConnWindowSize = s | |
111 | } | |
112 | } | |
113 | ||
114 | // WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead. | |
115 | func WithMaxMsgSize(s int) DialOption { | |
116 | return WithDefaultCallOptions(MaxCallRecvMsgSize(s)) | |
117 | } | |
118 | ||
119 | // WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection. | |
120 | func WithDefaultCallOptions(cos ...CallOption) DialOption { | |
121 | return func(o *dialOptions) { | |
122 | o.callOptions = append(o.callOptions, cos...) | |
123 | } | |
124 | } | |
125 | ||
126 | // WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling. | |
127 | func WithCodec(c Codec) DialOption { | |
128 | return func(o *dialOptions) { | |
129 | o.codec = c | |
130 | } | |
131 | } | |
132 | ||
133 | // WithCompressor returns a DialOption which sets a CompressorGenerator for generating message | |
134 | // compressor. | |
135 | func WithCompressor(cp Compressor) DialOption { | |
136 | return func(o *dialOptions) { | |
137 | o.cp = cp | |
138 | } | |
139 | } | |
140 | ||
141 | // WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating | |
142 | // message decompressor. | |
143 | func WithDecompressor(dc Decompressor) DialOption { | |
144 | return func(o *dialOptions) { | |
145 | o.dc = dc | |
146 | } | |
147 | } | |
148 | ||
149 | // WithBalancer returns a DialOption which sets a load balancer. | |
150 | func WithBalancer(b Balancer) DialOption { | |
151 | return func(o *dialOptions) { | |
152 | o.balancer = b | |
153 | } | |
154 | } | |
155 | ||
156 | // WithServiceConfig returns a DialOption which has a channel to read the service configuration. | |
157 | func WithServiceConfig(c <-chan ServiceConfig) DialOption { | |
158 | return func(o *dialOptions) { | |
159 | o.scChan = c | |
160 | } | |
161 | } | |
162 | ||
163 | // WithBackoffMaxDelay configures the dialer to use the provided maximum delay | |
164 | // when backing off after failed connection attempts. | |
165 | func WithBackoffMaxDelay(md time.Duration) DialOption { | |
166 | return WithBackoffConfig(BackoffConfig{MaxDelay: md}) | |
167 | } | |
168 | ||
169 | // WithBackoffConfig configures the dialer to use the provided backoff | |
170 | // parameters after connection failures. | |
171 | // | |
172 | // Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up | |
173 | // for use. | |
174 | func WithBackoffConfig(b BackoffConfig) DialOption { | |
175 | // Set defaults to ensure that provided BackoffConfig is valid and | |
176 | // unexported fields get default values. | |
177 | setDefaults(&b) | |
178 | return withBackoff(b) | |
179 | } | |
180 | ||
181 | // withBackoff sets the backoff strategy used for retries after a | |
182 | // failed connection attempt. | |
183 | // | |
184 | // This can be exported if arbitrary backoff strategies are allowed by gRPC. | |
185 | func withBackoff(bs backoffStrategy) DialOption { | |
186 | return func(o *dialOptions) { | |
187 | o.bs = bs | |
188 | } | |
189 | } | |
190 | ||
191 | // WithBlock returns a DialOption which makes caller of Dial blocks until the underlying | |
192 | // connection is up. Without this, Dial returns immediately and connecting the server | |
193 | // happens in background. | |
194 | func WithBlock() DialOption { | |
195 | return func(o *dialOptions) { | |
196 | o.block = true | |
197 | } | |
198 | } | |
199 | ||
200 | // WithInsecure returns a DialOption which disables transport security for this ClientConn. | |
201 | // Note that transport security is required unless WithInsecure is set. | |
202 | func WithInsecure() DialOption { | |
203 | return func(o *dialOptions) { | |
204 | o.insecure = true | |
205 | } | |
206 | } | |
207 | ||
208 | // WithTransportCredentials returns a DialOption which configures a | |
209 | // connection level security credentials (e.g., TLS/SSL). | |
210 | func WithTransportCredentials(creds credentials.TransportCredentials) DialOption { | |
211 | return func(o *dialOptions) { | |
212 | o.copts.TransportCredentials = creds | |
213 | } | |
214 | } | |
215 | ||
216 | // WithPerRPCCredentials returns a DialOption which sets | |
217 | // credentials and places auth state on each outbound RPC. | |
218 | func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption { | |
219 | return func(o *dialOptions) { | |
220 | o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds) | |
221 | } | |
222 | } | |
223 | ||
224 | // WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn | |
225 | // initially. This is valid if and only if WithBlock() is present. | |
226 | // Deprecated: use DialContext and context.WithTimeout instead. | |
227 | func WithTimeout(d time.Duration) DialOption { | |
228 | return func(o *dialOptions) { | |
229 | o.timeout = d | |
230 | } | |
231 | } | |
232 | ||
233 | // WithDialer returns a DialOption that specifies a function to use for dialing network addresses. | |
234 | // If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's | |
235 | // Temporary() method to decide if it should try to reconnect to the network address. | |
236 | func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption { | |
237 | return func(o *dialOptions) { | |
238 | o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) { | |
239 | if deadline, ok := ctx.Deadline(); ok { | |
240 | return f(addr, deadline.Sub(time.Now())) | |
241 | } | |
242 | return f(addr, 0) | |
243 | } | |
244 | } | |
245 | } | |
246 | ||
247 | // WithStatsHandler returns a DialOption that specifies the stats handler | |
248 | // for all the RPCs and underlying network connections in this ClientConn. | |
249 | func WithStatsHandler(h stats.Handler) DialOption { | |
250 | return func(o *dialOptions) { | |
251 | o.copts.StatsHandler = h | |
252 | } | |
253 | } | |
254 | ||
255 | // FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors. | |
256 | // If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network | |
257 | // address and won't try to reconnect. | |
258 | // The default value of FailOnNonTempDialError is false. | |
259 | // This is an EXPERIMENTAL API. | |
260 | func FailOnNonTempDialError(f bool) DialOption { | |
261 | return func(o *dialOptions) { | |
262 | o.copts.FailOnNonTempDialError = f | |
263 | } | |
264 | } | |
265 | ||
266 | // WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs. | |
267 | func WithUserAgent(s string) DialOption { | |
268 | return func(o *dialOptions) { | |
269 | o.copts.UserAgent = s | |
270 | } | |
271 | } | |
272 | ||
273 | // WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport. | |
274 | func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption { | |
275 | return func(o *dialOptions) { | |
276 | o.copts.KeepaliveParams = kp | |
277 | } | |
278 | } | |
279 | ||
280 | // WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs. | |
281 | func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption { | |
282 | return func(o *dialOptions) { | |
283 | o.unaryInt = f | |
284 | } | |
285 | } | |
286 | ||
287 | // WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs. | |
288 | func WithStreamInterceptor(f StreamClientInterceptor) DialOption { | |
289 | return func(o *dialOptions) { | |
290 | o.streamInt = f | |
291 | } | |
292 | } | |
293 | ||
294 | // WithAuthority returns a DialOption that specifies the value to be used as | |
295 | // the :authority pseudo-header. This value only works with WithInsecure and | |
296 | // has no effect if TransportCredentials are present. | |
297 | func WithAuthority(a string) DialOption { | |
298 | return func(o *dialOptions) { | |
299 | o.copts.Authority = a | |
300 | } | |
301 | } | |
302 | ||
303 | // Dial creates a client connection to the given target. | |
304 | func Dial(target string, opts ...DialOption) (*ClientConn, error) { | |
305 | return DialContext(context.Background(), target, opts...) | |
306 | } | |
307 | ||
308 | // DialContext creates a client connection to the given target. ctx can be used to | |
309 | // cancel or expire the pending connection. Once this function returns, the | |
310 | // cancellation and expiration of ctx will be noop. Users should call ClientConn.Close | |
311 | // to terminate all the pending operations after this function returns. | |
312 | func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { | |
313 | cc := &ClientConn{ | |
314 | target: target, | |
315 | csMgr: &connectivityStateManager{}, | |
316 | conns: make(map[Address]*addrConn), | |
317 | } | |
318 | cc.csEvltr = &connectivityStateEvaluator{csMgr: cc.csMgr} | |
319 | cc.ctx, cc.cancel = context.WithCancel(context.Background()) | |
320 | ||
321 | for _, opt := range opts { | |
322 | opt(&cc.dopts) | |
323 | } | |
324 | cc.mkp = cc.dopts.copts.KeepaliveParams | |
325 | ||
326 | if cc.dopts.copts.Dialer == nil { | |
327 | cc.dopts.copts.Dialer = newProxyDialer( | |
328 | func(ctx context.Context, addr string) (net.Conn, error) { | |
329 | return dialContext(ctx, "tcp", addr) | |
330 | }, | |
331 | ) | |
332 | } | |
333 | ||
334 | if cc.dopts.copts.UserAgent != "" { | |
335 | cc.dopts.copts.UserAgent += " " + grpcUA | |
336 | } else { | |
337 | cc.dopts.copts.UserAgent = grpcUA | |
338 | } | |
339 | ||
340 | if cc.dopts.timeout > 0 { | |
341 | var cancel context.CancelFunc | |
342 | ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) | |
343 | defer cancel() | |
344 | } | |
345 | ||
346 | defer func() { | |
347 | select { | |
348 | case <-ctx.Done(): | |
349 | conn, err = nil, ctx.Err() | |
350 | default: | |
351 | } | |
352 | ||
353 | if err != nil { | |
354 | cc.Close() | |
355 | } | |
356 | }() | |
357 | ||
358 | scSet := false | |
359 | if cc.dopts.scChan != nil { | |
360 | // Try to get an initial service config. | |
361 | select { | |
362 | case sc, ok := <-cc.dopts.scChan: | |
363 | if ok { | |
364 | cc.sc = sc | |
365 | scSet = true | |
366 | } | |
367 | default: | |
368 | } | |
369 | } | |
370 | // Set defaults. | |
371 | if cc.dopts.codec == nil { | |
372 | cc.dopts.codec = protoCodec{} | |
373 | } | |
374 | if cc.dopts.bs == nil { | |
375 | cc.dopts.bs = DefaultBackoffConfig | |
376 | } | |
377 | creds := cc.dopts.copts.TransportCredentials | |
378 | if creds != nil && creds.Info().ServerName != "" { | |
379 | cc.authority = creds.Info().ServerName | |
380 | } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" { | |
381 | cc.authority = cc.dopts.copts.Authority | |
382 | } else { | |
383 | cc.authority = target | |
384 | } | |
385 | waitC := make(chan error, 1) | |
386 | go func() { | |
387 | defer close(waitC) | |
388 | if cc.dopts.balancer == nil && cc.sc.LB != nil { | |
389 | cc.dopts.balancer = cc.sc.LB | |
390 | } | |
391 | if cc.dopts.balancer != nil { | |
392 | var credsClone credentials.TransportCredentials | |
393 | if creds != nil { | |
394 | credsClone = creds.Clone() | |
395 | } | |
396 | config := BalancerConfig{ | |
397 | DialCreds: credsClone, | |
398 | Dialer: cc.dopts.copts.Dialer, | |
399 | } | |
400 | if err := cc.dopts.balancer.Start(target, config); err != nil { | |
401 | waitC <- err | |
402 | return | |
403 | } | |
404 | ch := cc.dopts.balancer.Notify() | |
405 | if ch != nil { | |
406 | if cc.dopts.block { | |
407 | doneChan := make(chan struct{}) | |
408 | go cc.lbWatcher(doneChan) | |
409 | <-doneChan | |
410 | } else { | |
411 | go cc.lbWatcher(nil) | |
412 | } | |
413 | return | |
414 | } | |
415 | } | |
416 | // No balancer, or no resolver within the balancer. Connect directly. | |
417 | if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil { | |
418 | waitC <- err | |
419 | return | |
420 | } | |
421 | }() | |
422 | select { | |
423 | case <-ctx.Done(): | |
424 | return nil, ctx.Err() | |
425 | case err := <-waitC: | |
426 | if err != nil { | |
427 | return nil, err | |
428 | } | |
429 | } | |
430 | if cc.dopts.scChan != nil && !scSet { | |
431 | // Blocking wait for the initial service config. | |
432 | select { | |
433 | case sc, ok := <-cc.dopts.scChan: | |
434 | if ok { | |
435 | cc.sc = sc | |
436 | } | |
437 | case <-ctx.Done(): | |
438 | return nil, ctx.Err() | |
439 | } | |
440 | } | |
441 | if cc.dopts.scChan != nil { | |
442 | go cc.scWatcher() | |
443 | } | |
444 | ||
445 | return cc, nil | |
446 | } | |
447 | ||
// connectivityStateEvaluator gets updated by addrConns when their
// states transition, based on which it evaluates the state of
// ClientConn.
// Note: This code will eventually sit in the balancer in the new design.
type connectivityStateEvaluator struct {
	csMgr               *connectivityStateManager // receives the aggregated ClientConn state
	mu                  sync.Mutex                // guards the counters below
	numReady            uint64                    // Number of addrConns in ready state.
	numConnecting       uint64                    // Number of addrConns in connecting state.
	numTransientFailure uint64                    // Number of addrConns in transientFailure.
}
459 | ||
460 | // recordTransition records state change happening in every addrConn and based on | |
461 | // that it evaluates what state the ClientConn is in. | |
462 | // It can only transition between connectivity.Ready, connectivity.Connecting and connectivity.TransientFailure. Other states, | |
463 | // Idle and connectivity.Shutdown are transitioned into by ClientConn; in the begining of the connection | |
464 | // before any addrConn is created ClientConn is in idle state. In the end when ClientConn | |
465 | // closes it is in connectivity.Shutdown state. | |
466 | // TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state. | |
467 | func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) { | |
468 | cse.mu.Lock() | |
469 | defer cse.mu.Unlock() | |
470 | ||
471 | // Update counters. | |
472 | for idx, state := range []connectivity.State{oldState, newState} { | |
473 | updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. | |
474 | switch state { | |
475 | case connectivity.Ready: | |
476 | cse.numReady += updateVal | |
477 | case connectivity.Connecting: | |
478 | cse.numConnecting += updateVal | |
479 | case connectivity.TransientFailure: | |
480 | cse.numTransientFailure += updateVal | |
481 | } | |
482 | } | |
483 | ||
484 | // Evaluate. | |
485 | if cse.numReady > 0 { | |
486 | cse.csMgr.updateState(connectivity.Ready) | |
487 | return | |
488 | } | |
489 | if cse.numConnecting > 0 { | |
490 | cse.csMgr.updateState(connectivity.Connecting) | |
491 | return | |
492 | } | |
493 | cse.csMgr.updateState(connectivity.TransientFailure) | |
494 | } | |
495 | ||
// connectivityStateManager keeps the connectivity.State of ClientConn.
// This struct will eventually be exported so the balancers can access it.
type connectivityStateManager struct {
	mu         sync.Mutex         // guards state and notifyChan
	state      connectivity.State // current state; updateState never moves it out of Shutdown
	notifyChan chan struct{}      // created lazily by getNotifyChan; closed and reset to nil on each state change
}
503 | ||
504 | // updateState updates the connectivity.State of ClientConn. | |
505 | // If there's a change it notifies goroutines waiting on state change to | |
506 | // happen. | |
507 | func (csm *connectivityStateManager) updateState(state connectivity.State) { | |
508 | csm.mu.Lock() | |
509 | defer csm.mu.Unlock() | |
510 | if csm.state == connectivity.Shutdown { | |
511 | return | |
512 | } | |
513 | if csm.state == state { | |
514 | return | |
515 | } | |
516 | csm.state = state | |
517 | if csm.notifyChan != nil { | |
518 | // There are other goroutines waiting on this channel. | |
519 | close(csm.notifyChan) | |
520 | csm.notifyChan = nil | |
521 | } | |
522 | } | |
523 | ||
524 | func (csm *connectivityStateManager) getState() connectivity.State { | |
525 | csm.mu.Lock() | |
526 | defer csm.mu.Unlock() | |
527 | return csm.state | |
528 | } | |
529 | ||
530 | func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} { | |
531 | csm.mu.Lock() | |
532 | defer csm.mu.Unlock() | |
533 | if csm.notifyChan == nil { | |
534 | csm.notifyChan = make(chan struct{}) | |
535 | } | |
536 | return csm.notifyChan | |
537 | } | |
538 | ||
// ClientConn represents a client connection to an RPC server.
type ClientConn struct {
	// ctx bounds the lifetime of the ClientConn: Close calls cancel, and each
	// addrConn's context is derived from ctx.
	ctx    context.Context
	cancel context.CancelFunc

	target    string
	authority string // chosen in DialContext: creds ServerName, WithAuthority (insecure only), or target
	dopts     dialOptions
	csMgr     *connectivityStateManager
	csEvltr   *connectivityStateEvaluator // This will eventually be part of balancer.

	mu    sync.RWMutex // guards sc, conns and mkp
	sc    ServiceConfig
	conns map[Address]*addrConn // set to nil by Close
	// Keepalive parameter can be updated if a GoAway is received.
	mkp keepalive.ClientParameters
}
556 | ||
557 | // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or | |
558 | // ctx expires. A true value is returned in former case and false in latter. | |
559 | // This is an EXPERIMENTAL API. | |
560 | func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool { | |
561 | ch := cc.csMgr.getNotifyChan() | |
562 | if cc.csMgr.getState() != sourceState { | |
563 | return true | |
564 | } | |
565 | select { | |
566 | case <-ctx.Done(): | |
567 | return false | |
568 | case <-ch: | |
569 | return true | |
570 | } | |
571 | } | |
572 | ||
573 | // GetState returns the connectivity.State of ClientConn. | |
574 | // This is an EXPERIMENTAL API. | |
575 | func (cc *ClientConn) GetState() connectivity.State { | |
576 | return cc.csMgr.getState() | |
577 | } | |
578 | ||
579 | // lbWatcher watches the Notify channel of the balancer in cc and manages | |
580 | // connections accordingly. If doneChan is not nil, it is closed after the | |
581 | // first successfull connection is made. | |
582 | func (cc *ClientConn) lbWatcher(doneChan chan struct{}) { | |
583 | defer func() { | |
584 | // In case channel from cc.dopts.balancer.Notify() gets closed before a | |
585 | // successful connection gets established, don't forget to notify the | |
586 | // caller. | |
587 | if doneChan != nil { | |
588 | close(doneChan) | |
589 | } | |
590 | }() | |
591 | ||
592 | for addrs := range cc.dopts.balancer.Notify() { | |
593 | var ( | |
594 | add []Address // Addresses need to setup connections. | |
595 | del []*addrConn // Connections need to tear down. | |
596 | ) | |
597 | cc.mu.Lock() | |
598 | for _, a := range addrs { | |
599 | if _, ok := cc.conns[a]; !ok { | |
600 | add = append(add, a) | |
601 | } | |
602 | } | |
603 | for k, c := range cc.conns { | |
604 | var keep bool | |
605 | for _, a := range addrs { | |
606 | if k == a { | |
607 | keep = true | |
608 | break | |
609 | } | |
610 | } | |
611 | if !keep { | |
612 | del = append(del, c) | |
613 | delete(cc.conns, c.addr) | |
614 | } | |
615 | } | |
616 | cc.mu.Unlock() | |
617 | for _, a := range add { | |
618 | var err error | |
619 | if doneChan != nil { | |
620 | err = cc.resetAddrConn(a, true, nil) | |
621 | if err == nil { | |
622 | close(doneChan) | |
623 | doneChan = nil | |
624 | } | |
625 | } else { | |
626 | err = cc.resetAddrConn(a, false, nil) | |
627 | } | |
628 | if err != nil { | |
629 | grpclog.Warningf("Error creating connection to %v. Err: %v", a, err) | |
630 | } | |
631 | } | |
632 | for _, c := range del { | |
633 | c.tearDown(errConnDrain) | |
634 | } | |
635 | } | |
636 | } | |
637 | ||
638 | func (cc *ClientConn) scWatcher() { | |
639 | for { | |
640 | select { | |
641 | case sc, ok := <-cc.dopts.scChan: | |
642 | if !ok { | |
643 | return | |
644 | } | |
645 | cc.mu.Lock() | |
646 | // TODO: load balance policy runtime change is ignored. | |
647 | // We may revist this decision in the future. | |
648 | cc.sc = sc | |
649 | cc.mu.Unlock() | |
650 | case <-cc.ctx.Done(): | |
651 | return | |
652 | } | |
653 | } | |
654 | } | |
655 | ||
656 | // resetAddrConn creates an addrConn for addr and adds it to cc.conns. | |
657 | // If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason. | |
658 | // If tearDownErr is nil, errConnDrain will be used instead. | |
659 | // | |
660 | // We should never need to replace an addrConn with a new one. This function is only used | |
661 | // as newAddrConn to create new addrConn. | |
662 | // TODO rename this function and clean up the code. | |
663 | func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error { | |
664 | ac := &addrConn{ | |
665 | cc: cc, | |
666 | addr: addr, | |
667 | dopts: cc.dopts, | |
668 | } | |
669 | ac.ctx, ac.cancel = context.WithCancel(cc.ctx) | |
670 | ac.csEvltr = cc.csEvltr | |
671 | if EnableTracing { | |
672 | ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr) | |
673 | } | |
674 | if !ac.dopts.insecure { | |
675 | if ac.dopts.copts.TransportCredentials == nil { | |
676 | return errNoTransportSecurity | |
677 | } | |
678 | } else { | |
679 | if ac.dopts.copts.TransportCredentials != nil { | |
680 | return errCredentialsConflict | |
681 | } | |
682 | for _, cd := range ac.dopts.copts.PerRPCCredentials { | |
683 | if cd.RequireTransportSecurity() { | |
684 | return errTransportCredentialsMissing | |
685 | } | |
686 | } | |
687 | } | |
688 | // Track ac in cc. This needs to be done before any getTransport(...) is called. | |
689 | cc.mu.Lock() | |
690 | if cc.conns == nil { | |
691 | cc.mu.Unlock() | |
692 | return ErrClientConnClosing | |
693 | } | |
694 | stale := cc.conns[ac.addr] | |
695 | cc.conns[ac.addr] = ac | |
696 | cc.mu.Unlock() | |
697 | if stale != nil { | |
698 | // There is an addrConn alive on ac.addr already. This could be due to | |
699 | // a buggy Balancer that reports duplicated Addresses. | |
700 | if tearDownErr == nil { | |
701 | // tearDownErr is nil if resetAddrConn is called by | |
702 | // 1) Dial | |
703 | // 2) lbWatcher | |
704 | // In both cases, the stale ac should drain, not close. | |
705 | stale.tearDown(errConnDrain) | |
706 | } else { | |
707 | stale.tearDown(tearDownErr) | |
708 | } | |
709 | } | |
710 | if block { | |
711 | if err := ac.resetTransport(false); err != nil { | |
712 | if err != errConnClosing { | |
713 | // Tear down ac and delete it from cc.conns. | |
714 | cc.mu.Lock() | |
715 | delete(cc.conns, ac.addr) | |
716 | cc.mu.Unlock() | |
717 | ac.tearDown(err) | |
718 | } | |
719 | if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { | |
720 | return e.Origin() | |
721 | } | |
722 | return err | |
723 | } | |
724 | // Start to monitor the error status of transport. | |
725 | go ac.transportMonitor() | |
726 | } else { | |
727 | // Start a goroutine connecting to the server asynchronously. | |
728 | go func() { | |
729 | if err := ac.resetTransport(false); err != nil { | |
730 | grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err) | |
731 | if err != errConnClosing { | |
732 | // Keep this ac in cc.conns, to get the reason it's torn down. | |
733 | ac.tearDown(err) | |
734 | } | |
735 | return | |
736 | } | |
737 | ac.transportMonitor() | |
738 | }() | |
739 | } | |
740 | return nil | |
741 | } | |
742 | ||
743 | // GetMethodConfig gets the method config of the input method. | |
744 | // If there's an exact match for input method (i.e. /service/method), we return | |
745 | // the corresponding MethodConfig. | |
746 | // If there isn't an exact match for the input method, we look for the default config | |
747 | // under the service (i.e /service/). If there is a default MethodConfig for | |
748 | // the serivce, we return it. | |
749 | // Otherwise, we return an empty MethodConfig. | |
750 | func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { | |
751 | // TODO: Avoid the locking here. | |
752 | cc.mu.RLock() | |
753 | defer cc.mu.RUnlock() | |
754 | m, ok := cc.sc.Methods[method] | |
755 | if !ok { | |
756 | i := strings.LastIndex(method, "/") | |
757 | m, _ = cc.sc.Methods[method[:i+1]] | |
758 | } | |
759 | return m | |
760 | } | |
761 | ||
762 | func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { | |
763 | var ( | |
764 | ac *addrConn | |
765 | ok bool | |
766 | put func() | |
767 | ) | |
768 | if cc.dopts.balancer == nil { | |
769 | // If balancer is nil, there should be only one addrConn available. | |
770 | cc.mu.RLock() | |
771 | if cc.conns == nil { | |
772 | cc.mu.RUnlock() | |
773 | return nil, nil, toRPCErr(ErrClientConnClosing) | |
774 | } | |
775 | for _, ac = range cc.conns { | |
776 | // Break after the first iteration to get the first addrConn. | |
777 | ok = true | |
778 | break | |
779 | } | |
780 | cc.mu.RUnlock() | |
781 | } else { | |
782 | var ( | |
783 | addr Address | |
784 | err error | |
785 | ) | |
786 | addr, put, err = cc.dopts.balancer.Get(ctx, opts) | |
787 | if err != nil { | |
788 | return nil, nil, toRPCErr(err) | |
789 | } | |
790 | cc.mu.RLock() | |
791 | if cc.conns == nil { | |
792 | cc.mu.RUnlock() | |
793 | return nil, nil, toRPCErr(ErrClientConnClosing) | |
794 | } | |
795 | ac, ok = cc.conns[addr] | |
796 | cc.mu.RUnlock() | |
797 | } | |
798 | if !ok { | |
799 | if put != nil { | |
800 | updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false}) | |
801 | put() | |
802 | } | |
803 | return nil, nil, errConnClosing | |
804 | } | |
805 | t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait) | |
806 | if err != nil { | |
807 | if put != nil { | |
808 | updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false}) | |
809 | put() | |
810 | } | |
811 | return nil, nil, err | |
812 | } | |
813 | return t, put, nil | |
814 | } | |
815 | ||
816 | // Close tears down the ClientConn and all underlying connections. | |
817 | func (cc *ClientConn) Close() error { | |
818 | cc.cancel() | |
819 | ||
820 | cc.mu.Lock() | |
821 | if cc.conns == nil { | |
822 | cc.mu.Unlock() | |
823 | return ErrClientConnClosing | |
824 | } | |
825 | conns := cc.conns | |
826 | cc.conns = nil | |
827 | cc.csMgr.updateState(connectivity.Shutdown) | |
828 | cc.mu.Unlock() | |
829 | if cc.dopts.balancer != nil { | |
830 | cc.dopts.balancer.Close() | |
831 | } | |
832 | for _, ac := range conns { | |
833 | ac.tearDown(ErrClientConnClosing) | |
834 | } | |
835 | return nil | |
836 | } | |
837 | ||
// addrConn is a network connection to a given address.
type addrConn struct {
	// ctx is cancelled through cancel by tearDown. resetTransport and
	// transportMonitor watch ctx.Done() to stop reconnecting.
	ctx    context.Context
	cancel context.CancelFunc

	cc   *ClientConn
	addr Address
	// dopts is this addrConn's own copy of the dial options. resetTransport
	// refreshes dopts.copts.KeepaliveParams from cc.mkp before connecting.
	dopts dialOptions
	// events is the trace event log. tearDown finishes it and sets it to nil,
	// after which printf and errorf become no-ops.
	events trace.EventLog

	// csEvltr records every connectivity state transition of this addrConn.
	csEvltr *connectivityStateEvaluator

	// mu is held in this file whenever state, down, ready, transport and
	// tearDownErr are read or written.
	mu    sync.Mutex
	state connectivity.State
	down  func(error) // the handler called when a connection is down.
	// ready is closed and becomes nil when a new transport is up or failed
	// due to timeout (and also when the addrConn is torn down). wait creates
	// it lazily when it needs to block.
	ready     chan struct{}
	transport transport.ClientTransport

	// The reason this addrConn is torn down.
	tearDownErr error
}
861 | ||
862 | // adjustParams updates parameters used to create transports upon | |
863 | // receiving a GoAway. | |
864 | func (ac *addrConn) adjustParams(r transport.GoAwayReason) { | |
865 | switch r { | |
866 | case transport.TooManyPings: | |
867 | v := 2 * ac.dopts.copts.KeepaliveParams.Time | |
868 | ac.cc.mu.Lock() | |
869 | if v > ac.cc.mkp.Time { | |
870 | ac.cc.mkp.Time = v | |
871 | } | |
872 | ac.cc.mu.Unlock() | |
873 | } | |
874 | } | |
875 | ||
876 | // printf records an event in ac's event log, unless ac has been closed. | |
877 | // REQUIRES ac.mu is held. | |
878 | func (ac *addrConn) printf(format string, a ...interface{}) { | |
879 | if ac.events != nil { | |
880 | ac.events.Printf(format, a...) | |
881 | } | |
882 | } | |
883 | ||
884 | // errorf records an error in ac's event log, unless ac has been closed. | |
885 | // REQUIRES ac.mu is held. | |
886 | func (ac *addrConn) errorf(format string, a ...interface{}) { | |
887 | if ac.events != nil { | |
888 | ac.events.Errorf(format, a...) | |
889 | } | |
890 | } | |
891 | ||
// resetTransport recreates a transport to the address for ac.
// For the old transport:
// - if drain is true, it will be gracefully closed.
// - otherwise, it will be closed.
//
// When drain is true this function does not close the old transport itself.
// NOTE(review): presumably the transport shuts itself down after the GoAway
// once pending RPCs finish — confirm against the transport package.
//
// ac is moved to Connecting, then transport.NewClientTransport is retried with
// backoff (ac.dopts.bs) until one of the following happens:
//   - an attempt succeeds: ac becomes Ready and nil is returned;
//   - ac has been shut down: errConnClosing is returned;
//   - ac.ctx is done while backing off: ac.ctx.Err() is returned;
//   - a non-temporary transport.ConnectionError occurs: that error is returned.
func (ac *addrConn) resetTransport(drain bool) error {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return errConnClosing
	}
	ac.printf("connecting")
	if ac.down != nil {
		// Invoke the handler obtained from balancer.Up (see the success path
		// below) before reconnecting.
		ac.down(downErrorf(false, true, "%v", errNetworkIO))
		ac.down = nil
	}
	oldState := ac.state
	ac.state = connectivity.Connecting
	ac.csEvltr.recordTransition(oldState, ac.state)
	// Detach the old transport while holding ac.mu; close it after unlocking.
	t := ac.transport
	ac.transport = nil
	ac.mu.Unlock()
	if t != nil && !drain {
		t.Close()
	}
	// Pick up keepalive parameters that adjustParams may have raised.
	ac.cc.mu.RLock()
	ac.dopts.copts.KeepaliveParams = ac.cc.mkp
	ac.cc.mu.RUnlock()
	for retries := 0; ; retries++ {
		ac.mu.Lock()
		if ac.state == connectivity.Shutdown {
			// ac.tearDown(...) has been invoked.
			ac.mu.Unlock()
			return errConnClosing
		}
		ac.mu.Unlock()
		sleepTime := ac.dopts.bs.backoff(retries)
		// Each attempt gets minConnectTimeout, or the backoff duration if that
		// is longer.
		timeout := minConnectTimeout
		if timeout < sleepTime {
			timeout = sleepTime
		}
		ctx, cancel := context.WithTimeout(ac.ctx, timeout)
		connectTime := time.Now()
		sinfo := transport.TargetInfo{
			Addr:     ac.addr.Addr,
			Metadata: ac.addr.Metadata,
		}
		newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
		// Don't call cancel in success path due to a race in Go 1.6:
		// https://github.com/golang/go/issues/15078.
		if err != nil {
			cancel()

			if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
				// Non-temporary failure: stop retrying and hand the error back.
				return err
			}
			grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
			ac.mu.Lock()
			if ac.state == connectivity.Shutdown {
				// ac.tearDown(...) has been invoked.
				ac.mu.Unlock()
				return errConnClosing
			}
			ac.errorf("transient failure: %v", err)
			oldState = ac.state
			ac.state = connectivity.TransientFailure
			ac.csEvltr.recordTransition(oldState, ac.state)
			// Wake callers blocked in wait() so they can observe TransientFailure.
			if ac.ready != nil {
				close(ac.ready)
				ac.ready = nil
			}
			ac.mu.Unlock()
			// Sleep for what remains of the backoff period, measured from the
			// start of this attempt. Abort early if ac.ctx is done.
			timer := time.NewTimer(sleepTime - time.Since(connectTime))
			select {
			case <-timer.C:
			case <-ac.ctx.Done():
				timer.Stop()
				return ac.ctx.Err()
			}
			timer.Stop()
			continue
		}
		ac.mu.Lock()
		ac.printf("ready")
		if ac.state == connectivity.Shutdown {
			// ac.tearDown(...) has been invoked.
			ac.mu.Unlock()
			newTransport.Close()
			return errConnClosing
		}
		oldState = ac.state
		ac.state = connectivity.Ready
		ac.csEvltr.recordTransition(oldState, ac.state)
		ac.transport = newTransport
		if ac.ready != nil {
			close(ac.ready)
			ac.ready = nil
		}
		// Report the address as up to the balancer. The returned handler is
		// invoked on the next reset or on tearDown.
		if ac.cc.dopts.balancer != nil {
			ac.down = ac.cc.dopts.balancer.Up(ac.addr)
		}
		ac.mu.Unlock()
		return nil
	}
}
996 | ||
997 | // Run in a goroutine to track the error in transport and create the | |
998 | // new transport if an error happens. It returns when the channel is closing. | |
999 | func (ac *addrConn) transportMonitor() { | |
1000 | for { | |
1001 | ac.mu.Lock() | |
1002 | t := ac.transport | |
1003 | ac.mu.Unlock() | |
1004 | select { | |
1005 | // This is needed to detect the teardown when | |
1006 | // the addrConn is idle (i.e., no RPC in flight). | |
1007 | case <-ac.ctx.Done(): | |
1008 | select { | |
1009 | case <-t.Error(): | |
1010 | t.Close() | |
1011 | default: | |
1012 | } | |
1013 | return | |
1014 | case <-t.GoAway(): | |
1015 | ac.adjustParams(t.GetGoAwayReason()) | |
1016 | // If GoAway happens without any network I/O error, the underlying transport | |
1017 | // will be gracefully closed, and a new transport will be created. | |
1018 | // (The transport will be closed when all the pending RPCs finished or failed.) | |
1019 | // If GoAway and some network I/O error happen concurrently, the underlying transport | |
1020 | // will be closed, and a new transport will be created. | |
1021 | var drain bool | |
1022 | select { | |
1023 | case <-t.Error(): | |
1024 | default: | |
1025 | drain = true | |
1026 | } | |
1027 | if err := ac.resetTransport(drain); err != nil { | |
1028 | grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err) | |
1029 | if err != errConnClosing { | |
1030 | // Keep this ac in cc.conns, to get the reason it's torn down. | |
1031 | ac.tearDown(err) | |
1032 | } | |
1033 | return | |
1034 | } | |
1035 | case <-t.Error(): | |
1036 | select { | |
1037 | case <-ac.ctx.Done(): | |
1038 | t.Close() | |
1039 | return | |
1040 | case <-t.GoAway(): | |
1041 | ac.adjustParams(t.GetGoAwayReason()) | |
1042 | if err := ac.resetTransport(false); err != nil { | |
1043 | grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err) | |
1044 | if err != errConnClosing { | |
1045 | // Keep this ac in cc.conns, to get the reason it's torn down. | |
1046 | ac.tearDown(err) | |
1047 | } | |
1048 | return | |
1049 | } | |
1050 | default: | |
1051 | } | |
1052 | ac.mu.Lock() | |
1053 | if ac.state == connectivity.Shutdown { | |
1054 | // ac has been shutdown. | |
1055 | ac.mu.Unlock() | |
1056 | return | |
1057 | } | |
1058 | oldState := ac.state | |
1059 | ac.state = connectivity.TransientFailure | |
1060 | ac.csEvltr.recordTransition(oldState, ac.state) | |
1061 | ac.mu.Unlock() | |
1062 | if err := ac.resetTransport(false); err != nil { | |
1063 | grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err) | |
1064 | ac.mu.Lock() | |
1065 | ac.printf("transport exiting: %v", err) | |
1066 | ac.mu.Unlock() | |
1067 | grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err) | |
1068 | if err != errConnClosing { | |
1069 | // Keep this ac in cc.conns, to get the reason it's torn down. | |
1070 | ac.tearDown(err) | |
1071 | } | |
1072 | return | |
1073 | } | |
1074 | } | |
1075 | } | |
1076 | } | |
1077 | ||
1078 | // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or | |
1079 | // iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true. | |
1080 | func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) { | |
1081 | for { | |
1082 | ac.mu.Lock() | |
1083 | switch { | |
1084 | case ac.state == connectivity.Shutdown: | |
1085 | if failfast || !hasBalancer { | |
1086 | // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr. | |
1087 | err := ac.tearDownErr | |
1088 | ac.mu.Unlock() | |
1089 | return nil, err | |
1090 | } | |
1091 | ac.mu.Unlock() | |
1092 | return nil, errConnClosing | |
1093 | case ac.state == connectivity.Ready: | |
1094 | ct := ac.transport | |
1095 | ac.mu.Unlock() | |
1096 | return ct, nil | |
1097 | case ac.state == connectivity.TransientFailure: | |
1098 | if failfast || hasBalancer { | |
1099 | ac.mu.Unlock() | |
1100 | return nil, errConnUnavailable | |
1101 | } | |
1102 | } | |
1103 | ready := ac.ready | |
1104 | if ready == nil { | |
1105 | ready = make(chan struct{}) | |
1106 | ac.ready = ready | |
1107 | } | |
1108 | ac.mu.Unlock() | |
1109 | select { | |
1110 | case <-ctx.Done(): | |
1111 | return nil, toRPCErr(ctx.Err()) | |
1112 | // Wait until the new transport is ready or failed. | |
1113 | case <-ready: | |
1114 | } | |
1115 | } | |
1116 | } | |
1117 | ||
1118 | // tearDown starts to tear down the addrConn. | |
1119 | // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in | |
1120 | // some edge cases (e.g., the caller opens and closes many addrConn's in a | |
1121 | // tight loop. | |
1122 | // tearDown doesn't remove ac from ac.cc.conns. | |
1123 | func (ac *addrConn) tearDown(err error) { | |
1124 | ac.cancel() | |
1125 | ||
1126 | ac.mu.Lock() | |
1127 | defer ac.mu.Unlock() | |
1128 | if ac.down != nil { | |
1129 | ac.down(downErrorf(false, false, "%v", err)) | |
1130 | ac.down = nil | |
1131 | } | |
1132 | if err == errConnDrain && ac.transport != nil { | |
1133 | // GracefulClose(...) may be executed multiple times when | |
1134 | // i) receiving multiple GoAway frames from the server; or | |
1135 | // ii) there are concurrent name resolver/Balancer triggered | |
1136 | // address removal and GoAway. | |
1137 | ac.transport.GracefulClose() | |
1138 | } | |
1139 | if ac.state == connectivity.Shutdown { | |
1140 | return | |
1141 | } | |
1142 | oldState := ac.state | |
1143 | ac.state = connectivity.Shutdown | |
1144 | ac.tearDownErr = err | |
1145 | ac.csEvltr.recordTransition(oldState, ac.state) | |
1146 | if ac.events != nil { | |
1147 | ac.events.Finish() | |
1148 | ac.events = nil | |
1149 | } | |
1150 | if ac.ready != nil { | |
1151 | close(ac.ready) | |
1152 | ac.ready = nil | |
1153 | } | |
1154 | if ac.transport != nil && err != errConnDrain { | |
1155 | ac.transport.Close() | |
1156 | } | |
1157 | return | |
1158 | } |