aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/google.golang.org/grpc/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/server.go')
-rw-r--r--vendor/google.golang.org/grpc/server.go861
1 files changed, 594 insertions, 267 deletions
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 42733e2..d705d7a 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -19,36 +19,41 @@
19package grpc 19package grpc
20 20
21import ( 21import (
22 "bytes" 22 "context"
23 "errors" 23 "errors"
24 "fmt" 24 "fmt"
25 "io" 25 "io"
26 "math"
26 "net" 27 "net"
27 "net/http" 28 "net/http"
28 "reflect" 29 "reflect"
29 "runtime" 30 "runtime"
30 "strings" 31 "strings"
31 "sync" 32 "sync"
33 "sync/atomic"
32 "time" 34 "time"
33 35
34 "golang.org/x/net/context"
35 "golang.org/x/net/http2"
36 "golang.org/x/net/trace" 36 "golang.org/x/net/trace"
37
37 "google.golang.org/grpc/codes" 38 "google.golang.org/grpc/codes"
38 "google.golang.org/grpc/credentials" 39 "google.golang.org/grpc/credentials"
40 "google.golang.org/grpc/encoding"
41 "google.golang.org/grpc/encoding/proto"
39 "google.golang.org/grpc/grpclog" 42 "google.golang.org/grpc/grpclog"
40 "google.golang.org/grpc/internal" 43 "google.golang.org/grpc/internal/binarylog"
44 "google.golang.org/grpc/internal/channelz"
45 "google.golang.org/grpc/internal/transport"
41 "google.golang.org/grpc/keepalive" 46 "google.golang.org/grpc/keepalive"
42 "google.golang.org/grpc/metadata" 47 "google.golang.org/grpc/metadata"
48 "google.golang.org/grpc/peer"
43 "google.golang.org/grpc/stats" 49 "google.golang.org/grpc/stats"
44 "google.golang.org/grpc/status" 50 "google.golang.org/grpc/status"
45 "google.golang.org/grpc/tap" 51 "google.golang.org/grpc/tap"
46 "google.golang.org/grpc/transport"
47) 52)
48 53
49const ( 54const (
50 defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4 55 defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
51 defaultServerMaxSendMessageSize = 1024 * 1024 * 4 56 defaultServerMaxSendMessageSize = math.MaxInt32
52) 57)
53 58
54type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error) 59type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
@@ -88,18 +93,24 @@ type Server struct {
88 conns map[io.Closer]bool 93 conns map[io.Closer]bool
89 serve bool 94 serve bool
90 drain bool 95 drain bool
91 ctx context.Context 96 cv *sync.Cond // signaled when connections close for GracefulStop
92 cancel context.CancelFunc
93 // A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
94 // and all the transport goes away.
95 cv *sync.Cond
96 m map[string]*service // service name -> service info 97 m map[string]*service // service name -> service info
97 events trace.EventLog 98 events trace.EventLog
99
100 quit chan struct{}
101 done chan struct{}
102 quitOnce sync.Once
103 doneOnce sync.Once
104 channelzRemoveOnce sync.Once
105 serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
106
107 channelzID int64 // channelz unique identification number
108 czData *channelzData
98} 109}
99 110
100type options struct { 111type options struct {
101 creds credentials.TransportCredentials 112 creds credentials.TransportCredentials
102 codec Codec 113 codec baseCodec
103 cp Compressor 114 cp Compressor
104 dc Decompressor 115 dc Decompressor
105 unaryInt UnaryServerInterceptor 116 unaryInt UnaryServerInterceptor
@@ -109,22 +120,50 @@ type options struct {
109 maxConcurrentStreams uint32 120 maxConcurrentStreams uint32
110 maxReceiveMessageSize int 121 maxReceiveMessageSize int
111 maxSendMessageSize int 122 maxSendMessageSize int
112 useHandlerImpl bool // use http.Handler-based server
113 unknownStreamDesc *StreamDesc 123 unknownStreamDesc *StreamDesc
114 keepaliveParams keepalive.ServerParameters 124 keepaliveParams keepalive.ServerParameters
115 keepalivePolicy keepalive.EnforcementPolicy 125 keepalivePolicy keepalive.EnforcementPolicy
116 initialWindowSize int32 126 initialWindowSize int32
117 initialConnWindowSize int32 127 initialConnWindowSize int32
128 writeBufferSize int
129 readBufferSize int
130 connectionTimeout time.Duration
131 maxHeaderListSize *uint32
118} 132}
119 133
120var defaultServerOptions = options{ 134var defaultServerOptions = options{
121 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, 135 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
122 maxSendMessageSize: defaultServerMaxSendMessageSize, 136 maxSendMessageSize: defaultServerMaxSendMessageSize,
137 connectionTimeout: 120 * time.Second,
138 writeBufferSize: defaultWriteBufSize,
139 readBufferSize: defaultReadBufSize,
123} 140}
124 141
125// A ServerOption sets options such as credentials, codec and keepalive parameters, etc. 142// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
126type ServerOption func(*options) 143type ServerOption func(*options)
127 144
145// WriteBufferSize determines how much data can be batched before doing a write on the wire.
146// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
147// The default value for this buffer is 32KB.
148// Zero will disable the write buffer such that each write will be on underlying connection.
149// Note: A Send call may not directly translate to a write.
150func WriteBufferSize(s int) ServerOption {
151 return func(o *options) {
152 o.writeBufferSize = s
153 }
154}
155
156// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
157// for one read syscall.
158// The default value for this buffer is 32KB.
159// Zero will disable read buffer for a connection so data framer can access the underlying
160// conn directly.
161func ReadBufferSize(s int) ServerOption {
162 return func(o *options) {
163 o.readBufferSize = s
164 }
165}
166
128// InitialWindowSize returns a ServerOption that sets window size for stream. 167// InitialWindowSize returns a ServerOption that sets window size for stream.
129// The lower bound for window size is 64K and any value smaller than that will be ignored. 168// The lower bound for window size is 64K and any value smaller than that will be ignored.
130func InitialWindowSize(s int32) ServerOption { 169func InitialWindowSize(s int32) ServerOption {
@@ -156,20 +195,32 @@ func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
156} 195}
157 196
158// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. 197// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
198//
199// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
159func CustomCodec(codec Codec) ServerOption { 200func CustomCodec(codec Codec) ServerOption {
160 return func(o *options) { 201 return func(o *options) {
161 o.codec = codec 202 o.codec = codec
162 } 203 }
163} 204}
164 205
165// RPCCompressor returns a ServerOption that sets a compressor for outbound messages. 206// RPCCompressor returns a ServerOption that sets a compressor for outbound
207// messages. For backward compatibility, all outbound messages will be sent
208// using this compressor, regardless of incoming message compression. By
209// default, server messages will be sent using the same compressor with which
210// request messages were sent.
211//
212// Deprecated: use encoding.RegisterCompressor instead.
166func RPCCompressor(cp Compressor) ServerOption { 213func RPCCompressor(cp Compressor) ServerOption {
167 return func(o *options) { 214 return func(o *options) {
168 o.cp = cp 215 o.cp = cp
169 } 216 }
170} 217}
171 218
172// RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages. 219// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
220// messages. It has higher priority than decompressors registered via
221// encoding.RegisterCompressor.
222//
223// Deprecated: use encoding.RegisterCompressor instead.
173func RPCDecompressor(dc Decompressor) ServerOption { 224func RPCDecompressor(dc Decompressor) ServerOption {
174 return func(o *options) { 225 return func(o *options) {
175 o.dc = dc 226 o.dc = dc
@@ -177,7 +228,9 @@ func RPCDecompressor(dc Decompressor) ServerOption {
177} 228}
178 229
179// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive. 230// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
180// If this is not set, gRPC uses the default limit. Deprecated: use MaxRecvMsgSize instead. 231// If this is not set, gRPC uses the default limit.
232//
233// Deprecated: use MaxRecvMsgSize instead.
181func MaxMsgSize(m int) ServerOption { 234func MaxMsgSize(m int) ServerOption {
182 return MaxRecvMsgSize(m) 235 return MaxRecvMsgSize(m)
183} 236}
@@ -259,7 +312,7 @@ func StatsHandler(h stats.Handler) ServerOption {
259// handler that will be invoked instead of returning the "unimplemented" gRPC 312// handler that will be invoked instead of returning the "unimplemented" gRPC
260// error whenever a request is received for an unregistered service or method. 313// error whenever a request is received for an unregistered service or method.
261// The handling function has full access to the Context of the request and the 314// The handling function has full access to the Context of the request and the
262// stream, and the invocation passes through interceptors. 315// stream, and the invocation bypasses interceptors.
263func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { 316func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
264 return func(o *options) { 317 return func(o *options) {
265 o.unknownStreamDesc = &StreamDesc{ 318 o.unknownStreamDesc = &StreamDesc{
@@ -272,6 +325,26 @@ func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
272 } 325 }
273} 326}
274 327
328// ConnectionTimeout returns a ServerOption that sets the timeout for
329// connection establishment (up to and including HTTP/2 handshaking) for all
330// new connections. If this is not set, the default is 120 seconds. A zero or
331// negative value will result in an immediate timeout.
332//
333// This API is EXPERIMENTAL.
334func ConnectionTimeout(d time.Duration) ServerOption {
335 return func(o *options) {
336 o.connectionTimeout = d
337 }
338}
339
340// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
341// of header list that the server is prepared to accept.
342func MaxHeaderListSize(s uint32) ServerOption {
343 return func(o *options) {
344 o.maxHeaderListSize = &s
345 }
346}
347
275// NewServer creates a gRPC server which has no service registered and has not 348// NewServer creates a gRPC server which has no service registered and has not
276// started to accept requests yet. 349// started to accept requests yet.
277func NewServer(opt ...ServerOption) *Server { 350func NewServer(opt ...ServerOption) *Server {
@@ -279,22 +352,24 @@ func NewServer(opt ...ServerOption) *Server {
279 for _, o := range opt { 352 for _, o := range opt {
280 o(&opts) 353 o(&opts)
281 } 354 }
282 if opts.codec == nil {
283 // Set the default codec.
284 opts.codec = protoCodec{}
285 }
286 s := &Server{ 355 s := &Server{
287 lis: make(map[net.Listener]bool), 356 lis: make(map[net.Listener]bool),
288 opts: opts, 357 opts: opts,
289 conns: make(map[io.Closer]bool), 358 conns: make(map[io.Closer]bool),
290 m: make(map[string]*service), 359 m: make(map[string]*service),
360 quit: make(chan struct{}),
361 done: make(chan struct{}),
362 czData: new(channelzData),
291 } 363 }
292 s.cv = sync.NewCond(&s.mu) 364 s.cv = sync.NewCond(&s.mu)
293 s.ctx, s.cancel = context.WithCancel(context.Background())
294 if EnableTracing { 365 if EnableTracing {
295 _, file, line, _ := runtime.Caller(1) 366 _, file, line, _ := runtime.Caller(1)
296 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) 367 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
297 } 368 }
369
370 if channelz.IsOn() {
371 s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
372 }
298 return s 373 return s
299} 374}
300 375
@@ -399,11 +474,9 @@ func (s *Server) GetServiceInfo() map[string]ServiceInfo {
399 return ret 474 return ret
400} 475}
401 476
402var ( 477// ErrServerStopped indicates that the operation is now illegal because of
403 // ErrServerStopped indicates that the operation is now illegal because of 478// the server being stopped.
404 // the server being stopped. 479var ErrServerStopped = errors.New("grpc: the server has been stopped")
405 ErrServerStopped = errors.New("grpc: the server has been stopped")
406)
407 480
408func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { 481func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
409 if s.opts.creds == nil { 482 if s.opts.creds == nil {
@@ -412,28 +485,67 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti
412 return s.opts.creds.ServerHandshake(rawConn) 485 return s.opts.creds.ServerHandshake(rawConn)
413} 486}
414 487
488type listenSocket struct {
489 net.Listener
490 channelzID int64
491}
492
493func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
494 return &channelz.SocketInternalMetric{
495 SocketOptions: channelz.GetSocketOption(l.Listener),
496 LocalAddr: l.Listener.Addr(),
497 }
498}
499
500func (l *listenSocket) Close() error {
501 err := l.Listener.Close()
502 if channelz.IsOn() {
503 channelz.RemoveEntry(l.channelzID)
504 }
505 return err
506}
507
415// Serve accepts incoming connections on the listener lis, creating a new 508// Serve accepts incoming connections on the listener lis, creating a new
416// ServerTransport and service goroutine for each. The service goroutines 509// ServerTransport and service goroutine for each. The service goroutines
417// read gRPC requests and then call the registered handlers to reply to them. 510// read gRPC requests and then call the registered handlers to reply to them.
418// Serve returns when lis.Accept fails with fatal errors. lis will be closed when 511// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
419// this method returns. 512// this method returns.
420// Serve always returns non-nil error. 513// Serve will return a non-nil error unless Stop or GracefulStop is called.
421func (s *Server) Serve(lis net.Listener) error { 514func (s *Server) Serve(lis net.Listener) error {
422 s.mu.Lock() 515 s.mu.Lock()
423 s.printf("serving") 516 s.printf("serving")
424 s.serve = true 517 s.serve = true
425 if s.lis == nil { 518 if s.lis == nil {
519 // Serve called after Stop or GracefulStop.
426 s.mu.Unlock() 520 s.mu.Unlock()
427 lis.Close() 521 lis.Close()
428 return ErrServerStopped 522 return ErrServerStopped
429 } 523 }
430 s.lis[lis] = true 524
525 s.serveWG.Add(1)
526 defer func() {
527 s.serveWG.Done()
528 select {
529 // Stop or GracefulStop called; block until done and return nil.
530 case <-s.quit:
531 <-s.done
532 default:
533 }
534 }()
535
536 ls := &listenSocket{Listener: lis}
537 s.lis[ls] = true
538
539 if channelz.IsOn() {
540 ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
541 }
431 s.mu.Unlock() 542 s.mu.Unlock()
543
432 defer func() { 544 defer func() {
433 s.mu.Lock() 545 s.mu.Lock()
434 if s.lis != nil && s.lis[lis] { 546 if s.lis != nil && s.lis[ls] {
435 lis.Close() 547 ls.Close()
436 delete(s.lis, lis) 548 delete(s.lis, ls)
437 } 549 }
438 s.mu.Unlock() 550 s.mu.Unlock()
439 }() 551 }()
@@ -460,36 +572,52 @@ func (s *Server) Serve(lis net.Listener) error {
460 timer := time.NewTimer(tempDelay) 572 timer := time.NewTimer(tempDelay)
461 select { 573 select {
462 case <-timer.C: 574 case <-timer.C:
463 case <-s.ctx.Done(): 575 case <-s.quit:
576 timer.Stop()
577 return nil
464 } 578 }
465 timer.Stop()
466 continue 579 continue
467 } 580 }
468 s.mu.Lock() 581 s.mu.Lock()
469 s.printf("done serving; Accept = %v", err) 582 s.printf("done serving; Accept = %v", err)
470 s.mu.Unlock() 583 s.mu.Unlock()
584
585 select {
586 case <-s.quit:
587 return nil
588 default:
589 }
471 return err 590 return err
472 } 591 }
473 tempDelay = 0 592 tempDelay = 0
474 // Start a new goroutine to deal with rawConn 593 // Start a new goroutine to deal with rawConn so we don't stall this Accept
475 // so we don't stall this Accept loop goroutine. 594 // loop goroutine.
476 go s.handleRawConn(rawConn) 595 //
596 // Make sure we account for the goroutine so GracefulStop doesn't nil out
597 // s.conns before this conn can be added.
598 s.serveWG.Add(1)
599 go func() {
600 s.handleRawConn(rawConn)
601 s.serveWG.Done()
602 }()
477 } 603 }
478} 604}
479 605
480// handleRawConn is run in its own goroutine and handles a just-accepted 606// handleRawConn forks a goroutine to handle a just-accepted connection that
481// connection that has not had any I/O performed on it yet. 607// has not had any I/O performed on it yet.
482func (s *Server) handleRawConn(rawConn net.Conn) { 608func (s *Server) handleRawConn(rawConn net.Conn) {
609 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
483 conn, authInfo, err := s.useTransportAuthenticator(rawConn) 610 conn, authInfo, err := s.useTransportAuthenticator(rawConn)
484 if err != nil { 611 if err != nil {
485 s.mu.Lock() 612 s.mu.Lock()
486 s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err) 613 s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
487 s.mu.Unlock() 614 s.mu.Unlock()
488 grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err) 615 grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
489 // If serverHandShake returns ErrConnDispatched, keep rawConn open. 616 // If serverHandshake returns ErrConnDispatched, keep rawConn open.
490 if err != credentials.ErrConnDispatched { 617 if err != credentials.ErrConnDispatched {
491 rawConn.Close() 618 rawConn.Close()
492 } 619 }
620 rawConn.SetDeadline(time.Time{})
493 return 621 return
494 } 622 }
495 623
@@ -501,19 +629,25 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
501 } 629 }
502 s.mu.Unlock() 630 s.mu.Unlock()
503 631
504 if s.opts.useHandlerImpl { 632 // Finish handshaking (HTTP2)
505 s.serveUsingHandler(conn) 633 st := s.newHTTP2Transport(conn, authInfo)
506 } else { 634 if st == nil {
507 s.serveHTTP2Transport(conn, authInfo) 635 return
636 }
637
638 rawConn.SetDeadline(time.Time{})
639 if !s.addConn(st) {
640 return
508 } 641 }
642 go func() {
643 s.serveStreams(st)
644 s.removeConn(st)
645 }()
509} 646}
510 647
511// serveHTTP2Transport sets up a http/2 transport (using the 648// newHTTP2Transport sets up a http/2 transport (using the
512// gRPC http2 server transport in transport/http2_server.go) and 649// gRPC http2 server transport in transport/http2_server.go).
513// serves streams on it. 650func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
514// This is run in its own goroutine (it does network I/O in
515// transport.NewServerTransport).
516func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
517 config := &transport.ServerConfig{ 651 config := &transport.ServerConfig{
518 MaxStreams: s.opts.maxConcurrentStreams, 652 MaxStreams: s.opts.maxConcurrentStreams,
519 AuthInfo: authInfo, 653 AuthInfo: authInfo,
@@ -523,6 +657,10 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo)
523 KeepalivePolicy: s.opts.keepalivePolicy, 657 KeepalivePolicy: s.opts.keepalivePolicy,
524 InitialWindowSize: s.opts.initialWindowSize, 658 InitialWindowSize: s.opts.initialWindowSize,
525 InitialConnWindowSize: s.opts.initialConnWindowSize, 659 InitialConnWindowSize: s.opts.initialConnWindowSize,
660 WriteBufferSize: s.opts.writeBufferSize,
661 ReadBufferSize: s.opts.readBufferSize,
662 ChannelzParentID: s.channelzID,
663 MaxHeaderListSize: s.opts.maxHeaderListSize,
526 } 664 }
527 st, err := transport.NewServerTransport("http2", c, config) 665 st, err := transport.NewServerTransport("http2", c, config)
528 if err != nil { 666 if err != nil {
@@ -531,17 +669,13 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo)
531 s.mu.Unlock() 669 s.mu.Unlock()
532 c.Close() 670 c.Close()
533 grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err) 671 grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
534 return 672 return nil
535 }
536 if !s.addConn(st) {
537 st.Close()
538 return
539 } 673 }
540 s.serveStreams(st) 674
675 return st
541} 676}
542 677
543func (s *Server) serveStreams(st transport.ServerTransport) { 678func (s *Server) serveStreams(st transport.ServerTransport) {
544 defer s.removeConn(st)
545 defer st.Close() 679 defer st.Close()
546 var wg sync.WaitGroup 680 var wg sync.WaitGroup
547 st.HandleStreams(func(stream *transport.Stream) { 681 st.HandleStreams(func(stream *transport.Stream) {
@@ -562,32 +696,6 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
562 696
563var _ http.Handler = (*Server)(nil) 697var _ http.Handler = (*Server)(nil)
564 698
565// serveUsingHandler is called from handleRawConn when s is configured
566// to handle requests via the http.Handler interface. It sets up a
567// net/http.Server to handle the just-accepted conn. The http.Server
568// is configured to route all incoming requests (all HTTP/2 streams)
569// to ServeHTTP, which creates a new ServerTransport for each stream.
570// serveUsingHandler blocks until conn closes.
571//
572// This codepath is only used when Server.TestingUseHandlerImpl has
573// been configured. This lets the end2end tests exercise the ServeHTTP
574// method as one of the environment types.
575//
576// conn is the *tls.Conn that's already been authenticated.
577func (s *Server) serveUsingHandler(conn net.Conn) {
578 if !s.addConn(conn) {
579 conn.Close()
580 return
581 }
582 defer s.removeConn(conn)
583 h2s := &http2.Server{
584 MaxConcurrentStreams: s.opts.maxConcurrentStreams,
585 }
586 h2s.ServeConn(conn, &http2.ServeConnOpts{
587 Handler: s,
588 })
589}
590
591// ServeHTTP implements the Go standard library's http.Handler 699// ServeHTTP implements the Go standard library's http.Handler
592// interface by responding to the gRPC request r, by looking up 700// interface by responding to the gRPC request r, by looking up
593// the requested gRPC method in the gRPC server s. 701// the requested gRPC method in the gRPC server s.
@@ -613,13 +721,12 @@ func (s *Server) serveUsingHandler(conn net.Conn) {
613// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL 721// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
614// and subject to change. 722// and subject to change.
615func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { 723func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
616 st, err := transport.NewServerHandlerTransport(w, r) 724 st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
617 if err != nil { 725 if err != nil {
618 http.Error(w, err.Error(), http.StatusInternalServerError) 726 http.Error(w, err.Error(), http.StatusInternalServerError)
619 return 727 return
620 } 728 }
621 if !s.addConn(st) { 729 if !s.addConn(st) {
622 st.Close()
623 return 730 return
624 } 731 }
625 defer s.removeConn(st) 732 defer s.removeConn(st)
@@ -649,9 +756,15 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea
649func (s *Server) addConn(c io.Closer) bool { 756func (s *Server) addConn(c io.Closer) bool {
650 s.mu.Lock() 757 s.mu.Lock()
651 defer s.mu.Unlock() 758 defer s.mu.Unlock()
652 if s.conns == nil || s.drain { 759 if s.conns == nil {
760 c.Close()
653 return false 761 return false
654 } 762 }
763 if s.drain {
764 // Transport added after we drained our existing conns: drain it
765 // immediately.
766 c.(transport.ServerTransport).Drain()
767 }
655 s.conns[c] = true 768 s.conns[c] = true
656 return true 769 return true
657} 770}
@@ -665,43 +778,73 @@ func (s *Server) removeConn(c io.Closer) {
665 } 778 }
666} 779}
667 780
668func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error { 781func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
669 var ( 782 return &channelz.ServerInternalMetric{
670 cbuf *bytes.Buffer 783 CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
671 outPayload *stats.OutPayload 784 CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
672 ) 785 CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
673 if cp != nil { 786 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
674 cbuf = new(bytes.Buffer)
675 } 787 }
676 if s.opts.statsHandler != nil { 788}
677 outPayload = &stats.OutPayload{} 789
678 } 790func (s *Server) incrCallsStarted() {
679 p, err := encode(s.opts.codec, msg, cp, cbuf, outPayload) 791 atomic.AddInt64(&s.czData.callsStarted, 1)
792 atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
793}
794
795func (s *Server) incrCallsSucceeded() {
796 atomic.AddInt64(&s.czData.callsSucceeded, 1)
797}
798
799func (s *Server) incrCallsFailed() {
800 atomic.AddInt64(&s.czData.callsFailed, 1)
801}
802
803func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
804 data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
680 if err != nil { 805 if err != nil {
681 grpclog.Errorln("grpc: server failed to encode response: ", err) 806 grpclog.Errorln("grpc: server failed to encode response: ", err)
682 return err 807 return err
683 } 808 }
684 if len(p) > s.opts.maxSendMessageSize { 809 compData, err := compress(data, cp, comp)
685 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(p), s.opts.maxSendMessageSize) 810 if err != nil {
811 grpclog.Errorln("grpc: server failed to compress response: ", err)
812 return err
813 }
814 hdr, payload := msgHeader(data, compData)
815 // TODO(dfawley): should we be checking len(data) instead?
816 if len(payload) > s.opts.maxSendMessageSize {
817 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
686 } 818 }
687 err = t.Write(stream, p, opts) 819 err = t.Write(stream, hdr, payload, opts)
688 if err == nil && outPayload != nil { 820 if err == nil && s.opts.statsHandler != nil {
689 outPayload.SentTime = time.Now() 821 s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
690 s.opts.statsHandler.HandleRPC(stream.Context(), outPayload)
691 } 822 }
692 return err 823 return err
693} 824}
694 825
695func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { 826func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
827 if channelz.IsOn() {
828 s.incrCallsStarted()
829 defer func() {
830 if err != nil && err != io.EOF {
831 s.incrCallsFailed()
832 } else {
833 s.incrCallsSucceeded()
834 }
835 }()
836 }
696 sh := s.opts.statsHandler 837 sh := s.opts.statsHandler
697 if sh != nil { 838 if sh != nil {
839 beginTime := time.Now()
698 begin := &stats.Begin{ 840 begin := &stats.Begin{
699 BeginTime: time.Now(), 841 BeginTime: beginTime,
700 } 842 }
701 sh.HandleRPC(stream.Context(), begin) 843 sh.HandleRPC(stream.Context(), begin)
702 defer func() { 844 defer func() {
703 end := &stats.End{ 845 end := &stats.End{
704 EndTime: time.Now(), 846 BeginTime: beginTime,
847 EndTime: time.Now(),
705 } 848 }
706 if err != nil && err != io.EOF { 849 if err != nil && err != io.EOF {
707 end.Error = toRPCErr(err) 850 end.Error = toRPCErr(err)
@@ -720,94 +863,112 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
720 } 863 }
721 }() 864 }()
722 } 865 }
723 if s.opts.cp != nil { 866
724 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. 867 binlog := binarylog.GetMethodLogger(stream.Method())
725 stream.SetSendCompress(s.opts.cp.Type()) 868 if binlog != nil {
869 ctx := stream.Context()
870 md, _ := metadata.FromIncomingContext(ctx)
871 logEntry := &binarylog.ClientHeader{
872 Header: md,
873 MethodName: stream.Method(),
874 PeerAddr: nil,
875 }
876 if deadline, ok := ctx.Deadline(); ok {
877 logEntry.Timeout = deadline.Sub(time.Now())
878 if logEntry.Timeout < 0 {
879 logEntry.Timeout = 0
880 }
881 }
882 if a := md[":authority"]; len(a) > 0 {
883 logEntry.Authority = a[0]
884 }
885 if peer, ok := peer.FromContext(ctx); ok {
886 logEntry.PeerAddr = peer.Addr
887 }
888 binlog.Log(logEntry)
889 }
890
891 // comp and cp are used for compression. decomp and dc are used for
892 // decompression. If comp and decomp are both set, they are the same;
893 // however they are kept separate to ensure that at most one of the
894 // compressor/decompressor variable pairs are set for use later.
895 var comp, decomp encoding.Compressor
896 var cp Compressor
897 var dc Decompressor
898
899 // If dc is set and matches the stream's compression, use it. Otherwise, try
900 // to find a matching registered compressor for decomp.
901 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
902 dc = s.opts.dc
903 } else if rc != "" && rc != encoding.Identity {
904 decomp = encoding.GetCompressor(rc)
905 if decomp == nil {
906 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
907 t.WriteStatus(stream, st)
908 return st.Err()
909 }
726 } 910 }
727 p := &parser{r: stream} 911
728 pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize) 912 // If cp is set, use it. Otherwise, attempt to compress the response using
729 if err == io.EOF { 913 // the incoming message compression method.
730 // The entire stream is done (for unary RPC only). 914 //
731 return err 915 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
916 if s.opts.cp != nil {
917 cp = s.opts.cp
918 stream.SetSendCompress(cp.Type())
919 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
920 // Legacy compressor not specified; attempt to respond with same encoding.
921 comp = encoding.GetCompressor(rc)
922 if comp != nil {
923 stream.SetSendCompress(rc)
924 }
732 } 925 }
733 if err == io.ErrUnexpectedEOF { 926
734 err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) 927 var payInfo *payloadInfo
928 if sh != nil || binlog != nil {
929 payInfo = &payloadInfo{}
735 } 930 }
931 d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
736 if err != nil { 932 if err != nil {
737 if st, ok := status.FromError(err); ok { 933 if st, ok := status.FromError(err); ok {
738 if e := t.WriteStatus(stream, st); e != nil { 934 if e := t.WriteStatus(stream, st); e != nil {
739 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) 935 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
740 } 936 }
741 } else {
742 switch st := err.(type) {
743 case transport.ConnectionError:
744 // Nothing to do here.
745 case transport.StreamError:
746 if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
747 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
748 }
749 default:
750 panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
751 }
752 } 937 }
753 return err 938 return err
754 } 939 }
755 940 if channelz.IsOn() {
756 if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil { 941 t.IncrMsgRecv()
757 if st, ok := status.FromError(err); ok {
758 if e := t.WriteStatus(stream, st); e != nil {
759 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
760 }
761 return err
762 }
763 if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil {
764 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
765 }
766
767 // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
768 }
769 var inPayload *stats.InPayload
770 if sh != nil {
771 inPayload = &stats.InPayload{
772 RecvTime: time.Now(),
773 }
774 } 942 }
775 df := func(v interface{}) error { 943 df := func(v interface{}) error {
776 if inPayload != nil { 944 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
777 inPayload.WireLength = len(req)
778 }
779 if pf == compressionMade {
780 var err error
781 req, err = s.opts.dc.Do(bytes.NewReader(req))
782 if err != nil {
783 return Errorf(codes.Internal, err.Error())
784 }
785 }
786 if len(req) > s.opts.maxReceiveMessageSize {
787 // TODO: Revisit the error code. Currently keep it consistent with
788 // java implementation.
789 return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize)
790 }
791 if err := s.opts.codec.Unmarshal(req, v); err != nil {
792 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) 945 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
793 } 946 }
794 if inPayload != nil { 947 if sh != nil {
795 inPayload.Payload = v 948 sh.HandleRPC(stream.Context(), &stats.InPayload{
796 inPayload.Data = req 949 RecvTime: time.Now(),
797 inPayload.Length = len(req) 950 Payload: v,
798 sh.HandleRPC(stream.Context(), inPayload) 951 Data: d,
952 Length: len(d),
953 })
954 }
955 if binlog != nil {
956 binlog.Log(&binarylog.ClientMessage{
957 Message: d,
958 })
799 } 959 }
800 if trInfo != nil { 960 if trInfo != nil {
801 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) 961 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
802 } 962 }
803 return nil 963 return nil
804 } 964 }
805 reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt) 965 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
966 reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
806 if appErr != nil { 967 if appErr != nil {
807 appStatus, ok := status.FromError(appErr) 968 appStatus, ok := status.FromError(appErr)
808 if !ok { 969 if !ok {
809 // Convert appErr if it is not a grpc status error. 970 // Convert appErr if it is not a grpc status error.
810 appErr = status.Error(convertCode(appErr), appErr.Error()) 971 appErr = status.Error(codes.Unknown, appErr.Error())
811 appStatus, _ = status.FromError(appErr) 972 appStatus, _ = status.FromError(appErr)
812 } 973 }
813 if trInfo != nil { 974 if trInfo != nil {
@@ -817,16 +978,27 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
817 if e := t.WriteStatus(stream, appStatus); e != nil { 978 if e := t.WriteStatus(stream, appStatus); e != nil {
818 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e) 979 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
819 } 980 }
981 if binlog != nil {
982 if h, _ := stream.Header(); h.Len() > 0 {
983 // Only log serverHeader if there was header. Otherwise it can
984 // be trailer only.
985 binlog.Log(&binarylog.ServerHeader{
986 Header: h,
987 })
988 }
989 binlog.Log(&binarylog.ServerTrailer{
990 Trailer: stream.Trailer(),
991 Err: appErr,
992 })
993 }
820 return appErr 994 return appErr
821 } 995 }
822 if trInfo != nil { 996 if trInfo != nil {
823 trInfo.tr.LazyLog(stringer("OK"), false) 997 trInfo.tr.LazyLog(stringer("OK"), false)
824 } 998 }
825 opts := &transport.Options{ 999 opts := &transport.Options{Last: true}
826 Last: true, 1000
827 Delay: false, 1001 if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
828 }
829 if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
830 if err == io.EOF { 1002 if err == io.EOF {
831 // The entire stream is done (for unary RPC only). 1003 // The entire stream is done (for unary RPC only).
832 return err 1004 return err
@@ -839,35 +1011,72 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
839 switch st := err.(type) { 1011 switch st := err.(type) {
840 case transport.ConnectionError: 1012 case transport.ConnectionError:
841 // Nothing to do here. 1013 // Nothing to do here.
842 case transport.StreamError:
843 if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
844 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
845 }
846 default: 1014 default:
847 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st)) 1015 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
848 } 1016 }
849 } 1017 }
1018 if binlog != nil {
1019 h, _ := stream.Header()
1020 binlog.Log(&binarylog.ServerHeader{
1021 Header: h,
1022 })
1023 binlog.Log(&binarylog.ServerTrailer{
1024 Trailer: stream.Trailer(),
1025 Err: appErr,
1026 })
1027 }
850 return err 1028 return err
851 } 1029 }
1030 if binlog != nil {
1031 h, _ := stream.Header()
1032 binlog.Log(&binarylog.ServerHeader{
1033 Header: h,
1034 })
1035 binlog.Log(&binarylog.ServerMessage{
1036 Message: reply,
1037 })
1038 }
1039 if channelz.IsOn() {
1040 t.IncrMsgSent()
1041 }
852 if trInfo != nil { 1042 if trInfo != nil {
853 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true) 1043 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
854 } 1044 }
855 // TODO: Should we be logging if writing status failed here, like above? 1045 // TODO: Should we be logging if writing status failed here, like above?
856 // Should the logging be in WriteStatus? Should we ignore the WriteStatus 1046 // Should the logging be in WriteStatus? Should we ignore the WriteStatus
857 // error or allow the stats handler to see it? 1047 // error or allow the stats handler to see it?
858 return t.WriteStatus(stream, status.New(codes.OK, "")) 1048 err = t.WriteStatus(stream, status.New(codes.OK, ""))
1049 if binlog != nil {
1050 binlog.Log(&binarylog.ServerTrailer{
1051 Trailer: stream.Trailer(),
1052 Err: appErr,
1053 })
1054 }
1055 return err
859} 1056}
860 1057
861func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) { 1058func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
1059 if channelz.IsOn() {
1060 s.incrCallsStarted()
1061 defer func() {
1062 if err != nil && err != io.EOF {
1063 s.incrCallsFailed()
1064 } else {
1065 s.incrCallsSucceeded()
1066 }
1067 }()
1068 }
862 sh := s.opts.statsHandler 1069 sh := s.opts.statsHandler
863 if sh != nil { 1070 if sh != nil {
1071 beginTime := time.Now()
864 begin := &stats.Begin{ 1072 begin := &stats.Begin{
865 BeginTime: time.Now(), 1073 BeginTime: beginTime,
866 } 1074 }
867 sh.HandleRPC(stream.Context(), begin) 1075 sh.HandleRPC(stream.Context(), begin)
868 defer func() { 1076 defer func() {
869 end := &stats.End{ 1077 end := &stats.End{
870 EndTime: time.Now(), 1078 BeginTime: beginTime,
1079 EndTime: time.Now(),
871 } 1080 }
872 if err != nil && err != io.EOF { 1081 if err != nil && err != io.EOF {
873 end.Error = toRPCErr(err) 1082 end.Error = toRPCErr(err)
@@ -875,24 +1084,70 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
875 sh.HandleRPC(stream.Context(), end) 1084 sh.HandleRPC(stream.Context(), end)
876 }() 1085 }()
877 } 1086 }
878 if s.opts.cp != nil { 1087 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
879 stream.SetSendCompress(s.opts.cp.Type())
880 }
881 ss := &serverStream{ 1088 ss := &serverStream{
882 t: t, 1089 ctx: ctx,
883 s: stream, 1090 t: t,
884 p: &parser{r: stream}, 1091 s: stream,
885 codec: s.opts.codec, 1092 p: &parser{r: stream},
886 cp: s.opts.cp, 1093 codec: s.getCodec(stream.ContentSubtype()),
887 dc: s.opts.dc,
888 maxReceiveMessageSize: s.opts.maxReceiveMessageSize, 1094 maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
889 maxSendMessageSize: s.opts.maxSendMessageSize, 1095 maxSendMessageSize: s.opts.maxSendMessageSize,
890 trInfo: trInfo, 1096 trInfo: trInfo,
891 statsHandler: sh, 1097 statsHandler: sh,
892 } 1098 }
893 if ss.cp != nil { 1099
894 ss.cbuf = new(bytes.Buffer) 1100 ss.binlog = binarylog.GetMethodLogger(stream.Method())
1101 if ss.binlog != nil {
1102 md, _ := metadata.FromIncomingContext(ctx)
1103 logEntry := &binarylog.ClientHeader{
1104 Header: md,
1105 MethodName: stream.Method(),
1106 PeerAddr: nil,
1107 }
1108 if deadline, ok := ctx.Deadline(); ok {
1109 logEntry.Timeout = deadline.Sub(time.Now())
1110 if logEntry.Timeout < 0 {
1111 logEntry.Timeout = 0
1112 }
1113 }
1114 if a := md[":authority"]; len(a) > 0 {
1115 logEntry.Authority = a[0]
1116 }
1117 if peer, ok := peer.FromContext(ss.Context()); ok {
1118 logEntry.PeerAddr = peer.Addr
1119 }
1120 ss.binlog.Log(logEntry)
1121 }
1122
1123 // If dc is set and matches the stream's compression, use it. Otherwise, try
1124 // to find a matching registered compressor for decomp.
1125 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1126 ss.dc = s.opts.dc
1127 } else if rc != "" && rc != encoding.Identity {
1128 ss.decomp = encoding.GetCompressor(rc)
1129 if ss.decomp == nil {
1130 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1131 t.WriteStatus(ss.s, st)
1132 return st.Err()
1133 }
1134 }
1135
1136 // If cp is set, use it. Otherwise, attempt to compress the response using
1137 // the incoming message compression method.
1138 //
1139 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1140 if s.opts.cp != nil {
1141 ss.cp = s.opts.cp
1142 stream.SetSendCompress(s.opts.cp.Type())
1143 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1144 // Legacy compressor not specified; attempt to respond with same encoding.
1145 ss.comp = encoding.GetCompressor(rc)
1146 if ss.comp != nil {
1147 stream.SetSendCompress(rc)
1148 }
895 } 1149 }
1150
896 if trInfo != nil { 1151 if trInfo != nil {
897 trInfo.tr.LazyLog(&trInfo.firstLine, false) 1152 trInfo.tr.LazyLog(&trInfo.firstLine, false)
898 defer func() { 1153 defer func() {
@@ -924,12 +1179,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
924 if appErr != nil { 1179 if appErr != nil {
925 appStatus, ok := status.FromError(appErr) 1180 appStatus, ok := status.FromError(appErr)
926 if !ok { 1181 if !ok {
927 switch err := appErr.(type) { 1182 appStatus = status.New(codes.Unknown, appErr.Error())
928 case transport.StreamError:
929 appStatus = status.New(err.Code, err.Desc)
930 default:
931 appStatus = status.New(convertCode(appErr), appErr.Error())
932 }
933 appErr = appStatus.Err() 1183 appErr = appStatus.Err()
934 } 1184 }
935 if trInfo != nil { 1185 if trInfo != nil {
@@ -939,6 +1189,12 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
939 ss.mu.Unlock() 1189 ss.mu.Unlock()
940 } 1190 }
941 t.WriteStatus(ss.s, appStatus) 1191 t.WriteStatus(ss.s, appStatus)
1192 if ss.binlog != nil {
1193 ss.binlog.Log(&binarylog.ServerTrailer{
1194 Trailer: ss.s.Trailer(),
1195 Err: appErr,
1196 })
1197 }
942 // TODO: Should we log an error from WriteStatus here and below? 1198 // TODO: Should we log an error from WriteStatus here and below?
943 return appErr 1199 return appErr
944 } 1200 }
@@ -947,8 +1203,14 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
947 ss.trInfo.tr.LazyLog(stringer("OK"), false) 1203 ss.trInfo.tr.LazyLog(stringer("OK"), false)
948 ss.mu.Unlock() 1204 ss.mu.Unlock()
949 } 1205 }
950 return t.WriteStatus(ss.s, status.New(codes.OK, "")) 1206 err = t.WriteStatus(ss.s, status.New(codes.OK, ""))
951 1207 if ss.binlog != nil {
1208 ss.binlog.Log(&binarylog.ServerTrailer{
1209 Trailer: ss.s.Trailer(),
1210 Err: appErr,
1211 })
1212 }
1213 return err
952} 1214}
953 1215
954func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { 1216func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
@@ -977,47 +1239,27 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
977 } 1239 }
978 service := sm[:pos] 1240 service := sm[:pos]
979 method := sm[pos+1:] 1241 method := sm[pos+1:]
980 srv, ok := s.m[service] 1242
981 if !ok { 1243 if srv, ok := s.m[service]; ok {
982 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { 1244 if md, ok := srv.md[method]; ok {
983 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo) 1245 s.processUnaryRPC(t, stream, srv, md, trInfo)
984 return 1246 return
985 } 1247 }
986 if trInfo != nil { 1248 if sd, ok := srv.sd[method]; ok {
987 trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true) 1249 s.processStreamingRPC(t, stream, srv, sd, trInfo)
988 trInfo.tr.SetError() 1250 return
989 }
990 errDesc := fmt.Sprintf("unknown service %v", service)
991 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
992 if trInfo != nil {
993 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
994 trInfo.tr.SetError()
995 }
996 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
997 }
998 if trInfo != nil {
999 trInfo.tr.Finish()
1000 } 1251 }
1001 return
1002 }
1003 // Unary RPC or Streaming RPC?
1004 if md, ok := srv.md[method]; ok {
1005 s.processUnaryRPC(t, stream, srv, md, trInfo)
1006 return
1007 } 1252 }
1008 if sd, ok := srv.sd[method]; ok { 1253 // Unknown service, or known server unknown method.
1009 s.processStreamingRPC(t, stream, srv, sd, trInfo) 1254 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1255 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1010 return 1256 return
1011 } 1257 }
1012 if trInfo != nil { 1258 if trInfo != nil {
1013 trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true) 1259 trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
1014 trInfo.tr.SetError() 1260 trInfo.tr.SetError()
1015 } 1261 }
1016 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { 1262 errDesc := fmt.Sprintf("unknown service %v", service)
1017 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
1018 return
1019 }
1020 errDesc := fmt.Sprintf("unknown method %v", method)
1021 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { 1263 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1022 if trInfo != nil { 1264 if trInfo != nil {
1023 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) 1265 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
@@ -1030,12 +1272,65 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
1030 } 1272 }
1031} 1273}
1032 1274
1275// The key to save ServerTransportStream in the context.
1276type streamKey struct{}
1277
1278// NewContextWithServerTransportStream creates a new context from ctx and
1279// attaches stream to it.
1280//
1281// This API is EXPERIMENTAL.
1282func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1283 return context.WithValue(ctx, streamKey{}, stream)
1284}
1285
1286// ServerTransportStream is a minimal interface that a transport stream must
1287// implement. This can be used to mock an actual transport stream for tests of
1288// handler code that use, for example, grpc.SetHeader (which requires some
1289// stream to be in context).
1290//
1291// See also NewContextWithServerTransportStream.
1292//
1293// This API is EXPERIMENTAL.
1294type ServerTransportStream interface {
1295 Method() string
1296 SetHeader(md metadata.MD) error
1297 SendHeader(md metadata.MD) error
1298 SetTrailer(md metadata.MD) error
1299}
1300
1301// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1302// ctx. Returns nil if the given context has no stream associated with it
1303// (which implies it is not an RPC invocation context).
1304//
1305// This API is EXPERIMENTAL.
1306func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1307 s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1308 return s
1309}
1310
1033// Stop stops the gRPC server. It immediately closes all open 1311// Stop stops the gRPC server. It immediately closes all open
1034// connections and listeners. 1312// connections and listeners.
1035// It cancels all active RPCs on the server side and the corresponding 1313// It cancels all active RPCs on the server side and the corresponding
1036// pending RPCs on the client side will get notified by connection 1314// pending RPCs on the client side will get notified by connection
1037// errors. 1315// errors.
1038func (s *Server) Stop() { 1316func (s *Server) Stop() {
1317 s.quitOnce.Do(func() {
1318 close(s.quit)
1319 })
1320
1321 defer func() {
1322 s.serveWG.Wait()
1323 s.doneOnce.Do(func() {
1324 close(s.done)
1325 })
1326 }()
1327
1328 s.channelzRemoveOnce.Do(func() {
1329 if channelz.IsOn() {
1330 channelz.RemoveEntry(s.channelzID)
1331 }
1332 })
1333
1039 s.mu.Lock() 1334 s.mu.Lock()
1040 listeners := s.lis 1335 listeners := s.lis
1041 s.lis = nil 1336 s.lis = nil
@@ -1053,7 +1348,6 @@ func (s *Server) Stop() {
1053 } 1348 }
1054 1349
1055 s.mu.Lock() 1350 s.mu.Lock()
1056 s.cancel()
1057 if s.events != nil { 1351 if s.events != nil {
1058 s.events.Finish() 1352 s.events.Finish()
1059 s.events = nil 1353 s.events = nil
@@ -1065,22 +1359,44 @@ func (s *Server) Stop() {
1065// accepting new connections and RPCs and blocks until all the pending RPCs are 1359// accepting new connections and RPCs and blocks until all the pending RPCs are
1066// finished. 1360// finished.
1067func (s *Server) GracefulStop() { 1361func (s *Server) GracefulStop() {
1362 s.quitOnce.Do(func() {
1363 close(s.quit)
1364 })
1365
1366 defer func() {
1367 s.doneOnce.Do(func() {
1368 close(s.done)
1369 })
1370 }()
1371
1372 s.channelzRemoveOnce.Do(func() {
1373 if channelz.IsOn() {
1374 channelz.RemoveEntry(s.channelzID)
1375 }
1376 })
1068 s.mu.Lock() 1377 s.mu.Lock()
1069 defer s.mu.Unlock()
1070 if s.conns == nil { 1378 if s.conns == nil {
1379 s.mu.Unlock()
1071 return 1380 return
1072 } 1381 }
1382
1073 for lis := range s.lis { 1383 for lis := range s.lis {
1074 lis.Close() 1384 lis.Close()
1075 } 1385 }
1076 s.lis = nil 1386 s.lis = nil
1077 s.cancel()
1078 if !s.drain { 1387 if !s.drain {
1079 for c := range s.conns { 1388 for c := range s.conns {
1080 c.(transport.ServerTransport).Drain() 1389 c.(transport.ServerTransport).Drain()
1081 } 1390 }
1082 s.drain = true 1391 s.drain = true
1083 } 1392 }
1393
1394 // Wait for serving threads to be ready to exit. Only then can we be sure no
1395 // new conns will be created.
1396 s.mu.Unlock()
1397 s.serveWG.Wait()
1398 s.mu.Lock()
1399
1084 for len(s.conns) != 0 { 1400 for len(s.conns) != 0 {
1085 s.cv.Wait() 1401 s.cv.Wait()
1086 } 1402 }
@@ -1089,26 +1405,23 @@ func (s *Server) GracefulStop() {
1089 s.events.Finish() 1405 s.events.Finish()
1090 s.events = nil 1406 s.events = nil
1091 } 1407 }
1408 s.mu.Unlock()
1092} 1409}
1093 1410
1094func init() { 1411// contentSubtype must be lowercase
1095 internal.TestingCloseConns = func(arg interface{}) { 1412// cannot return nil
1096 arg.(*Server).testingCloseConns() 1413func (s *Server) getCodec(contentSubtype string) baseCodec {
1414 if s.opts.codec != nil {
1415 return s.opts.codec
1097 } 1416 }
1098 internal.TestingUseHandlerImpl = func(arg interface{}) { 1417 if contentSubtype == "" {
1099 arg.(*Server).opts.useHandlerImpl = true 1418 return encoding.GetCodec(proto.Name)
1100 } 1419 }
1101} 1420 codec := encoding.GetCodec(contentSubtype)
1102 1421 if codec == nil {
1103// testingCloseConns closes all existing transports but keeps s.lis 1422 return encoding.GetCodec(proto.Name)
1104// accepting new connections.
1105func (s *Server) testingCloseConns() {
1106 s.mu.Lock()
1107 for c := range s.conns {
1108 c.Close()
1109 delete(s.conns, c)
1110 } 1423 }
1111 s.mu.Unlock() 1424 return codec
1112} 1425}
1113 1426
1114// SetHeader sets the header metadata. 1427// SetHeader sets the header metadata.
@@ -1121,9 +1434,9 @@ func SetHeader(ctx context.Context, md metadata.MD) error {
1121 if md.Len() == 0 { 1434 if md.Len() == 0 {
1122 return nil 1435 return nil
1123 } 1436 }
1124 stream, ok := transport.StreamFromContext(ctx) 1437 stream := ServerTransportStreamFromContext(ctx)
1125 if !ok { 1438 if stream == nil {
1126 return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1439 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1127 } 1440 }
1128 return stream.SetHeader(md) 1441 return stream.SetHeader(md)
1129} 1442}
@@ -1131,15 +1444,11 @@ func SetHeader(ctx context.Context, md metadata.MD) error {
1131// SendHeader sends header metadata. It may be called at most once. 1444// SendHeader sends header metadata. It may be called at most once.
1132// The provided md and headers set by SetHeader() will be sent. 1445// The provided md and headers set by SetHeader() will be sent.
1133func SendHeader(ctx context.Context, md metadata.MD) error { 1446func SendHeader(ctx context.Context, md metadata.MD) error {
1134 stream, ok := transport.StreamFromContext(ctx) 1447 stream := ServerTransportStreamFromContext(ctx)
1135 if !ok { 1448 if stream == nil {
1136 return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1449 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1137 } 1450 }
1138 t := stream.ServerTransport() 1451 if err := stream.SendHeader(md); err != nil {
1139 if t == nil {
1140 grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream)
1141 }
1142 if err := t.WriteHeader(stream, md); err != nil {
1143 return toRPCErr(err) 1452 return toRPCErr(err)
1144 } 1453 }
1145 return nil 1454 return nil
@@ -1151,9 +1460,27 @@ func SetTrailer(ctx context.Context, md metadata.MD) error {
1151 if md.Len() == 0 { 1460 if md.Len() == 0 {
1152 return nil 1461 return nil
1153 } 1462 }
1154 stream, ok := transport.StreamFromContext(ctx) 1463 stream := ServerTransportStreamFromContext(ctx)
1155 if !ok { 1464 if stream == nil {
1156 return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) 1465 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
1157 } 1466 }
1158 return stream.SetTrailer(md) 1467 return stream.SetTrailer(md)
1159} 1468}
1469
1470// Method returns the method string for the server context. The returned
1471// string is in the format of "/service/method".
1472func Method(ctx context.Context) (string, bool) {
1473 s := ServerTransportStreamFromContext(ctx)
1474 if s == nil {
1475 return "", false
1476 }
1477 return s.Method(), true
1478}
1479
1480type channelzServer struct {
1481 s *Server
1482}
1483
1484func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
1485 return c.s.channelzMetric()
1486}