]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | /* |
2 | * | |
3 | * Copyright 2014 gRPC authors. | |
4 | * | |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | * | |
17 | */ | |
18 | ||
19 | package grpc | |
20 | ||
21 | import ( | |
107c1cdb | 22 | "context" |
15c0b25d | 23 | "errors" |
107c1cdb ND |
24 | "fmt" |
25 | "math" | |
15c0b25d | 26 | "net" |
107c1cdb | 27 | "reflect" |
15c0b25d AP |
28 | "strings" |
29 | "sync" | |
107c1cdb | 30 | "sync/atomic" |
15c0b25d AP |
31 | "time" |
32 | ||
107c1cdb ND |
33 | "google.golang.org/grpc/balancer" |
34 | _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin. | |
35 | "google.golang.org/grpc/codes" | |
15c0b25d AP |
36 | "google.golang.org/grpc/connectivity" |
37 | "google.golang.org/grpc/credentials" | |
38 | "google.golang.org/grpc/grpclog" | |
107c1cdb ND |
39 | "google.golang.org/grpc/internal/backoff" |
40 | "google.golang.org/grpc/internal/channelz" | |
41 | "google.golang.org/grpc/internal/envconfig" | |
42 | "google.golang.org/grpc/internal/grpcsync" | |
43 | "google.golang.org/grpc/internal/transport" | |
15c0b25d | 44 | "google.golang.org/grpc/keepalive" |
107c1cdb ND |
45 | "google.golang.org/grpc/metadata" |
46 | "google.golang.org/grpc/resolver" | |
47 | _ "google.golang.org/grpc/resolver/dns" // To register dns resolver. | |
48 | _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver. | |
49 | "google.golang.org/grpc/status" | |
50 | ) | |
51 | ||
const (
	// minConnectTimeout is the minimum amount of time a single connection
	// attempt is given to complete.
	minConnectTimeout = 20 * time.Second

	// grpclbName is the registered name of the grpclb balancer. It must stay
	// in sync with grpclbName in grpclb/grpclb.go.
	grpclbName = "grpclb"
)
58 | ||
59 | var ( | |
60 | // ErrClientConnClosing indicates that the operation is illegal because | |
61 | // the ClientConn is closing. | |
107c1cdb ND |
62 | // |
63 | // Deprecated: this error should not be relied upon by users; use the status | |
64 | // code of Canceled instead. | |
65 | ErrClientConnClosing = status.Error(codes.Canceled, "grpc: the client connection is closing") | |
66 | // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs. | |
67 | errConnDrain = errors.New("grpc: the connection is drained") | |
68 | // errConnClosing indicates that the connection is closing. | |
69 | errConnClosing = errors.New("grpc: the connection is closing") | |
70 | // errBalancerClosed indicates that the balancer is closed. | |
71 | errBalancerClosed = errors.New("grpc: balancer is closed") | |
72 | // We use an accessor so that minConnectTimeout can be | |
73 | // atomically read and updated while testing. | |
74 | getMinConnectTimeout = func() time.Duration { | |
75 | return minConnectTimeout | |
76 | } | |
77 | ) | |
15c0b25d | 78 | |
107c1cdb ND |
// Errors returned by Dial and DialContext when the configured security
// options are missing or contradictory.
var (
	// errNoTransportSecurity indicates that neither transport credentials nor
	// a credentials bundle was configured, and the insecure option was not
	// requested explicitly either.
	errNoTransportSecurity = errors.New("grpc: no transport security set (use grpc.WithInsecure() explicitly or set credentials)")

	// errTransportCredsAndBundle indicates that a credentials.Bundle was
	// supplied together with individual TransportCredentials.
	errTransportCredsAndBundle = errors.New("grpc: credentials.Bundle may not be used with individual TransportCredentials")

	// errTransportCredentialsMissing indicates that per-RPC credentials which
	// require a secure transport (e.g. an oauth2 token) were configured on an
	// insecure connection.
	errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")

	// errCredentialsConflict indicates that transport credentials (or a
	// credentials bundle) were set on a connection that was also marked
	// insecure.
	errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
)
96 | ||
15c0b25d AP |
const (
	// defaultClientMaxReceiveMessageSize is the default limit on the size of
	// a received message: 4 MiB.
	defaultClientMaxReceiveMessageSize = 4 * 1024 * 1024
	// defaultClientMaxSendMessageSize is the default limit on the size of a
	// sent message: math.MaxInt32, i.e. effectively unbounded.
	defaultClientMaxSendMessageSize = math.MaxInt32
	// defaultWriteBufSize and defaultReadBufSize are the default transport
	// write and read buffer sizes (32 KiB each).
	defaultWriteBufSize = 32 * 1024
	defaultReadBufSize  = 32 * 1024
)
104 | ||
15c0b25d AP |
105 | // Dial creates a client connection to the given target. |
106 | func Dial(target string, opts ...DialOption) (*ClientConn, error) { | |
107 | return DialContext(context.Background(), target, opts...) | |
108 | } | |
109 | ||
107c1cdb ND |
110 | // DialContext creates a client connection to the given target. By default, it's |
111 | // a non-blocking dial (the function won't wait for connections to be | |
112 | // established, and connecting happens in the background). To make it a blocking | |
113 | // dial, use WithBlock() dial option. | |
114 | // | |
115 | // In the non-blocking case, the ctx does not act against the connection. It | |
116 | // only controls the setup steps. | |
117 | // | |
118 | // In the blocking case, ctx can be used to cancel or expire the pending | |
119 | // connection. Once this function returns, the cancellation and expiration of | |
120 | // ctx will be noop. Users should call ClientConn.Close to terminate all the | |
121 | // pending operations after this function returns. | |
122 | // | |
123 | // The target name syntax is defined in | |
124 | // https://github.com/grpc/grpc/blob/master/doc/naming.md. | |
125 | // e.g. to use dns resolver, a "dns:///" prefix should be applied to the target. | |
15c0b25d AP |
126 | func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { |
127 | cc := &ClientConn{ | |
107c1cdb ND |
128 | target: target, |
129 | csMgr: &connectivityStateManager{}, | |
130 | conns: make(map[*addrConn]struct{}), | |
131 | dopts: defaultDialOptions(), | |
132 | blockingpicker: newPickerWrapper(), | |
133 | czData: new(channelzData), | |
134 | firstResolveEvent: grpcsync.NewEvent(), | |
135 | } | |
136 | cc.retryThrottler.Store((*retryThrottler)(nil)) | |
15c0b25d AP |
137 | cc.ctx, cc.cancel = context.WithCancel(context.Background()) |
138 | ||
139 | for _, opt := range opts { | |
107c1cdb ND |
140 | opt.apply(&cc.dopts) |
141 | } | |
142 | ||
143 | if channelz.IsOn() { | |
144 | if cc.dopts.channelzParentID != 0 { | |
145 | cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, cc.dopts.channelzParentID, target) | |
146 | channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{ | |
147 | Desc: "Channel Created", | |
148 | Severity: channelz.CtINFO, | |
149 | Parent: &channelz.TraceEventDesc{ | |
150 | Desc: fmt.Sprintf("Nested Channel(id:%d) created", cc.channelzID), | |
151 | Severity: channelz.CtINFO, | |
152 | }, | |
153 | }) | |
154 | } else { | |
155 | cc.channelzID = channelz.RegisterChannel(&channelzChannel{cc}, 0, target) | |
156 | channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{ | |
157 | Desc: "Channel Created", | |
158 | Severity: channelz.CtINFO, | |
159 | }) | |
160 | } | |
161 | cc.csMgr.channelzID = cc.channelzID | |
162 | } | |
163 | ||
164 | if !cc.dopts.insecure { | |
165 | if cc.dopts.copts.TransportCredentials == nil && cc.dopts.copts.CredsBundle == nil { | |
166 | return nil, errNoTransportSecurity | |
167 | } | |
168 | if cc.dopts.copts.TransportCredentials != nil && cc.dopts.copts.CredsBundle != nil { | |
169 | return nil, errTransportCredsAndBundle | |
170 | } | |
171 | } else { | |
172 | if cc.dopts.copts.TransportCredentials != nil || cc.dopts.copts.CredsBundle != nil { | |
173 | return nil, errCredentialsConflict | |
174 | } | |
175 | for _, cd := range cc.dopts.copts.PerRPCCredentials { | |
176 | if cd.RequireTransportSecurity() { | |
177 | return nil, errTransportCredentialsMissing | |
178 | } | |
179 | } | |
15c0b25d | 180 | } |
107c1cdb | 181 | |
15c0b25d AP |
182 | cc.mkp = cc.dopts.copts.KeepaliveParams |
183 | ||
184 | if cc.dopts.copts.Dialer == nil { | |
185 | cc.dopts.copts.Dialer = newProxyDialer( | |
186 | func(ctx context.Context, addr string) (net.Conn, error) { | |
107c1cdb ND |
187 | network, addr := parseDialTarget(addr) |
188 | return (&net.Dialer{}).DialContext(ctx, network, addr) | |
15c0b25d AP |
189 | }, |
190 | ) | |
191 | } | |
192 | ||
193 | if cc.dopts.copts.UserAgent != "" { | |
194 | cc.dopts.copts.UserAgent += " " + grpcUA | |
195 | } else { | |
196 | cc.dopts.copts.UserAgent = grpcUA | |
197 | } | |
198 | ||
199 | if cc.dopts.timeout > 0 { | |
200 | var cancel context.CancelFunc | |
201 | ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) | |
202 | defer cancel() | |
203 | } | |
204 | ||
205 | defer func() { | |
206 | select { | |
207 | case <-ctx.Done(): | |
208 | conn, err = nil, ctx.Err() | |
209 | default: | |
210 | } | |
211 | ||
212 | if err != nil { | |
213 | cc.Close() | |
214 | } | |
215 | }() | |
216 | ||
217 | scSet := false | |
218 | if cc.dopts.scChan != nil { | |
219 | // Try to get an initial service config. | |
220 | select { | |
221 | case sc, ok := <-cc.dopts.scChan: | |
222 | if ok { | |
223 | cc.sc = sc | |
224 | scSet = true | |
225 | } | |
226 | default: | |
227 | } | |
228 | } | |
15c0b25d | 229 | if cc.dopts.bs == nil { |
107c1cdb ND |
230 | cc.dopts.bs = backoff.Exponential{ |
231 | MaxDelay: DefaultBackoffConfig.MaxDelay, | |
232 | } | |
233 | } | |
234 | if cc.dopts.resolverBuilder == nil { | |
235 | // Only try to parse target when resolver builder is not already set. | |
236 | cc.parsedTarget = parseTarget(cc.target) | |
237 | grpclog.Infof("parsed scheme: %q", cc.parsedTarget.Scheme) | |
238 | cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme) | |
239 | if cc.dopts.resolverBuilder == nil { | |
240 | // If resolver builder is still nil, the parse target's scheme is | |
241 | // not registered. Fallback to default resolver and set Endpoint to | |
242 | // the original unparsed target. | |
243 | grpclog.Infof("scheme %q not registered, fallback to default scheme", cc.parsedTarget.Scheme) | |
244 | cc.parsedTarget = resolver.Target{ | |
245 | Scheme: resolver.GetDefaultScheme(), | |
246 | Endpoint: target, | |
247 | } | |
248 | cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme) | |
249 | } | |
250 | } else { | |
251 | cc.parsedTarget = resolver.Target{Endpoint: target} | |
15c0b25d AP |
252 | } |
253 | creds := cc.dopts.copts.TransportCredentials | |
254 | if creds != nil && creds.Info().ServerName != "" { | |
255 | cc.authority = creds.Info().ServerName | |
107c1cdb ND |
256 | } else if cc.dopts.insecure && cc.dopts.authority != "" { |
257 | cc.authority = cc.dopts.authority | |
15c0b25d | 258 | } else { |
107c1cdb ND |
259 | // Use endpoint from "scheme://authority/endpoint" as the default |
260 | // authority for ClientConn. | |
261 | cc.authority = cc.parsedTarget.Endpoint | |
15c0b25d | 262 | } |
107c1cdb | 263 | |
15c0b25d AP |
264 | if cc.dopts.scChan != nil && !scSet { |
265 | // Blocking wait for the initial service config. | |
266 | select { | |
267 | case sc, ok := <-cc.dopts.scChan: | |
268 | if ok { | |
269 | cc.sc = sc | |
270 | } | |
271 | case <-ctx.Done(): | |
272 | return nil, ctx.Err() | |
273 | } | |
274 | } | |
275 | if cc.dopts.scChan != nil { | |
276 | go cc.scWatcher() | |
277 | } | |
278 | ||
107c1cdb ND |
279 | var credsClone credentials.TransportCredentials |
280 | if creds := cc.dopts.copts.TransportCredentials; creds != nil { | |
281 | credsClone = creds.Clone() | |
282 | } | |
283 | cc.balancerBuildOpts = balancer.BuildOptions{ | |
284 | DialCreds: credsClone, | |
285 | CredsBundle: cc.dopts.copts.CredsBundle, | |
286 | Dialer: cc.dopts.copts.Dialer, | |
287 | ChannelzParentID: cc.channelzID, | |
288 | } | |
15c0b25d | 289 | |
107c1cdb ND |
290 | // Build the resolver. |
291 | rWrapper, err := newCCResolverWrapper(cc) | |
292 | if err != nil { | |
293 | return nil, fmt.Errorf("failed to build resolver: %v", err) | |
294 | } | |
15c0b25d | 295 | |
107c1cdb ND |
296 | cc.mu.Lock() |
297 | cc.resolverWrapper = rWrapper | |
298 | cc.mu.Unlock() | |
299 | // A blocking dial blocks until the clientConn is ready. | |
300 | if cc.dopts.block { | |
301 | for { | |
302 | s := cc.GetState() | |
303 | if s == connectivity.Ready { | |
304 | break | |
305 | } else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure { | |
306 | if err = cc.blockingpicker.connectionError(); err != nil { | |
307 | terr, ok := err.(interface { | |
308 | Temporary() bool | |
309 | }) | |
310 | if ok && !terr.Temporary() { | |
311 | return nil, err | |
312 | } | |
313 | } | |
314 | } | |
315 | if !cc.WaitForStateChange(ctx, s) { | |
316 | // ctx got timeout or canceled. | |
317 | return nil, ctx.Err() | |
318 | } | |
15c0b25d AP |
319 | } |
320 | } | |
321 | ||
107c1cdb | 322 | return cc, nil |
15c0b25d AP |
323 | } |
324 | ||
325 | // connectivityStateManager keeps the connectivity.State of ClientConn. | |
326 | // This struct will eventually be exported so the balancers can access it. | |
327 | type connectivityStateManager struct { | |
328 | mu sync.Mutex | |
329 | state connectivity.State | |
330 | notifyChan chan struct{} | |
107c1cdb | 331 | channelzID int64 |
15c0b25d AP |
332 | } |
333 | ||
334 | // updateState updates the connectivity.State of ClientConn. | |
335 | // If there's a change it notifies goroutines waiting on state change to | |
336 | // happen. | |
337 | func (csm *connectivityStateManager) updateState(state connectivity.State) { | |
338 | csm.mu.Lock() | |
339 | defer csm.mu.Unlock() | |
340 | if csm.state == connectivity.Shutdown { | |
341 | return | |
342 | } | |
343 | if csm.state == state { | |
344 | return | |
345 | } | |
346 | csm.state = state | |
107c1cdb ND |
347 | if channelz.IsOn() { |
348 | channelz.AddTraceEvent(csm.channelzID, &channelz.TraceEventDesc{ | |
349 | Desc: fmt.Sprintf("Channel Connectivity change to %v", state), | |
350 | Severity: channelz.CtINFO, | |
351 | }) | |
352 | } | |
15c0b25d AP |
353 | if csm.notifyChan != nil { |
354 | // There are other goroutines waiting on this channel. | |
355 | close(csm.notifyChan) | |
356 | csm.notifyChan = nil | |
357 | } | |
358 | } | |
359 | ||
360 | func (csm *connectivityStateManager) getState() connectivity.State { | |
361 | csm.mu.Lock() | |
362 | defer csm.mu.Unlock() | |
363 | return csm.state | |
364 | } | |
365 | ||
366 | func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} { | |
367 | csm.mu.Lock() | |
368 | defer csm.mu.Unlock() | |
369 | if csm.notifyChan == nil { | |
370 | csm.notifyChan = make(chan struct{}) | |
371 | } | |
372 | return csm.notifyChan | |
373 | } | |
374 | ||
375 | // ClientConn represents a client connection to an RPC server. | |
376 | type ClientConn struct { | |
377 | ctx context.Context | |
378 | cancel context.CancelFunc | |
379 | ||
107c1cdb ND |
380 | target string |
381 | parsedTarget resolver.Target | |
382 | authority string | |
383 | dopts dialOptions | |
384 | csMgr *connectivityStateManager | |
385 | ||
386 | balancerBuildOpts balancer.BuildOptions | |
387 | blockingpicker *pickerWrapper | |
15c0b25d | 388 | |
107c1cdb ND |
389 | mu sync.RWMutex |
390 | resolverWrapper *ccResolverWrapper | |
391 | sc ServiceConfig | |
392 | scRaw string | |
393 | conns map[*addrConn]struct{} | |
15c0b25d | 394 | // Keepalive parameter can be updated if a GoAway is received. |
107c1cdb ND |
395 | mkp keepalive.ClientParameters |
396 | curBalancerName string | |
397 | preBalancerName string // previous balancer name. | |
398 | curAddresses []resolver.Address | |
399 | balancerWrapper *ccBalancerWrapper | |
400 | retryThrottler atomic.Value | |
401 | ||
402 | firstResolveEvent *grpcsync.Event | |
403 | ||
404 | channelzID int64 // channelz unique identification number | |
405 | czData *channelzData | |
15c0b25d AP |
406 | } |
407 | ||
408 | // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or | |
409 | // ctx expires. A true value is returned in former case and false in latter. | |
410 | // This is an EXPERIMENTAL API. | |
411 | func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool { | |
412 | ch := cc.csMgr.getNotifyChan() | |
413 | if cc.csMgr.getState() != sourceState { | |
414 | return true | |
415 | } | |
416 | select { | |
417 | case <-ctx.Done(): | |
418 | return false | |
419 | case <-ch: | |
420 | return true | |
421 | } | |
422 | } | |
423 | ||
424 | // GetState returns the connectivity.State of ClientConn. | |
425 | // This is an EXPERIMENTAL API. | |
426 | func (cc *ClientConn) GetState() connectivity.State { | |
427 | return cc.csMgr.getState() | |
428 | } | |
429 | ||
15c0b25d AP |
430 | func (cc *ClientConn) scWatcher() { |
431 | for { | |
432 | select { | |
433 | case sc, ok := <-cc.dopts.scChan: | |
434 | if !ok { | |
435 | return | |
436 | } | |
437 | cc.mu.Lock() | |
438 | // TODO: load balance policy runtime change is ignored. | |
439 | // We may revist this decision in the future. | |
440 | cc.sc = sc | |
107c1cdb | 441 | cc.scRaw = "" |
15c0b25d AP |
442 | cc.mu.Unlock() |
443 | case <-cc.ctx.Done(): | |
444 | return | |
445 | } | |
446 | } | |
447 | } | |
448 | ||
107c1cdb ND |
449 | // waitForResolvedAddrs blocks until the resolver has provided addresses or the |
450 | // context expires. Returns nil unless the context expires first; otherwise | |
451 | // returns a status error based on the context. | |
452 | func (cc *ClientConn) waitForResolvedAddrs(ctx context.Context) error { | |
453 | // This is on the RPC path, so we use a fast path to avoid the | |
454 | // more-expensive "select" below after the resolver has returned once. | |
455 | if cc.firstResolveEvent.HasFired() { | |
456 | return nil | |
15c0b25d | 457 | } |
107c1cdb ND |
458 | select { |
459 | case <-cc.firstResolveEvent.Done(): | |
460 | return nil | |
461 | case <-ctx.Done(): | |
462 | return status.FromContextError(ctx.Err()).Err() | |
463 | case <-cc.ctx.Done(): | |
464 | return ErrClientConnClosing | |
15c0b25d | 465 | } |
107c1cdb ND |
466 | } |
467 | ||
468 | func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) { | |
15c0b25d | 469 | cc.mu.Lock() |
107c1cdb | 470 | defer cc.mu.Unlock() |
15c0b25d | 471 | if cc.conns == nil { |
107c1cdb ND |
472 | // cc was closed. |
473 | return | |
15c0b25d | 474 | } |
107c1cdb ND |
475 | |
476 | if reflect.DeepEqual(cc.curAddresses, addrs) { | |
477 | return | |
15c0b25d | 478 | } |
107c1cdb ND |
479 | |
480 | cc.curAddresses = addrs | |
481 | cc.firstResolveEvent.Fire() | |
482 | ||
483 | if cc.dopts.balancerBuilder == nil { | |
484 | // Only look at balancer types and switch balancer if balancer dial | |
485 | // option is not set. | |
486 | var isGRPCLB bool | |
487 | for _, a := range addrs { | |
488 | if a.Type == resolver.GRPCLB { | |
489 | isGRPCLB = true | |
490 | break | |
15c0b25d | 491 | } |
15c0b25d | 492 | } |
107c1cdb ND |
493 | var newBalancerName string |
494 | if isGRPCLB { | |
495 | newBalancerName = grpclbName | |
496 | } else { | |
497 | // Address list doesn't contain grpclb address. Try to pick a | |
498 | // non-grpclb balancer. | |
499 | newBalancerName = cc.curBalancerName | |
500 | // If current balancer is grpclb, switch to the previous one. | |
501 | if newBalancerName == grpclbName { | |
502 | newBalancerName = cc.preBalancerName | |
15c0b25d | 503 | } |
107c1cdb ND |
504 | // The following could be true in two cases: |
505 | // - the first time handling resolved addresses | |
506 | // (curBalancerName="") | |
507 | // - the first time handling non-grpclb addresses | |
508 | // (curBalancerName="grpclb", preBalancerName="") | |
509 | if newBalancerName == "" { | |
510 | newBalancerName = PickFirstBalancerName | |
511 | } | |
512 | } | |
513 | cc.switchBalancer(newBalancerName) | |
514 | } else if cc.balancerWrapper == nil { | |
515 | // Balancer dial option was set, and this is the first time handling | |
516 | // resolved addresses. Build a balancer with dopts.balancerBuilder. | |
517 | cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts) | |
15c0b25d | 518 | } |
107c1cdb ND |
519 | |
520 | cc.balancerWrapper.handleResolvedAddrs(addrs, nil) | |
15c0b25d AP |
521 | } |
522 | ||
107c1cdb ND |
523 | // switchBalancer starts the switching from current balancer to the balancer |
524 | // with the given name. | |
525 | // | |
526 | // It will NOT send the current address list to the new balancer. If needed, | |
527 | // caller of this function should send address list to the new balancer after | |
528 | // this function returns. | |
529 | // | |
530 | // Caller must hold cc.mu. | |
531 | func (cc *ClientConn) switchBalancer(name string) { | |
532 | if cc.conns == nil { | |
533 | return | |
15c0b25d | 534 | } |
15c0b25d | 535 | |
107c1cdb ND |
536 | if strings.ToLower(cc.curBalancerName) == strings.ToLower(name) { |
537 | return | |
15c0b25d | 538 | } |
107c1cdb ND |
539 | |
540 | grpclog.Infof("ClientConn switching balancer to %q", name) | |
541 | if cc.dopts.balancerBuilder != nil { | |
542 | grpclog.Infoln("ignoring balancer switching: Balancer DialOption used instead") | |
543 | return | |
15c0b25d | 544 | } |
107c1cdb ND |
545 | // TODO(bar switching) change this to two steps: drain and close. |
546 | // Keep track of sc in wrapper. | |
547 | if cc.balancerWrapper != nil { | |
548 | cc.balancerWrapper.close() | |
549 | } | |
550 | ||
551 | builder := balancer.Get(name) | |
552 | // TODO(yuxuanli): If user send a service config that does not contain a valid balancer name, should | |
553 | // we reuse previous one? | |
554 | if channelz.IsOn() { | |
555 | if builder == nil { | |
556 | channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{ | |
557 | Desc: fmt.Sprintf("Channel switches to new LB policy %q due to fallback from invalid balancer name", PickFirstBalancerName), | |
558 | Severity: channelz.CtWarning, | |
559 | }) | |
560 | } else { | |
561 | channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{ | |
562 | Desc: fmt.Sprintf("Channel switches to new LB policy %q", name), | |
563 | Severity: channelz.CtINFO, | |
564 | }) | |
565 | } | |
566 | } | |
567 | if builder == nil { | |
568 | grpclog.Infof("failed to get balancer builder for: %v, using pick_first instead", name) | |
569 | builder = newPickfirstBuilder() | |
570 | } | |
571 | ||
572 | cc.preBalancerName = cc.curBalancerName | |
573 | cc.curBalancerName = builder.Name() | |
574 | cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts) | |
575 | } | |
576 | ||
577 | func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { | |
578 | cc.mu.Lock() | |
579 | if cc.conns == nil { | |
580 | cc.mu.Unlock() | |
581 | return | |
582 | } | |
583 | // TODO(bar switching) send updates to all balancer wrappers when balancer | |
584 | // gracefully switching is supported. | |
585 | cc.balancerWrapper.handleSubConnStateChange(sc, s) | |
586 | cc.mu.Unlock() | |
587 | } | |
588 | ||
589 | // newAddrConn creates an addrConn for addrs and adds it to cc.conns. | |
590 | // | |
591 | // Caller needs to make sure len(addrs) > 0. | |
592 | func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) { | |
593 | ac := &addrConn{ | |
594 | cc: cc, | |
595 | addrs: addrs, | |
596 | scopts: opts, | |
597 | dopts: cc.dopts, | |
598 | czData: new(channelzData), | |
599 | resetBackoff: make(chan struct{}), | |
600 | } | |
601 | ac.ctx, ac.cancel = context.WithCancel(cc.ctx) | |
602 | // Track ac in cc. This needs to be done before any getTransport(...) is called. | |
603 | cc.mu.Lock() | |
604 | if cc.conns == nil { | |
605 | cc.mu.Unlock() | |
606 | return nil, ErrClientConnClosing | |
607 | } | |
608 | if channelz.IsOn() { | |
609 | ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "") | |
610 | channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ | |
611 | Desc: "Subchannel Created", | |
612 | Severity: channelz.CtINFO, | |
613 | Parent: &channelz.TraceEventDesc{ | |
614 | Desc: fmt.Sprintf("Subchannel(id:%d) created", ac.channelzID), | |
615 | Severity: channelz.CtINFO, | |
616 | }, | |
617 | }) | |
618 | } | |
619 | cc.conns[ac] = struct{}{} | |
620 | cc.mu.Unlock() | |
621 | return ac, nil | |
622 | } | |
623 | ||
624 | // removeAddrConn removes the addrConn in the subConn from clientConn. | |
625 | // It also tears down the ac with the given error. | |
626 | func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) { | |
627 | cc.mu.Lock() | |
628 | if cc.conns == nil { | |
629 | cc.mu.Unlock() | |
630 | return | |
631 | } | |
632 | delete(cc.conns, ac) | |
633 | cc.mu.Unlock() | |
634 | ac.tearDown(err) | |
635 | } | |
636 | ||
637 | func (cc *ClientConn) channelzMetric() *channelz.ChannelInternalMetric { | |
638 | return &channelz.ChannelInternalMetric{ | |
639 | State: cc.GetState(), | |
640 | Target: cc.target, | |
641 | CallsStarted: atomic.LoadInt64(&cc.czData.callsStarted), | |
642 | CallsSucceeded: atomic.LoadInt64(&cc.czData.callsSucceeded), | |
643 | CallsFailed: atomic.LoadInt64(&cc.czData.callsFailed), | |
644 | LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&cc.czData.lastCallStartedTime)), | |
645 | } | |
646 | } | |
647 | ||
648 | // Target returns the target string of the ClientConn. | |
649 | // This is an EXPERIMENTAL API. | |
650 | func (cc *ClientConn) Target() string { | |
651 | return cc.target | |
652 | } | |
653 | ||
654 | func (cc *ClientConn) incrCallsStarted() { | |
655 | atomic.AddInt64(&cc.czData.callsStarted, 1) | |
656 | atomic.StoreInt64(&cc.czData.lastCallStartedTime, time.Now().UnixNano()) | |
657 | } | |
658 | ||
659 | func (cc *ClientConn) incrCallsSucceeded() { | |
660 | atomic.AddInt64(&cc.czData.callsSucceeded, 1) | |
661 | } | |
662 | ||
663 | func (cc *ClientConn) incrCallsFailed() { | |
664 | atomic.AddInt64(&cc.czData.callsFailed, 1) | |
665 | } | |
666 | ||
667 | // connect starts creating a transport. | |
668 | // It does nothing if the ac is not IDLE. | |
669 | // TODO(bar) Move this to the addrConn section. | |
670 | func (ac *addrConn) connect() error { | |
671 | ac.mu.Lock() | |
672 | if ac.state == connectivity.Shutdown { | |
673 | ac.mu.Unlock() | |
674 | return errConnClosing | |
675 | } | |
676 | if ac.state != connectivity.Idle { | |
677 | ac.mu.Unlock() | |
678 | return nil | |
679 | } | |
680 | ac.updateConnectivityState(connectivity.Connecting) | |
681 | ac.mu.Unlock() | |
682 | ||
683 | // Start a goroutine connecting to the server asynchronously. | |
684 | go ac.resetTransport() | |
685 | return nil | |
686 | } | |
687 | ||
688 | // tryUpdateAddrs tries to update ac.addrs with the new addresses list. | |
689 | // | |
690 | // It checks whether current connected address of ac is in the new addrs list. | |
691 | // - If true, it updates ac.addrs and returns true. The ac will keep using | |
692 | // the existing connection. | |
693 | // - If false, it does nothing and returns false. | |
694 | func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool { | |
695 | ac.mu.Lock() | |
696 | defer ac.mu.Unlock() | |
697 | grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs) | |
698 | if ac.state == connectivity.Shutdown { | |
699 | ac.addrs = addrs | |
700 | return true | |
701 | } | |
702 | ||
703 | // Unless we're busy reconnecting already, let's reconnect from the top of | |
704 | // the list. | |
705 | if ac.state != connectivity.Ready { | |
706 | return false | |
707 | } | |
708 | ||
709 | var curAddrFound bool | |
710 | for _, a := range addrs { | |
711 | if reflect.DeepEqual(ac.curAddr, a) { | |
712 | curAddrFound = true | |
713 | break | |
714 | } | |
715 | } | |
716 | grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound) | |
717 | if curAddrFound { | |
718 | ac.addrs = addrs | |
719 | } | |
720 | ||
721 | return curAddrFound | |
722 | } | |
723 | ||
724 | // GetMethodConfig gets the method config of the input method. | |
725 | // If there's an exact match for input method (i.e. /service/method), we return | |
726 | // the corresponding MethodConfig. | |
727 | // If there isn't an exact match for the input method, we look for the default config | |
728 | // under the service (i.e /service/). If there is a default MethodConfig for | |
729 | // the service, we return it. | |
730 | // Otherwise, we return an empty MethodConfig. | |
731 | func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { | |
732 | // TODO: Avoid the locking here. | |
733 | cc.mu.RLock() | |
734 | defer cc.mu.RUnlock() | |
735 | m, ok := cc.sc.Methods[method] | |
736 | if !ok { | |
737 | i := strings.LastIndex(method, "/") | |
738 | m = cc.sc.Methods[method[:i+1]] | |
739 | } | |
740 | return m | |
741 | } | |
742 | ||
743 | func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { | |
744 | cc.mu.RLock() | |
745 | defer cc.mu.RUnlock() | |
746 | return cc.sc.healthCheckConfig | |
747 | } | |
748 | ||
749 | func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, func(balancer.DoneInfo), error) { | |
750 | hdr, _ := metadata.FromOutgoingContext(ctx) | |
751 | t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{ | |
752 | FullMethodName: method, | |
753 | Header: hdr, | |
754 | }) | |
15c0b25d | 755 | if err != nil { |
107c1cdb ND |
756 | return nil, nil, toRPCErr(err) |
757 | } | |
758 | return t, done, nil | |
759 | } | |
760 | ||
761 | // handleServiceConfig parses the service config string in JSON format to Go native | |
762 | // struct ServiceConfig, and store both the struct and the JSON string in ClientConn. | |
763 | func (cc *ClientConn) handleServiceConfig(js string) error { | |
764 | if cc.dopts.disableServiceConfig { | |
765 | return nil | |
766 | } | |
767 | if cc.scRaw == js { | |
768 | return nil | |
769 | } | |
770 | if channelz.IsOn() { | |
771 | channelz.AddTraceEvent(cc.channelzID, &channelz.TraceEventDesc{ | |
772 | // The special formatting of \"%s\" instead of %q is to provide nice printing of service config | |
773 | // for human consumption. | |
774 | Desc: fmt.Sprintf("Channel has a new service config \"%s\"", js), | |
775 | Severity: channelz.CtINFO, | |
776 | }) | |
777 | } | |
778 | sc, err := parseServiceConfig(js) | |
779 | if err != nil { | |
780 | return err | |
781 | } | |
782 | cc.mu.Lock() | |
783 | // Check if the ClientConn is already closed. Some fields (e.g. | |
784 | // balancerWrapper) are set to nil when closing the ClientConn, and could | |
785 | // cause nil pointer panic if we don't have this check. | |
786 | if cc.conns == nil { | |
787 | cc.mu.Unlock() | |
788 | return nil | |
789 | } | |
790 | cc.scRaw = js | |
791 | cc.sc = sc | |
792 | ||
793 | if sc.retryThrottling != nil { | |
794 | newThrottler := &retryThrottler{ | |
795 | tokens: sc.retryThrottling.MaxTokens, | |
796 | max: sc.retryThrottling.MaxTokens, | |
797 | thresh: sc.retryThrottling.MaxTokens / 2, | |
798 | ratio: sc.retryThrottling.TokenRatio, | |
799 | } | |
800 | cc.retryThrottler.Store(newThrottler) | |
801 | } else { | |
802 | cc.retryThrottler.Store((*retryThrottler)(nil)) | |
803 | } | |
804 | ||
805 | if sc.LB != nil && *sc.LB != grpclbName { // "grpclb" is not a valid balancer option in service config. | |
806 | if cc.curBalancerName == grpclbName { | |
807 | // If current balancer is grpclb, there's at least one grpclb | |
808 | // balancer address in the resolved list. Don't switch the balancer, | |
809 | // but change the previous balancer name, so if a new resolved | |
810 | // address list doesn't contain grpclb address, balancer will be | |
811 | // switched to *sc.LB. | |
812 | cc.preBalancerName = *sc.LB | |
813 | } else { | |
814 | cc.switchBalancer(*sc.LB) | |
815 | cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil) | |
15c0b25d | 816 | } |
15c0b25d | 817 | } |
107c1cdb ND |
818 | |
819 | cc.mu.Unlock() | |
820 | return nil | |
821 | } | |
822 | ||
823 | func (cc *ClientConn) resolveNow(o resolver.ResolveNowOption) { | |
824 | cc.mu.RLock() | |
825 | r := cc.resolverWrapper | |
826 | cc.mu.RUnlock() | |
827 | if r == nil { | |
828 | return | |
829 | } | |
830 | go r.resolveNow(o) | |
831 | } | |
832 | ||
833 | // ResetConnectBackoff wakes up all subchannels in transient failure and causes | |
834 | // them to attempt another connection immediately. It also resets the backoff | |
835 | // times used for subsequent attempts regardless of the current state. | |
836 | // | |
837 | // In general, this function should not be used. Typical service or network | |
838 | // outages result in a reasonable client reconnection strategy by default. | |
839 | // However, if a previously unavailable network becomes available, this may be | |
840 | // used to trigger an immediate reconnect. | |
841 | // | |
842 | // This API is EXPERIMENTAL. | |
843 | func (cc *ClientConn) ResetConnectBackoff() { | |
844 | cc.mu.Lock() | |
845 | defer cc.mu.Unlock() | |
846 | for ac := range cc.conns { | |
847 | ac.resetConnectBackoff() | |
848 | } | |
15c0b25d AP |
849 | } |
850 | ||
851 | // Close tears down the ClientConn and all underlying connections. | |
852 | func (cc *ClientConn) Close() error { | |
107c1cdb | 853 | defer cc.cancel() |
15c0b25d AP |
854 | |
855 | cc.mu.Lock() | |
856 | if cc.conns == nil { | |
857 | cc.mu.Unlock() | |
858 | return ErrClientConnClosing | |
859 | } | |
860 | conns := cc.conns | |
861 | cc.conns = nil | |
862 | cc.csMgr.updateState(connectivity.Shutdown) | |
107c1cdb ND |
863 | |
864 | rWrapper := cc.resolverWrapper | |
865 | cc.resolverWrapper = nil | |
866 | bWrapper := cc.balancerWrapper | |
867 | cc.balancerWrapper = nil | |
15c0b25d | 868 | cc.mu.Unlock() |
107c1cdb ND |
869 | |
870 | cc.blockingpicker.close() | |
871 | ||
872 | if rWrapper != nil { | |
873 | rWrapper.close() | |
874 | } | |
875 | if bWrapper != nil { | |
876 | bWrapper.close() | |
15c0b25d | 877 | } |
107c1cdb ND |
878 | |
879 | for ac := range conns { | |
15c0b25d AP |
880 | ac.tearDown(ErrClientConnClosing) |
881 | } | |
107c1cdb ND |
882 | if channelz.IsOn() { |
883 | ted := &channelz.TraceEventDesc{ | |
884 | Desc: "Channel Deleted", | |
885 | Severity: channelz.CtINFO, | |
886 | } | |
887 | if cc.dopts.channelzParentID != 0 { | |
888 | ted.Parent = &channelz.TraceEventDesc{ | |
889 | Desc: fmt.Sprintf("Nested channel(id:%d) deleted", cc.channelzID), | |
890 | Severity: channelz.CtINFO, | |
891 | } | |
892 | } | |
893 | channelz.AddTraceEvent(cc.channelzID, ted) | |
894 | // TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to | |
895 | // the entity beng deleted, and thus prevent it from being deleted right away. | |
896 | channelz.RemoveEntry(cc.channelzID) | |
897 | } | |
15c0b25d AP |
898 | return nil |
899 | } | |
900 | ||
901 | // addrConn is a network connection to a given address. | |
902 | type addrConn struct { | |
903 | ctx context.Context | |
904 | cancel context.CancelFunc | |
905 | ||
906 | cc *ClientConn | |
15c0b25d | 907 | dopts dialOptions |
107c1cdb ND |
908 | acbw balancer.SubConn |
909 | scopts balancer.NewSubConnOptions | |
910 | ||
911 | // transport is set when there's a viable transport (note: ac state may not be READY as LB channel | |
912 | // health checking may require server to report healthy to set ac to READY), and is reset | |
913 | // to nil when the current transport should no longer be used to create a stream (e.g. after GoAway | |
914 | // is received, transport is closed, ac has been torn down). | |
915 | transport transport.ClientTransport // The current transport. | |
15c0b25d | 916 | |
107c1cdb ND |
917 | mu sync.Mutex |
918 | curAddr resolver.Address // The current address. | |
919 | addrs []resolver.Address // All addresses that the resolver resolved to. | |
15c0b25d | 920 | |
107c1cdb | 921 | // Use updateConnectivityState for updating addrConn's connectivity state. |
15c0b25d | 922 | state connectivity.State |
107c1cdb ND |
923 | |
924 | tearDownErr error // The reason this addrConn is torn down. | |
925 | ||
926 | backoffIdx int // Needs to be stateful for resetConnectBackoff. | |
927 | resetBackoff chan struct{} | |
928 | ||
929 | channelzID int64 // channelz unique identification number. | |
930 | czData *channelzData | |
931 | healthCheckEnabled bool | |
932 | } | |
933 | ||
934 | // Note: this requires a lock on ac.mu. | |
935 | func (ac *addrConn) updateConnectivityState(s connectivity.State) { | |
936 | if ac.state == s { | |
937 | return | |
938 | } | |
939 | ||
940 | updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s) | |
941 | grpclog.Infof(updateMsg) | |
942 | ac.state = s | |
943 | if channelz.IsOn() { | |
944 | channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ | |
945 | Desc: updateMsg, | |
946 | Severity: channelz.CtINFO, | |
947 | }) | |
948 | } | |
949 | ac.cc.handleSubConnStateChange(ac.acbw, s) | |
15c0b25d AP |
950 | } |
951 | ||
952 | // adjustParams updates parameters used to create transports upon | |
953 | // receiving a GoAway. | |
954 | func (ac *addrConn) adjustParams(r transport.GoAwayReason) { | |
955 | switch r { | |
107c1cdb | 956 | case transport.GoAwayTooManyPings: |
15c0b25d AP |
957 | v := 2 * ac.dopts.copts.KeepaliveParams.Time |
958 | ac.cc.mu.Lock() | |
959 | if v > ac.cc.mkp.Time { | |
960 | ac.cc.mkp.Time = v | |
961 | } | |
962 | ac.cc.mu.Unlock() | |
963 | } | |
964 | } | |
965 | ||
107c1cdb ND |
966 | func (ac *addrConn) resetTransport() { |
967 | for i := 0; ; i++ { | |
968 | tryNextAddrFromStart := grpcsync.NewEvent() | |
15c0b25d | 969 | |
15c0b25d | 970 | ac.mu.Lock() |
107c1cdb ND |
971 | if i > 0 { |
972 | ac.cc.resolveNow(resolver.ResolveNowOption{}) | |
15c0b25d | 973 | } |
107c1cdb ND |
974 | addrs := ac.addrs |
975 | backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx) | |
15c0b25d | 976 | ac.mu.Unlock() |
15c0b25d | 977 | |
107c1cdb ND |
978 | addrLoop: |
979 | for _, addr := range addrs { | |
980 | ac.mu.Lock() | |
981 | ||
982 | if ac.state == connectivity.Shutdown { | |
983 | ac.mu.Unlock() | |
984 | return | |
15c0b25d | 985 | } |
107c1cdb ND |
986 | ac.updateConnectivityState(connectivity.Connecting) |
987 | ac.transport = nil | |
988 | ac.mu.Unlock() | |
989 | ||
990 | // This will be the duration that dial gets to finish. | |
991 | dialDuration := getMinConnectTimeout() | |
992 | if dialDuration < backoffFor { | |
993 | // Give dial more time as we keep failing to connect. | |
994 | dialDuration = backoffFor | |
995 | } | |
996 | connectDeadline := time.Now().Add(dialDuration) | |
997 | ||
15c0b25d | 998 | ac.mu.Lock() |
107c1cdb ND |
999 | ac.cc.mu.RLock() |
1000 | ac.dopts.copts.KeepaliveParams = ac.cc.mkp | |
1001 | ac.cc.mu.RUnlock() | |
1002 | ||
15c0b25d | 1003 | if ac.state == connectivity.Shutdown { |
15c0b25d | 1004 | ac.mu.Unlock() |
107c1cdb | 1005 | return |
15c0b25d | 1006 | } |
107c1cdb ND |
1007 | |
1008 | copts := ac.dopts.copts | |
1009 | if ac.scopts.CredsBundle != nil { | |
1010 | copts.CredsBundle = ac.scopts.CredsBundle | |
15c0b25d | 1011 | } |
107c1cdb ND |
1012 | hctx, hcancel := context.WithCancel(ac.ctx) |
1013 | defer hcancel() | |
15c0b25d | 1014 | ac.mu.Unlock() |
107c1cdb ND |
1015 | |
1016 | if channelz.IsOn() { | |
1017 | channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ | |
1018 | Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr), | |
1019 | Severity: channelz.CtINFO, | |
1020 | }) | |
1021 | } | |
1022 | ||
1023 | reconnect := grpcsync.NewEvent() | |
1024 | prefaceReceived := make(chan struct{}) | |
1025 | newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived) | |
1026 | if err == nil { | |
1027 | ac.mu.Lock() | |
1028 | ac.curAddr = addr | |
1029 | ac.transport = newTr | |
1030 | ac.mu.Unlock() | |
1031 | ||
1032 | healthCheckConfig := ac.cc.healthCheckConfig() | |
1033 | // LB channel health checking is only enabled when all the four requirements below are met: | |
1034 | // 1. it is not disabled by the user with the WithDisableHealthCheck DialOption, | |
1035 | // 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package, | |
1036 | // 3. a service config with non-empty healthCheckConfig field is provided, | |
1037 | // 4. the current load balancer allows it. | |
1038 | healthcheckManagingState := false | |
1039 | if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled { | |
1040 | if ac.cc.dopts.healthCheckFunc == nil { | |
1041 | // TODO: add a link to the health check doc in the error message. | |
1042 | grpclog.Error("the client side LB channel health check function has not been set.") | |
1043 | } else { | |
1044 | // TODO(deklerk) refactor to just return transport | |
1045 | go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName) | |
1046 | healthcheckManagingState = true | |
1047 | } | |
1048 | } | |
1049 | if !healthcheckManagingState { | |
1050 | ac.mu.Lock() | |
1051 | ac.updateConnectivityState(connectivity.Ready) | |
1052 | ac.mu.Unlock() | |
1053 | } | |
1054 | } else { | |
1055 | hcancel() | |
1056 | if err == errConnClosing { | |
1057 | return | |
1058 | } | |
1059 | ||
1060 | if tryNextAddrFromStart.HasFired() { | |
1061 | break addrLoop | |
1062 | } | |
1063 | continue | |
1064 | } | |
1065 | ||
1066 | ac.mu.Lock() | |
1067 | reqHandshake := ac.dopts.reqHandshake | |
1068 | ac.mu.Unlock() | |
1069 | ||
1070 | <-reconnect.Done() | |
1071 | hcancel() | |
1072 | ||
1073 | if reqHandshake == envconfig.RequireHandshakeHybrid { | |
1074 | // In RequireHandshakeHybrid mode, we must check to see whether | |
1075 | // server preface has arrived yet to decide whether to start | |
1076 | // reconnecting at the top of the list (server preface received) | |
1077 | // or continue with the next addr in the list as if the | |
1078 | // connection were not successful (server preface not received). | |
1079 | select { | |
1080 | case <-prefaceReceived: | |
1081 | // We received a server preface - huzzah! We consider this | |
1082 | // a success and restart from the top of the addr list. | |
1083 | ac.mu.Lock() | |
1084 | ac.backoffIdx = 0 | |
1085 | ac.mu.Unlock() | |
1086 | break addrLoop | |
1087 | default: | |
1088 | // Despite having set state to READY, in hybrid mode we | |
1089 | // consider this a failure and continue connecting at the | |
1090 | // next addr in the list. | |
1091 | ac.mu.Lock() | |
1092 | if ac.state == connectivity.Shutdown { | |
1093 | ac.mu.Unlock() | |
1094 | return | |
1095 | } | |
1096 | ||
1097 | ac.updateConnectivityState(connectivity.TransientFailure) | |
1098 | ac.mu.Unlock() | |
1099 | ||
1100 | if tryNextAddrFromStart.HasFired() { | |
1101 | break addrLoop | |
1102 | } | |
1103 | } | |
1104 | } else { | |
1105 | // In RequireHandshakeOn mode, we would have already waited for | |
1106 | // the server preface, so we consider this a success and restart | |
1107 | // from the top of the addr list. In RequireHandshakeOff mode, | |
1108 | // we don't care to wait for the server preface before | |
1109 | // considering this a success, so we also restart from the top | |
1110 | // of the addr list. | |
1111 | ac.mu.Lock() | |
1112 | ac.backoffIdx = 0 | |
1113 | ac.mu.Unlock() | |
1114 | break addrLoop | |
15c0b25d | 1115 | } |
15c0b25d | 1116 | } |
107c1cdb ND |
1117 | |
1118 | // After exhausting all addresses, or after need to reconnect after a | |
1119 | // READY, the addrConn enters TRANSIENT_FAILURE. | |
15c0b25d | 1120 | ac.mu.Lock() |
15c0b25d | 1121 | if ac.state == connectivity.Shutdown { |
15c0b25d | 1122 | ac.mu.Unlock() |
107c1cdb | 1123 | return |
15c0b25d | 1124 | } |
107c1cdb ND |
1125 | ac.updateConnectivityState(connectivity.TransientFailure) |
1126 | ||
1127 | // Backoff. | |
1128 | b := ac.resetBackoff | |
1129 | timer := time.NewTimer(backoffFor) | |
1130 | acctx := ac.ctx | |
15c0b25d | 1131 | ac.mu.Unlock() |
107c1cdb ND |
1132 | |
1133 | select { | |
1134 | case <-timer.C: | |
1135 | ac.mu.Lock() | |
1136 | ac.backoffIdx++ | |
1137 | ac.mu.Unlock() | |
1138 | case <-b: | |
1139 | timer.Stop() | |
1140 | case <-acctx.Done(): | |
1141 | timer.Stop() | |
1142 | return | |
1143 | } | |
15c0b25d AP |
1144 | } |
1145 | } | |
1146 | ||
107c1cdb ND |
1147 | // createTransport creates a connection to one of the backends in addrs. It |
1148 | // sets ac.transport in the success case, or it returns an error if it was | |
1149 | // unable to successfully create a transport. | |
1150 | // | |
1151 | // If waitForHandshake is enabled, it blocks until server preface arrives. | |
1152 | func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time, reconnect *grpcsync.Event, prefaceReceived chan struct{}) (transport.ClientTransport, error) { | |
1153 | onCloseCalled := make(chan struct{}) | |
1154 | ||
1155 | target := transport.TargetInfo{ | |
1156 | Addr: addr.Addr, | |
1157 | Metadata: addr.Metadata, | |
1158 | Authority: ac.cc.authority, | |
1159 | } | |
1160 | ||
1161 | prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now())) | |
1162 | ||
1163 | onGoAway := func(r transport.GoAwayReason) { | |
15c0b25d | 1164 | ac.mu.Lock() |
107c1cdb | 1165 | ac.adjustParams(r) |
15c0b25d | 1166 | ac.mu.Unlock() |
107c1cdb ND |
1167 | reconnect.Fire() |
1168 | } | |
1169 | ||
1170 | onClose := func() { | |
1171 | close(onCloseCalled) | |
1172 | prefaceTimer.Stop() | |
1173 | reconnect.Fire() | |
1174 | } | |
1175 | ||
1176 | onPrefaceReceipt := func() { | |
1177 | close(prefaceReceived) | |
1178 | prefaceTimer.Stop() | |
1179 | } | |
1180 | ||
1181 | connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) | |
1182 | defer cancel() | |
1183 | if channelz.IsOn() { | |
1184 | copts.ChannelzParentID = ac.channelzID | |
1185 | } | |
1186 | ||
1187 | newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose) | |
1188 | ||
1189 | if err == nil { | |
1190 | if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn { | |
15c0b25d | 1191 | select { |
107c1cdb ND |
1192 | case <-prefaceTimer.C: |
1193 | // We didn't get the preface in time. | |
1194 | newTr.Close() | |
1195 | err = errors.New("timed out waiting for server handshake") | |
1196 | case <-prefaceReceived: | |
1197 | // We got the preface - huzzah! things are good. | |
1198 | case <-onCloseCalled: | |
1199 | // The transport has already closed - noop. | |
1200 | return nil, errors.New("connection closed") | |
15c0b25d | 1201 | } |
107c1cdb ND |
1202 | } else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid { |
1203 | go func() { | |
1204 | select { | |
1205 | case <-prefaceTimer.C: | |
1206 | // We didn't get the preface in time. | |
1207 | newTr.Close() | |
1208 | case <-prefaceReceived: | |
1209 | // We got the preface just in the nick of time - huzzah! | |
1210 | case <-onCloseCalled: | |
1211 | // The transport has already closed - noop. | |
15c0b25d | 1212 | } |
107c1cdb | 1213 | }() |
15c0b25d AP |
1214 | } |
1215 | } | |
15c0b25d | 1216 | |
107c1cdb ND |
1217 | if err != nil { |
1218 | // newTr is either nil, or closed. | |
1219 | ac.cc.blockingpicker.updateConnectionError(err) | |
15c0b25d | 1220 | ac.mu.Lock() |
107c1cdb ND |
1221 | if ac.state == connectivity.Shutdown { |
1222 | // ac.tearDown(...) has been invoked. | |
15c0b25d | 1223 | ac.mu.Unlock() |
107c1cdb | 1224 | |
15c0b25d | 1225 | return nil, errConnClosing |
107c1cdb ND |
1226 | } |
1227 | ac.mu.Unlock() | |
1228 | grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err) | |
1229 | return nil, err | |
1230 | } | |
1231 | ||
1232 | // Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport. | |
1233 | ac.mu.Lock() | |
1234 | if ac.state == connectivity.Shutdown { | |
1235 | ac.mu.Unlock() | |
1236 | newTr.Close() | |
1237 | return nil, errConnClosing | |
1238 | } | |
1239 | ac.mu.Unlock() | |
1240 | ||
1241 | // Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport. | |
1242 | ac.mu.Lock() | |
1243 | if ac.state == connectivity.Shutdown { | |
1244 | ac.mu.Unlock() | |
1245 | newTr.Close() | |
1246 | return nil, errConnClosing | |
1247 | } | |
1248 | ac.mu.Unlock() | |
1249 | ||
1250 | return newTr, nil | |
1251 | } | |
1252 | ||
1253 | func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) { | |
1254 | // Set up the health check helper functions | |
1255 | newStream := func() (interface{}, error) { | |
1256 | return ac.newClientStream(ctx, &StreamDesc{ServerStreams: true}, "/grpc.health.v1.Health/Watch", newTr) | |
1257 | } | |
1258 | firstReady := true | |
1259 | reportHealth := func(ok bool) { | |
1260 | ac.mu.Lock() | |
1261 | defer ac.mu.Unlock() | |
1262 | if ac.transport != newTr { | |
1263 | return | |
1264 | } | |
1265 | if ok { | |
1266 | if firstReady { | |
1267 | firstReady = false | |
1268 | ac.curAddr = addr | |
15c0b25d | 1269 | } |
107c1cdb ND |
1270 | ac.updateConnectivityState(connectivity.Ready) |
1271 | } else { | |
1272 | ac.updateConnectivityState(connectivity.TransientFailure) | |
15c0b25d | 1273 | } |
107c1cdb ND |
1274 | } |
1275 | err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName) | |
1276 | if err != nil { | |
1277 | if status.Code(err) == codes.Unimplemented { | |
1278 | if channelz.IsOn() { | |
1279 | channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ | |
1280 | Desc: "Subchannel health check is unimplemented at server side, thus health check is disabled", | |
1281 | Severity: channelz.CtError, | |
1282 | }) | |
1283 | } | |
1284 | grpclog.Error("Subchannel health check is unimplemented at server side, thus health check is disabled") | |
1285 | } else { | |
1286 | grpclog.Errorf("HealthCheckFunc exits with unexpected error %v", err) | |
15c0b25d | 1287 | } |
107c1cdb ND |
1288 | } |
1289 | } | |
1290 | ||
1291 | func (ac *addrConn) resetConnectBackoff() { | |
1292 | ac.mu.Lock() | |
1293 | close(ac.resetBackoff) | |
1294 | ac.backoffIdx = 0 | |
1295 | ac.resetBackoff = make(chan struct{}) | |
1296 | ac.mu.Unlock() | |
1297 | } | |
1298 | ||
1299 | // getReadyTransport returns the transport if ac's state is READY. | |
1300 | // Otherwise it returns nil, false. | |
1301 | // If ac's state is IDLE, it will trigger ac to connect. | |
1302 | func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) { | |
1303 | ac.mu.Lock() | |
1304 | if ac.state == connectivity.Ready && ac.transport != nil { | |
1305 | t := ac.transport | |
15c0b25d | 1306 | ac.mu.Unlock() |
107c1cdb | 1307 | return t, true |
15c0b25d | 1308 | } |
107c1cdb ND |
1309 | var idle bool |
1310 | if ac.state == connectivity.Idle { | |
1311 | idle = true | |
1312 | } | |
1313 | ac.mu.Unlock() | |
1314 | // Trigger idle ac to connect. | |
1315 | if idle { | |
1316 | ac.connect() | |
1317 | } | |
1318 | return nil, false | |
15c0b25d AP |
1319 | } |
1320 | ||
// tearDown starts to tear down the addrConn.
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
// tight loop.
// tearDown doesn't remove ac from ac.cc.conns.
//
// It moves ac to connectivity.Shutdown, cancels ac.ctx, records err as the
// teardown reason, and clears the current transport and address. When err is
// errConnDrain and a transport exists, that transport is closed gracefully.
// The channelz entry is removed when channelz is on. Calling tearDown on an
// addrConn that is already shut down does nothing.
func (ac *addrConn) tearDown(err error) {
	ac.mu.Lock()
	if ac.state == connectivity.Shutdown {
		ac.mu.Unlock()
		return
	}
	curTr := ac.transport
	ac.transport = nil
	// We have to set the state to Shutdown before anything else to prevent races
	// between setting the state and logic that waits on context cancelation / etc.
	ac.updateConnectivityState(connectivity.Shutdown)
	ac.cancel()
	ac.tearDownErr = err
	ac.curAddr = resolver.Address{}
	if err == errConnDrain && curTr != nil {
		// GracefulClose(...) may be executed multiple times when
		// i) receiving multiple GoAway frames from the server; or
		// ii) there are concurrent name resolver/Balancer triggered
		// address removal and GoAway.
		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
		// NOTE(review): the onClose callback installed by createTransport does
		// not lock ac.mu; confirm whether releasing the lock is still required
		// before simplifying this.
		ac.mu.Unlock()
		curTr.GracefulClose()
		ac.mu.Lock()
	}
	if channelz.IsOn() {
		// NOTE(review): "Subchanel" below is a typo in the trace text; it is
		// left unchanged in case anything matches on the exact string.
		channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
			Desc:     "Subchannel Deleted",
			Severity: channelz.CtINFO,
			Parent: &channelz.TraceEventDesc{
				Desc:     fmt.Sprintf("Subchanel(id:%d) deleted", ac.channelzID),
				Severity: channelz.CtINFO,
			},
		})
		// TraceEvent needs to be called before RemoveEntry, as TraceEvent may add trace reference to
		// the entity being deleted, and thus prevent it from being deleted right away.
		channelz.RemoveEntry(ac.channelzID)
	}
	ac.mu.Unlock()
}
1365 | ||
1366 | func (ac *addrConn) getState() connectivity.State { | |
1367 | ac.mu.Lock() | |
1368 | defer ac.mu.Unlock() | |
1369 | return ac.state | |
1370 | } | |
1371 | ||
1372 | func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric { | |
1373 | ac.mu.Lock() | |
1374 | addr := ac.curAddr.Addr | |
1375 | ac.mu.Unlock() | |
1376 | return &channelz.ChannelInternalMetric{ | |
1377 | State: ac.getState(), | |
1378 | Target: addr, | |
1379 | CallsStarted: atomic.LoadInt64(&ac.czData.callsStarted), | |
1380 | CallsSucceeded: atomic.LoadInt64(&ac.czData.callsSucceeded), | |
1381 | CallsFailed: atomic.LoadInt64(&ac.czData.callsFailed), | |
1382 | LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&ac.czData.lastCallStartedTime)), | |
15c0b25d | 1383 | } |
107c1cdb ND |
1384 | } |
1385 | ||
1386 | func (ac *addrConn) incrCallsStarted() { | |
1387 | atomic.AddInt64(&ac.czData.callsStarted, 1) | |
1388 | atomic.StoreInt64(&ac.czData.lastCallStartedTime, time.Now().UnixNano()) | |
1389 | } | |
1390 | ||
1391 | func (ac *addrConn) incrCallsSucceeded() { | |
1392 | atomic.AddInt64(&ac.czData.callsSucceeded, 1) | |
1393 | } | |
1394 | ||
1395 | func (ac *addrConn) incrCallsFailed() { | |
1396 | atomic.AddInt64(&ac.czData.callsFailed, 1) | |
1397 | } | |
1398 | ||
// retryThrottler is a token bucket that decides whether retries are allowed.
// Each throttle call spends one token, and each successful RPC earns ratio
// tokens, capped at max. Retries are disallowed while the token count is at
// or below thresh. A nil *retryThrottler never throttles.
type retryThrottler struct {
	max    float64
	thresh float64
	ratio  float64

	mu     sync.Mutex
	tokens float64 // TODO(dfawley): replace with atomic and remove lock.
}

