]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | /* |
2 | * | |
3 | * Copyright 2014 gRPC authors. | |
4 | * | |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | * | |
17 | */ | |
18 | ||
19 | package transport | |
20 | ||
21 | import ( | |
22 | "bytes" | |
107c1cdb | 23 | "context" |
15c0b25d | 24 | "errors" |
107c1cdb | 25 | "fmt" |
15c0b25d AP |
26 | "io" |
27 | "math" | |
15c0b25d AP |
28 | "net" |
29 | "strconv" | |
30 | "sync" | |
31 | "sync/atomic" | |
32 | "time" | |
33 | ||
34 | "github.com/golang/protobuf/proto" | |
15c0b25d AP |
35 | "golang.org/x/net/http2" |
36 | "golang.org/x/net/http2/hpack" | |
107c1cdb | 37 | |
15c0b25d AP |
38 | "google.golang.org/grpc/codes" |
39 | "google.golang.org/grpc/credentials" | |
107c1cdb ND |
40 | "google.golang.org/grpc/grpclog" |
41 | "google.golang.org/grpc/internal/channelz" | |
42 | "google.golang.org/grpc/internal/grpcrand" | |
15c0b25d AP |
43 | "google.golang.org/grpc/keepalive" |
44 | "google.golang.org/grpc/metadata" | |
45 | "google.golang.org/grpc/peer" | |
46 | "google.golang.org/grpc/stats" | |
47 | "google.golang.org/grpc/status" | |
48 | "google.golang.org/grpc/tap" | |
49 | ) | |
50 | ||
107c1cdb ND |
51 | var ( |
52 | // ErrIllegalHeaderWrite indicates that setting header is illegal because of | |
53 | // the stream's state. | |
54 | ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHeader was already called") | |
55 | // ErrHeaderListSizeLimitViolation indicates that the header list size is larger | |
56 | // than the limit set by peer. | |
57 | ErrHeaderListSizeLimitViolation = errors.New("transport: trying to send header list size larger than the limit set by peer") | |
58 | ) | |
15c0b25d AP |
59 | |
60 | // http2Server implements the ServerTransport interface with HTTP2. | |
61 | type http2Server struct { | |
62 | ctx context.Context | |
107c1cdb ND |
63 | ctxDone <-chan struct{} // Cache the context.Done() chan |
64 | cancel context.CancelFunc | |
15c0b25d | 65 | conn net.Conn |
107c1cdb ND |
66 | loopy *loopyWriter |
67 | readerDone chan struct{} // sync point to enable testing. | |
68 | writerDone chan struct{} // sync point to enable testing. | |
15c0b25d AP |
69 | remoteAddr net.Addr |
70 | localAddr net.Addr | |
71 | maxStreamID uint32 // max stream ID ever seen | |
72 | authInfo credentials.AuthInfo // auth info about the connection | |
73 | inTapHandle tap.ServerInHandle | |
107c1cdb | 74 | framer *framer |
15c0b25d AP |
75 | // The max number of concurrent streams. |
76 | maxStreams uint32 | |
77 | // controlBuf delivers all the control related tasks (e.g., window | |
78 | // updates, reset streams, and various settings) to the controller. | |
79 | controlBuf *controlBuffer | |
107c1cdb ND |
80 | fc *trInFlow |
81 | stats stats.Handler | |
15c0b25d AP |
82 | // Flag to keep track of reading activity on transport. |
83 | // 1 is true and 0 is false. | |
84 | activity uint32 // Accessed atomically. | |
85 | // Keepalive and max-age parameters for the server. | |
86 | kp keepalive.ServerParameters | |
87 | ||
88 | // Keepalive enforcement policy. | |
89 | kep keepalive.EnforcementPolicy | |
90 | // The time instance last ping was received. | |
91 | lastPingAt time.Time | |
92 | // Number of times the client has violated keepalive ping policy so far. | |
93 | pingStrikes uint8 | |
94 | // Flag to signify that number of ping strikes should be reset to 0. | |
95 | // This is set whenever data or header frames are sent. | |
96 | // 1 means yes. | |
107c1cdb ND |
97 | resetPingStrikes uint32 // Accessed atomically. |
98 | initialWindowSize int32 | |
99 | bdpEst *bdpEstimator | |
100 | maxSendHeaderListSize *uint32 | |
15c0b25d AP |
101 | |
102 | mu sync.Mutex // guard the following | |
103 | ||
104 | // drainChan is initialized when drain(...) is called the first time. | |
105 | // After which the server writes out the first GoAway(with ID 2^31-1) frame. | |
106 | // Then an independent goroutine will be launched to later send the second GoAway. | |
107 | // During this time we don't want to write another first GoAway(with ID 2^31 -1) frame. | |
108 | // Thus call to drain(...) will be a no-op if drainChan is already initialized since draining is | |
109 | // already underway. | |
110 | drainChan chan struct{} | |
111 | state transportState | |
112 | activeStreams map[uint32]*Stream | |
15c0b25d | 113 | // idle is the time instant when the connection went idle. |
107c1cdb | 114 | // This is either the beginning of the connection or when the number of |
15c0b25d AP |
115 | // RPCs go down to 0. |
116 | // When the connection is busy, this value is set to 0. | |
117 | idle time.Time | |
107c1cdb ND |
118 | |
119 | // Fields below are for channelz metric collection. | |
120 | channelzID int64 // channelz unique identification number | |
121 | czData *channelzData | |
15c0b25d AP |
122 | } |
123 | ||
124 | // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is | |
125 | // returned if something goes wrong. | |
126 | func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) { | |
107c1cdb ND |
127 | writeBufSize := config.WriteBufferSize |
128 | readBufSize := config.ReadBufferSize | |
129 | maxHeaderListSize := defaultServerMaxHeaderListSize | |
130 | if config.MaxHeaderListSize != nil { | |
131 | maxHeaderListSize = *config.MaxHeaderListSize | |
132 | } | |
133 | framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize) | |
15c0b25d AP |
134 | // Send initial settings as connection preface to client. |
135 | var isettings []http2.Setting | |
136 | // TODO(zhaoq): Have a better way to signal "no limit" because 0 is | |
137 | // permitted in the HTTP2 spec. | |
138 | maxStreams := config.MaxStreams | |
139 | if maxStreams == 0 { | |
140 | maxStreams = math.MaxUint32 | |
141 | } else { | |
142 | isettings = append(isettings, http2.Setting{ | |
143 | ID: http2.SettingMaxConcurrentStreams, | |
144 | Val: maxStreams, | |
145 | }) | |
146 | } | |
147 | dynamicWindow := true | |
148 | iwz := int32(initialWindowSize) | |
149 | if config.InitialWindowSize >= defaultWindowSize { | |
150 | iwz = config.InitialWindowSize | |
151 | dynamicWindow = false | |
152 | } | |
153 | icwz := int32(initialWindowSize) | |
154 | if config.InitialConnWindowSize >= defaultWindowSize { | |
155 | icwz = config.InitialConnWindowSize | |
156 | dynamicWindow = false | |
157 | } | |
158 | if iwz != defaultWindowSize { | |
159 | isettings = append(isettings, http2.Setting{ | |
160 | ID: http2.SettingInitialWindowSize, | |
161 | Val: uint32(iwz)}) | |
162 | } | |
107c1cdb ND |
163 | if config.MaxHeaderListSize != nil { |
164 | isettings = append(isettings, http2.Setting{ | |
165 | ID: http2.SettingMaxHeaderListSize, | |
166 | Val: *config.MaxHeaderListSize, | |
167 | }) | |
168 | } | |
169 | if err := framer.fr.WriteSettings(isettings...); err != nil { | |
170 | return nil, connectionErrorf(false, err, "transport: %v", err) | |
15c0b25d AP |
171 | } |
172 | // Adjust the connection flow control window if needed. | |
173 | if delta := uint32(icwz - defaultWindowSize); delta > 0 { | |
107c1cdb ND |
174 | if err := framer.fr.WriteWindowUpdate(0, delta); err != nil { |
175 | return nil, connectionErrorf(false, err, "transport: %v", err) | |
15c0b25d AP |
176 | } |
177 | } | |
178 | kp := config.KeepaliveParams | |
179 | if kp.MaxConnectionIdle == 0 { | |
180 | kp.MaxConnectionIdle = defaultMaxConnectionIdle | |
181 | } | |
182 | if kp.MaxConnectionAge == 0 { | |
183 | kp.MaxConnectionAge = defaultMaxConnectionAge | |
184 | } | |
185 | // Add a jitter to MaxConnectionAge. | |
186 | kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge) | |
187 | if kp.MaxConnectionAgeGrace == 0 { | |
188 | kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace | |
189 | } | |
190 | if kp.Time == 0 { | |
191 | kp.Time = defaultServerKeepaliveTime | |
192 | } | |
193 | if kp.Timeout == 0 { | |
194 | kp.Timeout = defaultServerKeepaliveTimeout | |
195 | } | |
196 | kep := config.KeepalivePolicy | |
197 | if kep.MinTime == 0 { | |
198 | kep.MinTime = defaultKeepalivePolicyMinTime | |
199 | } | |
107c1cdb | 200 | ctx, cancel := context.WithCancel(context.Background()) |
15c0b25d | 201 | t := &http2Server{ |
107c1cdb ND |
202 | ctx: ctx, |
203 | cancel: cancel, | |
204 | ctxDone: ctx.Done(), | |
15c0b25d AP |
205 | conn: conn, |
206 | remoteAddr: conn.RemoteAddr(), | |
207 | localAddr: conn.LocalAddr(), | |
208 | authInfo: config.AuthInfo, | |
209 | framer: framer, | |
107c1cdb ND |
210 | readerDone: make(chan struct{}), |
211 | writerDone: make(chan struct{}), | |
15c0b25d AP |
212 | maxStreams: maxStreams, |
213 | inTapHandle: config.InTapHandle, | |
107c1cdb | 214 | fc: &trInFlow{limit: uint32(icwz)}, |
15c0b25d | 215 | state: reachable, |
15c0b25d | 216 | activeStreams: make(map[uint32]*Stream), |
15c0b25d AP |
217 | stats: config.StatsHandler, |
218 | kp: kp, | |
219 | idle: time.Now(), | |
220 | kep: kep, | |
221 | initialWindowSize: iwz, | |
107c1cdb | 222 | czData: new(channelzData), |
15c0b25d | 223 | } |
107c1cdb | 224 | t.controlBuf = newControlBuffer(t.ctxDone) |
15c0b25d AP |
225 | if dynamicWindow { |
226 | t.bdpEst = &bdpEstimator{ | |
227 | bdp: initialWindowSize, | |
228 | updateFlowControl: t.updateFlowControl, | |
229 | } | |
230 | } | |
231 | if t.stats != nil { | |
232 | t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{ | |
233 | RemoteAddr: t.remoteAddr, | |
234 | LocalAddr: t.localAddr, | |
235 | }) | |
236 | connBegin := &stats.ConnBegin{} | |
237 | t.stats.HandleConn(t.ctx, connBegin) | |
238 | } | |
107c1cdb ND |
239 | if channelz.IsOn() { |
240 | t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr)) | |
241 | } | |
242 | t.framer.writer.Flush() | |
243 | ||
244 | defer func() { | |
245 | if err != nil { | |
246 | t.Close() | |
247 | } | |
248 | }() | |
249 | ||
250 | // Check the validity of client preface. | |
251 | preface := make([]byte, len(clientPreface)) | |
252 | if _, err := io.ReadFull(t.conn, preface); err != nil { | |
253 | return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err) | |
254 | } | |
255 | if !bytes.Equal(preface, clientPreface) { | |
256 | return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface) | |
257 | } | |
258 | ||
259 | frame, err := t.framer.fr.ReadFrame() | |
260 | if err == io.EOF || err == io.ErrUnexpectedEOF { | |
261 | return nil, err | |
262 | } | |
263 | if err != nil { | |
264 | return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err) | |
265 | } | |
266 | atomic.StoreUint32(&t.activity, 1) | |
267 | sf, ok := frame.(*http2.SettingsFrame) | |
268 | if !ok { | |
269 | return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame) | |
270 | } | |
271 | t.handleSettings(sf) | |
272 | ||
273 | go func() { | |
274 | t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst) | |
275 | t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler | |
276 | if err := t.loopy.run(); err != nil { | |
277 | errorf("transport: loopyWriter.run returning. Err: %v", err) | |
278 | } | |
279 | t.conn.Close() | |
280 | close(t.writerDone) | |
281 | }() | |
15c0b25d | 282 | go t.keepalive() |
15c0b25d AP |
283 | return t, nil |
284 | } | |
285 | ||
286 | // operateHeader takes action on the decoded headers. | |
107c1cdb ND |
287 | func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) { |
288 | streamID := frame.Header().StreamID | |
289 | state := decodeState{serverSide: true} | |
290 | if err := state.decodeHeader(frame); err != nil { | |
291 | if se, ok := status.FromError(err); ok { | |
292 | t.controlBuf.put(&cleanupStream{ | |
293 | streamID: streamID, | |
294 | rst: true, | |
295 | rstCode: statusCodeConvTab[se.Code()], | |
296 | onWrite: func() {}, | |
297 | }) | |
15c0b25d | 298 | } |
107c1cdb | 299 | return false |
15c0b25d AP |
300 | } |
301 | ||
107c1cdb ND |
302 | buf := newRecvBuffer() |
303 | s := &Stream{ | |
304 | id: streamID, | |
305 | st: t, | |
306 | buf: buf, | |
307 | fc: &inFlow{limit: uint32(t.initialWindowSize)}, | |
308 | recvCompress: state.encoding, | |
309 | method: state.method, | |
310 | contentSubtype: state.contentSubtype, | |
311 | } | |
15c0b25d AP |
312 | if frame.StreamEnded() { |
313 | // s is just created by the caller. No lock needed. | |
314 | s.state = streamReadDone | |
315 | } | |
15c0b25d AP |
316 | if state.timeoutSet { |
317 | s.ctx, s.cancel = context.WithTimeout(t.ctx, state.timeout) | |
318 | } else { | |
319 | s.ctx, s.cancel = context.WithCancel(t.ctx) | |
320 | } | |
321 | pr := &peer.Peer{ | |
322 | Addr: t.remoteAddr, | |
323 | } | |
324 | // Attach Auth info if there is any. | |
325 | if t.authInfo != nil { | |
326 | pr.AuthInfo = t.authInfo | |
327 | } | |
328 | s.ctx = peer.NewContext(s.ctx, pr) | |
15c0b25d AP |
329 | // Attach the received metadata to the context. |
330 | if len(state.mdata) > 0 { | |
331 | s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata) | |
332 | } | |
107c1cdb ND |
333 | if state.statsTags != nil { |
334 | s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags) | |
335 | } | |
336 | if state.statsTrace != nil { | |
337 | s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace) | |
15c0b25d | 338 | } |
15c0b25d AP |
339 | if t.inTapHandle != nil { |
340 | var err error | |
341 | info := &tap.Info{ | |
342 | FullMethodName: state.method, | |
343 | } | |
344 | s.ctx, err = t.inTapHandle(s.ctx, info) | |
345 | if err != nil { | |
346 | warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err) | |
107c1cdb ND |
347 | t.controlBuf.put(&cleanupStream{ |
348 | streamID: s.id, | |
349 | rst: true, | |
350 | rstCode: http2.ErrCodeRefusedStream, | |
351 | onWrite: func() {}, | |
352 | }) | |
353 | return false | |
15c0b25d AP |
354 | } |
355 | } | |
356 | t.mu.Lock() | |
357 | if t.state != reachable { | |
358 | t.mu.Unlock() | |
107c1cdb | 359 | return false |
15c0b25d AP |
360 | } |
361 | if uint32(len(t.activeStreams)) >= t.maxStreams { | |
362 | t.mu.Unlock() | |
107c1cdb ND |
363 | t.controlBuf.put(&cleanupStream{ |
364 | streamID: streamID, | |
365 | rst: true, | |
366 | rstCode: http2.ErrCodeRefusedStream, | |
367 | onWrite: func() {}, | |
368 | }) | |
369 | return false | |
15c0b25d | 370 | } |
107c1cdb | 371 | if streamID%2 != 1 || streamID <= t.maxStreamID { |
15c0b25d AP |
372 | t.mu.Unlock() |
373 | // illegal gRPC stream id. | |
107c1cdb | 374 | errorf("transport: http2Server.HandleStreams received an illegal stream id: %v", streamID) |
15c0b25d AP |
375 | return true |
376 | } | |
107c1cdb ND |
377 | t.maxStreamID = streamID |
378 | t.activeStreams[streamID] = s | |
15c0b25d AP |
379 | if len(t.activeStreams) == 1 { |
380 | t.idle = time.Time{} | |
381 | } | |
382 | t.mu.Unlock() | |
107c1cdb ND |
383 | if channelz.IsOn() { |
384 | atomic.AddInt64(&t.czData.streamsStarted, 1) | |
385 | atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano()) | |
386 | } | |
15c0b25d AP |
387 | s.requestRead = func(n int) { |
388 | t.adjustWindow(s, uint32(n)) | |
389 | } | |
390 | s.ctx = traceCtx(s.ctx, s.method) | |
391 | if t.stats != nil { | |
392 | s.ctx = t.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) | |
393 | inHeader := &stats.InHeader{ | |
394 | FullMethod: s.method, | |
395 | RemoteAddr: t.remoteAddr, | |
396 | LocalAddr: t.localAddr, | |
397 | Compression: s.recvCompress, | |
398 | WireLength: int(frame.Header().Length), | |
399 | } | |
400 | t.stats.HandleRPC(s.ctx, inHeader) | |
401 | } | |
107c1cdb ND |
402 | s.ctxDone = s.ctx.Done() |
403 | s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone) | |
404 | s.trReader = &transportReader{ | |
405 | reader: &recvBufferReader{ | |
406 | ctx: s.ctx, | |
407 | ctxDone: s.ctxDone, | |
408 | recv: s.buf, | |
409 | }, | |
410 | windowHandler: func(n int) { | |
411 | t.updateWindow(s, uint32(n)) | |
412 | }, | |
413 | } | |
414 | // Register the stream with loopy. | |
415 | t.controlBuf.put(®isterStream{ | |
416 | streamID: s.id, | |
417 | wq: s.wq, | |
418 | }) | |
15c0b25d | 419 | handle(s) |
107c1cdb | 420 | return false |
15c0b25d AP |
421 | } |
422 | ||
423 | // HandleStreams receives incoming streams using the given handler. This is | |
424 | // typically run in a separate goroutine. | |
425 | // traceCtx attaches trace to ctx and returns the new context. | |
426 | func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { | |
107c1cdb | 427 | defer close(t.readerDone) |
15c0b25d | 428 | for { |
107c1cdb | 429 | frame, err := t.framer.fr.ReadFrame() |
15c0b25d AP |
430 | atomic.StoreUint32(&t.activity, 1) |
431 | if err != nil { | |
432 | if se, ok := err.(http2.StreamError); ok { | |
107c1cdb | 433 | warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se) |
15c0b25d AP |
434 | t.mu.Lock() |
435 | s := t.activeStreams[se.StreamID] | |
436 | t.mu.Unlock() | |
437 | if s != nil { | |
107c1cdb ND |
438 | t.closeStream(s, true, se.Code, nil, false) |
439 | } else { | |
440 | t.controlBuf.put(&cleanupStream{ | |
441 | streamID: se.StreamID, | |
442 | rst: true, | |
443 | rstCode: se.Code, | |
444 | onWrite: func() {}, | |
445 | }) | |
15c0b25d | 446 | } |
15c0b25d AP |
447 | continue |
448 | } | |
449 | if err == io.EOF || err == io.ErrUnexpectedEOF { | |
450 | t.Close() | |
451 | return | |
452 | } | |
453 | warningf("transport: http2Server.HandleStreams failed to read frame: %v", err) | |
454 | t.Close() | |
455 | return | |
456 | } | |
457 | switch frame := frame.(type) { | |
458 | case *http2.MetaHeadersFrame: | |
459 | if t.operateHeaders(frame, handle, traceCtx) { | |
460 | t.Close() | |
461 | break | |
462 | } | |
463 | case *http2.DataFrame: | |
464 | t.handleData(frame) | |
465 | case *http2.RSTStreamFrame: | |
466 | t.handleRSTStream(frame) | |
467 | case *http2.SettingsFrame: | |
468 | t.handleSettings(frame) | |
469 | case *http2.PingFrame: | |
470 | t.handlePing(frame) | |
471 | case *http2.WindowUpdateFrame: | |
472 | t.handleWindowUpdate(frame) | |
473 | case *http2.GoAwayFrame: | |
474 | // TODO: Handle GoAway from the client appropriately. | |
475 | default: | |
476 | errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame) | |
477 | } | |
478 | } | |
479 | } | |
480 | ||
481 | func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) { | |
482 | t.mu.Lock() | |
483 | defer t.mu.Unlock() | |
484 | if t.activeStreams == nil { | |
485 | // The transport is closing. | |
486 | return nil, false | |
487 | } | |
488 | s, ok := t.activeStreams[f.Header().StreamID] | |
489 | if !ok { | |
490 | // The stream is already done. | |
491 | return nil, false | |
492 | } | |
493 | return s, true | |
494 | } | |
495 | ||
496 | // adjustWindow sends out extra window update over the initial window size | |
497 | // of stream if the application is requesting data larger in size than | |
498 | // the window. | |
499 | func (t *http2Server) adjustWindow(s *Stream, n uint32) { | |
15c0b25d | 500 | if w := s.fc.maybeAdjust(n); w > 0 { |
107c1cdb | 501 | t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w}) |
15c0b25d | 502 | } |
107c1cdb | 503 | |
15c0b25d AP |
504 | } |
505 | ||
506 | // updateWindow adjusts the inbound quota for the stream and the transport. | |
507 | // Window updates will deliver to the controller for sending when | |
508 | // the cumulative quota exceeds the corresponding threshold. | |
509 | func (t *http2Server) updateWindow(s *Stream, n uint32) { | |
15c0b25d | 510 | if w := s.fc.onRead(n); w > 0 { |
107c1cdb ND |
511 | t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, |
512 | increment: w, | |
513 | }) | |
15c0b25d AP |
514 | } |
515 | } | |
516 | ||
517 | // updateFlowControl updates the incoming flow control windows | |
518 | // for the transport and the stream based on the current bdp | |
519 | // estimation. | |
520 | func (t *http2Server) updateFlowControl(n uint32) { | |
521 | t.mu.Lock() | |
522 | for _, s := range t.activeStreams { | |
523 | s.fc.newLimit(n) | |
524 | } | |
525 | t.initialWindowSize = int32(n) | |
526 | t.mu.Unlock() | |
107c1cdb ND |
527 | t.controlBuf.put(&outgoingWindowUpdate{ |
528 | streamID: 0, | |
529 | increment: t.fc.newLimit(n), | |
530 | }) | |
531 | t.controlBuf.put(&outgoingSettings{ | |
15c0b25d AP |
532 | ss: []http2.Setting{ |
533 | { | |
534 | ID: http2.SettingInitialWindowSize, | |
107c1cdb | 535 | Val: n, |
15c0b25d AP |
536 | }, |
537 | }, | |
538 | }) | |
539 | ||
540 | } | |
541 | ||
542 | func (t *http2Server) handleData(f *http2.DataFrame) { | |
543 | size := f.Header().Length | |
544 | var sendBDPPing bool | |
545 | if t.bdpEst != nil { | |
107c1cdb | 546 | sendBDPPing = t.bdpEst.add(size) |
15c0b25d AP |
547 | } |
548 | // Decouple connection's flow control from application's read. | |
549 | // An update on connection's flow control should not depend on | |
550 | // whether user application has read the data or not. Such a | |
551 | // restriction is already imposed on the stream's flow control, | |
552 | // and therefore the sender will be blocked anyways. | |
553 | // Decoupling the connection flow control will prevent other | |
554 | // active(fast) streams from starving in presence of slow or | |
555 | // inactive streams. | |
107c1cdb ND |
556 | if w := t.fc.onData(size); w > 0 { |
557 | t.controlBuf.put(&outgoingWindowUpdate{ | |
558 | streamID: 0, | |
559 | increment: w, | |
560 | }) | |
561 | } | |
15c0b25d | 562 | if sendBDPPing { |
107c1cdb ND |
563 | // Avoid excessive ping detection (e.g. in an L7 proxy) |
564 | // by sending a window update prior to the BDP ping. | |
565 | if w := t.fc.reset(); w > 0 { | |
566 | t.controlBuf.put(&outgoingWindowUpdate{ | |
567 | streamID: 0, | |
568 | increment: w, | |
569 | }) | |
15c0b25d | 570 | } |
107c1cdb | 571 | t.controlBuf.put(bdpPing) |
15c0b25d AP |
572 | } |
573 | // Select the right stream to dispatch. | |
574 | s, ok := t.getStream(f) | |
575 | if !ok { | |
576 | return | |
577 | } | |
578 | if size > 0 { | |
107c1cdb ND |
579 | if err := s.fc.onData(size); err != nil { |
580 | t.closeStream(s, true, http2.ErrCodeFlowControl, nil, false) | |
15c0b25d AP |
581 | return |
582 | } | |
583 | if f.Header().Flags.Has(http2.FlagDataPadded) { | |
107c1cdb ND |
584 | if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 { |
585 | t.controlBuf.put(&outgoingWindowUpdate{s.id, w}) | |
15c0b25d AP |
586 | } |
587 | } | |
15c0b25d AP |
588 | // TODO(bradfitz, zhaoq): A copy is required here because there is no |
589 | // guarantee f.Data() is consumed before the arrival of next frame. | |
590 | // Can this copy be eliminated? | |
591 | if len(f.Data()) > 0 { | |
592 | data := make([]byte, len(f.Data())) | |
593 | copy(data, f.Data()) | |
594 | s.write(recvMsg{data: data}) | |
595 | } | |
596 | } | |
597 | if f.Header().Flags.Has(http2.FlagDataEndStream) { | |
598 | // Received the end of stream from the client. | |
107c1cdb | 599 | s.compareAndSwapState(streamActive, streamReadDone) |
15c0b25d AP |
600 | s.write(recvMsg{err: io.EOF}) |
601 | } | |
602 | } | |
603 | ||
604 | func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) { | |
605 | s, ok := t.getStream(f) | |
606 | if !ok { | |
607 | return | |
608 | } | |
107c1cdb | 609 | t.closeStream(s, false, 0, nil, false) |
15c0b25d AP |
610 | } |
611 | ||
612 | func (t *http2Server) handleSettings(f *http2.SettingsFrame) { | |
613 | if f.IsAck() { | |
614 | return | |
615 | } | |
616 | var ss []http2.Setting | |
107c1cdb | 617 | var updateFuncs []func() |
15c0b25d | 618 | f.ForeachSetting(func(s http2.Setting) error { |
107c1cdb ND |
619 | switch s.ID { |
620 | case http2.SettingMaxHeaderListSize: | |
621 | updateFuncs = append(updateFuncs, func() { | |
622 | t.maxSendHeaderListSize = new(uint32) | |
623 | *t.maxSendHeaderListSize = s.Val | |
624 | }) | |
625 | default: | |
626 | ss = append(ss, s) | |
627 | } | |
15c0b25d AP |
628 | return nil |
629 | }) | |
107c1cdb ND |
630 | t.controlBuf.executeAndPut(func(interface{}) bool { |
631 | for _, f := range updateFuncs { | |
632 | f() | |
633 | } | |
634 | return true | |
635 | }, &incomingSettings{ | |
636 | ss: ss, | |
637 | }) | |
15c0b25d AP |
638 | } |
639 | ||
640 | const ( | |
641 | maxPingStrikes = 2 | |
642 | defaultPingTimeout = 2 * time.Hour | |
643 | ) | |
644 | ||
645 | func (t *http2Server) handlePing(f *http2.PingFrame) { | |
646 | if f.IsAck() { | |
647 | if f.Data == goAwayPing.data && t.drainChan != nil { | |
648 | close(t.drainChan) | |
649 | return | |
650 | } | |
651 | // Maybe it's a BDP ping. | |
652 | if t.bdpEst != nil { | |
653 | t.bdpEst.calculate(f.Data) | |
654 | } | |
655 | return | |
656 | } | |
657 | pingAck := &ping{ack: true} | |
658 | copy(pingAck.data[:], f.Data[:]) | |
659 | t.controlBuf.put(pingAck) | |
660 | ||
661 | now := time.Now() | |
662 | defer func() { | |
663 | t.lastPingAt = now | |
664 | }() | |
665 | // A reset ping strikes means that we don't need to check for policy | |
666 | // violation for this ping and the pingStrikes counter should be set | |
667 | // to 0. | |
668 | if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) { | |
669 | t.pingStrikes = 0 | |
670 | return | |
671 | } | |
672 | t.mu.Lock() | |
673 | ns := len(t.activeStreams) | |
674 | t.mu.Unlock() | |
675 | if ns < 1 && !t.kep.PermitWithoutStream { | |
676 | // Keepalive shouldn't be active thus, this new ping should | |
107c1cdb | 677 | // have come after at least defaultPingTimeout. |
15c0b25d AP |
678 | if t.lastPingAt.Add(defaultPingTimeout).After(now) { |
679 | t.pingStrikes++ | |
680 | } | |
681 | } else { | |
682 | // Check if keepalive policy is respected. | |
683 | if t.lastPingAt.Add(t.kep.MinTime).After(now) { | |
684 | t.pingStrikes++ | |
685 | } | |
686 | } | |
687 | ||
688 | if t.pingStrikes > maxPingStrikes { | |
689 | // Send goaway and close the connection. | |
107c1cdb | 690 | errorf("transport: Got too many pings from the client, closing the connection.") |
15c0b25d AP |
691 | t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true}) |
692 | } | |
693 | } | |
694 | ||
695 | func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) { | |
107c1cdb ND |
696 | t.controlBuf.put(&incomingWindowUpdate{ |
697 | streamID: f.Header().StreamID, | |
698 | increment: f.Increment, | |
699 | }) | |
15c0b25d AP |
700 | } |
701 | ||
107c1cdb ND |
702 | func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField { |
703 | for k, vv := range md { | |
704 | if isReservedHeader(k) { | |
705 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. | |
706 | continue | |
15c0b25d | 707 | } |
107c1cdb ND |
708 | for _, v := range vv { |
709 | headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) | |
15c0b25d | 710 | } |
107c1cdb ND |
711 | } |
712 | return headerFields | |
713 | } | |
714 | ||
715 | func (t *http2Server) checkForHeaderListSize(it interface{}) bool { | |
716 | if t.maxSendHeaderListSize == nil { | |
717 | return true | |
718 | } | |
719 | hdrFrame := it.(*headerFrame) | |
720 | var sz int64 | |
721 | for _, f := range hdrFrame.hf { | |
722 | if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) { | |
723 | errorf("header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize) | |
724 | return false | |
15c0b25d AP |
725 | } |
726 | } | |
107c1cdb | 727 | return true |
15c0b25d AP |
728 | } |
729 | ||
730 | // WriteHeader sends the header metedata md back to the client. | |
731 | func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { | |
107c1cdb | 732 | if s.updateHeaderSent() || s.getState() == streamDone { |
15c0b25d AP |
733 | return ErrIllegalHeaderWrite |
734 | } | |
107c1cdb | 735 | s.hdrMu.Lock() |
15c0b25d AP |
736 | if md.Len() > 0 { |
737 | if s.header.Len() > 0 { | |
738 | s.header = metadata.Join(s.header, md) | |
739 | } else { | |
740 | s.header = md | |
741 | } | |
742 | } | |
107c1cdb ND |
743 | if err := t.writeHeaderLocked(s); err != nil { |
744 | s.hdrMu.Unlock() | |
15c0b25d AP |
745 | return err |
746 | } | |
107c1cdb ND |
747 | s.hdrMu.Unlock() |
748 | return nil | |
749 | } | |
750 | ||
751 | func (t *http2Server) writeHeaderLocked(s *Stream) error { | |
752 | // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields | |
753 | // first and create a slice of that exact size. | |
754 | headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else. | |
755 | headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) | |
756 | headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)}) | |
15c0b25d | 757 | if s.sendCompress != "" { |
107c1cdb ND |
758 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress}) |
759 | } | |
760 | headerFields = appendHeaderFieldsFromMD(headerFields, s.header) | |
761 | success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{ | |
762 | streamID: s.id, | |
763 | hf: headerFields, | |
764 | endStream: false, | |
765 | onWrite: func() { | |
766 | atomic.StoreUint32(&t.resetPingStrikes, 1) | |
767 | }, | |
768 | }) | |
769 | if !success { | |
770 | if err != nil { | |
771 | return err | |
15c0b25d | 772 | } |
107c1cdb ND |
773 | t.closeStream(s, true, http2.ErrCodeInternal, nil, false) |
774 | return ErrHeaderListSizeLimitViolation | |
15c0b25d AP |
775 | } |
776 | if t.stats != nil { | |
107c1cdb ND |
777 | // Note: WireLength is not set in outHeader. |
778 | // TODO(mmukhi): Revisit this later, if needed. | |
779 | outHeader := &stats.OutHeader{} | |
15c0b25d AP |
780 | t.stats.HandleRPC(s.Context(), outHeader) |
781 | } | |
15c0b25d AP |
782 | return nil |
783 | } | |
784 | ||
785 | // WriteStatus sends stream status to the client and terminates the stream. | |
786 | // There is no further I/O operations being able to perform on this stream. | |
787 | // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early | |
788 | // OK is adopted. | |
789 | func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { | |
107c1cdb | 790 | if s.getState() == streamDone { |
15c0b25d AP |
791 | return nil |
792 | } | |
107c1cdb ND |
793 | s.hdrMu.Lock() |
794 | // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields | |
795 | // first and create a slice of that exact size. | |
796 | headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else. | |
797 | if !s.updateHeaderSent() { // No headers have been sent. | |
798 | if len(s.header) > 0 { // Send a separate header frame. | |
799 | if err := t.writeHeaderLocked(s); err != nil { | |
800 | s.hdrMu.Unlock() | |
801 | return err | |
802 | } | |
803 | } else { // Send a trailer only response. | |
804 | headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) | |
805 | headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)}) | |
806 | } | |
15c0b25d | 807 | } |
107c1cdb ND |
808 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))}) |
809 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())}) | |
15c0b25d AP |
810 | |
811 | if p := st.Proto(); p != nil && len(p.Details) > 0 { | |
812 | stBytes, err := proto.Marshal(p) | |
813 | if err != nil { | |
814 | // TODO: return error instead, when callers are able to handle it. | |
107c1cdb ND |
815 | grpclog.Errorf("transport: failed to marshal rpc status: %v, error: %v", p, err) |
816 | } else { | |
817 | headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status-details-bin", Value: encodeBinHeader(stBytes)}) | |
15c0b25d | 818 | } |
15c0b25d AP |
819 | } |
820 | ||
821 | // Attach the trailer metadata. | |
107c1cdb ND |
822 | headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer) |
823 | trailingHeader := &headerFrame{ | |
824 | streamID: s.id, | |
825 | hf: headerFields, | |
826 | endStream: true, | |
827 | onWrite: func() { | |
828 | atomic.StoreUint32(&t.resetPingStrikes, 1) | |
829 | }, | |
15c0b25d | 830 | } |
107c1cdb ND |
831 | s.hdrMu.Unlock() |
832 | success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader) | |
833 | if !success { | |
834 | if err != nil { | |
835 | return err | |
836 | } | |
837 | t.closeStream(s, true, http2.ErrCodeInternal, nil, false) | |
838 | return ErrHeaderListSizeLimitViolation | |
15c0b25d | 839 | } |
107c1cdb | 840 | t.closeStream(s, false, 0, trailingHeader, true) |
15c0b25d | 841 | if t.stats != nil { |
107c1cdb | 842 | t.stats.HandleRPC(s.Context(), &stats.OutTrailer{}) |
15c0b25d | 843 | } |
15c0b25d AP |
844 | return nil |
845 | } | |
846 | ||
847 | // Write converts the data into HTTP2 data frame and sends it out. Non-nil error | |
848 | // is returns if it fails (e.g., framing error, transport error). | |
107c1cdb ND |
849 | func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { |
850 | if !s.isHeaderSent() { // Headers haven't been written yet. | |
851 | if err := t.WriteHeader(s, nil); err != nil { | |
852 | // TODO(mmukhi, dfawley): Make sure this is the right code to return. | |
853 | return status.Errorf(codes.Internal, "transport: %v", err) | |
15c0b25d | 854 | } |
107c1cdb ND |
855 | } else { |
856 | // Writing headers checks for this condition. | |
857 | if s.getState() == streamDone { | |
858 | // TODO(mmukhi, dfawley): Should the server write also return io.EOF? | |
859 | s.cancel() | |
860 | select { | |
861 | case <-t.ctx.Done(): | |
862 | return ErrConnClosing | |
863 | default: | |
15c0b25d | 864 | } |
15c0b25d | 865 | return ContextErr(s.ctx.Err()) |
15c0b25d | 866 | } |
15c0b25d | 867 | } |
107c1cdb ND |
868 | // Add some data to header frame so that we can equally distribute bytes across frames. |
869 | emptyLen := http2MaxFrameLen - len(hdr) | |
870 | if emptyLen > len(data) { | |
871 | emptyLen = len(data) | |
872 | } | |
873 | hdr = append(hdr, data[:emptyLen]...) | |
874 | data = data[emptyLen:] | |
875 | df := &dataFrame{ | |
876 | streamID: s.id, | |
877 | h: hdr, | |
878 | d: data, | |
879 | onEachWrite: func() { | |
880 | atomic.StoreUint32(&t.resetPingStrikes, 1) | |
881 | }, | |
882 | } | |
883 | if err := s.wq.get(int32(len(hdr) + len(data))); err != nil { | |
884 | select { | |
885 | case <-t.ctx.Done(): | |
886 | return ErrConnClosing | |
887 | default: | |
15c0b25d | 888 | } |
107c1cdb | 889 | return ContextErr(s.ctx.Err()) |
15c0b25d | 890 | } |
107c1cdb | 891 | return t.controlBuf.put(df) |
15c0b25d AP |
892 | } |
893 | ||
894 | // keepalive running in a separate goroutine does the following: | |
895 | // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle. | |
896 | // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge. | |
897 | // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge. | |
898 | // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection | |
899 | // after an additional duration of keepalive.Timeout. | |
900 | func (t *http2Server) keepalive() { | |
901 | p := &ping{} | |
902 | var pingSent bool | |
903 | maxIdle := time.NewTimer(t.kp.MaxConnectionIdle) | |
904 | maxAge := time.NewTimer(t.kp.MaxConnectionAge) | |
905 | keepalive := time.NewTimer(t.kp.Time) | |
906 | // NOTE: All exit paths of this function should reset their | |
107c1cdb | 907 | // respective timers. A failure to do so will cause the |
15c0b25d AP |
908 | // following clean-up to deadlock and eventually leak. |
909 | defer func() { | |
910 | if !maxIdle.Stop() { | |
911 | <-maxIdle.C | |
912 | } | |
913 | if !maxAge.Stop() { | |
914 | <-maxAge.C | |
915 | } | |
916 | if !keepalive.Stop() { | |
917 | <-keepalive.C | |
918 | } | |
919 | }() | |
920 | for { | |
921 | select { | |
922 | case <-maxIdle.C: | |
923 | t.mu.Lock() | |
924 | idle := t.idle | |
925 | if idle.IsZero() { // The connection is non-idle. | |
926 | t.mu.Unlock() | |
927 | maxIdle.Reset(t.kp.MaxConnectionIdle) | |
928 | continue | |
929 | } | |
930 | val := t.kp.MaxConnectionIdle - time.Since(idle) | |
931 | t.mu.Unlock() | |
932 | if val <= 0 { | |
933 | // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more. | |
934 | // Gracefully close the connection. | |
935 | t.drain(http2.ErrCodeNo, []byte{}) | |
107c1cdb | 936 | // Resetting the timer so that the clean-up doesn't deadlock. |
15c0b25d AP |
937 | maxIdle.Reset(infinity) |
938 | return | |
939 | } | |
940 | maxIdle.Reset(val) | |
941 | case <-maxAge.C: | |
942 | t.drain(http2.ErrCodeNo, []byte{}) | |
943 | maxAge.Reset(t.kp.MaxConnectionAgeGrace) | |
944 | select { | |
945 | case <-maxAge.C: | |
946 | // Close the connection after grace period. | |
947 | t.Close() | |
107c1cdb | 948 | // Resetting the timer so that the clean-up doesn't deadlock. |
15c0b25d | 949 | maxAge.Reset(infinity) |
107c1cdb | 950 | case <-t.ctx.Done(): |
15c0b25d AP |
951 | } |
952 | return | |
953 | case <-keepalive.C: | |
954 | if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { | |
955 | pingSent = false | |
956 | keepalive.Reset(t.kp.Time) | |
957 | continue | |
958 | } | |
959 | if pingSent { | |
960 | t.Close() | |
107c1cdb | 961 | // Resetting the timer so that the clean-up doesn't deadlock. |
15c0b25d AP |
962 | keepalive.Reset(infinity) |
963 | return | |
964 | } | |
965 | pingSent = true | |
107c1cdb ND |
966 | if channelz.IsOn() { |
967 | atomic.AddInt64(&t.czData.kpCount, 1) | |
968 | } | |
15c0b25d AP |
969 | t.controlBuf.put(p) |
970 | keepalive.Reset(t.kp.Timeout) | |
107c1cdb | 971 | case <-t.ctx.Done(): |
15c0b25d AP |
972 | return |
973 | } | |
974 | } | |
975 | } | |
976 | ||
977 | // Close starts shutting down the http2Server transport. | |
978 | // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This | |
979 | // could cause some resource issue. Revisit this later. | |
107c1cdb | 980 | func (t *http2Server) Close() error { |
15c0b25d AP |
981 | t.mu.Lock() |
982 | if t.state == closing { | |
983 | t.mu.Unlock() | |
984 | return errors.New("transport: Close() was already called") | |
985 | } | |
986 | t.state = closing | |
987 | streams := t.activeStreams | |
988 | t.activeStreams = nil | |
989 | t.mu.Unlock() | |
107c1cdb ND |
990 | t.controlBuf.finish() |
991 | t.cancel() | |
992 | err := t.conn.Close() | |
993 | if channelz.IsOn() { | |
994 | channelz.RemoveEntry(t.channelzID) | |
995 | } | |
15c0b25d AP |
996 | // Cancel all active streams. |
997 | for _, s := range streams { | |
998 | s.cancel() | |
999 | } | |
1000 | if t.stats != nil { | |
1001 | connEnd := &stats.ConnEnd{} | |
1002 | t.stats.HandleConn(t.ctx, connEnd) | |
1003 | } | |
107c1cdb | 1004 | return err |
15c0b25d AP |
1005 | } |
1006 | ||
1007 | // closeStream clears the footprint of a stream when the stream is not needed | |
1008 | // any more. | |
107c1cdb ND |
1009 | func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) { |
1010 | if s.swapState(streamDone) == streamDone { | |
1011 | // If the stream was already done, return. | |
1012 | return | |
15c0b25d | 1013 | } |
15c0b25d AP |
1014 | // In case stream sending and receiving are invoked in separate |
1015 | // goroutines (e.g., bi-directional streaming), cancel needs to be | |
1016 | // called to interrupt the potential blocking on other goroutines. | |
1017 | s.cancel() | |
107c1cdb ND |
1018 | cleanup := &cleanupStream{ |
1019 | streamID: s.id, | |
1020 | rst: rst, | |
1021 | rstCode: rstCode, | |
1022 | onWrite: func() { | |
1023 | t.mu.Lock() | |
1024 | if t.activeStreams != nil { | |
1025 | delete(t.activeStreams, s.id) | |
1026 | if len(t.activeStreams) == 0 { | |
1027 | t.idle = time.Now() | |
1028 | } | |
1029 | } | |
1030 | t.mu.Unlock() | |
1031 | if channelz.IsOn() { | |
1032 | if eosReceived { | |
1033 | atomic.AddInt64(&t.czData.streamsSucceeded, 1) | |
1034 | } else { | |
1035 | atomic.AddInt64(&t.czData.streamsFailed, 1) | |
1036 | } | |
1037 | } | |
1038 | }, | |
1039 | } | |
1040 | if hdr != nil { | |
1041 | hdr.cleanup = cleanup | |
1042 | t.controlBuf.put(hdr) | |
1043 | } else { | |
1044 | t.controlBuf.put(cleanup) | |
15c0b25d | 1045 | } |
15c0b25d AP |
1046 | } |
1047 | ||
1048 | func (t *http2Server) RemoteAddr() net.Addr { | |
1049 | return t.remoteAddr | |
1050 | } | |
1051 | ||
1052 | func (t *http2Server) Drain() { | |
1053 | t.drain(http2.ErrCodeNo, []byte{}) | |
1054 | } | |
1055 | ||
1056 | func (t *http2Server) drain(code http2.ErrCode, debugData []byte) { | |
1057 | t.mu.Lock() | |
1058 | defer t.mu.Unlock() | |
1059 | if t.drainChan != nil { | |
1060 | return | |
1061 | } | |
1062 | t.drainChan = make(chan struct{}) | |
1063 | t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true}) | |
1064 | } | |
1065 | ||
107c1cdb ND |
1066 | var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}} |
1067 | ||
1068 | // Handles outgoing GoAway and returns true if loopy needs to put itself | |
1069 | // in draining mode. | |
1070 | func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) { | |
1071 | t.mu.Lock() | |
1072 | if t.state == closing { // TODO(mmukhi): This seems unnecessary. | |
1073 | t.mu.Unlock() | |
1074 | // The transport is closing. | |
1075 | return false, ErrConnClosing | |
1076 | } | |
1077 | sid := t.maxStreamID | |
1078 | if !g.headsUp { | |
1079 | // Stop accepting more streams now. | |
1080 | t.state = draining | |
1081 | if len(t.activeStreams) == 0 { | |
1082 | g.closeConn = true | |
1083 | } | |
1084 | t.mu.Unlock() | |
1085 | if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil { | |
1086 | return false, err | |
1087 | } | |
1088 | if g.closeConn { | |
1089 | // Abruptly close the connection following the GoAway (via | |
1090 | // loopywriter). But flush out what's inside the buffer first. | |
1091 | t.framer.writer.Flush() | |
1092 | return false, fmt.Errorf("transport: Connection closing") | |
1093 | } | |
1094 | return true, nil | |
1095 | } | |
1096 | t.mu.Unlock() | |
1097 | // For a graceful close, send out a GoAway with stream ID of MaxUInt32, | |
1098 | // Follow that with a ping and wait for the ack to come back or a timer | |
1099 | // to expire. During this time accept new streams since they might have | |
1100 | // originated before the GoAway reaches the client. | |
1101 | // After getting the ack or timer expiration send out another GoAway this | |
1102 | // time with an ID of the max stream server intends to process. | |
1103 | if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil { | |
1104 | return false, err | |
1105 | } | |
1106 | if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil { | |
1107 | return false, err | |
1108 | } | |
1109 | go func() { | |
1110 | timer := time.NewTimer(time.Minute) | |
1111 | defer timer.Stop() | |
1112 | select { | |
1113 | case <-t.drainChan: | |
1114 | case <-timer.C: | |
1115 | case <-t.ctx.Done(): | |
1116 | return | |
1117 | } | |
1118 | t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData}) | |
1119 | }() | |
1120 | return false, nil | |
1121 | } | |
1122 | ||
1123 | func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric { | |
1124 | s := channelz.SocketInternalMetric{ | |
1125 | StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted), | |
1126 | StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded), | |
1127 | StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed), | |
1128 | MessagesSent: atomic.LoadInt64(&t.czData.msgSent), | |
1129 | MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv), | |
1130 | KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount), | |
1131 | LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)), | |
1132 | LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)), | |
1133 | LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)), | |
1134 | LocalFlowControlWindow: int64(t.fc.getSize()), | |
1135 | SocketOptions: channelz.GetSocketOption(t.conn), | |
1136 | LocalAddr: t.localAddr, | |
1137 | RemoteAddr: t.remoteAddr, | |
1138 | // RemoteName : | |
1139 | } | |
1140 | if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok { | |
1141 | s.Security = au.GetSecurityValue() | |
1142 | } | |
1143 | s.RemoteFlowControlWindow = t.getOutFlowWindow() | |
1144 | return &s | |
1145 | } | |
1146 | ||
1147 | func (t *http2Server) IncrMsgSent() { | |
1148 | atomic.AddInt64(&t.czData.msgSent, 1) | |
1149 | atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano()) | |
1150 | } | |
1151 | ||
1152 | func (t *http2Server) IncrMsgRecv() { | |
1153 | atomic.AddInt64(&t.czData.msgRecv, 1) | |
1154 | atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano()) | |
1155 | } | |
1156 | ||
1157 | func (t *http2Server) getOutFlowWindow() int64 { | |
1158 | resp := make(chan uint32) | |
1159 | timer := time.NewTimer(time.Second) | |
1160 | defer timer.Stop() | |
1161 | t.controlBuf.put(&outFlowControlSizeRequest{resp}) | |
1162 | select { | |
1163 | case sz := <-resp: | |
1164 | return int64(sz) | |
1165 | case <-t.ctxDone: | |
1166 | return -1 | |
1167 | case <-timer.C: | |
1168 | return -2 | |
1169 | } | |
1170 | } | |
15c0b25d AP |
1171 | |
1172 | func getJitter(v time.Duration) time.Duration { | |
1173 | if v == infinity { | |
1174 | return 0 | |
1175 | } | |
1176 | // Generate a jitter between +/- 10% of the value. | |
1177 | r := int64(v / 10) | |
107c1cdb | 1178 | j := grpcrand.Int63n(2*r) - r |
15c0b25d AP |
1179 | return time.Duration(j) |
1180 | } |