]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | // Copyright 2015 The Go Authors. All rights reserved. |
2 | // Use of this source code is governed by a BSD-style | |
3 | // license that can be found in the LICENSE file. | |
4 | ||
5 | // Transport code. | |
6 | ||
7 | package http2 | |
8 | ||
9 | import ( | |
10 | "bufio" | |
11 | "bytes" | |
12 | "compress/gzip" | |
107c1cdb | 13 | "context" |
15c0b25d AP |
14 | "crypto/rand" |
15 | "crypto/tls" | |
16 | "errors" | |
17 | "fmt" | |
18 | "io" | |
19 | "io/ioutil" | |
20 | "log" | |
21 | "math" | |
22 | mathrand "math/rand" | |
23 | "net" | |
24 | "net/http" | |
107c1cdb ND |
25 | "net/http/httptrace" |
26 | "net/textproto" | |
15c0b25d AP |
27 | "sort" |
28 | "strconv" | |
29 | "strings" | |
30 | "sync" | |
31 | "time" | |
32 | ||
107c1cdb | 33 | "golang.org/x/net/http/httpguts" |
15c0b25d AP |
34 | "golang.org/x/net/http2/hpack" |
35 | "golang.org/x/net/idna" | |
15c0b25d AP |
36 | ) |
37 | ||
const (
	// transportDefaultConnFlow is the number of connection-level
	// flow control tokens handed to the server at start-up, on top
	// of the default 64k.
	transportDefaultConnFlow = 1 << 30

	// transportDefaultStreamFlow is both the stream-level flow
	// control window announced to the peer and the number of bytes
	// buffered per stream.
	transportDefaultStreamFlow = 4 << 20

	// transportDefaultStreamMinRefresh is the smallest number of
	// bytes a single stream-level WINDOW_UPDATE is sent for.
	transportDefaultStreamMinRefresh = 4 << 10

	// defaultUserAgent is the default User-Agent string.
	defaultUserAgent = "Go-http-client/2.0"
)
54 | ||
55 | // Transport is an HTTP/2 Transport. | |
56 | // | |
57 | // A Transport internally caches connections to servers. It is safe | |
58 | // for concurrent use by multiple goroutines. | |
59 | type Transport struct { | |
60 | // DialTLS specifies an optional dial function for creating | |
61 | // TLS connections for requests. | |
62 | // | |
63 | // If DialTLS is nil, tls.Dial is used. | |
64 | // | |
65 | // If the returned net.Conn has a ConnectionState method like tls.Conn, | |
66 | // it will be used to set http.Response.TLS. | |
67 | DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error) | |
68 | ||
69 | // TLSClientConfig specifies the TLS configuration to use with | |
70 | // tls.Client. If nil, the default configuration is used. | |
71 | TLSClientConfig *tls.Config | |
72 | ||
73 | // ConnPool optionally specifies an alternate connection pool to use. | |
74 | // If nil, the default is used. | |
75 | ConnPool ClientConnPool | |
76 | ||
77 | // DisableCompression, if true, prevents the Transport from | |
78 | // requesting compression with an "Accept-Encoding: gzip" | |
79 | // request header when the Request contains no existing | |
80 | // Accept-Encoding value. If the Transport requests gzip on | |
81 | // its own and gets a gzipped response, it's transparently | |
82 | // decoded in the Response.Body. However, if the user | |
83 | // explicitly requested gzip it is not automatically | |
84 | // uncompressed. | |
85 | DisableCompression bool | |
86 | ||
87 | // AllowHTTP, if true, permits HTTP/2 requests using the insecure, | |
88 | // plain-text "http" scheme. Note that this does not enable h2c support. | |
89 | AllowHTTP bool | |
90 | ||
91 | // MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to | |
92 | // send in the initial settings frame. It is how many bytes | |
93 | // of response headers are allowed. Unlike the http2 spec, zero here | |
94 | // means to use a default limit (currently 10MB). If you actually | |
95 | // want to advertise an ulimited value to the peer, Transport | |
96 | // interprets the highest possible value here (0xffffffff or 1<<32-1) | |
97 | // to mean no limit. | |
98 | MaxHeaderListSize uint32 | |
99 | ||
107c1cdb ND |
100 | // StrictMaxConcurrentStreams controls whether the server's |
101 | // SETTINGS_MAX_CONCURRENT_STREAMS should be respected | |
102 | // globally. If false, new TCP connections are created to the | |
103 | // server as needed to keep each under the per-connection | |
104 | // SETTINGS_MAX_CONCURRENT_STREAMS limit. If true, the | |
105 | // server's SETTINGS_MAX_CONCURRENT_STREAMS is interpreted as | |
106 | // a global limit and callers of RoundTrip block when needed, | |
107 | // waiting for their turn. | |
108 | StrictMaxConcurrentStreams bool | |
109 | ||
15c0b25d AP |
110 | // t1, if non-nil, is the standard library Transport using |
111 | // this transport. Its settings are used (but not its | |
112 | // RoundTrip method, etc). | |
113 | t1 *http.Transport | |
114 | ||
115 | connPoolOnce sync.Once | |
116 | connPoolOrDef ClientConnPool // non-nil version of ConnPool | |
117 | } | |
118 | ||
119 | func (t *Transport) maxHeaderListSize() uint32 { | |
120 | if t.MaxHeaderListSize == 0 { | |
121 | return 10 << 20 | |
122 | } | |
123 | if t.MaxHeaderListSize == 0xffffffff { | |
124 | return 0 | |
125 | } | |
126 | return t.MaxHeaderListSize | |
127 | } | |
128 | ||
129 | func (t *Transport) disableCompression() bool { | |
130 | return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression) | |
131 | } | |
132 | ||
15c0b25d | 133 | // ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2. |
107c1cdb | 134 | // It returns an error if t1 has already been HTTP/2-enabled. |
15c0b25d | 135 | func ConfigureTransport(t1 *http.Transport) error { |
107c1cdb | 136 | _, err := configureTransport(t1) |
15c0b25d AP |
137 | return err |
138 | } | |
139 | ||
107c1cdb ND |
140 | func configureTransport(t1 *http.Transport) (*Transport, error) { |
141 | connPool := new(clientConnPool) | |
142 | t2 := &Transport{ | |
143 | ConnPool: noDialClientConnPool{connPool}, | |
144 | t1: t1, | |
145 | } | |
146 | connPool.t = t2 | |
147 | if err := registerHTTPSProtocol(t1, noDialH2RoundTripper{t2}); err != nil { | |
148 | return nil, err | |
149 | } | |
150 | if t1.TLSClientConfig == nil { | |
151 | t1.TLSClientConfig = new(tls.Config) | |
152 | } | |
153 | if !strSliceContains(t1.TLSClientConfig.NextProtos, "h2") { | |
154 | t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...) | |
155 | } | |
156 | if !strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") { | |
157 | t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1") | |
158 | } | |
159 | upgradeFn := func(authority string, c *tls.Conn) http.RoundTripper { | |
160 | addr := authorityAddr("https", authority) | |
161 | if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil { | |
162 | go c.Close() | |
163 | return erringRoundTripper{err} | |
164 | } else if !used { | |
165 | // Turns out we don't need this c. | |
166 | // For example, two goroutines made requests to the same host | |
167 | // at the same time, both kicking off TCP dials. (since protocol | |
168 | // was unknown) | |
169 | go c.Close() | |
170 | } | |
171 | return t2 | |
172 | } | |
173 | if m := t1.TLSNextProto; len(m) == 0 { | |
174 | t1.TLSNextProto = map[string]func(string, *tls.Conn) http.RoundTripper{ | |
175 | "h2": upgradeFn, | |
176 | } | |
177 | } else { | |
178 | m["h2"] = upgradeFn | |
179 | } | |
180 | return t2, nil | |
181 | } | |
182 | ||
15c0b25d AP |
183 | func (t *Transport) connPool() ClientConnPool { |
184 | t.connPoolOnce.Do(t.initConnPool) | |
185 | return t.connPoolOrDef | |
186 | } | |
187 | ||
188 | func (t *Transport) initConnPool() { | |
189 | if t.ConnPool != nil { | |
190 | t.connPoolOrDef = t.ConnPool | |
191 | } else { | |
192 | t.connPoolOrDef = &clientConnPool{t: t} | |
193 | } | |
194 | } | |
195 | ||
196 | // ClientConn is the state of a single HTTP/2 client connection to an | |
197 | // HTTP/2 server. | |
198 | type ClientConn struct { | |
199 | t *Transport | |
200 | tconn net.Conn // usually *tls.Conn, except specialized impls | |
201 | tlsState *tls.ConnectionState // nil only for specialized impls | |
202 | singleUse bool // whether being used for a single http.Request | |
203 | ||
204 | // readLoop goroutine fields: | |
205 | readerDone chan struct{} // closed on error | |
206 | readerErr error // set before readerDone is closed | |
207 | ||
208 | idleTimeout time.Duration // or 0 for never | |
209 | idleTimer *time.Timer | |
210 | ||
211 | mu sync.Mutex // guards following | |
212 | cond *sync.Cond // hold mu; broadcast on flow/closed changes | |
213 | flow flow // our conn-level flow control quota (cs.flow is per stream) | |
214 | inflow flow // peer's conn-level flow control | |
107c1cdb | 215 | closing bool |
15c0b25d AP |
216 | closed bool |
217 | wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back | |
218 | goAway *GoAwayFrame // if non-nil, the GoAwayFrame we received | |
219 | goAwayDebug string // goAway frame's debug data, retained as a string | |
220 | streams map[uint32]*clientStream // client-initiated | |
221 | nextStreamID uint32 | |
222 | pendingRequests int // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams | |
223 | pings map[[8]byte]chan struct{} // in flight ping data to notification channel | |
224 | bw *bufio.Writer | |
225 | br *bufio.Reader | |
226 | fr *Framer | |
227 | lastActive time.Time | |
228 | // Settings from peer: (also guarded by mu) | |
229 | maxFrameSize uint32 | |
230 | maxConcurrentStreams uint32 | |
231 | peerMaxHeaderListSize uint64 | |
232 | initialWindowSize uint32 | |
233 | ||
234 | hbuf bytes.Buffer // HPACK encoder writes into this | |
235 | henc *hpack.Encoder | |
236 | freeBuf [][]byte | |
237 | ||
238 | wmu sync.Mutex // held while writing; acquire AFTER mu if holding both | |
239 | werr error // first write error that has occurred | |
240 | } | |
241 | ||
242 | // clientStream is the state for a single HTTP/2 stream. One of these | |
243 | // is created for each Transport.RoundTrip call. | |
244 | type clientStream struct { | |
245 | cc *ClientConn | |
246 | req *http.Request | |
107c1cdb | 247 | trace *httptrace.ClientTrace // or nil |
15c0b25d AP |
248 | ID uint32 |
249 | resc chan resAndError | |
250 | bufPipe pipe // buffered pipe with the flow-controlled response payload | |
251 | startedWrite bool // started request body write; guarded by cc.mu | |
252 | requestedGzip bool | |
253 | on100 func() // optional code to run if get a 100 continue response | |
254 | ||
255 | flow flow // guarded by cc.mu | |
256 | inflow flow // guarded by cc.mu | |
257 | bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read | |
258 | readErr error // sticky read error; owned by transportResponseBody.Read | |
259 | stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu | |
260 | didReset bool // whether we sent a RST_STREAM to the server; guarded by cc.mu | |
261 | ||
262 | peerReset chan struct{} // closed on peer reset | |
263 | resetErr error // populated before peerReset is closed | |
264 | ||
265 | done chan struct{} // closed when stream remove from cc.streams map; close calls guarded by cc.mu | |
266 | ||
267 | // owned by clientConnReadLoop: | |
107c1cdb ND |
268 | firstByte bool // got the first response byte |
269 | pastHeaders bool // got first MetaHeadersFrame (actual headers) | |
270 | pastTrailers bool // got optional second MetaHeadersFrame (trailers) | |
271 | num1xx uint8 // number of 1xx responses seen | |
15c0b25d AP |
272 | |
273 | trailer http.Header // accumulated trailers | |
274 | resTrailer *http.Header // client's Response.Trailer | |
275 | } | |
276 | ||
277 | // awaitRequestCancel waits for the user to cancel a request or for the done | |
278 | // channel to be signaled. A non-nil error is returned only if the request was | |
279 | // canceled. | |
280 | func awaitRequestCancel(req *http.Request, done <-chan struct{}) error { | |
107c1cdb | 281 | ctx := req.Context() |
15c0b25d AP |
282 | if req.Cancel == nil && ctx.Done() == nil { |
283 | return nil | |
284 | } | |
285 | select { | |
286 | case <-req.Cancel: | |
287 | return errRequestCanceled | |
288 | case <-ctx.Done(): | |
289 | return ctx.Err() | |
290 | case <-done: | |
291 | return nil | |
292 | } | |
293 | } | |
294 | ||
107c1cdb ND |
295 | var got1xxFuncForTests func(int, textproto.MIMEHeader) error |
296 | ||
297 | // get1xxTraceFunc returns the value of request's httptrace.ClientTrace.Got1xxResponse func, | |
298 | // if any. It returns nil if not set or if the Go version is too old. | |
299 | func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error { | |
300 | if fn := got1xxFuncForTests; fn != nil { | |
301 | return fn | |
302 | } | |
303 | return traceGot1xxResponseFunc(cs.trace) | |
304 | } | |
305 | ||
15c0b25d AP |
306 | // awaitRequestCancel waits for the user to cancel a request, its context to |
307 | // expire, or for the request to be done (any way it might be removed from the | |
308 | // cc.streams map: peer reset, successful completion, TCP connection breakage, | |
309 | // etc). If the request is canceled, then cs will be canceled and closed. | |
310 | func (cs *clientStream) awaitRequestCancel(req *http.Request) { | |
311 | if err := awaitRequestCancel(req, cs.done); err != nil { | |
312 | cs.cancelStream() | |
313 | cs.bufPipe.CloseWithError(err) | |
314 | } | |
315 | } | |
316 | ||
317 | func (cs *clientStream) cancelStream() { | |
318 | cc := cs.cc | |
319 | cc.mu.Lock() | |
320 | didReset := cs.didReset | |
321 | cs.didReset = true | |
322 | cc.mu.Unlock() | |
323 | ||
324 | if !didReset { | |
325 | cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | |
326 | cc.forgetStreamID(cs.ID) | |
327 | } | |
328 | } | |
329 | ||
330 | // checkResetOrDone reports any error sent in a RST_STREAM frame by the | |
331 | // server, or errStreamClosed if the stream is complete. | |
332 | func (cs *clientStream) checkResetOrDone() error { | |
333 | select { | |
334 | case <-cs.peerReset: | |
335 | return cs.resetErr | |
336 | case <-cs.done: | |
337 | return errStreamClosed | |
338 | default: | |
339 | return nil | |
340 | } | |
341 | } | |
342 | ||
107c1cdb ND |
343 | func (cs *clientStream) getStartedWrite() bool { |
344 | cc := cs.cc | |
345 | cc.mu.Lock() | |
346 | defer cc.mu.Unlock() | |
347 | return cs.startedWrite | |
348 | } | |
349 | ||
15c0b25d AP |
350 | func (cs *clientStream) abortRequestBodyWrite(err error) { |
351 | if err == nil { | |
352 | panic("nil error") | |
353 | } | |
354 | cc := cs.cc | |
355 | cc.mu.Lock() | |
356 | cs.stopReqBody = err | |
357 | cc.cond.Broadcast() | |
358 | cc.mu.Unlock() | |
359 | } | |
360 | ||
// stickyErrWriter wraps an io.Writer and stores the result of every
// underlying Write in *err. Once *err is non-nil, later writes fail
// with that error without reaching the underlying writer.
type stickyErrWriter struct {
	w   io.Writer
	err *error
}

