]> git.immae.eu Git - github/fretlink/terraform-provider-statuscake.git/blame - vendor/google.golang.org/grpc/transport/http2_client.go
Merge pull request #27 from terraform-providers/go-modules-2019-02-22
[github/fretlink/terraform-provider-statuscake.git] / vendor / google.golang.org / grpc / transport / http2_client.go
CommitLineData
15c0b25d
AP
1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "io"
24 "math"
25 "net"
26 "strings"
27 "sync"
28 "sync/atomic"
29 "time"
30
31 "golang.org/x/net/context"
32 "golang.org/x/net/http2"
33 "golang.org/x/net/http2/hpack"
34 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/credentials"
36 "google.golang.org/grpc/keepalive"
37 "google.golang.org/grpc/metadata"
38 "google.golang.org/grpc/peer"
39 "google.golang.org/grpc/stats"
40 "google.golang.org/grpc/status"
41)
42
43// http2Client implements the ClientTransport interface with HTTP2.
44type http2Client struct {
45 ctx context.Context
46 target string // server name/addr
47 userAgent string
48 md interface{}
49 conn net.Conn // underlying communication channel
50 remoteAddr net.Addr
51 localAddr net.Addr
52 authInfo credentials.AuthInfo // auth info about the connection
53 nextID uint32 // the next stream ID to be used
54
55 // writableChan synchronizes write access to the transport.
56 // A writer acquires the write lock by sending a value on writableChan
57 // and releases it by receiving from writableChan.
58 writableChan chan int
59 // shutdownChan is closed when Close is called.
60 // Blocking operations should select on shutdownChan to avoid
61 // blocking forever after Close.
62 // TODO(zhaoq): Maybe have a channel context?
63 shutdownChan chan struct{}
64 // errorChan is closed to notify the I/O error to the caller.
65 errorChan chan struct{}
66 // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
67 // that the server sent GoAway on this transport.
68 goAway chan struct{}
69 // awakenKeepalive is used to wake up keepalive when after it has gone dormant.
70 awakenKeepalive chan struct{}
71
72 framer *framer
73 hBuf *bytes.Buffer // the buffer for HPACK encoding
74 hEnc *hpack.Encoder // HPACK encoder
75
76 // controlBuf delivers all the control related tasks (e.g., window
77 // updates, reset streams, and various settings) to the controller.
78 controlBuf *controlBuffer
79 fc *inFlow
80 // sendQuotaPool provides flow control to outbound message.
81 sendQuotaPool *quotaPool
82 // streamsQuota limits the max number of concurrent streams.
83 streamsQuota *quotaPool
84
85 // The scheme used: https if TLS is on, http otherwise.
86 scheme string
87
88 isSecure bool
89
90 creds []credentials.PerRPCCredentials
91
92 // Boolean to keep track of reading activity on transport.
93 // 1 is true and 0 is false.
94 activity uint32 // Accessed atomically.
95 kp keepalive.ClientParameters
96
97 statsHandler stats.Handler
98
99 initialWindowSize int32
100
101 bdpEst *bdpEstimator
102 outQuotaVersion uint32
103
104 mu sync.Mutex // guard the following variables
105 state transportState // the state of underlying connection
106 activeStreams map[uint32]*Stream
107 // The max number of concurrent streams
108 maxStreams int
109 // the per-stream outbound flow control window size set by the peer.
110 streamSendQuota uint32
111 // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
112 prevGoAwayID uint32
113 // goAwayReason records the http2.ErrCode and debug data received with the
114 // GoAway frame.
115 goAwayReason GoAwayReason
116}
117
118func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
119 if fn != nil {
120 return fn(ctx, addr)
121 }
122 return dialContext(ctx, "tcp", addr)
123}
124
125func isTemporary(err error) bool {
126 switch err {
127 case io.EOF:
128 // Connection closures may be resolved upon retry, and are thus
129 // treated as temporary.
130 return true
131 case context.DeadlineExceeded:
132 // In Go 1.7, context.DeadlineExceeded implements Timeout(), and this
133 // special case is not needed. Until then, we need to keep this
134 // clause.
135 return true
136 }
137
138 switch err := err.(type) {
139 case interface {
140 Temporary() bool
141 }:
142 return err.Temporary()
143 case interface {
144 Timeout() bool
145 }:
146 // Timeouts may be resolved upon retry, and are thus treated as
147 // temporary.
148 return err.Timeout()
149 }
150 return false
151}
152
153// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
154// and starts to receive messages on it. Non-nil error returns if construction
155// fails.
156func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (_ ClientTransport, err error) {
157 scheme := "http"
158 conn, err := dial(ctx, opts.Dialer, addr.Addr)
159 if err != nil {
160 if opts.FailOnNonTempDialError {
161 return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
162 }
163 return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
164 }
165 // Any further errors will close the underlying connection
166 defer func(conn net.Conn) {
167 if err != nil {
168 conn.Close()
169 }
170 }(conn)
171 var (
172 isSecure bool
173 authInfo credentials.AuthInfo
174 )
175 if creds := opts.TransportCredentials; creds != nil {
176 scheme = "https"
177 conn, authInfo, err = creds.ClientHandshake(ctx, addr.Addr, conn)
178 if err != nil {
179 // Credentials handshake errors are typically considered permanent
180 // to avoid retrying on e.g. bad certificates.
181 temp := isTemporary(err)
182 return nil, connectionErrorf(temp, err, "transport: authentication handshake failed: %v", err)
183 }
184 isSecure = true
185 }
186 kp := opts.KeepaliveParams
187 // Validate keepalive parameters.
188 if kp.Time == 0 {
189 kp.Time = defaultClientKeepaliveTime
190 }
191 if kp.Timeout == 0 {
192 kp.Timeout = defaultClientKeepaliveTimeout
193 }
194 dynamicWindow := true
195 icwz := int32(initialWindowSize)
196 if opts.InitialConnWindowSize >= defaultWindowSize {
197 icwz = opts.InitialConnWindowSize
198 dynamicWindow = false
199 }
200 var buf bytes.Buffer
201 t := &http2Client{
202 ctx: ctx,
203 target: addr.Addr,
204 userAgent: opts.UserAgent,
205 md: addr.Metadata,
206 conn: conn,
207 remoteAddr: conn.RemoteAddr(),
208 localAddr: conn.LocalAddr(),
209 authInfo: authInfo,
210 // The client initiated stream id is odd starting from 1.
211 nextID: 1,
212 writableChan: make(chan int, 1),
213 shutdownChan: make(chan struct{}),
214 errorChan: make(chan struct{}),
215 goAway: make(chan struct{}),
216 awakenKeepalive: make(chan struct{}, 1),
217 framer: newFramer(conn),
218 hBuf: &buf,
219 hEnc: hpack.NewEncoder(&buf),
220 controlBuf: newControlBuffer(),
221 fc: &inFlow{limit: uint32(icwz)},
222 sendQuotaPool: newQuotaPool(defaultWindowSize),
223 scheme: scheme,
224 state: reachable,
225 activeStreams: make(map[uint32]*Stream),
226 isSecure: isSecure,
227 creds: opts.PerRPCCredentials,
228 maxStreams: defaultMaxStreamsClient,
229 streamsQuota: newQuotaPool(defaultMaxStreamsClient),
230 streamSendQuota: defaultWindowSize,
231 kp: kp,
232 statsHandler: opts.StatsHandler,
233 initialWindowSize: initialWindowSize,
234 }
235 if opts.InitialWindowSize >= defaultWindowSize {
236 t.initialWindowSize = opts.InitialWindowSize
237 dynamicWindow = false
238 }
239 if dynamicWindow {
240 t.bdpEst = &bdpEstimator{
241 bdp: initialWindowSize,
242 updateFlowControl: t.updateFlowControl,
243 }
244 }
245 // Make sure awakenKeepalive can't be written upon.
246 // keepalive routine will make it writable, if need be.
247 t.awakenKeepalive <- struct{}{}
248 if t.statsHandler != nil {
249 t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
250 RemoteAddr: t.remoteAddr,
251 LocalAddr: t.localAddr,
252 })
253 connBegin := &stats.ConnBegin{
254 Client: true,
255 }
256 t.statsHandler.HandleConn(t.ctx, connBegin)
257 }
258 // Start the reader goroutine for incoming message. Each transport has
259 // a dedicated goroutine which reads HTTP2 frame from network. Then it
260 // dispatches the frame to the corresponding stream entity.
261 go t.reader()
262 // Send connection preface to server.
263 n, err := t.conn.Write(clientPreface)
264 if err != nil {
265 t.Close()
266 return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
267 }
268 if n != len(clientPreface) {
269 t.Close()
270 return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
271 }
272 if t.initialWindowSize != defaultWindowSize {
273 err = t.framer.writeSettings(true, http2.Setting{
274 ID: http2.SettingInitialWindowSize,
275 Val: uint32(t.initialWindowSize),
276 })
277 } else {
278 err = t.framer.writeSettings(true)
279 }
280 if err != nil {
281 t.Close()
282 return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
283 }
284 // Adjust the connection flow control window if needed.
285 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
286 if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
287 t.Close()
288 return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
289 }
290 }
291 go t.controller()
292 if t.kp.Time != infinity {
293 go t.keepalive()
294 }
295 t.writableChan <- 0
296 return t, nil
297}
298
299func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
300 // TODO(zhaoq): Handle uint32 overflow of Stream.id.
301 s := &Stream{
302 id: t.nextID,
303 done: make(chan struct{}),
304 goAway: make(chan struct{}),
305 method: callHdr.Method,
306 sendCompress: callHdr.SendCompress,
307 buf: newRecvBuffer(),
308 fc: &inFlow{limit: uint32(t.initialWindowSize)},
309 sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
310 headerChan: make(chan struct{}),
311 }
312 t.nextID += 2
313 s.requestRead = func(n int) {
314 t.adjustWindow(s, uint32(n))
315 }
316 // The client side stream context should have exactly the same life cycle with the user provided context.
317 // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
318 // So we use the original context here instead of creating a copy.
319 s.ctx = ctx
320 s.trReader = &transportReader{
321 reader: &recvBufferReader{
322 ctx: s.ctx,
323 goAway: s.goAway,
324 recv: s.buf,
325 },
326 windowHandler: func(n int) {
327 t.updateWindow(s, uint32(n))
328 },
329 }
330
331 return s
332}
333
334// NewStream creates a stream and registers it into the transport as "active"
335// streams.
336func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
337 pr := &peer.Peer{
338 Addr: t.remoteAddr,
339 }
340 // Attach Auth info if there is any.
341 if t.authInfo != nil {
342 pr.AuthInfo = t.authInfo
343 }
344 ctx = peer.NewContext(ctx, pr)
345 var (
346 authData = make(map[string]string)
347 audience string
348 )
349 // Create an audience string only if needed.
350 if len(t.creds) > 0 || callHdr.Creds != nil {
351 // Construct URI required to get auth request metadata.
352 var port string
353 if pos := strings.LastIndex(t.target, ":"); pos != -1 {
354 // Omit port if it is the default one.
355 if t.target[pos+1:] != "443" {
356 port = ":" + t.target[pos+1:]
357 }
358 }
359 pos := strings.LastIndex(callHdr.Method, "/")
360 if pos == -1 {
361 pos = len(callHdr.Method)
362 }
363 audience = "https://" + callHdr.Host + port + callHdr.Method[:pos]
364 }
365 for _, c := range t.creds {
366 data, err := c.GetRequestMetadata(ctx, audience)
367 if err != nil {
368 return nil, streamErrorf(codes.Internal, "transport: %v", err)
369 }
370 for k, v := range data {
371 // Capital header names are illegal in HTTP/2.
372 k = strings.ToLower(k)
373 authData[k] = v
374 }
375 }
376 callAuthData := make(map[string]string)
377 // Check if credentials.PerRPCCredentials were provided via call options.
378 // Note: if these credentials are provided both via dial options and call
379 // options, then both sets of credentials will be applied.
380 if callCreds := callHdr.Creds; callCreds != nil {
381 if !t.isSecure && callCreds.RequireTransportSecurity() {
382 return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure conneciton")
383 }
384 data, err := callCreds.GetRequestMetadata(ctx, audience)
385 if err != nil {
386 return nil, streamErrorf(codes.Internal, "transport: %v", err)
387 }
388 for k, v := range data {
389 // Capital header names are illegal in HTTP/2
390 k = strings.ToLower(k)
391 callAuthData[k] = v
392 }
393 }
394 t.mu.Lock()
395 if t.activeStreams == nil {
396 t.mu.Unlock()
397 return nil, ErrConnClosing
398 }
399 if t.state == draining {
400 t.mu.Unlock()
401 return nil, ErrStreamDrain
402 }
403 if t.state != reachable {
404 t.mu.Unlock()
405 return nil, ErrConnClosing
406 }
407 t.mu.Unlock()
408 sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
409 if err != nil {
410 return nil, err
411 }
412 // Returns the quota balance back.
413 if sq > 1 {
414 t.streamsQuota.add(sq - 1)
415 }
416 if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
417 // Return the quota back now because there is no stream returned to the caller.
418 if _, ok := err.(StreamError); ok {
419 t.streamsQuota.add(1)
420 }
421 return nil, err
422 }
423 t.mu.Lock()
424 if t.state == draining {
425 t.mu.Unlock()
426 t.streamsQuota.add(1)
427 // Need to make t writable again so that the rpc in flight can still proceed.
428 t.writableChan <- 0
429 return nil, ErrStreamDrain
430 }
431 if t.state != reachable {
432 t.mu.Unlock()
433 return nil, ErrConnClosing
434 }
435 s := t.newStream(ctx, callHdr)
436 t.activeStreams[s.id] = s
437 // If the number of active streams change from 0 to 1, then check if keepalive
438 // has gone dormant. If so, wake it up.
439 if len(t.activeStreams) == 1 {
440 select {
441 case t.awakenKeepalive <- struct{}{}:
442 t.framer.writePing(false, false, [8]byte{})
443 default:
444 }
445 }
446
447 t.mu.Unlock()
448
449 // HPACK encodes various headers. Note that once WriteField(...) is
450 // called, the corresponding headers/continuation frame has to be sent
451 // because hpack.Encoder is stateful.
452 t.hBuf.Reset()
453 t.hEnc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"})
454 t.hEnc.WriteField(hpack.HeaderField{Name: ":scheme", Value: t.scheme})
455 t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method})
456 t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
457 t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
458 t.hEnc.WriteField(hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
459 t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"})
460
461 if callHdr.SendCompress != "" {
462 t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
463 }
464 if dl, ok := ctx.Deadline(); ok {
465 // Send out timeout regardless its value. The server can detect timeout context by itself.
466 timeout := dl.Sub(time.Now())
467 t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
468 }
469
470 for k, v := range authData {
471 t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
472 }
473 for k, v := range callAuthData {
474 t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
475 }
476 var (
477 endHeaders bool
478 )
479 if md, ok := metadata.FromOutgoingContext(ctx); ok {
480 for k, vv := range md {
481 // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
482 if isReservedHeader(k) {
483 continue
484 }
485 for _, v := range vv {
486 t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
487 }
488 }
489 }
490 if md, ok := t.md.(*metadata.MD); ok {
491 for k, vv := range *md {
492 if isReservedHeader(k) {
493 continue
494 }
495 for _, v := range vv {
496 t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
497 }
498 }
499 }
500 first := true
501 bufLen := t.hBuf.Len()
502 // Sends the headers in a single batch even when they span multiple frames.
503 for !endHeaders {
504 size := t.hBuf.Len()
505 if size > http2MaxFrameLen {
506 size = http2MaxFrameLen
507 } else {
508 endHeaders = true
509 }
510 var flush bool
511 if callHdr.Flush && endHeaders {
512 flush = true
513 }
514 if first {
515 // Sends a HeadersFrame to server to start a new stream.
516 p := http2.HeadersFrameParam{
517 StreamID: s.id,
518 BlockFragment: t.hBuf.Next(size),
519 EndStream: false,
520 EndHeaders: endHeaders,
521 }
522 // Do a force flush for the buffered frames iff it is the last headers frame
523 // and there is header metadata to be sent. Otherwise, there is flushing until
524 // the corresponding data frame is written.
525 err = t.framer.writeHeaders(flush, p)
526 first = false
527 } else {
528 // Sends Continuation frames for the leftover headers.
529 err = t.framer.writeContinuation(flush, s.id, endHeaders, t.hBuf.Next(size))
530 }
531 if err != nil {
532 t.notifyError(err)
533 return nil, connectionErrorf(true, err, "transport: %v", err)
534 }
535 }
536 s.mu.Lock()
537 s.bytesSent = true
538 s.mu.Unlock()
539
540 if t.statsHandler != nil {
541 outHeader := &stats.OutHeader{
542 Client: true,
543 WireLength: bufLen,
544 FullMethod: callHdr.Method,
545 RemoteAddr: t.remoteAddr,
546 LocalAddr: t.localAddr,
547 Compression: callHdr.SendCompress,
548 }
549 t.statsHandler.HandleRPC(s.ctx, outHeader)
550 }
551 t.writableChan <- 0
552 return s, nil
553}
554
555// CloseStream clears the footprint of a stream when the stream is not needed any more.
556// This must not be executed in reader's goroutine.
557func (t *http2Client) CloseStream(s *Stream, err error) {
558 t.mu.Lock()
559 if t.activeStreams == nil {
560 t.mu.Unlock()
561 return
562 }
563 if err != nil {
564 // notify in-flight streams, before the deletion
565 s.write(recvMsg{err: err})
566 }
567 delete(t.activeStreams, s.id)
568 if t.state == draining && len(t.activeStreams) == 0 {
569 // The transport is draining and s is the last live stream on t.
570 t.mu.Unlock()
571 t.Close()
572 return
573 }
574 t.mu.Unlock()
575 // rstStream is true in case the stream is being closed at the client-side
576 // and the server needs to be intimated about it by sending a RST_STREAM
577 // frame.
578 // To make sure this frame is written to the wire before the headers of the
579 // next stream waiting for streamsQuota, we add to streamsQuota pool only
580 // after having acquired the writableChan to send RST_STREAM out (look at
581 // the controller() routine).
582 var rstStream bool
583 var rstError http2.ErrCode
584 defer func() {
585 // In case, the client doesn't have to send RST_STREAM to server
586 // we can safely add back to streamsQuota pool now.
587 if !rstStream {
588 t.streamsQuota.add(1)
589 return
590 }
591 t.controlBuf.put(&resetStream{s.id, rstError})
592 }()
593 s.mu.Lock()
594 rstStream = s.rstStream
595 rstError = s.rstError
596 if s.state == streamDone {
597 s.mu.Unlock()
598 return
599 }
600 if !s.headerDone {
601 close(s.headerChan)
602 s.headerDone = true
603 }
604 s.state = streamDone
605 s.mu.Unlock()
606 if _, ok := err.(StreamError); ok {
607 rstStream = true
608 rstError = http2.ErrCodeCancel
609 }
610}
611
612// Close kicks off the shutdown process of the transport. This should be called
613// only once on a transport. Once it is called, the transport should not be
614// accessed any more.
615func (t *http2Client) Close() (err error) {
616 t.mu.Lock()
617 if t.state == closing {
618 t.mu.Unlock()
619 return
620 }
621 if t.state == reachable || t.state == draining {
622 close(t.errorChan)
623 }
624 t.state = closing
625 t.mu.Unlock()
626 close(t.shutdownChan)
627 err = t.conn.Close()
628 t.mu.Lock()
629 streams := t.activeStreams
630 t.activeStreams = nil
631 t.mu.Unlock()
632 // Notify all active streams.
633 for _, s := range streams {
634 s.mu.Lock()
635 if !s.headerDone {
636 close(s.headerChan)
637 s.headerDone = true
638 }
639 s.mu.Unlock()
640 s.write(recvMsg{err: ErrConnClosing})
641 }
642 if t.statsHandler != nil {
643 connEnd := &stats.ConnEnd{
644 Client: true,
645 }
646 t.statsHandler.HandleConn(t.ctx, connEnd)
647 }
648 return
649}
650
651func (t *http2Client) GracefulClose() error {
652 t.mu.Lock()
653 switch t.state {
654 case unreachable:
655 // The server may close the connection concurrently. t is not available for
656 // any streams. Close it now.
657 t.mu.Unlock()
658 t.Close()
659 return nil
660 case closing:
661 t.mu.Unlock()
662 return nil
663 }
664 if t.state == draining {
665 t.mu.Unlock()
666 return nil
667 }
668 t.state = draining
669 active := len(t.activeStreams)
670 t.mu.Unlock()
671 if active == 0 {
672 return t.Close()
673 }
674 return nil
675}
676
677// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
678// should proceed only if Write returns nil.
679// TODO(zhaoq): opts.Delay is ignored in this implementation. Support it later
680// if it improves the performance.
681func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
682 r := bytes.NewBuffer(data)
683 var (
684 p []byte
685 oqv uint32
686 )
687 for {
688 oqv = atomic.LoadUint32(&t.outQuotaVersion)
689 if r.Len() > 0 || p != nil {
690 size := http2MaxFrameLen
691 // Wait until the stream has some quota to send the data.
692 sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
693 if err != nil {
694 return err
695 }
696 // Wait until the transport has some quota to send the data.
697 tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
698 if err != nil {
699 return err
700 }
701 if sq < size {
702 size = sq
703 }
704 if tq < size {
705 size = tq
706 }
707 if p == nil {
708 p = r.Next(size)
709 }
710 ps := len(p)
711 if ps < sq {
712 // Overbooked stream quota. Return it back.
713 s.sendQuotaPool.add(sq - ps)
714 }
715 if ps < tq {
716 // Overbooked transport quota. Return it back.
717 t.sendQuotaPool.add(tq - ps)
718 }
719 }
720 var (
721 endStream bool
722 forceFlush bool
723 )
724 if opts.Last && r.Len() == 0 {
725 endStream = true
726 }
727 // Indicate there is a writer who is about to write a data frame.
728 t.framer.adjustNumWriters(1)
729 // Got some quota. Try to acquire writing privilege on the transport.
730 if _, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.writableChan); err != nil {
731 if _, ok := err.(StreamError); ok || err == io.EOF {
732 // Return the connection quota back.
733 t.sendQuotaPool.add(len(p))
734 }
735 if t.framer.adjustNumWriters(-1) == 0 {
736 // This writer is the last one in this batch and has the
737 // responsibility to flush the buffered frames. It queues
738 // a flush request to controlBuf instead of flushing directly
739 // in order to avoid the race with other writing or flushing.
740 t.controlBuf.put(&flushIO{})
741 }
742 return err
743 }
744 select {
745 case <-s.ctx.Done():
746 t.sendQuotaPool.add(len(p))
747 if t.framer.adjustNumWriters(-1) == 0 {
748 t.controlBuf.put(&flushIO{})
749 }
750 t.writableChan <- 0
751 return ContextErr(s.ctx.Err())
752 default:
753 }
754 if oqv != atomic.LoadUint32(&t.outQuotaVersion) {
755 // InitialWindowSize settings frame must have been received after we
756 // acquired send quota but before we got the writable channel.
757 // We must forsake this write.
758 t.sendQuotaPool.add(len(p))
759 s.sendQuotaPool.add(len(p))
760 if t.framer.adjustNumWriters(-1) == 0 {
761 t.controlBuf.put(&flushIO{})
762 }
763 t.writableChan <- 0
764 continue
765 }
766 if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 {
767 // Do a force flush iff this is last frame for the entire gRPC message
768 // and the caller is the only writer at this moment.
769 forceFlush = true
770 }
771 // If WriteData fails, all the pending streams will be handled
772 // by http2Client.Close(). No explicit CloseStream() needs to be
773 // invoked.
774 if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil {
775 t.notifyError(err)
776 return connectionErrorf(true, err, "transport: %v", err)
777 }
778 p = nil
779 if t.framer.adjustNumWriters(-1) == 0 {
780 t.framer.flushWrite()
781 }
782 t.writableChan <- 0
783 if r.Len() == 0 {
784 break
785 }
786 }
787 if !opts.Last {
788 return nil
789 }
790 s.mu.Lock()
791 if s.state != streamDone {
792 s.state = streamWriteDone
793 }
794 s.mu.Unlock()
795 return nil
796}
797
798func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
799 t.mu.Lock()
800 defer t.mu.Unlock()
801 s, ok := t.activeStreams[f.Header().StreamID]
802 return s, ok
803}
804
805// adjustWindow sends out extra window update over the initial window size
806// of stream if the application is requesting data larger in size than
807// the window.
808func (t *http2Client) adjustWindow(s *Stream, n uint32) {
809 s.mu.Lock()
810 defer s.mu.Unlock()
811 if s.state == streamDone {
812 return
813 }
814 if w := s.fc.maybeAdjust(n); w > 0 {
815 // Piggyback conneciton's window update along.
816 if cw := t.fc.resetPendingUpdate(); cw > 0 {
817 t.controlBuf.put(&windowUpdate{0, cw, false})
818 }
819 t.controlBuf.put(&windowUpdate{s.id, w, true})
820 }
821}
822
823// updateWindow adjusts the inbound quota for the stream and the transport.
824// Window updates will deliver to the controller for sending when
825// the cumulative quota exceeds the corresponding threshold.
826func (t *http2Client) updateWindow(s *Stream, n uint32) {
827 s.mu.Lock()
828 defer s.mu.Unlock()
829 if s.state == streamDone {
830 return
831 }
832 if w := s.fc.onRead(n); w > 0 {
833 if cw := t.fc.resetPendingUpdate(); cw > 0 {
834 t.controlBuf.put(&windowUpdate{0, cw, false})
835 }
836 t.controlBuf.put(&windowUpdate{s.id, w, true})
837 }
838}
839
840// updateFlowControl updates the incoming flow control windows
841// for the transport and the stream based on the current bdp
842// estimation.
843func (t *http2Client) updateFlowControl(n uint32) {
844 t.mu.Lock()
845 for _, s := range t.activeStreams {
846 s.fc.newLimit(n)
847 }
848 t.initialWindowSize = int32(n)
849 t.mu.Unlock()
850 t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false})
851 t.controlBuf.put(&settings{
852 ack: false,
853 ss: []http2.Setting{
854 {
855 ID: http2.SettingInitialWindowSize,
856 Val: uint32(n),
857 },
858 },
859 })
860}
861
862func (t *http2Client) handleData(f *http2.DataFrame) {
863 size := f.Header().Length
864 var sendBDPPing bool
865 if t.bdpEst != nil {
866 sendBDPPing = t.bdpEst.add(uint32(size))
867 }
868 // Decouple connection's flow control from application's read.
869 // An update on connection's flow control should not depend on
870 // whether user application has read the data or not. Such a
871 // restriction is already imposed on the stream's flow control,
872 // and therefore the sender will be blocked anyways.
873 // Decoupling the connection flow control will prevent other
874 // active(fast) streams from starving in presence of slow or
875 // inactive streams.
876 //
877 // Furthermore, if a bdpPing is being sent out we can piggyback
878 // connection's window update for the bytes we just received.
879 if sendBDPPing {
880 t.controlBuf.put(&windowUpdate{0, uint32(size), false})
881 t.controlBuf.put(bdpPing)
882 } else {
883 if err := t.fc.onData(uint32(size)); err != nil {
884 t.notifyError(connectionErrorf(true, err, "%v", err))
885 return
886 }
887 if w := t.fc.onRead(uint32(size)); w > 0 {
888 t.controlBuf.put(&windowUpdate{0, w, true})
889 }
890 }
891 // Select the right stream to dispatch.
892 s, ok := t.getStream(f)
893 if !ok {
894 return
895 }
896 if size > 0 {
897 s.mu.Lock()
898 if s.state == streamDone {
899 s.mu.Unlock()
900 return
901 }
902 if err := s.fc.onData(uint32(size)); err != nil {
903 s.rstStream = true
904 s.rstError = http2.ErrCodeFlowControl
905 s.finish(status.New(codes.Internal, err.Error()))
906 s.mu.Unlock()
907 s.write(recvMsg{err: io.EOF})
908 return
909 }
910 if f.Header().Flags.Has(http2.FlagDataPadded) {
911 if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
912 t.controlBuf.put(&windowUpdate{s.id, w, true})
913 }
914 }
915 s.mu.Unlock()
916 // TODO(bradfitz, zhaoq): A copy is required here because there is no
917 // guarantee f.Data() is consumed before the arrival of next frame.
918 // Can this copy be eliminated?
919 if len(f.Data()) > 0 {
920 data := make([]byte, len(f.Data()))
921 copy(data, f.Data())
922 s.write(recvMsg{data: data})
923 }
924 }
925 // The server has closed the stream without sending trailers. Record that
926 // the read direction is closed, and set the status appropriately.
927 if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
928 s.mu.Lock()
929 if s.state == streamDone {
930 s.mu.Unlock()
931 return
932 }
933 s.finish(status.New(codes.Internal, "server closed the stream without sending trailers"))
934 s.mu.Unlock()
935 s.write(recvMsg{err: io.EOF})
936 }
937}
938
939func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
940 s, ok := t.getStream(f)
941 if !ok {
942 return
943 }
944 s.mu.Lock()
945 if s.state == streamDone {
946 s.mu.Unlock()
947 return
948 }
949 if !s.headerDone {
950 close(s.headerChan)
951 s.headerDone = true
952 }
953 statusCode, ok := http2ErrConvTab[http2.ErrCode(f.ErrCode)]
954 if !ok {
955 warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
956 statusCode = codes.Unknown
957 }
958 s.finish(status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %d", f.ErrCode))
959 s.mu.Unlock()
960 s.write(recvMsg{err: io.EOF})
961}
962
963func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
964 if f.IsAck() {
965 return
966 }
967 var ss []http2.Setting
968 f.ForeachSetting(func(s http2.Setting) error {
969 ss = append(ss, s)
970 return nil
971 })
972 // The settings will be applied once the ack is sent.
973 t.controlBuf.put(&settings{ack: true, ss: ss})
974}
975
976func (t *http2Client) handlePing(f *http2.PingFrame) {
977 if f.IsAck() {
978 // Maybe it's a BDP ping.
979 if t.bdpEst != nil {
980 t.bdpEst.calculate(f.Data)
981 }
982 return
983 }
984 pingAck := &ping{ack: true}
985 copy(pingAck.data[:], f.Data[:])
986 t.controlBuf.put(pingAck)
987}
988
989func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
990 t.mu.Lock()
991 if t.state != reachable && t.state != draining {
992 t.mu.Unlock()
993 return
994 }
995 if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
996 infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
997 }
998 id := f.LastStreamID
999 if id > 0 && id%2 != 1 {
1000 t.mu.Unlock()
1001 t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID))
1002 return
1003 }
1004 // A client can recieve multiple GoAways from server (look at https://github.com/grpc/grpc-go/issues/1387).
1005 // The idea is that the first GoAway will be sent with an ID of MaxInt32 and the second GoAway will be sent after an RTT delay
1006 // with the ID of the last stream the server will process.
1007 // Therefore, when we get the first GoAway we don't really close any streams. While in case of second GoAway we
1008 // close all streams created after the second GoAwayId. This way streams that were in-flight while the GoAway from server
1009 // was being sent don't get killed.
1010 select {
1011 case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
1012 // If there are multiple GoAways the first one should always have an ID greater than the following ones.
1013 if id > t.prevGoAwayID {
1014 t.mu.Unlock()
1015 t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
1016 return
1017 }
1018 default:
1019 t.setGoAwayReason(f)
1020 close(t.goAway)
1021 t.state = draining
1022 }
1023 // All streams with IDs greater than the GoAwayId
1024 // and smaller than the previous GoAway ID should be killed.
1025 upperLimit := t.prevGoAwayID
1026 if upperLimit == 0 { // This is the first GoAway Frame.
1027 upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
1028 }
1029 for streamID, stream := range t.activeStreams {
1030 if streamID > id && streamID <= upperLimit {
1031 close(stream.goAway)
1032 }
1033 }
1034 t.prevGoAwayID = id
1035 active := len(t.activeStreams)
1036 t.mu.Unlock()
1037 if active == 0 {
1038 t.Close()
1039 }
1040}
1041
1042// setGoAwayReason sets the value of t.goAwayReason based
1043// on the GoAway frame received.
1044// It expects a lock on transport's mutext to be held by
1045// the caller.
1046func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
1047 t.goAwayReason = NoReason
1048 switch f.ErrCode {
1049 case http2.ErrCodeEnhanceYourCalm:
1050 if string(f.DebugData()) == "too_many_pings" {
1051 t.goAwayReason = TooManyPings
1052 }
1053 }
1054}
1055
1056func (t *http2Client) GetGoAwayReason() GoAwayReason {
1057 t.mu.Lock()
1058 defer t.mu.Unlock()
1059 return t.goAwayReason
1060}
1061
1062func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
1063 id := f.Header().StreamID
1064 incr := f.Increment
1065 if id == 0 {
1066 t.sendQuotaPool.add(int(incr))
1067 return
1068 }
1069 if s, ok := t.getStream(f); ok {
1070 s.sendQuotaPool.add(int(incr))
1071 }
1072}
1073
1074// operateHeaders takes action on the decoded headers.
1075func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
1076 s, ok := t.getStream(frame)
1077 if !ok {
1078 return
1079 }
1080 s.mu.Lock()
1081 s.bytesReceived = true
1082 s.mu.Unlock()
1083 var state decodeState
1084 if err := state.decodeResponseHeader(frame); err != nil {
1085 s.mu.Lock()
1086 if !s.headerDone {
1087 close(s.headerChan)
1088 s.headerDone = true
1089 }
1090 s.mu.Unlock()
1091 s.write(recvMsg{err: err})
1092 // Something wrong. Stops reading even when there is remaining.
1093 return
1094 }
1095
1096 endStream := frame.StreamEnded()
1097 var isHeader bool
1098 defer func() {
1099 if t.statsHandler != nil {
1100 if isHeader {
1101 inHeader := &stats.InHeader{
1102 Client: true,
1103 WireLength: int(frame.Header().Length),
1104 }
1105 t.statsHandler.HandleRPC(s.ctx, inHeader)
1106 } else {
1107 inTrailer := &stats.InTrailer{
1108 Client: true,
1109 WireLength: int(frame.Header().Length),
1110 }
1111 t.statsHandler.HandleRPC(s.ctx, inTrailer)
1112 }
1113 }
1114 }()
1115
1116 s.mu.Lock()
1117 if !endStream {
1118 s.recvCompress = state.encoding
1119 }
1120 if !s.headerDone {
1121 if !endStream && len(state.mdata) > 0 {
1122 s.header = state.mdata
1123 }
1124 close(s.headerChan)
1125 s.headerDone = true
1126 isHeader = true
1127 }
1128 if !endStream || s.state == streamDone {
1129 s.mu.Unlock()
1130 return
1131 }
1132
1133 if len(state.mdata) > 0 {
1134 s.trailer = state.mdata
1135 }
1136 s.finish(state.status())
1137 s.mu.Unlock()
1138 s.write(recvMsg{err: io.EOF})
1139}
1140
1141func handleMalformedHTTP2(s *Stream, err error) {
1142 s.mu.Lock()
1143 if !s.headerDone {
1144 close(s.headerChan)
1145 s.headerDone = true
1146 }
1147 s.mu.Unlock()
1148 s.write(recvMsg{err: err})
1149}
1150
1151// reader runs as a separate goroutine in charge of reading data from network
1152// connection.
1153//
1154// TODO(zhaoq): currently one reader per transport. Investigate whether this is
1155// optimal.
1156// TODO(zhaoq): Check the validity of the incoming frame sequence.
1157func (t *http2Client) reader() {
1158 // Check the validity of server preface.
1159 frame, err := t.framer.readFrame()
1160 if err != nil {
1161 t.notifyError(err)
1162 return
1163 }
1164 atomic.CompareAndSwapUint32(&t.activity, 0, 1)
1165 sf, ok := frame.(*http2.SettingsFrame)
1166 if !ok {
1167 t.notifyError(err)
1168 return
1169 }
1170 t.handleSettings(sf)
1171
1172 // loop to keep reading incoming messages on this transport.
1173 for {
1174 frame, err := t.framer.readFrame()
1175 atomic.CompareAndSwapUint32(&t.activity, 0, 1)
1176 if err != nil {
1177 // Abort an active stream if the http2.Framer returns a
1178 // http2.StreamError. This can happen only if the server's response
1179 // is malformed http2.
1180 if se, ok := err.(http2.StreamError); ok {
1181 t.mu.Lock()
1182 s := t.activeStreams[se.StreamID]
1183 t.mu.Unlock()
1184 if s != nil {
1185 // use error detail to provide better err message
1186 handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail()))
1187 }
1188 continue
1189 } else {
1190 // Transport error.
1191 t.notifyError(err)
1192 return
1193 }
1194 }
1195 switch frame := frame.(type) {
1196 case *http2.MetaHeadersFrame:
1197 t.operateHeaders(frame)
1198 case *http2.DataFrame:
1199 t.handleData(frame)
1200 case *http2.RSTStreamFrame:
1201 t.handleRSTStream(frame)
1202 case *http2.SettingsFrame:
1203 t.handleSettings(frame)
1204 case *http2.PingFrame:
1205 t.handlePing(frame)
1206 case *http2.GoAwayFrame:
1207 t.handleGoAway(frame)
1208 case *http2.WindowUpdateFrame:
1209 t.handleWindowUpdate(frame)
1210 default:
1211 errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
1212 }
1213 }
1214}
1215
1216func (t *http2Client) applySettings(ss []http2.Setting) {
1217 for _, s := range ss {
1218 switch s.ID {
1219 case http2.SettingMaxConcurrentStreams:
1220 // TODO(zhaoq): This is a hack to avoid significant refactoring of the
1221 // code to deal with the unrealistic int32 overflow. Probably will try
1222 // to find a better way to handle this later.
1223 if s.Val > math.MaxInt32 {
1224 s.Val = math.MaxInt32
1225 }
1226 t.mu.Lock()
1227 ms := t.maxStreams
1228 t.maxStreams = int(s.Val)
1229 t.mu.Unlock()
1230 t.streamsQuota.add(int(s.Val) - ms)
1231 case http2.SettingInitialWindowSize:
1232 t.mu.Lock()
1233 for _, stream := range t.activeStreams {
1234 // Adjust the sending quota for each stream.
1235 stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota))
1236 }
1237 t.streamSendQuota = s.Val
1238 t.mu.Unlock()
1239 atomic.AddUint32(&t.outQuotaVersion, 1)
1240 }
1241 }
1242}
1243
1244// controller running in a separate goroutine takes charge of sending control
1245// frames (e.g., window update, reset stream, setting, etc.) to the server.
1246func (t *http2Client) controller() {
1247 for {
1248 select {
1249 case i := <-t.controlBuf.get():
1250 t.controlBuf.load()
1251 select {
1252 case <-t.writableChan:
1253 switch i := i.(type) {
1254 case *windowUpdate:
1255 t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment)
1256 case *settings:
1257 if i.ack {
1258 t.framer.writeSettingsAck(true)
1259 t.applySettings(i.ss)
1260 } else {
1261 t.framer.writeSettings(true, i.ss...)
1262 }
1263 case *resetStream:
1264 // If the server needs to be to intimated about stream closing,
1265 // then we need to make sure the RST_STREAM frame is written to
1266 // the wire before the headers of the next stream waiting on
1267 // streamQuota. We ensure this by adding to the streamsQuota pool
1268 // only after having acquired the writableChan to send RST_STREAM.
1269 t.streamsQuota.add(1)
1270 t.framer.writeRSTStream(true, i.streamID, i.code)
1271 case *flushIO:
1272 t.framer.flushWrite()
1273 case *ping:
1274 if !i.ack {
1275 t.bdpEst.timesnap(i.data)
1276 }
1277 t.framer.writePing(true, i.ack, i.data)
1278 default:
1279 errorf("transport: http2Client.controller got unexpected item type %v\n", i)
1280 }
1281 t.writableChan <- 0
1282 continue
1283 case <-t.shutdownChan:
1284 return
1285 }
1286 case <-t.shutdownChan:
1287 return
1288 }
1289 }
1290}
1291
1292// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
1293func (t *http2Client) keepalive() {
1294 p := &ping{data: [8]byte{}}
1295 timer := time.NewTimer(t.kp.Time)
1296 for {
1297 select {
1298 case <-timer.C:
1299 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
1300 timer.Reset(t.kp.Time)
1301 continue
1302 }
1303 // Check if keepalive should go dormant.
1304 t.mu.Lock()
1305 if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
1306 // Make awakenKeepalive writable.
1307 <-t.awakenKeepalive
1308 t.mu.Unlock()
1309 select {
1310 case <-t.awakenKeepalive:
1311 // If the control gets here a ping has been sent
1312 // need to reset the timer with keepalive.Timeout.
1313 case <-t.shutdownChan:
1314 return
1315 }
1316 } else {
1317 t.mu.Unlock()
1318 // Send ping.
1319 t.controlBuf.put(p)
1320 }
1321
1322 // By the time control gets here a ping has been sent one way or the other.
1323 timer.Reset(t.kp.Timeout)
1324 select {
1325 case <-timer.C:
1326 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
1327 timer.Reset(t.kp.Time)
1328 continue
1329 }
1330 t.Close()
1331 return
1332 case <-t.shutdownChan:
1333 if !timer.Stop() {
1334 <-timer.C
1335 }
1336 return
1337 }
1338 case <-t.shutdownChan:
1339 if !timer.Stop() {
1340 <-timer.C
1341 }
1342 return
1343 }
1344 }
1345}
1346
1347func (t *http2Client) Error() <-chan struct{} {
1348 return t.errorChan
1349}
1350
1351func (t *http2Client) GoAway() <-chan struct{} {
1352 return t.goAway
1353}
1354
1355func (t *http2Client) notifyError(err error) {
1356 t.mu.Lock()
1357 // make sure t.errorChan is closed only once.
1358 if t.state == draining {
1359 t.mu.Unlock()
1360 t.Close()
1361 return
1362 }
1363 if t.state == reachable {
1364 t.state = unreachable
1365 close(t.errorChan)
1366 infof("transport: http2Client.notifyError got notified that the client transport was broken %v.", err)
1367 }
1368 t.mu.Unlock()
1369}