diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r-- | vendor/google.golang.org/grpc/stream.go | 661 |
1 files changed, 661 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go new file mode 100644 index 0000000..1c621ba --- /dev/null +++ b/vendor/google.golang.org/grpc/stream.go | |||
@@ -0,0 +1,661 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2014 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | package grpc | ||
20 | |||
21 | import ( | ||
22 | "bytes" | ||
23 | "errors" | ||
24 | "io" | ||
25 | "sync" | ||
26 | "time" | ||
27 | |||
28 | "golang.org/x/net/context" | ||
29 | "golang.org/x/net/trace" | ||
30 | "google.golang.org/grpc/codes" | ||
31 | "google.golang.org/grpc/metadata" | ||
32 | "google.golang.org/grpc/peer" | ||
33 | "google.golang.org/grpc/stats" | ||
34 | "google.golang.org/grpc/status" | ||
35 | "google.golang.org/grpc/transport" | ||
36 | ) | ||
37 | |||
38 | // StreamHandler defines the handler called by gRPC server to complete the | ||
39 | // execution of a streaming RPC. | ||
40 | type StreamHandler func(srv interface{}, stream ServerStream) error | ||
41 | |||
42 | // StreamDesc represents a streaming RPC service's method specification. | ||
43 | type StreamDesc struct { | ||
44 | StreamName string | ||
45 | Handler StreamHandler | ||
46 | |||
47 | // At least one of these is true. | ||
48 | ServerStreams bool | ||
49 | ClientStreams bool | ||
50 | } | ||
51 | |||
52 | // Stream defines the common interface a client or server stream has to satisfy. | ||
53 | type Stream interface { | ||
54 | // Context returns the context for this stream. | ||
55 | Context() context.Context | ||
56 | // SendMsg blocks until it sends m, the stream is done or the stream | ||
57 | // breaks. | ||
58 | // On error, it aborts the stream and returns an RPC status on client | ||
59 | // side. On server side, it simply returns the error to the caller. | ||
60 | // SendMsg is called by generated code. Also Users can call SendMsg | ||
61 | // directly when it is really needed in their use cases. | ||
62 | // It's safe to have a goroutine calling SendMsg and another goroutine calling | ||
63 | // recvMsg on the same stream at the same time. | ||
64 | // But it is not safe to call SendMsg on the same stream in different goroutines. | ||
65 | SendMsg(m interface{}) error | ||
66 | // RecvMsg blocks until it receives a message or the stream is | ||
67 | // done. On client side, it returns io.EOF when the stream is done. On | ||
68 | // any other error, it aborts the stream and returns an RPC status. On | ||
69 | // server side, it simply returns the error to the caller. | ||
70 | // It's safe to have a goroutine calling SendMsg and another goroutine calling | ||
71 | // recvMsg on the same stream at the same time. | ||
72 | // But it is not safe to call RecvMsg on the same stream in different goroutines. | ||
73 | RecvMsg(m interface{}) error | ||
74 | } | ||
75 | |||
76 | // ClientStream defines the interface a client stream has to satisfy. | ||
77 | type ClientStream interface { | ||
78 | // Header returns the header metadata received from the server if there | ||
79 | // is any. It blocks if the metadata is not ready to read. | ||
80 | Header() (metadata.MD, error) | ||
81 | // Trailer returns the trailer metadata from the server, if there is any. | ||
82 | // It must only be called after stream.CloseAndRecv has returned, or | ||
83 | // stream.Recv has returned a non-nil error (including io.EOF). | ||
84 | Trailer() metadata.MD | ||
85 | // CloseSend closes the send direction of the stream. It closes the stream | ||
86 | // when non-nil error is met. | ||
87 | CloseSend() error | ||
88 | // Stream.SendMsg() may return a non-nil error when something wrong happens sending | ||
89 | // the request. The returned error indicates the status of this sending, not the final | ||
90 | // status of the RPC. | ||
91 | // Always call Stream.RecvMsg() to get the final status if you care about the status of | ||
92 | // the RPC. | ||
93 | Stream | ||
94 | } | ||
95 | |||
96 | // NewClientStream creates a new Stream for the client side. This is called | ||
97 | // by generated code. | ||
98 | func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { | ||
99 | if cc.dopts.streamInt != nil { | ||
100 | return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...) | ||
101 | } | ||
102 | return newClientStream(ctx, desc, cc, method, opts...) | ||
103 | } | ||
104 | |||
105 | func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { | ||
106 | var ( | ||
107 | t transport.ClientTransport | ||
108 | s *transport.Stream | ||
109 | put func() | ||
110 | cancel context.CancelFunc | ||
111 | ) | ||
112 | c := defaultCallInfo | ||
113 | mc := cc.GetMethodConfig(method) | ||
114 | if mc.WaitForReady != nil { | ||
115 | c.failFast = !*mc.WaitForReady | ||
116 | } | ||
117 | |||
118 | if mc.Timeout != nil { | ||
119 | ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) | ||
120 | } | ||
121 | |||
122 | opts = append(cc.dopts.callOptions, opts...) | ||
123 | for _, o := range opts { | ||
124 | if err := o.before(&c); err != nil { | ||
125 | return nil, toRPCErr(err) | ||
126 | } | ||
127 | } | ||
128 | c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize) | ||
129 | c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) | ||
130 | |||
131 | callHdr := &transport.CallHdr{ | ||
132 | Host: cc.authority, | ||
133 | Method: method, | ||
134 | // If it's not client streaming, we should already have the request to be sent, | ||
135 | // so we don't flush the header. | ||
136 | // If it's client streaming, the user may never send a request or send it any | ||
137 | // time soon, so we ask the transport to flush the header. | ||
138 | Flush: desc.ClientStreams, | ||
139 | } | ||
140 | if cc.dopts.cp != nil { | ||
141 | callHdr.SendCompress = cc.dopts.cp.Type() | ||
142 | } | ||
143 | if c.creds != nil { | ||
144 | callHdr.Creds = c.creds | ||
145 | } | ||
146 | var trInfo traceInfo | ||
147 | if EnableTracing { | ||
148 | trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) | ||
149 | trInfo.firstLine.client = true | ||
150 | if deadline, ok := ctx.Deadline(); ok { | ||
151 | trInfo.firstLine.deadline = deadline.Sub(time.Now()) | ||
152 | } | ||
153 | trInfo.tr.LazyLog(&trInfo.firstLine, false) | ||
154 | ctx = trace.NewContext(ctx, trInfo.tr) | ||
155 | defer func() { | ||
156 | if err != nil { | ||
157 | // Need to call tr.finish() if error is returned. | ||
158 | // Because tr will not be returned to caller. | ||
159 | trInfo.tr.LazyPrintf("RPC: [%v]", err) | ||
160 | trInfo.tr.SetError() | ||
161 | trInfo.tr.Finish() | ||
162 | } | ||
163 | }() | ||
164 | } | ||
165 | ctx = newContextWithRPCInfo(ctx) | ||
166 | sh := cc.dopts.copts.StatsHandler | ||
167 | if sh != nil { | ||
168 | ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast}) | ||
169 | begin := &stats.Begin{ | ||
170 | Client: true, | ||
171 | BeginTime: time.Now(), | ||
172 | FailFast: c.failFast, | ||
173 | } | ||
174 | sh.HandleRPC(ctx, begin) | ||
175 | defer func() { | ||
176 | if err != nil { | ||
177 | // Only handle end stats if err != nil. | ||
178 | end := &stats.End{ | ||
179 | Client: true, | ||
180 | Error: err, | ||
181 | } | ||
182 | sh.HandleRPC(ctx, end) | ||
183 | } | ||
184 | }() | ||
185 | } | ||
186 | gopts := BalancerGetOptions{ | ||
187 | BlockingWait: !c.failFast, | ||
188 | } | ||
189 | for { | ||
190 | t, put, err = cc.getTransport(ctx, gopts) | ||
191 | if err != nil { | ||
192 | // TODO(zhaoq): Probably revisit the error handling. | ||
193 | if _, ok := status.FromError(err); ok { | ||
194 | return nil, err | ||
195 | } | ||
196 | if err == errConnClosing || err == errConnUnavailable { | ||
197 | if c.failFast { | ||
198 | return nil, Errorf(codes.Unavailable, "%v", err) | ||
199 | } | ||
200 | continue | ||
201 | } | ||
202 | // All the other errors are treated as Internal errors. | ||
203 | return nil, Errorf(codes.Internal, "%v", err) | ||
204 | } | ||
205 | |||
206 | s, err = t.NewStream(ctx, callHdr) | ||
207 | if err != nil { | ||
208 | if _, ok := err.(transport.ConnectionError); ok && put != nil { | ||
209 | // If error is connection error, transport was sending data on wire, | ||
210 | // and we are not sure if anything has been sent on wire. | ||
211 | // If error is not connection error, we are sure nothing has been sent. | ||
212 | updateRPCInfoInContext(ctx, rpcInfo{bytesSent: true, bytesReceived: false}) | ||
213 | } | ||
214 | if put != nil { | ||
215 | put() | ||
216 | put = nil | ||
217 | } | ||
218 | if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast { | ||
219 | continue | ||
220 | } | ||
221 | return nil, toRPCErr(err) | ||
222 | } | ||
223 | break | ||
224 | } | ||
225 | // Set callInfo.peer object from stream's context. | ||
226 | if peer, ok := peer.FromContext(s.Context()); ok { | ||
227 | c.peer = peer | ||
228 | } | ||
229 | cs := &clientStream{ | ||
230 | opts: opts, | ||
231 | c: c, | ||
232 | desc: desc, | ||
233 | codec: cc.dopts.codec, | ||
234 | cp: cc.dopts.cp, | ||
235 | dc: cc.dopts.dc, | ||
236 | cancel: cancel, | ||
237 | |||
238 | put: put, | ||
239 | t: t, | ||
240 | s: s, | ||
241 | p: &parser{r: s}, | ||
242 | |||
243 | tracing: EnableTracing, | ||
244 | trInfo: trInfo, | ||
245 | |||
246 | statsCtx: ctx, | ||
247 | statsHandler: cc.dopts.copts.StatsHandler, | ||
248 | } | ||
249 | if cc.dopts.cp != nil { | ||
250 | cs.cbuf = new(bytes.Buffer) | ||
251 | } | ||
252 | // Listen on ctx.Done() to detect cancellation and s.Done() to detect normal termination | ||
253 | // when there is no pending I/O operations on this stream. | ||
254 | go func() { | ||
255 | select { | ||
256 | case <-t.Error(): | ||
257 | // Incur transport error, simply exit. | ||
258 | case <-cc.ctx.Done(): | ||
259 | cs.finish(ErrClientConnClosing) | ||
260 | cs.closeTransportStream(ErrClientConnClosing) | ||
261 | case <-s.Done(): | ||
262 | // TODO: The trace of the RPC is terminated here when there is no pending | ||
263 | // I/O, which is probably not the optimal solution. | ||
264 | cs.finish(s.Status().Err()) | ||
265 | cs.closeTransportStream(nil) | ||
266 | case <-s.GoAway(): | ||
267 | cs.finish(errConnDrain) | ||
268 | cs.closeTransportStream(errConnDrain) | ||
269 | case <-s.Context().Done(): | ||
270 | err := s.Context().Err() | ||
271 | cs.finish(err) | ||
272 | cs.closeTransportStream(transport.ContextErr(err)) | ||
273 | } | ||
274 | }() | ||
275 | return cs, nil | ||
276 | } | ||
277 | |||
278 | // clientStream implements a client side Stream. | ||
279 | type clientStream struct { | ||
280 | opts []CallOption | ||
281 | c callInfo | ||
282 | t transport.ClientTransport | ||
283 | s *transport.Stream | ||
284 | p *parser | ||
285 | desc *StreamDesc | ||
286 | codec Codec | ||
287 | cp Compressor | ||
288 | cbuf *bytes.Buffer | ||
289 | dc Decompressor | ||
290 | cancel context.CancelFunc | ||
291 | |||
292 | tracing bool // set to EnableTracing when the clientStream is created. | ||
293 | |||
294 | mu sync.Mutex | ||
295 | put func() | ||
296 | closed bool | ||
297 | finished bool | ||
298 | // trInfo.tr is set when the clientStream is created (if EnableTracing is true), | ||
299 | // and is set to nil when the clientStream's finish method is called. | ||
300 | trInfo traceInfo | ||
301 | |||
302 | // statsCtx keeps the user context for stats handling. | ||
303 | // All stats collection should use the statsCtx (instead of the stream context) | ||
304 | // so that all the generated stats for a particular RPC can be associated in the processing phase. | ||
305 | statsCtx context.Context | ||
306 | statsHandler stats.Handler | ||
307 | } | ||
308 | |||
309 | func (cs *clientStream) Context() context.Context { | ||
310 | return cs.s.Context() | ||
311 | } | ||
312 | |||
313 | func (cs *clientStream) Header() (metadata.MD, error) { | ||
314 | m, err := cs.s.Header() | ||
315 | if err != nil { | ||
316 | if _, ok := err.(transport.ConnectionError); !ok { | ||
317 | cs.closeTransportStream(err) | ||
318 | } | ||
319 | } | ||
320 | return m, err | ||
321 | } | ||
322 | |||
323 | func (cs *clientStream) Trailer() metadata.MD { | ||
324 | return cs.s.Trailer() | ||
325 | } | ||
326 | |||
327 | func (cs *clientStream) SendMsg(m interface{}) (err error) { | ||
328 | if cs.tracing { | ||
329 | cs.mu.Lock() | ||
330 | if cs.trInfo.tr != nil { | ||
331 | cs.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) | ||
332 | } | ||
333 | cs.mu.Unlock() | ||
334 | } | ||
335 | // TODO Investigate how to signal the stats handling party. | ||
336 | // generate error stats if err != nil && err != io.EOF? | ||
337 | defer func() { | ||
338 | if err != nil { | ||
339 | cs.finish(err) | ||
340 | } | ||
341 | if err == nil { | ||
342 | return | ||
343 | } | ||
344 | if err == io.EOF { | ||
345 | // Specialize the process for server streaming. SendMesg is only called | ||
346 | // once when creating the stream object. io.EOF needs to be skipped when | ||
347 | // the rpc is early finished (before the stream object is created.). | ||
348 | // TODO: It is probably better to move this into the generated code. | ||
349 | if !cs.desc.ClientStreams && cs.desc.ServerStreams { | ||
350 | err = nil | ||
351 | } | ||
352 | return | ||
353 | } | ||
354 | if _, ok := err.(transport.ConnectionError); !ok { | ||
355 | cs.closeTransportStream(err) | ||
356 | } | ||
357 | err = toRPCErr(err) | ||
358 | }() | ||
359 | var outPayload *stats.OutPayload | ||
360 | if cs.statsHandler != nil { | ||
361 | outPayload = &stats.OutPayload{ | ||
362 | Client: true, | ||
363 | } | ||
364 | } | ||
365 | out, err := encode(cs.codec, m, cs.cp, cs.cbuf, outPayload) | ||
366 | defer func() { | ||
367 | if cs.cbuf != nil { | ||
368 | cs.cbuf.Reset() | ||
369 | } | ||
370 | }() | ||
371 | if err != nil { | ||
372 | return err | ||
373 | } | ||
374 | if cs.c.maxSendMessageSize == nil { | ||
375 | return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") | ||
376 | } | ||
377 | if len(out) > *cs.c.maxSendMessageSize { | ||
378 | return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(out), *cs.c.maxSendMessageSize) | ||
379 | } | ||
380 | err = cs.t.Write(cs.s, out, &transport.Options{Last: false}) | ||
381 | if err == nil && outPayload != nil { | ||
382 | outPayload.SentTime = time.Now() | ||
383 | cs.statsHandler.HandleRPC(cs.statsCtx, outPayload) | ||
384 | } | ||
385 | return err | ||
386 | } | ||
387 | |||
388 | func (cs *clientStream) RecvMsg(m interface{}) (err error) { | ||
389 | var inPayload *stats.InPayload | ||
390 | if cs.statsHandler != nil { | ||
391 | inPayload = &stats.InPayload{ | ||
392 | Client: true, | ||
393 | } | ||
394 | } | ||
395 | if cs.c.maxReceiveMessageSize == nil { | ||
396 | return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") | ||
397 | } | ||
398 | err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, inPayload) | ||
399 | defer func() { | ||
400 | // err != nil indicates the termination of the stream. | ||
401 | if err != nil { | ||
402 | cs.finish(err) | ||
403 | } | ||
404 | }() | ||
405 | if err == nil { | ||
406 | if cs.tracing { | ||
407 | cs.mu.Lock() | ||
408 | if cs.trInfo.tr != nil { | ||
409 | cs.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) | ||
410 | } | ||
411 | cs.mu.Unlock() | ||
412 | } | ||
413 | if inPayload != nil { | ||
414 | cs.statsHandler.HandleRPC(cs.statsCtx, inPayload) | ||
415 | } | ||
416 | if !cs.desc.ClientStreams || cs.desc.ServerStreams { | ||
417 | return | ||
418 | } | ||
419 | // Special handling for client streaming rpc. | ||
420 | // This recv expects EOF or errors, so we don't collect inPayload. | ||
421 | if cs.c.maxReceiveMessageSize == nil { | ||
422 | return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") | ||
423 | } | ||
424 | err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, nil) | ||
425 | cs.closeTransportStream(err) | ||
426 | if err == nil { | ||
427 | return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) | ||
428 | } | ||
429 | if err == io.EOF { | ||
430 | if se := cs.s.Status().Err(); se != nil { | ||
431 | return se | ||
432 | } | ||
433 | cs.finish(err) | ||
434 | return nil | ||
435 | } | ||
436 | return toRPCErr(err) | ||
437 | } | ||
438 | if _, ok := err.(transport.ConnectionError); !ok { | ||
439 | cs.closeTransportStream(err) | ||
440 | } | ||
441 | if err == io.EOF { | ||
442 | if statusErr := cs.s.Status().Err(); statusErr != nil { | ||
443 | return statusErr | ||
444 | } | ||
445 | // Returns io.EOF to indicate the end of the stream. | ||
446 | return | ||
447 | } | ||
448 | return toRPCErr(err) | ||
449 | } | ||
450 | |||
451 | func (cs *clientStream) CloseSend() (err error) { | ||
452 | err = cs.t.Write(cs.s, nil, &transport.Options{Last: true}) | ||
453 | defer func() { | ||
454 | if err != nil { | ||
455 | cs.finish(err) | ||
456 | } | ||
457 | }() | ||
458 | if err == nil || err == io.EOF { | ||
459 | return nil | ||
460 | } | ||
461 | if _, ok := err.(transport.ConnectionError); !ok { | ||
462 | cs.closeTransportStream(err) | ||
463 | } | ||
464 | err = toRPCErr(err) | ||
465 | return | ||
466 | } | ||
467 | |||
468 | func (cs *clientStream) closeTransportStream(err error) { | ||
469 | cs.mu.Lock() | ||
470 | if cs.closed { | ||
471 | cs.mu.Unlock() | ||
472 | return | ||
473 | } | ||
474 | cs.closed = true | ||
475 | cs.mu.Unlock() | ||
476 | cs.t.CloseStream(cs.s, err) | ||
477 | } | ||
478 | |||
479 | func (cs *clientStream) finish(err error) { | ||
480 | cs.mu.Lock() | ||
481 | defer cs.mu.Unlock() | ||
482 | if cs.finished { | ||
483 | return | ||
484 | } | ||
485 | cs.finished = true | ||
486 | defer func() { | ||
487 | if cs.cancel != nil { | ||
488 | cs.cancel() | ||
489 | } | ||
490 | }() | ||
491 | for _, o := range cs.opts { | ||
492 | o.after(&cs.c) | ||
493 | } | ||
494 | if cs.put != nil { | ||
495 | updateRPCInfoInContext(cs.s.Context(), rpcInfo{ | ||
496 | bytesSent: cs.s.BytesSent(), | ||
497 | bytesReceived: cs.s.BytesReceived(), | ||
498 | }) | ||
499 | cs.put() | ||
500 | cs.put = nil | ||
501 | } | ||
502 | if cs.statsHandler != nil { | ||
503 | end := &stats.End{ | ||
504 | Client: true, | ||
505 | EndTime: time.Now(), | ||
506 | } | ||
507 | if err != io.EOF { | ||
508 | // end.Error is nil if the RPC finished successfully. | ||
509 | end.Error = toRPCErr(err) | ||
510 | } | ||
511 | cs.statsHandler.HandleRPC(cs.statsCtx, end) | ||
512 | } | ||
513 | if !cs.tracing { | ||
514 | return | ||
515 | } | ||
516 | if cs.trInfo.tr != nil { | ||
517 | if err == nil || err == io.EOF { | ||
518 | cs.trInfo.tr.LazyPrintf("RPC: [OK]") | ||
519 | } else { | ||
520 | cs.trInfo.tr.LazyPrintf("RPC: [%v]", err) | ||
521 | cs.trInfo.tr.SetError() | ||
522 | } | ||
523 | cs.trInfo.tr.Finish() | ||
524 | cs.trInfo.tr = nil | ||
525 | } | ||
526 | } | ||
527 | |||
528 | // ServerStream defines the interface a server stream has to satisfy. | ||
529 | type ServerStream interface { | ||
530 | // SetHeader sets the header metadata. It may be called multiple times. | ||
531 | // When call multiple times, all the provided metadata will be merged. | ||
532 | // All the metadata will be sent out when one of the following happens: | ||
533 | // - ServerStream.SendHeader() is called; | ||
534 | // - The first response is sent out; | ||
535 | // - An RPC status is sent out (error or success). | ||
536 | SetHeader(metadata.MD) error | ||
537 | // SendHeader sends the header metadata. | ||
538 | // The provided md and headers set by SetHeader() will be sent. | ||
539 | // It fails if called multiple times. | ||
540 | SendHeader(metadata.MD) error | ||
541 | // SetTrailer sets the trailer metadata which will be sent with the RPC status. | ||
542 | // When called more than once, all the provided metadata will be merged. | ||
543 | SetTrailer(metadata.MD) | ||
544 | Stream | ||
545 | } | ||
546 | |||
547 | // serverStream implements a server side Stream. | ||
548 | type serverStream struct { | ||
549 | t transport.ServerTransport | ||
550 | s *transport.Stream | ||
551 | p *parser | ||
552 | codec Codec | ||
553 | cp Compressor | ||
554 | dc Decompressor | ||
555 | cbuf *bytes.Buffer | ||
556 | maxReceiveMessageSize int | ||
557 | maxSendMessageSize int | ||
558 | trInfo *traceInfo | ||
559 | |||
560 | statsHandler stats.Handler | ||
561 | |||
562 | mu sync.Mutex // protects trInfo.tr after the service handler runs. | ||
563 | } | ||
564 | |||
565 | func (ss *serverStream) Context() context.Context { | ||
566 | return ss.s.Context() | ||
567 | } | ||
568 | |||
569 | func (ss *serverStream) SetHeader(md metadata.MD) error { | ||
570 | if md.Len() == 0 { | ||
571 | return nil | ||
572 | } | ||
573 | return ss.s.SetHeader(md) | ||
574 | } | ||
575 | |||
576 | func (ss *serverStream) SendHeader(md metadata.MD) error { | ||
577 | return ss.t.WriteHeader(ss.s, md) | ||
578 | } | ||
579 | |||
580 | func (ss *serverStream) SetTrailer(md metadata.MD) { | ||
581 | if md.Len() == 0 { | ||
582 | return | ||
583 | } | ||
584 | ss.s.SetTrailer(md) | ||
585 | return | ||
586 | } | ||
587 | |||
588 | func (ss *serverStream) SendMsg(m interface{}) (err error) { | ||
589 | defer func() { | ||
590 | if ss.trInfo != nil { | ||
591 | ss.mu.Lock() | ||
592 | if ss.trInfo.tr != nil { | ||
593 | if err == nil { | ||
594 | ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) | ||
595 | } else { | ||
596 | ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) | ||
597 | ss.trInfo.tr.SetError() | ||
598 | } | ||
599 | } | ||
600 | ss.mu.Unlock() | ||
601 | } | ||
602 | }() | ||
603 | var outPayload *stats.OutPayload | ||
604 | if ss.statsHandler != nil { | ||
605 | outPayload = &stats.OutPayload{} | ||
606 | } | ||
607 | out, err := encode(ss.codec, m, ss.cp, ss.cbuf, outPayload) | ||
608 | defer func() { | ||
609 | if ss.cbuf != nil { | ||
610 | ss.cbuf.Reset() | ||
611 | } | ||
612 | }() | ||
613 | if err != nil { | ||
614 | return err | ||
615 | } | ||
616 | if len(out) > ss.maxSendMessageSize { | ||
617 | return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(out), ss.maxSendMessageSize) | ||
618 | } | ||
619 | if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil { | ||
620 | return toRPCErr(err) | ||
621 | } | ||
622 | if outPayload != nil { | ||
623 | outPayload.SentTime = time.Now() | ||
624 | ss.statsHandler.HandleRPC(ss.s.Context(), outPayload) | ||
625 | } | ||
626 | return nil | ||
627 | } | ||
628 | |||
629 | func (ss *serverStream) RecvMsg(m interface{}) (err error) { | ||
630 | defer func() { | ||
631 | if ss.trInfo != nil { | ||
632 | ss.mu.Lock() | ||
633 | if ss.trInfo.tr != nil { | ||
634 | if err == nil { | ||
635 | ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) | ||
636 | } else if err != io.EOF { | ||
637 | ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) | ||
638 | ss.trInfo.tr.SetError() | ||
639 | } | ||
640 | } | ||
641 | ss.mu.Unlock() | ||
642 | } | ||
643 | }() | ||
644 | var inPayload *stats.InPayload | ||
645 | if ss.statsHandler != nil { | ||
646 | inPayload = &stats.InPayload{} | ||
647 | } | ||
648 | if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, inPayload); err != nil { | ||
649 | if err == io.EOF { | ||
650 | return err | ||
651 | } | ||
652 | if err == io.ErrUnexpectedEOF { | ||
653 | err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) | ||
654 | } | ||
655 | return toRPCErr(err) | ||
656 | } | ||
657 | if inPayload != nil { | ||
658 | ss.statsHandler.HandleRPC(ss.s.Context(), inPayload) | ||
659 | } | ||
660 | return nil | ||
661 | } | ||