diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/transport/http2_server.go')
-rw-r--r-- | vendor/google.golang.org/grpc/internal/transport/http2_server.go | 1180 |
1 files changed, 1180 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/http2_server.go b/vendor/google.golang.org/grpc/internal/transport/http2_server.go new file mode 100644 index 0000000..df27403 --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/transport/http2_server.go | |||
@@ -0,0 +1,1180 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2014 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | package transport | ||
20 | |||
21 | import ( | ||
22 | "bytes" | ||
23 | "context" | ||
24 | "errors" | ||
25 | "fmt" | ||
26 | "io" | ||
27 | "math" | ||
28 | "net" | ||
29 | "strconv" | ||
30 | "sync" | ||
31 | "sync/atomic" | ||
32 | "time" | ||
33 | |||
34 | "github.com/golang/protobuf/proto" | ||
35 | "golang.org/x/net/http2" | ||
36 | "golang.org/x/net/http2/hpack" | ||
37 | |||
38 | "google.golang.org/grpc/codes" | ||
39 | "google.golang.org/grpc/credentials" | ||
40 | "google.golang.org/grpc/grpclog" | ||
41 | "google.golang.org/grpc/internal/channelz" | ||
42 | "google.golang.org/grpc/internal/grpcrand" | ||
43 | "google.golang.org/grpc/keepalive" | ||
44 | "google.golang.org/grpc/metadata" | ||
45 | "google.golang.org/grpc/peer" | ||
46 | "google.golang.org/grpc/stats" | ||
47 | "google.golang.org/grpc/status" | ||
48 | "google.golang.org/grpc/tap" | ||
49 | ) | ||
50 | |||
51 | var ( | ||
52 | // ErrIllegalHeaderWrite indicates that setting header is illegal because of | ||
53 | // the stream's state. | ||
54 | ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called") | ||
55 | // ErrHeaderListSizeLimitViolation indicates that the header list size is larger | ||
56 | // than the limit set by peer. | ||
57 | ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer") | ||
58 | ) | ||
59 | |||
60 | // http2Server implements the ServerTransport interface with HTTP2. | ||
61 | type http2Server struct { | ||
62 | ctx context.Context | ||
63 | ctxDone <-chan struct{} // Cache the context.Done() chan | ||
64 | cancel context.CancelFunc | ||
65 | conn net.Conn | ||
66 | loopy *loopyWriter | ||
67 | readerDone chan struct{} // sync point to enable testing. | ||
68 | writerDone chan struct{} // sync point to enable testing. | ||
69 | remoteAddr net.Addr | ||
70 | localAddr net.Addr | ||
71 | maxStreamID uint32 // max stream ID ever seen | ||
72 | authInfo credentials.AuthInfo // auth info about the connection | ||
73 | inTapHandle tap.ServerInHandle | ||
74 | framer *framer | ||
75 | // The max number of concurrent streams. | ||
76 | maxStreams uint32 | ||
77 | // controlBuf delivers all the control related tasks (e.g., window | ||
78 | // updates, reset streams, and various settings) to the controller. | ||
79 | controlBuf *controlBuffer | ||
80 | fc *trInFlow | ||
81 | stats stats.Handler | ||
82 | // Flag to keep track of reading activity on transport. | ||
83 | // 1 is true and 0 is false. | ||
84 | activity uint32 // Accessed atomically. | ||
85 | // Keepalive and max-age parameters for the server. | ||
86 | kp keepalive.ServerParameters | ||
87 | |||
88 | // Keepalive enforcement policy. | ||
89 | kep keepalive.EnforcementPolicy | ||
90 | // The time instance last ping was received. | ||
91 | lastPingAt time.Time | ||
92 | // Number of times the client has violated keepalive ping policy so far. | ||
93 | pingStrikes uint8 | ||
94 | // Flag to signify that number of ping strikes should be reset to 0. | ||
95 | // This is set whenever data or header frames are sent. | ||
96 | // 1 means yes. | ||
97 | resetPingStrikes uint32 // Accessed atomically. | ||
98 | initialWindowSize int32 | ||
99 | bdpEst *bdpEstimator | ||
100 | maxSendHeaderListSize *uint32 | ||
101 | |||
102 | mu sync.Mutex // guard the following | ||
103 | |||
104 | // drainChan is initialized when drain(...) is called the first time. | ||
105 | // After which the server writes out the first GoAway(with ID 2^31-1) frame. | ||
106 | // Then an independent goroutine will be launched to later send the second GoAway. | ||
107 | // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame. | ||
108 | // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is | ||
109 | // already underway. | ||
110 | drainChan chan struct{} | ||
111 | state transportState | ||
112 | activeStreams map[uint32]*Stream | ||
113 | // idle is the time instant when the connection went idle. | ||
114 | // This is either the beginning of the connection or when the number of | ||
115 | // RPCs go down to 0. | ||
116 | // When the connection is busy, this value is set to 0. | ||
117 | idle time.Time | ||
118 | |||
119 | // Fields below are for channelz metric collection. | ||
120 | channelzID int64 // channelz unique identification number | ||
121 | czData *channelzData | ||
122 | } | ||
123 | |||
124 | // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is | ||
125 | // returned if something goes wrong. | ||
126 | func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) { | ||
127 | writeBufSize := config.WriteBufferSize | ||
128 | readBufSize := config.ReadBufferSize | ||
129 | maxHeaderListSize := defaultServerMaxHeaderListSize | ||
130 | if config.MaxHeaderListSize != nil { | ||
131 | maxHeaderListSize = *config.MaxHeaderListSize | ||
132 | } | ||
133 | framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize) | ||
134 | // Send initial settings as connection preface to client. | ||
135 | var isettings []http2.Setting | ||
136 | // TODO(zhaoq): Have a better way to signal "no limit" because 0 is | ||
137 | // permitted in the HTTP2 spec. | ||
138 | maxStreams := config.MaxStreams | ||
139 | if maxStreams == 0 { | ||
140 | maxStreams = math.MaxUint32 | ||
141 | } else { | ||
142 | isettings = append(isettings, http2.Setting{ | ||
143 | ID: http2.SettingMaxConcurrentStreams, | ||
144 | Val: maxStreams, | ||
145 | }) | ||
146 | } | ||
147 | dynamicWindow := true | ||
148 | iwz := int32(initialWindowSize) | ||
149 | if config.InitialWindowSize >= defaultWindowSize { | ||
150 | iwz = config.InitialWindowSize | ||
151 | dynamicWindow = false | ||
152 | } | ||
153 | icwz := int32(initialWindowSize) | ||
154 | if config.InitialConnWindowSize >= defaultWindowSize { | ||
155 | icwz = config.InitialConnWindowSize | ||
156 | dynamicWindow = false | ||
157 | } | ||
158 | if iwz != defaultWindowSize { | ||
159 | isettings = append(isettings, http2.Setting{ | ||
160 | ID: http2.SettingInitialWindowSize, | ||
161 | Val: uint32(iwz)}) | ||
162 | } | ||
163 | if config.MaxHeaderListSize != nil { | ||
164 | isettings = append(isettings, http2.Setting{ | ||
165 | ID: http2.SettingMaxHeaderListSize, | ||
166 | Val: *config.MaxHeaderListSize, | ||
167 | }) | ||
168 | } | ||
169 | if err := framer.fr.WriteSettings(isettings...); err != nil { | ||
170 | return nil, connectionErrorf(false, err, "transport: %v", err) | ||
171 | } | ||
172 | // Adjust the connection flow control window if needed. | ||
173 | if delta := uint32(icwz - defaultWindowSize); delta > 0 { | ||
174 | if err := framer.fr.WriteWindowUpdate(0, delta); err != nil { | ||
175 | return nil, connectionErrorf(false, err, "transport: %v", err) | ||
176 | } | ||
177 | } | ||
178 | kp := config.KeepaliveParams | ||
179 | if kp.MaxConnectionIdle == 0 { | ||
180 | kp.MaxConnectionIdle = defaultMaxConnectionIdle | ||
181 | } | ||
182 | if kp.MaxConnectionAge == 0 { | ||
183 | kp.MaxConnectionAge = defaultMaxConnectionAge | ||
184 | } | ||
185 | // Add a jitter to MaxConnectionAge. | ||
186 | kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge) | ||
187 | if kp.MaxConnectionAgeGrace == 0 { | ||
188 | kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace | ||
189 | } | ||
190 | if kp.Time == 0 { | ||
191 | kp.Time = defaultServerKeepaliveTime | ||
192 | } | ||
193 | if kp.Timeout == 0 { | ||
194 | kp.Timeout = defaultServerKeepaliveTimeout | ||
195 | } | ||
196 | kep := config.KeepalivePolicy | ||
197 | if kep.MinTime == 0 { | ||
198 | kep.MinTime = defaultKeepalivePolicyMinTime | ||
199 | } | ||
200 | ctx, cancel := context.WithCancel(context.Background()) | ||
201 | t := &http2Server{ | ||
202 | ctx: ctx, | ||
203 | cancel: cancel, | ||
204 | ctxDone: ctx.Done(), | ||
205 | conn: conn, | ||
206 | remoteAddr: conn.RemoteAddr(), | ||
207 | localAddr: conn.LocalAddr(), | ||
208 | authInfo: config.AuthInfo, | ||
209 | framer: framer, | ||
210 | readerDone: make(chan struct{}), | ||
211 | writerDone: make(chan struct{}), | ||
212 | maxStreams: maxStreams, | ||
213 | inTapHandle: config.InTapHandle, | ||
214 | fc: &trInFlow{limit: uint32(icwz)}, | ||
215 | state: reachable, | ||
216 | activeStreams: make(map[uint32]*Stream), | ||
217 | stats: config.StatsHandler, | ||
218 | kp: kp, | ||
219 | idle: time.Now(), | ||
220 | kep: kep, | ||
221 | initialWindowSize: iwz, | ||
222 | czData: new(channelzData), | ||
223 | } | ||
224 | t.controlBuf = newControlBuffer(t.ctxDone) | ||
225 | if dynamicWindow { | ||
226 | t.bdpEst = &bdpEstimator{ | ||
227 | bdp: initialWindowSize, | ||
228 | updateFlowControl: t.updateFlowControl, | ||
229 | } | ||
230 | } | ||
231 | if t.stats != nil { | ||
232 | t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{ | ||
233 | RemoteAddr: t.remoteAddr, | ||
234 | LocalAddr: t.localAddr, | ||
235 | }) | ||
236 | connBegin := &stats.ConnBegin{} | ||
237 | t.stats.HandleConn(t.ctx, connBegin) | ||
238 | } | ||
239 | if channelz.IsOn() { | ||
240 | t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr)) | ||
241 | } | ||
242 | t.framer.writer.Flush() | ||
243 | |||
244 | defer func() { | ||
245 | if err != nil { | ||
246 | t.Close() | ||
247 | } | ||
248 | }() | ||
249 | |||
250 | // Check the validity of client preface. | ||
251 | preface := make([]byte, len(clientPreface)) | ||
252 | if _, err := io.ReadFull(t.conn, preface); err != nil { | ||
253 | return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err) | ||
254 | } | ||
255 | if !bytes.Equal(preface, clientPreface) { | ||
256 | return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface) | ||
257 | } | ||
258 | |||
259 | frame, err := t.framer.fr.ReadFrame() | ||
260 | if err == io.EOF || err == io.ErrUnexpectedEOF { | ||
261 | return nil, err | ||
262 | } | ||
263 | if err != nil { | ||
264 | return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err) | ||
265 | } | ||
266 | atomic.StoreUint32(&t.activity, 1) | ||
267 | sf, ok := frame.(*http2.SettingsFrame) | ||
268 | if !ok { | ||
269 | return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame) | ||
270 | } | ||
271 | t.handleSettings(sf) | ||
272 | |||
273 | go func() { | ||
274 | t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst) | ||
275 | t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler | ||
276 | if err := t.loopy.run(); err != nil { | ||
277 | errorf("transport: loopyWriter.run returning. Err: %v", err) | ||
278 | } | ||
279 | t.conn.Close() | ||
280 | close(t.writerDone) | ||
281 | }() | ||
282 | go t.keepalive() | ||
283 | return t, nil | ||
284 | } | ||
285 | |||
286 | // operateHeader takes action on the decoded headers. | ||
287 | func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) { | ||
288 | streamID := frame.Header().StreamID | ||
289 | state := decodeState{serverSide: true} | ||
290 | if err := state.decodeHeader(frame); err != nil { | ||
291 | if se, ok := status.FromError(err); ok { | ||
292 | t.controlBuf.put(&cleanupStream{ | ||
293 | streamID: streamID, | ||
294 | rst: true, | ||
295 | rstCode: statusCodeConvTab[se.Code()], | ||
296 | onWrite: func() {}, | ||
297 | }) | ||
298 | } | ||
299 | return false | ||
300 | } | ||
301 | |||
302 | buf := newRecvBuffer() | ||
303 | s := &Stream{ | ||
304 | id: streamID, | ||
305 | st: t, | ||
306 | buf: buf, | ||
307 | fc: &inFlow{limit: uint32(t.initialWindowSize)}, | ||
308 | recvCompress: state.encoding, | ||
309 | method: state.method, | ||
310 | contentSubtype: state.contentSubtype, | ||
311 | } | ||
312 | if frame.StreamEnded() { | ||
313 | // s is just created by the caller. No lock needed. | ||
314 | s.state = streamReadDone | ||
315 | } | ||
316 | if state.timeoutSet { | ||
317 | s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout) | ||
318 | } else { | ||
319 | s.ctx, s.cancel = context.WithCancel(t.ctx) | ||
320 | } | ||
321 | pr := &peer.Peer{ | ||
322 | Addr: t.remoteAddr, | ||
323 | } | ||
324 | // Attach Auth info if there is any. | ||
325 | if t.authInfo != nil { | ||
326 | pr.AuthInfo = t.authInfo | ||
327 | } | ||
328 | s.ctx = peer.NewContext(s.ctx, pr) | ||
329 | // Attach the received metadata to the context. | ||
330 | if len(state.mdata) > 0 { | ||
331 | s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata) | ||
332 | } | ||
333 | if state.statsTags != nil { | ||
334 | s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags) | ||
335 | } | ||
336 | if state.statsTrace != nil { | ||
337 | s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace) | ||
338 | } | ||
339 | if t.inTapHandle != nil { | ||
340 | var err error | ||
341 | info := &tap.Info{ | ||
342 | FullMethodName: state.method, | ||
343 | } | ||
344 | s.ctx, err = t.inTapHandle(s.ctx, info) | ||
345 | if err != nil { | ||
346 | warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err) | ||
347 | t.controlBuf.put(&cleanupStream{ | ||
348 | streamID: s.id, | ||
349 | rst: true, | ||
350 | rstCode: http2.ErrCodeRefusedStream, | ||
351 | onWrite: func() {}, | ||
352 | }) | ||
353 | return false | ||
354 | } | ||
355 | } | ||
356 | t.mu.Lock() | ||
357 | if t.state != reachable { | ||
358 | t.mu.Unlock() | ||
359 | return false | ||
360 | } | ||
361 | if uint32(len(t.activeStreams)) >= t.maxStreams { | ||
362 | t.mu.Unlock() | ||
363 | t.controlBuf.put(&cleanupStream{ | ||
364 | streamID: streamID, | ||
365 | rst: true, | ||
366 | rstCode: http2.ErrCodeRefusedStream, | ||
367 | onWrite: func() {}, | ||
368 | }) | ||
369 | return false | ||
370 | } | ||
371 | if streamID%2 != 1 || streamID <= t.maxStreamID { | ||
372 | t.mu.Unlock() | ||
373 | // illegal gRPC stream id. | ||
374 | errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID) | ||
375 | return true | ||
376 | } | ||
377 | t.maxStreamID = streamID | ||
378 | t.activeStreams[streamID] = s | ||
379 | if len(t.activeStreams) == 1 { | ||
380 | t.idle = time.Time{} | ||
381 | } | ||
382 | t.mu.Unlock() | ||
383 | if channelz.IsOn() { | ||
384 | atomic.AddInt64(&t.czData.streamsStarted, 1) | ||
385 | atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano()) | ||
386 | } | ||
387 | s.requestRead = func(n int) { | ||
388 | t.adjustWindow(s, uint32(n)) | ||
389 | } | ||
390 | s.ctx = traceCtx(s.ctx, s.method) | ||
391 | if t.stats != nil { | ||
392 | s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) | ||
393 | inHeader := &stats.InHeader{ | ||
394 | FullMethod: s.method, | ||
395 | RemoteAddr: t.remoteAddr, | ||
396 | LocalAddr: t.localAddr, | ||
397 | Compression: s.recvCompress, | ||
398 | WireLength: int(frame.Header().Length), | ||
399 | } | ||
400 | t.stats.HandleRPC(s.ctx, inHeader) | ||
401 | } | ||
402 | s.ctxDone = s.ctx.Done() | ||
403 | s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone) | ||
404 | s.trReader = &transportReader{ | ||
405 | reader: &recvBufferReader{ | ||
406 | ctx: s.ctx, | ||
407 | ctxDone: s.ctxDone, | ||
408 | recv: s.buf, | ||
409 | }, | ||
410 | windowHandler: func(n int) { | ||
411 | t.updateWindow(s, uint32(n)) | ||
412 | }, | ||
413 | } | ||
414 | // Register the stream with loopy. | ||
415 | t.controlBuf.put(®isterStream{ | ||
416 | streamID: s.id, | ||
417 | wq: s.wq, | ||
418 | }) | ||
419 | handle(s) | ||
420 | return false | ||
421 | } | ||
422 | |||
423 | // HandleStreams receives incoming streams using the given handler. This is | ||
424 | // typically run in a separate goroutine. | ||
425 | // traceCtx attaches trace to ctx and returns the new context. | ||
426 | func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { | ||
427 | defer close(t.readerDone) | ||
428 | for { | ||
429 | frame, err := t.framer.fr.ReadFrame() | ||
430 | atomic.StoreUint32(&t.activity, 1) | ||
431 | if err != nil { | ||
432 | if se, ok := err.(http2.StreamError); ok { | ||
433 | warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se) | ||
434 | t.mu.Lock() | ||
435 | s := t.activeStreams[se.StreamID] | ||
436 | t.mu.Unlock() | ||
437 | if s != nil { | ||
438 | t.closeStream(s, true, se.Code, nil, false) | ||
439 | } else { | ||
440 | t.controlBuf.put(&cleanupStream{ | ||
441 | streamID: se.StreamID, | ||
442 | rst: true, | ||
443 | rstCode: se.Code, | ||
444 | onWrite: func() {}, | ||
445 | }) | ||
446 | } | ||
447 | continue | ||
448 | } | ||
449 | if err == io.EOF || err == io.ErrUnexpectedEOF { | ||
450 | t.Close() | ||
451 | return | ||
452 | } | ||
453 | warningf("transport: http2Server.HandleStreams failed to read frame: %v", err) | ||
454 | t.Close() | ||
455 | return | ||
456 | } | ||
457 | switch frame := frame.(type) { | ||
458 | case *http2.MetaHeadersFrame: | ||
459 | if t.operateHeaders(frame, handle, traceCtx) { | ||
460 | t.Close() | ||
461 | break | ||
462 | } | ||
463 | case *http2.DataFrame: | ||
464 | t.handleData(frame) | ||
465 | case *http2.RSTStreamFrame: | ||
466 | t.handleRSTStream(frame) | ||
467 | case *http2.SettingsFrame: | ||
468 | t.handleSettings(frame) | ||
469 | case *http2.PingFrame: | ||
470 | t.handlePing(frame) | ||
471 | case *http2.WindowUpdateFrame: | ||
472 | t.handleWindowUpdate(frame) | ||
473 | case *http2.GoAwayFrame: | ||
474 | // TODO: Handle GoAway from the client appropriately. | ||
475 | default: | ||
476 | errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame) | ||
477 | } | ||
478 | } | ||
479 | } | ||
480 | |||
481 | func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) { | ||
482 | t.mu.Lock() | ||
483 | defer t.mu.Unlock() | ||
484 | if t.activeStreams == nil { | ||
485 | // The transport is closing. | ||
486 | return nil, false | ||
487 | } | ||
488 | s, ok := t.activeStreams[f.Header().StreamID] | ||
489 | if !ok { | ||
490 | // The stream is already done. | ||
491 | return nil, false | ||
492 | } | ||
493 | return s, true | ||
494 | } | ||
495 | |||
496 | // adjustWindow sends out extra window update over the initial window size | ||
497 | // of stream if the application is requesting data larger in size than | ||
498 | // the window. | ||
499 | func (t *http2Server) adjustWindow(s *Stream, n uint32) { | ||
500 | if w := s.fc.maybeAdjust(n); w > 0 { | ||
501 | t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w}) | ||
502 | } | ||
503 | |||
504 | } | ||
505 | |||
506 | // updateWindow adjusts the inbound quota for the stream and the transport. | ||
507 | // Window updates will deliver to the controller for sending when | ||
508 | // the cumulative quota exceeds the corresponding threshold. | ||
509 | func (t *http2Server) updateWindow(s *Stream, n uint32) { | ||
510 | if w := s.fc.onRead(n); w > 0 { | ||
511 | t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, | ||
512 | increment: w, | ||
513 | }) | ||
514 | } | ||
515 | } | ||
516 | |||
517 | // updateFlowControl updates the incoming flow control windows | ||
518 | // for the transport and the stream based on the current bdp | ||
519 | // estimation. | ||
520 | func (t *http2Server) updateFlowControl(n uint32) { | ||
521 | t.mu.Lock() | ||
522 | for _, s := range t.activeStreams { | ||
523 | s.fc.newLimit(n) | ||
524 | } | ||
525 | t.initialWindowSize = int32(n) | ||
526 | t.mu.Unlock() | ||
527 | t.controlBuf.put(&outgoingWindowUpdate{ | ||
528 | streamID: 0, | ||
529 | increment: t.fc.newLimit(n), | ||
530 | }) | ||
531 | t.controlBuf.put(&outgoingSettings{ | ||
532 | ss: []http2.Setting{ | ||
533 | { | ||
534 | ID: http2.SettingInitialWindowSize, | ||
535 | Val: n, | ||
536 | }, | ||
537 | }, | ||
538 | }) | ||
539 | |||
540 | } | ||
541 | |||
542 | func (t *http2Server) handleData(f *http2.DataFrame) { | ||
543 | size := f.Header().Length | ||
544 | var sendBDPPing bool | ||
545 | if t.bdpEst != nil { | ||
546 | sendBDPPing = t.bdpEst.add(size) | ||
547 | } | ||
548 | // Decouple connection's flow control from application's read. | ||
549 | // An update on connection's flow control should not depend on | ||
550 | // whether user application has read the data or not. Such a | ||
551 | // restriction is already imposed on the stream's flow control, | ||
552 | // and therefore the sender will be blocked anyways. | ||
553 | // Decoupling the connection flow control will prevent other | ||
554 | // active(fast) streams from starving in presence of slow or | ||
555 | // inactive streams. | ||
556 | if w := t.fc.onData(size); w > 0 { | ||
557 | t.controlBuf.put(&outgoingWindowUpdate{ | ||
558 | streamID: 0, | ||
559 | increment: w, | ||
560 | }) | ||
561 | } | ||
562 | if sendBDPPing { | ||
563 | // Avoid excessive ping detection (e.g. in an L7 proxy) | ||
564 | // by sending a window update prior to the BDP ping. | ||
565 | if w := t.fc.reset(); w > 0 { | ||
566 | t.controlBuf.put(&outgoingWindowUpdate{ | ||
567 | streamID: 0, | ||
568 | increment: w, | ||
569 | }) | ||
570 | } | ||
571 | t.controlBuf.put(bdpPing) | ||
572 | } | ||
573 | // Select the right stream to dispatch. | ||
574 | s, ok := t.getStream(f) | ||
575 | if !ok { | ||
576 | return | ||
577 | } | ||
578 | if size > 0 { | ||
579 | if err := s.fc.onData(size); err != nil { | ||
580 | t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false) | ||
581 | return | ||
582 | } | ||
583 | if f.Header().Flags.Has(http2.FlagDataPadded) { | ||
584 | if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 { | ||
585 | t.controlBuf.put(&outgoingWindowUpdate{s.id, w}) | ||
586 | } | ||
587 | } | ||
588 | // TODO(bradfitz, zhaoq): A copy is required here because there is no | ||
589 | // guarantee f.Data() is consumed before the arrival of next frame. | ||
590 | // Can this copy be eliminated? | ||
591 | if len(f.Data()) > 0 { | ||
592 | data := make([]byte, len(f.Data())) | ||
593 | copy(data, f.Data()) | ||
594 | s.write(recvMsg{data: data}) | ||
595 | } | ||
596 | } | ||
597 | if f.Header().Flags.Has(http2.FlagDataEndStream) { | ||
598 | // Received the end of stream from the client. | ||
599 | s.compareAndSwapState(streamActive, streamReadDone) | ||
600 | s.write(recvMsg{err: io.EOF}) | ||
601 | } | ||
602 | } | ||
603 | |||
604 | func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) { | ||
605 | s, ok := t.getStream(f) | ||
606 | if !ok { | ||
607 | return | ||
608 | } | ||
609 | t.closeStream(s, false, 0, nil, false) | ||
610 | } | ||
611 | |||
612 | func (t *http2Server) handleSettings(f *http2.SettingsFrame) { | ||
613 | if f.IsAck() { | ||
614 | return | ||
615 | } | ||
616 | var ss []http2.Setting | ||
617 | var updateFuncs []func() | ||
618 | f.ForeachSetting(func(s http2.Setting) error { | ||
619 | switch s.ID { | ||
620 | case http2.SettingMaxHeaderListSize: | ||
621 | updateFuncs = append(updateFuncs, func() { | ||
622 | t.maxSendHeaderListSize = new(uint32) | ||
623 | *t.maxSendHeaderListSize = s.Val | ||
624 | }) | ||
625 | default: | ||
626 | ss = append(ss, s) | ||
627 | } | ||
628 | return nil | ||
629 | }) | ||
630 | t.controlBuf.executeAndPut(func(interface{}) bool { | ||
631 | for _, f := range updateFuncs { | ||
632 | f() | ||
633 | } | ||
634 | return true | ||
635 | }, &incomingSettings{ | ||
636 | ss: ss, | ||
637 | }) | ||
638 | } | ||
639 | |||
640 | const ( | ||
641 | maxPingStrikes = 2 | ||
642 | defaultPingTimeout = 2 * time.Hour | ||
643 | ) | ||
644 | |||
645 | func (t *http2Server) handlePing(f *http2.PingFrame) { | ||
646 | if f.IsAck() { | ||
647 | if f.Data == goAwayPing.data && t.drainChan != nil { | ||
648 | close(t.drainChan) | ||
649 | return | ||
650 | } | ||
651 | // Maybe it's a BDP ping. | ||
652 | if t.bdpEst != nil { | ||
653 | t.bdpEst.calculate(f.Data) | ||
654 | } | ||
655 | return | ||
656 | } | ||
657 | pingAck := &ping{ack: true} | ||
658 | copy(pingAck.data[:], f.Data[:]) | ||
659 | t.controlBuf.put(pingAck) | ||
660 | |||
661 | now := time.Now() | ||
662 | defer func() { | ||
663 | t.lastPingAt = now | ||
664 | }() | ||
665 | // A reset ping strikes means that we don't need to check for policy | ||
666 | // violation for this ping and the pingStrikes counter should be set | ||
667 | // to 0. | ||
668 | if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) { | ||
669 | t.pingStrikes = 0 | ||
670 | return | ||
671 | } | ||
672 | t.mu.Lock() | ||
673 | ns := len(t.activeStreams) | ||
674 | t.mu.Unlock() | ||
675 | if ns < 1 && !t.kep.PermitWithoutStream { | ||
676 | // Keepalive shouldn't be active thus, this new ping should | ||
677 | // have come after at least defaultPingTimeout. | ||
678 | if t.lastPingAt.Add(defaultPingTimeout).After(now) { | ||
679 | t.pingStrikes++ | ||
680 | } | ||
681 | } else { | ||
682 | // Check if keepalive policy is respected. | ||
683 | if t.lastPingAt.Add(t.kep.MinTime).After(now) { | ||
684 | t.pingStrikes++ | ||
685 | } | ||
686 | } | ||
687 | |||
688 | if t.pingStrikes > maxPingStrikes { | ||
689 | // Send goaway and close the connection. | ||
690 | errorf("transport: Got too many pings from the client, closing the connection.") | ||
691 | t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true}) | ||
692 | } | ||
693 | } | ||
694 | |||
695 | func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) { | ||
696 | t.controlBuf.put(&incomingWindowUpdate{ | ||
697 | streamID: f.Header().StreamID, | ||
698 | increment: f.Increment, | ||
699 | }) | ||
700 | } | ||
701 | |||
702 | func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField { | ||
703 | for k, vv := range md { | ||
704 | if isReservedHeader(k) { | ||
705 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. | ||
706 | continue | ||
707 | } | ||
708 | for _, v := range vv { | ||
709 | headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | ||
710 | } | ||
711 | } | ||
712 | return headerFields | ||
713 | } | ||
714 | |||
715 | func (t *http2Server) checkForHeaderListSize(it interface{}) bool { | ||
716 | if t.maxSendHeaderListSize == nil { | ||
717 | return true | ||
718 | } | ||
719 | hdrFrame := it.(*headerFrame) | ||
720 | var sz int64 | ||
721 | for _, f := range hdrFrame.hf { | ||
722 | if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) { | ||
723 | errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize) | ||
724 | return false | ||
725 | } | ||
726 | } | ||
727 | return true | ||
728 | } | ||
729 | |||
730 | // WriteHeader sends the header metedata md back to the client. | ||
731 | func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { | ||
732 | if s.updateHeaderSent() || s.getState() == streamDone { | ||
733 | return ErrIllegalHeaderWrite | ||
734 | } | ||
735 | s.hdrMu.Lock() | ||
736 | if md.Len() > 0 { | ||
737 | if s.header.Len() > 0 { | ||
738 | s.header = metadata.Join(s.header, md) | ||
739 | } else { | ||
740 | s.header = md | ||
741 | } | ||
742 | } | ||
743 | if err := t.writeHeaderLocked(s); err != nil { | ||
744 | s.hdrMu.Unlock() | ||
745 | return err | ||
746 | } | ||
747 | s.hdrMu.Unlock() | ||
748 | return nil | ||
749 | } | ||
750 | |||
751 | func (t *http2Server) writeHeaderLocked(s *Stream) error { | ||
752 | // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields | ||
753 | // first and create a slice of that exact size. | ||
754 | headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else. | ||
755 | headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) | ||
756 | headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)}) | ||
757 | if s.sendCompress != "" { | ||
758 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress}) | ||
759 | } | ||
760 | headerFields = appendHeaderFieldsFromMD(headerFields, s.header) | ||
761 | success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{ | ||
762 | streamID: s.id, | ||
763 | hf: headerFields, | ||
764 | endStream: false, | ||
765 | onWrite: func() { | ||
766 | atomic.StoreUint32(&t.resetPingStrikes, 1) | ||
767 | }, | ||
768 | }) | ||
769 | if !success { | ||
770 | if err != nil { | ||
771 | return err | ||
772 | } | ||
773 | t.closeStream(s, true, http2.ErrCodeInternal, nil, false) | ||
774 | return ErrHeaderListSizeLimitViolation | ||
775 | } | ||
776 | if t.stats != nil { | ||
777 | // Note: WireLength is not set in outHeader. | ||
778 | // TODO(mmukhi): Revisit this later, if needed. | ||
779 | outHeader := &stats.OutHeader{} | ||
780 | t.stats.HandleRPC(s.Context(), outHeader) | ||
781 | } | ||
782 | return nil | ||
783 | } | ||
784 | |||
785 | // WriteStatus sends stream status to the client and terminates the stream. | ||
786 | // There is no further I/O operations being able to perform on this stream. | ||
787 | // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early | ||
788 | // OK is adopted. | ||
789 | func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { | ||
790 | if s.getState() == streamDone { | ||
791 | return nil | ||
792 | } | ||
793 | s.hdrMu.Lock() | ||
794 | // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields | ||
795 | // first and create a slice of that exact size. | ||
796 | headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else. | ||
797 | if !s.updateHeaderSent() { // No headers have been sent. | ||
798 | if len(s.header) > 0 { // Send a separate header frame. | ||
799 | if err := t.writeHeaderLocked(s); err != nil { | ||
800 | s.hdrMu.Unlock() | ||
801 | return err | ||
802 | } | ||
803 | } else { // Send a trailer only response. | ||
804 | headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) | ||
805 | headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)}) | ||
806 | } | ||
807 | } | ||
808 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))}) | ||
809 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())}) | ||
810 | |||
811 | if p := st.Proto(); p != nil && len(p.Details) > 0 { | ||
812 | stBytes, err := proto.Marshal(p) | ||
813 | if err != nil { | ||
814 | // TODO: return error instead, when callers are able to handle it. | ||
815 | grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err) | ||
816 | } else { | ||
817 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)}) | ||
818 | } | ||
819 | } | ||
820 | |||
821 | // Attach the trailer metadata. | ||
822 | headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer) | ||
823 | trailingHeader := &headerFrame{ | ||
824 | streamID: s.id, | ||
825 | hf: headerFields, | ||
826 | endStream: true, | ||
827 | onWrite: func() { | ||
828 | atomic.StoreUint32(&t.resetPingStrikes, 1) | ||
829 | }, | ||
830 | } | ||
831 | s.hdrMu.Unlock() | ||
832 | success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader) | ||
833 | if !success { | ||
834 | if err != nil { | ||
835 | return err | ||
836 | } | ||
837 | t.closeStream(s, true, http2.ErrCodeInternal, nil, false) | ||
838 | return ErrHeaderListSizeLimitViolation | ||
839 | } | ||
840 | t.closeStream(s, false, 0, trailingHeader, true) | ||
841 | if t.stats != nil { | ||
842 | t.stats.HandleRPC(s.Context(), &stats.OutTrailer{}) | ||
843 | } | ||
844 | return nil | ||
845 | } | ||
846 | |||
847 | // Write converts the data into HTTP2 data frame and sends it out. Non-nil error | ||
848 | // is returns if it fails (e.g., framing error, transport error). | ||
849 | func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { | ||
850 | if !s.isHeaderSent() { // Headers haven't been written yet. | ||
851 | if err := t.WriteHeader(s, nil); err != nil { | ||
852 | // TODO(mmukhi, dfawley): Make sure this is the right code to return. | ||
853 | return status.Errorf(codes.Internal, "transport: %v", err) | ||
854 | } | ||
855 | } else { | ||
856 | // Writing headers checks for this condition. | ||
857 | if s.getState() == streamDone { | ||
858 | // TODO(mmukhi, dfawley): Should the server write also return io.EOF? | ||
859 | s.cancel() | ||
860 | select { | ||
861 | case <-t.ctx.Done(): | ||
862 | return ErrConnClosing | ||
863 | default: | ||
864 | } | ||
865 | return ContextErr(s.ctx.Err()) | ||
866 | } | ||
867 | } | ||
868 | // Add some data to header frame so that we can equally distribute bytes across frames. | ||
869 | emptyLen := http2MaxFrameLen - len(hdr) | ||
870 | if emptyLen > len(data) { | ||
871 | emptyLen = len(data) | ||
872 | } | ||
873 | hdr = append(hdr, data[:emptyLen]...) | ||
874 | data = data[emptyLen:] | ||
875 | df := &dataFrame{ | ||
876 | streamID: s.id, | ||
877 | h: hdr, | ||
878 | d: data, | ||
879 | onEachWrite: func() { | ||
880 | atomic.StoreUint32(&t.resetPingStrikes, 1) | ||
881 | }, | ||
882 | } | ||
883 | if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { | ||
884 | select { | ||
885 | case <-t.ctx.Done(): | ||
886 | return ErrConnClosing | ||
887 | default: | ||
888 | } | ||
889 | return ContextErr(s.ctx.Err()) | ||
890 | } | ||
891 | return t.controlBuf.put(df) | ||
892 | } | ||
893 | |||
894 | // keepalive running in a separate goroutine does the following: | ||
895 | // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle. | ||
896 | // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge. | ||
897 | // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge. | ||
898 | // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection | ||
899 | // after an additional duration of keepalive.Timeout. | ||
900 | func (t *http2Server) keepalive() { | ||
901 | p := &ping{} | ||
902 | var pingSent bool | ||
903 | maxIdle := time.NewTimer(t.kp.MaxConnectionIdle) | ||
904 | maxAge := time.NewTimer(t.kp.MaxConnectionAge) | ||
905 | keepalive := time.NewTimer(t.kp.Time) | ||
906 | // NOTE: All exit paths of this function should reset their | ||
907 | // respective timers. A failure to do so will cause the | ||
908 | // following clean-up to deadlock and eventually leak. | ||
909 | defer func() { | ||
910 | if !maxIdle.Stop() { | ||
911 | <-maxIdle.C | ||
912 | } | ||
913 | if !maxAge.Stop() { | ||
914 | <-maxAge.C | ||
915 | } | ||
916 | if !keepalive.Stop() { | ||
917 | <-keepalive.C | ||
918 | } | ||
919 | }() | ||
920 | for { | ||
921 | select { | ||
922 | case <-maxIdle.C: | ||
923 | t.mu.Lock() | ||
924 | idle := t.idle | ||
925 | if idle.IsZero() { // The connection is non-idle. | ||
926 | t.mu.Unlock() | ||
927 | maxIdle.Reset(t.kp.MaxConnectionIdle) | ||
928 | continue | ||
929 | } | ||
930 | val := t.kp.MaxConnectionIdle - time.Since(idle) | ||
931 | t.mu.Unlock() | ||
932 | if val <= 0 { | ||
933 | // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more. | ||
934 | // Gracefully close the connection. | ||
935 | t.drain(http2.ErrCodeNo, []byte{}) | ||
936 | // Resetting the timer so that the clean-up doesn't deadlock. | ||
937 | maxIdle.Reset(infinity) | ||
938 | return | ||
939 | } | ||
940 | maxIdle.Reset(val) | ||
941 | case <-maxAge.C: | ||
942 | t.drain(http2.ErrCodeNo, []byte{}) | ||
943 | maxAge.Reset(t.kp.MaxConnectionAgeGrace) | ||
944 | select { | ||
945 | case <-maxAge.C: | ||
946 | // Close the connection after grace period. | ||
947 | t.Close() | ||
948 | // Resetting the timer so that the clean-up doesn't deadlock. | ||
949 | maxAge.Reset(infinity) | ||
950 | case <-t.ctx.Done(): | ||
951 | } | ||
952 | return | ||
953 | case <-keepalive.C: | ||
954 | if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { | ||
955 | pingSent = false | ||
956 | keepalive.Reset(t.kp.Time) | ||
957 | continue | ||
958 | } | ||
959 | if pingSent { | ||
960 | t.Close() | ||
961 | // Resetting the timer so that the clean-up doesn't deadlock. | ||
962 | keepalive.Reset(infinity) | ||
963 | return | ||
964 | } | ||
965 | pingSent = true | ||
966 | if channelz.IsOn() { | ||
967 | atomic.AddInt64(&t.czData.kpCount, 1) | ||
968 | } | ||
969 | t.controlBuf.put(p) | ||
970 | keepalive.Reset(t.kp.Timeout) | ||
971 | case <-t.ctx.Done(): | ||
972 | return | ||
973 | } | ||
974 | } | ||
975 | } | ||
976 | |||
977 | // Close starts shutting down the http2Server transport. | ||
978 | // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This | ||
979 | // could cause some resource issue. Revisit this later. | ||
980 | func (t *http2Server) Close() error { | ||
981 | t.mu.Lock() | ||
982 | if t.state == closing { | ||
983 | t.mu.Unlock() | ||
984 | return errors.New("transport: Close() was already called") | ||
985 | } | ||
986 | t.state = closing | ||
987 | streams := t.activeStreams | ||
988 | t.activeStreams = nil | ||
989 | t.mu.Unlock() | ||
990 | t.controlBuf.finish() | ||
991 | t.cancel() | ||
992 | err := t.conn.Close() | ||
993 | if channelz.IsOn() { | ||
994 | channelz.RemoveEntry(t.channelzID) | ||
995 | } | ||
996 | // Cancel all active streams. | ||
997 | for _, s := range streams { | ||
998 | s.cancel() | ||
999 | } | ||
1000 | if t.stats != nil { | ||
1001 | connEnd := &stats.ConnEnd{} | ||
1002 | t.stats.HandleConn(t.ctx, connEnd) | ||
1003 | } | ||
1004 | return err | ||
1005 | } | ||
1006 | |||
1007 | // closeStream clears the footprint of a stream when the stream is not needed | ||
1008 | // any more. | ||
1009 | func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) { | ||
1010 | if s.swapState(streamDone) == streamDone { | ||
1011 | // If the stream was already done, return. | ||
1012 | return | ||
1013 | } | ||
1014 | // In case stream sending and receiving are invoked in separate | ||
1015 | // goroutines (e.g., bi-directional streaming), cancel needs to be | ||
1016 | // called to interrupt the potential blocking on other goroutines. | ||
1017 | s.cancel() | ||
1018 | cleanup := &cleanupStream{ | ||
1019 | streamID: s.id, | ||
1020 | rst: rst, | ||
1021 | rstCode: rstCode, | ||
1022 | onWrite: func() { | ||
1023 | t.mu.Lock() | ||
1024 | if t.activeStreams != nil { | ||
1025 | delete(t.activeStreams, s.id) | ||
1026 | if len(t.activeStreams) == 0 { | ||
1027 | t.idle = time.Now() | ||
1028 | } | ||
1029 | } | ||
1030 | t.mu.Unlock() | ||
1031 | if channelz.IsOn() { | ||
1032 | if eosReceived { | ||
1033 | atomic.AddInt64(&t.czData.streamsSucceeded, 1) | ||
1034 | } else { | ||
1035 | atomic.AddInt64(&t.czData.streamsFailed, 1) | ||
1036 | } | ||
1037 | } | ||
1038 | }, | ||
1039 | } | ||
1040 | if hdr != nil { | ||
1041 | hdr.cleanup = cleanup | ||
1042 | t.controlBuf.put(hdr) | ||
1043 | } else { | ||
1044 | t.controlBuf.put(cleanup) | ||
1045 | } | ||
1046 | } | ||
1047 | |||
1048 | func (t *http2Server) RemoteAddr() net.Addr { | ||
1049 | return t.remoteAddr | ||
1050 | } | ||
1051 | |||
1052 | func (t *http2Server) Drain() { | ||
1053 | t.drain(http2.ErrCodeNo, []byte{}) | ||
1054 | } | ||
1055 | |||
1056 | func (t *http2Server) drain(code http2.ErrCode, debugData []byte) { | ||
1057 | t.mu.Lock() | ||
1058 | defer t.mu.Unlock() | ||
1059 | if t.drainChan != nil { | ||
1060 | return | ||
1061 | } | ||
1062 | t.drainChan = make(chan struct{}) | ||
1063 | t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true}) | ||
1064 | } | ||
1065 | |||
1066 | var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}} | ||
1067 | |||
1068 | // Handles outgoing GoAway and returns true if loopy needs to put itself | ||
1069 | // in draining mode. | ||
1070 | func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) { | ||
1071 | t.mu.Lock() | ||
1072 | if t.state == closing { // TODO(mmukhi): This seems unnecessary. | ||
1073 | t.mu.Unlock() | ||
1074 | // The transport is closing. | ||
1075 | return false, ErrConnClosing | ||
1076 | } | ||
1077 | sid := t.maxStreamID | ||
1078 | if !g.headsUp { | ||
1079 | // Stop accepting more streams now. | ||
1080 | t.state = draining | ||
1081 | if len(t.activeStreams) == 0 { | ||
1082 | g.closeConn = true | ||
1083 | } | ||
1084 | t.mu.Unlock() | ||
1085 | if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil { | ||
1086 | return false, err | ||
1087 | } | ||
1088 | if g.closeConn { | ||
1089 | // Abruptly close the connection following the GoAway (via | ||
1090 | // loopywriter). But flush out what's inside the buffer first. | ||
1091 | t.framer.writer.Flush() | ||
1092 | return false, fmt.Errorf("transport: Connection closing") | ||
1093 | } | ||
1094 | return true, nil | ||
1095 | } | ||
1096 | t.mu.Unlock() | ||
1097 | // For a graceful close, send out a GoAway with stream ID of MaxUInt32, | ||
1098 | // Follow that with a ping and wait for the ack to come back or a timer | ||
1099 | // to expire. During this time accept new streams since they might have | ||
1100 | // originated before the GoAway reaches the client. | ||
1101 | // After getting the ack or timer expiration send out another GoAway this | ||
1102 | // time with an ID of the max stream server intends to process. | ||
1103 | if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil { | ||
1104 | return false, err | ||
1105 | } | ||
1106 | if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil { | ||
1107 | return false, err | ||
1108 | } | ||
1109 | go func() { | ||
1110 | timer := time.NewTimer(time.Minute) | ||
1111 | defer timer.Stop() | ||
1112 | select { | ||
1113 | case <-t.drainChan: | ||
1114 | case <-timer.C: | ||
1115 | case <-t.ctx.Done(): | ||
1116 | return | ||
1117 | } | ||
1118 | t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData}) | ||
1119 | }() | ||
1120 | return false, nil | ||
1121 | } | ||
1122 | |||
1123 | func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric { | ||
1124 | s := channelz.SocketInternalMetric{ | ||
1125 | StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted), | ||
1126 | StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded), | ||
1127 | StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed), | ||
1128 | MessagesSent: atomic.LoadInt64(&t.czData.msgSent), | ||
1129 | MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv), | ||
1130 | KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount), | ||
1131 | LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)), | ||
1132 | LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)), | ||
1133 | LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)), | ||
1134 | LocalFlowControlWindow: int64(t.fc.getSize()), | ||
1135 | SocketOptions: channelz.GetSocketOption(t.conn), | ||
1136 | LocalAddr: t.localAddr, | ||
1137 | RemoteAddr: t.remoteAddr, | ||
1138 | // RemoteName : | ||
1139 | } | ||
1140 | if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok { | ||
1141 | s.Security = au.GetSecurityValue() | ||
1142 | } | ||
1143 | s.RemoteFlowControlWindow = t.getOutFlowWindow() | ||
1144 | return &s | ||
1145 | } | ||
1146 | |||
1147 | func (t *http2Server) IncrMsgSent() { | ||
1148 | atomic.AddInt64(&t.czData.msgSent, 1) | ||
1149 | atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano()) | ||
1150 | } | ||
1151 | |||
1152 | func (t *http2Server) IncrMsgRecv() { | ||
1153 | atomic.AddInt64(&t.czData.msgRecv, 1) | ||
1154 | atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano()) | ||
1155 | } | ||
1156 | |||
1157 | func (t *http2Server) getOutFlowWindow() int64 { | ||
1158 | resp := make(chan uint32) | ||
1159 | timer := time.NewTimer(time.Second) | ||
1160 | defer timer.Stop() | ||
1161 | t.controlBuf.put(&outFlowControlSizeRequest{resp}) | ||
1162 | select { | ||
1163 | case sz := <-resp: | ||
1164 | return int64(sz) | ||
1165 | case <-t.ctxDone: | ||
1166 | return -1 | ||
1167 | case <-timer.C: | ||
1168 | return -2 | ||
1169 | } | ||
1170 | } | ||
1171 | |||
1172 | func getJitter(v time.Duration) time.Duration { | ||
1173 | if v == infinity { | ||
1174 | return 0 | ||
1175 | } | ||
1176 | // Generate a jitter between +/- 10% of the value. | ||
1177 | r := int64(v / 10) | ||
1178 | j := grpcrand.Int63n(2*r) - r | ||
1179 | return time.Duration(j) | ||
1180 | } | ||