]> git.immae.eu Git - github/fretlink/terraform-provider-statuscake.git/blobdiff - vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
Upgrade to 0.12
[github/fretlink/terraform-provider-statuscake.git] / vendor / google.golang.org / grpc / internal / transport / flowcontrol.go
similarity index 53%
rename from vendor/google.golang.org/grpc/transport/control.go
rename to vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
index 501eb03c49f74f1a7a82021071570eaddbec8700..5ea997a7e45b0fcd5e2086ceb76bfa47ddf6a61e 100644 (file)
@@ -22,128 +22,102 @@ import (
        "fmt"
        "math"
        "sync"
-       "time"
-
-       "golang.org/x/net/http2"
-)
-
-const (
-       // The default value of flow control window size in HTTP2 spec.
-       defaultWindowSize = 65535
-       // The initial window size for flow control.
-       initialWindowSize             = defaultWindowSize // for an RPC
-       infinity                      = time.Duration(math.MaxInt64)
-       defaultClientKeepaliveTime    = infinity
-       defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
-       defaultMaxStreamsClient       = 100
-       defaultMaxConnectionIdle      = infinity
-       defaultMaxConnectionAge       = infinity
-       defaultMaxConnectionAgeGrace  = infinity
-       defaultServerKeepaliveTime    = time.Duration(2 * time.Hour)
-       defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
-       defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
-       // max window limit set by HTTP2 Specs.
-       maxWindowSize = math.MaxInt32
+       "sync/atomic"
 )
 
