]> git.immae.eu Git - github/fretlink/terraform-provider-statuscake.git/blobdiff - vendor/google.golang.org/grpc/internal/transport/http2_server.go
Upgrade to 0.12
[github/fretlink/terraform-provider-statuscake.git] / vendor / google.golang.org / grpc / internal / transport / http2_server.go
similarity index 53%
rename from vendor/google.golang.org/grpc/transport/http2_server.go
rename to vendor/google.golang.org/grpc/internal/transport/http2_server.go
index b6f93e3c0c9f56286a4d96772cbda3ec973c92b5..df2740398bd404712913aac4c32984835c6cd595 100644 (file)
@@ -20,10 +20,11 @@ package transport
 
 import (
        "bytes"
+       "context"
        "errors"
+       "fmt"
        "io"
        "math"
-       "math/rand"
        "net"
        "strconv"
        "sync"
@@ -31,11 +32,14 @@ import (
        "time"
 
        "github.com/golang/protobuf/proto"
-       "golang.org/x/net/context"
        "golang.org/x/net/http2"
        "golang.org/x/net/http2/hpack"
+
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/grpclog"
+       "google.golang.org/grpc/internal/channelz"
+       "google.golang.org/grpc/internal/grpcrand"
        "google.golang.org/grpc/keepalive"
        "google.golang.org/grpc/metadata"
        "google.golang.org/grpc/peer"
@@ -44,39 +48,37 @@ import (
        "google.golang.org/grpc/tap"
 )
 
-// ErrIllegalHeaderWrite indicates that setting header is illegal because of
-// the stream's state.
-var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
+var (
+       // ErrIllegalHeaderWrite indicates that setting header is illegal because of
+       // the stream's state.
+       ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
+       // ErrHeaderListSizeLimitViolation indicates that the header list size is larger
+       // than the limit set by peer.
+       ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer")
+)
 
 // http2Server implements the ServerTransport interface with HTTP2.
 type http2Server struct {
        ctx         context.Context
+       ctxDone     <-chan struct{} // Cache the context.Done() chan
+       cancel      context.CancelFunc
        conn        net.Conn
+       loopy       *loopyWriter
+       readerDone  chan struct{} // sync point to enable testing.
+       writerDone  chan struct{} // sync point to enable testing.
        remoteAddr  net.Addr
        localAddr   net.Addr
        maxStreamID uint32               // max stream ID ever seen
        authInfo    credentials.AuthInfo // auth info about the connection
        inTapHandle tap.ServerInHandle
-       // writableChan synchronizes write access to the transport.
-       // A writer acquires the write lock by receiving a value on writableChan
-       // and releases it by sending on writableChan.
-       writableChan chan int
-       // shutdownChan is closed when Close is called.
-       // Blocking operations should select on shutdownChan to avoid
-       // blocking forever after Close.
-       shutdownChan chan struct{}
-       framer       *framer
-       hBuf         *bytes.Buffer  // the buffer for HPACK encoding
-       hEnc         *hpack.Encoder // HPACK encoder
+       framer      *framer
        // The max number of concurrent streams.
        maxStreams uint32
        // controlBuf delivers all the control related tasks (e.g., window
        // updates, reset streams, and various settings) to the controller.
        controlBuf *controlBuffer
-       fc         *inFlow
-       // sendQuotaPool provides flow control to outbound message.
-       sendQuotaPool *quotaPool
-       stats         stats.Handler
+       fc         *trInFlow
+       stats      stats.Handler
        // Flag to keep track of reading activity on transport.
        // 1 is true and 0 is false.
        activity uint32 // Accessed atomically.
@@ -92,11 +94,10 @@ type http2Server struct {
        // Flag to signify that number of ping strikes should be reset to 0.
        // This is set whenever data or header frames are sent.
        // 1 means yes.
-       resetPingStrikes  uint32 // Accessed atomically.
-       initialWindowSize int32
-       bdpEst            *bdpEstimator
-
-       outQuotaVersion uint32
+       resetPingStrikes      uint32 // Accessed atomically.
+       initialWindowSize     int32
+       bdpEst                *bdpEstimator
+       maxSendHeaderListSize *uint32
 
        mu sync.Mutex // guard the following
 
@@ -109,19 +110,27 @@ type http2Server struct {
        drainChan     chan struct{}
        state         transportState
        activeStreams map[uint32]*Stream
-       // the per-stream outbound flow control window size set by the peer.
-       streamSendQuota uint32
        // idle is the time instant when the connection went idle.
-       // This is either the begining of the connection or when the number of
+       // This is either the beginning of the connection or when the number of
        // RPCs go down to 0.
        // When the connection is busy, this value is set to 0.
        idle time.Time
+
+       // Fields below are for channelz metric collection.
+       channelzID int64 // channelz unique identification number
+       czData     *channelzData
 }
 
 // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
 // returned if something goes wrong.
 func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
-       framer := newFramer(conn)
+       writeBufSize := config.WriteBufferSize
+       readBufSize := config.ReadBufferSize
+       maxHeaderListSize := defaultServerMaxHeaderListSize
+       if config.MaxHeaderListSize != nil {
+               maxHeaderListSize = *config.MaxHeaderListSize
+       }
+       framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
        // Send initial settings as connection preface to client.
        var isettings []http2.Setting
        // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
@@ -151,13 +160,19 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
                        ID:  http2.SettingInitialWindowSize,
                        Val: uint32(iwz)})
        }
-       if err := framer.writeSettings(true, isettings...); err != nil {
-               return nil, connectionErrorf(true, err, "transport: %v", err)
+       if config.MaxHeaderListSize != nil {
+               isettings = append(isettings, http2.Setting{
+                       ID:  http2.SettingMaxHeaderListSize,
+                       Val: *config.MaxHeaderListSize,
+               })
+       }
+       if err := framer.fr.WriteSettings(isettings...); err != nil {
+               return nil, connectionErrorf(false, err, "transport: %v", err)
        }
        // Adjust the connection flow control window if needed.
        if delta := uint32(icwz - defaultWindowSize); delta > 0 {
-               if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
-                       return nil, connectionErrorf(true, err, "transport: %v", err)
+               if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
+                       return nil, connectionErrorf(false, err, "transport: %v", err)
                }
        }
        kp := config.KeepaliveParams
@@ -182,32 +197,31 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
        if kep.MinTime == 0 {
                kep.MinTime = defaultKeepalivePolicyMinTime
        }
-       var buf bytes.Buffer
+       ctx, cancel := context.WithCancel(context.Background())
        t := &http2Server{
-               ctx:               context.Background(),
+               ctx:               ctx,
+               cancel:            cancel,
+               ctxDone:           ctx.Done(),
                conn:              conn,
                remoteAddr:        conn.RemoteAddr(),
                localAddr:         conn.LocalAddr(),
                authInfo:          config.AuthInfo,
                framer:            framer,
-               hBuf:              &buf,
-               hEnc:              hpack.NewEncoder(&buf),
+               readerDone:        make(chan struct{}),
+               writerDone:        make(chan struct{}),
                maxStreams:        maxStreams,
                inTapHandle:       config.InTapHandle,
-               controlBuf:        newControlBuffer(),
-               fc:                &inFlow{limit: uint32(icwz)},
-               sendQuotaPool:     newQuotaPool(defaultWindowSize),
+               fc:                &trInFlow{limit: uint32(icwz)},
                state:             reachable,
-               writableChan:      make(chan int, 1),
-               shutdownChan:      make(chan struct{}),
                activeStreams:     make(map[uint32]*Stream),
-               streamSendQuota:   defaultWindowSize,
                stats:             config.StatsHandler,
                kp:                kp,
                idle:              time.Now(),
                kep:               kep,
                initialWindowSize: iwz,
+               czData:            new(channelzData),
        }
+       t.controlBuf = newControlBuffer(t.ctxDone)
        if dynamicWindow {
                t.bdpEst = &bdpEstimator{
                        bdp:               initialWindowSize,
@@ -222,37 +236,83 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
                connBegin := &stats.ConnBegin{}
                t.stats.HandleConn(t.ctx, connBegin)
        }
-       go t.controller()
+       if channelz.IsOn() {
+               t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
+       }
+       t.framer.writer.Flush()
+
+       defer func() {
+               if err != nil {
+                       t.Close()
+               }
+       }()
+
+       // Check the validity of client preface.
+       preface := make([]byte, len(clientPreface))
+       if _, err := io.ReadFull(t.conn, preface); err != nil {
+               return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
+       }
+       if !bytes.Equal(preface, clientPreface) {
+               return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
+       }
+
+       frame, err := t.framer.fr.ReadFrame()
+       if err == io.EOF || err == io.ErrUnexpectedEOF {
+               return nil, err
+       }
+       if err != nil {
+               return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
+       }
+       atomic.StoreUint32(&t.activity, 1)
+       sf, ok := frame.(*http2.SettingsFrame)
+       if !ok {
+               return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
+       }
+       t.handleSettings(sf)
+
+       go func() {
+               t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
+               t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
+               if err := t.loopy.run(); err != nil {
+                       errorf("transport: loopyWriter.run returning. Err: %v", err)
+               }
+               t.conn.Close()
+               close(t.writerDone)
+       }()
        go t.keepalive()
-       t.writableChan <- 0
        return t, nil
 }
 
 // operateHeader takes action on the decoded headers.
-func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
-       buf := newRecvBuffer()
-       s := &Stream{
-               id:  frame.Header().StreamID,
-               st:  t,
-               buf: buf,
-               fc:  &inFlow{limit: uint32(t.initialWindowSize)},
-       }
-
-       var state decodeState
-       for _, hf := range frame.Fields {
-               if err := state.processHeaderField(hf); err != nil {
-                       if se, ok := err.(StreamError); ok {
-                               t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
-                       }
-                       return
+func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
+       streamID := frame.Header().StreamID
+       state := decodeState{serverSide: true}
+       if err := state.decodeHeader(frame); err != nil {
+               if se, ok := status.FromError(err); ok {
+                       t.controlBuf.put(&cleanupStream{
+                               streamID: streamID,
+                               rst:      true,
+                               rstCode:  statusCodeConvTab[se.Code()],
+                               onWrite:  func() {},
+                       })
                }
+               return false
        }
 
+       buf := newRecvBuffer()
+       s := &Stream{
+               id:             streamID,
+               st:             t,
+               buf:            buf,
+               fc:             &inFlow{limit: uint32(t.initialWindowSize)},
+               recvCompress:   state.encoding,
+               method:         state.method,
+               contentSubtype: state.contentSubtype,
+       }
        if frame.StreamEnded() {
                // s is just created by the caller. No lock needed.
                s.state = streamReadDone
        }
-       s.recvCompress = state.encoding
        if state.timeoutSet {
                s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
        } else {
@@ -266,25 +326,16 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
                pr.AuthInfo = t.authInfo
        }
        s.ctx = peer.NewContext(s.ctx, pr)
-       // Cache the current stream to the context so that the server application
-       // can find out. Required when the server wants to send some metadata
-       // back to the client (unary call only).
-       s.ctx = newContextWithStream(s.ctx, s)
        // Attach the received metadata to the context.
        if len(state.mdata) > 0 {
                s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
        }
-       s.trReader = &transportReader{
-               reader: &recvBufferReader{
-                       ctx:  s.ctx,
-                       recv: s.buf,
-               },
-               windowHandler: func(n int) {
-                       t.updateWindow(s, uint32(n))
-               },
+       if state.statsTags != nil {
+               s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags)
+       }
+       if state.statsTrace != nil {
+               s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace)
        }
-       s.recvCompress = state.encoding
-       s.method = state.method
        if t.inTapHandle != nil {
                var err error
                info := &tap.Info{
@@ -293,33 +344,46 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
                s.ctx, err = t.inTapHandle(s.ctx, info)
                if err != nil {
                        warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
-                       t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
-                       return
+                       t.controlBuf.put(&cleanupStream{
+                               streamID: s.id,
+                               rst:      true,
+                               rstCode:  http2.ErrCodeRefusedStream,
+                               onWrite:  func() {},
+                       })
+                       return false
                }
        }
        t.mu.Lock()
        if t.state != reachable {
                t.mu.Unlock()
-               return
+               return false
        }
        if uint32(len(t.activeStreams)) >= t.maxStreams {
                t.mu.Unlock()
-               t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
-               return
+               t.controlBuf.put(&cleanupStream{
+                       streamID: streamID,
+                       rst:      true,
+                       rstCode:  http2.ErrCodeRefusedStream,
+                       onWrite:  func() {},
+               })
+               return false
        }
-       if s.id%2 != 1 || s.id <= t.maxStreamID {
+       if streamID%2 != 1 || streamID <= t.maxStreamID {
                t.mu.Unlock()
                // illegal gRPC stream id.
-               errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", s.id)
+               errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID)
                return true
        }
-       t.maxStreamID = s.id
-       s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
-       t.activeStreams[s.id] = s
+       t.maxStreamID = streamID
+       t.activeStreams[streamID] = s
        if len(t.activeStreams) == 1 {
                t.idle = time.Time{}
        }
        t.mu.Unlock()
+       if channelz.IsOn() {
+               atomic.AddInt64(&t.czData.streamsStarted, 1)
+               atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
+       }
        s.requestRead = func(n int) {
                t.adjustWindow(s, uint32(n))
        }
@@ -335,61 +399,51 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
                }
                t.stats.HandleRPC(s.ctx, inHeader)
        }
+       s.ctxDone = s.ctx.Done()
+       s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
+       s.trReader = &transportReader{
+               reader: &recvBufferReader{
+                       ctx:     s.ctx,
+                       ctxDone: s.ctxDone,
+                       recv:    s.buf,
+               },
+               windowHandler: func(n int) {
+                       t.updateWindow(s, uint32(n))
+               },
+       }
+       // Register the stream with loopy.
+       t.controlBuf.put(&registerStream{
+               streamID: s.id,
+               wq:       s.wq,
+       })
        handle(s)
-       return
+       return false
 }
 
 // HandleStreams receives incoming streams using the given handler. This is
 // typically run in a separate goroutine.
 // traceCtx attaches trace to ctx and returns the new context.
 func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
