diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/transport/controlbuf.go')
-rw-r--r-- | vendor/google.golang.org/grpc/internal/transport/controlbuf.go | 852 |
1 files changed, 852 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/controlbuf.go b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go new file mode 100644 index 0000000..204ba15 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/transport/controlbuf.go | |||
@@ -0,0 +1,852 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2014 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | package transport | ||
20 | |||
21 | import ( | ||
22 | "bytes" | ||
23 | "fmt" | ||
24 | "runtime" | ||
25 | "sync" | ||
26 | |||
27 | "golang.org/x/net/http2" | ||
28 | "golang.org/x/net/http2/hpack" | ||
29 | ) | ||
30 | |||
// updateHeaderTblSize sets the limit on the HPACK encoder's dynamic
// table size. It is declared as a package-level var rather than called
// directly — presumably so tests can stub it out; confirm before
// changing.
var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
	e.SetMaxDynamicTableSizeLimit(v)
}
34 | |||
// itemNode is one element of a singly linked FIFO list of control
// items.
type itemNode struct {
	it   interface{}
	next *itemNode
}

// itemList is a FIFO queue of control items, implemented as a singly
// linked list with head and tail pointers.
type itemList struct {
	head *itemNode
	tail *itemNode
}

// enqueue appends i to the back of the list.
func (il *itemList) enqueue(i interface{}) {
	node := &itemNode{it: i}
	if il.tail != nil {
		il.tail.next = node
		il.tail = node
		return
	}
	// List was empty: the new node is both head and tail.
	il.head, il.tail = node, node
}

// peek returns the first item in the list without removing it from the
// list. It must not be called on an empty list.
func (il *itemList) peek() interface{} {
	return il.head.it
}

// dequeue removes and returns the first item in the list, or nil if
// the list is empty.
func (il *itemList) dequeue() interface{} {
	node := il.head
	if node == nil {
		return nil
	}
	il.head = node.next
	if il.head == nil {
		il.tail = nil
	}
	return node.it
}

// dequeueAll empties the list and returns its former head node, from
// which all items remain reachable via next pointers.
func (il *itemList) dequeueAll() *itemNode {
	node := il.head
	il.head, il.tail = nil, nil
	return node
}

// isEmpty reports whether the list contains no items.
func (il *itemList) isEmpty() bool {
	return il.head == nil
}
82 | |||
// The following defines various control items which could flow through
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, stream resetting, etc.
86 | |||
// registerStream is used to register an incoming stream with loopy writer.
type registerStream struct {
	streamID uint32
	wq       *writeQuota
}

// headerFrame is also used to register stream on the client-side.
type headerFrame struct {
	streamID   uint32
	hf         []hpack.HeaderField
	endStream  bool                       // Valid on server side.
	initStream func(uint32) (bool, error) // Used only on the client side.
	onWrite    func()
	wq         *writeQuota    // write quota for the stream created.
	cleanup    *cleanupStream // Valid on the server side.
	onOrphaned func(error)    // Valid on client-side
}

// cleanupStream instructs loopy to release its state for a stream and,
// if rst is set, write a RST_STREAM frame with rstCode.
type cleanupStream struct {
	streamID uint32
	rst      bool
	rstCode  http2.ErrCode
	onWrite  func()
}

// dataFrame carries one message's bytes — header buffer h followed by
// payload buffer d — to be written out as HTTP/2 DATA frames.
type dataFrame struct {
	streamID  uint32
	endStream bool
	h         []byte
	d         []byte
	// onEachWrite is called every time
	// a part of d is written out.
	onEachWrite func()
}

// incomingWindowUpdate is a WINDOW_UPDATE received from the peer;
// loopy uses it to replenish its outgoing flow-control quota.
type incomingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

// outgoingWindowUpdate is a WINDOW_UPDATE frame to be written to the
// peer.
type outgoingWindowUpdate struct {
	streamID  uint32
	increment uint32
}

// incomingSettings is a SETTINGS frame received from the peer; loopy
// applies it and writes a SETTINGS ack.
type incomingSettings struct {
	ss []http2.Setting
}

// outgoingSettings is a SETTINGS frame to be written to the peer.
type outgoingSettings struct {
	ss []http2.Setting
}

// incomingGoAway notifies loopy that a GOAWAY frame was received.
type incomingGoAway struct {
}

