3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
26 "google.golang.org/grpc/balancer"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/grpclog"
29 "google.golang.org/grpc/internal/channelz"
30 "google.golang.org/grpc/internal/transport"
31 "google.golang.org/grpc/status"
34 // pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
35 // actions and unblocks when there's a picker update.
36 type pickerWrapper struct {
39 blockingCh chan struct{}
40 picker balancer.Picker
42 // The latest connection error that happened.
47 func newPickerWrapper() *pickerWrapper {
48 bp := &pickerWrapper{blockingCh: make(chan struct{})}
52 func (bp *pickerWrapper) updateConnectionError(err error) {
58 func (bp *pickerWrapper) connectionError() error {
65 // updatePicker is called by UpdateBalancerState. It unblocks all blocked picks.
66 func (bp *pickerWrapper) updatePicker(p balancer.Picker) {
73 // bp.blockingCh should never be nil.
75 bp.blockingCh = make(chan struct{})
79 func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) {
84 return func(b balancer.DoneInfo) {
85 if b.Err != nil && b.Err != io.EOF {
88 ac.incrCallsSucceeded()
96 // pick returns the transport that will be used for the RPC.
97 // It may block in the following cases:
98 // - there's no picker
99 // - the current picker returns ErrNoSubConnAvailable
100 // - the current picker returns other errors and failfast is false.
101 // - the subConn returned by the current picker is not READY
102 // When one of these situations happens, pick blocks until the picker gets updated.
103 func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) {
113 return nil, nil, ErrClientConnClosing
116 if bp.picker == nil {
119 if ch == bp.blockingCh {
120 // This could happen when either:
121 // - bp.picker is nil (the previous if condition), or
122 // - pick has already been called on the current picker.
126 return nil, nil, ctx.Err()
136 subConn, done, err := p.Pick(ctx, opts)
140 case balancer.ErrNoSubConnAvailable:
142 case balancer.ErrTransientFailure:
146 return nil, nil, status.Errorf(codes.Unavailable, "%v, latest connection error: %v", err, bp.connectionError())
148 // err is some other error.
149 return nil, nil, toRPCErr(err)
153 acw, ok := subConn.(*acBalancerWrapper)
155 grpclog.Infof("subconn returned from pick is not *acBalancerWrapper")
158 if t, ok := acw.getAddrConn().getReadyTransport(); ok {
160 return t, doneChannelzWrapper(acw, done), nil
164 grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick")
165 // If ok == false, ac.state is not READY.
166 // A valid picker always returns a READY subConn. This means the state of ac
167 // just changed, and the picker will be updated shortly.
168 // Continue back to the beginning of the for loop to repick.
172 func (bp *pickerWrapper) close() {