]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | /* |
2 | * | |
3 | * Copyright 2014 gRPC authors. | |
4 | * | |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | * | |
17 | */ | |
18 | ||
19 | // Package transport defines and implements message oriented communication | |
107c1cdb ND |
20 | // channel to complete various transactions (e.g., an RPC). It is meant for |
21 | // grpc-internal usage and is not intended to be imported directly by users. | |
22 | package transport | |
15c0b25d AP |
23 | |
24 | import ( | |
107c1cdb ND |
25 | "context" |
26 | "errors" | |
15c0b25d AP |
27 | "fmt" |
28 | "io" | |
29 | "net" | |
30 | "sync" | |
107c1cdb | 31 | "sync/atomic" |
15c0b25d | 32 | |
15c0b25d AP |
33 | "google.golang.org/grpc/codes" |
34 | "google.golang.org/grpc/credentials" | |
35 | "google.golang.org/grpc/keepalive" | |
36 | "google.golang.org/grpc/metadata" | |
37 | "google.golang.org/grpc/stats" | |
38 | "google.golang.org/grpc/status" | |
39 | "google.golang.org/grpc/tap" | |
40 | ) | |
41 | ||
// recvMsg is one unit of inbound data handed from the transport to a stream.
// Nothing specific to the transport protocol remains in it.
type recvMsg struct {
	data []byte
	// err classifies the message:
	//   nil     - data holds received bytes.
	//   io.EOF  - the stream is completed; data is nil.
	//   other   - the transport failed; data is nil.
	err error
}

// recvBuffer is an unbounded channel of recvMsg structs.
//
// It differs from controlBuffer only in that it carries concrete recvMsg
// values rather than objects implementing the "item" interface. recvBuffer is
// written to far more often than controlBuffer, and sticking to plain recvMsg
// structs helps avoid allocation in put.
type recvBuffer struct {
	c       chan recvMsg // single-slot channel handed out by get
	mu      sync.Mutex   // guards backlog and err
	backlog []recvMsg    // messages waiting for the slot in c to free up
	err     error        // first non-nil error seen by put; once set, put drops everything
}

// newRecvBuffer returns an empty recvBuffer whose channel has capacity one.
func newRecvBuffer() *recvBuffer {
	return &recvBuffer{c: make(chan recvMsg, 1)}
}

// put enqueues r. After a message carrying a non-nil error has been accepted,
// every later message (data or error) is silently discarded.
func (b *recvBuffer) put(r recvMsg) {
	b.mu.Lock()
	if b.err != nil {
		// An earlier message already reported an error; accept nothing more.
		b.mu.Unlock()
		return
	}
	b.err = r.err

	// Only try the channel directly when nothing is queued ahead of r,
	// otherwise r would overtake the backlog.
	queue := len(b.backlog) > 0
	if !queue {
		select {
		case b.c <- r:
		default:
			queue = true
		}
	}
	if queue {
		b.backlog = append(b.backlog, r)
	}
	b.mu.Unlock()
}

// load moves the oldest backlog entry onto the channel if the channel has
// room; otherwise it does nothing.
func (b *recvBuffer) load() {
	b.mu.Lock()
	if len(b.backlog) != 0 {
		head := b.backlog[0]
		select {
		case b.c <- head:
			// Zero the slot so the backing array no longer references the payload.
			b.backlog[0] = recvMsg{}
			b.backlog = b.backlog[1:]
		default:
		}
	}
	b.mu.Unlock()
}