// goAway is an outgoing GOAWAY control frame.
type goAway struct {
	code      http2.ErrCode
	debugData []byte
	headsUp   bool
	closeConn bool
}

// ping is a PING frame to be written; ack marks it as an acknowledgment
// of a received ping.
type ping struct {
	ack  bool
	data [8]byte
}

// outFlowControlSizeRequest asks loopy for its current connection-level
// send quota; the answer is delivered on resp.
type outFlowControlSizeRequest struct {
	resp chan uint32
}
158 | |||
// outStreamState tracks whether a stream currently has data queued and
// enough stream-level flow-control quota to send it.
type outStreamState int

const (
	// active: the stream has data to send and quota, and is on loopy's
	// activeStreams list.
	active outStreamState = iota
	// empty: the stream has no items queued.
	empty
	// waitingOnStreamQuota: the stream has data but has exhausted its
	// stream-level flow-control quota.
	waitingOnStreamQuota
)
166 | |||
167 | type outStream struct { | ||
168 | id uint32 | ||
169 | state outStreamState | ||
170 | itl *itemList | ||
171 | bytesOutStanding int | ||
172 | wq *writeQuota | ||
173 | |||
174 | next *outStream | ||
175 | prev *outStream | ||
176 | } | ||
177 | |||
178 | func (s *outStream) deleteSelf() { | ||
179 | if s.prev != nil { | ||
180 | s.prev.next = s.next | ||
181 | } | ||
182 | if s.next != nil { | ||
183 | s.next.prev = s.prev | ||
184 | } | ||
185 | s.next, s.prev = nil, nil | ||
186 | } | ||
187 | |||
188 | type outStreamList struct { | ||
189 | // Following are sentinel objects that mark the | ||
190 | // beginning and end of the list. They do not | ||
191 | // contain any item lists. All valid objects are | ||
192 | // inserted in between them. | ||
193 | // This is needed so that an outStream object can | ||
194 | // deleteSelf() in O(1) time without knowing which | ||
195 | // list it belongs to. | ||
196 | head *outStream | ||
197 | tail *outStream | ||
198 | } | ||
199 | |||
200 | func newOutStreamList() *outStreamList { | ||
201 | head, tail := new(outStream), new(outStream) | ||
202 | head.next = tail | ||
203 | tail.prev = head | ||
204 | return &outStreamList{ | ||
205 | head: head, | ||
206 | tail: tail, | ||
207 | } | ||
208 | } | ||
209 | |||
210 | func (l *outStreamList) enqueue(s *outStream) { | ||
211 | e := l.tail.prev | ||
212 | e.next = s | ||
213 | s.prev = e | ||
214 | s.next = l.tail | ||
215 | l.tail.prev = s | ||
216 | } | ||
217 | |||
218 | // remove from the beginning of the list. | ||
219 | func (l *outStreamList) dequeue() *outStream { | ||
220 | b := l.head.next | ||
221 | if b == l.tail { | ||
222 | return nil | ||
223 | } | ||
224 | b.deleteSelf() | ||
225 | return b | ||
226 | } | ||
227 | |||
// controlBuffer is a way to pass information to loopy.
// Information is passed as specific struct types called control frames.
// A control frame not only represents data, messages or headers to be sent out
// but can also be used to instruct loopy to update its internal state.
// It shouldn't be confused with an HTTP2 frame, although some of the control frames
// like dataFrame and headerFrame do go out on wire as HTTP2 frames.
type controlBuffer struct {
	ch   chan struct{}   // capacity-1 channel used to wake a blocked consumer.
	done <-chan struct{} // closed when the transport is closed.
	mu   sync.Mutex      // guards the fields below.
	// consumerWaiting is true while get(true) is blocked waiting for an
	// item; put uses it to decide whether to signal ch.
	consumerWaiting bool
	list            *itemList // queued control frames, FIFO.
	err             error     // sticky; set to ErrConnClosing by finish().
}
242 | |||
243 | func newControlBuffer(done <-chan struct{}) *controlBuffer { | ||
244 | return &controlBuffer{ | ||
245 | ch: make(chan struct{}, 1), | ||
246 | list: &itemList{}, | ||
247 | done: done, | ||
248 | } | ||
249 | } | ||
250 | |||
// put adds it to the buffer, waking a blocked consumer if one is
// waiting. It returns an error once the buffer has been finished.
func (c *controlBuffer) put(it interface{}) error {
	_, err := c.executeAndPut(nil, it)
	return err
}
255 | |||
256 | func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) { | ||
257 | var wakeUp bool | ||
258 | c.mu.Lock() | ||
259 | if c.err != nil { | ||
260 | c.mu.Unlock() | ||
261 | return false, c.err | ||
262 | } | ||
263 | if f != nil { | ||
264 | if !f(it) { // f wasn't successful | ||
265 | c.mu.Unlock() | ||
266 | return false, nil | ||
267 | } | ||
268 | } | ||
269 | if c.consumerWaiting { | ||
270 | wakeUp = true | ||
271 | c.consumerWaiting = false | ||
272 | } | ||
273 | c.list.enqueue(it) | ||
274 | c.mu.Unlock() | ||
275 | if wakeUp { | ||
276 | select { | ||
277 | case c.ch <- struct{}{}: | ||
278 | default: | ||
279 | } | ||
280 | } | ||
281 | return true, nil | ||
282 | } | ||
283 | |||
284 | // Note argument f should never be nil. | ||
285 | func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) { | ||
286 | c.mu.Lock() | ||
287 | if c.err != nil { | ||
288 | c.mu.Unlock() | ||
289 | return false, c.err | ||
290 | } | ||
291 | if !f(it) { // f wasn't successful | ||
292 | c.mu.Unlock() | ||
293 | return false, nil | ||
294 | } | ||
295 | c.mu.Unlock() | ||
296 | return true, nil | ||
297 | } | ||
298 | |||
// get returns the next control frame from the buffer. If block is true
// it waits until an item arrives or the transport is closed; otherwise
// it returns (nil, nil) immediately when the buffer is empty.
func (c *controlBuffer) get(block bool) (interface{}, error) {
	for {
		c.mu.Lock()
		if c.err != nil {
			c.mu.Unlock()
			return nil, c.err
		}
		if !c.list.isEmpty() {
			h := c.list.dequeue()
			c.mu.Unlock()
			return h, nil
		}
		if !block {
			c.mu.Unlock()
			return nil, nil
		}
		// Mark a consumer as waiting so the next put signals c.ch, then
		// block until signaled or the transport is done.
		c.consumerWaiting = true
		c.mu.Unlock()
		select {
		case <-c.ch:
			// Woken by a producer; loop around and re-check the list.
		case <-c.done:
			// Transport closed: orphan any queued header frames before
			// reporting the closure.
			c.finish()
			return nil, ErrConnClosing
		}
	}
}
325 | |||
// finish marks the buffer closed — subsequent puts and gets fail with
// ErrConnClosing — and invokes onOrphaned for any header frames still
// queued, since the transport never learned about those streams.
// Idempotent: calls after the first are no-ops.
func (c *controlBuffer) finish() {
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return
	}
	c.err = ErrConnClosing
	// There may be headers for streams in the control buffer.
	// These streams need to be cleaned out since the transport
	// is still not aware of these yet.
	// NOTE(review): onOrphaned callbacks run while c.mu is held — they
	// must not call back into this controlBuffer.
	for head := c.list.dequeueAll(); head != nil; head = head.next {
		hdr, ok := head.it.(*headerFrame)
		if !ok {
			continue
		}
		if hdr.onOrphaned != nil { // It will be nil on the server-side.
			hdr.onOrphaned(ErrConnClosing)
		}
	}
	c.mu.Unlock()
}
347 | |||
// side identifies whether loopy serves a client or a server transport;
// several control frames are handled differently per side.
type side int