-       // Check the validity of client preface.
-       preface := make([]byte, len(clientPreface))
-       if _, err := io.ReadFull(t.conn, preface); err != nil {
-               // Only log if it isn't a simple tcp accept check (ie: tcp balancer doing open/close socket)
-               if err != io.EOF {
-                       errorf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
-               }
-               t.Close()
-               return
-       }
-       if !bytes.Equal(preface, clientPreface) {
-               errorf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
-               t.Close()
-               return
-       }
-
-       frame, err := t.framer.readFrame()
-       if err == io.EOF || err == io.ErrUnexpectedEOF {
-               t.Close()
-               return
-       }
-       if err != nil {
-               errorf("transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
-               t.Close()
-               return
-       }
-       atomic.StoreUint32(&t.activity, 1)
-       sf, ok := frame.(*http2.SettingsFrame)
-       if !ok {
-               errorf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
-               t.Close()
-               return
-       }
-       t.handleSettings(sf)
-
+       defer close(t.readerDone)
        for {
-               frame, err := t.framer.readFrame()
+               frame, err := t.framer.fr.ReadFrame()
                atomic.StoreUint32(&t.activity, 1)
                if err != nil {
                        if se, ok := err.(http2.StreamError); ok {
+                               warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
                                t.mu.Lock()
                                s := t.activeStreams[se.StreamID]
                                t.mu.Unlock()
                                if s != nil {
-                                       t.closeStream(s)
+                                       t.closeStream(s, true, se.Code, nil, false)
+                               } else {
+                                       t.controlBuf.put(&cleanupStream{
+                                               streamID: se.StreamID,
+                                               rst:      true,
+                                               rstCode:  se.Code,
+                                               onWrite:  func() {},
+                                       })
                                }
-                               t.controlBuf.put(&resetStream{se.StreamID, se.Code})
                                continue
                        }
                        if err == io.EOF || err == io.ErrUnexpectedEOF {
@@ -443,33 +497,20 @@ func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
 // of stream if the application is requesting data larger in size than
 // the window.
 func (t *http2Server) adjustWindow(s *Stream, n uint32) {
-       s.mu.Lock()
-       defer s.mu.Unlock()
-       if s.state == streamDone {
-               return
-       }
        if w := s.fc.maybeAdjust(n); w > 0 {
-               if cw := t.fc.resetPendingUpdate(); cw > 0 {
-                       t.controlBuf.put(&windowUpdate{0, cw, false})
-               }
-               t.controlBuf.put(&windowUpdate{s.id, w, true})
+               t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
        }
+
 }
 
 // updateWindow adjusts the inbound quota for the stream and the transport.
 // Window updates will deliver to the controller for sending when
 // the cumulative quota exceeds the corresponding threshold.
 func (t *http2Server) updateWindow(s *Stream, n uint32) {
-       s.mu.Lock()
-       defer s.mu.Unlock()
-       if s.state == streamDone {
-               return
-       }
        if w := s.fc.onRead(n); w > 0 {
-               if cw := t.fc.resetPendingUpdate(); cw > 0 {
-                       t.controlBuf.put(&windowUpdate{0, cw, false})
-               }
-               t.controlBuf.put(&windowUpdate{s.id, w, true})
+               t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
+                       increment: w,
+               })
        }
 }
 
@@ -483,13 +524,15 @@ func (t *http2Server) updateFlowControl(n uint32) {
        }
        t.initialWindowSize = int32(n)
        t.mu.Unlock()
-       t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false})
-       t.controlBuf.put(&settings{
-               ack: false,
+       t.controlBuf.put(&outgoingWindowUpdate{
+               streamID:  0,
+               increment: t.fc.newLimit(n),
+       })
+       t.controlBuf.put(&outgoingSettings{
                ss: []http2.Setting{
                        {
                                ID:  http2.SettingInitialWindowSize,
-                               Val: uint32(n),
+                               Val: n,
                        },
                },
        })
@@ -500,7 +543,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
        size := f.Header().Length
        var sendBDPPing bool
        if t.bdpEst != nil {
-               sendBDPPing = t.bdpEst.add(uint32(size))
+               sendBDPPing = t.bdpEst.add(size)
        }
        // Decouple connection's flow control from application's read.
        // An update on connection's flow control should not depend on
@@ -510,21 +553,22 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
        // Decoupling the connection flow control will prevent other
        // active(fast) streams from starving in presence of slow or
        // inactive streams.
-       //
-       // Furthermore, if a bdpPing is being sent out we can piggyback
-       // connection's window update for the bytes we just received.
+       if w := t.fc.onData(size); w > 0 {
+               t.controlBuf.put(&outgoingWindowUpdate{
+                       streamID:  0,
+                       increment: w,
+               })
+       }
        if sendBDPPing {
-               t.controlBuf.put(&windowUpdate{0, uint32(size), false})
-               t.controlBuf.put(bdpPing)
-       } else {
-               if err := t.fc.onData(uint32(size)); err != nil {
-                       errorf("transport: http2Server %v", err)
-                       t.Close()
-                       return
-               }
-               if w := t.fc.onRead(uint32(size)); w > 0 {
-                       t.controlBuf.put(&windowUpdate{0, w, true})
+               // Avoid excessive ping detection (e.g. in an L7 proxy)
+               // by sending a window update prior to the BDP ping.
+               if w := t.fc.reset(); w > 0 {
+                       t.controlBuf.put(&outgoingWindowUpdate{
+                               streamID:  0,
+                               increment: w,
+                       })
                }
+               t.controlBuf.put(bdpPing)
        }
        // Select the right stream to dispatch.
        s, ok := t.getStream(f)
@@ -532,23 +576,15 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
                return
        }
        if size > 0 {
-               s.mu.Lock()
-               if s.state == streamDone {
-                       s.mu.Unlock()
-                       return
-               }
-               if err := s.fc.onData(uint32(size)); err != nil {
-                       s.mu.Unlock()
-                       t.closeStream(s)
-                       t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
+               if err := s.fc.onData(size); err != nil {
+                       t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false)
                        return
                }
                if f.Header().Flags.Has(http2.FlagDataPadded) {
-                       if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
-                               t.controlBuf.put(&windowUpdate{s.id, w, true})
+                       if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
+                               t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
                        }
                }
-               s.mu.Unlock()
                // TODO(bradfitz, zhaoq): A copy is required here because there is no
                // guarantee f.Data() is consumed before the arrival of next frame.
                // Can this copy be eliminated?
@@ -560,11 +596,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
        }
        if f.Header().Flags.Has(http2.FlagDataEndStream) {
                // Received the end of stream from the client.
-               s.mu.Lock()
-               if s.state != streamDone {
-                       s.state = streamReadDone
-               }
-               s.mu.Unlock()
+               s.compareAndSwapState(streamActive, streamReadDone)
                s.write(recvMsg{err: io.EOF})
        }
 }
