]> git.immae.eu Git - github/fretlink/terraform-provider-statuscake.git/blobdiff - vendor/google.golang.org/grpc/clientconn.go
Upgrade to 0.12
[github/fretlink/terraform-provider-statuscake.git] / vendor / google.golang.org / grpc / clientconn.go
index e3f6cb19a3086a538ae715a5ac95378294d9b794..56d0bf713d73ea91c8c78b76fc3a1b6220a3e65e 100644 (file)
 package grpc
 
 import (
+       "context"
        "errors"
+       "fmt"
+       "math"
        "net"
+       "reflect"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
-       "golang.org/x/net/context"
-       "golang.org/x/net/trace"
+       "google.golang.org/grpc/balancer"
+       _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
+       "google.golang.org/grpc/codes"
        "google.golang.org/grpc/connectivity"
        "google.golang.org/grpc/credentials"
        "google.golang.org/grpc/grpclog"
+       "google.golang.org/grpc/internal/backoff"
+       "google.golang.org/grpc/internal/channelz"
+       "google.golang.org/grpc/internal/envconfig"
+       "google.golang.org/grpc/internal/grpcsync"
+       "google.golang.org/grpc/internal/transport"
        "google.golang.org/grpc/keepalive"
-       "google.golang.org/grpc/stats"
-       "google.golang.org/grpc/transport"
+       "google.golang.org/grpc/metadata"
+       "google.golang.org/grpc/resolver"
+       _ "google.golang.org/grpc/resolver/dns"         // To register dns resolver.
+       _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
+       "google.golang.org/grpc/status"
+)
+
+const (
+       // minimum time to give a connection to complete
+       minConnectTimeout = 20 * time.Second
+       // must match grpclbName in grpclb/grpclb.go
+       grpclbName = "grpclb"
 )
 
 var (
        // ErrClientConnClosing indicates that the operation is illegal because
        // the ClientConn is closing.
-       ErrClientConnClosing = errors.New("grpc: the client connection is closing")
-       // ErrClientConnTimeout indicates that the ClientConn cannot establish the
-       // underlying connections within the specified timeout.
-       // DEPRECATED: Please use context.DeadlineExceeded instead.
-       ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
+       //
+       // Deprecated: this error should not be relied upon by users; use the status
+       // code of Canceled instead.
+       ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing")
+       // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
+       errConnDrain = errors.New("grpc: the connection is drained")
+       // errConnClosing indicates that the connection is closing.
+       errConnClosing = errors.New("grpc: the connection is closing")
+       // errBalancerClosed indicates that the balancer is closed.
+       errBalancerClosed = errors.New("grpc: balancer is closed")
+       // We use an accessor so that minConnectTimeout can be
+       // atomically read and updated while testing.
+       getMinConnectTimeout = func() time.Duration {
+               return minConnectTimeout
+       }
+)
 
+// The following errors are returned from Dial and DialContext
+var (
        // errNoTransportSecurity indicates that there is no transport security
        // being set for ClientConn. Users should either set one or explicitly
        // call WithInsecure DialOption to disable security.
        errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")
+       // errTransportCredsAndBundle indicates that creds bundle is used together
+       // with other individual Transport Credentials.
+       errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")
        // errTransportCredentialsMissing indicates that users want to transmit security
        // information (e.g., oauth2 token) which requires secure connection on an insecure
        // connection.
@@ -55,278 +92,100 @@ var (
        // errCredentialsConflict indicates that grpc.WithTransportCredentials()
        // and grpc.WithInsecure() are both called for a connection.
        errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
-       // errNetworkIO indicates that the connection is down due to some network I/O error.
-       errNetworkIO = errors.New("grpc: failed with network I/O error")
-       // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
-       errConnDrain = errors.New("grpc: the connection is drained")
-       // errConnClosing indicates that the connection is closing.
-       errConnClosing = errors.New("grpc: the connection is closing")
-       // errConnUnavailable indicates that the connection is unavailable.
-       errConnUnavailable = errors.New("grpc: the connection is unavailable")
-       // errBalancerClosed indicates that the balancer is closed.
-       errBalancerClosed = errors.New("grpc: balancer is closed")
-       // minimum time to give a connection to complete
-       minConnectTimeout = 20 * time.Second
 )
 
-// dialOptions configure a Dial call. dialOptions are set by the DialOption
-// values passed to Dial.
-type dialOptions struct {
-       unaryInt    UnaryClientInterceptor
-       streamInt   StreamClientInterceptor
-       codec       Codec
-       cp          Compressor
-       dc          Decompressor
-       bs          backoffStrategy
-       balancer    Balancer
-       block       bool
-       insecure    bool
-       timeout     time.Duration
-       scChan      <-chan ServiceConfig
-       copts       transport.ConnectOptions
-       callOptions []CallOption
-}
-
 const (
        defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
-       defaultClientMaxSendMessageSize    = 1024 * 1024 * 4
+       defaultClientMaxSendMessageSize    = math.MaxInt32
+       // http2IOBufSize specifies the buffer size for sending frames.
+       defaultWriteBufSize = 32 * 1024
+       defaultReadBufSize  = 32 * 1024
 )
 
-// DialOption configures how we set up the connection.
-type DialOption func(*dialOptions)
-
-// WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
-// The lower bound for window size is 64K and any value smaller than that will be ignored.
-func WithInitialWindowSize(s int32) DialOption {
-       return func(o *dialOptions) {
-               o.copts.InitialWindowSize = s
-       }
-}
-
-// WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection.
-// The lower bound for window size is 64K and any value smaller than that will be ignored.
-func WithInitialConnWindowSize(s int32) DialOption {
-       return func(o *dialOptions) {
-               o.copts.InitialConnWindowSize = s
-       }
-}
-
-// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
-func WithMaxMsgSize(s int) DialOption {
-       return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
-}
-
-// WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection.
-func WithDefaultCallOptions(cos ...CallOption) DialOption {
-       return func(o *dialOptions) {
-               o.callOptions = append(o.callOptions, cos...)
-       }
-}
-
-// WithCodec returns a DialOption which sets a codec for message marshaling and unmarshaling.
-func WithCodec(c Codec) DialOption {
-       return func(o *dialOptions) {
-               o.codec = c
-       }
-}
-
-// WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
-// compressor.
-func WithCompressor(cp Compressor) DialOption {
-       return func(o *dialOptions) {
-               o.cp = cp
-       }
-}
-
-// WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
-// message decompressor.
-func WithDecompressor(dc Decompressor) DialOption {
-       return func(o *dialOptions) {
-               o.dc = dc
-       }
-}
-
-// WithBalancer returns a DialOption which sets a load balancer.
-func WithBalancer(b Balancer) DialOption {
-       return func(o *dialOptions) {
-               o.balancer = b
-       }
-}
-
-// WithServiceConfig returns a DialOption which has a channel to read the service configuration.
-func WithServiceConfig(c <-chan ServiceConfig) DialOption {
-       return func(o *dialOptions) {
-               o.scChan = c
-       }
-}
-
-// WithBackoffMaxDelay configures the dialer to use the provided maximum delay
-// when backing off after failed connection attempts.
-func WithBackoffMaxDelay(md time.Duration) DialOption {
-       return WithBackoffConfig(BackoffConfig{MaxDelay: md})
-}
-
-// WithBackoffConfig configures the dialer to use the provided backoff
-// parameters after connection failures.
-//
-// Use WithBackoffMaxDelay until more parameters on BackoffConfig are opened up
-// for use.
-func WithBackoffConfig(b BackoffConfig) DialOption {
-       // Set defaults to ensure that provided BackoffConfig is valid and
-       // unexported fields get default values.
-       setDefaults(&b)
-       return withBackoff(b)
-}
-
-// withBackoff sets the backoff strategy used for retries after a
-// failed connection attempt.
-//
-// This can be exported if arbitrary backoff strategies are allowed by gRPC.
-func withBackoff(bs backoffStrategy) DialOption {
-       return func(o *dialOptions) {
-               o.bs = bs
-       }
-}
-
-// WithBlock returns a DialOption which makes caller of Dial blocks until the underlying
-// connection is up. Without this, Dial returns immediately and connecting the server
-// happens in background.
-func WithBlock() DialOption {
-       return func(o *dialOptions) {
-               o.block = true
-       }
-}
-
-// WithInsecure returns a DialOption which disables transport security for this ClientConn.
-// Note that transport security is required unless WithInsecure is set.
-func WithInsecure() DialOption {
-       return func(o *dialOptions) {
-               o.insecure = true
-       }
-}
-
-// WithTransportCredentials returns a DialOption which configures a
-// connection level security credentials (e.g., TLS/SSL).
-func WithTransportCredentials(creds credentials.TransportCredentials) DialOption {
-       return func(o *dialOptions) {
-               o.copts.TransportCredentials = creds
-       }
-}
-
-// WithPerRPCCredentials returns a DialOption which sets
-// credentials and places auth state on each outbound RPC.
-func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
-       return func(o *dialOptions) {
-               o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
-       }
-}
-
-// WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
-// initially. This is valid if and only if WithBlock() is present.
-// Deprecated: use DialContext and context.WithTimeout instead.
-func WithTimeout(d time.Duration) DialOption {
-       return func(o *dialOptions) {
-               o.timeout = d
-       }
-}
-
-// WithDialer returns a DialOption that specifies a function to use for dialing network addresses.
-// If FailOnNonTempDialError() is set to true, and an error is returned by f, gRPC checks the error's
-// Temporary() method to decide if it should try to reconnect to the network address.
-func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption {
-       return func(o *dialOptions) {
-               o.copts.Dialer = func(ctx context.Context, addr string) (net.Conn, error) {
-                       if deadline, ok := ctx.Deadline(); ok {
-                               return f(addr, deadline.Sub(time.Now()))
-                       }
-                       return f(addr, 0)
-               }
-       }
-}
-
-// WithStatsHandler returns a DialOption that specifies the stats handler
-// for all the RPCs and underlying network connections in this ClientConn.
-func WithStatsHandler(h stats.Handler) DialOption {
-       return func(o *dialOptions) {
-               o.copts.StatsHandler = h
-       }
-}
-
-// FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors.
-// If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
-// address and won't try to reconnect.
-// The default value of FailOnNonTempDialError is false.
-// This is an EXPERIMENTAL API.
-func FailOnNonTempDialError(f bool) DialOption {
-       return func(o *dialOptions) {
-               o.copts.FailOnNonTempDialError = f
-       }
-}
-
-// WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
-func WithUserAgent(s string) DialOption {
-       return func(o *dialOptions) {
-               o.copts.UserAgent = s
-       }
-}
-
-// WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport.
-func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
-       return func(o *dialOptions) {
-               o.copts.KeepaliveParams = kp
-       }
-}
-
-// WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs.
-func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption {
-       return func(o *dialOptions) {
-               o.unaryInt = f
-       }
-}
-
-// WithStreamInterceptor returns a DialOption that specifies the interceptor for streaming RPCs.
-func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
-       return func(o *dialOptions) {
-               o.streamInt = f
-       }
-}
-
-// WithAuthority returns a DialOption that specifies the value to be used as
-// the :authority pseudo-header. This value only works with WithInsecure and
-// has no effect if TransportCredentials are present.
-func WithAuthority(a string) DialOption {
-       return func(o *dialOptions) {
-               o.copts.Authority = a
-       }
-}
-
 // Dial creates a client connection to the given target.
 func Dial(target string, opts ...DialOption) (*ClientConn, error) {
        return DialContext(context.Background(), target, opts...)
 }
 
