]> git.immae.eu Git - github/fretlink/terraform-provider-statuscake.git/blobdiff - vendor/google.golang.org/grpc/server.go
Upgrade to 0.12
[github/fretlink/terraform-provider-statuscake.git] / vendor / google.golang.org / grpc / server.go
index 42733e2214e7e301fd1390a020fca01d01ef00a2..d705d7a80cd1c484829f40828cd219f2ee5cc92f 100644 (file)
 package grpc
 
 import (
-       "bytes"
+       "context"
        "errors"
        "fmt"
        "io"
+       "math"
        "net"
        "net/http"
        "reflect"
        "runtime"
        "strings"
        "sync"
+       "sync/atomic"
        "time"
 
-       "golang.org/x/net/context"
-       "golang.org/x/net/http2"
        "golang.org/x/net/trace"
+
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/credentials"
+       "google.golang.org/grpc/encoding"
+       "google.golang.org/grpc/encoding/proto"
        "google.golang.org/grpc/grpclog"
-       "google.golang.org/grpc/internal"
+       "google.golang.org/grpc/internal/binarylog"
+       "google.golang.org/grpc/internal/channelz"
+       "google.golang.org/grpc/internal/transport"
        "google.golang.org/grpc/keepalive"
        "google.golang.org/grpc/metadata"
+       "google.golang.org/grpc/peer"
        "google.golang.org/grpc/stats"
        "google.golang.org/grpc/status"
        "google.golang.org/grpc/tap"
-       "google.golang.org/grpc/transport"
 )
 
 const (
        defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
-       defaultServerMaxSendMessageSize    = 1024 * 1024 * 4
+       defaultServerMaxSendMessageSize    = math.MaxInt32
 )
 
 type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
