diff options
author | Nathan Dench <ndenc2@gmail.com> | 2019-05-24 15:16:44 +1000 |
---|---|---|
committer | Nathan Dench <ndenc2@gmail.com> | 2019-05-24 15:16:44 +1000 |
commit | 107c1cdb09c575aa2f61d97f48d8587eb6bada4c (patch) | |
tree | ca7d008643efc555c388baeaf1d986e0b6b3e28c /vendor/github.com/hashicorp/yamux/stream.go | |
parent | 844b5a68d8af4791755b8f0ad293cc99f5959183 (diff) | |
download | terraform-provider-statuscake-107c1cdb09c575aa2f61d97f48d8587eb6bada4c.tar.gz terraform-provider-statuscake-107c1cdb09c575aa2f61d97f48d8587eb6bada4c.tar.zst terraform-provider-statuscake-107c1cdb09c575aa2f61d97f48d8587eb6bada4c.zip |
Upgrade to 0.12
Diffstat (limited to 'vendor/github.com/hashicorp/yamux/stream.go')
-rw-r--r-- | vendor/github.com/hashicorp/yamux/stream.go | 45 |
1 files changed, 29 insertions, 16 deletions
diff --git a/vendor/github.com/hashicorp/yamux/stream.go b/vendor/github.com/hashicorp/yamux/stream.go index d216e28..aa23919 100644 --- a/vendor/github.com/hashicorp/yamux/stream.go +++ b/vendor/github.com/hashicorp/yamux/stream.go | |||
@@ -47,8 +47,8 @@ type Stream struct { | |||
47 | recvNotifyCh chan struct{} | 47 | recvNotifyCh chan struct{} |
48 | sendNotifyCh chan struct{} | 48 | sendNotifyCh chan struct{} |
49 | 49 | ||
50 | readDeadline time.Time | 50 | readDeadline atomic.Value // time.Time |
51 | writeDeadline time.Time | 51 | writeDeadline atomic.Value // time.Time |
52 | } | 52 | } |
53 | 53 | ||
54 | // newStream is used to construct a new stream within | 54 | // newStream is used to construct a new stream within |
@@ -67,6 +67,8 @@ func newStream(session *Session, id uint32, state streamState) *Stream { | |||
67 | recvNotifyCh: make(chan struct{}, 1), | 67 | recvNotifyCh: make(chan struct{}, 1), |
68 | sendNotifyCh: make(chan struct{}, 1), | 68 | sendNotifyCh: make(chan struct{}, 1), |
69 | } | 69 | } |
70 | s.readDeadline.Store(time.Time{}) | ||
71 | s.writeDeadline.Store(time.Time{}) | ||
70 | return s | 72 | return s |
71 | } | 73 | } |
72 | 74 | ||
@@ -122,8 +124,9 @@ START: | |||
122 | WAIT: | 124 | WAIT: |
123 | var timeout <-chan time.Time | 125 | var timeout <-chan time.Time |
124 | var timer *time.Timer | 126 | var timer *time.Timer |
125 | if !s.readDeadline.IsZero() { | 127 | readDeadline := s.readDeadline.Load().(time.Time) |
126 | delay := s.readDeadline.Sub(time.Now()) | 128 | if !readDeadline.IsZero() { |
129 | delay := readDeadline.Sub(time.Now()) | ||
127 | timer = time.NewTimer(delay) | 130 | timer = time.NewTimer(delay) |
128 | timeout = timer.C | 131 | timeout = timer.C |
129 | } | 132 | } |
@@ -188,7 +191,7 @@ START: | |||
188 | 191 | ||
189 | // Send the header | 192 | // Send the header |
190 | s.sendHdr.encode(typeData, flags, s.id, max) | 193 | s.sendHdr.encode(typeData, flags, s.id, max) |
191 | if err := s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil { | 194 | if err = s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil { |
192 | return 0, err | 195 | return 0, err |
193 | } | 196 | } |
194 | 197 | ||
@@ -200,8 +203,9 @@ START: | |||
200 | 203 | ||
201 | WAIT: | 204 | WAIT: |
202 | var timeout <-chan time.Time | 205 | var timeout <-chan time.Time |
203 | if !s.writeDeadline.IsZero() { | 206 | writeDeadline := s.writeDeadline.Load().(time.Time) |
204 | delay := s.writeDeadline.Sub(time.Now()) | 207 | if !writeDeadline.IsZero() { |
208 | delay := writeDeadline.Sub(time.Now()) | ||
205 | timeout = time.After(delay) | 209 | timeout = time.After(delay) |
206 | } | 210 | } |
207 | select { | 211 | select { |
@@ -238,18 +242,25 @@ func (s *Stream) sendWindowUpdate() error { | |||
238 | 242 | ||
239 | // Determine the delta update | 243 | // Determine the delta update |
240 | max := s.session.config.MaxStreamWindowSize | 244 | max := s.session.config.MaxStreamWindowSize |
241 | delta := max - atomic.LoadUint32(&s.recvWindow) | 245 | var bufLen uint32 |
246 | s.recvLock.Lock() | ||
247 | if s.recvBuf != nil { | ||
248 | bufLen = uint32(s.recvBuf.Len()) | ||
249 | } | ||
250 | delta := (max - bufLen) - s.recvWindow | ||
242 | 251 | ||
243 | // Determine the flags if any | 252 | // Determine the flags if any |
244 | flags := s.sendFlags() | 253 | flags := s.sendFlags() |
245 | 254 | ||
246 | // Check if we can omit the update | 255 | // Check if we can omit the update |
247 | if delta < (max/2) && flags == 0 { | 256 | if delta < (max/2) && flags == 0 { |
257 | s.recvLock.Unlock() | ||
248 | return nil | 258 | return nil |
249 | } | 259 | } |
250 | 260 | ||
251 | // Update our window | 261 | // Update our window |
252 | atomic.AddUint32(&s.recvWindow, delta) | 262 | s.recvWindow += delta |
263 | s.recvLock.Unlock() | ||
253 | 264 | ||
254 | // Send the header | 265 | // Send the header |
255 | s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta) | 266 | s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta) |
@@ -392,16 +403,18 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error { | |||
392 | if length == 0 { | 403 | if length == 0 { |
393 | return nil | 404 | return nil |
394 | } | 405 | } |
395 | if remain := atomic.LoadUint32(&s.recvWindow); length > remain { | ||
396 | s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, remain, length) | ||
397 | return ErrRecvWindowExceeded | ||
398 | } | ||
399 | 406 | ||
400 | // Wrap in a limited reader | 407 | // Wrap in a limited reader |
401 | conn = &io.LimitedReader{R: conn, N: int64(length)} | 408 | conn = &io.LimitedReader{R: conn, N: int64(length)} |
402 | 409 | ||
403 | // Copy into buffer | 410 | // Copy into buffer |
404 | s.recvLock.Lock() | 411 | s.recvLock.Lock() |
412 | |||
413 | if length > s.recvWindow { | ||
414 | s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, s.recvWindow, length) | ||
415 | return ErrRecvWindowExceeded | ||
416 | } | ||
417 | |||
405 | if s.recvBuf == nil { | 418 | if s.recvBuf == nil { |
406 | // Allocate the receive buffer just-in-time to fit the full data frame. | 419 | // Allocate the receive buffer just-in-time to fit the full data frame. |
407 | // This way we can read in the whole packet without further allocations. | 420 | // This way we can read in the whole packet without further allocations. |
@@ -414,7 +427,7 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error { | |||
414 | } | 427 | } |
415 | 428 | ||
416 | // Decrement the receive window | 429 | // Decrement the receive window |
417 | atomic.AddUint32(&s.recvWindow, ^uint32(length-1)) | 430 | s.recvWindow -= length |
418 | s.recvLock.Unlock() | 431 | s.recvLock.Unlock() |
419 | 432 | ||
420 | // Unblock any readers | 433 | // Unblock any readers |
@@ -435,13 +448,13 @@ func (s *Stream) SetDeadline(t time.Time) error { | |||
435 | 448 | ||
436 | // SetReadDeadline sets the deadline for future Read calls. | 449 | // SetReadDeadline sets the deadline for future Read calls. |
437 | func (s *Stream) SetReadDeadline(t time.Time) error { | 450 | func (s *Stream) SetReadDeadline(t time.Time) error { |
438 | s.readDeadline = t | 451 | s.readDeadline.Store(t) |
439 | return nil | 452 | return nil |
440 | } | 453 | } |
441 | 454 | ||
442 | // SetWriteDeadline sets the deadline for future Write calls | 455 | // SetWriteDeadline sets the deadline for future Write calls |
443 | func (s *Stream) SetWriteDeadline(t time.Time) error { | 456 | func (s *Stream) SetWriteDeadline(t time.Time) error { |
444 | s.writeDeadline = t | 457 | s.writeDeadline.Store(t) |
445 | return nil | 458 | return nil |
446 | } | 459 | } |
447 | 460 | ||