aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/github.com/hashicorp/yamux
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/hashicorp/yamux')
-rw-r--r--vendor/github.com/hashicorp/yamux/LICENSE362
-rw-r--r--vendor/github.com/hashicorp/yamux/README.md86
-rw-r--r--vendor/github.com/hashicorp/yamux/addr.go60
-rw-r--r--vendor/github.com/hashicorp/yamux/const.go157
-rw-r--r--vendor/github.com/hashicorp/yamux/mux.go87
-rw-r--r--vendor/github.com/hashicorp/yamux/session.go623
-rw-r--r--vendor/github.com/hashicorp/yamux/spec.md140
-rw-r--r--vendor/github.com/hashicorp/yamux/stream.go457
-rw-r--r--vendor/github.com/hashicorp/yamux/util.go28
9 files changed, 2000 insertions, 0 deletions
diff --git a/vendor/github.com/hashicorp/yamux/LICENSE b/vendor/github.com/hashicorp/yamux/LICENSE
new file mode 100644
index 0000000..f0e5c79
--- /dev/null
+++ b/vendor/github.com/hashicorp/yamux/LICENSE
@@ -0,0 +1,362 @@
1Mozilla Public License, version 2.0
2
31. Definitions
4
51.1. "Contributor"
6
7 means each individual or legal entity that creates, contributes to the
8 creation of, or owns Covered Software.
9
101.2. "Contributor Version"
11
12 means the combination of the Contributions of others (if any) used by a
13 Contributor and that particular Contributor's Contribution.
14
151.3. "Contribution"
16
17 means Covered Software of a particular Contributor.
18
191.4. "Covered Software"
20
21 means Source Code Form to which the initial Contributor has attached the
22 notice in Exhibit A, the Executable Form of such Source Code Form, and
23 Modifications of such Source Code Form, in each case including portions
24 thereof.
25
261.5. "Incompatible With Secondary Licenses"
27 means
28
29 a. that the initial Contributor has attached the notice described in
30 Exhibit B to the Covered Software; or
31
32 b. that the Covered Software was made available under the terms of
33 version 1.1 or earlier of the License, but not also under the terms of
34 a Secondary License.
35
361.6. "Executable Form"
37
38 means any form of the work other than Source Code Form.
39
401.7. "Larger Work"
41
42 means a work that combines Covered Software with other material, in a
43 separate file or files, that is not Covered Software.
44
451.8. "License"
46
47 means this document.
48
491.9. "Licensable"
50
51 means having the right to grant, to the maximum extent possible, whether
52 at the time of the initial grant or subsequently, any and all of the
53 rights conveyed by this License.
54
551.10. "Modifications"
56
57 means any of the following:
58
59 a. any file in Source Code Form that results from an addition to,
60 deletion from, or modification of the contents of Covered Software; or
61
62 b. any new file in Source Code Form that contains any Covered Software.
63
641.11. "Patent Claims" of a Contributor
65
66 means any patent claim(s), including without limitation, method,
67 process, and apparatus claims, in any patent Licensable by such
68 Contributor that would be infringed, but for the grant of the License,
69 by the making, using, selling, offering for sale, having made, import,
70 or transfer of either its Contributions or its Contributor Version.
71
721.12. "Secondary License"
73
74 means either the GNU General Public License, Version 2.0, the GNU Lesser
75 General Public License, Version 2.1, the GNU Affero General Public
76 License, Version 3.0, or any later versions of those licenses.
77
781.13. "Source Code Form"
79
80 means the form of the work preferred for making modifications.
81
821.14. "You" (or "Your")
83
84 means an individual or a legal entity exercising rights under this
85 License. For legal entities, "You" includes any entity that controls, is
86 controlled by, or is under common control with You. For purposes of this
87 definition, "control" means (a) the power, direct or indirect, to cause
88 the direction or management of such entity, whether by contract or
89 otherwise, or (b) ownership of more than fifty percent (50%) of the
90 outstanding shares or beneficial ownership of such entity.
91
92
932. License Grants and Conditions
94
952.1. Grants
96
97 Each Contributor hereby grants You a world-wide, royalty-free,
98 non-exclusive license:
99
100 a. under intellectual property rights (other than patent or trademark)
101 Licensable by such Contributor to use, reproduce, make available,
102 modify, display, perform, distribute, and otherwise exploit its
103 Contributions, either on an unmodified basis, with Modifications, or
104 as part of a Larger Work; and
105
106 b. under Patent Claims of such Contributor to make, use, sell, offer for
107 sale, have made, import, and otherwise transfer either its
108 Contributions or its Contributor Version.
109
1102.2. Effective Date
111
112 The licenses granted in Section 2.1 with respect to any Contribution
113 become effective for each Contribution on the date the Contributor first
114 distributes such Contribution.
115
1162.3. Limitations on Grant Scope
117
118 The licenses granted in this Section 2 are the only rights granted under
119 this License. No additional rights or licenses will be implied from the
120 distribution or licensing of Covered Software under this License.
121 Notwithstanding Section 2.1(b) above, no patent license is granted by a
122 Contributor:
123
124 a. for any code that a Contributor has removed from Covered Software; or
125
126 b. for infringements caused by: (i) Your and any other third party's
127 modifications of Covered Software, or (ii) the combination of its
128 Contributions with other software (except as part of its Contributor
129 Version); or
130
131 c. under Patent Claims infringed by Covered Software in the absence of
132 its Contributions.
133
134 This License does not grant any rights in the trademarks, service marks,
135 or logos of any Contributor (except as may be necessary to comply with
136 the notice requirements in Section 3.4).
137
1382.4. Subsequent Licenses
139
140 No Contributor makes additional grants as a result of Your choice to
141 distribute the Covered Software under a subsequent version of this
142 License (see Section 10.2) or under the terms of a Secondary License (if
143 permitted under the terms of Section 3.3).
144
1452.5. Representation
146
147 Each Contributor represents that the Contributor believes its
148 Contributions are its original creation(s) or it has sufficient rights to
149 grant the rights to its Contributions conveyed by this License.
150
1512.6. Fair Use
152
153 This License is not intended to limit any rights You have under
154 applicable copyright doctrines of fair use, fair dealing, or other
155 equivalents.
156
1572.7. Conditions
158
159 Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted in
160 Section 2.1.
161
162
1633. Responsibilities
164
1653.1. Distribution of Source Form
166
167 All distribution of Covered Software in Source Code Form, including any
168 Modifications that You create or to which You contribute, must be under
169 the terms of this License. You must inform recipients that the Source
170 Code Form of the Covered Software is governed by the terms of this
171 License, and how they can obtain a copy of this License. You may not
172 attempt to alter or restrict the recipients' rights in the Source Code
173 Form.
174
1753.2. Distribution of Executable Form
176
177 If You distribute Covered Software in Executable Form then:
178
179 a. such Covered Software must also be made available in Source Code Form,
180 as described in Section 3.1, and You must inform recipients of the
181 Executable Form how they can obtain a copy of such Source Code Form by
182 reasonable means in a timely manner, at a charge no more than the cost
183 of distribution to the recipient; and
184
185 b. You may distribute such Executable Form under the terms of this
186 License, or sublicense it under different terms, provided that the
187 license for the Executable Form does not attempt to limit or alter the
188 recipients' rights in the Source Code Form under this License.
189
1903.3. Distribution of a Larger Work
191
192 You may create and distribute a Larger Work under terms of Your choice,
193 provided that You also comply with the requirements of this License for
194 the Covered Software. If the Larger Work is a combination of Covered
195 Software with a work governed by one or more Secondary Licenses, and the
196 Covered Software is not Incompatible With Secondary Licenses, this
197 License permits You to additionally distribute such Covered Software
198 under the terms of such Secondary License(s), so that the recipient of
199 the Larger Work may, at their option, further distribute the Covered
200 Software under the terms of either this License or such Secondary
201 License(s).
202
2033.4. Notices
204
205 You may not remove or alter the substance of any license notices
206 (including copyright notices, patent notices, disclaimers of warranty, or
207 limitations of liability) contained within the Source Code Form of the
208 Covered Software, except that You may alter any license notices to the
209 extent required to remedy known factual inaccuracies.
210
2113.5. Application of Additional Terms
212
213 You may choose to offer, and to charge a fee for, warranty, support,
214 indemnity or liability obligations to one or more recipients of Covered
215 Software. However, You may do so only on Your own behalf, and not on
216 behalf of any Contributor. You must make it absolutely clear that any
217 such warranty, support, indemnity, or liability obligation is offered by
218 You alone, and You hereby agree to indemnify every Contributor for any
219 liability incurred by such Contributor as a result of warranty, support,
220 indemnity or liability terms You offer. You may include additional
221 disclaimers of warranty and limitations of liability specific to any
222 jurisdiction.
223
2244. Inability to Comply Due to Statute or Regulation
225
226 If it is impossible for You to comply with any of the terms of this License
227 with respect to some or all of the Covered Software due to statute,
228 judicial order, or regulation then You must: (a) comply with the terms of
229 this License to the maximum extent possible; and (b) describe the
230 limitations and the code they affect. Such description must be placed in a
231 text file included with all distributions of the Covered Software under
232 this License. Except to the extent prohibited by statute or regulation,
233 such description must be sufficiently detailed for a recipient of ordinary
234 skill to be able to understand it.
235
2365. Termination
237
2385.1. The rights granted under this License will terminate automatically if You
239 fail to comply with any of its terms. However, if You become compliant,
240 then the rights granted under this License from a particular Contributor
241 are reinstated (a) provisionally, unless and until such Contributor
242 explicitly and finally terminates Your grants, and (b) on an ongoing
243 basis, if such Contributor fails to notify You of the non-compliance by
244 some reasonable means prior to 60 days after You have come back into
245 compliance. Moreover, Your grants from a particular Contributor are
246 reinstated on an ongoing basis if such Contributor notifies You of the
247 non-compliance by some reasonable means, this is the first time You have
248 received notice of non-compliance with this License from such
249 Contributor, and You become compliant prior to 30 days after Your receipt
250 of the notice.
251
2525.2. If You initiate litigation against any entity by asserting a patent
253 infringement claim (excluding declaratory judgment actions,
254 counter-claims, and cross-claims) alleging that a Contributor Version
255 directly or indirectly infringes any patent, then the rights granted to
256 You by any and all Contributors for the Covered Software under Section
257 2.1 of this License shall terminate.
258
2595.3. In the event of termination under Sections 5.1 or 5.2 above, all end user
260 license agreements (excluding distributors and resellers) which have been
261 validly granted by You or Your distributors under this License prior to
262 termination shall survive termination.
263
2646. Disclaimer of Warranty
265
266 Covered Software is provided under this License on an "as is" basis,
267 without warranty of any kind, either expressed, implied, or statutory,
268 including, without limitation, warranties that the Covered Software is free
269 of defects, merchantable, fit for a particular purpose or non-infringing.
270 The entire risk as to the quality and performance of the Covered Software
271 is with You. Should any Covered Software prove defective in any respect,
272 You (not any Contributor) assume the cost of any necessary servicing,
273 repair, or correction. This disclaimer of warranty constitutes an essential
274 part of this License. No use of any Covered Software is authorized under
275 this License except under this disclaimer.
276
2777. Limitation of Liability
278
279 Under no circumstances and under no legal theory, whether tort (including
280 negligence), contract, or otherwise, shall any Contributor, or anyone who
281 distributes Covered Software as permitted above, be liable to You for any
282 direct, indirect, special, incidental, or consequential damages of any
283 character including, without limitation, damages for lost profits, loss of
284 goodwill, work stoppage, computer failure or malfunction, or any and all
285 other commercial damages or losses, even if such party shall have been
286 informed of the possibility of such damages. This limitation of liability
287 shall not apply to liability for death or personal injury resulting from
288 such party's negligence to the extent applicable law prohibits such
289 limitation. Some jurisdictions do not allow the exclusion or limitation of
290 incidental or consequential damages, so this exclusion and limitation may
291 not apply to You.
292
2938. Litigation
294
295 Any litigation relating to this License may be brought only in the courts
296 of a jurisdiction where the defendant maintains its principal place of
297 business and such litigation shall be governed by laws of that
298 jurisdiction, without reference to its conflict-of-law provisions. Nothing
299 in this Section shall prevent a party's ability to bring cross-claims or
300 counter-claims.
301
3029. Miscellaneous
303
304 This License represents the complete agreement concerning the subject
305 matter hereof. If any provision of this License is held to be
306 unenforceable, such provision shall be reformed only to the extent
307 necessary to make it enforceable. Any law or regulation which provides that
308 the language of a contract shall be construed against the drafter shall not
309 be used to construe this License against a Contributor.
310
311
31210. Versions of the License
313
31410.1. New Versions
315
316 Mozilla Foundation is the license steward. Except as provided in Section
317 10.3, no one other than the license steward has the right to modify or
318 publish new versions of this License. Each version will be given a
319 distinguishing version number.
320
32110.2. Effect of New Versions
322
323 You may distribute the Covered Software under the terms of the version
324 of the License under which You originally received the Covered Software,
325 or under the terms of any subsequent version published by the license
326 steward.
327
32810.3. Modified Versions
329
330 If you create software not governed by this License, and you want to
331 create a new license for such software, you may create and use a
332 modified version of this License if you rename the license and remove
333 any references to the name of the license steward (except to note that
334 such modified license differs from this License).
335
33610.4. Distributing Source Code Form that is Incompatible With Secondary
337 Licenses If You choose to distribute Source Code Form that is
338 Incompatible With Secondary Licenses under the terms of this version of
339 the License, the notice described in Exhibit B of this License must be
340 attached.
341
342Exhibit A - Source Code Form License Notice
343
344 This Source Code Form is subject to the
345 terms of the Mozilla Public License, v.
346 2.0. If a copy of the MPL was not
347 distributed with this file, You can
348 obtain one at
349 http://mozilla.org/MPL/2.0/.
350
351If it is not possible or desirable to put the notice in a particular file,
352then You may include the notice in a location (such as a LICENSE file in a
353relevant directory) where a recipient would be likely to look for such a
354notice.
355
356You may add additional accurate notices of copyright ownership.
357
358Exhibit B - "Incompatible With Secondary Licenses" Notice
359
360 This Source Code Form is "Incompatible
361 With Secondary Licenses", as defined by
362 the Mozilla Public License, v. 2.0. \ No newline at end of file
diff --git a/vendor/github.com/hashicorp/yamux/README.md b/vendor/github.com/hashicorp/yamux/README.md
new file mode 100644
index 0000000..d4db7fc
--- /dev/null
+++ b/vendor/github.com/hashicorp/yamux/README.md
@@ -0,0 +1,86 @@
1# Yamux
2
3Yamux (Yet another Multiplexer) is a multiplexing library for Golang.
4It relies on an underlying connection to provide reliability
5and ordering, such as TCP or Unix domain sockets, and provides
6stream-oriented multiplexing. It is inspired by SPDY but is not
7interoperable with it.
8
9Yamux features include:
10
11* Bi-directional streams
12 * Streams can be opened by either client or server
13 * Useful for NAT traversal
14 * Server-side push support
15* Flow control
16 * Avoid starvation
17 * Back-pressure to prevent overwhelming a receiver
18* Keep Alives
19 * Enables persistent connections over a load balancer
20* Efficient
21 * Enables thousands of logical streams with low overhead
22
23## Documentation
24
25For complete documentation, see the associated [Godoc](http://godoc.org/github.com/hashicorp/yamux).
26
27## Specification
28
29The full specification for Yamux is provided in the `spec.md` file.
30It can be used as a guide to implementors of interoperable libraries.
31
32## Usage
33
34Using Yamux is remarkably simple:
35
36```go
37
38func client() {
39 // Get a TCP connection
40 conn, err := net.Dial(...)
41 if err != nil {
42 panic(err)
43 }
44
45 // Setup client side of yamux
46 session, err := yamux.Client(conn, nil)
47 if err != nil {
48 panic(err)
49 }
50
51 // Open a new stream
52 stream, err := session.Open()
53 if err != nil {
54 panic(err)
55 }
56
57 // Stream implements net.Conn
58 stream.Write([]byte("ping"))
59}
60
61func server() {
62 // Accept a TCP connection
63 conn, err := listener.Accept()
64 if err != nil {
65 panic(err)
66 }
67
68 // Setup server side of yamux
69 session, err := yamux.Server(conn, nil)
70 if err != nil {
71 panic(err)
72 }
73
74 // Accept a stream
75 stream, err := session.Accept()
76 if err != nil {
77 panic(err)
78 }
79
80 // Listen for a message
81 buf := make([]byte, 4)
82 stream.Read(buf)
83}
84
85```
86
diff --git a/vendor/github.com/hashicorp/yamux/addr.go b/vendor/github.com/hashicorp/yamux/addr.go
new file mode 100644
index 0000000..be6ebca
--- /dev/null
+++ b/vendor/github.com/hashicorp/yamux/addr.go
@@ -0,0 +1,60 @@
1package yamux
2
3import (
4 "fmt"
5 "net"
6)
7
8// hasAddr is used to get the address from the underlying connection
9type hasAddr interface {
10 LocalAddr() net.Addr
11 RemoteAddr() net.Addr
12}
13
14// yamuxAddr is used when we cannot get the underlying address
15type yamuxAddr struct {
16 Addr string
17}
18
19func (*yamuxAddr) Network() string {
20 return "yamux"
21}
22
23func (y *yamuxAddr) String() string {
24 return fmt.Sprintf("yamux:%s", y.Addr)
25}
26
27// Addr is used to get the address of the listener.
28func (s *Session) Addr() net.Addr {
29 return s.LocalAddr()
30}
31
32// LocalAddr is used to get the local address of the
33// underlying connection.
34func (s *Session) LocalAddr() net.Addr {
35 addr, ok := s.conn.(hasAddr)
36 if !ok {
37 return &yamuxAddr{"local"}
38 }
39 return addr.LocalAddr()
40}
41
42// RemoteAddr is used to get the address of remote end
43// of the underlying connection
44func (s *Session) RemoteAddr() net.Addr {
45 addr, ok := s.conn.(hasAddr)
46 if !ok {
47 return &yamuxAddr{"remote"}
48 }
49 return addr.RemoteAddr()
50}
51
52// LocalAddr returns the local address
53func (s *Stream) LocalAddr() net.Addr {
54 return s.session.LocalAddr()
55}
56
57// LocalAddr returns the remote address
58func (s *Stream) RemoteAddr() net.Addr {
59 return s.session.RemoteAddr()
60}
diff --git a/vendor/github.com/hashicorp/yamux/const.go b/vendor/github.com/hashicorp/yamux/const.go
new file mode 100644
index 0000000..4f52938
--- /dev/null
+++ b/vendor/github.com/hashicorp/yamux/const.go
@@ -0,0 +1,157 @@
1package yamux
2
3import (
4 "encoding/binary"
5 "fmt"
6)
7
8var (
9 // ErrInvalidVersion means we received a frame with an
10 // invalid version
11 ErrInvalidVersion = fmt.Errorf("invalid protocol version")
12
13 // ErrInvalidMsgType means we received a frame with an
14 // invalid message type
15 ErrInvalidMsgType = fmt.Errorf("invalid msg type")
16
17 // ErrSessionShutdown is used if there is a shutdown during
18 // an operation
19 ErrSessionShutdown = fmt.Errorf("session shutdown")
20
21 // ErrStreamsExhausted is returned if we have no more
22 // stream ids to issue
23 ErrStreamsExhausted = fmt.Errorf("streams exhausted")
24
25 // ErrDuplicateStream is used if a duplicate stream is
26 // opened inbound
27 ErrDuplicateStream = fmt.Errorf("duplicate stream initiated")
28
29 // ErrReceiveWindowExceeded indicates the window was exceeded
30 ErrRecvWindowExceeded = fmt.Errorf("recv window exceeded")
31
32 // ErrTimeout is used when we reach an IO deadline
33 ErrTimeout = fmt.Errorf("i/o deadline reached")
34
35 // ErrStreamClosed is returned when using a closed stream
36 ErrStreamClosed = fmt.Errorf("stream closed")
37
38 // ErrUnexpectedFlag is set when we get an unexpected flag
39 ErrUnexpectedFlag = fmt.Errorf("unexpected flag")
40
41 // ErrRemoteGoAway is used when we get a go away from the other side
42 ErrRemoteGoAway = fmt.Errorf("remote end is not accepting connections")
43
44 // ErrConnectionReset is sent if a stream is reset. This can happen
45 // if the backlog is exceeded, or if there was a remote GoAway.
46 ErrConnectionReset = fmt.Errorf("connection reset")
47
48 // ErrConnectionWriteTimeout indicates that we hit the "safety valve"
49 // timeout writing to the underlying stream connection.
50 ErrConnectionWriteTimeout = fmt.Errorf("connection write timeout")
51
52 // ErrKeepAliveTimeout is sent if a missed keepalive caused the stream close
53 ErrKeepAliveTimeout = fmt.Errorf("keepalive timeout")
54)
55
56const (
57 // protoVersion is the only version we support
58 protoVersion uint8 = 0
59)
60
61const (
62 // Data is used for data frames. They are followed
63 // by length bytes worth of payload.
64 typeData uint8 = iota
65
66 // WindowUpdate is used to change the window of
67 // a given stream. The length indicates the delta
68 // update to the window.
69 typeWindowUpdate
70
71 // Ping is sent as a keep-alive or to measure
72 // the RTT. The StreamID and Length value are echoed
73 // back in the response.
74 typePing
75
76 // GoAway is sent to terminate a session. The StreamID
77 // should be 0 and the length is an error code.
78 typeGoAway
79)
80
81const (
82 // SYN is sent to signal a new stream. May
83 // be sent with a data payload
84 flagSYN uint16 = 1 << iota
85
86 // ACK is sent to acknowledge a new stream. May
87 // be sent with a data payload
88 flagACK
89
90 // FIN is sent to half-close the given stream.
91 // May be sent with a data payload.
92 flagFIN
93
94 // RST is used to hard close a given stream.
95 flagRST
96)
97
98const (
99 // initialStreamWindow is the initial stream window size
100 initialStreamWindow uint32 = 256 * 1024
101)
102
103const (
104 // goAwayNormal is sent on a normal termination
105 goAwayNormal uint32 = iota
106
107 // goAwayProtoErr sent on a protocol error
108 goAwayProtoErr
109
110 // goAwayInternalErr sent on an internal error
111 goAwayInternalErr
112)
113
114const (
115 sizeOfVersion = 1
116 sizeOfType = 1
117 sizeOfFlags = 2
118 sizeOfStreamID = 4
119 sizeOfLength = 4
120 headerSize = sizeOfVersion + sizeOfType + sizeOfFlags +
121 sizeOfStreamID + sizeOfLength
122)
123
124type header []byte
125
126func (h header) Version() uint8 {
127 return h[0]
128}
129
130func (h header) MsgType() uint8 {
131 return h[1]
132}
133
134func (h header) Flags() uint16 {
135 return binary.BigEndian.Uint16(h[2:4])
136}
137
138func (h header) StreamID() uint32 {
139 return binary.BigEndian.Uint32(h[4:8])
140}
141
142func (h header) Length() uint32 {
143 return binary.BigEndian.Uint32(h[8:12])
144}
145
146func (h header) String() string {
147 return fmt.Sprintf("Vsn:%d Type:%d Flags:%d StreamID:%d Length:%d",
148 h.Version(), h.MsgType(), h.Flags(), h.StreamID(), h.Length())
149}
150
151func (h header) encode(msgType uint8, flags uint16, streamID uint32, length uint32) {
152 h[0] = protoVersion
153 h[1] = msgType
154 binary.BigEndian.PutUint16(h[2:4], flags)
155 binary.BigEndian.PutUint32(h[4:8], streamID)
156 binary.BigEndian.PutUint32(h[8:12], length)
157}
diff --git a/vendor/github.com/hashicorp/yamux/mux.go b/vendor/github.com/hashicorp/yamux/mux.go
new file mode 100644
index 0000000..7abc7c7
--- /dev/null
+++ b/vendor/github.com/hashicorp/yamux/mux.go
@@ -0,0 +1,87 @@
1package yamux
2
3import (
4 "fmt"
5 "io"
6 "os"
7 "time"
8)
9
10// Config is used to tune the Yamux session
11type Config struct {
12 // AcceptBacklog is used to limit how many streams may be
13 // waiting an accept.
14 AcceptBacklog int
15
16 // EnableKeepalive is used to do a period keep alive
17 // messages using a ping.
18 EnableKeepAlive bool
19
20 // KeepAliveInterval is how often to perform the keep alive
21 KeepAliveInterval time.Duration
22
23 // ConnectionWriteTimeout is meant to be a "safety valve" timeout after
24 // we which will suspect a problem with the underlying connection and
25 // close it. This is only applied to writes, where's there's generally
26 // an expectation that things will move along quickly.
27 ConnectionWriteTimeout time.Duration
28
29 // MaxStreamWindowSize is used to control the maximum
30 // window size that we allow for a stream.
31 MaxStreamWindowSize uint32
32
33 // LogOutput is used to control the log destination
34 LogOutput io.Writer
35}
36
37// DefaultConfig is used to return a default configuration
38func DefaultConfig() *Config {
39 return &Config{
40 AcceptBacklog: 256,
41 EnableKeepAlive: true,
42 KeepAliveInterval: 30 * time.Second,
43 ConnectionWriteTimeout: 10 * time.Second,
44 MaxStreamWindowSize: initialStreamWindow,
45 LogOutput: os.Stderr,
46 }
47}
48
49// VerifyConfig is used to verify the sanity of configuration
50func VerifyConfig(config *Config) error {
51 if config.AcceptBacklog <= 0 {
52 return fmt.Errorf("backlog must be positive")
53 }
54 if config.KeepAliveInterval == 0 {
55 return fmt.Errorf("keep-alive interval must be positive")
56 }
57 if config.MaxStreamWindowSize < initialStreamWindow {
58 return fmt.Errorf("MaxStreamWindowSize must be larger than %d", initialStreamWindow)
59 }
60 return nil
61}
62
63// Server is used to initialize a new server-side connection.
64// There must be at most one server-side connection. If a nil config is
65// provided, the DefaultConfiguration will be used.
66func Server(conn io.ReadWriteCloser, config *Config) (*Session, error) {
67 if config == nil {
68 config = DefaultConfig()
69 }
70 if err := VerifyConfig(config); err != nil {
71 return nil, err
72 }
73 return newSession(config, conn, false), nil
74}
75
76// Client is used to initialize a new client-side connection.
77// There must be at most one client-side connection.
78func Client(conn io.ReadWriteCloser, config *Config) (*Session, error) {
79 if config == nil {
80 config = DefaultConfig()
81 }
82
83 if err := VerifyConfig(config); err != nil {
84 return nil, err
85 }
86 return newSession(config, conn, true), nil
87}
diff --git a/vendor/github.com/hashicorp/yamux/session.go b/vendor/github.com/hashicorp/yamux/session.go
new file mode 100644
index 0000000..e179818
--- /dev/null
+++ b/vendor/github.com/hashicorp/yamux/session.go
@@ -0,0 +1,623 @@
1package yamux
2
3import (
4 "bufio"
5 "fmt"
6 "io"
7 "io/ioutil"
8 "log"
9 "math"
10 "net"
11 "strings"
12 "sync"
13 "sync/atomic"
14 "time"
15)
16
17// Session is used to wrap a reliable ordered connection and to
18// multiplex it into multiple streams.
19type Session struct {
20 // remoteGoAway indicates the remote side does
21 // not want futher connections. Must be first for alignment.
22 remoteGoAway int32
23
24 // localGoAway indicates that we should stop
25 // accepting futher connections. Must be first for alignment.
26 localGoAway int32
27
28 // nextStreamID is the next stream we should
29 // send. This depends if we are a client/server.
30 nextStreamID uint32
31
32 // config holds our configuration
33 config *Config
34
35 // logger is used for our logs
36 logger *log.Logger
37
38 // conn is the underlying connection
39 conn io.ReadWriteCloser
40
41 // bufRead is a buffered reader
42 bufRead *bufio.Reader
43
44 // pings is used to track inflight pings
45 pings map[uint32]chan struct{}
46 pingID uint32
47 pingLock sync.Mutex
48
49 // streams maps a stream id to a stream, and inflight has an entry
50 // for any outgoing stream that has not yet been established. Both are
51 // protected by streamLock.
52 streams map[uint32]*Stream
53 inflight map[uint32]struct{}
54 streamLock sync.Mutex
55
56 // synCh acts like a semaphore. It is sized to the AcceptBacklog which
57 // is assumed to be symmetric between the client and server. This allows
58 // the client to avoid exceeding the backlog and instead blocks the open.
59 synCh chan struct{}
60
61 // acceptCh is used to pass ready streams to the client
62 acceptCh chan *Stream
63
64 // sendCh is used to mark a stream as ready to send,
65 // or to send a header out directly.
66 sendCh chan sendReady
67
68 // recvDoneCh is closed when recv() exits to avoid a race
69 // between stream registration and stream shutdown
70 recvDoneCh chan struct{}
71
72 // shutdown is used to safely close a session
73 shutdown bool
74 shutdownErr error
75 shutdownCh chan struct{}
76 shutdownLock sync.Mutex
77}
78
79// sendReady is used to either mark a stream as ready
80// or to directly send a header
81type sendReady struct {
82 Hdr []byte
83 Body io.Reader
84 Err chan error
85}
86
87// newSession is used to construct a new session
88func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session {
89 s := &Session{
90 config: config,
91 logger: log.New(config.LogOutput, "", log.LstdFlags),
92 conn: conn,
93 bufRead: bufio.NewReader(conn),
94 pings: make(map[uint32]chan struct{}),
95 streams: make(map[uint32]*Stream),
96 inflight: make(map[uint32]struct{}),
97 synCh: make(chan struct{}, config.AcceptBacklog),
98 acceptCh: make(chan *Stream, config.AcceptBacklog),
99 sendCh: make(chan sendReady, 64),
100 recvDoneCh: make(chan struct{}),
101 shutdownCh: make(chan struct{}),
102 }
103 if client {
104 s.nextStreamID = 1
105 } else {
106 s.nextStreamID = 2
107 }
108 go s.recv()
109 go s.send()
110 if config.EnableKeepAlive {
111 go s.keepalive()
112 }
113 return s
114}
115
116// IsClosed does a safe check to see if we have shutdown
117func (s *Session) IsClosed() bool {
118 select {
119 case <-s.shutdownCh:
120 return true
121 default:
122 return false
123 }
124}
125
126// NumStreams returns the number of currently open streams
127func (s *Session) NumStreams() int {
128 s.streamLock.Lock()
129 num := len(s.streams)
130 s.streamLock.Unlock()
131 return num
132}
133
134// Open is used to create a new stream as a net.Conn
135func (s *Session) Open() (net.Conn, error) {
136 conn, err := s.OpenStream()
137 if err != nil {
138 return nil, err
139 }
140 return conn, nil
141}
142
143// OpenStream is used to create a new stream
144func (s *Session) OpenStream() (*Stream, error) {
145 if s.IsClosed() {
146 return nil, ErrSessionShutdown
147 }
148 if atomic.LoadInt32(&s.remoteGoAway) == 1 {
149 return nil, ErrRemoteGoAway
150 }
151
152 // Block if we have too many inflight SYNs
153 select {
154 case s.synCh <- struct{}{}:
155 case <-s.shutdownCh:
156 return nil, ErrSessionShutdown
157 }
158
159GET_ID:
160 // Get an ID, and check for stream exhaustion
161 id := atomic.LoadUint32(&s.nextStreamID)
162 if id >= math.MaxUint32-1 {
163 return nil, ErrStreamsExhausted
164 }
165 if !atomic.CompareAndSwapUint32(&s.nextStreamID, id, id+2) {
166 goto GET_ID
167 }
168
169 // Register the stream
170 stream := newStream(s, id, streamInit)
171 s.streamLock.Lock()
172 s.streams[id] = stream
173 s.inflight[id] = struct{}{}
174 s.streamLock.Unlock()
175
176 // Send the window update to create
177 if err := stream.sendWindowUpdate(); err != nil {
178 select {
179 case <-s.synCh:
180 default:
181 s.logger.Printf("[ERR] yamux: aborted stream open without inflight syn semaphore")
182 }
183 return nil, err
184 }
185 return stream, nil
186}
187
188// Accept is used to block until the next available stream
189// is ready to be accepted.
190func (s *Session) Accept() (net.Conn, error) {
191 conn, err := s.AcceptStream()
192 if err != nil {
193 return nil, err
194 }
195 return conn, err
196}
197
198// AcceptStream is used to block until the next available stream
199// is ready to be accepted.
200func (s *Session) AcceptStream() (*Stream, error) {
201 select {
202 case stream := <-s.acceptCh:
203 if err := stream.sendWindowUpdate(); err != nil {
204 return nil, err
205 }
206 return stream, nil
207 case <-s.shutdownCh:
208 return nil, s.shutdownErr
209 }
210}
211
212// Close is used to close the session and all streams.
213// Attempts to send a GoAway before closing the connection.
214func (s *Session) Close() error {
215 s.shutdownLock.Lock()
216 defer s.shutdownLock.Unlock()
217
218 if s.shutdown {
219 return nil
220 }
221 s.shutdown = true
222 if s.shutdownErr == nil {
223 s.shutdownErr = ErrSessionShutdown
224 }
225 close(s.shutdownCh)
226 s.conn.Close()
227 <-s.recvDoneCh
228
229 s.streamLock.Lock()
230 defer s.streamLock.Unlock()
231 for _, stream := range s.streams {
232 stream.forceClose()
233 }
234 return nil
235}
236
237// exitErr is used to handle an error that is causing the
238// session to terminate.
239func (s *Session) exitErr(err error) {
240 s.shutdownLock.Lock()
241 if s.shutdownErr == nil {
242 s.shutdownErr = err
243 }
244 s.shutdownLock.Unlock()
245 s.Close()
246}
247
248// GoAway can be used to prevent accepting further
249// connections. It does not close the underlying conn.
250func (s *Session) GoAway() error {
251 return s.waitForSend(s.goAway(goAwayNormal), nil)
252}
253
254// goAway is used to send a goAway message
255func (s *Session) goAway(reason uint32) header {
256 atomic.SwapInt32(&s.localGoAway, 1)
257 hdr := header(make([]byte, headerSize))
258 hdr.encode(typeGoAway, 0, 0, reason)
259 return hdr
260}
261
262// Ping is used to measure the RTT response time
263func (s *Session) Ping() (time.Duration, error) {
264 // Get a channel for the ping
265 ch := make(chan struct{})
266
267 // Get a new ping id, mark as pending
268 s.pingLock.Lock()
269 id := s.pingID
270 s.pingID++
271 s.pings[id] = ch
272 s.pingLock.Unlock()
273
274 // Send the ping request
275 hdr := header(make([]byte, headerSize))
276 hdr.encode(typePing, flagSYN, 0, id)
277 if err := s.waitForSend(hdr, nil); err != nil {
278 return 0, err
279 }
280
281 // Wait for a response
282 start := time.Now()
283 select {
284 case <-ch:
285 case <-time.After(s.config.ConnectionWriteTimeout):
286 s.pingLock.Lock()
287 delete(s.pings, id) // Ignore it if a response comes later.
288 s.pingLock.Unlock()
289 return 0, ErrTimeout
290 case <-s.shutdownCh:
291 return 0, ErrSessionShutdown
292 }
293
294 // Compute the RTT
295 return time.Now().Sub(start), nil
296}
297
298// keepalive is a long running goroutine that periodically does
299// a ping to keep the connection alive.
300func (s *Session) keepalive() {
301 for {
302 select {
303 case <-time.After(s.config.KeepAliveInterval):
304 _, err := s.Ping()
305 if err != nil {
306 s.logger.Printf("[ERR] yamux: keepalive failed: %v", err)
307 s.exitErr(ErrKeepAliveTimeout)
308 return
309 }
310 case <-s.shutdownCh:
311 return
312 }
313 }
314}
315
316// waitForSendErr waits to send a header, checking for a potential shutdown
317func (s *Session) waitForSend(hdr header, body io.Reader) error {
318 errCh := make(chan error, 1)
319 return s.waitForSendErr(hdr, body, errCh)
320}
321
322// waitForSendErr waits to send a header with optional data, checking for a
323// potential shutdown. Since there's the expectation that sends can happen
324// in a timely manner, we enforce the connection write timeout here.
325func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error {
326 timer := time.NewTimer(s.config.ConnectionWriteTimeout)
327 defer timer.Stop()
328
329 ready := sendReady{Hdr: hdr, Body: body, Err: errCh}
330 select {
331 case s.sendCh <- ready:
332 case <-s.shutdownCh:
333 return ErrSessionShutdown
334 case <-timer.C:
335 return ErrConnectionWriteTimeout
336 }
337
338 select {
339 case err := <-errCh:
340 return err
341 case <-s.shutdownCh:
342 return ErrSessionShutdown
343 case <-timer.C:
344 return ErrConnectionWriteTimeout
345 }
346}
347
348// sendNoWait does a send without waiting. Since there's the expectation that
349// the send happens right here, we enforce the connection write timeout if we
350// can't queue the header to be sent.
351func (s *Session) sendNoWait(hdr header) error {
352 timer := time.NewTimer(s.config.ConnectionWriteTimeout)
353 defer timer.Stop()
354
355 select {
356 case s.sendCh <- sendReady{Hdr: hdr}:
357 return nil
358 case <-s.shutdownCh:
359 return ErrSessionShutdown
360 case <-timer.C:
361 return ErrConnectionWriteTimeout
362 }
363}
364
365// send is a long running goroutine that sends data
366func (s *Session) send() {
367 for {
368 select {
369 case ready := <-s.sendCh:
370 // Send a header if ready
371 if ready.Hdr != nil {
372 sent := 0
373 for sent < len(ready.Hdr) {
374 n, err := s.conn.Write(ready.Hdr[sent:])
375 if err != nil {
376 s.logger.Printf("[ERR] yamux: Failed to write header: %v", err)
377 asyncSendErr(ready.Err, err)
378 s.exitErr(err)
379 return
380 }
381 sent += n
382 }
383 }
384
385 // Send data from a body if given
386 if ready.Body != nil {
387 _, err := io.Copy(s.conn, ready.Body)
388 if err != nil {
389 s.logger.Printf("[ERR] yamux: Failed to write body: %v", err)
390 asyncSendErr(ready.Err, err)
391 s.exitErr(err)
392 return
393 }
394 }
395
396 // No error, successful send
397 asyncSendErr(ready.Err, nil)
398 case <-s.shutdownCh:
399 return
400 }
401 }
402}
403
404// recv is a long running goroutine that accepts new data
405func (s *Session) recv() {
406 if err := s.recvLoop(); err != nil {
407 s.exitErr(err)
408 }
409}
410
411// recvLoop continues to receive data until a fatal error is encountered
412func (s *Session) recvLoop() error {
413 defer close(s.recvDoneCh)
414 hdr := header(make([]byte, headerSize))
415 var handler func(header) error
416 for {
417 // Read the header
418 if _, err := io.ReadFull(s.bufRead, hdr); err != nil {
419 if err != io.EOF && !strings.Contains(err.Error(), "closed") && !strings.Contains(err.Error(), "reset by peer") {
420 s.logger.Printf("[ERR] yamux: Failed to read header: %v", err)
421 }
422 return err
423 }
424
425 // Verify the version
426 if hdr.Version() != protoVersion {
427 s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version())
428 return ErrInvalidVersion
429 }
430
431 // Switch on the type
432 switch hdr.MsgType() {
433 case typeData:
434 handler = s.handleStreamMessage
435 case typeWindowUpdate:
436 handler = s.handleStreamMessage
437 case typeGoAway:
438 handler = s.handleGoAway
439 case typePing:
440 handler = s.handlePing
441 default:
442 return ErrInvalidMsgType
443 }
444
445 // Invoke the handler
446 if err := handler(hdr); err != nil {
447 return err
448 }
449 }
450}
451
452// handleStreamMessage handles either a data or window update frame
453func (s *Session) handleStreamMessage(hdr header) error {
454 // Check for a new stream creation
455 id := hdr.StreamID()
456 flags := hdr.Flags()
457 if flags&flagSYN == flagSYN {
458 if err := s.incomingStream(id); err != nil {
459 return err
460 }
461 }
462
463 // Get the stream
464 s.streamLock.Lock()
465 stream := s.streams[id]
466 s.streamLock.Unlock()
467
468 // If we do not have a stream, likely we sent a RST
469 if stream == nil {
470 // Drain any data on the wire
471 if hdr.MsgType() == typeData && hdr.Length() > 0 {
472 s.logger.Printf("[WARN] yamux: Discarding data for stream: %d", id)
473 if _, err := io.CopyN(ioutil.Discard, s.bufRead, int64(hdr.Length())); err != nil {
474 s.logger.Printf("[ERR] yamux: Failed to discard data: %v", err)
475 return nil
476 }
477 } else {
478 s.logger.Printf("[WARN] yamux: frame for missing stream: %v", hdr)
479 }
480 return nil
481 }
482
483 // Check if this is a window update
484 if hdr.MsgType() == typeWindowUpdate {
485 if err := stream.incrSendWindow(hdr, flags); err != nil {
486 if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil {
487 s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
488 }
489 return err
490 }
491 return nil
492 }
493
494 // Read the new data
495 if err := stream.readData(hdr, flags, s.bufRead); err != nil {
496 if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil {
497 s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
498 }
499 return err
500 }
501 return nil
502}
503
504// handlePing is invokde for a typePing frame
505func (s *Session) handlePing(hdr header) error {
506 flags := hdr.Flags()
507 pingID := hdr.Length()
508
509 // Check if this is a query, respond back in a separate context so we
510 // don't interfere with the receiving thread blocking for the write.
511 if flags&flagSYN == flagSYN {
512 go func() {
513 hdr := header(make([]byte, headerSize))
514 hdr.encode(typePing, flagACK, 0, pingID)
515 if err := s.sendNoWait(hdr); err != nil {
516 s.logger.Printf("[WARN] yamux: failed to send ping reply: %v", err)
517 }
518 }()
519 return nil
520 }
521
522 // Handle a response
523 s.pingLock.Lock()
524 ch := s.pings[pingID]
525 if ch != nil {
526 delete(s.pings, pingID)
527 close(ch)
528 }
529 s.pingLock.Unlock()
530 return nil
531}
532
533// handleGoAway is invokde for a typeGoAway frame
534func (s *Session) handleGoAway(hdr header) error {
535 code := hdr.Length()
536 switch code {
537 case goAwayNormal:
538 atomic.SwapInt32(&s.remoteGoAway, 1)
539 case goAwayProtoErr:
540 s.logger.Printf("[ERR] yamux: received protocol error go away")
541 return fmt.Errorf("yamux protocol error")
542 case goAwayInternalErr:
543 s.logger.Printf("[ERR] yamux: received internal error go away")
544 return fmt.Errorf("remote yamux internal error")
545 default:
546 s.logger.Printf("[ERR] yamux: received unexpected go away")
547 return fmt.Errorf("unexpected go away received")
548 }
549 return nil
550}
551
552// incomingStream is used to create a new incoming stream
553func (s *Session) incomingStream(id uint32) error {
554 // Reject immediately if we are doing a go away
555 if atomic.LoadInt32(&s.localGoAway) == 1 {
556 hdr := header(make([]byte, headerSize))
557 hdr.encode(typeWindowUpdate, flagRST, id, 0)
558 return s.sendNoWait(hdr)
559 }
560
561 // Allocate a new stream
562 stream := newStream(s, id, streamSYNReceived)
563
564 s.streamLock.Lock()
565 defer s.streamLock.Unlock()
566
567 // Check if stream already exists
568 if _, ok := s.streams[id]; ok {
569 s.logger.Printf("[ERR] yamux: duplicate stream declared")
570 if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil {
571 s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr)
572 }
573 return ErrDuplicateStream
574 }
575
576 // Register the stream
577 s.streams[id] = stream
578
579 // Check if we've exceeded the backlog
580 select {
581 case s.acceptCh <- stream:
582 return nil
583 default:
584 // Backlog exceeded! RST the stream
585 s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset")
586 delete(s.streams, id)
587 stream.sendHdr.encode(typeWindowUpdate, flagRST, id, 0)
588 return s.sendNoWait(stream.sendHdr)
589 }
590}
591
592// closeStream is used to close a stream once both sides have
593// issued a close. If there was an in-flight SYN and the stream
594// was not yet established, then this will give the credit back.
595func (s *Session) closeStream(id uint32) {
596 s.streamLock.Lock()
597 if _, ok := s.inflight[id]; ok {
598 select {
599 case <-s.synCh:
600 default:
601 s.logger.Printf("[ERR] yamux: SYN tracking out of sync")
602 }
603 }
604 delete(s.streams, id)
605 s.streamLock.Unlock()
606}
607
608// establishStream is used to mark a stream that was in the
609// SYN Sent state as established.
610func (s *Session) establishStream(id uint32) {
611 s.streamLock.Lock()
612 if _, ok := s.inflight[id]; ok {
613 delete(s.inflight, id)
614 } else {
615 s.logger.Printf("[ERR] yamux: established stream without inflight SYN (no tracking entry)")
616 }
617 select {
618 case <-s.synCh:
619 default:
620 s.logger.Printf("[ERR] yamux: established stream without inflight SYN (didn't have semaphore)")
621 }
622 s.streamLock.Unlock()
623}
diff --git a/vendor/github.com/hashicorp/yamux/spec.md b/vendor/github.com/hashicorp/yamux/spec.md
new file mode 100644
index 0000000..183d797
--- /dev/null
+++ b/vendor/github.com/hashicorp/yamux/spec.md
@@ -0,0 +1,140 @@
1# Specification
2
3We use this document to detail the internal specification of Yamux.
4This is used both as a guide for implementing Yamux, but also for
5alternative interoperable libraries to be built.
6
7# Framing
8
9Yamux uses a streaming connection underneath, but imposes a message
10framing so that it can be shared between many logical streams. Each
11frame contains a header like:
12
13* Version (8 bits)
14* Type (8 bits)
15* Flags (16 bits)
16* StreamID (32 bits)
17* Length (32 bits)
18
19This means that each header has a 12 byte overhead.
20All fields are encoded in network order (big endian).
21Each field is described below:
22
23## Version Field
24
25The version field is used for future backward compatibility. At the
26current time, the field is always set to 0, to indicate the initial
27version.
28
29## Type Field
30
31The type field is used to switch the frame message type. The following
32message types are supported:
33
34* 0x0 Data - Used to transmit data. May transmit zero length payloads
35 depending on the flags.
36
37* 0x1 Window Update - Used to updated the senders receive window size.
38 This is used to implement per-session flow control.
39
40* 0x2 Ping - Used to measure RTT. It can also be used to heart-beat
41 and do keep-alives over TCP.
42
43* 0x3 Go Away - Used to close a session.
44
45## Flag Field
46
47The flags field is used to provide additional information related
48to the message type. The following flags are supported:
49
50* 0x1 SYN - Signals the start of a new stream. May be sent with a data or
51 window update message. Also sent with a ping to indicate outbound.
52
53* 0x2 ACK - Acknowledges the start of a new stream. May be sent with a data
54 or window update message. Also sent with a ping to indicate response.
55
56* 0x4 FIN - Performs a half-close of a stream. May be sent with a data
57 message or window update.
58
59* 0x8 RST - Reset a stream immediately. May be sent with a data or
60 window update message.
61
62## StreamID Field
63
64The StreamID field is used to identify the logical stream the frame
65is addressing. The client side should use odd ID's, and the server even.
66This prevents any collisions. Additionally, the 0 ID is reserved to represent
67the session.
68
69Both Ping and Go Away messages should always use the 0 StreamID.
70
71## Length Field
72
73The meaning of the length field depends on the message type:
74
75* Data - provides the length of bytes following the header
76* Window update - provides a delta update to the window size
77* Ping - Contains an opaque value, echoed back
78* Go Away - Contains an error code
79
80# Message Flow
81
82There is no explicit connection setup, as Yamux relies on an underlying
83transport to be provided. However, there is a distinction between client
84and server side of the connection.
85
86## Opening a stream
87
88To open a stream, an initial data or window update frame is sent
89with a new StreamID. The SYN flag should be set to signal a new stream.
90
91The receiver must then reply with either a data or window update frame
92with the StreamID along with the ACK flag to accept the stream or with
93the RST flag to reject the stream.
94
95Because we are relying on the reliable stream underneath, a connection
96can begin sending data once the SYN flag is sent. The corresponding
97ACK does not need to be received. This is particularly well suited
98for an RPC system where a client wants to open a stream and immediately
99fire a request without waiting for the RTT of the ACK.
100
101This does introduce the possibility of a connection being rejected
102after data has been sent already. This is a slight semantic difference
103from TCP, where the conection cannot be refused after it is opened.
104Clients should be prepared to handle this by checking for an error
105that indicates a RST was received.
106
107## Closing a stream
108
109To close a stream, either side sends a data or window update frame
110along with the FIN flag. This does a half-close indicating the sender
111will send no further data.
112
113Once both sides have closed the connection, the stream is closed.
114
115Alternatively, if an error occurs, the RST flag can be used to
116hard close a stream immediately.
117
118## Flow Control
119
120When Yamux is initially starts each stream with a 256KB window size.
121There is no window size for the session.
122
123To prevent the streams from stalling, window update frames should be
124sent regularly. Yamux can be configured to provide a larger limit for
125windows sizes. Both sides assume the initial 256KB window, but can
126immediately send a window update as part of the SYN/ACK indicating a
127larger window.
128
129Both sides should track the number of bytes sent in Data frames
130only, as only they are tracked as part of the window size.
131
132## Session termination
133
134When a session is being terminated, the Go Away message should
135be sent. The Length should be set to one of the following to
136provide an error code:
137
138* 0x0 Normal termination
139* 0x1 Protocol error
140* 0x2 Internal error
diff --git a/vendor/github.com/hashicorp/yamux/stream.go b/vendor/github.com/hashicorp/yamux/stream.go
new file mode 100644
index 0000000..d216e28
--- /dev/null
+++ b/vendor/github.com/hashicorp/yamux/stream.go
@@ -0,0 +1,457 @@
1package yamux
2
3import (
4 "bytes"
5 "io"
6 "sync"
7 "sync/atomic"
8 "time"
9)
10
11type streamState int
12
13const (
14 streamInit streamState = iota
15 streamSYNSent
16 streamSYNReceived
17 streamEstablished
18 streamLocalClose
19 streamRemoteClose
20 streamClosed
21 streamReset
22)
23
24// Stream is used to represent a logical stream
25// within a session.
26type Stream struct {
27 recvWindow uint32
28 sendWindow uint32
29
30 id uint32
31 session *Session
32
33 state streamState
34 stateLock sync.Mutex
35
36 recvBuf *bytes.Buffer
37 recvLock sync.Mutex
38
39 controlHdr header
40 controlErr chan error
41 controlHdrLock sync.Mutex
42
43 sendHdr header
44 sendErr chan error
45 sendLock sync.Mutex
46
47 recvNotifyCh chan struct{}
48 sendNotifyCh chan struct{}
49
50 readDeadline time.Time
51 writeDeadline time.Time
52}
53
54// newStream is used to construct a new stream within
55// a given session for an ID
56func newStream(session *Session, id uint32, state streamState) *Stream {
57 s := &Stream{
58 id: id,
59 session: session,
60 state: state,
61 controlHdr: header(make([]byte, headerSize)),
62 controlErr: make(chan error, 1),
63 sendHdr: header(make([]byte, headerSize)),
64 sendErr: make(chan error, 1),
65 recvWindow: initialStreamWindow,
66 sendWindow: initialStreamWindow,
67 recvNotifyCh: make(chan struct{}, 1),
68 sendNotifyCh: make(chan struct{}, 1),
69 }
70 return s
71}
72
73// Session returns the associated stream session
74func (s *Stream) Session() *Session {
75 return s.session
76}
77
78// StreamID returns the ID of this stream
79func (s *Stream) StreamID() uint32 {
80 return s.id
81}
82
83// Read is used to read from the stream
84func (s *Stream) Read(b []byte) (n int, err error) {
85 defer asyncNotify(s.recvNotifyCh)
86START:
87 s.stateLock.Lock()
88 switch s.state {
89 case streamLocalClose:
90 fallthrough
91 case streamRemoteClose:
92 fallthrough
93 case streamClosed:
94 s.recvLock.Lock()
95 if s.recvBuf == nil || s.recvBuf.Len() == 0 {
96 s.recvLock.Unlock()
97 s.stateLock.Unlock()
98 return 0, io.EOF
99 }
100 s.recvLock.Unlock()
101 case streamReset:
102 s.stateLock.Unlock()
103 return 0, ErrConnectionReset
104 }
105 s.stateLock.Unlock()
106
107 // If there is no data available, block
108 s.recvLock.Lock()
109 if s.recvBuf == nil || s.recvBuf.Len() == 0 {
110 s.recvLock.Unlock()
111 goto WAIT
112 }
113
114 // Read any bytes
115 n, _ = s.recvBuf.Read(b)
116 s.recvLock.Unlock()
117
118 // Send a window update potentially
119 err = s.sendWindowUpdate()
120 return n, err
121
122WAIT:
123 var timeout <-chan time.Time
124 var timer *time.Timer
125 if !s.readDeadline.IsZero() {
126 delay := s.readDeadline.Sub(time.Now())
127 timer = time.NewTimer(delay)
128 timeout = timer.C
129 }
130 select {
131 case <-s.recvNotifyCh:
132 if timer != nil {
133 timer.Stop()
134 }
135 goto START
136 case <-timeout:
137 return 0, ErrTimeout
138 }
139}
140
141// Write is used to write to the stream
142func (s *Stream) Write(b []byte) (n int, err error) {
143 s.sendLock.Lock()
144 defer s.sendLock.Unlock()
145 total := 0
146 for total < len(b) {
147 n, err := s.write(b[total:])
148 total += n
149 if err != nil {
150 return total, err
151 }
152 }
153 return total, nil
154}
155
156// write is used to write to the stream, may return on
157// a short write.
158func (s *Stream) write(b []byte) (n int, err error) {
159 var flags uint16
160 var max uint32
161 var body io.Reader
162START:
163 s.stateLock.Lock()
164 switch s.state {
165 case streamLocalClose:
166 fallthrough
167 case streamClosed:
168 s.stateLock.Unlock()
169 return 0, ErrStreamClosed
170 case streamReset:
171 s.stateLock.Unlock()
172 return 0, ErrConnectionReset
173 }
174 s.stateLock.Unlock()
175
176 // If there is no data available, block
177 window := atomic.LoadUint32(&s.sendWindow)
178 if window == 0 {
179 goto WAIT
180 }
181
182 // Determine the flags if any
183 flags = s.sendFlags()
184
185 // Send up to our send window
186 max = min(window, uint32(len(b)))
187 body = bytes.NewReader(b[:max])
188
189 // Send the header
190 s.sendHdr.encode(typeData, flags, s.id, max)
191 if err := s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil {
192 return 0, err
193 }
194
195 // Reduce our send window
196 atomic.AddUint32(&s.sendWindow, ^uint32(max-1))
197
198 // Unlock
199 return int(max), err
200
201WAIT:
202 var timeout <-chan time.Time
203 if !s.writeDeadline.IsZero() {
204 delay := s.writeDeadline.Sub(time.Now())
205 timeout = time.After(delay)
206 }
207 select {
208 case <-s.sendNotifyCh:
209 goto START
210 case <-timeout:
211 return 0, ErrTimeout
212 }
213 return 0, nil
214}
215
216// sendFlags determines any flags that are appropriate
217// based on the current stream state
218func (s *Stream) sendFlags() uint16 {
219 s.stateLock.Lock()
220 defer s.stateLock.Unlock()
221 var flags uint16
222 switch s.state {
223 case streamInit:
224 flags |= flagSYN
225 s.state = streamSYNSent
226 case streamSYNReceived:
227 flags |= flagACK
228 s.state = streamEstablished
229 }
230 return flags
231}
232
233// sendWindowUpdate potentially sends a window update enabling
234// further writes to take place. Must be invoked with the lock.
235func (s *Stream) sendWindowUpdate() error {
236 s.controlHdrLock.Lock()
237 defer s.controlHdrLock.Unlock()
238
239 // Determine the delta update
240 max := s.session.config.MaxStreamWindowSize
241 delta := max - atomic.LoadUint32(&s.recvWindow)
242
243 // Determine the flags if any
244 flags := s.sendFlags()
245
246 // Check if we can omit the update
247 if delta < (max/2) && flags == 0 {
248 return nil
249 }
250
251 // Update our window
252 atomic.AddUint32(&s.recvWindow, delta)
253
254 // Send the header
255 s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta)
256 if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil {
257 return err
258 }
259 return nil
260}
261
262// sendClose is used to send a FIN
263func (s *Stream) sendClose() error {
264 s.controlHdrLock.Lock()
265 defer s.controlHdrLock.Unlock()
266
267 flags := s.sendFlags()
268 flags |= flagFIN
269 s.controlHdr.encode(typeWindowUpdate, flags, s.id, 0)
270 if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil {
271 return err
272 }
273 return nil
274}
275
276// Close is used to close the stream
277func (s *Stream) Close() error {
278 closeStream := false
279 s.stateLock.Lock()
280 switch s.state {
281 // Opened means we need to signal a close
282 case streamSYNSent:
283 fallthrough
284 case streamSYNReceived:
285 fallthrough
286 case streamEstablished:
287 s.state = streamLocalClose
288 goto SEND_CLOSE
289
290 case streamLocalClose:
291 case streamRemoteClose:
292 s.state = streamClosed
293 closeStream = true
294 goto SEND_CLOSE
295
296 case streamClosed:
297 case streamReset:
298 default:
299 panic("unhandled state")
300 }
301 s.stateLock.Unlock()
302 return nil
303SEND_CLOSE:
304 s.stateLock.Unlock()
305 s.sendClose()
306 s.notifyWaiting()
307 if closeStream {
308 s.session.closeStream(s.id)
309 }
310 return nil
311}
312
313// forceClose is used for when the session is exiting
314func (s *Stream) forceClose() {
315 s.stateLock.Lock()
316 s.state = streamClosed
317 s.stateLock.Unlock()
318 s.notifyWaiting()
319}
320
321// processFlags is used to update the state of the stream
322// based on set flags, if any. Lock must be held
323func (s *Stream) processFlags(flags uint16) error {
324 // Close the stream without holding the state lock
325 closeStream := false
326 defer func() {
327 if closeStream {
328 s.session.closeStream(s.id)
329 }
330 }()
331
332 s.stateLock.Lock()
333 defer s.stateLock.Unlock()
334 if flags&flagACK == flagACK {
335 if s.state == streamSYNSent {
336 s.state = streamEstablished
337 }
338 s.session.establishStream(s.id)
339 }
340 if flags&flagFIN == flagFIN {
341 switch s.state {
342 case streamSYNSent:
343 fallthrough
344 case streamSYNReceived:
345 fallthrough
346 case streamEstablished:
347 s.state = streamRemoteClose
348 s.notifyWaiting()
349 case streamLocalClose:
350 s.state = streamClosed
351 closeStream = true
352 s.notifyWaiting()
353 default:
354 s.session.logger.Printf("[ERR] yamux: unexpected FIN flag in state %d", s.state)
355 return ErrUnexpectedFlag
356 }
357 }
358 if flags&flagRST == flagRST {
359 s.state = streamReset
360 closeStream = true
361 s.notifyWaiting()
362 }
363 return nil
364}
365
366// notifyWaiting notifies all the waiting channels
367func (s *Stream) notifyWaiting() {
368 asyncNotify(s.recvNotifyCh)
369 asyncNotify(s.sendNotifyCh)
370}
371
372// incrSendWindow updates the size of our send window
373func (s *Stream) incrSendWindow(hdr header, flags uint16) error {
374 if err := s.processFlags(flags); err != nil {
375 return err
376 }
377
378 // Increase window, unblock a sender
379 atomic.AddUint32(&s.sendWindow, hdr.Length())
380 asyncNotify(s.sendNotifyCh)
381 return nil
382}
383
384// readData is used to handle a data frame
385func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
386 if err := s.processFlags(flags); err != nil {
387 return err
388 }
389
390 // Check that our recv window is not exceeded
391 length := hdr.Length()
392 if length == 0 {
393 return nil
394 }
395 if remain := atomic.LoadUint32(&s.recvWindow); length > remain {
396 s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, remain, length)
397 return ErrRecvWindowExceeded
398 }
399
400 // Wrap in a limited reader
401 conn = &io.LimitedReader{R: conn, N: int64(length)}
402
403 // Copy into buffer
404 s.recvLock.Lock()
405 if s.recvBuf == nil {
406 // Allocate the receive buffer just-in-time to fit the full data frame.
407 // This way we can read in the whole packet without further allocations.
408 s.recvBuf = bytes.NewBuffer(make([]byte, 0, length))
409 }
410 if _, err := io.Copy(s.recvBuf, conn); err != nil {
411 s.session.logger.Printf("[ERR] yamux: Failed to read stream data: %v", err)
412 s.recvLock.Unlock()
413 return err
414 }
415
416 // Decrement the receive window
417 atomic.AddUint32(&s.recvWindow, ^uint32(length-1))
418 s.recvLock.Unlock()
419
420 // Unblock any readers
421 asyncNotify(s.recvNotifyCh)
422 return nil
423}
424
425// SetDeadline sets the read and write deadlines
426func (s *Stream) SetDeadline(t time.Time) error {
427 if err := s.SetReadDeadline(t); err != nil {
428 return err
429 }
430 if err := s.SetWriteDeadline(t); err != nil {
431 return err
432 }
433 return nil
434}
435
436// SetReadDeadline sets the deadline for future Read calls.
437func (s *Stream) SetReadDeadline(t time.Time) error {
438 s.readDeadline = t
439 return nil
440}
441
442// SetWriteDeadline sets the deadline for future Write calls
443func (s *Stream) SetWriteDeadline(t time.Time) error {
444 s.writeDeadline = t
445 return nil
446}
447
448// Shrink is used to compact the amount of buffers utilized
449// This is useful when using Yamux in a connection pool to reduce
450// the idle memory utilization.
451func (s *Stream) Shrink() {
452 s.recvLock.Lock()
453 if s.recvBuf != nil && s.recvBuf.Len() == 0 {
454 s.recvBuf = nil
455 }
456 s.recvLock.Unlock()
457}
diff --git a/vendor/github.com/hashicorp/yamux/util.go b/vendor/github.com/hashicorp/yamux/util.go
new file mode 100644
index 0000000..5fe45af
--- /dev/null
+++ b/vendor/github.com/hashicorp/yamux/util.go
@@ -0,0 +1,28 @@
1package yamux
2
3// asyncSendErr is used to try an async send of an error
4func asyncSendErr(ch chan error, err error) {
5 if ch == nil {
6 return
7 }
8 select {
9 case ch <- err:
10 default:
11 }
12}
13
14// asyncNotify is used to signal a waiting goroutine
15func asyncNotify(ch chan struct{}) {
16 select {
17 case ch <- struct{}{}:
18 default:
19 }
20}
21
22// min computes the minimum of two values
23func min(a, b uint32) uint32 {
24 if a < b {
25 return a
26 }
27 return b
28}