]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | /* |
2 | * | |
3 | * Copyright 2014 gRPC authors. | |
4 | * | |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | * | |
17 | */ | |
18 | ||
19 | package grpc | |
20 | ||
21 | import ( | |
22 | "bytes" | |
23 | "compress/gzip" | |
107c1cdb | 24 | "context" |
15c0b25d | 25 | "encoding/binary" |
107c1cdb | 26 | "fmt" |
15c0b25d AP |
27 | "io" |
28 | "io/ioutil" | |
29 | "math" | |
107c1cdb ND |
30 | "net/url" |
31 | "strings" | |
15c0b25d AP |
32 | "sync" |
33 | "time" | |
34 | ||
15c0b25d AP |
35 | "google.golang.org/grpc/codes" |
36 | "google.golang.org/grpc/credentials" | |
107c1cdb ND |
37 | "google.golang.org/grpc/encoding" |
38 | "google.golang.org/grpc/encoding/proto" | |
39 | "google.golang.org/grpc/internal/transport" | |
15c0b25d AP |
40 | "google.golang.org/grpc/metadata" |
41 | "google.golang.org/grpc/peer" | |
42 | "google.golang.org/grpc/stats" | |
43 | "google.golang.org/grpc/status" | |
15c0b25d AP |
44 | ) |
45 | ||
// Compressor is the legacy hook gRPC uses to compress an outgoing message.
//
// Deprecated: use package encoding.
type Compressor interface {
	// Do writes the compressed form of p to w.
	Do(w io.Writer, p []byte) error
	// Type reports the name of the compression algorithm in use.
	Type() string
}

// gzipCompressor is a Compressor that recycles gzip writers through a pool.
type gzipCompressor struct {
	pool sync.Pool
}

// NewGZIPCompressor creates a Compressor based on GZIP that uses
// gzip.DefaultCompression.
//
// Deprecated: use package encoding/gzip.
func NewGZIPCompressor() Compressor {
	// gzip.DefaultCompression always passes the range check performed by
	// NewGZIPCompressorWithLevel, so the error is discarded on purpose.
	compressor, _ := NewGZIPCompressorWithLevel(gzip.DefaultCompression)
	return compressor
}

// NewGZIPCompressorWithLevel behaves like NewGZIPCompressor but lets the
// caller choose the gzip compression level.
//
// A level outside [gzip.DefaultCompression, gzip.BestCompression] yields a nil
// Compressor and a non-nil error; otherwise the error is nil.
//
// Deprecated: use package encoding/gzip.
func NewGZIPCompressorWithLevel(level int) (Compressor, error) {
	if valid := level >= gzip.DefaultCompression && level <= gzip.BestCompression; !valid {
		return nil, fmt.Errorf("grpc: invalid compression level: %d", level)
	}
	// The pool builds writers lazily; each one targets ioutil.Discard until Do
	// resets it onto the real destination.
	newWriter := func() interface{} {
		zw, err := gzip.NewWriterLevel(ioutil.Discard, level)
		if err != nil {
			panic(err)
		}
		return zw
	}
	return &gzipCompressor{pool: sync.Pool{New: newWriter}}, nil
}

// Do gzip-compresses p into w using a pooled writer, which is handed back to
// the pool when the call returns.
func (c *gzipCompressor) Do(w io.Writer, p []byte) error {
	zw := c.pool.Get().(*gzip.Writer)
	defer c.pool.Put(zw)

	zw.Reset(w)
	_, err := zw.Write(p)
	if err == nil {
		// Close flushes the remaining compressed data to w.
		err = zw.Close()
	}
	return err
}

// Type returns the name of the algorithm, "gzip".
func (c *gzipCompressor) Type() string {
	const algorithm = "gzip"
	return algorithm
}
104 | ||
// Decompressor is the legacy hook gRPC uses to decompress a received message.
//
// Deprecated: use package encoding.
type Decompressor interface {
	// Do reads the compressed data from r and returns it uncompressed.
	Do(r io.Reader) ([]byte, error)
	// Type reports the name of the compression algorithm in use.
	Type() string
}

// gzipDecompressor is a Decompressor that recycles gzip readers through a pool.
type gzipDecompressor struct {
	pool sync.Pool
}

// NewGZIPDecompressor creates a Decompressor based on GZIP.
//
// Deprecated: use package encoding/gzip.
func NewGZIPDecompressor() Decompressor {
	return new(gzipDecompressor)
}