@@ -88,18 +93,24 @@ type Server struct {
        conns  map[io.Closer]bool
        serve  bool
        drain  bool
-       ctx    context.Context
-       cancel context.CancelFunc
-       // A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
-       // and all the transport goes away.
-       cv     *sync.Cond
+       cv     *sync.Cond          // signaled when connections close for GracefulStop
        m      map[string]*service // service name -> service info
        events trace.EventLog
+
+       quit               chan struct{}
+       done               chan struct{}
+       quitOnce           sync.Once
+       doneOnce           sync.Once
+       channelzRemoveOnce sync.Once
+       serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop
+
+       channelzID int64 // channelz unique identification number
+       czData     *channelzData
 }
 
 type options struct {
        creds                 credentials.TransportCredentials
-       codec                 Codec
+       codec                 baseCodec
        cp                    Compressor
        dc                    Decompressor
        unaryInt              UnaryServerInterceptor
@@ -109,22 +120,50 @@ type options struct {
        maxConcurrentStreams  uint32
        maxReceiveMessageSize int
        maxSendMessageSize    int
-       useHandlerImpl        bool // use http.Handler-based server
        unknownStreamDesc     *StreamDesc
        keepaliveParams       keepalive.ServerParameters
        keepalivePolicy       keepalive.EnforcementPolicy
        initialWindowSize     int32
        initialConnWindowSize int32
+       writeBufferSize       int
+       readBufferSize        int
+       connectionTimeout     time.Duration
+       maxHeaderListSize     *uint32
 }
 
 var defaultServerOptions = options{
        maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
        maxSendMessageSize:    defaultServerMaxSendMessageSize,
+       connectionTimeout:     120 * time.Second,
+       writeBufferSize:       defaultWriteBufSize,
+       readBufferSize:        defaultReadBufSize,
 }
 
 // A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
 type ServerOption func(*options)
 
+// WriteBufferSize determines how much data can be batched before doing a write on the wire.
+// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
+// The default value for this buffer is 32KB.
+// Zero will disable the write buffer such that each write will be on underlying connection.
+// Note: A Send call may not directly translate to a write.
+func WriteBufferSize(s int) ServerOption {
+       return func(o *options) {
+               o.writeBufferSize = s
+       }
+}
+
+// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
+// for one read syscall.
+// The default value for this buffer is 32KB.
+// Zero will disable read buffer for a connection so data framer can access the underlying
+// conn directly.
+func ReadBufferSize(s int) ServerOption {
+       return func(o *options) {
+               o.readBufferSize = s
+       }
+}
+
 // InitialWindowSize returns a ServerOption that sets window size for stream.
 // The lower bound for window size is 64K and any value smaller than that will be ignored.
 func InitialWindowSize(s int32) ServerOption {
@@ -156,20 +195,32 @@ func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
 }
 
 // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
+//
+// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
 func CustomCodec(codec Codec) ServerOption {
        return func(o *options) {
                o.codec = codec
        }
 }
 
-// RPCCompressor returns a ServerOption that sets a compressor for outbound messages.
+// RPCCompressor returns a ServerOption that sets a compressor for outbound
+// messages.  For backward compatibility, all outbound messages will be sent
+// using this compressor, regardless of incoming message compression.  By
+// default, server messages will be sent using the same compressor with which
+// request messages were sent.
+//
+// Deprecated: use encoding.RegisterCompressor instead.
 func RPCCompressor(cp Compressor) ServerOption {
        return func(o *options) {
                o.cp = cp
        }
 }
 
-// RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages.
+// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
+// messages.  It has higher priority than decompressors registered via
+// encoding.RegisterCompressor.
+//
+// Deprecated: use encoding.RegisterCompressor instead.
 func RPCDecompressor(dc Decompressor) ServerOption {
        return func(o *options) {
                o.dc = dc
@@ -177,7 +228,9 @@ func RPCDecompressor(dc Decompressor) ServerOption {
 }
 
 // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
-// If this is not set, gRPC uses the default limit. Deprecated: use MaxRecvMsgSize instead.
+// If this is not set, gRPC uses the default limit.
+//
+// Deprecated: use MaxRecvMsgSize instead.
 func MaxMsgSize(m int) ServerOption {
        return MaxRecvMsgSize(m)
 }
@@ -259,7 +312,7 @@ func StatsHandler(h stats.Handler) ServerOption {
 // handler that will be invoked instead of returning the "unimplemented" gRPC
 // error whenever a request is received for an unregistered service or method.
 // The handling function has full access to the Context of the request and the
-// stream, and the invocation passes through interceptors.
+// stream, and the invocation bypasses interceptors.
 func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
        return func(o *options) {
                o.unknownStreamDesc = &StreamDesc{
@@ -272,6 +325,26 @@ func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
        }
 }
 
+// ConnectionTimeout returns a ServerOption that sets the timeout for
+// connection establishment (up to and including HTTP/2 handshaking) for all
+// new connections.  If this is not set, the default is 120 seconds.  A zero or
+// negative value will result in an immediate timeout.
+//
+// This API is EXPERIMENTAL.
+func ConnectionTimeout(d time.Duration) ServerOption {
+       return func(o *options) {
+               o.connectionTimeout = d
+       }
+}
+
+// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
+// of header list that the server is prepared to accept.
+func MaxHeaderListSize(s uint32) ServerOption {
+       return func(o *options) {
+               o.maxHeaderListSize = &s
+       }
+}
+
 // NewServer creates a gRPC server which has no service registered and has not
 // started to accept requests yet.
 func NewServer(opt ...ServerOption) *Server {
@@ -279,22 +352,24 @@ func NewServer(opt ...ServerOption) *Server {
        for _, o := range opt {
                o(&opts)
        }
-       if opts.codec == nil {
-               // Set the default codec.
-               opts.codec = protoCodec{}
-       }
        s := &Server{
-               lis:   make(map[net.Listener]bool),
-               opts:  opts,
-               conns: make(map[io.Closer]bool),
-               m:     make(map[string]*service),
+               lis:    make(map[net.Listener]bool),
+               opts:   opts,
+               conns:  make(map[io.Closer]bool),
+               m:      make(map[string]*service),
+               quit:   make(chan struct{}),
+               done:   make(chan struct{}),
+               czData: new(channelzData),
        }
        s.cv = sync.NewCond(&s.mu)
-       s.ctx, s.cancel = context.WithCancel(context.Background())
        if EnableTracing {
                _, file, line, _ := runtime.Caller(1)
                s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
        }
+
+       if channelz.IsOn() {
+               s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
+       }
        return s
 }
 
@@ -399,11 +474,9 @@ func (s *Server) GetServiceInfo() map[string]ServiceInfo {
        return ret
 }
 
-var (
-       // ErrServerStopped indicates that the operation is now illegal because of
-       // the server being stopped.
-       ErrServerStopped = errors.New("grpc: the server has been stopped")
-)
+// ErrServerStopped indicates that the operation is now illegal because of
+// the server being stopped.
+var ErrServerStopped = errors.New("grpc: the server has been stopped")
 
 func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
        if s.opts.creds == nil {
@@ -412,28 +485,67 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti
        return s.opts.creds.ServerHandshake(rawConn)
 }
 
+type listenSocket struct {
+       net.Listener
+       channelzID int64
+}
+
+func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
+       return &channelz.SocketInternalMetric{
+               SocketOptions: channelz.GetSocketOption(l.Listener),
+               LocalAddr:     l.Listener.Addr(),
+       }
+}
+
+func (l *listenSocket) Close() error {
+       err := l.Listener.Close()
+       if channelz.IsOn() {
+               channelz.RemoveEntry(l.channelzID)
+       }
+       return err
+}
+
 // Serve accepts incoming connections on the listener lis, creating a new
 // ServerTransport and service goroutine for each. The service goroutines
 // read gRPC requests and then call the registered handlers to reply to them.
 // Serve returns when lis.Accept fails with fatal errors.  lis will be closed when
 // this method returns.
-// Serve always returns non-nil error.
+// Serve will return a non-nil error unless Stop or GracefulStop is called.
 func (s *Server) Serve(lis net.Listener) error {
        s.mu.Lock()
        s.printf("serving")
        s.serve = true
        if s.lis == nil {
+               // Serve called after Stop or GracefulStop.
                s.mu.Unlock()
                lis.Close()
                return ErrServerStopped
        }
-       s.lis[lis] = true
+
+       s.serveWG.Add(1)
+       defer func() {
+               s.serveWG.Done()
+               select {
+               // Stop or GracefulStop called; block until done and return nil.
+               case <-s.quit:
+                       <-s.done
+               default:
+               }
+       }()
+
+       ls := &listenSocket{Listener: lis}
+       s.lis[ls] = true
+
+       if channelz.IsOn() {
+               ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
+       }
        s.mu.Unlock()
+
        defer func() {
                s.mu.Lock()
-               if s.lis != nil && s.lis[lis] {
-                       lis.Close()
-                       delete(s.lis, lis)
+               if s.lis != nil && s.lis[ls] {
+                       ls.Close()
+                       delete(s.lis, ls)
                }
                s.mu.Unlock()
        }()
@@ -460,36 +572,52 @@ func (s *Server) Serve(lis net.Listener) error {
                                timer := time.NewTimer(tempDelay)
                                select {
                                case <-timer.C:
-                               case <-s.ctx.Done():
+                               case <-s.quit:
+                                       timer.Stop()
+                                       return nil
                                }
-                               timer.Stop()
                                continue
                        }
                        s.mu.Lock()
                        s.printf("done serving; Accept = %v", err)
                        s.mu.Unlock()
+
+                       select {
+                       case <-s.quit:
+                               return nil
+                       default:
+                       }
                        return err
                }
                tempDelay = 0
-               // Start a new goroutine to deal with rawConn
-               // so we don't stall this Accept loop goroutine.
-               go s.handleRawConn(rawConn)
+               // Start a new goroutine to deal with rawConn so we don't stall this Accept
+               // loop goroutine.
+               //
+               // Make sure we account for the goroutine so GracefulStop doesn't nil out
+               // s.conns before this conn can be added.
+               s.serveWG.Add(1)
+               go func() {
+                       s.handleRawConn(rawConn)
+                       s.serveWG.Done()
+               }()
        }
 }
 
-// handleRawConn is run in its own goroutine and handles a just-accepted
-// connection that has not had any I/O performed on it yet.
+// handleRawConn forks a goroutine to handle a just-accepted connection that
+// has not had any I/O performed on it yet.
 func (s *Server) handleRawConn(rawConn net.Conn) {
+       rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
        conn, authInfo, err := s.useTransportAuthenticator(rawConn)
        if err != nil {
                s.mu.Lock()
                s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
                s.mu.Unlock()
                grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
-               // If serverHandShake returns ErrConnDispatched, keep rawConn open.
+               // If serverHandshake returns ErrConnDispatched, keep rawConn open.
                if err != credentials.ErrConnDispatched {
                        rawConn.Close()
                }
+               rawConn.SetDeadline(time.Time{})
                return
        }
 
@@ -501,19 +629,25 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
        }
        s.mu.Unlock()
 
-       if s.opts.useHandlerImpl {
-               s.serveUsingHandler(conn)
-       } else {
-               s.serveHTTP2Transport(conn, authInfo)
+       // Finish handshaking (HTTP2)
+       st := s.newHTTP2Transport(conn, authInfo)
+       if st == nil {
+               return
+       }
+
+       rawConn.SetDeadline(time.Time{})
+       if !s.addConn(st) {
+               return
        }
+       go func() {
+               s.serveStreams(st)
+               s.removeConn(st)
+       }()
 }
 
-// serveHTTP2Transport sets up a http/2 transport (using the
-// gRPC http2 server transport in transport/http2_server.go) and
-// serves streams on it.
-// This is run in its own goroutine (it does network I/O in
-// transport.NewServerTransport).
-func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
+// newHTTP2Transport sets up a http/2 transport (using the
+// gRPC http2 server transport in transport/http2_server.go).
+func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
        config := &transport.ServerConfig{
                MaxStreams:            s.opts.maxConcurrentStreams,
                AuthInfo:              authInfo,
@@ -523,6 +657,10 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo)
                KeepalivePolicy:       s.opts.keepalivePolicy,
                InitialWindowSize:     s.opts.initialWindowSize,
                InitialConnWindowSize: s.opts.initialConnWindowSize,
+               WriteBufferSize:       s.opts.writeBufferSize,
+               ReadBufferSize:        s.opts.readBufferSize,
+               ChannelzParentID:      s.channelzID,
+               MaxHeaderListSize:     s.opts.maxHeaderListSize,
        }
        st, err := transport.NewServerTransport("http2", c, config)
        if err != nil {
@@ -531,17 +669,13 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo)
                s.mu.Unlock()
                c.Close()
                grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
-               return
-       }
-       if !s.addConn(st) {
-               st.Close()
-               return
+               return nil
        }
-       s.serveStreams(st)
+
+       return st
 }
 
 func (s *Server) serveStreams(st transport.ServerTransport) {
-       defer s.removeConn(st)
        defer st.Close()
        var wg sync.WaitGroup
        st.HandleStreams(func(stream *transport.Stream) {
@@ -562,32 +696,6 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
 
 var _ http.Handler = (*Server)(nil)
 
-// serveUsingHandler is called from handleRawConn when s is configured
-// to handle requests via the http.Handler interface. It sets up a
-// net/http.Server to handle the just-accepted conn. The http.Server
-// is configured to route all incoming requests (all HTTP/2 streams)
-// to ServeHTTP, which creates a new ServerTransport for each stream.
-// serveUsingHandler blocks until conn closes.
-//
-// This codepath is only used when Server.TestingUseHandlerImpl has
-// been configured. This lets the end2end tests exercise the ServeHTTP
-// method as one of the environment types.
-//
-// conn is the *tls.Conn that's already been authenticated.
-func (s *Server) serveUsingHandler(conn net.Conn) {
-       if !s.addConn(conn) {
-               conn.Close()
-               return
-       }
-       defer s.removeConn(conn)
-       h2s := &http2.Server{
-               MaxConcurrentStreams: s.opts.maxConcurrentStreams,
-       }
-       h2s.ServeConn(conn, &http2.ServeConnOpts{
-               Handler: s,
-       })
-}
-
 // ServeHTTP implements the Go standard library's http.Handler
 // interface by responding to the gRPC request r, by looking up
 // the requested gRPC method in the gRPC server s.
@@ -613,13 +721,12 @@ func (s *Server) serveUsingHandler(conn net.Conn) {
 // available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
 // and subject to change.
 func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-       st, err := transport.NewServerHandlerTransport(w, r)
+       st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
        if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
        }
        if !s.addConn(st) {
-               st.Close()
                return
        }
        defer s.removeConn(st)
@@ -649,9 +756,15 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea
 func (s *Server) addConn(c io.Closer) bool {
        s.mu.Lock()
        defer s.mu.Unlock()
-       if s.conns == nil || s.drain {
+       if s.conns == nil {
+               c.Close()
                return false
        }
+       if s.drain {
+               // Transport added after we drained our existing conns: drain it
+               // immediately.
+               c.(transport.ServerTransport).Drain()
+       }
        s.conns[c] = true
        return true
 }
@@ -665,43 +778,73 @@ func (s *Server) removeConn(c io.Closer) {
        }
 }
 
-func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
-       var (
-               cbuf       *bytes.Buffer
-               outPayload *stats.OutPayload
-       )
-       if cp != nil {
-               cbuf = new(bytes.Buffer)
+func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
+       return &channelz.ServerInternalMetric{
+               CallsStarted:             atomic.LoadInt64(&s.czData.callsStarted),
+               CallsSucceeded:           atomic.LoadInt64(&s.czData.callsSucceeded),
+               CallsFailed:              atomic.LoadInt64(&s.czData.callsFailed),
+               LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
        }
-       if s.opts.statsHandler != nil {
-               outPayload = &stats.OutPayload{}
-       }
-       p, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
+}
+
+func (s *Server) incrCallsStarted() {
+       atomic.AddInt64(&s.czData.callsStarted, 1)
+       atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
+}
+
+func (s *Server) incrCallsSucceeded() {
+       atomic.AddInt64(&s.czData.callsSucceeded, 1)
+}
+
+func (s *Server) incrCallsFailed() {
+       atomic.AddInt64(&s.czData.callsFailed, 1)
+}
+
+func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
+       data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
        if err != nil {
                grpclog.Errorln("grpc: server failed to encode response: ", err)
                return err
        }
-       if len(p) > s.opts.maxSendMessageSize {
-               return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(p), s.opts.maxSendMessageSize)
+       compData, err := compress(data, cp, comp)
+       if err != nil {
+               grpclog.Errorln("grpc: server failed to compress response: ", err)
+               return err
+       }
+       hdr, payload := msgHeader(data, compData)
+       // TODO(dfawley): should we be checking len(data) instead?
+       if len(payload) > s.opts.maxSendMessageSize {
+               return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
        }
-       err = t.Write(stream, p, opts)
-       if err == nil && outPayload != nil {
-               outPayload.SentTime = time.Now()
-               s.opts.statsHandler.HandleRPC(stream.Context(), outPayload)
+       err = t.Write(stream, hdr, payload, opts)
+       if err == nil && s.opts.statsHandler != nil {
+               s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
        }
        return err
 }
 
 func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
+       if channelz.IsOn() {
+               s.incrCallsStarted()
+               defer func() {
+                       if err != nil && err != io.EOF {
+                               s.incrCallsFailed()
+                       } else {
+                               s.incrCallsSucceeded()
+                       }
+               }()
+       }
        sh := s.opts.statsHandler
        if sh != nil {
+               beginTime := time.Now()
                begin := &stats.Begin{
-                       BeginTime: time.Now(),
+                       BeginTime: beginTime,
                }
                sh.HandleRPC(stream.Context(), begin)
                defer func() {
                        end := &stats.End{
-                               EndTime: time.Now(),
+                               BeginTime: beginTime,
+                               EndTime:   time.Now(),
                        }
                        if err != nil && err != io.EOF {
                                end.Error = toRPCErr(err)
@@ -720,94 +863,112 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
                        }
                }()
        }
-       if s.opts.cp != nil {
-               // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
-               stream.SetSendCompress(s.opts.cp.Type())
+
+       binlog := binarylog.GetMethodLogger(stream.Method())
+       if binlog != nil {
+               ctx := stream.Context()
+               md, _ := metadata.FromIncomingContext(ctx)
+               logEntry := &binarylog.ClientHeader{
+                       Header:     md,
+                       MethodName: stream.Method(),
+                       PeerAddr:   nil,
+               }
+               if deadline, ok := ctx.Deadline(); ok {
+                       logEntry.Timeout = deadline.Sub(time.Now())
+                       if logEntry.Timeout < 0 {
+                               logEntry.Timeout = 0
+                       }
+               }
+               if a := md[":authority"]; len(a) > 0 {
+                       logEntry.Authority = a[0]
+               }
+               if peer, ok := peer.FromContext(ctx); ok {
+                       logEntry.PeerAddr = peer.Addr
+               }
+               binlog.Log(logEntry)
+       }
+
+       // comp and cp are used for compression.  decomp and dc are used for
+       // decompression.  If comp and decomp are both set, they are the same;
+       // however they are kept separate to ensure that at most one of the
+       // compressor/decompressor variable pairs are set for use later.
+       var comp, decomp encoding.Compressor
+       var cp Compressor
+       var dc Decompressor
+
+       // If dc is set and matches the stream's compression, use it.  Otherwise, try
+       // to find a matching registered compressor for decomp.
+       if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
+               dc = s.opts.dc
+       } else if rc != "" && rc != encoding.Identity {
+               decomp = encoding.GetCompressor(rc)
+               if decomp == nil {
+                       st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
+                       t.WriteStatus(stream, st)
+                       return st.Err()
+               }
        }
-       p := &parser{r: stream}
-       pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
-       if err == io.EOF {
-               // The entire stream is done (for unary RPC only).
-               return err
+
+       // If cp is set, use it.  Otherwise, attempt to compress the response using
+       // the incoming message compression method.
+       //
+       // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
+       if s.opts.cp != nil {
+               cp = s.opts.cp
+               stream.SetSendCompress(cp.Type())
+       } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
+               // Legacy compressor not specified; attempt to respond with same encoding.
+               comp = encoding.GetCompressor(rc)
+               if comp != nil {
+                       stream.SetSendCompress(rc)
+               }
        }
-       if err == io.ErrUnexpectedEOF {
-               err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
+
+       var payInfo *payloadInfo
+       if sh != nil || binlog != nil {
+               payInfo = &payloadInfo{}
        }
+       d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
        if err != nil {
                if st, ok := status.FromError(err); ok {
                        if e := t.WriteStatus(stream, st); e != nil {
                                grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
                        }
-               } else {
-                       switch st := err.(type) {
-                       case transport.ConnectionError:
-                               // Nothing to do here.
-                       case transport.StreamError:
-                               if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
-                                       grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
-                               }
-                       default:
-                               panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
-                       }
                }
                return err
        }
-
-       if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
-               if st, ok := status.FromError(err); ok {
-                       if e := t.WriteStatus(stream, st); e != nil {
-                               grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
-                       }
-                       return err
-               }
-               if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil {
-                       grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
-               }
-
-               // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
-       }
-       var inPayload *stats.InPayload
-       if sh != nil {
-               inPayload = &stats.InPayload{
-                       RecvTime: time.Now(),
-               }
+       if channelz.IsOn() {
+               t.IncrMsgRecv()
        }
        df := func(v interface{}) error {
-               if inPayload != nil {
-                       inPayload.WireLength = len(req)
-               }
-               if pf == compressionMade {
-                       var err error
-                       req, err = s.opts.dc.Do(bytes.NewReader(req))
-                       if err != nil {
-                               return Errorf(codes.Internal, err.Error())
-                       }
-               }
-               if len(req) > s.opts.maxReceiveMessageSize {
-                       // TODO: Revisit the error code. Currently keep it consistent with
-                       // java implementation.
-                       return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
-               }
-               if err := s.opts.codec.Unmarshal(req, v); err != nil {
+               if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
                        return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
                }
-               if inPayload != nil {
-                       inPayload.Payload = v
-                       inPayload.Data = req
-                       inPayload.Length = len(req)
-                       sh.HandleRPC(stream.Context(), inPayload)
+               if sh != nil {
+                       sh.HandleRPC(stream.Context(), &stats.InPayload{
+                               RecvTime: time.Now(),
+                               Payload:  v,
+                               Data:     d,
+                               Length:   len(d),
+                       })
+               }
+               if binlog != nil {
+                       binlog.Log(&binarylog.ClientMessage{
+                               Message: d,
+                       })
                }
                if trInfo != nil {
                        trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
                }
                return nil
        }
-       reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
+       ctx := NewContextWithServerTransportStream(stream.Context(), stream)
+       reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
        if appErr != nil {
                appStatus, ok := status.FromError(appErr)
                if !ok {
                        // Convert appErr if it is not a grpc status error.
-                       appErr = status.Error(convertCode(appErr), appErr.Error())
+                       appErr = status.Error(codes.Unknown, appErr.Error())
                        appStatus, _ = status.FromError(appErr)
                }
                if trInfo != nil {
@@ -817,16 +978,27 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
                if e := t.WriteStatus(stream, appStatus); e != nil {
                        grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
                }
+               if binlog != nil {
+                       if h, _ := stream.Header(); h.Len() > 0 {
+                               // Only log serverHeader if there was header. Otherwise it can
+                               // be trailer only.
+                               binlog.Log(&binarylog.ServerHeader{
+                                       Header: h,
+                               })
+                       }
+                       binlog.Log(&binarylog.ServerTrailer{
+                               Trailer: stream.Trailer(),
+                               Err:     appErr,
+                       })
+               }
                return appErr
        }
        if trInfo != nil {
                trInfo.tr.LazyLog(stringer("OK"), false)
        }
-       opts := &transport.Options{
-               Last:  true,
-               Delay: false,
-       }
-       if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
+       opts := &transport.Options{Last: true}
+
+       if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
                if err == io.EOF {
                        // The entire stream is done (for unary RPC only).
                        return err
@@ -839,35 +1011,72 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
                        switch st := err.(type) {
                        case transport.ConnectionError:
                                // Nothing to do here.
-                       case transport.StreamError:
-                               if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
-                                       grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
-                               }
                        default:
                                panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
                        }
                }
+               if binlog != nil {
+                       h, _ := stream.Header()
+                       binlog.Log(&binarylog.ServerHeader{
+                               Header: h,
+                       })
+                       binlog.Log(&binarylog.ServerTrailer{
+                               Trailer: stream.Trailer(),
+                               Err:     appErr,
+                       })
+               }
                return err
        }
+       if binlog != nil {
+               h, _ := stream.Header()
+               binlog.Log(&binarylog.ServerHeader{
+                       Header: h,
+               })
+               binlog.Log(&binarylog.ServerMessage{
+                       Message: reply,
+               })
+       }
+       if channelz.IsOn() {
+               t.IncrMsgSent()
+       }
        if trInfo != nil {
                trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
        }
        // TODO: Should we be logging if writing status failed here, like above?
        // Should the logging be in WriteStatus?  Should we ignore the WriteStatus
        // error or allow the stats handler to see it?
-       return t.WriteStatus(stream, status.New(codes.OK, ""))
+       err = t.WriteStatus(stream, status.New(codes.OK, ""))
+       if binlog != nil {
+               binlog.Log(&binarylog.ServerTrailer{
+                       Trailer: stream.Trailer(),
+                       Err:     appErr,
+               })
+       }
+       return err
 }
 
 func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
+       if channelz.IsOn() {
+               s.incrCallsStarted()
+               defer func() {
+                       if err != nil && err != io.EOF {
+                               s.incrCallsFailed()
+                       } else {
+                               s.incrCallsSucceeded()
+                       }
+               }()
+       }
        sh := s.opts.statsHandler
        if sh != nil {
+               beginTime := time.Now()
                begin := &stats.Begin{
-                       BeginTime: time.Now(),
+                       BeginTime: beginTime,
                }
                sh.HandleRPC(stream.Context(), begin)
                defer func() {
                        end := &stats.End{
-                               EndTime: time.Now(),
+                               BeginTime: beginTime,
+                               EndTime:   time.Now(),
                        }
                        if err != nil && err != io.EOF {
                                end.Error = toRPCErr(err)
@@ -875,24 +1084,70 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
                        sh.HandleRPC(stream.Context(), end)
                }()
        }
-       if s.opts.cp != nil {
-               stream.SetSendCompress(s.opts.cp.Type())
-       }
+       ctx := NewContextWithServerTransportStream(stream.Context(), stream)
        ss := &serverStream{
-               t:     t,
-               s:     stream,
-               p:     &parser{r: stream},
-               codec: s.opts.codec,
-               cp:    s.opts.cp,
-               dc:    s.opts.dc,
+               ctx:                   ctx,
+               t:                     t,
+               s:                     stream,
+               p:                     &parser{r: stream},
+               codec:                 s.getCodec(stream.ContentSubtype()),
                maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
                maxSendMessageSize:    s.opts.maxSendMessageSize,
                trInfo:                trInfo,
                statsHandler:          sh,
        }
-       if ss.cp != nil {
-               ss.cbuf = new(bytes.Buffer)
+
+       ss.binlog = binarylog.GetMethodLogger(stream.Method())
+       if ss.binlog != nil {
+               md, _ := metadata.FromIncomingContext(ctx)
+               logEntry := &binarylog.ClientHeader{
+                       Header:     md,
+                       MethodName: stream.Method(),
+                       PeerAddr:   nil,
+               }
+               if deadline, ok := ctx.Deadline(); ok {
+                       logEntry.Timeout = deadline.Sub(time.Now())
+                       if logEntry.Timeout < 0 {
+                               logEntry.Timeout = 0
+                       }
+               }
+               if a := md[":authority"]; len(a) > 0 {
+                       logEntry.Authority = a[0]
+               }
+               if peer, ok := peer.FromContext(ss.Context()); ok {
+                       logEntry.PeerAddr = peer.Addr
+               }
+               ss.binlog.Log(logEntry)
+       }
+
+       // If dc is set and matches the stream's compression, use it.  Otherwise, try
+       // to find a matching registered compressor for decomp.
+       if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
+               ss.dc = s.opts.dc
+       } else if rc != "" && rc != encoding.Identity {
+               ss.decomp = encoding.GetCompressor(rc)
+               if ss.decomp == nil {
+                       st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
+                       t.WriteStatus(ss.s, st)
+                       return st.Err()
+               }
+       }
+
+       // If cp is set, use it.  Otherwise, attempt to compress the response using
+       // the incoming message compression method.
+       //
+       // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
+       if s.opts.cp != nil {
+               ss.cp = s.opts.cp
+               stream.SetSendCompress(s.opts.cp.Type())
+       } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
+               // Legacy compressor not specified; attempt to respond with same encoding.
+               ss.comp = encoding.GetCompressor(rc)
+               if ss.comp != nil {
+                       stream.SetSendCompress(rc)
+               }
        }
+
        if trInfo != nil {
                trInfo.tr.LazyLog(&trInfo.firstLine, false)
                defer func() {
@@ -924,12 +1179,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
        if appErr != nil {
                appStatus, ok := status.FromError(appErr)
                if !ok {
-                       switch err := appErr.(type) {
-                       case transport.StreamError:
-                               appStatus = status.New(err.Code, err.Desc)
-                       default:
-                               appStatus = status.New(convertCode(appErr), appErr.Error())
-                       }
+                       appStatus = status.New(codes.Unknown, appErr.Error())
                        appErr = appStatus.Err()
                }
                if trInfo != nil {
@@ -939,6 +1189,12 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
                        ss.mu.Unlock()
                }
                t.WriteStatus(ss.s, appStatus)
+               if ss.binlog != nil {
+                       ss.binlog.Log(&binarylog.ServerTrailer{
+                               Trailer: ss.s.Trailer(),
+                               Err:     appErr,
+                       })
+               }
                // TODO: Should we log an error from WriteStatus here and below?
                return appErr
        }
@@ -947,8 +1203,14 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
                ss.trInfo.tr.LazyLog(stringer("OK"), false)
                ss.mu.Unlock()
        }
-       return t.WriteStatus(ss.s, status.New(codes.OK, ""))
-
+       err = t.WriteStatus(ss.s, status.New(codes.OK, ""))
+       if ss.binlog != nil {
+               ss.binlog.Log(&binarylog.ServerTrailer{
+                       Trailer: ss.s.Trailer(),
+                       Err:     appErr,
+               })
+       }
+       return err
 }
 
 func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
@@ -977,47 +1239,27 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
        }
        service := sm[:pos]
        method := sm[pos+1:]
-       srv, ok := s.m[service]
-       if !ok {
-               if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
-                       s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
+
+       if srv, ok := s.m[service]; ok {
+               if md, ok := srv.md[method]; ok {
+                       s.processUnaryRPC(t, stream, srv, md, trInfo)
                        return
                }
-               if trInfo != nil {
-                       trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
-                       trInfo.tr.SetError()
-               }
-               errDesc := fmt.Sprintf("unknown service %v", service)
-               if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
-                       if trInfo != nil {
-                               trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
-                               trInfo.tr.SetError()
-                       }
-                       grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
-               }
-               if trInfo != nil {
-                       trInfo.tr.Finish()
+               if sd, ok := srv.sd[method]; ok {
+                       s.processStreamingRPC(t, stream, srv, sd, trInfo)
+                       return
                }
-               return
-       }
-       // Unary RPC or Streaming RPC?
-       if md, ok := srv.md[method]; ok {
-               s.processUnaryRPC(t, stream, srv, md, trInfo)
-               return
        }
-       if sd, ok := srv.sd[method]; ok {
-               s.processStreamingRPC(t, stream, srv, sd, trInfo)
+       // Unknown service, or known server unknown method.
+       if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
+               s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
                return
        }
        if trInfo != nil {
-               trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
+               trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
                trInfo.tr.SetError()
        }
-       if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
-               s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
-               return
-       }
-       errDesc := fmt.Sprintf("unknown method %v", method)
+       errDesc := fmt.Sprintf("unknown service %v", service)
        if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
                if trInfo != nil {
                        trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
@@ -1030,12 +1272,65 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
        }
 }
 