-// DialContext creates a client connection to the given target. ctx can be used to
-// cancel or expire the pending connection. Once this function returns, the
-// cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
-// to terminate all the pending operations after this function returns.
+// DialContext creates a client connection to the given target. By default, it's
+// a non-blocking dial (the function won't wait for connections to be
+// established, and connecting happens in the background). To make it a blocking
+// dial, use WithBlock() dial option.
+//
+// In the non-blocking case, the ctx does not act against the connection. It
+// only controls the setup steps.
+//
+// In the blocking case, ctx can be used to cancel or expire the pending
+// connection. Once this function returns, the cancellation and expiration of
+// ctx will be noop. Users should call ClientConn.Close to terminate all the
+// pending operations after this function returns.
+//
+// The target name syntax is defined in
+// https://github.com/grpc/grpc/blob/master/doc/naming.md.
+// e.g. to use dns resolver, a "dns:///" prefix should be applied to the target.
 func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
        cc := &ClientConn{
-               target: target,
-               csMgr:  &connectivityStateManager{},
-               conns:  make(map[Address]*addrConn),
-       }
-       cc.csEvltr = &connectivityStateEvaluator{csMgr: cc.csMgr}
+               target:            target,
+               csMgr:             &connectivityStateManager{},
+               conns:             make(map[*addrConn]struct{}),
+               dopts:             defaultDialOptions(),
+               blockingpicker:    newPickerWrapper(),
+               czData:            new(channelzData),
+               firstResolveEvent: grpcsync.NewEvent(),
+       }
+       cc.retryThrottler.Store((*retryThrottler)(nil))
        cc.ctx, cc.cancel = context.WithCancel(context.Background())
 
        for _, opt := range opts {
-               opt(&cc.dopts)
+               opt.apply(&cc.dopts)
+       }
+
+       if channelz.IsOn() {
+               if cc.dopts.channelzParentID != 0 {
+                       cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target)
+                       channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
+                               Desc:     "Channel Created",
+                               Severity: channelz.CtINFO,
+                               Parent: &channelz.TraceEventDesc{
+                                       Desc:     fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID),
+                                       Severity: channelz.CtINFO,
+                               },
+                       })
+               } else {
+                       cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target)
+                       channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
+                               Desc:     "Channel Created",
+                               Severity: channelz.CtINFO,
+                       })
+               }
+               cc.csMgr.channelzID = cc.channelzID
+       }
+
+       if !cc.dopts.insecure {
+               if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil {
+                       return nil, errNoTransportSecurity
+               }
+               if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil {
+                       return nil, errTransportCredsAndBundle
+               }
+       } else {
+               if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil {
+                       return nil, errCredentialsConflict
+               }
+               for _, cd := range cc.dopts.copts.PerRPCCredentials {
+                       if cd.RequireTransportSecurity() {
+                               return nil, errTransportCredentialsMissing
+                       }
+               }
        }
+
        cc.mkp = cc.dopts.copts.KeepaliveParams
 
        if cc.dopts.copts.Dialer == nil {
                cc.dopts.copts.Dialer = newProxyDialer(
                        func(ctx context.Context, addr string) (net.Conn, error) {
-                               return dialContext(ctx, "tcp", addr)
+                               network, addr := parseDialTarget(addr)
+                               return (&net.Dialer{}).DialContext(ctx, network, addr)
                        },
                )
        }
@@ -367,66 +226,41 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
                default:
                }
        }
-       // Set defaults.
-       if cc.dopts.codec == nil {
-               cc.dopts.codec = protoCodec{}
-       }
        if cc.dopts.bs == nil {
-               cc.dopts.bs = DefaultBackoffConfig
+               cc.dopts.bs = backoff.Exponential{
+                       MaxDelay: DefaultBackoffConfig.MaxDelay,
+               }
+       }
+       if cc.dopts.resolverBuilder == nil {
+               // Only try to parse target when resolver builder is not already set.
+               cc.parsedTarget = parseTarget(cc.target)
+               grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme)
+               cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
+               if cc.dopts.resolverBuilder == nil {
+                       // If resolver builder is still nil, the parse target's scheme is
+                       // not registered. Fallback to default resolver and set Endpoint to
+                       // the original unparsed target.
+                       grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme)
+                       cc.parsedTarget = resolver.Target{
+                               Scheme:   resolver.GetDefaultScheme(),
+                               Endpoint: target,
+                       }
+                       cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)
+               }
+       } else {
+               cc.parsedTarget = resolver.Target{Endpoint: target}
        }
        creds := cc.dopts.copts.TransportCredentials
        if creds != nil && creds.Info().ServerName != "" {
                cc.authority = creds.Info().ServerName
-       } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
-               cc.authority = cc.dopts.copts.Authority
+       } else if cc.dopts.insecure && cc.dopts.authority != "" {
+               cc.authority = cc.dopts.authority
        } else {
-               cc.authority = target
-       }
-       waitC := make(chan error, 1)
-       go func() {
-               defer close(waitC)
-               if cc.dopts.balancer == nil && cc.sc.LB != nil {
-                       cc.dopts.balancer = cc.sc.LB
-               }
-               if cc.dopts.balancer != nil {
-                       var credsClone credentials.TransportCredentials
-                       if creds != nil {
-                               credsClone = creds.Clone()
-                       }
-                       config := BalancerConfig{
-                               DialCreds: credsClone,
-                               Dialer:    cc.dopts.copts.Dialer,
-                       }
-                       if err := cc.dopts.balancer.Start(target, config); err != nil {
-                               waitC <- err
-                               return
-                       }
-                       ch := cc.dopts.balancer.Notify()
-                       if ch != nil {
-                               if cc.dopts.block {
-                                       doneChan := make(chan struct{})
-                                       go cc.lbWatcher(doneChan)
-                                       <-doneChan
-                               } else {
-                                       go cc.lbWatcher(nil)
-                               }
-                               return
-                       }
-               }
-               // No balancer, or no resolver within the balancer.  Connect directly.
-               if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil {
-                       waitC <- err
-                       return
-               }
-       }()
-       select {
-       case <-ctx.Done():
-               return nil, ctx.Err()
-       case err := <-waitC:
-               if err != nil {
-                       return nil, err
-               }
+               // Use endpoint from "scheme://authority/endpoint" as the default
+               // authority for ClientConn.
+               cc.authority = cc.parsedTarget.Endpoint
        }
