]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | /* |
2 | * | |
3 | * Copyright 2016 gRPC authors. | |
4 | * | |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | * | |
17 | */ | |
18 | ||
19 | // This file is the implementation of a gRPC server using HTTP/2 which | |
20 | // uses the standard Go http2 Server implementation (via the | |
21 | // http.Handler interface), rather than speaking low-level HTTP/2 | |
22 | // frames itself. It is the implementation of *grpc.Server.ServeHTTP. | |
23 | ||
24 | package transport | |
25 | ||
26 | import ( | |
107c1cdb | 27 | "context" |
15c0b25d AP |
28 | "errors" |
29 | "fmt" | |
30 | "io" | |
31 | "net" | |
32 | "net/http" | |
33 | "strings" | |
34 | "sync" | |
35 | "time" | |
36 | ||
107c1cdb | 37 | "github.com/golang/protobuf/proto" |
15c0b25d AP |
38 | "golang.org/x/net/http2" |
39 | "google.golang.org/grpc/codes" | |
40 | "google.golang.org/grpc/credentials" | |
41 | "google.golang.org/grpc/metadata" | |
42 | "google.golang.org/grpc/peer" | |
107c1cdb | 43 | "google.golang.org/grpc/stats" |
15c0b25d AP |
44 | "google.golang.org/grpc/status" |
45 | ) | |
46 | ||
47 | // NewServerHandlerTransport returns a ServerTransport handling gRPC | |
48 | // from inside an http.Handler. It requires that the http Server | |
49 | // supports HTTP/2. | |
107c1cdb | 50 | func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler) (ServerTransport, error) { |
15c0b25d AP |
51 | if r.ProtoMajor != 2 { |
52 | return nil, errors.New("gRPC requires HTTP/2") | |
53 | } | |
54 | if r.Method != "POST" { | |
55 | return nil, errors.New("invalid gRPC request method") | |
56 | } | |
107c1cdb ND |
57 | contentType := r.Header.Get("Content-Type") |
58 | // TODO: do we assume contentType is lowercase? we did before | |
59 | contentSubtype, validContentType := contentSubtype(contentType) | |
60 | if !validContentType { | |
15c0b25d AP |
61 | return nil, errors.New("invalid gRPC request content-type") |
62 | } | |
63 | if _, ok := w.(http.Flusher); !ok { | |
64 | return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher") | |
65 | } | |
66 | if _, ok := w.(http.CloseNotifier); !ok { | |
67 | return nil, errors.New("gRPC requires a ResponseWriter supporting http.CloseNotifier") | |
68 | } | |
69 | ||
70 | st := &serverHandlerTransport{ | |
107c1cdb ND |
71 | rw: w, |
72 | req: r, | |
73 | closedCh: make(chan struct{}), | |
74 | writes: make(chan func()), | |
75 | contentType: contentType, | |
76 | contentSubtype: contentSubtype, | |
77 | stats: stats, | |
15c0b25d AP |
78 | } |
79 | ||
80 | if v := r.Header.Get("grpc-timeout"); v != "" { | |
81 | to, err := decodeTimeout(v) | |
82 | if err != nil { | |
107c1cdb | 83 | return nil, status.Errorf(codes.Internal, "malformed time-out: %v", err) |
15c0b25d AP |
84 | } |
85 | st.timeoutSet = true | |
86 | st.timeout = to | |
87 | } | |
88 | ||
107c1cdb | 89 | metakv := []string{"content-type", contentType} |
15c0b25d AP |
90 | if r.Host != "" { |
91 | metakv = append(metakv, ":authority", r.Host) | |
92 | } | |
93 | for k, vv := range r.Header { | |
94 | k = strings.ToLower(k) | |
107c1cdb | 95 | if isReservedHeader(k) && !isWhitelistedHeader(k) { |
15c0b25d AP |
96 | continue |
97 | } | |
98 | for _, v := range vv { | |
99 | v, err := decodeMetadataHeader(k, v) | |
100 | if err != nil { | |
107c1cdb | 101 | return nil, status.Errorf(codes.Internal, "malformed binary metadata: %v", err) |
15c0b25d AP |
102 | } |
103 | metakv = append(metakv, k, v) | |
104 | } | |
105 | } | |
106 | st.headerMD = metadata.Pairs(metakv...) | |
107 | ||
108 | return st, nil | |
109 | } | |
110 | ||
111 | // serverHandlerTransport is an implementation of ServerTransport | |
112 | // which replies to exactly one gRPC request (exactly one HTTP request), | |
113 | // using the net/http.Handler interface. This http.Handler is guaranteed | |
114 | // at this point to be speaking over HTTP/2, so it's able to speak valid | |
115 | // gRPC. | |
116 | type serverHandlerTransport struct { | |
117 | rw http.ResponseWriter | |
118 | req *http.Request | |
119 | timeoutSet bool | |
120 | timeout time.Duration | |
121 | didCommonHeaders bool | |
122 | ||
123 | headerMD metadata.MD | |
124 | ||
125 | closeOnce sync.Once | |
126 | closedCh chan struct{} // closed on Close | |
127 | ||
128 | // writes is a channel of code to run serialized in the | |
129 | // ServeHTTP (HandleStreams) goroutine. The channel is closed | |
130 | // when WriteStatus is called. | |
131 | writes chan func() | |
107c1cdb ND |
132 | |
133 | // block concurrent WriteStatus calls | |
134 | // e.g. grpc/(*serverStream).SendMsg/RecvMsg | |
135 | writeStatusMu sync.Mutex | |
136 | ||
137 | // we just mirror the request content-type | |
138 | contentType string | |
139 | // we store both contentType and contentSubtype so we don't keep recreating them | |
140 | // TODO make sure this is consistent across handler_server and http2_server | |
141 | contentSubtype string | |
142 | ||
143 | stats stats.Handler | |
15c0b25d AP |
144 | } |
145 | ||
146 | func (ht *serverHandlerTransport) Close() error { | |
147 | ht.closeOnce.Do(ht.closeCloseChanOnce) | |
148 | return nil | |
149 | } | |
150 | ||
151 | func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) } | |
152 | ||
153 | func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) } | |
154 | ||
155 | // strAddr is a net.Addr backed by either a TCP "ip:port" string, or | |
156 | // the empty string if unknown. | |
157 | type strAddr string | |
158 | ||
159 | func (a strAddr) Network() string { | |
160 | if a != "" { | |
161 | // Per the documentation on net/http.Request.RemoteAddr, if this is | |
162 | // set, it's set to the IP:port of the peer (hence, TCP): | |
163 | // https://golang.org/pkg/net/http/#Request | |
164 | // | |
165 | // If we want to support Unix sockets later, we can | |
166 | // add our own grpc-specific convention within the | |
167 | // grpc codebase to set RemoteAddr to a different | |
168 | // format, or probably better: we can attach it to the | |
169 | // context and use that from serverHandlerTransport.RemoteAddr. | |
170 | return "tcp" | |
171 | } | |
172 | return "" | |
173 | } | |
174 | ||
175 | func (a strAddr) String() string { return string(a) } | |
176 | ||
177 | // do runs fn in the ServeHTTP goroutine. | |
178 | func (ht *serverHandlerTransport) do(fn func()) error { | |
179 | // Avoid a panic writing to closed channel. Imperfect but maybe good enough. | |
180 | select { | |
181 | case <-ht.closedCh: | |
182 | return ErrConnClosing | |
183 | default: | |
184 | select { | |
185 | case ht.writes <- fn: | |
186 | return nil | |
187 | case <-ht.closedCh: | |
188 | return ErrConnClosing | |
189 | } | |
15c0b25d AP |
190 | } |
191 | } | |
192 | ||
193 | func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error { | |
107c1cdb ND |
194 | ht.writeStatusMu.Lock() |
195 | defer ht.writeStatusMu.Unlock() | |
196 | ||
15c0b25d AP |
197 | err := ht.do(func() { |
198 | ht.writeCommonHeaders(s) | |
199 | ||
200 | // And flush, in case no header or body has been sent yet. | |
201 | // This forces a separation of headers and trailers if this is the | |
202 | // first call (for example, in end2end tests's TestNoService). | |
203 | ht.rw.(http.Flusher).Flush() | |
204 | ||
205 | h := ht.rw.Header() | |
206 | h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code())) | |
207 | if m := st.Message(); m != "" { | |
208 | h.Set("Grpc-Message", encodeGrpcMessage(m)) | |
209 | } | |
210 | ||
107c1cdb ND |
211 | if p := st.Proto(); p != nil && len(p.Details) > 0 { |
212 | stBytes, err := proto.Marshal(p) | |
213 | if err != nil { | |
214 | // TODO: return error instead, when callers are able to handle it. | |
215 | panic(err) | |
216 | } | |
217 | ||
218 | h.Set("Grpc-Status-Details-Bin", encodeBinHeader(stBytes)) | |
219 | } | |
15c0b25d AP |
220 | |
221 | if md := s.Trailer(); len(md) > 0 { | |
222 | for k, vv := range md { | |
223 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. | |
224 | if isReservedHeader(k) { | |
225 | continue | |
226 | } | |
227 | for _, v := range vv { | |
228 | // http2 ResponseWriter mechanism to send undeclared Trailers after | |
229 | // the headers have possibly been written. | |
230 | h.Add(http2.TrailerPrefix+k, encodeMetadataHeader(k, v)) | |
231 | } | |
232 | } | |
233 | } | |
234 | }) | |
107c1cdb ND |
235 | |
236 | if err == nil { // transport has not been closed | |
237 | if ht.stats != nil { | |
238 | ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{}) | |
239 | } | |
240 | close(ht.writes) | |
241 | } | |
242 | ht.Close() | |
15c0b25d AP |
243 | return err |
244 | } | |
245 | ||
246 | // writeCommonHeaders sets common headers on the first write | |
247 | // call (Write, WriteHeader, or WriteStatus). | |
248 | func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) { | |
249 | if ht.didCommonHeaders { | |
250 | return | |
251 | } | |
252 | ht.didCommonHeaders = true | |
253 | ||
254 | h := ht.rw.Header() | |
255 | h["Date"] = nil // suppress Date to make tests happy; TODO: restore | |
107c1cdb | 256 | h.Set("Content-Type", ht.contentType) |
15c0b25d AP |
257 | |
258 | // Predeclare trailers we'll set later in WriteStatus (after the body). | |
259 | // This is a SHOULD in the HTTP RFC, and the way you add (known) | |
260 | // Trailers per the net/http.ResponseWriter contract. | |
261 | // See https://golang.org/pkg/net/http/#ResponseWriter | |
262 | // and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers | |
263 | h.Add("Trailer", "Grpc-Status") | |
264 | h.Add("Trailer", "Grpc-Message") | |
107c1cdb | 265 | h.Add("Trailer", "Grpc-Status-Details-Bin") |
15c0b25d AP |
266 | |
267 | if s.sendCompress != "" { | |
268 | h.Set("Grpc-Encoding", s.sendCompress) | |
269 | } | |
270 | } | |
271 | ||
107c1cdb | 272 | func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { |
15c0b25d AP |
273 | return ht.do(func() { |
274 | ht.writeCommonHeaders(s) | |
107c1cdb | 275 | ht.rw.Write(hdr) |
15c0b25d | 276 | ht.rw.Write(data) |
107c1cdb | 277 | ht.rw.(http.Flusher).Flush() |
15c0b25d AP |
278 | }) |
279 | } | |
280 | ||
281 | func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error { | |
107c1cdb | 282 | err := ht.do(func() { |
15c0b25d AP |
283 | ht.writeCommonHeaders(s) |
284 | h := ht.rw.Header() | |
285 | for k, vv := range md { | |
286 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. | |
287 | if isReservedHeader(k) { | |
288 | continue | |
289 | } | |
290 | for _, v := range vv { | |
291 | v = encodeMetadataHeader(k, v) | |
292 | h.Add(k, v) | |
293 | } | |
294 | } | |
295 | ht.rw.WriteHeader(200) | |
296 | ht.rw.(http.Flusher).Flush() | |
297 | }) | |
107c1cdb ND |
298 | |
299 | if err == nil { | |
300 | if ht.stats != nil { | |
301 | ht.stats.HandleRPC(s.Context(), &stats.OutHeader{}) | |
302 | } | |
303 | } | |
304 | return err | |
15c0b25d AP |
305 | } |
306 | ||
307 | func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) { | |
308 | // With this transport type there will be exactly 1 stream: this HTTP request. | |
309 | ||
107c1cdb | 310 | ctx := ht.req.Context() |
15c0b25d AP |
311 | var cancel context.CancelFunc |
312 | if ht.timeoutSet { | |
107c1cdb | 313 | ctx, cancel = context.WithTimeout(ctx, ht.timeout) |
15c0b25d | 314 | } else { |
107c1cdb | 315 | ctx, cancel = context.WithCancel(ctx) |
15c0b25d AP |
316 | } |
317 | ||
318 | // requestOver is closed when either the request's context is done | |
319 | // or the status has been written via WriteStatus. | |
320 | requestOver := make(chan struct{}) | |
321 | ||
322 | // clientGone receives a single value if peer is gone, either | |
323 | // because the underlying connection is dead or because the | |
324 | // peer sends an http2 RST_STREAM. | |
325 | clientGone := ht.rw.(http.CloseNotifier).CloseNotify() | |
326 | go func() { | |
327 | select { | |
328 | case <-requestOver: | |
15c0b25d AP |
329 | case <-ht.closedCh: |
330 | case <-clientGone: | |
331 | } | |
332 | cancel() | |
107c1cdb | 333 | ht.Close() |
15c0b25d AP |
334 | }() |
335 | ||
336 | req := ht.req | |
337 | ||
338 | s := &Stream{ | |
107c1cdb ND |
339 | id: 0, // irrelevant |
340 | requestRead: func(int) {}, | |
341 | cancel: cancel, | |
342 | buf: newRecvBuffer(), | |
343 | st: ht, | |
344 | method: req.URL.Path, | |
345 | recvCompress: req.Header.Get("grpc-encoding"), | |
346 | contentSubtype: ht.contentSubtype, | |
15c0b25d AP |
347 | } |
348 | pr := &peer.Peer{ | |
349 | Addr: ht.RemoteAddr(), | |
350 | } | |
351 | if req.TLS != nil { | |
352 | pr.AuthInfo = credentials.TLSInfo{State: *req.TLS} | |
353 | } | |
354 | ctx = metadata.NewIncomingContext(ctx, ht.headerMD) | |
107c1cdb ND |
355 | s.ctx = peer.NewContext(ctx, pr) |
356 | if ht.stats != nil { | |
357 | s.ctx = ht.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) | |
358 | inHeader := &stats.InHeader{ | |
359 | FullMethod: s.method, | |
360 | RemoteAddr: ht.RemoteAddr(), | |
361 | Compression: s.recvCompress, | |
362 | } | |
363 | ht.stats.HandleRPC(s.ctx, inHeader) | |
364 | } | |
15c0b25d | 365 | s.trReader = &transportReader{ |
107c1cdb | 366 | reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf}, |
15c0b25d AP |
367 | windowHandler: func(int) {}, |
368 | } | |
369 | ||
370 | // readerDone is closed when the Body.Read-ing goroutine exits. | |
371 | readerDone := make(chan struct{}) | |
372 | go func() { | |
373 | defer close(readerDone) | |
374 | ||
375 | // TODO: minimize garbage, optimize recvBuffer code/ownership | |
376 | const readSize = 8196 | |
377 | for buf := make([]byte, readSize); ; { | |
378 | n, err := req.Body.Read(buf) | |
379 | if n > 0 { | |
380 | s.buf.put(recvMsg{data: buf[:n:n]}) | |
381 | buf = buf[n:] | |
382 | } | |
383 | if err != nil { | |
384 | s.buf.put(recvMsg{err: mapRecvMsgError(err)}) | |
385 | return | |
386 | } | |
387 | if len(buf) == 0 { | |
388 | buf = make([]byte, readSize) | |
389 | } | |
390 | } | |
391 | }() | |
392 | ||
393 | // startStream is provided by the *grpc.Server's serveStreams. | |
394 | // It starts a goroutine serving s and exits immediately. | |
395 | // The goroutine that is started is the one that then calls | |
396 | // into ht, calling WriteHeader, Write, WriteStatus, Close, etc. | |
397 | startStream(s) | |
398 | ||
399 | ht.runStream() | |
400 | close(requestOver) | |
401 | ||
402 | // Wait for reading goroutine to finish. | |
403 | req.Body.Close() | |
404 | <-readerDone | |
405 | } | |
406 | ||
407 | func (ht *serverHandlerTransport) runStream() { | |
408 | for { | |
409 | select { | |
410 | case fn, ok := <-ht.writes: | |
411 | if !ok { | |
412 | return | |
413 | } | |
414 | fn() | |
415 | case <-ht.closedCh: | |
416 | return | |
417 | } | |
418 | } | |
419 | } | |
420 | ||
107c1cdb ND |
421 | func (ht *serverHandlerTransport) IncrMsgSent() {} |
422 | ||
423 | func (ht *serverHandlerTransport) IncrMsgRecv() {} | |
424 | ||
15c0b25d AP |
425 | func (ht *serverHandlerTransport) Drain() { |
426 | panic("Drain() is not implemented") | |
427 | } | |
428 | ||
429 | // mapRecvMsgError returns the non-nil err into the appropriate | |
430 | // error value as expected by callers of *grpc.parser.recvMsg. | |
431 | // In particular, in can only be: | |
432 | // * io.EOF | |
433 | // * io.ErrUnexpectedEOF | |
434 | // * of type transport.ConnectionError | |
107c1cdb | 435 | // * an error from the status package |
15c0b25d AP |
436 | func mapRecvMsgError(err error) error { |
437 | if err == io.EOF || err == io.ErrUnexpectedEOF { | |
438 | return err | |
439 | } | |
440 | if se, ok := err.(http2.StreamError); ok { | |
441 | if code, ok := http2ErrConvTab[se.Code]; ok { | |
107c1cdb | 442 | return status.Error(code, se.Error()) |
15c0b25d AP |
443 | } |
444 | } | |
107c1cdb ND |
445 | if strings.Contains(err.Error(), "body closed by handler") { |
446 | return status.Error(codes.Canceled, err.Error()) | |
447 | } | |
15c0b25d AP |
448 | return connectionErrorf(true, err, err.Error()) |
449 | } |