aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/google.golang.org/grpc/transport
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/transport')
-rw-r--r--vendor/google.golang.org/grpc/transport/bdp_estimator.go143
-rw-r--r--vendor/google.golang.org/grpc/transport/control.go246
-rw-r--r--vendor/google.golang.org/grpc/transport/go16.go45
-rw-r--r--vendor/google.golang.org/grpc/transport/go17.go46
-rw-r--r--vendor/google.golang.org/grpc/transport/handler_server.go393
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_client.go1369
-rw-r--r--vendor/google.golang.org/grpc/transport/http2_server.go1195
-rw-r--r--vendor/google.golang.org/grpc/transport/http_util.go597
-rw-r--r--vendor/google.golang.org/grpc/transport/log.go50
-rw-r--r--vendor/google.golang.org/grpc/transport/transport.go730
10 files changed, 4814 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/transport/bdp_estimator.go b/vendor/google.golang.org/grpc/transport/bdp_estimator.go
new file mode 100644
index 0000000..667edb8
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/bdp_estimator.go
@@ -0,0 +1,143 @@
1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "sync"
23 "time"
24)
25
26const (
27 // bdpLimit is the maximum value the flow control windows
28 // will be increased to.
29 bdpLimit = (1 << 20) * 4
30 // alpha is a constant factor used to keep a moving average
31 // of RTTs.
32 alpha = 0.9
33 // If the current bdp sample is greater than or equal to
34 // our beta * our estimated bdp and the current bandwidth
35 // sample is the maximum bandwidth observed so far, we
36 // increase our bbp estimate by a factor of gamma.
37 beta = 0.66
38 // To put our bdp to be smaller than or equal to twice the real BDP,
39 // we should multiply our current sample with 4/3, however to round things out
40 // we use 2 as the multiplication factor.
41 gamma = 2
42)
43
44var (
45 // Adding arbitrary data to ping so that its ack can be
46 // identified.
47 // Easter-egg: what does the ping message say?
48 bdpPing = &ping{data: [8]byte{2, 4, 16, 16, 9, 14, 7, 7}}
49)
50
51type bdpEstimator struct {
52 // sentAt is the time when the ping was sent.
53 sentAt time.Time
54
55 mu sync.Mutex
56 // bdp is the current bdp estimate.
57 bdp uint32
58 // sample is the number of bytes received in one measurement cycle.
59 sample uint32
60 // bwMax is the maximum bandwidth noted so far (bytes/sec).
61 bwMax float64
62 // bool to keep track of the begining of a new measurement cycle.
63 isSent bool
64 // Callback to update the window sizes.
65 updateFlowControl func(n uint32)
66 // sampleCount is the number of samples taken so far.
67 sampleCount uint64
68 // round trip time (seconds)
69 rtt float64
70}
71
72// timesnap registers the time bdp ping was sent out so that
73// network rtt can be calculated when its ack is recieved.
74// It is called (by controller) when the bdpPing is
75// being written on the wire.
76func (b *bdpEstimator) timesnap(d [8]byte) {
77 if bdpPing.data != d {
78 return
79 }
80 b.sentAt = time.Now()
81}
82
83// add adds bytes to the current sample for calculating bdp.
84// It returns true only if a ping must be sent. This can be used
85// by the caller (handleData) to make decision about batching
86// a window update with it.
87func (b *bdpEstimator) add(n uint32) bool {
88 b.mu.Lock()
89 defer b.mu.Unlock()
90 if b.bdp == bdpLimit {
91 return false
92 }
93 if !b.isSent {
94 b.isSent = true
95 b.sample = n
96 b.sentAt = time.Time{}
97 b.sampleCount++
98 return true
99 }
100 b.sample += n
101 return false
102}
103
104// calculate is called when an ack for a bdp ping is received.
105// Here we calculate the current bdp and bandwidth sample and
106// decide if the flow control windows should go up.
107func (b *bdpEstimator) calculate(d [8]byte) {
108 // Check if the ping acked for was the bdp ping.
109 if bdpPing.data != d {
110 return
111 }
112 b.mu.Lock()
113 rttSample := time.Since(b.sentAt).Seconds()
114 if b.sampleCount < 10 {
115 // Bootstrap rtt with an average of first 10 rtt samples.
116 b.rtt += (rttSample - b.rtt) / float64(b.sampleCount)
117 } else {
118 // Heed to the recent past more.
119 b.rtt += (rttSample - b.rtt) * float64(alpha)
120 }
121 b.isSent = false
122 // The number of bytes accumalated so far in the sample is smaller
123 // than or equal to 1.5 times the real BDP on a saturated connection.
124 bwCurrent := float64(b.sample) / (b.rtt * float64(1.5))
125 if bwCurrent > b.bwMax {
126 b.bwMax = bwCurrent
127 }
128 // If the current sample (which is smaller than or equal to the 1.5 times the real BDP) is
129 // greater than or equal to 2/3rd our perceived bdp AND this is the maximum bandwidth seen so far, we
130 // should update our perception of the network BDP.
131 if float64(b.sample) >= beta*float64(b.bdp) && bwCurrent == b.bwMax && b.bdp != bdpLimit {
132 sampleFloat := float64(b.sample)
133 b.bdp = uint32(gamma * sampleFloat)
134 if b.bdp > bdpLimit {
135 b.bdp = bdpLimit
136 }
137 bdp := b.bdp
138 b.mu.Unlock()
139 b.updateFlowControl(bdp)
140 return
141 }
142 b.mu.Unlock()
143}
diff --git a/vendor/google.golang.org/grpc/transport/control.go b/vendor/google.golang.org/grpc/transport/control.go
new file mode 100644
index 0000000..501eb03
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/control.go
@@ -0,0 +1,246 @@
1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "fmt"
23 "math"
24 "sync"
25 "time"
26
27 "golang.org/x/net/http2"
28)
29
30const (
31 // The default value of flow control window size in HTTP2 spec.
32 defaultWindowSize = 65535
33 // The initial window size for flow control.
34 initialWindowSize = defaultWindowSize // for an RPC
35 infinity = time.Duration(math.MaxInt64)
36 defaultClientKeepaliveTime = infinity
37 defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
38 defaultMaxStreamsClient = 100
39 defaultMaxConnectionIdle = infinity
40 defaultMaxConnectionAge = infinity
41 defaultMaxConnectionAgeGrace = infinity
42 defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
43 defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
44 defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
45 // max window limit set by HTTP2 Specs.
46 maxWindowSize = math.MaxInt32
47)
48
49// The following defines various control items which could flow through
50// the control buffer of transport. They represent different aspects of
51// control tasks, e.g., flow control, settings, streaming resetting, etc.
52type windowUpdate struct {
53 streamID uint32
54 increment uint32
55 flush bool
56}
57
58func (*windowUpdate) item() {}
59
60type settings struct {
61 ack bool
62 ss []http2.Setting
63}
64
65func (*settings) item() {}
66
67type resetStream struct {
68 streamID uint32
69 code http2.ErrCode
70}
71
72func (*resetStream) item() {}
73
74type goAway struct {
75 code http2.ErrCode
76 debugData []byte
77 headsUp bool
78 closeConn bool
79}
80
81func (*goAway) item() {}
82
83type flushIO struct {
84}
85
86func (*flushIO) item() {}
87
88type ping struct {
89 ack bool
90 data [8]byte
91}
92
93func (*ping) item() {}
94
95// quotaPool is a pool which accumulates the quota and sends it to acquire()
96// when it is available.
97type quotaPool struct {
98 c chan int
99
100 mu sync.Mutex
101 quota int
102}
103
104// newQuotaPool creates a quotaPool which has quota q available to consume.
105func newQuotaPool(q int) *quotaPool {
106 qb := &quotaPool{
107 c: make(chan int, 1),
108 }
109 if q > 0 {
110 qb.c <- q
111 } else {
112 qb.quota = q
113 }
114 return qb
115}
116
117// add cancels the pending quota sent on acquired, incremented by v and sends
118// it back on acquire.
119func (qb *quotaPool) add(v int) {
120 qb.mu.Lock()
121 defer qb.mu.Unlock()
122 select {
123 case n := <-qb.c:
124 qb.quota += n
125 default:
126 }
127 qb.quota += v
128 if qb.quota <= 0 {
129 return
130 }
131 // After the pool has been created, this is the only place that sends on
132 // the channel. Since mu is held at this point and any quota that was sent
133 // on the channel has been retrieved, we know that this code will always
134 // place any positive quota value on the channel.
135 select {
136 case qb.c <- qb.quota:
137 qb.quota = 0
138 default:
139 }
140}
141
142// acquire returns the channel on which available quota amounts are sent.
143func (qb *quotaPool) acquire() <-chan int {
144 return qb.c
145}
146
147// inFlow deals with inbound flow control
148type inFlow struct {
149 mu sync.Mutex
150 // The inbound flow control limit for pending data.
151 limit uint32
152 // pendingData is the overall data which have been received but not been
153 // consumed by applications.
154 pendingData uint32
155 // The amount of data the application has consumed but grpc has not sent
156 // window update for them. Used to reduce window update frequency.
157 pendingUpdate uint32
158 // delta is the extra window update given by receiver when an application
159 // is reading data bigger in size than the inFlow limit.
160 delta uint32
161}
162
163// newLimit updates the inflow window to a new value n.
164// It assumes that n is always greater than the old limit.
165func (f *inFlow) newLimit(n uint32) uint32 {
166 f.mu.Lock()
167 defer f.mu.Unlock()
168 d := n - f.limit
169 f.limit = n
170 return d
171}
172
173func (f *inFlow) maybeAdjust(n uint32) uint32 {
174 if n > uint32(math.MaxInt32) {
175 n = uint32(math.MaxInt32)
176 }
177 f.mu.Lock()
178 defer f.mu.Unlock()
179 // estSenderQuota is the receiver's view of the maximum number of bytes the sender
180 // can send without a window update.
181 estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
182 // estUntransmittedData is the maximum number of bytes the sends might not have put
183 // on the wire yet. A value of 0 or less means that we have already received all or
184 // more bytes than the application is requesting to read.
185 estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative.
186 // This implies that unless we send a window update, the sender won't be able to send all the bytes
187 // for this message. Therefore we must send an update over the limit since there's an active read
188 // request from the application.
189 if estUntransmittedData > estSenderQuota {
190 // Sender's window shouldn't go more than 2^31 - 1 as speecified in the HTTP spec.
191 if f.limit+n > maxWindowSize {
192 f.delta = maxWindowSize - f.limit
193 } else {
194 // Send a window update for the whole message and not just the difference between
195 // estUntransmittedData and estSenderQuota. This will be helpful in case the message
196 // is padded; We will fallback on the current available window(at least a 1/4th of the limit).
197 f.delta = n
198 }
199 return f.delta
200 }
201 return 0
202}
203
204// onData is invoked when some data frame is received. It updates pendingData.
205func (f *inFlow) onData(n uint32) error {
206 f.mu.Lock()
207 defer f.mu.Unlock()
208 f.pendingData += n
209 if f.pendingData+f.pendingUpdate > f.limit+f.delta {
210 return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit)
211 }
212 return nil
213}
214
215// onRead is invoked when the application reads the data. It returns the window size
216// to be sent to the peer.
217func (f *inFlow) onRead(n uint32) uint32 {
218 f.mu.Lock()
219 defer f.mu.Unlock()
220 if f.pendingData == 0 {
221 return 0
222 }
223 f.pendingData -= n
224 if n > f.delta {
225 n -= f.delta
226 f.delta = 0
227 } else {
228 f.delta -= n
229 n = 0
230 }
231 f.pendingUpdate += n
232 if f.pendingUpdate >= f.limit/4 {
233 wu := f.pendingUpdate
234 f.pendingUpdate = 0
235 return wu
236 }
237 return 0
238}
239
240func (f *inFlow) resetPendingUpdate() uint32 {
241 f.mu.Lock()
242 defer f.mu.Unlock()
243 n := f.pendingUpdate
244 f.pendingUpdate = 0
245 return n
246}
diff --git a/vendor/google.golang.org/grpc/transport/go16.go b/vendor/google.golang.org/grpc/transport/go16.go
new file mode 100644
index 0000000..7cffee1
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/go16.go
@@ -0,0 +1,45 @@
1// +build go1.6,!go1.7
2
3/*
4 *
5 * Copyright 2016 gRPC authors.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 */
20
21package transport
22
23import (
24 "net"
25
26 "google.golang.org/grpc/codes"
27
28 "golang.org/x/net/context"
29)
30
31// dialContext connects to the address on the named network.
32func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
33 return (&net.Dialer{Cancel: ctx.Done()}).Dial(network, address)
34}
35
36// ContextErr converts the error from context package into a StreamError.
37func ContextErr(err error) StreamError {
38 switch err {
39 case context.DeadlineExceeded:
40 return streamErrorf(codes.DeadlineExceeded, "%v", err)
41 case context.Canceled:
42 return streamErrorf(codes.Canceled, "%v", err)
43 }
44 return streamErrorf(codes.Internal, "Unexpected error from context packet: %v", err)
45}
diff --git a/vendor/google.golang.org/grpc/transport/go17.go b/vendor/google.golang.org/grpc/transport/go17.go
new file mode 100644
index 0000000..2464e69
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/go17.go
@@ -0,0 +1,46 @@
1// +build go1.7
2
3/*
4 *
5 * Copyright 2016 gRPC authors.
6 *
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 *
19 */
20
21package transport
22
23import (
24 "context"
25 "net"
26
27 "google.golang.org/grpc/codes"
28
29 netctx "golang.org/x/net/context"
30)
31
32// dialContext connects to the address on the named network.
33func dialContext(ctx context.Context, network, address string) (net.Conn, error) {
34 return (&net.Dialer{}).DialContext(ctx, network, address)
35}
36
37// ContextErr converts the error from context package into a StreamError.
38func ContextErr(err error) StreamError {
39 switch err {
40 case context.DeadlineExceeded, netctx.DeadlineExceeded:
41 return streamErrorf(codes.DeadlineExceeded, "%v", err)
42 case context.Canceled, netctx.Canceled:
43 return streamErrorf(codes.Canceled, "%v", err)
44 }
45 return streamErrorf(codes.Internal, "Unexpected error from context packet: %v", err)
46}
diff --git a/vendor/google.golang.org/grpc/transport/handler_server.go b/vendor/google.golang.org/grpc/transport/handler_server.go
new file mode 100644
index 0000000..27372b5
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/handler_server.go
@@ -0,0 +1,393 @@
1/*
2 *
3 * Copyright 2016 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// This file is the implementation of a gRPC server using HTTP/2 which
20// uses the standard Go http2 Server implementation (via the
21// http.Handler interface), rather than speaking low-level HTTP/2
22// frames itself. It is the implementation of *grpc.Server.ServeHTTP.
23
24package transport
25
26import (
27 "errors"
28 "fmt"
29 "io"
30 "net"
31 "net/http"
32 "strings"
33 "sync"
34 "time"
35
36 "golang.org/x/net/context"
37 "golang.org/x/net/http2"
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/credentials"
40 "google.golang.org/grpc/metadata"
41 "google.golang.org/grpc/peer"
42 "google.golang.org/grpc/status"
43)
44
45// NewServerHandlerTransport returns a ServerTransport handling gRPC
46// from inside an http.Handler. It requires that the http Server
47// supports HTTP/2.
48func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTransport, error) {
49 if r.ProtoMajor != 2 {
50 return nil, errors.New("gRPC requires HTTP/2")
51 }
52 if r.Method != "POST" {
53 return nil, errors.New("invalid gRPC request method")
54 }
55 if !validContentType(r.Header.Get("Content-Type")) {
56 return nil, errors.New("invalid gRPC request content-type")
57 }
58 if _, ok := w.(http.Flusher); !ok {
59 return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher")
60 }
61 if _, ok := w.(http.CloseNotifier); !ok {
62 return nil, errors.New("gRPC requires a ResponseWriter supporting http.CloseNotifier")
63 }
64
65 st := &serverHandlerTransport{
66 rw: w,
67 req: r,
68 closedCh: make(chan struct{}),
69 writes: make(chan func()),
70 }
71
72 if v := r.Header.Get("grpc-timeout"); v != "" {
73 to, err := decodeTimeout(v)
74 if err != nil {
75 return nil, streamErrorf(codes.Internal, "malformed time-out: %v", err)
76 }
77 st.timeoutSet = true
78 st.timeout = to
79 }
80
81 var metakv []string
82 if r.Host != "" {
83 metakv = append(metakv, ":authority", r.Host)
84 }
85 for k, vv := range r.Header {
86 k = strings.ToLower(k)
87 if isReservedHeader(k) && !isWhitelistedPseudoHeader(k) {
88 continue
89 }
90 for _, v := range vv {
91 v, err := decodeMetadataHeader(k, v)
92 if err != nil {
93 return nil, streamErrorf(codes.InvalidArgument, "malformed binary metadata: %v", err)
94 }
95 metakv = append(metakv, k, v)
96 }
97 }
98 st.headerMD = metadata.Pairs(metakv...)
99
100 return st, nil
101}
102
103// serverHandlerTransport is an implementation of ServerTransport
104// which replies to exactly one gRPC request (exactly one HTTP request),
105// using the net/http.Handler interface. This http.Handler is guaranteed
106// at this point to be speaking over HTTP/2, so it's able to speak valid
107// gRPC.
108type serverHandlerTransport struct {
109 rw http.ResponseWriter
110 req *http.Request
111 timeoutSet bool
112 timeout time.Duration
113 didCommonHeaders bool
114
115 headerMD metadata.MD
116
117 closeOnce sync.Once
118 closedCh chan struct{} // closed on Close
119
120 // writes is a channel of code to run serialized in the
121 // ServeHTTP (HandleStreams) goroutine. The channel is closed
122 // when WriteStatus is called.
123 writes chan func()
124}
125
126func (ht *serverHandlerTransport) Close() error {
127 ht.closeOnce.Do(ht.closeCloseChanOnce)
128 return nil
129}
130
131func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) }
132
133func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) }
134
135// strAddr is a net.Addr backed by either a TCP "ip:port" string, or
136// the empty string if unknown.
137type strAddr string
138
139func (a strAddr) Network() string {
140 if a != "" {
141 // Per the documentation on net/http.Request.RemoteAddr, if this is
142 // set, it's set to the IP:port of the peer (hence, TCP):
143 // https://golang.org/pkg/net/http/#Request
144 //
145 // If we want to support Unix sockets later, we can
146 // add our own grpc-specific convention within the
147 // grpc codebase to set RemoteAddr to a different
148 // format, or probably better: we can attach it to the
149 // context and use that from serverHandlerTransport.RemoteAddr.
150 return "tcp"
151 }
152 return ""
153}
154
155func (a strAddr) String() string { return string(a) }
156
157// do runs fn in the ServeHTTP goroutine.
158func (ht *serverHandlerTransport) do(fn func()) error {
159 // Avoid a panic writing to closed channel. Imperfect but maybe good enough.
160 select {
161 case <-ht.closedCh:
162 return ErrConnClosing
163 default:
164 select {
165 case ht.writes <- fn:
166 return nil
167 case <-ht.closedCh:
168 return ErrConnClosing
169 }
170
171 }
172}
173
174func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
175 err := ht.do(func() {
176 ht.writeCommonHeaders(s)
177
178 // And flush, in case no header or body has been sent yet.
179 // This forces a separation of headers and trailers if this is the
180 // first call (for example, in end2end tests's TestNoService).
181 ht.rw.(http.Flusher).Flush()
182
183 h := ht.rw.Header()
184 h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code()))
185 if m := st.Message(); m != "" {
186 h.Set("Grpc-Message", encodeGrpcMessage(m))
187 }
188
189 // TODO: Support Grpc-Status-Details-Bin
190
191 if md := s.Trailer(); len(md) > 0 {
192 for k, vv := range md {
193 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
194 if isReservedHeader(k) {
195 continue
196 }
197 for _, v := range vv {
198 // http2 ResponseWriter mechanism to send undeclared Trailers after
199 // the headers have possibly been written.
200 h.Add(http2.TrailerPrefix+k, encodeMetadataHeader(k, v))
201 }
202 }
203 }
204 })
205 close(ht.writes)
206 return err
207}
208
209// writeCommonHeaders sets common headers on the first write
210// call (Write, WriteHeader, or WriteStatus).
211func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
212 if ht.didCommonHeaders {
213 return
214 }
215 ht.didCommonHeaders = true
216
217 h := ht.rw.Header()
218 h["Date"] = nil // suppress Date to make tests happy; TODO: restore
219 h.Set("Content-Type", "application/grpc")
220
221 // Predeclare trailers we'll set later in WriteStatus (after the body).
222 // This is a SHOULD in the HTTP RFC, and the way you add (known)
223 // Trailers per the net/http.ResponseWriter contract.
224 // See https://golang.org/pkg/net/http/#ResponseWriter
225 // and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
226 h.Add("Trailer", "Grpc-Status")
227 h.Add("Trailer", "Grpc-Message")
228 // TODO: Support Grpc-Status-Details-Bin
229
230 if s.sendCompress != "" {
231 h.Set("Grpc-Encoding", s.sendCompress)
232 }
233}
234
235func (ht *serverHandlerTransport) Write(s *Stream, data []byte, opts *Options) error {
236 return ht.do(func() {
237 ht.writeCommonHeaders(s)
238 ht.rw.Write(data)
239 if !opts.Delay {
240 ht.rw.(http.Flusher).Flush()
241 }
242 })
243}
244
245func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
246 return ht.do(func() {
247 ht.writeCommonHeaders(s)
248 h := ht.rw.Header()
249 for k, vv := range md {
250 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
251 if isReservedHeader(k) {
252 continue
253 }
254 for _, v := range vv {
255 v = encodeMetadataHeader(k, v)
256 h.Add(k, v)
257 }
258 }
259 ht.rw.WriteHeader(200)
260 ht.rw.(http.Flusher).Flush()
261 })
262}
263
264func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) {
265 // With this transport type there will be exactly 1 stream: this HTTP request.
266
267 var ctx context.Context
268 var cancel context.CancelFunc
269 if ht.timeoutSet {
270 ctx, cancel = context.WithTimeout(context.Background(), ht.timeout)
271 } else {
272 ctx, cancel = context.WithCancel(context.Background())
273 }
274
275 // requestOver is closed when either the request's context is done
276 // or the status has been written via WriteStatus.
277 requestOver := make(chan struct{})
278
279 // clientGone receives a single value if peer is gone, either
280 // because the underlying connection is dead or because the
281 // peer sends an http2 RST_STREAM.
282 clientGone := ht.rw.(http.CloseNotifier).CloseNotify()
283 go func() {
284 select {
285 case <-requestOver:
286 return
287 case <-ht.closedCh:
288 case <-clientGone:
289 }
290 cancel()
291 }()
292
293 req := ht.req
294
295 s := &Stream{
296 id: 0, // irrelevant
297 requestRead: func(int) {},
298 cancel: cancel,
299 buf: newRecvBuffer(),
300 st: ht,
301 method: req.URL.Path,
302 recvCompress: req.Header.Get("grpc-encoding"),
303 }
304 pr := &peer.Peer{
305 Addr: ht.RemoteAddr(),
306 }
307 if req.TLS != nil {
308 pr.AuthInfo = credentials.TLSInfo{State: *req.TLS}
309 }
310 ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
311 ctx = peer.NewContext(ctx, pr)
312 s.ctx = newContextWithStream(ctx, s)
313 s.trReader = &transportReader{
314 reader: &recvBufferReader{ctx: s.ctx, recv: s.buf},
315 windowHandler: func(int) {},
316 }
317
318 // readerDone is closed when the Body.Read-ing goroutine exits.
319 readerDone := make(chan struct{})
320 go func() {
321 defer close(readerDone)
322
323 // TODO: minimize garbage, optimize recvBuffer code/ownership
324 const readSize = 8196
325 for buf := make([]byte, readSize); ; {
326 n, err := req.Body.Read(buf)
327 if n > 0 {
328 s.buf.put(recvMsg{data: buf[:n:n]})
329 buf = buf[n:]
330 }
331 if err != nil {
332 s.buf.put(recvMsg{err: mapRecvMsgError(err)})
333 return
334 }
335 if len(buf) == 0 {
336 buf = make([]byte, readSize)
337 }
338 }
339 }()
340
341 // startStream is provided by the *grpc.Server's serveStreams.
342 // It starts a goroutine serving s and exits immediately.
343 // The goroutine that is started is the one that then calls
344 // into ht, calling WriteHeader, Write, WriteStatus, Close, etc.
345 startStream(s)
346
347 ht.runStream()
348 close(requestOver)
349
350 // Wait for reading goroutine to finish.
351 req.Body.Close()
352 <-readerDone
353}
354
355func (ht *serverHandlerTransport) runStream() {
356 for {
357 select {
358 case fn, ok := <-ht.writes:
359 if !ok {
360 return
361 }
362 fn()
363 case <-ht.closedCh:
364 return
365 }
366 }
367}
368
369func (ht *serverHandlerTransport) Drain() {
370 panic("Drain() is not implemented")
371}
372
373// mapRecvMsgError returns the non-nil err into the appropriate
374// error value as expected by callers of *grpc.parser.recvMsg.
375// In particular, in can only be:
376// * io.EOF
377// * io.ErrUnexpectedEOF
378// * of type transport.ConnectionError
379// * of type transport.StreamError
380func mapRecvMsgError(err error) error {
381 if err == io.EOF || err == io.ErrUnexpectedEOF {
382 return err
383 }
384 if se, ok := err.(http2.StreamError); ok {
385 if code, ok := http2ErrConvTab[se.Code]; ok {
386 return StreamError{
387 Code: code,
388 Desc: se.Error(),
389 }
390 }
391 }
392 return connectionErrorf(true, err, err.Error())
393}
diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go
new file mode 100644
index 0000000..516ea06
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/http2_client.go
@@ -0,0 +1,1369 @@
1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "io"
24 "math"
25 "net"
26 "strings"
27 "sync"
28 "sync/atomic"
29 "time"
30
31 "golang.org/x/net/context"
32 "golang.org/x/net/http2"
33 "golang.org/x/net/http2/hpack"
34 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/credentials"
36 "google.golang.org/grpc/keepalive"
37 "google.golang.org/grpc/metadata"
38 "google.golang.org/grpc/peer"
39 "google.golang.org/grpc/stats"
40 "google.golang.org/grpc/status"
41)
42
43// http2Client implements the ClientTransport interface with HTTP2.
44type http2Client struct {
45 ctx context.Context
46 target string // server name/addr
47 userAgent string
48 md interface{}
49 conn net.Conn // underlying communication channel
50 remoteAddr net.Addr
51 localAddr net.Addr
52 authInfo credentials.AuthInfo // auth info about the connection
53 nextID uint32 // the next stream ID to be used
54
55 // writableChan synchronizes write access to the transport.
56 // A writer acquires the write lock by sending a value on writableChan
57 // and releases it by receiving from writableChan.
58 writableChan chan int
59 // shutdownChan is closed when Close is called.
60 // Blocking operations should select on shutdownChan to avoid
61 // blocking forever after Close.
62 // TODO(zhaoq): Maybe have a channel context?
63 shutdownChan chan struct{}
64 // errorChan is closed to notify the I/O error to the caller.
65 errorChan chan struct{}
66 // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
67 // that the server sent GoAway on this transport.
68 goAway chan struct{}
69 // awakenKeepalive is used to wake up keepalive when after it has gone dormant.
70 awakenKeepalive chan struct{}
71
72 framer *framer
73 hBuf *bytes.Buffer // the buffer for HPACK encoding
74 hEnc *hpack.Encoder // HPACK encoder
75
76 // controlBuf delivers all the control related tasks (e.g., window
77 // updates, reset streams, and various settings) to the controller.
78 controlBuf *controlBuffer
79 fc *inFlow
80 // sendQuotaPool provides flow control to outbound message.
81 sendQuotaPool *quotaPool
82 // streamsQuota limits the max number of concurrent streams.
83 streamsQuota *quotaPool
84
85 // The scheme used: https if TLS is on, http otherwise.
86 scheme string
87
88 isSecure bool
89
90 creds []credentials.PerRPCCredentials
91
92 // Boolean to keep track of reading activity on transport.
93 // 1 is true and 0 is false.
94 activity uint32 // Accessed atomically.
95 kp keepalive.ClientParameters
96
97 statsHandler stats.Handler
98
99 initialWindowSize int32
100
101 bdpEst *bdpEstimator
102 outQuotaVersion uint32
103
104 mu sync.Mutex // guard the following variables
105 state transportState // the state of underlying connection
106 activeStreams map[uint32]*Stream
107 // The max number of concurrent streams
108 maxStreams int
109 // the per-stream outbound flow control window size set by the peer.
110 streamSendQuota uint32
111 // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
112 prevGoAwayID uint32
113 // goAwayReason records the http2.ErrCode and debug data received with the
114 // GoAway frame.
115 goAwayReason GoAwayReason
116}
117
118func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
119 if fn != nil {
120 return fn(ctx, addr)
121 }
122 return dialContext(ctx, "tcp", addr)
123}
124
125func isTemporary(err error) bool {
126 switch err {
127 case io.EOF:
128 // Connection closures may be resolved upon retry, and are thus
129 // treated as temporary.
130 return true
131 case context.DeadlineExceeded:
132 // In Go 1.7, context.DeadlineExceeded implements Timeout(), and this
133 // special case is not needed. Until then, we need to keep this
134 // clause.
135 return true
136 }
137
138 switch err := err.(type) {
139 case interface {
140 Temporary() bool
141 }:
142 return err.Temporary()
143 case interface {
144 Timeout() bool
145 }:
146 // Timeouts may be resolved upon retry, and are thus treated as
147 // temporary.
148 return err.Timeout()
149 }
150 return false
151}
152
153// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
154// and starts to receive messages on it. Non-nil error returns if construction
155// fails.
156func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (_ ClientTransport, err error) {
157 scheme := "http"
158 conn, err := dial(ctx, opts.Dialer, addr.Addr)
159 if err != nil {
160 if opts.FailOnNonTempDialError {
161 return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
162 }
163 return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
164 }
165 // Any further errors will close the underlying connection
166 defer func(conn net.Conn) {
167 if err != nil {
168 conn.Close()
169 }
170 }(conn)
171 var (
172 isSecure bool
173 authInfo credentials.AuthInfo
174 )
175 if creds := opts.TransportCredentials; creds != nil {
176 scheme = "https"
177 conn, authInfo, err = creds.ClientHandshake(ctx, addr.Addr, conn)
178 if err != nil {
179 // Credentials handshake errors are typically considered permanent
180 // to avoid retrying on e.g. bad certificates.
181 temp := isTemporary(err)
182 return nil, connectionErrorf(temp, err, "transport: authentication handshake failed: %v", err)
183 }
184 isSecure = true
185 }
186 kp := opts.KeepaliveParams
187 // Validate keepalive parameters.
188 if kp.Time == 0 {
189 kp.Time = defaultClientKeepaliveTime
190 }
191 if kp.Timeout == 0 {
192 kp.Timeout = defaultClientKeepaliveTimeout
193 }
194 dynamicWindow := true
195 icwz := int32(initialWindowSize)
196 if opts.InitialConnWindowSize >= defaultWindowSize {
197 icwz = opts.InitialConnWindowSize
198 dynamicWindow = false
199 }
200 var buf bytes.Buffer
201 t := &http2Client{
202 ctx: ctx,
203 target: addr.Addr,
204 userAgent: opts.UserAgent,
205 md: addr.Metadata,
206 conn: conn,
207 remoteAddr: conn.RemoteAddr(),
208 localAddr: conn.LocalAddr(),
209 authInfo: authInfo,
210 // The client initiated stream id is odd starting from 1.
211 nextID: 1,
212 writableChan: make(chan int, 1),
213 shutdownChan: make(chan struct{}),
214 errorChan: make(chan struct{}),
215 goAway: make(chan struct{}),
216 awakenKeepalive: make(chan struct{}, 1),
217 framer: newFramer(conn),
218 hBuf: &buf,
219 hEnc: hpack.NewEncoder(&buf),
220 controlBuf: newControlBuffer(),
221 fc: &inFlow{limit: uint32(icwz)},
222 sendQuotaPool: newQuotaPool(defaultWindowSize),
223 scheme: scheme,
224 state: reachable,
225 activeStreams: make(map[uint32]*Stream),
226 isSecure: isSecure,
227 creds: opts.PerRPCCredentials,
228 maxStreams: defaultMaxStreamsClient,
229 streamsQuota: newQuotaPool(defaultMaxStreamsClient),
230 streamSendQuota: defaultWindowSize,
231 kp: kp,
232 statsHandler: opts.StatsHandler,
233 initialWindowSize: initialWindowSize,
234 }
235 if opts.InitialWindowSize >= defaultWindowSize {
236 t.initialWindowSize = opts.InitialWindowSize
237 dynamicWindow = false
238 }
239 if dynamicWindow {
240 t.bdpEst = &bdpEstimator{
241 bdp: initialWindowSize,
242 updateFlowControl: t.updateFlowControl,
243 }
244 }
245 // Make sure awakenKeepalive can't be written upon.
246 // keepalive routine will make it writable, if need be.
247 t.awakenKeepalive <- struct{}{}
248 if t.statsHandler != nil {
249 t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
250 RemoteAddr: t.remoteAddr,
251 LocalAddr: t.localAddr,
252 })
253 connBegin := &stats.ConnBegin{
254 Client: true,
255 }
256 t.statsHandler.HandleConn(t.ctx, connBegin)
257 }
258 // Start the reader goroutine for incoming message. Each transport has
259 // a dedicated goroutine which reads HTTP2 frame from network. Then it
260 // dispatches the frame to the corresponding stream entity.
261 go t.reader()
262 // Send connection preface to server.
263 n, err := t.conn.Write(clientPreface)
264 if err != nil {
265 t.Close()
266 return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
267 }
268 if n != len(clientPreface) {
269 t.Close()
270 return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
271 }
272 if t.initialWindowSize != defaultWindowSize {
273 err = t.framer.writeSettings(true, http2.Setting{
274 ID: http2.SettingInitialWindowSize,
275 Val: uint32(t.initialWindowSize),
276 })
277 } else {
278 err = t.framer.writeSettings(true)
279 }
280 if err != nil {
281 t.Close()
282 return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
283 }
284 // Adjust the connection flow control window if needed.
285 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
286 if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil {
287 t.Close()
288 return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
289 }
290 }
291 go t.controller()
292 if t.kp.Time != infinity {
293 go t.keepalive()
294 }
295 t.writableChan <- 0
296 return t, nil
297}
298
299func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
300 // TODO(zhaoq): Handle uint32 overflow of Stream.id.
301 s := &Stream{
302 id: t.nextID,
303 done: make(chan struct{}),
304 goAway: make(chan struct{}),
305 method: callHdr.Method,
306 sendCompress: callHdr.SendCompress,
307 buf: newRecvBuffer(),
308 fc: &inFlow{limit: uint32(t.initialWindowSize)},
309 sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
310 headerChan: make(chan struct{}),
311 }
312 t.nextID += 2
313 s.requestRead = func(n int) {
314 t.adjustWindow(s, uint32(n))
315 }
316 // The client side stream context should have exactly the same life cycle with the user provided context.
317 // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
318 // So we use the original context here instead of creating a copy.
319 s.ctx = ctx
320 s.trReader = &transportReader{
321 reader: &recvBufferReader{
322 ctx: s.ctx,
323 goAway: s.goAway,
324 recv: s.buf,
325 },
326 windowHandler: func(n int) {
327 t.updateWindow(s, uint32(n))
328 },
329 }
330
331 return s
332}
333
334// NewStream creates a stream and registers it into the transport as "active"
335// streams.
336func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
337 pr := &peer.Peer{
338 Addr: t.remoteAddr,
339 }
340 // Attach Auth info if there is any.
341 if t.authInfo != nil {
342 pr.AuthInfo = t.authInfo
343 }
344 ctx = peer.NewContext(ctx, pr)
345 var (
346 authData = make(map[string]string)
347 audience string
348 )
349 // Create an audience string only if needed.
350 if len(t.creds) > 0 || callHdr.Creds != nil {
351 // Construct URI required to get auth request metadata.
352 var port string
353 if pos := strings.LastIndex(t.target, ":"); pos != -1 {
354 // Omit port if it is the default one.
355 if t.target[pos+1:] != "443" {
356 port = ":" + t.target[pos+1:]
357 }
358 }
359 pos := strings.LastIndex(callHdr.Method, "/")
360 if pos == -1 {
361 pos = len(callHdr.Method)
362 }
363 audience = "https://" + callHdr.Host + port + callHdr.Method[:pos]
364 }
365 for _, c := range t.creds {
366 data, err := c.GetRequestMetadata(ctx, audience)
367 if err != nil {
368 return nil, streamErrorf(codes.Internal, "transport: %v", err)
369 }
370 for k, v := range data {
371 // Capital header names are illegal in HTTP/2.
372 k = strings.ToLower(k)
373 authData[k] = v
374 }
375 }
376 callAuthData := make(map[string]string)
377 // Check if credentials.PerRPCCredentials were provided via call options.
378 // Note: if these credentials are provided both via dial options and call
379 // options, then both sets of credentials will be applied.
380 if callCreds := callHdr.Creds; callCreds != nil {
381 if !t.isSecure && callCreds.RequireTransportSecurity() {
382 return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure conneciton")
383 }
384 data, err := callCreds.GetRequestMetadata(ctx, audience)
385 if err != nil {
386 return nil, streamErrorf(codes.Internal, "transport: %v", err)
387 }
388 for k, v := range data {
389 // Capital header names are illegal in HTTP/2
390 k = strings.ToLower(k)
391 callAuthData[k] = v
392 }
393 }
394 t.mu.Lock()
395 if t.activeStreams == nil {
396 t.mu.Unlock()
397 return nil, ErrConnClosing
398 }
399 if t.state == draining {
400 t.mu.Unlock()
401 return nil, ErrStreamDrain
402 }
403 if t.state != reachable {
404 t.mu.Unlock()
405 return nil, ErrConnClosing
406 }
407 t.mu.Unlock()
408 sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire())
409 if err != nil {
410 return nil, err
411 }
412 // Returns the quota balance back.
413 if sq > 1 {
414 t.streamsQuota.add(sq - 1)
415 }
416 if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
417 // Return the quota back now because there is no stream returned to the caller.
418 if _, ok := err.(StreamError); ok {
419 t.streamsQuota.add(1)
420 }
421 return nil, err
422 }
423 t.mu.Lock()
424 if t.state == draining {
425 t.mu.Unlock()
426 t.streamsQuota.add(1)
427 // Need to make t writable again so that the rpc in flight can still proceed.
428 t.writableChan <- 0
429 return nil, ErrStreamDrain
430 }
431 if t.state != reachable {
432 t.mu.Unlock()
433 return nil, ErrConnClosing
434 }
435 s := t.newStream(ctx, callHdr)
436 t.activeStreams[s.id] = s
437 // If the number of active streams change from 0 to 1, then check if keepalive
438 // has gone dormant. If so, wake it up.
439 if len(t.activeStreams) == 1 {
440 select {
441 case t.awakenKeepalive <- struct{}{}:
442 t.framer.writePing(false, false, [8]byte{})
443 default:
444 }
445 }
446
447 t.mu.Unlock()
448
449 // HPACK encodes various headers. Note that once WriteField(...) is
450 // called, the corresponding headers/continuation frame has to be sent
451 // because hpack.Encoder is stateful.
452 t.hBuf.Reset()
453 t.hEnc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"})
454 t.hEnc.WriteField(hpack.HeaderField{Name: ":scheme", Value: t.scheme})
455 t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method})
456 t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
457 t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
458 t.hEnc.WriteField(hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
459 t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"})
460
461 if callHdr.SendCompress != "" {
462 t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
463 }
464 if dl, ok := ctx.Deadline(); ok {
465 // Send out timeout regardless its value. The server can detect timeout context by itself.
466 timeout := dl.Sub(time.Now())
467 t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
468 }
469
470 for k, v := range authData {
471 t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
472 }
473 for k, v := range callAuthData {
474 t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
475 }
476 var (
477 endHeaders bool
478 )
479 if md, ok := metadata.FromOutgoingContext(ctx); ok {
480 for k, vv := range md {
481 // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
482 if isReservedHeader(k) {
483 continue
484 }
485 for _, v := range vv {
486 t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
487 }
488 }
489 }
490 if md, ok := t.md.(*metadata.MD); ok {
491 for k, vv := range *md {
492 if isReservedHeader(k) {
493 continue
494 }
495 for _, v := range vv {
496 t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
497 }
498 }
499 }
500 first := true
501 bufLen := t.hBuf.Len()
502 // Sends the headers in a single batch even when they span multiple frames.
503 for !endHeaders {
504 size := t.hBuf.Len()
505 if size > http2MaxFrameLen {
506 size = http2MaxFrameLen
507 } else {
508 endHeaders = true
509 }
510 var flush bool
511 if callHdr.Flush && endHeaders {
512 flush = true
513 }
514 if first {
515 // Sends a HeadersFrame to server to start a new stream.
516 p := http2.HeadersFrameParam{
517 StreamID: s.id,
518 BlockFragment: t.hBuf.Next(size),
519 EndStream: false,
520 EndHeaders: endHeaders,
521 }
522 // Do a force flush for the buffered frames iff it is the last headers frame
523 // and there is header metadata to be sent. Otherwise, there is flushing until
524 // the corresponding data frame is written.
525 err = t.framer.writeHeaders(flush, p)
526 first = false
527 } else {
528 // Sends Continuation frames for the leftover headers.
529 err = t.framer.writeContinuation(flush, s.id, endHeaders, t.hBuf.Next(size))
530 }
531 if err != nil {
532 t.notifyError(err)
533 return nil, connectionErrorf(true, err, "transport: %v", err)
534 }
535 }
536 s.mu.Lock()
537 s.bytesSent = true
538 s.mu.Unlock()
539
540 if t.statsHandler != nil {
541 outHeader := &stats.OutHeader{
542 Client: true,
543 WireLength: bufLen,
544 FullMethod: callHdr.Method,
545 RemoteAddr: t.remoteAddr,
546 LocalAddr: t.localAddr,
547 Compression: callHdr.SendCompress,
548 }
549 t.statsHandler.HandleRPC(s.ctx, outHeader)
550 }
551 t.writableChan <- 0
552 return s, nil
553}
554
555// CloseStream clears the footprint of a stream when the stream is not needed any more.
556// This must not be executed in reader's goroutine.
557func (t *http2Client) CloseStream(s *Stream, err error) {
558 t.mu.Lock()
559 if t.activeStreams == nil {
560 t.mu.Unlock()
561 return
562 }
563 if err != nil {
564 // notify in-flight streams, before the deletion
565 s.write(recvMsg{err: err})
566 }
567 delete(t.activeStreams, s.id)
568 if t.state == draining && len(t.activeStreams) == 0 {
569 // The transport is draining and s is the last live stream on t.
570 t.mu.Unlock()
571 t.Close()
572 return
573 }
574 t.mu.Unlock()
575 // rstStream is true in case the stream is being closed at the client-side
576 // and the server needs to be intimated about it by sending a RST_STREAM
577 // frame.
578 // To make sure this frame is written to the wire before the headers of the
579 // next stream waiting for streamsQuota, we add to streamsQuota pool only
580 // after having acquired the writableChan to send RST_STREAM out (look at
581 // the controller() routine).
582 var rstStream bool
583 var rstError http2.ErrCode
584 defer func() {
585 // In case, the client doesn't have to send RST_STREAM to server
586 // we can safely add back to streamsQuota pool now.
587 if !rstStream {
588 t.streamsQuota.add(1)
589 return
590 }
591 t.controlBuf.put(&resetStream{s.id, rstError})
592 }()
593 s.mu.Lock()
594 rstStream = s.rstStream
595 rstError = s.rstError
596 if s.state == streamDone {
597 s.mu.Unlock()
598 return
599 }
600 if !s.headerDone {
601 close(s.headerChan)
602 s.headerDone = true
603 }
604 s.state = streamDone
605 s.mu.Unlock()
606 if _, ok := err.(StreamError); ok {
607 rstStream = true
608 rstError = http2.ErrCodeCancel
609 }
610}
611
612// Close kicks off the shutdown process of the transport. This should be called
613// only once on a transport. Once it is called, the transport should not be
614// accessed any more.
615func (t *http2Client) Close() (err error) {
616 t.mu.Lock()
617 if t.state == closing {
618 t.mu.Unlock()
619 return
620 }
621 if t.state == reachable || t.state == draining {
622 close(t.errorChan)
623 }
624 t.state = closing
625 t.mu.Unlock()
626 close(t.shutdownChan)
627 err = t.conn.Close()
628 t.mu.Lock()
629 streams := t.activeStreams
630 t.activeStreams = nil
631 t.mu.Unlock()
632 // Notify all active streams.
633 for _, s := range streams {
634 s.mu.Lock()
635 if !s.headerDone {
636 close(s.headerChan)
637 s.headerDone = true
638 }
639 s.mu.Unlock()
640 s.write(recvMsg{err: ErrConnClosing})
641 }
642 if t.statsHandler != nil {
643 connEnd := &stats.ConnEnd{
644 Client: true,
645 }
646 t.statsHandler.HandleConn(t.ctx, connEnd)
647 }
648 return
649}
650
651func (t *http2Client) GracefulClose() error {
652 t.mu.Lock()
653 switch t.state {
654 case unreachable:
655 // The server may close the connection concurrently. t is not available for
656 // any streams. Close it now.
657 t.mu.Unlock()
658 t.Close()
659 return nil
660 case closing:
661 t.mu.Unlock()
662 return nil
663 }
664 if t.state == draining {
665 t.mu.Unlock()
666 return nil
667 }
668 t.state = draining
669 active := len(t.activeStreams)
670 t.mu.Unlock()
671 if active == 0 {
672 return t.Close()
673 }
674 return nil
675}
676
677// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
678// should proceed only if Write returns nil.
679// TODO(zhaoq): opts.Delay is ignored in this implementation. Support it later
680// if it improves the performance.
681func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
682 r := bytes.NewBuffer(data)
683 var (
684 p []byte
685 oqv uint32
686 )
687 for {
688 oqv = atomic.LoadUint32(&t.outQuotaVersion)
689 if r.Len() > 0 || p != nil {
690 size := http2MaxFrameLen
691 // Wait until the stream has some quota to send the data.
692 sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire())
693 if err != nil {
694 return err
695 }
696 // Wait until the transport has some quota to send the data.
697 tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire())
698 if err != nil {
699 return err
700 }
701 if sq < size {
702 size = sq
703 }
704 if tq < size {
705 size = tq
706 }
707 if p == nil {
708 p = r.Next(size)
709 }
710 ps := len(p)
711 if ps < sq {
712 // Overbooked stream quota. Return it back.
713 s.sendQuotaPool.add(sq - ps)
714 }
715 if ps < tq {
716 // Overbooked transport quota. Return it back.
717 t.sendQuotaPool.add(tq - ps)
718 }
719 }
720 var (
721 endStream bool
722 forceFlush bool
723 )
724 if opts.Last && r.Len() == 0 {
725 endStream = true
726 }
727 // Indicate there is a writer who is about to write a data frame.
728 t.framer.adjustNumWriters(1)
729 // Got some quota. Try to acquire writing privilege on the transport.
730 if _, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.writableChan); err != nil {
731 if _, ok := err.(StreamError); ok || err == io.EOF {
732 // Return the connection quota back.
733 t.sendQuotaPool.add(len(p))
734 }
735 if t.framer.adjustNumWriters(-1) == 0 {
736 // This writer is the last one in this batch and has the
737 // responsibility to flush the buffered frames. It queues
738 // a flush request to controlBuf instead of flushing directly
739 // in order to avoid the race with other writing or flushing.
740 t.controlBuf.put(&flushIO{})
741 }
742 return err
743 }
744 select {
745 case <-s.ctx.Done():
746 t.sendQuotaPool.add(len(p))
747 if t.framer.adjustNumWriters(-1) == 0 {
748 t.controlBuf.put(&flushIO{})
749 }
750 t.writableChan <- 0
751 return ContextErr(s.ctx.Err())
752 default:
753 }
754 if oqv != atomic.LoadUint32(&t.outQuotaVersion) {
755 // InitialWindowSize settings frame must have been received after we
756 // acquired send quota but before we got the writable channel.
757 // We must forsake this write.
758 t.sendQuotaPool.add(len(p))
759 s.sendQuotaPool.add(len(p))
760 if t.framer.adjustNumWriters(-1) == 0 {
761 t.controlBuf.put(&flushIO{})
762 }
763 t.writableChan <- 0
764 continue
765 }
766 if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 {
767 // Do a force flush iff this is last frame for the entire gRPC message
768 // and the caller is the only writer at this moment.
769 forceFlush = true
770 }
771 // If WriteData fails, all the pending streams will be handled
772 // by http2Client.Close(). No explicit CloseStream() needs to be
773 // invoked.
774 if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil {
775 t.notifyError(err)
776 return connectionErrorf(true, err, "transport: %v", err)
777 }
778 p = nil
779 if t.framer.adjustNumWriters(-1) == 0 {
780 t.framer.flushWrite()
781 }
782 t.writableChan <- 0
783 if r.Len() == 0 {
784 break
785 }
786 }
787 if !opts.Last {
788 return nil
789 }
790 s.mu.Lock()
791 if s.state != streamDone {
792 s.state = streamWriteDone
793 }
794 s.mu.Unlock()
795 return nil
796}
797
798func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
799 t.mu.Lock()
800 defer t.mu.Unlock()
801 s, ok := t.activeStreams[f.Header().StreamID]
802 return s, ok
803}
804
805// adjustWindow sends out extra window update over the initial window size
806// of stream if the application is requesting data larger in size than
807// the window.
808func (t *http2Client) adjustWindow(s *Stream, n uint32) {
809 s.mu.Lock()
810 defer s.mu.Unlock()
811 if s.state == streamDone {
812 return
813 }
814 if w := s.fc.maybeAdjust(n); w > 0 {
815 // Piggyback conneciton's window update along.
816 if cw := t.fc.resetPendingUpdate(); cw > 0 {
817 t.controlBuf.put(&windowUpdate{0, cw, false})
818 }
819 t.controlBuf.put(&windowUpdate{s.id, w, true})
820 }
821}
822
823// updateWindow adjusts the inbound quota for the stream and the transport.
824// Window updates will deliver to the controller for sending when
825// the cumulative quota exceeds the corresponding threshold.
826func (t *http2Client) updateWindow(s *Stream, n uint32) {
827 s.mu.Lock()
828 defer s.mu.Unlock()
829 if s.state == streamDone {
830 return
831 }
832 if w := s.fc.onRead(n); w > 0 {
833 if cw := t.fc.resetPendingUpdate(); cw > 0 {
834 t.controlBuf.put(&windowUpdate{0, cw, false})
835 }
836 t.controlBuf.put(&windowUpdate{s.id, w, true})
837 }
838}
839
840// updateFlowControl updates the incoming flow control windows
841// for the transport and the stream based on the current bdp
842// estimation.
843func (t *http2Client) updateFlowControl(n uint32) {
844 t.mu.Lock()
845 for _, s := range t.activeStreams {
846 s.fc.newLimit(n)
847 }
848 t.initialWindowSize = int32(n)
849 t.mu.Unlock()
850 t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false})
851 t.controlBuf.put(&settings{
852 ack: false,
853 ss: []http2.Setting{
854 {
855 ID: http2.SettingInitialWindowSize,
856 Val: uint32(n),
857 },
858 },
859 })
860}
861
862func (t *http2Client) handleData(f *http2.DataFrame) {
863 size := f.Header().Length
864 var sendBDPPing bool
865 if t.bdpEst != nil {
866 sendBDPPing = t.bdpEst.add(uint32(size))
867 }
868 // Decouple connection's flow control from application's read.
869 // An update on connection's flow control should not depend on
870 // whether user application has read the data or not. Such a
871 // restriction is already imposed on the stream's flow control,
872 // and therefore the sender will be blocked anyways.
873 // Decoupling the connection flow control will prevent other
874 // active(fast) streams from starving in presence of slow or
875 // inactive streams.
876 //
877 // Furthermore, if a bdpPing is being sent out we can piggyback
878 // connection's window update for the bytes we just received.
879 if sendBDPPing {
880 t.controlBuf.put(&windowUpdate{0, uint32(size), false})
881 t.controlBuf.put(bdpPing)
882 } else {
883 if err := t.fc.onData(uint32(size)); err != nil {
884 t.notifyError(connectionErrorf(true, err, "%v", err))
885 return
886 }
887 if w := t.fc.onRead(uint32(size)); w > 0 {
888 t.controlBuf.put(&windowUpdate{0, w, true})
889 }
890 }
891 // Select the right stream to dispatch.
892 s, ok := t.getStream(f)
893 if !ok {
894 return
895 }
896 if size > 0 {
897 s.mu.Lock()
898 if s.state == streamDone {
899 s.mu.Unlock()
900 return
901 }
902 if err := s.fc.onData(uint32(size)); err != nil {
903 s.rstStream = true
904 s.rstError = http2.ErrCodeFlowControl
905 s.finish(status.New(codes.Internal, err.Error()))
906 s.mu.Unlock()
907 s.write(recvMsg{err: io.EOF})
908 return
909 }
910 if f.Header().Flags.Has(http2.FlagDataPadded) {
911 if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
912 t.controlBuf.put(&windowUpdate{s.id, w, true})
913 }
914 }
915 s.mu.Unlock()
916 // TODO(bradfitz, zhaoq): A copy is required here because there is no
917 // guarantee f.Data() is consumed before the arrival of next frame.
918 // Can this copy be eliminated?
919 if len(f.Data()) > 0 {
920 data := make([]byte, len(f.Data()))
921 copy(data, f.Data())
922 s.write(recvMsg{data: data})
923 }
924 }
925 // The server has closed the stream without sending trailers. Record that
926 // the read direction is closed, and set the status appropriately.
927 if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
928 s.mu.Lock()
929 if s.state == streamDone {
930 s.mu.Unlock()
931 return
932 }
933 s.finish(status.New(codes.Internal, "server closed the stream without sending trailers"))
934 s.mu.Unlock()
935 s.write(recvMsg{err: io.EOF})
936 }
937}
938
939func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
940 s, ok := t.getStream(f)
941 if !ok {
942 return
943 }
944 s.mu.Lock()
945 if s.state == streamDone {
946 s.mu.Unlock()
947 return
948 }
949 if !s.headerDone {
950 close(s.headerChan)
951 s.headerDone = true
952 }
953 statusCode, ok := http2ErrConvTab[http2.ErrCode(f.ErrCode)]
954 if !ok {
955 warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
956 statusCode = codes.Unknown
957 }
958 s.finish(status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %d", f.ErrCode))
959 s.mu.Unlock()
960 s.write(recvMsg{err: io.EOF})
961}
962
963func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
964 if f.IsAck() {
965 return
966 }
967 var ss []http2.Setting
968 f.ForeachSetting(func(s http2.Setting) error {
969 ss = append(ss, s)
970 return nil
971 })
972 // The settings will be applied once the ack is sent.
973 t.controlBuf.put(&settings{ack: true, ss: ss})
974}
975
976func (t *http2Client) handlePing(f *http2.PingFrame) {
977 if f.IsAck() {
978 // Maybe it's a BDP ping.
979 if t.bdpEst != nil {
980 t.bdpEst.calculate(f.Data)
981 }
982 return
983 }
984 pingAck := &ping{ack: true}
985 copy(pingAck.data[:], f.Data[:])
986 t.controlBuf.put(pingAck)
987}
988
989func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
990 t.mu.Lock()
991 if t.state != reachable && t.state != draining {
992 t.mu.Unlock()
993 return
994 }
995 if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
996 infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
997 }
998 id := f.LastStreamID
999 if id > 0 && id%2 != 1 {
1000 t.mu.Unlock()
1001 t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID))
1002 return
1003 }
1004 // A client can recieve multiple GoAways from server (look at https://github.com/grpc/grpc-go/issues/1387).
1005 // The idea is that the first GoAway will be sent with an ID of MaxInt32 and the second GoAway will be sent after an RTT delay
1006 // with the ID of the last stream the server will process.
1007 // Therefore, when we get the first GoAway we don't really close any streams. While in case of second GoAway we
1008 // close all streams created after the second GoAwayId. This way streams that were in-flight while the GoAway from server
1009 // was being sent don't get killed.
1010 select {
1011 case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
1012 // If there are multiple GoAways the first one should always have an ID greater than the following ones.
1013 if id > t.prevGoAwayID {
1014 t.mu.Unlock()
1015 t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID))
1016 return
1017 }
1018 default:
1019 t.setGoAwayReason(f)
1020 close(t.goAway)
1021 t.state = draining
1022 }
1023 // All streams with IDs greater than the GoAwayId
1024 // and smaller than the previous GoAway ID should be killed.
1025 upperLimit := t.prevGoAwayID
1026 if upperLimit == 0 { // This is the first GoAway Frame.
1027 upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
1028 }
1029 for streamID, stream := range t.activeStreams {
1030 if streamID > id && streamID <= upperLimit {
1031 close(stream.goAway)
1032 }
1033 }
1034 t.prevGoAwayID = id
1035 active := len(t.activeStreams)
1036 t.mu.Unlock()
1037 if active == 0 {
1038 t.Close()
1039 }
1040}
1041
1042// setGoAwayReason sets the value of t.goAwayReason based
1043// on the GoAway frame received.
1044// It expects a lock on transport's mutext to be held by
1045// the caller.
1046func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
1047 t.goAwayReason = NoReason
1048 switch f.ErrCode {
1049 case http2.ErrCodeEnhanceYourCalm:
1050 if string(f.DebugData()) == "too_many_pings" {
1051 t.goAwayReason = TooManyPings
1052 }
1053 }
1054}
1055
1056func (t *http2Client) GetGoAwayReason() GoAwayReason {
1057 t.mu.Lock()
1058 defer t.mu.Unlock()
1059 return t.goAwayReason
1060}
1061
1062func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
1063 id := f.Header().StreamID
1064 incr := f.Increment
1065 if id == 0 {
1066 t.sendQuotaPool.add(int(incr))
1067 return
1068 }
1069 if s, ok := t.getStream(f); ok {
1070 s.sendQuotaPool.add(int(incr))
1071 }
1072}
1073
1074// operateHeaders takes action on the decoded headers.
1075func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
1076 s, ok := t.getStream(frame)
1077 if !ok {
1078 return
1079 }
1080 s.mu.Lock()
1081 s.bytesReceived = true
1082 s.mu.Unlock()
1083 var state decodeState
1084 if err := state.decodeResponseHeader(frame); err != nil {
1085 s.mu.Lock()
1086 if !s.headerDone {
1087 close(s.headerChan)
1088 s.headerDone = true
1089 }
1090 s.mu.Unlock()
1091 s.write(recvMsg{err: err})
1092 // Something wrong. Stops reading even when there is remaining.
1093 return
1094 }
1095
1096 endStream := frame.StreamEnded()
1097 var isHeader bool
1098 defer func() {
1099 if t.statsHandler != nil {
1100 if isHeader {
1101 inHeader := &stats.InHeader{
1102 Client: true,
1103 WireLength: int(frame.Header().Length),
1104 }
1105 t.statsHandler.HandleRPC(s.ctx, inHeader)
1106 } else {
1107 inTrailer := &stats.InTrailer{
1108 Client: true,
1109 WireLength: int(frame.Header().Length),
1110 }
1111 t.statsHandler.HandleRPC(s.ctx, inTrailer)
1112 }
1113 }
1114 }()
1115
1116 s.mu.Lock()
1117 if !endStream {
1118 s.recvCompress = state.encoding
1119 }
1120 if !s.headerDone {
1121 if !endStream && len(state.mdata) > 0 {
1122 s.header = state.mdata
1123 }
1124 close(s.headerChan)
1125 s.headerDone = true
1126 isHeader = true
1127 }
1128 if !endStream || s.state == streamDone {
1129 s.mu.Unlock()
1130 return
1131 }
1132
1133 if len(state.mdata) > 0 {
1134 s.trailer = state.mdata
1135 }
1136 s.finish(state.status())
1137 s.mu.Unlock()
1138 s.write(recvMsg{err: io.EOF})
1139}
1140
1141func handleMalformedHTTP2(s *Stream, err error) {
1142 s.mu.Lock()
1143 if !s.headerDone {
1144 close(s.headerChan)
1145 s.headerDone = true
1146 }
1147 s.mu.Unlock()
1148 s.write(recvMsg{err: err})
1149}
1150
1151// reader runs as a separate goroutine in charge of reading data from network
1152// connection.
1153//
1154// TODO(zhaoq): currently one reader per transport. Investigate whether this is
1155// optimal.
1156// TODO(zhaoq): Check the validity of the incoming frame sequence.
1157func (t *http2Client) reader() {
1158 // Check the validity of server preface.
1159 frame, err := t.framer.readFrame()
1160 if err != nil {
1161 t.notifyError(err)
1162 return
1163 }
1164 atomic.CompareAndSwapUint32(&t.activity, 0, 1)
1165 sf, ok := frame.(*http2.SettingsFrame)
1166 if !ok {
1167 t.notifyError(err)
1168 return
1169 }
1170 t.handleSettings(sf)
1171
1172 // loop to keep reading incoming messages on this transport.
1173 for {
1174 frame, err := t.framer.readFrame()
1175 atomic.CompareAndSwapUint32(&t.activity, 0, 1)
1176 if err != nil {
1177 // Abort an active stream if the http2.Framer returns a
1178 // http2.StreamError. This can happen only if the server's response
1179 // is malformed http2.
1180 if se, ok := err.(http2.StreamError); ok {
1181 t.mu.Lock()
1182 s := t.activeStreams[se.StreamID]
1183 t.mu.Unlock()
1184 if s != nil {
1185 // use error detail to provide better err message
1186 handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail()))
1187 }
1188 continue
1189 } else {
1190 // Transport error.
1191 t.notifyError(err)
1192 return
1193 }
1194 }
1195 switch frame := frame.(type) {
1196 case *http2.MetaHeadersFrame:
1197 t.operateHeaders(frame)
1198 case *http2.DataFrame:
1199 t.handleData(frame)
1200 case *http2.RSTStreamFrame:
1201 t.handleRSTStream(frame)
1202 case *http2.SettingsFrame:
1203 t.handleSettings(frame)
1204 case *http2.PingFrame:
1205 t.handlePing(frame)
1206 case *http2.GoAwayFrame:
1207 t.handleGoAway(frame)
1208 case *http2.WindowUpdateFrame:
1209 t.handleWindowUpdate(frame)
1210 default:
1211 errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
1212 }
1213 }
1214}
1215
1216func (t *http2Client) applySettings(ss []http2.Setting) {
1217 for _, s := range ss {
1218 switch s.ID {
1219 case http2.SettingMaxConcurrentStreams:
1220 // TODO(zhaoq): This is a hack to avoid significant refactoring of the
1221 // code to deal with the unrealistic int32 overflow. Probably will try
1222 // to find a better way to handle this later.
1223 if s.Val > math.MaxInt32 {
1224 s.Val = math.MaxInt32
1225 }
1226 t.mu.Lock()
1227 ms := t.maxStreams
1228 t.maxStreams = int(s.Val)
1229 t.mu.Unlock()
1230 t.streamsQuota.add(int(s.Val) - ms)
1231 case http2.SettingInitialWindowSize:
1232 t.mu.Lock()
1233 for _, stream := range t.activeStreams {
1234 // Adjust the sending quota for each stream.
1235 stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota))
1236 }
1237 t.streamSendQuota = s.Val
1238 t.mu.Unlock()
1239 atomic.AddUint32(&t.outQuotaVersion, 1)
1240 }
1241 }
1242}
1243
1244// controller running in a separate goroutine takes charge of sending control
1245// frames (e.g., window update, reset stream, setting, etc.) to the server.
1246func (t *http2Client) controller() {
1247 for {
1248 select {
1249 case i := <-t.controlBuf.get():
1250 t.controlBuf.load()
1251 select {
1252 case <-t.writableChan:
1253 switch i := i.(type) {
1254 case *windowUpdate:
1255 t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment)
1256 case *settings:
1257 if i.ack {
1258 t.framer.writeSettingsAck(true)
1259 t.applySettings(i.ss)
1260 } else {
1261 t.framer.writeSettings(true, i.ss...)
1262 }
1263 case *resetStream:
1264 // If the server needs to be to intimated about stream closing,
1265 // then we need to make sure the RST_STREAM frame is written to
1266 // the wire before the headers of the next stream waiting on
1267 // streamQuota. We ensure this by adding to the streamsQuota pool
1268 // only after having acquired the writableChan to send RST_STREAM.
1269 t.streamsQuota.add(1)
1270 t.framer.writeRSTStream(true, i.streamID, i.code)
1271 case *flushIO:
1272 t.framer.flushWrite()
1273 case *ping:
1274 if !i.ack {
1275 t.bdpEst.timesnap(i.data)
1276 }
1277 t.framer.writePing(true, i.ack, i.data)
1278 default:
1279 errorf("transport: http2Client.controller got unexpected item type %v\n", i)
1280 }
1281 t.writableChan <- 0
1282 continue
1283 case <-t.shutdownChan:
1284 return
1285 }
1286 case <-t.shutdownChan:
1287 return
1288 }
1289 }
1290}
1291
1292// keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
1293func (t *http2Client) keepalive() {
1294 p := &ping{data: [8]byte{}}
1295 timer := time.NewTimer(t.kp.Time)
1296 for {
1297 select {
1298 case <-timer.C:
1299 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
1300 timer.Reset(t.kp.Time)
1301 continue
1302 }
1303 // Check if keepalive should go dormant.
1304 t.mu.Lock()
1305 if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
1306 // Make awakenKeepalive writable.
1307 <-t.awakenKeepalive
1308 t.mu.Unlock()
1309 select {
1310 case <-t.awakenKeepalive:
1311 // If the control gets here a ping has been sent
1312 // need to reset the timer with keepalive.Timeout.
1313 case <-t.shutdownChan:
1314 return
1315 }
1316 } else {
1317 t.mu.Unlock()
1318 // Send ping.
1319 t.controlBuf.put(p)
1320 }
1321
1322 // By the time control gets here a ping has been sent one way or the other.
1323 timer.Reset(t.kp.Timeout)
1324 select {
1325 case <-timer.C:
1326 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
1327 timer.Reset(t.kp.Time)
1328 continue
1329 }
1330 t.Close()
1331 return
1332 case <-t.shutdownChan:
1333 if !timer.Stop() {
1334 <-timer.C
1335 }
1336 return
1337 }
1338 case <-t.shutdownChan:
1339 if !timer.Stop() {
1340 <-timer.C
1341 }
1342 return
1343 }
1344 }
1345}
1346
1347func (t *http2Client) Error() <-chan struct{} {
1348 return t.errorChan
1349}
1350
1351func (t *http2Client) GoAway() <-chan struct{} {
1352 return t.goAway
1353}
1354
1355func (t *http2Client) notifyError(err error) {
1356 t.mu.Lock()
1357 // make sure t.errorChan is closed only once.
1358 if t.state == draining {
1359 t.mu.Unlock()
1360 t.Close()
1361 return
1362 }
1363 if t.state == reachable {
1364 t.state = unreachable
1365 close(t.errorChan)
1366 infof("transport: http2Client.notifyError got notified that the client transport was broken %v.", err)
1367 }
1368 t.mu.Unlock()
1369}
diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go
new file mode 100644
index 0000000..b6f93e3
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/http2_server.go
@@ -0,0 +1,1195 @@
1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bytes"
23 "errors"
24 "io"
25 "math"
26 "math/rand"
27 "net"
28 "strconv"
29 "sync"
30 "sync/atomic"
31 "time"
32
33 "github.com/golang/protobuf/proto"
34 "golang.org/x/net/context"
35 "golang.org/x/net/http2"
36 "golang.org/x/net/http2/hpack"
37 "google.golang.org/grpc/codes"
38 "google.golang.org/grpc/credentials"
39 "google.golang.org/grpc/keepalive"
40 "google.golang.org/grpc/metadata"
41 "google.golang.org/grpc/peer"
42 "google.golang.org/grpc/stats"
43 "google.golang.org/grpc/status"
44 "google.golang.org/grpc/tap"
45)
46
47// ErrIllegalHeaderWrite indicates that setting header is illegal because of
48// the stream's state.
49var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called")
50
51// http2Server implements the ServerTransport interface with HTTP2.
52type http2Server struct {
53 ctx context.Context
54 conn net.Conn
55 remoteAddr net.Addr
56 localAddr net.Addr
57 maxStreamID uint32 // max stream ID ever seen
58 authInfo credentials.AuthInfo // auth info about the connection
59 inTapHandle tap.ServerInHandle
60 // writableChan synchronizes write access to the transport.
61 // A writer acquires the write lock by receiving a value on writableChan
62 // and releases it by sending on writableChan.
63 writableChan chan int
64 // shutdownChan is closed when Close is called.
65 // Blocking operations should select on shutdownChan to avoid
66 // blocking forever after Close.
67 shutdownChan chan struct{}
68 framer *framer
69 hBuf *bytes.Buffer // the buffer for HPACK encoding
70 hEnc *hpack.Encoder // HPACK encoder
71 // The max number of concurrent streams.
72 maxStreams uint32
73 // controlBuf delivers all the control related tasks (e.g., window
74 // updates, reset streams, and various settings) to the controller.
75 controlBuf *controlBuffer
76 fc *inFlow
77 // sendQuotaPool provides flow control to outbound message.
78 sendQuotaPool *quotaPool
79 stats stats.Handler
80 // Flag to keep track of reading activity on transport.
81 // 1 is true and 0 is false.
82 activity uint32 // Accessed atomically.
83 // Keepalive and max-age parameters for the server.
84 kp keepalive.ServerParameters
85
86 // Keepalive enforcement policy.
87 kep keepalive.EnforcementPolicy
88 // The time instance last ping was received.
89 lastPingAt time.Time
90 // Number of times the client has violated keepalive ping policy so far.
91 pingStrikes uint8
92 // Flag to signify that number of ping strikes should be reset to 0.
93 // This is set whenever data or header frames are sent.
94 // 1 means yes.
95 resetPingStrikes uint32 // Accessed atomically.
96 initialWindowSize int32
97 bdpEst *bdpEstimator
98
99 outQuotaVersion uint32
100
101 mu sync.Mutex // guard the following
102
103 // drainChan is initialized when drain(...) is called the first time.
104 // After which the server writes out the first GoAway(with ID 2^31-1) frame.
105 // Then an independent goroutine will be launched to later send the second GoAway.
106 // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame.
107 // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is
108 // already underway.
109 drainChan chan struct{}
110 state transportState
111 activeStreams map[uint32]*Stream
112 // the per-stream outbound flow control window size set by the peer.
113 streamSendQuota uint32
114 // idle is the time instant when the connection went idle.
115 // This is either the begining of the connection or when the number of
116 // RPCs go down to 0.
117 // When the connection is busy, this value is set to 0.
118 idle time.Time
119}
120
121// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
122// returned if something goes wrong.
123func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
124 framer := newFramer(conn)
125 // Send initial settings as connection preface to client.
126 var isettings []http2.Setting
127 // TODO(zhaoq): Have a better way to signal "no limit" because 0 is
128 // permitted in the HTTP2 spec.
129 maxStreams := config.MaxStreams
130 if maxStreams == 0 {
131 maxStreams = math.MaxUint32
132 } else {
133 isettings = append(isettings, http2.Setting{
134 ID: http2.SettingMaxConcurrentStreams,
135 Val: maxStreams,
136 })
137 }
138 dynamicWindow := true
139 iwz := int32(initialWindowSize)
140 if config.InitialWindowSize >= defaultWindowSize {
141 iwz = config.InitialWindowSize
142 dynamicWindow = false
143 }
144 icwz := int32(initialWindowSize)
145 if config.InitialConnWindowSize >= defaultWindowSize {
146 icwz = config.InitialConnWindowSize
147 dynamicWindow = false
148 }
149 if iwz != defaultWindowSize {
150 isettings = append(isettings, http2.Setting{
151 ID: http2.SettingInitialWindowSize,
152 Val: uint32(iwz)})
153 }
154 if err := framer.writeSettings(true, isettings...); err != nil {
155 return nil, connectionErrorf(true, err, "transport: %v", err)
156 }
157 // Adjust the connection flow control window if needed.
158 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
159 if err := framer.writeWindowUpdate(true, 0, delta); err != nil {
160 return nil, connectionErrorf(true, err, "transport: %v", err)
161 }
162 }
163 kp := config.KeepaliveParams
164 if kp.MaxConnectionIdle == 0 {
165 kp.MaxConnectionIdle = defaultMaxConnectionIdle
166 }
167 if kp.MaxConnectionAge == 0 {
168 kp.MaxConnectionAge = defaultMaxConnectionAge
169 }
170 // Add a jitter to MaxConnectionAge.
171 kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
172 if kp.MaxConnectionAgeGrace == 0 {
173 kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
174 }
175 if kp.Time == 0 {
176 kp.Time = defaultServerKeepaliveTime
177 }
178 if kp.Timeout == 0 {
179 kp.Timeout = defaultServerKeepaliveTimeout
180 }
181 kep := config.KeepalivePolicy
182 if kep.MinTime == 0 {
183 kep.MinTime = defaultKeepalivePolicyMinTime
184 }
185 var buf bytes.Buffer
186 t := &http2Server{
187 ctx: context.Background(),
188 conn: conn,
189 remoteAddr: conn.RemoteAddr(),
190 localAddr: conn.LocalAddr(),
191 authInfo: config.AuthInfo,
192 framer: framer,
193 hBuf: &buf,
194 hEnc: hpack.NewEncoder(&buf),
195 maxStreams: maxStreams,
196 inTapHandle: config.InTapHandle,
197 controlBuf: newControlBuffer(),
198 fc: &inFlow{limit: uint32(icwz)},
199 sendQuotaPool: newQuotaPool(defaultWindowSize),
200 state: reachable,
201 writableChan: make(chan int, 1),
202 shutdownChan: make(chan struct{}),
203 activeStreams: make(map[uint32]*Stream),
204 streamSendQuota: defaultWindowSize,
205 stats: config.StatsHandler,
206 kp: kp,
207 idle: time.Now(),
208 kep: kep,
209 initialWindowSize: iwz,
210 }
211 if dynamicWindow {
212 t.bdpEst = &bdpEstimator{
213 bdp: initialWindowSize,
214 updateFlowControl: t.updateFlowControl,
215 }
216 }
217 if t.stats != nil {
218 t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
219 RemoteAddr: t.remoteAddr,
220 LocalAddr: t.localAddr,
221 })
222 connBegin := &stats.ConnBegin{}
223 t.stats.HandleConn(t.ctx, connBegin)
224 }
225 go t.controller()
226 go t.keepalive()
227 t.writableChan <- 0
228 return t, nil
229}
230
231// operateHeader takes action on the decoded headers.
232func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
233 buf := newRecvBuffer()
234 s := &Stream{
235 id: frame.Header().StreamID,
236 st: t,
237 buf: buf,
238 fc: &inFlow{limit: uint32(t.initialWindowSize)},
239 }
240
241 var state decodeState
242 for _, hf := range frame.Fields {
243 if err := state.processHeaderField(hf); err != nil {
244 if se, ok := err.(StreamError); ok {
245 t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]})
246 }
247 return
248 }
249 }
250
251 if frame.StreamEnded() {
252 // s is just created by the caller. No lock needed.
253 s.state = streamReadDone
254 }
255 s.recvCompress = state.encoding
256 if state.timeoutSet {
257 s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout)
258 } else {
259 s.ctx, s.cancel = context.WithCancel(t.ctx)
260 }
261 pr := &peer.Peer{
262 Addr: t.remoteAddr,
263 }
264 // Attach Auth info if there is any.
265 if t.authInfo != nil {
266 pr.AuthInfo = t.authInfo
267 }
268 s.ctx = peer.NewContext(s.ctx, pr)
269 // Cache the current stream to the context so that the server application
270 // can find out. Required when the server wants to send some metadata
271 // back to the client (unary call only).
272 s.ctx = newContextWithStream(s.ctx, s)
273 // Attach the received metadata to the context.
274 if len(state.mdata) > 0 {
275 s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
276 }
277 s.trReader = &transportReader{
278 reader: &recvBufferReader{
279 ctx: s.ctx,
280 recv: s.buf,
281 },
282 windowHandler: func(n int) {
283 t.updateWindow(s, uint32(n))
284 },
285 }
286 s.recvCompress = state.encoding
287 s.method = state.method
288 if t.inTapHandle != nil {
289 var err error
290 info := &tap.Info{
291 FullMethodName: state.method,
292 }
293 s.ctx, err = t.inTapHandle(s.ctx, info)
294 if err != nil {
295 warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
296 t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
297 return
298 }
299 }
300 t.mu.Lock()
301 if t.state != reachable {
302 t.mu.Unlock()
303 return
304 }
305 if uint32(len(t.activeStreams)) >= t.maxStreams {
306 t.mu.Unlock()
307 t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
308 return
309 }
310 if s.id%2 != 1 || s.id <= t.maxStreamID {
311 t.mu.Unlock()
312 // illegal gRPC stream id.
313 errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", s.id)
314 return true
315 }
316 t.maxStreamID = s.id
317 s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
318 t.activeStreams[s.id] = s
319 if len(t.activeStreams) == 1 {
320 t.idle = time.Time{}
321 }
322 t.mu.Unlock()
323 s.requestRead = func(n int) {
324 t.adjustWindow(s, uint32(n))
325 }
326 s.ctx = traceCtx(s.ctx, s.method)
327 if t.stats != nil {
328 s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method})
329 inHeader := &stats.InHeader{
330 FullMethod: s.method,
331 RemoteAddr: t.remoteAddr,
332 LocalAddr: t.localAddr,
333 Compression: s.recvCompress,
334 WireLength: int(frame.Header().Length),
335 }
336 t.stats.HandleRPC(s.ctx, inHeader)
337 }
338 handle(s)
339 return
340}
341
342// HandleStreams receives incoming streams using the given handler. This is
343// typically run in a separate goroutine.
344// traceCtx attaches trace to ctx and returns the new context.
345func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
346 // Check the validity of client preface.
347 preface := make([]byte, len(clientPreface))
348 if _, err := io.ReadFull(t.conn, preface); err != nil {
349 // Only log if it isn't a simple tcp accept check (ie: tcp balancer doing open/close socket)
350 if err != io.EOF {
351 errorf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
352 }
353 t.Close()
354 return
355 }
356 if !bytes.Equal(preface, clientPreface) {
357 errorf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
358 t.Close()
359 return
360 }
361
362 frame, err := t.framer.readFrame()
363 if err == io.EOF || err == io.ErrUnexpectedEOF {
364 t.Close()
365 return
366 }
367 if err != nil {
368 errorf("transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
369 t.Close()
370 return
371 }
372 atomic.StoreUint32(&t.activity, 1)
373 sf, ok := frame.(*http2.SettingsFrame)
374 if !ok {
375 errorf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
376 t.Close()
377 return
378 }
379 t.handleSettings(sf)
380
381 for {
382 frame, err := t.framer.readFrame()
383 atomic.StoreUint32(&t.activity, 1)
384 if err != nil {
385 if se, ok := err.(http2.StreamError); ok {
386 t.mu.Lock()
387 s := t.activeStreams[se.StreamID]
388 t.mu.Unlock()
389 if s != nil {
390 t.closeStream(s)
391 }
392 t.controlBuf.put(&resetStream{se.StreamID, se.Code})
393 continue
394 }
395 if err == io.EOF || err == io.ErrUnexpectedEOF {
396 t.Close()
397 return
398 }
399 warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
400 t.Close()
401 return
402 }
403 switch frame := frame.(type) {
404 case *http2.MetaHeadersFrame:
405 if t.operateHeaders(frame, handle, traceCtx) {
406 t.Close()
407 break
408 }
409 case *http2.DataFrame:
410 t.handleData(frame)
411 case *http2.RSTStreamFrame:
412 t.handleRSTStream(frame)
413 case *http2.SettingsFrame:
414 t.handleSettings(frame)
415 case *http2.PingFrame:
416 t.handlePing(frame)
417 case *http2.WindowUpdateFrame:
418 t.handleWindowUpdate(frame)
419 case *http2.GoAwayFrame:
420 // TODO: Handle GoAway from the client appropriately.
421 default:
422 errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
423 }
424 }
425}
426
427func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
428 t.mu.Lock()
429 defer t.mu.Unlock()
430 if t.activeStreams == nil {
431 // The transport is closing.
432 return nil, false
433 }
434 s, ok := t.activeStreams[f.Header().StreamID]
435 if !ok {
436 // The stream is already done.
437 return nil, false
438 }
439 return s, true
440}
441
442// adjustWindow sends out extra window update over the initial window size
443// of stream if the application is requesting data larger in size than
444// the window.
445func (t *http2Server) adjustWindow(s *Stream, n uint32) {
446 s.mu.Lock()
447 defer s.mu.Unlock()
448 if s.state == streamDone {
449 return
450 }
451 if w := s.fc.maybeAdjust(n); w > 0 {
452 if cw := t.fc.resetPendingUpdate(); cw > 0 {
453 t.controlBuf.put(&windowUpdate{0, cw, false})
454 }
455 t.controlBuf.put(&windowUpdate{s.id, w, true})
456 }
457}
458
459// updateWindow adjusts the inbound quota for the stream and the transport.
460// Window updates will deliver to the controller for sending when
461// the cumulative quota exceeds the corresponding threshold.
462func (t *http2Server) updateWindow(s *Stream, n uint32) {
463 s.mu.Lock()
464 defer s.mu.Unlock()
465 if s.state == streamDone {
466 return
467 }
468 if w := s.fc.onRead(n); w > 0 {
469 if cw := t.fc.resetPendingUpdate(); cw > 0 {
470 t.controlBuf.put(&windowUpdate{0, cw, false})
471 }
472 t.controlBuf.put(&windowUpdate{s.id, w, true})
473 }
474}
475
476// updateFlowControl updates the incoming flow control windows
477// for the transport and the stream based on the current bdp
478// estimation.
479func (t *http2Server) updateFlowControl(n uint32) {
480 t.mu.Lock()
481 for _, s := range t.activeStreams {
482 s.fc.newLimit(n)
483 }
484 t.initialWindowSize = int32(n)
485 t.mu.Unlock()
486 t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false})
487 t.controlBuf.put(&settings{
488 ack: false,
489 ss: []http2.Setting{
490 {
491 ID: http2.SettingInitialWindowSize,
492 Val: uint32(n),
493 },
494 },
495 })
496
497}
498
499func (t *http2Server) handleData(f *http2.DataFrame) {
500 size := f.Header().Length
501 var sendBDPPing bool
502 if t.bdpEst != nil {
503 sendBDPPing = t.bdpEst.add(uint32(size))
504 }
505 // Decouple connection's flow control from application's read.
506 // An update on connection's flow control should not depend on
507 // whether user application has read the data or not. Such a
508 // restriction is already imposed on the stream's flow control,
509 // and therefore the sender will be blocked anyways.
510 // Decoupling the connection flow control will prevent other
511 // active(fast) streams from starving in presence of slow or
512 // inactive streams.
513 //
514 // Furthermore, if a bdpPing is being sent out we can piggyback
515 // connection's window update for the bytes we just received.
516 if sendBDPPing {
517 t.controlBuf.put(&windowUpdate{0, uint32(size), false})
518 t.controlBuf.put(bdpPing)
519 } else {
520 if err := t.fc.onData(uint32(size)); err != nil {
521 errorf("transport: http2Server %v", err)
522 t.Close()
523 return
524 }
525 if w := t.fc.onRead(uint32(size)); w > 0 {
526 t.controlBuf.put(&windowUpdate{0, w, true})
527 }
528 }
529 // Select the right stream to dispatch.
530 s, ok := t.getStream(f)
531 if !ok {
532 return
533 }
534 if size > 0 {
535 s.mu.Lock()
536 if s.state == streamDone {
537 s.mu.Unlock()
538 return
539 }
540 if err := s.fc.onData(uint32(size)); err != nil {
541 s.mu.Unlock()
542 t.closeStream(s)
543 t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
544 return
545 }
546 if f.Header().Flags.Has(http2.FlagDataPadded) {
547 if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
548 t.controlBuf.put(&windowUpdate{s.id, w, true})
549 }
550 }
551 s.mu.Unlock()
552 // TODO(bradfitz, zhaoq): A copy is required here because there is no
553 // guarantee f.Data() is consumed before the arrival of next frame.
554 // Can this copy be eliminated?
555 if len(f.Data()) > 0 {
556 data := make([]byte, len(f.Data()))
557 copy(data, f.Data())
558 s.write(recvMsg{data: data})
559 }
560 }
561 if f.Header().Flags.Has(http2.FlagDataEndStream) {
562 // Received the end of stream from the client.
563 s.mu.Lock()
564 if s.state != streamDone {
565 s.state = streamReadDone
566 }
567 s.mu.Unlock()
568 s.write(recvMsg{err: io.EOF})
569 }
570}
571
572func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
573 s, ok := t.getStream(f)
574 if !ok {
575 return
576 }
577 t.closeStream(s)
578}
579
580func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
581 if f.IsAck() {
582 return
583 }
584 var ss []http2.Setting
585 f.ForeachSetting(func(s http2.Setting) error {
586 ss = append(ss, s)
587 return nil
588 })
589 // The settings will be applied once the ack is sent.
590 t.controlBuf.put(&settings{ack: true, ss: ss})
591}
592
593const (
594 maxPingStrikes = 2
595 defaultPingTimeout = 2 * time.Hour
596)
597
598func (t *http2Server) handlePing(f *http2.PingFrame) {
599 if f.IsAck() {
600 if f.Data == goAwayPing.data && t.drainChan != nil {
601 close(t.drainChan)
602 return
603 }
604 // Maybe it's a BDP ping.
605 if t.bdpEst != nil {
606 t.bdpEst.calculate(f.Data)
607 }
608 return
609 }
610 pingAck := &ping{ack: true}
611 copy(pingAck.data[:], f.Data[:])
612 t.controlBuf.put(pingAck)
613
614 now := time.Now()
615 defer func() {
616 t.lastPingAt = now
617 }()
618 // A reset ping strikes means that we don't need to check for policy
619 // violation for this ping and the pingStrikes counter should be set
620 // to 0.
621 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
622 t.pingStrikes = 0
623 return
624 }
625 t.mu.Lock()
626 ns := len(t.activeStreams)
627 t.mu.Unlock()
628 if ns < 1 && !t.kep.PermitWithoutStream {
629 // Keepalive shouldn't be active thus, this new ping should
630 // have come after atleast defaultPingTimeout.
631 if t.lastPingAt.Add(defaultPingTimeout).After(now) {
632 t.pingStrikes++
633 }
634 } else {
635 // Check if keepalive policy is respected.
636 if t.lastPingAt.Add(t.kep.MinTime).After(now) {
637 t.pingStrikes++
638 }
639 }
640
641 if t.pingStrikes > maxPingStrikes {
642 // Send goaway and close the connection.
643 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true})
644 }
645}
646
647func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
648 id := f.Header().StreamID
649 incr := f.Increment
650 if id == 0 {
651 t.sendQuotaPool.add(int(incr))
652 return
653 }
654 if s, ok := t.getStream(f); ok {
655 s.sendQuotaPool.add(int(incr))
656 }
657}
658
659func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) error {
660 first := true
661 endHeaders := false
662 var err error
663 defer func() {
664 if err == nil {
665 // Reset ping strikes when seding headers since that might cause the
666 // peer to send ping.
667 atomic.StoreUint32(&t.resetPingStrikes, 1)
668 }
669 }()
670 // Sends the headers in a single batch.
671 for !endHeaders {
672 size := t.hBuf.Len()
673 if size > http2MaxFrameLen {
674 size = http2MaxFrameLen
675 } else {
676 endHeaders = true
677 }
678 if first {
679 p := http2.HeadersFrameParam{
680 StreamID: s.id,
681 BlockFragment: b.Next(size),
682 EndStream: endStream,
683 EndHeaders: endHeaders,
684 }
685 err = t.framer.writeHeaders(endHeaders, p)
686 first = false
687 } else {
688 err = t.framer.writeContinuation(endHeaders, s.id, endHeaders, b.Next(size))
689 }
690 if err != nil {
691 t.Close()
692 return connectionErrorf(true, err, "transport: %v", err)
693 }
694 }
695 return nil
696}
697
698// WriteHeader sends the header metedata md back to the client.
699func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
700 s.mu.Lock()
701 if s.headerOk || s.state == streamDone {
702 s.mu.Unlock()
703 return ErrIllegalHeaderWrite
704 }
705 s.headerOk = true
706 if md.Len() > 0 {
707 if s.header.Len() > 0 {
708 s.header = metadata.Join(s.header, md)
709 } else {
710 s.header = md
711 }
712 }
713 md = s.header
714 s.mu.Unlock()
715 if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
716 return err
717 }
718 t.hBuf.Reset()
719 t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
720 t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
721 if s.sendCompress != "" {
722 t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
723 }
724 for k, vv := range md {
725 if isReservedHeader(k) {
726 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
727 continue
728 }
729 for _, v := range vv {
730 t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
731 }
732 }
733 bufLen := t.hBuf.Len()
734 if err := t.writeHeaders(s, t.hBuf, false); err != nil {
735 return err
736 }
737 if t.stats != nil {
738 outHeader := &stats.OutHeader{
739 WireLength: bufLen,
740 }
741 t.stats.HandleRPC(s.Context(), outHeader)
742 }
743 t.writableChan <- 0
744 return nil
745}
746
747// WriteStatus sends stream status to the client and terminates the stream.
748// There is no further I/O operations being able to perform on this stream.
749// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
750// OK is adopted.
751func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
752 var headersSent, hasHeader bool
753 s.mu.Lock()
754 if s.state == streamDone {
755 s.mu.Unlock()
756 return nil
757 }
758 if s.headerOk {
759 headersSent = true
760 }
761 if s.header.Len() > 0 {
762 hasHeader = true
763 }
764 s.mu.Unlock()
765
766 if !headersSent && hasHeader {
767 t.WriteHeader(s, nil)
768 headersSent = true
769 }
770
771 if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
772 return err
773 }
774 t.hBuf.Reset()
775 if !headersSent {
776 t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
777 t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
778 }
779 t.hEnc.WriteField(
780 hpack.HeaderField{
781 Name: "grpc-status",
782 Value: strconv.Itoa(int(st.Code())),
783 })
784 t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
785
786 if p := st.Proto(); p != nil && len(p.Details) > 0 {
787 stBytes, err := proto.Marshal(p)
788 if err != nil {
789 // TODO: return error instead, when callers are able to handle it.
790 panic(err)
791 }
792
793 t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)})
794 }
795
796 // Attach the trailer metadata.
797 for k, vv := range s.trailer {
798 // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
799 if isReservedHeader(k) {
800 continue
801 }
802 for _, v := range vv {
803 t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
804 }
805 }
806 bufLen := t.hBuf.Len()
807 if err := t.writeHeaders(s, t.hBuf, true); err != nil {
808 t.Close()
809 return err
810 }
811 if t.stats != nil {
812 outTrailer := &stats.OutTrailer{
813 WireLength: bufLen,
814 }
815 t.stats.HandleRPC(s.Context(), outTrailer)
816 }
817 t.closeStream(s)
818 t.writableChan <- 0
819 return nil
820}
821
822// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
823// is returns if it fails (e.g., framing error, transport error).
824func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
825 // TODO(zhaoq): Support multi-writers for a single stream.
826 var writeHeaderFrame bool
827 s.mu.Lock()
828 if s.state == streamDone {
829 s.mu.Unlock()
830 return streamErrorf(codes.Unknown, "the stream has been done")
831 }
832 if !s.headerOk {
833 writeHeaderFrame = true
834 }
835 s.mu.Unlock()
836 if writeHeaderFrame {
837 t.WriteHeader(s, nil)
838 }
839 r := bytes.NewBuffer(data)
840 var (
841 p []byte
842 oqv uint32
843 )
844 for {
845 if r.Len() == 0 && p == nil {
846 return nil
847 }
848 oqv = atomic.LoadUint32(&t.outQuotaVersion)
849 size := http2MaxFrameLen
850 // Wait until the stream has some quota to send the data.
851 sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
852 if err != nil {
853 return err
854 }
855 // Wait until the transport has some quota to send the data.
856 tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
857 if err != nil {
858 return err
859 }
860 if sq < size {
861 size = sq
862 }
863 if tq < size {
864 size = tq
865 }
866 if p == nil {
867 p = r.Next(size)
868 }
869 ps := len(p)
870 if ps < sq {
871 // Overbooked stream quota. Return it back.
872 s.sendQuotaPool.add(sq - ps)
873 }
874 if ps < tq {
875 // Overbooked transport quota. Return it back.
876 t.sendQuotaPool.add(tq - ps)
877 }
878 t.framer.adjustNumWriters(1)
879 // Got some quota. Try to acquire writing privilege on the
880 // transport.
881 if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
882 if _, ok := err.(StreamError); ok {
883 // Return the connection quota back.
884 t.sendQuotaPool.add(ps)
885 }
886 if t.framer.adjustNumWriters(-1) == 0 {
887 // This writer is the last one in this batch and has the
888 // responsibility to flush the buffered frames. It queues
889 // a flush request to controlBuf instead of flushing directly
890 // in order to avoid the race with other writing or flushing.
891 t.controlBuf.put(&flushIO{})
892 }
893 return err
894 }
895 select {
896 case <-s.ctx.Done():
897 t.sendQuotaPool.add(ps)
898 if t.framer.adjustNumWriters(-1) == 0 {
899 t.controlBuf.put(&flushIO{})
900 }
901 t.writableChan <- 0
902 return ContextErr(s.ctx.Err())
903 default:
904 }
905 if oqv != atomic.LoadUint32(&t.outQuotaVersion) {
906 // InitialWindowSize settings frame must have been received after we
907 // acquired send quota but before we got the writable channel.
908 // We must forsake this write.
909 t.sendQuotaPool.add(ps)
910 s.sendQuotaPool.add(ps)
911 if t.framer.adjustNumWriters(-1) == 0 {
912 t.controlBuf.put(&flushIO{})
913 }
914 t.writableChan <- 0
915 continue
916 }
917 var forceFlush bool
918 if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last {
919 forceFlush = true
920 }
921 // Reset ping strikes when sending data since this might cause
922 // the peer to send ping.
923 atomic.StoreUint32(&t.resetPingStrikes, 1)
924 if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
925 t.Close()
926 return connectionErrorf(true, err, "transport: %v", err)
927 }
928 p = nil
929 if t.framer.adjustNumWriters(-1) == 0 {
930 t.framer.flushWrite()
931 }
932 t.writableChan <- 0
933 }
934
935}
936
937func (t *http2Server) applySettings(ss []http2.Setting) {
938 for _, s := range ss {
939 if s.ID == http2.SettingInitialWindowSize {
940 t.mu.Lock()
941 defer t.mu.Unlock()
942 for _, stream := range t.activeStreams {
943 stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota))
944 }
945 t.streamSendQuota = s.Val
946 atomic.AddUint32(&t.outQuotaVersion, 1)
947 }
948
949 }
950}
951
952// keepalive running in a separate goroutine does the following:
953// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
954// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
955// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
956// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
957// after an additional duration of keepalive.Timeout.
958func (t *http2Server) keepalive() {
959 p := &ping{}
960 var pingSent bool
961 maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
962 maxAge := time.NewTimer(t.kp.MaxConnectionAge)
963 keepalive := time.NewTimer(t.kp.Time)
964 // NOTE: All exit paths of this function should reset their
965 // respecitve timers. A failure to do so will cause the
966 // following clean-up to deadlock and eventually leak.
967 defer func() {
968 if !maxIdle.Stop() {
969 <-maxIdle.C
970 }
971 if !maxAge.Stop() {
972 <-maxAge.C
973 }
974 if !keepalive.Stop() {
975 <-keepalive.C
976 }
977 }()
978 for {
979 select {
980 case <-maxIdle.C:
981 t.mu.Lock()
982 idle := t.idle
983 if idle.IsZero() { // The connection is non-idle.
984 t.mu.Unlock()
985 maxIdle.Reset(t.kp.MaxConnectionIdle)
986 continue
987 }
988 val := t.kp.MaxConnectionIdle - time.Since(idle)
989 t.mu.Unlock()
990 if val <= 0 {
991 // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
992 // Gracefully close the connection.
993 t.drain(http2.ErrCodeNo, []byte{})
994 // Reseting the timer so that the clean-up doesn't deadlock.
995 maxIdle.Reset(infinity)
996 return
997 }
998 maxIdle.Reset(val)
999 case <-maxAge.C:
1000 t.drain(http2.ErrCodeNo, []byte{})
1001 maxAge.Reset(t.kp.MaxConnectionAgeGrace)
1002 select {
1003 case <-maxAge.C:
1004 // Close the connection after grace period.
1005 t.Close()
1006 // Reseting the timer so that the clean-up doesn't deadlock.
1007 maxAge.Reset(infinity)
1008 case <-t.shutdownChan:
1009 }
1010 return
1011 case <-keepalive.C:
1012 if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
1013 pingSent = false
1014 keepalive.Reset(t.kp.Time)
1015 continue
1016 }
1017 if pingSent {
1018 t.Close()
1019 // Reseting the timer so that the clean-up doesn't deadlock.
1020 keepalive.Reset(infinity)
1021 return
1022 }
1023 pingSent = true
1024 t.controlBuf.put(p)
1025 keepalive.Reset(t.kp.Timeout)
1026 case <-t.shutdownChan:
1027 return
1028 }
1029 }
1030}
1031
1032var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1033
1034// controller running in a separate goroutine takes charge of sending control
1035// frames (e.g., window update, reset stream, setting, etc.) to the server.
1036func (t *http2Server) controller() {
1037 for {
1038 select {
1039 case i := <-t.controlBuf.get():
1040 t.controlBuf.load()
1041 select {
1042 case <-t.writableChan:
1043 switch i := i.(type) {
1044 case *windowUpdate:
1045 t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment)
1046 case *settings:
1047 if i.ack {
1048 t.framer.writeSettingsAck(true)
1049 t.applySettings(i.ss)
1050 } else {
1051 t.framer.writeSettings(true, i.ss...)
1052 }
1053 case *resetStream:
1054 t.framer.writeRSTStream(true, i.streamID, i.code)
1055 case *goAway:
1056 t.mu.Lock()
1057 if t.state == closing {
1058 t.mu.Unlock()
1059 // The transport is closing.
1060 return
1061 }
1062 sid := t.maxStreamID
1063 if !i.headsUp {
1064 // Stop accepting more streams now.
1065 t.state = draining
1066 t.mu.Unlock()
1067 t.framer.writeGoAway(true, sid, i.code, i.debugData)
1068 if i.closeConn {
1069 // Abruptly close the connection following the GoAway.
1070 t.Close()
1071 }
1072 t.writableChan <- 0
1073 continue
1074 }
1075 t.mu.Unlock()
1076 // For a graceful close, send out a GoAway with stream ID of MaxUInt32,
1077 // Follow that with a ping and wait for the ack to come back or a timer
1078 // to expire. During this time accept new streams since they might have
1079 // originated before the GoAway reaches the client.
1080 // After getting the ack or timer expiration send out another GoAway this
1081 // time with an ID of the max stream server intends to process.
1082 t.framer.writeGoAway(true, math.MaxUint32, http2.ErrCodeNo, []byte{})
1083 t.framer.writePing(true, false, goAwayPing.data)
1084 go func() {
1085 timer := time.NewTimer(time.Minute)
1086 defer timer.Stop()
1087 select {
1088 case <-t.drainChan:
1089 case <-timer.C:
1090 case <-t.shutdownChan:
1091 return
1092 }
1093 t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData})
1094 }()
1095 case *flushIO:
1096 t.framer.flushWrite()
1097 case *ping:
1098 if !i.ack {
1099 t.bdpEst.timesnap(i.data)
1100 }
1101 t.framer.writePing(true, i.ack, i.data)
1102 default:
1103 errorf("transport: http2Server.controller got unexpected item type %v\n", i)
1104 }
1105 t.writableChan <- 0
1106 continue
1107 case <-t.shutdownChan:
1108 return
1109 }
1110 case <-t.shutdownChan:
1111 return
1112 }
1113 }
1114}
1115
1116// Close starts shutting down the http2Server transport.
1117// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
1118// could cause some resource issue. Revisit this later.
1119func (t *http2Server) Close() (err error) {
1120 t.mu.Lock()
1121 if t.state == closing {
1122 t.mu.Unlock()
1123 return errors.New("transport: Close() was already called")
1124 }
1125 t.state = closing
1126 streams := t.activeStreams
1127 t.activeStreams = nil
1128 t.mu.Unlock()
1129 close(t.shutdownChan)
1130 err = t.conn.Close()
1131 // Cancel all active streams.
1132 for _, s := range streams {
1133 s.cancel()
1134 }
1135 if t.stats != nil {
1136 connEnd := &stats.ConnEnd{}
1137 t.stats.HandleConn(t.ctx, connEnd)
1138 }
1139 return
1140}
1141
1142// closeStream clears the footprint of a stream when the stream is not needed
1143// any more.
1144func (t *http2Server) closeStream(s *Stream) {
1145 t.mu.Lock()
1146 delete(t.activeStreams, s.id)
1147 if len(t.activeStreams) == 0 {
1148 t.idle = time.Now()
1149 }
1150 if t.state == draining && len(t.activeStreams) == 0 {
1151 defer t.Close()
1152 }
1153 t.mu.Unlock()
1154 // In case stream sending and receiving are invoked in separate
1155 // goroutines (e.g., bi-directional streaming), cancel needs to be
1156 // called to interrupt the potential blocking on other goroutines.
1157 s.cancel()
1158 s.mu.Lock()
1159 if s.state == streamDone {
1160 s.mu.Unlock()
1161 return
1162 }
1163 s.state = streamDone
1164 s.mu.Unlock()
1165}
1166
1167func (t *http2Server) RemoteAddr() net.Addr {
1168 return t.remoteAddr
1169}
1170
1171func (t *http2Server) Drain() {
1172 t.drain(http2.ErrCodeNo, []byte{})
1173}
1174
1175func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
1176 t.mu.Lock()
1177 defer t.mu.Unlock()
1178 if t.drainChan != nil {
1179 return
1180 }
1181 t.drainChan = make(chan struct{})
1182 t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
1183}
1184
1185var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
1186
1187func getJitter(v time.Duration) time.Duration {
1188 if v == infinity {
1189 return 0
1190 }
1191 // Generate a jitter between +/- 10% of the value.
1192 r := int64(v / 10)
1193 j := rgen.Int63n(2*r) - r
1194 return time.Duration(j)
1195}
diff --git a/vendor/google.golang.org/grpc/transport/http_util.go b/vendor/google.golang.org/grpc/transport/http_util.go
new file mode 100644
index 0000000..685c6fb
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/http_util.go
@@ -0,0 +1,597 @@
1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "bufio"
23 "bytes"
24 "encoding/base64"
25 "fmt"
26 "io"
27 "net"
28 "net/http"
29 "strconv"
30 "strings"
31 "sync/atomic"
32 "time"
33
34 "github.com/golang/protobuf/proto"
35 "golang.org/x/net/http2"
36 "golang.org/x/net/http2/hpack"
37 spb "google.golang.org/genproto/googleapis/rpc/status"
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/status"
40)
41
42const (
43 // http2MaxFrameLen specifies the max length of a HTTP2 frame.
44 http2MaxFrameLen = 16384 // 16KB frame
45 // http://http2.github.io/http2-spec/#SettingValues
46 http2InitHeaderTableSize = 4096
47 // http2IOBufSize specifies the buffer size for sending frames.
48 http2IOBufSize = 32 * 1024
49)
50
51var (
52 clientPreface = []byte(http2.ClientPreface)
53 http2ErrConvTab = map[http2.ErrCode]codes.Code{
54 http2.ErrCodeNo: codes.Internal,
55 http2.ErrCodeProtocol: codes.Internal,
56 http2.ErrCodeInternal: codes.Internal,
57 http2.ErrCodeFlowControl: codes.ResourceExhausted,
58 http2.ErrCodeSettingsTimeout: codes.Internal,
59 http2.ErrCodeStreamClosed: codes.Internal,
60 http2.ErrCodeFrameSize: codes.Internal,
61 http2.ErrCodeRefusedStream: codes.Unavailable,
62 http2.ErrCodeCancel: codes.Canceled,
63 http2.ErrCodeCompression: codes.Internal,
64 http2.ErrCodeConnect: codes.Internal,
65 http2.ErrCodeEnhanceYourCalm: codes.ResourceExhausted,
66 http2.ErrCodeInadequateSecurity: codes.PermissionDenied,
67 http2.ErrCodeHTTP11Required: codes.FailedPrecondition,
68 }
69 statusCodeConvTab = map[codes.Code]http2.ErrCode{
70 codes.Internal: http2.ErrCodeInternal,
71 codes.Canceled: http2.ErrCodeCancel,
72 codes.Unavailable: http2.ErrCodeRefusedStream,
73 codes.ResourceExhausted: http2.ErrCodeEnhanceYourCalm,
74 codes.PermissionDenied: http2.ErrCodeInadequateSecurity,
75 }
76 httpStatusConvTab = map[int]codes.Code{
77 // 400 Bad Request - INTERNAL.
78 http.StatusBadRequest: codes.Internal,
79 // 401 Unauthorized - UNAUTHENTICATED.
80 http.StatusUnauthorized: codes.Unauthenticated,
81 // 403 Forbidden - PERMISSION_DENIED.
82 http.StatusForbidden: codes.PermissionDenied,
83 // 404 Not Found - UNIMPLEMENTED.
84 http.StatusNotFound: codes.Unimplemented,
85 // 429 Too Many Requests - UNAVAILABLE.
86 http.StatusTooManyRequests: codes.Unavailable,
87 // 502 Bad Gateway - UNAVAILABLE.
88 http.StatusBadGateway: codes.Unavailable,
89 // 503 Service Unavailable - UNAVAILABLE.
90 http.StatusServiceUnavailable: codes.Unavailable,
91 // 504 Gateway timeout - UNAVAILABLE.
92 http.StatusGatewayTimeout: codes.Unavailable,
93 }
94)
95
96// Records the states during HPACK decoding. Must be reset once the
97// decoding of the entire headers are finished.
98type decodeState struct {
99 encoding string
100 // statusGen caches the stream status received from the trailer the server
101 // sent. Client side only. Do not access directly. After all trailers are
102 // parsed, use the status method to retrieve the status.
103 statusGen *status.Status
104 // rawStatusCode and rawStatusMsg are set from the raw trailer fields and are not
105 // intended for direct access outside of parsing.
106 rawStatusCode *int
107 rawStatusMsg string
108 httpStatus *int
109 // Server side only fields.
110 timeoutSet bool
111 timeout time.Duration
112 method string
113 // key-value metadata map from the peer.
114 mdata map[string][]string
115}
116
117// isReservedHeader checks whether hdr belongs to HTTP2 headers
118// reserved by gRPC protocol. Any other headers are classified as the
119// user-specified metadata.
120func isReservedHeader(hdr string) bool {
121 if hdr != "" && hdr[0] == ':' {
122 return true
123 }
124 switch hdr {
125 case "content-type",
126 "grpc-message-type",
127 "grpc-encoding",
128 "grpc-message",
129 "grpc-status",
130 "grpc-timeout",
131 "grpc-status-details-bin",
132 "te":
133 return true
134 default:
135 return false
136 }
137}
138
139// isWhitelistedPseudoHeader checks whether hdr belongs to HTTP2 pseudoheaders
140// that should be propagated into metadata visible to users.
141func isWhitelistedPseudoHeader(hdr string) bool {
142 switch hdr {
143 case ":authority":
144 return true
145 default:
146 return false
147 }
148}
149
150func validContentType(t string) bool {
151 e := "application/grpc"
152 if !strings.HasPrefix(t, e) {
153 return false
154 }
155 // Support variations on the content-type
156 // (e.g. "application/grpc+blah", "application/grpc;blah").
157 if len(t) > len(e) && t[len(e)] != '+' && t[len(e)] != ';' {
158 return false
159 }
160 return true
161}
162
163func (d *decodeState) status() *status.Status {
164 if d.statusGen == nil {
165 // No status-details were provided; generate status using code/msg.
166 d.statusGen = status.New(codes.Code(int32(*(d.rawStatusCode))), d.rawStatusMsg)
167 }
168 return d.statusGen
169}
170
171const binHdrSuffix = "-bin"
172
173func encodeBinHeader(v []byte) string {
174 return base64.RawStdEncoding.EncodeToString(v)
175}
176
177func decodeBinHeader(v string) ([]byte, error) {
178 if len(v)%4 == 0 {
179 // Input was padded, or padding was not necessary.
180 return base64.StdEncoding.DecodeString(v)
181 }
182 return base64.RawStdEncoding.DecodeString(v)
183}
184
185func encodeMetadataHeader(k, v string) string {
186 if strings.HasSuffix(k, binHdrSuffix) {
187 return encodeBinHeader(([]byte)(v))
188 }
189 return v
190}
191
192func decodeMetadataHeader(k, v string) (string, error) {
193 if strings.HasSuffix(k, binHdrSuffix) {
194 b, err := decodeBinHeader(v)
195 return string(b), err
196 }
197 return v, nil
198}
199
200func (d *decodeState) decodeResponseHeader(frame *http2.MetaHeadersFrame) error {
201 for _, hf := range frame.Fields {
202 if err := d.processHeaderField(hf); err != nil {
203 return err
204 }
205 }
206
207 // If grpc status exists, no need to check further.
208 if d.rawStatusCode != nil || d.statusGen != nil {
209 return nil
210 }
211
212 // If grpc status doesn't exist and http status doesn't exist,
213 // then it's a malformed header.
214 if d.httpStatus == nil {
215 return streamErrorf(codes.Internal, "malformed header: doesn't contain status(gRPC or HTTP)")
216 }
217
218 if *(d.httpStatus) != http.StatusOK {
219 code, ok := httpStatusConvTab[*(d.httpStatus)]
220 if !ok {
221 code = codes.Unknown
222 }
223 return streamErrorf(code, http.StatusText(*(d.httpStatus)))
224 }
225
226 // gRPC status doesn't exist and http status is OK.
227 // Set rawStatusCode to be unknown and return nil error.
228 // So that, if the stream has ended this Unknown status
229 // will be propogated to the user.
230 // Otherwise, it will be ignored. In which case, status from
231 // a later trailer, that has StreamEnded flag set, is propogated.
232 code := int(codes.Unknown)
233 d.rawStatusCode = &code
234 return nil
235
236}
237
238func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
239 switch f.Name {
240 case "content-type":
241 if !validContentType(f.Value) {
242 return streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value)
243 }
244 case "grpc-encoding":
245 d.encoding = f.Value
246 case "grpc-status":
247 code, err := strconv.Atoi(f.Value)
248 if err != nil {
249 return streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err)
250 }
251 d.rawStatusCode = &code
252 case "grpc-message":
253 d.rawStatusMsg = decodeGrpcMessage(f.Value)
254 case "grpc-status-details-bin":
255 v, err := decodeBinHeader(f.Value)
256 if err != nil {
257 return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
258 }
259 s := &spb.Status{}
260 if err := proto.Unmarshal(v, s); err != nil {
261 return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
262 }
263 d.statusGen = status.FromProto(s)
264 case "grpc-timeout":
265 d.timeoutSet = true
266 var err error
267 if d.timeout, err = decodeTimeout(f.Value); err != nil {
268 return streamErrorf(codes.Internal, "transport: malformed time-out: %v", err)
269 }
270 case ":path":
271 d.method = f.Value
272 case ":status":
273 code, err := strconv.Atoi(f.Value)
274 if err != nil {
275 return streamErrorf(codes.Internal, "transport: malformed http-status: %v", err)
276 }
277 d.httpStatus = &code
278 default:
279 if !isReservedHeader(f.Name) || isWhitelistedPseudoHeader(f.Name) {
280 if d.mdata == nil {
281 d.mdata = make(map[string][]string)
282 }
283 v, err := decodeMetadataHeader(f.Name, f.Value)
284 if err != nil {
285 errorf("Failed to decode metadata header (%q, %q): %v", f.Name, f.Value, err)
286 return nil
287 }
288 d.mdata[f.Name] = append(d.mdata[f.Name], v)
289 }
290 }
291 return nil
292}
293
294type timeoutUnit uint8
295
296const (
297 hour timeoutUnit = 'H'
298 minute timeoutUnit = 'M'
299 second timeoutUnit = 'S'
300 millisecond timeoutUnit = 'm'
301 microsecond timeoutUnit = 'u'
302 nanosecond timeoutUnit = 'n'
303)
304
305func timeoutUnitToDuration(u timeoutUnit) (d time.Duration, ok bool) {
306 switch u {
307 case hour:
308 return time.Hour, true
309 case minute:
310 return time.Minute, true
311 case second:
312 return time.Second, true
313 case millisecond:
314 return time.Millisecond, true
315 case microsecond:
316 return time.Microsecond, true
317 case nanosecond:
318 return time.Nanosecond, true
319 default:
320 }
321 return
322}
323
324const maxTimeoutValue int64 = 100000000 - 1
325
326// div does integer division and round-up the result. Note that this is
327// equivalent to (d+r-1)/r but has less chance to overflow.
328func div(d, r time.Duration) int64 {
329 if m := d % r; m > 0 {
330 return int64(d/r + 1)
331 }
332 return int64(d / r)
333}
334
335// TODO(zhaoq): It is the simplistic and not bandwidth efficient. Improve it.
336func encodeTimeout(t time.Duration) string {
337 if t <= 0 {
338 return "0n"
339 }
340 if d := div(t, time.Nanosecond); d <= maxTimeoutValue {
341 return strconv.FormatInt(d, 10) + "n"
342 }
343 if d := div(t, time.Microsecond); d <= maxTimeoutValue {
344 return strconv.FormatInt(d, 10) + "u"
345 }
346 if d := div(t, time.Millisecond); d <= maxTimeoutValue {
347 return strconv.FormatInt(d, 10) + "m"
348 }
349 if d := div(t, time.Second); d <= maxTimeoutValue {
350 return strconv.FormatInt(d, 10) + "S"
351 }
352 if d := div(t, time.Minute); d <= maxTimeoutValue {
353 return strconv.FormatInt(d, 10) + "M"
354 }
355 // Note that maxTimeoutValue * time.Hour > MaxInt64.
356 return strconv.FormatInt(div(t, time.Hour), 10) + "H"
357}
358
359func decodeTimeout(s string) (time.Duration, error) {
360 size := len(s)
361 if size < 2 {
362 return 0, fmt.Errorf("transport: timeout string is too short: %q", s)
363 }
364 unit := timeoutUnit(s[size-1])
365 d, ok := timeoutUnitToDuration(unit)
366 if !ok {
367 return 0, fmt.Errorf("transport: timeout unit is not recognized: %q", s)
368 }
369 t, err := strconv.ParseInt(s[:size-1], 10, 64)
370 if err != nil {
371 return 0, err
372 }
373 return d * time.Duration(t), nil
374}
375
376const (
377 spaceByte = ' '
378 tildaByte = '~'
379 percentByte = '%'
380)
381
382// encodeGrpcMessage is used to encode status code in header field
383// "grpc-message".
384// It checks to see if each individual byte in msg is an
385// allowable byte, and then either percent encoding or passing it through.
386// When percent encoding, the byte is converted into hexadecimal notation
387// with a '%' prepended.
388func encodeGrpcMessage(msg string) string {
389 if msg == "" {
390 return ""
391 }
392 lenMsg := len(msg)
393 for i := 0; i < lenMsg; i++ {
394 c := msg[i]
395 if !(c >= spaceByte && c < tildaByte && c != percentByte) {
396 return encodeGrpcMessageUnchecked(msg)
397 }
398 }
399 return msg
400}
401
402func encodeGrpcMessageUnchecked(msg string) string {
403 var buf bytes.Buffer
404 lenMsg := len(msg)
405 for i := 0; i < lenMsg; i++ {
406 c := msg[i]
407 if c >= spaceByte && c < tildaByte && c != percentByte {
408 buf.WriteByte(c)
409 } else {
410 buf.WriteString(fmt.Sprintf("%%%02X", c))
411 }
412 }
413 return buf.String()
414}
415
416// decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage.
417func decodeGrpcMessage(msg string) string {
418 if msg == "" {
419 return ""
420 }
421 lenMsg := len(msg)
422 for i := 0; i < lenMsg; i++ {
423 if msg[i] == percentByte && i+2 < lenMsg {
424 return decodeGrpcMessageUnchecked(msg)
425 }
426 }
427 return msg
428}
429
430func decodeGrpcMessageUnchecked(msg string) string {
431 var buf bytes.Buffer
432 lenMsg := len(msg)
433 for i := 0; i < lenMsg; i++ {
434 c := msg[i]
435 if c == percentByte && i+2 < lenMsg {
436 parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8)
437 if err != nil {
438 buf.WriteByte(c)
439 } else {
440 buf.WriteByte(byte(parsed))
441 i += 2
442 }
443 } else {
444 buf.WriteByte(c)
445 }
446 }
447 return buf.String()
448}
449
450type framer struct {
451 numWriters int32
452 reader io.Reader
453 writer *bufio.Writer
454 fr *http2.Framer
455}
456
457func newFramer(conn net.Conn) *framer {
458 f := &framer{
459 reader: bufio.NewReaderSize(conn, http2IOBufSize),
460 writer: bufio.NewWriterSize(conn, http2IOBufSize),
461 }
462 f.fr = http2.NewFramer(f.writer, f.reader)
463 // Opt-in to Frame reuse API on framer to reduce garbage.
464 // Frames aren't safe to read from after a subsequent call to ReadFrame.
465 f.fr.SetReuseFrames()
466 f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
467 return f
468}
469
470func (f *framer) adjustNumWriters(i int32) int32 {
471 return atomic.AddInt32(&f.numWriters, i)
472}
473
474// The following writeXXX functions can only be called when the caller gets
475// unblocked from writableChan channel (i.e., owns the privilege to write).
476
477func (f *framer) writeContinuation(forceFlush bool, streamID uint32, endHeaders bool, headerBlockFragment []byte) error {
478 if err := f.fr.WriteContinuation(streamID, endHeaders, headerBlockFragment); err != nil {
479 return err
480 }
481 if forceFlush {
482 return f.writer.Flush()
483 }
484 return nil
485}
486
487func (f *framer) writeData(forceFlush bool, streamID uint32, endStream bool, data []byte) error {
488 if err := f.fr.WriteData(streamID, endStream, data); err != nil {
489 return err
490 }
491 if forceFlush {
492 return f.writer.Flush()
493 }
494 return nil
495}
496
497func (f *framer) writeGoAway(forceFlush bool, maxStreamID uint32, code http2.ErrCode, debugData []byte) error {
498 if err := f.fr.WriteGoAway(maxStreamID, code, debugData); err != nil {
499 return err
500 }
501 if forceFlush {
502 return f.writer.Flush()
503 }
504 return nil
505}
506
507func (f *framer) writeHeaders(forceFlush bool, p http2.HeadersFrameParam) error {
508 if err := f.fr.WriteHeaders(p); err != nil {
509 return err
510 }
511 if forceFlush {
512 return f.writer.Flush()
513 }
514 return nil
515}
516
517func (f *framer) writePing(forceFlush, ack bool, data [8]byte) error {
518 if err := f.fr.WritePing(ack, data); err != nil {
519 return err
520 }
521 if forceFlush {
522 return f.writer.Flush()
523 }
524 return nil
525}
526
527func (f *framer) writePriority(forceFlush bool, streamID uint32, p http2.PriorityParam) error {
528 if err := f.fr.WritePriority(streamID, p); err != nil {
529 return err
530 }
531 if forceFlush {
532 return f.writer.Flush()
533 }
534 return nil
535}
536
537func (f *framer) writePushPromise(forceFlush bool, p http2.PushPromiseParam) error {
538 if err := f.fr.WritePushPromise(p); err != nil {
539 return err
540 }
541 if forceFlush {
542 return f.writer.Flush()
543 }
544 return nil
545}
546
547func (f *framer) writeRSTStream(forceFlush bool, streamID uint32, code http2.ErrCode) error {
548 if err := f.fr.WriteRSTStream(streamID, code); err != nil {
549 return err
550 }
551 if forceFlush {
552 return f.writer.Flush()
553 }
554 return nil
555}
556
557func (f *framer) writeSettings(forceFlush bool, settings ...http2.Setting) error {
558 if err := f.fr.WriteSettings(settings...); err != nil {
559 return err
560 }
561 if forceFlush {
562 return f.writer.Flush()
563 }
564 return nil
565}
566
567func (f *framer) writeSettingsAck(forceFlush bool) error {
568 if err := f.fr.WriteSettingsAck(); err != nil {
569 return err
570 }
571 if forceFlush {
572 return f.writer.Flush()
573 }
574 return nil
575}
576
577func (f *framer) writeWindowUpdate(forceFlush bool, streamID, incr uint32) error {
578 if err := f.fr.WriteWindowUpdate(streamID, incr); err != nil {
579 return err
580 }
581 if forceFlush {
582 return f.writer.Flush()
583 }
584 return nil
585}
586
587func (f *framer) flushWrite() error {
588 return f.writer.Flush()
589}
590
591func (f *framer) readFrame() (http2.Frame, error) {
592 return f.fr.ReadFrame()
593}
594
595func (f *framer) errorDetail() error {
596 return f.fr.ErrorDetail()
597}
diff --git a/vendor/google.golang.org/grpc/transport/log.go b/vendor/google.golang.org/grpc/transport/log.go
new file mode 100644
index 0000000..ac8e358
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/log.go
@@ -0,0 +1,50 @@
1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// This file contains wrappers for grpclog functions.
20// The transport package only logs to verbose level 2 by default.
21
22package transport
23
24import "google.golang.org/grpc/grpclog"
25
26const logLevel = 2
27
28func infof(format string, args ...interface{}) {
29 if grpclog.V(logLevel) {
30 grpclog.Infof(format, args...)
31 }
32}
33
34func warningf(format string, args ...interface{}) {
35 if grpclog.V(logLevel) {
36 grpclog.Warningf(format, args...)
37 }
38}
39
40func errorf(format string, args ...interface{}) {
41 if grpclog.V(logLevel) {
42 grpclog.Errorf(format, args...)
43 }
44}
45
46func fatalf(format string, args ...interface{}) {
47 if grpclog.V(logLevel) {
48 grpclog.Fatalf(format, args...)
49 }
50}
diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go
new file mode 100644
index 0000000..ec0fe67
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/transport.go
@@ -0,0 +1,730 @@
1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19// Package transport defines and implements message oriented communication
20// channel to complete various transactions (e.g., an RPC).
21package transport // import "google.golang.org/grpc/transport"
22
23import (
24 "fmt"
25 "io"
26 "net"
27 "sync"
28
29 "golang.org/x/net/context"
30 "golang.org/x/net/http2"
31 "google.golang.org/grpc/codes"
32 "google.golang.org/grpc/credentials"
33 "google.golang.org/grpc/keepalive"
34 "google.golang.org/grpc/metadata"
35 "google.golang.org/grpc/stats"
36 "google.golang.org/grpc/status"
37 "google.golang.org/grpc/tap"
38)
39
40// recvMsg represents the received msg from the transport. All transport
41// protocol specific info has been removed.
42type recvMsg struct {
43 data []byte
44 // nil: received some data
45 // io.EOF: stream is completed. data is nil.
46 // other non-nil error: transport failure. data is nil.
47 err error
48}
49
50// recvBuffer is an unbounded channel of recvMsg structs.
51// Note recvBuffer differs from controlBuffer only in that recvBuffer
52// holds a channel of only recvMsg structs instead of objects implementing "item" interface.
53// recvBuffer is written to much more often than
54// controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put"
55type recvBuffer struct {
56 c chan recvMsg
57 mu sync.Mutex
58 backlog []recvMsg
59}
60
61func newRecvBuffer() *recvBuffer {
62 b := &recvBuffer{
63 c: make(chan recvMsg, 1),
64 }
65 return b
66}
67
68func (b *recvBuffer) put(r recvMsg) {
69 b.mu.Lock()
70 defer b.mu.Unlock()
71 if len(b.backlog) == 0 {
72 select {
73 case b.c <- r:
74 return
75 default:
76 }
77 }
78 b.backlog = append(b.backlog, r)
79}
80
81func (b *recvBuffer) load() {
82 b.mu.Lock()
83 defer b.mu.Unlock()
84 if len(b.backlog) > 0 {
85 select {
86 case b.c <- b.backlog[0]:
87 b.backlog[0] = recvMsg{}
88 b.backlog = b.backlog[1:]
89 default:
90 }
91 }
92}
93
94// get returns the channel that receives a recvMsg in the buffer.
95//
96// Upon receipt of a recvMsg, the caller should call load to send another
97// recvMsg onto the channel if there is any.
98func (b *recvBuffer) get() <-chan recvMsg {
99 return b.c
100}
101
102// recvBufferReader implements io.Reader interface to read the data from
103// recvBuffer.
104type recvBufferReader struct {
105 ctx context.Context
106 goAway chan struct{}
107 recv *recvBuffer
108 last []byte // Stores the remaining data in the previous calls.
109 err error
110}
111
112// Read reads the next len(p) bytes from last. If last is drained, it tries to
113// read additional data from recv. It blocks if there no additional data available
114// in recv. If Read returns any non-nil error, it will continue to return that error.
115func (r *recvBufferReader) Read(p []byte) (n int, err error) {
116 if r.err != nil {
117 return 0, r.err
118 }
119 n, r.err = r.read(p)
120 return n, r.err
121}
122
123func (r *recvBufferReader) read(p []byte) (n int, err error) {
124 if r.last != nil && len(r.last) > 0 {
125 // Read remaining data left in last call.
126 copied := copy(p, r.last)
127 r.last = r.last[copied:]
128 return copied, nil
129 }
130 select {
131 case <-r.ctx.Done():
132 return 0, ContextErr(r.ctx.Err())
133 case <-r.goAway:
134 return 0, ErrStreamDrain
135 case m := <-r.recv.get():
136 r.recv.load()
137 if m.err != nil {
138 return 0, m.err
139 }
140 copied := copy(p, m.data)
141 r.last = m.data[copied:]
142 return copied, nil
143 }
144}
145
146// All items in an out of a controlBuffer should be the same type.
147type item interface {
148 item()
149}
150
151// controlBuffer is an unbounded channel of item.
152type controlBuffer struct {
153 c chan item
154 mu sync.Mutex
155 backlog []item
156}
157
158func newControlBuffer() *controlBuffer {
159 b := &controlBuffer{
160 c: make(chan item, 1),
161 }
162 return b
163}
164
165func (b *controlBuffer) put(r item) {
166 b.mu.Lock()
167 defer b.mu.Unlock()
168 if len(b.backlog) == 0 {
169 select {
170 case b.c <- r:
171 return
172 default:
173 }
174 }
175 b.backlog = append(b.backlog, r)
176}
177
178func (b *controlBuffer) load() {
179 b.mu.Lock()
180 defer b.mu.Unlock()
181 if len(b.backlog) > 0 {
182 select {
183 case b.c <- b.backlog[0]:
184 b.backlog[0] = nil
185 b.backlog = b.backlog[1:]
186 default:
187 }
188 }
189}
190
191// get returns the channel that receives an item in the buffer.
192//
193// Upon receipt of an item, the caller should call load to send another
194// item onto the channel if there is any.
195func (b *controlBuffer) get() <-chan item {
196 return b.c
197}
198
199type streamState uint8
200
201const (
202 streamActive streamState = iota
203 streamWriteDone // EndStream sent
204 streamReadDone // EndStream received
205 streamDone // the entire stream is finished.
206)
207
208// Stream represents an RPC in the transport layer.
209type Stream struct {
210 id uint32
211 // nil for client side Stream.
212 st ServerTransport
213 // ctx is the associated context of the stream.
214 ctx context.Context
215 // cancel is always nil for client side Stream.
216 cancel context.CancelFunc
217 // done is closed when the final status arrives.
218 done chan struct{}
219 // goAway is closed when the server sent GoAways signal before this stream was initiated.
220 goAway chan struct{}
221 // method records the associated RPC method of the stream.
222 method string
223 recvCompress string
224 sendCompress string
225 buf *recvBuffer
226 trReader io.Reader
227 fc *inFlow
228 recvQuota uint32
229
230 // TODO: Remote this unused variable.
231 // The accumulated inbound quota pending for window update.
232 updateQuota uint32
233
234 // Callback to state application's intentions to read data. This
235 // is used to adjust flow control, if need be.
236 requestRead func(int)
237
238 sendQuotaPool *quotaPool
239 // Close headerChan to indicate the end of reception of header metadata.
240 headerChan chan struct{}
241 // header caches the received header metadata.
242 header metadata.MD
243 // The key-value map of trailer metadata.
244 trailer metadata.MD
245
246 mu sync.RWMutex // guard the following
247 // headerOK becomes true from the first header is about to send.
248 headerOk bool
249 state streamState
250 // true iff headerChan is closed. Used to avoid closing headerChan
251 // multiple times.
252 headerDone bool
253 // the status error received from the server.
254 status *status.Status
255 // rstStream indicates whether a RST_STREAM frame needs to be sent
256 // to the server to signify that this stream is closing.
257 rstStream bool
258 // rstError is the error that needs to be sent along with the RST_STREAM frame.
259 rstError http2.ErrCode
260 // bytesSent and bytesReceived indicates whether any bytes have been sent or
261 // received on this stream.
262 bytesSent bool
263 bytesReceived bool
264}
265
266// RecvCompress returns the compression algorithm applied to the inbound
267// message. It is empty string if there is no compression applied.
268func (s *Stream) RecvCompress() string {
269 return s.recvCompress
270}
271
272// SetSendCompress sets the compression algorithm to the stream.
273func (s *Stream) SetSendCompress(str string) {
274 s.sendCompress = str
275}
276
277// Done returns a chanel which is closed when it receives the final status
278// from the server.
279func (s *Stream) Done() <-chan struct{} {
280 return s.done
281}
282
283// GoAway returns a channel which is closed when the server sent GoAways signal
284// before this stream was initiated.
285func (s *Stream) GoAway() <-chan struct{} {
286 return s.goAway
287}
288
289// Header acquires the key-value pairs of header metadata once it
290// is available. It blocks until i) the metadata is ready or ii) there is no
291// header metadata or iii) the stream is canceled/expired.
292func (s *Stream) Header() (metadata.MD, error) {
293 var err error
294 select {
295 case <-s.ctx.Done():
296 err = ContextErr(s.ctx.Err())
297 case <-s.goAway:
298 err = ErrStreamDrain
299 case <-s.headerChan:
300 return s.header.Copy(), nil
301 }
302 // Even if the stream is closed, header is returned if available.
303 select {
304 case <-s.headerChan:
305 return s.header.Copy(), nil
306 default:
307 }
308 return nil, err
309}
310
311// Trailer returns the cached trailer metedata. Note that if it is not called
312// after the entire stream is done, it could return an empty MD. Client
313// side only.
314func (s *Stream) Trailer() metadata.MD {
315 s.mu.RLock()
316 defer s.mu.RUnlock()
317 return s.trailer.Copy()
318}
319
320// ServerTransport returns the underlying ServerTransport for the stream.
321// The client side stream always returns nil.
322func (s *Stream) ServerTransport() ServerTransport {
323 return s.st
324}
325
326// Context returns the context of the stream.
327func (s *Stream) Context() context.Context {
328 return s.ctx
329}
330
331// Method returns the method for the stream.
332func (s *Stream) Method() string {
333 return s.method
334}
335
336// Status returns the status received from the server.
337func (s *Stream) Status() *status.Status {
338 return s.status
339}
340
341// SetHeader sets the header metadata. This can be called multiple times.
342// Server side only.
343func (s *Stream) SetHeader(md metadata.MD) error {
344 s.mu.Lock()
345 defer s.mu.Unlock()
346 if s.headerOk || s.state == streamDone {
347 return ErrIllegalHeaderWrite
348 }
349 if md.Len() == 0 {
350 return nil
351 }
352 s.header = metadata.Join(s.header, md)
353 return nil
354}
355
356// SetTrailer sets the trailer metadata which will be sent with the RPC status
357// by the server. This can be called multiple times. Server side only.
358func (s *Stream) SetTrailer(md metadata.MD) error {
359 if md.Len() == 0 {
360 return nil
361 }
362 s.mu.Lock()
363 defer s.mu.Unlock()
364 s.trailer = metadata.Join(s.trailer, md)
365 return nil
366}
367
368func (s *Stream) write(m recvMsg) {
369 s.buf.put(m)
370}
371
372// Read reads all p bytes from the wire for this stream.
373func (s *Stream) Read(p []byte) (n int, err error) {
374 // Don't request a read if there was an error earlier
375 if er := s.trReader.(*transportReader).er; er != nil {
376 return 0, er
377 }
378 s.requestRead(len(p))
379 return io.ReadFull(s.trReader, p)
380}
381
382// tranportReader reads all the data available for this Stream from the transport and
383// passes them into the decoder, which converts them into a gRPC message stream.
384// The error is io.EOF when the stream is done or another non-nil error if
385// the stream broke.
386type transportReader struct {
387 reader io.Reader
388 // The handler to control the window update procedure for both this
389 // particular stream and the associated transport.
390 windowHandler func(int)
391 er error
392}
393
394func (t *transportReader) Read(p []byte) (n int, err error) {
395 n, err = t.reader.Read(p)
396 if err != nil {
397 t.er = err
398 return
399 }
400 t.windowHandler(n)
401 return
402}
403
404// finish sets the stream's state and status, and closes the done channel.
405// s.mu must be held by the caller. st must always be non-nil.
406func (s *Stream) finish(st *status.Status) {
407 s.status = st
408 s.state = streamDone
409 close(s.done)
410}
411
412// BytesSent indicates whether any bytes have been sent on this stream.
413func (s *Stream) BytesSent() bool {
414 s.mu.Lock()
415 defer s.mu.Unlock()
416 return s.bytesSent
417}
418
419// BytesReceived indicates whether any bytes have been received on this stream.
420func (s *Stream) BytesReceived() bool {
421 s.mu.Lock()
422 defer s.mu.Unlock()
423 return s.bytesReceived
424}
425
426// GoString is implemented by Stream so context.String() won't
427// race when printing %#v.
428func (s *Stream) GoString() string {
429 return fmt.Sprintf("<stream: %p, %v>", s, s.method)
430}
431
432// The key to save transport.Stream in the context.
433type streamKey struct{}
434
435// newContextWithStream creates a new context from ctx and attaches stream
436// to it.
437func newContextWithStream(ctx context.Context, stream *Stream) context.Context {
438 return context.WithValue(ctx, streamKey{}, stream)
439}
440
441// StreamFromContext returns the stream saved in ctx.
442func StreamFromContext(ctx context.Context) (s *Stream, ok bool) {
443 s, ok = ctx.Value(streamKey{}).(*Stream)
444 return
445}
446
447// state of transport
448type transportState int
449
450const (
451 reachable transportState = iota
452 unreachable
453 closing
454 draining
455)
456
457// ServerConfig consists of all the configurations to establish a server transport.
458type ServerConfig struct {
459 MaxStreams uint32
460 AuthInfo credentials.AuthInfo
461 InTapHandle tap.ServerInHandle
462 StatsHandler stats.Handler
463 KeepaliveParams keepalive.ServerParameters
464 KeepalivePolicy keepalive.EnforcementPolicy
465 InitialWindowSize int32
466 InitialConnWindowSize int32
467}
468
469// NewServerTransport creates a ServerTransport with conn or non-nil error
470// if it fails.
471func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
472 return newHTTP2Server(conn, config)
473}
474
475// ConnectOptions covers all relevant options for communicating with the server.
476type ConnectOptions struct {
477 // UserAgent is the application user agent.
478 UserAgent string
479 // Authority is the :authority pseudo-header to use. This field has no effect if
480 // TransportCredentials is set.
481 Authority string
482 // Dialer specifies how to dial a network address.
483 Dialer func(context.Context, string) (net.Conn, error)
484 // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
485 FailOnNonTempDialError bool
486 // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
487 PerRPCCredentials []credentials.PerRPCCredentials
488 // TransportCredentials stores the Authenticator required to setup a client connection.
489 TransportCredentials credentials.TransportCredentials
490 // KeepaliveParams stores the keepalive parameters.
491 KeepaliveParams keepalive.ClientParameters
492 // StatsHandler stores the handler for stats.
493 StatsHandler stats.Handler
494 // InitialWindowSize sets the intial window size for a stream.
495 InitialWindowSize int32
496 // InitialConnWindowSize sets the intial window size for a connection.
497 InitialConnWindowSize int32
498}
499
500// TargetInfo contains the information of the target such as network address and metadata.
501type TargetInfo struct {
502 Addr string
503 Metadata interface{}
504}
505
506// NewClientTransport establishes the transport with the required ConnectOptions
507// and returns it to the caller.
508func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions) (ClientTransport, error) {
509 return newHTTP2Client(ctx, target, opts)
510}
511
512// Options provides additional hints and information for message
513// transmission.
514type Options struct {
515 // Last indicates whether this write is the last piece for
516 // this stream.
517 Last bool
518
519 // Delay is a hint to the transport implementation for whether
520 // the data could be buffered for a batching write. The
521 // Transport implementation may ignore the hint.
522 Delay bool
523}
524
525// CallHdr carries the information of a particular RPC.
526type CallHdr struct {
527 // Host specifies the peer's host.
528 Host string
529
530 // Method specifies the operation to perform.
531 Method string
532
533 // RecvCompress specifies the compression algorithm applied on
534 // inbound messages.
535 RecvCompress string
536
537 // SendCompress specifies the compression algorithm applied on
538 // outbound message.
539 SendCompress string
540
541 // Creds specifies credentials.PerRPCCredentials for a call.
542 Creds credentials.PerRPCCredentials
543
544 // Flush indicates whether a new stream command should be sent
545 // to the peer without waiting for the first data. This is
546 // only a hint.
547 // If it's true, the transport may modify the flush decision
548 // for performance purposes.
549 // If it's false, new stream will never be flushed.
550 Flush bool
551}
552
553// ClientTransport is the common interface for all gRPC client-side transport
554// implementations.
555type ClientTransport interface {
556 // Close tears down this transport. Once it returns, the transport
557 // should not be accessed any more. The caller must make sure this
558 // is called only once.
559 Close() error
560
561 // GracefulClose starts to tear down the transport. It stops accepting
562 // new RPCs and wait the completion of the pending RPCs.
563 GracefulClose() error
564
565 // Write sends the data for the given stream. A nil stream indicates
566 // the write is to be performed on the transport as a whole.
567 Write(s *Stream, data []byte, opts *Options) error
568
569 // NewStream creates a Stream for an RPC.
570 NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
571
572 // CloseStream clears the footprint of a stream when the stream is
573 // not needed any more. The err indicates the error incurred when
574 // CloseStream is called. Must be called when a stream is finished
575 // unless the associated transport is closing.
576 CloseStream(stream *Stream, err error)
577
578 // Error returns a channel that is closed when some I/O error
579 // happens. Typically the caller should have a goroutine to monitor
580 // this in order to take action (e.g., close the current transport
581 // and create a new one) in error case. It should not return nil
582 // once the transport is initiated.
583 Error() <-chan struct{}
584
585 // GoAway returns a channel that is closed when ClientTransport
586 // receives the draining signal from the server (e.g., GOAWAY frame in
587 // HTTP/2).
588 GoAway() <-chan struct{}
589
590 // GetGoAwayReason returns the reason why GoAway frame was received.
591 GetGoAwayReason() GoAwayReason
592}
593
594// ServerTransport is the common interface for all gRPC server-side transport
595// implementations.
596//
597// Methods may be called concurrently from multiple goroutines, but
598// Write methods for a given Stream will be called serially.
599type ServerTransport interface {
600 // HandleStreams receives incoming streams using the given handler.
601 HandleStreams(func(*Stream), func(context.Context, string) context.Context)
602
603 // WriteHeader sends the header metadata for the given stream.
604 // WriteHeader may not be called on all streams.
605 WriteHeader(s *Stream, md metadata.MD) error
606
607 // Write sends the data for the given stream.
608 // Write may not be called on all streams.
609 Write(s *Stream, data []byte, opts *Options) error
610
611 // WriteStatus sends the status of a stream to the client. WriteStatus is
612 // the final call made on a stream and always occurs.
613 WriteStatus(s *Stream, st *status.Status) error
614
615 // Close tears down the transport. Once it is called, the transport
616 // should not be accessed any more. All the pending streams and their
617 // handlers will be terminated asynchronously.
618 Close() error
619
620 // RemoteAddr returns the remote network address.
621 RemoteAddr() net.Addr
622
623 // Drain notifies the client this ServerTransport stops accepting new RPCs.
624 Drain()
625}
626
627// streamErrorf creates an StreamError with the specified error code and description.
628func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
629 return StreamError{
630 Code: c,
631 Desc: fmt.Sprintf(format, a...),
632 }
633}
634
635// connectionErrorf creates an ConnectionError with the specified error description.
636func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
637 return ConnectionError{
638 Desc: fmt.Sprintf(format, a...),
639 temp: temp,
640 err: e,
641 }
642}
643
644// ConnectionError is an error that results in the termination of the
645// entire connection and the retry of all the active streams.
646type ConnectionError struct {
647 Desc string
648 temp bool
649 err error
650}
651
652func (e ConnectionError) Error() string {
653 return fmt.Sprintf("connection error: desc = %q", e.Desc)
654}
655
656// Temporary indicates if this connection error is temporary or fatal.
657func (e ConnectionError) Temporary() bool {
658 return e.temp
659}
660
661// Origin returns the original error of this connection error.
662func (e ConnectionError) Origin() error {
663 // Never return nil error here.
664 // If the original error is nil, return itself.
665 if e.err == nil {
666 return e
667 }
668 return e.err
669}
670
671var (
672 // ErrConnClosing indicates that the transport is closing.
673 ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
674 // ErrStreamDrain indicates that the stream is rejected by the server because
675 // the server stops accepting new RPCs.
676 ErrStreamDrain = streamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
677)
678
679// TODO: See if we can replace StreamError with status package errors.
680
681// StreamError is an error that only affects one stream within a connection.
682type StreamError struct {
683 Code codes.Code
684 Desc string
685}
686
687func (e StreamError) Error() string {
688 return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
689}
690
691// wait blocks until it can receive from ctx.Done, closing, or proceed.
692// If it receives from ctx.Done, it returns 0, the StreamError for ctx.Err.
693// If it receives from done, it returns 0, io.EOF if ctx is not done; otherwise
694// it return the StreamError for ctx.Err.
695// If it receives from goAway, it returns 0, ErrStreamDrain.
696// If it receives from closing, it returns 0, ErrConnClosing.
697// If it receives from proceed, it returns the received integer, nil.
698func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-chan int) (int, error) {
699 select {
700 case <-ctx.Done():
701 return 0, ContextErr(ctx.Err())
702 case <-done:
703 // User cancellation has precedence.
704 select {
705 case <-ctx.Done():
706 return 0, ContextErr(ctx.Err())
707 default:
708 }
709 return 0, io.EOF
710 case <-goAway:
711 return 0, ErrStreamDrain
712 case <-closing:
713 return 0, ErrConnClosing
714 case i := <-proceed:
715 return i, nil
716 }
717}
718
719// GoAwayReason contains the reason for the GoAway frame received.
720type GoAwayReason uint8
721
722const (
723 // Invalid indicates that no GoAway frame is received.
724 Invalid GoAwayReason = 0
725 // NoReason is the default value when GoAway frame is received.
726 NoReason GoAwayReason = 1
727 // TooManyPings indicates that a GoAway frame with ErrCodeEnhanceYourCalm
728 // was recieved and that the debug data said "too_many_pings".
729 TooManyPings GoAwayReason = 2
730)