]> git.immae.eu Git - github/fretlink/terraform-provider-statuscake.git/blobdiff - vendor/github.com/hashicorp/yamux/stream.go
Upgrade to 0.12
[github/fretlink/terraform-provider-statuscake.git] / vendor / github.com / hashicorp / yamux / stream.go
index d216e281ca1dbda6c5643dca851735e281d55840..aa23919739832b25e79d4c4787f6d5eaad8776b5 100644 (file)
@@ -47,8 +47,8 @@ type Stream struct {
        recvNotifyCh chan struct{}
        sendNotifyCh chan struct{}
 
-       readDeadline  time.Time
-       writeDeadline time.Time
+       readDeadline  atomic.Value // time.Time
+       writeDeadline atomic.Value // time.Time
 }
 
 // newStream is used to construct a new stream within
@@ -67,6 +67,8 @@ func newStream(session *Session, id uint32, state streamState) *Stream {
                recvNotifyCh: make(chan struct{}, 1),
                sendNotifyCh: make(chan struct{}, 1),
        }
+       s.readDeadline.Store(time.Time{})
+       s.writeDeadline.Store(time.Time{})
        return s
 }
 
@@ -122,8 +124,9 @@ START:
 WAIT:
        var timeout <-chan time.Time
        var timer *time.Timer
-       if !s.readDeadline.IsZero() {
-               delay := s.readDeadline.Sub(time.Now())
+       readDeadline := s.readDeadline.Load().(time.Time)
+       if !readDeadline.IsZero() {
+               delay := readDeadline.Sub(time.Now())
                timer = time.NewTimer(delay)
                timeout = timer.C
        }
@@ -188,7 +191,7 @@ START:
 
        // Send the header
        s.sendHdr.encode(typeData, flags, s.id, max)
-       if err := s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil {
+       if err = s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil {
                return 0, err
        }
 
@@ -200,8 +203,9 @@ START:
 
 WAIT:
        var timeout <-chan time.Time
-       if !s.writeDeadline.IsZero() {
-               delay := s.writeDeadline.Sub(time.Now())
+       writeDeadline := s.writeDeadline.Load().(time.Time)
+       if !writeDeadline.IsZero() {
+               delay := writeDeadline.Sub(time.Now())
                timeout = time.After(delay)
        }
        select {
@@ -238,18 +242,25 @@ func (s *Stream) sendWindowUpdate() error {
 
        // Determine the delta update
        max := s.session.config.MaxStreamWindowSize
-       delta := max - atomic.LoadUint32(&s.recvWindow)
+       var bufLen uint32
+       s.recvLock.Lock()
+       if s.recvBuf != nil {
+               bufLen = uint32(s.recvBuf.Len())
+       }
+       delta := (max - bufLen) - s.recvWindow
 
        // Determine the flags if any
        flags := s.sendFlags()
 
        // Check if we can omit the update
        if delta < (max/2) && flags == 0 {
+               s.recvLock.Unlock()
                return nil
        }
 
        // Update our window
-       atomic.AddUint32(&s.recvWindow, delta)
+       s.recvWindow += delta
+       s.recvLock.Unlock()
 
        // Send the header
        s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta)
@@ -392,16 +403,18 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
        if length == 0 {
                return nil
        }
-       if remain := atomic.LoadUint32(&s.recvWindow); length > remain {
-               s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, remain, length)
-               return ErrRecvWindowExceeded
-       }
 
        // Wrap in a limited reader
        conn = &io.LimitedReader{R: conn, N: int64(length)}
 
        // Copy into buffer
        s.recvLock.Lock()
+
+       if length > s.recvWindow {
+               s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, s.recvWindow, length)
+               return ErrRecvWindowExceeded
+       }
+
        if s.recvBuf == nil {
                // Allocate the receive buffer just-in-time to fit the full data frame.
                // This way we can read in the whole packet without further allocations.
@@ -414,7 +427,7 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
        }
 
        // Decrement the receive window
-       atomic.AddUint32(&s.recvWindow, ^uint32(length-1))
+       s.recvWindow -= length
        s.recvLock.Unlock()
 
        // Unblock any readers
@@ -435,13 +448,13 @@ func (s *Stream) SetDeadline(t time.Time) error {
 
 // SetReadDeadline sets the deadline for future Read calls.
 func (s *Stream) SetReadDeadline(t time.Time) error {
-       s.readDeadline = t
+       s.readDeadline.Store(t)
        return nil
 }
 
 // SetWriteDeadline sets the deadline for future Write calls
 func (s *Stream) SetWriteDeadline(t time.Time) error {
-       s.writeDeadline = t
+       s.writeDeadline.Store(t)
        return nil
 }