recvNotifyCh chan struct{}
sendNotifyCh chan struct{}
- readDeadline time.Time
- writeDeadline time.Time
+ readDeadline atomic.Value // time.Time
+ writeDeadline atomic.Value // time.Time
}
// newStream is used to construct a new stream within
recvNotifyCh: make(chan struct{}, 1),
sendNotifyCh: make(chan struct{}, 1),
}
+ s.readDeadline.Store(time.Time{})
+ s.writeDeadline.Store(time.Time{})
return s
}
WAIT:
var timeout <-chan time.Time
var timer *time.Timer
- if !s.readDeadline.IsZero() {
- delay := s.readDeadline.Sub(time.Now())
+ readDeadline := s.readDeadline.Load().(time.Time)
+ if !readDeadline.IsZero() {
+ delay := readDeadline.Sub(time.Now())
timer = time.NewTimer(delay)
timeout = timer.C
}
// Send the header
s.sendHdr.encode(typeData, flags, s.id, max)
- if err := s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil {
+ if err = s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil {
return 0, err
}
WAIT:
var timeout <-chan time.Time
- if !s.writeDeadline.IsZero() {
- delay := s.writeDeadline.Sub(time.Now())
+ writeDeadline := s.writeDeadline.Load().(time.Time)
+ if !writeDeadline.IsZero() {
+ delay := writeDeadline.Sub(time.Now())
timeout = time.After(delay)
}
select {
// Determine the delta update
max := s.session.config.MaxStreamWindowSize
- delta := max - atomic.LoadUint32(&s.recvWindow)
+ var bufLen uint32
+ s.recvLock.Lock()
+ if s.recvBuf != nil {
+ bufLen = uint32(s.recvBuf.Len())
+ }
+ delta := (max - bufLen) - s.recvWindow
// Determine the flags if any
flags := s.sendFlags()
// Check if we can omit the update
if delta < (max/2) && flags == 0 {
+ s.recvLock.Unlock()
return nil
}
// Update our window
- atomic.AddUint32(&s.recvWindow, delta)
+ s.recvWindow += delta
+ s.recvLock.Unlock()
// Send the header
s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta)
if length == 0 {
return nil
}
- if remain := atomic.LoadUint32(&s.recvWindow); length > remain {
- s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, remain, length)
- return ErrRecvWindowExceeded
- }
// Wrap in a limited reader
conn = &io.LimitedReader{R: conn, N: int64(length)}
// Copy into buffer
s.recvLock.Lock()
+
+ if length > s.recvWindow {
+ s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, s.recvWindow, length)
+ return ErrRecvWindowExceeded
+ }
+
if s.recvBuf == nil {
// Allocate the receive buffer just-in-time to fit the full data frame.
// This way we can read in the whole packet without further allocations.
}
// Decrement the receive window
- atomic.AddUint32(&s.recvWindow, ^uint32(length-1))
+ s.recvWindow -= length
s.recvLock.Unlock()
// Unblock any readers
// SetReadDeadline sets the deadline for future Read calls.
func (s *Stream) SetReadDeadline(t time.Time) error {
- s.readDeadline = t
+ s.readDeadline.Store(t)
return nil
}
// SetWriteDeadline sets the deadline for future Write calls
func (s *Stream) SetWriteDeadline(t time.Time) error {
- s.writeDeadline = t
+ s.writeDeadline.Store(t)
return nil
}