]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | /* |
2 | * | |
3 | * Copyright 2014 gRPC authors. | |
4 | * | |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | * | |
17 | */ | |
18 | ||
19 | package transport | |
20 | ||
21 | import ( | |
22 | "bytes" | |
23 | "io" | |
24 | "math" | |
25 | "net" | |
26 | "strings" | |
27 | "sync" | |
28 | "sync/atomic" | |
29 | "time" | |
30 | ||
31 | "golang.org/x/net/context" | |
32 | "golang.org/x/net/http2" | |
33 | "golang.org/x/net/http2/hpack" | |
34 | "google.golang.org/grpc/codes" | |
35 | "google.golang.org/grpc/credentials" | |
36 | "google.golang.org/grpc/keepalive" | |
37 | "google.golang.org/grpc/metadata" | |
38 | "google.golang.org/grpc/peer" | |
39 | "google.golang.org/grpc/stats" | |
40 | "google.golang.org/grpc/status" | |
41 | ) | |
42 | ||
43 | // http2Client implements the ClientTransport interface with HTTP2. | |
44 | type http2Client struct { | |
45 | ctx context.Context | |
46 | target string // server name/addr | |
47 | userAgent string | |
48 | md interface{} | |
49 | conn net.Conn // underlying communication channel | |
50 | remoteAddr net.Addr | |
51 | localAddr net.Addr | |
52 | authInfo credentials.AuthInfo // auth info about the connection | |
53 | nextID uint32 // the next stream ID to be used | |
54 | ||
55 | // writableChan synchronizes write access to the transport. | |
56 | // A writer acquires the write lock by sending a value on writableChan | |
57 | // and releases it by receiving from writableChan. | |
58 | writableChan chan int | |
59 | // shutdownChan is closed when Close is called. | |
60 | // Blocking operations should select on shutdownChan to avoid | |
61 | // blocking forever after Close. | |
62 | // TODO(zhaoq): Maybe have a channel context? | |
63 | shutdownChan chan struct{} | |
64 | // errorChan is closed to notify the I/O error to the caller. | |
65 | errorChan chan struct{} | |
66 | // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) | |
67 | // that the server sent GoAway on this transport. | |
68 | goAway chan struct{} | |
69 | // awakenKeepalive is used to wake up keepalive when after it has gone dormant. | |
70 | awakenKeepalive chan struct{} | |
71 | ||
72 | framer *framer | |
73 | hBuf *bytes.Buffer // the buffer for HPACK encoding | |
74 | hEnc *hpack.Encoder // HPACK encoder | |
75 | ||
76 | // controlBuf delivers all the control related tasks (e.g., window | |
77 | // updates, reset streams, and various settings) to the controller. | |
78 | controlBuf *controlBuffer | |
79 | fc *inFlow | |
80 | // sendQuotaPool provides flow control to outbound message. | |
81 | sendQuotaPool *quotaPool | |
82 | // streamsQuota limits the max number of concurrent streams. | |
83 | streamsQuota *quotaPool | |
84 | ||
85 | // The scheme used: https if TLS is on, http otherwise. | |
86 | scheme string | |
87 | ||
88 | isSecure bool | |
89 | ||
90 | creds []credentials.PerRPCCredentials | |
91 | ||
92 | // Boolean to keep track of reading activity on transport. | |
93 | // 1 is true and 0 is false. | |
94 | activity uint32 // Accessed atomically. | |
95 | kp keepalive.ClientParameters | |
96 | ||
97 | statsHandler stats.Handler | |
98 | ||
99 | initialWindowSize int32 | |
100 | ||
101 | bdpEst *bdpEstimator | |
102 | outQuotaVersion uint32 | |
103 | ||
104 | mu sync.Mutex // guard the following variables | |
105 | state transportState // the state of underlying connection | |
106 | activeStreams map[uint32]*Stream | |
107 | // The max number of concurrent streams | |
108 | maxStreams int | |
109 | // the per-stream outbound flow control window size set by the peer. | |
110 | streamSendQuota uint32 | |
111 | // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame. | |
112 | prevGoAwayID uint32 | |
113 | // goAwayReason records the http2.ErrCode and debug data received with the | |
114 | // GoAway frame. | |
115 | goAwayReason GoAwayReason | |
116 | } | |
117 | ||
118 | func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { | |
119 | if fn != nil { | |
120 | return fn(ctx, addr) | |
121 | } | |
122 | return dialContext(ctx, "tcp", addr) | |
123 | } | |
124 | ||
125 | func isTemporary(err error) bool { | |
126 | switch err { | |
127 | case io.EOF: | |
128 | // Connection closures may be resolved upon retry, and are thus | |
129 | // treated as temporary. | |
130 | return true | |
131 | case context.DeadlineExceeded: | |
132 | // In Go 1.7, context.DeadlineExceeded implements Timeout(), and this | |
133 | // special case is not needed. Until then, we need to keep this | |
134 | // clause. | |
135 | return true | |
136 | } | |
137 | ||
138 | switch err := err.(type) { | |
139 | case interface { | |
140 | Temporary() bool | |
141 | }: | |
142 | return err.Temporary() | |
143 | case interface { | |
144 | Timeout() bool | |
145 | }: | |
146 | // Timeouts may be resolved upon retry, and are thus treated as | |
147 | // temporary. | |
148 | return err.Timeout() | |
149 | } | |
150 | return false | |
151 | } | |
152 | ||
153 | // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 | |
154 | // and starts to receive messages on it. Non-nil error returns if construction | |
155 | // fails. | |
156 | func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (_ ClientTransport, err error) { | |
157 | scheme := "http" | |
158 | conn, err := dial(ctx, opts.Dialer, addr.Addr) | |
159 | if err != nil { | |
160 | if opts.FailOnNonTempDialError { | |
161 | return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err) | |
162 | } | |
163 | return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err) | |
164 | } | |
165 | // Any further errors will close the underlying connection | |
166 | defer func(conn net.Conn) { | |
167 | if err != nil { | |
168 | conn.Close() | |
169 | } | |
170 | }(conn) | |
171 | var ( | |
172 | isSecure bool | |
173 | authInfo credentials.AuthInfo | |
174 | ) | |
175 | if creds := opts.TransportCredentials; creds != nil { | |
176 | scheme = "https" | |
177 | conn, authInfo, err = creds.ClientHandshake(ctx, addr.Addr, conn) | |
178 | if err != nil { | |
179 | // Credentials handshake errors are typically considered permanent | |
180 | // to avoid retrying on e.g. bad certificates. | |
181 | temp := isTemporary(err) | |
182 | return nil, connectionErrorf(temp, err, "transport: authentication handshake failed: %v", err) | |
183 | } | |
184 | isSecure = true | |
185 | } | |
186 | kp := opts.KeepaliveParams | |
187 | // Validate keepalive parameters. | |
188 | if kp.Time == 0 { | |
189 | kp.Time = defaultClientKeepaliveTime | |
190 | } | |
191 | if kp.Timeout == 0 { | |
192 | kp.Timeout = defaultClientKeepaliveTimeout | |
193 | } | |
194 | dynamicWindow := true | |
195 | icwz := int32(initialWindowSize) | |
196 | if opts.InitialConnWindowSize >= defaultWindowSize { | |
197 | icwz = opts.InitialConnWindowSize | |
198 | dynamicWindow = false | |
199 | } | |
200 | var buf bytes.Buffer | |
201 | t := &http2Client{ | |
202 | ctx: ctx, | |
203 | target: addr.Addr, | |
204 | userAgent: opts.UserAgent, | |
205 | md: addr.Metadata, | |
206 | conn: conn, | |
207 | remoteAddr: conn.RemoteAddr(), | |
208 | localAddr: conn.LocalAddr(), | |
209 | authInfo: authInfo, | |
210 | // The client initiated stream id is odd starting from 1. | |
211 | nextID: 1, | |
212 | writableChan: make(chan int, 1), | |
213 | shutdownChan: make(chan struct{}), | |
214 | errorChan: make(chan struct{}), | |
215 | goAway: make(chan struct{}), | |
216 | awakenKeepalive: make(chan struct{}, 1), | |
217 | framer: newFramer(conn), | |
218 | hBuf: &buf, | |
219 | hEnc: hpack.NewEncoder(&buf), | |
220 | controlBuf: newControlBuffer(), | |
221 | fc: &inFlow{limit: uint32(icwz)}, | |
222 | sendQuotaPool: newQuotaPool(defaultWindowSize), | |
223 | scheme: scheme, | |
224 | state: reachable, | |
225 | activeStreams: make(map[uint32]*Stream), | |
226 | isSecure: isSecure, | |
227 | creds: opts.PerRPCCredentials, | |
228 | maxStreams: defaultMaxStreamsClient, | |
229 | streamsQuota: newQuotaPool(defaultMaxStreamsClient), | |
230 | streamSendQuota: defaultWindowSize, | |
231 | kp: kp, | |
232 | statsHandler: opts.StatsHandler, | |
233 | initialWindowSize: initialWindowSize, | |
234 | } | |
235 | if opts.InitialWindowSize >= defaultWindowSize { | |
236 | t.initialWindowSize = opts.InitialWindowSize | |
237 | dynamicWindow = false | |
238 | } | |
239 | if dynamicWindow { | |
240 | t.bdpEst = &bdpEstimator{ | |
241 | bdp: initialWindowSize, | |
242 | updateFlowControl: t.updateFlowControl, | |
243 | } | |
244 | } | |
245 | // Make sure awakenKeepalive can't be written upon. | |
246 | // keepalive routine will make it writable, if need be. | |
247 | t.awakenKeepalive <- struct{}{} | |
248 | if t.statsHandler != nil { | |
249 | t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{ | |
250 | RemoteAddr: t.remoteAddr, | |
251 | LocalAddr: t.localAddr, | |
252 | }) | |
253 | connBegin := &stats.ConnBegin{ | |
254 | Client: true, | |
255 | } | |
256 | t.statsHandler.HandleConn(t.ctx, connBegin) | |
257 | } | |
258 | // Start the reader goroutine for incoming message. Each transport has | |
259 | // a dedicated goroutine which reads HTTP2 frame from network. Then it | |
260 | // dispatches the frame to the corresponding stream entity. | |
261 | go t.reader() | |
262 | // Send connection preface to server. | |
263 | n, err := t.conn.Write(clientPreface) | |
264 | if err != nil { | |
265 | t.Close() | |
266 | return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err) | |
267 | } | |
268 | if n != len(clientPreface) { | |
269 | t.Close() | |
270 | return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) | |
271 | } | |
272 | if t.initialWindowSize != defaultWindowSize { | |
273 | err = t.framer.writeSettings(true, http2.Setting{ | |
274 | ID: http2.SettingInitialWindowSize, | |
275 | Val: uint32(t.initialWindowSize), | |
276 | }) | |
277 | } else { | |
278 | err = t.framer.writeSettings(true) | |
279 | } | |
280 | if err != nil { | |
281 | t.Close() | |
282 | return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err) | |
283 | } | |
284 | // Adjust the connection flow control window if needed. | |
285 | if delta := uint32(icwz - defaultWindowSize); delta > 0 { | |
286 | if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil { | |
287 | t.Close() | |
288 | return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err) | |
289 | } | |
290 | } | |
291 | go t.controller() | |
292 | if t.kp.Time != infinity { | |
293 | go t.keepalive() | |
294 | } | |
295 | t.writableChan <- 0 | |
296 | return t, nil | |
297 | } | |
298 | ||
299 | func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { | |
300 | // TODO(zhaoq): Handle uint32 overflow of Stream.id. | |
301 | s := &Stream{ | |
302 | id: t.nextID, | |
303 | done: make(chan struct{}), | |
304 | goAway: make(chan struct{}), | |
305 | method: callHdr.Method, | |
306 | sendCompress: callHdr.SendCompress, | |
307 | buf: newRecvBuffer(), | |
308 | fc: &inFlow{limit: uint32(t.initialWindowSize)}, | |
309 | sendQuotaPool: newQuotaPool(int(t.streamSendQuota)), | |
310 | headerChan: make(chan struct{}), | |
311 | } | |
312 | t.nextID += 2 | |
313 | s.requestRead = func(n int) { | |
314 | t.adjustWindow(s, uint32(n)) | |
315 | } | |
316 | // The client side stream context should have exactly the same life cycle with the user provided context. | |
317 | // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done. | |
318 | // So we use the original context here instead of creating a copy. | |
319 | s.ctx = ctx | |
320 | s.trReader = &transportReader{ | |
321 | reader: &recvBufferReader{ | |
322 | ctx: s.ctx, | |
323 | goAway: s.goAway, | |
324 | recv: s.buf, | |
325 | }, | |
326 | windowHandler: func(n int) { | |
327 | t.updateWindow(s, uint32(n)) | |
328 | }, | |
329 | } | |
330 | ||
331 | return s | |
332 | } | |
333 | ||
334 | // NewStream creates a stream and registers it into the transport as "active" | |
335 | // streams. | |
336 | func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { | |
337 | pr := &peer.Peer{ | |
338 | Addr: t.remoteAddr, | |
339 | } | |
340 | // Attach Auth info if there is any. | |
341 | if t.authInfo != nil { | |
342 | pr.AuthInfo = t.authInfo | |
343 | } | |
344 | ctx = peer.NewContext(ctx, pr) | |
345 | var ( | |
346 | authData = make(map[string]string) | |
347 | audience string | |
348 | ) | |
349 | // Create an audience string only if needed. | |
350 | if len(t.creds) > 0 || callHdr.Creds != nil { | |
351 | // Construct URI required to get auth request metadata. | |
352 | var port string | |
353 | if pos := strings.LastIndex(t.target, ":"); pos != -1 { | |
354 | // Omit port if it is the default one. | |
355 | if t.target[pos+1:] != "443" { | |
356 | port = ":" + t.target[pos+1:] | |
357 | } | |
358 | } | |
359 | pos := strings.LastIndex(callHdr.Method, "/") | |
360 | if pos == -1 { | |
361 | pos = len(callHdr.Method) | |
362 | } | |
363 | audience = "https://" + callHdr.Host + port + callHdr.Method[:pos] | |
364 | } | |
365 | for _, c := range t.creds { | |
366 | data, err := c.GetRequestMetadata(ctx, audience) | |
367 | if err != nil { | |
368 | return nil, streamErrorf(codes.Internal, "transport: %v", err) | |
369 | } | |
370 | for k, v := range data { | |
371 | // Capital header names are illegal in HTTP/2. | |
372 | k = strings.ToLower(k) | |
373 | authData[k] = v | |
374 | } | |
375 | } | |
376 | callAuthData := make(map[string]string) | |
377 | // Check if credentials.PerRPCCredentials were provided via call options. | |
378 | // Note: if these credentials are provided both via dial options and call | |
379 | // options, then both sets of credentials will be applied. | |
380 | if callCreds := callHdr.Creds; callCreds != nil { | |
381 | if !t.isSecure && callCreds.RequireTransportSecurity() { | |
382 | return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure conneciton") | |
383 | } | |
384 | data, err := callCreds.GetRequestMetadata(ctx, audience) | |
385 | if err != nil { | |
386 | return nil, streamErrorf(codes.Internal, "transport: %v", err) | |
387 | } | |
388 | for k, v := range data { | |
389 | // Capital header names are illegal in HTTP/2 | |
390 | k = strings.ToLower(k) | |
391 | callAuthData[k] = v | |
392 | } | |
393 | } | |
394 | t.mu.Lock() | |
395 | if t.activeStreams == nil { | |
396 | t.mu.Unlock() | |
397 | return nil, ErrConnClosing | |
398 | } | |
399 | if t.state == draining { | |
400 | t.mu.Unlock() | |
401 | return nil, ErrStreamDrain | |
402 | } | |
403 | if t.state != reachable { | |
404 | t.mu.Unlock() | |
405 | return nil, ErrConnClosing | |
406 | } | |
407 | t.mu.Unlock() | |
408 | sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire()) | |
409 | if err != nil { | |
410 | return nil, err | |
411 | } | |
412 | // Returns the quota balance back. | |
413 | if sq > 1 { | |
414 | t.streamsQuota.add(sq - 1) | |
415 | } | |
416 | if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { | |
417 | // Return the quota back now because there is no stream returned to the caller. | |
418 | if _, ok := err.(StreamError); ok { | |
419 | t.streamsQuota.add(1) | |
420 | } | |
421 | return nil, err | |
422 | } | |
423 | t.mu.Lock() | |
424 | if t.state == draining { | |
425 | t.mu.Unlock() | |
426 | t.streamsQuota.add(1) | |
427 | // Need to make t writable again so that the rpc in flight can still proceed. | |
428 | t.writableChan <- 0 | |
429 | return nil, ErrStreamDrain | |
430 | } | |
431 | if t.state != reachable { | |
432 | t.mu.Unlock() | |
433 | return nil, ErrConnClosing | |
434 | } | |
435 | s := t.newStream(ctx, callHdr) | |
436 | t.activeStreams[s.id] = s | |
437 | // If the number of active streams change from 0 to 1, then check if keepalive | |
438 | // has gone dormant. If so, wake it up. | |
439 | if len(t.activeStreams) == 1 { | |
440 | select { | |
441 | case t.awakenKeepalive <- struct{}{}: | |
442 | t.framer.writePing(false, false, [8]byte{}) | |
443 | default: | |
444 | } | |
445 | } | |
446 | ||
447 | t.mu.Unlock() | |
448 | ||
449 | // HPACK encodes various headers. Note that once WriteField(...) is | |
450 | // called, the corresponding headers/continuation frame has to be sent | |
451 | // because hpack.Encoder is stateful. | |
452 | t.hBuf.Reset() | |
453 | t.hEnc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"}) | |
454 | t.hEnc.WriteField(hpack.HeaderField{Name: ":scheme", Value: t.scheme}) | |
455 | t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method}) | |
456 | t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host}) | |
457 | t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) | |
458 | t.hEnc.WriteField(hpack.HeaderField{Name: "user-agent", Value: t.userAgent}) | |
459 | t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"}) | |
460 | ||
461 | if callHdr.SendCompress != "" { | |
462 | t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress}) | |
463 | } | |
464 | if dl, ok := ctx.Deadline(); ok { | |
465 | // Send out timeout regardless its value. The server can detect timeout context by itself. | |
466 | timeout := dl.Sub(time.Now()) | |
467 | t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)}) | |
468 | } | |
469 | ||
470 | for k, v := range authData { | |
471 | t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | |
472 | } | |
473 | for k, v := range callAuthData { | |
474 | t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | |
475 | } | |
476 | var ( | |
477 | endHeaders bool | |
478 | ) | |
479 | if md, ok := metadata.FromOutgoingContext(ctx); ok { | |
480 | for k, vv := range md { | |
481 | // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. | |
482 | if isReservedHeader(k) { | |
483 | continue | |
484 | } | |
485 | for _, v := range vv { | |
486 | t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | |
487 | } | |
488 | } | |
489 | } | |
490 | if md, ok := t.md.(*metadata.MD); ok { | |
491 | for k, vv := range *md { | |
492 | if isReservedHeader(k) { | |
493 | continue | |
494 | } | |
495 | for _, v := range vv { | |
496 | t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | |
497 | } | |
498 | } | |
499 | } | |
500 | first := true | |
501 | bufLen := t.hBuf.Len() | |
502 | // Sends the headers in a single batch even when they span multiple frames. | |
503 | for !endHeaders { | |
504 | size := t.hBuf.Len() | |
505 | if size > http2MaxFrameLen { | |
506 | size = http2MaxFrameLen | |
507 | } else { | |
508 | endHeaders = true | |
509 | } | |
510 | var flush bool | |
511 | if callHdr.Flush && endHeaders { | |
512 | flush = true | |
513 | } | |
514 | if first { | |
515 | // Sends a HeadersFrame to server to start a new stream. | |
516 | p := http2.HeadersFrameParam{ | |
517 | StreamID: s.id, | |
518 | BlockFragment: t.hBuf.Next(size), | |
519 | EndStream: false, | |
520 | EndHeaders: endHeaders, | |
521 | } | |
522 | // Do a force flush for the buffered frames iff it is the last headers frame | |
523 | // and there is header metadata to be sent. Otherwise, there is flushing until | |
524 | // the corresponding data frame is written. | |
525 | err = t.framer.writeHeaders(flush, p) | |
526 | first = false | |
527 | } else { | |
528 | // Sends Continuation frames for the leftover headers. | |
529 | err = t.framer.writeContinuation(flush, s.id, endHeaders, t.hBuf.Next(size)) | |
530 | } | |
531 | if err != nil { | |
532 | t.notifyError(err) | |
533 | return nil, connectionErrorf(true, err, "transport: %v", err) | |
534 | } | |
535 | } | |
536 | s.mu.Lock() | |
537 | s.bytesSent = true | |
538 | s.mu.Unlock() | |
539 | ||
540 | if t.statsHandler != nil { | |
541 | outHeader := &stats.OutHeader{ | |
542 | Client: true, | |
543 | WireLength: bufLen, | |
544 | FullMethod: callHdr.Method, | |
545 | RemoteAddr: t.remoteAddr, | |
546 | LocalAddr: t.localAddr, | |
547 | Compression: callHdr.SendCompress, | |
548 | } | |
549 | t.statsHandler.HandleRPC(s.ctx, outHeader) | |
550 | } | |
551 | t.writableChan <- 0 | |
552 | return s, nil | |
553 | } | |
554 | ||
555 | // CloseStream clears the footprint of a stream when the stream is not needed any more. | |
556 | // This must not be executed in reader's goroutine. | |
557 | func (t *http2Client) CloseStream(s *Stream, err error) { | |
558 | t.mu.Lock() | |
559 | if t.activeStreams == nil { | |
560 | t.mu.Unlock() | |
561 | return | |
562 | } | |
563 | if err != nil { | |
564 | // notify in-flight streams, before the deletion | |
565 | s.write(recvMsg{err: err}) | |
566 | } | |
567 | delete(t.activeStreams, s.id) | |
568 | if t.state == draining && len(t.activeStreams) == 0 { | |
569 | // The transport is draining and s is the last live stream on t. | |
570 | t.mu.Unlock() | |
571 | t.Close() | |
572 | return | |
573 | } | |
574 | t.mu.Unlock() | |
575 | // rstStream is true in case the stream is being closed at the client-side | |
576 | // and the server needs to be intimated about it by sending a RST_STREAM | |
577 | // frame. | |
578 | // To make sure this frame is written to the wire before the headers of the | |
579 | // next stream waiting for streamsQuota, we add to streamsQuota pool only | |
580 | // after having acquired the writableChan to send RST_STREAM out (look at | |
581 | // the controller() routine). | |
582 | var rstStream bool | |
583 | var rstError http2.ErrCode | |
584 | defer func() { | |
585 | // In case, the client doesn't have to send RST_STREAM to server | |
586 | // we can safely add back to streamsQuota pool now. | |
587 | if !rstStream { | |
588 | t.streamsQuota.add(1) | |
589 | return | |
590 | } | |
591 | t.controlBuf.put(&resetStream{s.id, rstError}) | |
592 | }() | |
593 | s.mu.Lock() | |
594 | rstStream = s.rstStream | |
595 | rstError = s.rstError | |
596 | if s.state == streamDone { | |
597 | s.mu.Unlock() | |
598 | return | |
599 | } | |
600 | if !s.headerDone { | |
601 | close(s.headerChan) | |
602 | s.headerDone = true | |
603 | } | |
604 | s.state = streamDone | |
605 | s.mu.Unlock() | |
606 | if _, ok := err.(StreamError); ok { | |
607 | rstStream = true | |
608 | rstError = http2.ErrCodeCancel | |
609 | } | |
610 | } | |
611 | ||
612 | // Close kicks off the shutdown process of the transport. This should be called | |
613 | // only once on a transport. Once it is called, the transport should not be | |
614 | // accessed any more. | |
615 | func (t *http2Client) Close() (err error) { | |
616 | t.mu.Lock() | |
617 | if t.state == closing { | |
618 | t.mu.Unlock() | |
619 | return | |
620 | } | |
621 | if t.state == reachable || t.state == draining { | |
622 | close(t.errorChan) | |
623 | } | |
624 | t.state = closing | |
625 | t.mu.Unlock() | |
626 | close(t.shutdownChan) | |
627 | err = t.conn.Close() | |
628 | t.mu.Lock() | |
629 | streams := t.activeStreams | |
630 | t.activeStreams = nil | |
631 | t.mu.Unlock() | |
632 | // Notify all active streams. | |
633 | for _, s := range streams { | |
634 | s.mu.Lock() | |
635 | if !s.headerDone { | |
636 | close(s.headerChan) | |
637 | s.headerDone = true | |
638 | } | |
639 | s.mu.Unlock() | |
640 | s.write(recvMsg{err: ErrConnClosing}) | |
641 | } | |
642 | if t.statsHandler != nil { | |
643 | connEnd := &stats.ConnEnd{ | |
644 | Client: true, | |
645 | } | |
646 | t.statsHandler.HandleConn(t.ctx, connEnd) | |
647 | } | |
648 | return | |
649 | } | |
650 | ||
651 | func (t *http2Client) GracefulClose() error { | |
652 | t.mu.Lock() | |
653 | switch t.state { | |
654 | case unreachable: | |
655 | // The server may close the connection concurrently. t is not available for | |
656 | // any streams. Close it now. | |
657 | t.mu.Unlock() | |
658 | t.Close() | |
659 | return nil | |
660 | case closing: | |
661 | t.mu.Unlock() | |
662 | return nil | |
663 | } | |
664 | if t.state == draining { | |
665 | t.mu.Unlock() | |
666 | return nil | |
667 | } | |
668 | t.state = draining | |
669 | active := len(t.activeStreams) | |
670 | t.mu.Unlock() | |
671 | if active == 0 { | |
672 | return t.Close() | |
673 | } | |
674 | return nil | |
675 | } | |
676 | ||
677 | // Write formats the data into HTTP2 data frame(s) and sends it out. The caller | |
678 | // should proceed only if Write returns nil. | |
679 | // TODO(zhaoq): opts.Delay is ignored in this implementation. Support it later | |
680 | // if it improves the performance. | |
681 | func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { | |
682 | r := bytes.NewBuffer(data) | |
683 | var ( | |
684 | p []byte | |
685 | oqv uint32 | |
686 | ) | |
687 | for { | |
688 | oqv = atomic.LoadUint32(&t.outQuotaVersion) | |
689 | if r.Len() > 0 || p != nil { | |
690 | size := http2MaxFrameLen | |
691 | // Wait until the stream has some quota to send the data. | |
692 | sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire()) | |
693 | if err != nil { | |
694 | return err | |
695 | } | |
696 | // Wait until the transport has some quota to send the data. | |
697 | tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire()) | |
698 | if err != nil { | |
699 | return err | |
700 | } | |
701 | if sq < size { | |
702 | size = sq | |
703 | } | |
704 | if tq < size { | |
705 | size = tq | |
706 | } | |
707 | if p == nil { | |
708 | p = r.Next(size) | |
709 | } | |
710 | ps := len(p) | |
711 | if ps < sq { | |
712 | // Overbooked stream quota. Return it back. | |
713 | s.sendQuotaPool.add(sq - ps) | |
714 | } | |
715 | if ps < tq { | |
716 | // Overbooked transport quota. Return it back. | |
717 | t.sendQuotaPool.add(tq - ps) | |
718 | } | |
719 | } | |
720 | var ( | |
721 | endStream bool | |
722 | forceFlush bool | |
723 | ) | |
724 | if opts.Last && r.Len() == 0 { | |
725 | endStream = true | |
726 | } | |
727 | // Indicate there is a writer who is about to write a data frame. | |
728 | t.framer.adjustNumWriters(1) | |
729 | // Got some quota. Try to acquire writing privilege on the transport. | |
730 | if _, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.writableChan); err != nil { | |
731 | if _, ok := err.(StreamError); ok || err == io.EOF { | |
732 | // Return the connection quota back. | |
733 | t.sendQuotaPool.add(len(p)) | |
734 | } | |
735 | if t.framer.adjustNumWriters(-1) == 0 { | |
736 | // This writer is the last one in this batch and has the | |
737 | // responsibility to flush the buffered frames. It queues | |
738 | // a flush request to controlBuf instead of flushing directly | |
739 | // in order to avoid the race with other writing or flushing. | |
740 | t.controlBuf.put(&flushIO{}) | |
741 | } | |
742 | return err | |
743 | } | |
744 | select { | |
745 | case <-s.ctx.Done(): | |
746 | t.sendQuotaPool.add(len(p)) | |
747 | if t.framer.adjustNumWriters(-1) == 0 { | |
748 | t.controlBuf.put(&flushIO{}) | |
749 | } | |
750 | t.writableChan <- 0 | |
751 | return ContextErr(s.ctx.Err()) | |
752 | default: | |
753 | } | |
754 | if oqv != atomic.LoadUint32(&t.outQuotaVersion) { | |
755 | // InitialWindowSize settings frame must have been received after we | |
756 | // acquired send quota but before we got the writable channel. | |
757 | // We must forsake this write. | |
758 | t.sendQuotaPool.add(len(p)) | |
759 | s.sendQuotaPool.add(len(p)) | |
760 | if t.framer.adjustNumWriters(-1) == 0 { | |
761 | t.controlBuf.put(&flushIO{}) | |
762 | } | |
763 | t.writableChan <- 0 | |
764 | continue | |
765 | } | |
766 | if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 { | |
767 | // Do a force flush iff this is last frame for the entire gRPC message | |
768 | // and the caller is the only writer at this moment. | |
769 | forceFlush = true | |
770 | } | |
771 | // If WriteData fails, all the pending streams will be handled | |
772 | // by http2Client.Close(). No explicit CloseStream() needs to be | |
773 | // invoked. | |
774 | if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil { | |
775 | t.notifyError(err) | |
776 | return connectionErrorf(true, err, "transport: %v", err) | |
777 | } | |
778 | p = nil | |
779 | if t.framer.adjustNumWriters(-1) == 0 { | |
780 | t.framer.flushWrite() | |
781 | } | |
782 | t.writableChan <- 0 | |
783 | if r.Len() == 0 { | |
784 | break | |
785 | } | |
786 | } | |
787 | if !opts.Last { | |
788 | return nil | |
789 | } | |
790 | s.mu.Lock() | |
791 | if s.state != streamDone { | |
792 | s.state = streamWriteDone | |
793 | } | |
794 | s.mu.Unlock() | |
795 | return nil | |
796 | } | |
797 | ||
798 | func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) { | |
799 | t.mu.Lock() | |
800 | defer t.mu.Unlock() | |
801 | s, ok := t.activeStreams[f.Header().StreamID] | |
802 | return s, ok | |
803 | } | |
804 | ||
805 | // adjustWindow sends out extra window update over the initial window size | |
806 | // of stream if the application is requesting data larger in size than | |
807 | // the window. | |
808 | func (t *http2Client) adjustWindow(s *Stream, n uint32) { | |
809 | s.mu.Lock() | |
810 | defer s.mu.Unlock() | |
811 | if s.state == streamDone { | |
812 | return | |
813 | } | |
814 | if w := s.fc.maybeAdjust(n); w > 0 { | |
815 | // Piggyback conneciton's window update along. | |
816 | if cw := t.fc.resetPendingUpdate(); cw > 0 { | |
817 | t.controlBuf.put(&windowUpdate{0, cw, false}) | |
818 | } | |
819 | t.controlBuf.put(&windowUpdate{s.id, w, true}) | |
820 | } | |
821 | } | |
822 | ||
823 | // updateWindow adjusts the inbound quota for the stream and the transport. | |
824 | // Window updates will deliver to the controller for sending when | |
825 | // the cumulative quota exceeds the corresponding threshold. | |
826 | func (t *http2Client) updateWindow(s *Stream, n uint32) { | |
827 | s.mu.Lock() | |
828 | defer s.mu.Unlock() | |
829 | if s.state == streamDone { | |
830 | return | |
831 | } | |
832 | if w := s.fc.onRead(n); w > 0 { | |
833 | if cw := t.fc.resetPendingUpdate(); cw > 0 { | |
834 | t.controlBuf.put(&windowUpdate{0, cw, false}) | |
835 | } | |
836 | t.controlBuf.put(&windowUpdate{s.id, w, true}) | |
837 | } | |
838 | } | |
839 | ||
840 | // updateFlowControl updates the incoming flow control windows | |
841 | // for the transport and the stream based on the current bdp | |
842 | // estimation. | |
843 | func (t *http2Client) updateFlowControl(n uint32) { | |
844 | t.mu.Lock() | |
845 | for _, s := range t.activeStreams { | |
846 | s.fc.newLimit(n) | |
847 | } | |
848 | t.initialWindowSize = int32(n) | |
849 | t.mu.Unlock() | |
850 | t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false}) | |
851 | t.controlBuf.put(&settings{ | |
852 | ack: false, | |
853 | ss: []http2.Setting{ | |
854 | { | |
855 | ID: http2.SettingInitialWindowSize, | |
856 | Val: uint32(n), | |
857 | }, | |
858 | }, | |
859 | }) | |
860 | } | |
861 | ||
862 | func (t *http2Client) handleData(f *http2.DataFrame) { | |
863 | size := f.Header().Length | |
864 | var sendBDPPing bool | |
865 | if t.bdpEst != nil { | |
866 | sendBDPPing = t.bdpEst.add(uint32(size)) | |
867 | } | |
868 | // Decouple connection's flow control from application's read. | |
869 | // An update on connection's flow control should not depend on | |
870 | // whether user application has read the data or not. Such a | |
871 | // restriction is already imposed on the stream's flow control, | |
872 | // and therefore the sender will be blocked anyways. | |
873 | // Decoupling the connection flow control will prevent other | |
874 | // active(fast) streams from starving in presence of slow or | |
875 | // inactive streams. | |
876 | // | |
877 | // Furthermore, if a bdpPing is being sent out we can piggyback | |
878 | // connection's window update for the bytes we just received. | |
879 | if sendBDPPing { | |
880 | t.controlBuf.put(&windowUpdate{0, uint32(size), false}) | |
881 | t.controlBuf.put(bdpPing) | |
882 | } else { | |
883 | if err := t.fc.onData(uint32(size)); err != nil { | |
884 | t.notifyError(connectionErrorf(true, err, "%v", err)) | |
885 | return | |
886 | } | |
887 | if w := t.fc.onRead(uint32(size)); w > 0 { | |
888 | t.controlBuf.put(&windowUpdate{0, w, true}) | |
889 | } | |
890 | } | |
891 | // Select the right stream to dispatch. | |
892 | s, ok := t.getStream(f) | |
893 | if !ok { | |
894 | return | |
895 | } | |
896 | if size > 0 { | |
897 | s.mu.Lock() | |
898 | if s.state == streamDone { | |
899 | s.mu.Unlock() | |
900 | return | |
901 | } | |
902 | if err := s.fc.onData(uint32(size)); err != nil { | |
903 | s.rstStream = true | |
904 | s.rstError = http2.ErrCodeFlowControl | |
905 | s.finish(status.New(codes.Internal, err.Error())) | |
906 | s.mu.Unlock() | |
907 | s.write(recvMsg{err: io.EOF}) | |
908 | return | |
909 | } | |
910 | if f.Header().Flags.Has(http2.FlagDataPadded) { | |
911 | if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { | |
912 | t.controlBuf.put(&windowUpdate{s.id, w, true}) | |
913 | } | |
914 | } | |
915 | s.mu.Unlock() | |
916 | // TODO(bradfitz, zhaoq): A copy is required here because there is no | |
917 | // guarantee f.Data() is consumed before the arrival of next frame. | |
918 | // Can this copy be eliminated? | |
919 | if len(f.Data()) > 0 { | |
920 | data := make([]byte, len(f.Data())) | |
921 | copy(data, f.Data()) | |
922 | s.write(recvMsg{data: data}) | |
923 | } | |
924 | } | |
925 | // The server has closed the stream without sending trailers. Record that | |
926 | // the read direction is closed, and set the status appropriately. | |
927 | if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) { | |
928 | s.mu.Lock() | |
929 | if s.state == streamDone { | |
930 | s.mu.Unlock() | |
931 | return | |
932 | } | |
933 | s.finish(status.New(codes.Internal, "server closed the stream without sending trailers")) | |
934 | s.mu.Unlock() | |
935 | s.write(recvMsg{err: io.EOF}) | |
936 | } | |
937 | } | |
938 | ||
939 | func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { | |
940 | s, ok := t.getStream(f) | |
941 | if !ok { | |
942 | return | |
943 | } | |
944 | s.mu.Lock() | |
945 | if s.state == streamDone { | |
946 | s.mu.Unlock() | |
947 | return | |
948 | } | |
949 | if !s.headerDone { | |
950 | close(s.headerChan) | |
951 | s.headerDone = true | |
952 | } | |
953 | statusCode, ok := http2ErrConvTab[http2.ErrCode(f.ErrCode)] | |
954 | if !ok { | |
955 | warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode) | |
956 | statusCode = codes.Unknown | |
957 | } | |
958 | s.finish(status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %d", f.ErrCode)) | |
959 | s.mu.Unlock() | |
960 | s.write(recvMsg{err: io.EOF}) | |
961 | } | |
962 | ||
963 | func (t *http2Client) handleSettings(f *http2.SettingsFrame) { | |
964 | if f.IsAck() { | |
965 | return | |
966 | } | |
967 | var ss []http2.Setting | |
968 | f.ForeachSetting(func(s http2.Setting) error { | |
969 | ss = append(ss, s) | |
970 | return nil | |
971 | }) | |
972 | // The settings will be applied once the ack is sent. | |
973 | t.controlBuf.put(&settings{ack: true, ss: ss}) | |
974 | } | |
975 | ||
976 | func (t *http2Client) handlePing(f *http2.PingFrame) { | |
977 | if f.IsAck() { | |
978 | // Maybe it's a BDP ping. | |
979 | if t.bdpEst != nil { | |
980 | t.bdpEst.calculate(f.Data) | |
981 | } | |
982 | return | |
983 | } | |
984 | pingAck := &ping{ack: true} | |
985 | copy(pingAck.data[:], f.Data[:]) | |
986 | t.controlBuf.put(pingAck) | |
987 | } | |
988 | ||
989 | func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { | |
990 | t.mu.Lock() | |
991 | if t.state != reachable && t.state != draining { | |
992 | t.mu.Unlock() | |
993 | return | |
994 | } | |
995 | if f.ErrCode == http2.ErrCodeEnhanceYourCalm { | |
996 | infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.") | |
997 | } | |
998 | id := f.LastStreamID | |
999 | if id > 0 && id%2 != 1 { | |
1000 | t.mu.Unlock() | |
1001 | t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID)) | |
1002 | return | |
1003 | } | |
1004 | // A client can recieve multiple GoAways from server (look at https://github.com/grpc/grpc-go/issues/1387). | |
1005 | // The idea is that the first GoAway will be sent with an ID of MaxInt32 and the second GoAway will be sent after an RTT delay | |
1006 | // with the ID of the last stream the server will process. | |
1007 | // Therefore, when we get the first GoAway we don't really close any streams. While in case of second GoAway we | |
1008 | // close all streams created after the second GoAwayId. This way streams that were in-flight while the GoAway from server | |
1009 | // was being sent don't get killed. | |
1010 | select { | |
1011 | case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways). | |
1012 | // If there are multiple GoAways the first one should always have an ID greater than the following ones. | |
1013 | if id > t.prevGoAwayID { | |
1014 | t.mu.Unlock() | |
1015 | t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID)) | |
1016 | return | |
1017 | } | |
1018 | default: | |
1019 | t.setGoAwayReason(f) | |
1020 | close(t.goAway) | |
1021 | t.state = draining | |
1022 | } | |
1023 | // All streams with IDs greater than the GoAwayId | |
1024 | // and smaller than the previous GoAway ID should be killed. | |
1025 | upperLimit := t.prevGoAwayID | |
1026 | if upperLimit == 0 { // This is the first GoAway Frame. | |
1027 | upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID. | |
1028 | } | |
1029 | for streamID, stream := range t.activeStreams { | |
1030 | if streamID > id && streamID <= upperLimit { | |
1031 | close(stream.goAway) | |
1032 | } | |
1033 | } | |
1034 | t.prevGoAwayID = id | |
1035 | active := len(t.activeStreams) | |
1036 | t.mu.Unlock() | |
1037 | if active == 0 { | |
1038 | t.Close() | |
1039 | } | |
1040 | } | |
1041 | ||
1042 | // setGoAwayReason sets the value of t.goAwayReason based | |
1043 | // on the GoAway frame received. | |
1044 | // It expects a lock on transport's mutext to be held by | |
1045 | // the caller. | |
1046 | func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) { | |
1047 | t.goAwayReason = NoReason | |
1048 | switch f.ErrCode { | |
1049 | case http2.ErrCodeEnhanceYourCalm: | |
1050 | if string(f.DebugData()) == "too_many_pings" { | |
1051 | t.goAwayReason = TooManyPings | |
1052 | } | |
1053 | } | |
1054 | } | |
1055 | ||
1056 | func (t *http2Client) GetGoAwayReason() GoAwayReason { | |
1057 | t.mu.Lock() | |
1058 | defer t.mu.Unlock() | |
1059 | return t.goAwayReason | |
1060 | } | |
1061 | ||
1062 | func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) { | |
1063 | id := f.Header().StreamID | |
1064 | incr := f.Increment | |
1065 | if id == 0 { | |
1066 | t.sendQuotaPool.add(int(incr)) | |
1067 | return | |
1068 | } | |
1069 | if s, ok := t.getStream(f); ok { | |
1070 | s.sendQuotaPool.add(int(incr)) | |
1071 | } | |
1072 | } | |
1073 | ||
1074 | // operateHeaders takes action on the decoded headers. | |
1075 | func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { | |
1076 | s, ok := t.getStream(frame) | |
1077 | if !ok { | |
1078 | return | |
1079 | } | |
1080 | s.mu.Lock() | |
1081 | s.bytesReceived = true | |
1082 | s.mu.Unlock() | |
1083 | var state decodeState | |
1084 | if err := state.decodeResponseHeader(frame); err != nil { | |
1085 | s.mu.Lock() | |
1086 | if !s.headerDone { | |
1087 | close(s.headerChan) | |
1088 | s.headerDone = true | |
1089 | } | |
1090 | s.mu.Unlock() | |
1091 | s.write(recvMsg{err: err}) | |
1092 | // Something wrong. Stops reading even when there is remaining. | |
1093 | return | |
1094 | } | |
1095 | ||
1096 | endStream := frame.StreamEnded() | |
1097 | var isHeader bool | |
1098 | defer func() { | |
1099 | if t.statsHandler != nil { | |
1100 | if isHeader { | |
1101 | inHeader := &stats.InHeader{ | |
1102 | Client: true, | |
1103 | WireLength: int(frame.Header().Length), | |
1104 | } | |
1105 | t.statsHandler.HandleRPC(s.ctx, inHeader) | |
1106 | } else { | |
1107 | inTrailer := &stats.InTrailer{ | |
1108 | Client: true, | |
1109 | WireLength: int(frame.Header().Length), | |
1110 | } | |
1111 | t.statsHandler.HandleRPC(s.ctx, inTrailer) | |
1112 | } | |
1113 | } | |
1114 | }() | |
1115 | ||
1116 | s.mu.Lock() | |
1117 | if !endStream { | |
1118 | s.recvCompress = state.encoding | |
1119 | } | |
1120 | if !s.headerDone { | |
1121 | if !endStream && len(state.mdata) > 0 { | |
1122 | s.header = state.mdata | |
1123 | } | |
1124 | close(s.headerChan) | |
1125 | s.headerDone = true | |
1126 | isHeader = true | |
1127 | } | |
1128 | if !endStream || s.state == streamDone { | |
1129 | s.mu.Unlock() | |
1130 | return | |
1131 | } | |
1132 | ||
1133 | if len(state.mdata) > 0 { | |
1134 | s.trailer = state.mdata | |
1135 | } | |
1136 | s.finish(state.status()) | |
1137 | s.mu.Unlock() | |
1138 | s.write(recvMsg{err: io.EOF}) | |
1139 | } | |
1140 | ||
1141 | func handleMalformedHTTP2(s *Stream, err error) { | |
1142 | s.mu.Lock() | |
1143 | if !s.headerDone { | |
1144 | close(s.headerChan) | |
1145 | s.headerDone = true | |
1146 | } | |
1147 | s.mu.Unlock() | |
1148 | s.write(recvMsg{err: err}) | |
1149 | } | |
1150 | ||
1151 | // reader runs as a separate goroutine in charge of reading data from network | |
1152 | // connection. | |
1153 | // | |
1154 | // TODO(zhaoq): currently one reader per transport. Investigate whether this is | |
1155 | // optimal. | |
1156 | // TODO(zhaoq): Check the validity of the incoming frame sequence. | |
1157 | func (t *http2Client) reader() { | |
1158 | // Check the validity of server preface. | |
1159 | frame, err := t.framer.readFrame() | |
1160 | if err != nil { | |
1161 | t.notifyError(err) | |
1162 | return | |
1163 | } | |
1164 | atomic.CompareAndSwapUint32(&t.activity, 0, 1) | |
1165 | sf, ok := frame.(*http2.SettingsFrame) | |
1166 | if !ok { | |
1167 | t.notifyError(err) | |
1168 | return | |
1169 | } | |
1170 | t.handleSettings(sf) | |
1171 | ||
1172 | // loop to keep reading incoming messages on this transport. | |
1173 | for { | |
1174 | frame, err := t.framer.readFrame() | |
1175 | atomic.CompareAndSwapUint32(&t.activity, 0, 1) | |
1176 | if err != nil { | |
1177 | // Abort an active stream if the http2.Framer returns a | |
1178 | // http2.StreamError. This can happen only if the server's response | |
1179 | // is malformed http2. | |
1180 | if se, ok := err.(http2.StreamError); ok { | |
1181 | t.mu.Lock() | |
1182 | s := t.activeStreams[se.StreamID] | |
1183 | t.mu.Unlock() | |
1184 | if s != nil { | |
1185 | // use error detail to provide better err message | |
1186 | handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail())) | |
1187 | } | |
1188 | continue | |
1189 | } else { | |
1190 | // Transport error. | |
1191 | t.notifyError(err) | |
1192 | return | |
1193 | } | |
1194 | } | |
1195 | switch frame := frame.(type) { | |
1196 | case *http2.MetaHeadersFrame: | |
1197 | t.operateHeaders(frame) | |
1198 | case *http2.DataFrame: | |
1199 | t.handleData(frame) | |
1200 | case *http2.RSTStreamFrame: | |
1201 | t.handleRSTStream(frame) | |
1202 | case *http2.SettingsFrame: | |
1203 | t.handleSettings(frame) | |
1204 | case *http2.PingFrame: | |
1205 | t.handlePing(frame) | |
1206 | case *http2.GoAwayFrame: | |
1207 | t.handleGoAway(frame) | |
1208 | case *http2.WindowUpdateFrame: | |
1209 | t.handleWindowUpdate(frame) | |
1210 | default: | |
1211 | errorf("transport: http2Client.reader got unhandled frame type %v.", frame) | |
1212 | } | |
1213 | } | |
1214 | } | |
1215 | ||
1216 | func (t *http2Client) applySettings(ss []http2.Setting) { | |
1217 | for _, s := range ss { | |
1218 | switch s.ID { | |
1219 | case http2.SettingMaxConcurrentStreams: | |
1220 | // TODO(zhaoq): This is a hack to avoid significant refactoring of the | |
1221 | // code to deal with the unrealistic int32 overflow. Probably will try | |
1222 | // to find a better way to handle this later. | |
1223 | if s.Val > math.MaxInt32 { | |
1224 | s.Val = math.MaxInt32 | |
1225 | } | |
1226 | t.mu.Lock() | |
1227 | ms := t.maxStreams | |
1228 | t.maxStreams = int(s.Val) | |
1229 | t.mu.Unlock() | |
1230 | t.streamsQuota.add(int(s.Val) - ms) | |
1231 | case http2.SettingInitialWindowSize: | |
1232 | t.mu.Lock() | |
1233 | for _, stream := range t.activeStreams { | |
1234 | // Adjust the sending quota for each stream. | |
1235 | stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota)) | |
1236 | } | |
1237 | t.streamSendQuota = s.Val | |
1238 | t.mu.Unlock() | |
1239 | atomic.AddUint32(&t.outQuotaVersion, 1) | |
1240 | } | |
1241 | } | |
1242 | } | |
1243 | ||
1244 | // controller running in a separate goroutine takes charge of sending control | |
1245 | // frames (e.g., window update, reset stream, setting, etc.) to the server. | |
1246 | func (t *http2Client) controller() { | |
1247 | for { | |
1248 | select { | |
1249 | case i := <-t.controlBuf.get(): | |
1250 | t.controlBuf.load() | |
1251 | select { | |
1252 | case <-t.writableChan: | |
1253 | switch i := i.(type) { | |
1254 | case *windowUpdate: | |
1255 | t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment) | |
1256 | case *settings: | |
1257 | if i.ack { | |
1258 | t.framer.writeSettingsAck(true) | |
1259 | t.applySettings(i.ss) | |
1260 | } else { | |
1261 | t.framer.writeSettings(true, i.ss...) | |
1262 | } | |
1263 | case *resetStream: | |
1264 | // If the server needs to be to intimated about stream closing, | |
1265 | // then we need to make sure the RST_STREAM frame is written to | |
1266 | // the wire before the headers of the next stream waiting on | |
1267 | // streamQuota. We ensure this by adding to the streamsQuota pool | |
1268 | // only after having acquired the writableChan to send RST_STREAM. | |
1269 | t.streamsQuota.add(1) | |
1270 | t.framer.writeRSTStream(true, i.streamID, i.code) | |
1271 | case *flushIO: | |
1272 | t.framer.flushWrite() | |
1273 | case *ping: | |
1274 | if !i.ack { | |
1275 | t.bdpEst.timesnap(i.data) | |
1276 | } | |
1277 | t.framer.writePing(true, i.ack, i.data) | |
1278 | default: | |
1279 | errorf("transport: http2Client.controller got unexpected item type %v\n", i) | |
1280 | } | |
1281 | t.writableChan <- 0 | |
1282 | continue | |
1283 | case <-t.shutdownChan: | |
1284 | return | |
1285 | } | |
1286 | case <-t.shutdownChan: | |
1287 | return | |
1288 | } | |
1289 | } | |
1290 | } | |
1291 | ||
1292 | // keepalive running in a separate goroutune makes sure the connection is alive by sending pings. | |
1293 | func (t *http2Client) keepalive() { | |
1294 | p := &ping{data: [8]byte{}} | |
1295 | timer := time.NewTimer(t.kp.Time) | |
1296 | for { | |
1297 | select { | |
1298 | case <-timer.C: | |
1299 | if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { | |
1300 | timer.Reset(t.kp.Time) | |
1301 | continue | |
1302 | } | |
1303 | // Check if keepalive should go dormant. | |
1304 | t.mu.Lock() | |
1305 | if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream { | |
1306 | // Make awakenKeepalive writable. | |
1307 | <-t.awakenKeepalive | |
1308 | t.mu.Unlock() | |
1309 | select { | |
1310 | case <-t.awakenKeepalive: | |
1311 | // If the control gets here a ping has been sent | |
1312 | // need to reset the timer with keepalive.Timeout. | |
1313 | case <-t.shutdownChan: | |
1314 | return | |
1315 | } | |
1316 | } else { | |
1317 | t.mu.Unlock() | |
1318 | // Send ping. | |
1319 | t.controlBuf.put(p) | |
1320 | } | |
1321 | ||
1322 | // By the time control gets here a ping has been sent one way or the other. | |
1323 | timer.Reset(t.kp.Timeout) | |
1324 | select { | |
1325 | case <-timer.C: | |
1326 | if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { | |
1327 | timer.Reset(t.kp.Time) | |
1328 | continue | |
1329 | } | |
1330 | t.Close() | |
1331 | return | |
1332 | case <-t.shutdownChan: | |
1333 | if !timer.Stop() { | |
1334 | <-timer.C | |
1335 | } | |
1336 | return | |
1337 | } | |
1338 | case <-t.shutdownChan: | |
1339 | if !timer.Stop() { | |
1340 | <-timer.C | |
1341 | } | |
1342 | return | |
1343 | } | |
1344 | } | |
1345 | } | |
1346 | ||
1347 | func (t *http2Client) Error() <-chan struct{} { | |
1348 | return t.errorChan | |
1349 | } | |
1350 | ||
1351 | func (t *http2Client) GoAway() <-chan struct{} { | |
1352 | return t.goAway | |
1353 | } | |
1354 | ||
1355 | func (t *http2Client) notifyError(err error) { | |
1356 | t.mu.Lock() | |
1357 | // make sure t.errorChan is closed only once. | |
1358 | if t.state == draining { | |
1359 | t.mu.Unlock() | |
1360 | t.Close() | |
1361 | return | |
1362 | } | |
1363 | if t.state == reachable { | |
1364 | t.state = unreachable | |
1365 | close(t.errorChan) | |
1366 | infof("transport: http2Client.notifyError got notified that the client transport was broken %v.", err) | |
1367 | } | |
1368 | t.mu.Unlock() | |
1369 | } |