diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/transport/flowcontrol.go')
-rw-r--r-- | vendor/google.golang.org/grpc/internal/transport/flowcontrol.go | 218 |
1 files changed, 218 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go b/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go new file mode 100644 index 0000000..5ea997a --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/transport/flowcontrol.go | |||
@@ -0,0 +1,218 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2014 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | package transport | ||
20 | |||
21 | import ( | ||
22 | "fmt" | ||
23 | "math" | ||
24 | "sync" | ||
25 | "sync/atomic" | ||
26 | ) | ||
27 | |||
28 | // writeQuota is a soft limit on the amount of data a stream can | ||
29 | // schedule before some of it is written out. | ||
30 | type writeQuota struct { | ||
31 | quota int32 | ||
32 | // get waits on read from when quota goes less than or equal to zero. | ||
33 | // replenish writes on it when quota goes positive again. | ||
34 | ch chan struct{} | ||
35 | // done is triggered in error case. | ||
36 | done <-chan struct{} | ||
37 | // replenish is called by loopyWriter to give quota back to. | ||
38 | // It is implemented as a field so that it can be updated | ||
39 | // by tests. | ||
40 | replenish func(n int) | ||
41 | } | ||
42 | |||
43 | func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota { | ||
44 | w := &writeQuota{ | ||
45 | quota: sz, | ||
46 | ch: make(chan struct{}, 1), | ||
47 | done: done, | ||
48 | } | ||
49 | w.replenish = w.realReplenish | ||
50 | return w | ||
51 | } | ||
52 | |||
53 | func (w *writeQuota) get(sz int32) error { | ||
54 | for { | ||
55 | if atomic.LoadInt32(&w.quota) > 0 { | ||
56 | atomic.AddInt32(&w.quota, -sz) | ||
57 | return nil | ||
58 | } | ||
59 | select { | ||
60 | case <-w.ch: | ||
61 | continue | ||
62 | case <-w.done: | ||
63 | return errStreamDone | ||
64 | } | ||
65 | } | ||
66 | } | ||
67 | |||
68 | func (w *writeQuota) realReplenish(n int) { | ||
69 | sz := int32(n) | ||
70 | a := atomic.AddInt32(&w.quota, sz) | ||
71 | b := a - sz | ||
72 | if b <= 0 && a > 0 { | ||
73 | select { | ||
74 | case w.ch <- struct{}{}: | ||
75 | default: | ||
76 | } | ||
77 | } | ||
78 | } | ||
79 | |||
80 | type trInFlow struct { | ||
81 | limit uint32 | ||
82 | unacked uint32 | ||
83 | effectiveWindowSize uint32 | ||
84 | } | ||
85 | |||
86 | func (f *trInFlow) newLimit(n uint32) uint32 { | ||
87 | d := n - f.limit | ||
88 | f.limit = n | ||
89 | f.updateEffectiveWindowSize() | ||
90 | return d | ||
91 | } | ||
92 | |||
93 | func (f *trInFlow) onData(n uint32) uint32 { | ||
94 | f.unacked += n | ||
95 | if f.unacked >= f.limit/4 { | ||
96 | w := f.unacked | ||
97 | f.unacked = 0 | ||
98 | f.updateEffectiveWindowSize() | ||
99 | return w | ||
100 | } | ||
101 | f.updateEffectiveWindowSize() | ||
102 | return 0 | ||
103 | } | ||
104 | |||
105 | func (f *trInFlow) reset() uint32 { | ||
106 | w := f.unacked | ||
107 | f.unacked = 0 | ||
108 | f.updateEffectiveWindowSize() | ||
109 | return w | ||
110 | } | ||
111 | |||
112 | func (f *trInFlow) updateEffectiveWindowSize() { | ||
113 | atomic.StoreUint32(&f.effectiveWindowSize, f.limit-f.unacked) | ||
114 | } | ||
115 | |||
116 | func (f *trInFlow) getSize() uint32 { | ||
117 | return atomic.LoadUint32(&f.effectiveWindowSize) | ||
118 | } | ||
119 | |||
120 | // TODO(mmukhi): Simplify this code. | ||
121 | // inFlow deals with inbound flow control | ||
122 | type inFlow struct { | ||
123 | mu sync.Mutex | ||
124 | // The inbound flow control limit for pending data. | ||
125 | limit uint32 | ||
126 | // pendingData is the overall data which have been received but not been | ||
127 | // consumed by applications. | ||
128 | pendingData uint32 | ||
129 | // The amount of data the application has consumed but grpc has not sent | ||
130 | // window update for them. Used to reduce window update frequency. | ||
131 | pendingUpdate uint32 | ||
132 | // delta is the extra window update given by receiver when an application | ||
133 | // is reading data bigger in size than the inFlow limit. | ||
134 | delta uint32 | ||
135 | } | ||
136 | |||
137 | // newLimit updates the inflow window to a new value n. | ||
138 | // It assumes that n is always greater than the old limit. | ||
139 | func (f *inFlow) newLimit(n uint32) uint32 { | ||
140 | f.mu.Lock() | ||
141 | d := n - f.limit | ||
142 | f.limit = n | ||
143 | f.mu.Unlock() | ||
144 | return d | ||
145 | } | ||
146 | |||
147 | func (f *inFlow) maybeAdjust(n uint32) uint32 { | ||
148 | if n > uint32(math.MaxInt32) { | ||
149 | n = uint32(math.MaxInt32) | ||
150 | } | ||
151 | f.mu.Lock() | ||
152 | // estSenderQuota is the receiver's view of the maximum number of bytes the sender | ||
153 | // can send without a window update. | ||
154 | estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate)) | ||
155 | // estUntransmittedData is the maximum number of bytes the sends might not have put | ||
156 | // on the wire yet. A value of 0 or less means that we have already received all or | ||
157 | // more bytes than the application is requesting to read. | ||
158 | estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative. | ||
159 | // This implies that unless we send a window update, the sender won't be able to send all the bytes | ||
160 | // for this message. Therefore we must send an update over the limit since there's an active read | ||
161 | // request from the application. | ||
162 | if estUntransmittedData > estSenderQuota { | ||
163 | // Sender's window shouldn't go more than 2^31 - 1 as specified in the HTTP spec. | ||
164 | if f.limit+n > maxWindowSize { | ||
165 | f.delta = maxWindowSize - f.limit | ||
166 | } else { | ||
167 | // Send a window update for the whole message and not just the difference between | ||
168 | // estUntransmittedData and estSenderQuota. This will be helpful in case the message | ||
169 | // is padded; We will fallback on the current available window(at least a 1/4th of the limit). | ||
170 | f.delta = n | ||
171 | } | ||
172 | f.mu.Unlock() | ||
173 | return f.delta | ||
174 | } | ||
175 | f.mu.Unlock() | ||
176 | return 0 | ||
177 | } | ||
178 | |||
179 | // onData is invoked when some data frame is received. It updates pendingData. | ||
180 | func (f *inFlow) onData(n uint32) error { | ||
181 | f.mu.Lock() | ||
182 | f.pendingData += n | ||
183 | if f.pendingData+f.pendingUpdate > f.limit+f.delta { | ||
184 | limit := f.limit | ||
185 | rcvd := f.pendingData + f.pendingUpdate | ||
186 | f.mu.Unlock() | ||
187 | return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit) | ||
188 | } | ||
189 | f.mu.Unlock() | ||
190 | return nil | ||
191 | } | ||
192 | |||
193 | // onRead is invoked when the application reads the data. It returns the window size | ||
194 | // to be sent to the peer. | ||
195 | func (f *inFlow) onRead(n uint32) uint32 { | ||
196 | f.mu.Lock() | ||
197 | if f.pendingData == 0 { | ||
198 | f.mu.Unlock() | ||
199 | return 0 | ||
200 | } | ||
201 | f.pendingData -= n | ||
202 | if n > f.delta { | ||
203 | n -= f.delta | ||
204 | f.delta = 0 | ||
205 | } else { | ||
206 | f.delta -= n | ||
207 | n = 0 | ||
208 | } | ||
209 | f.pendingUpdate += n | ||
210 | if f.pendingUpdate >= f.limit/4 { | ||
211 | wu := f.pendingUpdate | ||
212 | f.pendingUpdate = 0 | ||
213 | f.mu.Unlock() | ||
214 | return wu | ||
215 | } | ||
216 | f.mu.Unlock() | ||
217 | return 0 | ||
218 | } | ||