diff options
author | Alex Pilon <apilon@hashicorp.com> | 2019-02-22 18:24:37 -0500 |
---|---|---|
committer | Alex Pilon <apilon@hashicorp.com> | 2019-02-22 18:24:37 -0500 |
commit | 15c0b25d011f37e7c20aeca9eaf461f78285b8d9 (patch) | |
tree | 255c250a5c9d4801c74092d33b7337d8c14438ff /vendor/google.golang.org/grpc/transport/transport.go | |
parent | 07971ca38143c5faf951d152fba370ddcbe26ad5 (diff) | |
download | terraform-provider-statuscake-15c0b25d011f37e7c20aeca9eaf461f78285b8d9.tar.gz terraform-provider-statuscake-15c0b25d011f37e7c20aeca9eaf461f78285b8d9.tar.zst terraform-provider-statuscake-15c0b25d011f37e7c20aeca9eaf461f78285b8d9.zip |
deps: github.com/hashicorp/terraform@sdk-v0.11-with-go-modules
Updated via: go get github.com/hashicorp/terraform@sdk-v0.11-with-go-modules and go mod tidy
Diffstat (limited to 'vendor/google.golang.org/grpc/transport/transport.go')
-rw-r--r-- | vendor/google.golang.org/grpc/transport/transport.go | 730 |
1 files changed, 730 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go new file mode 100644 index 0000000..ec0fe67 --- /dev/null +++ b/vendor/google.golang.org/grpc/transport/transport.go | |||
@@ -0,0 +1,730 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2014 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | // Package transport defines and implements message oriented communication | ||
20 | // channel to complete various transactions (e.g., an RPC). | ||
21 | package transport // import "google.golang.org/grpc/transport" | ||
22 | |||
23 | import ( | ||
24 | "fmt" | ||
25 | "io" | ||
26 | "net" | ||
27 | "sync" | ||
28 | |||
29 | "golang.org/x/net/context" | ||
30 | "golang.org/x/net/http2" | ||
31 | "google.golang.org/grpc/codes" | ||
32 | "google.golang.org/grpc/credentials" | ||
33 | "google.golang.org/grpc/keepalive" | ||
34 | "google.golang.org/grpc/metadata" | ||
35 | "google.golang.org/grpc/stats" | ||
36 | "google.golang.org/grpc/status" | ||
37 | "google.golang.org/grpc/tap" | ||
38 | ) | ||
39 | |||
40 | // recvMsg represents the received msg from the transport. All transport | ||
41 | // protocol specific info has been removed. | ||
42 | type recvMsg struct { | ||
43 | data []byte | ||
44 | // nil: received some data | ||
45 | // io.EOF: stream is completed. data is nil. | ||
46 | // other non-nil error: transport failure. data is nil. | ||
47 | err error | ||
48 | } | ||
49 | |||
50 | // recvBuffer is an unbounded channel of recvMsg structs. | ||
51 | // Note recvBuffer differs from controlBuffer only in that recvBuffer | ||
52 | // holds a channel of only recvMsg structs instead of objects implementing "item" interface. | ||
53 | // recvBuffer is written to much more often than | ||
54 | // controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put" | ||
55 | type recvBuffer struct { | ||
56 | c chan recvMsg | ||
57 | mu sync.Mutex | ||
58 | backlog []recvMsg | ||
59 | } | ||
60 | |||
61 | func newRecvBuffer() *recvBuffer { | ||
62 | b := &recvBuffer{ | ||
63 | c: make(chan recvMsg, 1), | ||
64 | } | ||
65 | return b | ||
66 | } | ||
67 | |||
68 | func (b *recvBuffer) put(r recvMsg) { | ||
69 | b.mu.Lock() | ||
70 | defer b.mu.Unlock() | ||
71 | if len(b.backlog) == 0 { | ||
72 | select { | ||
73 | case b.c <- r: | ||
74 | return | ||
75 | default: | ||
76 | } | ||
77 | } | ||
78 | b.backlog = append(b.backlog, r) | ||
79 | } | ||
80 | |||
81 | func (b *recvBuffer) load() { | ||
82 | b.mu.Lock() | ||
83 | defer b.mu.Unlock() | ||
84 | if len(b.backlog) > 0 { | ||
85 | select { | ||
86 | case b.c <- b.backlog[0]: | ||
87 | b.backlog[0] = recvMsg{} | ||
88 | b.backlog = b.backlog[1:] | ||
89 | default: | ||
90 | } | ||
91 | } | ||
92 | } | ||
93 | |||
94 | // get returns the channel that receives a recvMsg in the buffer. | ||
95 | // | ||
96 | // Upon receipt of a recvMsg, the caller should call load to send another | ||
97 | // recvMsg onto the channel if there is any. | ||
98 | func (b *recvBuffer) get() <-chan recvMsg { | ||
99 | return b.c | ||
100 | } | ||
101 | |||
102 | // recvBufferReader implements io.Reader interface to read the data from | ||
103 | // recvBuffer. | ||
104 | type recvBufferReader struct { | ||
105 | ctx context.Context | ||
106 | goAway chan struct{} | ||
107 | recv *recvBuffer | ||
108 | last []byte // Stores the remaining data in the previous calls. | ||
109 | err error | ||
110 | } | ||
111 | |||
112 | // Read reads the next len(p) bytes from last. If last is drained, it tries to | ||
113 | // read additional data from recv. It blocks if there no additional data available | ||
114 | // in recv. If Read returns any non-nil error, it will continue to return that error. | ||
115 | func (r *recvBufferReader) Read(p []byte) (n int, err error) { | ||
116 | if r.err != nil { | ||
117 | return 0, r.err | ||
118 | } | ||
119 | n, r.err = r.read(p) | ||
120 | return n, r.err | ||
121 | } | ||
122 | |||
123 | func (r *recvBufferReader) read(p []byte) (n int, err error) { | ||
124 | if r.last != nil && len(r.last) > 0 { | ||
125 | // Read remaining data left in last call. | ||
126 | copied := copy(p, r.last) | ||
127 | r.last = r.last[copied:] | ||
128 | return copied, nil | ||
129 | } | ||
130 | select { | ||
131 | case <-r.ctx.Done(): | ||
132 | return 0, ContextErr(r.ctx.Err()) | ||
133 | case <-r.goAway: | ||
134 | return 0, ErrStreamDrain | ||
135 | case m := <-r.recv.get(): | ||
136 | r.recv.load() | ||
137 | if m.err != nil { | ||
138 | return 0, m.err | ||
139 | } | ||
140 | copied := copy(p, m.data) | ||
141 | r.last = m.data[copied:] | ||
142 | return copied, nil | ||
143 | } | ||
144 | } | ||
145 | |||
146 | // All items in an out of a controlBuffer should be the same type. | ||
147 | type item interface { | ||
148 | item() | ||
149 | } | ||
150 | |||
151 | // controlBuffer is an unbounded channel of item. | ||
152 | type controlBuffer struct { | ||
153 | c chan item | ||
154 | mu sync.Mutex | ||
155 | backlog []item | ||
156 | } | ||
157 | |||
158 | func newControlBuffer() *controlBuffer { | ||
159 | b := &controlBuffer{ | ||
160 | c: make(chan item, 1), | ||
161 | } | ||
162 | return b | ||
163 | } | ||
164 | |||
165 | func (b *controlBuffer) put(r item) { | ||
166 | b.mu.Lock() | ||
167 | defer b.mu.Unlock() | ||
168 | if len(b.backlog) == 0 { | ||
169 | select { | ||
170 | case b.c <- r: | ||
171 | return | ||
172 | default: | ||
173 | } | ||
174 | } | ||
175 | b.backlog = append(b.backlog, r) | ||
176 | } | ||
177 | |||
178 | func (b *controlBuffer) load() { | ||
179 | b.mu.Lock() | ||
180 | defer b.mu.Unlock() | ||
181 | if len(b.backlog) > 0 { | ||
182 | select { | ||
183 | case b.c <- b.backlog[0]: | ||
184 | b.backlog[0] = nil | ||
185 | b.backlog = b.backlog[1:] | ||
186 | default: | ||
187 | } | ||
188 | } | ||
189 | } | ||
190 | |||
191 | // get returns the channel that receives an item in the buffer. | ||
192 | // | ||
193 | // Upon receipt of an item, the caller should call load to send another | ||
194 | // item onto the channel if there is any. | ||
195 | func (b *controlBuffer) get() <-chan item { | ||
196 | return b.c | ||
197 | } | ||
198 | |||
199 | type streamState uint8 | ||
200 | |||
201 | const ( | ||
202 | streamActive streamState = iota | ||
203 | streamWriteDone // EndStream sent | ||
204 | streamReadDone // EndStream received | ||
205 | streamDone // the entire stream is finished. | ||
206 | ) | ||
207 | |||
208 | // Stream represents an RPC in the transport layer. | ||
209 | type Stream struct { | ||
210 | id uint32 | ||
211 | // nil for client side Stream. | ||
212 | st ServerTransport | ||
213 | // ctx is the associated context of the stream. | ||
214 | ctx context.Context | ||
215 | // cancel is always nil for client side Stream. | ||
216 | cancel context.CancelFunc | ||
217 | // done is closed when the final status arrives. | ||
218 | done chan struct{} | ||
219 | // goAway is closed when the server sent GoAways signal before this stream was initiated. | ||
220 | goAway chan struct{} | ||
221 | // method records the associated RPC method of the stream. | ||
222 | method string | ||
223 | recvCompress string | ||
224 | sendCompress string | ||
225 | buf *recvBuffer | ||
226 | trReader io.Reader | ||
227 | fc *inFlow | ||
228 | recvQuota uint32 | ||
229 | |||
230 | // TODO: Remote this unused variable. | ||
231 | // The accumulated inbound quota pending for window update. | ||
232 | updateQuota uint32 | ||
233 | |||
234 | // Callback to state application's intentions to read data. This | ||
235 | // is used to adjust flow control, if need be. | ||
236 | requestRead func(int) | ||
237 | |||
238 | sendQuotaPool *quotaPool | ||
239 | // Close headerChan to indicate the end of reception of header metadata. | ||
240 | headerChan chan struct{} | ||
241 | // header caches the received header metadata. | ||
242 | header metadata.MD | ||
243 | // The key-value map of trailer metadata. | ||
244 | trailer metadata.MD | ||
245 | |||
246 | mu sync.RWMutex // guard the following | ||
247 | // headerOK becomes true from the first header is about to send. | ||
248 | headerOk bool | ||
249 | state streamState | ||
250 | // true iff headerChan is closed. Used to avoid closing headerChan | ||
251 | // multiple times. | ||
252 | headerDone bool | ||
253 | // the status error received from the server. | ||
254 | status *status.Status | ||
255 | // rstStream indicates whether a RST_STREAM frame needs to be sent | ||
256 | // to the server to signify that this stream is closing. | ||
257 | rstStream bool | ||
258 | // rstError is the error that needs to be sent along with the RST_STREAM frame. | ||
259 | rstError http2.ErrCode | ||
260 | // bytesSent and bytesReceived indicates whether any bytes have been sent or | ||
261 | // received on this stream. | ||
262 | bytesSent bool | ||
263 | bytesReceived bool | ||
264 | } | ||
265 | |||
266 | // RecvCompress returns the compression algorithm applied to the inbound | ||
267 | // message. It is empty string if there is no compression applied. | ||
268 | func (s *Stream) RecvCompress() string { | ||
269 | return s.recvCompress | ||
270 | } | ||
271 | |||
272 | // SetSendCompress sets the compression algorithm to the stream. | ||
273 | func (s *Stream) SetSendCompress(str string) { | ||
274 | s.sendCompress = str | ||
275 | } | ||
276 | |||
277 | // Done returns a chanel which is closed when it receives the final status | ||
278 | // from the server. | ||
279 | func (s *Stream) Done() <-chan struct{} { | ||
280 | return s.done | ||
281 | } | ||
282 | |||
283 | // GoAway returns a channel which is closed when the server sent GoAways signal | ||
284 | // before this stream was initiated. | ||
285 | func (s *Stream) GoAway() <-chan struct{} { | ||
286 | return s.goAway | ||
287 | } | ||
288 | |||
289 | // Header acquires the key-value pairs of header metadata once it | ||
290 | // is available. It blocks until i) the metadata is ready or ii) there is no | ||
291 | // header metadata or iii) the stream is canceled/expired. | ||
292 | func (s *Stream) Header() (metadata.MD, error) { | ||
293 | var err error | ||
294 | select { | ||
295 | case <-s.ctx.Done(): | ||
296 | err = ContextErr(s.ctx.Err()) | ||
297 | case <-s.goAway: | ||
298 | err = ErrStreamDrain | ||
299 | case <-s.headerChan: | ||
300 | return s.header.Copy(), nil | ||
301 | } | ||
302 | // Even if the stream is closed, header is returned if available. | ||
303 | select { | ||
304 | case <-s.headerChan: | ||
305 | return s.header.Copy(), nil | ||
306 | default: | ||
307 | } | ||
308 | return nil, err | ||
309 | } | ||
310 | |||
311 | // Trailer returns the cached trailer metedata. Note that if it is not called | ||
312 | // after the entire stream is done, it could return an empty MD. Client | ||
313 | // side only. | ||
314 | func (s *Stream) Trailer() metadata.MD { | ||
315 | s.mu.RLock() | ||
316 | defer s.mu.RUnlock() | ||
317 | return s.trailer.Copy() | ||
318 | } | ||
319 | |||
320 | // ServerTransport returns the underlying ServerTransport for the stream. | ||
321 | // The client side stream always returns nil. | ||
322 | func (s *Stream) ServerTransport() ServerTransport { | ||
323 | return s.st | ||
324 | } | ||
325 | |||
326 | // Context returns the context of the stream. | ||
327 | func (s *Stream) Context() context.Context { | ||
328 | return s.ctx | ||
329 | } | ||
330 | |||
331 | // Method returns the method for the stream. | ||
332 | func (s *Stream) Method() string { | ||
333 | return s.method | ||
334 | } | ||
335 | |||
336 | // Status returns the status received from the server. | ||
337 | func (s *Stream) Status() *status.Status { | ||
338 | return s.status | ||
339 | } | ||
340 | |||
341 | // SetHeader sets the header metadata. This can be called multiple times. | ||
342 | // Server side only. | ||
343 | func (s *Stream) SetHeader(md metadata.MD) error { | ||
344 | s.mu.Lock() | ||
345 | defer s.mu.Unlock() | ||
346 | if s.headerOk || s.state == streamDone { | ||
347 | return ErrIllegalHeaderWrite | ||
348 | } | ||
349 | if md.Len() == 0 { | ||
350 | return nil | ||
351 | } | ||
352 | s.header = metadata.Join(s.header, md) | ||
353 | return nil | ||
354 | } | ||
355 | |||
356 | // SetTrailer sets the trailer metadata which will be sent with the RPC status | ||
357 | // by the server. This can be called multiple times. Server side only. | ||
358 | func (s *Stream) SetTrailer(md metadata.MD) error { | ||
359 | if md.Len() == 0 { | ||
360 | return nil | ||
361 | } | ||
362 | s.mu.Lock() | ||
363 | defer s.mu.Unlock() | ||
364 | s.trailer = metadata.Join(s.trailer, md) | ||
365 | return nil | ||
366 | } | ||
367 | |||
368 | func (s *Stream) write(m recvMsg) { | ||
369 | s.buf.put(m) | ||
370 | } | ||
371 | |||
372 | // Read reads all p bytes from the wire for this stream. | ||
373 | func (s *Stream) Read(p []byte) (n int, err error) { | ||
374 | // Don't request a read if there was an error earlier | ||
375 | if er := s.trReader.(*transportReader).er; er != nil { | ||
376 | return 0, er | ||
377 | } | ||
378 | s.requestRead(len(p)) | ||
379 | return io.ReadFull(s.trReader, p) | ||
380 | } | ||
381 | |||
382 | // tranportReader reads all the data available for this Stream from the transport and | ||
383 | // passes them into the decoder, which converts them into a gRPC message stream. | ||
384 | // The error is io.EOF when the stream is done or another non-nil error if | ||
385 | // the stream broke. | ||
386 | type transportReader struct { | ||
387 | reader io.Reader | ||
388 | // The handler to control the window update procedure for both this | ||
389 | // particular stream and the associated transport. | ||
390 | windowHandler func(int) | ||
391 | er error | ||
392 | } | ||
393 | |||
394 | func (t *transportReader) Read(p []byte) (n int, err error) { | ||
395 | n, err = t.reader.Read(p) | ||
396 | if err != nil { | ||
397 | t.er = err | ||
398 | return | ||
399 | } | ||
400 | t.windowHandler(n) | ||
401 | return | ||
402 | } | ||
403 | |||
404 | // finish sets the stream's state and status, and closes the done channel. | ||
405 | // s.mu must be held by the caller. st must always be non-nil. | ||
406 | func (s *Stream) finish(st *status.Status) { | ||
407 | s.status = st | ||
408 | s.state = streamDone | ||
409 | close(s.done) | ||
410 | } | ||
411 | |||
412 | // BytesSent indicates whether any bytes have been sent on this stream. | ||
413 | func (s *Stream) BytesSent() bool { | ||
414 | s.mu.Lock() | ||
415 | defer s.mu.Unlock() | ||
416 | return s.bytesSent | ||
417 | } | ||
418 | |||
419 | // BytesReceived indicates whether any bytes have been received on this stream. | ||
420 | func (s *Stream) BytesReceived() bool { | ||
421 | s.mu.Lock() | ||
422 | defer s.mu.Unlock() | ||
423 | return s.bytesReceived | ||
424 | } | ||
425 | |||
426 | // GoString is implemented by Stream so context.String() won't | ||
427 | // race when printing %#v. | ||
428 | func (s *Stream) GoString() string { | ||
429 | return fmt.Sprintf("<stream: %p, %v>", s, s.method) | ||
430 | } | ||
431 | |||
432 | // The key to save transport.Stream in the context. | ||
433 | type streamKey struct{} | ||
434 | |||
435 | // newContextWithStream creates a new context from ctx and attaches stream | ||
436 | // to it. | ||
437 | func newContextWithStream(ctx context.Context, stream *Stream) context.Context { | ||
438 | return context.WithValue(ctx, streamKey{}, stream) | ||
439 | } | ||
440 | |||
441 | // StreamFromContext returns the stream saved in ctx. | ||
442 | func StreamFromContext(ctx context.Context) (s *Stream, ok bool) { | ||
443 | s, ok = ctx.Value(streamKey{}).(*Stream) | ||
444 | return | ||
445 | } | ||
446 | |||
447 | // state of transport | ||
448 | type transportState int | ||
449 | |||
450 | const ( | ||
451 | reachable transportState = iota | ||
452 | unreachable | ||
453 | closing | ||
454 | draining | ||
455 | ) | ||
456 | |||
457 | // ServerConfig consists of all the configurations to establish a server transport. | ||
458 | type ServerConfig struct { | ||
459 | MaxStreams uint32 | ||
460 | AuthInfo credentials.AuthInfo | ||
461 | InTapHandle tap.ServerInHandle | ||
462 | StatsHandler stats.Handler | ||
463 | KeepaliveParams keepalive.ServerParameters | ||
464 | KeepalivePolicy keepalive.EnforcementPolicy | ||
465 | InitialWindowSize int32 | ||
466 | InitialConnWindowSize int32 | ||
467 | } | ||
468 | |||
469 | // NewServerTransport creates a ServerTransport with conn or non-nil error | ||
470 | // if it fails. | ||
471 | func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) { | ||
472 | return newHTTP2Server(conn, config) | ||
473 | } | ||
474 | |||
475 | // ConnectOptions covers all relevant options for communicating with the server. | ||
476 | type ConnectOptions struct { | ||
477 | // UserAgent is the application user agent. | ||
478 | UserAgent string | ||
479 | // Authority is the :authority pseudo-header to use. This field has no effect if | ||
480 | // TransportCredentials is set. | ||
481 | Authority string | ||
482 | // Dialer specifies how to dial a network address. | ||
483 | Dialer func(context.Context, string) (net.Conn, error) | ||
484 | // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors. | ||
485 | FailOnNonTempDialError bool | ||
486 | // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs. | ||
487 | PerRPCCredentials []credentials.PerRPCCredentials | ||
488 | // TransportCredentials stores the Authenticator required to setup a client connection. | ||
489 | TransportCredentials credentials.TransportCredentials | ||
490 | // KeepaliveParams stores the keepalive parameters. | ||
491 | KeepaliveParams keepalive.ClientParameters | ||
492 | // StatsHandler stores the handler for stats. | ||
493 | StatsHandler stats.Handler | ||
494 | // InitialWindowSize sets the intial window size for a stream. | ||
495 | InitialWindowSize int32 | ||
496 | // InitialConnWindowSize sets the intial window size for a connection. | ||
497 | InitialConnWindowSize int32 | ||
498 | } | ||
499 | |||
500 | // TargetInfo contains the information of the target such as network address and metadata. | ||
501 | type TargetInfo struct { | ||
502 | Addr string | ||
503 | Metadata interface{} | ||
504 | } | ||
505 | |||
506 | // NewClientTransport establishes the transport with the required ConnectOptions | ||
507 | // and returns it to the caller. | ||
508 | func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions) (ClientTransport, error) { | ||
509 | return newHTTP2Client(ctx, target, opts) | ||
510 | } | ||
511 | |||
512 | // Options provides additional hints and information for message | ||
513 | // transmission. | ||
514 | type Options struct { | ||
515 | // Last indicates whether this write is the last piece for | ||
516 | // this stream. | ||
517 | Last bool | ||
518 | |||
519 | // Delay is a hint to the transport implementation for whether | ||
520 | // the data could be buffered for a batching write. The | ||
521 | // Transport implementation may ignore the hint. | ||
522 | Delay bool | ||
523 | } | ||
524 | |||
525 | // CallHdr carries the information of a particular RPC. | ||
526 | type CallHdr struct { | ||
527 | // Host specifies the peer's host. | ||
528 | Host string | ||
529 | |||
530 | // Method specifies the operation to perform. | ||
531 | Method string | ||
532 | |||
533 | // RecvCompress specifies the compression algorithm applied on | ||
534 | // inbound messages. | ||
535 | RecvCompress string | ||
536 | |||
537 | // SendCompress specifies the compression algorithm applied on | ||
538 | // outbound message. | ||
539 | SendCompress string | ||
540 | |||
541 | // Creds specifies credentials.PerRPCCredentials for a call. | ||
542 | Creds credentials.PerRPCCredentials | ||
543 | |||
544 | // Flush indicates whether a new stream command should be sent | ||
545 | // to the peer without waiting for the first data. This is | ||
546 | // only a hint. | ||
547 | // If it's true, the transport may modify the flush decision | ||
548 | // for performance purposes. | ||
549 | // If it's false, new stream will never be flushed. | ||
550 | Flush bool | ||
551 | } | ||
552 | |||
553 | // ClientTransport is the common interface for all gRPC client-side transport | ||
554 | // implementations. | ||
555 | type ClientTransport interface { | ||
556 | // Close tears down this transport. Once it returns, the transport | ||
557 | // should not be accessed any more. The caller must make sure this | ||
558 | // is called only once. | ||
559 | Close() error | ||
560 | |||
561 | // GracefulClose starts to tear down the transport. It stops accepting | ||
562 | // new RPCs and wait the completion of the pending RPCs. | ||
563 | GracefulClose() error | ||
564 | |||
565 | // Write sends the data for the given stream. A nil stream indicates | ||
566 | // the write is to be performed on the transport as a whole. | ||
567 | Write(s *Stream, data []byte, opts *Options) error | ||
568 | |||
569 | // NewStream creates a Stream for an RPC. | ||
570 | NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) | ||
571 | |||
572 | // CloseStream clears the footprint of a stream when the stream is | ||
573 | // not needed any more. The err indicates the error incurred when | ||
574 | // CloseStream is called. Must be called when a stream is finished | ||
575 | // unless the associated transport is closing. | ||
576 | CloseStream(stream *Stream, err error) | ||
577 | |||
578 | // Error returns a channel that is closed when some I/O error | ||
579 | // happens. Typically the caller should have a goroutine to monitor | ||
580 | // this in order to take action (e.g., close the current transport | ||
581 | // and create a new one) in error case. It should not return nil | ||
582 | // once the transport is initiated. | ||
583 | Error() <-chan struct{} | ||
584 | |||
585 | // GoAway returns a channel that is closed when ClientTransport | ||
586 | // receives the draining signal from the server (e.g., GOAWAY frame in | ||
587 | // HTTP/2). | ||
588 | GoAway() <-chan struct{} | ||
589 | |||
590 | // GetGoAwayReason returns the reason why GoAway frame was received. | ||
591 | GetGoAwayReason() GoAwayReason | ||
592 | } | ||
593 | |||
594 | // ServerTransport is the common interface for all gRPC server-side transport | ||
595 | // implementations. | ||
596 | // | ||
597 | // Methods may be called concurrently from multiple goroutines, but | ||
598 | // Write methods for a given Stream will be called serially. | ||
599 | type ServerTransport interface { | ||
600 | // HandleStreams receives incoming streams using the given handler. | ||
601 | HandleStreams(func(*Stream), func(context.Context, string) context.Context) | ||
602 | |||
603 | // WriteHeader sends the header metadata for the given stream. | ||
604 | // WriteHeader may not be called on all streams. | ||
605 | WriteHeader(s *Stream, md metadata.MD) error | ||
606 | |||
607 | // Write sends the data for the given stream. | ||
608 | // Write may not be called on all streams. | ||
609 | Write(s *Stream, data []byte, opts *Options) error | ||
610 | |||
611 | // WriteStatus sends the status of a stream to the client. WriteStatus is | ||
612 | // the final call made on a stream and always occurs. | ||
613 | WriteStatus(s *Stream, st *status.Status) error | ||
614 | |||
615 | // Close tears down the transport. Once it is called, the transport | ||
616 | // should not be accessed any more. All the pending streams and their | ||
617 | // handlers will be terminated asynchronously. | ||
618 | Close() error | ||
619 | |||
620 | // RemoteAddr returns the remote network address. | ||
621 | RemoteAddr() net.Addr | ||
622 | |||
623 | // Drain notifies the client this ServerTransport stops accepting new RPCs. | ||
624 | Drain() | ||
625 | } | ||
626 | |||
627 | // streamErrorf creates an StreamError with the specified error code and description. | ||
628 | func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError { | ||
629 | return StreamError{ | ||
630 | Code: c, | ||
631 | Desc: fmt.Sprintf(format, a...), | ||
632 | } | ||
633 | } | ||
634 | |||
635 | // connectionErrorf creates an ConnectionError with the specified error description. | ||
636 | func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError { | ||
637 | return ConnectionError{ | ||
638 | Desc: fmt.Sprintf(format, a...), | ||
639 | temp: temp, | ||
640 | err: e, | ||
641 | } | ||
642 | } | ||
643 | |||
644 | // ConnectionError is an error that results in the termination of the | ||
645 | // entire connection and the retry of all the active streams. | ||
646 | type ConnectionError struct { | ||
647 | Desc string | ||
648 | temp bool | ||
649 | err error | ||
650 | } | ||
651 | |||
652 | func (e ConnectionError) Error() string { | ||
653 | return fmt.Sprintf("connection error: desc = %q", e.Desc) | ||
654 | } | ||
655 | |||
656 | // Temporary indicates if this connection error is temporary or fatal. | ||
657 | func (e ConnectionError) Temporary() bool { | ||
658 | return e.temp | ||
659 | } | ||
660 | |||
661 | // Origin returns the original error of this connection error. | ||
662 | func (e ConnectionError) Origin() error { | ||
663 | // Never return nil error here. | ||
664 | // If the original error is nil, return itself. | ||
665 | if e.err == nil { | ||
666 | return e | ||
667 | } | ||
668 | return e.err | ||
669 | } | ||
670 | |||
671 | var ( | ||
672 | // ErrConnClosing indicates that the transport is closing. | ||
673 | ErrConnClosing = connectionErrorf(true, nil, "transport is closing") | ||
674 | // ErrStreamDrain indicates that the stream is rejected by the server because | ||
675 | // the server stops accepting new RPCs. | ||
676 | ErrStreamDrain = streamErrorf(codes.Unavailable, "the server stops accepting new RPCs") | ||
677 | ) | ||
678 | |||
679 | // TODO: See if we can replace StreamError with status package errors. | ||
680 | |||
681 | // StreamError is an error that only affects one stream within a connection. | ||
682 | type StreamError struct { | ||
683 | Code codes.Code | ||
684 | Desc string | ||
685 | } | ||
686 | |||
687 | func (e StreamError) Error() string { | ||
688 | return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc) | ||
689 | } | ||
690 | |||
691 | // wait blocks until it can receive from ctx.Done, closing, or proceed. | ||
692 | // If it receives from ctx.Done, it returns 0, the StreamError for ctx.Err. | ||
693 | // If it receives from done, it returns 0, io.EOF if ctx is not done; otherwise | ||
694 | // it return the StreamError for ctx.Err. | ||
695 | // If it receives from goAway, it returns 0, ErrStreamDrain. | ||
696 | // If it receives from closing, it returns 0, ErrConnClosing. | ||
697 | // If it receives from proceed, it returns the received integer, nil. | ||
698 | func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-chan int) (int, error) { | ||
699 | select { | ||
700 | case <-ctx.Done(): | ||
701 | return 0, ContextErr(ctx.Err()) | ||
702 | case <-done: | ||
703 | // User cancellation has precedence. | ||
704 | select { | ||
705 | case <-ctx.Done(): | ||
706 | return 0, ContextErr(ctx.Err()) | ||
707 | default: | ||
708 | } | ||
709 | return 0, io.EOF | ||
710 | case <-goAway: | ||
711 | return 0, ErrStreamDrain | ||
712 | case <-closing: | ||
713 | return 0, ErrConnClosing | ||
714 | case i := <-proceed: | ||
715 | return i, nil | ||
716 | } | ||
717 | } | ||
718 | |||
719 | // GoAwayReason contains the reason for the GoAway frame received. | ||
720 | type GoAwayReason uint8 | ||
721 | |||
722 | const ( | ||
723 | // Invalid indicates that no GoAway frame is received. | ||
724 | Invalid GoAwayReason = 0 | ||
725 | // NoReason is the default value when GoAway frame is received. | ||
726 | NoReason GoAwayReason = 1 | ||
727 | // TooManyPings indicates that a GoAway frame with ErrCodeEnhanceYourCalm | ||
728 | // was recieved and that the debug data said "too_many_pings". | ||
729 | TooManyPings GoAwayReason = 2 | ||
730 | ) | ||