aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/github.com/hashicorp/yamux/session.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/hashicorp/yamux/session.go')
-rw-r--r--vendor/github.com/hashicorp/yamux/session.go65
1 files changed, 45 insertions, 20 deletions
diff --git a/vendor/github.com/hashicorp/yamux/session.go b/vendor/github.com/hashicorp/yamux/session.go
index e179818..32ba02e 100644
--- a/vendor/github.com/hashicorp/yamux/session.go
+++ b/vendor/github.com/hashicorp/yamux/session.go
@@ -123,6 +123,12 @@ func (s *Session) IsClosed() bool {
123 } 123 }
124} 124}
125 125
126// CloseChan returns a read-only channel which is closed as
127// soon as the session is closed.
128func (s *Session) CloseChan() <-chan struct{} {
129 return s.shutdownCh
130}
131
126// NumStreams returns the number of currently open streams 132// NumStreams returns the number of currently open streams
127func (s *Session) NumStreams() int { 133func (s *Session) NumStreams() int {
128 s.streamLock.Lock() 134 s.streamLock.Lock()
@@ -303,8 +309,10 @@ func (s *Session) keepalive() {
303 case <-time.After(s.config.KeepAliveInterval): 309 case <-time.After(s.config.KeepAliveInterval):
304 _, err := s.Ping() 310 _, err := s.Ping()
305 if err != nil { 311 if err != nil {
306 s.logger.Printf("[ERR] yamux: keepalive failed: %v", err) 312 if err != ErrSessionShutdown {
307 s.exitErr(ErrKeepAliveTimeout) 313 s.logger.Printf("[ERR] yamux: keepalive failed: %v", err)
314 s.exitErr(ErrKeepAliveTimeout)
315 }
308 return 316 return
309 } 317 }
310 case <-s.shutdownCh: 318 case <-s.shutdownCh:
@@ -323,8 +331,17 @@ func (s *Session) waitForSend(hdr header, body io.Reader) error {
323// potential shutdown. Since there's the expectation that sends can happen 331// potential shutdown. Since there's the expectation that sends can happen
324// in a timely manner, we enforce the connection write timeout here. 332// in a timely manner, we enforce the connection write timeout here.
325func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error { 333func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error {
326 timer := time.NewTimer(s.config.ConnectionWriteTimeout) 334 t := timerPool.Get()
327 defer timer.Stop() 335 timer := t.(*time.Timer)
336 timer.Reset(s.config.ConnectionWriteTimeout)
337 defer func() {
338 timer.Stop()
339 select {
340 case <-timer.C:
341 default:
342 }
343 timerPool.Put(t)
344 }()
328 345
329 ready := sendReady{Hdr: hdr, Body: body, Err: errCh} 346 ready := sendReady{Hdr: hdr, Body: body, Err: errCh}
330 select { 347 select {
@@ -349,8 +366,17 @@ func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) e
349// the send happens right here, we enforce the connection write timeout if we 366// the send happens right here, we enforce the connection write timeout if we
350// can't queue the header to be sent. 367// can't queue the header to be sent.
351func (s *Session) sendNoWait(hdr header) error { 368func (s *Session) sendNoWait(hdr header) error {
352 timer := time.NewTimer(s.config.ConnectionWriteTimeout) 369 t := timerPool.Get()
353 defer timer.Stop() 370 timer := t.(*time.Timer)
371 timer.Reset(s.config.ConnectionWriteTimeout)
372 defer func() {
373 timer.Stop()
374 select {
375 case <-timer.C:
376 default:
377 }
378 timerPool.Put(t)
379 }()
354 380
355 select { 381 select {
356 case s.sendCh <- sendReady{Hdr: hdr}: 382 case s.sendCh <- sendReady{Hdr: hdr}:
@@ -408,11 +434,20 @@ func (s *Session) recv() {
408 } 434 }
409} 435}
410 436
437// Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type
438var (
439 handlers = []func(*Session, header) error{
440 typeData: (*Session).handleStreamMessage,
441 typeWindowUpdate: (*Session).handleStreamMessage,
442 typePing: (*Session).handlePing,
443 typeGoAway: (*Session).handleGoAway,
444 }
445)
446
411// recvLoop continues to receive data until a fatal error is encountered 447// recvLoop continues to receive data until a fatal error is encountered
412func (s *Session) recvLoop() error { 448func (s *Session) recvLoop() error {
413 defer close(s.recvDoneCh) 449 defer close(s.recvDoneCh)
414 hdr := header(make([]byte, headerSize)) 450 hdr := header(make([]byte, headerSize))
415 var handler func(header) error
416 for { 451 for {
417 // Read the header 452 // Read the header
418 if _, err := io.ReadFull(s.bufRead, hdr); err != nil { 453 if _, err := io.ReadFull(s.bufRead, hdr); err != nil {
@@ -428,22 +463,12 @@ func (s *Session) recvLoop() error {
428 return ErrInvalidVersion 463 return ErrInvalidVersion
429 } 464 }
430 465
431 // Switch on the type 466 mt := hdr.MsgType()
432 switch hdr.MsgType() { 467 if mt < typeData || mt > typeGoAway {
433 case typeData:
434 handler = s.handleStreamMessage
435 case typeWindowUpdate:
436 handler = s.handleStreamMessage
437 case typeGoAway:
438 handler = s.handleGoAway
439 case typePing:
440 handler = s.handlePing
441 default:
442 return ErrInvalidMsgType 468 return ErrInvalidMsgType
443 } 469 }
444 470
445 // Invoke the handler 471 if err := handlers[mt](s, hdr); err != nil {
446 if err := handler(hdr); err != nil {
447 return err 472 return err
448 } 473 }
449 } 474 }