/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package transport

import (
	"bytes"
	"errors"
	"io"
	"math"
	"math/rand"
	"net"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/golang/protobuf/proto"
	"golang.org/x/net/context"
	"golang.org/x/net/http2"
	"golang.org/x/net/http2/hpack"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
	"google.golang.org/grpc/tap"
)

// ErrIllegalHeaderWrite indicates that setting a header is illegal because of
// the stream's state.
var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")

// http2Server implements the ServerTransport interface with HTTP2.
type http2Server struct {
	ctx         context.Context
	conn        net.Conn
	remoteAddr  net.Addr
	localAddr   net.Addr
	maxStreamID uint32               // max stream ID ever seen
	authInfo    credentials.AuthInfo // auth info about the connection
	inTapHandle tap.ServerInHandle
	// writableChan synchronizes write access to the transport.
	// A writer acquires the write lock by receiving a value on writableChan
	// and releases it by sending on writableChan.
	writableChan chan int
	// shutdownChan is closed when Close is called.
	// Blocking operations should select on shutdownChan to avoid
	// blocking forever after Close.
	shutdownChan chan struct{}
	framer       *framer
	hBuf         *bytes.Buffer  // the buffer for HPACK encoding
	hEnc         *hpack.Encoder // HPACK encoder
	// The max number of concurrent streams.
	maxStreams uint32
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	fc         *inFlow
	// sendQuotaPool provides flow control for outbound messages.
	sendQuotaPool *quotaPool
	stats         stats.Handler
	// Flag to keep track of reading activity on the transport.
	// 1 is true and 0 is false.
	activity uint32 // Accessed atomically.
	// Keepalive and max-age parameters for the server.
	kp keepalive.ServerParameters

	// Keepalive enforcement policy.
	kep keepalive.EnforcementPolicy
	// The time instance when the last ping was received.
	lastPingAt time.Time
	// Number of times the client has violated the keepalive ping policy so far.
	pingStrikes uint8
	// Flag to signify that the number of ping strikes should be reset to 0.
	// This is set whenever data or header frames are sent.
	// 1 means yes.
	resetPingStrikes  uint32 // Accessed atomically.
	initialWindowSize int32
	bdpEst            *bdpEstimator

	outQuotaVersion uint32

	mu sync.Mutex // guard the following

	// drainChan is initialized when drain(...) is called the first time.
	// After that the server writes out the first GoAway (with ID 2^31-1) frame,
	// and an independent goroutine is launched to later send the second GoAway.
	// During this time we don't want to write another first GoAway (with ID 2^31-1) frame.
	// Thus a call to drain(...) is a no-op if drainChan is already initialized,
	// since draining is already underway.
	drainChan     chan struct{}
	state         transportState
	activeStreams map[uint32]*Stream
	// the per-stream outbound flow control window size set by the peer.
	streamSendQuota uint32
	// idle is the time instant when the connection went idle.
	// This is either the beginning of the connection or when the number of
	// RPCs goes down to 0.
	// When the connection is busy, this value is set to 0.
	idle time.Time
}

// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
	framer := newFramer(conn)
	// Send initial settings as connection preface to client.
	var isettings []http2.Setting
	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
	// permitted in the HTTP2 spec.
	maxStreams := config.MaxStreams
	if maxStreams == 0 {
		maxStreams = math.MaxUint32
	} else {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingMaxConcurrentStreams,
			Val: maxStreams,
		})
	}
	dynamicWindow := true
	iwz := int32(initialWindowSize)
	if config.InitialWindowSize >= defaultWindowSize {
		iwz = config.InitialWindowSize
		dynamicWindow = false
	}
	icwz := int32(initialWindowSize)
	if config.InitialConnWindowSize >= defaultWindowSize {
		icwz = config.InitialConnWindowSize
		dynamicWindow = false
	}
	if iwz != defaultWindowSize {
		isettings = append(isettings, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(iwz)})
	}
	if err := framer.writeSettings(true, isettings...); err != nil {
		return nil, connectionErrorf(true, err, "transport: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
			return nil, connectionErrorf(true, err, "transport: %v", err)
		}
	}
	kp := config.KeepaliveParams
	if kp.MaxConnectionIdle == 0 {
		kp.MaxConnectionIdle = defaultMaxConnectionIdle
	}
	if kp.MaxConnectionAge == 0 {
		kp.MaxConnectionAge = defaultMaxConnectionAge
	}
	// Add a jitter to MaxConnectionAge.
	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
	if kp.MaxConnectionAgeGrace == 0 {
		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
	}
	if kp.Time == 0 {
		kp.Time = defaultServerKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultServerKeepaliveTimeout
	}
	kep := config.KeepalivePolicy
	if kep.MinTime == 0 {
		kep.MinTime = defaultKeepalivePolicyMinTime
	}
	var buf bytes.Buffer
	t := &http2Server{
		ctx:               context.Background(),
		conn:              conn,
		remoteAddr:        conn.RemoteAddr(),
		localAddr:         conn.LocalAddr(),
		authInfo:          config.AuthInfo,
		framer:            framer,
		hBuf:              &buf,
		hEnc:              hpack.NewEncoder(&buf),
		maxStreams:        maxStreams,
		inTapHandle:       config.InTapHandle,
		controlBuf:        newControlBuffer(),
		fc:                &inFlow{limit: uint32(icwz)},
		sendQuotaPool:     newQuotaPool(defaultWindowSize),
		state:             reachable,
		writableChan:      make(chan int, 1),
		shutdownChan:      make(chan struct{}),
		activeStreams:     make(map[uint32]*Stream),
		streamSendQuota:   defaultWindowSize,
		stats:             config.StatsHandler,
		kp:                kp,
		idle:              time.Now(),
		kep:               kep,
		initialWindowSize: iwz,
	}
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	if t.stats != nil {
		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{}
		t.stats.HandleConn(t.ctx, connBegin)
	}
	go t.controller()
	go t.keepalive()
	t.writableChan <- 0
	return t, nil
}

// operateHeaders takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
	buf := newRecvBuffer()
	s := &Stream{
		id:  frame.Header().StreamID,
		st:  t,
		buf: buf,
		fc:  &inFlow{limit: uint32(t.initialWindowSize)},
	}

	var state decodeState
	for _, hf := range frame.Fields {
		if err := state.processHeaderField(hf); err != nil {
			if se, ok := err.(StreamError); ok {
				t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
			}
			return
		}
	}

	if frame.StreamEnded() {
		// s is just created by the caller. No lock needed.
		s.state = streamReadDone
	}
	s.recvCompress = state.encoding
	if state.timeoutSet {
		s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
	} else {
		s.ctx, s.cancel = context.WithCancel(t.ctx)
	}
	pr := &peer.Peer{
		Addr: t.remoteAddr,
	}
	// Attach Auth info if there is any.
	if t.authInfo != nil {
		pr.AuthInfo = t.authInfo
	}
	s.ctx = peer.NewContext(s.ctx, pr)
	// Cache the current stream to the context so that the server application
	// can find it. Required when the server wants to send some metadata
	// back to the client (unary call only).
	s.ctx = newContextWithStream(s.ctx, s)
	// Attach the received metadata to the context.
	if len(state.mdata) > 0 {
		s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
	}
	s.trReader = &transportReader{
		reader: &recvBufferReader{
			ctx:  s.ctx,
			recv: s.buf,
		},
		windowHandler: func(n int) {
			t.updateWindow(s, uint32(n))
		},
	}
	s.method = state.method
	if t.inTapHandle != nil {
		var err error
		info := &tap.Info{
			FullMethodName: state.method,
		}
		s.ctx, err = t.inTapHandle(s.ctx, info)
		if err != nil {
			warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
			t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
			return
		}
	}
	t.mu.Lock()
	if t.state != reachable {
		t.mu.Unlock()
		return
	}
	if uint32(len(t.activeStreams)) >= t.maxStreams {
		t.mu.Unlock()
		t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
		return
	}
	if s.id%2 != 1 || s.id <= t.maxStreamID {
		t.mu.Unlock()
		// illegal gRPC stream id.
		errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", s.id)
		return true
	}
	t.maxStreamID = s.id
	s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
	t.activeStreams[s.id] = s
	if len(t.activeStreams) == 1 {
		t.idle = time.Time{}
	}
	t.mu.Unlock()
	s.requestRead = func(n int) {
		t.adjustWindow(s, uint32(n))
	}
	s.ctx = traceCtx(s.ctx, s.method)
	if t.stats != nil {
		s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
		inHeader := &stats.InHeader{
			FullMethod:  s.method,
			RemoteAddr:  t.remoteAddr,
			LocalAddr:   t.localAddr,
			Compression: s.recvCompress,
			WireLength:  int(frame.Header().Length),
		}
		t.stats.HandleRPC(s.ctx, inHeader)
	}
	handle(s)
	return
}

// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
	// Check the validity of client preface.
	preface := make([]byte, len(clientPreface))
	if _, err := io.ReadFull(t.conn, preface); err != nil {
		// Only log if it isn't a simple tcp accept check (i.e., a tcp balancer doing open/close socket).
		if err != io.EOF {
			errorf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
		}
		t.Close()
		return
	}
	if !bytes.Equal(preface, clientPreface) {
		errorf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
		t.Close()
		return
	}

	frame, err := t.framer.readFrame()
	if err == io.EOF || err == io.ErrUnexpectedEOF {
		t.Close()
		return
	}
	if err != nil {
		errorf("transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
		t.Close()
		return
	}
	atomic.StoreUint32(&t.activity, 1)
	sf, ok := frame.(*http2.SettingsFrame)
	if !ok {
		errorf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
		t.Close()
		return
	}
	t.handleSettings(sf)

	for {
		frame, err := t.framer.readFrame()
		atomic.StoreUint32(&t.activity, 1)
		if err != nil {
			if se, ok := err.(http2.StreamError); ok {
				t.mu.Lock()
				s := t.activeStreams[se.StreamID]
				t.mu.Unlock()
				if s != nil {
					t.closeStream(s)
				}
				t.controlBuf.put(&resetStream{se.StreamID, se.Code})
				continue
			}
			if err == io.EOF || err == io.ErrUnexpectedEOF {
				t.Close()
				return
			}
			warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
			t.Close()
			return
		}
		switch frame := frame.(type) {
		case *http2.MetaHeadersFrame:
			if t.operateHeaders(frame, handle, traceCtx) {
				t.Close()
				break
			}
		case *http2.DataFrame:
			t.handleData(frame)
		case *http2.RSTStreamFrame:
			t.handleRSTStream(frame)
		case *http2.SettingsFrame:
			t.handleSettings(frame)
		case *http2.PingFrame:
			t.handlePing(frame)
		case *http2.WindowUpdateFrame:
			t.handleWindowUpdate(frame)
		case *http2.GoAwayFrame:
			// TODO: Handle GoAway from the client appropriately.
		default:
			errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
		}
	}
}

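// getStream looks up the active stream targeted by f. The second return value
// is false when the transport is shutting down or the stream has already
// finished, in which case the frame can simply be dropped.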
func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.activeStreams == nil {
		// The transport is closing.
		return nil, false
	}
	s, ok := t.activeStreams[f.Header().StreamID]
	if !ok {
		// The stream is already done.
		return nil, false
	}
	return s, true
}

// adjustWindow sends out an extra window update over the initial window size
// of the stream if the application is requesting more data than the current
// window can hold.
func (t *http2Server) adjustWindow(s *Stream, n uint32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.state == streamDone {
		return
	}
	if w := s.fc.maybeAdjust(n); w > 0 {
		if cw := t.fc.resetPendingUpdate(); cw > 0 {
			t.controlBuf.put(&windowUpdate{0, cw, false})
		}
		t.controlBuf.put(&windowUpdate{s.id, w, true})
	}
}

// updateWindow adjusts the inbound quota for the stream and the transport.
// Window updates are delivered to the controller for sending when
// the cumulative quota exceeds the corresponding threshold.
func (t *http2Server) updateWindow(s *Stream, n uint32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.state == streamDone {
		return
	}
	if w := s.fc.onRead(n); w > 0 {
		if cw := t.fc.resetPendingUpdate(); cw > 0 {
			t.controlBuf.put(&windowUpdate{0, cw, false})
		}
		t.controlBuf.put(&windowUpdate{s.id, w, true})
	}
}

// updateFlowControl updates the incoming flow control windows
// for the transport and the stream based on the current bdp
// estimation.
func (t *http2Server) updateFlowControl(n uint32) {
	t.mu.Lock()
	for _, s := range t.activeStreams {
		s.fc.newLimit(n)
	}
	t.initialWindowSize = int32(n)
	t.mu.Unlock()
	t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false})
	t.controlBuf.put(&settings{
		ack: false,
		ss: []http2.Setting{
			{
				ID:  http2.SettingInitialWindowSize,
				Val: uint32(n),
			},
		},
	})
}

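// handleData processes an inbound DATA frame, charging connection- and
// stream-level flow control and delivering a copy of the payload to the
// receiving stream.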
func (t *http2Server) handleData(f *http2.DataFrame) {
	size := f.Header().Length
	var sendBDPPing bool
	if t.bdpEst != nil {
		sendBDPPing = t.bdpEst.add(uint32(size))
	}
	// Decouple connection's flow control from application's read.
	// An update on connection's flow control should not depend on
	// whether the user application has read the data or not. Such a
	// restriction is already imposed on the stream's flow control,
	// and therefore the sender will be blocked anyway.
	// Decoupling the connection flow control will prevent other
	// active (fast) streams from starving in the presence of slow or
	// inactive streams.
	//
	// Furthermore, if a bdpPing is being sent out we can piggyback
	// the connection's window update for the bytes we just received.
	if sendBDPPing {
		t.controlBuf.put(&windowUpdate{0, uint32(size), false})
		t.controlBuf.put(bdpPing)
	} else {
		if err := t.fc.onData(uint32(size)); err != nil {
			errorf("transport: http2Server %v", err)
			t.Close()
			return
		}
		if w := t.fc.onRead(uint32(size)); w > 0 {
			t.controlBuf.put(&windowUpdate{0, w, true})
		}
	}
	// Select the right stream to dispatch.
	s, ok := t.getStream(f)
	if !ok {
		return
	}
	if size > 0 {
		s.mu.Lock()
		if s.state == streamDone {
			s.mu.Unlock()
			return
		}
		if err := s.fc.onData(uint32(size)); err != nil {
			s.mu.Unlock()
			t.closeStream(s)
			t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
			return
		}
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&windowUpdate{s.id, w, true})
			}
		}
		s.mu.Unlock()
		// TODO(bradfitz, zhaoq): A copy is required here because there is no
		// guarantee f.Data() is consumed before the arrival of the next frame.
		// Can this copy be eliminated?
		if len(f.Data()) > 0 {
			data := make([]byte, len(f.Data()))
			copy(data, f.Data())
			s.write(recvMsg{data: data})
		}
	}
	if f.Header().Flags.Has(http2.FlagDataEndStream) {
		// Received the end of stream from the client.
		s.mu.Lock()
		if s.state != streamDone {
			s.state = streamReadDone
		}
		s.mu.Unlock()
		s.write(recvMsg{err: io.EOF})
	}
}

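// handleRSTStream tears down the stream targeted by an inbound RST_STREAM frame.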
func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
	s, ok := t.getStream(f)
	if !ok {
		return
	}
	t.closeStream(s)
}

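// handleSettings collects the values from an inbound (non-ack) SETTINGS frame
// and queues them on controlBuf; the controller applies them after writing
// the ack.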
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
	if f.IsAck() {
		return
	}
	var ss []http2.Setting
	f.ForeachSetting(func(s http2.Setting) error {
		ss = append(ss, s)
		return nil
	})
	// The settings will be applied once the ack is sent.
	t.controlBuf.put(&settings{ack: true, ss: ss})
}

const (
	maxPingStrikes     = 2
	defaultPingTimeout = 2 * time.Hour
)

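// handlePing acks an inbound PING and enforces the keepalive ping policy:
// pings that arrive too frequently accumulate strikes, and once
// maxPingStrikes is exceeded the connection is closed with an
// ENHANCE_YOUR_CALM GoAway. Ping acks are routed to the drain logic or the
// BDP estimator as appropriate.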
func (t *http2Server) handlePing(f *http2.PingFrame) {
	if f.IsAck() {
		if f.Data == goAwayPing.data && t.drainChan != nil {
			close(t.drainChan)
			return
		}
		// Maybe it's a BDP ping.
		if t.bdpEst != nil {
			t.bdpEst.calculate(f.Data)
		}
		return
	}
	pingAck := &ping{ack: true}
	copy(pingAck.data[:], f.Data[:])
	t.controlBuf.put(pingAck)

	now := time.Now()
	defer func() {
		t.lastPingAt = now
	}()
	// A set resetPingStrikes flag means that we don't need to check for a
	// policy violation for this ping, and that the pingStrikes counter should
	// be reset to 0.
	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
		t.pingStrikes = 0
		return
	}
	t.mu.Lock()
	ns := len(t.activeStreams)
	t.mu.Unlock()
	if ns < 1 && !t.kep.PermitWithoutStream {
		// Keepalive shouldn't be active, thus this new ping should
		// have come after at least defaultPingTimeout.
		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
			t.pingStrikes++
		}
	} else {
		// Check if keepalive policy is respected.
		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
			t.pingStrikes++
		}
	}

	if t.pingStrikes > maxPingStrikes {
		// Send goaway and close the connection.
		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
	}
}

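// handleWindowUpdate replenishes send quota: an update on stream 0 feeds the
// transport-wide sendQuotaPool, otherwise the targeted stream's pool.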
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
	id := f.Header().StreamID
	incr := f.Increment
	if id == 0 {
		t.sendQuotaPool.add(int(incr))
		return
	}
	if s, ok := t.getStream(f); ok {
		s.sendQuotaPool.add(int(incr))
	}
}

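// writeHeaders writes the HPACK-encoded header block in b for stream s as a
// HEADERS frame, followed by CONTINUATION frames whenever the block exceeds
// http2MaxFrameLen.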
func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) error {
	first := true
	endHeaders := false
	var err error
	defer func() {
		if err == nil {
			// Reset ping strikes when sending headers since that might cause
			// the peer to send a ping.
			atomic.StoreUint32(&t.resetPingStrikes, 1)
		}
	}()
	// Sends the headers in a single batch.
	for !endHeaders {
		size := t.hBuf.Len()
		if size > http2MaxFrameLen {
			size = http2MaxFrameLen
		} else {
			endHeaders = true
		}
		if first {
			p := http2.HeadersFrameParam{
				StreamID:      s.id,
				BlockFragment: b.Next(size),
				EndStream:     endStream,
				EndHeaders:    endHeaders,
			}
			err = t.framer.writeHeaders(endHeaders, p)
			first = false
		} else {
			err = t.framer.writeContinuation(endHeaders, s.id, endHeaders, b.Next(size))
		}
		if err != nil {
			t.Close()
			return connectionErrorf(true, err, "transport: %v", err)
		}
	}
	return nil
}

// WriteHeader sends the header metadata md back to the client.
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
	s.mu.Lock()
	if s.headerOk || s.state == streamDone {
		s.mu.Unlock()
		return ErrIllegalHeaderWrite
	}
	s.headerOk = true
	if md.Len() > 0 {
		if s.header.Len() > 0 {
			s.header = metadata.Join(s.header, md)
		} else {
			s.header = md
		}
	}
	md = s.header
	s.mu.Unlock()
	if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
		return err
	}
	t.hBuf.Reset()
	t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
	t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
	if s.sendCompress != "" {
		t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
	}
	for k, vv := range md {
		if isReservedHeader(k) {
			// Clients don't tolerate reading restricted headers after some non-restricted ones were sent.
			continue
		}
		for _, v := range vv {
			t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
		}
	}
	bufLen := t.hBuf.Len()
	if err := t.writeHeaders(s, t.hBuf, false); err != nil {
		return err
	}
	if t.stats != nil {
		outHeader := &stats.OutHeader{
			WireLength: bufLen,
		}
		t.stats.HandleRPC(s.Context(), outHeader)
	}
	t.writableChan <- 0
	return nil
}

// WriteStatus sends stream status to the client and terminates the stream.
// No further I/O operations can be performed on this stream afterwards.
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
// OK is adopted.
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
	var headersSent, hasHeader bool
	s.mu.Lock()
	if s.state == streamDone {
		s.mu.Unlock()
		return nil
	}
	if s.headerOk {
		headersSent = true
	}
	if s.header.Len() > 0 {
		hasHeader = true
	}
	s.mu.Unlock()

	if !headersSent && hasHeader {
		t.WriteHeader(s, nil)
		headersSent = true
	}

	if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
		return err
	}
	t.hBuf.Reset()
	if !headersSent {
		t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
		t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
	}
	t.hEnc.WriteField(
		hpack.HeaderField{
			Name:  "grpc-status",
			Value: strconv.Itoa(int(st.Code())),
		})
	t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})

	if p := st.Proto(); p != nil && len(p.Details) > 0 {
		stBytes, err := proto.Marshal(p)
		if err != nil {
			// TODO: return error instead, when callers are able to handle it.
			panic(err)
		}

		t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
	}

	// Attach the trailer metadata.
	for k, vv := range s.trailer {
		// Clients don't tolerate reading restricted headers after some non-restricted ones were sent.
		if isReservedHeader(k) {
			continue
		}
		for _, v := range vv {
			t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
		}
	}
	bufLen := t.hBuf.Len()
	if err := t.writeHeaders(s, t.hBuf, true); err != nil {
		t.Close()
		return err
	}
	if t.stats != nil {
		outTrailer := &stats.OutTrailer{
			WireLength: bufLen,
		}
		t.stats.HandleRPC(s.Context(), outTrailer)
	}
	t.closeStream(s)
	t.writableChan <- 0
	return nil
}

// Write converts the data into an HTTP2 data frame and sends it out. A non-nil
// error is returned if it fails (e.g., framing error, transport error).
func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
	// TODO(zhaoq): Support multi-writers for a single stream.
	var writeHeaderFrame bool
	s.mu.Lock()
	if s.state == streamDone {
		s.mu.Unlock()
		return streamErrorf(codes.Unknown, "the stream has been done")
	}
	if !s.headerOk {
		writeHeaderFrame = true
	}
	s.mu.Unlock()
	if writeHeaderFrame {
		t.WriteHeader(s, nil)
	}
	r := bytes.NewBuffer(data)
	var (
		p   []byte
		oqv uint32
	)
	for {
		if r.Len() == 0 && p == nil {
			return nil
		}
		oqv = atomic.LoadUint32(&t.outQuotaVersion)
		size := http2MaxFrameLen
		// Wait until the stream has some quota to send the data.
		sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
		if err != nil {
			return err
		}
		// Wait until the transport has some quota to send the data.
		tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
		if err != nil {
			return err
		}
		if sq < size {
			size = sq
		}
		if tq < size {
			size = tq
		}
		if p == nil {
			p = r.Next(size)
		}
		ps := len(p)
		if ps < sq {
			// Overbooked stream quota. Return it back.
			s.sendQuotaPool.add(sq - ps)
		}
		if ps < tq {
			// Overbooked transport quota. Return it back.
			t.sendQuotaPool.add(tq - ps)
		}
		t.framer.adjustNumWriters(1)
		// Got some quota. Try to acquire writing privilege on the
		// transport.
		if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
			if _, ok := err.(StreamError); ok {
				// Return the connection quota back.
				t.sendQuotaPool.add(ps)
			}
			if t.framer.adjustNumWriters(-1) == 0 {
				// This writer is the last one in this batch and has the
				// responsibility to flush the buffered frames. It queues
				// a flush request to controlBuf instead of flushing directly
				// in order to avoid the race with other writing or flushing.
				t.controlBuf.put(&flushIO{})
			}
			return err
		}
		select {
		case <-s.ctx.Done():
			t.sendQuotaPool.add(ps)
			if t.framer.adjustNumWriters(-1) == 0 {
				t.controlBuf.put(&flushIO{})
			}
			t.writableChan <- 0
			return ContextErr(s.ctx.Err())
		default:
		}
		if oqv != atomic.LoadUint32(&t.outQuotaVersion) {
			// An InitialWindowSize settings frame must have been received after we
			// acquired send quota but before we got the writable channel.
			// We must forsake this write.
			t.sendQuotaPool.add(ps)
			s.sendQuotaPool.add(ps)
			if t.framer.adjustNumWriters(-1) == 0 {
				t.controlBuf.put(&flushIO{})
			}
			t.writableChan <- 0
			continue
		}
		var forceFlush bool
		if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last {
			forceFlush = true
		}
		// Reset ping strikes when sending data since this might cause
		// the peer to send a ping.
		atomic.StoreUint32(&t.resetPingStrikes, 1)
		if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
			t.Close()
			return connectionErrorf(true, err, "transport: %v", err)
		}
		p = nil
		if t.framer.adjustNumWriters(-1) == 0 {
			t.framer.flushWrite()
		}
		t.writableChan <- 0
	}
}

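// applySettings applies the peer's acked settings. A change to
// SETTINGS_INITIAL_WINDOW_SIZE adjusts the send quota of every active stream
// by the delta and bumps outQuotaVersion so that in-flight writes re-acquire
// quota.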
func (t *http2Server) applySettings(ss []http2.Setting) {
	for _, s := range ss {
		if s.ID == http2.SettingInitialWindowSize {
			t.mu.Lock()
			defer t.mu.Unlock()
			for _, stream := range t.activeStreams {
				stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota))
			}
			t.streamSendQuota = s.Val
			atomic.AddUint32(&t.outQuotaVersion, 1)
		}
	}
}

// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
// after an additional duration of keepalive.Timeout.
func (t *http2Server) keepalive() {
	p := &ping{}
	var pingSent bool
	maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
	maxAge := time.NewTimer(t.kp.MaxConnectionAge)
	keepalive := time.NewTimer(t.kp.Time)
	// NOTE: All exit paths of this function should reset their
	// respective timers. A failure to do so will cause the
	// following clean-up to deadlock and eventually leak.
	defer func() {
		if !maxIdle.Stop() {
			<-maxIdle.C
		}
		if !maxAge.Stop() {
			<-maxAge.C
		}
		if !keepalive.Stop() {
			<-keepalive.C
		}
	}()
	for {
		select {
		case <-maxIdle.C:
			t.mu.Lock()
			idle := t.idle
			if idle.IsZero() { // The connection is non-idle.
				t.mu.Unlock()
				maxIdle.Reset(t.kp.MaxConnectionIdle)
				continue
			}
			val := t.kp.MaxConnectionIdle - time.Since(idle)
			t.mu.Unlock()
			if val <= 0 {
				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
				// Gracefully close the connection.
				t.drain(http2.ErrCodeNo, []byte{})
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxIdle.Reset(infinity)
				return
			}
			maxIdle.Reset(val)
		case <-maxAge.C:
			t.drain(http2.ErrCodeNo, []byte{})
			maxAge.Reset(t.kp.MaxConnectionAgeGrace)
			select {
			case <-maxAge.C:
				// Close the connection after grace period.
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				maxAge.Reset(infinity)
			case <-t.shutdownChan:
			}
			return
		case <-keepalive.C:
			if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
				pingSent = false
				keepalive.Reset(t.kp.Time)
				continue
			}
			if pingSent {
				t.Close()
				// Resetting the timer so that the clean-up doesn't deadlock.
				keepalive.Reset(infinity)
				return
			}
			pingSent = true
			t.controlBuf.put(p)
			keepalive.Reset(t.kp.Timeout)
		case <-t.shutdownChan:
			return
		}
	}
}

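// goAwayPing carries the fixed, opaque payload of the ping sent right after
// the first, heads-up GoAway; receiving its ack triggers the second, final
// GoAway.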
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}

// controller running in a separate goroutine takes charge of sending control
// frames (e.g., window update, reset stream, setting, etc.) to the client.
func (t *http2Server) controller() {
	for {
		select {
		case i := <-t.controlBuf.get():
			t.controlBuf.load()
			select {
			case <-t.writableChan:
				switch i := i.(type) {
				case *windowUpdate:
					t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment)
				case *settings:
					if i.ack {
						t.framer.writeSettingsAck(true)
						t.applySettings(i.ss)
					} else {
						t.framer.writeSettings(true, i.ss...)
					}
				case *resetStream:
					t.framer.writeRSTStream(true, i.streamID, i.code)
				case *goAway:
					t.mu.Lock()
					if t.state == closing {
						t.mu.Unlock()
						// The transport is closing.
						return
					}
					sid := t.maxStreamID
					if !i.headsUp {
						// Stop accepting more streams now.
						t.state = draining
						t.mu.Unlock()
						t.framer.writeGoAway(true, sid, i.code, i.debugData)
						if i.closeConn {
							// Abruptly close the connection following the GoAway.
							t.Close()
						}
						t.writableChan <- 0
						continue
					}
					t.mu.Unlock()
					// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
					// then follow that with a ping and wait for the ack to come back or a
					// timer to expire. During this time accept new streams since they might
					// have originated before the GoAway reached the client.
					// After getting the ack or timer expiration, send out another GoAway,
					// this time with an ID of the max stream the server intends to process.
					t.framer.writeGoAway(true, math.MaxUint32, http2.ErrCodeNo, []byte{})
					t.framer.writePing(true, false, goAwayPing.data)
					go func() {
						timer := time.NewTimer(time.Minute)
						defer timer.Stop()
						select {
						case <-t.drainChan:
						case <-timer.C:
						case <-t.shutdownChan:
							return
						}
						t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData})
					}()
				case *flushIO:
					t.framer.flushWrite()
				case *ping:
					if !i.ack {
						t.bdpEst.timesnap(i.data)
					}
					t.framer.writePing(true, i.ack, i.data)
				default:
					errorf("transport: http2Server.controller got unexpected item type %v\n", i)
				}
				t.writableChan <- 0
				continue
			case <-t.shutdownChan:
				return
			}
		case <-t.shutdownChan:
			return
		}
	}
}

// Close starts shutting down the http2Server transport.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
func (t *http2Server) Close() (err error) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return errors.New("transport: Close() was already called")
	}
	t.state = closing
	streams := t.activeStreams
	t.activeStreams = nil
	t.mu.Unlock()
	close(t.shutdownChan)
	err = t.conn.Close()
	// Cancel all active streams.
	for _, s := range streams {
		s.cancel()
	}
	if t.stats != nil {
		connEnd := &stats.ConnEnd{}
		t.stats.HandleConn(t.ctx, connEnd)
	}
	return
}

// closeStream clears the footprint of a stream when the stream is
// no longer needed.
func (t *http2Server) closeStream(s *Stream) {
	t.mu.Lock()
	delete(t.activeStreams, s.id)
	if len(t.activeStreams) == 0 {
		t.idle = time.Now()
	}
	if t.state == draining && len(t.activeStreams) == 0 {
		defer t.Close()
	}
	t.mu.Unlock()
	// In case stream sending and receiving are invoked in separate
	// goroutines (e.g., bi-directional streaming), cancel needs to be
	// called to interrupt the potential blocking on other goroutines.
	s.cancel()
	s.mu.Lock()
	if s.state == streamDone {
		s.mu.Unlock()
		return
	}
	s.state = streamDone
	s.mu.Unlock()
}

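// RemoteAddr returns the remote network address of the underlying connection.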
func (t *http2Server) RemoteAddr() net.Addr {
	return t.remoteAddr
}

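// Drain starts a graceful shutdown of the transport: it sends a heads-up
// GoAway to the client, after which the transport eventually stops accepting
// new streams.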
func (t *http2Server) Drain() {
	t.drain(http2.ErrCodeNo, []byte{})
}

func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.drainChan != nil {
		return
	}
	t.drainChan = make(chan struct{})
	t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
}

var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))

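// getJitter returns a random offset of roughly +/- 10% of v, used above to
// spread MaxConnectionAge expirations across connections. It returns 0 when
// v is infinity.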
func getJitter(v time.Duration) time.Duration {
	if v == infinity {
		return 0
	}
	// Generate a jitter between +/- 10% of the value.
	r := int64(v / 10)
	j := rgen.Int63n(2*r) - r
	return time.Duration(j)
}