3 * Copyright 2014 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
31 "golang.org/x/net/context"
32 "golang.org/x/net/http2"
33 "golang.org/x/net/http2/hpack"
34 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/credentials"
36 "google.golang.org/grpc/keepalive"
37 "google.golang.org/grpc/metadata"
38 "google.golang.org/grpc/peer"
39 "google.golang.org/grpc/stats"
40 "google.golang.org/grpc/status"
43 // http2Client implements the ClientTransport interface with HTTP2.
44 type http2Client struct {
46 target string // server name/addr
49 conn net.Conn // underlying communication channel
52 authInfo credentials.AuthInfo // auth info about the connection
53 nextID uint32 // the next stream ID to be used
55 // writableChan synchronizes write access to the transport.
56 // A writer acquires the write lock by sending a value on writableChan
57 // and releases it by receiving from writableChan.
59 // shutdownChan is closed when Close is called.
60 // Blocking operations should select on shutdownChan to avoid
61 // blocking forever after Close.
62 // TODO(zhaoq): Maybe have a channel context?
63 shutdownChan chan struct{}
64 // errorChan is closed to notify the I/O error to the caller.
65 errorChan chan struct{}
66 // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
67 // that the server sent GoAway on this transport.
69 // awakenKeepalive is used to wake up keepalive when after it has gone dormant.
70 awakenKeepalive chan struct{}
73 hBuf *bytes.Buffer // the buffer for HPACK encoding
74 hEnc *hpack.Encoder // HPACK encoder
76 // controlBuf delivers all the control related tasks (e.g., window
77 // updates, reset streams, and various settings) to the controller.
78 controlBuf *controlBuffer
80 // sendQuotaPool provides flow control to outbound message.
81 sendQuotaPool *quotaPool
82 // streamsQuota limits the max number of concurrent streams.
83 streamsQuota *quotaPool
85 // The scheme used: https if TLS is on, http otherwise.
90 creds []credentials.PerRPCCredentials
92 // Boolean to keep track of reading activity on transport.
93 // 1 is true and 0 is false.
94 activity uint32 // Accessed atomically.
95 kp keepalive.ClientParameters
97 statsHandler stats.Handler
99 initialWindowSize int32
102 outQuotaVersion uint32
104 mu sync.Mutex // guard the following variables
105 state transportState // the state of underlying connection
106 activeStreams map[uint32]*Stream
107 // The max number of concurrent streams
109 // the per-stream outbound flow control window size set by the peer.
110 streamSendQuota uint32
111 // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
113 // goAwayReason records the http2.ErrCode and debug data received with the
115 goAwayReason GoAwayReason
118 func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
122 return dialContext(ctx, "tcp", addr)
125 func isTemporary(err error) bool {
128 // Connection closures may be resolved upon retry, and are thus
129 // treated as temporary.
131 case context.DeadlineExceeded:
132 // In Go 1.7, context.DeadlineExceeded implements Timeout(), and this
133 // special case is not needed. Until then, we need to keep this
138 switch err := err.(type) {
142 return err.Temporary()
146 // Timeouts may be resolved upon retry, and are thus treated as
153 // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
154 // and starts to receive messages on it. Non-nil error returns if construction
156 func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (_ ClientTransport, err error) {
158 conn, err := dial(ctx, opts.Dialer, addr.Addr)
160 if opts.FailOnNonTempDialError {
161 return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
163 return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
165 // Any further errors will close the underlying connection
166 defer func(conn net.Conn) {
173 authInfo credentials.AuthInfo
175 if creds := opts.TransportCredentials; creds != nil {
177 conn, authInfo, err = creds.ClientHandshake(ctx, addr.Addr, conn)
179 // Credentials handshake errors are typically considered permanent
180 // to avoid retrying on e.g. bad certificates.
181 temp := isTemporary(err)
182 return nil, connectionErrorf(temp, err, "transport: authentication handshake failed: %v", err)
186 kp := opts.KeepaliveParams
187 // Validate keepalive parameters.
189 kp.Time = defaultClientKeepaliveTime
192 kp.Timeout = defaultClientKeepaliveTimeout
194 dynamicWindow := true
195 icwz := int32(initialWindowSize)
196 if opts.InitialConnWindowSize >= defaultWindowSize {
197 icwz = opts.InitialConnWindowSize
198 dynamicWindow = false
204 userAgent: opts.UserAgent,
207 remoteAddr: conn.RemoteAddr(),
208 localAddr: conn.LocalAddr(),
210 // The client initiated stream id is odd starting from 1.
212 writableChan: make(chan int, 1),
213 shutdownChan: make(chan struct{}),
214 errorChan: make(chan struct{}),
215 goAway: make(chan struct{}),
216 awakenKeepalive: make(chan struct{}, 1),
217 framer: newFramer(conn),
219 hEnc: hpack.NewEncoder(&buf),
220 controlBuf: newControlBuffer(),
221 fc: &inFlow{limit: uint32(icwz)},
222 sendQuotaPool: newQuotaPool(defaultWindowSize),
225 activeStreams: make(map[uint32]*Stream),
227 creds: opts.PerRPCCredentials,
228 maxStreams: defaultMaxStreamsClient,
229 streamsQuota: newQuotaPool(defaultMaxStreamsClient),
230 streamSendQuota: defaultWindowSize,
232 statsHandler: opts.StatsHandler,
233 initialWindowSize: initialWindowSize,
235 if opts.InitialWindowSize >= defaultWindowSize {
236 t.initialWindowSize = opts.InitialWindowSize
237 dynamicWindow = false
240 t.bdpEst = &bdpEstimator{
241 bdp: initialWindowSize,
242 updateFlowControl: t.updateFlowControl,
245 // Make sure awakenKeepalive can't be written upon.
246 // keepalive routine will make it writable, if need be.
247 t.awakenKeepalive <- struct{}{}
248 if t.statsHandler != nil {
249 t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
250 RemoteAddr: t.remoteAddr,
251 LocalAddr: t.localAddr,
253 connBegin := &stats.ConnBegin{
256 t.statsHandler.HandleConn(t.ctx, connBegin)
258 // Start the reader goroutine for incoming message. Each transport has
259 // a dedicated goroutine which reads HTTP2 frame from network. Then it
260 // dispatches the frame to the corresponding stream entity.
262 // Send connection preface to server.
263 n, err := t.conn.Write(clientPreface)
266 return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
268 if n != len(clientPreface) {
270 return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
272 if t.initialWindowSize != defaultWindowSize {
273 err = t.framer.writeSettings(true, http2.Setting{
274 ID: http2.SettingInitialWindowSize,
275 Val: uint32(t.initialWindowSize),
278 err = t.framer.writeSettings(true)
282 return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
284 // Adjust the connection flow control window if needed.
285 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
286 if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
288 return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
292 if t.kp.Time != infinity {
299 func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
300 // TODO(zhaoq): Handle uint32 overflow of Stream.id.
303 done: make(chan struct{}),
304 goAway: make(chan struct{}),
305 method: callHdr.Method,
306 sendCompress: callHdr.SendCompress,
307 buf: newRecvBuffer(),
308 fc: &inFlow{limit: uint32(t.initialWindowSize)},
309 sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
310 headerChan: make(chan struct{}),
313 s.requestRead = func(n int) {
314 t.adjustWindow(s, uint32(n))
316 // The client side stream context should have exactly the same life cycle with the user provided context.
317 // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
318 // So we use the original context here instead of creating a copy.
320 s.trReader = &transportReader{
321 reader: &recvBufferReader{
326 windowHandler: func(n int) {
327 t.updateWindow(s, uint32(n))
334 // NewStream creates a stream and registers it into the transport as "active"
336 func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
340 // Attach Auth info if there is any.
341 if t.authInfo != nil {
342 pr.AuthInfo = t.authInfo
344 ctx = peer.NewContext(ctx, pr)
346 authData = make(map[string]string)
349 // Create an audience string only if needed.
350 if len(t.creds) > 0 || callHdr.Creds != nil {
351 // Construct URI required to get auth request metadata.
353 if pos := strings.LastIndex(t.target, ":"); pos != -1 {
354 // Omit port if it is the default one.
355 if t.target[pos+1:] != "443" {
356 port = ":" + t.target[pos+1:]
359 pos := strings.LastIndex(callHdr.Method, "/")
361 pos = len(callHdr.Method)
363 audience = "https://" + callHdr.Host + port + callHdr.Method[:pos]
365 for _, c := range t.creds {
366 data, err := c.GetRequestMetadata(ctx, audience)
368 return nil, streamErrorf(codes.Internal, "transport: %v", err)
370 for k, v := range data {
371 // Capital header names are illegal in HTTP/2.
372 k = strings.ToLower(k)
376 callAuthData := make(map[string]string)
377 // Check if credentials.PerRPCCredentials were provided via call options.
378 // Note: if these credentials are provided both via dial options and call
379 // options, then both sets of credentials will be applied.
380 if callCreds := callHdr.Creds; callCreds != nil {
381 if !t.isSecure && callCreds.RequireTransportSecurity() {
382 return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure conneciton")
384 data, err := callCreds.GetRequestMetadata(ctx, audience)
386 return nil, streamErrorf(codes.Internal, "transport: %v", err)
388 for k, v := range data {
389 // Capital header names are illegal in HTTP/2
390 k = strings.ToLower(k)
395 if t.activeStreams == nil {
397 return nil, ErrConnClosing
399 if t.state == draining {
401 return nil, ErrStreamDrain
403 if t.state != reachable {
405 return nil, ErrConnClosing
408 sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
412 // Returns the quota balance back.
414 t.streamsQuota.add(sq - 1)
416 if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
417 // Return the quota back now because there is no stream returned to the caller.
418 if _, ok := err.(StreamError); ok {
419 t.streamsQuota.add(1)
424 if t.state == draining {
426 t.streamsQuota.add(1)
427 // Need to make t writable again so that the rpc in flight can still proceed.
429 return nil, ErrStreamDrain
431 if t.state != reachable {
433 return nil, ErrConnClosing
435 s := t.newStream(ctx, callHdr)
436 t.activeStreams[s.id] = s
437 // If the number of active streams change from 0 to 1, then check if keepalive
438 // has gone dormant. If so, wake it up.
439 if len(t.activeStreams) == 1 {
441 case t.awakenKeepalive <- struct{}{}:
442 t.framer.writePing(false, false, [8]byte{})
449 // HPACK encodes various headers. Note that once WriteField(...) is
450 // called, the corresponding headers/continuation frame has to be sent
451 // because hpack.Encoder is stateful.
453 t.hEnc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"})
454 t.hEnc.WriteField(hpack.HeaderField{Name: ":scheme", Value: t.scheme})
455 t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method})
456 t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
457 t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
458 t.hEnc.WriteField(hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
459 t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"})
461 if callHdr.SendCompress != "" {
462 t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
464 if dl, ok := ctx.Deadline(); ok {
465 // Send out timeout regardless its value. The server can detect timeout context by itself.
466 timeout := dl.Sub(time.Now())
467 t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
470 for k, v := range authData {
471 t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
473 for k, v := range callAuthData {
474 t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
479 if md, ok := metadata.FromOutgoingContext(ctx); ok {
480 for k, vv := range md {
481 // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
482 if isReservedHeader(k) {
485 for _, v := range vv {
486 t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
490 if md, ok := t.md.(*metadata.MD); ok {
491 for k, vv := range *md {
492 if isReservedHeader(k) {
495 for _, v := range vv {
496 t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
501 bufLen := t.hBuf.Len()
502 // Sends the headers in a single batch even when they span multiple frames.
505 if size > http2MaxFrameLen {
506 size = http2MaxFrameLen
511 if callHdr.Flush && endHeaders {
515 // Sends a HeadersFrame to server to start a new stream.
516 p := http2.HeadersFrameParam{
518 BlockFragment: t.hBuf.Next(size),
520 EndHeaders: endHeaders,
522 // Do a force flush for the buffered frames iff it is the last headers frame
523 // and there is header metadata to be sent. Otherwise, there is flushing until
524 // the corresponding data frame is written.
525 err = t.framer.writeHeaders(flush, p)
528 // Sends Continuation frames for the leftover headers.
529 err = t.framer.writeContinuation(flush, s.id, endHeaders, t.hBuf.Next(size))
533 return nil, connectionErrorf(true, err, "transport: %v", err)
540 if t.statsHandler != nil {
541 outHeader := &stats.OutHeader{
544 FullMethod: callHdr.Method,
545 RemoteAddr: t.remoteAddr,
546 LocalAddr: t.localAddr,
547 Compression: callHdr.SendCompress,
549 t.statsHandler.HandleRPC(s.ctx, outHeader)
555 // CloseStream clears the footprint of a stream when the stream is not needed any more.
556 // This must not be executed in reader's goroutine.
557 func (t *http2Client) CloseStream(s *Stream, err error) {
559 if t.activeStreams == nil {
564 // notify in-flight streams, before the deletion
565 s.write(recvMsg{err: err})
567 delete(t.activeStreams, s.id)
568 if t.state == draining && len(t.activeStreams) == 0 {
569 // The transport is draining and s is the last live stream on t.
575 // rstStream is true in case the stream is being closed at the client-side
576 // and the server needs to be intimated about it by sending a RST_STREAM
578 // To make sure this frame is written to the wire before the headers of the
579 // next stream waiting for streamsQuota, we add to streamsQuota pool only
580 // after having acquired the writableChan to send RST_STREAM out (look at
581 // the controller() routine).
583 var rstError http2.ErrCode
585 // In case, the client doesn't have to send RST_STREAM to server
586 // we can safely add back to streamsQuota pool now.
588 t.streamsQuota.add(1)
591 t.controlBuf.put(&resetStream{s.id, rstError})
594 rstStream = s.rstStream
595 rstError = s.rstError
596 if s.state == streamDone {
606 if _, ok := err.(StreamError); ok {
608 rstError = http2.ErrCodeCancel
612 // Close kicks off the shutdown process of the transport. This should be called
613 // only once on a transport. Once it is called, the transport should not be
614 // accessed any more.
615 func (t *http2Client) Close() (err error) {
617 if t.state == closing {
621 if t.state == reachable || t.state == draining {
626 close(t.shutdownChan)
629 streams := t.activeStreams
630 t.activeStreams = nil
632 // Notify all active streams.
633 for _, s := range streams {
640 s.write(recvMsg{err: ErrConnClosing})
642 if t.statsHandler != nil {
643 connEnd := &stats.ConnEnd{
646 t.statsHandler.HandleConn(t.ctx, connEnd)
651 func (t *http2Client) GracefulClose() error {
655 // The server may close the connection concurrently. t is not available for
656 // any streams. Close it now.
664 if t.state == draining {
669 active := len(t.activeStreams)
677 // Write formats the data into HTTP2 data frame(s) and sends it out. The caller
678 // should proceed only if Write returns nil.
679 // TODO(zhaoq): opts.Delay is ignored in this implementation. Support it later
680 // if it improves the performance.
681 func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
682 r := bytes.NewBuffer(data)
688 oqv = atomic.LoadUint32(&t.outQuotaVersion)
689 if r.Len() > 0 || p != nil {
690 size := http2MaxFrameLen
691 // Wait until the stream has some quota to send the data.
692 sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
696 // Wait until the transport has some quota to send the data.
697 tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
712 // Overbooked stream quota. Return it back.
713 s.sendQuotaPool.add(sq - ps)
716 // Overbooked transport quota. Return it back.
717 t.sendQuotaPool.add(tq - ps)
724 if opts.Last && r.Len() == 0 {
727 // Indicate there is a writer who is about to write a data frame.
728 t.framer.adjustNumWriters(1)
729 // Got some quota. Try to acquire writing privilege on the transport.
730 if _, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.writableChan); err != nil {
731 if _, ok := err.(StreamError); ok || err == io.EOF {
732 // Return the connection quota back.
733 t.sendQuotaPool.add(len(p))
735 if t.framer.adjustNumWriters(-1) == 0 {
736 // This writer is the last one in this batch and has the
737 // responsibility to flush the buffered frames. It queues
738 // a flush request to controlBuf instead of flushing directly
739 // in order to avoid the race with other writing or flushing.
740 t.controlBuf.put(&flushIO{})
746 t.sendQuotaPool.add(len(p))
747 if t.framer.adjustNumWriters(-1) == 0 {
748 t.controlBuf.put(&flushIO{})
751 return ContextErr(s.ctx.Err())
754 if oqv != atomic.LoadUint32(&t.outQuotaVersion) {
755 // InitialWindowSize settings frame must have been received after we
756 // acquired send quota but before we got the writable channel.
757 // We must forsake this write.
758 t.sendQuotaPool.add(len(p))
759 s.sendQuotaPool.add(len(p))
760 if t.framer.adjustNumWriters(-1) == 0 {
761 t.controlBuf.put(&flushIO{})
766 if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 {
767 // Do a force flush iff this is last frame for the entire gRPC message
768 // and the caller is the only writer at this moment.
771 // If WriteData fails, all the pending streams will be handled
772 // by http2Client.Close(). No explicit CloseStream() needs to be
774 if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil {
776 return connectionErrorf(true, err, "transport: %v", err)
779 if t.framer.adjustNumWriters(-1) == 0 {
780 t.framer.flushWrite()
791 if s.state != streamDone {
792 s.state = streamWriteDone
798 func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
801 s, ok := t.activeStreams[f.Header().StreamID]
805 // adjustWindow sends out extra window update over the initial window size
806 // of stream if the application is requesting data larger in size than
808 func (t *http2Client) adjustWindow(s *Stream, n uint32) {
811 if s.state == streamDone {
814 if w := s.fc.maybeAdjust(n); w > 0 {
815 // Piggyback conneciton's window update along.
816 if cw := t.fc.resetPendingUpdate(); cw > 0 {
817 t.controlBuf.put(&windowUpdate{0, cw, false})
819 t.controlBuf.put(&windowUpdate{s.id, w, true})
823 // updateWindow adjusts the inbound quota for the stream and the transport.
824 // Window updates will deliver to the controller for sending when
825 // the cumulative quota exceeds the corresponding threshold.
826 func (t *http2Client) updateWindow(s *Stream, n uint32) {
829 if s.state == streamDone {
832 if w := s.fc.onRead(n); w > 0 {
833 if cw := t.fc.resetPendingUpdate(); cw > 0 {
834 t.controlBuf.put(&windowUpdate{0, cw, false})
836 t.controlBuf.put(&windowUpdate{s.id, w, true})
840 // updateFlowControl updates the incoming flow control windows
841 // for the transport and the stream based on the current bdp
843 func (t *http2Client) updateFlowControl(n uint32) {
845 for _, s := range t.activeStreams {
848 t.initialWindowSize = int32(n)
850 t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false})
851 t.controlBuf.put(&settings{
855 ID: http2.SettingInitialWindowSize,
862 func (t *http2Client) handleData(f *http2.DataFrame) {
863 size := f.Header().Length
866 sendBDPPing = t.bdpEst.add(uint32(size))
868 // Decouple connection's flow control from application's read.
869 // An update on connection's flow control should not depend on
870 // whether user application has read the data or not. Such a
871 // restriction is already imposed on the stream's flow control,
872 // and therefore the sender will be blocked anyways.
873 // Decoupling the connection flow control will prevent other
874 // active(fast) streams from starving in presence of slow or
877 // Furthermore, if a bdpPing is being sent out we can piggyback
878 // connection's window update for the bytes we just received.
880 t.controlBuf.put(&windowUpdate{0, uint32(size), false})
881 t.controlBuf.put(bdpPing)
883 if err := t.fc.onData(uint32(size)); err != nil {
884 t.notifyError(connectionErrorf(true, err, "%v", err))
887 if w := t.fc.onRead(uint32(size)); w > 0 {
888 t.controlBuf.put(&windowUpdate{0, w, true})
891 // Select the right stream to dispatch.
892 s, ok := t.getStream(f)
898 if s.state == streamDone {
902 if err := s.fc.onData(uint32(size)); err != nil {
904 s.rstError = http2.ErrCodeFlowControl
905 s.finish(status.New(codes.Internal, err.Error()))
907 s.write(recvMsg{err: io.EOF})
910 if f.Header().Flags.Has(http2.FlagDataPadded) {
911 if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
912 t.controlBuf.put(&windowUpdate{s.id, w, true})
916 // TODO(bradfitz, zhaoq): A copy is required here because there is no
917 // guarantee f.Data() is consumed before the arrival of next frame.
918 // Can this copy be eliminated?
919 if len(f.Data()) > 0 {
920 data := make([]byte, len(f.Data()))
922 s.write(recvMsg{data: data})
925 // The server has closed the stream without sending trailers. Record that
926 // the read direction is closed, and set the status appropriately.
927 if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
929 if s.state == streamDone {
933 s.finish(status.New(codes.Internal, "server closed the stream without sending trailers"))
935 s.write(recvMsg{err: io.EOF})
939 func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
940 s, ok := t.getStream(f)
945 if s.state == streamDone {
953 statusCode, ok := http2ErrConvTab[http2.ErrCode(f.ErrCode)]
955 warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
956 statusCode = codes.Unknown
958 s.finish(status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %d", f.ErrCode))
960 s.write(recvMsg{err: io.EOF})
963 func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
967 var ss []http2.Setting
968 f.ForeachSetting(func(s http2.Setting) error {
972 // The settings will be applied once the ack is sent.
973 t.controlBuf.put(&settings{ack: true, ss: ss})
976 func (t *http2Client) handlePing(f *http2.PingFrame) {
978 // Maybe it's a BDP ping.
980 t.bdpEst.calculate(f.Data)
984 pingAck := &ping{ack: true}
985 copy(pingAck.data[:], f.Data[:])
986 t.controlBuf.put(pingAck)
989 func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
991 if t.state != reachable && t.state != draining {
995 if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
996 infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
999 if id > 0 && id%2 != 1 {
1001 t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID))
1004 // A client can recieve multiple GoAways from server (look at https://github.com/grpc/grpc-go/issues/1387).
1005 // The idea is that the first GoAway will be sent with an ID of MaxInt32 and the second GoAway will be sent after an RTT delay
1006 // with the ID of the last stream the server will process.
1007 // Therefore, when we get the first GoAway we don't really close any streams. While in case of second GoAway we
1008 // close all streams created after the second GoAwayId. This way streams that were in-flight while the GoAway from server
1009 // was being sent don't get killed.
1011 case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
1012 // If there are multiple GoAways the first one should always have an ID greater than the following ones.
1013 if id > t.prevGoAwayID {
1015 t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
1019 t.setGoAwayReason(f)
1023 // All streams with IDs greater than the GoAwayId
1024 // and smaller than the previous GoAway ID should be killed.
1025 upperLimit := t.prevGoAwayID
1026 if upperLimit == 0 { // This is the first GoAway Frame.
1027 upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
1029 for streamID, stream := range t.activeStreams {
1030 if streamID > id && streamID <= upperLimit {
1031 close(stream.goAway)
1035 active := len(t.activeStreams)
1042 // setGoAwayReason sets the value of t.goAwayReason based
1043 // on the GoAway frame received.
1044 // It expects a lock on transport's mutext to be held by
1046 func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
1047 t.goAwayReason = NoReason
1049 case http2.ErrCodeEnhanceYourCalm:
1050 if string(f.DebugData()) == "too_many_pings" {
1051 t.goAwayReason = TooManyPings
1056 func (t *http2Client) GetGoAwayReason() GoAwayReason {
1059 return t.goAwayReason
1062 func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
1063 id := f.Header().StreamID
1066 t.sendQuotaPool.add(int(incr))
1069 if s, ok := t.getStream(f); ok {
1070 s.sendQuotaPool.add(int(incr))
1074 // operateHeaders takes action on the decoded headers.
1075 func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
1076 s, ok := t.getStream(frame)
1081 s.bytesReceived = true
1083 var state decodeState
1084 if err := state.decodeResponseHeader(frame); err != nil {
1091 s.write(recvMsg{err: err})
1092 // Something wrong. Stops reading even when there is remaining.
1096 endStream := frame.StreamEnded()
1099 if t.statsHandler != nil {
1101 inHeader := &stats.InHeader{
1103 WireLength: int(frame.Header().Length),
1105 t.statsHandler.HandleRPC(s.ctx, inHeader)
1107 inTrailer := &stats.InTrailer{
1109 WireLength: int(frame.Header().Length),
1111 t.statsHandler.HandleRPC(s.ctx, inTrailer)
1118 s.recvCompress = state.encoding
1121 if !endStream && len(state.mdata) > 0 {
1122 s.header = state.mdata
1128 if !endStream || s.state == streamDone {
1133 if len(state.mdata) > 0 {
1134 s.trailer = state.mdata
1136 s.finish(state.status())
1138 s.write(recvMsg{err: io.EOF})
1141 func handleMalformedHTTP2(s *Stream, err error) {
1148 s.write(recvMsg{err: err})
1151 // reader runs as a separate goroutine in charge of reading data from network
1154 // TODO(zhaoq): currently one reader per transport. Investigate whether this is
1156 // TODO(zhaoq): Check the validity of the incoming frame sequence.
1157 func (t *http2Client) reader() {
1158 // Check the validity of server preface.
1159 frame, err := t.framer.readFrame()
1164 atomic.CompareAndSwapUint32(&t.activity, 0, 1)
1165 sf, ok := frame.(*http2.SettingsFrame)
1170 t.handleSettings(sf)
1172 // loop to keep reading incoming messages on this transport.
1174 frame, err := t.framer.readFrame()
1175 atomic.CompareAndSwapUint32(&t.activity, 0, 1)
1177 // Abort an active stream if the http2.Framer returns a
1178 // http2.StreamError. This can happen only if the server's response
1179 // is malformed http2.
1180 if se, ok := err.(http2.StreamError); ok {
1182 s := t.activeStreams[se.StreamID]
1185 // use error detail to provide better err message
1186 handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail()))
1195 switch frame := frame.(type) {
1196 case *http2.MetaHeadersFrame:
1197 t.operateHeaders(frame)
1198 case *http2.DataFrame:
1200 case *http2.RSTStreamFrame:
1201 t.handleRSTStream(frame)
1202 case *http2.SettingsFrame:
1203 t.handleSettings(frame)
1204 case *http2.PingFrame:
1206 case *http2.GoAwayFrame:
1207 t.handleGoAway(frame)
1208 case *http2.WindowUpdateFrame:
1209 t.handleWindowUpdate(frame)
1211 errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
1216 func (t *http2Client) applySettings(ss []http2.Setting) {
1217 for _, s := range ss {
1219 case http2.SettingMaxConcurrentStreams:
1220 // TODO(zhaoq): This is a hack to avoid significant refactoring of the
1221 // code to deal with the unrealistic int32 overflow. Probably will try
1222 // to find a better way to handle this later.
1223 if s.Val > math.MaxInt32 {
1224 s.Val = math.MaxInt32
1228 t.maxStreams = int(s.Val)
1230 t.streamsQuota.add(int(s.Val) - ms)
1231 case http2.SettingInitialWindowSize:
1233 for _, stream := range t.activeStreams {
1234 // Adjust the sending quota for each stream.
1235 stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota))
1237 t.streamSendQuota = s.Val
1239 atomic.AddUint32(&t.outQuotaVersion, 1)
1244 // controller running in a separate goroutine takes charge of sending control
1245 // frames (e.g., window update, reset stream, setting, etc.) to the server.
1246 func (t *http2Client) controller() {
1249 case i := <-t.controlBuf.get():
1252 case <-t.writableChan:
1253 switch i := i.(type) {
1255 t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment)
1258 t.framer.writeSettingsAck(true)
1259 t.applySettings(i.ss)
1261 t.framer.writeSettings(true, i.ss...)
1264 // If the server needs to be to intimated about stream closing,
1265 // then we need to make sure the RST_STREAM frame is written to
1266 // the wire before the headers of the next stream waiting on
1267 // streamQuota. We ensure this by adding to the streamsQuota pool
1268 // only after having acquired the writableChan to send RST_STREAM.
1269 t.streamsQuota.add(1)
1270 t.framer.writeRSTStream(true, i.streamID, i.code)
1272 t.framer.flushWrite()
1275 t.bdpEst.timesnap(i.data)
1277 t.framer.writePing(true, i.ack, i.data)
1279 errorf("transport: http2Client.controller got unexpected item type %v\n", i)
1283 case <-t.shutdownChan:
1286 case <-t.shutdownChan:
1292 // keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
1293 func (t *http2Client) keepalive() {
1294 p := &ping{data: [8]byte{}}
1295 timer := time.NewTimer(t.kp.Time)
1299 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
1300 timer.Reset(t.kp.Time)
1303 // Check if keepalive should go dormant.
1305 if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
1306 // Make awakenKeepalive writable.
1310 case <-t.awakenKeepalive:
1311 // If the control gets here a ping has been sent
1312 // need to reset the timer with keepalive.Timeout.
1313 case <-t.shutdownChan:
1322 // By the time control gets here a ping has been sent one way or the other.
1323 timer.Reset(t.kp.Timeout)
1326 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
1327 timer.Reset(t.kp.Time)
1332 case <-t.shutdownChan:
1338 case <-t.shutdownChan:
1347 func (t *http2Client) Error() <-chan struct{} {
1351 func (t *http2Client) GoAway() <-chan struct{} {
1355 func (t *http2Client) notifyError(err error) {
1357 // make sure t.errorChan is closed only once.
1358 if t.state == draining {
1363 if t.state == reachable {
1364 t.state = unreachable
1366 infof("transport: http2Client.notifyError got notified that the client transport was broken %v.", err)