// First value of the streamState iota sequence: stream created locally,
// SYN not yet sent. (The rest of the enclosing const block is not visible here.)
14 streamInit streamState = iota
24 // Stream is used to represent a logical stream
// within a session. Only a subset of the struct's fields is visible in
// this extract; the ones below are documented from their visible uses.
41 controlHdrLock sync.Mutex // serializes use of controlHdr in sendWindowUpdate/sendClose
47 recvNotifyCh chan struct{} // buffered(1); pulsed via asyncNotify when data arrives for Read
48 sendNotifyCh chan struct{} // buffered(1); pulsed via asyncNotify when the send window grows
50 readDeadline atomic.Value // time.Time; zero value means "no deadline"
51 writeDeadline atomic.Value // time.Time; zero value means "no deadline"
54 // newStream is used to construct a new stream within
55 // a given session for an ID
// The initial state is chosen by the caller (streamInit for outbound,
// streamSYNReceived for inbound — TODO confirm against Session callers).
56 func newStream(session *Session, id uint32, state streamState) *Stream {
61 controlHdr: header(make([]byte, headerSize)),
// Error channels are buffered(1) so the session can deliver a send
// result without blocking even if the stream is not waiting yet.
62 controlErr: make(chan error, 1),
63 sendHdr: header(make([]byte, headerSize)),
64 sendErr: make(chan error, 1),
65 recvWindow: initialStreamWindow,
66 sendWindow: initialStreamWindow,
// Notification channels are buffered(1) so asyncNotify never blocks
// and a pending wakeup is coalesced rather than lost.
67 recvNotifyCh: make(chan struct{}, 1),
68 sendNotifyCh: make(chan struct{}, 1),
// Seed both deadlines with the zero time ("no deadline") so the
// Load().(time.Time) type assertions in Read/write never panic.
70 s.readDeadline.Store(time.Time{})
71 s.writeDeadline.Store(time.Time{})
75 // Session returns the associated stream session
76 func (s *Stream) Session() *Session {
80 // StreamID returns the ID of this stream
81 func (s *Stream) StreamID() uint32 {
85 // Read is used to read from the stream
86 func (s *Stream) Read(b []byte) (n int, err error) {
// On any return path, wake one other reader that may be blocked on
// recvNotifyCh so it can re-check the buffer/state.
87 defer asyncNotify(s.recvNotifyCh)
// State check under the state lock (locking lines not visible here).
91 case streamLocalClose:
// A locally half-closed stream can presumably still drain inbound data;
// fallthrough handling is not visible — confirm against full source.
93 case streamRemoteClose:
// Remote sent FIN: drain whatever is already buffered, then EOF.
97 if s.recvBuf == nil || s.recvBuf.Len() == 0 {
// Reset with nothing buffered: surface the reset to the caller.
105 return 0, ErrConnectionReset
109 // If there is no data available, block
111 if s.recvBuf == nil || s.recvBuf.Len() == 0 {
// bytes.Buffer.Read only errors with io.EOF on an empty buffer, which
// the emptiness check above rules out, so the error is safe to drop.
117 n, _ = s.recvBuf.Read(b)
120 // Send a window update potentially
121 err = s.sendWindowUpdate()
// Block until notified of new data, or until the read deadline fires.
125 var timeout <-chan time.Time
126 var timer *time.Timer
127 readDeadline := s.readDeadline.Load().(time.Time)
128 if !readDeadline.IsZero() {
129 delay := readDeadline.Sub(time.Now())
// Explicit Timer (rather than time.After) so it can be stopped when
// data arrives first — avoids a lingering timer per wakeup.
130 timer = time.NewTimer(delay)
134 case <-s.recvNotifyCh:
144 // Write is used to write to the stream
// Loops over the internal write() until all of b is sent (or an error
// occurs), since write() may send only up to the current send window.
145 func (s *Stream) Write(b []byte) (n int, err error) {
// sendLock serializes concurrent Writers (Lock call not visible here).
147 defer s.sendLock.Unlock()
// total accumulates bytes written so far (declaration not visible).
150 n, err := s.write(b[total:])
159 // write is used to write to the stream, may return on
// a partial write (caller Write() loops to send the remainder).
161 func (s *Stream) write(b []byte) (n int, err error) {
// State check: writing is still legal on streamLocalClose? The visible
// cases suggest local-close falls through to an error — confirm below.
168 case streamLocalClose:
172 return 0, ErrStreamClosed
175 return 0, ErrConnectionReset
179 // If there is no data available, block
// Atomic load: the window is credited concurrently by incrSendWindow.
180 window := atomic.LoadUint32(&s.sendWindow)
185 // Determine the flags if any
186 flags = s.sendFlags()
188 // Send up to our send window
189 max = min(window, uint32(len(b)))
190 body = bytes.NewReader(b[:max])
// Encode the data frame and hand it to the session's send loop,
// blocking until it is accepted or fails.
193 s.sendHdr.encode(typeData, flags, s.id, max)
194 if err = s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil {
198 // Reduce our send window
// ^uint32(max-1) is the two's-complement encoding of -max, so this
// atomically subtracts max from the send window.
199 atomic.AddUint32(&s.sendWindow, ^uint32(max-1))
// Block until the window is re-credited, or the write deadline fires.
205 var timeout <-chan time.Time
206 writeDeadline := s.writeDeadline.Load().(time.Time)
207 if !writeDeadline.IsZero() {
// NOTE(review): time.Until(writeDeadline) is the idiomatic spelling, and
// time.After leaks a timer until it fires — the Read path uses an
// explicit stoppable Timer; consider doing the same here.
208 delay := writeDeadline.Sub(time.Now())
209 timeout = time.After(delay)
212 case <-s.sendNotifyCh:
220 // sendFlags determines any flags that are appropriate
221 // based on the current stream state
// Also advances the handshake state as a side effect: the first frame
// out of a new stream carries SYN, the first reply carries ACK
// (the flag-setting lines themselves are not visible in this extract).
222 func (s *Stream) sendFlags() uint16 {
224 defer s.stateLock.Unlock()
229 s.state = streamSYNSent
230 case streamSYNReceived:
232 s.state = streamEstablished
237 // sendWindowUpdate potentially sends a window update enabling
238 // further writes to take place. Must be invoked with the lock.
239 func (s *Stream) sendWindowUpdate() error {
// controlHdrLock serializes reuse of the shared controlHdr buffer with
// sendClose, which encodes into the same header.
240 s.controlHdrLock.Lock()
241 defer s.controlHdrLock.Unlock()
243 // Determine the delta update
244 max := s.session.config.MaxStreamWindowSize
// bufLen = bytes already buffered but not yet consumed by Read; they
// still count against the window we can re-credit to the peer.
247 if s.recvBuf != nil {
248 bufLen = uint32(s.recvBuf.Len())
250 delta := (max - bufLen) - s.recvWindow
252 // Determine the flags if any
253 flags := s.sendFlags()
255 // Check if we can omit the update
// Suppress small updates (< half the max window) unless flags must be
// flushed anyway — avoids a frame per small Read.
256 if delta < (max/2) && flags == 0 {
// Credit the window locally before sending, matching what the peer
// will be told it may send.
262 s.recvWindow += delta
266 s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta)
267 if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil {
273 // sendClose is used to send a FIN
274 func (s *Stream) sendClose() error {
275 s.controlHdrLock.Lock()
276 defer s.controlHdrLock.Unlock()
// The FIN flag is set on the header here (flag-OR line not visible).
278 flags := s.sendFlags()
// Intentional: the FIN is carried on a zero-delta window-update frame
// rather than a dedicated frame type.
280 s.controlHdr.encode(typeWindowUpdate, flags, s.id, 0)
281 if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil {
287 // Close is used to close the stream
// Performs a half-close: sends FIN and moves to streamLocalClose; only
// when the remote side has also closed does the stream fully close.
288 func (s *Stream) Close() error {
292 // Opened means we need to signal a close
295 case streamSYNReceived:
297 case streamEstablished:
298 s.state = streamLocalClose
// Jump targets for the send/cleanup paths are between these cases but
// not visible in this extract.
301 case streamLocalClose:
302 case streamRemoteClose:
// Remote already sent FIN; our close completes the shutdown.
303 s.state = streamClosed
// Reaching here means a streamState case was not handled above — a
// programmer error, hence panic rather than an error return.
310 panic("unhandled state")
// Fully closed: deregister from the session so the ID can be reaped.
319 s.session.closeStream(s.id)
324 // forceClose is used for when the session is exiting
// Skips the FIN handshake entirely: marks the stream closed and (in the
// non-visible remainder) wakes any blocked readers/writers.
325 func (s *Stream) forceClose() {
327 s.state = streamClosed
332 // processFlags is used to update the state of the stream
333 // based on set flags, if any. Lock must be held
334 func (s *Stream) processFlags(flags uint16) error {
335 // Close the stream without holding the state lock
// NOTE(review): this closeStream call appears to run from a deferred
// closure guarded by a bool set below — confirm against full source.
339 s.session.closeStream(s.id)
344 defer s.stateLock.Unlock()
345 if flags&flagACK == flagACK {
// ACK completes our locally-initiated handshake.
346 if s.state == streamSYNSent {
347 s.state = streamEstablished
// Tell the session the stream is no longer pending.
349 s.session.establishStream(s.id)
351 if flags&flagFIN == flagFIN {
// FIN advances the close state machine, mirroring Close() locally.
355 case streamSYNReceived:
357 case streamEstablished:
358 s.state = streamRemoteClose
360 case streamLocalClose:
// Both sides have now closed.
361 s.state = streamClosed
// FIN in any other state is a protocol violation.
365 s.session.logger.Printf("[ERR] yamux: unexpected FIN flag in state %d", s.state)
366 return ErrUnexpectedFlag
369 if flags&flagRST == flagRST {
// RST aborts unconditionally, regardless of current state.
370 s.state = streamReset
377 // notifyWaiting notifies all the waiting channels
// Wakes both a blocked Read and a blocked write; asyncNotify is
// non-blocking, relying on the channels' 1-slot buffers.
378 func (s *Stream) notifyWaiting() {
379 asyncNotify(s.recvNotifyCh)
380 asyncNotify(s.sendNotifyCh)
383 // incrSendWindow updates the size of our send window
// Called when the peer sends a window-update frame; hdr.Length() is the
// credit delta.
384 func (s *Stream) incrSendWindow(hdr header, flags uint16) error {
// Flags (ACK/FIN/RST) can piggyback on window updates.
385 if err := s.processFlags(flags); err != nil {
389 // Increase window, unblock a sender
390 atomic.AddUint32(&s.sendWindow, hdr.Length())
391 asyncNotify(s.sendNotifyCh)
395 // readData is used to handle a data frame
396 func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
// Flags (ACK/FIN/RST) can piggyback on data frames.
397 if err := s.processFlags(flags); err != nil {
401 // Check that our recv window is not exceeded
402 length := hdr.Length()
407 // Wrap in a limited reader
// Caps the copy at exactly the advertised frame length so we never
// consume bytes belonging to the next frame on the shared connection.
408 conn = &io.LimitedReader{R: conn, N: int64(length)}
// A frame larger than the remaining window is a flow-control violation.
413 if length > s.recvWindow {
414 s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, s.recvWindow, length)
415 return ErrRecvWindowExceeded
418 if s.recvBuf == nil {
419 // Allocate the receive buffer just-in-time to fit the full data frame.
420 // This way we can read in the whole packet without further allocations.
421 s.recvBuf = bytes.NewBuffer(make([]byte, 0, length))
423 if _, err := io.Copy(s.recvBuf, conn); err != nil {
424 s.session.logger.Printf("[ERR] yamux: Failed to read stream data: %v", err)
429 // Decrement the receive window
// Re-credited later by sendWindowUpdate once Read drains the buffer.
430 s.recvWindow -= length
433 // Unblock any readers
434 asyncNotify(s.recvNotifyCh)
438 // SetDeadline sets the read and write deadlines
// Implements net.Conn semantics by delegating to both per-direction
// setters; the first failure is returned.
439 func (s *Stream) SetDeadline(t time.Time) error {
440 if err := s.SetReadDeadline(t); err != nil {
443 if err := s.SetWriteDeadline(t); err != nil {
449 // SetReadDeadline sets the deadline for future Read calls.
// The zero time means no deadline. Stored atomically so a concurrent
// Read observes either the old or the new value, never a torn one.
450 func (s *Stream) SetReadDeadline(t time.Time) error {
451 s.readDeadline.Store(t)
455 // SetWriteDeadline sets the deadline for future Write calls
456 func (s *Stream) SetWriteDeadline(t time.Time) error {
457 s.writeDeadline.Store(t)
461 // Shrink is used to compact the amount of buffers utilized
462 // This is useful when using Yamux in a connection pool to reduce
463 // the idle memory utilization.
464 func (s *Stream) Shrink() {
// Only a fully drained buffer is dropped; readData re-allocates one
// on demand when the next data frame arrives.
466 if s.recvBuf != nil && s.recvBuf.Len() == 0 {