]>
Commit | Line | Data |
---|---|---|
107c1cdb ND |
1 | /* |
2 | * | |
3 | * Copyright 2014 gRPC authors. | |
4 | * | |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | * | |
17 | */ | |
18 | ||
19 | package transport | |
20 | ||
21 | import ( | |
22 | "bytes" | |
23 | "fmt" | |
24 | "runtime" | |
25 | "sync" | |
26 | ||
27 | "golang.org/x/net/http2" | |
28 | "golang.org/x/net/http2/hpack" | |
29 | ) | |
30 | ||
31 | var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) { | |
32 | e.SetMaxDynamicTableSizeLimit(v) | |
33 | } | |
34 | ||
35 | type itemNode struct { | |
36 | it interface{} | |
37 | next *itemNode | |
38 | } | |
39 | ||
40 | type itemList struct { | |
41 | head *itemNode | |
42 | tail *itemNode | |
43 | } | |
44 | ||
45 | func (il *itemList) enqueue(i interface{}) { | |
46 | n := &itemNode{it: i} | |
47 | if il.tail == nil { | |
48 | il.head, il.tail = n, n | |
49 | return | |
50 | } | |
51 | il.tail.next = n | |
52 | il.tail = n | |
53 | } | |
54 | ||
55 | // peek returns the first item in the list without removing it from the | |
56 | // list. | |
57 | func (il *itemList) peek() interface{} { | |
58 | return il.head.it | |
59 | } | |
60 | ||
61 | func (il *itemList) dequeue() interface{} { | |
62 | if il.head == nil { | |
63 | return nil | |
64 | } | |
65 | i := il.head.it | |
66 | il.head = il.head.next | |
67 | if il.head == nil { | |
68 | il.tail = nil | |
69 | } | |
70 | return i | |
71 | } | |
72 | ||
73 | func (il *itemList) dequeueAll() *itemNode { | |
74 | h := il.head | |
75 | il.head, il.tail = nil, nil | |
76 | return h | |
77 | } | |
78 | ||
79 | func (il *itemList) isEmpty() bool { | |
80 | return il.head == nil | |
81 | } | |
82 | ||
83 | // The following defines various control items which could flow through | |
84 | // the control buffer of transport. They represent different aspects of | |
85 | // control tasks, e.g., flow control, settings, streaming resetting, etc. | |
86 | ||
87 | // registerStream is used to register an incoming stream with loopy writer. | |
88 | type registerStream struct { | |
89 | streamID uint32 | |
90 | wq *writeQuota | |
91 | } | |
92 | ||
93 | // headerFrame is also used to register stream on the client-side. | |
94 | type headerFrame struct { | |
95 | streamID uint32 | |
96 | hf []hpack.HeaderField | |
97 | endStream bool // Valid on server side. | |
98 | initStream func(uint32) (bool, error) // Used only on the client side. | |
99 | onWrite func() | |
100 | wq *writeQuota // write quota for the stream created. | |
101 | cleanup *cleanupStream // Valid on the server side. | |
102 | onOrphaned func(error) // Valid on client-side | |
103 | } | |
104 | ||
105 | type cleanupStream struct { | |
106 | streamID uint32 | |
107 | rst bool | |
108 | rstCode http2.ErrCode | |
109 | onWrite func() | |
110 | } | |
111 | ||
112 | type dataFrame struct { | |
113 | streamID uint32 | |
114 | endStream bool | |
115 | h []byte | |
116 | d []byte | |
117 | // onEachWrite is called every time | |
118 | // a part of d is written out. | |
119 | onEachWrite func() | |
120 | } | |
121 | ||
122 | type incomingWindowUpdate struct { | |
123 | streamID uint32 | |
124 | increment uint32 | |
125 | } | |
126 | ||
127 | type outgoingWindowUpdate struct { | |
128 | streamID uint32 | |
129 | increment uint32 | |
130 | } | |
131 | ||
132 | type incomingSettings struct { | |
133 | ss []http2.Setting | |
134 | } | |
135 | ||
136 | type outgoingSettings struct { | |
137 | ss []http2.Setting | |
138 | } | |
139 | ||
140 | type incomingGoAway struct { | |
141 | } | |
142 | ||
143 | type goAway struct { | |
144 | code http2.ErrCode | |
145 | debugData []byte | |
146 | headsUp bool | |
147 | closeConn bool | |
148 | } | |
149 | ||
150 | type ping struct { | |
151 | ack bool | |
152 | data [8]byte | |
153 | } | |
154 | ||
155 | type outFlowControlSizeRequest struct { | |
156 | resp chan uint32 | |
157 | } | |
158 | ||
159 | type outStreamState int | |
160 | ||
161 | const ( | |
162 | active outStreamState = iota | |
163 | empty | |
164 | waitingOnStreamQuota | |
165 | ) | |
166 | ||
167 | type outStream struct { | |
168 | id uint32 | |
169 | state outStreamState | |
170 | itl *itemList | |
171 | bytesOutStanding int | |
172 | wq *writeQuota | |
173 | ||
174 | next *outStream | |
175 | prev *outStream | |
176 | } | |
177 | ||
178 | func (s *outStream) deleteSelf() { | |
179 | if s.prev != nil { | |
180 | s.prev.next = s.next | |
181 | } | |
182 | if s.next != nil { | |
183 | s.next.prev = s.prev | |
184 | } | |
185 | s.next, s.prev = nil, nil | |
186 | } | |
187 | ||
188 | type outStreamList struct { | |
189 | // Following are sentinel objects that mark the | |
190 | // beginning and end of the list. They do not | |
191 | // contain any item lists. All valid objects are | |
192 | // inserted in between them. | |
193 | // This is needed so that an outStream object can | |
194 | // deleteSelf() in O(1) time without knowing which | |
195 | // list it belongs to. | |
196 | head *outStream | |
197 | tail *outStream | |
198 | } | |
199 | ||
200 | func newOutStreamList() *outStreamList { | |
201 | head, tail := new(outStream), new(outStream) | |
202 | head.next = tail | |
203 | tail.prev = head | |
204 | return &outStreamList{ | |
205 | head: head, | |
206 | tail: tail, | |
207 | } | |
208 | } | |
209 | ||
210 | func (l *outStreamList) enqueue(s *outStream) { | |
211 | e := l.tail.prev | |
212 | e.next = s | |
213 | s.prev = e | |
214 | s.next = l.tail | |
215 | l.tail.prev = s | |
216 | } | |
217 | ||
218 | // remove from the beginning of the list. | |
219 | func (l *outStreamList) dequeue() *outStream { | |
220 | b := l.head.next | |
221 | if b == l.tail { | |
222 | return nil | |
223 | } | |
224 | b.deleteSelf() | |
225 | return b | |
226 | } | |
227 | ||
228 | // controlBuffer is a way to pass information to loopy. | |
229 | // Information is passed as specific struct types called control frames. | |
230 | // A control frame not only represents data, messages or headers to be sent out | |
231 | // but can also be used to instruct loopy to update its internal state. | |
232 | // It shouldn't be confused with an HTTP2 frame, although some of the control frames | |
233 | // like dataFrame and headerFrame do go out on wire as HTTP2 frames. | |
234 | type controlBuffer struct { | |
235 | ch chan struct{} | |
236 | done <-chan struct{} | |
237 | mu sync.Mutex | |
238 | consumerWaiting bool | |
239 | list *itemList | |
240 | err error | |
241 | } | |
242 | ||
243 | func newControlBuffer(done <-chan struct{}) *controlBuffer { | |
244 | return &controlBuffer{ | |
245 | ch: make(chan struct{}, 1), | |
246 | list: &itemList{}, | |
247 | done: done, | |
248 | } | |
249 | } | |
250 | ||
251 | func (c *controlBuffer) put(it interface{}) error { | |
252 | _, err := c.executeAndPut(nil, it) | |
253 | return err | |
254 | } | |
255 | ||
256 | func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) { | |
257 | var wakeUp bool | |
258 | c.mu.Lock() | |
259 | if c.err != nil { | |
260 | c.mu.Unlock() | |
261 | return false, c.err | |
262 | } | |
263 | if f != nil { | |
264 | if !f(it) { // f wasn't successful | |
265 | c.mu.Unlock() | |
266 | return false, nil | |
267 | } | |
268 | } | |
269 | if c.consumerWaiting { | |
270 | wakeUp = true | |
271 | c.consumerWaiting = false | |
272 | } | |
273 | c.list.enqueue(it) | |
274 | c.mu.Unlock() | |
275 | if wakeUp { | |
276 | select { | |
277 | case c.ch <- struct{}{}: | |
278 | default: | |
279 | } | |
280 | } | |
281 | return true, nil | |
282 | } | |
283 | ||
284 | // Note argument f should never be nil. | |
285 | func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) { | |
286 | c.mu.Lock() | |
287 | if c.err != nil { | |
288 | c.mu.Unlock() | |
289 | return false, c.err | |
290 | } | |
291 | if !f(it) { // f wasn't successful | |
292 | c.mu.Unlock() | |
293 | return false, nil | |
294 | } | |
295 | c.mu.Unlock() | |
296 | return true, nil | |
297 | } | |
298 | ||
299 | func (c *controlBuffer) get(block bool) (interface{}, error) { | |
300 | for { | |
301 | c.mu.Lock() | |
302 | if c.err != nil { | |
303 | c.mu.Unlock() | |
304 | return nil, c.err | |
305 | } | |
306 | if !c.list.isEmpty() { | |
307 | h := c.list.dequeue() | |
308 | c.mu.Unlock() | |
309 | return h, nil | |
310 | } | |
311 | if !block { | |
312 | c.mu.Unlock() | |
313 | return nil, nil | |
314 | } | |
315 | c.consumerWaiting = true | |
316 | c.mu.Unlock() | |
317 | select { | |
318 | case <-c.ch: | |
319 | case <-c.done: | |
320 | c.finish() | |
321 | return nil, ErrConnClosing | |
322 | } | |
323 | } | |
324 | } | |
325 | ||
326 | func (c *controlBuffer) finish() { | |
327 | c.mu.Lock() | |
328 | if c.err != nil { | |
329 | c.mu.Unlock() | |
330 | return | |
331 | } | |
332 | c.err = ErrConnClosing | |
333 | // There may be headers for streams in the control buffer. | |
334 | // These streams need to be cleaned out since the transport | |
335 | // is still not aware of these yet. | |
336 | for head := c.list.dequeueAll(); head != nil; head = head.next { | |
337 | hdr, ok := head.it.(*headerFrame) | |
338 | if !ok { | |
339 | continue | |
340 | } | |
341 | if hdr.onOrphaned != nil { // It will be nil on the server-side. | |
342 | hdr.onOrphaned(ErrConnClosing) | |
343 | } | |
344 | } | |
345 | c.mu.Unlock() | |
346 | } | |
347 | ||
348 | type side int | |
349 | ||
350 | const ( | |
351 | clientSide side = iota | |
352 | serverSide | |
353 | ) | |
354 | ||
355 | // Loopy receives frames from the control buffer. | |
356 | // Each frame is handled individually; most of the work done by loopy goes | |
357 | // into handling data frames. Loopy maintains a queue of active streams, and each | |
358 | // stream maintains a queue of data frames; as loopy receives data frames | |
359 | // it gets added to the queue of the relevant stream. | |
360 | // Loopy goes over this list of active streams by processing one node every iteration, | |
361 | // thereby closely resemebling to a round-robin scheduling over all streams. While | |
362 | // processing a stream, loopy writes out data bytes from this stream capped by the min | |
363 | // of http2MaxFrameLen, connection-level flow control and stream-level flow control. | |
364 | type loopyWriter struct { | |
365 | side side | |
366 | cbuf *controlBuffer | |
367 | sendQuota uint32 | |
368 | oiws uint32 // outbound initial window size. | |
369 | // estdStreams is map of all established streams that are not cleaned-up yet. | |
370 | // On client-side, this is all streams whose headers were sent out. | |
371 | // On server-side, this is all streams whose headers were received. | |
372 | estdStreams map[uint32]*outStream // Established streams. | |
373 | // activeStreams is a linked-list of all streams that have data to send and some | |
374 | // stream-level flow control quota. | |
375 | // Each of these streams internally have a list of data items(and perhaps trailers | |
376 | // on the server-side) to be sent out. | |
377 | activeStreams *outStreamList | |
378 | framer *framer | |
379 | hBuf *bytes.Buffer // The buffer for HPACK encoding. | |
380 | hEnc *hpack.Encoder // HPACK encoder. | |
381 | bdpEst *bdpEstimator | |
382 | draining bool | |
383 | ||
384 | // Side-specific handlers | |
385 | ssGoAwayHandler func(*goAway) (bool, error) | |
386 | } | |
387 | ||
388 | func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter { | |
389 | var buf bytes.Buffer | |
390 | l := &loopyWriter{ | |
391 | side: s, | |
392 | cbuf: cbuf, | |
393 | sendQuota: defaultWindowSize, | |
394 | oiws: defaultWindowSize, | |
395 | estdStreams: make(map[uint32]*outStream), | |
396 | activeStreams: newOutStreamList(), | |
397 | framer: fr, | |
398 | hBuf: &buf, | |
399 | hEnc: hpack.NewEncoder(&buf), | |
400 | bdpEst: bdpEst, | |
401 | } | |
402 | return l | |
403 | } | |
404 | ||
405 | const minBatchSize = 1000 | |
406 | ||
407 | // run should be run in a separate goroutine. | |
408 | // It reads control frames from controlBuf and processes them by: | |
409 | // 1. Updating loopy's internal state, or/and | |
410 | // 2. Writing out HTTP2 frames on the wire. | |
411 | // | |
412 | // Loopy keeps all active streams with data to send in a linked-list. | |
413 | // All streams in the activeStreams linked-list must have both: | |
414 | // 1. Data to send, and | |
415 | // 2. Stream level flow control quota available. | |
416 | // | |
417 | // In each iteration of run loop, other than processing the incoming control | |
418 | // frame, loopy calls processData, which processes one node from the activeStreams linked-list. | |
419 | // This results in writing of HTTP2 frames into an underlying write buffer. | |
420 | // When there's no more control frames to read from controlBuf, loopy flushes the write buffer. | |
421 | // As an optimization, to increase the batch size for each flush, loopy yields the processor, once | |
422 | // if the batch size is too low to give stream goroutines a chance to fill it up. | |
423 | func (l *loopyWriter) run() (err error) { | |
424 | defer func() { | |
425 | if err == ErrConnClosing { | |
426 | // Don't log ErrConnClosing as error since it happens | |
427 | // 1. When the connection is closed by some other known issue. | |
428 | // 2. User closed the connection. | |
429 | // 3. A graceful close of connection. | |
430 | infof("transport: loopyWriter.run returning. %v", err) | |
431 | err = nil | |
432 | } | |
433 | }() | |
434 | for { | |
435 | it, err := l.cbuf.get(true) | |
436 | if err != nil { | |
437 | return err | |
438 | } | |
439 | if err = l.handle(it); err != nil { | |
440 | return err | |
441 | } | |
442 | if _, err = l.processData(); err != nil { | |
443 | return err | |
444 | } | |
445 | gosched := true | |
446 | hasdata: | |
447 | for { | |
448 | it, err := l.cbuf.get(false) | |
449 | if err != nil { | |
450 | return err | |
451 | } | |
452 | if it != nil { | |
453 | if err = l.handle(it); err != nil { | |
454 | return err | |
455 | } | |
456 | if _, err = l.processData(); err != nil { | |
457 | return err | |
458 | } | |
459 | continue hasdata | |
460 | } | |
461 | isEmpty, err := l.processData() | |
462 | if err != nil { | |
463 | return err | |
464 | } | |
465 | if !isEmpty { | |
466 | continue hasdata | |
467 | } | |
468 | if gosched { | |
469 | gosched = false | |
470 | if l.framer.writer.offset < minBatchSize { | |
471 | runtime.Gosched() | |
472 | continue hasdata | |
473 | } | |
474 | } | |
475 | l.framer.writer.Flush() | |
476 | break hasdata | |
477 | ||
478 | } | |
479 | } | |
480 | } | |
481 | ||
482 | func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error { | |
483 | return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment) | |
484 | } | |
485 | ||
486 | func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error { | |
487 | // Otherwise update the quota. | |
488 | if w.streamID == 0 { | |
489 | l.sendQuota += w.increment | |
490 | return nil | |
491 | } | |
492 | // Find the stream and update it. | |
493 | if str, ok := l.estdStreams[w.streamID]; ok { | |
494 | str.bytesOutStanding -= int(w.increment) | |
495 | if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota { | |
496 | str.state = active | |
497 | l.activeStreams.enqueue(str) | |
498 | return nil | |
499 | } | |
500 | } | |
501 | return nil | |
502 | } | |
503 | ||
504 | func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error { | |
505 | return l.framer.fr.WriteSettings(s.ss...) | |
506 | } | |
507 | ||
508 | func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error { | |
509 | if err := l.applySettings(s.ss); err != nil { | |
510 | return err | |
511 | } | |
512 | return l.framer.fr.WriteSettingsAck() | |
513 | } | |
514 | ||
515 | func (l *loopyWriter) registerStreamHandler(h *registerStream) error { | |
516 | str := &outStream{ | |
517 | id: h.streamID, | |
518 | state: empty, | |
519 | itl: &itemList{}, | |
520 | wq: h.wq, | |
521 | } | |
522 | l.estdStreams[h.streamID] = str | |
523 | return nil | |
524 | } | |
525 | ||
526 | func (l *loopyWriter) headerHandler(h *headerFrame) error { | |
527 | if l.side == serverSide { | |
528 | str, ok := l.estdStreams[h.streamID] | |
529 | if !ok { | |
530 | warningf("transport: loopy doesn't recognize the stream: %d", h.streamID) | |
531 | return nil | |
532 | } | |
533 | // Case 1.A: Server is responding back with headers. | |
534 | if !h.endStream { | |
535 | return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite) | |
536 | } | |
537 | // else: Case 1.B: Server wants to close stream. | |
538 | ||
539 | if str.state != empty { // either active or waiting on stream quota. | |
540 | // add it str's list of items. | |
541 | str.itl.enqueue(h) | |
542 | return nil | |
543 | } | |
544 | if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil { | |
545 | return err | |
546 | } | |
547 | return l.cleanupStreamHandler(h.cleanup) | |
548 | } | |
549 | // Case 2: Client wants to originate stream. | |
550 | str := &outStream{ | |
551 | id: h.streamID, | |
552 | state: empty, | |
553 | itl: &itemList{}, | |
554 | wq: h.wq, | |
555 | } | |
556 | str.itl.enqueue(h) | |
557 | return l.originateStream(str) | |
558 | } | |
559 | ||
560 | func (l *loopyWriter) originateStream(str *outStream) error { | |
561 | hdr := str.itl.dequeue().(*headerFrame) | |
562 | sendPing, err := hdr.initStream(str.id) | |
563 | if err != nil { | |
564 | if err == ErrConnClosing { | |
565 | return err | |
566 | } | |
567 | // Other errors(errStreamDrain) need not close transport. | |
568 | return nil | |
569 | } | |
570 | if err = l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil { | |
571 | return err | |
572 | } | |
573 | l.estdStreams[str.id] = str | |
574 | if sendPing { | |
575 | return l.pingHandler(&ping{data: [8]byte{}}) | |
576 | } | |
577 | return nil | |
578 | } | |
579 | ||
580 | func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error { | |
581 | if onWrite != nil { | |
582 | onWrite() | |
583 | } | |
584 | l.hBuf.Reset() | |
585 | for _, f := range hf { | |
586 | if err := l.hEnc.WriteField(f); err != nil { | |
587 | warningf("transport: loopyWriter.writeHeader encountered error while encoding headers:", err) | |
588 | } | |
589 | } | |
590 | var ( | |
591 | err error | |
592 | endHeaders, first bool | |
593 | ) | |
594 | first = true | |
595 | for !endHeaders { | |
596 | size := l.hBuf.Len() | |
597 | if size > http2MaxFrameLen { | |
598 | size = http2MaxFrameLen | |
599 | } else { | |
600 | endHeaders = true | |
601 | } | |
602 | if first { | |
603 | first = false | |
604 | err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{ | |
605 | StreamID: streamID, | |
606 | BlockFragment: l.hBuf.Next(size), | |
607 | EndStream: endStream, | |
608 | EndHeaders: endHeaders, | |
609 | }) | |
610 | } else { | |
611 | err = l.framer.fr.WriteContinuation( | |
612 | streamID, | |
613 | endHeaders, | |
614 | l.hBuf.Next(size), | |
615 | ) | |
616 | } | |
617 | if err != nil { | |
618 | return err | |
619 | } | |
620 | } | |
621 | return nil | |
622 | } | |
623 | ||
624 | func (l *loopyWriter) preprocessData(df *dataFrame) error { | |
625 | str, ok := l.estdStreams[df.streamID] | |
626 | if !ok { | |
627 | return nil | |
628 | } | |
629 | // If we got data for a stream it means that | |
630 | // stream was originated and the headers were sent out. | |
631 | str.itl.enqueue(df) | |
632 | if str.state == empty { | |
633 | str.state = active | |
634 | l.activeStreams.enqueue(str) | |
635 | } | |
636 | return nil | |
637 | } | |
638 | ||
639 | func (l *loopyWriter) pingHandler(p *ping) error { | |
640 | if !p.ack { | |
641 | l.bdpEst.timesnap(p.data) | |
642 | } | |
643 | return l.framer.fr.WritePing(p.ack, p.data) | |
644 | ||
645 | } | |
646 | ||
647 | func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error { | |
648 | o.resp <- l.sendQuota | |
649 | return nil | |
650 | } | |
651 | ||
652 | func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error { | |
653 | c.onWrite() | |
654 | if str, ok := l.estdStreams[c.streamID]; ok { | |
655 | // On the server side it could be a trailers-only response or | |
656 | // a RST_STREAM before stream initialization thus the stream might | |
657 | // not be established yet. | |
658 | delete(l.estdStreams, c.streamID) | |
659 | str.deleteSelf() | |
660 | } | |
661 | if c.rst { // If RST_STREAM needs to be sent. | |
662 | if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil { | |
663 | return err | |
664 | } | |
665 | } | |
666 | if l.side == clientSide && l.draining && len(l.estdStreams) == 0 { | |
667 | return ErrConnClosing | |
668 | } | |
669 | return nil | |
670 | } | |
671 | ||
672 | func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error { | |
673 | if l.side == clientSide { | |
674 | l.draining = true | |
675 | if len(l.estdStreams) == 0 { | |
676 | return ErrConnClosing | |
677 | } | |
678 | } | |
679 | return nil | |
680 | } | |
681 | ||
682 | func (l *loopyWriter) goAwayHandler(g *goAway) error { | |
683 | // Handling of outgoing GoAway is very specific to side. | |
684 | if l.ssGoAwayHandler != nil { | |
685 | draining, err := l.ssGoAwayHandler(g) | |
686 | if err != nil { | |
687 | return err | |
688 | } | |
689 | l.draining = draining | |
690 | } | |
691 | return nil | |
692 | } | |
693 | ||
694 | func (l *loopyWriter) handle(i interface{}) error { | |
695 | switch i := i.(type) { | |
696 | case *incomingWindowUpdate: | |
697 | return l.incomingWindowUpdateHandler(i) | |
698 | case *outgoingWindowUpdate: | |
699 | return l.outgoingWindowUpdateHandler(i) | |
700 | case *incomingSettings: | |
701 | return l.incomingSettingsHandler(i) | |
702 | case *outgoingSettings: | |
703 | return l.outgoingSettingsHandler(i) | |
704 | case *headerFrame: | |
705 | return l.headerHandler(i) | |
706 | case *registerStream: | |
707 | return l.registerStreamHandler(i) | |
708 | case *cleanupStream: | |
709 | return l.cleanupStreamHandler(i) | |
710 | case *incomingGoAway: | |
711 | return l.incomingGoAwayHandler(i) | |
712 | case *dataFrame: | |
713 | return l.preprocessData(i) | |
714 | case *ping: | |
715 | return l.pingHandler(i) | |
716 | case *goAway: | |
717 | return l.goAwayHandler(i) | |
718 | case *outFlowControlSizeRequest: | |
719 | return l.outFlowControlSizeRequestHandler(i) | |
720 | default: | |
721 | return fmt.Errorf("transport: unknown control message type %T", i) | |
722 | } | |
723 | } | |
724 | ||
725 | func (l *loopyWriter) applySettings(ss []http2.Setting) error { | |
726 | for _, s := range ss { | |
727 | switch s.ID { | |
728 | case http2.SettingInitialWindowSize: | |
729 | o := l.oiws | |
730 | l.oiws = s.Val | |
731 | if o < l.oiws { | |
732 | // If the new limit is greater make all depleted streams active. | |
733 | for _, stream := range l.estdStreams { | |
734 | if stream.state == waitingOnStreamQuota { | |
735 | stream.state = active | |
736 | l.activeStreams.enqueue(stream) | |
737 | } | |
738 | } | |
739 | } | |
740 | case http2.SettingHeaderTableSize: | |
741 | updateHeaderTblSize(l.hEnc, s.Val) | |
742 | } | |
743 | } | |
744 | return nil | |
745 | } | |
746 | ||
747 | // processData removes the first stream from active streams, writes out at most 16KB | |
748 | // of its data and then puts it at the end of activeStreams if there's still more data | |
749 | // to be sent and stream has some stream-level flow control. | |
750 | func (l *loopyWriter) processData() (bool, error) { | |
751 | if l.sendQuota == 0 { | |
752 | return true, nil | |
753 | } | |
754 | str := l.activeStreams.dequeue() // Remove the first stream. | |
755 | if str == nil { | |
756 | return true, nil | |
757 | } | |
758 | dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream. | |
759 | // A data item is represented by a dataFrame, since it later translates into | |
760 | // multiple HTTP2 data frames. | |
761 | // Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data. | |
762 | // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the | |
763 | // maximum possilbe HTTP2 frame size. | |
764 | ||
765 | if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame | |
766 | // Client sends out empty data frame with endStream = true | |
767 | if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil { | |
768 | return false, err | |
769 | } | |
770 | str.itl.dequeue() // remove the empty data item from stream | |
771 | if str.itl.isEmpty() { | |
772 | str.state = empty | |
773 | } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers. | |
774 | if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil { | |
775 | return false, err | |
776 | } | |
777 | if err := l.cleanupStreamHandler(trailer.cleanup); err != nil { | |
778 | return false, nil | |
779 | } | |
780 | } else { | |
781 | l.activeStreams.enqueue(str) | |
782 | } | |
783 | return false, nil | |
784 | } | |
785 | var ( | |
786 | idx int | |
787 | buf []byte | |
788 | ) | |
789 | if len(dataItem.h) != 0 { // data header has not been written out yet. | |
790 | buf = dataItem.h | |
791 | } else { | |
792 | idx = 1 | |
793 | buf = dataItem.d | |
794 | } | |
795 | size := http2MaxFrameLen | |
796 | if len(buf) < size { | |
797 | size = len(buf) | |
798 | } | |
799 | if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control. | |
800 | str.state = waitingOnStreamQuota | |
801 | return false, nil | |
802 | } else if strQuota < size { | |
803 | size = strQuota | |
804 | } | |
805 | ||
806 | if l.sendQuota < uint32(size) { // connection-level flow control. | |
807 | size = int(l.sendQuota) | |
808 | } | |
809 | // Now that outgoing flow controls are checked we can replenish str's write quota | |
810 | str.wq.replenish(size) | |
811 | var endStream bool | |
812 | // If this is the last data message on this stream and all of it can be written in this iteration. | |
813 | if dataItem.endStream && size == len(buf) { | |
814 | // buf contains either data or it contains header but data is empty. | |
815 | if idx == 1 || len(dataItem.d) == 0 { | |
816 | endStream = true | |
817 | } | |
818 | } | |
819 | if dataItem.onEachWrite != nil { | |
820 | dataItem.onEachWrite() | |
821 | } | |
822 | if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil { | |
823 | return false, err | |
824 | } | |
825 | buf = buf[size:] | |
826 | str.bytesOutStanding += size | |
827 | l.sendQuota -= uint32(size) | |
828 | if idx == 0 { | |
829 | dataItem.h = buf | |
830 | } else { | |
831 | dataItem.d = buf | |
832 | } | |
833 | ||
834 | if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out. | |
835 | str.itl.dequeue() | |
836 | } | |
837 | if str.itl.isEmpty() { | |
838 | str.state = empty | |
839 | } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers. | |
840 | if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil { | |
841 | return false, err | |
842 | } | |
843 | if err := l.cleanupStreamHandler(trailer.cleanup); err != nil { | |
844 | return false, err | |
845 | } | |
846 | } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota. | |
847 | str.state = waitingOnStreamQuota | |
848 | } else { // Otherwise add it back to the list of active streams. | |
849 | l.activeStreams.enqueue(str) | |
850 | } | |
851 | return false, nil | |
852 | } |