]>
Commit | Line | Data |
---|---|---|
bae9f6d2 JC |
1 | package yamux |
2 | ||
3 | import ( | |
4 | "bufio" | |
5 | "fmt" | |
6 | "io" | |
7 | "io/ioutil" | |
8 | "log" | |
9 | "math" | |
10 | "net" | |
11 | "strings" | |
12 | "sync" | |
13 | "sync/atomic" | |
14 | "time" | |
15 | ) | |
16 | ||
17 | // Session is used to wrap a reliable ordered connection and to | |
18 | // multiplex it into multiple streams. | |
19 | type Session struct { | |
20 | // remoteGoAway indicates the remote side does | |
21 | // not want futher connections. Must be first for alignment. | |
22 | remoteGoAway int32 | |
23 | ||
24 | // localGoAway indicates that we should stop | |
25 | // accepting futher connections. Must be first for alignment. | |
26 | localGoAway int32 | |
27 | ||
28 | // nextStreamID is the next stream we should | |
29 | // send. This depends if we are a client/server. | |
30 | nextStreamID uint32 | |
31 | ||
32 | // config holds our configuration | |
33 | config *Config | |
34 | ||
35 | // logger is used for our logs | |
36 | logger *log.Logger | |
37 | ||
38 | // conn is the underlying connection | |
39 | conn io.ReadWriteCloser | |
40 | ||
41 | // bufRead is a buffered reader | |
42 | bufRead *bufio.Reader | |
43 | ||
44 | // pings is used to track inflight pings | |
45 | pings map[uint32]chan struct{} | |
46 | pingID uint32 | |
47 | pingLock sync.Mutex | |
48 | ||
49 | // streams maps a stream id to a stream, and inflight has an entry | |
50 | // for any outgoing stream that has not yet been established. Both are | |
51 | // protected by streamLock. | |
52 | streams map[uint32]*Stream | |
53 | inflight map[uint32]struct{} | |
54 | streamLock sync.Mutex | |
55 | ||
56 | // synCh acts like a semaphore. It is sized to the AcceptBacklog which | |
57 | // is assumed to be symmetric between the client and server. This allows | |
58 | // the client to avoid exceeding the backlog and instead blocks the open. | |
59 | synCh chan struct{} | |
60 | ||
61 | // acceptCh is used to pass ready streams to the client | |
62 | acceptCh chan *Stream | |
63 | ||
64 | // sendCh is used to mark a stream as ready to send, | |
65 | // or to send a header out directly. | |
66 | sendCh chan sendReady | |
67 | ||
68 | // recvDoneCh is closed when recv() exits to avoid a race | |
69 | // between stream registration and stream shutdown | |
70 | recvDoneCh chan struct{} | |
71 | ||
72 | // shutdown is used to safely close a session | |
73 | shutdown bool | |
74 | shutdownErr error | |
75 | shutdownCh chan struct{} | |
76 | shutdownLock sync.Mutex | |
77 | } | |
78 | ||
// sendReady is a unit of work for the send goroutine: a header to write,
// optionally followed by a body, plus a channel on which the outcome of
// the write is reported.
type sendReady struct {
	// Hdr is the encoded frame header to write; it may be nil.
	Hdr []byte
	// Body, when non-nil, is copied to the connection after the header.
	Body io.Reader
	// Err receives the result of the send; it may be nil when the sender
	// does not wait for the result.
	Err chan error
}
86 | ||
87 | // newSession is used to construct a new session | |
88 | func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session { | |
89 | s := &Session{ | |
90 | config: config, | |
91 | logger: log.New(config.LogOutput, "", log.LstdFlags), | |
92 | conn: conn, | |
93 | bufRead: bufio.NewReader(conn), | |
94 | pings: make(map[uint32]chan struct{}), | |
95 | streams: make(map[uint32]*Stream), | |
96 | inflight: make(map[uint32]struct{}), | |
97 | synCh: make(chan struct{}, config.AcceptBacklog), | |
98 | acceptCh: make(chan *Stream, config.AcceptBacklog), | |
99 | sendCh: make(chan sendReady, 64), | |
100 | recvDoneCh: make(chan struct{}), | |
101 | shutdownCh: make(chan struct{}), | |
102 | } | |
103 | if client { | |
104 | s.nextStreamID = 1 | |
105 | } else { | |
106 | s.nextStreamID = 2 | |
107 | } | |
108 | go s.recv() | |
109 | go s.send() | |
110 | if config.EnableKeepAlive { | |
111 | go s.keepalive() | |
112 | } | |
113 | return s | |
114 | } | |
115 | ||
116 | // IsClosed does a safe check to see if we have shutdown | |
117 | func (s *Session) IsClosed() bool { | |
118 | select { | |
119 | case <-s.shutdownCh: | |
120 | return true | |
121 | default: | |
122 | return false | |
123 | } | |
124 | } | |
125 | ||
107c1cdb ND |
126 | // CloseChan returns a read-only channel which is closed as |
127 | // soon as the session is closed. | |
128 | func (s *Session) CloseChan() <-chan struct{} { | |
129 | return s.shutdownCh | |
130 | } | |
131 | ||
bae9f6d2 JC |
132 | // NumStreams returns the number of currently open streams |
133 | func (s *Session) NumStreams() int { | |
134 | s.streamLock.Lock() | |
135 | num := len(s.streams) | |
136 | s.streamLock.Unlock() | |
137 | return num | |
138 | } | |
139 | ||
140 | // Open is used to create a new stream as a net.Conn | |
141 | func (s *Session) Open() (net.Conn, error) { | |
142 | conn, err := s.OpenStream() | |
143 | if err != nil { | |
144 | return nil, err | |
145 | } | |
146 | return conn, nil | |
147 | } | |
148 | ||
149 | // OpenStream is used to create a new stream | |
150 | func (s *Session) OpenStream() (*Stream, error) { | |
151 | if s.IsClosed() { | |
152 | return nil, ErrSessionShutdown | |
153 | } | |
154 | if atomic.LoadInt32(&s.remoteGoAway) == 1 { | |
155 | return nil, ErrRemoteGoAway | |
156 | } | |
157 | ||
158 | // Block if we have too many inflight SYNs | |
159 | select { | |
160 | case s.synCh <- struct{}{}: | |
161 | case <-s.shutdownCh: | |
162 | return nil, ErrSessionShutdown | |
163 | } | |
164 | ||
165 | GET_ID: | |
166 | // Get an ID, and check for stream exhaustion | |
167 | id := atomic.LoadUint32(&s.nextStreamID) | |
168 | if id >= math.MaxUint32-1 { | |
169 | return nil, ErrStreamsExhausted | |
170 | } | |
171 | if !atomic.CompareAndSwapUint32(&s.nextStreamID, id, id+2) { | |
172 | goto GET_ID | |
173 | } | |
174 | ||
175 | // Register the stream | |
176 | stream := newStream(s, id, streamInit) | |
177 | s.streamLock.Lock() | |
178 | s.streams[id] = stream | |
179 | s.inflight[id] = struct{}{} | |
180 | s.streamLock.Unlock() | |
181 | ||
182 | // Send the window update to create | |
183 | if err := stream.sendWindowUpdate(); err != nil { | |
184 | select { | |
185 | case <-s.synCh: | |
186 | default: | |
187 | s.logger.Printf("[ERR] yamux: aborted stream open without inflight syn semaphore") | |
188 | } | |
189 | return nil, err | |
190 | } | |
191 | return stream, nil | |
192 | } | |
193 | ||
194 | // Accept is used to block until the next available stream | |
195 | // is ready to be accepted. | |
196 | func (s *Session) Accept() (net.Conn, error) { | |
197 | conn, err := s.AcceptStream() | |
198 | if err != nil { | |
199 | return nil, err | |
200 | } | |
201 | return conn, err | |
202 | } | |
203 | ||
204 | // AcceptStream is used to block until the next available stream | |
205 | // is ready to be accepted. | |
206 | func (s *Session) AcceptStream() (*Stream, error) { | |
207 | select { | |
208 | case stream := <-s.acceptCh: | |
209 | if err := stream.sendWindowUpdate(); err != nil { | |
210 | return nil, err | |
211 | } | |
212 | return stream, nil | |
213 | case <-s.shutdownCh: | |
214 | return nil, s.shutdownErr | |
215 | } | |
216 | } | |
217 | ||
218 | // Close is used to close the session and all streams. | |
219 | // Attempts to send a GoAway before closing the connection. | |
220 | func (s *Session) Close() error { | |
221 | s.shutdownLock.Lock() | |
222 | defer s.shutdownLock.Unlock() | |
223 | ||
224 | if s.shutdown { | |
225 | return nil | |
226 | } | |
227 | s.shutdown = true | |
228 | if s.shutdownErr == nil { | |
229 | s.shutdownErr = ErrSessionShutdown | |
230 | } | |
231 | close(s.shutdownCh) | |
232 | s.conn.Close() | |
233 | <-s.recvDoneCh | |
234 | ||
235 | s.streamLock.Lock() | |
236 | defer s.streamLock.Unlock() | |
237 | for _, stream := range s.streams { | |
238 | stream.forceClose() | |
239 | } | |
240 | return nil | |
241 | } | |
242 | ||
243 | // exitErr is used to handle an error that is causing the | |
244 | // session to terminate. | |
245 | func (s *Session) exitErr(err error) { | |
246 | s.shutdownLock.Lock() | |
247 | if s.shutdownErr == nil { | |
248 | s.shutdownErr = err | |
249 | } | |
250 | s.shutdownLock.Unlock() | |
251 | s.Close() | |
252 | } | |
253 | ||
254 | // GoAway can be used to prevent accepting further | |
255 | // connections. It does not close the underlying conn. | |
256 | func (s *Session) GoAway() error { | |
257 | return s.waitForSend(s.goAway(goAwayNormal), nil) | |
258 | } | |
259 | ||
260 | // goAway is used to send a goAway message | |
261 | func (s *Session) goAway(reason uint32) header { | |
262 | atomic.SwapInt32(&s.localGoAway, 1) | |
263 | hdr := header(make([]byte, headerSize)) | |
264 | hdr.encode(typeGoAway, 0, 0, reason) | |
265 | return hdr | |
266 | } | |
267 | ||
268 | // Ping is used to measure the RTT response time | |
269 | func (s *Session) Ping() (time.Duration, error) { | |
270 | // Get a channel for the ping | |
271 | ch := make(chan struct{}) | |
272 | ||
273 | // Get a new ping id, mark as pending | |
274 | s.pingLock.Lock() | |
275 | id := s.pingID | |
276 | s.pingID++ | |
277 | s.pings[id] = ch | |
278 | s.pingLock.Unlock() | |
279 | ||
280 | // Send the ping request | |
281 | hdr := header(make([]byte, headerSize)) | |
282 | hdr.encode(typePing, flagSYN, 0, id) | |
283 | if err := s.waitForSend(hdr, nil); err != nil { | |
284 | return 0, err | |
285 | } | |
286 | ||
287 | // Wait for a response | |
288 | start := time.Now() | |
289 | select { | |
290 | case <-ch: | |
291 | case <-time.After(s.config.ConnectionWriteTimeout): | |
292 | s.pingLock.Lock() | |
293 | delete(s.pings, id) // Ignore it if a response comes later. | |
294 | s.pingLock.Unlock() | |
295 | return 0, ErrTimeout | |
296 | case <-s.shutdownCh: | |
297 | return 0, ErrSessionShutdown | |
298 | } | |
299 | ||
300 | // Compute the RTT | |
301 | return time.Now().Sub(start), nil | |
302 | } | |
303 | ||
304 | // keepalive is a long running goroutine that periodically does | |
305 | // a ping to keep the connection alive. | |
306 | func (s *Session) keepalive() { | |
307 | for { | |
308 | select { | |
309 | case <-time.After(s.config.KeepAliveInterval): | |
310 | _, err := s.Ping() | |
311 | if err != nil { | |
107c1cdb ND |
312 | if err != ErrSessionShutdown { |
313 | s.logger.Printf("[ERR] yamux: keepalive failed: %v", err) | |
314 | s.exitErr(ErrKeepAliveTimeout) | |
315 | } | |
bae9f6d2 JC |
316 | return |
317 | } | |
318 | case <-s.shutdownCh: | |
319 | return | |
320 | } | |
321 | } | |
322 | } | |
323 | ||
324 | // waitForSendErr waits to send a header, checking for a potential shutdown | |
325 | func (s *Session) waitForSend(hdr header, body io.Reader) error { | |
326 | errCh := make(chan error, 1) | |
327 | return s.waitForSendErr(hdr, body, errCh) | |
328 | } | |
329 | ||
330 | // waitForSendErr waits to send a header with optional data, checking for a | |
331 | // potential shutdown. Since there's the expectation that sends can happen | |
332 | // in a timely manner, we enforce the connection write timeout here. | |
333 | func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error { | |
107c1cdb ND |
334 | t := timerPool.Get() |
335 | timer := t.(*time.Timer) | |
336 | timer.Reset(s.config.ConnectionWriteTimeout) | |
337 | defer func() { | |
338 | timer.Stop() | |
339 | select { | |
340 | case <-timer.C: | |
341 | default: | |
342 | } | |
343 | timerPool.Put(t) | |
344 | }() | |
bae9f6d2 JC |
345 | |
346 | ready := sendReady{Hdr: hdr, Body: body, Err: errCh} | |
347 | select { | |
348 | case s.sendCh <- ready: | |
349 | case <-s.shutdownCh: | |
350 | return ErrSessionShutdown | |
351 | case <-timer.C: | |
352 | return ErrConnectionWriteTimeout | |
353 | } | |
354 | ||
355 | select { | |
356 | case err := <-errCh: | |
357 | return err | |
358 | case <-s.shutdownCh: | |
359 | return ErrSessionShutdown | |
360 | case <-timer.C: | |
361 | return ErrConnectionWriteTimeout | |
362 | } | |
363 | } | |
364 | ||
365 | // sendNoWait does a send without waiting. Since there's the expectation that | |
366 | // the send happens right here, we enforce the connection write timeout if we | |
367 | // can't queue the header to be sent. | |
368 | func (s *Session) sendNoWait(hdr header) error { | |
107c1cdb ND |
369 | t := timerPool.Get() |
370 | timer := t.(*time.Timer) | |
371 | timer.Reset(s.config.ConnectionWriteTimeout) | |
372 | defer func() { | |
373 | timer.Stop() | |
374 | select { | |
375 | case <-timer.C: | |
376 | default: | |
377 | } | |
378 | timerPool.Put(t) | |
379 | }() | |
bae9f6d2 JC |
380 | |
381 | select { | |
382 | case s.sendCh <- sendReady{Hdr: hdr}: | |
383 | return nil | |
384 | case <-s.shutdownCh: | |
385 | return ErrSessionShutdown | |
386 | case <-timer.C: | |
387 | return ErrConnectionWriteTimeout | |
388 | } | |
389 | } | |
390 | ||
391 | // send is a long running goroutine that sends data | |
392 | func (s *Session) send() { | |
393 | for { | |
394 | select { | |
395 | case ready := <-s.sendCh: | |
396 | // Send a header if ready | |
397 | if ready.Hdr != nil { | |
398 | sent := 0 | |
399 | for sent < len(ready.Hdr) { | |
400 | n, err := s.conn.Write(ready.Hdr[sent:]) | |
401 | if err != nil { | |
402 | s.logger.Printf("[ERR] yamux: Failed to write header: %v", err) | |
403 | asyncSendErr(ready.Err, err) | |
404 | s.exitErr(err) | |
405 | return | |
406 | } | |
407 | sent += n | |
408 | } | |
409 | } | |
410 | ||
411 | // Send data from a body if given | |
412 | if ready.Body != nil { | |
413 | _, err := io.Copy(s.conn, ready.Body) | |
414 | if err != nil { | |
415 | s.logger.Printf("[ERR] yamux: Failed to write body: %v", err) | |
416 | asyncSendErr(ready.Err, err) | |
417 | s.exitErr(err) | |
418 | return | |
419 | } | |
420 | } | |
421 | ||
422 | // No error, successful send | |
423 | asyncSendErr(ready.Err, nil) | |
424 | case <-s.shutdownCh: | |
425 | return | |
426 | } | |
427 | } | |
428 | } | |
429 | ||
430 | // recv is a long running goroutine that accepts new data | |
431 | func (s *Session) recv() { | |
432 | if err := s.recvLoop(); err != nil { | |
433 | s.exitErr(err) | |
434 | } | |
435 | } | |
436 | ||
107c1cdb ND |
437 | // Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type |
438 | var ( | |
439 | handlers = []func(*Session, header) error{ | |
440 | typeData: (*Session).handleStreamMessage, | |
441 | typeWindowUpdate: (*Session).handleStreamMessage, | |
442 | typePing: (*Session).handlePing, | |
443 | typeGoAway: (*Session).handleGoAway, | |
444 | } | |
445 | ) | |
446 | ||
bae9f6d2 JC |
447 | // recvLoop continues to receive data until a fatal error is encountered |
448 | func (s *Session) recvLoop() error { | |
449 | defer close(s.recvDoneCh) | |
450 | hdr := header(make([]byte, headerSize)) | |
bae9f6d2 JC |
451 | for { |
452 | // Read the header | |
453 | if _, err := io.ReadFull(s.bufRead, hdr); err != nil { | |
454 | if err != io.EOF && !strings.Contains(err.Error(), "closed") && !strings.Contains(err.Error(), "reset by peer") { | |
455 | s.logger.Printf("[ERR] yamux: Failed to read header: %v", err) | |
456 | } | |
457 | return err | |
458 | } | |
459 | ||
460 | // Verify the version | |
461 | if hdr.Version() != protoVersion { | |
462 | s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version()) | |
463 | return ErrInvalidVersion | |
464 | } | |
465 | ||
107c1cdb ND |
466 | mt := hdr.MsgType() |
467 | if mt < typeData || mt > typeGoAway { | |
bae9f6d2 JC |
468 | return ErrInvalidMsgType |
469 | } | |
470 | ||
107c1cdb | 471 | if err := handlers[mt](s, hdr); err != nil { |
bae9f6d2 JC |
472 | return err |
473 | } | |
474 | } | |
475 | } | |
476 | ||
477 | // handleStreamMessage handles either a data or window update frame | |
478 | func (s *Session) handleStreamMessage(hdr header) error { | |
479 | // Check for a new stream creation | |
480 | id := hdr.StreamID() | |
481 | flags := hdr.Flags() | |
482 | if flags&flagSYN == flagSYN { | |
483 | if err := s.incomingStream(id); err != nil { | |
484 | return err | |
485 | } | |
486 | } | |
487 | ||
488 | // Get the stream | |
489 | s.streamLock.Lock() | |
490 | stream := s.streams[id] | |
491 | s.streamLock.Unlock() | |
492 | ||
493 | // If we do not have a stream, likely we sent a RST | |
494 | if stream == nil { | |
495 | // Drain any data on the wire | |
496 | if hdr.MsgType() == typeData && hdr.Length() > 0 { | |
497 | s.logger.Printf("[WARN] yamux: Discarding data for stream: %d", id) | |
498 | if _, err := io.CopyN(ioutil.Discard, s.bufRead, int64(hdr.Length())); err != nil { | |
499 | s.logger.Printf("[ERR] yamux: Failed to discard data: %v", err) | |
500 | return nil | |
501 | } | |
502 | } else { | |
503 | s.logger.Printf("[WARN] yamux: frame for missing stream: %v", hdr) | |
504 | } | |
505 | return nil | |
506 | } | |
507 | ||
508 | // Check if this is a window update | |
509 | if hdr.MsgType() == typeWindowUpdate { | |
510 | if err := stream.incrSendWindow(hdr, flags); err != nil { | |
511 | if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { | |
512 | s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) | |
513 | } | |
514 | return err | |
515 | } | |
516 | return nil | |
517 | } | |
518 | ||
519 | // Read the new data | |
520 | if err := stream.readData(hdr, flags, s.bufRead); err != nil { | |
521 | if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { | |
522 | s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) | |
523 | } | |
524 | return err | |
525 | } | |
526 | return nil | |
527 | } | |
528 | ||
529 | // handlePing is invokde for a typePing frame | |
530 | func (s *Session) handlePing(hdr header) error { | |
531 | flags := hdr.Flags() | |
532 | pingID := hdr.Length() | |
533 | ||
534 | // Check if this is a query, respond back in a separate context so we | |
535 | // don't interfere with the receiving thread blocking for the write. | |
536 | if flags&flagSYN == flagSYN { | |
537 | go func() { | |
538 | hdr := header(make([]byte, headerSize)) | |
539 | hdr.encode(typePing, flagACK, 0, pingID) | |
540 | if err := s.sendNoWait(hdr); err != nil { | |
541 | s.logger.Printf("[WARN] yamux: failed to send ping reply: %v", err) | |
542 | } | |
543 | }() | |
544 | return nil | |
545 | } | |
546 | ||
547 | // Handle a response | |
548 | s.pingLock.Lock() | |
549 | ch := s.pings[pingID] | |
550 | if ch != nil { | |
551 | delete(s.pings, pingID) | |
552 | close(ch) | |
553 | } | |
554 | s.pingLock.Unlock() | |
555 | return nil | |
556 | } | |
557 | ||
558 | // handleGoAway is invokde for a typeGoAway frame | |
559 | func (s *Session) handleGoAway(hdr header) error { | |
560 | code := hdr.Length() | |
561 | switch code { | |
562 | case goAwayNormal: | |
563 | atomic.SwapInt32(&s.remoteGoAway, 1) | |
564 | case goAwayProtoErr: | |
565 | s.logger.Printf("[ERR] yamux: received protocol error go away") | |
566 | return fmt.Errorf("yamux protocol error") | |
567 | case goAwayInternalErr: | |
568 | s.logger.Printf("[ERR] yamux: received internal error go away") | |
569 | return fmt.Errorf("remote yamux internal error") | |
570 | default: | |
571 | s.logger.Printf("[ERR] yamux: received unexpected go away") | |
572 | return fmt.Errorf("unexpected go away received") | |
573 | } | |
574 | return nil | |
575 | } | |
576 | ||
577 | // incomingStream is used to create a new incoming stream | |
578 | func (s *Session) incomingStream(id uint32) error { | |
579 | // Reject immediately if we are doing a go away | |
580 | if atomic.LoadInt32(&s.localGoAway) == 1 { | |
581 | hdr := header(make([]byte, headerSize)) | |
582 | hdr.encode(typeWindowUpdate, flagRST, id, 0) | |
583 | return s.sendNoWait(hdr) | |
584 | } | |
585 | ||
586 | // Allocate a new stream | |
587 | stream := newStream(s, id, streamSYNReceived) | |
588 | ||
589 | s.streamLock.Lock() | |
590 | defer s.streamLock.Unlock() | |
591 | ||
592 | // Check if stream already exists | |
593 | if _, ok := s.streams[id]; ok { | |
594 | s.logger.Printf("[ERR] yamux: duplicate stream declared") | |
595 | if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { | |
596 | s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) | |
597 | } | |
598 | return ErrDuplicateStream | |
599 | } | |
600 | ||
601 | // Register the stream | |
602 | s.streams[id] = stream | |
603 | ||
604 | // Check if we've exceeded the backlog | |
605 | select { | |
606 | case s.acceptCh <- stream: | |
607 | return nil | |
608 | default: | |
609 | // Backlog exceeded! RST the stream | |
610 | s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset") | |
611 | delete(s.streams, id) | |
612 | stream.sendHdr.encode(typeWindowUpdate, flagRST, id, 0) | |
613 | return s.sendNoWait(stream.sendHdr) | |
614 | } | |
615 | } | |
616 | ||
617 | // closeStream is used to close a stream once both sides have | |
618 | // issued a close. If there was an in-flight SYN and the stream | |
619 | // was not yet established, then this will give the credit back. | |
620 | func (s *Session) closeStream(id uint32) { | |
621 | s.streamLock.Lock() | |
622 | if _, ok := s.inflight[id]; ok { | |
623 | select { | |
624 | case <-s.synCh: | |
625 | default: | |
626 | s.logger.Printf("[ERR] yamux: SYN tracking out of sync") | |
627 | } | |
628 | } | |
629 | delete(s.streams, id) | |
630 | s.streamLock.Unlock() | |
631 | } | |
632 | ||
633 | // establishStream is used to mark a stream that was in the | |
634 | // SYN Sent state as established. | |
635 | func (s *Session) establishStream(id uint32) { | |
636 | s.streamLock.Lock() | |
637 | if _, ok := s.inflight[id]; ok { | |
638 | delete(s.inflight, id) | |
639 | } else { | |
640 | s.logger.Printf("[ERR] yamux: established stream without inflight SYN (no tracking entry)") | |
641 | } | |
642 | select { | |
643 | case <-s.synCh: | |
644 | default: | |
645 | s.logger.Printf("[ERR] yamux: established stream without inflight SYN (didn't have semaphore)") | |
646 | } | |
647 | s.streamLock.Unlock() | |
648 | } |