// Write forwards p to the wrapped writer unless an earlier error is
// already recorded, in which case it returns 0 and that error.
func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
	if prev := *sew.err; prev != nil {
		return 0, prev
	}
	n, err = sew.w.Write(p)
	*sew.err = err
	return n, err
}
374 | ||
107c1cdb ND |
// noCachedConnError is the concrete type of ErrNoCachedConn. net/http
// has to recognize it whether it comes from its bundled copy (in
// h2_bundle.go, with a rewritten type name) or from a user's
// x/net/http2, so it carries a uniquely named method
// (IsHTTP2NoCachedConnError) that net/http sniffs for via func
// isNoCachedConnError.
type noCachedConnError struct{}

// IsHTTP2NoCachedConnError is the marker method that identifies this
// error type.
func (noCachedConnError) IsHTTP2NoCachedConnError() {}

// Error implements the error interface.
func (noCachedConnError) Error() string {
	return "http2: no cached connection was available"
}

// isNoCachedConnError reports whether err is of type noCachedConnError
// or its equivalent renamed type in net/http2's h2_bundle.go. Both types
// may coexist in the same running program.
func isNoCachedConnError(err error) bool {
	type marker interface{ IsHTTP2NoCachedConnError() }
	_, ok := err.(marker)
	return ok
}

