diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/transport/handler_server.go')
-rw-r--r-- | vendor/google.golang.org/grpc/transport/handler_server.go | 393 |
1 files changed, 393 insertions, 0 deletions
diff --git a/vendor/google.golang.org/grpc/transport/handler_server.go b/vendor/google.golang.org/grpc/transport/handler_server.go new file mode 100644 index 0000000..27372b5 --- /dev/null +++ b/vendor/google.golang.org/grpc/transport/handler_server.go | |||
@@ -0,0 +1,393 @@ | |||
1 | /* | ||
2 | * | ||
3 | * Copyright 2016 gRPC authors. | ||
4 | * | ||
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | * you may not use this file except in compliance with the License. | ||
7 | * You may obtain a copy of the License at | ||
8 | * | ||
9 | * http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | * | ||
11 | * Unless required by applicable law or agreed to in writing, software | ||
12 | * distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | * See the License for the specific language governing permissions and | ||
15 | * limitations under the License. | ||
16 | * | ||
17 | */ | ||
18 | |||
19 | // This file is the implementation of a gRPC server using HTTP/2 which | ||
20 | // uses the standard Go http2 Server implementation (via the | ||
21 | // http.Handler interface), rather than speaking low-level HTTP/2 | ||
22 | // frames itself. It is the implementation of *grpc.Server.ServeHTTP. | ||
23 | |||
24 | package transport | ||
25 | |||
26 | import ( | ||
27 | "errors" | ||
28 | "fmt" | ||
29 | "io" | ||
30 | "net" | ||
31 | "net/http" | ||
32 | "strings" | ||
33 | "sync" | ||
34 | "time" | ||
35 | |||
36 | "golang.org/x/net/context" | ||
37 | "golang.org/x/net/http2" | ||
38 | "google.golang.org/grpc/codes" | ||
39 | "google.golang.org/grpc/credentials" | ||
40 | "google.golang.org/grpc/metadata" | ||
41 | "google.golang.org/grpc/peer" | ||
42 | "google.golang.org/grpc/status" | ||
43 | ) | ||
44 | |||
45 | // NewServerHandlerTransport returns a ServerTransport handling gRPC | ||
46 | // from inside an http.Handler. It requires that the http Server | ||
47 | // supports HTTP/2. | ||
48 | func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTransport, error) { | ||
49 | if r.ProtoMajor != 2 { | ||
50 | return nil, errors.New("gRPC requires HTTP/2") | ||
51 | } | ||
52 | if r.Method != "POST" { | ||
53 | return nil, errors.New("invalid gRPC request method") | ||
54 | } | ||
55 | if !validContentType(r.Header.Get("Content-Type")) { | ||
56 | return nil, errors.New("invalid gRPC request content-type") | ||
57 | } | ||
58 | if _, ok := w.(http.Flusher); !ok { | ||
59 | return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher") | ||
60 | } | ||
61 | if _, ok := w.(http.CloseNotifier); !ok { | ||
62 | return nil, errors.New("gRPC requires a ResponseWriter supporting http.CloseNotifier") | ||
63 | } | ||
64 | |||
65 | st := &serverHandlerTransport{ | ||
66 | rw: w, | ||
67 | req: r, | ||
68 | closedCh: make(chan struct{}), | ||
69 | writes: make(chan func()), | ||
70 | } | ||
71 | |||
72 | if v := r.Header.Get("grpc-timeout"); v != "" { | ||
73 | to, err := decodeTimeout(v) | ||
74 | if err != nil { | ||
75 | return nil, streamErrorf(codes.Internal, "malformed time-out: %v", err) | ||
76 | } | ||
77 | st.timeoutSet = true | ||
78 | st.timeout = to | ||
79 | } | ||
80 | |||
81 | var metakv []string | ||
82 | if r.Host != "" { | ||
83 | metakv = append(metakv, ":authority", r.Host) | ||
84 | } | ||
85 | for k, vv := range r.Header { | ||
86 | k = strings.ToLower(k) | ||
87 | if isReservedHeader(k) && !isWhitelistedPseudoHeader(k) { | ||
88 | continue | ||
89 | } | ||
90 | for _, v := range vv { | ||
91 | v, err := decodeMetadataHeader(k, v) | ||
92 | if err != nil { | ||
93 | return nil, streamErrorf(codes.InvalidArgument, "malformed binary metadata: %v", err) | ||
94 | } | ||
95 | metakv = append(metakv, k, v) | ||
96 | } | ||
97 | } | ||
98 | st.headerMD = metadata.Pairs(metakv...) | ||
99 | |||
100 | return st, nil | ||
101 | } | ||
102 | |||
103 | // serverHandlerTransport is an implementation of ServerTransport | ||
104 | // which replies to exactly one gRPC request (exactly one HTTP request), | ||
105 | // using the net/http.Handler interface. This http.Handler is guaranteed | ||
106 | // at this point to be speaking over HTTP/2, so it's able to speak valid | ||
107 | // gRPC. | ||
108 | type serverHandlerTransport struct { | ||
109 | rw http.ResponseWriter | ||
110 | req *http.Request | ||
111 | timeoutSet bool | ||
112 | timeout time.Duration | ||
113 | didCommonHeaders bool | ||
114 | |||
115 | headerMD metadata.MD | ||
116 | |||
117 | closeOnce sync.Once | ||
118 | closedCh chan struct{} // closed on Close | ||
119 | |||
120 | // writes is a channel of code to run serialized in the | ||
121 | // ServeHTTP (HandleStreams) goroutine. The channel is closed | ||
122 | // when WriteStatus is called. | ||
123 | writes chan func() | ||
124 | } | ||
125 | |||
126 | func (ht *serverHandlerTransport) Close() error { | ||
127 | ht.closeOnce.Do(ht.closeCloseChanOnce) | ||
128 | return nil | ||
129 | } | ||
130 | |||
131 | func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) } | ||
132 | |||
133 | func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) } | ||
134 | |||
135 | // strAddr is a net.Addr backed by either a TCP "ip:port" string, or | ||
136 | // the empty string if unknown. | ||
137 | type strAddr string | ||
138 | |||
139 | func (a strAddr) Network() string { | ||
140 | if a != "" { | ||
141 | // Per the documentation on net/http.Request.RemoteAddr, if this is | ||
142 | // set, it's set to the IP:port of the peer (hence, TCP): | ||
143 | // https://golang.org/pkg/net/http/#Request | ||
144 | // | ||
145 | // If we want to support Unix sockets later, we can | ||
146 | // add our own grpc-specific convention within the | ||
147 | // grpc codebase to set RemoteAddr to a different | ||
148 | // format, or probably better: we can attach it to the | ||
149 | // context and use that from serverHandlerTransport.RemoteAddr. | ||
150 | return "tcp" | ||
151 | } | ||
152 | return "" | ||
153 | } | ||
154 | |||
155 | func (a strAddr) String() string { return string(a) } | ||
156 | |||
157 | // do runs fn in the ServeHTTP goroutine. | ||
158 | func (ht *serverHandlerTransport) do(fn func()) error { | ||
159 | // Avoid a panic writing to closed channel. Imperfect but maybe good enough. | ||
160 | select { | ||
161 | case <-ht.closedCh: | ||
162 | return ErrConnClosing | ||
163 | default: | ||
164 | select { | ||
165 | case ht.writes <- fn: | ||
166 | return nil | ||
167 | case <-ht.closedCh: | ||
168 | return ErrConnClosing | ||
169 | } | ||
170 | |||
171 | } | ||
172 | } | ||
173 | |||
174 | func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error { | ||
175 | err := ht.do(func() { | ||
176 | ht.writeCommonHeaders(s) | ||
177 | |||
178 | // And flush, in case no header or body has been sent yet. | ||
179 | // This forces a separation of headers and trailers if this is the | ||
180 | // first call (for example, in end2end tests's TestNoService). | ||
181 | ht.rw.(http.Flusher).Flush() | ||
182 | |||
183 | h := ht.rw.Header() | ||
184 | h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code())) | ||
185 | if m := st.Message(); m != "" { | ||
186 | h.Set("Grpc-Message", encodeGrpcMessage(m)) | ||
187 | } | ||
188 | |||
189 | // TODO: Support Grpc-Status-Details-Bin | ||
190 | |||
191 | if md := s.Trailer(); len(md) > 0 { | ||
192 | for k, vv := range md { | ||
193 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. | ||
194 | if isReservedHeader(k) { | ||
195 | continue | ||
196 | } | ||
197 | for _, v := range vv { | ||
198 | // http2 ResponseWriter mechanism to send undeclared Trailers after | ||
199 | // the headers have possibly been written. | ||
200 | h.Add(http2.TrailerPrefix+k, encodeMetadataHeader(k, v)) | ||
201 | } | ||
202 | } | ||
203 | } | ||
204 | }) | ||
205 | close(ht.writes) | ||
206 | return err | ||
207 | } | ||
208 | |||
209 | // writeCommonHeaders sets common headers on the first write | ||
210 | // call (Write, WriteHeader, or WriteStatus). | ||
211 | func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) { | ||
212 | if ht.didCommonHeaders { | ||
213 | return | ||
214 | } | ||
215 | ht.didCommonHeaders = true | ||
216 | |||
217 | h := ht.rw.Header() | ||
218 | h["Date"] = nil // suppress Date to make tests happy; TODO: restore | ||
219 | h.Set("Content-Type", "application/grpc") | ||
220 | |||
221 | // Predeclare trailers we'll set later in WriteStatus (after the body). | ||
222 | // This is a SHOULD in the HTTP RFC, and the way you add (known) | ||
223 | // Trailers per the net/http.ResponseWriter contract. | ||
224 | // See https://golang.org/pkg/net/http/#ResponseWriter | ||
225 | // and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers | ||
226 | h.Add("Trailer", "Grpc-Status") | ||
227 | h.Add("Trailer", "Grpc-Message") | ||
228 | // TODO: Support Grpc-Status-Details-Bin | ||
229 | |||
230 | if s.sendCompress != "" { | ||
231 | h.Set("Grpc-Encoding", s.sendCompress) | ||
232 | } | ||
233 | } | ||
234 | |||
235 | func (ht *serverHandlerTransport) Write(s *Stream, data []byte, opts *Options) error { | ||
236 | return ht.do(func() { | ||
237 | ht.writeCommonHeaders(s) | ||
238 | ht.rw.Write(data) | ||
239 | if !opts.Delay { | ||
240 | ht.rw.(http.Flusher).Flush() | ||
241 | } | ||
242 | }) | ||
243 | } | ||
244 | |||
245 | func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error { | ||
246 | return ht.do(func() { | ||
247 | ht.writeCommonHeaders(s) | ||
248 | h := ht.rw.Header() | ||
249 | for k, vv := range md { | ||
250 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. | ||
251 | if isReservedHeader(k) { | ||
252 | continue | ||
253 | } | ||
254 | for _, v := range vv { | ||
255 | v = encodeMetadataHeader(k, v) | ||
256 | h.Add(k, v) | ||
257 | } | ||
258 | } | ||
259 | ht.rw.WriteHeader(200) | ||
260 | ht.rw.(http.Flusher).Flush() | ||
261 | }) | ||
262 | } | ||
263 | |||
264 | func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) { | ||
265 | // With this transport type there will be exactly 1 stream: this HTTP request. | ||
266 | |||
267 | var ctx context.Context | ||
268 | var cancel context.CancelFunc | ||
269 | if ht.timeoutSet { | ||
270 | ctx, cancel = context.WithTimeout(context.Background(), ht.timeout) | ||
271 | } else { | ||
272 | ctx, cancel = context.WithCancel(context.Background()) | ||
273 | } | ||
274 | |||
275 | // requestOver is closed when either the request's context is done | ||
276 | // or the status has been written via WriteStatus. | ||
277 | requestOver := make(chan struct{}) | ||
278 | |||
279 | // clientGone receives a single value if peer is gone, either | ||
280 | // because the underlying connection is dead or because the | ||
281 | // peer sends an http2 RST_STREAM. | ||
282 | clientGone := ht.rw.(http.CloseNotifier).CloseNotify() | ||
283 | go func() { | ||
284 | select { | ||
285 | case <-requestOver: | ||
286 | return | ||
287 | case <-ht.closedCh: | ||
288 | case <-clientGone: | ||
289 | } | ||
290 | cancel() | ||
291 | }() | ||
292 | |||
293 | req := ht.req | ||
294 | |||
295 | s := &Stream{ | ||
296 | id: 0, // irrelevant | ||
297 | requestRead: func(int) {}, | ||
298 | cancel: cancel, | ||
299 | buf: newRecvBuffer(), | ||
300 | st: ht, | ||
301 | method: req.URL.Path, | ||
302 | recvCompress: req.Header.Get("grpc-encoding"), | ||
303 | } | ||
304 | pr := &peer.Peer{ | ||
305 | Addr: ht.RemoteAddr(), | ||
306 | } | ||
307 | if req.TLS != nil { | ||
308 | pr.AuthInfo = credentials.TLSInfo{State: *req.TLS} | ||
309 | } | ||
310 | ctx = metadata.NewIncomingContext(ctx, ht.headerMD) | ||
311 | ctx = peer.NewContext(ctx, pr) | ||
312 | s.ctx = newContextWithStream(ctx, s) | ||
313 | s.trReader = &transportReader{ | ||
314 | reader: &recvBufferReader{ctx: s.ctx, recv: s.buf}, | ||
315 | windowHandler: func(int) {}, | ||
316 | } | ||
317 | |||
318 | // readerDone is closed when the Body.Read-ing goroutine exits. | ||
319 | readerDone := make(chan struct{}) | ||
320 | go func() { | ||
321 | defer close(readerDone) | ||
322 | |||
323 | // TODO: minimize garbage, optimize recvBuffer code/ownership | ||
324 | const readSize = 8196 | ||
325 | for buf := make([]byte, readSize); ; { | ||
326 | n, err := req.Body.Read(buf) | ||
327 | if n > 0 { | ||
328 | s.buf.put(recvMsg{data: buf[:n:n]}) | ||
329 | buf = buf[n:] | ||
330 | } | ||
331 | if err != nil { | ||
332 | s.buf.put(recvMsg{err: mapRecvMsgError(err)}) | ||
333 | return | ||
334 | } | ||
335 | if len(buf) == 0 { | ||
336 | buf = make([]byte, readSize) | ||
337 | } | ||
338 | } | ||
339 | }() | ||
340 | |||
341 | // startStream is provided by the *grpc.Server's serveStreams. | ||
342 | // It starts a goroutine serving s and exits immediately. | ||
343 | // The goroutine that is started is the one that then calls | ||
344 | // into ht, calling WriteHeader, Write, WriteStatus, Close, etc. | ||
345 | startStream(s) | ||
346 | |||
347 | ht.runStream() | ||
348 | close(requestOver) | ||
349 | |||
350 | // Wait for reading goroutine to finish. | ||
351 | req.Body.Close() | ||
352 | <-readerDone | ||
353 | } | ||
354 | |||
355 | func (ht *serverHandlerTransport) runStream() { | ||
356 | for { | ||
357 | select { | ||
358 | case fn, ok := <-ht.writes: | ||
359 | if !ok { | ||
360 | return | ||
361 | } | ||
362 | fn() | ||
363 | case <-ht.closedCh: | ||
364 | return | ||
365 | } | ||
366 | } | ||
367 | } | ||
368 | |||
369 | func (ht *serverHandlerTransport) Drain() { | ||
370 | panic("Drain() is not implemented") | ||
371 | } | ||
372 | |||
373 | // mapRecvMsgError returns the non-nil err into the appropriate | ||
374 | // error value as expected by callers of *grpc.parser.recvMsg. | ||
375 | // In particular, in can only be: | ||
376 | // * io.EOF | ||
377 | // * io.ErrUnexpectedEOF | ||
378 | // * of type transport.ConnectionError | ||
379 | // * of type transport.StreamError | ||
380 | func mapRecvMsgError(err error) error { | ||
381 | if err == io.EOF || err == io.ErrUnexpectedEOF { | ||
382 | return err | ||
383 | } | ||
384 | if se, ok := err.(http2.StreamError); ok { | ||
385 | if code, ok := http2ErrConvTab[se.Code]; ok { | ||
386 | return StreamError{ | ||
387 | Code: code, | ||
388 | Desc: se.Error(), | ||
389 | } | ||
390 | } | ||
391 | } | ||
392 | return connectionErrorf(true, err, err.Error()) | ||
393 | } | ||