// get returns the channel that receives a recvMsg in the buffer.
//
// Upon receipt of a recvMsg, the caller should call load to send another
// recvMsg onto the channel if there is any.
func (b *recvBuffer) get() <-chan recvMsg {
	return b.c
}
112 | ||
113 | // recvBufferReader implements io.Reader interface to read the data from | |
114 | // recvBuffer. | |
115 | type recvBufferReader struct { | |
107c1cdb ND |
116 | closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata. |
117 | ctx context.Context | |
118 | ctxDone <-chan struct{} // cache of ctx.Done() (for performance). | |
119 | recv *recvBuffer | |
120 | last []byte // Stores the remaining data in the previous calls. | |
121 | err error | |
15c0b25d AP |
122 | } |
123 | ||
124 | // Read reads the next len(p) bytes from last. If last is drained, it tries to | |
125 | // read additional data from recv. It blocks if there no additional data available | |
126 | // in recv. If Read returns any non-nil error, it will continue to return that error. | |
127 | func (r *recvBufferReader) Read(p []byte) (n int, err error) { | |
128 | if r.err != nil { | |
129 | return 0, r.err | |
130 | } | |
15c0b25d AP |
131 | if r.last != nil && len(r.last) > 0 { |
132 | // Read remaining data left in last call. | |
133 | copied := copy(p, r.last) | |
134 | r.last = r.last[copied:] | |
135 | return copied, nil | |
136 | } | |
107c1cdb ND |
137 | if r.closeStream != nil { |
138 | n, r.err = r.readClient(p) | |
139 | } else { | |
140 | n, r.err = r.read(p) | |
141 | } | |
142 | return n, r.err | |
143 | } | |
144 | ||
145 | func (r *recvBufferReader) read(p []byte) (n int, err error) { | |
15c0b25d | 146 | select { |
107c1cdb | 147 | case <-r.ctxDone: |
15c0b25d | 148 | return 0, ContextErr(r.ctx.Err()) |
15c0b25d | 149 | case m := <-r.recv.get(): |
107c1cdb | 150 | return r.readAdditional(m, p) |
15c0b25d AP |
151 | } |
152 | } | |
153 | ||
107c1cdb ND |
154 | func (r *recvBufferReader) readClient(p []byte) (n int, err error) { |
155 | // If the context is canceled, then closes the stream with nil metadata. | |
156 | // closeStream writes its error parameter to r.recv as a recvMsg. | |
157 | // r.readAdditional acts on that message and returns the necessary error. | |
158 | select { | |
159 | case <-r.ctxDone: | |
160 | r.closeStream(ContextErr(r.ctx.Err())) | |
161 | m := <-r.recv.get() | |
162 | return r.readAdditional(m, p) | |
163 | case m := <-r.recv.get(): | |
164 | return r.readAdditional(m, p) | |
15c0b25d | 165 | } |
15c0b25d AP |
166 | } |
167 | ||
107c1cdb ND |
168 | func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) { |
169 | r.recv.load() | |
170 | if m.err != nil { | |
171 | return 0, m.err | |
15c0b25d | 172 | } |
107c1cdb ND |
173 | copied := copy(p, m.data) |
174 | r.last = m.data[copied:] | |
175 | return copied, nil | |
15c0b25d AP |
176 | } |
177 | ||
// streamState is the lifecycle state of a Stream. It is a uint32 so that it
// can be read and updated with the sync/atomic functions.
type streamState uint32