// ErrNoCachedConn is an error value whose concrete type is
// noCachedConnError.
var ErrNoCachedConn error = noCachedConnError{}
15c0b25d AP |
395 | |
// RoundTripOpt holds the options accepted by Transport.RoundTripOpt.
type RoundTripOpt struct {
	// OnlyCachedConn controls whether RoundTripOpt may open a new
	// TCP connection. When true and no cached connection is
	// available, RoundTripOpt returns ErrNoCachedConn.
	OnlyCachedConn bool
}
404 | ||
405 | func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) { | |
406 | return t.RoundTripOpt(req, RoundTripOpt{}) | |
407 | } | |
408 | ||
409 | // authorityAddr returns a given authority (a host/IP, or host:port / ip:port) | |
410 | // and returns a host:port. The port 443 is added if needed. | |
411 | func authorityAddr(scheme string, authority string) (addr string) { | |
412 | host, port, err := net.SplitHostPort(authority) | |
413 | if err != nil { // authority didn't have a port | |
414 | port = "443" | |
415 | if scheme == "http" { | |
416 | port = "80" | |
417 | } | |
418 | host = authority | |
419 | } | |
420 | if a, err := idna.ToASCII(host); err == nil { | |
421 | host = a | |
422 | } | |
423 | // IPv6 address literal, without a port: | |
424 | if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") { | |
425 | return host + ":" + port | |
426 | } | |
427 | return net.JoinHostPort(host, port) | |
428 | } | |
429 | ||
430 | // RoundTripOpt is like RoundTrip, but takes options. | |
431 | func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) { | |
432 | if !(req.URL.Scheme == "https" || (req.URL.Scheme == "http" && t.AllowHTTP)) { | |
433 | return nil, errors.New("http2: unsupported scheme") | |
434 | } | |
435 | ||
436 | addr := authorityAddr(req.URL.Scheme, req.URL.Host) | |
437 | for retry := 0; ; retry++ { | |
438 | cc, err := t.connPool().GetClientConn(req, addr) | |
439 | if err != nil { | |
440 | t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err) | |
441 | return nil, err | |
442 | } | |
443 | traceGotConn(req, cc) | |
107c1cdb | 444 | res, gotErrAfterReqBodyWrite, err := cc.roundTrip(req) |
15c0b25d | 445 | if err != nil && retry <= 6 { |
107c1cdb | 446 | if req, err = shouldRetryRequest(req, err, gotErrAfterReqBodyWrite); err == nil { |
15c0b25d AP |
447 | // After the first retry, do exponential backoff with 10% jitter. |
448 | if retry == 0 { | |
449 | continue | |
450 | } | |
451 | backoff := float64(uint(1) << (uint(retry) - 1)) | |
452 | backoff += backoff * (0.1 * mathrand.Float64()) | |
453 | select { | |
454 | case <-time.After(time.Second * time.Duration(backoff)): | |
455 | continue | |
107c1cdb ND |
456 | case <-req.Context().Done(): |
457 | return nil, req.Context().Err() | |
15c0b25d AP |
458 | } |
459 | } | |
460 | } | |
461 | if err != nil { | |
462 | t.vlogf("RoundTrip failure: %v", err) | |
463 | return nil, err | |
464 | } | |
465 | return res, nil | |
466 | } | |
467 | } | |
468 | ||
469 | // CloseIdleConnections closes any connections which were previously | |
470 | // connected from previous requests but are now sitting idle. | |
471 | // It does not interrupt any connections currently in use. | |
472 | func (t *Transport) CloseIdleConnections() { | |
473 | if cp, ok := t.connPool().(clientConnPoolIdleCloser); ok { | |
474 | cp.closeIdleConnections() | |
475 | } | |
476 | } | |
477 | ||
// Sentinel errors describing the state of a client connection.
var (
	errClientConnClosed    = errors.New("http2: client conn is closed")
	errClientConnUnusable  = errors.New("http2: client conn not usable")
	errClientConnGotGoAway = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
)
483 | ||
15c0b25d AP |
484 | // shouldRetryRequest is called by RoundTrip when a request fails to get |
485 | // response headers. It is always called with a non-nil error. | |
486 | // It returns either a request to retry (either the same request, or a | |
487 | // modified clone), or an error if the request can't be replayed. | |
488 | func shouldRetryRequest(req *http.Request, err error, afterBodyWrite bool) (*http.Request, error) { | |
489 | if !canRetryError(err) { | |
490 | return nil, err | |
491 | } | |
15c0b25d AP |
492 | // If the Body is nil (or http.NoBody), it's safe to reuse |
493 | // this request and its Body. | |
107c1cdb | 494 | if req.Body == nil || req.Body == http.NoBody { |
15c0b25d AP |
495 | return req, nil |
496 | } | |
107c1cdb ND |
497 | |
498 | // If the request body can be reset back to its original | |
499 | // state via the optional req.GetBody, do that. | |
500 | if req.GetBody != nil { | |
501 | // TODO: consider a req.Body.Close here? or audit that all caller paths do? | |
502 | body, err := req.GetBody() | |
503 | if err != nil { | |
504 | return nil, err | |
505 | } | |
506 | newReq := *req | |
507 | newReq.Body = body | |
508 | return &newReq, nil | |
15c0b25d | 509 | } |
107c1cdb ND |
510 | |
511 | // The Request.Body can't reset back to the beginning, but we | |
512 | // don't seem to have started to read from it yet, so reuse | |
513 | // the request directly. The "afterBodyWrite" means the | |
514 | // bodyWrite process has started, which becomes true before | |
515 | // the first Read. | |
516 | if !afterBodyWrite { | |
517 | return req, nil | |
15c0b25d | 518 | } |
107c1cdb ND |
519 | |
520 | return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err) | |
15c0b25d AP |
521 | } |
522 | ||
523 | func canRetryError(err error) bool { | |
524 | if err == errClientConnUnusable || err == errClientConnGotGoAway { | |
525 | return true | |
526 | } | |
527 | if se, ok := err.(StreamError); ok { | |
528 | return se.Code == ErrCodeRefusedStream | |
529 | } | |
530 | return false | |
531 | } | |
532 | ||
533 | func (t *Transport) dialClientConn(addr string, singleUse bool) (*ClientConn, error) { | |
534 | host, _, err := net.SplitHostPort(addr) | |
535 | if err != nil { | |
536 | return nil, err | |
537 | } | |
538 | tconn, err := t.dialTLS()("tcp", addr, t.newTLSConfig(host)) | |
539 | if err != nil { | |
540 | return nil, err | |
541 | } | |
542 | return t.newClientConn(tconn, singleUse) | |
543 | } | |
544 | ||
545 | func (t *Transport) newTLSConfig(host string) *tls.Config { | |
546 | cfg := new(tls.Config) | |
547 | if t.TLSClientConfig != nil { | |
107c1cdb | 548 | *cfg = *t.TLSClientConfig.Clone() |
15c0b25d AP |
549 | } |
550 | if !strSliceContains(cfg.NextProtos, NextProtoTLS) { | |
551 | cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...) | |
552 | } | |
553 | if cfg.ServerName == "" { | |
554 | cfg.ServerName = host | |
555 | } | |
556 | return cfg | |
557 | } | |
558 | ||
559 | func (t *Transport) dialTLS() func(string, string, *tls.Config) (net.Conn, error) { | |
560 | if t.DialTLS != nil { | |
561 | return t.DialTLS | |
562 | } | |
563 | return t.dialTLSDefault | |
564 | } | |
565 | ||
566 | func (t *Transport) dialTLSDefault(network, addr string, cfg *tls.Config) (net.Conn, error) { | |
567 | cn, err := tls.Dial(network, addr, cfg) | |
568 | if err != nil { | |
569 | return nil, err | |
570 | } | |
571 | if err := cn.Handshake(); err != nil { | |
572 | return nil, err | |
573 | } | |
574 | if !cfg.InsecureSkipVerify { | |
575 | if err := cn.VerifyHostname(cfg.ServerName); err != nil { | |
576 | return nil, err | |
577 | } | |
578 | } | |
579 | state := cn.ConnectionState() | |
580 | if p := state.NegotiatedProtocol; p != NextProtoTLS { | |
581 | return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS) | |
582 | } | |
583 | if !state.NegotiatedProtocolIsMutual { | |
584 | return nil, errors.New("http2: could not negotiate protocol mutually") | |
585 | } | |
586 | return cn, nil | |
587 | } | |
588 | ||
589 | // disableKeepAlives reports whether connections should be closed as | |
590 | // soon as possible after handling the first request. | |
591 | func (t *Transport) disableKeepAlives() bool { | |
592 | return t.t1 != nil && t.t1.DisableKeepAlives | |
593 | } | |
594 | ||
595 | func (t *Transport) expectContinueTimeout() time.Duration { | |
596 | if t.t1 == nil { | |
597 | return 0 | |
598 | } | |
107c1cdb | 599 | return t.t1.ExpectContinueTimeout |
15c0b25d AP |
600 | } |
601 | ||
602 | func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) { | |
603 | return t.newClientConn(c, false) | |
604 | } | |
605 | ||
606 | func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) { | |
607 | cc := &ClientConn{ | |
608 | t: t, | |
609 | tconn: c, | |
610 | readerDone: make(chan struct{}), | |
611 | nextStreamID: 1, | |
612 | maxFrameSize: 16 << 10, // spec default | |
613 | initialWindowSize: 65535, // spec default | |
614 | maxConcurrentStreams: 1000, // "infinite", per spec. 1000 seems good enough. | |
615 | peerMaxHeaderListSize: 0xffffffffffffffff, // "infinite", per spec. Use 2^64-1 instead. | |
616 | streams: make(map[uint32]*clientStream), | |
617 | singleUse: singleUse, | |
618 | wantSettingsAck: true, | |
619 | pings: make(map[[8]byte]chan struct{}), | |
620 | } | |
621 | if d := t.idleConnTimeout(); d != 0 { | |
622 | cc.idleTimeout = d | |
623 | cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout) | |
624 | } | |
625 | if VerboseLogs { | |
626 | t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr()) | |
627 | } | |
628 | ||
629 | cc.cond = sync.NewCond(&cc.mu) | |
630 | cc.flow.add(int32(initialWindowSize)) | |
631 | ||
632 | // TODO: adjust this writer size to account for frame size + | |
633 | // MTU + crypto/tls record padding. | |
634 | cc.bw = bufio.NewWriter(stickyErrWriter{c, &cc.werr}) | |
635 | cc.br = bufio.NewReader(c) | |
636 | cc.fr = NewFramer(cc.bw, cc.br) | |
637 | cc.fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil) | |
638 | cc.fr.MaxHeaderListSize = t.maxHeaderListSize() | |
639 | ||
640 | // TODO: SetMaxDynamicTableSize, SetMaxDynamicTableSizeLimit on | |
641 | // henc in response to SETTINGS frames? | |
642 | cc.henc = hpack.NewEncoder(&cc.hbuf) | |
643 | ||
107c1cdb ND |
644 | if t.AllowHTTP { |
645 | cc.nextStreamID = 3 | |
646 | } | |
647 | ||
15c0b25d AP |
648 | if cs, ok := c.(connectionStater); ok { |
649 | state := cs.ConnectionState() | |
650 | cc.tlsState = &state | |
651 | } | |
652 | ||
653 | initialSettings := []Setting{ | |
654 | {ID: SettingEnablePush, Val: 0}, | |
655 | {ID: SettingInitialWindowSize, Val: transportDefaultStreamFlow}, | |
656 | } | |
657 | if max := t.maxHeaderListSize(); max != 0 { | |
658 | initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max}) | |
659 | } | |
660 | ||
661 | cc.bw.Write(clientPreface) | |
662 | cc.fr.WriteSettings(initialSettings...) | |
663 | cc.fr.WriteWindowUpdate(0, transportDefaultConnFlow) | |
664 | cc.inflow.add(transportDefaultConnFlow + initialWindowSize) | |
665 | cc.bw.Flush() | |
666 | if cc.werr != nil { | |
667 | return nil, cc.werr | |
668 | } | |
669 | ||
670 | go cc.readLoop() | |
671 | return cc, nil | |
672 | } | |
673 | ||
674 | func (cc *ClientConn) setGoAway(f *GoAwayFrame) { | |
675 | cc.mu.Lock() | |
676 | defer cc.mu.Unlock() | |
677 | ||
678 | old := cc.goAway | |
679 | cc.goAway = f | |
680 | ||
681 | // Merge the previous and current GoAway error frames. | |
682 | if cc.goAwayDebug == "" { | |
683 | cc.goAwayDebug = string(f.DebugData()) | |
684 | } | |
685 | if old != nil && old.ErrCode != ErrCodeNo { | |
686 | cc.goAway.ErrCode = old.ErrCode | |
687 | } | |
688 | last := f.LastStreamID | |
689 | for streamID, cs := range cc.streams { | |
690 | if streamID > last { | |
691 | select { | |
692 | case cs.resc <- resAndError{err: errClientConnGotGoAway}: | |
693 | default: | |
694 | } | |
695 | } | |
696 | } | |
697 | } | |
698 | ||
699 | // CanTakeNewRequest reports whether the connection can take a new request, | |
700 | // meaning it has not been closed or received or sent a GOAWAY. | |
701 | func (cc *ClientConn) CanTakeNewRequest() bool { | |
702 | cc.mu.Lock() | |
703 | defer cc.mu.Unlock() | |
704 | return cc.canTakeNewRequestLocked() | |
705 | } | |
706 | ||
107c1cdb ND |
// clientConnIdleState describes how suitable a client connection is
// for starting a new RoundTrip request.
type clientConnIdleState struct {
	canTakeNewRequest bool
	freshConn         bool // whether no previous request has used it
}
713 | ||
714 | func (cc *ClientConn) idleState() clientConnIdleState { | |
715 | cc.mu.Lock() | |
716 | defer cc.mu.Unlock() | |
717 | return cc.idleStateLocked() | |
718 | } | |
719 | ||
720 | func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) { | |
15c0b25d | 721 | if cc.singleUse && cc.nextStreamID > 1 { |
107c1cdb | 722 | return |
15c0b25d | 723 | } |
107c1cdb ND |
724 | var maxConcurrentOkay bool |
725 | if cc.t.StrictMaxConcurrentStreams { | |
726 | // We'll tell the caller we can take a new request to | |
727 | // prevent the caller from dialing a new TCP | |
728 | // connection, but then we'll block later before | |
729 | // writing it. | |
730 | maxConcurrentOkay = true | |
731 | } else { | |
732 | maxConcurrentOkay = int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) | |
733 | } | |
734 | ||
735 | st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay && | |
736 | int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 | |
737 | st.freshConn = cc.nextStreamID == 1 && st.canTakeNewRequest | |
738 | return | |
739 | } | |
740 | ||
741 | func (cc *ClientConn) canTakeNewRequestLocked() bool { | |
742 | st := cc.idleStateLocked() | |
743 | return st.canTakeNewRequest | |
15c0b25d AP |
744 | } |
745 | ||
746 | // onIdleTimeout is called from a time.AfterFunc goroutine. It will | |
747 | // only be called when we're idle, but because we're coming from a new | |
748 | // goroutine, there could be a new request coming in at the same time, | |
749 | // so this simply calls the synchronized closeIfIdle to shut down this | |
750 | // connection. The timer could just call closeIfIdle, but this is more | |
751 | // clear. | |
752 | func (cc *ClientConn) onIdleTimeout() { | |
753 | cc.closeIfIdle() | |
754 | } | |
755 | ||
756 | func (cc *ClientConn) closeIfIdle() { | |
757 | cc.mu.Lock() | |
758 | if len(cc.streams) > 0 { | |
759 | cc.mu.Unlock() | |
760 | return | |
761 | } | |
762 | cc.closed = true | |
763 | nextID := cc.nextStreamID | |
764 | // TODO: do clients send GOAWAY too? maybe? Just Close: | |
765 | cc.mu.Unlock() | |
766 | ||
767 | if VerboseLogs { | |
768 | cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2) | |
769 | } | |
770 | cc.tconn.Close() | |
771 | } | |
772 | ||
107c1cdb ND |
773 | var shutdownEnterWaitStateHook = func() {} |
774 | ||
775 | // Shutdown gracefully closes the client connection, waiting for running streams to complete. | |
//
// It first sends GOAWAY via sendGoAway and returns that error if sending
// fails. It then waits until either no streams remain (or the connection
// is already marked closed), in which case it returns the result of
// closing the underlying connection, or until ctx is done, in which case
// it returns ctx.Err() without closing the connection here.
776 | func (cc *ClientConn) Shutdown(ctx context.Context) error { | |
777 | if err := cc.sendGoAway(); err != nil { | |
778 | return err | |
779 | } | |
780 | // Wait for all in-flight streams to complete or connection to close | |
// done has capacity 1 so the goroutine's send cannot block even if this
// function has already returned through the ctx.Done branch.
781 | done := make(chan error, 1) | |
782 | cancelled := false // guarded by cc.mu | |
783 | go func() { | |
784 | cc.mu.Lock() | |
785 | defer cc.mu.Unlock() | |
786 | for { | |
787 | if len(cc.streams) == 0 || cc.closed { | |
788 | cc.closed = true | |
789 | done <- cc.tconn.Close() | |
790 | break | |
791 | } | |
792 | if cancelled { | |
793 | break | |
794 | } | |
// Re-checked on every cc.cond broadcast.
795 | cc.cond.Wait() | |
796 | } | |
797 | }() | |
798 | shutdownEnterWaitStateHook() | |
799 | select { | |
800 | case err := <-done: | |
801 | return err | |
802 | case <-ctx.Done(): | |
803 | cc.mu.Lock() | |
804 | // Free the goroutine above | |
805 | cancelled = true | |
806 | cc.cond.Broadcast() | |
807 | cc.mu.Unlock() | |
808 | return ctx.Err() | |
809 | } | |
810 | } | |
811 | ||
812 | func (cc *ClientConn) sendGoAway() error { | |
813 | cc.mu.Lock() | |
814 | defer cc.mu.Unlock() | |
815 | cc.wmu.Lock() | |
816 | defer cc.wmu.Unlock() | |
817 | if cc.closing { | |
818 | // GOAWAY sent already | |
819 | return nil | |
820 | } | |
821 | // Send a graceful shutdown frame to server | |
822 | maxStreamID := cc.nextStreamID | |
823 | if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil { | |
824 | return err | |
825 | } | |
826 | if err := cc.bw.Flush(); err != nil { | |
827 | return err | |
828 | } | |
829 | // Prevent new requests | |
830 | cc.closing = true | |
831 | return nil | |
832 | } | |
833 | ||
834 | // Close closes the client connection immediately. | |
835 | // | |
836 | // In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead. | |
837 | func (cc *ClientConn) Close() error { | |
838 | cc.mu.Lock() | |
839 | defer cc.cond.Broadcast() | |
840 | defer cc.mu.Unlock() | |
841 | err := errors.New("http2: client connection force closed via ClientConn.Close") | |
842 | for id, cs := range cc.streams { | |
843 | select { | |
844 | case cs.resc <- resAndError{err: err}: | |
845 | default: | |
846 | } | |
847 | cs.bufPipe.CloseWithError(err) | |
848 | delete(cc.streams, id) | |
849 | } | |
850 | cc.closed = true | |
851 | return cc.tconn.Close() | |
852 | } | |
853 | ||
15c0b25d AP |
// maxAllocFrameSize is the upper bound (512 KiB) on scratch buffers
// allocated for writing DATA frames.
const maxAllocFrameSize = 512 * 1024
855 | ||
856 | // frameBuffer returns a scratch buffer suitable for writing DATA frames. | |
857 | // They're capped at the min of the peer's max frame size or 512KB | |
858 | // (kinda arbitrarily), but definitely capped so we don't allocate 4GB | |
859 | // bufers. | |
860 | func (cc *ClientConn) frameScratchBuffer() []byte { | |
861 | cc.mu.Lock() | |
862 | size := cc.maxFrameSize | |
863 | if size > maxAllocFrameSize { | |
864 | size = maxAllocFrameSize | |
865 | } | |
866 | for i, buf := range cc.freeBuf { | |
867 | if len(buf) >= int(size) { | |
868 | cc.freeBuf[i] = nil | |
869 | cc.mu.Unlock() | |
870 | return buf[:size] | |
871 | } | |
872 | } | |
873 | cc.mu.Unlock() | |
874 | return make([]byte, size) | |
875 | } | |
876 | ||
877 | func (cc *ClientConn) putFrameScratchBuffer(buf []byte) { | |
878 | cc.mu.Lock() | |
879 | defer cc.mu.Unlock() | |
880 | const maxBufs = 4 // arbitrary; 4 concurrent requests per conn? investigate. | |
881 | if len(cc.freeBuf) < maxBufs { | |
882 | cc.freeBuf = append(cc.freeBuf, buf) | |
883 | return | |
884 | } | |
885 | for i, old := range cc.freeBuf { | |
886 | if old == nil { | |
887 | cc.freeBuf[i] = buf | |
888 | return | |
889 | } | |
890 | } | |
891 | // forget about it. | |
892 | } | |
893 | ||
// errRequestCanceled mirrors net/http's unexported errRequestCanceled.
// The two are distinct values, but they carry the same message so that
// h1-vs-h2 comparison tests see them as DeepEqual.
var errRequestCanceled = errors.New("net/http: request canceled")
897 | ||
898 | func commaSeparatedTrailers(req *http.Request) (string, error) { | |
899 | keys := make([]string, 0, len(req.Trailer)) | |
900 | for k := range req.Trailer { | |
901 | k = http.CanonicalHeaderKey(k) | |
902 | switch k { | |
903 | case "Transfer-Encoding", "Trailer", "Content-Length": | |
904 | return "", &badStringError{"invalid Trailer key", k} | |
905 | } | |
906 | keys = append(keys, k) | |
907 | } | |
908 | if len(keys) > 0 { | |
909 | sort.Strings(keys) | |
910 | return strings.Join(keys, ","), nil | |
911 | } | |
912 | return "", nil | |
913 | } | |
914 | ||
915 | func (cc *ClientConn) responseHeaderTimeout() time.Duration { | |
916 | if cc.t.t1 != nil { | |
917 | return cc.t.t1.ResponseHeaderTimeout | |
918 | } | |
919 | // No way to do this (yet?) with just an http2.Transport. Probably | |
920 | // no need. Request.Cancel this is the new way. We only need to support | |
921 | // this for compatibility with the old http.Transport fields when | |
922 | // we're doing transparent http2. | |
923 | return 0 | |
924 | } | |
925 | ||
// checkConnHeaders reports an error if req carries connection-level
// headers that are invalid for HTTP/2, per RFC 7540 section 8.1.2.2
// (Connection-Specific Header Fields).
//
// A few values are tolerated here (and simply not transmitted later): an
// empty or "chunked" Transfer-Encoding, and an empty, "close" or
// "keep-alive" Connection (case-insensitive). Any non-empty Upgrade, or
// more than one value for Transfer-Encoding or Connection, is rejected.
func checkConnHeaders(req *http.Request) error {
	if req.Header.Get("Upgrade") != "" {
		return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"])
	}

	te := req.Header["Transfer-Encoding"]
	if len(te) > 1 || (len(te) == 1 && te[0] != "" && te[0] != "chunked") {
		return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", te)
	}

	conn := req.Header["Connection"]
	if len(conn) > 1 {
		return fmt.Errorf("http2: invalid Connection request header: %q", conn)
	}
	if len(conn) == 1 && conn[0] != "" {
		tolerated := strings.EqualFold(conn[0], "close") || strings.EqualFold(conn[0], "keep-alive")
		if !tolerated {
			return fmt.Errorf("http2: invalid Connection request header: %q", conn)
		}
	}
	return nil
}
941 | ||
// actualContentLength normalizes req.ContentLength: the result is 0 only
// when there really is no body (nil or http.NoBody), the declared length
// when it is non-zero, and -1 when a body exists but its length is
// unknown.
func actualContentLength(req *http.Request) int64 {
	switch {
	case req.Body == nil, req.Body == http.NoBody:
		return 0
	case req.ContentLength != 0:
		return req.ContentLength
	default:
		return -1
	}
}
954 | ||
955 | func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) { | |
107c1cdb ND |
956 | resp, _, err := cc.roundTrip(req) |
957 | return resp, err | |
958 | } | |
959 | ||
// roundTrip writes the request headers for req on a new stream, schedules
// the body write if there is a body, and then waits for the first of: a
// result from the read loop, the response-header timeout, cancellation
// (req.Context or req.Cancel), a peer stream reset, or the body writer
// finishing with an error.
//
// The second result is false on success and on failures that occur before
// or while writing the headers; on later failures it is whatever
// cs.getStartedWrite() reports.
960 | func (cc *ClientConn) roundTrip(req *http.Request) (res *http.Response, gotErrAfterReqBodyWrite bool, err error) { | |
15c0b25d | 961 | if err := checkConnHeaders(req); err != nil { |
107c1cdb | 962 | return nil, false, err |
15c0b25d AP |
963 | } | |
964 | if cc.idleTimer != nil { | |
965 | cc.idleTimer.Stop() | |
966 | } | |
967 | ||
968 | trailers, err := commaSeparatedTrailers(req) | |
969 | if err != nil { | |
107c1cdb | 970 | return nil, false, err |
15c0b25d AP |
971 | } | |
972 | hasTrailers := trailers != "" | |
973 | ||
// cc.mu is held from here until after the headers have been written.
974 | cc.mu.Lock() | |
975 | if err := cc.awaitOpenSlotForRequest(req); err != nil { | |
976 | cc.mu.Unlock() | |
107c1cdb | 977 | return nil, false, err |
15c0b25d AP |
978 | } | |
979 | ||
980 | body := req.Body | |
981 | contentLen := actualContentLength(req) | |
982 | hasBody := contentLen != 0 | |
983 | ||
984 | // TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere? | |
985 | var requestedGzip bool | |
986 | if !cc.t.disableCompression() && | |
987 | req.Header.Get("Accept-Encoding") == "" && | |
988 | req.Header.Get("Range") == "" && | |
989 | req.Method != "HEAD" { | |
990 | // Request gzip only, not deflate. Deflate is ambiguous and | |
991 | // not as universally supported anyway. | |
992 | // See: http://www.gzip.org/zlib/zlib_faq.html#faq38 | |
993 | // | |
994 | // Note that we don't request this for HEAD requests, | |
995 | // due to a bug in nginx: | |
996 | // http://trac.nginx.org/nginx/ticket/358 | |
997 | // https://golang.org/issue/5522 | |
998 | // | |
999 | // We don't request gzip if the request is for a range, since | |
1000 | // auto-decoding a portion of a gzipped document will just fail | |
1001 | // anyway. See https://golang.org/issue/8923 | |
1002 | requestedGzip = true | |
1003 | } | |
1004 | ||
1005 | // we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is | |
1006 | // sent by writeRequestBody below, along with any Trailers, | |
1007 | // again in form HEADERS{1}, CONTINUATION{0,}) | |
1008 | hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen) | |
1009 | if err != nil { | |
1010 | cc.mu.Unlock() | |
107c1cdb | 1011 | return nil, false, err |
15c0b25d AP |
1012 | } | |
1013 | ||
1014 | cs := cc.newStream() | |
1015 | cs.req = req | |
107c1cdb | 1016 | cs.trace = httptrace.ContextClientTrace(req.Context()) |
15c0b25d AP |
1017 | cs.requestedGzip = requestedGzip |
1018 | bodyWriter := cc.t.getBodyWriterState(cs, body) | |
1019 | cs.on100 = bodyWriter.on100 | |
1020 | ||
1021 | cc.wmu.Lock() | |
// With neither body nor trailers, the HEADERS frame itself ends the stream.
1022 | endStream := !hasBody && !hasTrailers | |
107c1cdb | 1023 | werr := cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs) |
15c0b25d AP |
1024 | cc.wmu.Unlock() |
1025 | traceWroteHeaders(cs.trace) | |
1026 | cc.mu.Unlock() | |
1027 | ||
1028 | if werr != nil { | |
1029 | if hasBody { | |
1030 | req.Body.Close() // per RoundTripper contract | |
1031 | bodyWriter.cancel() | |
1032 | } | |
1033 | cc.forgetStreamID(cs.ID) | |
1034 | // Don't bother sending a RST_STREAM (our write already failed; | |
1035 | // no need to keep writing) | |
1036 | traceWroteRequest(cs.trace, werr) | |
107c1cdb | 1037 | return nil, false, werr |
15c0b25d AP |
1038 | } | |
1039 | ||
// respHeaderTimer stays nil (never fires in the select below) until a
// timer is armed: immediately for body-less requests, or after the body
// writer reports success.
1040 | var respHeaderTimer <-chan time.Time | |
1041 | if hasBody { | |
1042 | bodyWriter.scheduleBodyWrite() | |
1043 | } else { | |
1044 | traceWroteRequest(cs.trace, nil) | |
1045 | if d := cc.responseHeaderTimeout(); d != 0 { | |
1046 | timer := time.NewTimer(d) | |
1047 | defer timer.Stop() | |
1048 | respHeaderTimer = timer.C | |
1049 | } | |
1050 | } | |
1051 | ||
1052 | readLoopResCh := cs.resc | |
1053 | bodyWritten := false | |
107c1cdb | 1054 | ctx := req.Context() |
15c0b25d | 1055 | |
// handleReadLoopResponse turns a result delivered on cs.resc into this
// function's return values.
107c1cdb | 1056 | handleReadLoopResponse := func(re resAndError) (*http.Response, bool, error) { |
15c0b25d AP |
1057 | res := re.res |
1058 | if re.err != nil || res.StatusCode > 299 { | |
1059 | // On error or status code 3xx, 4xx, 5xx, etc abort any | |
1060 | // ongoing write, assuming that the server doesn't care | |
1061 | // about our request body. If the server replied with 1xx or | |
1062 | // 2xx, however, then assume the server DOES potentially | |
1063 | // want our body (e.g. full-duplex streaming: | |
1064 | // golang.org/issue/13444). If it turns out the server | |
1065 | // doesn't, they'll RST_STREAM us soon enough. This is a | |
1066 | // heuristic to avoid adding knobs to Transport. Hopefully | |
1067 | // we can keep it. | |
1068 | bodyWriter.cancel() | |
1069 | cs.abortRequestBodyWrite(errStopReqBodyWrite) | |
1070 | } | |
1071 | if re.err != nil { | |
15c0b25d | 1072 | cc.forgetStreamID(cs.ID) |
107c1cdb | 1073 | return nil, cs.getStartedWrite(), re.err |
15c0b25d AP |
1074 | } | |
1075 | res.Request = req | |
1076 | res.TLS = cc.tlsState | |
107c1cdb | 1077 | return res, false, nil |
15c0b25d AP |
1078 | } | |
1079 | ||
// The timeout, ctx.Done and req.Cancel branches below share one abort
// sequence: send RST_STREAM(CANCEL) directly when there is no body write
// in progress, otherwise cancel the body writer and abort it with
// errStopReqBodyWriteAndCancel; then forget the stream.
1080 | for { | |
1081 | select { | |
1082 | case re := <-readLoopResCh: | |
1083 | return handleReadLoopResponse(re) | |
1084 | case <-respHeaderTimer: | |
1085 | if !hasBody || bodyWritten { | |
1086 | cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | |
1087 | } else { | |
1088 | bodyWriter.cancel() | |
1089 | cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel) | |
1090 | } | |
1091 | cc.forgetStreamID(cs.ID) | |
107c1cdb | 1092 | return nil, cs.getStartedWrite(), errTimeout |
15c0b25d AP |
1093 | case <-ctx.Done(): |
1094 | if !hasBody || bodyWritten { | |
1095 | cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | |
1096 | } else { | |
1097 | bodyWriter.cancel() | |
1098 | cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel) | |
1099 | } | |
1100 | cc.forgetStreamID(cs.ID) | |
107c1cdb | 1101 | return nil, cs.getStartedWrite(), ctx.Err() |
15c0b25d AP |
1102 | case <-req.Cancel: |
1103 | if !hasBody || bodyWritten { | |
1104 | cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | |
1105 | } else { | |
1106 | bodyWriter.cancel() | |
1107 | cs.abortRequestBodyWrite(errStopReqBodyWriteAndCancel) | |
1108 | } | |
1109 | cc.forgetStreamID(cs.ID) | |
107c1cdb | 1110 | return nil, cs.getStartedWrite(), errRequestCanceled |
15c0b25d AP |
1111 | case <-cs.peerReset: |
1112 | // processResetStream already removed the | |
1113 | // stream from the streams map; no need for | |
1114 | // forgetStreamID. | |
107c1cdb | 1115 | return nil, cs.getStartedWrite(), cs.resetErr |
15c0b25d AP |
1116 | case err := <-bodyWriter.resc: |
1117 | // Prefer the read loop's response, if available. Issue 16102. | |
1118 | select { | |
1119 | case re := <-readLoopResCh: | |
1120 | return handleReadLoopResponse(re) | |
1121 | default: | |
1122 | } | |
1123 | if err != nil { | |
107c1cdb ND |
1124 | cc.forgetStreamID(cs.ID) |
1125 | return nil, cs.getStartedWrite(), err | |
15c0b25d AP |
1126 | } | |
// Body fully written: start the response-header timeout now and keep
// waiting for the read loop.
1127 | bodyWritten = true | |
1128 | if d := cc.responseHeaderTimeout(); d != 0 { | |
1129 | timer := time.NewTimer(d) | |
1130 | defer timer.Stop() | |
1131 | respHeaderTimer = timer.C | |
1132 | } | |
1133 | } | |
1134 | } | |
1135 | } | |
1136 | ||
1137 | // awaitOpenSlotForRequest waits until len(streams) < maxConcurrentStreams. | |
1138 | // Must hold cc.mu. | |
//
// It returns nil once a slot is available, errClientConnUnusable if the
// connection is closed or can no longer take new requests, or the error
// reported by awaitRequestCancel if that fires while waiting.
1139 | func (cc *ClientConn) awaitOpenSlotForRequest(req *http.Request) error { | |
1140 | var waitingForConn chan struct{} | |
1141 | var waitingForConnErr error // guarded by cc.mu | |
1142 | for { | |
1143 | cc.lastActive = time.Now() | |
1144 | if cc.closed || !cc.canTakeNewRequestLocked() { | |
107c1cdb ND |
// Closing waitingForConn is the signal passed to awaitRequestCancel that
// this wait is over.
1145 | if waitingForConn != nil { |
1146 | close(waitingForConn) | |
1147 | } | |
15c0b25d AP |
1148 | return errClientConnUnusable |
1149 | } | |
1150 | if int64(len(cc.streams))+1 <= int64(cc.maxConcurrentStreams) { | |
1151 | if waitingForConn != nil { | |
1152 | close(waitingForConn) | |
1153 | } | |
1154 | return nil | |
1155 | } | |
1156 | // Unfortunately, we cannot wait on a condition variable and channel at | |
1157 | // the same time, so instead, we spin up a goroutine to check if the | |
1158 | // request is canceled while we wait for a slot to open in the connection. | |
1159 | if waitingForConn == nil { | |
1160 | waitingForConn = make(chan struct{}) | |
1161 | go func() { | |
1162 | if err := awaitRequestCancel(req, waitingForConn); err != nil { | |
1163 | cc.mu.Lock() | |
1164 | waitingForConnErr = err | |
1165 | cc.cond.Broadcast() | |
1166 | cc.mu.Unlock() | |
1167 | } | |
1168 | }() | |
1169 | } | |
// pendingRequests is incremented only for the duration of the Wait.
1170 | cc.pendingRequests++ | |
1171 | cc.cond.Wait() | |
1172 | cc.pendingRequests-- | |
1173 | if waitingForConnErr != nil { | |
1174 | return waitingForConnErr | |
1175 | } | |
1176 | } | |
1177 | } | |
1178 | ||
1179 | // requires cc.wmu be held | |
107c1cdb | 1180 | func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error { |
15c0b25d | 1181 | first := true // first frame written (HEADERS is first, then CONTINUATION) |
15c0b25d AP |
1182 | for len(hdrs) > 0 && cc.werr == nil { |
1183 | chunk := hdrs | |
107c1cdb ND |
1184 | if len(chunk) > maxFrameSize { |
1185 | chunk = chunk[:maxFrameSize] | |
15c0b25d AP |
1186 | } |
1187 | hdrs = hdrs[len(chunk):] | |
1188 | endHeaders := len(hdrs) == 0 | |
1189 | if first { | |
1190 | cc.fr.WriteHeaders(HeadersFrameParam{ | |
1191 | StreamID: streamID, | |
1192 | BlockFragment: chunk, | |
1193 | EndStream: endStream, | |
1194 | EndHeaders: endHeaders, | |
1195 | }) | |
1196 | first = false | |
1197 | } else { | |
1198 | cc.fr.WriteContinuation(streamID, endHeaders, chunk) | |
1199 | } | |
1200 | } | |
1201 | // TODO(bradfitz): this Flush could potentially block (as | |
1202 | // could the WriteHeaders call(s) above), which means they | |
1203 | // wouldn't respond to Request.Cancel being readable. That's | |
1204 | // rare, but this should probably be in a goroutine. | |
1205 | cc.bw.Flush() | |
1206 | return cc.werr | |
1207 | } | |
1208 | ||
// Internal sentinel errors used to stop a request body write; they don't
// escape to callers.

