]> git.immae.eu Git - github/fretlink/terraform-provider-statuscake.git/blame - vendor/google.golang.org/grpc/server.go
Upgrade to 0.12
[github/fretlink/terraform-provider-statuscake.git] / vendor / google.golang.org / grpc / server.go
CommitLineData
15c0b25d
AP
1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package grpc
20
21import (
107c1cdb 22 "context"
15c0b25d
AP
23 "errors"
24 "fmt"
25 "io"
107c1cdb 26 "math"
15c0b25d
AP
27 "net"
28 "net/http"
29 "reflect"
30 "runtime"
31 "strings"
32 "sync"
107c1cdb 33 "sync/atomic"
15c0b25d
AP
34 "time"
35
15c0b25d 36 "golang.org/x/net/trace"
107c1cdb 37
15c0b25d
AP
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/credentials"
107c1cdb
ND
40 "google.golang.org/grpc/encoding"
41 "google.golang.org/grpc/encoding/proto"
15c0b25d 42 "google.golang.org/grpc/grpclog"
107c1cdb
ND
43 "google.golang.org/grpc/internal/binarylog"
44 "google.golang.org/grpc/internal/channelz"
45 "google.golang.org/grpc/internal/transport"
15c0b25d
AP
46 "google.golang.org/grpc/keepalive"
47 "google.golang.org/grpc/metadata"
107c1cdb 48 "google.golang.org/grpc/peer"
15c0b25d
AP
49 "google.golang.org/grpc/stats"
50 "google.golang.org/grpc/status"
51 "google.golang.org/grpc/tap"
15c0b25d
AP
52)
53
54const (
55 defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
107c1cdb 56 defaultServerMaxSendMessageSize = math.MaxInt32
15c0b25d
AP
57)
58
59type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
60
61// MethodDesc represents an RPC service's method specification.
62type MethodDesc struct {
63 MethodName string
64 Handler methodHandler
65}
66
67// ServiceDesc represents an RPC service's specification.
68type ServiceDesc struct {
69 ServiceName string
70 // The pointer to the service interface. Used to check whether the user
71 // provided implementation satisfies the interface requirements.
72 HandlerType interface{}
73 Methods []MethodDesc
74 Streams []StreamDesc
75 Metadata interface{}
76}
77
78// service consists of the information of the server serving this service and
79// the methods in this service.
80type service struct {
81 server interface{} // the server for service methods
82 md map[string]*MethodDesc
83 sd map[string]*StreamDesc
84 mdata interface{}
85}
86
87// Server is a gRPC server to serve RPC requests.
88type Server struct {
89 opts options
90
91 mu sync.Mutex // guards following
92 lis map[net.Listener]bool
93 conns map[io.Closer]bool
94 serve bool
95 drain bool
107c1cdb 96 cv *sync.Cond // signaled when connections close for GracefulStop
15c0b25d
AP
97 m map[string]*service // service name -> service info
98 events trace.EventLog
107c1cdb
ND
99
100 quit chan struct{}
101 done chan struct{}
102 quitOnce sync.Once
103 doneOnce sync.Once
104 channelzRemoveOnce sync.Once
105 serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
106
107 channelzID int64 // channelz unique identification number
108 czData *channelzData
15c0b25d
AP
109}
110
111type options struct {
112 creds credentials.TransportCredentials
107c1cdb 113 codec baseCodec
15c0b25d
AP
114 cp Compressor
115 dc Decompressor
116 unaryInt UnaryServerInterceptor
117 streamInt StreamServerInterceptor
118 inTapHandle tap.ServerInHandle
119 statsHandler stats.Handler
120 maxConcurrentStreams uint32
121 maxReceiveMessageSize int
122 maxSendMessageSize int
15c0b25d
AP
123 unknownStreamDesc *StreamDesc
124 keepaliveParams keepalive.ServerParameters
125 keepalivePolicy keepalive.EnforcementPolicy
126 initialWindowSize int32
127 initialConnWindowSize int32
107c1cdb
ND
128 writeBufferSize int
129 readBufferSize int
130 connectionTimeout time.Duration
131 maxHeaderListSize *uint32
15c0b25d
AP
132}
133
134var defaultServerOptions = options{
135 maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
136 maxSendMessageSize: defaultServerMaxSendMessageSize,
107c1cdb
ND
137 connectionTimeout: 120 * time.Second,
138 writeBufferSize: defaultWriteBufSize,
139 readBufferSize: defaultReadBufSize,
15c0b25d
AP
140}
141
142// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
143type ServerOption func(*options)
144
107c1cdb
ND
145// WriteBufferSize determines how much data can be batched before doing a write on the wire.
146// The corresponding memory allocation for this buffer will be twice the size to keep syscalls low.
147// The default value for this buffer is 32KB.
148// Zero will disable the write buffer such that each write will be on underlying connection.
149// Note: A Send call may not directly translate to a write.
150func WriteBufferSize(s int) ServerOption {
151 return func(o *options) {
152 o.writeBufferSize = s
153 }
154}
155
156// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
157// for one read syscall.
158// The default value for this buffer is 32KB.
159// Zero will disable read buffer for a connection so data framer can access the underlying
160// conn directly.
161func ReadBufferSize(s int) ServerOption {
162 return func(o *options) {
163 o.readBufferSize = s
164 }
165}
166
15c0b25d
AP
167// InitialWindowSize returns a ServerOption that sets window size for stream.
168// The lower bound for window size is 64K and any value smaller than that will be ignored.
169func InitialWindowSize(s int32) ServerOption {
170 return func(o *options) {
171 o.initialWindowSize = s
172 }
173}
174
175// InitialConnWindowSize returns a ServerOption that sets window size for a connection.
176// The lower bound for window size is 64K and any value smaller than that will be ignored.
177func InitialConnWindowSize(s int32) ServerOption {
178 return func(o *options) {
179 o.initialConnWindowSize = s
180 }
181}
182
183// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
184func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
185 return func(o *options) {
186 o.keepaliveParams = kp
187 }
188}
189
190// KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server.
191func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption {
192 return func(o *options) {
193 o.keepalivePolicy = kep
194 }
195}
196
197// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
107c1cdb
ND
198//
199// This will override any lookups by content-subtype for Codecs registered with RegisterCodec.
15c0b25d
AP
200func CustomCodec(codec Codec) ServerOption {
201 return func(o *options) {
202 o.codec = codec
203 }
204}
205
107c1cdb
ND
206// RPCCompressor returns a ServerOption that sets a compressor for outbound
207// messages. For backward compatibility, all outbound messages will be sent
208// using this compressor, regardless of incoming message compression. By
209// default, server messages will be sent using the same compressor with which
210// request messages were sent.
211//
212// Deprecated: use encoding.RegisterCompressor instead.
15c0b25d
AP
213func RPCCompressor(cp Compressor) ServerOption {
214 return func(o *options) {
215 o.cp = cp
216 }
217}
218
107c1cdb
ND
219// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
220// messages. It has higher priority than decompressors registered via
221// encoding.RegisterCompressor.
222//
223// Deprecated: use encoding.RegisterCompressor instead.
15c0b25d
AP
224func RPCDecompressor(dc Decompressor) ServerOption {
225 return func(o *options) {
226 o.dc = dc
227 }
228}
229
230// MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
107c1cdb
ND
231// If this is not set, gRPC uses the default limit.
232//
233// Deprecated: use MaxRecvMsgSize instead.
15c0b25d
AP
234func MaxMsgSize(m int) ServerOption {
235 return MaxRecvMsgSize(m)
236}
237
238// MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive.
239// If this is not set, gRPC uses the default 4MB.
240func MaxRecvMsgSize(m int) ServerOption {
241 return func(o *options) {
242 o.maxReceiveMessageSize = m
243 }
244}
245
246// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
247// If this is not set, gRPC uses the default 4MB.
248func MaxSendMsgSize(m int) ServerOption {
249 return func(o *options) {
250 o.maxSendMessageSize = m
251 }
252}
253
254// MaxConcurrentStreams returns a ServerOption that will apply a limit on the number
255// of concurrent streams to each ServerTransport.
256func MaxConcurrentStreams(n uint32) ServerOption {
257 return func(o *options) {
258 o.maxConcurrentStreams = n
259 }
260}
261
262// Creds returns a ServerOption that sets credentials for server connections.
263func Creds(c credentials.TransportCredentials) ServerOption {
264 return func(o *options) {
265 o.creds = c
266 }
267}
268
269// UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the
270// server. Only one unary interceptor can be installed. The construction of multiple
271// interceptors (e.g., chaining) can be implemented at the caller.
272func UnaryInterceptor(i UnaryServerInterceptor) ServerOption {
273 return func(o *options) {
274 if o.unaryInt != nil {
275 panic("The unary server interceptor was already set and may not be reset.")
276 }
277 o.unaryInt = i
278 }
279}
280
281// StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the
282// server. Only one stream interceptor can be installed.
283func StreamInterceptor(i StreamServerInterceptor) ServerOption {
284 return func(o *options) {
285 if o.streamInt != nil {
286 panic("The stream server interceptor was already set and may not be reset.")
287 }
288 o.streamInt = i
289 }
290}
291
292// InTapHandle returns a ServerOption that sets the tap handle for all the server
293// transport to be created. Only one can be installed.
294func InTapHandle(h tap.ServerInHandle) ServerOption {
295 return func(o *options) {
296 if o.inTapHandle != nil {
297 panic("The tap handle was already set and may not be reset.")
298 }
299 o.inTapHandle = h
300 }
301}
302
303// StatsHandler returns a ServerOption that sets the stats handler for the server.
304func StatsHandler(h stats.Handler) ServerOption {
305 return func(o *options) {
306 o.statsHandler = h
307 }
308}
309
310// UnknownServiceHandler returns a ServerOption that allows for adding a custom
311// unknown service handler. The provided method is a bidi-streaming RPC service
312// handler that will be invoked instead of returning the "unimplemented" gRPC
313// error whenever a request is received for an unregistered service or method.
314// The handling function has full access to the Context of the request and the
107c1cdb 315// stream, and the invocation bypasses interceptors.
15c0b25d
AP
316func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
317 return func(o *options) {
318 o.unknownStreamDesc = &StreamDesc{
319 StreamName: "unknown_service_handler",
320 Handler: streamHandler,
321 // We need to assume that the users of the streamHandler will want to use both.
322 ClientStreams: true,
323 ServerStreams: true,
324 }
325 }
326}
327
107c1cdb
ND
328// ConnectionTimeout returns a ServerOption that sets the timeout for
329// connection establishment (up to and including HTTP/2 handshaking) for all
330// new connections. If this is not set, the default is 120 seconds. A zero or
331// negative value will result in an immediate timeout.
332//
333// This API is EXPERIMENTAL.
334func ConnectionTimeout(d time.Duration) ServerOption {
335 return func(o *options) {
336 o.connectionTimeout = d
337 }
338}
339
340// MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size
341// of header list that the server is prepared to accept.
342func MaxHeaderListSize(s uint32) ServerOption {
343 return func(o *options) {
344 o.maxHeaderListSize = &s
345 }
346}
347
15c0b25d
AP
348// NewServer creates a gRPC server which has no service registered and has not
349// started to accept requests yet.
350func NewServer(opt ...ServerOption) *Server {
351 opts := defaultServerOptions
352 for _, o := range opt {
353 o(&opts)
354 }
15c0b25d 355 s := &Server{
107c1cdb
ND
356 lis: make(map[net.Listener]bool),
357 opts: opts,
358 conns: make(map[io.Closer]bool),
359 m: make(map[string]*service),
360 quit: make(chan struct{}),
361 done: make(chan struct{}),
362 czData: new(channelzData),
15c0b25d
AP
363 }
364 s.cv = sync.NewCond(&s.mu)
15c0b25d
AP
365 if EnableTracing {
366 _, file, line, _ := runtime.Caller(1)
367 s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
368 }
107c1cdb
ND
369
370 if channelz.IsOn() {
371 s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
372 }
15c0b25d
AP
373 return s
374}
375
376// printf records an event in s's event log, unless s has been stopped.
377// REQUIRES s.mu is held.
378func (s *Server) printf(format string, a ...interface{}) {
379 if s.events != nil {
380 s.events.Printf(format, a...)
381 }
382}
383
384// errorf records an error in s's event log, unless s has been stopped.
385// REQUIRES s.mu is held.
386func (s *Server) errorf(format string, a ...interface{}) {
387 if s.events != nil {
388 s.events.Errorf(format, a...)
389 }
390}
391
392// RegisterService registers a service and its implementation to the gRPC
393// server. It is called from the IDL generated code. This must be called before
394// invoking Serve.
395func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
396 ht := reflect.TypeOf(sd.HandlerType).Elem()
397 st := reflect.TypeOf(ss)
398 if !st.Implements(ht) {
399 grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht)
400 }
401 s.register(sd, ss)
402}
403
404func (s *Server) register(sd *ServiceDesc, ss interface{}) {
405 s.mu.Lock()
406 defer s.mu.Unlock()
407 s.printf("RegisterService(%q)", sd.ServiceName)
408 if s.serve {
409 grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
410 }
411 if _, ok := s.m[sd.ServiceName]; ok {
412 grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
413 }
414 srv := &service{
415 server: ss,
416 md: make(map[string]*MethodDesc),
417 sd: make(map[string]*StreamDesc),
418 mdata: sd.Metadata,
419 }
420 for i := range sd.Methods {
421 d := &sd.Methods[i]
422 srv.md[d.MethodName] = d
423 }
424 for i := range sd.Streams {
425 d := &sd.Streams[i]
426 srv.sd[d.StreamName] = d
427 }
428 s.m[sd.ServiceName] = srv
429}
430
431// MethodInfo contains the information of an RPC including its method name and type.
432type MethodInfo struct {
433 // Name is the method name only, without the service name or package name.
434 Name string
435 // IsClientStream indicates whether the RPC is a client streaming RPC.
436 IsClientStream bool
437 // IsServerStream indicates whether the RPC is a server streaming RPC.
438 IsServerStream bool
439}
440
441// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
442type ServiceInfo struct {
443 Methods []MethodInfo
444 // Metadata is the metadata specified in ServiceDesc when registering service.
445 Metadata interface{}
446}
447
448// GetServiceInfo returns a map from service names to ServiceInfo.
449// Service names include the package names, in the form of <package>.<service>.
450func (s *Server) GetServiceInfo() map[string]ServiceInfo {
451 ret := make(map[string]ServiceInfo)
452 for n, srv := range s.m {
453 methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd))
454 for m := range srv.md {
455 methods = append(methods, MethodInfo{
456 Name: m,
457 IsClientStream: false,
458 IsServerStream: false,
459 })
460 }
461 for m, d := range srv.sd {
462 methods = append(methods, MethodInfo{
463 Name: m,
464 IsClientStream: d.ClientStreams,
465 IsServerStream: d.ServerStreams,
466 })
467 }
468
469 ret[n] = ServiceInfo{
470 Methods: methods,
471 Metadata: srv.mdata,
472 }
473 }
474 return ret
475}
476
107c1cdb
ND
477// ErrServerStopped indicates that the operation is now illegal because of
478// the server being stopped.
479var ErrServerStopped = errors.New("grpc: the server has been stopped")
15c0b25d
AP
480
481func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
482 if s.opts.creds == nil {
483 return rawConn, nil, nil
484 }
485 return s.opts.creds.ServerHandshake(rawConn)
486}
487
107c1cdb
ND
488type listenSocket struct {
489 net.Listener
490 channelzID int64
491}
492
493func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric {
494 return &channelz.SocketInternalMetric{
495 SocketOptions: channelz.GetSocketOption(l.Listener),
496 LocalAddr: l.Listener.Addr(),
497 }
498}
499
500func (l *listenSocket) Close() error {
501 err := l.Listener.Close()
502 if channelz.IsOn() {
503 channelz.RemoveEntry(l.channelzID)
504 }
505 return err
506}
507
15c0b25d
AP
508// Serve accepts incoming connections on the listener lis, creating a new
509// ServerTransport and service goroutine for each. The service goroutines
510// read gRPC requests and then call the registered handlers to reply to them.
511// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
512// this method returns.
107c1cdb 513// Serve will return a non-nil error unless Stop or GracefulStop is called.
15c0b25d
AP
514func (s *Server) Serve(lis net.Listener) error {
515 s.mu.Lock()
516 s.printf("serving")
517 s.serve = true
518 if s.lis == nil {
107c1cdb 519 // Serve called after Stop or GracefulStop.
15c0b25d
AP
520 s.mu.Unlock()
521 lis.Close()
522 return ErrServerStopped
523 }
107c1cdb
ND
524
525 s.serveWG.Add(1)
526 defer func() {
527 s.serveWG.Done()
528 select {
529 // Stop or GracefulStop called; block until done and return nil.
530 case <-s.quit:
531 <-s.done
532 default:
533 }
534 }()
535
536 ls := &listenSocket{Listener: lis}
537 s.lis[ls] = true
538
539 if channelz.IsOn() {
540 ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
541 }
15c0b25d 542 s.mu.Unlock()
107c1cdb 543
15c0b25d
AP
544 defer func() {
545 s.mu.Lock()
107c1cdb
ND
546 if s.lis != nil && s.lis[ls] {
547 ls.Close()
548 delete(s.lis, ls)
15c0b25d
AP
549 }
550 s.mu.Unlock()
551 }()
552
553 var tempDelay time.Duration // how long to sleep on accept failure
554
555 for {
556 rawConn, err := lis.Accept()
557 if err != nil {
558 if ne, ok := err.(interface {
559 Temporary() bool
560 }); ok && ne.Temporary() {
561 if tempDelay == 0 {
562 tempDelay = 5 * time.Millisecond
563 } else {
564 tempDelay *= 2
565 }
566 if max := 1 * time.Second; tempDelay > max {
567 tempDelay = max
568 }
569 s.mu.Lock()
570 s.printf("Accept error: %v; retrying in %v", err, tempDelay)
571 s.mu.Unlock()
572 timer := time.NewTimer(tempDelay)
573 select {
574 case <-timer.C:
107c1cdb
ND
575 case <-s.quit:
576 timer.Stop()
577 return nil
15c0b25d 578 }
15c0b25d
AP
579 continue
580 }
581 s.mu.Lock()
582 s.printf("done serving; Accept = %v", err)
583 s.mu.Unlock()
107c1cdb
ND
584
585 select {
586 case <-s.quit:
587 return nil
588 default:
589 }
15c0b25d
AP
590 return err
591 }
592 tempDelay = 0
107c1cdb
ND
593 // Start a new goroutine to deal with rawConn so we don't stall this Accept
594 // loop goroutine.
595 //
596 // Make sure we account for the goroutine so GracefulStop doesn't nil out
597 // s.conns before this conn can be added.
598 s.serveWG.Add(1)
599 go func() {
600 s.handleRawConn(rawConn)
601 s.serveWG.Done()
602 }()
15c0b25d
AP
603 }
604}
605
107c1cdb
ND
606// handleRawConn forks a goroutine to handle a just-accepted connection that
607// has not had any I/O performed on it yet.
15c0b25d 608func (s *Server) handleRawConn(rawConn net.Conn) {
107c1cdb 609 rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
15c0b25d
AP
610 conn, authInfo, err := s.useTransportAuthenticator(rawConn)
611 if err != nil {
612 s.mu.Lock()
613 s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
614 s.mu.Unlock()
615 grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
107c1cdb 616 // If serverHandshake returns ErrConnDispatched, keep rawConn open.
15c0b25d
AP
617 if err != credentials.ErrConnDispatched {
618 rawConn.Close()
619 }
107c1cdb 620 rawConn.SetDeadline(time.Time{})
15c0b25d
AP
621 return
622 }
623
624 s.mu.Lock()
625 if s.conns == nil {
626 s.mu.Unlock()
627 conn.Close()
628 return
629 }
630 s.mu.Unlock()
631
107c1cdb
ND
632 // Finish handshaking (HTTP2)
633 st := s.newHTTP2Transport(conn, authInfo)
634 if st == nil {
635 return
636 }
637
638 rawConn.SetDeadline(time.Time{})
639 if !s.addConn(st) {
640 return
15c0b25d 641 }
107c1cdb
ND
642 go func() {
643 s.serveStreams(st)
644 s.removeConn(st)
645 }()
15c0b25d
AP
646}
647
107c1cdb
ND
648// newHTTP2Transport sets up a http/2 transport (using the
649// gRPC http2 server transport in transport/http2_server.go).
650func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
15c0b25d
AP
651 config := &transport.ServerConfig{
652 MaxStreams: s.opts.maxConcurrentStreams,
653 AuthInfo: authInfo,
654 InTapHandle: s.opts.inTapHandle,
655 StatsHandler: s.opts.statsHandler,
656 KeepaliveParams: s.opts.keepaliveParams,
657 KeepalivePolicy: s.opts.keepalivePolicy,
658 InitialWindowSize: s.opts.initialWindowSize,
659 InitialConnWindowSize: s.opts.initialConnWindowSize,
107c1cdb
ND
660 WriteBufferSize: s.opts.writeBufferSize,
661 ReadBufferSize: s.opts.readBufferSize,
662 ChannelzParentID: s.channelzID,
663 MaxHeaderListSize: s.opts.maxHeaderListSize,
15c0b25d
AP
664 }
665 st, err := transport.NewServerTransport("http2", c, config)
666 if err != nil {
667 s.mu.Lock()
668 s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
669 s.mu.Unlock()
670 c.Close()
671 grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
107c1cdb 672 return nil
15c0b25d 673 }
107c1cdb
ND
674
675 return st
15c0b25d
AP
676}
677
678func (s *Server) serveStreams(st transport.ServerTransport) {
15c0b25d
AP
679 defer st.Close()
680 var wg sync.WaitGroup
681 st.HandleStreams(func(stream *transport.Stream) {
682 wg.Add(1)
683 go func() {
684 defer wg.Done()
685 s.handleStream(st, stream, s.traceInfo(st, stream))
686 }()
687 }, func(ctx context.Context, method string) context.Context {
688 if !EnableTracing {
689 return ctx
690 }
691 tr := trace.New("grpc.Recv."+methodFamily(method), method)
692 return trace.NewContext(ctx, tr)
693 })
694 wg.Wait()
695}
696
697var _ http.Handler = (*Server)(nil)
698
15c0b25d
AP
699// ServeHTTP implements the Go standard library's http.Handler
700// interface by responding to the gRPC request r, by looking up
701// the requested gRPC method in the gRPC server s.
702//
703// The provided HTTP request must have arrived on an HTTP/2
704// connection. When using the Go standard library's server,
705// practically this means that the Request must also have arrived
706// over TLS.
707//
708// To share one port (such as 443 for https) between gRPC and an
709// existing http.Handler, use a root http.Handler such as:
710//
711// if r.ProtoMajor == 2 && strings.HasPrefix(
712// r.Header.Get("Content-Type"), "application/grpc") {
713// grpcServer.ServeHTTP(w, r)
714// } else {
715// yourMux.ServeHTTP(w, r)
716// }
717//
718// Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally
719// separate from grpc-go's HTTP/2 server. Performance and features may vary
720// between the two paths. ServeHTTP does not support some gRPC features
721// available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL
722// and subject to change.
723func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
107c1cdb 724 st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler)
15c0b25d
AP
725 if err != nil {
726 http.Error(w, err.Error(), http.StatusInternalServerError)
727 return
728 }
729 if !s.addConn(st) {
15c0b25d
AP
730 return
731 }
732 defer s.removeConn(st)
733 s.serveStreams(st)
734}
735
736// traceInfo returns a traceInfo and associates it with stream, if tracing is enabled.
737// If tracing is not enabled, it returns nil.
738func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) {
739 tr, ok := trace.FromContext(stream.Context())
740 if !ok {
741 return nil
742 }
743
744 trInfo = &traceInfo{
745 tr: tr,
746 }
747 trInfo.firstLine.client = false
748 trInfo.firstLine.remoteAddr = st.RemoteAddr()
749
750 if dl, ok := stream.Context().Deadline(); ok {
751 trInfo.firstLine.deadline = dl.Sub(time.Now())
752 }
753 return trInfo
754}
755
756func (s *Server) addConn(c io.Closer) bool {
757 s.mu.Lock()
758 defer s.mu.Unlock()
107c1cdb
ND
759 if s.conns == nil {
760 c.Close()
15c0b25d
AP
761 return false
762 }
107c1cdb
ND
763 if s.drain {
764 // Transport added after we drained our existing conns: drain it
765 // immediately.
766 c.(transport.ServerTransport).Drain()
767 }
15c0b25d
AP
768 s.conns[c] = true
769 return true
770}
771
772func (s *Server) removeConn(c io.Closer) {
773 s.mu.Lock()
774 defer s.mu.Unlock()
775 if s.conns != nil {
776 delete(s.conns, c)
777 s.cv.Broadcast()
778 }
779}
780
107c1cdb
ND
781func (s *Server) channelzMetric() *channelz.ServerInternalMetric {
782 return &channelz.ServerInternalMetric{
783 CallsStarted: atomic.LoadInt64(&s.czData.callsStarted),
784 CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded),
785 CallsFailed: atomic.LoadInt64(&s.czData.callsFailed),
786 LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)),
15c0b25d 787 }
107c1cdb
ND
788}
789
790func (s *Server) incrCallsStarted() {
791 atomic.AddInt64(&s.czData.callsStarted, 1)
792 atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano())
793}
794
795func (s *Server) incrCallsSucceeded() {
796 atomic.AddInt64(&s.czData.callsSucceeded, 1)
797}
798
799func (s *Server) incrCallsFailed() {
800 atomic.AddInt64(&s.czData.callsFailed, 1)
801}
802
803func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
804 data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
15c0b25d
AP
805 if err != nil {
806 grpclog.Errorln("grpc: server failed to encode response: ", err)
807 return err
808 }
107c1cdb
ND
809 compData, err := compress(data, cp, comp)
810 if err != nil {
811 grpclog.Errorln("grpc: server failed to compress response: ", err)
812 return err
813 }
814 hdr, payload := msgHeader(data, compData)
815 // TODO(dfawley): should we be checking len(data) instead?
816 if len(payload) > s.opts.maxSendMessageSize {
817 return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize)
15c0b25d 818 }
107c1cdb
ND
819 err = t.Write(stream, hdr, payload, opts)
820 if err == nil && s.opts.statsHandler != nil {
821 s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now()))
15c0b25d
AP
822 }
823 return err
824}
825
826func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
107c1cdb
ND
827 if channelz.IsOn() {
828 s.incrCallsStarted()
829 defer func() {
830 if err != nil && err != io.EOF {
831 s.incrCallsFailed()
832 } else {
833 s.incrCallsSucceeded()
834 }
835 }()
836 }
15c0b25d
AP
837 sh := s.opts.statsHandler
838 if sh != nil {
107c1cdb 839 beginTime := time.Now()
15c0b25d 840 begin := &stats.Begin{
107c1cdb 841 BeginTime: beginTime,
15c0b25d
AP
842 }
843 sh.HandleRPC(stream.Context(), begin)
844 defer func() {
845 end := &stats.End{
107c1cdb
ND
846 BeginTime: beginTime,
847 EndTime: time.Now(),
15c0b25d
AP
848 }
849 if err != nil && err != io.EOF {
850 end.Error = toRPCErr(err)
851 }
852 sh.HandleRPC(stream.Context(), end)
853 }()
854 }
855 if trInfo != nil {
856 defer trInfo.tr.Finish()
857 trInfo.firstLine.client = false
858 trInfo.tr.LazyLog(&trInfo.firstLine, false)
859 defer func() {
860 if err != nil && err != io.EOF {
861 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
862 trInfo.tr.SetError()
863 }
864 }()
865 }
107c1cdb
ND
866
867 binlog := binarylog.GetMethodLogger(stream.Method())
868 if binlog != nil {
869 ctx := stream.Context()
870 md, _ := metadata.FromIncomingContext(ctx)
871 logEntry := &binarylog.ClientHeader{
872 Header: md,
873 MethodName: stream.Method(),
874 PeerAddr: nil,
875 }
876 if deadline, ok := ctx.Deadline(); ok {
877 logEntry.Timeout = deadline.Sub(time.Now())
878 if logEntry.Timeout < 0 {
879 logEntry.Timeout = 0
880 }
881 }
882 if a := md[":authority"]; len(a) > 0 {
883 logEntry.Authority = a[0]
884 }
885 if peer, ok := peer.FromContext(ctx); ok {
886 logEntry.PeerAddr = peer.Addr
887 }
888 binlog.Log(logEntry)
889 }
890
891 // comp and cp are used for compression. decomp and dc are used for
892 // decompression. If comp and decomp are both set, they are the same;
893 // however they are kept separate to ensure that at most one of the
894 // compressor/decompressor variable pairs are set for use later.
895 var comp, decomp encoding.Compressor
896 var cp Compressor
897 var dc Decompressor
898
899 // If dc is set and matches the stream's compression, use it. Otherwise, try
900 // to find a matching registered compressor for decomp.
901 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
902 dc = s.opts.dc
903 } else if rc != "" && rc != encoding.Identity {
904 decomp = encoding.GetCompressor(rc)
905 if decomp == nil {
906 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
907 t.WriteStatus(stream, st)
908 return st.Err()
909 }
15c0b25d 910 }
107c1cdb
ND
911
912 // If cp is set, use it. Otherwise, attempt to compress the response using
913 // the incoming message compression method.
914 //
915 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
916 if s.opts.cp != nil {
917 cp = s.opts.cp
918 stream.SetSendCompress(cp.Type())
919 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
920 // Legacy compressor not specified; attempt to respond with same encoding.
921 comp = encoding.GetCompressor(rc)
922 if comp != nil {
923 stream.SetSendCompress(rc)
924 }
15c0b25d 925 }
107c1cdb
ND
926
927 var payInfo *payloadInfo
928 if sh != nil || binlog != nil {
929 payInfo = &payloadInfo{}
15c0b25d 930 }
107c1cdb 931 d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
15c0b25d
AP
932 if err != nil {
933 if st, ok := status.FromError(err); ok {
934 if e := t.WriteStatus(stream, st); e != nil {
935 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
936 }
15c0b25d
AP
937 }
938 return err
939 }
107c1cdb
ND
940 if channelz.IsOn() {
941 t.IncrMsgRecv()
15c0b25d
AP
942 }
943 df := func(v interface{}) error {
107c1cdb 944 if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
15c0b25d
AP
945 return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
946 }
107c1cdb
ND
947 if sh != nil {
948 sh.HandleRPC(stream.Context(), &stats.InPayload{
949 RecvTime: time.Now(),
950 Payload: v,
951 Data: d,
952 Length: len(d),
953 })
954 }
955 if binlog != nil {
956 binlog.Log(&binarylog.ClientMessage{
957 Message: d,
958 })
15c0b25d
AP
959 }
960 if trInfo != nil {
961 trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
962 }
963 return nil
964 }
107c1cdb
ND
965 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
966 reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
15c0b25d
AP
967 if appErr != nil {
968 appStatus, ok := status.FromError(appErr)
969 if !ok {
970 // Convert appErr if it is not a grpc status error.
107c1cdb 971 appErr = status.Error(codes.Unknown, appErr.Error())
15c0b25d
AP
972 appStatus, _ = status.FromError(appErr)
973 }
974 if trInfo != nil {
975 trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
976 trInfo.tr.SetError()
977 }
978 if e := t.WriteStatus(stream, appStatus); e != nil {
979 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
980 }
107c1cdb
ND
981 if binlog != nil {
982 if h, _ := stream.Header(); h.Len() > 0 {
983 // Only log serverHeader if there was header. Otherwise it can
984 // be trailer only.
985 binlog.Log(&binarylog.ServerHeader{
986 Header: h,
987 })
988 }
989 binlog.Log(&binarylog.ServerTrailer{
990 Trailer: stream.Trailer(),
991 Err: appErr,
992 })
993 }
15c0b25d
AP
994 return appErr
995 }
996 if trInfo != nil {
997 trInfo.tr.LazyLog(stringer("OK"), false)
998 }
107c1cdb
ND
999 opts := &transport.Options{Last: true}
1000
1001 if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
15c0b25d
AP
1002 if err == io.EOF {
1003 // The entire stream is done (for unary RPC only).
1004 return err
1005 }
1006 if s, ok := status.FromError(err); ok {
1007 if e := t.WriteStatus(stream, s); e != nil {
1008 grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
1009 }
1010 } else {
1011 switch st := err.(type) {
1012 case transport.ConnectionError:
1013 // Nothing to do here.
15c0b25d
AP
1014 default:
1015 panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
1016 }
1017 }
107c1cdb
ND
1018 if binlog != nil {
1019 h, _ := stream.Header()
1020 binlog.Log(&binarylog.ServerHeader{
1021 Header: h,
1022 })
1023 binlog.Log(&binarylog.ServerTrailer{
1024 Trailer: stream.Trailer(),
1025 Err: appErr,
1026 })
1027 }
15c0b25d
AP
1028 return err
1029 }
107c1cdb
ND
1030 if binlog != nil {
1031 h, _ := stream.Header()
1032 binlog.Log(&binarylog.ServerHeader{
1033 Header: h,
1034 })
1035 binlog.Log(&binarylog.ServerMessage{
1036 Message: reply,
1037 })
1038 }
1039 if channelz.IsOn() {
1040 t.IncrMsgSent()
1041 }
15c0b25d
AP
1042 if trInfo != nil {
1043 trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
1044 }
1045 // TODO: Should we be logging if writing status failed here, like above?
1046 // Should the logging be in WriteStatus? Should we ignore the WriteStatus
1047 // error or allow the stats handler to see it?
107c1cdb
ND
1048 err = t.WriteStatus(stream, status.New(codes.OK, ""))
1049 if binlog != nil {
1050 binlog.Log(&binarylog.ServerTrailer{
1051 Trailer: stream.Trailer(),
1052 Err: appErr,
1053 })
1054 }
1055 return err
15c0b25d
AP
1056}
1057
1058func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
107c1cdb
ND
1059 if channelz.IsOn() {
1060 s.incrCallsStarted()
1061 defer func() {
1062 if err != nil && err != io.EOF {
1063 s.incrCallsFailed()
1064 } else {
1065 s.incrCallsSucceeded()
1066 }
1067 }()
1068 }
15c0b25d
AP
1069 sh := s.opts.statsHandler
1070 if sh != nil {
107c1cdb 1071 beginTime := time.Now()
15c0b25d 1072 begin := &stats.Begin{
107c1cdb 1073 BeginTime: beginTime,
15c0b25d
AP
1074 }
1075 sh.HandleRPC(stream.Context(), begin)
1076 defer func() {
1077 end := &stats.End{
107c1cdb
ND
1078 BeginTime: beginTime,
1079 EndTime: time.Now(),
15c0b25d
AP
1080 }
1081 if err != nil && err != io.EOF {
1082 end.Error = toRPCErr(err)
1083 }
1084 sh.HandleRPC(stream.Context(), end)
1085 }()
1086 }
107c1cdb 1087 ctx := NewContextWithServerTransportStream(stream.Context(), stream)
15c0b25d 1088 ss := &serverStream{
107c1cdb
ND
1089 ctx: ctx,
1090 t: t,
1091 s: stream,
1092 p: &parser{r: stream},
1093 codec: s.getCodec(stream.ContentSubtype()),
15c0b25d
AP
1094 maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
1095 maxSendMessageSize: s.opts.maxSendMessageSize,
1096 trInfo: trInfo,
1097 statsHandler: sh,
1098 }
107c1cdb
ND
1099
1100 ss.binlog = binarylog.GetMethodLogger(stream.Method())
1101 if ss.binlog != nil {
1102 md, _ := metadata.FromIncomingContext(ctx)
1103 logEntry := &binarylog.ClientHeader{
1104 Header: md,
1105 MethodName: stream.Method(),
1106 PeerAddr: nil,
1107 }
1108 if deadline, ok := ctx.Deadline(); ok {
1109 logEntry.Timeout = deadline.Sub(time.Now())
1110 if logEntry.Timeout < 0 {
1111 logEntry.Timeout = 0
1112 }
1113 }
1114 if a := md[":authority"]; len(a) > 0 {
1115 logEntry.Authority = a[0]
1116 }
1117 if peer, ok := peer.FromContext(ss.Context()); ok {
1118 logEntry.PeerAddr = peer.Addr
1119 }
1120 ss.binlog.Log(logEntry)
1121 }
1122
1123 // If dc is set and matches the stream's compression, use it. Otherwise, try
1124 // to find a matching registered compressor for decomp.
1125 if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
1126 ss.dc = s.opts.dc
1127 } else if rc != "" && rc != encoding.Identity {
1128 ss.decomp = encoding.GetCompressor(rc)
1129 if ss.decomp == nil {
1130 st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
1131 t.WriteStatus(ss.s, st)
1132 return st.Err()
1133 }
1134 }
1135
1136 // If cp is set, use it. Otherwise, attempt to compress the response using
1137 // the incoming message compression method.
1138 //
1139 // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
1140 if s.opts.cp != nil {
1141 ss.cp = s.opts.cp
1142 stream.SetSendCompress(s.opts.cp.Type())
1143 } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
1144 // Legacy compressor not specified; attempt to respond with same encoding.
1145 ss.comp = encoding.GetCompressor(rc)
1146 if ss.comp != nil {
1147 stream.SetSendCompress(rc)
1148 }
15c0b25d 1149 }
107c1cdb 1150
15c0b25d
AP
1151 if trInfo != nil {
1152 trInfo.tr.LazyLog(&trInfo.firstLine, false)
1153 defer func() {
1154 ss.mu.Lock()
1155 if err != nil && err != io.EOF {
1156 ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1157 ss.trInfo.tr.SetError()
1158 }
1159 ss.trInfo.tr.Finish()
1160 ss.trInfo.tr = nil
1161 ss.mu.Unlock()
1162 }()
1163 }
1164 var appErr error
1165 var server interface{}
1166 if srv != nil {
1167 server = srv.server
1168 }
1169 if s.opts.streamInt == nil {
1170 appErr = sd.Handler(server, ss)
1171 } else {
1172 info := &StreamServerInfo{
1173 FullMethod: stream.Method(),
1174 IsClientStream: sd.ClientStreams,
1175 IsServerStream: sd.ServerStreams,
1176 }
1177 appErr = s.opts.streamInt(server, ss, info, sd.Handler)
1178 }
1179 if appErr != nil {
1180 appStatus, ok := status.FromError(appErr)
1181 if !ok {
107c1cdb 1182 appStatus = status.New(codes.Unknown, appErr.Error())
15c0b25d
AP
1183 appErr = appStatus.Err()
1184 }
1185 if trInfo != nil {
1186 ss.mu.Lock()
1187 ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
1188 ss.trInfo.tr.SetError()
1189 ss.mu.Unlock()
1190 }
1191 t.WriteStatus(ss.s, appStatus)
107c1cdb
ND
1192 if ss.binlog != nil {
1193 ss.binlog.Log(&binarylog.ServerTrailer{
1194 Trailer: ss.s.Trailer(),
1195 Err: appErr,
1196 })
1197 }
15c0b25d
AP
1198 // TODO: Should we log an error from WriteStatus here and below?
1199 return appErr
1200 }
1201 if trInfo != nil {
1202 ss.mu.Lock()
1203 ss.trInfo.tr.LazyLog(stringer("OK"), false)
1204 ss.mu.Unlock()
1205 }
107c1cdb
ND
1206 err = t.WriteStatus(ss.s, status.New(codes.OK, ""))
1207 if ss.binlog != nil {
1208 ss.binlog.Log(&binarylog.ServerTrailer{
1209 Trailer: ss.s.Trailer(),
1210 Err: appErr,
1211 })
1212 }
1213 return err
15c0b25d
AP
1214}
1215
1216func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
1217 sm := stream.Method()
1218 if sm != "" && sm[0] == '/' {
1219 sm = sm[1:]
1220 }
1221 pos := strings.LastIndex(sm, "/")
1222 if pos == -1 {
1223 if trInfo != nil {
1224 trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
1225 trInfo.tr.SetError()
1226 }
1227 errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
1228 if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
1229 if trInfo != nil {
1230 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1231 trInfo.tr.SetError()
1232 }
1233 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1234 }
1235 if trInfo != nil {
1236 trInfo.tr.Finish()
1237 }
1238 return
1239 }
1240 service := sm[:pos]
1241 method := sm[pos+1:]
107c1cdb
ND
1242
1243 if srv, ok := s.m[service]; ok {
1244 if md, ok := srv.md[method]; ok {
1245 s.processUnaryRPC(t, stream, srv, md, trInfo)
15c0b25d
AP
1246 return
1247 }
107c1cdb
ND
1248 if sd, ok := srv.sd[method]; ok {
1249 s.processStreamingRPC(t, stream, srv, sd, trInfo)
1250 return
15c0b25d 1251 }
15c0b25d 1252 }
107c1cdb
ND
1253 // Unknown service, or known server unknown method.
1254 if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
1255 s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
15c0b25d
AP
1256 return
1257 }
1258 if trInfo != nil {
107c1cdb 1259 trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true)
15c0b25d
AP
1260 trInfo.tr.SetError()
1261 }
107c1cdb 1262 errDesc := fmt.Sprintf("unknown service %v", service)
15c0b25d
AP
1263 if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
1264 if trInfo != nil {
1265 trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
1266 trInfo.tr.SetError()
1267 }
1268 grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err)
1269 }
1270 if trInfo != nil {
1271 trInfo.tr.Finish()
1272 }
1273}
1274
107c1cdb
ND
1275// The key to save ServerTransportStream in the context.
1276type streamKey struct{}
1277
1278// NewContextWithServerTransportStream creates a new context from ctx and
1279// attaches stream to it.
1280//
1281// This API is EXPERIMENTAL.
1282func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context {
1283 return context.WithValue(ctx, streamKey{}, stream)
1284}
1285
1286// ServerTransportStream is a minimal interface that a transport stream must
1287// implement. This can be used to mock an actual transport stream for tests of
1288// handler code that use, for example, grpc.SetHeader (which requires some
1289// stream to be in context).
1290//
1291// See also NewContextWithServerTransportStream.
1292//
1293// This API is EXPERIMENTAL.
1294type ServerTransportStream interface {
1295 Method() string
1296 SetHeader(md metadata.MD) error
1297 SendHeader(md metadata.MD) error
1298 SetTrailer(md metadata.MD) error
1299}
1300
1301// ServerTransportStreamFromContext returns the ServerTransportStream saved in
1302// ctx. Returns nil if the given context has no stream associated with it
1303// (which implies it is not an RPC invocation context).
1304//
1305// This API is EXPERIMENTAL.
1306func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream {
1307 s, _ := ctx.Value(streamKey{}).(ServerTransportStream)
1308 return s
1309}
1310
15c0b25d
AP
1311// Stop stops the gRPC server. It immediately closes all open
1312// connections and listeners.
1313// It cancels all active RPCs on the server side and the corresponding
1314// pending RPCs on the client side will get notified by connection
1315// errors.
1316func (s *Server) Stop() {
107c1cdb
ND
1317 s.quitOnce.Do(func() {
1318 close(s.quit)
1319 })
1320
1321 defer func() {
1322 s.serveWG.Wait()
1323 s.doneOnce.Do(func() {
1324 close(s.done)
1325 })
1326 }()
1327
1328 s.channelzRemoveOnce.Do(func() {
1329 if channelz.IsOn() {
1330 channelz.RemoveEntry(s.channelzID)
1331 }
1332 })
1333
15c0b25d
AP
1334 s.mu.Lock()
1335 listeners := s.lis
1336 s.lis = nil
1337 st := s.conns
1338 s.conns = nil
1339 // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
1340 s.cv.Broadcast()
1341 s.mu.Unlock()
1342
1343 for lis := range listeners {
1344 lis.Close()
1345 }
1346 for c := range st {
1347 c.Close()
1348 }
1349
1350 s.mu.Lock()
15c0b25d
AP
1351 if s.events != nil {
1352 s.events.Finish()
1353 s.events = nil
1354 }
1355 s.mu.Unlock()
1356}
1357
1358// GracefulStop stops the gRPC server gracefully. It stops the server from
1359// accepting new connections and RPCs and blocks until all the pending RPCs are
1360// finished.
1361func (s *Server) GracefulStop() {
107c1cdb
ND
1362 s.quitOnce.Do(func() {
1363 close(s.quit)
1364 })
1365
1366 defer func() {
1367 s.doneOnce.Do(func() {
1368 close(s.done)
1369 })
1370 }()
1371
1372 s.channelzRemoveOnce.Do(func() {
1373 if channelz.IsOn() {
1374 channelz.RemoveEntry(s.channelzID)
1375 }
1376 })
15c0b25d 1377 s.mu.Lock()
15c0b25d 1378 if s.conns == nil {
107c1cdb 1379 s.mu.Unlock()
15c0b25d
AP
1380 return
1381 }
107c1cdb 1382
15c0b25d
AP
1383 for lis := range s.lis {
1384 lis.Close()
1385 }
1386 s.lis = nil
15c0b25d
AP
1387 if !s.drain {
1388 for c := range s.conns {
1389 c.(transport.ServerTransport).Drain()
1390 }
1391 s.drain = true
1392 }
107c1cdb
ND
1393
1394 // Wait for serving threads to be ready to exit. Only then can we be sure no
1395 // new conns will be created.
1396 s.mu.Unlock()
1397 s.serveWG.Wait()
1398 s.mu.Lock()
1399
15c0b25d
AP
1400 for len(s.conns) != 0 {
1401 s.cv.Wait()
1402 }
1403 s.conns = nil
1404 if s.events != nil {
1405 s.events.Finish()
1406 s.events = nil
1407 }
107c1cdb 1408 s.mu.Unlock()
15c0b25d
AP
1409}
1410
107c1cdb
ND
1411// contentSubtype must be lowercase
1412// cannot return nil
1413func (s *Server) getCodec(contentSubtype string) baseCodec {
1414 if s.opts.codec != nil {
1415 return s.opts.codec
15c0b25d 1416 }
107c1cdb
ND
1417 if contentSubtype == "" {
1418 return encoding.GetCodec(proto.Name)
15c0b25d 1419 }
107c1cdb
ND
1420 codec := encoding.GetCodec(contentSubtype)
1421 if codec == nil {
1422 return encoding.GetCodec(proto.Name)
15c0b25d 1423 }
107c1cdb 1424 return codec
15c0b25d
AP
1425}
1426
1427// SetHeader sets the header metadata.
1428// When called multiple times, all the provided metadata will be merged.
1429// All the metadata will be sent out when one of the following happens:
1430// - grpc.SendHeader() is called;
1431// - The first response is sent out;
1432// - An RPC status is sent out (error or success).
1433func SetHeader(ctx context.Context, md metadata.MD) error {
1434 if md.Len() == 0 {
1435 return nil
1436 }
107c1cdb
ND
1437 stream := ServerTransportStreamFromContext(ctx)
1438 if stream == nil {
1439 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
15c0b25d
AP
1440 }
1441 return stream.SetHeader(md)
1442}
1443
1444// SendHeader sends header metadata. It may be called at most once.
1445// The provided md and headers set by SetHeader() will be sent.
1446func SendHeader(ctx context.Context, md metadata.MD) error {
107c1cdb
ND
1447 stream := ServerTransportStreamFromContext(ctx)
1448 if stream == nil {
1449 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
15c0b25d 1450 }
107c1cdb 1451 if err := stream.SendHeader(md); err != nil {
15c0b25d
AP
1452 return toRPCErr(err)
1453 }
1454 return nil
1455}
1456
1457// SetTrailer sets the trailer metadata that will be sent when an RPC returns.
1458// When called more than once, all the provided metadata will be merged.
1459func SetTrailer(ctx context.Context, md metadata.MD) error {
1460 if md.Len() == 0 {
1461 return nil
1462 }
107c1cdb
ND
1463 stream := ServerTransportStreamFromContext(ctx)
1464 if stream == nil {
1465 return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
15c0b25d
AP
1466 }
1467 return stream.SetTrailer(md)
1468}
107c1cdb
ND
1469
1470// Method returns the method string for the server context. The returned
1471// string is in the format of "/service/method".
1472func Method(ctx context.Context) (string, bool) {
1473 s := ServerTransportStreamFromContext(ctx)
1474 if s == nil {
1475 return "", false
1476 }
1477 return s.Method(), true
1478}
1479
1480type channelzServer struct {
1481 s *Server
1482}
1483
1484func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric {
1485 return c.s.channelzMetric()
1486}