import (
"bytes"
"compress/gzip"
+ "context"
"encoding/binary"
+ "fmt"
"io"
"io/ioutil"
"math"
+ "net/url"
+ "strings"
"sync"
"time"
- "golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/encoding"
+ "google.golang.org/grpc/encoding/proto"
+ "google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
- "google.golang.org/grpc/transport"
)
// Compressor defines the interface gRPC uses to compress a message.
+//
+// Deprecated: use package encoding.
type Compressor interface {
// Do compresses p into w.
Do(w io.Writer, p []byte) error
}
// NewGZIPCompressor creates a Compressor based on GZIP.
+//
+// Deprecated: use package encoding/gzip.
func NewGZIPCompressor() Compressor {
+ c, _ := NewGZIPCompressorWithLevel(gzip.DefaultCompression)
+ return c
+}
+
+// NewGZIPCompressorWithLevel is like NewGZIPCompressor but specifies the gzip compression level instead
+// of assuming DefaultCompression.
+//
+// The error returned will be nil if the level is valid.
+//
+// Deprecated: use package encoding/gzip.
+func NewGZIPCompressorWithLevel(level int) (Compressor, error) {
+ if level < gzip.DefaultCompression || level > gzip.BestCompression {
+ return nil, fmt.Errorf("grpc: invalid compression level: %d", level)
+ }
return &gzipCompressor{
pool: sync.Pool{
New: func() interface{} {
- return gzip.NewWriter(ioutil.Discard)
+ w, err := gzip.NewWriterLevel(ioutil.Discard, level)
+ if err != nil {
+ panic(err)
+ }
+ return w
},
},
- }
+ }, nil
}
func (c *gzipCompressor) Do(w io.Writer, p []byte) error {
z := c.pool.Get().(*gzip.Writer)
+ defer c.pool.Put(z)
z.Reset(w)
if _, err := z.Write(p); err != nil {
return err
}
// Decompressor defines the interface gRPC uses to decompress a message.
+//
+// Deprecated: use package encoding.
type Decompressor interface {
// Do reads the data from r and uncompress them.
Do(r io.Reader) ([]byte, error)
}
// NewGZIPDecompressor creates a Decompressor based on GZIP.
+//
+// Deprecated: use package encoding/gzip.
func NewGZIPDecompressor() Decompressor {
return &gzipDecompressor{}
}
// callInfo contains all related configuration and information about an RPC.
type callInfo struct {
+ compressorType string
failFast bool
- headerMD metadata.MD
- trailerMD metadata.MD
- peer *peer.Peer
- traceInfo traceInfo // in trace.go
+ stream ClientStream
maxReceiveMessageSize *int
maxSendMessageSize *int
creds credentials.PerRPCCredentials
+ contentSubtype string
+ codec baseCodec
+ maxRetryRPCBufferSize int
}
-var defaultCallInfo = callInfo{failFast: true}
+func defaultCallInfo() *callInfo {
+ return &callInfo{
+ failFast: true,
+ maxRetryRPCBufferSize: 256 * 1024, // 256KB
+ }
+}
// CallOption configures a Call before it starts or extracts information from
// a Call after it completes.
func (EmptyCallOption) before(*callInfo) error { return nil }
func (EmptyCallOption) after(*callInfo) {}
-type beforeCall func(c *callInfo) error
-
-func (o beforeCall) before(c *callInfo) error { return o(c) }
-func (o beforeCall) after(c *callInfo) {}
-
-type afterCall func(c *callInfo)
-
-func (o afterCall) before(c *callInfo) error { return nil }
-func (o afterCall) after(c *callInfo) { o(c) }
-
// Header returns a CallOptions that retrieves the header metadata
// for a unary RPC.
func Header(md *metadata.MD) CallOption {
- return afterCall(func(c *callInfo) {
- *md = c.headerMD
- })
+ return HeaderCallOption{HeaderAddr: md}
+}
+
+// HeaderCallOption is a CallOption for collecting response header metadata.
+// The metadata field will be populated *after* the RPC completes.
+// This is an EXPERIMENTAL API.
+type HeaderCallOption struct {
+ HeaderAddr *metadata.MD
+}
+
+func (o HeaderCallOption) before(c *callInfo) error { return nil }
+func (o HeaderCallOption) after(c *callInfo) {
+ if c.stream != nil {
+ *o.HeaderAddr, _ = c.stream.Header()
+ }
}
// Trailer returns a CallOptions that retrieves the trailer metadata
// for a unary RPC.
func Trailer(md *metadata.MD) CallOption {
- return afterCall(func(c *callInfo) {
- *md = c.trailerMD
- })
+ return TrailerCallOption{TrailerAddr: md}
+}
+
+// TrailerCallOption is a CallOption for collecting response trailer metadata.
+// The metadata field will be populated *after* the RPC completes.
+// This is an EXPERIMENTAL API.
+type TrailerCallOption struct {
+ TrailerAddr *metadata.MD
+}
+
+func (o TrailerCallOption) before(c *callInfo) error { return nil }
+func (o TrailerCallOption) after(c *callInfo) {
+ if c.stream != nil {
+ *o.TrailerAddr = c.stream.Trailer()
+ }
+}
+
+// Peer returns a CallOption that retrieves peer information for a unary RPC.
+// The peer field will be populated *after* the RPC completes.
+func Peer(p *peer.Peer) CallOption {
+ return PeerCallOption{PeerAddr: p}
}
-// Peer returns a CallOption that retrieves peer information for a
-// unary RPC.
-func Peer(peer *peer.Peer) CallOption {
- return afterCall(func(c *callInfo) {
- if c.peer != nil {
- *peer = *c.peer
+// PeerCallOption is a CallOption for collecting the identity of the remote
+// peer. The peer field will be populated *after* the RPC completes.
+// This is an EXPERIMENTAL API.
+type PeerCallOption struct {
+ PeerAddr *peer.Peer
+}
+
+func (o PeerCallOption) before(c *callInfo) error { return nil }
+func (o PeerCallOption) after(c *callInfo) {
+ if c.stream != nil {
+ if x, ok := peer.FromContext(c.stream.Context()); ok {
+ *o.PeerAddr = *x
}
- })
+ }
}
-// FailFast configures the action to take when an RPC is attempted on broken
-// connections or unreachable servers. If failfast is true, the RPC will fail
+// WaitForReady configures the action to take when an RPC is attempted on broken
+// connections or unreachable servers. If waitForReady is false, the RPC will fail
// immediately. Otherwise, the RPC client will block the call until a
-// connection is available (or the call is canceled or times out) and will retry
-// the call if it fails due to a transient error. Please refer to
+// connection is available (or the call is canceled or times out) and will
+// retry the call if it fails due to a transient error. gRPC will not retry if
+// data was written to the wire unless the server indicates it did not process
+// the data. Please refer to
// https://github.com/grpc/grpc/blob/master/doc/wait-for-ready.md.
-// Note: failFast is default to true.
+//
+// By default, RPCs don't "wait for ready".
+func WaitForReady(waitForReady bool) CallOption {
+ return FailFastCallOption{FailFast: !waitForReady}
+}
+
+// FailFast is the opposite of WaitForReady.
+//
+// Deprecated: use WaitForReady.
func FailFast(failFast bool) CallOption {
- return beforeCall(func(c *callInfo) error {
- c.failFast = failFast
- return nil
- })
+ return FailFastCallOption{FailFast: failFast}
}
+// FailFastCallOption is a CallOption for indicating whether an RPC should fail
+// fast or not.
+// This is an EXPERIMENTAL API.
+type FailFastCallOption struct {
+ FailFast bool
+}
+
+func (o FailFastCallOption) before(c *callInfo) error {
+ c.failFast = o.FailFast
+ return nil
+}
+func (o FailFastCallOption) after(c *callInfo) {}
+
// MaxCallRecvMsgSize returns a CallOption which sets the maximum message size the client can receive.
func MaxCallRecvMsgSize(s int) CallOption {
- return beforeCall(func(o *callInfo) error {
- o.maxReceiveMessageSize = &s
- return nil
- })
+ return MaxRecvMsgSizeCallOption{MaxRecvMsgSize: s}
+}
+
+// MaxRecvMsgSizeCallOption is a CallOption that indicates the maximum message
+// size the client can receive.
+// This is an EXPERIMENTAL API.
+type MaxRecvMsgSizeCallOption struct {
+ MaxRecvMsgSize int
}
+func (o MaxRecvMsgSizeCallOption) before(c *callInfo) error {
+ c.maxReceiveMessageSize = &o.MaxRecvMsgSize
+ return nil
+}
+func (o MaxRecvMsgSizeCallOption) after(c *callInfo) {}
+
// MaxCallSendMsgSize returns a CallOption which sets the maximum message size the client can send.
func MaxCallSendMsgSize(s int) CallOption {
- return beforeCall(func(o *callInfo) error {
- o.maxSendMessageSize = &s
- return nil
- })
+ return MaxSendMsgSizeCallOption{MaxSendMsgSize: s}
+}
+
+// MaxSendMsgSizeCallOption is a CallOption that indicates the maximum message
+// size the client can send.
+// This is an EXPERIMENTAL API.
+type MaxSendMsgSizeCallOption struct {
+ MaxSendMsgSize int
}
+func (o MaxSendMsgSizeCallOption) before(c *callInfo) error {
+ c.maxSendMessageSize = &o.MaxSendMsgSize
+ return nil
+}
+func (o MaxSendMsgSizeCallOption) after(c *callInfo) {}
+
// PerRPCCredentials returns a CallOption that sets credentials.PerRPCCredentials
// for a call.
func PerRPCCredentials(creds credentials.PerRPCCredentials) CallOption {
- return beforeCall(func(c *callInfo) error {
- c.creds = creds
- return nil
- })
+ return PerRPCCredsCallOption{Creds: creds}
+}
+
+// PerRPCCredsCallOption is a CallOption that indicates the per-RPC
+// credentials to use for the call.
+// This is an EXPERIMENTAL API.
+type PerRPCCredsCallOption struct {
+ Creds credentials.PerRPCCredentials
+}
+
+func (o PerRPCCredsCallOption) before(c *callInfo) error {
+ c.creds = o.Creds
+ return nil
+}
+func (o PerRPCCredsCallOption) after(c *callInfo) {}
+
+// UseCompressor returns a CallOption which sets the compressor used when
+// sending the request. If WithCompressor is also set, UseCompressor has
+// higher priority.
+//
+// This API is EXPERIMENTAL.
+func UseCompressor(name string) CallOption {
+ return CompressorCallOption{CompressorType: name}
+}
+
+// CompressorCallOption is a CallOption that indicates the compressor to use.
+// This is an EXPERIMENTAL API.
+type CompressorCallOption struct {
+ CompressorType string
+}
+
+func (o CompressorCallOption) before(c *callInfo) error {
+ c.compressorType = o.CompressorType
+ return nil
+}
+func (o CompressorCallOption) after(c *callInfo) {}
+
+// CallContentSubtype returns a CallOption that will set the content-subtype
+// for a call. For example, if content-subtype is "json", the Content-Type over
+// the wire will be "application/grpc+json". The content-subtype is converted
+// to lowercase before being included in Content-Type. See Content-Type on
+// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
+// more details.
+//
+// If CallCustomCodec is not also used, the content-subtype will be used to
+// look up the Codec to use in the registry controlled by RegisterCodec. See
+// the documentation on RegisterCodec for details on registration. The lookup
+// of content-subtype is case-insensitive. If no such Codec is found, the call
+// will result in an error with code codes.Internal.
+//
+// If CallCustomCodec is also used, that Codec will be used for all request and
+// response messages, with the content-subtype set to the given contentSubtype
+// here for requests.
+func CallContentSubtype(contentSubtype string) CallOption {
+ return ContentSubtypeCallOption{ContentSubtype: strings.ToLower(contentSubtype)}
+}
+
+// ContentSubtypeCallOption is a CallOption that indicates the content-subtype
+// used for marshaling messages.
+// This is an EXPERIMENTAL API.
+type ContentSubtypeCallOption struct {
+ ContentSubtype string
+}
+
+func (o ContentSubtypeCallOption) before(c *callInfo) error {
+ c.contentSubtype = o.ContentSubtype
+ return nil
+}
+func (o ContentSubtypeCallOption) after(c *callInfo) {}
+
+// CallCustomCodec returns a CallOption that will set the given Codec to be
+// used for all request and response messages for a call. The result of calling
+// String() will be used as the content-subtype in a case-insensitive manner.
+//
+// See Content-Type on
+// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
+// more details. Also see the documentation on RegisterCodec and
+// CallContentSubtype for more details on the interaction between Codec and
+// content-subtype.
+//
+// This function is provided for advanced users; prefer to use only
+// CallContentSubtype to select a registered codec instead.
+func CallCustomCodec(codec Codec) CallOption {
+ return CustomCodecCallOption{Codec: codec}
+}
+
+// CustomCodecCallOption is a CallOption that indicates the codec used for
+// marshaling messages.
+// This is an EXPERIMENTAL API.
+type CustomCodecCallOption struct {
+ Codec Codec
+}
+
+func (o CustomCodecCallOption) before(c *callInfo) error {
+ c.codec = o.Codec
+ return nil
+}
+func (o CustomCodecCallOption) after(c *callInfo) {}
+
+// MaxRetryRPCBufferSize returns a CallOption that limits the amount of memory
+// used for buffering this RPC's requests for retry purposes.
+//
+// This API is EXPERIMENTAL.
+func MaxRetryRPCBufferSize(bytes int) CallOption {
+ return MaxRetryRPCBufferSizeCallOption{bytes}
+}
+
+// MaxRetryRPCBufferSizeCallOption is a CallOption indicating the amount of
+// memory to be used for caching this RPC for retry purposes.
+// This is an EXPERIMENTAL API.
+type MaxRetryRPCBufferSizeCallOption struct {
+ MaxRetryRPCBufferSize int
+}
+
+func (o MaxRetryRPCBufferSizeCallOption) before(c *callInfo) error {
+ c.maxRetryRPCBufferSize = o.MaxRetryRPCBufferSize
+ return nil
}
+func (o MaxRetryRPCBufferSizeCallOption) after(c *callInfo) {}
// The format of the payload: compressed or not?
type payloadFormat uint8
const (
- compressionNone payloadFormat = iota // no compression
- compressionMade
+ compressionNone payloadFormat = 0 // no compression
+ compressionMade payloadFormat = 1 // compressed
)
// parser reads complete gRPC messages from the underlying reader.
// error types.
r io.Reader
- // The header of a gRPC message. Find more detail
- // at https://grpc.io/docs/guides/wire.html.
+ // The header of a gRPC message. Find more detail at
+ // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
header [5]byte
}
// * io.EOF, when no messages remain
// * io.ErrUnexpectedEOF
// * of type transport.ConnectionError
-// * of type transport.StreamError
+// * an error from the status package
// No other error values or types must be returned, which also means
// that the underlying io.Reader must not return an incompatible
// error.
if length == 0 {
return pf, nil, nil
}
- if length > uint32(maxReceiveMessageSize) {
- return 0, nil, Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
+ if int64(length) > int64(maxInt) {
+ return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max length allowed on current machine (%d vs. %d)", length, maxInt)
+ }
+ if int(length) > maxReceiveMessageSize {
+ return 0, nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", length, maxReceiveMessageSize)
}
// TODO(bradfitz,zhaoq): garbage. reuse buffer after proto decoding instead
// of making it for each message:
return pf, msg, nil
}
-// encode serializes msg and prepends the message header. If msg is nil, it
-// generates the message header of 0 message length.
-func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayload *stats.OutPayload) ([]byte, error) {
- var (
- b []byte
- length uint
- )
- if msg != nil {
- var err error
- // TODO(zhaoq): optimize to reduce memory alloc and copying.
- b, err = c.Marshal(msg)
+// encode serializes msg and returns a buffer containing the message, or an
+// error if it is too large to be transmitted by grpc. If msg is nil, it
+// generates an empty message.
+func encode(c baseCodec, msg interface{}) ([]byte, error) {
+ if msg == nil { // NOTE: typed nils will not be caught by this check
+ return nil, nil
+ }
+ b, err := c.Marshal(msg)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error())
+ }
+ if uint(len(b)) > math.MaxUint32 {
+ return nil, status.Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", len(b))
+ }
+ return b, nil
+}
+
+// compress returns the input bytes compressed by compressor or cp. If both
+// compressors are nil, returns nil.
+//
+// TODO(dfawley): eliminate cp parameter by wrapping Compressor in an encoding.Compressor.
+func compress(in []byte, cp Compressor, compressor encoding.Compressor) ([]byte, error) {
+ if compressor == nil && cp == nil {
+ return nil, nil
+ }
+ wrapErr := func(err error) error {
+ return status.Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
+ }
+ cbuf := &bytes.Buffer{}
+ if compressor != nil {
+ z, err := compressor.Compress(cbuf)
if err != nil {
- return nil, Errorf(codes.Internal, "grpc: error while marshaling: %v", err.Error())
+ return nil, wrapErr(err)
}
- if outPayload != nil {
- outPayload.Payload = msg
- // TODO truncate large payload.
- outPayload.Data = b
- outPayload.Length = len(b)
+ if _, err := z.Write(in); err != nil {
+ return nil, wrapErr(err)
}
- if cp != nil {
- if err := cp.Do(cbuf, b); err != nil {
- return nil, Errorf(codes.Internal, "grpc: error while compressing: %v", err.Error())
- }
- b = cbuf.Bytes()
+ if err := z.Close(); err != nil {
+ return nil, wrapErr(err)
+ }
+ } else {
+ if err := cp.Do(cbuf, in); err != nil {
+ return nil, wrapErr(err)
}
- length = uint(len(b))
- }
- if length > math.MaxUint32 {
- return nil, Errorf(codes.ResourceExhausted, "grpc: message too large (%d bytes)", length)
}
+ return cbuf.Bytes(), nil
+}
- const (
- payloadLen = 1
- sizeLen = 4
- )
-
- var buf = make([]byte, payloadLen+sizeLen+len(b))
+const (
+ payloadLen = 1
+ sizeLen = 4
+ headerLen = payloadLen + sizeLen
+)
- // Write payload format
- if cp == nil {
- buf[0] = byte(compressionNone)
+// msgHeader returns a 5-byte header for the message being transmitted and the
+// payload, which is compData if non-nil or data otherwise.
+func msgHeader(data, compData []byte) (hdr []byte, payload []byte) {
+ hdr = make([]byte, headerLen)
+ if compData != nil {
+ hdr[0] = byte(compressionMade)
+ data = compData
} else {
- buf[0] = byte(compressionMade)
+ hdr[0] = byte(compressionNone)
}
- // Write length of b into buf
- binary.BigEndian.PutUint32(buf[1:], uint32(length))
- // Copy encoded msg to buf
- copy(buf[5:], b)
- if outPayload != nil {
- outPayload.WireLength = len(buf)
- }
+ // Write length of payload into buf
+ binary.BigEndian.PutUint32(hdr[payloadLen:], uint32(len(data)))
+ return hdr, data
+}
- return buf, nil
+func outPayload(client bool, msg interface{}, data, payload []byte, t time.Time) *stats.OutPayload {
+ return &stats.OutPayload{
+ Client: client,
+ Payload: msg,
+ Data: data,
+ Length: len(data),
+ WireLength: len(payload) + headerLen,
+ SentTime: t,
+ }
}
-func checkRecvPayload(pf payloadFormat, recvCompress string, dc Decompressor) error {
+func checkRecvPayload(pf payloadFormat, recvCompress string, haveCompressor bool) *status.Status {
switch pf {
case compressionNone:
case compressionMade:
- if dc == nil || recvCompress != dc.Type() {
- return Errorf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
+ if recvCompress == "" || recvCompress == encoding.Identity {
+ return status.New(codes.Internal, "grpc: compressed flag set with identity or empty encoding")
+ }
+ if !haveCompressor {
+ return status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", recvCompress)
}
default:
- return Errorf(codes.Internal, "grpc: received unexpected payload format %d", pf)
+ return status.Newf(codes.Internal, "grpc: received unexpected payload format %d", pf)
}
return nil
}
-func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, inPayload *stats.InPayload) error {
+type payloadInfo struct {
+ wireLength int // The compressed length got from wire.
+ uncompressedBytes []byte
+}
+
+func recvAndDecompress(p *parser, s *transport.Stream, dc Decompressor, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) ([]byte, error) {
pf, d, err := p.recvMsg(maxReceiveMessageSize)
if err != nil {
- return err
+ return nil, err
}
- if inPayload != nil {
- inPayload.WireLength = len(d)
+ if payInfo != nil {
+ payInfo.wireLength = len(d)
}
- if err := checkRecvPayload(pf, s.RecvCompress(), dc); err != nil {
- return err
+
+ if st := checkRecvPayload(pf, s.RecvCompress(), compressor != nil || dc != nil); st != nil {
+ return nil, st.Err()
}
+
if pf == compressionMade {
- d, err = dc.Do(bytes.NewReader(d))
- if err != nil {
- return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
+ // To match legacy behavior, if the decompressor is set by WithDecompressor or RPCDecompressor,
+ // use this decompressor as the default.
+ if dc != nil {
+ d, err = dc.Do(bytes.NewReader(d))
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
+ }
+ } else {
+ dcReader, err := compressor.Decompress(bytes.NewReader(d))
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
+ }
+ d, err = ioutil.ReadAll(dcReader)
+ if err != nil {
+ return nil, status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
+ }
}
}
if len(d) > maxReceiveMessageSize {
// TODO: Revisit the error code. Currently keep it consistent with java
// implementation.
- return Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize)
+ return nil, status.Errorf(codes.ResourceExhausted, "grpc: received message larger than max (%d vs. %d)", len(d), maxReceiveMessageSize)
+ }
+ return d, nil
+}
+
+// For the two compressor parameters, both should not be set, but if they are,
+// dc takes precedence over compressor.
+// TODO(dfawley): wrap the old compressor/decompressor using the new API?
+func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
+ d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
+ if err != nil {
+ return err
}
if err := c.Unmarshal(d, m); err != nil {
- return Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
+ return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message %v", err)
}
- if inPayload != nil {
- inPayload.RecvTime = time.Now()
- inPayload.Payload = m
- // TODO truncate large payload.
- inPayload.Data = d
- inPayload.Length = len(d)
+ if payInfo != nil {
+ payInfo.uncompressedBytes = d
}
return nil
}
type rpcInfo struct {
- bytesSent bool
- bytesReceived bool
+ failfast bool
}
type rpcInfoContextKey struct{}
-func newContextWithRPCInfo(ctx context.Context) context.Context {
- return context.WithValue(ctx, rpcInfoContextKey{}, &rpcInfo{})
+func newContextWithRPCInfo(ctx context.Context, failfast bool) context.Context {
+ return context.WithValue(ctx, rpcInfoContextKey{}, &rpcInfo{failfast: failfast})
}
func rpcInfoFromContext(ctx context.Context) (s *rpcInfo, ok bool) {
return
}
-func updateRPCInfoInContext(ctx context.Context, s rpcInfo) {
- if ss, ok := rpcInfoFromContext(ctx); ok {
- *ss = s
- }
- return
-}
-
// Code returns the error code for err if it was produced by the rpc system.
// Otherwise, it returns codes.Unknown.
//
-// Deprecated; use status.FromError and Code method instead.
+// Deprecated: use status.Code instead.
func Code(err error) codes.Code {
- if s, ok := status.FromError(err); ok {
- return s.Code()
- }
- return codes.Unknown
+ return status.Code(err)
}
// ErrorDesc returns the error description of err if it was produced by the rpc system.
// Otherwise, it returns err.Error() or empty string when err is nil.
//
-// Deprecated; use status.FromError and Message method instead.
+// Deprecated: use status.Convert and Message method instead.
func ErrorDesc(err error) string {
- if s, ok := status.FromError(err); ok {
- return s.Message()
- }
- return err.Error()
+ return status.Convert(err).Message()
}
// Errorf returns an error containing an error code and a description;
// Errorf returns nil if c is OK.
//
-// Deprecated; use status.Errorf instead.
+// Deprecated: use status.Errorf instead.
func Errorf(c codes.Code, format string, a ...interface{}) error {
return status.Errorf(c, format, a...)
}
-// MethodConfig defines the configuration recommended by the service providers for a
-// particular method.
-// This is EXPERIMENTAL and subject to change.
-type MethodConfig struct {
- // WaitForReady indicates whether RPCs sent to this method should wait until
- // the connection is ready by default (!failfast). The value specified via the
- // gRPC client API will override the value set here.
- WaitForReady *bool
- // Timeout is the default timeout for RPCs sent to this method. The actual
- // deadline used will be the minimum of the value specified here and the value
- // set by the application via the gRPC client API. If either one is not set,
- // then the other will be used. If neither is set, then the RPC has no deadline.
- Timeout *time.Duration
- // MaxReqSize is the maximum allowed payload size for an individual request in a
- // stream (client->server) in bytes. The size which is measured is the serialized
- // payload after per-message compression (but before stream compression) in bytes.
- // The actual value used is the minumum of the value specified here and the value set
- // by the application via the gRPC client API. If either one is not set, then the other
- // will be used. If neither is set, then the built-in default is used.
- MaxReqSize *int
- // MaxRespSize is the maximum allowed payload size for an individual response in a
- // stream (server->client) in bytes.
- MaxRespSize *int
-}
-
-// ServiceConfig is provided by the service provider and contains parameters for how
-// clients that connect to the service should behave.
-// This is EXPERIMENTAL and subject to change.
-type ServiceConfig struct {
- // LB is the load balancer the service providers recommends. The balancer specified
- // via grpc.WithBalancer will override this.
- LB Balancer
- // Methods contains a map for the methods in this service.
- // If there is an exact match for a method (i.e. /service/method) in the map, use the corresponding MethodConfig.
- // If there's no exact match, look for the default config for the service (/service/) and use the corresponding MethodConfig if it exists.
- // Otherwise, the method has no MethodConfig to use.
- Methods map[string]MethodConfig
-}
-
-func min(a, b *int) *int {
- if *a < *b {
- return a
- }
- return b
-}
-
-func getMaxSize(mcMax, doptMax *int, defaultVal int) *int {
- if mcMax == nil && doptMax == nil {
- return &defaultVal
- }
- if mcMax != nil && doptMax != nil {
- return min(mcMax, doptMax)
- }
- if mcMax != nil {
- return mcMax
- }
- return doptMax
-}
-
-// SupportPackageIsVersion3 is referenced from generated protocol buffer files.
-// The latest support package version is 4.
-// SupportPackageIsVersion3 is kept for compability. It will be removed in the
-// next support package version update.
-const SupportPackageIsVersion3 = true
-
-// SupportPackageIsVersion4 is referenced from generated protocol buffer files
-// to assert that that code is compatible with this version of the grpc package.
-//
-// This constant may be renamed in the future if a change in the generated code
-// requires a synchronised update of grpc-go and protoc-gen-go. This constant
-// should not be referenced from any other code.
-const SupportPackageIsVersion4 = true
+// toRPCErr converts an error into an error from the status package.
+func toRPCErr(err error) error {
+ if err == nil || err == io.EOF {
+ return err
+ }
+ if err == io.ErrUnexpectedEOF {
+ return status.Error(codes.Internal, err.Error())
+ }
+ if _, ok := status.FromError(err); ok {
+ return err
+ }
+ switch e := err.(type) {
+ case transport.ConnectionError:
+ return status.Error(codes.Unavailable, e.Desc)
+ default:
+ switch err {
+ case context.DeadlineExceeded:
+ return status.Error(codes.DeadlineExceeded, err.Error())
+ case context.Canceled:
+ return status.Error(codes.Canceled, err.Error())
+ }
+ }
+ return status.Error(codes.Unknown, err.Error())
+}
-// Version is the current grpc version.
-const Version = "1.6.0-dev"
+// setCallInfoCodec should only be called after CallOptions have been applied.
+func setCallInfoCodec(c *callInfo) error {
+ if c.codec != nil {
+ // codec was already set by a CallOption; use it.
+ return nil
+ }
+
+ if c.contentSubtype == "" {
+ // No codec specified in CallOptions; use proto by default.
+ c.codec = encoding.GetCodec(proto.Name)
+ return nil
+ }
+
+ // c.contentSubtype is already lowercased in CallContentSubtype
+ c.codec = encoding.GetCodec(c.contentSubtype)
+ if c.codec == nil {
+ return status.Errorf(codes.Internal, "no codec registered for content-subtype %s", c.contentSubtype)
+ }
+ return nil
+}
+
+// parseDialTarget returns the network and address to pass to dialer
+func parseDialTarget(target string) (net string, addr string) {
+ net = "tcp"
+
+ m1 := strings.Index(target, ":")
+ m2 := strings.Index(target, ":/")
+
+ // handle unix:addr which will fail with url.Parse
+ if m1 >= 0 && m2 < 0 {
+ if n := target[0:m1]; n == "unix" {
+ net = n
+ addr = target[m1+1:]
+ return net, addr
+ }
+ }
+ if m2 >= 0 {
+ t, err := url.Parse(target)
+ if err != nil {
+ return net, target
+ }
+ scheme := t.Scheme
+ addr = t.Path
+ if scheme == "unix" {
+ net = scheme
+ if addr == "" {
+ addr = t.Host
+ }
+ return net, addr
+ }
+ }
+
+ return net, target
+}
+
+// channelzData is used to store channelz related data for ClientConn, addrConn and Server.
+// These fields cannot be embedded in the original structs (e.g. ClientConn), since to do atomic
+// operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
+// Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
+type channelzData struct {
+ callsStarted int64
+ callsFailed int64
+ callsSucceeded int64
+ // lastCallStartedTime stores the timestamp that last call starts. It is of int64 type instead of
+ // time.Time since it's more costly to atomically update time.Time variable than int64 variable.
+ lastCallStartedTime int64
+}
+
+// The SupportPackageIsVersion variables are referenced from generated protocol
+// buffer files to ensure compatibility with the gRPC version used. The latest
+// support package version is 5.
+//
+// Older versions are kept for compatibility. They may be removed if
+// compatibility cannot be maintained.
+//
+// These constants should not be referenced from any other code.
+const (
+ SupportPackageIsVersion3 = true
+ SupportPackageIsVersion4 = true
+ SupportPackageIsVersion5 = true
+)
const grpcUA = "grpc-go/" + Version