+
        if cc.dopts.scChan != nil && !scSet {
                // Blocking wait for the initial service config.
                select {
@@ -442,55 +276,50 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
                go cc.scWatcher()
        }
 
-       return cc, nil
-}
+       var credsClone credentials.TransportCredentials
+       if creds := cc.dopts.copts.TransportCredentials; creds != nil {
+               credsClone = creds.Clone()
+       }
+       cc.balancerBuildOpts = balancer.BuildOptions{
+               DialCreds:        credsClone,
+               CredsBundle:      cc.dopts.copts.CredsBundle,
+               Dialer:           cc.dopts.copts.Dialer,
+               ChannelzParentID: cc.channelzID,
+       }
 
-// connectivityStateEvaluator gets updated by addrConns when their
-// states transition, based on which it evaluates the state of
-// ClientConn.
-// Note: This code will eventually sit in the balancer in the new design.
-type connectivityStateEvaluator struct {
-       csMgr               *connectivityStateManager
-       mu                  sync.Mutex
-       numReady            uint64 // Number of addrConns in ready state.
-       numConnecting       uint64 // Number of addrConns in connecting state.
-       numTransientFailure uint64 // Number of addrConns in transientFailure.
-}
+       // Build the resolver.
+       rWrapper, err := newCCResolverWrapper(cc)
+       if err != nil {
+               return nil, fmt.Errorf("failed to build resolver: %v", err)
+       }
 