// errStopReqBodyWrite aborts the request body write without sending a
// cancel to the peer.
var errStopReqBodyWrite = errors.New("http2: aborting request body write")

// errStopReqBodyWriteAndCancel aborts the request body write and also
// sends a stream reset with the cancel code.
var errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")
1217 | ||
// writeRequestBody copies body to the peer as DATA frames, waiting for
// flow-control tokens before each write, and finishes the stream either
// with trailers (a HEADERS frame with END_STREAM) or with a final DATA
// frame carrying END_STREAM. bodyCloser is always closed on return; its
// Close error is returned only if nothing else failed.
1218 | func (cs *clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) { | |
1219 | cc := cs.cc | |
1220 | sentEnd := false // whether we sent the final DATA frame w/ END_STREAM | |
1221 | buf := cc.frameScratchBuffer() | |
1222 | defer cc.putFrameScratchBuffer(buf) | |
1223 | ||
1224 | defer func() { | |
1225 | traceWroteRequest(cs.trace, err) | |
1226 | // TODO: write h12Compare test showing whether | |
1227 | // Request.Body is closed by the Transport, | |
1228 | // and in multiple cases: server replies <=299 and >299 | |
1229 | // while still writing request body | |
1230 | cerr := bodyCloser.Close() | |
1231 | if err == nil { | |
1232 | err = cerr | |
1233 | } | |
1234 | }() | |
1235 | ||
1236 | req := cs.req | |
1237 | hasTrailers := req.Trailer != nil | |
1238 | ||
1239 | var sawEOF bool | |
1240 | for !sawEOF { | |
// Note: this err shadows the named result inside the loop body.
1241 | n, err := body.Read(buf) | |
1242 | if err == io.EOF { | |
1243 | sawEOF = true | |
1244 | err = nil | |
1245 | } else if err != nil { | |
107c1cdb | 1246 | cc.writeStreamReset(cs.ID, ErrCodeCancel, err) |
15c0b25d AP |
1247 | return err |
1248 | } | |
1249 | ||
// Send what was read, possibly in several frames, as flow-control
// tokens become available.
1250 | remain := buf[:n] | |
1251 | for len(remain) > 0 && err == nil { | |
1252 | var allowed int32 | |
1253 | allowed, err = cs.awaitFlowControl(len(remain)) | |
1254 | switch { | |
1255 | case err == errStopReqBodyWrite: | |
1256 | return err | |
1257 | case err == errStopReqBodyWriteAndCancel: | |
1258 | cc.writeStreamReset(cs.ID, ErrCodeCancel, nil) | |
1259 | return err | |
1260 | case err != nil: | |
1261 | return err | |
1262 | } | |
1263 | cc.wmu.Lock() | |
1264 | data := remain[:allowed] | |
1265 | remain = remain[allowed:] | |
// END_STREAM rides on this DATA frame only if it is the last chunk and
// no trailers follow.
1266 | sentEnd = sawEOF && len(remain) == 0 && !hasTrailers | |
1267 | err = cc.fr.WriteData(cs.ID, sentEnd, data) | |
1268 | if err == nil { | |
1269 | // TODO(bradfitz): this flush is for latency, not bandwidth. | |
1270 | // Most requests won't need this. Make this opt-in or | |
1271 | // opt-out? Use some heuristic on the body type? Nagel-like | |
1272 | // timers? Based on 'n'? Only last chunk of this for loop, | |
1273 | // unless flow control tokens are low? For now, always. | |
1274 | // If we change this, see comment below. | |
1275 | err = cc.bw.Flush() | |
1276 | } | |
1277 | cc.wmu.Unlock() | |
1278 | } | |
1279 | if err != nil { | |
1280 | return err | |
1281 | } | |
1282 | } | |
1283 | ||
1284 | if sentEnd { | |
1285 | // Already sent END_STREAM (which implies we have no | |
1286 | // trailers) and flushed, because currently all | |
1287 | // WriteData frames above get a flush. So we're done. | |
1288 | return nil | |
1289 | } | |
1290 | ||
1291 | var trls []byte | |
1292 | if hasTrailers { | |
// encodeTrailers requires cc.mu.
1293 | cc.mu.Lock() | |
1294 | trls, err = cc.encodeTrailers(req) | |
1295 | cc.mu.Unlock() | |
1296 | if err != nil { | |
1297 | cc.writeStreamReset(cs.ID, ErrCodeInternal, err) | |
1298 | cc.forgetStreamID(cs.ID) | |
1299 | return err | |
1300 | } | |
1301 | } | |
1302 | ||
107c1cdb ND |
// Read cc.maxFrameSize under cc.mu before taking cc.wmu.
1303 | cc.mu.Lock() |
1304 | maxFrameSize := int(cc.maxFrameSize) | |
1305 | cc.mu.Unlock() | |
1306 | ||
15c0b25d AP |
1307 | cc.wmu.Lock() |
1308 | defer cc.wmu.Unlock() | |
1309 | ||
1310 | // Two ways to send END_STREAM: either with trailers, or | |
1311 | // with an empty DATA frame. | |
1312 | if len(trls) > 0 { | |
107c1cdb | 1313 | err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls) |
15c0b25d AP |
1314 | } else { |
1315 | err = cc.fr.WriteData(cs.ID, true, nil) | |
1316 | } | |
1317 | if ferr := cc.bw.Flush(); ferr != nil && err == nil { | |
1318 | err = ferr | |
1319 | } | |
1320 | return err | |
1321 | } | |
1322 | ||
1323 | // awaitFlowControl waits for [1, min(maxBytes, cc.cs.maxFrameSize)] flow | |
1324 | // control tokens from the server. | |
1325 | // It returns either the non-zero number of tokens taken or an error | |
1326 | // if the stream is dead. | |
1327 | func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) { | |
1328 | cc := cs.cc | |
1329 | cc.mu.Lock() | |
1330 | defer cc.mu.Unlock() | |
1331 | for { | |
1332 | if cc.closed { | |
1333 | return 0, errClientConnClosed | |
1334 | } | |
1335 | if cs.stopReqBody != nil { | |
1336 | return 0, cs.stopReqBody | |
1337 | } | |
1338 | if err := cs.checkResetOrDone(); err != nil { | |
1339 | return 0, err | |
1340 | } | |
1341 | if a := cs.flow.available(); a > 0 { | |
1342 | take := a | |
1343 | if int(take) > maxBytes { | |
1344 | ||
1345 | take = int32(maxBytes) // can't truncate int; take is int32 | |
1346 | } | |
1347 | if take > int32(cc.maxFrameSize) { | |
1348 | take = int32(cc.maxFrameSize) | |
1349 | } | |
1350 | cs.flow.take(take) | |
1351 | return take, nil | |
1352 | } | |
1353 | cc.cond.Wait() | |
1354 | } | |
1355 | } | |
1356 | ||
// badStringError describes a string that was rejected, together with a
// short explanation of what was wrong with it.
type badStringError struct {
	what string // description of the problem
	str  string // the offending string
}

