]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | // Copyright 2015 The Go Authors. All rights reserved. |
2 | // Use of this source code is governed by a BSD-style | |
3 | // license that can be found in the LICENSE file. | |
4 | ||
5 | // Transport code. | |
6 | ||
7 | package http2 | |
8 | ||
9 | import ( | |
10 | "bufio" | |
11 | "bytes" | |
12 | "compress/gzip" | |
13 | "crypto/rand" | |
14 | "crypto/tls" | |
15 | "errors" | |
16 | "fmt" | |
17 | "io" | |
18 | "io/ioutil" | |
19 | "log" | |
20 | "math" | |
21 | mathrand "math/rand" | |
22 | "net" | |
23 | "net/http" | |
24 | "sort" | |
25 | "strconv" | |
26 | "strings" | |
27 | "sync" | |
28 | "time" | |
29 | ||
30 | "golang.org/x/net/http2/hpack" | |
31 | "golang.org/x/net/idna" | |
32 | "golang.org/x/net/lex/httplex" | |
33 | ) | |
34 | ||
35 | const ( | |
36 | // transportDefaultConnFlow is how many connection-level flow control | |
37 | // tokens we give the server at start-up, past the default 64k. | |
38 | transportDefaultConnFlow = 1 << 30 | |
39 | ||
40 | // transportDefaultStreamFlow is how many stream-level flow | |
41 | // control tokens we announce to the peer, and how many bytes | |
42 | // we buffer per stream. | |
43 | transportDefaultStreamFlow = 4 << 20 | |
44 | ||
45 | // transportDefaultStreamMinRefresh is the minimum number of bytes we'll send | |
46 | // a stream-level WINDOW_UPDATE for at a time. | |
47 | transportDefaultStreamMinRefresh = 4 << 10 | |
48 | ||
49 | defaultUserAgent = "Go-http-client/2.0" | |
50 | ) | |
51 | ||
52 | // Transport is an HTTP/2 Transport. | |
53 | // | |
54 | // A Transport internally caches connections to servers. It is safe | |
55 | // for concurrent use by multiple goroutines. | |
56 | type Transport struct { | |
57 | // DialTLS specifies an optional dial function for creating | |
58 | // TLS connections for requests. | |
59 | // | |
60 | // If DialTLS is nil, tls.Dial is used. | |
61 | // | |
62 | // If the returned net.Conn has a ConnectionState method like tls.Conn, | |
63 | // it will be used to set http.Response.TLS. | |
64 | DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error) | |
65 | ||
66 | // TLSClientConfig specifies the TLS configuration to use with | |
67 | // tls.Client. If nil, the default configuration is used. | |
68 | TLSClientConfig *tls.Config | |
69 | ||
70 | // ConnPool optionally specifies an alternate connection pool to use. | |
71 | // If nil, the default is used. | |
72 | ConnPool ClientConnPool | |
73 | ||
74 | // DisableCompression, if true, prevents the Transport from | |
75 | // requesting compression with an "Accept-Encoding: gzip" | |
76 | // request header when the Request contains no existing | |
77 | // Accept-Encoding value. If the Transport requests gzip on | |
78 | // its own and gets a gzipped response, it's transparently | |
79 | // decoded in the Response.Body. However, if the user | |
80 | // explicitly requested gzip it is not automatically | |
81 | // uncompressed. | |
82 | DisableCompression bool | |
83 | ||
84 | // AllowHTTP, if true, permits HTTP/2 requests using the insecure, | |
85 | // plain-text "http" scheme. Note that this does not enable h2c support. | |
86 | AllowHTTP bool | |
87 | ||
88 | // MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to | |
89 | // send in the initial settings frame. It is how many bytes | |
90 | // of response headers are allowed. Unlike the http2 spec, zero here | |
91 | // means to use a default limit (currently 10MB). If you actually | |
92 | // want to advertise an ulimited value to the peer, Transport | |
93 | // interprets the highest possible value here (0xffffffff or 1<<32-1) | |
94 | // to mean no limit. | |
95 | MaxHeaderListSize uint32 | |
96 | ||
97 | // t1, if non-nil, is the standard library Transport using | |
98 | // this transport. Its settings are used (but not its | |
99 | // RoundTrip method, etc). | |
100 | t1 *http.Transport | |
101 | ||
102 | connPoolOnce sync.Once | |
103 | connPoolOrDef ClientConnPool // non-nil version of ConnPool | |
104 | } | |
105 | ||
106 | func (t *Transport) maxHeaderListSize() uint32 { | |
107 | if t.MaxHeaderListSize == 0 { | |
108 | return 10 << 20 | |
109 | } | |
110 | if t.MaxHeaderListSize == 0xffffffff { | |
111 | return 0 | |
112 | } | |
113 | return t.MaxHeaderListSize | |
114 | } | |
115 | ||
116 | func (t *Transport) disableCompression() bool { | |
117 | return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression) | |
118 | } | |
119 | ||
120 | var errTransportVersion = errors.New("http2: ConfigureTransport is only supported starting at Go 1.6") | |
121 | ||
122 | // ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2. | |
123 | // It requires Go 1.6 or later and returns an error if the net/http package is too old | |
124 | // or if t1 has already been HTTP/2-enabled. | |
125 | func ConfigureTransport(t1 *http.Transport) error { | |
126 | _, err := configureTransport(t1) // in configure_transport.go (go1.6) or not_go16.go | |
127 | return err | |
128 | } | |
129 | ||
130 | func (t *Transport) connPool() ClientConnPool { | |
131 | t.connPoolOnce.Do(t.initConnPool) | |
132 | return t.connPoolOrDef | |
133 | } | |
134 | ||
135 | func (t *Transport) initConnPool() { | |
136 | if t.ConnPool != nil { | |
137 | t.connPoolOrDef = t.ConnPool | |
138 | } else { | |
139 | t.connPoolOrDef = &clientConnPool{t: t} | |
140 | } | |
141 | } | |
142 | ||
143 | // ClientConn is the state of a single HTTP/2 client connection to an | |
144 | // HTTP/2 server. | |
145 | type ClientConn struct { | |
146 | t *Transport | |
147 | tconn net.Conn // usually *tls.Conn, except specialized impls | |
148 | tlsState *tls.ConnectionState // nil only for specialized impls | |
149 | singleUse bool // whether being used for a single http.Request | |
150 | ||
151 | // readLoop goroutine fields: | |
152 | readerDone chan struct{} // closed on error | |
153 | readerErr error // set before readerDone is closed | |
154 | ||
155 | idleTimeout time.Duration // or 0 for never | |
156 | idleTimer *time.Timer | |
157 | ||
158 | mu sync.Mutex // guards following | |
159 | cond *sync.Cond // hold mu; broadcast on flow/closed changes | |
160 | flow flow // our conn-level flow control quota (cs.flow is per stream) | |
161 | inflow flow // peer's conn-level flow control | |
162 | closed bool | |
163 | wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back | |
164 | goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received | |
165 | goAwayDebug string // goAway frame's debug data, retained as a string | |
166 | streams map[uint32]*clientStream // client-initiated | |
167 | nextStreamID uint32 | |
168 | pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams | |
169 | pings map[[8]byte]chan struct{} // in flight ping data to notification channel | |
170 | bw *bufio.Writer | |
171 | br *bufio.Reader | |
172 | fr *Framer | |
173 | lastActive time.Time | |
174 | // Settings from peer: (also guarded by mu) | |
175 | maxFrameSize uint32 | |
176 | maxConcurrentStreams uint32 | |
177 | peerMaxHeaderListSize uint64 | |
178 | initialWindowSize uint32 | |
179 | ||
180 | hbuf bytes.Buffer // HPACK encoder writes into this | |
181 | henc *hpack.Encoder | |
182 | freeBuf [][]byte | |
183 | ||
184 | wmu sync.Mutex // held while writing; acquire AFTER mu if holding both | |
185 | werr error // first write error that has occurred | |
186 | } | |
187 | ||
188 | // clientStream is the state for a single HTTP/2 stream. One of these | |
189 | // is created for each Transport.RoundTrip call. | |
190 | type clientStream struct { | |
191 | cc *ClientConn | |
192 | req *http.Request | |
193 | trace *clientTrace // or nil | |
194 | ID uint32 | |
195 | resc chan resAndError | |
196 | bufPipe pipe // buffered pipe with the flow-controlled response payload | |
197 | startedWrite bool // started request body write; guarded by cc.mu | |
198 | requestedGzip bool | |
199 | on100 func() // optional code to run if get a 100 continue response | |
200 | ||
201 | flow flow // guarded by cc.mu | |
202 | inflow flow // guarded by cc.mu | |
203 | bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read | |
204 | readErr error // sticky read error; owned by transportResponseBody.Read | |
205 | stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu | |
206 | didReset bool // whether we sent a RST_STREAM to the server; guarded by cc.mu | |
207 | ||
208 | peerReset chan struct{} // closed on peer reset | |
209 | resetErr error // populated before peerReset is closed | |
210 | ||
211 | done chan struct{} // closed when stream remove from cc.streams map; close calls guarded by cc.mu | |
212 | ||
213 | // owned by clientConnReadLoop: | |
214 | firstByte bool // got the first response byte | |
215 | pastHeaders bool // got first MetaHeadersFrame (actual headers) | |
216 | pastTrailers bool // got optional second MetaHeadersFrame (trailers) | |
217 | ||
218 | trailer http.Header // accumulated trailers | |
219 | resTrailer *http.Header // client's Response.Trailer | |
220 | } | |
221 | ||
222 | // awaitRequestCancel waits for the user to cancel a request or for the done | |
223 | // channel to be signaled. A non-nil error is returned only if the request was | |
224 | // canceled. | |
225 | func awaitRequestCancel(req *http.Request, done <-chan struct{}) error { | |
226 | ctx := reqContext(req) | |
227 | if req.Cancel == nil && ctx.Done() == nil { | |
228 | return nil | |
229 | } | |
230 | select { | |
231 | case <-req.Cancel: | |
232 | return errRequestCanceled | |
233 | case <-ctx.Done(): | |
234 | return ctx.Err() | |
235 | case <-done: | |
236 | return nil | |
237 | } | |
238 | } | |
239 | ||
240 | // awaitRequestCancel waits for the user to cancel a request, its context to | |
241 | // expire, or for the request to be done (any way it might be removed from the | |
242 | // cc.streams map: peer reset, successful completion, TCP connection breakage, | |
243 | // etc). If the request is canceled, then cs will be canceled and closed. | |
244 | func (cs *clientStream) awaitRequestCancel(req *http.Request) { | |
245 | if err := awaitRequestCancel(req, cs.done); err != nil { | |
246 | cs.cancelStream() | |
247 | cs.bufPipe.CloseWithError(err) | |
248 | } | |
249 | } | |
250 | ||
251 | func (cs *clientStream) cancelStream() { | |
252 | cc := cs.cc | |
253 | cc.mu.Lock() | |
254 | didReset := cs.didReset | |
255 | cs.didReset = true | |
256 | cc.mu.Unlock() | |
257 | ||
258 | if !didReset { | |
259 | cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | |
260 | cc.forgetStreamID(cs.ID) | |
261 | } | |
262 | } | |
263 | ||
264 | // checkResetOrDone reports any error sent in a RST_STREAM frame by the | |
265 | // server, or errStreamClosed if the stream is complete. | |
266 | func (cs *clientStream) checkResetOrDone() error { | |
267 | select { | |
268 | case <-cs.peerReset: | |
269 | return cs.resetErr | |
270 | case <-cs.done: | |
271 | return errStreamClosed | |
272 | default: | |
273 | return nil | |
274 | } | |
275 | } | |
276 | ||
277 | func (cs *clientStream) abortRequestBodyWrite(err error) { | |
278 | if err == nil { | |
279 | panic("nil error") | |
280 | } | |
281 | cc := cs.cc | |
282 | cc.mu.Lock() | |
283 | cs.stopReqBody = err | |
284 | cc.cond.Broadcast() | |
285 | cc.mu.Unlock() | |
286 | } | |
287 | ||
288 | type stickyErrWriter struct { | |
289 | w io.Writer | |
290 | err *error | |
291 | } | |
292 | ||
293 | func (sew stickyErrWriter) Write(p []byte) (n int, err error) { | |
294 | if *sew.err != nil { | |
295 | return 0, *sew.err | |
296 | } | |
297 | n, err = sew.w.Write(p) | |
298 | *sew.err = err | |
299 | return | |
300 | } | |
301 | ||
302 | var ErrNoCachedConn = errors.New("http2: no cached connection was available") | |
303 | ||
304 | // RoundTripOpt are options for the Transport.RoundTripOpt method. | |
305 | type RoundTripOpt struct { | |
306 | // OnlyCachedConn controls whether RoundTripOpt may | |
307 | // create a new TCP connection. If set true and | |
308 | // no cached connection is available, RoundTripOpt | |
309 | // will return ErrNoCachedConn. | |
310 | OnlyCachedConn bool | |
311 | } | |
312 | ||
313 | func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) { | |
314 | return t.RoundTripOpt(req, RoundTripOpt{}) | |
315 | } | |
316 | ||
317 | // authorityAddr returns a given authority (a host/IP, or host:port / ip:port) | |
318 | // and returns a host:port. The port 443 is added if needed. | |
319 | func authorityAddr(scheme string, authority string) (addr string) { | |
320 | host, port, err := net.SplitHostPort(authority) | |
321 | if err != nil { // authority didn't have a port | |
322 | port = "443" | |
323 | if scheme == "http" { | |
324 | port = "80" | |
325 | } | |
326 | host = authority | |
327 | } | |
328 | if a, err := idna.ToASCII(host); err == nil { | |
329 | host = a | |
330 | } | |
331 | // IPv6 address literal, without a port: | |
332 | if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") { | |
333 | return host + ":" + port | |
334 | } | |
335 | return net.JoinHostPort(host, port) | |
336 | } | |
337 | ||
338 | // RoundTripOpt is like RoundTrip, but takes options. | |
339 | func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) { | |
340 | if !(req.URL.Scheme == "https" || (req.URL.Scheme == "http" && t.AllowHTTP)) { | |
341 | return nil, errors.New("http2: unsupported scheme") | |
342 | } | |
343 | ||
344 | addr := authorityAddr(req.URL.Scheme, req.URL.Host) | |
345 | for retry := 0; ; retry++ { | |
346 | cc, err := t.connPool().GetClientConn(req, addr) | |
347 | if err != nil { | |
348 | t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err) | |
349 | return nil, err | |
350 | } | |
351 | traceGotConn(req, cc) | |
352 | res, err := cc.RoundTrip(req) | |
353 | if err != nil && retry <= 6 { | |
354 | afterBodyWrite := false | |
355 | if e, ok := err.(afterReqBodyWriteError); ok { | |
356 | err = e | |
357 | afterBodyWrite = true | |
358 | } | |
359 | if req, err = shouldRetryRequest(req, err, afterBodyWrite); err == nil { | |
360 | // After the first retry, do exponential backoff with 10% jitter. | |
361 | if retry == 0 { | |
362 | continue | |
363 | } | |
364 | backoff := float64(uint(1) << (uint(retry) - 1)) | |
365 | backoff += backoff * (0.1 * mathrand.Float64()) | |
366 | select { | |
367 | case <-time.After(time.Second * time.Duration(backoff)): | |
368 | continue | |
369 | case <-reqContext(req).Done(): | |
370 | return nil, reqContext(req).Err() | |
371 | } | |
372 | } | |
373 | } | |
374 | if err != nil { | |
375 | t.vlogf("RoundTrip failure: %v", err) | |
376 | return nil, err | |
377 | } | |
378 | return res, nil | |
379 | } | |
380 | } | |
381 | ||
382 | // CloseIdleConnections closes any connections which were previously | |
383 | // connected from previous requests but are now sitting idle. | |
384 | // It does not interrupt any connections currently in use. | |
385 | func (t *Transport) CloseIdleConnections() { | |
386 | if cp, ok := t.connPool().(clientConnPoolIdleCloser); ok { | |
387 | cp.closeIdleConnections() | |
388 | } | |
389 | } | |
390 | ||
391 | var ( | |
392 | errClientConnClosed = errors.New("http2: client conn is closed") | |
393 | errClientConnUnusable = errors.New("http2: client conn not usable") | |
394 | errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY") | |
395 | ) | |
396 | ||
397 | // afterReqBodyWriteError is a wrapper around errors returned by ClientConn.RoundTrip. | |
398 | // It is used to signal that err happened after part of Request.Body was sent to the server. | |
399 | type afterReqBodyWriteError struct { | |
400 | err error | |
401 | } | |
402 | ||
403 | func (e afterReqBodyWriteError) Error() string { | |
404 | return e.err.Error() + "; some request body already written" | |
405 | } | |
406 | ||
407 | // shouldRetryRequest is called by RoundTrip when a request fails to get | |
408 | // response headers. It is always called with a non-nil error. | |
409 | // It returns either a request to retry (either the same request, or a | |
410 | // modified clone), or an error if the request can't be replayed. | |
411 | func shouldRetryRequest(req *http.Request, err error, afterBodyWrite bool) (*http.Request, error) { | |
412 | if !canRetryError(err) { | |
413 | return nil, err | |
414 | } | |
415 | if !afterBodyWrite { | |
416 | return req, nil | |
417 | } | |
418 | // If the Body is nil (or http.NoBody), it's safe to reuse | |
419 | // this request and its Body. | |
420 | if req.Body == nil || reqBodyIsNoBody(req.Body) { | |
421 | return req, nil | |
422 | } | |
423 | // Otherwise we depend on the Request having its GetBody | |
424 | // func defined. | |
425 | getBody := reqGetBody(req) // Go 1.8: getBody = req.GetBody | |
426 | if getBody == nil { | |
427 | return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err) | |
428 | } | |
429 | body, err := getBody() | |
430 | if err != nil { | |
431 | return nil, err | |
432 | } | |
433 | newReq := *req | |
434 | newReq.Body = body | |
435 | return &newReq, nil | |
436 | } | |
437 | ||
438 | func canRetryError(err error) bool { | |
439 | if err == errClientConnUnusable || err == errClientConnGotGoAway { | |
440 | return true | |
441 | } | |
442 | if se, ok := err.(StreamError); ok { | |
443 | return se.Code == ErrCodeRefusedStream | |
444 | } | |
445 | return false | |
446 | } | |
447 | ||
448 | func (t *Transport) dialClientConn(addr string, singleUse bool) (*ClientConn, error) { | |
449 | host, _, err := net.SplitHostPort(addr) | |
450 | if err != nil { | |
451 | return nil, err | |
452 | } | |
453 | tconn, err := t.dialTLS()("tcp", addr, t.newTLSConfig(host)) | |
454 | if err != nil { | |
455 | return nil, err | |
456 | } | |
457 | return t.newClientConn(tconn, singleUse) | |
458 | } | |
459 | ||
460 | func (t *Transport) newTLSConfig(host string) *tls.Config { | |
461 | cfg := new(tls.Config) | |
462 | if t.TLSClientConfig != nil { | |
463 | *cfg = *cloneTLSConfig(t.TLSClientConfig) | |
464 | } | |
465 | if !strSliceContains(cfg.NextProtos, NextProtoTLS) { | |
466 | cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...) | |
467 | } | |
468 | if cfg.ServerName == "" { | |
469 | cfg.ServerName = host | |
470 | } | |
471 | return cfg | |
472 | } | |
473 | ||
474 | func (t *Transport) dialTLS() func(string, string, *tls.Config) (net.Conn, error) { | |
475 | if t.DialTLS != nil { | |
476 | return t.DialTLS | |
477 | } | |
478 | return t.dialTLSDefault | |
479 | } | |
480 | ||
481 | func (t *Transport) dialTLSDefault(network, addr string, cfg *tls.Config) (net.Conn, error) { | |
482 | cn, err := tls.Dial(network, addr, cfg) | |
483 | if err != nil { | |
484 | return nil, err | |
485 | } | |
486 | if err := cn.Handshake(); err != nil { | |
487 | return nil, err | |
488 | } | |
489 | if !cfg.InsecureSkipVerify { | |
490 | if err := cn.VerifyHostname(cfg.ServerName); err != nil { | |
491 | return nil, err | |
492 | } | |
493 | } | |
494 | state := cn.ConnectionState() | |
495 | if p := state.NegotiatedProtocol; p != NextProtoTLS { | |
496 | return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS) | |
497 | } | |
498 | if !state.NegotiatedProtocolIsMutual { | |
499 | return nil, errors.New("http2: could not negotiate protocol mutually") | |
500 | } | |
501 | return cn, nil | |
502 | } | |
503 | ||
504 | // disableKeepAlives reports whether connections should be closed as | |
505 | // soon as possible after handling the first request. | |
506 | func (t *Transport) disableKeepAlives() bool { | |
507 | return t.t1 != nil && t.t1.DisableKeepAlives | |
508 | } | |
509 | ||
510 | func (t *Transport) expectContinueTimeout() time.Duration { | |
511 | if t.t1 == nil { | |
512 | return 0 | |
513 | } | |
514 | return transportExpectContinueTimeout(t.t1) | |
515 | } | |
516 | ||
517 | func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) { | |
518 | return t.newClientConn(c, false) | |
519 | } | |
520 | ||
521 | func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) { | |
522 | cc := &ClientConn{ | |
523 | t: t, | |
524 | tconn: c, | |
525 | readerDone: make(chan struct{}), | |
526 | nextStreamID: 1, | |
527 | maxFrameSize: 16 << 10, // spec default | |
528 | initialWindowSize: 65535, // spec default | |
529 | maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough. | |
530 | peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead. | |
531 | streams: make(map[uint32]*clientStream), | |
532 | singleUse: singleUse, | |
533 | wantSettingsAck: true, | |
534 | pings: make(map[[8]byte]chan struct{}), | |
535 | } | |
536 | if d := t.idleConnTimeout(); d != 0 { | |
537 | cc.idleTimeout = d | |
538 | cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout) | |
539 | } | |
540 | if VerboseLogs { | |
541 | t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr()) | |
542 | } | |
543 | ||
544 | cc.cond = sync.NewCond(&cc.mu) | |
545 | cc.flow.add(int32(initialWindowSize)) | |
546 | ||
547 | // TODO: adjust this writer size to account for frame size + | |
548 | // MTU + crypto/tls record padding. | |
549 | cc.bw = bufio.NewWriter(stickyErrWriter{c, &cc.werr}) | |
550 | cc.br = bufio.NewReader(c) | |
551 | cc.fr = NewFramer(cc.bw, cc.br) | |
552 | cc.fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil) | |
553 | cc.fr.MaxHeaderListSize = t.maxHeaderListSize() | |
554 | ||
555 | // TODO: SetMaxDynamicTableSize, SetMaxDynamicTableSizeLimit on | |
556 | // henc in response to SETTINGS frames? | |
557 | cc.henc = hpack.NewEncoder(&cc.hbuf) | |
558 | ||
559 | if cs, ok := c.(connectionStater); ok { | |
560 | state := cs.ConnectionState() | |
561 | cc.tlsState = &state | |
562 | } | |
563 | ||
564 | initialSettings := []Setting{ | |
565 | {ID: SettingEnablePush, Val: 0}, | |
566 | {ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow}, | |
567 | } | |
568 | if max := t.maxHeaderListSize(); max != 0 { | |
569 | initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max}) | |
570 | } | |
571 | ||
572 | cc.bw.Write(clientPreface) | |
573 | cc.fr.WriteSettings(initialSettings...) | |
574 | cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow) | |
575 | cc.inflow.add(transportDefaultConnFlow + initialWindowSize) | |
576 | cc.bw.Flush() | |
577 | if cc.werr != nil { | |
578 | return nil, cc.werr | |
579 | } | |
580 | ||
581 | go cc.readLoop() | |
582 | return cc, nil | |
583 | } | |
584 | ||
585 | func (cc *ClientConn) setGoAway(f *GoAwayFrame) { | |
586 | cc.mu.Lock() | |
587 | defer cc.mu.Unlock() | |
588 | ||
589 | old := cc.goAway | |
590 | cc.goAway = f | |
591 | ||
592 | // Merge the previous and current GoAway error frames. | |
593 | if cc.goAwayDebug == "" { | |
594 | cc.goAwayDebug = string(f.DebugData()) | |
595 | } | |
596 | if old != nil && old.ErrCode != ErrCodeNo { | |
597 | cc.goAway.ErrCode = old.ErrCode | |
598 | } | |
599 | last := f.LastStreamID | |
600 | for streamID, cs := range cc.streams { | |
601 | if streamID > last { | |
602 | select { | |
603 | case cs.resc <- resAndError{err: errClientConnGotGoAway}: | |
604 | default: | |
605 | } | |
606 | } | |
607 | } | |
608 | } | |
609 | ||
610 | // CanTakeNewRequest reports whether the connection can take a new request, | |
611 | // meaning it has not been closed or received or sent a GOAWAY. | |
612 | func (cc *ClientConn) CanTakeNewRequest() bool { | |
613 | cc.mu.Lock() | |
614 | defer cc.mu.Unlock() | |
615 | return cc.canTakeNewRequestLocked() | |
616 | } | |
617 | ||
618 | func (cc *ClientConn) canTakeNewRequestLocked() bool { | |
619 | if cc.singleUse && cc.nextStreamID > 1 { | |
620 | return false | |
621 | } | |
622 | return cc.goAway == nil && !cc.closed && | |
623 | int64(cc.nextStreamID)+int64(cc.pendingRequests) < math.MaxInt32 | |
624 | } | |
625 | ||
626 | // onIdleTimeout is called from a time.AfterFunc goroutine. It will | |
627 | // only be called when we're idle, but because we're coming from a new | |
628 | // goroutine, there could be a new request coming in at the same time, | |
629 | // so this simply calls the synchronized closeIfIdle to shut down this | |
630 | // connection. The timer could just call closeIfIdle, but this is more | |
631 | // clear. | |
632 | func (cc *ClientConn) onIdleTimeout() { | |
633 | cc.closeIfIdle() | |
634 | } | |
635 | ||
636 | func (cc *ClientConn) closeIfIdle() { | |
637 | cc.mu.Lock() | |
638 | if len(cc.streams) > 0 { | |
639 | cc.mu.Unlock() | |
640 | return | |
641 | } | |
642 | cc.closed = true | |
643 | nextID := cc.nextStreamID | |
644 | // TODO: do clients send GOAWAY too? maybe? Just Close: | |
645 | cc.mu.Unlock() | |
646 | ||
647 | if VerboseLogs { | |
648 | cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2) | |
649 | } | |
650 | cc.tconn.Close() | |
651 | } | |
652 | ||
653 | const maxAllocFrameSize = 512 << 10 | |
654 | ||
655 | // frameBuffer returns a scratch buffer suitable for writing DATA frames. | |
656 | // They're capped at the min of the peer's max frame size or 512KB | |
657 | // (kinda arbitrarily), but definitely capped so we don't allocate 4GB | |
658 | // bufers. | |
659 | func (cc *ClientConn) frameScratchBuffer() []byte { | |
660 | cc.mu.Lock() | |
661 | size := cc.maxFrameSize | |
662 | if size > maxAllocFrameSize { | |
663 | size = maxAllocFrameSize | |
664 | } | |
665 | for i, buf := range cc.freeBuf { | |
666 | if len(buf) >= int(size) { | |
667 | cc.freeBuf[i] = nil | |
668 | cc.mu.Unlock() | |
669 | return buf[:size] | |
670 | } | |
671 | } | |
672 | cc.mu.Unlock() | |
673 | return make([]byte, size) | |
674 | } | |
675 | ||
676 | func (cc *ClientConn) putFrameScratchBuffer(buf []byte) { | |
677 | cc.mu.Lock() | |
678 | defer cc.mu.Unlock() | |
679 | const maxBufs = 4 // arbitrary; 4 concurrent requests per conn? investigate. | |
680 | if len(cc.freeBuf) < maxBufs { | |
681 | cc.freeBuf = append(cc.freeBuf, buf) | |
682 | return | |
683 | } | |
684 | for i, old := range cc.freeBuf { | |
685 | if old == nil { | |
686 | cc.freeBuf[i] = buf | |
687 | return | |
688 | } | |
689 | } | |
690 | // forget about it. | |
691 | } | |
692 | ||
693 | // errRequestCanceled is a copy of net/http's errRequestCanceled because it's not | |
694 | // exported. At least they'll be DeepEqual for h1-vs-h2 comparisons tests. | |
695 | var errRequestCanceled = errors.New("net/http: request canceled") | |
696 | ||
697 | func commaSeparatedTrailers(req *http.Request) (string, error) { | |
698 | keys := make([]string, 0, len(req.Trailer)) | |
699 | for k := range req.Trailer { | |
700 | k = http.CanonicalHeaderKey(k) | |
701 | switch k { | |
702 | case "Transfer-Encoding", "Trailer", "Content-Length": | |
703 | return "", &badStringError{"invalid Trailer key", k} | |
704 | } | |
705 | keys = append(keys, k) | |
706 | } | |
707 | if len(keys) > 0 { | |
708 | sort.Strings(keys) | |
709 | return strings.Join(keys, ","), nil | |
710 | } | |
711 | return "", nil | |
712 | } | |
713 | ||
714 | func (cc *ClientConn) responseHeaderTimeout() time.Duration { | |
715 | if cc.t.t1 != nil { | |
716 | return cc.t.t1.ResponseHeaderTimeout | |
717 | } | |
718 | // No way to do this (yet?) with just an http2.Transport. Probably | |
719 | // no need. Request.Cancel this is the new way. We only need to support | |
720 | // this for compatibility with the old http.Transport fields when | |
721 | // we're doing transparent http2. | |
722 | return 0 | |
723 | } | |
724 | ||
725 | // checkConnHeaders checks whether req has any invalid connection-level headers. | |
726 | // per RFC 7540 section 8.1.2.2: Connection-Specific Header Fields. | |
727 | // Certain headers are special-cased as okay but not transmitted later. | |
728 | func checkConnHeaders(req *http.Request) error { | |
729 | if v := req.Header.Get("Upgrade"); v != "" { | |
730 | return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"]) | |
731 | } | |
732 | if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") { | |
733 | return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv) | |
734 | } | |
735 | if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "close" && vv[0] != "keep-alive") { | |
736 | return fmt.Errorf("http2: invalid Connection request header: %q", vv) | |
737 | } | |
738 | return nil | |
739 | } | |
740 | ||
741 | // actualContentLength returns a sanitized version of | |
742 | // req.ContentLength, where 0 actually means zero (not unknown) and -1 | |
743 | // means unknown. | |
744 | func actualContentLength(req *http.Request) int64 { | |
745 | if req.Body == nil || reqBodyIsNoBody(req.Body) { | |
746 | return 0 | |
747 | } | |
748 | if req.ContentLength != 0 { | |
749 | return req.ContentLength | |
750 | } | |
751 | return -1 | |
752 | } | |
753 | ||
754 | func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) { | |
755 | if err := checkConnHeaders(req); err != nil { | |
756 | return nil, err | |
757 | } | |
758 | if cc.idleTimer != nil { | |
759 | cc.idleTimer.Stop() | |
760 | } | |
761 | ||
762 | trailers, err := commaSeparatedTrailers(req) | |
763 | if err != nil { | |
764 | return nil, err | |
765 | } | |
766 | hasTrailers := trailers != "" | |
767 | ||
768 | cc.mu.Lock() | |
769 | if err := cc.awaitOpenSlotForRequest(req); err != nil { | |
770 | cc.mu.Unlock() | |
771 | return nil, err | |
772 | } | |
773 | ||
774 | body := req.Body | |
775 | contentLen := actualContentLength(req) | |
776 | hasBody := contentLen != 0 | |
777 | ||
778 | // TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere? | |
779 | var requestedGzip bool | |
780 | if !cc.t.disableCompression() && | |
781 | req.Header.Get("Accept-Encoding") == "" && | |
782 | req.Header.Get("Range") == "" && | |
783 | req.Method != "HEAD" { | |
784 | // Request gzip only, not deflate. Deflate is ambiguous and | |
785 | // not as universally supported anyway. | |
786 | // See: http://www.gzip.org/zlib/zlib_faq.html#faq38 | |
787 | // | |
788 | // Note that we don't request this for HEAD requests, | |
789 | // due to a bug in nginx: | |
790 | // http://trac.nginx.org/nginx/ticket/358 | |
791 | // https://golang.org/issue/5522 | |
792 | // | |
793 | // We don't request gzip if the request is for a range, since | |
794 | // auto-decoding a portion of a gzipped document will just fail | |
795 | // anyway. See https://golang.org/issue/8923 | |
796 | requestedGzip = true | |
797 | } | |
798 | ||
799 | // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is | |
800 | // sent by writeRequestBody below, along with any Trailers, | |
801 | // again in form HEADERS{1}, CONTINUATION{0,}) | |
802 | hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen) | |
803 | if err != nil { | |
804 | cc.mu.Unlock() | |
805 | return nil, err | |
806 | } | |
807 | ||
808 | cs := cc.newStream() | |
809 | cs.req = req | |
810 | cs.trace = requestTrace(req) | |
811 | cs.requestedGzip = requestedGzip | |
812 | bodyWriter := cc.t.getBodyWriterState(cs, body) | |
813 | cs.on100 = bodyWriter.on100 | |
814 | ||
815 | cc.wmu.Lock() | |
816 | endStream := !hasBody && !hasTrailers | |
817 | werr := cc.writeHeaders(cs.ID, endStream, hdrs) | |
818 | cc.wmu.Unlock() | |
819 | traceWroteHeaders(cs.trace) | |
820 | cc.mu.Unlock() | |
821 | ||
822 | if werr != nil { | |
823 | if hasBody { | |
824 | req.Body.Close() // per RoundTripper contract | |
825 | bodyWriter.cancel() | |
826 | } | |
827 | cc.forgetStreamID(cs.ID) | |
828 | // Don't bother sending a RST_STREAM (our write already failed; | |
829 | // no need to keep writing) | |
830 | traceWroteRequest(cs.trace, werr) | |
831 | return nil, werr | |
832 | } | |
833 | ||
834 | var respHeaderTimer <-chan time.Time | |
835 | if hasBody { | |
836 | bodyWriter.scheduleBodyWrite() | |
837 | } else { | |
838 | traceWroteRequest(cs.trace, nil) | |
839 | if d := cc.responseHeaderTimeout(); d != 0 { | |
840 | timer := time.NewTimer(d) | |
841 | defer timer.Stop() | |
842 | respHeaderTimer = timer.C | |
843 | } | |
844 | } | |
845 | ||
846 | readLoopResCh := cs.resc | |
847 | bodyWritten := false | |
848 | ctx := reqContext(req) | |
849 | ||
850 | handleReadLoopResponse := func(re resAndError) (*http.Response, error) { | |
851 | res := re.res | |
852 | if re.err != nil || res.StatusCode > 299 { | |
853 | // On error or status code 3xx, 4xx, 5xx, etc abort any | |
854 | // ongoing write, assuming that the server doesn't care | |
855 | // about our request body. If the server replied with 1xx or | |
856 | // 2xx, however, then assume the server DOES potentially | |
857 | // want our body (e.g. full-duplex streaming: | |
858 | // golang.org/issue/13444). If it turns out the server | |
859 | // doesn't, they'll RST_STREAM us soon enough. This is a | |
860 | // heuristic to avoid adding knobs to Transport. Hopefully | |
861 | // we can keep it. | |
862 | bodyWriter.cancel() | |
863 | cs.abortRequestBodyWrite(errStopReqBodyWrite) | |
864 | } | |
865 | if re.err != nil { | |
866 | cc.mu.Lock() | |
867 | afterBodyWrite := cs.startedWrite | |
868 | cc.mu.Unlock() | |
869 | cc.forgetStreamID(cs.ID) | |
870 | if afterBodyWrite { | |
871 | return nil, afterReqBodyWriteError{re.err} | |
872 | } | |
873 | return nil, re.err | |
874 | } | |
875 | res.Request = req | |
876 | res.TLS = cc.tlsState | |
877 | return res, nil | |
878 | } | |
879 | ||
880 | for { | |
881 | select { | |
882 | case re := <-readLoopResCh: | |
883 | return handleReadLoopResponse(re) | |
884 | case <-respHeaderTimer: | |
885 | if !hasBody || bodyWritten { | |
886 | cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | |
887 | } else { | |
888 | bodyWriter.cancel() | |
889 | cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel) | |
890 | } | |
891 | cc.forgetStreamID(cs.ID) | |
892 | return nil, errTimeout | |
893 | case <-ctx.Done(): | |
894 | if !hasBody || bodyWritten { | |
895 | cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | |
896 | } else { | |
897 | bodyWriter.cancel() | |
898 | cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel) | |
899 | } | |
900 | cc.forgetStreamID(cs.ID) | |
901 | return nil, ctx.Err() | |
902 | case <-req.Cancel: | |
903 | if !hasBody || bodyWritten { | |
904 | cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | |
905 | } else { | |
906 | bodyWriter.cancel() | |
907 | cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel) | |
908 | } | |
909 | cc.forgetStreamID(cs.ID) | |
910 | return nil, errRequestCanceled | |
911 | case <-cs.peerReset: | |
912 | // processResetStream already removed the | |
913 | // stream from the streams map; no need for | |
914 | // forgetStreamID. | |
915 | return nil, cs.resetErr | |
916 | case err := <-bodyWriter.resc: | |
917 | // Prefer the read loop's response, if available. Issue 16102. | |
918 | select { | |
919 | case re := <-readLoopResCh: | |
920 | return handleReadLoopResponse(re) | |
921 | default: | |
922 | } | |
923 | if err != nil { | |
924 | return nil, err | |
925 | } | |
926 | bodyWritten = true | |
927 | if d := cc.responseHeaderTimeout(); d != 0 { | |
928 | timer := time.NewTimer(d) | |
929 | defer timer.Stop() | |
930 | respHeaderTimer = timer.C | |
931 | } | |
932 | } | |
933 | } | |
934 | } | |
935 | ||
936 | // awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams. | |
937 | // Must hold cc.mu. | |
938 | func (cc *ClientConn) awaitOpenSlotForRequest(req *http.Request) error { | |
939 | var waitingForConn chan struct{} | |
940 | var waitingForConnErr error // guarded by cc.mu | |
941 | for { | |
942 | cc.lastActive = time.Now() | |
943 | if cc.closed || !cc.canTakeNewRequestLocked() { | |
944 | return errClientConnUnusable | |
945 | } | |
946 | if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) { | |
947 | if waitingForConn != nil { | |
948 | close(waitingForConn) | |
949 | } | |
950 | return nil | |
951 | } | |
952 | // Unfortunately, we cannot wait on a condition variable and channel at | |
953 | // the same time, so instead, we spin up a goroutine to check if the | |
954 | // request is canceled while we wait for a slot to open in the connection. | |
955 | if waitingForConn == nil { | |
956 | waitingForConn = make(chan struct{}) | |
957 | go func() { | |
958 | if err := awaitRequestCancel(req, waitingForConn); err != nil { | |
959 | cc.mu.Lock() | |
960 | waitingForConnErr = err | |
961 | cc.cond.Broadcast() | |
962 | cc.mu.Unlock() | |
963 | } | |
964 | }() | |
965 | } | |
966 | cc.pendingRequests++ | |
967 | cc.cond.Wait() | |
968 | cc.pendingRequests-- | |
969 | if waitingForConnErr != nil { | |
970 | return waitingForConnErr | |
971 | } | |
972 | } | |
973 | } | |
974 | ||
975 | // requires cc.wmu be held | |
976 | func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error { | |
977 | first := true // first frame written (HEADERS is first, then CONTINUATION) | |
978 | frameSize := int(cc.maxFrameSize) | |
979 | for len(hdrs) > 0 && cc.werr == nil { | |
980 | chunk := hdrs | |
981 | if len(chunk) > frameSize { | |
982 | chunk = chunk[:frameSize] | |
983 | } | |
984 | hdrs = hdrs[len(chunk):] | |
985 | endHeaders := len(hdrs) == 0 | |
986 | if first { | |
987 | cc.fr.WriteHeaders(HeadersFrameParam{ | |
988 | StreamID: streamID, | |
989 | BlockFragment: chunk, | |
990 | EndStream: endStream, | |
991 | EndHeaders: endHeaders, | |
992 | }) | |
993 | first = false | |
994 | } else { | |
995 | cc.fr.WriteContinuation(streamID, endHeaders, chunk) | |
996 | } | |
997 | } | |
998 | // TODO(bradfitz): this Flush could potentially block (as | |
999 | // could the WriteHeaders call(s) above), which means they | |
1000 | // wouldn't respond to Request.Cancel being readable. That's | |
1001 | // rare, but this should probably be in a goroutine. | |
1002 | cc.bw.Flush() | |
1003 | return cc.werr | |
1004 | } | |
1005 | ||
1006 | // internal error values; they don't escape to callers | |
1007 | var ( | |
1008 | // abort request body write; don't send cancel | |
1009 | errStopReqBodyWrite = errors.New("http2: aborting request body write") | |
1010 | ||
1011 | // abort request body write, but send stream reset of cancel. | |
1012 | errStopReqBodyWriteAndCancel = errors.New("http2: canceling request") | |
1013 | ) | |
1014 | ||
1015 | func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) { | |
1016 | cc := cs.cc | |
1017 | sentEnd := false // whether we sent the final DATA frame w/ END_STREAM | |
1018 | buf := cc.frameScratchBuffer() | |
1019 | defer cc.putFrameScratchBuffer(buf) | |
1020 | ||
1021 | defer func() { | |
1022 | traceWroteRequest(cs.trace, err) | |
1023 | // TODO: write h12Compare test showing whether | |
1024 | // Request.Body is closed by the Transport, | |
1025 | // and in multiple cases: server replies <=299 and >299 | |
1026 | // while still writing request body | |
1027 | cerr := bodyCloser.Close() | |
1028 | if err == nil { | |
1029 | err = cerr | |
1030 | } | |
1031 | }() | |
1032 | ||
1033 | req := cs.req | |
1034 | hasTrailers := req.Trailer != nil | |
1035 | ||
1036 | var sawEOF bool | |
1037 | for !sawEOF { | |
1038 | n, err := body.Read(buf) | |
1039 | if err == io.EOF { | |
1040 | sawEOF = true | |
1041 | err = nil | |
1042 | } else if err != nil { | |
1043 | return err | |
1044 | } | |
1045 | ||
1046 | remain := buf[:n] | |
1047 | for len(remain) > 0 && err == nil { | |
1048 | var allowed int32 | |
1049 | allowed, err = cs.awaitFlowControl(len(remain)) | |
1050 | switch { | |
1051 | case err == errStopReqBodyWrite: | |
1052 | return err | |
1053 | case err == errStopReqBodyWriteAndCancel: | |
1054 | cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | |
1055 | return err | |
1056 | case err != nil: | |
1057 | return err | |
1058 | } | |
1059 | cc.wmu.Lock() | |
1060 | data := remain[:allowed] | |
1061 | remain = remain[allowed:] | |
1062 | sentEnd = sawEOF && len(remain) == 0 && !hasTrailers | |
1063 | err = cc.fr.WriteData(cs.ID, sentEnd, data) | |
1064 | if err == nil { | |
1065 | // TODO(bradfitz): this flush is for latency, not bandwidth. | |
1066 | // Most requests won't need this. Make this opt-in or | |
1067 | // opt-out? Use some heuristic on the body type? Nagel-like | |
1068 | // timers? Based on 'n'? Only last chunk of this for loop, | |
1069 | // unless flow control tokens are low? For now, always. | |
1070 | // If we change this, see comment below. | |
1071 | err = cc.bw.Flush() | |
1072 | } | |
1073 | cc.wmu.Unlock() | |
1074 | } | |
1075 | if err != nil { | |
1076 | return err | |
1077 | } | |
1078 | } | |
1079 | ||
1080 | if sentEnd { | |
1081 | // Already sent END_STREAM (which implies we have no | |
1082 | // trailers) and flushed, because currently all | |
1083 | // WriteData frames above get a flush. So we're done. | |
1084 | return nil | |
1085 | } | |
1086 | ||
1087 | var trls []byte | |
1088 | if hasTrailers { | |
1089 | cc.mu.Lock() | |
1090 | trls, err = cc.encodeTrailers(req) | |
1091 | cc.mu.Unlock() | |
1092 | if err != nil { | |
1093 | cc.writeStreamReset(cs.ID, ErrCodeInternal, err) | |
1094 | cc.forgetStreamID(cs.ID) | |
1095 | return err | |
1096 | } | |
1097 | } | |
1098 | ||
1099 | cc.wmu.Lock() | |
1100 | defer cc.wmu.Unlock() | |
1101 | ||
1102 | // Two ways to send END_STREAM: either with trailers, or | |
1103 | // with an empty DATA frame. | |
1104 | if len(trls) > 0 { | |
1105 | err = cc.writeHeaders(cs.ID, true, trls) | |
1106 | } else { | |
1107 | err = cc.fr.WriteData(cs.ID, true, nil) | |
1108 | } | |
1109 | if ferr := cc.bw.Flush(); ferr != nil && err == nil { | |
1110 | err = ferr | |
1111 | } | |
1112 | return err | |
1113 | } | |
1114 | ||
1115 | // awaitFlowControl waits for [1, min(maxBytes, cc.cs.maxFrameSize)] flow | |
1116 | // control tokens from the server. | |
1117 | // It returns either the non-zero number of tokens taken or an error | |
1118 | // if the stream is dead. | |
1119 | func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) { | |
1120 | cc := cs.cc | |
1121 | cc.mu.Lock() | |
1122 | defer cc.mu.Unlock() | |
1123 | for { | |
1124 | if cc.closed { | |
1125 | return 0, errClientConnClosed | |
1126 | } | |
1127 | if cs.stopReqBody != nil { | |
1128 | return 0, cs.stopReqBody | |
1129 | } | |
1130 | if err := cs.checkResetOrDone(); err != nil { | |
1131 | return 0, err | |
1132 | } | |
1133 | if a := cs.flow.available(); a > 0 { | |
1134 | take := a | |
1135 | if int(take) > maxBytes { | |
1136 | ||
1137 | take = int32(maxBytes) // can't truncate int; take is int32 | |
1138 | } | |
1139 | if take > int32(cc.maxFrameSize) { | |
1140 | take = int32(cc.maxFrameSize) | |
1141 | } | |
1142 | cs.flow.take(take) | |
1143 | return take, nil | |
1144 | } | |
1145 | cc.cond.Wait() | |
1146 | } | |
1147 | } | |
1148 | ||
1149 | type badStringError struct { | |
1150 | what string | |
1151 | str string | |
1152 | } | |
1153 | ||
1154 | func (e *badStringError) Error() string { return fmt.Sprintf("%s %q", e.what, e.str) } | |
1155 | ||
1156 | // requires cc.mu be held. | |
1157 | func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trailers string, contentLength int64) ([]byte, error) { | |
1158 | cc.hbuf.Reset() | |
1159 | ||
1160 | host := req.Host | |
1161 | if host == "" { | |
1162 | host = req.URL.Host | |
1163 | } | |
1164 | host, err := httplex.PunycodeHostPort(host) | |
1165 | if err != nil { | |
1166 | return nil, err | |
1167 | } | |
1168 | ||
1169 | var path string | |
1170 | if req.Method != "CONNECT" { | |
1171 | path = req.URL.RequestURI() | |
1172 | if !validPseudoPath(path) { | |
1173 | orig := path | |
1174 | path = strings.TrimPrefix(path, req.URL.Scheme+"://"+host) | |
1175 | if !validPseudoPath(path) { | |
1176 | if req.URL.Opaque != "" { | |
1177 | return nil, fmt.Errorf("invalid request :path %q from URL.Opaque = %q", orig, req.URL.Opaque) | |
1178 | } else { | |
1179 | return nil, fmt.Errorf("invalid request :path %q", orig) | |
1180 | } | |
1181 | } | |
1182 | } | |
1183 | } | |
1184 | ||
1185 | // Check for any invalid headers and return an error before we | |
1186 | // potentially pollute our hpack state. (We want to be able to | |
1187 | // continue to reuse the hpack encoder for future requests) | |
1188 | for k, vv := range req.Header { | |
1189 | if !httplex.ValidHeaderFieldName(k) { | |
1190 | return nil, fmt.Errorf("invalid HTTP header name %q", k) | |
1191 | } | |
1192 | for _, v := range vv { | |
1193 | if !httplex.ValidHeaderFieldValue(v) { | |
1194 | return nil, fmt.Errorf("invalid HTTP header value %q for header %q", v, k) | |
1195 | } | |
1196 | } | |
1197 | } | |
1198 | ||
1199 | enumerateHeaders := func(f func(name, value string)) { | |
1200 | // 8.1.2.3 Request Pseudo-Header Fields | |
1201 | // The :path pseudo-header field includes the path and query parts of the | |
1202 | // target URI (the path-absolute production and optionally a '?' character | |
1203 | // followed by the query production (see Sections 3.3 and 3.4 of | |
1204 | // [RFC3986]). | |
1205 | f(":authority", host) | |
1206 | f(":method", req.Method) | |
1207 | if req.Method != "CONNECT" { | |
1208 | f(":path", path) | |
1209 | f(":scheme", req.URL.Scheme) | |
1210 | } | |
1211 | if trailers != "" { | |
1212 | f("trailer", trailers) | |
1213 | } | |
1214 | ||
1215 | var didUA bool | |
1216 | for k, vv := range req.Header { | |
1217 | if strings.EqualFold(k, "host") || strings.EqualFold(k, "content-length") { | |
1218 | // Host is :authority, already sent. | |
1219 | // Content-Length is automatic, set below. | |
1220 | continue | |
1221 | } else if strings.EqualFold(k, "connection") || strings.EqualFold(k, "proxy-connection") || | |
1222 | strings.EqualFold(k, "transfer-encoding") || strings.EqualFold(k, "upgrade") || | |
1223 | strings.EqualFold(k, "keep-alive") { | |
1224 | // Per 8.1.2.2 Connection-Specific Header | |
1225 | // Fields, don't send connection-specific | |
1226 | // fields. We have already checked if any | |
1227 | // are error-worthy so just ignore the rest. | |
1228 | continue | |
1229 | } else if strings.EqualFold(k, "user-agent") { | |
1230 | // Match Go's http1 behavior: at most one | |
1231 | // User-Agent. If set to nil or empty string, | |
1232 | // then omit it. Otherwise if not mentioned, | |
1233 | // include the default (below). | |
1234 | didUA = true | |
1235 | if len(vv) < 1 { | |
1236 | continue | |
1237 | } | |
1238 | vv = vv[:1] | |
1239 | if vv[0] == "" { | |
1240 | continue | |
1241 | } | |
1242 | ||
1243 | } | |
1244 | ||
1245 | for _, v := range vv { | |
1246 | f(k, v) | |
1247 | } | |
1248 | } | |
1249 | if shouldSendReqContentLength(req.Method, contentLength) { | |
1250 | f("content-length", strconv.FormatInt(contentLength, 10)) | |
1251 | } | |
1252 | if addGzipHeader { | |
1253 | f("accept-encoding", "gzip") | |
1254 | } | |
1255 | if !didUA { | |
1256 | f("user-agent", defaultUserAgent) | |
1257 | } | |
1258 | } | |
1259 | ||
1260 | // Do a first pass over the headers counting bytes to ensure | |
1261 | // we don't exceed cc.peerMaxHeaderListSize. This is done as a | |
1262 | // separate pass before encoding the headers to prevent | |
1263 | // modifying the hpack state. | |
1264 | hlSize := uint64(0) | |
1265 | enumerateHeaders(func(name, value string) { | |
1266 | hf := hpack.HeaderField{Name: name, Value: value} | |
1267 | hlSize += uint64(hf.Size()) | |
1268 | }) | |
1269 | ||
1270 | if hlSize > cc.peerMaxHeaderListSize { | |
1271 | return nil, errRequestHeaderListSize | |
1272 | } | |
1273 | ||
1274 | // Header list size is ok. Write the headers. | |
1275 | enumerateHeaders(func(name, value string) { | |
1276 | cc.writeHeader(strings.ToLower(name), value) | |
1277 | }) | |
1278 | ||
1279 | return cc.hbuf.Bytes(), nil | |
1280 | } | |
1281 | ||
1282 | // shouldSendReqContentLength reports whether the http2.Transport should send | |
1283 | // a "content-length" request header. This logic is basically a copy of the net/http | |
1284 | // transferWriter.shouldSendContentLength. | |
1285 | // The contentLength is the corrected contentLength (so 0 means actually 0, not unknown). | |
1286 | // -1 means unknown. | |
1287 | func shouldSendReqContentLength(method string, contentLength int64) bool { | |
1288 | if contentLength > 0 { | |
1289 | return true | |
1290 | } | |
1291 | if contentLength < 0 { | |
1292 | return false | |
1293 | } | |
1294 | // For zero bodies, whether we send a content-length depends on the method. | |
1295 | // It also kinda doesn't matter for http2 either way, with END_STREAM. | |
1296 | switch method { | |
1297 | case "POST", "PUT", "PATCH": | |
1298 | return true | |
1299 | default: | |
1300 | return false | |
1301 | } | |
1302 | } | |
1303 | ||
1304 | // requires cc.mu be held. | |
1305 | func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) { | |
1306 | cc.hbuf.Reset() | |
1307 | ||
1308 | hlSize := uint64(0) | |
1309 | for k, vv := range req.Trailer { | |
1310 | for _, v := range vv { | |
1311 | hf := hpack.HeaderField{Name: k, Value: v} | |
1312 | hlSize += uint64(hf.Size()) | |
1313 | } | |
1314 | } | |
1315 | if hlSize > cc.peerMaxHeaderListSize { | |
1316 | return nil, errRequestHeaderListSize | |
1317 | } | |
1318 | ||
1319 | for k, vv := range req.Trailer { | |
1320 | // Transfer-Encoding, etc.. have already been filtered at the | |
1321 | // start of RoundTrip | |
1322 | lowKey := strings.ToLower(k) | |
1323 | for _, v := range vv { | |
1324 | cc.writeHeader(lowKey, v) | |
1325 | } | |
1326 | } | |
1327 | return cc.hbuf.Bytes(), nil | |
1328 | } | |
1329 | ||
1330 | func (cc *ClientConn) writeHeader(name, value string) { | |
1331 | if VerboseLogs { | |
1332 | log.Printf("http2: Transport encoding header %q = %q", name, value) | |
1333 | } | |
1334 | cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value}) | |
1335 | } | |
1336 | ||
1337 | type resAndError struct { | |
1338 | res *http.Response | |
1339 | err error | |
1340 | } | |
1341 | ||
1342 | // requires cc.mu be held. | |
1343 | func (cc *ClientConn) newStream() *clientStream { | |
1344 | cs := &clientStream{ | |
1345 | cc: cc, | |
1346 | ID: cc.nextStreamID, | |
1347 | resc: make(chan resAndError, 1), | |
1348 | peerReset: make(chan struct{}), | |
1349 | done: make(chan struct{}), | |
1350 | } | |
1351 | cs.flow.add(int32(cc.initialWindowSize)) | |
1352 | cs.flow.setConnFlow(&cc.flow) | |
1353 | cs.inflow.add(transportDefaultStreamFlow) | |
1354 | cs.inflow.setConnFlow(&cc.inflow) | |
1355 | cc.nextStreamID += 2 | |
1356 | cc.streams[cs.ID] = cs | |
1357 | return cs | |
1358 | } | |
1359 | ||
1360 | func (cc *ClientConn) forgetStreamID(id uint32) { | |
1361 | cc.streamByID(id, true) | |
1362 | } | |
1363 | ||
1364 | func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream { | |
1365 | cc.mu.Lock() | |
1366 | defer cc.mu.Unlock() | |
1367 | cs := cc.streams[id] | |
1368 | if andRemove && cs != nil && !cc.closed { | |
1369 | cc.lastActive = time.Now() | |
1370 | delete(cc.streams, id) | |
1371 | if len(cc.streams) == 0 && cc.idleTimer != nil { | |
1372 | cc.idleTimer.Reset(cc.idleTimeout) | |
1373 | } | |
1374 | close(cs.done) | |
1375 | // Wake up checkResetOrDone via clientStream.awaitFlowControl and | |
1376 | // wake up RoundTrip if there is a pending request. | |
1377 | cc.cond.Broadcast() | |
1378 | } | |
1379 | return cs | |
1380 | } | |
1381 | ||
1382 | // clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop. | |
1383 | type clientConnReadLoop struct { | |
1384 | cc *ClientConn | |
1385 | activeRes map[uint32]*clientStream // keyed by streamID | |
1386 | closeWhenIdle bool | |
1387 | } | |
1388 | ||
1389 | // readLoop runs in its own goroutine and reads and dispatches frames. | |
1390 | func (cc *ClientConn) readLoop() { | |
1391 | rl := &clientConnReadLoop{ | |
1392 | cc: cc, | |
1393 | activeRes: make(map[uint32]*clientStream), | |
1394 | } | |
1395 | ||
1396 | defer rl.cleanup() | |
1397 | cc.readerErr = rl.run() | |
1398 | if ce, ok := cc.readerErr.(ConnectionError); ok { | |
1399 | cc.wmu.Lock() | |
1400 | cc.fr.WriteGoAway(0, ErrCode(ce), nil) | |
1401 | cc.wmu.Unlock() | |
1402 | } | |
1403 | } | |
1404 | ||
1405 | // GoAwayError is returned by the Transport when the server closes the | |
1406 | // TCP connection after sending a GOAWAY frame. | |
1407 | type GoAwayError struct { | |
1408 | LastStreamID uint32 | |
1409 | ErrCode ErrCode | |
1410 | DebugData string | |
1411 | } | |
1412 | ||
1413 | func (e GoAwayError) Error() string { | |
1414 | return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q", | |
1415 | e.LastStreamID, e.ErrCode, e.DebugData) | |
1416 | } | |
1417 | ||
1418 | func isEOFOrNetReadError(err error) bool { | |
1419 | if err == io.EOF { | |
1420 | return true | |
1421 | } | |
1422 | ne, ok := err.(*net.OpError) | |
1423 | return ok && ne.Op == "read" | |
1424 | } | |
1425 | ||
1426 | func (rl *clientConnReadLoop) cleanup() { | |
1427 | cc := rl.cc | |
1428 | defer cc.tconn.Close() | |
1429 | defer cc.t.connPool().MarkDead(cc) | |
1430 | defer close(cc.readerDone) | |
1431 | ||
1432 | if cc.idleTimer != nil { | |
1433 | cc.idleTimer.Stop() | |
1434 | } | |
1435 | ||
1436 | // Close any response bodies if the server closes prematurely. | |
1437 | // TODO: also do this if we've written the headers but not | |
1438 | // gotten a response yet. | |
1439 | err := cc.readerErr | |
1440 | cc.mu.Lock() | |
1441 | if cc.goAway != nil && isEOFOrNetReadError(err) { | |
1442 | err = GoAwayError{ | |
1443 | LastStreamID: cc.goAway.LastStreamID, | |
1444 | ErrCode: cc.goAway.ErrCode, | |
1445 | DebugData: cc.goAwayDebug, | |
1446 | } | |
1447 | } else if err == io.EOF { | |
1448 | err = io.ErrUnexpectedEOF | |
1449 | } | |
1450 | for _, cs := range rl.activeRes { | |
1451 | cs.bufPipe.CloseWithError(err) | |
1452 | } | |
1453 | for _, cs := range cc.streams { | |
1454 | select { | |
1455 | case cs.resc <- resAndError{err: err}: | |
1456 | default: | |
1457 | } | |
1458 | close(cs.done) | |
1459 | } | |
1460 | cc.closed = true | |
1461 | cc.cond.Broadcast() | |
1462 | cc.mu.Unlock() | |
1463 | } | |
1464 | ||
1465 | func (rl *clientConnReadLoop) run() error { | |
1466 | cc := rl.cc | |
1467 | rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse | |
1468 | gotReply := false // ever saw a HEADERS reply | |
1469 | gotSettings := false | |
1470 | for { | |
1471 | f, err := cc.fr.ReadFrame() | |
1472 | if err != nil { | |
1473 | cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err) | |
1474 | } | |
1475 | if se, ok := err.(StreamError); ok { | |
1476 | if cs := cc.streamByID(se.StreamID, false); cs != nil { | |
1477 | cs.cc.writeStreamReset(cs.ID, se.Code, err) | |
1478 | cs.cc.forgetStreamID(cs.ID) | |
1479 | if se.Cause == nil { | |
1480 | se.Cause = cc.fr.errDetail | |
1481 | } | |
1482 | rl.endStreamError(cs, se) | |
1483 | } | |
1484 | continue | |
1485 | } else if err != nil { | |
1486 | return err | |
1487 | } | |
1488 | if VerboseLogs { | |
1489 | cc.vlogf("http2: Transport received %s", summarizeFrame(f)) | |
1490 | } | |
1491 | if !gotSettings { | |
1492 | if _, ok := f.(*SettingsFrame); !ok { | |
1493 | cc.logf("protocol error: received %T before a SETTINGS frame", f) | |
1494 | return ConnectionError(ErrCodeProtocol) | |
1495 | } | |
1496 | gotSettings = true | |
1497 | } | |
1498 | maybeIdle := false // whether frame might transition us to idle | |
1499 | ||
1500 | switch f := f.(type) { | |
1501 | case *MetaHeadersFrame: | |
1502 | err = rl.processHeaders(f) | |
1503 | maybeIdle = true | |
1504 | gotReply = true | |
1505 | case *DataFrame: | |
1506 | err = rl.processData(f) | |
1507 | maybeIdle = true | |
1508 | case *GoAwayFrame: | |
1509 | err = rl.processGoAway(f) | |
1510 | maybeIdle = true | |
1511 | case *RSTStreamFrame: | |
1512 | err = rl.processResetStream(f) | |
1513 | maybeIdle = true | |
1514 | case *SettingsFrame: | |
1515 | err = rl.processSettings(f) | |
1516 | case *PushPromiseFrame: | |
1517 | err = rl.processPushPromise(f) | |
1518 | case *WindowUpdateFrame: | |
1519 | err = rl.processWindowUpdate(f) | |
1520 | case *PingFrame: | |
1521 | err = rl.processPing(f) | |
1522 | default: | |
1523 | cc.logf("Transport: unhandled response frame type %T", f) | |
1524 | } | |
1525 | if err != nil { | |
1526 | if VerboseLogs { | |
1527 | cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err) | |
1528 | } | |
1529 | return err | |
1530 | } | |
1531 | if rl.closeWhenIdle && gotReply && maybeIdle && len(rl.activeRes) == 0 { | |
1532 | cc.closeIfIdle() | |
1533 | } | |
1534 | } | |
1535 | } | |
1536 | ||
1537 | func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error { | |
1538 | cc := rl.cc | |
1539 | cs := cc.streamByID(f.StreamID, f.StreamEnded()) | |
1540 | if cs == nil { | |
1541 | // We'd get here if we canceled a request while the | |
1542 | // server had its response still in flight. So if this | |
1543 | // was just something we canceled, ignore it. | |
1544 | return nil | |
1545 | } | |
1546 | if !cs.firstByte { | |
1547 | if cs.trace != nil { | |
1548 | // TODO(bradfitz): move first response byte earlier, | |
1549 | // when we first read the 9 byte header, not waiting | |
1550 | // until all the HEADERS+CONTINUATION frames have been | |
1551 | // merged. This works for now. | |
1552 | traceFirstResponseByte(cs.trace) | |
1553 | } | |
1554 | cs.firstByte = true | |
1555 | } | |
1556 | if !cs.pastHeaders { | |
1557 | cs.pastHeaders = true | |
1558 | } else { | |
1559 | return rl.processTrailers(cs, f) | |
1560 | } | |
1561 | ||
1562 | res, err := rl.handleResponse(cs, f) | |
1563 | if err != nil { | |
1564 | if _, ok := err.(ConnectionError); ok { | |
1565 | return err | |
1566 | } | |
1567 | // Any other error type is a stream error. | |
1568 | cs.cc.writeStreamReset(f.StreamID, ErrCodeProtocol, err) | |
1569 | cs.resc <- resAndError{err: err} | |
1570 | return nil // return nil from process* funcs to keep conn alive | |
1571 | } | |
1572 | if res == nil { | |
1573 | // (nil, nil) special case. See handleResponse docs. | |
1574 | return nil | |
1575 | } | |
1576 | if res.Body != noBody { | |
1577 | rl.activeRes[cs.ID] = cs | |
1578 | } | |
1579 | cs.resTrailer = &res.Trailer | |
1580 | cs.resc <- resAndError{res: res} | |
1581 | return nil | |
1582 | } | |
1583 | ||
1584 | // may return error types nil, or ConnectionError. Any other error value | |
1585 | // is a StreamError of type ErrCodeProtocol. The returned error in that case | |
1586 | // is the detail. | |
1587 | // | |
1588 | // As a special case, handleResponse may return (nil, nil) to skip the | |
1589 | // frame (currently only used for 100 expect continue). This special | |
1590 | // case is going away after Issue 13851 is fixed. | |
1591 | func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*http.Response, error) { | |
1592 | if f.Truncated { | |
1593 | return nil, errResponseHeaderListSize | |
1594 | } | |
1595 | ||
1596 | status := f.PseudoValue("status") | |
1597 | if status == "" { | |
1598 | return nil, errors.New("missing status pseudo header") | |
1599 | } | |
1600 | statusCode, err := strconv.Atoi(status) | |
1601 | if err != nil { | |
1602 | return nil, errors.New("malformed non-numeric status pseudo header") | |
1603 | } | |
1604 | ||
1605 | if statusCode == 100 { | |
1606 | traceGot100Continue(cs.trace) | |
1607 | if cs.on100 != nil { | |
1608 | cs.on100() // forces any write delay timer to fire | |
1609 | } | |
1610 | cs.pastHeaders = false // do it all again | |
1611 | return nil, nil | |
1612 | } | |
1613 | ||
1614 | header := make(http.Header) | |
1615 | res := &http.Response{ | |
1616 | Proto: "HTTP/2.0", | |
1617 | ProtoMajor: 2, | |
1618 | Header: header, | |
1619 | StatusCode: statusCode, | |
1620 | Status: status + " " + http.StatusText(statusCode), | |
1621 | } | |
1622 | for _, hf := range f.RegularFields() { | |
1623 | key := http.CanonicalHeaderKey(hf.Name) | |
1624 | if key == "Trailer" { | |
1625 | t := res.Trailer | |
1626 | if t == nil { | |
1627 | t = make(http.Header) | |
1628 | res.Trailer = t | |
1629 | } | |
1630 | foreachHeaderElement(hf.Value, func(v string) { | |
1631 | t[http.CanonicalHeaderKey(v)] = nil | |
1632 | }) | |
1633 | } else { | |
1634 | header[key] = append(header[key], hf.Value) | |
1635 | } | |
1636 | } | |
1637 | ||
1638 | streamEnded := f.StreamEnded() | |
1639 | isHead := cs.req.Method == "HEAD" | |
1640 | if !streamEnded || isHead { | |
1641 | res.ContentLength = -1 | |
1642 | if clens := res.Header["Content-Length"]; len(clens) == 1 { | |
1643 | if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil { | |
1644 | res.ContentLength = clen64 | |
1645 | } else { | |
1646 | // TODO: care? unlike http/1, it won't mess up our framing, so it's | |
1647 | // more safe smuggling-wise to ignore. | |
1648 | } | |
1649 | } else if len(clens) > 1 { | |
1650 | // TODO: care? unlike http/1, it won't mess up our framing, so it's | |
1651 | // more safe smuggling-wise to ignore. | |
1652 | } | |
1653 | } | |
1654 | ||
1655 | if streamEnded || isHead { | |
1656 | res.Body = noBody | |
1657 | return res, nil | |
1658 | } | |
1659 | ||
1660 | cs.bufPipe = pipe{b: &dataBuffer{expected: res.ContentLength}} | |
1661 | cs.bytesRemain = res.ContentLength | |
1662 | res.Body = transportResponseBody{cs} | |
1663 | go cs.awaitRequestCancel(cs.req) | |
1664 | ||
1665 | if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" { | |
1666 | res.Header.Del("Content-Encoding") | |
1667 | res.Header.Del("Content-Length") | |
1668 | res.ContentLength = -1 | |
1669 | res.Body = &gzipReader{body: res.Body} | |
1670 | setResponseUncompressed(res) | |
1671 | } | |
1672 | return res, nil | |
1673 | } | |
1674 | ||
1675 | func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error { | |
1676 | if cs.pastTrailers { | |
1677 | // Too many HEADERS frames for this stream. | |
1678 | return ConnectionError(ErrCodeProtocol) | |
1679 | } | |
1680 | cs.pastTrailers = true | |
1681 | if !f.StreamEnded() { | |
1682 | // We expect that any headers for trailers also | |
1683 | // has END_STREAM. | |
1684 | return ConnectionError(ErrCodeProtocol) | |
1685 | } | |
1686 | if len(f.PseudoFields()) > 0 { | |
1687 | // No pseudo header fields are defined for trailers. | |
1688 | // TODO: ConnectionError might be overly harsh? Check. | |
1689 | return ConnectionError(ErrCodeProtocol) | |
1690 | } | |
1691 | ||
1692 | trailer := make(http.Header) | |
1693 | for _, hf := range f.RegularFields() { | |
1694 | key := http.CanonicalHeaderKey(hf.Name) | |
1695 | trailer[key] = append(trailer[key], hf.Value) | |
1696 | } | |
1697 | cs.trailer = trailer | |
1698 | ||
1699 | rl.endStream(cs) | |
1700 | return nil | |
1701 | } | |
1702 | ||
1703 | // transportResponseBody is the concrete type of Transport.RoundTrip's | |
1704 | // Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body. | |
1705 | // On Close it sends RST_STREAM if EOF wasn't already seen. | |
1706 | type transportResponseBody struct { | |
1707 | cs *clientStream | |
1708 | } | |
1709 | ||
1710 | func (b transportResponseBody) Read(p []byte) (n int, err error) { | |
1711 | cs := b.cs | |
1712 | cc := cs.cc | |
1713 | ||
1714 | if cs.readErr != nil { | |
1715 | return 0, cs.readErr | |
1716 | } | |
1717 | n, err = b.cs.bufPipe.Read(p) | |
1718 | if cs.bytesRemain != -1 { | |
1719 | if int64(n) > cs.bytesRemain { | |
1720 | n = int(cs.bytesRemain) | |
1721 | if err == nil { | |
1722 | err = errors.New("net/http: server replied with more than declared Content-Length; truncated") | |
1723 | cc.writeStreamReset(cs.ID, ErrCodeProtocol, err) | |
1724 | } | |
1725 | cs.readErr = err | |
1726 | return int(cs.bytesRemain), err | |
1727 | } | |
1728 | cs.bytesRemain -= int64(n) | |
1729 | if err == io.EOF && cs.bytesRemain > 0 { | |
1730 | err = io.ErrUnexpectedEOF | |
1731 | cs.readErr = err | |
1732 | return n, err | |
1733 | } | |
1734 | } | |
1735 | if n == 0 { | |
1736 | // No flow control tokens to send back. | |
1737 | return | |
1738 | } | |
1739 | ||
1740 | cc.mu.Lock() | |
1741 | defer cc.mu.Unlock() | |
1742 | ||
1743 | var connAdd, streamAdd int32 | |
1744 | // Check the conn-level first, before the stream-level. | |
1745 | if v := cc.inflow.available(); v < transportDefaultConnFlow/2 { | |
1746 | connAdd = transportDefaultConnFlow - v | |
1747 | cc.inflow.add(connAdd) | |
1748 | } | |
1749 | if err == nil { // No need to refresh if the stream is over or failed. | |
1750 | // Consider any buffered body data (read from the conn but not | |
1751 | // consumed by the client) when computing flow control for this | |
1752 | // stream. | |
1753 | v := int(cs.inflow.available()) + cs.bufPipe.Len() | |
1754 | if v < transportDefaultStreamFlow-transportDefaultStreamMinRefresh { | |
1755 | streamAdd = int32(transportDefaultStreamFlow - v) | |
1756 | cs.inflow.add(streamAdd) | |
1757 | } | |
1758 | } | |
1759 | if connAdd != 0 || streamAdd != 0 { | |
1760 | cc.wmu.Lock() | |
1761 | defer cc.wmu.Unlock() | |
1762 | if connAdd != 0 { | |
1763 | cc.fr.WriteWindowUpdate(0, mustUint31(connAdd)) | |
1764 | } | |
1765 | if streamAdd != 0 { | |
1766 | cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd)) | |
1767 | } | |
1768 | cc.bw.Flush() | |
1769 | } | |
1770 | return | |
1771 | } | |
1772 | ||
1773 | var errClosedResponseBody = errors.New("http2: response body closed") | |
1774 | ||
1775 | func (b transportResponseBody) Close() error { | |
1776 | cs := b.cs | |
1777 | cc := cs.cc | |
1778 | ||
1779 | serverSentStreamEnd := cs.bufPipe.Err() == io.EOF | |
1780 | unread := cs.bufPipe.Len() | |
1781 | ||
1782 | if unread > 0 || !serverSentStreamEnd { | |
1783 | cc.mu.Lock() | |
1784 | cc.wmu.Lock() | |
1785 | if !serverSentStreamEnd { | |
1786 | cc.fr.WriteRSTStream(cs.ID, ErrCodeCancel) | |
1787 | cs.didReset = true | |
1788 | } | |
1789 | // Return connection-level flow control. | |
1790 | if unread > 0 { | |
1791 | cc.inflow.add(int32(unread)) | |
1792 | cc.fr.WriteWindowUpdate(0, uint32(unread)) | |
1793 | } | |
1794 | cc.bw.Flush() | |
1795 | cc.wmu.Unlock() | |
1796 | cc.mu.Unlock() | |
1797 | } | |
1798 | ||
1799 | cs.bufPipe.BreakWithError(errClosedResponseBody) | |
1800 | cc.forgetStreamID(cs.ID) | |
1801 | return nil | |
1802 | } | |
1803 | ||
1804 | func (rl *clientConnReadLoop) processData(f *DataFrame) error { | |
1805 | cc := rl.cc | |
1806 | cs := cc.streamByID(f.StreamID, f.StreamEnded()) | |
1807 | data := f.Data() | |
1808 | if cs == nil { | |
1809 | cc.mu.Lock() | |
1810 | neverSent := cc.nextStreamID | |
1811 | cc.mu.Unlock() | |
1812 | if f.StreamID >= neverSent { | |
1813 | // We never asked for this. | |
1814 | cc.logf("http2: Transport received unsolicited DATA frame; closing connection") | |
1815 | return ConnectionError(ErrCodeProtocol) | |
1816 | } | |
1817 | // We probably did ask for this, but canceled. Just ignore it. | |
1818 | // TODO: be stricter here? only silently ignore things which | |
1819 | // we canceled, but not things which were closed normally | |
1820 | // by the peer? Tough without accumulating too much state. | |
1821 | ||
1822 | // But at least return their flow control: | |
1823 | if f.Length > 0 { | |
1824 | cc.mu.Lock() | |
1825 | cc.inflow.add(int32(f.Length)) | |
1826 | cc.mu.Unlock() | |
1827 | ||
1828 | cc.wmu.Lock() | |
1829 | cc.fr.WriteWindowUpdate(0, uint32(f.Length)) | |
1830 | cc.bw.Flush() | |
1831 | cc.wmu.Unlock() | |
1832 | } | |
1833 | return nil | |
1834 | } | |
1835 | if !cs.firstByte { | |
1836 | cc.logf("protocol error: received DATA before a HEADERS frame") | |
1837 | rl.endStreamError(cs, StreamError{ | |
1838 | StreamID: f.StreamID, | |
1839 | Code: ErrCodeProtocol, | |
1840 | }) | |
1841 | return nil | |
1842 | } | |
1843 | if f.Length > 0 { | |
1844 | // Check connection-level flow control. | |
1845 | cc.mu.Lock() | |
1846 | if cs.inflow.available() >= int32(f.Length) { | |
1847 | cs.inflow.take(int32(f.Length)) | |
1848 | } else { | |
1849 | cc.mu.Unlock() | |
1850 | return ConnectionError(ErrCodeFlowControl) | |
1851 | } | |
1852 | // Return any padded flow control now, since we won't | |
1853 | // refund it later on body reads. | |
1854 | var refund int | |
1855 | if pad := int(f.Length) - len(data); pad > 0 { | |
1856 | refund += pad | |
1857 | } | |
1858 | // Return len(data) now if the stream is already closed, | |
1859 | // since data will never be read. | |
1860 | didReset := cs.didReset | |
1861 | if didReset { | |
1862 | refund += len(data) | |
1863 | } | |
1864 | if refund > 0 { | |
1865 | cc.inflow.add(int32(refund)) | |
1866 | cc.wmu.Lock() | |
1867 | cc.fr.WriteWindowUpdate(0, uint32(refund)) | |
1868 | if !didReset { | |
1869 | cs.inflow.add(int32(refund)) | |
1870 | cc.fr.WriteWindowUpdate(cs.ID, uint32(refund)) | |
1871 | } | |
1872 | cc.bw.Flush() | |
1873 | cc.wmu.Unlock() | |
1874 | } | |
1875 | cc.mu.Unlock() | |
1876 | ||
1877 | if len(data) > 0 && !didReset { | |
1878 | if _, err := cs.bufPipe.Write(data); err != nil { | |
1879 | rl.endStreamError(cs, err) | |
1880 | return err | |
1881 | } | |
1882 | } | |
1883 | } | |
1884 | ||
1885 | if f.StreamEnded() { | |
1886 | rl.endStream(cs) | |
1887 | } | |
1888 | return nil | |
1889 | } | |
1890 | ||
1891 | var errInvalidTrailers = errors.New("http2: invalid trailers") | |
1892 | ||
1893 | func (rl *clientConnReadLoop) endStream(cs *clientStream) { | |
1894 | // TODO: check that any declared content-length matches, like | |
1895 | // server.go's (*stream).endStream method. | |
1896 | rl.endStreamError(cs, nil) | |
1897 | } | |
1898 | ||
1899 | func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) { | |
1900 | var code func() | |
1901 | if err == nil { | |
1902 | err = io.EOF | |
1903 | code = cs.copyTrailers | |
1904 | } | |
1905 | cs.bufPipe.closeWithErrorAndCode(err, code) | |
1906 | delete(rl.activeRes, cs.ID) | |
1907 | if isConnectionCloseRequest(cs.req) { | |
1908 | rl.closeWhenIdle = true | |
1909 | } | |
1910 | ||
1911 | select { | |
1912 | case cs.resc <- resAndError{err: err}: | |
1913 | default: | |
1914 | } | |
1915 | } | |
1916 | ||
1917 | func (cs *clientStream) copyTrailers() { | |
1918 | for k, vv := range cs.trailer { | |
1919 | t := cs.resTrailer | |
1920 | if *t == nil { | |
1921 | *t = make(http.Header) | |
1922 | } | |
1923 | (*t)[k] = vv | |
1924 | } | |
1925 | } | |
1926 | ||
1927 | func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error { | |
1928 | cc := rl.cc | |
1929 | cc.t.connPool().MarkDead(cc) | |
1930 | if f.ErrCode != 0 { | |
1931 | // TODO: deal with GOAWAY more. particularly the error code | |
1932 | cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode) | |
1933 | } | |
1934 | cc.setGoAway(f) | |
1935 | return nil | |
1936 | } | |
1937 | ||
1938 | func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error { | |
1939 | cc := rl.cc | |
1940 | cc.mu.Lock() | |
1941 | defer cc.mu.Unlock() | |
1942 | ||
1943 | if f.IsAck() { | |
1944 | if cc.wantSettingsAck { | |
1945 | cc.wantSettingsAck = false | |
1946 | return nil | |
1947 | } | |
1948 | return ConnectionError(ErrCodeProtocol) | |
1949 | } | |
1950 | ||
1951 | err := f.ForeachSetting(func(s Setting) error { | |
1952 | switch s.ID { | |
1953 | case SettingMaxFrameSize: | |
1954 | cc.maxFrameSize = s.Val | |
1955 | case SettingMaxConcurrentStreams: | |
1956 | cc.maxConcurrentStreams = s.Val | |
1957 | case SettingMaxHeaderListSize: | |
1958 | cc.peerMaxHeaderListSize = uint64(s.Val) | |
1959 | case SettingInitialWindowSize: | |
1960 | // Values above the maximum flow-control | |
1961 | // window size of 2^31-1 MUST be treated as a | |
1962 | // connection error (Section 5.4.1) of type | |
1963 | // FLOW_CONTROL_ERROR. | |
1964 | if s.Val > math.MaxInt32 { | |
1965 | return ConnectionError(ErrCodeFlowControl) | |
1966 | } | |
1967 | ||
1968 | // Adjust flow control of currently-open | |
1969 | // frames by the difference of the old initial | |
1970 | // window size and this one. | |
1971 | delta := int32(s.Val) - int32(cc.initialWindowSize) | |
1972 | for _, cs := range cc.streams { | |
1973 | cs.flow.add(delta) | |
1974 | } | |
1975 | cc.cond.Broadcast() | |
1976 | ||
1977 | cc.initialWindowSize = s.Val | |
1978 | default: | |
1979 | // TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably. | |
1980 | cc.vlogf("Unhandled Setting: %v", s) | |
1981 | } | |
1982 | return nil | |
1983 | }) | |
1984 | if err != nil { | |
1985 | return err | |
1986 | } | |
1987 | ||
1988 | cc.wmu.Lock() | |
1989 | defer cc.wmu.Unlock() | |
1990 | ||
1991 | cc.fr.WriteSettingsAck() | |
1992 | cc.bw.Flush() | |
1993 | return cc.werr | |
1994 | } | |
1995 | ||
1996 | func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error { | |
1997 | cc := rl.cc | |
1998 | cs := cc.streamByID(f.StreamID, false) | |
1999 | if f.StreamID != 0 && cs == nil { | |
2000 | return nil | |
2001 | } | |
2002 | ||
2003 | cc.mu.Lock() | |
2004 | defer cc.mu.Unlock() | |
2005 | ||
2006 | fl := &cc.flow | |
2007 | if cs != nil { | |
2008 | fl = &cs.flow | |
2009 | } | |
2010 | if !fl.add(int32(f.Increment)) { | |
2011 | return ConnectionError(ErrCodeFlowControl) | |
2012 | } | |
2013 | cc.cond.Broadcast() | |
2014 | return nil | |
2015 | } | |
2016 | ||
2017 | func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error { | |
2018 | cs := rl.cc.streamByID(f.StreamID, true) | |
2019 | if cs == nil { | |
2020 | // TODO: return error if server tries to RST_STEAM an idle stream | |
2021 | return nil | |
2022 | } | |
2023 | select { | |
2024 | case <-cs.peerReset: | |
2025 | // Already reset. | |
2026 | // This is the only goroutine | |
2027 | // which closes this, so there | |
2028 | // isn't a race. | |
2029 | default: | |
2030 | err := streamError(cs.ID, f.ErrCode) | |
2031 | cs.resetErr = err | |
2032 | close(cs.peerReset) | |
2033 | cs.bufPipe.CloseWithError(err) | |
2034 | cs.cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl | |
2035 | } | |
2036 | delete(rl.activeRes, cs.ID) | |
2037 | return nil | |
2038 | } | |
2039 | ||
2040 | // Ping sends a PING frame to the server and waits for the ack. | |
2041 | // Public implementation is in go17.go and not_go17.go | |
2042 | func (cc *ClientConn) ping(ctx contextContext) error { | |
2043 | c := make(chan struct{}) | |
2044 | // Generate a random payload | |
2045 | var p [8]byte | |
2046 | for { | |
2047 | if _, err := rand.Read(p[:]); err != nil { | |
2048 | return err | |
2049 | } | |
2050 | cc.mu.Lock() | |
2051 | // check for dup before insert | |
2052 | if _, found := cc.pings[p]; !found { | |
2053 | cc.pings[p] = c | |
2054 | cc.mu.Unlock() | |
2055 | break | |
2056 | } | |
2057 | cc.mu.Unlock() | |
2058 | } | |
2059 | cc.wmu.Lock() | |
2060 | if err := cc.fr.WritePing(false, p); err != nil { | |
2061 | cc.wmu.Unlock() | |
2062 | return err | |
2063 | } | |
2064 | if err := cc.bw.Flush(); err != nil { | |
2065 | cc.wmu.Unlock() | |
2066 | return err | |
2067 | } | |
2068 | cc.wmu.Unlock() | |
2069 | select { | |
2070 | case <-c: | |
2071 | return nil | |
2072 | case <-ctx.Done(): | |
2073 | return ctx.Err() | |
2074 | case <-cc.readerDone: | |
2075 | // connection closed | |
2076 | return cc.readerErr | |
2077 | } | |
2078 | } | |
2079 | ||
2080 | func (rl *clientConnReadLoop) processPing(f *PingFrame) error { | |
2081 | if f.IsAck() { | |
2082 | cc := rl.cc | |
2083 | cc.mu.Lock() | |
2084 | defer cc.mu.Unlock() | |
2085 | // If ack, notify listener if any | |
2086 | if c, ok := cc.pings[f.Data]; ok { | |
2087 | close(c) | |
2088 | delete(cc.pings, f.Data) | |
2089 | } | |
2090 | return nil | |
2091 | } | |
2092 | cc := rl.cc | |
2093 | cc.wmu.Lock() | |
2094 | defer cc.wmu.Unlock() | |
2095 | if err := cc.fr.WritePing(true, f.Data); err != nil { | |
2096 | return err | |
2097 | } | |
2098 | return cc.bw.Flush() | |
2099 | } | |
2100 | ||
2101 | func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error { | |
2102 | // We told the peer we don't want them. | |
2103 | // Spec says: | |
2104 | // "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH | |
2105 | // setting of the peer endpoint is set to 0. An endpoint that | |
2106 | // has set this setting and has received acknowledgement MUST | |
2107 | // treat the receipt of a PUSH_PROMISE frame as a connection | |
2108 | // error (Section 5.4.1) of type PROTOCOL_ERROR." | |
2109 | return ConnectionError(ErrCodeProtocol) | |
2110 | } | |
2111 | ||
2112 | func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) { | |
2113 | // TODO: map err to more interesting error codes, once the | |
2114 | // HTTP community comes up with some. But currently for | |
2115 | // RST_STREAM there's no equivalent to GOAWAY frame's debug | |
2116 | // data, and the error codes are all pretty vague ("cancel"). | |
2117 | cc.wmu.Lock() | |
2118 | cc.fr.WriteRSTStream(streamID, code) | |
2119 | cc.bw.Flush() | |
2120 | cc.wmu.Unlock() | |
2121 | } | |
2122 | ||
2123 | var ( | |
2124 | errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit") | |
2125 | errRequestHeaderListSize = errors.New("http2: request header list larger than peer's advertised limit") | |
2126 | errPseudoTrailers = errors.New("http2: invalid pseudo header in trailers") | |
2127 | ) | |
2128 | ||
2129 | func (cc *ClientConn) logf(format string, args ...interface{}) { | |
2130 | cc.t.logf(format, args...) | |
2131 | } | |
2132 | ||
2133 | func (cc *ClientConn) vlogf(format string, args ...interface{}) { | |
2134 | cc.t.vlogf(format, args...) | |
2135 | } | |
2136 | ||
2137 | func (t *Transport) vlogf(format string, args ...interface{}) { | |
2138 | if VerboseLogs { | |
2139 | t.logf(format, args...) | |
2140 | } | |
2141 | } | |
2142 | ||
2143 | func (t *Transport) logf(format string, args ...interface{}) { | |
2144 | log.Printf(format, args...) | |
2145 | } | |
2146 | ||
2147 | var noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil)) | |
2148 | ||
2149 | func strSliceContains(ss []string, s string) bool { | |
2150 | for _, v := range ss { | |
2151 | if v == s { | |
2152 | return true | |
2153 | } | |
2154 | } | |
2155 | return false | |
2156 | } | |
2157 | ||
2158 | type erringRoundTripper struct{ err error } | |
2159 | ||
2160 | func (rt erringRoundTripper) RoundTrip(*http.Request) (*http.Response, error) { return nil, rt.err } | |
2161 | ||
2162 | // gzipReader wraps a response body so it can lazily | |
2163 | // call gzip.NewReader on the first call to Read | |
2164 | type gzipReader struct { | |
2165 | body io.ReadCloser // underlying Response.Body | |
2166 | zr *gzip.Reader // lazily-initialized gzip reader | |
2167 | zerr error // sticky error | |
2168 | } | |
2169 | ||
2170 | func (gz *gzipReader) Read(p []byte) (n int, err error) { | |
2171 | if gz.zerr != nil { | |
2172 | return 0, gz.zerr | |
2173 | } | |
2174 | if gz.zr == nil { | |
2175 | gz.zr, err = gzip.NewReader(gz.body) | |
2176 | if err != nil { | |
2177 | gz.zerr = err | |
2178 | return 0, err | |
2179 | } | |
2180 | } | |
2181 | return gz.zr.Read(p) | |
2182 | } | |
2183 | ||
2184 | func (gz *gzipReader) Close() error { | |
2185 | return gz.body.Close() | |
2186 | } | |
2187 | ||
2188 | type errorReader struct{ err error } | |
2189 | ||
2190 | func (r errorReader) Read(p []byte) (int, error) { return 0, r.err } | |
2191 | ||
2192 | // bodyWriterState encapsulates various state around the Transport's writing | |
2193 | // of the request body, particularly regarding doing delayed writes of the body | |
2194 | // when the request contains "Expect: 100-continue". | |
2195 | type bodyWriterState struct { | |
2196 | cs *clientStream | |
2197 | timer *time.Timer // if non-nil, we're doing a delayed write | |
2198 | fnonce *sync.Once // to call fn with | |
2199 | fn func() // the code to run in the goroutine, writing the body | |
2200 | resc chan error // result of fn's execution | |
2201 | delay time.Duration // how long we should delay a delayed write for | |
2202 | } | |
2203 | ||
2204 | func (t *Transport) getBodyWriterState(cs *clientStream, body io.Reader) (s bodyWriterState) { | |
2205 | s.cs = cs | |
2206 | if body == nil { | |
2207 | return | |
2208 | } | |
2209 | resc := make(chan error, 1) | |
2210 | s.resc = resc | |
2211 | s.fn = func() { | |
2212 | cs.cc.mu.Lock() | |
2213 | cs.startedWrite = true | |
2214 | cs.cc.mu.Unlock() | |
2215 | resc <- cs.writeRequestBody(body, cs.req.Body) | |
2216 | } | |
2217 | s.delay = t.expectContinueTimeout() | |
2218 | if s.delay == 0 || | |
2219 | !httplex.HeaderValuesContainsToken( | |
2220 | cs.req.Header["Expect"], | |
2221 | "100-continue") { | |
2222 | return | |
2223 | } | |
2224 | s.fnonce = new(sync.Once) | |
2225 | ||
2226 | // Arm the timer with a very large duration, which we'll | |
2227 | // intentionally lower later. It has to be large now because | |
2228 | // we need a handle to it before writing the headers, but the | |
2229 | // s.delay value is defined to not start until after the | |
2230 | // request headers were written. | |
2231 | const hugeDuration = 365 * 24 * time.Hour | |
2232 | s.timer = time.AfterFunc(hugeDuration, func() { | |
2233 | s.fnonce.Do(s.fn) | |
2234 | }) | |
2235 | return | |
2236 | } | |
2237 | ||
2238 | func (s bodyWriterState) cancel() { | |
2239 | if s.timer != nil { | |
2240 | s.timer.Stop() | |
2241 | } | |
2242 | } | |
2243 | ||
2244 | func (s bodyWriterState) on100() { | |
2245 | if s.timer == nil { | |
2246 | // If we didn't do a delayed write, ignore the server's | |
2247 | // bogus 100 continue response. | |
2248 | return | |
2249 | } | |
2250 | s.timer.Stop() | |
2251 | go func() { s.fnonce.Do(s.fn) }() | |
2252 | } | |
2253 | ||
2254 | // scheduleBodyWrite starts writing the body, either immediately (in | |
2255 | // the common case) or after the delay timeout. It should not be | |
2256 | // called until after the headers have been written. | |
2257 | func (s bodyWriterState) scheduleBodyWrite() { | |
2258 | if s.timer == nil { | |
2259 | // We're not doing a delayed write (see | |
2260 | // getBodyWriterState), so just start the writing | |
2261 | // goroutine immediately. | |
2262 | go s.fn() | |
2263 | return | |
2264 | } | |
2265 | traceWait100Continue(s.cs.trace) | |
2266 | if s.timer.Stop() { | |
2267 | s.timer.Reset(s.delay) | |
2268 | } | |
2269 | } | |
2270 | ||
2271 | // isConnectionCloseRequest reports whether req should use its own | |
2272 | // connection for a single request and then close the connection. | |
2273 | func isConnectionCloseRequest(req *http.Request) bool { | |
2274 | return req.Close || httplex.HeaderValuesContainsToken(req.Header["Connection"], "close") | |
2275 | } |