-// The following defines various control items which could flow through
-// the control buffer of transport. They represent different aspects of
-// control tasks, e.g., flow control, settings, streaming resetting, etc.
-type windowUpdate struct {
-       streamID  uint32
-       increment uint32
-       flush     bool
-}
-
-func (*windowUpdate) item() {}
-
-type settings struct {
-       ack bool
-       ss  []http2.Setting
+// writeQuota is a soft limit on the amount of data a stream can
+// schedule before some of it is written out.
+type writeQuota struct {
+       quota int32
+       // get waits on read from when quota goes less than or equal to zero.
+       // replenish writes on it when quota goes positive again.
+       ch chan struct{}
+       // done is triggered in error case.
+       done <-chan struct{}
+       // replenish is called by loopyWriter to give quota back to.
+       // It is implemented as a field so that it can be updated
+       // by tests.
+       replenish func(n int)
+}
+
+func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota {
+       w := &writeQuota{
+               quota: sz,
+               ch:    make(chan struct{}, 1),
+               done:  done,
+       }
+       w.replenish = w.realReplenish
+       return w
 }
 
-func (*settings) item() {}
-
-type resetStream struct {
-       streamID uint32
-       code     http2.ErrCode
+func (w *writeQuota) get(sz int32) error {
+       for {
+               if atomic.LoadInt32(&w.quota) > 0 {
+                       atomic.AddInt32(&w.quota, -sz)
+                       return nil
+               }
+               select {
+               case <-w.ch:
+                       continue
+               case <-w.done:
+                       return errStreamDone
+               }
+       }
 }
 
-func (*resetStream) item() {}
-
-type goAway struct {
-       code      http2.ErrCode
-       debugData []byte
-       headsUp   bool
-       closeConn bool
+func (w *writeQuota) realReplenish(n int) {
+       sz := int32(n)
+       a := atomic.AddInt32(&w.quota, sz)
+       b := a - sz
+       if b <= 0 && a > 0 {
+               select {
+               case w.ch <- struct{}{}:
+               default:
+               }
+       }
 }
 
-func (*goAway) item() {}
-
-type flushIO struct {
+type trInFlow struct {
+       limit               uint32
+       unacked             uint32
+       effectiveWindowSize uint32
 }
 
-func (*flushIO) item() {}
-
-type ping struct {
-       ack  bool
-       data [8]byte
+func (f *trInFlow) newLimit(n uint32) uint32 {
+       d := n - f.limit
+       f.limit = n
+       f.updateEffectiveWindowSize()
+       return d
 }
 
-func (*ping) item() {}
-
-// quotaPool is a pool which accumulates the quota and sends it to acquire()
-// when it is available.
-type quotaPool struct {
-       c chan int
-
-       mu    sync.Mutex
-       quota int
+func (f *trInFlow) onData(n uint32) uint32 {
+       f.unacked += n
+       if f.unacked >= f.limit/4 {
+               w := f.unacked
+               f.unacked = 0
+               f.updateEffectiveWindowSize()
+               return w
+       }
+       f.updateEffectiveWindowSize()
+       return 0
 }
 
-// newQuotaPool creates a quotaPool which has quota q available to consume.
-func newQuotaPool(q int) *quotaPool {
-       qb := &quotaPool{
-               c: make(chan int, 1),
-       }
-       if q > 0 {
-               qb.c <- q
-       } else {
-               qb.quota = q
-       }
-       return qb
+func (f *trInFlow) reset() uint32 {
+       w := f.unacked
+       f.unacked = 0
+       f.updateEffectiveWindowSize()
+       return w
 }
 
-// add cancels the pending quota sent on acquired, incremented by v and sends
-// it back on acquire.
-func (qb *quotaPool) add(v int) {
-       qb.mu.Lock()
-       defer qb.mu.Unlock()
-       select {
-       case n := <-qb.c:
-               qb.quota += n
-       default:
-       }
-       qb.quota += v
-       if qb.quota <= 0 {
-               return
-       }
-       // After the pool has been created, this is the only place that sends on
-       // the channel. Since mu is held at this point and any quota that was sent
-       // on the channel has been retrieved, we know that this code will always
-       // place any positive quota value on the channel.
-       select {
-       case qb.c <- qb.quota:
-               qb.quota = 0
-       default:
-       }
+func (f *trInFlow) updateEffectiveWindowSize() {
+       atomic.StoreUint32(&f.effectiveWindowSize, f.limit-f.unacked)
 }
 
-// acquire returns the channel on which available quota amounts are sent.
-func (qb *quotaPool) acquire() <-chan int {
-       return qb.c
+func (f *trInFlow) getSize() uint32 {
+       return atomic.LoadUint32(&f.effectiveWindowSize)
 }
 
+// TODO(mmukhi): Simplify this code.
 // inFlow deals with inbound flow control
 type inFlow struct {
        mu sync.Mutex
@@ -164,9 +138,9 @@ type inFlow struct {
 // It assumes that n is always greater than the old limit.
 func (f *inFlow) newLimit(n uint32) uint32 {
        f.mu.Lock()
-       defer f.mu.Unlock()
        d := n - f.limit
        f.limit = n
+       f.mu.Unlock()
        return d
 }
 
@@ -175,7 +149,6 @@ func (f *inFlow) maybeAdjust(n uint32) uint32 {
                n = uint32(math.MaxInt32)
        }
        f.mu.Lock()
-       defer f.mu.Unlock()
        // estSenderQuota is the receiver's view of the maximum number of bytes the sender
        // can send without a window update.
        estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
@@ -187,7 +160,7 @@ func (f *inFlow) maybeAdjust(n uint32) uint32 {
        // for this message. Therefore we must send an update over the limit since there's an active read
        // request from the application.
        if estUntransmittedData > estSenderQuota {
-               // Sender's window shouldn't go more than 2^31 - 1 as speecified in the HTTP spec.
+               // Sender's window shouldn't go more than 2^31 - 1 as specified in the HTTP spec.
                if f.limit+n > maxWindowSize {
                        f.delta = maxWindowSize - f.limit
                } else {
@@ -196,19 +169,24 @@ func (f *inFlow) maybeAdjust(n uint32) uint32 {
                        // is padded; We will fallback on the current available window(at least a 1/4th of the limit).
                        f.delta = n
                }
+               f.mu.Unlock()
                return f.delta
        }
+       f.mu.Unlock()
        return 0
 }
 
 // onData is invoked when some data frame is received. It updates pendingData.
 func (f *inFlow) onData(n uint32) error {
        f.mu.Lock()
-       defer f.mu.Unlock()
        f.pendingData += n
        if f.pendingData+f.pendingUpdate > f.limit+f.delta {
-               return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit)
+               limit := f.limit
+               rcvd := f.pendingData + f.pendingUpdate
+               f.mu.Unlock()
+               return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit)
        }
+       f.mu.Unlock()
        return nil
 }
 
@@ -216,8 +194,8 @@ func (f *inFlow) onData(n uint32) error {
 // to be sent to the peer.
 func (f *inFlow) onRead(n uint32) uint32 {
        f.mu.Lock()
-       defer f.mu.Unlock()
        if f.pendingData == 0 {
+               f.mu.Unlock()
                return 0
        }
        f.pendingData -= n
@@ -232,15 +210,9 @@ func (f *inFlow) onRead(n uint32) uint32 {
        if f.pendingUpdate >= f.limit/4 {
                wu := f.pendingUpdate
                f.pendingUpdate = 0
+               f.mu.Unlock()
                return wu
        }
+       f.mu.Unlock()
        return 0
 }
-
-func (f *inFlow) resetPendingUpdate() uint32 {
-       f.mu.Lock()
-       defer f.mu.Unlock()
-       n := f.pendingUpdate
-       f.pendingUpdate = 0
-       return n
-}