}
}
+// CloseChan returns a read-only channel which is closed as
+// soon as the session is closed.
+func (s *Session) CloseChan() <-chan struct{} {
+ return s.shutdownCh
+}
+
// NumStreams returns the number of currently open streams
func (s *Session) NumStreams() int {
s.streamLock.Lock()
case <-time.After(s.config.KeepAliveInterval):
_, err := s.Ping()
if err != nil {
- s.logger.Printf("[ERR] yamux: keepalive failed: %v", err)
- s.exitErr(ErrKeepAliveTimeout)
+ if err != ErrSessionShutdown {
+ s.logger.Printf("[ERR] yamux: keepalive failed: %v", err)
+ s.exitErr(ErrKeepAliveTimeout)
+ }
return
}
case <-s.shutdownCh:
// potential shutdown. Since there's the expectation that sends can happen
// in a timely manner, we enforce the connection write timeout here.
func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error {
- timer := time.NewTimer(s.config.ConnectionWriteTimeout)
- defer timer.Stop()
+ t := timerPool.Get()
+ timer := t.(*time.Timer)
+ timer.Reset(s.config.ConnectionWriteTimeout)
+ defer func() {
+ timer.Stop()
+ select {
+ case <-timer.C:
+ default:
+ }
+ timerPool.Put(t)
+ }()
ready := sendReady{Hdr: hdr, Body: body, Err: errCh}
select {
// the send happens right here, we enforce the connection write timeout if we
// can't queue the header to be sent.
func (s *Session) sendNoWait(hdr header) error {
- timer := time.NewTimer(s.config.ConnectionWriteTimeout)
- defer timer.Stop()
+ t := timerPool.Get()
+ timer := t.(*time.Timer)
+ timer.Reset(s.config.ConnectionWriteTimeout)
+ defer func() {
+ timer.Stop()
+ select {
+ case <-timer.C:
+ default:
+ }
+ timerPool.Put(t)
+ }()
select {
case s.sendCh <- sendReady{Hdr: hdr}:
}
}
+// Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type
+var (
+ handlers = []func(*Session, header) error{
+ typeData: (*Session).handleStreamMessage,
+ typeWindowUpdate: (*Session).handleStreamMessage,
+ typePing: (*Session).handlePing,
+ typeGoAway: (*Session).handleGoAway,
+ }
+)
+
// recvLoop continues to receive data until a fatal error is encountered
func (s *Session) recvLoop() error {
defer close(s.recvDoneCh)
hdr := header(make([]byte, headerSize))
- var handler func(header) error
for {
// Read the header
if _, err := io.ReadFull(s.bufRead, hdr); err != nil {
return ErrInvalidVersion
}
- // Switch on the type
- switch hdr.MsgType() {
- case typeData:
- handler = s.handleStreamMessage
- case typeWindowUpdate:
- handler = s.handleStreamMessage
- case typeGoAway:
- handler = s.handleGoAway
- case typePing:
- handler = s.handlePing
- default:
+ mt := hdr.MsgType()
+ if mt < typeData || mt > typeGoAway {
return ErrInvalidMsgType
}
- // Invoke the handler
- if err := handler(hdr); err != nil {
+ if err := handlers[mt](s, hdr); err != nil {
return err
}
}