package grpc
import (
- "bytes"
+ "context"
"errors"
"fmt"
"io"
+ "math"
"net"
"net/http"
"reflect"
"runtime"
"strings"
"sync"
+ "sync/atomic"
"time"
- "golang.org/x/net/context"
- "golang.org/x/net/http2"
"golang.org/x/net/trace"
+
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/encoding"
+ "google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/grpclog"
- "google.golang.org/grpc/internal"
+ "google.golang.org/grpc/internal/binarylog"
+ "google.golang.org/grpc/internal/channelz"
+ "google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
+ "google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/grpc/tap"
- "google.golang.org/grpc/transport"
)
const (
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
- defaultServerMaxSendMessageSize = 1024 * 1024 * 4
+ defaultServerMaxSendMessageSize = math.MaxInt32
)
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
conns map[io.Closer]bool
serve bool
drain bool
- ctx context.Context
- cancel context.CancelFunc
- // A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
- // and all the transport goes away.
- cv *sync.Cond
+ cv *sync.Cond // signaled when connections close for GracefulStop
m map[string]*service // service name -> service info
events trace.EventLog
+
+ quit chan struct{}
+ done chan struct{}
+ quitOnce sync.Once
+ doneOnce sync.Once
+ channelzRemoveOnce sync.Once
+ serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
+
+ channelzID int64 // channelz unique identification number
+ czData *channelzData
}
type options struct {
creds credentials.TransportCredentials
- codec Codec
+ codec baseCodec
cp Compressor
dc Decompressor
unaryInt UnaryServerInterceptor
streamInt StreamServerInterceptor
inTapHandle tap.ServerInHandle
statsHandler stats.Handler
maxConcurrentStreams uint32
maxReceiveMessageSize int
maxSendMessageSize int
- useHandlerImpl bool // use http.Handler-based server
unknownStreamDesc *StreamDesc
keepaliveParams keepalive.ServerParameters
keepalivePolicy keepalive.EnforcementPolicy
initialWindowSize int32
initialConnWindowSize int32
+ writeBufferSize int
+ readBufferSize int
+ connectionTimeout time.Duration
+ maxHeaderListSize *uint32
}
var defaultServerOptions = options{
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
+ connectionTimeout: 120 * time.Second,
+ writeBufferSize: defaultWriteBufSize,
+ readBufferSize: defaultReadBufSize,
}
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
type ServerOption func(*options)
+// WriteBufferSize determines how much data can be batched before doing a write on the wire.
+// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
+// The default value for this buffer is 32KB.
+// Zero will disable the write buffer such that each write will be on the underlying connection.
+// Note: A Send call may not directly translate to a write.
+func WriteBufferSize(s int) ServerOption {
+ return func(o *options) {
+ o.writeBufferSize = s
+ }
+}
+
+// ReadBufferSize lets you set the size of the read buffer; this determines how
+// much data can be read at most in one read syscall.
+// The default value for this buffer is 32KB.
+// Zero will disable the read buffer for a connection so the data framer can
+// access the underlying conn directly.
+func ReadBufferSize(s int) ServerOption {
+ return func(o *options) {
+ o.readBufferSize = s
+ }
+}
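+
+// A minimal usage sketch for the two buffer options above (the 64KB values
+// are illustrative, not recommendations):
+//
+//	s := grpc.NewServer(
+//		grpc.WriteBufferSize(64*1024), // batch up to 64KB before each write syscall
+//		grpc.ReadBufferSize(64*1024),  // read up to 64KB per read syscall
+//	)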
+
// InitialWindowSize returns a ServerOption that sets window size for stream.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func InitialWindowSize(s int32) ServerOption {
	return func(o *options) {
		o.initialWindowSize = s
	}
}
// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
+//
+// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
func CustomCodec(codec Codec) ServerOption {
return func(o *options) {
o.codec = codec
}
}
-// RPCCompressor returns a ServerOption that sets a compressor for outbound messages.
+// RPCCompressor returns a ServerOption that sets a compressor for outbound
+// messages. For backward compatibility, all outbound messages will be sent
+// using this compressor, regardless of incoming message compression. By
+// default, server messages will be sent using the same compressor with which
+// request messages were sent.
+//
+// Deprecated: use encoding.RegisterCompressor instead.
func RPCCompressor(cp Compressor) ServerOption {
return func(o *options) {
o.cp = cp
}
}
-// RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages.
+// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
+// messages. It has higher priority than decompressors registered via
+// encoding.RegisterCompressor.
+//
+// Deprecated: use encoding.RegisterCompressor instead.
func RPCDecompressor(dc Decompressor) ServerOption {
return func(o *options) {
o.dc = dc
	}
}
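+
+// The replacement path for the two deprecated options above is a registered
+// compressor. As a sketch, a blank import of the gzip package registers a
+// "gzip" encoding.Compressor, and the server then decompresses requests and
+// compresses responses with the encoding the client used:
+//
+//	import _ "google.golang.org/grpc/encoding/gzip"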
// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
-// If this is not set, gRPC uses the default limit. Deprecated: use MaxRecvMsgSize instead.
+// If this is not set, gRPC uses the default limit.
+//
+// Deprecated: use MaxRecvMsgSize instead.
func MaxMsgSize(m int) ServerOption {
return MaxRecvMsgSize(m)
}
// UnknownServiceHandler returns a ServerOption that allows for adding a custom
// unknown service handler. The provided method is a bidi-streaming RPC service
// handler that will be invoked instead of returning the "unimplemented" gRPC
// error whenever a request is received for an unregistered service or method.
// The handling function has full access to the Context of the request and the
-// stream, and the invocation passes through interceptors.
+// stream, and the invocation bypasses interceptors.
func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
return func(o *options) {
		o.unknownStreamDesc = &StreamDesc{
			StreamName: "unknown_service_handler",
			Handler:    streamHandler,
			// We need to assume that the users of the streamHandler will want to use both.
			ClientStreams: true,
			ServerStreams: true,
		}
	}
}
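+
+// A sketch of a catch-all handler; MethodFromServerStream reports the full
+// method name, and the status values here are illustrative:
+//
+//	fallback := func(srv interface{}, stream grpc.ServerStream) error {
+//		m, _ := grpc.MethodFromServerStream(stream)
+//		return status.Errorf(codes.Unimplemented, "no handler for %q", m)
+//	}
+//	s := grpc.NewServer(grpc.UnknownServiceHandler(fallback))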
+// ConnectionTimeout returns a ServerOption that sets the timeout for
+// connection establishment (up to and including HTTP/2 handshaking) for all
+// new connections. If this is not set, the default is 120 seconds. A zero or
+// negative value will result in an immediate timeout.
+//
+// This API is EXPERIMENTAL.
+func ConnectionTimeout(d time.Duration) ServerOption {
+ return func(o *options) {
+ o.connectionTimeout = d
+ }
+}
+
+// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
+// of header list that the server is prepared to accept.
+func MaxHeaderListSize(s uint32) ServerOption {
+ return func(o *options) {
+ o.maxHeaderListSize = &s
+ }
+}
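+
+// Both options above compose like any other ServerOption; a sketch with
+// illustrative values:
+//
+//	s := grpc.NewServer(
+//		grpc.ConnectionTimeout(30*time.Second), // bound handshake time per connection
+//		grpc.MaxHeaderListSize(8*1024),         // reject oversized header lists
+//	)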
+
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
	opts := defaultServerOptions
	for _, o := range opt {
o(&opts)
}
- if opts.codec == nil {
- // Set the default codec.
- opts.codec = protoCodec{}
- }
s := &Server{
- lis: make(map[net.Listener]bool),
- opts: opts,
- conns: make(map[io.Closer]bool),
- m: make(map[string]*service),
+ lis: make(map[net.Listener]bool),
+ opts: opts,
+ conns: make(map[io.Closer]bool),
+ m: make(map[string]*service),
+ quit: make(chan struct{}),
+ done: make(chan struct{}),
+ czData: new(channelzData),
}
s.cv = sync.NewCond(&s.mu)
- s.ctx, s.cancel = context.WithCancel(context.Background())
if EnableTracing {
_, file, line, _ := runtime.Caller(1)
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
}
+
+ if channelz.IsOn() {
+ s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
+ }
return s
}
return ret
}
-var (
- // ErrServerStopped indicates that the operation is now illegal because of
- // the server being stopped.
- ErrServerStopped = errors.New("grpc: the server has been stopped")
-)
+// ErrServerStopped indicates that the operation is now illegal because of
+// the server being stopped.
+var ErrServerStopped = errors.New("grpc: the server has been stopped")
func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
	if s.opts.creds == nil {
		return rawConn, nil, nil
	}
	return s.opts.creds.ServerHandshake(rawConn)
}
+type listenSocket struct {
+ net.Listener
+ channelzID int64
+}
+
+func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
+ return &channelz.SocketInternalMetric{
+ SocketOptions: channelz.GetSocketOption(l.Listener),
+ LocalAddr: l.Listener.Addr(),
+ }
+}
+
+func (l *listenSocket) Close() error {
+ err := l.Listener.Close()
+ if channelz.IsOn() {
+ channelz.RemoveEntry(l.channelzID)
+ }
+ return err
+}
+
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
-// Serve always returns non-nil error.
+// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.printf("serving")
s.serve = true
if s.lis == nil {
+ // Serve called after Stop or GracefulStop.
s.mu.Unlock()
lis.Close()
return ErrServerStopped
}
- s.lis[lis] = true
+
+ s.serveWG.Add(1)
+ defer func() {
+ s.serveWG.Done()
+ select {
+ // Stop or GracefulStop called; block until done and return nil.
+ case <-s.quit:
+ <-s.done
+ default:
+ }
+ }()
+
+ ls := &listenSocket{Listener: lis}
+ s.lis[ls] = true
+
+ if channelz.IsOn() {
+ ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
+ }
s.mu.Unlock()
+
defer func() {
s.mu.Lock()
- if s.lis != nil && s.lis[lis] {
- lis.Close()
- delete(s.lis, lis)
+ if s.lis != nil && s.lis[ls] {
+ ls.Close()
+ delete(s.lis, ls)
}
s.mu.Unlock()
}()
timer := time.NewTimer(tempDelay)
select {
case <-timer.C:
- case <-s.ctx.Done():
+ case <-s.quit:
+ timer.Stop()
+ return nil
}
- timer.Stop()
continue
}
s.mu.Lock()
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()
+
+ select {
+ case <-s.quit:
+ return nil
+ default:
+ }
return err
}
tempDelay = 0
- // Start a new goroutine to deal with rawConn
- // so we don't stall this Accept loop goroutine.
- go s.handleRawConn(rawConn)
+ // Start a new goroutine to deal with rawConn so we don't stall this Accept
+ // loop goroutine.
+ //
+ // Make sure we account for the goroutine so GracefulStop doesn't nil out
+ // s.conns before this conn can be added.
+ s.serveWG.Add(1)
+ go func() {
+ s.handleRawConn(rawConn)
+ s.serveWG.Done()
+ }()
}
}
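+
+// Typical Serve usage, as a sketch (the generated RegisterGreeterServer and
+// the greeterServer type are hypothetical user code):
+//
+//	lis, err := net.Listen("tcp", ":50051")
+//	if err != nil {
+//		log.Fatalf("failed to listen: %v", err)
+//	}
+//	s := grpc.NewServer()
+//	pb.RegisterGreeterServer(s, &greeterServer{})
+//	if err := s.Serve(lis); err != nil {
+//		log.Fatalf("failed to serve: %v", err)
+//	}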
-// handleRawConn is run in its own goroutine and handles a just-accepted
-// connection that has not had any I/O performed on it yet.
+// handleRawConn forks a goroutine to handle a just-accepted connection that
+// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
+ rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
if err != nil {
s.mu.Lock()
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
s.mu.Unlock()
grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
- // If serverHandShake returns ErrConnDispatched, keep rawConn open.
+ // If serverHandshake returns ErrConnDispatched, keep rawConn open.
if err != credentials.ErrConnDispatched {
rawConn.Close()
}
+ rawConn.SetDeadline(time.Time{})
return
}
}
s.mu.Unlock()
- if s.opts.useHandlerImpl {
- s.serveUsingHandler(conn)
- } else {
- s.serveHTTP2Transport(conn, authInfo)
+ // Finish handshaking (HTTP2)
+ st := s.newHTTP2Transport(conn, authInfo)
+ if st == nil {
+ return
+ }
+
+ rawConn.SetDeadline(time.Time{})
+ if !s.addConn(st) {
+ return
}
+ go func() {
+ s.serveStreams(st)
+ s.removeConn(st)
+ }()
}
-// serveHTTP2Transport sets up a http/2 transport (using the
-// gRPC http2 server transport in transport/http2_server.go) and
-// serves streams on it.
-// This is run in its own goroutine (it does network I/O in
-// transport.NewServerTransport).
-func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
+// newHTTP2Transport sets up a http/2 transport (using the
+// gRPC http2 server transport in transport/http2_server.go).
+func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
config := &transport.ServerConfig{
MaxStreams: s.opts.maxConcurrentStreams,
AuthInfo: authInfo,
KeepalivePolicy: s.opts.keepalivePolicy,
InitialWindowSize: s.opts.initialWindowSize,
InitialConnWindowSize: s.opts.initialConnWindowSize,
+ WriteBufferSize: s.opts.writeBufferSize,
+ ReadBufferSize: s.opts.readBufferSize,
+ ChannelzParentID: s.channelzID,
+ MaxHeaderListSize: s.opts.maxHeaderListSize,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {
		s.mu.Lock()
		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
		s.mu.Unlock()
c.Close()
grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
- return
- }
- if !s.addConn(st) {
- st.Close()
- return
+ return nil
}
- s.serveStreams(st)
+
+ return st
}
func (s *Server) serveStreams(st transport.ServerTransport) {
- defer s.removeConn(st)
defer st.Close()
var wg sync.WaitGroup
st.HandleStreams(func(stream *transport.Stream) {
var _ http.Handler = (*Server)(nil)
-// serveUsingHandler is called from handleRawConn when s is configured
-// to handle requests via the http.Handler interface. It sets up a
-// net/http.Server to handle the just-accepted conn. The http.Server
-// is configured to route all incoming requests (all HTTP/2 streams)
-// to ServeHTTP, which creates a new ServerTransport for each stream.
-// serveUsingHandler blocks until conn closes.
-//
-// This codepath is only used when Server.TestingUseHandlerImpl has
-// been configured. This lets the end2end tests exercise the ServeHTTP
-// method as one of the environment types.
-//
-// conn is the *tls.Conn that's already been authenticated.
-func (s *Server) serveUsingHandler(conn net.Conn) {
- if !s.addConn(conn) {
- conn.Close()
- return
- }
- defer s.removeConn(conn)
- h2s := &http2.Server{
- MaxConcurrentStreams: s.opts.maxConcurrentStreams,
- }
- h2s.ServeConn(conn, &http2.ServeConnOpts{
- Handler: s,
- })
-}
-
// ServeHTTP implements the Go standard library's http.Handler
// interface by responding to the gRPC request r, by looking up
// the requested gRPC method in the gRPC server s.
// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
// and subject to change.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- st, err := transport.NewServerHandlerTransport(w, r)
+ st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if !s.addConn(st) {
- st.Close()
return
}
defer s.removeConn(st)
func (s *Server) addConn(c io.Closer) bool {
s.mu.Lock()
defer s.mu.Unlock()
- if s.conns == nil || s.drain {
+ if s.conns == nil {
+ c.Close()
return false
}
+ if s.drain {
+ // Transport added after we drained our existing conns: drain it
+ // immediately.
+ c.(transport.ServerTransport).Drain()
+ }
s.conns[c] = true
return true
}
}
}
-func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
- var (
- cbuf *bytes.Buffer
- outPayload *stats.OutPayload
- )
- if cp != nil {
- cbuf = new(bytes.Buffer)
+func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
+ return &channelz.ServerInternalMetric{
+ CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
+ CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
+ CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
+ LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
}
- if s.opts.statsHandler != nil {
- outPayload = &stats.OutPayload{}
- }
- p, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
+}
+
+func (s *Server) incrCallsStarted() {
+ atomic.AddInt64(&s.czData.callsStarted, 1)
+ atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
+}
+
+func (s *Server) incrCallsSucceeded() {
+ atomic.AddInt64(&s.czData.callsSucceeded, 1)
+}
+
+func (s *Server) incrCallsFailed() {
+ atomic.AddInt64(&s.czData.callsFailed, 1)
+}
+
+func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
+ data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
if err != nil {
grpclog.Errorln("grpc: server failed to encode response: ", err)
return err
}
- if len(p) > s.opts.maxSendMessageSize {
- return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(p), s.opts.maxSendMessageSize)
+ compData, err := compress(data, cp, comp)
+ if err != nil {
+ grpclog.Errorln("grpc: server failed to compress response: ", err)
+ return err
+ }
+ hdr, payload := msgHeader(data, compData)
+ // TODO(dfawley): should we be checking len(data) instead?
+ if len(payload) > s.opts.maxSendMessageSize {
+ return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
}
- err = t.Write(stream, p, opts)
- if err == nil && outPayload != nil {
- outPayload.SentTime = time.Now()
- s.opts.statsHandler.HandleRPC(stream.Context(), outPayload)
+ err = t.Write(stream, hdr, payload, opts)
+ if err == nil && s.opts.statsHandler != nil {
+ s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
}
return err
}
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
+ if channelz.IsOn() {
+ s.incrCallsStarted()
+ defer func() {
+ if err != nil && err != io.EOF {
+ s.incrCallsFailed()
+ } else {
+ s.incrCallsSucceeded()
+ }
+ }()
+ }
sh := s.opts.statsHandler
if sh != nil {
+ beginTime := time.Now()
begin := &stats.Begin{
- BeginTime: time.Now(),
+ BeginTime: beginTime,
}
sh.HandleRPC(stream.Context(), begin)
defer func() {
end := &stats.End{
- EndTime: time.Now(),
+ BeginTime: beginTime,
+ EndTime: time.Now(),
}
if err != nil && err != io.EOF {
end.Error = toRPCErr(err)
}
			sh.HandleRPC(stream.Context(), end)
		}()
}
- if s.opts.cp != nil {
- // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
- stream.SetSendCompress(s.opts.cp.Type())
+
+ binlog := binarylog.GetMethodLogger(stream.Method())
+ if binlog != nil {
+ ctx := stream.Context()
+ md, _ := metadata.FromIncomingContext(ctx)
+ logEntry := &binarylog.ClientHeader{
+ Header: md,
+ MethodName: stream.Method(),
+ PeerAddr: nil,
+ }
+ if deadline, ok := ctx.Deadline(); ok {
+ logEntry.Timeout = deadline.Sub(time.Now())
+ if logEntry.Timeout < 0 {
+ logEntry.Timeout = 0
+ }
+ }
+ if a := md[":authority"]; len(a) > 0 {
+ logEntry.Authority = a[0]
+ }
+ if peer, ok := peer.FromContext(ctx); ok {
+ logEntry.PeerAddr = peer.Addr
+ }
+ binlog.Log(logEntry)
+ }
+
+ // comp and cp are used for compression. decomp and dc are used for
+ // decompression. If comp and decomp are both set, they are the same;
+ // however they are kept separate to ensure that at most one of the
+ // compressor/decompressor variable pairs are set for use later.
+ var comp, decomp encoding.Compressor
+ var cp Compressor
+ var dc Decompressor
+
+ // If dc is set and matches the stream's compression, use it. Otherwise, try
+ // to find a matching registered compressor for decomp.
+ if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
+ dc = s.opts.dc
+ } else if rc != "" && rc != encoding.Identity {
+ decomp = encoding.GetCompressor(rc)
+ if decomp == nil {
+ st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
+ t.WriteStatus(stream, st)
+ return st.Err()
+ }
}
- p := &parser{r: stream}
- pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
- if err == io.EOF {
- // The entire stream is done (for unary RPC only).
- return err
+
+ // If cp is set, use it. Otherwise, attempt to compress the response using
+ // the incoming message compression method.
+ //
+ // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
+ if s.opts.cp != nil {
+ cp = s.opts.cp
+ stream.SetSendCompress(cp.Type())
+ } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
+ // Legacy compressor not specified; attempt to respond with same encoding.
+ comp = encoding.GetCompressor(rc)
+ if comp != nil {
+ stream.SetSendCompress(rc)
+ }
}
- if err == io.ErrUnexpectedEOF {
- err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
+
+ var payInfo *payloadInfo
+ if sh != nil || binlog != nil {
+ payInfo = &payloadInfo{}
}
+ d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
if err != nil {
if st, ok := status.FromError(err); ok {
if e := t.WriteStatus(stream, st); e != nil {
grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
- } else {
- switch st := err.(type) {
- case transport.ConnectionError:
- // Nothing to do here.
- case transport.StreamError:
- if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
- grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
- default:
- panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
- }
}
return err
}
-
- if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
- if st, ok := status.FromError(err); ok {
- if e := t.WriteStatus(stream, st); e != nil {
- grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
- return err
- }
- if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil {
- grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
-
- // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
- }
- var inPayload *stats.InPayload
- if sh != nil {
- inPayload = &stats.InPayload{
- RecvTime: time.Now(),
- }
+ if channelz.IsOn() {
+ t.IncrMsgRecv()
}
df := func(v interface{}) error {
- if inPayload != nil {
- inPayload.WireLength = len(req)
- }
- if pf == compressionMade {
- var err error
- req, err = s.opts.dc.Do(bytes.NewReader(req))
- if err != nil {
- return Errorf(codes.Internal, err.Error())
- }
- }
- if len(req) > s.opts.maxReceiveMessageSize {
- // TODO: Revisit the error code. Currently keep it consistent with
- // java implementation.
- return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
- }
- if err := s.opts.codec.Unmarshal(req, v); err != nil {
+ if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
}
- if inPayload != nil {
- inPayload.Payload = v
- inPayload.Data = req
- inPayload.Length = len(req)
- sh.HandleRPC(stream.Context(), inPayload)
+ if sh != nil {
+ sh.HandleRPC(stream.Context(), &stats.InPayload{
+ RecvTime: time.Now(),
+ Payload: v,
+ Data: d,
+ Length: len(d),
+ })
+ }
+ if binlog != nil {
+ binlog.Log(&binarylog.ClientMessage{
+ Message: d,
+ })
}
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
}
return nil
}
- reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt)
+ ctx := NewContextWithServerTransportStream(stream.Context(), stream)
+ reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
if appErr != nil {
appStatus, ok := status.FromError(appErr)
if !ok {
// Convert appErr if it is not a grpc status error.
- appErr = status.Error(convertCode(appErr), appErr.Error())
+ appErr = status.Error(codes.Unknown, appErr.Error())
appStatus, _ = status.FromError(appErr)
}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
if e := t.WriteStatus(stream, appStatus); e != nil {
grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
}
+ if binlog != nil {
+ if h, _ := stream.Header(); h.Len() > 0 {
+ // Only log serverHeader if there was header. Otherwise it can
+ // be trailer only.
+ binlog.Log(&binarylog.ServerHeader{
+ Header: h,
+ })
+ }
+ binlog.Log(&binarylog.ServerTrailer{
+ Trailer: stream.Trailer(),
+ Err: appErr,
+ })
+ }
return appErr
}
if trInfo != nil {
trInfo.tr.LazyLog(stringer("OK"), false)
}
- opts := &transport.Options{
- Last: true,
- Delay: false,
- }
- if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
+ opts := &transport.Options{Last: true}
+
+ if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
if err == io.EOF {
// The entire stream is done (for unary RPC only).
return err
switch st := err.(type) {
case transport.ConnectionError:
// Nothing to do here.
- case transport.StreamError:
- if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
- grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
default:
panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
}
}
+ if binlog != nil {
+ h, _ := stream.Header()
+ binlog.Log(&binarylog.ServerHeader{
+ Header: h,
+ })
+ binlog.Log(&binarylog.ServerTrailer{
+ Trailer: stream.Trailer(),
+ Err: appErr,
+ })
+ }
return err
}
+ if binlog != nil {
+ h, _ := stream.Header()
+ binlog.Log(&binarylog.ServerHeader{
+ Header: h,
+ })
+ binlog.Log(&binarylog.ServerMessage{
+ Message: reply,
+ })
+ }
+ if channelz.IsOn() {
+ t.IncrMsgSent()
+ }
if trInfo != nil {
trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
}
// TODO: Should we be logging if writing status failed here, like above?
// Should the logging be in WriteStatus? Should we ignore the WriteStatus
// error or allow the stats handler to see it?
- return t.WriteStatus(stream, status.New(codes.OK, ""))
+ err = t.WriteStatus(stream, status.New(codes.OK, ""))
+ if binlog != nil {
+ binlog.Log(&binarylog.ServerTrailer{
+ Trailer: stream.Trailer(),
+ Err: appErr,
+ })
+ }
+ return err
}
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
+ if channelz.IsOn() {
+ s.incrCallsStarted()
+ defer func() {
+ if err != nil && err != io.EOF {
+ s.incrCallsFailed()
+ } else {
+ s.incrCallsSucceeded()
+ }
+ }()
+ }
sh := s.opts.statsHandler
if sh != nil {
+ beginTime := time.Now()
begin := &stats.Begin{
- BeginTime: time.Now(),
+ BeginTime: beginTime,
}
sh.HandleRPC(stream.Context(), begin)
defer func() {
end := &stats.End{
- EndTime: time.Now(),
+ BeginTime: beginTime,
+ EndTime: time.Now(),
}
if err != nil && err != io.EOF {
				end.Error = toRPCErr(err)
			}
			sh.HandleRPC(stream.Context(), end)
		}()
}
- if s.opts.cp != nil {
- stream.SetSendCompress(s.opts.cp.Type())
- }
+ ctx := NewContextWithServerTransportStream(stream.Context(), stream)
ss := &serverStream{
- t: t,
- s: stream,
- p: &parser{r: stream},
- codec: s.opts.codec,
- cp: s.opts.cp,
- dc: s.opts.dc,
+ ctx: ctx,
+ t: t,
+ s: stream,
+ p: &parser{r: stream},
+ codec: s.getCodec(stream.ContentSubtype()),
maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
maxSendMessageSize: s.opts.maxSendMessageSize,
trInfo: trInfo,
statsHandler: sh,
}
- if ss.cp != nil {
- ss.cbuf = new(bytes.Buffer)
+
+ ss.binlog = binarylog.GetMethodLogger(stream.Method())
+ if ss.binlog != nil {
+ md, _ := metadata.FromIncomingContext(ctx)
+ logEntry := &binarylog.ClientHeader{
+ Header: md,
+ MethodName: stream.Method(),
+ PeerAddr: nil,
+ }
+ if deadline, ok := ctx.Deadline(); ok {
+ logEntry.Timeout = deadline.Sub(time.Now())
+ if logEntry.Timeout < 0 {
+ logEntry.Timeout = 0
+ }
+ }
+ if a := md[":authority"]; len(a) > 0 {
+ logEntry.Authority = a[0]
+ }
+ if peer, ok := peer.FromContext(ss.Context()); ok {
+ logEntry.PeerAddr = peer.Addr
+ }
+ ss.binlog.Log(logEntry)
+ }
+
+ // If dc is set and matches the stream's compression, use it. Otherwise, try
+ // to find a matching registered compressor for decomp.
+ if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
+ ss.dc = s.opts.dc
+ } else if rc != "" && rc != encoding.Identity {
+ ss.decomp = encoding.GetCompressor(rc)
+ if ss.decomp == nil {
+ st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
+ t.WriteStatus(ss.s, st)
+ return st.Err()
+ }
+ }
+
+ // If cp is set, use it. Otherwise, attempt to compress the response using
+ // the incoming message compression method.
+ //
+ // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
+ if s.opts.cp != nil {
+ ss.cp = s.opts.cp
+ stream.SetSendCompress(s.opts.cp.Type())
+ } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
+ // Legacy compressor not specified; attempt to respond with same encoding.
+ ss.comp = encoding.GetCompressor(rc)
+ if ss.comp != nil {
+ stream.SetSendCompress(rc)
+ }
}
+
if trInfo != nil {
trInfo.tr.LazyLog(&trInfo.firstLine, false)
defer func() {
if appErr != nil {
appStatus, ok := status.FromError(appErr)
if !ok {
- switch err := appErr.(type) {
- case transport.StreamError:
- appStatus = status.New(err.Code, err.Desc)
- default:
- appStatus = status.New(convertCode(appErr), appErr.Error())
- }
+ appStatus = status.New(codes.Unknown, appErr.Error())
appErr = appStatus.Err()
}
			if trInfo != nil {
				ss.mu.Lock()
				ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
				ss.trInfo.tr.SetError()
				ss.mu.Unlock()
			}
t.WriteStatus(ss.s, appStatus)
+ if ss.binlog != nil {
+ ss.binlog.Log(&binarylog.ServerTrailer{
+ Trailer: ss.s.Trailer(),
+ Err: appErr,
+ })
+ }
// TODO: Should we log an error from WriteStatus here and below?
return appErr
}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
- return t.WriteStatus(ss.s, status.New(codes.OK, ""))
-
+ err = t.WriteStatus(ss.s, status.New(codes.OK, ""))
+ if ss.binlog != nil {
+ ss.binlog.Log(&binarylog.ServerTrailer{
+ Trailer: ss.s.Trailer(),
+ Err: appErr,
+ })
+ }
+ return err
}
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
}
service := sm[:pos]
method := sm[pos+1:]
- srv, ok := s.m[service]
- if !ok {
- if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
- s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
+
+ if srv, ok := s.m[service]; ok {
+ if md, ok := srv.md[method]; ok {
+ s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
- if trInfo != nil {
- trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
- trInfo.tr.SetError()
- }
- errDesc := fmt.Sprintf("unknown service %v", service)
- if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
- if trInfo != nil {
- trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
- trInfo.tr.SetError()
- }
- grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
- }
- if trInfo != nil {
- trInfo.tr.Finish()
+ if sd, ok := srv.sd[method]; ok {
+ s.processStreamingRPC(t, stream, srv, sd, trInfo)
+ return
}
- return
- }
- // Unary RPC or Streaming RPC?
- if md, ok := srv.md[method]; ok {
- s.processUnaryRPC(t, stream, srv, md, trInfo)
- return
}
- if sd, ok := srv.sd[method]; ok {
- s.processStreamingRPC(t, stream, srv, sd, trInfo)
+	// Unknown service, or known service with an unknown method.
+ if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
+ s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
return
}
if trInfo != nil {
- trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true)
+ trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
trInfo.tr.SetError()
}
- if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
- s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
- return
- }
- errDesc := fmt.Sprintf("unknown method %v", method)
+ errDesc := fmt.Sprintf("unknown service %v", service)
if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
}
	}
}
+// The key to save ServerTransportStream in the context.
+type streamKey struct{}
+
+// NewContextWithServerTransportStream creates a new context from ctx and
+// attaches stream to it.
+//
+// This API is EXPERIMENTAL.
+func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
+ return context.WithValue(ctx, streamKey{}, stream)
+}
+
+// ServerTransportStream is a minimal interface that a transport stream must
+// implement. This can be used to mock an actual transport stream for tests of
+// handler code that use, for example, grpc.SetHeader (which requires some
+// stream to be in context).
+//
+// See also NewContextWithServerTransportStream.
+//
+// This API is EXPERIMENTAL.
+type ServerTransportStream interface {
+ Method() string
+ SetHeader(md metadata.MD) error
+ SendHeader(md metadata.MD) error
+ SetTrailer(md metadata.MD) error
+}
+
+// ServerTransportStreamFromContext returns the ServerTransportStream saved in
+// ctx. Returns nil if the given context has no stream associated with it
+// (which implies it is not an RPC invocation context).
+//
+// This API is EXPERIMENTAL.
+func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
+ s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
+ return s
+}
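+
+// A sketch of the testing use mentioned above: a hypothetical mock satisfying
+// ServerTransportStream lets handler helpers such as grpc.SetHeader run
+// without a real transport.
+//
+//	type mockStream struct{ header metadata.MD }
+//
+//	func (m *mockStream) Method() string                  { return "/svc.Foo/Bar" }
+//	func (m *mockStream) SetHeader(md metadata.MD) error  { m.header = metadata.Join(m.header, md); return nil }
+//	func (m *mockStream) SendHeader(md metadata.MD) error { return nil }
+//	func (m *mockStream) SetTrailer(md metadata.MD) error { return nil }
+//
+//	ctx := NewContextWithServerTransportStream(context.Background(), &mockStream{})
+//	err := SetHeader(ctx, metadata.Pairs("k", "v")) // succeeds against the mock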
+
// Stop stops the gRPC server. It immediately closes all open
// connections and listeners.
// It cancels all active RPCs on the server side and the corresponding
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
+ s.quitOnce.Do(func() {
+ close(s.quit)
+ })
+
+ defer func() {
+ s.serveWG.Wait()
+ s.doneOnce.Do(func() {
+ close(s.done)
+ })
+ }()
+
+ s.channelzRemoveOnce.Do(func() {
+ if channelz.IsOn() {
+ channelz.RemoveEntry(s.channelzID)
+ }
+ })
+
s.mu.Lock()
listeners := s.lis
s.lis = nil
}
s.mu.Lock()
- s.cancel()
if s.events != nil {
s.events.Finish()
s.events = nil
	}
	s.mu.Unlock()
}

// GracefulStop stops the gRPC server. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
+ s.quitOnce.Do(func() {
+ close(s.quit)
+ })
+
+ defer func() {
+ s.doneOnce.Do(func() {
+ close(s.done)
+ })
+ }()
+
+ s.channelzRemoveOnce.Do(func() {
+ if channelz.IsOn() {
+ channelz.RemoveEntry(s.channelzID)
+ }
+ })
s.mu.Lock()
- defer s.mu.Unlock()
if s.conns == nil {
+ s.mu.Unlock()
return
}
+
for lis := range s.lis {
lis.Close()
}
s.lis = nil
- s.cancel()
if !s.drain {
for c := range s.conns {
c.(transport.ServerTransport).Drain()
}
s.drain = true
}
+
+ // Wait for serving threads to be ready to exit. Only then can we be sure no
+ // new conns will be created.
+ s.mu.Unlock()
+ s.serveWG.Wait()
+ s.mu.Lock()
+
for len(s.conns) != 0 {
s.cv.Wait()
}
	s.conns = nil
	if s.events != nil {
		s.events.Finish()
s.events = nil
}
+ s.mu.Unlock()
}
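+
+// A common shutdown sketch (the signal channel is illustrative): GracefulStop
+// stops new work and waits for in-flight RPCs, after which Serve returns nil.
+//
+//	go func() {
+//		<-shutdownCh // e.g. fed by signal.Notify
+//		s.GracefulStop()
+//	}()
+//	_ = s.Serve(lis) // returns nil once GracefulStop completes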
-func init() {
- internal.TestingCloseConns = func(arg interface{}) {
- arg.(*Server).testingCloseConns()
+// contentSubtype must be lowercase
+// cannot return nil
+func (s *Server) getCodec(contentSubtype string) baseCodec {
+ if s.opts.codec != nil {
+ return s.opts.codec
}
- internal.TestingUseHandlerImpl = func(arg interface{}) {
- arg.(*Server).opts.useHandlerImpl = true
+ if contentSubtype == "" {
+ return encoding.GetCodec(proto.Name)
}
-}
-
-// testingCloseConns closes all existing transports but keeps s.lis
-// accepting new connections.
-func (s *Server) testingCloseConns() {
- s.mu.Lock()
- for c := range s.conns {
- c.Close()
- delete(s.conns, c)
+ codec := encoding.GetCodec(contentSubtype)
+ if codec == nil {
+ return encoding.GetCodec(proto.Name)
}
- s.mu.Unlock()
+ return codec
}
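+
+// Codec selection sketch: a codec registered under the name "json" (jsonCodec
+// is hypothetical; it must implement encoding.Codec) is returned by getCodec
+// for requests sent with content-type "application/grpc+json":
+//
+//	func init() { encoding.RegisterCodec(jsonCodec{}) } // jsonCodec.Name() == "json"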
// SetHeader sets the header metadata.
// When called multiple times, all the provided metadata will be merged.
func SetHeader(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
return nil
}
- stream, ok := transport.StreamFromContext(ctx)
- if !ok {
- return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
+ stream := ServerTransportStreamFromContext(ctx)
+ if stream == nil {
+ return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
}
return stream.SetHeader(md)
}
// SendHeader sends header metadata. It may be called at most once.
// The provided md and headers set by SetHeader() will be sent.
func SendHeader(ctx context.Context, md metadata.MD) error {
- stream, ok := transport.StreamFromContext(ctx)
- if !ok {
- return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
+ stream := ServerTransportStreamFromContext(ctx)
+ if stream == nil {
+ return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
}
- t := stream.ServerTransport()
- if t == nil {
- grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
- }
- if err := t.WriteHeader(stream, md); err != nil {
+ if err := stream.SendHeader(md); err != nil {
return toRPCErr(err)
}
	return nil
}

// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
// When called more than once, all the provided metadata will be merged.
func SetTrailer(ctx context.Context, md metadata.MD) error {
	if md.Len() == 0 {
return nil
}
- stream, ok := transport.StreamFromContext(ctx)
- if !ok {
- return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
+ stream := ServerTransportStreamFromContext(ctx)
+ if stream == nil {
+ return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
}
return stream.SetTrailer(md)
}
+
+// Method returns the method string for the server context. The returned
+// string is in the format of "/service/method".
+func Method(ctx context.Context) (string, bool) {
+ s := ServerTransportStreamFromContext(ctx)
+ if s == nil {
+ return "", false
+ }
+ return s.Method(), true
+}
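+
+// A sketch of reading the method name from a unary interceptor (the logging
+// is illustrative; info.FullMethod carries the same value):
+//
+//	func logging(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo,
+//		handler grpc.UnaryHandler) (interface{}, error) {
+//		if m, ok := grpc.Method(ctx); ok {
+//			grpclog.Infof("handling %s", m)
+//		}
+//		return handler(ctx, req)
+//	}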
+
+type channelzServer struct {
+ s *Server
+}
+
+func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
+ return c.s.channelzMetric()
+}