@@ -574,7 +606,7 @@ func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
        if !ok {
                return
        }
-       t.closeStream(s)
+       t.closeStream(s, false, 0, nil, false)
 }
 
 func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
@@ -582,12 +614,27 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
                return
        }
        var ss []http2.Setting
+       var updateFuncs []func()
        f.ForeachSetting(func(s http2.Setting) error {
-               ss = append(ss, s)
+               switch s.ID {
+               case http2.SettingMaxHeaderListSize:
+                       updateFuncs = append(updateFuncs, func() {
+                               t.maxSendHeaderListSize = new(uint32)
+                               *t.maxSendHeaderListSize = s.Val
+                       })
+               default:
+                       ss = append(ss, s)
+               }
                return nil
        })
-       // The settings will be applied once the ack is sent.
-       t.controlBuf.put(&settings{ack: true, ss: ss})
+       t.controlBuf.executeAndPut(func(interface{}) bool {
+               for _, f := range updateFuncs {
+                       f()
+               }
+               return true
+       }, &incomingSettings{
+               ss: ss,
+       })
 }
 
 const (
@@ -627,7 +674,7 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
        t.mu.Unlock()
        if ns < 1 && !t.kep.PermitWithoutStream {
                // Keepalive shouldn't be active thus, this new ping should
-               // have come after atleast defaultPingTimeout.
+               // have come after at least defaultPingTimeout.
                if t.lastPingAt.Add(defaultPingTimeout).After(now) {
                        t.pingStrikes++
                }
@@ -640,69 +687,52 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
 
        if t.pingStrikes > maxPingStrikes {
                // Send goaway and close the connection.
+               errorf("transport: Got too many pings from the client, closing the connection.")
                t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
        }
 }
 
 func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
-       id := f.Header().StreamID
-       incr := f.Increment
-       if id == 0 {
-               t.sendQuotaPool.add(int(incr))
-               return
-       }
-       if s, ok := t.getStream(f); ok {
-               s.sendQuotaPool.add(int(incr))
-       }
+       t.controlBuf.put(&incomingWindowUpdate{
+               streamID:  f.Header().StreamID,
+               increment: f.Increment,
+       })
 }
 