// Do decompresses everything readable from r and returns the result. A reader
// taken from the pool is reset onto r; when the pool is empty a new reader is
// created. The reader is closed and returned to the pool once reading ends.
func (d *gzipDecompressor) Do(r io.Reader) ([]byte, error) {
	zr, pooled := d.pool.Get().(*gzip.Reader)
	if pooled {
		if err := zr.Reset(r); err != nil {
			// The reader itself is still reusable; only this input was bad.
			d.pool.Put(zr)
			return nil, err
		}
	} else {
		fresh, err := gzip.NewReader(r)
		if err != nil {
			return nil, err
		}
		zr = fresh
	}

	defer func() {
		zr.Close()
		d.pool.Put(zr)
	}()
	return ioutil.ReadAll(zr)
}

// Type returns the name of the algorithm, "gzip".
func (d *gzipDecompressor) Type() string {
	const algorithm = "gzip"
	return algorithm
}
153 | ||
154 | // callInfo contains all related configuration and information about an RPC. | |
155 | type callInfo struct { | |
107c1cdb | 156 | compressorType string |
15c0b25d | 157 | failFast bool |
107c1cdb | 158 | stream ClientStream |
15c0b25d AP |
159 | maxReceiveMessageSize *int |
160 | maxSendMessageSize *int | |
161 | creds credentials.PerRPCCredentials | |
107c1cdb ND |
162 | contentSubtype string |
163 | codec baseCodec | |
164 | maxRetryRPCBufferSize int | |
15c0b25d AP |
165 | } |
166 | ||
107c1cdb ND |
167 | func defaultCallInfo() *callInfo { |
168 | return &callInfo{ | |
169 | failFast: true, | |
170 | maxRetryRPCBufferSize: 256 * 1024, // 256KB | |
171 | } | |
172 | } | |
15c0b25d AP |
173 | |
174 | // CallOption configures a Call before it starts or extracts information from | |
175 | // a Call after it completes. | |
176 | type CallOption interface { | |
177 | // before is called before the call is sent to any server. If before | |
178 | // returns a non-nil error, the RPC fails with that error. | |
179 | before(*callInfo) error | |
180 | ||
181 | // after is called after the call has completed. after cannot return an | |
182 | // error, so any failures should be reported via output parameters. | |
183 | after(*callInfo) | |
184 | } | |
185 | ||
186 | // EmptyCallOption does not alter the Call configuration. | |
187 | // It can be embedded in another structure to carry satellite data for use | |
188 | // by interceptors. | |
189 | type EmptyCallOption struct{} | |
190 | ||
191 | func (EmptyCallOption) before(*callInfo) error { return nil } | |
192 | func (EmptyCallOption) after(*callInfo) {} | |
193 | ||
15c0b25d AP |
194 | // Header returns a CallOptions that retrieves the header metadata |
195 | // for a unary RPC. | |
196 | func Header(md *metadata.MD) CallOption { | |
107c1cdb ND |
197 | return HeaderCallOption{HeaderAddr: md} |
198 | } | |
199 | ||
200 | // HeaderCallOption is a CallOption for collecting response header metadata. | |
201 | // The metadata field will be populated *after* the RPC completes. | |
202 | // This is an EXPERIMENTAL API. | |
203 | type HeaderCallOption struct { | |
204 | HeaderAddr *metadata.MD | |
205 | } | |
206 | ||
207 | func (o HeaderCallOption) before(c *callInfo) error { return nil } | |
208 | func (o HeaderCallOption) after(c *callInfo) { | |
209 | if c.stream != nil { | |
210 | *o.HeaderAddr, _ = c.stream.Header() | |
211 | } | |
15c0b25d AP |
212 | } |
213 | ||
214 | // Trailer returns a CallOptions that retrieves the trailer metadata | |
215 | // for a unary RPC. | |
216 | func Trailer(md *metadata.MD) CallOption { | |
107c1cdb ND |
217 | return TrailerCallOption{TrailerAddr: md} |
218 | } | |
219 | ||
220 | // TrailerCallOption is a CallOption for collecting response trailer metadata. | |
221 | // The metadata field will be populated *after* the RPC completes. | |
222 | // This is an EXPERIMENTAL API. | |
223 | type TrailerCallOption struct { | |
224 | TrailerAddr *metadata.MD | |
225 | } | |
226 | ||
227 | func (o TrailerCallOption) before(c *callInfo) error { return nil } | |
228 | func (o TrailerCallOption) after(c *callInfo) { | |
229 | if c.stream != nil { | |
230 | *o.TrailerAddr = c.stream.Trailer() | |
231 | } | |
232 | } | |
233 | ||
234 | // Peer returns a CallOption that retrieves peer information for a unary RPC. | |
235 | // The peer field will be populated *after* the RPC completes. | |
236 | func Peer(p *peer.Peer) CallOption { | |
237 | return PeerCallOption{PeerAddr: p} | |
15c0b25d AP |
238 | } |
239 | ||
107c1cdb ND |
240 | // PeerCallOption is a CallOption for collecting the identity of the remote |
241 | // peer. The peer field will be populated *after* the RPC completes. | |
242 | // This is an EXPERIMENTAL API. | |
243 | type PeerCallOption struct { | |
244 | PeerAddr *peer.Peer | |
245 | } | |
246 | ||
247 | func (o PeerCallOption) before(c *callInfo) error { return nil } | |
248 | func (o PeerCallOption) after(c *callInfo) { | |
249 | if c.stream != nil { | |
250 | if x, ok := peer.FromContext(c.stream.Context()); ok { | |
251 | *o.PeerAddr = *x | |
15c0b25d | 252 | } |
107c1cdb | 253 | } |
15c0b25d AP |
254 | } |
255 | ||
107c1cdb ND |
256 | // WaitForReady configures the action to take when an RPC is attempted on broken |
257 | // connections or unreachable servers. If waitForReady is false, the RPC will fail | |
15c0b25d | 258 | // immediately. Otherwise, the RPC client will block the call until a |
107c1cdb ND |
259 | // connection is available (or the call is canceled or times out) and will |
260 | // retry the call if it fails due to a transient error. gRPC will not retry if | |
261 | // data was written to the wire unless the server indicates it did not process | |
262 | // the data. Please refer to | |
15c0b25d | 263 | // https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md. |
107c1cdb ND |
264 | // |
265 | // By default, RPCs don't "wait for ready". | |
266 | func WaitForReady(waitForReady bool) CallOption { | |
267 | return FailFastCallOption{FailFast: !waitForReady} | |
268 | } | |
269 | ||
270 | // FailFast is the opposite of WaitForReady. | |
271 | // | |
272 | // Deprecated: use WaitForReady. | |
15c0b25d | 273 | func FailFast(failFast bool) CallOption { |
107c1cdb | 274 | return FailFastCallOption{FailFast: failFast} |
15c0b25d AP |
275 | } |
276 | ||
107c1cdb ND |
277 | // FailFastCallOption is a CallOption for indicating whether an RPC should fail |
278 | // fast or not. | |
279 | // This is an EXPERIMENTAL API. | |
280 | type FailFastCallOption struct { | |
281 | FailFast bool | |
282 | } | |
283 | ||
284 | func (o FailFastCallOption) before(c *callInfo) error { | |
285 | c.failFast = o.FailFast | |
286 | return nil | |
287 | } | |
288 | func (o FailFastCallOption) after(c *callInfo) {} | |
289 | ||
15c0b25d AP |
290 | // MaxCallRecvMsgSize returns a CallOption which sets the maximum message size the client can receive. |
291 | func MaxCallRecvMsgSize(s int) CallOption { | |
107c1cdb ND |
292 | return MaxRecvMsgSizeCallOption{MaxRecvMsgSize: s} |
293 | } | |
294 | ||
295 | // MaxRecvMsgSizeCallOption is a CallOption that indicates the maximum message | |
296 | // size the client can receive. | |
297 | // This is an EXPERIMENTAL API. | |
298 | type MaxRecvMsgSizeCallOption struct { | |
299 | MaxRecvMsgSize int | |
15c0b25d AP |
300 | } |
301 | ||
107c1cdb ND |
302 | func (o MaxRecvMsgSizeCallOption) before(c *callInfo) error { |
303 | c.maxReceiveMessageSize = &o.MaxRecvMsgSize | |
304 | return nil | |
305 | } | |
306 | func (o MaxRecvMsgSizeCallOption) after(c *callInfo) {} | |
307 | ||
15c0b25d AP |
308 | // MaxCallSendMsgSize returns a CallOption which sets the maximum message size the client can send. |
309 | func MaxCallSendMsgSize(s int) CallOption { | |
107c1cdb ND |
310 | return MaxSendMsgSizeCallOption{MaxSendMsgSize: s} |
311 | } | |
312 | ||
313 | // MaxSendMsgSizeCallOption is a CallOption that indicates the maximum message | |
314 | // size the client can send. | |
315 | // This is an EXPERIMENTAL API. | |
316 | type MaxSendMsgSizeCallOption struct { | |
317 | MaxSendMsgSize int | |
15c0b25d AP |
318 | } |
319 | ||
107c1cdb ND |
320 | func (o MaxSendMsgSizeCallOption) before(c *callInfo) error { |
321 | c.maxSendMessageSize = &o.MaxSendMsgSize | |
322 | return nil | |
323 | } | |
324 | func (o MaxSendMsgSizeCallOption) after(c *callInfo) {} | |
325 | ||
15c0b25d AP |
326 | // PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials |
327 | // for a call. | |
328 | func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption { | |
107c1cdb ND |
329 | return PerRPCCredsCallOption{Creds: creds} |
330 | } | |
331 | ||
332 | // PerRPCCredsCallOption is a CallOption that indicates the per-RPC | |
333 | // credentials to use for the call. | |
334 | // This is an EXPERIMENTAL API. | |
335 | type PerRPCCredsCallOption struct { | |
336 | Creds credentials.PerRPCCredentials | |
337 | } | |
338 | ||
339 | func (o PerRPCCredsCallOption) before(c *callInfo) error { | |
340 | c.creds = o.Creds | |
341 | return nil | |
342 | } | |
343 | func (o PerRPCCredsCallOption) after(c *callInfo) {} | |
344 | ||
345 | // UseCompressor returns a CallOption which sets the compressor used when | |
346 | // sending the request. If WithCompressor is also set, UseCompressor has | |
347 | // higher priority. | |
348 | // | |
349 | // This API is EXPERIMENTAL. | |
350 | func UseCompressor(name string) CallOption { | |
351 | return CompressorCallOption{CompressorType: name} | |
352 | } | |
353 | ||
354 | // CompressorCallOption is a CallOption that indicates the compressor to use. | |
355 | // This is an EXPERIMENTAL API. | |
356 | type CompressorCallOption struct { | |
357 | CompressorType string | |
358 | } | |
359 | ||
360 | func (o CompressorCallOption) before(c *callInfo) error { | |
361 | c.compressorType = o.CompressorType | |
362 | return nil | |
363 | } | |
364 | func (o CompressorCallOption) after(c *callInfo) {} | |
365 | ||
366 | // CallContentSubtype returns a CallOption that will set the content-subtype | |
367 | // for a call. For example, if content-subtype is "json", the Content-Type over | |
368 | // the wire will be "application/grpc+json". The content-subtype is converted | |
369 | // to lowercase before being included in Content-Type. See Content-Type on | |
370 | // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for | |
371 | // more details. | |
372 | // | |
373 | // If CallCustomCodec is not also used, the content-subtype will be used to | |
374 | // look up the Codec to use in the registry controlled by RegisterCodec. See | |
375 | // the documentation on RegisterCodec for details on registration. The lookup | |
376 | // of content-subtype is case-insensitive. If no such Codec is found, the call | |
377 | // will result in an error with code codes.Internal. | |
378 | // | |
379 | // If CallCustomCodec is also used, that Codec will be used for all request and | |
380 | // response messages, with the content-subtype set to the given contentSubtype | |
381 | // here for requests. | |
382 | func CallContentSubtype(contentSubtype string) CallOption { | |
383 | return ContentSubtypeCallOption{ContentSubtype: strings.ToLower(contentSubtype)} | |
384 | } | |
385 | ||
386 | // ContentSubtypeCallOption is a CallOption that indicates the content-subtype | |
387 | // used for marshaling messages. | |
388 | // This is an EXPERIMENTAL API. | |
389 | type ContentSubtypeCallOption struct { | |
390 | ContentSubtype string | |
391 | } | |
392 | ||
393 | func (o ContentSubtypeCallOption) before(c *callInfo) error { | |
394 | c.contentSubtype = o.ContentSubtype | |
395 | return nil | |
396 | } | |
397 | func (o ContentSubtypeCallOption) after(c *callInfo) {} | |
398 | ||
399 | // CallCustomCodec returns a CallOption that will set the given Codec to be | |
400 | // used for all request and response messages for a call. The result of calling | |
401 | // String() will be used as the content-subtype in a case-insensitive manner. | |
402 | // | |
403 | // See Content-Type on | |
404 | // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for | |
405 | // more details. Also see the documentation on RegisterCodec and | |
406 | // CallContentSubtype for more details on the interaction between Codec and | |
407 | // content-subtype. | |
408 | // | |
409 | // This function is provided for advanced users; prefer to use only | |
410 | // CallContentSubtype to select a registered codec instead. | |
411 | func CallCustomCodec(codec Codec) CallOption { | |
412 | return CustomCodecCallOption{Codec: codec} | |
413 | } | |
414 | ||
415 | // CustomCodecCallOption is a CallOption that indicates the codec used for | |
416 | // marshaling messages. | |
417 | // This is an EXPERIMENTAL API. | |
418 | type CustomCodecCallOption struct { | |
419 | Codec Codec | |
420 | } | |
421 | ||
422 | func (o CustomCodecCallOption) before(c *callInfo) error { | |
423 | c.codec = o.Codec | |
424 | return nil | |
425 | } | |
426 | func (o CustomCodecCallOption) after(c *callInfo) {} | |
427 | ||
428 | // MaxRetryRPCBufferSize returns a CallOption that limits the amount of memory | |
429 | // used for buffering this RPC's requests for retry purposes. | |
430 | // | |
431 | // This API is EXPERIMENTAL. | |
432 | func MaxRetryRPCBufferSize(bytes int) CallOption { | |
433 | return MaxRetryRPCBufferSizeCallOption{bytes} | |
434 | } | |
435 | ||
436 | // MaxRetryRPCBufferSizeCallOption is a CallOption indicating the amount of | |
437 | // memory to be used for caching this RPC for retry purposes. | |
438 | // This is an EXPERIMENTAL API. | |
439 | type MaxRetryRPCBufferSizeCallOption struct { | |
440 | MaxRetryRPCBufferSize int | |
441 | } | |
442 | ||
443 | func (o MaxRetryRPCBufferSizeCallOption) before(c *callInfo) error { | |
444 | c.maxRetryRPCBufferSize = o.MaxRetryRPCBufferSize | |
445 | return nil | |
15c0b25d | 446 | } |
107c1cdb | 447 | func (o MaxRetryRPCBufferSizeCallOption) after(c *callInfo) {} |
15c0b25d AP |
448 | |
// payloadFormat tells whether a message payload is compressed; it is the
// value carried in the first byte of a message header.
type payloadFormat uint8