const (
	streamActive    streamState = 0
	streamWriteDone streamState = 1 // EndStream sent
	streamReadDone  streamState = 2 // EndStream received
	streamDone      streamState = 3 // the entire stream is finished.
)
186 | ||
187 | // Stream represents an RPC in the transport layer. | |
188 | type Stream struct { | |
107c1cdb ND |
189 | id uint32 |
190 | st ServerTransport // nil for client side Stream | |
191 | ctx context.Context // the associated context of the stream | |
192 | cancel context.CancelFunc // always nil for client side Stream | |
193 | done chan struct{} // closed at the end of stream to unblock writers. On the client side. | |
194 | ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance) | |
195 | method string // the associated RPC method of the stream | |
15c0b25d AP |
196 | recvCompress string |
197 | sendCompress string | |
198 | buf *recvBuffer | |
199 | trReader io.Reader | |
200 | fc *inFlow | |
107c1cdb | 201 | wq *writeQuota |
15c0b25d AP |
202 | |
203 | // Callback to state application's intentions to read data. This | |
107c1cdb | 204 | // is used to adjust flow control, if needed. |
15c0b25d AP |
205 | requestRead func(int) |
206 | ||
107c1cdb ND |
207 | headerChan chan struct{} // closed to indicate the end of header metadata. |
208 | headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. | |
209 | ||
210 | // hdrMu protects header and trailer metadata on the server-side. | |
211 | hdrMu sync.Mutex | |
212 | // On client side, header keeps the received header metadata. | |
213 | // | |
214 | // On server side, header keeps the header set by SetHeader(). The complete | |
215 | // header will merged into this after t.WriteHeader() is called. | |
216 | header metadata.MD | |
217 | trailer metadata.MD // the key-value map of trailer metadata. | |
218 | ||
219 | noHeaders bool // set if the client never received headers (set only after the stream is done). | |
220 | ||
221 | // On the server-side, headerSent is atomically set to 1 when the headers are sent out. | |
222 | headerSent uint32 | |
223 | ||
224 | state streamState | |
225 | ||
226 | // On client-side it is the status error received from the server. | |
227 | // On server-side it is unused. | |
15c0b25d | 228 | status *status.Status |
107c1cdb ND |
229 | |
230 | bytesReceived uint32 // indicates whether any bytes have been received on this stream | |
231 | unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream | |
232 | ||
233 | // contentSubtype is the content-subtype for requests. | |
234 | // this must be lowercase or the behavior is undefined. | |
235 | contentSubtype string | |
236 | } | |
237 | ||
238 | // isHeaderSent is only valid on the server-side. | |
239 | func (s *Stream) isHeaderSent() bool { | |
240 | return atomic.LoadUint32(&s.headerSent) == 1 | |
241 | } | |
242 | ||
243 | // updateHeaderSent updates headerSent and returns true | |
244 | // if it was alreay set. It is valid only on server-side. | |
245 | func (s *Stream) updateHeaderSent() bool { | |
246 | return atomic.SwapUint32(&s.headerSent, 1) == 1 | |
247 | } | |
248 | ||
249 | func (s *Stream) swapState(st streamState) streamState { | |
250 | return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st))) | |
251 | } | |
252 | ||
253 | func (s *Stream) compareAndSwapState(oldState, newState streamState) bool { | |
254 | return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState)) | |
255 | } | |
256 | ||
257 | func (s *Stream) getState() streamState { | |
258 | return streamState(atomic.LoadUint32((*uint32)(&s.state))) | |
259 | } | |
260 | ||
261 | func (s *Stream) waitOnHeader() error { | |
262 | if s.headerChan == nil { | |
263 | // On the server headerChan is always nil since a stream originates | |
264 | // only after having received headers. | |
265 | return nil | |
266 | } | |
267 | select { | |
268 | case <-s.ctx.Done(): | |
269 | return ContextErr(s.ctx.Err()) | |
270 | case <-s.headerChan: | |
271 | return nil | |
272 | } | |
15c0b25d AP |
273 | } |
274 | ||
275 | // RecvCompress returns the compression algorithm applied to the inbound | |
276 | // message. It is empty string if there is no compression applied. | |
277 | func (s *Stream) RecvCompress() string { | |
107c1cdb ND |
278 | if err := s.waitOnHeader(); err != nil { |
279 | return "" | |
280 | } | |
15c0b25d AP |
281 | return s.recvCompress |
282 | } | |
283 | ||
284 | // SetSendCompress sets the compression algorithm to the stream. | |
285 | func (s *Stream) SetSendCompress(str string) { | |
286 | s.sendCompress = str | |
287 | } | |
288 | ||
107c1cdb | 289 | // Done returns a channel which is closed when it receives the final status |
15c0b25d AP |
290 | // from the server. |
291 | func (s *Stream) Done() <-chan struct{} { | |
292 | return s.done | |
293 | } | |
294 | ||
107c1cdb ND |
295 | // Header returns the header metadata of the stream. |
296 | // | |
297 | // On client side, it acquires the key-value pairs of header metadata once it is | |
298 | // available. It blocks until i) the metadata is ready or ii) there is no header | |
299 | // metadata or iii) the stream is canceled/expired. | |
300 | // | |
301 | // On server side, it returns the out header after t.WriteHeader is called. | |
15c0b25d | 302 | func (s *Stream) Header() (metadata.MD, error) { |
107c1cdb ND |
303 | if s.headerChan == nil && s.header != nil { |
304 | // On server side, return the header in stream. It will be the out | |
305 | // header after t.WriteHeader is called. | |
15c0b25d AP |
306 | return s.header.Copy(), nil |
307 | } | |
107c1cdb | 308 | err := s.waitOnHeader() |
15c0b25d AP |
309 | // Even if the stream is closed, header is returned if available. |
310 | select { | |
311 | case <-s.headerChan: | |
107c1cdb ND |
312 | if s.header == nil { |
313 | return nil, nil | |
314 | } | |
15c0b25d AP |
315 | return s.header.Copy(), nil |
316 | default: | |
317 | } | |
318 | return nil, err | |
319 | } | |
320 | ||
107c1cdb ND |
321 | // TrailersOnly blocks until a header or trailers-only frame is received and |
322 | // then returns true if the stream was trailers-only. If the stream ends | |
323 | // before headers are received, returns true, nil. If a context error happens | |
324 | // first, returns it as a status error. Client-side only. | |
325 | func (s *Stream) TrailersOnly() (bool, error) { | |
326 | err := s.waitOnHeader() | |
327 | if err != nil { | |
328 | return false, err | |
329 | } | |
330 | // if !headerDone, some other connection error occurred. | |
331 | return s.noHeaders && atomic.LoadUint32(&s.headerDone) == 1, nil | |
332 | } | |
333 | ||
15c0b25d AP |
334 | // Trailer returns the cached trailer metedata. Note that if it is not called |
335 | // after the entire stream is done, it could return an empty MD. Client | |
336 | // side only. | |
107c1cdb ND |
337 | // It can be safely read only after stream has ended that is either read |
338 | // or write have returned io.EOF. | |
15c0b25d | 339 | func (s *Stream) Trailer() metadata.MD { |
107c1cdb ND |
340 | c := s.trailer.Copy() |
341 | return c | |
15c0b25d AP |
342 | } |
343 | ||
107c1cdb ND |
344 | // ContentSubtype returns the content-subtype for a request. For example, a |
345 | // content-subtype of "proto" will result in a content-type of | |
346 | // "application/grpc+proto". This will always be lowercase. See | |
347 | // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for | |
348 | // more details. | |
349 | func (s *Stream) ContentSubtype() string { | |
350 | return s.contentSubtype | |
15c0b25d AP |
351 | } |
352 | ||
353 | // Context returns the context of the stream. | |
354 | func (s *Stream) Context() context.Context { | |
355 | return s.ctx | |
356 | } | |
357 | ||
358 | // Method returns the method for the stream. | |
359 | func (s *Stream) Method() string { | |
360 | return s.method | |
361 | } | |
362 | ||
363 | // Status returns the status received from the server. | |
107c1cdb ND |
364 | // Status can be read safely only after the stream has ended, |
365 | // that is, after Done() is closed. | |
15c0b25d AP |
366 | func (s *Stream) Status() *status.Status { |
367 | return s.status | |
368 | } | |
369 | ||
370 | // SetHeader sets the header metadata. This can be called multiple times. | |
371 | // Server side only. | |
107c1cdb | 372 | // This should not be called in parallel to other data writes. |
15c0b25d | 373 | func (s *Stream) SetHeader(md metadata.MD) error { |
15c0b25d AP |
374 | if md.Len() == 0 { |
375 | return nil | |
376 | } | |
107c1cdb ND |
377 | if s.isHeaderSent() || s.getState() == streamDone { |
378 | return ErrIllegalHeaderWrite | |
379 | } | |
380 | s.hdrMu.Lock() | |
15c0b25d | 381 | s.header = metadata.Join(s.header, md) |
107c1cdb | 382 | s.hdrMu.Unlock() |
15c0b25d AP |
383 | return nil |
384 | } | |
385 | ||
107c1cdb ND |
386 | // SendHeader sends the given header metadata. The given metadata is |
387 | // combined with any metadata set by previous calls to SetHeader and | |
388 | // then written to the transport stream. | |
389 | func (s *Stream) SendHeader(md metadata.MD) error { | |
390 | return s.st.WriteHeader(s, md) | |
391 | } | |
392 | ||
15c0b25d AP |
393 | // SetTrailer sets the trailer metadata which will be sent with the RPC status |
394 | // by the server. This can be called multiple times. Server side only. | |
107c1cdb | 395 | // This should not be called parallel to other data writes. |
15c0b25d AP |
396 | func (s *Stream) SetTrailer(md metadata.MD) error { |
397 | if md.Len() == 0 { | |
398 | return nil | |
399 | } | |
107c1cdb ND |
400 | if s.getState() == streamDone { |
401 | return ErrIllegalHeaderWrite | |
402 | } | |
403 | s.hdrMu.Lock() | |
15c0b25d | 404 | s.trailer = metadata.Join(s.trailer, md) |
107c1cdb | 405 | s.hdrMu.Unlock() |
15c0b25d AP |
406 | return nil |
407 | } | |
408 | ||
409 | func (s *Stream) write(m recvMsg) { | |
410 | s.buf.put(m) | |
411 | } | |
412 | ||
413 | // Read reads all p bytes from the wire for this stream. | |
414 | func (s *Stream) Read(p []byte) (n int, err error) { | |
415 | // Don't request a read if there was an error earlier | |
416 | if er := s.trReader.(*transportReader).er; er != nil { | |
417 | return 0, er | |
418 | } | |
419 | s.requestRead(len(p)) | |
420 | return io.ReadFull(s.trReader, p) | |
421 | } | |
422 | ||
// transportReader reads all the data available for this Stream from the
// transport and passes them into the decoder, which converts them into a gRPC
// message stream. The error is io.EOF when the stream is done or another
// non-nil error if the stream broke.
type transportReader struct {
	reader io.Reader
	// windowHandler controls the window update procedure for both this
	// particular stream and the associated transport.
	windowHandler func(int)
	er            error // last error returned by reader, remembered by Read
}

