/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"context"
	"errors"
	"io"
	"math"
	"strconv"
	"sync"
	"time"

	"golang.org/x/net/trace"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/encoding"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/binarylog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/internal/grpcrand"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/stats"
	"google.golang.org/grpc/status"
)

// StreamHandler defines the handler called by gRPC server to complete the
// execution of a streaming RPC. If a StreamHandler returns an error, it
// should be produced by the status package, or else gRPC will use
// codes.Unknown as the status code and err.Error() as the status message
// of the RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error

// StreamDesc represents a streaming RPC service's method specification.
type StreamDesc struct {
	StreamName string
	Handler    StreamHandler

	// At least one of these is true.
	ServerStreams bool
	ClientStreams bool
}
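
// Illustrative sketch only (not part of this file's API surface): generated
// code for a hypothetical bidirectional-streaming method "Chat" would declare
// a StreamDesc like the following, where chatHandler is a placeholder
// StreamHandler:
//
//	var chatStreamDesc = StreamDesc{
//		StreamName:    "Chat",
//		Handler:       chatHandler,
//		ServerStreams: true,
//		ClientStreams: true,
//	}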

// Stream defines the common interface a client or server stream has to satisfy.
//
// Deprecated: See ClientStream and ServerStream documentation instead.
type Stream interface {
	// Deprecated: See ClientStream and ServerStream documentation instead.
	Context() context.Context
	// Deprecated: See ClientStream and ServerStream documentation instead.
	SendMsg(m interface{}) error
	// Deprecated: See ClientStream and ServerStream documentation instead.
	RecvMsg(m interface{}) error
}

// ClientStream defines the client-side behavior of a streaming RPC.
//
// All errors returned from ClientStream methods are compatible with the
// status package.
type ClientStream interface {
	// Header returns the header metadata received from the server if there
	// is any. It blocks if the metadata is not ready to read.
	Header() (metadata.MD, error)
	// Trailer returns the trailer metadata from the server, if there is any.
	// It must only be called after stream.CloseAndRecv has returned, or
	// stream.Recv has returned a non-nil error (including io.EOF).
	Trailer() metadata.MD
	// CloseSend closes the send direction of the stream. It closes the stream
	// when non-nil error is met. It is also not safe to call CloseSend
	// concurrently with SendMsg.
	CloseSend() error
	// Context returns the context for this stream.
	//
	// It should not be called until after Header or RecvMsg has returned. Once
	// called, subsequent client-side retries are disabled.
	Context() context.Context
	// SendMsg is generally called by generated code. On error, SendMsg aborts
	// the stream. If the error was generated by the client, the status is
	// returned directly; otherwise, io.EOF is returned and the status of
	// the stream may be discovered using RecvMsg.
	//
	// SendMsg blocks until:
	//  - There is sufficient flow control to schedule m with the transport, or
	//  - The stream is done, or
	//  - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the server. An
	// untimely stream closure may result in lost messages. To ensure delivery,
	// users should ensure the RPC completed successfully using RecvMsg.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines. It is also
	// not safe to call CloseSend concurrently with SendMsg.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the stream completes successfully. On
	// any other error, the stream is aborted and the error contains the RPC
	// status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}
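
// A minimal sketch of typical ClientStream use for a bidirectional RPC,
// assuming placeholder stream, reqs, and reply values; error handling is
// abbreviated, and the real status is surfaced by RecvMsg:
//
//	go func() {
//		for _, req := range reqs {
//			if err := stream.SendMsg(req); err != nil {
//				return // io.EOF here means the status comes from RecvMsg
//			}
//		}
//		stream.CloseSend()
//	}()
//	for {
//		if err := stream.RecvMsg(reply); err != nil {
//			if err == io.EOF {
//				// stream completed successfully
//			}
//			break // any other err carries the RPC status
//		}
//	}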

// NewStream creates a new Stream for the client side. This is typically
// called by generated code. ctx is used for the lifetime of the stream.
//
// To ensure resources are not leaked due to the stream returned, one of the following
// actions must be performed:
//
//  1. Call Close on the ClientConn.
//  2. Cancel the context provided.
//  3. Call RecvMsg until a non-nil error is returned. A protobuf-generated
//     client-streaming RPC, for instance, might use the helper function
//     CloseAndRecv (note that CloseSend does not Recv, therefore is not
//     guaranteed to release all resources).
//  4. Receive a non-nil, non-io.EOF error from Header or SendMsg.
//
// If none of the above happen, a goroutine and a context will be leaked, and grpc
// will not call the optionally-configured stats handler with a stats.End message.
func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) {
	// allow interceptor to see all applicable call options, which means those
	// configured as defaults from dial option as well as per-call options
	opts = combine(cc.dopts.callOptions, opts)

	if cc.dopts.streamInt != nil {
		return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
	}
	return newClientStream(ctx, desc, cc, method, opts...)
}
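
// A minimal sketch of rules 2 and 3 above, assuming placeholder desc, method,
// and message values:
//
//	ctx, cancel := context.WithCancel(context.Background())
//	defer cancel() // guarantees cleanup even on early return
//	stream, err := cc.NewStream(ctx, desc, method)
//	if err != nil {
//		return err
//	}
//	// ... SendMsg / CloseSend ...
//	for stream.RecvMsg(m) == nil {
//		// drain until a non-nil error releases the stream's resources
//	}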

// NewClientStream is a wrapper for ClientConn.NewStream.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
	return cc.NewStream(ctx, desc, method, opts...)
}

func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
	if channelz.IsOn() {
		cc.incrCallsStarted()
		defer func() {
			if err != nil {
				cc.incrCallsFailed()
			}
		}()
	}
	c := defaultCallInfo()
	// Provide an opportunity for the first RPC to see the first service config
	// provided by the resolver.
	if err := cc.waitForResolvedAddrs(ctx); err != nil {
		return nil, err
	}
	mc := cc.GetMethodConfig(method)
	if mc.WaitForReady != nil {
		c.failFast = !*mc.WaitForReady
	}

	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	var cancel context.CancelFunc
	if mc.Timeout != nil && *mc.Timeout >= 0 {
		ctx, cancel = context.WithTimeout(ctx, *mc.Timeout)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	for _, o := range opts {
		if err := o.before(c); err != nil {
			return nil, toRPCErr(err)
		}
	}
	c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize)
	c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           cc.authority,
		Method:         method,
		ContentSubtype: c.contentSubtype,
	}

	// Set our outgoing compression according to the UseCompressor CallOption, if
	// set. In that case, also find the compressor from the encoding package.
	// Otherwise, use the compressor configured by the WithCompressor DialOption,
	// if set.
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorType; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if cc.dopts.cp != nil {
		callHdr.SendCompress = cc.dopts.cp.Type()
		cp = cc.dopts.cp
	}
	if c.creds != nil {
		callHdr.Creds = c.creds
	}
	var trInfo traceInfo
	if EnableTracing {
		trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
		trInfo.firstLine.client = true
		if deadline, ok := ctx.Deadline(); ok {
			trInfo.firstLine.deadline = deadline.Sub(time.Now())
		}
		trInfo.tr.LazyLog(&trInfo.firstLine, false)
		ctx = trace.NewContext(ctx, trInfo.tr)
	}
	ctx = newContextWithRPCInfo(ctx, c.failFast)
	sh := cc.dopts.copts.StatsHandler
	var beginTime time.Time
	if sh != nil {
		ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
		beginTime = time.Now()
		begin := &stats.Begin{
			Client:    true,
			BeginTime: beginTime,
			FailFast:  c.failFast,
		}
		sh.HandleRPC(ctx, begin)
	}

	cs := &clientStream{
		callHdr:      callHdr,
		ctx:          ctx,
		methodConfig: &mc,
		opts:         opts,
		callInfo:     c,
		cc:           cc,
		desc:         desc,
		codec:        c.codec,
		cp:           cp,
		comp:         comp,
		cancel:       cancel,
		beginTime:    beginTime,
		firstAttempt: true,
	}
	if !cc.dopts.disableRetry {
		cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler)
	}
	cs.binlog = binarylog.GetMethodLogger(method)

	cs.callInfo.stream = cs
	// Only this initial attempt has stats/tracing.
	// TODO(dfawley): move to newAttempt when per-attempt stats are implemented.
	if err := cs.newAttemptLocked(sh, trInfo); err != nil {
		cs.finish(err)
		return nil, err
	}

	op := func(a *csAttempt) error { return a.newStream() }
	if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
		cs.finish(err)
		return nil, err
	}

	if cs.binlog != nil {
		md, _ := metadata.FromOutgoingContext(ctx)
		logEntry := &binarylog.ClientHeader{
			OnClientSide: true,
			Header:       md,
			MethodName:   method,
			Authority:    cs.cc.authority,
		}
		if deadline, ok := ctx.Deadline(); ok {
			logEntry.Timeout = deadline.Sub(time.Now())
			if logEntry.Timeout < 0 {
				logEntry.Timeout = 0
			}
		}
		cs.binlog.Log(logEntry)
	}

	if desc != unaryStreamDesc {
		// Listen on cc and stream contexts to cleanup when the user closes the
		// ClientConn or cancels the stream context. In all other cases, an error
		// should already be injected into the recv buffer by the transport, which
		// the client will eventually receive, and then we will cancel the stream's
		// context in clientStream.finish.
		go func() {
			select {
			case <-cc.ctx.Done():
				cs.finish(ErrClientConnClosing)
			case <-ctx.Done():
				cs.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return cs, nil
}

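// newAttemptLocked creates a csAttempt for the next attempt of the RPC: it
// obtains a transport from the ClientConn (honoring the call's fail-fast
// setting) and carries the optional stats handler and trace info, which only
// the initial attempt has today.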
func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo traceInfo) error {
	cs.attempt = &csAttempt{
		cs:           cs,
		dc:           cs.cc.dopts.dc,
		statsHandler: sh,
		trInfo:       trInfo,
	}

	if err := cs.ctx.Err(); err != nil {
		return toRPCErr(err)
	}
	t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method)
	if err != nil {
		return err
	}
	cs.attempt.t = t
	cs.attempt.done = done
	return nil
}

func (a *csAttempt) newStream() error {
	cs := a.cs
	cs.callHdr.PreviousAttempts = cs.numRetries
	s, err := a.t.NewStream(cs.ctx, cs.callHdr)
	if err != nil {
		return toRPCErr(err)
	}
	cs.attempt.s = s
	cs.attempt.p = &parser{r: s}
	return nil
}

// clientStream implements a client side Stream.
type clientStream struct {
	callHdr  *transport.CallHdr
	opts     []CallOption
	callInfo *callInfo
	cc       *ClientConn
	desc     *StreamDesc

	codec baseCodec
	cp    Compressor
	comp  encoding.Compressor

	cancel context.CancelFunc // cancels all attempts

	sentLast  bool // sent an end stream
	beginTime time.Time

	methodConfig *MethodConfig

	ctx context.Context // the application's context, wrapped by stats/tracing

	retryThrottler *retryThrottler // The throttler active when the RPC began.

	binlog *binarylog.MethodLogger // Binary logger, can be nil.
	// serverHeaderBinlogged is a boolean for whether server header has been
	// logged. Server header will be logged when the first time one of those
	// happens: stream.Header(), stream.Recv().
	//
	// It's only read and used by Recv() and Header(), so it doesn't need to be
	// synchronized.
	serverHeaderBinlogged bool

	mu                      sync.Mutex
	firstAttempt            bool       // if true, transparent retry is valid
	numRetries              int        // exclusive of transparent retry attempt(s)
	numRetriesSincePushback int        // retries since pushback; to reset backoff
	finished                bool       // TODO: replace with atomic cmpxchg or sync.Once?
	attempt                 *csAttempt // the active client stream attempt
	// TODO(hedging): hedging will have multiple attempts simultaneously.
	committed  bool                       // active attempt committed for retry?
	buffer     []func(a *csAttempt) error // operations to replay on retry
	bufferSize int                        // current size of buffer
}

// csAttempt implements a single transport stream attempt within a
// clientStream.
type csAttempt struct {
	cs   *clientStream
	t    transport.ClientTransport
	s    *transport.Stream
	p    *parser
	done func(balancer.DoneInfo)

	finished  bool
	dc        Decompressor
	decomp    encoding.Compressor
	decompSet bool

	mu sync.Mutex // guards trInfo.tr
	// trInfo.tr is set when created (if EnableTracing is true),
	// and cleared when the finish method is called.
	trInfo traceInfo

	statsHandler stats.Handler
}

func (cs *clientStream) commitAttemptLocked() {
	cs.committed = true
	cs.buffer = nil
}

func (cs *clientStream) commitAttempt() {
	cs.mu.Lock()
	cs.commitAttemptLocked()
	cs.mu.Unlock()
}

// shouldRetry returns nil if the RPC should be retried; otherwise it returns
// the error that should be returned by the operation.
func (cs *clientStream) shouldRetry(err error) error {
	if cs.attempt.s == nil && !cs.callInfo.failFast {
		// In the event of any error from NewStream (attempt.s == nil), we
		// never attempted to write anything to the wire, so we can retry
		// indefinitely for non-fail-fast RPCs.
		return nil
	}
	if cs.finished || cs.committed {
		// RPC is finished or committed; cannot retry.
		return err
	}
	// Wait for the trailers.
	if cs.attempt.s != nil {
		<-cs.attempt.s.Done()
	}
	if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
		// First attempt, wait-for-ready, stream unprocessed: transparently retry.
		cs.firstAttempt = false
		return nil
	}
	cs.firstAttempt = false
	if cs.cc.dopts.disableRetry {
		return err
	}

	pushback := 0
	hasPushback := false
	if cs.attempt.s != nil {
		if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil || !to {
			return err
		}

		// TODO(retry): Move down if the spec changes to not check server pushback
		// before considering this a failure for throttling.
		sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"]
		if len(sps) == 1 {
			var e error
			if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 {
				grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0])
				cs.retryThrottler.throttle() // This counts as a failure for throttling.
				return err
			}
			hasPushback = true
		} else if len(sps) > 1 {
			grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps)
			cs.retryThrottler.throttle() // This counts as a failure for throttling.
			return err
		}
	}

	var code codes.Code
	if cs.attempt.s != nil {
		code = cs.attempt.s.Status().Code()
	} else {
		code = status.Convert(err).Code()
	}

	rp := cs.methodConfig.retryPolicy
	if rp == nil || !rp.retryableStatusCodes[code] {
		return err
	}

	// Note: the ordering here is important; we count this as a failure
	// only if the code matched a retryable code.
	if cs.retryThrottler.throttle() {
		return err
	}
	if cs.numRetries+1 >= rp.maxAttempts {
		return err
	}

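	// Compute the retry delay: a valid server pushback value is used as-is;
	// otherwise the delay is drawn uniformly from
	// [0, min(initialBackoff*backoffMultiplier^numRetriesSincePushback, maxBackoff)).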
	var dur time.Duration
	if hasPushback {
		dur = time.Millisecond * time.Duration(pushback)
		cs.numRetriesSincePushback = 0
	} else {
		fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback))
		cur := float64(rp.initialBackoff) * fact
		if max := float64(rp.maxBackoff); cur > max {
			cur = max
		}
		dur = time.Duration(grpcrand.Int63n(int64(cur)))
		cs.numRetriesSincePushback++
	}

	// TODO(dfawley): we could eagerly fail here if dur puts us past the
	// deadline, but unsure if it is worth doing.
	t := time.NewTimer(dur)
	select {
	case <-t.C:
		cs.numRetries++
		return nil
	case <-cs.ctx.Done():
		t.Stop()
		return status.FromContextError(cs.ctx.Err()).Err()
	}
}

// Returns nil if a retry was performed and succeeded; error otherwise.
func (cs *clientStream) retryLocked(lastErr error) error {
	for {
		cs.attempt.finish(lastErr)
		if err := cs.shouldRetry(lastErr); err != nil {
			cs.commitAttemptLocked()
			return err
		}
		if err := cs.newAttemptLocked(nil, traceInfo{}); err != nil {
			return err
		}
		if lastErr = cs.replayBufferLocked(); lastErr == nil {
			return nil
		}
	}
}

func (cs *clientStream) Context() context.Context {
	cs.commitAttempt()
	// No need to lock before using attempt, since we know it is committed and
	// cannot change.
	return cs.attempt.s.Context()
}

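// withRetry runs op against the current attempt, transparently switching to
// new attempts created by retryLocked (which replays buffered operations)
// when op fails with a retryable error. onSuccess is invoked under cs.mu when
// op succeeds; callers use it to buffer op for replay by a later retry.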
func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
	cs.mu.Lock()
	for {
		if cs.committed {
			cs.mu.Unlock()
			return op(cs.attempt)
		}
		a := cs.attempt
		cs.mu.Unlock()
		err := op(a)
		cs.mu.Lock()
		if a != cs.attempt {
			// We started another attempt already.
			continue
		}
		if err == io.EOF {
			<-a.s.Done()
		}
		if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
			onSuccess()
			cs.mu.Unlock()
			return err
		}
		if err := cs.retryLocked(err); err != nil {
			cs.mu.Unlock()
			return err
		}
	}
}

func (cs *clientStream) Header() (metadata.MD, error) {
	var m metadata.MD
	err := cs.withRetry(func(a *csAttempt) error {
		var err error
		m, err = a.s.Header()
		return toRPCErr(err)
	}, cs.commitAttemptLocked)
	if err != nil {
		cs.finish(err)
		return nil, err
	}
	if cs.binlog != nil && !cs.serverHeaderBinlogged {
		// Only log if binary log is on and header has not been logged.
		logEntry := &binarylog.ServerHeader{
			OnClientSide: true,
			Header:       m,
			PeerAddr:     nil,
		}
		if peer, ok := peer.FromContext(cs.Context()); ok {
			logEntry.PeerAddr = peer.Addr
		}
		cs.binlog.Log(logEntry)
		cs.serverHeaderBinlogged = true
	}
	return m, err
}

func (cs *clientStream) Trailer() metadata.MD {
	// On RPC failure, we never need to retry, because valid usage requires
	// RecvMsg() to have returned a non-nil error before this function is
	// called. We would have retried earlier if necessary.
	//
	// Commit the attempt anyway, just in case users are not following those
	// directions -- it will prevent races and should not meaningfully impact
	// performance.
	cs.commitAttempt()
	if cs.attempt.s == nil {
		return nil
	}
	return cs.attempt.s.Trailer()
}

func (cs *clientStream) replayBufferLocked() error {
	a := cs.attempt
	for _, f := range cs.buffer {
		if err := f(a); err != nil {
			return err
		}
	}
	return nil
}

func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) {
	// Note: we still will buffer if retry is disabled (for transparent retries).
	if cs.committed {
		return
	}
	cs.bufferSize += sz
	if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize {
		cs.commitAttemptLocked()
		return
	}
	cs.buffer = append(cs.buffer, op)
}

func (cs *clientStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish on the client stream for errors generated by this SendMsg
			// call, as these indicate problems created by this client. (Transport
			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
			// error will be returned from RecvMsg eventually in that case, or be
			// retried.)
			cs.finish(err)
		}
	}()
	if cs.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !cs.desc.ClientStreams {
		cs.sentLast = true
	}
	data, err := encode(cs.codec, m)
	if err != nil {
		return err
	}
	compData, err := compress(data, cs.cp, cs.comp)
	if err != nil {
		return err
	}
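	// msgHeader yields the 5-byte gRPC message prefix (1-byte compressed-flag
	// plus 4-byte big-endian length); payload is compData when compression was
	// applied, otherwise data.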
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > *cs.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize)
	}
	msgBytes := data // Store the pointer before setting to nil. For binary logging.
	op := func(a *csAttempt) error {
		err := a.sendMsg(m, hdr, payload, data)
		// nil out the message and uncomp when replaying; they are only needed for
		// stats which is disabled for subsequent attempts.
		m, data = nil, nil
		return err
	}
	err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
	if cs.binlog != nil && err == nil {
		cs.binlog.Log(&binarylog.ClientMessage{
			OnClientSide: true,
			Message:      msgBytes,
		})
	}
	return
}

func (cs *clientStream) RecvMsg(m interface{}) error {
	if cs.binlog != nil && !cs.serverHeaderBinlogged {
		// Call Header() to binary log header if it's not already logged.
		cs.Header()
	}
	var recvInfo *payloadInfo
	if cs.binlog != nil {
		recvInfo = &payloadInfo{}
	}
	err := cs.withRetry(func(a *csAttempt) error {
		return a.recvMsg(m, recvInfo)
	}, cs.commitAttemptLocked)
	if cs.binlog != nil && err == nil {
		cs.binlog.Log(&binarylog.ServerMessage{
			OnClientSide: true,
			Message:      recvInfo.uncompressedBytes,
		})
	}
	if err != nil || !cs.desc.ServerStreams {
		// err != nil or non-server-streaming indicates end of stream.
		cs.finish(err)

		if cs.binlog != nil {
			// finish will not log Trailer. Log Trailer here.
			logEntry := &binarylog.ServerTrailer{
				OnClientSide: true,
				Trailer:      cs.Trailer(),
				Err:          err,
			}
			if logEntry.Err == io.EOF {
				logEntry.Err = nil
			}
			if peer, ok := peer.FromContext(cs.Context()); ok {
				logEntry.PeerAddr = peer.Addr
			}
			cs.binlog.Log(logEntry)
		}
	}
	return err
}

func (cs *clientStream) CloseSend() error {
	if cs.sentLast {
		// TODO: return an error and finish the stream instead, due to API misuse?
		return nil
	}
	cs.sentLast = true
	op := func(a *csAttempt) error {
		a.t.Write(a.s, nil, nil, &transport.Options{Last: true})
		// Always return nil; io.EOF is the only error that might make sense
		// instead, but there is no need to signal the client to call RecvMsg
		// as the only use left for the stream after CloseSend is to call
		// RecvMsg. This also matches historical behavior.
		return nil
	}
	cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) })
	if cs.binlog != nil {
		cs.binlog.Log(&binarylog.ClientHalfClose{
			OnClientSide: true,
		})
	}
	// We never returned an error here for reasons.
	return nil
}

func (cs *clientStream) finish(err error) {
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	cs.mu.Lock()
	if cs.finished {
		cs.mu.Unlock()
		return
	}
	cs.finished = true
	cs.commitAttemptLocked()
	cs.mu.Unlock()
	// For binary logging, only log cancel in finish (it could be caused by the
	// RPC ctx being canceled or the ClientConn being closed). Trailer will be
	// logged in RecvMsg.
	//
	// Only one of cancel or trailer needs to be logged. In the cases where
	// users don't call RecvMsg, users must have already canceled the RPC.
	if cs.binlog != nil && status.Code(err) == codes.Canceled {
		cs.binlog.Log(&binarylog.Cancel{
			OnClientSide: true,
		})
	}
	if err == nil {
		cs.retryThrottler.successfulRPC()
	}
	if channelz.IsOn() {
		if err != nil {
			cs.cc.incrCallsFailed()
		} else {
			cs.cc.incrCallsSucceeded()
		}
	}
	if cs.attempt != nil {
		cs.attempt.finish(err)
	}
	// after functions all rely upon having a stream.
	if cs.attempt.s != nil {
		for _, o := range cs.opts {
			o.after(cs.callInfo)
		}
	}
	cs.cancel()
}

func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
	cs := a.cs
	if EnableTracing {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
		}
		a.mu.Unlock()
	}
	if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
		if !cs.desc.ClientStreams {
			// For non-client-streaming RPCs, we return nil instead of EOF on error
			// because the generated code requires it. finish is not called; RecvMsg()
			// will call it with the stream's status independently.
			return nil
		}
		return io.EOF
	}
	if a.statsHandler != nil {
		a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now()))
	}
	if channelz.IsOn() {
		a.t.IncrMsgSent()
	}
	return nil
}

func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
	cs := a.cs
	if a.statsHandler != nil && payInfo == nil {
		payInfo = &payloadInfo{}
	}

	if !a.decompSet {
		// Block until we receive headers containing received message encoding.
		if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity {
			if a.dc == nil || a.dc.Type() != ct {
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
				a.dc = nil
				a.decomp = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			a.dc = nil
		}
		// Only initialize this state once per stream.
		a.decompSet = true
	}
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
	if err != nil {
		if err == io.EOF {
			if statusErr := a.s.Status().Err(); statusErr != nil {
				return statusErr
			}
			return io.EOF // indicates successful end of stream.
		}
		return toRPCErr(err)
	}
	if EnableTracing {
		a.mu.Lock()
		if a.trInfo.tr != nil {
			a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
		}
		a.mu.Unlock()
	}
	if a.statsHandler != nil {
		a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{
			Client:   true,
			RecvTime: time.Now(),
			Payload:  m,
			// TODO truncate large payload.
			Data:   payInfo.uncompressedBytes,
			Length: len(payInfo.uncompressedBytes),
		})
	}
	if channelz.IsOn() {
		a.t.IncrMsgRecv()
	}
	if cs.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}
	// Special handling for non-server-stream rpcs.
	// This recv expects EOF or errors, so we don't collect inPayload.
	err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp)
	if err == nil {
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
	}
	if err == io.EOF {
		return a.s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr(err)
}

func (a *csAttempt) finish(err error) {
	a.mu.Lock()
	if a.finished {
		a.mu.Unlock()
		return
	}
	a.finished = true
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	if a.s != nil {
		a.t.CloseStream(a.s, err)
	}

	if a.done != nil {
		br := false
		var tr metadata.MD
		if a.s != nil {
			br = a.s.BytesReceived()
			tr = a.s.Trailer()
		}
		a.done(balancer.DoneInfo{
			Err:           err,
			Trailer:       tr,
			BytesSent:     a.s != nil,
			BytesReceived: br,
		})
	}
	if a.statsHandler != nil {
		end := &stats.End{
			Client:    true,
			BeginTime: a.cs.beginTime,
			EndTime:   time.Now(),
			Error:     err,
		}
		a.statsHandler.HandleRPC(a.cs.ctx, end)
	}
	if a.trInfo.tr != nil {
		if err == nil {
			a.trInfo.tr.LazyPrintf("RPC: [OK]")
		} else {
			a.trInfo.tr.LazyPrintf("RPC: [%v]", err)
			a.trInfo.tr.SetError()
		}
		a.trInfo.tr.Finish()
		a.trInfo.tr = nil
	}
	a.mu.Unlock()
}

func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, opts ...CallOption) (_ ClientStream, err error) {
	ac.mu.Lock()
	if ac.transport != t {
		ac.mu.Unlock()
		return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use")
	}
	// transition to CONNECTING state when an attempt starts
	if ac.state != connectivity.Connecting {
		ac.updateConnectivityState(connectivity.Connecting)
		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
	}
	ac.mu.Unlock()

	if t == nil {
		// TODO: return RPC error here?
		return nil, errors.New("transport provided is nil")
	}
	// defaultCallInfo contains unnecessary info (i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct.
	c := &callInfo{}

	for _, o := range opts {
		if err := o.before(c); err != nil {
			return nil, toRPCErr(err)
		}
	}
	c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize)
	c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize)

	// Possible context leak:
	// The cancel function for the child context we create will only be called
	// when RecvMsg returns a non-nil error, if the ClientConn is closed, or if
	// an error is generated by SendMsg.
	// https://github.com/grpc/grpc-go/issues/1818.
	ctx, cancel := context.WithCancel(ctx)
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	if err := setCallInfoCodec(c); err != nil {
		return nil, err
	}

	callHdr := &transport.CallHdr{
		Host:           ac.cc.authority,
		Method:         method,
		ContentSubtype: c.contentSubtype,
	}

	// Set our outgoing compression according to the UseCompressor CallOption, if
	// set. In that case, also find the compressor from the encoding package.
	// Otherwise, use the compressor configured by the WithCompressor DialOption,
	// if set.
	var cp Compressor
	var comp encoding.Compressor
	if ct := c.compressorType; ct != "" {
		callHdr.SendCompress = ct
		if ct != encoding.Identity {
			comp = encoding.GetCompressor(ct)
			if comp == nil {
				return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
			}
		}
	} else if ac.cc.dopts.cp != nil {
		callHdr.SendCompress = ac.cc.dopts.cp.Type()
		cp = ac.cc.dopts.cp
	}
	if c.creds != nil {
		callHdr.Creds = c.creds
	}

	as := &addrConnStream{
		callHdr:  callHdr,
		ac:       ac,
		ctx:      ctx,
		cancel:   cancel,
		opts:     opts,
		callInfo: c,
		desc:     desc,
		codec:    c.codec,
		cp:       cp,
		comp:     comp,
		t:        t,
	}

	as.callInfo.stream = as
	s, err := as.t.NewStream(as.ctx, as.callHdr)
	if err != nil {
		err = toRPCErr(err)
		return nil, err
	}
	as.s = s
	as.p = &parser{r: s}
	ac.incrCallsStarted()
	if desc != unaryStreamDesc {
		// Listen on cc and stream contexts to cleanup when the user closes the
		// ClientConn or cancels the stream context. In all other cases, an error
		// should already be injected into the recv buffer by the transport, which
		// the client will eventually receive, and then we will cancel the stream's
		// context in clientStream.finish.
		go func() {
			select {
			case <-ac.ctx.Done():
				as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing"))
			case <-ctx.Done():
				as.finish(toRPCErr(ctx.Err()))
			}
		}()
	}
	return as, nil
}

type addrConnStream struct {
	s         *transport.Stream
	ac        *addrConn
	callHdr   *transport.CallHdr
	cancel    context.CancelFunc
	opts      []CallOption
	callInfo  *callInfo
	t         transport.ClientTransport
	ctx       context.Context
	sentLast  bool
	desc      *StreamDesc
	codec     baseCodec
	cp        Compressor
	comp      encoding.Compressor
	decompSet bool
	dc        Decompressor
	decomp    encoding.Compressor
	p         *parser
	done      func(balancer.DoneInfo)
	mu        sync.Mutex
	finished  bool
}

func (as *addrConnStream) Header() (metadata.MD, error) {
	m, err := as.s.Header()
	if err != nil {
		as.finish(toRPCErr(err))
	}
	return m, err
}

func (as *addrConnStream) Trailer() metadata.MD {
	return as.s.Trailer()
}

func (as *addrConnStream) CloseSend() error {
	if as.sentLast {
		// TODO: return an error and finish the stream instead, due to API misuse?
		return nil
	}
	as.sentLast = true

	as.t.Write(as.s, nil, nil, &transport.Options{Last: true})
	// Always return nil; io.EOF is the only error that might make sense
	// instead, but there is no need to signal the client to call RecvMsg
	// as the only use left for the stream after CloseSend is to call
	// RecvMsg. This also matches historical behavior.
	return nil
}

func (as *addrConnStream) Context() context.Context {
	return as.s.Context()
}

func (as *addrConnStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if err != nil && err != io.EOF {
			// Call finish on the client stream for errors generated by this SendMsg
			// call, as these indicate problems created by this client. (Transport
			// errors are converted to an io.EOF error in csAttempt.sendMsg; the real
			// error will be returned from RecvMsg eventually in that case, or be
			// retried.)
			as.finish(err)
		}
	}()
	if as.sentLast {
		return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
	}
	if !as.desc.ClientStreams {
		as.sentLast = true
	}
	data, err := encode(as.codec, m)
	if err != nil {
		return err
	}
	compData, err := compress(data, as.cp, as.comp)
	if err != nil {
		return err
	}
	hdr, payld := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payld) > *as.callInfo.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize)
	}

	if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil {
		if !as.desc.ClientStreams {
			// For non-client-streaming RPCs, we return nil instead of EOF on error
			// because the generated code requires it. finish is not called; RecvMsg()
			// will call it with the stream's status independently.
			return nil
		}
		return io.EOF
	}

	if channelz.IsOn() {
		as.t.IncrMsgSent()
	}
	return nil
}

func (as *addrConnStream) RecvMsg(m interface{}) (err error) {
	defer func() {
		if err != nil || !as.desc.ServerStreams {
			// err != nil or non-server-streaming indicates end of stream.
			as.finish(err)
		}
	}()

	if !as.decompSet {
		// Block until we receive headers containing received message encoding.
		if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity {
			if as.dc == nil || as.dc.Type() != ct {
				// No configured decompressor, or it does not match the incoming
				// message encoding; attempt to find a registered compressor that does.
				as.dc = nil
				as.decomp = encoding.GetCompressor(ct)
			}
		} else {
			// No compression is used; disable our decompressor.
			as.dc = nil
		}
		// Only initialize this state once per stream.
		as.decompSet = true
	}
	err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
	if err != nil {
		if err == io.EOF {
			if statusErr := as.s.Status().Err(); statusErr != nil {
				return statusErr
			}
			return io.EOF // indicates successful end of stream.
		}
		return toRPCErr(err)
	}

	if channelz.IsOn() {
		as.t.IncrMsgRecv()
	}
	if as.desc.ServerStreams {
		// Subsequent messages should be received by subsequent RecvMsg calls.
		return nil
	}

	// Special handling for non-server-stream rpcs.
	// This recv expects EOF or errors, so we don't collect inPayload.
	err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp)
	if err == nil {
		return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
	}
	if err == io.EOF {
		return as.s.Status().Err() // non-server streaming Recv returns nil on success
	}
	return toRPCErr(err)
}

func (as *addrConnStream) finish(err error) {
	as.mu.Lock()
	if as.finished {
		as.mu.Unlock()
		return
	}
	as.finished = true
	if err == io.EOF {
		// Ending a stream with EOF indicates a success.
		err = nil
	}
	if as.s != nil {
		as.t.CloseStream(as.s, err)
	}

	if err != nil {
		as.ac.incrCallsFailed()
	} else {
		as.ac.incrCallsSucceeded()
	}
	as.cancel()
	as.mu.Unlock()
}

// ServerStream defines the server-side behavior of a streaming RPC.
//
// All errors returned from ServerStream methods are compatible with the
// status package.
type ServerStream interface {
	// SetHeader sets the header metadata. It may be called multiple times.
	// When called multiple times, all the provided metadata will be merged.
	// All the metadata will be sent out when one of the following happens:
	//  - ServerStream.SendHeader() is called;
	//  - The first response is sent out;
	//  - An RPC status is sent out (error or success).
	SetHeader(metadata.MD) error
	// SendHeader sends the header metadata.
	// The provided md and headers set by SetHeader() will be sent.
	// It fails if called multiple times.
	SendHeader(metadata.MD) error
	// SetTrailer sets the trailer metadata which will be sent with the RPC status.
	// When called more than once, all the provided metadata will be merged.
	SetTrailer(metadata.MD)
	// Context returns the context for this stream.
	Context() context.Context
	// SendMsg sends a message. On error, SendMsg aborts the stream and the
	// error is returned directly.
	//
	// SendMsg blocks until:
	//  - There is sufficient flow control to schedule m with the transport, or
	//  - The stream is done, or
	//  - The stream breaks.
	//
	// SendMsg does not wait until the message is received by the client. An
	// untimely stream closure may result in lost messages.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not safe
	// to call SendMsg on the same stream in different goroutines.
	SendMsg(m interface{}) error
	// RecvMsg blocks until it receives a message into m or the stream is
	// done. It returns io.EOF when the client has performed a CloseSend. On
	// any non-EOF error, the stream is aborted and the error contains the
	// RPC status.
	//
	// It is safe to have a goroutine calling SendMsg and another goroutine
	// calling RecvMsg on the same stream at the same time, but it is not
	// safe to call RecvMsg on the same stream in different goroutines.
	RecvMsg(m interface{}) error
}
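
// A minimal sketch of a handler for a bidirectional-streaming method that
// echoes each request, assuming a placeholder request type; generated code
// normally wraps this pattern with typed Send/Recv helpers:
//
//	func echoHandler(srv interface{}, stream ServerStream) error {
//		for {
//			m := new(request)
//			if err := stream.RecvMsg(m); err != nil {
//				if err == io.EOF {
//					return nil // client finished sending
//				}
//				return err // aborts the RPC with err's status
//			}
//			if err := stream.SendMsg(m); err != nil {
//				return err
//			}
//		}
//	}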

// serverStream implements a server side Stream.
type serverStream struct {
	ctx   context.Context
	t     transport.ServerTransport
	s     *transport.Stream
	p     *parser
	codec baseCodec

	cp     Compressor
	dc     Decompressor
	comp   encoding.Compressor
	decomp encoding.Compressor

	maxReceiveMessageSize int
	maxSendMessageSize    int
	trInfo                *traceInfo

	statsHandler stats.Handler

	binlog *binarylog.MethodLogger
	// serverHeaderBinlogged indicates whether server header has been logged. It
	// will happen when one of the following two happens: stream.SendHeader(),
	// stream.Send().
	//
	// It's only checked in send and sendHeader, doesn't need to be
	// synchronized.
	serverHeaderBinlogged bool

	mu sync.Mutex // protects trInfo.tr after the service handler runs.
}

func (ss *serverStream) Context() context.Context {
	return ss.ctx
}

func (ss *serverStream) SetHeader(md metadata.MD) error {
	if md.Len() == 0 {
		return nil
	}
	return ss.s.SetHeader(md)
}

func (ss *serverStream) SendHeader(md metadata.MD) error {
	err := ss.t.WriteHeader(ss.s, md)
	if ss.binlog != nil && !ss.serverHeaderBinlogged {
		h, _ := ss.s.Header()
		ss.binlog.Log(&binarylog.ServerHeader{
			Header: h,
		})
		ss.serverHeaderBinlogged = true
	}
	return err
}

func (ss *serverStream) SetTrailer(md metadata.MD) {
	if md.Len() == 0 {
		return
	}
	ss.s.SetTrailer(md)
}

func (ss *serverStream) SendMsg(m interface{}) (err error) {
	defer func() {
		if ss.trInfo != nil {
			ss.mu.Lock()
			if ss.trInfo.tr != nil {
				if err == nil {
					ss.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true)
				} else {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
			}
			ss.mu.Unlock()
		}
		if err != nil && err != io.EOF {
			st, _ := status.FromError(toRPCErr(err))
			ss.t.WriteStatus(ss.s, st)
			// Non-user specified status was sent out. This should be an error
			// case (as a server side Cancel maybe).
			//
			// This is not handled specifically now. User will return a final
			// status from the service handler, we will log that error instead.
			// This behavior is similar to an interceptor.
		}
		if channelz.IsOn() && err == nil {
			ss.t.IncrMsgSent()
		}
	}()
	data, err := encode(ss.codec, m)
	if err != nil {
		return err
	}
	compData, err := compress(data, ss.cp, ss.comp)
	if err != nil {
		return err
	}
	hdr, payload := msgHeader(data, compData)
	// TODO(dfawley): should we be checking len(data) instead?
	if len(payload) > ss.maxSendMessageSize {
		return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize)
	}
	if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil {
		return toRPCErr(err)
	}
	if ss.binlog != nil {
		if !ss.serverHeaderBinlogged {
			h, _ := ss.s.Header()
			ss.binlog.Log(&binarylog.ServerHeader{
				Header: h,
			})
			ss.serverHeaderBinlogged = true
		}
		ss.binlog.Log(&binarylog.ServerMessage{
			Message: data,
		})
	}
	if ss.statsHandler != nil {
		ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now()))
	}
	return nil
}

func (ss *serverStream) RecvMsg(m interface{}) (err error) {
	defer func() {
		if ss.trInfo != nil {
			ss.mu.Lock()
			if ss.trInfo.tr != nil {
				if err == nil {
					ss.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true)
				} else if err != io.EOF {
					ss.trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
					ss.trInfo.tr.SetError()
				}
			}
			ss.mu.Unlock()
		}
		if err != nil && err != io.EOF {
			st, _ := status.FromError(toRPCErr(err))
			ss.t.WriteStatus(ss.s, st)
			// Non-user specified status was sent out. This should be an error
			// case (as a server side Cancel maybe).
			//
			// This is not handled specifically now. User will return a final
			// status from the service handler, we will log that error instead.
			// This behavior is similar to an interceptor.
		}
		if channelz.IsOn() && err == nil {
			ss.t.IncrMsgRecv()
		}
	}()
	var payInfo *payloadInfo
	if ss.statsHandler != nil || ss.binlog != nil {
		payInfo = &payloadInfo{}
	}
	if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil {
		if err == io.EOF {
			if ss.binlog != nil {
				ss.binlog.Log(&binarylog.ClientHalfClose{})
			}
			return err
		}
		if err == io.ErrUnexpectedEOF {
			err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
		}
		return toRPCErr(err)
	}
	if ss.statsHandler != nil {
		ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{
			RecvTime: time.Now(),
			Payload:  m,
			// TODO truncate large payload.
			Data:   payInfo.uncompressedBytes,
			Length: len(payInfo.uncompressedBytes),
		})
	}
	if ss.binlog != nil {
		ss.binlog.Log(&binarylog.ClientMessage{
			Message: payInfo.uncompressedBytes,
		})
	}
	return nil
}

// MethodFromServerStream returns the method string for the input stream.
// The returned string is in the format of "/service/method".
func MethodFromServerStream(stream ServerStream) (string, bool) {
	return Method(stream.Context())
}
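
// For example, a StreamServerInterceptor (sketch; the logging choice is
// illustrative) can use it to tag streaming calls:
//
//	func logging(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error {
//		if m, ok := MethodFromServerStream(ss); ok {
//			grpclog.Infof("streaming call: %s", m)
//		}
//		return handler(srv, ss)
//	}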