diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/transport/transport.go')
-rw-r--r-- | vendor/google.golang.org/grpc/internal/transport/transport.go | 758 |
1 files changed, 758 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/transport.go b/vendor/google.golang.org/grpc/internal/transport/transport.go new file mode 100644 index 0000000..2580aa7 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/transport/transport.go | |||
@@ -0,0 +1,758 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2014 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | // Package transport defines and implements message oriented communication | ||
20 | // channel to complete various transactions (e.g., an RPC). It is meant for | ||
21 | // grpc-internal usage and is not intended to be imported directly by users. | ||
22 | package transport | ||
23 | |||
24 | import ( | ||
25 | "context" | ||
26 | "errors" | ||
27 | "fmt" | ||
28 | "io" | ||
29 | "net" | ||
30 | "sync" | ||
31 | "sync/atomic" | ||
32 | |||
33 | "google.golang.org/grpc/codes" | ||
34 | "google.golang.org/grpc/credentials" | ||
35 | "google.golang.org/grpc/keepalive" | ||
36 | "google.golang.org/grpc/metadata" | ||
37 | "google.golang.org/grpc/stats" | ||
38 | "google.golang.org/grpc/status" | ||
39 | "google.golang.org/grpc/tap" | ||
40 | ) | ||
41 | |||
42 | // recvMsg represents the received msg from the transport. All transport | ||
43 | // protocol specific info has been removed. | ||
44 | type recvMsg struct { | ||
45 | data []byte | ||
46 | // nil: received some data | ||
47 | // io.EOF: stream is completed. data is nil. | ||
48 | // other non-nil error: transport failure. data is nil. | ||
49 | err error | ||
50 | } | ||
51 | |||
52 | // recvBuffer is an unbounded channel of recvMsg structs. | ||
53 | // Note recvBuffer differs from controlBuffer only in that recvBuffer | ||
54 | // holds a channel of only recvMsg structs instead of objects implementing "item" interface. | ||
55 | // recvBuffer is written to much more often than | ||
56 | // controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put" | ||
57 | type recvBuffer struct { | ||
58 | c chan recvMsg | ||
59 | mu sync.Mutex | ||
60 | backlog []recvMsg | ||
61 | err error | ||
62 | } | ||
63 | |||
64 | func newRecvBuffer() *recvBuffer { | ||
65 | b := &recvBuffer{ | ||
66 | c: make(chan recvMsg, 1), | ||
67 | } | ||
68 | return b | ||
69 | } | ||
70 | |||
71 | func (b *recvBuffer) put(r recvMsg) { | ||
72 | b.mu.Lock() | ||
73 | if b.err != nil { | ||
74 | b.mu.Unlock() | ||
75 | // An error had occurred earlier, don't accept more | ||
76 | // data or errors. | ||
77 | return | ||
78 | } | ||
79 | b.err = r.err | ||
80 | if len(b.backlog) == 0 { | ||
81 | select { | ||
82 | case b.c <- r: | ||
83 | b.mu.Unlock() | ||
84 | return | ||
85 | default: | ||
86 | } | ||
87 | } | ||
88 | b.backlog = append(b.backlog, r) | ||
89 | b.mu.Unlock() | ||
90 | } | ||
91 | |||
92 | func (b *recvBuffer) load() { | ||
93 | b.mu.Lock() | ||
94 | if len(b.backlog) > 0 { | ||
95 | select { | ||
96 | case b.c <- b.backlog[0]: | ||
97 | b.backlog[0] = recvMsg{} | ||
98 | b.backlog = b.backlog[1:] | ||
99 | default: | ||
100 | } | ||
101 | } | ||
102 | b.mu.Unlock() | ||
103 | } | ||
104 | |||
105 | // get returns the channel that receives a recvMsg in the buffer. | ||
106 | // | ||
107 | // Upon receipt of a recvMsg, the caller should call load to send another | ||
108 | // recvMsg onto the channel if there is any. | ||
109 | func (b *recvBuffer) get() <-chan recvMsg { | ||
110 | return b.c | ||
111 | } | ||
112 | |||
113 | // recvBufferReader implements io.Reader interface to read the data from | ||
114 | // recvBuffer. | ||
115 | type recvBufferReader struct { | ||
116 | closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata. | ||
117 | ctx context.Context | ||
118 | ctxDone <-chan struct{} // cache of ctx.Done() (for performance). | ||
119 | recv *recvBuffer | ||
120 | last []byte // Stores the remaining data in the previous calls. | ||
121 | err error | ||
122 | } | ||
123 | |||
124 | // Read reads the next len(p) bytes from last. If last is drained, it tries to | ||
125 | // read additional data from recv. It blocks if there no additional data available | ||
126 | // in recv. If Read returns any non-nil error, it will continue to return that error. | ||
127 | func (r *recvBufferReader) Read(p []byte) (n int, err error) { | ||
128 | if r.err != nil { | ||
129 | return 0, r.err | ||
130 | } | ||
131 | if r.last != nil && len(r.last) > 0 { | ||
132 | // Read remaining data left in last call. | ||
133 | copied := copy(p, r.last) | ||
134 | r.last = r.last[copied:] | ||
135 | return copied, nil | ||
136 | } | ||
137 | if r.closeStream != nil { | ||
138 | n, r.err = r.readClient(p) | ||
139 | } else { | ||
140 | n, r.err = r.read(p) | ||
141 | } | ||
142 | return n, r.err | ||
143 | } | ||
144 | |||
145 | func (r *recvBufferReader) read(p []byte) (n int, err error) { | ||
146 | select { | ||
147 | case <-r.ctxDone: | ||
148 | return 0, ContextErr(r.ctx.Err()) | ||
149 | case m := <-r.recv.get(): | ||
150 | return r.readAdditional(m, p) | ||
151 | } | ||
152 | } | ||
153 | |||
154 | func (r *recvBufferReader) readClient(p []byte) (n int, err error) { | ||
155 | // If the context is canceled, then closes the stream with nil metadata. | ||
156 | // closeStream writes its error parameter to r.recv as a recvMsg. | ||
157 | // r.readAdditional acts on that message and returns the necessary error. | ||
158 | select { | ||
159 | case <-r.ctxDone: | ||
160 | r.closeStream(ContextErr(r.ctx.Err())) | ||
161 | m := <-r.recv.get() | ||
162 | return r.readAdditional(m, p) | ||
163 | case m := <-r.recv.get(): | ||
164 | return r.readAdditional(m, p) | ||
165 | } | ||
166 | } | ||
167 | |||
168 | func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) { | ||
169 | r.recv.load() | ||
170 | if m.err != nil { | ||
171 | return 0, m.err | ||
172 | } | ||
173 | copied := copy(p, m.data) | ||
174 | r.last = m.data[copied:] | ||
175 | return copied, nil | ||
176 | } | ||
177 | |||
178 | type streamState uint32 | ||
179 | |||
180 | const ( | ||
181 | streamActive streamState = iota | ||
182 | streamWriteDone // EndStream sent | ||
183 | streamReadDone // EndStream received | ||
184 | streamDone // the entire stream is finished. | ||
185 | ) | ||
186 | |||
187 | // Stream represents an RPC in the transport layer. | ||
188 | type Stream struct { | ||
189 | id uint32 | ||
190 | st ServerTransport // nil for client side Stream | ||
191 | ctx context.Context // the associated context of the stream | ||
192 | cancel context.CancelFunc // always nil for client side Stream | ||
193 | done chan struct{} // closed at the end of stream to unblock writers. On the client side. | ||
194 | ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance) | ||
195 | method string // the associated RPC method of the stream | ||
196 | recvCompress string | ||
197 | sendCompress string | ||
198 | buf *recvBuffer | ||
199 | trReader io.Reader | ||
200 | fc *inFlow | ||
201 | wq *writeQuota | ||
202 | |||
203 | // Callback to state application's intentions to read data. This | ||
204 | // is used to adjust flow control, if needed. | ||
205 | requestRead func(int) | ||
206 | |||
207 | headerChan chan struct{} // closed to indicate the end of header metadata. | ||
208 | headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. | ||
209 | |||
210 | // hdrMu protects header and trailer metadata on the server-side. | ||
211 | hdrMu sync.Mutex | ||
212 | // On client side, header keeps the received header metadata. | ||
213 | // | ||
214 | // On server side, header keeps the header set by SetHeader(). The complete | ||
215 | // header will merged into this after t.WriteHeader() is called. | ||
216 | header metadata.MD | ||
217 | trailer metadata.MD // the key-value map of trailer metadata. | ||
218 | |||
219 | noHeaders bool // set if the client never received headers (set only after the stream is done). | ||
220 | |||
221 | // On the server-side, headerSent is atomically set to 1 when the headers are sent out. | ||
222 | headerSent uint32 | ||
223 | |||
224 | state streamState | ||
225 | |||
226 | // On client-side it is the status error received from the server. | ||
227 | // On server-side it is unused. | ||
228 | status *status.Status | ||
229 | |||
230 | bytesReceived uint32 // indicates whether any bytes have been received on this stream | ||
231 | unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream | ||
232 | |||
233 | // contentSubtype is the content-subtype for requests. | ||
234 | // this must be lowercase or the behavior is undefined. | ||
235 | contentSubtype string | ||
236 | } | ||
237 | |||
238 | // isHeaderSent is only valid on the server-side. | ||
239 | func (s *Stream) isHeaderSent() bool { | ||
240 | return atomic.LoadUint32(&s.headerSent) == 1 | ||
241 | } | ||
242 | |||
243 | // updateHeaderSent updates headerSent and returns true | ||
244 | // if it was alreay set. It is valid only on server-side. | ||
245 | func (s *Stream) updateHeaderSent() bool { | ||
246 | return atomic.SwapUint32(&s.headerSent, 1) == 1 | ||
247 | } | ||
248 | |||
249 | func (s *Stream) swapState(st streamState) streamState { | ||
250 | return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st))) | ||
251 | } | ||
252 | |||
253 | func (s *Stream) compareAndSwapState(oldState, newState streamState) bool { | ||
254 | return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState)) | ||
255 | } | ||
256 | |||
257 | func (s *Stream) getState() streamState { | ||
258 | return streamState(atomic.LoadUint32((*uint32)(&s.state))) | ||
259 | } | ||
260 | |||
261 | func (s *Stream) waitOnHeader() error { | ||
262 | if s.headerChan == nil { | ||
263 | // On the server headerChan is always nil since a stream originates | ||
264 | // only after having received headers. | ||
265 | return nil | ||
266 | } | ||
267 | select { | ||
268 | case <-s.ctx.Done(): | ||
269 | return ContextErr(s.ctx.Err()) | ||
270 | case <-s.headerChan: | ||
271 | return nil | ||
272 | } | ||
273 | } | ||
274 | |||
275 | // RecvCompress returns the compression algorithm applied to the inbound | ||
276 | // message. It is empty string if there is no compression applied. | ||
277 | func (s *Stream) RecvCompress() string { | ||
278 | if err := s.waitOnHeader(); err != nil { | ||
279 | return "" | ||
280 | } | ||
281 | return s.recvCompress | ||
282 | } | ||
283 | |||
284 | // SetSendCompress sets the compression algorithm to the stream. | ||
285 | func (s *Stream) SetSendCompress(str string) { | ||
286 | s.sendCompress = str | ||
287 | } | ||
288 | |||
289 | // Done returns a channel which is closed when it receives the final status | ||
290 | // from the server. | ||
291 | func (s *Stream) Done() <-chan struct{} { | ||
292 | return s.done | ||
293 | } | ||
294 | |||
295 | // Header returns the header metadata of the stream. | ||
296 | // | ||
297 | // On client side, it acquires the key-value pairs of header metadata once it is | ||
298 | // available. It blocks until i) the metadata is ready or ii) there is no header | ||
299 | // metadata or iii) the stream is canceled/expired. | ||
300 | // | ||
301 | // On server side, it returns the out header after t.WriteHeader is called. | ||
302 | func (s *Stream) Header() (metadata.MD, error) { | ||
303 | if s.headerChan == nil && s.header != nil { | ||
304 | // On server side, return the header in stream. It will be the out | ||
305 | // header after t.WriteHeader is called. | ||
306 | return s.header.Copy(), nil | ||
307 | } | ||
308 | err := s.waitOnHeader() | ||
309 | // Even if the stream is closed, header is returned if available. | ||
310 | select { | ||
311 | case <-s.headerChan: | ||
312 | if s.header == nil { | ||
313 | return nil, nil | ||
314 | } | ||
315 | return s.header.Copy(), nil | ||
316 | default: | ||
317 | } | ||
318 | return nil, err | ||
319 | } | ||
320 | |||
321 | // TrailersOnly blocks until a header or trailers-only frame is received and | ||
322 | // then returns true if the stream was trailers-only. If the stream ends | ||
323 | // before headers are received, returns true, nil. If a context error happens | ||
324 | // first, returns it as a status error. Client-side only. | ||
325 | func (s *Stream) TrailersOnly() (bool, error) { | ||
326 | err := s.waitOnHeader() | ||
327 | if err != nil { | ||
328 | return false, err | ||
329 | } | ||
330 | // if !headerDone, some other connection error occurred. | ||
331 | return s.noHeaders && atomic.LoadUint32(&s.headerDone) == 1, nil | ||
332 | } | ||
333 | |||
334 | // Trailer returns the cached trailer metedata. Note that if it is not called | ||
335 | // after the entire stream is done, it could return an empty MD. Client | ||
336 | // side only. | ||
337 | // It can be safely read only after stream has ended that is either read | ||
338 | // or write have returned io.EOF. | ||
339 | func (s *Stream) Trailer() metadata.MD { | ||
340 | c := s.trailer.Copy() | ||
341 | return c | ||
342 | } | ||
343 | |||
344 | // ContentSubtype returns the content-subtype for a request. For example, a | ||
345 | // content-subtype of "proto" will result in a content-type of | ||
346 | // "application/grpc+proto". This will always be lowercase. See | ||
347 | // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for | ||
348 | // more details. | ||
349 | func (s *Stream) ContentSubtype() string { | ||
350 | return s.contentSubtype | ||
351 | } | ||
352 | |||
353 | // Context returns the context of the stream. | ||
354 | func (s *Stream) Context() context.Context { | ||
355 | return s.ctx | ||
356 | } | ||
357 | |||
358 | // Method returns the method for the stream. | ||
359 | func (s *Stream) Method() string { | ||
360 | return s.method | ||
361 | } | ||
362 | |||
363 | // Status returns the status received from the server. | ||
364 | // Status can be read safely only after the stream has ended, | ||
365 | // that is, after Done() is closed. | ||
366 | func (s *Stream) Status() *status.Status { | ||
367 | return s.status | ||
368 | } | ||
369 | |||
370 | // SetHeader sets the header metadata. This can be called multiple times. | ||
371 | // Server side only. | ||
372 | // This should not be called in parallel to other data writes. | ||
373 | func (s *Stream) SetHeader(md metadata.MD) error { | ||
374 | if md.Len() == 0 { | ||
375 | return nil | ||
376 | } | ||
377 | if s.isHeaderSent() || s.getState() == streamDone { | ||
378 | return ErrIllegalHeaderWrite | ||
379 | } | ||
380 | s.hdrMu.Lock() | ||
381 | s.header = metadata.Join(s.header, md) | ||
382 | s.hdrMu.Unlock() | ||
383 | return nil | ||
384 | } | ||
385 | |||
386 | // SendHeader sends the given header metadata. The given metadata is | ||
387 | // combined with any metadata set by previous calls to SetHeader and | ||
388 | // then written to the transport stream. | ||
389 | func (s *Stream) SendHeader(md metadata.MD) error { | ||
390 | return s.st.WriteHeader(s, md) | ||
391 | } | ||
392 | |||
393 | // SetTrailer sets the trailer metadata which will be sent with the RPC status | ||
394 | // by the server. This can be called multiple times. Server side only. | ||
395 | // This should not be called parallel to other data writes. | ||
396 | func (s *Stream) SetTrailer(md metadata.MD) error { | ||
397 | if md.Len() == 0 { | ||
398 | return nil | ||
399 | } | ||
400 | if s.getState() == streamDone { | ||
401 | return ErrIllegalHeaderWrite | ||
402 | } | ||
403 | s.hdrMu.Lock() | ||
404 | s.trailer = metadata.Join(s.trailer, md) | ||
405 | s.hdrMu.Unlock() | ||
406 | return nil | ||
407 | } | ||
408 | |||
409 | func (s *Stream) write(m recvMsg) { | ||
410 | s.buf.put(m) | ||
411 | } | ||
412 | |||
413 | // Read reads all p bytes from the wire for this stream. | ||
414 | func (s *Stream) Read(p []byte) (n int, err error) { | ||
415 | // Don't request a read if there was an error earlier | ||
416 | if er := s.trReader.(*transportReader).er; er != nil { | ||
417 | return 0, er | ||
418 | } | ||
419 | s.requestRead(len(p)) | ||
420 | return io.ReadFull(s.trReader, p) | ||
421 | } | ||
422 | |||
423 | // tranportReader reads all the data available for this Stream from the transport and | ||
424 | // passes them into the decoder, which converts them into a gRPC message stream. | ||
425 | // The error is io.EOF when the stream is done or another non-nil error if | ||
426 | // the stream broke. | ||
427 | type transportReader struct { | ||
428 | reader io.Reader | ||
429 | // The handler to control the window update procedure for both this | ||
430 | // particular stream and the associated transport. | ||
431 | windowHandler func(int) | ||
432 | er error | ||
433 | } | ||
434 | |||
435 | func (t *transportReader) Read(p []byte) (n int, err error) { | ||
436 | n, err = t.reader.Read(p) | ||
437 | if err != nil { | ||
438 | t.er = err | ||
439 | return | ||
440 | } | ||
441 | t.windowHandler(n) | ||
442 | return | ||
443 | } | ||
444 | |||
445 | // BytesReceived indicates whether any bytes have been received on this stream. | ||
446 | func (s *Stream) BytesReceived() bool { | ||
447 | return atomic.LoadUint32(&s.bytesReceived) == 1 | ||
448 | } | ||
449 | |||
450 | // Unprocessed indicates whether the server did not process this stream -- | ||
451 | // i.e. it sent a refused stream or GOAWAY including this stream ID. | ||
452 | func (s *Stream) Unprocessed() bool { | ||
453 | return atomic.LoadUint32(&s.unprocessed) == 1 | ||
454 | } | ||
455 | |||
456 | // GoString is implemented by Stream so context.String() won't | ||
457 | // race when printing %#v. | ||
458 | func (s *Stream) GoString() string { | ||
459 | return fmt.Sprintf("<stream: %p, %v>", s, s.method) | ||
460 | } | ||
461 | |||
462 | // state of transport | ||
463 | type transportState int | ||
464 | |||
465 | const ( | ||
466 | reachable transportState = iota | ||
467 | closing | ||
468 | draining | ||
469 | ) | ||
470 | |||
471 | // ServerConfig consists of all the configurations to establish a server transport. | ||
472 | type ServerConfig struct { | ||
473 | MaxStreams uint32 | ||
474 | AuthInfo credentials.AuthInfo | ||
475 | InTapHandle tap.ServerInHandle | ||
476 | StatsHandler stats.Handler | ||
477 | KeepaliveParams keepalive.ServerParameters | ||
478 | KeepalivePolicy keepalive.EnforcementPolicy | ||
479 | InitialWindowSize int32 | ||
480 | InitialConnWindowSize int32 | ||
481 | WriteBufferSize int | ||
482 | ReadBufferSize int | ||
483 | ChannelzParentID int64 | ||
484 | MaxHeaderListSize *uint32 | ||
485 | } | ||
486 | |||
487 | // NewServerTransport creates a ServerTransport with conn or non-nil error | ||
488 | // if it fails. | ||
489 | func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) { | ||
490 | return newHTTP2Server(conn, config) | ||
491 | } | ||
492 | |||
493 | // ConnectOptions covers all relevant options for communicating with the server. | ||
494 | type ConnectOptions struct { | ||
495 | // UserAgent is the application user agent. | ||
496 | UserAgent string | ||
497 | // Dialer specifies how to dial a network address. | ||
498 | Dialer func(context.Context, string) (net.Conn, error) | ||
499 | // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors. | ||
500 | FailOnNonTempDialError bool | ||
501 | // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs. | ||
502 | PerRPCCredentials []credentials.PerRPCCredentials | ||
503 | // TransportCredentials stores the Authenticator required to setup a client | ||
504 | // connection. Only one of TransportCredentials and CredsBundle is non-nil. | ||
505 | TransportCredentials credentials.TransportCredentials | ||
506 | // CredsBundle is the credentials bundle to be used. Only one of | ||
507 | // TransportCredentials and CredsBundle is non-nil. | ||
508 | CredsBundle credentials.Bundle | ||
509 | // KeepaliveParams stores the keepalive parameters. | ||
510 | KeepaliveParams keepalive.ClientParameters | ||
511 | // StatsHandler stores the handler for stats. | ||
512 | StatsHandler stats.Handler | ||
513 | // InitialWindowSize sets the initial window size for a stream. | ||
514 | InitialWindowSize int32 | ||
515 | // InitialConnWindowSize sets the initial window size for a connection. | ||
516 | InitialConnWindowSize int32 | ||
517 | // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire. | ||
518 | WriteBufferSize int | ||
519 | // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall. | ||
520 | ReadBufferSize int | ||
521 | // ChannelzParentID sets the addrConn id which initiate the creation of this client transport. | ||
522 | ChannelzParentID int64 | ||
523 | // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received. | ||
524 | MaxHeaderListSize *uint32 | ||
525 | } | ||
526 | |||
527 | // TargetInfo contains the information of the target such as network address and metadata. | ||
528 | type TargetInfo struct { | ||
529 | Addr string | ||
530 | Metadata interface{} | ||
531 | Authority string | ||
532 | } | ||
533 | |||
534 | // NewClientTransport establishes the transport with the required ConnectOptions | ||
535 | // and returns it to the caller. | ||
536 | func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) { | ||
537 | return newHTTP2Client(connectCtx, ctx, target, opts, onPrefaceReceipt, onGoAway, onClose) | ||
538 | } | ||
539 | |||
540 | // Options provides additional hints and information for message | ||
541 | // transmission. | ||
542 | type Options struct { | ||
543 | // Last indicates whether this write is the last piece for | ||
544 | // this stream. | ||
545 | Last bool | ||
546 | } | ||
547 | |||
548 | // CallHdr carries the information of a particular RPC. | ||
549 | type CallHdr struct { | ||
550 | // Host specifies the peer's host. | ||
551 | Host string | ||
552 | |||
553 | // Method specifies the operation to perform. | ||
554 | Method string | ||
555 | |||
556 | // SendCompress specifies the compression algorithm applied on | ||
557 | // outbound message. | ||
558 | SendCompress string | ||
559 | |||
560 | // Creds specifies credentials.PerRPCCredentials for a call. | ||
561 | Creds credentials.PerRPCCredentials | ||
562 | |||
563 | // ContentSubtype specifies the content-subtype for a request. For example, a | ||
564 | // content-subtype of "proto" will result in a content-type of | ||
565 | // "application/grpc+proto". The value of ContentSubtype must be all | ||
566 | // lowercase, otherwise the behavior is undefined. See | ||
567 | // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests | ||
568 | // for more details. | ||
569 | ContentSubtype string | ||
570 | |||
571 | PreviousAttempts int // value of grpc-previous-rpc-attempts header to set | ||
572 | } | ||
573 | |||
574 | // ClientTransport is the common interface for all gRPC client-side transport | ||
575 | // implementations. | ||
576 | type ClientTransport interface { | ||
577 | // Close tears down this transport. Once it returns, the transport | ||
578 | // should not be accessed any more. The caller must make sure this | ||
579 | // is called only once. | ||
580 | Close() error | ||
581 | |||
582 | // GracefulClose starts to tear down the transport. It stops accepting | ||
583 | // new RPCs and wait the completion of the pending RPCs. | ||
584 | GracefulClose() error | ||
585 | |||
586 | // Write sends the data for the given stream. A nil stream indicates | ||
587 | // the write is to be performed on the transport as a whole. | ||
588 | Write(s *Stream, hdr []byte, data []byte, opts *Options) error | ||
589 | |||
590 | // NewStream creates a Stream for an RPC. | ||
591 | NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) | ||
592 | |||
593 | // CloseStream clears the footprint of a stream when the stream is | ||
594 | // not needed any more. The err indicates the error incurred when | ||
595 | // CloseStream is called. Must be called when a stream is finished | ||
596 | // unless the associated transport is closing. | ||
597 | CloseStream(stream *Stream, err error) | ||
598 | |||
599 | // Error returns a channel that is closed when some I/O error | ||
600 | // happens. Typically the caller should have a goroutine to monitor | ||
601 | // this in order to take action (e.g., close the current transport | ||
602 | // and create a new one) in error case. It should not return nil | ||
603 | // once the transport is initiated. | ||
604 | Error() <-chan struct{} | ||
605 | |||
606 | // GoAway returns a channel that is closed when ClientTransport | ||
607 | // receives the draining signal from the server (e.g., GOAWAY frame in | ||
608 | // HTTP/2). | ||
609 | GoAway() <-chan struct{} | ||
610 | |||
611 | // GetGoAwayReason returns the reason why GoAway frame was received. | ||
612 | GetGoAwayReason() GoAwayReason | ||
613 | |||
614 | // IncrMsgSent increments the number of message sent through this transport. | ||
615 | IncrMsgSent() | ||
616 | |||
617 | // IncrMsgRecv increments the number of message received through this transport. | ||
618 | IncrMsgRecv() | ||
619 | } | ||
620 | |||
621 | // ServerTransport is the common interface for all gRPC server-side transport | ||
622 | // implementations. | ||
623 | // | ||
624 | // Methods may be called concurrently from multiple goroutines, but | ||
625 | // Write methods for a given Stream will be called serially. | ||
626 | type ServerTransport interface { | ||
627 | // HandleStreams receives incoming streams using the given handler. | ||
628 | HandleStreams(func(*Stream), func(context.Context, string) context.Context) | ||
629 | |||
630 | // WriteHeader sends the header metadata for the given stream. | ||
631 | // WriteHeader may not be called on all streams. | ||
632 | WriteHeader(s *Stream, md metadata.MD) error | ||
633 | |||
634 | // Write sends the data for the given stream. | ||
635 | // Write may not be called on all streams. | ||
636 | Write(s *Stream, hdr []byte, data []byte, opts *Options) error | ||
637 | |||
638 | // WriteStatus sends the status of a stream to the client. WriteStatus is | ||
639 | // the final call made on a stream and always occurs. | ||
640 | WriteStatus(s *Stream, st *status.Status) error | ||
641 | |||
642 | // Close tears down the transport. Once it is called, the transport | ||
643 | // should not be accessed any more. All the pending streams and their | ||
644 | // handlers will be terminated asynchronously. | ||
645 | Close() error | ||
646 | |||
647 | // RemoteAddr returns the remote network address. | ||
648 | RemoteAddr() net.Addr | ||
649 | |||
650 | // Drain notifies the client this ServerTransport stops accepting new RPCs. | ||
651 | Drain() | ||
652 | |||
653 | // IncrMsgSent increments the number of message sent through this transport. | ||
654 | IncrMsgSent() | ||
655 | |||
656 | // IncrMsgRecv increments the number of message received through this transport. | ||
657 | IncrMsgRecv() | ||
658 | } | ||
659 | |||
660 | // connectionErrorf creates an ConnectionError with the specified error description. | ||
661 | func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError { | ||
662 | return ConnectionError{ | ||
663 | Desc: fmt.Sprintf(format, a...), | ||
664 | temp: temp, | ||
665 | err: e, | ||
666 | } | ||
667 | } | ||
668 | |||
669 | // ConnectionError is an error that results in the termination of the | ||
670 | // entire connection and the retry of all the active streams. | ||
671 | type ConnectionError struct { | ||
672 | Desc string | ||
673 | temp bool | ||
674 | err error | ||
675 | } | ||
676 | |||
677 | func (e ConnectionError) Error() string { | ||
678 | return fmt.Sprintf("connection error: desc = %q", e.Desc) | ||
679 | } | ||
680 | |||
681 | // Temporary indicates if this connection error is temporary or fatal. | ||
682 | func (e ConnectionError) Temporary() bool { | ||
683 | return e.temp | ||
684 | } | ||
685 | |||
686 | // Origin returns the original error of this connection error. | ||
687 | func (e ConnectionError) Origin() error { | ||
688 | // Never return nil error here. | ||
689 | // If the original error is nil, return itself. | ||
690 | if e.err == nil { | ||
691 | return e | ||
692 | } | ||
693 | return e.err | ||
694 | } | ||
695 | |||
696 | var ( | ||
697 | // ErrConnClosing indicates that the transport is closing. | ||
698 | ErrConnClosing = connectionErrorf(true, nil, "transport is closing") | ||
699 | // errStreamDrain indicates that the stream is rejected because the | ||
700 | // connection is draining. This could be caused by goaway or balancer | ||
701 | // removing the address. | ||
702 | errStreamDrain = status.Error(codes.Unavailable, "the connection is draining") | ||
703 | // errStreamDone is returned from write at the client side to indiacte application | ||
704 | // layer of an error. | ||
705 | errStreamDone = errors.New("the stream is done") | ||
706 | // StatusGoAway indicates that the server sent a GOAWAY that included this | ||
707 | // stream's ID in unprocessed RPCs. | ||
708 | statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection") | ||
709 | ) | ||
710 | |||
711 | // GoAwayReason contains the reason for the GoAway frame received. | ||
712 | type GoAwayReason uint8 | ||
713 | |||
714 | const ( | ||
715 | // GoAwayInvalid indicates that no GoAway frame is received. | ||
716 | GoAwayInvalid GoAwayReason = 0 | ||
717 | // GoAwayNoReason is the default value when GoAway frame is received. | ||
718 | GoAwayNoReason GoAwayReason = 1 | ||
719 | // GoAwayTooManyPings indicates that a GoAway frame with | ||
720 | // ErrCodeEnhanceYourCalm was received and that the debug data said | ||
721 | // "too_many_pings". | ||
722 | GoAwayTooManyPings GoAwayReason = 2 | ||
723 | ) | ||
724 | |||
725 | // channelzData is used to store channelz related data for http2Client and http2Server. | ||
726 | // These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic | ||
727 | // operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment. | ||
728 | // Here, by grouping those int64 fields inside a struct, we are enforcing the alignment. | ||
729 | type channelzData struct { | ||
730 | kpCount int64 | ||
731 | // The number of streams that have started, including already finished ones. | ||
732 | streamsStarted int64 | ||
733 | // Client side: The number of streams that have ended successfully by receiving | ||
734 | // EoS bit set frame from server. | ||
735 | // Server side: The number of streams that have ended successfully by sending | ||
736 | // frame with EoS bit set. | ||
737 | streamsSucceeded int64 | ||
738 | streamsFailed int64 | ||
739 | // lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type | ||
740 | // instead of time.Time since it's more costly to atomically update time.Time variable than int64 | ||
741 | // variable. The same goes for lastMsgSentTime and lastMsgRecvTime. | ||
742 | lastStreamCreatedTime int64 | ||
743 | msgSent int64 | ||
744 | msgRecv int64 | ||
745 | lastMsgSentTime int64 | ||
746 | lastMsgRecvTime int64 | ||
747 | } | ||
748 | |||
749 | // ContextErr converts the error from context package into a status error. | ||
750 | func ContextErr(err error) error { | ||
751 | switch err { | ||
752 | case context.DeadlineExceeded: | ||
753 | return status.Error(codes.DeadlineExceeded, err.Error()) | ||
754 | case context.Canceled: | ||
755 | return status.Error(codes.Canceled, err.Error()) | ||
756 | } | ||
757 | return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err) | ||
758 | } | ||