// Read forwards to the wrapped reader. A non-nil error is recorded in er and
// returned as is; on success windowHandler is told how many bytes were read.
func (t *transportReader) Read(p []byte) (n int, err error) {
	if n, err = t.reader.Read(p); err != nil {
		t.er = err
		return n, err
	}
	t.windowHandler(n)
	return n, nil
}
444 | ||
15c0b25d AP |
445 | // BytesReceived indicates whether any bytes have been received on this stream. |
446 | func (s *Stream) BytesReceived() bool { | |
107c1cdb ND |
447 | return atomic.LoadUint32(&s.bytesReceived) == 1 |
448 | } | |
449 | ||
450 | // Unprocessed indicates whether the server did not process this stream -- | |
451 | // i.e. it sent a refused stream or GOAWAY including this stream ID. | |
452 | func (s *Stream) Unprocessed() bool { | |
453 | return atomic.LoadUint32(&s.unprocessed) == 1 | |
15c0b25d AP |
454 | } |
455 | ||
456 | // GoString is implemented by Stream so context.String() won't | |
457 | // race when printing %#v. | |
458 | func (s *Stream) GoString() string { | |
459 | return fmt.Sprintf("<stream: %p, %v>", s, s.method) | |
460 | } | |
461 | ||
15c0b25d AP |
// transportState is the state of a transport.
type transportState int

