aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/github.com/hashicorp/yamux/stream.go
diff options
context:
space:
mode:
authorNathan Dench <ndenc2@gmail.com>2019-05-24 15:16:44 +1000
committerNathan Dench <ndenc2@gmail.com>2019-05-24 15:16:44 +1000
commit107c1cdb09c575aa2f61d97f48d8587eb6bada4c (patch)
treeca7d008643efc555c388baeaf1d986e0b6b3e28c /vendor/github.com/hashicorp/yamux/stream.go
parent844b5a68d8af4791755b8f0ad293cc99f5959183 (diff)
downloadterraform-provider-statuscake-107c1cdb09c575aa2f61d97f48d8587eb6bada4c.tar.gz
terraform-provider-statuscake-107c1cdb09c575aa2f61d97f48d8587eb6bada4c.tar.zst
terraform-provider-statuscake-107c1cdb09c575aa2f61d97f48d8587eb6bada4c.zip
Upgrade to 0.12
Diffstat (limited to 'vendor/github.com/hashicorp/yamux/stream.go')
-rw-r--r--vendor/github.com/hashicorp/yamux/stream.go45
1 files changed, 29 insertions, 16 deletions
diff --git a/vendor/github.com/hashicorp/yamux/stream.go b/vendor/github.com/hashicorp/yamux/stream.go
index d216e28..aa23919 100644
--- a/vendor/github.com/hashicorp/yamux/stream.go
+++ b/vendor/github.com/hashicorp/yamux/stream.go
@@ -47,8 +47,8 @@ type Stream struct {
47 recvNotifyCh chan struct{} 47 recvNotifyCh chan struct{}
48 sendNotifyCh chan struct{} 48 sendNotifyCh chan struct{}
49 49
50 readDeadline time.Time 50 readDeadline atomic.Value // time.Time
51 writeDeadline time.Time 51 writeDeadline atomic.Value // time.Time
52} 52}
53 53
54// newStream is used to construct a new stream within 54// newStream is used to construct a new stream within
@@ -67,6 +67,8 @@ func newStream(session *Session, id uint32, state streamState) *Stream {
67 recvNotifyCh: make(chan struct{}, 1), 67 recvNotifyCh: make(chan struct{}, 1),
68 sendNotifyCh: make(chan struct{}, 1), 68 sendNotifyCh: make(chan struct{}, 1),
69 } 69 }
70 s.readDeadline.Store(time.Time{})
71 s.writeDeadline.Store(time.Time{})
70 return s 72 return s
71} 73}
72 74
@@ -122,8 +124,9 @@ START:
122WAIT: 124WAIT:
123 var timeout <-chan time.Time 125 var timeout <-chan time.Time
124 var timer *time.Timer 126 var timer *time.Timer
125 if !s.readDeadline.IsZero() { 127 readDeadline := s.readDeadline.Load().(time.Time)
126 delay := s.readDeadline.Sub(time.Now()) 128 if !readDeadline.IsZero() {
129 delay := readDeadline.Sub(time.Now())
127 timer = time.NewTimer(delay) 130 timer = time.NewTimer(delay)
128 timeout = timer.C 131 timeout = timer.C
129 } 132 }
@@ -188,7 +191,7 @@ START:
188 191
189 // Send the header 192 // Send the header
190 s.sendHdr.encode(typeData, flags, s.id, max) 193 s.sendHdr.encode(typeData, flags, s.id, max)
191 if err := s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil { 194 if err = s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil {
192 return 0, err 195 return 0, err
193 } 196 }
194 197
@@ -200,8 +203,9 @@ START:
200 203
201WAIT: 204WAIT:
202 var timeout <-chan time.Time 205 var timeout <-chan time.Time
203 if !s.writeDeadline.IsZero() { 206 writeDeadline := s.writeDeadline.Load().(time.Time)
204 delay := s.writeDeadline.Sub(time.Now()) 207 if !writeDeadline.IsZero() {
208 delay := writeDeadline.Sub(time.Now())
205 timeout = time.After(delay) 209 timeout = time.After(delay)
206 } 210 }
207 select { 211 select {
@@ -238,18 +242,25 @@ func (s *Stream) sendWindowUpdate() error {
238 242
239 // Determine the delta update 243 // Determine the delta update
240 max := s.session.config.MaxStreamWindowSize 244 max := s.session.config.MaxStreamWindowSize
241 delta := max - atomic.LoadUint32(&s.recvWindow) 245 var bufLen uint32
246 s.recvLock.Lock()
247 if s.recvBuf != nil {
248 bufLen = uint32(s.recvBuf.Len())
249 }
250 delta := (max - bufLen) - s.recvWindow
242 251
243 // Determine the flags if any 252 // Determine the flags if any
244 flags := s.sendFlags() 253 flags := s.sendFlags()
245 254
246 // Check if we can omit the update 255 // Check if we can omit the update
247 if delta < (max/2) && flags == 0 { 256 if delta < (max/2) && flags == 0 {
257 s.recvLock.Unlock()
248 return nil 258 return nil
249 } 259 }
250 260
251 // Update our window 261 // Update our window
252 atomic.AddUint32(&s.recvWindow, delta) 262 s.recvWindow += delta
263 s.recvLock.Unlock()
253 264
254 // Send the header 265 // Send the header
255 s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta) 266 s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta)
@@ -392,16 +403,18 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
392 if length == 0 { 403 if length == 0 {
393 return nil 404 return nil
394 } 405 }
395 if remain := atomic.LoadUint32(&s.recvWindow); length > remain {
396 s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, remain, length)
397 return ErrRecvWindowExceeded
398 }
399 406
400 // Wrap in a limited reader 407 // Wrap in a limited reader
401 conn = &io.LimitedReader{R: conn, N: int64(length)} 408 conn = &io.LimitedReader{R: conn, N: int64(length)}
402 409
403 // Copy into buffer 410 // Copy into buffer
404 s.recvLock.Lock() 411 s.recvLock.Lock()
412
413 if length > s.recvWindow {
414 s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, s.recvWindow, length)
415 return ErrRecvWindowExceeded
416 }
417
405 if s.recvBuf == nil { 418 if s.recvBuf == nil {
406 // Allocate the receive buffer just-in-time to fit the full data frame. 419 // Allocate the receive buffer just-in-time to fit the full data frame.
407 // This way we can read in the whole packet without further allocations. 420 // This way we can read in the whole packet without further allocations.
@@ -414,7 +427,7 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
414 } 427 }
415 428
416 // Decrement the receive window 429 // Decrement the receive window
417 atomic.AddUint32(&s.recvWindow, ^uint32(length-1)) 430 s.recvWindow -= length
418 s.recvLock.Unlock() 431 s.recvLock.Unlock()
419 432
420 // Unblock any readers 433 // Unblock any readers
@@ -435,13 +448,13 @@ func (s *Stream) SetDeadline(t time.Time) error {
435 448
436// SetReadDeadline sets the deadline for future Read calls. 449// SetReadDeadline sets the deadline for future Read calls.
437func (s *Stream) SetReadDeadline(t time.Time) error { 450func (s *Stream) SetReadDeadline(t time.Time) error {
438 s.readDeadline = t 451 s.readDeadline.Store(t)
439 return nil 452 return nil
440} 453}
441 454
442// SetWriteDeadline sets the deadline for future Write calls 455// SetWriteDeadline sets the deadline for future Write calls
443func (s *Stream) SetWriteDeadline(t time.Time) error { 456func (s *Stream) SetWriteDeadline(t time.Time) error {
444 s.writeDeadline = t 457 s.writeDeadline.Store(t)
445 return nil 458 return nil
446} 459}
447 460