// throttle subtracts a retry token from the pool and returns whether a retry
// should be throttled (disallowed) based upon the retry throttling policy in
// the service config.
func (rt *retryThrottler) throttle() bool {
	if rt == nil {
		return false
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	remaining := rt.tokens - 1
	if remaining < 0 {
		// The pool never goes negative.
		remaining = 0
	}
	rt.tokens = remaining
	return remaining <= rt.thresh
}

// successfulRPC returns ratio tokens to the pool, never exceeding max.
func (rt *retryThrottler) successfulRPC() {
	if rt == nil {
		return
	}
	rt.mu.Lock()
	defer rt.mu.Unlock()
	refilled := rt.tokens + rt.ratio
	if refilled > rt.max {
		refilled = rt.max
	}
	rt.tokens = refilled
}
1435 | ||
1436 | type channelzChannel struct { | |
1437 | cc *ClientConn | |
1438 | } | |
1439 | ||
1440 | func (c *channelzChannel) ChannelzMetric() *channelz.ChannelInternalMetric { | |
1441 | return c.cc.channelzMetric() | |
1442 | } | |
1443 | ||
var (
	// ErrClientConnTimeout indicates that the ClientConn cannot establish the
	// underlying connections within the specified timeout.
	//
	// Deprecated: This error is never returned by grpc and should not be
	// referenced by users.
	ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
)