aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/google.golang.org/grpc/rpc_util.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/rpc_util.go')
-rw-r--r--vendor/google.golang.org/grpc/rpc_util.go734
1 files changed, 513 insertions, 221 deletions
diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go
index 9b9d388..8d0d3dc 100644
--- a/vendor/google.golang.org/grpc/rpc_util.go
+++ b/vendor/google.golang.org/grpc/rpc_util.go
@@ -21,24 +21,31 @@ package grpc
21import ( 21import (
22 "bytes" 22 "bytes"
23 "compress/gzip" 23 "compress/gzip"
24 "context"
24 "encoding/binary" 25 "encoding/binary"
26 "fmt"
25 "io" 27 "io"
26 "io/ioutil" 28 "io/ioutil"
27 "math" 29 "math"
30 "net/url"
31 "strings"
28 "sync" 32 "sync"
29 "time" 33 "time"
30 34
31 "golang.org/x/net/context"
32 "google.golang.org/grpc/codes" 35 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/credentials" 36 "google.golang.org/grpc/credentials"
37 "google.golang.org/grpc/encoding"
38 "google.golang.org/grpc/encoding/proto"
39 "google.golang.org/grpc/internal/transport"
34 "google.golang.org/grpc/metadata" 40 "google.golang.org/grpc/metadata"
35 "google.golang.org/grpc/peer" 41 "google.golang.org/grpc/peer"
36 "google.golang.org/grpc/stats" 42 "google.golang.org/grpc/stats"
37 "google.golang.org/grpc/status" 43 "google.golang.org/grpc/status"
38 "google.golang.org/grpc/transport"
39) 44)
40 45
41// Compressor defines the interface gRPC uses to compress a message. 46// Compressor defines the interface gRPC uses to compress a message.
47//
48// Deprecated: use package encoding.
42type Compressor interface { 49type Compressor interface {
43 // Do compresses p into w. 50 // Do compresses p into w.
44 Do(w io.Writer, p []byte) error 51 Do(w io.Writer, p []byte) error
@@ -51,18 +58,39 @@ type gzipCompressor struct {
51} 58}
52 59
53// NewGZIPCompressor creates a Compressor based on GZIP. 60// NewGZIPCompressor creates a Compressor based on GZIP.
61//
62// Deprecated: use package encoding/gzip.
54func NewGZIPCompressor() Compressor { 63func NewGZIPCompressor() Compressor {
64 c, _ := NewGZIPCompressorWithLevel(gzip.DefaultCompression)
65 return c
66}
67
68// NewGZIPCompressorWithLevel is like NewGZIPCompressor but specifies the gzip compression level instead
69// of assuming DefaultCompression.
70//
71// The error returned will be nil if the level is valid.
72//
73// Deprecated: use package encoding/gzip.
74func NewGZIPCompressorWithLevel(level int) (Compressor, error) {
75 if level < gzip.DefaultCompression || level > gzip.BestCompression {
76 return nil, fmt.Errorf("grpc: invalid compression level: %d", level)
77 }
55 return &gzipCompressor{ 78 return &gzipCompressor{
56 pool: sync.Pool{ 79 pool: sync.Pool{
57 New: func() interface{} { 80 New: func() interface{} {
58 return gzip.NewWriter(ioutil.Discard) 81 w, err := gzip.NewWriterLevel(ioutil.Discard, level)
82 if err != nil {
83 panic(err)
84 }
85 return w
59 }, 86 },
60 }, 87 },
61 } 88 }, nil
62} 89}
63 90
64func (c *gzipCompressor) Do(w io.Writer, p []byte) error { 91func (c *gzipCompressor) Do(w io.Writer, p []byte) error {
65 z := c.pool.Get().(*gzip.Writer) 92 z := c.pool.Get().(*gzip.Writer)
93 defer c.pool.Put(z)
66 z.Reset(w) 94 z.Reset(w)
67 if _, err := z.Write(p); err != nil { 95 if _, err := z.Write(p); err != nil {
68 return err 96 return err
@@ -75,6 +103,8 @@ func (c *gzipCompressor) Type() string {
75} 103}
76 104
77// Decompressor defines the interface gRPC uses to decompress a message. 105// Decompressor defines the interface gRPC uses to decompress a message.
106//
107// Deprecated: use package encoding.
78type Decompressor interface { 108type Decompressor interface {
79 // Do reads the data from r and uncompress them. 109 // Do reads the data from r and uncompress them.
80 Do(r io.Reader) ([]byte, error) 110 Do(r io.Reader) ([]byte, error)
@@ -87,6 +117,8 @@ type gzipDecompressor struct {
87} 117}
88 118
89// NewGZIPDecompressor creates a Decompressor based on GZIP. 119// NewGZIPDecompressor creates a Decompressor based on GZIP.
120//
121// Deprecated: use package encoding/gzip.
90func NewGZIPDecompressor() Decompressor { 122func NewGZIPDecompressor() Decompressor {
91 return &gzipDecompressor{} 123 return &gzipDecompressor{}
92} 124}
@@ -121,17 +153,23 @@ func (d *gzipDecompressor) Type() string {
121 153
122// callInfo contains all related configuration and information about an RPC. 154// callInfo contains all related configuration and information about an RPC.
123type callInfo struct { 155type callInfo struct {
156 compressorType string
124 failFast bool 157 failFast bool
125 headerMD metadata.MD 158 stream ClientStream
126 trailerMD metadata.MD
127 peer *peer.Peer
128 traceInfo traceInfo // in trace.go
129 maxReceiveMessageSize *int 159 maxReceiveMessageSize *int
130 maxSendMessageSize *int 160 maxSendMessageSize *int
131 creds credentials.PerRPCCredentials 161 creds credentials.PerRPCCredentials
162 contentSubtype string
163 codec baseCodec
164 maxRetryRPCBufferSize int
132} 165}
133 166
134var defaultCallInfo = callInfo{failFast: true} 167func defaultCallInfo() *callInfo {
168 return &callInfo{
169 failFast: true,
170 maxRetryRPCBufferSize: 256 * 1024, // 256KB
171 }
172}
135 173
136// CallOption configures a Call before it starts or extracts information from 174// CallOption configures a Call before it starts or extracts information from
137// a Call after it completes. 175// a Call after it completes.
@@ -153,87 +191,267 @@ type EmptyCallOption struct{}
153func (EmptyCallOption) before(*callInfo) error { return nil } 191func (EmptyCallOption) before(*callInfo) error { return nil }
154func (EmptyCallOption) after(*callInfo) {} 192func (EmptyCallOption) after(*callInfo) {}
155 193
156type beforeCall func(c *callInfo) error
157
158func (o beforeCall) before(c *callInfo) error { return o(c) }
159func (o beforeCall) after(c *callInfo) {}
160
161type afterCall func(c *callInfo)
162
163func (o afterCall) before(c *callInfo) error { return nil }
164func (o afterCall) after(c *callInfo) { o(c) }
165
166// Header returns a CallOptions that retrieves the header metadata 194// Header returns a CallOptions that retrieves the header metadata
167// for a unary RPC. 195// for a unary RPC.
168func Header(md *metadata.MD) CallOption { 196func Header(md *metadata.MD) CallOption {
169 return afterCall(func(c *callInfo) { 197 return HeaderCallOption{HeaderAddr: md}
170 *md = c.headerMD 198}
171 }) 199
200// HeaderCallOption is a CallOption for collecting response header metadata.
201// The metadata field will be populated *after* the RPC completes.
202// This is an EXPERIMENTAL API.
203type HeaderCallOption struct {
204 HeaderAddr *metadata.MD
205}
206
207func (o HeaderCallOption) before(c *callInfo) error { return nil }
208func (o HeaderCallOption) after(c *callInfo) {
209 if c.stream != nil {
210 *o.HeaderAddr, _ = c.stream.Header()
211 }
172} 212}
173 213
174// Trailer returns a CallOptions that retrieves the trailer metadata 214// Trailer returns a CallOptions that retrieves the trailer metadata
175// for a unary RPC. 215// for a unary RPC.
176func Trailer(md *metadata.MD) CallOption { 216func Trailer(md *metadata.MD) CallOption {
177 return afterCall(func(c *callInfo) { 217 return TrailerCallOption{TrailerAddr: md}
178 *md = c.trailerMD 218}
179 }) 219
220// TrailerCallOption is a CallOption for collecting response trailer metadata.
221// The metadata field will be populated *after* the RPC completes.
222// This is an EXPERIMENTAL API.
223type TrailerCallOption struct {
224 TrailerAddr *metadata.MD
225}
226
227func (o TrailerCallOption) before(c *callInfo) error { return nil }
228func (o TrailerCallOption) after(c *callInfo) {
229 if c.stream != nil {
230 *o.TrailerAddr = c.stream.Trailer()
231 }
232}
233
234// Peer returns a CallOption that retrieves peer information for a unary RPC.
235// The peer field will be populated *after* the RPC completes.
236func Peer(p *peer.Peer) CallOption {
237 return PeerCallOption{PeerAddr: p}
180} 238}
181 239
182// Peer returns a CallOption that retrieves peer information for a 240// PeerCallOption is a CallOption for collecting the identity of the remote
183// unary RPC. 241// peer. The peer field will be populated *after* the RPC completes.
184func Peer(peer *peer.Peer) CallOption { 242// This is an EXPERIMENTAL API.
185 return afterCall(func(c *callInfo) { 243type PeerCallOption struct {
186 if c.peer != nil { 244 PeerAddr *peer.Peer
187 *peer = *c.peer 245}
246
247func (o PeerCallOption) before(c *callInfo) error { return nil }
248func (o PeerCallOption) after(c *callInfo) {
249 if c.stream != nil {
250 if x, ok := peer.FromContext(c.stream.Context()); ok {
251 *o.PeerAddr = *x
188 } 252 }
189 }) 253 }
190} 254}
191 255
192// FailFast configures the action to take when an RPC is attempted on broken 256// WaitForReady configures the action to take when an RPC is attempted on broken
193// connections or unreachable servers. If failfast is true, the RPC will fail 257// connections or unreachable servers. If waitForReady is false, the RPC will fail
194// immediately. Otherwise, the RPC client will block the call until a 258// immediately. Otherwise, the RPC client will block the call until a
195// connection is available (or the call is canceled or times out) and will retry 259// connection is available (or the call is canceled or times out) and will
196// the call if it fails due to a transient error. Please refer to 260// retry the call if it fails due to a transient error. gRPC will not retry if
261// data was written to the wire unless the server indicates it did not process
262// the data. Please refer to
197// https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md. 263// https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md.
198// Note: failFast is default to true. 264//
265// By default, RPCs don't "wait for ready".
266func WaitForReady(waitForReady bool) CallOption {
267 return FailFastCallOption{FailFast: !waitForReady}
268}
269
270// FailFast is the opposite of WaitForReady.
271//
272// Deprecated: use WaitForReady.
199func FailFast(failFast bool) CallOption { 273func FailFast(failFast bool) CallOption {
200 return beforeCall(func(c *callInfo) error { 274 return FailFastCallOption{FailFast: failFast}
201 c.failFast = failFast
202 return nil
203 })
204} 275}
205 276
277// FailFastCallOption is a CallOption for indicating whether an RPC should fail
278// fast or not.
279// This is an EXPERIMENTAL API.
280type FailFastCallOption struct {
281 FailFast bool
282}
283
284func (o FailFastCallOption) before(c *callInfo) error {
285 c.failFast = o.FailFast
286 return nil
287}
288func (o FailFastCallOption) after(c *callInfo) {}
289
206// MaxCallRecvMsgSize returns a CallOption which sets the maximum message size the client can receive. 290// MaxCallRecvMsgSize returns a CallOption which sets the maximum message size the client can receive.
207func MaxCallRecvMsgSize(s int) CallOption { 291func MaxCallRecvMsgSize(s int) CallOption {
208 return beforeCall(func(o *callInfo) error { 292 return MaxRecvMsgSizeCallOption{MaxRecvMsgSize: s}
209 o.maxReceiveMessageSize = &s 293}
210 return nil 294
211 }) 295// MaxRecvMsgSizeCallOption is a CallOption that indicates the maximum message
296// size the client can receive.
297// This is an EXPERIMENTAL API.
298type MaxRecvMsgSizeCallOption struct {
299 MaxRecvMsgSize int
212} 300}
213 301
302func (o MaxRecvMsgSizeCallOption) before(c *callInfo) error {
303 c.maxReceiveMessageSize = &o.MaxRecvMsgSize
304 return nil
305}
306func (o MaxRecvMsgSizeCallOption) after(c *callInfo) {}
307
214// MaxCallSendMsgSize returns a CallOption which sets the maximum message size the client can send. 308// MaxCallSendMsgSize returns a CallOption which sets the maximum message size the client can send.
215func MaxCallSendMsgSize(s int) CallOption { 309func MaxCallSendMsgSize(s int) CallOption {
216 return beforeCall(func(o *callInfo) error { 310 return MaxSendMsgSizeCallOption{MaxSendMsgSize: s}
217 o.maxSendMessageSize = &s 311}
218 return nil 312
219 }) 313// MaxSendMsgSizeCallOption is a CallOption that indicates the maximum message
314// size the client can send.
315// This is an EXPERIMENTAL API.
316type MaxSendMsgSizeCallOption struct {
317 MaxSendMsgSize int
220} 318}
221 319
320func (o MaxSendMsgSizeCallOption) before(c *callInfo) error {
321 c.maxSendMessageSize = &o.MaxSendMsgSize
322 return nil
323}
324func (o MaxSendMsgSizeCallOption) after(c *callInfo) {}
325
222// PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials 326// PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials
223// for a call. 327// for a call.
224func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption { 328func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption {
225 return beforeCall(func(c *callInfo) error { 329 return PerRPCCredsCallOption{Creds: creds}
226 c.creds = creds 330}
227 return nil 331
228 }) 332// PerRPCCredsCallOption is a CallOption that indicates the per-RPC
333// credentials to use for the call.
334// This is an EXPERIMENTAL API.
335type PerRPCCredsCallOption struct {
336 Creds credentials.PerRPCCredentials
337}
338
339func (o PerRPCCredsCallOption) before(c *callInfo) error {
340 c.creds = o.Creds
341 return nil
342}
343func (o PerRPCCredsCallOption) after(c *callInfo) {}
344
345// UseCompressor returns a CallOption which sets the compressor used when
346// sending the request. If WithCompressor is also set, UseCompressor has
347// higher priority.
348//
349// This API is EXPERIMENTAL.
350func UseCompressor(name string) CallOption {
351 return CompressorCallOption{CompressorType: name}
352}
353
354// CompressorCallOption is a CallOption that indicates the compressor to use.
355// This is an EXPERIMENTAL API.
356type CompressorCallOption struct {
357 CompressorType string
358}
359
360func (o CompressorCallOption) before(c *callInfo) error {
361 c.compressorType = o.CompressorType
362 return nil
363}
364func (o CompressorCallOption) after(c *callInfo) {}
365
366// CallContentSubtype returns a CallOption that will set the content-subtype
367// for a call. For example, if content-subtype is "json", the Content-Type over
368// the wire will be "application/grpc+json". The content-subtype is converted
369// to lowercase before being included in Content-Type. See Content-Type on
370// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
371// more details.
372//
373// If CallCustomCodec is not also used, the content-subtype will be used to
374// look up the Codec to use in the registry controlled by RegisterCodec. See
375// the documentation on RegisterCodec for details on registration. The lookup
376// of content-subtype is case-insensitive. If no such Codec is found, the call
377// will result in an error with code codes.Internal.
378//
379// If CallCustomCodec is also used, that Codec will be used for all request and
380// response messages, with the content-subtype set to the given contentSubtype
381// here for requests.
382func CallContentSubtype(contentSubtype string) CallOption {
383 return ContentSubtypeCallOption{ContentSubtype: strings.ToLower(contentSubtype)}
384}
385
386// ContentSubtypeCallOption is a CallOption that indicates the content-subtype
387// used for marshaling messages.
388// This is an EXPERIMENTAL API.
389type ContentSubtypeCallOption struct {
390 ContentSubtype string
391}
392
393func (o ContentSubtypeCallOption) before(c *callInfo) error {
394 c.contentSubtype = o.ContentSubtype
395 return nil
396}
397func (o ContentSubtypeCallOption) after(c *callInfo) {}
398
399// CallCustomCodec returns a CallOption that will set the given Codec to be
400// used for all request and response messages for a call. The result of calling
401// String() will be used as the content-subtype in a case-insensitive manner.
402//
403// See Content-Type on
404// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
405// more details. Also see the documentation on RegisterCodec and
406// CallContentSubtype for more details on the interaction between Codec and
407// content-subtype.
408//
409// This function is provided for advanced users; prefer to use only
410// CallContentSubtype to select a registered codec instead.
411func CallCustomCodec(codec Codec) CallOption {
412 return CustomCodecCallOption{Codec: codec}
413}
414
415// CustomCodecCallOption is a CallOption that indicates the codec used for
416// marshaling messages.
417// This is an EXPERIMENTAL API.
418type CustomCodecCallOption struct {
419 Codec Codec
420}
421
422func (o CustomCodecCallOption) before(c *callInfo) error {
423 c.codec = o.Codec
424 return nil
425}
426func (o CustomCodecCallOption) after(c *callInfo) {}
427
428// MaxRetryRPCBufferSize returns a CallOption that limits the amount of memory
429// used for buffering this RPC's requests for retry purposes.
430//
431// This API is EXPERIMENTAL.
432func MaxRetryRPCBufferSize(bytes int) CallOption {
433 return MaxRetryRPCBufferSizeCallOption{bytes}
434}
435
436// MaxRetryRPCBufferSizeCallOption is a CallOption indicating the amount of
437// memory to be used for caching this RPC for retry purposes.
438// This is an EXPERIMENTAL API.
439type MaxRetryRPCBufferSizeCallOption struct {
440 MaxRetryRPCBufferSize int
441}
442
443func (o MaxRetryRPCBufferSizeCallOption) before(c *callInfo) error {
444 c.maxRetryRPCBufferSize = o.MaxRetryRPCBufferSize
445 return nil
229} 446}
447func (o MaxRetryRPCBufferSizeCallOption) after(c *callInfo) {}
230 448
231// The format of the payload: compressed or not? 449// The format of the payload: compressed or not?
232type payloadFormat uint8 450type payloadFormat uint8
233 451
234const ( 452const (
235 compressionNone payloadFormat = iota // no compression 453 compressionNone payloadFormat = 0 // no compression
236 compressionMade 454 compressionMade payloadFormat = 1 // compressed
237) 455)
238 456
239// parser reads complete gRPC messages from the underlying reader. 457// parser reads complete gRPC messages from the underlying reader.
@@ -243,8 +461,8 @@ type parser struct {
243 // error types. 461 // error types.
244 r io.Reader 462 r io.Reader
245 463
246 // The header of a gRPC message. Find more detail 464 // The header of a gRPC message. Find more detail at
247 // at https://grpc.io/docs/guides/wire.html. 465 // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
248 header [5]byte 466 header [5]byte
249} 467}
250 468
@@ -257,7 +475,7 @@ type parser struct {
257// * io.EOF, when no messages remain 475// * io.EOF, when no messages remain
258// * io.ErrUnexpectedEOF 476// * io.ErrUnexpectedEOF
259// * of type transport.ConnectionError 477// * of type transport.ConnectionError
260// * of type transport.StreamError 478// * an error from the status package
261// No other error values or types must be returned, which also means 479// No other error values or types must be returned, which also means
262// that the underlying io.Reader must not return an incompatible 480// that the underlying io.Reader must not return an incompatible
263// error. 481// error.
@@ -272,8 +490,11 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
272 if length == 0 { 490 if length == 0 {
273 return pf, nil, nil 491 return pf, nil, nil
274 } 492 }
275 if length > uint32(maxReceiveMessageSize) { 493 if int64(length) > int64(maxInt) {
276 return 0, nil, Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize) 494 return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt)
495 }
496 if int(length) > maxReceiveMessageSize {
497 return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
277 } 498 }
278 // TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead 499 // TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
279 // of making it for each message: 500 // of making it for each message:
@@ -287,120 +508,173 @@ func (p *parser) recvMsg(maxReceiveMessageSize int) (pf payloadFormat, msg []byt
287 return pf, msg, nil 508 return pf, msg, nil
288} 509}
289 510
290// encode serializes msg and prepends the message header. If msg is nil, it 511// encode serializes msg and returns a buffer containing the message, or an
291// generates the message header of 0 message length. 512// error if it is too large to be transmitted by grpc. If msg is nil, it
292func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayload *stats.OutPayload) ([]byte, error) { 513// generates an empty message.
293 var ( 514func encode(c baseCodec, msg interface{}) ([]byte, error) {
294 b []byte 515 if msg == nil { // NOTE: typed nils will not be caught by this check
295 length uint 516 return nil, nil
296 ) 517 }
297 if msg != nil { 518 b, err := c.Marshal(msg)
298 var err error 519 if err != nil {
299 // TODO(zhaoq): optimize to reduce memory alloc and copying. 520 return nil, status.Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error())
300 b, err = c.Marshal(msg) 521 }
522 if uint(len(b)) > math.MaxUint32 {
523 return nil, status.Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", len(b))
524 }
525 return b, nil
526}
527
528// compress returns the input bytes compressed by compressor or cp. If both
529// compressors are nil, returns nil.
530//
531// TODO(dfawley): eliminate cp parameter by wrapping Compressor in an encoding.Compressor.
532func compress(in []byte, cp Compressor, compressor encoding.Compressor) ([]byte, error) {
533 if compressor == nil && cp == nil {
534 return nil, nil
535 }
536 wrapErr := func(err error) error {
537 return status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
538 }
539 cbuf := &bytes.Buffer{}
540 if compressor != nil {
541 z, err := compressor.Compress(cbuf)
301 if err != nil { 542 if err != nil {
302 return nil, Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error()) 543 return nil, wrapErr(err)
303 } 544 }
304 if outPayload != nil { 545 if _, err := z.Write(in); err != nil {
305 outPayload.Payload = msg 546 return nil, wrapErr(err)
306 // TODO truncate large payload.
307 outPayload.Data = b
308 outPayload.Length = len(b)
309 } 547 }
310 if cp != nil { 548 if err := z.Close(); err != nil {
311 if err := cp.Do(cbuf, b); err != nil { 549 return nil, wrapErr(err)
312 return nil, Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error()) 550 }
313 } 551 } else {
314 b = cbuf.Bytes() 552 if err := cp.Do(cbuf, in); err != nil {
553 return nil, wrapErr(err)
315 } 554 }
316 length = uint(len(b))
317 }
318 if length > math.MaxUint32 {
319 return nil, Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", length)
320 } 555 }
556 return cbuf.Bytes(), nil
557}
321 558
322 const ( 559const (
323 payloadLen = 1 560 payloadLen = 1
324 sizeLen = 4 561 sizeLen = 4
325 ) 562 headerLen = payloadLen + sizeLen
326 563)
327 var buf = make([]byte, payloadLen+sizeLen+len(b))
328 564
329 // Write payload format 565// msgHeader returns a 5-byte header for the message being transmitted and the
330 if cp == nil { 566// payload, which is compData if non-nil or data otherwise.
331 buf[0] = byte(compressionNone) 567func msgHeader(data, compData []byte) (hdr []byte, payload []byte) {
568 hdr = make([]byte, headerLen)
569 if compData != nil {
570 hdr[0] = byte(compressionMade)
571 data = compData
332 } else { 572 } else {
333 buf[0] = byte(compressionMade) 573 hdr[0] = byte(compressionNone)
334 } 574 }
335 // Write length of b into buf
336 binary.BigEndian.PutUint32(buf[1:], uint32(length))
337 // Copy encoded msg to buf
338 copy(buf[5:], b)
339 575
340 if outPayload != nil { 576 // Write length of payload into buf
341 outPayload.WireLength = len(buf) 577 binary.BigEndian.PutUint32(hdr[payloadLen:], uint32(len(data)))
342 } 578 return hdr, data
579}
343 580
344 return buf, nil 581func outPayload(client bool, msg interface{}, data, payload []byte, t time.Time) *stats.OutPayload {
582 return &stats.OutPayload{
583 Client: client,
584 Payload: msg,
585 Data: data,
586 Length: len(data),
587 WireLength: len(payload) + headerLen,
588 SentTime: t,
589 }
345} 590}
346 591
347func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) error { 592func checkRecvPayload(pf payloadFormat, recvCompress string, haveCompressor bool) *status.Status {
348 switch pf { 593 switch pf {
349 case compressionNone: 594 case compressionNone:
350 case compressionMade: 595 case compressionMade:
351 if dc == nil || recvCompress != dc.Type() { 596 if recvCompress == "" || recvCompress == encoding.Identity {
352 return Errorf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress) 597 return status.New(codes.Internal, "grpc: compressed flag set with identity or empty encoding")
598 }
599 if !haveCompressor {
600 return status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
353 } 601 }
354 default: 602 default:
355 return Errorf(codes.Internal, "grpc: received unexpected payload format %d", pf) 603 return status.Newf(codes.Internal, "grpc: received unexpected payload format %d", pf)
356 } 604 }
357 return nil 605 return nil
358} 606}
359 607
360func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, inPayload *stats.InPayload) error { 608type payloadInfo struct {
609 wireLength int // The compressed length got from wire.
610 uncompressedBytes []byte
611}
612
613func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
361 pf, d, err := p.recvMsg(maxReceiveMessageSize) 614 pf, d, err := p.recvMsg(maxReceiveMessageSize)
362 if err != nil { 615 if err != nil {
363 return err 616 return nil, err
364 } 617 }
365 if inPayload != nil { 618 if payInfo != nil {
366 inPayload.WireLength = len(d) 619 payInfo.wireLength = len(d)
367 } 620 }
368 if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil { 621
369 return err 622 if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil {
623 return nil, st.Err()
370 } 624 }
625
371 if pf == compressionMade { 626 if pf == compressionMade {
372 d, err = dc.Do(bytes.NewReader(d)) 627 // To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
373 if err != nil { 628 // use this decompressor as the default.
374 return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) 629 if dc != nil {
630 d, err = dc.Do(bytes.NewReader(d))
631 if err != nil {
632 return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
633 }
634 } else {
635 dcReader, err := compressor.Decompress(bytes.NewReader(d))
636 if err != nil {
637 return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
638 }
639 d, err = ioutil.ReadAll(dcReader)
640 if err != nil {
641 return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
642 }
375 } 643 }
376 } 644 }
377 if len(d) > maxReceiveMessageSize { 645 if len(d) > maxReceiveMessageSize {
378 // TODO: Revisit the error code. Currently keep it consistent with java 646 // TODO: Revisit the error code. Currently keep it consistent with java
379 // implementation. 647 // implementation.
380 return Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize) 648 return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize)
649 }
650 return d, nil
651}
652
653// For the two compressor parameters, both should not be set, but if they are,
654// dc takes precedence over compressor.
655// TODO(dfawley): wrap the old compressor/decompressor using the new API?
656func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
657 d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
658 if err != nil {
659 return err
381 } 660 }
382 if err := c.Unmarshal(d, m); err != nil { 661 if err := c.Unmarshal(d, m); err != nil {
383 return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err) 662 return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
384 } 663 }
385 if inPayload != nil { 664 if payInfo != nil {
386 inPayload.RecvTime = time.Now() 665 payInfo.uncompressedBytes = d
387 inPayload.Payload = m
388 // TODO truncate large payload.
389 inPayload.Data = d
390 inPayload.Length = len(d)
391 } 666 }
392 return nil 667 return nil
393} 668}
394 669
395type rpcInfo struct { 670type rpcInfo struct {
396 bytesSent bool 671 failfast bool
397 bytesReceived bool
398} 672}
399 673
400type rpcInfoContextKey struct{} 674type rpcInfoContextKey struct{}
401 675
402func newContextWithRPCInfo(ctx context.Context) context.Context { 676func newContextWithRPCInfo(ctx context.Context, failfast bool) context.Context {
403 return context.WithValue(ctx, rpcInfoContextKey{}, &rpcInfo{}) 677 return context.WithValue(ctx, rpcInfoContextKey{}, &rpcInfo{failfast: failfast})
404} 678}
405 679
406func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) { 680func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) {
@@ -408,117 +682,135 @@ func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) {
408 return 682 return
409} 683}
410 684
411func updateRPCInfoInContext(ctx context.Context, s rpcInfo) {
412 if ss, ok := rpcInfoFromContext(ctx); ok {
413 *ss = s
414 }
415 return
416}
417
418// Code returns the error code for err if it was produced by the rpc system. 685// Code returns the error code for err if it was produced by the rpc system.
419// Otherwise, it returns codes.Unknown. 686// Otherwise, it returns codes.Unknown.
420// 687//
421// Deprecated; use status.FromError and Code method instead. 688// Deprecated: use status.Code instead.
422func Code(err error) codes.Code { 689func Code(err error) codes.Code {
423 if s, ok := status.FromError(err); ok { 690 return status.Code(err)
424 return s.Code()
425 }
426 return codes.Unknown
427} 691}
428 692
429// ErrorDesc returns the error description of err if it was produced by the rpc system. 693// ErrorDesc returns the error description of err if it was produced by the rpc system.
430// Otherwise, it returns err.Error() or empty string when err is nil. 694// Otherwise, it returns err.Error() or empty string when err is nil.
431// 695//
432// Deprecated; use status.FromError and Message method instead. 696// Deprecated: use status.Convert and Message method instead.
433func ErrorDesc(err error) string { 697func ErrorDesc(err error) string {
434 if s, ok := status.FromError(err); ok { 698 return status.Convert(err).Message()
435 return s.Message()
436 }
437 return err.Error()
438} 699}
439 700
440// Errorf returns an error containing an error code and a description; 701// Errorf returns an error containing an error code and a description;
441// Errorf returns nil if c is OK. 702// Errorf returns nil if c is OK.
442// 703//
443// Deprecated; use status.Errorf instead. 704// Deprecated: use status.Errorf instead.
444func Errorf(c codes.Code, format string, a ...interface{}) error { 705func Errorf(c codes.Code, format string, a ...interface{}) error {
445 return status.Errorf(c, format, a...) 706 return status.Errorf(c, format, a...)
446} 707}
447 708
448// MethodConfig defines the configuration recommended by the service providers for a 709// toRPCErr converts an error into an error from the status package.
449// particular method. 710func toRPCErr(err error) error {
450// This is EXPERIMENTAL and subject to change. 711 if err == nil || err == io.EOF {
451type MethodConfig struct { 712 return err
452 // WaitForReady indicates whether RPCs sent to this method should wait until 713 }
453 // the connection is ready by default (!failfast). The value specified via the 714 if err == io.ErrUnexpectedEOF {
454 // gRPC client API will override the value set here. 715 return status.Error(codes.Internal, err.Error())
455 WaitForReady *bool 716 }
456 // Timeout is the default timeout for RPCs sent to this method. The actual 717 if _, ok := status.FromError(err); ok {
457 // deadline used will be the minimum of the value specified here and the value 718 return err
458 // set by the application via the gRPC client API. If either one is not set, 719 }
459 // then the other will be used. If neither is set, then the RPC has no deadline. 720 switch e := err.(type) {
460 Timeout *time.Duration 721 case transport.ConnectionError:
461 // MaxReqSize is the maximum allowed payload size for an individual request in a 722 return status.Error(codes.Unavailable, e.Desc)
462 // stream (client->server) in bytes. The size which is measured is the serialized 723 default:
463 // payload after per-message compression (but before stream compression) in bytes. 724 switch err {
464 // The actual value used is the minumum of the value specified here and the value set 725 case context.DeadlineExceeded:
465 // by the application via the gRPC client API. If either one is not set, then the other 726 return status.Error(codes.DeadlineExceeded, err.Error())
466 // will be used. If neither is set, then the built-in default is used. 727 case context.Canceled:
467 MaxReqSize *int 728 return status.Error(codes.Canceled, err.Error())
468 // MaxRespSize is the maximum allowed payload size for an individual response in a 729 }
469 // stream (server->client) in bytes. 730 }
470 MaxRespSize *int 731 return status.Error(codes.Unknown, err.Error())
471} 732}
472
473// ServiceConfig is provided by the service provider and contains parameters for how
474// clients that connect to the service should behave.
475// This is EXPERIMENTAL and subject to change.
476type ServiceConfig struct {
477 // LB is the load balancer the service providers recommends. The balancer specified
478 // via grpc.WithBalancer will override this.
479 LB Balancer
480 // Methods contains a map for the methods in this service.
481 // If there is an exact match for a method (i.e. /service/method) in the map, use the corresponding MethodConfig.
482 // If there's no exact match, look for the default config for the service (/service/) and use the corresponding MethodConfig if it exists.
483 // Otherwise, the method has no MethodConfig to use.
484 Methods map[string]MethodConfig
485}
486
487func min(a, b *int) *int {
488 if *a < *b {
489 return a
490 }
491 return b
492}
493
494func getMaxSize(mcMax, doptMax *int, defaultVal int) *int {
495 if mcMax == nil && doptMax == nil {
496 return &defaultVal
497 }
498 if mcMax != nil && doptMax != nil {
499 return min(mcMax, doptMax)
500 }
501 if mcMax != nil {
502 return mcMax
503 }
504 return doptMax
505}
506
507// SupportPackageIsVersion3 is referenced from generated protocol buffer files.
508// The latest support package version is 4.
509// SupportPackageIsVersion3 is kept for compability. It will be removed in the
510// next support package version update.
511const SupportPackageIsVersion3 = true
512
513// SupportPackageIsVersion4 is referenced from generated protocol buffer files
514// to assert that that code is compatible with this version of the grpc package.
515//
516// This constant may be renamed in the future if a change in the generated code
517// requires a synchronised update of grpc-go and protoc-gen-go. This constant
518// should not be referenced from any other code.
519const SupportPackageIsVersion4 = true
520 733
521// Version is the current grpc version. 734// setCallInfoCodec should only be called after CallOptions have been applied.
522const Version = "1.6.0-dev" 735func setCallInfoCodec(c *callInfo) error {
736 if c.codec != nil {
737 // codec was already set by a CallOption; use it.
738 return nil
739 }
740
741 if c.contentSubtype == "" {
742 // No codec specified in CallOptions; use proto by default.
743 c.codec = encoding.GetCodec(proto.Name)
744 return nil
745 }
746
747 // c.contentSubtype is already lowercased in CallContentSubtype
748 c.codec = encoding.GetCodec(c.contentSubtype)
749 if c.codec == nil {
750 return status.Errorf(codes.Internal, "no codec registered for content-subtype %s", c.contentSubtype)
751 }
752 return nil
753}
754
755// parseDialTarget returns the network and address to pass to dialer
756func parseDialTarget(target string) (net string, addr string) {
757 net = "tcp"
758
759 m1 := strings.Index(target, ":")
760 m2 := strings.Index(target, ":/")
761
762 // handle unix:addr which will fail with url.Parse
763 if m1 >= 0 && m2 < 0 {
764 if n := target[0:m1]; n == "unix" {
765 net = n
766 addr = target[m1+1:]
767 return net, addr
768 }
769 }
770 if m2 >= 0 {
771 t, err := url.Parse(target)
772 if err != nil {
773 return net, target
774 }
775 scheme := t.Scheme
776 addr = t.Path
777 if scheme == "unix" {
778 net = scheme
779 if addr == "" {
780 addr = t.Host
781 }
782 return net, addr
783 }
784 }
785
786 return net, target
787}
788
789// channelzData is used to store channelz related data for ClientConn, addrConn and Server.
790// These fields cannot be embedded in the original structs (e.g. ClientConn), since to do atomic
791// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
792// Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
793type channelzData struct {
794 callsStarted int64
795 callsFailed int64
796 callsSucceeded int64
797 // lastCallStartedTime stores the timestamp that last call starts. It is of int64 type instead of
798 // time.Time since it's more costly to atomically update time.Time variable than int64 variable.
799 lastCallStartedTime int64
800}
801
802// The SupportPackageIsVersion variables are referenced from generated protocol
803// buffer files to ensure compatibility with the gRPC version used. The latest
804// support package version is 5.
805//
806// Older versions are kept for compatibility. They may be removed if
807// compatibility cannot be maintained.
808//
809// These constants should not be referenced from any other code.
810const (
811 SupportPackageIsVersion3 = true
812 SupportPackageIsVersion4 = true
813 SupportPackageIsVersion5 = true
814)
523 815
524const grpcUA = "grpc-go/" + Version 816const grpcUA = "grpc-go/" + Version