]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | /* |
2 | * | |
3 | * Copyright 2014 gRPC authors. | |
4 | * | |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | * | |
17 | */ | |
18 | ||
19 | package grpc | |
20 | ||
21 | import ( | |
107c1cdb | 22 | "context" |
15c0b25d AP |
23 | "errors" |
24 | "fmt" | |
25 | "io" | |
107c1cdb | 26 | "math" |
15c0b25d AP |
27 | "net" |
28 | "net/http" | |
29 | "reflect" | |
30 | "runtime" | |
31 | "strings" | |
32 | "sync" | |
107c1cdb | 33 | "sync/atomic" |
15c0b25d AP |
34 | "time" |
35 | ||
15c0b25d | 36 | "golang.org/x/net/trace" |
107c1cdb | 37 | |
15c0b25d AP |
38 | "google.golang.org/grpc/codes" |
39 | "google.golang.org/grpc/credentials" | |
107c1cdb ND |
40 | "google.golang.org/grpc/encoding" |
41 | "google.golang.org/grpc/encoding/proto" | |
15c0b25d | 42 | "google.golang.org/grpc/grpclog" |
107c1cdb ND |
43 | "google.golang.org/grpc/internal/binarylog" |
44 | "google.golang.org/grpc/internal/channelz" | |
45 | "google.golang.org/grpc/internal/transport" | |
15c0b25d AP |
46 | "google.golang.org/grpc/keepalive" |
47 | "google.golang.org/grpc/metadata" | |
107c1cdb | 48 | "google.golang.org/grpc/peer" |
15c0b25d AP |
49 | "google.golang.org/grpc/stats" |
50 | "google.golang.org/grpc/status" | |
51 | "google.golang.org/grpc/tap" | |
15c0b25d AP |
52 | ) |
53 | ||
// Default per-message size limits, in bytes, applied by defaultServerOptions
// when the corresponding ServerOption is not supplied.
const (
	// defaultServerMaxReceiveMessageSize is the default cap on a received
	// message: 4 MiB.
	defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
	// defaultServerMaxSendMessageSize is the default cap on a sent message:
	// the largest int32, i.e. effectively unlimited.
	defaultServerMaxSendMessageSize = math.MaxInt32
)
58 | ||
// methodHandler is the signature of a unary RPC handler stored in a
// MethodDesc. It receives the service implementation (srv), the request
// context, a dec callback that fills in the value passed to it, and the
// server's unary interceptor; it returns the response message or an error.
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
60 | ||
// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
	// MethodName is the key under which the method is registered within its
	// service.
	MethodName string
	// Handler is the function invoked to process a call to this method.
	Handler methodHandler
}
66 | ||
// ServiceDesc represents an RPC service's specification.
type ServiceDesc struct {
	// ServiceName is the key under which the service is registered on the
	// server; registering the same name twice is fatal.
	ServiceName string
	// The pointer to the service interface. Used to check whether the user
	// provided implementation satisfies the interface requirements.
	HandlerType interface{}
	// Methods lists the unary methods of the service.
	Methods []MethodDesc
	// Streams lists the streaming methods of the service.
	Streams []StreamDesc
	// Metadata is opaque to the server; it is stored at registration and
	// returned unchanged through ServiceInfo.Metadata.
	Metadata interface{}
}
77 | ||
// service consists of the information of the server serving this service and
// the methods in this service.
type service struct {
	server interface{}            // the server for service methods
	md     map[string]*MethodDesc // unary methods, keyed by MethodName
	sd     map[string]*StreamDesc // streaming methods, keyed by StreamName
	mdata  interface{}            // ServiceDesc.Metadata captured at registration
}
86 | ||
// Server is a gRPC server to serve RPC requests.
type Server struct {
	opts options // configuration resolved by NewServer from the ServerOptions

	mu     sync.Mutex // guards following
	lis    map[net.Listener]bool // listeners currently being served; nil means Serve must refuse
	conns  map[io.Closer]bool    // live connections/transports; nil means new connections are closed
	serve  bool                  // set once Serve has been called; registration afterwards is fatal
	drain  bool                  // when set, transports added by addConn are drained immediately
	cv     *sync.Cond            // signaled when connections close for GracefulStop
	m      map[string]*service   // service name -> service info
	events trace.EventLog        // non-nil only when EnableTracing was set at construction

	quit               chan struct{} // Serve returns nil once this channel is readable
	done               chan struct{} // Serve waits on this after quit before returning
	quitOnce           sync.Once     // NOTE(review): presumably guards closing quit; user not visible here
	doneOnce           sync.Once     // NOTE(review): presumably guards closing done; user not visible here
	channelzRemoveOnce sync.Once     // NOTE(review): presumably guards channelz deregistration; confirm
	serveWG            sync.WaitGroup // counts active Serve goroutines for GracefulStop

	channelzID int64 // channelz unique identification number
	czData     *channelzData // call counters read and updated atomically
}
110 | ||
// options holds the server configuration. Each field is populated by the
// ServerOption constructor named beside it; unset fields keep the value from
// defaultServerOptions (or the zero value).
type options struct {
	creds                 credentials.TransportCredentials // Creds
	codec                 baseCodec                        // CustomCodec
	cp                    Compressor                       // RPCCompressor (deprecated)
	dc                    Decompressor                     // RPCDecompressor (deprecated)
	unaryInt              UnaryServerInterceptor           // UnaryInterceptor
	streamInt             StreamServerInterceptor          // StreamInterceptor
	inTapHandle           tap.ServerInHandle               // InTapHandle
	statsHandler          stats.Handler                    // StatsHandler
	maxConcurrentStreams  uint32                           // MaxConcurrentStreams
	maxReceiveMessageSize int                              // MaxRecvMsgSize / MaxMsgSize
	maxSendMessageSize    int                              // MaxSendMsgSize
	unknownStreamDesc     *StreamDesc                      // UnknownServiceHandler
	keepaliveParams       keepalive.ServerParameters       // KeepaliveParams
	keepalivePolicy       keepalive.EnforcementPolicy      // KeepaliveEnforcementPolicy
	initialWindowSize     int32                            // InitialWindowSize
	initialConnWindowSize int32                            // InitialConnWindowSize
	writeBufferSize       int                              // WriteBufferSize
	readBufferSize        int                              // ReadBufferSize
	connectionTimeout     time.Duration                    // ConnectionTimeout
	maxHeaderListSize     *uint32                          // MaxHeaderListSize; nil when not set
}
133 | ||
// defaultServerOptions is the baseline configuration that NewServer copies
// before applying caller-supplied ServerOptions. Fields not listed here start
// at their zero value.
var defaultServerOptions = options{
	maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
	maxSendMessageSize:    defaultServerMaxSendMessageSize,
	connectionTimeout:     120 * time.Second, // handshake deadline for new connections
	writeBufferSize:       defaultWriteBufSize,
	readBufferSize:        defaultReadBufSize,
}
141 | ||
// A ServerOption sets options such as credentials, codec and keepalive
// parameters, etc. NewServer applies each option, in the order given, to a
// copy of the default options.
type ServerOption func(*options)
144 | ||
107c1cdb ND |
145 | // WriteBufferSize determines how much data can be batched before doing a write on the wire. |
146 | // The corresponding memory allocation for this buffer will be twice the size to keep syscalls low. | |
147 | // The default value for this buffer is 32KB. | |
148 | // Zero will disable the write buffer such that each write will be on underlying connection. | |
149 | // Note: A Send call may not directly translate to a write. | |
150 | func WriteBufferSize(s int) ServerOption { | |
151 | return func(o *options) { | |
152 | o.writeBufferSize = s | |
153 | } | |
154 | } | |
155 | ||
156 | // ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most | |
157 | // for one read syscall. | |
158 | // The default value for this buffer is 32KB. | |
159 | // Zero will disable read buffer for a connection so data framer can access the underlying | |
160 | // conn directly. | |
161 | func ReadBufferSize(s int) ServerOption { | |
162 | return func(o *options) { | |
163 | o.readBufferSize = s | |
164 | } | |
165 | } | |
166 | ||
15c0b25d AP |
167 | // InitialWindowSize returns a ServerOption that sets window size for stream. |
168 | // The lower bound for window size is 64K and any value smaller than that will be ignored. | |
169 | func InitialWindowSize(s int32) ServerOption { | |
170 | return func(o *options) { | |
171 | o.initialWindowSize = s | |
172 | } | |
173 | } | |
174 | ||
175 | // InitialConnWindowSize returns a ServerOption that sets window size for a connection. | |
176 | // The lower bound for window size is 64K and any value smaller than that will be ignored. | |
177 | func InitialConnWindowSize(s int32) ServerOption { | |
178 | return func(o *options) { | |
179 | o.initialConnWindowSize = s | |
180 | } | |
181 | } | |
182 | ||
183 | // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server. | |
184 | func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { | |
185 | return func(o *options) { | |
186 | o.keepaliveParams = kp | |
187 | } | |
188 | } | |
189 | ||
190 | // KeepaliveEnforcementPolicy returns a ServerOption that sets keepalive enforcement policy for the server. | |
191 | func KeepaliveEnforcementPolicy(kep keepalive.EnforcementPolicy) ServerOption { | |
192 | return func(o *options) { | |
193 | o.keepalivePolicy = kep | |
194 | } | |
195 | } | |
196 | ||
197 | // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. | |
107c1cdb ND |
198 | // |
199 | // This will override any lookups by content-subtype for Codecs registered with RegisterCodec. | |
15c0b25d AP |
200 | func CustomCodec(codec Codec) ServerOption { |
201 | return func(o *options) { | |
202 | o.codec = codec | |
203 | } | |
204 | } | |
205 | ||
107c1cdb ND |
206 | // RPCCompressor returns a ServerOption that sets a compressor for outbound |
207 | // messages. For backward compatibility, all outbound messages will be sent | |
208 | // using this compressor, regardless of incoming message compression. By | |
209 | // default, server messages will be sent using the same compressor with which | |
210 | // request messages were sent. | |
211 | // | |
212 | // Deprecated: use encoding.RegisterCompressor instead. | |
15c0b25d AP |
213 | func RPCCompressor(cp Compressor) ServerOption { |
214 | return func(o *options) { | |
215 | o.cp = cp | |
216 | } | |
217 | } | |
218 | ||
107c1cdb ND |
219 | // RPCDecompressor returns a ServerOption that sets a decompressor for inbound |
220 | // messages. It has higher priority than decompressors registered via | |
221 | // encoding.RegisterCompressor. | |
222 | // | |
223 | // Deprecated: use encoding.RegisterCompressor instead. | |
15c0b25d AP |
224 | func RPCDecompressor(dc Decompressor) ServerOption { |
225 | return func(o *options) { | |
226 | o.dc = dc | |
227 | } | |
228 | } | |
229 | ||
230 | // MaxMsgSize returns a ServerOption to set the max message size in bytes the server can receive. | |
107c1cdb ND |
231 | // If this is not set, gRPC uses the default limit. |
232 | // | |
233 | // Deprecated: use MaxRecvMsgSize instead. | |
15c0b25d AP |
234 | func MaxMsgSize(m int) ServerOption { |
235 | return MaxRecvMsgSize(m) | |
236 | } | |
237 | ||
238 | // MaxRecvMsgSize returns a ServerOption to set the max message size in bytes the server can receive. | |
239 | // If this is not set, gRPC uses the default 4MB. | |
240 | func MaxRecvMsgSize(m int) ServerOption { | |
241 | return func(o *options) { | |
242 | o.maxReceiveMessageSize = m | |
243 | } | |
244 | } | |
245 | ||
// MaxSendMsgSize returns a ServerOption to set the max message size in bytes the server can send.
// If this is not set, gRPC uses the default `math.MaxInt32` (see
// defaultServerMaxSendMessageSize), i.e. sends are effectively unlimited.
func MaxSendMsgSize(m int) ServerOption {
	return func(o *options) {
		o.maxSendMessageSize = m
	}
}
253 | ||
254 | // MaxConcurrentStreams returns a ServerOption that will apply a limit on the number | |
255 | // of concurrent streams to each ServerTransport. | |
256 | func MaxConcurrentStreams(n uint32) ServerOption { | |
257 | return func(o *options) { | |
258 | o.maxConcurrentStreams = n | |
259 | } | |
260 | } | |
261 | ||
262 | // Creds returns a ServerOption that sets credentials for server connections. | |
263 | func Creds(c credentials.TransportCredentials) ServerOption { | |
264 | return func(o *options) { | |
265 | o.creds = c | |
266 | } | |
267 | } | |
268 | ||
269 | // UnaryInterceptor returns a ServerOption that sets the UnaryServerInterceptor for the | |
270 | // server. Only one unary interceptor can be installed. The construction of multiple | |
271 | // interceptors (e.g., chaining) can be implemented at the caller. | |
272 | func UnaryInterceptor(i UnaryServerInterceptor) ServerOption { | |
273 | return func(o *options) { | |
274 | if o.unaryInt != nil { | |
275 | panic("The unary server interceptor was already set and may not be reset.") | |
276 | } | |
277 | o.unaryInt = i | |
278 | } | |
279 | } | |
280 | ||
281 | // StreamInterceptor returns a ServerOption that sets the StreamServerInterceptor for the | |
282 | // server. Only one stream interceptor can be installed. | |
283 | func StreamInterceptor(i StreamServerInterceptor) ServerOption { | |
284 | return func(o *options) { | |
285 | if o.streamInt != nil { | |
286 | panic("The stream server interceptor was already set and may not be reset.") | |
287 | } | |
288 | o.streamInt = i | |
289 | } | |
290 | } | |
291 | ||
292 | // InTapHandle returns a ServerOption that sets the tap handle for all the server | |
293 | // transport to be created. Only one can be installed. | |
294 | func InTapHandle(h tap.ServerInHandle) ServerOption { | |
295 | return func(o *options) { | |
296 | if o.inTapHandle != nil { | |
297 | panic("The tap handle was already set and may not be reset.") | |
298 | } | |
299 | o.inTapHandle = h | |
300 | } | |
301 | } | |
302 | ||
303 | // StatsHandler returns a ServerOption that sets the stats handler for the server. | |
304 | func StatsHandler(h stats.Handler) ServerOption { | |
305 | return func(o *options) { | |
306 | o.statsHandler = h | |
307 | } | |
308 | } | |
309 | ||
310 | // UnknownServiceHandler returns a ServerOption that allows for adding a custom | |
311 | // unknown service handler. The provided method is a bidi-streaming RPC service | |
312 | // handler that will be invoked instead of returning the "unimplemented" gRPC | |
313 | // error whenever a request is received for an unregistered service or method. | |
314 | // The handling function has full access to the Context of the request and the | |
107c1cdb | 315 | // stream, and the invocation bypasses interceptors. |
15c0b25d AP |
316 | func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { |
317 | return func(o *options) { | |
318 | o.unknownStreamDesc = &StreamDesc{ | |
319 | StreamName: "unknown_service_handler", | |
320 | Handler: streamHandler, | |
321 | // We need to assume that the users of the streamHandler will want to use both. | |
322 | ClientStreams: true, | |
323 | ServerStreams: true, | |
324 | } | |
325 | } | |
326 | } | |
327 | ||
107c1cdb ND |
328 | // ConnectionTimeout returns a ServerOption that sets the timeout for |
329 | // connection establishment (up to and including HTTP/2 handshaking) for all | |
330 | // new connections. If this is not set, the default is 120 seconds. A zero or | |
331 | // negative value will result in an immediate timeout. | |
332 | // | |
333 | // This API is EXPERIMENTAL. | |
334 | func ConnectionTimeout(d time.Duration) ServerOption { | |
335 | return func(o *options) { | |
336 | o.connectionTimeout = d | |
337 | } | |
338 | } | |
339 | ||
340 | // MaxHeaderListSize returns a ServerOption that sets the max (uncompressed) size | |
341 | // of header list that the server is prepared to accept. | |
342 | func MaxHeaderListSize(s uint32) ServerOption { | |
343 | return func(o *options) { | |
344 | o.maxHeaderListSize = &s | |
345 | } | |
346 | } | |
347 | ||
15c0b25d AP |
348 | // NewServer creates a gRPC server which has no service registered and has not |
349 | // started to accept requests yet. | |
350 | func NewServer(opt ...ServerOption) *Server { | |
351 | opts := defaultServerOptions | |
352 | for _, o := range opt { | |
353 | o(&opts) | |
354 | } | |
15c0b25d | 355 | s := &Server{ |
107c1cdb ND |
356 | lis: make(map[net.Listener]bool), |
357 | opts: opts, | |
358 | conns: make(map[io.Closer]bool), | |
359 | m: make(map[string]*service), | |
360 | quit: make(chan struct{}), | |
361 | done: make(chan struct{}), | |
362 | czData: new(channelzData), | |
15c0b25d AP |
363 | } |
364 | s.cv = sync.NewCond(&s.mu) | |
15c0b25d AP |
365 | if EnableTracing { |
366 | _, file, line, _ := runtime.Caller(1) | |
367 | s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) | |
368 | } | |
107c1cdb ND |
369 | |
370 | if channelz.IsOn() { | |
371 | s.channelzID = channelz.RegisterServer(&channelzServer{s}, "") | |
372 | } | |
15c0b25d AP |
373 | return s |
374 | } | |
375 | ||
376 | // printf records an event in s's event log, unless s has been stopped. | |
377 | // REQUIRES s.mu is held. | |
378 | func (s *Server) printf(format string, a ...interface{}) { | |
379 | if s.events != nil { | |
380 | s.events.Printf(format, a...) | |
381 | } | |
382 | } | |
383 | ||
384 | // errorf records an error in s's event log, unless s has been stopped. | |
385 | // REQUIRES s.mu is held. | |
386 | func (s *Server) errorf(format string, a ...interface{}) { | |
387 | if s.events != nil { | |
388 | s.events.Errorf(format, a...) | |
389 | } | |
390 | } | |
391 | ||
392 | // RegisterService registers a service and its implementation to the gRPC | |
393 | // server. It is called from the IDL generated code. This must be called before | |
394 | // invoking Serve. | |
395 | func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) { | |
396 | ht := reflect.TypeOf(sd.HandlerType).Elem() | |
397 | st := reflect.TypeOf(ss) | |
398 | if !st.Implements(ht) { | |
399 | grpclog.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) | |
400 | } | |
401 | s.register(sd, ss) | |
402 | } | |
403 | ||
404 | func (s *Server) register(sd *ServiceDesc, ss interface{}) { | |
405 | s.mu.Lock() | |
406 | defer s.mu.Unlock() | |
407 | s.printf("RegisterService(%q)", sd.ServiceName) | |
408 | if s.serve { | |
409 | grpclog.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName) | |
410 | } | |
411 | if _, ok := s.m[sd.ServiceName]; ok { | |
412 | grpclog.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) | |
413 | } | |
414 | srv := &service{ | |
415 | server: ss, | |
416 | md: make(map[string]*MethodDesc), | |
417 | sd: make(map[string]*StreamDesc), | |
418 | mdata: sd.Metadata, | |
419 | } | |
420 | for i := range sd.Methods { | |
421 | d := &sd.Methods[i] | |
422 | srv.md[d.MethodName] = d | |
423 | } | |
424 | for i := range sd.Streams { | |
425 | d := &sd.Streams[i] | |
426 | srv.sd[d.StreamName] = d | |
427 | } | |
428 | s.m[sd.ServiceName] = srv | |
429 | } | |
430 | ||
// MethodInfo contains the information of an RPC including its method name and type.
// Unary RPCs are reported with both stream flags false.
type MethodInfo struct {
	// Name is the method name only, without the service name or package name.
	Name string
	// IsClientStream indicates whether the RPC is a client streaming RPC.
	IsClientStream bool
	// IsServerStream indicates whether the RPC is a server streaming RPC.
	IsServerStream bool
}
440 | ||
// ServiceInfo contains unary RPC method info, streaming RPC method info and metadata for a service.
type ServiceInfo struct {
	// Methods lists every unary and streaming method of the service. The
	// entries are gathered by iterating maps, so their order is unspecified.
	Methods []MethodInfo
	// Metadata is the metadata specified in ServiceDesc when registering service.
	Metadata interface{}
}
447 | ||
448 | // GetServiceInfo returns a map from service names to ServiceInfo. | |
449 | // Service names include the package names, in the form of <package>.<service>. | |
450 | func (s *Server) GetServiceInfo() map[string]ServiceInfo { | |
451 | ret := make(map[string]ServiceInfo) | |
452 | for n, srv := range s.m { | |
453 | methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd)) | |
454 | for m := range srv.md { | |
455 | methods = append(methods, MethodInfo{ | |
456 | Name: m, | |
457 | IsClientStream: false, | |
458 | IsServerStream: false, | |
459 | }) | |
460 | } | |
461 | for m, d := range srv.sd { | |
462 | methods = append(methods, MethodInfo{ | |
463 | Name: m, | |
464 | IsClientStream: d.ClientStreams, | |
465 | IsServerStream: d.ServerStreams, | |
466 | }) | |
467 | } | |
468 | ||
469 | ret[n] = ServiceInfo{ | |
470 | Methods: methods, | |
471 | Metadata: srv.mdata, | |
472 | } | |
473 | } | |
474 | return ret | |
475 | } | |
476 | ||
107c1cdb ND |
// ErrServerStopped indicates that the operation is now illegal because of
// the server being stopped. Serve returns it when called on a server whose
// listener set has already been cleared.
var ErrServerStopped = errors.New("grpc: the server has been stopped")
15c0b25d AP |
480 | |
481 | func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { | |
482 | if s.opts.creds == nil { | |
483 | return rawConn, nil, nil | |
484 | } | |
485 | return s.opts.creds.ServerHandshake(rawConn) | |
486 | } | |
487 | ||
107c1cdb ND |
// listenSocket wraps a net.Listener together with the channelz ID assigned to
// it when it is registered in Serve.
type listenSocket struct {
	net.Listener
	channelzID int64 // set by Serve only when channelz is on; zero otherwise
}
492 | ||
493 | func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric { | |
494 | return &channelz.SocketInternalMetric{ | |
495 | SocketOptions: channelz.GetSocketOption(l.Listener), | |
496 | LocalAddr: l.Listener.Addr(), | |
497 | } | |
498 | } | |
499 | ||
500 | func (l *listenSocket) Close() error { | |
501 | err := l.Listener.Close() | |
502 | if channelz.IsOn() { | |
503 | channelz.RemoveEntry(l.channelzID) | |
504 | } | |
505 | return err | |
506 | } | |
507 | ||
15c0b25d AP |
// Serve accepts incoming connections on the listener lis, creating a new
// ServerTransport and service goroutine for each. The service goroutines
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
// Serve will return a non-nil error unless Stop or GracefulStop is called.
//
// Temporary Accept errors are retried with exponential backoff (5ms, doubling,
// capped at 1s). Calling Serve on a server whose listener set is already nil
// closes lis and returns ErrServerStopped.
func (s *Server) Serve(lis net.Listener) error {
	s.mu.Lock()
	s.printf("serving")
	// From here on, register() treats further service registration as fatal.
	s.serve = true
	if s.lis == nil {
		// Serve called after Stop or GracefulStop.
		s.mu.Unlock()
		lis.Close()
		return ErrServerStopped
	}

	s.serveWG.Add(1)
	// Registered first, so this runs last on return (after the listener
	// cleanup below).
	defer func() {
		s.serveWG.Done()
		select {
		// Stop or GracefulStop called; block until done and return nil.
		case <-s.quit:
			<-s.done
		default:
		}
	}()

	// Wrap the listener so it can carry a channelz ID and be tracked in s.lis.
	ls := &listenSocket{Listener: lis}
	s.lis[ls] = true

	if channelz.IsOn() {
		ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, lis.Addr().String())
	}
	s.mu.Unlock()

	// On return, close and forget the listener unless it has already been
	// removed from s.lis (or s.lis has been cleared) by someone else.
	defer func() {
		s.mu.Lock()
		if s.lis != nil && s.lis[ls] {
			ls.Close()
			delete(s.lis, ls)
		}
		s.mu.Unlock()
	}()

	var tempDelay time.Duration // how long to sleep on accept failure

	for {
		rawConn, err := lis.Accept()
		if err != nil {
			// Errors that report Temporary() are retried after a backoff.
			if ne, ok := err.(interface {
				Temporary() bool
			}); ok && ne.Temporary() {
				if tempDelay == 0 {
					tempDelay = 5 * time.Millisecond
				} else {
					tempDelay *= 2
				}
				if max := 1 * time.Second; tempDelay > max {
					tempDelay = max
				}
				s.mu.Lock()
				s.printf("Accept error: %v; retrying in %v", err, tempDelay)
				s.mu.Unlock()
				// Sleep for the backoff, but give up at once if the server
				// is being stopped.
				timer := time.NewTimer(tempDelay)
				select {
				case <-timer.C:
				case <-s.quit:
					timer.Stop()
					return nil
				}
				continue
			}
			s.mu.Lock()
			s.printf("done serving; Accept = %v", err)
			s.mu.Unlock()

			// An Accept failure observed once quit is readable is treated as
			// part of shutdown, not as an error.
			select {
			case <-s.quit:
				return nil
			default:
			}
			return err
		}
		// A successful Accept resets the backoff.
		tempDelay = 0
		// Start a new goroutine to deal with rawConn so we don't stall this Accept
		// loop goroutine.
		//
		// Make sure we account for the goroutine so GracefulStop doesn't nil out
		// s.conns before this conn can be added.
		s.serveWG.Add(1)
		go func() {
			s.handleRawConn(rawConn)
			s.serveWG.Done()
		}()
	}
}
605 | ||
107c1cdb ND |
// handleRawConn takes a just-accepted connection that has not had any I/O
// performed on it yet, performs the security and HTTP/2 handshakes under the
// configured connection timeout, registers the resulting transport with the
// server, and forks a goroutine to serve its streams. Serve calls it on its
// own goroutine per connection.
func (s *Server) handleRawConn(rawConn net.Conn) {
	// Bound the whole handshake phase by the configured connection timeout.
	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
	if err != nil {
		s.mu.Lock()
		s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
		s.mu.Unlock()
		grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
		// If serverHandshake returns ErrConnDispatched, keep rawConn open.
		if err != credentials.ErrConnDispatched {
			rawConn.Close()
		}
		// Clear the handshake deadline; this matters for the dispatched case,
		// where the connection stays open. The result is ignored.
		rawConn.SetDeadline(time.Time{})
		return
	}

	// A nil s.conns means new connections are no longer accepted; drop this
	// one before spending effort on the HTTP/2 handshake.
	s.mu.Lock()
	if s.conns == nil {
		s.mu.Unlock()
		conn.Close()
		return
	}
	s.mu.Unlock()

	// Finish handshaking (HTTP2)
	st := s.newHTTP2Transport(conn, authInfo)
	if st == nil {
		// newHTTP2Transport has already logged the failure and closed conn.
		return
	}

	// Handshakes are done: remove the deadline so it does not affect RPCs.
	rawConn.SetDeadline(time.Time{})
	if !s.addConn(st) {
		// addConn has already closed st.
		return
	}
	go func() {
		s.serveStreams(st)
		s.removeConn(st)
	}()
}
647 | ||
107c1cdb ND |
648 | // newHTTP2Transport sets up a http/2 transport (using the |
649 | // gRPC http2 server transport in transport/http2_server.go). | |
650 | func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport { | |
15c0b25d AP |
651 | config := &transport.ServerConfig{ |
652 | MaxStreams: s.opts.maxConcurrentStreams, | |
653 | AuthInfo: authInfo, | |
654 | InTapHandle: s.opts.inTapHandle, | |
655 | StatsHandler: s.opts.statsHandler, | |
656 | KeepaliveParams: s.opts.keepaliveParams, | |
657 | KeepalivePolicy: s.opts.keepalivePolicy, | |
658 | InitialWindowSize: s.opts.initialWindowSize, | |
659 | InitialConnWindowSize: s.opts.initialConnWindowSize, | |
107c1cdb ND |
660 | WriteBufferSize: s.opts.writeBufferSize, |
661 | ReadBufferSize: s.opts.readBufferSize, | |
662 | ChannelzParentID: s.channelzID, | |
663 | MaxHeaderListSize: s.opts.maxHeaderListSize, | |
15c0b25d AP |
664 | } |
665 | st, err := transport.NewServerTransport("http2", c, config) | |
666 | if err != nil { | |
667 | s.mu.Lock() | |
668 | s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err) | |
669 | s.mu.Unlock() | |
670 | c.Close() | |
671 | grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err) | |
107c1cdb | 672 | return nil |
15c0b25d | 673 | } |
107c1cdb ND |
674 | |
675 | return st | |
15c0b25d AP |
676 | } |
677 | ||
678 | func (s *Server) serveStreams(st transport.ServerTransport) { | |
15c0b25d AP |
679 | defer st.Close() |
680 | var wg sync.WaitGroup | |
681 | st.HandleStreams(func(stream *transport.Stream) { | |
682 | wg.Add(1) | |
683 | go func() { | |
684 | defer wg.Done() | |
685 | s.handleStream(st, stream, s.traceInfo(st, stream)) | |
686 | }() | |
687 | }, func(ctx context.Context, method string) context.Context { | |
688 | if !EnableTracing { | |
689 | return ctx | |
690 | } | |
691 | tr := trace.New("grpc.Recv."+methodFamily(method), method) | |
692 | return trace.NewContext(ctx, tr) | |
693 | }) | |
694 | wg.Wait() | |
695 | } | |
696 | ||
// Compile-time check that *Server implements http.Handler (via ServeHTTP).
var _ http.Handler = (*Server)(nil)
698 | ||
15c0b25d AP |
699 | // ServeHTTP implements the Go standard library's http.Handler |
700 | // interface by responding to the gRPC request r, by looking up | |
701 | // the requested gRPC method in the gRPC server s. | |
702 | // | |
703 | // The provided HTTP request must have arrived on an HTTP/2 | |
704 | // connection. When using the Go standard library's server, | |
705 | // practically this means that the Request must also have arrived | |
706 | // over TLS. | |
707 | // | |
708 | // To share one port (such as 443 for https) between gRPC and an | |
709 | // existing http.Handler, use a root http.Handler such as: | |
710 | // | |
711 | // if r.ProtoMajor == 2 && strings.HasPrefix( | |
712 | // r.Header.Get("Content-Type"), "application/grpc") { | |
713 | // grpcServer.ServeHTTP(w, r) | |
714 | // } else { | |
715 | // yourMux.ServeHTTP(w, r) | |
716 | // } | |
717 | // | |
718 | // Note that ServeHTTP uses Go's HTTP/2 server implementation which is totally | |
719 | // separate from grpc-go's HTTP/2 server. Performance and features may vary | |
720 | // between the two paths. ServeHTTP does not support some gRPC features | |
721 | // available through grpc-go's HTTP/2 server, and it is currently EXPERIMENTAL | |
722 | // and subject to change. | |
723 | func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { | |
107c1cdb | 724 | st, err := transport.NewServerHandlerTransport(w, r, s.opts.statsHandler) |
15c0b25d AP |
725 | if err != nil { |
726 | http.Error(w, err.Error(), http.StatusInternalServerError) | |
727 | return | |
728 | } | |
729 | if !s.addConn(st) { | |
15c0b25d AP |
730 | return |
731 | } | |
732 | defer s.removeConn(st) | |
733 | s.serveStreams(st) | |
734 | } | |
735 | ||
736 | // traceInfo returns a traceInfo and associates it with stream, if tracing is enabled. | |
737 | // If tracing is not enabled, it returns nil. | |
738 | func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Stream) (trInfo *traceInfo) { | |
739 | tr, ok := trace.FromContext(stream.Context()) | |
740 | if !ok { | |
741 | return nil | |
742 | } | |
743 | ||
744 | trInfo = &traceInfo{ | |
745 | tr: tr, | |
746 | } | |
747 | trInfo.firstLine.client = false | |
748 | trInfo.firstLine.remoteAddr = st.RemoteAddr() | |
749 | ||
750 | if dl, ok := stream.Context().Deadline(); ok { | |
751 | trInfo.firstLine.deadline = dl.Sub(time.Now()) | |
752 | } | |
753 | return trInfo | |
754 | } | |
755 | ||
756 | func (s *Server) addConn(c io.Closer) bool { | |
757 | s.mu.Lock() | |
758 | defer s.mu.Unlock() | |
107c1cdb ND |
759 | if s.conns == nil { |
760 | c.Close() | |
15c0b25d AP |
761 | return false |
762 | } | |
107c1cdb ND |
763 | if s.drain { |
764 | // Transport added after we drained our existing conns: drain it | |
765 | // immediately. | |
766 | c.(transport.ServerTransport).Drain() | |
767 | } | |
15c0b25d AP |
768 | s.conns[c] = true |
769 | return true | |
770 | } | |
771 | ||
772 | func (s *Server) removeConn(c io.Closer) { | |
773 | s.mu.Lock() | |
774 | defer s.mu.Unlock() | |
775 | if s.conns != nil { | |
776 | delete(s.conns, c) | |
777 | s.cv.Broadcast() | |
778 | } | |
779 | } | |
780 | ||
107c1cdb ND |
781 | func (s *Server) channelzMetric() *channelz.ServerInternalMetric { |
782 | return &channelz.ServerInternalMetric{ | |
783 | CallsStarted: atomic.LoadInt64(&s.czData.callsStarted), | |
784 | CallsSucceeded: atomic.LoadInt64(&s.czData.callsSucceeded), | |
785 | CallsFailed: atomic.LoadInt64(&s.czData.callsFailed), | |
786 | LastCallStartedTimestamp: time.Unix(0, atomic.LoadInt64(&s.czData.lastCallStartedTime)), | |
15c0b25d | 787 | } |
107c1cdb ND |
788 | } |
789 | ||
790 | func (s *Server) incrCallsStarted() { | |
791 | atomic.AddInt64(&s.czData.callsStarted, 1) | |
792 | atomic.StoreInt64(&s.czData.lastCallStartedTime, time.Now().UnixNano()) | |
793 | } | |
794 | ||
795 | func (s *Server) incrCallsSucceeded() { | |
796 | atomic.AddInt64(&s.czData.callsSucceeded, 1) | |
797 | } | |
798 | ||
799 | func (s *Server) incrCallsFailed() { | |
800 | atomic.AddInt64(&s.czData.callsFailed, 1) | |
801 | } | |
802 | ||
803 | func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { | |
804 | data, err := encode(s.getCodec(stream.ContentSubtype()), msg) | |
15c0b25d AP |
805 | if err != nil { |
806 | grpclog.Errorln("grpc: server failed to encode response: ", err) | |
807 | return err | |
808 | } | |
107c1cdb ND |
809 | compData, err := compress(data, cp, comp) |
810 | if err != nil { | |
811 | grpclog.Errorln("grpc: server failed to compress response: ", err) | |
812 | return err | |
813 | } | |
814 | hdr, payload := msgHeader(data, compData) | |
815 | // TODO(dfawley): should we be checking len(data) instead? | |
816 | if len(payload) > s.opts.maxSendMessageSize { | |
817 | return status.Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(payload), s.opts.maxSendMessageSize) | |
15c0b25d | 818 | } |
107c1cdb ND |
819 | err = t.Write(stream, hdr, payload, opts) |
820 | if err == nil && s.opts.statsHandler != nil { | |
821 | s.opts.statsHandler.HandleRPC(stream.Context(), outPayload(false, msg, data, payload, time.Now())) | |
15c0b25d AP |
822 | } |
823 | return err | |
824 | } | |
825 | ||
// processUnaryRPC runs a single unary RPC on stream: it reads and decompresses
// the one request message, invokes the method handler described by md, sends
// the reply and finally writes the RPC status.
//
// The named result err is inspected by the deferred channelz, stats and trace
// hooks below; io.EOF is treated by all of them as a non-failure. The value
// returned is the handler's error (converted to a status error if needed), a
// receive/send error, or the error from writing the final OK status.
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
		// Classify the call once the final value of err is known.
		defer func() {
			if err != nil && err != io.EOF {
				s.incrCallsFailed()
			} else {
				s.incrCallsSucceeded()
			}
		}()
	}
	sh := s.opts.statsHandler
	if sh != nil {
		beginTime := time.Now()
		begin := &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), begin)
		// Report the matching End event, carrying the final error if any.
		defer func() {
			end := &stats.End{
				BeginTime: beginTime,
				EndTime:   time.Now(),
			}
			if err != nil && err != io.EOF {
				end.Error = toRPCErr(err)
			}
			sh.HandleRPC(stream.Context(), end)
		}()
	}
	if trInfo != nil {
		// Finish is deferred first so that it runs after the error-logging
		// defer registered just below (deferred calls run in LIFO order).
		defer trInfo.tr.Finish()
		trInfo.firstLine.client = false
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		defer func() {
			if err != nil && err != io.EOF {
				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				trInfo.tr.SetError()
			}
		}()
	}

	// Binary logging: record the client header before any processing.
	binlog := binarylog.GetMethodLogger(stream.Method())
	if binlog != nil {
		ctx := stream.Context()
		// Note: this md (incoming metadata) shadows the *MethodDesc parameter
		// for the remainder of this if-block only.
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			// Remaining time until the deadline, clamped at zero.
			logEntry.Timeout = deadline.Sub(time.Now())
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ctx); ok {
			logEntry.PeerAddr = peer.Addr
		}
		binlog.Log(logEntry)
	}

	// comp and cp are used for compression.  decomp and dc are used for
	// decompression.  If comp and decomp are both set, they are the same;
	// however they are kept separate to ensure that at most one of the
	// compressor/decompressor variable pairs are set for use later.
	var comp, decomp encoding.Compressor
	var cp Compressor
	var dc Decompressor

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		decomp = encoding.GetCompressor(rc)
		if decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			// NOTE(review): the error returned by WriteStatus is dropped here,
			// unlike the other WriteStatus calls in this function which log it.
			t.WriteStatus(stream, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		cp = s.opts.cp
		stream.SetSendCompress(cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		comp = encoding.GetCompressor(rc)
		if comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	// payInfo is only allocated when a stats handler or binary logger is
	// present; otherwise nil is passed to recvAndDecompress.
	var payInfo *payloadInfo
	if sh != nil || binlog != nil {
		payInfo = &payloadInfo{}
	}
	d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
	if err != nil {
		// Only errors that already carry a status are reported to the client.
		if st, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, st); e != nil {
				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
			}
		}
		return err
	}
	if channelz.IsOn() {
		t.IncrMsgRecv()
	}
	// df is handed to the method handler, which calls it to decode the
	// received bytes d into its request value v. On success it also notifies
	// the stats handler, binary logger and trace, when those are enabled.
	df := func(v interface{}) error {
		if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
			return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
		}
		if sh != nil {
			sh.HandleRPC(stream.Context(), &stats.InPayload{
				RecvTime: time.Now(),
				Payload:  v,
				Data:     d,
				Length:   len(d),
			})
		}
		if binlog != nil {
			binlog.Log(&binarylog.ClientMessage{
				Message: d,
			})
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
		}
		return nil
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Convert appErr if it is not a grpc status error.
			appErr = status.Error(codes.Unknown, appErr.Error())
			appStatus, _ = status.FromError(appErr)
		}
		if trInfo != nil {
			trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			trInfo.tr.SetError()
		}
		if e := t.WriteStatus(stream, appStatus); e != nil {
			grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
		}
		if binlog != nil {
			if h, _ := stream.Header(); h.Len() > 0 {
				// Only log serverHeader if there was header. Otherwise it can
				// be trailer only.
				binlog.Log(&binarylog.ServerHeader{
					Header: h,
				})
			}
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return appErr
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(stringer("OK"), false)
	}
	// The reply is the last (and only) message written on this stream.
	opts := &transport.Options{Last: true}

	if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
		if err == io.EOF {
			// The entire stream is done (for unary RPC only).
			return err
		}
		// Here s shadows the *Server receiver with the status of err.
		if s, ok := status.FromError(err); ok {
			if e := t.WriteStatus(stream, s); e != nil {
				grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status: %v", e)
			}
		} else {
			switch st := err.(type) {
			case transport.ConnectionError:
				// Nothing to do here.
			default:
				panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
			}
		}
		if binlog != nil {
			h, _ := stream.Header()
			binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			// NOTE(review): appErr is necessarily nil on this path (the
			// non-nil case returned above), so the logged trailer carries no
			// error even though err is non-nil. Confirm whether err was meant.
			binlog.Log(&binarylog.ServerTrailer{
				Trailer: stream.Trailer(),
				Err:     appErr,
			})
		}
		return err
	}
	if binlog != nil {
		h, _ := stream.Header()
		binlog.Log(&binarylog.ServerHeader{
			Header: h,
		})
		binlog.Log(&binarylog.ServerMessage{
			Message: reply,
		})
	}
	if channelz.IsOn() {
		t.IncrMsgSent()
	}
	if trInfo != nil {
		trInfo.tr.LazyLog(&payload{sent: true, msg: reply}, true)
	}
	// TODO: Should we be logging if writing status failed here, like above?
	// Should the logging be in WriteStatus?  Should we ignore the WriteStatus
	// error or allow the stats handler to see it?
	err = t.WriteStatus(stream, status.New(codes.OK, ""))
	if binlog != nil {
		// appErr is nil here: the handler succeeded.
		binlog.Log(&binarylog.ServerTrailer{
			Trailer: stream.Trailer(),
			Err:     appErr,
		})
	}
	return err
}
1057 | ||
// processStreamingRPC runs a streaming RPC on stream: it builds the
// serverStream handed to the application, selects the decompressor and
// compressor, invokes the stream handler from sd (through the configured
// stream interceptor when there is one) and writes the final RPC status.
//
// srv may be nil, in which case a nil server value is passed to the handler.
// The named result err is inspected by the deferred channelz, stats and trace
// hooks; io.EOF is treated by all of them as a non-failure.
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
	if channelz.IsOn() {
		s.incrCallsStarted()
		// Classify the call once the final value of err is known.
		defer func() {
			if err != nil && err != io.EOF {
				s.incrCallsFailed()
			} else {
				s.incrCallsSucceeded()
			}
		}()
	}
	sh := s.opts.statsHandler
	if sh != nil {
		beginTime := time.Now()
		begin := &stats.Begin{
			BeginTime: beginTime,
		}
		sh.HandleRPC(stream.Context(), begin)
		// Report the matching End event, carrying the final error if any.
		defer func() {
			end := &stats.End{
				BeginTime: beginTime,
				EndTime:   time.Now(),
			}
			if err != nil && err != io.EOF {
				end.Error = toRPCErr(err)
			}
			sh.HandleRPC(stream.Context(), end)
		}()
	}
	ctx := NewContextWithServerTransportStream(stream.Context(), stream)
	ss := &serverStream{
		ctx:                   ctx,
		t:                     t,
		s:                     stream,
		p:                     &parser{r: stream},
		codec:                 s.getCodec(stream.ContentSubtype()),
		maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
		maxSendMessageSize:    s.opts.maxSendMessageSize,
		trInfo:                trInfo,
		statsHandler:          sh,
	}

	// Binary logging: record the client header before any processing.
	ss.binlog = binarylog.GetMethodLogger(stream.Method())
	if ss.binlog != nil {
		md, _ := metadata.FromIncomingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			Header:     md,
			MethodName: stream.Method(),
			PeerAddr:   nil,
		}
		if deadline, ok := ctx.Deadline(); ok {
			// Remaining time until the deadline, clamped at zero.
			logEntry.Timeout = deadline.Sub(time.Now())
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		if a := md[":authority"]; len(a) > 0 {
			logEntry.Authority = a[0]
		}
		if peer, ok := peer.FromContext(ss.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		ss.binlog.Log(logEntry)
	}

	// If dc is set and matches the stream's compression, use it.  Otherwise, try
	// to find a matching registered compressor for decomp.
	if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
		ss.dc = s.opts.dc
	} else if rc != "" && rc != encoding.Identity {
		ss.decomp = encoding.GetCompressor(rc)
		if ss.decomp == nil {
			st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
			// NOTE(review): the WriteStatus error is dropped here, and this
			// return happens before the trace-finishing defer further down is
			// registered, so this function does not call trInfo.tr.Finish on
			// this path. Confirm whether that is intended.
			t.WriteStatus(ss.s, st)
			return st.Err()
		}
	}

	// If cp is set, use it.  Otherwise, attempt to compress the response using
	// the incoming message compression method.
	//
	// NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
	if s.opts.cp != nil {
		ss.cp = s.opts.cp
		stream.SetSendCompress(s.opts.cp.Type())
	} else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
		// Legacy compressor not specified; attempt to respond with same encoding.
		ss.comp = encoding.GetCompressor(rc)
		if ss.comp != nil {
			stream.SetSendCompress(rc)
		}
	}

	if trInfo != nil {
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		// On return: log the final error (if any), finish the trace and clear
		// ss.trInfo.tr, all while holding ss.mu.
		defer func() {
			ss.mu.Lock()
			if err != nil && err != io.EOF {
				ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
				ss.trInfo.tr.SetError()
			}
			ss.trInfo.tr.Finish()
			ss.trInfo.tr = nil
			ss.mu.Unlock()
		}()
	}
	var appErr error
	// server stays nil when no service was supplied by the caller.
	var server interface{}
	if srv != nil {
		server = srv.server
	}
	if s.opts.streamInt == nil {
		appErr = sd.Handler(server, ss)
	} else {
		info := &StreamServerInfo{
			FullMethod:     stream.Method(),
			IsClientStream: sd.ClientStreams,
			IsServerStream: sd.ServerStreams,
		}
		appErr = s.opts.streamInt(server, ss, info, sd.Handler)
	}
	if appErr != nil {
		appStatus, ok := status.FromError(appErr)
		if !ok {
			// Wrap errors that are not already status errors as
			// codes.Unknown.
			appStatus = status.New(codes.Unknown, appErr.Error())
			appErr = appStatus.Err()
		}
		if trInfo != nil {
			ss.mu.Lock()
			ss.trInfo.tr.LazyLog(stringer(appStatus.Message()), true)
			ss.trInfo.tr.SetError()
			ss.mu.Unlock()
		}
		t.WriteStatus(ss.s, appStatus)
		if ss.binlog != nil {
			ss.binlog.Log(&binarylog.ServerTrailer{
				Trailer: ss.s.Trailer(),
				Err:     appErr,
			})
		}
		// TODO: Should we log an error from WriteStatus here and below?
		return appErr
	}
	if trInfo != nil {
		ss.mu.Lock()
		ss.trInfo.tr.LazyLog(stringer("OK"), false)
		ss.mu.Unlock()
	}
	err = t.WriteStatus(ss.s, status.New(codes.OK, ""))
	if ss.binlog != nil {
		// appErr is nil here: the handler succeeded.
		ss.binlog.Log(&binarylog.ServerTrailer{
			Trailer: ss.s.Trailer(),
			Err:     appErr,
		})
	}
	return err
}
1215 | ||
1216 | func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) { | |
1217 | sm := stream.Method() | |
1218 | if sm != "" && sm[0] == '/' { | |
1219 | sm = sm[1:] | |
1220 | } | |
1221 | pos := strings.LastIndex(sm, "/") | |
1222 | if pos == -1 { | |
1223 | if trInfo != nil { | |
1224 | trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true) | |
1225 | trInfo.tr.SetError() | |
1226 | } | |
1227 | errDesc := fmt.Sprintf("malformed method name: %q", stream.Method()) | |
1228 | if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil { | |
1229 | if trInfo != nil { | |
1230 | trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) | |
1231 | trInfo.tr.SetError() | |
1232 | } | |
1233 | grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err) | |
1234 | } | |
1235 | if trInfo != nil { | |
1236 | trInfo.tr.Finish() | |
1237 | } | |
1238 | return | |
1239 | } | |
1240 | service := sm[:pos] | |
1241 | method := sm[pos+1:] | |
107c1cdb ND |
1242 | |
1243 | if srv, ok := s.m[service]; ok { | |
1244 | if md, ok := srv.md[method]; ok { | |
1245 | s.processUnaryRPC(t, stream, srv, md, trInfo) | |
15c0b25d AP |
1246 | return |
1247 | } | |
107c1cdb ND |
1248 | if sd, ok := srv.sd[method]; ok { |
1249 | s.processStreamingRPC(t, stream, srv, sd, trInfo) | |
1250 | return | |
15c0b25d | 1251 | } |
15c0b25d | 1252 | } |
107c1cdb ND |
1253 | // Unknown service, or known server unknown method. |
1254 | if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil { | |
1255 | s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo) | |
15c0b25d AP |
1256 | return |
1257 | } | |
1258 | if trInfo != nil { | |
107c1cdb | 1259 | trInfo.tr.LazyLog(&fmtStringer{"Unknown service %v", []interface{}{service}}, true) |
15c0b25d AP |
1260 | trInfo.tr.SetError() |
1261 | } | |
107c1cdb | 1262 | errDesc := fmt.Sprintf("unknown service %v", service) |
15c0b25d AP |
1263 | if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil { |
1264 | if trInfo != nil { | |
1265 | trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) | |
1266 | trInfo.tr.SetError() | |
1267 | } | |
1268 | grpclog.Warningf("grpc: Server.handleStream failed to write status: %v", err) | |
1269 | } | |
1270 | if trInfo != nil { | |
1271 | trInfo.tr.Finish() | |
1272 | } | |
1273 | } | |
1274 | ||
107c1cdb ND |
// streamKey is the context key under which a ServerTransportStream is stored.
// It is an unexported empty struct type, so code outside this package cannot
// construct an equal key.
type streamKey struct{}
1277 | ||
1278 | // NewContextWithServerTransportStream creates a new context from ctx and | |
1279 | // attaches stream to it. | |
1280 | // | |
1281 | // This API is EXPERIMENTAL. | |
1282 | func NewContextWithServerTransportStream(ctx context.Context, stream ServerTransportStream) context.Context { | |
1283 | return context.WithValue(ctx, streamKey{}, stream) | |
1284 | } | |
1285 | ||
1286 | // ServerTransportStream is a minimal interface that a transport stream must | |
1287 | // implement. This can be used to mock an actual transport stream for tests of | |
1288 | // handler code that use, for example, grpc.SetHeader (which requires some | |
1289 | // stream to be in context). | |
1290 | // | |
1291 | // See also NewContextWithServerTransportStream. | |
1292 | // | |
1293 | // This API is EXPERIMENTAL. | |
1294 | type ServerTransportStream interface { | |
1295 | Method() string | |
1296 | SetHeader(md metadata.MD) error | |
1297 | SendHeader(md metadata.MD) error | |
1298 | SetTrailer(md metadata.MD) error | |
1299 | } | |
1300 | ||
1301 | // ServerTransportStreamFromContext returns the ServerTransportStream saved in | |
1302 | // ctx. Returns nil if the given context has no stream associated with it | |
1303 | // (which implies it is not an RPC invocation context). | |
1304 | // | |
1305 | // This API is EXPERIMENTAL. | |
1306 | func ServerTransportStreamFromContext(ctx context.Context) ServerTransportStream { | |
1307 | s, _ := ctx.Value(streamKey{}).(ServerTransportStream) | |
1308 | return s | |
1309 | } | |
1310 | ||
15c0b25d AP |
1311 | // Stop stops the gRPC server. It immediately closes all open |
1312 | // connections and listeners. | |
1313 | // It cancels all active RPCs on the server side and the corresponding | |
1314 | // pending RPCs on the client side will get notified by connection | |
1315 | // errors. | |
1316 | func (s *Server) Stop() { | |
107c1cdb ND |
1317 | s.quitOnce.Do(func() { |
1318 | close(s.quit) | |
1319 | }) | |
1320 | ||
1321 | defer func() { | |
1322 | s.serveWG.Wait() | |
1323 | s.doneOnce.Do(func() { | |
1324 | close(s.done) | |
1325 | }) | |
1326 | }() | |
1327 | ||
1328 | s.channelzRemoveOnce.Do(func() { | |
1329 | if channelz.IsOn() { | |
1330 | channelz.RemoveEntry(s.channelzID) | |
1331 | } | |
1332 | }) | |
1333 | ||
15c0b25d AP |
1334 | s.mu.Lock() |
1335 | listeners := s.lis | |
1336 | s.lis = nil | |
1337 | st := s.conns | |
1338 | s.conns = nil | |
1339 | // interrupt GracefulStop if Stop and GracefulStop are called concurrently. | |
1340 | s.cv.Broadcast() | |
1341 | s.mu.Unlock() | |
1342 | ||
1343 | for lis := range listeners { | |
1344 | lis.Close() | |
1345 | } | |
1346 | for c := range st { | |
1347 | c.Close() | |
1348 | } | |
1349 | ||
1350 | s.mu.Lock() | |
15c0b25d AP |
1351 | if s.events != nil { |
1352 | s.events.Finish() | |
1353 | s.events = nil | |
1354 | } | |
1355 | s.mu.Unlock() | |
1356 | } | |
1357 | ||
// GracefulStop stops the gRPC server gracefully. It stops the server from
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
//
// It returns immediately, after the shared shutdown signalling, when the
// connection set is already nil.
func (s *Server) GracefulStop() {
	// Close the quit channel exactly once.
	s.quitOnce.Do(func() {
		close(s.quit)
	})

	// Close the done channel exactly once, on every return path.
	defer func() {
		s.doneOnce.Do(func() {
			close(s.done)
		})
	}()

	s.channelzRemoveOnce.Do(func() {
		if channelz.IsOn() {
			channelz.RemoveEntry(s.channelzID)
		}
	})
	s.mu.Lock()
	if s.conns == nil {
		s.mu.Unlock()
		return
	}

	// Stop accepting new connections.
	for lis := range s.lis {
		lis.Close()
	}
	s.lis = nil
	// Ask every existing transport to drain, but only the first time.
	if !s.drain {
		for c := range s.conns {
			c.(transport.ServerTransport).Drain()
		}
		s.drain = true
	}

	// Wait for serving threads to be ready to exit.  Only then can we be sure no
	// new conns will be created.
	// The lock is released around the wait and re-acquired afterwards.
	s.mu.Unlock()
	s.serveWG.Wait()
	s.mu.Lock()

	// Block until the connection set is empty. s.cv is broadcast by
	// removeConn and by Stop; Stop sets s.conns to nil, which also ends this
	// loop.
	for len(s.conns) != 0 {
		s.cv.Wait()
	}
	s.conns = nil
	if s.events != nil {
		s.events.Finish()
		s.events = nil
	}
	s.mu.Unlock()
}
1410 | ||
107c1cdb ND |
1411 | // contentSubtype must be lowercase |
1412 | // cannot return nil | |
1413 | func (s *Server) getCodec(contentSubtype string) baseCodec { | |
1414 | if s.opts.codec != nil { | |
1415 | return s.opts.codec | |
15c0b25d | 1416 | } |
107c1cdb ND |
1417 | if contentSubtype == "" { |
1418 | return encoding.GetCodec(proto.Name) | |
15c0b25d | 1419 | } |
107c1cdb ND |
1420 | codec := encoding.GetCodec(contentSubtype) |
1421 | if codec == nil { | |
1422 | return encoding.GetCodec(proto.Name) | |
15c0b25d | 1423 | } |
107c1cdb | 1424 | return codec |
15c0b25d AP |
1425 | } |
1426 | ||
1427 | // SetHeader sets the header metadata. | |
1428 | // When called multiple times, all the provided metadata will be merged. | |
1429 | // All the metadata will be sent out when one of the following happens: | |
1430 | // - grpc.SendHeader() is called; | |
1431 | // - The first response is sent out; | |
1432 | // - An RPC status is sent out (error or success). | |
1433 | func SetHeader(ctx context.Context, md metadata.MD) error { | |
1434 | if md.Len() == 0 { | |
1435 | return nil | |
1436 | } | |
107c1cdb ND |
1437 | stream := ServerTransportStreamFromContext(ctx) |
1438 | if stream == nil { | |
1439 | return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) | |
15c0b25d AP |
1440 | } |
1441 | return stream.SetHeader(md) | |
1442 | } | |
1443 | ||
1444 | // SendHeader sends header metadata. It may be called at most once. | |
1445 | // The provided md and headers set by SetHeader() will be sent. | |
1446 | func SendHeader(ctx context.Context, md metadata.MD) error { | |
107c1cdb ND |
1447 | stream := ServerTransportStreamFromContext(ctx) |
1448 | if stream == nil { | |
1449 | return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) | |
15c0b25d | 1450 | } |
107c1cdb | 1451 | if err := stream.SendHeader(md); err != nil { |
15c0b25d AP |
1452 | return toRPCErr(err) |
1453 | } | |
1454 | return nil | |
1455 | } | |
1456 | ||
1457 | // SetTrailer sets the trailer metadata that will be sent when an RPC returns. | |
1458 | // When called more than once, all the provided metadata will be merged. | |
1459 | func SetTrailer(ctx context.Context, md metadata.MD) error { | |
1460 | if md.Len() == 0 { | |
1461 | return nil | |
1462 | } | |
107c1cdb ND |
1463 | stream := ServerTransportStreamFromContext(ctx) |
1464 | if stream == nil { | |
1465 | return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx) | |
15c0b25d AP |
1466 | } |
1467 | return stream.SetTrailer(md) | |
1468 | } | |
107c1cdb ND |
1469 | |
1470 | // Method returns the method string for the server context. The returned | |
1471 | // string is in the format of "/service/method". | |
1472 | func Method(ctx context.Context) (string, bool) { | |
1473 | s := ServerTransportStreamFromContext(ctx) | |
1474 | if s == nil { | |
1475 | return "", false | |
1476 | } | |
1477 | return s.Method(), true | |
1478 | } | |
1479 | ||
1480 | type channelzServer struct { | |
1481 | s *Server | |
1482 | } | |
1483 | ||
1484 | func (c *channelzServer) ChannelzMetric() *channelz.ServerInternalMetric { | |
1485 | return c.s.channelzMetric() | |
1486 | } |