const (
	clientSide side = iota
	serverSide
)
354 | |||
// Loopy receives frames from the control buffer.
// Each frame is handled individually; most of the work done by loopy goes
// into handling data frames. Loopy maintains a queue of active streams, and each
// stream maintains a queue of data frames; as loopy receives data frames
// it gets added to the queue of the relevant stream.
// Loopy goes over this list of active streams by processing one node every iteration,
// thereby closely resembling round-robin scheduling over all streams. While
// processing a stream, loopy writes out data bytes from this stream capped by the min
// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
type loopyWriter struct {
	side      side
	cbuf      *controlBuffer // source of control frames.
	sendQuota uint32         // connection-level flow-control quota.
	oiws      uint32         // outbound initial window size.
	// estdStreams is map of all established streams that are not cleaned-up yet.
	// On client-side, this is all streams whose headers were sent out.
	// On server-side, this is all streams whose headers were received.
	estdStreams map[uint32]*outStream // Established streams.
	// activeStreams is a linked-list of all streams that have data to send and some
	// stream-level flow control quota.
	// Each of these streams internally have a list of data items(and perhaps trailers
	// on the server-side) to be sent out.
	activeStreams *outStreamList
	framer        *framer
	hBuf          *bytes.Buffer // The buffer for HPACK encoding.
	hEnc          *hpack.Encoder // HPACK encoder.
	bdpEst        *bdpEstimator
	draining      bool // set after a GOAWAY; no new streams are expected.

	// Side-specific handlers
	ssGoAwayHandler func(*goAway) (bool, error)
}
387 | |||
388 | func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter { | ||
389 | var buf bytes.Buffer | ||
390 | l := &loopyWriter{ | ||
391 | side: s, | ||
392 | cbuf: cbuf, | ||
393 | sendQuota: defaultWindowSize, | ||
394 | oiws: defaultWindowSize, | ||
395 | estdStreams: make(map[uint32]*outStream), | ||
396 | activeStreams: newOutStreamList(), | ||
397 | framer: fr, | ||
398 | hBuf: &buf, | ||
399 | hEnc: hpack.NewEncoder(&buf), | ||
400 | bdpEst: bdpEst, | ||
401 | } | ||
402 | return l | ||
403 | } | ||
404 | |||
// minBatchSize is the smallest number of bytes loopy tries to have in
// the framer's write buffer before flushing; below this it yields the
// processor once to let stream goroutines add more data to the batch.
const minBatchSize = 1000
406 | |||
// run should be run in a separate goroutine.
// It reads control frames from controlBuf and processes them by:
// 1. Updating loopy's internal state, or/and
// 2. Writing out HTTP2 frames on the wire.
//
// Loopy keeps all active streams with data to send in a linked-list.
// All streams in the activeStreams linked-list must have both:
// 1. Data to send, and
// 2. Stream level flow control quota available.
//
// In each iteration of run loop, other than processing the incoming control
// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
// This results in writing of HTTP2 frames into an underlying write buffer.
// When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
// if the batch size is too low to give stream goroutines a chance to fill it up.
func (l *loopyWriter) run() (err error) {
	defer func() {
		if err == ErrConnClosing {
			// Don't log ErrConnClosing as error since it happens
			// 1. When the connection is closed by some other known issue.
			// 2. User closed the connection.
			// 3. A graceful close of connection.
			infof("transport: loopyWriter.run returning. %v", err)
			err = nil
		}
	}()
	for {
		// Block until at least one control frame is available.
		it, err := l.cbuf.get(true)
		if err != nil {
			return err
		}
		if err = l.handle(it); err != nil {
			return err
		}
		if _, err = l.processData(); err != nil {
			return err
		}
		gosched := true
	hasdata:
		// Drain remaining control frames and data without blocking.
		for {
			it, err := l.cbuf.get(false)
			if err != nil {
				return err
			}
			if it != nil {
				if err = l.handle(it); err != nil {
					return err
				}
				if _, err = l.processData(); err != nil {
					return err
				}
				continue hasdata
			}
			isEmpty, err := l.processData()
			if err != nil {
				return err
			}
			if !isEmpty {
				continue hasdata
			}
			// Nothing left to process; if the batch is small, yield the
			// processor once so other goroutines can add more to it.
			if gosched {
				gosched = false
				if l.framer.writer.offset < minBatchSize {
					runtime.Gosched()
					continue hasdata
				}
			}
			l.framer.writer.Flush()
			break hasdata

		}
	}
}
481 | |||
// outgoingWindowUpdateHandler writes a WINDOW_UPDATE frame to the peer.
func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
	return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
}
485 | |||
486 | func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error { | ||
487 | // Otherwise update the quota. | ||
488 | if w.streamID == 0 { | ||
489 | l.sendQuota += w.increment | ||
490 | return nil | ||
491 | } | ||
492 | // Find the stream and update it. | ||
493 | if str, ok := l.estdStreams[w.streamID]; ok { | ||
494 | str.bytesOutStanding -= int(w.increment) | ||
495 | if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota { | ||
496 | str.state = active | ||
497 | l.activeStreams.enqueue(str) | ||
498 | return nil | ||
499 | } | ||
500 | } | ||
501 | return nil | ||
502 | } | ||
503 | |||
// outgoingSettingsHandler writes a SETTINGS frame to the peer.
func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
	return l.framer.fr.WriteSettings(s.ss...)
}
507 | |||
// incomingSettingsHandler applies settings received from the peer to
// loopy's state and acknowledges them with a SETTINGS ack frame.
func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
	if err := l.applySettings(s.ss); err != nil {
		return err
	}
	return l.framer.fr.WriteSettingsAck()
}
514 | |||
515 | func (l *loopyWriter) registerStreamHandler(h *registerStream) error { | ||
516 | str := &outStream{ | ||
517 | id: h.streamID, | ||
518 | state: empty, | ||
519 | itl: &itemList{}, | ||
520 | wq: h.wq, | ||
521 | } | ||
522 | l.estdStreams[h.streamID] = str | ||
523 | return nil | ||
524 | } | ||
525 | |||
// headerHandler processes a headerFrame. On the server side it either
// responds with headers (endStream unset) or closes the stream
// (endStream set); on the client side it originates a new stream.
func (l *loopyWriter) headerHandler(h *headerFrame) error {
	if l.side == serverSide {
		str, ok := l.estdStreams[h.streamID]
		if !ok {
			warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
			return nil
		}
		// Case 1.A: Server is responding back with headers.
		if !h.endStream {
			return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
		}
		// else: Case 1.B: Server wants to close stream.

		if str.state != empty { // either active or waiting on stream quota.
			// add it str's list of items: the trailers must go out only
			// after the queued data, so processData writes them later.
			str.itl.enqueue(h)
			return nil
		}
		// No pending data; write trailers and clean up immediately.
		if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
			return err
		}
		return l.cleanupStreamHandler(h.cleanup)
	}
	// Case 2: Client wants to originate stream.
	str := &outStream{
		id:    h.streamID,
		state: empty,
		itl:   &itemList{},
		wq:    h.wq,
	}
	str.itl.enqueue(h)
	return l.originateStream(str)
}
559 | |||
// originateStream initializes a new client-side stream, writes out its
// headers, and registers it as established. An initStream failure other
// than ErrConnClosing drops the stream without closing the transport.
func (l *loopyWriter) originateStream(str *outStream) error {
	hdr := str.itl.dequeue().(*headerFrame)
	sendPing, err := hdr.initStream(str.id)
	if err != nil {
		if err == ErrConnClosing {
			return err
		}
		// Other errors(errStreamDrain) need not close transport.
		return nil
	}
	if err = l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
		return err
	}
	l.estdStreams[str.id] = str
	if sendPing {
		// initStream requested that a ping be sent along with this
		// stream (see pingHandler).
		return l.pingHandler(&ping{data: [8]byte{}})
	}
	return nil
}
579 | |||
580 | func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error { | ||
581 | if onWrite != nil { | ||
582 | onWrite() | ||
583 | } | ||
584 | l.hBuf.Reset() | ||
585 | for _, f := range hf { | ||
586 | if err := l.hEnc.WriteField(f); err != nil { | ||
587 | warningf("transport: loopyWriter.writeHeader encountered error while encoding headers:", err) | ||
588 | } | ||
589 | } | ||
590 | var ( | ||
591 | err error | ||
592 | endHeaders, first bool | ||
593 | ) | ||
594 | first = true | ||
595 | for !endHeaders { | ||
596 | size := l.hBuf.Len() | ||
597 | if size > http2MaxFrameLen { | ||
598 | size = http2MaxFrameLen | ||
599 | } else { | ||
600 | endHeaders = true | ||
601 | } | ||
602 | if first { | ||
603 | first = false | ||
604 | err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{ | ||
605 | StreamID: streamID, | ||
606 | BlockFragment: l.hBuf.Next(size), | ||
607 | EndStream: endStream, | ||
608 | EndHeaders: endHeaders, | ||
609 | }) | ||
610 | } else { | ||
611 | err = l.framer.fr.WriteContinuation( | ||
612 | streamID, | ||
613 | endHeaders, | ||
614 | l.hBuf.Next(size), | ||
615 | ) | ||
616 | } | ||
617 | if err != nil { | ||
618 | return err | ||
619 | } | ||
620 | } | ||
621 | return nil | ||
622 | } | ||
623 | |||
624 | func (l *loopyWriter) preprocessData(df *dataFrame) error { | ||
625 | str, ok := l.estdStreams[df.streamID] | ||
626 | if !ok { | ||
627 | return nil | ||
628 | } | ||
629 | // If we got data for a stream it means that | ||
630 | // stream was originated and the headers were sent out. | ||
631 | str.itl.enqueue(df) | ||
632 | if str.state == empty { | ||
633 | str.state = active | ||
634 | l.activeStreams.enqueue(str) | ||
635 | } | ||
636 | return nil | ||
637 | } | ||
638 | |||
639 | func (l *loopyWriter) pingHandler(p *ping) error { | ||
640 | if !p.ack { | ||
641 | l.bdpEst.timesnap(p.data) | ||
642 | } | ||
643 | return l.framer.fr.WritePing(p.ack, p.data) | ||
644 | |||
645 | } | ||
646 | |||
// outFlowControlSizeRequestHandler answers a quota query by sending
// loopy's current connection-level send quota on the request's resp
// channel. The send blocks until the requester receives.
func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
	o.resp <- l.sendQuota
	return nil
}
651 | |||
// cleanupStreamHandler releases loopy's state for a stream and, if
// requested, writes a RST_STREAM frame. On a draining client it
// returns ErrConnClosing once the last established stream is gone,
// which terminates loopy's run loop.
func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
	c.onWrite()
	if str, ok := l.estdStreams[c.streamID]; ok {
		// On the server side it could be a trailers-only response or
		// a RST_STREAM before stream initialization thus the stream might
		// not be established yet.
		delete(l.estdStreams, c.streamID)
		str.deleteSelf()
	}
	if c.rst { // If RST_STREAM needs to be sent.
		if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
			return err
		}
	}
	if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
		return ErrConnClosing
	}
	return nil
}
671 | |||
672 | func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error { | ||
673 | if l.side == clientSide { | ||
674 | l.draining = true | ||
675 | if len(l.estdStreams) == 0 { | ||
676 | return ErrConnClosing | ||
677 | } | ||
678 | } | ||
679 | return nil | ||
680 | } | ||
681 | |||
682 | func (l *loopyWriter) goAwayHandler(g *goAway) error { | ||
683 | // Handling of outgoing GoAway is very specific to side. | ||
684 | if l.ssGoAwayHandler != nil { | ||
685 | draining, err := l.ssGoAwayHandler(g) | ||
686 | if err != nil { | ||
687 | return err | ||
688 | } | ||
689 | l.draining = draining | ||
690 | } | ||
691 | return nil | ||
692 | } | ||
693 | |||
// handle dispatches one control frame read from the control buffer to
// its type-specific handler. An unknown type is a programming error
// and terminates loopy.
func (l *loopyWriter) handle(i interface{}) error {
	switch i := i.(type) {
	case *incomingWindowUpdate:
		return l.incomingWindowUpdateHandler(i)
	case *outgoingWindowUpdate:
		return l.outgoingWindowUpdateHandler(i)
	case *incomingSettings:
		return l.incomingSettingsHandler(i)
	case *outgoingSettings:
		return l.outgoingSettingsHandler(i)
	case *headerFrame:
		return l.headerHandler(i)
	case *registerStream:
		return l.registerStreamHandler(i)
	case *cleanupStream:
		return l.cleanupStreamHandler(i)
	case *incomingGoAway:
		return l.incomingGoAwayHandler(i)
	case *dataFrame:
		return l.preprocessData(i)
	case *ping:
		return l.pingHandler(i)
	case *goAway:
		return l.goAwayHandler(i)
	case *outFlowControlSizeRequest:
		return l.outFlowControlSizeRequestHandler(i)
	default:
		return fmt.Errorf("transport: unknown control message type %T", i)
	}
}
724 | |||
// applySettings updates loopy's state from the peer's SETTINGS frame.
// Only the initial-window-size and header-table-size settings affect
// loopy directly.
func (l *loopyWriter) applySettings(ss []http2.Setting) error {
	for _, s := range ss {
		switch s.ID {
		case http2.SettingInitialWindowSize:
			o := l.oiws
			l.oiws = s.Val
			if o < l.oiws {
				// If the new limit is greater make all depleted streams active.
				for _, stream := range l.estdStreams {
					if stream.state == waitingOnStreamQuota {
						stream.state = active
						l.activeStreams.enqueue(stream)
					}
				}
			}
		case http2.SettingHeaderTableSize:
			updateHeaderTblSize(l.hEnc, s.Val)
		}
	}
	return nil
}
746 | |||
747 | // processData removes the first stream from active streams, writes out at most 16KB | ||
748 | // of its data and then puts it at the end of activeStreams if there's still more data | ||
749 | // to be sent and stream has some stream-level flow control. | ||
750 | func (l *loopyWriter) processData() (bool, error) { | ||
751 | if l.sendQuota == 0 { | ||
752 | return true, nil | ||
753 | } | ||
754 | str := l.activeStreams.dequeue() // Remove the first stream. | ||
755 | if str == nil { | ||
756 | return true, nil | ||
757 | } | ||
758 | dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream. | ||
759 | // A data item is represented by a dataFrame, since it later translates into | ||
760 | // multiple HTTP2 data frames. | ||
761 | // Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data. | ||
762 | // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the | ||
763 | // maximum possilbe HTTP2 frame size. | ||
764 | |||
765 | if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame | ||
766 | // Client sends out empty data frame with endStream = true | ||
767 | if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil { | ||
768 | return false, err | ||
769 | } | ||
770 | str.itl.dequeue() // remove the empty data item from stream | ||
771 | if str.itl.isEmpty() { | ||
772 | str.state = empty | ||
773 | } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers. | ||
774 | if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil { | ||
775 | return false, err | ||
776 | } | ||
777 | if err := l.cleanupStreamHandler(trailer.cleanup); err != nil { | ||
778 | return false, nil | ||
779 | } | ||
780 | } else { | ||
781 | l.activeStreams.enqueue(str) | ||
782 | } | ||
783 | return false, nil | ||
784 | } | ||
785 | var ( | ||
786 | idx int | ||
787 | buf []byte | ||
788 | ) | ||
789 | if len(dataItem.h) != 0 { // data header has not been written out yet. | ||
790 | buf = dataItem.h | ||
791 | } else { | ||
792 | idx = 1 | ||
793 | buf = dataItem.d | ||
794 | } | ||
795 | size := http2MaxFrameLen | ||
796 | if len(buf) < size { | ||
797 | size = len(buf) | ||
798 | } | ||
799 | if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control. | ||
800 | str.state = waitingOnStreamQuota | ||
801 | return false, nil | ||
802 | } else if strQuota < size { | ||
803 | size = strQuota | ||
804 | } | ||
805 | |||
806 | if l.sendQuota < uint32(size) { // connection-level flow control. | ||
807 | size = int(l.sendQuota) | ||
808 | } | ||
809 | // Now that outgoing flow controls are checked we can replenish str's write quota | ||
810 | str.wq.replenish(size) | ||
811 | var endStream bool | ||
812 | // If this is the last data message on this stream and all of it can be written in this iteration. | ||
813 | if dataItem.endStream && size == len(buf) { | ||
814 | // buf contains either data or it contains header but data is empty. | ||
815 | if idx == 1 || len(dataItem.d) == 0 { | ||
816 | endStream = true | ||
817 | } | ||
818 | } | ||
819 | if dataItem.onEachWrite != nil { | ||
820 | dataItem.onEachWrite() | ||
821 | } | ||
822 | if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil { | ||
823 | return false, err | ||
824 | } | ||
825 | buf = buf[size:] | ||
826 | str.bytesOutStanding += size | ||
827 | l.sendQuota -= uint32(size) | ||
828 | if idx == 0 { | ||
829 | dataItem.h = buf | ||
830 | } else { | ||
831 | dataItem.d = buf | ||
832 | } | ||
833 | |||
834 | if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out. | ||
835 | str.itl.dequeue() | ||
836 | } | ||
837 | if str.itl.isEmpty() { | ||
838 | str.state = empty | ||
839 | } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers. | ||
840 | if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil { | ||
841 | return false, err | ||
842 | } | ||
843 | if err := l.cleanupStreamHandler(trailer.cleanup); err != nil { | ||
844 | return false, err | ||
845 | } | ||
846 | } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota. | ||
847 | str.state = waitingOnStreamQuota | ||
848 | } else { // Otherwise add it back to the list of active streams. | ||
849 | l.activeStreams.enqueue(str) | ||
850 | } | ||
851 | return false, nil | ||
852 | } | ||