aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/google.golang.org/grpc/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r--vendor/google.golang.org/grpc/stream.go1539
1 files changed, 1182 insertions, 357 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 1c621ba..d06279a 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -19,24 +19,35 @@
19package grpc 19package grpc
20 20
21import ( 21import (
22 "bytes" 22 "context"
23 "errors" 23 "errors"
24 "io" 24 "io"
25 "math"
26 "strconv"
25 "sync" 27 "sync"
26 "time" 28 "time"
27 29
28 "golang.org/x/net/context"
29 "golang.org/x/net/trace" 30 "golang.org/x/net/trace"
31 "google.golang.org/grpc/balancer"
30 "google.golang.org/grpc/codes" 32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/connectivity"
34 "google.golang.org/grpc/encoding"
35 "google.golang.org/grpc/grpclog"
36 "google.golang.org/grpc/internal/binarylog"
37 "google.golang.org/grpc/internal/channelz"
38 "google.golang.org/grpc/internal/grpcrand"
39 "google.golang.org/grpc/internal/transport"
31 "google.golang.org/grpc/metadata" 40 "google.golang.org/grpc/metadata"
32 "google.golang.org/grpc/peer" 41 "google.golang.org/grpc/peer"
33 "google.golang.org/grpc/stats" 42 "google.golang.org/grpc/stats"
34 "google.golang.org/grpc/status" 43 "google.golang.org/grpc/status"
35 "google.golang.org/grpc/transport"
36) 44)
37 45
38// StreamHandler defines the handler called by gRPC server to complete the 46// StreamHandler defines the handler called by gRPC server to complete the
39// execution of a streaming RPC. 47// execution of a streaming RPC. If a StreamHandler returns an error, it
48// should be produced by the status package, or else gRPC will use
49// codes.Unknown as the status code and err.Error() as the status message
50// of the RPC.
40type StreamHandler func(srv interface{}, stream ServerStream) error 51type StreamHandler func(srv interface{}, stream ServerStream) error
41 52
42// StreamDesc represents a streaming RPC service's method specification. 53// StreamDesc represents a streaming RPC service's method specification.
@@ -50,30 +61,21 @@ type StreamDesc struct {
50} 61}
51 62
52// Stream defines the common interface a client or server stream has to satisfy. 63// Stream defines the common interface a client or server stream has to satisfy.
64//
65// Deprecated: See ClientStream and ServerStream documentation instead.
53type Stream interface { 66type Stream interface {
54 // Context returns the context for this stream. 67 // Deprecated: See ClientStream and ServerStream documentation instead.
55 Context() context.Context 68 Context() context.Context
56 // SendMsg blocks until it sends m, the stream is done or the stream 69 // Deprecated: See ClientStream and ServerStream documentation instead.
57 // breaks.
58 // On error, it aborts the stream and returns an RPC status on client
59 // side. On server side, it simply returns the error to the caller.
60 // SendMsg is called by generated code. Also Users can call SendMsg
61 // directly when it is really needed in their use cases.
62 // It's safe to have a goroutine calling SendMsg and another goroutine calling
63 // recvMsg on the same stream at the same time.
64 // But it is not safe to call SendMsg on the same stream in different goroutines.
65 SendMsg(m interface{}) error 70 SendMsg(m interface{}) error
66 // RecvMsg blocks until it receives a message or the stream is 71 // Deprecated: See ClientStream and ServerStream documentation instead.
67 // done. On client side, it returns io.EOF when the stream is done. On
68 // any other error, it aborts the stream and returns an RPC status. On
69 // server side, it simply returns the error to the caller.
70 // It's safe to have a goroutine calling SendMsg and another goroutine calling
71 // recvMsg on the same stream at the same time.
72 // But it is not safe to call RecvMsg on the same stream in different goroutines.
73 RecvMsg(m interface{}) error 72 RecvMsg(m interface{}) error
74} 73}
75 74
76// ClientStream defines the interface a client stream has to satisfy. 75// ClientStream defines the client-side behavior of a streaming RPC.
76//
77// All errors returned from ClientStream methods are compatible with the
78// status package.
77type ClientStream interface { 79type ClientStream interface {
78 // Header returns the header metadata received from the server if there 80 // Header returns the header metadata received from the server if there
79 // is any. It blocks if the metadata is not ready to read. 81 // is any. It blocks if the metadata is not ready to read.
@@ -83,62 +85,147 @@ type ClientStream interface {
83 // stream.Recv has returned a non-nil error (including io.EOF). 85 // stream.Recv has returned a non-nil error (including io.EOF).
84 Trailer() metadata.MD 86 Trailer() metadata.MD
85 // CloseSend closes the send direction of the stream. It closes the stream 87 // CloseSend closes the send direction of the stream. It closes the stream
86 // when non-nil error is met. 88 // when non-nil error is met. It is also not safe to call CloseSend
89 // concurrently with SendMsg.
87 CloseSend() error 90 CloseSend() error
88 // Stream.SendMsg() may return a non-nil error when something wrong happens sending 91 // Context returns the context for this stream.
89 // the request. The returned error indicates the status of this sending, not the final 92 //
90 // status of the RPC. 93 // It should not be called until after Header or RecvMsg has returned. Once
91 // Always call Stream.RecvMsg() to get the final status if you care about the status of 94 // called, subsequent client-side retries are disabled.
92 // the RPC. 95 Context() context.Context
93 Stream 96 // SendMsg is generally called by generated code. On error, SendMsg aborts
97 // the stream. If the error was generated by the client, the status is
98 // returned directly; otherwise, io.EOF is returned and the status of
99 // the stream may be discovered using RecvMsg.
100 //
101 // SendMsg blocks until:
102 // - There is sufficient flow control to schedule m with the transport, or
103 // - The stream is done, or
104 // - The stream breaks.
105 //
106 // SendMsg does not wait until the message is received by the server. An
107 // untimely stream closure may result in lost messages. To ensure delivery,
108 // users should ensure the RPC completed successfully using RecvMsg.
109 //
110 // It is safe to have a goroutine calling SendMsg and another goroutine
111 // calling RecvMsg on the same stream at the same time, but it is not safe
112 // to call SendMsg on the same stream in different goroutines. It is also
113 // not safe to call CloseSend concurrently with SendMsg.
114 SendMsg(m interface{}) error
115 // RecvMsg blocks until it receives a message into m or the stream is
116 // done. It returns io.EOF when the stream completes successfully. On
117 // any other error, the stream is aborted and the error contains the RPC
118 // status.
119 //
120 // It is safe to have a goroutine calling SendMsg and another goroutine
121 // calling RecvMsg on the same stream at the same time, but it is not
122 // safe to call RecvMsg on the same stream in different goroutines.
123 RecvMsg(m interface{}) error
94} 124}
95 125
96// NewClientStream creates a new Stream for the client side. This is called 126// NewStream creates a new Stream for the client side. This is typically
97// by generated code. 127// called by generated code. ctx is used for the lifetime of the stream.
98func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { 128//
129// To ensure resources are not leaked due to the stream returned, one of the following
130// actions must be performed:
131//
132// 1. Call Close on the ClientConn.
133// 2. Cancel the context provided.
134// 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
135// client-streaming RPC, for instance, might use the helper function
136// CloseAndRecv (note that CloseSend does not Recv, therefore is not
137// guaranteed to release all resources).
138// 4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
139//
140// If none of the above happen, a goroutine and a context will be leaked, and grpc
141// will not call the optionally-configured stats handler with a stats.End message.
142func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
143 // allow interceptor to see all applicable call options, which means those
144 // configured as defaults from dial option as well as per-call options
145 opts = combine(cc.dopts.callOptions, opts)
146
99 if cc.dopts.streamInt != nil { 147 if cc.dopts.streamInt != nil {
100 return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...) 148 return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
101 } 149 }
102 return newClientStream(ctx, desc, cc, method, opts...) 150 return newClientStream(ctx, desc, cc, method, opts...)
103} 151}
104 152
153// NewClientStream is a wrapper for ClientConn.NewStream.
154func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
155 return cc.NewStream(ctx, desc, method, opts...)
156}
157
105func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { 158func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
106 var ( 159 if channelz.IsOn() {
107 t transport.ClientTransport 160 cc.incrCallsStarted()
108 s *transport.Stream 161 defer func() {
109 put func() 162 if err != nil {
110 cancel context.CancelFunc 163 cc.incrCallsFailed()
111 ) 164 }
112 c := defaultCallInfo 165 }()
166 }
167 c := defaultCallInfo()
168 // Provide an opportunity for the first RPC to see the first service config
169 // provided by the resolver.
170 if err := cc.waitForResolvedAddrs(ctx); err != nil {
171 return nil, err
172 }
113 mc := cc.GetMethodConfig(method) 173 mc := cc.GetMethodConfig(method)
114 if mc.WaitForReady != nil { 174 if mc.WaitForReady != nil {
115 c.failFast = !*mc.WaitForReady 175 c.failFast = !*mc.WaitForReady
116 } 176 }
117 177
118 if mc.Timeout != nil { 178 // Possible context leak:
179 // The cancel function for the child context we create will only be called
180 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
181 // an error is generated by SendMsg.
182 // https://github.com/grpc/grpc-go/issues/1818.
183 var cancel context.CancelFunc
184 if mc.Timeout != nil && *mc.Timeout >= 0 {
119 ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) 185 ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
186 } else {
187 ctx, cancel = context.WithCancel(ctx)
120 } 188 }
189 defer func() {
190 if err != nil {
191 cancel()
192 }
193 }()
121 194
122 opts = append(cc.dopts.callOptions, opts...)
123 for _, o := range opts { 195 for _, o := range opts {
124 if err := o.before(&c); err != nil { 196 if err := o.before(c); err != nil {
125 return nil, toRPCErr(err) 197 return nil, toRPCErr(err)
126 } 198 }
127 } 199 }
128 c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize) 200 c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
129 c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) 201 c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
202 if err := setCallInfoCodec(c); err != nil {
203 return nil, err
204 }
130 205
131 callHdr := &transport.CallHdr{ 206 callHdr := &transport.CallHdr{
132 Host: cc.authority, 207 Host: cc.authority,
133 Method: method, 208 Method: method,
134 // If it's not client streaming, we should already have the request to be sent, 209 ContentSubtype: c.contentSubtype,
135 // so we don't flush the header. 210 }
136 // If it's client streaming, the user may never send a request or send it any 211
137 // time soon, so we ask the transport to flush the header. 212 // Set our outgoing compression according to the UseCompressor CallOption, if
138 Flush: desc.ClientStreams, 213 // set. In that case, also find the compressor from the encoding package.
139 } 214 // Otherwise, use the compressor configured by the WithCompressor DialOption,
140 if cc.dopts.cp != nil { 215 // if set.
216 var cp Compressor
217 var comp encoding.Compressor
218 if ct := c.compressorType; ct != "" {
219 callHdr.SendCompress = ct
220 if ct != encoding.Identity {
221 comp = encoding.GetCompressor(ct)
222 if comp == nil {
223 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
224 }
225 }
226 } else if cc.dopts.cp != nil {
141 callHdr.SendCompress = cc.dopts.cp.Type() 227 callHdr.SendCompress = cc.dopts.cp.Type()
228 cp = cc.dopts.cp
142 } 229 }
143 if c.creds != nil { 230 if c.creds != nil {
144 callHdr.Creds = c.creds 231 callHdr.Creds = c.creds
@@ -152,380 +239,1019 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
152 } 239 }
153 trInfo.tr.LazyLog(&trInfo.firstLine, false) 240 trInfo.tr.LazyLog(&trInfo.firstLine, false)
154 ctx = trace.NewContext(ctx, trInfo.tr) 241 ctx = trace.NewContext(ctx, trInfo.tr)
155 defer func() {
156 if err != nil {
157 // Need to call tr.finish() if error is returned.
158 // Because tr will not be returned to caller.
159 trInfo.tr.LazyPrintf("RPC: [%v]", err)
160 trInfo.tr.SetError()
161 trInfo.tr.Finish()
162 }
163 }()
164 } 242 }
165 ctx = newContextWithRPCInfo(ctx) 243 ctx = newContextWithRPCInfo(ctx, c.failFast)
166 sh := cc.dopts.copts.StatsHandler 244 sh := cc.dopts.copts.StatsHandler
245 var beginTime time.Time
167 if sh != nil { 246 if sh != nil {
168 ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast}) 247 ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
248 beginTime = time.Now()
169 begin := &stats.Begin{ 249 begin := &stats.Begin{
170 Client: true, 250 Client: true,
171 BeginTime: time.Now(), 251 BeginTime: beginTime,
172 FailFast: c.failFast, 252 FailFast: c.failFast,
173 } 253 }
174 sh.HandleRPC(ctx, begin) 254 sh.HandleRPC(ctx, begin)
175 defer func() {
176 if err != nil {
177 // Only handle end stats if err != nil.
178 end := &stats.End{
179 Client: true,
180 Error: err,
181 }
182 sh.HandleRPC(ctx, end)
183 }
184 }()
185 } 255 }
186 gopts := BalancerGetOptions{ 256
187 BlockingWait: !c.failFast, 257 cs := &clientStream{
258 callHdr: callHdr,
259 ctx: ctx,
260 methodConfig: &mc,
261 opts: opts,
262 callInfo: c,
263 cc: cc,
264 desc: desc,
265 codec: c.codec,
266 cp: cp,
267 comp: comp,
268 cancel: cancel,
269 beginTime: beginTime,
270 firstAttempt: true,
271 }
272 if !cc.dopts.disableRetry {
273 cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
188 } 274 }
189 for { 275 cs.binlog = binarylog.GetMethodLogger(method)
190 t, put, err = cc.getTransport(ctx, gopts)
191 if err != nil {
192 // TODO(zhaoq): Probably revisit the error handling.
193 if _, ok := status.FromError(err); ok {
194 return nil, err
195 }
196 if err == errConnClosing || err == errConnUnavailable {
197 if c.failFast {
198 return nil, Errorf(codes.Unavailable, "%v", err)
199 }
200 continue
201 }
202 // All the other errors are treated as Internal errors.
203 return nil, Errorf(codes.Internal, "%v", err)
204 }
205 276
206 s, err = t.NewStream(ctx, callHdr) 277 cs.callInfo.stream = cs
207 if err != nil { 278 // Only this initial attempt has stats/tracing.
208 if _, ok := err.(transport.ConnectionError); ok && put != nil { 279 // TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
209 // If error is connection error, transport was sending data on wire, 280 if err := cs.newAttemptLocked(sh, trInfo); err != nil {
210 // and we are not sure if anything has been sent on wire. 281 cs.finish(err)
211 // If error is not connection error, we are sure nothing has been sent. 282 return nil, err
212 updateRPCInfoInContext(ctx, rpcInfo{bytesSent: true, bytesReceived: false}) 283 }
213 } 284
214 if put != nil { 285 op := func(a *csAttempt) error { return a.newStream() }
215 put() 286 if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
216 put = nil 287 cs.finish(err)
217 } 288 return nil, err
218 if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast { 289 }
219 continue 290
291 if cs.binlog != nil {
292 md, _ := metadata.FromOutgoingContext(ctx)
293 logEntry := &binarylog.ClientHeader{
294 OnClientSide: true,
295 Header: md,
296 MethodName: method,
297 Authority: cs.cc.authority,
298 }
299 if deadline, ok := ctx.Deadline(); ok {
300 logEntry.Timeout = deadline.Sub(time.Now())
301 if logEntry.Timeout < 0 {
302 logEntry.Timeout = 0
220 } 303 }
221 return nil, toRPCErr(err)
222 } 304 }
223 break 305 cs.binlog.Log(logEntry)
224 } 306 }
225 // Set callInfo.peer object from stream's context. 307
226 if peer, ok := peer.FromContext(s.Context()); ok { 308 if desc != unaryStreamDesc {
227 c.peer = peer 309 // Listen on cc and stream contexts to cleanup when the user closes the
310 // ClientConn or cancels the stream context. In all other cases, an error
311 // should already be injected into the recv buffer by the transport, which
312 // the client will eventually receive, and then we will cancel the stream's
313 // context in clientStream.finish.
314 go func() {
315 select {
316 case <-cc.ctx.Done():
317 cs.finish(ErrClientConnClosing)
318 case <-ctx.Done():
319 cs.finish(toRPCErr(ctx.Err()))
320 }
321 }()
228 } 322 }
229 cs := &clientStream{
230 opts: opts,
231 c: c,
232 desc: desc,
233 codec: cc.dopts.codec,
234 cp: cc.dopts.cp,
235 dc: cc.dopts.dc,
236 cancel: cancel,
237
238 put: put,
239 t: t,
240 s: s,
241 p: &parser{r: s},
242
243 tracing: EnableTracing,
244 trInfo: trInfo,
245
246 statsCtx: ctx,
247 statsHandler: cc.dopts.copts.StatsHandler,
248 }
249 if cc.dopts.cp != nil {
250 cs.cbuf = new(bytes.Buffer)
251 }
252 // Listen on ctx.Done() to detect cancellation and s.Done() to detect normal termination
253 // when there is no pending I/O operations on this stream.
254 go func() {
255 select {
256 case <-t.Error():
257 // Incur transport error, simply exit.
258 case <-cc.ctx.Done():
259 cs.finish(ErrClientConnClosing)
260 cs.closeTransportStream(ErrClientConnClosing)
261 case <-s.Done():
262 // TODO: The trace of the RPC is terminated here when there is no pending
263 // I/O, which is probably not the optimal solution.
264 cs.finish(s.Status().Err())
265 cs.closeTransportStream(nil)
266 case <-s.GoAway():
267 cs.finish(errConnDrain)
268 cs.closeTransportStream(errConnDrain)
269 case <-s.Context().Done():
270 err := s.Context().Err()
271 cs.finish(err)
272 cs.closeTransportStream(transport.ContextErr(err))
273 }
274 }()
275 return cs, nil 323 return cs, nil
276} 324}
277 325
326func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo traceInfo) error {
327 cs.attempt = &csAttempt{
328 cs: cs,
329 dc: cs.cc.dopts.dc,
330 statsHandler: sh,
331 trInfo: trInfo,
332 }
333
334 if err := cs.ctx.Err(); err != nil {
335 return toRPCErr(err)
336 }
337 t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
338 if err != nil {
339 return err
340 }
341 cs.attempt.t = t
342 cs.attempt.done = done
343 return nil
344}
345
346func (a *csAttempt) newStream() error {
347 cs := a.cs
348 cs.callHdr.PreviousAttempts = cs.numRetries
349 s, err := a.t.NewStream(cs.ctx, cs.callHdr)
350 if err != nil {
351 return toRPCErr(err)
352 }
353 cs.attempt.s = s
354 cs.attempt.p = &parser{r: s}
355 return nil
356}
357
278// clientStream implements a client side Stream. 358// clientStream implements a client side Stream.
279type clientStream struct { 359type clientStream struct {
280 opts []CallOption 360 callHdr *transport.CallHdr
281 c callInfo 361 opts []CallOption
282 t transport.ClientTransport 362 callInfo *callInfo
283 s *transport.Stream 363 cc *ClientConn
284 p *parser 364 desc *StreamDesc
285 desc *StreamDesc 365
286 codec Codec 366 codec baseCodec
287 cp Compressor 367 cp Compressor
288 cbuf *bytes.Buffer 368 comp encoding.Compressor
289 dc Decompressor 369
290 cancel context.CancelFunc 370 cancel context.CancelFunc // cancels all attempts
371
372 sentLast bool // sent an end stream
373 beginTime time.Time
374
375 methodConfig *MethodConfig
376
377 ctx context.Context // the application's context, wrapped by stats/tracing
291 378
292 tracing bool // set to EnableTracing when the clientStream is created. 379 retryThrottler *retryThrottler // The throttler active when the RPC began.
293 380
294 mu sync.Mutex 381 binlog *binarylog.MethodLogger // Binary logger, can be nil.
295 put func() 382 // serverHeaderBinlogged is a boolean for whether server header has been
296 closed bool 383 // logged. Server header will be logged when the first time one of those
297 finished bool 384 // happens: stream.Header(), stream.Recv().
298 // trInfo.tr is set when the clientStream is created (if EnableTracing is true), 385 //
299 // and is set to nil when the clientStream's finish method is called. 386 // It's only read and used by Recv() and Header(), so it doesn't need to be
387 // synchronized.
388 serverHeaderBinlogged bool
389
390 mu sync.Mutex
391 firstAttempt bool // if true, transparent retry is valid
392 numRetries int // exclusive of transparent retry attempt(s)
393 numRetriesSincePushback int // retries since pushback; to reset backoff
394 finished bool // TODO: replace with atomic cmpxchg or sync.Once?
395 attempt *csAttempt // the active client stream attempt
396 // TODO(hedging): hedging will have multiple attempts simultaneously.
397 committed bool // active attempt committed for retry?
398 buffer []func(a *csAttempt) error // operations to replay on retry
399 bufferSize int // current size of buffer
400}
401
402// csAttempt implements a single transport stream attempt within a
403// clientStream.
404type csAttempt struct {
405 cs *clientStream
406 t transport.ClientTransport
407 s *transport.Stream
408 p *parser
409 done func(balancer.DoneInfo)
410
411 finished bool
412 dc Decompressor
413 decomp encoding.Compressor
414 decompSet bool
415
416 mu sync.Mutex // guards trInfo.tr
417 // trInfo.tr is set when created (if EnableTracing is true),
418 // and cleared when the finish method is called.
300 trInfo traceInfo 419 trInfo traceInfo
301 420
302 // statsCtx keeps the user context for stats handling.
303 // All stats collection should use the statsCtx (instead of the stream context)
304 // so that all the generated stats for a particular RPC can be associated in the processing phase.
305 statsCtx context.Context
306 statsHandler stats.Handler 421 statsHandler stats.Handler
307} 422}
308 423
424func (cs *clientStream) commitAttemptLocked() {
425 cs.committed = true
426 cs.buffer = nil
427}
428
429func (cs *clientStream) commitAttempt() {
430 cs.mu.Lock()
431 cs.commitAttemptLocked()
432 cs.mu.Unlock()
433}
434
435// shouldRetry returns nil if the RPC should be retried; otherwise it returns
436// the error that should be returned by the operation.
437func (cs *clientStream) shouldRetry(err error) error {
438 if cs.attempt.s == nil && !cs.callInfo.failFast {
439 // In the event of any error from NewStream (attempt.s == nil), we
440 // never attempted to write anything to the wire, so we can retry
441 // indefinitely for non-fail-fast RPCs.
442 return nil
443 }
444 if cs.finished || cs.committed {
445 // RPC is finished or committed; cannot retry.
446 return err
447 }
448 // Wait for the trailers.
449 if cs.attempt.s != nil {
450 <-cs.attempt.s.Done()
451 }
452 if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
453 // First attempt, wait-for-ready, stream unprocessed: transparently retry.
454 cs.firstAttempt = false
455 return nil
456 }
457 cs.firstAttempt = false
458 if cs.cc.dopts.disableRetry {
459 return err
460 }
461
462 pushback := 0
463 hasPushback := false
464 if cs.attempt.s != nil {
465 if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil || !to {
466 return err
467 }
468
469 // TODO(retry): Move down if the spec changes to not check server pushback
470 // before considering this a failure for throttling.
471 sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
472 if len(sps) == 1 {
473 var e error
474 if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
475 grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0])
476 cs.retryThrottler.throttle() // This counts as a failure for throttling.
477 return err
478 }
479 hasPushback = true
480 } else if len(sps) > 1 {
481 grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps)
482 cs.retryThrottler.throttle() // This counts as a failure for throttling.
483 return err
484 }
485 }
486
487 var code codes.Code
488 if cs.attempt.s != nil {
489 code = cs.attempt.s.Status().Code()
490 } else {
491 code = status.Convert(err).Code()
492 }
493
494 rp := cs.methodConfig.retryPolicy
495 if rp == nil || !rp.retryableStatusCodes[code] {
496 return err
497 }
498
499 // Note: the ordering here is important; we count this as a failure
500 // only if the code matched a retryable code.
501 if cs.retryThrottler.throttle() {
502 return err
503 }
504 if cs.numRetries+1 >= rp.maxAttempts {
505 return err
506 }
507
508 var dur time.Duration
509 if hasPushback {
510 dur = time.Millisecond * time.Duration(pushback)
511 cs.numRetriesSincePushback = 0
512 } else {
513 fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
514 cur := float64(rp.initialBackoff) * fact
515 if max := float64(rp.maxBackoff); cur > max {
516 cur = max
517 }
518 dur = time.Duration(grpcrand.Int63n(int64(cur)))
519 cs.numRetriesSincePushback++
520 }
521
522 // TODO(dfawley): we could eagerly fail here if dur puts us past the
523 // deadline, but unsure if it is worth doing.
524 t := time.NewTimer(dur)
525 select {
526 case <-t.C:
527 cs.numRetries++
528 return nil
529 case <-cs.ctx.Done():
530 t.Stop()
531 return status.FromContextError(cs.ctx.Err()).Err()
532 }
533}
534
535// Returns nil if a retry was performed and succeeded; error otherwise.
536func (cs *clientStream) retryLocked(lastErr error) error {
537 for {
538 cs.attempt.finish(lastErr)
539 if err := cs.shouldRetry(lastErr); err != nil {
540 cs.commitAttemptLocked()
541 return err
542 }
543 if err := cs.newAttemptLocked(nil, traceInfo{}); err != nil {
544 return err
545 }
546 if lastErr = cs.replayBufferLocked(); lastErr == nil {
547 return nil
548 }
549 }
550}
551
309func (cs *clientStream) Context() context.Context { 552func (cs *clientStream) Context() context.Context {
310 return cs.s.Context() 553 cs.commitAttempt()
554 // No need to lock before using attempt, since we know it is committed and
555 // cannot change.
556 return cs.attempt.s.Context()
557}
558
559func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
560 cs.mu.Lock()
561 for {
562 if cs.committed {
563 cs.mu.Unlock()
564 return op(cs.attempt)
565 }
566 a := cs.attempt
567 cs.mu.Unlock()
568 err := op(a)
569 cs.mu.Lock()
570 if a != cs.attempt {
571 // We started another attempt already.
572 continue
573 }
574 if err == io.EOF {
575 <-a.s.Done()
576 }
577 if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
578 onSuccess()
579 cs.mu.Unlock()
580 return err
581 }
582 if err := cs.retryLocked(err); err != nil {
583 cs.mu.Unlock()
584 return err
585 }
586 }
311} 587}
312 588
313func (cs *clientStream) Header() (metadata.MD, error) { 589func (cs *clientStream) Header() (metadata.MD, error) {
314 m, err := cs.s.Header() 590 var m metadata.MD
591 err := cs.withRetry(func(a *csAttempt) error {
592 var err error
593 m, err = a.s.Header()
594 return toRPCErr(err)
595 }, cs.commitAttemptLocked)
315 if err != nil { 596 if err != nil {
316 if _, ok := err.(transport.ConnectionError); !ok { 597 cs.finish(err)
317 cs.closeTransportStream(err) 598 return nil, err
599 }
600 if cs.binlog != nil && !cs.serverHeaderBinlogged {
601 // Only log if binary log is on and header has not been logged.
602 logEntry := &binarylog.ServerHeader{
603 OnClientSide: true,
604 Header: m,
605 PeerAddr: nil,
318 } 606 }
607 if peer, ok := peer.FromContext(cs.Context()); ok {
608 logEntry.PeerAddr = peer.Addr
609 }
610 cs.binlog.Log(logEntry)
611 cs.serverHeaderBinlogged = true
319 } 612 }
320 return m, err 613 return m, err
321} 614}
322 615
323func (cs *clientStream) Trailer() metadata.MD { 616func (cs *clientStream) Trailer() metadata.MD {
324 return cs.s.Trailer() 617 // On RPC failure, we never need to retry, because usage requires that
618 // RecvMsg() returned a non-nil error before calling this function is valid.
619 // We would have retried earlier if necessary.
620 //
621 // Commit the attempt anyway, just in case users are not following those
622 // directions -- it will prevent races and should not meaningfully impact
623 // performance.
624 cs.commitAttempt()
625 if cs.attempt.s == nil {
626 return nil
627 }
628 return cs.attempt.s.Trailer()
325} 629}
326 630
327func (cs *clientStream) SendMsg(m interface{}) (err error) { 631func (cs *clientStream) replayBufferLocked() error {
328 if cs.tracing { 632 a := cs.attempt
329 cs.mu.Lock() 633 for _, f := range cs.buffer {
330 if cs.trInfo.tr != nil { 634 if err := f(a); err != nil {
331 cs.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) 635 return err
332 } 636 }
333 cs.mu.Unlock()
334 } 637 }
335 // TODO Investigate how to signal the stats handling party. 638 return nil
336 // generate error stats if err != nil && err != io.EOF? 639}
640
641func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
642 // Note: we still will buffer if retry is disabled (for transparent retries).
643 if cs.committed {
644 return
645 }
646 cs.bufferSize += sz
647 if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
648 cs.commitAttemptLocked()
649 return
650 }
651 cs.buffer = append(cs.buffer, op)
652}
653
654func (cs *clientStream) SendMsg(m interface{}) (err error) {
337 defer func() { 655 defer func() {
338 if err != nil { 656 if err != nil && err != io.EOF {
657 // Call finish on the client stream for errors generated by this SendMsg
658 // call, as these indicate problems created by this client. (Transport
659 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
660 // error will be returned from RecvMsg eventually in that case, or be
661 // retried.)
339 cs.finish(err) 662 cs.finish(err)
340 } 663 }
341 if err == nil {
342 return
343 }
344 if err == io.EOF {
345 // Specialize the process for server streaming. SendMesg is only called
346 // once when creating the stream object. io.EOF needs to be skipped when
347 // the rpc is early finished (before the stream object is created.).
348 // TODO: It is probably better to move this into the generated code.
349 if !cs.desc.ClientStreams && cs.desc.ServerStreams {
350 err = nil
351 }
352 return
353 }
354 if _, ok := err.(transport.ConnectionError); !ok {
355 cs.closeTransportStream(err)
356 }
357 err = toRPCErr(err)
358 }() 664 }()
359 var outPayload *stats.OutPayload 665 if cs.sentLast {
360 if cs.statsHandler != nil { 666 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
361 outPayload = &stats.OutPayload{
362 Client: true,
363 }
364 } 667 }
365 out, err := encode(cs.codec, m, cs.cp, cs.cbuf, outPayload) 668 if !cs.desc.ClientStreams {
366 defer func() { 669 cs.sentLast = true
367 if cs.cbuf != nil { 670 }
368 cs.cbuf.Reset() 671 data, err := encode(cs.codec, m)
369 }
370 }()
371 if err != nil { 672 if err != nil {
372 return err 673 return err
373 } 674 }
374 if cs.c.maxSendMessageSize == nil { 675 compData, err := compress(data, cs.cp, cs.comp)
375 return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") 676 if err != nil {
677 return err
376 } 678 }
377 if len(out) > *cs.c.maxSendMessageSize { 679 hdr, payload := msgHeader(data, compData)
378 return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(out), *cs.c.maxSendMessageSize) 680 // TODO(dfawley): should we be checking len(data) instead?
681 if len(payload) > *cs.callInfo.maxSendMessageSize {
682 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
379 } 683 }
380 err = cs.t.Write(cs.s, out, &transport.Options{Last: false}) 684 msgBytes := data // Store the pointer before setting to nil. For binary logging.
381 if err == nil && outPayload != nil { 685 op := func(a *csAttempt) error {
382 outPayload.SentTime = time.Now() 686 err := a.sendMsg(m, hdr, payload, data)
383 cs.statsHandler.HandleRPC(cs.statsCtx, outPayload) 687 // nil out the message and uncomp when replaying; they are only needed for
688 // stats which is disabled for subsequent attempts.
689 m, data = nil, nil
690 return err
384 } 691 }
385 return err 692 err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
693 if cs.binlog != nil && err == nil {
694 cs.binlog.Log(&binarylog.ClientMessage{
695 OnClientSide: true,
696 Message: msgBytes,
697 })
698 }
699 return
386} 700}
387 701
388func (cs *clientStream) RecvMsg(m interface{}) (err error) { 702func (cs *clientStream) RecvMsg(m interface{}) error {
389 var inPayload *stats.InPayload 703 if cs.binlog != nil && !cs.serverHeaderBinlogged {
390 if cs.statsHandler != nil { 704 // Call Header() to binary log header if it's not already logged.
391 inPayload = &stats.InPayload{ 705 cs.Header()
392 Client: true,
393 }
394 } 706 }
395 if cs.c.maxReceiveMessageSize == nil { 707 var recvInfo *payloadInfo
396 return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") 708 if cs.binlog != nil {
709 recvInfo = &payloadInfo{}
397 } 710 }
398 err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, inPayload) 711 err := cs.withRetry(func(a *csAttempt) error {
399 defer func() { 712 return a.recvMsg(m, recvInfo)
400 // err != nil indicates the termination of the stream. 713 }, cs.commitAttemptLocked)
401 if err != nil { 714 if cs.binlog != nil && err == nil {
402 cs.finish(err) 715 cs.binlog.Log(&binarylog.ServerMessage{
716 OnClientSide: true,
717 Message: recvInfo.uncompressedBytes,
718 })
719 }
720 if err != nil || !cs.desc.ServerStreams {
721 // err != nil or non-server-streaming indicates end of stream.
722 cs.finish(err)
723
724 if cs.binlog != nil {
725 // finish will not log Trailer. Log Trailer here.
726 logEntry := &binarylog.ServerTrailer{
727 OnClientSide: true,
728 Trailer: cs.Trailer(),
729 Err: err,
730 }
731 if logEntry.Err == io.EOF {
732 logEntry.Err = nil
733 }
734 if peer, ok := peer.FromContext(cs.Context()); ok {
735 logEntry.PeerAddr = peer.Addr
736 }
737 cs.binlog.Log(logEntry)
403 } 738 }
404 }() 739 }
740 return err
741}
742
743func (cs *clientStream) CloseSend() error {
744 if cs.sentLast {
745 // TODO: return an error and finish the stream instead, due to API misuse?
746 return nil
747 }
748 cs.sentLast = true
749 op := func(a *csAttempt) error {
750 a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
751 // Always return nil; io.EOF is the only error that might make sense
752 // instead, but there is no need to signal the client to call RecvMsg
753 // as the only use left for the stream after CloseSend is to call
754 // RecvMsg. This also matches historical behavior.
755 return nil
756 }
757 cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
758 if cs.binlog != nil {
759 cs.binlog.Log(&binarylog.ClientHalfClose{
760 OnClientSide: true,
761 })
762 }
763 // We never returned an error here for reasons.
764 return nil
765}
766
767func (cs *clientStream) finish(err error) {
768 if err == io.EOF {
769 // Ending a stream with EOF indicates a success.
770 err = nil
771 }
772 cs.mu.Lock()
773 if cs.finished {
774 cs.mu.Unlock()
775 return
776 }
777 cs.finished = true
778 cs.commitAttemptLocked()
779 cs.mu.Unlock()
780 // For binary logging. only log cancel in finish (could be caused by RPC ctx
781 // canceled or ClientConn closed). Trailer will be logged in RecvMsg.
782 //
783 // Only one of cancel or trailer needs to be logged. In the cases where
784 // users don't call RecvMsg, users must have already canceled the RPC.
785 if cs.binlog != nil && status.Code(err) == codes.Canceled {
786 cs.binlog.Log(&binarylog.Cancel{
787 OnClientSide: true,
788 })
789 }
405 if err == nil { 790 if err == nil {
406 if cs.tracing { 791 cs.retryThrottler.successfulRPC()
407 cs.mu.Lock() 792 }
408 if cs.trInfo.tr != nil { 793 if channelz.IsOn() {
409 cs.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) 794 if err != nil {
410 } 795 cs.cc.incrCallsFailed()
411 cs.mu.Unlock() 796 } else {
797 cs.cc.incrCallsSucceeded()
412 } 798 }
413 if inPayload != nil { 799 }
414 cs.statsHandler.HandleRPC(cs.statsCtx, inPayload) 800 if cs.attempt != nil {
801 cs.attempt.finish(err)
802 }
803 // after functions all rely upon having a stream.
804 if cs.attempt.s != nil {
805 for _, o := range cs.opts {
806 o.after(cs.callInfo)
415 } 807 }
416 if !cs.desc.ClientStreams || cs.desc.ServerStreams { 808 }
417 return 809 cs.cancel()
810}
811
812func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
813 cs := a.cs
814 if EnableTracing {
815 a.mu.Lock()
816 if a.trInfo.tr != nil {
817 a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
418 } 818 }
419 // Special handling for client streaming rpc. 819 a.mu.Unlock()
420 // This recv expects EOF or errors, so we don't collect inPayload. 820 }
421 if cs.c.maxReceiveMessageSize == nil { 821 if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
422 return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") 822 if !cs.desc.ClientStreams {
823 // For non-client-streaming RPCs, we return nil instead of EOF on error
824 // because the generated code requires it. finish is not called; RecvMsg()
825 // will call it with the stream's status independently.
826 return nil
423 } 827 }
424 err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, nil) 828 return io.EOF
425 cs.closeTransportStream(err) 829 }
426 if err == nil { 830 if a.statsHandler != nil {
427 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) 831 a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
832 }
833 if channelz.IsOn() {
834 a.t.IncrMsgSent()
835 }
836 return nil
837}
838
839func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
840 cs := a.cs
841 if a.statsHandler != nil && payInfo == nil {
842 payInfo = &payloadInfo{}
843 }
844
845 if !a.decompSet {
846 // Block until we receive headers containing received message encoding.
847 if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
848 if a.dc == nil || a.dc.Type() != ct {
849 // No configured decompressor, or it does not match the incoming
850 // message encoding; attempt to find a registered compressor that does.
851 a.dc = nil
852 a.decomp = encoding.GetCompressor(ct)
853 }
854 } else {
855 // No compression is used; disable our decompressor.
856 a.dc = nil
428 } 857 }
858 // Only initialize this state once per stream.
859 a.decompSet = true
860 }
861 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
862 if err != nil {
429 if err == io.EOF { 863 if err == io.EOF {
430 if se := cs.s.Status().Err(); se != nil { 864 if statusErr := a.s.Status().Err(); statusErr != nil {
431 return se 865 return statusErr
432 } 866 }
433 cs.finish(err) 867 return io.EOF // indicates successful end of stream.
434 return nil
435 } 868 }
436 return toRPCErr(err) 869 return toRPCErr(err)
437 } 870 }
438 if _, ok := err.(transport.ConnectionError); !ok { 871 if EnableTracing {
439 cs.closeTransportStream(err) 872 a.mu.Lock()
873 if a.trInfo.tr != nil {
874 a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
875 }
876 a.mu.Unlock()
877 }
878 if a.statsHandler != nil {
879 a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
880 Client: true,
881 RecvTime: time.Now(),
882 Payload: m,
883 // TODO truncate large payload.
884 Data: payInfo.uncompressedBytes,
885 Length: len(payInfo.uncompressedBytes),
886 })
887 }
888 if channelz.IsOn() {
889 a.t.IncrMsgRecv()
890 }
891 if cs.desc.ServerStreams {
892 // Subsequent messages should be received by subsequent RecvMsg calls.
893 return nil
894 }
895 // Special handling for non-server-stream rpcs.
896 // This recv expects EOF or errors, so we don't collect inPayload.
897 err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
898 if err == nil {
899 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
440 } 900 }
441 if err == io.EOF { 901 if err == io.EOF {
442 if statusErr := cs.s.Status().Err(); statusErr != nil { 902 return a.s.Status().Err() // non-server streaming Recv returns nil on success
443 return statusErr
444 }
445 // Returns io.EOF to indicate the end of the stream.
446 return
447 } 903 }
448 return toRPCErr(err) 904 return toRPCErr(err)
449} 905}
450 906
451func (cs *clientStream) CloseSend() (err error) { 907func (a *csAttempt) finish(err error) {
452 err = cs.t.Write(cs.s, nil, &transport.Options{Last: true}) 908 a.mu.Lock()
909 if a.finished {
910 a.mu.Unlock()
911 return
912 }
913 a.finished = true
914 if err == io.EOF {
915 // Ending a stream with EOF indicates a success.
916 err = nil
917 }
918 if a.s != nil {
919 a.t.CloseStream(a.s, err)
920 }
921
922 if a.done != nil {
923 br := false
924 var tr metadata.MD
925 if a.s != nil {
926 br = a.s.BytesReceived()
927 tr = a.s.Trailer()
928 }
929 a.done(balancer.DoneInfo{
930 Err: err,
931 Trailer: tr,
932 BytesSent: a.s != nil,
933 BytesReceived: br,
934 })
935 }
936 if a.statsHandler != nil {
937 end := &stats.End{
938 Client: true,
939 BeginTime: a.cs.beginTime,
940 EndTime: time.Now(),
941 Error: err,
942 }
943 a.statsHandler.HandleRPC(a.cs.ctx, end)
944 }
945 if a.trInfo.tr != nil {
946 if err == nil {
947 a.trInfo.tr.LazyPrintf("RPC: [OK]")
948 } else {
949 a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
950 a.trInfo.tr.SetError()
951 }
952 a.trInfo.tr.Finish()
953 a.trInfo.tr = nil
954 }
955 a.mu.Unlock()
956}
957
958func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, opts ...CallOption) (_ ClientStream, err error) {
959 ac.mu.Lock()
960 if ac.transport != t {
961 ac.mu.Unlock()
962 return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
963 }
964 // transition to CONNECTING state when an attempt starts
965 if ac.state != connectivity.Connecting {
966 ac.updateConnectivityState(connectivity.Connecting)
967 ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
968 }
969 ac.mu.Unlock()
970
971 if t == nil {
972 // TODO: return RPC error here?
973 return nil, errors.New("transport provided is nil")
974 }
975 // defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
976 c := &callInfo{}
977
978 for _, o := range opts {
979 if err := o.before(c); err != nil {
980 return nil, toRPCErr(err)
981 }
982 }
983 c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
984 c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)
985
986 // Possible context leak:
987 // The cancel function for the child context we create will only be called
988 // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
989 // an error is generated by SendMsg.
990 // https://github.com/grpc/grpc-go/issues/1818.
991 ctx, cancel := context.WithCancel(ctx)
453 defer func() { 992 defer func() {
454 if err != nil { 993 if err != nil {
455 cs.finish(err) 994 cancel()
456 } 995 }
457 }() 996 }()
458 if err == nil || err == io.EOF { 997
459 return nil 998 if err := setCallInfoCodec(c); err != nil {
999 return nil, err
460 } 1000 }
461 if _, ok := err.(transport.ConnectionError); !ok { 1001
462 cs.closeTransportStream(err) 1002 callHdr := &transport.CallHdr{
1003 Host: ac.cc.authority,
1004 Method: method,
1005 ContentSubtype: c.contentSubtype,
463 } 1006 }
464 err = toRPCErr(err) 1007
465 return 1008 // Set our outgoing compression according to the UseCompressor CallOption, if
1009 // set. In that case, also find the compressor from the encoding package.
1010 // Otherwise, use the compressor configured by the WithCompressor DialOption,
1011 // if set.
1012 var cp Compressor
1013 var comp encoding.Compressor
1014 if ct := c.compressorType; ct != "" {
1015 callHdr.SendCompress = ct
1016 if ct != encoding.Identity {
1017 comp = encoding.GetCompressor(ct)
1018 if comp == nil {
1019 return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
1020 }
1021 }
1022 } else if ac.cc.dopts.cp != nil {
1023 callHdr.SendCompress = ac.cc.dopts.cp.Type()
1024 cp = ac.cc.dopts.cp
1025 }
1026 if c.creds != nil {
1027 callHdr.Creds = c.creds
1028 }
1029
1030 as := &addrConnStream{
1031 callHdr: callHdr,
1032 ac: ac,
1033 ctx: ctx,
1034 cancel: cancel,
1035 opts: opts,
1036 callInfo: c,
1037 desc: desc,
1038 codec: c.codec,
1039 cp: cp,
1040 comp: comp,
1041 t: t,
1042 }
1043
1044 as.callInfo.stream = as
1045 s, err := as.t.NewStream(as.ctx, as.callHdr)
1046 if err != nil {
1047 err = toRPCErr(err)
1048 return nil, err
1049 }
1050 as.s = s
1051 as.p = &parser{r: s}
1052 ac.incrCallsStarted()
1053 if desc != unaryStreamDesc {
1054 // Listen on cc and stream contexts to cleanup when the user closes the
1055 // ClientConn or cancels the stream context. In all other cases, an error
1056 // should already be injected into the recv buffer by the transport, which
1057 // the client will eventually receive, and then we will cancel the stream's
1058 // context in clientStream.finish.
1059 go func() {
1060 select {
1061 case <-ac.ctx.Done():
1062 as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
1063 case <-ctx.Done():
1064 as.finish(toRPCErr(ctx.Err()))
1065 }
1066 }()
1067 }
1068 return as, nil
466} 1069}
467 1070
468func (cs *clientStream) closeTransportStream(err error) { 1071type addrConnStream struct {
469 cs.mu.Lock() 1072 s *transport.Stream
470 if cs.closed { 1073 ac *addrConn
471 cs.mu.Unlock() 1074 callHdr *transport.CallHdr
472 return 1075 cancel context.CancelFunc
1076 opts []CallOption
1077 callInfo *callInfo
1078 t transport.ClientTransport
1079 ctx context.Context
1080 sentLast bool
1081 desc *StreamDesc
1082 codec baseCodec
1083 cp Compressor
1084 comp encoding.Compressor
1085 decompSet bool
1086 dc Decompressor
1087 decomp encoding.Compressor
1088 p *parser
1089 done func(balancer.DoneInfo)
1090 mu sync.Mutex
1091 finished bool
1092}
1093
1094func (as *addrConnStream) Header() (metadata.MD, error) {
1095 m, err := as.s.Header()
1096 if err != nil {
1097 as.finish(toRPCErr(err))
473 } 1098 }
474 cs.closed = true 1099 return m, err
475 cs.mu.Unlock()
476 cs.t.CloseStream(cs.s, err)
477} 1100}
478 1101
479func (cs *clientStream) finish(err error) { 1102func (as *addrConnStream) Trailer() metadata.MD {
480 cs.mu.Lock() 1103 return as.s.Trailer()
481 defer cs.mu.Unlock() 1104}
482 if cs.finished { 1105
483 return 1106func (as *addrConnStream) CloseSend() error {
1107 if as.sentLast {
1108 // TODO: return an error and finish the stream instead, due to API misuse?
1109 return nil
484 } 1110 }
485 cs.finished = true 1111 as.sentLast = true
1112
1113 as.t.Write(as.s, nil, nil, &transport.Options{Last: true})
1114 // Always return nil; io.EOF is the only error that might make sense
1115 // instead, but there is no need to signal the client to call RecvMsg
1116 // as the only use left for the stream after CloseSend is to call
1117 // RecvMsg. This also matches historical behavior.
1118 return nil
1119}
1120
1121func (as *addrConnStream) Context() context.Context {
1122 return as.s.Context()
1123}
1124
1125func (as *addrConnStream) SendMsg(m interface{}) (err error) {
486 defer func() { 1126 defer func() {
487 if cs.cancel != nil { 1127 if err != nil && err != io.EOF {
488 cs.cancel() 1128 // Call finish on the client stream for errors generated by this SendMsg
1129 // call, as these indicate problems created by this client. (Transport
1130 // errors are converted to an io.EOF error in csAttempt.sendMsg; the real
1131 // error will be returned from RecvMsg eventually in that case, or be
1132 // retried.)
1133 as.finish(err)
489 } 1134 }
490 }() 1135 }()
491 for _, o := range cs.opts { 1136 if as.sentLast {
492 o.after(&cs.c) 1137 return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
493 } 1138 }
494 if cs.put != nil { 1139 if !as.desc.ClientStreams {
495 updateRPCInfoInContext(cs.s.Context(), rpcInfo{ 1140 as.sentLast = true
496 bytesSent: cs.s.BytesSent(),
497 bytesReceived: cs.s.BytesReceived(),
498 })
499 cs.put()
500 cs.put = nil
501 } 1141 }
502 if cs.statsHandler != nil { 1142 data, err := encode(as.codec, m)
503 end := &stats.End{ 1143 if err != nil {
504 Client: true, 1144 return err
505 EndTime: time.Now(), 1145 }
506 } 1146 compData, err := compress(data, as.cp, as.comp)
507 if err != io.EOF { 1147 if err != nil {
508 // end.Error is nil if the RPC finished successfully. 1148 return err
509 end.Error = toRPCErr(err) 1149 }
1150 hdr, payld := msgHeader(data, compData)
1151 // TODO(dfawley): should we be checking len(data) instead?
1152 if len(payld) > *as.callInfo.maxSendMessageSize {
1153 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
1154 }
1155
1156 if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
1157 if !as.desc.ClientStreams {
1158 // For non-client-streaming RPCs, we return nil instead of EOF on error
1159 // because the generated code requires it. finish is not called; RecvMsg()
1160 // will call it with the stream's status independently.
1161 return nil
510 } 1162 }
511 cs.statsHandler.HandleRPC(cs.statsCtx, end) 1163 return io.EOF
512 } 1164 }
513 if !cs.tracing { 1165
514 return 1166 if channelz.IsOn() {
1167 as.t.IncrMsgSent()
515 } 1168 }
516 if cs.trInfo.tr != nil { 1169 return nil
517 if err == nil || err == io.EOF { 1170}
518 cs.trInfo.tr.LazyPrintf("RPC: [OK]") 1171
1172func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
1173 defer func() {
1174 if err != nil || !as.desc.ServerStreams {
1175 // err != nil or non-server-streaming indicates end of stream.
1176 as.finish(err)
1177 }
1178 }()
1179
1180 if !as.decompSet {
1181 // Block until we receive headers containing received message encoding.
1182 if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
1183 if as.dc == nil || as.dc.Type() != ct {
1184 // No configured decompressor, or it does not match the incoming
1185 // message encoding; attempt to find a registered compressor that does.
1186 as.dc = nil
1187 as.decomp = encoding.GetCompressor(ct)
1188 }
519 } else { 1189 } else {
520 cs.trInfo.tr.LazyPrintf("RPC: [%v]", err) 1190 // No compression is used; disable our decompressor.
521 cs.trInfo.tr.SetError() 1191 as.dc = nil
522 } 1192 }
523 cs.trInfo.tr.Finish() 1193 // Only initialize this state once per stream.
524 cs.trInfo.tr = nil 1194 as.decompSet = true
1195 }
1196 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
1197 if err != nil {
1198 if err == io.EOF {
1199 if statusErr := as.s.Status().Err(); statusErr != nil {
1200 return statusErr
1201 }
1202 return io.EOF // indicates successful end of stream.
1203 }
1204 return toRPCErr(err)
1205 }
1206
1207 if channelz.IsOn() {
1208 as.t.IncrMsgRecv()
525 } 1209 }
1210 if as.desc.ServerStreams {
1211 // Subsequent messages should be received by subsequent RecvMsg calls.
1212 return nil
1213 }
1214
1215 // Special handling for non-server-stream rpcs.
1216 // This recv expects EOF or errors, so we don't collect inPayload.
1217 err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
1218 if err == nil {
1219 return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
1220 }
1221 if err == io.EOF {
1222 return as.s.Status().Err() // non-server streaming Recv returns nil on success
1223 }
1224 return toRPCErr(err)
526} 1225}
527 1226
528// ServerStream defines the interface a server stream has to satisfy. 1227func (as *addrConnStream) finish(err error) {
1228 as.mu.Lock()
1229 if as.finished {
1230 as.mu.Unlock()
1231 return
1232 }
1233 as.finished = true
1234 if err == io.EOF {
1235 // Ending a stream with EOF indicates a success.
1236 err = nil
1237 }
1238 if as.s != nil {
1239 as.t.CloseStream(as.s, err)
1240 }
1241
1242 if err != nil {
1243 as.ac.incrCallsFailed()
1244 } else {
1245 as.ac.incrCallsSucceeded()
1246 }
1247 as.cancel()
1248 as.mu.Unlock()
1249}
1250
1251// ServerStream defines the server-side behavior of a streaming RPC.
1252//
1253// All errors returned from ServerStream methods are compatible with the
1254// status package.
529type ServerStream interface { 1255type ServerStream interface {
530 // SetHeader sets the header metadata. It may be called multiple times. 1256 // SetHeader sets the header metadata. It may be called multiple times.
531 // When call multiple times, all the provided metadata will be merged. 1257 // When call multiple times, all the provided metadata will be merged.
@@ -541,29 +1267,67 @@ type ServerStream interface {
541 // SetTrailer sets the trailer metadata which will be sent with the RPC status. 1267 // SetTrailer sets the trailer metadata which will be sent with the RPC status.
542 // When called more than once, all the provided metadata will be merged. 1268 // When called more than once, all the provided metadata will be merged.
543 SetTrailer(metadata.MD) 1269 SetTrailer(metadata.MD)
544 Stream 1270 // Context returns the context for this stream.
1271 Context() context.Context
1272 // SendMsg sends a message. On error, SendMsg aborts the stream and the
1273 // error is returned directly.
1274 //
1275 // SendMsg blocks until:
1276 // - There is sufficient flow control to schedule m with the transport, or
1277 // - The stream is done, or
1278 // - The stream breaks.
1279 //
1280 // SendMsg does not wait until the message is received by the client. An
1281 // untimely stream closure may result in lost messages.
1282 //
1283 // It is safe to have a goroutine calling SendMsg and another goroutine
1284 // calling RecvMsg on the same stream at the same time, but it is not safe
1285 // to call SendMsg on the same stream in different goroutines.
1286 SendMsg(m interface{}) error
1287 // RecvMsg blocks until it receives a message into m or the stream is
1288 // done. It returns io.EOF when the client has performed a CloseSend. On
1289 // any non-EOF error, the stream is aborted and the error contains the
1290 // RPC status.
1291 //
1292 // It is safe to have a goroutine calling SendMsg and another goroutine
1293 // calling RecvMsg on the same stream at the same time, but it is not
1294 // safe to call RecvMsg on the same stream in different goroutines.
1295 RecvMsg(m interface{}) error
545} 1296}
546 1297
547// serverStream implements a server side Stream. 1298// serverStream implements a server side Stream.
548type serverStream struct { 1299type serverStream struct {
549 t transport.ServerTransport 1300 ctx context.Context
550 s *transport.Stream 1301 t transport.ServerTransport
551 p *parser 1302 s *transport.Stream
552 codec Codec 1303 p *parser
553 cp Compressor 1304 codec baseCodec
554 dc Decompressor 1305
555 cbuf *bytes.Buffer 1306 cp Compressor
1307 dc Decompressor
1308 comp encoding.Compressor
1309 decomp encoding.Compressor
1310
556 maxReceiveMessageSize int 1311 maxReceiveMessageSize int
557 maxSendMessageSize int 1312 maxSendMessageSize int
558 trInfo *traceInfo 1313 trInfo *traceInfo
559 1314
560 statsHandler stats.Handler 1315 statsHandler stats.Handler
561 1316
1317 binlog *binarylog.MethodLogger
1318 // serverHeaderBinlogged indicates whether server header has been logged. It
1319 // will happen when one of the following two happens: stream.SendHeader(),
1320 // stream.Send().
1321 //
1322 // It's only checked in send and sendHeader, doesn't need to be
1323 // synchronized.
1324 serverHeaderBinlogged bool
1325
562 mu sync.Mutex // protects trInfo.tr after the service handler runs. 1326 mu sync.Mutex // protects trInfo.tr after the service handler runs.
563} 1327}
564 1328
565func (ss *serverStream) Context() context.Context { 1329func (ss *serverStream) Context() context.Context {
566 return ss.s.Context() 1330 return ss.ctx
567} 1331}
568 1332
569func (ss *serverStream) SetHeader(md metadata.MD) error { 1333func (ss *serverStream) SetHeader(md metadata.MD) error {
@@ -574,7 +1338,15 @@ func (ss *serverStream) SetHeader(md metadata.MD) error {
574} 1338}
575 1339
576func (ss *serverStream) SendHeader(md metadata.MD) error { 1340func (ss *serverStream) SendHeader(md metadata.MD) error {
577 return ss.t.WriteHeader(ss.s, md) 1341 err := ss.t.WriteHeader(ss.s, md)
1342 if ss.binlog != nil && !ss.serverHeaderBinlogged {
1343 h, _ := ss.s.Header()
1344 ss.binlog.Log(&binarylog.ServerHeader{
1345 Header: h,
1346 })
1347 ss.serverHeaderBinlogged = true
1348 }
1349 return err
578} 1350}
579 1351
580func (ss *serverStream) SetTrailer(md metadata.MD) { 1352func (ss *serverStream) SetTrailer(md metadata.MD) {
@@ -582,7 +1354,6 @@ func (ss *serverStream) SetTrailer(md metadata.MD) {
582 return 1354 return
583 } 1355 }
584 ss.s.SetTrailer(md) 1356 ss.s.SetTrailer(md)
585 return
586} 1357}
587 1358
588func (ss *serverStream) SendMsg(m interface{}) (err error) { 1359func (ss *serverStream) SendMsg(m interface{}) (err error) {
@@ -599,29 +1370,50 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
599 } 1370 }
600 ss.mu.Unlock() 1371 ss.mu.Unlock()
601 } 1372 }
602 }() 1373 if err != nil && err != io.EOF {
603 var outPayload *stats.OutPayload 1374 st, _ := status.FromError(toRPCErr(err))
604 if ss.statsHandler != nil { 1375 ss.t.WriteStatus(ss.s, st)
605 outPayload = &stats.OutPayload{} 1376 // Non-user specified status was sent out. This should be an error
606 } 1377 // case (as a server side Cancel maybe).
607 out, err := encode(ss.codec, m, ss.cp, ss.cbuf, outPayload) 1378 //
608 defer func() { 1379 // This is not handled specifically now. User will return a final
609 if ss.cbuf != nil { 1380 // status from the service handler, we will log that error instead.
610 ss.cbuf.Reset() 1381 // This behavior is similar to an interceptor.
1382 }
1383 if channelz.IsOn() && err == nil {
1384 ss.t.IncrMsgSent()
611 } 1385 }
612 }() 1386 }()
1387 data, err := encode(ss.codec, m)
1388 if err != nil {
1389 return err
1390 }
1391 compData, err := compress(data, ss.cp, ss.comp)
613 if err != nil { 1392 if err != nil {
614 return err 1393 return err
615 } 1394 }
616 if len(out) > ss.maxSendMessageSize { 1395 hdr, payload := msgHeader(data, compData)
617 return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(out), ss.maxSendMessageSize) 1396 // TODO(dfawley): should we be checking len(data) instead?
1397 if len(payload) > ss.maxSendMessageSize {
1398 return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
618 } 1399 }
619 if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil { 1400 if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
620 return toRPCErr(err) 1401 return toRPCErr(err)
621 } 1402 }
622 if outPayload != nil { 1403 if ss.binlog != nil {
623 outPayload.SentTime = time.Now() 1404 if !ss.serverHeaderBinlogged {
624 ss.statsHandler.HandleRPC(ss.s.Context(), outPayload) 1405 h, _ := ss.s.Header()
1406 ss.binlog.Log(&binarylog.ServerHeader{
1407 Header: h,
1408 })
1409 ss.serverHeaderBinlogged = true
1410 }
1411 ss.binlog.Log(&binarylog.ServerMessage{
1412 Message: data,
1413 })
1414 }
1415 if ss.statsHandler != nil {
1416 ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
625 } 1417 }
626 return nil 1418 return nil
627} 1419}
@@ -640,22 +1432,55 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
640 } 1432 }
641 ss.mu.Unlock() 1433 ss.mu.Unlock()
642 } 1434 }
1435 if err != nil && err != io.EOF {
1436 st, _ := status.FromError(toRPCErr(err))
1437 ss.t.WriteStatus(ss.s, st)
1438 // Non-user specified status was sent out. This should be an error
1439 // case (as a server side Cancel maybe).
1440 //
1441 // This is not handled specifically now. User will return a final
1442 // status from the service handler, we will log that error instead.
1443 // This behavior is similar to an interceptor.
1444 }
1445 if channelz.IsOn() && err == nil {
1446 ss.t.IncrMsgRecv()
1447 }
643 }() 1448 }()
644 var inPayload *stats.InPayload 1449 var payInfo *payloadInfo
645 if ss.statsHandler != nil { 1450 if ss.statsHandler != nil || ss.binlog != nil {
646 inPayload = &stats.InPayload{} 1451 payInfo = &payloadInfo{}
647 } 1452 }
648 if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, inPayload); err != nil { 1453 if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
649 if err == io.EOF { 1454 if err == io.EOF {
1455 if ss.binlog != nil {
1456 ss.binlog.Log(&binarylog.ClientHalfClose{})
1457 }
650 return err 1458 return err
651 } 1459 }
652 if err == io.ErrUnexpectedEOF { 1460 if err == io.ErrUnexpectedEOF {
653 err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) 1461 err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
654 } 1462 }
655 return toRPCErr(err) 1463 return toRPCErr(err)
656 } 1464 }
657 if inPayload != nil { 1465 if ss.statsHandler != nil {
658 ss.statsHandler.HandleRPC(ss.s.Context(), inPayload) 1466 ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
1467 RecvTime: time.Now(),
1468 Payload: m,
1469 // TODO truncate large payload.
1470 Data: payInfo.uncompressedBytes,
1471 Length: len(payInfo.uncompressedBytes),
1472 })
1473 }
1474 if ss.binlog != nil {
1475 ss.binlog.Log(&binarylog.ClientMessage{
1476 Message: payInfo.uncompressedBytes,
1477 })
659 } 1478 }
660 return nil 1479 return nil
661} 1480}
1481
1482// MethodFromServerStream returns the method string for the input stream.
1483// The returned string is in the format of "/service/method".
1484func MethodFromServerStream(stream ServerStream) (string, bool) {
1485 return Method(stream.Context())
1486}