]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | /* |
2 | * | |
3 | * Copyright 2014 gRPC authors. | |
4 | * | |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | * | |
17 | */ | |
18 | ||
19 | package transport | |
20 | ||
21 | import ( | |
22 | "fmt" | |
23 | "math" | |
24 | "sync" | |
25 | "time" | |
26 | ||
27 | "golang.org/x/net/http2" | |
28 | ) | |
29 | ||
const (
	// The default value of flow control window size in HTTP2 spec.
	defaultWindowSize = 65535
	// The initial window size for flow control.
	initialWindowSize = defaultWindowSize // for an RPC
	// infinity is the largest representable duration. It is the value of
	// every keepalive/connection-age default below that has no finite setting.
	infinity                      = time.Duration(math.MaxInt64)
	defaultClientKeepaliveTime    = infinity
	defaultClientKeepaliveTimeout = 20 * time.Second
	defaultMaxStreamsClient       = 100
	defaultMaxConnectionIdle      = infinity
	defaultMaxConnectionAge       = infinity
	defaultMaxConnectionAgeGrace  = infinity
	defaultServerKeepaliveTime    = 2 * time.Hour
	defaultServerKeepaliveTimeout = 20 * time.Second
	defaultKeepalivePolicyMinTime = 5 * time.Minute
	// max window limit set by HTTP2 Specs.
	maxWindowSize = math.MaxInt32
)
48 | ||
// The following defines various control items which could flow through
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, stream resetting, etc.

// windowUpdate is the control item describing a flow control window update.
type windowUpdate struct {
	// streamID identifies what the update applies to.
	// NOTE(review): presumably an HTTP/2 stream ID — confirm against the consumer.
	streamID uint32
	// increment is the number of bytes to add to the window.
	increment uint32
	// flush is carried along with the update.
	// NOTE(review): presumably asks the writer to flush after sending — confirm.
	flush bool
}

// item is an intentionally empty marker method.
// NOTE(review): presumably satisfies the control buffer's item interface,
// which is declared outside this section — confirm.
func (*windowUpdate) item() {}
59 | ||
60 | type settings struct { | |
61 | ack bool | |
62 | ss []http2.Setting | |
63 | } | |
64 | ||
65 | func (*settings) item() {} | |
66 | ||
67 | type resetStream struct { | |
68 | streamID uint32 | |
69 | code http2.ErrCode | |
70 | } | |
71 | ||
72 | func (*resetStream) item() {} | |
73 | ||
74 | type goAway struct { | |
75 | code http2.ErrCode | |
76 | debugData []byte | |
77 | headsUp bool | |
78 | closeConn bool | |
79 | } | |
80 | ||
81 | func (*goAway) item() {} | |
82 | ||
// flushIO is a control item that carries no data.
// NOTE(review): presumably a request to flush buffered writes — confirm
// against the consumer of the control buffer.
type flushIO struct{}

// item is an intentionally empty marker method.
// NOTE(review): presumably satisfies the control buffer's item interface,
// which is declared outside this section — confirm.
func (*flushIO) item() {}
87 | ||
// ping is the control item describing a ping.
type ping struct {
	// ack is carried with the item.
	// NOTE(review): presumably marks this as a ping acknowledgement — confirm.
	ack bool
	// data is the fixed 8-byte payload carried by the ping.
	data [8]byte
}

// item is an intentionally empty marker method.
// NOTE(review): presumably satisfies the control buffer's item interface,
// which is declared outside this section — confirm.
func (*ping) item() {}
94 | ||
// quotaPool is a pool which accumulates the quota and sends it to acquire()
// when it is available.
type quotaPool struct {
	// c carries the currently available (positive) quota, if any. It is
	// created with a buffer of one element by newQuotaPool.
	c chan int

	// mu is held for the whole of add, so concurrent adds are serialized.
	mu sync.Mutex
	// quota is the balance that is not currently offered on c. add moves it
	// onto c as soon as it becomes positive.
	quota int
}

// newQuotaPool creates a quotaPool which has quota q available to consume.
// A positive q is made available on the channel immediately; a zero or
// negative q is only recorded as the starting balance.
func newQuotaPool(q int) *quotaPool {
	qb := &quotaPool{
		c: make(chan int, 1),
	}
	if q > 0 {
		qb.c <- q
	} else {
		qb.quota = q
	}
	return qb
}

// add adjusts the pool by v, which may be negative. Any quota still waiting
// on the channel is taken back first and merged with v; if the resulting
// balance is positive it is offered on the channel again, otherwise it stays
// in the pool until a later add makes it positive.
func (qb *quotaPool) add(v int) {
	qb.mu.Lock()
	defer qb.mu.Unlock()
	// Reclaim whatever has been offered but not yet acquired.
	select {
	case n := <-qb.c:
		qb.quota += n
	default:
	}
	qb.quota += v
	if qb.quota <= 0 {
		return
	}
	// After the pool has been created, this is the only place that sends on
	// the channel. Since mu is held at this point and any quota that was sent
	// on the channel has been retrieved, we know that this code will always
	// place any positive quota value on the channel.
	select {
	case qb.c <- qb.quota:
		qb.quota = 0
	default:
	}
}