// Error formats the error as the description followed by the quoted string.
func (e *badStringError) Error() string {
	return fmt.Sprintf("%s %q", e.what, e.str)
}
1363 | ||
// encodeHeaders hpack-encodes the request header block for req into
// cc.hbuf and returns the encoded bytes. addGzipHeader adds
// "accept-encoding: gzip"; trailers, when non-empty, is sent as the
// "trailer" field; contentLength feeds shouldSendReqContentLength.
//
// All validation (host, :path, header names and values, total header list
// size) happens before anything is written to the hpack encoder.
//
1364 | // requires cc.mu be held. | |
1365 | func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trailers string, contentLength int64) ([]byte, error) { | |
1366 | cc.hbuf.Reset() | |
1367 | ||
1368 | host := req.Host | |
1369 | if host == "" { | |
1370 | host = req.URL.Host | |
1371 | } | |
107c1cdb | 1372 | host, err := httpguts.PunycodeHostPort(host) |
15c0b25d AP |
1373 | if err != nil { |
1374 | return nil, err | |
1375 | } | |
1376 | ||
1377 | var path string | |
1378 | if req.Method != "CONNECT" { | |
1379 | path = req.URL.RequestURI() | |
1380 | if !validPseudoPath(path) { | |
// Retry after stripping a leading "scheme://host" from the request URI.
1381 | orig := path | |
1382 | path = strings.TrimPrefix(path, req.URL.Scheme+"://"+host) | |
1383 | if !validPseudoPath(path) { | |
1384 | if req.URL.Opaque != "" { | |
1385 | return nil, fmt.Errorf("invalid request :path %q from URL.Opaque = %q", orig, req.URL.Opaque) | |
1386 | } else { | |
1387 | return nil, fmt.Errorf("invalid request :path %q", orig) | |
1388 | } | |
1389 | } | |
1390 | } | |
1391 | } | |
1392 | ||
1393 | // Check for any invalid headers and return an error before we | |
1394 | // potentially pollute our hpack state. (We want to be able to | |
1395 | // continue to reuse the hpack encoder for future requests) | |
1396 | for k, vv := range req.Header { | |
107c1cdb | 1397 | if !httpguts.ValidHeaderFieldName(k) { |
15c0b25d AP |
1398 | return nil, fmt.Errorf("invalid HTTP header name %q", k) |
1399 | } | |
1400 | for _, v := range vv { | |
107c1cdb | 1401 | if !httpguts.ValidHeaderFieldValue(v) { |
15c0b25d AP |
1402 | return nil, fmt.Errorf("invalid HTTP header value %q for header %q", v, k) |
1403 | } | |
1404 | } | |
1405 | } | |
1406 | ||
// enumerateHeaders calls f once per header field to be sent. It is run
// twice below: once to measure the header list, once to encode it.
1407 | enumerateHeaders := func(f func(name, value string)) { | |
1408 | // 8.1.2.3 Request Pseudo-Header Fields | |
1409 | // The :path pseudo-header field includes the path and query parts of the | |
1410 | // target URI (the path-absolute production and optionally a '?' character | |
1411 | // followed by the query production (see Sections 3.3 and 3.4 of | |
1412 | // [RFC3986]). | |
1413 | f(":authority", host) | |
107c1cdb ND |
// An empty method is sent as GET.
1414 | m := req.Method |
1415 | if m == "" { | |
1416 | m = http.MethodGet | |
1417 | } | |
1418 | f(":method", m) | |
15c0b25d AP |
1419 | if req.Method != "CONNECT" { |
1420 | f(":path", path) | |
1421 | f(":scheme", req.URL.Scheme) | |
1422 | } | |
1423 | if trailers != "" { | |
1424 | f("trailer", trailers) | |
1425 | } | |
1426 | ||
1427 | var didUA bool | |
1428 | for k, vv := range req.Header { | |
1429 | if strings.EqualFold(k, "host") || strings.EqualFold(k, "content-length") { | |
1430 | // Host is :authority, already sent. | |
1431 | // Content-Length is automatic, set below. | |
1432 | continue | |
1433 | } else if strings.EqualFold(k, "connection") || strings.EqualFold(k, "proxy-connection") || | |
1434 | strings.EqualFold(k, "transfer-encoding") || strings.EqualFold(k, "upgrade") || | |
1435 | strings.EqualFold(k, "keep-alive") { | |
1436 | // Per 8.1.2.2 Connection-Specific Header | |
1437 | // Fields, don't send connection-specific | |
1438 | // fields. We have already checked if any | |
1439 | // are error-worthy so just ignore the rest. | |
1440 | continue | |
1441 | } else if strings.EqualFold(k, "user-agent") { | |
1442 | // Match Go's http1 behavior: at most one | |
1443 | // User-Agent. If set to nil or empty string, | |
1444 | // then omit it. Otherwise if not mentioned, | |
1445 | // include the default (below). | |
1446 | didUA = true | |
1447 | if len(vv) < 1 { | |
1448 | continue | |
1449 | } | |
1450 | vv = vv[:1] | |
1451 | if vv[0] == "" { | |
1452 | continue | |
1453 | } | |
1454 | ||
1455 | } | |
1456 | ||
1457 | for _, v := range vv { | |
1458 | f(k, v) | |
1459 | } | |
1460 | } | |
1461 | if shouldSendReqContentLength(req.Method, contentLength) { | |
1462 | f("content-length", strconv.FormatInt(contentLength, 10)) | |
1463 | } | |
1464 | if addGzipHeader { | |
1465 | f("accept-encoding", "gzip") | |
1466 | } | |
1467 | if !didUA { | |
1468 | f("user-agent", defaultUserAgent) | |
1469 | } | |
1470 | } | |
1471 | ||
1472 | // Do a first pass over the headers counting bytes to ensure | |
1473 | // we don't exceed cc.peerMaxHeaderListSize. This is done as a | |
1474 | // separate pass before encoding the headers to prevent | |
1475 | // modifying the hpack state. | |
1476 | hlSize := uint64(0) | |
1477 | enumerateHeaders(func(name, value string) { | |
1478 | hf := hpack.HeaderField{Name: name, Value: value} | |
1479 | hlSize += uint64(hf.Size()) | |
1480 | }) | |
1481 | ||
1482 | if hlSize > cc.peerMaxHeaderListSize { | |
1483 | return nil, errRequestHeaderListSize | |
1484 | } | |
1485 | ||
107c1cdb ND |
1486 | trace := httptrace.ContextClientTrace(req.Context()) |
1487 | traceHeaders := traceHasWroteHeaderField(trace) | |
1488 | ||
15c0b25d AP |
1489 | // Header list size is ok. Write the headers. |
1490 | enumerateHeaders(func(name, value string) { | |
107c1cdb ND |
// Field names are lowercased before they are encoded and traced.
1491 | name = strings.ToLower(name) |
1492 | cc.writeHeader(name, value) | |
1493 | if traceHeaders { | |
1494 | traceWroteHeaderField(trace, name, value) | |
1495 | } | |
15c0b25d AP |
1496 | }) |
1497 | ||
1498 | return cc.hbuf.Bytes(), nil | |
1499 | } | |
1500 | ||
// shouldSendReqContentLength reports whether the http2.Transport should
// send a "content-length" request header. The logic is basically a copy
// of net/http's transferWriter.shouldSendContentLength.
//
// contentLength is the corrected length: 0 means actually zero (not
// unknown) and -1 means unknown. A positive length is always sent, an
// unknown one never; for zero-length bodies it is sent only for POST, PUT
// and PATCH (which barely matters for http2 anyway, given END_STREAM).
func shouldSendReqContentLength(method string, contentLength int64) bool {
	switch {
	case contentLength > 0:
		return true
	case contentLength < 0:
		return false
	}
	return method == "POST" || method == "PUT" || method == "PATCH"
}
1522 | ||
1523 | // requires cc.mu be held. | |
1524 | func (cc *ClientConn) encodeTrailers(req *http.Request) ([]byte, error) { | |
1525 | cc.hbuf.Reset() | |
1526 | ||
1527 | hlSize := uint64(0) | |
1528 | for k, vv := range req.Trailer { | |
1529 | for _, v := range vv { | |
1530 | hf := hpack.HeaderField{Name: k, Value: v} | |
1531 | hlSize += uint64(hf.Size()) | |
1532 | } | |
1533 | } | |
1534 | if hlSize > cc.peerMaxHeaderListSize { | |
1535 | return nil, errRequestHeaderListSize | |
1536 | } | |
1537 | ||
1538 | for k, vv := range req.Trailer { | |
1539 | // Transfer-Encoding, etc.. have already been filtered at the | |
1540 | // start of RoundTrip | |
1541 | lowKey := strings.ToLower(k) | |
1542 | for _, v := range vv { | |
1543 | cc.writeHeader(lowKey, v) | |
1544 | } | |
1545 | } | |
1546 | return cc.hbuf.Bytes(), nil | |
1547 | } | |
1548 | ||
1549 | func (cc *ClientConn) writeHeader(name, value string) { | |
1550 | if VerboseLogs { | |
1551 | log.Printf("http2: Transport encoding header %q = %q", name, value) | |
1552 | } | |
1553 | cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value}) | |
1554 | } | |
1555 | ||
// resAndError pairs a response with an error so that both can be sent
// over a single channel.
type resAndError struct {
	res *http.Response // the response, when there is one
	err error          // the failure, when there is one
}
1560 | ||
1561 | // requires cc.mu be held. | |
1562 | func (cc *ClientConn) newStream() *clientStream { | |
1563 | cs := &clientStream{ | |
1564 | cc: cc, | |
1565 | ID: cc.nextStreamID, | |
1566 | resc: make(chan resAndError, 1), | |
1567 | peerReset: make(chan struct{}), | |
1568 | done: make(chan struct{}), | |
1569 | } | |
1570 | cs.flow.add(int32(cc.initialWindowSize)) | |
1571 | cs.flow.setConnFlow(&cc.flow) | |
1572 | cs.inflow.add(transportDefaultStreamFlow) | |
1573 | cs.inflow.setConnFlow(&cc.inflow) | |
1574 | cc.nextStreamID += 2 | |
1575 | cc.streams[cs.ID] = cs | |
1576 | return cs | |
1577 | } | |
1578 | ||
1579 | func (cc *ClientConn) forgetStreamID(id uint32) { | |
1580 | cc.streamByID(id, true) | |
1581 | } | |
1582 | ||
1583 | func (cc *ClientConn) streamByID(id uint32, andRemove bool) *clientStream { | |
1584 | cc.mu.Lock() | |
1585 | defer cc.mu.Unlock() | |
1586 | cs := cc.streams[id] | |
1587 | if andRemove && cs != nil && !cc.closed { | |
1588 | cc.lastActive = time.Now() | |
1589 | delete(cc.streams, id) | |
1590 | if len(cc.streams) == 0 && cc.idleTimer != nil { | |
1591 | cc.idleTimer.Reset(cc.idleTimeout) | |
1592 | } | |
1593 | close(cs.done) | |
1594 | // Wake up checkResetOrDone via clientStream.awaitFlowControl and | |
1595 | // wake up RoundTrip if there is a pending request. | |
1596 | cc.cond.Broadcast() | |
1597 | } | |
1598 | return cs | |
1599 | } | |
1600 | ||
1601 | // clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop. | |
1602 | type clientConnReadLoop struct { | |
1603 | cc *ClientConn | |
15c0b25d AP |
1604 | closeWhenIdle bool |
1605 | } | |
1606 | ||
1607 | // readLoop runs in its own goroutine and reads and dispatches frames. | |
1608 | func (cc *ClientConn) readLoop() { | |
107c1cdb | 1609 | rl := &clientConnReadLoop{cc: cc} |
15c0b25d AP |
1610 | defer rl.cleanup() |
1611 | cc.readerErr = rl.run() | |
1612 | if ce, ok := cc.readerErr.(ConnectionError); ok { | |
1613 | cc.wmu.Lock() | |
1614 | cc.fr.WriteGoAway(0, ErrCode(ce), nil) | |
1615 | cc.wmu.Unlock() | |
1616 | } | |
1617 | } | |
1618 | ||
1619 | // GoAwayError is returned by the Transport when the server closes the | |
1620 | // TCP connection after sending a GOAWAY frame. | |
1621 | type GoAwayError struct { | |
1622 | LastStreamID uint32 | |
1623 | ErrCode ErrCode | |
1624 | DebugData string | |
1625 | } | |
1626 | ||
1627 | func (e GoAwayError) Error() string { | |
1628 | return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q", | |
1629 | e.LastStreamID, e.ErrCode, e.DebugData) | |
1630 | } | |
1631 | ||
// isEOFOrNetReadError reports whether err is exactly io.EOF or a
// *net.OpError whose Op is "read". Wrapped errors are not unwrapped.
func isEOFOrNetReadError(err error) bool {
	if err == io.EOF {
		return true
	}
	if opErr, ok := err.(*net.OpError); ok {
		return opErr.Op == "read"
	}
	return false
}
1639 | ||
// cleanup runs when the read loop exits. It fails every stream still
// registered on the connection with the read loop's error, marks the
// connection closed and wakes all waiters on cc.cond.
1640 | func (rl *clientConnReadLoop) cleanup() { | |
1641 | cc := rl.cc | |
// Deferred calls run in reverse order: readerDone is closed first, then
// the connection is marked dead in the pool, then tconn is closed.
1642 | defer cc.tconn.Close() | |
1643 | defer cc.t.connPool().MarkDead(cc) | |
1644 | defer close(cc.readerDone) | |
1645 | ||
1646 | if cc.idleTimer != nil { | |
1647 | cc.idleTimer.Stop() | |
1648 | } | |
1649 | ||
1650 | // Close any response bodies if the server closes prematurely. | |
1651 | // TODO: also do this if we've written the headers but not | |
1652 | // gotten a response yet. | |
1653 | err := cc.readerErr | |
1654 | cc.mu.Lock() | |
// If a GOAWAY was recorded and the read ended with EOF or a network read
// error, report a GoAwayError; a bare io.EOF otherwise becomes
// io.ErrUnexpectedEOF.
1655 | if cc.goAway != nil && isEOFOrNetReadError(err) { | |
1656 | err = GoAwayError{ | |
1657 | LastStreamID: cc.goAway.LastStreamID, | |
1658 | ErrCode: cc.goAway.ErrCode, | |
1659 | DebugData: cc.goAwayDebug, | |
1660 | } | |
1661 | } else if err == io.EOF { | |
1662 | err = io.ErrUnexpectedEOF | |
1663 | } | |
15c0b25d | 1664 | for _, cs := range cc.streams { |
107c1cdb | 1665 | cs.bufPipe.CloseWithError(err) // no-op if already closed |
15c0b25d AP |
// Non-blocking send: skipped if cs.resc cannot accept a value.
1666 | select { |
1667 | case cs.resc <- resAndError{err: err}: | |
1668 | default: | |
1669 | } | |
1670 | close(cs.done) | |
1671 | } | |
1672 | cc.closed = true | |
1673 | cc.cond.Broadcast() | |
1674 | cc.mu.Unlock() | |
1675 | } | |
1676 | ||
1677 | func (rl *clientConnReadLoop) run() error { | |
1678 | cc := rl.cc | |
1679 | rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse | |
1680 | gotReply := false // ever saw a HEADERS reply | |
1681 | gotSettings := false | |
1682 | for { | |
1683 | f, err := cc.fr.ReadFrame() | |
1684 | if err != nil { | |
1685 | cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err) | |
1686 | } | |
1687 | if se, ok := err.(StreamError); ok { | |
1688 | if cs := cc.streamByID(se.StreamID, false); cs != nil { | |
1689 | cs.cc.writeStreamReset(cs.ID, se.Code, err) | |
1690 | cs.cc.forgetStreamID(cs.ID) | |
1691 | if se.Cause == nil { | |
1692 | se.Cause = cc.fr.errDetail | |
1693 | } | |
1694 | rl.endStreamError(cs, se) | |
1695 | } | |
1696 | continue | |
1697 | } else if err != nil { | |
1698 | return err | |
1699 | } | |
1700 | if VerboseLogs { | |
1701 | cc.vlogf("http2: Transport received %s", summarizeFrame(f)) | |
1702 | } | |
1703 | if !gotSettings { | |
1704 | if _, ok := f.(*SettingsFrame); !ok { | |
1705 | cc.logf("protocol error: received %T before a SETTINGS frame", f) | |
1706 | return ConnectionError(ErrCodeProtocol) | |
1707 | } | |
1708 | gotSettings = true | |
1709 | } | |
1710 | maybeIdle := false // whether frame might transition us to idle | |
1711 | ||
1712 | switch f := f.(type) { | |
1713 | case *MetaHeadersFrame: | |
1714 | err = rl.processHeaders(f) | |
1715 | maybeIdle = true | |
1716 | gotReply = true | |
1717 | case *DataFrame: | |
1718 | err = rl.processData(f) | |
1719 | maybeIdle = true | |
1720 | case *GoAwayFrame: | |
1721 | err = rl.processGoAway(f) | |
1722 | maybeIdle = true | |
1723 | case *RSTStreamFrame: | |
1724 | err = rl.processResetStream(f) | |
1725 | maybeIdle = true | |
1726 | case *SettingsFrame: | |
1727 | err = rl.processSettings(f) | |
1728 | case *PushPromiseFrame: | |
1729 | err = rl.processPushPromise(f) | |
1730 | case *WindowUpdateFrame: | |
1731 | err = rl.processWindowUpdate(f) | |
1732 | case *PingFrame: | |
1733 | err = rl.processPing(f) | |
1734 | default: | |
1735 | cc.logf("Transport: unhandled response frame type %T", f) | |
1736 | } | |
1737 | if err != nil { | |
1738 | if VerboseLogs { | |
1739 | cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err) | |
1740 | } | |
1741 | return err | |
1742 | } | |
107c1cdb | 1743 | if rl.closeWhenIdle && gotReply && maybeIdle { |
15c0b25d AP |
1744 | cc.closeIfIdle() |
1745 | } | |
1746 | } | |
1747 | } | |
1748 | ||
1749 | func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error { | |
1750 | cc := rl.cc | |
107c1cdb | 1751 | cs := cc.streamByID(f.StreamID, false) |
15c0b25d AP |
1752 | if cs == nil { |
1753 | // We'd get here if we canceled a request while the | |
1754 | // server had its response still in flight. So if this | |
1755 | // was just something we canceled, ignore it. | |
1756 | return nil | |
1757 | } | |
107c1cdb ND |
1758 | if f.StreamEnded() { |
1759 | // Issue 20521: If the stream has ended, streamByID() causes | |
1760 | // clientStream.done to be closed, which causes the request's bodyWriter | |
1761 | // to be closed with an errStreamClosed, which may be received by | |
1762 | // clientConn.RoundTrip before the result of processing these headers. | |
1763 | // Deferring stream closure allows the header processing to occur first. | |
1764 | // clientConn.RoundTrip may still receive the bodyWriter error first, but | |
1765 | // the fix for issue 16102 prioritises any response. | |
1766 | // | |
1767 | // Issue 22413: If there is no request body, we should close the | |
1768 | // stream before writing to cs.resc so that the stream is closed | |
1769 | // immediately once RoundTrip returns. | |
1770 | if cs.req.Body != nil { | |
1771 | defer cc.forgetStreamID(f.StreamID) | |
1772 | } else { | |
1773 | cc.forgetStreamID(f.StreamID) | |
1774 | } | |
1775 | } | |
15c0b25d AP |
1776 | if !cs.firstByte { |
1777 | if cs.trace != nil { | |
1778 | // TODO(bradfitz): move first response byte earlier, | |
1779 | // when we first read the 9 byte header, not waiting | |
1780 | // until all the HEADERS+CONTINUATION frames have been | |
1781 | // merged. This works for now. | |
1782 | traceFirstResponseByte(cs.trace) | |
1783 | } | |
1784 | cs.firstByte = true | |
1785 | } | |
1786 | if !cs.pastHeaders { | |
1787 | cs.pastHeaders = true | |
1788 | } else { | |
1789 | return rl.processTrailers(cs, f) | |
1790 | } | |
1791 | ||
1792 | res, err := rl.handleResponse(cs, f) | |
1793 | if err != nil { | |
1794 | if _, ok := err.(ConnectionError); ok { | |
1795 | return err | |
1796 | } | |
1797 | // Any other error type is a stream error. | |
1798 | cs.cc.writeStreamReset(f.StreamID, ErrCodeProtocol, err) | |
107c1cdb | 1799 | cc.forgetStreamID(cs.ID) |
15c0b25d AP |
1800 | cs.resc <- resAndError{err: err} |
1801 | return nil // return nil from process* funcs to keep conn alive | |
1802 | } | |
1803 | if res == nil { | |
1804 | // (nil, nil) special case. See handleResponse docs. | |
1805 | return nil | |
1806 | } | |
15c0b25d AP |
1807 | cs.resTrailer = &res.Trailer |
1808 | cs.resc <- resAndError{res: res} | |
1809 | return nil | |
1810 | } | |
1811 | ||
1812 | // may return error types nil, or ConnectionError. Any other error value | |
1813 | // is a StreamError of type ErrCodeProtocol. The returned error in that case | |
1814 | // is the detail. | |
1815 | // | |
1816 | // As a special case, handleResponse may return (nil, nil) to skip the | |
107c1cdb | 1817 | // frame (currently only used for 1xx responses). |
15c0b25d AP |
1818 | func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*http.Response, error) { |
1819 | if f.Truncated { | |
1820 | return nil, errResponseHeaderListSize | |
1821 | } | |
1822 | ||
1823 | status := f.PseudoValue("status") | |
1824 | if status == "" { | |
107c1cdb | 1825 | return nil, errors.New("malformed response from server: missing status pseudo header") |
15c0b25d AP |
1826 | } |
1827 | statusCode, err := strconv.Atoi(status) | |
1828 | if err != nil { | |
107c1cdb | 1829 | return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header") |
15c0b25d AP |
1830 | } |
1831 | ||
1832 | header := make(http.Header) | |
1833 | res := &http.Response{ | |
1834 | Proto: "HTTP/2.0", | |
1835 | ProtoMajor: 2, | |
1836 | Header: header, | |
1837 | StatusCode: statusCode, | |
1838 | Status: status + " " + http.StatusText(statusCode), | |
1839 | } | |
1840 | for _, hf := range f.RegularFields() { | |
1841 | key := http.CanonicalHeaderKey(hf.Name) | |
1842 | if key == "Trailer" { | |
1843 | t := res.Trailer | |
1844 | if t == nil { | |
1845 | t = make(http.Header) | |
1846 | res.Trailer = t | |
1847 | } | |
1848 | foreachHeaderElement(hf.Value, func(v string) { | |
1849 | t[http.CanonicalHeaderKey(v)] = nil | |
1850 | }) | |
1851 | } else { | |
1852 | header[key] = append(header[key], hf.Value) | |
1853 | } | |
1854 | } | |
1855 | ||
107c1cdb ND |
1856 | if statusCode >= 100 && statusCode <= 199 { |
1857 | cs.num1xx++ | |
1858 | const max1xxResponses = 5 // arbitrary bound on number of informational responses, same as net/http | |
1859 | if cs.num1xx > max1xxResponses { | |
1860 | return nil, errors.New("http2: too many 1xx informational responses") | |
1861 | } | |
1862 | if fn := cs.get1xxTraceFunc(); fn != nil { | |
1863 | if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil { | |
1864 | return nil, err | |
1865 | } | |
1866 | } | |
1867 | if statusCode == 100 { | |
1868 | traceGot100Continue(cs.trace) | |
1869 | if cs.on100 != nil { | |
1870 | cs.on100() // forces any write delay timer to fire | |
1871 | } | |
1872 | } | |
1873 | cs.pastHeaders = false // do it all again | |
1874 | return nil, nil | |
1875 | } | |
1876 | ||
15c0b25d AP |
1877 | streamEnded := f.StreamEnded() |
1878 | isHead := cs.req.Method == "HEAD" | |
1879 | if !streamEnded || isHead { | |
1880 | res.ContentLength = -1 | |
1881 | if clens := res.Header["Content-Length"]; len(clens) == 1 { | |
1882 | if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil { | |
1883 | res.ContentLength = clen64 | |
1884 | } else { | |
1885 | // TODO: care? unlike http/1, it won't mess up our framing, so it's | |
1886 | // more safe smuggling-wise to ignore. | |
1887 | } | |
1888 | } else if len(clens) > 1 { | |
1889 | // TODO: care? unlike http/1, it won't mess up our framing, so it's | |
1890 | // more safe smuggling-wise to ignore. | |
1891 | } | |
1892 | } | |
1893 | ||
1894 | if streamEnded || isHead { | |
1895 | res.Body = noBody | |
1896 | return res, nil | |
1897 | } | |
1898 | ||
1899 | cs.bufPipe = pipe{b: &dataBuffer{expected: res.ContentLength}} | |
1900 | cs.bytesRemain = res.ContentLength | |
1901 | res.Body = transportResponseBody{cs} | |
1902 | go cs.awaitRequestCancel(cs.req) | |
1903 | ||
1904 | if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" { | |
1905 | res.Header.Del("Content-Encoding") | |
1906 | res.Header.Del("Content-Length") | |
1907 | res.ContentLength = -1 | |
1908 | res.Body = &gzipReader{body: res.Body} | |
107c1cdb | 1909 | res.Uncompressed = true |
15c0b25d AP |
1910 | } |
1911 | return res, nil | |
1912 | } | |
1913 | ||
1914 | func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error { | |
1915 | if cs.pastTrailers { | |
1916 | // Too many HEADERS frames for this stream. | |
1917 | return ConnectionError(ErrCodeProtocol) | |
1918 | } | |
1919 | cs.pastTrailers = true | |
1920 | if !f.StreamEnded() { | |
1921 | // We expect that any headers for trailers also | |
1922 | // has END_STREAM. | |
1923 | return ConnectionError(ErrCodeProtocol) | |
1924 | } | |
1925 | if len(f.PseudoFields()) > 0 { | |
1926 | // No pseudo header fields are defined for trailers. | |
1927 | // TODO: ConnectionError might be overly harsh? Check. | |
1928 | return ConnectionError(ErrCodeProtocol) | |
1929 | } | |
1930 | ||
1931 | trailer := make(http.Header) | |
1932 | for _, hf := range f.RegularFields() { | |
1933 | key := http.CanonicalHeaderKey(hf.Name) | |
1934 | trailer[key] = append(trailer[key], hf.Value) | |
1935 | } | |
1936 | cs.trailer = trailer | |
1937 | ||
1938 | rl.endStream(cs) | |
1939 | return nil | |
1940 | } | |
1941 | ||
1942 | // transportResponseBody is the concrete type of Transport.RoundTrip's | |
1943 | // Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body. | |
1944 | // On Close it sends RST_STREAM if EOF wasn't already seen. | |
1945 | type transportResponseBody struct { | |
1946 | cs *clientStream | |
1947 | } | |
1948 | ||
1949 | func (b transportResponseBody) Read(p []byte) (n int, err error) { | |
1950 | cs := b.cs | |
1951 | cc := cs.cc | |
1952 | ||
1953 | if cs.readErr != nil { | |
1954 | return 0, cs.readErr | |
1955 | } | |
1956 | n, err = b.cs.bufPipe.Read(p) | |
1957 | if cs.bytesRemain != -1 { | |
1958 | if int64(n) > cs.bytesRemain { | |
1959 | n = int(cs.bytesRemain) | |
1960 | if err == nil { | |
1961 | err = errors.New("net/http: server replied with more than declared Content-Length; truncated") | |
1962 | cc.writeStreamReset(cs.ID, ErrCodeProtocol, err) | |
1963 | } | |
1964 | cs.readErr = err | |
1965 | return int(cs.bytesRemain), err | |
1966 | } | |
1967 | cs.bytesRemain -= int64(n) | |
1968 | if err == io.EOF && cs.bytesRemain > 0 { | |
1969 | err = io.ErrUnexpectedEOF | |
1970 | cs.readErr = err | |
1971 | return n, err | |
1972 | } | |
1973 | } | |
1974 | if n == 0 { | |
1975 | // No flow control tokens to send back. | |
1976 | return | |
1977 | } | |
1978 | ||
1979 | cc.mu.Lock() | |
1980 | defer cc.mu.Unlock() | |
1981 | ||
1982 | var connAdd, streamAdd int32 | |
1983 | // Check the conn-level first, before the stream-level. | |
1984 | if v := cc.inflow.available(); v < transportDefaultConnFlow/2 { | |
1985 | connAdd = transportDefaultConnFlow - v | |
1986 | cc.inflow.add(connAdd) | |
1987 | } | |
1988 | if err == nil { // No need to refresh if the stream is over or failed. | |
1989 | // Consider any buffered body data (read from the conn but not | |
1990 | // consumed by the client) when computing flow control for this | |
1991 | // stream. | |
1992 | v := int(cs.inflow.available()) + cs.bufPipe.Len() | |
1993 | if v < transportDefaultStreamFlow-transportDefaultStreamMinRefresh { | |
1994 | streamAdd = int32(transportDefaultStreamFlow - v) | |
1995 | cs.inflow.add(streamAdd) | |
1996 | } | |
1997 | } | |
1998 | if connAdd != 0 || streamAdd != 0 { | |
1999 | cc.wmu.Lock() | |
2000 | defer cc.wmu.Unlock() | |
2001 | if connAdd != 0 { | |
2002 | cc.fr.WriteWindowUpdate(0, mustUint31(connAdd)) | |
2003 | } | |
2004 | if streamAdd != 0 { | |
2005 | cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd)) | |
2006 | } | |
2007 | cc.bw.Flush() | |
2008 | } | |
2009 | return | |
2010 | } | |
2011 | ||
2012 | var errClosedResponseBody = errors.New("http2: response body closed") | |
2013 | ||
2014 | func (b transportResponseBody) Close() error { | |
2015 | cs := b.cs | |
2016 | cc := cs.cc | |
2017 | ||
2018 | serverSentStreamEnd := cs.bufPipe.Err() == io.EOF | |
2019 | unread := cs.bufPipe.Len() | |
2020 | ||
2021 | if unread > 0 || !serverSentStreamEnd { | |
2022 | cc.mu.Lock() | |
2023 | cc.wmu.Lock() | |
2024 | if !serverSentStreamEnd { | |
2025 | cc.fr.WriteRSTStream(cs.ID, ErrCodeCancel) | |
2026 | cs.didReset = true | |
2027 | } | |
2028 | // Return connection-level flow control. | |
2029 | if unread > 0 { | |
2030 | cc.inflow.add(int32(unread)) | |
2031 | cc.fr.WriteWindowUpdate(0, uint32(unread)) | |
2032 | } | |
2033 | cc.bw.Flush() | |
2034 | cc.wmu.Unlock() | |
2035 | cc.mu.Unlock() | |
2036 | } | |
2037 | ||
2038 | cs.bufPipe.BreakWithError(errClosedResponseBody) | |
2039 | cc.forgetStreamID(cs.ID) | |
2040 | return nil | |
2041 | } | |
2042 | ||
2043 | func (rl *clientConnReadLoop) processData(f *DataFrame) error { | |
2044 | cc := rl.cc | |
2045 | cs := cc.streamByID(f.StreamID, f.StreamEnded()) | |
2046 | data := f.Data() | |
2047 | if cs == nil { | |
2048 | cc.mu.Lock() | |
2049 | neverSent := cc.nextStreamID | |
2050 | cc.mu.Unlock() | |
2051 | if f.StreamID >= neverSent { | |
2052 | // We never asked for this. | |
2053 | cc.logf("http2: Transport received unsolicited DATA frame; closing connection") | |
2054 | return ConnectionError(ErrCodeProtocol) | |
2055 | } | |
2056 | // We probably did ask for this, but canceled. Just ignore it. | |
2057 | // TODO: be stricter here? only silently ignore things which | |
2058 | // we canceled, but not things which were closed normally | |
2059 | // by the peer? Tough without accumulating too much state. | |
2060 | ||
2061 | // But at least return their flow control: | |
2062 | if f.Length > 0 { | |
2063 | cc.mu.Lock() | |
2064 | cc.inflow.add(int32(f.Length)) | |
2065 | cc.mu.Unlock() | |
2066 | ||
2067 | cc.wmu.Lock() | |
2068 | cc.fr.WriteWindowUpdate(0, uint32(f.Length)) | |
2069 | cc.bw.Flush() | |
2070 | cc.wmu.Unlock() | |
2071 | } | |
2072 | return nil | |
2073 | } | |
2074 | if !cs.firstByte { | |
2075 | cc.logf("protocol error: received DATA before a HEADERS frame") | |
2076 | rl.endStreamError(cs, StreamError{ | |
2077 | StreamID: f.StreamID, | |
2078 | Code: ErrCodeProtocol, | |
2079 | }) | |
2080 | return nil | |
2081 | } | |
2082 | if f.Length > 0 { | |
107c1cdb ND |
2083 | if cs.req.Method == "HEAD" && len(data) > 0 { |
2084 | cc.logf("protocol error: received DATA on a HEAD request") | |
2085 | rl.endStreamError(cs, StreamError{ | |
2086 | StreamID: f.StreamID, | |
2087 | Code: ErrCodeProtocol, | |
2088 | }) | |
2089 | return nil | |
2090 | } | |
15c0b25d AP |
2091 | // Check connection-level flow control. |
2092 | cc.mu.Lock() | |
2093 | if cs.inflow.available() >= int32(f.Length) { | |
2094 | cs.inflow.take(int32(f.Length)) | |
2095 | } else { | |
2096 | cc.mu.Unlock() | |
2097 | return ConnectionError(ErrCodeFlowControl) | |
2098 | } | |
2099 | // Return any padded flow control now, since we won't | |
2100 | // refund it later on body reads. | |
2101 | var refund int | |
2102 | if pad := int(f.Length) - len(data); pad > 0 { | |
2103 | refund += pad | |
2104 | } | |
2105 | // Return len(data) now if the stream is already closed, | |
2106 | // since data will never be read. | |
2107 | didReset := cs.didReset | |
2108 | if didReset { | |
2109 | refund += len(data) | |
2110 | } | |
2111 | if refund > 0 { | |
2112 | cc.inflow.add(int32(refund)) | |
2113 | cc.wmu.Lock() | |
2114 | cc.fr.WriteWindowUpdate(0, uint32(refund)) | |
2115 | if !didReset { | |
2116 | cs.inflow.add(int32(refund)) | |
2117 | cc.fr.WriteWindowUpdate(cs.ID, uint32(refund)) | |
2118 | } | |
2119 | cc.bw.Flush() | |
2120 | cc.wmu.Unlock() | |
2121 | } | |
2122 | cc.mu.Unlock() | |
2123 | ||
2124 | if len(data) > 0 && !didReset { | |
2125 | if _, err := cs.bufPipe.Write(data); err != nil { | |
2126 | rl.endStreamError(cs, err) | |
2127 | return err | |
2128 | } | |
2129 | } | |
2130 | } | |
2131 | ||
2132 | if f.StreamEnded() { | |
2133 | rl.endStream(cs) | |
2134 | } | |
2135 | return nil | |
2136 | } | |
2137 | ||
2138 | var errInvalidTrailers = errors.New("http2: invalid trailers") | |
2139 | ||
2140 | func (rl *clientConnReadLoop) endStream(cs *clientStream) { | |
2141 | // TODO: check that any declared content-length matches, like | |
2142 | // server.go's (*stream).endStream method. | |
2143 | rl.endStreamError(cs, nil) | |
2144 | } | |
2145 | ||
2146 | func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) { | |
2147 | var code func() | |
2148 | if err == nil { | |
2149 | err = io.EOF | |
2150 | code = cs.copyTrailers | |
2151 | } | |
15c0b25d AP |
2152 | if isConnectionCloseRequest(cs.req) { |
2153 | rl.closeWhenIdle = true | |
2154 | } | |
107c1cdb | 2155 | cs.bufPipe.closeWithErrorAndCode(err, code) |
15c0b25d AP |
2156 | |
2157 | select { | |
2158 | case cs.resc <- resAndError{err: err}: | |
2159 | default: | |
2160 | } | |
2161 | } | |
2162 | ||
2163 | func (cs *clientStream) copyTrailers() { | |
2164 | for k, vv := range cs.trailer { | |
2165 | t := cs.resTrailer | |
2166 | if *t == nil { | |
2167 | *t = make(http.Header) | |
2168 | } | |
2169 | (*t)[k] = vv | |
2170 | } | |
2171 | } | |
2172 | ||
2173 | func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error { | |
2174 | cc := rl.cc | |
2175 | cc.t.connPool().MarkDead(cc) | |
2176 | if f.ErrCode != 0 { | |
2177 | // TODO: deal with GOAWAY more. particularly the error code | |
2178 | cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode) | |
2179 | } | |
2180 | cc.setGoAway(f) | |
2181 | return nil | |
2182 | } | |
2183 | ||
2184 | func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error { | |
2185 | cc := rl.cc | |
2186 | cc.mu.Lock() | |
2187 | defer cc.mu.Unlock() | |
2188 | ||
2189 | if f.IsAck() { | |
2190 | if cc.wantSettingsAck { | |
2191 | cc.wantSettingsAck = false | |
2192 | return nil | |
2193 | } | |
2194 | return ConnectionError(ErrCodeProtocol) | |
2195 | } | |
2196 | ||
2197 | err := f.ForeachSetting(func(s Setting) error { | |
2198 | switch s.ID { | |
2199 | case SettingMaxFrameSize: | |
2200 | cc.maxFrameSize = s.Val | |
2201 | case SettingMaxConcurrentStreams: | |
2202 | cc.maxConcurrentStreams = s.Val | |
2203 | case SettingMaxHeaderListSize: | |
2204 | cc.peerMaxHeaderListSize = uint64(s.Val) | |
2205 | case SettingInitialWindowSize: | |
2206 | // Values above the maximum flow-control | |
2207 | // window size of 2^31-1 MUST be treated as a | |
2208 | // connection error (Section 5.4.1) of type | |
2209 | // FLOW_CONTROL_ERROR. | |
2210 | if s.Val > math.MaxInt32 { | |
2211 | return ConnectionError(ErrCodeFlowControl) | |
2212 | } | |
2213 | ||
2214 | // Adjust flow control of currently-open | |
2215 | // frames by the difference of the old initial | |
2216 | // window size and this one. | |
2217 | delta := int32(s.Val) - int32(cc.initialWindowSize) | |
2218 | for _, cs := range cc.streams { | |
2219 | cs.flow.add(delta) | |
2220 | } | |
2221 | cc.cond.Broadcast() | |
2222 | ||
2223 | cc.initialWindowSize = s.Val | |
2224 | default: | |
2225 | // TODO(bradfitz): handle more settings? SETTINGS_HEADER_TABLE_SIZE probably. | |
2226 | cc.vlogf("Unhandled Setting: %v", s) | |
2227 | } | |
2228 | return nil | |
2229 | }) | |
2230 | if err != nil { | |
2231 | return err | |
2232 | } | |
2233 | ||
2234 | cc.wmu.Lock() | |
2235 | defer cc.wmu.Unlock() | |
2236 | ||
2237 | cc.fr.WriteSettingsAck() | |
2238 | cc.bw.Flush() | |
2239 | return cc.werr | |
2240 | } | |
2241 | ||
2242 | func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error { | |
2243 | cc := rl.cc | |
2244 | cs := cc.streamByID(f.StreamID, false) | |
2245 | if f.StreamID != 0 && cs == nil { | |
2246 | return nil | |
2247 | } | |
2248 | ||
2249 | cc.mu.Lock() | |
2250 | defer cc.mu.Unlock() | |
2251 | ||
2252 | fl := &cc.flow | |
2253 | if cs != nil { | |
2254 | fl = &cs.flow | |
2255 | } | |
2256 | if !fl.add(int32(f.Increment)) { | |
2257 | return ConnectionError(ErrCodeFlowControl) | |
2258 | } | |
2259 | cc.cond.Broadcast() | |
2260 | return nil | |
2261 | } | |
2262 | ||
2263 | func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error { | |
2264 | cs := rl.cc.streamByID(f.StreamID, true) | |
2265 | if cs == nil { | |
2266 | // TODO: return error if server tries to RST_STEAM an idle stream | |
2267 | return nil | |
2268 | } | |
2269 | select { | |
2270 | case <-cs.peerReset: | |
2271 | // Already reset. | |
2272 | // This is the only goroutine | |
2273 | // which closes this, so there | |
2274 | // isn't a race. | |
2275 | default: | |
2276 | err := streamError(cs.ID, f.ErrCode) | |
2277 | cs.resetErr = err | |
2278 | close(cs.peerReset) | |
2279 | cs.bufPipe.CloseWithError(err) | |
2280 | cs.cc.cond.Broadcast() // wake up checkResetOrDone via clientStream.awaitFlowControl | |
2281 | } | |
15c0b25d AP |
2282 | return nil |
2283 | } | |
2284 | ||
2285 | // Ping sends a PING frame to the server and waits for the ack. | |
107c1cdb | 2286 | func (cc *ClientConn) Ping(ctx context.Context) error { |
15c0b25d AP |
2287 | c := make(chan struct{}) |
2288 | // Generate a random payload | |
2289 | var p [8]byte | |
2290 | for { | |
2291 | if _, err := rand.Read(p[:]); err != nil { | |
2292 | return err | |
2293 | } | |
2294 | cc.mu.Lock() | |
2295 | // check for dup before insert | |
2296 | if _, found := cc.pings[p]; !found { | |
2297 | cc.pings[p] = c | |
2298 | cc.mu.Unlock() | |
2299 | break | |
2300 | } | |
2301 | cc.mu.Unlock() | |
2302 | } | |
2303 | cc.wmu.Lock() | |
2304 | if err := cc.fr.WritePing(false, p); err != nil { | |
2305 | cc.wmu.Unlock() | |
2306 | return err | |
2307 | } | |
2308 | if err := cc.bw.Flush(); err != nil { | |
2309 | cc.wmu.Unlock() | |
2310 | return err | |
2311 | } | |
2312 | cc.wmu.Unlock() | |
2313 | select { | |
2314 | case <-c: | |
2315 | return nil | |
2316 | case <-ctx.Done(): | |
2317 | return ctx.Err() | |
2318 | case <-cc.readerDone: | |
2319 | // connection closed | |
2320 | return cc.readerErr | |
2321 | } | |
2322 | } | |
2323 | ||
2324 | func (rl *clientConnReadLoop) processPing(f *PingFrame) error { | |
2325 | if f.IsAck() { | |
2326 | cc := rl.cc | |
2327 | cc.mu.Lock() | |
2328 | defer cc.mu.Unlock() | |
2329 | // If ack, notify listener if any | |
2330 | if c, ok := cc.pings[f.Data]; ok { | |
2331 | close(c) | |
2332 | delete(cc.pings, f.Data) | |
2333 | } | |
2334 | return nil | |
2335 | } | |
2336 | cc := rl.cc | |
2337 | cc.wmu.Lock() | |
2338 | defer cc.wmu.Unlock() | |
2339 | if err := cc.fr.WritePing(true, f.Data); err != nil { | |
2340 | return err | |
2341 | } | |
2342 | return cc.bw.Flush() | |
2343 | } | |
2344 | ||
2345 | func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error { | |
2346 | // We told the peer we don't want them. | |
2347 | // Spec says: | |
2348 | // "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH | |
2349 | // setting of the peer endpoint is set to 0. An endpoint that | |
2350 | // has set this setting and has received acknowledgement MUST | |
2351 | // treat the receipt of a PUSH_PROMISE frame as a connection | |
2352 | // error (Section 5.4.1) of type PROTOCOL_ERROR." | |
2353 | return ConnectionError(ErrCodeProtocol) | |
2354 | } | |
2355 | ||
2356 | func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) { | |
2357 | // TODO: map err to more interesting error codes, once the | |
2358 | // HTTP community comes up with some. But currently for | |
2359 | // RST_STREAM there's no equivalent to GOAWAY frame's debug | |
2360 | // data, and the error codes are all pretty vague ("cancel"). | |
2361 | cc.wmu.Lock() | |
2362 | cc.fr.WriteRSTStream(streamID, code) | |
2363 | cc.bw.Flush() | |
2364 | cc.wmu.Unlock() | |
2365 | } | |
2366 | ||
// Sentinel errors for header-list size limits and trailer validation.
var (
	// errResponseHeaderListSize reports a response whose header list is
	// larger than the advertised limit.
	errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
	// errRequestHeaderListSize reports a request whose header list is
	// larger than the limit advertised by the peer.
	errRequestHeaderListSize = errors.New("http2: request header list larger than peer's advertised limit")
	// errPseudoTrailers reports a pseudo header found in trailers.
	errPseudoTrailers = errors.New("http2: invalid pseudo header in trailers")
)
2372 | ||
2373 | func (cc *ClientConn) logf(format string, args ...interface{}) { | |
2374 | cc.t.logf(format, args...) | |
2375 | } | |
2376 | ||
2377 | func (cc *ClientConn) vlogf(format string, args ...interface{}) { | |
2378 | cc.t.vlogf(format, args...) | |
2379 | } | |
2380 | ||
2381 | func (t *Transport) vlogf(format string, args ...interface{}) { | |
2382 | if VerboseLogs { | |
2383 | t.logf(format, args...) | |
2384 | } | |
2385 | } | |
2386 | ||
2387 | func (t *Transport) logf(format string, args ...interface{}) { | |
2388 | log.Printf(format, args...) | |
2389 | } | |
2390 | ||
// noBody is an empty io.ReadCloser: reads yield no data and Close is a
// no-op. It is used as the body of responses that carry no payload.
var noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
2392 | ||
// strSliceContains reports whether s is an element of ss.
func strSliceContains(ss []string, s string) bool {
	for i := 0; i < len(ss); i++ {
		if ss[i] == s {
			return true
		}
	}
	return false
}
2401 | ||
// erringRoundTripper is a RoundTripper that fails every request with a
// fixed error.
type erringRoundTripper struct {
	err error // returned by every RoundTrip call
}

