diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/server.go')
-rw-r--r-- | vendor/google.golang.org/grpc/server.go | 861 |
1 files changed, 594 insertions, 267 deletions
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index 42733e2..d705d7a 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go | |||
@@ -19,36 +19,41 @@ | |||
19 | package grpc | 19 | package grpc |
20 | 20 | ||
21 | import ( | 21 | import ( |
22 | "bytes" | 22 | "context" |
23 | "errors" | 23 | "errors" |
24 | "fmt" | 24 | "fmt" |
25 | "io" | 25 | "io" |
26 | "math" | ||
26 | "net" | 27 | "net" |
27 | "net/http" | 28 | "net/http" |
28 | "reflect" | 29 | "reflect" |
29 | "runtime" | 30 | "runtime" |
30 | "strings" | 31 | "strings" |
31 | "sync" | 32 | "sync" |
33 | "sync/atomic" | ||
32 | "time" | 34 | "time" |
33 | 35 | ||
34 | "golang.org/x/net/context" | ||
35 | "golang.org/x/net/http2" | ||
36 | "golang.org/x/net/trace" | 36 | "golang.org/x/net/trace" |
37 | |||
37 | "google.golang.org/grpc/codes" | 38 | "google.golang.org/grpc/codes" |
38 | "google.golang.org/grpc/credentials" | 39 | "google.golang.org/grpc/credentials" |
40 | "google.golang.org/grpc/encoding" | ||
41 | "google.golang.org/grpc/encoding/proto" | ||
39 | "google.golang.org/grpc/grpclog" | 42 | "google.golang.org/grpc/grpclog" |
40 | "google.golang.org/grpc/internal" | 43 | "google.golang.org/grpc/internal/binarylog" |
44 | "google.golang.org/grpc/internal/channelz" | ||
45 | "google.golang.org/grpc/internal/transport" | ||
41 | "google.golang.org/grpc/keepalive" | 46 | "google.golang.org/grpc/keepalive" |
42 | "google.golang.org/grpc/metadata" | 47 | "google.golang.org/grpc/metadata" |
48 | "google.golang.org/grpc/peer" | ||
43 | "google.golang.org/grpc/stats" | 49 | "google.golang.org/grpc/stats" |
44 | "google.golang.org/grpc/status" | 50 | "google.golang.org/grpc/status" |
45 | "google.golang.org/grpc/tap" | 51 | "google.golang.org/grpc/tap" |
46 | "google.golang.org/grpc/transport" | ||
47 | ) | 52 | ) |
48 | 53 | ||
49 | const ( | 54 | const ( |
50 | defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4 | 55 | defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4 |
51 | defaultServerMaxSendMessageSize = 1024 * 1024 * 4 | 56 | defaultServerMaxSendMessageSize = math.MaxInt32 |
52 | ) | 57 | ) |
53 | 58 | ||
54 | type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error) | 59 | type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error) |
@@ -88,18 +93,24 @@ type Server struct { | |||
88 | conns map[io.Closer]bool | 93 | conns map[io.Closer]bool |
89 | serve bool | 94 | serve bool |
90 | drain bool | 95 | drain bool |
91 | ctx context.Context | 96 | cv *sync.Cond // signaled when connections close for GracefulStop |
92 | cancel context.CancelFunc | ||
93 | // A CondVar to let GracefulStop() blocks until all the pending RPCs are finished | ||
94 | // and all the transport goes away. | ||
95 | cv *sync.Cond | ||
96 | m map[string]*service // service name -> service info | 97 | m map[string]*service // service name -> service info |
97 | events trace.EventLog | 98 | events trace.EventLog |
99 | |||
100 | quit chan struct{} | ||
101 | done chan struct{} | ||
102 | quitOnce sync.Once | ||
103 | doneOnce sync.Once | ||
104 | channelzRemoveOnce sync.Once | ||
105 | serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop | ||
106 | |||
107 | channelzID int64 // channelz unique identification number | ||
108 | czData *channelzData | ||
98 | } | 109 | } |
99 | 110 | ||
100 | type options struct { | 111 | type options struct { |
101 | creds credentials.TransportCredentials | 112 | creds credentials.TransportCredentials |
102 | codec Codec | 113 | codec baseCodec |
103 | cp Compressor | 114 | cp Compressor |
104 | dc Decompressor | 115 | dc Decompressor |
105 | unaryInt UnaryServerInterceptor | 116 | unaryInt UnaryServerInterceptor |
@@ -109,22 +120,50 @@ type options struct { | |||
109 | maxConcurrentStreams uint32 | 120 | maxConcurrentStreams uint32 |
110 | maxReceiveMessageSize int | 121 | maxReceiveMessageSize int |
111 | maxSendMessageSize int | 122 | maxSendMessageSize int |
112 | useHandlerImpl bool // use http.Handler-based server | ||
113 | unknownStreamDesc *StreamDesc | 123 | unknownStreamDesc *StreamDesc |
114 | keepaliveParams keepalive.ServerParameters | 124 | keepaliveParams keepalive.ServerParameters |
115 | keepalivePolicy keepalive.EnforcementPolicy | 125 | keepalivePolicy keepalive.EnforcementPolicy |
116 | initialWindowSize int32 | 126 | initialWindowSize int32 |
117 | initialConnWindowSize int32 | 127 | initialConnWindowSize int32 |
128 | writeBufferSize int | ||
129 | readBufferSize int | ||
130 | connectionTimeout time.Duration | ||
131 | maxHeaderListSize *uint32 | ||
118 | } | 132 | } |
119 | 133 | ||
120 | var defaultServerOptions = options{ | 134 | var defaultServerOptions = options{ |
121 | maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, | 135 | maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, |
122 | maxSendMessageSize: defaultServerMaxSendMessageSize, | 136 | maxSendMessageSize: defaultServerMaxSendMessageSize, |
137 | connectionTimeout: 120 * time.Second, | ||
138 | writeBufferSize: defaultWriteBufSize, | ||
139 | readBufferSize: defaultReadBufSize, | ||
123 | } | 140 | } |
124 | 141 | ||
125 | // A ServerOption sets options such as credentials, codec and keepalive parameters, etc. | 142 | // A ServerOption sets options such as credentials, codec and keepalive parameters, etc. |
126 | type ServerOption func(*options) | 143 | type ServerOption func(*options) |
127 | 144 | ||
145 | // WriteBufferSize determines how much data can be batched before doing a write on the wire. | ||
146 | // The corresponding memory allocation for this buffer will be twice the size to keep syscalls low. | ||
147 | // The default value for this buffer is 32KB. | ||
148 | // Zero will disable the write buffer such that each write will be on underlying connection. | ||
149 | // Note: A Send call may not directly translate to a write. | ||
150 | func WriteBufferSize(s int) ServerOption { | ||
151 | return func(o *options) { | ||
152 | o.writeBufferSize = s | ||
153 | } | ||
154 | } | ||
155 | |||
156 | // ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most | ||
157 | // for one read syscall. | ||
158 | // The default value for this buffer is 32KB. | ||
159 | // Zero will disable read buffer for a connection so data framer can access the underlying | ||
160 | // conn directly. | ||
161 | func ReadBufferSize(s int) ServerOption { | ||
162 | return func(o *options) { | ||
163 | o.readBufferSize = s | ||
164 | } | ||
165 | } | ||
166 | |||
128 | // InitialWindowSize returns a ServerOption that sets window size for stream. | 167 | // InitialWindowSize returns a ServerOption that sets window size for stream. |
129 | // The lower bound for window size is 64K and any value smaller than that will be ignored. | 168 | // The lower bound for window size is 64K and any value smaller than that will be ignored. |
130 | func InitialWindowSize(s int32) ServerOption { | 169 | func InitialWindowSize(s int32) ServerOption { |
@@ -156,20 +195,32 @@ func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption { | |||
156 | } | 195 | } |
157 | 196 | ||
158 | // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. | 197 | // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. |
198 | // | ||
199 | // This will override any lookups by content-subtype for Codecs registered with RegisterCodec. | ||
159 | func CustomCodec(codec Codec) ServerOption { | 200 | func CustomCodec(codec Codec) ServerOption { |
160 | return func(o *options) { | 201 | return func(o *options) { |
161 | o.codec = codec | 202 | o.codec = codec |
162 | } | 203 | } |
163 | } | 204 | } |
164 | 205 | ||
165 | // RPCCompressor returns a ServerOption that sets a compressor for outbound messages. | 206 | // RPCCompressor returns a ServerOption that sets a compressor for outbound |
207 | // messages. For backward compatibility, all outbound messages will be sent | ||
208 | // using this compressor, regardless of incoming message compression. By | ||
209 | // default, server messages will be sent using the same compressor with which | ||
210 | // request messages were sent. | ||
211 | // | ||
212 | // Deprecated: use encoding.RegisterCompressor instead. | ||
166 | func RPCCompressor(cp Compressor) ServerOption { | 213 | func RPCCompressor(cp Compressor) ServerOption { |
167 | return func(o *options) { | 214 | return func(o *options) { |
168 | o.cp = cp | 215 | o.cp = cp |
169 | } | 216 | } |
170 | } | 217 | } |
171 | 218 | ||
172 | // RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages. | 219 | // RPCDecompressor returns a ServerOption that sets a decompressor for inbound |
220 | // messages. It has higher priority than decompressors registered via | ||
221 | // encoding.RegisterCompressor. | ||
222 | // | ||
223 | // Deprecated: use encoding.RegisterCompressor instead. | ||
173 | func RPCDecompressor(dc Decompressor) ServerOption { | 224 | func RPCDecompressor(dc Decompressor) ServerOption { |
174 | return func(o *options) { | 225 | return func(o *options) { |
175 | o.dc = dc | 226 | o.dc = dc |
@@ -177,7 +228,9 @@ func RPCDecompressor(dc Decompressor) ServerOption { | |||
177 | } | 228 | } |
178 | 229 | ||
179 | // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive. | 230 | // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive. |
180 | // If this is not set, gRPC uses the default limit. Deprecated: use MaxRecvMsgSize instead. | 231 | // If this is not set, gRPC uses the default limit. |
232 | // | ||
233 | // Deprecated: use MaxRecvMsgSize instead. | ||
181 | func MaxMsgSize(m int) ServerOption { | 234 | func MaxMsgSize(m int) ServerOption { |
182 | return MaxRecvMsgSize(m) | 235 | return MaxRecvMsgSize(m) |
183 | } | 236 | } |
@@ -259,7 +312,7 @@ func StatsHandler(h stats.Handler) ServerOption { | |||
259 | // handler that will be invoked instead of returning the "unimplemented" gRPC | 312 | // handler that will be invoked instead of returning the "unimplemented" gRPC |
260 | // error whenever a request is received for an unregistered service or method. | 313 | // error whenever a request is received for an unregistered service or method. |
261 | // The handling function has full access to the Context of the request and the | 314 | // The handling function has full access to the Context of the request and the |
262 | // stream, and the invocation passes through interceptors. | 315 | // stream, and the invocation bypasses interceptors. |
263 | func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { | 316 | func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { |
264 | return func(o *options) { | 317 | return func(o *options) { |
265 | o.unknownStreamDesc = &StreamDesc{ | 318 | o.unknownStreamDesc = &StreamDesc{ |
@@ -272,6 +325,26 @@ func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { | |||
272 | } | 325 | } |
273 | } | 326 | } |
274 | 327 | ||
328 | // ConnectionTimeout returns a ServerOption that sets the timeout for | ||
329 | // connection establishment (up to and including HTTP/2 handshaking) for all | ||
330 | // new connections. If this is not set, the default is 120 seconds. A zero or | ||
331 | // negative value will result in an immediate timeout. | ||
332 | // | ||
333 | // This API is EXPERIMENTAL. | ||
334 | func ConnectionTimeout(d time.Duration) ServerOption { | ||
335 | return func(o *options) { | ||
336 | o.connectionTimeout = d | ||
337 | } | ||
338 | } | ||
339 | |||
340 | // MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size | ||
341 | // of header list that the server is prepared to accept. | ||
342 | func MaxHeaderListSize(s uint32) ServerOption { | ||
343 | return func(o *options) { | ||
344 | o.maxHeaderListSize = &s | ||
345 | } | ||
346 | } | ||
347 | |||
275 | // NewServer creates a gRPC server which has no service registered and has not | 348 | // NewServer creates a gRPC server which has no service registered and has not |
276 | // started to accept requests yet. | 349 | // started to accept requests yet. |
277 | func NewServer(opt ...ServerOption) *Server { | 350 | func NewServer(opt ...ServerOption) *Server { |
@@ -279,22 +352,24 @@ func NewServer(opt ...ServerOption) *Server { | |||
279 | for _, o := range opt { | 352 | for _, o := range opt { |
280 | o(&opts) | 353 | o(&opts) |
281 | } | 354 | } |
282 | if opts.codec == nil { | ||
283 | // Set the default codec. | ||
284 | opts.codec = protoCodec{} | ||
285 | } | ||
286 | s := &Server{ | 355 | s := &Server{ |
287 | lis: make(map[net.Listener]bool), | 356 | lis: make(map[net.Listener]bool), |
288 | opts: opts, | 357 | opts: opts, |
289 | conns: make(map[io.Closer]bool), | 358 | conns: make(map[io.Closer]bool), |
290 | m: make(map[string]*service), | 359 | m: make(map[string]*service), |
360 | quit: make(chan struct{}), | ||
361 | done: make(chan struct{}), | ||
362 | czData: new(channelzData), | ||
291 | } | 363 | } |
292 | s.cv = sync.NewCond(&s.mu) | 364 | s.cv = sync.NewCond(&s.mu) |
293 | s.ctx, s.cancel = context.WithCancel(context.Background()) | ||
294 | if EnableTracing { | 365 | if EnableTracing { |
295 | _, file, line, _ := runtime.Caller(1) | 366 | _, file, line, _ := runtime.Caller(1) |
296 | s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) | 367 | s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) |
297 | } | 368 | } |
369 | |||
370 | if channelz.IsOn() { | ||
371 | s.channelzID = channelz.RegisterServer(&channelzServer{s}, "") | ||
372 | } | ||
298 | return s | 373 | return s |
299 | } | 374 | } |
300 | 375 | ||
@@ -399,11 +474,9 @@ func (s *Server) GetServiceInfo() map[string]ServiceInfo { | |||
399 | return ret | 474 | return ret |
400 | } | 475 | } |
401 | 476 | ||
402 | var ( | 477 | // ErrServerStopped indicates that the operation is now illegal because of |
403 | // ErrServerStopped indicates that the operation is now illegal because of | 478 | // the server being stopped. |
404 | // the server being stopped. | 479 | var ErrServerStopped = errors.New("grpc: the server has been stopped") |
405 | ErrServerStopped = errors.New("grpc: the server has been stopped") | ||
406 | ) | ||
407 | 480 | ||
408 | func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { | 481 | func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { |
409 | if s.opts.creds == nil { | 482 | if s.opts.creds == nil { |
@@ -412,28 +485,67 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti | |||
412 | return s.opts.creds.ServerHandshake(rawConn) | 485 | return s.opts.creds.ServerHandshake(rawConn) |
413 | } | 486 | } |
414 | 487 | ||
488 | type listenSocket struct { | ||
489 | net.Listener | ||
490 | channelzID int64 | ||
491 | } | ||
492 | |||
493 | func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric { | ||
494 | return &channelz.SocketInternalMetric{ | ||
495 | SocketOptions: channelz.GetSocketOption(l.Listener), | ||
496 | LocalAddr: l.Listener.Addr(), | ||
497 | } | ||
498 | } | ||
499 | |||
500 | func (l *listenSocket) Close() error { | ||
501 | err := l.Listener.Close() | ||
502 | if channelz.IsOn() { | ||
503 | channelz.RemoveEntry(l.channelzID) | ||
504 | } | ||
505 | return err | ||
506 | } | ||
507 | |||
415 | // Serve accepts incoming connections on the listener lis, creating a new | 508 | // Serve accepts incoming connections on the listener lis, creating a new |
416 | // ServerTransport and service goroutine for each. The service goroutines | 509 | // ServerTransport and service goroutine for each. The service goroutines |
417 | // read gRPC requests and then call the registered handlers to reply to them. | 510 | // read gRPC requests and then call the registered handlers to reply to them. |
418 | // Serve returns when lis.Accept fails with fatal errors. lis will be closed when | 511 | // Serve returns when lis.Accept fails with fatal errors. lis will be closed when |
419 | // this method returns. | 512 | // this method returns. |
420 | // Serve always returns non-nil error. | 513 | // Serve will return a non-nil error unless Stop or GracefulStop is called. |
421 | func (s *Server) Serve(lis net.Listener) error { | 514 | func (s *Server) Serve(lis net.Listener) error { |
422 | s.mu.Lock() | 515 | s.mu.Lock() |
423 | s.printf("serving") | 516 | s.printf("serving") |
424 | s.serve = true | 517 | s.serve = true |
425 | if s.lis == nil { | 518 | if s.lis == nil { |
519 | // Serve called after Stop or GracefulStop. | ||
426 | s.mu.Unlock() | 520 | s.mu.Unlock() |
427 | lis.Close() | 521 | lis.Close() |
428 | return ErrServerStopped | 522 | return ErrServerStopped |
429 | } | 523 | } |
430 | s.lis[lis] = true | 524 | |
525 | s.serveWG.Add(1) | ||
526 | defer func() { | ||
527 | s.serveWG.Done() | ||
528 | select { | ||
529 | // Stop or GracefulStop called; block until done and return nil. | ||
530 | case <-s.quit: | ||
531 | <-s.done | ||
532 | default: | ||
533 | } | ||
534 | }() | ||
535 | |||
536 | ls := &listenSocket{Listener: lis} | ||
537 | s.lis[ls] = true | ||
538 | |||
539 | if channelz.IsOn() { | ||
540 | ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String()) | ||
541 | } | ||
431 | s.mu.Unlock() | 542 | s.mu.Unlock() |
543 | |||
432 | defer func() { | 544 | defer func() { |
433 | s.mu.Lock() | 545 | s.mu.Lock() |
434 | if s.lis != nil && s.lis[lis] { | 546 | if s.lis != nil && s.lis[ls] { |
435 | lis.Close() | 547 | ls.Close() |
436 | delete(s.lis, lis) | 548 | delete(s.lis, ls) |
437 | } | 549 | } |
438 | s.mu.Unlock() | 550 | s.mu.Unlock() |
439 | }() | 551 | }() |
@@ -460,36 +572,52 @@ func (s *Server) Serve(lis net.Listener) error { | |||
460 | timer := time.NewTimer(tempDelay) | 572 | timer := time.NewTimer(tempDelay) |
461 | select { | 573 | select { |
462 | case <-timer.C: | 574 | case <-timer.C: |
463 | case <-s.ctx.Done(): | 575 | case <-s.quit: |
576 | timer.Stop() | ||
577 | return nil | ||
464 | } | 578 | } |
465 | timer.Stop() | ||
466 | continue | 579 | continue |
467 | } | 580 | } |
468 | s.mu.Lock() | 581 | s.mu.Lock() |
469 | s.printf("done serving; Accept = %v", err) | 582 | s.printf("done serving; Accept = %v", err) |
470 | s.mu.Unlock() | 583 | s.mu.Unlock() |
584 | |||
585 | select { | ||
586 | case <-s.quit: | ||
587 | return nil | ||
588 | default: | ||
589 | } | ||
471 | return err | 590 | return err |
472 | } | 591 | } |
473 | tempDelay = 0 | 592 | tempDelay = 0 |
474 | // Start a new goroutine to deal with rawConn | 593 | // Start a new goroutine to deal with rawConn so we don't stall this Accept |
475 | // so we don't stall this Accept loop goroutine. | 594 | // loop goroutine. |
476 | go s.handleRawConn(rawConn) | 595 | // |
596 | // Make sure we account for the goroutine so GracefulStop doesn't nil out | ||
597 | // s.conns before this conn can be added. | ||
598 | s.serveWG.Add(1) | ||
599 | go func() { | ||
600 | s.handleRawConn(rawConn) | ||
601 | s.serveWG.Done() | ||
602 | }() | ||
477 | } | 603 | } |
478 | } | 604 | } |
479 | 605 | ||
480 | // handleRawConn is run in its own goroutine and handles a just-accepted | 606 | // handleRawConn forks a goroutine to handle a just-accepted connection that |
481 | // connection that has not had any I/O performed on it yet. | 607 | // has not had any I/O performed on it yet. |
482 | func (s *Server) handleRawConn(rawConn net.Conn) { | 608 | func (s *Server) handleRawConn(rawConn net.Conn) { |
609 | rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout)) | ||
483 | conn, authInfo, err := s.useTransportAuthenticator(rawConn) | 610 | conn, authInfo, err := s.useTransportAuthenticator(rawConn) |
484 | if err != nil { | 611 | if err != nil { |
485 | s.mu.Lock() | 612 | s.mu.Lock() |
486 | s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err) | 613 | s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err) |
487 | s.mu.Unlock() | 614 | s.mu.Unlock() |
488 | grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err) | 615 | grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err) |
489 | // If serverHandShake returns ErrConnDispatched, keep rawConn open. | 616 | // If serverHandshake returns ErrConnDispatched, keep rawConn open. |
490 | if err != credentials.ErrConnDispatched { | 617 | if err != credentials.ErrConnDispatched { |
491 | rawConn.Close() | 618 | rawConn.Close() |
492 | } | 619 | } |
620 | rawConn.SetDeadline(time.Time{}) | ||
493 | return | 621 | return |
494 | } | 622 | } |
495 | 623 | ||
@@ -501,19 +629,25 @@ func (s *Server) handleRawConn(rawConn net.Conn) { | |||
501 | } | 629 | } |
502 | s.mu.Unlock() | 630 | s.mu.Unlock() |
503 | 631 | ||
504 | if s.opts.useHandlerImpl { | 632 | // Finish handshaking (HTTP2) |
505 | s.serveUsingHandler(conn) | 633 | st := s.newHTTP2Transport(conn, authInfo) |
506 | } else { | 634 | if st == nil { |
507 | s.serveHTTP2Transport(conn, authInfo) | 635 | return |
636 | } | ||
637 | |||
638 | rawConn.SetDeadline(time.Time{}) | ||
639 | if !s.addConn(st) { | ||
640 | return | ||
508 | } | 641 | } |
642 | go func() { | ||
643 | s.serveStreams(st) | ||
644 | s.removeConn(st) | ||
645 | }() | ||
509 | } | 646 | } |
510 | 647 | ||
511 | // serveHTTP2Transport sets up a http/2 transport (using the | 648 | // newHTTP2Transport sets up a http/2 transport (using the |
512 | // gRPC http2 server transport in transport/http2_server.go) and | 649 | // gRPC http2 server transport in transport/http2_server.go). |
513 | // serves streams on it. | 650 | func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport { |
514 | // This is run in its own goroutine (it does network I/O in | ||
515 | // transport.NewServerTransport). | ||
516 | func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) { | ||
517 | config := &transport.ServerConfig{ | 651 | config := &transport.ServerConfig{ |
518 | MaxStreams: s.opts.maxConcurrentStreams, | 652 | MaxStreams: s.opts.maxConcurrentStreams, |
519 | AuthInfo: authInfo, | 653 | AuthInfo: authInfo, |
@@ -523,6 +657,10 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) | |||
523 | KeepalivePolicy: s.opts.keepalivePolicy, | 657 | KeepalivePolicy: s.opts.keepalivePolicy, |
524 | InitialWindowSize: s.opts.initialWindowSize, | 658 | InitialWindowSize: s.opts.initialWindowSize, |
525 | InitialConnWindowSize: s.opts.initialConnWindowSize, | 659 | InitialConnWindowSize: s.opts.initialConnWindowSize, |
660 | WriteBufferSize: s.opts.writeBufferSize, | ||
661 | ReadBufferSize: s.opts.readBufferSize, | ||
662 | ChannelzParentID: s.channelzID, | ||
663 | MaxHeaderListSize: s.opts.maxHeaderListSize, | ||
526 | } | 664 | } |
527 | st, err := transport.NewServerTransport("http2", c, config) | 665 | st, err := transport.NewServerTransport("http2", c, config) |
528 | if err != nil { | 666 | if err != nil { |
@@ -531,17 +669,13 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) | |||
531 | s.mu.Unlock() | 669 | s.mu.Unlock() |
532 | c.Close() | 670 | c.Close() |
533 | grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err) | 671 | grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err) |
534 | return | 672 | return nil |
535 | } | ||
536 | if !s.addConn(st) { | ||
537 | st.Close() | ||
538 | return | ||
539 | } | 673 | } |
540 | s.serveStreams(st) | 674 | |
675 | return st | ||
541 | } | 676 | } |
542 | 677 | ||
543 | func (s *Server) serveStreams(st transport.ServerTransport) { | 678 | func (s *Server) serveStreams(st transport.ServerTransport) { |
544 | defer s.removeConn(st) | ||
545 | defer st.Close() | 679 | defer st.Close() |
546 | var wg sync.WaitGroup | 680 | var wg sync.WaitGroup |
547 | st.HandleStreams(func(stream *transport.Stream) { | 681 | st.HandleStreams(func(stream *transport.Stream) { |
@@ -562,32 +696,6 @@ func (s *Server) serveStreams(st transport.ServerTransport) { | |||
562 | 696 | ||
563 | var _ http.Handler = (*Server)(nil) | 697 | var _ http.Handler = (*Server)(nil) |
564 | 698 | ||
565 | // serveUsingHandler is called from handleRawConn when s is configured | ||
566 | // to handle requests via the http.Handler interface. It sets up a | ||
567 | // net/http.Server to handle the just-accepted conn. The http.Server | ||
568 | // is configured to route all incoming requests (all HTTP/2 streams) | ||
569 | // to ServeHTTP, which creates a new ServerTransport for each stream. | ||
570 | // serveUsingHandler blocks until conn closes. | ||
571 | // | ||
572 | // This codepath is only used when Server.TestingUseHandlerImpl has | ||
573 | // been configured. This lets the end2end tests exercise the ServeHTTP | ||
574 | // method as one of the environment types. | ||
575 | // | ||
576 | // conn is the *tls.Conn that's already been authenticated. | ||
577 | func (s *Server) serveUsingHandler(conn net.Conn) { | ||
578 | if !s.addConn(conn) { | ||
579 | conn.Close() | ||
580 | return | ||
581 | } | ||
582 | defer s.removeConn(conn) | ||
583 | h2s := &http2.Server{ | ||
584 | MaxConcurrentStreams: s.opts.maxConcurrentStreams, | ||
585 | } | ||
586 | h2s.ServeConn(conn, &http2.ServeConnOpts{ | ||
587 | Handler: s, | ||
588 | }) | ||
589 | } | ||
590 | |||
591 | // ServeHTTP implements the Go standard library's http.Handler | 699 | // ServeHTTP implements the Go standard library's http.Handler |
592 | // interface by responding to the gRPC request r, by looking up | 700 | // interface by responding to the gRPC request r, by looking up |
593 | // the requested gRPC method in the gRPC server s. | 701 | // the requested gRPC method in the gRPC server s. |
@@ -613,13 +721,12 @@ func (s *Server) serveUsingHandler(conn net.Conn) { | |||
613 | // available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL | 721 | // available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL |
614 | // and subject to change. | 722 | // and subject to change. |
615 | func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { | 723 | func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
616 | st, err := transport.NewServerHandlerTransport(w, r) | 724 | st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler) |
617 | if err != nil { | 725 | if err != nil { |
618 | http.Error(w, err.Error(), http.StatusInternalServerError) | 726 | http.Error(w, err.Error(), http.StatusInternalServerError) |
619 | return | 727 | return |
620 | } | 728 | } |
621 | if !s.addConn(st) { | 729 | if !s.addConn(st) { |
622 | st.Close() | ||
623 | return | 730 | return |
624 | } | 731 | } |
625 | defer s.removeConn(st) | 732 | defer s.removeConn(st) |
@@ -649,9 +756,15 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea | |||
649 | func (s *Server) addConn(c io.Closer) bool { | 756 | func (s *Server) addConn(c io.Closer) bool { |
650 | s.mu.Lock() | 757 | s.mu.Lock() |
651 | defer s.mu.Unlock() | 758 | defer s.mu.Unlock() |
652 | if s.conns == nil || s.drain { | 759 | if s.conns == nil { |
760 | c.Close() | ||
653 | return false | 761 | return false |
654 | } | 762 | } |
763 | if s.drain { | ||
764 | // Transport added after we drained our existing conns: drain it | ||
765 | // immediately. | ||
766 | c.(transport.ServerTransport).Drain() | ||
767 | } | ||
655 | s.conns[c] = true | 768 | s.conns[c] = true |
656 | return true | 769 | return true |
657 | } | 770 | } |
@@ -665,43 +778,73 @@ func (s *Server) removeConn(c io.Closer) { | |||
665 | } | 778 | } |
666 | } | 779 | } |
667 | 780 | ||
668 | func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error { | 781 | func (s *Server) channelzMetric() *channelz.ServerInternalMetric { |
669 | var ( | 782 | return &channelz.ServerInternalMetric{ |
670 | cbuf *bytes.Buffer | 783 | CallsStarted: atomic.LoadInt64(&s.czData.callsStarted), |
671 | outPayload *stats.OutPayload | 784 | CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded), |
672 | ) | 785 | CallsFailed: atomic.LoadInt64(&s.czData.callsFailed), |
673 | if cp != nil { | 786 | LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)), |
674 | cbuf = new(bytes.Buffer) | ||
675 | } | 787 | } |
676 | if s.opts.statsHandler != nil { | 788 | } |
677 | outPayload = &stats.OutPayload{} | 789 | |
678 | } | 790 | func (s *Server) incrCallsStarted() { |
679 | p, err := encode(s.opts.codec, msg, cp, cbuf, outPayload) | 791 | atomic.AddInt64(&s.czData.callsStarted, 1) |
792 | atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano()) | ||
793 | } | ||
794 | |||
795 | func (s *Server) incrCallsSucceeded() { | ||
796 | atomic.AddInt64(&s.czData.callsSucceeded, 1) | ||
797 | } | ||
798 | |||
799 | func (s *Server) incrCallsFailed() { | ||
800 | atomic.AddInt64(&s.czData.callsFailed, 1) | ||
801 | } | ||
802 | |||
803 | func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { | ||
804 | data, err := encode(s.getCodec(stream.ContentSubtype()), msg) | ||
680 | if err != nil { | 805 | if err != nil { |
681 | grpclog.Errorln("grpc: server failed to encode response: ", err) | 806 | grpclog.Errorln("grpc: server failed to encode response: ", err) |
682 | return err | 807 | return err |
683 | } | 808 | } |
684 | if len(p) > s.opts.maxSendMessageSize { | 809 | compData, err := compress(data, cp, comp) |
685 | return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(p), s.opts.maxSendMessageSize) | 810 | if err != nil { |
811 | grpclog.Errorln("grpc: server failed to compress response: ", err) | ||
812 | return err | ||
813 | } | ||
814 | hdr, payload := msgHeader(data, compData) | ||
815 | // TODO(dfawley): should we be checking len(data) instead? | ||
816 | if len(payload) > s.opts.maxSendMessageSize { | ||
817 | return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize) | ||
686 | } | 818 | } |
687 | err = t.Write(stream, p, opts) | 819 | err = t.Write(stream, hdr, payload, opts) |
688 | if err == nil && outPayload != nil { | 820 | if err == nil && s.opts.statsHandler != nil { |
689 | outPayload.SentTime = time.Now() | 821 | s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now())) |
690 | s.opts.statsHandler.HandleRPC(stream.Context(), outPayload) | ||
691 | } | 822 | } |
692 | return err | 823 | return err |
693 | } | 824 | } |
694 | 825 | ||
695 | func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { | 826 | func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { |
827 | if channelz.IsOn() { | ||
828 | s.incrCallsStarted() | ||
829 | defer func() { | ||
830 | if err != nil && err != io.EOF { | ||
831 | s.incrCallsFailed() | ||
832 | } else { | ||
833 | s.incrCallsSucceeded() | ||
834 | } | ||
835 | }() | ||
836 | } | ||
696 | sh := s.opts.statsHandler | 837 | sh := s.opts.statsHandler |
697 | if sh != nil { | 838 | if sh != nil { |
839 | beginTime := time.Now() | ||
698 | begin := &stats.Begin{ | 840 | begin := &stats.Begin{ |
699 | BeginTime: time.Now(), | 841 | BeginTime: beginTime, |
700 | } | 842 | } |
701 | sh.HandleRPC(stream.Context(), begin) | 843 | sh.HandleRPC(stream.Context(), begin) |
702 | defer func() { | 844 | defer func() { |
703 | end := &stats.End{ | 845 | end := &stats.End{ |
704 | EndTime: time.Now(), | 846 | BeginTime: beginTime, |
847 | EndTime: time.Now(), | ||
705 | } | 848 | } |
706 | if err != nil && err != io.EOF { | 849 | if err != nil && err != io.EOF { |
707 | end.Error = toRPCErr(err) | 850 | end.Error = toRPCErr(err) |
@@ -720,94 +863,112 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. | |||
720 | } | 863 | } |
721 | }() | 864 | }() |
722 | } | 865 | } |
723 | if s.opts.cp != nil { | 866 | |
724 | // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. | 867 | binlog := binarylog.GetMethodLogger(stream.Method()) |
725 | stream.SetSendCompress(s.opts.cp.Type()) | 868 | if binlog != nil { |
869 | ctx := stream.Context() | ||
870 | md, _ := metadata.FromIncomingContext(ctx) | ||
871 | logEntry := &binarylog.ClientHeader{ | ||
872 | Header: md, | ||
873 | MethodName: stream.Method(), | ||
874 | PeerAddr: nil, | ||
875 | } | ||
876 | if deadline, ok := ctx.Deadline(); ok { | ||
877 | logEntry.Timeout = deadline.Sub(time.Now()) | ||
878 | if logEntry.Timeout < 0 { | ||
879 | logEntry.Timeout = 0 | ||
880 | } | ||
881 | } | ||
882 | if a := md[":authority"]; len(a) > 0 { | ||
883 | logEntry.Authority = a[0] | ||
884 | } | ||
885 | if peer, ok := peer.FromContext(ctx); ok { | ||
886 | logEntry.PeerAddr = peer.Addr | ||
887 | } | ||
888 | binlog.Log(logEntry) | ||
889 | } | ||
890 | |||
891 | // comp and cp are used for compression. decomp and dc are used for | ||
892 | // decompression. If comp and decomp are both set, they are the same; | ||
893 | // however they are kept separate to ensure that at most one of the | ||
894 | // compressor/decompressor variable pairs are set for use later. | ||
895 | var comp, decomp encoding.Compressor | ||
896 | var cp Compressor | ||
897 | var dc Decompressor | ||
898 | |||
899 | // If dc is set and matches the stream's compression, use it. Otherwise, try | ||
900 | // to find a matching registered compressor for decomp. | ||
901 | if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { | ||
902 | dc = s.opts.dc | ||
903 | } else if rc != "" && rc != encoding.Identity { | ||
904 | decomp = encoding.GetCompressor(rc) | ||
905 | if decomp == nil { | ||
906 | st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) | ||
907 | t.WriteStatus(stream, st) | ||
908 | return st.Err() | ||
909 | } | ||
726 | } | 910 | } |
727 | p := &parser{r: stream} | 911 | |
728 | pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize) | 912 | // If cp is set, use it. Otherwise, attempt to compress the response using |
729 | if err == io.EOF { | 913 | // the incoming message compression method. |
730 | // The entire stream is done (for unary RPC only). | 914 | // |
731 | return err | 915 | // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. |
916 | if s.opts.cp != nil { | ||
917 | cp = s.opts.cp | ||
918 | stream.SetSendCompress(cp.Type()) | ||
919 | } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { | ||
920 | // Legacy compressor not specified; attempt to respond with same encoding. | ||
921 | comp = encoding.GetCompressor(rc) | ||
922 | if comp != nil { | ||
923 | stream.SetSendCompress(rc) | ||
924 | } | ||
732 | } | 925 | } |
733 | if err == io.ErrUnexpectedEOF { | 926 | |
734 | err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) | 927 | var payInfo *payloadInfo |
928 | if sh != nil || binlog != nil { | ||
929 | payInfo = &payloadInfo{} | ||
735 | } | 930 | } |
931 | d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp) | ||
736 | if err != nil { | 932 | if err != nil { |
737 | if st, ok := status.FromError(err); ok { | 933 | if st, ok := status.FromError(err); ok { |
738 | if e := t.WriteStatus(stream, st); e != nil { | 934 | if e := t.WriteStatus(stream, st); e != nil { |
739 | grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) | 935 | grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) |
740 | } | 936 | } |
741 | } else { | ||
742 | switch st := err.(type) { | ||
743 | case transport.ConnectionError: | ||
744 | // Nothing to do here. | ||
745 | case transport.StreamError: | ||
746 | if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil { | ||
747 | grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) | ||
748 | } | ||
749 | default: | ||
750 | panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st)) | ||
751 | } | ||
752 | } | 937 | } |
753 | return err | 938 | return err |
754 | } | 939 | } |
755 | 940 | if channelz.IsOn() { | |
756 | if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil { | 941 | t.IncrMsgRecv() |
757 | if st, ok := status.FromError(err); ok { | ||
758 | if e := t.WriteStatus(stream, st); e != nil { | ||
759 | grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) | ||
760 | } | ||
761 | return err | ||
762 | } | ||
763 | if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil { | ||
764 | grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) | ||
765 | } | ||
766 | |||
767 | // TODO checkRecvPayload always return RPC error. Add a return here if necessary. | ||
768 | } | ||
769 | var inPayload *stats.InPayload | ||
770 | if sh != nil { | ||
771 | inPayload = &stats.InPayload{ | ||
772 | RecvTime: time.Now(), | ||
773 | } | ||
774 | } | 942 | } |
775 | df := func(v interface{}) error { | 943 | df := func(v interface{}) error { |
776 | if inPayload != nil { | 944 | if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil { |
777 | inPayload.WireLength = len(req) | ||
778 | } | ||
779 | if pf == compressionMade { | ||
780 | var err error | ||
781 | req, err = s.opts.dc.Do(bytes.NewReader(req)) | ||
782 | if err != nil { | ||
783 | return Errorf(codes.Internal, err.Error()) | ||
784 | } | ||
785 | } | ||
786 | if len(req) > s.opts.maxReceiveMessageSize { | ||
787 | // TODO: Revisit the error code. Currently keep it consistent with | ||
788 | // java implementation. | ||
789 | return status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(req), s.opts.maxReceiveMessageSize) | ||
790 | } | ||
791 | if err := s.opts.codec.Unmarshal(req, v); err != nil { | ||
792 | return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) | 945 | return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err) |
793 | } | 946 | } |
794 | if inPayload != nil { | 947 | if sh != nil { |
795 | inPayload.Payload = v | 948 | sh.HandleRPC(stream.Context(), &stats.InPayload{ |
796 | inPayload.Data = req | 949 | RecvTime: time.Now(), |
797 | inPayload.Length = len(req) | 950 | Payload: v, |
798 | sh.HandleRPC(stream.Context(), inPayload) | 951 | Data: d, |
952 | Length: len(d), | ||
953 | }) | ||
954 | } | ||
955 | if binlog != nil { | ||
956 | binlog.Log(&binarylog.ClientMessage{ | ||
957 | Message: d, | ||
958 | }) | ||
799 | } | 959 | } |
800 | if trInfo != nil { | 960 | if trInfo != nil { |
801 | trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) | 961 | trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) |
802 | } | 962 | } |
803 | return nil | 963 | return nil |
804 | } | 964 | } |
805 | reply, appErr := md.Handler(srv.server, stream.Context(), df, s.opts.unaryInt) | 965 | ctx := NewContextWithServerTransportStream(stream.Context(), stream) |
966 | reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt) | ||
806 | if appErr != nil { | 967 | if appErr != nil { |
807 | appStatus, ok := status.FromError(appErr) | 968 | appStatus, ok := status.FromError(appErr) |
808 | if !ok { | 969 | if !ok { |
809 | // Convert appErr if it is not a grpc status error. | 970 | // Convert appErr if it is not a grpc status error. |
810 | appErr = status.Error(convertCode(appErr), appErr.Error()) | 971 | appErr = status.Error(codes.Unknown, appErr.Error()) |
811 | appStatus, _ = status.FromError(appErr) | 972 | appStatus, _ = status.FromError(appErr) |
812 | } | 973 | } |
813 | if trInfo != nil { | 974 | if trInfo != nil { |
@@ -817,16 +978,27 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. | |||
817 | if e := t.WriteStatus(stream, appStatus); e != nil { | 978 | if e := t.WriteStatus(stream, appStatus); e != nil { |
818 | grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e) | 979 | grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e) |
819 | } | 980 | } |
981 | if binlog != nil { | ||
982 | if h, _ := stream.Header(); h.Len() > 0 { | ||
983 | // Only log serverHeader if there was header. Otherwise it can | ||
984 | // be trailer only. | ||
985 | binlog.Log(&binarylog.ServerHeader{ | ||
986 | Header: h, | ||
987 | }) | ||
988 | } | ||
989 | binlog.Log(&binarylog.ServerTrailer{ | ||
990 | Trailer: stream.Trailer(), | ||
991 | Err: appErr, | ||
992 | }) | ||
993 | } | ||
820 | return appErr | 994 | return appErr |
821 | } | 995 | } |
822 | if trInfo != nil { | 996 | if trInfo != nil { |
823 | trInfo.tr.LazyLog(stringer("OK"), false) | 997 | trInfo.tr.LazyLog(stringer("OK"), false) |
824 | } | 998 | } |
825 | opts := &transport.Options{ | 999 | opts := &transport.Options{Last: true} |
826 | Last: true, | 1000 | |
827 | Delay: false, | 1001 | if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil { |
828 | } | ||
829 | if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil { | ||
830 | if err == io.EOF { | 1002 | if err == io.EOF { |
831 | // The entire stream is done (for unary RPC only). | 1003 | // The entire stream is done (for unary RPC only). |
832 | return err | 1004 | return err |
@@ -839,35 +1011,72 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. | |||
839 | switch st := err.(type) { | 1011 | switch st := err.(type) { |
840 | case transport.ConnectionError: | 1012 | case transport.ConnectionError: |
841 | // Nothing to do here. | 1013 | // Nothing to do here. |
842 | case transport.StreamError: | ||
843 | if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil { | ||
844 | grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) | ||
845 | } | ||
846 | default: | 1014 | default: |
847 | panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st)) | 1015 | panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st)) |
848 | } | 1016 | } |
849 | } | 1017 | } |
1018 | if binlog != nil { | ||
1019 | h, _ := stream.Header() | ||
1020 | binlog.Log(&binarylog.ServerHeader{ | ||
1021 | Header: h, | ||
1022 | }) | ||
1023 | binlog.Log(&binarylog.ServerTrailer{ | ||
1024 | Trailer: stream.Trailer(), | ||
1025 | Err: appErr, | ||
1026 | }) | ||
1027 | } | ||
850 | return err | 1028 | return err |
851 | } | 1029 | } |
1030 | if binlog != nil { | ||
1031 | h, _ := stream.Header() | ||
1032 | binlog.Log(&binarylog.ServerHeader{ | ||
1033 | Header: h, | ||
1034 | }) | ||
1035 | binlog.Log(&binarylog.ServerMessage{ | ||
1036 | Message: reply, | ||
1037 | }) | ||
1038 | } | ||
1039 | if channelz.IsOn() { | ||
1040 | t.IncrMsgSent() | ||
1041 | } | ||
852 | if trInfo != nil { | 1042 | if trInfo != nil { |
853 | trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true) | 1043 | trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true) |
854 | } | 1044 | } |
855 | // TODO: Should we be logging if writing status failed here, like above? | 1045 | // TODO: Should we be logging if writing status failed here, like above? |
856 | // Should the logging be in WriteStatus? Should we ignore the WriteStatus | 1046 | // Should the logging be in WriteStatus? Should we ignore the WriteStatus |
857 | // error or allow the stats handler to see it? | 1047 | // error or allow the stats handler to see it? |
858 | return t.WriteStatus(stream, status.New(codes.OK, "")) | 1048 | err = t.WriteStatus(stream, status.New(codes.OK, "")) |
1049 | if binlog != nil { | ||
1050 | binlog.Log(&binarylog.ServerTrailer{ | ||
1051 | Trailer: stream.Trailer(), | ||
1052 | Err: appErr, | ||
1053 | }) | ||
1054 | } | ||
1055 | return err | ||
859 | } | 1056 | } |
860 | 1057 | ||
861 | func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) { | 1058 | func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) { |
1059 | if channelz.IsOn() { | ||
1060 | s.incrCallsStarted() | ||
1061 | defer func() { | ||
1062 | if err != nil && err != io.EOF { | ||
1063 | s.incrCallsFailed() | ||
1064 | } else { | ||
1065 | s.incrCallsSucceeded() | ||
1066 | } | ||
1067 | }() | ||
1068 | } | ||
862 | sh := s.opts.statsHandler | 1069 | sh := s.opts.statsHandler |
863 | if sh != nil { | 1070 | if sh != nil { |
1071 | beginTime := time.Now() | ||
864 | begin := &stats.Begin{ | 1072 | begin := &stats.Begin{ |
865 | BeginTime: time.Now(), | 1073 | BeginTime: beginTime, |
866 | } | 1074 | } |
867 | sh.HandleRPC(stream.Context(), begin) | 1075 | sh.HandleRPC(stream.Context(), begin) |
868 | defer func() { | 1076 | defer func() { |
869 | end := &stats.End{ | 1077 | end := &stats.End{ |
870 | EndTime: time.Now(), | 1078 | BeginTime: beginTime, |
1079 | EndTime: time.Now(), | ||
871 | } | 1080 | } |
872 | if err != nil && err != io.EOF { | 1081 | if err != nil && err != io.EOF { |
873 | end.Error = toRPCErr(err) | 1082 | end.Error = toRPCErr(err) |
@@ -875,24 +1084,70 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp | |||
875 | sh.HandleRPC(stream.Context(), end) | 1084 | sh.HandleRPC(stream.Context(), end) |
876 | }() | 1085 | }() |
877 | } | 1086 | } |
878 | if s.opts.cp != nil { | 1087 | ctx := NewContextWithServerTransportStream(stream.Context(), stream) |
879 | stream.SetSendCompress(s.opts.cp.Type()) | ||
880 | } | ||
881 | ss := &serverStream{ | 1088 | ss := &serverStream{ |
882 | t: t, | 1089 | ctx: ctx, |
883 | s: stream, | 1090 | t: t, |
884 | p: &parser{r: stream}, | 1091 | s: stream, |
885 | codec: s.opts.codec, | 1092 | p: &parser{r: stream}, |
886 | cp: s.opts.cp, | 1093 | codec: s.getCodec(stream.ContentSubtype()), |
887 | dc: s.opts.dc, | ||
888 | maxReceiveMessageSize: s.opts.maxReceiveMessageSize, | 1094 | maxReceiveMessageSize: s.opts.maxReceiveMessageSize, |
889 | maxSendMessageSize: s.opts.maxSendMessageSize, | 1095 | maxSendMessageSize: s.opts.maxSendMessageSize, |
890 | trInfo: trInfo, | 1096 | trInfo: trInfo, |
891 | statsHandler: sh, | 1097 | statsHandler: sh, |
892 | } | 1098 | } |
893 | if ss.cp != nil { | 1099 | |
894 | ss.cbuf = new(bytes.Buffer) | 1100 | ss.binlog = binarylog.GetMethodLogger(stream.Method()) |
1101 | if ss.binlog != nil { | ||
1102 | md, _ := metadata.FromIncomingContext(ctx) | ||
1103 | logEntry := &binarylog.ClientHeader{ | ||
1104 | Header: md, | ||
1105 | MethodName: stream.Method(), | ||
1106 | PeerAddr: nil, | ||
1107 | } | ||
1108 | if deadline, ok := ctx.Deadline(); ok { | ||
1109 | logEntry.Timeout = deadline.Sub(time.Now()) | ||
1110 | if logEntry.Timeout < 0 { | ||
1111 | logEntry.Timeout = 0 | ||
1112 | } | ||
1113 | } | ||
1114 | if a := md[":authority"]; len(a) > 0 { | ||
1115 | logEntry.Authority = a[0] | ||
1116 | } | ||
1117 | if peer, ok := peer.FromContext(ss.Context()); ok { | ||
1118 | logEntry.PeerAddr = peer.Addr | ||
1119 | } | ||
1120 | ss.binlog.Log(logEntry) | ||
1121 | } | ||
1122 | |||
1123 | // If dc is set and matches the stream's compression, use it. Otherwise, try | ||
1124 | // to find a matching registered compressor for decomp. | ||
1125 | if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { | ||
1126 | ss.dc = s.opts.dc | ||
1127 | } else if rc != "" && rc != encoding.Identity { | ||
1128 | ss.decomp = encoding.GetCompressor(rc) | ||
1129 | if ss.decomp == nil { | ||
1130 | st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) | ||
1131 | t.WriteStatus(ss.s, st) | ||
1132 | return st.Err() | ||
1133 | } | ||
1134 | } | ||
1135 | |||
1136 | // If cp is set, use it. Otherwise, attempt to compress the response using | ||
1137 | // the incoming message compression method. | ||
1138 | // | ||
1139 | // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. | ||
1140 | if s.opts.cp != nil { | ||
1141 | ss.cp = s.opts.cp | ||
1142 | stream.SetSendCompress(s.opts.cp.Type()) | ||
1143 | } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { | ||
1144 | // Legacy compressor not specified; attempt to respond with same encoding. | ||
1145 | ss.comp = encoding.GetCompressor(rc) | ||
1146 | if ss.comp != nil { | ||
1147 | stream.SetSendCompress(rc) | ||
1148 | } | ||
895 | } | 1149 | } |
1150 | |||
896 | if trInfo != nil { | 1151 | if trInfo != nil { |
897 | trInfo.tr.LazyLog(&trInfo.firstLine, false) | 1152 | trInfo.tr.LazyLog(&trInfo.firstLine, false) |
898 | defer func() { | 1153 | defer func() { |
@@ -924,12 +1179,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp | |||
924 | if appErr != nil { | 1179 | if appErr != nil { |
925 | appStatus, ok := status.FromError(appErr) | 1180 | appStatus, ok := status.FromError(appErr) |
926 | if !ok { | 1181 | if !ok { |
927 | switch err := appErr.(type) { | 1182 | appStatus = status.New(codes.Unknown, appErr.Error()) |
928 | case transport.StreamError: | ||
929 | appStatus = status.New(err.Code, err.Desc) | ||
930 | default: | ||
931 | appStatus = status.New(convertCode(appErr), appErr.Error()) | ||
932 | } | ||
933 | appErr = appStatus.Err() | 1183 | appErr = appStatus.Err() |
934 | } | 1184 | } |
935 | if trInfo != nil { | 1185 | if trInfo != nil { |
@@ -939,6 +1189,12 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp | |||
939 | ss.mu.Unlock() | 1189 | ss.mu.Unlock() |
940 | } | 1190 | } |
941 | t.WriteStatus(ss.s, appStatus) | 1191 | t.WriteStatus(ss.s, appStatus) |
1192 | if ss.binlog != nil { | ||
1193 | ss.binlog.Log(&binarylog.ServerTrailer{ | ||
1194 | Trailer: ss.s.Trailer(), | ||
1195 | Err: appErr, | ||
1196 | }) | ||
1197 | } | ||
942 | // TODO: Should we log an error from WriteStatus here and below? | 1198 | // TODO: Should we log an error from WriteStatus here and below? |
943 | return appErr | 1199 | return appErr |
944 | } | 1200 | } |
@@ -947,8 +1203,14 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp | |||
947 | ss.trInfo.tr.LazyLog(stringer("OK"), false) | 1203 | ss.trInfo.tr.LazyLog(stringer("OK"), false) |
948 | ss.mu.Unlock() | 1204 | ss.mu.Unlock() |
949 | } | 1205 | } |
950 | return t.WriteStatus(ss.s, status.New(codes.OK, "")) | 1206 | err = t.WriteStatus(ss.s, status.New(codes.OK, "")) |
951 | 1207 | if ss.binlog != nil { | |
1208 | ss.binlog.Log(&binarylog.ServerTrailer{ | ||
1209 | Trailer: ss.s.Trailer(), | ||
1210 | Err: appErr, | ||
1211 | }) | ||
1212 | } | ||
1213 | return err | ||
952 | } | 1214 | } |
953 | 1215 | ||
954 | func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { | 1216 | func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { |
@@ -977,47 +1239,27 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str | |||
977 | } | 1239 | } |
978 | service := sm[:pos] | 1240 | service := sm[:pos] |
979 | method := sm[pos+1:] | 1241 | method := sm[pos+1:] |
980 | srv, ok := s.m[service] | 1242 | |
981 | if !ok { | 1243 | if srv, ok := s.m[service]; ok { |
982 | if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { | 1244 | if md, ok := srv.md[method]; ok { |
983 | s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo) | 1245 | s.processUnaryRPC(t, stream, srv, md, trInfo) |
984 | return | 1246 | return |
985 | } | 1247 | } |
986 | if trInfo != nil { | 1248 | if sd, ok := srv.sd[method]; ok { |
987 | trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true) | 1249 | s.processStreamingRPC(t, stream, srv, sd, trInfo) |
988 | trInfo.tr.SetError() | 1250 | return |
989 | } | ||
990 | errDesc := fmt.Sprintf("unknown service %v", service) | ||
991 | if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { | ||
992 | if trInfo != nil { | ||
993 | trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) | ||
994 | trInfo.tr.SetError() | ||
995 | } | ||
996 | grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err) | ||
997 | } | ||
998 | if trInfo != nil { | ||
999 | trInfo.tr.Finish() | ||
1000 | } | 1251 | } |
1001 | return | ||
1002 | } | ||
1003 | // Unary RPC or Streaming RPC? | ||
1004 | if md, ok := srv.md[method]; ok { | ||
1005 | s.processUnaryRPC(t, stream, srv, md, trInfo) | ||
1006 | return | ||
1007 | } | 1252 | } |
1008 | if sd, ok := srv.sd[method]; ok { | 1253 | // Unknown service, or known server unknown method. |
1009 | s.processStreamingRPC(t, stream, srv, sd, trInfo) | 1254 | if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { |
1255 | s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo) | ||
1010 | return | 1256 | return |
1011 | } | 1257 | } |
1012 | if trInfo != nil { | 1258 | if trInfo != nil { |
1013 | trInfo.tr.LazyLog(&fmtStringer{"Unknown method %v", []interface{}{method}}, true) | 1259 | trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true) |
1014 | trInfo.tr.SetError() | 1260 | trInfo.tr.SetError() |
1015 | } | 1261 | } |
1016 | if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { | 1262 | errDesc := fmt.Sprintf("unknown service %v", service) |
1017 | s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo) | ||
1018 | return | ||
1019 | } | ||
1020 | errDesc := fmt.Sprintf("unknown method %v", method) | ||
1021 | if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { | 1263 | if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { |
1022 | if trInfo != nil { | 1264 | if trInfo != nil { |
1023 | trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) | 1265 | trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) |
@@ -1030,12 +1272,65 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str | |||
1030 | } | 1272 | } |
1031 | } | 1273 | } |
1032 | 1274 | ||
1275 | // The key to save ServerTransportStream in the context. | ||
1276 | type streamKey struct{} | ||
1277 | |||
1278 | // NewContextWithServerTransportStream creates a new context from ctx and | ||
1279 | // attaches stream to it. | ||
1280 | // | ||
1281 | // This API is EXPERIMENTAL. | ||
1282 | func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context { | ||
1283 | return context.WithValue(ctx, streamKey{}, stream) | ||
1284 | } | ||
1285 | |||
1286 | // ServerTransportStream is a minimal interface that a transport stream must | ||
1287 | // implement. This can be used to mock an actual transport stream for tests of | ||
1288 | // handler code that use, for example, grpc.SetHeader (which requires some | ||
1289 | // stream to be in context). | ||
1290 | // | ||
1291 | // See also NewContextWithServerTransportStream. | ||
1292 | // | ||
1293 | // This API is EXPERIMENTAL. | ||
1294 | type ServerTransportStream interface { | ||
1295 | Method() string | ||
1296 | SetHeader(md metadata.MD) error | ||
1297 | SendHeader(md metadata.MD) error | ||
1298 | SetTrailer(md metadata.MD) error | ||
1299 | } | ||
1300 | |||
1301 | // ServerTransportStreamFromContext returns the ServerTransportStream saved in | ||
1302 | // ctx. Returns nil if the given context has no stream associated with it | ||
1303 | // (which implies it is not an RPC invocation context). | ||
1304 | // | ||
1305 | // This API is EXPERIMENTAL. | ||
1306 | func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream { | ||
1307 | s, _ := ctx.Value(streamKey{}).(ServerTransportStream) | ||
1308 | return s | ||
1309 | } | ||
1310 | |||
1033 | // Stop stops the gRPC server. It immediately closes all open | 1311 | // Stop stops the gRPC server. It immediately closes all open |
1034 | // connections and listeners. | 1312 | // connections and listeners. |
1035 | // It cancels all active RPCs on the server side and the corresponding | 1313 | // It cancels all active RPCs on the server side and the corresponding |
1036 | // pending RPCs on the client side will get notified by connection | 1314 | // pending RPCs on the client side will get notified by connection |
1037 | // errors. | 1315 | // errors. |
1038 | func (s *Server) Stop() { | 1316 | func (s *Server) Stop() { |
1317 | s.quitOnce.Do(func() { | ||
1318 | close(s.quit) | ||
1319 | }) | ||
1320 | |||
1321 | defer func() { | ||
1322 | s.serveWG.Wait() | ||
1323 | s.doneOnce.Do(func() { | ||
1324 | close(s.done) | ||
1325 | }) | ||
1326 | }() | ||
1327 | |||
1328 | s.channelzRemoveOnce.Do(func() { | ||
1329 | if channelz.IsOn() { | ||
1330 | channelz.RemoveEntry(s.channelzID) | ||
1331 | } | ||
1332 | }) | ||
1333 | |||
1039 | s.mu.Lock() | 1334 | s.mu.Lock() |
1040 | listeners := s.lis | 1335 | listeners := s.lis |
1041 | s.lis = nil | 1336 | s.lis = nil |
@@ -1053,7 +1348,6 @@ func (s *Server) Stop() { | |||
1053 | } | 1348 | } |
1054 | 1349 | ||
1055 | s.mu.Lock() | 1350 | s.mu.Lock() |
1056 | s.cancel() | ||
1057 | if s.events != nil { | 1351 | if s.events != nil { |
1058 | s.events.Finish() | 1352 | s.events.Finish() |
1059 | s.events = nil | 1353 | s.events = nil |
@@ -1065,22 +1359,44 @@ func (s *Server) Stop() { | |||
1065 | // accepting new connections and RPCs and blocks until all the pending RPCs are | 1359 | // accepting new connections and RPCs and blocks until all the pending RPCs are |
1066 | // finished. | 1360 | // finished. |
1067 | func (s *Server) GracefulStop() { | 1361 | func (s *Server) GracefulStop() { |
1362 | s.quitOnce.Do(func() { | ||
1363 | close(s.quit) | ||
1364 | }) | ||
1365 | |||
1366 | defer func() { | ||
1367 | s.doneOnce.Do(func() { | ||
1368 | close(s.done) | ||
1369 | }) | ||
1370 | }() | ||
1371 | |||
1372 | s.channelzRemoveOnce.Do(func() { | ||
1373 | if channelz.IsOn() { | ||
1374 | channelz.RemoveEntry(s.channelzID) | ||
1375 | } | ||
1376 | }) | ||
1068 | s.mu.Lock() | 1377 | s.mu.Lock() |
1069 | defer s.mu.Unlock() | ||
1070 | if s.conns == nil { | 1378 | if s.conns == nil { |
1379 | s.mu.Unlock() | ||
1071 | return | 1380 | return |
1072 | } | 1381 | } |
1382 | |||
1073 | for lis := range s.lis { | 1383 | for lis := range s.lis { |
1074 | lis.Close() | 1384 | lis.Close() |
1075 | } | 1385 | } |
1076 | s.lis = nil | 1386 | s.lis = nil |
1077 | s.cancel() | ||
1078 | if !s.drain { | 1387 | if !s.drain { |
1079 | for c := range s.conns { | 1388 | for c := range s.conns { |
1080 | c.(transport.ServerTransport).Drain() | 1389 | c.(transport.ServerTransport).Drain() |
1081 | } | 1390 | } |
1082 | s.drain = true | 1391 | s.drain = true |
1083 | } | 1392 | } |
1393 | |||
1394 | // Wait for serving threads to be ready to exit. Only then can we be sure no | ||
1395 | // new conns will be created. | ||
1396 | s.mu.Unlock() | ||
1397 | s.serveWG.Wait() | ||
1398 | s.mu.Lock() | ||
1399 | |||
1084 | for len(s.conns) != 0 { | 1400 | for len(s.conns) != 0 { |
1085 | s.cv.Wait() | 1401 | s.cv.Wait() |
1086 | } | 1402 | } |
@@ -1089,26 +1405,23 @@ func (s *Server) GracefulStop() { | |||
1089 | s.events.Finish() | 1405 | s.events.Finish() |
1090 | s.events = nil | 1406 | s.events = nil |
1091 | } | 1407 | } |
1408 | s.mu.Unlock() | ||
1092 | } | 1409 | } |
1093 | 1410 | ||
1094 | func init() { | 1411 | // contentSubtype must be lowercase |
1095 | internal.TestingCloseConns = func(arg interface{}) { | 1412 | // cannot return nil |
1096 | arg.(*Server).testingCloseConns() | 1413 | func (s *Server) getCodec(contentSubtype string) baseCodec { |
1414 | if s.opts.codec != nil { | ||
1415 | return s.opts.codec | ||
1097 | } | 1416 | } |
1098 | internal.TestingUseHandlerImpl = func(arg interface{}) { | 1417 | if contentSubtype == "" { |
1099 | arg.(*Server).opts.useHandlerImpl = true | 1418 | return encoding.GetCodec(proto.Name) |
1100 | } | 1419 | } |
1101 | } | 1420 | codec := encoding.GetCodec(contentSubtype) |
1102 | 1421 | if codec == nil { | |
1103 | // testingCloseConns closes all existing transports but keeps s.lis | 1422 | return encoding.GetCodec(proto.Name) |
1104 | // accepting new connections. | ||
1105 | func (s *Server) testingCloseConns() { | ||
1106 | s.mu.Lock() | ||
1107 | for c := range s.conns { | ||
1108 | c.Close() | ||
1109 | delete(s.conns, c) | ||
1110 | } | 1423 | } |
1111 | s.mu.Unlock() | 1424 | return codec |
1112 | } | 1425 | } |
1113 | 1426 | ||
1114 | // SetHeader sets the header metadata. | 1427 | // SetHeader sets the header metadata. |
@@ -1121,9 +1434,9 @@ func SetHeader(ctx context.Context, md metadata.MD) error { | |||
1121 | if md.Len() == 0 { | 1434 | if md.Len() == 0 { |
1122 | return nil | 1435 | return nil |
1123 | } | 1436 | } |
1124 | stream, ok := transport.StreamFromContext(ctx) | 1437 | stream := ServerTransportStreamFromContext(ctx) |
1125 | if !ok { | 1438 | if stream == nil { |
1126 | return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) | 1439 | return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) |
1127 | } | 1440 | } |
1128 | return stream.SetHeader(md) | 1441 | return stream.SetHeader(md) |
1129 | } | 1442 | } |
@@ -1131,15 +1444,11 @@ func SetHeader(ctx context.Context, md metadata.MD) error { | |||
1131 | // SendHeader sends header metadata. It may be called at most once. | 1444 | // SendHeader sends header metadata. It may be called at most once. |
1132 | // The provided md and headers set by SetHeader() will be sent. | 1445 | // The provided md and headers set by SetHeader() will be sent. |
1133 | func SendHeader(ctx context.Context, md metadata.MD) error { | 1446 | func SendHeader(ctx context.Context, md metadata.MD) error { |
1134 | stream, ok := transport.StreamFromContext(ctx) | 1447 | stream := ServerTransportStreamFromContext(ctx) |
1135 | if !ok { | 1448 | if stream == nil { |
1136 | return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) | 1449 | return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) |
1137 | } | 1450 | } |
1138 | t := stream.ServerTransport() | 1451 | if err := stream.SendHeader(md); err != nil { |
1139 | if t == nil { | ||
1140 | grpclog.Fatalf("grpc: SendHeader: %v has no ServerTransport to send header metadata.", stream) | ||
1141 | } | ||
1142 | if err := t.WriteHeader(stream, md); err != nil { | ||
1143 | return toRPCErr(err) | 1452 | return toRPCErr(err) |
1144 | } | 1453 | } |
1145 | return nil | 1454 | return nil |
@@ -1151,9 +1460,27 @@ func SetTrailer(ctx context.Context, md metadata.MD) error { | |||
1151 | if md.Len() == 0 { | 1460 | if md.Len() == 0 { |
1152 | return nil | 1461 | return nil |
1153 | } | 1462 | } |
1154 | stream, ok := transport.StreamFromContext(ctx) | 1463 | stream := ServerTransportStreamFromContext(ctx) |
1155 | if !ok { | 1464 | if stream == nil { |
1156 | return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) | 1465 | return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) |
1157 | } | 1466 | } |
1158 | return stream.SetTrailer(md) | 1467 | return stream.SetTrailer(md) |
1159 | } | 1468 | } |
1469 | |||
1470 | // Method returns the method string for the server context. The returned | ||
1471 | // string is in the format of "/service/method". | ||
1472 | func Method(ctx context.Context) (string, bool) { | ||
1473 | s := ServerTransportStreamFromContext(ctx) | ||
1474 | if s == nil { | ||
1475 | return "", false | ||
1476 | } | ||
1477 | return s.Method(), true | ||
1478 | } | ||
1479 | |||
1480 | type channelzServer struct { | ||
1481 | s *Server | ||
1482 | } | ||
1483 | |||
1484 | func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric { | ||
1485 | return c.s.channelzMetric() | ||
1486 | } | ||