-// recordTransition records state change happening in every addrConn and based on
-// that it evaluates what state the ClientConn is in.
-// It can only transition between connectivity.Ready, connectivity.Connecting and connectivity.TransientFailure. Other states,
-// Idle and connectivity.Shutdown are transitioned into by ClientConn; in the begining of the connection
-// before any addrConn is created ClientConn is in idle state. In the end when ClientConn
-// closes it is in connectivity.Shutdown state.
-// TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state.
-func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) {
-       cse.mu.Lock()
-       defer cse.mu.Unlock()
-
-       // Update counters.
-       for idx, state := range []connectivity.State{oldState, newState} {
-               updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
-               switch state {
-               case connectivity.Ready:
-                       cse.numReady += updateVal
-               case connectivity.Connecting:
-                       cse.numConnecting += updateVal
-               case connectivity.TransientFailure:
-                       cse.numTransientFailure += updateVal
+       cc.mu.Lock()
+       cc.resolverWrapper = rWrapper
+       cc.mu.Unlock()
+       // A blocking dial blocks until the clientConn is ready.
+       if cc.dopts.block {
+               for {
+                       s := cc.GetState()
+                       if s == connectivity.Ready {
+                               break
+                       } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {
+                               if err = cc.blockingpicker.connectionError(); err != nil {
+                                       terr, ok := err.(interface {
+                                               Temporary() bool
+                                       })
+                                       if ok && !terr.Temporary() {
+                                               return nil, err
+                                       }
+                               }
+                       }
+                       if !cc.WaitForStateChange(ctx, s) {
+                               // ctx got timeout or canceled.
+                               return nil, ctx.Err()
+                       }
                }
        }
 
-       // Evaluate.
-       if cse.numReady > 0 {
-               cse.csMgr.updateState(connectivity.Ready)
-               return
-       }
-       if cse.numConnecting > 0 {
-               cse.csMgr.updateState(connectivity.Connecting)
-               return
-       }
-       cse.csMgr.updateState(connectivity.TransientFailure)
+       return cc, nil
 }
 
 // connectivityStateManager keeps the connectivity.State of ClientConn.
@@ -499,6 +328,7 @@ type connectivityStateManager struct {
        mu         sync.Mutex
        state      connectivity.State
        notifyChan chan struct{}
+       channelzID int64
 }
 
 // updateState updates the connectivity.State of ClientConn.
@@ -514,6 +344,12 @@ func (csm *connectivityStateManager) updateState(state connectivity.State) {
                return
        }
        csm.state = state
+       if channelz.IsOn() {
+               channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{
+                       Desc:     fmt.Sprintf("Channel Connectivity change to %v", state),
+                       Severity: channelz.CtINFO,
+               })
+       }
        if csm.notifyChan != nil {
                // There are other goroutines waiting on this channel.
                close(csm.notifyChan)
@@ -541,17 +377,32 @@ type ClientConn struct {
        ctx    context.Context
        cancel context.CancelFunc
 
-       target    string
-       authority string
-       dopts     dialOptions
-       csMgr     *connectivityStateManager
-       csEvltr   *connectivityStateEvaluator // This will eventually be part of balancer.
+       target       string
+       parsedTarget resolver.Target
+       authority    string
+       dopts        dialOptions
+       csMgr        *connectivityStateManager
+
+       balancerBuildOpts balancer.BuildOptions
+       blockingpicker    *pickerWrapper
 
-       mu    sync.RWMutex
-       sc    ServiceConfig
-       conns map[Address]*addrConn
+       mu              sync.RWMutex
+       resolverWrapper *ccResolverWrapper
+       sc              ServiceConfig
+       scRaw           string
+       conns           map[*addrConn]struct{}
        // Keepalive parameter can be updated if a GoAway is received.
-       mkp keepalive.ClientParameters
+       mkp             keepalive.ClientParameters
+       curBalancerName string
+       preBalancerName string // previous balancer name.
+       curAddresses    []resolver.Address
+       balancerWrapper *ccBalancerWrapper
+       retryThrottler  atomic.Value
+
+       firstResolveEvent *grpcsync.Event
+
+       channelzID int64 // channelz unique identification number
+       czData     *channelzData
 }
 
 // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
@@ -576,65 +427,6 @@ func (cc *ClientConn) GetState() connectivity.State {
        return cc.csMgr.getState()
 }
 
-// lbWatcher watches the Notify channel of the balancer in cc and manages
-// connections accordingly.  If doneChan is not nil, it is closed after the
-// first successfull connection is made.
-func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
-       defer func() {
-               // In case channel from cc.dopts.balancer.Notify() gets closed before a
-               // successful connection gets established, don't forget to notify the
-               // caller.
-               if doneChan != nil {
-                       close(doneChan)
-               }
-       }()
-
-       for addrs := range cc.dopts.balancer.Notify() {
-               var (
-                       add []Address   // Addresses need to setup connections.
-                       del []*addrConn // Connections need to tear down.
-               )
-               cc.mu.Lock()
-               for _, a := range addrs {
-                       if _, ok := cc.conns[a]; !ok {
-                               add = append(add, a)
-                       }
-               }
-               for k, c := range cc.conns {
-                       var keep bool
-                       for _, a := range addrs {
-                               if k == a {
-                                       keep = true
-                                       break
-                               }
-                       }
-                       if !keep {
-                               del = append(del, c)
-                               delete(cc.conns, c.addr)
-                       }
-               }
-               cc.mu.Unlock()
-               for _, a := range add {
-                       var err error
-                       if doneChan != nil {
-                               err = cc.resetAddrConn(a, true, nil)
-                               if err == nil {
-                                       close(doneChan)
-                                       doneChan = nil
-                               }
-                       } else {
-                               err = cc.resetAddrConn(a, false, nil)
-                       }
-                       if err != nil {
-                               grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
-                       }
-               }
-               for _, c := range del {
-                       c.tearDown(errConnDrain)
-               }
-       }
-}
-
 func (cc *ClientConn) scWatcher() {
        for {
                select {
@@ -646,6 +438,7 @@ func (cc *ClientConn) scWatcher() {
                        // TODO: load balance policy runtime change is ignored.
                        // We may revist this decision in the future.
                        cc.sc = sc
+                       cc.scRaw = ""
                        cc.mu.Unlock()
                case <-cc.ctx.Done():
                        return
@@ -653,169 +446,411 @@ func (cc *ClientConn) scWatcher() {
        }
 }
 
-// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
-// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
-// If tearDownErr is nil, errConnDrain will be used instead.
-//
-// We should never need to replace an addrConn with a new one. This function is only used
-// as newAddrConn to create new addrConn.
-// TODO rename this function and clean up the code.
-func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error {
-       ac := &addrConn{
-               cc:    cc,
-               addr:  addr,
-               dopts: cc.dopts,
-       }
-       ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
-       ac.csEvltr = cc.csEvltr
-       if EnableTracing {
-               ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
+// waitForResolvedAddrs blocks until the resolver has provided addresses or the
+// context expires.  Returns nil unless the context expires first; otherwise
+// returns a status error based on the context.
+func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error {
+       // This is on the RPC path, so we use a fast path to avoid the
+       // more-expensive "select" below after the resolver has returned once.
+       if cc.firstResolveEvent.HasFired() {
+               return nil
        }
-       if !ac.dopts.insecure {
-               if ac.dopts.copts.TransportCredentials == nil {
-                       return errNoTransportSecurity
-               }
-       } else {
-               if ac.dopts.copts.TransportCredentials != nil {
-                       return errCredentialsConflict
-               }
-               for _, cd := range ac.dopts.copts.PerRPCCredentials {
-                       if cd.RequireTransportSecurity() {
-                               return errTransportCredentialsMissing
-                       }
-               }
+       select {
+       case <-cc.firstResolveEvent.Done():
+               return nil
+       case <-ctx.Done():
+               return status.FromContextError(ctx.Err()).Err()
+       case <-cc.ctx.Done():
+               return ErrClientConnClosing
        }
-       // Track ac in cc. This needs to be done before any getTransport(...) is called.
+}
+
+func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
        cc.mu.Lock()
+       defer cc.mu.Unlock()
        if cc.conns == nil {
-               cc.mu.Unlock()
-               return ErrClientConnClosing
+               // cc was closed.
+               return
        }
-       stale := cc.conns[ac.addr]
-       cc.conns[ac.addr] = ac
-       cc.mu.Unlock()
-       if stale != nil {
-               // There is an addrConn alive on ac.addr already. This could be due to
-               // a buggy Balancer that reports duplicated Addresses.
-               if tearDownErr == nil {
-                       // tearDownErr is nil if resetAddrConn is called by
-                       // 1) Dial
-                       // 2) lbWatcher
-                       // In both cases, the stale ac should drain, not close.
-                       stale.tearDown(errConnDrain)
-               } else {
-                       stale.tearDown(tearDownErr)
-               }
+
+       if reflect.DeepEqual(cc.curAddresses, addrs) {
+               return
        }
-       if block {
-               if err := ac.resetTransport(false); err != nil {
-                       if err != errConnClosing {
-                               // Tear down ac and delete it from cc.conns.
-                               cc.mu.Lock()
-                               delete(cc.conns, ac.addr)
-                               cc.mu.Unlock()
-                               ac.tearDown(err)
-                       }
-                       if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
-                               return e.Origin()
+
+       cc.curAddresses = addrs
+       cc.firstResolveEvent.Fire()
+
+       if cc.dopts.balancerBuilder == nil {
+               // Only look at balancer types and switch balancer if balancer dial
+               // option is not set.
+               var isGRPCLB bool
+               for _, a := range addrs {
+                       if a.Type == resolver.GRPCLB {
+                               isGRPCLB = true
+                               break
                        }
-                       return err
                }
-               // Start to monitor the error status of transport.
-               go ac.transportMonitor()
-       } else {
-               // Start a goroutine connecting to the server asynchronously.
-               go func() {
-                       if err := ac.resetTransport(false); err != nil {
-                               grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
-                               if err != errConnClosing {
-                                       // Keep this ac in cc.conns, to get the reason it's torn down.
-                                       ac.tearDown(err)
-                               }
-                               return
+               var newBalancerName string
+               if isGRPCLB {
+                       newBalancerName = grpclbName
+               } else {
+                       // Address list doesn't contain grpclb address. Try to pick a
+                       // non-grpclb balancer.
+                       newBalancerName = cc.curBalancerName
+                       // If current balancer is grpclb, switch to the previous one.
+                       if newBalancerName == grpclbName {
+                               newBalancerName = cc.preBalancerName
                        }
-                       ac.transportMonitor()
-               }()
+                       // The following could be true in two cases:
+                       // - the first time handling resolved addresses
+                       //   (curBalancerName="")
+                       // - the first time handling non-grpclb addresses
+                       //   (curBalancerName="grpclb", preBalancerName="")
+                       if newBalancerName == "" {
+                               newBalancerName = PickFirstBalancerName
+                       }
+               }
+               cc.switchBalancer(newBalancerName)
+       } else if cc.balancerWrapper == nil {
+               // Balancer dial option was set, and this is the first time handling
+               // resolved addresses. Build a balancer with dopts.balancerBuilder.
+               cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
        }
-       return nil
+
+       cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
 }
 
-// GetMethodConfig gets the method config of the input method.
-// If there's an exact match for input method (i.e. /service/method), we return
-// the corresponding MethodConfig.
-// If there isn't an exact match for the input method, we look for the default config
-// under the service (i.e /service/). If there is a default MethodConfig for
-// the serivce, we return it.
-// Otherwise, we return an empty MethodConfig.
-func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
-       // TODO: Avoid the locking here.
-       cc.mu.RLock()
-       defer cc.mu.RUnlock()
-       m, ok := cc.sc.Methods[method]
-       if !ok {
-               i := strings.LastIndex(method, "/")
-               m, _ = cc.sc.Methods[method[:i+1]]
+// switchBalancer starts the switching from current balancer to the balancer
+// with the given name.
+//
+// It will NOT send the current address list to the new balancer. If needed,
+// caller of this function should send address list to the new balancer after
+// this function returns.
+//
+// Caller must hold cc.mu.
+func (cc *ClientConn) switchBalancer(name string) {
+       if cc.conns == nil {
+               return
        }
-       return m
-}
 
-func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
-       var (
-               ac  *addrConn
-               ok  bool
-               put func()
-       )
-       if cc.dopts.balancer == nil {
-               // If balancer is nil, there should be only one addrConn available.
-               cc.mu.RLock()
-               if cc.conns == nil {
-                       cc.mu.RUnlock()
-                       return nil, nil, toRPCErr(ErrClientConnClosing)
-               }
-               for _, ac = range cc.conns {
-                       // Break after the first iteration to get the first addrConn.
-                       ok = true
-                       break
-               }
-               cc.mu.RUnlock()
-       } else {
-               var (
-                       addr Address
-                       err  error
-               )
-               addr, put, err = cc.dopts.balancer.Get(ctx, opts)
-               if err != nil {
-                       return nil, nil, toRPCErr(err)
-               }
-               cc.mu.RLock()
-               if cc.conns == nil {
-                       cc.mu.RUnlock()
-                       return nil, nil, toRPCErr(ErrClientConnClosing)
-               }
-               ac, ok = cc.conns[addr]
-               cc.mu.RUnlock()
+       if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) {
+               return
        }
-       if !ok {
-               if put != nil {
-                       updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
-                       put()
-               }
-               return nil, nil, errConnClosing
+
+       grpclog.Infof("ClientConn switching balancer to %q", name)
+       if cc.dopts.balancerBuilder != nil {
+               grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead")
+               return
        }
-       t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
+       // TODO(bar switching) change this to two steps: drain and close.
+       // Keep track of sc in wrapper.
+       if cc.balancerWrapper != nil {
+               cc.balancerWrapper.close()
+       }
+
+       builder := balancer.Get(name)
+       // TODO(yuxuanli): If user send a service config that does not contain a valid balancer name, should
+       // we reuse previous one?
+       if channelz.IsOn() {
+               if builder == nil {
+                       channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
+                               Desc:     fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName),
+                               Severity: channelz.CtWarning,
+                       })
+               } else {
+                       channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
+                               Desc:     fmt.Sprintf("Channel switches to new LB policy %q", name),
+                               Severity: channelz.CtINFO,
+                       })
+               }
+       }
+       if builder == nil {
+               grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name)
+               builder = newPickfirstBuilder()
+       }
+
+       cc.preBalancerName = cc.curBalancerName
+       cc.curBalancerName = builder.Name()
+       cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
+}
+
+func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
+       cc.mu.Lock()
+       if cc.conns == nil {
+               cc.mu.Unlock()
+               return
+       }
+       // TODO(bar switching) send updates to all balancer wrappers when balancer
+       // gracefully switching is supported.
+       cc.balancerWrapper.handleSubConnStateChange(sc, s)
+       cc.mu.Unlock()
+}
+
+// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
+//
+// Caller needs to make sure len(addrs) > 0.
+func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
+       ac := &addrConn{
+               cc:           cc,
+               addrs:        addrs,
+               scopts:       opts,
+               dopts:        cc.dopts,
+               czData:       new(channelzData),
+               resetBackoff: make(chan struct{}),
+       }
+       ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
+       // Track ac in cc. This needs to be done before any getTransport(...) is called.
+       cc.mu.Lock()
+       if cc.conns == nil {
+               cc.mu.Unlock()
+               return nil, ErrClientConnClosing
+       }
+       if channelz.IsOn() {
+               ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "")
+               channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+                       Desc:     "Subchannel Created",
+                       Severity: channelz.CtINFO,
+                       Parent: &channelz.TraceEventDesc{
+                               Desc:     fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID),
+                               Severity: channelz.CtINFO,
+                       },
+               })
+       }
+       cc.conns[ac] = struct{}{}
+       cc.mu.Unlock()
+       return ac, nil
+}
+
+// removeAddrConn removes the addrConn in the subConn from clientConn.
+// It also tears down the ac with the given error.
+func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
+       cc.mu.Lock()
+       if cc.conns == nil {
+               cc.mu.Unlock()
+               return
+       }
+       delete(cc.conns, ac)
+       cc.mu.Unlock()
+       ac.tearDown(err)
+}
+
+func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric {
+       return &channelz.ChannelInternalMetric{
+               State:                    cc.GetState(),
+               Target:                   cc.target,
+               CallsStarted:             atomic.LoadInt64(&cc.czData.callsStarted),
+               CallsSucceeded:           atomic.LoadInt64(&cc.czData.callsSucceeded),
+               CallsFailed:              atomic.LoadInt64(&cc.czData.callsFailed),
+               LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)),
+       }
+}
+
+// Target returns the target string of the ClientConn.
+// This is an EXPERIMENTAL API.
+func (cc *ClientConn) Target() string {
+       return cc.target
+}
+
+func (cc *ClientConn) incrCallsStarted() {
+       atomic.AddInt64(&cc.czData.callsStarted, 1)
+       atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano())
+}
+
+func (cc *ClientConn) incrCallsSucceeded() {
+       atomic.AddInt64(&cc.czData.callsSucceeded, 1)
+}
+
+func (cc *ClientConn) incrCallsFailed() {
+       atomic.AddInt64(&cc.czData.callsFailed, 1)
+}
+
+// connect starts creating a transport.
+// It does nothing if the ac is not IDLE.
+// TODO(bar) Move this to the addrConn section.
+func (ac *addrConn) connect() error {
+       ac.mu.Lock()
+       if ac.state == connectivity.Shutdown {
+               ac.mu.Unlock()
+               return errConnClosing
+       }
+       if ac.state != connectivity.Idle {
+               ac.mu.Unlock()
+               return nil
+       }
+       ac.updateConnectivityState(connectivity.Connecting)
+       ac.mu.Unlock()
+
+       // Start a goroutine connecting to the server asynchronously.
+       go ac.resetTransport()
+       return nil
+}
+
+// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
+//
+// It checks whether current connected address of ac is in the new addrs list.
+//  - If true, it updates ac.addrs and returns true. The ac will keep using
+//    the existing connection.
+//  - If false, it does nothing and returns false.
+func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
+       ac.mu.Lock()
+       defer ac.mu.Unlock()
+       grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
+       if ac.state == connectivity.Shutdown {
+               ac.addrs = addrs
+               return true
+       }
+
+       // Unless we're busy reconnecting already, let's reconnect from the top of
+       // the list.
+       if ac.state != connectivity.Ready {
+               return false
+       }
+
+       var curAddrFound bool
+       for _, a := range addrs {
+               if reflect.DeepEqual(ac.curAddr, a) {
+                       curAddrFound = true
+                       break
+               }
+       }
+       grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
+       if curAddrFound {
+               ac.addrs = addrs
+       }
+
+       return curAddrFound
+}
+
+// GetMethodConfig gets the method config of the input method.
+// If there's an exact match for input method (i.e. /service/method), we return
+// the corresponding MethodConfig.
+// If there isn't an exact match for the input method, we look for the default config
+// under the service (i.e /service/). If there is a default MethodConfig for
+// the service, we return it.
+// Otherwise, we return an empty MethodConfig.
+func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
+       // TODO: Avoid the locking here.
+       cc.mu.RLock()
+       defer cc.mu.RUnlock()
+       m, ok := cc.sc.Methods[method]
+       if !ok {
+               i := strings.LastIndex(method, "/")
+               m = cc.sc.Methods[method[:i+1]]
+       }
+       return m
+}
+
+func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
+       cc.mu.RLock()
+       defer cc.mu.RUnlock()
+       return cc.sc.healthCheckConfig
+}
+
+func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) {
+       hdr, _ := metadata.FromOutgoingContext(ctx)
+       t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{
+               FullMethodName: method,
+               Header:         hdr,
+       })
        if err != nil {
-               if put != nil {
-                       updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
-                       put()
+               return nil, nil, toRPCErr(err)
+       }
+       return t, done, nil
+}
+
+// handleServiceConfig parses the service config string in JSON format to Go native
+// struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
+func (cc *ClientConn) handleServiceConfig(js string) error {
+       if cc.dopts.disableServiceConfig {
+               return nil
+       }
+       if cc.scRaw == js {
+               return nil
+       }
+       if channelz.IsOn() {
+               channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{
+                       // The special formatting of \"%s\" instead of %q is to provide nice printing of service config
+                       // for human consumption.
+                       Desc:     fmt.Sprintf("Channel has a new service config \"%s\"", js),
+                       Severity: channelz.CtINFO,
+               })
+       }
+       sc, err := parseServiceConfig(js)
+       if err != nil {
+               return err
+       }
+       cc.mu.Lock()
+       // Check if the ClientConn is already closed. Some fields (e.g.
+       // balancerWrapper) are set to nil when closing the ClientConn, and could
+       // cause nil pointer panic if we don't have this check.
+       if cc.conns == nil {
+               cc.mu.Unlock()
+               return nil
+       }
+       cc.scRaw = js
+       cc.sc = sc
+
+       if sc.retryThrottling != nil {
+               newThrottler := &retryThrottler{
+                       tokens: sc.retryThrottling.MaxTokens,
+                       max:    sc.retryThrottling.MaxTokens,
+                       thresh: sc.retryThrottling.MaxTokens / 2,
+                       ratio:  sc.retryThrottling.TokenRatio,
+               }
+               cc.retryThrottler.Store(newThrottler)
+       } else {
+               cc.retryThrottler.Store((*retryThrottler)(nil))
+       }
+
+       if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config.
+               if cc.curBalancerName == grpclbName {
+                       // If current balancer is grpclb, there's at least one grpclb
+                       // balancer address in the resolved list. Don't switch the balancer,
+                       // but change the previous balancer name, so if a new resolved
+                       // address list doesn't contain grpclb address, balancer will be
+                       // switched to *sc.LB.
+                       cc.preBalancerName = *sc.LB
+               } else {
+                       cc.switchBalancer(*sc.LB)
+                       cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
                }
