author | Nathan Dench <ndenc2@gmail.com> | 2019-05-24 15:16:44 +1000
committer | Nathan Dench <ndenc2@gmail.com> | 2019-05-24 15:16:44 +1000
commit | 107c1cdb09c575aa2f61d97f48d8587eb6bada4c (patch)
tree | ca7d008643efc555c388baeaf1d986e0b6b3e28c /vendor/google.golang.org/grpc/stream.go
parent | 844b5a68d8af4791755b8f0ad293cc99f5959183 (diff)
download | terraform-provider-statuscake-107c1cdb09c575aa2f61d97f48d8587eb6bada4c.tar.gz terraform-provider-statuscake-107c1cdb09c575aa2f61d97f48d8587eb6bada4c.tar.zst terraform-provider-statuscake-107c1cdb09c575aa2f61d97f48d8587eb6bada4c.zip
Upgrade to 0.12
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r-- | vendor/google.golang.org/grpc/stream.go | 1539 |
1 file changed, 1182 insertions, 357 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 1c621ba..d06279a 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -19,24 +19,35 @@ | |||
19 | package grpc | 19 | package grpc |
20 | 20 | ||
21 | import ( | 21 | import ( |
22 | "bytes" | 22 | "context" |
23 | "errors" | 23 | "errors" |
24 | "io" | 24 | "io" |
25 | "math" | ||
26 | "strconv" | ||
25 | "sync" | 27 | "sync" |
26 | "time" | 28 | "time" |
27 | 29 | ||
28 | "golang.org/x/net/context" | ||
29 | "golang.org/x/net/trace" | 30 | "golang.org/x/net/trace" |
31 | "google.golang.org/grpc/balancer" | ||
30 | "google.golang.org/grpc/codes" | 32 | "google.golang.org/grpc/codes" |
33 | "google.golang.org/grpc/connectivity" | ||
34 | "google.golang.org/grpc/encoding" | ||
35 | "google.golang.org/grpc/grpclog" | ||
36 | "google.golang.org/grpc/internal/binarylog" | ||
37 | "google.golang.org/grpc/internal/channelz" | ||
38 | "google.golang.org/grpc/internal/grpcrand" | ||
39 | "google.golang.org/grpc/internal/transport" | ||
31 | "google.golang.org/grpc/metadata" | 40 | "google.golang.org/grpc/metadata" |
32 | "google.golang.org/grpc/peer" | 41 | "google.golang.org/grpc/peer" |
33 | "google.golang.org/grpc/stats" | 42 | "google.golang.org/grpc/stats" |
34 | "google.golang.org/grpc/status" | 43 | "google.golang.org/grpc/status" |
35 | "google.golang.org/grpc/transport" | ||
36 | ) | 44 | ) |
37 | 45 | ||
38 | // StreamHandler defines the handler called by gRPC server to complete the | 46 | // StreamHandler defines the handler called by gRPC server to complete the |
39 | // execution of a streaming RPC. | 47 | // execution of a streaming RPC. If a StreamHandler returns an error, it |
48 | // should be produced by the status package, or else gRPC will use | ||
49 | // codes.Unknown as the status code and err.Error() as the status message | ||
50 | // of the RPC. | ||
40 | type StreamHandler func(srv interface{}, stream ServerStream) error | 51 | type StreamHandler func(srv interface{}, stream ServerStream) error |
41 | 52 | ||
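The rewritten StreamHandler comment in the hunk above requires handlers to return errors built by the status package; otherwise the client sees codes.Unknown with err.Error() as the message. A minimal server-side sketch of that rule follows; echoHandler and the Unimplemented code are illustrative assumptions, not part of the vendored file:

```go
package server

import (
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// echoHandler matches grpc.StreamHandler. Returning a status-package error
// preserves the code on the wire; a plain errors.New value would instead
// reach the client as codes.Unknown with err.Error() as the message.
func echoHandler(srv interface{}, stream grpc.ServerStream) error {
	return status.Error(codes.Unimplemented, "echo is not implemented yet")
}

// Compile-time check that echoHandler satisfies grpc.StreamHandler.
var _ grpc.StreamHandler = echoHandler
```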
42 | // StreamDesc represents a streaming RPC service's method specification. | 53 | // StreamDesc represents a streaming RPC service's method specification. |
@@ -50,30 +61,21 @@ type StreamDesc struct { | |||
50 | } | 61 | } |
51 | 62 | ||
52 | // Stream defines the common interface a client or server stream has to satisfy. | 63 | // Stream defines the common interface a client or server stream has to satisfy. |
64 | // | ||
65 | // Deprecated: See ClientStream and ServerStream documentation instead. | ||
53 | type Stream interface { | 66 | type Stream interface { |
54 | // Context returns the context for this stream. | 67 | // Deprecated: See ClientStream and ServerStream documentation instead. |
55 | Context() context.Context | 68 | Context() context.Context |
56 | // SendMsg blocks until it sends m, the stream is done or the stream | 69 | // Deprecated: See ClientStream and ServerStream documentation instead. |
57 | // breaks. | ||
58 | // On error, it aborts the stream and returns an RPC status on client | ||
59 | // side. On server side, it simply returns the error to the caller. | ||
60 | // SendMsg is called by generated code. Also Users can call SendMsg | ||
61 | // directly when it is really needed in their use cases. | ||
62 | // It's safe to have a goroutine calling SendMsg and another goroutine calling | ||
63 | // recvMsg on the same stream at the same time. | ||
64 | // But it is not safe to call SendMsg on the same stream in different goroutines. | ||
65 | SendMsg(m interface{}) error | 70 | SendMsg(m interface{}) error |
66 | // RecvMsg blocks until it receives a message or the stream is | 71 | // Deprecated: See ClientStream and ServerStream documentation instead. |
67 | // done. On client side, it returns io.EOF when the stream is done. On | ||
68 | // any other error, it aborts the stream and returns an RPC status. On | ||
69 | // server side, it simply returns the error to the caller. | ||
70 | // It's safe to have a goroutine calling SendMsg and another goroutine calling | ||
71 | // recvMsg on the same stream at the same time. | ||
72 | // But it is not safe to call RecvMsg on the same stream in different goroutines. | ||
73 | RecvMsg(m interface{}) error | 72 | RecvMsg(m interface{}) error |
74 | } | 73 | } |
75 | 74 | ||
76 | // ClientStream defines the interface a client stream has to satisfy. | 75 | // ClientStream defines the client-side behavior of a streaming RPC. |
76 | // | ||
77 | // All errors returned from ClientStream methods are compatible with the | ||
78 | // status package. | ||
77 | type ClientStream interface { | 79 | type ClientStream interface { |
78 | // Header returns the header metadata received from the server if there | 80 | // Header returns the header metadata received from the server if there |
79 | // is any. It blocks if the metadata is not ready to read. | 81 | // is any. It blocks if the metadata is not ready to read. |
@@ -83,62 +85,147 @@ type ClientStream interface { | |||
83 | // stream.Recv has returned a non-nil error (including io.EOF). | 85 | // stream.Recv has returned a non-nil error (including io.EOF). |
84 | Trailer() metadata.MD | 86 | Trailer() metadata.MD |
85 | // CloseSend closes the send direction of the stream. It closes the stream | 87 | // CloseSend closes the send direction of the stream. It closes the stream |
86 | // when non-nil error is met. | 88 | // when non-nil error is met. It is also not safe to call CloseSend |
89 | // concurrently with SendMsg. | ||
87 | CloseSend() error | 90 | CloseSend() error |
88 | // Stream.SendMsg() may return a non-nil error when something wrong happens sending | 91 | // Context returns the context for this stream. |
89 | // the request. The returned error indicates the status of this sending, not the final | 92 | // |
90 | // status of the RPC. | 93 | // It should not be called until after Header or RecvMsg has returned. Once |
91 | // Always call Stream.RecvMsg() to get the final status if you care about the status of | 94 | // called, subsequent client-side retries are disabled. |
92 | // the RPC. | 95 | Context() context.Context |
93 | Stream | 96 | // SendMsg is generally called by generated code. On error, SendMsg aborts |
97 | // the stream. If the error was generated by the client, the status is | ||
98 | // returned directly; otherwise, io.EOF is returned and the status of | ||
99 | // the stream may be discovered using RecvMsg. | ||
100 | // | ||
101 | // SendMsg blocks until: | ||
102 | // - There is sufficient flow control to schedule m with the transport, or | ||
103 | // - The stream is done, or | ||
104 | // - The stream breaks. | ||
105 | // | ||
106 | // SendMsg does not wait until the message is received by the server. An | ||
107 | // untimely stream closure may result in lost messages. To ensure delivery, | ||
108 | // users should ensure the RPC completed successfully using RecvMsg. | ||
109 | // | ||
110 | // It is safe to have a goroutine calling SendMsg and another goroutine | ||
111 | // calling RecvMsg on the same stream at the same time, but it is not safe | ||
112 | // to call SendMsg on the same stream in different goroutines. It is also | ||
113 | // not safe to call CloseSend concurrently with SendMsg. | ||
114 | SendMsg(m interface{}) error | ||
115 | // RecvMsg blocks until it receives a message into m or the stream is | ||
116 | // done. It returns io.EOF when the stream completes successfully. On | ||
117 | // any other error, the stream is aborted and the error contains the RPC | ||
118 | // status. | ||
119 | // | ||
120 | // It is safe to have a goroutine calling SendMsg and another goroutine | ||
121 | // calling RecvMsg on the same stream at the same time, but it is not | ||
122 | // safe to call RecvMsg on the same stream in different goroutines. | ||
123 | RecvMsg(m interface{}) error | ||
94 | } | 124 | } |
95 | 125 | ||
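The ClientStream comments above define the concurrency and error contract: one goroutine may call SendMsg while another calls RecvMsg, CloseSend must not race with SendMsg, and the final status is observed through RecvMsg. The sketch below drives an already-opened bidirectional stream under those rules; pump, reqs, and newReply are hypothetical names standing in for generated protobuf code:

```go
package client

import (
	"io"

	"google.golang.org/grpc"
)

// pump drives an already-opened bidirectional ClientStream: one goroutine
// calls SendMsg, the main goroutine calls RecvMsg, and CloseSend is only
// issued after the sender stops calling SendMsg.
func pump(stream grpc.ClientStream, reqs []interface{}, newReply func() interface{}) error {
	sendErr := make(chan error, 1)
	go func() {
		for _, req := range reqs {
			if err := stream.SendMsg(req); err != nil {
				// A client-side error is returned directly; io.EOF means the
				// final status must be read via RecvMsg below.
				sendErr <- err
				return
			}
		}
		// Safe here: this goroutine has stopped calling SendMsg.
		sendErr <- stream.CloseSend()
	}()

	for {
		if err := stream.RecvMsg(newReply()); err != nil {
			serr := <-sendErr // wait for the sending goroutine to finish
			if err == io.EOF {
				// io.EOF from RecvMsg means the stream closed with an OK
				// status; surface any error the sender observed instead.
				return serr
			}
			return err
		}
	}
}
```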
96 | // NewClientStream creates a new Stream for the client side. This is called | 126 | // NewStream creates a new Stream for the client side. This is typically |
97 | // by generated code. | 127 | // called by generated code. ctx is used for the lifetime of the stream. |
98 | func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { | 128 | // |
129 | // To ensure resources are not leaked due to the stream returned, one of the following | ||
130 | // actions must be performed: | ||
131 | // | ||
132 | // 1. Call Close on the ClientConn. | ||
133 | // 2. Cancel the context provided. | ||
134 | // 3. Call RecvMsg until a non-nil error is returned. A protobuf-generated | ||
135 | // client-streaming RPC, for instance, might use the helper function | ||
136 | // CloseAndRecv (note that CloseSend does not Recv, therefore is not | ||
137 | // guaranteed to release all resources). | ||
138 | // 4. Receive a non-nil, non-io.EOF error from Header or SendMsg. | ||
139 | // | ||
140 | // If none of the above happen, a goroutine and a context will be leaked, and grpc | ||
141 | // will not call the optionally-configured stats handler with a stats.End message. | ||
142 | func (cc *ClientConn) NewStream(ctx context.Context, desc *StreamDesc, method string, opts ...CallOption) (ClientStream, error) { | ||
143 | // allow interceptor to see all applicable call options, which means those | ||
144 | // configured as defaults from dial option as well as per-call options | ||
145 | opts = combine(cc.dopts.callOptions, opts) | ||
146 | |||
99 | if cc.dopts.streamInt != nil { | 147 | if cc.dopts.streamInt != nil { |
100 | return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...) | 148 | return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...) |
101 | } | 149 | } |
102 | return newClientStream(ctx, desc, cc, method, opts...) | 150 | return newClientStream(ctx, desc, cc, method, opts...) |
103 | } | 151 | } |
104 | 152 | ||
153 | // NewClientStream is a wrapper for ClientConn.NewStream. | ||
154 | func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { | ||
155 | return cc.NewStream(ctx, desc, method, opts...) | ||
156 | } | ||
157 | |||
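The NewStream documentation above lists the ways a caller must release the stream's resources. The sketch below follows rules 2 and 3 (cancel the derived context and drain RecvMsg until it returns a non-nil error) for a server-streaming call; callServerStream and its parameters are illustrative assumptions, not names from this diff:

```go
package client

import (
	"context"
	"io"

	"google.golang.org/grpc"
)

// callServerStream opens a stream, sends one request, half-closes, and then
// drains responses until RecvMsg reports the end of the stream.
func callServerStream(ctx context.Context, cc *grpc.ClientConn, desc *grpc.StreamDesc, method string, req interface{}, newReply func() interface{}) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // rule 2: cancelling the context always releases the stream

	stream, err := cc.NewStream(ctx, desc, method)
	if err != nil {
		return err
	}
	if err := stream.SendMsg(req); err != nil {
		return err
	}
	if err := stream.CloseSend(); err != nil {
		return err
	}
	for {
		if err := stream.RecvMsg(newReply()); err != nil {
			if err == io.EOF {
				return nil // rule 3 satisfied: the stream was fully drained
			}
			return err
		}
	}
}
```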
105 | func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { | 158 | func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { |
106 | var ( | 159 | if channelz.IsOn() { |
107 | t transport.ClientTransport | 160 | cc.incrCallsStarted() |
108 | s *transport.Stream | 161 | defer func() { |
109 | put func() | 162 | if err != nil { |
110 | cancel context.CancelFunc | 163 | cc.incrCallsFailed() |
111 | ) | 164 | } |
112 | c := defaultCallInfo | 165 | }() |
166 | } | ||
167 | c := defaultCallInfo() | ||
168 | // Provide an opportunity for the first RPC to see the first service config | ||
169 | // provided by the resolver. | ||
170 | if err := cc.waitForResolvedAddrs(ctx); err != nil { | ||
171 | return nil, err | ||
172 | } | ||
113 | mc := cc.GetMethodConfig(method) | 173 | mc := cc.GetMethodConfig(method) |
114 | if mc.WaitForReady != nil { | 174 | if mc.WaitForReady != nil { |
115 | c.failFast = !*mc.WaitForReady | 175 | c.failFast = !*mc.WaitForReady |
116 | } | 176 | } |
117 | 177 | ||
118 | if mc.Timeout != nil { | 178 | // Possible context leak: |
179 | // The cancel function for the child context we create will only be called | ||
180 | // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if | ||
181 | // an error is generated by SendMsg. | ||
182 | // https://github.com/grpc/grpc-go/issues/1818. | ||
183 | var cancel context.CancelFunc | ||
184 | if mc.Timeout != nil && *mc.Timeout >= 0 { | ||
119 | ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) | 185 | ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) |
186 | } else { | ||
187 | ctx, cancel = context.WithCancel(ctx) | ||
120 | } | 188 | } |
189 | defer func() { | ||
190 | if err != nil { | ||
191 | cancel() | ||
192 | } | ||
193 | }() | ||
121 | 194 | ||
122 | opts = append(cc.dopts.callOptions, opts...) | ||
123 | for _, o := range opts { | 195 | for _, o := range opts { |
124 | if err := o.before(&c); err != nil { | 196 | if err := o.before(c); err != nil { |
125 | return nil, toRPCErr(err) | 197 | return nil, toRPCErr(err) |
126 | } | 198 | } |
127 | } | 199 | } |
128 | c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize) | 200 | c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize) |
129 | c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) | 201 | c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) |
202 | if err := setCallInfoCodec(c); err != nil { | ||
203 | return nil, err | ||
204 | } | ||
130 | 205 | ||
131 | callHdr := &transport.CallHdr{ | 206 | callHdr := &transport.CallHdr{ |
132 | Host: cc.authority, | 207 | Host: cc.authority, |
133 | Method: method, | 208 | Method: method, |
134 | // If it's not client streaming, we should already have the request to be sent, | 209 | ContentSubtype: c.contentSubtype, |
135 | // so we don't flush the header. | 210 | } |
136 | // If it's client streaming, the user may never send a request or send it any | 211 | |
137 | // time soon, so we ask the transport to flush the header. | 212 | // Set our outgoing compression according to the UseCompressor CallOption, if |
138 | Flush: desc.ClientStreams, | 213 | // set. In that case, also find the compressor from the encoding package. |
139 | } | 214 | // Otherwise, use the compressor configured by the WithCompressor DialOption, |
140 | if cc.dopts.cp != nil { | 215 | // if set. |
216 | var cp Compressor | ||
217 | var comp encoding.Compressor | ||
218 | if ct := c.compressorType; ct != "" { | ||
219 | callHdr.SendCompress = ct | ||
220 | if ct != encoding.Identity { | ||
221 | comp = encoding.GetCompressor(ct) | ||
222 | if comp == nil { | ||
223 | return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct) | ||
224 | } | ||
225 | } | ||
226 | } else if cc.dopts.cp != nil { | ||
141 | callHdr.SendCompress = cc.dopts.cp.Type() | 227 | callHdr.SendCompress = cc.dopts.cp.Type() |
228 | cp = cc.dopts.cp | ||
142 | } | 229 | } |
143 | if c.creds != nil { | 230 | if c.creds != nil { |
144 | callHdr.Creds = c.creds | 231 | callHdr.Creds = c.creds |
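The compressor selection added above prefers the per-call UseCompressor option and only falls back to the WithCompressor dial option. A minimal sketch of the per-call form, assuming the gzip compressor shipped in google.golang.org/grpc/encoding/gzip and a hypothetical helper around cc.NewStream:

```go
package client

import (
	"context"

	"google.golang.org/grpc"
	"google.golang.org/grpc/encoding/gzip" // registers the "gzip" compressor
)

// newGzipStream opens a stream whose outgoing messages are gzip-compressed.
// desc and method are placeholders for values from generated code.
func newGzipStream(ctx context.Context, cc *grpc.ClientConn, desc *grpc.StreamDesc, method string) (grpc.ClientStream, error) {
	return cc.NewStream(ctx, desc, method, grpc.UseCompressor(gzip.Name))
}
```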
@@ -152,380 +239,1019 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth | |||
152 | } | 239 | } |
153 | trInfo.tr.LazyLog(&trInfo.firstLine, false) | 240 | trInfo.tr.LazyLog(&trInfo.firstLine, false) |
154 | ctx = trace.NewContext(ctx, trInfo.tr) | 241 | ctx = trace.NewContext(ctx, trInfo.tr) |
155 | defer func() { | ||
156 | if err != nil { | ||
157 | // Need to call tr.finish() if error is returned. | ||
158 | // Because tr will not be returned to caller. | ||
159 | trInfo.tr.LazyPrintf("RPC: [%v]", err) | ||
160 | trInfo.tr.SetError() | ||
161 | trInfo.tr.Finish() | ||
162 | } | ||
163 | }() | ||
164 | } | 242 | } |
165 | ctx = newContextWithRPCInfo(ctx) | 243 | ctx = newContextWithRPCInfo(ctx, c.failFast) |
166 | sh := cc.dopts.copts.StatsHandler | 244 | sh := cc.dopts.copts.StatsHandler |
245 | var beginTime time.Time | ||
167 | if sh != nil { | 246 | if sh != nil { |
168 | ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast}) | 247 | ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast}) |
248 | beginTime = time.Now() | ||
169 | begin := &stats.Begin{ | 249 | begin := &stats.Begin{ |
170 | Client: true, | 250 | Client: true, |
171 | BeginTime: time.Now(), | 251 | BeginTime: beginTime, |
172 | FailFast: c.failFast, | 252 | FailFast: c.failFast, |
173 | } | 253 | } |
174 | sh.HandleRPC(ctx, begin) | 254 | sh.HandleRPC(ctx, begin) |
175 | defer func() { | ||
176 | if err != nil { | ||
177 | // Only handle end stats if err != nil. | ||
178 | end := &stats.End{ | ||
179 | Client: true, | ||
180 | Error: err, | ||
181 | } | ||
182 | sh.HandleRPC(ctx, end) | ||
183 | } | ||
184 | }() | ||
185 | } | 255 | } |
186 | gopts := BalancerGetOptions{ | 256 | |
187 | BlockingWait: !c.failFast, | 257 | cs := &clientStream{ |
258 | callHdr: callHdr, | ||
259 | ctx: ctx, | ||
260 | methodConfig: &mc, | ||
261 | opts: opts, | ||
262 | callInfo: c, | ||
263 | cc: cc, | ||
264 | desc: desc, | ||
265 | codec: c.codec, | ||
266 | cp: cp, | ||
267 | comp: comp, | ||
268 | cancel: cancel, | ||
269 | beginTime: beginTime, | ||
270 | firstAttempt: true, | ||
271 | } | ||
272 | if !cc.dopts.disableRetry { | ||
273 | cs.retryThrottler = cc.retryThrottler.Load().(*retryThrottler) | ||
188 | } | 274 | } |
189 | for { | 275 | cs.binlog = binarylog.GetMethodLogger(method) |
190 | t, put, err = cc.getTransport(ctx, gopts) | ||
191 | if err != nil { | ||
192 | // TODO(zhaoq): Probably revisit the error handling. | ||
193 | if _, ok := status.FromError(err); ok { | ||
194 | return nil, err | ||
195 | } | ||
196 | if err == errConnClosing || err == errConnUnavailable { | ||
197 | if c.failFast { | ||
198 | return nil, Errorf(codes.Unavailable, "%v", err) | ||
199 | } | ||
200 | continue | ||
201 | } | ||
202 | // All the other errors are treated as Internal errors. | ||
203 | return nil, Errorf(codes.Internal, "%v", err) | ||
204 | } | ||
205 | 276 | ||
206 | s, err = t.NewStream(ctx, callHdr) | 277 | cs.callInfo.stream = cs |
207 | if err != nil { | 278 | // Only this initial attempt has stats/tracing. |
208 | if _, ok := err.(transport.ConnectionError); ok && put != nil { | 279 | // TODO(dfawley): move to newAttempt when per-attempt stats are implemented. |
209 | // If error is connection error, transport was sending data on wire, | 280 | if err := cs.newAttemptLocked(sh, trInfo); err != nil { |
210 | // and we are not sure if anything has been sent on wire. | 281 | cs.finish(err) |
211 | // If error is not connection error, we are sure nothing has been sent. | 282 | return nil, err |
212 | updateRPCInfoInContext(ctx, rpcInfo{bytesSent: true, bytesReceived: false}) | 283 | } |
213 | } | 284 | |
214 | if put != nil { | 285 | op := func(a *csAttempt) error { return a.newStream() } |
215 | put() | 286 | if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil { |
216 | put = nil | 287 | cs.finish(err) |
217 | } | 288 | return nil, err |
218 | if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast { | 289 | } |
219 | continue | 290 | |
291 | if cs.binlog != nil { | ||
292 | md, _ := metadata.FromOutgoingContext(ctx) | ||
293 | logEntry := &binarylog.ClientHeader{ | ||
294 | OnClientSide: true, | ||
295 | Header: md, | ||
296 | MethodName: method, | ||
297 | Authority: cs.cc.authority, | ||
298 | } | ||
299 | if deadline, ok := ctx.Deadline(); ok { | ||
300 | logEntry.Timeout = deadline.Sub(time.Now()) | ||
301 | if logEntry.Timeout < 0 { | ||
302 | logEntry.Timeout = 0 | ||
220 | } | 303 | } |
221 | return nil, toRPCErr(err) | ||
222 | } | 304 | } |
223 | break | 305 | cs.binlog.Log(logEntry) |
224 | } | 306 | } |
225 | // Set callInfo.peer object from stream's context. | 307 | |
226 | if peer, ok := peer.FromContext(s.Context()); ok { | 308 | if desc != unaryStreamDesc { |
227 | c.peer = peer | 309 | // Listen on cc and stream contexts to cleanup when the user closes the |
310 | // ClientConn or cancels the stream context. In all other cases, an error | ||
311 | // should already be injected into the recv buffer by the transport, which | ||
312 | // the client will eventually receive, and then we will cancel the stream's | ||
313 | // context in clientStream.finish. | ||
314 | go func() { | ||
315 | select { | ||
316 | case <-cc.ctx.Done(): | ||
317 | cs.finish(ErrClientConnClosing) | ||
318 | case <-ctx.Done(): | ||
319 | cs.finish(toRPCErr(ctx.Err())) | ||
320 | } | ||
321 | }() | ||
228 | } | 322 | } |
229 | cs := &clientStream{ | ||
230 | opts: opts, | ||
231 | c: c, | ||
232 | desc: desc, | ||
233 | codec: cc.dopts.codec, | ||
234 | cp: cc.dopts.cp, | ||
235 | dc: cc.dopts.dc, | ||
236 | cancel: cancel, | ||
237 | |||
238 | put: put, | ||
239 | t: t, | ||
240 | s: s, | ||
241 | p: &parser{r: s}, | ||
242 | |||
243 | tracing: EnableTracing, | ||
244 | trInfo: trInfo, | ||
245 | |||
246 | statsCtx: ctx, | ||
247 | statsHandler: cc.dopts.copts.StatsHandler, | ||
248 | } | ||
249 | if cc.dopts.cp != nil { | ||
250 | cs.cbuf = new(bytes.Buffer) | ||
251 | } | ||
252 | // Listen on ctx.Done() to detect cancellation and s.Done() to detect normal termination | ||
253 | // when there is no pending I/O operations on this stream. | ||
254 | go func() { | ||
255 | select { | ||
256 | case <-t.Error(): | ||
257 | // Incur transport error, simply exit. | ||
258 | case <-cc.ctx.Done(): | ||
259 | cs.finish(ErrClientConnClosing) | ||
260 | cs.closeTransportStream(ErrClientConnClosing) | ||
261 | case <-s.Done(): | ||
262 | // TODO: The trace of the RPC is terminated here when there is no pending | ||
263 | // I/O, which is probably not the optimal solution. | ||
264 | cs.finish(s.Status().Err()) | ||
265 | cs.closeTransportStream(nil) | ||
266 | case <-s.GoAway(): | ||
267 | cs.finish(errConnDrain) | ||
268 | cs.closeTransportStream(errConnDrain) | ||
269 | case <-s.Context().Done(): | ||
270 | err := s.Context().Err() | ||
271 | cs.finish(err) | ||
272 | cs.closeTransportStream(transport.ContextErr(err)) | ||
273 | } | ||
274 | }() | ||
275 | return cs, nil | 323 | return cs, nil |
276 | } | 324 | } |
277 | 325 | ||
326 | func (cs *clientStream) newAttemptLocked(sh stats.Handler, trInfo traceInfo) error { | ||
327 | cs.attempt = &csAttempt{ | ||
328 | cs: cs, | ||
329 | dc: cs.cc.dopts.dc, | ||
330 | statsHandler: sh, | ||
331 | trInfo: trInfo, | ||
332 | } | ||
333 | |||
334 | if err := cs.ctx.Err(); err != nil { | ||
335 | return toRPCErr(err) | ||
336 | } | ||
337 | t, done, err := cs.cc.getTransport(cs.ctx, cs.callInfo.failFast, cs.callHdr.Method) | ||
338 | if err != nil { | ||
339 | return err | ||
340 | } | ||
341 | cs.attempt.t = t | ||
342 | cs.attempt.done = done | ||
343 | return nil | ||
344 | } | ||
345 | |||
346 | func (a *csAttempt) newStream() error { | ||
347 | cs := a.cs | ||
348 | cs.callHdr.PreviousAttempts = cs.numRetries | ||
349 | s, err := a.t.NewStream(cs.ctx, cs.callHdr) | ||
350 | if err != nil { | ||
351 | return toRPCErr(err) | ||
352 | } | ||
353 | cs.attempt.s = s | ||
354 | cs.attempt.p = &parser{r: s} | ||
355 | return nil | ||
356 | } | ||
357 | |||
278 | // clientStream implements a client side Stream. | 358 | // clientStream implements a client side Stream. |
279 | type clientStream struct { | 359 | type clientStream struct { |
280 | opts []CallOption | 360 | callHdr *transport.CallHdr |
281 | c callInfo | 361 | opts []CallOption |
282 | t transport.ClientTransport | 362 | callInfo *callInfo |
283 | s *transport.Stream | 363 | cc *ClientConn |
284 | p *parser | 364 | desc *StreamDesc |
285 | desc *StreamDesc | 365 | |
286 | codec Codec | 366 | codec baseCodec |
287 | cp Compressor | 367 | cp Compressor |
288 | cbuf *bytes.Buffer | 368 | comp encoding.Compressor |
289 | dc Decompressor | 369 | |
290 | cancel context.CancelFunc | 370 | cancel context.CancelFunc // cancels all attempts |
371 | |||
372 | sentLast bool // sent an end stream | ||
373 | beginTime time.Time | ||
374 | |||
375 | methodConfig *MethodConfig | ||
376 | |||
377 | ctx context.Context // the application's context, wrapped by stats/tracing | ||
291 | 378 | ||
292 | tracing bool // set to EnableTracing when the clientStream is created. | 379 | retryThrottler *retryThrottler // The throttler active when the RPC began. |
293 | 380 | ||
294 | mu sync.Mutex | 381 | binlog *binarylog.MethodLogger // Binary logger, can be nil. |
295 | put func() | 382 | // serverHeaderBinlogged is a boolean for whether server header has been |
296 | closed bool | 383 | // logged. Server header will be logged when the first time one of those |
297 | finished bool | 384 | // happens: stream.Header(), stream.Recv(). |
298 | // trInfo.tr is set when the clientStream is created (if EnableTracing is true), | 385 | // |
299 | // and is set to nil when the clientStream's finish method is called. | 386 | // It's only read and used by Recv() and Header(), so it doesn't need to be |
387 | // synchronized. | ||
388 | serverHeaderBinlogged bool | ||
389 | |||
390 | mu sync.Mutex | ||
391 | firstAttempt bool // if true, transparent retry is valid | ||
392 | numRetries int // exclusive of transparent retry attempt(s) | ||
393 | numRetriesSincePushback int // retries since pushback; to reset backoff | ||
394 | finished bool // TODO: replace with atomic cmpxchg or sync.Once? | ||
395 | attempt *csAttempt // the active client stream attempt | ||
396 | // TODO(hedging): hedging will have multiple attempts simultaneously. | ||
397 | committed bool // active attempt committed for retry? | ||
398 | buffer []func(a *csAttempt) error // operations to replay on retry | ||
399 | bufferSize int // current size of buffer | ||
400 | } | ||
401 | |||
402 | // csAttempt implements a single transport stream attempt within a | ||
403 | // clientStream. | ||
404 | type csAttempt struct { | ||
405 | cs *clientStream | ||
406 | t transport.ClientTransport | ||
407 | s *transport.Stream | ||
408 | p *parser | ||
409 | done func(balancer.DoneInfo) | ||
410 | |||
411 | finished bool | ||
412 | dc Decompressor | ||
413 | decomp encoding.Compressor | ||
414 | decompSet bool | ||
415 | |||
416 | mu sync.Mutex // guards trInfo.tr | ||
417 | // trInfo.tr is set when created (if EnableTracing is true), | ||
418 | // and cleared when the finish method is called. | ||
300 | trInfo traceInfo | 419 | trInfo traceInfo |
301 | 420 | ||
302 | // statsCtx keeps the user context for stats handling. | ||
303 | // All stats collection should use the statsCtx (instead of the stream context) | ||
304 | // so that all the generated stats for a particular RPC can be associated in the processing phase. | ||
305 | statsCtx context.Context | ||
306 | statsHandler stats.Handler | 421 | statsHandler stats.Handler |
307 | } | 422 | } |
308 | 423 | ||
424 | func (cs *clientStream) commitAttemptLocked() { | ||
425 | cs.committed = true | ||
426 | cs.buffer = nil | ||
427 | } | ||
428 | |||
429 | func (cs *clientStream) commitAttempt() { | ||
430 | cs.mu.Lock() | ||
431 | cs.commitAttemptLocked() | ||
432 | cs.mu.Unlock() | ||
433 | } | ||
434 | |||
435 | // shouldRetry returns nil if the RPC should be retried; otherwise it returns | ||
436 | // the error that should be returned by the operation. | ||
437 | func (cs *clientStream) shouldRetry(err error) error { | ||
438 | if cs.attempt.s == nil && !cs.callInfo.failFast { | ||
439 | // In the event of any error from NewStream (attempt.s == nil), we | ||
440 | // never attempted to write anything to the wire, so we can retry | ||
441 | // indefinitely for non-fail-fast RPCs. | ||
442 | return nil | ||
443 | } | ||
444 | if cs.finished || cs.committed { | ||
445 | // RPC is finished or committed; cannot retry. | ||
446 | return err | ||
447 | } | ||
448 | // Wait for the trailers. | ||
449 | if cs.attempt.s != nil { | ||
450 | <-cs.attempt.s.Done() | ||
451 | } | ||
452 | if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) { | ||
453 | // First attempt, wait-for-ready, stream unprocessed: transparently retry. | ||
454 | cs.firstAttempt = false | ||
455 | return nil | ||
456 | } | ||
457 | cs.firstAttempt = false | ||
458 | if cs.cc.dopts.disableRetry { | ||
459 | return err | ||
460 | } | ||
461 | |||
462 | pushback := 0 | ||
463 | hasPushback := false | ||
464 | if cs.attempt.s != nil { | ||
465 | if to, toErr := cs.attempt.s.TrailersOnly(); toErr != nil || !to { | ||
466 | return err | ||
467 | } | ||
468 | |||
469 | // TODO(retry): Move down if the spec changes to not check server pushback | ||
470 | // before considering this a failure for throttling. | ||
471 | sps := cs.attempt.s.Trailer()["grpc-retry-pushback-ms"] | ||
472 | if len(sps) == 1 { | ||
473 | var e error | ||
474 | if pushback, e = strconv.Atoi(sps[0]); e != nil || pushback < 0 { | ||
475 | grpclog.Infof("Server retry pushback specified to abort (%q).", sps[0]) | ||
476 | cs.retryThrottler.throttle() // This counts as a failure for throttling. | ||
477 | return err | ||
478 | } | ||
479 | hasPushback = true | ||
480 | } else if len(sps) > 1 { | ||
481 | grpclog.Warningf("Server retry pushback specified multiple values (%q); not retrying.", sps) | ||
482 | cs.retryThrottler.throttle() // This counts as a failure for throttling. | ||
483 | return err | ||
484 | } | ||
485 | } | ||
486 | |||
487 | var code codes.Code | ||
488 | if cs.attempt.s != nil { | ||
489 | code = cs.attempt.s.Status().Code() | ||
490 | } else { | ||
491 | code = status.Convert(err).Code() | ||
492 | } | ||
493 | |||
494 | rp := cs.methodConfig.retryPolicy | ||
495 | if rp == nil || !rp.retryableStatusCodes[code] { | ||
496 | return err | ||
497 | } | ||
498 | |||
499 | // Note: the ordering here is important; we count this as a failure | ||
500 | // only if the code matched a retryable code. | ||
501 | if cs.retryThrottler.throttle() { | ||
502 | return err | ||
503 | } | ||
504 | if cs.numRetries+1 >= rp.maxAttempts { | ||
505 | return err | ||
506 | } | ||
507 | |||
508 | var dur time.Duration | ||
509 | if hasPushback { | ||
510 | dur = time.Millisecond * time.Duration(pushback) | ||
511 | cs.numRetriesSincePushback = 0 | ||
512 | } else { | ||
513 | fact := math.Pow(rp.backoffMultiplier, float64(cs.numRetriesSincePushback)) | ||
514 | cur := float64(rp.initialBackoff) * fact | ||
515 | if max := float64(rp.maxBackoff); cur > max { | ||
516 | cur = max | ||
517 | } | ||
518 | dur = time.Duration(grpcrand.Int63n(int64(cur))) | ||
519 | cs.numRetriesSincePushback++ | ||
520 | } | ||
521 | |||
522 | // TODO(dfawley): we could eagerly fail here if dur puts us past the | ||
523 | // deadline, but unsure if it is worth doing. | ||
524 | t := time.NewTimer(dur) | ||
525 | select { | ||
526 | case <-t.C: | ||
527 | cs.numRetries++ | ||
528 | return nil | ||
529 | case <-cs.ctx.Done(): | ||
530 | t.Stop() | ||
531 | return status.FromContextError(cs.ctx.Err()).Err() | ||
532 | } | ||
533 | } | ||
534 | |||
535 | // Returns nil if a retry was performed and succeeded; error otherwise. | ||
536 | func (cs *clientStream) retryLocked(lastErr error) error { | ||
537 | for { | ||
538 | cs.attempt.finish(lastErr) | ||
539 | if err := cs.shouldRetry(lastErr); err != nil { | ||
540 | cs.commitAttemptLocked() | ||
541 | return err | ||
542 | } | ||
543 | if err := cs.newAttemptLocked(nil, traceInfo{}); err != nil { | ||
544 | return err | ||
545 | } | ||
546 | if lastErr = cs.replayBufferLocked(); lastErr == nil { | ||
547 | return nil | ||
548 | } | ||
549 | } | ||
550 | } | ||
551 | |||
309 | func (cs *clientStream) Context() context.Context { | 552 | func (cs *clientStream) Context() context.Context { |
310 | return cs.s.Context() | 553 | cs.commitAttempt() |
554 | // No need to lock before using attempt, since we know it is committed and | ||
555 | // cannot change. | ||
556 | return cs.attempt.s.Context() | ||
557 | } | ||
558 | |||
559 | func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error { | ||
560 | cs.mu.Lock() | ||
561 | for { | ||
562 | if cs.committed { | ||
563 | cs.mu.Unlock() | ||
564 | return op(cs.attempt) | ||
565 | } | ||
566 | a := cs.attempt | ||
567 | cs.mu.Unlock() | ||
568 | err := op(a) | ||
569 | cs.mu.Lock() | ||
570 | if a != cs.attempt { | ||
571 | // We started another attempt already. | ||
572 | continue | ||
573 | } | ||
574 | if err == io.EOF { | ||
575 | <-a.s.Done() | ||
576 | } | ||
577 | if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) { | ||
578 | onSuccess() | ||
579 | cs.mu.Unlock() | ||
580 | return err | ||
581 | } | ||
582 | if err := cs.retryLocked(err); err != nil { | ||
583 | cs.mu.Unlock() | ||
584 | return err | ||
585 | } | ||
586 | } | ||
311 | } | 587 | } |
312 | 588 | ||
313 | func (cs *clientStream) Header() (metadata.MD, error) { | 589 | func (cs *clientStream) Header() (metadata.MD, error) { |
314 | m, err := cs.s.Header() | 590 | var m metadata.MD |
591 | err := cs.withRetry(func(a *csAttempt) error { | ||
592 | var err error | ||
593 | m, err = a.s.Header() | ||
594 | return toRPCErr(err) | ||
595 | }, cs.commitAttemptLocked) | ||
315 | if err != nil { | 596 | if err != nil { |
316 | if _, ok := err.(transport.ConnectionError); !ok { | 597 | cs.finish(err) |
317 | cs.closeTransportStream(err) | 598 | return nil, err |
599 | } | ||
600 | if cs.binlog != nil && !cs.serverHeaderBinlogged { | ||
601 | // Only log if binary log is on and header has not been logged. | ||
602 | logEntry := &binarylog.ServerHeader{ | ||
603 | OnClientSide: true, | ||
604 | Header: m, | ||
605 | PeerAddr: nil, | ||
318 | } | 606 | } |
607 | if peer, ok := peer.FromContext(cs.Context()); ok { | ||
608 | logEntry.PeerAddr = peer.Addr | ||
609 | } | ||
610 | cs.binlog.Log(logEntry) | ||
611 | cs.serverHeaderBinlogged = true | ||
319 | } | 612 | } |
320 | return m, err | 613 | return m, err |
321 | } | 614 | } |
322 | 615 | ||
323 | func (cs *clientStream) Trailer() metadata.MD { | 616 | func (cs *clientStream) Trailer() metadata.MD { |
324 | return cs.s.Trailer() | 617 | // On RPC failure, we never need to retry, because usage requires that |
618 | // RecvMsg() returned a non-nil error before calling this function is valid. | ||
619 | // We would have retried earlier if necessary. | ||
620 | // | ||
621 | // Commit the attempt anyway, just in case users are not following those | ||
622 | // directions -- it will prevent races and should not meaningfully impact | ||
623 | // performance. | ||
624 | cs.commitAttempt() | ||
625 | if cs.attempt.s == nil { | ||
626 | return nil | ||
627 | } | ||
628 | return cs.attempt.s.Trailer() | ||
325 | } | 629 | } |
326 | 630 | ||
327 | func (cs *clientStream) SendMsg(m interface{}) (err error) { | 631 | func (cs *clientStream) replayBufferLocked() error { |
328 | if cs.tracing { | 632 | a := cs.attempt |
329 | cs.mu.Lock() | 633 | for _, f := range cs.buffer { |
330 | if cs.trInfo.tr != nil { | 634 | if err := f(a); err != nil { |
331 | cs.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) | 635 | return err |
332 | } | 636 | } |
333 | cs.mu.Unlock() | ||
334 | } | 637 | } |
335 | // TODO Investigate how to signal the stats handling party. | 638 | return nil |
336 | // generate error stats if err != nil && err != io.EOF? | 639 | } |
640 | |||
641 | func (cs *clientStream) bufferForRetryLocked(sz int, op func(a *csAttempt) error) { | ||
642 | // Note: we still will buffer if retry is disabled (for transparent retries). | ||
643 | if cs.committed { | ||
644 | return | ||
645 | } | ||
646 | cs.bufferSize += sz | ||
647 | if cs.bufferSize > cs.callInfo.maxRetryRPCBufferSize { | ||
648 | cs.commitAttemptLocked() | ||
649 | return | ||
650 | } | ||
651 | cs.buffer = append(cs.buffer, op) | ||
652 | } | ||
653 | |||
654 | func (cs *clientStream) SendMsg(m interface{}) (err error) { | ||
337 | defer func() { | 655 | defer func() { |
338 | if err != nil { | 656 | if err != nil && err != io.EOF { |
657 | // Call finish on the client stream for errors generated by this SendMsg | ||
658 | // call, as these indicate problems created by this client. (Transport | ||
659 | // errors are converted to an io.EOF error in csAttempt.sendMsg; the real | ||
660 | // error will be returned from RecvMsg eventually in that case, or be | ||
661 | // retried.) | ||
339 | cs.finish(err) | 662 | cs.finish(err) |
340 | } | 663 | } |
341 | if err == nil { | ||
342 | return | ||
343 | } | ||
344 | if err == io.EOF { | ||
345 | // Specialize the process for server streaming. SendMesg is only called | ||
346 | // once when creating the stream object. io.EOF needs to be skipped when | ||
347 | // the rpc is early finished (before the stream object is created.). | ||
348 | // TODO: It is probably better to move this into the generated code. | ||
349 | if !cs.desc.ClientStreams && cs.desc.ServerStreams { | ||
350 | err = nil | ||
351 | } | ||
352 | return | ||
353 | } | ||
354 | if _, ok := err.(transport.ConnectionError); !ok { | ||
355 | cs.closeTransportStream(err) | ||
356 | } | ||
357 | err = toRPCErr(err) | ||
358 | }() | 664 | }() |
359 | var outPayload *stats.OutPayload | 665 | if cs.sentLast { |
360 | if cs.statsHandler != nil { | 666 | return status.Errorf(codes.Internal, "SendMsg called after CloseSend") |
361 | outPayload = &stats.OutPayload{ | ||
362 | Client: true, | ||
363 | } | ||
364 | } | 667 | } |
365 | out, err := encode(cs.codec, m, cs.cp, cs.cbuf, outPayload) | 668 | if !cs.desc.ClientStreams { |
366 | defer func() { | 669 | cs.sentLast = true |
367 | if cs.cbuf != nil { | 670 | } |
368 | cs.cbuf.Reset() | 671 | data, err := encode(cs.codec, m) |
369 | } | ||
370 | }() | ||
371 | if err != nil { | 672 | if err != nil { |
372 | return err | 673 | return err |
373 | } | 674 | } |
374 | if cs.c.maxSendMessageSize == nil { | 675 | compData, err := compress(data, cs.cp, cs.comp) |
375 | return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") | 676 | if err != nil { |
677 | return err | ||
376 | } | 678 | } |
377 | if len(out) > *cs.c.maxSendMessageSize { | 679 | hdr, payload := msgHeader(data, compData) |
378 | return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(out), *cs.c.maxSendMessageSize) | 680 | // TODO(dfawley): should we be checking len(data) instead? |
681 | if len(payload) > *cs.callInfo.maxSendMessageSize { | ||
682 | return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), *cs.callInfo.maxSendMessageSize) | ||
379 | } | 683 | } |
380 | err = cs.t.Write(cs.s, out, &transport.Options{Last: false}) | 684 | msgBytes := data // Store the pointer before setting to nil. For binary logging. |
381 | if err == nil && outPayload != nil { | 685 | op := func(a *csAttempt) error { |
382 | outPayload.SentTime = time.Now() | 686 | err := a.sendMsg(m, hdr, payload, data) |
383 | cs.statsHandler.HandleRPC(cs.statsCtx, outPayload) | 687 | // nil out the message and uncomp when replaying; they are only needed for |
688 | // stats which is disabled for subsequent attempts. | ||
689 | m, data = nil, nil | ||
690 | return err | ||
384 | } | 691 | } |
385 | return err | 692 | err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) }) |
693 | if cs.binlog != nil && err == nil { | ||
694 | cs.binlog.Log(&binarylog.ClientMessage{ | ||
695 | OnClientSide: true, | ||
696 | Message: msgBytes, | ||
697 | }) | ||
698 | } | ||
699 | return | ||
386 | } | 700 | } |
387 | 701 | ||
388 | func (cs *clientStream) RecvMsg(m interface{}) (err error) { | 702 | func (cs *clientStream) RecvMsg(m interface{}) error { |
389 | var inPayload *stats.InPayload | 703 | if cs.binlog != nil && !cs.serverHeaderBinlogged { |
390 | if cs.statsHandler != nil { | 704 | // Call Header() to binary log header if it's not already logged. |
391 | inPayload = &stats.InPayload{ | 705 | cs.Header() |
392 | Client: true, | ||
393 | } | ||
394 | } | 706 | } |
395 | if cs.c.maxReceiveMessageSize == nil { | 707 | var recvInfo *payloadInfo |
396 | return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") | 708 | if cs.binlog != nil { |
709 | recvInfo = &payloadInfo{} | ||
397 | } | 710 | } |
398 | err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, inPayload) | 711 | err := cs.withRetry(func(a *csAttempt) error { |
399 | defer func() { | 712 | return a.recvMsg(m, recvInfo) |
400 | // err != nil indicates the termination of the stream. | 713 | }, cs.commitAttemptLocked) |
401 | if err != nil { | 714 | if cs.binlog != nil && err == nil { |
402 | cs.finish(err) | 715 | cs.binlog.Log(&binarylog.ServerMessage{ |
716 | OnClientSide: true, | ||
717 | Message: recvInfo.uncompressedBytes, | ||
718 | }) | ||
719 | } | ||
720 | if err != nil || !cs.desc.ServerStreams { | ||
721 | // err != nil or non-server-streaming indicates end of stream. | ||
722 | cs.finish(err) | ||
723 | |||
724 | if cs.binlog != nil { | ||
725 | // finish will not log Trailer. Log Trailer here. | ||
726 | logEntry := &binarylog.ServerTrailer{ | ||
727 | OnClientSide: true, | ||
728 | Trailer: cs.Trailer(), | ||
729 | Err: err, | ||
730 | } | ||
731 | if logEntry.Err == io.EOF { | ||
732 | logEntry.Err = nil | ||
733 | } | ||
734 | if peer, ok := peer.FromContext(cs.Context()); ok { | ||
735 | logEntry.PeerAddr = peer.Addr | ||
736 | } | ||
737 | cs.binlog.Log(logEntry) | ||
403 | } | 738 | } |
404 | }() | 739 | } |
740 | return err | ||
741 | } | ||
742 | |||
743 | func (cs *clientStream) CloseSend() error { | ||
744 | if cs.sentLast { | ||
745 | // TODO: return an error and finish the stream instead, due to API misuse? | ||
746 | return nil | ||
747 | } | ||
748 | cs.sentLast = true | ||
749 | op := func(a *csAttempt) error { | ||
750 | a.t.Write(a.s, nil, nil, &transport.Options{Last: true}) | ||
751 | // Always return nil; io.EOF is the only error that might make sense | ||
752 | // instead, but there is no need to signal the client to call RecvMsg | ||
753 | // as the only use left for the stream after CloseSend is to call | ||
754 | // RecvMsg. This also matches historical behavior. | ||
755 | return nil | ||
756 | } | ||
757 | cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }) | ||
758 | if cs.binlog != nil { | ||
759 | cs.binlog.Log(&binarylog.ClientHalfClose{ | ||
760 | OnClientSide: true, | ||
761 | }) | ||
762 | } | ||
763 | // We never returned an error here for reasons. | ||
764 | return nil | ||
765 | } | ||
766 | |||
767 | func (cs *clientStream) finish(err error) { | ||
768 | if err == io.EOF { | ||
769 | // Ending a stream with EOF indicates a success. | ||
770 | err = nil | ||
771 | } | ||
772 | cs.mu.Lock() | ||
773 | if cs.finished { | ||
774 | cs.mu.Unlock() | ||
775 | return | ||
776 | } | ||
777 | cs.finished = true | ||
778 | cs.commitAttemptLocked() | ||
779 | cs.mu.Unlock() | ||
780 | // For binary logging. only log cancel in finish (could be caused by RPC ctx | ||
781 | // canceled or ClientConn closed). Trailer will be logged in RecvMsg. | ||
782 | // | ||
783 | // Only one of cancel or trailer needs to be logged. In the cases where | ||
784 | // users don't call RecvMsg, users must have already canceled the RPC. | ||
785 | if cs.binlog != nil && status.Code(err) == codes.Canceled { | ||
786 | cs.binlog.Log(&binarylog.Cancel{ | ||
787 | OnClientSide: true, | ||
788 | }) | ||
789 | } | ||
405 | if err == nil { | 790 | if err == nil { |
406 | if cs.tracing { | 791 | cs.retryThrottler.successfulRPC() |
407 | cs.mu.Lock() | 792 | } |
408 | if cs.trInfo.tr != nil { | 793 | if channelz.IsOn() { |
409 | cs.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) | 794 | if err != nil { |
410 | } | 795 | cs.cc.incrCallsFailed() |
411 | cs.mu.Unlock() | 796 | } else { |
797 | cs.cc.incrCallsSucceeded() | ||
412 | } | 798 | } |
413 | if inPayload != nil { | 799 | } |
414 | cs.statsHandler.HandleRPC(cs.statsCtx, inPayload) | 800 | if cs.attempt != nil { |
801 | cs.attempt.finish(err) | ||
802 | } | ||
803 | // after functions all rely upon having a stream. | ||
804 | if cs.attempt.s != nil { | ||
805 | for _, o := range cs.opts { | ||
806 | o.after(cs.callInfo) | ||
415 | } | 807 | } |
416 | if !cs.desc.ClientStreams || cs.desc.ServerStreams { | 808 | } |
417 | return | 809 | cs.cancel() |
810 | } | ||
811 | |||
812 | func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error { | ||
813 | cs := a.cs | ||
814 | if EnableTracing { | ||
815 | a.mu.Lock() | ||
816 | if a.trInfo.tr != nil { | ||
817 | a.trInfo.tr.LazyLog(&payload{sent: true, msg: m}, true) | ||
418 | } | 818 | } |
419 | // Special handling for client streaming rpc. | 819 | a.mu.Unlock() |
420 | // This recv expects EOF or errors, so we don't collect inPayload. | 820 | } |
421 | if cs.c.maxReceiveMessageSize == nil { | 821 | if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil { |
422 | return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") | 822 | if !cs.desc.ClientStreams { |
823 | // For non-client-streaming RPCs, we return nil instead of EOF on error | ||
824 | // because the generated code requires it. finish is not called; RecvMsg() | ||
825 | // will call it with the stream's status independently. | ||
826 | return nil | ||
423 | } | 827 | } |
424 | err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, nil) | 828 | return io.EOF |
425 | cs.closeTransportStream(err) | 829 | } |
426 | if err == nil { | 830 | if a.statsHandler != nil { |
427 | return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) | 831 | a.statsHandler.HandleRPC(cs.ctx, outPayload(true, m, data, payld, time.Now())) |
832 | } | ||
833 | if channelz.IsOn() { | ||
834 | a.t.IncrMsgSent() | ||
835 | } | ||
836 | return nil | ||
837 | } | ||
838 | |||
839 | func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) { | ||
840 | cs := a.cs | ||
841 | if a.statsHandler != nil && payInfo == nil { | ||
842 | payInfo = &payloadInfo{} | ||
843 | } | ||
844 | |||
845 | if !a.decompSet { | ||
846 | // Block until we receive headers containing received message encoding. | ||
847 | if ct := a.s.RecvCompress(); ct != "" && ct != encoding.Identity { | ||
848 | if a.dc == nil || a.dc.Type() != ct { | ||
849 | // No configured decompressor, or it does not match the incoming | ||
850 | // message encoding; attempt to find a registered compressor that does. | ||
851 | a.dc = nil | ||
852 | a.decomp = encoding.GetCompressor(ct) | ||
853 | } | ||
854 | } else { | ||
855 | // No compression is used; disable our decompressor. | ||
856 | a.dc = nil | ||
428 | } | 857 | } |
858 | // Only initialize this state once per stream. | ||
859 | a.decompSet = true | ||
860 | } | ||
861 | err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp) | ||
862 | if err != nil { | ||
429 | if err == io.EOF { | 863 | if err == io.EOF { |
430 | if se := cs.s.Status().Err(); se != nil { | 864 | if statusErr := a.s.Status().Err(); statusErr != nil { |
431 | return se | 865 | return statusErr |
432 | } | 866 | } |
433 | cs.finish(err) | 867 | return io.EOF // indicates successful end of stream. |
434 | return nil | ||
435 | } | 868 | } |
436 | return toRPCErr(err) | 869 | return toRPCErr(err) |
437 | } | 870 | } |
438 | if _, ok := err.(transport.ConnectionError); !ok { | 871 | if EnableTracing { |
439 | cs.closeTransportStream(err) | 872 | a.mu.Lock() |
873 | if a.trInfo.tr != nil { | ||
874 | a.trInfo.tr.LazyLog(&payload{sent: false, msg: m}, true) | ||
875 | } | ||
876 | a.mu.Unlock() | ||
877 | } | ||
878 | if a.statsHandler != nil { | ||
879 | a.statsHandler.HandleRPC(cs.ctx, &stats.InPayload{ | ||
880 | Client: true, | ||
881 | RecvTime: time.Now(), | ||
882 | Payload: m, | ||
883 | // TODO truncate large payload. | ||
884 | Data: payInfo.uncompressedBytes, | ||
885 | Length: len(payInfo.uncompressedBytes), | ||
886 | }) | ||
887 | } | ||
888 | if channelz.IsOn() { | ||
889 | a.t.IncrMsgRecv() | ||
890 | } | ||
891 | if cs.desc.ServerStreams { | ||
892 | // Subsequent messages should be received by subsequent RecvMsg calls. | ||
893 | return nil | ||
894 | } | ||
895 | // Special handling for non-server-stream rpcs. | ||
896 | // This recv expects EOF or errors, so we don't collect inPayload. | ||
897 | err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, nil, a.decomp) | ||
898 | if err == nil { | ||
899 | return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) | ||
440 | } | 900 | } |
441 | if err == io.EOF { | 901 | if err == io.EOF { |
442 | if statusErr := cs.s.Status().Err(); statusErr != nil { | 902 | return a.s.Status().Err() // non-server streaming Recv returns nil on success |
443 | return statusErr | ||
444 | } | ||
445 | // Returns io.EOF to indicate the end of the stream. | ||
446 | return | ||
447 | } | 903 | } |
448 | return toRPCErr(err) | 904 | return toRPCErr(err) |
449 | } | 905 | } |
450 | 906 | ||
451 | func (cs *clientStream) CloseSend() (err error) { | 907 | func (a *csAttempt) finish(err error) { |
452 | err = cs.t.Write(cs.s, nil, &transport.Options{Last: true}) | 908 | a.mu.Lock() |
909 | if a.finished { | ||
910 | a.mu.Unlock() | ||
911 | return | ||
912 | } | ||
913 | a.finished = true | ||
914 | if err == io.EOF { | ||
915 | // Ending a stream with EOF indicates a success. | ||
916 | err = nil | ||
917 | } | ||
918 | if a.s != nil { | ||
919 | a.t.CloseStream(a.s, err) | ||
920 | } | ||
921 | |||
922 | if a.done != nil { | ||
923 | br := false | ||
924 | var tr metadata.MD | ||
925 | if a.s != nil { | ||
926 | br = a.s.BytesReceived() | ||
927 | tr = a.s.Trailer() | ||
928 | } | ||
929 | a.done(balancer.DoneInfo{ | ||
930 | Err: err, | ||
931 | Trailer: tr, | ||
932 | BytesSent: a.s != nil, | ||
933 | BytesReceived: br, | ||
934 | }) | ||
935 | } | ||
936 | if a.statsHandler != nil { | ||
937 | end := &stats.End{ | ||
938 | Client: true, | ||
939 | BeginTime: a.cs.beginTime, | ||
940 | EndTime: time.Now(), | ||
941 | Error: err, | ||
942 | } | ||
943 | a.statsHandler.HandleRPC(a.cs.ctx, end) | ||
944 | } | ||
945 | if a.trInfo.tr != nil { | ||
946 | if err == nil { | ||
947 | a.trInfo.tr.LazyPrintf("RPC: [OK]") | ||
948 | } else { | ||
949 | a.trInfo.tr.LazyPrintf("RPC: [%v]", err) | ||
950 | a.trInfo.tr.SetError() | ||
951 | } | ||
952 | a.trInfo.tr.Finish() | ||
953 | a.trInfo.tr = nil | ||
954 | } | ||
955 | a.mu.Unlock() | ||
956 | } | ||
957 | |||
958 | func (ac *addrConn) newClientStream(ctx context.Context, desc *StreamDesc, method string, t transport.ClientTransport, opts ...CallOption) (_ ClientStream, err error) { | ||
959 | ac.mu.Lock() | ||
960 | if ac.transport != t { | ||
961 | ac.mu.Unlock() | ||
962 | return nil, status.Error(codes.Canceled, "the provided transport is no longer valid to use") | ||
963 | } | ||
964 | // transition to CONNECTING state when an attempt starts | ||
965 | if ac.state != connectivity.Connecting { | ||
966 | ac.updateConnectivityState(connectivity.Connecting) | ||
967 | ac.cc.handleSubConnStateChange(ac.acbw, ac.state) | ||
968 | } | ||
969 | ac.mu.Unlock() | ||
970 | |||
971 | if t == nil { | ||
972 | // TODO: return RPC error here? | ||
973 | return nil, errors.New("transport provided is nil") | ||
974 | } | ||
975 | // defaultCallInfo contains unnecessary info(i.e. failfast, maxRetryRPCBufferSize), so we just initialize an empty struct. | ||
976 | c := &callInfo{} | ||
977 | |||
978 | for _, o := range opts { | ||
979 | if err := o.before(c); err != nil { | ||
980 | return nil, toRPCErr(err) | ||
981 | } | ||
982 | } | ||
983 | c.maxReceiveMessageSize = getMaxSize(nil, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) | ||
984 | c.maxSendMessageSize = getMaxSize(nil, c.maxSendMessageSize, defaultServerMaxSendMessageSize) | ||
985 | |||
986 | // Possible context leak: | ||
987 | // The cancel function for the child context we create will only be called | ||
988 | // when RecvMsg returns a non-nil error, if the ClientConn is closed, or if | ||
989 | // an error is generated by SendMsg. | ||
990 | // https://github.com/grpc/grpc-go/issues/1818. | ||
991 | ctx, cancel := context.WithCancel(ctx) | ||
453 | defer func() { | 992 | defer func() { |
454 | if err != nil { | 993 | if err != nil { |
455 | cs.finish(err) | 994 | cancel() |
456 | } | 995 | } |
457 | }() | 996 | }() |
458 | if err == nil || err == io.EOF { | 997 | |
459 | return nil | 998 | if err := setCallInfoCodec(c); err != nil { |
999 | return nil, err | ||
460 | } | 1000 | } |
461 | if _, ok := err.(transport.ConnectionError); !ok { | 1001 | |
462 | cs.closeTransportStream(err) | 1002 | callHdr := &transport.CallHdr{ |
1003 | Host: ac.cc.authority, | ||
1004 | Method: method, | ||
1005 | ContentSubtype: c.contentSubtype, | ||
463 | } | 1006 | } |
464 | err = toRPCErr(err) | 1007 | |
465 | return | 1008 | // Set our outgoing compression according to the UseCompressor CallOption, if |
1009 | // set. In that case, also find the compressor from the encoding package. | ||
1010 | // Otherwise, use the compressor configured by the WithCompressor DialOption, | ||
1011 | // if set. | ||
1012 | var cp Compressor | ||
1013 | var comp encoding.Compressor | ||
1014 | if ct := c.compressorType; ct != "" { | ||
1015 | callHdr.SendCompress = ct | ||
1016 | if ct != encoding.Identity { | ||
1017 | comp = encoding.GetCompressor(ct) | ||
1018 | if comp == nil { | ||
1019 | return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct) | ||
1020 | } | ||
1021 | } | ||
1022 | } else if ac.cc.dopts.cp != nil { | ||
1023 | callHdr.SendCompress = ac.cc.dopts.cp.Type() | ||
1024 | cp = ac.cc.dopts.cp | ||
1025 | } | ||
1026 | if c.creds != nil { | ||
1027 | callHdr.Creds = c.creds | ||
1028 | } | ||
1029 | |||
1030 | as := &addrConnStream{ | ||
1031 | callHdr: callHdr, | ||
1032 | ac: ac, | ||
1033 | ctx: ctx, | ||
1034 | cancel: cancel, | ||
1035 | opts: opts, | ||
1036 | callInfo: c, | ||
1037 | desc: desc, | ||
1038 | codec: c.codec, | ||
1039 | cp: cp, | ||
1040 | comp: comp, | ||
1041 | t: t, | ||
1042 | } | ||
1043 | |||
1044 | as.callInfo.stream = as | ||
1045 | s, err := as.t.NewStream(as.ctx, as.callHdr) | ||
1046 | if err != nil { | ||
1047 | err = toRPCErr(err) | ||
1048 | return nil, err | ||
1049 | } | ||
1050 | as.s = s | ||
1051 | as.p = &parser{r: s} | ||
1052 | ac.incrCallsStarted() | ||
1053 | if desc != unaryStreamDesc { | ||
1054 | // Listen on cc and stream contexts to cleanup when the user closes the | ||
1055 | // ClientConn or cancels the stream context. In all other cases, an error | ||
1056 | // should already be injected into the recv buffer by the transport, which | ||
1057 | // the client will eventually receive, and then we will cancel the stream's | ||
1058 | // context in clientStream.finish. | ||
1059 | go func() { | ||
1060 | select { | ||
1061 | case <-ac.ctx.Done(): | ||
1062 | as.finish(status.Error(codes.Canceled, "grpc: the SubConn is closing")) | ||
1063 | case <-ctx.Done(): | ||
1064 | as.finish(toRPCErr(ctx.Err())) | ||
1065 | } | ||
1066 | }() | ||
1067 | } | ||
1068 | return as, nil | ||
466 | } | 1069 | } |
467 | 1070 | ||
468 | func (cs *clientStream) closeTransportStream(err error) { | 1071 | type addrConnStream struct { |
469 | cs.mu.Lock() | 1072 | s *transport.Stream |
470 | if cs.closed { | 1073 | ac *addrConn |
471 | cs.mu.Unlock() | 1074 | callHdr *transport.CallHdr |
472 | return | 1075 | cancel context.CancelFunc |
1076 | opts []CallOption | ||
1077 | callInfo *callInfo | ||
1078 | t transport.ClientTransport | ||
1079 | ctx context.Context | ||
1080 | sentLast bool | ||
1081 | desc *StreamDesc | ||
1082 | codec baseCodec | ||
1083 | cp Compressor | ||
1084 | comp encoding.Compressor | ||
1085 | decompSet bool | ||
1086 | dc Decompressor | ||
1087 | decomp encoding.Compressor | ||
1088 | p *parser | ||
1089 | done func(balancer.DoneInfo) | ||
1090 | mu sync.Mutex | ||
1091 | finished bool | ||
1092 | } | ||
1093 | |||
1094 | func (as *addrConnStream) Header() (metadata.MD, error) { | ||
1095 | m, err := as.s.Header() | ||
1096 | if err != nil { | ||
1097 | as.finish(toRPCErr(err)) | ||
473 | } | 1098 | } |
474 | cs.closed = true | 1099 | return m, err |
475 | cs.mu.Unlock() | ||
476 | cs.t.CloseStream(cs.s, err) | ||
477 | } | 1100 | } |
478 | 1101 | ||
479 | func (cs *clientStream) finish(err error) { | 1102 | func (as *addrConnStream) Trailer() metadata.MD { |
480 | cs.mu.Lock() | 1103 | return as.s.Trailer() |
481 | defer cs.mu.Unlock() | 1104 | } |
482 | if cs.finished { | 1105 | |
483 | return | 1106 | func (as *addrConnStream) CloseSend() error { |
1107 | if as.sentLast { | ||
1108 | // TODO: return an error and finish the stream instead, due to API misuse? | ||
1109 | return nil | ||
484 | } | 1110 | } |
485 | cs.finished = true | 1111 | as.sentLast = true |
1112 | |||
1113 | as.t.Write(as.s, nil, nil, &transport.Options{Last: true}) | ||
1114 | // Always return nil; io.EOF is the only error that might make sense | ||
1115 | // instead, but there is no need to signal the client to call RecvMsg | ||
1116 | // as the only use left for the stream after CloseSend is to call | ||
1117 | // RecvMsg. This also matches historical behavior. | ||
1118 | return nil | ||
1119 | } | ||
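A hedged usage sketch (not part of the vendored file) of how generated client-streaming stubs drive the two calls involved here, assuming only the public grpc.ClientStream interface:

	import "google.golang.org/grpc"

	// closeAndRecv mirrors what a generated CloseAndRecv helper does: half-close
	// the send side with CloseSend, then obtain the single reply (or the final
	// RPC status) through RecvMsg.
	func closeAndRecv(stream grpc.ClientStream, reply interface{}) error {
		if err := stream.CloseSend(); err != nil {
			return err // per the implementation above this is currently always nil
		}
		return stream.RecvMsg(reply)
	}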
1120 | |||
1121 | func (as *addrConnStream) Context() context.Context { | ||
1122 | return as.s.Context() | ||
1123 | } | ||
1124 | |||
1125 | func (as *addrConnStream) SendMsg(m interface{}) (err error) { | ||
486 | defer func() { | 1126 | defer func() { |
487 | if cs.cancel != nil { | 1127 | if err != nil && err != io.EOF { |
488 | cs.cancel() | 1128 | // Call finish on the client stream for errors generated by this SendMsg |
1129 | // call, as these indicate problems created by this client. (Transport | ||
1130 | // errors are converted to an io.EOF error in csAttempt.sendMsg; the real | ||
1131 | // error will be returned from RecvMsg eventually in that case, or be | ||
1132 | // retried.) | ||
1133 | as.finish(err) | ||
489 | } | 1134 | } |
490 | }() | 1135 | }() |
491 | for _, o := range cs.opts { | 1136 | if as.sentLast { |
492 | o.after(&cs.c) | 1137 | return status.Errorf(codes.Internal, "SendMsg called after CloseSend") |
493 | } | 1138 | } |
494 | if cs.put != nil { | 1139 | if !as.desc.ClientStreams { |
495 | updateRPCInfoInContext(cs.s.Context(), rpcInfo{ | 1140 | as.sentLast = true |
496 | bytesSent: cs.s.BytesSent(), | ||
497 | bytesReceived: cs.s.BytesReceived(), | ||
498 | }) | ||
499 | cs.put() | ||
500 | cs.put = nil | ||
501 | } | 1141 | } |
502 | if cs.statsHandler != nil { | 1142 | data, err := encode(as.codec, m) |
503 | end := &stats.End{ | 1143 | if err != nil { |
504 | Client: true, | 1144 | return err |
505 | EndTime: time.Now(), | 1145 | } |
506 | } | 1146 | compData, err := compress(data, as.cp, as.comp) |
507 | if err != io.EOF { | 1147 | if err != nil { |
508 | // end.Error is nil if the RPC finished successfully. | 1148 | return err |
509 | end.Error = toRPCErr(err) | 1149 | } |
1150 | hdr, payld := msgHeader(data, compData) | ||
1151 | // TODO(dfawley): should we be checking len(data) instead? | ||
1152 | if len(payld) > *as.callInfo.maxSendMessageSize { | ||
1153 | return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payld), *as.callInfo.maxSendMessageSize) | ||
1154 | } | ||
1155 | |||
1156 | if err := as.t.Write(as.s, hdr, payld, &transport.Options{Last: !as.desc.ClientStreams}); err != nil { | ||
1157 | if !as.desc.ClientStreams { | ||
1158 | // For non-client-streaming RPCs, we return nil instead of EOF on error | ||
1159 | // because the generated code requires it. finish is not called; RecvMsg() | ||
1160 | // will call it with the stream's status independently. | ||
1161 | return nil | ||
510 | } | 1162 | } |
511 | cs.statsHandler.HandleRPC(cs.statsCtx, end) | 1163 | return io.EOF |
512 | } | 1164 | } |
513 | if !cs.tracing { | 1165 | |
514 | return | 1166 | if channelz.IsOn() { |
1167 | as.t.IncrMsgSent() | ||
515 | } | 1168 | } |
516 | if cs.trInfo.tr != nil { | 1169 | return nil |
517 | if err == nil || err == io.EOF { | 1170 | } |
518 | cs.trInfo.tr.LazyPrintf("RPC: [OK]") | 1171 | |
1172 | func (as *addrConnStream) RecvMsg(m interface{}) (err error) { | ||
1173 | defer func() { | ||
1174 | if err != nil || !as.desc.ServerStreams { | ||
1175 | // err != nil or non-server-streaming indicates end of stream. | ||
1176 | as.finish(err) | ||
1177 | } | ||
1178 | }() | ||
1179 | |||
1180 | if !as.decompSet { | ||
1181 | // Block until we receive headers containing received message encoding. | ||
1182 | if ct := as.s.RecvCompress(); ct != "" && ct != encoding.Identity { | ||
1183 | if as.dc == nil || as.dc.Type() != ct { | ||
1184 | // No configured decompressor, or it does not match the incoming | ||
1185 | // message encoding; attempt to find a registered compressor that does. | ||
1186 | as.dc = nil | ||
1187 | as.decomp = encoding.GetCompressor(ct) | ||
1188 | } | ||
519 | } else { | 1189 | } else { |
520 | cs.trInfo.tr.LazyPrintf("RPC: [%v]", err) | 1190 | // No compression is used; disable our decompressor. |
521 | cs.trInfo.tr.SetError() | 1191 | as.dc = nil |
522 | } | 1192 | } |
523 | cs.trInfo.tr.Finish() | 1193 | // Only initialize this state once per stream. |
524 | cs.trInfo.tr = nil | 1194 | as.decompSet = true |
1195 | } | ||
1196 | err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp) | ||
1197 | if err != nil { | ||
1198 | if err == io.EOF { | ||
1199 | if statusErr := as.s.Status().Err(); statusErr != nil { | ||
1200 | return statusErr | ||
1201 | } | ||
1202 | return io.EOF // indicates successful end of stream. | ||
1203 | } | ||
1204 | return toRPCErr(err) | ||
1205 | } | ||
1206 | |||
1207 | if channelz.IsOn() { | ||
1208 | as.t.IncrMsgRecv() | ||
525 | } | 1209 | } |
1210 | if as.desc.ServerStreams { | ||
1211 | // Subsequent messages should be received by subsequent RecvMsg calls. | ||
1212 | return nil | ||
1213 | } | ||
1214 | |||
1215 | // Special handling for non-server-stream rpcs. | ||
1216 | // This recv expects EOF or errors, so we don't collect inPayload. | ||
1217 | err = recv(as.p, as.codec, as.s, as.dc, m, *as.callInfo.maxReceiveMessageSize, nil, as.decomp) | ||
1218 | if err == nil { | ||
1219 | return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) | ||
1220 | } | ||
1221 | if err == io.EOF { | ||
1222 | return as.s.Status().Err() // non-server streaming Recv returns nil on success | ||
1223 | } | ||
1224 | return toRPCErr(err) | ||
526 | } | 1225 | } |
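The client-side contract implemented above (repeated RecvMsg calls, io.EOF on a clean end of stream, status errors otherwise) looks roughly like this from a caller's point of view. A sketch under stated assumptions: newMsg and handle are placeholders for whatever generated message type the stream carries.

	import (
		"io"

		"google.golang.org/grpc"
	)

	// drainServerStream reads a server-streaming RPC to completion: io.EOF marks
	// a successful end of stream, any other error already carries the RPC status.
	func drainServerStream(stream grpc.ClientStream, newMsg func() interface{}, handle func(interface{})) error {
		for {
			m := newMsg()
			if err := stream.RecvMsg(m); err != nil {
				if err == io.EOF {
					return nil // successful end of the stream
				}
				return err
			}
			handle(m)
		}
	}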
527 | 1226 | ||
528 | // ServerStream defines the interface a server stream has to satisfy. | 1227 | func (as *addrConnStream) finish(err error) { |
1228 | as.mu.Lock() | ||
1229 | if as.finished { | ||
1230 | as.mu.Unlock() | ||
1231 | return | ||
1232 | } | ||
1233 | as.finished = true | ||
1234 | if err == io.EOF { | ||
1235 | // Ending a stream with EOF indicates a success. | ||
1236 | err = nil | ||
1237 | } | ||
1238 | if as.s != nil { | ||
1239 | as.t.CloseStream(as.s, err) | ||
1240 | } | ||
1241 | |||
1242 | if err != nil { | ||
1243 | as.ac.incrCallsFailed() | ||
1244 | } else { | ||
1245 | as.ac.incrCallsSucceeded() | ||
1246 | } | ||
1247 | as.cancel() | ||
1248 | as.mu.Unlock() | ||
1249 | } | ||
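The channelz.IsOn() checks and the incrCalls* counters above only do work once channelz collection is enabled. A minimal sketch of the usual way to enable and expose it, assuming the channelz service package shipped with this grpc-go version (its init turns collection on):

	import (
		"google.golang.org/grpc"
		channelzservice "google.golang.org/grpc/channelz/service"
	)

	// newServerWithChannelz registers the channelz query service on a server;
	// once the package is linked in, the per-call and per-message counters
	// incremented above become visible through that service.
	func newServerWithChannelz() *grpc.Server {
		s := grpc.NewServer()
		channelzservice.RegisterChannelzServiceToServer(s)
		return s
	}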
1250 | |||
1251 | // ServerStream defines the server-side behavior of a streaming RPC. | ||
1252 | // | ||
1253 | // All errors returned from ServerStream methods are compatible with the | ||
1254 | // status package. | ||
529 | type ServerStream interface { | 1255 | type ServerStream interface { |
530 | // SetHeader sets the header metadata. It may be called multiple times. | 1256 | // SetHeader sets the header metadata. It may be called multiple times. |
531 | // When called multiple times, all the provided metadata will be merged. | 1257 | // When called multiple times, all the provided metadata will be merged. |
@@ -541,29 +1267,67 @@ type ServerStream interface { | |||
541 | // SetTrailer sets the trailer metadata which will be sent with the RPC status. | 1267 | // SetTrailer sets the trailer metadata which will be sent with the RPC status. |
542 | // When called more than once, all the provided metadata will be merged. | 1268 | // When called more than once, all the provided metadata will be merged. |
543 | SetTrailer(metadata.MD) | 1269 | SetTrailer(metadata.MD) |
544 | Stream | 1270 | // Context returns the context for this stream. |
1271 | Context() context.Context | ||
1272 | // SendMsg sends a message. On error, SendMsg aborts the stream and the | ||
1273 | // error is returned directly. | ||
1274 | // | ||
1275 | // SendMsg blocks until: | ||
1276 | // - There is sufficient flow control to schedule m with the transport, or | ||
1277 | // - The stream is done, or | ||
1278 | // - The stream breaks. | ||
1279 | // | ||
1280 | // SendMsg does not wait until the message is received by the client. An | ||
1281 | // untimely stream closure may result in lost messages. | ||
1282 | // | ||
1283 | // It is safe to have a goroutine calling SendMsg and another goroutine | ||
1284 | // calling RecvMsg on the same stream at the same time, but it is not safe | ||
1285 | // to call SendMsg on the same stream in different goroutines. | ||
1286 | SendMsg(m interface{}) error | ||
1287 | // RecvMsg blocks until it receives a message into m or the stream is | ||
1288 | // done. It returns io.EOF when the client has performed a CloseSend. On | ||
1289 | // any non-EOF error, the stream is aborted and the error contains the | ||
1290 | // RPC status. | ||
1291 | // | ||
1292 | // It is safe to have a goroutine calling SendMsg and another goroutine | ||
1293 | // calling RecvMsg on the same stream at the same time, but it is not | ||
1294 | // safe to call RecvMsg on the same stream in different goroutines. | ||
1295 | RecvMsg(m interface{}) error | ||
545 | } | 1296 | } |
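A handler-side sketch of the contract documented above (one goroutine may call SendMsg while another calls RecvMsg, and io.EOF signals the client's CloseSend). Not part of the vendored file; newMsg stands in for allocating whatever generated message type the stream carries:

	import (
		"io"

		"google.golang.org/grpc"
	)

	// echoStream loops on RecvMsg until the client half-closes, echoing each
	// message back with SendMsg; returning nil ends the RPC with an OK status,
	// returning any other error aborts the stream with that status.
	func echoStream(ss grpc.ServerStream, newMsg func() interface{}) error {
		for {
			m := newMsg()
			if err := ss.RecvMsg(m); err != nil {
				if err == io.EOF {
					return nil // client finished sending
				}
				return err
			}
			if err := ss.SendMsg(m); err != nil {
				return err
			}
		}
	}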
546 | 1297 | ||
547 | // serverStream implements a server side Stream. | 1298 | // serverStream implements a server side Stream. |
548 | type serverStream struct { | 1299 | type serverStream struct { |
549 | t transport.ServerTransport | 1300 | ctx context.Context |
550 | s *transport.Stream | 1301 | t transport.ServerTransport |
551 | p *parser | 1302 | s *transport.Stream |
552 | codec Codec | 1303 | p *parser |
553 | cp Compressor | 1304 | codec baseCodec |
554 | dc Decompressor | 1305 | |
555 | cbuf *bytes.Buffer | 1306 | cp Compressor |
1307 | dc Decompressor | ||
1308 | comp encoding.Compressor | ||
1309 | decomp encoding.Compressor | ||
1310 | |||
556 | maxReceiveMessageSize int | 1311 | maxReceiveMessageSize int |
557 | maxSendMessageSize int | 1312 | maxSendMessageSize int |
558 | trInfo *traceInfo | 1313 | trInfo *traceInfo |
559 | 1314 | ||
560 | statsHandler stats.Handler | 1315 | statsHandler stats.Handler |
561 | 1316 | ||
1317 | binlog *binarylog.MethodLogger | ||
1318 | // serverHeaderBinlogged indicates whether server header has been logged. It | ||
1319 | // will happen when one of the following two happens: stream.SendHeader(), | ||
1320 | // stream.Send(). | ||
1321 | // | ||
1322 | // It's only checked in send and sendHeader, doesn't need to be | ||
1323 | // synchronized. | ||
1324 | serverHeaderBinlogged bool | ||
1325 | |||
562 | mu sync.Mutex // protects trInfo.tr after the service handler runs. | 1326 | mu sync.Mutex // protects trInfo.tr after the service handler runs. |
563 | } | 1327 | } |
564 | 1328 | ||
565 | func (ss *serverStream) Context() context.Context { | 1329 | func (ss *serverStream) Context() context.Context { |
566 | return ss.s.Context() | 1330 | return ss.ctx |
567 | } | 1331 | } |
568 | 1332 | ||
569 | func (ss *serverStream) SetHeader(md metadata.MD) error { | 1333 | func (ss *serverStream) SetHeader(md metadata.MD) error { |
@@ -574,7 +1338,15 @@ func (ss *serverStream) SetHeader(md metadata.MD) error { | |||
574 | } | 1338 | } |
575 | 1339 | ||
576 | func (ss *serverStream) SendHeader(md metadata.MD) error { | 1340 | func (ss *serverStream) SendHeader(md metadata.MD) error { |
577 | return ss.t.WriteHeader(ss.s, md) | 1341 | err := ss.t.WriteHeader(ss.s, md) |
1342 | if ss.binlog != nil && !ss.serverHeaderBinlogged { | ||
1343 | h, _ := ss.s.Header() | ||
1344 | ss.binlog.Log(&binarylog.ServerHeader{ | ||
1345 | Header: h, | ||
1346 | }) | ||
1347 | ss.serverHeaderBinlogged = true | ||
1348 | } | ||
1349 | return err | ||
578 | } | 1350 | } |
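From a handler, the header and trailer paths above are driven through the ServerStream methods. A hedged sketch (metadata keys are arbitrary examples): SetHeader buffers metadata, SendHeader flushes it (and, with binary logging enabled, emits the ServerHeader log entry exactly once), and SetTrailer is sent with the final status.

	import (
		"google.golang.org/grpc"
		"google.golang.org/grpc/metadata"
	)

	func sendMetadata(ss grpc.ServerStream) error {
		if err := ss.SetHeader(metadata.Pairs("x-request-stage", "accepted")); err != nil {
			return err
		}
		if err := ss.SendHeader(metadata.Pairs("x-served-by", "example")); err != nil {
			return err
		}
		ss.SetTrailer(metadata.Pairs("x-result", "ok")) // delivered with the RPC status
		return nil
	}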
579 | 1351 | ||
580 | func (ss *serverStream) SetTrailer(md metadata.MD) { | 1352 | func (ss *serverStream) SetTrailer(md metadata.MD) { |
@@ -582,7 +1354,6 @@ func (ss *serverStream) SetTrailer(md metadata.MD) { | |||
582 | return | 1354 | return |
583 | } | 1355 | } |
584 | ss.s.SetTrailer(md) | 1356 | ss.s.SetTrailer(md) |
585 | return | ||
586 | } | 1357 | } |
587 | 1358 | ||
588 | func (ss *serverStream) SendMsg(m interface{}) (err error) { | 1359 | func (ss *serverStream) SendMsg(m interface{}) (err error) { |
@@ -599,29 +1370,50 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { | |||
599 | } | 1370 | } |
600 | ss.mu.Unlock() | 1371 | ss.mu.Unlock() |
601 | } | 1372 | } |
602 | }() | 1373 | if err != nil && err != io.EOF { |
603 | var outPayload *stats.OutPayload | 1374 | st, _ := status.FromError(toRPCErr(err)) |
604 | if ss.statsHandler != nil { | 1375 | ss.t.WriteStatus(ss.s, st) |
605 | outPayload = &stats.OutPayload{} | 1376 | // Non-user specified status was sent out. This should be an error |
606 | } | 1377 | // case (as a server side Cancel maybe). |
607 | out, err := encode(ss.codec, m, ss.cp, ss.cbuf, outPayload) | 1378 | // |
608 | defer func() { | 1379 | // This is not handled specifically now. User will return a final |
609 | if ss.cbuf != nil { | 1380 | // status from the service handler, we will log that error instead. |
610 | ss.cbuf.Reset() | 1381 | // This behavior is similar to an interceptor. |
1382 | } | ||
1383 | if channelz.IsOn() && err == nil { | ||
1384 | ss.t.IncrMsgSent() | ||
611 | } | 1385 | } |
612 | }() | 1386 | }() |
1387 | data, err := encode(ss.codec, m) | ||
1388 | if err != nil { | ||
1389 | return err | ||
1390 | } | ||
1391 | compData, err := compress(data, ss.cp, ss.comp) | ||
613 | if err != nil { | 1392 | if err != nil { |
614 | return err | 1393 | return err |
615 | } | 1394 | } |
616 | if len(out) > ss.maxSendMessageSize { | 1395 | hdr, payload := msgHeader(data, compData) |
617 | return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(out), ss.maxSendMessageSize) | 1396 | // TODO(dfawley): should we be checking len(data) instead? |
1397 | if len(payload) > ss.maxSendMessageSize { | ||
1398 | return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(payload), ss.maxSendMessageSize) | ||
618 | } | 1399 | } |
619 | if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil { | 1400 | if err := ss.t.Write(ss.s, hdr, payload, &transport.Options{Last: false}); err != nil { |
620 | return toRPCErr(err) | 1401 | return toRPCErr(err) |
621 | } | 1402 | } |
622 | if outPayload != nil { | 1403 | if ss.binlog != nil { |
623 | outPayload.SentTime = time.Now() | 1404 | if !ss.serverHeaderBinlogged { |
624 | ss.statsHandler.HandleRPC(ss.s.Context(), outPayload) | 1405 | h, _ := ss.s.Header() |
1406 | ss.binlog.Log(&binarylog.ServerHeader{ | ||
1407 | Header: h, | ||
1408 | }) | ||
1409 | ss.serverHeaderBinlogged = true | ||
1410 | } | ||
1411 | ss.binlog.Log(&binarylog.ServerMessage{ | ||
1412 | Message: data, | ||
1413 | }) | ||
1414 | } | ||
1415 | if ss.statsHandler != nil { | ||
1416 | ss.statsHandler.HandleRPC(ss.s.Context(), outPayload(false, m, data, payload, time.Now())) | ||
625 | } | 1417 | } |
626 | return nil | 1418 | return nil |
627 | } | 1419 | } |
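The maxSendMessageSize and maxReceiveMessageSize fields checked in SendMsg and RecvMsg come from the server options. A minimal sketch, with arbitrary example limits:

	import "google.golang.org/grpc"

	// newLimitedServer overrides the default message size limits; the receive
	// limit is enforced in serverStream.RecvMsg via recv(), the send limit in
	// serverStream.SendMsg above.
	func newLimitedServer() *grpc.Server {
		return grpc.NewServer(
			grpc.MaxRecvMsgSize(4*1024*1024),  // 4 MiB inbound
			grpc.MaxSendMsgSize(16*1024*1024), // 16 MiB outbound
		)
	}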
@@ -640,22 +1432,55 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) { | |||
640 | } | 1432 | } |
641 | ss.mu.Unlock() | 1433 | ss.mu.Unlock() |
642 | } | 1434 | } |
1435 | if err != nil && err != io.EOF { | ||
1436 | st, _ := status.FromError(toRPCErr(err)) | ||
1437 | ss.t.WriteStatus(ss.s, st) | ||
1438 | // Non-user specified status was sent out. This should be an error | ||
1439 | // case (as a server side Cancel maybe). | ||
1440 | // | ||
1441 | // This is not handled specifically now. User will return a final | ||
1442 | // status from the service handler, we will log that error instead. | ||
1443 | // This behavior is similar to an interceptor. | ||
1444 | } | ||
1445 | if channelz.IsOn() && err == nil { | ||
1446 | ss.t.IncrMsgRecv() | ||
1447 | } | ||
643 | }() | 1448 | }() |
644 | var inPayload *stats.InPayload | 1449 | var payInfo *payloadInfo |
645 | if ss.statsHandler != nil { | 1450 | if ss.statsHandler != nil || ss.binlog != nil { |
646 | inPayload = &stats.InPayload{} | 1451 | payInfo = &payloadInfo{} |
647 | } | 1452 | } |
648 | if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, inPayload); err != nil { | 1453 | if err := recv(ss.p, ss.codec, ss.s, ss.dc, m, ss.maxReceiveMessageSize, payInfo, ss.decomp); err != nil { |
649 | if err == io.EOF { | 1454 | if err == io.EOF { |
1455 | if ss.binlog != nil { | ||
1456 | ss.binlog.Log(&binarylog.ClientHalfClose{}) | ||
1457 | } | ||
650 | return err | 1458 | return err |
651 | } | 1459 | } |
652 | if err == io.ErrUnexpectedEOF { | 1460 | if err == io.ErrUnexpectedEOF { |
653 | err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) | 1461 | err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) |
654 | } | 1462 | } |
655 | return toRPCErr(err) | 1463 | return toRPCErr(err) |
656 | } | 1464 | } |
657 | if inPayload != nil { | 1465 | if ss.statsHandler != nil { |
658 | ss.statsHandler.HandleRPC(ss.s.Context(), inPayload) | 1466 | ss.statsHandler.HandleRPC(ss.s.Context(), &stats.InPayload{ |
1467 | RecvTime: time.Now(), | ||
1468 | Payload: m, | ||
1469 | // TODO truncate large payload. | ||
1470 | Data: payInfo.uncompressedBytes, | ||
1471 | Length: len(payInfo.uncompressedBytes), | ||
1472 | }) | ||
1473 | } | ||
1474 | if ss.binlog != nil { | ||
1475 | ss.binlog.Log(&binarylog.ClientMessage{ | ||
1476 | Message: payInfo.uncompressedBytes, | ||
1477 | }) | ||
659 | } | 1478 | } |
660 | return nil | 1479 | return nil |
661 | } | 1480 | } |
1481 | |||
1482 | // MethodFromServerStream returns the method string for the input stream. | ||
1483 | // The returned string is in the format of "/service/method". | ||
1484 | func MethodFromServerStream(stream ServerStream) (string, bool) { | ||
1485 | return Method(stream.Context()) | ||
1486 | } | ||
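A short usage sketch for the helper above, assuming only that a grpc.ServerStream is in hand (for example inside a shared handler utility or a wrapped stream):

	import (
		"log"

		"google.golang.org/grpc"
	)

	// logMethod recovers the "/service/method" string from the stream context;
	// ok is false if the context does not carry a method name.
	func logMethod(ss grpc.ServerStream) {
		if method, ok := grpc.MethodFromServerStream(ss); ok {
			log.Printf("handling %s", method)
		}
	}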