]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | /* |
2 | * | |
3 | * Copyright 2016 gRPC authors. | |
4 | * | |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | * | |
17 | */ | |
18 | ||
19 | // This file is the implementation of a gRPC server using HTTP/2 which | |
20 | // uses the standard Go http2 Server implementation (via the | |
21 | // http.Handler interface), rather than speaking low-level HTTP/2 | |
22 | // frames itself. It is the implementation of *grpc.Server.ServeHTTP. | |
23 | ||
24 | package transport | |
25 | ||
26 | import ( | |
27 | "errors" | |
28 | "fmt" | |
29 | "io" | |
30 | "net" | |
31 | "net/http" | |
32 | "strings" | |
33 | "sync" | |
34 | "time" | |
35 | ||
36 | "golang.org/x/net/context" | |
37 | "golang.org/x/net/http2" | |
38 | "google.golang.org/grpc/codes" | |
39 | "google.golang.org/grpc/credentials" | |
40 | "google.golang.org/grpc/metadata" | |
41 | "google.golang.org/grpc/peer" | |
42 | "google.golang.org/grpc/status" | |
43 | ) | |
44 | ||
45 | // NewServerHandlerTransport returns a ServerTransport handling gRPC | |
46 | // from inside an http.Handler. It requires that the http Server | |
47 | // supports HTTP/2. | |
48 | func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request) (ServerTransport, error) { | |
49 | if r.ProtoMajor != 2 { | |
50 | return nil, errors.New("gRPC requires HTTP/2") | |
51 | } | |
52 | if r.Method != "POST" { | |
53 | return nil, errors.New("invalid gRPC request method") | |
54 | } | |
55 | if !validContentType(r.Header.Get("Content-Type")) { | |
56 | return nil, errors.New("invalid gRPC request content-type") | |
57 | } | |
58 | if _, ok := w.(http.Flusher); !ok { | |
59 | return nil, errors.New("gRPC requires a ResponseWriter supporting http.Flusher") | |
60 | } | |
61 | if _, ok := w.(http.CloseNotifier); !ok { | |
62 | return nil, errors.New("gRPC requires a ResponseWriter supporting http.CloseNotifier") | |
63 | } | |
64 | ||
65 | st := &serverHandlerTransport{ | |
66 | rw: w, | |
67 | req: r, | |
68 | closedCh: make(chan struct{}), | |
69 | writes: make(chan func()), | |
70 | } | |
71 | ||
72 | if v := r.Header.Get("grpc-timeout"); v != "" { | |
73 | to, err := decodeTimeout(v) | |
74 | if err != nil { | |
75 | return nil, streamErrorf(codes.Internal, "malformed time-out: %v", err) | |
76 | } | |
77 | st.timeoutSet = true | |
78 | st.timeout = to | |
79 | } | |
80 | ||
81 | var metakv []string | |
82 | if r.Host != "" { | |
83 | metakv = append(metakv, ":authority", r.Host) | |
84 | } | |
85 | for k, vv := range r.Header { | |
86 | k = strings.ToLower(k) | |
87 | if isReservedHeader(k) && !isWhitelistedPseudoHeader(k) { | |
88 | continue | |
89 | } | |
90 | for _, v := range vv { | |
91 | v, err := decodeMetadataHeader(k, v) | |
92 | if err != nil { | |
93 | return nil, streamErrorf(codes.InvalidArgument, "malformed binary metadata: %v", err) | |
94 | } | |
95 | metakv = append(metakv, k, v) | |
96 | } | |
97 | } | |
98 | st.headerMD = metadata.Pairs(metakv...) | |
99 | ||
100 | return st, nil | |
101 | } | |
102 | ||
103 | // serverHandlerTransport is an implementation of ServerTransport | |
104 | // which replies to exactly one gRPC request (exactly one HTTP request), | |
105 | // using the net/http.Handler interface. This http.Handler is guaranteed | |
106 | // at this point to be speaking over HTTP/2, so it's able to speak valid | |
107 | // gRPC. | |
108 | type serverHandlerTransport struct { | |
109 | rw http.ResponseWriter | |
110 | req *http.Request | |
111 | timeoutSet bool | |
112 | timeout time.Duration | |
113 | didCommonHeaders bool | |
114 | ||
115 | headerMD metadata.MD | |
116 | ||
117 | closeOnce sync.Once | |
118 | closedCh chan struct{} // closed on Close | |
119 | ||
120 | // writes is a channel of code to run serialized in the | |
121 | // ServeHTTP (HandleStreams) goroutine. The channel is closed | |
122 | // when WriteStatus is called. | |
123 | writes chan func() | |
124 | } | |
125 | ||
126 | func (ht *serverHandlerTransport) Close() error { | |
127 | ht.closeOnce.Do(ht.closeCloseChanOnce) | |
128 | return nil | |
129 | } | |
130 | ||
131 | func (ht *serverHandlerTransport) closeCloseChanOnce() { close(ht.closedCh) } | |
132 | ||
133 | func (ht *serverHandlerTransport) RemoteAddr() net.Addr { return strAddr(ht.req.RemoteAddr) } | |
134 | ||
// strAddr is a net.Addr whose value is either a TCP "ip:port" string or,
// when the address is unknown, the empty string.
type strAddr string

