diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/call.go')
-rw-r--r-- | vendor/google.golang.org/grpc/call.go | 309 |
1 files changed, 37 insertions, 272 deletions
diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go index 797190f..100f05d 100644 --- a/vendor/google.golang.org/grpc/call.go +++ b/vendor/google.golang.org/grpc/call.go | |||
@@ -19,291 +19,56 @@ | |||
19 | package grpc | 19 | package grpc |
20 | 20 | ||
21 | import ( | 21 | import ( |
22 | "bytes" | 22 | "context" |
23 | "io" | ||
24 | "time" | ||
25 | |||
26 | "golang.org/x/net/context" | ||
27 | "golang.org/x/net/trace" | ||
28 | "google.golang.org/grpc/codes" | ||
29 | "google.golang.org/grpc/peer" | ||
30 | "google.golang.org/grpc/stats" | ||
31 | "google.golang.org/grpc/status" | ||
32 | "google.golang.org/grpc/transport" | ||
33 | ) | 23 | ) |
34 | 24 | ||
35 | // recvResponse receives and parses an RPC response. | 25 | // Invoke sends the RPC request on the wire and returns after response is |
36 | // On error, it returns the error and indicates whether the call should be retried. | 26 | // received. This is typically called by generated code. |
37 | // | 27 | // |
38 | // TODO(zhaoq): Check whether the received message sequence is valid. | 28 | // All errors returned by Invoke are compatible with the status package. |
39 | // TODO ctx is used for stats collection and processing. It is the context passed from the application. | 29 | func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error { |
40 | func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) { | 30 | // allow interceptor to see all applicable call options, which means those |
41 | // Try to acquire header metadata from the server if there is any. | 31 | // configured as defaults from dial option as well as per-call options |
42 | defer func() { | 32 | opts = combine(cc.dopts.callOptions, opts) |
43 | if err != nil { | ||
44 | if _, ok := err.(transport.ConnectionError); !ok { | ||
45 | t.CloseStream(stream, err) | ||
46 | } | ||
47 | } | ||
48 | }() | ||
49 | c.headerMD, err = stream.Header() | ||
50 | if err != nil { | ||
51 | return | ||
52 | } | ||
53 | p := &parser{r: stream} | ||
54 | var inPayload *stats.InPayload | ||
55 | if dopts.copts.StatsHandler != nil { | ||
56 | inPayload = &stats.InPayload{ | ||
57 | Client: true, | ||
58 | } | ||
59 | } | ||
60 | for { | ||
61 | if c.maxReceiveMessageSize == nil { | ||
62 | return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") | ||
63 | } | ||
64 | if err = recv(p, dopts.codec, stream, dopts.dc, reply, *c.maxReceiveMessageSize, inPayload); err != nil { | ||
65 | if err == io.EOF { | ||
66 | break | ||
67 | } | ||
68 | return | ||
69 | } | ||
70 | } | ||
71 | if inPayload != nil && err == io.EOF && stream.Status().Code() == codes.OK { | ||
72 | // TODO in the current implementation, inTrailer may be handled before inPayload in some cases. | ||
73 | // Fix the order if necessary. | ||
74 | dopts.copts.StatsHandler.HandleRPC(ctx, inPayload) | ||
75 | } | ||
76 | c.trailerMD = stream.Trailer() | ||
77 | return nil | ||
78 | } | ||
79 | |||
80 | // sendRequest writes out various information of an RPC such as Context and Message. | ||
81 | func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, c *callInfo, callHdr *transport.CallHdr, stream *transport.Stream, t transport.ClientTransport, args interface{}, opts *transport.Options) (err error) { | ||
82 | defer func() { | ||
83 | if err != nil { | ||
84 | // If err is connection error, t will be closed, no need to close stream here. | ||
85 | if _, ok := err.(transport.ConnectionError); !ok { | ||
86 | t.CloseStream(stream, err) | ||
87 | } | ||
88 | } | ||
89 | }() | ||
90 | var ( | ||
91 | cbuf *bytes.Buffer | ||
92 | outPayload *stats.OutPayload | ||
93 | ) | ||
94 | if compressor != nil { | ||
95 | cbuf = new(bytes.Buffer) | ||
96 | } | ||
97 | if dopts.copts.StatsHandler != nil { | ||
98 | outPayload = &stats.OutPayload{ | ||
99 | Client: true, | ||
100 | } | ||
101 | } | ||
102 | outBuf, err := encode(dopts.codec, args, compressor, cbuf, outPayload) | ||
103 | if err != nil { | ||
104 | return err | ||
105 | } | ||
106 | if c.maxSendMessageSize == nil { | ||
107 | return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") | ||
108 | } | ||
109 | if len(outBuf) > *c.maxSendMessageSize { | ||
110 | return Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(outBuf), *c.maxSendMessageSize) | ||
111 | } | ||
112 | err = t.Write(stream, outBuf, opts) | ||
113 | if err == nil && outPayload != nil { | ||
114 | outPayload.SentTime = time.Now() | ||
115 | dopts.copts.StatsHandler.HandleRPC(ctx, outPayload) | ||
116 | } | ||
117 | // t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method | ||
118 | // does not exist.) so that t.Write could get io.EOF from wait(...). Leave the following | ||
119 | // recvResponse to get the final status. | ||
120 | if err != nil && err != io.EOF { | ||
121 | return err | ||
122 | } | ||
123 | // Sent successfully. | ||
124 | return nil | ||
125 | } | ||
126 | 33 | ||
127 | // Invoke sends the RPC request on the wire and returns after response is received. | ||
128 | // Invoke is called by generated code. Also users can call Invoke directly when it | ||
129 | // is really needed in their use cases. | ||
130 | func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error { | ||
131 | if cc.dopts.unaryInt != nil { | 34 | if cc.dopts.unaryInt != nil { |
132 | return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...) | 35 | return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...) |
133 | } | 36 | } |
134 | return invoke(ctx, method, args, reply, cc, opts...) | 37 | return invoke(ctx, method, args, reply, cc, opts...) |
135 | } | 38 | } |
136 | 39 | ||
137 | func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) { | 40 | func combine(o1 []CallOption, o2 []CallOption) []CallOption { |
138 | c := defaultCallInfo | 41 | // we don't use append because o1 could have extra capacity whose |
139 | mc := cc.GetMethodConfig(method) | 42 | // elements would be overwritten, which could cause inadvertent |
140 | if mc.WaitForReady != nil { | 43 | // sharing (and race connditions) between concurrent calls |
141 | c.failFast = !*mc.WaitForReady | 44 | if len(o1) == 0 { |
142 | } | 45 | return o2 |
143 | 46 | } else if len(o2) == 0 { | |
144 | if mc.Timeout != nil && *mc.Timeout >= 0 { | 47 | return o1 |
145 | var cancel context.CancelFunc | 48 | } |
146 | ctx, cancel = context.WithTimeout(ctx, *mc.Timeout) | 49 | ret := make([]CallOption, len(o1)+len(o2)) |
147 | defer cancel() | 50 | copy(ret, o1) |
148 | } | 51 | copy(ret[len(o1):], o2) |
52 | return ret | ||
53 | } | ||
149 | 54 | ||
150 | opts = append(cc.dopts.callOptions, opts...) | 55 | // Invoke sends the RPC request on the wire and returns after response is |
151 | for _, o := range opts { | 56 | // received. This is typically called by generated code. |
152 | if err := o.before(&c); err != nil { | 57 | // |
153 | return toRPCErr(err) | 58 | // DEPRECATED: Use ClientConn.Invoke instead. |
154 | } | 59 | func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error { |
155 | } | 60 | return cc.Invoke(ctx, method, args, reply, opts...) |
156 | defer func() { | 61 | } |
157 | for _, o := range opts { | ||
158 | o.after(&c) | ||
159 | } | ||
160 | }() | ||
161 | 62 | ||
162 | c.maxSendMessageSize = getMaxSize(mc.MaxReqSize, c.maxSendMessageSize, defaultClientMaxSendMessageSize) | 63 | var unaryStreamDesc = &StreamDesc{ServerStreams: false, ClientStreams: false} |
163 | c.maxReceiveMessageSize = getMaxSize(mc.MaxRespSize, c.maxReceiveMessageSize, defaultClientMaxReceiveMessageSize) | ||
164 | 64 | ||
165 | if EnableTracing { | 65 | func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error { |
166 | c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) | 66 | cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...) |
167 | defer c.traceInfo.tr.Finish() | 67 | if err != nil { |
168 | c.traceInfo.firstLine.client = true | 68 | return err |
169 | if deadline, ok := ctx.Deadline(); ok { | ||
170 | c.traceInfo.firstLine.deadline = deadline.Sub(time.Now()) | ||
171 | } | ||
172 | c.traceInfo.tr.LazyLog(&c.traceInfo.firstLine, false) | ||
173 | // TODO(dsymonds): Arrange for c.traceInfo.firstLine.remoteAddr to be set. | ||
174 | defer func() { | ||
175 | if e != nil { | ||
176 | c.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{e}}, true) | ||
177 | c.traceInfo.tr.SetError() | ||
178 | } | ||
179 | }() | ||
180 | } | ||
181 | ctx = newContextWithRPCInfo(ctx) | ||
182 | sh := cc.dopts.copts.StatsHandler | ||
183 | if sh != nil { | ||
184 | ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast}) | ||
185 | begin := &stats.Begin{ | ||
186 | Client: true, | ||
187 | BeginTime: time.Now(), | ||
188 | FailFast: c.failFast, | ||
189 | } | ||
190 | sh.HandleRPC(ctx, begin) | ||
191 | defer func() { | ||
192 | end := &stats.End{ | ||
193 | Client: true, | ||
194 | EndTime: time.Now(), | ||
195 | Error: e, | ||
196 | } | ||
197 | sh.HandleRPC(ctx, end) | ||
198 | }() | ||
199 | } | ||
200 | topts := &transport.Options{ | ||
201 | Last: true, | ||
202 | Delay: false, | ||
203 | } | 69 | } |
204 | for { | 70 | if err := cs.SendMsg(req); err != nil { |
205 | var ( | 71 | return err |
206 | err error | ||
207 | t transport.ClientTransport | ||
208 | stream *transport.Stream | ||
209 | // Record the put handler from Balancer.Get(...). It is called once the | ||
210 | // RPC has completed or failed. | ||
211 | put func() | ||
212 | ) | ||
213 | // TODO(zhaoq): Need a formal spec of fail-fast. | ||
214 | callHdr := &transport.CallHdr{ | ||
215 | Host: cc.authority, | ||
216 | Method: method, | ||
217 | } | ||
218 | if cc.dopts.cp != nil { | ||
219 | callHdr.SendCompress = cc.dopts.cp.Type() | ||
220 | } | ||
221 | if c.creds != nil { | ||
222 | callHdr.Creds = c.creds | ||
223 | } | ||
224 | |||
225 | gopts := BalancerGetOptions{ | ||
226 | BlockingWait: !c.failFast, | ||
227 | } | ||
228 | t, put, err = cc.getTransport(ctx, gopts) | ||
229 | if err != nil { | ||
230 | // TODO(zhaoq): Probably revisit the error handling. | ||
231 | if _, ok := status.FromError(err); ok { | ||
232 | return err | ||
233 | } | ||
234 | if err == errConnClosing || err == errConnUnavailable { | ||
235 | if c.failFast { | ||
236 | return Errorf(codes.Unavailable, "%v", err) | ||
237 | } | ||
238 | continue | ||
239 | } | ||
240 | // All the other errors are treated as Internal errors. | ||
241 | return Errorf(codes.Internal, "%v", err) | ||
242 | } | ||
243 | if c.traceInfo.tr != nil { | ||
244 | c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true) | ||
245 | } | ||
246 | stream, err = t.NewStream(ctx, callHdr) | ||
247 | if err != nil { | ||
248 | if put != nil { | ||
249 | if _, ok := err.(transport.ConnectionError); ok { | ||
250 | // If error is connection error, transport was sending data on wire, | ||
251 | // and we are not sure if anything has been sent on wire. | ||
252 | // If error is not connection error, we are sure nothing has been sent. | ||
253 | updateRPCInfoInContext(ctx, rpcInfo{bytesSent: true, bytesReceived: false}) | ||
254 | } | ||
255 | put() | ||
256 | } | ||
257 | if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast { | ||
258 | continue | ||
259 | } | ||
260 | return toRPCErr(err) | ||
261 | } | ||
262 | if peer, ok := peer.FromContext(stream.Context()); ok { | ||
263 | c.peer = peer | ||
264 | } | ||
265 | err = sendRequest(ctx, cc.dopts, cc.dopts.cp, &c, callHdr, stream, t, args, topts) | ||
266 | if err != nil { | ||
267 | if put != nil { | ||
268 | updateRPCInfoInContext(ctx, rpcInfo{ | ||
269 | bytesSent: stream.BytesSent(), | ||
270 | bytesReceived: stream.BytesReceived(), | ||
271 | }) | ||
272 | put() | ||
273 | } | ||
274 | // Retry a non-failfast RPC when | ||
275 | // i) there is a connection error; or | ||
276 | // ii) the server started to drain before this RPC was initiated. | ||
277 | if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast { | ||
278 | continue | ||
279 | } | ||
280 | return toRPCErr(err) | ||
281 | } | ||
282 | err = recvResponse(ctx, cc.dopts, t, &c, stream, reply) | ||
283 | if err != nil { | ||
284 | if put != nil { | ||
285 | updateRPCInfoInContext(ctx, rpcInfo{ | ||
286 | bytesSent: stream.BytesSent(), | ||
287 | bytesReceived: stream.BytesReceived(), | ||
288 | }) | ||
289 | put() | ||
290 | } | ||
291 | if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast { | ||
292 | continue | ||
293 | } | ||
294 | return toRPCErr(err) | ||
295 | } | ||
296 | if c.traceInfo.tr != nil { | ||
297 | c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true) | ||
298 | } | ||
299 | t.CloseStream(stream, nil) | ||
300 | if put != nil { | ||
301 | updateRPCInfoInContext(ctx, rpcInfo{ | ||
302 | bytesSent: stream.BytesSent(), | ||
303 | bytesReceived: stream.BytesReceived(), | ||
304 | }) | ||
305 | put() | ||
306 | } | ||
307 | return stream.Status().Err() | ||
308 | } | 72 | } |
73 | return cs.RecvMsg(reply) | ||
309 | } | 74 | } |