-               return nil, nil, err
        }
-       return t, put, nil
+
+       cc.mu.Unlock()
+       return nil
+}
+
+func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) {
+       cc.mu.RLock()
+       r := cc.resolverWrapper
+       cc.mu.RUnlock()
+       if r == nil {
+               return
+       }
+       go r.resolveNow(o)
+}
+
+// ResetConnectBackoff wakes up all subchannels in transient failure and causes
+// them to attempt another connection immediately.  It also resets the backoff
+// times used for subsequent attempts regardless of the current state.
+//
+// In general, this function should not be used.  Typical service or network
+// outages result in a reasonable client reconnection strategy by default.
+// However, if a previously unavailable network becomes available, this may be
+// used to trigger an immediate reconnect.
+//
+// This API is EXPERIMENTAL.
+func (cc *ClientConn) ResetConnectBackoff() {
+       cc.mu.Lock()
+       defer cc.mu.Unlock()
+       for ac := range cc.conns {
+               ac.resetConnectBackoff()
+       }
 }
 
 // Close tears down the ClientConn and all underlying connections.
 func (cc *ClientConn) Close() error {
-       cc.cancel()
+       defer cc.cancel()
 
        cc.mu.Lock()
        if cc.conns == nil {
@@ -825,13 +860,41 @@ func (cc *ClientConn) Close() error {
        conns := cc.conns
        cc.conns = nil
        cc.csMgr.updateState(connectivity.Shutdown)
+
+       rWrapper := cc.resolverWrapper
+       cc.resolverWrapper = nil
+       bWrapper := cc.balancerWrapper
+       cc.balancerWrapper = nil
        cc.mu.Unlock()
-       if cc.dopts.balancer != nil {
-               cc.dopts.balancer.Close()
+
+       cc.blockingpicker.close()
+
+       if rWrapper != nil {
+               rWrapper.close()
+       }
+       if bWrapper != nil {
+               bWrapper.close()
        }
-       for _, ac := range conns {
+
+       for ac := range conns {
                ac.tearDown(ErrClientConnClosing)
        }
+       if channelz.IsOn() {
+               ted := &channelz.TraceEventDesc{
+                       Desc:     "Channel Deleted",
+                       Severity: channelz.CtINFO,
+               }
+               if cc.dopts.channelzParentID != 0 {
+                       ted.Parent = &channelz.TraceEventDesc{
+                               Desc:     fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID),
+                               Severity: channelz.CtINFO,
+                       }
+               }
+               channelz.AddTraceEvent(cc.channelzID, ted)
+               // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
+               // the entity beng deleted, and thus prevent it from being deleted right away.
+               channelz.RemoveEntry(cc.channelzID)
+       }
        return nil
 }
 
@@ -841,29 +904,56 @@ type addrConn struct {
        cancel context.CancelFunc
 
        cc     *ClientConn
-       addr   Address
        dopts  dialOptions
-       events trace.EventLog
+       acbw   balancer.SubConn
+       scopts balancer.NewSubConnOptions
+
+       // transport is set when there's a viable transport (note: ac state may not be READY as LB channel
+       // health checking may require server to report healthy to set ac to READY), and is reset
+       // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway
+       // is received, transport is closed, ac has been torn down).
+       transport transport.ClientTransport // The current transport.
 
-       csEvltr *connectivityStateEvaluator
+       mu      sync.Mutex
+       curAddr resolver.Address   // The current address.
+       addrs   []resolver.Address // All addresses that the resolver resolved to.
 
-       mu    sync.Mutex
+       // Use updateConnectivityState for updating addrConn's connectivity state.
        state connectivity.State
-       down  func(error) // the handler called when a connection is down.
-       // ready is closed and becomes nil when a new transport is up or failed
-       // due to timeout.
-       ready     chan struct{}
-       transport transport.ClientTransport
-
-       // The reason this addrConn is torn down.
-       tearDownErr error
+
+       tearDownErr error // The reason this addrConn is torn down.
+
+       backoffIdx   int // Needs to be stateful for resetConnectBackoff.
+       resetBackoff chan struct{}
+
+       channelzID         int64 // channelz unique identification number.
+       czData             *channelzData
+       healthCheckEnabled bool
+}
+
+// Note: this requires a lock on ac.mu.
+func (ac *addrConn) updateConnectivityState(s connectivity.State) {
+       if ac.state == s {
+               return
+       }
+
+       updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
+       grpclog.Infof(updateMsg)
+       ac.state = s
+       if channelz.IsOn() {
+               channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+                       Desc:     updateMsg,
+                       Severity: channelz.CtINFO,
+               })
+       }
+       ac.cc.handleSubConnStateChange(ac.acbw, s)
 }
 
 // adjustParams updates parameters used to create transports upon
 // receiving a GoAway.
 func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
        switch r {
-       case transport.TooManyPings:
+       case transport.GoAwayTooManyPings:
                v := 2 * ac.dopts.copts.KeepaliveParams.Time
                ac.cc.mu.Lock()
                if v > ac.cc.mkp.Time {
@@ -873,246 +963,359 @@ func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
        }
 }
 
-// printf records an event in ac's event log, unless ac has been closed.
-// REQUIRES ac.mu is held.
-func (ac *addrConn) printf(format string, a ...interface{}) {
-       if ac.events != nil {
-               ac.events.Printf(format, a...)
-       }
-}
+func (ac *addrConn) resetTransport() {
+       for i := 0; ; i++ {
+               tryNextAddrFromStart := grpcsync.NewEvent()
 
-// errorf records an error in ac's event log, unless ac has been closed.
-// REQUIRES ac.mu is held.
-func (ac *addrConn) errorf(format string, a ...interface{}) {
-       if ac.events != nil {
-               ac.events.Errorf(format, a...)
-       }
-}
-
-// resetTransport recreates a transport to the address for ac.
-// For the old transport:
-// - if drain is true, it will be gracefully closed.
-// - otherwise, it will be closed.
-func (ac *addrConn) resetTransport(drain bool) error {
-       ac.mu.Lock()
-       if ac.state == connectivity.Shutdown {
-               ac.mu.Unlock()
-               return errConnClosing
-       }
-       ac.printf("connecting")
-       if ac.down != nil {
-               ac.down(downErrorf(false, true, "%v", errNetworkIO))
-               ac.down = nil
-       }
-       oldState := ac.state
-       ac.state = connectivity.Connecting
-       ac.csEvltr.recordTransition(oldState, ac.state)
-       t := ac.transport
-       ac.transport = nil
-       ac.mu.Unlock()
-       if t != nil && !drain {
-               t.Close()
-       }
-       ac.cc.mu.RLock()
-       ac.dopts.copts.KeepaliveParams = ac.cc.mkp
-       ac.cc.mu.RUnlock()
-       for retries := 0; ; retries++ {
                ac.mu.Lock()
-               if ac.state == connectivity.Shutdown {
-                       // ac.tearDown(...) has been invoked.
-                       ac.mu.Unlock()
-                       return errConnClosing
+               if i > 0 {
+                       ac.cc.resolveNow(resolver.ResolveNowOption{})
                }
+               addrs := ac.addrs
+               backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
                ac.mu.Unlock()
-               sleepTime := ac.dopts.bs.backoff(retries)
-               timeout := minConnectTimeout
-               if timeout < sleepTime {
-                       timeout = sleepTime
-               }
-               ctx, cancel := context.WithTimeout(ac.ctx, timeout)
-               connectTime := time.Now()
-               sinfo := transport.TargetInfo{
-                       Addr:     ac.addr.Addr,
-                       Metadata: ac.addr.Metadata,
-               }
-               newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
-               // Don't call cancel in success path due to a race in Go 1.6:
-               // https://github.com/golang/go/issues/15078.
-               if err != nil {
-                       cancel()
 
-                       if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
-                               return err
+       addrLoop:
+               for _, addr := range addrs {
+                       ac.mu.Lock()
+
+                       if ac.state == connectivity.Shutdown {
+                               ac.mu.Unlock()
+                               return
                        }
-                       grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
+                       ac.updateConnectivityState(connectivity.Connecting)
+                       ac.transport = nil
+                       ac.mu.Unlock()
+
+                       // This will be the duration that dial gets to finish.
+                       dialDuration := getMinConnectTimeout()
+                       if dialDuration < backoffFor {
+                               // Give dial more time as we keep failing to connect.
+                               dialDuration = backoffFor
+                       }
+                       connectDeadline := time.Now().Add(dialDuration)
+
                        ac.mu.Lock()
+                       ac.cc.mu.RLock()
+                       ac.dopts.copts.KeepaliveParams = ac.cc.mkp
+                       ac.cc.mu.RUnlock()
+
                        if ac.state == connectivity.Shutdown {
-                               // ac.tearDown(...) has been invoked.
                                ac.mu.Unlock()
-                               return errConnClosing
+                               return
                        }
-                       ac.errorf("transient failure: %v", err)
-                       oldState = ac.state
-                       ac.state = connectivity.TransientFailure
-                       ac.csEvltr.recordTransition(oldState, ac.state)
-                       if ac.ready != nil {
-                               close(ac.ready)
-                               ac.ready = nil
+
+                       copts := ac.dopts.copts
+                       if ac.scopts.CredsBundle != nil {
+                               copts.CredsBundle = ac.scopts.CredsBundle
                        }
+                       hctx, hcancel := context.WithCancel(ac.ctx)
+                       defer hcancel()
                        ac.mu.Unlock()
-                       timer := time.NewTimer(sleepTime - time.Since(connectTime))
-                       select {
-                       case <-timer.C:
-                       case <-ac.ctx.Done():
-                               timer.Stop()
-                               return ac.ctx.Err()
+
+                       if channelz.IsOn() {
+                               channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+                                       Desc:     fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
+                                       Severity: channelz.CtINFO,
+                               })
+                       }
+
+                       reconnect := grpcsync.NewEvent()
+                       prefaceReceived := make(chan struct{})
+                       newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
+                       if err == nil {
+                               ac.mu.Lock()
+                               ac.curAddr = addr
+                               ac.transport = newTr
+                               ac.mu.Unlock()
+
+                               healthCheckConfig := ac.cc.healthCheckConfig()
+                               // LB channel health checking is only enabled when all the four requirements below are met:
+                               // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
+                               // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
+                               // 3. a service config with non-empty healthCheckConfig field is provided,
+                               // 4. the current load balancer allows it.
+                               healthcheckManagingState := false
+                               if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
+                                       if ac.cc.dopts.healthCheckFunc == nil {
+                                               // TODO: add a link to the health check doc in the error message.
+                                               grpclog.Error("the client side LB channel health check function has not been set.")
+                                       } else {
+                                               // TODO(deklerk) refactor to just return transport
+                                               go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
+                                               healthcheckManagingState = true
+                                       }
+                               }
+                               if !healthcheckManagingState {
+                                       ac.mu.Lock()
+                                       ac.updateConnectivityState(connectivity.Ready)
+                                       ac.mu.Unlock()
+                               }
+                       } else {
+                               hcancel()
+                               if err == errConnClosing {
+                                       return
+                               }
+
+                               if tryNextAddrFromStart.HasFired() {
+                                       break addrLoop
+                               }
+                               continue
+                       }
+
+                       ac.mu.Lock()
+                       reqHandshake := ac.dopts.reqHandshake
+                       ac.mu.Unlock()
+
+                       <-reconnect.Done()
+                       hcancel()
+
+                       if reqHandshake == envconfig.RequireHandshakeHybrid {
+                               // In RequireHandshakeHybrid mode, we must check to see whether
+                               // server preface has arrived yet to decide whether to start
+                               // reconnecting at the top of the list (server preface received)
+                               // or continue with the next addr in the list as if the
+                               // connection were not successful (server preface not received).
+                               select {
+                               case <-prefaceReceived:
+                                       // We received a server preface - huzzah! We consider this
+                                       // a success and restart from the top of the addr list.
+                                       ac.mu.Lock()
+                                       ac.backoffIdx = 0
+                                       ac.mu.Unlock()
+                                       break addrLoop
+                               default:
+                                       // Despite having set state to READY, in hybrid mode we
+                                       // consider this a failure and continue connecting at the
+                                       // next addr in the list.
+                                       ac.mu.Lock()
+                                       if ac.state == connectivity.Shutdown {
+                                               ac.mu.Unlock()
+                                               return
+                                       }
+
+                                       ac.updateConnectivityState(connectivity.TransientFailure)
+                                       ac.mu.Unlock()
+
+                                       if tryNextAddrFromStart.HasFired() {
+                                               break addrLoop
+                                       }
+                               }
+                       } else {
+                               // In RequireHandshakeOn mode, we would have already waited for
+                               // the server preface, so we consider this a success and restart
+                               // from the top of the addr list. In RequireHandshakeOff mode,
+                               // we don't care to wait for the server preface before
+                               // considering this a success, so we also restart from the top
+                               // of the addr list.
+                               ac.mu.Lock()
+                               ac.backoffIdx = 0
+                               ac.mu.Unlock()
+                               break addrLoop
                        }
-                       timer.Stop()
-                       continue
                }
+
+               // After exhausting all addresses, or after need to reconnect after a
+               // READY, the addrConn enters TRANSIENT_FAILURE.
                ac.mu.Lock()
-               ac.printf("ready")
                if ac.state == connectivity.Shutdown {
-                       // ac.tearDown(...) has been invoked.
                        ac.mu.Unlock()
-                       newTransport.Close()
-                       return errConnClosing
-               }
-               oldState = ac.state
-               ac.state = connectivity.Ready
-               ac.csEvltr.recordTransition(oldState, ac.state)
-               ac.transport = newTransport
-               if ac.ready != nil {
-                       close(ac.ready)
-                       ac.ready = nil
-               }
-               if ac.cc.dopts.balancer != nil {
-                       ac.down = ac.cc.dopts.balancer.Up(ac.addr)
+                       return
                }
+               ac.updateConnectivityState(connectivity.TransientFailure)
+
+               // Backoff.
+               b := ac.resetBackoff
+               timer := time.NewTimer(backoffFor)
+               acctx := ac.ctx
                ac.mu.Unlock()
-               return nil
+
+               select {
+               case <-timer.C:
+                       ac.mu.Lock()
+                       ac.backoffIdx++
+                       ac.mu.Unlock()
+               case <-b:
+                       timer.Stop()
+               case <-acctx.Done():
+                       timer.Stop()
+                       return
+               }
        }
 }
 
-// Run in a goroutine to track the error in transport and create the
-// new transport if an error happens. It returns when the channel is closing.
-func (ac *addrConn) transportMonitor() {
-       for {
+// createTransport creates a connection to one of the backends in addrs. It
+// sets ac.transport in the success case, or it returns an error if it was
+// unable to successfully create a transport.
+//
+// If waitForHandshake is enabled, it blocks until server preface arrives.
+func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time, reconnect *grpcsync.Event, prefaceReceived chan struct{}) (transport.ClientTransport, error) {
+       onCloseCalled := make(chan struct{})
+
+       target := transport.TargetInfo{
+               Addr:      addr.Addr,
+               Metadata:  addr.Metadata,
+               Authority: ac.cc.authority,
+       }
+
+       prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now()))
+
+       onGoAway := func(r transport.GoAwayReason) {
                ac.mu.Lock()
-               t := ac.transport
+               ac.adjustParams(r)
                ac.mu.Unlock()
-               select {
-               // This is needed to detect the teardown when
-               // the addrConn is idle (i.e., no RPC in flight).
-               case <-ac.ctx.Done():
-                       select {
-                       case <-t.Error():
-                               t.Close()
-                       default:
-                       }
-                       return
-               case <-t.GoAway():
-                       ac.adjustParams(t.GetGoAwayReason())
-                       // If GoAway happens without any network I/O error, the underlying transport
-                       // will be gracefully closed, and a new transport will be created.
-                       // (The transport will be closed when all the pending RPCs finished or failed.)
-                       // If GoAway and some network I/O error happen concurrently, the underlying transport
-                       // will be closed, and a new transport will be created.
-                       var drain bool
-                       select {
-                       case <-t.Error():
-                       default:
-                               drain = true
-                       }
-                       if err := ac.resetTransport(drain); err != nil {
-                               grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
-                               if err != errConnClosing {
-                                       // Keep this ac in cc.conns, to get the reason it's torn down.
-                                       ac.tearDown(err)
-                               }
-                               return
-                       }
-               case <-t.Error():
+               reconnect.Fire()
+       }
+
+       onClose := func() {
+               close(onCloseCalled)
+               prefaceTimer.Stop()
+               reconnect.Fire()
+       }
+
+       onPrefaceReceipt := func() {
+               close(prefaceReceived)
+               prefaceTimer.Stop()
+       }
+
+       connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
+       defer cancel()
+       if channelz.IsOn() {
+               copts.ChannelzParentID = ac.channelzID
+       }
+
+       newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
+
+       if err == nil {
+               if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
                        select {
-                       case <-ac.ctx.Done():
-                               t.Close()
-                               return
-                       case <-t.GoAway():
-                               ac.adjustParams(t.GetGoAwayReason())
-                               if err := ac.resetTransport(false); err != nil {
-                                       grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
-                                       if err != errConnClosing {
-                                               // Keep this ac in cc.conns, to get the reason it's torn down.
-                                               ac.tearDown(err)
-                                       }
-                                       return
-                               }
-                       default:
-                       }
-                       ac.mu.Lock()
-                       if ac.state == connectivity.Shutdown {
-                               // ac has been shutdown.
-                               ac.mu.Unlock()
-                               return
+                       case <-prefaceTimer.C:
+                               // We didn't get the preface in time.
+                               newTr.Close()
+                               err = errors.New("timed out waiting for server handshake")
+                       case <-prefaceReceived:
+                               // We got the preface - huzzah! things are good.
+                       case <-onCloseCalled:
+                               // The transport has already closed - noop.
+                               return nil, errors.New("connection closed")
                        }
-                       oldState := ac.state
-                       ac.state = connectivity.TransientFailure
-                       ac.csEvltr.recordTransition(oldState, ac.state)
-                       ac.mu.Unlock()
-                       if err := ac.resetTransport(false); err != nil {
-                               grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
-                               ac.mu.Lock()
-                               ac.printf("transport exiting: %v", err)
-                               ac.mu.Unlock()
-                               grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
-                               if err != errConnClosing {
-                                       // Keep this ac in cc.conns, to get the reason it's torn down.
-                                       ac.tearDown(err)
+               } else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
+                       go func() {
+                               select {
+                               case <-prefaceTimer.C:
+                                       // We didn't get the preface in time.
+                                       newTr.Close()
+                               case <-prefaceReceived:
+                                       // We got the preface just in the nick of time - huzzah!
+                               case <-onCloseCalled:
+                                       // The transport has already closed - noop.
                                }
-                               return
-                       }
+                       }()
                }
        }
-}
 
-// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
-// iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true.
-func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
-       for {
+       if err != nil {
+               // newTr is either nil, or closed.
+               ac.cc.blockingpicker.updateConnectionError(err)
                ac.mu.Lock()
-               switch {
-               case ac.state == connectivity.Shutdown:
-                       if failfast || !hasBalancer {
-                               // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
-                               err := ac.tearDownErr
-                               ac.mu.Unlock()
-                               return nil, err
-                       }
+               if ac.state == connectivity.Shutdown {
+                       // ac.tearDown(...) has been invoked.
                        ac.mu.Unlock()
+
                        return nil, errConnClosing
-               case ac.state == connectivity.Ready:
-                       ct := ac.transport
-                       ac.mu.Unlock()
-                       return ct, nil
-               case ac.state == connectivity.TransientFailure:
-                       if failfast || hasBalancer {
-                               ac.mu.Unlock()
-                               return nil, errConnUnavailable
+               }
+               ac.mu.Unlock()
+               grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
+               return nil, err
+       }
+
+       // Now there is a viable transport to be used, so set ac.transport to reflect the new viable transport.
+       ac.mu.Lock()
+       if ac.state == connectivity.Shutdown {
+               ac.mu.Unlock()
+               newTr.Close()
+               return nil, errConnClosing
+       }
+       ac.mu.Unlock()
+
+       // Now there is a viable transport to be used, so set ac.transport to reflect the new viable transport.
+       ac.mu.Lock()
+       if ac.state == connectivity.Shutdown {
+               ac.mu.Unlock()
+               newTr.Close()
+               return nil, errConnClosing
+       }
+       ac.mu.Unlock()
+
+       return newTr, nil
+}
+
+func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
+       // Set up the health check helper functions
+       newStream := func() (interface{}, error) {
+               return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr)
+       }
+       firstReady := true
+       reportHealth := func(ok bool) {
+               ac.mu.Lock()
+               defer ac.mu.Unlock()
+               if ac.transport != newTr {
+                       return
+               }
+               if ok {
+                       if firstReady {
+                               firstReady = false
+                               ac.curAddr = addr
                        }
+                       ac.updateConnectivityState(connectivity.Ready)
+               } else {
+                       ac.updateConnectivityState(connectivity.TransientFailure)
                }
-               ready := ac.ready
-               if ready == nil {
-                       ready = make(chan struct{})
-                       ac.ready = ready
+       }
+       err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
+       if err != nil {
+               if status.Code(err) == codes.Unimplemented {
+                       if channelz.IsOn() {
+                               channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+                                       Desc:     "Subchannel health check is unimplemented at server side, thus health check is disabled",
+                                       Severity: channelz.CtError,
+                               })
+                       }
+                       grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled")
+               } else {
+                       grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err)
                }
+       }
+}
+
+func (ac *addrConn) resetConnectBackoff() {
+       ac.mu.Lock()
+       close(ac.resetBackoff)
+       ac.backoffIdx = 0
+       ac.resetBackoff = make(chan struct{})
+       ac.mu.Unlock()
+}
+
+// getReadyTransport returns the transport if ac's state is READY.
+// Otherwise it returns nil, false.
+// If ac's state is IDLE, it will trigger ac to connect.
+func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
+       ac.mu.Lock()
+       if ac.state == connectivity.Ready && ac.transport != nil {
+               t := ac.transport
                ac.mu.Unlock()
-               select {
-               case <-ctx.Done():
-                       return nil, toRPCErr(ctx.Err())
-               // Wait until the new transport is ready or failed.
-               case <-ready:
-               }
+               return t, true
        }
+       var idle bool
+       if ac.state == connectivity.Idle {
+               idle = true
+       }
+       ac.mu.Unlock()
+       // Trigger idle ac to connect.
+       if idle {
+               ac.connect()
+       }
+       return nil, false
 }
 
 // tearDown starts to tear down the addrConn.
