aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/google.golang.org/grpc/transport/control.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/transport/control.go')
-rw-r--r--vendor/google.golang.org/grpc/transport/control.go246
1 files changed, 246 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/transport/control.go b/vendor/google.golang.org/grpc/transport/control.go
new file mode 100644
index 0000000..501eb03
--- /dev/null
+++ b/vendor/google.golang.org/grpc/transport/control.go
@@ -0,0 +1,246 @@
1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19package transport
20
21import (
22 "fmt"
23 "math"
24 "sync"
25 "time"
26
27 "golang.org/x/net/http2"
28)
29
30const (
31 // The default value of flow control window size in HTTP2 spec.
32 defaultWindowSize = 65535
33 // The initial window size for flow control.
34 initialWindowSize = defaultWindowSize // for an RPC
35 infinity = time.Duration(math.MaxInt64)
36 defaultClientKeepaliveTime = infinity
37 defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
38 defaultMaxStreamsClient = 100
39 defaultMaxConnectionIdle = infinity
40 defaultMaxConnectionAge = infinity
41 defaultMaxConnectionAgeGrace = infinity
42 defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
43 defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
44 defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
45 // max window limit set by HTTP2 Specs.
46 maxWindowSize = math.MaxInt32
47)
48
49// The following defines various control items which could flow through
50// the control buffer of transport. They represent different aspects of
51// control tasks, e.g., flow control, settings, streaming resetting, etc.
52type windowUpdate struct {
53 streamID uint32
54 increment uint32
55 flush bool
56}
57
58func (*windowUpdate) item() {}
59
60type settings struct {
61 ack bool
62 ss []http2.Setting
63}
64
65func (*settings) item() {}
66
67type resetStream struct {
68 streamID uint32
69 code http2.ErrCode
70}
71
72func (*resetStream) item() {}
73
74type goAway struct {
75 code http2.ErrCode
76 debugData []byte
77 headsUp bool
78 closeConn bool
79}
80
81func (*goAway) item() {}
82
83type flushIO struct {
84}
85
86func (*flushIO) item() {}
87
88type ping struct {
89 ack bool
90 data [8]byte
91}
92
93func (*ping) item() {}
94
95// quotaPool is a pool which accumulates the quota and sends it to acquire()
96// when it is available.
97type quotaPool struct {
98 c chan int
99
100 mu sync.Mutex
101 quota int
102}
103
104// newQuotaPool creates a quotaPool which has quota q available to consume.
105func newQuotaPool(q int) *quotaPool {
106 qb := &quotaPool{
107 c: make(chan int, 1),
108 }
109 if q > 0 {
110 qb.c <- q
111 } else {
112 qb.quota = q
113 }
114 return qb
115}
116
117// add cancels the pending quota sent on acquired, incremented by v and sends
118// it back on acquire.
119func (qb *quotaPool) add(v int) {
120 qb.mu.Lock()
121 defer qb.mu.Unlock()
122 select {
123 case n := <-qb.c:
124 qb.quota += n
125 default:
126 }
127 qb.quota += v
128 if qb.quota <= 0 {
129 return
130 }
131 // After the pool has been created, this is the only place that sends on
132 // the channel. Since mu is held at this point and any quota that was sent
133 // on the channel has been retrieved, we know that this code will always
134 // place any positive quota value on the channel.
135 select {
136 case qb.c <- qb.quota:
137 qb.quota = 0
138 default:
139 }
140}
141
142// acquire returns the channel on which available quota amounts are sent.
143func (qb *quotaPool) acquire() <-chan int {
144 return qb.c
145}
146
147// inFlow deals with inbound flow control
148type inFlow struct {
149 mu sync.Mutex
150 // The inbound flow control limit for pending data.
151 limit uint32
152 // pendingData is the overall data which have been received but not been
153 // consumed by applications.
154 pendingData uint32
155 // The amount of data the application has consumed but grpc has not sent
156 // window update for them. Used to reduce window update frequency.
157 pendingUpdate uint32
158 // delta is the extra window update given by receiver when an application
159 // is reading data bigger in size than the inFlow limit.
160 delta uint32
161}
162
163// newLimit updates the inflow window to a new value n.
164// It assumes that n is always greater than the old limit.
165func (f *inFlow) newLimit(n uint32) uint32 {
166 f.mu.Lock()
167 defer f.mu.Unlock()
168 d := n - f.limit
169 f.limit = n
170 return d
171}
172
173func (f *inFlow) maybeAdjust(n uint32) uint32 {
174 if n > uint32(math.MaxInt32) {
175 n = uint32(math.MaxInt32)
176 }
177 f.mu.Lock()
178 defer f.mu.Unlock()
179 // estSenderQuota is the receiver's view of the maximum number of bytes the sender
180 // can send without a window update.
181 estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
182 // estUntransmittedData is the maximum number of bytes the sends might not have put
183 // on the wire yet. A value of 0 or less means that we have already received all or
184 // more bytes than the application is requesting to read.
185 estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative.
186 // This implies that unless we send a window update, the sender won't be able to send all the bytes
187 // for this message. Therefore we must send an update over the limit since there's an active read
188 // request from the application.
189 if estUntransmittedData > estSenderQuota {
190 // Sender's window shouldn't go more than 2^31 - 1 as speecified in the HTTP spec.
191 if f.limit+n > maxWindowSize {
192 f.delta = maxWindowSize - f.limit
193 } else {
194 // Send a window update for the whole message and not just the difference between
195 // estUntransmittedData and estSenderQuota. This will be helpful in case the message
196 // is padded; We will fallback on the current available window(at least a 1/4th of the limit).
197 f.delta = n
198 }
199 return f.delta
200 }
201 return 0
202}
203
204// onData is invoked when some data frame is received. It updates pendingData.
205func (f *inFlow) onData(n uint32) error {
206 f.mu.Lock()
207 defer f.mu.Unlock()
208 f.pendingData += n
209 if f.pendingData+f.pendingUpdate > f.limit+f.delta {
210 return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit)
211 }
212 return nil
213}
214
215// onRead is invoked when the application reads the data. It returns the window size
216// to be sent to the peer.
217func (f *inFlow) onRead(n uint32) uint32 {
218 f.mu.Lock()
219 defer f.mu.Unlock()
220 if f.pendingData == 0 {
221 return 0
222 }
223 f.pendingData -= n
224 if n > f.delta {
225 n -= f.delta
226 f.delta = 0
227 } else {
228 f.delta -= n
229 n = 0
230 }
231 f.pendingUpdate += n
232 if f.pendingUpdate >= f.limit/4 {
233 wu := f.pendingUpdate
234 f.pendingUpdate = 0
235 return wu
236 }
237 return 0
238}
239
240func (f *inFlow) resetPendingUpdate() uint32 {
241 f.mu.Lock()
242 defer f.mu.Unlock()
243 n := f.pendingUpdate
244 f.pendingUpdate = 0
245 return n
246}