// acquire returns the channel on which available quota amounts are sent.
func (qb *quotaPool) acquire() <-chan int {
	return qb.c
}
146 | ||
147 | // inFlow deals with inbound flow control | |
148 | type inFlow struct { | |
149 | mu sync.Mutex | |
150 | // The inbound flow control limit for pending data. | |
151 | limit uint32 | |
152 | // pendingData is the overall data which have been received but not been | |
153 | // consumed by applications. | |
154 | pendingData uint32 | |
155 | // The amount of data the application has consumed but grpc has not sent | |
156 | // window update for them. Used to reduce window update frequency. | |
157 | pendingUpdate uint32 | |
158 | // delta is the extra window update given by receiver when an application | |
159 | // is reading data bigger in size than the inFlow limit. | |
160 | delta uint32 | |
161 | } | |
162 | ||
163 | // newLimit updates the inflow window to a new value n. | |
164 | // It assumes that n is always greater than the old limit. | |
165 | func (f *inFlow) newLimit(n uint32) uint32 { | |
166 | f.mu.Lock() | |
167 | defer f.mu.Unlock() | |
168 | d := n - f.limit | |
169 | f.limit = n | |
170 | return d | |
171 | } | |
172 | ||
173 | func (f *inFlow) maybeAdjust(n uint32) uint32 { | |
174 | if n > uint32(math.MaxInt32) { | |
175 | n = uint32(math.MaxInt32) | |
176 | } | |
177 | f.mu.Lock() | |
178 | defer f.mu.Unlock() | |
179 | // estSenderQuota is the receiver's view of the maximum number of bytes the sender | |
180 | // can send without a window update. | |
181 | estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate)) | |
182 | // estUntransmittedData is the maximum number of bytes the sends might not have put | |
183 | // on the wire yet. A value of 0 or less means that we have already received all or | |
184 | // more bytes than the application is requesting to read. | |
185 | estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative. | |
186 | // This implies that unless we send a window update, the sender won't be able to send all the bytes | |
187 | // for this message. Therefore we must send an update over the limit since there's an active read | |
188 | // request from the application. | |
189 | if estUntransmittedData > estSenderQuota { | |
190 | // Sender's window shouldn't go more than 2^31 - 1 as speecified in the HTTP spec. | |
191 | if f.limit+n > maxWindowSize { | |
192 | f.delta = maxWindowSize - f.limit | |
193 | } else { | |
194 | // Send a window update for the whole message and not just the difference between | |
195 | // estUntransmittedData and estSenderQuota. This will be helpful in case the message | |
196 | // is padded; We will fallback on the current available window(at least a 1/4th of the limit). | |
197 | f.delta = n | |
198 | } | |
199 | return f.delta | |
200 | } | |
201 | return 0 | |
202 | } | |
203 | ||
204 | // onData is invoked when some data frame is received. It updates pendingData. | |
205 | func (f *inFlow) onData(n uint32) error { | |
206 | f.mu.Lock() | |
207 | defer f.mu.Unlock() | |
208 | f.pendingData += n | |
209 | if f.pendingData+f.pendingUpdate > f.limit+f.delta { | |
210 | return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit) | |
211 | } | |
212 | return nil | |
213 | } | |
214 | ||
215 | // onRead is invoked when the application reads the data. It returns the window size | |
216 | // to be sent to the peer. | |
217 | func (f *inFlow) onRead(n uint32) uint32 { | |
218 | f.mu.Lock() | |
219 | defer f.mu.Unlock() | |
220 | if f.pendingData == 0 { | |
221 | return 0 | |
222 | } | |
223 | f.pendingData -= n | |
224 | if n > f.delta { | |
225 | n -= f.delta | |
226 | f.delta = 0 | |
227 | } else { | |
228 | f.delta -= n | |
229 | n = 0 | |
230 | } | |
231 | f.pendingUpdate += n | |
232 | if f.pendingUpdate >= f.limit/4 { | |
233 | wu := f.pendingUpdate | |
234 | f.pendingUpdate = 0 | |
235 | return wu | |
236 | } | |
237 | return 0 | |
238 | } | |
239 | ||
240 | func (f *inFlow) resetPendingUpdate() uint32 { | |
241 | f.mu.Lock() | |
242 | defer f.mu.Unlock() | |
243 | n := f.pendingUpdate | |
244 | f.pendingUpdate = 0 | |
245 | return n | |
246 | } |