diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/transport/handler_server.go')
-rw-r--r-- | vendor/google.golang.org/grpc/internal/transport/handler_server.go | 449 |
1 files changed, 449 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/internal/transport/handler_server.go b/vendor/google.golang.org/grpc/internal/transport/handler_server.go new file mode 100644 index 0000000..73b41ea --- /dev/null +++ b/vendor/google.golang.org/grpc/internal/transport/handler_server.go | |||
@@ -0,0 +1,449 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2016 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | // This file is the implementation of a gRPC server using HTTP/2 which | ||
20 | // uses the standard Go http2 Server implementation (via the | ||
21 | // http.Handler interface), rather than speaking low-level HTTP/2 | ||
22 | // frames itself. It is the implementation of *grpc.Server.ServeHTTP. | ||
23 | |||
24 | package transport | ||
25 | |||
26 | import ( | ||
27 | "context" | ||
28 | "errors" | ||
29 | "fmt" | ||
30 | "io" | ||
31 | "net" | ||
32 | "net/http" | ||
33 | "strings" | ||
34 | "sync" | ||
35 | "time" | ||
36 | |||
37 | "github.com/golang/protobuf/proto" | ||
38 | "golang.org/x/net/http2" | ||
39 | "google.golang.org/grpc/codes" | ||
40 | "google.golang.org/grpc/credentials" | ||
41 | "google.golang.org/grpc/metadata" | ||
42 | "google.golang.org/grpc/peer" | ||
43 | "google.golang.org/grpc/stats" | ||
44 | "google.golang.org/grpc/status" | ||
45 | ) | ||
46 | |||
47 | // NewServerHandlerTransport returns a ServerTransport handling gRPC | ||
48 | // from inside an http.Handler. It requires that the http Server | ||
49 | // supports HTTP/2. | ||
50 | func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats stats.Handler) (ServerTransport, error) { | ||
51 | if r.ProtoMajor != 2 { | ||
52 | return nil, errors.New("gRPC requires HTTP/2") | ||
53 | } | ||
54 | if r.Method != "POST" { | ||
55 | return nil, errors.New("invalid gRPC request method") | ||
56 | } | ||
57 | contentType := r.Header.Get("Content-Type") | ||
58 | // TODO: do we assume contentType is lowercase? we did before | ||
59 | contentSubtype, validContentType := contentSubtype(contentType) | ||
60 | if !validContentType { | ||
61 | return nil, errors.New("invalid gRPC request content-type") | ||
62 | } | ||
63 | if _, ok := w.(http.Flusher); !ok { | ||
64 | return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher") | ||
65 | } | ||
66 | if _, ok := w.(http.CloseNotifier); !ok { | ||
67 | return nil, errors.New("gRPC requires a ResponseWriter supporting http.CloseNotifier") | ||
68 | } | ||
69 | |||
70 | st := &serverHandlerTransport{ | ||
71 | rw: w, | ||
72 | req: r, | ||
73 | closedCh: make(chan struct{}), | ||
74 | writes: make(chan func()), | ||
75 | contentType: contentType, | ||
76 | contentSubtype: contentSubtype, | ||
77 | stats: stats, | ||
78 | } | ||
79 | |||
80 | if v := r.Header.Get("grpc-timeout"); v != "" { | ||
81 | to, err := decodeTimeout(v) | ||
82 | if err != nil { | ||
83 | return nil, status.Errorf(codes.Internal, "malformed time-out: %v", err) | ||
84 | } | ||
85 | st.timeoutSet = true | ||
86 | st.timeout = to | ||
87 | } | ||
88 | |||
89 | metakv := []string{"content-type", contentType} | ||
90 | if r.Host != "" { | ||
91 | metakv = append(metakv, ":authority", r.Host) | ||
92 | } | ||
93 | for k, vv := range r.Header { | ||
94 | k = strings.ToLower(k) | ||
95 | if isReservedHeader(k) && !isWhitelistedHeader(k) { | ||
96 | continue | ||
97 | } | ||
98 | for _, v := range vv { | ||
99 | v, err := decodeMetadataHeader(k, v) | ||
100 | if err != nil { | ||
101 | return nil, status.Errorf(codes.Internal, "malformed binary metadata: %v", err) | ||
102 | } | ||
103 | metakv = append(metakv, k, v) | ||
104 | } | ||
105 | } | ||
106 | st.headerMD = metadata.Pairs(metakv...) | ||
107 | |||
108 | return st, nil | ||
109 | } | ||
110 | |||
111 | // serverHandlerTransport is an implementation of ServerTransport | ||
112 | // which replies to exactly one gRPC request (exactly one HTTP request), | ||
113 | // using the net/http.Handler interface. This http.Handler is guaranteed | ||
114 | // at this point to be speaking over HTTP/2, so it's able to speak valid | ||
115 | // gRPC. | ||
116 | type serverHandlerTransport struct { | ||
117 | rw http.ResponseWriter | ||
118 | req *http.Request | ||
119 | timeoutSet bool | ||
120 | timeout time.Duration | ||
121 | didCommonHeaders bool | ||
122 | |||
123 | headerMD metadata.MD | ||
124 | |||
125 | closeOnce sync.Once | ||
126 | closedCh chan struct{} // closed on Close | ||
127 | |||
128 | // writes is a channel of code to run serialized in the | ||
129 | // ServeHTTP (HandleStreams) goroutine. The channel is closed | ||
130 | // when WriteStatus is called. | ||
131 | writes chan func() | ||
132 | |||
133 | // block concurrent WriteStatus calls | ||
134 | // e.g. grpc/(*serverStream).SendMsg/RecvMsg | ||
135 | writeStatusMu sync.Mutex | ||
136 | |||
137 | // we just mirror the request content-type | ||
138 | contentType string | ||
139 | // we store both contentType and contentSubtype so we don't keep recreating them | ||
140 | // TODO make sure this is consistent across handler_server and http2_server | ||
141 | contentSubtype string | ||
142 | |||
143 | stats stats.Handler | ||
144 | } | ||
145 | |||
146 | func (ht *serverHandlerTransport) Close() error { | ||
147 | ht.closeOnce.Do(ht.closeCloseChanOnce) | ||
148 | return nil | ||
149 | } | ||
150 | |||
151 | func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) } | ||
152 | |||
153 | func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) } | ||
154 | |||
155 | // strAddr is a net.Addr backed by either a TCP "ip:port" string, or | ||
156 | // the empty string if unknown. | ||
157 | type strAddr string | ||
158 | |||
159 | func (a strAddr) Network() string { | ||
160 | if a != "" { | ||
161 | // Per the documentation on net/http.Request.RemoteAddr, if this is | ||
162 | // set, it's set to the IP:port of the peer (hence, TCP): | ||
163 | // https://golang.org/pkg/net/http/#Request | ||
164 | // | ||
165 | // If we want to support Unix sockets later, we can | ||
166 | // add our own grpc-specific convention within the | ||
167 | // grpc codebase to set RemoteAddr to a different | ||
168 | // format, or probably better: we can attach it to the | ||
169 | // context and use that from serverHandlerTransport.RemoteAddr. | ||
170 | return "tcp" | ||
171 | } | ||
172 | return "" | ||
173 | } | ||
174 | |||
175 | func (a strAddr) String() string { return string(a) } | ||
176 | |||
177 | // do runs fn in the ServeHTTP goroutine. | ||
178 | func (ht *serverHandlerTransport) do(fn func()) error { | ||
179 | // Avoid a panic writing to closed channel. Imperfect but maybe good enough. | ||
180 | select { | ||
181 | case <-ht.closedCh: | ||
182 | return ErrConnClosing | ||
183 | default: | ||
184 | select { | ||
185 | case ht.writes <- fn: | ||
186 | return nil | ||
187 | case <-ht.closedCh: | ||
188 | return ErrConnClosing | ||
189 | } | ||
190 | } | ||
191 | } | ||
192 | |||
193 | func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error { | ||
194 | ht.writeStatusMu.Lock() | ||
195 | defer ht.writeStatusMu.Unlock() | ||
196 | |||
197 | err := ht.do(func() { | ||
198 | ht.writeCommonHeaders(s) | ||
199 | |||
200 | // And flush, in case no header or body has been sent yet. | ||
201 | // This forces a separation of headers and trailers if this is the | ||
202 | // first call (for example, in end2end tests's TestNoService). | ||
203 | ht.rw.(http.Flusher).Flush() | ||
204 | |||
205 | h := ht.rw.Header() | ||
206 | h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code())) | ||
207 | if m := st.Message(); m != "" { | ||
208 | h.Set("Grpc-Message", encodeGrpcMessage(m)) | ||
209 | } | ||
210 | |||
211 | if p := st.Proto(); p != nil && len(p.Details) > 0 { | ||
212 | stBytes, err := proto.Marshal(p) | ||
213 | if err != nil { | ||
214 | // TODO: return error instead, when callers are able to handle it. | ||
215 | panic(err) | ||
216 | } | ||
217 | |||
218 | h.Set("Grpc-Status-Details-Bin", encodeBinHeader(stBytes)) | ||
219 | } | ||
220 | |||
221 | if md := s.Trailer(); len(md) > 0 { | ||
222 | for k, vv := range md { | ||
223 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. | ||
224 | if isReservedHeader(k) { | ||
225 | continue | ||
226 | } | ||
227 | for _, v := range vv { | ||
228 | // http2 ResponseWriter mechanism to send undeclared Trailers after | ||
229 | // the headers have possibly been written. | ||
230 | h.Add(http2.TrailerPrefix+k, encodeMetadataHeader(k, v)) | ||
231 | } | ||
232 | } | ||
233 | } | ||
234 | }) | ||
235 | |||
236 | if err == nil { // transport has not been closed | ||
237 | if ht.stats != nil { | ||
238 | ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{}) | ||
239 | } | ||
240 | close(ht.writes) | ||
241 | } | ||
242 | ht.Close() | ||
243 | return err | ||
244 | } | ||
245 | |||
246 | // writeCommonHeaders sets common headers on the first write | ||
247 | // call (Write, WriteHeader, or WriteStatus). | ||
248 | func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) { | ||
249 | if ht.didCommonHeaders { | ||
250 | return | ||
251 | } | ||
252 | ht.didCommonHeaders = true | ||
253 | |||
254 | h := ht.rw.Header() | ||
255 | h["Date"] = nil // suppress Date to make tests happy; TODO: restore | ||
256 | h.Set("Content-Type", ht.contentType) | ||
257 | |||
258 | // Predeclare trailers we'll set later in WriteStatus (after the body). | ||
259 | // This is a SHOULD in the HTTP RFC, and the way you add (known) | ||
260 | // Trailers per the net/http.ResponseWriter contract. | ||
261 | // See https://golang.org/pkg/net/http/#ResponseWriter | ||
262 | // and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers | ||
263 | h.Add("Trailer", "Grpc-Status") | ||
264 | h.Add("Trailer", "Grpc-Message") | ||
265 | h.Add("Trailer", "Grpc-Status-Details-Bin") | ||
266 | |||
267 | if s.sendCompress != "" { | ||
268 | h.Set("Grpc-Encoding", s.sendCompress) | ||
269 | } | ||
270 | } | ||
271 | |||
272 | func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { | ||
273 | return ht.do(func() { | ||
274 | ht.writeCommonHeaders(s) | ||
275 | ht.rw.Write(hdr) | ||
276 | ht.rw.Write(data) | ||
277 | ht.rw.(http.Flusher).Flush() | ||
278 | }) | ||
279 | } | ||
280 | |||
281 | func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error { | ||
282 | err := ht.do(func() { | ||
283 | ht.writeCommonHeaders(s) | ||
284 | h := ht.rw.Header() | ||
285 | for k, vv := range md { | ||
286 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. | ||
287 | if isReservedHeader(k) { | ||
288 | continue | ||
289 | } | ||
290 | for _, v := range vv { | ||
291 | v = encodeMetadataHeader(k, v) | ||
292 | h.Add(k, v) | ||
293 | } | ||
294 | } | ||
295 | ht.rw.WriteHeader(200) | ||
296 | ht.rw.(http.Flusher).Flush() | ||
297 | }) | ||
298 | |||
299 | if err == nil { | ||
300 | if ht.stats != nil { | ||
301 | ht.stats.HandleRPC(s.Context(), &stats.OutHeader{}) | ||
302 | } | ||
303 | } | ||
304 | return err | ||
305 | } | ||
306 | |||
307 | func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) { | ||
308 | // With this transport type there will be exactly 1 stream: this HTTP request. | ||
309 | |||
310 | ctx := ht.req.Context() | ||
311 | var cancel context.CancelFunc | ||
312 | if ht.timeoutSet { | ||
313 | ctx, cancel = context.WithTimeout(ctx, ht.timeout) | ||
314 | } else { | ||
315 | ctx, cancel = context.WithCancel(ctx) | ||
316 | } | ||
317 | |||
318 | // requestOver is closed when either the request's context is done | ||
319 | // or the status has been written via WriteStatus. | ||
320 | requestOver := make(chan struct{}) | ||
321 | |||
322 | // clientGone receives a single value if peer is gone, either | ||
323 | // because the underlying connection is dead or because the | ||
324 | // peer sends an http2 RST_STREAM. | ||
325 | clientGone := ht.rw.(http.CloseNotifier).CloseNotify() | ||
326 | go func() { | ||
327 | select { | ||
328 | case <-requestOver: | ||
329 | case <-ht.closedCh: | ||
330 | case <-clientGone: | ||
331 | } | ||
332 | cancel() | ||
333 | ht.Close() | ||
334 | }() | ||
335 | |||
336 | req := ht.req | ||
337 | |||
338 | s := &Stream{ | ||
339 | id: 0, // irrelevant | ||
340 | requestRead: func(int) {}, | ||
341 | cancel: cancel, | ||
342 | buf: newRecvBuffer(), | ||
343 | st: ht, | ||
344 | method: req.URL.Path, | ||
345 | recvCompress: req.Header.Get("grpc-encoding"), | ||
346 | contentSubtype: ht.contentSubtype, | ||
347 | } | ||
348 | pr := &peer.Peer{ | ||
349 | Addr: ht.RemoteAddr(), | ||
350 | } | ||
351 | if req.TLS != nil { | ||
352 | pr.AuthInfo = credentials.TLSInfo{State: *req.TLS} | ||
353 | } | ||
354 | ctx = metadata.NewIncomingContext(ctx, ht.headerMD) | ||
355 | s.ctx = peer.NewContext(ctx, pr) | ||
356 | if ht.stats != nil { | ||
357 | s.ctx = ht.stats.TagRPC(s.ctx, &stats.RPCTagInfo{FullMethodName: s.method}) | ||
358 | inHeader := &stats.InHeader{ | ||
359 | FullMethod: s.method, | ||
360 | RemoteAddr: ht.RemoteAddr(), | ||
361 | Compression: s.recvCompress, | ||
362 | } | ||
363 | ht.stats.HandleRPC(s.ctx, inHeader) | ||
364 | } | ||
365 | s.trReader = &transportReader{ | ||
366 | reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf}, | ||
367 | windowHandler: func(int) {}, | ||
368 | } | ||
369 | |||
370 | // readerDone is closed when the Body.Read-ing goroutine exits. | ||
371 | readerDone := make(chan struct{}) | ||
372 | go func() { | ||
373 | defer close(readerDone) | ||
374 | |||
375 | // TODO: minimize garbage, optimize recvBuffer code/ownership | ||
376 | const readSize = 8196 | ||
377 | for buf := make([]byte, readSize); ; { | ||
378 | n, err := req.Body.Read(buf) | ||
379 | if n > 0 { | ||
380 | s.buf.put(recvMsg{data: buf[:n:n]}) | ||
381 | buf = buf[n:] | ||
382 | } | ||
383 | if err != nil { | ||
384 | s.buf.put(recvMsg{err: mapRecvMsgError(err)}) | ||
385 | return | ||
386 | } | ||
387 | if len(buf) == 0 { | ||
388 | buf = make([]byte, readSize) | ||
389 | } | ||
390 | } | ||
391 | }() | ||
392 | |||
393 | // startStream is provided by the *grpc.Server's serveStreams. | ||
394 | // It starts a goroutine serving s and exits immediately. | ||
395 | // The goroutine that is started is the one that then calls | ||
396 | // into ht, calling WriteHeader, Write, WriteStatus, Close, etc. | ||
397 | startStream(s) | ||
398 | |||
399 | ht.runStream() | ||
400 | close(requestOver) | ||
401 | |||
402 | // Wait for reading goroutine to finish. | ||
403 | req.Body.Close() | ||
404 | <-readerDone | ||
405 | } | ||
406 | |||
407 | func (ht *serverHandlerTransport) runStream() { | ||
408 | for { | ||
409 | select { | ||
410 | case fn, ok := <-ht.writes: | ||
411 | if !ok { | ||
412 | return | ||
413 | } | ||
414 | fn() | ||
415 | case <-ht.closedCh: | ||
416 | return | ||
417 | } | ||
418 | } | ||
419 | } | ||
420 | |||
421 | func (ht *serverHandlerTransport) IncrMsgSent() {} | ||
422 | |||
423 | func (ht *serverHandlerTransport) IncrMsgRecv() {} | ||
424 | |||
425 | func (ht *serverHandlerTransport) Drain() { | ||
426 | panic("Drain() is not implemented") | ||
427 | } | ||
428 | |||
429 | // mapRecvMsgError returns the non-nil err into the appropriate | ||
430 | // error value as expected by callers of *grpc.parser.recvMsg. | ||
431 | // In particular, in can only be: | ||
432 | // * io.EOF | ||
433 | // * io.ErrUnexpectedEOF | ||
434 | // * of type transport.ConnectionError | ||
435 | // * an error from the status package | ||
436 | func mapRecvMsgError(err error) error { | ||
437 | if err == io.EOF || err == io.ErrUnexpectedEOF { | ||
438 | return err | ||
439 | } | ||
440 | if se, ok := err.(http2.StreamError); ok { | ||
441 | if code, ok := http2ErrConvTab[se.Code]; ok { | ||
442 | return status.Error(code, se.Error()) | ||
443 | } | ||
444 | } | ||
445 | if strings.Contains(err.Error(), "body closed by handler") { | ||
446 | return status.Error(codes.Canceled, err.Error()) | ||
447 | } | ||
448 | return connectionErrorf(true, err, err.Error()) | ||
449 | } | ||