X-Git-Url: https://git.immae.eu/?a=blobdiff_plain;f=vendor%2Fgithub.com%2Fhashicorp%2Fyamux%2Fstream.go;h=aa23919739832b25e79d4c4787f6d5eaad8776b5;hb=107c1cdb09c575aa2f61d97f48d8587eb6bada4c;hp=d216e281ca1dbda6c5643dca851735e281d55840;hpb=cec3de8a3bcaffd21dedd1bf42da4b490cae7e16;p=github%2Ffretlink%2Fterraform-provider-statuscake.git diff --git a/vendor/github.com/hashicorp/yamux/stream.go b/vendor/github.com/hashicorp/yamux/stream.go index d216e28..aa23919 100644 --- a/vendor/github.com/hashicorp/yamux/stream.go +++ b/vendor/github.com/hashicorp/yamux/stream.go @@ -47,8 +47,8 @@ type Stream struct { recvNotifyCh chan struct{} sendNotifyCh chan struct{} - readDeadline time.Time - writeDeadline time.Time + readDeadline atomic.Value // time.Time + writeDeadline atomic.Value // time.Time } // newStream is used to construct a new stream within @@ -67,6 +67,8 @@ func newStream(session *Session, id uint32, state streamState) *Stream { recvNotifyCh: make(chan struct{}, 1), sendNotifyCh: make(chan struct{}, 1), } + s.readDeadline.Store(time.Time{}) + s.writeDeadline.Store(time.Time{}) return s } @@ -122,8 +124,9 @@ START: WAIT: var timeout <-chan time.Time var timer *time.Timer - if !s.readDeadline.IsZero() { - delay := s.readDeadline.Sub(time.Now()) + readDeadline := s.readDeadline.Load().(time.Time) + if !readDeadline.IsZero() { + delay := readDeadline.Sub(time.Now()) timer = time.NewTimer(delay) timeout = timer.C } @@ -188,7 +191,7 @@ START: // Send the header s.sendHdr.encode(typeData, flags, s.id, max) - if err := s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil { + if err = s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil { return 0, err } @@ -200,8 +203,9 @@ START: WAIT: var timeout <-chan time.Time - if !s.writeDeadline.IsZero() { - delay := s.writeDeadline.Sub(time.Now()) + writeDeadline := s.writeDeadline.Load().(time.Time) + if !writeDeadline.IsZero() { + delay := writeDeadline.Sub(time.Now()) timeout = time.After(delay) } select { @@ -238,18 +242,25 @@ func (s *Stream) sendWindowUpdate() error { // Determine the delta update max := s.session.config.MaxStreamWindowSize - delta := max - atomic.LoadUint32(&s.recvWindow) + var bufLen uint32 + s.recvLock.Lock() + if s.recvBuf != nil { + bufLen = uint32(s.recvBuf.Len()) + } + delta := (max - bufLen) - s.recvWindow // Determine the flags if any flags := s.sendFlags() // Check if we can omit the update if delta < (max/2) && flags == 0 { + s.recvLock.Unlock() return nil } // Update our window - atomic.AddUint32(&s.recvWindow, delta) + s.recvWindow += delta + s.recvLock.Unlock() // Send the header s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta) @@ -392,16 +403,18 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error { if length == 0 { return nil } - if remain := atomic.LoadUint32(&s.recvWindow); length > remain { - s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, remain, length) - return ErrRecvWindowExceeded - } // Wrap in a limited reader conn = &io.LimitedReader{R: conn, N: int64(length)} // Copy into buffer s.recvLock.Lock() + + if length > s.recvWindow { + s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, s.recvWindow, length) + return ErrRecvWindowExceeded + } + if s.recvBuf == nil { // Allocate the receive buffer just-in-time to fit the full data frame. // This way we can read in the whole packet without further allocations. @@ -414,7 +427,7 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error { } // Decrement the receive window - atomic.AddUint32(&s.recvWindow, ^uint32(length-1)) + s.recvWindow -= length s.recvLock.Unlock() // Unblock any readers @@ -435,13 +448,13 @@ func (s *Stream) SetDeadline(t time.Time) error { // SetReadDeadline sets the deadline for future Read calls. func (s *Stream) SetReadDeadline(t time.Time) error { - s.readDeadline = t + s.readDeadline.Store(t) return nil } // SetWriteDeadline sets the deadline for future Write calls func (s *Stream) SetWriteDeadline(t time.Time) error { - s.writeDeadline = t + s.writeDeadline.Store(t) return nil }