const (
	compressionNone payloadFormat = iota // 0: no compression
	compressionMade                      // 1: compressed
)
456 | ||
// parser reads complete gRPC messages from the underlying reader.
type parser struct {
	// r is the source of message bytes. The comment on recvMsg lists the
	// error types it is allowed to return.
	r io.Reader

	// header is scratch space for the 5-byte header of a gRPC message. More
	// detail is available at
	// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
	header [5]byte
}
468 | ||
469 | // recvMsg reads a complete gRPC message from the stream. | |
470 | // | |
471 | // It returns the message and its payload (compression/encoding) | |
472 | // format. The caller owns the returned msg memory. | |
473 | // | |
474 | // If there is an error, possible values are: | |
475 | // * io.EOF, when no messages remain | |
476 | // * io.ErrUnexpectedEOF | |
477 | // * of type transport.ConnectionError | |
107c1cdb | 478 | // * an error from the status package |
15c0b25d AP |
479 | // No other error values or types must be returned, which also means |
480 | // that the underlying io.Reader must not return an incompatible | |
481 | // error. | |
482 | func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byte, err error) { | |
483 | if _, err := p.r.Read(p.header[:]); err != nil { | |
484 | return 0, nil, err | |
485 | } | |
486 | ||
487 | pf = payloadFormat(p.header[0]) | |
488 | length := binary.BigEndian.Uint32(p.header[1:]) | |
489 | ||
490 | if length == 0 { | |
491 | return pf, nil, nil | |
492 | } | |
107c1cdb ND |
493 | if int64(length) > int64(maxInt) { |
494 | return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt) | |
495 | } | |
496 | if int(length) > maxReceiveMessageSize { | |
497 | return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize) | |
15c0b25d AP |
498 | } |
499 | // TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead | |
500 | // of making it for each message: | |
501 | msg = make([]byte, int(length)) | |
502 | if _, err := p.r.Read(msg); err != nil { | |
503 | if err == io.EOF { | |
504 | err = io.ErrUnexpectedEOF | |
505 | } | |
506 | return 0, nil, err | |
507 | } | |
508 | return pf, msg, nil | |
509 | } | |
510 | ||
107c1cdb ND |
511 | // encode serializes msg and returns a buffer containing the message, or an |
512 | // error if it is too large to be transmitted by grpc. If msg is nil, it | |
513 | // generates an empty message. | |
514 | func encode(c baseCodec, msg interface{}) ([]byte, error) { | |
515 | if msg == nil { // NOTE: typed nils will not be caught by this check | |
516 | return nil, nil | |
517 | } | |
518 | b, err := c.Marshal(msg) | |
519 | if err != nil { | |
520 | return nil, status.Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error()) | |
521 | } | |
522 | if uint(len(b)) > math.MaxUint32 { | |
523 | return nil, status.Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", len(b)) | |
524 | } | |
525 | return b, nil | |
526 | } | |
527 | ||
528 | // compress returns the input bytes compressed by compressor or cp. If both | |
529 | // compressors are nil, returns nil. | |
530 | // | |
531 | // TODO(dfawley): eliminate cp parameter by wrapping Compressor in an encoding.Compressor. | |
532 | func compress(in []byte, cp Compressor, compressor encoding.Compressor) ([]byte, error) { | |
533 | if compressor == nil && cp == nil { | |
534 | return nil, nil | |
535 | } | |
536 | wrapErr := func(err error) error { | |
537 | return status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error()) | |
538 | } | |
539 | cbuf := &bytes.Buffer{} | |
540 | if compressor != nil { | |
541 | z, err := compressor.Compress(cbuf) | |
15c0b25d | 542 | if err != nil { |
107c1cdb | 543 | return nil, wrapErr(err) |
15c0b25d | 544 | } |
107c1cdb ND |
545 | if _, err := z.Write(in); err != nil { |
546 | return nil, wrapErr(err) | |
15c0b25d | 547 | } |
107c1cdb ND |
548 | if err := z.Close(); err != nil { |
549 | return nil, wrapErr(err) | |
550 | } | |
551 | } else { | |
552 | if err := cp.Do(cbuf, in); err != nil { | |
553 | return nil, wrapErr(err) | |
15c0b25d | 554 | } |
15c0b25d | 555 | } |
107c1cdb ND |
556 | return cbuf.Bytes(), nil |
557 | } | |
15c0b25d | 558 | |
107c1cdb ND |
// Sizes, in bytes, of the pieces of a message header.
const (
	payloadLen = 1                    // the payload-format flag
	sizeLen    = 4                    // the big-endian message length
	headerLen  = payloadLen + sizeLen // the complete header
)
15c0b25d | 564 | |
107c1cdb ND |
565 | // msgHeader returns a 5-byte header for the message being transmitted and the |
566 | // payload, which is compData if non-nil or data otherwise. | |
567 | func msgHeader(data, compData []byte) (hdr []byte, payload []byte) { | |
568 | hdr = make([]byte, headerLen) | |
569 | if compData != nil { | |
570 | hdr[0] = byte(compressionMade) | |
571 | data = compData | |
15c0b25d | 572 | } else { |
107c1cdb | 573 | hdr[0] = byte(compressionNone) |
15c0b25d | 574 | } |
15c0b25d | 575 | |
107c1cdb ND |
576 | // Write length of payload into buf |
577 | binary.BigEndian.PutUint32(hdr[payloadLen:], uint32(len(data))) | |
578 | return hdr, data | |
579 | } | |
15c0b25d | 580 | |
107c1cdb ND |
581 | func outPayload(client bool, msg interface{}, data, payload []byte, t time.Time) *stats.OutPayload { |
582 | return &stats.OutPayload{ | |
583 | Client: client, | |
584 | Payload: msg, | |
585 | Data: data, | |
586 | Length: len(data), | |
587 | WireLength: len(payload) + headerLen, | |
588 | SentTime: t, | |
589 | } | |
15c0b25d AP |
590 | } |
591 | ||
107c1cdb | 592 | func checkRecvPayload(pf payloadFormat, recvCompress string, haveCompressor bool) *status.Status { |
15c0b25d AP |
593 | switch pf { |
594 | case compressionNone: | |
595 | case compressionMade: | |
107c1cdb ND |
596 | if recvCompress == "" || recvCompress == encoding.Identity { |
597 | return status.New(codes.Internal, "grpc: compressed flag set with identity or empty encoding") | |
598 | } | |
599 | if !haveCompressor { | |
600 | return status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress) | |
15c0b25d AP |
601 | } |
602 | default: | |
107c1cdb | 603 | return status.Newf(codes.Internal, "grpc: received unexpected payload format %d", pf) |
15c0b25d AP |
604 | } |
605 | return nil | |
606 | } | |
607 | ||
107c1cdb ND |
// payloadInfo carries details about a received message back to the caller.
type payloadInfo struct {
	// wireLength is the compressed length got from wire.
	wireLength int
	// uncompressedBytes is the message after any decompression.
	uncompressedBytes []byte
}
612 | ||
613 | func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) { | |
15c0b25d AP |
614 | pf, d, err := p.recvMsg(maxReceiveMessageSize) |
615 | if err != nil { | |
107c1cdb | 616 | return nil, err |
15c0b25d | 617 | } |
107c1cdb ND |
618 | if payInfo != nil { |
619 | payInfo.wireLength = len(d) | |
15c0b25d | 620 | } |
107c1cdb ND |
621 | |
622 | if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil { | |
623 | return nil, st.Err() | |
15c0b25d | 624 | } |
107c1cdb | 625 | |
15c0b25d | 626 | if pf == compressionMade { |
107c1cdb ND |
627 | // To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor, |
628 | // use this decompressor as the default. | |
629 | if dc != nil { | |
630 | d, err = dc.Do(bytes.NewReader(d)) | |
631 | if err != nil { | |
632 | return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) | |
633 | } | |
634 | } else { | |
635 | dcReader, err := compressor.Decompress(bytes.NewReader(d)) | |
636 | if err != nil { | |
637 | return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) | |
638 | } | |
639 | d, err = ioutil.ReadAll(dcReader) | |
640 | if err != nil { | |
641 | return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) | |
642 | } | |
15c0b25d AP |
643 | } |
644 | } | |
645 | if len(d) > maxReceiveMessageSize { | |
646 | // TODO: Revisit the error code. Currently keep it consistent with java | |
647 | // implementation. | |
107c1cdb ND |
648 | return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize) |
649 | } | |
650 | return d, nil | |
651 | } | |
652 | ||
653 | // For the two compressor parameters, both should not be set, but if they are, | |
654 | // dc takes precedence over compressor. | |
655 | // TODO(dfawley): wrap the old compressor/decompressor using the new API? | |
656 | func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error { | |
657 | d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor) | |
658 | if err != nil { | |
659 | return err | |
15c0b25d AP |
660 | } |
661 | if err := c.Unmarshal(d, m); err != nil { | |
107c1cdb | 662 | return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err) |
15c0b25d | 663 | } |
107c1cdb ND |
664 | if payInfo != nil { |
665 | payInfo.uncompressedBytes = d | |
15c0b25d AP |
666 | } |
667 | return nil | |
668 | } | |
669 | ||
// rpcInfo is the per-RPC data stored in a context.
type rpcInfo struct {
	failfast bool
}

