diff options
Diffstat (limited to 'vendor/github.com/hashicorp/yamux')
-rw-r--r-- | vendor/github.com/hashicorp/yamux/LICENSE | 362 | ||||
-rw-r--r-- | vendor/github.com/hashicorp/yamux/README.md | 86 | ||||
-rw-r--r-- | vendor/github.com/hashicorp/yamux/addr.go | 60 | ||||
-rw-r--r-- | vendor/github.com/hashicorp/yamux/const.go | 157 | ||||
-rw-r--r-- | vendor/github.com/hashicorp/yamux/mux.go | 87 | ||||
-rw-r--r-- | vendor/github.com/hashicorp/yamux/session.go | 623 | ||||
-rw-r--r-- | vendor/github.com/hashicorp/yamux/spec.md | 140 | ||||
-rw-r--r-- | vendor/github.com/hashicorp/yamux/stream.go | 457 | ||||
-rw-r--r-- | vendor/github.com/hashicorp/yamux/util.go | 28 |
9 files changed, 2000 insertions, 0 deletions
diff --git a/vendor/github.com/hashicorp/yamux/LICENSE b/vendor/github.com/hashicorp/yamux/LICENSE new file mode 100644 index 0000000..f0e5c79 --- /dev/null +++ b/vendor/github.com/hashicorp/yamux/LICENSE | |||
@@ -0,0 +1,362 @@ | |||
1 | Mozilla Public License, version 2.0 | ||
2 | |||
3 | 1. Definitions | ||
4 | |||
5 | 1.1. "Contributor" | ||
6 | |||
7 | means each individual or legal entity that creates, contributes to the | ||
8 | creation of, or owns Covered Software. | ||
9 | |||
10 | 1.2. "Contributor Version" | ||
11 | |||
12 | means the combination of the Contributions of others (if any) used by a | ||
13 | Contributor and that particular Contributor's Contribution. | ||
14 | |||
15 | 1.3. "Contribution" | ||
16 | |||
17 | means Covered Software of a particular Contributor. | ||
18 | |||
19 | 1.4. "Covered Software" | ||
20 | |||
21 | means Source Code Form to which the initial Contributor has attached the | ||
22 | notice in Exhibit A, the Executable Form of such Source Code Form, and | ||
23 | Modifications of such Source Code Form, in each case including portions | ||
24 | thereof. | ||
25 | |||
26 | 1.5. "Incompatible With Secondary Licenses" | ||
27 | means | ||
28 | |||
29 | a. that the initial Contributor has attached the notice described in | ||
30 | Exhibit B to the Covered Software; or | ||
31 | |||
32 | b. that the Covered Software was made available under the terms of | ||
33 | version 1.1 or earlier of the License, but not also under the terms of | ||
34 | a Secondary License. | ||
35 | |||
36 | 1.6. "Executable Form" | ||
37 | |||
38 | means any form of the work other than Source Code Form. | ||
39 | |||
40 | 1.7. "Larger Work" | ||
41 | |||
42 | means a work that combines Covered Software with other material, in a | ||
43 | separate file or files, that is not Covered Software. | ||
44 | |||
45 | 1.8. "License" | ||
46 | |||
47 | means this document. | ||
48 | |||
49 | 1.9. "Licensable" | ||
50 | |||
51 | means having the right to grant, to the maximum extent possible, whether | ||
52 | at the time of the initial grant or subsequently, any and all of the | ||
53 | rights conveyed by this License. | ||
54 | |||
55 | 1.10. "Modifications" | ||
56 | |||
57 | means any of the following: | ||
58 | |||
59 | a. any file in Source Code Form that results from an addition to, | ||
60 | deletion from, or modification of the contents of Covered Software; or | ||
61 | |||
62 | b. any new file in Source Code Form that contains any Covered Software. | ||
63 | |||
64 | 1.11. "Patent Claims" of a Contributor | ||
65 | |||
66 | means any patent claim(s), including without limitation, method, | ||
67 | process, and apparatus claims, in any patent Licensable by such | ||
68 | Contributor that would be infringed, but for the grant of the License, | ||
69 | by the making, using, selling, offering for sale, having made, import, | ||
70 | or transfer of either its Contributions or its Contributor Version. | ||
71 | |||
72 | 1.12. "Secondary License" | ||
73 | |||
74 | means either the GNU General Public License, Version 2.0, the GNU Lesser | ||
75 | General Public License, Version 2.1, the GNU Affero General Public | ||
76 | License, Version 3.0, or any later versions of those licenses. | ||
77 | |||
78 | 1.13. "Source Code Form" | ||
79 | |||
80 | means the form of the work preferred for making modifications. | ||
81 | |||
82 | 1.14. "You" (or "Your") | ||
83 | |||
84 | means an individual or a legal entity exercising rights under this | ||
85 | License. For legal entities, "You" includes any entity that controls, is | ||
86 | controlled by, or is under common control with You. For purposes of this | ||
87 | definition, "control" means (a) the power, direct or indirect, to cause | ||
88 | the direction or management of such entity, whether by contract or | ||
89 | otherwise, or (b) ownership of more than fifty percent (50%) of the | ||
90 | outstanding shares or beneficial ownership of such entity. | ||
91 | |||
92 | |||
93 | 2. License Grants and Conditions | ||
94 | |||
95 | 2.1. Grants | ||
96 | |||
97 | Each Contributor hereby grants You a world-wide, royalty-free, | ||
98 | non-exclusive license: | ||
99 | |||
100 | a. under intellectual property rights (other than patent or trademark) | ||
101 | Licensable by such Contributor to use, reproduce, make available, | ||
102 | modify, display, perform, distribute, and otherwise exploit its | ||
103 | Contributions, either on an unmodified basis, with Modifications, or | ||
104 | as part of a Larger Work; and | ||
105 | |||
106 | b. under Patent Claims of such Contributor to make, use, sell, offer for | ||
107 | sale, have made, import, and otherwise transfer either its | ||
108 | Contributions or its Contributor Version. | ||
109 | |||
110 | 2.2. Effective Date | ||
111 | |||
112 | The licenses granted in Section 2.1 with respect to any Contribution | ||
113 | become effective for each Contribution on the date the Contributor first | ||
114 | distributes such Contribution. | ||
115 | |||
116 | 2.3. Limitations on Grant Scope | ||
117 | |||
118 | The licenses granted in this Section 2 are the only rights granted under | ||
119 | this License. No additional rights or licenses will be implied from the | ||
120 | distribution or licensing of Covered Software under this License. | ||
121 | Notwithstanding Section 2.1(b) above, no patent license is granted by a | ||
122 | Contributor: | ||
123 | |||
124 | a. for any code that a Contributor has removed from Covered Software; or | ||
125 | |||
126 | b. for infringements caused by: (i) Your and any other third party's | ||
127 | modifications of Covered Software, or (ii) the combination of its | ||
128 | Contributions with other software (except as part of its Contributor | ||
129 | Version); or | ||
130 | |||
131 | c. under Patent Claims infringed by Covered Software in the absence of | ||
132 | its Contributions. | ||
133 | |||
134 | This License does not grant any rights in the trademarks, service marks, | ||
135 | or logos of any Contributor (except as may be necessary to comply with | ||
136 | the notice requirements in Section 3.4). | ||
137 | |||
138 | 2.4. Subsequent Licenses | ||
139 | |||
140 | No Contributor makes additional grants as a result of Your choice to | ||
141 | distribute the Covered Software under a subsequent version of this | ||
142 | License (see Section 10.2) or under the terms of a Secondary License (if | ||
143 | permitted under the terms of Section 3.3). | ||
144 | |||
145 | 2.5. Representation | ||
146 | |||
147 | Each Contributor represents that the Contributor believes its | ||
148 | Contributions are its original creation(s) or it has sufficient rights to | ||
149 | grant the rights to its Contributions conveyed by this License. | ||
150 | |||
151 | 2.6. Fair Use | ||
152 | |||
153 | This License is not intended to limit any rights You have under | ||
154 | applicable copyright doctrines of fair use, fair dealing, or other | ||
155 | equivalents. | ||
156 | |||
157 | 2.7. Conditions | ||
158 | |||
159 | Sections 3.1, 3.2, 3.3, and 3.4 are conditions of the licenses granted in | ||
160 | Section 2.1. | ||
161 | |||
162 | |||
163 | 3. Responsibilities | ||
164 | |||
165 | 3.1. Distribution of Source Form | ||
166 | |||
167 | All distribution of Covered Software in Source Code Form, including any | ||
168 | Modifications that You create or to which You contribute, must be under | ||
169 | the terms of this License. You must inform recipients that the Source | ||
170 | Code Form of the Covered Software is governed by the terms of this | ||
171 | License, and how they can obtain a copy of this License. You may not | ||
172 | attempt to alter or restrict the recipients' rights in the Source Code | ||
173 | Form. | ||
174 | |||
175 | 3.2. Distribution of Executable Form | ||
176 | |||
177 | If You distribute Covered Software in Executable Form then: | ||
178 | |||
179 | a. such Covered Software must also be made available in Source Code Form, | ||
180 | as described in Section 3.1, and You must inform recipients of the | ||
181 | Executable Form how they can obtain a copy of such Source Code Form by | ||
182 | reasonable means in a timely manner, at a charge no more than the cost | ||
183 | of distribution to the recipient; and | ||
184 | |||
185 | b. You may distribute such Executable Form under the terms of this | ||
186 | License, or sublicense it under different terms, provided that the | ||
187 | license for the Executable Form does not attempt to limit or alter the | ||
188 | recipients' rights in the Source Code Form under this License. | ||
189 | |||
190 | 3.3. Distribution of a Larger Work | ||
191 | |||
192 | You may create and distribute a Larger Work under terms of Your choice, | ||
193 | provided that You also comply with the requirements of this License for | ||
194 | the Covered Software. If the Larger Work is a combination of Covered | ||
195 | Software with a work governed by one or more Secondary Licenses, and the | ||
196 | Covered Software is not Incompatible With Secondary Licenses, this | ||
197 | License permits You to additionally distribute such Covered Software | ||
198 | under the terms of such Secondary License(s), so that the recipient of | ||
199 | the Larger Work may, at their option, further distribute the Covered | ||
200 | Software under the terms of either this License or such Secondary | ||
201 | License(s). | ||
202 | |||
203 | 3.4. Notices | ||
204 | |||
205 | You may not remove or alter the substance of any license notices | ||
206 | (including copyright notices, patent notices, disclaimers of warranty, or | ||
207 | limitations of liability) contained within the Source Code Form of the | ||
208 | Covered Software, except that You may alter any license notices to the | ||
209 | extent required to remedy known factual inaccuracies. | ||
210 | |||
211 | 3.5. Application of Additional Terms | ||
212 | |||
213 | You may choose to offer, and to charge a fee for, warranty, support, | ||
214 | indemnity or liability obligations to one or more recipients of Covered | ||
215 | Software. However, You may do so only on Your own behalf, and not on | ||
216 | behalf of any Contributor. You must make it absolutely clear that any | ||
217 | such warranty, support, indemnity, or liability obligation is offered by | ||
218 | You alone, and You hereby agree to indemnify every Contributor for any | ||
219 | liability incurred by such Contributor as a result of warranty, support, | ||
220 | indemnity or liability terms You offer. You may include additional | ||
221 | disclaimers of warranty and limitations of liability specific to any | ||
222 | jurisdiction. | ||
223 | |||
224 | 4. Inability to Comply Due to Statute or Regulation | ||
225 | |||
226 | If it is impossible for You to comply with any of the terms of this License | ||
227 | with respect to some or all of the Covered Software due to statute, | ||
228 | judicial order, or regulation then You must: (a) comply with the terms of | ||
229 | this License to the maximum extent possible; and (b) describe the | ||
230 | limitations and the code they affect. Such description must be placed in a | ||
231 | text file included with all distributions of the Covered Software under | ||
232 | this License. Except to the extent prohibited by statute or regulation, | ||
233 | such description must be sufficiently detailed for a recipient of ordinary | ||
234 | skill to be able to understand it. | ||
235 | |||
236 | 5. Termination | ||
237 | |||
238 | 5.1. The rights granted under this License will terminate automatically if You | ||
239 | fail to comply with any of its terms. However, if You become compliant, | ||
240 | then the rights granted under this License from a particular Contributor | ||
241 | are reinstated (a) provisionally, unless and until such Contributor | ||
242 | explicitly and finally terminates Your grants, and (b) on an ongoing | ||
243 | basis, if such Contributor fails to notify You of the non-compliance by | ||
244 | some reasonable means prior to 60 days after You have come back into | ||
245 | compliance. Moreover, Your grants from a particular Contributor are | ||
246 | reinstated on an ongoing basis if such Contributor notifies You of the | ||
247 | non-compliance by some reasonable means, this is the first time You have | ||
248 | received notice of non-compliance with this License from such | ||
249 | Contributor, and You become compliant prior to 30 days after Your receipt | ||
250 | of the notice. | ||
251 | |||
252 | 5.2. If You initiate litigation against any entity by asserting a patent | ||
253 | infringement claim (excluding declaratory judgment actions, | ||
254 | counter-claims, and cross-claims) alleging that a Contributor Version | ||
255 | directly or indirectly infringes any patent, then the rights granted to | ||
256 | You by any and all Contributors for the Covered Software under Section | ||
257 | 2.1 of this License shall terminate. | ||
258 | |||
259 | 5.3. In the event of termination under Sections 5.1 or 5.2 above, all end user | ||
260 | license agreements (excluding distributors and resellers) which have been | ||
261 | validly granted by You or Your distributors under this License prior to | ||
262 | termination shall survive termination. | ||
263 | |||
264 | 6. Disclaimer of Warranty | ||
265 | |||
266 | Covered Software is provided under this License on an "as is" basis, | ||
267 | without warranty of any kind, either expressed, implied, or statutory, | ||
268 | including, without limitation, warranties that the Covered Software is free | ||
269 | of defects, merchantable, fit for a particular purpose or non-infringing. | ||
270 | The entire risk as to the quality and performance of the Covered Software | ||
271 | is with You. Should any Covered Software prove defective in any respect, | ||
272 | You (not any Contributor) assume the cost of any necessary servicing, | ||
273 | repair, or correction. This disclaimer of warranty constitutes an essential | ||
274 | part of this License. No use of any Covered Software is authorized under | ||
275 | this License except under this disclaimer. | ||
276 | |||
277 | 7. Limitation of Liability | ||
278 | |||
279 | Under no circumstances and under no legal theory, whether tort (including | ||
280 | negligence), contract, or otherwise, shall any Contributor, or anyone who | ||
281 | distributes Covered Software as permitted above, be liable to You for any | ||
282 | direct, indirect, special, incidental, or consequential damages of any | ||
283 | character including, without limitation, damages for lost profits, loss of | ||
284 | goodwill, work stoppage, computer failure or malfunction, or any and all | ||
285 | other commercial damages or losses, even if such party shall have been | ||
286 | informed of the possibility of such damages. This limitation of liability | ||
287 | shall not apply to liability for death or personal injury resulting from | ||
288 | such party's negligence to the extent applicable law prohibits such | ||
289 | limitation. Some jurisdictions do not allow the exclusion or limitation of | ||
290 | incidental or consequential damages, so this exclusion and limitation may | ||
291 | not apply to You. | ||
292 | |||
293 | 8. Litigation | ||
294 | |||
295 | Any litigation relating to this License may be brought only in the courts | ||
296 | of a jurisdiction where the defendant maintains its principal place of | ||
297 | business and such litigation shall be governed by laws of that | ||
298 | jurisdiction, without reference to its conflict-of-law provisions. Nothing | ||
299 | in this Section shall prevent a party's ability to bring cross-claims or | ||
300 | counter-claims. | ||
301 | |||
302 | 9. Miscellaneous | ||
303 | |||
304 | This License represents the complete agreement concerning the subject | ||
305 | matter hereof. If any provision of this License is held to be | ||
306 | unenforceable, such provision shall be reformed only to the extent | ||
307 | necessary to make it enforceable. Any law or regulation which provides that | ||
308 | the language of a contract shall be construed against the drafter shall not | ||
309 | be used to construe this License against a Contributor. | ||
310 | |||
311 | |||
312 | 10. Versions of the License | ||
313 | |||
314 | 10.1. New Versions | ||
315 | |||
316 | Mozilla Foundation is the license steward. Except as provided in Section | ||
317 | 10.3, no one other than the license steward has the right to modify or | ||
318 | publish new versions of this License. Each version will be given a | ||
319 | distinguishing version number. | ||
320 | |||
321 | 10.2. Effect of New Versions | ||
322 | |||
323 | You may distribute the Covered Software under the terms of the version | ||
324 | of the License under which You originally received the Covered Software, | ||
325 | or under the terms of any subsequent version published by the license | ||
326 | steward. | ||
327 | |||
328 | 10.3. Modified Versions | ||
329 | |||
330 | If you create software not governed by this License, and you want to | ||
331 | create a new license for such software, you may create and use a | ||
332 | modified version of this License if you rename the license and remove | ||
333 | any references to the name of the license steward (except to note that | ||
334 | such modified license differs from this License). | ||
335 | |||
336 | 10.4. Distributing Source Code Form that is Incompatible With Secondary | ||
337 | Licenses If You choose to distribute Source Code Form that is | ||
338 | Incompatible With Secondary Licenses under the terms of this version of | ||
339 | the License, the notice described in Exhibit B of this License must be | ||
340 | attached. | ||
341 | |||
342 | Exhibit A - Source Code Form License Notice | ||
343 | |||
344 | This Source Code Form is subject to the | ||
345 | terms of the Mozilla Public License, v. | ||
346 | 2.0. If a copy of the MPL was not | ||
347 | distributed with this file, You can | ||
348 | obtain one at | ||
349 | http://mozilla.org/MPL/2.0/. | ||
350 | |||
351 | If it is not possible or desirable to put the notice in a particular file, | ||
352 | then You may include the notice in a location (such as a LICENSE file in a | ||
353 | relevant directory) where a recipient would be likely to look for such a | ||
354 | notice. | ||
355 | |||
356 | You may add additional accurate notices of copyright ownership. | ||
357 | |||
358 | Exhibit B - "Incompatible With Secondary Licenses" Notice | ||
359 | |||
360 | This Source Code Form is "Incompatible | ||
361 | With Secondary Licenses", as defined by | ||
362 | the Mozilla Public License, v. 2.0. \ No newline at end of file | ||
diff --git a/vendor/github.com/hashicorp/yamux/README.md b/vendor/github.com/hashicorp/yamux/README.md new file mode 100644 index 0000000..d4db7fc --- /dev/null +++ b/vendor/github.com/hashicorp/yamux/README.md | |||
@@ -0,0 +1,86 @@ | |||
1 | # Yamux | ||
2 | |||
3 | Yamux (Yet another Multiplexer) is a multiplexing library for Golang. | ||
4 | It relies on an underlying connection to provide reliability | ||
5 | and ordering, such as TCP or Unix domain sockets, and provides | ||
6 | stream-oriented multiplexing. It is inspired by SPDY but is not | ||
7 | interoperable with it. | ||
8 | |||
9 | Yamux features include: | ||
10 | |||
11 | * Bi-directional streams | ||
12 | * Streams can be opened by either client or server | ||
13 | * Useful for NAT traversal | ||
14 | * Server-side push support | ||
15 | * Flow control | ||
16 | * Avoid starvation | ||
17 | * Back-pressure to prevent overwhelming a receiver | ||
18 | * Keep Alives | ||
19 | * Enables persistent connections over a load balancer | ||
20 | * Efficient | ||
21 | * Enables thousands of logical streams with low overhead | ||
22 | |||
23 | ## Documentation | ||
24 | |||
25 | For complete documentation, see the associated [Godoc](http://godoc.org/github.com/hashicorp/yamux). | ||
26 | |||
27 | ## Specification | ||
28 | |||
29 | The full specification for Yamux is provided in the `spec.md` file. | ||
30 | It can be used as a guide to implementors of interoperable libraries. | ||
31 | |||
32 | ## Usage | ||
33 | |||
34 | Using Yamux is remarkably simple: | ||
35 | |||
36 | ```go | ||
37 | |||
38 | func client() { | ||
39 | // Get a TCP connection | ||
40 | conn, err := net.Dial(...) | ||
41 | if err != nil { | ||
42 | panic(err) | ||
43 | } | ||
44 | |||
45 | // Setup client side of yamux | ||
46 | session, err := yamux.Client(conn, nil) | ||
47 | if err != nil { | ||
48 | panic(err) | ||
49 | } | ||
50 | |||
51 | // Open a new stream | ||
52 | stream, err := session.Open() | ||
53 | if err != nil { | ||
54 | panic(err) | ||
55 | } | ||
56 | |||
57 | // Stream implements net.Conn | ||
58 | stream.Write([]byte("ping")) | ||
59 | } | ||
60 | |||
61 | func server() { | ||
62 | // Accept a TCP connection | ||
63 | conn, err := listener.Accept() | ||
64 | if err != nil { | ||
65 | panic(err) | ||
66 | } | ||
67 | |||
68 | // Setup server side of yamux | ||
69 | session, err := yamux.Server(conn, nil) | ||
70 | if err != nil { | ||
71 | panic(err) | ||
72 | } | ||
73 | |||
74 | // Accept a stream | ||
75 | stream, err := session.Accept() | ||
76 | if err != nil { | ||
77 | panic(err) | ||
78 | } | ||
79 | |||
80 | // Listen for a message | ||
81 | buf := make([]byte, 4) | ||
82 | stream.Read(buf) | ||
83 | } | ||
84 | |||
85 | ``` | ||
86 | |||
diff --git a/vendor/github.com/hashicorp/yamux/addr.go b/vendor/github.com/hashicorp/yamux/addr.go new file mode 100644 index 0000000..be6ebca --- /dev/null +++ b/vendor/github.com/hashicorp/yamux/addr.go | |||
@@ -0,0 +1,60 @@ | |||
1 | package yamux | ||
2 | |||
3 | import ( | ||
4 | "fmt" | ||
5 | "net" | ||
6 | ) | ||
7 | |||
8 | // hasAddr is used to get the address from the underlying connection | ||
9 | type hasAddr interface { | ||
10 | LocalAddr() net.Addr | ||
11 | RemoteAddr() net.Addr | ||
12 | } | ||
13 | |||
14 | // yamuxAddr is used when we cannot get the underlying address | ||
15 | type yamuxAddr struct { | ||
16 | Addr string | ||
17 | } | ||
18 | |||
19 | func (*yamuxAddr) Network() string { | ||
20 | return "yamux" | ||
21 | } | ||
22 | |||
23 | func (y *yamuxAddr) String() string { | ||
24 | return fmt.Sprintf("yamux:%s", y.Addr) | ||
25 | } | ||
26 | |||
27 | // Addr is used to get the address of the listener. | ||
28 | func (s *Session) Addr() net.Addr { | ||
29 | return s.LocalAddr() | ||
30 | } | ||
31 | |||
32 | // LocalAddr is used to get the local address of the | ||
33 | // underlying connection. | ||
34 | func (s *Session) LocalAddr() net.Addr { | ||
35 | addr, ok := s.conn.(hasAddr) | ||
36 | if !ok { | ||
37 | return &yamuxAddr{"local"} | ||
38 | } | ||
39 | return addr.LocalAddr() | ||
40 | } | ||
41 | |||
42 | // RemoteAddr is used to get the address of remote end | ||
43 | // of the underlying connection | ||
44 | func (s *Session) RemoteAddr() net.Addr { | ||
45 | addr, ok := s.conn.(hasAddr) | ||
46 | if !ok { | ||
47 | return &yamuxAddr{"remote"} | ||
48 | } | ||
49 | return addr.RemoteAddr() | ||
50 | } | ||
51 | |||
52 | // LocalAddr returns the local address | ||
53 | func (s *Stream) LocalAddr() net.Addr { | ||
54 | return s.session.LocalAddr() | ||
55 | } | ||
56 | |||
57 | // LocalAddr returns the remote address | ||
58 | func (s *Stream) RemoteAddr() net.Addr { | ||
59 | return s.session.RemoteAddr() | ||
60 | } | ||
diff --git a/vendor/github.com/hashicorp/yamux/const.go b/vendor/github.com/hashicorp/yamux/const.go new file mode 100644 index 0000000..4f52938 --- /dev/null +++ b/vendor/github.com/hashicorp/yamux/const.go | |||
@@ -0,0 +1,157 @@ | |||
1 | package yamux | ||
2 | |||
3 | import ( | ||
4 | "encoding/binary" | ||
5 | "fmt" | ||
6 | ) | ||
7 | |||
8 | var ( | ||
9 | // ErrInvalidVersion means we received a frame with an | ||
10 | // invalid version | ||
11 | ErrInvalidVersion = fmt.Errorf("invalid protocol version") | ||
12 | |||
13 | // ErrInvalidMsgType means we received a frame with an | ||
14 | // invalid message type | ||
15 | ErrInvalidMsgType = fmt.Errorf("invalid msg type") | ||
16 | |||
17 | // ErrSessionShutdown is used if there is a shutdown during | ||
18 | // an operation | ||
19 | ErrSessionShutdown = fmt.Errorf("session shutdown") | ||
20 | |||
21 | // ErrStreamsExhausted is returned if we have no more | ||
22 | // stream ids to issue | ||
23 | ErrStreamsExhausted = fmt.Errorf("streams exhausted") | ||
24 | |||
25 | // ErrDuplicateStream is used if a duplicate stream is | ||
26 | // opened inbound | ||
27 | ErrDuplicateStream = fmt.Errorf("duplicate stream initiated") | ||
28 | |||
29 | // ErrReceiveWindowExceeded indicates the window was exceeded | ||
30 | ErrRecvWindowExceeded = fmt.Errorf("recv window exceeded") | ||
31 | |||
32 | // ErrTimeout is used when we reach an IO deadline | ||
33 | ErrTimeout = fmt.Errorf("i/o deadline reached") | ||
34 | |||
35 | // ErrStreamClosed is returned when using a closed stream | ||
36 | ErrStreamClosed = fmt.Errorf("stream closed") | ||
37 | |||
38 | // ErrUnexpectedFlag is set when we get an unexpected flag | ||
39 | ErrUnexpectedFlag = fmt.Errorf("unexpected flag") | ||
40 | |||
41 | // ErrRemoteGoAway is used when we get a go away from the other side | ||
42 | ErrRemoteGoAway = fmt.Errorf("remote end is not accepting connections") | ||
43 | |||
44 | // ErrConnectionReset is sent if a stream is reset. This can happen | ||
45 | // if the backlog is exceeded, or if there was a remote GoAway. | ||
46 | ErrConnectionReset = fmt.Errorf("connection reset") | ||
47 | |||
48 | // ErrConnectionWriteTimeout indicates that we hit the "safety valve" | ||
49 | // timeout writing to the underlying stream connection. | ||
50 | ErrConnectionWriteTimeout = fmt.Errorf("connection write timeout") | ||
51 | |||
52 | // ErrKeepAliveTimeout is sent if a missed keepalive caused the stream close | ||
53 | ErrKeepAliveTimeout = fmt.Errorf("keepalive timeout") | ||
54 | ) | ||
55 | |||
56 | const ( | ||
57 | // protoVersion is the only version we support | ||
58 | protoVersion uint8 = 0 | ||
59 | ) | ||
60 | |||
61 | const ( | ||
62 | // Data is used for data frames. They are followed | ||
63 | // by length bytes worth of payload. | ||
64 | typeData uint8 = iota | ||
65 | |||
66 | // WindowUpdate is used to change the window of | ||
67 | // a given stream. The length indicates the delta | ||
68 | // update to the window. | ||
69 | typeWindowUpdate | ||
70 | |||
71 | // Ping is sent as a keep-alive or to measure | ||
72 | // the RTT. The StreamID and Length value are echoed | ||
73 | // back in the response. | ||
74 | typePing | ||
75 | |||
76 | // GoAway is sent to terminate a session. The StreamID | ||
77 | // should be 0 and the length is an error code. | ||
78 | typeGoAway | ||
79 | ) | ||
80 | |||
81 | const ( | ||
82 | // SYN is sent to signal a new stream. May | ||
83 | // be sent with a data payload | ||
84 | flagSYN uint16 = 1 << iota | ||
85 | |||
86 | // ACK is sent to acknowledge a new stream. May | ||
87 | // be sent with a data payload | ||
88 | flagACK | ||
89 | |||
90 | // FIN is sent to half-close the given stream. | ||
91 | // May be sent with a data payload. | ||
92 | flagFIN | ||
93 | |||
94 | // RST is used to hard close a given stream. | ||
95 | flagRST | ||
96 | ) | ||
97 | |||
98 | const ( | ||
99 | // initialStreamWindow is the initial stream window size | ||
100 | initialStreamWindow uint32 = 256 * 1024 | ||
101 | ) | ||
102 | |||
103 | const ( | ||
104 | // goAwayNormal is sent on a normal termination | ||
105 | goAwayNormal uint32 = iota | ||
106 | |||
107 | // goAwayProtoErr sent on a protocol error | ||
108 | goAwayProtoErr | ||
109 | |||
110 | // goAwayInternalErr sent on an internal error | ||
111 | goAwayInternalErr | ||
112 | ) | ||
113 | |||
114 | const ( | ||
115 | sizeOfVersion = 1 | ||
116 | sizeOfType = 1 | ||
117 | sizeOfFlags = 2 | ||
118 | sizeOfStreamID = 4 | ||
119 | sizeOfLength = 4 | ||
120 | headerSize = sizeOfVersion + sizeOfType + sizeOfFlags + | ||
121 | sizeOfStreamID + sizeOfLength | ||
122 | ) | ||
123 | |||
124 | type header []byte | ||
125 | |||
126 | func (h header) Version() uint8 { | ||
127 | return h[0] | ||
128 | } | ||
129 | |||
130 | func (h header) MsgType() uint8 { | ||
131 | return h[1] | ||
132 | } | ||
133 | |||
134 | func (h header) Flags() uint16 { | ||
135 | return binary.BigEndian.Uint16(h[2:4]) | ||
136 | } | ||
137 | |||
138 | func (h header) StreamID() uint32 { | ||
139 | return binary.BigEndian.Uint32(h[4:8]) | ||
140 | } | ||
141 | |||
142 | func (h header) Length() uint32 { | ||
143 | return binary.BigEndian.Uint32(h[8:12]) | ||
144 | } | ||
145 | |||
146 | func (h header) String() string { | ||
147 | return fmt.Sprintf("Vsn:%d Type:%d Flags:%d StreamID:%d Length:%d", | ||
148 | h.Version(), h.MsgType(), h.Flags(), h.StreamID(), h.Length()) | ||
149 | } | ||
150 | |||
151 | func (h header) encode(msgType uint8, flags uint16, streamID uint32, length uint32) { | ||
152 | h[0] = protoVersion | ||
153 | h[1] = msgType | ||
154 | binary.BigEndian.PutUint16(h[2:4], flags) | ||
155 | binary.BigEndian.PutUint32(h[4:8], streamID) | ||
156 | binary.BigEndian.PutUint32(h[8:12], length) | ||
157 | } | ||
diff --git a/vendor/github.com/hashicorp/yamux/mux.go b/vendor/github.com/hashicorp/yamux/mux.go new file mode 100644 index 0000000..7abc7c7 --- /dev/null +++ b/vendor/github.com/hashicorp/yamux/mux.go | |||
@@ -0,0 +1,87 @@ | |||
1 | package yamux | ||
2 | |||
3 | import ( | ||
4 | "fmt" | ||
5 | "io" | ||
6 | "os" | ||
7 | "time" | ||
8 | ) | ||
9 | |||
10 | // Config is used to tune the Yamux session | ||
11 | type Config struct { | ||
12 | // AcceptBacklog is used to limit how many streams may be | ||
13 | // waiting an accept. | ||
14 | AcceptBacklog int | ||
15 | |||
16 | // EnableKeepalive is used to do a period keep alive | ||
17 | // messages using a ping. | ||
18 | EnableKeepAlive bool | ||
19 | |||
20 | // KeepAliveInterval is how often to perform the keep alive | ||
21 | KeepAliveInterval time.Duration | ||
22 | |||
23 | // ConnectionWriteTimeout is meant to be a "safety valve" timeout after | ||
24 | // we which will suspect a problem with the underlying connection and | ||
25 | // close it. This is only applied to writes, where's there's generally | ||
26 | // an expectation that things will move along quickly. | ||
27 | ConnectionWriteTimeout time.Duration | ||
28 | |||
29 | // MaxStreamWindowSize is used to control the maximum | ||
30 | // window size that we allow for a stream. | ||
31 | MaxStreamWindowSize uint32 | ||
32 | |||
33 | // LogOutput is used to control the log destination | ||
34 | LogOutput io.Writer | ||
35 | } | ||
36 | |||
37 | // DefaultConfig is used to return a default configuration | ||
38 | func DefaultConfig() *Config { | ||
39 | return &Config{ | ||
40 | AcceptBacklog: 256, | ||
41 | EnableKeepAlive: true, | ||
42 | KeepAliveInterval: 30 * time.Second, | ||
43 | ConnectionWriteTimeout: 10 * time.Second, | ||
44 | MaxStreamWindowSize: initialStreamWindow, | ||
45 | LogOutput: os.Stderr, | ||
46 | } | ||
47 | } | ||
48 | |||
49 | // VerifyConfig is used to verify the sanity of configuration | ||
50 | func VerifyConfig(config *Config) error { | ||
51 | if config.AcceptBacklog <= 0 { | ||
52 | return fmt.Errorf("backlog must be positive") | ||
53 | } | ||
54 | if config.KeepAliveInterval == 0 { | ||
55 | return fmt.Errorf("keep-alive interval must be positive") | ||
56 | } | ||
57 | if config.MaxStreamWindowSize < initialStreamWindow { | ||
58 | return fmt.Errorf("MaxStreamWindowSize must be larger than %d", initialStreamWindow) | ||
59 | } | ||
60 | return nil | ||
61 | } | ||
62 | |||
63 | // Server is used to initialize a new server-side connection. | ||
64 | // There must be at most one server-side connection. If a nil config is | ||
65 | // provided, the DefaultConfiguration will be used. | ||
66 | func Server(conn io.ReadWriteCloser, config *Config) (*Session, error) { | ||
67 | if config == nil { | ||
68 | config = DefaultConfig() | ||
69 | } | ||
70 | if err := VerifyConfig(config); err != nil { | ||
71 | return nil, err | ||
72 | } | ||
73 | return newSession(config, conn, false), nil | ||
74 | } | ||
75 | |||
76 | // Client is used to initialize a new client-side connection. | ||
77 | // There must be at most one client-side connection. | ||
78 | func Client(conn io.ReadWriteCloser, config *Config) (*Session, error) { | ||
79 | if config == nil { | ||
80 | config = DefaultConfig() | ||
81 | } | ||
82 | |||
83 | if err := VerifyConfig(config); err != nil { | ||
84 | return nil, err | ||
85 | } | ||
86 | return newSession(config, conn, true), nil | ||
87 | } | ||
diff --git a/vendor/github.com/hashicorp/yamux/session.go b/vendor/github.com/hashicorp/yamux/session.go new file mode 100644 index 0000000..e179818 --- /dev/null +++ b/vendor/github.com/hashicorp/yamux/session.go | |||
@@ -0,0 +1,623 @@ | |||
1 | package yamux | ||
2 | |||
3 | import ( | ||
4 | "bufio" | ||
5 | "fmt" | ||
6 | "io" | ||
7 | "io/ioutil" | ||
8 | "log" | ||
9 | "math" | ||
10 | "net" | ||
11 | "strings" | ||
12 | "sync" | ||
13 | "sync/atomic" | ||
14 | "time" | ||
15 | ) | ||
16 | |||
17 | // Session is used to wrap a reliable ordered connection and to | ||
18 | // multiplex it into multiple streams. | ||
19 | type Session struct { | ||
20 | // remoteGoAway indicates the remote side does | ||
21 | // not want futher connections. Must be first for alignment. | ||
22 | remoteGoAway int32 | ||
23 | |||
24 | // localGoAway indicates that we should stop | ||
25 | // accepting futher connections. Must be first for alignment. | ||
26 | localGoAway int32 | ||
27 | |||
28 | // nextStreamID is the next stream we should | ||
29 | // send. This depends if we are a client/server. | ||
30 | nextStreamID uint32 | ||
31 | |||
32 | // config holds our configuration | ||
33 | config *Config | ||
34 | |||
35 | // logger is used for our logs | ||
36 | logger *log.Logger | ||
37 | |||
38 | // conn is the underlying connection | ||
39 | conn io.ReadWriteCloser | ||
40 | |||
41 | // bufRead is a buffered reader | ||
42 | bufRead *bufio.Reader | ||
43 | |||
44 | // pings is used to track inflight pings | ||
45 | pings map[uint32]chan struct{} | ||
46 | pingID uint32 | ||
47 | pingLock sync.Mutex | ||
48 | |||
49 | // streams maps a stream id to a stream, and inflight has an entry | ||
50 | // for any outgoing stream that has not yet been established. Both are | ||
51 | // protected by streamLock. | ||
52 | streams map[uint32]*Stream | ||
53 | inflight map[uint32]struct{} | ||
54 | streamLock sync.Mutex | ||
55 | |||
56 | // synCh acts like a semaphore. It is sized to the AcceptBacklog which | ||
57 | // is assumed to be symmetric between the client and server. This allows | ||
58 | // the client to avoid exceeding the backlog and instead blocks the open. | ||
59 | synCh chan struct{} | ||
60 | |||
61 | // acceptCh is used to pass ready streams to the client | ||
62 | acceptCh chan *Stream | ||
63 | |||
64 | // sendCh is used to mark a stream as ready to send, | ||
65 | // or to send a header out directly. | ||
66 | sendCh chan sendReady | ||
67 | |||
68 | // recvDoneCh is closed when recv() exits to avoid a race | ||
69 | // between stream registration and stream shutdown | ||
70 | recvDoneCh chan struct{} | ||
71 | |||
72 | // shutdown is used to safely close a session | ||
73 | shutdown bool | ||
74 | shutdownErr error | ||
75 | shutdownCh chan struct{} | ||
76 | shutdownLock sync.Mutex | ||
77 | } | ||
78 | |||
79 | // sendReady is used to either mark a stream as ready | ||
80 | // or to directly send a header | ||
81 | type sendReady struct { | ||
82 | Hdr []byte | ||
83 | Body io.Reader | ||
84 | Err chan error | ||
85 | } | ||
86 | |||
87 | // newSession is used to construct a new session | ||
88 | func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session { | ||
89 | s := &Session{ | ||
90 | config: config, | ||
91 | logger: log.New(config.LogOutput, "", log.LstdFlags), | ||
92 | conn: conn, | ||
93 | bufRead: bufio.NewReader(conn), | ||
94 | pings: make(map[uint32]chan struct{}), | ||
95 | streams: make(map[uint32]*Stream), | ||
96 | inflight: make(map[uint32]struct{}), | ||
97 | synCh: make(chan struct{}, config.AcceptBacklog), | ||
98 | acceptCh: make(chan *Stream, config.AcceptBacklog), | ||
99 | sendCh: make(chan sendReady, 64), | ||
100 | recvDoneCh: make(chan struct{}), | ||
101 | shutdownCh: make(chan struct{}), | ||
102 | } | ||
103 | if client { | ||
104 | s.nextStreamID = 1 | ||
105 | } else { | ||
106 | s.nextStreamID = 2 | ||
107 | } | ||
108 | go s.recv() | ||
109 | go s.send() | ||
110 | if config.EnableKeepAlive { | ||
111 | go s.keepalive() | ||
112 | } | ||
113 | return s | ||
114 | } | ||
115 | |||
116 | // IsClosed does a safe check to see if we have shutdown | ||
117 | func (s *Session) IsClosed() bool { | ||
118 | select { | ||
119 | case <-s.shutdownCh: | ||
120 | return true | ||
121 | default: | ||
122 | return false | ||
123 | } | ||
124 | } | ||
125 | |||
126 | // NumStreams returns the number of currently open streams | ||
127 | func (s *Session) NumStreams() int { | ||
128 | s.streamLock.Lock() | ||
129 | num := len(s.streams) | ||
130 | s.streamLock.Unlock() | ||
131 | return num | ||
132 | } | ||
133 | |||
134 | // Open is used to create a new stream as a net.Conn | ||
135 | func (s *Session) Open() (net.Conn, error) { | ||
136 | conn, err := s.OpenStream() | ||
137 | if err != nil { | ||
138 | return nil, err | ||
139 | } | ||
140 | return conn, nil | ||
141 | } | ||
142 | |||
143 | // OpenStream is used to create a new stream | ||
144 | func (s *Session) OpenStream() (*Stream, error) { | ||
145 | if s.IsClosed() { | ||
146 | return nil, ErrSessionShutdown | ||
147 | } | ||
148 | if atomic.LoadInt32(&s.remoteGoAway) == 1 { | ||
149 | return nil, ErrRemoteGoAway | ||
150 | } | ||
151 | |||
152 | // Block if we have too many inflight SYNs | ||
153 | select { | ||
154 | case s.synCh <- struct{}{}: | ||
155 | case <-s.shutdownCh: | ||
156 | return nil, ErrSessionShutdown | ||
157 | } | ||
158 | |||
159 | GET_ID: | ||
160 | // Get an ID, and check for stream exhaustion | ||
161 | id := atomic.LoadUint32(&s.nextStreamID) | ||
162 | if id >= math.MaxUint32-1 { | ||
163 | return nil, ErrStreamsExhausted | ||
164 | } | ||
165 | if !atomic.CompareAndSwapUint32(&s.nextStreamID, id, id+2) { | ||
166 | goto GET_ID | ||
167 | } | ||
168 | |||
169 | // Register the stream | ||
170 | stream := newStream(s, id, streamInit) | ||
171 | s.streamLock.Lock() | ||
172 | s.streams[id] = stream | ||
173 | s.inflight[id] = struct{}{} | ||
174 | s.streamLock.Unlock() | ||
175 | |||
176 | // Send the window update to create | ||
177 | if err := stream.sendWindowUpdate(); err != nil { | ||
178 | select { | ||
179 | case <-s.synCh: | ||
180 | default: | ||
181 | s.logger.Printf("[ERR] yamux: aborted stream open without inflight syn semaphore") | ||
182 | } | ||
183 | return nil, err | ||
184 | } | ||
185 | return stream, nil | ||
186 | } | ||
187 | |||
188 | // Accept is used to block until the next available stream | ||
189 | // is ready to be accepted. | ||
190 | func (s *Session) Accept() (net.Conn, error) { | ||
191 | conn, err := s.AcceptStream() | ||
192 | if err != nil { | ||
193 | return nil, err | ||
194 | } | ||
195 | return conn, err | ||
196 | } | ||
197 | |||
198 | // AcceptStream is used to block until the next available stream | ||
199 | // is ready to be accepted. | ||
200 | func (s *Session) AcceptStream() (*Stream, error) { | ||
201 | select { | ||
202 | case stream := <-s.acceptCh: | ||
203 | if err := stream.sendWindowUpdate(); err != nil { | ||
204 | return nil, err | ||
205 | } | ||
206 | return stream, nil | ||
207 | case <-s.shutdownCh: | ||
208 | return nil, s.shutdownErr | ||
209 | } | ||
210 | } | ||
211 | |||
212 | // Close is used to close the session and all streams. | ||
213 | // Attempts to send a GoAway before closing the connection. | ||
214 | func (s *Session) Close() error { | ||
215 | s.shutdownLock.Lock() | ||
216 | defer s.shutdownLock.Unlock() | ||
217 | |||
218 | if s.shutdown { | ||
219 | return nil | ||
220 | } | ||
221 | s.shutdown = true | ||
222 | if s.shutdownErr == nil { | ||
223 | s.shutdownErr = ErrSessionShutdown | ||
224 | } | ||
225 | close(s.shutdownCh) | ||
226 | s.conn.Close() | ||
227 | <-s.recvDoneCh | ||
228 | |||
229 | s.streamLock.Lock() | ||
230 | defer s.streamLock.Unlock() | ||
231 | for _, stream := range s.streams { | ||
232 | stream.forceClose() | ||
233 | } | ||
234 | return nil | ||
235 | } | ||
236 | |||
237 | // exitErr is used to handle an error that is causing the | ||
238 | // session to terminate. | ||
239 | func (s *Session) exitErr(err error) { | ||
240 | s.shutdownLock.Lock() | ||
241 | if s.shutdownErr == nil { | ||
242 | s.shutdownErr = err | ||
243 | } | ||
244 | s.shutdownLock.Unlock() | ||
245 | s.Close() | ||
246 | } | ||
247 | |||
248 | // GoAway can be used to prevent accepting further | ||
249 | // connections. It does not close the underlying conn. | ||
250 | func (s *Session) GoAway() error { | ||
251 | return s.waitForSend(s.goAway(goAwayNormal), nil) | ||
252 | } | ||
253 | |||
254 | // goAway is used to send a goAway message | ||
255 | func (s *Session) goAway(reason uint32) header { | ||
256 | atomic.SwapInt32(&s.localGoAway, 1) | ||
257 | hdr := header(make([]byte, headerSize)) | ||
258 | hdr.encode(typeGoAway, 0, 0, reason) | ||
259 | return hdr | ||
260 | } | ||
261 | |||
262 | // Ping is used to measure the RTT response time | ||
263 | func (s *Session) Ping() (time.Duration, error) { | ||
264 | // Get a channel for the ping | ||
265 | ch := make(chan struct{}) | ||
266 | |||
267 | // Get a new ping id, mark as pending | ||
268 | s.pingLock.Lock() | ||
269 | id := s.pingID | ||
270 | s.pingID++ | ||
271 | s.pings[id] = ch | ||
272 | s.pingLock.Unlock() | ||
273 | |||
274 | // Send the ping request | ||
275 | hdr := header(make([]byte, headerSize)) | ||
276 | hdr.encode(typePing, flagSYN, 0, id) | ||
277 | if err := s.waitForSend(hdr, nil); err != nil { | ||
278 | return 0, err | ||
279 | } | ||
280 | |||
281 | // Wait for a response | ||
282 | start := time.Now() | ||
283 | select { | ||
284 | case <-ch: | ||
285 | case <-time.After(s.config.ConnectionWriteTimeout): | ||
286 | s.pingLock.Lock() | ||
287 | delete(s.pings, id) // Ignore it if a response comes later. | ||
288 | s.pingLock.Unlock() | ||
289 | return 0, ErrTimeout | ||
290 | case <-s.shutdownCh: | ||
291 | return 0, ErrSessionShutdown | ||
292 | } | ||
293 | |||
294 | // Compute the RTT | ||
295 | return time.Now().Sub(start), nil | ||
296 | } | ||
297 | |||
298 | // keepalive is a long running goroutine that periodically does | ||
299 | // a ping to keep the connection alive. | ||
300 | func (s *Session) keepalive() { | ||
301 | for { | ||
302 | select { | ||
303 | case <-time.After(s.config.KeepAliveInterval): | ||
304 | _, err := s.Ping() | ||
305 | if err != nil { | ||
306 | s.logger.Printf("[ERR] yamux: keepalive failed: %v", err) | ||
307 | s.exitErr(ErrKeepAliveTimeout) | ||
308 | return | ||
309 | } | ||
310 | case <-s.shutdownCh: | ||
311 | return | ||
312 | } | ||
313 | } | ||
314 | } | ||
315 | |||
316 | // waitForSendErr waits to send a header, checking for a potential shutdown | ||
317 | func (s *Session) waitForSend(hdr header, body io.Reader) error { | ||
318 | errCh := make(chan error, 1) | ||
319 | return s.waitForSendErr(hdr, body, errCh) | ||
320 | } | ||
321 | |||
322 | // waitForSendErr waits to send a header with optional data, checking for a | ||
323 | // potential shutdown. Since there's the expectation that sends can happen | ||
324 | // in a timely manner, we enforce the connection write timeout here. | ||
325 | func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error { | ||
326 | timer := time.NewTimer(s.config.ConnectionWriteTimeout) | ||
327 | defer timer.Stop() | ||
328 | |||
329 | ready := sendReady{Hdr: hdr, Body: body, Err: errCh} | ||
330 | select { | ||
331 | case s.sendCh <- ready: | ||
332 | case <-s.shutdownCh: | ||
333 | return ErrSessionShutdown | ||
334 | case <-timer.C: | ||
335 | return ErrConnectionWriteTimeout | ||
336 | } | ||
337 | |||
338 | select { | ||
339 | case err := <-errCh: | ||
340 | return err | ||
341 | case <-s.shutdownCh: | ||
342 | return ErrSessionShutdown | ||
343 | case <-timer.C: | ||
344 | return ErrConnectionWriteTimeout | ||
345 | } | ||
346 | } | ||
347 | |||
348 | // sendNoWait does a send without waiting. Since there's the expectation that | ||
349 | // the send happens right here, we enforce the connection write timeout if we | ||
350 | // can't queue the header to be sent. | ||
351 | func (s *Session) sendNoWait(hdr header) error { | ||
352 | timer := time.NewTimer(s.config.ConnectionWriteTimeout) | ||
353 | defer timer.Stop() | ||
354 | |||
355 | select { | ||
356 | case s.sendCh <- sendReady{Hdr: hdr}: | ||
357 | return nil | ||
358 | case <-s.shutdownCh: | ||
359 | return ErrSessionShutdown | ||
360 | case <-timer.C: | ||
361 | return ErrConnectionWriteTimeout | ||
362 | } | ||
363 | } | ||
364 | |||
365 | // send is a long running goroutine that sends data | ||
366 | func (s *Session) send() { | ||
367 | for { | ||
368 | select { | ||
369 | case ready := <-s.sendCh: | ||
370 | // Send a header if ready | ||
371 | if ready.Hdr != nil { | ||
372 | sent := 0 | ||
373 | for sent < len(ready.Hdr) { | ||
374 | n, err := s.conn.Write(ready.Hdr[sent:]) | ||
375 | if err != nil { | ||
376 | s.logger.Printf("[ERR] yamux: Failed to write header: %v", err) | ||
377 | asyncSendErr(ready.Err, err) | ||
378 | s.exitErr(err) | ||
379 | return | ||
380 | } | ||
381 | sent += n | ||
382 | } | ||
383 | } | ||
384 | |||
385 | // Send data from a body if given | ||
386 | if ready.Body != nil { | ||
387 | _, err := io.Copy(s.conn, ready.Body) | ||
388 | if err != nil { | ||
389 | s.logger.Printf("[ERR] yamux: Failed to write body: %v", err) | ||
390 | asyncSendErr(ready.Err, err) | ||
391 | s.exitErr(err) | ||
392 | return | ||
393 | } | ||
394 | } | ||
395 | |||
396 | // No error, successful send | ||
397 | asyncSendErr(ready.Err, nil) | ||
398 | case <-s.shutdownCh: | ||
399 | return | ||
400 | } | ||
401 | } | ||
402 | } | ||
403 | |||
404 | // recv is a long running goroutine that accepts new data | ||
405 | func (s *Session) recv() { | ||
406 | if err := s.recvLoop(); err != nil { | ||
407 | s.exitErr(err) | ||
408 | } | ||
409 | } | ||
410 | |||
411 | // recvLoop continues to receive data until a fatal error is encountered | ||
412 | func (s *Session) recvLoop() error { | ||
413 | defer close(s.recvDoneCh) | ||
414 | hdr := header(make([]byte, headerSize)) | ||
415 | var handler func(header) error | ||
416 | for { | ||
417 | // Read the header | ||
418 | if _, err := io.ReadFull(s.bufRead, hdr); err != nil { | ||
419 | if err != io.EOF && !strings.Contains(err.Error(), "closed") && !strings.Contains(err.Error(), "reset by peer") { | ||
420 | s.logger.Printf("[ERR] yamux: Failed to read header: %v", err) | ||
421 | } | ||
422 | return err | ||
423 | } | ||
424 | |||
425 | // Verify the version | ||
426 | if hdr.Version() != protoVersion { | ||
427 | s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version()) | ||
428 | return ErrInvalidVersion | ||
429 | } | ||
430 | |||
431 | // Switch on the type | ||
432 | switch hdr.MsgType() { | ||
433 | case typeData: | ||
434 | handler = s.handleStreamMessage | ||
435 | case typeWindowUpdate: | ||
436 | handler = s.handleStreamMessage | ||
437 | case typeGoAway: | ||
438 | handler = s.handleGoAway | ||
439 | case typePing: | ||
440 | handler = s.handlePing | ||
441 | default: | ||
442 | return ErrInvalidMsgType | ||
443 | } | ||
444 | |||
445 | // Invoke the handler | ||
446 | if err := handler(hdr); err != nil { | ||
447 | return err | ||
448 | } | ||
449 | } | ||
450 | } | ||
451 | |||
452 | // handleStreamMessage handles either a data or window update frame | ||
453 | func (s *Session) handleStreamMessage(hdr header) error { | ||
454 | // Check for a new stream creation | ||
455 | id := hdr.StreamID() | ||
456 | flags := hdr.Flags() | ||
457 | if flags&flagSYN == flagSYN { | ||
458 | if err := s.incomingStream(id); err != nil { | ||
459 | return err | ||
460 | } | ||
461 | } | ||
462 | |||
463 | // Get the stream | ||
464 | s.streamLock.Lock() | ||
465 | stream := s.streams[id] | ||
466 | s.streamLock.Unlock() | ||
467 | |||
468 | // If we do not have a stream, likely we sent a RST | ||
469 | if stream == nil { | ||
470 | // Drain any data on the wire | ||
471 | if hdr.MsgType() == typeData && hdr.Length() > 0 { | ||
472 | s.logger.Printf("[WARN] yamux: Discarding data for stream: %d", id) | ||
473 | if _, err := io.CopyN(ioutil.Discard, s.bufRead, int64(hdr.Length())); err != nil { | ||
474 | s.logger.Printf("[ERR] yamux: Failed to discard data: %v", err) | ||
475 | return nil | ||
476 | } | ||
477 | } else { | ||
478 | s.logger.Printf("[WARN] yamux: frame for missing stream: %v", hdr) | ||
479 | } | ||
480 | return nil | ||
481 | } | ||
482 | |||
483 | // Check if this is a window update | ||
484 | if hdr.MsgType() == typeWindowUpdate { | ||
485 | if err := stream.incrSendWindow(hdr, flags); err != nil { | ||
486 | if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { | ||
487 | s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) | ||
488 | } | ||
489 | return err | ||
490 | } | ||
491 | return nil | ||
492 | } | ||
493 | |||
494 | // Read the new data | ||
495 | if err := stream.readData(hdr, flags, s.bufRead); err != nil { | ||
496 | if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { | ||
497 | s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) | ||
498 | } | ||
499 | return err | ||
500 | } | ||
501 | return nil | ||
502 | } | ||
503 | |||
504 | // handlePing is invokde for a typePing frame | ||
505 | func (s *Session) handlePing(hdr header) error { | ||
506 | flags := hdr.Flags() | ||
507 | pingID := hdr.Length() | ||
508 | |||
509 | // Check if this is a query, respond back in a separate context so we | ||
510 | // don't interfere with the receiving thread blocking for the write. | ||
511 | if flags&flagSYN == flagSYN { | ||
512 | go func() { | ||
513 | hdr := header(make([]byte, headerSize)) | ||
514 | hdr.encode(typePing, flagACK, 0, pingID) | ||
515 | if err := s.sendNoWait(hdr); err != nil { | ||
516 | s.logger.Printf("[WARN] yamux: failed to send ping reply: %v", err) | ||
517 | } | ||
518 | }() | ||
519 | return nil | ||
520 | } | ||
521 | |||
522 | // Handle a response | ||
523 | s.pingLock.Lock() | ||
524 | ch := s.pings[pingID] | ||
525 | if ch != nil { | ||
526 | delete(s.pings, pingID) | ||
527 | close(ch) | ||
528 | } | ||
529 | s.pingLock.Unlock() | ||
530 | return nil | ||
531 | } | ||
532 | |||
533 | // handleGoAway is invokde for a typeGoAway frame | ||
534 | func (s *Session) handleGoAway(hdr header) error { | ||
535 | code := hdr.Length() | ||
536 | switch code { | ||
537 | case goAwayNormal: | ||
538 | atomic.SwapInt32(&s.remoteGoAway, 1) | ||
539 | case goAwayProtoErr: | ||
540 | s.logger.Printf("[ERR] yamux: received protocol error go away") | ||
541 | return fmt.Errorf("yamux protocol error") | ||
542 | case goAwayInternalErr: | ||
543 | s.logger.Printf("[ERR] yamux: received internal error go away") | ||
544 | return fmt.Errorf("remote yamux internal error") | ||
545 | default: | ||
546 | s.logger.Printf("[ERR] yamux: received unexpected go away") | ||
547 | return fmt.Errorf("unexpected go away received") | ||
548 | } | ||
549 | return nil | ||
550 | } | ||
551 | |||
552 | // incomingStream is used to create a new incoming stream | ||
553 | func (s *Session) incomingStream(id uint32) error { | ||
554 | // Reject immediately if we are doing a go away | ||
555 | if atomic.LoadInt32(&s.localGoAway) == 1 { | ||
556 | hdr := header(make([]byte, headerSize)) | ||
557 | hdr.encode(typeWindowUpdate, flagRST, id, 0) | ||
558 | return s.sendNoWait(hdr) | ||
559 | } | ||
560 | |||
561 | // Allocate a new stream | ||
562 | stream := newStream(s, id, streamSYNReceived) | ||
563 | |||
564 | s.streamLock.Lock() | ||
565 | defer s.streamLock.Unlock() | ||
566 | |||
567 | // Check if stream already exists | ||
568 | if _, ok := s.streams[id]; ok { | ||
569 | s.logger.Printf("[ERR] yamux: duplicate stream declared") | ||
570 | if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { | ||
571 | s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) | ||
572 | } | ||
573 | return ErrDuplicateStream | ||
574 | } | ||
575 | |||
576 | // Register the stream | ||
577 | s.streams[id] = stream | ||
578 | |||
579 | // Check if we've exceeded the backlog | ||
580 | select { | ||
581 | case s.acceptCh <- stream: | ||
582 | return nil | ||
583 | default: | ||
584 | // Backlog exceeded! RST the stream | ||
585 | s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset") | ||
586 | delete(s.streams, id) | ||
587 | stream.sendHdr.encode(typeWindowUpdate, flagRST, id, 0) | ||
588 | return s.sendNoWait(stream.sendHdr) | ||
589 | } | ||
590 | } | ||
591 | |||
592 | // closeStream is used to close a stream once both sides have | ||
593 | // issued a close. If there was an in-flight SYN and the stream | ||
594 | // was not yet established, then this will give the credit back. | ||
595 | func (s *Session) closeStream(id uint32) { | ||
596 | s.streamLock.Lock() | ||
597 | if _, ok := s.inflight[id]; ok { | ||
598 | select { | ||
599 | case <-s.synCh: | ||
600 | default: | ||
601 | s.logger.Printf("[ERR] yamux: SYN tracking out of sync") | ||
602 | } | ||
603 | } | ||
604 | delete(s.streams, id) | ||
605 | s.streamLock.Unlock() | ||
606 | } | ||
607 | |||
608 | // establishStream is used to mark a stream that was in the | ||
609 | // SYN Sent state as established. | ||
610 | func (s *Session) establishStream(id uint32) { | ||
611 | s.streamLock.Lock() | ||
612 | if _, ok := s.inflight[id]; ok { | ||
613 | delete(s.inflight, id) | ||
614 | } else { | ||
615 | s.logger.Printf("[ERR] yamux: established stream without inflight SYN (no tracking entry)") | ||
616 | } | ||
617 | select { | ||
618 | case <-s.synCh: | ||
619 | default: | ||
620 | s.logger.Printf("[ERR] yamux: established stream without inflight SYN (didn't have semaphore)") | ||
621 | } | ||
622 | s.streamLock.Unlock() | ||
623 | } | ||
diff --git a/vendor/github.com/hashicorp/yamux/spec.md b/vendor/github.com/hashicorp/yamux/spec.md new file mode 100644 index 0000000..183d797 --- /dev/null +++ b/vendor/github.com/hashicorp/yamux/spec.md | |||
@@ -0,0 +1,140 @@ | |||
1 | # Specification | ||
2 | |||
3 | We use this document to detail the internal specification of Yamux. | ||
4 | This is used both as a guide for implementing Yamux, but also for | ||
5 | alternative interoperable libraries to be built. | ||
6 | |||
7 | # Framing | ||
8 | |||
9 | Yamux uses a streaming connection underneath, but imposes a message | ||
10 | framing so that it can be shared between many logical streams. Each | ||
11 | frame contains a header like: | ||
12 | |||
13 | * Version (8 bits) | ||
14 | * Type (8 bits) | ||
15 | * Flags (16 bits) | ||
16 | * StreamID (32 bits) | ||
17 | * Length (32 bits) | ||
18 | |||
19 | This means that each header has a 12 byte overhead. | ||
20 | All fields are encoded in network order (big endian). | ||
21 | Each field is described below: | ||
22 | |||
23 | ## Version Field | ||
24 | |||
25 | The version field is used for future backward compatibility. At the | ||
26 | current time, the field is always set to 0, to indicate the initial | ||
27 | version. | ||
28 | |||
29 | ## Type Field | ||
30 | |||
31 | The type field is used to switch the frame message type. The following | ||
32 | message types are supported: | ||
33 | |||
34 | * 0x0 Data - Used to transmit data. May transmit zero length payloads | ||
35 | depending on the flags. | ||
36 | |||
37 | * 0x1 Window Update - Used to updated the senders receive window size. | ||
38 | This is used to implement per-session flow control. | ||
39 | |||
40 | * 0x2 Ping - Used to measure RTT. It can also be used to heart-beat | ||
41 | and do keep-alives over TCP. | ||
42 | |||
43 | * 0x3 Go Away - Used to close a session. | ||
44 | |||
45 | ## Flag Field | ||
46 | |||
47 | The flags field is used to provide additional information related | ||
48 | to the message type. The following flags are supported: | ||
49 | |||
50 | * 0x1 SYN - Signals the start of a new stream. May be sent with a data or | ||
51 | window update message. Also sent with a ping to indicate outbound. | ||
52 | |||
53 | * 0x2 ACK - Acknowledges the start of a new stream. May be sent with a data | ||
54 | or window update message. Also sent with a ping to indicate response. | ||
55 | |||
56 | * 0x4 FIN - Performs a half-close of a stream. May be sent with a data | ||
57 | message or window update. | ||
58 | |||
59 | * 0x8 RST - Reset a stream immediately. May be sent with a data or | ||
60 | window update message. | ||
61 | |||
62 | ## StreamID Field | ||
63 | |||
64 | The StreamID field is used to identify the logical stream the frame | ||
65 | is addressing. The client side should use odd ID's, and the server even. | ||
66 | This prevents any collisions. Additionally, the 0 ID is reserved to represent | ||
67 | the session. | ||
68 | |||
69 | Both Ping and Go Away messages should always use the 0 StreamID. | ||
70 | |||
71 | ## Length Field | ||
72 | |||
73 | The meaning of the length field depends on the message type: | ||
74 | |||
75 | * Data - provides the length of bytes following the header | ||
76 | * Window update - provides a delta update to the window size | ||
77 | * Ping - Contains an opaque value, echoed back | ||
78 | * Go Away - Contains an error code | ||
79 | |||
80 | # Message Flow | ||
81 | |||
82 | There is no explicit connection setup, as Yamux relies on an underlying | ||
83 | transport to be provided. However, there is a distinction between client | ||
84 | and server side of the connection. | ||
85 | |||
86 | ## Opening a stream | ||
87 | |||
88 | To open a stream, an initial data or window update frame is sent | ||
89 | with a new StreamID. The SYN flag should be set to signal a new stream. | ||
90 | |||
91 | The receiver must then reply with either a data or window update frame | ||
92 | with the StreamID along with the ACK flag to accept the stream or with | ||
93 | the RST flag to reject the stream. | ||
94 | |||
95 | Because we are relying on the reliable stream underneath, a connection | ||
96 | can begin sending data once the SYN flag is sent. The corresponding | ||
97 | ACK does not need to be received. This is particularly well suited | ||
98 | for an RPC system where a client wants to open a stream and immediately | ||
99 | fire a request without waiting for the RTT of the ACK. | ||
100 | |||
101 | This does introduce the possibility of a connection being rejected | ||
102 | after data has been sent already. This is a slight semantic difference | ||
103 | from TCP, where the conection cannot be refused after it is opened. | ||
104 | Clients should be prepared to handle this by checking for an error | ||
105 | that indicates a RST was received. | ||
106 | |||
107 | ## Closing a stream | ||
108 | |||
109 | To close a stream, either side sends a data or window update frame | ||
110 | along with the FIN flag. This does a half-close indicating the sender | ||
111 | will send no further data. | ||
112 | |||
113 | Once both sides have closed the connection, the stream is closed. | ||
114 | |||
115 | Alternatively, if an error occurs, the RST flag can be used to | ||
116 | hard close a stream immediately. | ||
117 | |||
118 | ## Flow Control | ||
119 | |||
120 | When Yamux is initially starts each stream with a 256KB window size. | ||
121 | There is no window size for the session. | ||
122 | |||
123 | To prevent the streams from stalling, window update frames should be | ||
124 | sent regularly. Yamux can be configured to provide a larger limit for | ||
125 | windows sizes. Both sides assume the initial 256KB window, but can | ||
126 | immediately send a window update as part of the SYN/ACK indicating a | ||
127 | larger window. | ||
128 | |||
129 | Both sides should track the number of bytes sent in Data frames | ||
130 | only, as only they are tracked as part of the window size. | ||
131 | |||
132 | ## Session termination | ||
133 | |||
134 | When a session is being terminated, the Go Away message should | ||
135 | be sent. The Length should be set to one of the following to | ||
136 | provide an error code: | ||
137 | |||
138 | * 0x0 Normal termination | ||
139 | * 0x1 Protocol error | ||
140 | * 0x2 Internal error | ||
diff --git a/vendor/github.com/hashicorp/yamux/stream.go b/vendor/github.com/hashicorp/yamux/stream.go new file mode 100644 index 0000000..d216e28 --- /dev/null +++ b/vendor/github.com/hashicorp/yamux/stream.go | |||
@@ -0,0 +1,457 @@ | |||
1 | package yamux | ||
2 | |||
3 | import ( | ||
4 | "bytes" | ||
5 | "io" | ||
6 | "sync" | ||
7 | "sync/atomic" | ||
8 | "time" | ||
9 | ) | ||
10 | |||
11 | type streamState int | ||
12 | |||
13 | const ( | ||
14 | streamInit streamState = iota | ||
15 | streamSYNSent | ||
16 | streamSYNReceived | ||
17 | streamEstablished | ||
18 | streamLocalClose | ||
19 | streamRemoteClose | ||
20 | streamClosed | ||
21 | streamReset | ||
22 | ) | ||
23 | |||
24 | // Stream is used to represent a logical stream | ||
25 | // within a session. | ||
26 | type Stream struct { | ||
27 | recvWindow uint32 | ||
28 | sendWindow uint32 | ||
29 | |||
30 | id uint32 | ||
31 | session *Session | ||
32 | |||
33 | state streamState | ||
34 | stateLock sync.Mutex | ||
35 | |||
36 | recvBuf *bytes.Buffer | ||
37 | recvLock sync.Mutex | ||
38 | |||
39 | controlHdr header | ||
40 | controlErr chan error | ||
41 | controlHdrLock sync.Mutex | ||
42 | |||
43 | sendHdr header | ||
44 | sendErr chan error | ||
45 | sendLock sync.Mutex | ||
46 | |||
47 | recvNotifyCh chan struct{} | ||
48 | sendNotifyCh chan struct{} | ||
49 | |||
50 | readDeadline time.Time | ||
51 | writeDeadline time.Time | ||
52 | } | ||
53 | |||
54 | // newStream is used to construct a new stream within | ||
55 | // a given session for an ID | ||
56 | func newStream(session *Session, id uint32, state streamState) *Stream { | ||
57 | s := &Stream{ | ||
58 | id: id, | ||
59 | session: session, | ||
60 | state: state, | ||
61 | controlHdr: header(make([]byte, headerSize)), | ||
62 | controlErr: make(chan error, 1), | ||
63 | sendHdr: header(make([]byte, headerSize)), | ||
64 | sendErr: make(chan error, 1), | ||
65 | recvWindow: initialStreamWindow, | ||
66 | sendWindow: initialStreamWindow, | ||
67 | recvNotifyCh: make(chan struct{}, 1), | ||
68 | sendNotifyCh: make(chan struct{}, 1), | ||
69 | } | ||
70 | return s | ||
71 | } | ||
72 | |||
73 | // Session returns the associated stream session | ||
74 | func (s *Stream) Session() *Session { | ||
75 | return s.session | ||
76 | } | ||
77 | |||
78 | // StreamID returns the ID of this stream | ||
79 | func (s *Stream) StreamID() uint32 { | ||
80 | return s.id | ||
81 | } | ||
82 | |||
83 | // Read is used to read from the stream | ||
84 | func (s *Stream) Read(b []byte) (n int, err error) { | ||
85 | defer asyncNotify(s.recvNotifyCh) | ||
86 | START: | ||
87 | s.stateLock.Lock() | ||
88 | switch s.state { | ||
89 | case streamLocalClose: | ||
90 | fallthrough | ||
91 | case streamRemoteClose: | ||
92 | fallthrough | ||
93 | case streamClosed: | ||
94 | s.recvLock.Lock() | ||
95 | if s.recvBuf == nil || s.recvBuf.Len() == 0 { | ||
96 | s.recvLock.Unlock() | ||
97 | s.stateLock.Unlock() | ||
98 | return 0, io.EOF | ||
99 | } | ||
100 | s.recvLock.Unlock() | ||
101 | case streamReset: | ||
102 | s.stateLock.Unlock() | ||
103 | return 0, ErrConnectionReset | ||
104 | } | ||
105 | s.stateLock.Unlock() | ||
106 | |||
107 | // If there is no data available, block | ||
108 | s.recvLock.Lock() | ||
109 | if s.recvBuf == nil || s.recvBuf.Len() == 0 { | ||
110 | s.recvLock.Unlock() | ||
111 | goto WAIT | ||
112 | } | ||
113 | |||
114 | // Read any bytes | ||
115 | n, _ = s.recvBuf.Read(b) | ||
116 | s.recvLock.Unlock() | ||
117 | |||
118 | // Send a window update potentially | ||
119 | err = s.sendWindowUpdate() | ||
120 | return n, err | ||
121 | |||
122 | WAIT: | ||
123 | var timeout <-chan time.Time | ||
124 | var timer *time.Timer | ||
125 | if !s.readDeadline.IsZero() { | ||
126 | delay := s.readDeadline.Sub(time.Now()) | ||
127 | timer = time.NewTimer(delay) | ||
128 | timeout = timer.C | ||
129 | } | ||
130 | select { | ||
131 | case <-s.recvNotifyCh: | ||
132 | if timer != nil { | ||
133 | timer.Stop() | ||
134 | } | ||
135 | goto START | ||
136 | case <-timeout: | ||
137 | return 0, ErrTimeout | ||
138 | } | ||
139 | } | ||
140 | |||
141 | // Write is used to write to the stream | ||
142 | func (s *Stream) Write(b []byte) (n int, err error) { | ||
143 | s.sendLock.Lock() | ||
144 | defer s.sendLock.Unlock() | ||
145 | total := 0 | ||
146 | for total < len(b) { | ||
147 | n, err := s.write(b[total:]) | ||
148 | total += n | ||
149 | if err != nil { | ||
150 | return total, err | ||
151 | } | ||
152 | } | ||
153 | return total, nil | ||
154 | } | ||
155 | |||
156 | // write is used to write to the stream, may return on | ||
157 | // a short write. | ||
158 | func (s *Stream) write(b []byte) (n int, err error) { | ||
159 | var flags uint16 | ||
160 | var max uint32 | ||
161 | var body io.Reader | ||
162 | START: | ||
163 | s.stateLock.Lock() | ||
164 | switch s.state { | ||
165 | case streamLocalClose: | ||
166 | fallthrough | ||
167 | case streamClosed: | ||
168 | s.stateLock.Unlock() | ||
169 | return 0, ErrStreamClosed | ||
170 | case streamReset: | ||
171 | s.stateLock.Unlock() | ||
172 | return 0, ErrConnectionReset | ||
173 | } | ||
174 | s.stateLock.Unlock() | ||
175 | |||
176 | // If there is no data available, block | ||
177 | window := atomic.LoadUint32(&s.sendWindow) | ||
178 | if window == 0 { | ||
179 | goto WAIT | ||
180 | } | ||
181 | |||
182 | // Determine the flags if any | ||
183 | flags = s.sendFlags() | ||
184 | |||
185 | // Send up to our send window | ||
186 | max = min(window, uint32(len(b))) | ||
187 | body = bytes.NewReader(b[:max]) | ||
188 | |||
189 | // Send the header | ||
190 | s.sendHdr.encode(typeData, flags, s.id, max) | ||
191 | if err := s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil { | ||
192 | return 0, err | ||
193 | } | ||
194 | |||
195 | // Reduce our send window | ||
196 | atomic.AddUint32(&s.sendWindow, ^uint32(max-1)) | ||
197 | |||
198 | // Unlock | ||
199 | return int(max), err | ||
200 | |||
201 | WAIT: | ||
202 | var timeout <-chan time.Time | ||
203 | if !s.writeDeadline.IsZero() { | ||
204 | delay := s.writeDeadline.Sub(time.Now()) | ||
205 | timeout = time.After(delay) | ||
206 | } | ||
207 | select { | ||
208 | case <-s.sendNotifyCh: | ||
209 | goto START | ||
210 | case <-timeout: | ||
211 | return 0, ErrTimeout | ||
212 | } | ||
213 | return 0, nil | ||
214 | } | ||
215 | |||
216 | // sendFlags determines any flags that are appropriate | ||
217 | // based on the current stream state | ||
218 | func (s *Stream) sendFlags() uint16 { | ||
219 | s.stateLock.Lock() | ||
220 | defer s.stateLock.Unlock() | ||
221 | var flags uint16 | ||
222 | switch s.state { | ||
223 | case streamInit: | ||
224 | flags |= flagSYN | ||
225 | s.state = streamSYNSent | ||
226 | case streamSYNReceived: | ||
227 | flags |= flagACK | ||
228 | s.state = streamEstablished | ||
229 | } | ||
230 | return flags | ||
231 | } | ||
232 | |||
233 | // sendWindowUpdate potentially sends a window update enabling | ||
234 | // further writes to take place. Must be invoked with the lock. | ||
235 | func (s *Stream) sendWindowUpdate() error { | ||
236 | s.controlHdrLock.Lock() | ||
237 | defer s.controlHdrLock.Unlock() | ||
238 | |||
239 | // Determine the delta update | ||
240 | max := s.session.config.MaxStreamWindowSize | ||
241 | delta := max - atomic.LoadUint32(&s.recvWindow) | ||
242 | |||
243 | // Determine the flags if any | ||
244 | flags := s.sendFlags() | ||
245 | |||
246 | // Check if we can omit the update | ||
247 | if delta < (max/2) && flags == 0 { | ||
248 | return nil | ||
249 | } | ||
250 | |||
251 | // Update our window | ||
252 | atomic.AddUint32(&s.recvWindow, delta) | ||
253 | |||
254 | // Send the header | ||
255 | s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta) | ||
256 | if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil { | ||
257 | return err | ||
258 | } | ||
259 | return nil | ||
260 | } | ||
261 | |||
262 | // sendClose is used to send a FIN | ||
263 | func (s *Stream) sendClose() error { | ||
264 | s.controlHdrLock.Lock() | ||
265 | defer s.controlHdrLock.Unlock() | ||
266 | |||
267 | flags := s.sendFlags() | ||
268 | flags |= flagFIN | ||
269 | s.controlHdr.encode(typeWindowUpdate, flags, s.id, 0) | ||
270 | if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil { | ||
271 | return err | ||
272 | } | ||
273 | return nil | ||
274 | } | ||
275 | |||
276 | // Close is used to close the stream | ||
277 | func (s *Stream) Close() error { | ||
278 | closeStream := false | ||
279 | s.stateLock.Lock() | ||
280 | switch s.state { | ||
281 | // Opened means we need to signal a close | ||
282 | case streamSYNSent: | ||
283 | fallthrough | ||
284 | case streamSYNReceived: | ||
285 | fallthrough | ||
286 | case streamEstablished: | ||
287 | s.state = streamLocalClose | ||
288 | goto SEND_CLOSE | ||
289 | |||
290 | case streamLocalClose: | ||
291 | case streamRemoteClose: | ||
292 | s.state = streamClosed | ||
293 | closeStream = true | ||
294 | goto SEND_CLOSE | ||
295 | |||
296 | case streamClosed: | ||
297 | case streamReset: | ||
298 | default: | ||
299 | panic("unhandled state") | ||
300 | } | ||
301 | s.stateLock.Unlock() | ||
302 | return nil | ||
303 | SEND_CLOSE: | ||
304 | s.stateLock.Unlock() | ||
305 | s.sendClose() | ||
306 | s.notifyWaiting() | ||
307 | if closeStream { | ||
308 | s.session.closeStream(s.id) | ||
309 | } | ||
310 | return nil | ||
311 | } | ||
312 | |||
313 | // forceClose is used for when the session is exiting | ||
314 | func (s *Stream) forceClose() { | ||
315 | s.stateLock.Lock() | ||
316 | s.state = streamClosed | ||
317 | s.stateLock.Unlock() | ||
318 | s.notifyWaiting() | ||
319 | } | ||
320 | |||
321 | // processFlags is used to update the state of the stream | ||
322 | // based on set flags, if any. Lock must be held | ||
323 | func (s *Stream) processFlags(flags uint16) error { | ||
324 | // Close the stream without holding the state lock | ||
325 | closeStream := false | ||
326 | defer func() { | ||
327 | if closeStream { | ||
328 | s.session.closeStream(s.id) | ||
329 | } | ||
330 | }() | ||
331 | |||
332 | s.stateLock.Lock() | ||
333 | defer s.stateLock.Unlock() | ||
334 | if flags&flagACK == flagACK { | ||
335 | if s.state == streamSYNSent { | ||
336 | s.state = streamEstablished | ||
337 | } | ||
338 | s.session.establishStream(s.id) | ||
339 | } | ||
340 | if flags&flagFIN == flagFIN { | ||
341 | switch s.state { | ||
342 | case streamSYNSent: | ||
343 | fallthrough | ||
344 | case streamSYNReceived: | ||
345 | fallthrough | ||
346 | case streamEstablished: | ||
347 | s.state = streamRemoteClose | ||
348 | s.notifyWaiting() | ||
349 | case streamLocalClose: | ||
350 | s.state = streamClosed | ||
351 | closeStream = true | ||
352 | s.notifyWaiting() | ||
353 | default: | ||
354 | s.session.logger.Printf("[ERR] yamux: unexpected FIN flag in state %d", s.state) | ||
355 | return ErrUnexpectedFlag | ||
356 | } | ||
357 | } | ||
358 | if flags&flagRST == flagRST { | ||
359 | s.state = streamReset | ||
360 | closeStream = true | ||
361 | s.notifyWaiting() | ||
362 | } | ||
363 | return nil | ||
364 | } | ||
365 | |||
366 | // notifyWaiting notifies all the waiting channels | ||
367 | func (s *Stream) notifyWaiting() { | ||
368 | asyncNotify(s.recvNotifyCh) | ||
369 | asyncNotify(s.sendNotifyCh) | ||
370 | } | ||
371 | |||
372 | // incrSendWindow updates the size of our send window | ||
373 | func (s *Stream) incrSendWindow(hdr header, flags uint16) error { | ||
374 | if err := s.processFlags(flags); err != nil { | ||
375 | return err | ||
376 | } | ||
377 | |||
378 | // Increase window, unblock a sender | ||
379 | atomic.AddUint32(&s.sendWindow, hdr.Length()) | ||
380 | asyncNotify(s.sendNotifyCh) | ||
381 | return nil | ||
382 | } | ||
383 | |||
384 | // readData is used to handle a data frame | ||
385 | func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error { | ||
386 | if err := s.processFlags(flags); err != nil { | ||
387 | return err | ||
388 | } | ||
389 | |||
390 | // Check that our recv window is not exceeded | ||
391 | length := hdr.Length() | ||
392 | if length == 0 { | ||
393 | return nil | ||
394 | } | ||
395 | if remain := atomic.LoadUint32(&s.recvWindow); length > remain { | ||
396 | s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, remain, length) | ||
397 | return ErrRecvWindowExceeded | ||
398 | } | ||
399 | |||
400 | // Wrap in a limited reader | ||
401 | conn = &io.LimitedReader{R: conn, N: int64(length)} | ||
402 | |||
403 | // Copy into buffer | ||
404 | s.recvLock.Lock() | ||
405 | if s.recvBuf == nil { | ||
406 | // Allocate the receive buffer just-in-time to fit the full data frame. | ||
407 | // This way we can read in the whole packet without further allocations. | ||
408 | s.recvBuf = bytes.NewBuffer(make([]byte, 0, length)) | ||
409 | } | ||
410 | if _, err := io.Copy(s.recvBuf, conn); err != nil { | ||
411 | s.session.logger.Printf("[ERR] yamux: Failed to read stream data: %v", err) | ||
412 | s.recvLock.Unlock() | ||
413 | return err | ||
414 | } | ||
415 | |||
416 | // Decrement the receive window | ||
417 | atomic.AddUint32(&s.recvWindow, ^uint32(length-1)) | ||
418 | s.recvLock.Unlock() | ||
419 | |||
420 | // Unblock any readers | ||
421 | asyncNotify(s.recvNotifyCh) | ||
422 | return nil | ||
423 | } | ||
424 | |||
425 | // SetDeadline sets the read and write deadlines | ||
426 | func (s *Stream) SetDeadline(t time.Time) error { | ||
427 | if err := s.SetReadDeadline(t); err != nil { | ||
428 | return err | ||
429 | } | ||
430 | if err := s.SetWriteDeadline(t); err != nil { | ||
431 | return err | ||
432 | } | ||
433 | return nil | ||
434 | } | ||
435 | |||
436 | // SetReadDeadline sets the deadline for future Read calls. | ||
437 | func (s *Stream) SetReadDeadline(t time.Time) error { | ||
438 | s.readDeadline = t | ||
439 | return nil | ||
440 | } | ||
441 | |||
442 | // SetWriteDeadline sets the deadline for future Write calls | ||
443 | func (s *Stream) SetWriteDeadline(t time.Time) error { | ||
444 | s.writeDeadline = t | ||
445 | return nil | ||
446 | } | ||
447 | |||
448 | // Shrink is used to compact the amount of buffers utilized | ||
449 | // This is useful when using Yamux in a connection pool to reduce | ||
450 | // the idle memory utilization. | ||
451 | func (s *Stream) Shrink() { | ||
452 | s.recvLock.Lock() | ||
453 | if s.recvBuf != nil && s.recvBuf.Len() == 0 { | ||
454 | s.recvBuf = nil | ||
455 | } | ||
456 | s.recvLock.Unlock() | ||
457 | } | ||
diff --git a/vendor/github.com/hashicorp/yamux/util.go b/vendor/github.com/hashicorp/yamux/util.go new file mode 100644 index 0000000..5fe45af --- /dev/null +++ b/vendor/github.com/hashicorp/yamux/util.go | |||
@@ -0,0 +1,28 @@ | |||
1 | package yamux | ||
2 | |||
3 | // asyncSendErr is used to try an async send of an error | ||
4 | func asyncSendErr(ch chan error, err error) { | ||
5 | if ch == nil { | ||
6 | return | ||
7 | } | ||
8 | select { | ||
9 | case ch <- err: | ||
10 | default: | ||
11 | } | ||
12 | } | ||
13 | |||
14 | // asyncNotify is used to signal a waiting goroutine | ||
15 | func asyncNotify(ch chan struct{}) { | ||
16 | select { | ||
17 | case ch <- struct{}{}: | ||
18 | default: | ||
19 | } | ||
20 | } | ||
21 | |||
22 | // min computes the minimum of two values | ||
23 | func min(a, b uint32) uint32 { | ||
24 | if a < b { | ||
25 | return a | ||
26 | } | ||
27 | return b | ||
28 | } | ||