"fmt"
"math"
"sync"
- "time"
-
- "golang.org/x/net/http2"
-)
-
-const (
- // The default value of flow control window size in HTTP2 spec.
- defaultWindowSize = 65535
- // The initial window size for flow control.
- initialWindowSize = defaultWindowSize // for an RPC
- infinity = time.Duration(math.MaxInt64)
- defaultClientKeepaliveTime = infinity
- defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
- defaultMaxStreamsClient = 100
- defaultMaxConnectionIdle = infinity
- defaultMaxConnectionAge = infinity
- defaultMaxConnectionAgeGrace = infinity
- defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
- defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
- defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
- // max window limit set by HTTP2 Specs.
- maxWindowSize = math.MaxInt32
+ "sync/atomic"
)
-// The following defines various control items which could flow through
-// the control buffer of transport. They represent different aspects of
-// control tasks, e.g., flow control, settings, streaming resetting, etc.
-type windowUpdate struct {
- streamID uint32
- increment uint32
- flush bool
-}
-
-func (*windowUpdate) item() {}
-
-type settings struct {
- ack bool
- ss []http2.Setting
+// writeQuota is a soft limit on the amount of data a stream can
+// schedule before some of it is written out.
+type writeQuota struct {
+ quota int32
+ // get waits on read from when quota goes less than or equal to zero.
+ // replenish writes on it when quota goes positive again.
+ ch chan struct{}
+ // done is triggered in error case.
+ done <-chan struct{}
+ // replenish is called by loopyWriter to give quota back to.
+ // It is implemented as a field so that it can be updated
+ // by tests.
+ replenish func(n int)
+}
+
+func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota {
+ w := &writeQuota{
+ quota: sz,
+ ch: make(chan struct{}, 1),
+ done: done,
+ }
+ w.replenish = w.realReplenish
+ return w
}
-func (*settings) item() {}
-
-type resetStream struct {
- streamID uint32
- code http2.ErrCode
+func (w *writeQuota) get(sz int32) error {
+ for {
+ if atomic.LoadInt32(&w.quota) > 0 {
+ atomic.AddInt32(&w.quota, -sz)
+ return nil
+ }
+ select {
+ case <-w.ch:
+ continue
+ case <-w.done:
+ return errStreamDone
+ }
+ }
}
-func (*resetStream) item() {}
-
-type goAway struct {
- code http2.ErrCode
- debugData []byte
- headsUp bool
- closeConn bool
+func (w *writeQuota) realReplenish(n int) {
+ sz := int32(n)
+ a := atomic.AddInt32(&w.quota, sz)
+ b := a - sz
+ if b <= 0 && a > 0 {
+ select {
+ case w.ch <- struct{}{}:
+ default:
+ }
+ }
}
-func (*goAway) item() {}
-
-type flushIO struct {
+type trInFlow struct {
+ limit uint32
+ unacked uint32
+ effectiveWindowSize uint32
}
-func (*flushIO) item() {}
-
-type ping struct {
- ack bool
- data [8]byte
+func (f *trInFlow) newLimit(n uint32) uint32 {
+ d := n - f.limit
+ f.limit = n
+ f.updateEffectiveWindowSize()
+ return d
}
-func (*ping) item() {}
-
-// quotaPool is a pool which accumulates the quota and sends it to acquire()
-// when it is available.
-type quotaPool struct {
- c chan int
-
- mu sync.Mutex
- quota int
+func (f *trInFlow) onData(n uint32) uint32 {
+ f.unacked += n
+ if f.unacked >= f.limit/4 {
+ w := f.unacked
+ f.unacked = 0
+ f.updateEffectiveWindowSize()
+ return w
+ }
+ f.updateEffectiveWindowSize()
+ return 0
}
-// newQuotaPool creates a quotaPool which has quota q available to consume.
-func newQuotaPool(q int) *quotaPool {
- qb := "aPool{
- c: make(chan int, 1),
- }
- if q > 0 {
- qb.c <- q
- } else {
- qb.quota = q
- }
- return qb
+func (f *trInFlow) reset() uint32 {
+ w := f.unacked
+ f.unacked = 0
+ f.updateEffectiveWindowSize()
+ return w
}
-// add cancels the pending quota sent on acquired, incremented by v and sends
-// it back on acquire.
-func (qb *quotaPool) add(v int) {
- qb.mu.Lock()
- defer qb.mu.Unlock()
- select {
- case n := <-qb.c:
- qb.quota += n
- default:
- }
- qb.quota += v
- if qb.quota <= 0 {
- return
- }
- // After the pool has been created, this is the only place that sends on
- // the channel. Since mu is held at this point and any quota that was sent
- // on the channel has been retrieved, we know that this code will always
- // place any positive quota value on the channel.
- select {
- case qb.c <- qb.quota:
- qb.quota = 0
- default:
- }
+func (f *trInFlow) updateEffectiveWindowSize() {
+ atomic.StoreUint32(&f.effectiveWindowSize, f.limit-f.unacked)
}
-// acquire returns the channel on which available quota amounts are sent.
-func (qb *quotaPool) acquire() <-chan int {
- return qb.c
+func (f *trInFlow) getSize() uint32 {
+ return atomic.LoadUint32(&f.effectiveWindowSize)
}
+// TODO(mmukhi): Simplify this code.
// inFlow deals with inbound flow control
type inFlow struct {
mu sync.Mutex
// It assumes that n is always greater than the old limit.
func (f *inFlow) newLimit(n uint32) uint32 {
f.mu.Lock()
- defer f.mu.Unlock()
d := n - f.limit
f.limit = n
+ f.mu.Unlock()
return d
}
n = uint32(math.MaxInt32)
}
f.mu.Lock()
- defer f.mu.Unlock()
// estSenderQuota is the receiver's view of the maximum number of bytes the sender
// can send without a window update.
estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
// for this message. Therefore we must send an update over the limit since there's an active read
// request from the application.
if estUntransmittedData > estSenderQuota {
- // Sender's window shouldn't go more than 2^31 - 1 as speecified in the HTTP spec.
+ // Sender's window shouldn't go more than 2^31 - 1 as specified in the HTTP spec.
if f.limit+n > maxWindowSize {
f.delta = maxWindowSize - f.limit
} else {
// is padded; We will fallback on the current available window(at least a 1/4th of the limit).
f.delta = n
}
+ f.mu.Unlock()
return f.delta
}
+ f.mu.Unlock()
return 0
}
// onData is invoked when some data frame is received. It updates pendingData.
func (f *inFlow) onData(n uint32) error {
f.mu.Lock()
- defer f.mu.Unlock()
f.pendingData += n
if f.pendingData+f.pendingUpdate > f.limit+f.delta {
- return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit)
+ limit := f.limit
+ rcvd := f.pendingData + f.pendingUpdate
+ f.mu.Unlock()
+ return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit)
}
+ f.mu.Unlock()
return nil
}
// to be sent to the peer.
func (f *inFlow) onRead(n uint32) uint32 {
f.mu.Lock()
- defer f.mu.Unlock()
if f.pendingData == 0 {
+ f.mu.Unlock()
return 0
}
f.pendingData -= n
if f.pendingUpdate >= f.limit/4 {
wu := f.pendingUpdate
f.pendingUpdate = 0
+ f.mu.Unlock()
return wu
}
+ f.mu.Unlock()
return 0
}
-
-func (f *inFlow) resetPendingUpdate() uint32 {
- f.mu.Lock()
- defer f.mu.Unlock()
- n := f.pendingUpdate
- f.pendingUpdate = 0
- return n
-}