const (
	reachable transportState = 0
	closing   transportState = 1
	draining  transportState = 2
)
470 | ||
471 | // ServerConfig consists of all the configurations to establish a server transport. | |
472 | type ServerConfig struct { | |
473 | MaxStreams uint32 | |
474 | AuthInfo credentials.AuthInfo | |
475 | InTapHandle tap.ServerInHandle | |
476 | StatsHandler stats.Handler | |
477 | KeepaliveParams keepalive.ServerParameters | |
478 | KeepalivePolicy keepalive.EnforcementPolicy | |
479 | InitialWindowSize int32 | |
480 | InitialConnWindowSize int32 | |
107c1cdb ND |
481 | WriteBufferSize int |
482 | ReadBufferSize int | |
483 | ChannelzParentID int64 | |
484 | MaxHeaderListSize *uint32 | |
15c0b25d AP |
485 | } |
486 | ||
487 | // NewServerTransport creates a ServerTransport with conn or non-nil error | |
488 | // if it fails. | |
489 | func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) { | |
490 | return newHTTP2Server(conn, config) | |
491 | } | |
492 | ||
493 | // ConnectOptions covers all relevant options for communicating with the server. | |
494 | type ConnectOptions struct { | |
495 | // UserAgent is the application user agent. | |
496 | UserAgent string | |
15c0b25d AP |
497 | // Dialer specifies how to dial a network address. |
498 | Dialer func(context.Context, string) (net.Conn, error) | |
499 | // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors. | |
500 | FailOnNonTempDialError bool | |
501 | // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs. | |
502 | PerRPCCredentials []credentials.PerRPCCredentials | |
107c1cdb ND |
503 | // TransportCredentials stores the Authenticator required to setup a client |
504 | // connection. Only one of TransportCredentials and CredsBundle is non-nil. | |
15c0b25d | 505 | TransportCredentials credentials.TransportCredentials |
107c1cdb ND |
506 | // CredsBundle is the credentials bundle to be used. Only one of |
507 | // TransportCredentials and CredsBundle is non-nil. | |
508 | CredsBundle credentials.Bundle | |
15c0b25d AP |
509 | // KeepaliveParams stores the keepalive parameters. |
510 | KeepaliveParams keepalive.ClientParameters | |
511 | // StatsHandler stores the handler for stats. | |
512 | StatsHandler stats.Handler | |
107c1cdb | 513 | // InitialWindowSize sets the initial window size for a stream. |
15c0b25d | 514 | InitialWindowSize int32 |
107c1cdb | 515 | // InitialConnWindowSize sets the initial window size for a connection. |
15c0b25d | 516 | InitialConnWindowSize int32 |
107c1cdb ND |
517 | // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire. |
518 | WriteBufferSize int | |
519 | // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall. | |
520 | ReadBufferSize int | |
521 | // ChannelzParentID sets the addrConn id which initiate the creation of this client transport. | |
522 | ChannelzParentID int64 | |
523 | // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received. | |
524 | MaxHeaderListSize *uint32 | |
15c0b25d AP |
525 | } |
526 | ||
// TargetInfo describes the target of a connection: its network address,
// attached metadata and authority.
type TargetInfo struct {
	Addr      string
	Metadata  interface{}
	Authority string
}
533 | ||
534 | // NewClientTransport establishes the transport with the required ConnectOptions | |
535 | // and returns it to the caller. | |
107c1cdb ND |
536 | func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) { |
537 | return newHTTP2Client(connectCtx, ctx, target, opts, onPrefaceReceipt, onGoAway, onClose) | |
15c0b25d AP |
538 | } |
539 | ||
// Options provides additional hints and information for message
// transmission.
type Options struct {
	// Last indicates whether this write is the last piece for
	// this stream.
	Last bool
}
547 | ||
548 | // CallHdr carries the information of a particular RPC. | |
549 | type CallHdr struct { | |
550 | // Host specifies the peer's host. | |
551 | Host string | |
552 | ||
553 | // Method specifies the operation to perform. | |
554 | Method string | |
555 | ||
15c0b25d AP |
556 | // SendCompress specifies the compression algorithm applied on |
557 | // outbound message. | |
558 | SendCompress string | |
559 | ||
560 | // Creds specifies credentials.PerRPCCredentials for a call. | |
561 | Creds credentials.PerRPCCredentials | |
562 | ||
107c1cdb ND |
563 | // ContentSubtype specifies the content-subtype for a request. For example, a |
564 | // content-subtype of "proto" will result in a content-type of | |
565 | // "application/grpc+proto". The value of ContentSubtype must be all | |
566 | // lowercase, otherwise the behavior is undefined. See | |
567 | // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests | |
568 | // for more details. | |
569 | ContentSubtype string | |
570 | ||
571 | PreviousAttempts int // value of grpc-previous-rpc-attempts header to set | |
15c0b25d AP |
572 | } |
573 | ||
574 | // ClientTransport is the common interface for all gRPC client-side transport | |
575 | // implementations. | |
576 | type ClientTransport interface { | |
577 | // Close tears down this transport. Once it returns, the transport | |
578 | // should not be accessed any more. The caller must make sure this | |
579 | // is called only once. | |
580 | Close() error | |
581 | ||
582 | // GracefulClose starts to tear down the transport. It stops accepting | |
583 | // new RPCs and wait the completion of the pending RPCs. | |
584 | GracefulClose() error | |
585 | ||
586 | // Write sends the data for the given stream. A nil stream indicates | |
587 | // the write is to be performed on the transport as a whole. | |
107c1cdb | 588 | Write(s *Stream, hdr []byte, data []byte, opts *Options) error |
15c0b25d AP |
589 | |
590 | // NewStream creates a Stream for an RPC. | |
591 | NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) | |
592 | ||
593 | // CloseStream clears the footprint of a stream when the stream is | |
594 | // not needed any more. The err indicates the error incurred when | |
595 | // CloseStream is called. Must be called when a stream is finished | |
596 | // unless the associated transport is closing. | |
597 | CloseStream(stream *Stream, err error) | |
598 | ||
599 | // Error returns a channel that is closed when some I/O error | |
600 | // happens. Typically the caller should have a goroutine to monitor | |
601 | // this in order to take action (e.g., close the current transport | |
602 | // and create a new one) in error case. It should not return nil | |
603 | // once the transport is initiated. | |
604 | Error() <-chan struct{} | |
605 | ||
606 | // GoAway returns a channel that is closed when ClientTransport | |
607 | // receives the draining signal from the server (e.g., GOAWAY frame in | |
608 | // HTTP/2). | |
609 | GoAway() <-chan struct{} | |
610 | ||
611 | // GetGoAwayReason returns the reason why GoAway frame was received. | |
612 | GetGoAwayReason() GoAwayReason | |
107c1cdb ND |
613 | |
614 | // IncrMsgSent increments the number of message sent through this transport. | |
615 | IncrMsgSent() | |
616 | ||
617 | // IncrMsgRecv increments the number of message received through this transport. | |
618 | IncrMsgRecv() | |
15c0b25d AP |
619 | } |
620 | ||
621 | // ServerTransport is the common interface for all gRPC server-side transport | |
622 | // implementations. | |
623 | // | |
624 | // Methods may be called concurrently from multiple goroutines, but | |
625 | // Write methods for a given Stream will be called serially. | |
626 | type ServerTransport interface { | |
627 | // HandleStreams receives incoming streams using the given handler. | |
628 | HandleStreams(func(*Stream), func(context.Context, string) context.Context) | |
629 | ||
630 | // WriteHeader sends the header metadata for the given stream. | |
631 | // WriteHeader may not be called on all streams. | |
632 | WriteHeader(s *Stream, md metadata.MD) error | |
633 | ||
634 | // Write sends the data for the given stream. | |
635 | // Write may not be called on all streams. | |
107c1cdb | 636 | Write(s *Stream, hdr []byte, data []byte, opts *Options) error |
15c0b25d AP |
637 | |
638 | // WriteStatus sends the status of a stream to the client. WriteStatus is | |
639 | // the final call made on a stream and always occurs. | |
640 | WriteStatus(s *Stream, st *status.Status) error | |
641 | ||
642 | // Close tears down the transport. Once it is called, the transport | |
643 | // should not be accessed any more. All the pending streams and their | |
644 | // handlers will be terminated asynchronously. | |
645 | Close() error | |
646 | ||
647 | // RemoteAddr returns the remote network address. | |
648 | RemoteAddr() net.Addr | |
649 | ||
650 | // Drain notifies the client this ServerTransport stops accepting new RPCs. | |
651 | Drain() | |
15c0b25d | 652 | |
107c1cdb ND |
653 | // IncrMsgSent increments the number of message sent through this transport. |
654 | IncrMsgSent() | |
655 | ||
656 | // IncrMsgRecv increments the number of message received through this transport. | |
657 | IncrMsgRecv() | |
15c0b25d AP |
658 | } |
659 | ||
// connectionErrorf creates a ConnectionError whose description is built from
// format and a, with the given temporary flag and original error e.
func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
	desc := fmt.Sprintf(format, a...)
	return ConnectionError{Desc: desc, temp: temp, err: e}
}

