diff options
Diffstat (limited to 'vendor/github.com/hashicorp/yamux/session.go')
-rw-r--r-- | vendor/github.com/hashicorp/yamux/session.go | 65 |
1 files changed, 45 insertions, 20 deletions
diff --git a/vendor/github.com/hashicorp/yamux/session.go b/vendor/github.com/hashicorp/yamux/session.go index e179818..32ba02e 100644 --- a/vendor/github.com/hashicorp/yamux/session.go +++ b/vendor/github.com/hashicorp/yamux/session.go | |||
@@ -123,6 +123,12 @@ func (s *Session) IsClosed() bool { | |||
123 | } | 123 | } |
124 | } | 124 | } |
125 | 125 | ||
126 | // CloseChan returns a read-only channel which is closed as | ||
127 | // soon as the session is closed. | ||
128 | func (s *Session) CloseChan() <-chan struct{} { | ||
129 | return s.shutdownCh | ||
130 | } | ||
131 | |||
126 | // NumStreams returns the number of currently open streams | 132 | // NumStreams returns the number of currently open streams |
127 | func (s *Session) NumStreams() int { | 133 | func (s *Session) NumStreams() int { |
128 | s.streamLock.Lock() | 134 | s.streamLock.Lock() |
@@ -303,8 +309,10 @@ func (s *Session) keepalive() { | |||
303 | case <-time.After(s.config.KeepAliveInterval): | 309 | case <-time.After(s.config.KeepAliveInterval): |
304 | _, err := s.Ping() | 310 | _, err := s.Ping() |
305 | if err != nil { | 311 | if err != nil { |
306 | s.logger.Printf("[ERR] yamux: keepalive failed: %v", err) | 312 | if err != ErrSessionShutdown { |
307 | s.exitErr(ErrKeepAliveTimeout) | 313 | s.logger.Printf("[ERR] yamux: keepalive failed: %v", err) |
314 | s.exitErr(ErrKeepAliveTimeout) | ||
315 | } | ||
308 | return | 316 | return |
309 | } | 317 | } |
310 | case <-s.shutdownCh: | 318 | case <-s.shutdownCh: |
@@ -323,8 +331,17 @@ func (s *Session) waitForSend(hdr header, body io.Reader) error { | |||
323 | // potential shutdown. Since there's the expectation that sends can happen | 331 | // potential shutdown. Since there's the expectation that sends can happen |
324 | // in a timely manner, we enforce the connection write timeout here. | 332 | // in a timely manner, we enforce the connection write timeout here. |
325 | func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error { | 333 | func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error { |
326 | timer := time.NewTimer(s.config.ConnectionWriteTimeout) | 334 | t := timerPool.Get() |
327 | defer timer.Stop() | 335 | timer := t.(*time.Timer) |
336 | timer.Reset(s.config.ConnectionWriteTimeout) | ||
337 | defer func() { | ||
338 | timer.Stop() | ||
339 | select { | ||
340 | case <-timer.C: | ||
341 | default: | ||
342 | } | ||
343 | timerPool.Put(t) | ||
344 | }() | ||
328 | 345 | ||
329 | ready := sendReady{Hdr: hdr, Body: body, Err: errCh} | 346 | ready := sendReady{Hdr: hdr, Body: body, Err: errCh} |
330 | select { | 347 | select { |
@@ -349,8 +366,17 @@ func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) e | |||
349 | // the send happens right here, we enforce the connection write timeout if we | 366 | // the send happens right here, we enforce the connection write timeout if we |
350 | // can't queue the header to be sent. | 367 | // can't queue the header to be sent. |
351 | func (s *Session) sendNoWait(hdr header) error { | 368 | func (s *Session) sendNoWait(hdr header) error { |
352 | timer := time.NewTimer(s.config.ConnectionWriteTimeout) | 369 | t := timerPool.Get() |
353 | defer timer.Stop() | 370 | timer := t.(*time.Timer) |
371 | timer.Reset(s.config.ConnectionWriteTimeout) | ||
372 | defer func() { | ||
373 | timer.Stop() | ||
374 | select { | ||
375 | case <-timer.C: | ||
376 | default: | ||
377 | } | ||
378 | timerPool.Put(t) | ||
379 | }() | ||
354 | 380 | ||
355 | select { | 381 | select { |
356 | case s.sendCh <- sendReady{Hdr: hdr}: | 382 | case s.sendCh <- sendReady{Hdr: hdr}: |
@@ -408,11 +434,20 @@ func (s *Session) recv() { | |||
408 | } | 434 | } |
409 | } | 435 | } |
410 | 436 | ||
437 | // Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type | ||
438 | var ( | ||
439 | handlers = []func(*Session, header) error{ | ||
440 | typeData: (*Session).handleStreamMessage, | ||
441 | typeWindowUpdate: (*Session).handleStreamMessage, | ||
442 | typePing: (*Session).handlePing, | ||
443 | typeGoAway: (*Session).handleGoAway, | ||
444 | } | ||
445 | ) | ||
446 | |||
411 | // recvLoop continues to receive data until a fatal error is encountered | 447 | // recvLoop continues to receive data until a fatal error is encountered |
412 | func (s *Session) recvLoop() error { | 448 | func (s *Session) recvLoop() error { |
413 | defer close(s.recvDoneCh) | 449 | defer close(s.recvDoneCh) |
414 | hdr := header(make([]byte, headerSize)) | 450 | hdr := header(make([]byte, headerSize)) |
415 | var handler func(header) error | ||
416 | for { | 451 | for { |
417 | // Read the header | 452 | // Read the header |
418 | if _, err := io.ReadFull(s.bufRead, hdr); err != nil { | 453 | if _, err := io.ReadFull(s.bufRead, hdr); err != nil { |
@@ -428,22 +463,12 @@ func (s *Session) recvLoop() error { | |||
428 | return ErrInvalidVersion | 463 | return ErrInvalidVersion |
429 | } | 464 | } |
430 | 465 | ||
431 | // Switch on the type | 466 | mt := hdr.MsgType() |
432 | switch hdr.MsgType() { | 467 | if mt < typeData || mt > typeGoAway { |
433 | case typeData: | ||
434 | handler = s.handleStreamMessage | ||
435 | case typeWindowUpdate: | ||
436 | handler = s.handleStreamMessage | ||
437 | case typeGoAway: | ||
438 | handler = s.handleGoAway | ||
439 | case typePing: | ||
440 | handler = s.handlePing | ||
441 | default: | ||
442 | return ErrInvalidMsgType | 468 | return ErrInvalidMsgType |
443 | } | 469 | } |
444 | 470 | ||
445 | // Invoke the handler | 471 | if err := handlers[mt](s, hdr); err != nil { |
446 | if err := handler(hdr); err != nil { | ||
447 | return err | 472 | return err |
448 | } | 473 | } |
449 | } | 474 | } |