// RoundTrip ignores the request and returns a nil response with rt.err.
func (rt erringRoundTripper) RoundTrip(*http.Request) (*http.Response, error) {
	return nil, rt.err
}
2405 | ||
// gzipReader wraps a response body so it can lazily
// call gzip.NewReader on the first call to Read.
type gzipReader struct {
	body io.ReadCloser // underlying Response.Body
	zr   *gzip.Reader  // lazily-initialized gzip reader
	zerr error         // sticky error
}

// Read decompresses data from the underlying body into p. The gzip
// reader is created on first use; if creating it fails, the error is
// remembered and returned by this and every later call.
func (gz *gzipReader) Read(p []byte) (n int, err error) {
	switch {
	case gz.zerr != nil:
		return 0, gz.zerr
	case gz.zr == nil:
		zr, initErr := gzip.NewReader(gz.body)
		if initErr != nil {
			gz.zerr = initErr
			return 0, initErr
		}
		gz.zr = zr
	}
	return gz.zr.Read(p)
}

// Close closes the underlying body and returns its error.
func (gz *gzipReader) Close() error {
	return gz.body.Close()
}
2431 | ||
// errorReader is a reader whose Read always fails with a fixed error.
type errorReader struct {
	err error // returned by every Read call
}

// Read reads nothing and returns r.err.
func (r errorReader) Read(p []byte) (int, error) {
	return 0, r.err
}
2435 | ||
2436 | // bodyWriterState encapsulates various state around the Transport's writing | |
2437 | // of the request body, particularly regarding doing delayed writes of the body | |
2438 | // when the request contains "Expect: 100-continue". | |
2439 | type bodyWriterState struct { | |
2440 | cs *clientStream | |
2441 | timer *time.Timer // if non-nil, we're doing a delayed write | |
2442 | fnonce *sync.Once // to call fn with | |
2443 | fn func() // the code to run in the goroutine, writing the body | |
2444 | resc chan error // result of fn's execution | |
2445 | delay time.Duration // how long we should delay a delayed write for | |
2446 | } | |
2447 | ||
2448 | func (t *Transport) getBodyWriterState(cs *clientStream, body io.Reader) (s bodyWriterState) { | |
2449 | s.cs = cs | |
2450 | if body == nil { | |
2451 | return | |
2452 | } | |
2453 | resc := make(chan error, 1) | |
2454 | s.resc = resc | |
2455 | s.fn = func() { | |
2456 | cs.cc.mu.Lock() | |
2457 | cs.startedWrite = true | |
2458 | cs.cc.mu.Unlock() | |
2459 | resc <- cs.writeRequestBody(body, cs.req.Body) | |
2460 | } | |
2461 | s.delay = t.expectContinueTimeout() | |
2462 | if s.delay == 0 || | |
107c1cdb | 2463 | !httpguts.HeaderValuesContainsToken( |
15c0b25d AP |
2464 | cs.req.Header["Expect"], |
2465 | "100-continue") { | |
2466 | return | |
2467 | } | |
2468 | s.fnonce = new(sync.Once) | |
2469 | ||
2470 | // Arm the timer with a very large duration, which we'll | |
2471 | // intentionally lower later. It has to be large now because | |
2472 | // we need a handle to it before writing the headers, but the | |
2473 | // s.delay value is defined to not start until after the | |
2474 | // request headers were written. | |
2475 | const hugeDuration = 365 * 24 * time.Hour | |
2476 | s.timer = time.AfterFunc(hugeDuration, func() { | |
2477 | s.fnonce.Do(s.fn) | |
2478 | }) | |
2479 | return | |
2480 | } | |
2481 | ||
2482 | func (s bodyWriterState) cancel() { | |
2483 | if s.timer != nil { | |
2484 | s.timer.Stop() | |
2485 | } | |
2486 | } | |
2487 | ||
2488 | func (s bodyWriterState) on100() { | |
2489 | if s.timer == nil { | |
2490 | // If we didn't do a delayed write, ignore the server's | |
2491 | // bogus 100 continue response. | |
2492 | return | |
2493 | } | |
2494 | s.timer.Stop() | |
2495 | go func() { s.fnonce.Do(s.fn) }() | |
2496 | } | |
2497 | ||
2498 | // scheduleBodyWrite starts writing the body, either immediately (in | |
2499 | // the common case) or after the delay timeout. It should not be | |
2500 | // called until after the headers have been written. | |
2501 | func (s bodyWriterState) scheduleBodyWrite() { | |
2502 | if s.timer == nil { | |
2503 | // We're not doing a delayed write (see | |
2504 | // getBodyWriterState), so just start the writing | |
2505 | // goroutine immediately. | |
2506 | go s.fn() | |
2507 | return | |
2508 | } | |
2509 | traceWait100Continue(s.cs.trace) | |
2510 | if s.timer.Stop() { | |
2511 | s.timer.Reset(s.delay) | |
2512 | } | |
2513 | } | |
2514 | ||
2515 | // isConnectionCloseRequest reports whether req should use its own | |
2516 | // connection for a single request and then close the connection. | |
2517 | func isConnectionCloseRequest(req *http.Request) bool { | |
107c1cdb ND |
2518 | return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close") |
2519 | } | |
2520 | ||
2521 | // registerHTTPSProtocol calls Transport.RegisterProtocol but | |
2522 | // converting panics into errors. | |
2523 | func registerHTTPSProtocol(t *http.Transport, rt noDialH2RoundTripper) (err error) { | |
2524 | defer func() { | |
2525 | if e := recover(); e != nil { | |
2526 | err = fmt.Errorf("%v", e) | |
2527 | } | |
2528 | }() | |
2529 | t.RegisterProtocol("https", rt) | |
2530 | return nil | |
2531 | } | |
2532 | ||
2533 | // noDialH2RoundTripper is a RoundTripper which only tries to complete the request | |
2534 | // if there's already has a cached connection to the host. | |
2535 | // (The field is exported so it can be accessed via reflect from net/http; tested | |
2536 | // by TestNoDialH2RoundTripperType) | |
2537 | type noDialH2RoundTripper struct{ *Transport } | |
2538 | ||
2539 | func (rt noDialH2RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { | |
2540 | res, err := rt.Transport.RoundTrip(req) | |
2541 | if isNoCachedConnError(err) { | |
2542 | return nil, http.ErrSkipAltProtocol | |
2543 | } | |
2544 | return res, err | |
2545 | } | |
2546 | ||
2547 | func (t *Transport) idleConnTimeout() time.Duration { | |
2548 | if t.t1 != nil { | |
2549 | return t.t1.IdleConnTimeout | |
2550 | } | |
2551 | return 0 | |
2552 | } | |
2553 | ||
// traceGetConn calls the GetConn hook of the httptrace.ClientTrace attached
// to req's context, passing hostPort. It is a no-op when the context has no
// trace or the trace has no GetConn hook.
func traceGetConn(req *http.Request, hostPort string) {
	if ct := httptrace.ContextClientTrace(req.Context()); ct != nil && ct.GetConn != nil {
		ct.GetConn(hostPort)
	}
}
2561 | ||
2562 | func traceGotConn(req *http.Request, cc *ClientConn) { | |
2563 | trace := httptrace.ContextClientTrace(req.Context()) | |
2564 | if trace == nil || trace.GotConn == nil { | |
2565 | return | |
2566 | } | |
2567 | ci := httptrace.GotConnInfo{Conn: cc.tconn} | |
2568 | cc.mu.Lock() | |
2569 | ci.Reused = cc.nextStreamID > 1 | |
2570 | ci.WasIdle = len(cc.streams) == 0 && ci.Reused | |
2571 | if ci.WasIdle && !cc.lastActive.IsZero() { | |
2572 | ci.IdleTime = time.Now().Sub(cc.lastActive) | |
2573 | } | |
2574 | cc.mu.Unlock() | |
2575 | ||
2576 | trace.GotConn(ci) | |
2577 | } | |
2578 | ||
// traceWroteHeaders calls trace.WroteHeaders when both trace and the hook
// are non-nil; otherwise it does nothing.
func traceWroteHeaders(trace *httptrace.ClientTrace) {
	if trace == nil || trace.WroteHeaders == nil {
		return
	}
	trace.WroteHeaders()
}
2584 | ||
// traceGot100Continue calls trace.Got100Continue when both trace and the
// hook are non-nil; otherwise it does nothing.
func traceGot100Continue(trace *httptrace.ClientTrace) {
	if trace == nil || trace.Got100Continue == nil {
		return
	}
	trace.Got100Continue()
}
2590 | ||
// traceWait100Continue calls trace.Wait100Continue when both trace and the
// hook are non-nil; otherwise it does nothing.
func traceWait100Continue(trace *httptrace.ClientTrace) {
	if trace == nil || trace.Wait100Continue == nil {
		return
	}
	trace.Wait100Continue()
}
2596 | ||
// traceWroteRequest calls trace.WroteRequest with a WroteRequestInfo whose
// Err field is err. It does nothing when trace or the hook is nil.
func traceWroteRequest(trace *httptrace.ClientTrace, err error) {
	if trace == nil || trace.WroteRequest == nil {
		return
	}
	info := httptrace.WroteRequestInfo{Err: err}
	trace.WroteRequest(info)
}
2602 | ||
// traceFirstResponseByte calls trace.GotFirstResponseByte when both trace
// and the hook are non-nil; otherwise it does nothing.
func traceFirstResponseByte(trace *httptrace.ClientTrace) {
	if trace == nil || trace.GotFirstResponseByte == nil {
		return
	}
	trace.GotFirstResponseByte()
}