// ConnectionError is an error that results in the termination of the
// entire connection and the retry of all the active streams.
type ConnectionError struct {
	Desc string
	temp bool  // reported by Temporary
	err  error // original error, may be nil; see Origin
}

// Error implements the error interface; the description is quoted.
func (e ConnectionError) Error() string {
	return fmt.Sprintf("connection error: desc = %q", e.Desc)
}

// Temporary indicates if this connection error is temporary or fatal.
func (e ConnectionError) Temporary() bool {
	return e.temp
}

// Origin returns the original error of this connection error. It never
// returns nil: when no original error was recorded, e itself is returned.
func (e ConnectionError) Origin() error {
	if e.err != nil {
		return e.err
	}
	return e
}
695 | ||
696 | var ( | |
697 | // ErrConnClosing indicates that the transport is closing. | |
698 | ErrConnClosing = connectionErrorf(true, nil, "transport is closing") | |
107c1cdb ND |
699 | // errStreamDrain indicates that the stream is rejected because the |
700 | // connection is draining. This could be caused by goaway or balancer | |
701 | // removing the address. | |
702 | errStreamDrain = status.Error(codes.Unavailable, "the connection is draining") | |
703 | // errStreamDone is returned from write at the client side to indiacte application | |
704 | // layer of an error. | |
705 | errStreamDone = errors.New("the stream is done") | |
706 | // StatusGoAway indicates that the server sent a GOAWAY that included this | |
707 | // stream's ID in unprocessed RPCs. | |
708 | statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection") | |
15c0b25d AP |
709 | ) |
710 | ||
15c0b25d AP |
// GoAwayReason contains the reason for the GoAway frame received.
type GoAwayReason uint8