-func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) error {
-       first := true
-       endHeaders := false
-       var err error
-       defer func() {
-               if err == nil {
-                       // Reset ping strikes when seding headers since that might cause the
-                       // peer to send ping.
-                       atomic.StoreUint32(&t.resetPingStrikes, 1)
-               }
-       }()
-       // Sends the headers in a single batch.
-       for !endHeaders {
-               size := t.hBuf.Len()
-               if size > http2MaxFrameLen {
-                       size = http2MaxFrameLen
-               } else {
-                       endHeaders = true
+func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
+       for k, vv := range md {
+               if isReservedHeader(k) {
+                       // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
+                       continue
                }
-               if first {
-                       p := http2.HeadersFrameParam{
-                               StreamID:      s.id,
-                               BlockFragment: b.Next(size),
-                               EndStream:     endStream,
-                               EndHeaders:    endHeaders,
-                       }
-                       err = t.framer.writeHeaders(endHeaders, p)
-                       first = false
-               } else {
-                       err = t.framer.writeContinuation(endHeaders, s.id, endHeaders, b.Next(size))
+               for _, v := range vv {
+                       headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
                }
-               if err != nil {
-                       t.Close()
-                       return connectionErrorf(true, err, "transport: %v", err)
+       }
+       return headerFields
+}
+
+func (t *http2Server) checkForHeaderListSize(it interface{}) bool {
+       if t.maxSendHeaderListSize == nil {
+               return true
+       }
+       hdrFrame := it.(*headerFrame)
+       var sz int64
+       for _, f := range hdrFrame.hf {
+               if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
+                       errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
+                       return false
                }
        }
-       return nil
+       return true
 }
 
 // WriteHeader sends the header metedata md back to the client.
 func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
-       s.mu.Lock()
-       if s.headerOk || s.state == streamDone {
-               s.mu.Unlock()
+       if s.updateHeaderSent() || s.getState() == streamDone {
                return ErrIllegalHeaderWrite
        }
-       s.headerOk = true
+       s.hdrMu.Lock()
        if md.Len() > 0 {
                if s.header.Len() > 0 {
                        s.header = metadata.Join(s.header, md)
@@ -710,37 +740,45 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
                        s.header = md
                }
        }
-       md = s.header
-       s.mu.Unlock()
-       if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
+       if err := t.writeHeaderLocked(s); err != nil {
+               s.hdrMu.Unlock()
                return err
        }
-       t.hBuf.Reset()
-       t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
-       t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
+       s.hdrMu.Unlock()
+       return nil
+}
+
+func (t *http2Server) writeHeaderLocked(s *Stream) error {
+       // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
+       // first and create a slice of that exact size.
+       headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
+       headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
+       headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
        if s.sendCompress != "" {
-               t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
-       }
-       for k, vv := range md {
-               if isReservedHeader(k) {
-                       // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
-                       continue
-               }
-               for _, v := range vv {
-                       t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
+               headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
+       }
+       headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
+       success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
+               streamID:  s.id,
+               hf:        headerFields,
+               endStream: false,
+               onWrite: func() {
+                       atomic.StoreUint32(&t.resetPingStrikes, 1)
+               },
+       })
+       if !success {
+               if err != nil {
+                       return err
                }
-       }
-       bufLen := t.hBuf.Len()
-       if err := t.writeHeaders(s, t.hBuf, false); err != nil {
-               return err
+               t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
+               return ErrHeaderListSizeLimitViolation
        }
        if t.stats != nil {
-               outHeader := &stats.OutHeader{
-                       WireLength: bufLen,
-               }
+               // Note: WireLength is not set in outHeader.
+               // TODO(mmukhi): Revisit this later, if needed.
+               outHeader := &stats.OutHeader{}
                t.stats.HandleRPC(s.Context(), outHeader)
        }
-       t.writableChan <- 0
        return nil
 }
 
@@ -749,204 +787,108 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
 // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
 // OK is adopted.
 func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
-       var headersSent, hasHeader bool
-       s.mu.Lock()
-       if s.state == streamDone {
-               s.mu.Unlock()
+       if s.getState() == streamDone {
                return nil
        }
-       if s.headerOk {
-               headersSent = true
-       }
-       if s.header.Len() > 0 {
-               hasHeader = true
-       }
-       s.mu.Unlock()
-
-       if !headersSent && hasHeader {
-               t.WriteHeader(s, nil)
-               headersSent = true
-       }
-
-       if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
-               return err
-       }
-       t.hBuf.Reset()
-       if !headersSent {
-               t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
-               t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
+       s.hdrMu.Lock()
+       // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
+       // first and create a slice of that exact size.
+       headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
+       if !s.updateHeaderSent() {                      // No headers have been sent.
+               if len(s.header) > 0 { // Send a separate header frame.
+                       if err := t.writeHeaderLocked(s); err != nil {
+                               s.hdrMu.Unlock()
+                               return err
+                       }
+               } else { // Send a trailer only response.
+                       headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
+                       headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
+               }
        }
-       t.hEnc.WriteField(
-               hpack.HeaderField{
-                       Name:  "grpc-status",
-                       Value: strconv.Itoa(int(st.Code())),
-               })
-       t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
+       headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
+       headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
 
        if p := st.Proto(); p != nil && len(p.Details) > 0 {
                stBytes, err := proto.Marshal(p)
                if err != nil {
                        // TODO: return error instead, when callers are able to handle it.
-                       panic(err)
+                       grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err)
+               } else {
+                       headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
                }
-
-               t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
        }
 
        // Attach the trailer metadata.
-       for k, vv := range s.trailer {
-               // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
-               if isReservedHeader(k) {
-                       continue
-               }
-               for _, v := range vv {
-                       t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
-               }
+       headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
+       trailingHeader := &headerFrame{
+               streamID:  s.id,
+               hf:        headerFields,
+               endStream: true,
+               onWrite: func() {
+                       atomic.StoreUint32(&t.resetPingStrikes, 1)
+               },
        }
-       bufLen := t.hBuf.Len()
-       if err := t.writeHeaders(s, t.hBuf, true); err != nil {
-               t.Close()
-               return err
+       s.hdrMu.Unlock()
+       success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
+       if !success {
+               if err != nil {
+                       return err
+               }
+               t.closeStream(s, true, http2.ErrCodeInternal, nil, false)
+               return ErrHeaderListSizeLimitViolation
        }
+       t.closeStream(s, false, 0, trailingHeader, true)
        if t.stats != nil {
-               outTrailer := &stats.OutTrailer{
-                       WireLength: bufLen,
-               }
-               t.stats.HandleRPC(s.Context(), outTrailer)
+               t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
        }
-       t.closeStream(s)
-       t.writableChan <- 0
        return nil
 }
 
 // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
 // is returns if it fails (e.g., framing error, transport error).
-func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
-       // TODO(zhaoq): Support multi-writers for a single stream.
-       var writeHeaderFrame bool
-       s.mu.Lock()
-       if s.state == streamDone {
-               s.mu.Unlock()
-               return streamErrorf(codes.Unknown, "the stream has been done")
-       }
-       if !s.headerOk {
-               writeHeaderFrame = true
-       }
-       s.mu.Unlock()
-       if writeHeaderFrame {
-               t.WriteHeader(s, nil)
-       }
-       r := bytes.NewBuffer(data)
-       var (
-               p   []byte
-               oqv uint32
-       )
-       for {
-               if r.Len() == 0 && p == nil {
-                       return nil
-               }
-               oqv = atomic.LoadUint32(&t.outQuotaVersion)
-               size := http2MaxFrameLen
-               // Wait until the stream has some quota to send the data.
-               sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
-               if err != nil {
-                       return err
-               }
-               // Wait until the transport has some quota to send the data.
-               tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
-               if err != nil {
-                       return err
-               }
-               if sq < size {
-                       size = sq
-               }
-               if tq < size {
-                       size = tq
+func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
+       if !s.isHeaderSent() { // Headers haven't been written yet.
+               if err := t.WriteHeader(s, nil); err != nil {
+                       // TODO(mmukhi, dfawley): Make sure this is the right code to return.
+                       return status.Errorf(codes.Internal, "transport: %v", err)
                }
-               if p == nil {
-                       p = r.Next(size)
-               }
-               ps := len(p)
-               if ps < sq {
-                       // Overbooked stream quota. Return it back.
-                       s.sendQuotaPool.add(sq - ps)
-               }
-               if ps < tq {
-                       // Overbooked transport quota. Return it back.
-                       t.sendQuotaPool.add(tq - ps)
-               }
-               t.framer.adjustNumWriters(1)
-               // Got some quota. Try to acquire writing privilege on the
-               // transport.
-               if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
-                       if _, ok := err.(StreamError); ok {
-                               // Return the connection quota back.
-                               t.sendQuotaPool.add(ps)
-                       }
-                       if t.framer.adjustNumWriters(-1) == 0 {
-                               // This writer is the last one in this batch and has the
-                               // responsibility to flush the buffered frames. It queues
-                               // a flush request to controlBuf instead of flushing directly
-                               // in order to avoid the race with other writing or flushing.
-                               t.controlBuf.put(&flushIO{})
-                       }
-                       return err
-               }
-               select {
-               case <-s.ctx.Done():
-                       t.sendQuotaPool.add(ps)
-                       if t.framer.adjustNumWriters(-1) == 0 {
-                               t.controlBuf.put(&flushIO{})
+       } else {
+               // Writing headers checks for this condition.
+               if s.getState() == streamDone {
+                       // TODO(mmukhi, dfawley): Should the server write also return io.EOF?
+                       s.cancel()
+                       select {
+                       case <-t.ctx.Done():
+                               return ErrConnClosing
+                       default:
                        }
-                       t.writableChan <- 0
                        return ContextErr(s.ctx.Err())
-               default:
-               }
-               if oqv != atomic.LoadUint32(&t.outQuotaVersion) {
-                       // InitialWindowSize settings frame must have been received after we
-                       // acquired send quota but before we got the writable channel.
-                       // We must forsake this write.
-                       t.sendQuotaPool.add(ps)
-                       s.sendQuotaPool.add(ps)
-                       if t.framer.adjustNumWriters(-1) == 0 {
-                               t.controlBuf.put(&flushIO{})
-                       }
-                       t.writableChan <- 0
-                       continue
-               }
-               var forceFlush bool
-               if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last {
-                       forceFlush = true
-               }
-               // Reset ping strikes when sending data since this might cause
-               // the peer to send ping.
-               atomic.StoreUint32(&t.resetPingStrikes, 1)
-               if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
-                       t.Close()
-                       return connectionErrorf(true, err, "transport: %v", err)
                }
-               p = nil
-               if t.framer.adjustNumWriters(-1) == 0 {
-                       t.framer.flushWrite()
-               }
-               t.writableChan <- 0
        }
-
-}
-
-func (t *http2Server) applySettings(ss []http2.Setting) {
-       for _, s := range ss {
-               if s.ID == http2.SettingInitialWindowSize {
-                       t.mu.Lock()
-                       defer t.mu.Unlock()
-                       for _, stream := range t.activeStreams {
-                               stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota))
-                       }
-                       t.streamSendQuota = s.Val
-                       atomic.AddUint32(&t.outQuotaVersion, 1)
+       // Add some data to header frame so that we can equally distribute bytes across frames.
+       emptyLen := http2MaxFrameLen - len(hdr)
+       if emptyLen > len(data) {
+               emptyLen = len(data)
+       }
+       hdr = append(hdr, data[:emptyLen]...)
+       data = data[emptyLen:]
+       df := &dataFrame{
+               streamID: s.id,
+               h:        hdr,
+               d:        data,
+               onEachWrite: func() {
+                       atomic.StoreUint32(&t.resetPingStrikes, 1)
+               },
+       }
+       if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
+               select {
+               case <-t.ctx.Done():
+                       return ErrConnClosing
+               default:
                }
-
+               return ContextErr(s.ctx.Err())
        }
+       return t.controlBuf.put(df)
 }
 
 // keepalive running in a separate goroutine does the following:
@@ -962,7 +904,7 @@ func (t *http2Server) keepalive() {
        maxAge := time.NewTimer(t.kp.MaxConnectionAge)
        keepalive := time.NewTimer(t.kp.Time)
        // NOTE: All exit paths of this function should reset their
-       // respecitve timers. A failure to do so will cause the
+       // respective timers. A failure to do so will cause the
        // following clean-up to deadlock and eventually leak.
        defer func() {
                if !maxIdle.Stop() {
@@ -991,7 +933,7 @@ func (t *http2Server) keepalive() {
                                // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
                                // Gracefully close the connection.
                                t.drain(http2.ErrCodeNo, []byte{})
-                               // Reseting the timer so that the clean-up doesn't deadlock.
+                               // Resetting the timer so that the clean-up doesn't deadlock.
                                maxIdle.Reset(infinity)
                                return
                        }
@@ -1003,9 +945,9 @@ func (t *http2Server) keepalive() {
                        case <-maxAge.C:
                                // Close the connection after grace period.
                                t.Close()
-                               // Reseting the timer so that the clean-up doesn't deadlock.
+                               // Resetting the timer so that the clean-up doesn't deadlock.
                                maxAge.Reset(infinity)
-                       case <-t.shutdownChan:
+                       case <-t.ctx.Done():
                        }
                        return
                case <-keepalive.C:
@@ -1016,98 +958,17 @@ func (t *http2Server) keepalive() {
                        }
                        if pingSent {
                                t.Close()
-                               // Reseting the timer so that the clean-up doesn't deadlock.
+                               // Resetting the timer so that the clean-up doesn't deadlock.
                                keepalive.Reset(infinity)
                                return
                        }
                        pingSent = true
+                       if channelz.IsOn() {
+                               atomic.AddInt64(&t.czData.kpCount, 1)
+                       }
                        t.controlBuf.put(p)
                        keepalive.Reset(t.kp.Timeout)
-               case <-t.shutdownChan:
-                       return
-               }
-       }
-}
-
-var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
-
-// controller running in a separate goroutine takes charge of sending control
-// frames (e.g., window update, reset stream, setting, etc.) to the server.
-func (t *http2Server) controller() {
-       for {
-               select {
-               case i := <-t.controlBuf.get():
-                       t.controlBuf.load()
-                       select {
-                       case <-t.writableChan:
-                               switch i := i.(type) {
-                               case *windowUpdate:
-                                       t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment)
-                               case *settings:
-                                       if i.ack {
-                                               t.framer.writeSettingsAck(true)
-                                               t.applySettings(i.ss)
-                                       } else {
-                                               t.framer.writeSettings(true, i.ss...)
-                                       }
-                               case *resetStream:
-                                       t.framer.writeRSTStream(true, i.streamID, i.code)
-                               case *goAway:
-                                       t.mu.Lock()
-                                       if t.state == closing {
-                                               t.mu.Unlock()
-                                               // The transport is closing.
-                                               return
-                                       }
-                                       sid := t.maxStreamID
-                                       if !i.headsUp {
-                                               // Stop accepting more streams now.
-                                               t.state = draining
-                                               t.mu.Unlock()
-                                               t.framer.writeGoAway(true, sid, i.code, i.debugData)
-                                               if i.closeConn {
-                                                       // Abruptly close the connection following the GoAway.
-                                                       t.Close()
-                                               }
-                                               t.writableChan <- 0
-                                               continue
-                                       }
-                                       t.mu.Unlock()
-                                       // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
-                                       // Follow that with a ping and wait for the ack to come back or a timer
-                                       // to expire. During this time accept new streams since they might have
-                                       // originated before the GoAway reaches the client.
-                                       // After getting the ack or timer expiration send out another GoAway this
-                                       // time with an ID of the max stream server intends to process.
-                                       t.framer.writeGoAway(true, math.MaxUint32, http2.ErrCodeNo, []byte{})
-                                       t.framer.writePing(true, false, goAwayPing.data)
-                                       go func() {
-                                               timer := time.NewTimer(time.Minute)
-                                               defer timer.Stop()
-                                               select {
-                                               case <-t.drainChan:
-                                               case <-timer.C:
-                                               case <-t.shutdownChan:
-                                                       return
-                                               }
-                                               t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData})
-                                       }()
-                               case *flushIO:
-                                       t.framer.flushWrite()
-                               case *ping:
-                                       if !i.ack {
-                                               t.bdpEst.timesnap(i.data)
-                                       }
-                                       t.framer.writePing(true, i.ack, i.data)
-                               default:
-                                       errorf("transport: http2Server.controller got unexpected item type %v\n", i)
-                               }
-                               t.writableChan <- 0
-                               continue
-                       case <-t.shutdownChan:
-                               return
-                       }
-               case <-t.shutdownChan:
+               case <-t.ctx.Done():
                        return
                }
        }
@@ -1116,7 +977,7 @@ func (t *http2Server) controller() {
 // Close starts shutting down the http2Server transport.
 // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
 // could cause some resource issue. Revisit this later.
-func (t *http2Server) Close() (err error) {
+func (t *http2Server) Close() error {
        t.mu.Lock()
        if t.state == closing {
                t.mu.Unlock()
@@ -1126,8 +987,12 @@ func (t *http2Server) Close() (err error) {
        streams := t.activeStreams
        t.activeStreams = nil
        t.mu.Unlock()
-       close(t.shutdownChan)
-       err = t.conn.Close()
+       t.controlBuf.finish()
+       t.cancel()
+       err := t.conn.Close()
+       if channelz.IsOn() {
+               channelz.RemoveEntry(t.channelzID)
+       }
        // Cancel all active streams.
        for _, s := range streams {
                s.cancel()
@@ -1136,32 +1001,48 @@ func (t *http2Server) Close() (err error) {
                connEnd := &stats.ConnEnd{}
                t.stats.HandleConn(t.ctx, connEnd)
        }
-       return
+       return err
 }
 
 // closeStream clears the footprint of a stream when the stream is not needed
 // any more.
-func (t *http2Server) closeStream(s *Stream) {
-       t.mu.Lock()
-       delete(t.activeStreams, s.id)
-       if len(t.activeStreams) == 0 {
-               t.idle = time.Now()
-       }
-       if t.state == draining && len(t.activeStreams) == 0 {
-               defer t.Close()
+func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
+       if s.swapState(streamDone) == streamDone {
+               // If the stream was already done, return.
+               return
        }
-       t.mu.Unlock()
        // In case stream sending and receiving are invoked in separate
        // goroutines (e.g., bi-directional streaming), cancel needs to be
        // called to interrupt the potential blocking on other goroutines.
        s.cancel()
-       s.mu.Lock()
-       if s.state == streamDone {
-               s.mu.Unlock()
-               return
+       cleanup := &cleanupStream{
+               streamID: s.id,
+               rst:      rst,
+               rstCode:  rstCode,
+               onWrite: func() {
+                       t.mu.Lock()
+                       if t.activeStreams != nil {
+                               delete(t.activeStreams, s.id)
+                               if len(t.activeStreams) == 0 {
+                                       t.idle = time.Now()
+                               }
+                       }
+                       t.mu.Unlock()
+                       if channelz.IsOn() {
+                               if eosReceived {
+                                       atomic.AddInt64(&t.czData.streamsSucceeded, 1)
+                               } else {
+                                       atomic.AddInt64(&t.czData.streamsFailed, 1)
+                               }
+                       }
+               },
+       }
+       if hdr != nil {
+               hdr.cleanup = cleanup
+               t.controlBuf.put(hdr)
+       } else {
+               t.controlBuf.put(cleanup)
        }
-       s.state = streamDone
-       s.mu.Unlock()
 }
 
 func (t *http2Server) RemoteAddr() net.Addr {
@@ -1182,7 +1063,111 @@ func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
        t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
 }
 
-var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
+var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
+
+// Handles outgoing GoAway and returns true if loopy needs to put itself
+// in draining mode.
+func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
+       t.mu.Lock()
+       if t.state == closing { // TODO(mmukhi): This seems unnecessary.
+               t.mu.Unlock()
+               // The transport is closing.
+               return false, ErrConnClosing
+       }
+       sid := t.maxStreamID
+       if !g.headsUp {
+               // Stop accepting more streams now.
+               t.state = draining
+               if len(t.activeStreams) == 0 {
+                       g.closeConn = true
+               }
+               t.mu.Unlock()
+               if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
+                       return false, err
+               }
+               if g.closeConn {
+                       // Abruptly close the connection following the GoAway (via
+                       // loopywriter).  But flush out what's inside the buffer first.
+                       t.framer.writer.Flush()
+                       return false, fmt.Errorf("transport: Connection closing")
+               }
+               return true, nil
+       }
+       t.mu.Unlock()
+       // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
+       // Follow that with a ping and wait for the ack to come back or a timer
+       // to expire. During this time accept new streams since they might have
+       // originated before the GoAway reaches the client.
+       // After getting the ack or timer expiration send out another GoAway this
+       // time with an ID of the max stream server intends to process.
+       if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
+               return false, err
+       }
+       if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
+               return false, err
+       }
+       go func() {
+               timer := time.NewTimer(time.Minute)
+               defer timer.Stop()
+               select {
+               case <-t.drainChan:
+               case <-timer.C:
+               case <-t.ctx.Done():
+                       return
+               }
+               t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
+       }()
+       return false, nil
+}
+
+func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
+       s := channelz.SocketInternalMetric{
+               StreamsStarted:                   atomic.LoadInt64(&t.czData.streamsStarted),
+               StreamsSucceeded:                 atomic.LoadInt64(&t.czData.streamsSucceeded),
+               StreamsFailed:                    atomic.LoadInt64(&t.czData.streamsFailed),
+               MessagesSent:                     atomic.LoadInt64(&t.czData.msgSent),
+               MessagesReceived:                 atomic.LoadInt64(&t.czData.msgRecv),
+               KeepAlivesSent:                   atomic.LoadInt64(&t.czData.kpCount),
+               LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
+               LastMessageSentTimestamp:         time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
+               LastMessageReceivedTimestamp:     time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
+               LocalFlowControlWindow:           int64(t.fc.getSize()),
+               SocketOptions:                    channelz.GetSocketOption(t.conn),
+               LocalAddr:                        t.localAddr,
+               RemoteAddr:                       t.remoteAddr,
+               // RemoteName :
+       }
+       if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
+               s.Security = au.GetSecurityValue()
+       }
+       s.RemoteFlowControlWindow = t.getOutFlowWindow()
+       return &s
+}
+
+func (t *http2Server) IncrMsgSent() {
+       atomic.AddInt64(&t.czData.msgSent, 1)
+       atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
+}
+
+func (t *http2Server) IncrMsgRecv() {
+       atomic.AddInt64(&t.czData.msgRecv, 1)
+       atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
+}
+
+func (t *http2Server) getOutFlowWindow() int64 {
+       resp := make(chan uint32)
+       timer := time.NewTimer(time.Second)
+       defer timer.Stop()
+       t.controlBuf.put(&outFlowControlSizeRequest{resp})
+       select {
+       case sz := <-resp:
+               return int64(sz)
+       case <-t.ctxDone:
+               return -1
+       case <-timer.C:
+               return -2
+       }
+}
 
 func getJitter(v time.Duration) time.Duration {
        if v == infinity {
@@ -1190,6 +1175,6 @@ func getJitter(v time.Duration) time.Duration {
        }
        // Generate a jitter between +/- 10% of the value.
        r := int64(v / 10)
-       j := rgen.Int63n(2*r) - r
+       j := grpcrand.Int63n(2*r) - r
        return time.Duration(j)
 }