]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | /* |
2 | * | |
3 | * Copyright 2014 gRPC authors. | |
4 | * | |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | * | |
17 | */ | |
18 | ||
19 | package transport | |
20 | ||
21 | import ( | |
22 | "bytes" | |
23 | "errors" | |
24 | "io" | |
25 | "math" | |
26 | "math/rand" | |
27 | "net" | |
28 | "strconv" | |
29 | "sync" | |
30 | "sync/atomic" | |
31 | "time" | |
32 | ||
33 | "github.com/golang/protobuf/proto" | |
34 | "golang.org/x/net/context" | |
35 | "golang.org/x/net/http2" | |
36 | "golang.org/x/net/http2/hpack" | |
37 | "google.golang.org/grpc/codes" | |
38 | "google.golang.org/grpc/credentials" | |
39 | "google.golang.org/grpc/keepalive" | |
40 | "google.golang.org/grpc/metadata" | |
41 | "google.golang.org/grpc/peer" | |
42 | "google.golang.org/grpc/stats" | |
43 | "google.golang.org/grpc/status" | |
44 | "google.golang.org/grpc/tap" | |
45 | ) | |
46 | ||
47 | // ErrIllegalHeaderWrite indicates that setting header is illegal because of | |
48 | // the stream's state. | |
49 | var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called") | |
50 | ||
51 | // http2Server implements the ServerTransport interface with HTTP2. | |
52 | type http2Server struct { | |
53 | ctx context.Context | |
54 | conn net.Conn | |
55 | remoteAddr net.Addr | |
56 | localAddr net.Addr | |
57 | maxStreamID uint32 // max stream ID ever seen | |
58 | authInfo credentials.AuthInfo // auth info about the connection | |
59 | inTapHandle tap.ServerInHandle | |
60 | // writableChan synchronizes write access to the transport. | |
61 | // A writer acquires the write lock by receiving a value on writableChan | |
62 | // and releases it by sending on writableChan. | |
63 | writableChan chan int | |
64 | // shutdownChan is closed when Close is called. | |
65 | // Blocking operations should select on shutdownChan to avoid | |
66 | // blocking forever after Close. | |
67 | shutdownChan chan struct{} | |
68 | framer *framer | |
69 | hBuf *bytes.Buffer // the buffer for HPACK encoding | |
70 | hEnc *hpack.Encoder // HPACK encoder | |
71 | // The max number of concurrent streams. | |
72 | maxStreams uint32 | |
73 | // controlBuf delivers all the control related tasks (e.g., window | |
74 | // updates, reset streams, and various settings) to the controller. | |
75 | controlBuf *controlBuffer | |
76 | fc *inFlow | |
77 | // sendQuotaPool provides flow control to outbound message. | |
78 | sendQuotaPool *quotaPool | |
79 | stats stats.Handler | |
80 | // Flag to keep track of reading activity on transport. | |
81 | // 1 is true and 0 is false. | |
82 | activity uint32 // Accessed atomically. | |
83 | // Keepalive and max-age parameters for the server. | |
84 | kp keepalive.ServerParameters | |
85 | ||
86 | // Keepalive enforcement policy. | |
87 | kep keepalive.EnforcementPolicy | |
88 | // The time instance last ping was received. | |
89 | lastPingAt time.Time | |
90 | // Number of times the client has violated keepalive ping policy so far. | |
91 | pingStrikes uint8 | |
92 | // Flag to signify that number of ping strikes should be reset to 0. | |
93 | // This is set whenever data or header frames are sent. | |
94 | // 1 means yes. | |
95 | resetPingStrikes uint32 // Accessed atomically. | |
96 | initialWindowSize int32 | |
97 | bdpEst *bdpEstimator | |
98 | ||
99 | outQuotaVersion uint32 | |
100 | ||
101 | mu sync.Mutex // guard the following | |
102 | ||
103 | // drainChan is initialized when drain(...) is called the first time. | |
104 | // After which the server writes out the first GoAway(with ID 2^31-1) frame. | |
105 | // Then an independent goroutine will be launched to later send the second GoAway. | |
106 | // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame. | |
107 | // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is | |
108 | // already underway. | |
109 | drainChan chan struct{} | |
110 | state transportState | |
111 | activeStreams map[uint32]*Stream | |
112 | // the per-stream outbound flow control window size set by the peer. | |
113 | streamSendQuota uint32 | |
114 | // idle is the time instant when the connection went idle. | |
115 | // This is either the begining of the connection or when the number of | |
116 | // RPCs go down to 0. | |
117 | // When the connection is busy, this value is set to 0. | |
118 | idle time.Time | |
119 | } | |
120 | ||
121 | // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is | |
122 | // returned if something goes wrong. | |
123 | func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) { | |
124 | framer := newFramer(conn) | |
125 | // Send initial settings as connection preface to client. | |
126 | var isettings []http2.Setting | |
127 | // TODO(zhaoq): Have a better way to signal "no limit" because 0 is | |
128 | // permitted in the HTTP2 spec. | |
129 | maxStreams := config.MaxStreams | |
130 | if maxStreams == 0 { | |
131 | maxStreams = math.MaxUint32 | |
132 | } else { | |
133 | isettings = append(isettings, http2.Setting{ | |
134 | ID: http2.SettingMaxConcurrentStreams, | |
135 | Val: maxStreams, | |
136 | }) | |
137 | } | |
138 | dynamicWindow := true | |
139 | iwz := int32(initialWindowSize) | |
140 | if config.InitialWindowSize >= defaultWindowSize { | |
141 | iwz = config.InitialWindowSize | |
142 | dynamicWindow = false | |
143 | } | |
144 | icwz := int32(initialWindowSize) | |
145 | if config.InitialConnWindowSize >= defaultWindowSize { | |
146 | icwz = config.InitialConnWindowSize | |
147 | dynamicWindow = false | |
148 | } | |
149 | if iwz != defaultWindowSize { | |
150 | isettings = append(isettings, http2.Setting{ | |
151 | ID: http2.SettingInitialWindowSize, | |
152 | Val: uint32(iwz)}) | |
153 | } | |
154 | if err := framer.writeSettings(true, isettings...); err != nil { | |
155 | return nil, connectionErrorf(true, err, "transport: %v", err) | |
156 | } | |
157 | // Adjust the connection flow control window if needed. | |
158 | if delta := uint32(icwz - defaultWindowSize); delta > 0 { | |
159 | if err := framer.writeWindowUpdate(true, 0, delta); err != nil { | |
160 | return nil, connectionErrorf(true, err, "transport: %v", err) | |
161 | } | |
162 | } | |
163 | kp := config.KeepaliveParams | |
164 | if kp.MaxConnectionIdle == 0 { | |
165 | kp.MaxConnectionIdle = defaultMaxConnectionIdle | |
166 | } | |
167 | if kp.MaxConnectionAge == 0 { | |
168 | kp.MaxConnectionAge = defaultMaxConnectionAge | |
169 | } | |
170 | // Add a jitter to MaxConnectionAge. | |
171 | kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge) | |
172 | if kp.MaxConnectionAgeGrace == 0 { | |
173 | kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace | |
174 | } | |
175 | if kp.Time == 0 { | |
176 | kp.Time = defaultServerKeepaliveTime | |
177 | } | |
178 | if kp.Timeout == 0 { | |
179 | kp.Timeout = defaultServerKeepaliveTimeout | |
180 | } | |
181 | kep := config.KeepalivePolicy | |
182 | if kep.MinTime == 0 { | |
183 | kep.MinTime = defaultKeepalivePolicyMinTime | |
184 | } | |
185 | var buf bytes.Buffer | |
186 | t := &http2Server{ | |
187 | ctx: context.Background(), | |
188 | conn: conn, | |
189 | remoteAddr: conn.RemoteAddr(), | |
190 | localAddr: conn.LocalAddr(), | |
191 | authInfo: config.AuthInfo, | |
192 | framer: framer, | |
193 | hBuf: &buf, | |
194 | hEnc: hpack.NewEncoder(&buf), | |
195 | maxStreams: maxStreams, | |
196 | inTapHandle: config.InTapHandle, | |
197 | controlBuf: newControlBuffer(), | |
198 | fc: &inFlow{limit: uint32(icwz)}, | |
199 | sendQuotaPool: newQuotaPool(defaultWindowSize), | |
200 | state: reachable, | |
201 | writableChan: make(chan int, 1), | |
202 | shutdownChan: make(chan struct{}), | |
203 | activeStreams: make(map[uint32]*Stream), | |
204 | streamSendQuota: defaultWindowSize, | |
205 | stats: config.StatsHandler, | |
206 | kp: kp, | |
207 | idle: time.Now(), | |
208 | kep: kep, | |
209 | initialWindowSize: iwz, | |
210 | } | |
211 | if dynamicWindow { | |
212 | t.bdpEst = &bdpEstimator{ | |
213 | bdp: initialWindowSize, | |
214 | updateFlowControl: t.updateFlowControl, | |
215 | } | |
216 | } | |
217 | if t.stats != nil { | |
218 | t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{ | |
219 | RemoteAddr: t.remoteAddr, | |
220 | LocalAddr: t.localAddr, | |
221 | }) | |
222 | connBegin := &stats.ConnBegin{} | |
223 | t.stats.HandleConn(t.ctx, connBegin) | |
224 | } | |
225 | go t.controller() | |
226 | go t.keepalive() | |
227 | t.writableChan <- 0 | |
228 | return t, nil | |
229 | } | |
230 | ||
231 | // operateHeader takes action on the decoded headers. | |
232 | func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) { | |
233 | buf := newRecvBuffer() | |
234 | s := &Stream{ | |
235 | id: frame.Header().StreamID, | |
236 | st: t, | |
237 | buf: buf, | |
238 | fc: &inFlow{limit: uint32(t.initialWindowSize)}, | |
239 | } | |
240 | ||
241 | var state decodeState | |
242 | for _, hf := range frame.Fields { | |
243 | if err := state.processHeaderField(hf); err != nil { | |
244 | if se, ok := err.(StreamError); ok { | |
245 | t.controlBuf.put(&resetStream{s.id, statusCodeConvTab[se.Code]}) | |
246 | } | |
247 | return | |
248 | } | |
249 | } | |
250 | ||
251 | if frame.StreamEnded() { | |
252 | // s is just created by the caller. No lock needed. | |
253 | s.state = streamReadDone | |
254 | } | |
255 | s.recvCompress = state.encoding | |
256 | if state.timeoutSet { | |
257 | s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout) | |
258 | } else { | |
259 | s.ctx, s.cancel = context.WithCancel(t.ctx) | |
260 | } | |
261 | pr := &peer.Peer{ | |
262 | Addr: t.remoteAddr, | |
263 | } | |
264 | // Attach Auth info if there is any. | |
265 | if t.authInfo != nil { | |
266 | pr.AuthInfo = t.authInfo | |
267 | } | |
268 | s.ctx = peer.NewContext(s.ctx, pr) | |
269 | // Cache the current stream to the context so that the server application | |
270 | // can find out. Required when the server wants to send some metadata | |
271 | // back to the client (unary call only). | |
272 | s.ctx = newContextWithStream(s.ctx, s) | |
273 | // Attach the received metadata to the context. | |
274 | if len(state.mdata) > 0 { | |
275 | s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata) | |
276 | } | |
277 | s.trReader = &transportReader{ | |
278 | reader: &recvBufferReader{ | |
279 | ctx: s.ctx, | |
280 | recv: s.buf, | |
281 | }, | |
282 | windowHandler: func(n int) { | |
283 | t.updateWindow(s, uint32(n)) | |
284 | }, | |
285 | } | |
286 | s.recvCompress = state.encoding | |
287 | s.method = state.method | |
288 | if t.inTapHandle != nil { | |
289 | var err error | |
290 | info := &tap.Info{ | |
291 | FullMethodName: state.method, | |
292 | } | |
293 | s.ctx, err = t.inTapHandle(s.ctx, info) | |
294 | if err != nil { | |
295 | warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err) | |
296 | t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream}) | |
297 | return | |
298 | } | |
299 | } | |
300 | t.mu.Lock() | |
301 | if t.state != reachable { | |
302 | t.mu.Unlock() | |
303 | return | |
304 | } | |
305 | if uint32(len(t.activeStreams)) >= t.maxStreams { | |
306 | t.mu.Unlock() | |
307 | t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream}) | |
308 | return | |
309 | } | |
310 | if s.id%2 != 1 || s.id <= t.maxStreamID { | |
311 | t.mu.Unlock() | |
312 | // illegal gRPC stream id. | |
313 | errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", s.id) | |
314 | return true | |
315 | } | |
316 | t.maxStreamID = s.id | |
317 | s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota)) | |
318 | t.activeStreams[s.id] = s | |
319 | if len(t.activeStreams) == 1 { | |
320 | t.idle = time.Time{} | |
321 | } | |
322 | t.mu.Unlock() | |
323 | s.requestRead = func(n int) { | |
324 | t.adjustWindow(s, uint32(n)) | |
325 | } | |
326 | s.ctx = traceCtx(s.ctx, s.method) | |
327 | if t.stats != nil { | |
328 | s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) | |
329 | inHeader := &stats.InHeader{ | |
330 | FullMethod: s.method, | |
331 | RemoteAddr: t.remoteAddr, | |
332 | LocalAddr: t.localAddr, | |
333 | Compression: s.recvCompress, | |
334 | WireLength: int(frame.Header().Length), | |
335 | } | |
336 | t.stats.HandleRPC(s.ctx, inHeader) | |
337 | } | |
338 | handle(s) | |
339 | return | |
340 | } | |
341 | ||
342 | // HandleStreams receives incoming streams using the given handler. This is | |
343 | // typically run in a separate goroutine. | |
344 | // traceCtx attaches trace to ctx and returns the new context. | |
345 | func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { | |
346 | // Check the validity of client preface. | |
347 | preface := make([]byte, len(clientPreface)) | |
348 | if _, err := io.ReadFull(t.conn, preface); err != nil { | |
349 | // Only log if it isn't a simple tcp accept check (ie: tcp balancer doing open/close socket) | |
350 | if err != io.EOF { | |
351 | errorf("transport: http2Server.HandleStreams failed to receive the preface from client: %v", err) | |
352 | } | |
353 | t.Close() | |
354 | return | |
355 | } | |
356 | if !bytes.Equal(preface, clientPreface) { | |
357 | errorf("transport: http2Server.HandleStreams received bogus greeting from client: %q", preface) | |
358 | t.Close() | |
359 | return | |
360 | } | |
361 | ||
362 | frame, err := t.framer.readFrame() | |
363 | if err == io.EOF || err == io.ErrUnexpectedEOF { | |
364 | t.Close() | |
365 | return | |
366 | } | |
367 | if err != nil { | |
368 | errorf("transport: http2Server.HandleStreams failed to read initial settings frame: %v", err) | |
369 | t.Close() | |
370 | return | |
371 | } | |
372 | atomic.StoreUint32(&t.activity, 1) | |
373 | sf, ok := frame.(*http2.SettingsFrame) | |
374 | if !ok { | |
375 | errorf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame) | |
376 | t.Close() | |
377 | return | |
378 | } | |
379 | t.handleSettings(sf) | |
380 | ||
381 | for { | |
382 | frame, err := t.framer.readFrame() | |
383 | atomic.StoreUint32(&t.activity, 1) | |
384 | if err != nil { | |
385 | if se, ok := err.(http2.StreamError); ok { | |
386 | t.mu.Lock() | |
387 | s := t.activeStreams[se.StreamID] | |
388 | t.mu.Unlock() | |
389 | if s != nil { | |
390 | t.closeStream(s) | |
391 | } | |
392 | t.controlBuf.put(&resetStream{se.StreamID, se.Code}) | |
393 | continue | |
394 | } | |
395 | if err == io.EOF || err == io.ErrUnexpectedEOF { | |
396 | t.Close() | |
397 | return | |
398 | } | |
399 | warningf("transport: http2Server.HandleStreams failed to read frame: %v", err) | |
400 | t.Close() | |
401 | return | |
402 | } | |
403 | switch frame := frame.(type) { | |
404 | case *http2.MetaHeadersFrame: | |
405 | if t.operateHeaders(frame, handle, traceCtx) { | |
406 | t.Close() | |
407 | break | |
408 | } | |
409 | case *http2.DataFrame: | |
410 | t.handleData(frame) | |
411 | case *http2.RSTStreamFrame: | |
412 | t.handleRSTStream(frame) | |
413 | case *http2.SettingsFrame: | |
414 | t.handleSettings(frame) | |
415 | case *http2.PingFrame: | |
416 | t.handlePing(frame) | |
417 | case *http2.WindowUpdateFrame: | |
418 | t.handleWindowUpdate(frame) | |
419 | case *http2.GoAwayFrame: | |
420 | // TODO: Handle GoAway from the client appropriately. | |
421 | default: | |
422 | errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame) | |
423 | } | |
424 | } | |
425 | } | |
426 | ||
427 | func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) { | |
428 | t.mu.Lock() | |
429 | defer t.mu.Unlock() | |
430 | if t.activeStreams == nil { | |
431 | // The transport is closing. | |
432 | return nil, false | |
433 | } | |
434 | s, ok := t.activeStreams[f.Header().StreamID] | |
435 | if !ok { | |
436 | // The stream is already done. | |
437 | return nil, false | |
438 | } | |
439 | return s, true | |
440 | } | |
441 | ||
442 | // adjustWindow sends out extra window update over the initial window size | |
443 | // of stream if the application is requesting data larger in size than | |
444 | // the window. | |
445 | func (t *http2Server) adjustWindow(s *Stream, n uint32) { | |
446 | s.mu.Lock() | |
447 | defer s.mu.Unlock() | |
448 | if s.state == streamDone { | |
449 | return | |
450 | } | |
451 | if w := s.fc.maybeAdjust(n); w > 0 { | |
452 | if cw := t.fc.resetPendingUpdate(); cw > 0 { | |
453 | t.controlBuf.put(&windowUpdate{0, cw, false}) | |
454 | } | |
455 | t.controlBuf.put(&windowUpdate{s.id, w, true}) | |
456 | } | |
457 | } | |
458 | ||
459 | // updateWindow adjusts the inbound quota for the stream and the transport. | |
460 | // Window updates will deliver to the controller for sending when | |
461 | // the cumulative quota exceeds the corresponding threshold. | |
462 | func (t *http2Server) updateWindow(s *Stream, n uint32) { | |
463 | s.mu.Lock() | |
464 | defer s.mu.Unlock() | |
465 | if s.state == streamDone { | |
466 | return | |
467 | } | |
468 | if w := s.fc.onRead(n); w > 0 { | |
469 | if cw := t.fc.resetPendingUpdate(); cw > 0 { | |
470 | t.controlBuf.put(&windowUpdate{0, cw, false}) | |
471 | } | |
472 | t.controlBuf.put(&windowUpdate{s.id, w, true}) | |
473 | } | |
474 | } | |
475 | ||
476 | // updateFlowControl updates the incoming flow control windows | |
477 | // for the transport and the stream based on the current bdp | |
478 | // estimation. | |
479 | func (t *http2Server) updateFlowControl(n uint32) { | |
480 | t.mu.Lock() | |
481 | for _, s := range t.activeStreams { | |
482 | s.fc.newLimit(n) | |
483 | } | |
484 | t.initialWindowSize = int32(n) | |
485 | t.mu.Unlock() | |
486 | t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n), false}) | |
487 | t.controlBuf.put(&settings{ | |
488 | ack: false, | |
489 | ss: []http2.Setting{ | |
490 | { | |
491 | ID: http2.SettingInitialWindowSize, | |
492 | Val: uint32(n), | |
493 | }, | |
494 | }, | |
495 | }) | |
496 | ||
497 | } | |
498 | ||
499 | func (t *http2Server) handleData(f *http2.DataFrame) { | |
500 | size := f.Header().Length | |
501 | var sendBDPPing bool | |
502 | if t.bdpEst != nil { | |
503 | sendBDPPing = t.bdpEst.add(uint32(size)) | |
504 | } | |
505 | // Decouple connection's flow control from application's read. | |
506 | // An update on connection's flow control should not depend on | |
507 | // whether user application has read the data or not. Such a | |
508 | // restriction is already imposed on the stream's flow control, | |
509 | // and therefore the sender will be blocked anyways. | |
510 | // Decoupling the connection flow control will prevent other | |
511 | // active(fast) streams from starving in presence of slow or | |
512 | // inactive streams. | |
513 | // | |
514 | // Furthermore, if a bdpPing is being sent out we can piggyback | |
515 | // connection's window update for the bytes we just received. | |
516 | if sendBDPPing { | |
517 | t.controlBuf.put(&windowUpdate{0, uint32(size), false}) | |
518 | t.controlBuf.put(bdpPing) | |
519 | } else { | |
520 | if err := t.fc.onData(uint32(size)); err != nil { | |
521 | errorf("transport: http2Server %v", err) | |
522 | t.Close() | |
523 | return | |
524 | } | |
525 | if w := t.fc.onRead(uint32(size)); w > 0 { | |
526 | t.controlBuf.put(&windowUpdate{0, w, true}) | |
527 | } | |
528 | } | |
529 | // Select the right stream to dispatch. | |
530 | s, ok := t.getStream(f) | |
531 | if !ok { | |
532 | return | |
533 | } | |
534 | if size > 0 { | |
535 | s.mu.Lock() | |
536 | if s.state == streamDone { | |
537 | s.mu.Unlock() | |
538 | return | |
539 | } | |
540 | if err := s.fc.onData(uint32(size)); err != nil { | |
541 | s.mu.Unlock() | |
542 | t.closeStream(s) | |
543 | t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl}) | |
544 | return | |
545 | } | |
546 | if f.Header().Flags.Has(http2.FlagDataPadded) { | |
547 | if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { | |
548 | t.controlBuf.put(&windowUpdate{s.id, w, true}) | |
549 | } | |
550 | } | |
551 | s.mu.Unlock() | |
552 | // TODO(bradfitz, zhaoq): A copy is required here because there is no | |
553 | // guarantee f.Data() is consumed before the arrival of next frame. | |
554 | // Can this copy be eliminated? | |
555 | if len(f.Data()) > 0 { | |
556 | data := make([]byte, len(f.Data())) | |
557 | copy(data, f.Data()) | |
558 | s.write(recvMsg{data: data}) | |
559 | } | |
560 | } | |
561 | if f.Header().Flags.Has(http2.FlagDataEndStream) { | |
562 | // Received the end of stream from the client. | |
563 | s.mu.Lock() | |
564 | if s.state != streamDone { | |
565 | s.state = streamReadDone | |
566 | } | |
567 | s.mu.Unlock() | |
568 | s.write(recvMsg{err: io.EOF}) | |
569 | } | |
570 | } | |
571 | ||
572 | func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) { | |
573 | s, ok := t.getStream(f) | |
574 | if !ok { | |
575 | return | |
576 | } | |
577 | t.closeStream(s) | |
578 | } | |
579 | ||
580 | func (t *http2Server) handleSettings(f *http2.SettingsFrame) { | |
581 | if f.IsAck() { | |
582 | return | |
583 | } | |
584 | var ss []http2.Setting | |
585 | f.ForeachSetting(func(s http2.Setting) error { | |
586 | ss = append(ss, s) | |
587 | return nil | |
588 | }) | |
589 | // The settings will be applied once the ack is sent. | |
590 | t.controlBuf.put(&settings{ack: true, ss: ss}) | |
591 | } | |
592 | ||
const (
	// maxPingStrikes is the number of keepalive policy violations tolerated
	// from a client; once pingStrikes exceeds it a GoAway is sent.
	maxPingStrikes = 2
	// defaultPingTimeout is the minimum spacing expected between pings when
	// there are no active streams and pings without streams are not permitted.
	defaultPingTimeout = 2 * time.Hour
)
597 | ||
598 | func (t *http2Server) handlePing(f *http2.PingFrame) { | |
599 | if f.IsAck() { | |
600 | if f.Data == goAwayPing.data && t.drainChan != nil { | |
601 | close(t.drainChan) | |
602 | return | |
603 | } | |
604 | // Maybe it's a BDP ping. | |
605 | if t.bdpEst != nil { | |
606 | t.bdpEst.calculate(f.Data) | |
607 | } | |
608 | return | |
609 | } | |
610 | pingAck := &ping{ack: true} | |
611 | copy(pingAck.data[:], f.Data[:]) | |
612 | t.controlBuf.put(pingAck) | |
613 | ||
614 | now := time.Now() | |
615 | defer func() { | |
616 | t.lastPingAt = now | |
617 | }() | |
618 | // A reset ping strikes means that we don't need to check for policy | |
619 | // violation for this ping and the pingStrikes counter should be set | |
620 | // to 0. | |
621 | if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) { | |
622 | t.pingStrikes = 0 | |
623 | return | |
624 | } | |
625 | t.mu.Lock() | |
626 | ns := len(t.activeStreams) | |
627 | t.mu.Unlock() | |
628 | if ns < 1 && !t.kep.PermitWithoutStream { | |
629 | // Keepalive shouldn't be active thus, this new ping should | |
630 | // have come after atleast defaultPingTimeout. | |
631 | if t.lastPingAt.Add(defaultPingTimeout).After(now) { | |
632 | t.pingStrikes++ | |
633 | } | |
634 | } else { | |
635 | // Check if keepalive policy is respected. | |
636 | if t.lastPingAt.Add(t.kep.MinTime).After(now) { | |
637 | t.pingStrikes++ | |
638 | } | |
639 | } | |
640 | ||
641 | if t.pingStrikes > maxPingStrikes { | |
642 | // Send goaway and close the connection. | |
643 | t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true}) | |
644 | } | |
645 | } | |
646 | ||
647 | func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) { | |
648 | id := f.Header().StreamID | |
649 | incr := f.Increment | |
650 | if id == 0 { | |
651 | t.sendQuotaPool.add(int(incr)) | |
652 | return | |
653 | } | |
654 | if s, ok := t.getStream(f); ok { | |
655 | s.sendQuotaPool.add(int(incr)) | |
656 | } | |
657 | } | |
658 | ||
659 | func (t *http2Server) writeHeaders(s *Stream, b *bytes.Buffer, endStream bool) error { | |
660 | first := true | |
661 | endHeaders := false | |
662 | var err error | |
663 | defer func() { | |
664 | if err == nil { | |
665 | // Reset ping strikes when seding headers since that might cause the | |
666 | // peer to send ping. | |
667 | atomic.StoreUint32(&t.resetPingStrikes, 1) | |
668 | } | |
669 | }() | |
670 | // Sends the headers in a single batch. | |
671 | for !endHeaders { | |
672 | size := t.hBuf.Len() | |
673 | if size > http2MaxFrameLen { | |
674 | size = http2MaxFrameLen | |
675 | } else { | |
676 | endHeaders = true | |
677 | } | |
678 | if first { | |
679 | p := http2.HeadersFrameParam{ | |
680 | StreamID: s.id, | |
681 | BlockFragment: b.Next(size), | |
682 | EndStream: endStream, | |
683 | EndHeaders: endHeaders, | |
684 | } | |
685 | err = t.framer.writeHeaders(endHeaders, p) | |
686 | first = false | |
687 | } else { | |
688 | err = t.framer.writeContinuation(endHeaders, s.id, endHeaders, b.Next(size)) | |
689 | } | |
690 | if err != nil { | |
691 | t.Close() | |
692 | return connectionErrorf(true, err, "transport: %v", err) | |
693 | } | |
694 | } | |
695 | return nil | |
696 | } | |
697 | ||
698 | // WriteHeader sends the header metedata md back to the client. | |
699 | func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { | |
700 | s.mu.Lock() | |
701 | if s.headerOk || s.state == streamDone { | |
702 | s.mu.Unlock() | |
703 | return ErrIllegalHeaderWrite | |
704 | } | |
705 | s.headerOk = true | |
706 | if md.Len() > 0 { | |
707 | if s.header.Len() > 0 { | |
708 | s.header = metadata.Join(s.header, md) | |
709 | } else { | |
710 | s.header = md | |
711 | } | |
712 | } | |
713 | md = s.header | |
714 | s.mu.Unlock() | |
715 | if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { | |
716 | return err | |
717 | } | |
718 | t.hBuf.Reset() | |
719 | t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) | |
720 | t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) | |
721 | if s.sendCompress != "" { | |
722 | t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress}) | |
723 | } | |
724 | for k, vv := range md { | |
725 | if isReservedHeader(k) { | |
726 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. | |
727 | continue | |
728 | } | |
729 | for _, v := range vv { | |
730 | t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | |
731 | } | |
732 | } | |
733 | bufLen := t.hBuf.Len() | |
734 | if err := t.writeHeaders(s, t.hBuf, false); err != nil { | |
735 | return err | |
736 | } | |
737 | if t.stats != nil { | |
738 | outHeader := &stats.OutHeader{ | |
739 | WireLength: bufLen, | |
740 | } | |
741 | t.stats.HandleRPC(s.Context(), outHeader) | |
742 | } | |
743 | t.writableChan <- 0 | |
744 | return nil | |
745 | } | |
746 | ||
747 | // WriteStatus sends stream status to the client and terminates the stream. | |
748 | // There is no further I/O operations being able to perform on this stream. | |
749 | // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early | |
750 | // OK is adopted. | |
751 | func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { | |
752 | var headersSent, hasHeader bool | |
753 | s.mu.Lock() | |
754 | if s.state == streamDone { | |
755 | s.mu.Unlock() | |
756 | return nil | |
757 | } | |
758 | if s.headerOk { | |
759 | headersSent = true | |
760 | } | |
761 | if s.header.Len() > 0 { | |
762 | hasHeader = true | |
763 | } | |
764 | s.mu.Unlock() | |
765 | ||
766 | if !headersSent && hasHeader { | |
767 | t.WriteHeader(s, nil) | |
768 | headersSent = true | |
769 | } | |
770 | ||
771 | if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil { | |
772 | return err | |
773 | } | |
774 | t.hBuf.Reset() | |
775 | if !headersSent { | |
776 | t.hEnc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"}) | |
777 | t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) | |
778 | } | |
779 | t.hEnc.WriteField( | |
780 | hpack.HeaderField{ | |
781 | Name: "grpc-status", | |
782 | Value: strconv.Itoa(int(st.Code())), | |
783 | }) | |
784 | t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())}) | |
785 | ||
786 | if p := st.Proto(); p != nil && len(p.Details) > 0 { | |
787 | stBytes, err := proto.Marshal(p) | |
788 | if err != nil { | |
789 | // TODO: return error instead, when callers are able to handle it. | |
790 | panic(err) | |
791 | } | |
792 | ||
793 | t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)}) | |
794 | } | |
795 | ||
796 | // Attach the trailer metadata. | |
797 | for k, vv := range s.trailer { | |
798 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. | |
799 | if isReservedHeader(k) { | |
800 | continue | |
801 | } | |
802 | for _, v := range vv { | |
803 | t.hEnc.WriteField(hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | |
804 | } | |
805 | } | |
806 | bufLen := t.hBuf.Len() | |
807 | if err := t.writeHeaders(s, t.hBuf, true); err != nil { | |
808 | t.Close() | |
809 | return err | |
810 | } | |
811 | if t.stats != nil { | |
812 | outTrailer := &stats.OutTrailer{ | |
813 | WireLength: bufLen, | |
814 | } | |
815 | t.stats.HandleRPC(s.Context(), outTrailer) | |
816 | } | |
817 | t.closeStream(s) | |
818 | t.writableChan <- 0 | |
819 | return nil | |
820 | } | |
821 | ||
// Write converts the data into HTTP2 data frames and sends them out. A
// non-nil error is returned if it fails (e.g., framing error, transport
// error).
//
// If s.headerOk is not set yet, WriteHeader(s, nil) is invoked before any
// data is sent. The payload is then written in chunks of at most
// http2MaxFrameLen bytes, each chunk further limited by the quota acquired
// from the stream's and the transport's sendQuotaPool. A stream whose state
// is streamDone is rejected with a codes.Unknown stream error.
//
// opts must be non-nil: opts.Last is read when deciding whether to force a
// flush after the final chunk.
func (t *http2Server) Write(s *Stream, data []byte, opts *Options) (err error) {
	// TODO(zhaoq): Support multi-writers for a single stream.
	var writeHeaderFrame bool
	s.mu.Lock()
	if s.state == streamDone {
		s.mu.Unlock()
		return streamErrorf(codes.Unknown, "the stream has been done")
	}
	if !s.headerOk {
		writeHeaderFrame = true
	}
	s.mu.Unlock()
	if writeHeaderFrame {
		// NOTE(review): whatever WriteHeader returns is discarded here, so a
		// failure to send the headers does not stop the data write — confirm
		// this is intended.
		t.WriteHeader(s, nil)
	}
	r := bytes.NewBuffer(data)
	var (
		// p is the chunk currently being sent. It is only reset to nil after
		// a successful writeData, so a forsaken write retries the same chunk.
		p []byte
		// oqv is the outQuotaVersion observed before quota was acquired.
		oqv uint32
	)
	for {
		// Done once the buffer is drained and no chunk is pending.
		if r.Len() == 0 && p == nil {
			return nil
		}
		oqv = atomic.LoadUint32(&t.outQuotaVersion)
		size := http2MaxFrameLen
		// Wait until the stream has some quota to send the data.
		sq, err := wait(s.ctx, nil, nil, t.shutdownChan, s.sendQuotaPool.acquire())
		if err != nil {
			return err
		}
		// Wait until the transport has some quota to send the data.
		tq, err := wait(s.ctx, nil, nil, t.shutdownChan, t.sendQuotaPool.acquire())
		if err != nil {
			return err
		}
		// The chunk size is the smallest of the frame limit and both quotas.
		if sq < size {
			size = sq
		}
		if tq < size {
			size = tq
		}
		if p == nil {
			p = r.Next(size)
		}
		// NOTE(review): when p was kept from a forsaken write, len(p) may be
		// larger than the freshly acquired sq/tq; nothing below shrinks it —
		// confirm the quota accounting for that case.
		ps := len(p)
		if ps < sq {
			// Overbooked stream quota. Return it back.
			s.sendQuotaPool.add(sq - ps)
		}
		if ps < tq {
			// Overbooked transport quota. Return it back.
			t.sendQuotaPool.add(tq - ps)
		}
		// Count this goroutine as a pending writer; the matching
		// adjustNumWriters(-1) happens on every path below.
		t.framer.adjustNumWriters(1)
		// Got some quota. Try to acquire writing privilege on the
		// transport.
		if _, err := wait(s.ctx, nil, nil, t.shutdownChan, t.writableChan); err != nil {
			if _, ok := err.(StreamError); ok {
				// Return the connection quota back.
				t.sendQuotaPool.add(ps)
			}
			if t.framer.adjustNumWriters(-1) == 0 {
				// This writer is the last one in this batch and has the
				// responsibility to flush the buffered frames. It queues
				// a flush request to controlBuf instead of flushing directly
				// in order to avoid the race with other writing or flushing.
				t.controlBuf.put(&flushIO{})
			}
			return err
		}
		// The write token is held from here on; every exit path hands it
		// back by sending on t.writableChan.
		select {
		case <-s.ctx.Done():
			// The stream's context ended while waiting for the token: give
			// the transport quota back and bail out.
			t.sendQuotaPool.add(ps)
			if t.framer.adjustNumWriters(-1) == 0 {
				t.controlBuf.put(&flushIO{})
			}
			t.writableChan <- 0
			return ContextErr(s.ctx.Err())
		default:
		}
		if oqv != atomic.LoadUint32(&t.outQuotaVersion) {
			// InitialWindowSize settings frame must have been received after we
			// acquired send quota but before we got the writable channel.
			// We must forsake this write.
			t.sendQuotaPool.add(ps)
			s.sendQuotaPool.add(ps)
			if t.framer.adjustNumWriters(-1) == 0 {
				t.controlBuf.put(&flushIO{})
			}
			t.writableChan <- 0
			continue
		}
		// Force a flush when this is the final chunk, the writer count reads
		// 1, and opts.Last is false.
		var forceFlush bool
		if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 && !opts.Last {
			forceFlush = true
		}
		// Reset ping strikes when sending data since this might cause
		// the peer to send ping.
		atomic.StoreUint32(&t.resetPingStrikes, 1)
		if err := t.framer.writeData(forceFlush, s.id, false, p); err != nil {
			// A failed frame write is treated as fatal for the whole transport.
			t.Close()
			return connectionErrorf(true, err, "transport: %v", err)
		}
		p = nil
		if t.framer.adjustNumWriters(-1) == 0 {
			t.framer.flushWrite()
		}
		t.writableChan <- 0
	}

}
936 | ||
937 | func (t *http2Server) applySettings(ss []http2.Setting) { | |
938 | for _, s := range ss { | |
939 | if s.ID == http2.SettingInitialWindowSize { | |
940 | t.mu.Lock() | |
941 | defer t.mu.Unlock() | |
942 | for _, stream := range t.activeStreams { | |
943 | stream.sendQuotaPool.add(int(s.Val) - int(t.streamSendQuota)) | |
944 | } | |
945 | t.streamSendQuota = s.Val | |
946 | atomic.AddUint32(&t.outQuotaVersion, 1) | |
947 | } | |
948 | ||
949 | } | |
950 | } | |
951 | ||
952 | // keepalive running in a separate goroutine does the following: | |
953 | // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle. | |
954 | // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge. | |
955 | // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge. | |
956 | // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection | |
957 | // after an additional duration of keepalive.Timeout. | |
958 | func (t *http2Server) keepalive() { | |
959 | p := &ping{} | |
960 | var pingSent bool | |
961 | maxIdle := time.NewTimer(t.kp.MaxConnectionIdle) | |
962 | maxAge := time.NewTimer(t.kp.MaxConnectionAge) | |
963 | keepalive := time.NewTimer(t.kp.Time) | |
964 | // NOTE: All exit paths of this function should reset their | |
965 | // respecitve timers. A failure to do so will cause the | |
966 | // following clean-up to deadlock and eventually leak. | |
967 | defer func() { | |
968 | if !maxIdle.Stop() { | |
969 | <-maxIdle.C | |
970 | } | |
971 | if !maxAge.Stop() { | |
972 | <-maxAge.C | |
973 | } | |
974 | if !keepalive.Stop() { | |
975 | <-keepalive.C | |
976 | } | |
977 | }() | |
978 | for { | |
979 | select { | |
980 | case <-maxIdle.C: | |
981 | t.mu.Lock() | |
982 | idle := t.idle | |
983 | if idle.IsZero() { // The connection is non-idle. | |
984 | t.mu.Unlock() | |
985 | maxIdle.Reset(t.kp.MaxConnectionIdle) | |
986 | continue | |
987 | } | |
988 | val := t.kp.MaxConnectionIdle - time.Since(idle) | |
989 | t.mu.Unlock() | |
990 | if val <= 0 { | |
991 | // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more. | |
992 | // Gracefully close the connection. | |
993 | t.drain(http2.ErrCodeNo, []byte{}) | |
994 | // Reseting the timer so that the clean-up doesn't deadlock. | |
995 | maxIdle.Reset(infinity) | |
996 | return | |
997 | } | |
998 | maxIdle.Reset(val) | |
999 | case <-maxAge.C: | |
1000 | t.drain(http2.ErrCodeNo, []byte{}) | |
1001 | maxAge.Reset(t.kp.MaxConnectionAgeGrace) | |
1002 | select { | |
1003 | case <-maxAge.C: | |
1004 | // Close the connection after grace period. | |
1005 | t.Close() | |
1006 | // Reseting the timer so that the clean-up doesn't deadlock. | |
1007 | maxAge.Reset(infinity) | |
1008 | case <-t.shutdownChan: | |
1009 | } | |
1010 | return | |
1011 | case <-keepalive.C: | |
1012 | if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { | |
1013 | pingSent = false | |
1014 | keepalive.Reset(t.kp.Time) | |
1015 | continue | |
1016 | } | |
1017 | if pingSent { | |
1018 | t.Close() | |
1019 | // Reseting the timer so that the clean-up doesn't deadlock. | |
1020 | keepalive.Reset(infinity) | |
1021 | return | |
1022 | } | |
1023 | pingSent = true | |
1024 | t.controlBuf.put(p) | |
1025 | keepalive.Reset(t.kp.Timeout) | |
1026 | case <-t.shutdownChan: | |
1027 | return | |
1028 | } | |
1029 | } | |
1030 | } | |
1031 | ||
// goAwayPing is the ping whose fixed 8-byte payload the controller writes
// right after the first GoAway frame of a graceful close.
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1033 | ||
1034 | // controller running in a separate goroutine takes charge of sending control | |
1035 | // frames (e.g., window update, reset stream, setting, etc.) to the server. | |
1036 | func (t *http2Server) controller() { | |
1037 | for { | |
1038 | select { | |
1039 | case i := <-t.controlBuf.get(): | |
1040 | t.controlBuf.load() | |
1041 | select { | |
1042 | case <-t.writableChan: | |
1043 | switch i := i.(type) { | |
1044 | case *windowUpdate: | |
1045 | t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment) | |
1046 | case *settings: | |
1047 | if i.ack { | |
1048 | t.framer.writeSettingsAck(true) | |
1049 | t.applySettings(i.ss) | |
1050 | } else { | |
1051 | t.framer.writeSettings(true, i.ss...) | |
1052 | } | |
1053 | case *resetStream: | |
1054 | t.framer.writeRSTStream(true, i.streamID, i.code) | |
1055 | case *goAway: | |
1056 | t.mu.Lock() | |
1057 | if t.state == closing { | |
1058 | t.mu.Unlock() | |
1059 | // The transport is closing. | |
1060 | return | |
1061 | } | |
1062 | sid := t.maxStreamID | |
1063 | if !i.headsUp { | |
1064 | // Stop accepting more streams now. | |
1065 | t.state = draining | |
1066 | t.mu.Unlock() | |
1067 | t.framer.writeGoAway(true, sid, i.code, i.debugData) | |
1068 | if i.closeConn { | |
1069 | // Abruptly close the connection following the GoAway. | |
1070 | t.Close() | |
1071 | } | |
1072 | t.writableChan <- 0 | |
1073 | continue | |
1074 | } | |
1075 | t.mu.Unlock() | |
1076 | // For a graceful close, send out a GoAway with stream ID of MaxUInt32, | |
1077 | // Follow that with a ping and wait for the ack to come back or a timer | |
1078 | // to expire. During this time accept new streams since they might have | |
1079 | // originated before the GoAway reaches the client. | |
1080 | // After getting the ack or timer expiration send out another GoAway this | |
1081 | // time with an ID of the max stream server intends to process. | |
1082 | t.framer.writeGoAway(true, math.MaxUint32, http2.ErrCodeNo, []byte{}) | |
1083 | t.framer.writePing(true, false, goAwayPing.data) | |
1084 | go func() { | |
1085 | timer := time.NewTimer(time.Minute) | |
1086 | defer timer.Stop() | |
1087 | select { | |
1088 | case <-t.drainChan: | |
1089 | case <-timer.C: | |
1090 | case <-t.shutdownChan: | |
1091 | return | |
1092 | } | |
1093 | t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData}) | |
1094 | }() | |
1095 | case *flushIO: | |
1096 | t.framer.flushWrite() | |
1097 | case *ping: | |
1098 | if !i.ack { | |
1099 | t.bdpEst.timesnap(i.data) | |
1100 | } | |
1101 | t.framer.writePing(true, i.ack, i.data) | |
1102 | default: | |
1103 | errorf("transport: http2Server.controller got unexpected item type %v\n", i) | |
1104 | } | |
1105 | t.writableChan <- 0 | |
1106 | continue | |
1107 | case <-t.shutdownChan: | |
1108 | return | |
1109 | } | |
1110 | case <-t.shutdownChan: | |
1111 | return | |
1112 | } | |
1113 | } | |
1114 | } | |
1115 | ||
1116 | // Close starts shutting down the http2Server transport. | |
1117 | // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This | |
1118 | // could cause some resource issue. Revisit this later. | |
1119 | func (t *http2Server) Close() (err error) { | |
1120 | t.mu.Lock() | |
1121 | if t.state == closing { | |
1122 | t.mu.Unlock() | |
1123 | return errors.New("transport: Close() was already called") | |
1124 | } | |
1125 | t.state = closing | |
1126 | streams := t.activeStreams | |
1127 | t.activeStreams = nil | |
1128 | t.mu.Unlock() | |
1129 | close(t.shutdownChan) | |
1130 | err = t.conn.Close() | |
1131 | // Cancel all active streams. | |
1132 | for _, s := range streams { | |
1133 | s.cancel() | |
1134 | } | |
1135 | if t.stats != nil { | |
1136 | connEnd := &stats.ConnEnd{} | |
1137 | t.stats.HandleConn(t.ctx, connEnd) | |
1138 | } | |
1139 | return | |
1140 | } | |
1141 | ||
1142 | // closeStream clears the footprint of a stream when the stream is not needed | |
1143 | // any more. | |
1144 | func (t *http2Server) closeStream(s *Stream) { | |
1145 | t.mu.Lock() | |
1146 | delete(t.activeStreams, s.id) | |
1147 | if len(t.activeStreams) == 0 { | |
1148 | t.idle = time.Now() | |
1149 | } | |
1150 | if t.state == draining && len(t.activeStreams) == 0 { | |
1151 | defer t.Close() | |
1152 | } | |
1153 | t.mu.Unlock() | |
1154 | // In case stream sending and receiving are invoked in separate | |
1155 | // goroutines (e.g., bi-directional streaming), cancel needs to be | |
1156 | // called to interrupt the potential blocking on other goroutines. | |
1157 | s.cancel() | |
1158 | s.mu.Lock() | |
1159 | if s.state == streamDone { | |
1160 | s.mu.Unlock() | |
1161 | return | |
1162 | } | |
1163 | s.state = streamDone | |
1164 | s.mu.Unlock() | |
1165 | } | |
1166 | ||
// RemoteAddr returns the remote network address stored on the transport.
func (t *http2Server) RemoteAddr() net.Addr {
	return t.remoteAddr
}
1170 | ||
// Drain starts draining the transport by calling drain with
// http2.ErrCodeNo and empty debug data.
func (t *http2Server) Drain() {
	t.drain(http2.ErrCodeNo, []byte{})
}
1174 | ||
1175 | func (t *http2Server) drain(code http2.ErrCode, debugData []byte) { | |
1176 | t.mu.Lock() | |
1177 | defer t.mu.Unlock() | |
1178 | if t.drainChan != nil { | |
1179 | return | |
1180 | } | |
1181 | t.drainChan = make(chan struct{}) | |
1182 | t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true}) | |
1183 | } | |
1184 | ||
1185 | var rgen = rand.New(rand.NewSource(time.Now().UnixNano())) | |
1186 | ||
1187 | func getJitter(v time.Duration) time.Duration { | |
1188 | if v == infinity { | |
1189 | return 0 | |
1190 | } | |
1191 | // Generate a jitter between +/- 10% of the value. | |
1192 | r := int64(v / 10) | |
1193 | j := rgen.Int63n(2*r) - r | |
1194 | return time.Duration(j) | |
1195 | } |