]> git.immae.eu Git - github/fretlink/terraform-provider-statuscake.git/blame - vendor/google.golang.org/grpc/clientconn.go
Merge pull request #27 from terraform-providers/go-modules-2019-02-22
[github/fretlink/terraform-provider-statuscake.git] / vendor / google.golang.org / grpc / clientconn.go
CommitLineData
15c0b25d
AP
1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
22 "errors"
23 "net"
24 "strings"
25 "sync"
26 "time"
27
28 "golang.org/x/net/context"
29 "golang.org/x/net/trace"
30 "google.golang.org/grpc/connectivity"
31 "google.golang.org/grpc/credentials"
32 "google.golang.org/grpc/grpclog"
33 "google.golang.org/grpc/keepalive"
34 "google.golang.org/grpc/stats"
35 "google.golang.org/grpc/transport"
36)
37
38var (
39 // ErrClientConnClosing indicates that the operation is illegal because
40 // the ClientConn is closing.
41 ErrClientConnClosing = errors.New("grpc: the client connection is closing")
42 // ErrClientConnTimeout indicates that the ClientConn cannot establish the
43 // underlying connections within the specified timeout.
44 // DEPRECATED: Please use context.DeadlineExceeded instead.
45 ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
46
47 // errNoTransportSecurity indicates that there is no transport security
48 // being set for ClientConn. Users should either set one or explicitly
49 // call WithInsecure DialOption to disable security.
50 errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
51 // errTransportCredentialsMissing indicates that users want to transmit security
52 // information (e.g., oauth2 token) which requires secure connection on an insecure
53 // connection.
54 errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
55 // errCredentialsConflict indicates that grpc.WithTransportCredentials()
56 // and grpc.WithInsecure() are both called for a connection.
57 errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
58 // errNetworkIO indicates that the connection is down due to some network I/O error.
59 errNetworkIO = errors.New("grpc: failed with network I/O error")
60 // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
61 errConnDrain = errors.New("grpc: the connection is drained")
62 // errConnClosing indicates that the connection is closing.
63 errConnClosing = errors.New("grpc: the connection is closing")
64 // errConnUnavailable indicates that the connection is unavailable.
65 errConnUnavailable = errors.New("grpc: the connection is unavailable")
66 // errBalancerClosed indicates that the balancer is closed.
67 errBalancerClosed = errors.New("grpc: balancer is closed")
68 // minimum time to give a connection to complete
69 minConnectTimeout = 20 * time.Second
70)
71
72// dialOptions configure a Dial call. dialOptions are set by the DialOption
73// values passed to Dial.
74type dialOptions struct {
75 unaryInt UnaryClientInterceptor
76 streamInt StreamClientInterceptor
77 codec Codec
78 cp Compressor
79 dc Decompressor
80 bs backoffStrategy
81 balancer Balancer
82 block bool
83 insecure bool
84 timeout time.Duration
85 scChan <-chan ServiceConfig
86 copts transport.ConnectOptions
87 callOptions []CallOption
88}
89
90const (
91 defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
92 defaultClientMaxSendMessageSize = 1024 * 1024 * 4
93)
94
95// DialOption configures how we set up the connection.
96type DialOption func(*dialOptions)
97
98// WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
99// The lower bound for window size is 64K and any value smaller than that will be ignored.
100func WithInitialWindowSize(s int32) DialOption {
101 return func(o *dialOptions) {
102 o.copts.InitialWindowSize = s
103 }
104}
105
106// WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection.
107// The lower bound for window size is 64K and any value smaller than that will be ignored.
108func WithInitialConnWindowSize(s int32) DialOption {
109 return func(o *dialOptions) {
110 o.copts.InitialConnWindowSize = s
111 }
112}
113
114// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
115func WithMaxMsgSize(s int) DialOption {
116 return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
117}
118
119// WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection.
120func WithDefaultCallOptions(cos ...CallOption) DialOption {
121 return func(o *dialOptions) {
122 o.callOptions = append(o.callOptions, cos...)
123 }
124}
125
126// WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
127func WithCodec(c Codec) DialOption {
128 return func(o *dialOptions) {
129 o.codec = c
130 }
131}
132
133// WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
134// compressor.
135func WithCompressor(cp Compressor) DialOption {
136 return func(o *dialOptions) {
137 o.cp = cp
138 }
139}
140
141// WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
142// message decompressor.
143func WithDecompressor(dc Decompressor) DialOption {
144 return func(o *dialOptions) {
145 o.dc = dc
146 }
147}
148
149// WithBalancer returns a DialOption which sets a load balancer.
150func WithBalancer(b Balancer) DialOption {
151 return func(o *dialOptions) {
152 o.balancer = b
153 }
154}
155
156// WithServiceConfig returns a DialOption which has a channel to read the service configuration.
157func WithServiceConfig(c <-chan ServiceConfig) DialOption {
158 return func(o *dialOptions) {
159 o.scChan = c
160 }
161}
162
163// WithBackoffMaxDelay configures the dialer to use the provided maximum delay
164// when backing off after failed connection attempts.
165func WithBackoffMaxDelay(md time.Duration) DialOption {
166 return WithBackoffConfig(BackoffConfig{MaxDelay: md})
167}
168
169// WithBackoffConfig configures the dialer to use the provided backoff
170// parameters after connection failures.
171//
172// Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
173// for use.
174func WithBackoffConfig(b BackoffConfig) DialOption {
175 // Set defaults to ensure that provided BackoffConfig is valid and
176 // unexported fields get default values.
177 setDefaults(&b)
178 return withBackoff(b)
179}
180
181// withBackoff sets the backoff strategy used for retries after a
182// failed connection attempt.
183//
184// This can be exported if arbitrary backoff strategies are allowed by gRPC.
185func withBackoff(bs backoffStrategy) DialOption {
186 return func(o *dialOptions) {
187 o.bs = bs
188 }
189}
190
191// WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
192// connection is up. Without this, Dial returns immediately and connecting the server
193// happens in background.
194func WithBlock() DialOption {
195 return func(o *dialOptions) {
196 o.block = true
197 }
198}
199
200// WithInsecure returns a DialOption which disables transport security for this ClientConn.
201// Note that transport security is required unless WithInsecure is set.
202func WithInsecure() DialOption {
203 return func(o *dialOptions) {
204 o.insecure = true
205 }
206}
207
208// WithTransportCredentials returns a DialOption which configures a
209// connection level security credentials (e.g., TLS/SSL).
210func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
211 return func(o *dialOptions) {
212 o.copts.TransportCredentials = creds
213 }
214}
215
216// WithPerRPCCredentials returns a DialOption which sets
217// credentials and places auth state on each outbound RPC.
218func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
219 return func(o *dialOptions) {
220 o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
221 }
222}
223
224// WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
225// initially. This is valid if and only if WithBlock() is present.
226// Deprecated: use DialContext and context.WithTimeout instead.
227func WithTimeout(d time.Duration) DialOption {
228 return func(o *dialOptions) {
229 o.timeout = d
230 }
231}
232
233// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
234// If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
235// Temporary() method to decide if it should try to reconnect to the network address.
236func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
237 return func(o *dialOptions) {
238 o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
239 if deadline, ok := ctx.Deadline(); ok {
240 return f(addr, deadline.Sub(time.Now()))
241 }
242 return f(addr, 0)
243 }
244 }
245}
246
247// WithStatsHandler returns a DialOption that specifies the stats handler
248// for all the RPCs and underlying network connections in this ClientConn.
249func WithStatsHandler(h stats.Handler) DialOption {
250 return func(o *dialOptions) {
251 o.copts.StatsHandler = h
252 }
253}
254
255// FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors.
256// If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
257// address and won't try to reconnect.
258// The default value of FailOnNonTempDialError is false.
259// This is an EXPERIMENTAL API.
260func FailOnNonTempDialError(f bool) DialOption {
261 return func(o *dialOptions) {
262 o.copts.FailOnNonTempDialError = f
263 }
264}
265
266// WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
267func WithUserAgent(s string) DialOption {
268 return func(o *dialOptions) {
269 o.copts.UserAgent = s
270 }
271}
272
273// WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport.
274func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
275 return func(o *dialOptions) {
276 o.copts.KeepaliveParams = kp
277 }
278}
279
280// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
281func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
282 return func(o *dialOptions) {
283 o.unaryInt = f
284 }
285}
286
287// WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs.
288func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
289 return func(o *dialOptions) {
290 o.streamInt = f
291 }
292}
293
294// WithAuthority returns a DialOption that specifies the value to be used as
295// the :authority pseudo-header. This value only works with WithInsecure and
296// has no effect if TransportCredentials are present.
297func WithAuthority(a string) DialOption {
298 return func(o *dialOptions) {
299 o.copts.Authority = a
300 }
301}
302
303// Dial creates a client connection to the given target.
304func Dial(target string, opts ...DialOption) (*ClientConn, error) {
305 return DialContext(context.Background(), target, opts...)
306}
307
308// DialContext creates a client connection to the given target. ctx can be used to
309// cancel or expire the pending connection. Once this function returns, the
310// cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
311// to terminate all the pending operations after this function returns.
312func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
313 cc := &ClientConn{
314 target: target,
315 csMgr: &connectivityStateManager{},
316 conns: make(map[Address]*addrConn),
317 }
318 cc.csEvltr = &connectivityStateEvaluator{csMgr: cc.csMgr}
319 cc.ctx, cc.cancel = context.WithCancel(context.Background())
320
321 for _, opt := range opts {
322 opt(&cc.dopts)
323 }
324 cc.mkp = cc.dopts.copts.KeepaliveParams
325
326 if cc.dopts.copts.Dialer == nil {
327 cc.dopts.copts.Dialer = newProxyDialer(
328 func(ctx context.Context, addr string) (net.Conn, error) {
329 return dialContext(ctx, "tcp", addr)
330 },
331 )
332 }
333
334 if cc.dopts.copts.UserAgent != "" {
335 cc.dopts.copts.UserAgent += " " + grpcUA
336 } else {
337 cc.dopts.copts.UserAgent = grpcUA
338 }
339
340 if cc.dopts.timeout > 0 {
341 var cancel context.CancelFunc
342 ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)
343 defer cancel()
344 }
345
346 defer func() {
347 select {
348 case <-ctx.Done():
349 conn, err = nil, ctx.Err()
350 default:
351 }
352
353 if err != nil {
354 cc.Close()
355 }
356 }()
357
358 scSet := false
359 if cc.dopts.scChan != nil {
360 // Try to get an initial service config.
361 select {
362 case sc, ok := <-cc.dopts.scChan:
363 if ok {
364 cc.sc = sc
365 scSet = true
366 }
367 default:
368 }
369 }
370 // Set defaults.
371 if cc.dopts.codec == nil {
372 cc.dopts.codec = protoCodec{}
373 }
374 if cc.dopts.bs == nil {
375 cc.dopts.bs = DefaultBackoffConfig
376 }
377 creds := cc.dopts.copts.TransportCredentials
378 if creds != nil && creds.Info().ServerName != "" {
379 cc.authority = creds.Info().ServerName
380 } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
381 cc.authority = cc.dopts.copts.Authority
382 } else {
383 cc.authority = target
384 }
385 waitC := make(chan error, 1)
386 go func() {
387 defer close(waitC)
388 if cc.dopts.balancer == nil && cc.sc.LB != nil {
389 cc.dopts.balancer = cc.sc.LB
390 }
391 if cc.dopts.balancer != nil {
392 var credsClone credentials.TransportCredentials
393 if creds != nil {
394 credsClone = creds.Clone()
395 }
396 config := BalancerConfig{
397 DialCreds: credsClone,
398 Dialer: cc.dopts.copts.Dialer,
399 }
400 if err := cc.dopts.balancer.Start(target, config); err != nil {
401 waitC <- err
402 return
403 }
404 ch := cc.dopts.balancer.Notify()
405 if ch != nil {
406 if cc.dopts.block {
407 doneChan := make(chan struct{})
408 go cc.lbWatcher(doneChan)
409 <-doneChan
410 } else {
411 go cc.lbWatcher(nil)
412 }
413 return
414 }
415 }
416 // No balancer, or no resolver within the balancer. Connect directly.
417 if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil {
418 waitC <- err
419 return
420 }
421 }()
422 select {
423 case <-ctx.Done():
424 return nil, ctx.Err()
425 case err := <-waitC:
426 if err != nil {
427 return nil, err
428 }
429 }
430 if cc.dopts.scChan != nil && !scSet {
431 // Blocking wait for the initial service config.
432 select {
433 case sc, ok := <-cc.dopts.scChan:
434 if ok {
435 cc.sc = sc
436 }
437 case <-ctx.Done():
438 return nil, ctx.Err()
439 }
440 }
441 if cc.dopts.scChan != nil {
442 go cc.scWatcher()
443 }
444
445 return cc, nil
446}
447
448// connectivityStateEvaluator gets updated by addrConns when their
449// states transition, based on which it evaluates the state of
450// ClientConn.
451// Note: This code will eventually sit in the balancer in the new design.
452type connectivityStateEvaluator struct {
453 csMgr *connectivityStateManager
454 mu sync.Mutex
455 numReady uint64 // Number of addrConns in ready state.
456 numConnecting uint64 // Number of addrConns in connecting state.
457 numTransientFailure uint64 // Number of addrConns in transientFailure.
458}
459
460// recordTransition records state change happening in every addrConn and based on
461// that it evaluates what state the ClientConn is in.
462// It can only transition between connectivity.Ready, connectivity.Connecting and connectivity.TransientFailure. Other states,
463// Idle and connectivity.Shutdown are transitioned into by ClientConn; in the begining of the connection
464// before any addrConn is created ClientConn is in idle state. In the end when ClientConn
465// closes it is in connectivity.Shutdown state.
466// TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state.
467func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) {
468 cse.mu.Lock()
469 defer cse.mu.Unlock()
470
471 // Update counters.
472 for idx, state := range []connectivity.State{oldState, newState} {
473 updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
474 switch state {
475 case connectivity.Ready:
476 cse.numReady += updateVal
477 case connectivity.Connecting:
478 cse.numConnecting += updateVal
479 case connectivity.TransientFailure:
480 cse.numTransientFailure += updateVal
481 }
482 }
483
484 // Evaluate.
485 if cse.numReady > 0 {
486 cse.csMgr.updateState(connectivity.Ready)
487 return
488 }
489 if cse.numConnecting > 0 {
490 cse.csMgr.updateState(connectivity.Connecting)
491 return
492 }
493 cse.csMgr.updateState(connectivity.TransientFailure)
494}
495
496// connectivityStateManager keeps the connectivity.State of ClientConn.
497// This struct will eventually be exported so the balancers can access it.
498type connectivityStateManager struct {
499 mu sync.Mutex
500 state connectivity.State
501 notifyChan chan struct{}
502}
503
504// updateState updates the connectivity.State of ClientConn.
505// If there's a change it notifies goroutines waiting on state change to
506// happen.
507func (csm *connectivityStateManager) updateState(state connectivity.State) {
508 csm.mu.Lock()
509 defer csm.mu.Unlock()
510 if csm.state == connectivity.Shutdown {
511 return
512 }
513 if csm.state == state {
514 return
515 }
516 csm.state = state
517 if csm.notifyChan != nil {
518 // There are other goroutines waiting on this channel.
519 close(csm.notifyChan)
520 csm.notifyChan = nil
521 }
522}
523
524func (csm *connectivityStateManager) getState() connectivity.State {
525 csm.mu.Lock()
526 defer csm.mu.Unlock()
527 return csm.state
528}
529
530func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
531 csm.mu.Lock()
532 defer csm.mu.Unlock()
533 if csm.notifyChan == nil {
534 csm.notifyChan = make(chan struct{})
535 }
536 return csm.notifyChan
537}
538
539// ClientConn represents a client connection to an RPC server.
540type ClientConn struct {
541 ctx context.Context
542 cancel context.CancelFunc
543
544 target string
545 authority string
546 dopts dialOptions
547 csMgr *connectivityStateManager
548 csEvltr *connectivityStateEvaluator // This will eventually be part of balancer.
549
550 mu sync.RWMutex
551 sc ServiceConfig
552 conns map[Address]*addrConn
553 // Keepalive parameter can be updated if a GoAway is received.
554 mkp keepalive.ClientParameters
555}
556
557// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
558// ctx expires. A true value is returned in former case and false in latter.
559// This is an EXPERIMENTAL API.
560func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
561 ch := cc.csMgr.getNotifyChan()
562 if cc.csMgr.getState() != sourceState {
563 return true
564 }
565 select {
566 case <-ctx.Done():
567 return false
568 case <-ch:
569 return true
570 }
571}
572
573// GetState returns the connectivity.State of ClientConn.
574// This is an EXPERIMENTAL API.
575func (cc *ClientConn) GetState() connectivity.State {
576 return cc.csMgr.getState()
577}
578
579// lbWatcher watches the Notify channel of the balancer in cc and manages
580// connections accordingly. If doneChan is not nil, it is closed after the
581// first successfull connection is made.
582func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
583 defer func() {
584 // In case channel from cc.dopts.balancer.Notify() gets closed before a
585 // successful connection gets established, don't forget to notify the
586 // caller.
587 if doneChan != nil {
588 close(doneChan)
589 }
590 }()
591
592 for addrs := range cc.dopts.balancer.Notify() {
593 var (
594 add []Address // Addresses need to setup connections.
595 del []*addrConn // Connections need to tear down.
596 )
597 cc.mu.Lock()
598 for _, a := range addrs {
599 if _, ok := cc.conns[a]; !ok {
600 add = append(add, a)
601 }
602 }
603 for k, c := range cc.conns {
604 var keep bool
605 for _, a := range addrs {
606 if k == a {
607 keep = true
608 break
609 }
610 }
611 if !keep {
612 del = append(del, c)
613 delete(cc.conns, c.addr)
614 }
615 }
616 cc.mu.Unlock()
617 for _, a := range add {
618 var err error
619 if doneChan != nil {
620 err = cc.resetAddrConn(a, true, nil)
621 if err == nil {
622 close(doneChan)
623 doneChan = nil
624 }
625 } else {
626 err = cc.resetAddrConn(a, false, nil)
627 }
628 if err != nil {
629 grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
630 }
631 }
632 for _, c := range del {
633 c.tearDown(errConnDrain)
634 }
635 }
636}
637
638func (cc *ClientConn) scWatcher() {
639 for {
640 select {
641 case sc, ok := <-cc.dopts.scChan:
642 if !ok {
643 return
644 }
645 cc.mu.Lock()
646 // TODO: load balance policy runtime change is ignored.
647 // We may revist this decision in the future.
648 cc.sc = sc
649 cc.mu.Unlock()
650 case <-cc.ctx.Done():
651 return
652 }
653 }
654}
655
656// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
657// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
658// If tearDownErr is nil, errConnDrain will be used instead.
659//
660// We should never need to replace an addrConn with a new one. This function is only used
661// as newAddrConn to create new addrConn.
662// TODO rename this function and clean up the code.
663func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error {
664 ac := &addrConn{
665 cc: cc,
666 addr: addr,
667 dopts: cc.dopts,
668 }
669 ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
670 ac.csEvltr = cc.csEvltr
671 if EnableTracing {
672 ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
673 }
674 if !ac.dopts.insecure {
675 if ac.dopts.copts.TransportCredentials == nil {
676 return errNoTransportSecurity
677 }
678 } else {
679 if ac.dopts.copts.TransportCredentials != nil {
680 return errCredentialsConflict
681 }
682 for _, cd := range ac.dopts.copts.PerRPCCredentials {
683 if cd.RequireTransportSecurity() {
684 return errTransportCredentialsMissing
685 }
686 }
687 }
688 // Track ac in cc. This needs to be done before any getTransport(...) is called.
689 cc.mu.Lock()
690 if cc.conns == nil {
691 cc.mu.Unlock()
692 return ErrClientConnClosing
693 }
694 stale := cc.conns[ac.addr]
695 cc.conns[ac.addr] = ac
696 cc.mu.Unlock()
697 if stale != nil {
698 // There is an addrConn alive on ac.addr already. This could be due to
699 // a buggy Balancer that reports duplicated Addresses.
700 if tearDownErr == nil {
701 // tearDownErr is nil if resetAddrConn is called by
702 // 1) Dial
703 // 2) lbWatcher
704 // In both cases, the stale ac should drain, not close.
705 stale.tearDown(errConnDrain)
706 } else {
707 stale.tearDown(tearDownErr)
708 }
709 }
710 if block {
711 if err := ac.resetTransport(false); err != nil {
712 if err != errConnClosing {
713 // Tear down ac and delete it from cc.conns.
714 cc.mu.Lock()
715 delete(cc.conns, ac.addr)
716 cc.mu.Unlock()
717 ac.tearDown(err)
718 }
719 if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
720 return e.Origin()
721 }
722 return err
723 }
724 // Start to monitor the error status of transport.
725 go ac.transportMonitor()
726 } else {
727 // Start a goroutine connecting to the server asynchronously.
728 go func() {
729 if err := ac.resetTransport(false); err != nil {
730 grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
731 if err != errConnClosing {
732 // Keep this ac in cc.conns, to get the reason it's torn down.
733 ac.tearDown(err)
734 }
735 return
736 }
737 ac.transportMonitor()
738 }()
739 }
740 return nil
741}
742
743// GetMethodConfig gets the method config of the input method.
744// If there's an exact match for input method (i.e. /service/method), we return
745// the corresponding MethodConfig.
746// If there isn't an exact match for the input method, we look for the default config
747// under the service (i.e /service/). If there is a default MethodConfig for
748// the serivce, we return it.
749// Otherwise, we return an empty MethodConfig.
750func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
751 // TODO: Avoid the locking here.
752 cc.mu.RLock()
753 defer cc.mu.RUnlock()
754 m, ok := cc.sc.Methods[method]
755 if !ok {
756 i := strings.LastIndex(method, "/")
757 m, _ = cc.sc.Methods[method[:i+1]]
758 }
759 return m
760}
761
762func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
763 var (
764 ac *addrConn
765 ok bool
766 put func()
767 )
768 if cc.dopts.balancer == nil {
769 // If balancer is nil, there should be only one addrConn available.
770 cc.mu.RLock()
771 if cc.conns == nil {
772 cc.mu.RUnlock()
773 return nil, nil, toRPCErr(ErrClientConnClosing)
774 }
775 for _, ac = range cc.conns {
776 // Break after the first iteration to get the first addrConn.
777 ok = true
778 break
779 }
780 cc.mu.RUnlock()
781 } else {
782 var (
783 addr Address
784 err error
785 )
786 addr, put, err = cc.dopts.balancer.Get(ctx, opts)
787 if err != nil {
788 return nil, nil, toRPCErr(err)
789 }
790 cc.mu.RLock()
791 if cc.conns == nil {
792 cc.mu.RUnlock()
793 return nil, nil, toRPCErr(ErrClientConnClosing)
794 }
795 ac, ok = cc.conns[addr]
796 cc.mu.RUnlock()
797 }
798 if !ok {
799 if put != nil {
800 updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
801 put()
802 }
803 return nil, nil, errConnClosing
804 }
805 t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
806 if err != nil {
807 if put != nil {
808 updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
809 put()
810 }
811 return nil, nil, err
812 }
813 return t, put, nil
814}
815
816// Close tears down the ClientConn and all underlying connections.
817func (cc *ClientConn) Close() error {
818 cc.cancel()
819
820 cc.mu.Lock()
821 if cc.conns == nil {
822 cc.mu.Unlock()
823 return ErrClientConnClosing
824 }
825 conns := cc.conns
826 cc.conns = nil
827 cc.csMgr.updateState(connectivity.Shutdown)
828 cc.mu.Unlock()
829 if cc.dopts.balancer != nil {
830 cc.dopts.balancer.Close()
831 }
832 for _, ac := range conns {
833 ac.tearDown(ErrClientConnClosing)
834 }
835 return nil
836}
837
838// addrConn is a network connection to a given address.
839type addrConn struct {
840 ctx context.Context
841 cancel context.CancelFunc
842
843 cc *ClientConn
844 addr Address
845 dopts dialOptions
846 events trace.EventLog
847
848 csEvltr *connectivityStateEvaluator
849
850 mu sync.Mutex
851 state connectivity.State
852 down func(error) // the handler called when a connection is down.
853 // ready is closed and becomes nil when a new transport is up or failed
854 // due to timeout.
855 ready chan struct{}
856 transport transport.ClientTransport
857
858 // The reason this addrConn is torn down.
859 tearDownErr error
860}
861
862// adjustParams updates parameters used to create transports upon
863// receiving a GoAway.
864func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
865 switch r {
866 case transport.TooManyPings:
867 v := 2 * ac.dopts.copts.KeepaliveParams.Time
868 ac.cc.mu.Lock()
869 if v > ac.cc.mkp.Time {
870 ac.cc.mkp.Time = v
871 }
872 ac.cc.mu.Unlock()
873 }
874}
875
876// printf records an event in ac's event log, unless ac has been closed.
877// REQUIRES ac.mu is held.
878func (ac *addrConn) printf(format string, a ...interface{}) {
879 if ac.events != nil {
880 ac.events.Printf(format, a...)
881 }
882}
883
884// errorf records an error in ac's event log, unless ac has been closed.
885// REQUIRES ac.mu is held.
886func (ac *addrConn) errorf(format string, a ...interface{}) {
887 if ac.events != nil {
888 ac.events.Errorf(format, a...)
889 }
890}
891
892// resetTransport recreates a transport to the address for ac.
893// For the old transport:
894// - if drain is true, it will be gracefully closed.
895// - otherwise, it will be closed.
896func (ac *addrConn) resetTransport(drain bool) error {
897 ac.mu.Lock()
898 if ac.state == connectivity.Shutdown {
899 ac.mu.Unlock()
900 return errConnClosing
901 }
902 ac.printf("connecting")
903 if ac.down != nil {
904 ac.down(downErrorf(false, true, "%v", errNetworkIO))
905 ac.down = nil
906 }
907 oldState := ac.state
908 ac.state = connectivity.Connecting
909 ac.csEvltr.recordTransition(oldState, ac.state)
910 t := ac.transport
911 ac.transport = nil
912 ac.mu.Unlock()
913 if t != nil && !drain {
914 t.Close()
915 }
916 ac.cc.mu.RLock()
917 ac.dopts.copts.KeepaliveParams = ac.cc.mkp
918 ac.cc.mu.RUnlock()
919 for retries := 0; ; retries++ {
920 ac.mu.Lock()
921 if ac.state == connectivity.Shutdown {
922 // ac.tearDown(...) has been invoked.
923 ac.mu.Unlock()
924 return errConnClosing
925 }
926 ac.mu.Unlock()
927 sleepTime := ac.dopts.bs.backoff(retries)
928 timeout := minConnectTimeout
929 if timeout < sleepTime {
930 timeout = sleepTime
931 }
932 ctx, cancel := context.WithTimeout(ac.ctx, timeout)
933 connectTime := time.Now()
934 sinfo := transport.TargetInfo{
935 Addr: ac.addr.Addr,
936 Metadata: ac.addr.Metadata,
937 }
938 newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
939 // Don't call cancel in success path due to a race in Go 1.6:
940 // https://github.com/golang/go/issues/15078.
941 if err != nil {
942 cancel()
943
944 if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
945 return err
946 }
947 grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
948 ac.mu.Lock()
949 if ac.state == connectivity.Shutdown {
950 // ac.tearDown(...) has been invoked.
951 ac.mu.Unlock()
952 return errConnClosing
953 }
954 ac.errorf("transient failure: %v", err)
955 oldState = ac.state
956 ac.state = connectivity.TransientFailure
957 ac.csEvltr.recordTransition(oldState, ac.state)
958 if ac.ready != nil {
959 close(ac.ready)
960 ac.ready = nil
961 }
962 ac.mu.Unlock()
963 timer := time.NewTimer(sleepTime - time.Since(connectTime))
964 select {
965 case <-timer.C:
966 case <-ac.ctx.Done():
967 timer.Stop()
968 return ac.ctx.Err()
969 }
970 timer.Stop()
971 continue
972 }
973 ac.mu.Lock()
974 ac.printf("ready")
975 if ac.state == connectivity.Shutdown {
976 // ac.tearDown(...) has been invoked.
977 ac.mu.Unlock()
978 newTransport.Close()
979 return errConnClosing
980 }
981 oldState = ac.state
982 ac.state = connectivity.Ready
983 ac.csEvltr.recordTransition(oldState, ac.state)
984 ac.transport = newTransport
985 if ac.ready != nil {
986 close(ac.ready)
987 ac.ready = nil
988 }
989 if ac.cc.dopts.balancer != nil {
990 ac.down = ac.cc.dopts.balancer.Up(ac.addr)
991 }
992 ac.mu.Unlock()
993 return nil
994 }
995}
996
997// Run in a goroutine to track the error in transport and create the
998// new transport if an error happens. It returns when the channel is closing.
999func (ac *addrConn) transportMonitor() {
1000 for {
1001 ac.mu.Lock()
1002 t := ac.transport
1003 ac.mu.Unlock()
1004 select {
1005 // This is needed to detect the teardown when
1006 // the addrConn is idle (i.e., no RPC in flight).
1007 case <-ac.ctx.Done():
1008 select {
1009 case <-t.Error():
1010 t.Close()
1011 default:
1012 }
1013 return
1014 case <-t.GoAway():
1015 ac.adjustParams(t.GetGoAwayReason())
1016 // If GoAway happens without any network I/O error, the underlying transport
1017 // will be gracefully closed, and a new transport will be created.
1018 // (The transport will be closed when all the pending RPCs finished or failed.)
1019 // If GoAway and some network I/O error happen concurrently, the underlying transport
1020 // will be closed, and a new transport will be created.
1021 var drain bool
1022 select {
1023 case <-t.Error():
1024 default:
1025 drain = true
1026 }
1027 if err := ac.resetTransport(drain); err != nil {
1028 grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
1029 if err != errConnClosing {
1030 // Keep this ac in cc.conns, to get the reason it's torn down.
1031 ac.tearDown(err)
1032 }
1033 return
1034 }
1035 case <-t.Error():
1036 select {
1037 case <-ac.ctx.Done():
1038 t.Close()
1039 return
1040 case <-t.GoAway():
1041 ac.adjustParams(t.GetGoAwayReason())
1042 if err := ac.resetTransport(false); err != nil {
1043 grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
1044 if err != errConnClosing {
1045 // Keep this ac in cc.conns, to get the reason it's torn down.
1046 ac.tearDown(err)
1047 }
1048 return
1049 }
1050 default:
1051 }
1052 ac.mu.Lock()
1053 if ac.state == connectivity.Shutdown {
1054 // ac has been shutdown.
1055 ac.mu.Unlock()
1056 return
1057 }
1058 oldState := ac.state
1059 ac.state = connectivity.TransientFailure
1060 ac.csEvltr.recordTransition(oldState, ac.state)
1061 ac.mu.Unlock()
1062 if err := ac.resetTransport(false); err != nil {
1063 grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
1064 ac.mu.Lock()
1065 ac.printf("transport exiting: %v", err)
1066 ac.mu.Unlock()
1067 grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
1068 if err != errConnClosing {
1069 // Keep this ac in cc.conns, to get the reason it's torn down.
1070 ac.tearDown(err)
1071 }
1072 return
1073 }
1074 }
1075 }
1076}
1077
1078// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
1079// iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true.
1080func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
1081 for {
1082 ac.mu.Lock()
1083 switch {
1084 case ac.state == connectivity.Shutdown:
1085 if failfast || !hasBalancer {
1086 // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
1087 err := ac.tearDownErr
1088 ac.mu.Unlock()
1089 return nil, err
1090 }
1091 ac.mu.Unlock()
1092 return nil, errConnClosing
1093 case ac.state == connectivity.Ready:
1094 ct := ac.transport
1095 ac.mu.Unlock()
1096 return ct, nil
1097 case ac.state == connectivity.TransientFailure:
1098 if failfast || hasBalancer {
1099 ac.mu.Unlock()
1100 return nil, errConnUnavailable
1101 }
1102 }
1103 ready := ac.ready
1104 if ready == nil {
1105 ready = make(chan struct{})
1106 ac.ready = ready
1107 }
1108 ac.mu.Unlock()
1109 select {
1110 case <-ctx.Done():
1111 return nil, toRPCErr(ctx.Err())
1112 // Wait until the new transport is ready or failed.
1113 case <-ready:
1114 }
1115 }
1116}
1117
1118// tearDown starts to tear down the addrConn.
1119// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
1120// some edge cases (e.g., the caller opens and closes many addrConn's in a
1121// tight loop.
1122// tearDown doesn't remove ac from ac.cc.conns.
1123func (ac *addrConn) tearDown(err error) {
1124 ac.cancel()
1125
1126 ac.mu.Lock()
1127 defer ac.mu.Unlock()
1128 if ac.down != nil {
1129 ac.down(downErrorf(false, false, "%v", err))
1130 ac.down = nil
1131 }
1132 if err == errConnDrain && ac.transport != nil {
1133 // GracefulClose(...) may be executed multiple times when
1134 // i) receiving multiple GoAway frames from the server; or
1135 // ii) there are concurrent name resolver/Balancer triggered
1136 // address removal and GoAway.
1137 ac.transport.GracefulClose()
1138 }
1139 if ac.state == connectivity.Shutdown {
1140 return
1141 }
1142 oldState := ac.state
1143 ac.state = connectivity.Shutdown
1144 ac.tearDownErr = err
1145 ac.csEvltr.recordTransition(oldState, ac.state)
1146 if ac.events != nil {
1147 ac.events.Finish()
1148 ac.events = nil
1149 }
1150 if ac.ready != nil {
1151 close(ac.ready)
1152 ac.ready = nil
1153 }
1154 if ac.transport != nil && err != errConnDrain {
1155 ac.transport.Close()
1156 }
1157 return
1158}