diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/transport/http2_client.go')
-rw-r--r-- | vendor/google.golang.org/grpc/transport/http2_client.go | 1369 |
1 files changed, 1369 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go new file mode 100644 index 0000000..516ea06 --- /dev/null +++ b/vendor/google.golang.org/grpc/transport/http2_client.go | |||
@@ -0,0 +1,1369 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2014 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | package transport | ||
20 | |||
21 | import ( | ||
22 | "bytes" | ||
23 | "io" | ||
24 | "math" | ||
25 | "net" | ||
26 | "strings" | ||
27 | "sync" | ||
28 | "sync/atomic" | ||
29 | "time" | ||
30 | |||
31 | "golang.org/x/net/context" | ||
32 | "golang.org/x/net/http2" | ||
33 | "golang.org/x/net/http2/hpack" | ||
34 | "google.golang.org/grpc/codes" | ||
35 | "google.golang.org/grpc/credentials" | ||
36 | "google.golang.org/grpc/keepalive" | ||
37 | "google.golang.org/grpc/metadata" | ||
38 | "google.golang.org/grpc/peer" | ||
39 | "google.golang.org/grpc/stats" | ||
40 | "google.golang.org/grpc/status" | ||
41 | ) | ||
42 | |||
43 | // http2Client implements the ClientTransport interface with HTTP2. | ||
44 | type http2Client struct { | ||
45 | ctx context.Context | ||
46 | target string // server name/addr | ||
47 | userAgent string | ||
48 | md interface{} | ||
49 | conn net.Conn // underlying communication channel | ||
50 | remoteAddr net.Addr | ||
51 | localAddr net.Addr | ||
52 | authInfo credentials.AuthInfo // auth info about the connection | ||
53 | nextID uint32 // the next stream ID to be used | ||
54 | |||
55 | // writableChan synchronizes write access to the transport. | ||
56 | // A writer acquires the write lock by sending a value on writableChan | ||
57 | // and releases it by receiving from writableChan. | ||
58 | writableChan chan int | ||
59 | // shutdownChan is closed when Close is called. | ||
60 | // Blocking operations should select on shutdownChan to avoid | ||
61 | // blocking forever after Close. | ||
62 | // TODO(zhaoq): Maybe have a channel context? | ||
63 | shutdownChan chan struct{} | ||
64 | // errorChan is closed to notify the I/O error to the caller. | ||
65 | errorChan chan struct{} | ||
66 | // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) | ||
67 | // that the server sent GoAway on this transport. | ||
68 | goAway chan struct{} | ||
69 | // awakenKeepalive is used to wake up keepalive when after it has gone dormant. | ||
70 | awakenKeepalive chan struct{} | ||
71 | |||
72 | framer *framer | ||
73 | hBuf *bytes.Buffer // the buffer for HPACK encoding | ||
74 | hEnc *hpack.Encoder // HPACK encoder | ||
75 | |||
76 | // controlBuf delivers all the control related tasks (e.g., window | ||
77 | // updates, reset streams, and various settings) to the controller. | ||
78 | controlBuf *controlBuffer | ||
79 | fc *inFlow | ||
80 | // sendQuotaPool provides flow control to outbound message. | ||
81 | sendQuotaPool *quotaPool | ||
82 | // streamsQuota limits the max number of concurrent streams. | ||
83 | streamsQuota *quotaPool | ||
84 | |||
85 | // The scheme used: https if TLS is on, http otherwise. | ||
86 | scheme string | ||
87 | |||
88 | isSecure bool | ||
89 | |||
90 | creds []credentials.PerRPCCredentials | ||
91 | |||
92 | // Boolean to keep track of reading activity on transport. | ||
93 | // 1 is true and 0 is false. | ||
94 | activity uint32 // Accessed atomically. | ||
95 | kp keepalive.ClientParameters | ||
96 | |||
97 | statsHandler stats.Handler | ||
98 | |||
99 | initialWindowSize int32 | ||
100 | |||
101 | bdpEst *bdpEstimator | ||
102 | outQuotaVersion uint32 | ||
103 | |||
104 | mu sync.Mutex // guard the following variables | ||
105 | state transportState // the state of underlying connection | ||
106 | activeStreams map[uint32]*Stream | ||
107 | // The max number of concurrent streams | ||
108 | maxStreams int | ||
109 | // the per-stream outbound flow control window size set by the peer. | ||
110 | streamSendQuota uint32 | ||
111 | // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame. | ||
112 | prevGoAwayID uint32 | ||
113 | // goAwayReason records the http2.ErrCode and debug data received with the | ||
114 | // GoAway frame. | ||
115 | goAwayReason GoAwayReason | ||
116 | } | ||
117 | |||
118 | func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { | ||
119 | if fn != nil { | ||
120 | return fn(ctx, addr) | ||
121 | } | ||
122 | return dialContext(ctx, "tcp", addr) | ||
123 | } | ||
124 | |||
125 | func isTemporary(err error) bool { | ||
126 | switch err { | ||
127 | case io.EOF: | ||
128 | // Connection closures may be resolved upon retry, and are thus | ||
129 | // treated as temporary. | ||
130 | return true | ||
131 | case context.DeadlineExceeded: | ||
132 | // In Go 1.7, context.DeadlineExceeded implements Timeout(), and this | ||
133 | // special case is not needed. Until then, we need to keep this | ||
134 | // clause. | ||
135 | return true | ||
136 | } | ||
137 | |||
138 | switch err := err.(type) { | ||
139 | case interface { | ||
140 | Temporary() bool | ||
141 | }: | ||
142 | return err.Temporary() | ||
143 | case interface { | ||
144 | Timeout() bool | ||
145 | }: | ||
146 | // Timeouts may be resolved upon retry, and are thus treated as | ||
147 | // temporary. | ||
148 | return err.Timeout() | ||
149 | } | ||
150 | return false | ||
151 | } | ||
152 | |||
153 | // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 | ||
154 | // and starts to receive messages on it. Non-nil error returns if construction | ||
155 | // fails. | ||
156 | func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (_ ClientTransport, err error) { | ||
157 | scheme := "http" | ||
158 | conn, err := dial(ctx, opts.Dialer, addr.Addr) | ||
159 | if err != nil { | ||
160 | if opts.FailOnNonTempDialError { | ||
161 | return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err) | ||
162 | } | ||
163 | return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err) | ||
164 | } | ||
165 | // Any further errors will close the underlying connection | ||
166 | defer func(conn net.Conn) { | ||
167 | if err != nil { | ||
168 | conn.Close() | ||
169 | } | ||
170 | }(conn) | ||
171 | var ( | ||
172 | isSecure bool | ||
173 | authInfo credentials.AuthInfo | ||
174 | ) | ||
175 | if creds := opts.TransportCredentials; creds != nil { | ||
176 | scheme = "https" | ||
177 | conn, authInfo, err = creds.ClientHandshake(ctx, addr.Addr, conn) | ||
178 | if err != nil { | ||
179 | // Credentials handshake errors are typically considered permanent | ||
180 | // to avoid retrying on e.g. bad certificates. | ||
181 | temp := isTemporary(err) | ||
182 | return nil, connectionErrorf(temp, err, "transport: authentication handshake failed: %v", err) | ||
183 | } | ||
184 | isSecure = true | ||
185 | } | ||
186 | kp := opts.KeepaliveParams | ||
187 | // Validate keepalive parameters. | ||
188 | if kp.Time == 0 { | ||
189 | kp.Time = defaultClientKeepaliveTime | ||
190 | } | ||
191 | if kp.Timeout == 0 { | ||
192 | kp.Timeout = defaultClientKeepaliveTimeout | ||
193 | } | ||
194 | dynamicWindow := true | ||
195 | icwz := int32(initialWindowSize) | ||
196 | if opts.InitialConnWindowSize >= defaultWindowSize { | ||
197 | icwz = opts.InitialConnWindowSize | ||
198 | dynamicWindow = false | ||
199 | } | ||
200 | var buf bytes.Buffer | ||
201 | t := &http2Client{ | ||
202 | ctx: ctx, | ||
203 | target: addr.Addr, | ||
204 | userAgent: opts.UserAgent, | ||
205 | md: addr.Metadata, | ||
206 | conn: conn, | ||
207 | remoteAddr: conn.RemoteAddr(), | ||
208 | localAddr: conn.LocalAddr(), | ||
209 | authInfo: authInfo, | ||
210 | // The client initiated stream id is odd starting from 1. | ||
211 | nextID: 1, | ||
212 | writableChan: make(chan int, 1), | ||
213 | shutdownChan: make(chan struct{}), | ||
214 | errorChan: make(chan struct{}), | ||
215 | goAway: make(chan struct{}), | ||
216 | awakenKeepalive: make(chan struct{}, 1), | ||
217 | framer: newFramer(conn), | ||
218 | hBuf: &buf, | ||
219 | hEnc: hpack.NewEncoder(&buf), | ||
220 | controlBuf: newControlBuffer(), | ||
221 | fc: &inFlow{limit: uint32(icwz)}, | ||
222 | sendQuotaPool: newQuotaPool(defaultWindowSize), | ||
223 | scheme: scheme, | ||
224 | state: reachable, | ||
225 | activeStreams: make(map[uint32]*Stream), | ||
226 | isSecure: isSecure, | ||
227 | creds: opts.PerRPCCredentials, | ||
228 | maxStreams: defaultMaxStreamsClient, | ||
229 | streamsQuota: newQuotaPool(defaultMaxStreamsClient), | ||
230 | streamSendQuota: defaultWindowSize, | ||
231 | kp: kp, | ||
232 | statsHandler: opts.StatsHandler, | ||
233 | initialWindowSize: initialWindowSize, | ||
234 | } | ||
235 | if opts.InitialWindowSize >= defaultWindowSize { | ||
236 | t.initialWindowSize = opts.InitialWindowSize | ||
237 | dynamicWindow = false | ||
238 | } | ||
239 | if dynamicWindow { | ||
240 | t.bdpEst = &bdpEstimator{ | ||
241 | bdp: initialWindowSize, | ||
242 | updateFlowControl: t.updateFlowControl, | ||
243 | } | ||
244 | } | ||
245 | // Make sure awakenKeepalive can't be written upon. | ||
246 | // keepalive routine will make it writable, if need be. | ||
247 | t.awakenKeepalive <- struct{}{} | ||
248 | if t.statsHandler != nil { | ||
249 | t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{ | ||
250 | RemoteAddr: t.remoteAddr, | ||
251 | LocalAddr: t.localAddr, | ||
252 | }) | ||
253 | connBegin := &stats.ConnBegin{ | ||
254 | Client: true, | ||
255 | } | ||
256 | t.statsHandler.HandleConn(t.ctx, connBegin) | ||
257 | } | ||
258 | // Start the reader goroutine for incoming message. Each transport has | ||
259 | // a dedicated goroutine which reads HTTP2 frame from network. Then it | ||
260 | // dispatches the frame to the corresponding stream entity. | ||
261 | go t.reader() | ||
262 | // Send connection preface to server. | ||
263 | n, err := t.conn.Write(clientPreface) | ||
264 | if err != nil { | ||
265 | t.Close() | ||
266 | return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err) | ||
267 | } | ||
268 | if n != len(clientPreface) { | ||
269 | t.Close() | ||
270 | return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) | ||
271 | } | ||
272 | if t.initialWindowSize != defaultWindowSize { | ||
273 | err = t.framer.writeSettings(true, http2.Setting{ | ||
274 | ID: http2.SettingInitialWindowSize, | ||
275 | Val: uint32(t.initialWindowSize), | ||
276 | }) | ||
277 | } else { | ||
278 | err = t.framer.writeSettings(true) | ||
279 | } | ||
280 | if err != nil { | ||
281 | t.Close() | ||
282 | return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err) | ||
283 | } | ||
284 | // Adjust the connection flow control window if needed. | ||
285 | if delta := uint32(icwz - defaultWindowSize); delta > 0 { | ||
286 | if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil { | ||
287 | t.Close() | ||
288 | return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err) | ||
289 | } | ||
290 | } | ||
291 | go t.controller() | ||
292 | if t.kp.Time != infinity { | ||
293 | go t.keepalive() | ||
294 | } | ||
295 | t.writableChan <- 0 | ||
296 | return t, nil | ||
297 | } | ||
298 | |||
299 | func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { | ||
300 | // TODO(zhaoq): Handle uint32 overflow of Stream.id. | ||
301 | s := &Stream{ | ||
302 | id: t.nextID, | ||
303 | done: make(chan struct{}), | ||
304 | goAway: make(chan struct{}), | ||
305 | method: callHdr.Method, | ||
306 | sendCompress: callHdr.SendCompress, | ||
307 | buf: newRecvBuffer(), | ||
308 | fc: &inFlow{limit: uint32(t.initialWindowSize)}, | ||
309 | sendQuotaPool: newQuotaPool(int(t.streamSendQuota)), | ||
310 | headerChan: make(chan struct{}), | ||
311 | } | ||
312 | t.nextID += 2 | ||
313 | s.requestRead = func(n int) { | ||
314 | t.adjustWindow(s, uint32(n)) | ||
315 | } | ||
316 | // The client side stream context should have exactly the same life cycle with the user provided context. | ||
317 | // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done. | ||
318 | // So we use the original context here instead of creating a copy. | ||
319 | s.ctx = ctx | ||
320 | s.trReader = &transportReader{ | ||
321 | reader: &recvBufferReader{ | ||
322 | ctx: s.ctx, | ||
323 | goAway: s.goAway, | ||
324 | recv: s.buf, | ||
325 | }, | ||
326 | windowHandler: func(n int) { | ||
327 | t.updateWindow(s, uint32(n)) | ||
328 | }, | ||
329 | } | ||
330 | |||
331 | return s | ||
332 | } | ||
333 | |||
334 | // NewStream creates a stream and registers it into the transport as "active" | ||
335 | // streams. | ||
336 | func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { | ||
337 | pr := &peer.Peer{ | ||
338 | Addr: t.remoteAddr, | ||
339 | } | ||
340 | // Attach Auth info if there is any. | ||
341 | if t.authInfo != nil { | ||
342 | pr.AuthInfo = t.authInfo | ||
343 | } | ||
344 | ctx = peer.NewContext(ctx, pr) | ||
345 | var ( | ||
346 | authData = make(map[string]string) | ||
347 | audience string | ||
348 | ) | ||
349 | // Create an audience string only if needed. | ||
350 | if len(t.creds) > 0 || callHdr.Creds != nil { | ||
351 | // Construct URI required to get auth request metadata. | ||
352 | var port string | ||
353 | if pos := strings.LastIndex(t.target, ":"); pos != -1 { | ||
354 | // Omit port if it is the default one. | ||
355 | if t.target[pos+1:] != "443" { | ||
356 | port = ":" + t.target[pos+1:] | ||
357 | } | ||
358 | } | ||
359 | pos := strings.LastIndex(callHdr.Method, "/") | ||
360 | if pos == -1 { | ||
361 | pos = len(callHdr.Method) | ||
362 | } | ||
363 | audience = "https://" + callHdr.Host + port + callHdr.Method[:pos] | ||
364 | } | ||
365 | for _, c := range t.creds { | ||
366 | data, err := c.GetRequestMetadata(ctx, audience) | ||
367 | if err != nil { | ||
368 | return nil, streamErrorf(codes.Internal, "transport: %v", err) | ||
369 | } | ||
370 | for k, v := range data { | ||
371 | // Capital header names are illegal in HTTP/2. | ||
372 | k = strings.ToLower(k) | ||
373 | authData[k] = v | ||
374 | } | ||
375 | } | ||
376 | callAuthData := make(map[string]string) | ||
377 | // Check if credentials.PerRPCCredentials were provided via call options. | ||
378 | // Note: if these credentials are provided both via dial options and call | ||
379 | // options, then both sets of credentials will be applied. | ||
380 | if callCreds := callHdr.Creds; callCreds != nil { | ||
381 | if !t.isSecure && callCreds.RequireTransportSecurity() { | ||
382 | return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure conneciton") | ||
383 | } | ||
384 | data, err := callCreds.GetRequestMetadata(ctx, audience) | ||
385 | if err != nil { | ||
386 | return nil, streamErrorf(codes.Internal, "transport: %v", err) | ||
387 | } | ||
388 | for k, v := range data { | ||
389 | // Capital header names are illegal in HTTP/2 | ||
390 | k = strings.ToLower(k) | ||
391 | callAuthData[k] = v | ||
392 | } | ||
393 | } | ||
394 | t.mu.Lock() | ||
395 | if t.activeStreams == nil { | ||
396 | t.mu.Unlock() | ||
397 | return nil, ErrConnClosing | ||
398 | } | ||
399 | if t.state == draining { | ||
400 | t.mu.Unlock() | ||
401 | return nil, ErrStreamDrain | ||
402 | } | ||
403 | if t.state != reachable { | ||
404 | t.mu.Unlock() | ||
405 | return nil, ErrConnClosing | ||
406 | } | ||
407 | t.mu.Unlock() | ||
408 | sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire()) | ||
409 | if err != nil { | ||
410 | return nil, err | ||
411 | } | ||
412 | // Returns the quota balance back. | ||
413 | if sq > 1 { | ||
414 | t.streamsQuota.add(sq - 1) | ||
415 | } | ||
416 | if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { | ||
417 | // Return the quota back now because there is no stream returned to the caller. | ||
418 | if _, ok := err.(StreamError); ok { | ||
419 | t.streamsQuota.add(1) | ||
420 | } | ||
421 | return nil, err | ||
422 | } | ||
423 | t.mu.Lock() | ||
424 | if t.state == draining { | ||
425 | t.mu.Unlock() | ||
426 | t.streamsQuota.add(1) | ||
427 | // Need to make t writable again so that the rpc in flight can still proceed. | ||
428 | t.writableChan <- 0 | ||
429 | return nil, ErrStreamDrain | ||
430 | } | ||
431 | if t.state != reachable { | ||
432 | t.mu.Unlock() | ||
433 | return nil, ErrConnClosing | ||
434 | } | ||
435 | s := t.newStream(ctx, callHdr) | ||
436 | t.activeStreams[s.id] = s | ||
437 | // If the number of active streams change from 0 to 1, then check if keepalive | ||
438 | // has gone dormant. If so, wake it up. | ||
439 | if len(t.activeStreams) == 1 { | ||
440 | select { | ||
441 | case t.awakenKeepalive <- struct{}{}: | ||
442 | t.framer.writePing(false, false, [8]byte{}) | ||
443 | default: | ||
444 | } | ||
445 | } | ||
446 | |||
447 | t.mu.Unlock() | ||
448 | |||
449 | // HPACK encodes various headers. Note that once WriteField(...) is | ||
450 | // called, the corresponding headers/continuation frame has to be sent | ||
451 | // because hpack.Encoder is stateful. | ||
452 | t.hBuf.Reset() | ||
453 | t.hEnc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"}) | ||
454 | t.hEnc.WriteField(hpack.HeaderField{Name: ":scheme", Value: t.scheme}) | ||
455 | t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method}) | ||
456 | t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host}) | ||
457 | t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) | ||
458 | t.hEnc.WriteField(hpack.HeaderField{Name: "user-agent", Value: t.userAgent}) | ||
459 | t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"}) | ||
460 | |||
461 | if callHdr.SendCompress != "" { | ||
462 | t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress}) | ||
463 | } | ||
464 | if dl, ok := ctx.Deadline(); ok { | ||
465 | // Send out timeout regardless its value. The server can detect timeout context by itself. | ||
466 | timeout := dl.Sub(time.Now()) | ||
467 | t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)}) | ||
468 | } | ||
469 | |||
470 | for k, v := range authData { | ||
471 | t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | ||
472 | } | ||
473 | for k, v := range callAuthData { | ||
474 | t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | ||
475 | } | ||
476 | var ( | ||
477 | endHeaders bool | ||
478 | ) | ||
479 | if md, ok := metadata.FromOutgoingContext(ctx); ok { | ||
480 | for k, vv := range md { | ||
481 | // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. | ||
482 | if isReservedHeader(k) { | ||
483 | continue | ||
484 | } | ||
485 | for _, v := range vv { | ||
486 | t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | ||
487 | } | ||
488 | } | ||
489 | } | ||
490 | if md, ok := t.md.(*metadata.MD); ok { | ||
491 | for k, vv := range *md { | ||
492 | if isReservedHeader(k) { | ||
493 | continue | ||
494 | } | ||
495 | for _, v := range vv { | ||
496 | t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | ||
497 | } | ||
498 | } | ||
499 | } | ||
500 | first := true | ||
501 | bufLen := t.hBuf.Len() | ||
502 | // Sends the headers in a single batch even when they span multiple frames. | ||
503 | for !endHeaders { | ||
504 | size := t.hBuf.Len() | ||
505 | if size > http2MaxFrameLen { | ||
506 | size = http2MaxFrameLen | ||
507 | } else { | ||
508 | endHeaders = true | ||
509 | } | ||
510 | var flush bool | ||
511 | if callHdr.Flush && endHeaders { | ||
512 | flush = true | ||
513 | } | ||
514 | if first { | ||
515 | // Sends a HeadersFrame to server to start a new stream. | ||
516 | p := http2.HeadersFrameParam{ | ||
517 | StreamID: s.id, | ||
518 | BlockFragment: t.hBuf.Next(size), | ||
519 | EndStream: false, | ||
520 | EndHeaders: endHeaders, | ||
521 | } | ||
522 | // Do a force flush for the buffered frames iff it is the last headers frame | ||
523 | // and there is header metadata to be sent. Otherwise, there is flushing until | ||
524 | // the corresponding data frame is written. | ||
525 | err = t.framer.writeHeaders(flush, p) | ||
526 | first = false | ||
527 | } else { | ||
528 | // Sends Continuation frames for the leftover headers. | ||
529 | err = t.framer.writeContinuation(flush, s.id, endHeaders, t.hBuf.Next(size)) | ||
530 | } | ||
531 | if err != nil { | ||
532 | t.notifyError(err) | ||
533 | return nil, connectionErrorf(true, err, "transport: %v", err) | ||
534 | } | ||
535 | } | ||
536 | s.mu.Lock() | ||
537 | s.bytesSent = true | ||
538 | s.mu.Unlock() | ||
539 | |||
540 | if t.statsHandler != nil { | ||
541 | outHeader := &stats.OutHeader{ | ||
542 | Client: true, | ||
543 | WireLength: bufLen, | ||
544 | FullMethod: callHdr.Method, | ||
545 | RemoteAddr: t.remoteAddr, | ||
546 | LocalAddr: t.localAddr, | ||
547 | Compression: callHdr.SendCompress, | ||
548 | } | ||
549 | t.statsHandler.HandleRPC(s.ctx, outHeader) | ||
550 | } | ||
551 | t.writableChan <- 0 | ||
552 | return s, nil | ||
553 | } | ||
554 | |||
555 | // CloseStream clears the footprint of a stream when the stream is not needed any more. | ||
556 | // This must not be executed in reader's goroutine. | ||
557 | func (t *http2Client) CloseStream(s *Stream, err error) { | ||
558 | t.mu.Lock() | ||
559 | if t.activeStreams == nil { | ||
560 | t.mu.Unlock() | ||
561 | return | ||
562 | } | ||
563 | if err != nil { | ||
564 | // notify in-flight streams, before the deletion | ||
565 | s.write(recvMsg{err: err}) | ||
566 | } | ||
567 | delete(t.activeStreams, s.id) | ||
568 | if t.state == draining && len(t.activeStreams) == 0 { | ||
569 | // The transport is draining and s is the last live stream on t. | ||
570 | t.mu.Unlock() | ||
571 | t.Close() | ||
572 | return | ||
573 | } | ||
574 | t.mu.Unlock() | ||
575 | // rstStream is true in case the stream is being closed at the client-side | ||
576 | // and the server needs to be intimated about it by sending a RST_STREAM | ||
577 | // frame. | ||
578 | // To make sure this frame is written to the wire before the headers of the | ||
579 | // next stream waiting for streamsQuota, we add to streamsQuota pool only | ||
580 | // after having acquired the writableChan to send RST_STREAM out (look at | ||
581 | // the controller() routine). | ||
582 | var rstStream bool | ||
583 | var rstError http2.ErrCode | ||
584 | defer func() { | ||
585 | // In case, the client doesn't have to send RST_STREAM to server | ||
586 | // we can safely add back to streamsQuota pool now. | ||
587 | if !rstStream { | ||
588 | t.streamsQuota.add(1) | ||
589 | return | ||
590 | } | ||
591 | t.controlBuf.put(&resetStream{s.id, rstError}) | ||
592 | }() | ||
593 | s.mu.Lock() | ||
594 | rstStream = s.rstStream | ||
595 | rstError = s.rstError | ||
596 | if s.state == streamDone { | ||
597 | s.mu.Unlock() | ||
598 | return | ||
599 | } | ||
600 | if !s.headerDone { | ||
601 | close(s.headerChan) | ||
602 | s.headerDone = true | ||
603 | } | ||
604 | s.state = streamDone | ||
605 | s.mu.Unlock() | ||
606 | if _, ok := err.(StreamError); ok { | ||
607 | rstStream = true | ||
608 | rstError = http2.ErrCodeCancel | ||
609 | } | ||
610 | } | ||
611 | |||
612 | // Close kicks off the shutdown process of the transport. This should be called | ||
613 | // only once on a transport. Once it is called, the transport should not be | ||
614 | // accessed any more. | ||
615 | func (t *http2Client) Close() (err error) { | ||
616 | t.mu.Lock() | ||
617 | if t.state == closing { | ||
618 | t.mu.Unlock() | ||
619 | return | ||
620 | } | ||
621 | if t.state == reachable || t.state == draining { | ||
622 | close(t.errorChan) | ||
623 | } | ||
624 | t.state = closing | ||
625 | t.mu.Unlock() | ||
626 | close(t.shutdownChan) | ||
627 | err = t.conn.Close() | ||
628 | t.mu.Lock() | ||
629 | streams := t.activeStreams | ||
630 | t.activeStreams = nil | ||
631 | t.mu.Unlock() | ||
632 | // Notify all active streams. | ||
633 | for _, s := range streams { | ||
634 | s.mu.Lock() | ||
635 | if !s.headerDone { | ||
636 | close(s.headerChan) | ||
637 | s.headerDone = true | ||
638 | } | ||
639 | s.mu.Unlock() | ||
640 | s.write(recvMsg{err: ErrConnClosing}) | ||
641 | } | ||
642 | if t.statsHandler != nil { | ||
643 | connEnd := &stats.ConnEnd{ | ||
644 | Client: true, | ||
645 | } | ||
646 | t.statsHandler.HandleConn(t.ctx, connEnd) | ||
647 | } | ||
648 | return | ||
649 | } | ||
650 | |||
651 | func (t *http2Client) GracefulClose() error { | ||
652 | t.mu.Lock() | ||
653 | switch t.state { | ||
654 | case unreachable: | ||
655 | // The server may close the connection concurrently. t is not available for | ||
656 | // any streams. Close it now. | ||
657 | t.mu.Unlock() | ||
658 | t.Close() | ||
659 | return nil | ||
660 | case closing: | ||
661 | t.mu.Unlock() | ||
662 | return nil | ||
663 | } | ||
664 | if t.state == draining { | ||
665 | t.mu.Unlock() | ||
666 | return nil | ||
667 | } | ||
668 | t.state = draining | ||
669 | active := len(t.activeStreams) | ||
670 | t.mu.Unlock() | ||
671 | if active == 0 { | ||
672 | return t.Close() | ||
673 | } | ||
674 | return nil | ||
675 | } | ||
676 | |||
677 | // Write formats the data into HTTP2 data frame(s) and sends it out. The caller | ||
678 | // should proceed only if Write returns nil. | ||
679 | // TODO(zhaoq): opts.Delay is ignored in this implementation. Support it later | ||
680 | // if it improves the performance. | ||
681 | func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { | ||
682 | r := bytes.NewBuffer(data) | ||
683 | var ( | ||
684 | p []byte | ||
685 | oqv uint32 | ||
686 | ) | ||
687 | for { | ||
688 | oqv = atomic.LoadUint32(&t.outQuotaVersion) | ||
689 | if r.Len() > 0 || p != nil { | ||
690 | size := http2MaxFrameLen | ||
691 | // Wait until the stream has some quota to send the data. | ||
692 | sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire()) | ||
693 | if err != nil { | ||
694 | return err | ||
695 | } | ||
696 | // Wait until the transport has some quota to send the data. | ||
697 | tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire()) | ||
698 | if err != nil { | ||
699 | return err | ||
700 | } | ||
701 | if sq < size { | ||
702 | size = sq | ||
703 | } | ||
704 | if tq < size { | ||
705 | size = tq | ||
706 | } | ||
707 | if p == nil { | ||
708 | p = r.Next(size) | ||
709 | } | ||
710 | ps := len(p) | ||
711 | if ps < sq { | ||
712 | // Overbooked stream quota. Return it back. | ||
713 | s.sendQuotaPool.add(sq - ps) | ||
714 | } | ||
715 | if ps < tq { | ||
716 | // Overbooked transport quota. Return it back. | ||
717 | t.sendQuotaPool.add(tq - ps) | ||
718 | } | ||
719 | } | ||
720 | var ( | ||
721 | endStream bool | ||
722 | forceFlush bool | ||
723 | ) | ||
724 | if opts.Last && r.Len() == 0 { | ||
725 | endStream = true | ||
726 | } | ||
727 | // Indicate there is a writer who is about to write a data frame. | ||
728 | t.framer.adjustNumWriters(1) | ||
729 | // Got some quota. Try to acquire writing privilege on the transport. | ||
730 | if _, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.writableChan); err != nil { | ||
731 | if _, ok := err.(StreamError); ok || err == io.EOF { | ||
732 | // Return the connection quota back. | ||
733 | t.sendQuotaPool.add(len(p)) | ||
734 | } | ||
735 | if t.framer.adjustNumWriters(-1) == 0 { | ||
736 | // This writer is the last one in this batch and has the | ||
737 | // responsibility to flush the buffered frames. It queues | ||
738 | // a flush request to controlBuf instead of flushing directly | ||
739 | // in order to avoid the race with other writing or flushing. | ||
740 | t.controlBuf.put(&flushIO{}) | ||
741 | } | ||
742 | return err | ||
743 | } | ||
744 | select { | ||
745 | case <-s.ctx.Done(): | ||
746 | t.sendQuotaPool.add(len(p)) | ||
747 | if t.framer.adjustNumWriters(-1) == 0 { | ||
748 | t.controlBuf.put(&flushIO{}) | ||
749 | } | ||
750 | t.writableChan <- 0 | ||
751 | return ContextErr(s.ctx.Err()) | ||
752 | default: | ||
753 | } | ||
754 | if oqv != atomic.LoadUint32(&t.outQuotaVersion) { | ||
755 | // InitialWindowSize settings frame must have been received after we | ||
756 | // acquired send quota but before we got the writable channel. | ||
757 | // We must forsake this write. | ||
758 | t.sendQuotaPool.add(len(p)) | ||
759 | s.sendQuotaPool.add(len(p)) | ||
760 | if t.framer.adjustNumWriters(-1) == 0 { | ||
761 | t.controlBuf.put(&flushIO{}) | ||
762 | } | ||
763 | t.writableChan <- 0 | ||
764 | continue | ||
765 | } | ||
766 | if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 { | ||
767 | // Do a force flush iff this is last frame for the entire gRPC message | ||
768 | // and the caller is the only writer at this moment. | ||
769 | forceFlush = true | ||
770 | } | ||
771 | // If WriteData fails, all the pending streams will be handled | ||
772 | // by http2Client.Close(). No explicit CloseStream() needs to be | ||
773 | // invoked. | ||
774 | if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil { | ||
775 | t.notifyError(err) | ||
776 | return connectionErrorf(true, err, "transport: %v", err) | ||
777 | } | ||
778 | p = nil | ||
779 | if t.framer.adjustNumWriters(-1) == 0 { | ||
780 | t.framer.flushWrite() | ||
781 | } | ||
782 | t.writableChan <- 0 | ||
783 | if r.Len() == 0 { | ||
784 | break | ||
785 | } | ||
786 | } | ||
787 | if !opts.Last { | ||
788 | return nil | ||
789 | } | ||
790 | s.mu.Lock() | ||
791 | if s.state != streamDone { | ||
792 | s.state = streamWriteDone | ||
793 | } | ||
794 | s.mu.Unlock() | ||
795 | return nil | ||
796 | } | ||
797 | |||
798 | func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) { | ||
799 | t.mu.Lock() | ||
800 | defer t.mu.Unlock() | ||
801 | s, ok := t.activeStreams[f.Header().StreamID] | ||
802 | return s, ok | ||
803 | } | ||
804 | |||
805 | // adjustWindow sends out extra window update over the initial window size | ||
806 | // of stream if the application is requesting data larger in size than | ||
807 | // the window. | ||
808 | func (t *http2Client) adjustWindow(s *Stream, n uint32) { | ||
809 | s.mu.Lock() | ||
810 | defer s.mu.Unlock() | ||
811 | if s.state == streamDone { | ||
812 | return | ||
813 | } | ||
814 | if w := s.fc.maybeAdjust(n); w > 0 { | ||
815 | // Piggyback conneciton's window update along. | ||
816 | if cw := t.fc.resetPendingUpdate(); cw > 0 { | ||
817 | t.controlBuf.put(&windowUpdate{0, cw, false}) | ||
818 | } | ||
819 | t.controlBuf.put(&windowUpdate{s.id, w, true}) | ||
820 | } | ||
821 | } | ||
822 | |||
823 | // updateWindow adjusts the inbound quota for the stream and the transport. | ||
824 | // Window updates will deliver to the controller for sending when | ||
825 | // the cumulative quota exceeds the corresponding threshold. | ||
826 | func (t *http2Client) updateWindow(s *Stream, n uint32) { | ||
827 | s.mu.Lock() | ||
828 | defer s.mu.Unlock() | ||
829 | if s.state == streamDone { | ||
830 | return | ||
831 | } | ||
832 | if w := s.fc.onRead(n); w > 0 { | ||
833 | if cw := t.fc.resetPendingUpdate(); cw > 0 { | ||
834 | t.controlBuf.put(&windowUpdate{0, cw, false}) | ||
835 | } | ||
836 | t.controlBuf.put(&windowUpdate{s.id, w, true}) | ||
837 | } | ||
838 | } | ||
839 | |||
840 | // updateFlowControl updates the incoming flow control windows | ||
841 | // for the transport and the stream based on the current bdp | ||
842 | // estimation. | ||
843 | func (t *http2Client) updateFlowControl(n uint32) { | ||
844 | t.mu.Lock() | ||
845 | for _, s := range t.activeStreams { | ||
846 | s.fc.newLimit(n) | ||
847 | } | ||
848 | t.initialWindowSize = int32(n) | ||
849 | t.mu.Unlock() | ||
850 | t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false}) | ||
851 | t.controlBuf.put(&settings{ | ||
852 | ack: false, | ||
853 | ss: []http2.Setting{ | ||
854 | { | ||
855 | ID: http2.SettingInitialWindowSize, | ||
856 | Val: uint32(n), | ||
857 | }, | ||
858 | }, | ||
859 | }) | ||
860 | } | ||
861 | |||
862 | func (t *http2Client) handleData(f *http2.DataFrame) { | ||
863 | size := f.Header().Length | ||
864 | var sendBDPPing bool | ||
865 | if t.bdpEst != nil { | ||
866 | sendBDPPing = t.bdpEst.add(uint32(size)) | ||
867 | } | ||
868 | // Decouple connection's flow control from application's read. | ||
869 | // An update on connection's flow control should not depend on | ||
870 | // whether user application has read the data or not. Such a | ||
871 | // restriction is already imposed on the stream's flow control, | ||
872 | // and therefore the sender will be blocked anyways. | ||
873 | // Decoupling the connection flow control will prevent other | ||
874 | // active(fast) streams from starving in presence of slow or | ||
875 | // inactive streams. | ||
876 | // | ||
877 | // Furthermore, if a bdpPing is being sent out we can piggyback | ||
878 | // connection's window update for the bytes we just received. | ||
879 | if sendBDPPing { | ||
880 | t.controlBuf.put(&windowUpdate{0, uint32(size), false}) | ||
881 | t.controlBuf.put(bdpPing) | ||
882 | } else { | ||
883 | if err := t.fc.onData(uint32(size)); err != nil { | ||
884 | t.notifyError(connectionErrorf(true, err, "%v", err)) | ||
885 | return | ||
886 | } | ||
887 | if w := t.fc.onRead(uint32(size)); w > 0 { | ||
888 | t.controlBuf.put(&windowUpdate{0, w, true}) | ||
889 | } | ||
890 | } | ||
891 | // Select the right stream to dispatch. | ||
892 | s, ok := t.getStream(f) | ||
893 | if !ok { | ||
894 | return | ||
895 | } | ||
896 | if size > 0 { | ||
897 | s.mu.Lock() | ||
898 | if s.state == streamDone { | ||
899 | s.mu.Unlock() | ||
900 | return | ||
901 | } | ||
902 | if err := s.fc.onData(uint32(size)); err != nil { | ||
903 | s.rstStream = true | ||
904 | s.rstError = http2.ErrCodeFlowControl | ||
905 | s.finish(status.New(codes.Internal, err.Error())) | ||
906 | s.mu.Unlock() | ||
907 | s.write(recvMsg{err: io.EOF}) | ||
908 | return | ||
909 | } | ||
910 | if f.Header().Flags.Has(http2.FlagDataPadded) { | ||
911 | if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { | ||
912 | t.controlBuf.put(&windowUpdate{s.id, w, true}) | ||
913 | } | ||
914 | } | ||
915 | s.mu.Unlock() | ||
916 | // TODO(bradfitz, zhaoq): A copy is required here because there is no | ||
917 | // guarantee f.Data() is consumed before the arrival of next frame. | ||
918 | // Can this copy be eliminated? | ||
919 | if len(f.Data()) > 0 { | ||
920 | data := make([]byte, len(f.Data())) | ||
921 | copy(data, f.Data()) | ||
922 | s.write(recvMsg{data: data}) | ||
923 | } | ||
924 | } | ||
925 | // The server has closed the stream without sending trailers. Record that | ||
926 | // the read direction is closed, and set the status appropriately. | ||
927 | if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) { | ||
928 | s.mu.Lock() | ||
929 | if s.state == streamDone { | ||
930 | s.mu.Unlock() | ||
931 | return | ||
932 | } | ||
933 | s.finish(status.New(codes.Internal, "server closed the stream without sending trailers")) | ||
934 | s.mu.Unlock() | ||
935 | s.write(recvMsg{err: io.EOF}) | ||
936 | } | ||
937 | } | ||
938 | |||
939 | func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { | ||
940 | s, ok := t.getStream(f) | ||
941 | if !ok { | ||
942 | return | ||
943 | } | ||
944 | s.mu.Lock() | ||
945 | if s.state == streamDone { | ||
946 | s.mu.Unlock() | ||
947 | return | ||
948 | } | ||
949 | if !s.headerDone { | ||
950 | close(s.headerChan) | ||
951 | s.headerDone = true | ||
952 | } | ||
953 | statusCode, ok := http2ErrConvTab[http2.ErrCode(f.ErrCode)] | ||
954 | if !ok { | ||
955 | warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode) | ||
956 | statusCode = codes.Unknown | ||
957 | } | ||
958 | s.finish(status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %d", f.ErrCode)) | ||
959 | s.mu.Unlock() | ||
960 | s.write(recvMsg{err: io.EOF}) | ||
961 | } | ||
962 | |||
963 | func (t *http2Client) handleSettings(f *http2.SettingsFrame) { | ||
964 | if f.IsAck() { | ||
965 | return | ||
966 | } | ||
967 | var ss []http2.Setting | ||
968 | f.ForeachSetting(func(s http2.Setting) error { | ||
969 | ss = append(ss, s) | ||
970 | return nil | ||
971 | }) | ||
972 | // The settings will be applied once the ack is sent. | ||
973 | t.controlBuf.put(&settings{ack: true, ss: ss}) | ||
974 | } | ||
975 | |||
976 | func (t *http2Client) handlePing(f *http2.PingFrame) { | ||
977 | if f.IsAck() { | ||
978 | // Maybe it's a BDP ping. | ||
979 | if t.bdpEst != nil { | ||
980 | t.bdpEst.calculate(f.Data) | ||
981 | } | ||
982 | return | ||
983 | } | ||
984 | pingAck := &ping{ack: true} | ||
985 | copy(pingAck.data[:], f.Data[:]) | ||
986 | t.controlBuf.put(pingAck) | ||
987 | } | ||
988 | |||
989 | func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { | ||
990 | t.mu.Lock() | ||
991 | if t.state != reachable && t.state != draining { | ||
992 | t.mu.Unlock() | ||
993 | return | ||
994 | } | ||
995 | if f.ErrCode == http2.ErrCodeEnhanceYourCalm { | ||
996 | infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.") | ||
997 | } | ||
998 | id := f.LastStreamID | ||
999 | if id > 0 && id%2 != 1 { | ||
1000 | t.mu.Unlock() | ||
1001 | t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID)) | ||
1002 | return | ||
1003 | } | ||
1004 | // A client can recieve multiple GoAways from server (look at https://github.com/grpc/grpc-go/issues/1387). | ||
1005 | // The idea is that the first GoAway will be sent with an ID of MaxInt32 and the second GoAway will be sent after an RTT delay | ||
1006 | // with the ID of the last stream the server will process. | ||
1007 | // Therefore, when we get the first GoAway we don't really close any streams. While in case of second GoAway we | ||
1008 | // close all streams created after the second GoAwayId. This way streams that were in-flight while the GoAway from server | ||
1009 | // was being sent don't get killed. | ||
1010 | select { | ||
1011 | case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways). | ||
1012 | // If there are multiple GoAways the first one should always have an ID greater than the following ones. | ||
1013 | if id > t.prevGoAwayID { | ||
1014 | t.mu.Unlock() | ||
1015 | t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID)) | ||
1016 | return | ||
1017 | } | ||
1018 | default: | ||
1019 | t.setGoAwayReason(f) | ||
1020 | close(t.goAway) | ||
1021 | t.state = draining | ||
1022 | } | ||
1023 | // All streams with IDs greater than the GoAwayId | ||
1024 | // and smaller than the previous GoAway ID should be killed. | ||
1025 | upperLimit := t.prevGoAwayID | ||
1026 | if upperLimit == 0 { // This is the first GoAway Frame. | ||
1027 | upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID. | ||
1028 | } | ||
1029 | for streamID, stream := range t.activeStreams { | ||
1030 | if streamID > id && streamID <= upperLimit { | ||
1031 | close(stream.goAway) | ||
1032 | } | ||
1033 | } | ||
1034 | t.prevGoAwayID = id | ||
1035 | active := len(t.activeStreams) | ||
1036 | t.mu.Unlock() | ||
1037 | if active == 0 { | ||
1038 | t.Close() | ||
1039 | } | ||
1040 | } | ||
1041 | |||
1042 | // setGoAwayReason sets the value of t.goAwayReason based | ||
1043 | // on the GoAway frame received. | ||
1044 | // It expects a lock on transport's mutext to be held by | ||
1045 | // the caller. | ||
1046 | func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) { | ||
1047 | t.goAwayReason = NoReason | ||
1048 | switch f.ErrCode { | ||
1049 | case http2.ErrCodeEnhanceYourCalm: | ||
1050 | if string(f.DebugData()) == "too_many_pings" { | ||
1051 | t.goAwayReason = TooManyPings | ||
1052 | } | ||
1053 | } | ||
1054 | } | ||
1055 | |||
1056 | func (t *http2Client) GetGoAwayReason() GoAwayReason { | ||
1057 | t.mu.Lock() | ||
1058 | defer t.mu.Unlock() | ||
1059 | return t.goAwayReason | ||
1060 | } | ||
1061 | |||
1062 | func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) { | ||
1063 | id := f.Header().StreamID | ||
1064 | incr := f.Increment | ||
1065 | if id == 0 { | ||
1066 | t.sendQuotaPool.add(int(incr)) | ||
1067 | return | ||
1068 | } | ||
1069 | if s, ok := t.getStream(f); ok { | ||
1070 | s.sendQuotaPool.add(int(incr)) | ||
1071 | } | ||
1072 | } | ||
1073 | |||
1074 | // operateHeaders takes action on the decoded headers. | ||
1075 | func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { | ||
1076 | s, ok := t.getStream(frame) | ||
1077 | if !ok { | ||
1078 | return | ||
1079 | } | ||
1080 | s.mu.Lock() | ||
1081 | s.bytesReceived = true | ||
1082 | s.mu.Unlock() | ||
1083 | var state decodeState | ||
1084 | if err := state.decodeResponseHeader(frame); err != nil { | ||
1085 | s.mu.Lock() | ||
1086 | if !s.headerDone { | ||
1087 | close(s.headerChan) | ||
1088 | s.headerDone = true | ||
1089 | } | ||
1090 | s.mu.Unlock() | ||
1091 | s.write(recvMsg{err: err}) | ||
1092 | // Something wrong. Stops reading even when there is remaining. | ||
1093 | return | ||
1094 | } | ||
1095 | |||
1096 | endStream := frame.StreamEnded() | ||
1097 | var isHeader bool | ||
1098 | defer func() { | ||
1099 | if t.statsHandler != nil { | ||
1100 | if isHeader { | ||
1101 | inHeader := &stats.InHeader{ | ||
1102 | Client: true, | ||
1103 | WireLength: int(frame.Header().Length), | ||
1104 | } | ||
1105 | t.statsHandler.HandleRPC(s.ctx, inHeader) | ||
1106 | } else { | ||
1107 | inTrailer := &stats.InTrailer{ | ||
1108 | Client: true, | ||
1109 | WireLength: int(frame.Header().Length), | ||
1110 | } | ||
1111 | t.statsHandler.HandleRPC(s.ctx, inTrailer) | ||
1112 | } | ||
1113 | } | ||
1114 | }() | ||
1115 | |||
1116 | s.mu.Lock() | ||
1117 | if !endStream { | ||
1118 | s.recvCompress = state.encoding | ||
1119 | } | ||
1120 | if !s.headerDone { | ||
1121 | if !endStream && len(state.mdata) > 0 { | ||
1122 | s.header = state.mdata | ||
1123 | } | ||
1124 | close(s.headerChan) | ||
1125 | s.headerDone = true | ||
1126 | isHeader = true | ||
1127 | } | ||
1128 | if !endStream || s.state == streamDone { | ||
1129 | s.mu.Unlock() | ||
1130 | return | ||
1131 | } | ||
1132 | |||
1133 | if len(state.mdata) > 0 { | ||
1134 | s.trailer = state.mdata | ||
1135 | } | ||
1136 | s.finish(state.status()) | ||
1137 | s.mu.Unlock() | ||
1138 | s.write(recvMsg{err: io.EOF}) | ||
1139 | } | ||
1140 | |||
1141 | func handleMalformedHTTP2(s *Stream, err error) { | ||
1142 | s.mu.Lock() | ||
1143 | if !s.headerDone { | ||
1144 | close(s.headerChan) | ||
1145 | s.headerDone = true | ||
1146 | } | ||
1147 | s.mu.Unlock() | ||
1148 | s.write(recvMsg{err: err}) | ||
1149 | } | ||
1150 | |||
1151 | // reader runs as a separate goroutine in charge of reading data from network | ||
1152 | // connection. | ||
1153 | // | ||
1154 | // TODO(zhaoq): currently one reader per transport. Investigate whether this is | ||
1155 | // optimal. | ||
1156 | // TODO(zhaoq): Check the validity of the incoming frame sequence. | ||
1157 | func (t *http2Client) reader() { | ||
1158 | // Check the validity of server preface. | ||
1159 | frame, err := t.framer.readFrame() | ||
1160 | if err != nil { | ||
1161 | t.notifyError(err) | ||
1162 | return | ||
1163 | } | ||
1164 | atomic.CompareAndSwapUint32(&t.activity, 0, 1) | ||
1165 | sf, ok := frame.(*http2.SettingsFrame) | ||
1166 | if !ok { | ||
1167 | t.notifyError(err) | ||
1168 | return | ||
1169 | } | ||
1170 | t.handleSettings(sf) | ||
1171 | |||
1172 | // loop to keep reading incoming messages on this transport. | ||
1173 | for { | ||
1174 | frame, err := t.framer.readFrame() | ||
1175 | atomic.CompareAndSwapUint32(&t.activity, 0, 1) | ||
1176 | if err != nil { | ||
1177 | // Abort an active stream if the http2.Framer returns a | ||
1178 | // http2.StreamError. This can happen only if the server's response | ||
1179 | // is malformed http2. | ||
1180 | if se, ok := err.(http2.StreamError); ok { | ||
1181 | t.mu.Lock() | ||
1182 | s := t.activeStreams[se.StreamID] | ||
1183 | t.mu.Unlock() | ||
1184 | if s != nil { | ||
1185 | // use error detail to provide better err message | ||
1186 | handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail())) | ||
1187 | } | ||
1188 | continue | ||
1189 | } else { | ||
1190 | // Transport error. | ||
1191 | t.notifyError(err) | ||
1192 | return | ||
1193 | } | ||
1194 | } | ||
1195 | switch frame := frame.(type) { | ||
1196 | case *http2.MetaHeadersFrame: | ||
1197 | t.operateHeaders(frame) | ||
1198 | case *http2.DataFrame: | ||
1199 | t.handleData(frame) | ||
1200 | case *http2.RSTStreamFrame: | ||
1201 | t.handleRSTStream(frame) | ||
1202 | case *http2.SettingsFrame: | ||
1203 | t.handleSettings(frame) | ||
1204 | case *http2.PingFrame: | ||
1205 | t.handlePing(frame) | ||
1206 | case *http2.GoAwayFrame: | ||
1207 | t.handleGoAway(frame) | ||
1208 | case *http2.WindowUpdateFrame: | ||
1209 | t.handleWindowUpdate(frame) | ||
1210 | default: | ||
1211 | errorf("transport: http2Client.reader got unhandled frame type %v.", frame) | ||
1212 | } | ||
1213 | } | ||
1214 | } | ||
1215 | |||
1216 | func (t *http2Client) applySettings(ss []http2.Setting) { | ||
1217 | for _, s := range ss { | ||
1218 | switch s.ID { | ||
1219 | case http2.SettingMaxConcurrentStreams: | ||
1220 | // TODO(zhaoq): This is a hack to avoid significant refactoring of the | ||
1221 | // code to deal with the unrealistic int32 overflow. Probably will try | ||
1222 | // to find a better way to handle this later. | ||
1223 | if s.Val > math.MaxInt32 { | ||
1224 | s.Val = math.MaxInt32 | ||
1225 | } | ||
1226 | t.mu.Lock() | ||
1227 | ms := t.maxStreams | ||
1228 | t.maxStreams = int(s.Val) | ||
1229 | t.mu.Unlock() | ||
1230 | t.streamsQuota.add(int(s.Val) - ms) | ||
1231 | case http2.SettingInitialWindowSize: | ||
1232 | t.mu.Lock() | ||
1233 | for _, stream := range t.activeStreams { | ||
1234 | // Adjust the sending quota for each stream. | ||
1235 | stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota)) | ||
1236 | } | ||
1237 | t.streamSendQuota = s.Val | ||
1238 | t.mu.Unlock() | ||
1239 | atomic.AddUint32(&t.outQuotaVersion, 1) | ||
1240 | } | ||
1241 | } | ||
1242 | } | ||
1243 | |||
1244 | // controller running in a separate goroutine takes charge of sending control | ||
1245 | // frames (e.g., window update, reset stream, setting, etc.) to the server. | ||
1246 | func (t *http2Client) controller() { | ||
1247 | for { | ||
1248 | select { | ||
1249 | case i := <-t.controlBuf.get(): | ||
1250 | t.controlBuf.load() | ||
1251 | select { | ||
1252 | case <-t.writableChan: | ||
1253 | switch i := i.(type) { | ||
1254 | case *windowUpdate: | ||
1255 | t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment) | ||
1256 | case *settings: | ||
1257 | if i.ack { | ||
1258 | t.framer.writeSettingsAck(true) | ||
1259 | t.applySettings(i.ss) | ||
1260 | } else { | ||
1261 | t.framer.writeSettings(true, i.ss...) | ||
1262 | } | ||
1263 | case *resetStream: | ||
1264 | // If the server needs to be to intimated about stream closing, | ||
1265 | // then we need to make sure the RST_STREAM frame is written to | ||
1266 | // the wire before the headers of the next stream waiting on | ||
1267 | // streamQuota. We ensure this by adding to the streamsQuota pool | ||
1268 | // only after having acquired the writableChan to send RST_STREAM. | ||
1269 | t.streamsQuota.add(1) | ||
1270 | t.framer.writeRSTStream(true, i.streamID, i.code) | ||
1271 | case *flushIO: | ||
1272 | t.framer.flushWrite() | ||
1273 | case *ping: | ||
1274 | if !i.ack { | ||
1275 | t.bdpEst.timesnap(i.data) | ||
1276 | } | ||
1277 | t.framer.writePing(true, i.ack, i.data) | ||
1278 | default: | ||
1279 | errorf("transport: http2Client.controller got unexpected item type %v\n", i) | ||
1280 | } | ||
1281 | t.writableChan <- 0 | ||
1282 | continue | ||
1283 | case <-t.shutdownChan: | ||
1284 | return | ||
1285 | } | ||
1286 | case <-t.shutdownChan: | ||
1287 | return | ||
1288 | } | ||
1289 | } | ||
1290 | } | ||
1291 | |||
1292 | // keepalive running in a separate goroutune makes sure the connection is alive by sending pings. | ||
1293 | func (t *http2Client) keepalive() { | ||
1294 | p := &ping{data: [8]byte{}} | ||
1295 | timer := time.NewTimer(t.kp.Time) | ||
1296 | for { | ||
1297 | select { | ||
1298 | case <-timer.C: | ||
1299 | if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { | ||
1300 | timer.Reset(t.kp.Time) | ||
1301 | continue | ||
1302 | } | ||
1303 | // Check if keepalive should go dormant. | ||
1304 | t.mu.Lock() | ||
1305 | if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream { | ||
1306 | // Make awakenKeepalive writable. | ||
1307 | <-t.awakenKeepalive | ||
1308 | t.mu.Unlock() | ||
1309 | select { | ||
1310 | case <-t.awakenKeepalive: | ||
1311 | // If the control gets here a ping has been sent | ||
1312 | // need to reset the timer with keepalive.Timeout. | ||
1313 | case <-t.shutdownChan: | ||
1314 | return | ||
1315 | } | ||
1316 | } else { | ||
1317 | t.mu.Unlock() | ||
1318 | // Send ping. | ||
1319 | t.controlBuf.put(p) | ||
1320 | } | ||
1321 | |||
1322 | // By the time control gets here a ping has been sent one way or the other. | ||
1323 | timer.Reset(t.kp.Timeout) | ||
1324 | select { | ||
1325 | case <-timer.C: | ||
1326 | if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { | ||
1327 | timer.Reset(t.kp.Time) | ||
1328 | continue | ||
1329 | } | ||
1330 | t.Close() | ||
1331 | return | ||
1332 | case <-t.shutdownChan: | ||
1333 | if !timer.Stop() { | ||
1334 | <-timer.C | ||
1335 | } | ||
1336 | return | ||
1337 | } | ||
1338 | case <-t.shutdownChan: | ||
1339 | if !timer.Stop() { | ||
1340 | <-timer.C | ||
1341 | } | ||
1342 | return | ||
1343 | } | ||
1344 | } | ||
1345 | } | ||
1346 | |||
1347 | func (t *http2Client) Error() <-chan struct{} { | ||
1348 | return t.errorChan | ||
1349 | } | ||
1350 | |||
1351 | func (t *http2Client) GoAway() <-chan struct{} { | ||
1352 | return t.goAway | ||
1353 | } | ||
1354 | |||
1355 | func (t *http2Client) notifyError(err error) { | ||
1356 | t.mu.Lock() | ||
1357 | // make sure t.errorChan is closed only once. | ||
1358 | if t.state == draining { | ||
1359 | t.mu.Unlock() | ||
1360 | t.Close() | ||
1361 | return | ||
1362 | } | ||
1363 | if t.state == reachable { | ||
1364 | t.state = unreachable | ||
1365 | close(t.errorChan) | ||
1366 | infof("transport: http2Client.notifyError got notified that the client transport was broken %v.", err) | ||
1367 | } | ||
1368 | t.mu.Unlock() | ||
1369 | } | ||