const (
	// GoAwayInvalid indicates that no GoAway frame is received.
	GoAwayInvalid GoAwayReason = iota
	// GoAwayNoReason is the default value when GoAway frame is received.
	GoAwayNoReason
	// GoAwayTooManyPings indicates that a GoAway frame with
	// ErrCodeEnhanceYourCalm was received and that the debug data said
	// "too_many_pings".
	GoAwayTooManyPings
)
107c1cdb ND |
724 | |
// channelzData is used to store channelz related data for http2Client and
// http2Server.
//
// These fields cannot be embedded in the original structs (e.g. http2Client):
// to do atomic operations on an int64 variable on a 32-bit machine, the user
// is responsible for enforcing memory alignment. Grouping the int64 fields
// inside this struct enforces that alignment.
type channelzData struct {
	kpCount int64
	// streamsStarted is the number of streams that have started, including
	// already finished ones.
	streamsStarted int64
	// streamsSucceeded counts streams that ended successfully.
	// Client side: by receiving a frame with the EoS bit set from the server.
	// Server side: by sending a frame with the EoS bit set.
	streamsSucceeded int64
	streamsFailed    int64
	// lastStreamCreatedTime stores the timestamp at which the last stream was
	// created. It is an int64 rather than a time.Time because atomically
	// updating a time.Time is more costly than updating an int64. The same
	// goes for lastMsgSentTime and lastMsgRecvTime.
	lastStreamCreatedTime int64
	msgSent               int64
	msgRecv               int64
	lastMsgSentTime       int64
	lastMsgRecvTime       int64
}
748 | ||
749 | // ContextErr converts the error from context package into a status error. | |
750 | func ContextErr(err error) error { | |
751 | switch err { | |
752 | case context.DeadlineExceeded: | |
753 | return status.Error(codes.DeadlineExceeded, err.Error()) | |
754 | case context.Canceled: | |
755 | return status.Error(codes.Canceled, err.Error()) | |
756 | } | |
757 | return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err) | |
758 | } |