]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | // Copyright 2014 The Go Authors. All rights reserved. |
2 | // Use of this source code is governed by a BSD-style | |
3 | // license that can be found in the LICENSE file. | |
4 | ||
5 | // TODO: turn off the serve goroutine when idle, so | |
6 | // an idle conn only has the readFrames goroutine active. (which could | |
7 | // also be optimized probably to pin less memory in crypto/tls). This | |
8 | // would involve tracking when the serve goroutine is active (atomic | |
9 | // int32 read/CAS probably?) and starting it up when frames arrive, | |
10 | // and shutting it down when all handlers exit. the occasional PING | |
11 | // packets could use time.AfterFunc to call sc.wakeStartServeLoop() | |
12 | // (which is a no-op if already running) and then queue the PING write | |
13 | // as normal. The serve loop would then exit in most cases (if no | |
14 | // Handlers running) and not be woken up again until the PING packet | |
15 | // returns. | |
16 | ||
17 | // TODO (maybe): add a mechanism for Handlers to going into | |
18 | // half-closed-local mode (rw.(io.Closer) test?) but not exit their | |
19 | // handler, and continue to be able to read from the | |
20 | // Request.Body. This would be a somewhat semantic change from HTTP/1 | |
21 | // (or at least what we expose in net/http), so I'd probably want to | |
22 | // add it there too. For now, this package says that returning from | |
23 | // the Handler ServeHTTP function means you're both done reading and | |
24 | // done writing, without a way to stop just one or the other. | |
25 | ||
26 | package http2 | |
27 | ||
28 | import ( | |
29 | "bufio" | |
30 | "bytes" | |
31 | "crypto/tls" | |
32 | "errors" | |
33 | "fmt" | |
34 | "io" | |
35 | "log" | |
36 | "math" | |
37 | "net" | |
38 | "net/http" | |
39 | "net/textproto" | |
40 | "net/url" | |
41 | "os" | |
42 | "reflect" | |
43 | "runtime" | |
44 | "strconv" | |
45 | "strings" | |
46 | "sync" | |
47 | "time" | |
48 | ||
49 | "golang.org/x/net/http2/hpack" | |
50 | ) | |
51 | ||
52 | const ( | |
53 | prefaceTimeout = 10 * time.Second | |
54 | firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway | |
55 | handlerChunkWriteSize = 4 << 10 | |
56 | defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to? | |
57 | ) | |
58 | ||
59 | var ( | |
60 | errClientDisconnected = errors.New("client disconnected") | |
61 | errClosedBody = errors.New("body closed by handler") | |
62 | errHandlerComplete = errors.New("http2: request body closed due to handler exiting") | |
63 | errStreamClosed = errors.New("http2: stream closed") | |
64 | ) | |
65 | ||
66 | var responseWriterStatePool = sync.Pool{ | |
67 | New: func() interface{} { | |
68 | rws := &responseWriterState{} | |
69 | rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize) | |
70 | return rws | |
71 | }, | |
72 | } | |
73 | ||
74 | // Test hooks. | |
75 | var ( | |
76 | testHookOnConn func() | |
77 | testHookGetServerConn func(*serverConn) | |
78 | testHookOnPanicMu *sync.Mutex // nil except in tests | |
79 | testHookOnPanic func(sc *serverConn, panicVal interface{}) (rePanic bool) | |
80 | ) | |
81 | ||
82 | // Server is an HTTP/2 server. | |
83 | type Server struct { | |
84 | // MaxHandlers limits the number of http.Handler ServeHTTP goroutines | |
85 | // which may run at a time over all connections. | |
86 | // Negative or zero no limit. | |
87 | // TODO: implement | |
88 | MaxHandlers int | |
89 | ||
90 | // MaxConcurrentStreams optionally specifies the number of | |
91 | // concurrent streams that each client may have open at a | |
92 | // time. This is unrelated to the number of http.Handler goroutines | |
93 | // which may be active globally, which is MaxHandlers. | |
94 | // If zero, MaxConcurrentStreams defaults to at least 100, per | |
95 | // the HTTP/2 spec's recommendations. | |
96 | MaxConcurrentStreams uint32 | |
97 | ||
98 | // MaxReadFrameSize optionally specifies the largest frame | |
99 | // this server is willing to read. A valid value is between | |
100 | // 16k and 16M, inclusive. If zero or otherwise invalid, a | |
101 | // default value is used. | |
102 | MaxReadFrameSize uint32 | |
103 | ||
104 | // PermitProhibitedCipherSuites, if true, permits the use of | |
105 | // cipher suites prohibited by the HTTP/2 spec. | |
106 | PermitProhibitedCipherSuites bool | |
107 | ||
108 | // IdleTimeout specifies how long until idle clients should be | |
109 | // closed with a GOAWAY frame. PING frames are not considered | |
110 | // activity for the purposes of IdleTimeout. | |
111 | IdleTimeout time.Duration | |
112 | ||
113 | // MaxUploadBufferPerConnection is the size of the initial flow | |
114 | // control window for each connections. The HTTP/2 spec does not | |
115 | // allow this to be smaller than 65535 or larger than 2^32-1. | |
116 | // If the value is outside this range, a default value will be | |
117 | // used instead. | |
118 | MaxUploadBufferPerConnection int32 | |
119 | ||
120 | // MaxUploadBufferPerStream is the size of the initial flow control | |
121 | // window for each stream. The HTTP/2 spec does not allow this to | |
122 | // be larger than 2^32-1. If the value is zero or larger than the | |
123 | // maximum, a default value will be used instead. | |
124 | MaxUploadBufferPerStream int32 | |
125 | ||
126 | // NewWriteScheduler constructs a write scheduler for a connection. | |
127 | // If nil, a default scheduler is chosen. | |
128 | NewWriteScheduler func() WriteScheduler | |
129 | ||
130 | // Internal state. This is a pointer (rather than embedded directly) | |
131 | // so that we don't embed a Mutex in this struct, which will make the | |
132 | // struct non-copyable, which might break some callers. | |
133 | state *serverInternalState | |
134 | } | |
135 | ||
136 | func (s *Server) initialConnRecvWindowSize() int32 { | |
137 | if s.MaxUploadBufferPerConnection > initialWindowSize { | |
138 | return s.MaxUploadBufferPerConnection | |
139 | } | |
140 | return 1 << 20 | |
141 | } | |
142 | ||
143 | func (s *Server) initialStreamRecvWindowSize() int32 { | |
144 | if s.MaxUploadBufferPerStream > 0 { | |
145 | return s.MaxUploadBufferPerStream | |
146 | } | |
147 | return 1 << 20 | |
148 | } | |
149 | ||
150 | func (s *Server) maxReadFrameSize() uint32 { | |
151 | if v := s.MaxReadFrameSize; v >= minMaxFrameSize && v <= maxFrameSize { | |
152 | return v | |
153 | } | |
154 | return defaultMaxReadFrameSize | |
155 | } | |
156 | ||
157 | func (s *Server) maxConcurrentStreams() uint32 { | |
158 | if v := s.MaxConcurrentStreams; v > 0 { | |
159 | return v | |
160 | } | |
161 | return defaultMaxStreams | |
162 | } | |
163 | ||
164 | type serverInternalState struct { | |
165 | mu sync.Mutex | |
166 | activeConns map[*serverConn]struct{} | |
167 | } | |
168 | ||
169 | func (s *serverInternalState) registerConn(sc *serverConn) { | |
170 | if s == nil { | |
171 | return // if the Server was used without calling ConfigureServer | |
172 | } | |
173 | s.mu.Lock() | |
174 | s.activeConns[sc] = struct{}{} | |
175 | s.mu.Unlock() | |
176 | } | |
177 | ||
178 | func (s *serverInternalState) unregisterConn(sc *serverConn) { | |
179 | if s == nil { | |
180 | return // if the Server was used without calling ConfigureServer | |
181 | } | |
182 | s.mu.Lock() | |
183 | delete(s.activeConns, sc) | |
184 | s.mu.Unlock() | |
185 | } | |
186 | ||
187 | func (s *serverInternalState) startGracefulShutdown() { | |
188 | if s == nil { | |
189 | return // if the Server was used without calling ConfigureServer | |
190 | } | |
191 | s.mu.Lock() | |
192 | for sc := range s.activeConns { | |
193 | sc.startGracefulShutdown() | |
194 | } | |
195 | s.mu.Unlock() | |
196 | } | |
197 | ||
198 | // ConfigureServer adds HTTP/2 support to a net/http Server. | |
199 | // | |
200 | // The configuration conf may be nil. | |
201 | // | |
202 | // ConfigureServer must be called before s begins serving. | |
203 | func ConfigureServer(s *http.Server, conf *Server) error { | |
204 | if s == nil { | |
205 | panic("nil *http.Server") | |
206 | } | |
207 | if conf == nil { | |
208 | conf = new(Server) | |
209 | } | |
210 | conf.state = &serverInternalState{activeConns: make(map[*serverConn]struct{})} | |
211 | if err := configureServer18(s, conf); err != nil { | |
212 | return err | |
213 | } | |
214 | if err := configureServer19(s, conf); err != nil { | |
215 | return err | |
216 | } | |
217 | ||
218 | if s.TLSConfig == nil { | |
219 | s.TLSConfig = new(tls.Config) | |
220 | } else if s.TLSConfig.CipherSuites != nil { | |
221 | // If they already provided a CipherSuite list, return | |
222 | // an error if it has a bad order or is missing | |
223 | // ECDHE_RSA_WITH_AES_128_GCM_SHA256. | |
224 | const requiredCipher = tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 | |
225 | haveRequired := false | |
226 | sawBad := false | |
227 | for i, cs := range s.TLSConfig.CipherSuites { | |
228 | if cs == requiredCipher { | |
229 | haveRequired = true | |
230 | } | |
231 | if isBadCipher(cs) { | |
232 | sawBad = true | |
233 | } else if sawBad { | |
234 | return fmt.Errorf("http2: TLSConfig.CipherSuites index %d contains an HTTP/2-approved cipher suite (%#04x), but it comes after unapproved cipher suites. With this configuration, clients that don't support previous, approved cipher suites may be given an unapproved one and reject the connection.", i, cs) | |
235 | } | |
236 | } | |
237 | if !haveRequired { | |
238 | return fmt.Errorf("http2: TLSConfig.CipherSuites is missing HTTP/2-required TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256") | |
239 | } | |
240 | } | |
241 | ||
242 | // Note: not setting MinVersion to tls.VersionTLS12, | |
243 | // as we don't want to interfere with HTTP/1.1 traffic | |
244 | // on the user's server. We enforce TLS 1.2 later once | |
245 | // we accept a connection. Ideally this should be done | |
246 | // during next-proto selection, but using TLS <1.2 with | |
247 | // HTTP/2 is still the client's bug. | |
248 | ||
249 | s.TLSConfig.PreferServerCipherSuites = true | |
250 | ||
251 | haveNPN := false | |
252 | for _, p := range s.TLSConfig.NextProtos { | |
253 | if p == NextProtoTLS { | |
254 | haveNPN = true | |
255 | break | |
256 | } | |
257 | } | |
258 | if !haveNPN { | |
259 | s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, NextProtoTLS) | |
260 | } | |
261 | ||
262 | if s.TLSNextProto == nil { | |
263 | s.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){} | |
264 | } | |
265 | protoHandler := func(hs *http.Server, c *tls.Conn, h http.Handler) { | |
266 | if testHookOnConn != nil { | |
267 | testHookOnConn() | |
268 | } | |
269 | conf.ServeConn(c, &ServeConnOpts{ | |
270 | Handler: h, | |
271 | BaseConfig: hs, | |
272 | }) | |
273 | } | |
274 | s.TLSNextProto[NextProtoTLS] = protoHandler | |
275 | return nil | |
276 | } | |
277 | ||
278 | // ServeConnOpts are options for the Server.ServeConn method. | |
279 | type ServeConnOpts struct { | |
280 | // BaseConfig optionally sets the base configuration | |
281 | // for values. If nil, defaults are used. | |
282 | BaseConfig *http.Server | |
283 | ||
284 | // Handler specifies which handler to use for processing | |
285 | // requests. If nil, BaseConfig.Handler is used. If BaseConfig | |
286 | // or BaseConfig.Handler is nil, http.DefaultServeMux is used. | |
287 | Handler http.Handler | |
288 | } | |
289 | ||
290 | func (o *ServeConnOpts) baseConfig() *http.Server { | |
291 | if o != nil && o.BaseConfig != nil { | |
292 | return o.BaseConfig | |
293 | } | |
294 | return new(http.Server) | |
295 | } | |
296 | ||
297 | func (o *ServeConnOpts) handler() http.Handler { | |
298 | if o != nil { | |
299 | if o.Handler != nil { | |
300 | return o.Handler | |
301 | } | |
302 | if o.BaseConfig != nil && o.BaseConfig.Handler != nil { | |
303 | return o.BaseConfig.Handler | |
304 | } | |
305 | } | |
306 | return http.DefaultServeMux | |
307 | } | |
308 | ||
309 | // ServeConn serves HTTP/2 requests on the provided connection and | |
310 | // blocks until the connection is no longer readable. | |
311 | // | |
312 | // ServeConn starts speaking HTTP/2 assuming that c has not had any | |
313 | // reads or writes. It writes its initial settings frame and expects | |
314 | // to be able to read the preface and settings frame from the | |
315 | // client. If c has a ConnectionState method like a *tls.Conn, the | |
316 | // ConnectionState is used to verify the TLS ciphersuite and to set | |
317 | // the Request.TLS field in Handlers. | |
318 | // | |
319 | // ServeConn does not support h2c by itself. Any h2c support must be | |
320 | // implemented in terms of providing a suitably-behaving net.Conn. | |
321 | // | |
322 | // The opts parameter is optional. If nil, default values are used. | |
323 | func (s *Server) ServeConn(c net.Conn, opts *ServeConnOpts) { | |
324 | baseCtx, cancel := serverConnBaseContext(c, opts) | |
325 | defer cancel() | |
326 | ||
327 | sc := &serverConn{ | |
328 | srv: s, | |
329 | hs: opts.baseConfig(), | |
330 | conn: c, | |
331 | baseCtx: baseCtx, | |
332 | remoteAddrStr: c.RemoteAddr().String(), | |
333 | bw: newBufferedWriter(c), | |
334 | handler: opts.handler(), | |
335 | streams: make(map[uint32]*stream), | |
336 | readFrameCh: make(chan readFrameResult), | |
337 | wantWriteFrameCh: make(chan FrameWriteRequest, 8), | |
338 | serveMsgCh: make(chan interface{}, 8), | |
339 | wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync | |
340 | bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way | |
341 | doneServing: make(chan struct{}), | |
342 | clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value" | |
343 | advMaxStreams: s.maxConcurrentStreams(), | |
344 | initialStreamSendWindowSize: initialWindowSize, | |
345 | maxFrameSize: initialMaxFrameSize, | |
346 | headerTableSize: initialHeaderTableSize, | |
347 | serveG: newGoroutineLock(), | |
348 | pushEnabled: true, | |
349 | } | |
350 | ||
351 | s.state.registerConn(sc) | |
352 | defer s.state.unregisterConn(sc) | |
353 | ||
354 | // The net/http package sets the write deadline from the | |
355 | // http.Server.WriteTimeout during the TLS handshake, but then | |
356 | // passes the connection off to us with the deadline already set. | |
357 | // Write deadlines are set per stream in serverConn.newStream. | |
358 | // Disarm the net.Conn write deadline here. | |
359 | if sc.hs.WriteTimeout != 0 { | |
360 | sc.conn.SetWriteDeadline(time.Time{}) | |
361 | } | |
362 | ||
363 | if s.NewWriteScheduler != nil { | |
364 | sc.writeSched = s.NewWriteScheduler() | |
365 | } else { | |
366 | sc.writeSched = NewRandomWriteScheduler() | |
367 | } | |
368 | ||
369 | // These start at the RFC-specified defaults. If there is a higher | |
370 | // configured value for inflow, that will be updated when we send a | |
371 | // WINDOW_UPDATE shortly after sending SETTINGS. | |
372 | sc.flow.add(initialWindowSize) | |
373 | sc.inflow.add(initialWindowSize) | |
374 | sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf) | |
375 | ||
376 | fr := NewFramer(sc.bw, c) | |
377 | fr.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil) | |
378 | fr.MaxHeaderListSize = sc.maxHeaderListSize() | |
379 | fr.SetMaxReadFrameSize(s.maxReadFrameSize()) | |
380 | sc.framer = fr | |
381 | ||
382 | if tc, ok := c.(connectionStater); ok { | |
383 | sc.tlsState = new(tls.ConnectionState) | |
384 | *sc.tlsState = tc.ConnectionState() | |
385 | // 9.2 Use of TLS Features | |
386 | // An implementation of HTTP/2 over TLS MUST use TLS | |
387 | // 1.2 or higher with the restrictions on feature set | |
388 | // and cipher suite described in this section. Due to | |
389 | // implementation limitations, it might not be | |
390 | // possible to fail TLS negotiation. An endpoint MUST | |
391 | // immediately terminate an HTTP/2 connection that | |
392 | // does not meet the TLS requirements described in | |
393 | // this section with a connection error (Section | |
394 | // 5.4.1) of type INADEQUATE_SECURITY. | |
395 | if sc.tlsState.Version < tls.VersionTLS12 { | |
396 | sc.rejectConn(ErrCodeInadequateSecurity, "TLS version too low") | |
397 | return | |
398 | } | |
399 | ||
400 | if sc.tlsState.ServerName == "" { | |
401 | // Client must use SNI, but we don't enforce that anymore, | |
402 | // since it was causing problems when connecting to bare IP | |
403 | // addresses during development. | |
404 | // | |
405 | // TODO: optionally enforce? Or enforce at the time we receive | |
406 | // a new request, and verify the the ServerName matches the :authority? | |
407 | // But that precludes proxy situations, perhaps. | |
408 | // | |
409 | // So for now, do nothing here again. | |
410 | } | |
411 | ||
412 | if !s.PermitProhibitedCipherSuites && isBadCipher(sc.tlsState.CipherSuite) { | |
413 | // "Endpoints MAY choose to generate a connection error | |
414 | // (Section 5.4.1) of type INADEQUATE_SECURITY if one of | |
415 | // the prohibited cipher suites are negotiated." | |
416 | // | |
417 | // We choose that. In my opinion, the spec is weak | |
418 | // here. It also says both parties must support at least | |
419 | // TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 so there's no | |
420 | // excuses here. If we really must, we could allow an | |
421 | // "AllowInsecureWeakCiphers" option on the server later. | |
422 | // Let's see how it plays out first. | |
423 | sc.rejectConn(ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", sc.tlsState.CipherSuite)) | |
424 | return | |
425 | } | |
426 | } | |
427 | ||
428 | if hook := testHookGetServerConn; hook != nil { | |
429 | hook(sc) | |
430 | } | |
431 | sc.serve() | |
432 | } | |
433 | ||
434 | func (sc *serverConn) rejectConn(err ErrCode, debug string) { | |
435 | sc.vlogf("http2: server rejecting conn: %v, %s", err, debug) | |
436 | // ignoring errors. hanging up anyway. | |
437 | sc.framer.WriteGoAway(0, err, []byte(debug)) | |
438 | sc.bw.Flush() | |
439 | sc.conn.Close() | |
440 | } | |
441 | ||
442 | type serverConn struct { | |
443 | // Immutable: | |
444 | srv *Server | |
445 | hs *http.Server | |
446 | conn net.Conn | |
447 | bw *bufferedWriter // writing to conn | |
448 | handler http.Handler | |
449 | baseCtx contextContext | |
450 | framer *Framer | |
451 | doneServing chan struct{} // closed when serverConn.serve ends | |
452 | readFrameCh chan readFrameResult // written by serverConn.readFrames | |
453 | wantWriteFrameCh chan FrameWriteRequest // from handlers -> serve | |
454 | wroteFrameCh chan frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes | |
455 | bodyReadCh chan bodyReadMsg // from handlers -> serve | |
456 | serveMsgCh chan interface{} // misc messages & code to send to / run on the serve loop | |
457 | flow flow // conn-wide (not stream-specific) outbound flow control | |
458 | inflow flow // conn-wide inbound flow control | |
459 | tlsState *tls.ConnectionState // shared by all handlers, like net/http | |
460 | remoteAddrStr string | |
461 | writeSched WriteScheduler | |
462 | ||
463 | // Everything following is owned by the serve loop; use serveG.check(): | |
464 | serveG goroutineLock // used to verify funcs are on serve() | |
465 | pushEnabled bool | |
466 | sawFirstSettings bool // got the initial SETTINGS frame after the preface | |
467 | needToSendSettingsAck bool | |
468 | unackedSettings int // how many SETTINGS have we sent without ACKs? | |
469 | clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit) | |
470 | advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client | |
471 | curClientStreams uint32 // number of open streams initiated by the client | |
472 | curPushedStreams uint32 // number of open streams initiated by server push | |
473 | maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests | |
474 | maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes | |
475 | streams map[uint32]*stream | |
476 | initialStreamSendWindowSize int32 | |
477 | maxFrameSize int32 | |
478 | headerTableSize uint32 | |
479 | peerMaxHeaderListSize uint32 // zero means unknown (default) | |
480 | canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case | |
481 | writingFrame bool // started writing a frame (on serve goroutine or separate) | |
482 | writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh | |
483 | needsFrameFlush bool // last frame write wasn't a flush | |
484 | inGoAway bool // we've started to or sent GOAWAY | |
485 | inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop | |
486 | needToSendGoAway bool // we need to schedule a GOAWAY frame write | |
487 | goAwayCode ErrCode | |
488 | shutdownTimer *time.Timer // nil until used | |
489 | idleTimer *time.Timer // nil if unused | |
490 | ||
491 | // Owned by the writeFrameAsync goroutine: | |
492 | headerWriteBuf bytes.Buffer | |
493 | hpackEncoder *hpack.Encoder | |
494 | ||
495 | // Used by startGracefulShutdown. | |
496 | shutdownOnce sync.Once | |
497 | } | |
498 | ||
499 | func (sc *serverConn) maxHeaderListSize() uint32 { | |
500 | n := sc.hs.MaxHeaderBytes | |
501 | if n <= 0 { | |
502 | n = http.DefaultMaxHeaderBytes | |
503 | } | |
504 | // http2's count is in a slightly different unit and includes 32 bytes per pair. | |
505 | // So, take the net/http.Server value and pad it up a bit, assuming 10 headers. | |
506 | const perFieldOverhead = 32 // per http2 spec | |
507 | const typicalHeaders = 10 // conservative | |
508 | return uint32(n + typicalHeaders*perFieldOverhead) | |
509 | } | |
510 | ||
511 | func (sc *serverConn) curOpenStreams() uint32 { | |
512 | sc.serveG.check() | |
513 | return sc.curClientStreams + sc.curPushedStreams | |
514 | } | |
515 | ||
516 | // stream represents a stream. This is the minimal metadata needed by | |
517 | // the serve goroutine. Most of the actual stream state is owned by | |
518 | // the http.Handler's goroutine in the responseWriter. Because the | |
519 | // responseWriter's responseWriterState is recycled at the end of a | |
520 | // handler, this struct intentionally has no pointer to the | |
521 | // *responseWriter{,State} itself, as the Handler ending nils out the | |
522 | // responseWriter's state field. | |
523 | type stream struct { | |
524 | // immutable: | |
525 | sc *serverConn | |
526 | id uint32 | |
527 | body *pipe // non-nil if expecting DATA frames | |
528 | cw closeWaiter // closed wait stream transitions to closed state | |
529 | ctx contextContext | |
530 | cancelCtx func() | |
531 | ||
532 | // owned by serverConn's serve loop: | |
533 | bodyBytes int64 // body bytes seen so far | |
534 | declBodyBytes int64 // or -1 if undeclared | |
535 | flow flow // limits writing from Handler to client | |
536 | inflow flow // what the client is allowed to POST/etc to us | |
537 | parent *stream // or nil | |
538 | numTrailerValues int64 | |
539 | weight uint8 | |
540 | state streamState | |
541 | resetQueued bool // RST_STREAM queued for write; set by sc.resetStream | |
542 | gotTrailerHeader bool // HEADER frame for trailers was seen | |
543 | wroteHeaders bool // whether we wrote headers (not status 100) | |
544 | writeDeadline *time.Timer // nil if unused | |
545 | ||
546 | trailer http.Header // accumulated trailers | |
547 | reqTrailer http.Header // handler's Request.Trailer | |
548 | } | |
549 | ||
550 | func (sc *serverConn) Framer() *Framer { return sc.framer } | |
551 | func (sc *serverConn) CloseConn() error { return sc.conn.Close() } | |
552 | func (sc *serverConn) Flush() error { return sc.bw.Flush() } | |
553 | func (sc *serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) { | |
554 | return sc.hpackEncoder, &sc.headerWriteBuf | |
555 | } | |
556 | ||
557 | func (sc *serverConn) state(streamID uint32) (streamState, *stream) { | |
558 | sc.serveG.check() | |
559 | // http://tools.ietf.org/html/rfc7540#section-5.1 | |
560 | if st, ok := sc.streams[streamID]; ok { | |
561 | return st.state, st | |
562 | } | |
563 | // "The first use of a new stream identifier implicitly closes all | |
564 | // streams in the "idle" state that might have been initiated by | |
565 | // that peer with a lower-valued stream identifier. For example, if | |
566 | // a client sends a HEADERS frame on stream 7 without ever sending a | |
567 | // frame on stream 5, then stream 5 transitions to the "closed" | |
568 | // state when the first frame for stream 7 is sent or received." | |
569 | if streamID%2 == 1 { | |
570 | if streamID <= sc.maxClientStreamID { | |
571 | return stateClosed, nil | |
572 | } | |
573 | } else { | |
574 | if streamID <= sc.maxPushPromiseID { | |
575 | return stateClosed, nil | |
576 | } | |
577 | } | |
578 | return stateIdle, nil | |
579 | } | |
580 | ||
581 | // setConnState calls the net/http ConnState hook for this connection, if configured. | |
582 | // Note that the net/http package does StateNew and StateClosed for us. | |
583 | // There is currently no plan for StateHijacked or hijacking HTTP/2 connections. | |
584 | func (sc *serverConn) setConnState(state http.ConnState) { | |
585 | if sc.hs.ConnState != nil { | |
586 | sc.hs.ConnState(sc.conn, state) | |
587 | } | |
588 | } | |
589 | ||
590 | func (sc *serverConn) vlogf(format string, args ...interface{}) { | |
591 | if VerboseLogs { | |
592 | sc.logf(format, args...) | |
593 | } | |
594 | } | |
595 | ||
596 | func (sc *serverConn) logf(format string, args ...interface{}) { | |
597 | if lg := sc.hs.ErrorLog; lg != nil { | |
598 | lg.Printf(format, args...) | |
599 | } else { | |
600 | log.Printf(format, args...) | |
601 | } | |
602 | } | |
603 | ||
604 | // errno returns v's underlying uintptr, else 0. | |
605 | // | |
606 | // TODO: remove this helper function once http2 can use build | |
607 | // tags. See comment in isClosedConnError. | |
608 | func errno(v error) uintptr { | |
609 | if rv := reflect.ValueOf(v); rv.Kind() == reflect.Uintptr { | |
610 | return uintptr(rv.Uint()) | |
611 | } | |
612 | return 0 | |
613 | } | |
614 | ||
615 | // isClosedConnError reports whether err is an error from use of a closed | |
616 | // network connection. | |
617 | func isClosedConnError(err error) bool { | |
618 | if err == nil { | |
619 | return false | |
620 | } | |
621 | ||
622 | // TODO: remove this string search and be more like the Windows | |
623 | // case below. That might involve modifying the standard library | |
624 | // to return better error types. | |
625 | str := err.Error() | |
626 | if strings.Contains(str, "use of closed network connection") { | |
627 | return true | |
628 | } | |
629 | ||
630 | // TODO(bradfitz): x/tools/cmd/bundle doesn't really support | |
631 | // build tags, so I can't make an http2_windows.go file with | |
632 | // Windows-specific stuff. Fix that and move this, once we | |
633 | // have a way to bundle this into std's net/http somehow. | |
634 | if runtime.GOOS == "windows" { | |
635 | if oe, ok := err.(*net.OpError); ok && oe.Op == "read" { | |
636 | if se, ok := oe.Err.(*os.SyscallError); ok && se.Syscall == "wsarecv" { | |
637 | const WSAECONNABORTED = 10053 | |
638 | const WSAECONNRESET = 10054 | |
639 | if n := errno(se.Err); n == WSAECONNRESET || n == WSAECONNABORTED { | |
640 | return true | |
641 | } | |
642 | } | |
643 | } | |
644 | } | |
645 | return false | |
646 | } | |
647 | ||
648 | func (sc *serverConn) condlogf(err error, format string, args ...interface{}) { | |
649 | if err == nil { | |
650 | return | |
651 | } | |
652 | if err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) { | |
653 | // Boring, expected errors. | |
654 | sc.vlogf(format, args...) | |
655 | } else { | |
656 | sc.logf(format, args...) | |
657 | } | |
658 | } | |
659 | ||
660 | func (sc *serverConn) canonicalHeader(v string) string { | |
661 | sc.serveG.check() | |
662 | cv, ok := commonCanonHeader[v] | |
663 | if ok { | |
664 | return cv | |
665 | } | |
666 | cv, ok = sc.canonHeader[v] | |
667 | if ok { | |
668 | return cv | |
669 | } | |
670 | if sc.canonHeader == nil { | |
671 | sc.canonHeader = make(map[string]string) | |
672 | } | |
673 | cv = http.CanonicalHeaderKey(v) | |
674 | sc.canonHeader[v] = cv | |
675 | return cv | |
676 | } | |
677 | ||
678 | type readFrameResult struct { | |
679 | f Frame // valid until readMore is called | |
680 | err error | |
681 | ||
682 | // readMore should be called once the consumer no longer needs or | |
683 | // retains f. After readMore, f is invalid and more frames can be | |
684 | // read. | |
685 | readMore func() | |
686 | } | |
687 | ||
688 | // readFrames is the loop that reads incoming frames. | |
689 | // It takes care to only read one frame at a time, blocking until the | |
690 | // consumer is done with the frame. | |
691 | // It's run on its own goroutine. | |
692 | func (sc *serverConn) readFrames() { | |
693 | gate := make(gate) | |
694 | gateDone := gate.Done | |
695 | for { | |
696 | f, err := sc.framer.ReadFrame() | |
697 | select { | |
698 | case sc.readFrameCh <- readFrameResult{f, err, gateDone}: | |
699 | case <-sc.doneServing: | |
700 | return | |
701 | } | |
702 | select { | |
703 | case <-gate: | |
704 | case <-sc.doneServing: | |
705 | return | |
706 | } | |
707 | if terminalReadFrameError(err) { | |
708 | return | |
709 | } | |
710 | } | |
711 | } | |
712 | ||
713 | // frameWriteResult is the message passed from writeFrameAsync to the serve goroutine. | |
714 | type frameWriteResult struct { | |
715 | wr FrameWriteRequest // what was written (or attempted) | |
716 | err error // result of the writeFrame call | |
717 | } | |
718 | ||
719 | // writeFrameAsync runs in its own goroutine and writes a single frame | |
720 | // and then reports when it's done. | |
721 | // At most one goroutine can be running writeFrameAsync at a time per | |
722 | // serverConn. | |
723 | func (sc *serverConn) writeFrameAsync(wr FrameWriteRequest) { | |
724 | err := wr.write.writeFrame(sc) | |
725 | sc.wroteFrameCh <- frameWriteResult{wr, err} | |
726 | } | |
727 | ||
728 | func (sc *serverConn) closeAllStreamsOnConnClose() { | |
729 | sc.serveG.check() | |
730 | for _, st := range sc.streams { | |
731 | sc.closeStream(st, errClientDisconnected) | |
732 | } | |
733 | } | |
734 | ||
735 | func (sc *serverConn) stopShutdownTimer() { | |
736 | sc.serveG.check() | |
737 | if t := sc.shutdownTimer; t != nil { | |
738 | t.Stop() | |
739 | } | |
740 | } | |
741 | ||
742 | func (sc *serverConn) notePanic() { | |
743 | // Note: this is for serverConn.serve panicking, not http.Handler code. | |
744 | if testHookOnPanicMu != nil { | |
745 | testHookOnPanicMu.Lock() | |
746 | defer testHookOnPanicMu.Unlock() | |
747 | } | |
748 | if testHookOnPanic != nil { | |
749 | if e := recover(); e != nil { | |
750 | if testHookOnPanic(sc, e) { | |
751 | panic(e) | |
752 | } | |
753 | } | |
754 | } | |
755 | } | |
756 | ||
757 | func (sc *serverConn) serve() { | |
758 | sc.serveG.check() | |
759 | defer sc.notePanic() | |
760 | defer sc.conn.Close() | |
761 | defer sc.closeAllStreamsOnConnClose() | |
762 | defer sc.stopShutdownTimer() | |
763 | defer close(sc.doneServing) // unblocks handlers trying to send | |
764 | ||
765 | if VerboseLogs { | |
766 | sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs) | |
767 | } | |
768 | ||
769 | sc.writeFrame(FrameWriteRequest{ | |
770 | write: writeSettings{ | |
771 | {SettingMaxFrameSize, sc.srv.maxReadFrameSize()}, | |
772 | {SettingMaxConcurrentStreams, sc.advMaxStreams}, | |
773 | {SettingMaxHeaderListSize, sc.maxHeaderListSize()}, | |
774 | {SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())}, | |
775 | }, | |
776 | }) | |
777 | sc.unackedSettings++ | |
778 | ||
779 | // Each connection starts with intialWindowSize inflow tokens. | |
780 | // If a higher value is configured, we add more tokens. | |
781 | if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 { | |
782 | sc.sendWindowUpdate(nil, int(diff)) | |
783 | } | |
784 | ||
785 | if err := sc.readPreface(); err != nil { | |
786 | sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err) | |
787 | return | |
788 | } | |
789 | // Now that we've got the preface, get us out of the | |
790 | // "StateNew" state. We can't go directly to idle, though. | |
791 | // Active means we read some data and anticipate a request. We'll | |
792 | // do another Active when we get a HEADERS frame. | |
793 | sc.setConnState(http.StateActive) | |
794 | sc.setConnState(http.StateIdle) | |
795 | ||
796 | if sc.srv.IdleTimeout != 0 { | |
797 | sc.idleTimer = time.AfterFunc(sc.srv.IdleTimeout, sc.onIdleTimer) | |
798 | defer sc.idleTimer.Stop() | |
799 | } | |
800 | ||
801 | go sc.readFrames() // closed by defer sc.conn.Close above | |
802 | ||
803 | settingsTimer := time.AfterFunc(firstSettingsTimeout, sc.onSettingsTimer) | |
804 | defer settingsTimer.Stop() | |
805 | ||
806 | loopNum := 0 | |
807 | for { | |
808 | loopNum++ | |
809 | select { | |
810 | case wr := <-sc.wantWriteFrameCh: | |
811 | if se, ok := wr.write.(StreamError); ok { | |
812 | sc.resetStream(se) | |
813 | break | |
814 | } | |
815 | sc.writeFrame(wr) | |
816 | case res := <-sc.wroteFrameCh: | |
817 | sc.wroteFrame(res) | |
818 | case res := <-sc.readFrameCh: | |
819 | if !sc.processFrameFromReader(res) { | |
820 | return | |
821 | } | |
822 | res.readMore() | |
823 | if settingsTimer != nil { | |
824 | settingsTimer.Stop() | |
825 | settingsTimer = nil | |
826 | } | |
827 | case m := <-sc.bodyReadCh: | |
828 | sc.noteBodyRead(m.st, m.n) | |
829 | case msg := <-sc.serveMsgCh: | |
830 | switch v := msg.(type) { | |
831 | case func(int): | |
832 | v(loopNum) // for testing | |
833 | case *serverMessage: | |
834 | switch v { | |
835 | case settingsTimerMsg: | |
836 | sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr()) | |
837 | return | |
838 | case idleTimerMsg: | |
839 | sc.vlogf("connection is idle") | |
840 | sc.goAway(ErrCodeNo) | |
841 | case shutdownTimerMsg: | |
842 | sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr()) | |
843 | return | |
844 | case gracefulShutdownMsg: | |
845 | sc.startGracefulShutdownInternal() | |
846 | default: | |
847 | panic("unknown timer") | |
848 | } | |
849 | case *startPushRequest: | |
850 | sc.startPush(v) | |
851 | default: | |
852 | panic(fmt.Sprintf("unexpected type %T", v)) | |
853 | } | |
854 | } | |
855 | ||
856 | if sc.inGoAway && sc.curOpenStreams() == 0 && !sc.needToSendGoAway && !sc.writingFrame { | |
857 | return | |
858 | } | |
859 | } | |
860 | } | |
861 | ||
862 | func (sc *serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) { | |
863 | select { | |
864 | case <-sc.doneServing: | |
865 | case <-sharedCh: | |
866 | close(privateCh) | |
867 | } | |
868 | } | |
869 | ||
870 | type serverMessage int | |
871 | ||
872 | // Message values sent to serveMsgCh. | |
873 | var ( | |
874 | settingsTimerMsg = new(serverMessage) | |
875 | idleTimerMsg = new(serverMessage) | |
876 | shutdownTimerMsg = new(serverMessage) | |
877 | gracefulShutdownMsg = new(serverMessage) | |
878 | ) | |
879 | ||
880 | func (sc *serverConn) onSettingsTimer() { sc.sendServeMsg(settingsTimerMsg) } | |
881 | func (sc *serverConn) onIdleTimer() { sc.sendServeMsg(idleTimerMsg) } | |
882 | func (sc *serverConn) onShutdownTimer() { sc.sendServeMsg(shutdownTimerMsg) } | |
883 | ||
884 | func (sc *serverConn) sendServeMsg(msg interface{}) { | |
885 | sc.serveG.checkNotOn() // NOT | |
886 | select { | |
887 | case sc.serveMsgCh <- msg: | |
888 | case <-sc.doneServing: | |
889 | } | |
890 | } | |
891 | ||
892 | // readPreface reads the ClientPreface greeting from the peer | |
893 | // or returns an error on timeout or an invalid greeting. | |
894 | func (sc *serverConn) readPreface() error { | |
895 | errc := make(chan error, 1) | |
896 | go func() { | |
897 | // Read the client preface | |
898 | buf := make([]byte, len(ClientPreface)) | |
899 | if _, err := io.ReadFull(sc.conn, buf); err != nil { | |
900 | errc <- err | |
901 | } else if !bytes.Equal(buf, clientPreface) { | |
902 | errc <- fmt.Errorf("bogus greeting %q", buf) | |
903 | } else { | |
904 | errc <- nil | |
905 | } | |
906 | }() | |
907 | timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server? | |
908 | defer timer.Stop() | |
909 | select { | |
910 | case <-timer.C: | |
911 | return errors.New("timeout waiting for client preface") | |
912 | case err := <-errc: | |
913 | if err == nil { | |
914 | if VerboseLogs { | |
915 | sc.vlogf("http2: server: client %v said hello", sc.conn.RemoteAddr()) | |
916 | } | |
917 | } | |
918 | return err | |
919 | } | |
920 | } | |
921 | ||
922 | var errChanPool = sync.Pool{ | |
923 | New: func() interface{} { return make(chan error, 1) }, | |
924 | } | |
925 | ||
926 | var writeDataPool = sync.Pool{ | |
927 | New: func() interface{} { return new(writeData) }, | |
928 | } | |
929 | ||
930 | // writeDataFromHandler writes DATA response frames from a handler on | |
931 | // the given stream. | |
932 | func (sc *serverConn) writeDataFromHandler(stream *stream, data []byte, endStream bool) error { | |
933 | ch := errChanPool.Get().(chan error) | |
934 | writeArg := writeDataPool.Get().(*writeData) | |
935 | *writeArg = writeData{stream.id, data, endStream} | |
936 | err := sc.writeFrameFromHandler(FrameWriteRequest{ | |
937 | write: writeArg, | |
938 | stream: stream, | |
939 | done: ch, | |
940 | }) | |
941 | if err != nil { | |
942 | return err | |
943 | } | |
944 | var frameWriteDone bool // the frame write is done (successfully or not) | |
945 | select { | |
946 | case err = <-ch: | |
947 | frameWriteDone = true | |
948 | case <-sc.doneServing: | |
949 | return errClientDisconnected | |
950 | case <-stream.cw: | |
951 | // If both ch and stream.cw were ready (as might | |
952 | // happen on the final Write after an http.Handler | |
953 | // ends), prefer the write result. Otherwise this | |
954 | // might just be us successfully closing the stream. | |
955 | // The writeFrameAsync and serve goroutines guarantee | |
956 | // that the ch send will happen before the stream.cw | |
957 | // close. | |
958 | select { | |
959 | case err = <-ch: | |
960 | frameWriteDone = true | |
961 | default: | |
962 | return errStreamClosed | |
963 | } | |
964 | } | |
965 | errChanPool.Put(ch) | |
966 | if frameWriteDone { | |
967 | writeDataPool.Put(writeArg) | |
968 | } | |
969 | return err | |
970 | } | |
971 | ||
972 | // writeFrameFromHandler sends wr to sc.wantWriteFrameCh, but aborts | |
973 | // if the connection has gone away. | |
974 | // | |
975 | // This must not be run from the serve goroutine itself, else it might | |
976 | // deadlock writing to sc.wantWriteFrameCh (which is only mildly | |
977 | // buffered and is read by serve itself). If you're on the serve | |
978 | // goroutine, call writeFrame instead. | |
979 | func (sc *serverConn) writeFrameFromHandler(wr FrameWriteRequest) error { | |
980 | sc.serveG.checkNotOn() // NOT | |
981 | select { | |
982 | case sc.wantWriteFrameCh <- wr: | |
983 | return nil | |
984 | case <-sc.doneServing: | |
985 | // Serve loop is gone. | |
986 | // Client has closed their connection to the server. | |
987 | return errClientDisconnected | |
988 | } | |
989 | } | |
990 | ||
991 | // writeFrame schedules a frame to write and sends it if there's nothing | |
992 | // already being written. | |
993 | // | |
994 | // There is no pushback here (the serve goroutine never blocks). It's | |
995 | // the http.Handlers that block, waiting for their previous frames to | |
996 | // make it onto the wire | |
997 | // | |
998 | // If you're not on the serve goroutine, use writeFrameFromHandler instead. | |
999 | func (sc *serverConn) writeFrame(wr FrameWriteRequest) { | |
1000 | sc.serveG.check() | |
1001 | ||
1002 | // If true, wr will not be written and wr.done will not be signaled. | |
1003 | var ignoreWrite bool | |
1004 | ||
1005 | // We are not allowed to write frames on closed streams. RFC 7540 Section | |
1006 | // 5.1.1 says: "An endpoint MUST NOT send frames other than PRIORITY on | |
1007 | // a closed stream." Our server never sends PRIORITY, so that exception | |
1008 | // does not apply. | |
1009 | // | |
1010 | // The serverConn might close an open stream while the stream's handler | |
1011 | // is still running. For example, the server might close a stream when it | |
1012 | // receives bad data from the client. If this happens, the handler might | |
1013 | // attempt to write a frame after the stream has been closed (since the | |
1014 | // handler hasn't yet been notified of the close). In this case, we simply | |
1015 | // ignore the frame. The handler will notice that the stream is closed when | |
1016 | // it waits for the frame to be written. | |
1017 | // | |
1018 | // As an exception to this rule, we allow sending RST_STREAM after close. | |
1019 | // This allows us to immediately reject new streams without tracking any | |
1020 | // state for those streams (except for the queued RST_STREAM frame). This | |
1021 | // may result in duplicate RST_STREAMs in some cases, but the client should | |
1022 | // ignore those. | |
1023 | if wr.StreamID() != 0 { | |
1024 | _, isReset := wr.write.(StreamError) | |
1025 | if state, _ := sc.state(wr.StreamID()); state == stateClosed && !isReset { | |
1026 | ignoreWrite = true | |
1027 | } | |
1028 | } | |
1029 | ||
1030 | // Don't send a 100-continue response if we've already sent headers. | |
1031 | // See golang.org/issue/14030. | |
1032 | switch wr.write.(type) { | |
1033 | case *writeResHeaders: | |
1034 | wr.stream.wroteHeaders = true | |
1035 | case write100ContinueHeadersFrame: | |
1036 | if wr.stream.wroteHeaders { | |
1037 | // We do not need to notify wr.done because this frame is | |
1038 | // never written with wr.done != nil. | |
1039 | if wr.done != nil { | |
1040 | panic("wr.done != nil for write100ContinueHeadersFrame") | |
1041 | } | |
1042 | ignoreWrite = true | |
1043 | } | |
1044 | } | |
1045 | ||
1046 | if !ignoreWrite { | |
1047 | sc.writeSched.Push(wr) | |
1048 | } | |
1049 | sc.scheduleFrameWrite() | |
1050 | } | |
1051 | ||
1052 | // startFrameWrite starts a goroutine to write wr (in a separate | |
1053 | // goroutine since that might block on the network), and updates the | |
1054 | // serve goroutine's state about the world, updated from info in wr. | |
1055 | func (sc *serverConn) startFrameWrite(wr FrameWriteRequest) { | |
1056 | sc.serveG.check() | |
1057 | if sc.writingFrame { | |
1058 | panic("internal error: can only be writing one frame at a time") | |
1059 | } | |
1060 | ||
1061 | st := wr.stream | |
1062 | if st != nil { | |
1063 | switch st.state { | |
1064 | case stateHalfClosedLocal: | |
1065 | switch wr.write.(type) { | |
1066 | case StreamError, handlerPanicRST, writeWindowUpdate: | |
1067 | // RFC 7540 Section 5.1 allows sending RST_STREAM, PRIORITY, and WINDOW_UPDATE | |
1068 | // in this state. (We never send PRIORITY from the server, so that is not checked.) | |
1069 | default: | |
1070 | panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr)) | |
1071 | } | |
1072 | case stateClosed: | |
1073 | panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", wr)) | |
1074 | } | |
1075 | } | |
1076 | if wpp, ok := wr.write.(*writePushPromise); ok { | |
1077 | var err error | |
1078 | wpp.promisedID, err = wpp.allocatePromisedID() | |
1079 | if err != nil { | |
1080 | sc.writingFrameAsync = false | |
1081 | wr.replyToWriter(err) | |
1082 | return | |
1083 | } | |
1084 | } | |
1085 | ||
1086 | sc.writingFrame = true | |
1087 | sc.needsFrameFlush = true | |
1088 | if wr.write.staysWithinBuffer(sc.bw.Available()) { | |
1089 | sc.writingFrameAsync = false | |
1090 | err := wr.write.writeFrame(sc) | |
1091 | sc.wroteFrame(frameWriteResult{wr, err}) | |
1092 | } else { | |
1093 | sc.writingFrameAsync = true | |
1094 | go sc.writeFrameAsync(wr) | |
1095 | } | |
1096 | } | |
1097 | ||
1098 | // errHandlerPanicked is the error given to any callers blocked in a read from | |
1099 | // Request.Body when the main goroutine panics. Since most handlers read in the | |
1100 | // the main ServeHTTP goroutine, this will show up rarely. | |
1101 | var errHandlerPanicked = errors.New("http2: handler panicked") | |
1102 | ||
1103 | // wroteFrame is called on the serve goroutine with the result of | |
1104 | // whatever happened on writeFrameAsync. | |
1105 | func (sc *serverConn) wroteFrame(res frameWriteResult) { | |
1106 | sc.serveG.check() | |
1107 | if !sc.writingFrame { | |
1108 | panic("internal error: expected to be already writing a frame") | |
1109 | } | |
1110 | sc.writingFrame = false | |
1111 | sc.writingFrameAsync = false | |
1112 | ||
1113 | wr := res.wr | |
1114 | ||
1115 | if writeEndsStream(wr.write) { | |
1116 | st := wr.stream | |
1117 | if st == nil { | |
1118 | panic("internal error: expecting non-nil stream") | |
1119 | } | |
1120 | switch st.state { | |
1121 | case stateOpen: | |
1122 | // Here we would go to stateHalfClosedLocal in | |
1123 | // theory, but since our handler is done and | |
1124 | // the net/http package provides no mechanism | |
1125 | // for closing a ResponseWriter while still | |
1126 | // reading data (see possible TODO at top of | |
1127 | // this file), we go into closed state here | |
1128 | // anyway, after telling the peer we're | |
1129 | // hanging up on them. We'll transition to | |
1130 | // stateClosed after the RST_STREAM frame is | |
1131 | // written. | |
1132 | st.state = stateHalfClosedLocal | |
1133 | // Section 8.1: a server MAY request that the client abort | |
1134 | // transmission of a request without error by sending a | |
1135 | // RST_STREAM with an error code of NO_ERROR after sending | |
1136 | // a complete response. | |
1137 | sc.resetStream(streamError(st.id, ErrCodeNo)) | |
1138 | case stateHalfClosedRemote: | |
1139 | sc.closeStream(st, errHandlerComplete) | |
1140 | } | |
1141 | } else { | |
1142 | switch v := wr.write.(type) { | |
1143 | case StreamError: | |
1144 | // st may be unknown if the RST_STREAM was generated to reject bad input. | |
1145 | if st, ok := sc.streams[v.StreamID]; ok { | |
1146 | sc.closeStream(st, v) | |
1147 | } | |
1148 | case handlerPanicRST: | |
1149 | sc.closeStream(wr.stream, errHandlerPanicked) | |
1150 | } | |
1151 | } | |
1152 | ||
1153 | // Reply (if requested) to unblock the ServeHTTP goroutine. | |
1154 | wr.replyToWriter(res.err) | |
1155 | ||
1156 | sc.scheduleFrameWrite() | |
1157 | } | |
1158 | ||
1159 | // scheduleFrameWrite tickles the frame writing scheduler. | |
1160 | // | |
1161 | // If a frame is already being written, nothing happens. This will be called again | |
1162 | // when the frame is done being written. | |
1163 | // | |
1164 | // If a frame isn't being written we need to send one, the best frame | |
1165 | // to send is selected, preferring first things that aren't | |
1166 | // stream-specific (e.g. ACKing settings), and then finding the | |
1167 | // highest priority stream. | |
1168 | // | |
1169 | // If a frame isn't being written and there's nothing else to send, we | |
1170 | // flush the write buffer. | |
1171 | func (sc *serverConn) scheduleFrameWrite() { | |
1172 | sc.serveG.check() | |
1173 | if sc.writingFrame || sc.inFrameScheduleLoop { | |
1174 | return | |
1175 | } | |
1176 | sc.inFrameScheduleLoop = true | |
1177 | for !sc.writingFrameAsync { | |
1178 | if sc.needToSendGoAway { | |
1179 | sc.needToSendGoAway = false | |
1180 | sc.startFrameWrite(FrameWriteRequest{ | |
1181 | write: &writeGoAway{ | |
1182 | maxStreamID: sc.maxClientStreamID, | |
1183 | code: sc.goAwayCode, | |
1184 | }, | |
1185 | }) | |
1186 | continue | |
1187 | } | |
1188 | if sc.needToSendSettingsAck { | |
1189 | sc.needToSendSettingsAck = false | |
1190 | sc.startFrameWrite(FrameWriteRequest{write: writeSettingsAck{}}) | |
1191 | continue | |
1192 | } | |
1193 | if !sc.inGoAway || sc.goAwayCode == ErrCodeNo { | |
1194 | if wr, ok := sc.writeSched.Pop(); ok { | |
1195 | sc.startFrameWrite(wr) | |
1196 | continue | |
1197 | } | |
1198 | } | |
1199 | if sc.needsFrameFlush { | |
1200 | sc.startFrameWrite(FrameWriteRequest{write: flushFrameWriter{}}) | |
1201 | sc.needsFrameFlush = false // after startFrameWrite, since it sets this true | |
1202 | continue | |
1203 | } | |
1204 | break | |
1205 | } | |
1206 | sc.inFrameScheduleLoop = false | |
1207 | } | |
1208 | ||
1209 | // startGracefulShutdown gracefully shuts down a connection. This | |
1210 | // sends GOAWAY with ErrCodeNo to tell the client we're gracefully | |
1211 | // shutting down. The connection isn't closed until all current | |
1212 | // streams are done. | |
1213 | // | |
1214 | // startGracefulShutdown returns immediately; it does not wait until | |
1215 | // the connection has shut down. | |
1216 | func (sc *serverConn) startGracefulShutdown() { | |
1217 | sc.serveG.checkNotOn() // NOT | |
1218 | sc.shutdownOnce.Do(func() { sc.sendServeMsg(gracefulShutdownMsg) }) | |
1219 | } | |
1220 | ||
1221 | func (sc *serverConn) startGracefulShutdownInternal() { | |
1222 | sc.goAwayIn(ErrCodeNo, 0) | |
1223 | } | |
1224 | ||
1225 | func (sc *serverConn) goAway(code ErrCode) { | |
1226 | sc.serveG.check() | |
1227 | var forceCloseIn time.Duration | |
1228 | if code != ErrCodeNo { | |
1229 | forceCloseIn = 250 * time.Millisecond | |
1230 | } else { | |
1231 | // TODO: configurable | |
1232 | forceCloseIn = 1 * time.Second | |
1233 | } | |
1234 | sc.goAwayIn(code, forceCloseIn) | |
1235 | } | |
1236 | ||
1237 | func (sc *serverConn) goAwayIn(code ErrCode, forceCloseIn time.Duration) { | |
1238 | sc.serveG.check() | |
1239 | if sc.inGoAway { | |
1240 | return | |
1241 | } | |
1242 | if forceCloseIn != 0 { | |
1243 | sc.shutDownIn(forceCloseIn) | |
1244 | } | |
1245 | sc.inGoAway = true | |
1246 | sc.needToSendGoAway = true | |
1247 | sc.goAwayCode = code | |
1248 | sc.scheduleFrameWrite() | |
1249 | } | |
1250 | ||
1251 | func (sc *serverConn) shutDownIn(d time.Duration) { | |
1252 | sc.serveG.check() | |
1253 | sc.shutdownTimer = time.AfterFunc(d, sc.onShutdownTimer) | |
1254 | } | |
1255 | ||
1256 | func (sc *serverConn) resetStream(se StreamError) { | |
1257 | sc.serveG.check() | |
1258 | sc.writeFrame(FrameWriteRequest{write: se}) | |
1259 | if st, ok := sc.streams[se.StreamID]; ok { | |
1260 | st.resetQueued = true | |
1261 | } | |
1262 | } | |
1263 | ||
1264 | // processFrameFromReader processes the serve loop's read from readFrameCh from the | |
1265 | // frame-reading goroutine. | |
1266 | // processFrameFromReader returns whether the connection should be kept open. | |
1267 | func (sc *serverConn) processFrameFromReader(res readFrameResult) bool { | |
1268 | sc.serveG.check() | |
1269 | err := res.err | |
1270 | if err != nil { | |
1271 | if err == ErrFrameTooLarge { | |
1272 | sc.goAway(ErrCodeFrameSize) | |
1273 | return true // goAway will close the loop | |
1274 | } | |
1275 | clientGone := err == io.EOF || err == io.ErrUnexpectedEOF || isClosedConnError(err) | |
1276 | if clientGone { | |
1277 | // TODO: could we also get into this state if | |
1278 | // the peer does a half close | |
1279 | // (e.g. CloseWrite) because they're done | |
1280 | // sending frames but they're still wanting | |
1281 | // our open replies? Investigate. | |
1282 | // TODO: add CloseWrite to crypto/tls.Conn first | |
1283 | // so we have a way to test this? I suppose | |
1284 | // just for testing we could have a non-TLS mode. | |
1285 | return false | |
1286 | } | |
1287 | } else { | |
1288 | f := res.f | |
1289 | if VerboseLogs { | |
1290 | sc.vlogf("http2: server read frame %v", summarizeFrame(f)) | |
1291 | } | |
1292 | err = sc.processFrame(f) | |
1293 | if err == nil { | |
1294 | return true | |
1295 | } | |
1296 | } | |
1297 | ||
1298 | switch ev := err.(type) { | |
1299 | case StreamError: | |
1300 | sc.resetStream(ev) | |
1301 | return true | |
1302 | case goAwayFlowError: | |
1303 | sc.goAway(ErrCodeFlowControl) | |
1304 | return true | |
1305 | case ConnectionError: | |
1306 | sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev) | |
1307 | sc.goAway(ErrCode(ev)) | |
1308 | return true // goAway will handle shutdown | |
1309 | default: | |
1310 | if res.err != nil { | |
1311 | sc.vlogf("http2: server closing client connection; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err) | |
1312 | } else { | |
1313 | sc.logf("http2: server closing client connection: %v", err) | |
1314 | } | |
1315 | return false | |
1316 | } | |
1317 | } | |
1318 | ||
1319 | func (sc *serverConn) processFrame(f Frame) error { | |
1320 | sc.serveG.check() | |
1321 | ||
1322 | // First frame received must be SETTINGS. | |
1323 | if !sc.sawFirstSettings { | |
1324 | if _, ok := f.(*SettingsFrame); !ok { | |
1325 | return ConnectionError(ErrCodeProtocol) | |
1326 | } | |
1327 | sc.sawFirstSettings = true | |
1328 | } | |
1329 | ||
1330 | switch f := f.(type) { | |
1331 | case *SettingsFrame: | |
1332 | return sc.processSettings(f) | |
1333 | case *MetaHeadersFrame: | |
1334 | return sc.processHeaders(f) | |
1335 | case *WindowUpdateFrame: | |
1336 | return sc.processWindowUpdate(f) | |
1337 | case *PingFrame: | |
1338 | return sc.processPing(f) | |
1339 | case *DataFrame: | |
1340 | return sc.processData(f) | |
1341 | case *RSTStreamFrame: | |
1342 | return sc.processResetStream(f) | |
1343 | case *PriorityFrame: | |
1344 | return sc.processPriority(f) | |
1345 | case *GoAwayFrame: | |
1346 | return sc.processGoAway(f) | |
1347 | case *PushPromiseFrame: | |
1348 | // A client cannot push. Thus, servers MUST treat the receipt of a PUSH_PROMISE | |
1349 | // frame as a connection error (Section 5.4.1) of type PROTOCOL_ERROR. | |
1350 | return ConnectionError(ErrCodeProtocol) | |
1351 | default: | |
1352 | sc.vlogf("http2: server ignoring frame: %v", f.Header()) | |
1353 | return nil | |
1354 | } | |
1355 | } | |
1356 | ||
1357 | func (sc *serverConn) processPing(f *PingFrame) error { | |
1358 | sc.serveG.check() | |
1359 | if f.IsAck() { | |
1360 | // 6.7 PING: " An endpoint MUST NOT respond to PING frames | |
1361 | // containing this flag." | |
1362 | return nil | |
1363 | } | |
1364 | if f.StreamID != 0 { | |
1365 | // "PING frames are not associated with any individual | |
1366 | // stream. If a PING frame is received with a stream | |
1367 | // identifier field value other than 0x0, the recipient MUST | |
1368 | // respond with a connection error (Section 5.4.1) of type | |
1369 | // PROTOCOL_ERROR." | |
1370 | return ConnectionError(ErrCodeProtocol) | |
1371 | } | |
1372 | if sc.inGoAway && sc.goAwayCode != ErrCodeNo { | |
1373 | return nil | |
1374 | } | |
1375 | sc.writeFrame(FrameWriteRequest{write: writePingAck{f}}) | |
1376 | return nil | |
1377 | } | |
1378 | ||
1379 | func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error { | |
1380 | sc.serveG.check() | |
1381 | switch { | |
1382 | case f.StreamID != 0: // stream-level flow control | |
1383 | state, st := sc.state(f.StreamID) | |
1384 | if state == stateIdle { | |
1385 | // Section 5.1: "Receiving any frame other than HEADERS | |
1386 | // or PRIORITY on a stream in this state MUST be | |
1387 | // treated as a connection error (Section 5.4.1) of | |
1388 | // type PROTOCOL_ERROR." | |
1389 | return ConnectionError(ErrCodeProtocol) | |
1390 | } | |
1391 | if st == nil { | |
1392 | // "WINDOW_UPDATE can be sent by a peer that has sent a | |
1393 | // frame bearing the END_STREAM flag. This means that a | |
1394 | // receiver could receive a WINDOW_UPDATE frame on a "half | |
1395 | // closed (remote)" or "closed" stream. A receiver MUST | |
1396 | // NOT treat this as an error, see Section 5.1." | |
1397 | return nil | |
1398 | } | |
1399 | if !st.flow.add(int32(f.Increment)) { | |
1400 | return streamError(f.StreamID, ErrCodeFlowControl) | |
1401 | } | |
1402 | default: // connection-level flow control | |
1403 | if !sc.flow.add(int32(f.Increment)) { | |
1404 | return goAwayFlowError{} | |
1405 | } | |
1406 | } | |
1407 | sc.scheduleFrameWrite() | |
1408 | return nil | |
1409 | } | |
1410 | ||
1411 | func (sc *serverConn) processResetStream(f *RSTStreamFrame) error { | |
1412 | sc.serveG.check() | |
1413 | ||
1414 | state, st := sc.state(f.StreamID) | |
1415 | if state == stateIdle { | |
1416 | // 6.4 "RST_STREAM frames MUST NOT be sent for a | |
1417 | // stream in the "idle" state. If a RST_STREAM frame | |
1418 | // identifying an idle stream is received, the | |
1419 | // recipient MUST treat this as a connection error | |
1420 | // (Section 5.4.1) of type PROTOCOL_ERROR. | |
1421 | return ConnectionError(ErrCodeProtocol) | |
1422 | } | |
1423 | if st != nil { | |
1424 | st.cancelCtx() | |
1425 | sc.closeStream(st, streamError(f.StreamID, f.ErrCode)) | |
1426 | } | |
1427 | return nil | |
1428 | } | |
1429 | ||
1430 | func (sc *serverConn) closeStream(st *stream, err error) { | |
1431 | sc.serveG.check() | |
1432 | if st.state == stateIdle || st.state == stateClosed { | |
1433 | panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state)) | |
1434 | } | |
1435 | st.state = stateClosed | |
1436 | if st.writeDeadline != nil { | |
1437 | st.writeDeadline.Stop() | |
1438 | } | |
1439 | if st.isPushed() { | |
1440 | sc.curPushedStreams-- | |
1441 | } else { | |
1442 | sc.curClientStreams-- | |
1443 | } | |
1444 | delete(sc.streams, st.id) | |
1445 | if len(sc.streams) == 0 { | |
1446 | sc.setConnState(http.StateIdle) | |
1447 | if sc.srv.IdleTimeout != 0 { | |
1448 | sc.idleTimer.Reset(sc.srv.IdleTimeout) | |
1449 | } | |
1450 | if h1ServerKeepAlivesDisabled(sc.hs) { | |
1451 | sc.startGracefulShutdownInternal() | |
1452 | } | |
1453 | } | |
1454 | if p := st.body; p != nil { | |
1455 | // Return any buffered unread bytes worth of conn-level flow control. | |
1456 | // See golang.org/issue/16481 | |
1457 | sc.sendWindowUpdate(nil, p.Len()) | |
1458 | ||
1459 | p.CloseWithError(err) | |
1460 | } | |
1461 | st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc | |
1462 | sc.writeSched.CloseStream(st.id) | |
1463 | } | |
1464 | ||
1465 | func (sc *serverConn) processSettings(f *SettingsFrame) error { | |
1466 | sc.serveG.check() | |
1467 | if f.IsAck() { | |
1468 | sc.unackedSettings-- | |
1469 | if sc.unackedSettings < 0 { | |
1470 | // Why is the peer ACKing settings we never sent? | |
1471 | // The spec doesn't mention this case, but | |
1472 | // hang up on them anyway. | |
1473 | return ConnectionError(ErrCodeProtocol) | |
1474 | } | |
1475 | return nil | |
1476 | } | |
1477 | if err := f.ForeachSetting(sc.processSetting); err != nil { | |
1478 | return err | |
1479 | } | |
1480 | sc.needToSendSettingsAck = true | |
1481 | sc.scheduleFrameWrite() | |
1482 | return nil | |
1483 | } | |
1484 | ||
1485 | func (sc *serverConn) processSetting(s Setting) error { | |
1486 | sc.serveG.check() | |
1487 | if err := s.Valid(); err != nil { | |
1488 | return err | |
1489 | } | |
1490 | if VerboseLogs { | |
1491 | sc.vlogf("http2: server processing setting %v", s) | |
1492 | } | |
1493 | switch s.ID { | |
1494 | case SettingHeaderTableSize: | |
1495 | sc.headerTableSize = s.Val | |
1496 | sc.hpackEncoder.SetMaxDynamicTableSize(s.Val) | |
1497 | case SettingEnablePush: | |
1498 | sc.pushEnabled = s.Val != 0 | |
1499 | case SettingMaxConcurrentStreams: | |
1500 | sc.clientMaxStreams = s.Val | |
1501 | case SettingInitialWindowSize: | |
1502 | return sc.processSettingInitialWindowSize(s.Val) | |
1503 | case SettingMaxFrameSize: | |
1504 | sc.maxFrameSize = int32(s.Val) // the maximum valid s.Val is < 2^31 | |
1505 | case SettingMaxHeaderListSize: | |
1506 | sc.peerMaxHeaderListSize = s.Val | |
1507 | default: | |
1508 | // Unknown setting: "An endpoint that receives a SETTINGS | |
1509 | // frame with any unknown or unsupported identifier MUST | |
1510 | // ignore that setting." | |
1511 | if VerboseLogs { | |
1512 | sc.vlogf("http2: server ignoring unknown setting %v", s) | |
1513 | } | |
1514 | } | |
1515 | return nil | |
1516 | } | |
1517 | ||
1518 | func (sc *serverConn) processSettingInitialWindowSize(val uint32) error { | |
1519 | sc.serveG.check() | |
1520 | // Note: val already validated to be within range by | |
1521 | // processSetting's Valid call. | |
1522 | ||
1523 | // "A SETTINGS frame can alter the initial flow control window | |
1524 | // size for all current streams. When the value of | |
1525 | // SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST | |
1526 | // adjust the size of all stream flow control windows that it | |
1527 | // maintains by the difference between the new value and the | |
1528 | // old value." | |
1529 | old := sc.initialStreamSendWindowSize | |
1530 | sc.initialStreamSendWindowSize = int32(val) | |
1531 | growth := int32(val) - old // may be negative | |
1532 | for _, st := range sc.streams { | |
1533 | if !st.flow.add(growth) { | |
1534 | // 6.9.2 Initial Flow Control Window Size | |
1535 | // "An endpoint MUST treat a change to | |
1536 | // SETTINGS_INITIAL_WINDOW_SIZE that causes any flow | |
1537 | // control window to exceed the maximum size as a | |
1538 | // connection error (Section 5.4.1) of type | |
1539 | // FLOW_CONTROL_ERROR." | |
1540 | return ConnectionError(ErrCodeFlowControl) | |
1541 | } | |
1542 | } | |
1543 | return nil | |
1544 | } | |
1545 | ||
1546 | func (sc *serverConn) processData(f *DataFrame) error { | |
1547 | sc.serveG.check() | |
1548 | if sc.inGoAway && sc.goAwayCode != ErrCodeNo { | |
1549 | return nil | |
1550 | } | |
1551 | data := f.Data() | |
1552 | ||
1553 | // "If a DATA frame is received whose stream is not in "open" | |
1554 | // or "half closed (local)" state, the recipient MUST respond | |
1555 | // with a stream error (Section 5.4.2) of type STREAM_CLOSED." | |
1556 | id := f.Header().StreamID | |
1557 | state, st := sc.state(id) | |
1558 | if id == 0 || state == stateIdle { | |
1559 | // Section 5.1: "Receiving any frame other than HEADERS | |
1560 | // or PRIORITY on a stream in this state MUST be | |
1561 | // treated as a connection error (Section 5.4.1) of | |
1562 | // type PROTOCOL_ERROR." | |
1563 | return ConnectionError(ErrCodeProtocol) | |
1564 | } | |
1565 | if st == nil || state != stateOpen || st.gotTrailerHeader || st.resetQueued { | |
1566 | // This includes sending a RST_STREAM if the stream is | |
1567 | // in stateHalfClosedLocal (which currently means that | |
1568 | // the http.Handler returned, so it's done reading & | |
1569 | // done writing). Try to stop the client from sending | |
1570 | // more DATA. | |
1571 | ||
1572 | // But still enforce their connection-level flow control, | |
1573 | // and return any flow control bytes since we're not going | |
1574 | // to consume them. | |
1575 | if sc.inflow.available() < int32(f.Length) { | |
1576 | return streamError(id, ErrCodeFlowControl) | |
1577 | } | |
1578 | // Deduct the flow control from inflow, since we're | |
1579 | // going to immediately add it back in | |
1580 | // sendWindowUpdate, which also schedules sending the | |
1581 | // frames. | |
1582 | sc.inflow.take(int32(f.Length)) | |
1583 | sc.sendWindowUpdate(nil, int(f.Length)) // conn-level | |
1584 | ||
1585 | if st != nil && st.resetQueued { | |
1586 | // Already have a stream error in flight. Don't send another. | |
1587 | return nil | |
1588 | } | |
1589 | return streamError(id, ErrCodeStreamClosed) | |
1590 | } | |
1591 | if st.body == nil { | |
1592 | panic("internal error: should have a body in this state") | |
1593 | } | |
1594 | ||
1595 | // Sender sending more than they'd declared? | |
1596 | if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes { | |
1597 | st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes)) | |
1598 | return streamError(id, ErrCodeStreamClosed) | |
1599 | } | |
1600 | if f.Length > 0 { | |
1601 | // Check whether the client has flow control quota. | |
1602 | if st.inflow.available() < int32(f.Length) { | |
1603 | return streamError(id, ErrCodeFlowControl) | |
1604 | } | |
1605 | st.inflow.take(int32(f.Length)) | |
1606 | ||
1607 | if len(data) > 0 { | |
1608 | wrote, err := st.body.Write(data) | |
1609 | if err != nil { | |
1610 | return streamError(id, ErrCodeStreamClosed) | |
1611 | } | |
1612 | if wrote != len(data) { | |
1613 | panic("internal error: bad Writer") | |
1614 | } | |
1615 | st.bodyBytes += int64(len(data)) | |
1616 | } | |
1617 | ||
1618 | // Return any padded flow control now, since we won't | |
1619 | // refund it later on body reads. | |
1620 | if pad := int32(f.Length) - int32(len(data)); pad > 0 { | |
1621 | sc.sendWindowUpdate32(nil, pad) | |
1622 | sc.sendWindowUpdate32(st, pad) | |
1623 | } | |
1624 | } | |
1625 | if f.StreamEnded() { | |
1626 | st.endStream() | |
1627 | } | |
1628 | return nil | |
1629 | } | |
1630 | ||
1631 | func (sc *serverConn) processGoAway(f *GoAwayFrame) error { | |
1632 | sc.serveG.check() | |
1633 | if f.ErrCode != ErrCodeNo { | |
1634 | sc.logf("http2: received GOAWAY %+v, starting graceful shutdown", f) | |
1635 | } else { | |
1636 | sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f) | |
1637 | } | |
1638 | sc.startGracefulShutdownInternal() | |
1639 | // http://tools.ietf.org/html/rfc7540#section-6.8 | |
1640 | // We should not create any new streams, which means we should disable push. | |
1641 | sc.pushEnabled = false | |
1642 | return nil | |
1643 | } | |
1644 | ||
1645 | // isPushed reports whether the stream is server-initiated. | |
1646 | func (st *stream) isPushed() bool { | |
1647 | return st.id%2 == 0 | |
1648 | } | |
1649 | ||
1650 | // endStream closes a Request.Body's pipe. It is called when a DATA | |
1651 | // frame says a request body is over (or after trailers). | |
1652 | func (st *stream) endStream() { | |
1653 | sc := st.sc | |
1654 | sc.serveG.check() | |
1655 | ||
1656 | if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes { | |
1657 | st.body.CloseWithError(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes", | |
1658 | st.declBodyBytes, st.bodyBytes)) | |
1659 | } else { | |
1660 | st.body.closeWithErrorAndCode(io.EOF, st.copyTrailersToHandlerRequest) | |
1661 | st.body.CloseWithError(io.EOF) | |
1662 | } | |
1663 | st.state = stateHalfClosedRemote | |
1664 | } | |
1665 | ||
1666 | // copyTrailersToHandlerRequest is run in the Handler's goroutine in | |
1667 | // its Request.Body.Read just before it gets io.EOF. | |
1668 | func (st *stream) copyTrailersToHandlerRequest() { | |
1669 | for k, vv := range st.trailer { | |
1670 | if _, ok := st.reqTrailer[k]; ok { | |
1671 | // Only copy it over it was pre-declared. | |
1672 | st.reqTrailer[k] = vv | |
1673 | } | |
1674 | } | |
1675 | } | |
1676 | ||
1677 | // onWriteTimeout is run on its own goroutine (from time.AfterFunc) | |
1678 | // when the stream's WriteTimeout has fired. | |
1679 | func (st *stream) onWriteTimeout() { | |
1680 | st.sc.writeFrameFromHandler(FrameWriteRequest{write: streamError(st.id, ErrCodeInternal)}) | |
1681 | } | |
1682 | ||
1683 | func (sc *serverConn) processHeaders(f *MetaHeadersFrame) error { | |
1684 | sc.serveG.check() | |
1685 | id := f.StreamID | |
1686 | if sc.inGoAway { | |
1687 | // Ignore. | |
1688 | return nil | |
1689 | } | |
1690 | // http://tools.ietf.org/html/rfc7540#section-5.1.1 | |
1691 | // Streams initiated by a client MUST use odd-numbered stream | |
1692 | // identifiers. [...] An endpoint that receives an unexpected | |
1693 | // stream identifier MUST respond with a connection error | |
1694 | // (Section 5.4.1) of type PROTOCOL_ERROR. | |
1695 | if id%2 != 1 { | |
1696 | return ConnectionError(ErrCodeProtocol) | |
1697 | } | |
1698 | // A HEADERS frame can be used to create a new stream or | |
1699 | // send a trailer for an open one. If we already have a stream | |
1700 | // open, let it process its own HEADERS frame (trailers at this | |
1701 | // point, if it's valid). | |
1702 | if st := sc.streams[f.StreamID]; st != nil { | |
1703 | if st.resetQueued { | |
1704 | // We're sending RST_STREAM to close the stream, so don't bother | |
1705 | // processing this frame. | |
1706 | return nil | |
1707 | } | |
1708 | return st.processTrailerHeaders(f) | |
1709 | } | |
1710 | ||
1711 | // [...] The identifier of a newly established stream MUST be | |
1712 | // numerically greater than all streams that the initiating | |
1713 | // endpoint has opened or reserved. [...] An endpoint that | |
1714 | // receives an unexpected stream identifier MUST respond with | |
1715 | // a connection error (Section 5.4.1) of type PROTOCOL_ERROR. | |
1716 | if id <= sc.maxClientStreamID { | |
1717 | return ConnectionError(ErrCodeProtocol) | |
1718 | } | |
1719 | sc.maxClientStreamID = id | |
1720 | ||
1721 | if sc.idleTimer != nil { | |
1722 | sc.idleTimer.Stop() | |
1723 | } | |
1724 | ||
1725 | // http://tools.ietf.org/html/rfc7540#section-5.1.2 | |
1726 | // [...] Endpoints MUST NOT exceed the limit set by their peer. An | |
1727 | // endpoint that receives a HEADERS frame that causes their | |
1728 | // advertised concurrent stream limit to be exceeded MUST treat | |
1729 | // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR | |
1730 | // or REFUSED_STREAM. | |
1731 | if sc.curClientStreams+1 > sc.advMaxStreams { | |
1732 | if sc.unackedSettings == 0 { | |
1733 | // They should know better. | |
1734 | return streamError(id, ErrCodeProtocol) | |
1735 | } | |
1736 | // Assume it's a network race, where they just haven't | |
1737 | // received our last SETTINGS update. But actually | |
1738 | // this can't happen yet, because we don't yet provide | |
1739 | // a way for users to adjust server parameters at | |
1740 | // runtime. | |
1741 | return streamError(id, ErrCodeRefusedStream) | |
1742 | } | |
1743 | ||
1744 | initialState := stateOpen | |
1745 | if f.StreamEnded() { | |
1746 | initialState = stateHalfClosedRemote | |
1747 | } | |
1748 | st := sc.newStream(id, 0, initialState) | |
1749 | ||
1750 | if f.HasPriority() { | |
1751 | if err := checkPriority(f.StreamID, f.Priority); err != nil { | |
1752 | return err | |
1753 | } | |
1754 | sc.writeSched.AdjustStream(st.id, f.Priority) | |
1755 | } | |
1756 | ||
1757 | rw, req, err := sc.newWriterAndRequest(st, f) | |
1758 | if err != nil { | |
1759 | return err | |
1760 | } | |
1761 | st.reqTrailer = req.Trailer | |
1762 | if st.reqTrailer != nil { | |
1763 | st.trailer = make(http.Header) | |
1764 | } | |
1765 | st.body = req.Body.(*requestBody).pipe // may be nil | |
1766 | st.declBodyBytes = req.ContentLength | |
1767 | ||
1768 | handler := sc.handler.ServeHTTP | |
1769 | if f.Truncated { | |
1770 | // Their header list was too long. Send a 431 error. | |
1771 | handler = handleHeaderListTooLong | |
1772 | } else if err := checkValidHTTP2RequestHeaders(req.Header); err != nil { | |
1773 | handler = new400Handler(err) | |
1774 | } | |
1775 | ||
1776 | // The net/http package sets the read deadline from the | |
1777 | // http.Server.ReadTimeout during the TLS handshake, but then | |
1778 | // passes the connection off to us with the deadline already | |
1779 | // set. Disarm it here after the request headers are read, | |
1780 | // similar to how the http1 server works. Here it's | |
1781 | // technically more like the http1 Server's ReadHeaderTimeout | |
1782 | // (in Go 1.8), though. That's a more sane option anyway. | |
1783 | if sc.hs.ReadTimeout != 0 { | |
1784 | sc.conn.SetReadDeadline(time.Time{}) | |
1785 | } | |
1786 | ||
1787 | go sc.runHandler(rw, req, handler) | |
1788 | return nil | |
1789 | } | |
1790 | ||
1791 | func (st *stream) processTrailerHeaders(f *MetaHeadersFrame) error { | |
1792 | sc := st.sc | |
1793 | sc.serveG.check() | |
1794 | if st.gotTrailerHeader { | |
1795 | return ConnectionError(ErrCodeProtocol) | |
1796 | } | |
1797 | st.gotTrailerHeader = true | |
1798 | if !f.StreamEnded() { | |
1799 | return streamError(st.id, ErrCodeProtocol) | |
1800 | } | |
1801 | ||
1802 | if len(f.PseudoFields()) > 0 { | |
1803 | return streamError(st.id, ErrCodeProtocol) | |
1804 | } | |
1805 | if st.trailer != nil { | |
1806 | for _, hf := range f.RegularFields() { | |
1807 | key := sc.canonicalHeader(hf.Name) | |
1808 | if !ValidTrailerHeader(key) { | |
1809 | // TODO: send more details to the peer somehow. But http2 has | |
1810 | // no way to send debug data at a stream level. Discuss with | |
1811 | // HTTP folk. | |
1812 | return streamError(st.id, ErrCodeProtocol) | |
1813 | } | |
1814 | st.trailer[key] = append(st.trailer[key], hf.Value) | |
1815 | } | |
1816 | } | |
1817 | st.endStream() | |
1818 | return nil | |
1819 | } | |
1820 | ||
1821 | func checkPriority(streamID uint32, p PriorityParam) error { | |
1822 | if streamID == p.StreamDep { | |
1823 | // Section 5.3.1: "A stream cannot depend on itself. An endpoint MUST treat | |
1824 | // this as a stream error (Section 5.4.2) of type PROTOCOL_ERROR." | |
1825 | // Section 5.3.3 says that a stream can depend on one of its dependencies, | |
1826 | // so it's only self-dependencies that are forbidden. | |
1827 | return streamError(streamID, ErrCodeProtocol) | |
1828 | } | |
1829 | return nil | |
1830 | } | |
1831 | ||
1832 | func (sc *serverConn) processPriority(f *PriorityFrame) error { | |
1833 | if sc.inGoAway { | |
1834 | return nil | |
1835 | } | |
1836 | if err := checkPriority(f.StreamID, f.PriorityParam); err != nil { | |
1837 | return err | |
1838 | } | |
1839 | sc.writeSched.AdjustStream(f.StreamID, f.PriorityParam) | |
1840 | return nil | |
1841 | } | |
1842 | ||
1843 | func (sc *serverConn) newStream(id, pusherID uint32, state streamState) *stream { | |
1844 | sc.serveG.check() | |
1845 | if id == 0 { | |
1846 | panic("internal error: cannot create stream with id 0") | |
1847 | } | |
1848 | ||
1849 | ctx, cancelCtx := contextWithCancel(sc.baseCtx) | |
1850 | st := &stream{ | |
1851 | sc: sc, | |
1852 | id: id, | |
1853 | state: state, | |
1854 | ctx: ctx, | |
1855 | cancelCtx: cancelCtx, | |
1856 | } | |
1857 | st.cw.Init() | |
1858 | st.flow.conn = &sc.flow // link to conn-level counter | |
1859 | st.flow.add(sc.initialStreamSendWindowSize) | |
1860 | st.inflow.conn = &sc.inflow // link to conn-level counter | |
1861 | st.inflow.add(sc.srv.initialStreamRecvWindowSize()) | |
1862 | if sc.hs.WriteTimeout != 0 { | |
1863 | st.writeDeadline = time.AfterFunc(sc.hs.WriteTimeout, st.onWriteTimeout) | |
1864 | } | |
1865 | ||
1866 | sc.streams[id] = st | |
1867 | sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID}) | |
1868 | if st.isPushed() { | |
1869 | sc.curPushedStreams++ | |
1870 | } else { | |
1871 | sc.curClientStreams++ | |
1872 | } | |
1873 | if sc.curOpenStreams() == 1 { | |
1874 | sc.setConnState(http.StateActive) | |
1875 | } | |
1876 | ||
1877 | return st | |
1878 | } | |
1879 | ||
1880 | func (sc *serverConn) newWriterAndRequest(st *stream, f *MetaHeadersFrame) (*responseWriter, *http.Request, error) { | |
1881 | sc.serveG.check() | |
1882 | ||
1883 | rp := requestParam{ | |
1884 | method: f.PseudoValue("method"), | |
1885 | scheme: f.PseudoValue("scheme"), | |
1886 | authority: f.PseudoValue("authority"), | |
1887 | path: f.PseudoValue("path"), | |
1888 | } | |
1889 | ||
1890 | isConnect := rp.method == "CONNECT" | |
1891 | if isConnect { | |
1892 | if rp.path != "" || rp.scheme != "" || rp.authority == "" { | |
1893 | return nil, nil, streamError(f.StreamID, ErrCodeProtocol) | |
1894 | } | |
1895 | } else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") { | |
1896 | // See 8.1.2.6 Malformed Requests and Responses: | |
1897 | // | |
1898 | // Malformed requests or responses that are detected | |
1899 | // MUST be treated as a stream error (Section 5.4.2) | |
1900 | // of type PROTOCOL_ERROR." | |
1901 | // | |
1902 | // 8.1.2.3 Request Pseudo-Header Fields | |
1903 | // "All HTTP/2 requests MUST include exactly one valid | |
1904 | // value for the :method, :scheme, and :path | |
1905 | // pseudo-header fields" | |
1906 | return nil, nil, streamError(f.StreamID, ErrCodeProtocol) | |
1907 | } | |
1908 | ||
1909 | bodyOpen := !f.StreamEnded() | |
1910 | if rp.method == "HEAD" && bodyOpen { | |
1911 | // HEAD requests can't have bodies | |
1912 | return nil, nil, streamError(f.StreamID, ErrCodeProtocol) | |
1913 | } | |
1914 | ||
1915 | rp.header = make(http.Header) | |
1916 | for _, hf := range f.RegularFields() { | |
1917 | rp.header.Add(sc.canonicalHeader(hf.Name), hf.Value) | |
1918 | } | |
1919 | if rp.authority == "" { | |
1920 | rp.authority = rp.header.Get("Host") | |
1921 | } | |
1922 | ||
1923 | rw, req, err := sc.newWriterAndRequestNoBody(st, rp) | |
1924 | if err != nil { | |
1925 | return nil, nil, err | |
1926 | } | |
1927 | if bodyOpen { | |
1928 | if vv, ok := rp.header["Content-Length"]; ok { | |
1929 | req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64) | |
1930 | } else { | |
1931 | req.ContentLength = -1 | |
1932 | } | |
1933 | req.Body.(*requestBody).pipe = &pipe{ | |
1934 | b: &dataBuffer{expected: req.ContentLength}, | |
1935 | } | |
1936 | } | |
1937 | return rw, req, nil | |
1938 | } | |
1939 | ||
1940 | type requestParam struct { | |
1941 | method string | |
1942 | scheme, authority, path string | |
1943 | header http.Header | |
1944 | } | |
1945 | ||
1946 | func (sc *serverConn) newWriterAndRequestNoBody(st *stream, rp requestParam) (*responseWriter, *http.Request, error) { | |
1947 | sc.serveG.check() | |
1948 | ||
1949 | var tlsState *tls.ConnectionState // nil if not scheme https | |
1950 | if rp.scheme == "https" { | |
1951 | tlsState = sc.tlsState | |
1952 | } | |
1953 | ||
1954 | needsContinue := rp.header.Get("Expect") == "100-continue" | |
1955 | if needsContinue { | |
1956 | rp.header.Del("Expect") | |
1957 | } | |
1958 | // Merge Cookie headers into one "; "-delimited value. | |
1959 | if cookies := rp.header["Cookie"]; len(cookies) > 1 { | |
1960 | rp.header.Set("Cookie", strings.Join(cookies, "; ")) | |
1961 | } | |
1962 | ||
1963 | // Setup Trailers | |
1964 | var trailer http.Header | |
1965 | for _, v := range rp.header["Trailer"] { | |
1966 | for _, key := range strings.Split(v, ",") { | |
1967 | key = http.CanonicalHeaderKey(strings.TrimSpace(key)) | |
1968 | switch key { | |
1969 | case "Transfer-Encoding", "Trailer", "Content-Length": | |
1970 | // Bogus. (copy of http1 rules) | |
1971 | // Ignore. | |
1972 | default: | |
1973 | if trailer == nil { | |
1974 | trailer = make(http.Header) | |
1975 | } | |
1976 | trailer[key] = nil | |
1977 | } | |
1978 | } | |
1979 | } | |
1980 | delete(rp.header, "Trailer") | |
1981 | ||
1982 | var url_ *url.URL | |
1983 | var requestURI string | |
1984 | if rp.method == "CONNECT" { | |
1985 | url_ = &url.URL{Host: rp.authority} | |
1986 | requestURI = rp.authority // mimic HTTP/1 server behavior | |
1987 | } else { | |
1988 | var err error | |
1989 | url_, err = url.ParseRequestURI(rp.path) | |
1990 | if err != nil { | |
1991 | return nil, nil, streamError(st.id, ErrCodeProtocol) | |
1992 | } | |
1993 | requestURI = rp.path | |
1994 | } | |
1995 | ||
1996 | body := &requestBody{ | |
1997 | conn: sc, | |
1998 | stream: st, | |
1999 | needsContinue: needsContinue, | |
2000 | } | |
2001 | req := &http.Request{ | |
2002 | Method: rp.method, | |
2003 | URL: url_, | |
2004 | RemoteAddr: sc.remoteAddrStr, | |
2005 | Header: rp.header, | |
2006 | RequestURI: requestURI, | |
2007 | Proto: "HTTP/2.0", | |
2008 | ProtoMajor: 2, | |
2009 | ProtoMinor: 0, | |
2010 | TLS: tlsState, | |
2011 | Host: rp.authority, | |
2012 | Body: body, | |
2013 | Trailer: trailer, | |
2014 | } | |
2015 | req = requestWithContext(req, st.ctx) | |
2016 | ||
2017 | rws := responseWriterStatePool.Get().(*responseWriterState) | |
2018 | bwSave := rws.bw | |
2019 | *rws = responseWriterState{} // zero all the fields | |
2020 | rws.conn = sc | |
2021 | rws.bw = bwSave | |
2022 | rws.bw.Reset(chunkWriter{rws}) | |
2023 | rws.stream = st | |
2024 | rws.req = req | |
2025 | rws.body = body | |
2026 | ||
2027 | rw := &responseWriter{rws: rws} | |
2028 | return rw, req, nil | |
2029 | } | |
2030 | ||
2031 | // Run on its own goroutine. | |
2032 | func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request, handler func(http.ResponseWriter, *http.Request)) { | |
2033 | didPanic := true | |
2034 | defer func() { | |
2035 | rw.rws.stream.cancelCtx() | |
2036 | if didPanic { | |
2037 | e := recover() | |
2038 | sc.writeFrameFromHandler(FrameWriteRequest{ | |
2039 | write: handlerPanicRST{rw.rws.stream.id}, | |
2040 | stream: rw.rws.stream, | |
2041 | }) | |
2042 | // Same as net/http: | |
2043 | if shouldLogPanic(e) { | |
2044 | const size = 64 << 10 | |
2045 | buf := make([]byte, size) | |
2046 | buf = buf[:runtime.Stack(buf, false)] | |
2047 | sc.logf("http2: panic serving %v: %v\n%s", sc.conn.RemoteAddr(), e, buf) | |
2048 | } | |
2049 | return | |
2050 | } | |
2051 | rw.handlerDone() | |
2052 | }() | |
2053 | handler(rw, req) | |
2054 | didPanic = false | |
2055 | } | |
2056 | ||
2057 | func handleHeaderListTooLong(w http.ResponseWriter, r *http.Request) { | |
2058 | // 10.5.1 Limits on Header Block Size: | |
2059 | // .. "A server that receives a larger header block than it is | |
2060 | // willing to handle can send an HTTP 431 (Request Header Fields Too | |
2061 | // Large) status code" | |
2062 | const statusRequestHeaderFieldsTooLarge = 431 // only in Go 1.6+ | |
2063 | w.WriteHeader(statusRequestHeaderFieldsTooLarge) | |
2064 | io.WriteString(w, "<h1>HTTP Error 431</h1><p>Request Header Field(s) Too Large</p>") | |
2065 | } | |
2066 | ||
2067 | // called from handler goroutines. | |
2068 | // h may be nil. | |
2069 | func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) error { | |
2070 | sc.serveG.checkNotOn() // NOT on | |
2071 | var errc chan error | |
2072 | if headerData.h != nil { | |
2073 | // If there's a header map (which we don't own), so we have to block on | |
2074 | // waiting for this frame to be written, so an http.Flush mid-handler | |
2075 | // writes out the correct value of keys, before a handler later potentially | |
2076 | // mutates it. | |
2077 | errc = errChanPool.Get().(chan error) | |
2078 | } | |
2079 | if err := sc.writeFrameFromHandler(FrameWriteRequest{ | |
2080 | write: headerData, | |
2081 | stream: st, | |
2082 | done: errc, | |
2083 | }); err != nil { | |
2084 | return err | |
2085 | } | |
2086 | if errc != nil { | |
2087 | select { | |
2088 | case err := <-errc: | |
2089 | errChanPool.Put(errc) | |
2090 | return err | |
2091 | case <-sc.doneServing: | |
2092 | return errClientDisconnected | |
2093 | case <-st.cw: | |
2094 | return errStreamClosed | |
2095 | } | |
2096 | } | |
2097 | return nil | |
2098 | } | |
2099 | ||
2100 | // called from handler goroutines. | |
2101 | func (sc *serverConn) write100ContinueHeaders(st *stream) { | |
2102 | sc.writeFrameFromHandler(FrameWriteRequest{ | |
2103 | write: write100ContinueHeadersFrame{st.id}, | |
2104 | stream: st, | |
2105 | }) | |
2106 | } | |
2107 | ||
2108 | // A bodyReadMsg tells the server loop that the http.Handler read n | |
2109 | // bytes of the DATA from the client on the given stream. | |
2110 | type bodyReadMsg struct { | |
2111 | st *stream | |
2112 | n int | |
2113 | } | |
2114 | ||
2115 | // called from handler goroutines. | |
2116 | // Notes that the handler for the given stream ID read n bytes of its body | |
2117 | // and schedules flow control tokens to be sent. | |
2118 | func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) { | |
2119 | sc.serveG.checkNotOn() // NOT on | |
2120 | if n > 0 { | |
2121 | select { | |
2122 | case sc.bodyReadCh <- bodyReadMsg{st, n}: | |
2123 | case <-sc.doneServing: | |
2124 | } | |
2125 | } | |
2126 | } | |
2127 | ||
2128 | func (sc *serverConn) noteBodyRead(st *stream, n int) { | |
2129 | sc.serveG.check() | |
2130 | sc.sendWindowUpdate(nil, n) // conn-level | |
2131 | if st.state != stateHalfClosedRemote && st.state != stateClosed { | |
2132 | // Don't send this WINDOW_UPDATE if the stream is closed | |
2133 | // remotely. | |
2134 | sc.sendWindowUpdate(st, n) | |
2135 | } | |
2136 | } | |
2137 | ||
2138 | // st may be nil for conn-level | |
2139 | func (sc *serverConn) sendWindowUpdate(st *stream, n int) { | |
2140 | sc.serveG.check() | |
2141 | // "The legal range for the increment to the flow control | |
2142 | // window is 1 to 2^31-1 (2,147,483,647) octets." | |
2143 | // A Go Read call on 64-bit machines could in theory read | |
2144 | // a larger Read than this. Very unlikely, but we handle it here | |
2145 | // rather than elsewhere for now. | |
2146 | const maxUint31 = 1<<31 - 1 | |
2147 | for n >= maxUint31 { | |
2148 | sc.sendWindowUpdate32(st, maxUint31) | |
2149 | n -= maxUint31 | |
2150 | } | |
2151 | sc.sendWindowUpdate32(st, int32(n)) | |
2152 | } | |
2153 | ||
2154 | // st may be nil for conn-level | |
2155 | func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) { | |
2156 | sc.serveG.check() | |
2157 | if n == 0 { | |
2158 | return | |
2159 | } | |
2160 | if n < 0 { | |
2161 | panic("negative update") | |
2162 | } | |
2163 | var streamID uint32 | |
2164 | if st != nil { | |
2165 | streamID = st.id | |
2166 | } | |
2167 | sc.writeFrame(FrameWriteRequest{ | |
2168 | write: writeWindowUpdate{streamID: streamID, n: uint32(n)}, | |
2169 | stream: st, | |
2170 | }) | |
2171 | var ok bool | |
2172 | if st == nil { | |
2173 | ok = sc.inflow.add(n) | |
2174 | } else { | |
2175 | ok = st.inflow.add(n) | |
2176 | } | |
2177 | if !ok { | |
2178 | panic("internal error; sent too many window updates without decrements?") | |
2179 | } | |
2180 | } | |
2181 | ||
2182 | // requestBody is the Handler's Request.Body type. | |
2183 | // Read and Close may be called concurrently. | |
2184 | type requestBody struct { | |
2185 | stream *stream | |
2186 | conn *serverConn | |
2187 | closed bool // for use by Close only | |
2188 | sawEOF bool // for use by Read only | |
2189 | pipe *pipe // non-nil if we have a HTTP entity message body | |
2190 | needsContinue bool // need to send a 100-continue | |
2191 | } | |
2192 | ||
2193 | func (b *requestBody) Close() error { | |
2194 | if b.pipe != nil && !b.closed { | |
2195 | b.pipe.BreakWithError(errClosedBody) | |
2196 | } | |
2197 | b.closed = true | |
2198 | return nil | |
2199 | } | |
2200 | ||
2201 | func (b *requestBody) Read(p []byte) (n int, err error) { | |
2202 | if b.needsContinue { | |
2203 | b.needsContinue = false | |
2204 | b.conn.write100ContinueHeaders(b.stream) | |
2205 | } | |
2206 | if b.pipe == nil || b.sawEOF { | |
2207 | return 0, io.EOF | |
2208 | } | |
2209 | n, err = b.pipe.Read(p) | |
2210 | if err == io.EOF { | |
2211 | b.sawEOF = true | |
2212 | } | |
2213 | if b.conn == nil && inTests { | |
2214 | return | |
2215 | } | |
2216 | b.conn.noteBodyReadFromHandler(b.stream, n, err) | |
2217 | return | |
2218 | } | |
2219 | ||
2220 | // responseWriter is the http.ResponseWriter implementation. It's | |
2221 | // intentionally small (1 pointer wide) to minimize garbage. The | |
2222 | // responseWriterState pointer inside is zeroed at the end of a | |
2223 | // request (in handlerDone) and calls on the responseWriter thereafter | |
2224 | // simply crash (caller's mistake), but the much larger responseWriterState | |
2225 | // and buffers are reused between multiple requests. | |
2226 | type responseWriter struct { | |
2227 | rws *responseWriterState | |
2228 | } | |
2229 | ||
2230 | // Optional http.ResponseWriter interfaces implemented. | |
2231 | var ( | |
2232 | _ http.CloseNotifier = (*responseWriter)(nil) | |
2233 | _ http.Flusher = (*responseWriter)(nil) | |
2234 | _ stringWriter = (*responseWriter)(nil) | |
2235 | ) | |
2236 | ||
2237 | type responseWriterState struct { | |
2238 | // immutable within a request: | |
2239 | stream *stream | |
2240 | req *http.Request | |
2241 | body *requestBody // to close at end of request, if DATA frames didn't | |
2242 | conn *serverConn | |
2243 | ||
2244 | // TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc | |
2245 | bw *bufio.Writer // writing to a chunkWriter{this *responseWriterState} | |
2246 | ||
2247 | // mutated by http.Handler goroutine: | |
2248 | handlerHeader http.Header // nil until called | |
2249 | snapHeader http.Header // snapshot of handlerHeader at WriteHeader time | |
2250 | trailers []string // set in writeChunk | |
2251 | status int // status code passed to WriteHeader | |
2252 | wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet. | |
2253 | sentHeader bool // have we sent the header frame? | |
2254 | handlerDone bool // handler has finished | |
2255 | dirty bool // a Write failed; don't reuse this responseWriterState | |
2256 | ||
2257 | sentContentLen int64 // non-zero if handler set a Content-Length header | |
2258 | wroteBytes int64 | |
2259 | ||
2260 | closeNotifierMu sync.Mutex // guards closeNotifierCh | |
2261 | closeNotifierCh chan bool // nil until first used | |
2262 | } | |
2263 | ||
2264 | type chunkWriter struct{ rws *responseWriterState } | |
2265 | ||
2266 | func (cw chunkWriter) Write(p []byte) (n int, err error) { return cw.rws.writeChunk(p) } | |
2267 | ||
2268 | func (rws *responseWriterState) hasTrailers() bool { return len(rws.trailers) != 0 } | |
2269 | ||
2270 | // declareTrailer is called for each Trailer header when the | |
2271 | // response header is written. It notes that a header will need to be | |
2272 | // written in the trailers at the end of the response. | |
2273 | func (rws *responseWriterState) declareTrailer(k string) { | |
2274 | k = http.CanonicalHeaderKey(k) | |
2275 | if !ValidTrailerHeader(k) { | |
2276 | // Forbidden by RFC 2616 14.40. | |
2277 | rws.conn.logf("ignoring invalid trailer %q", k) | |
2278 | return | |
2279 | } | |
2280 | if !strSliceContains(rws.trailers, k) { | |
2281 | rws.trailers = append(rws.trailers, k) | |
2282 | } | |
2283 | } | |
2284 | ||
2285 | // writeChunk writes chunks from the bufio.Writer. But because | |
2286 | // bufio.Writer may bypass its chunking, sometimes p may be | |
2287 | // arbitrarily large. | |
2288 | // | |
2289 | // writeChunk is also responsible (on the first chunk) for sending the | |
2290 | // HEADER response. | |
2291 | func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) { | |
2292 | if !rws.wroteHeader { | |
2293 | rws.writeHeader(200) | |
2294 | } | |
2295 | ||
2296 | isHeadResp := rws.req.Method == "HEAD" | |
2297 | if !rws.sentHeader { | |
2298 | rws.sentHeader = true | |
2299 | var ctype, clen string | |
2300 | if clen = rws.snapHeader.Get("Content-Length"); clen != "" { | |
2301 | rws.snapHeader.Del("Content-Length") | |
2302 | clen64, err := strconv.ParseInt(clen, 10, 64) | |
2303 | if err == nil && clen64 >= 0 { | |
2304 | rws.sentContentLen = clen64 | |
2305 | } else { | |
2306 | clen = "" | |
2307 | } | |
2308 | } | |
2309 | if clen == "" && rws.handlerDone && bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) { | |
2310 | clen = strconv.Itoa(len(p)) | |
2311 | } | |
2312 | _, hasContentType := rws.snapHeader["Content-Type"] | |
2313 | if !hasContentType && bodyAllowedForStatus(rws.status) { | |
2314 | ctype = http.DetectContentType(p) | |
2315 | } | |
2316 | var date string | |
2317 | if _, ok := rws.snapHeader["Date"]; !ok { | |
2318 | // TODO(bradfitz): be faster here, like net/http? measure. | |
2319 | date = time.Now().UTC().Format(http.TimeFormat) | |
2320 | } | |
2321 | ||
2322 | for _, v := range rws.snapHeader["Trailer"] { | |
2323 | foreachHeaderElement(v, rws.declareTrailer) | |
2324 | } | |
2325 | ||
2326 | endStream := (rws.handlerDone && !rws.hasTrailers() && len(p) == 0) || isHeadResp | |
2327 | err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{ | |
2328 | streamID: rws.stream.id, | |
2329 | httpResCode: rws.status, | |
2330 | h: rws.snapHeader, | |
2331 | endStream: endStream, | |
2332 | contentType: ctype, | |
2333 | contentLength: clen, | |
2334 | date: date, | |
2335 | }) | |
2336 | if err != nil { | |
2337 | rws.dirty = true | |
2338 | return 0, err | |
2339 | } | |
2340 | if endStream { | |
2341 | return 0, nil | |
2342 | } | |
2343 | } | |
2344 | if isHeadResp { | |
2345 | return len(p), nil | |
2346 | } | |
2347 | if len(p) == 0 && !rws.handlerDone { | |
2348 | return 0, nil | |
2349 | } | |
2350 | ||
2351 | if rws.handlerDone { | |
2352 | rws.promoteUndeclaredTrailers() | |
2353 | } | |
2354 | ||
2355 | endStream := rws.handlerDone && !rws.hasTrailers() | |
2356 | if len(p) > 0 || endStream { | |
2357 | // only send a 0 byte DATA frame if we're ending the stream. | |
2358 | if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil { | |
2359 | rws.dirty = true | |
2360 | return 0, err | |
2361 | } | |
2362 | } | |
2363 | ||
2364 | if rws.handlerDone && rws.hasTrailers() { | |
2365 | err = rws.conn.writeHeaders(rws.stream, &writeResHeaders{ | |
2366 | streamID: rws.stream.id, | |
2367 | h: rws.handlerHeader, | |
2368 | trailers: rws.trailers, | |
2369 | endStream: true, | |
2370 | }) | |
2371 | if err != nil { | |
2372 | rws.dirty = true | |
2373 | } | |
2374 | return len(p), err | |
2375 | } | |
2376 | return len(p), nil | |
2377 | } | |
2378 | ||
2379 | // TrailerPrefix is a magic prefix for ResponseWriter.Header map keys | |
2380 | // that, if present, signals that the map entry is actually for | |
2381 | // the response trailers, and not the response headers. The prefix | |
2382 | // is stripped after the ServeHTTP call finishes and the values are | |
2383 | // sent in the trailers. | |
2384 | // | |
2385 | // This mechanism is intended only for trailers that are not known | |
2386 | // prior to the headers being written. If the set of trailers is fixed | |
2387 | // or known before the header is written, the normal Go trailers mechanism | |
2388 | // is preferred: | |
2389 | // https://golang.org/pkg/net/http/#ResponseWriter | |
2390 | // https://golang.org/pkg/net/http/#example_ResponseWriter_trailers | |
2391 | const TrailerPrefix = "Trailer:" | |
2392 | ||
2393 | // promoteUndeclaredTrailers permits http.Handlers to set trailers | |
2394 | // after the header has already been flushed. Because the Go | |
2395 | // ResponseWriter interface has no way to set Trailers (only the | |
2396 | // Header), and because we didn't want to expand the ResponseWriter | |
2397 | // interface, and because nobody used trailers, and because RFC 2616 | |
2398 | // says you SHOULD (but not must) predeclare any trailers in the | |
2399 | // header, the official ResponseWriter rules said trailers in Go must | |
2400 | // be predeclared, and then we reuse the same ResponseWriter.Header() | |
2401 | // map to mean both Headers and Trailers. When it's time to write the | |
2402 | // Trailers, we pick out the fields of Headers that were declared as | |
2403 | // trailers. That worked for a while, until we found the first major | |
2404 | // user of Trailers in the wild: gRPC (using them only over http2), | |
2405 | // and gRPC libraries permit setting trailers mid-stream without | |
2406 | // predeclarnig them. So: change of plans. We still permit the old | |
2407 | // way, but we also permit this hack: if a Header() key begins with | |
2408 | // "Trailer:", the suffix of that key is a Trailer. Because ':' is an | |
2409 | // invalid token byte anyway, there is no ambiguity. (And it's already | |
2410 | // filtered out) It's mildly hacky, but not terrible. | |
2411 | // | |
2412 | // This method runs after the Handler is done and promotes any Header | |
2413 | // fields to be trailers. | |
2414 | func (rws *responseWriterState) promoteUndeclaredTrailers() { | |
2415 | for k, vv := range rws.handlerHeader { | |
2416 | if !strings.HasPrefix(k, TrailerPrefix) { | |
2417 | continue | |
2418 | } | |
2419 | trailerKey := strings.TrimPrefix(k, TrailerPrefix) | |
2420 | rws.declareTrailer(trailerKey) | |
2421 | rws.handlerHeader[http.CanonicalHeaderKey(trailerKey)] = vv | |
2422 | } | |
2423 | ||
2424 | if len(rws.trailers) > 1 { | |
2425 | sorter := sorterPool.Get().(*sorter) | |
2426 | sorter.SortStrings(rws.trailers) | |
2427 | sorterPool.Put(sorter) | |
2428 | } | |
2429 | } | |
2430 | ||
2431 | func (w *responseWriter) Flush() { | |
2432 | rws := w.rws | |
2433 | if rws == nil { | |
2434 | panic("Header called after Handler finished") | |
2435 | } | |
2436 | if rws.bw.Buffered() > 0 { | |
2437 | if err := rws.bw.Flush(); err != nil { | |
2438 | // Ignore the error. The frame writer already knows. | |
2439 | return | |
2440 | } | |
2441 | } else { | |
2442 | // The bufio.Writer won't call chunkWriter.Write | |
2443 | // (writeChunk with zero bytes, so we have to do it | |
2444 | // ourselves to force the HTTP response header and/or | |
2445 | // final DATA frame (with END_STREAM) to be sent. | |
2446 | rws.writeChunk(nil) | |
2447 | } | |
2448 | } | |
2449 | ||
2450 | func (w *responseWriter) CloseNotify() <-chan bool { | |
2451 | rws := w.rws | |
2452 | if rws == nil { | |
2453 | panic("CloseNotify called after Handler finished") | |
2454 | } | |
2455 | rws.closeNotifierMu.Lock() | |
2456 | ch := rws.closeNotifierCh | |
2457 | if ch == nil { | |
2458 | ch = make(chan bool, 1) | |
2459 | rws.closeNotifierCh = ch | |
2460 | cw := rws.stream.cw | |
2461 | go func() { | |
2462 | cw.Wait() // wait for close | |
2463 | ch <- true | |
2464 | }() | |
2465 | } | |
2466 | rws.closeNotifierMu.Unlock() | |
2467 | return ch | |
2468 | } | |
2469 | ||
2470 | func (w *responseWriter) Header() http.Header { | |
2471 | rws := w.rws | |
2472 | if rws == nil { | |
2473 | panic("Header called after Handler finished") | |
2474 | } | |
2475 | if rws.handlerHeader == nil { | |
2476 | rws.handlerHeader = make(http.Header) | |
2477 | } | |
2478 | return rws.handlerHeader | |
2479 | } | |
2480 | ||
2481 | func (w *responseWriter) WriteHeader(code int) { | |
2482 | rws := w.rws | |
2483 | if rws == nil { | |
2484 | panic("WriteHeader called after Handler finished") | |
2485 | } | |
2486 | rws.writeHeader(code) | |
2487 | } | |
2488 | ||
2489 | func (rws *responseWriterState) writeHeader(code int) { | |
2490 | if !rws.wroteHeader { | |
2491 | rws.wroteHeader = true | |
2492 | rws.status = code | |
2493 | if len(rws.handlerHeader) > 0 { | |
2494 | rws.snapHeader = cloneHeader(rws.handlerHeader) | |
2495 | } | |
2496 | } | |
2497 | } | |
2498 | ||
2499 | func cloneHeader(h http.Header) http.Header { | |
2500 | h2 := make(http.Header, len(h)) | |
2501 | for k, vv := range h { | |
2502 | vv2 := make([]string, len(vv)) | |
2503 | copy(vv2, vv) | |
2504 | h2[k] = vv2 | |
2505 | } | |
2506 | return h2 | |
2507 | } | |
2508 | ||
2509 | // The Life Of A Write is like this: | |
2510 | // | |
2511 | // * Handler calls w.Write or w.WriteString -> | |
2512 | // * -> rws.bw (*bufio.Writer) -> | |
2513 | // * (Handler might call Flush) | |
2514 | // * -> chunkWriter{rws} | |
2515 | // * -> responseWriterState.writeChunk(p []byte) | |
2516 | // * -> responseWriterState.writeChunk (most of the magic; see comment there) | |
2517 | func (w *responseWriter) Write(p []byte) (n int, err error) { | |
2518 | return w.write(len(p), p, "") | |
2519 | } | |
2520 | ||
2521 | func (w *responseWriter) WriteString(s string) (n int, err error) { | |
2522 | return w.write(len(s), nil, s) | |
2523 | } | |
2524 | ||
2525 | // either dataB or dataS is non-zero. | |
2526 | func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int, err error) { | |
2527 | rws := w.rws | |
2528 | if rws == nil { | |
2529 | panic("Write called after Handler finished") | |
2530 | } | |
2531 | if !rws.wroteHeader { | |
2532 | w.WriteHeader(200) | |
2533 | } | |
2534 | if !bodyAllowedForStatus(rws.status) { | |
2535 | return 0, http.ErrBodyNotAllowed | |
2536 | } | |
2537 | rws.wroteBytes += int64(len(dataB)) + int64(len(dataS)) // only one can be set | |
2538 | if rws.sentContentLen != 0 && rws.wroteBytes > rws.sentContentLen { | |
2539 | // TODO: send a RST_STREAM | |
2540 | return 0, errors.New("http2: handler wrote more than declared Content-Length") | |
2541 | } | |
2542 | ||
2543 | if dataB != nil { | |
2544 | return rws.bw.Write(dataB) | |
2545 | } else { | |
2546 | return rws.bw.WriteString(dataS) | |
2547 | } | |
2548 | } | |
2549 | ||
2550 | func (w *responseWriter) handlerDone() { | |
2551 | rws := w.rws | |
2552 | dirty := rws.dirty | |
2553 | rws.handlerDone = true | |
2554 | w.Flush() | |
2555 | w.rws = nil | |
2556 | if !dirty { | |
2557 | // Only recycle the pool if all prior Write calls to | |
2558 | // the serverConn goroutine completed successfully. If | |
2559 | // they returned earlier due to resets from the peer | |
2560 | // there might still be write goroutines outstanding | |
2561 | // from the serverConn referencing the rws memory. See | |
2562 | // issue 20704. | |
2563 | responseWriterStatePool.Put(rws) | |
2564 | } | |
2565 | } | |
2566 | ||
2567 | // Push errors. | |
2568 | var ( | |
2569 | ErrRecursivePush = errors.New("http2: recursive push not allowed") | |
2570 | ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS") | |
2571 | ) | |
2572 | ||
2573 | // pushOptions is the internal version of http.PushOptions, which we | |
2574 | // cannot include here because it's only defined in Go 1.8 and later. | |
2575 | type pushOptions struct { | |
2576 | Method string | |
2577 | Header http.Header | |
2578 | } | |
2579 | ||
2580 | func (w *responseWriter) push(target string, opts pushOptions) error { | |
2581 | st := w.rws.stream | |
2582 | sc := st.sc | |
2583 | sc.serveG.checkNotOn() | |
2584 | ||
2585 | // No recursive pushes: "PUSH_PROMISE frames MUST only be sent on a peer-initiated stream." | |
2586 | // http://tools.ietf.org/html/rfc7540#section-6.6 | |
2587 | if st.isPushed() { | |
2588 | return ErrRecursivePush | |
2589 | } | |
2590 | ||
2591 | // Default options. | |
2592 | if opts.Method == "" { | |
2593 | opts.Method = "GET" | |
2594 | } | |
2595 | if opts.Header == nil { | |
2596 | opts.Header = http.Header{} | |
2597 | } | |
2598 | wantScheme := "http" | |
2599 | if w.rws.req.TLS != nil { | |
2600 | wantScheme = "https" | |
2601 | } | |
2602 | ||
2603 | // Validate the request. | |
2604 | u, err := url.Parse(target) | |
2605 | if err != nil { | |
2606 | return err | |
2607 | } | |
2608 | if u.Scheme == "" { | |
2609 | if !strings.HasPrefix(target, "/") { | |
2610 | return fmt.Errorf("target must be an absolute URL or an absolute path: %q", target) | |
2611 | } | |
2612 | u.Scheme = wantScheme | |
2613 | u.Host = w.rws.req.Host | |
2614 | } else { | |
2615 | if u.Scheme != wantScheme { | |
2616 | return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", u.Scheme, wantScheme) | |
2617 | } | |
2618 | if u.Host == "" { | |
2619 | return errors.New("URL must have a host") | |
2620 | } | |
2621 | } | |
2622 | for k := range opts.Header { | |
2623 | if strings.HasPrefix(k, ":") { | |
2624 | return fmt.Errorf("promised request headers cannot include pseudo header %q", k) | |
2625 | } | |
2626 | // These headers are meaningful only if the request has a body, | |
2627 | // but PUSH_PROMISE requests cannot have a body. | |
2628 | // http://tools.ietf.org/html/rfc7540#section-8.2 | |
2629 | // Also disallow Host, since the promised URL must be absolute. | |
2630 | switch strings.ToLower(k) { | |
2631 | case "content-length", "content-encoding", "trailer", "te", "expect", "host": | |
2632 | return fmt.Errorf("promised request headers cannot include %q", k) | |
2633 | } | |
2634 | } | |
2635 | if err := checkValidHTTP2RequestHeaders(opts.Header); err != nil { | |
2636 | return err | |
2637 | } | |
2638 | ||
2639 | // The RFC effectively limits promised requests to GET and HEAD: | |
2640 | // "Promised requests MUST be cacheable [GET, HEAD, or POST], and MUST be safe [GET or HEAD]" | |
2641 | // http://tools.ietf.org/html/rfc7540#section-8.2 | |
2642 | if opts.Method != "GET" && opts.Method != "HEAD" { | |
2643 | return fmt.Errorf("method %q must be GET or HEAD", opts.Method) | |
2644 | } | |
2645 | ||
2646 | msg := &startPushRequest{ | |
2647 | parent: st, | |
2648 | method: opts.Method, | |
2649 | url: u, | |
2650 | header: cloneHeader(opts.Header), | |
2651 | done: errChanPool.Get().(chan error), | |
2652 | } | |
2653 | ||
2654 | select { | |
2655 | case <-sc.doneServing: | |
2656 | return errClientDisconnected | |
2657 | case <-st.cw: | |
2658 | return errStreamClosed | |
2659 | case sc.serveMsgCh <- msg: | |
2660 | } | |
2661 | ||
2662 | select { | |
2663 | case <-sc.doneServing: | |
2664 | return errClientDisconnected | |
2665 | case <-st.cw: | |
2666 | return errStreamClosed | |
2667 | case err := <-msg.done: | |
2668 | errChanPool.Put(msg.done) | |
2669 | return err | |
2670 | } | |
2671 | } | |
2672 | ||
2673 | type startPushRequest struct { | |
2674 | parent *stream | |
2675 | method string | |
2676 | url *url.URL | |
2677 | header http.Header | |
2678 | done chan error | |
2679 | } | |
2680 | ||
2681 | func (sc *serverConn) startPush(msg *startPushRequest) { | |
2682 | sc.serveG.check() | |
2683 | ||
2684 | // http://tools.ietf.org/html/rfc7540#section-6.6. | |
2685 | // PUSH_PROMISE frames MUST only be sent on a peer-initiated stream that | |
2686 | // is in either the "open" or "half-closed (remote)" state. | |
2687 | if msg.parent.state != stateOpen && msg.parent.state != stateHalfClosedRemote { | |
2688 | // responseWriter.Push checks that the stream is peer-initiaed. | |
2689 | msg.done <- errStreamClosed | |
2690 | return | |
2691 | } | |
2692 | ||
2693 | // http://tools.ietf.org/html/rfc7540#section-6.6. | |
2694 | if !sc.pushEnabled { | |
2695 | msg.done <- http.ErrNotSupported | |
2696 | return | |
2697 | } | |
2698 | ||
2699 | // PUSH_PROMISE frames must be sent in increasing order by stream ID, so | |
2700 | // we allocate an ID for the promised stream lazily, when the PUSH_PROMISE | |
2701 | // is written. Once the ID is allocated, we start the request handler. | |
2702 | allocatePromisedID := func() (uint32, error) { | |
2703 | sc.serveG.check() | |
2704 | ||
2705 | // Check this again, just in case. Technically, we might have received | |
2706 | // an updated SETTINGS by the time we got around to writing this frame. | |
2707 | if !sc.pushEnabled { | |
2708 | return 0, http.ErrNotSupported | |
2709 | } | |
2710 | // http://tools.ietf.org/html/rfc7540#section-6.5.2. | |
2711 | if sc.curPushedStreams+1 > sc.clientMaxStreams { | |
2712 | return 0, ErrPushLimitReached | |
2713 | } | |
2714 | ||
2715 | // http://tools.ietf.org/html/rfc7540#section-5.1.1. | |
2716 | // Streams initiated by the server MUST use even-numbered identifiers. | |
2717 | // A server that is unable to establish a new stream identifier can send a GOAWAY | |
2718 | // frame so that the client is forced to open a new connection for new streams. | |
2719 | if sc.maxPushPromiseID+2 >= 1<<31 { | |
2720 | sc.startGracefulShutdownInternal() | |
2721 | return 0, ErrPushLimitReached | |
2722 | } | |
2723 | sc.maxPushPromiseID += 2 | |
2724 | promisedID := sc.maxPushPromiseID | |
2725 | ||
2726 | // http://tools.ietf.org/html/rfc7540#section-8.2. | |
2727 | // Strictly speaking, the new stream should start in "reserved (local)", then | |
2728 | // transition to "half closed (remote)" after sending the initial HEADERS, but | |
2729 | // we start in "half closed (remote)" for simplicity. | |
2730 | // See further comments at the definition of stateHalfClosedRemote. | |
2731 | promised := sc.newStream(promisedID, msg.parent.id, stateHalfClosedRemote) | |
2732 | rw, req, err := sc.newWriterAndRequestNoBody(promised, requestParam{ | |
2733 | method: msg.method, | |
2734 | scheme: msg.url.Scheme, | |
2735 | authority: msg.url.Host, | |
2736 | path: msg.url.RequestURI(), | |
2737 | header: cloneHeader(msg.header), // clone since handler runs concurrently with writing the PUSH_PROMISE | |
2738 | }) | |
2739 | if err != nil { | |
2740 | // Should not happen, since we've already validated msg.url. | |
2741 | panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err)) | |
2742 | } | |
2743 | ||
2744 | go sc.runHandler(rw, req, sc.handler.ServeHTTP) | |
2745 | return promisedID, nil | |
2746 | } | |
2747 | ||
2748 | sc.writeFrame(FrameWriteRequest{ | |
2749 | write: &writePushPromise{ | |
2750 | streamID: msg.parent.id, | |
2751 | method: msg.method, | |
2752 | url: msg.url, | |
2753 | h: msg.header, | |
2754 | allocatePromisedID: allocatePromisedID, | |
2755 | }, | |
2756 | stream: msg.parent, | |
2757 | done: msg.done, | |
2758 | }) | |
2759 | } | |
2760 | ||
2761 | // foreachHeaderElement splits v according to the "#rule" construction | |
2762 | // in RFC 2616 section 2.1 and calls fn for each non-empty element. | |
2763 | func foreachHeaderElement(v string, fn func(string)) { | |
2764 | v = textproto.TrimString(v) | |
2765 | if v == "" { | |
2766 | return | |
2767 | } | |
2768 | if !strings.Contains(v, ",") { | |
2769 | fn(v) | |
2770 | return | |
2771 | } | |
2772 | for _, f := range strings.Split(v, ",") { | |
2773 | if f = textproto.TrimString(f); f != "" { | |
2774 | fn(f) | |
2775 | } | |
2776 | } | |
2777 | } | |
2778 | ||
2779 | // From http://httpwg.org/specs/rfc7540.html#rfc.section.8.1.2.2 | |
2780 | var connHeaders = []string{ | |
2781 | "Connection", | |
2782 | "Keep-Alive", | |
2783 | "Proxy-Connection", | |
2784 | "Transfer-Encoding", | |
2785 | "Upgrade", | |
2786 | } | |
2787 | ||
2788 | // checkValidHTTP2RequestHeaders checks whether h is a valid HTTP/2 request, | |
2789 | // per RFC 7540 Section 8.1.2.2. | |
2790 | // The returned error is reported to users. | |
2791 | func checkValidHTTP2RequestHeaders(h http.Header) error { | |
2792 | for _, k := range connHeaders { | |
2793 | if _, ok := h[k]; ok { | |
2794 | return fmt.Errorf("request header %q is not valid in HTTP/2", k) | |
2795 | } | |
2796 | } | |
2797 | te := h["Te"] | |
2798 | if len(te) > 0 && (len(te) > 1 || (te[0] != "trailers" && te[0] != "")) { | |
2799 | return errors.New(`request header "TE" may only be "trailers" in HTTP/2`) | |
2800 | } | |
2801 | return nil | |
2802 | } | |
2803 | ||
2804 | func new400Handler(err error) http.HandlerFunc { | |
2805 | return func(w http.ResponseWriter, r *http.Request) { | |
2806 | http.Error(w, err.Error(), http.StatusBadRequest) | |
2807 | } | |
2808 | } | |
2809 | ||
2810 | // ValidTrailerHeader reports whether name is a valid header field name to appear | |
2811 | // in trailers. | |
2812 | // See: http://tools.ietf.org/html/rfc7230#section-4.1.2 | |
2813 | func ValidTrailerHeader(name string) bool { | |
2814 | name = http.CanonicalHeaderKey(name) | |
2815 | if strings.HasPrefix(name, "If-") || badTrailer[name] { | |
2816 | return false | |
2817 | } | |
2818 | return true | |
2819 | } | |
2820 | ||
2821 | var badTrailer = map[string]bool{ | |
2822 | "Authorization": true, | |
2823 | "Cache-Control": true, | |
2824 | "Connection": true, | |
2825 | "Content-Encoding": true, | |
2826 | "Content-Length": true, | |
2827 | "Content-Range": true, | |
2828 | "Content-Type": true, | |
2829 | "Expect": true, | |
2830 | "Host": true, | |
2831 | "Keep-Alive": true, | |
2832 | "Max-Forwards": true, | |
2833 | "Pragma": true, | |
2834 | "Proxy-Authenticate": true, | |
2835 | "Proxy-Authorization": true, | |
2836 | "Proxy-Connection": true, | |
2837 | "Range": true, | |
2838 | "Realm": true, | |
2839 | "Te": true, | |
2840 | "Trailer": true, | |
2841 | "Transfer-Encoding": true, | |
2842 | "Www-Authenticate": true, | |
2843 | } | |
2844 | ||
2845 | // h1ServerKeepAlivesDisabled reports whether hs has its keep-alives | |
2846 | // disabled. See comments on h1ServerShutdownChan above for why | |
2847 | // the code is written this way. | |
2848 | func h1ServerKeepAlivesDisabled(hs *http.Server) bool { | |
2849 | var x interface{} = hs | |
2850 | type I interface { | |
2851 | doKeepAlives() bool | |
2852 | } | |
2853 | if hs, ok := x.(I); ok { | |
2854 | return !hs.doKeepAlives() | |
2855 | } | |
2856 | return false | |
2857 | } |