diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/transport/control.go')
-rw-r--r-- | vendor/google.golang.org/grpc/transport/control.go | 246 |
1 files changed, 246 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/transport/control.go b/vendor/google.golang.org/grpc/transport/control.go new file mode 100644 index 0000000..501eb03 --- /dev/null +++ b/vendor/google.golang.org/grpc/transport/control.go | |||
@@ -0,0 +1,246 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2014 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | package transport | ||
20 | |||
21 | import ( | ||
22 | "fmt" | ||
23 | "math" | ||
24 | "sync" | ||
25 | "time" | ||
26 | |||
27 | "golang.org/x/net/http2" | ||
28 | ) | ||
29 | |||
30 | const ( | ||
31 | // The default value of flow control window size in HTTP2 spec. | ||
32 | defaultWindowSize = 65535 | ||
33 | // The initial window size for flow control. | ||
34 | initialWindowSize = defaultWindowSize // for an RPC | ||
35 | infinity = time.Duration(math.MaxInt64) | ||
36 | defaultClientKeepaliveTime = infinity | ||
37 | defaultClientKeepaliveTimeout = time.Duration(20 * time.Second) | ||
38 | defaultMaxStreamsClient = 100 | ||
39 | defaultMaxConnectionIdle = infinity | ||
40 | defaultMaxConnectionAge = infinity | ||
41 | defaultMaxConnectionAgeGrace = infinity | ||
42 | defaultServerKeepaliveTime = time.Duration(2 * time.Hour) | ||
43 | defaultServerKeepaliveTimeout = time.Duration(20 * time.Second) | ||
44 | defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute) | ||
45 | // max window limit set by HTTP2 Specs. | ||
46 | maxWindowSize = math.MaxInt32 | ||
47 | ) | ||
48 | |||
49 | // The following defines various control items which could flow through | ||
50 | // the control buffer of transport. They represent different aspects of | ||
51 | // control tasks, e.g., flow control, settings, streaming resetting, etc. | ||
52 | type windowUpdate struct { | ||
53 | streamID uint32 | ||
54 | increment uint32 | ||
55 | flush bool | ||
56 | } | ||
57 | |||
58 | func (*windowUpdate) item() {} | ||
59 | |||
60 | type settings struct { | ||
61 | ack bool | ||
62 | ss []http2.Setting | ||
63 | } | ||
64 | |||
65 | func (*settings) item() {} | ||
66 | |||
67 | type resetStream struct { | ||
68 | streamID uint32 | ||
69 | code http2.ErrCode | ||
70 | } | ||
71 | |||
72 | func (*resetStream) item() {} | ||
73 | |||
74 | type goAway struct { | ||
75 | code http2.ErrCode | ||
76 | debugData []byte | ||
77 | headsUp bool | ||
78 | closeConn bool | ||
79 | } | ||
80 | |||
81 | func (*goAway) item() {} | ||
82 | |||
83 | type flushIO struct { | ||
84 | } | ||
85 | |||
86 | func (*flushIO) item() {} | ||
87 | |||
88 | type ping struct { | ||
89 | ack bool | ||
90 | data [8]byte | ||
91 | } | ||
92 | |||
93 | func (*ping) item() {} | ||
94 | |||
95 | // quotaPool is a pool which accumulates the quota and sends it to acquire() | ||
96 | // when it is available. | ||
97 | type quotaPool struct { | ||
98 | c chan int | ||
99 | |||
100 | mu sync.Mutex | ||
101 | quota int | ||
102 | } | ||
103 | |||
104 | // newQuotaPool creates a quotaPool which has quota q available to consume. | ||
105 | func newQuotaPool(q int) *quotaPool { | ||
106 | qb := "aPool{ | ||
107 | c: make(chan int, 1), | ||
108 | } | ||
109 | if q > 0 { | ||
110 | qb.c <- q | ||
111 | } else { | ||
112 | qb.quota = q | ||
113 | } | ||
114 | return qb | ||
115 | } | ||
116 | |||
117 | // add cancels the pending quota sent on acquired, incremented by v and sends | ||
118 | // it back on acquire. | ||
119 | func (qb *quotaPool) add(v int) { | ||
120 | qb.mu.Lock() | ||
121 | defer qb.mu.Unlock() | ||
122 | select { | ||
123 | case n := <-qb.c: | ||
124 | qb.quota += n | ||
125 | default: | ||
126 | } | ||
127 | qb.quota += v | ||
128 | if qb.quota <= 0 { | ||
129 | return | ||
130 | } | ||
131 | // After the pool has been created, this is the only place that sends on | ||
132 | // the channel. Since mu is held at this point and any quota that was sent | ||
133 | // on the channel has been retrieved, we know that this code will always | ||
134 | // place any positive quota value on the channel. | ||
135 | select { | ||
136 | case qb.c <- qb.quota: | ||
137 | qb.quota = 0 | ||
138 | default: | ||
139 | } | ||
140 | } | ||
141 | |||
142 | // acquire returns the channel on which available quota amounts are sent. | ||
143 | func (qb *quotaPool) acquire() <-chan int { | ||
144 | return qb.c | ||
145 | } | ||
146 | |||
147 | // inFlow deals with inbound flow control | ||
148 | type inFlow struct { | ||
149 | mu sync.Mutex | ||
150 | // The inbound flow control limit for pending data. | ||
151 | limit uint32 | ||
152 | // pendingData is the overall data which have been received but not been | ||
153 | // consumed by applications. | ||
154 | pendingData uint32 | ||
155 | // The amount of data the application has consumed but grpc has not sent | ||
156 | // window update for them. Used to reduce window update frequency. | ||
157 | pendingUpdate uint32 | ||
158 | // delta is the extra window update given by receiver when an application | ||
159 | // is reading data bigger in size than the inFlow limit. | ||
160 | delta uint32 | ||
161 | } | ||
162 | |||
163 | // newLimit updates the inflow window to a new value n. | ||
164 | // It assumes that n is always greater than the old limit. | ||
165 | func (f *inFlow) newLimit(n uint32) uint32 { | ||
166 | f.mu.Lock() | ||
167 | defer f.mu.Unlock() | ||
168 | d := n - f.limit | ||
169 | f.limit = n | ||
170 | return d | ||
171 | } | ||
172 | |||
173 | func (f *inFlow) maybeAdjust(n uint32) uint32 { | ||
174 | if n > uint32(math.MaxInt32) { | ||
175 | n = uint32(math.MaxInt32) | ||
176 | } | ||
177 | f.mu.Lock() | ||
178 | defer f.mu.Unlock() | ||
179 | // estSenderQuota is the receiver's view of the maximum number of bytes the sender | ||
180 | // can send without a window update. | ||
181 | estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate)) | ||
182 | // estUntransmittedData is the maximum number of bytes the sends might not have put | ||
183 | // on the wire yet. A value of 0 or less means that we have already received all or | ||
184 | // more bytes than the application is requesting to read. | ||
185 | estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative. | ||
186 | // This implies that unless we send a window update, the sender won't be able to send all the bytes | ||
187 | // for this message. Therefore we must send an update over the limit since there's an active read | ||
188 | // request from the application. | ||
189 | if estUntransmittedData > estSenderQuota { | ||
190 | // Sender's window shouldn't go more than 2^31 - 1 as speecified in the HTTP spec. | ||
191 | if f.limit+n > maxWindowSize { | ||
192 | f.delta = maxWindowSize - f.limit | ||
193 | } else { | ||
194 | // Send a window update for the whole message and not just the difference between | ||
195 | // estUntransmittedData and estSenderQuota. This will be helpful in case the message | ||
196 | // is padded; We will fallback on the current available window(at least a 1/4th of the limit). | ||
197 | f.delta = n | ||
198 | } | ||
199 | return f.delta | ||
200 | } | ||
201 | return 0 | ||
202 | } | ||
203 | |||
204 | // onData is invoked when some data frame is received. It updates pendingData. | ||
205 | func (f *inFlow) onData(n uint32) error { | ||
206 | f.mu.Lock() | ||
207 | defer f.mu.Unlock() | ||
208 | f.pendingData += n | ||
209 | if f.pendingData+f.pendingUpdate > f.limit+f.delta { | ||
210 | return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit) | ||
211 | } | ||
212 | return nil | ||
213 | } | ||
214 | |||
215 | // onRead is invoked when the application reads the data. It returns the window size | ||
216 | // to be sent to the peer. | ||
217 | func (f *inFlow) onRead(n uint32) uint32 { | ||
218 | f.mu.Lock() | ||
219 | defer f.mu.Unlock() | ||
220 | if f.pendingData == 0 { | ||
221 | return 0 | ||
222 | } | ||
223 | f.pendingData -= n | ||
224 | if n > f.delta { | ||
225 | n -= f.delta | ||
226 | f.delta = 0 | ||
227 | } else { | ||
228 | f.delta -= n | ||
229 | n = 0 | ||
230 | } | ||
231 | f.pendingUpdate += n | ||
232 | if f.pendingUpdate >= f.limit/4 { | ||
233 | wu := f.pendingUpdate | ||
234 | f.pendingUpdate = 0 | ||
235 | return wu | ||
236 | } | ||
237 | return 0 | ||
238 | } | ||
239 | |||
240 | func (f *inFlow) resetPendingUpdate() uint32 { | ||
241 | f.mu.Lock() | ||
242 | defer f.mu.Unlock() | ||
243 | n := f.pendingUpdate | ||
244 | f.pendingUpdate = 0 | ||
245 | return n | ||
246 | } | ||