diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/transport')
-rw-r--r-- | vendor/google.golang.org/grpc/transport/bdp_estimator.go | 143 | ||||
-rw-r--r-- | vendor/google.golang.org/grpc/transport/control.go | 246 | ||||
-rw-r--r-- | vendor/google.golang.org/grpc/transport/go16.go | 45 | ||||
-rw-r--r-- | vendor/google.golang.org/grpc/transport/go17.go | 46 | ||||
-rw-r--r-- | vendor/google.golang.org/grpc/transport/handler_server.go | 393 | ||||
-rw-r--r-- | vendor/google.golang.org/grpc/transport/http2_client.go | 1369 | ||||
-rw-r--r-- | vendor/google.golang.org/grpc/transport/http2_server.go | 1195 | ||||
-rw-r--r-- | vendor/google.golang.org/grpc/transport/http_util.go | 597 | ||||
-rw-r--r-- | vendor/google.golang.org/grpc/transport/log.go | 50 | ||||
-rw-r--r-- | vendor/google.golang.org/grpc/transport/transport.go | 730 |
10 files changed, 4814 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/transport/bdp_estimator.go b/vendor/google.golang.org/grpc/transport/bdp_estimator.go new file mode 100644 index 0000000..667edb8 --- /dev/null +++ b/vendor/google.golang.org/grpc/transport/bdp_estimator.go | |||
@@ -0,0 +1,143 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2017 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | package transport | ||
20 | |||
21 | import ( | ||
22 | "sync" | ||
23 | "time" | ||
24 | ) | ||
25 | |||
26 | const ( | ||
27 | // bdpLimit is the maximum value the flow control windows | ||
28 | // will be increased to. | ||
29 | bdpLimit = (1 << 20) * 4 | ||
30 | // alpha is a constant factor used to keep a moving average | ||
31 | // of RTTs. | ||
32 | alpha = 0.9 | ||
33 | // If the current bdp sample is greater than or equal to | ||
34 | // our beta * our estimated bdp and the current bandwidth | ||
35 | // sample is the maximum bandwidth observed so far, we | ||
36 | // increase our bbp estimate by a factor of gamma. | ||
37 | beta = 0.66 | ||
38 | // To put our bdp to be smaller than or equal to twice the real BDP, | ||
39 | // we should multiply our current sample with 4/3, however to round things out | ||
40 | // we use 2 as the multiplication factor. | ||
41 | gamma = 2 | ||
42 | ) | ||
43 | |||
44 | var ( | ||
45 | // Adding arbitrary data to ping so that its ack can be | ||
46 | // identified. | ||
47 | // Easter-egg: what does the ping message say? | ||
48 | bdpPing = &ping{data: [8]byte{2, 4, 16, 16, 9, 14, 7, 7}} | ||
49 | ) | ||
50 | |||
51 | type bdpEstimator struct { | ||
52 | // sentAt is the time when the ping was sent. | ||
53 | sentAt time.Time | ||
54 | |||
55 | mu sync.Mutex | ||
56 | // bdp is the current bdp estimate. | ||
57 | bdp uint32 | ||
58 | // sample is the number of bytes received in one measurement cycle. | ||
59 | sample uint32 | ||
60 | // bwMax is the maximum bandwidth noted so far (bytes/sec). | ||
61 | bwMax float64 | ||
62 | // bool to keep track of the begining of a new measurement cycle. | ||
63 | isSent bool | ||
64 | // Callback to update the window sizes. | ||
65 | updateFlowControl func(n uint32) | ||
66 | // sampleCount is the number of samples taken so far. | ||
67 | sampleCount uint64 | ||
68 | // round trip time (seconds) | ||
69 | rtt float64 | ||
70 | } | ||
71 | |||
72 | // timesnap registers the time bdp ping was sent out so that | ||
73 | // network rtt can be calculated when its ack is recieved. | ||
74 | // It is called (by controller) when the bdpPing is | ||
75 | // being written on the wire. | ||
76 | func (b *bdpEstimator) timesnap(d [8]byte) { | ||
77 | if bdpPing.data != d { | ||
78 | return | ||
79 | } | ||
80 | b.sentAt = time.Now() | ||
81 | } | ||
82 | |||
83 | // add adds bytes to the current sample for calculating bdp. | ||
84 | // It returns true only if a ping must be sent. This can be used | ||
85 | // by the caller (handleData) to make decision about batching | ||
86 | // a window update with it. | ||
87 | func (b *bdpEstimator) add(n uint32) bool { | ||
88 | b.mu.Lock() | ||
89 | defer b.mu.Unlock() | ||
90 | if b.bdp == bdpLimit { | ||
91 | return false | ||
92 | } | ||
93 | if !b.isSent { | ||
94 | b.isSent = true | ||
95 | b.sample = n | ||
96 | b.sentAt = time.Time{} | ||
97 | b.sampleCount++ | ||
98 | return true | ||
99 | } | ||
100 | b.sample += n | ||
101 | return false | ||
102 | } | ||
103 | |||
104 | // calculate is called when an ack for a bdp ping is received. | ||
105 | // Here we calculate the current bdp and bandwidth sample and | ||
106 | // decide if the flow control windows should go up. | ||
107 | func (b *bdpEstimator) calculate(d [8]byte) { | ||
108 | // Check if the ping acked for was the bdp ping. | ||
109 | if bdpPing.data != d { | ||
110 | return | ||
111 | } | ||
112 | b.mu.Lock() | ||
113 | rttSample := time.Since(b.sentAt).Seconds() | ||
114 | if b.sampleCount < 10 { | ||
115 | // Bootstrap rtt with an average of first 10 rtt samples. | ||
116 | b.rtt += (rttSample - b.rtt) / float64(b.sampleCount) | ||
117 | } else { | ||
118 | // Heed to the recent past more. | ||
119 | b.rtt += (rttSample - b.rtt) * float64(alpha) | ||
120 | } | ||
121 | b.isSent = false | ||
122 | // The number of bytes accumalated so far in the sample is smaller | ||
123 | // than or equal to 1.5 times the real BDP on a saturated connection. | ||
124 | bwCurrent := float64(b.sample) / (b.rtt * float64(1.5)) | ||
125 | if bwCurrent > b.bwMax { | ||
126 | b.bwMax = bwCurrent | ||
127 | } | ||
128 | // If the current sample (which is smaller than or equal to the 1.5 times the real BDP) is | ||
129 | // greater than or equal to 2/3rd our perceived bdp AND this is the maximum bandwidth seen so far, we | ||
130 | // should update our perception of the network BDP. | ||
131 | if float64(b.sample) >= beta*float64(b.bdp) && bwCurrent == b.bwMax && b.bdp != bdpLimit { | ||
132 | sampleFloat := float64(b.sample) | ||
133 | b.bdp = uint32(gamma * sampleFloat) | ||
134 | if b.bdp > bdpLimit { | ||
135 | b.bdp = bdpLimit | ||
136 | } | ||
137 | bdp := b.bdp | ||
138 | b.mu.Unlock() | ||
139 | b.updateFlowControl(bdp) | ||
140 | return | ||
141 | } | ||
142 | b.mu.Unlock() | ||
143 | } | ||
diff --git a/vendor/google.golang.org/grpc/transport/control.go b/vendor/google.golang.org/grpc/transport/control.go new file mode 100644 index 0000000..501eb03 --- /dev/null +++ b/vendor/google.golang.org/grpc/transport/control.go | |||
@@ -0,0 +1,246 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2014 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | package transport | ||
20 | |||
21 | import ( | ||
22 | "fmt" | ||
23 | "math" | ||
24 | "sync" | ||
25 | "time" | ||
26 | |||
27 | "golang.org/x/net/http2" | ||
28 | ) | ||
29 | |||
30 | const ( | ||
31 | // The default value of flow control window size in HTTP2 spec. | ||
32 | defaultWindowSize = 65535 | ||
33 | // The initial window size for flow control. | ||
34 | initialWindowSize = defaultWindowSize // for an RPC | ||
35 | infinity = time.Duration(math.MaxInt64) | ||
36 | defaultClientKeepaliveTime = infinity | ||
37 | defaultClientKeepaliveTimeout = time.Duration(20 * time.Second) | ||
38 | defaultMaxStreamsClient = 100 | ||
39 | defaultMaxConnectionIdle = infinity | ||
40 | defaultMaxConnectionAge = infinity | ||
41 | defaultMaxConnectionAgeGrace = infinity | ||
42 | defaultServerKeepaliveTime = time.Duration(2 * time.Hour) | ||
43 | defaultServerKeepaliveTimeout = time.Duration(20 * time.Second) | ||
44 | defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute) | ||
45 | // max window limit set by HTTP2 Specs. | ||
46 | maxWindowSize = math.MaxInt32 | ||
47 | ) | ||
48 | |||
49 | // The following defines various control items which could flow through | ||
50 | // the control buffer of transport. They represent different aspects of | ||
51 | // control tasks, e.g., flow control, settings, streaming resetting, etc. | ||
52 | type windowUpdate struct { | ||
53 | streamID uint32 | ||
54 | increment uint32 | ||
55 | flush bool | ||
56 | } | ||
57 | |||
58 | func (*windowUpdate) item() {} | ||
59 | |||
60 | type settings struct { | ||
61 | ack bool | ||
62 | ss []http2.Setting | ||
63 | } | ||
64 | |||
65 | func (*settings) item() {} | ||
66 | |||
67 | type resetStream struct { | ||
68 | streamID uint32 | ||
69 | code http2.ErrCode | ||
70 | } | ||
71 | |||
72 | func (*resetStream) item() {} | ||
73 | |||
74 | type goAway struct { | ||
75 | code http2.ErrCode | ||
76 | debugData []byte | ||
77 | headsUp bool | ||
78 | closeConn bool | ||
79 | } | ||
80 | |||
81 | func (*goAway) item() {} | ||
82 | |||
83 | type flushIO struct { | ||
84 | } | ||
85 | |||
86 | func (*flushIO) item() {} | ||
87 | |||
88 | type ping struct { | ||
89 | ack bool | ||
90 | data [8]byte | ||
91 | } | ||
92 | |||
93 | func (*ping) item() {} | ||
94 | |||
95 | // quotaPool is a pool which accumulates the quota and sends it to acquire() | ||
96 | // when it is available. | ||
97 | type quotaPool struct { | ||
98 | c chan int | ||
99 | |||
100 | mu sync.Mutex | ||
101 | quota int | ||
102 | } | ||
103 | |||
104 | // newQuotaPool creates a quotaPool which has quota q available to consume. | ||
105 | func newQuotaPool(q int) *quotaPool { | ||
106 | qb := "aPool{ | ||
107 | c: make(chan int, 1), | ||
108 | } | ||
109 | if q > 0 { | ||
110 | qb.c <- q | ||
111 | } else { | ||
112 | qb.quota = q | ||
113 | } | ||
114 | return qb | ||
115 | } | ||
116 | |||
117 | // add cancels the pending quota sent on acquired, incremented by v and sends | ||
118 | // it back on acquire. | ||
119 | func (qb *quotaPool) add(v int) { | ||
120 | qb.mu.Lock() | ||
121 | defer qb.mu.Unlock() | ||
122 | select { | ||
123 | case n := <-qb.c: | ||
124 | qb.quota += n | ||
125 | default: | ||
126 | } | ||
127 | qb.quota += v | ||
128 | if qb.quota <= 0 { | ||
129 | return | ||
130 | } | ||
131 | // After the pool has been created, this is the only place that sends on | ||
132 | // the channel. Since mu is held at this point and any quota that was sent | ||
133 | // on the channel has been retrieved, we know that this code will always | ||
134 | // place any positive quota value on the channel. | ||
135 | select { | ||
136 | case qb.c <- qb.quota: | ||
137 | qb.quota = 0 | ||
138 | default: | ||
139 | } | ||
140 | } | ||
141 | |||
142 | // acquire returns the channel on which available quota amounts are sent. | ||
143 | func (qb *quotaPool) acquire() <-chan int { | ||
144 | return qb.c | ||
145 | } | ||
146 | |||
147 | // inFlow deals with inbound flow control | ||
148 | type inFlow struct { | ||
149 | mu sync.Mutex | ||
150 | // The inbound flow control limit for pending data. | ||
151 | limit uint32 | ||
152 | // pendingData is the overall data which have been received but not been | ||
153 | // consumed by applications. | ||
154 | pendingData uint32 | ||
155 | // The amount of data the application has consumed but grpc has not sent | ||
156 | // window update for them. Used to reduce window update frequency. | ||
157 | pendingUpdate uint32 | ||
158 | // delta is the extra window update given by receiver when an application | ||
159 | // is reading data bigger in size than the inFlow limit. | ||
160 | delta uint32 | ||
161 | } | ||
162 | |||
163 | // newLimit updates the inflow window to a new value n. | ||
164 | // It assumes that n is always greater than the old limit. | ||
165 | func (f *inFlow) newLimit(n uint32) uint32 { | ||
166 | f.mu.Lock() | ||
167 | defer f.mu.Unlock() | ||
168 | d := n - f.limit | ||
169 | f.limit = n | ||
170 | return d | ||
171 | } | ||
172 | |||
173 | func (f *inFlow) maybeAdjust(n uint32) uint32 { | ||
174 | if n > uint32(math.MaxInt32) { | ||
175 | n = uint32(math.MaxInt32) | ||
176 | } | ||
177 | f.mu.Lock() | ||
178 | defer f.mu.Unlock() | ||
179 | // estSenderQuota is the receiver's view of the maximum number of bytes the sender | ||
180 | // can send without a window update. | ||
181 | estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate)) | ||
182 | // estUntransmittedData is the maximum number of bytes the sends might not have put | ||
183 | // on the wire yet. A value of 0 or less means that we have already received all or | ||
184 | // more bytes than the application is requesting to read. | ||
185 | estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative. | ||
186 | // This implies that unless we send a window update, the sender won't be able to send all the bytes | ||
187 | // for this message. Therefore we must send an update over the limit since there's an active read | ||
188 | // request from the application. | ||
189 | if estUntransmittedData > estSenderQuota { | ||
190 | // Sender's window shouldn't go more than 2^31 - 1 as speecified in the HTTP spec. | ||
191 | if f.limit+n > maxWindowSize { | ||
192 | f.delta = maxWindowSize - f.limit | ||
193 | } else { | ||
194 | // Send a window update for the whole message and not just the difference between | ||
195 | // estUntransmittedData and estSenderQuota. This will be helpful in case the message | ||
196 | // is padded; We will fallback on the current available window(at least a 1/4th of the limit). | ||
197 | f.delta = n | ||
198 | } | ||
199 | return f.delta | ||
200 | } | ||
201 | return 0 | ||
202 | } | ||
203 | |||
204 | // onData is invoked when some data frame is received. It updates pendingData. | ||
205 | func (f *inFlow) onData(n uint32) error { | ||
206 | f.mu.Lock() | ||
207 | defer f.mu.Unlock() | ||
208 | f.pendingData += n | ||
209 | if f.pendingData+f.pendingUpdate > f.limit+f.delta { | ||
210 | return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit) | ||
211 | } | ||
212 | return nil | ||
213 | } | ||
214 | |||
215 | // onRead is invoked when the application reads the data. It returns the window size | ||
216 | // to be sent to the peer. | ||
217 | func (f *inFlow) onRead(n uint32) uint32 { | ||
218 | f.mu.Lock() | ||
219 | defer f.mu.Unlock() | ||
220 | if f.pendingData == 0 { | ||
221 | return 0 | ||
222 | } | ||
223 | f.pendingData -= n | ||
224 | if n > f.delta { | ||
225 | n -= f.delta | ||
226 | f.delta = 0 | ||
227 | } else { | ||
228 | f.delta -= n | ||
229 | n = 0 | ||
230 | } | ||
231 | f.pendingUpdate += n | ||
232 | if f.pendingUpdate >= f.limit/4 { | ||
233 | wu := f.pendingUpdate | ||
234 | f.pendingUpdate = 0 | ||
235 | return wu | ||
236 | } | ||
237 | return 0 | ||
238 | } | ||
239 | |||
240 | func (f *inFlow) resetPendingUpdate() uint32 { | ||
241 | f.mu.Lock() | ||
242 | defer f.mu.Unlock() | ||
243 | n := f.pendingUpdate | ||
244 | f.pendingUpdate = 0 | ||
245 | return n | ||
246 | } | ||
diff --git a/vendor/google.golang.org/grpc/transport/go16.go b/vendor/google.golang.org/grpc/transport/go16.go new file mode 100644 index 0000000..7cffee1 --- /dev/null +++ b/vendor/google.golang.org/grpc/transport/go16.go | |||
@@ -0,0 +1,45 @@ | |||
1 | // +build go1.6,!go1.7 | ||
2 | |||
3 | /* | ||
4 | * | ||
5 | * Copyright 2016 gRPC authors. | ||
6 | * | ||
7 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
8 | * you may not use this file except in compliance with the License. | ||
9 | * You may obtain a copy of the License at | ||
10 | * | ||
11 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
12 | * | ||
13 | * Unless required by applicable law or agreed to in writing, software | ||
14 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
15 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
16 | * See the License for the specific language governing permissions and | ||
17 | * limitations under the License. | ||
18 | * | ||
19 | */ | ||
20 | |||
21 | package transport | ||
22 | |||
23 | import ( | ||
24 | "net" | ||
25 | |||
26 | "google.golang.org/grpc/codes" | ||
27 | |||
28 | "golang.org/x/net/context" | ||
29 | ) | ||
30 | |||
31 | // dialContext connects to the address on the named network. | ||
32 | func dialContext(ctx context.Context, network, address string) (net.Conn, error) { | ||
33 | return (&net.Dialer{Cancel: ctx.Done()}).Dial(network, address) | ||
34 | } | ||
35 | |||
36 | // ContextErr converts the error from context package into a StreamError. | ||
37 | func ContextErr(err error) StreamError { | ||
38 | switch err { | ||
39 | case context.DeadlineExceeded: | ||
40 | return streamErrorf(codes.DeadlineExceeded, "%v", err) | ||
41 | case context.Canceled: | ||
42 | return streamErrorf(codes.Canceled, "%v", err) | ||
43 | } | ||
44 | return streamErrorf(codes.Internal, "Unexpected error from context packet: %v", err) | ||
45 | } | ||
diff --git a/vendor/google.golang.org/grpc/transport/go17.go b/vendor/google.golang.org/grpc/transport/go17.go new file mode 100644 index 0000000..2464e69 --- /dev/null +++ b/vendor/google.golang.org/grpc/transport/go17.go | |||
@@ -0,0 +1,46 @@ | |||
1 | // +build go1.7 | ||
2 | |||
3 | /* | ||
4 | * | ||
5 | * Copyright 2016 gRPC authors. | ||
6 | * | ||
7 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
8 | * you may not use this file except in compliance with the License. | ||
9 | * You may obtain a copy of the License at | ||
10 | * | ||
11 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
12 | * | ||
13 | * Unless required by applicable law or agreed to in writing, software | ||
14 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
15 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
16 | * See the License for the specific language governing permissions and | ||
17 | * limitations under the License. | ||
18 | * | ||
19 | */ | ||
20 | |||
21 | package transport | ||
22 | |||
23 | import ( | ||
24 | "context" | ||
25 | "net" | ||
26 | |||
27 | "google.golang.org/grpc/codes" | ||
28 | |||
29 | netctx "golang.org/x/net/context" | ||
30 | ) | ||
31 | |||
32 | // dialContext connects to the address on the named network. | ||
33 | func dialContext(ctx context.Context, network, address string) (net.Conn, error) { | ||
34 | return (&net.Dialer{}).DialContext(ctx, network, address) | ||
35 | } | ||
36 | |||
37 | // ContextErr converts the error from context package into a StreamError. | ||
38 | func ContextErr(err error) StreamError { | ||
39 | switch err { | ||
40 | case context.DeadlineExceeded, netctx.DeadlineExceeded: | ||
41 | return streamErrorf(codes.DeadlineExceeded, "%v", err) | ||
42 | case context.Canceled, netctx.Canceled: | ||
43 | return streamErrorf(codes.Canceled, "%v", err) | ||
44 | } | ||
45 | return streamErrorf(codes.Internal, "Unexpected error from context packet: %v", err) | ||
46 | } | ||
diff --git a/vendor/google.golang.org/grpc/transport/handler_server.go b/vendor/google.golang.org/grpc/transport/handler_server.go new file mode 100644 index 0000000..27372b5 --- /dev/null +++ b/vendor/google.golang.org/grpc/transport/handler_server.go | |||
@@ -0,0 +1,393 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2016 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | // This file is the implementation of a gRPC server using HTTP/2 which | ||
20 | // uses the standard Go http2 Server implementation (via the | ||
21 | // http.Handler interface), rather than speaking low-level HTTP/2 | ||
22 | // frames itself. It is the implementation of *grpc.Server.ServeHTTP. | ||
23 | |||
24 | package transport | ||
25 | |||
26 | import ( | ||
27 | "errors" | ||
28 | "fmt" | ||
29 | "io" | ||
30 | "net" | ||
31 | "net/http" | ||
32 | "strings" | ||
33 | "sync" | ||
34 | "time" | ||
35 | |||
36 | "golang.org/x/net/context" | ||
37 | "golang.org/x/net/http2" | ||
38 | "google.golang.org/grpc/codes" | ||
39 | "google.golang.org/grpc/credentials" | ||
40 | "google.golang.org/grpc/metadata" | ||
41 | "google.golang.org/grpc/peer" | ||
42 | "google.golang.org/grpc/status" | ||
43 | ) | ||
44 | |||
45 | // NewServerHandlerTransport returns a ServerTransport handling gRPC | ||
46 | // from inside an http.Handler. It requires that the http Server | ||
47 | // supports HTTP/2. | ||
48 | func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTransport, error) { | ||
49 | if r.ProtoMajor != 2 { | ||
50 | return nil, errors.New("gRPC requires HTTP/2") | ||
51 | } | ||
52 | if r.Method != "POST" { | ||
53 | return nil, errors.New("invalid gRPC request method") | ||
54 | } | ||
55 | if !validContentType(r.Header.Get("Content-Type")) { | ||
56 | return nil, errors.New("invalid gRPC request content-type") | ||
57 | } | ||
58 | if _, ok := w.(http.Flusher); !ok { | ||
59 | return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher") | ||
60 | } | ||
61 | if _, ok := w.(http.CloseNotifier); !ok { | ||
62 | return nil, errors.New("gRPC requires a ResponseWriter supporting http.CloseNotifier") | ||
63 | } | ||
64 | |||
65 | st := &serverHandlerTransport{ | ||
66 | rw: w, | ||
67 | req: r, | ||
68 | closedCh: make(chan struct{}), | ||
69 | writes: make(chan func()), | ||
70 | } | ||
71 | |||
72 | if v := r.Header.Get("grpc-timeout"); v != "" { | ||
73 | to, err := decodeTimeout(v) | ||
74 | if err != nil { | ||
75 | return nil, streamErrorf(codes.Internal, "malformed time-out: %v", err) | ||
76 | } | ||
77 | st.timeoutSet = true | ||
78 | st.timeout = to | ||
79 | } | ||
80 | |||
81 | var metakv []string | ||
82 | if r.Host != "" { | ||
83 | metakv = append(metakv, ":authority", r.Host) | ||
84 | } | ||
85 | for k, vv := range r.Header { | ||
86 | k = strings.ToLower(k) | ||
87 | if isReservedHeader(k) && !isWhitelistedPseudoHeader(k) { | ||
88 | continue | ||
89 | } | ||
90 | for _, v := range vv { | ||
91 | v, err := decodeMetadataHeader(k, v) | ||
92 | if err != nil { | ||
93 | return nil, streamErrorf(codes.InvalidArgument, "malformed binary metadata: %v", err) | ||
94 | } | ||
95 | metakv = append(metakv, k, v) | ||
96 | } | ||
97 | } | ||
98 | st.headerMD = metadata.Pairs(metakv...) | ||
99 | |||
100 | return st, nil | ||
101 | } | ||
102 | |||
103 | // serverHandlerTransport is an implementation of ServerTransport | ||
104 | // which replies to exactly one gRPC request (exactly one HTTP request), | ||
105 | // using the net/http.Handler interface. This http.Handler is guaranteed | ||
106 | // at this point to be speaking over HTTP/2, so it's able to speak valid | ||
107 | // gRPC. | ||
108 | type serverHandlerTransport struct { | ||
109 | rw http.ResponseWriter | ||
110 | req *http.Request | ||
111 | timeoutSet bool | ||
112 | timeout time.Duration | ||
113 | didCommonHeaders bool | ||
114 | |||
115 | headerMD metadata.MD | ||
116 | |||
117 | closeOnce sync.Once | ||
118 | closedCh chan struct{} // closed on Close | ||
119 | |||
120 | // writes is a channel of code to run serialized in the | ||
121 | // ServeHTTP (HandleStreams) goroutine. The channel is closed | ||
122 | // when WriteStatus is called. | ||
123 | writes chan func() | ||
124 | } | ||
125 | |||
126 | func (ht *serverHandlerTransport) Close() error { | ||
127 | ht.closeOnce.Do(ht.closeCloseChanOnce) | ||
128 | return nil | ||
129 | } | ||
130 | |||
131 | func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) } | ||
132 | |||
133 | func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) } | ||
134 | |||
135 | // strAddr is a net.Addr backed by either a TCP "ip:port" string, or | ||
136 | // the empty string if unknown. | ||
137 | type strAddr string | ||
138 | |||
139 | func (a strAddr) Network() string { | ||
140 | if a != "" { | ||
141 | // Per the documentation on net/http.Request.RemoteAddr, if this is | ||
142 | // set, it's set to the IP:port of the peer (hence, TCP): | ||
143 | // https://golang.org/pkg/net/http/#Request | ||
144 | // | ||
145 | // If we want to support Unix sockets later, we can | ||
146 | // add our own grpc-specific convention within the | ||
147 | // grpc codebase to set RemoteAddr to a different | ||
148 | // format, or probably better: we can attach it to the | ||
149 | // context and use that from serverHandlerTransport.RemoteAddr. | ||
150 | return "tcp" | ||
151 | } | ||
152 | return "" | ||
153 | } | ||
154 | |||
155 | func (a strAddr) String() string { return string(a) } | ||
156 | |||
157 | // do runs fn in the ServeHTTP goroutine. | ||
158 | func (ht *serverHandlerTransport) do(fn func()) error { | ||
159 | // Avoid a panic writing to closed channel. Imperfect but maybe good enough. | ||
160 | select { | ||
161 | case <-ht.closedCh: | ||
162 | return ErrConnClosing | ||
163 | default: | ||
164 | select { | ||
165 | case ht.writes <- fn: | ||
166 | return nil | ||
167 | case <-ht.closedCh: | ||
168 | return ErrConnClosing | ||
169 | } | ||
170 | |||
171 | } | ||
172 | } | ||
173 | |||
174 | func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error { | ||
175 | err := ht.do(func() { | ||
176 | ht.writeCommonHeaders(s) | ||
177 | |||
178 | // And flush, in case no header or body has been sent yet. | ||
179 | // This forces a separation of headers and trailers if this is the | ||
180 | // first call (for example, in end2end tests's TestNoService). | ||
181 | ht.rw.(http.Flusher).Flush() | ||
182 | |||
183 | h := ht.rw.Header() | ||
184 | h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code())) | ||
185 | if m := st.Message(); m != "" { | ||
186 | h.Set("Grpc-Message", encodeGrpcMessage(m)) | ||
187 | } | ||
188 | |||
189 | // TODO: Support Grpc-Status-Details-Bin | ||
190 | |||
191 | if md := s.Trailer(); len(md) > 0 { | ||
192 | for k, vv := range md { | ||
193 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. | ||
194 | if isReservedHeader(k) { | ||
195 | continue | ||
196 | } | ||
197 | for _, v := range vv { | ||
198 | // http2 ResponseWriter mechanism to send undeclared Trailers after | ||
199 | // the headers have possibly been written. | ||
200 | h.Add(http2.TrailerPrefix+k, encodeMetadataHeader(k, v)) | ||
201 | } | ||
202 | } | ||
203 | } | ||
204 | }) | ||
205 | close(ht.writes) | ||
206 | return err | ||
207 | } | ||
208 | |||
209 | // writeCommonHeaders sets common headers on the first write | ||
210 | // call (Write, WriteHeader, or WriteStatus). | ||
211 | func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) { | ||
212 | if ht.didCommonHeaders { | ||
213 | return | ||
214 | } | ||
215 | ht.didCommonHeaders = true | ||
216 | |||
217 | h := ht.rw.Header() | ||
218 | h["Date"] = nil // suppress Date to make tests happy; TODO: restore | ||
219 | h.Set("Content-Type", "application/grpc") | ||
220 | |||
221 | // Predeclare trailers we'll set later in WriteStatus (after the body). | ||
222 | // This is a SHOULD in the HTTP RFC, and the way you add (known) | ||
223 | // Trailers per the net/http.ResponseWriter contract. | ||
224 | // See https://golang.org/pkg/net/http/#ResponseWriter | ||
225 | // and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers | ||
226 | h.Add("Trailer", "Grpc-Status") | ||
227 | h.Add("Trailer", "Grpc-Message") | ||
228 | // TODO: Support Grpc-Status-Details-Bin | ||
229 | |||
230 | if s.sendCompress != "" { | ||
231 | h.Set("Grpc-Encoding", s.sendCompress) | ||
232 | } | ||
233 | } | ||
234 | |||
235 | func (ht *serverHandlerTransport) Write(s *Stream, data []byte, opts *Options) error { | ||
236 | return ht.do(func() { | ||
237 | ht.writeCommonHeaders(s) | ||
238 | ht.rw.Write(data) | ||
239 | if !opts.Delay { | ||
240 | ht.rw.(http.Flusher).Flush() | ||
241 | } | ||
242 | }) | ||
243 | } | ||
244 | |||
245 | func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error { | ||
246 | return ht.do(func() { | ||
247 | ht.writeCommonHeaders(s) | ||
248 | h := ht.rw.Header() | ||
249 | for k, vv := range md { | ||
250 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. | ||
251 | if isReservedHeader(k) { | ||
252 | continue | ||
253 | } | ||
254 | for _, v := range vv { | ||
255 | v = encodeMetadataHeader(k, v) | ||
256 | h.Add(k, v) | ||
257 | } | ||
258 | } | ||
259 | ht.rw.WriteHeader(200) | ||
260 | ht.rw.(http.Flusher).Flush() | ||
261 | }) | ||
262 | } | ||
263 | |||
264 | func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) { | ||
265 | // With this transport type there will be exactly 1 stream: this HTTP request. | ||
266 | |||
267 | var ctx context.Context | ||
268 | var cancel context.CancelFunc | ||
269 | if ht.timeoutSet { | ||
270 | ctx, cancel = context.WithTimeout(context.Background(), ht.timeout) | ||
271 | } else { | ||
272 | ctx, cancel = context.WithCancel(context.Background()) | ||
273 | } | ||
274 | |||
275 | // requestOver is closed when either the request's context is done | ||
276 | // or the status has been written via WriteStatus. | ||
277 | requestOver := make(chan struct{}) | ||
278 | |||
279 | // clientGone receives a single value if peer is gone, either | ||
280 | // because the underlying connection is dead or because the | ||
281 | // peer sends an http2 RST_STREAM. | ||
282 | clientGone := ht.rw.(http.CloseNotifier).CloseNotify() | ||
283 | go func() { | ||
284 | select { | ||
285 | case <-requestOver: | ||
286 | return | ||
287 | case <-ht.closedCh: | ||
288 | case <-clientGone: | ||
289 | } | ||
290 | cancel() | ||
291 | }() | ||
292 | |||
293 | req := ht.req | ||
294 | |||
295 | s := &Stream{ | ||
296 | id: 0, // irrelevant | ||
297 | requestRead: func(int) {}, | ||
298 | cancel: cancel, | ||
299 | buf: newRecvBuffer(), | ||
300 | st: ht, | ||
301 | method: req.URL.Path, | ||
302 | recvCompress: req.Header.Get("grpc-encoding"), | ||
303 | } | ||
304 | pr := &peer.Peer{ | ||
305 | Addr: ht.RemoteAddr(), | ||
306 | } | ||
307 | if req.TLS != nil { | ||
308 | pr.AuthInfo = credentials.TLSInfo{State: *req.TLS} | ||
309 | } | ||
310 | ctx = metadata.NewIncomingContext(ctx, ht.headerMD) | ||
311 | ctx = peer.NewContext(ctx, pr) | ||
312 | s.ctx = newContextWithStream(ctx, s) | ||
313 | s.trReader = &transportReader{ | ||
314 | reader: &recvBufferReader{ctx: s.ctx, recv: s.buf}, | ||
315 | windowHandler: func(int) {}, | ||
316 | } | ||
317 | |||
318 | // readerDone is closed when the Body.Read-ing goroutine exits. | ||
319 | readerDone := make(chan struct{}) | ||
320 | go func() { | ||
321 | defer close(readerDone) | ||
322 | |||
323 | // TODO: minimize garbage, optimize recvBuffer code/ownership | ||
324 | const readSize = 8196 | ||
325 | for buf := make([]byte, readSize); ; { | ||
326 | n, err := req.Body.Read(buf) | ||
327 | if n > 0 { | ||
328 | s.buf.put(recvMsg{data: buf[:n:n]}) | ||
329 | buf = buf[n:] | ||
330 | } | ||
331 | if err != nil { | ||
332 | s.buf.put(recvMsg{err: mapRecvMsgError(err)}) | ||
333 | return | ||
334 | } | ||
335 | if len(buf) == 0 { | ||
336 | buf = make([]byte, readSize) | ||
337 | } | ||
338 | } | ||
339 | }() | ||
340 | |||
341 | // startStream is provided by the *grpc.Server's serveStreams. | ||
342 | // It starts a goroutine serving s and exits immediately. | ||
343 | // The goroutine that is started is the one that then calls | ||
344 | // into ht, calling WriteHeader, Write, WriteStatus, Close, etc. | ||
345 | startStream(s) | ||
346 | |||
347 | ht.runStream() | ||
348 | close(requestOver) | ||
349 | |||
350 | // Wait for reading goroutine to finish. | ||
351 | req.Body.Close() | ||
352 | <-readerDone | ||
353 | } | ||
354 | |||
355 | func (ht *serverHandlerTransport) runStream() { | ||
356 | for { | ||
357 | select { | ||
358 | case fn, ok := <-ht.writes: | ||
359 | if !ok { | ||
360 | return | ||
361 | } | ||
362 | fn() | ||
363 | case <-ht.closedCh: | ||
364 | return | ||
365 | } | ||
366 | } | ||
367 | } | ||
368 | |||
369 | func (ht *serverHandlerTransport) Drain() { | ||
370 | panic("Drain() is not implemented") | ||
371 | } | ||
372 | |||
373 | // mapRecvMsgError returns the non-nil err into the appropriate | ||
374 | // error value as expected by callers of *grpc.parser.recvMsg. | ||
375 | // In particular, in can only be: | ||
376 | // * io.EOF | ||
377 | // * io.ErrUnexpectedEOF | ||
378 | // * of type transport.ConnectionError | ||
379 | // * of type transport.StreamError | ||
380 | func mapRecvMsgError(err error) error { | ||
381 | if err == io.EOF || err == io.ErrUnexpectedEOF { | ||
382 | return err | ||
383 | } | ||
384 | if se, ok := err.(http2.StreamError); ok { | ||
385 | if code, ok := http2ErrConvTab[se.Code]; ok { | ||
386 | return StreamError{ | ||
387 | Code: code, | ||
388 | Desc: se.Error(), | ||
389 | } | ||
390 | } | ||
391 | } | ||
392 | return connectionErrorf(true, err, err.Error()) | ||
393 | } | ||
diff --git a/vendor/google.golang.org/grpc/transport/http2_client.go b/vendor/google.golang.org/grpc/transport/http2_client.go new file mode 100644 index 0000000..516ea06 --- /dev/null +++ b/vendor/google.golang.org/grpc/transport/http2_client.go | |||
@@ -0,0 +1,1369 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2014 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | package transport | ||
20 | |||
21 | import ( | ||
22 | "bytes" | ||
23 | "io" | ||
24 | "math" | ||
25 | "net" | ||
26 | "strings" | ||
27 | "sync" | ||
28 | "sync/atomic" | ||
29 | "time" | ||
30 | |||
31 | "golang.org/x/net/context" | ||
32 | "golang.org/x/net/http2" | ||
33 | "golang.org/x/net/http2/hpack" | ||
34 | "google.golang.org/grpc/codes" | ||
35 | "google.golang.org/grpc/credentials" | ||
36 | "google.golang.org/grpc/keepalive" | ||
37 | "google.golang.org/grpc/metadata" | ||
38 | "google.golang.org/grpc/peer" | ||
39 | "google.golang.org/grpc/stats" | ||
40 | "google.golang.org/grpc/status" | ||
41 | ) | ||
42 | |||
43 | // http2Client implements the ClientTransport interface with HTTP2. | ||
44 | type http2Client struct { | ||
45 | ctx context.Context | ||
46 | target string // server name/addr | ||
47 | userAgent string | ||
48 | md interface{} | ||
49 | conn net.Conn // underlying communication channel | ||
50 | remoteAddr net.Addr | ||
51 | localAddr net.Addr | ||
52 | authInfo credentials.AuthInfo // auth info about the connection | ||
53 | nextID uint32 // the next stream ID to be used | ||
54 | |||
55 | // writableChan synchronizes write access to the transport. | ||
56 | // A writer acquires the write lock by sending a value on writableChan | ||
57 | // and releases it by receiving from writableChan. | ||
58 | writableChan chan int | ||
59 | // shutdownChan is closed when Close is called. | ||
60 | // Blocking operations should select on shutdownChan to avoid | ||
61 | // blocking forever after Close. | ||
62 | // TODO(zhaoq): Maybe have a channel context? | ||
63 | shutdownChan chan struct{} | ||
64 | // errorChan is closed to notify the I/O error to the caller. | ||
65 | errorChan chan struct{} | ||
66 | // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) | ||
67 | // that the server sent GoAway on this transport. | ||
68 | goAway chan struct{} | ||
69 | // awakenKeepalive is used to wake up keepalive when after it has gone dormant. | ||
70 | awakenKeepalive chan struct{} | ||
71 | |||
72 | framer *framer | ||
73 | hBuf *bytes.Buffer // the buffer for HPACK encoding | ||
74 | hEnc *hpack.Encoder // HPACK encoder | ||
75 | |||
76 | // controlBuf delivers all the control related tasks (e.g., window | ||
77 | // updates, reset streams, and various settings) to the controller. | ||
78 | controlBuf *controlBuffer | ||
79 | fc *inFlow | ||
80 | // sendQuotaPool provides flow control to outbound message. | ||
81 | sendQuotaPool *quotaPool | ||
82 | // streamsQuota limits the max number of concurrent streams. | ||
83 | streamsQuota *quotaPool | ||
84 | |||
85 | // The scheme used: https if TLS is on, http otherwise. | ||
86 | scheme string | ||
87 | |||
88 | isSecure bool | ||
89 | |||
90 | creds []credentials.PerRPCCredentials | ||
91 | |||
92 | // Boolean to keep track of reading activity on transport. | ||
93 | // 1 is true and 0 is false. | ||
94 | activity uint32 // Accessed atomically. | ||
95 | kp keepalive.ClientParameters | ||
96 | |||
97 | statsHandler stats.Handler | ||
98 | |||
99 | initialWindowSize int32 | ||
100 | |||
101 | bdpEst *bdpEstimator | ||
102 | outQuotaVersion uint32 | ||
103 | |||
104 | mu sync.Mutex // guard the following variables | ||
105 | state transportState // the state of underlying connection | ||
106 | activeStreams map[uint32]*Stream | ||
107 | // The max number of concurrent streams | ||
108 | maxStreams int | ||
109 | // the per-stream outbound flow control window size set by the peer. | ||
110 | streamSendQuota uint32 | ||
111 | // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame. | ||
112 | prevGoAwayID uint32 | ||
113 | // goAwayReason records the http2.ErrCode and debug data received with the | ||
114 | // GoAway frame. | ||
115 | goAwayReason GoAwayReason | ||
116 | } | ||
117 | |||
118 | func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { | ||
119 | if fn != nil { | ||
120 | return fn(ctx, addr) | ||
121 | } | ||
122 | return dialContext(ctx, "tcp", addr) | ||
123 | } | ||
124 | |||
125 | func isTemporary(err error) bool { | ||
126 | switch err { | ||
127 | case io.EOF: | ||
128 | // Connection closures may be resolved upon retry, and are thus | ||
129 | // treated as temporary. | ||
130 | return true | ||
131 | case context.DeadlineExceeded: | ||
132 | // In Go 1.7, context.DeadlineExceeded implements Timeout(), and this | ||
133 | // special case is not needed. Until then, we need to keep this | ||
134 | // clause. | ||
135 | return true | ||
136 | } | ||
137 | |||
138 | switch err := err.(type) { | ||
139 | case interface { | ||
140 | Temporary() bool | ||
141 | }: | ||
142 | return err.Temporary() | ||
143 | case interface { | ||
144 | Timeout() bool | ||
145 | }: | ||
146 | // Timeouts may be resolved upon retry, and are thus treated as | ||
147 | // temporary. | ||
148 | return err.Timeout() | ||
149 | } | ||
150 | return false | ||
151 | } | ||
152 | |||
153 | // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 | ||
154 | // and starts to receive messages on it. Non-nil error returns if construction | ||
155 | // fails. | ||
156 | func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (_ ClientTransport, err error) { | ||
157 | scheme := "http" | ||
158 | conn, err := dial(ctx, opts.Dialer, addr.Addr) | ||
159 | if err != nil { | ||
160 | if opts.FailOnNonTempDialError { | ||
161 | return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err) | ||
162 | } | ||
163 | return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err) | ||
164 | } | ||
165 | // Any further errors will close the underlying connection | ||
166 | defer func(conn net.Conn) { | ||
167 | if err != nil { | ||
168 | conn.Close() | ||
169 | } | ||
170 | }(conn) | ||
171 | var ( | ||
172 | isSecure bool | ||
173 | authInfo credentials.AuthInfo | ||
174 | ) | ||
175 | if creds := opts.TransportCredentials; creds != nil { | ||
176 | scheme = "https" | ||
177 | conn, authInfo, err = creds.ClientHandshake(ctx, addr.Addr, conn) | ||
178 | if err != nil { | ||
179 | // Credentials handshake errors are typically considered permanent | ||
180 | // to avoid retrying on e.g. bad certificates. | ||
181 | temp := isTemporary(err) | ||
182 | return nil, connectionErrorf(temp, err, "transport: authentication handshake failed: %v", err) | ||
183 | } | ||
184 | isSecure = true | ||
185 | } | ||
186 | kp := opts.KeepaliveParams | ||
187 | // Validate keepalive parameters. | ||
188 | if kp.Time == 0 { | ||
189 | kp.Time = defaultClientKeepaliveTime | ||
190 | } | ||
191 | if kp.Timeout == 0 { | ||
192 | kp.Timeout = defaultClientKeepaliveTimeout | ||
193 | } | ||
194 | dynamicWindow := true | ||
195 | icwz := int32(initialWindowSize) | ||
196 | if opts.InitialConnWindowSize >= defaultWindowSize { | ||
197 | icwz = opts.InitialConnWindowSize | ||
198 | dynamicWindow = false | ||
199 | } | ||
200 | var buf bytes.Buffer | ||
201 | t := &http2Client{ | ||
202 | ctx: ctx, | ||
203 | target: addr.Addr, | ||
204 | userAgent: opts.UserAgent, | ||
205 | md: addr.Metadata, | ||
206 | conn: conn, | ||
207 | remoteAddr: conn.RemoteAddr(), | ||
208 | localAddr: conn.LocalAddr(), | ||
209 | authInfo: authInfo, | ||
210 | // The client initiated stream id is odd starting from 1. | ||
211 | nextID: 1, | ||
212 | writableChan: make(chan int, 1), | ||
213 | shutdownChan: make(chan struct{}), | ||
214 | errorChan: make(chan struct{}), | ||
215 | goAway: make(chan struct{}), | ||
216 | awakenKeepalive: make(chan struct{}, 1), | ||
217 | framer: newFramer(conn), | ||
218 | hBuf: &buf, | ||
219 | hEnc: hpack.NewEncoder(&buf), | ||
220 | controlBuf: newControlBuffer(), | ||
221 | fc: &inFlow{limit: uint32(icwz)}, | ||
222 | sendQuotaPool: newQuotaPool(defaultWindowSize), | ||
223 | scheme: scheme, | ||
224 | state: reachable, | ||
225 | activeStreams: make(map[uint32]*Stream), | ||
226 | isSecure: isSecure, | ||
227 | creds: opts.PerRPCCredentials, | ||
228 | maxStreams: defaultMaxStreamsClient, | ||
229 | streamsQuota: newQuotaPool(defaultMaxStreamsClient), | ||
230 | streamSendQuota: defaultWindowSize, | ||
231 | kp: kp, | ||
232 | statsHandler: opts.StatsHandler, | ||
233 | initialWindowSize: initialWindowSize, | ||
234 | } | ||
235 | if opts.InitialWindowSize >= defaultWindowSize { | ||
236 | t.initialWindowSize = opts.InitialWindowSize | ||
237 | dynamicWindow = false | ||
238 | } | ||
239 | if dynamicWindow { | ||
240 | t.bdpEst = &bdpEstimator{ | ||
241 | bdp: initialWindowSize, | ||
242 | updateFlowControl: t.updateFlowControl, | ||
243 | } | ||
244 | } | ||
245 | // Make sure awakenKeepalive can't be written upon. | ||
246 | // keepalive routine will make it writable, if need be. | ||
247 | t.awakenKeepalive <- struct{}{} | ||
248 | if t.statsHandler != nil { | ||
249 | t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{ | ||
250 | RemoteAddr: t.remoteAddr, | ||
251 | LocalAddr: t.localAddr, | ||
252 | }) | ||
253 | connBegin := &stats.ConnBegin{ | ||
254 | Client: true, | ||
255 | } | ||
256 | t.statsHandler.HandleConn(t.ctx, connBegin) | ||
257 | } | ||
258 | // Start the reader goroutine for incoming message. Each transport has | ||
259 | // a dedicated goroutine which reads HTTP2 frame from network. Then it | ||
260 | // dispatches the frame to the corresponding stream entity. | ||
261 | go t.reader() | ||
262 | // Send connection preface to server. | ||
263 | n, err := t.conn.Write(clientPreface) | ||
264 | if err != nil { | ||
265 | t.Close() | ||
266 | return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err) | ||
267 | } | ||
268 | if n != len(clientPreface) { | ||
269 | t.Close() | ||
270 | return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) | ||
271 | } | ||
272 | if t.initialWindowSize != defaultWindowSize { | ||
273 | err = t.framer.writeSettings(true, http2.Setting{ | ||
274 | ID: http2.SettingInitialWindowSize, | ||
275 | Val: uint32(t.initialWindowSize), | ||
276 | }) | ||
277 | } else { | ||
278 | err = t.framer.writeSettings(true) | ||
279 | } | ||
280 | if err != nil { | ||
281 | t.Close() | ||
282 | return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err) | ||
283 | } | ||
284 | // Adjust the connection flow control window if needed. | ||
285 | if delta := uint32(icwz - defaultWindowSize); delta > 0 { | ||
286 | if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil { | ||
287 | t.Close() | ||
288 | return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err) | ||
289 | } | ||
290 | } | ||
291 | go t.controller() | ||
292 | if t.kp.Time != infinity { | ||
293 | go t.keepalive() | ||
294 | } | ||
295 | t.writableChan <- 0 | ||
296 | return t, nil | ||
297 | } | ||
298 | |||
299 | func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { | ||
300 | // TODO(zhaoq): Handle uint32 overflow of Stream.id. | ||
301 | s := &Stream{ | ||
302 | id: t.nextID, | ||
303 | done: make(chan struct{}), | ||
304 | goAway: make(chan struct{}), | ||
305 | method: callHdr.Method, | ||
306 | sendCompress: callHdr.SendCompress, | ||
307 | buf: newRecvBuffer(), | ||
308 | fc: &inFlow{limit: uint32(t.initialWindowSize)}, | ||
309 | sendQuotaPool: newQuotaPool(int(t.streamSendQuota)), | ||
310 | headerChan: make(chan struct{}), | ||
311 | } | ||
312 | t.nextID += 2 | ||
313 | s.requestRead = func(n int) { | ||
314 | t.adjustWindow(s, uint32(n)) | ||
315 | } | ||
316 | // The client side stream context should have exactly the same life cycle with the user provided context. | ||
317 | // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done. | ||
318 | // So we use the original context here instead of creating a copy. | ||
319 | s.ctx = ctx | ||
320 | s.trReader = &transportReader{ | ||
321 | reader: &recvBufferReader{ | ||
322 | ctx: s.ctx, | ||
323 | goAway: s.goAway, | ||
324 | recv: s.buf, | ||
325 | }, | ||
326 | windowHandler: func(n int) { | ||
327 | t.updateWindow(s, uint32(n)) | ||
328 | }, | ||
329 | } | ||
330 | |||
331 | return s | ||
332 | } | ||
333 | |||
334 | // NewStream creates a stream and registers it into the transport as "active" | ||
335 | // streams. | ||
336 | func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) { | ||
337 | pr := &peer.Peer{ | ||
338 | Addr: t.remoteAddr, | ||
339 | } | ||
340 | // Attach Auth info if there is any. | ||
341 | if t.authInfo != nil { | ||
342 | pr.AuthInfo = t.authInfo | ||
343 | } | ||
344 | ctx = peer.NewContext(ctx, pr) | ||
345 | var ( | ||
346 | authData = make(map[string]string) | ||
347 | audience string | ||
348 | ) | ||
349 | // Create an audience string only if needed. | ||
350 | if len(t.creds) > 0 || callHdr.Creds != nil { | ||
351 | // Construct URI required to get auth request metadata. | ||
352 | var port string | ||
353 | if pos := strings.LastIndex(t.target, ":"); pos != -1 { | ||
354 | // Omit port if it is the default one. | ||
355 | if t.target[pos+1:] != "443" { | ||
356 | port = ":" + t.target[pos+1:] | ||
357 | } | ||
358 | } | ||
359 | pos := strings.LastIndex(callHdr.Method, "/") | ||
360 | if pos == -1 { | ||
361 | pos = len(callHdr.Method) | ||
362 | } | ||
363 | audience = "https://" + callHdr.Host + port + callHdr.Method[:pos] | ||
364 | } | ||
365 | for _, c := range t.creds { | ||
366 | data, err := c.GetRequestMetadata(ctx, audience) | ||
367 | if err != nil { | ||
368 | return nil, streamErrorf(codes.Internal, "transport: %v", err) | ||
369 | } | ||
370 | for k, v := range data { | ||
371 | // Capital header names are illegal in HTTP/2. | ||
372 | k = strings.ToLower(k) | ||
373 | authData[k] = v | ||
374 | } | ||
375 | } | ||
376 | callAuthData := make(map[string]string) | ||
377 | // Check if credentials.PerRPCCredentials were provided via call options. | ||
378 | // Note: if these credentials are provided both via dial options and call | ||
379 | // options, then both sets of credentials will be applied. | ||
380 | if callCreds := callHdr.Creds; callCreds != nil { | ||
381 | if !t.isSecure && callCreds.RequireTransportSecurity() { | ||
382 | return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure conneciton") | ||
383 | } | ||
384 | data, err := callCreds.GetRequestMetadata(ctx, audience) | ||
385 | if err != nil { | ||
386 | return nil, streamErrorf(codes.Internal, "transport: %v", err) | ||
387 | } | ||
388 | for k, v := range data { | ||
389 | // Capital header names are illegal in HTTP/2 | ||
390 | k = strings.ToLower(k) | ||
391 | callAuthData[k] = v | ||
392 | } | ||
393 | } | ||
394 | t.mu.Lock() | ||
395 | if t.activeStreams == nil { | ||
396 | t.mu.Unlock() | ||
397 | return nil, ErrConnClosing | ||
398 | } | ||
399 | if t.state == draining { | ||
400 | t.mu.Unlock() | ||
401 | return nil, ErrStreamDrain | ||
402 | } | ||
403 | if t.state != reachable { | ||
404 | t.mu.Unlock() | ||
405 | return nil, ErrConnClosing | ||
406 | } | ||
407 | t.mu.Unlock() | ||
408 | sq, err := wait(ctx, nil, nil, t.shutdownChan, t.streamsQuota.acquire()) | ||
409 | if err != nil { | ||
410 | return nil, err | ||
411 | } | ||
412 | // Returns the quota balance back. | ||
413 | if sq > 1 { | ||
414 | t.streamsQuota.add(sq - 1) | ||
415 | } | ||
416 | if _, err := wait(ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { | ||
417 | // Return the quota back now because there is no stream returned to the caller. | ||
418 | if _, ok := err.(StreamError); ok { | ||
419 | t.streamsQuota.add(1) | ||
420 | } | ||
421 | return nil, err | ||
422 | } | ||
423 | t.mu.Lock() | ||
424 | if t.state == draining { | ||
425 | t.mu.Unlock() | ||
426 | t.streamsQuota.add(1) | ||
427 | // Need to make t writable again so that the rpc in flight can still proceed. | ||
428 | t.writableChan <- 0 | ||
429 | return nil, ErrStreamDrain | ||
430 | } | ||
431 | if t.state != reachable { | ||
432 | t.mu.Unlock() | ||
433 | return nil, ErrConnClosing | ||
434 | } | ||
435 | s := t.newStream(ctx, callHdr) | ||
436 | t.activeStreams[s.id] = s | ||
437 | // If the number of active streams change from 0 to 1, then check if keepalive | ||
438 | // has gone dormant. If so, wake it up. | ||
439 | if len(t.activeStreams) == 1 { | ||
440 | select { | ||
441 | case t.awakenKeepalive <- struct{}{}: | ||
442 | t.framer.writePing(false, false, [8]byte{}) | ||
443 | default: | ||
444 | } | ||
445 | } | ||
446 | |||
447 | t.mu.Unlock() | ||
448 | |||
449 | // HPACK encodes various headers. Note that once WriteField(...) is | ||
450 | // called, the corresponding headers/continuation frame has to be sent | ||
451 | // because hpack.Encoder is stateful. | ||
452 | t.hBuf.Reset() | ||
453 | t.hEnc.WriteField(hpack.HeaderField{Name: ":method", Value: "POST"}) | ||
454 | t.hEnc.WriteField(hpack.HeaderField{Name: ":scheme", Value: t.scheme}) | ||
455 | t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method}) | ||
456 | t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host}) | ||
457 | t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) | ||
458 | t.hEnc.WriteField(hpack.HeaderField{Name: "user-agent", Value: t.userAgent}) | ||
459 | t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"}) | ||
460 | |||
461 | if callHdr.SendCompress != "" { | ||
462 | t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress}) | ||
463 | } | ||
464 | if dl, ok := ctx.Deadline(); ok { | ||
465 | // Send out timeout regardless its value. The server can detect timeout context by itself. | ||
466 | timeout := dl.Sub(time.Now()) | ||
467 | t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)}) | ||
468 | } | ||
469 | |||
470 | for k, v := range authData { | ||
471 | t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | ||
472 | } | ||
473 | for k, v := range callAuthData { | ||
474 | t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | ||
475 | } | ||
476 | var ( | ||
477 | endHeaders bool | ||
478 | ) | ||
479 | if md, ok := metadata.FromOutgoingContext(ctx); ok { | ||
480 | for k, vv := range md { | ||
481 | // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. | ||
482 | if isReservedHeader(k) { | ||
483 | continue | ||
484 | } | ||
485 | for _, v := range vv { | ||
486 | t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | ||
487 | } | ||
488 | } | ||
489 | } | ||
490 | if md, ok := t.md.(*metadata.MD); ok { | ||
491 | for k, vv := range *md { | ||
492 | if isReservedHeader(k) { | ||
493 | continue | ||
494 | } | ||
495 | for _, v := range vv { | ||
496 | t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | ||
497 | } | ||
498 | } | ||
499 | } | ||
500 | first := true | ||
501 | bufLen := t.hBuf.Len() | ||
502 | // Sends the headers in a single batch even when they span multiple frames. | ||
503 | for !endHeaders { | ||
504 | size := t.hBuf.Len() | ||
505 | if size > http2MaxFrameLen { | ||
506 | size = http2MaxFrameLen | ||
507 | } else { | ||
508 | endHeaders = true | ||
509 | } | ||
510 | var flush bool | ||
511 | if callHdr.Flush && endHeaders { | ||
512 | flush = true | ||
513 | } | ||
514 | if first { | ||
515 | // Sends a HeadersFrame to server to start a new stream. | ||
516 | p := http2.HeadersFrameParam{ | ||
517 | StreamID: s.id, | ||
518 | BlockFragment: t.hBuf.Next(size), | ||
519 | EndStream: false, | ||
520 | EndHeaders: endHeaders, | ||
521 | } | ||
522 | // Do a force flush for the buffered frames iff it is the last headers frame | ||
523 | // and there is header metadata to be sent. Otherwise, there is flushing until | ||
524 | // the corresponding data frame is written. | ||
525 | err = t.framer.writeHeaders(flush, p) | ||
526 | first = false | ||
527 | } else { | ||
528 | // Sends Continuation frames for the leftover headers. | ||
529 | err = t.framer.writeContinuation(flush, s.id, endHeaders, t.hBuf.Next(size)) | ||
530 | } | ||
531 | if err != nil { | ||
532 | t.notifyError(err) | ||
533 | return nil, connectionErrorf(true, err, "transport: %v", err) | ||
534 | } | ||
535 | } | ||
536 | s.mu.Lock() | ||
537 | s.bytesSent = true | ||
538 | s.mu.Unlock() | ||
539 | |||
540 | if t.statsHandler != nil { | ||
541 | outHeader := &stats.OutHeader{ | ||
542 | Client: true, | ||
543 | WireLength: bufLen, | ||
544 | FullMethod: callHdr.Method, | ||
545 | RemoteAddr: t.remoteAddr, | ||
546 | LocalAddr: t.localAddr, | ||
547 | Compression: callHdr.SendCompress, | ||
548 | } | ||
549 | t.statsHandler.HandleRPC(s.ctx, outHeader) | ||
550 | } | ||
551 | t.writableChan <- 0 | ||
552 | return s, nil | ||
553 | } | ||
554 | |||
555 | // CloseStream clears the footprint of a stream when the stream is not needed any more. | ||
556 | // This must not be executed in reader's goroutine. | ||
557 | func (t *http2Client) CloseStream(s *Stream, err error) { | ||
558 | t.mu.Lock() | ||
559 | if t.activeStreams == nil { | ||
560 | t.mu.Unlock() | ||
561 | return | ||
562 | } | ||
563 | if err != nil { | ||
564 | // notify in-flight streams, before the deletion | ||
565 | s.write(recvMsg{err: err}) | ||
566 | } | ||
567 | delete(t.activeStreams, s.id) | ||
568 | if t.state == draining && len(t.activeStreams) == 0 { | ||
569 | // The transport is draining and s is the last live stream on t. | ||
570 | t.mu.Unlock() | ||
571 | t.Close() | ||
572 | return | ||
573 | } | ||
574 | t.mu.Unlock() | ||
575 | // rstStream is true in case the stream is being closed at the client-side | ||
576 | // and the server needs to be intimated about it by sending a RST_STREAM | ||
577 | // frame. | ||
578 | // To make sure this frame is written to the wire before the headers of the | ||
579 | // next stream waiting for streamsQuota, we add to streamsQuota pool only | ||
580 | // after having acquired the writableChan to send RST_STREAM out (look at | ||
581 | // the controller() routine). | ||
582 | var rstStream bool | ||
583 | var rstError http2.ErrCode | ||
584 | defer func() { | ||
585 | // In case, the client doesn't have to send RST_STREAM to server | ||
586 | // we can safely add back to streamsQuota pool now. | ||
587 | if !rstStream { | ||
588 | t.streamsQuota.add(1) | ||
589 | return | ||
590 | } | ||
591 | t.controlBuf.put(&resetStream{s.id, rstError}) | ||
592 | }() | ||
593 | s.mu.Lock() | ||
594 | rstStream = s.rstStream | ||
595 | rstError = s.rstError | ||
596 | if s.state == streamDone { | ||
597 | s.mu.Unlock() | ||
598 | return | ||
599 | } | ||
600 | if !s.headerDone { | ||
601 | close(s.headerChan) | ||
602 | s.headerDone = true | ||
603 | } | ||
604 | s.state = streamDone | ||
605 | s.mu.Unlock() | ||
606 | if _, ok := err.(StreamError); ok { | ||
607 | rstStream = true | ||
608 | rstError = http2.ErrCodeCancel | ||
609 | } | ||
610 | } | ||
611 | |||
612 | // Close kicks off the shutdown process of the transport. This should be called | ||
613 | // only once on a transport. Once it is called, the transport should not be | ||
614 | // accessed any more. | ||
615 | func (t *http2Client) Close() (err error) { | ||
616 | t.mu.Lock() | ||
617 | if t.state == closing { | ||
618 | t.mu.Unlock() | ||
619 | return | ||
620 | } | ||
621 | if t.state == reachable || t.state == draining { | ||
622 | close(t.errorChan) | ||
623 | } | ||
624 | t.state = closing | ||
625 | t.mu.Unlock() | ||
626 | close(t.shutdownChan) | ||
627 | err = t.conn.Close() | ||
628 | t.mu.Lock() | ||
629 | streams := t.activeStreams | ||
630 | t.activeStreams = nil | ||
631 | t.mu.Unlock() | ||
632 | // Notify all active streams. | ||
633 | for _, s := range streams { | ||
634 | s.mu.Lock() | ||
635 | if !s.headerDone { | ||
636 | close(s.headerChan) | ||
637 | s.headerDone = true | ||
638 | } | ||
639 | s.mu.Unlock() | ||
640 | s.write(recvMsg{err: ErrConnClosing}) | ||
641 | } | ||
642 | if t.statsHandler != nil { | ||
643 | connEnd := &stats.ConnEnd{ | ||
644 | Client: true, | ||
645 | } | ||
646 | t.statsHandler.HandleConn(t.ctx, connEnd) | ||
647 | } | ||
648 | return | ||
649 | } | ||
650 | |||
651 | func (t *http2Client) GracefulClose() error { | ||
652 | t.mu.Lock() | ||
653 | switch t.state { | ||
654 | case unreachable: | ||
655 | // The server may close the connection concurrently. t is not available for | ||
656 | // any streams. Close it now. | ||
657 | t.mu.Unlock() | ||
658 | t.Close() | ||
659 | return nil | ||
660 | case closing: | ||
661 | t.mu.Unlock() | ||
662 | return nil | ||
663 | } | ||
664 | if t.state == draining { | ||
665 | t.mu.Unlock() | ||
666 | return nil | ||
667 | } | ||
668 | t.state = draining | ||
669 | active := len(t.activeStreams) | ||
670 | t.mu.Unlock() | ||
671 | if active == 0 { | ||
672 | return t.Close() | ||
673 | } | ||
674 | return nil | ||
675 | } | ||
676 | |||
677 | // Write formats the data into HTTP2 data frame(s) and sends it out. The caller | ||
678 | // should proceed only if Write returns nil. | ||
679 | // TODO(zhaoq): opts.Delay is ignored in this implementation. Support it later | ||
680 | // if it improves the performance. | ||
681 | func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { | ||
682 | r := bytes.NewBuffer(data) | ||
683 | var ( | ||
684 | p []byte | ||
685 | oqv uint32 | ||
686 | ) | ||
687 | for { | ||
688 | oqv = atomic.LoadUint32(&t.outQuotaVersion) | ||
689 | if r.Len() > 0 || p != nil { | ||
690 | size := http2MaxFrameLen | ||
691 | // Wait until the stream has some quota to send the data. | ||
692 | sq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, s.sendQuotaPool.acquire()) | ||
693 | if err != nil { | ||
694 | return err | ||
695 | } | ||
696 | // Wait until the transport has some quota to send the data. | ||
697 | tq, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.sendQuotaPool.acquire()) | ||
698 | if err != nil { | ||
699 | return err | ||
700 | } | ||
701 | if sq < size { | ||
702 | size = sq | ||
703 | } | ||
704 | if tq < size { | ||
705 | size = tq | ||
706 | } | ||
707 | if p == nil { | ||
708 | p = r.Next(size) | ||
709 | } | ||
710 | ps := len(p) | ||
711 | if ps < sq { | ||
712 | // Overbooked stream quota. Return it back. | ||
713 | s.sendQuotaPool.add(sq - ps) | ||
714 | } | ||
715 | if ps < tq { | ||
716 | // Overbooked transport quota. Return it back. | ||
717 | t.sendQuotaPool.add(tq - ps) | ||
718 | } | ||
719 | } | ||
720 | var ( | ||
721 | endStream bool | ||
722 | forceFlush bool | ||
723 | ) | ||
724 | if opts.Last && r.Len() == 0 { | ||
725 | endStream = true | ||
726 | } | ||
727 | // Indicate there is a writer who is about to write a data frame. | ||
728 | t.framer.adjustNumWriters(1) | ||
729 | // Got some quota. Try to acquire writing privilege on the transport. | ||
730 | if _, err := wait(s.ctx, s.done, s.goAway, t.shutdownChan, t.writableChan); err != nil { | ||
731 | if _, ok := err.(StreamError); ok || err == io.EOF { | ||
732 | // Return the connection quota back. | ||
733 | t.sendQuotaPool.add(len(p)) | ||
734 | } | ||
735 | if t.framer.adjustNumWriters(-1) == 0 { | ||
736 | // This writer is the last one in this batch and has the | ||
737 | // responsibility to flush the buffered frames. It queues | ||
738 | // a flush request to controlBuf instead of flushing directly | ||
739 | // in order to avoid the race with other writing or flushing. | ||
740 | t.controlBuf.put(&flushIO{}) | ||
741 | } | ||
742 | return err | ||
743 | } | ||
744 | select { | ||
745 | case <-s.ctx.Done(): | ||
746 | t.sendQuotaPool.add(len(p)) | ||
747 | if t.framer.adjustNumWriters(-1) == 0 { | ||
748 | t.controlBuf.put(&flushIO{}) | ||
749 | } | ||
750 | t.writableChan <- 0 | ||
751 | return ContextErr(s.ctx.Err()) | ||
752 | default: | ||
753 | } | ||
754 | if oqv != atomic.LoadUint32(&t.outQuotaVersion) { | ||
755 | // InitialWindowSize settings frame must have been received after we | ||
756 | // acquired send quota but before we got the writable channel. | ||
757 | // We must forsake this write. | ||
758 | t.sendQuotaPool.add(len(p)) | ||
759 | s.sendQuotaPool.add(len(p)) | ||
760 | if t.framer.adjustNumWriters(-1) == 0 { | ||
761 | t.controlBuf.put(&flushIO{}) | ||
762 | } | ||
763 | t.writableChan <- 0 | ||
764 | continue | ||
765 | } | ||
766 | if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 { | ||
767 | // Do a force flush iff this is last frame for the entire gRPC message | ||
768 | // and the caller is the only writer at this moment. | ||
769 | forceFlush = true | ||
770 | } | ||
771 | // If WriteData fails, all the pending streams will be handled | ||
772 | // by http2Client.Close(). No explicit CloseStream() needs to be | ||
773 | // invoked. | ||
774 | if err := t.framer.writeData(forceFlush, s.id, endStream, p); err != nil { | ||
775 | t.notifyError(err) | ||
776 | return connectionErrorf(true, err, "transport: %v", err) | ||
777 | } | ||
778 | p = nil | ||
779 | if t.framer.adjustNumWriters(-1) == 0 { | ||
780 | t.framer.flushWrite() | ||
781 | } | ||
782 | t.writableChan <- 0 | ||
783 | if r.Len() == 0 { | ||
784 | break | ||
785 | } | ||
786 | } | ||
787 | if !opts.Last { | ||
788 | return nil | ||
789 | } | ||
790 | s.mu.Lock() | ||
791 | if s.state != streamDone { | ||
792 | s.state = streamWriteDone | ||
793 | } | ||
794 | s.mu.Unlock() | ||
795 | return nil | ||
796 | } | ||
797 | |||
798 | func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) { | ||
799 | t.mu.Lock() | ||
800 | defer t.mu.Unlock() | ||
801 | s, ok := t.activeStreams[f.Header().StreamID] | ||
802 | return s, ok | ||
803 | } | ||
804 | |||
805 | // adjustWindow sends out extra window update over the initial window size | ||
806 | // of stream if the application is requesting data larger in size than | ||
807 | // the window. | ||
808 | func (t *http2Client) adjustWindow(s *Stream, n uint32) { | ||
809 | s.mu.Lock() | ||
810 | defer s.mu.Unlock() | ||
811 | if s.state == streamDone { | ||
812 | return | ||
813 | } | ||
814 | if w := s.fc.maybeAdjust(n); w > 0 { | ||
815 | // Piggyback conneciton's window update along. | ||
816 | if cw := t.fc.resetPendingUpdate(); cw > 0 { | ||
817 | t.controlBuf.put(&windowUpdate{0, cw, false}) | ||
818 | } | ||
819 | t.controlBuf.put(&windowUpdate{s.id, w, true}) | ||
820 | } | ||
821 | } | ||
822 | |||
823 | // updateWindow adjusts the inbound quota for the stream and the transport. | ||
824 | // Window updates will deliver to the controller for sending when | ||
825 | // the cumulative quota exceeds the corresponding threshold. | ||
826 | func (t *http2Client) updateWindow(s *Stream, n uint32) { | ||
827 | s.mu.Lock() | ||
828 | defer s.mu.Unlock() | ||
829 | if s.state == streamDone { | ||
830 | return | ||
831 | } | ||
832 | if w := s.fc.onRead(n); w > 0 { | ||
833 | if cw := t.fc.resetPendingUpdate(); cw > 0 { | ||
834 | t.controlBuf.put(&windowUpdate{0, cw, false}) | ||
835 | } | ||
836 | t.controlBuf.put(&windowUpdate{s.id, w, true}) | ||
837 | } | ||
838 | } | ||
839 | |||
840 | // updateFlowControl updates the incoming flow control windows | ||
841 | // for the transport and the stream based on the current bdp | ||
842 | // estimation. | ||
843 | func (t *http2Client) updateFlowControl(n uint32) { | ||
844 | t.mu.Lock() | ||
845 | for _, s := range t.activeStreams { | ||
846 | s.fc.newLimit(n) | ||
847 | } | ||
848 | t.initialWindowSize = int32(n) | ||
849 | t.mu.Unlock() | ||
850 | t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false}) | ||
851 | t.controlBuf.put(&settings{ | ||
852 | ack: false, | ||
853 | ss: []http2.Setting{ | ||
854 | { | ||
855 | ID: http2.SettingInitialWindowSize, | ||
856 | Val: uint32(n), | ||
857 | }, | ||
858 | }, | ||
859 | }) | ||
860 | } | ||
861 | |||
862 | func (t *http2Client) handleData(f *http2.DataFrame) { | ||
863 | size := f.Header().Length | ||
864 | var sendBDPPing bool | ||
865 | if t.bdpEst != nil { | ||
866 | sendBDPPing = t.bdpEst.add(uint32(size)) | ||
867 | } | ||
868 | // Decouple connection's flow control from application's read. | ||
869 | // An update on connection's flow control should not depend on | ||
870 | // whether user application has read the data or not. Such a | ||
871 | // restriction is already imposed on the stream's flow control, | ||
872 | // and therefore the sender will be blocked anyways. | ||
873 | // Decoupling the connection flow control will prevent other | ||
874 | // active(fast) streams from starving in presence of slow or | ||
875 | // inactive streams. | ||
876 | // | ||
877 | // Furthermore, if a bdpPing is being sent out we can piggyback | ||
878 | // connection's window update for the bytes we just received. | ||
879 | if sendBDPPing { | ||
880 | t.controlBuf.put(&windowUpdate{0, uint32(size), false}) | ||
881 | t.controlBuf.put(bdpPing) | ||
882 | } else { | ||
883 | if err := t.fc.onData(uint32(size)); err != nil { | ||
884 | t.notifyError(connectionErrorf(true, err, "%v", err)) | ||
885 | return | ||
886 | } | ||
887 | if w := t.fc.onRead(uint32(size)); w > 0 { | ||
888 | t.controlBuf.put(&windowUpdate{0, w, true}) | ||
889 | } | ||
890 | } | ||
891 | // Select the right stream to dispatch. | ||
892 | s, ok := t.getStream(f) | ||
893 | if !ok { | ||
894 | return | ||
895 | } | ||
896 | if size > 0 { | ||
897 | s.mu.Lock() | ||
898 | if s.state == streamDone { | ||
899 | s.mu.Unlock() | ||
900 | return | ||
901 | } | ||
902 | if err := s.fc.onData(uint32(size)); err != nil { | ||
903 | s.rstStream = true | ||
904 | s.rstError = http2.ErrCodeFlowControl | ||
905 | s.finish(status.New(codes.Internal, err.Error())) | ||
906 | s.mu.Unlock() | ||
907 | s.write(recvMsg{err: io.EOF}) | ||
908 | return | ||
909 | } | ||
910 | if f.Header().Flags.Has(http2.FlagDataPadded) { | ||
911 | if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { | ||
912 | t.controlBuf.put(&windowUpdate{s.id, w, true}) | ||
913 | } | ||
914 | } | ||
915 | s.mu.Unlock() | ||
916 | // TODO(bradfitz, zhaoq): A copy is required here because there is no | ||
917 | // guarantee f.Data() is consumed before the arrival of next frame. | ||
918 | // Can this copy be eliminated? | ||
919 | if len(f.Data()) > 0 { | ||
920 | data := make([]byte, len(f.Data())) | ||
921 | copy(data, f.Data()) | ||
922 | s.write(recvMsg{data: data}) | ||
923 | } | ||
924 | } | ||
925 | // The server has closed the stream without sending trailers. Record that | ||
926 | // the read direction is closed, and set the status appropriately. | ||
927 | if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) { | ||
928 | s.mu.Lock() | ||
929 | if s.state == streamDone { | ||
930 | s.mu.Unlock() | ||
931 | return | ||
932 | } | ||
933 | s.finish(status.New(codes.Internal, "server closed the stream without sending trailers")) | ||
934 | s.mu.Unlock() | ||
935 | s.write(recvMsg{err: io.EOF}) | ||
936 | } | ||
937 | } | ||
938 | |||
939 | func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) { | ||
940 | s, ok := t.getStream(f) | ||
941 | if !ok { | ||
942 | return | ||
943 | } | ||
944 | s.mu.Lock() | ||
945 | if s.state == streamDone { | ||
946 | s.mu.Unlock() | ||
947 | return | ||
948 | } | ||
949 | if !s.headerDone { | ||
950 | close(s.headerChan) | ||
951 | s.headerDone = true | ||
952 | } | ||
953 | statusCode, ok := http2ErrConvTab[http2.ErrCode(f.ErrCode)] | ||
954 | if !ok { | ||
955 | warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode) | ||
956 | statusCode = codes.Unknown | ||
957 | } | ||
958 | s.finish(status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %d", f.ErrCode)) | ||
959 | s.mu.Unlock() | ||
960 | s.write(recvMsg{err: io.EOF}) | ||
961 | } | ||
962 | |||
963 | func (t *http2Client) handleSettings(f *http2.SettingsFrame) { | ||
964 | if f.IsAck() { | ||
965 | return | ||
966 | } | ||
967 | var ss []http2.Setting | ||
968 | f.ForeachSetting(func(s http2.Setting) error { | ||
969 | ss = append(ss, s) | ||
970 | return nil | ||
971 | }) | ||
972 | // The settings will be applied once the ack is sent. | ||
973 | t.controlBuf.put(&settings{ack: true, ss: ss}) | ||
974 | } | ||
975 | |||
976 | func (t *http2Client) handlePing(f *http2.PingFrame) { | ||
977 | if f.IsAck() { | ||
978 | // Maybe it's a BDP ping. | ||
979 | if t.bdpEst != nil { | ||
980 | t.bdpEst.calculate(f.Data) | ||
981 | } | ||
982 | return | ||
983 | } | ||
984 | pingAck := &ping{ack: true} | ||
985 | copy(pingAck.data[:], f.Data[:]) | ||
986 | t.controlBuf.put(pingAck) | ||
987 | } | ||
988 | |||
989 | func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { | ||
990 | t.mu.Lock() | ||
991 | if t.state != reachable && t.state != draining { | ||
992 | t.mu.Unlock() | ||
993 | return | ||
994 | } | ||
995 | if f.ErrCode == http2.ErrCodeEnhanceYourCalm { | ||
996 | infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.") | ||
997 | } | ||
998 | id := f.LastStreamID | ||
999 | if id > 0 && id%2 != 1 { | ||
1000 | t.mu.Unlock() | ||
1001 | t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: stream ID %d is even", f.LastStreamID)) | ||
1002 | return | ||
1003 | } | ||
1004 | // A client can recieve multiple GoAways from server (look at https://github.com/grpc/grpc-go/issues/1387). | ||
1005 | // The idea is that the first GoAway will be sent with an ID of MaxInt32 and the second GoAway will be sent after an RTT delay | ||
1006 | // with the ID of the last stream the server will process. | ||
1007 | // Therefore, when we get the first GoAway we don't really close any streams. While in case of second GoAway we | ||
1008 | // close all streams created after the second GoAwayId. This way streams that were in-flight while the GoAway from server | ||
1009 | // was being sent don't get killed. | ||
1010 | select { | ||
1011 | case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways). | ||
1012 | // If there are multiple GoAways the first one should always have an ID greater than the following ones. | ||
1013 | if id > t.prevGoAwayID { | ||
1014 | t.mu.Unlock() | ||
1015 | t.notifyError(connectionErrorf(true, nil, "received illegal http2 GOAWAY frame: previously recv GOAWAY frame with LastStramID %d, currently recv %d", id, f.LastStreamID)) | ||
1016 | return | ||
1017 | } | ||
1018 | default: | ||
1019 | t.setGoAwayReason(f) | ||
1020 | close(t.goAway) | ||
1021 | t.state = draining | ||
1022 | } | ||
1023 | // All streams with IDs greater than the GoAwayId | ||
1024 | // and smaller than the previous GoAway ID should be killed. | ||
1025 | upperLimit := t.prevGoAwayID | ||
1026 | if upperLimit == 0 { // This is the first GoAway Frame. | ||
1027 | upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID. | ||
1028 | } | ||
1029 | for streamID, stream := range t.activeStreams { | ||
1030 | if streamID > id && streamID <= upperLimit { | ||
1031 | close(stream.goAway) | ||
1032 | } | ||
1033 | } | ||
1034 | t.prevGoAwayID = id | ||
1035 | active := len(t.activeStreams) | ||
1036 | t.mu.Unlock() | ||
1037 | if active == 0 { | ||
1038 | t.Close() | ||
1039 | } | ||
1040 | } | ||
1041 | |||
1042 | // setGoAwayReason sets the value of t.goAwayReason based | ||
1043 | // on the GoAway frame received. | ||
1044 | // It expects a lock on transport's mutext to be held by | ||
1045 | // the caller. | ||
1046 | func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) { | ||
1047 | t.goAwayReason = NoReason | ||
1048 | switch f.ErrCode { | ||
1049 | case http2.ErrCodeEnhanceYourCalm: | ||
1050 | if string(f.DebugData()) == "too_many_pings" { | ||
1051 | t.goAwayReason = TooManyPings | ||
1052 | } | ||
1053 | } | ||
1054 | } | ||
1055 | |||
1056 | func (t *http2Client) GetGoAwayReason() GoAwayReason { | ||
1057 | t.mu.Lock() | ||
1058 | defer t.mu.Unlock() | ||
1059 | return t.goAwayReason | ||
1060 | } | ||
1061 | |||
1062 | func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) { | ||
1063 | id := f.Header().StreamID | ||
1064 | incr := f.Increment | ||
1065 | if id == 0 { | ||
1066 | t.sendQuotaPool.add(int(incr)) | ||
1067 | return | ||
1068 | } | ||
1069 | if s, ok := t.getStream(f); ok { | ||
1070 | s.sendQuotaPool.add(int(incr)) | ||
1071 | } | ||
1072 | } | ||
1073 | |||
1074 | // operateHeaders takes action on the decoded headers. | ||
1075 | func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { | ||
1076 | s, ok := t.getStream(frame) | ||
1077 | if !ok { | ||
1078 | return | ||
1079 | } | ||
1080 | s.mu.Lock() | ||
1081 | s.bytesReceived = true | ||
1082 | s.mu.Unlock() | ||
1083 | var state decodeState | ||
1084 | if err := state.decodeResponseHeader(frame); err != nil { | ||
1085 | s.mu.Lock() | ||
1086 | if !s.headerDone { | ||
1087 | close(s.headerChan) | ||
1088 | s.headerDone = true | ||
1089 | } | ||
1090 | s.mu.Unlock() | ||
1091 | s.write(recvMsg{err: err}) | ||
1092 | // Something wrong. Stops reading even when there is remaining. | ||
1093 | return | ||
1094 | } | ||
1095 | |||
1096 | endStream := frame.StreamEnded() | ||
1097 | var isHeader bool | ||
1098 | defer func() { | ||
1099 | if t.statsHandler != nil { | ||
1100 | if isHeader { | ||
1101 | inHeader := &stats.InHeader{ | ||
1102 | Client: true, | ||
1103 | WireLength: int(frame.Header().Length), | ||
1104 | } | ||
1105 | t.statsHandler.HandleRPC(s.ctx, inHeader) | ||
1106 | } else { | ||
1107 | inTrailer := &stats.InTrailer{ | ||
1108 | Client: true, | ||
1109 | WireLength: int(frame.Header().Length), | ||
1110 | } | ||
1111 | t.statsHandler.HandleRPC(s.ctx, inTrailer) | ||
1112 | } | ||
1113 | } | ||
1114 | }() | ||
1115 | |||
1116 | s.mu.Lock() | ||
1117 | if !endStream { | ||
1118 | s.recvCompress = state.encoding | ||
1119 | } | ||
1120 | if !s.headerDone { | ||
1121 | if !endStream && len(state.mdata) > 0 { | ||
1122 | s.header = state.mdata | ||
1123 | } | ||
1124 | close(s.headerChan) | ||
1125 | s.headerDone = true | ||
1126 | isHeader = true | ||
1127 | } | ||
1128 | if !endStream || s.state == streamDone { | ||
1129 | s.mu.Unlock() | ||
1130 | return | ||
1131 | } | ||
1132 | |||
1133 | if len(state.mdata) > 0 { | ||
1134 | s.trailer = state.mdata | ||
1135 | } | ||
1136 | s.finish(state.status()) | ||
1137 | s.mu.Unlock() | ||
1138 | s.write(recvMsg{err: io.EOF}) | ||
1139 | } | ||
1140 | |||
1141 | func handleMalformedHTTP2(s *Stream, err error) { | ||
1142 | s.mu.Lock() | ||
1143 | if !s.headerDone { | ||
1144 | close(s.headerChan) | ||
1145 | s.headerDone = true | ||
1146 | } | ||
1147 | s.mu.Unlock() | ||
1148 | s.write(recvMsg{err: err}) | ||
1149 | } | ||
1150 | |||
1151 | // reader runs as a separate goroutine in charge of reading data from network | ||
1152 | // connection. | ||
1153 | // | ||
1154 | // TODO(zhaoq): currently one reader per transport. Investigate whether this is | ||
1155 | // optimal. | ||
1156 | // TODO(zhaoq): Check the validity of the incoming frame sequence. | ||
1157 | func (t *http2Client) reader() { | ||
1158 | // Check the validity of server preface. | ||
1159 | frame, err := t.framer.readFrame() | ||
1160 | if err != nil { | ||
1161 | t.notifyError(err) | ||
1162 | return | ||
1163 | } | ||
1164 | atomic.CompareAndSwapUint32(&t.activity, 0, 1) | ||
1165 | sf, ok := frame.(*http2.SettingsFrame) | ||
1166 | if !ok { | ||
1167 | t.notifyError(err) | ||
1168 | return | ||
1169 | } | ||
1170 | t.handleSettings(sf) | ||
1171 | |||
1172 | // loop to keep reading incoming messages on this transport. | ||
1173 | for { | ||
1174 | frame, err := t.framer.readFrame() | ||
1175 | atomic.CompareAndSwapUint32(&t.activity, 0, 1) | ||
1176 | if err != nil { | ||
1177 | // Abort an active stream if the http2.Framer returns a | ||
1178 | // http2.StreamError. This can happen only if the server's response | ||
1179 | // is malformed http2. | ||
1180 | if se, ok := err.(http2.StreamError); ok { | ||
1181 | t.mu.Lock() | ||
1182 | s := t.activeStreams[se.StreamID] | ||
1183 | t.mu.Unlock() | ||
1184 | if s != nil { | ||
1185 | // use error detail to provide better err message | ||
1186 | handleMalformedHTTP2(s, streamErrorf(http2ErrConvTab[se.Code], "%v", t.framer.errorDetail())) | ||
1187 | } | ||
1188 | continue | ||
1189 | } else { | ||
1190 | // Transport error. | ||
1191 | t.notifyError(err) | ||
1192 | return | ||
1193 | } | ||
1194 | } | ||
1195 | switch frame := frame.(type) { | ||
1196 | case *http2.MetaHeadersFrame: | ||
1197 | t.operateHeaders(frame) | ||
1198 | case *http2.DataFrame: | ||
1199 | t.handleData(frame) | ||
1200 | case *http2.RSTStreamFrame: | ||
1201 | t.handleRSTStream(frame) | ||
1202 | case *http2.SettingsFrame: | ||
1203 | t.handleSettings(frame) | ||
1204 | case *http2.PingFrame: | ||
1205 | t.handlePing(frame) | ||
1206 | case *http2.GoAwayFrame: | ||
1207 | t.handleGoAway(frame) | ||
1208 | case *http2.WindowUpdateFrame: | ||
1209 | t.handleWindowUpdate(frame) | ||
1210 | default: | ||
1211 | errorf("transport: http2Client.reader got unhandled frame type %v.", frame) | ||
1212 | } | ||
1213 | } | ||
1214 | } | ||
1215 | |||
1216 | func (t *http2Client) applySettings(ss []http2.Setting) { | ||
1217 | for _, s := range ss { | ||
1218 | switch s.ID { | ||
1219 | case http2.SettingMaxConcurrentStreams: | ||
1220 | // TODO(zhaoq): This is a hack to avoid significant refactoring of the | ||
1221 | // code to deal with the unrealistic int32 overflow. Probably will try | ||
1222 | // to find a better way to handle this later. | ||
1223 | if s.Val > math.MaxInt32 { | ||
1224 | s.Val = math.MaxInt32 | ||
1225 | } | ||
1226 | t.mu.Lock() | ||
1227 | ms := t.maxStreams | ||
1228 | t.maxStreams = int(s.Val) | ||
1229 | t.mu.Unlock() | ||
1230 | t.streamsQuota.add(int(s.Val) - ms) | ||
1231 | case http2.SettingInitialWindowSize: | ||
1232 | t.mu.Lock() | ||
1233 | for _, stream := range t.activeStreams { | ||
1234 | // Adjust the sending quota for each stream. | ||
1235 | stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota)) | ||
1236 | } | ||
1237 | t.streamSendQuota = s.Val | ||
1238 | t.mu.Unlock() | ||
1239 | atomic.AddUint32(&t.outQuotaVersion, 1) | ||
1240 | } | ||
1241 | } | ||
1242 | } | ||
1243 | |||
1244 | // controller running in a separate goroutine takes charge of sending control | ||
1245 | // frames (e.g., window update, reset stream, setting, etc.) to the server. | ||
1246 | func (t *http2Client) controller() { | ||
1247 | for { | ||
1248 | select { | ||
1249 | case i := <-t.controlBuf.get(): | ||
1250 | t.controlBuf.load() | ||
1251 | select { | ||
1252 | case <-t.writableChan: | ||
1253 | switch i := i.(type) { | ||
1254 | case *windowUpdate: | ||
1255 | t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment) | ||
1256 | case *settings: | ||
1257 | if i.ack { | ||
1258 | t.framer.writeSettingsAck(true) | ||
1259 | t.applySettings(i.ss) | ||
1260 | } else { | ||
1261 | t.framer.writeSettings(true, i.ss...) | ||
1262 | } | ||
1263 | case *resetStream: | ||
1264 | // If the server needs to be to intimated about stream closing, | ||
1265 | // then we need to make sure the RST_STREAM frame is written to | ||
1266 | // the wire before the headers of the next stream waiting on | ||
1267 | // streamQuota. We ensure this by adding to the streamsQuota pool | ||
1268 | // only after having acquired the writableChan to send RST_STREAM. | ||
1269 | t.streamsQuota.add(1) | ||
1270 | t.framer.writeRSTStream(true, i.streamID, i.code) | ||
1271 | case *flushIO: | ||
1272 | t.framer.flushWrite() | ||
1273 | case *ping: | ||
1274 | if !i.ack { | ||
1275 | t.bdpEst.timesnap(i.data) | ||
1276 | } | ||
1277 | t.framer.writePing(true, i.ack, i.data) | ||
1278 | default: | ||
1279 | errorf("transport: http2Client.controller got unexpected item type %v\n", i) | ||
1280 | } | ||
1281 | t.writableChan <- 0 | ||
1282 | continue | ||
1283 | case <-t.shutdownChan: | ||
1284 | return | ||
1285 | } | ||
1286 | case <-t.shutdownChan: | ||
1287 | return | ||
1288 | } | ||
1289 | } | ||
1290 | } | ||
1291 | |||
1292 | // keepalive running in a separate goroutune makes sure the connection is alive by sending pings. | ||
1293 | func (t *http2Client) keepalive() { | ||
1294 | p := &ping{data: [8]byte{}} | ||
1295 | timer := time.NewTimer(t.kp.Time) | ||
1296 | for { | ||
1297 | select { | ||
1298 | case <-timer.C: | ||
1299 | if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { | ||
1300 | timer.Reset(t.kp.Time) | ||
1301 | continue | ||
1302 | } | ||
1303 | // Check if keepalive should go dormant. | ||
1304 | t.mu.Lock() | ||
1305 | if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream { | ||
1306 | // Make awakenKeepalive writable. | ||
1307 | <-t.awakenKeepalive | ||
1308 | t.mu.Unlock() | ||
1309 | select { | ||
1310 | case <-t.awakenKeepalive: | ||
1311 | // If the control gets here a ping has been sent | ||
1312 | // need to reset the timer with keepalive.Timeout. | ||
1313 | case <-t.shutdownChan: | ||
1314 | return | ||
1315 | } | ||
1316 | } else { | ||
1317 | t.mu.Unlock() | ||
1318 | // Send ping. | ||
1319 | t.controlBuf.put(p) | ||
1320 | } | ||
1321 | |||
1322 | // By the time control gets here a ping has been sent one way or the other. | ||
1323 | timer.Reset(t.kp.Timeout) | ||
1324 | select { | ||
1325 | case <-timer.C: | ||
1326 | if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { | ||
1327 | timer.Reset(t.kp.Time) | ||
1328 | continue | ||
1329 | } | ||
1330 | t.Close() | ||
1331 | return | ||
1332 | case <-t.shutdownChan: | ||
1333 | if !timer.Stop() { | ||
1334 | <-timer.C | ||
1335 | } | ||
1336 | return | ||
1337 | } | ||
1338 | case <-t.shutdownChan: | ||
1339 | if !timer.Stop() { | ||
1340 | <-timer.C | ||
1341 | } | ||
1342 | return | ||
1343 | } | ||
1344 | } | ||
1345 | } | ||
1346 | |||
1347 | func (t *http2Client) Error() <-chan struct{} { | ||
1348 | return t.errorChan | ||
1349 | } | ||
1350 | |||
1351 | func (t *http2Client) GoAway() <-chan struct{} { | ||
1352 | return t.goAway | ||
1353 | } | ||
1354 | |||
1355 | func (t *http2Client) notifyError(err error) { | ||
1356 | t.mu.Lock() | ||
1357 | // make sure t.errorChan is closed only once. | ||
1358 | if t.state == draining { | ||
1359 | t.mu.Unlock() | ||
1360 | t.Close() | ||
1361 | return | ||
1362 | } | ||
1363 | if t.state == reachable { | ||
1364 | t.state = unreachable | ||
1365 | close(t.errorChan) | ||
1366 | infof("transport: http2Client.notifyError got notified that the client transport was broken %v.", err) | ||
1367 | } | ||
1368 | t.mu.Unlock() | ||
1369 | } | ||
diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go new file mode 100644 index 0000000..b6f93e3 --- /dev/null +++ b/vendor/google.golang.org/grpc/transport/http2_server.go | |||
@@ -0,0 +1,1195 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2014 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | package transport | ||
20 | |||
21 | import ( | ||
22 | "bytes" | ||
23 | "errors" | ||
24 | "io" | ||
25 | "math" | ||
26 | "math/rand" | ||
27 | "net" | ||
28 | "strconv" | ||
29 | "sync" | ||
30 | "sync/atomic" | ||
31 | "time" | ||
32 | |||
33 | "github.com/golang/protobuf/proto" | ||
34 | "golang.org/x/net/context" | ||
35 | "golang.org/x/net/http2" | ||
36 | "golang.org/x/net/http2/hpack" | ||
37 | "google.golang.org/grpc/codes" | ||
38 | "google.golang.org/grpc/credentials" | ||
39 | "google.golang.org/grpc/keepalive" | ||
40 | "google.golang.org/grpc/metadata" | ||
41 | "google.golang.org/grpc/peer" | ||
42 | "google.golang.org/grpc/stats" | ||
43 | "google.golang.org/grpc/status" | ||
44 | "google.golang.org/grpc/tap" | ||
45 | ) | ||
46 | |||
47 | // ErrIllegalHeaderWrite indicates that setting header is illegal because of | ||
48 | // the stream's state. | ||
49 | var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called") | ||
50 | |||
51 | // http2Server implements the ServerTransport interface with HTTP2. | ||
52 | type http2Server struct { | ||
53 | ctx context.Context | ||
54 | conn net.Conn | ||
55 | remoteAddr net.Addr | ||
56 | localAddr net.Addr | ||
57 | maxStreamID uint32 // max stream ID ever seen | ||
58 | authInfo credentials.AuthInfo // auth info about the connection | ||
59 | inTapHandle tap.ServerInHandle | ||
60 | // writableChan synchronizes write access to the transport. | ||
61 | // A writer acquires the write lock by receiving a value on writableChan | ||
62 | // and releases it by sending on writableChan. | ||
63 | writableChan chan int | ||
64 | // shutdownChan is closed when Close is called. | ||
65 | // Blocking operations should select on shutdownChan to avoid | ||
66 | // blocking forever after Close. | ||
67 | shutdownChan chan struct{} | ||
68 | framer *framer | ||
69 | hBuf *bytes.Buffer // the buffer for HPACK encoding | ||
70 | hEnc *hpack.Encoder // HPACK encoder | ||
71 | // The max number of concurrent streams. | ||
72 | maxStreams uint32 | ||
73 | // controlBuf delivers all the control related tasks (e.g., window | ||
74 | // updates, reset streams, and various settings) to the controller. | ||
75 | controlBuf *controlBuffer | ||
76 | fc *inFlow | ||
77 | // sendQuotaPool provides flow control to outbound message. | ||
78 | sendQuotaPool *quotaPool | ||
79 | stats stats.Handler | ||
80 | // Flag to keep track of reading activity on transport. | ||
81 | // 1 is true and 0 is false. | ||
82 | activity uint32 // Accessed atomically. | ||
83 | // Keepalive and max-age parameters for the server. | ||
84 | kp keepalive.ServerParameters | ||
85 | |||
86 | // Keepalive enforcement policy. | ||
87 | kep keepalive.EnforcementPolicy | ||
88 | // The time instance last ping was received. | ||
89 | lastPingAt time.Time | ||
90 | // Number of times the client has violated keepalive ping policy so far. | ||
91 | pingStrikes uint8 | ||
92 | // Flag to signify that number of ping strikes should be reset to 0. | ||
93 | // This is set whenever data or header frames are sent. | ||
94 | // 1 means yes. | ||
95 | resetPingStrikes uint32 // Accessed atomically. | ||
96 | initialWindowSize int32 | ||
97 | bdpEst *bdpEstimator | ||
98 | |||
99 | outQuotaVersion uint32 | ||
100 | |||
101 | mu sync.Mutex // guard the following | ||
102 | |||
103 | // drainChan is initialized when drain(...) is called the first time. | ||
104 | // After which the server writes out the first GoAway(with ID 2^31-1) frame. | ||
105 | // Then an independent goroutine will be launched to later send the second GoAway. | ||
106 | // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame. | ||
107 | // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is | ||
108 | // already underway. | ||
109 | drainChan chan struct{} | ||
110 | state transportState | ||
111 | activeStreams map[uint32]*Stream | ||
112 | // the per-stream outbound flow control window size set by the peer. | ||
113 | streamSendQuota uint32 | ||
114 | // idle is the time instant when the connection went idle. | ||
115 | // This is either the begining of the connection or when the number of | ||
116 | // RPCs go down to 0. | ||
117 | // When the connection is busy, this value is set to 0. | ||
118 | idle time.Time | ||
119 | } | ||
120 | |||
121 | // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is | ||
122 | // returned if something goes wrong. | ||
123 | func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) { | ||
124 | framer := newFramer(conn) | ||
125 | // Send initial settings as connection preface to client. | ||
126 | var isettings []http2.Setting | ||
127 | // TODO(zhaoq): Have a better way to signal "no limit" because 0 is | ||
128 | // permitted in the HTTP2 spec. | ||
129 | maxStreams := config.MaxStreams | ||
130 | if maxStreams == 0 { | ||
131 | maxStreams = math.MaxUint32 | ||
132 | } else { | ||
133 | isettings = append(isettings, http2.Setting{ | ||
134 | ID: http2.SettingMaxConcurrentStreams, | ||
135 | Val: maxStreams, | ||
136 | }) | ||
137 | } | ||
138 | dynamicWindow := true | ||
139 | iwz := int32(initialWindowSize) | ||
140 | if config.InitialWindowSize >= defaultWindowSize { | ||
141 | iwz = config.InitialWindowSize | ||
142 | dynamicWindow = false | ||
143 | } | ||
144 | icwz := int32(initialWindowSize) | ||
145 | if config.InitialConnWindowSize >= defaultWindowSize { | ||
146 | icwz = config.InitialConnWindowSize | ||
147 | dynamicWindow = false | ||
148 | } | ||
149 | if iwz != defaultWindowSize { | ||
150 | isettings = append(isettings, http2.Setting{ | ||
151 | ID: http2.SettingInitialWindowSize, | ||
152 | Val: uint32(iwz)}) | ||
153 | } | ||
154 | if err := framer.writeSettings(true, isettings...); err != nil { | ||
155 | return nil, connectionErrorf(true, err, "transport: %v", err) | ||
156 | } | ||
157 | // Adjust the connection flow control window if needed. | ||
158 | if delta := uint32(icwz - defaultWindowSize); delta > 0 { | ||
159 | if err := framer.writeWindowUpdate(true, 0, delta); err != nil { | ||
160 | return nil, connectionErrorf(true, err, "transport: %v", err) | ||
161 | } | ||
162 | } | ||
163 | kp := config.KeepaliveParams | ||
164 | if kp.MaxConnectionIdle == 0 { | ||
165 | kp.MaxConnectionIdle = defaultMaxConnectionIdle | ||
166 | } | ||
167 | if kp.MaxConnectionAge == 0 { | ||
168 | kp.MaxConnectionAge = defaultMaxConnectionAge | ||
169 | } | ||
170 | // Add a jitter to MaxConnectionAge. | ||
171 | kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge) | ||
172 | if kp.MaxConnectionAgeGrace == 0 { | ||
173 | kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace | ||
174 | } | ||
175 | if kp.Time == 0 { | ||
176 | kp.Time = defaultServerKeepaliveTime | ||
177 | } | ||
178 | if kp.Timeout == 0 { | ||
179 | kp.Timeout = defaultServerKeepaliveTimeout | ||
180 | } | ||
181 | kep := config.KeepalivePolicy | ||
182 | if kep.MinTime == 0 { | ||
183 | kep.MinTime = defaultKeepalivePolicyMinTime | ||
184 | } | ||
185 | var buf bytes.Buffer | ||
186 | t := &http2Server{ | ||
187 | ctx: context.Background(), | ||
188 | conn: conn, | ||
189 | remoteAddr: conn.RemoteAddr(), | ||
190 | localAddr: conn.LocalAddr(), | ||
191 | authInfo: config.AuthInfo, | ||
192 | framer: framer, | ||
193 | hBuf: &buf, | ||
194 | hEnc: hpack.NewEncoder(&buf), | ||
195 | maxStreams: maxStreams, | ||
196 | inTapHandle: config.InTapHandle, | ||
197 | controlBuf: newControlBuffer(), | ||
198 | fc: &inFlow{limit: uint32(icwz)}, | ||
199 | sendQuotaPool: newQuotaPool(defaultWindowSize), | ||
200 | state: reachable, | ||
201 | writableChan: make(chan int, 1), | ||
202 | shutdownChan: make(chan struct{}), | ||
203 | activeStreams: make(map[uint32]*Stream), | ||
204 | streamSendQuota: defaultWindowSize, | ||
205 | stats: config.StatsHandler, | ||
206 | kp: kp, | ||
207 | idle: time.Now(), | ||
208 | kep: kep, | ||
209 | initialWindowSize: iwz, | ||
210 | } | ||
211 | if dynamicWindow { | ||
212 | t.bdpEst = &bdpEstimator{ | ||
213 | bdp: initialWindowSize, | ||
214 | updateFlowControl: t.updateFlowControl, | ||
215 | } | ||
216 | } | ||
217 | if t.stats != nil { | ||
218 | t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{ | ||
219 | RemoteAddr: t.remoteAddr, | ||
220 | LocalAddr: t.localAddr, | ||
221 | }) | ||
222 | connBegin := &stats.ConnBegin{} | ||
223 | t.stats.HandleConn(t.ctx, connBegin) | ||
224 | } | ||
225 | go t.controller() | ||
226 | go t.keepalive() | ||
227 | t.writableChan <- 0 | ||
228 | return t, nil | ||
229 | } | ||
230 | |||
231 | // operateHeader takes action on the decoded headers. | ||
232 | func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) { | ||
233 | buf := newRecvBuffer() | ||
234 | s := &Stream{ | ||
235 | id: frame.Header().StreamID, | ||
236 | st: t, | ||
237 | buf: buf, | ||
238 | fc: &inFlow{limit: uint32(t.initialWindowSize)}, | ||
239 | } | ||
240 | |||
241 | var state decodeState | ||
242 | for _, hf := range frame.Fields { | ||
243 | if err := state.processHeaderField(hf); err != nil { | ||
244 | if se, ok := err.(StreamError); ok { | ||
245 | t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]}) | ||
246 | } | ||
247 | return | ||
248 | } | ||
249 | } | ||
250 | |||
251 | if frame.StreamEnded() { | ||
252 | // s is just created by the caller. No lock needed. | ||
253 | s.state = streamReadDone | ||
254 | } | ||
255 | s.recvCompress = state.encoding | ||
256 | if state.timeoutSet { | ||
257 | s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout) | ||
258 | } else { | ||
259 | s.ctx, s.cancel = context.WithCancel(t.ctx) | ||
260 | } | ||
261 | pr := &peer.Peer{ | ||
262 | Addr: t.remoteAddr, | ||
263 | } | ||
264 | // Attach Auth info if there is any. | ||
265 | if t.authInfo != nil { | ||
266 | pr.AuthInfo = t.authInfo | ||
267 | } | ||
268 | s.ctx = peer.NewContext(s.ctx, pr) | ||
269 | // Cache the current stream to the context so that the server application | ||
270 | // can find out. Required when the server wants to send some metadata | ||
271 | // back to the client (unary call only). | ||
272 | s.ctx = newContextWithStream(s.ctx, s) | ||
273 | // Attach the received metadata to the context. | ||
274 | if len(state.mdata) > 0 { | ||
275 | s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata) | ||
276 | } | ||
277 | s.trReader = &transportReader{ | ||
278 | reader: &recvBufferReader{ | ||
279 | ctx: s.ctx, | ||
280 | recv: s.buf, | ||
281 | }, | ||
282 | windowHandler: func(n int) { | ||
283 | t.updateWindow(s, uint32(n)) | ||
284 | }, | ||
285 | } | ||
286 | s.recvCompress = state.encoding | ||
287 | s.method = state.method | ||
288 | if t.inTapHandle != nil { | ||
289 | var err error | ||
290 | info := &tap.Info{ | ||
291 | FullMethodName: state.method, | ||
292 | } | ||
293 | s.ctx, err = t.inTapHandle(s.ctx, info) | ||
294 | if err != nil { | ||
295 | warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err) | ||
296 | t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream}) | ||
297 | return | ||
298 | } | ||
299 | } | ||
300 | t.mu.Lock() | ||
301 | if t.state != reachable { | ||
302 | t.mu.Unlock() | ||
303 | return | ||
304 | } | ||
305 | if uint32(len(t.activeStreams)) >= t.maxStreams { | ||
306 | t.mu.Unlock() | ||
307 | t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream}) | ||
308 | return | ||
309 | } | ||
310 | if s.id%2 != 1 || s.id <= t.maxStreamID { | ||
311 | t.mu.Unlock() | ||
312 | // illegal gRPC stream id. | ||
313 | errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", s.id) | ||
314 | return true | ||
315 | } | ||
316 | t.maxStreamID = s.id | ||
317 | s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota)) | ||
318 | t.activeStreams[s.id] = s | ||
319 | if len(t.activeStreams) == 1 { | ||
320 | t.idle = time.Time{} | ||
321 | } | ||
322 | t.mu.Unlock() | ||
323 | s.requestRead = func(n int) { | ||
324 | t.adjustWindow(s, uint32(n)) | ||
325 | } | ||
326 | s.ctx = traceCtx(s.ctx, s.method) | ||
327 | if t.stats != nil { | ||
328 | s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) | ||
329 | inHeader := &stats.InHeader{ | ||
330 | FullMethod: s.method, | ||
331 | RemoteAddr: t.remoteAddr, | ||
332 | LocalAddr: t.localAddr, | ||
333 | Compression: s.recvCompress, | ||
334 | WireLength: int(frame.Header().Length), | ||
335 | } | ||
336 | t.stats.HandleRPC(s.ctx, inHeader) | ||
337 | } | ||
338 | handle(s) | ||
339 | return | ||
340 | } | ||
341 | |||
342 | // HandleStreams receives incoming streams using the given handler. This is | ||
343 | // typically run in a separate goroutine. | ||
344 | // traceCtx attaches trace to ctx and returns the new context. | ||
345 | func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { | ||
346 | // Check the validity of client preface. | ||
347 | preface := make([]byte, len(clientPreface)) | ||
348 | if _, err := io.ReadFull(t.conn, preface); err != nil { | ||
349 | // Only log if it isn't a simple tcp accept check (ie: tcp balancer doing open/close socket) | ||
350 | if err != io.EOF { | ||
351 | errorf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err) | ||
352 | } | ||
353 | t.Close() | ||
354 | return | ||
355 | } | ||
356 | if !bytes.Equal(preface, clientPreface) { | ||
357 | errorf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface) | ||
358 | t.Close() | ||
359 | return | ||
360 | } | ||
361 | |||
362 | frame, err := t.framer.readFrame() | ||
363 | if err == io.EOF || err == io.ErrUnexpectedEOF { | ||
364 | t.Close() | ||
365 | return | ||
366 | } | ||
367 | if err != nil { | ||
368 | errorf("transport: http2Server.HandleStreams failed to read initial settings frame: %v", err) | ||
369 | t.Close() | ||
370 | return | ||
371 | } | ||
372 | atomic.StoreUint32(&t.activity, 1) | ||
373 | sf, ok := frame.(*http2.SettingsFrame) | ||
374 | if !ok { | ||
375 | errorf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame) | ||
376 | t.Close() | ||
377 | return | ||
378 | } | ||
379 | t.handleSettings(sf) | ||
380 | |||
381 | for { | ||
382 | frame, err := t.framer.readFrame() | ||
383 | atomic.StoreUint32(&t.activity, 1) | ||
384 | if err != nil { | ||
385 | if se, ok := err.(http2.StreamError); ok { | ||
386 | t.mu.Lock() | ||
387 | s := t.activeStreams[se.StreamID] | ||
388 | t.mu.Unlock() | ||
389 | if s != nil { | ||
390 | t.closeStream(s) | ||
391 | } | ||
392 | t.controlBuf.put(&resetStream{se.StreamID, se.Code}) | ||
393 | continue | ||
394 | } | ||
395 | if err == io.EOF || err == io.ErrUnexpectedEOF { | ||
396 | t.Close() | ||
397 | return | ||
398 | } | ||
399 | warningf("transport: http2Server.HandleStreams failed to read frame: %v", err) | ||
400 | t.Close() | ||
401 | return | ||
402 | } | ||
403 | switch frame := frame.(type) { | ||
404 | case *http2.MetaHeadersFrame: | ||
405 | if t.operateHeaders(frame, handle, traceCtx) { | ||
406 | t.Close() | ||
407 | break | ||
408 | } | ||
409 | case *http2.DataFrame: | ||
410 | t.handleData(frame) | ||
411 | case *http2.RSTStreamFrame: | ||
412 | t.handleRSTStream(frame) | ||
413 | case *http2.SettingsFrame: | ||
414 | t.handleSettings(frame) | ||
415 | case *http2.PingFrame: | ||
416 | t.handlePing(frame) | ||
417 | case *http2.WindowUpdateFrame: | ||
418 | t.handleWindowUpdate(frame) | ||
419 | case *http2.GoAwayFrame: | ||
420 | // TODO: Handle GoAway from the client appropriately. | ||
421 | default: | ||
422 | errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame) | ||
423 | } | ||
424 | } | ||
425 | } | ||
426 | |||
427 | func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) { | ||
428 | t.mu.Lock() | ||
429 | defer t.mu.Unlock() | ||
430 | if t.activeStreams == nil { | ||
431 | // The transport is closing. | ||
432 | return nil, false | ||
433 | } | ||
434 | s, ok := t.activeStreams[f.Header().StreamID] | ||
435 | if !ok { | ||
436 | // The stream is already done. | ||
437 | return nil, false | ||
438 | } | ||
439 | return s, true | ||
440 | } | ||
441 | |||
442 | // adjustWindow sends out extra window update over the initial window size | ||
443 | // of stream if the application is requesting data larger in size than | ||
444 | // the window. | ||
445 | func (t *http2Server) adjustWindow(s *Stream, n uint32) { | ||
446 | s.mu.Lock() | ||
447 | defer s.mu.Unlock() | ||
448 | if s.state == streamDone { | ||
449 | return | ||
450 | } | ||
451 | if w := s.fc.maybeAdjust(n); w > 0 { | ||
452 | if cw := t.fc.resetPendingUpdate(); cw > 0 { | ||
453 | t.controlBuf.put(&windowUpdate{0, cw, false}) | ||
454 | } | ||
455 | t.controlBuf.put(&windowUpdate{s.id, w, true}) | ||
456 | } | ||
457 | } | ||
458 | |||
459 | // updateWindow adjusts the inbound quota for the stream and the transport. | ||
460 | // Window updates will deliver to the controller for sending when | ||
461 | // the cumulative quota exceeds the corresponding threshold. | ||
462 | func (t *http2Server) updateWindow(s *Stream, n uint32) { | ||
463 | s.mu.Lock() | ||
464 | defer s.mu.Unlock() | ||
465 | if s.state == streamDone { | ||
466 | return | ||
467 | } | ||
468 | if w := s.fc.onRead(n); w > 0 { | ||
469 | if cw := t.fc.resetPendingUpdate(); cw > 0 { | ||
470 | t.controlBuf.put(&windowUpdate{0, cw, false}) | ||
471 | } | ||
472 | t.controlBuf.put(&windowUpdate{s.id, w, true}) | ||
473 | } | ||
474 | } | ||
475 | |||
476 | // updateFlowControl updates the incoming flow control windows | ||
477 | // for the transport and the stream based on the current bdp | ||
478 | // estimation. | ||
479 | func (t *http2Server) updateFlowControl(n uint32) { | ||
480 | t.mu.Lock() | ||
481 | for _, s := range t.activeStreams { | ||
482 | s.fc.newLimit(n) | ||
483 | } | ||
484 | t.initialWindowSize = int32(n) | ||
485 | t.mu.Unlock() | ||
486 | t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false}) | ||
487 | t.controlBuf.put(&settings{ | ||
488 | ack: false, | ||
489 | ss: []http2.Setting{ | ||
490 | { | ||
491 | ID: http2.SettingInitialWindowSize, | ||
492 | Val: uint32(n), | ||
493 | }, | ||
494 | }, | ||
495 | }) | ||
496 | |||
497 | } | ||
498 | |||
499 | func (t *http2Server) handleData(f *http2.DataFrame) { | ||
500 | size := f.Header().Length | ||
501 | var sendBDPPing bool | ||
502 | if t.bdpEst != nil { | ||
503 | sendBDPPing = t.bdpEst.add(uint32(size)) | ||
504 | } | ||
505 | // Decouple connection's flow control from application's read. | ||
506 | // An update on connection's flow control should not depend on | ||
507 | // whether user application has read the data or not. Such a | ||
508 | // restriction is already imposed on the stream's flow control, | ||
509 | // and therefore the sender will be blocked anyways. | ||
510 | // Decoupling the connection flow control will prevent other | ||
511 | // active(fast) streams from starving in presence of slow or | ||
512 | // inactive streams. | ||
513 | // | ||
514 | // Furthermore, if a bdpPing is being sent out we can piggyback | ||
515 | // connection's window update for the bytes we just received. | ||
516 | if sendBDPPing { | ||
517 | t.controlBuf.put(&windowUpdate{0, uint32(size), false}) | ||
518 | t.controlBuf.put(bdpPing) | ||
519 | } else { | ||
520 | if err := t.fc.onData(uint32(size)); err != nil { | ||
521 | errorf("transport: http2Server %v", err) | ||
522 | t.Close() | ||
523 | return | ||
524 | } | ||
525 | if w := t.fc.onRead(uint32(size)); w > 0 { | ||
526 | t.controlBuf.put(&windowUpdate{0, w, true}) | ||
527 | } | ||
528 | } | ||
529 | // Select the right stream to dispatch. | ||
530 | s, ok := t.getStream(f) | ||
531 | if !ok { | ||
532 | return | ||
533 | } | ||
534 | if size > 0 { | ||
535 | s.mu.Lock() | ||
536 | if s.state == streamDone { | ||
537 | s.mu.Unlock() | ||
538 | return | ||
539 | } | ||
540 | if err := s.fc.onData(uint32(size)); err != nil { | ||
541 | s.mu.Unlock() | ||
542 | t.closeStream(s) | ||
543 | t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl}) | ||
544 | return | ||
545 | } | ||
546 | if f.Header().Flags.Has(http2.FlagDataPadded) { | ||
547 | if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { | ||
548 | t.controlBuf.put(&windowUpdate{s.id, w, true}) | ||
549 | } | ||
550 | } | ||
551 | s.mu.Unlock() | ||
552 | // TODO(bradfitz, zhaoq): A copy is required here because there is no | ||
553 | // guarantee f.Data() is consumed before the arrival of next frame. | ||
554 | // Can this copy be eliminated? | ||
555 | if len(f.Data()) > 0 { | ||
556 | data := make([]byte, len(f.Data())) | ||
557 | copy(data, f.Data()) | ||
558 | s.write(recvMsg{data: data}) | ||
559 | } | ||
560 | } | ||
561 | if f.Header().Flags.Has(http2.FlagDataEndStream) { | ||
562 | // Received the end of stream from the client. | ||
563 | s.mu.Lock() | ||
564 | if s.state != streamDone { | ||
565 | s.state = streamReadDone | ||
566 | } | ||
567 | s.mu.Unlock() | ||
568 | s.write(recvMsg{err: io.EOF}) | ||
569 | } | ||
570 | } | ||
571 | |||
572 | func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) { | ||
573 | s, ok := t.getStream(f) | ||
574 | if !ok { | ||
575 | return | ||
576 | } | ||
577 | t.closeStream(s) | ||
578 | } | ||
579 | |||
580 | func (t *http2Server) handleSettings(f *http2.SettingsFrame) { | ||
581 | if f.IsAck() { | ||
582 | return | ||
583 | } | ||
584 | var ss []http2.Setting | ||
585 | f.ForeachSetting(func(s http2.Setting) error { | ||
586 | ss = append(ss, s) | ||
587 | return nil | ||
588 | }) | ||
589 | // The settings will be applied once the ack is sent. | ||
590 | t.controlBuf.put(&settings{ack: true, ss: ss}) | ||
591 | } | ||
592 | |||
593 | const ( | ||
594 | maxPingStrikes = 2 | ||
595 | defaultPingTimeout = 2 * time.Hour | ||
596 | ) | ||
597 | |||
598 | func (t *http2Server) handlePing(f *http2.PingFrame) { | ||
599 | if f.IsAck() { | ||
600 | if f.Data == goAwayPing.data && t.drainChan != nil { | ||
601 | close(t.drainChan) | ||
602 | return | ||
603 | } | ||
604 | // Maybe it's a BDP ping. | ||
605 | if t.bdpEst != nil { | ||
606 | t.bdpEst.calculate(f.Data) | ||
607 | } | ||
608 | return | ||
609 | } | ||
610 | pingAck := &ping{ack: true} | ||
611 | copy(pingAck.data[:], f.Data[:]) | ||
612 | t.controlBuf.put(pingAck) | ||
613 | |||
614 | now := time.Now() | ||
615 | defer func() { | ||
616 | t.lastPingAt = now | ||
617 | }() | ||
618 | // A reset ping strikes means that we don't need to check for policy | ||
619 | // violation for this ping and the pingStrikes counter should be set | ||
620 | // to 0. | ||
621 | if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) { | ||
622 | t.pingStrikes = 0 | ||
623 | return | ||
624 | } | ||
625 | t.mu.Lock() | ||
626 | ns := len(t.activeStreams) | ||
627 | t.mu.Unlock() | ||
628 | if ns < 1 && !t.kep.PermitWithoutStream { | ||
629 | // Keepalive shouldn't be active thus, this new ping should | ||
630 | // have come after atleast defaultPingTimeout. | ||
631 | if t.lastPingAt.Add(defaultPingTimeout).After(now) { | ||
632 | t.pingStrikes++ | ||
633 | } | ||
634 | } else { | ||
635 | // Check if keepalive policy is respected. | ||
636 | if t.lastPingAt.Add(t.kep.MinTime).After(now) { | ||
637 | t.pingStrikes++ | ||
638 | } | ||
639 | } | ||
640 | |||
641 | if t.pingStrikes > maxPingStrikes { | ||
642 | // Send goaway and close the connection. | ||
643 | t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true}) | ||
644 | } | ||
645 | } | ||
646 | |||
647 | func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) { | ||
648 | id := f.Header().StreamID | ||
649 | incr := f.Increment | ||
650 | if id == 0 { | ||
651 | t.sendQuotaPool.add(int(incr)) | ||
652 | return | ||
653 | } | ||
654 | if s, ok := t.getStream(f); ok { | ||
655 | s.sendQuotaPool.add(int(incr)) | ||
656 | } | ||
657 | } | ||
658 | |||
659 | func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) error { | ||
660 | first := true | ||
661 | endHeaders := false | ||
662 | var err error | ||
663 | defer func() { | ||
664 | if err == nil { | ||
665 | // Reset ping strikes when seding headers since that might cause the | ||
666 | // peer to send ping. | ||
667 | atomic.StoreUint32(&t.resetPingStrikes, 1) | ||
668 | } | ||
669 | }() | ||
670 | // Sends the headers in a single batch. | ||
671 | for !endHeaders { | ||
672 | size := t.hBuf.Len() | ||
673 | if size > http2MaxFrameLen { | ||
674 | size = http2MaxFrameLen | ||
675 | } else { | ||
676 | endHeaders = true | ||
677 | } | ||
678 | if first { | ||
679 | p := http2.HeadersFrameParam{ | ||
680 | StreamID: s.id, | ||
681 | BlockFragment: b.Next(size), | ||
682 | EndStream: endStream, | ||
683 | EndHeaders: endHeaders, | ||
684 | } | ||
685 | err = t.framer.writeHeaders(endHeaders, p) | ||
686 | first = false | ||
687 | } else { | ||
688 | err = t.framer.writeContinuation(endHeaders, s.id, endHeaders, b.Next(size)) | ||
689 | } | ||
690 | if err != nil { | ||
691 | t.Close() | ||
692 | return connectionErrorf(true, err, "transport: %v", err) | ||
693 | } | ||
694 | } | ||
695 | return nil | ||
696 | } | ||
697 | |||
698 | // WriteHeader sends the header metedata md back to the client. | ||
699 | func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { | ||
700 | s.mu.Lock() | ||
701 | if s.headerOk || s.state == streamDone { | ||
702 | s.mu.Unlock() | ||
703 | return ErrIllegalHeaderWrite | ||
704 | } | ||
705 | s.headerOk = true | ||
706 | if md.Len() > 0 { | ||
707 | if s.header.Len() > 0 { | ||
708 | s.header = metadata.Join(s.header, md) | ||
709 | } else { | ||
710 | s.header = md | ||
711 | } | ||
712 | } | ||
713 | md = s.header | ||
714 | s.mu.Unlock() | ||
715 | if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { | ||
716 | return err | ||
717 | } | ||
718 | t.hBuf.Reset() | ||
719 | t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) | ||
720 | t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) | ||
721 | if s.sendCompress != "" { | ||
722 | t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress}) | ||
723 | } | ||
724 | for k, vv := range md { | ||
725 | if isReservedHeader(k) { | ||
726 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. | ||
727 | continue | ||
728 | } | ||
729 | for _, v := range vv { | ||
730 | t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | ||
731 | } | ||
732 | } | ||
733 | bufLen := t.hBuf.Len() | ||
734 | if err := t.writeHeaders(s, t.hBuf, false); err != nil { | ||
735 | return err | ||
736 | } | ||
737 | if t.stats != nil { | ||
738 | outHeader := &stats.OutHeader{ | ||
739 | WireLength: bufLen, | ||
740 | } | ||
741 | t.stats.HandleRPC(s.Context(), outHeader) | ||
742 | } | ||
743 | t.writableChan <- 0 | ||
744 | return nil | ||
745 | } | ||
746 | |||
747 | // WriteStatus sends stream status to the client and terminates the stream. | ||
748 | // There is no further I/O operations being able to perform on this stream. | ||
749 | // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early | ||
750 | // OK is adopted. | ||
751 | func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { | ||
752 | var headersSent, hasHeader bool | ||
753 | s.mu.Lock() | ||
754 | if s.state == streamDone { | ||
755 | s.mu.Unlock() | ||
756 | return nil | ||
757 | } | ||
758 | if s.headerOk { | ||
759 | headersSent = true | ||
760 | } | ||
761 | if s.header.Len() > 0 { | ||
762 | hasHeader = true | ||
763 | } | ||
764 | s.mu.Unlock() | ||
765 | |||
766 | if !headersSent && hasHeader { | ||
767 | t.WriteHeader(s, nil) | ||
768 | headersSent = true | ||
769 | } | ||
770 | |||
771 | if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { | ||
772 | return err | ||
773 | } | ||
774 | t.hBuf.Reset() | ||
775 | if !headersSent { | ||
776 | t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) | ||
777 | t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) | ||
778 | } | ||
779 | t.hEnc.WriteField( | ||
780 | hpack.HeaderField{ | ||
781 | Name: "grpc-status", | ||
782 | Value: strconv.Itoa(int(st.Code())), | ||
783 | }) | ||
784 | t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())}) | ||
785 | |||
786 | if p := st.Proto(); p != nil && len(p.Details) > 0 { | ||
787 | stBytes, err := proto.Marshal(p) | ||
788 | if err != nil { | ||
789 | // TODO: return error instead, when callers are able to handle it. | ||
790 | panic(err) | ||
791 | } | ||
792 | |||
793 | t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)}) | ||
794 | } | ||
795 | |||
796 | // Attach the trailer metadata. | ||
797 | for k, vv := range s.trailer { | ||
798 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. | ||
799 | if isReservedHeader(k) { | ||
800 | continue | ||
801 | } | ||
802 | for _, v := range vv { | ||
803 | t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | ||
804 | } | ||
805 | } | ||
806 | bufLen := t.hBuf.Len() | ||
807 | if err := t.writeHeaders(s, t.hBuf, true); err != nil { | ||
808 | t.Close() | ||
809 | return err | ||
810 | } | ||
811 | if t.stats != nil { | ||
812 | outTrailer := &stats.OutTrailer{ | ||
813 | WireLength: bufLen, | ||
814 | } | ||
815 | t.stats.HandleRPC(s.Context(), outTrailer) | ||
816 | } | ||
817 | t.closeStream(s) | ||
818 | t.writableChan <- 0 | ||
819 | return nil | ||
820 | } | ||
821 | |||
822 | // Write converts the data into HTTP2 data frame and sends it out. Non-nil error | ||
823 | // is returns if it fails (e.g., framing error, transport error). | ||
824 | func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) { | ||
825 | // TODO(zhaoq): Support multi-writers for a single stream. | ||
826 | var writeHeaderFrame bool | ||
827 | s.mu.Lock() | ||
828 | if s.state == streamDone { | ||
829 | s.mu.Unlock() | ||
830 | return streamErrorf(codes.Unknown, "the stream has been done") | ||
831 | } | ||
832 | if !s.headerOk { | ||
833 | writeHeaderFrame = true | ||
834 | } | ||
835 | s.mu.Unlock() | ||
836 | if writeHeaderFrame { | ||
837 | t.WriteHeader(s, nil) | ||
838 | } | ||
839 | r := bytes.NewBuffer(data) | ||
840 | var ( | ||
841 | p []byte | ||
842 | oqv uint32 | ||
843 | ) | ||
844 | for { | ||
845 | if r.Len() == 0 && p == nil { | ||
846 | return nil | ||
847 | } | ||
848 | oqv = atomic.LoadUint32(&t.outQuotaVersion) | ||
849 | size := http2MaxFrameLen | ||
850 | // Wait until the stream has some quota to send the data. | ||
851 | sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire()) | ||
852 | if err != nil { | ||
853 | return err | ||
854 | } | ||
855 | // Wait until the transport has some quota to send the data. | ||
856 | tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire()) | ||
857 | if err != nil { | ||
858 | return err | ||
859 | } | ||
860 | if sq < size { | ||
861 | size = sq | ||
862 | } | ||
863 | if tq < size { | ||
864 | size = tq | ||
865 | } | ||
866 | if p == nil { | ||
867 | p = r.Next(size) | ||
868 | } | ||
869 | ps := len(p) | ||
870 | if ps < sq { | ||
871 | // Overbooked stream quota. Return it back. | ||
872 | s.sendQuotaPool.add(sq - ps) | ||
873 | } | ||
874 | if ps < tq { | ||
875 | // Overbooked transport quota. Return it back. | ||
876 | t.sendQuotaPool.add(tq - ps) | ||
877 | } | ||
878 | t.framer.adjustNumWriters(1) | ||
879 | // Got some quota. Try to acquire writing privilege on the | ||
880 | // transport. | ||
881 | if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { | ||
882 | if _, ok := err.(StreamError); ok { | ||
883 | // Return the connection quota back. | ||
884 | t.sendQuotaPool.add(ps) | ||
885 | } | ||
886 | if t.framer.adjustNumWriters(-1) == 0 { | ||
887 | // This writer is the last one in this batch and has the | ||
888 | // responsibility to flush the buffered frames. It queues | ||
889 | // a flush request to controlBuf instead of flushing directly | ||
890 | // in order to avoid the race with other writing or flushing. | ||
891 | t.controlBuf.put(&flushIO{}) | ||
892 | } | ||
893 | return err | ||
894 | } | ||
895 | select { | ||
896 | case <-s.ctx.Done(): | ||
897 | t.sendQuotaPool.add(ps) | ||
898 | if t.framer.adjustNumWriters(-1) == 0 { | ||
899 | t.controlBuf.put(&flushIO{}) | ||
900 | } | ||
901 | t.writableChan <- 0 | ||
902 | return ContextErr(s.ctx.Err()) | ||
903 | default: | ||
904 | } | ||
905 | if oqv != atomic.LoadUint32(&t.outQuotaVersion) { | ||
906 | // InitialWindowSize settings frame must have been received after we | ||
907 | // acquired send quota but before we got the writable channel. | ||
908 | // We must forsake this write. | ||
909 | t.sendQuotaPool.add(ps) | ||
910 | s.sendQuotaPool.add(ps) | ||
911 | if t.framer.adjustNumWriters(-1) == 0 { | ||
912 | t.controlBuf.put(&flushIO{}) | ||
913 | } | ||
914 | t.writableChan <- 0 | ||
915 | continue | ||
916 | } | ||
917 | var forceFlush bool | ||
918 | if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last { | ||
919 | forceFlush = true | ||
920 | } | ||
921 | // Reset ping strikes when sending data since this might cause | ||
922 | // the peer to send ping. | ||
923 | atomic.StoreUint32(&t.resetPingStrikes, 1) | ||
924 | if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil { | ||
925 | t.Close() | ||
926 | return connectionErrorf(true, err, "transport: %v", err) | ||
927 | } | ||
928 | p = nil | ||
929 | if t.framer.adjustNumWriters(-1) == 0 { | ||
930 | t.framer.flushWrite() | ||
931 | } | ||
932 | t.writableChan <- 0 | ||
933 | } | ||
934 | |||
935 | } | ||
936 | |||
937 | func (t *http2Server) applySettings(ss []http2.Setting) { | ||
938 | for _, s := range ss { | ||
939 | if s.ID == http2.SettingInitialWindowSize { | ||
940 | t.mu.Lock() | ||
941 | defer t.mu.Unlock() | ||
942 | for _, stream := range t.activeStreams { | ||
943 | stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota)) | ||
944 | } | ||
945 | t.streamSendQuota = s.Val | ||
946 | atomic.AddUint32(&t.outQuotaVersion, 1) | ||
947 | } | ||
948 | |||
949 | } | ||
950 | } | ||
951 | |||
952 | // keepalive running in a separate goroutine does the following: | ||
953 | // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle. | ||
954 | // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge. | ||
955 | // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge. | ||
956 | // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection | ||
957 | // after an additional duration of keepalive.Timeout. | ||
958 | func (t *http2Server) keepalive() { | ||
959 | p := &ping{} | ||
960 | var pingSent bool | ||
961 | maxIdle := time.NewTimer(t.kp.MaxConnectionIdle) | ||
962 | maxAge := time.NewTimer(t.kp.MaxConnectionAge) | ||
963 | keepalive := time.NewTimer(t.kp.Time) | ||
964 | // NOTE: All exit paths of this function should reset their | ||
965 | // respecitve timers. A failure to do so will cause the | ||
966 | // following clean-up to deadlock and eventually leak. | ||
967 | defer func() { | ||
968 | if !maxIdle.Stop() { | ||
969 | <-maxIdle.C | ||
970 | } | ||
971 | if !maxAge.Stop() { | ||
972 | <-maxAge.C | ||
973 | } | ||
974 | if !keepalive.Stop() { | ||
975 | <-keepalive.C | ||
976 | } | ||
977 | }() | ||
978 | for { | ||
979 | select { | ||
980 | case <-maxIdle.C: | ||
981 | t.mu.Lock() | ||
982 | idle := t.idle | ||
983 | if idle.IsZero() { // The connection is non-idle. | ||
984 | t.mu.Unlock() | ||
985 | maxIdle.Reset(t.kp.MaxConnectionIdle) | ||
986 | continue | ||
987 | } | ||
988 | val := t.kp.MaxConnectionIdle - time.Since(idle) | ||
989 | t.mu.Unlock() | ||
990 | if val <= 0 { | ||
991 | // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more. | ||
992 | // Gracefully close the connection. | ||
993 | t.drain(http2.ErrCodeNo, []byte{}) | ||
994 | // Reseting the timer so that the clean-up doesn't deadlock. | ||
995 | maxIdle.Reset(infinity) | ||
996 | return | ||
997 | } | ||
998 | maxIdle.Reset(val) | ||
999 | case <-maxAge.C: | ||
1000 | t.drain(http2.ErrCodeNo, []byte{}) | ||
1001 | maxAge.Reset(t.kp.MaxConnectionAgeGrace) | ||
1002 | select { | ||
1003 | case <-maxAge.C: | ||
1004 | // Close the connection after grace period. | ||
1005 | t.Close() | ||
1006 | // Reseting the timer so that the clean-up doesn't deadlock. | ||
1007 | maxAge.Reset(infinity) | ||
1008 | case <-t.shutdownChan: | ||
1009 | } | ||
1010 | return | ||
1011 | case <-keepalive.C: | ||
1012 | if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { | ||
1013 | pingSent = false | ||
1014 | keepalive.Reset(t.kp.Time) | ||
1015 | continue | ||
1016 | } | ||
1017 | if pingSent { | ||
1018 | t.Close() | ||
1019 | // Reseting the timer so that the clean-up doesn't deadlock. | ||
1020 | keepalive.Reset(infinity) | ||
1021 | return | ||
1022 | } | ||
1023 | pingSent = true | ||
1024 | t.controlBuf.put(p) | ||
1025 | keepalive.Reset(t.kp.Timeout) | ||
1026 | case <-t.shutdownChan: | ||
1027 | return | ||
1028 | } | ||
1029 | } | ||
1030 | } | ||
1031 | |||
1032 | var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}} | ||
1033 | |||
1034 | // controller running in a separate goroutine takes charge of sending control | ||
1035 | // frames (e.g., window update, reset stream, setting, etc.) to the server. | ||
1036 | func (t *http2Server) controller() { | ||
1037 | for { | ||
1038 | select { | ||
1039 | case i := <-t.controlBuf.get(): | ||
1040 | t.controlBuf.load() | ||
1041 | select { | ||
1042 | case <-t.writableChan: | ||
1043 | switch i := i.(type) { | ||
1044 | case *windowUpdate: | ||
1045 | t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment) | ||
1046 | case *settings: | ||
1047 | if i.ack { | ||
1048 | t.framer.writeSettingsAck(true) | ||
1049 | t.applySettings(i.ss) | ||
1050 | } else { | ||
1051 | t.framer.writeSettings(true, i.ss...) | ||
1052 | } | ||
1053 | case *resetStream: | ||
1054 | t.framer.writeRSTStream(true, i.streamID, i.code) | ||
1055 | case *goAway: | ||
1056 | t.mu.Lock() | ||
1057 | if t.state == closing { | ||
1058 | t.mu.Unlock() | ||
1059 | // The transport is closing. | ||
1060 | return | ||
1061 | } | ||
1062 | sid := t.maxStreamID | ||
1063 | if !i.headsUp { | ||
1064 | // Stop accepting more streams now. | ||
1065 | t.state = draining | ||
1066 | t.mu.Unlock() | ||
1067 | t.framer.writeGoAway(true, sid, i.code, i.debugData) | ||
1068 | if i.closeConn { | ||
1069 | // Abruptly close the connection following the GoAway. | ||
1070 | t.Close() | ||
1071 | } | ||
1072 | t.writableChan <- 0 | ||
1073 | continue | ||
1074 | } | ||
1075 | t.mu.Unlock() | ||
1076 | // For a graceful close, send out a GoAway with stream ID of MaxUInt32, | ||
1077 | // Follow that with a ping and wait for the ack to come back or a timer | ||
1078 | // to expire. During this time accept new streams since they might have | ||
1079 | // originated before the GoAway reaches the client. | ||
1080 | // After getting the ack or timer expiration send out another GoAway this | ||
1081 | // time with an ID of the max stream server intends to process. | ||
1082 | t.framer.writeGoAway(true, math.MaxUint32, http2.ErrCodeNo, []byte{}) | ||
1083 | t.framer.writePing(true, false, goAwayPing.data) | ||
1084 | go func() { | ||
1085 | timer := time.NewTimer(time.Minute) | ||
1086 | defer timer.Stop() | ||
1087 | select { | ||
1088 | case <-t.drainChan: | ||
1089 | case <-timer.C: | ||
1090 | case <-t.shutdownChan: | ||
1091 | return | ||
1092 | } | ||
1093 | t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData}) | ||
1094 | }() | ||
1095 | case *flushIO: | ||
1096 | t.framer.flushWrite() | ||
1097 | case *ping: | ||
1098 | if !i.ack { | ||
1099 | t.bdpEst.timesnap(i.data) | ||
1100 | } | ||
1101 | t.framer.writePing(true, i.ack, i.data) | ||
1102 | default: | ||
1103 | errorf("transport: http2Server.controller got unexpected item type %v\n", i) | ||
1104 | } | ||
1105 | t.writableChan <- 0 | ||
1106 | continue | ||
1107 | case <-t.shutdownChan: | ||
1108 | return | ||
1109 | } | ||
1110 | case <-t.shutdownChan: | ||
1111 | return | ||
1112 | } | ||
1113 | } | ||
1114 | } | ||
1115 | |||
1116 | // Close starts shutting down the http2Server transport. | ||
1117 | // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This | ||
1118 | // could cause some resource issue. Revisit this later. | ||
1119 | func (t *http2Server) Close() (err error) { | ||
1120 | t.mu.Lock() | ||
1121 | if t.state == closing { | ||
1122 | t.mu.Unlock() | ||
1123 | return errors.New("transport: Close() was already called") | ||
1124 | } | ||
1125 | t.state = closing | ||
1126 | streams := t.activeStreams | ||
1127 | t.activeStreams = nil | ||
1128 | t.mu.Unlock() | ||
1129 | close(t.shutdownChan) | ||
1130 | err = t.conn.Close() | ||
1131 | // Cancel all active streams. | ||
1132 | for _, s := range streams { | ||
1133 | s.cancel() | ||
1134 | } | ||
1135 | if t.stats != nil { | ||
1136 | connEnd := &stats.ConnEnd{} | ||
1137 | t.stats.HandleConn(t.ctx, connEnd) | ||
1138 | } | ||
1139 | return | ||
1140 | } | ||
1141 | |||
1142 | // closeStream clears the footprint of a stream when the stream is not needed | ||
1143 | // any more. | ||
1144 | func (t *http2Server) closeStream(s *Stream) { | ||
1145 | t.mu.Lock() | ||
1146 | delete(t.activeStreams, s.id) | ||
1147 | if len(t.activeStreams) == 0 { | ||
1148 | t.idle = time.Now() | ||
1149 | } | ||
1150 | if t.state == draining && len(t.activeStreams) == 0 { | ||
1151 | defer t.Close() | ||
1152 | } | ||
1153 | t.mu.Unlock() | ||
1154 | // In case stream sending and receiving are invoked in separate | ||
1155 | // goroutines (e.g., bi-directional streaming), cancel needs to be | ||
1156 | // called to interrupt the potential blocking on other goroutines. | ||
1157 | s.cancel() | ||
1158 | s.mu.Lock() | ||
1159 | if s.state == streamDone { | ||
1160 | s.mu.Unlock() | ||
1161 | return | ||
1162 | } | ||
1163 | s.state = streamDone | ||
1164 | s.mu.Unlock() | ||
1165 | } | ||
1166 | |||
1167 | func (t *http2Server) RemoteAddr() net.Addr { | ||
1168 | return t.remoteAddr | ||
1169 | } | ||
1170 | |||
1171 | func (t *http2Server) Drain() { | ||
1172 | t.drain(http2.ErrCodeNo, []byte{}) | ||
1173 | } | ||
1174 | |||
1175 | func (t *http2Server) drain(code http2.ErrCode, debugData []byte) { | ||
1176 | t.mu.Lock() | ||
1177 | defer t.mu.Unlock() | ||
1178 | if t.drainChan != nil { | ||
1179 | return | ||
1180 | } | ||
1181 | t.drainChan = make(chan struct{}) | ||
1182 | t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true}) | ||
1183 | } | ||
1184 | |||
1185 | var rgen = rand.New(rand.NewSource(time.Now().UnixNano())) | ||
1186 | |||
1187 | func getJitter(v time.Duration) time.Duration { | ||
1188 | if v == infinity { | ||
1189 | return 0 | ||
1190 | } | ||
1191 | // Generate a jitter between +/- 10% of the value. | ||
1192 | r := int64(v / 10) | ||
1193 | j := rgen.Int63n(2*r) - r | ||
1194 | return time.Duration(j) | ||
1195 | } | ||
diff --git a/vendor/google.golang.org/grpc/transport/http_util.go b/vendor/google.golang.org/grpc/transport/http_util.go new file mode 100644 index 0000000..685c6fb --- /dev/null +++ b/vendor/google.golang.org/grpc/transport/http_util.go | |||
@@ -0,0 +1,597 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2014 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | package transport | ||
20 | |||
21 | import ( | ||
22 | "bufio" | ||
23 | "bytes" | ||
24 | "encoding/base64" | ||
25 | "fmt" | ||
26 | "io" | ||
27 | "net" | ||
28 | "net/http" | ||
29 | "strconv" | ||
30 | "strings" | ||
31 | "sync/atomic" | ||
32 | "time" | ||
33 | |||
34 | "github.com/golang/protobuf/proto" | ||
35 | "golang.org/x/net/http2" | ||
36 | "golang.org/x/net/http2/hpack" | ||
37 | spb "google.golang.org/genproto/googleapis/rpc/status" | ||
38 | "google.golang.org/grpc/codes" | ||
39 | "google.golang.org/grpc/status" | ||
40 | ) | ||
41 | |||
42 | const ( | ||
43 | // http2MaxFrameLen specifies the max length of a HTTP2 frame. | ||
44 | http2MaxFrameLen = 16384 // 16KB frame | ||
45 | // http://http2.github.io/http2-spec/#SettingValues | ||
46 | http2InitHeaderTableSize = 4096 | ||
47 | // http2IOBufSize specifies the buffer size for sending frames. | ||
48 | http2IOBufSize = 32 * 1024 | ||
49 | ) | ||
50 | |||
51 | var ( | ||
52 | clientPreface = []byte(http2.ClientPreface) | ||
53 | http2ErrConvTab = map[http2.ErrCode]codes.Code{ | ||
54 | http2.ErrCodeNo: codes.Internal, | ||
55 | http2.ErrCodeProtocol: codes.Internal, | ||
56 | http2.ErrCodeInternal: codes.Internal, | ||
57 | http2.ErrCodeFlowControl: codes.ResourceExhausted, | ||
58 | http2.ErrCodeSettingsTimeout: codes.Internal, | ||
59 | http2.ErrCodeStreamClosed: codes.Internal, | ||
60 | http2.ErrCodeFrameSize: codes.Internal, | ||
61 | http2.ErrCodeRefusedStream: codes.Unavailable, | ||
62 | http2.ErrCodeCancel: codes.Canceled, | ||
63 | http2.ErrCodeCompression: codes.Internal, | ||
64 | http2.ErrCodeConnect: codes.Internal, | ||
65 | http2.ErrCodeEnhanceYourCalm: codes.ResourceExhausted, | ||
66 | http2.ErrCodeInadequateSecurity: codes.PermissionDenied, | ||
67 | http2.ErrCodeHTTP11Required: codes.FailedPrecondition, | ||
68 | } | ||
69 | statusCodeConvTab = map[codes.Code]http2.ErrCode{ | ||
70 | codes.Internal: http2.ErrCodeInternal, | ||
71 | codes.Canceled: http2.ErrCodeCancel, | ||
72 | codes.Unavailable: http2.ErrCodeRefusedStream, | ||
73 | codes.ResourceExhausted: http2.ErrCodeEnhanceYourCalm, | ||
74 | codes.PermissionDenied: http2.ErrCodeInadequateSecurity, | ||
75 | } | ||
76 | httpStatusConvTab = map[int]codes.Code{ | ||
77 | // 400 Bad Request - INTERNAL. | ||
78 | http.StatusBadRequest: codes.Internal, | ||
79 | // 401 Unauthorized - UNAUTHENTICATED. | ||
80 | http.StatusUnauthorized: codes.Unauthenticated, | ||
81 | // 403 Forbidden - PERMISSION_DENIED. | ||
82 | http.StatusForbidden: codes.PermissionDenied, | ||
83 | // 404 Not Found - UNIMPLEMENTED. | ||
84 | http.StatusNotFound: codes.Unimplemented, | ||
85 | // 429 Too Many Requests - UNAVAILABLE. | ||
86 | http.StatusTooManyRequests: codes.Unavailable, | ||
87 | // 502 Bad Gateway - UNAVAILABLE. | ||
88 | http.StatusBadGateway: codes.Unavailable, | ||
89 | // 503 Service Unavailable - UNAVAILABLE. | ||
90 | http.StatusServiceUnavailable: codes.Unavailable, | ||
91 | // 504 Gateway timeout - UNAVAILABLE. | ||
92 | http.StatusGatewayTimeout: codes.Unavailable, | ||
93 | } | ||
94 | ) | ||
95 | |||
96 | // Records the states during HPACK decoding. Must be reset once the | ||
97 | // decoding of the entire headers are finished. | ||
98 | type decodeState struct { | ||
99 | encoding string | ||
100 | // statusGen caches the stream status received from the trailer the server | ||
101 | // sent. Client side only. Do not access directly. After all trailers are | ||
102 | // parsed, use the status method to retrieve the status. | ||
103 | statusGen *status.Status | ||
104 | // rawStatusCode and rawStatusMsg are set from the raw trailer fields and are not | ||
105 | // intended for direct access outside of parsing. | ||
106 | rawStatusCode *int | ||
107 | rawStatusMsg string | ||
108 | httpStatus *int | ||
109 | // Server side only fields. | ||
110 | timeoutSet bool | ||
111 | timeout time.Duration | ||
112 | method string | ||
113 | // key-value metadata map from the peer. | ||
114 | mdata map[string][]string | ||
115 | } | ||
116 | |||
117 | // isReservedHeader checks whether hdr belongs to HTTP2 headers | ||
118 | // reserved by gRPC protocol. Any other headers are classified as the | ||
119 | // user-specified metadata. | ||
120 | func isReservedHeader(hdr string) bool { | ||
121 | if hdr != "" && hdr[0] == ':' { | ||
122 | return true | ||
123 | } | ||
124 | switch hdr { | ||
125 | case "content-type", | ||
126 | "grpc-message-type", | ||
127 | "grpc-encoding", | ||
128 | "grpc-message", | ||
129 | "grpc-status", | ||
130 | "grpc-timeout", | ||
131 | "grpc-status-details-bin", | ||
132 | "te": | ||
133 | return true | ||
134 | default: | ||
135 | return false | ||
136 | } | ||
137 | } | ||
138 | |||
139 | // isWhitelistedPseudoHeader checks whether hdr belongs to HTTP2 pseudoheaders | ||
140 | // that should be propagated into metadata visible to users. | ||
141 | func isWhitelistedPseudoHeader(hdr string) bool { | ||
142 | switch hdr { | ||
143 | case ":authority": | ||
144 | return true | ||
145 | default: | ||
146 | return false | ||
147 | } | ||
148 | } | ||
149 | |||
150 | func validContentType(t string) bool { | ||
151 | e := "application/grpc" | ||
152 | if !strings.HasPrefix(t, e) { | ||
153 | return false | ||
154 | } | ||
155 | // Support variations on the content-type | ||
156 | // (e.g. "application/grpc+blah", "application/grpc;blah"). | ||
157 | if len(t) > len(e) && t[len(e)] != '+' && t[len(e)] != ';' { | ||
158 | return false | ||
159 | } | ||
160 | return true | ||
161 | } | ||
162 | |||
163 | func (d *decodeState) status() *status.Status { | ||
164 | if d.statusGen == nil { | ||
165 | // No status-details were provided; generate status using code/msg. | ||
166 | d.statusGen = status.New(codes.Code(int32(*(d.rawStatusCode))), d.rawStatusMsg) | ||
167 | } | ||
168 | return d.statusGen | ||
169 | } | ||
170 | |||
171 | const binHdrSuffix = "-bin" | ||
172 | |||
173 | func encodeBinHeader(v []byte) string { | ||
174 | return base64.RawStdEncoding.EncodeToString(v) | ||
175 | } | ||
176 | |||
177 | func decodeBinHeader(v string) ([]byte, error) { | ||
178 | if len(v)%4 == 0 { | ||
179 | // Input was padded, or padding was not necessary. | ||
180 | return base64.StdEncoding.DecodeString(v) | ||
181 | } | ||
182 | return base64.RawStdEncoding.DecodeString(v) | ||
183 | } | ||
184 | |||
185 | func encodeMetadataHeader(k, v string) string { | ||
186 | if strings.HasSuffix(k, binHdrSuffix) { | ||
187 | return encodeBinHeader(([]byte)(v)) | ||
188 | } | ||
189 | return v | ||
190 | } | ||
191 | |||
192 | func decodeMetadataHeader(k, v string) (string, error) { | ||
193 | if strings.HasSuffix(k, binHdrSuffix) { | ||
194 | b, err := decodeBinHeader(v) | ||
195 | return string(b), err | ||
196 | } | ||
197 | return v, nil | ||
198 | } | ||
199 | |||
200 | func (d *decodeState) decodeResponseHeader(frame *http2.MetaHeadersFrame) error { | ||
201 | for _, hf := range frame.Fields { | ||
202 | if err := d.processHeaderField(hf); err != nil { | ||
203 | return err | ||
204 | } | ||
205 | } | ||
206 | |||
207 | // If grpc status exists, no need to check further. | ||
208 | if d.rawStatusCode != nil || d.statusGen != nil { | ||
209 | return nil | ||
210 | } | ||
211 | |||
212 | // If grpc status doesn't exist and http status doesn't exist, | ||
213 | // then it's a malformed header. | ||
214 | if d.httpStatus == nil { | ||
215 | return streamErrorf(codes.Internal, "malformed header: doesn't contain status(gRPC or HTTP)") | ||
216 | } | ||
217 | |||
218 | if *(d.httpStatus) != http.StatusOK { | ||
219 | code, ok := httpStatusConvTab[*(d.httpStatus)] | ||
220 | if !ok { | ||
221 | code = codes.Unknown | ||
222 | } | ||
223 | return streamErrorf(code, http.StatusText(*(d.httpStatus))) | ||
224 | } | ||
225 | |||
226 | // gRPC status doesn't exist and http status is OK. | ||
227 | // Set rawStatusCode to be unknown and return nil error. | ||
228 | // So that, if the stream has ended this Unknown status | ||
229 | // will be propogated to the user. | ||
230 | // Otherwise, it will be ignored. In which case, status from | ||
231 | // a later trailer, that has StreamEnded flag set, is propogated. | ||
232 | code := int(codes.Unknown) | ||
233 | d.rawStatusCode = &code | ||
234 | return nil | ||
235 | |||
236 | } | ||
237 | |||
238 | func (d *decodeState) processHeaderField(f hpack.HeaderField) error { | ||
239 | switch f.Name { | ||
240 | case "content-type": | ||
241 | if !validContentType(f.Value) { | ||
242 | return streamErrorf(codes.FailedPrecondition, "transport: received the unexpected content-type %q", f.Value) | ||
243 | } | ||
244 | case "grpc-encoding": | ||
245 | d.encoding = f.Value | ||
246 | case "grpc-status": | ||
247 | code, err := strconv.Atoi(f.Value) | ||
248 | if err != nil { | ||
249 | return streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err) | ||
250 | } | ||
251 | d.rawStatusCode = &code | ||
252 | case "grpc-message": | ||
253 | d.rawStatusMsg = decodeGrpcMessage(f.Value) | ||
254 | case "grpc-status-details-bin": | ||
255 | v, err := decodeBinHeader(f.Value) | ||
256 | if err != nil { | ||
257 | return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) | ||
258 | } | ||
259 | s := &spb.Status{} | ||
260 | if err := proto.Unmarshal(v, s); err != nil { | ||
261 | return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) | ||
262 | } | ||
263 | d.statusGen = status.FromProto(s) | ||
264 | case "grpc-timeout": | ||
265 | d.timeoutSet = true | ||
266 | var err error | ||
267 | if d.timeout, err = decodeTimeout(f.Value); err != nil { | ||
268 | return streamErrorf(codes.Internal, "transport: malformed time-out: %v", err) | ||
269 | } | ||
270 | case ":path": | ||
271 | d.method = f.Value | ||
272 | case ":status": | ||
273 | code, err := strconv.Atoi(f.Value) | ||
274 | if err != nil { | ||
275 | return streamErrorf(codes.Internal, "transport: malformed http-status: %v", err) | ||
276 | } | ||
277 | d.httpStatus = &code | ||
278 | default: | ||
279 | if !isReservedHeader(f.Name) || isWhitelistedPseudoHeader(f.Name) { | ||
280 | if d.mdata == nil { | ||
281 | d.mdata = make(map[string][]string) | ||
282 | } | ||
283 | v, err := decodeMetadataHeader(f.Name, f.Value) | ||
284 | if err != nil { | ||
285 | errorf("Failed to decode metadata header (%q, %q): %v", f.Name, f.Value, err) | ||
286 | return nil | ||
287 | } | ||
288 | d.mdata[f.Name] = append(d.mdata[f.Name], v) | ||
289 | } | ||
290 | } | ||
291 | return nil | ||
292 | } | ||
293 | |||
294 | type timeoutUnit uint8 | ||
295 | |||
296 | const ( | ||
297 | hour timeoutUnit = 'H' | ||
298 | minute timeoutUnit = 'M' | ||
299 | second timeoutUnit = 'S' | ||
300 | millisecond timeoutUnit = 'm' | ||
301 | microsecond timeoutUnit = 'u' | ||
302 | nanosecond timeoutUnit = 'n' | ||
303 | ) | ||
304 | |||
305 | func timeoutUnitToDuration(u timeoutUnit) (d time.Duration, ok bool) { | ||
306 | switch u { | ||
307 | case hour: | ||
308 | return time.Hour, true | ||
309 | case minute: | ||
310 | return time.Minute, true | ||
311 | case second: | ||
312 | return time.Second, true | ||
313 | case millisecond: | ||
314 | return time.Millisecond, true | ||
315 | case microsecond: | ||
316 | return time.Microsecond, true | ||
317 | case nanosecond: | ||
318 | return time.Nanosecond, true | ||
319 | default: | ||
320 | } | ||
321 | return | ||
322 | } | ||
323 | |||
324 | const maxTimeoutValue int64 = 100000000 - 1 | ||
325 | |||
326 | // div does integer division and round-up the result. Note that this is | ||
327 | // equivalent to (d+r-1)/r but has less chance to overflow. | ||
328 | func div(d, r time.Duration) int64 { | ||
329 | if m := d % r; m > 0 { | ||
330 | return int64(d/r + 1) | ||
331 | } | ||
332 | return int64(d / r) | ||
333 | } | ||
334 | |||
335 | // TODO(zhaoq): It is the simplistic and not bandwidth efficient. Improve it. | ||
336 | func encodeTimeout(t time.Duration) string { | ||
337 | if t <= 0 { | ||
338 | return "0n" | ||
339 | } | ||
340 | if d := div(t, time.Nanosecond); d <= maxTimeoutValue { | ||
341 | return strconv.FormatInt(d, 10) + "n" | ||
342 | } | ||
343 | if d := div(t, time.Microsecond); d <= maxTimeoutValue { | ||
344 | return strconv.FormatInt(d, 10) + "u" | ||
345 | } | ||
346 | if d := div(t, time.Millisecond); d <= maxTimeoutValue { | ||
347 | return strconv.FormatInt(d, 10) + "m" | ||
348 | } | ||
349 | if d := div(t, time.Second); d <= maxTimeoutValue { | ||
350 | return strconv.FormatInt(d, 10) + "S" | ||
351 | } | ||
352 | if d := div(t, time.Minute); d <= maxTimeoutValue { | ||
353 | return strconv.FormatInt(d, 10) + "M" | ||
354 | } | ||
355 | // Note that maxTimeoutValue * time.Hour > MaxInt64. | ||
356 | return strconv.FormatInt(div(t, time.Hour), 10) + "H" | ||
357 | } | ||
358 | |||
359 | func decodeTimeout(s string) (time.Duration, error) { | ||
360 | size := len(s) | ||
361 | if size < 2 { | ||
362 | return 0, fmt.Errorf("transport: timeout string is too short: %q", s) | ||
363 | } | ||
364 | unit := timeoutUnit(s[size-1]) | ||
365 | d, ok := timeoutUnitToDuration(unit) | ||
366 | if !ok { | ||
367 | return 0, fmt.Errorf("transport: timeout unit is not recognized: %q", s) | ||
368 | } | ||
369 | t, err := strconv.ParseInt(s[:size-1], 10, 64) | ||
370 | if err != nil { | ||
371 | return 0, err | ||
372 | } | ||
373 | return d * time.Duration(t), nil | ||
374 | } | ||
375 | |||
376 | const ( | ||
377 | spaceByte = ' ' | ||
378 | tildaByte = '~' | ||
379 | percentByte = '%' | ||
380 | ) | ||
381 | |||
382 | // encodeGrpcMessage is used to encode status code in header field | ||
383 | // "grpc-message". | ||
384 | // It checks to see if each individual byte in msg is an | ||
385 | // allowable byte, and then either percent encoding or passing it through. | ||
386 | // When percent encoding, the byte is converted into hexadecimal notation | ||
387 | // with a '%' prepended. | ||
388 | func encodeGrpcMessage(msg string) string { | ||
389 | if msg == "" { | ||
390 | return "" | ||
391 | } | ||
392 | lenMsg := len(msg) | ||
393 | for i := 0; i < lenMsg; i++ { | ||
394 | c := msg[i] | ||
395 | if !(c >= spaceByte && c < tildaByte && c != percentByte) { | ||
396 | return encodeGrpcMessageUnchecked(msg) | ||
397 | } | ||
398 | } | ||
399 | return msg | ||
400 | } | ||
401 | |||
402 | func encodeGrpcMessageUnchecked(msg string) string { | ||
403 | var buf bytes.Buffer | ||
404 | lenMsg := len(msg) | ||
405 | for i := 0; i < lenMsg; i++ { | ||
406 | c := msg[i] | ||
407 | if c >= spaceByte && c < tildaByte && c != percentByte { | ||
408 | buf.WriteByte(c) | ||
409 | } else { | ||
410 | buf.WriteString(fmt.Sprintf("%%%02X", c)) | ||
411 | } | ||
412 | } | ||
413 | return buf.String() | ||
414 | } | ||
415 | |||
416 | // decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage. | ||
417 | func decodeGrpcMessage(msg string) string { | ||
418 | if msg == "" { | ||
419 | return "" | ||
420 | } | ||
421 | lenMsg := len(msg) | ||
422 | for i := 0; i < lenMsg; i++ { | ||
423 | if msg[i] == percentByte && i+2 < lenMsg { | ||
424 | return decodeGrpcMessageUnchecked(msg) | ||
425 | } | ||
426 | } | ||
427 | return msg | ||
428 | } | ||
429 | |||
430 | func decodeGrpcMessageUnchecked(msg string) string { | ||
431 | var buf bytes.Buffer | ||
432 | lenMsg := len(msg) | ||
433 | for i := 0; i < lenMsg; i++ { | ||
434 | c := msg[i] | ||
435 | if c == percentByte && i+2 < lenMsg { | ||
436 | parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8) | ||
437 | if err != nil { | ||
438 | buf.WriteByte(c) | ||
439 | } else { | ||
440 | buf.WriteByte(byte(parsed)) | ||
441 | i += 2 | ||
442 | } | ||
443 | } else { | ||
444 | buf.WriteByte(c) | ||
445 | } | ||
446 | } | ||
447 | return buf.String() | ||
448 | } | ||
449 | |||
450 | type framer struct { | ||
451 | numWriters int32 | ||
452 | reader io.Reader | ||
453 | writer *bufio.Writer | ||
454 | fr *http2.Framer | ||
455 | } | ||
456 | |||
457 | func newFramer(conn net.Conn) *framer { | ||
458 | f := &framer{ | ||
459 | reader: bufio.NewReaderSize(conn, http2IOBufSize), | ||
460 | writer: bufio.NewWriterSize(conn, http2IOBufSize), | ||
461 | } | ||
462 | f.fr = http2.NewFramer(f.writer, f.reader) | ||
463 | // Opt-in to Frame reuse API on framer to reduce garbage. | ||
464 | // Frames aren't safe to read from after a subsequent call to ReadFrame. | ||
465 | f.fr.SetReuseFrames() | ||
466 | f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil) | ||
467 | return f | ||
468 | } | ||
469 | |||
470 | func (f *framer) adjustNumWriters(i int32) int32 { | ||
471 | return atomic.AddInt32(&f.numWriters, i) | ||
472 | } | ||
473 | |||
474 | // The following writeXXX functions can only be called when the caller gets | ||
475 | // unblocked from writableChan channel (i.e., owns the privilege to write). | ||
476 | |||
477 | func (f *framer) writeContinuation(forceFlush bool, streamID uint32, endHeaders bool, headerBlockFragment []byte) error { | ||
478 | if err := f.fr.WriteContinuation(streamID, endHeaders, headerBlockFragment); err != nil { | ||
479 | return err | ||
480 | } | ||
481 | if forceFlush { | ||
482 | return f.writer.Flush() | ||
483 | } | ||
484 | return nil | ||
485 | } | ||
486 | |||
487 | func (f *framer) writeData(forceFlush bool, streamID uint32, endStream bool, data []byte) error { | ||
488 | if err := f.fr.WriteData(streamID, endStream, data); err != nil { | ||
489 | return err | ||
490 | } | ||
491 | if forceFlush { | ||
492 | return f.writer.Flush() | ||
493 | } | ||
494 | return nil | ||
495 | } | ||
496 | |||
497 | func (f *framer) writeGoAway(forceFlush bool, maxStreamID uint32, code http2.ErrCode, debugData []byte) error { | ||
498 | if err := f.fr.WriteGoAway(maxStreamID, code, debugData); err != nil { | ||
499 | return err | ||
500 | } | ||
501 | if forceFlush { | ||
502 | return f.writer.Flush() | ||
503 | } | ||
504 | return nil | ||
505 | } | ||
506 | |||
507 | func (f *framer) writeHeaders(forceFlush bool, p http2.HeadersFrameParam) error { | ||
508 | if err := f.fr.WriteHeaders(p); err != nil { | ||
509 | return err | ||
510 | } | ||
511 | if forceFlush { | ||
512 | return f.writer.Flush() | ||
513 | } | ||
514 | return nil | ||
515 | } | ||
516 | |||
517 | func (f *framer) writePing(forceFlush, ack bool, data [8]byte) error { | ||
518 | if err := f.fr.WritePing(ack, data); err != nil { | ||
519 | return err | ||
520 | } | ||
521 | if forceFlush { | ||
522 | return f.writer.Flush() | ||
523 | } | ||
524 | return nil | ||
525 | } | ||
526 | |||
527 | func (f *framer) writePriority(forceFlush bool, streamID uint32, p http2.PriorityParam) error { | ||
528 | if err := f.fr.WritePriority(streamID, p); err != nil { | ||
529 | return err | ||
530 | } | ||
531 | if forceFlush { | ||
532 | return f.writer.Flush() | ||
533 | } | ||
534 | return nil | ||
535 | } | ||
536 | |||
537 | func (f *framer) writePushPromise(forceFlush bool, p http2.PushPromiseParam) error { | ||
538 | if err := f.fr.WritePushPromise(p); err != nil { | ||
539 | return err | ||
540 | } | ||
541 | if forceFlush { | ||
542 | return f.writer.Flush() | ||
543 | } | ||
544 | return nil | ||
545 | } | ||
546 | |||
547 | func (f *framer) writeRSTStream(forceFlush bool, streamID uint32, code http2.ErrCode) error { | ||
548 | if err := f.fr.WriteRSTStream(streamID, code); err != nil { | ||
549 | return err | ||
550 | } | ||
551 | if forceFlush { | ||
552 | return f.writer.Flush() | ||
553 | } | ||
554 | return nil | ||
555 | } | ||
556 | |||
557 | func (f *framer) writeSettings(forceFlush bool, settings ...http2.Setting) error { | ||
558 | if err := f.fr.WriteSettings(settings...); err != nil { | ||
559 | return err | ||
560 | } | ||
561 | if forceFlush { | ||
562 | return f.writer.Flush() | ||
563 | } | ||
564 | return nil | ||
565 | } | ||
566 | |||
567 | func (f *framer) writeSettingsAck(forceFlush bool) error { | ||
568 | if err := f.fr.WriteSettingsAck(); err != nil { | ||
569 | return err | ||
570 | } | ||
571 | if forceFlush { | ||
572 | return f.writer.Flush() | ||
573 | } | ||
574 | return nil | ||
575 | } | ||
576 | |||
577 | func (f *framer) writeWindowUpdate(forceFlush bool, streamID, incr uint32) error { | ||
578 | if err := f.fr.WriteWindowUpdate(streamID, incr); err != nil { | ||
579 | return err | ||
580 | } | ||
581 | if forceFlush { | ||
582 | return f.writer.Flush() | ||
583 | } | ||
584 | return nil | ||
585 | } | ||
586 | |||
587 | func (f *framer) flushWrite() error { | ||
588 | return f.writer.Flush() | ||
589 | } | ||
590 | |||
591 | func (f *framer) readFrame() (http2.Frame, error) { | ||
592 | return f.fr.ReadFrame() | ||
593 | } | ||
594 | |||
595 | func (f *framer) errorDetail() error { | ||
596 | return f.fr.ErrorDetail() | ||
597 | } | ||
diff --git a/vendor/google.golang.org/grpc/transport/log.go b/vendor/google.golang.org/grpc/transport/log.go new file mode 100644 index 0000000..ac8e358 --- /dev/null +++ b/vendor/google.golang.org/grpc/transport/log.go | |||
@@ -0,0 +1,50 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2017 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | // This file contains wrappers for grpclog functions. | ||
20 | // The transport package only logs to verbose level 2 by default. | ||
21 | |||
22 | package transport | ||
23 | |||
24 | import "google.golang.org/grpc/grpclog" | ||
25 | |||
26 | const logLevel = 2 | ||
27 | |||
28 | func infof(format string, args ...interface{}) { | ||
29 | if grpclog.V(logLevel) { | ||
30 | grpclog.Infof(format, args...) | ||
31 | } | ||
32 | } | ||
33 | |||
34 | func warningf(format string, args ...interface{}) { | ||
35 | if grpclog.V(logLevel) { | ||
36 | grpclog.Warningf(format, args...) | ||
37 | } | ||
38 | } | ||
39 | |||
40 | func errorf(format string, args ...interface{}) { | ||
41 | if grpclog.V(logLevel) { | ||
42 | grpclog.Errorf(format, args...) | ||
43 | } | ||
44 | } | ||
45 | |||
46 | func fatalf(format string, args ...interface{}) { | ||
47 | if grpclog.V(logLevel) { | ||
48 | grpclog.Fatalf(format, args...) | ||
49 | } | ||
50 | } | ||
diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go new file mode 100644 index 0000000..ec0fe67 --- /dev/null +++ b/vendor/google.golang.org/grpc/transport/transport.go | |||
@@ -0,0 +1,730 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2014 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | // Package transport defines and implements message oriented communication | ||
20 | // channel to complete various transactions (e.g., an RPC). | ||
21 | package transport // import "google.golang.org/grpc/transport" | ||
22 | |||
23 | import ( | ||
24 | "fmt" | ||
25 | "io" | ||
26 | "net" | ||
27 | "sync" | ||
28 | |||
29 | "golang.org/x/net/context" | ||
30 | "golang.org/x/net/http2" | ||
31 | "google.golang.org/grpc/codes" | ||
32 | "google.golang.org/grpc/credentials" | ||
33 | "google.golang.org/grpc/keepalive" | ||
34 | "google.golang.org/grpc/metadata" | ||
35 | "google.golang.org/grpc/stats" | ||
36 | "google.golang.org/grpc/status" | ||
37 | "google.golang.org/grpc/tap" | ||
38 | ) | ||
39 | |||
40 | // recvMsg represents the received msg from the transport. All transport | ||
41 | // protocol specific info has been removed. | ||
42 | type recvMsg struct { | ||
43 | data []byte | ||
44 | // nil: received some data | ||
45 | // io.EOF: stream is completed. data is nil. | ||
46 | // other non-nil error: transport failure. data is nil. | ||
47 | err error | ||
48 | } | ||
49 | |||
50 | // recvBuffer is an unbounded channel of recvMsg structs. | ||
51 | // Note recvBuffer differs from controlBuffer only in that recvBuffer | ||
52 | // holds a channel of only recvMsg structs instead of objects implementing "item" interface. | ||
53 | // recvBuffer is written to much more often than | ||
54 | // controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put" | ||
55 | type recvBuffer struct { | ||
56 | c chan recvMsg | ||
57 | mu sync.Mutex | ||
58 | backlog []recvMsg | ||
59 | } | ||
60 | |||
61 | func newRecvBuffer() *recvBuffer { | ||
62 | b := &recvBuffer{ | ||
63 | c: make(chan recvMsg, 1), | ||
64 | } | ||
65 | return b | ||
66 | } | ||
67 | |||
68 | func (b *recvBuffer) put(r recvMsg) { | ||
69 | b.mu.Lock() | ||
70 | defer b.mu.Unlock() | ||
71 | if len(b.backlog) == 0 { | ||
72 | select { | ||
73 | case b.c <- r: | ||
74 | return | ||
75 | default: | ||
76 | } | ||
77 | } | ||
78 | b.backlog = append(b.backlog, r) | ||
79 | } | ||
80 | |||
81 | func (b *recvBuffer) load() { | ||
82 | b.mu.Lock() | ||
83 | defer b.mu.Unlock() | ||
84 | if len(b.backlog) > 0 { | ||
85 | select { | ||
86 | case b.c <- b.backlog[0]: | ||
87 | b.backlog[0] = recvMsg{} | ||
88 | b.backlog = b.backlog[1:] | ||
89 | default: | ||
90 | } | ||
91 | } | ||
92 | } | ||
93 | |||
94 | // get returns the channel that receives a recvMsg in the buffer. | ||
95 | // | ||
96 | // Upon receipt of a recvMsg, the caller should call load to send another | ||
97 | // recvMsg onto the channel if there is any. | ||
98 | func (b *recvBuffer) get() <-chan recvMsg { | ||
99 | return b.c | ||
100 | } | ||
101 | |||
102 | // recvBufferReader implements io.Reader interface to read the data from | ||
103 | // recvBuffer. | ||
104 | type recvBufferReader struct { | ||
105 | ctx context.Context | ||
106 | goAway chan struct{} | ||
107 | recv *recvBuffer | ||
108 | last []byte // Stores the remaining data in the previous calls. | ||
109 | err error | ||
110 | } | ||
111 | |||
112 | // Read reads the next len(p) bytes from last. If last is drained, it tries to | ||
113 | // read additional data from recv. It blocks if there no additional data available | ||
114 | // in recv. If Read returns any non-nil error, it will continue to return that error. | ||
115 | func (r *recvBufferReader) Read(p []byte) (n int, err error) { | ||
116 | if r.err != nil { | ||
117 | return 0, r.err | ||
118 | } | ||
119 | n, r.err = r.read(p) | ||
120 | return n, r.err | ||
121 | } | ||
122 | |||
123 | func (r *recvBufferReader) read(p []byte) (n int, err error) { | ||
124 | if r.last != nil && len(r.last) > 0 { | ||
125 | // Read remaining data left in last call. | ||
126 | copied := copy(p, r.last) | ||
127 | r.last = r.last[copied:] | ||
128 | return copied, nil | ||
129 | } | ||
130 | select { | ||
131 | case <-r.ctx.Done(): | ||
132 | return 0, ContextErr(r.ctx.Err()) | ||
133 | case <-r.goAway: | ||
134 | return 0, ErrStreamDrain | ||
135 | case m := <-r.recv.get(): | ||
136 | r.recv.load() | ||
137 | if m.err != nil { | ||
138 | return 0, m.err | ||
139 | } | ||
140 | copied := copy(p, m.data) | ||
141 | r.last = m.data[copied:] | ||
142 | return copied, nil | ||
143 | } | ||
144 | } | ||
145 | |||
146 | // All items in an out of a controlBuffer should be the same type. | ||
147 | type item interface { | ||
148 | item() | ||
149 | } | ||
150 | |||
151 | // controlBuffer is an unbounded channel of item. | ||
152 | type controlBuffer struct { | ||
153 | c chan item | ||
154 | mu sync.Mutex | ||
155 | backlog []item | ||
156 | } | ||
157 | |||
158 | func newControlBuffer() *controlBuffer { | ||
159 | b := &controlBuffer{ | ||
160 | c: make(chan item, 1), | ||
161 | } | ||
162 | return b | ||
163 | } | ||
164 | |||
165 | func (b *controlBuffer) put(r item) { | ||
166 | b.mu.Lock() | ||
167 | defer b.mu.Unlock() | ||
168 | if len(b.backlog) == 0 { | ||
169 | select { | ||
170 | case b.c <- r: | ||
171 | return | ||
172 | default: | ||
173 | } | ||
174 | } | ||
175 | b.backlog = append(b.backlog, r) | ||
176 | } | ||
177 | |||
178 | func (b *controlBuffer) load() { | ||
179 | b.mu.Lock() | ||
180 | defer b.mu.Unlock() | ||
181 | if len(b.backlog) > 0 { | ||
182 | select { | ||
183 | case b.c <- b.backlog[0]: | ||
184 | b.backlog[0] = nil | ||
185 | b.backlog = b.backlog[1:] | ||
186 | default: | ||
187 | } | ||
188 | } | ||
189 | } | ||
190 | |||
191 | // get returns the channel that receives an item in the buffer. | ||
192 | // | ||
193 | // Upon receipt of an item, the caller should call load to send another | ||
194 | // item onto the channel if there is any. | ||
195 | func (b *controlBuffer) get() <-chan item { | ||
196 | return b.c | ||
197 | } | ||
198 | |||
199 | type streamState uint8 | ||
200 | |||
201 | const ( | ||
202 | streamActive streamState = iota | ||
203 | streamWriteDone // EndStream sent | ||
204 | streamReadDone // EndStream received | ||
205 | streamDone // the entire stream is finished. | ||
206 | ) | ||
207 | |||
208 | // Stream represents an RPC in the transport layer. | ||
209 | type Stream struct { | ||
210 | id uint32 | ||
211 | // nil for client side Stream. | ||
212 | st ServerTransport | ||
213 | // ctx is the associated context of the stream. | ||
214 | ctx context.Context | ||
215 | // cancel is always nil for client side Stream. | ||
216 | cancel context.CancelFunc | ||
217 | // done is closed when the final status arrives. | ||
218 | done chan struct{} | ||
219 | // goAway is closed when the server sent GoAways signal before this stream was initiated. | ||
220 | goAway chan struct{} | ||
221 | // method records the associated RPC method of the stream. | ||
222 | method string | ||
223 | recvCompress string | ||
224 | sendCompress string | ||
225 | buf *recvBuffer | ||
226 | trReader io.Reader | ||
227 | fc *inFlow | ||
228 | recvQuota uint32 | ||
229 | |||
230 | // TODO: Remote this unused variable. | ||
231 | // The accumulated inbound quota pending for window update. | ||
232 | updateQuota uint32 | ||
233 | |||
234 | // Callback to state application's intentions to read data. This | ||
235 | // is used to adjust flow control, if need be. | ||
236 | requestRead func(int) | ||
237 | |||
238 | sendQuotaPool *quotaPool | ||
239 | // Close headerChan to indicate the end of reception of header metadata. | ||
240 | headerChan chan struct{} | ||
241 | // header caches the received header metadata. | ||
242 | header metadata.MD | ||
243 | // The key-value map of trailer metadata. | ||
244 | trailer metadata.MD | ||
245 | |||
246 | mu sync.RWMutex // guard the following | ||
247 | // headerOK becomes true from the first header is about to send. | ||
248 | headerOk bool | ||
249 | state streamState | ||
250 | // true iff headerChan is closed. Used to avoid closing headerChan | ||
251 | // multiple times. | ||
252 | headerDone bool | ||
253 | // the status error received from the server. | ||
254 | status *status.Status | ||
255 | // rstStream indicates whether a RST_STREAM frame needs to be sent | ||
256 | // to the server to signify that this stream is closing. | ||
257 | rstStream bool | ||
258 | // rstError is the error that needs to be sent along with the RST_STREAM frame. | ||
259 | rstError http2.ErrCode | ||
260 | // bytesSent and bytesReceived indicates whether any bytes have been sent or | ||
261 | // received on this stream. | ||
262 | bytesSent bool | ||
263 | bytesReceived bool | ||
264 | } | ||
265 | |||
266 | // RecvCompress returns the compression algorithm applied to the inbound | ||
267 | // message. It is empty string if there is no compression applied. | ||
268 | func (s *Stream) RecvCompress() string { | ||
269 | return s.recvCompress | ||
270 | } | ||
271 | |||
272 | // SetSendCompress sets the compression algorithm to the stream. | ||
273 | func (s *Stream) SetSendCompress(str string) { | ||
274 | s.sendCompress = str | ||
275 | } | ||
276 | |||
277 | // Done returns a chanel which is closed when it receives the final status | ||
278 | // from the server. | ||
279 | func (s *Stream) Done() <-chan struct{} { | ||
280 | return s.done | ||
281 | } | ||
282 | |||
283 | // GoAway returns a channel which is closed when the server sent GoAways signal | ||
284 | // before this stream was initiated. | ||
285 | func (s *Stream) GoAway() <-chan struct{} { | ||
286 | return s.goAway | ||
287 | } | ||
288 | |||
289 | // Header acquires the key-value pairs of header metadata once it | ||
290 | // is available. It blocks until i) the metadata is ready or ii) there is no | ||
291 | // header metadata or iii) the stream is canceled/expired. | ||
292 | func (s *Stream) Header() (metadata.MD, error) { | ||
293 | var err error | ||
294 | select { | ||
295 | case <-s.ctx.Done(): | ||
296 | err = ContextErr(s.ctx.Err()) | ||
297 | case <-s.goAway: | ||
298 | err = ErrStreamDrain | ||
299 | case <-s.headerChan: | ||
300 | return s.header.Copy(), nil | ||
301 | } | ||
302 | // Even if the stream is closed, header is returned if available. | ||
303 | select { | ||
304 | case <-s.headerChan: | ||
305 | return s.header.Copy(), nil | ||
306 | default: | ||
307 | } | ||
308 | return nil, err | ||
309 | } | ||
310 | |||
311 | // Trailer returns the cached trailer metedata. Note that if it is not called | ||
312 | // after the entire stream is done, it could return an empty MD. Client | ||
313 | // side only. | ||
314 | func (s *Stream) Trailer() metadata.MD { | ||
315 | s.mu.RLock() | ||
316 | defer s.mu.RUnlock() | ||
317 | return s.trailer.Copy() | ||
318 | } | ||
319 | |||
320 | // ServerTransport returns the underlying ServerTransport for the stream. | ||
321 | // The client side stream always returns nil. | ||
322 | func (s *Stream) ServerTransport() ServerTransport { | ||
323 | return s.st | ||
324 | } | ||
325 | |||
326 | // Context returns the context of the stream. | ||
327 | func (s *Stream) Context() context.Context { | ||
328 | return s.ctx | ||
329 | } | ||
330 | |||
331 | // Method returns the method for the stream. | ||
332 | func (s *Stream) Method() string { | ||
333 | return s.method | ||
334 | } | ||
335 | |||
336 | // Status returns the status received from the server. | ||
337 | func (s *Stream) Status() *status.Status { | ||
338 | return s.status | ||
339 | } | ||
340 | |||
341 | // SetHeader sets the header metadata. This can be called multiple times. | ||
342 | // Server side only. | ||
343 | func (s *Stream) SetHeader(md metadata.MD) error { | ||
344 | s.mu.Lock() | ||
345 | defer s.mu.Unlock() | ||
346 | if s.headerOk || s.state == streamDone { | ||
347 | return ErrIllegalHeaderWrite | ||
348 | } | ||
349 | if md.Len() == 0 { | ||
350 | return nil | ||
351 | } | ||
352 | s.header = metadata.Join(s.header, md) | ||
353 | return nil | ||
354 | } | ||
355 | |||
356 | // SetTrailer sets the trailer metadata which will be sent with the RPC status | ||
357 | // by the server. This can be called multiple times. Server side only. | ||
358 | func (s *Stream) SetTrailer(md metadata.MD) error { | ||
359 | if md.Len() == 0 { | ||
360 | return nil | ||
361 | } | ||
362 | s.mu.Lock() | ||
363 | defer s.mu.Unlock() | ||
364 | s.trailer = metadata.Join(s.trailer, md) | ||
365 | return nil | ||
366 | } | ||
367 | |||
368 | func (s *Stream) write(m recvMsg) { | ||
369 | s.buf.put(m) | ||
370 | } | ||
371 | |||
372 | // Read reads all p bytes from the wire for this stream. | ||
373 | func (s *Stream) Read(p []byte) (n int, err error) { | ||
374 | // Don't request a read if there was an error earlier | ||
375 | if er := s.trReader.(*transportReader).er; er != nil { | ||
376 | return 0, er | ||
377 | } | ||
378 | s.requestRead(len(p)) | ||
379 | return io.ReadFull(s.trReader, p) | ||
380 | } | ||
381 | |||
382 | // tranportReader reads all the data available for this Stream from the transport and | ||
383 | // passes them into the decoder, which converts them into a gRPC message stream. | ||
384 | // The error is io.EOF when the stream is done or another non-nil error if | ||
385 | // the stream broke. | ||
386 | type transportReader struct { | ||
387 | reader io.Reader | ||
388 | // The handler to control the window update procedure for both this | ||
389 | // particular stream and the associated transport. | ||
390 | windowHandler func(int) | ||
391 | er error | ||
392 | } | ||
393 | |||
394 | func (t *transportReader) Read(p []byte) (n int, err error) { | ||
395 | n, err = t.reader.Read(p) | ||
396 | if err != nil { | ||
397 | t.er = err | ||
398 | return | ||
399 | } | ||
400 | t.windowHandler(n) | ||
401 | return | ||
402 | } | ||
403 | |||
404 | // finish sets the stream's state and status, and closes the done channel. | ||
405 | // s.mu must be held by the caller. st must always be non-nil. | ||
406 | func (s *Stream) finish(st *status.Status) { | ||
407 | s.status = st | ||
408 | s.state = streamDone | ||
409 | close(s.done) | ||
410 | } | ||
411 | |||
412 | // BytesSent indicates whether any bytes have been sent on this stream. | ||
413 | func (s *Stream) BytesSent() bool { | ||
414 | s.mu.Lock() | ||
415 | defer s.mu.Unlock() | ||
416 | return s.bytesSent | ||
417 | } | ||
418 | |||
419 | // BytesReceived indicates whether any bytes have been received on this stream. | ||
420 | func (s *Stream) BytesReceived() bool { | ||
421 | s.mu.Lock() | ||
422 | defer s.mu.Unlock() | ||
423 | return s.bytesReceived | ||
424 | } | ||
425 | |||
426 | // GoString is implemented by Stream so context.String() won't | ||
427 | // race when printing %#v. | ||
428 | func (s *Stream) GoString() string { | ||
429 | return fmt.Sprintf("<stream: %p, %v>", s, s.method) | ||
430 | } | ||
431 | |||
432 | // The key to save transport.Stream in the context. | ||
433 | type streamKey struct{} | ||
434 | |||
435 | // newContextWithStream creates a new context from ctx and attaches stream | ||
436 | // to it. | ||
437 | func newContextWithStream(ctx context.Context, stream *Stream) context.Context { | ||
438 | return context.WithValue(ctx, streamKey{}, stream) | ||
439 | } | ||
440 | |||
441 | // StreamFromContext returns the stream saved in ctx. | ||
442 | func StreamFromContext(ctx context.Context) (s *Stream, ok bool) { | ||
443 | s, ok = ctx.Value(streamKey{}).(*Stream) | ||
444 | return | ||
445 | } | ||
446 | |||
447 | // state of transport | ||
448 | type transportState int | ||
449 | |||
450 | const ( | ||
451 | reachable transportState = iota | ||
452 | unreachable | ||
453 | closing | ||
454 | draining | ||
455 | ) | ||
456 | |||
457 | // ServerConfig consists of all the configurations to establish a server transport. | ||
458 | type ServerConfig struct { | ||
459 | MaxStreams uint32 | ||
460 | AuthInfo credentials.AuthInfo | ||
461 | InTapHandle tap.ServerInHandle | ||
462 | StatsHandler stats.Handler | ||
463 | KeepaliveParams keepalive.ServerParameters | ||
464 | KeepalivePolicy keepalive.EnforcementPolicy | ||
465 | InitialWindowSize int32 | ||
466 | InitialConnWindowSize int32 | ||
467 | } | ||
468 | |||
469 | // NewServerTransport creates a ServerTransport with conn or non-nil error | ||
470 | // if it fails. | ||
471 | func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) { | ||
472 | return newHTTP2Server(conn, config) | ||
473 | } | ||
474 | |||
475 | // ConnectOptions covers all relevant options for communicating with the server. | ||
476 | type ConnectOptions struct { | ||
477 | // UserAgent is the application user agent. | ||
478 | UserAgent string | ||
479 | // Authority is the :authority pseudo-header to use. This field has no effect if | ||
480 | // TransportCredentials is set. | ||
481 | Authority string | ||
482 | // Dialer specifies how to dial a network address. | ||
483 | Dialer func(context.Context, string) (net.Conn, error) | ||
484 | // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors. | ||
485 | FailOnNonTempDialError bool | ||
486 | // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs. | ||
487 | PerRPCCredentials []credentials.PerRPCCredentials | ||
488 | // TransportCredentials stores the Authenticator required to setup a client connection. | ||
489 | TransportCredentials credentials.TransportCredentials | ||
490 | // KeepaliveParams stores the keepalive parameters. | ||
491 | KeepaliveParams keepalive.ClientParameters | ||
492 | // StatsHandler stores the handler for stats. | ||
493 | StatsHandler stats.Handler | ||
494 | // InitialWindowSize sets the intial window size for a stream. | ||
495 | InitialWindowSize int32 | ||
496 | // InitialConnWindowSize sets the intial window size for a connection. | ||
497 | InitialConnWindowSize int32 | ||
498 | } | ||
499 | |||
500 | // TargetInfo contains the information of the target such as network address and metadata. | ||
501 | type TargetInfo struct { | ||
502 | Addr string | ||
503 | Metadata interface{} | ||
504 | } | ||
505 | |||
506 | // NewClientTransport establishes the transport with the required ConnectOptions | ||
507 | // and returns it to the caller. | ||
508 | func NewClientTransport(ctx context.Context, target TargetInfo, opts ConnectOptions) (ClientTransport, error) { | ||
509 | return newHTTP2Client(ctx, target, opts) | ||
510 | } | ||
511 | |||
512 | // Options provides additional hints and information for message | ||
513 | // transmission. | ||
514 | type Options struct { | ||
515 | // Last indicates whether this write is the last piece for | ||
516 | // this stream. | ||
517 | Last bool | ||
518 | |||
519 | // Delay is a hint to the transport implementation for whether | ||
520 | // the data could be buffered for a batching write. The | ||
521 | // Transport implementation may ignore the hint. | ||
522 | Delay bool | ||
523 | } | ||
524 | |||
525 | // CallHdr carries the information of a particular RPC. | ||
526 | type CallHdr struct { | ||
527 | // Host specifies the peer's host. | ||
528 | Host string | ||
529 | |||
530 | // Method specifies the operation to perform. | ||
531 | Method string | ||
532 | |||
533 | // RecvCompress specifies the compression algorithm applied on | ||
534 | // inbound messages. | ||
535 | RecvCompress string | ||
536 | |||
537 | // SendCompress specifies the compression algorithm applied on | ||
538 | // outbound message. | ||
539 | SendCompress string | ||
540 | |||
541 | // Creds specifies credentials.PerRPCCredentials for a call. | ||
542 | Creds credentials.PerRPCCredentials | ||
543 | |||
544 | // Flush indicates whether a new stream command should be sent | ||
545 | // to the peer without waiting for the first data. This is | ||
546 | // only a hint. | ||
547 | // If it's true, the transport may modify the flush decision | ||
548 | // for performance purposes. | ||
549 | // If it's false, new stream will never be flushed. | ||
550 | Flush bool | ||
551 | } | ||
552 | |||
553 | // ClientTransport is the common interface for all gRPC client-side transport | ||
554 | // implementations. | ||
555 | type ClientTransport interface { | ||
556 | // Close tears down this transport. Once it returns, the transport | ||
557 | // should not be accessed any more. The caller must make sure this | ||
558 | // is called only once. | ||
559 | Close() error | ||
560 | |||
561 | // GracefulClose starts to tear down the transport. It stops accepting | ||
562 | // new RPCs and wait the completion of the pending RPCs. | ||
563 | GracefulClose() error | ||
564 | |||
565 | // Write sends the data for the given stream. A nil stream indicates | ||
566 | // the write is to be performed on the transport as a whole. | ||
567 | Write(s *Stream, data []byte, opts *Options) error | ||
568 | |||
569 | // NewStream creates a Stream for an RPC. | ||
570 | NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) | ||
571 | |||
572 | // CloseStream clears the footprint of a stream when the stream is | ||
573 | // not needed any more. The err indicates the error incurred when | ||
574 | // CloseStream is called. Must be called when a stream is finished | ||
575 | // unless the associated transport is closing. | ||
576 | CloseStream(stream *Stream, err error) | ||
577 | |||
578 | // Error returns a channel that is closed when some I/O error | ||
579 | // happens. Typically the caller should have a goroutine to monitor | ||
580 | // this in order to take action (e.g., close the current transport | ||
581 | // and create a new one) in error case. It should not return nil | ||
582 | // once the transport is initiated. | ||
583 | Error() <-chan struct{} | ||
584 | |||
585 | // GoAway returns a channel that is closed when ClientTransport | ||
586 | // receives the draining signal from the server (e.g., GOAWAY frame in | ||
587 | // HTTP/2). | ||
588 | GoAway() <-chan struct{} | ||
589 | |||
590 | // GetGoAwayReason returns the reason why GoAway frame was received. | ||
591 | GetGoAwayReason() GoAwayReason | ||
592 | } | ||
593 | |||
594 | // ServerTransport is the common interface for all gRPC server-side transport | ||
595 | // implementations. | ||
596 | // | ||
597 | // Methods may be called concurrently from multiple goroutines, but | ||
598 | // Write methods for a given Stream will be called serially. | ||
599 | type ServerTransport interface { | ||
600 | // HandleStreams receives incoming streams using the given handler. | ||
601 | HandleStreams(func(*Stream), func(context.Context, string) context.Context) | ||
602 | |||
603 | // WriteHeader sends the header metadata for the given stream. | ||
604 | // WriteHeader may not be called on all streams. | ||
605 | WriteHeader(s *Stream, md metadata.MD) error | ||
606 | |||
607 | // Write sends the data for the given stream. | ||
608 | // Write may not be called on all streams. | ||
609 | Write(s *Stream, data []byte, opts *Options) error | ||
610 | |||
611 | // WriteStatus sends the status of a stream to the client. WriteStatus is | ||
612 | // the final call made on a stream and always occurs. | ||
613 | WriteStatus(s *Stream, st *status.Status) error | ||
614 | |||
615 | // Close tears down the transport. Once it is called, the transport | ||
616 | // should not be accessed any more. All the pending streams and their | ||
617 | // handlers will be terminated asynchronously. | ||
618 | Close() error | ||
619 | |||
620 | // RemoteAddr returns the remote network address. | ||
621 | RemoteAddr() net.Addr | ||
622 | |||
623 | // Drain notifies the client this ServerTransport stops accepting new RPCs. | ||
624 | Drain() | ||
625 | } | ||
626 | |||
627 | // streamErrorf creates an StreamError with the specified error code and description. | ||
628 | func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError { | ||
629 | return StreamError{ | ||
630 | Code: c, | ||
631 | Desc: fmt.Sprintf(format, a...), | ||
632 | } | ||
633 | } | ||
634 | |||
635 | // connectionErrorf creates an ConnectionError with the specified error description. | ||
636 | func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError { | ||
637 | return ConnectionError{ | ||
638 | Desc: fmt.Sprintf(format, a...), | ||
639 | temp: temp, | ||
640 | err: e, | ||
641 | } | ||
642 | } | ||
643 | |||
644 | // ConnectionError is an error that results in the termination of the | ||
645 | // entire connection and the retry of all the active streams. | ||
646 | type ConnectionError struct { | ||
647 | Desc string | ||
648 | temp bool | ||
649 | err error | ||
650 | } | ||
651 | |||
652 | func (e ConnectionError) Error() string { | ||
653 | return fmt.Sprintf("connection error: desc = %q", e.Desc) | ||
654 | } | ||
655 | |||
656 | // Temporary indicates if this connection error is temporary or fatal. | ||
657 | func (e ConnectionError) Temporary() bool { | ||
658 | return e.temp | ||
659 | } | ||
660 | |||
661 | // Origin returns the original error of this connection error. | ||
662 | func (e ConnectionError) Origin() error { | ||
663 | // Never return nil error here. | ||
664 | // If the original error is nil, return itself. | ||
665 | if e.err == nil { | ||
666 | return e | ||
667 | } | ||
668 | return e.err | ||
669 | } | ||
670 | |||
671 | var ( | ||
672 | // ErrConnClosing indicates that the transport is closing. | ||
673 | ErrConnClosing = connectionErrorf(true, nil, "transport is closing") | ||
674 | // ErrStreamDrain indicates that the stream is rejected by the server because | ||
675 | // the server stops accepting new RPCs. | ||
676 | ErrStreamDrain = streamErrorf(codes.Unavailable, "the server stops accepting new RPCs") | ||
677 | ) | ||
678 | |||
679 | // TODO: See if we can replace StreamError with status package errors. | ||
680 | |||
681 | // StreamError is an error that only affects one stream within a connection. | ||
682 | type StreamError struct { | ||
683 | Code codes.Code | ||
684 | Desc string | ||
685 | } | ||
686 | |||
687 | func (e StreamError) Error() string { | ||
688 | return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc) | ||
689 | } | ||
690 | |||
691 | // wait blocks until it can receive from ctx.Done, closing, or proceed. | ||
692 | // If it receives from ctx.Done, it returns 0, the StreamError for ctx.Err. | ||
693 | // If it receives from done, it returns 0, io.EOF if ctx is not done; otherwise | ||
694 | // it return the StreamError for ctx.Err. | ||
695 | // If it receives from goAway, it returns 0, ErrStreamDrain. | ||
696 | // If it receives from closing, it returns 0, ErrConnClosing. | ||
697 | // If it receives from proceed, it returns the received integer, nil. | ||
698 | func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-chan int) (int, error) { | ||
699 | select { | ||
700 | case <-ctx.Done(): | ||
701 | return 0, ContextErr(ctx.Err()) | ||
702 | case <-done: | ||
703 | // User cancellation has precedence. | ||
704 | select { | ||
705 | case <-ctx.Done(): | ||
706 | return 0, ContextErr(ctx.Err()) | ||
707 | default: | ||
708 | } | ||
709 | return 0, io.EOF | ||
710 | case <-goAway: | ||
711 | return 0, ErrStreamDrain | ||
712 | case <-closing: | ||
713 | return 0, ErrConnClosing | ||
714 | case i := <-proceed: | ||
715 | return i, nil | ||
716 | } | ||
717 | } | ||
718 | |||
719 | // GoAwayReason contains the reason for the GoAway frame received. | ||
720 | type GoAwayReason uint8 | ||
721 | |||
722 | const ( | ||
723 | // Invalid indicates that no GoAway frame is received. | ||
724 | Invalid GoAwayReason = 0 | ||
725 | // NoReason is the default value when GoAway frame is received. | ||
726 | NoReason GoAwayReason = 1 | ||
727 | // TooManyPings indicates that a GoAway frame with ErrCodeEnhanceYourCalm | ||
728 | // was recieved and that the debug data said "too_many_pings". | ||
729 | TooManyPings GoAwayReason = 2 | ||
730 | ) | ||