diff options
Diffstat (limited to 'vendor/golang.org/x/net/http2/transport.go')
-rw-r--r-- | vendor/golang.org/x/net/http2/transport.go | 2275 |
1 files changed, 2275 insertions, 0 deletions
diff --git a/vendor/golang.org/x/net/http2/transport.go b/vendor/golang.org/x/net/http2/transport.go new file mode 100644 index 0000000..adb77ff --- /dev/null +++ b/vendor/golang.org/x/net/http2/transport.go | |||
@@ -0,0 +1,2275 @@ | |||
1 | // Copyright 2015 The Go Authors. All rights reserved. | ||
2 | // Use of this source code is governed by a BSD-style | ||
3 | // license that can be found in the LICENSE file. | ||
4 | |||
5 | // Transport code. | ||
6 | |||
7 | package http2 | ||
8 | |||
9 | import ( | ||
10 | "bufio" | ||
11 | "bytes" | ||
12 | "compress/gzip" | ||
13 | "crypto/rand" | ||
14 | "crypto/tls" | ||
15 | "errors" | ||
16 | "fmt" | ||
17 | "io" | ||
18 | "io/ioutil" | ||
19 | "log" | ||
20 | "math" | ||
21 | mathrand "math/rand" | ||
22 | "net" | ||
23 | "net/http" | ||
24 | "sort" | ||
25 | "strconv" | ||
26 | "strings" | ||
27 | "sync" | ||
28 | "time" | ||
29 | |||
30 | "golang.org/x/net/http2/hpack" | ||
31 | "golang.org/x/net/idna" | ||
32 | "golang.org/x/net/lex/httplex" | ||
33 | ) | ||
34 | |||
35 | const ( | ||
36 | // transportDefaultConnFlow is how many connection-level flow control | ||
37 | // tokens we give the server at start-up, past the default 64k. | ||
38 | transportDefaultConnFlow = 1 << 30 | ||
39 | |||
40 | // transportDefaultStreamFlow is how many stream-level flow | ||
41 | // control tokens we announce to the peer, and how many bytes | ||
42 | // we buffer per stream. | ||
43 | transportDefaultStreamFlow = 4 << 20 | ||
44 | |||
45 | // transportDefaultStreamMinRefresh is the minimum number of bytes we'll send | ||
46 | // a stream-level WINDOW_UPDATE for at a time. | ||
47 | transportDefaultStreamMinRefresh = 4 << 10 | ||
48 | |||
49 | defaultUserAgent = "Go-http-client/2.0" | ||
50 | ) | ||
51 | |||
52 | // Transport is an HTTP/2 Transport. | ||
53 | // | ||
54 | // A Transport internally caches connections to servers. It is safe | ||
55 | // for concurrent use by multiple goroutines. | ||
56 | type Transport struct { | ||
57 | // DialTLS specifies an optional dial function for creating | ||
58 | // TLS connections for requests. | ||
59 | // | ||
60 | // If DialTLS is nil, tls.Dial is used. | ||
61 | // | ||
62 | // If the returned net.Conn has a ConnectionState method like tls.Conn, | ||
63 | // it will be used to set http.Response.TLS. | ||
64 | DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error) | ||
65 | |||
66 | // TLSClientConfig specifies the TLS configuration to use with | ||
67 | // tls.Client. If nil, the default configuration is used. | ||
68 | TLSClientConfig *tls.Config | ||
69 | |||
70 | // ConnPool optionally specifies an alternate connection pool to use. | ||
71 | // If nil, the default is used. | ||
72 | ConnPool ClientConnPool | ||
73 | |||
74 | // DisableCompression, if true, prevents the Transport from | ||
75 | // requesting compression with an "Accept-Encoding: gzip" | ||
76 | // request header when the Request contains no existing | ||
77 | // Accept-Encoding value. If the Transport requests gzip on | ||
78 | // its own and gets a gzipped response, it's transparently | ||
79 | // decoded in the Response.Body. However, if the user | ||
80 | // explicitly requested gzip it is not automatically | ||
81 | // uncompressed. | ||
82 | DisableCompression bool | ||
83 | |||
84 | // AllowHTTP, if true, permits HTTP/2 requests using the insecure, | ||
85 | // plain-text "http" scheme. Note that this does not enable h2c support. | ||
86 | AllowHTTP bool | ||
87 | |||
88 | // MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to | ||
89 | // send in the initial settings frame. It is how many bytes | ||
90 | // of response headers are allowed. Unlike the http2 spec, zero here | ||
91 | // means to use a default limit (currently 10MB). If you actually | ||
92 | // want to advertise an ulimited value to the peer, Transport | ||
93 | // interprets the highest possible value here (0xffffffff or 1<<32-1) | ||
94 | // to mean no limit. | ||
95 | MaxHeaderListSize uint32 | ||
96 | |||
97 | // t1, if non-nil, is the standard library Transport using | ||
98 | // this transport. Its settings are used (but not its | ||
99 | // RoundTrip method, etc). | ||
100 | t1 *http.Transport | ||
101 | |||
102 | connPoolOnce sync.Once | ||
103 | connPoolOrDef ClientConnPool // non-nil version of ConnPool | ||
104 | } | ||
105 | |||
106 | func (t *Transport) maxHeaderListSize() uint32 { | ||
107 | if t.MaxHeaderListSize == 0 { | ||
108 | return 10 << 20 | ||
109 | } | ||
110 | if t.MaxHeaderListSize == 0xffffffff { | ||
111 | return 0 | ||
112 | } | ||
113 | return t.MaxHeaderListSize | ||
114 | } | ||
115 | |||
116 | func (t *Transport) disableCompression() bool { | ||
117 | return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression) | ||
118 | } | ||
119 | |||
120 | var errTransportVersion = errors.New("http2: ConfigureTransport is only supported starting at Go 1.6") | ||
121 | |||
122 | // ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2. | ||
123 | // It requires Go 1.6 or later and returns an error if the net/http package is too old | ||
124 | // or if t1 has already been HTTP/2-enabled. | ||
125 | func ConfigureTransport(t1 *http.Transport) error { | ||
126 | _, err := configureTransport(t1) // in configure_transport.go (go1.6) or not_go16.go | ||
127 | return err | ||
128 | } | ||
129 | |||
130 | func (t *Transport) connPool() ClientConnPool { | ||
131 | t.connPoolOnce.Do(t.initConnPool) | ||
132 | return t.connPoolOrDef | ||
133 | } | ||
134 | |||
135 | func (t *Transport) initConnPool() { | ||
136 | if t.ConnPool != nil { | ||
137 | t.connPoolOrDef = t.ConnPool | ||
138 | } else { | ||
139 | t.connPoolOrDef = &clientConnPool{t: t} | ||
140 | } | ||
141 | } | ||
142 | |||
143 | // ClientConn is the state of a single HTTP/2 client connection to an | ||
144 | // HTTP/2 server. | ||
145 | type ClientConn struct { | ||
146 | t *Transport | ||
147 | tconn net.Conn // usually *tls.Conn, except specialized impls | ||
148 | tlsState *tls.ConnectionState // nil only for specialized impls | ||
149 | singleUse bool // whether being used for a single http.Request | ||
150 | |||
151 | // readLoop goroutine fields: | ||
152 | readerDone chan struct{} // closed on error | ||
153 | readerErr error // set before readerDone is closed | ||
154 | |||
155 | idleTimeout time.Duration // or 0 for never | ||
156 | idleTimer *time.Timer | ||
157 | |||
158 | mu sync.Mutex // guards following | ||
159 | cond *sync.Cond // hold mu; broadcast on flow/closed changes | ||
160 | flow flow // our conn-level flow control quota (cs.flow is per stream) | ||
161 | inflow flow // peer's conn-level flow control | ||
162 | closed bool | ||
163 | wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back | ||
164 | goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received | ||
165 | goAwayDebug string // goAway frame's debug data, retained as a string | ||
166 | streams map[uint32]*clientStream // client-initiated | ||
167 | nextStreamID uint32 | ||
168 | pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams | ||
169 | pings map[[8]byte]chan struct{} // in flight ping data to notification channel | ||
170 | bw *bufio.Writer | ||
171 | br *bufio.Reader | ||
172 | fr *Framer | ||
173 | lastActive time.Time | ||
174 | // Settings from peer: (also guarded by mu) | ||
175 | maxFrameSize uint32 | ||
176 | maxConcurrentStreams uint32 | ||
177 | peerMaxHeaderListSize uint64 | ||
178 | initialWindowSize uint32 | ||
179 | |||
180 | hbuf bytes.Buffer // HPACK encoder writes into this | ||
181 | henc *hpack.Encoder | ||
182 | freeBuf [][]byte | ||
183 | |||
184 | wmu sync.Mutex // held while writing; acquire AFTER mu if holding both | ||
185 | werr error // first write error that has occurred | ||
186 | } | ||
187 | |||
188 | // clientStream is the state for a single HTTP/2 stream. One of these | ||
189 | // is created for each Transport.RoundTrip call. | ||
190 | type clientStream struct { | ||
191 | cc *ClientConn | ||
192 | req *http.Request | ||
193 | trace *clientTrace // or nil | ||
194 | ID uint32 | ||
195 | resc chan resAndError | ||
196 | bufPipe pipe // buffered pipe with the flow-controlled response payload | ||
197 | startedWrite bool // started request body write; guarded by cc.mu | ||
198 | requestedGzip bool | ||
199 | on100 func() // optional code to run if get a 100 continue response | ||
200 | |||
201 | flow flow // guarded by cc.mu | ||
202 | inflow flow // guarded by cc.mu | ||
203 | bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read | ||
204 | readErr error // sticky read error; owned by transportResponseBody.Read | ||
205 | stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu | ||
206 | didReset bool // whether we sent a RST_STREAM to the server; guarded by cc.mu | ||
207 | |||
208 | peerReset chan struct{} // closed on peer reset | ||
209 | resetErr error // populated before peerReset is closed | ||
210 | |||
211 | done chan struct{} // closed when stream remove from cc.streams map; close calls guarded by cc.mu | ||
212 | |||
213 | // owned by clientConnReadLoop: | ||
214 | firstByte bool // got the first response byte | ||
215 | pastHeaders bool // got first MetaHeadersFrame (actual headers) | ||
216 | pastTrailers bool // got optional second MetaHeadersFrame (trailers) | ||
217 | |||
218 | trailer http.Header // accumulated trailers | ||
219 | resTrailer *http.Header // client's Response.Trailer | ||
220 | } | ||
221 | |||
222 | // awaitRequestCancel waits for the user to cancel a request or for the done | ||
223 | // channel to be signaled. A non-nil error is returned only if the request was | ||
224 | // canceled. | ||
225 | func awaitRequestCancel(req *http.Request, done <-chan struct{}) error { | ||
226 | ctx := reqContext(req) | ||
227 | if req.Cancel == nil && ctx.Done() == nil { | ||
228 | return nil | ||
229 | } | ||
230 | select { | ||
231 | case <-req.Cancel: | ||
232 | return errRequestCanceled | ||
233 | case <-ctx.Done(): | ||
234 | return ctx.Err() | ||
235 | case <-done: | ||
236 | return nil | ||
237 | } | ||
238 | } | ||
239 | |||
240 | // awaitRequestCancel waits for the user to cancel a request, its context to | ||
241 | // expire, or for the request to be done (any way it might be removed from the | ||
242 | // cc.streams map: peer reset, successful completion, TCP connection breakage, | ||
243 | // etc). If the request is canceled, then cs will be canceled and closed. | ||
244 | func (cs *clientStream) awaitRequestCancel(req *http.Request) { | ||
245 | if err := awaitRequestCancel(req, cs.done); err != nil { | ||
246 | cs.cancelStream() | ||
247 | cs.bufPipe.CloseWithError(err) | ||
248 | } | ||
249 | } | ||
250 | |||
251 | func (cs *clientStream) cancelStream() { | ||
252 | cc := cs.cc | ||
253 | cc.mu.Lock() | ||
254 | didReset := cs.didReset | ||
255 | cs.didReset = true | ||
256 | cc.mu.Unlock() | ||
257 | |||
258 | if !didReset { | ||
259 | cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | ||
260 | cc.forgetStreamID(cs.ID) | ||
261 | } | ||
262 | } | ||
263 | |||
264 | // checkResetOrDone reports any error sent in a RST_STREAM frame by the | ||
265 | // server, or errStreamClosed if the stream is complete. | ||
266 | func (cs *clientStream) checkResetOrDone() error { | ||
267 | select { | ||
268 | case <-cs.peerReset: | ||
269 | return cs.resetErr | ||
270 | case <-cs.done: | ||
271 | return errStreamClosed | ||
272 | default: | ||
273 | return nil | ||
274 | } | ||
275 | } | ||
276 | |||
277 | func (cs *clientStream) abortRequestBodyWrite(err error) { | ||
278 | if err == nil { | ||
279 | panic("nil error") | ||
280 | } | ||
281 | cc := cs.cc | ||
282 | cc.mu.Lock() | ||
283 | cs.stopReqBody = err | ||
284 | cc.cond.Broadcast() | ||
285 | cc.mu.Unlock() | ||
286 | } | ||
287 | |||
288 | type stickyErrWriter struct { | ||
289 | w io.Writer | ||
290 | err *error | ||
291 | } | ||
292 | |||
293 | func (sew stickyErrWriter) Write(p []byte) (n int, err error) { | ||
294 | if *sew.err != nil { | ||
295 | return 0, *sew.err | ||
296 | } | ||
297 | n, err = sew.w.Write(p) | ||
298 | *sew.err = err | ||
299 | return | ||
300 | } | ||
301 | |||
302 | var ErrNoCachedConn = errors.New("http2: no cached connection was available") | ||
303 | |||
304 | // RoundTripOpt are options for the Transport.RoundTripOpt method. | ||
305 | type RoundTripOpt struct { | ||
306 | // OnlyCachedConn controls whether RoundTripOpt may | ||
307 | // create a new TCP connection. If set true and | ||
308 | // no cached connection is available, RoundTripOpt | ||
309 | // will return ErrNoCachedConn. | ||
310 | OnlyCachedConn bool | ||
311 | } | ||
312 | |||
313 | func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) { | ||
314 | return t.RoundTripOpt(req, RoundTripOpt{}) | ||
315 | } | ||
316 | |||
317 | // authorityAddr returns a given authority (a host/IP, or host:port / ip:port) | ||
318 | // and returns a host:port. The port 443 is added if needed. | ||
319 | func authorityAddr(scheme string, authority string) (addr string) { | ||
320 | host, port, err := net.SplitHostPort(authority) | ||
321 | if err != nil { // authority didn't have a port | ||
322 | port = "443" | ||
323 | if scheme == "http" { | ||
324 | port = "80" | ||
325 | } | ||
326 | host = authority | ||
327 | } | ||
328 | if a, err := idna.ToASCII(host); err == nil { | ||
329 | host = a | ||
330 | } | ||
331 | // IPv6 address literal, without a port: | ||
332 | if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") { | ||
333 | return host + ":" + port | ||
334 | } | ||
335 | return net.JoinHostPort(host, port) | ||
336 | } | ||
337 | |||
338 | // RoundTripOpt is like RoundTrip, but takes options. | ||
339 | func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) { | ||
340 | if !(req.URL.Scheme == "https" || (req.URL.Scheme == "http" && t.AllowHTTP)) { | ||
341 | return nil, errors.New("http2: unsupported scheme") | ||
342 | } | ||
343 | |||
344 | addr := authorityAddr(req.URL.Scheme, req.URL.Host) | ||
345 | for retry := 0; ; retry++ { | ||
346 | cc, err := t.connPool().GetClientConn(req, addr) | ||
347 | if err != nil { | ||
348 | t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err) | ||
349 | return nil, err | ||
350 | } | ||
351 | traceGotConn(req, cc) | ||
352 | res, err := cc.RoundTrip(req) | ||
353 | if err != nil && retry <= 6 { | ||
354 | afterBodyWrite := false | ||
355 | if e, ok := err.(afterReqBodyWriteError); ok { | ||
356 | err = e | ||
357 | afterBodyWrite = true | ||
358 | } | ||
359 | if req, err = shouldRetryRequest(req, err, afterBodyWrite); err == nil { | ||
360 | // After the first retry, do exponential backoff with 10% jitter. | ||
361 | if retry == 0 { | ||
362 | continue | ||
363 | } | ||
364 | backoff := float64(uint(1) << (uint(retry) - 1)) | ||
365 | backoff += backoff * (0.1 * mathrand.Float64()) | ||
366 | select { | ||
367 | case <-time.After(time.Second * time.Duration(backoff)): | ||
368 | continue | ||
369 | case <-reqContext(req).Done(): | ||
370 | return nil, reqContext(req).Err() | ||
371 | } | ||
372 | } | ||
373 | } | ||
374 | if err != nil { | ||
375 | t.vlogf("RoundTrip failure: %v", err) | ||
376 | return nil, err | ||
377 | } | ||
378 | return res, nil | ||
379 | } | ||
380 | } | ||
381 | |||
382 | // CloseIdleConnections closes any connections which were previously | ||
383 | // connected from previous requests but are now sitting idle. | ||
384 | // It does not interrupt any connections currently in use. | ||
385 | func (t *Transport) CloseIdleConnections() { | ||
386 | if cp, ok := t.connPool().(clientConnPoolIdleCloser); ok { | ||
387 | cp.closeIdleConnections() | ||
388 | } | ||
389 | } | ||
390 | |||
391 | var ( | ||
392 | errClientConnClosed = errors.New("http2: client conn is closed") | ||
393 | errClientConnUnusable = errors.New("http2: client conn not usable") | ||
394 | errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY") | ||
395 | ) | ||
396 | |||
397 | // afterReqBodyWriteError is a wrapper around errors returned by ClientConn.RoundTrip. | ||
398 | // It is used to signal that err happened after part of Request.Body was sent to the server. | ||
399 | type afterReqBodyWriteError struct { | ||
400 | err error | ||
401 | } | ||
402 | |||
403 | func (e afterReqBodyWriteError) Error() string { | ||
404 | return e.err.Error() + "; some request body already written" | ||
405 | } | ||
406 | |||
407 | // shouldRetryRequest is called by RoundTrip when a request fails to get | ||
408 | // response headers. It is always called with a non-nil error. | ||
409 | // It returns either a request to retry (either the same request, or a | ||
410 | // modified clone), or an error if the request can't be replayed. | ||
411 | func shouldRetryRequest(req *http.Request, err error, afterBodyWrite bool) (*http.Request, error) { | ||
412 | if !canRetryError(err) { | ||
413 | return nil, err | ||
414 | } | ||
415 | if !afterBodyWrite { | ||
416 | return req, nil | ||
417 | } | ||
418 | // If the Body is nil (or http.NoBody), it's safe to reuse | ||
419 | // this request and its Body. | ||
420 | if req.Body == nil || reqBodyIsNoBody(req.Body) { | ||
421 | return req, nil | ||
422 | } | ||
423 | // Otherwise we depend on the Request having its GetBody | ||
424 | // func defined. | ||
425 | getBody := reqGetBody(req) // Go 1.8: getBody = req.GetBody | ||
426 | if getBody == nil { | ||
427 | return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err) | ||
428 | } | ||
429 | body, err := getBody() | ||
430 | if err != nil { | ||
431 | return nil, err | ||
432 | } | ||
433 | newReq := *req | ||
434 | newReq.Body = body | ||
435 | return &newReq, nil | ||
436 | } | ||
437 | |||
438 | func canRetryError(err error) bool { | ||
439 | if err == errClientConnUnusable || err == errClientConnGotGoAway { | ||
440 | return true | ||
441 | } | ||
442 | if se, ok := err.(StreamError); ok { | ||
443 | return se.Code == ErrCodeRefusedStream | ||
444 | } | ||
445 | return false | ||
446 | } | ||
447 | |||
448 | func (t *Transport) dialClientConn(addr string, singleUse bool) (*ClientConn, error) { | ||
449 | host, _, err := net.SplitHostPort(addr) | ||
450 | if err != nil { | ||
451 | return nil, err | ||
452 | } | ||
453 | tconn, err := t.dialTLS()("tcp", addr, t.newTLSConfig(host)) | ||
454 | if err != nil { | ||
455 | return nil, err | ||
456 | } | ||
457 | return t.newClientConn(tconn, singleUse) | ||
458 | } | ||
459 | |||
460 | func (t *Transport) newTLSConfig(host string) *tls.Config { | ||
461 | cfg := new(tls.Config) | ||
462 | if t.TLSClientConfig != nil { | ||
463 | *cfg = *cloneTLSConfig(t.TLSClientConfig) | ||
464 | } | ||
465 | if !strSliceContains(cfg.NextProtos, NextProtoTLS) { | ||
466 | cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...) | ||
467 | } | ||
468 | if cfg.ServerName == "" { | ||
469 | cfg.ServerName = host | ||
470 | } | ||
471 | return cfg | ||
472 | } | ||
473 | |||
474 | func (t *Transport) dialTLS() func(string, string, *tls.Config) (net.Conn, error) { | ||
475 | if t.DialTLS != nil { | ||
476 | return t.DialTLS | ||
477 | } | ||
478 | return t.dialTLSDefault | ||
479 | } | ||
480 | |||
481 | func (t *Transport) dialTLSDefault(network, addr string, cfg *tls.Config) (net.Conn, error) { | ||
482 | cn, err := tls.Dial(network, addr, cfg) | ||
483 | if err != nil { | ||
484 | return nil, err | ||
485 | } | ||
486 | if err := cn.Handshake(); err != nil { | ||
487 | return nil, err | ||
488 | } | ||
489 | if !cfg.InsecureSkipVerify { | ||
490 | if err := cn.VerifyHostname(cfg.ServerName); err != nil { | ||
491 | return nil, err | ||
492 | } | ||
493 | } | ||
494 | state := cn.ConnectionState() | ||
495 | if p := state.NegotiatedProtocol; p != NextProtoTLS { | ||
496 | return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS) | ||
497 | } | ||
498 | if !state.NegotiatedProtocolIsMutual { | ||
499 | return nil, errors.New("http2: could not negotiate protocol mutually") | ||
500 | } | ||
501 | return cn, nil | ||
502 | } | ||
503 | |||
504 | // disableKeepAlives reports whether connections should be closed as | ||
505 | // soon as possible after handling the first request. | ||
506 | func (t *Transport) disableKeepAlives() bool { | ||
507 | return t.t1 != nil && t.t1.DisableKeepAlives | ||
508 | } | ||
509 | |||
510 | func (t *Transport) expectContinueTimeout() time.Duration { | ||
511 | if t.t1 == nil { | ||
512 | return 0 | ||
513 | } | ||
514 | return transportExpectContinueTimeout(t.t1) | ||
515 | } | ||
516 | |||
517 | func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) { | ||
518 | return t.newClientConn(c, false) | ||
519 | } | ||
520 | |||
521 | func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) { | ||
522 | cc := &ClientConn{ | ||
523 | t: t, | ||
524 | tconn: c, | ||
525 | readerDone: make(chan struct{}), | ||
526 | nextStreamID: 1, | ||
527 | maxFrameSize: 16 << 10, // spec default | ||
528 | initialWindowSize: 65535, // spec default | ||
529 | maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough. | ||
530 | peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead. | ||
531 | streams: make(map[uint32]*clientStream), | ||
532 | singleUse: singleUse, | ||
533 | wantSettingsAck: true, | ||
534 | pings: make(map[[8]byte]chan struct{}), | ||
535 | } | ||
536 | if d := t.idleConnTimeout(); d != 0 { | ||
537 | cc.idleTimeout = d | ||
538 | cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout) | ||
539 | } | ||
540 | if VerboseLogs { | ||
541 | t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr()) | ||
542 | } | ||
543 | |||
544 | cc.cond = sync.NewCond(&cc.mu) | ||
545 | cc.flow.add(int32(initialWindowSize)) | ||
546 | |||
547 | // TODO: adjust this writer size to account for frame size + | ||
548 | // MTU + crypto/tls record padding. | ||
549 | cc.bw = bufio.NewWriter(stickyErrWriter{c, &cc.werr}) | ||
550 | cc.br = bufio.NewReader(c) | ||
551 | cc.fr = NewFramer(cc.bw, cc.br) | ||
552 | cc.fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil) | ||
553 | cc.fr.MaxHeaderListSize = t.maxHeaderListSize() | ||
554 | |||
555 | // TODO: SetMaxDynamicTableSize, SetMaxDynamicTableSizeLimit on | ||
556 | // henc in response to SETTINGS frames? | ||
557 | cc.henc = hpack.NewEncoder(&cc.hbuf) | ||
558 | |||
559 | if cs, ok := c.(connectionStater); ok { | ||
560 | state := cs.ConnectionState() | ||
561 | cc.tlsState = &state | ||
562 | } | ||
563 | |||
564 | initialSettings := []Setting{ | ||
565 | {ID: SettingEnablePush, Val: 0}, | ||
566 | {ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow}, | ||
567 | } | ||
568 | if max := t.maxHeaderListSize(); max != 0 { | ||
569 | initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max}) | ||
570 | } | ||
571 | |||
572 | cc.bw.Write(clientPreface) | ||
573 | cc.fr.WriteSettings(initialSettings...) | ||
574 | cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow) | ||
575 | cc.inflow.add(transportDefaultConnFlow + initialWindowSize) | ||
576 | cc.bw.Flush() | ||
577 | if cc.werr != nil { | ||
578 | return nil, cc.werr | ||
579 | } | ||
580 | |||
581 | go cc.readLoop() | ||
582 | return cc, nil | ||
583 | } | ||
584 | |||
585 | func (cc *ClientConn) setGoAway(f *GoAwayFrame) { | ||
586 | cc.mu.Lock() | ||
587 | defer cc.mu.Unlock() | ||
588 | |||
589 | old := cc.goAway | ||
590 | cc.goAway = f | ||
591 | |||
592 | // Merge the previous and current GoAway error frames. | ||
593 | if cc.goAwayDebug == "" { | ||
594 | cc.goAwayDebug = string(f.DebugData()) | ||
595 | } | ||
596 | if old != nil && old.ErrCode != ErrCodeNo { | ||
597 | cc.goAway.ErrCode = old.ErrCode | ||
598 | } | ||
599 | last := f.LastStreamID | ||
600 | for streamID, cs := range cc.streams { | ||
601 | if streamID > last { | ||
602 | select { | ||
603 | case cs.resc <- resAndError{err: errClientConnGotGoAway}: | ||
604 | default: | ||
605 | } | ||
606 | } | ||
607 | } | ||
608 | } | ||
609 | |||
610 | // CanTakeNewRequest reports whether the connection can take a new request, | ||
611 | // meaning it has not been closed or received or sent a GOAWAY. | ||
612 | func (cc *ClientConn) CanTakeNewRequest() bool { | ||
613 | cc.mu.Lock() | ||
614 | defer cc.mu.Unlock() | ||
615 | return cc.canTakeNewRequestLocked() | ||
616 | } | ||
617 | |||
618 | func (cc *ClientConn) canTakeNewRequestLocked() bool { | ||
619 | if cc.singleUse && cc.nextStreamID > 1 { | ||
620 | return false | ||
621 | } | ||
622 | return cc.goAway == nil && !cc.closed && | ||
623 | int64(cc.nextStreamID)+int64(cc.pendingRequests) < math.MaxInt32 | ||
624 | } | ||
625 | |||
626 | // onIdleTimeout is called from a time.AfterFunc goroutine. It will | ||
627 | // only be called when we're idle, but because we're coming from a new | ||
628 | // goroutine, there could be a new request coming in at the same time, | ||
629 | // so this simply calls the synchronized closeIfIdle to shut down this | ||
630 | // connection. The timer could just call closeIfIdle, but this is more | ||
631 | // clear. | ||
632 | func (cc *ClientConn) onIdleTimeout() { | ||
633 | cc.closeIfIdle() | ||
634 | } | ||
635 | |||
636 | func (cc *ClientConn) closeIfIdle() { | ||
637 | cc.mu.Lock() | ||
638 | if len(cc.streams) > 0 { | ||
639 | cc.mu.Unlock() | ||
640 | return | ||
641 | } | ||
642 | cc.closed = true | ||
643 | nextID := cc.nextStreamID | ||
644 | // TODO: do clients send GOAWAY too? maybe? Just Close: | ||
645 | cc.mu.Unlock() | ||
646 | |||
647 | if VerboseLogs { | ||
648 | cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2) | ||
649 | } | ||
650 | cc.tconn.Close() | ||
651 | } | ||
652 | |||
653 | const maxAllocFrameSize = 512 << 10 | ||
654 | |||
655 | // frameBuffer returns a scratch buffer suitable for writing DATA frames. | ||
656 | // They're capped at the min of the peer's max frame size or 512KB | ||
657 | // (kinda arbitrarily), but definitely capped so we don't allocate 4GB | ||
658 | // bufers. | ||
659 | func (cc *ClientConn) frameScratchBuffer() []byte { | ||
660 | cc.mu.Lock() | ||
661 | size := cc.maxFrameSize | ||
662 | if size > maxAllocFrameSize { | ||
663 | size = maxAllocFrameSize | ||
664 | } | ||
665 | for i, buf := range cc.freeBuf { | ||
666 | if len(buf) >= int(size) { | ||
667 | cc.freeBuf[i] = nil | ||
668 | cc.mu.Unlock() | ||
669 | return buf[:size] | ||
670 | } | ||
671 | } | ||
672 | cc.mu.Unlock() | ||
673 | return make([]byte, size) | ||
674 | } | ||
675 | |||
676 | func (cc *ClientConn) putFrameScratchBuffer(buf []byte) { | ||
677 | cc.mu.Lock() | ||
678 | defer cc.mu.Unlock() | ||
679 | const maxBufs = 4 // arbitrary; 4 concurrent requests per conn? investigate. | ||
680 | if len(cc.freeBuf) < maxBufs { | ||
681 | cc.freeBuf = append(cc.freeBuf, buf) | ||
682 | return | ||
683 | } | ||
684 | for i, old := range cc.freeBuf { | ||
685 | if old == nil { | ||
686 | cc.freeBuf[i] = buf | ||
687 | return | ||
688 | } | ||
689 | } | ||
690 | // forget about it. | ||
691 | } | ||
692 | |||
693 | // errRequestCanceled is a copy of net/http's errRequestCanceled because it's not | ||
694 | // exported. At least they'll be DeepEqual for h1-vs-h2 comparisons tests. | ||
695 | var errRequestCanceled = errors.New("net/http: request canceled") | ||
696 | |||
697 | func commaSeparatedTrailers(req *http.Request) (string, error) { | ||
698 | keys := make([]string, 0, len(req.Trailer)) | ||
699 | for k := range req.Trailer { | ||
700 | k = http.CanonicalHeaderKey(k) | ||
701 | switch k { | ||
702 | case "Transfer-Encoding", "Trailer", "Content-Length": | ||
703 | return "", &badStringError{"invalid Trailer key", k} | ||
704 | } | ||
705 | keys = append(keys, k) | ||
706 | } | ||
707 | if len(keys) > 0 { | ||
708 | sort.Strings(keys) | ||
709 | return strings.Join(keys, ","), nil | ||
710 | } | ||
711 | return "", nil | ||
712 | } | ||
713 | |||
714 | func (cc *ClientConn) responseHeaderTimeout() time.Duration { | ||
715 | if cc.t.t1 != nil { | ||
716 | return cc.t.t1.ResponseHeaderTimeout | ||
717 | } | ||
718 | // No way to do this (yet?) with just an http2.Transport. Probably | ||
719 | // no need. Request.Cancel this is the new way. We only need to support | ||
720 | // this for compatibility with the old http.Transport fields when | ||
721 | // we're doing transparent http2. | ||
722 | return 0 | ||
723 | } | ||
724 | |||
725 | // checkConnHeaders checks whether req has any invalid connection-level headers. | ||
726 | // per RFC 7540 section 8.1.2.2: Connection-Specific Header Fields. | ||
727 | // Certain headers are special-cased as okay but not transmitted later. | ||
728 | func checkConnHeaders(req *http.Request) error { | ||
729 | if v := req.Header.Get("Upgrade"); v != "" { | ||
730 | return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"]) | ||
731 | } | ||
732 | if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") { | ||
733 | return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv) | ||
734 | } | ||
735 | if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "close" && vv[0] != "keep-alive") { | ||
736 | return fmt.Errorf("http2: invalid Connection request header: %q", vv) | ||
737 | } | ||
738 | return nil | ||
739 | } | ||
740 | |||
741 | // actualContentLength returns a sanitized version of | ||
742 | // req.ContentLength, where 0 actually means zero (not unknown) and -1 | ||
743 | // means unknown. | ||
744 | func actualContentLength(req *http.Request) int64 { | ||
745 | if req.Body == nil || reqBodyIsNoBody(req.Body) { | ||
746 | return 0 | ||
747 | } | ||
748 | if req.ContentLength != 0 { | ||
749 | return req.ContentLength | ||
750 | } | ||
751 | return -1 | ||
752 | } | ||
753 | |||
754 | func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) { | ||
755 | if err := checkConnHeaders(req); err != nil { | ||
756 | return nil, err | ||
757 | } | ||
758 | if cc.idleTimer != nil { | ||
759 | cc.idleTimer.Stop() | ||
760 | } | ||
761 | |||
762 | trailers, err := commaSeparatedTrailers(req) | ||
763 | if err != nil { | ||
764 | return nil, err | ||
765 | } | ||
766 | hasTrailers := trailers != "" | ||
767 | |||
768 | cc.mu.Lock() | ||
769 | if err := cc.awaitOpenSlotForRequest(req); err != nil { | ||
770 | cc.mu.Unlock() | ||
771 | return nil, err | ||
772 | } | ||
773 | |||
774 | body := req.Body | ||
775 | contentLen := actualContentLength(req) | ||
776 | hasBody := contentLen != 0 | ||
777 | |||
778 | // TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere? | ||
779 | var requestedGzip bool | ||
780 | if !cc.t.disableCompression() && | ||
781 | req.Header.Get("Accept-Encoding") == "" && | ||
782 | req.Header.Get("Range") == "" && | ||
783 | req.Method != "HEAD" { | ||
784 | // Request gzip only, not deflate. Deflate is ambiguous and | ||
785 | // not as universally supported anyway. | ||
786 | // See: http://www.gzip.org/zlib/zlib_faq.html#faq38 | ||
787 | // | ||
788 | // Note that we don't request this for HEAD requests, | ||
789 | // due to a bug in nginx: | ||
790 | // http://trac.nginx.org/nginx/ticket/358 | ||
791 | // https://golang.org/issue/5522 | ||
792 | // | ||
793 | // We don't request gzip if the request is for a range, since | ||
794 | // auto-decoding a portion of a gzipped document will just fail | ||
795 | // anyway. See https://golang.org/issue/8923 | ||
796 | requestedGzip = true | ||
797 | } | ||
798 | |||
799 | // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is | ||
800 | // sent by writeRequestBody below, along with any Trailers, | ||
801 | // again in form HEADERS{1}, CONTINUATION{0,}) | ||
802 | hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen) | ||
803 | if err != nil { | ||
804 | cc.mu.Unlock() | ||
805 | return nil, err | ||
806 | } | ||
807 | |||
808 | cs := cc.newStream() | ||
809 | cs.req = req | ||
810 | cs.trace = requestTrace(req) | ||
811 | cs.requestedGzip = requestedGzip | ||
812 | bodyWriter := cc.t.getBodyWriterState(cs, body) | ||
813 | cs.on100 = bodyWriter.on100 | ||
814 | |||
815 | cc.wmu.Lock() | ||
816 | endStream := !hasBody && !hasTrailers | ||
817 | werr := cc.writeHeaders(cs.ID, endStream, hdrs) | ||
818 | cc.wmu.Unlock() | ||
819 | traceWroteHeaders(cs.trace) | ||
820 | cc.mu.Unlock() | ||
821 | |||
822 | if werr != nil { | ||
823 | if hasBody { | ||
824 | req.Body.Close() // per RoundTripper contract | ||
825 | bodyWriter.cancel() | ||
826 | } | ||
827 | cc.forgetStreamID(cs.ID) | ||
828 | // Don't bother sending a RST_STREAM (our write already failed; | ||
829 | // no need to keep writing) | ||
830 | traceWroteRequest(cs.trace, werr) | ||
831 | return nil, werr | ||
832 | } | ||
833 | |||
834 | var respHeaderTimer <-chan time.Time | ||
835 | if hasBody { | ||
836 | bodyWriter.scheduleBodyWrite() | ||
837 | } else { | ||
838 | traceWroteRequest(cs.trace, nil) | ||
839 | if d := cc.responseHeaderTimeout(); d != 0 { | ||
840 | timer := time.NewTimer(d) | ||
841 | defer timer.Stop() | ||
842 | respHeaderTimer = timer.C | ||
843 | } | ||
844 | } | ||
845 | |||
846 | readLoopResCh := cs.resc | ||
847 | bodyWritten := false | ||
848 | ctx := reqContext(req) | ||
849 | |||
850 | handleReadLoopResponse := func(re resAndError) (*http.Response, error) { | ||
851 | res := re.res | ||
852 | if re.err != nil || res.StatusCode > 299 { | ||
853 | // On error or status code 3xx, 4xx, 5xx, etc abort any | ||
854 | // ongoing write, assuming that the server doesn't care | ||
855 | // about our request body. If the server replied with 1xx or | ||
856 | // 2xx, however, then assume the server DOES potentially | ||
857 | // want our body (e.g. full-duplex streaming: | ||
858 | // golang.org/issue/13444). If it turns out the server | ||
859 | // doesn't, they'll RST_STREAM us soon enough. This is a | ||
860 | // heuristic to avoid adding knobs to Transport. Hopefully | ||
861 | // we can keep it. | ||
862 | bodyWriter.cancel() | ||
863 | cs.abortRequestBodyWrite(errStopReqBodyWrite) | ||
864 | } | ||
865 | if re.err != nil { | ||
866 | cc.mu.Lock() | ||
867 | afterBodyWrite := cs.startedWrite | ||
868 | cc.mu.Unlock() | ||
869 | cc.forgetStreamID(cs.ID) | ||
870 | if afterBodyWrite { | ||
871 | return nil, afterReqBodyWriteError{re.err} | ||
872 | } | ||
873 | return nil, re.err | ||
874 | } | ||
875 | res.Request = req | ||
876 | res.TLS = cc.tlsState | ||
877 | return res, nil | ||
878 | } | ||
879 | |||
880 | for { | ||
881 | select { | ||
882 | case re := <-readLoopResCh: | ||
883 | return handleReadLoopResponse(re) | ||
884 | case <-respHeaderTimer: | ||
885 | if !hasBody || bodyWritten { | ||
886 | cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | ||
887 | } else { | ||
888 | bodyWriter.cancel() | ||
889 | cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel) | ||
890 | } | ||
891 | cc.forgetStreamID(cs.ID) | ||
892 | return nil, errTimeout | ||
893 | case <-ctx.Done(): | ||
894 | if !hasBody || bodyWritten { | ||
895 | cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | ||
896 | } else { | ||
897 | bodyWriter.cancel() | ||
898 | cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel) | ||
899 | } | ||
900 | cc.forgetStreamID(cs.ID) | ||
901 | return nil, ctx.Err() | ||
902 | case <-req.Cancel: | ||
903 | if !hasBody || bodyWritten { | ||
904 | cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | ||
905 | } else { | ||
906 | bodyWriter.cancel() | ||
907 | cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel) | ||
908 | } | ||
909 | cc.forgetStreamID(cs.ID) | ||
910 | return nil, errRequestCanceled | ||
911 | case <-cs.peerReset: | ||
912 | // processResetStream already removed the | ||
913 | // stream from the streams map; no need for | ||
914 | // forgetStreamID. | ||
915 | return nil, cs.resetErr | ||
916 | case err := <-bodyWriter.resc: | ||
917 | // Prefer the read loop's response, if available. Issue 16102. | ||
918 | select { | ||
919 | case re := <-readLoopResCh: | ||
920 | return handleReadLoopResponse(re) | ||
921 | default: | ||
922 | } | ||
923 | if err != nil { | ||
924 | return nil, err | ||
925 | } | ||
926 | bodyWritten = true | ||
927 | if d := cc.responseHeaderTimeout(); d != 0 { | ||
928 | timer := time.NewTimer(d) | ||
929 | defer timer.Stop() | ||
930 | respHeaderTimer = timer.C | ||
931 | } | ||
932 | } | ||
933 | } | ||
934 | } | ||
935 | |||
936 | // awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams. | ||
937 | // Must hold cc.mu. | ||
938 | func (cc *ClientConn) awaitOpenSlotForRequest(req *http.Request) error { | ||
939 | var waitingForConn chan struct{} | ||
940 | var waitingForConnErr error // guarded by cc.mu | ||
941 | for { | ||
942 | cc.lastActive = time.Now() | ||
943 | if cc.closed || !cc.canTakeNewRequestLocked() { | ||
944 | return errClientConnUnusable | ||
945 | } | ||
946 | if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) { | ||
947 | if waitingForConn != nil { | ||
948 | close(waitingForConn) | ||
949 | } | ||
950 | return nil | ||
951 | } | ||
952 | // Unfortunately, we cannot wait on a condition variable and channel at | ||
953 | // the same time, so instead, we spin up a goroutine to check if the | ||
954 | // request is canceled while we wait for a slot to open in the connection. | ||
955 | if waitingForConn == nil { | ||
956 | waitingForConn = make(chan struct{}) | ||
957 | go func() { | ||
958 | if err := awaitRequestCancel(req, waitingForConn); err != nil { | ||
959 | cc.mu.Lock() | ||
960 | waitingForConnErr = err | ||
961 | cc.cond.Broadcast() | ||
962 | cc.mu.Unlock() | ||
963 | } | ||
964 | }() | ||
965 | } | ||
966 | cc.pendingRequests++ | ||
967 | cc.cond.Wait() | ||
968 | cc.pendingRequests-- | ||
969 | if waitingForConnErr != nil { | ||
970 | return waitingForConnErr | ||
971 | } | ||
972 | } | ||
973 | } | ||
974 | |||
975 | // requires cc.wmu be held | ||
976 | func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error { | ||
977 | first := true // first frame written (HEADERS is first, then CONTINUATION) | ||
978 | frameSize := int(cc.maxFrameSize) | ||
979 | for len(hdrs) > 0 && cc.werr == nil { | ||
980 | chunk := hdrs | ||
981 | if len(chunk) > frameSize { | ||
982 | chunk = chunk[:frameSize] | ||
983 | } | ||
984 | hdrs = hdrs[len(chunk):] | ||
985 | endHeaders := len(hdrs) == 0 | ||
986 | if first { | ||
987 | cc.fr.WriteHeaders(HeadersFrameParam{ | ||
988 | StreamID: streamID, | ||
989 | BlockFragment: chunk, | ||
990 | EndStream: endStream, | ||
991 | EndHeaders: endHeaders, | ||
992 | }) | ||
993 | first = false | ||
994 | } else { | ||
995 | cc.fr.WriteContinuation(streamID, endHeaders, chunk) | ||
996 | } | ||
997 | } | ||
998 | // TODO(bradfitz): this Flush could potentially block (as | ||
999 | // could the WriteHeaders call(s) above), which means they | ||
1000 | // wouldn't respond to Request.Cancel being readable. That's | ||
1001 | // rare, but this should probably be in a goroutine. | ||
1002 | cc.bw.Flush() | ||
1003 | return cc.werr | ||
1004 | } | ||
1005 | |||
1006 | // internal error values; they don't escape to callers | ||
1007 | var ( | ||
1008 | // abort request body write; don't send cancel | ||
1009 | errStopReqBodyWrite = errors.New("http2: aborting request body write") | ||
1010 | |||
1011 | // abort request body write, but send stream reset of cancel. | ||
1012 | errStopReqBodyWriteAndCancel = errors.New("http2: canceling request") | ||
1013 | ) | ||
1014 | |||
1015 | func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) { | ||
1016 | cc := cs.cc | ||
1017 | sentEnd := false // whether we sent the final DATA frame w/ END_STREAM | ||
1018 | buf := cc.frameScratchBuffer() | ||
1019 | defer cc.putFrameScratchBuffer(buf) | ||
1020 | |||
1021 | defer func() { | ||
1022 | traceWroteRequest(cs.trace, err) | ||
1023 | // TODO: write h12Compare test showing whether | ||
1024 | // Request.Body is closed by the Transport, | ||
1025 | // and in multiple cases: server replies <=299 and >299 | ||
1026 | // while still writing request body | ||
1027 | cerr := bodyCloser.Close() | ||
1028 | if err == nil { | ||
1029 | err = cerr | ||
1030 | } | ||
1031 | }() | ||
1032 | |||
1033 | req := cs.req | ||
1034 | hasTrailers := req.Trailer != nil | ||
1035 | |||
1036 | var sawEOF bool | ||
1037 | for !sawEOF { | ||
1038 | n, err := body.Read(buf) | ||
1039 | if err == io.EOF { | ||
1040 | sawEOF = true | ||
1041 | err = nil | ||
1042 | } else if err != nil { | ||
1043 | return err | ||
1044 | } | ||
1045 | |||
1046 | remain := buf[:n] | ||
1047 | for len(remain) > 0 && err == nil { | ||
1048 | var allowed int32 | ||
1049 | allowed, err = cs.awaitFlowControl(len(remain)) | ||
1050 | switch { | ||
1051 | case err == errStopReqBodyWrite: | ||
1052 | return err | ||
1053 | case err == errStopReqBodyWriteAndCancel: | ||
1054 | cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | ||
1055 | return err | ||
1056 | case err != nil: | ||
1057 | return err | ||
1058 | } | ||
1059 | cc.wmu.Lock() | ||
1060 | data := remain[:allowed] | ||
1061 | remain = remain[allowed:] | ||
1062 | sentEnd = sawEOF && len(remain) == 0 && !hasTrailers | ||
1063 | err = cc.fr.WriteData(cs.ID, sentEnd, data) | ||
1064 | if err == nil { | ||
1065 | // TODO(bradfitz): this flush is for latency, not bandwidth. | ||
1066 | // Most requests won't need this. Make this opt-in or | ||
1067 | // opt-out? Use some heuristic on the body type? Nagel-like | ||
1068 | // timers? Based on 'n'? Only last chunk of this for loop, | ||
1069 | // unless flow control tokens are low? For now, always. | ||
1070 | // If we change this, see comment below. | ||
1071 | err = cc.bw.Flush() | ||
1072 | } | ||
1073 | cc.wmu.Unlock() | ||
1074 | } | ||
1075 | if err != nil { | ||
1076 | return err | ||
1077 | } | ||
1078 | } | ||
1079 | |||
1080 | if sentEnd { | ||
1081 | // Already sent END_STREAM (which implies we have no | ||
1082 | // trailers) and flushed, because currently all | ||
1083 | // WriteData frames above get a flush. So we're done. | ||
1084 | return nil | ||
1085 | } | ||
1086 | |||
1087 | var trls []byte | ||
1088 | if hasTrailers { | ||
1089 | cc.mu.Lock() | ||
1090 | trls, err = cc.encodeTrailers(req) | ||
1091 | cc.mu.Unlock() | ||
1092 | if err != nil { | ||
1093 | cc.writeStreamReset(cs.ID, ErrCodeInternal, err) | ||
1094 | cc.forgetStreamID(cs.ID) | ||
1095 | return err | ||
1096 | } | ||
1097 | } | ||
1098 | |||
1099 | cc.wmu.Lock() | ||
1100 | defer cc.wmu.Unlock() | ||
1101 | |||
1102 | // Two ways to send END_STREAM: either with trailers, or | ||
1103 | // with an empty DATA frame. | ||
1104 | if len(trls) > 0 { | ||
1105 | err = cc.writeHeaders(cs.ID, true, trls) | ||
1106 | } else { | ||
1107 | err = cc.fr.WriteData(cs.ID, true, nil) | ||
1108 | } | ||
1109 | if ferr := cc.bw.Flush(); ferr != nil && err == nil { | ||
1110 | err = ferr | ||
1111 | } | ||
1112 | return err | ||
1113 | } | ||
1114 | |||
1115 | // awaitFlowControl waits for [1, min(maxBytes, cc.cs.maxFrameSize)] flow | ||
1116 | // control tokens from the server. | ||
1117 | // It returns either the non-zero number of tokens taken or an error | ||
1118 | // if the stream is dead. | ||
1119 | func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) { | ||
1120 | cc := cs.cc | ||
1121 | cc.mu.Lock() | ||
1122 | defer cc.mu.Unlock() | ||
1123 | for { | ||
1124 | if cc.closed { | ||
1125 | return 0, errClientConnClosed | ||
1126 | } | ||
1127 | if cs.stopReqBody != nil { | ||
1128 | return 0, cs.stopReqBody | ||
1129 | } | ||
1130 | if err := cs.checkResetOrDone(); err != nil { | ||
1131 | return 0, err | ||
1132 | } | ||
1133 | if a := cs.flow.available(); a > 0 { | ||
1134 | take := a | ||
1135 | if int(take) > maxBytes { | ||
1136 | |||
1137 | take = int32(maxBytes) // can't truncate int; take is int32 | ||
1138 | } | ||
1139 | if take > int32(cc.maxFrameSize) { | ||
1140 | take = int32(cc.maxFrameSize) | ||
1141 | } | ||
1142 | cs.flow.take(take) | ||
1143 | return take, nil | ||
1144 | } | ||
1145 | cc.cond.Wait() | ||
1146 | } | ||
1147 | } | ||
1148 | |||
1149 | type badStringError struct { | ||
1150 | what string | ||
1151 | str string | ||
1152 | } | ||
1153 | |||
1154 | func (e *badStringError) Error() string { return fmt.Sprintf("%s %q", e.what, e.str) } | ||
1155 | |||
1156 | // requires cc.mu be held. | ||
1157 | func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trailers string, contentLength int64) ([]byte, error) { | ||
1158 | cc.hbuf.Reset() | ||
1159 | |||
1160 | host := req.Host | ||
1161 | if host == "" { | ||
1162 | host = req.URL.Host | ||
1163 | } | ||
1164 | host, err := httplex.PunycodeHostPort(host) | ||
1165 | if err != nil { | ||
1166 | return nil, err | ||
1167 | } | ||
1168 | |||
1169 | var path string | ||
1170 | if req.Method != "CONNECT" { | ||
1171 | path = req.URL.RequestURI() | ||
1172 | if !validPseudoPath(path) { | ||
1173 | orig := path | ||
1174 | path = strings.TrimPrefix(path, req.URL.Scheme+"://"+host) | ||
1175 | if !validPseudoPath(path) { | ||
1176 | if req.URL.Opaque != "" { | ||
1177 | return nil, fmt.Errorf("invalid request :path %q from URL.Opaque = %q", orig, req.URL.Opaque) | ||
1178 | } else { | ||
1179 | return nil, fmt.Errorf("invalid request :path %q", orig) | ||
1180 | } | ||
1181 | } | ||
1182 | } | ||
1183 | } | ||
1184 | |||
1185 | // Check for any invalid headers and return an error before we | ||
1186 | // potentially pollute our hpack state. (We want to be able to | ||
1187 | // continue to reuse the hpack encoder for future requests) | ||
1188 | for k, vv := range req.Header { | ||
1189 | if !httplex.ValidHeaderFieldName(k) { | ||
1190 | return nil, fmt.Errorf("invalid HTTP header name %q", k) | ||
1191 | } | ||
1192 | for _, v := range vv { | ||
1193 | if !httplex.ValidHeaderFieldValue(v) { | ||
1194 | return nil, fmt.Errorf("invalid HTTP header value %q for header %q", v, k) | ||
1195 | } | ||
1196 | } | ||
1197 | } | ||
1198 | |||
1199 | enumerateHeaders := func(f func(name, value string)) { | ||
1200 | // 8.1.2.3 Request Pseudo-Header Fields | ||
1201 | // The :path pseudo-header field includes the path and query parts of the | ||
1202 | // target URI (the path-absolute production and optionally a '?' character | ||
1203 | // followed by the query production (see Sections 3.3 and 3.4 of | ||
1204 | // [RFC3986]). | ||
1205 | f(":authority", host) | ||
1206 | f(":method", req.Method) | ||
1207 | if req.Method != "CONNECT" { | ||
1208 | f(":path", path) | ||
1209 | f(":scheme", req.URL.Scheme) | ||
1210 | } | ||
1211 | if trailers != "" { | ||
1212 | f("trailer", trailers) | ||
1213 | } | ||
1214 | |||
1215 | var didUA bool | ||
1216 | for k, vv := range req.Header { | ||
1217 | if strings.EqualFold(k, "host") || strings.EqualFold(k, "content-length") { | ||
1218 | // Host is :authority, already sent. | ||
1219 | // Content-Length is automatic, set below. | ||
1220 | continue | ||
1221 | } else if strings.EqualFold(k, "connection") || strings.EqualFold(k, "proxy-connection") || | ||
1222 | strings.EqualFold(k, "transfer-encoding") || strings.EqualFold(k, "upgrade") || | ||
1223 | strings.EqualFold(k, "keep-alive") { | ||
1224 | // Per 8.1.2.2 Connection-Specific Header | ||
1225 | // Fields, don't send connection-specific | ||
1226 | // fields. We have already checked if any | ||
1227 | // are error-worthy so just ignore the rest. | ||
1228 | continue | ||
1229 | } else if strings.EqualFold(k, "user-agent") { | ||
1230 | // Match Go's http1 behavior: at most one | ||
1231 | // User-Agent. If set to nil or empty string, | ||
1232 | // then omit it. Otherwise if not mentioned, | ||
1233 | // include the default (below). | ||
1234 | didUA = true | ||
1235 | if len(vv) < 1 { | ||
1236 | continue | ||
1237 | } | ||
1238 | vv = vv[:1] | ||
1239 | if vv[0] == "" { | ||
1240 | continue | ||
1241 | } | ||
1242 | |||
1243 | } | ||
1244 | |||
1245 | for _, v := range vv { | ||
1246 | f(k, v) | ||
1247 | } | ||
1248 | } | ||
1249 | if shouldSendReqContentLength(req.Method, contentLength) { | ||
1250 | f("content-length", strconv.FormatInt(contentLength, 10)) | ||
1251 | } | ||
1252 | if addGzipHeader { | ||
1253 | f("accept-encoding", "gzip") | ||
1254 | } | ||
1255 | if !didUA { | ||
1256 | f("user-agent", defaultUserAgent) | ||
1257 | } | ||
1258 | } | ||
1259 | |||
1260 | // Do a first pass over the headers counting bytes to ensure | ||
1261 | // we don't exceed cc.peerMaxHeaderListSize. This is done as a | ||
1262 | // separate pass before encoding the headers to prevent | ||
1263 | // modifying the hpack state. | ||
1264 | hlSize := uint64(0) | ||
1265 | enumerateHeaders(func(name, value string) { | ||
1266 | hf := hpack.HeaderField{Name: name, Value: value} | ||
1267 | hlSize += uint64(hf.Size()) | ||
1268 | }) | ||
1269 | |||
1270 | if hlSize > cc.peerMaxHeaderListSize { | ||
1271 | return nil, errRequestHeaderListSize | ||
1272 | } | ||
1273 | |||
1274 | // Header list size is ok. Write the headers. | ||
1275 | enumerateHeaders(func(name, value string) { | ||
1276 | cc.writeHeader(strings.ToLower(name), value) | ||
1277 | }) | ||
1278 | |||
1279 | return cc.hbuf.Bytes(), nil | ||
1280 | } | ||
1281 | |||
1282 | // shouldSendReqContentLength reports whether the http2.Transport should send | ||
1283 | // a "content-length" request header. This logic is basically a copy of the net/http | ||
1284 | // transferWriter.shouldSendContentLength. | ||
1285 | // The contentLength is the corrected contentLength (so 0 means actually 0, not unknown). | ||
1286 | // -1 means unknown. | ||
1287 | func shouldSendReqContentLength(method string, contentLength int64) bool { | ||
1288 | if contentLength > 0 { | ||
1289 | return true | ||
1290 | } | ||
1291 | if contentLength < 0 { | ||
1292 | return false | ||
1293 | } | ||
1294 | // For zero bodies, whether we send a content-length depends on the method. | ||
1295 | // It also kinda doesn't matter for http2 either way, with END_STREAM. | ||
1296 | switch method { | ||
1297 | case "POST", "PUT", "PATCH": | ||
1298 | return true | ||
1299 | default: | ||
1300 | return false | ||
1301 | } | ||
1302 | } | ||
1303 | |||
1304 | // requires cc.mu be held. | ||
1305 | func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) { | ||
1306 | cc.hbuf.Reset() | ||
1307 | |||
1308 | hlSize := uint64(0) | ||
1309 | for k, vv := range req.Trailer { | ||
1310 | for _, v := range vv { | ||
1311 | hf := hpack.HeaderField{Name: k, Value: v} | ||
1312 | hlSize += uint64(hf.Size()) | ||
1313 | } | ||
1314 | } | ||
1315 | if hlSize > cc.peerMaxHeaderListSize { | ||
1316 | return nil, errRequestHeaderListSize | ||
1317 | } | ||
1318 | |||
1319 | for k, vv := range req.Trailer { | ||
1320 | // Transfer-Encoding, etc.. have already been filtered at the | ||
1321 | // start of RoundTrip | ||
1322 | lowKey := strings.ToLower(k) | ||
1323 | for _, v := range vv { | ||
1324 | cc.writeHeader(lowKey, v) | ||
1325 | } | ||
1326 | } | ||
1327 | return cc.hbuf.Bytes(), nil | ||
1328 | } | ||
1329 | |||
1330 | func (cc *ClientConn) writeHeader(name, value string) { | ||
1331 | if VerboseLogs { | ||
1332 | log.Printf("http2: Transport encoding header %q = %q", name, value) | ||
1333 | } | ||
1334 | cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value}) | ||
1335 | } | ||
1336 | |||
1337 | type resAndError struct { | ||
1338 | res *http.Response | ||
1339 | err error | ||
1340 | } | ||
1341 | |||
1342 | // requires cc.mu be held. | ||
1343 | func (cc *ClientConn) newStream() *clientStream { | ||
1344 | cs := &clientStream{ | ||
1345 | cc: cc, | ||
1346 | ID: cc.nextStreamID, | ||
1347 | resc: make(chan resAndError, 1), | ||
1348 | peerReset: make(chan struct{}), | ||
1349 | done: make(chan struct{}), | ||
1350 | } | ||
1351 | cs.flow.add(int32(cc.initialWindowSize)) | ||
1352 | cs.flow.setConnFlow(&cc.flow) | ||
1353 | cs.inflow.add(transportDefaultStreamFlow) | ||
1354 | cs.inflow.setConnFlow(&cc.inflow) | ||
1355 | cc.nextStreamID += 2 | ||
1356 | cc.streams[cs.ID] = cs | ||
1357 | return cs | ||
1358 | } | ||
1359 | |||
1360 | func (cc *ClientConn) forgetStreamID(id uint32) { | ||
1361 | cc.streamByID(id, true) | ||
1362 | } | ||
1363 | |||
1364 | func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream { | ||
1365 | cc.mu.Lock() | ||
1366 | defer cc.mu.Unlock() | ||
1367 | cs := cc.streams[id] | ||
1368 | if andRemove && cs != nil && !cc.closed { | ||
1369 | cc.lastActive = time.Now() | ||
1370 | delete(cc.streams, id) | ||
1371 | if len(cc.streams) == 0 && cc.idleTimer != nil { | ||
1372 | cc.idleTimer.Reset(cc.idleTimeout) | ||
1373 | } | ||
1374 | close(cs.done) | ||
1375 | // Wake up checkResetOrDone via clientStream.awaitFlowControl and | ||
1376 | // wake up RoundTrip if there is a pending request. | ||
1377 | cc.cond.Broadcast() | ||
1378 | } | ||
1379 | return cs | ||
1380 | } | ||
1381 | |||
1382 | // clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop. | ||
1383 | type clientConnReadLoop struct { | ||
1384 | cc *ClientConn | ||
1385 | activeRes map[uint32]*clientStream // keyed by streamID | ||
1386 | closeWhenIdle bool | ||
1387 | } | ||
1388 | |||
1389 | // readLoop runs in its own goroutine and reads and dispatches frames. | ||
1390 | func (cc *ClientConn) readLoop() { | ||
1391 | rl := &clientConnReadLoop{ | ||
1392 | cc: cc, | ||
1393 | activeRes: make(map[uint32]*clientStream), | ||
1394 | } | ||
1395 | |||
1396 | defer rl.cleanup() | ||
1397 | cc.readerErr = rl.run() | ||
1398 | if ce, ok := cc.readerErr.(ConnectionError); ok { | ||
1399 | cc.wmu.Lock() | ||
1400 | cc.fr.WriteGoAway(0, ErrCode(ce), nil) | ||
1401 | cc.wmu.Unlock() | ||
1402 | } | ||
1403 | } | ||
1404 | |||
1405 | // GoAwayError is returned by the Transport when the server closes the | ||
1406 | // TCP connection after sending a GOAWAY frame. | ||
1407 | type GoAwayError struct { | ||
1408 | LastStreamID uint32 | ||
1409 | ErrCode ErrCode | ||
1410 | DebugData string | ||
1411 | } | ||
1412 | |||
1413 | func (e GoAwayError) Error() string { | ||
1414 | return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q", | ||
1415 | e.LastStreamID, e.ErrCode, e.DebugData) | ||
1416 | } | ||
1417 | |||
1418 | func isEOFOrNetReadError(err error) bool { | ||
1419 | if err == io.EOF { | ||
1420 | return true | ||
1421 | } | ||
1422 | ne, ok := err.(*net.OpError) | ||
1423 | return ok && ne.Op == "read" | ||
1424 | } | ||
1425 | |||
1426 | func (rl *clientConnReadLoop) cleanup() { | ||
1427 | cc := rl.cc | ||
1428 | defer cc.tconn.Close() | ||
1429 | defer cc.t.connPool().MarkDead(cc) | ||
1430 | defer close(cc.readerDone) | ||
1431 | |||
1432 | if cc.idleTimer != nil { | ||
1433 | cc.idleTimer.Stop() | ||
1434 | } | ||
1435 | |||
1436 | // Close any response bodies if the server closes prematurely. | ||
1437 | // TODO: also do this if we've written the headers but not | ||
1438 | // gotten a response yet. | ||
1439 | err := cc.readerErr | ||
1440 | cc.mu.Lock() | ||
1441 | if cc.goAway != nil && isEOFOrNetReadError(err) { | ||
1442 | err = GoAwayError{ | ||
1443 | LastStreamID: cc.goAway.LastStreamID, | ||
1444 | ErrCode: cc.goAway.ErrCode, | ||
1445 | DebugData: cc.goAwayDebug, | ||
1446 | } | ||
1447 | } else if err == io.EOF { | ||
1448 | err = io.ErrUnexpectedEOF | ||
1449 | } | ||
1450 | for _, cs := range rl.activeRes { | ||
1451 | cs.bufPipe.CloseWithError(err) | ||
1452 | } | ||
1453 | for _, cs := range cc.streams { | ||
1454 | select { | ||
1455 | case cs.resc <- resAndError{err: err}: | ||
1456 | default: | ||
1457 | } | ||
1458 | close(cs.done) | ||
1459 | } | ||
1460 | cc.closed = true | ||
1461 | cc.cond.Broadcast() | ||
1462 | cc.mu.Unlock() | ||
1463 | } | ||
1464 | |||
1465 | func (rl *clientConnReadLoop) run() error { | ||
1466 | cc := rl.cc | ||
1467 | rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse | ||
1468 | gotReply := false // ever saw a HEADERS reply | ||
1469 | gotSettings := false | ||
1470 | for { | ||
1471 | f, err := cc.fr.ReadFrame() | ||
1472 | if err != nil { | ||
1473 | cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err) | ||
1474 | } | ||
1475 | if se, ok := err.(StreamError); ok { | ||
1476 | if cs := cc.streamByID(se.StreamID, false); cs != nil { | ||
1477 | cs.cc.writeStreamReset(cs.ID, se.Code, err) | ||
1478 | cs.cc.forgetStreamID(cs.ID) | ||
1479 | if se.Cause == nil { | ||
1480 | se.Cause = cc.fr.errDetail | ||
1481 | } | ||
1482 | rl.endStreamError(cs, se) | ||
1483 | } | ||
1484 | continue | ||
1485 | } else if err != nil { | ||
1486 | return err | ||
1487 | } | ||
1488 | if VerboseLogs { | ||
1489 | cc.vlogf("http2: Transport received %s", summarizeFrame(f)) | ||
1490 | } | ||
1491 | if !gotSettings { | ||
1492 | if _, ok := f.(*SettingsFrame); !ok { | ||
1493 | cc.logf("protocol error: received %T before a SETTINGS frame", f) | ||
1494 | return ConnectionError(ErrCodeProtocol) | ||
1495 | } | ||
1496 | gotSettings = true | ||
1497 | } | ||
1498 | maybeIdle := false // whether frame might transition us to idle | ||
1499 | |||
1500 | switch f := f.(type) { | ||
1501 | case *MetaHeadersFrame: | ||
1502 | err = rl.processHeaders(f) | ||
1503 | maybeIdle = true | ||
1504 | gotReply = true | ||
1505 | case *DataFrame: | ||
1506 | err = rl.processData(f) | ||
1507 | maybeIdle = true | ||
1508 | case *GoAwayFrame: | ||
1509 | err = rl.processGoAway(f) | ||
1510 | maybeIdle = true | ||
1511 | case *RSTStreamFrame: | ||
1512 | err = rl.processResetStream(f) | ||
1513 | maybeIdle = true | ||
1514 | case *SettingsFrame: | ||
1515 | err = rl.processSettings(f) | ||
1516 | case *PushPromiseFrame: | ||
1517 | err = rl.processPushPromise(f) | ||
1518 | case *WindowUpdateFrame: | ||
1519 | err = rl.processWindowUpdate(f) | ||
1520 | case *PingFrame: | ||
1521 | err = rl.processPing(f) | ||
1522 | default: | ||
1523 | cc.logf("Transport: unhandled response frame type %T", f) | ||
1524 | } | ||
1525 | if err != nil { | ||
1526 | if VerboseLogs { | ||
1527 | cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err) | ||
1528 | } | ||
1529 | return err | ||
1530 | } | ||
1531 | if rl.closeWhenIdle && gotReply && maybeIdle && len(rl.activeRes) == 0 { | ||
1532 | cc.closeIfIdle() | ||
1533 | } | ||
1534 | } | ||
1535 | } | ||
1536 | |||
1537 | func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error { | ||
1538 | cc := rl.cc | ||
1539 | cs := cc.streamByID(f.StreamID, f.StreamEnded()) | ||
1540 | if cs == nil { | ||
1541 | // We'd get here if we canceled a request while the | ||
1542 | // server had its response still in flight. So if this | ||
1543 | // was just something we canceled, ignore it. | ||
1544 | return nil | ||
1545 | } | ||
1546 | if !cs.firstByte { | ||
1547 | if cs.trace != nil { | ||
1548 | // TODO(bradfitz): move first response byte earlier, | ||
1549 | // when we first read the 9 byte header, not waiting | ||
1550 | // until all the HEADERS+CONTINUATION frames have been | ||
1551 | // merged. This works for now. | ||
1552 | traceFirstResponseByte(cs.trace) | ||
1553 | } | ||
1554 | cs.firstByte = true | ||
1555 | } | ||
1556 | if !cs.pastHeaders { | ||
1557 | cs.pastHeaders = true | ||
1558 | } else { | ||
1559 | return rl.processTrailers(cs, f) | ||
1560 | } | ||
1561 | |||
1562 | res, err := rl.handleResponse(cs, f) | ||
1563 | if err != nil { | ||
1564 | if _, ok := err.(ConnectionError); ok { | ||
1565 | return err | ||
1566 | } | ||
1567 | // Any other error type is a stream error. | ||
1568 | cs.cc.writeStreamReset(f.StreamID, ErrCodeProtocol, err) | ||
1569 | cs.resc <- resAndError{err: err} | ||
1570 | return nil // return nil from process* funcs to keep conn alive | ||
1571 | } | ||
1572 | if res == nil { | ||
1573 | // (nil, nil) special case. See handleResponse docs. | ||
1574 | return nil | ||
1575 | } | ||
1576 | if res.Body != noBody { | ||
1577 | rl.activeRes[cs.ID] = cs | ||
1578 | } | ||
1579 | cs.resTrailer = &res.Trailer | ||
1580 | cs.resc <- resAndError{res: res} | ||
1581 | return nil | ||
1582 | } | ||
1583 | |||
1584 | // may return error types nil, or ConnectionError. Any other error value | ||
1585 | // is a StreamError of type ErrCodeProtocol. The returned error in that case | ||
1586 | // is the detail. | ||
1587 | // | ||
1588 | // As a special case, handleResponse may return (nil, nil) to skip the | ||
1589 | // frame (currently only used for 100 expect continue). This special | ||
1590 | // case is going away after Issue 13851 is fixed. | ||
1591 | func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*http.Response, error) { | ||
1592 | if f.Truncated { | ||
1593 | return nil, errResponseHeaderListSize | ||
1594 | } | ||
1595 | |||
1596 | status := f.PseudoValue("status") | ||
1597 | if status == "" { | ||
1598 | return nil, errors.New("missing status pseudo header") | ||
1599 | } | ||
1600 | statusCode, err := strconv.Atoi(status) | ||
1601 | if err != nil { | ||
1602 | return nil, errors.New("malformed non-numeric status pseudo header") | ||
1603 | } | ||
1604 | |||
1605 | if statusCode == 100 { | ||
1606 | traceGot100Continue(cs.trace) | ||
1607 | if cs.on100 != nil { | ||
1608 | cs.on100() // forces any write delay timer to fire | ||
1609 | } | ||
1610 | cs.pastHeaders = false // do it all again | ||
1611 | return nil, nil | ||
1612 | } | ||
1613 | |||
1614 | header := make(http.Header) | ||
1615 | res := &http.Response{ | ||
1616 | Proto: "HTTP/2.0", | ||
1617 | ProtoMajor: 2, | ||
1618 | Header: header, | ||
1619 | StatusCode: statusCode, | ||
1620 | Status: status + " " + http.StatusText(statusCode), | ||
1621 | } | ||
1622 | for _, hf := range f.RegularFields() { | ||
1623 | key := http.CanonicalHeaderKey(hf.Name) | ||
1624 | if key == "Trailer" { | ||
1625 | t := res.Trailer | ||
1626 | if t == nil { | ||
1627 | t = make(http.Header) | ||
1628 | res.Trailer = t | ||
1629 | } | ||
1630 | foreachHeaderElement(hf.Value, func(v string) { | ||
1631 | t[http.CanonicalHeaderKey(v)] = nil | ||
1632 | }) | ||
1633 | } else { | ||
1634 | header[key] = append(header[key], hf.Value) | ||
1635 | } | ||
1636 | } | ||
1637 | |||
1638 | streamEnded := f.StreamEnded() | ||
1639 | isHead := cs.req.Method == "HEAD" | ||
1640 | if !streamEnded || isHead { | ||
1641 | res.ContentLength = -1 | ||
1642 | if clens := res.Header["Content-Length"]; len(clens) == 1 { | ||
1643 | if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil { | ||
1644 | res.ContentLength = clen64 | ||
1645 | } else { | ||
1646 | // TODO: care? unlike http/1, it won't mess up our framing, so it's | ||
1647 | // more safe smuggling-wise to ignore. | ||
1648 | } | ||
1649 | } else if len(clens) > 1 { | ||
1650 | // TODO: care? unlike http/1, it won't mess up our framing, so it's | ||
1651 | // more safe smuggling-wise to ignore. | ||
1652 | } | ||
1653 | } | ||
1654 | |||
1655 | if streamEnded || isHead { | ||
1656 | res.Body = noBody | ||
1657 | return res, nil | ||
1658 | } | ||
1659 | |||
1660 | cs.bufPipe = pipe{b: &dataBuffer{expected: res.ContentLength}} | ||
1661 | cs.bytesRemain = res.ContentLength | ||
1662 | res.Body = transportResponseBody{cs} | ||
1663 | go cs.awaitRequestCancel(cs.req) | ||
1664 | |||
1665 | if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" { | ||
1666 | res.Header.Del("Content-Encoding") | ||
1667 | res.Header.Del("Content-Length") | ||
1668 | res.ContentLength = -1 | ||
1669 | res.Body = &gzipReader{body: res.Body} | ||
1670 | setResponseUncompressed(res) | ||
1671 | } | ||
1672 | return res, nil | ||
1673 | } | ||
1674 | |||
1675 | func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error { | ||
1676 | if cs.pastTrailers { | ||
1677 | // Too many HEADERS frames for this stream. | ||
1678 | return ConnectionError(ErrCodeProtocol) | ||
1679 | } | ||
1680 | cs.pastTrailers = true | ||
1681 | if !f.StreamEnded() { | ||
1682 | // We expect that any headers for trailers also | ||
1683 | // has END_STREAM. | ||
1684 | return ConnectionError(ErrCodeProtocol) | ||
1685 | } | ||
1686 | if len(f.PseudoFields()) > 0 { | ||
1687 | // No pseudo header fields are defined for trailers. | ||
1688 | // TODO: ConnectionError might be overly harsh? Check. | ||
1689 | return ConnectionError(ErrCodeProtocol) | ||
1690 | } | ||
1691 | |||
1692 | trailer := make(http.Header) | ||
1693 | for _, hf := range f.RegularFields() { | ||
1694 | key := http.CanonicalHeaderKey(hf.Name) | ||
1695 | trailer[key] = append(trailer[key], hf.Value) | ||
1696 | } | ||
1697 | cs.trailer = trailer | ||
1698 | |||
1699 | rl.endStream(cs) | ||
1700 | return nil | ||
1701 | } | ||
1702 | |||
1703 | // transportResponseBody is the concrete type of Transport.RoundTrip's | ||
1704 | // Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body. | ||
1705 | // On Close it sends RST_STREAM if EOF wasn't already seen. | ||
1706 | type transportResponseBody struct { | ||
1707 | cs *clientStream | ||
1708 | } | ||
1709 | |||
1710 | func (b transportResponseBody) Read(p []byte) (n int, err error) { | ||
1711 | cs := b.cs | ||
1712 | cc := cs.cc | ||
1713 | |||
1714 | if cs.readErr != nil { | ||
1715 | return 0, cs.readErr | ||
1716 | } | ||
1717 | n, err = b.cs.bufPipe.Read(p) | ||
1718 | if cs.bytesRemain != -1 { | ||
1719 | if int64(n) > cs.bytesRemain { | ||
1720 | n = int(cs.bytesRemain) | ||
1721 | if err == nil { | ||
1722 | err = errors.New("net/http: server replied with more than declared Content-Length; truncated") | ||
1723 | cc.writeStreamReset(cs.ID, ErrCodeProtocol, err) | ||
1724 | } | ||
1725 | cs.readErr = err | ||
1726 | return int(cs.bytesRemain), err | ||
1727 | } | ||
1728 | cs.bytesRemain -= int64(n) | ||
1729 | if err == io.EOF && cs.bytesRemain > 0 { | ||
1730 | err = io.ErrUnexpectedEOF | ||
1731 | cs.readErr = err | ||
1732 | return n, err | ||
1733 | } | ||
1734 | } | ||
1735 | if n == 0 { | ||
1736 | // No flow control tokens to send back. | ||
1737 | return | ||
1738 | } | ||
1739 | |||
1740 | cc.mu.Lock() | ||
1741 | defer cc.mu.Unlock() | ||
1742 | |||
1743 | var connAdd, streamAdd int32 | ||
1744 | // Check the conn-level first, before the stream-level. | ||
1745 | if v := cc.inflow.available(); v < transportDefaultConnFlow/2 { | ||
1746 | connAdd = transportDefaultConnFlow - v | ||
1747 | cc.inflow.add(connAdd) | ||
1748 | } | ||
1749 | if err == nil { // No need to refresh if the stream is over or failed. | ||
1750 | // Consider any buffered body data (read from the conn but not | ||
1751 | // consumed by the client) when computing flow control for this | ||
1752 | // stream. | ||
1753 | v := int(cs.inflow.available()) + cs.bufPipe.Len() | ||
1754 | if v < transportDefaultStreamFlow-transportDefaultStreamMinRefresh { | ||
1755 | streamAdd = int32(transportDefaultStreamFlow - v) | ||
1756 | cs.inflow.add(streamAdd) | ||
1757 | } | ||
1758 | } | ||
1759 | if connAdd != 0 || streamAdd != 0 { | ||
1760 | cc.wmu.Lock() | ||
1761 | defer cc.wmu.Unlock() | ||
1762 | if connAdd != 0 { | ||
1763 | cc.fr.WriteWindowUpdate(0, mustUint31(connAdd)) | ||
1764 | } | ||
1765 | if streamAdd != 0 { | ||
1766 | cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd)) | ||
1767 | } | ||
1768 | cc.bw.Flush() | ||
1769 | } | ||
1770 | return | ||
1771 | } | ||
1772 | |||
1773 | var errClosedResponseBody = errors.New("http2: response body closed") | ||
1774 | |||
1775 | func (b transportResponseBody) Close() error { | ||
1776 | cs := b.cs | ||
1777 | cc := cs.cc | ||
1778 | |||
1779 | serverSentStreamEnd := cs.bufPipe.Err() == io.EOF | ||
1780 | unread := cs.bufPipe.Len() | ||
1781 | |||
1782 | if unread > 0 || !serverSentStreamEnd { | ||
1783 | cc.mu.Lock() | ||
1784 | cc.wmu.Lock() | ||
1785 | if !serverSentStreamEnd { | ||
1786 | cc.fr.WriteRSTStream(cs.ID, ErrCodeCancel) | ||
1787 | cs.didReset = true | ||
1788 | } | ||
1789 | // Return connection-level flow control. | ||
1790 | if unread > 0 { | ||
1791 | cc.inflow.add(int32(unread)) | ||
1792 | cc.fr.WriteWindowUpdate(0, uint32(unread)) | ||
1793 | } | ||
1794 | cc.bw.Flush() | ||
1795 | cc.wmu.Unlock() | ||
1796 | cc.mu.Unlock() | ||
1797 | } | ||
1798 | |||
1799 | cs.bufPipe.BreakWithError(errClosedResponseBody) | ||
1800 | cc.forgetStreamID(cs.ID) | ||
1801 | return nil | ||
1802 | } | ||
1803 | |||
1804 | func (rl *clientConnReadLoop) processData(f *DataFrame) error { | ||
1805 | cc := rl.cc | ||
1806 | cs := cc.streamByID(f.StreamID, f.StreamEnded()) | ||
1807 | data := f.Data() | ||
1808 | if cs == nil { | ||
1809 | cc.mu.Lock() | ||
1810 | neverSent := cc.nextStreamID | ||
1811 | cc.mu.Unlock() | ||
1812 | if f.StreamID >= neverSent { | ||
1813 | // We never asked for this. | ||
1814 | cc.logf("http2: Transport received unsolicited DATA frame; closing connection") | ||
1815 | return ConnectionError(ErrCodeProtocol) | ||
1816 | } | ||
1817 | // We probably did ask for this, but canceled. Just ignore it. | ||
1818 | // TODO: be stricter here? only silently ignore things which | ||
1819 | // we canceled, but not things which were closed normally | ||
1820 | // by the peer? Tough without accumulating too much state. | ||
1821 | |||
1822 | // But at least return their flow control: | ||
1823 | if f.Length > 0 { | ||
1824 | cc.mu.Lock() | ||
1825 | cc.inflow.add(int32(f.Length)) | ||
1826 | cc.mu.Unlock() | ||
1827 | |||
1828 | cc.wmu.Lock() | ||
1829 | cc.fr.WriteWindowUpdate(0, uint32(f.Length)) | ||
1830 | cc.bw.Flush() | ||
1831 | cc.wmu.Unlock() | ||
1832 | } | ||
1833 | return nil | ||
1834 | } | ||
1835 | if !cs.firstByte { | ||
1836 | cc.logf("protocol error: received DATA before a HEADERS frame") | ||
1837 | rl.endStreamError(cs, StreamError{ | ||
1838 | StreamID: f.StreamID, | ||
1839 | Code: ErrCodeProtocol, | ||
1840 | }) | ||
1841 | return nil | ||
1842 | } | ||
1843 | if f.Length > 0 { | ||
1844 | // Check connection-level flow control. | ||
1845 | cc.mu.Lock() | ||
1846 | if cs.inflow.available() >= int32(f.Length) { | ||
1847 | cs.inflow.take(int32(f.Length)) | ||
1848 | } else { | ||
1849 | cc.mu.Unlock() | ||
1850 | return ConnectionError(ErrCodeFlowControl) | ||
1851 | } | ||
1852 | // Return any padded flow control now, since we won't | ||
1853 | // refund it later on body reads. | ||
1854 | var refund int | ||
1855 | if pad := int(f.Length) - len(data); pad > 0 { | ||
1856 | refund += pad | ||
1857 | } | ||
1858 | // Return len(data) now if the stream is already closed, | ||
1859 | // since data will never be read. | ||
1860 | didReset := cs.didReset | ||
1861 | if didReset { | ||
1862 | refund += len(data) | ||
1863 | } | ||
1864 | if refund > 0 { | ||
1865 | cc.inflow.add(int32(refund)) | ||
1866 | cc.wmu.Lock() | ||
1867 | cc.fr.WriteWindowUpdate(0, uint32(refund)) | ||
1868 | if !didReset { | ||
1869 | cs.inflow.add(int32(refund)) | ||
1870 | cc.fr.WriteWindowUpdate(cs.ID, uint32(refund)) | ||
1871 | } | ||
1872 | cc.bw.Flush() | ||
1873 | cc.wmu.Unlock() | ||
1874 | } | ||
1875 | cc.mu.Unlock() | ||
1876 | |||
1877 | if len(data) > 0 && !didReset { | ||
1878 | if _, err := cs.bufPipe.Write(data); err != nil { | ||
1879 | rl.endStreamError(cs, err) | ||
1880 | return err | ||
1881 | } | ||
1882 | } | ||
1883 | } | ||
1884 | |||
1885 | if f.StreamEnded() { | ||
1886 | rl.endStream(cs) | ||
1887 | } | ||
1888 | return nil | ||
1889 | } | ||
1890 | |||
1891 | var errInvalidTrailers = errors.New("http2: invalid trailers") | ||
1892 | |||
1893 | func (rl *clientConnReadLoop) endStream(cs *clientStream) { | ||
1894 | // TODO: check that any declared content-length matches, like | ||
1895 | // server.go's (*stream).endStream method. | ||
1896 | rl.endStreamError(cs, nil) | ||
1897 | } | ||
1898 | |||
1899 | func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) { | ||
1900 | var code func() | ||
1901 | if err == nil { | ||
1902 | err = io.EOF | ||
1903 | code = cs.copyTrailers | ||
1904 | } | ||
1905 | cs.bufPipe.closeWithErrorAndCode(err, code) | ||
1906 | delete(rl.activeRes, cs.ID) | ||
1907 | if isConnectionCloseRequest(cs.req) { | ||
1908 | rl.closeWhenIdle = true | ||
1909 | } | ||
1910 | |||
1911 | select { | ||
1912 | case cs.resc <- resAndError{err: err}: | ||
1913 | default: | ||
1914 | } | ||
1915 | } | ||
1916 | |||
1917 | func (cs *clientStream) copyTrailers() { | ||
1918 | for k, vv := range cs.trailer { | ||
1919 | t := cs.resTrailer | ||
1920 | if *t == nil { | ||
1921 | *t = make(http.Header) | ||
1922 | } | ||
1923 | (*t)[k] = vv | ||
1924 | } | ||
1925 | } | ||
1926 | |||
1927 | func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error { | ||
1928 | cc := rl.cc | ||
1929 | cc.t.connPool().MarkDead(cc) | ||
1930 | if f.ErrCode != 0 { | ||
1931 | // TODO: deal with GOAWAY more. particularly the error code | ||
1932 | cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode) | ||
1933 | } | ||
1934 | cc.setGoAway(f) | ||
1935 | return nil | ||
1936 | } | ||
1937 | |||
1938 | func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error { | ||
1939 | cc := rl.cc | ||
1940 | cc.mu.Lock() | ||
1941 | defer cc.mu.Unlock() | ||
1942 | |||
1943 | if f.IsAck() { | ||
1944 | if cc.wantSettingsAck { | ||
1945 | cc.wantSettingsAck = false | ||
1946 | return nil | ||
1947 | } | ||
1948 | return ConnectionError(ErrCodeProtocol) | ||
1949 | } | ||
1950 | |||
1951 | err := f.ForeachSetting(func(s Setting) error { | ||
1952 | switch s.ID { | ||
1953 | case SettingMaxFrameSize: | ||
1954 | cc.maxFrameSize = s.Val | ||
1955 | case SettingMaxConcurrentStreams: | ||
1956 | cc.maxConcurrentStreams = s.Val | ||
1957 | case SettingMaxHeaderListSize: | ||
1958 | cc.peerMaxHeaderListSize = uint64(s.Val) | ||
1959 | case SettingInitialWindowSize: | ||
1960 | // Values above the maximum flow-control | ||
1961 | // window size of 2^31-1 MUST be treated as a | ||
1962 | // connection error (Section 5.4.1) of type | ||
1963 | // FLOW_CONTROL_ERROR. | ||
1964 | if s.Val > math.MaxInt32 { | ||
1965 | return ConnectionError(ErrCodeFlowControl) | ||
1966 | } | ||
1967 | |||
1968 | // Adjust flow control of currently-open | ||
1969 | // frames by the difference of the old initial | ||
1970 | // window size and this one. | ||
1971 | delta := int32(s.Val) - int32(cc.initialWindowSize) | ||
1972 | for _, cs := range cc.streams { | ||
1973 | cs.flow.add(delta) | ||
1974 | } | ||
1975 | cc.cond.Broadcast() | ||
1976 | |||
1977 | cc.initialWindowSize = s.Val | ||
1978 | default: | ||
1979 | // TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably. | ||
1980 | cc.vlogf("Unhandled Setting: %v", s) | ||
1981 | } | ||
1982 | return nil | ||
1983 | }) | ||
1984 | if err != nil { | ||
1985 | return err | ||
1986 | } | ||
1987 | |||
1988 | cc.wmu.Lock() | ||
1989 | defer cc.wmu.Unlock() | ||
1990 | |||
1991 | cc.fr.WriteSettingsAck() | ||
1992 | cc.bw.Flush() | ||
1993 | return cc.werr | ||
1994 | } | ||
1995 | |||
1996 | func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error { | ||
1997 | cc := rl.cc | ||
1998 | cs := cc.streamByID(f.StreamID, false) | ||
1999 | if f.StreamID != 0 && cs == nil { | ||
2000 | return nil | ||
2001 | } | ||
2002 | |||
2003 | cc.mu.Lock() | ||
2004 | defer cc.mu.Unlock() | ||
2005 | |||
2006 | fl := &cc.flow | ||
2007 | if cs != nil { | ||
2008 | fl = &cs.flow | ||
2009 | } | ||
2010 | if !fl.add(int32(f.Increment)) { | ||
2011 | return ConnectionError(ErrCodeFlowControl) | ||
2012 | } | ||
2013 | cc.cond.Broadcast() | ||
2014 | return nil | ||
2015 | } | ||
2016 | |||
2017 | func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error { | ||
2018 | cs := rl.cc.streamByID(f.StreamID, true) | ||
2019 | if cs == nil { | ||
2020 | // TODO: return error if server tries to RST_STEAM an idle stream | ||
2021 | return nil | ||
2022 | } | ||
2023 | select { | ||
2024 | case <-cs.peerReset: | ||
2025 | // Already reset. | ||
2026 | // This is the only goroutine | ||
2027 | // which closes this, so there | ||
2028 | // isn't a race. | ||
2029 | default: | ||
2030 | err := streamError(cs.ID, f.ErrCode) | ||
2031 | cs.resetErr = err | ||
2032 | close(cs.peerReset) | ||
2033 | cs.bufPipe.CloseWithError(err) | ||
2034 | cs.cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl | ||
2035 | } | ||
2036 | delete(rl.activeRes, cs.ID) | ||
2037 | return nil | ||
2038 | } | ||
2039 | |||
2040 | // Ping sends a PING frame to the server and waits for the ack. | ||
2041 | // Public implementation is in go17.go and not_go17.go | ||
2042 | func (cc *ClientConn) ping(ctx contextContext) error { | ||
2043 | c := make(chan struct{}) | ||
2044 | // Generate a random payload | ||
2045 | var p [8]byte | ||
2046 | for { | ||
2047 | if _, err := rand.Read(p[:]); err != nil { | ||
2048 | return err | ||
2049 | } | ||
2050 | cc.mu.Lock() | ||
2051 | // check for dup before insert | ||
2052 | if _, found := cc.pings[p]; !found { | ||
2053 | cc.pings[p] = c | ||
2054 | cc.mu.Unlock() | ||
2055 | break | ||
2056 | } | ||
2057 | cc.mu.Unlock() | ||
2058 | } | ||
2059 | cc.wmu.Lock() | ||
2060 | if err := cc.fr.WritePing(false, p); err != nil { | ||
2061 | cc.wmu.Unlock() | ||
2062 | return err | ||
2063 | } | ||
2064 | if err := cc.bw.Flush(); err != nil { | ||
2065 | cc.wmu.Unlock() | ||
2066 | return err | ||
2067 | } | ||
2068 | cc.wmu.Unlock() | ||
2069 | select { | ||
2070 | case <-c: | ||
2071 | return nil | ||
2072 | case <-ctx.Done(): | ||
2073 | return ctx.Err() | ||
2074 | case <-cc.readerDone: | ||
2075 | // connection closed | ||
2076 | return cc.readerErr | ||
2077 | } | ||
2078 | } | ||
2079 | |||
2080 | func (rl *clientConnReadLoop) processPing(f *PingFrame) error { | ||
2081 | if f.IsAck() { | ||
2082 | cc := rl.cc | ||
2083 | cc.mu.Lock() | ||
2084 | defer cc.mu.Unlock() | ||
2085 | // If ack, notify listener if any | ||
2086 | if c, ok := cc.pings[f.Data]; ok { | ||
2087 | close(c) | ||
2088 | delete(cc.pings, f.Data) | ||
2089 | } | ||
2090 | return nil | ||
2091 | } | ||
2092 | cc := rl.cc | ||
2093 | cc.wmu.Lock() | ||
2094 | defer cc.wmu.Unlock() | ||
2095 | if err := cc.fr.WritePing(true, f.Data); err != nil { | ||
2096 | return err | ||
2097 | } | ||
2098 | return cc.bw.Flush() | ||
2099 | } | ||
2100 | |||
2101 | func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error { | ||
2102 | // We told the peer we don't want them. | ||
2103 | // Spec says: | ||
2104 | // "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH | ||
2105 | // setting of the peer endpoint is set to 0. An endpoint that | ||
2106 | // has set this setting and has received acknowledgement MUST | ||
2107 | // treat the receipt of a PUSH_PROMISE frame as a connection | ||
2108 | // error (Section 5.4.1) of type PROTOCOL_ERROR." | ||
2109 | return ConnectionError(ErrCodeProtocol) | ||
2110 | } | ||
2111 | |||
2112 | func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) { | ||
2113 | // TODO: map err to more interesting error codes, once the | ||
2114 | // HTTP community comes up with some. But currently for | ||
2115 | // RST_STREAM there's no equivalent to GOAWAY frame's debug | ||
2116 | // data, and the error codes are all pretty vague ("cancel"). | ||
2117 | cc.wmu.Lock() | ||
2118 | cc.fr.WriteRSTStream(streamID, code) | ||
2119 | cc.bw.Flush() | ||
2120 | cc.wmu.Unlock() | ||
2121 | } | ||
2122 | |||
2123 | var ( | ||
2124 | errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit") | ||
2125 | errRequestHeaderListSize = errors.New("http2: request header list larger than peer's advertised limit") | ||
2126 | errPseudoTrailers = errors.New("http2: invalid pseudo header in trailers") | ||
2127 | ) | ||
2128 | |||
2129 | func (cc *ClientConn) logf(format string, args ...interface{}) { | ||
2130 | cc.t.logf(format, args...) | ||
2131 | } | ||
2132 | |||
2133 | func (cc *ClientConn) vlogf(format string, args ...interface{}) { | ||
2134 | cc.t.vlogf(format, args...) | ||
2135 | } | ||
2136 | |||
2137 | func (t *Transport) vlogf(format string, args ...interface{}) { | ||
2138 | if VerboseLogs { | ||
2139 | t.logf(format, args...) | ||
2140 | } | ||
2141 | } | ||
2142 | |||
2143 | func (t *Transport) logf(format string, args ...interface{}) { | ||
2144 | log.Printf(format, args...) | ||
2145 | } | ||
2146 | |||
2147 | var noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil)) | ||
2148 | |||
2149 | func strSliceContains(ss []string, s string) bool { | ||
2150 | for _, v := range ss { | ||
2151 | if v == s { | ||
2152 | return true | ||
2153 | } | ||
2154 | } | ||
2155 | return false | ||
2156 | } | ||
2157 | |||
2158 | type erringRoundTripper struct{ err error } | ||
2159 | |||
2160 | func (rt erringRoundTripper) RoundTrip(*http.Request) (*http.Response, error) { return nil, rt.err } | ||
2161 | |||
2162 | // gzipReader wraps a response body so it can lazily | ||
2163 | // call gzip.NewReader on the first call to Read | ||
2164 | type gzipReader struct { | ||
2165 | body io.ReadCloser // underlying Response.Body | ||
2166 | zr *gzip.Reader // lazily-initialized gzip reader | ||
2167 | zerr error // sticky error | ||
2168 | } | ||
2169 | |||
2170 | func (gz *gzipReader) Read(p []byte) (n int, err error) { | ||
2171 | if gz.zerr != nil { | ||
2172 | return 0, gz.zerr | ||
2173 | } | ||
2174 | if gz.zr == nil { | ||
2175 | gz.zr, err = gzip.NewReader(gz.body) | ||
2176 | if err != nil { | ||
2177 | gz.zerr = err | ||
2178 | return 0, err | ||
2179 | } | ||
2180 | } | ||
2181 | return gz.zr.Read(p) | ||
2182 | } | ||
2183 | |||
2184 | func (gz *gzipReader) Close() error { | ||
2185 | return gz.body.Close() | ||
2186 | } | ||
2187 | |||
2188 | type errorReader struct{ err error } | ||
2189 | |||
2190 | func (r errorReader) Read(p []byte) (int, error) { return 0, r.err } | ||
2191 | |||
2192 | // bodyWriterState encapsulates various state around the Transport's writing | ||
2193 | // of the request body, particularly regarding doing delayed writes of the body | ||
2194 | // when the request contains "Expect: 100-continue". | ||
2195 | type bodyWriterState struct { | ||
2196 | cs *clientStream | ||
2197 | timer *time.Timer // if non-nil, we're doing a delayed write | ||
2198 | fnonce *sync.Once // to call fn with | ||
2199 | fn func() // the code to run in the goroutine, writing the body | ||
2200 | resc chan error // result of fn's execution | ||
2201 | delay time.Duration // how long we should delay a delayed write for | ||
2202 | } | ||
2203 | |||
2204 | func (t *Transport) getBodyWriterState(cs *clientStream, body io.Reader) (s bodyWriterState) { | ||
2205 | s.cs = cs | ||
2206 | if body == nil { | ||
2207 | return | ||
2208 | } | ||
2209 | resc := make(chan error, 1) | ||
2210 | s.resc = resc | ||
2211 | s.fn = func() { | ||
2212 | cs.cc.mu.Lock() | ||
2213 | cs.startedWrite = true | ||
2214 | cs.cc.mu.Unlock() | ||
2215 | resc <- cs.writeRequestBody(body, cs.req.Body) | ||
2216 | } | ||
2217 | s.delay = t.expectContinueTimeout() | ||
2218 | if s.delay == 0 || | ||
2219 | !httplex.HeaderValuesContainsToken( | ||
2220 | cs.req.Header["Expect"], | ||
2221 | "100-continue") { | ||
2222 | return | ||
2223 | } | ||
2224 | s.fnonce = new(sync.Once) | ||
2225 | |||
2226 | // Arm the timer with a very large duration, which we'll | ||
2227 | // intentionally lower later. It has to be large now because | ||
2228 | // we need a handle to it before writing the headers, but the | ||
2229 | // s.delay value is defined to not start until after the | ||
2230 | // request headers were written. | ||
2231 | const hugeDuration = 365 * 24 * time.Hour | ||
2232 | s.timer = time.AfterFunc(hugeDuration, func() { | ||
2233 | s.fnonce.Do(s.fn) | ||
2234 | }) | ||
2235 | return | ||
2236 | } | ||
2237 | |||
2238 | func (s bodyWriterState) cancel() { | ||
2239 | if s.timer != nil { | ||
2240 | s.timer.Stop() | ||
2241 | } | ||
2242 | } | ||
2243 | |||
2244 | func (s bodyWriterState) on100() { | ||
2245 | if s.timer == nil { | ||
2246 | // If we didn't do a delayed write, ignore the server's | ||
2247 | // bogus 100 continue response. | ||
2248 | return | ||
2249 | } | ||
2250 | s.timer.Stop() | ||
2251 | go func() { s.fnonce.Do(s.fn) }() | ||
2252 | } | ||
2253 | |||
2254 | // scheduleBodyWrite starts writing the body, either immediately (in | ||
2255 | // the common case) or after the delay timeout. It should not be | ||
2256 | // called until after the headers have been written. | ||
2257 | func (s bodyWriterState) scheduleBodyWrite() { | ||
2258 | if s.timer == nil { | ||
2259 | // We're not doing a delayed write (see | ||
2260 | // getBodyWriterState), so just start the writing | ||
2261 | // goroutine immediately. | ||
2262 | go s.fn() | ||
2263 | return | ||
2264 | } | ||
2265 | traceWait100Continue(s.cs.trace) | ||
2266 | if s.timer.Stop() { | ||
2267 | s.timer.Reset(s.delay) | ||
2268 | } | ||
2269 | } | ||
2270 | |||
2271 | // isConnectionCloseRequest reports whether req should use its own | ||
2272 | // connection for a single request and then close the connection. | ||
2273 | func isConnectionCloseRequest(req *http.Request) bool { | ||
2274 | return req.Close || httplex.HeaderValuesContainsToken(req.Header["Connection"], "close") | ||
2275 | } | ||