]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | // Copyright 2014 The Go Authors. All rights reserved. |
2 | // Use of this source code is governed by a BSD-style | |
3 | // license that can be found in the LICENSE file. | |
4 | ||
5 | // TODO: turn off the serve goroutine when idle, so | |
6 | // an idle conn only has the readFrames goroutine active. (which could | |
7 | // also be optimized probably to pin less memory in crypto/tls). This | |
8 | // would involve tracking when the serve goroutine is active (atomic | |
9 | // int32 read/CAS probably?) and starting it up when frames arrive, | |
10 | // and shutting it down when all handlers exit. the occasional PING | |
11 | // packets could use time.AfterFunc to call sc.wakeStartServeLoop() | |
12 | // (which is a no-op if already running) and then queue the PING write | |
13 | // as normal. The serve loop would then exit in most cases (if no | |
14 | // Handlers running) and not be woken up again until the PING packet | |
15 | // returns. | |
16 | ||
17 | // TODO (maybe): add a mechanism for Handlers to going into | |
18 | // half-closed-local mode (rw.(io.Closer) test?) but not exit their | |
19 | // handler, and continue to be able to read from the | |
20 | // Request.Body. This would be a somewhat semantic change from HTTP/1 | |
21 | // (or at least what we expose in net/http), so I'd probably want to | |
22 | // add it there too. For now, this package says that returning from | |
23 | // the Handler ServeHTTP function means you're both done reading and | |
24 | // done writing, without a way to stop just one or the other. | |
25 | ||
26 | package http2 | |
27 | ||
28 | import ( | |
29 | "bufio" | |
30 | "bytes" | |
107c1cdb | 31 | "context" |
15c0b25d AP |
32 | "crypto/tls" |
33 | "errors" | |
34 | "fmt" | |
35 | "io" | |
36 | "log" | |
37 | "math" | |
38 | "net" | |
39 | "net/http" | |
40 | "net/textproto" | |
41 | "net/url" | |
42 | "os" | |
43 | "reflect" | |
44 | "runtime" | |
45 | "strconv" | |
46 | "strings" | |
47 | "sync" | |
48 | "time" | |
49 | ||
107c1cdb | 50 | "golang.org/x/net/http/httpguts" |
15c0b25d AP |
51 | "golang.org/x/net/http2/hpack" |
52 | ) | |
53 | ||
54 | const ( | |
55 | prefaceTimeout = 10 * time.Second | |
56 | firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway | |
57 | handlerChunkWriteSize = 4 << 10 | |
58 | defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to? | |
59 | ) | |
60 | ||
61 | var ( | |
62 | errClientDisconnected = errors.New("client disconnected") | |
63 | errClosedBody = errors.New("body closed by handler") | |
64 | errHandlerComplete = errors.New("http2: request body closed due to handler exiting") | |
65 | errStreamClosed = errors.New("http2: stream closed") | |
66 | ) | |
67 | ||
68 | var responseWriterStatePool = sync.Pool{ | |
69 | New: func() interface{} { | |
70 | rws := &responseWriterState{} | |
71 | rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize) | |
72 | return rws | |
73 | }, | |
74 | } | |
75 | ||
76 | // Test hooks. | |
77 | var ( | |
78 | testHookOnConn func() | |
79 | testHookGetServerConn func(*serverConn) | |
80 | testHookOnPanicMu *sync.Mutex // nil except in tests | |
81 | testHookOnPanic func(sc *serverConn, panicVal interface{}) (rePanic bool) | |
82 | ) | |
83 | ||
84 | // Server is an HTTP/2 server. | |
85 | type Server struct { | |
86 | // MaxHandlers limits the number of http.Handler ServeHTTP goroutines | |
87 | // which may run at a time over all connections. | |
88 | // Negative or zero no limit. | |
89 | // TODO: implement | |
90 | MaxHandlers int | |
91 | ||
92 | // MaxConcurrentStreams optionally specifies the number of | |
93 | // concurrent streams that each client may have open at a | |
94 | // time. This is unrelated to the number of http.Handler goroutines | |
95 | // which may be active globally, which is MaxHandlers. | |
96 | // If zero, MaxConcurrentStreams defaults to at least 100, per | |
97 | // the HTTP/2 spec's recommendations. | |
98 | MaxConcurrentStreams uint32 | |
99 | ||
100 | // MaxReadFrameSize optionally specifies the largest frame | |
101 | // this server is willing to read. A valid value is between | |
102 | // 16k and 16M, inclusive. If zero or otherwise invalid, a | |
103 | // default value is used. | |
104 | MaxReadFrameSize uint32 | |
105 | ||
106 | // PermitProhibitedCipherSuites, if true, permits the use of | |
107 | // cipher suites prohibited by the HTTP/2 spec. | |
108 | PermitProhibitedCipherSuites bool | |
109 | ||
110 | // IdleTimeout specifies how long until idle clients should be | |
111 | // closed with a GOAWAY frame. PING frames are not considered | |
112 | // activity for the purposes of IdleTimeout. | |
113 | IdleTimeout time.Duration | |
114 | ||
115 | // MaxUploadBufferPerConnection is the size of the initial flow | |
116 | // control window for each connections. The HTTP/2 spec does not | |
117 | // allow this to be smaller than 65535 or larger than 2^32-1. | |
118 | // If the value is outside this range, a default value will be | |
119 | // used instead. | |
120 | MaxUploadBufferPerConnection int32 | |
121 | ||
122 | // MaxUploadBufferPerStream is the size of the initial flow control | |
123 | // window for each stream. The HTTP/2 spec does not allow this to | |
124 | // be larger than 2^32-1. If the value is zero or larger than the | |
125 | // maximum, a default value will be used instead. | |
126 | MaxUploadBufferPerStream int32 | |
127 | ||
128 | // NewWriteScheduler constructs a write scheduler for a connection. | |
129 | // If nil, a default scheduler is chosen. | |
130 | NewWriteScheduler func() WriteScheduler | |
131 | ||
132 | // Internal state. This is a pointer (rather than embedded directly) | |
133 | // so that we don't embed a Mutex in this struct, which will make the | |
134 | // struct non-copyable, which might break some callers. | |
135 | state *serverInternalState | |
136 | } | |
137 | ||
138 | func (s *Server) initialConnRecvWindowSize() int32 { | |
139 | if s.MaxUploadBufferPerConnection > initialWindowSize { | |
140 | return s.MaxUploadBufferPerConnection | |
141 | } | |
142 | return 1 << 20 | |
143 | } | |
144 | ||
145 | func (s *Server) initialStreamRecvWindowSize() int32 { | |
146 | if s.MaxUploadBufferPerStream > 0 { | |
147 | return s.MaxUploadBufferPerStream | |
148 | } | |
149 | return 1 << 20 | |
150 | } | |
151 | ||
152 | func (s *Server) maxReadFrameSize() uint32 { | |
153 | if v := s.MaxReadFrameSize; v >= minMaxFrameSize && v <= maxFrameSize { | |
154 | return v | |
155 | } | |
156 | return defaultMaxReadFrameSize | |
157 | } | |
158 | ||
159 | func (s *Server) maxConcurrentStreams() uint32 { | |
160 | if v := s.MaxConcurrentStreams; v > 0 { | |
161 | return v | |
162 | } | |
163 | return defaultMaxStreams | |
164 | } | |
165 | ||
166 | type serverInternalState struct { | |
167 | mu sync.Mutex | |
168 | activeConns map[*serverConn]struct{} | |
169 | } | |
170 | ||
171 | func (s *serverInternalState) registerConn(sc *serverConn) { | |
172 | if s == nil { | |
173 | return // if the Server was used without calling ConfigureServer | |
174 | } | |
175 | s.mu.Lock() | |
176 | s.activeConns[sc] = struct{}{} | |
177 | s.mu.Unlock() | |
178 | } | |
179 | ||
180 | func (s *serverInternalState) unregisterConn(sc *serverConn) { | |
181 | if s == nil { | |
182 | return // if the Server was used without calling ConfigureServer | |
183 | } | |
184 | s.mu.Lock() | |
185 | delete(s.activeConns, sc) | |
186 | s.mu.Unlock() | |
187 | } | |
188 | ||
189 | func (s *serverInternalState) startGracefulShutdown() { | |
190 | if s == nil { | |
191 | return // if the Server was used without calling ConfigureServer | |
192 | } | |
193 | s.mu.Lock() | |
194 | for sc := range s.activeConns { | |
195 | sc.startGracefulShutdown() | |
196 | } | |
197 | s.mu.Unlock() | |
198 | } | |
199 | ||
200 | // ConfigureServer adds HTTP/2 support to a net/http Server. | |
201 | // | |
202 | // The configuration conf may be nil. | |
203 | // | |
204 | // ConfigureServer must be called before s begins serving. | |
205 | func ConfigureServer(s *http.Server, conf *Server) error { | |
206 | if s == nil { | |
207 | panic("nil *http.Server") | |
208 | } | |
209 | if conf == nil { | |
210 | conf = new(Server) | |
211 | } | |
212 | conf.state = &serverInternalState{activeConns: make(map[*serverConn]struct{})} | |
107c1cdb ND |
213 | if h1, h2 := s, conf; h2.IdleTimeout == 0 { |
214 | if h1.IdleTimeout != 0 { | |
215 | h2.IdleTimeout = h1.IdleTimeout | |
216 | } else { | |
217 | h2.IdleTimeout = h1.ReadTimeout | |
218 | } | |
15c0b25d | 219 | } |
107c1cdb | 220 | s.RegisterOnShutdown(conf.state.startGracefulShutdown) |
15c0b25d AP |
221 | |
222 | if s.TLSConfig == nil { | |
223 | s.TLSConfig = new(tls.Config) | |
224 | } else if s.TLSConfig.CipherSuites != nil { | |
225 | // If they already provided a CipherSuite list, return | |
226 | // an error if it has a bad order or is missing | |
107c1cdb | 227 | // ECDHE_RSA_WITH_AES_128_GCM_SHA256 or ECDHE_ECDSA_WITH_AES_128_GCM_SHA256. |
15c0b25d AP |
228 | haveRequired := false |
229 | sawBad := false | |
230 | for i, cs := range s.TLSConfig.CipherSuites { | |
107c1cdb ND |
231 | switch cs { |
232 | case tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, | |
233 | // Alternative MTI cipher to not discourage ECDSA-only servers. | |
234 | // See http://golang.org/cl/30721 for further information. | |
235 | tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256: | |
15c0b25d AP |
236 | haveRequired = true |
237 | } | |
238 | if isBadCipher(cs) { | |
239 | sawBad = true | |
240 | } else if sawBad { | |
241 | return fmt.Errorf("http2: TLSConfig.CipherSuites index %d contains an HTTP/2-approved cipher suite (%#04x), but it comes after unapproved cipher suites. With this configuration, clients that don't support previous, approved cipher suites may be given an unapproved one and reject the connection.", i, cs) | |
242 | } | |
243 | } | |
244 | if !haveRequired { | |
107c1cdb | 245 | return fmt.Errorf("http2: TLSConfig.CipherSuites is missing an HTTP/2-required AES_128_GCM_SHA256 cipher.") |
15c0b25d AP |
246 | } |
247 | } | |
248 | ||
249 | // Note: not setting MinVersion to tls.VersionTLS12, | |
250 | // as we don't want to interfere with HTTP/1.1 traffic | |
251 | // on the user's server. We enforce TLS 1.2 later once | |
252 | // we accept a connection. Ideally this should be done | |
253 | // during next-proto selection, but using TLS <1.2 with | |
254 | // HTTP/2 is still the client's bug. | |
255 | ||
256 | s.TLSConfig.PreferServerCipherSuites = true | |
257 | ||
258 | haveNPN := false | |
259 | for _, p := range s.TLSConfig.NextProtos { | |
260 | if p == NextProtoTLS { | |
261 | haveNPN = true | |
262 | break | |
263 | } | |
264 | } | |
265 | if !haveNPN { | |
266 | s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, NextProtoTLS) | |
267 | } | |
268 | ||
269 | if s.TLSNextProto == nil { | |
270 | s.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){} | |
271 | } | |
272 | protoHandler := func(hs *http.Server, c *tls.Conn, h http.Handler) { | |
273 | if testHookOnConn != nil { | |
274 | testHookOnConn() | |
275 | } | |
276 | conf.ServeConn(c, &ServeConnOpts{ | |
277 | Handler: h, | |
278 | BaseConfig: hs, | |
279 | }) | |
280 | } | |
281 | s.TLSNextProto[NextProtoTLS] = protoHandler | |
282 | return nil | |
283 | } | |
284 | ||
285 | // ServeConnOpts are options for the Server.ServeConn method. | |
286 | type ServeConnOpts struct { | |
287 | // BaseConfig optionally sets the base configuration | |
288 | // for values. If nil, defaults are used. | |
289 | BaseConfig *http.Server | |
290 | ||
291 | // Handler specifies which handler to use for processing | |
292 | // requests. If nil, BaseConfig.Handler is used. If BaseConfig | |
293 | // or BaseConfig.Handler is nil, http.DefaultServeMux is used. | |
294 | Handler http.Handler | |
295 | } | |
296 | ||
297 | func (o *ServeConnOpts) baseConfig() *http.Server { | |
298 | if o != nil && o.BaseConfig != nil { | |
299 | return o.BaseConfig | |
300 | } | |
301 | return new(http.Server) | |
302 | } | |
303 | ||
304 | func (o *ServeConnOpts) handler() http.Handler { | |
305 | if o != nil { | |
306 | if o.Handler != nil { | |
307 | return o.Handler | |
308 | } | |
309 | if o.BaseConfig != nil && o.BaseConfig.Handler != nil { | |
310 | return o.BaseConfig.Handler | |
311 | } | |
312 | } | |
313 | return http.DefaultServeMux | |
314 | } | |
315 | ||
316 | // ServeConn serves HTTP/2 requests on the provided connection and | |
317 | // blocks until the connection is no longer readable. | |
318 | // | |
319 | // ServeConn starts speaking HTTP/2 assuming that c has not had any | |
320 | // reads or writes. It writes its initial settings frame and expects | |
321 | // to be able to read the preface and settings frame from the | |
322 | // client. If c has a ConnectionState method like a *tls.Conn, the | |
323 | // ConnectionState is used to verify the TLS ciphersuite and to set | |
324 | // the Request.TLS field in Handlers. | |
325 | // | |
326 | // ServeConn does not support h2c by itself. Any h2c support must be | |
327 | // implemented in terms of providing a suitably-behaving net.Conn. | |
328 | // | |
329 | // The opts parameter is optional. If nil, default values are used. | |
330 | func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) { | |
331 | baseCtx, cancel := serverConnBaseContext(c, opts) | |
332 | defer cancel() | |
333 | ||
334 | sc := &serverConn{ | |
335 | srv: s, | |
336 | hs: opts.baseConfig(), | |
337 | conn: c, | |
338 | baseCtx: baseCtx, | |
339 | remoteAddrStr: c.RemoteAddr().String(), | |
340 | bw: newBufferedWriter(c), | |
341 | handler: opts.handler(), | |
342 | streams: make(map[uint32]*stream), | |
343 | readFrameCh: make(chan readFrameResult), | |
344 | wantWriteFrameCh: make(chan FrameWriteRequest, 8), | |
345 | serveMsgCh: make(chan interface{}, 8), | |
346 | wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync | |
347 | bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way | |
348 | doneServing: make(chan struct{}), | |
349 | clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value" | |
350 | advMaxStreams: s.maxConcurrentStreams(), | |
351 | initialStreamSendWindowSize: initialWindowSize, | |
352 | maxFrameSize: initialMaxFrameSize, | |
353 | headerTableSize: initialHeaderTableSize, | |
354 | serveG: newGoroutineLock(), | |
355 | pushEnabled: true, | |
356 | } | |
357 | ||
358 | s.state.registerConn(sc) | |
359 | defer s.state.unregisterConn(sc) | |
360 | ||
361 | // The net/http package sets the write deadline from the | |
362 | // http.Server.WriteTimeout during the TLS handshake, but then | |
363 | // passes the connection off to us with the deadline already set. | |
364 | // Write deadlines are set per stream in serverConn.newStream. | |
365 | // Disarm the net.Conn write deadline here. | |
366 | if sc.hs.WriteTimeout != 0 { | |
367 | sc.conn.SetWriteDeadline(time.Time{}) | |
368 | } | |
369 | ||
370 | if s.NewWriteScheduler != nil { | |
371 | sc.writeSched = s.NewWriteScheduler() | |
372 | } else { | |
373 | sc.writeSched = NewRandomWriteScheduler() | |
374 | } | |
375 | ||
376 | // These start at the RFC-specified defaults. If there is a higher | |
377 | // configured value for inflow, that will be updated when we send a | |
378 | // WINDOW_UPDATE shortly after sending SETTINGS. | |
379 | sc.flow.add(initialWindowSize) | |
380 | sc.inflow.add(initialWindowSize) | |
381 | sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf) | |
382 | ||
383 | fr := NewFramer(sc.bw, c) | |
384 | fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil) | |
385 | fr.MaxHeaderListSize = sc.maxHeaderListSize() | |
386 | fr.SetMaxReadFrameSize(s.maxReadFrameSize()) | |
387 | sc.framer = fr | |
388 | ||
389 | if tc, ok := c.(connectionStater); ok { | |
390 | sc.tlsState = new(tls.ConnectionState) | |
391 | *sc.tlsState = tc.ConnectionState() | |
392 | // 9.2 Use of TLS Features | |
393 | // An implementation of HTTP/2 over TLS MUST use TLS | |
394 | // 1.2 or higher with the restrictions on feature set | |
395 | // and cipher suite described in this section. Due to | |
396 | // implementation limitations, it might not be | |
397 | // possible to fail TLS negotiation. An endpoint MUST | |
398 | // immediately terminate an HTTP/2 connection that | |
399 | // does not meet the TLS requirements described in | |
400 | // this section with a connection error (Section | |
401 | // 5.4.1) of type INADEQUATE_SECURITY. | |
402 | if sc.tlsState.Version < tls.VersionTLS12 { | |
403 | sc.rejectConn(ErrCodeInadequateSecurity, "TLS version too low") | |
404 | return | |
405 | } | |
406 | ||
407 | if sc.tlsState.ServerName == "" { | |
408 | // Client must use SNI, but we don't enforce that anymore, | |
409 | // since it was causing problems when connecting to bare IP | |
410 | // addresses during development. | |
411 | // | |
412 | // TODO: optionally enforce? Or enforce at the time we receive | |
107c1cdb | 413 | // a new request, and verify the ServerName matches the :authority? |
15c0b25d AP |
414 | // But that precludes proxy situations, perhaps. |
415 | // | |
416 | // So for now, do nothing here again. | |
417 | } | |
418 | ||
419 | if !s.PermitProhibitedCipherSuites && isBadCipher(sc.tlsState.CipherSuite) { | |
420 | // "Endpoints MAY choose to generate a connection error | |
421 | // (Section 5.4.1) of type INADEQUATE_SECURITY if one of | |
422 | // the prohibited cipher suites are negotiated." | |
423 | // | |
424 | // We choose that. In my opinion, the spec is weak | |
425 | // here. It also says both parties must support at least | |
426 | // TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 so there's no | |
427 | // excuses here. If we really must, we could allow an | |
428 | // "AllowInsecureWeakCiphers" option on the server later. | |
429 | // Let's see how it plays out first. | |
430 | sc.rejectConn(ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", sc.tlsState.CipherSuite)) | |
431 | return | |
432 | } | |
433 | } | |
434 | ||
435 | if hook := testHookGetServerConn; hook != nil { | |
436 | hook(sc) | |
437 | } | |
438 | sc.serve() | |
439 | } | |
440 | ||
107c1cdb ND |
441 | func serverConnBaseContext(c net.Conn, opts *ServeConnOpts) (ctx context.Context, cancel func()) { |
442 | ctx, cancel = context.WithCancel(context.Background()) | |
443 | ctx = context.WithValue(ctx, http.LocalAddrContextKey, c.LocalAddr()) | |
444 | if hs := opts.baseConfig(); hs != nil { | |
445 | ctx = context.WithValue(ctx, http.ServerContextKey, hs) | |
446 | } | |
447 | return | |
448 | } | |
449 | ||
15c0b25d AP |
450 | func (sc *serverConn) rejectConn(err ErrCode, debug string) { |
451 | sc.vlogf("http2: server rejecting conn: %v, %s", err, debug) | |
452 | // ignoring errors. hanging up anyway. | |
453 | sc.framer.WriteGoAway(0, err, []byte(debug)) | |
454 | sc.bw.Flush() | |
455 | sc.conn.Close() | |
456 | } | |
457 | ||
458 | type serverConn struct { | |
459 | // Immutable: | |
460 | srv *Server | |
461 | hs *http.Server | |
462 | conn net.Conn | |
463 | bw *bufferedWriter // writing to conn | |
464 | handler http.Handler | |
107c1cdb | 465 | baseCtx context.Context |
15c0b25d AP |
466 | framer *Framer |
467 | doneServing chan struct{} // closed when serverConn.serve ends | |
468 | readFrameCh chan readFrameResult // written by serverConn.readFrames | |
469 | wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve | |
470 | wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes | |
471 | bodyReadCh chan bodyReadMsg // from handlers -> serve | |
472 | serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop | |
473 | flow flow // conn-wide (not stream-specific) outbound flow control | |
474 | inflow flow // conn-wide inbound flow control | |
475 | tlsState *tls.ConnectionState // shared by all handlers, like net/http | |
476 | remoteAddrStr string | |
477 | writeSched WriteScheduler | |
478 | ||
479 | // Everything following is owned by the serve loop; use serveG.check(): | |
480 | serveG goroutineLock // used to verify funcs are on serve() | |
481 | pushEnabled bool | |
482 | sawFirstSettings bool // got the initial SETTINGS frame after the preface | |
483 | needToSendSettingsAck bool | |
484 | unackedSettings int // how many SETTINGS have we sent without ACKs? | |
485 | clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit) | |
486 | advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client | |
487 | curClientStreams uint32 // number of open streams initiated by the client | |
488 | curPushedStreams uint32 // number of open streams initiated by server push | |
489 | maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests | |
490 | maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes | |
491 | streams map[uint32]*stream | |
492 | initialStreamSendWindowSize int32 | |
493 | maxFrameSize int32 | |
494 | headerTableSize uint32 | |
495 | peerMaxHeaderListSize uint32 // zero means unknown (default) | |
496 | canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case | |
497 | writingFrame bool // started writing a frame (on serve goroutine or separate) | |
498 | writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh | |
499 | needsFrameFlush bool // last frame write wasn't a flush | |
500 | inGoAway bool // we've started to or sent GOAWAY | |
501 | inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop | |
502 | needToSendGoAway bool // we need to schedule a GOAWAY frame write | |
503 | goAwayCode ErrCode | |
504 | shutdownTimer *time.Timer // nil until used | |
505 | idleTimer *time.Timer // nil if unused | |
506 | ||
507 | // Owned by the writeFrameAsync goroutine: | |
508 | headerWriteBuf bytes.Buffer | |
509 | hpackEncoder *hpack.Encoder | |
510 | ||
511 | // Used by startGracefulShutdown. | |
512 | shutdownOnce sync.Once | |
513 | } | |
514 | ||
515 | func (sc *serverConn) maxHeaderListSize() uint32 { | |
516 | n := sc.hs.MaxHeaderBytes | |
517 | if n <= 0 { | |
518 | n = http.DefaultMaxHeaderBytes | |
519 | } | |
520 | // http2's count is in a slightly different unit and includes 32 bytes per pair. | |
521 | // So, take the net/http.Server value and pad it up a bit, assuming 10 headers. | |
522 | const perFieldOverhead = 32 // per http2 spec | |
523 | const typicalHeaders = 10 // conservative | |
524 | return uint32(n + typicalHeaders*perFieldOverhead) | |
525 | } | |
526 | ||
527 | func (sc *serverConn) curOpenStreams() uint32 { | |
528 | sc.serveG.check() | |
529 | return sc.curClientStreams + sc.curPushedStreams | |
530 | } | |
531 | ||
532 | // stream represents a stream. This is the minimal metadata needed by | |
533 | // the serve goroutine. Most of the actual stream state is owned by | |
534 | // the http.Handler's goroutine in the responseWriter. Because the | |
535 | // responseWriter's responseWriterState is recycled at the end of a | |
536 | // handler, this struct intentionally has no pointer to the | |
537 | // *responseWriter{,State} itself, as the Handler ending nils out the | |
538 | // responseWriter's state field. | |
539 | type stream struct { | |
540 | // immutable: | |
541 | sc *serverConn | |
542 | id uint32 | |
543 | body *pipe // non-nil if expecting DATA frames | |
544 | cw closeWaiter // closed wait stream transitions to closed state | |
107c1cdb | 545 | ctx context.Context |
15c0b25d AP |
546 | cancelCtx func() |
547 | ||
548 | // owned by serverConn's serve loop: | |
549 | bodyBytes int64 // body bytes seen so far | |
550 | declBodyBytes int64 // or -1 if undeclared | |
551 | flow flow // limits writing from Handler to client | |
552 | inflow flow // what the client is allowed to POST/etc to us | |
553 | parent *stream // or nil | |
554 | numTrailerValues int64 | |
555 | weight uint8 | |
556 | state streamState | |
557 | resetQueued bool // RST_STREAM queued for write; set by sc.resetStream | |
558 | gotTrailerHeader bool // HEADER frame for trailers was seen | |
559 | wroteHeaders bool // whether we wrote headers (not status 100) | |
560 | writeDeadline *time.Timer // nil if unused | |
561 | ||
562 | trailer http.Header // accumulated trailers | |
563 | reqTrailer http.Header // handler's Request.Trailer | |
564 | } | |
565 | ||
566 | func (sc *serverConn) Framer() *Framer { return sc.framer } | |
567 | func (sc *serverConn) CloseConn() error { return sc.conn.Close() } | |
568 | func (sc *serverConn) Flush() error { return sc.bw.Flush() } | |
569 | func (sc *serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) { | |
570 | return sc.hpackEncoder, &sc.headerWriteBuf | |
571 | } | |
572 | ||
573 | func (sc *serverConn) state(streamID uint32) (streamState, *stream) { | |
574 | sc.serveG.check() | |
575 | // http://tools.ietf.org/html/rfc7540#section-5.1 | |
576 | if st, ok := sc.streams[streamID]; ok { | |
577 | return st.state, st | |
578 | } | |
579 | // "The first use of a new stream identifier implicitly closes all | |
580 | // streams in the "idle" state that might have been initiated by | |
581 | // that peer with a lower-valued stream identifier. For example, if | |
582 | // a client sends a HEADERS frame on stream 7 without ever sending a | |
583 | // frame on stream 5, then stream 5 transitions to the "closed" | |
584 | // state when the first frame for stream 7 is sent or received." | |
585 | if streamID%2 == 1 { | |
586 | if streamID <= sc.maxClientStreamID { | |
587 | return stateClosed, nil | |
588 | } | |
589 | } else { | |
590 | if streamID <= sc.maxPushPromiseID { | |
591 | return stateClosed, nil | |
592 | } | |
593 | } | |
594 | return stateIdle, nil | |
595 | } | |
596 | ||
597 | // setConnState calls the net/http ConnState hook for this connection, if configured. | |
598 | // Note that the net/http package does StateNew and StateClosed for us. | |
599 | // There is currently no plan for StateHijacked or hijacking HTTP/2 connections. | |
600 | func (sc *serverConn) setConnState(state http.ConnState) { | |
601 | if sc.hs.ConnState != nil { | |
602 | sc.hs.ConnState(sc.conn, state) | |
603 | } | |
604 | } | |
605 | ||
606 | func (sc *serverConn) vlogf(format string, args ...interface{}) { | |
607 | if VerboseLogs { | |
608 | sc.logf(format, args...) | |
609 | } | |
610 | } | |
611 | ||
612 | func (sc *serverConn) logf(format string, args ...interface{}) { | |
613 | if lg := sc.hs.ErrorLog; lg != nil { | |
614 | lg.Printf(format, args...) | |
615 | } else { | |
616 | log.Printf(format, args...) | |
617 | } | |
618 | } | |
619 | ||
620 | // errno returns v's underlying uintptr, else 0. | |
621 | // | |
622 | // TODO: remove this helper function once http2 can use build | |
623 | // tags. See comment in isClosedConnError. | |
624 | func errno(v error) uintptr { | |
625 | if rv := reflect.ValueOf(v); rv.Kind() == reflect.Uintptr { | |
626 | return uintptr(rv.Uint()) | |
627 | } | |
628 | return 0 | |
629 | } | |
630 | ||
631 | // isClosedConnError reports whether err is an error from use of a closed | |
632 | // network connection. | |
633 | func isClosedConnError(err error) bool { | |
634 | if err == nil { | |
635 | return false | |
636 | } | |
637 | ||
638 | // TODO: remove this string search and be more like the Windows | |
639 | // case below. That might involve modifying the standard library | |
640 | // to return better error types. | |
641 | str := err.Error() | |
642 | if strings.Contains(str, "use of closed network connection") { | |
643 | return true | |
644 | } | |
645 | ||
646 | // TODO(bradfitz): x/tools/cmd/bundle doesn't really support | |
647 | // build tags, so I can't make an http2_windows.go file with | |
648 | // Windows-specific stuff. Fix that and move this, once we | |
649 | // have a way to bundle this into std's net/http somehow. | |
650 | if runtime.GOOS == "windows" { | |
651 | if oe, ok := err.(*net.OpError); ok && oe.Op == "read" { | |
652 | if se, ok := oe.Err.(*os.SyscallError); ok && se.Syscall == "wsarecv" { | |
653 | const WSAECONNABORTED = 10053 | |
654 | const WSAECONNRESET = 10054 | |
655 | if n := errno(se.Err); n == WSAECONNRESET || n == WSAECONNABORTED { | |
656 | return true | |
657 | } | |
658 | } | |
659 | } | |
660 | } | |
661 | return false | |
662 | } | |
663 | ||
664 | func (sc *serverConn) condlogf(err error, format string, args ...interface{}) { | |
665 | if err == nil { | |
666 | return | |
667 | } | |
107c1cdb | 668 | if err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) || err == errPrefaceTimeout { |
15c0b25d AP |
669 | // Boring, expected errors. |
670 | sc.vlogf(format, args...) | |
671 | } else { | |
672 | sc.logf(format, args...) | |
673 | } | |
674 | } | |
675 | ||
676 | func (sc *serverConn) canonicalHeader(v string) string { | |
677 | sc.serveG.check() | |
107c1cdb | 678 | buildCommonHeaderMapsOnce() |
15c0b25d AP |
679 | cv, ok := commonCanonHeader[v] |
680 | if ok { | |
681 | return cv | |
682 | } | |
683 | cv, ok = sc.canonHeader[v] | |
684 | if ok { | |
685 | return cv | |
686 | } | |
687 | if sc.canonHeader == nil { | |
688 | sc.canonHeader = make(map[string]string) | |
689 | } | |
690 | cv = http.CanonicalHeaderKey(v) | |
691 | sc.canonHeader[v] = cv | |
692 | return cv | |
693 | } | |
694 | ||
695 | type readFrameResult struct { | |
696 | f Frame // valid until readMore is called | |
697 | err error | |
698 | ||
699 | // readMore should be called once the consumer no longer needs or | |
700 | // retains f. After readMore, f is invalid and more frames can be | |
701 | // read. | |
702 | readMore func() | |
703 | } | |
704 | ||
705 | // readFrames is the loop that reads incoming frames. | |
706 | // It takes care to only read one frame at a time, blocking until the | |
707 | // consumer is done with the frame. | |
708 | // It's run on its own goroutine. | |
709 | func (sc *serverConn) readFrames() { | |
710 | gate := make(gate) | |
711 | gateDone := gate.Done | |
712 | for { | |
713 | f, err := sc.framer.ReadFrame() | |
714 | select { | |
715 | case sc.readFrameCh <- readFrameResult{f, err, gateDone}: | |
716 | case <-sc.doneServing: | |
717 | return | |
718 | } | |
719 | select { | |
720 | case <-gate: | |
721 | case <-sc.doneServing: | |
722 | return | |
723 | } | |
724 | if terminalReadFrameError(err) { | |
725 | return | |
726 | } | |
727 | } | |
728 | } | |
729 | ||
730 | // frameWriteResult is the message passed from writeFrameAsync to the serve goroutine. | |
731 | type frameWriteResult struct { | |
732 | wr FrameWriteRequest // what was written (or attempted) | |
733 | err error // result of the writeFrame call | |
734 | } | |
735 | ||
736 | // writeFrameAsync runs in its own goroutine and writes a single frame | |
737 | // and then reports when it's done. | |
738 | // At most one goroutine can be running writeFrameAsync at a time per | |
739 | // serverConn. | |
740 | func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest) { | |
741 | err := wr.write.writeFrame(sc) | |
742 | sc.wroteFrameCh <- frameWriteResult{wr, err} | |
743 | } | |
744 | ||
745 | func (sc *serverConn) closeAllStreamsOnConnClose() { | |
746 | sc.serveG.check() | |
747 | for _, st := range sc.streams { | |
748 | sc.closeStream(st, errClientDisconnected) | |
749 | } | |
750 | } | |
751 | ||
752 | func (sc *serverConn) stopShutdownTimer() { | |
753 | sc.serveG.check() | |
754 | if t := sc.shutdownTimer; t != nil { | |
755 | t.Stop() | |
756 | } | |
757 | } | |
758 | ||
759 | func (sc *serverConn) notePanic() { | |
760 | // Note: this is for serverConn.serve panicking, not http.Handler code. | |
761 | if testHookOnPanicMu != nil { | |
762 | testHookOnPanicMu.Lock() | |
763 | defer testHookOnPanicMu.Unlock() | |
764 | } | |
765 | if testHookOnPanic != nil { | |
766 | if e := recover(); e != nil { | |
767 | if testHookOnPanic(sc, e) { | |
768 | panic(e) | |
769 | } | |
770 | } | |
771 | } | |
772 | } | |
773 | ||
774 | func (sc *serverConn) serve() { | |
775 | sc.serveG.check() | |
776 | defer sc.notePanic() | |
777 | defer sc.conn.Close() | |
778 | defer sc.closeAllStreamsOnConnClose() | |
779 | defer sc.stopShutdownTimer() | |
780 | defer close(sc.doneServing) // unblocks handlers trying to send | |
781 | ||
782 | if VerboseLogs { | |
783 | sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs) | |
784 | } | |
785 | ||
786 | sc.writeFrame(FrameWriteRequest{ | |
787 | write: writeSettings{ | |
788 | {SettingMaxFrameSize, sc.srv.maxReadFrameSize()}, | |
789 | {SettingMaxConcurrentStreams, sc.advMaxStreams}, | |
790 | {SettingMaxHeaderListSize, sc.maxHeaderListSize()}, | |
791 | {SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())}, | |
792 | }, | |
793 | }) | |
794 | sc.unackedSettings++ | |
795 | ||
796 | // Each connection starts with intialWindowSize inflow tokens. | |
797 | // If a higher value is configured, we add more tokens. | |
798 | if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 { | |
799 | sc.sendWindowUpdate(nil, int(diff)) | |
800 | } | |
801 | ||
802 | if err := sc.readPreface(); err != nil { | |
803 | sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err) | |
804 | return | |
805 | } | |
806 | // Now that we've got the preface, get us out of the | |
807 | // "StateNew" state. We can't go directly to idle, though. | |
808 | // Active means we read some data and anticipate a request. We'll | |
809 | // do another Active when we get a HEADERS frame. | |
810 | sc.setConnState(http.StateActive) | |
811 | sc.setConnState(http.StateIdle) | |
812 | ||
813 | if sc.srv.IdleTimeout != 0 { | |
814 | sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer) | |
815 | defer sc.idleTimer.Stop() | |
816 | } | |
817 | ||
818 | go sc.readFrames() // closed by defer sc.conn.Close above | |
819 | ||
820 | settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer) | |
821 | defer settingsTimer.Stop() | |
822 | ||
823 | loopNum := 0 | |
824 | for { | |
825 | loopNum++ | |
826 | select { | |
827 | case wr := <-sc.wantWriteFrameCh: | |
828 | if se, ok := wr.write.(StreamError); ok { | |
829 | sc.resetStream(se) | |
830 | break | |
831 | } | |
832 | sc.writeFrame(wr) | |
833 | case res := <-sc.wroteFrameCh: | |
834 | sc.wroteFrame(res) | |
835 | case res := <-sc.readFrameCh: | |
836 | if !sc.processFrameFromReader(res) { | |
837 | return | |
838 | } | |
839 | res.readMore() | |
840 | if settingsTimer != nil { | |
841 | settingsTimer.Stop() | |
842 | settingsTimer = nil | |
843 | } | |
844 | case m := <-sc.bodyReadCh: | |
845 | sc.noteBodyRead(m.st, m.n) | |
846 | case msg := <-sc.serveMsgCh: | |
847 | switch v := msg.(type) { | |
848 | case func(int): | |
849 | v(loopNum) // for testing | |
850 | case *serverMessage: | |
851 | switch v { | |
852 | case settingsTimerMsg: | |
853 | sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr()) | |
854 | return | |
855 | case idleTimerMsg: | |
856 | sc.vlogf("connection is idle") | |
857 | sc.goAway(ErrCodeNo) | |
858 | case shutdownTimerMsg: | |
859 | sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr()) | |
860 | return | |
861 | case gracefulShutdownMsg: | |
862 | sc.startGracefulShutdownInternal() | |
863 | default: | |
864 | panic("unknown timer") | |
865 | } | |
866 | case *startPushRequest: | |
867 | sc.startPush(v) | |
868 | default: | |
869 | panic(fmt.Sprintf("unexpected type %T", v)) | |
870 | } | |
871 | } | |
872 | ||
107c1cdb ND |
873 | // Start the shutdown timer after sending a GOAWAY. When sending GOAWAY |
874 | // with no error code (graceful shutdown), don't start the timer until | |
875 | // all open streams have been completed. | |
876 | sentGoAway := sc.inGoAway && !sc.needToSendGoAway && !sc.writingFrame | |
877 | gracefulShutdownComplete := sc.goAwayCode == ErrCodeNo && sc.curOpenStreams() == 0 | |
878 | if sentGoAway && sc.shutdownTimer == nil && (sc.goAwayCode != ErrCodeNo || gracefulShutdownComplete) { | |
879 | sc.shutDownIn(goAwayTimeout) | |
15c0b25d AP |
880 | } |
881 | } | |
882 | } | |
883 | ||
884 | func (sc *serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) { | |
885 | select { | |
886 | case <-sc.doneServing: | |
887 | case <-sharedCh: | |
888 | close(privateCh) | |
889 | } | |
890 | } | |
891 | ||
892 | type serverMessage int | |
893 | ||
894 | // Message values sent to serveMsgCh. | |
895 | var ( | |
896 | settingsTimerMsg = new(serverMessage) | |
897 | idleTimerMsg = new(serverMessage) | |
898 | shutdownTimerMsg = new(serverMessage) | |
899 | gracefulShutdownMsg = new(serverMessage) | |
900 | ) | |
901 | ||
902 | func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) } | |
903 | func (sc *serverConn) onIdleTimer() { sc.sendServeMsg(idleTimerMsg) } | |
904 | func (sc *serverConn) onShutdownTimer() { sc.sendServeMsg(shutdownTimerMsg) } | |
905 | ||
906 | func (sc *serverConn) sendServeMsg(msg interface{}) { | |
907 | sc.serveG.checkNotOn() // NOT | |
908 | select { | |
909 | case sc.serveMsgCh <- msg: | |
910 | case <-sc.doneServing: | |
911 | } | |
912 | } | |
913 | ||
107c1cdb ND |
914 | var errPrefaceTimeout = errors.New("timeout waiting for client preface") |
915 | ||
916 | // readPreface reads the ClientPreface greeting from the peer or | |
917 | // returns errPrefaceTimeout on timeout, or an error if the greeting | |
918 | // is invalid. | |
15c0b25d AP |
919 | func (sc *serverConn) readPreface() error { |
920 | errc := make(chan error, 1) | |
921 | go func() { | |
922 | // Read the client preface | |
923 | buf := make([]byte, len(ClientPreface)) | |
924 | if _, err := io.ReadFull(sc.conn, buf); err != nil { | |
925 | errc <- err | |
926 | } else if !bytes.Equal(buf, clientPreface) { | |
927 | errc <- fmt.Errorf("bogus greeting %q", buf) | |
928 | } else { | |
929 | errc <- nil | |
930 | } | |
931 | }() | |
932 | timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server? | |
933 | defer timer.Stop() | |
934 | select { | |
935 | case <-timer.C: | |
107c1cdb | 936 | return errPrefaceTimeout |
15c0b25d AP |
937 | case err := <-errc: |
938 | if err == nil { | |
939 | if VerboseLogs { | |
940 | sc.vlogf("http2: server: client %v said hello", sc.conn.RemoteAddr()) | |
941 | } | |
942 | } | |
943 | return err | |
944 | } | |
945 | } | |
946 | ||
947 | var errChanPool = sync.Pool{ | |
948 | New: func() interface{} { return make(chan error, 1) }, | |
949 | } | |
950 | ||
951 | var writeDataPool = sync.Pool{ | |
952 | New: func() interface{} { return new(writeData) }, | |
953 | } | |
954 | ||
955 | // writeDataFromHandler writes DATA response frames from a handler on | |
956 | // the given stream. | |
957 | func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStream bool) error { | |
958 | ch := errChanPool.Get().(chan error) | |
959 | writeArg := writeDataPool.Get().(*writeData) | |
960 | *writeArg = writeData{stream.id, data, endStream} | |
961 | err := sc.writeFrameFromHandler(FrameWriteRequest{ | |
962 | write: writeArg, | |
963 | stream: stream, | |
964 | done: ch, | |
965 | }) | |
966 | if err != nil { | |
967 | return err | |
968 | } | |
969 | var frameWriteDone bool // the frame write is done (successfully or not) | |
970 | select { | |
971 | case err = <-ch: | |
972 | frameWriteDone = true | |
973 | case <-sc.doneServing: | |
974 | return errClientDisconnected | |
975 | case <-stream.cw: | |
976 | // If both ch and stream.cw were ready (as might | |
977 | // happen on the final Write after an http.Handler | |
978 | // ends), prefer the write result. Otherwise this | |
979 | // might just be us successfully closing the stream. | |
980 | // The writeFrameAsync and serve goroutines guarantee | |
981 | // that the ch send will happen before the stream.cw | |
982 | // close. | |
983 | select { | |
984 | case err = <-ch: | |
985 | frameWriteDone = true | |
986 | default: | |
987 | return errStreamClosed | |
988 | } | |
989 | } | |
990 | errChanPool.Put(ch) | |
991 | if frameWriteDone { | |
992 | writeDataPool.Put(writeArg) | |
993 | } | |
994 | return err | |
995 | } | |
996 | ||
997 | // writeFrameFromHandler sends wr to sc.wantWriteFrameCh, but aborts | |
998 | // if the connection has gone away. | |
999 | // | |
1000 | // This must not be run from the serve goroutine itself, else it might | |
1001 | // deadlock writing to sc.wantWriteFrameCh (which is only mildly | |
1002 | // buffered and is read by serve itself). If you're on the serve | |
1003 | // goroutine, call writeFrame instead. | |
1004 | func (sc *serverConn) writeFrameFromHandler(wr FrameWriteRequest) error { | |
1005 | sc.serveG.checkNotOn() // NOT | |
1006 | select { | |
1007 | case sc.wantWriteFrameCh <- wr: | |
1008 | return nil | |
1009 | case <-sc.doneServing: | |
1010 | // Serve loop is gone. | |
1011 | // Client has closed their connection to the server. | |
1012 | return errClientDisconnected | |
1013 | } | |
1014 | } | |
1015 | ||
1016 | // writeFrame schedules a frame to write and sends it if there's nothing | |
1017 | // already being written. | |
1018 | // | |
1019 | // There is no pushback here (the serve goroutine never blocks). It's | |
1020 | // the http.Handlers that block, waiting for their previous frames to | |
1021 | // make it onto the wire | |
1022 | // | |
1023 | // If you're not on the serve goroutine, use writeFrameFromHandler instead. | |
1024 | func (sc *serverConn) writeFrame(wr FrameWriteRequest) { | |
1025 | sc.serveG.check() | |
1026 | ||
1027 | // If true, wr will not be written and wr.done will not be signaled. | |
1028 | var ignoreWrite bool | |
1029 | ||
1030 | // We are not allowed to write frames on closed streams. RFC 7540 Section | |
1031 | // 5.1.1 says: "An endpoint MUST NOT send frames other than PRIORITY on | |
1032 | // a closed stream." Our server never sends PRIORITY, so that exception | |
1033 | // does not apply. | |
1034 | // | |
1035 | // The serverConn might close an open stream while the stream's handler | |
1036 | // is still running. For example, the server might close a stream when it | |
1037 | // receives bad data from the client. If this happens, the handler might | |
1038 | // attempt to write a frame after the stream has been closed (since the | |
1039 | // handler hasn't yet been notified of the close). In this case, we simply | |
1040 | // ignore the frame. The handler will notice that the stream is closed when | |
1041 | // it waits for the frame to be written. | |
1042 | // | |
1043 | // As an exception to this rule, we allow sending RST_STREAM after close. | |
1044 | // This allows us to immediately reject new streams without tracking any | |
1045 | // state for those streams (except for the queued RST_STREAM frame). This | |
1046 | // may result in duplicate RST_STREAMs in some cases, but the client should | |
1047 | // ignore those. | |
1048 | if wr.StreamID() != 0 { | |
1049 | _, isReset := wr.write.(StreamError) | |
1050 | if state, _ := sc.state(wr.StreamID()); state == stateClosed && !isReset { | |
1051 | ignoreWrite = true | |
1052 | } | |
1053 | } | |
1054 | ||
1055 | // Don't send a 100-continue response if we've already sent headers. | |
1056 | // See golang.org/issue/14030. | |
1057 | switch wr.write.(type) { | |
1058 | case *writeResHeaders: | |
1059 | wr.stream.wroteHeaders = true | |
1060 | case write100ContinueHeadersFrame: | |
1061 | if wr.stream.wroteHeaders { | |
1062 | // We do not need to notify wr.done because this frame is | |
1063 | // never written with wr.done != nil. | |
1064 | if wr.done != nil { | |
1065 | panic("wr.done != nil for write100ContinueHeadersFrame") | |
1066 | } | |
1067 | ignoreWrite = true | |
1068 | } | |
1069 | } | |
1070 | ||
1071 | if !ignoreWrite { | |
1072 | sc.writeSched.Push(wr) | |
1073 | } | |
1074 | sc.scheduleFrameWrite() | |
1075 | } | |
1076 | ||
1077 | // startFrameWrite starts a goroutine to write wr (in a separate | |
1078 | // goroutine since that might block on the network), and updates the | |
1079 | // serve goroutine's state about the world, updated from info in wr. | |
1080 | func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) { | |
1081 | sc.serveG.check() | |
1082 | if sc.writingFrame { | |
1083 | panic("internal error: can only be writing one frame at a time") | |
1084 | } | |
1085 | ||
1086 | st := wr.stream | |
1087 | if st != nil { | |
1088 | switch st.state { | |
1089 | case stateHalfClosedLocal: | |
1090 | switch wr.write.(type) { | |
1091 | case StreamError, handlerPanicRST, writeWindowUpdate: | |
1092 | // RFC 7540 Section 5.1 allows sending RST_STREAM, PRIORITY, and WINDOW_UPDATE | |
1093 | // in this state. (We never send PRIORITY from the server, so that is not checked.) | |
1094 | default: | |
1095 | panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr)) | |
1096 | } | |
1097 | case stateClosed: | |
1098 | panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", wr)) | |
1099 | } | |
1100 | } | |
1101 | if wpp, ok := wr.write.(*writePushPromise); ok { | |
1102 | var err error | |
1103 | wpp.promisedID, err = wpp.allocatePromisedID() | |
1104 | if err != nil { | |
1105 | sc.writingFrameAsync = false | |
1106 | wr.replyToWriter(err) | |
1107 | return | |
1108 | } | |
1109 | } | |
1110 | ||
1111 | sc.writingFrame = true | |
1112 | sc.needsFrameFlush = true | |
1113 | if wr.write.staysWithinBuffer(sc.bw.Available()) { | |
1114 | sc.writingFrameAsync = false | |
1115 | err := wr.write.writeFrame(sc) | |
1116 | sc.wroteFrame(frameWriteResult{wr, err}) | |
1117 | } else { | |
1118 | sc.writingFrameAsync = true | |
1119 | go sc.writeFrameAsync(wr) | |
1120 | } | |
1121 | } | |
1122 | ||
1123 | // errHandlerPanicked is the error given to any callers blocked in a read from | |
1124 | // Request.Body when the main goroutine panics. Since most handlers read in the | |
107c1cdb | 1125 | // main ServeHTTP goroutine, this will show up rarely. |
15c0b25d AP |
1126 | var errHandlerPanicked = errors.New("http2: handler panicked") |
1127 | ||
1128 | // wroteFrame is called on the serve goroutine with the result of | |
1129 | // whatever happened on writeFrameAsync. | |
1130 | func (sc *serverConn) wroteFrame(res frameWriteResult) { | |
1131 | sc.serveG.check() | |
1132 | if !sc.writingFrame { | |
1133 | panic("internal error: expected to be already writing a frame") | |
1134 | } | |
1135 | sc.writingFrame = false | |
1136 | sc.writingFrameAsync = false | |
1137 | ||
1138 | wr := res.wr | |
1139 | ||
1140 | if writeEndsStream(wr.write) { | |
1141 | st := wr.stream | |
1142 | if st == nil { | |
1143 | panic("internal error: expecting non-nil stream") | |
1144 | } | |
1145 | switch st.state { | |
1146 | case stateOpen: | |
1147 | // Here we would go to stateHalfClosedLocal in | |
1148 | // theory, but since our handler is done and | |
1149 | // the net/http package provides no mechanism | |
1150 | // for closing a ResponseWriter while still | |
1151 | // reading data (see possible TODO at top of | |
1152 | // this file), we go into closed state here | |
1153 | // anyway, after telling the peer we're | |
1154 | // hanging up on them. We'll transition to | |
1155 | // stateClosed after the RST_STREAM frame is | |
1156 | // written. | |
1157 | st.state = stateHalfClosedLocal | |
1158 | // Section 8.1: a server MAY request that the client abort | |
1159 | // transmission of a request without error by sending a | |
1160 | // RST_STREAM with an error code of NO_ERROR after sending | |
1161 | // a complete response. | |
1162 | sc.resetStream(streamError(st.id, ErrCodeNo)) | |
1163 | case stateHalfClosedRemote: | |
1164 | sc.closeStream(st, errHandlerComplete) | |
1165 | } | |
1166 | } else { | |
1167 | switch v := wr.write.(type) { | |
1168 | case StreamError: | |
1169 | // st may be unknown if the RST_STREAM was generated to reject bad input. | |
1170 | if st, ok := sc.streams[v.StreamID]; ok { | |
1171 | sc.closeStream(st, v) | |
1172 | } | |
1173 | case handlerPanicRST: | |
1174 | sc.closeStream(wr.stream, errHandlerPanicked) | |
1175 | } | |
1176 | } | |
1177 | ||
1178 | // Reply (if requested) to unblock the ServeHTTP goroutine. | |
1179 | wr.replyToWriter(res.err) | |
1180 | ||
1181 | sc.scheduleFrameWrite() | |
1182 | } | |
1183 | ||
1184 | // scheduleFrameWrite tickles the frame writing scheduler. | |
1185 | // | |
1186 | // If a frame is already being written, nothing happens. This will be called again | |
1187 | // when the frame is done being written. | |
1188 | // | |
1189 | // If a frame isn't being written we need to send one, the best frame | |
1190 | // to send is selected, preferring first things that aren't | |
1191 | // stream-specific (e.g. ACKing settings), and then finding the | |
1192 | // highest priority stream. | |
1193 | // | |
1194 | // If a frame isn't being written and there's nothing else to send, we | |
1195 | // flush the write buffer. | |
1196 | func (sc *serverConn) scheduleFrameWrite() { | |
1197 | sc.serveG.check() | |
1198 | if sc.writingFrame || sc.inFrameScheduleLoop { | |
1199 | return | |
1200 | } | |
1201 | sc.inFrameScheduleLoop = true | |
1202 | for !sc.writingFrameAsync { | |
1203 | if sc.needToSendGoAway { | |
1204 | sc.needToSendGoAway = false | |
1205 | sc.startFrameWrite(FrameWriteRequest{ | |
1206 | write: &writeGoAway{ | |
1207 | maxStreamID: sc.maxClientStreamID, | |
1208 | code: sc.goAwayCode, | |
1209 | }, | |
1210 | }) | |
1211 | continue | |
1212 | } | |
1213 | if sc.needToSendSettingsAck { | |
1214 | sc.needToSendSettingsAck = false | |
1215 | sc.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}}) | |
1216 | continue | |
1217 | } | |
1218 | if !sc.inGoAway || sc.goAwayCode == ErrCodeNo { | |
1219 | if wr, ok := sc.writeSched.Pop(); ok { | |
1220 | sc.startFrameWrite(wr) | |
1221 | continue | |
1222 | } | |
1223 | } | |
1224 | if sc.needsFrameFlush { | |
1225 | sc.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}}) | |
1226 | sc.needsFrameFlush = false // after startFrameWrite, since it sets this true | |
1227 | continue | |
1228 | } | |
1229 | break | |
1230 | } | |
1231 | sc.inFrameScheduleLoop = false | |
1232 | } | |
1233 | ||
1234 | // startGracefulShutdown gracefully shuts down a connection. This | |
1235 | // sends GOAWAY with ErrCodeNo to tell the client we're gracefully | |
1236 | // shutting down. The connection isn't closed until all current | |
1237 | // streams are done. | |
1238 | // | |
1239 | // startGracefulShutdown returns immediately; it does not wait until | |
1240 | // the connection has shut down. | |
1241 | func (sc *serverConn) startGracefulShutdown() { | |
1242 | sc.serveG.checkNotOn() // NOT | |
1243 | sc.shutdownOnce.Do(func() { sc.sendServeMsg(gracefulShutdownMsg) }) | |
1244 | } | |
1245 | ||
107c1cdb ND |
1246 | // After sending GOAWAY, the connection will close after goAwayTimeout. |
1247 | // If we close the connection immediately after sending GOAWAY, there may | |
1248 | // be unsent data in our kernel receive buffer, which will cause the kernel | |
1249 | // to send a TCP RST on close() instead of a FIN. This RST will abort the | |
1250 | // connection immediately, whether or not the client had received the GOAWAY. | |
1251 | // | |
1252 | // Ideally we should delay for at least 1 RTT + epsilon so the client has | |
1253 | // a chance to read the GOAWAY and stop sending messages. Measuring RTT | |
1254 | // is hard, so we approximate with 1 second. See golang.org/issue/18701. | |
1255 | // | |
1256 | // This is a var so it can be shorter in tests, where all requests uses the | |
1257 | // loopback interface making the expected RTT very small. | |
1258 | // | |
1259 | // TODO: configurable? | |
1260 | var goAwayTimeout = 1 * time.Second | |
1261 | ||
15c0b25d | 1262 | func (sc *serverConn) startGracefulShutdownInternal() { |
107c1cdb | 1263 | sc.goAway(ErrCodeNo) |
15c0b25d AP |
1264 | } |
1265 | ||
1266 | func (sc *serverConn) goAway(code ErrCode) { | |
15c0b25d AP |
1267 | sc.serveG.check() |
1268 | if sc.inGoAway { | |
1269 | return | |
1270 | } | |
15c0b25d AP |
1271 | sc.inGoAway = true |
1272 | sc.needToSendGoAway = true | |
1273 | sc.goAwayCode = code | |
1274 | sc.scheduleFrameWrite() | |
1275 | } | |
1276 | ||
1277 | func (sc *serverConn) shutDownIn(d time.Duration) { | |
1278 | sc.serveG.check() | |
1279 | sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer) | |
1280 | } | |
1281 | ||
1282 | func (sc *serverConn) resetStream(se StreamError) { | |
1283 | sc.serveG.check() | |
1284 | sc.writeFrame(FrameWriteRequest{write: se}) | |
1285 | if st, ok := sc.streams[se.StreamID]; ok { | |
1286 | st.resetQueued = true | |
1287 | } | |
1288 | } | |
1289 | ||
1290 | // processFrameFromReader processes the serve loop's read from readFrameCh from the | |
1291 | // frame-reading goroutine. | |
1292 | // processFrameFromReader returns whether the connection should be kept open. | |
1293 | func (sc *serverConn) processFrameFromReader(res readFrameResult) bool { | |
1294 | sc.serveG.check() | |
1295 | err := res.err | |
1296 | if err != nil { | |
1297 | if err == ErrFrameTooLarge { | |
1298 | sc.goAway(ErrCodeFrameSize) | |
1299 | return true // goAway will close the loop | |
1300 | } | |
1301 | clientGone := err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) | |
1302 | if clientGone { | |
1303 | // TODO: could we also get into this state if | |
1304 | // the peer does a half close | |
1305 | // (e.g. CloseWrite) because they're done | |
1306 | // sending frames but they're still wanting | |
1307 | // our open replies? Investigate. | |
1308 | // TODO: add CloseWrite to crypto/tls.Conn first | |
1309 | // so we have a way to test this? I suppose | |
1310 | // just for testing we could have a non-TLS mode. | |
1311 | return false | |
1312 | } | |
1313 | } else { | |
1314 | f := res.f | |
1315 | if VerboseLogs { | |
1316 | sc.vlogf("http2: server read frame %v", summarizeFrame(f)) | |
1317 | } | |
1318 | err = sc.processFrame(f) | |
1319 | if err == nil { | |
1320 | return true | |
1321 | } | |
1322 | } | |
1323 | ||
1324 | switch ev := err.(type) { | |
1325 | case StreamError: | |
1326 | sc.resetStream(ev) | |
1327 | return true | |
1328 | case goAwayFlowError: | |
1329 | sc.goAway(ErrCodeFlowControl) | |
1330 | return true | |
1331 | case ConnectionError: | |
1332 | sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev) | |
1333 | sc.goAway(ErrCode(ev)) | |
1334 | return true // goAway will handle shutdown | |
1335 | default: | |
1336 | if res.err != nil { | |
1337 | sc.vlogf("http2: server closing client connection; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err) | |
1338 | } else { | |
1339 | sc.logf("http2: server closing client connection: %v", err) | |
1340 | } | |
1341 | return false | |
1342 | } | |
1343 | } | |
1344 | ||
1345 | func (sc *serverConn) processFrame(f Frame) error { | |
1346 | sc.serveG.check() | |
1347 | ||
1348 | // First frame received must be SETTINGS. | |
1349 | if !sc.sawFirstSettings { | |
1350 | if _, ok := f.(*SettingsFrame); !ok { | |
1351 | return ConnectionError(ErrCodeProtocol) | |
1352 | } | |
1353 | sc.sawFirstSettings = true | |
1354 | } | |
1355 | ||
1356 | switch f := f.(type) { | |
1357 | case *SettingsFrame: | |
1358 | return sc.processSettings(f) | |
1359 | case *MetaHeadersFrame: | |
1360 | return sc.processHeaders(f) | |
1361 | case *WindowUpdateFrame: | |
1362 | return sc.processWindowUpdate(f) | |
1363 | case *PingFrame: | |
1364 | return sc.processPing(f) | |
1365 | case *DataFrame: | |
1366 | return sc.processData(f) | |
1367 | case *RSTStreamFrame: | |
1368 | return sc.processResetStream(f) | |
1369 | case *PriorityFrame: | |
1370 | return sc.processPriority(f) | |
1371 | case *GoAwayFrame: | |
1372 | return sc.processGoAway(f) | |
1373 | case *PushPromiseFrame: | |
1374 | // A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE | |
1375 | // frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR. | |
1376 | return ConnectionError(ErrCodeProtocol) | |
1377 | default: | |
1378 | sc.vlogf("http2: server ignoring frame: %v", f.Header()) | |
1379 | return nil | |
1380 | } | |
1381 | } | |
1382 | ||
1383 | func (sc *serverConn) processPing(f *PingFrame) error { | |
1384 | sc.serveG.check() | |
1385 | if f.IsAck() { | |
1386 | // 6.7 PING: " An endpoint MUST NOT respond to PING frames | |
1387 | // containing this flag." | |
1388 | return nil | |
1389 | } | |
1390 | if f.StreamID != 0 { | |
1391 | // "PING frames are not associated with any individual | |
1392 | // stream. If a PING frame is received with a stream | |
1393 | // identifier field value other than 0x0, the recipient MUST | |
1394 | // respond with a connection error (Section 5.4.1) of type | |
1395 | // PROTOCOL_ERROR." | |
1396 | return ConnectionError(ErrCodeProtocol) | |
1397 | } | |
1398 | if sc.inGoAway && sc.goAwayCode != ErrCodeNo { | |
1399 | return nil | |
1400 | } | |
1401 | sc.writeFrame(FrameWriteRequest{write: writePingAck{f}}) | |
1402 | return nil | |
1403 | } | |
1404 | ||
1405 | func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error { | |
1406 | sc.serveG.check() | |
1407 | switch { | |
1408 | case f.StreamID != 0: // stream-level flow control | |
1409 | state, st := sc.state(f.StreamID) | |
1410 | if state == stateIdle { | |
1411 | // Section 5.1: "Receiving any frame other than HEADERS | |
1412 | // or PRIORITY on a stream in this state MUST be | |
1413 | // treated as a connection error (Section 5.4.1) of | |
1414 | // type PROTOCOL_ERROR." | |
1415 | return ConnectionError(ErrCodeProtocol) | |
1416 | } | |
1417 | if st == nil { | |
1418 | // "WINDOW_UPDATE can be sent by a peer that has sent a | |
1419 | // frame bearing the END_STREAM flag. This means that a | |
1420 | // receiver could receive a WINDOW_UPDATE frame on a "half | |
1421 | // closed (remote)" or "closed" stream. A receiver MUST | |
1422 | // NOT treat this as an error, see Section 5.1." | |
1423 | return nil | |
1424 | } | |
1425 | if !st.flow.add(int32(f.Increment)) { | |
1426 | return streamError(f.StreamID, ErrCodeFlowControl) | |
1427 | } | |
1428 | default: // connection-level flow control | |
1429 | if !sc.flow.add(int32(f.Increment)) { | |
1430 | return goAwayFlowError{} | |
1431 | } | |
1432 | } | |
1433 | sc.scheduleFrameWrite() | |
1434 | return nil | |
1435 | } | |
1436 | ||
1437 | func (sc *serverConn) processResetStream(f *RSTStreamFrame) error { | |
1438 | sc.serveG.check() | |
1439 | ||
1440 | state, st := sc.state(f.StreamID) | |
1441 | if state == stateIdle { | |
1442 | // 6.4 "RST_STREAM frames MUST NOT be sent for a | |
1443 | // stream in the "idle" state. If a RST_STREAM frame | |
1444 | // identifying an idle stream is received, the | |
1445 | // recipient MUST treat this as a connection error | |
1446 | // (Section 5.4.1) of type PROTOCOL_ERROR. | |
1447 | return ConnectionError(ErrCodeProtocol) | |
1448 | } | |
1449 | if st != nil { | |
1450 | st.cancelCtx() | |
1451 | sc.closeStream(st, streamError(f.StreamID, f.ErrCode)) | |
1452 | } | |
1453 | return nil | |
1454 | } | |
1455 | ||
1456 | func (sc *serverConn) closeStream(st *stream, err error) { | |
1457 | sc.serveG.check() | |
1458 | if st.state == stateIdle || st.state == stateClosed { | |
1459 | panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state)) | |
1460 | } | |
1461 | st.state = stateClosed | |
1462 | if st.writeDeadline != nil { | |
1463 | st.writeDeadline.Stop() | |
1464 | } | |
1465 | if st.isPushed() { | |
1466 | sc.curPushedStreams-- | |
1467 | } else { | |
1468 | sc.curClientStreams-- | |
1469 | } | |
1470 | delete(sc.streams, st.id) | |
1471 | if len(sc.streams) == 0 { | |
1472 | sc.setConnState(http.StateIdle) | |
1473 | if sc.srv.IdleTimeout != 0 { | |
1474 | sc.idleTimer.Reset(sc.srv.IdleTimeout) | |
1475 | } | |
1476 | if h1ServerKeepAlivesDisabled(sc.hs) { | |
1477 | sc.startGracefulShutdownInternal() | |
1478 | } | |
1479 | } | |
1480 | if p := st.body; p != nil { | |
1481 | // Return any buffered unread bytes worth of conn-level flow control. | |
1482 | // See golang.org/issue/16481 | |
1483 | sc.sendWindowUpdate(nil, p.Len()) | |
1484 | ||
1485 | p.CloseWithError(err) | |
1486 | } | |
1487 | st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc | |
1488 | sc.writeSched.CloseStream(st.id) | |
1489 | } | |
1490 | ||
1491 | func (sc *serverConn) processSettings(f *SettingsFrame) error { | |
1492 | sc.serveG.check() | |
1493 | if f.IsAck() { | |
1494 | sc.unackedSettings-- | |
1495 | if sc.unackedSettings < 0 { | |
1496 | // Why is the peer ACKing settings we never sent? | |
1497 | // The spec doesn't mention this case, but | |
1498 | // hang up on them anyway. | |
1499 | return ConnectionError(ErrCodeProtocol) | |
1500 | } | |
1501 | return nil | |
1502 | } | |
107c1cdb ND |
1503 | if f.NumSettings() > 100 || f.HasDuplicates() { |
1504 | // This isn't actually in the spec, but hang up on | |
1505 | // suspiciously large settings frames or those with | |
1506 | // duplicate entries. | |
1507 | return ConnectionError(ErrCodeProtocol) | |
1508 | } | |
15c0b25d AP |
1509 | if err := f.ForeachSetting(sc.processSetting); err != nil { |
1510 | return err | |
1511 | } | |
1512 | sc.needToSendSettingsAck = true | |
1513 | sc.scheduleFrameWrite() | |
1514 | return nil | |
1515 | } | |
1516 | ||
1517 | func (sc *serverConn) processSetting(s Setting) error { | |
1518 | sc.serveG.check() | |
1519 | if err := s.Valid(); err != nil { | |
1520 | return err | |
1521 | } | |
1522 | if VerboseLogs { | |
1523 | sc.vlogf("http2: server processing setting %v", s) | |
1524 | } | |
1525 | switch s.ID { | |
1526 | case SettingHeaderTableSize: | |
1527 | sc.headerTableSize = s.Val | |
1528 | sc.hpackEncoder.SetMaxDynamicTableSize(s.Val) | |
1529 | case SettingEnablePush: | |
1530 | sc.pushEnabled = s.Val != 0 | |
1531 | case SettingMaxConcurrentStreams: | |
1532 | sc.clientMaxStreams = s.Val | |
1533 | case SettingInitialWindowSize: | |
1534 | return sc.processSettingInitialWindowSize(s.Val) | |
1535 | case SettingMaxFrameSize: | |
1536 | sc.maxFrameSize = int32(s.Val) // the maximum valid s.Val is < 2^31 | |
1537 | case SettingMaxHeaderListSize: | |
1538 | sc.peerMaxHeaderListSize = s.Val | |
1539 | default: | |
1540 | // Unknown setting: "An endpoint that receives a SETTINGS | |
1541 | // frame with any unknown or unsupported identifier MUST | |
1542 | // ignore that setting." | |
1543 | if VerboseLogs { | |
1544 | sc.vlogf("http2: server ignoring unknown setting %v", s) | |
1545 | } | |
1546 | } | |
1547 | return nil | |
1548 | } | |
1549 | ||
1550 | func (sc *serverConn) processSettingInitialWindowSize(val uint32) error { | |
1551 | sc.serveG.check() | |
1552 | // Note: val already validated to be within range by | |
1553 | // processSetting's Valid call. | |
1554 | ||
1555 | // "A SETTINGS frame can alter the initial flow control window | |
1556 | // size for all current streams. When the value of | |
1557 | // SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST | |
1558 | // adjust the size of all stream flow control windows that it | |
1559 | // maintains by the difference between the new value and the | |
1560 | // old value." | |
1561 | old := sc.initialStreamSendWindowSize | |
1562 | sc.initialStreamSendWindowSize = int32(val) | |
1563 | growth := int32(val) - old // may be negative | |
1564 | for _, st := range sc.streams { | |
1565 | if !st.flow.add(growth) { | |
1566 | // 6.9.2 Initial Flow Control Window Size | |
1567 | // "An endpoint MUST treat a change to | |
1568 | // SETTINGS_INITIAL_WINDOW_SIZE that causes any flow | |
1569 | // control window to exceed the maximum size as a | |
1570 | // connection error (Section 5.4.1) of type | |
1571 | // FLOW_CONTROL_ERROR." | |
1572 | return ConnectionError(ErrCodeFlowControl) | |
1573 | } | |
1574 | } | |
1575 | return nil | |
1576 | } | |
1577 | ||
1578 | func (sc *serverConn) processData(f *DataFrame) error { | |
1579 | sc.serveG.check() | |
1580 | if sc.inGoAway && sc.goAwayCode != ErrCodeNo { | |
1581 | return nil | |
1582 | } | |
1583 | data := f.Data() | |
1584 | ||
1585 | // "If a DATA frame is received whose stream is not in "open" | |
1586 | // or "half closed (local)" state, the recipient MUST respond | |
1587 | // with a stream error (Section 5.4.2) of type STREAM_CLOSED." | |
1588 | id := f.Header().StreamID | |
1589 | state, st := sc.state(id) | |
1590 | if id == 0 || state == stateIdle { | |
1591 | // Section 5.1: "Receiving any frame other than HEADERS | |
1592 | // or PRIORITY on a stream in this state MUST be | |
1593 | // treated as a connection error (Section 5.4.1) of | |
1594 | // type PROTOCOL_ERROR." | |
1595 | return ConnectionError(ErrCodeProtocol) | |
1596 | } | |
1597 | if st == nil || state != stateOpen || st.gotTrailerHeader || st.resetQueued { | |
1598 | // This includes sending a RST_STREAM if the stream is | |
1599 | // in stateHalfClosedLocal (which currently means that | |
1600 | // the http.Handler returned, so it's done reading & | |
1601 | // done writing). Try to stop the client from sending | |
1602 | // more DATA. | |
1603 | ||
1604 | // But still enforce their connection-level flow control, | |
1605 | // and return any flow control bytes since we're not going | |
1606 | // to consume them. | |
1607 | if sc.inflow.available() < int32(f.Length) { | |
1608 | return streamError(id, ErrCodeFlowControl) | |
1609 | } | |
1610 | // Deduct the flow control from inflow, since we're | |
1611 | // going to immediately add it back in | |
1612 | // sendWindowUpdate, which also schedules sending the | |
1613 | // frames. | |
1614 | sc.inflow.take(int32(f.Length)) | |
1615 | sc.sendWindowUpdate(nil, int(f.Length)) // conn-level | |
1616 | ||
1617 | if st != nil && st.resetQueued { | |
1618 | // Already have a stream error in flight. Don't send another. | |
1619 | return nil | |
1620 | } | |
1621 | return streamError(id, ErrCodeStreamClosed) | |
1622 | } | |
1623 | if st.body == nil { | |
1624 | panic("internal error: should have a body in this state") | |
1625 | } | |
1626 | ||
1627 | // Sender sending more than they'd declared? | |
1628 | if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes { | |
1629 | st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes)) | |
107c1cdb ND |
1630 | // RFC 7540, sec 8.1.2.6: A request or response is also malformed if the |
1631 | // value of a content-length header field does not equal the sum of the | |
1632 | // DATA frame payload lengths that form the body. | |
1633 | return streamError(id, ErrCodeProtocol) | |
15c0b25d AP |
1634 | } |
1635 | if f.Length > 0 { | |
1636 | // Check whether the client has flow control quota. | |
1637 | if st.inflow.available() < int32(f.Length) { | |
1638 | return streamError(id, ErrCodeFlowControl) | |
1639 | } | |
1640 | st.inflow.take(int32(f.Length)) | |
1641 | ||
1642 | if len(data) > 0 { | |
1643 | wrote, err := st.body.Write(data) | |
1644 | if err != nil { | |
1645 | return streamError(id, ErrCodeStreamClosed) | |
1646 | } | |
1647 | if wrote != len(data) { | |
1648 | panic("internal error: bad Writer") | |
1649 | } | |
1650 | st.bodyBytes += int64(len(data)) | |
1651 | } | |
1652 | ||
1653 | // Return any padded flow control now, since we won't | |
1654 | // refund it later on body reads. | |
1655 | if pad := int32(f.Length) - int32(len(data)); pad > 0 { | |
1656 | sc.sendWindowUpdate32(nil, pad) | |
1657 | sc.sendWindowUpdate32(st, pad) | |
1658 | } | |
1659 | } | |
1660 | if f.StreamEnded() { | |
1661 | st.endStream() | |
1662 | } | |
1663 | return nil | |
1664 | } | |
1665 | ||
1666 | func (sc *serverConn) processGoAway(f *GoAwayFrame) error { | |
1667 | sc.serveG.check() | |
1668 | if f.ErrCode != ErrCodeNo { | |
1669 | sc.logf("http2: received GOAWAY %+v, starting graceful shutdown", f) | |
1670 | } else { | |
1671 | sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f) | |
1672 | } | |
1673 | sc.startGracefulShutdownInternal() | |
1674 | // http://tools.ietf.org/html/rfc7540#section-6.8 | |
1675 | // We should not create any new streams, which means we should disable push. | |
1676 | sc.pushEnabled = false | |
1677 | return nil | |
1678 | } | |
1679 | ||
1680 | // isPushed reports whether the stream is server-initiated. | |
1681 | func (st *stream) isPushed() bool { | |
1682 | return st.id%2 == 0 | |
1683 | } | |
1684 | ||
1685 | // endStream closes a Request.Body's pipe. It is called when a DATA | |
1686 | // frame says a request body is over (or after trailers). | |
1687 | func (st *stream) endStream() { | |
1688 | sc := st.sc | |
1689 | sc.serveG.check() | |
1690 | ||
1691 | if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes { | |
1692 | st.body.CloseWithError(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes", | |
1693 | st.declBodyBytes, st.bodyBytes)) | |
1694 | } else { | |
1695 | st.body.closeWithErrorAndCode(io.EOF, st.copyTrailersToHandlerRequest) | |
1696 | st.body.CloseWithError(io.EOF) | |
1697 | } | |
1698 | st.state = stateHalfClosedRemote | |
1699 | } | |
1700 | ||
1701 | // copyTrailersToHandlerRequest is run in the Handler's goroutine in | |
1702 | // its Request.Body.Read just before it gets io.EOF. | |
1703 | func (st *stream) copyTrailersToHandlerRequest() { | |
1704 | for k, vv := range st.trailer { | |
1705 | if _, ok := st.reqTrailer[k]; ok { | |
1706 | // Only copy it over it was pre-declared. | |
1707 | st.reqTrailer[k] = vv | |
1708 | } | |
1709 | } | |
1710 | } | |
1711 | ||
1712 | // onWriteTimeout is run on its own goroutine (from time.AfterFunc) | |
1713 | // when the stream's WriteTimeout has fired. | |
1714 | func (st *stream) onWriteTimeout() { | |
1715 | st.sc.writeFrameFromHandler(FrameWriteRequest{write: streamError(st.id, ErrCodeInternal)}) | |
1716 | } | |
1717 | ||
1718 | func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error { | |
1719 | sc.serveG.check() | |
1720 | id := f.StreamID | |
1721 | if sc.inGoAway { | |
1722 | // Ignore. | |
1723 | return nil | |
1724 | } | |
1725 | // http://tools.ietf.org/html/rfc7540#section-5.1.1 | |
1726 | // Streams initiated by a client MUST use odd-numbered stream | |
1727 | // identifiers. [...] An endpoint that receives an unexpected | |
1728 | // stream identifier MUST respond with a connection error | |
1729 | // (Section 5.4.1) of type PROTOCOL_ERROR. | |
1730 | if id%2 != 1 { | |
1731 | return ConnectionError(ErrCodeProtocol) | |
1732 | } | |
1733 | // A HEADERS frame can be used to create a new stream or | |
1734 | // send a trailer for an open one. If we already have a stream | |
1735 | // open, let it process its own HEADERS frame (trailers at this | |
1736 | // point, if it's valid). | |
1737 | if st := sc.streams[f.StreamID]; st != nil { | |
1738 | if st.resetQueued { | |
1739 | // We're sending RST_STREAM to close the stream, so don't bother | |
1740 | // processing this frame. | |
1741 | return nil | |
1742 | } | |
107c1cdb ND |
1743 | // RFC 7540, sec 5.1: If an endpoint receives additional frames, other than |
1744 | // WINDOW_UPDATE, PRIORITY, or RST_STREAM, for a stream that is in | |
1745 | // this state, it MUST respond with a stream error (Section 5.4.2) of | |
1746 | // type STREAM_CLOSED. | |
1747 | if st.state == stateHalfClosedRemote { | |
1748 | return streamError(id, ErrCodeStreamClosed) | |
1749 | } | |
15c0b25d AP |
1750 | return st.processTrailerHeaders(f) |
1751 | } | |
1752 | ||
1753 | // [...] The identifier of a newly established stream MUST be | |
1754 | // numerically greater than all streams that the initiating | |
1755 | // endpoint has opened or reserved. [...] An endpoint that | |
1756 | // receives an unexpected stream identifier MUST respond with | |
1757 | // a connection error (Section 5.4.1) of type PROTOCOL_ERROR. | |
1758 | if id <= sc.maxClientStreamID { | |
1759 | return ConnectionError(ErrCodeProtocol) | |
1760 | } | |
1761 | sc.maxClientStreamID = id | |
1762 | ||
1763 | if sc.idleTimer != nil { | |
1764 | sc.idleTimer.Stop() | |
1765 | } | |
1766 | ||
1767 | // http://tools.ietf.org/html/rfc7540#section-5.1.2 | |
1768 | // [...] Endpoints MUST NOT exceed the limit set by their peer. An | |
1769 | // endpoint that receives a HEADERS frame that causes their | |
1770 | // advertised concurrent stream limit to be exceeded MUST treat | |
1771 | // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR | |
1772 | // or REFUSED_STREAM. | |
1773 | if sc.curClientStreams+1 > sc.advMaxStreams { | |
1774 | if sc.unackedSettings == 0 { | |
1775 | // They should know better. | |
1776 | return streamError(id, ErrCodeProtocol) | |
1777 | } | |
1778 | // Assume it's a network race, where they just haven't | |
1779 | // received our last SETTINGS update. But actually | |
1780 | // this can't happen yet, because we don't yet provide | |
1781 | // a way for users to adjust server parameters at | |
1782 | // runtime. | |
1783 | return streamError(id, ErrCodeRefusedStream) | |
1784 | } | |
1785 | ||
1786 | initialState := stateOpen | |
1787 | if f.StreamEnded() { | |
1788 | initialState = stateHalfClosedRemote | |
1789 | } | |
1790 | st := sc.newStream(id, 0, initialState) | |
1791 | ||
1792 | if f.HasPriority() { | |
1793 | if err := checkPriority(f.StreamID, f.Priority); err != nil { | |
1794 | return err | |
1795 | } | |
1796 | sc.writeSched.AdjustStream(st.id, f.Priority) | |
1797 | } | |
1798 | ||
1799 | rw, req, err := sc.newWriterAndRequest(st, f) | |
1800 | if err != nil { | |
1801 | return err | |
1802 | } | |
1803 | st.reqTrailer = req.Trailer | |
1804 | if st.reqTrailer != nil { | |
1805 | st.trailer = make(http.Header) | |
1806 | } | |
1807 | st.body = req.Body.(*requestBody).pipe // may be nil | |
1808 | st.declBodyBytes = req.ContentLength | |
1809 | ||
1810 | handler := sc.handler.ServeHTTP | |
1811 | if f.Truncated { | |
1812 | // Their header list was too long. Send a 431 error. | |
1813 | handler = handleHeaderListTooLong | |
1814 | } else if err := checkValidHTTP2RequestHeaders(req.Header); err != nil { | |
1815 | handler = new400Handler(err) | |
1816 | } | |
1817 | ||
1818 | // The net/http package sets the read deadline from the | |
1819 | // http.Server.ReadTimeout during the TLS handshake, but then | |
1820 | // passes the connection off to us with the deadline already | |
1821 | // set. Disarm it here after the request headers are read, | |
1822 | // similar to how the http1 server works. Here it's | |
1823 | // technically more like the http1 Server's ReadHeaderTimeout | |
1824 | // (in Go 1.8), though. That's a more sane option anyway. | |
1825 | if sc.hs.ReadTimeout != 0 { | |
1826 | sc.conn.SetReadDeadline(time.Time{}) | |
1827 | } | |
1828 | ||
1829 | go sc.runHandler(rw, req, handler) | |
1830 | return nil | |
1831 | } | |
1832 | ||
1833 | func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error { | |
1834 | sc := st.sc | |
1835 | sc.serveG.check() | |
1836 | if st.gotTrailerHeader { | |
1837 | return ConnectionError(ErrCodeProtocol) | |
1838 | } | |
1839 | st.gotTrailerHeader = true | |
1840 | if !f.StreamEnded() { | |
1841 | return streamError(st.id, ErrCodeProtocol) | |
1842 | } | |
1843 | ||
1844 | if len(f.PseudoFields()) > 0 { | |
1845 | return streamError(st.id, ErrCodeProtocol) | |
1846 | } | |
1847 | if st.trailer != nil { | |
1848 | for _, hf := range f.RegularFields() { | |
1849 | key := sc.canonicalHeader(hf.Name) | |
107c1cdb | 1850 | if !httpguts.ValidTrailerHeader(key) { |
15c0b25d AP |
1851 | // TODO: send more details to the peer somehow. But http2 has |
1852 | // no way to send debug data at a stream level. Discuss with | |
1853 | // HTTP folk. | |
1854 | return streamError(st.id, ErrCodeProtocol) | |
1855 | } | |
1856 | st.trailer[key] = append(st.trailer[key], hf.Value) | |
1857 | } | |
1858 | } | |
1859 | st.endStream() | |
1860 | return nil | |
1861 | } | |
1862 | ||
1863 | func checkPriority(streamID uint32, p PriorityParam) error { | |
1864 | if streamID == p.StreamDep { | |
1865 | // Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat | |
1866 | // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR." | |
1867 | // Section 5.3.3 says that a stream can depend on one of its dependencies, | |
1868 | // so it's only self-dependencies that are forbidden. | |
1869 | return streamError(streamID, ErrCodeProtocol) | |
1870 | } | |
1871 | return nil | |
1872 | } | |
1873 | ||
1874 | func (sc *serverConn) processPriority(f *PriorityFrame) error { | |
1875 | if sc.inGoAway { | |
1876 | return nil | |
1877 | } | |
1878 | if err := checkPriority(f.StreamID, f.PriorityParam); err != nil { | |
1879 | return err | |
1880 | } | |
1881 | sc.writeSched.AdjustStream(f.StreamID, f.PriorityParam) | |
1882 | return nil | |
1883 | } | |
1884 | ||
1885 | func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream { | |
1886 | sc.serveG.check() | |
1887 | if id == 0 { | |
1888 | panic("internal error: cannot create stream with id 0") | |
1889 | } | |
1890 | ||
107c1cdb | 1891 | ctx, cancelCtx := context.WithCancel(sc.baseCtx) |
15c0b25d AP |
1892 | st := &stream{ |
1893 | sc: sc, | |
1894 | id: id, | |
1895 | state: state, | |
1896 | ctx: ctx, | |
1897 | cancelCtx: cancelCtx, | |
1898 | } | |
1899 | st.cw.Init() | |
1900 | st.flow.conn = &sc.flow // link to conn-level counter | |
1901 | st.flow.add(sc.initialStreamSendWindowSize) | |
1902 | st.inflow.conn = &sc.inflow // link to conn-level counter | |
1903 | st.inflow.add(sc.srv.initialStreamRecvWindowSize()) | |
1904 | if sc.hs.WriteTimeout != 0 { | |
1905 | st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout) | |
1906 | } | |
1907 | ||
1908 | sc.streams[id] = st | |
1909 | sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID}) | |
1910 | if st.isPushed() { | |
1911 | sc.curPushedStreams++ | |
1912 | } else { | |
1913 | sc.curClientStreams++ | |
1914 | } | |
1915 | if sc.curOpenStreams() == 1 { | |
1916 | sc.setConnState(http.StateActive) | |
1917 | } | |
1918 | ||
1919 | return st | |
1920 | } | |
1921 | ||
1922 | func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*responseWriter, *http.Request, error) { | |
1923 | sc.serveG.check() | |
1924 | ||
1925 | rp := requestParam{ | |
1926 | method: f.PseudoValue("method"), | |
1927 | scheme: f.PseudoValue("scheme"), | |
1928 | authority: f.PseudoValue("authority"), | |
1929 | path: f.PseudoValue("path"), | |
1930 | } | |
1931 | ||
1932 | isConnect := rp.method == "CONNECT" | |
1933 | if isConnect { | |
1934 | if rp.path != "" || rp.scheme != "" || rp.authority == "" { | |
1935 | return nil, nil, streamError(f.StreamID, ErrCodeProtocol) | |
1936 | } | |
1937 | } else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") { | |
1938 | // See 8.1.2.6 Malformed Requests and Responses: | |
1939 | // | |
1940 | // Malformed requests or responses that are detected | |
1941 | // MUST be treated as a stream error (Section 5.4.2) | |
1942 | // of type PROTOCOL_ERROR." | |
1943 | // | |
1944 | // 8.1.2.3 Request Pseudo-Header Fields | |
1945 | // "All HTTP/2 requests MUST include exactly one valid | |
1946 | // value for the :method, :scheme, and :path | |
1947 | // pseudo-header fields" | |
1948 | return nil, nil, streamError(f.StreamID, ErrCodeProtocol) | |
1949 | } | |
1950 | ||
1951 | bodyOpen := !f.StreamEnded() | |
1952 | if rp.method == "HEAD" && bodyOpen { | |
1953 | // HEAD requests can't have bodies | |
1954 | return nil, nil, streamError(f.StreamID, ErrCodeProtocol) | |
1955 | } | |
1956 | ||
1957 | rp.header = make(http.Header) | |
1958 | for _, hf := range f.RegularFields() { | |
1959 | rp.header.Add(sc.canonicalHeader(hf.Name), hf.Value) | |
1960 | } | |
1961 | if rp.authority == "" { | |
1962 | rp.authority = rp.header.Get("Host") | |
1963 | } | |
1964 | ||
1965 | rw, req, err := sc.newWriterAndRequestNoBody(st, rp) | |
1966 | if err != nil { | |
1967 | return nil, nil, err | |
1968 | } | |
1969 | if bodyOpen { | |
1970 | if vv, ok := rp.header["Content-Length"]; ok { | |
1971 | req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64) | |
1972 | } else { | |
1973 | req.ContentLength = -1 | |
1974 | } | |
1975 | req.Body.(*requestBody).pipe = &pipe{ | |
1976 | b: &dataBuffer{expected: req.ContentLength}, | |
1977 | } | |
1978 | } | |
1979 | return rw, req, nil | |
1980 | } | |
1981 | ||
1982 | type requestParam struct { | |
1983 | method string | |
1984 | scheme, authority, path string | |
1985 | header http.Header | |
1986 | } | |
1987 | ||
1988 | func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*responseWriter, *http.Request, error) { | |
1989 | sc.serveG.check() | |
1990 | ||
1991 | var tlsState *tls.ConnectionState // nil if not scheme https | |
1992 | if rp.scheme == "https" { | |
1993 | tlsState = sc.tlsState | |
1994 | } | |
1995 | ||
1996 | needsContinue := rp.header.Get("Expect") == "100-continue" | |
1997 | if needsContinue { | |
1998 | rp.header.Del("Expect") | |
1999 | } | |
2000 | // Merge Cookie headers into one "; "-delimited value. | |
2001 | if cookies := rp.header["Cookie"]; len(cookies) > 1 { | |
2002 | rp.header.Set("Cookie", strings.Join(cookies, "; ")) | |
2003 | } | |
2004 | ||
2005 | // Setup Trailers | |
2006 | var trailer http.Header | |
2007 | for _, v := range rp.header["Trailer"] { | |
2008 | for _, key := range strings.Split(v, ",") { | |
2009 | key = http.CanonicalHeaderKey(strings.TrimSpace(key)) | |
2010 | switch key { | |
2011 | case "Transfer-Encoding", "Trailer", "Content-Length": | |
2012 | // Bogus. (copy of http1 rules) | |
2013 | // Ignore. | |
2014 | default: | |
2015 | if trailer == nil { | |
2016 | trailer = make(http.Header) | |
2017 | } | |
2018 | trailer[key] = nil | |
2019 | } | |
2020 | } | |
2021 | } | |
2022 | delete(rp.header, "Trailer") | |
2023 | ||
2024 | var url_ *url.URL | |
2025 | var requestURI string | |
2026 | if rp.method == "CONNECT" { | |
2027 | url_ = &url.URL{Host: rp.authority} | |
2028 | requestURI = rp.authority // mimic HTTP/1 server behavior | |
2029 | } else { | |
2030 | var err error | |
2031 | url_, err = url.ParseRequestURI(rp.path) | |
2032 | if err != nil { | |
2033 | return nil, nil, streamError(st.id, ErrCodeProtocol) | |
2034 | } | |
2035 | requestURI = rp.path | |
2036 | } | |
2037 | ||
2038 | body := &requestBody{ | |
2039 | conn: sc, | |
2040 | stream: st, | |
2041 | needsContinue: needsContinue, | |
2042 | } | |
2043 | req := &http.Request{ | |
2044 | Method: rp.method, | |
2045 | URL: url_, | |
2046 | RemoteAddr: sc.remoteAddrStr, | |
2047 | Header: rp.header, | |
2048 | RequestURI: requestURI, | |
2049 | Proto: "HTTP/2.0", | |
2050 | ProtoMajor: 2, | |
2051 | ProtoMinor: 0, | |
2052 | TLS: tlsState, | |
2053 | Host: rp.authority, | |
2054 | Body: body, | |
2055 | Trailer: trailer, | |
2056 | } | |
107c1cdb | 2057 | req = req.WithContext(st.ctx) |
15c0b25d AP |
2058 | |
2059 | rws := responseWriterStatePool.Get().(*responseWriterState) | |
2060 | bwSave := rws.bw | |
2061 | *rws = responseWriterState{} // zero all the fields | |
2062 | rws.conn = sc | |
2063 | rws.bw = bwSave | |
2064 | rws.bw.Reset(chunkWriter{rws}) | |
2065 | rws.stream = st | |
2066 | rws.req = req | |
2067 | rws.body = body | |
2068 | ||
2069 | rw := &responseWriter{rws: rws} | |
2070 | return rw, req, nil | |
2071 | } | |
2072 | ||
2073 | // Run on its own goroutine. | |
2074 | func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) { | |
2075 | didPanic := true | |
2076 | defer func() { | |
2077 | rw.rws.stream.cancelCtx() | |
2078 | if didPanic { | |
2079 | e := recover() | |
2080 | sc.writeFrameFromHandler(FrameWriteRequest{ | |
2081 | write: handlerPanicRST{rw.rws.stream.id}, | |
2082 | stream: rw.rws.stream, | |
2083 | }) | |
2084 | // Same as net/http: | |
107c1cdb | 2085 | if e != nil && e != http.ErrAbortHandler { |
15c0b25d AP |
2086 | const size = 64 << 10 |
2087 | buf := make([]byte, size) | |
2088 | buf = buf[:runtime.Stack(buf, false)] | |
2089 | sc.logf("http2: panic serving %v: %v\n%s", sc.conn.RemoteAddr(), e, buf) | |
2090 | } | |
2091 | return | |
2092 | } | |
2093 | rw.handlerDone() | |
2094 | }() | |
2095 | handler(rw, req) | |
2096 | didPanic = false | |
2097 | } | |
2098 | ||
2099 | func handleHeaderListTooLong(w http.ResponseWriter, r *http.Request) { | |
2100 | // 10.5.1 Limits on Header Block Size: | |
2101 | // .. "A server that receives a larger header block than it is | |
2102 | // willing to handle can send an HTTP 431 (Request Header Fields Too | |
2103 | // Large) status code" | |
2104 | const statusRequestHeaderFieldsTooLarge = 431 // only in Go 1.6+ | |
2105 | w.WriteHeader(statusRequestHeaderFieldsTooLarge) | |
2106 | io.WriteString(w, "<h1>HTTP Error 431</h1><p>Request Header Field(s) Too Large</p>") | |
2107 | } | |
2108 | ||
2109 | // called from handler goroutines. | |
2110 | // h may be nil. | |
2111 | func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) error { | |
2112 | sc.serveG.checkNotOn() // NOT on | |
2113 | var errc chan error | |
2114 | if headerData.h != nil { | |
2115 | // If there's a header map (which we don't own), so we have to block on | |
2116 | // waiting for this frame to be written, so an http.Flush mid-handler | |
2117 | // writes out the correct value of keys, before a handler later potentially | |
2118 | // mutates it. | |
2119 | errc = errChanPool.Get().(chan error) | |
2120 | } | |
2121 | if err := sc.writeFrameFromHandler(FrameWriteRequest{ | |
2122 | write: headerData, | |
2123 | stream: st, | |
2124 | done: errc, | |
2125 | }); err != nil { | |
2126 | return err | |
2127 | } | |
2128 | if errc != nil { | |
2129 | select { | |
2130 | case err := <-errc: | |
2131 | errChanPool.Put(errc) | |
2132 | return err | |
2133 | case <-sc.doneServing: | |
2134 | return errClientDisconnected | |
2135 | case <-st.cw: | |
2136 | return errStreamClosed | |
2137 | } | |
2138 | } | |
2139 | return nil | |
2140 | } | |
2141 | ||
2142 | // called from handler goroutines. | |
2143 | func (sc *serverConn) write100ContinueHeaders(st *stream) { | |
2144 | sc.writeFrameFromHandler(FrameWriteRequest{ | |
2145 | write: write100ContinueHeadersFrame{st.id}, | |
2146 | stream: st, | |
2147 | }) | |
2148 | } | |
2149 | ||
2150 | // A bodyReadMsg tells the server loop that the http.Handler read n | |
2151 | // bytes of the DATA from the client on the given stream. | |
2152 | type bodyReadMsg struct { | |
2153 | st *stream | |
2154 | n int | |
2155 | } | |
2156 | ||
2157 | // called from handler goroutines. | |
2158 | // Notes that the handler for the given stream ID read n bytes of its body | |
2159 | // and schedules flow control tokens to be sent. | |
2160 | func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) { | |
2161 | sc.serveG.checkNotOn() // NOT on | |
2162 | if n > 0 { | |
2163 | select { | |
2164 | case sc.bodyReadCh <- bodyReadMsg{st, n}: | |
2165 | case <-sc.doneServing: | |
2166 | } | |
2167 | } | |
2168 | } | |
2169 | ||
2170 | func (sc *serverConn) noteBodyRead(st *stream, n int) { | |
2171 | sc.serveG.check() | |
2172 | sc.sendWindowUpdate(nil, n) // conn-level | |
2173 | if st.state != stateHalfClosedRemote && st.state != stateClosed { | |
2174 | // Don't send this WINDOW_UPDATE if the stream is closed | |
2175 | // remotely. | |
2176 | sc.sendWindowUpdate(st, n) | |
2177 | } | |
2178 | } | |
2179 | ||
2180 | // st may be nil for conn-level | |
2181 | func (sc *serverConn) sendWindowUpdate(st *stream, n int) { | |
2182 | sc.serveG.check() | |
2183 | // "The legal range for the increment to the flow control | |
2184 | // window is 1 to 2^31-1 (2,147,483,647) octets." | |
2185 | // A Go Read call on 64-bit machines could in theory read | |
2186 | // a larger Read than this. Very unlikely, but we handle it here | |
2187 | // rather than elsewhere for now. | |
2188 | const maxUint31 = 1<<31 - 1 | |
2189 | for n >= maxUint31 { | |
2190 | sc.sendWindowUpdate32(st, maxUint31) | |
2191 | n -= maxUint31 | |
2192 | } | |
2193 | sc.sendWindowUpdate32(st, int32(n)) | |
2194 | } | |
2195 | ||
2196 | // st may be nil for conn-level | |
2197 | func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) { | |
2198 | sc.serveG.check() | |
2199 | if n == 0 { | |
2200 | return | |
2201 | } | |
2202 | if n < 0 { | |
2203 | panic("negative update") | |
2204 | } | |
2205 | var streamID uint32 | |
2206 | if st != nil { | |
2207 | streamID = st.id | |
2208 | } | |
2209 | sc.writeFrame(FrameWriteRequest{ | |
2210 | write: writeWindowUpdate{streamID: streamID, n: uint32(n)}, | |
2211 | stream: st, | |
2212 | }) | |
2213 | var ok bool | |
2214 | if st == nil { | |
2215 | ok = sc.inflow.add(n) | |
2216 | } else { | |
2217 | ok = st.inflow.add(n) | |
2218 | } | |
2219 | if !ok { | |
2220 | panic("internal error; sent too many window updates without decrements?") | |
2221 | } | |
2222 | } | |
2223 | ||
2224 | // requestBody is the Handler's Request.Body type. | |
2225 | // Read and Close may be called concurrently. | |
2226 | type requestBody struct { | |
2227 | stream *stream | |
2228 | conn *serverConn | |
2229 | closed bool // for use by Close only | |
2230 | sawEOF bool // for use by Read only | |
2231 | pipe *pipe // non-nil if we have a HTTP entity message body | |
2232 | needsContinue bool // need to send a 100-continue | |
2233 | } | |
2234 | ||
2235 | func (b *requestBody) Close() error { | |
2236 | if b.pipe != nil && !b.closed { | |
2237 | b.pipe.BreakWithError(errClosedBody) | |
2238 | } | |
2239 | b.closed = true | |
2240 | return nil | |
2241 | } | |
2242 | ||
2243 | func (b *requestBody) Read(p []byte) (n int, err error) { | |
2244 | if b.needsContinue { | |
2245 | b.needsContinue = false | |
2246 | b.conn.write100ContinueHeaders(b.stream) | |
2247 | } | |
2248 | if b.pipe == nil || b.sawEOF { | |
2249 | return 0, io.EOF | |
2250 | } | |
2251 | n, err = b.pipe.Read(p) | |
2252 | if err == io.EOF { | |
2253 | b.sawEOF = true | |
2254 | } | |
2255 | if b.conn == nil && inTests { | |
2256 | return | |
2257 | } | |
2258 | b.conn.noteBodyReadFromHandler(b.stream, n, err) | |
2259 | return | |
2260 | } | |
2261 | ||
2262 | // responseWriter is the http.ResponseWriter implementation. It's | |
2263 | // intentionally small (1 pointer wide) to minimize garbage. The | |
2264 | // responseWriterState pointer inside is zeroed at the end of a | |
2265 | // request (in handlerDone) and calls on the responseWriter thereafter | |
2266 | // simply crash (caller's mistake), but the much larger responseWriterState | |
2267 | // and buffers are reused between multiple requests. | |
2268 | type responseWriter struct { | |
2269 | rws *responseWriterState | |
2270 | } | |
2271 | ||
2272 | // Optional http.ResponseWriter interfaces implemented. | |
2273 | var ( | |
2274 | _ http.CloseNotifier = (*responseWriter)(nil) | |
2275 | _ http.Flusher = (*responseWriter)(nil) | |
2276 | _ stringWriter = (*responseWriter)(nil) | |
2277 | ) | |
2278 | ||
2279 | type responseWriterState struct { | |
2280 | // immutable within a request: | |
2281 | stream *stream | |
2282 | req *http.Request | |
2283 | body *requestBody // to close at end of request, if DATA frames didn't | |
2284 | conn *serverConn | |
2285 | ||
2286 | // TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc | |
2287 | bw *bufio.Writer // writing to a chunkWriter{this *responseWriterState} | |
2288 | ||
2289 | // mutated by http.Handler goroutine: | |
2290 | handlerHeader http.Header // nil until called | |
2291 | snapHeader http.Header // snapshot of handlerHeader at WriteHeader time | |
2292 | trailers []string // set in writeChunk | |
2293 | status int // status code passed to WriteHeader | |
2294 | wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet. | |
2295 | sentHeader bool // have we sent the header frame? | |
2296 | handlerDone bool // handler has finished | |
2297 | dirty bool // a Write failed; don't reuse this responseWriterState | |
2298 | ||
2299 | sentContentLen int64 // non-zero if handler set a Content-Length header | |
2300 | wroteBytes int64 | |
2301 | ||
2302 | closeNotifierMu sync.Mutex // guards closeNotifierCh | |
2303 | closeNotifierCh chan bool // nil until first used | |
2304 | } | |
2305 | ||
2306 | type chunkWriter struct{ rws *responseWriterState } | |
2307 | ||
2308 | func (cw chunkWriter) Write(p []byte) (n int, err error) { return cw.rws.writeChunk(p) } | |
2309 | ||
107c1cdb ND |
2310 | func (rws *responseWriterState) hasTrailers() bool { return len(rws.trailers) > 0 } |
2311 | ||
2312 | func (rws *responseWriterState) hasNonemptyTrailers() bool { | |
2313 | for _, trailer := range rws.trailers { | |
2314 | if _, ok := rws.handlerHeader[trailer]; ok { | |
2315 | return true | |
2316 | } | |
2317 | } | |
2318 | return false | |
2319 | } | |
15c0b25d AP |
2320 | |
2321 | // declareTrailer is called for each Trailer header when the | |
2322 | // response header is written. It notes that a header will need to be | |
2323 | // written in the trailers at the end of the response. | |
2324 | func (rws *responseWriterState) declareTrailer(k string) { | |
2325 | k = http.CanonicalHeaderKey(k) | |
107c1cdb ND |
2326 | if !httpguts.ValidTrailerHeader(k) { |
2327 | // Forbidden by RFC 7230, section 4.1.2. | |
15c0b25d AP |
2328 | rws.conn.logf("ignoring invalid trailer %q", k) |
2329 | return | |
2330 | } | |
2331 | if !strSliceContains(rws.trailers, k) { | |
2332 | rws.trailers = append(rws.trailers, k) | |
2333 | } | |
2334 | } | |
2335 | ||
2336 | // writeChunk writes chunks from the bufio.Writer. But because | |
2337 | // bufio.Writer may bypass its chunking, sometimes p may be | |
2338 | // arbitrarily large. | |
2339 | // | |
2340 | // writeChunk is also responsible (on the first chunk) for sending the | |
2341 | // HEADER response. | |
2342 | func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) { | |
2343 | if !rws.wroteHeader { | |
2344 | rws.writeHeader(200) | |
2345 | } | |
2346 | ||
2347 | isHeadResp := rws.req.Method == "HEAD" | |
2348 | if !rws.sentHeader { | |
2349 | rws.sentHeader = true | |
2350 | var ctype, clen string | |
2351 | if clen = rws.snapHeader.Get("Content-Length"); clen != "" { | |
2352 | rws.snapHeader.Del("Content-Length") | |
2353 | clen64, err := strconv.ParseInt(clen, 10, 64) | |
2354 | if err == nil && clen64 >= 0 { | |
2355 | rws.sentContentLen = clen64 | |
2356 | } else { | |
2357 | clen = "" | |
2358 | } | |
2359 | } | |
2360 | if clen == "" && rws.handlerDone && bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) { | |
2361 | clen = strconv.Itoa(len(p)) | |
2362 | } | |
2363 | _, hasContentType := rws.snapHeader["Content-Type"] | |
107c1cdb | 2364 | if !hasContentType && bodyAllowedForStatus(rws.status) && len(p) > 0 { |
15c0b25d AP |
2365 | ctype = http.DetectContentType(p) |
2366 | } | |
2367 | var date string | |
2368 | if _, ok := rws.snapHeader["Date"]; !ok { | |
2369 | // TODO(bradfitz): be faster here, like net/http? measure. | |
2370 | date = time.Now().UTC().Format(http.TimeFormat) | |
2371 | } | |
2372 | ||
2373 | for _, v := range rws.snapHeader["Trailer"] { | |
2374 | foreachHeaderElement(v, rws.declareTrailer) | |
2375 | } | |
2376 | ||
107c1cdb ND |
2377 | // "Connection" headers aren't allowed in HTTP/2 (RFC 7540, 8.1.2.2), |
2378 | // but respect "Connection" == "close" to mean sending a GOAWAY and tearing | |
2379 | // down the TCP connection when idle, like we do for HTTP/1. | |
2380 | // TODO: remove more Connection-specific header fields here, in addition | |
2381 | // to "Connection". | |
2382 | if _, ok := rws.snapHeader["Connection"]; ok { | |
2383 | v := rws.snapHeader.Get("Connection") | |
2384 | delete(rws.snapHeader, "Connection") | |
2385 | if v == "close" { | |
2386 | rws.conn.startGracefulShutdown() | |
2387 | } | |
2388 | } | |
2389 | ||
15c0b25d AP |
2390 | endStream := (rws.handlerDone && !rws.hasTrailers() && len(p) == 0) || isHeadResp |
2391 | err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{ | |
2392 | streamID: rws.stream.id, | |
2393 | httpResCode: rws.status, | |
2394 | h: rws.snapHeader, | |
2395 | endStream: endStream, | |
2396 | contentType: ctype, | |
2397 | contentLength: clen, | |
2398 | date: date, | |
2399 | }) | |
2400 | if err != nil { | |
2401 | rws.dirty = true | |
2402 | return 0, err | |
2403 | } | |
2404 | if endStream { | |
2405 | return 0, nil | |
2406 | } | |
2407 | } | |
2408 | if isHeadResp { | |
2409 | return len(p), nil | |
2410 | } | |
2411 | if len(p) == 0 && !rws.handlerDone { | |
2412 | return 0, nil | |
2413 | } | |
2414 | ||
2415 | if rws.handlerDone { | |
2416 | rws.promoteUndeclaredTrailers() | |
2417 | } | |
2418 | ||
107c1cdb ND |
2419 | // only send trailers if they have actually been defined by the |
2420 | // server handler. | |
2421 | hasNonemptyTrailers := rws.hasNonemptyTrailers() | |
2422 | endStream := rws.handlerDone && !hasNonemptyTrailers | |
15c0b25d AP |
2423 | if len(p) > 0 || endStream { |
2424 | // only send a 0 byte DATA frame if we're ending the stream. | |
2425 | if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil { | |
2426 | rws.dirty = true | |
2427 | return 0, err | |
2428 | } | |
2429 | } | |
2430 | ||
107c1cdb | 2431 | if rws.handlerDone && hasNonemptyTrailers { |
15c0b25d AP |
2432 | err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{ |
2433 | streamID: rws.stream.id, | |
2434 | h: rws.handlerHeader, | |
2435 | trailers: rws.trailers, | |
2436 | endStream: true, | |
2437 | }) | |
2438 | if err != nil { | |
2439 | rws.dirty = true | |
2440 | } | |
2441 | return len(p), err | |
2442 | } | |
2443 | return len(p), nil | |
2444 | } | |
2445 | ||
2446 | // TrailerPrefix is a magic prefix for ResponseWriter.Header map keys | |
2447 | // that, if present, signals that the map entry is actually for | |
2448 | // the response trailers, and not the response headers. The prefix | |
2449 | // is stripped after the ServeHTTP call finishes and the values are | |
2450 | // sent in the trailers. | |
2451 | // | |
2452 | // This mechanism is intended only for trailers that are not known | |
2453 | // prior to the headers being written. If the set of trailers is fixed | |
2454 | // or known before the header is written, the normal Go trailers mechanism | |
2455 | // is preferred: | |
2456 | // https://golang.org/pkg/net/http/#ResponseWriter | |
2457 | // https://golang.org/pkg/net/http/#example_ResponseWriter_trailers | |
2458 | const TrailerPrefix = "Trailer:" | |
2459 | ||
2460 | // promoteUndeclaredTrailers permits http.Handlers to set trailers | |
2461 | // after the header has already been flushed. Because the Go | |
2462 | // ResponseWriter interface has no way to set Trailers (only the | |
2463 | // Header), and because we didn't want to expand the ResponseWriter | |
107c1cdb | 2464 | // interface, and because nobody used trailers, and because RFC 7230 |
15c0b25d AP |
2465 | // says you SHOULD (but not must) predeclare any trailers in the |
2466 | // header, the official ResponseWriter rules said trailers in Go must | |
2467 | // be predeclared, and then we reuse the same ResponseWriter.Header() | |
2468 | // map to mean both Headers and Trailers. When it's time to write the | |
2469 | // Trailers, we pick out the fields of Headers that were declared as | |
2470 | // trailers. That worked for a while, until we found the first major | |
2471 | // user of Trailers in the wild: gRPC (using them only over http2), | |
2472 | // and gRPC libraries permit setting trailers mid-stream without | |
2473 | // predeclarnig them. So: change of plans. We still permit the old | |
2474 | // way, but we also permit this hack: if a Header() key begins with | |
2475 | // "Trailer:", the suffix of that key is a Trailer. Because ':' is an | |
2476 | // invalid token byte anyway, there is no ambiguity. (And it's already | |
2477 | // filtered out) It's mildly hacky, but not terrible. | |
2478 | // | |
2479 | // This method runs after the Handler is done and promotes any Header | |
2480 | // fields to be trailers. | |
2481 | func (rws *responseWriterState) promoteUndeclaredTrailers() { | |
2482 | for k, vv := range rws.handlerHeader { | |
2483 | if !strings.HasPrefix(k, TrailerPrefix) { | |
2484 | continue | |
2485 | } | |
2486 | trailerKey := strings.TrimPrefix(k, TrailerPrefix) | |
2487 | rws.declareTrailer(trailerKey) | |
2488 | rws.handlerHeader[http.CanonicalHeaderKey(trailerKey)] = vv | |
2489 | } | |
2490 | ||
2491 | if len(rws.trailers) > 1 { | |
2492 | sorter := sorterPool.Get().(*sorter) | |
2493 | sorter.SortStrings(rws.trailers) | |
2494 | sorterPool.Put(sorter) | |
2495 | } | |
2496 | } | |
2497 | ||
2498 | func (w *responseWriter) Flush() { | |
2499 | rws := w.rws | |
2500 | if rws == nil { | |
2501 | panic("Header called after Handler finished") | |
2502 | } | |
2503 | if rws.bw.Buffered() > 0 { | |
2504 | if err := rws.bw.Flush(); err != nil { | |
2505 | // Ignore the error. The frame writer already knows. | |
2506 | return | |
2507 | } | |
2508 | } else { | |
2509 | // The bufio.Writer won't call chunkWriter.Write | |
2510 | // (writeChunk with zero bytes, so we have to do it | |
2511 | // ourselves to force the HTTP response header and/or | |
2512 | // final DATA frame (with END_STREAM) to be sent. | |
2513 | rws.writeChunk(nil) | |
2514 | } | |
2515 | } | |
2516 | ||
2517 | func (w *responseWriter) CloseNotify() <-chan bool { | |
2518 | rws := w.rws | |
2519 | if rws == nil { | |
2520 | panic("CloseNotify called after Handler finished") | |
2521 | } | |
2522 | rws.closeNotifierMu.Lock() | |
2523 | ch := rws.closeNotifierCh | |
2524 | if ch == nil { | |
2525 | ch = make(chan bool, 1) | |
2526 | rws.closeNotifierCh = ch | |
2527 | cw := rws.stream.cw | |
2528 | go func() { | |
2529 | cw.Wait() // wait for close | |
2530 | ch <- true | |
2531 | }() | |
2532 | } | |
2533 | rws.closeNotifierMu.Unlock() | |
2534 | return ch | |
2535 | } | |
2536 | ||
2537 | func (w *responseWriter) Header() http.Header { | |
2538 | rws := w.rws | |
2539 | if rws == nil { | |
2540 | panic("Header called after Handler finished") | |
2541 | } | |
2542 | if rws.handlerHeader == nil { | |
2543 | rws.handlerHeader = make(http.Header) | |
2544 | } | |
2545 | return rws.handlerHeader | |
2546 | } | |
2547 | ||
107c1cdb ND |
2548 | // checkWriteHeaderCode is a copy of net/http's checkWriteHeaderCode. |
2549 | func checkWriteHeaderCode(code int) { | |
2550 | // Issue 22880: require valid WriteHeader status codes. | |
2551 | // For now we only enforce that it's three digits. | |
2552 | // In the future we might block things over 599 (600 and above aren't defined | |
2553 | // at http://httpwg.org/specs/rfc7231.html#status.codes) | |
2554 | // and we might block under 200 (once we have more mature 1xx support). | |
2555 | // But for now any three digits. | |
2556 | // | |
2557 | // We used to send "HTTP/1.1 000 0" on the wire in responses but there's | |
2558 | // no equivalent bogus thing we can realistically send in HTTP/2, | |
2559 | // so we'll consistently panic instead and help people find their bugs | |
2560 | // early. (We can't return an error from WriteHeader even if we wanted to.) | |
2561 | if code < 100 || code > 999 { | |
2562 | panic(fmt.Sprintf("invalid WriteHeader code %v", code)) | |
2563 | } | |
2564 | } | |
2565 | ||
15c0b25d AP |
2566 | func (w *responseWriter) WriteHeader(code int) { |
2567 | rws := w.rws | |
2568 | if rws == nil { | |
2569 | panic("WriteHeader called after Handler finished") | |
2570 | } | |
2571 | rws.writeHeader(code) | |
2572 | } | |
2573 | ||
2574 | func (rws *responseWriterState) writeHeader(code int) { | |
2575 | if !rws.wroteHeader { | |
107c1cdb | 2576 | checkWriteHeaderCode(code) |
15c0b25d AP |
2577 | rws.wroteHeader = true |
2578 | rws.status = code | |
2579 | if len(rws.handlerHeader) > 0 { | |
2580 | rws.snapHeader = cloneHeader(rws.handlerHeader) | |
2581 | } | |
2582 | } | |
2583 | } | |
2584 | ||
2585 | func cloneHeader(h http.Header) http.Header { | |
2586 | h2 := make(http.Header, len(h)) | |
2587 | for k, vv := range h { | |
2588 | vv2 := make([]string, len(vv)) | |
2589 | copy(vv2, vv) | |
2590 | h2[k] = vv2 | |
2591 | } | |
2592 | return h2 | |
2593 | } | |
2594 | ||
2595 | // The Life Of A Write is like this: | |
2596 | // | |
2597 | // * Handler calls w.Write or w.WriteString -> | |
2598 | // * -> rws.bw (*bufio.Writer) -> | |
2599 | // * (Handler might call Flush) | |
2600 | // * -> chunkWriter{rws} | |
2601 | // * -> responseWriterState.writeChunk(p []byte) | |
2602 | // * -> responseWriterState.writeChunk (most of the magic; see comment there) | |
2603 | func (w *responseWriter) Write(p []byte) (n int, err error) { | |
2604 | return w.write(len(p), p, "") | |
2605 | } | |
2606 | ||
2607 | func (w *responseWriter) WriteString(s string) (n int, err error) { | |
2608 | return w.write(len(s), nil, s) | |
2609 | } | |
2610 | ||
2611 | // either dataB or dataS is non-zero. | |
2612 | func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int, err error) { | |
2613 | rws := w.rws | |
2614 | if rws == nil { | |
2615 | panic("Write called after Handler finished") | |
2616 | } | |
2617 | if !rws.wroteHeader { | |
2618 | w.WriteHeader(200) | |
2619 | } | |
2620 | if !bodyAllowedForStatus(rws.status) { | |
2621 | return 0, http.ErrBodyNotAllowed | |
2622 | } | |
2623 | rws.wroteBytes += int64(len(dataB)) + int64(len(dataS)) // only one can be set | |
2624 | if rws.sentContentLen != 0 && rws.wroteBytes > rws.sentContentLen { | |
2625 | // TODO: send a RST_STREAM | |
2626 | return 0, errors.New("http2: handler wrote more than declared Content-Length") | |
2627 | } | |
2628 | ||
2629 | if dataB != nil { | |
2630 | return rws.bw.Write(dataB) | |
2631 | } else { | |
2632 | return rws.bw.WriteString(dataS) | |
2633 | } | |
2634 | } | |
2635 | ||
2636 | func (w *responseWriter) handlerDone() { | |
2637 | rws := w.rws | |
2638 | dirty := rws.dirty | |
2639 | rws.handlerDone = true | |
2640 | w.Flush() | |
2641 | w.rws = nil | |
2642 | if !dirty { | |
2643 | // Only recycle the pool if all prior Write calls to | |
2644 | // the serverConn goroutine completed successfully. If | |
2645 | // they returned earlier due to resets from the peer | |
2646 | // there might still be write goroutines outstanding | |
2647 | // from the serverConn referencing the rws memory. See | |
2648 | // issue 20704. | |
2649 | responseWriterStatePool.Put(rws) | |
2650 | } | |
2651 | } | |
2652 | ||
2653 | // Push errors. | |
2654 | var ( | |
2655 | ErrRecursivePush = errors.New("http2: recursive push not allowed") | |
2656 | ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS") | |
2657 | ) | |
2658 | ||
107c1cdb | 2659 | var _ http.Pusher = (*responseWriter)(nil) |
15c0b25d | 2660 | |
107c1cdb | 2661 | func (w *responseWriter) Push(target string, opts *http.PushOptions) error { |
15c0b25d AP |
2662 | st := w.rws.stream |
2663 | sc := st.sc | |
2664 | sc.serveG.checkNotOn() | |
2665 | ||
2666 | // No recursive pushes: "PUSH_PROMISE frames MUST only be sent on a peer-initiated stream." | |
2667 | // http://tools.ietf.org/html/rfc7540#section-6.6 | |
2668 | if st.isPushed() { | |
2669 | return ErrRecursivePush | |
2670 | } | |
2671 | ||
107c1cdb ND |
2672 | if opts == nil { |
2673 | opts = new(http.PushOptions) | |
2674 | } | |
2675 | ||
15c0b25d AP |
2676 | // Default options. |
2677 | if opts.Method == "" { | |
2678 | opts.Method = "GET" | |
2679 | } | |
2680 | if opts.Header == nil { | |
2681 | opts.Header = http.Header{} | |
2682 | } | |
2683 | wantScheme := "http" | |
2684 | if w.rws.req.TLS != nil { | |
2685 | wantScheme = "https" | |
2686 | } | |
2687 | ||
2688 | // Validate the request. | |
2689 | u, err := url.Parse(target) | |
2690 | if err != nil { | |
2691 | return err | |
2692 | } | |
2693 | if u.Scheme == "" { | |
2694 | if !strings.HasPrefix(target, "/") { | |
2695 | return fmt.Errorf("target must be an absolute URL or an absolute path: %q", target) | |
2696 | } | |
2697 | u.Scheme = wantScheme | |
2698 | u.Host = w.rws.req.Host | |
2699 | } else { | |
2700 | if u.Scheme != wantScheme { | |
2701 | return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", u.Scheme, wantScheme) | |
2702 | } | |
2703 | if u.Host == "" { | |
2704 | return errors.New("URL must have a host") | |
2705 | } | |
2706 | } | |
2707 | for k := range opts.Header { | |
2708 | if strings.HasPrefix(k, ":") { | |
2709 | return fmt.Errorf("promised request headers cannot include pseudo header %q", k) | |
2710 | } | |
2711 | // These headers are meaningful only if the request has a body, | |
2712 | // but PUSH_PROMISE requests cannot have a body. | |
2713 | // http://tools.ietf.org/html/rfc7540#section-8.2 | |
2714 | // Also disallow Host, since the promised URL must be absolute. | |
2715 | switch strings.ToLower(k) { | |
2716 | case "content-length", "content-encoding", "trailer", "te", "expect", "host": | |
2717 | return fmt.Errorf("promised request headers cannot include %q", k) | |
2718 | } | |
2719 | } | |
2720 | if err := checkValidHTTP2RequestHeaders(opts.Header); err != nil { | |
2721 | return err | |
2722 | } | |
2723 | ||
2724 | // The RFC effectively limits promised requests to GET and HEAD: | |
2725 | // "Promised requests MUST be cacheable [GET, HEAD, or POST], and MUST be safe [GET or HEAD]" | |
2726 | // http://tools.ietf.org/html/rfc7540#section-8.2 | |
2727 | if opts.Method != "GET" && opts.Method != "HEAD" { | |
2728 | return fmt.Errorf("method %q must be GET or HEAD", opts.Method) | |
2729 | } | |
2730 | ||
2731 | msg := &startPushRequest{ | |
2732 | parent: st, | |
2733 | method: opts.Method, | |
2734 | url: u, | |
2735 | header: cloneHeader(opts.Header), | |
2736 | done: errChanPool.Get().(chan error), | |
2737 | } | |
2738 | ||
2739 | select { | |
2740 | case <-sc.doneServing: | |
2741 | return errClientDisconnected | |
2742 | case <-st.cw: | |
2743 | return errStreamClosed | |
2744 | case sc.serveMsgCh <- msg: | |
2745 | } | |
2746 | ||
2747 | select { | |
2748 | case <-sc.doneServing: | |
2749 | return errClientDisconnected | |
2750 | case <-st.cw: | |
2751 | return errStreamClosed | |
2752 | case err := <-msg.done: | |
2753 | errChanPool.Put(msg.done) | |
2754 | return err | |
2755 | } | |
2756 | } | |
2757 | ||
2758 | type startPushRequest struct { | |
2759 | parent *stream | |
2760 | method string | |
2761 | url *url.URL | |
2762 | header http.Header | |
2763 | done chan error | |
2764 | } | |
2765 | ||
2766 | func (sc *serverConn) startPush(msg *startPushRequest) { | |
2767 | sc.serveG.check() | |
2768 | ||
2769 | // http://tools.ietf.org/html/rfc7540#section-6.6. | |
2770 | // PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that | |
2771 | // is in either the "open" or "half-closed (remote)" state. | |
2772 | if msg.parent.state != stateOpen && msg.parent.state != stateHalfClosedRemote { | |
2773 | // responseWriter.Push checks that the stream is peer-initiaed. | |
2774 | msg.done <- errStreamClosed | |
2775 | return | |
2776 | } | |
2777 | ||
2778 | // http://tools.ietf.org/html/rfc7540#section-6.6. | |
2779 | if !sc.pushEnabled { | |
2780 | msg.done <- http.ErrNotSupported | |
2781 | return | |
2782 | } | |
2783 | ||
2784 | // PUSH_PROMISE frames must be sent in increasing order by stream ID, so | |
2785 | // we allocate an ID for the promised stream lazily, when the PUSH_PROMISE | |
2786 | // is written. Once the ID is allocated, we start the request handler. | |
2787 | allocatePromisedID := func() (uint32, error) { | |
2788 | sc.serveG.check() | |
2789 | ||
2790 | // Check this again, just in case. Technically, we might have received | |
2791 | // an updated SETTINGS by the time we got around to writing this frame. | |
2792 | if !sc.pushEnabled { | |
2793 | return 0, http.ErrNotSupported | |
2794 | } | |
2795 | // http://tools.ietf.org/html/rfc7540#section-6.5.2. | |
2796 | if sc.curPushedStreams+1 > sc.clientMaxStreams { | |
2797 | return 0, ErrPushLimitReached | |
2798 | } | |
2799 | ||
2800 | // http://tools.ietf.org/html/rfc7540#section-5.1.1. | |
2801 | // Streams initiated by the server MUST use even-numbered identifiers. | |
2802 | // A server that is unable to establish a new stream identifier can send a GOAWAY | |
2803 | // frame so that the client is forced to open a new connection for new streams. | |
2804 | if sc.maxPushPromiseID+2 >= 1<<31 { | |
2805 | sc.startGracefulShutdownInternal() | |
2806 | return 0, ErrPushLimitReached | |
2807 | } | |
2808 | sc.maxPushPromiseID += 2 | |
2809 | promisedID := sc.maxPushPromiseID | |
2810 | ||
2811 | // http://tools.ietf.org/html/rfc7540#section-8.2. | |
2812 | // Strictly speaking, the new stream should start in "reserved (local)", then | |
2813 | // transition to "half closed (remote)" after sending the initial HEADERS, but | |
2814 | // we start in "half closed (remote)" for simplicity. | |
2815 | // See further comments at the definition of stateHalfClosedRemote. | |
2816 | promised := sc.newStream(promisedID, msg.parent.id, stateHalfClosedRemote) | |
2817 | rw, req, err := sc.newWriterAndRequestNoBody(promised, requestParam{ | |
2818 | method: msg.method, | |
2819 | scheme: msg.url.Scheme, | |
2820 | authority: msg.url.Host, | |
2821 | path: msg.url.RequestURI(), | |
2822 | header: cloneHeader(msg.header), // clone since handler runs concurrently with writing the PUSH_PROMISE | |
2823 | }) | |
2824 | if err != nil { | |
2825 | // Should not happen, since we've already validated msg.url. | |
2826 | panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err)) | |
2827 | } | |
2828 | ||
2829 | go sc.runHandler(rw, req, sc.handler.ServeHTTP) | |
2830 | return promisedID, nil | |
2831 | } | |
2832 | ||
2833 | sc.writeFrame(FrameWriteRequest{ | |
2834 | write: &writePushPromise{ | |
2835 | streamID: msg.parent.id, | |
2836 | method: msg.method, | |
2837 | url: msg.url, | |
2838 | h: msg.header, | |
2839 | allocatePromisedID: allocatePromisedID, | |
2840 | }, | |
2841 | stream: msg.parent, | |
2842 | done: msg.done, | |
2843 | }) | |
2844 | } | |
2845 | ||
2846 | // foreachHeaderElement splits v according to the "#rule" construction | |
107c1cdb | 2847 | // in RFC 7230 section 7 and calls fn for each non-empty element. |
15c0b25d AP |
2848 | func foreachHeaderElement(v string, fn func(string)) { |
2849 | v = textproto.TrimString(v) | |
2850 | if v == "" { | |
2851 | return | |
2852 | } | |
2853 | if !strings.Contains(v, ",") { | |
2854 | fn(v) | |
2855 | return | |
2856 | } | |
2857 | for _, f := range strings.Split(v, ",") { | |
2858 | if f = textproto.TrimString(f); f != "" { | |
2859 | fn(f) | |
2860 | } | |
2861 | } | |
2862 | } | |
2863 | ||
2864 | // From http://httpwg.org/specs/rfc7540.html#rfc.section.8.1.2.2 | |
2865 | var connHeaders = []string{ | |
2866 | "Connection", | |
2867 | "Keep-Alive", | |
2868 | "Proxy-Connection", | |
2869 | "Transfer-Encoding", | |
2870 | "Upgrade", | |
2871 | } | |
2872 | ||
2873 | // checkValidHTTP2RequestHeaders checks whether h is a valid HTTP/2 request, | |
2874 | // per RFC 7540 Section 8.1.2.2. | |
2875 | // The returned error is reported to users. | |
2876 | func checkValidHTTP2RequestHeaders(h http.Header) error { | |
2877 | for _, k := range connHeaders { | |
2878 | if _, ok := h[k]; ok { | |
2879 | return fmt.Errorf("request header %q is not valid in HTTP/2", k) | |
2880 | } | |
2881 | } | |
2882 | te := h["Te"] | |
2883 | if len(te) > 0 && (len(te) > 1 || (te[0] != "trailers" && te[0] != "")) { | |
2884 | return errors.New(`request header "TE" may only be "trailers" in HTTP/2`) | |
2885 | } | |
2886 | return nil | |
2887 | } | |
2888 | ||
2889 | func new400Handler(err error) http.HandlerFunc { | |
2890 | return func(w http.ResponseWriter, r *http.Request) { | |
2891 | http.Error(w, err.Error(), http.StatusBadRequest) | |
2892 | } | |
2893 | } | |
2894 | ||
15c0b25d AP |
2895 | // h1ServerKeepAlivesDisabled reports whether hs has its keep-alives |
2896 | // disabled. See comments on h1ServerShutdownChan above for why | |
2897 | // the code is written this way. | |
2898 | func h1ServerKeepAlivesDisabled(hs *http.Server) bool { | |
2899 | var x interface{} = hs | |
2900 | type I interface { | |
2901 | doKeepAlives() bool | |
2902 | } | |
2903 | if hs, ok := x.(I); ok { | |
2904 | return !hs.doKeepAlives() | |
2905 | } | |
2906 | return false | |
2907 | } |