// rpcInfoContextKey is the private key type under which an *rpcInfo is stored.
type rpcInfoContextKey struct{}

// newContextWithRPCInfo returns a child of ctx that carries a fresh rpcInfo
// with the given failfast value.
func newContextWithRPCInfo(ctx context.Context, failfast bool) context.Context {
	info := &rpcInfo{failfast: failfast}
	return context.WithValue(ctx, rpcInfoContextKey{}, info)
}

// rpcInfoFromContext returns the rpcInfo stored in ctx. ok is false, and s is
// nil, when ctx carries none.
func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) {
	stored := ctx.Value(rpcInfoContextKey{})
	s, ok = stored.(*rpcInfo)
	return s, ok
}
684 | ||
15c0b25d AP |
685 | // Code returns the error code for err if it was produced by the rpc system. |
686 | // Otherwise, it returns codes.Unknown. | |
687 | // | |
107c1cdb | 688 | // Deprecated: use status.Code instead. |
15c0b25d | 689 | func Code(err error) codes.Code { |
107c1cdb | 690 | return status.Code(err) |
15c0b25d AP |
691 | } |
692 | ||
693 | // ErrorDesc returns the error description of err if it was produced by the rpc system. | |
694 | // Otherwise, it returns err.Error() or empty string when err is nil. | |
695 | // | |
107c1cdb | 696 | // Deprecated: use status.Convert and Message method instead. |
15c0b25d | 697 | func ErrorDesc(err error) string { |
107c1cdb | 698 | return status.Convert(err).Message() |
15c0b25d AP |
699 | } |
700 | ||
701 | // Errorf returns an error containing an error code and a description; | |
702 | // Errorf returns nil if c is OK. | |
703 | // | |
107c1cdb | 704 | // Deprecated: use status.Errorf instead. |
15c0b25d AP |
705 | func Errorf(c codes.Code, format string, a ...interface{}) error { |
706 | return status.Errorf(c, format, a...) | |
707 | } | |
708 | ||
107c1cdb ND |
709 | // toRPCErr converts an error into an error from the status package. |
710 | func toRPCErr(err error) error { | |
711 | if err == nil || err == io.EOF { | |
712 | return err | |
713 | } | |
714 | if err == io.ErrUnexpectedEOF { | |
715 | return status.Error(codes.Internal, err.Error()) | |
716 | } | |
717 | if _, ok := status.FromError(err); ok { | |
718 | return err | |
719 | } | |
720 | switch e := err.(type) { | |
721 | case transport.ConnectionError: | |
722 | return status.Error(codes.Unavailable, e.Desc) | |
723 | default: | |
724 | switch err { | |
725 | case context.DeadlineExceeded: | |
726 | return status.Error(codes.DeadlineExceeded, err.Error()) | |
727 | case context.Canceled: | |
728 | return status.Error(codes.Canceled, err.Error()) | |
729 | } | |
730 | } | |
731 | return status.Error(codes.Unknown, err.Error()) | |
732 | } | |
15c0b25d | 733 | |
107c1cdb ND |
734 | // setCallInfoCodec should only be called after CallOptions have been applied. |
735 | func setCallInfoCodec(c *callInfo) error { | |
736 | if c.codec != nil { | |
737 | // codec was already set by a CallOption; use it. | |
738 | return nil | |
739 | } | |
740 | ||
741 | if c.contentSubtype == "" { | |
742 | // No codec specified in CallOptions; use proto by default. | |
743 | c.codec = encoding.GetCodec(proto.Name) | |
744 | return nil | |
745 | } | |
746 | ||
747 | // c.contentSubtype is already lowercased in CallContentSubtype | |
748 | c.codec = encoding.GetCodec(c.contentSubtype) | |
749 | if c.codec == nil { | |
750 | return status.Errorf(codes.Internal, "no codec registered for content-subtype %s", c.contentSubtype) | |
751 | } | |
752 | return nil | |
753 | } | |
754 | ||
// parseDialTarget returns the network and address to pass to dialer.
//
// Targets of the form "unix:addr" and URLs whose scheme is "unix" yield the
// "unix" network; for such a URL the address is the path, or the host when the
// path is empty. Every other target is returned unchanged on the "tcp"
// network.
func parseDialTarget(target string) (net string, addr string) {
	const (
		tcpNetwork  = "tcp"
		unixNetwork = "unix"
	)

	colon := strings.Index(target, ":")
	looksLikeURL := strings.Contains(target, ":/")

	switch {
	case !looksLikeURL && colon >= 0:
		// handle unix:addr which will fail with url.Parse
		if target[:colon] == unixNetwork {
			return unixNetwork, target[colon+1:]
		}
	case looksLikeURL:
		parsed, err := url.Parse(target)
		if err != nil || parsed.Scheme != unixNetwork {
			return tcpNetwork, target
		}
		if parsed.Path != "" {
			return unixNetwork, parsed.Path
		}
		return unixNetwork, parsed.Host
	}

	return tcpNetwork, target
}
788 | ||
// channelzData is used to store channelz related data for ClientConn, addrConn
// and Server. The fields cannot be embedded directly in those structs: atomic
// operations on an int64 on a 32-bit machine require the user to enforce
// memory alignment, and grouping the int64 fields in a struct of their own
// enforces it.
type channelzData struct {
	callsStarted   int64
	callsFailed    int64
	callsSucceeded int64
	// lastCallStartedTime stores the timestamp that last call starts. It is an
	// int64 rather than a time.Time because an int64 is cheaper to update
	// atomically.
	lastCallStartedTime int64
}
801 | ||
// The SupportPackageIsVersion variables are referenced from generated protocol
// buffer files to ensure compatibility with the gRPC version used. The latest
// support package version is 5.
//
// Older versions are kept for compatibility. They may be removed if
// compatibility cannot be maintained.
//
// These constants should not be referenced from any other code.
const (
	SupportPackageIsVersion3 = true // kept for compatibility
	SupportPackageIsVersion4 = true // kept for compatibility
	SupportPackageIsVersion5 = true // latest
)
15c0b25d AP |
815 | |
816 | const grpcUA = "grpc-go/" + Version |