// streamInit is the zero value of the streamState enum; the remaining
// states (SYN sent/received, established, local/remote close, closed,
// reset) are referenced below but their declarations are elided here.
14 streamInit streamState = iota
24 // Stream is used to represent a logical stream
// within a multiplexed session. Only a subset of the struct's
// fields is visible in this view.
// controlHdrLock serializes use of the shared control header buffer
// (see sendWindowUpdate and sendClose, which both encode into controlHdr).
41 controlHdrLock sync.Mutex
// recvNotifyCh/sendNotifyCh are capacity-1 signal channels (see newStream);
// asyncNotify performs a non-blocking send, so a pending wakeup coalesces.
47 recvNotifyCh chan struct{}
48 sendNotifyCh chan struct{}
// readDeadline/writeDeadline are consulted by Read/write to arm wakeup
// timers; the zero time means "no deadline" (IsZero checks below).
50 readDeadline time.Time
51 writeDeadline time.Time
54 // newStream is used to construct a new stream within
55 // a given session for an ID
// The initial state is supplied by the caller (SYN side vs. accept side).
// Body is partially elided; the struct literal's tail and return are not shown.
56 func newStream(session *Session, id uint32, state streamState) *Stream {
// Pre-allocated header buffers avoid per-frame allocations; each has a
// dedicated error channel (capacity 1) for waitForSendErr results.
61 controlHdr: header(make([]byte, headerSize)),
62 controlErr: make(chan error, 1),
63 sendHdr: header(make([]byte, headerSize)),
64 sendErr: make(chan error, 1),
// Both flow-control windows start at the protocol's initial window size.
65 recvWindow: initialStreamWindow,
66 sendWindow: initialStreamWindow,
// Capacity-1 so a notification is never lost but never blocks the notifier.
67 recvNotifyCh: make(chan struct{}, 1),
68 sendNotifyCh: make(chan struct{}, 1),
73 // Session returns the associated stream session
// (simple accessor; body elided in this view).
74 func (s *Stream) Session() *Session {
78 // StreamID returns the ID of this stream
// (simple accessor; body elided in this view).
79 func (s *Stream) StreamID() uint32 {
83 // Read is used to read from the stream
// Implements io.Reader. Body is partially elided (state switch, labels,
// and select arms are not fully shown).
84 func (s *Stream) Read(b []byte) (n int, err error) {
// On exit, wake anything blocked on recvNotifyCh (e.g. a concurrent Read).
85 defer asyncNotify(s.recvNotifyCh)
// State handling: a locally-closed or remotely-closed stream may still
// have buffered data to drain before reporting EOF/closure.
89 case streamLocalClose:
91 case streamRemoteClose:
// Only data already buffered may be read after remote close.
95 if s.recvBuf == nil || s.recvBuf.Len() == 0 {
// A reset stream reports a connection-reset error to the reader.
103 return 0, ErrConnectionReset
107 // If there is no data available, block
109 if s.recvBuf == nil || s.recvBuf.Len() == 0 {
// bytes.Buffer.Read only errors with io.EOF on an empty buffer, which the
// emptiness check above rules out, so the error is discarded here.
115 n, _ = s.recvBuf.Read(b)
118 // Send a window update potentially
119 err = s.sendWindowUpdate()
// Blocking path: arm a timer only when a read deadline is set.
123 var timeout <-chan time.Time
124 var timer *time.Timer
125 if !s.readDeadline.IsZero() {
126 delay := s.readDeadline.Sub(time.Now())
127 timer = time.NewTimer(delay)
// Woken when new data arrives (readData) or state changes (notifyWaiting).
131 case <-s.recvNotifyCh:
141 // Write is used to write to the stream
// Implements io.Writer: loops calling write() until all of b is sent
// (loop structure partially elided). sendLock serializes writers.
142 func (s *Stream) Write(b []byte) (n int, err error) {
144 defer s.sendLock.Unlock()
// Each call may send only part of the remaining data, bounded by the
// send window; `total` (declared in an elided line) accumulates progress.
147 n, err := s.write(b[total:])
156 // write is used to write to the stream, may return on
// a short write (caller loops). Body partially elided (labels, select arms).
158 func (s *Stream) write(b []byte) (n int, err error) {
// Writing after local close (or on a closed stream) is an error;
// a reset stream reports connection reset.
165 case streamLocalClose:
169 return 0, ErrStreamClosed
172 return 0, ErrConnectionReset
176 // If there is no data available, block
// (comment refers to send-window availability; zero window => wait below)
177 window := atomic.LoadUint32(&s.sendWindow)
182 // Determine the flags if any
183 flags = s.sendFlags()
185 // Send up to our send window
186 max = min(window, uint32(len(b)))
187 body = bytes.NewReader(b[:max])
// Reuse the stream's pre-allocated send header; waitForSendErr queues the
// frame on the session and blocks until it is sent or fails.
190 s.sendHdr.encode(typeData, flags, s.id, max)
191 if err := s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil {
195 // Reduce our send window
// ^uint32(max-1) is two's-complement -max, i.e. an atomic subtraction.
196 atomic.AddUint32(&s.sendWindow, ^uint32(max-1))
// Blocking path: honor the write deadline if one is set.
202 var timeout <-chan time.Time
203 if !s.writeDeadline.IsZero() {
204 delay := s.writeDeadline.Sub(time.Now())
205 timeout = time.After(delay)
// Woken when the peer grows our window (incrSendWindow) or state changes.
208 case <-s.sendNotifyCh:
216 // sendFlags determines any flags that are appropriate
217 // based on the current stream state
// Side effect: advances the handshake state (init -> SYN sent,
// SYN received -> established) while holding stateLock.
218 func (s *Stream) sendFlags() uint16 {
220 defer s.stateLock.Unlock()
// First outbound frame from the initiator carries SYN (flag set elided).
225 s.state = streamSYNSent
// First outbound frame from the acceptor carries ACK (flag set elided).
226 case streamSYNReceived:
228 s.state = streamEstablished
233 // sendWindowUpdate potentially sends a window update enabling
234 // further writes to take place. Must be invoked with the lock.
235 func (s *Stream) sendWindowUpdate() error {
// controlHdr is shared with sendClose; guard it for the whole encode+send.
236 s.controlHdrLock.Lock()
237 defer s.controlHdrLock.Unlock()
239 // Determine the delta update
// delta = how much the receive window has shrunk below the configured max.
240 max := s.session.config.MaxStreamWindowSize
241 delta := max - atomic.LoadUint32(&s.recvWindow)
243 // Determine the flags if any
244 flags := s.sendFlags()
246 // Check if we can omit the update
// Skip small updates (< half the max) unless flags must be flushed anyway.
247 if delta < (max/2) && flags == 0 {
// Grow our receive window back up before telling the peer about it.
252 atomic.AddUint32(&s.recvWindow, delta)
255 s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta)
256 if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil {
262 // sendClose is used to send a FIN
263 func (s *Stream) sendClose() error {
// Shares controlHdr with sendWindowUpdate; hold the lock across encode+send.
264 s.controlHdrLock.Lock()
265 defer s.controlHdrLock.Unlock()
// sendFlags is expected to contribute the FIN flag based on stream state
// (the flag-OR line is elided from this view).
267 flags := s.sendFlags()
// FIN rides on a zero-length window-update frame — this is the protocol's
// convention, not a typo of typeData.
269 s.controlHdr.encode(typeWindowUpdate, flags, s.id, 0)
270 if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil {
276 // Close is used to close the stream
// Half-close semantics: moving to streamLocalClose still allows reads of
// data the peer already sent. Several switch cases are elided in this view.
277 func (s *Stream) Close() error {
281 // Opened means we need to signal a close
284 case streamSYNReceived:
286 case streamEstablished:
287 s.state = streamLocalClose
// Peer already closed its side: our close completes the shutdown.
290 case streamLocalClose:
291 case streamRemoteClose:
292 s.state = streamClosed
// Unknown state is a programmer error, not a recoverable condition.
299 panic("unhandled state")
// Fully closed: deregister the stream from the session.
308 s.session.closeStream(s.id)
313 // forceClose is used for when the session is exiting
// Unconditionally marks the stream closed (locking/notification lines elided).
314 func (s *Stream) forceClose() {
316 s.state = streamClosed
321 // processFlags is used to update the state of the stream
322 // based on set flags, if any. Lock must be held
323 func (s *Stream) processFlags(flags uint16) error {
324 // Close the stream without holding the state lock
// (this runs in a deferred/conditional path — surrounding lines elided —
// so closeStream is never invoked while stateLock is held)
328 s.session.closeStream(s.id)
333 defer s.stateLock.Unlock()
// ACK completes the handshake for the SYN initiator.
334 if flags&flagACK == flagACK {
335 if s.state == streamSYNSent {
336 s.state = streamEstablished
// Inform the session the stream is fully established (e.g. for accept
// bookkeeping; exact semantics live in Session).
338 s.session.establishStream(s.id)
// FIN: peer closed its write side; effect depends on our current state.
340 if flags&flagFIN == flagFIN {
344 case streamSYNReceived:
346 case streamEstablished:
347 s.state = streamRemoteClose
// We had already closed locally, so the peer's FIN fully closes the stream.
349 case streamLocalClose:
350 s.state = streamClosed
// FIN in any other state is a protocol violation.
354 s.session.logger.Printf("[ERR] yamux: unexpected FIN flag in state %d", s.state)
355 return ErrUnexpectedFlag
// RST aborts the stream regardless of state.
358 if flags&flagRST == flagRST {
359 s.state = streamReset
366 // notifyWaiting notifies all the waiting channels
// Non-blocking wakeups for any goroutine parked in Read or write.
367 func (s *Stream) notifyWaiting() {
368 asyncNotify(s.recvNotifyCh)
369 asyncNotify(s.sendNotifyCh)
372 // incrSendWindow updates the size of our send window
// Called when the peer sends a window-update frame; hdr.Length() carries
// the delta. Also applies any flags piggybacked on the frame.
373 func (s *Stream) incrSendWindow(hdr header, flags uint16) error {
374 if err := s.processFlags(flags); err != nil {
378 // Increase window, unblock a sender
379 atomic.AddUint32(&s.sendWindow, hdr.Length())
380 asyncNotify(s.sendNotifyCh)
384 // readData is used to handle a data frame
// Copies hdr.Length() bytes from conn into the stream's receive buffer,
// enforcing flow control. Some locking/early-return lines are elided.
385 func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
386 if err := s.processFlags(flags); err != nil {
390 // Check that our recv window is not exceeded
391 length := hdr.Length()
// A frame larger than the advertised window is a peer flow-control bug.
395 if remain := atomic.LoadUint32(&s.recvWindow); length > remain {
396 s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, remain, length)
397 return ErrRecvWindowExceeded
400 // Wrap in a limited reader
// Ensures we never consume bytes belonging to the next frame on conn.
401 conn = &io.LimitedReader{R: conn, N: int64(length)}
405 if s.recvBuf == nil {
406 // Allocate the receive buffer just-in-time to fit the full data frame.
407 // This way we can read in the whole packet without further allocations.
408 s.recvBuf = bytes.NewBuffer(make([]byte, 0, length))
410 if _, err := io.Copy(s.recvBuf, conn); err != nil {
411 s.session.logger.Printf("[ERR] yamux: Failed to read stream data: %v", err)
416 // Decrement the receive window
// ^uint32(length-1) is two's-complement -length: atomic subtraction.
417 atomic.AddUint32(&s.recvWindow, ^uint32(length-1))
420 // Unblock any readers
421 asyncNotify(s.recvNotifyCh)
425 // SetDeadline sets the read and write deadlines
// (net.Conn-style API: delegates to both setters, failing on the first error).
426 func (s *Stream) SetDeadline(t time.Time) error {
427 if err := s.SetReadDeadline(t); err != nil {
430 if err := s.SetWriteDeadline(t); err != nil {
436 // SetReadDeadline sets the deadline for future Read calls.
// A zero time disables the deadline (see the IsZero check in Read).
437 func (s *Stream) SetReadDeadline(t time.Time) error {
442 // SetWriteDeadline sets the deadline for future Write calls
// A zero time disables the deadline (see the IsZero check in write).
443 func (s *Stream) SetWriteDeadline(t time.Time) error {
448 // Shrink is used to compact the amount of buffers utilized
449 // This is useful when using Yamux in a connection pool to reduce
450 // the idle memory utilization.
451 func (s *Stream) Shrink() {
// Only drop the receive buffer when it holds no unread data
// (the release itself is on an elided line).
453 if s.recvBuf != nil && s.recvBuf.Len() == 0 {