diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/transport/http2_client.go')
-rw-r--r-- | vendor/google.golang.org/grpc/internal/transport/http2_client.go | 1380 |
1 files changed, 1380 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_client.go b/vendor/google.golang.org/grpc/internal/transport/http2_client.go new file mode 100644 index 0000000..babcaee --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/transport/http2_client.go | |||
@@ -0,0 +1,1380 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2014 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | package transport | ||
20 | |||
21 | import ( | ||
22 | "context" | ||
23 | "fmt" | ||
24 | "io" | ||
25 | "math" | ||
26 | "net" | ||
27 | "strconv" | ||
28 | "strings" | ||
29 | "sync" | ||
30 | "sync/atomic" | ||
31 | "time" | ||
32 | |||
33 | "golang.org/x/net/http2" | ||
34 | "golang.org/x/net/http2/hpack" | ||
35 | |||
36 | "google.golang.org/grpc/codes" | ||
37 | "google.golang.org/grpc/credentials" | ||
38 | "google.golang.org/grpc/internal/channelz" | ||
39 | "google.golang.org/grpc/internal/syscall" | ||
40 | "google.golang.org/grpc/keepalive" | ||
41 | "google.golang.org/grpc/metadata" | ||
42 | "google.golang.org/grpc/peer" | ||
43 | "google.golang.org/grpc/stats" | ||
44 | "google.golang.org/grpc/status" | ||
45 | ) | ||
46 | |||
47 | // http2Client implements the ClientTransport interface with HTTP2. | ||
48 | type http2Client struct { | ||
49 | ctx context.Context | ||
50 | cancel context.CancelFunc | ||
51 | ctxDone <-chan struct{} // Cache the ctx.Done() chan. | ||
52 | userAgent string | ||
53 | md interface{} | ||
54 | conn net.Conn // underlying communication channel | ||
55 | loopy *loopyWriter | ||
56 | remoteAddr net.Addr | ||
57 | localAddr net.Addr | ||
58 | authInfo credentials.AuthInfo // auth info about the connection | ||
59 | |||
60 | readerDone chan struct{} // sync point to enable testing. | ||
61 | writerDone chan struct{} // sync point to enable testing. | ||
62 | // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) | ||
63 | // that the server sent GoAway on this transport. | ||
64 | goAway chan struct{} | ||
65 | // awakenKeepalive is used to wake up keepalive when after it has gone dormant. | ||
66 | awakenKeepalive chan struct{} | ||
67 | |||
68 | framer *framer | ||
69 | // controlBuf delivers all the control related tasks (e.g., window | ||
70 | // updates, reset streams, and various settings) to the controller. | ||
71 | controlBuf *controlBuffer | ||
72 | fc *trInFlow | ||
73 | // The scheme used: https if TLS is on, http otherwise. | ||
74 | scheme string | ||
75 | |||
76 | isSecure bool | ||
77 | |||
78 | perRPCCreds []credentials.PerRPCCredentials | ||
79 | |||
80 | // Boolean to keep track of reading activity on transport. | ||
81 | // 1 is true and 0 is false. | ||
82 | activity uint32 // Accessed atomically. | ||
83 | kp keepalive.ClientParameters | ||
84 | keepaliveEnabled bool | ||
85 | |||
86 | statsHandler stats.Handler | ||
87 | |||
88 | initialWindowSize int32 | ||
89 | |||
90 | // configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE | ||
91 | maxSendHeaderListSize *uint32 | ||
92 | |||
93 | bdpEst *bdpEstimator | ||
94 | // onPrefaceReceipt is a callback that client transport calls upon | ||
95 | // receiving server preface to signal that a succefull HTTP2 | ||
96 | // connection was established. | ||
97 | onPrefaceReceipt func() | ||
98 | |||
99 | maxConcurrentStreams uint32 | ||
100 | streamQuota int64 | ||
101 | streamsQuotaAvailable chan struct{} | ||
102 | waitingStreams uint32 | ||
103 | nextID uint32 | ||
104 | |||
105 | mu sync.Mutex // guard the following variables | ||
106 | state transportState | ||
107 | activeStreams map[uint32]*Stream | ||
108 | // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame. | ||
109 | prevGoAwayID uint32 | ||
110 | // goAwayReason records the http2.ErrCode and debug data received with the | ||
111 | // GoAway frame. | ||
112 | goAwayReason GoAwayReason | ||
113 | |||
114 | // Fields below are for channelz metric collection. | ||
115 | channelzID int64 // channelz unique identification number | ||
116 | czData *channelzData | ||
117 | |||
118 | onGoAway func(GoAwayReason) | ||
119 | onClose func() | ||
120 | } | ||
121 | |||
122 | func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { | ||
123 | if fn != nil { | ||
124 | return fn(ctx, addr) | ||
125 | } | ||
126 | return (&net.Dialer{}).DialContext(ctx, "tcp", addr) | ||
127 | } | ||
128 | |||
129 | func isTemporary(err error) bool { | ||
130 | switch err := err.(type) { | ||
131 | case interface { | ||
132 | Temporary() bool | ||
133 | }: | ||
134 | return err.Temporary() | ||
135 | case interface { | ||
136 | Timeout() bool | ||
137 | }: | ||
138 | // Timeouts may be resolved upon retry, and are thus treated as | ||
139 | // temporary. | ||
140 | return err.Timeout() | ||
141 | } | ||
142 | return true | ||
143 | } | ||
144 | |||
145 | // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 | ||
146 | // and starts to receive messages on it. Non-nil error returns if construction | ||
147 | // fails. | ||
148 | func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) { | ||
149 | scheme := "http" | ||
150 | ctx, cancel := context.WithCancel(ctx) | ||
151 | defer func() { | ||
152 | if err != nil { | ||
153 | cancel() | ||
154 | } | ||
155 | }() | ||
156 | |||
157 | conn, err := dial(connectCtx, opts.Dialer, addr.Addr) | ||
158 | if err != nil { | ||
159 | if opts.FailOnNonTempDialError { | ||
160 | return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err) | ||
161 | } | ||
162 | return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err) | ||
163 | } | ||
164 | // Any further errors will close the underlying connection | ||
165 | defer func(conn net.Conn) { | ||
166 | if err != nil { | ||
167 | conn.Close() | ||
168 | } | ||
169 | }(conn) | ||
170 | kp := opts.KeepaliveParams | ||
171 | // Validate keepalive parameters. | ||
172 | if kp.Time == 0 { | ||
173 | kp.Time = defaultClientKeepaliveTime | ||
174 | } | ||
175 | if kp.Timeout == 0 { | ||
176 | kp.Timeout = defaultClientKeepaliveTimeout | ||
177 | } | ||
178 | keepaliveEnabled := false | ||
179 | if kp.Time != infinity { | ||
180 | if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil { | ||
181 | return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err) | ||
182 | } | ||
183 | keepaliveEnabled = true | ||
184 | } | ||
185 | var ( | ||
186 | isSecure bool | ||
187 | authInfo credentials.AuthInfo | ||
188 | ) | ||
189 | transportCreds := opts.TransportCredentials | ||
190 | perRPCCreds := opts.PerRPCCredentials | ||
191 | |||
192 | if b := opts.CredsBundle; b != nil { | ||
193 | if t := b.TransportCredentials(); t != nil { | ||
194 | transportCreds = t | ||
195 | } | ||
196 | if t := b.PerRPCCredentials(); t != nil { | ||
197 | perRPCCreds = append(perRPCCreds, t) | ||
198 | } | ||
199 | } | ||
200 | if transportCreds != nil { | ||
201 | scheme = "https" | ||
202 | conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.Authority, conn) | ||
203 | if err != nil { | ||
204 | return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err) | ||
205 | } | ||
206 | isSecure = true | ||
207 | } | ||
208 | dynamicWindow := true | ||
209 | icwz := int32(initialWindowSize) | ||
210 | if opts.InitialConnWindowSize >= defaultWindowSize { | ||
211 | icwz = opts.InitialConnWindowSize | ||
212 | dynamicWindow = false | ||
213 | } | ||
214 | writeBufSize := opts.WriteBufferSize | ||
215 | readBufSize := opts.ReadBufferSize | ||
216 | maxHeaderListSize := defaultClientMaxHeaderListSize | ||
217 | if opts.MaxHeaderListSize != nil { | ||
218 | maxHeaderListSize = *opts.MaxHeaderListSize | ||
219 | } | ||
220 | t := &http2Client{ | ||
221 | ctx: ctx, | ||
222 | ctxDone: ctx.Done(), // Cache Done chan. | ||
223 | cancel: cancel, | ||
224 | userAgent: opts.UserAgent, | ||
225 | md: addr.Metadata, | ||
226 | conn: conn, | ||
227 | remoteAddr: conn.RemoteAddr(), | ||
228 | localAddr: conn.LocalAddr(), | ||
229 | authInfo: authInfo, | ||
230 | readerDone: make(chan struct{}), | ||
231 | writerDone: make(chan struct{}), | ||
232 | goAway: make(chan struct{}), | ||
233 | awakenKeepalive: make(chan struct{}, 1), | ||
234 | framer: newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize), | ||
235 | fc: &trInFlow{limit: uint32(icwz)}, | ||
236 | scheme: scheme, | ||
237 | activeStreams: make(map[uint32]*Stream), | ||
238 | isSecure: isSecure, | ||
239 | perRPCCreds: perRPCCreds, | ||
240 | kp: kp, | ||
241 | statsHandler: opts.StatsHandler, | ||
242 | initialWindowSize: initialWindowSize, | ||
243 | onPrefaceReceipt: onPrefaceReceipt, | ||
244 | nextID: 1, | ||
245 | maxConcurrentStreams: defaultMaxStreamsClient, | ||
246 | streamQuota: defaultMaxStreamsClient, | ||
247 | streamsQuotaAvailable: make(chan struct{}, 1), | ||
248 | czData: new(channelzData), | ||
249 | onGoAway: onGoAway, | ||
250 | onClose: onClose, | ||
251 | keepaliveEnabled: keepaliveEnabled, | ||
252 | } | ||
253 | t.controlBuf = newControlBuffer(t.ctxDone) | ||
254 | if opts.InitialWindowSize >= defaultWindowSize { | ||
255 | t.initialWindowSize = opts.InitialWindowSize | ||
256 | dynamicWindow = false | ||
257 | } | ||
258 | if dynamicWindow { | ||
259 | t.bdpEst = &bdpEstimator{ | ||
260 | bdp: initialWindowSize, | ||
261 | updateFlowControl: t.updateFlowControl, | ||
262 | } | ||
263 | } | ||
264 | // Make sure awakenKeepalive can't be written upon. | ||
265 | // keepalive routine will make it writable, if need be. | ||
266 | t.awakenKeepalive <- struct{}{} | ||
267 | if t.statsHandler != nil { | ||
268 | t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{ | ||
269 | RemoteAddr: t.remoteAddr, | ||
270 | LocalAddr: t.localAddr, | ||
271 | }) | ||
272 | connBegin := &stats.ConnBegin{ | ||
273 | Client: true, | ||
274 | } | ||
275 | t.statsHandler.HandleConn(t.ctx, connBegin) | ||
276 | } | ||
277 | if channelz.IsOn() { | ||
278 | t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr)) | ||
279 | } | ||
280 | if t.keepaliveEnabled { | ||
281 | go t.keepalive() | ||
282 | } | ||
283 | // Start the reader goroutine for incoming message. Each transport has | ||
284 | // a dedicated goroutine which reads HTTP2 frame from network. Then it | ||
285 | // dispatches the frame to the corresponding stream entity. | ||
286 | go t.reader() | ||
287 | |||
288 | // Send connection preface to server. | ||
289 | n, err := t.conn.Write(clientPreface) | ||
290 | if err != nil { | ||
291 | t.Close() | ||
292 | return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err) | ||
293 | } | ||
294 | if n != len(clientPreface) { | ||
295 | t.Close() | ||
296 | return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) | ||
297 | } | ||
298 | var ss []http2.Setting | ||
299 | |||
300 | if t.initialWindowSize != defaultWindowSize { | ||
301 | ss = append(ss, http2.Setting{ | ||
302 | ID: http2.SettingInitialWindowSize, | ||
303 | Val: uint32(t.initialWindowSize), | ||
304 | }) | ||
305 | } | ||
306 | if opts.MaxHeaderListSize != nil { | ||
307 | ss = append(ss, http2.Setting{ | ||
308 | ID: http2.SettingMaxHeaderListSize, | ||
309 | Val: *opts.MaxHeaderListSize, | ||
310 | }) | ||
311 | } | ||
312 | err = t.framer.fr.WriteSettings(ss...) | ||
313 | if err != nil { | ||
314 | t.Close() | ||
315 | return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err) | ||
316 | } | ||
317 | // Adjust the connection flow control window if needed. | ||
318 | if delta := uint32(icwz - defaultWindowSize); delta > 0 { | ||
319 | if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil { | ||
320 | t.Close() | ||
321 | return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err) | ||
322 | } | ||
323 | } | ||
324 | |||
325 | t.framer.writer.Flush() | ||
326 | go func() { | ||
327 | t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst) | ||
328 | err := t.loopy.run() | ||
329 | if err != nil { | ||
330 | errorf("transport: loopyWriter.run returning. Err: %v", err) | ||
331 | } | ||
332 | // If it's a connection error, let reader goroutine handle it | ||
333 | // since there might be data in the buffers. | ||
334 | if _, ok := err.(net.Error); !ok { | ||
335 | t.conn.Close() | ||
336 | } | ||
337 | close(t.writerDone) | ||
338 | }() | ||
339 | return t, nil | ||
340 | } | ||
341 | |||
342 | func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { | ||
343 | // TODO(zhaoq): Handle uint32 overflow of Stream.id. | ||
344 | s := &Stream{ | ||
345 | done: make(chan struct{}), | ||
346 | method: callHdr.Method, | ||
347 | sendCompress: callHdr.SendCompress, | ||
348 | buf: newRecvBuffer(), | ||
349 | headerChan: make(chan struct{}), | ||
350 | contentSubtype: callHdr.ContentSubtype, | ||
351 | } | ||
352 | s.wq = newWriteQuota(defaultWriteQuota, s.done) | ||
353 | s.requestRead = func(n int) { | ||
354 | t.adjustWindow(s, uint32(n)) | ||
355 | } | ||
356 | // The client side stream context should have exactly the same life cycle with the user provided context. | ||
357 | // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done. | ||
358 | // So we use the original context here instead of creating a copy. | ||
359 | s.ctx = ctx | ||
360 | s.trReader = &transportReader{ | ||
361 | reader: &recvBufferReader{ | ||
362 | ctx: s.ctx, | ||
363 | ctxDone: s.ctx.Done(), | ||
364 | recv: s.buf, | ||
365 | closeStream: func(err error) { | ||
366 | t.CloseStream(s, err) | ||
367 | }, | ||
368 | }, | ||
369 | windowHandler: func(n int) { | ||
370 | t.updateWindow(s, uint32(n)) | ||
371 | }, | ||
372 | } | ||
373 | return s | ||
374 | } | ||
375 | |||
376 | func (t *http2Client) getPeer() *peer.Peer { | ||
377 | pr := &peer.Peer{ | ||
378 | Addr: t.remoteAddr, | ||
379 | } | ||
380 | // Attach Auth info if there is any. | ||
381 | if t.authInfo != nil { | ||
382 | pr.AuthInfo = t.authInfo | ||
383 | } | ||
384 | return pr | ||
385 | } | ||
386 | |||
387 | func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) { | ||
388 | aud := t.createAudience(callHdr) | ||
389 | authData, err := t.getTrAuthData(ctx, aud) | ||
390 | if err != nil { | ||
391 | return nil, err | ||
392 | } | ||
393 | callAuthData, err := t.getCallAuthData(ctx, aud, callHdr) | ||
394 | if err != nil { | ||
395 | return nil, err | ||
396 | } | ||
397 | // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields | ||
398 | // first and create a slice of that exact size. | ||
399 | // Make the slice of certain predictable size to reduce allocations made by append. | ||
400 | hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te | ||
401 | hfLen += len(authData) + len(callAuthData) | ||
402 | headerFields := make([]hpack.HeaderField, 0, hfLen) | ||
403 | headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"}) | ||
404 | headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme}) | ||
405 | headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method}) | ||
406 | headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host}) | ||
407 | headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)}) | ||
408 | headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent}) | ||
409 | headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"}) | ||
410 | if callHdr.PreviousAttempts > 0 { | ||
411 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)}) | ||
412 | } | ||
413 | |||
414 | if callHdr.SendCompress != "" { | ||
415 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress}) | ||
416 | } | ||
417 | if dl, ok := ctx.Deadline(); ok { | ||
418 | // Send out timeout regardless its value. The server can detect timeout context by itself. | ||
419 | // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire. | ||
420 | timeout := dl.Sub(time.Now()) | ||
421 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)}) | ||
422 | } | ||
423 | for k, v := range authData { | ||
424 | headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | ||
425 | } | ||
426 | for k, v := range callAuthData { | ||
427 | headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | ||
428 | } | ||
429 | if b := stats.OutgoingTags(ctx); b != nil { | ||
430 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)}) | ||
431 | } | ||
432 | if b := stats.OutgoingTrace(ctx); b != nil { | ||
433 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)}) | ||
434 | } | ||
435 | |||
436 | if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok { | ||
437 | var k string | ||
438 | for _, vv := range added { | ||
439 | for i, v := range vv { | ||
440 | if i%2 == 0 { | ||
441 | k = v | ||
442 | continue | ||
443 | } | ||
444 | // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. | ||
445 | if isReservedHeader(k) { | ||
446 | continue | ||
447 | } | ||
448 | headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)}) | ||
449 | } | ||
450 | } | ||
451 | for k, vv := range md { | ||
452 | // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. | ||
453 | if isReservedHeader(k) { | ||
454 | continue | ||
455 | } | ||
456 | for _, v := range vv { | ||
457 | headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | ||
458 | } | ||
459 | } | ||
460 | } | ||
461 | if md, ok := t.md.(*metadata.MD); ok { | ||
462 | for k, vv := range *md { | ||
463 | if isReservedHeader(k) { | ||
464 | continue | ||
465 | } | ||
466 | for _, v := range vv { | ||
467 | headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | ||
468 | } | ||
469 | } | ||
470 | } | ||
471 | return headerFields, nil | ||
472 | } | ||
473 | |||
474 | func (t *http2Client) createAudience(callHdr *CallHdr) string { | ||
475 | // Create an audience string only if needed. | ||
476 | if len(t.perRPCCreds) == 0 && callHdr.Creds == nil { | ||
477 | return "" | ||
478 | } | ||
479 | // Construct URI required to get auth request metadata. | ||
480 | // Omit port if it is the default one. | ||
481 | host := strings.TrimSuffix(callHdr.Host, ":443") | ||
482 | pos := strings.LastIndex(callHdr.Method, "/") | ||
483 | if pos == -1 { | ||
484 | pos = len(callHdr.Method) | ||
485 | } | ||
486 | return "https://" + host + callHdr.Method[:pos] | ||
487 | } | ||
488 | |||
489 | func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) { | ||
490 | authData := map[string]string{} | ||
491 | for _, c := range t.perRPCCreds { | ||
492 | data, err := c.GetRequestMetadata(ctx, audience) | ||
493 | if err != nil { | ||
494 | if _, ok := status.FromError(err); ok { | ||
495 | return nil, err | ||
496 | } | ||
497 | |||
498 | return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err) | ||
499 | } | ||
500 | for k, v := range data { | ||
501 | // Capital header names are illegal in HTTP/2. | ||
502 | k = strings.ToLower(k) | ||
503 | authData[k] = v | ||
504 | } | ||
505 | } | ||
506 | return authData, nil | ||
507 | } | ||
508 | |||
509 | func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) { | ||
510 | callAuthData := map[string]string{} | ||
511 | // Check if credentials.PerRPCCredentials were provided via call options. | ||
512 | // Note: if these credentials are provided both via dial options and call | ||
513 | // options, then both sets of credentials will be applied. | ||
514 | if callCreds := callHdr.Creds; callCreds != nil { | ||
515 | if !t.isSecure && callCreds.RequireTransportSecurity() { | ||
516 | return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection") | ||
517 | } | ||
518 | data, err := callCreds.GetRequestMetadata(ctx, audience) | ||
519 | if err != nil { | ||
520 | return nil, status.Errorf(codes.Internal, "transport: %v", err) | ||
521 | } | ||
522 | for k, v := range data { | ||
523 | // Capital header names are illegal in HTTP/2 | ||
524 | k = strings.ToLower(k) | ||
525 | callAuthData[k] = v | ||
526 | } | ||
527 | } | ||
528 | return callAuthData, nil | ||
529 | } | ||
530 | |||
531 | // NewStream creates a stream and registers it into the transport as "active" | ||
532 | // streams. | ||
533 | func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { | ||
534 | ctx = peer.NewContext(ctx, t.getPeer()) | ||
535 | headerFields, err := t.createHeaderFields(ctx, callHdr) | ||
536 | if err != nil { | ||
537 | return nil, err | ||
538 | } | ||
539 | s := t.newStream(ctx, callHdr) | ||
540 | cleanup := func(err error) { | ||
541 | if s.swapState(streamDone) == streamDone { | ||
542 | // If it was already done, return. | ||
543 | return | ||
544 | } | ||
545 | // The stream was unprocessed by the server. | ||
546 | atomic.StoreUint32(&s.unprocessed, 1) | ||
547 | s.write(recvMsg{err: err}) | ||
548 | close(s.done) | ||
549 | // If headerChan isn't closed, then close it. | ||
550 | if atomic.SwapUint32(&s.headerDone, 1) == 0 { | ||
551 | close(s.headerChan) | ||
552 | } | ||
553 | |||
554 | } | ||
555 | hdr := &headerFrame{ | ||
556 | hf: headerFields, | ||
557 | endStream: false, | ||
558 | initStream: func(id uint32) (bool, error) { | ||
559 | t.mu.Lock() | ||
560 | if state := t.state; state != reachable { | ||
561 | t.mu.Unlock() | ||
562 | // Do a quick cleanup. | ||
563 | err := error(errStreamDrain) | ||
564 | if state == closing { | ||
565 | err = ErrConnClosing | ||
566 | } | ||
567 | cleanup(err) | ||
568 | return false, err | ||
569 | } | ||
570 | t.activeStreams[id] = s | ||
571 | if channelz.IsOn() { | ||
572 | atomic.AddInt64(&t.czData.streamsStarted, 1) | ||
573 | atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano()) | ||
574 | } | ||
575 | var sendPing bool | ||
576 | // If the number of active streams change from 0 to 1, then check if keepalive | ||
577 | // has gone dormant. If so, wake it up. | ||
578 | if len(t.activeStreams) == 1 && t.keepaliveEnabled { | ||
579 | select { | ||
580 | case t.awakenKeepalive <- struct{}{}: | ||
581 | sendPing = true | ||
582 | // Fill the awakenKeepalive channel again as this channel must be | ||
583 | // kept non-writable except at the point that the keepalive() | ||
584 | // goroutine is waiting either to be awaken or shutdown. | ||
585 | t.awakenKeepalive <- struct{}{} | ||
586 | default: | ||
587 | } | ||
588 | } | ||
589 | t.mu.Unlock() | ||
590 | return sendPing, nil | ||
591 | }, | ||
592 | onOrphaned: cleanup, | ||
593 | wq: s.wq, | ||
594 | } | ||
595 | firstTry := true | ||
596 | var ch chan struct{} | ||
597 | checkForStreamQuota := func(it interface{}) bool { | ||
598 | if t.streamQuota <= 0 { // Can go negative if server decreases it. | ||
599 | if firstTry { | ||
600 | t.waitingStreams++ | ||
601 | } | ||
602 | ch = t.streamsQuotaAvailable | ||
603 | return false | ||
604 | } | ||
605 | if !firstTry { | ||
606 | t.waitingStreams-- | ||
607 | } | ||
608 | t.streamQuota-- | ||
609 | h := it.(*headerFrame) | ||
610 | h.streamID = t.nextID | ||
611 | t.nextID += 2 | ||
612 | s.id = h.streamID | ||
613 | s.fc = &inFlow{limit: uint32(t.initialWindowSize)} | ||
614 | if t.streamQuota > 0 && t.waitingStreams > 0 { | ||
615 | select { | ||
616 | case t.streamsQuotaAvailable <- struct{}{}: | ||
617 | default: | ||
618 | } | ||
619 | } | ||
620 | return true | ||
621 | } | ||
622 | var hdrListSizeErr error | ||
623 | checkForHeaderListSize := func(it interface{}) bool { | ||
624 | if t.maxSendHeaderListSize == nil { | ||
625 | return true | ||
626 | } | ||
627 | hdrFrame := it.(*headerFrame) | ||
628 | var sz int64 | ||
629 | for _, f := range hdrFrame.hf { | ||
630 | if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) { | ||
631 | hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize) | ||
632 | return false | ||
633 | } | ||
634 | } | ||
635 | return true | ||
636 | } | ||
637 | for { | ||
638 | success, err := t.controlBuf.executeAndPut(func(it interface{}) bool { | ||
639 | if !checkForStreamQuota(it) { | ||
640 | return false | ||
641 | } | ||
642 | if !checkForHeaderListSize(it) { | ||
643 | return false | ||
644 | } | ||
645 | return true | ||
646 | }, hdr) | ||
647 | if err != nil { | ||
648 | return nil, err | ||
649 | } | ||
650 | if success { | ||
651 | break | ||
652 | } | ||
653 | if hdrListSizeErr != nil { | ||
654 | return nil, hdrListSizeErr | ||
655 | } | ||
656 | firstTry = false | ||
657 | select { | ||
658 | case <-ch: | ||
659 | case <-s.ctx.Done(): | ||
660 | return nil, ContextErr(s.ctx.Err()) | ||
661 | case <-t.goAway: | ||
662 | return nil, errStreamDrain | ||
663 | case <-t.ctx.Done(): | ||
664 | return nil, ErrConnClosing | ||
665 | } | ||
666 | } | ||
667 | if t.statsHandler != nil { | ||
668 | outHeader := &stats.OutHeader{ | ||
669 | Client: true, | ||
670 | FullMethod: callHdr.Method, | ||
671 | RemoteAddr: t.remoteAddr, | ||
672 | LocalAddr: t.localAddr, | ||
673 | Compression: callHdr.SendCompress, | ||
674 | } | ||
675 | t.statsHandler.HandleRPC(s.ctx, outHeader) | ||
676 | } | ||
677 | return s, nil | ||
678 | } | ||
679 | |||
680 | // CloseStream clears the footprint of a stream when the stream is not needed any more. | ||
681 | // This must not be executed in reader's goroutine. | ||
682 | func (t *http2Client) CloseStream(s *Stream, err error) { | ||
683 | var ( | ||
684 | rst bool | ||
685 | rstCode http2.ErrCode | ||
686 | ) | ||
687 | if err != nil { | ||
688 | rst = true | ||
689 | rstCode = http2.ErrCodeCancel | ||
690 | } | ||
691 | t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false) | ||
692 | } | ||
693 | |||
694 | func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) { | ||
695 | // Set stream status to done. | ||
696 | if s.swapState(streamDone) == streamDone { | ||
697 | // If it was already done, return. If multiple closeStream calls | ||
698 | // happen simultaneously, wait for the first to finish. | ||
699 | <-s.done | ||
700 | return | ||
701 | } | ||
702 | // status and trailers can be updated here without any synchronization because the stream goroutine will | ||
703 | // only read it after it sees an io.EOF error from read or write and we'll write those errors | ||
704 | // only after updating this. | ||
705 | s.status = st | ||
706 | if len(mdata) > 0 { | ||
707 | s.trailer = mdata | ||
708 | } | ||
709 | if err != nil { | ||
710 | // This will unblock reads eventually. | ||
711 | s.write(recvMsg{err: err}) | ||
712 | } | ||
713 | // If headerChan isn't closed, then close it. | ||
714 | if atomic.SwapUint32(&s.headerDone, 1) == 0 { | ||
715 | s.noHeaders = true | ||
716 | close(s.headerChan) | ||
717 | } | ||
718 | cleanup := &cleanupStream{ | ||
719 | streamID: s.id, | ||
720 | onWrite: func() { | ||
721 | t.mu.Lock() | ||
722 | if t.activeStreams != nil { | ||
723 | delete(t.activeStreams, s.id) | ||
724 | } | ||
725 | t.mu.Unlock() | ||
726 | if channelz.IsOn() { | ||
727 | if eosReceived { | ||
728 | atomic.AddInt64(&t.czData.streamsSucceeded, 1) | ||
729 | } else { | ||
730 | atomic.AddInt64(&t.czData.streamsFailed, 1) | ||
731 | } | ||
732 | } | ||
733 | }, | ||
734 | rst: rst, | ||
735 | rstCode: rstCode, | ||
736 | } | ||
737 | addBackStreamQuota := func(interface{}) bool { | ||
738 | t.streamQuota++ | ||
739 | if t.streamQuota > 0 && t.waitingStreams > 0 { | ||
740 | select { | ||
741 | case t.streamsQuotaAvailable <- struct{}{}: | ||
742 | default: | ||
743 | } | ||
744 | } | ||
745 | return true | ||
746 | } | ||
747 | t.controlBuf.executeAndPut(addBackStreamQuota, cleanup) | ||
748 | // This will unblock write. | ||
749 | close(s.done) | ||
750 | } | ||
751 | |||
752 | // Close kicks off the shutdown process of the transport. This should be called | ||
753 | // only once on a transport. Once it is called, the transport should not be | ||
754 | // accessed any more. | ||
755 | // | ||
756 | // This method blocks until the addrConn that initiated this transport is | ||
757 | // re-connected. This happens because t.onClose() begins reconnect logic at the | ||
758 | // addrConn level and blocks until the addrConn is successfully connected. | ||
759 | func (t *http2Client) Close() error { | ||
760 | t.mu.Lock() | ||
761 | // Make sure we only Close once. | ||
762 | if t.state == closing { | ||
763 | t.mu.Unlock() | ||
764 | return nil | ||
765 | } | ||
766 | t.state = closing | ||
767 | streams := t.activeStreams | ||
768 | t.activeStreams = nil | ||
769 | t.mu.Unlock() | ||
770 | t.controlBuf.finish() | ||
771 | t.cancel() | ||
772 | err := t.conn.Close() | ||
773 | if channelz.IsOn() { | ||
774 | channelz.RemoveEntry(t.channelzID) | ||
775 | } | ||
776 | // Notify all active streams. | ||
777 | for _, s := range streams { | ||
778 | t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false) | ||
779 | } | ||
780 | if t.statsHandler != nil { | ||
781 | connEnd := &stats.ConnEnd{ | ||
782 | Client: true, | ||
783 | } | ||
784 | t.statsHandler.HandleConn(t.ctx, connEnd) | ||
785 | } | ||
786 | t.onClose() | ||
787 | return err | ||
788 | } | ||
789 | |||
790 | // GracefulClose sets the state to draining, which prevents new streams from | ||
791 | // being created and causes the transport to be closed when the last active | ||
792 | // stream is closed. If there are no active streams, the transport is closed | ||
793 | // immediately. This does nothing if the transport is already draining or | ||
794 | // closing. | ||
795 | func (t *http2Client) GracefulClose() error { | ||
796 | t.mu.Lock() | ||
797 | // Make sure we move to draining only from active. | ||
798 | if t.state == draining || t.state == closing { | ||
799 | t.mu.Unlock() | ||
800 | return nil | ||
801 | } | ||
802 | t.state = draining | ||
803 | active := len(t.activeStreams) | ||
804 | t.mu.Unlock() | ||
805 | if active == 0 { | ||
806 | return t.Close() | ||
807 | } | ||
808 | t.controlBuf.put(&incomingGoAway{}) | ||
809 | return nil | ||
810 | } | ||
811 | |||
812 | // Write formats the data into HTTP2 data frame(s) and sends it out. The caller | ||
813 | // should proceed only if Write returns nil. | ||
814 | func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { | ||
815 | if opts.Last { | ||
816 | // If it's the last message, update stream state. | ||
817 | if !s.compareAndSwapState(streamActive, streamWriteDone) { | ||
818 | return errStreamDone | ||
819 | } | ||
820 | } else if s.getState() != streamActive { | ||
821 | return errStreamDone | ||
822 | } | ||
823 | df := &dataFrame{ | ||
824 | streamID: s.id, | ||
825 | endStream: opts.Last, | ||
826 | } | ||
827 | if hdr != nil || data != nil { // If it's not an empty data frame. | ||
828 | // Add some data to grpc message header so that we can equally | ||
829 | // distribute bytes across frames. | ||
830 | emptyLen := http2MaxFrameLen - len(hdr) | ||
831 | if emptyLen > len(data) { | ||
832 | emptyLen = len(data) | ||
833 | } | ||
834 | hdr = append(hdr, data[:emptyLen]...) | ||
835 | data = data[emptyLen:] | ||
836 | df.h, df.d = hdr, data | ||
837 | // TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler. | ||
838 | if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { | ||
839 | return err | ||
840 | } | ||
841 | } | ||
842 | return t.controlBuf.put(df) | ||
843 | } | ||
844 | |||
845 | func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) { | ||
846 | t.mu.Lock() | ||
847 | defer t.mu.Unlock() | ||
848 | s, ok := t.activeStreams[f.Header().StreamID] | ||
849 | return s, ok | ||
850 | } | ||
851 | |||
852 | // adjustWindow sends out extra window update over the initial window size | ||
853 | // of stream if the application is requesting data larger in size than | ||
854 | // the window. | ||
855 | func (t *http2Client) adjustWindow(s *Stream, n uint32) { | ||
856 | if w := s.fc.maybeAdjust(n); w > 0 { | ||
857 | t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w}) | ||
858 | } | ||
859 | } | ||
860 | |||
861 | // updateWindow adjusts the inbound quota for the stream. | ||
862 | // Window updates will be sent out when the cumulative quota | ||
863 | // exceeds the corresponding threshold. | ||
864 | func (t *http2Client) updateWindow(s *Stream, n uint32) { | ||
865 | if w := s.fc.onRead(n); w > 0 { | ||
866 | t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w}) | ||
867 | } | ||
868 | } | ||
869 | |||
870 | // updateFlowControl updates the incoming flow control windows | ||
871 | // for the transport and the stream based on the current bdp | ||
872 | // estimation. | ||
873 | func (t *http2Client) updateFlowControl(n uint32) { | ||
874 | t.mu.Lock() | ||
875 | for _, s := range t.activeStreams { | ||
876 | s.fc.newLimit(n) | ||
877 | } | ||
878 | t.mu.Unlock() | ||
879 | updateIWS := func(interface{}) bool { | ||
880 | t.initialWindowSize = int32(n) | ||
881 | return true | ||
882 | } | ||
883 | t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)}) | ||
884 | t.controlBuf.put(&outgoingSettings{ | ||
885 | ss: []http2.Setting{ | ||
886 | { | ||
887 | ID: http2.SettingInitialWindowSize, | ||
888 | Val: n, | ||
889 | }, | ||
890 | }, | ||
891 | }) | ||
892 | } | ||
893 | |||
894 | func (t *http2Client) handleData(f *http2.DataFrame) { | ||
895 | size := f.Header().Length | ||
896 | var sendBDPPing bool | ||
897 | if t.bdpEst != nil { | ||
898 | sendBDPPing = t.bdpEst.add(size) | ||
899 | } | ||
900 | // Decouple connection's flow control from application's read. | ||
901 | // An update on connection's flow control should not depend on | ||
902 | // whether user application has read the data or not. Such a | ||
903 | // restriction is already imposed on the stream's flow control, | ||
904 | // and therefore the sender will be blocked anyways. | ||
905 | // Decoupling the connection flow control will prevent other | ||
906 | // active(fast) streams from starving in presence of slow or | ||
907 | // inactive streams. | ||
908 | // | ||
909 | if w := t.fc.onData(size); w > 0 { | ||
910 | t.controlBuf.put(&outgoingWindowUpdate{ | ||
911 | streamID: 0, | ||
912 | increment: w, | ||
913 | }) | ||
914 | } | ||
915 | if sendBDPPing { | ||
916 | // Avoid excessive ping detection (e.g. in an L7 proxy) | ||
917 | // by sending a window update prior to the BDP ping. | ||
918 | |||
919 | if w := t.fc.reset(); w > 0 { | ||
920 | t.controlBuf.put(&outgoingWindowUpdate{ | ||
921 | streamID: 0, | ||
922 | increment: w, | ||
923 | }) | ||
924 | } | ||
925 | |||
926 | t.controlBuf.put(bdpPing) | ||
927 | } | ||
928 | // Select the right stream to dispatch. | ||
929 | s, ok := t.getStream(f) | ||
930 | if !ok { | ||
931 | return | ||
932 | } | ||
933 | if size > 0 { | ||
934 | if err := s.fc.onData(size); err != nil { | ||
935 | t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false) | ||
936 | return | ||
937 | } | ||
938 | if f.Header().Flags.Has(http2.FlagDataPadded) { | ||
939 | if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 { | ||
940 | t.controlBuf.put(&outgoingWindowUpdate{s.id, w}) | ||
941 | } | ||
942 | } | ||
943 | // TODO(bradfitz, zhaoq): A copy is required here because there is no | ||
944 | // guarantee f.Data() is consumed before the arrival of next frame. | ||
945 | // Can this copy be eliminated? | ||
946 | if len(f.Data()) > 0 { | ||
947 | data := make([]byte, len(f.Data())) | ||
948 | copy(data, f.Data()) | ||
949 | s.write(recvMsg{data: data}) | ||
950 | } | ||
951 | } | ||
952 | // The server has closed the stream without sending trailers. Record that | ||
953 | // the read direction is closed, and set the status appropriately. | ||
954 | if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) { | ||
955 | t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true) | ||
956 | } | ||
957 | } | ||
958 | |||
959 | func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { | ||
960 | s, ok := t.getStream(f) | ||
961 | if !ok { | ||
962 | return | ||
963 | } | ||
964 | if f.ErrCode == http2.ErrCodeRefusedStream { | ||
965 | // The stream was unprocessed by the server. | ||
966 | atomic.StoreUint32(&s.unprocessed, 1) | ||
967 | } | ||
968 | statusCode, ok := http2ErrConvTab[f.ErrCode] | ||
969 | if !ok { | ||
970 | warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode) | ||
971 | statusCode = codes.Unknown | ||
972 | } | ||
973 | if statusCode == codes.Canceled { | ||
974 | // Our deadline was already exceeded, and that was likely the cause of | ||
975 | // this cancelation. Alter the status code accordingly. | ||
976 | if d, ok := s.ctx.Deadline(); ok && d.After(time.Now()) { | ||
977 | statusCode = codes.DeadlineExceeded | ||
978 | } | ||
979 | } | ||
980 | t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false) | ||
981 | } | ||
982 | |||
983 | func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) { | ||
984 | if f.IsAck() { | ||
985 | return | ||
986 | } | ||
987 | var maxStreams *uint32 | ||
988 | var ss []http2.Setting | ||
989 | var updateFuncs []func() | ||
990 | f.ForeachSetting(func(s http2.Setting) error { | ||
991 | switch s.ID { | ||
992 | case http2.SettingMaxConcurrentStreams: | ||
993 | maxStreams = new(uint32) | ||
994 | *maxStreams = s.Val | ||
995 | case http2.SettingMaxHeaderListSize: | ||
996 | updateFuncs = append(updateFuncs, func() { | ||
997 | t.maxSendHeaderListSize = new(uint32) | ||
998 | *t.maxSendHeaderListSize = s.Val | ||
999 | }) | ||
1000 | default: | ||
1001 | ss = append(ss, s) | ||
1002 | } | ||
1003 | return nil | ||
1004 | }) | ||
1005 | if isFirst && maxStreams == nil { | ||
1006 | maxStreams = new(uint32) | ||
1007 | *maxStreams = math.MaxUint32 | ||
1008 | } | ||
1009 | sf := &incomingSettings{ | ||
1010 | ss: ss, | ||
1011 | } | ||
1012 | if maxStreams != nil { | ||
1013 | updateStreamQuota := func() { | ||
1014 | delta := int64(*maxStreams) - int64(t.maxConcurrentStreams) | ||
1015 | t.maxConcurrentStreams = *maxStreams | ||
1016 | t.streamQuota += delta | ||
1017 | if delta > 0 && t.waitingStreams > 0 { | ||
1018 | close(t.streamsQuotaAvailable) // wake all of them up. | ||
1019 | t.streamsQuotaAvailable = make(chan struct{}, 1) | ||
1020 | } | ||
1021 | } | ||
1022 | updateFuncs = append(updateFuncs, updateStreamQuota) | ||
1023 | } | ||
1024 | t.controlBuf.executeAndPut(func(interface{}) bool { | ||
1025 | for _, f := range updateFuncs { | ||
1026 | f() | ||
1027 | } | ||
1028 | return true | ||
1029 | }, sf) | ||
1030 | } | ||
1031 | |||
1032 | func (t *http2Client) handlePing(f *http2.PingFrame) { | ||
1033 | if f.IsAck() { | ||
1034 | // Maybe it's a BDP ping. | ||
1035 | if t.bdpEst != nil { | ||
1036 | t.bdpEst.calculate(f.Data) | ||
1037 | } | ||
1038 | return | ||
1039 | } | ||
1040 | pingAck := &ping{ack: true} | ||
1041 | copy(pingAck.data[:], f.Data[:]) | ||
1042 | t.controlBuf.put(pingAck) | ||
1043 | } | ||
1044 | |||
1045 | func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { | ||
1046 | t.mu.Lock() | ||
1047 | if t.state == closing { | ||
1048 | t.mu.Unlock() | ||
1049 | return | ||
1050 | } | ||
1051 | if f.ErrCode == http2.ErrCodeEnhanceYourCalm { | ||
1052 | infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.") | ||
1053 | } | ||
1054 | id := f.LastStreamID | ||
1055 | if id > 0 && id%2 != 1 { | ||
1056 | t.mu.Unlock() | ||
1057 | t.Close() | ||
1058 | return | ||
1059 | } | ||
1060 | // A client can receive multiple GoAways from the server (see | ||
1061 | // https://github.com/grpc/grpc-go/issues/1387). The idea is that the first | ||
1062 | // GoAway will be sent with an ID of MaxInt32 and the second GoAway will be | ||
1063 | // sent after an RTT delay with the ID of the last stream the server will | ||
1064 | // process. | ||
1065 | // | ||
1066 | // Therefore, when we get the first GoAway we don't necessarily close any | ||
1067 | // streams. While in case of second GoAway we close all streams created after | ||
1068 | // the GoAwayId. This way streams that were in-flight while the GoAway from | ||
1069 | // server was being sent don't get killed. | ||
1070 | select { | ||
1071 | case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways). | ||
1072 | // If there are multiple GoAways the first one should always have an ID greater than the following ones. | ||
1073 | if id > t.prevGoAwayID { | ||
1074 | t.mu.Unlock() | ||
1075 | t.Close() | ||
1076 | return | ||
1077 | } | ||
1078 | default: | ||
1079 | t.setGoAwayReason(f) | ||
1080 | close(t.goAway) | ||
1081 | t.state = draining | ||
1082 | t.controlBuf.put(&incomingGoAway{}) | ||
1083 | |||
1084 | // This has to be a new goroutine because we're still using the current goroutine to read in the transport. | ||
1085 | t.onGoAway(t.goAwayReason) | ||
1086 | } | ||
1087 | // All streams with IDs greater than the GoAwayId | ||
1088 | // and smaller than the previous GoAway ID should be killed. | ||
1089 | upperLimit := t.prevGoAwayID | ||
1090 | if upperLimit == 0 { // This is the first GoAway Frame. | ||
1091 | upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID. | ||
1092 | } | ||
1093 | for streamID, stream := range t.activeStreams { | ||
1094 | if streamID > id && streamID <= upperLimit { | ||
1095 | // The stream was unprocessed by the server. | ||
1096 | atomic.StoreUint32(&stream.unprocessed, 1) | ||
1097 | t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false) | ||
1098 | } | ||
1099 | } | ||
1100 | t.prevGoAwayID = id | ||
1101 | active := len(t.activeStreams) | ||
1102 | t.mu.Unlock() | ||
1103 | if active == 0 { | ||
1104 | t.Close() | ||
1105 | } | ||
1106 | } | ||
1107 | |||
1108 | // setGoAwayReason sets the value of t.goAwayReason based | ||
1109 | // on the GoAway frame received. | ||
1110 | // It expects a lock on transport's mutext to be held by | ||
1111 | // the caller. | ||
1112 | func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) { | ||
1113 | t.goAwayReason = GoAwayNoReason | ||
1114 | switch f.ErrCode { | ||
1115 | case http2.ErrCodeEnhanceYourCalm: | ||
1116 | if string(f.DebugData()) == "too_many_pings" { | ||
1117 | t.goAwayReason = GoAwayTooManyPings | ||
1118 | } | ||
1119 | } | ||
1120 | } | ||
1121 | |||
1122 | func (t *http2Client) GetGoAwayReason() GoAwayReason { | ||
1123 | t.mu.Lock() | ||
1124 | defer t.mu.Unlock() | ||
1125 | return t.goAwayReason | ||
1126 | } | ||
1127 | |||
1128 | func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) { | ||
1129 | t.controlBuf.put(&incomingWindowUpdate{ | ||
1130 | streamID: f.Header().StreamID, | ||
1131 | increment: f.Increment, | ||
1132 | }) | ||
1133 | } | ||
1134 | |||
1135 | // operateHeaders takes action on the decoded headers. | ||
1136 | func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { | ||
1137 | s, ok := t.getStream(frame) | ||
1138 | if !ok { | ||
1139 | return | ||
1140 | } | ||
1141 | atomic.StoreUint32(&s.bytesReceived, 1) | ||
1142 | var state decodeState | ||
1143 | if err := state.decodeHeader(frame); err != nil { | ||
1144 | t.closeStream(s, err, true, http2.ErrCodeProtocol, status.New(codes.Internal, err.Error()), nil, false) | ||
1145 | // Something wrong. Stops reading even when there is remaining. | ||
1146 | return | ||
1147 | } | ||
1148 | |||
1149 | endStream := frame.StreamEnded() | ||
1150 | var isHeader bool | ||
1151 | defer func() { | ||
1152 | if t.statsHandler != nil { | ||
1153 | if isHeader { | ||
1154 | inHeader := &stats.InHeader{ | ||
1155 | Client: true, | ||
1156 | WireLength: int(frame.Header().Length), | ||
1157 | } | ||
1158 | t.statsHandler.HandleRPC(s.ctx, inHeader) | ||
1159 | } else { | ||
1160 | inTrailer := &stats.InTrailer{ | ||
1161 | Client: true, | ||
1162 | WireLength: int(frame.Header().Length), | ||
1163 | } | ||
1164 | t.statsHandler.HandleRPC(s.ctx, inTrailer) | ||
1165 | } | ||
1166 | } | ||
1167 | }() | ||
1168 | // If headers haven't been received yet. | ||
1169 | if atomic.SwapUint32(&s.headerDone, 1) == 0 { | ||
1170 | if !endStream { | ||
1171 | // Headers frame is not actually a trailers-only frame. | ||
1172 | isHeader = true | ||
1173 | // These values can be set without any synchronization because | ||
1174 | // stream goroutine will read it only after seeing a closed | ||
1175 | // headerChan which we'll close after setting this. | ||
1176 | s.recvCompress = state.encoding | ||
1177 | if len(state.mdata) > 0 { | ||
1178 | s.header = state.mdata | ||
1179 | } | ||
1180 | } else { | ||
1181 | s.noHeaders = true | ||
1182 | } | ||
1183 | close(s.headerChan) | ||
1184 | } | ||
1185 | if !endStream { | ||
1186 | return | ||
1187 | } | ||
1188 | // if client received END_STREAM from server while stream was still active, send RST_STREAM | ||
1189 | rst := s.getState() == streamActive | ||
1190 | t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.mdata, true) | ||
1191 | } | ||
1192 | |||
1193 | // reader runs as a separate goroutine in charge of reading data from network | ||
1194 | // connection. | ||
1195 | // | ||
1196 | // TODO(zhaoq): currently one reader per transport. Investigate whether this is | ||
1197 | // optimal. | ||
1198 | // TODO(zhaoq): Check the validity of the incoming frame sequence. | ||
1199 | func (t *http2Client) reader() { | ||
1200 | defer close(t.readerDone) | ||
1201 | // Check the validity of server preface. | ||
1202 | frame, err := t.framer.fr.ReadFrame() | ||
1203 | if err != nil { | ||
1204 | t.Close() // this kicks off resetTransport, so must be last before return | ||
1205 | return | ||
1206 | } | ||
1207 | t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!) | ||
1208 | if t.keepaliveEnabled { | ||
1209 | atomic.CompareAndSwapUint32(&t.activity, 0, 1) | ||
1210 | } | ||
1211 | sf, ok := frame.(*http2.SettingsFrame) | ||
1212 | if !ok { | ||
1213 | t.Close() // this kicks off resetTransport, so must be last before return | ||
1214 | return | ||
1215 | } | ||
1216 | t.onPrefaceReceipt() | ||
1217 | t.handleSettings(sf, true) | ||
1218 | |||
1219 | // loop to keep reading incoming messages on this transport. | ||
1220 | for { | ||
1221 | frame, err := t.framer.fr.ReadFrame() | ||
1222 | if t.keepaliveEnabled { | ||
1223 | atomic.CompareAndSwapUint32(&t.activity, 0, 1) | ||
1224 | } | ||
1225 | if err != nil { | ||
1226 | // Abort an active stream if the http2.Framer returns a | ||
1227 | // http2.StreamError. This can happen only if the server's response | ||
1228 | // is malformed http2. | ||
1229 | if se, ok := err.(http2.StreamError); ok { | ||
1230 | t.mu.Lock() | ||
1231 | s := t.activeStreams[se.StreamID] | ||
1232 | t.mu.Unlock() | ||
1233 | if s != nil { | ||
1234 | // use error detail to provide better err message | ||
1235 | code := http2ErrConvTab[se.Code] | ||
1236 | msg := t.framer.fr.ErrorDetail().Error() | ||
1237 | t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false) | ||
1238 | } | ||
1239 | continue | ||
1240 | } else { | ||
1241 | // Transport error. | ||
1242 | t.Close() | ||
1243 | return | ||
1244 | } | ||
1245 | } | ||
1246 | switch frame := frame.(type) { | ||
1247 | case *http2.MetaHeadersFrame: | ||
1248 | t.operateHeaders(frame) | ||
1249 | case *http2.DataFrame: | ||
1250 | t.handleData(frame) | ||
1251 | case *http2.RSTStreamFrame: | ||
1252 | t.handleRSTStream(frame) | ||
1253 | case *http2.SettingsFrame: | ||
1254 | t.handleSettings(frame, false) | ||
1255 | case *http2.PingFrame: | ||
1256 | t.handlePing(frame) | ||
1257 | case *http2.GoAwayFrame: | ||
1258 | t.handleGoAway(frame) | ||
1259 | case *http2.WindowUpdateFrame: | ||
1260 | t.handleWindowUpdate(frame) | ||
1261 | default: | ||
1262 | errorf("transport: http2Client.reader got unhandled frame type %v.", frame) | ||
1263 | } | ||
1264 | } | ||
1265 | } | ||
1266 | |||
1267 | // keepalive running in a separate goroutune makes sure the connection is alive by sending pings. | ||
1268 | func (t *http2Client) keepalive() { | ||
1269 | p := &ping{data: [8]byte{}} | ||
1270 | timer := time.NewTimer(t.kp.Time) | ||
1271 | for { | ||
1272 | select { | ||
1273 | case <-timer.C: | ||
1274 | if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { | ||
1275 | timer.Reset(t.kp.Time) | ||
1276 | continue | ||
1277 | } | ||
1278 | // Check if keepalive should go dormant. | ||
1279 | t.mu.Lock() | ||
1280 | if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream { | ||
1281 | // Make awakenKeepalive writable. | ||
1282 | <-t.awakenKeepalive | ||
1283 | t.mu.Unlock() | ||
1284 | select { | ||
1285 | case <-t.awakenKeepalive: | ||
1286 | // If the control gets here a ping has been sent | ||
1287 | // need to reset the timer with keepalive.Timeout. | ||
1288 | case <-t.ctx.Done(): | ||
1289 | return | ||
1290 | } | ||
1291 | } else { | ||
1292 | t.mu.Unlock() | ||
1293 | if channelz.IsOn() { | ||
1294 | atomic.AddInt64(&t.czData.kpCount, 1) | ||
1295 | } | ||
1296 | // Send ping. | ||
1297 | t.controlBuf.put(p) | ||
1298 | } | ||
1299 | |||
1300 | // By the time control gets here a ping has been sent one way or the other. | ||
1301 | timer.Reset(t.kp.Timeout) | ||
1302 | select { | ||
1303 | case <-timer.C: | ||
1304 | if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { | ||
1305 | timer.Reset(t.kp.Time) | ||
1306 | continue | ||
1307 | } | ||
1308 | t.Close() | ||
1309 | return | ||
1310 | case <-t.ctx.Done(): | ||
1311 | if !timer.Stop() { | ||
1312 | <-timer.C | ||
1313 | } | ||
1314 | return | ||
1315 | } | ||
1316 | case <-t.ctx.Done(): | ||
1317 | if !timer.Stop() { | ||
1318 | <-timer.C | ||
1319 | } | ||
1320 | return | ||
1321 | } | ||
1322 | } | ||
1323 | } | ||
1324 | |||
1325 | func (t *http2Client) Error() <-chan struct{} { | ||
1326 | return t.ctx.Done() | ||
1327 | } | ||
1328 | |||
1329 | func (t *http2Client) GoAway() <-chan struct{} { | ||
1330 | return t.goAway | ||
1331 | } | ||
1332 | |||
1333 | func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric { | ||
1334 | s := channelz.SocketInternalMetric{ | ||
1335 | StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted), | ||
1336 | StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded), | ||
1337 | StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed), | ||
1338 | MessagesSent: atomic.LoadInt64(&t.czData.msgSent), | ||
1339 | MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv), | ||
1340 | KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount), | ||
1341 | LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)), | ||
1342 | LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)), | ||
1343 | LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)), | ||
1344 | LocalFlowControlWindow: int64(t.fc.getSize()), | ||
1345 | SocketOptions: channelz.GetSocketOption(t.conn), | ||
1346 | LocalAddr: t.localAddr, | ||
1347 | RemoteAddr: t.remoteAddr, | ||
1348 | // RemoteName : | ||
1349 | } | ||
1350 | if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok { | ||
1351 | s.Security = au.GetSecurityValue() | ||
1352 | } | ||
1353 | s.RemoteFlowControlWindow = t.getOutFlowWindow() | ||
1354 | return &s | ||
1355 | } | ||
1356 | |||
1357 | func (t *http2Client) IncrMsgSent() { | ||
1358 | atomic.AddInt64(&t.czData.msgSent, 1) | ||
1359 | atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano()) | ||
1360 | } | ||
1361 | |||
1362 | func (t *http2Client) IncrMsgRecv() { | ||
1363 | atomic.AddInt64(&t.czData.msgRecv, 1) | ||
1364 | atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano()) | ||
1365 | } | ||
1366 | |||
1367 | func (t *http2Client) getOutFlowWindow() int64 { | ||
1368 | resp := make(chan uint32, 1) | ||
1369 | timer := time.NewTimer(time.Second) | ||
1370 | defer timer.Stop() | ||
1371 | t.controlBuf.put(&outFlowControlSizeRequest{resp}) | ||
1372 | select { | ||
1373 | case sz := <-resp: | ||
1374 | return int64(sz) | ||
1375 | case <-t.ctxDone: | ||
1376 | return -1 | ||
1377 | case <-timer.C: | ||
1378 | return -2 | ||
1379 | } | ||
1380 | } | ||