]> git.immae.eu Git - github/fretlink/terraform-provider-statuscake.git/blame - vendor/github.com/hashicorp/yamux/session.go
Upgrade to 0.12
[github/fretlink/terraform-provider-statuscake.git] / vendor / github.com / hashicorp / yamux / session.go
CommitLineData
bae9f6d2
JC
1package yamux
2
3import (
4 "bufio"
5 "fmt"
6 "io"
7 "io/ioutil"
8 "log"
9 "math"
10 "net"
11 "strings"
12 "sync"
13 "sync/atomic"
14 "time"
15)
16
17// Session is used to wrap a reliable ordered connection and to
18// multiplex it into multiple streams.
19type Session struct {
20 // remoteGoAway indicates the remote side does
21 // not want futher connections. Must be first for alignment.
22 remoteGoAway int32
23
24 // localGoAway indicates that we should stop
25 // accepting futher connections. Must be first for alignment.
26 localGoAway int32
27
28 // nextStreamID is the next stream we should
29 // send. This depends if we are a client/server.
30 nextStreamID uint32
31
32 // config holds our configuration
33 config *Config
34
35 // logger is used for our logs
36 logger *log.Logger
37
38 // conn is the underlying connection
39 conn io.ReadWriteCloser
40
41 // bufRead is a buffered reader
42 bufRead *bufio.Reader
43
44 // pings is used to track inflight pings
45 pings map[uint32]chan struct{}
46 pingID uint32
47 pingLock sync.Mutex
48
49 // streams maps a stream id to a stream, and inflight has an entry
50 // for any outgoing stream that has not yet been established. Both are
51 // protected by streamLock.
52 streams map[uint32]*Stream
53 inflight map[uint32]struct{}
54 streamLock sync.Mutex
55
56 // synCh acts like a semaphore. It is sized to the AcceptBacklog which
57 // is assumed to be symmetric between the client and server. This allows
58 // the client to avoid exceeding the backlog and instead blocks the open.
59 synCh chan struct{}
60
61 // acceptCh is used to pass ready streams to the client
62 acceptCh chan *Stream
63
64 // sendCh is used to mark a stream as ready to send,
65 // or to send a header out directly.
66 sendCh chan sendReady
67
68 // recvDoneCh is closed when recv() exits to avoid a race
69 // between stream registration and stream shutdown
70 recvDoneCh chan struct{}
71
72 // shutdown is used to safely close a session
73 shutdown bool
74 shutdownErr error
75 shutdownCh chan struct{}
76 shutdownLock sync.Mutex
77}
78
79// sendReady is used to either mark a stream as ready
80// or to directly send a header
81type sendReady struct {
82 Hdr []byte
83 Body io.Reader
84 Err chan error
85}
86
87// newSession is used to construct a new session
88func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session {
89 s := &Session{
90 config: config,
91 logger: log.New(config.LogOutput, "", log.LstdFlags),
92 conn: conn,
93 bufRead: bufio.NewReader(conn),
94 pings: make(map[uint32]chan struct{}),
95 streams: make(map[uint32]*Stream),
96 inflight: make(map[uint32]struct{}),
97 synCh: make(chan struct{}, config.AcceptBacklog),
98 acceptCh: make(chan *Stream, config.AcceptBacklog),
99 sendCh: make(chan sendReady, 64),
100 recvDoneCh: make(chan struct{}),
101 shutdownCh: make(chan struct{}),
102 }
103 if client {
104 s.nextStreamID = 1
105 } else {
106 s.nextStreamID = 2
107 }
108 go s.recv()
109 go s.send()
110 if config.EnableKeepAlive {
111 go s.keepalive()
112 }
113 return s
114}
115
116// IsClosed does a safe check to see if we have shutdown
117func (s *Session) IsClosed() bool {
118 select {
119 case <-s.shutdownCh:
120 return true
121 default:
122 return false
123 }
124}
125
107c1cdb
ND
126// CloseChan returns a read-only channel which is closed as
127// soon as the session is closed.
128func (s *Session) CloseChan() <-chan struct{} {
129 return s.shutdownCh
130}
131
bae9f6d2
JC
132// NumStreams returns the number of currently open streams
133func (s *Session) NumStreams() int {
134 s.streamLock.Lock()
135 num := len(s.streams)
136 s.streamLock.Unlock()
137 return num
138}
139
140// Open is used to create a new stream as a net.Conn
141func (s *Session) Open() (net.Conn, error) {
142 conn, err := s.OpenStream()
143 if err != nil {
144 return nil, err
145 }
146 return conn, nil
147}
148
149// OpenStream is used to create a new stream
150func (s *Session) OpenStream() (*Stream, error) {
151 if s.IsClosed() {
152 return nil, ErrSessionShutdown
153 }
154 if atomic.LoadInt32(&s.remoteGoAway) == 1 {
155 return nil, ErrRemoteGoAway
156 }
157
158 // Block if we have too many inflight SYNs
159 select {
160 case s.synCh <- struct{}{}:
161 case <-s.shutdownCh:
162 return nil, ErrSessionShutdown
163 }
164
165GET_ID:
166 // Get an ID, and check for stream exhaustion
167 id := atomic.LoadUint32(&s.nextStreamID)
168 if id >= math.MaxUint32-1 {
169 return nil, ErrStreamsExhausted
170 }
171 if !atomic.CompareAndSwapUint32(&s.nextStreamID, id, id+2) {
172 goto GET_ID
173 }
174
175 // Register the stream
176 stream := newStream(s, id, streamInit)
177 s.streamLock.Lock()
178 s.streams[id] = stream
179 s.inflight[id] = struct{}{}
180 s.streamLock.Unlock()
181
182 // Send the window update to create
183 if err := stream.sendWindowUpdate(); err != nil {
184 select {
185 case <-s.synCh:
186 default:
187 s.logger.Printf("[ERR] yamux: aborted stream open without inflight syn semaphore")
188 }
189 return nil, err
190 }
191 return stream, nil
192}
193
194// Accept is used to block until the next available stream
195// is ready to be accepted.
196func (s *Session) Accept() (net.Conn, error) {
197 conn, err := s.AcceptStream()
198 if err != nil {
199 return nil, err
200 }
201 return conn, err
202}
203
204// AcceptStream is used to block until the next available stream
205// is ready to be accepted.
206func (s *Session) AcceptStream() (*Stream, error) {
207 select {
208 case stream := <-s.acceptCh:
209 if err := stream.sendWindowUpdate(); err != nil {
210 return nil, err
211 }
212 return stream, nil
213 case <-s.shutdownCh:
214 return nil, s.shutdownErr
215 }
216}
217
218// Close is used to close the session and all streams.
219// Attempts to send a GoAway before closing the connection.
220func (s *Session) Close() error {
221 s.shutdownLock.Lock()
222 defer s.shutdownLock.Unlock()
223
224 if s.shutdown {
225 return nil
226 }
227 s.shutdown = true
228 if s.shutdownErr == nil {
229 s.shutdownErr = ErrSessionShutdown
230 }
231 close(s.shutdownCh)
232 s.conn.Close()
233 <-s.recvDoneCh
234
235 s.streamLock.Lock()
236 defer s.streamLock.Unlock()
237 for _, stream := range s.streams {
238 stream.forceClose()
239 }
240 return nil
241}
242
243// exitErr is used to handle an error that is causing the
244// session to terminate.
245func (s *Session) exitErr(err error) {
246 s.shutdownLock.Lock()
247 if s.shutdownErr == nil {
248 s.shutdownErr = err
249 }
250 s.shutdownLock.Unlock()
251 s.Close()
252}
253
254// GoAway can be used to prevent accepting further
255// connections. It does not close the underlying conn.
256func (s *Session) GoAway() error {
257 return s.waitForSend(s.goAway(goAwayNormal), nil)
258}
259
260// goAway is used to send a goAway message
261func (s *Session) goAway(reason uint32) header {
262 atomic.SwapInt32(&s.localGoAway, 1)
263 hdr := header(make([]byte, headerSize))
264 hdr.encode(typeGoAway, 0, 0, reason)
265 return hdr
266}
267
268// Ping is used to measure the RTT response time
269func (s *Session) Ping() (time.Duration, error) {
270 // Get a channel for the ping
271 ch := make(chan struct{})
272
273 // Get a new ping id, mark as pending
274 s.pingLock.Lock()
275 id := s.pingID
276 s.pingID++
277 s.pings[id] = ch
278 s.pingLock.Unlock()
279
280 // Send the ping request
281 hdr := header(make([]byte, headerSize))
282 hdr.encode(typePing, flagSYN, 0, id)
283 if err := s.waitForSend(hdr, nil); err != nil {
284 return 0, err
285 }
286
287 // Wait for a response
288 start := time.Now()
289 select {
290 case <-ch:
291 case <-time.After(s.config.ConnectionWriteTimeout):
292 s.pingLock.Lock()
293 delete(s.pings, id) // Ignore it if a response comes later.
294 s.pingLock.Unlock()
295 return 0, ErrTimeout
296 case <-s.shutdownCh:
297 return 0, ErrSessionShutdown
298 }
299
300 // Compute the RTT
301 return time.Now().Sub(start), nil
302}
303
304// keepalive is a long running goroutine that periodically does
305// a ping to keep the connection alive.
306func (s *Session) keepalive() {
307 for {
308 select {
309 case <-time.After(s.config.KeepAliveInterval):
310 _, err := s.Ping()
311 if err != nil {
107c1cdb
ND
312 if err != ErrSessionShutdown {
313 s.logger.Printf("[ERR] yamux: keepalive failed: %v", err)
314 s.exitErr(ErrKeepAliveTimeout)
315 }
bae9f6d2
JC
316 return
317 }
318 case <-s.shutdownCh:
319 return
320 }
321 }
322}
323
324// waitForSendErr waits to send a header, checking for a potential shutdown
325func (s *Session) waitForSend(hdr header, body io.Reader) error {
326 errCh := make(chan error, 1)
327 return s.waitForSendErr(hdr, body, errCh)
328}
329
330// waitForSendErr waits to send a header with optional data, checking for a
331// potential shutdown. Since there's the expectation that sends can happen
332// in a timely manner, we enforce the connection write timeout here.
333func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error {
107c1cdb
ND
334 t := timerPool.Get()
335 timer := t.(*time.Timer)
336 timer.Reset(s.config.ConnectionWriteTimeout)
337 defer func() {
338 timer.Stop()
339 select {
340 case <-timer.C:
341 default:
342 }
343 timerPool.Put(t)
344 }()
bae9f6d2
JC
345
346 ready := sendReady{Hdr: hdr, Body: body, Err: errCh}
347 select {
348 case s.sendCh <- ready:
349 case <-s.shutdownCh:
350 return ErrSessionShutdown
351 case <-timer.C:
352 return ErrConnectionWriteTimeout
353 }
354
355 select {
356 case err := <-errCh:
357 return err
358 case <-s.shutdownCh:
359 return ErrSessionShutdown
360 case <-timer.C:
361 return ErrConnectionWriteTimeout
362 }
363}
364
365// sendNoWait does a send without waiting. Since there's the expectation that
366// the send happens right here, we enforce the connection write timeout if we
367// can't queue the header to be sent.
368func (s *Session) sendNoWait(hdr header) error {
107c1cdb
ND
369 t := timerPool.Get()
370 timer := t.(*time.Timer)
371 timer.Reset(s.config.ConnectionWriteTimeout)
372 defer func() {
373 timer.Stop()
374 select {
375 case <-timer.C:
376 default:
377 }
378 timerPool.Put(t)
379 }()
bae9f6d2
JC
380
381 select {
382 case s.sendCh <- sendReady{Hdr: hdr}:
383 return nil
384 case <-s.shutdownCh:
385 return ErrSessionShutdown
386 case <-timer.C:
387 return ErrConnectionWriteTimeout
388 }
389}
390
391// send is a long running goroutine that sends data
392func (s *Session) send() {
393 for {
394 select {
395 case ready := <-s.sendCh:
396 // Send a header if ready
397 if ready.Hdr != nil {
398 sent := 0
399 for sent < len(ready.Hdr) {
400 n, err := s.conn.Write(ready.Hdr[sent:])
401 if err != nil {
402 s.logger.Printf("[ERR] yamux: Failed to write header: %v", err)
403 asyncSendErr(ready.Err, err)
404 s.exitErr(err)
405 return
406 }
407 sent += n
408 }
409 }
410
411 // Send data from a body if given
412 if ready.Body != nil {
413 _, err := io.Copy(s.conn, ready.Body)
414 if err != nil {
415 s.logger.Printf("[ERR] yamux: Failed to write body: %v", err)
416 asyncSendErr(ready.Err, err)
417 s.exitErr(err)
418 return
419 }
420 }
421
422 // No error, successful send
423 asyncSendErr(ready.Err, nil)
424 case <-s.shutdownCh:
425 return
426 }
427 }
428}
429
430// recv is a long running goroutine that accepts new data
431func (s *Session) recv() {
432 if err := s.recvLoop(); err != nil {
433 s.exitErr(err)
434 }
435}
436
107c1cdb
ND
437// Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type
438var (
439 handlers = []func(*Session, header) error{
440 typeData: (*Session).handleStreamMessage,
441 typeWindowUpdate: (*Session).handleStreamMessage,
442 typePing: (*Session).handlePing,
443 typeGoAway: (*Session).handleGoAway,
444 }
445)
446
bae9f6d2
JC
447// recvLoop continues to receive data until a fatal error is encountered
448func (s *Session) recvLoop() error {
449 defer close(s.recvDoneCh)
450 hdr := header(make([]byte, headerSize))
bae9f6d2
JC
451 for {
452 // Read the header
453 if _, err := io.ReadFull(s.bufRead, hdr); err != nil {
454 if err != io.EOF && !strings.Contains(err.Error(), "closed") && !strings.Contains(err.Error(), "reset by peer") {
455 s.logger.Printf("[ERR] yamux: Failed to read header: %v", err)
456 }
457 return err
458 }
459
460 // Verify the version
461 if hdr.Version() != protoVersion {
462 s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version())
463 return ErrInvalidVersion
464 }
465
107c1cdb
ND
466 mt := hdr.MsgType()
467 if mt < typeData || mt > typeGoAway {
bae9f6d2
JC
468 return ErrInvalidMsgType
469 }
470
107c1cdb 471 if err := handlers[mt](s, hdr); err != nil {
bae9f6d2
JC
472 return err
473 }
474 }
475}
476
477// handleStreamMessage handles either a data or window update frame
478func (s *Session) handleStreamMessage(hdr header) error {
479 // Check for a new stream creation
480 id := hdr.StreamID()
481 flags := hdr.Flags()
482 if flags&flagSYN == flagSYN {
483 if err := s.incomingStream(id); err != nil {
484 return err
485 }
486 }
487
488 // Get the stream
489 s.streamLock.Lock()
490 stream := s.streams[id]
491 s.streamLock.Unlock()
492
493 // If we do not have a stream, likely we sent a RST
494 if stream == nil {
495 // Drain any data on the wire
496 if hdr.MsgType() == typeData && hdr.Length() > 0 {
497 s.logger.Printf("[WARN] yamux: Discarding data for stream: %d", id)
498 if _, err := io.CopyN(ioutil.Discard, s.bufRead, int64(hdr.Length())); err != nil {
499 s.logger.Printf("[ERR] yamux: Failed to discard data: %v", err)
500 return nil
501 }
502 } else {
503 s.logger.Printf("[WARN] yamux: frame for missing stream: %v", hdr)
504 }
505 return nil
506 }
507
508 // Check if this is a window update
509 if hdr.MsgType() == typeWindowUpdate {
510 if err := stream.incrSendWindow(hdr, flags); err != nil {
511 if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil {
512 s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
513 }
514 return err
515 }
516 return nil
517 }
518
519 // Read the new data
520 if err := stream.readData(hdr, flags, s.bufRead); err != nil {
521 if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil {
522 s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
523 }
524 return err
525 }
526 return nil
527}
528
529// handlePing is invokde for a typePing frame
530func (s *Session) handlePing(hdr header) error {
531 flags := hdr.Flags()
532 pingID := hdr.Length()
533
534 // Check if this is a query, respond back in a separate context so we
535 // don't interfere with the receiving thread blocking for the write.
536 if flags&flagSYN == flagSYN {
537 go func() {
538 hdr := header(make([]byte, headerSize))
539 hdr.encode(typePing, flagACK, 0, pingID)
540 if err := s.sendNoWait(hdr); err != nil {
541 s.logger.Printf("[WARN] yamux: failed to send ping reply: %v", err)
542 }
543 }()
544 return nil
545 }
546
547 // Handle a response
548 s.pingLock.Lock()
549 ch := s.pings[pingID]
550 if ch != nil {
551 delete(s.pings, pingID)
552 close(ch)
553 }
554 s.pingLock.Unlock()
555 return nil
556}
557
558// handleGoAway is invokde for a typeGoAway frame
559func (s *Session) handleGoAway(hdr header) error {
560 code := hdr.Length()
561 switch code {
562 case goAwayNormal:
563 atomic.SwapInt32(&s.remoteGoAway, 1)
564 case goAwayProtoErr:
565 s.logger.Printf("[ERR] yamux: received protocol error go away")
566 return fmt.Errorf("yamux protocol error")
567 case goAwayInternalErr:
568 s.logger.Printf("[ERR] yamux: received internal error go away")
569 return fmt.Errorf("remote yamux internal error")
570 default:
571 s.logger.Printf("[ERR] yamux: received unexpected go away")
572 return fmt.Errorf("unexpected go away received")
573 }
574 return nil
575}
576
577// incomingStream is used to create a new incoming stream
578func (s *Session) incomingStream(id uint32) error {
579 // Reject immediately if we are doing a go away
580 if atomic.LoadInt32(&s.localGoAway) == 1 {
581 hdr := header(make([]byte, headerSize))
582 hdr.encode(typeWindowUpdate, flagRST, id, 0)
583 return s.sendNoWait(hdr)
584 }
585
586 // Allocate a new stream
587 stream := newStream(s, id, streamSYNReceived)
588
589 s.streamLock.Lock()
590 defer s.streamLock.Unlock()
591
592 // Check if stream already exists
593 if _, ok := s.streams[id]; ok {
594 s.logger.Printf("[ERR] yamux: duplicate stream declared")
595 if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil {
596 s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
597 }
598 return ErrDuplicateStream
599 }
600
601 // Register the stream
602 s.streams[id] = stream
603
604 // Check if we've exceeded the backlog
605 select {
606 case s.acceptCh <- stream:
607 return nil
608 default:
609 // Backlog exceeded! RST the stream
610 s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset")
611 delete(s.streams, id)
612 stream.sendHdr.encode(typeWindowUpdate, flagRST, id, 0)
613 return s.sendNoWait(stream.sendHdr)
614 }
615}
616
617// closeStream is used to close a stream once both sides have
618// issued a close. If there was an in-flight SYN and the stream
619// was not yet established, then this will give the credit back.
620func (s *Session) closeStream(id uint32) {
621 s.streamLock.Lock()
622 if _, ok := s.inflight[id]; ok {
623 select {
624 case <-s.synCh:
625 default:
626 s.logger.Printf("[ERR] yamux: SYN tracking out of sync")
627 }
628 }
629 delete(s.streams, id)
630 s.streamLock.Unlock()
631}
632
633// establishStream is used to mark a stream that was in the
634// SYN Sent state as established.
635func (s *Session) establishStream(id uint32) {
636 s.streamLock.Lock()
637 if _, ok := s.inflight[id]; ok {
638 delete(s.inflight, id)
639 } else {
640 s.logger.Printf("[ERR] yamux: established stream without inflight SYN (no tracking entry)")
641 }
642 select {
643 case <-s.synCh:
644 default:
645 s.logger.Printf("[ERR] yamux: established stream without inflight SYN (didn't have semaphore)")
646 }
647 s.streamLock.Unlock()
648}