+// The key to save ServerTransportStream in the context.
+type streamKey struct{}
+
+// NewContextWithServerTransportStream creates a new context from ctx and
+// attaches stream to it.
+//
+// This API is EXPERIMENTAL.
+func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
+       return context.WithValue(ctx, streamKey{}, stream)
+}
+
+// ServerTransportStream is a minimal interface that a transport stream must
+// implement. This can be used to mock an actual transport stream for tests of
+// handler code that use, for example, grpc.SetHeader (which requires some
+// stream to be in context).
+//
+// See also NewContextWithServerTransportStream.
+//
+// This API is EXPERIMENTAL.
+type ServerTransportStream interface {
+       Method() string
+       SetHeader(md metadata.MD) error
+       SendHeader(md metadata.MD) error
+       SetTrailer(md metadata.MD) error
+}
+
+// ServerTransportStreamFromContext returns the ServerTransportStream saved in
+// ctx. Returns nil if the given context has no stream associated with it
+// (which implies it is not an RPC invocation context).
+//
+// This API is EXPERIMENTAL.
+func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
+       s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
+       return s
+}
+
 // Stop stops the gRPC server. It immediately closes all open
 // connections and listeners.
 // It cancels all active RPCs on the server side and the corresponding
 // pending RPCs on the client side will get notified by connection
 // errors.
 func (s *Server) Stop() {
+       s.quitOnce.Do(func() {
+               close(s.quit)
+       })
+
+       defer func() {
+               s.serveWG.Wait()
+               s.doneOnce.Do(func() {
+                       close(s.done)
+               })
+       }()
+
+       s.channelzRemoveOnce.Do(func() {
+               if channelz.IsOn() {
+                       channelz.RemoveEntry(s.channelzID)
+               }
+       })
+
        s.mu.Lock()
        listeners := s.lis
        s.lis = nil
@@ -1053,7 +1348,6 @@ func (s *Server) Stop() {
        }
 
        s.mu.Lock()
-       s.cancel()
        if s.events != nil {
                s.events.Finish()
                s.events = nil
@@ -1065,22 +1359,44 @@ func (s *Server) Stop() {
 // accepting new connections and RPCs and blocks until all the pending RPCs are
 // finished.
 func (s *Server) GracefulStop() {
+       s.quitOnce.Do(func() {
+               close(s.quit)
+       })
+
+       defer func() {
+               s.doneOnce.Do(func() {
+                       close(s.done)
+               })
+       }()
+
+       s.channelzRemoveOnce.Do(func() {
+               if channelz.IsOn() {
+                       channelz.RemoveEntry(s.channelzID)
+               }
+       })
        s.mu.Lock()
-       defer s.mu.Unlock()
        if s.conns == nil {
+               s.mu.Unlock()
                return
        }
+
        for lis := range s.lis {
                lis.Close()
        }
        s.lis = nil
-       s.cancel()
        if !s.drain {
                for c := range s.conns {
                        c.(transport.ServerTransport).Drain()
                }
                s.drain = true
        }
+
+       // Wait for serving threads to be ready to exit.  Only then can we be sure no
+       // new conns will be created.
+       s.mu.Unlock()
+       s.serveWG.Wait()
+       s.mu.Lock()
+
        for len(s.conns) != 0 {
                s.cv.Wait()
        }
@@ -1089,26 +1405,23 @@ func (s *Server) GracefulStop() {
                s.events.Finish()
                s.events = nil
        }
+       s.mu.Unlock()
 }
 
-func init() {
-       internal.TestingCloseConns = func(arg interface{}) {
-               arg.(*Server).testingCloseConns()
+// contentSubtype must be lowercase
+// cannot return nil
+func (s *Server) getCodec(contentSubtype string) baseCodec {
+       if s.opts.codec != nil {
+               return s.opts.codec
        }
-       internal.TestingUseHandlerImpl = func(arg interface{}) {
-               arg.(*Server).opts.useHandlerImpl = true
+       if contentSubtype == "" {
+               return encoding.GetCodec(proto.Name)
        }
-}
-
-// testingCloseConns closes all existing transports but keeps s.lis
-// accepting new connections.
-func (s *Server) testingCloseConns() {
-       s.mu.Lock()
-       for c := range s.conns {
-               c.Close()
-               delete(s.conns, c)
+       codec := encoding.GetCodec(contentSubtype)
+       if codec == nil {
+               return encoding.GetCodec(proto.Name)
        }
-       s.mu.Unlock()
+       return codec
 }
 
 // SetHeader sets the header metadata.
@@ -1121,9 +1434,9 @@ func SetHeader(ctx context.Context, md metadata.MD) error {
        if md.Len() == 0 {
                return nil
        }
-       stream, ok := transport.StreamFromContext(ctx)
-       if !ok {
-               return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
+       stream := ServerTransportStreamFromContext(ctx)
+       if stream == nil {
+               return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
        }
        return stream.SetHeader(md)
 }
@@ -1131,15 +1444,11 @@ func SetHeader(ctx context.Context, md metadata.MD) error {
 // SendHeader sends header metadata. It may be called at most once.
 // The provided md and headers set by SetHeader() will be sent.
 func SendHeader(ctx context.Context, md metadata.MD) error {
-       stream, ok := transport.StreamFromContext(ctx)
-       if !ok {
-               return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
+       stream := ServerTransportStreamFromContext(ctx)
+       if stream == nil {
+               return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
        }
-       t := stream.ServerTransport()
-       if t == nil {
-               grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
-       }
-       if err := t.WriteHeader(stream, md); err != nil {
+       if err := stream.SendHeader(md); err != nil {
                return toRPCErr(err)
        }
        return nil
@@ -1151,9 +1460,27 @@ func SetTrailer(ctx context.Context, md metadata.MD) error {
        if md.Len() == 0 {
                return nil
        }
-       stream, ok := transport.StreamFromContext(ctx)
-       if !ok {
-               return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
+       stream := ServerTransportStreamFromContext(ctx)
+       if stream == nil {
+               return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
        }
        return stream.SetTrailer(md)
 }
+
+// Method returns the method string for the server context.  The returned
+// string is in the format of "/service/method".
+func Method(ctx context.Context) (string, bool) {
+       s := ServerTransportStreamFromContext(ctx)
+       if s == nil {
+               return "", false
+       }
+       return s.Method(), true
+}
+
+type channelzServer struct {
+       s *Server
+}
+
+func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
+       return c.s.channelzMetric()
+}