@@ -1121,38 +1324,126 @@ func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (trans
 // tight loop.
 // tearDown doesn't remove ac from ac.cc.conns.
 func (ac *addrConn) tearDown(err error) {
-       ac.cancel()
-
        ac.mu.Lock()
-       defer ac.mu.Unlock()
-       if ac.down != nil {
-               ac.down(downErrorf(false, false, "%v", err))
-               ac.down = nil
+       if ac.state == connectivity.Shutdown {
+               ac.mu.Unlock()
+               return
        }
-       if err == errConnDrain && ac.transport != nil {
+       curTr := ac.transport
+       ac.transport = nil
+       // We have to set the state to Shutdown before anything else to prevent races
+       // between setting the state and logic that waits on context cancelation / etc.
+       ac.updateConnectivityState(connectivity.Shutdown)
+       ac.cancel()
+       ac.tearDownErr = err
+       ac.curAddr = resolver.Address{}
+       if err == errConnDrain && curTr != nil {
                // GracefulClose(...) may be executed multiple times when
                // i) receiving multiple GoAway frames from the server; or
                // ii) there are concurrent name resolver/Balancer triggered
                // address removal and GoAway.
-               ac.transport.GracefulClose()
+               // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
+               ac.mu.Unlock()
+               curTr.GracefulClose()
+               ac.mu.Lock()
        }
-       if ac.state == connectivity.Shutdown {
-               return
+       if channelz.IsOn() {
+               channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
+                       Desc:     "Subchannel Deleted",
+                       Severity: channelz.CtINFO,
+                       Parent: &channelz.TraceEventDesc{
+                               Desc:     fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
+                               Severity: channelz.CtINFO,
+                       },
+               })
+               // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
+               // the entity being deleted, and thus prevent it from being deleted right away.
+               channelz.RemoveEntry(ac.channelzID)
        }
-       oldState := ac.state
-       ac.state = connectivity.Shutdown
-       ac.tearDownErr = err
-       ac.csEvltr.recordTransition(oldState, ac.state)
-       if ac.events != nil {
-               ac.events.Finish()
-               ac.events = nil
+       ac.mu.Unlock()
+}
+
+func (ac *addrConn) getState() connectivity.State {
+       ac.mu.Lock()
+       defer ac.mu.Unlock()
+       return ac.state
+}
+
+func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric {
+       ac.mu.Lock()
+       addr := ac.curAddr.Addr
+       ac.mu.Unlock()
+       return &channelz.ChannelInternalMetric{
+               State:                    ac.getState(),
+               Target:                   addr,
+               CallsStarted:             atomic.LoadInt64(&ac.czData.callsStarted),
+               CallsSucceeded:           atomic.LoadInt64(&ac.czData.callsSucceeded),
+               CallsFailed:              atomic.LoadInt64(&ac.czData.callsFailed),
+               LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)),
        }
-       if ac.ready != nil {
-               close(ac.ready)
-               ac.ready = nil
+}
+
+func (ac *addrConn) incrCallsStarted() {
+       atomic.AddInt64(&ac.czData.callsStarted, 1)
+       atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano())
+}
+
+func (ac *addrConn) incrCallsSucceeded() {
+       atomic.AddInt64(&ac.czData.callsSucceeded, 1)
+}
+
+func (ac *addrConn) incrCallsFailed() {
+       atomic.AddInt64(&ac.czData.callsFailed, 1)
+}
+
+type retryThrottler struct {
+       max    float64
+       thresh float64
+       ratio  float64
+
+       mu     sync.Mutex
+       tokens float64 // TODO(dfawley): replace with atomic and remove lock.
+}
+
+// throttle subtracts a retry token from the pool and returns whether a retry
+// should be throttled (disallowed) based upon the retry throttling policy in
+// the service config.
+func (rt *retryThrottler) throttle() bool {
+       if rt == nil {
+               return false
        }
-       if ac.transport != nil && err != errConnDrain {
-               ac.transport.Close()
+       rt.mu.Lock()
+       defer rt.mu.Unlock()
+       rt.tokens--
+       if rt.tokens < 0 {
+               rt.tokens = 0
        }
-       return
+       return rt.tokens <= rt.thresh
 }
+
+func (rt *retryThrottler) successfulRPC() {
+       if rt == nil {
+               return
+       }
+       rt.mu.Lock()
+       defer rt.mu.Unlock()
+       rt.tokens += rt.ratio
+       if rt.tokens > rt.max {
+               rt.tokens = rt.max
+       }
+}
+
+type channelzChannel struct {
+       cc *ClientConn
+}
+
+func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric {
+       return c.cc.channelzMetric()
+}
+
+// ErrClientConnTimeout indicates that the ClientConn cannot establish the
+// underlying connections within the specified timeout.
+//
+// Deprecated: This error is never returned by grpc and should not be
+// referenced by users.
+var ErrClientConnTimeout = errors.New("grpc: timed out when dialing")