// Network returns "tcp" for a non-empty address and "" for an empty one.
func (a strAddr) Network() string {
	if a == "" {
		return ""
	}
	// The documentation of net/http.Request.RemoteAddr says that when the
	// field is set it holds the peer's IP:port (hence, TCP):
	// https://golang.org/pkg/net/http/#Request
	//
	// Supporting Unix sockets later would need either a grpc-specific
	// convention for putting a different format into RemoteAddr or,
	// probably better, attaching the address to the context and reading it
	// from there in serverHandlerTransport.RemoteAddr.
	return "tcp"
}

// String returns the address text unchanged.
func (a strAddr) String() string {
	return string(a)
}
156 | ||
157 | // do runs fn in the ServeHTTP goroutine. | |
158 | func (ht *serverHandlerTransport) do(fn func()) error { | |
159 | // Avoid a panic writing to closed channel. Imperfect but maybe good enough. | |
160 | select { | |
161 | case <-ht.closedCh: | |
162 | return ErrConnClosing | |
163 | default: | |
164 | select { | |
165 | case ht.writes <- fn: | |
166 | return nil | |
167 | case <-ht.closedCh: | |
168 | return ErrConnClosing | |
169 | } | |
170 | ||
171 | } | |
172 | } | |
173 | ||
174 | func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error { | |
175 | err := ht.do(func() { | |
176 | ht.writeCommonHeaders(s) | |
177 | ||
178 | // And flush, in case no header or body has been sent yet. | |
179 | // This forces a separation of headers and trailers if this is the | |
180 | // first call (for example, in end2end tests's TestNoService). | |
181 | ht.rw.(http.Flusher).Flush() | |
182 | ||
183 | h := ht.rw.Header() | |
184 | h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code())) | |
185 | if m := st.Message(); m != "" { | |
186 | h.Set("Grpc-Message", encodeGrpcMessage(m)) | |
187 | } | |
188 | ||
189 | // TODO: Support Grpc-Status-Details-Bin | |
190 | ||
191 | if md := s.Trailer(); len(md) > 0 { | |
192 | for k, vv := range md { | |
193 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. | |
194 | if isReservedHeader(k) { | |
195 | continue | |
196 | } | |
197 | for _, v := range vv { | |
198 | // http2 ResponseWriter mechanism to send undeclared Trailers after | |
199 | // the headers have possibly been written. | |
200 | h.Add(http2.TrailerPrefix+k, encodeMetadataHeader(k, v)) | |
201 | } | |
202 | } | |
203 | } | |
204 | }) | |
205 | close(ht.writes) | |
206 | return err | |
207 | } | |
208 | ||
209 | // writeCommonHeaders sets common headers on the first write | |
210 | // call (Write, WriteHeader, or WriteStatus). | |
211 | func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) { | |
212 | if ht.didCommonHeaders { | |
213 | return | |
214 | } | |
215 | ht.didCommonHeaders = true | |
216 | ||
217 | h := ht.rw.Header() | |
218 | h["Date"] = nil // suppress Date to make tests happy; TODO: restore | |
219 | h.Set("Content-Type", "application/grpc") | |
220 | ||
221 | // Predeclare trailers we'll set later in WriteStatus (after the body). | |
222 | // This is a SHOULD in the HTTP RFC, and the way you add (known) | |
223 | // Trailers per the net/http.ResponseWriter contract. | |
224 | // See https://golang.org/pkg/net/http/#ResponseWriter | |
225 | // and https://golang.org/pkg/net/http/#example_ResponseWriter_trailers | |
226 | h.Add("Trailer", "Grpc-Status") | |
227 | h.Add("Trailer", "Grpc-Message") | |
228 | // TODO: Support Grpc-Status-Details-Bin | |
229 | ||
230 | if s.sendCompress != "" { | |
231 | h.Set("Grpc-Encoding", s.sendCompress) | |
232 | } | |
233 | } | |
234 | ||
235 | func (ht *serverHandlerTransport) Write(s *Stream, data []byte, opts *Options) error { | |
236 | return ht.do(func() { | |
237 | ht.writeCommonHeaders(s) | |
238 | ht.rw.Write(data) | |
239 | if !opts.Delay { | |
240 | ht.rw.(http.Flusher).Flush() | |
241 | } | |
242 | }) | |
243 | } | |
244 | ||
245 | func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error { | |
246 | return ht.do(func() { | |
247 | ht.writeCommonHeaders(s) | |
248 | h := ht.rw.Header() | |
249 | for k, vv := range md { | |
250 | // Clients don't tolerate reading restricted headers after some non restricted ones were sent. | |
251 | if isReservedHeader(k) { | |
252 | continue | |
253 | } | |
254 | for _, v := range vv { | |
255 | v = encodeMetadataHeader(k, v) | |
256 | h.Add(k, v) | |
257 | } | |
258 | } | |
259 | ht.rw.WriteHeader(200) | |
260 | ht.rw.(http.Flusher).Flush() | |
261 | }) | |
262 | } | |
263 | ||
264 | func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), traceCtx func(context.Context, string) context.Context) { | |
265 | // With this transport type there will be exactly 1 stream: this HTTP request. | |
266 | ||
267 | var ctx context.Context | |
268 | var cancel context.CancelFunc | |
269 | if ht.timeoutSet { | |
270 | ctx, cancel = context.WithTimeout(context.Background(), ht.timeout) | |
271 | } else { | |
272 | ctx, cancel = context.WithCancel(context.Background()) | |
273 | } | |
274 | ||
275 | // requestOver is closed when either the request's context is done | |
276 | // or the status has been written via WriteStatus. | |
277 | requestOver := make(chan struct{}) | |
278 | ||
279 | // clientGone receives a single value if peer is gone, either | |
280 | // because the underlying connection is dead or because the | |
281 | // peer sends an http2 RST_STREAM. | |
282 | clientGone := ht.rw.(http.CloseNotifier).CloseNotify() | |
283 | go func() { | |
284 | select { | |
285 | case <-requestOver: | |
286 | return | |
287 | case <-ht.closedCh: | |
288 | case <-clientGone: | |
289 | } | |
290 | cancel() | |
291 | }() | |
292 | ||
293 | req := ht.req | |
294 | ||
295 | s := &Stream{ | |
296 | id: 0, // irrelevant | |
297 | requestRead: func(int) {}, | |
298 | cancel: cancel, | |
299 | buf: newRecvBuffer(), | |
300 | st: ht, | |
301 | method: req.URL.Path, | |
302 | recvCompress: req.Header.Get("grpc-encoding"), | |
303 | } | |
304 | pr := &peer.Peer{ | |
305 | Addr: ht.RemoteAddr(), | |
306 | } | |
307 | if req.TLS != nil { | |
308 | pr.AuthInfo = credentials.TLSInfo{State: *req.TLS} | |
309 | } | |
310 | ctx = metadata.NewIncomingContext(ctx, ht.headerMD) | |
311 | ctx = peer.NewContext(ctx, pr) | |
312 | s.ctx = newContextWithStream(ctx, s) | |
313 | s.trReader = &transportReader{ | |
314 | reader: &recvBufferReader{ctx: s.ctx, recv: s.buf}, | |
315 | windowHandler: func(int) {}, | |
316 | } | |
317 | ||
318 | // readerDone is closed when the Body.Read-ing goroutine exits. | |
319 | readerDone := make(chan struct{}) | |
320 | go func() { | |
321 | defer close(readerDone) | |
322 | ||
323 | // TODO: minimize garbage, optimize recvBuffer code/ownership | |
324 | const readSize = 8196 | |
325 | for buf := make([]byte, readSize); ; { | |
326 | n, err := req.Body.Read(buf) | |
327 | if n > 0 { | |
328 | s.buf.put(recvMsg{data: buf[:n:n]}) | |
329 | buf = buf[n:] | |
330 | } | |
331 | if err != nil { | |
332 | s.buf.put(recvMsg{err: mapRecvMsgError(err)}) | |
333 | return | |
334 | } | |
335 | if len(buf) == 0 { | |
336 | buf = make([]byte, readSize) | |
337 | } | |
338 | } | |
339 | }() | |
340 | ||
341 | // startStream is provided by the *grpc.Server's serveStreams. | |
342 | // It starts a goroutine serving s and exits immediately. | |
343 | // The goroutine that is started is the one that then calls | |
344 | // into ht, calling WriteHeader, Write, WriteStatus, Close, etc. | |
345 | startStream(s) | |
346 | ||
347 | ht.runStream() | |
348 | close(requestOver) | |
349 | ||
350 | // Wait for reading goroutine to finish. | |
351 | req.Body.Close() | |
352 | <-readerDone | |
353 | } | |
354 | ||
355 | func (ht *serverHandlerTransport) runStream() { | |
356 | for { | |
357 | select { | |
358 | case fn, ok := <-ht.writes: | |
359 | if !ok { | |
360 | return | |
361 | } | |
362 | fn() | |
363 | case <-ht.closedCh: | |
364 | return | |
365 | } | |
366 | } | |
367 | } | |
368 | ||
369 | func (ht *serverHandlerTransport) Drain() { | |
370 | panic("Drain() is not implemented") | |
371 | } | |
372 | ||
373 | // mapRecvMsgError returns the non-nil err into the appropriate | |
374 | // error value as expected by callers of *grpc.parser.recvMsg. | |
375 | // In particular, in can only be: | |
376 | // * io.EOF | |
377 | // * io.ErrUnexpectedEOF | |
378 | // * of type transport.ConnectionError | |
379 | // * of type transport.StreamError | |
380 | func mapRecvMsgError(err error) error { | |
381 | if err == io.EOF || err == io.ErrUnexpectedEOF { | |
382 | return err | |
383 | } | |
384 | if se, ok := err.(http2.StreamError); ok { | |
385 | if code, ok := http2ErrConvTab[se.Code]; ok { | |
386 | return StreamError{ | |
387 | Code: code, | |
388 | Desc: se.Error(), | |
389 | } | |
390 | } | |
391 | } | |
392 | return connectionErrorf(true, err, err.Error()) | |
393 | } |