diff options
Diffstat (limited to 'vendor/google.golang.org/appengine/internal/api.go')
-rw-r--r-- | vendor/google.golang.org/appengine/internal/api.go | 671 |
1 files changed, 671 insertions, 0 deletions
diff --git a/vendor/google.golang.org/appengine/internal/api.go b/vendor/google.golang.org/appengine/internal/api.go new file mode 100644 index 0000000..bbc1cb9 --- /dev/null +++ b/vendor/google.golang.org/appengine/internal/api.go | |||
@@ -0,0 +1,671 @@ | |||
1 | // Copyright 2011 Google Inc. All rights reserved. | ||
2 | // Use of this source code is governed by the Apache 2.0 | ||
3 | // license that can be found in the LICENSE file. | ||
4 | |||
5 | // +build !appengine | ||
6 | |||
7 | package internal | ||
8 | |||
9 | import ( | ||
10 | "bytes" | ||
11 | "errors" | ||
12 | "fmt" | ||
13 | "io/ioutil" | ||
14 | "log" | ||
15 | "net" | ||
16 | "net/http" | ||
17 | "net/url" | ||
18 | "os" | ||
19 | "runtime" | ||
20 | "strconv" | ||
21 | "strings" | ||
22 | "sync" | ||
23 | "sync/atomic" | ||
24 | "time" | ||
25 | |||
26 | "github.com/golang/protobuf/proto" | ||
27 | netcontext "golang.org/x/net/context" | ||
28 | |||
29 | basepb "google.golang.org/appengine/internal/base" | ||
30 | logpb "google.golang.org/appengine/internal/log" | ||
31 | remotepb "google.golang.org/appengine/internal/remote_api" | ||
32 | ) | ||
33 | |||
34 | const ( | ||
35 | apiPath = "/rpc_http" | ||
36 | defaultTicketSuffix = "/default.20150612t184001.0" | ||
37 | ) | ||
38 | |||
39 | var ( | ||
40 | // Incoming headers. | ||
41 | ticketHeader = http.CanonicalHeaderKey("X-AppEngine-API-Ticket") | ||
42 | dapperHeader = http.CanonicalHeaderKey("X-Google-DapperTraceInfo") | ||
43 | traceHeader = http.CanonicalHeaderKey("X-Cloud-Trace-Context") | ||
44 | curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace") | ||
45 | userIPHeader = http.CanonicalHeaderKey("X-AppEngine-User-IP") | ||
46 | remoteAddrHeader = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr") | ||
47 | |||
48 | // Outgoing headers. | ||
49 | apiEndpointHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint") | ||
50 | apiEndpointHeaderValue = []string{"app-engine-apis"} | ||
51 | apiMethodHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Method") | ||
52 | apiMethodHeaderValue = []string{"/VMRemoteAPI.CallRemoteAPI"} | ||
53 | apiDeadlineHeader = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline") | ||
54 | apiContentType = http.CanonicalHeaderKey("Content-Type") | ||
55 | apiContentTypeValue = []string{"application/octet-stream"} | ||
56 | logFlushHeader = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count") | ||
57 | |||
58 | apiHTTPClient = &http.Client{ | ||
59 | Transport: &http.Transport{ | ||
60 | Proxy: http.ProxyFromEnvironment, | ||
61 | Dial: limitDial, | ||
62 | }, | ||
63 | } | ||
64 | |||
65 | defaultTicketOnce sync.Once | ||
66 | defaultTicket string | ||
67 | backgroundContextOnce sync.Once | ||
68 | backgroundContext netcontext.Context | ||
69 | ) | ||
70 | |||
71 | func apiURL() *url.URL { | ||
72 | host, port := "appengine.googleapis.internal", "10001" | ||
73 | if h := os.Getenv("API_HOST"); h != "" { | ||
74 | host = h | ||
75 | } | ||
76 | if p := os.Getenv("API_PORT"); p != "" { | ||
77 | port = p | ||
78 | } | ||
79 | return &url.URL{ | ||
80 | Scheme: "http", | ||
81 | Host: host + ":" + port, | ||
82 | Path: apiPath, | ||
83 | } | ||
84 | } | ||
85 | |||
86 | func handleHTTP(w http.ResponseWriter, r *http.Request) { | ||
87 | c := &context{ | ||
88 | req: r, | ||
89 | outHeader: w.Header(), | ||
90 | apiURL: apiURL(), | ||
91 | } | ||
92 | r = r.WithContext(withContext(r.Context(), c)) | ||
93 | c.req = r | ||
94 | |||
95 | stopFlushing := make(chan int) | ||
96 | |||
97 | // Patch up RemoteAddr so it looks reasonable. | ||
98 | if addr := r.Header.Get(userIPHeader); addr != "" { | ||
99 | r.RemoteAddr = addr | ||
100 | } else if addr = r.Header.Get(remoteAddrHeader); addr != "" { | ||
101 | r.RemoteAddr = addr | ||
102 | } else { | ||
103 | // Should not normally reach here, but pick a sensible default anyway. | ||
104 | r.RemoteAddr = "127.0.0.1" | ||
105 | } | ||
106 | // The address in the headers will most likely be of these forms: | ||
107 | // 123.123.123.123 | ||
108 | // 2001:db8::1 | ||
109 | // net/http.Request.RemoteAddr is specified to be in "IP:port" form. | ||
110 | if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil { | ||
111 | // Assume the remote address is only a host; add a default port. | ||
112 | r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80") | ||
113 | } | ||
114 | |||
115 | // Start goroutine responsible for flushing app logs. | ||
116 | // This is done after adding c to ctx.m (and stopped before removing it) | ||
117 | // because flushing logs requires making an API call. | ||
118 | go c.logFlusher(stopFlushing) | ||
119 | |||
120 | executeRequestSafely(c, r) | ||
121 | c.outHeader = nil // make sure header changes aren't respected any more | ||
122 | |||
123 | stopFlushing <- 1 // any logging beyond this point will be dropped | ||
124 | |||
125 | // Flush any pending logs asynchronously. | ||
126 | c.pendingLogs.Lock() | ||
127 | flushes := c.pendingLogs.flushes | ||
128 | if len(c.pendingLogs.lines) > 0 { | ||
129 | flushes++ | ||
130 | } | ||
131 | c.pendingLogs.Unlock() | ||
132 | flushed := make(chan struct{}) | ||
133 | go func() { | ||
134 | defer close(flushed) | ||
135 | // Force a log flush, because with very short requests we | ||
136 | // may not ever flush logs. | ||
137 | c.flushLog(true) | ||
138 | }() | ||
139 | w.Header().Set(logFlushHeader, strconv.Itoa(flushes)) | ||
140 | |||
141 | // Avoid nil Write call if c.Write is never called. | ||
142 | if c.outCode != 0 { | ||
143 | w.WriteHeader(c.outCode) | ||
144 | } | ||
145 | if c.outBody != nil { | ||
146 | w.Write(c.outBody) | ||
147 | } | ||
148 | // Wait for the last flush to complete before returning, | ||
149 | // otherwise the security ticket will not be valid. | ||
150 | <-flushed | ||
151 | } | ||
152 | |||
153 | func executeRequestSafely(c *context, r *http.Request) { | ||
154 | defer func() { | ||
155 | if x := recover(); x != nil { | ||
156 | logf(c, 4, "%s", renderPanic(x)) // 4 == critical | ||
157 | c.outCode = 500 | ||
158 | } | ||
159 | }() | ||
160 | |||
161 | http.DefaultServeMux.ServeHTTP(c, r) | ||
162 | } | ||
163 | |||
164 | func renderPanic(x interface{}) string { | ||
165 | buf := make([]byte, 16<<10) // 16 KB should be plenty | ||
166 | buf = buf[:runtime.Stack(buf, false)] | ||
167 | |||
168 | // Remove the first few stack frames: | ||
169 | // this func | ||
170 | // the recover closure in the caller | ||
171 | // That will root the stack trace at the site of the panic. | ||
172 | const ( | ||
173 | skipStart = "internal.renderPanic" | ||
174 | skipFrames = 2 | ||
175 | ) | ||
176 | start := bytes.Index(buf, []byte(skipStart)) | ||
177 | p := start | ||
178 | for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ { | ||
179 | p = bytes.IndexByte(buf[p+1:], '\n') + p + 1 | ||
180 | if p < 0 { | ||
181 | break | ||
182 | } | ||
183 | } | ||
184 | if p >= 0 { | ||
185 | // buf[start:p+1] is the block to remove. | ||
186 | // Copy buf[p+1:] over buf[start:] and shrink buf. | ||
187 | copy(buf[start:], buf[p+1:]) | ||
188 | buf = buf[:len(buf)-(p+1-start)] | ||
189 | } | ||
190 | |||
191 | // Add panic heading. | ||
192 | head := fmt.Sprintf("panic: %v\n\n", x) | ||
193 | if len(head) > len(buf) { | ||
194 | // Extremely unlikely to happen. | ||
195 | return head | ||
196 | } | ||
197 | copy(buf[len(head):], buf) | ||
198 | copy(buf, head) | ||
199 | |||
200 | return string(buf) | ||
201 | } | ||
202 | |||
203 | // context represents the context of an in-flight HTTP request. | ||
204 | // It implements the appengine.Context and http.ResponseWriter interfaces. | ||
205 | type context struct { | ||
206 | req *http.Request | ||
207 | |||
208 | outCode int | ||
209 | outHeader http.Header | ||
210 | outBody []byte | ||
211 | |||
212 | pendingLogs struct { | ||
213 | sync.Mutex | ||
214 | lines []*logpb.UserAppLogLine | ||
215 | flushes int | ||
216 | } | ||
217 | |||
218 | apiURL *url.URL | ||
219 | } | ||
220 | |||
221 | var contextKey = "holds a *context" | ||
222 | |||
223 | // jointContext joins two contexts in a superficial way. | ||
224 | // It takes values and timeouts from a base context, and only values from another context. | ||
225 | type jointContext struct { | ||
226 | base netcontext.Context | ||
227 | valuesOnly netcontext.Context | ||
228 | } | ||
229 | |||
230 | func (c jointContext) Deadline() (time.Time, bool) { | ||
231 | return c.base.Deadline() | ||
232 | } | ||
233 | |||
234 | func (c jointContext) Done() <-chan struct{} { | ||
235 | return c.base.Done() | ||
236 | } | ||
237 | |||
238 | func (c jointContext) Err() error { | ||
239 | return c.base.Err() | ||
240 | } | ||
241 | |||
242 | func (c jointContext) Value(key interface{}) interface{} { | ||
243 | if val := c.base.Value(key); val != nil { | ||
244 | return val | ||
245 | } | ||
246 | return c.valuesOnly.Value(key) | ||
247 | } | ||
248 | |||
249 | // fromContext returns the App Engine context or nil if ctx is not | ||
250 | // derived from an App Engine context. | ||
251 | func fromContext(ctx netcontext.Context) *context { | ||
252 | c, _ := ctx.Value(&contextKey).(*context) | ||
253 | return c | ||
254 | } | ||
255 | |||
256 | func withContext(parent netcontext.Context, c *context) netcontext.Context { | ||
257 | ctx := netcontext.WithValue(parent, &contextKey, c) | ||
258 | if ns := c.req.Header.Get(curNamespaceHeader); ns != "" { | ||
259 | ctx = withNamespace(ctx, ns) | ||
260 | } | ||
261 | return ctx | ||
262 | } | ||
263 | |||
264 | func toContext(c *context) netcontext.Context { | ||
265 | return withContext(netcontext.Background(), c) | ||
266 | } | ||
267 | |||
268 | func IncomingHeaders(ctx netcontext.Context) http.Header { | ||
269 | if c := fromContext(ctx); c != nil { | ||
270 | return c.req.Header | ||
271 | } | ||
272 | return nil | ||
273 | } | ||
274 | |||
275 | func ReqContext(req *http.Request) netcontext.Context { | ||
276 | return req.Context() | ||
277 | } | ||
278 | |||
279 | func WithContext(parent netcontext.Context, req *http.Request) netcontext.Context { | ||
280 | return jointContext{ | ||
281 | base: parent, | ||
282 | valuesOnly: req.Context(), | ||
283 | } | ||
284 | } | ||
285 | |||
286 | // DefaultTicket returns a ticket used for background context or dev_appserver. | ||
287 | func DefaultTicket() string { | ||
288 | defaultTicketOnce.Do(func() { | ||
289 | if IsDevAppServer() { | ||
290 | defaultTicket = "testapp" + defaultTicketSuffix | ||
291 | return | ||
292 | } | ||
293 | appID := partitionlessAppID() | ||
294 | escAppID := strings.Replace(strings.Replace(appID, ":", "_", -1), ".", "_", -1) | ||
295 | majVersion := VersionID(nil) | ||
296 | if i := strings.Index(majVersion, "."); i > 0 { | ||
297 | majVersion = majVersion[:i] | ||
298 | } | ||
299 | defaultTicket = fmt.Sprintf("%s/%s.%s.%s", escAppID, ModuleName(nil), majVersion, InstanceID()) | ||
300 | }) | ||
301 | return defaultTicket | ||
302 | } | ||
303 | |||
304 | func BackgroundContext() netcontext.Context { | ||
305 | backgroundContextOnce.Do(func() { | ||
306 | // Compute background security ticket. | ||
307 | ticket := DefaultTicket() | ||
308 | |||
309 | c := &context{ | ||
310 | req: &http.Request{ | ||
311 | Header: http.Header{ | ||
312 | ticketHeader: []string{ticket}, | ||
313 | }, | ||
314 | }, | ||
315 | apiURL: apiURL(), | ||
316 | } | ||
317 | backgroundContext = toContext(c) | ||
318 | |||
319 | // TODO(dsymonds): Wire up the shutdown handler to do a final flush. | ||
320 | go c.logFlusher(make(chan int)) | ||
321 | }) | ||
322 | |||
323 | return backgroundContext | ||
324 | } | ||
325 | |||
326 | // RegisterTestRequest registers the HTTP request req for testing, such that | ||
327 | // any API calls are sent to the provided URL. It returns a closure to delete | ||
328 | // the registration. | ||
329 | // It should only be used by aetest package. | ||
330 | func RegisterTestRequest(req *http.Request, apiURL *url.URL, decorate func(netcontext.Context) netcontext.Context) (*http.Request, func()) { | ||
331 | c := &context{ | ||
332 | req: req, | ||
333 | apiURL: apiURL, | ||
334 | } | ||
335 | ctx := withContext(decorate(req.Context()), c) | ||
336 | req = req.WithContext(ctx) | ||
337 | c.req = req | ||
338 | return req, func() {} | ||
339 | } | ||
340 | |||
341 | var errTimeout = &CallError{ | ||
342 | Detail: "Deadline exceeded", | ||
343 | Code: int32(remotepb.RpcError_CANCELLED), | ||
344 | Timeout: true, | ||
345 | } | ||
346 | |||
347 | func (c *context) Header() http.Header { return c.outHeader } | ||
348 | |||
349 | // Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status | ||
350 | // codes do not permit a response body (nor response entity headers such as | ||
351 | // Content-Length, Content-Type, etc). | ||
352 | func bodyAllowedForStatus(status int) bool { | ||
353 | switch { | ||
354 | case status >= 100 && status <= 199: | ||
355 | return false | ||
356 | case status == 204: | ||
357 | return false | ||
358 | case status == 304: | ||
359 | return false | ||
360 | } | ||
361 | return true | ||
362 | } | ||
363 | |||
364 | func (c *context) Write(b []byte) (int, error) { | ||
365 | if c.outCode == 0 { | ||
366 | c.WriteHeader(http.StatusOK) | ||
367 | } | ||
368 | if len(b) > 0 && !bodyAllowedForStatus(c.outCode) { | ||
369 | return 0, http.ErrBodyNotAllowed | ||
370 | } | ||
371 | c.outBody = append(c.outBody, b...) | ||
372 | return len(b), nil | ||
373 | } | ||
374 | |||
375 | func (c *context) WriteHeader(code int) { | ||
376 | if c.outCode != 0 { | ||
377 | logf(c, 3, "WriteHeader called multiple times on request.") // error level | ||
378 | return | ||
379 | } | ||
380 | c.outCode = code | ||
381 | } | ||
382 | |||
383 | func (c *context) post(body []byte, timeout time.Duration) (b []byte, err error) { | ||
384 | hreq := &http.Request{ | ||
385 | Method: "POST", | ||
386 | URL: c.apiURL, | ||
387 | Header: http.Header{ | ||
388 | apiEndpointHeader: apiEndpointHeaderValue, | ||
389 | apiMethodHeader: apiMethodHeaderValue, | ||
390 | apiContentType: apiContentTypeValue, | ||
391 | apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)}, | ||
392 | }, | ||
393 | Body: ioutil.NopCloser(bytes.NewReader(body)), | ||
394 | ContentLength: int64(len(body)), | ||
395 | Host: c.apiURL.Host, | ||
396 | } | ||
397 | if info := c.req.Header.Get(dapperHeader); info != "" { | ||
398 | hreq.Header.Set(dapperHeader, info) | ||
399 | } | ||
400 | if info := c.req.Header.Get(traceHeader); info != "" { | ||
401 | hreq.Header.Set(traceHeader, info) | ||
402 | } | ||
403 | |||
404 | tr := apiHTTPClient.Transport.(*http.Transport) | ||
405 | |||
406 | var timedOut int32 // atomic; set to 1 if timed out | ||
407 | t := time.AfterFunc(timeout, func() { | ||
408 | atomic.StoreInt32(&timedOut, 1) | ||
409 | tr.CancelRequest(hreq) | ||
410 | }) | ||
411 | defer t.Stop() | ||
412 | defer func() { | ||
413 | // Check if timeout was exceeded. | ||
414 | if atomic.LoadInt32(&timedOut) != 0 { | ||
415 | err = errTimeout | ||
416 | } | ||
417 | }() | ||
418 | |||
419 | hresp, err := apiHTTPClient.Do(hreq) | ||
420 | if err != nil { | ||
421 | return nil, &CallError{ | ||
422 | Detail: fmt.Sprintf("service bridge HTTP failed: %v", err), | ||
423 | Code: int32(remotepb.RpcError_UNKNOWN), | ||
424 | } | ||
425 | } | ||
426 | defer hresp.Body.Close() | ||
427 | hrespBody, err := ioutil.ReadAll(hresp.Body) | ||
428 | if hresp.StatusCode != 200 { | ||
429 | return nil, &CallError{ | ||
430 | Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody), | ||
431 | Code: int32(remotepb.RpcError_UNKNOWN), | ||
432 | } | ||
433 | } | ||
434 | if err != nil { | ||
435 | return nil, &CallError{ | ||
436 | Detail: fmt.Sprintf("service bridge response bad: %v", err), | ||
437 | Code: int32(remotepb.RpcError_UNKNOWN), | ||
438 | } | ||
439 | } | ||
440 | return hrespBody, nil | ||
441 | } | ||
442 | |||
443 | func Call(ctx netcontext.Context, service, method string, in, out proto.Message) error { | ||
444 | if ns := NamespaceFromContext(ctx); ns != "" { | ||
445 | if fn, ok := NamespaceMods[service]; ok { | ||
446 | fn(in, ns) | ||
447 | } | ||
448 | } | ||
449 | |||
450 | if f, ctx, ok := callOverrideFromContext(ctx); ok { | ||
451 | return f(ctx, service, method, in, out) | ||
452 | } | ||
453 | |||
454 | // Handle already-done contexts quickly. | ||
455 | select { | ||
456 | case <-ctx.Done(): | ||
457 | return ctx.Err() | ||
458 | default: | ||
459 | } | ||
460 | |||
461 | c := fromContext(ctx) | ||
462 | if c == nil { | ||
463 | // Give a good error message rather than a panic lower down. | ||
464 | return errNotAppEngineContext | ||
465 | } | ||
466 | |||
467 | // Apply transaction modifications if we're in a transaction. | ||
468 | if t := transactionFromContext(ctx); t != nil { | ||
469 | if t.finished { | ||
470 | return errors.New("transaction context has expired") | ||
471 | } | ||
472 | applyTransaction(in, &t.transaction) | ||
473 | } | ||
474 | |||
475 | // Default RPC timeout is 60s. | ||
476 | timeout := 60 * time.Second | ||
477 | if deadline, ok := ctx.Deadline(); ok { | ||
478 | timeout = deadline.Sub(time.Now()) | ||
479 | } | ||
480 | |||
481 | data, err := proto.Marshal(in) | ||
482 | if err != nil { | ||
483 | return err | ||
484 | } | ||
485 | |||
486 | ticket := c.req.Header.Get(ticketHeader) | ||
487 | // Use a test ticket under test environment. | ||
488 | if ticket == "" { | ||
489 | if appid := ctx.Value(&appIDOverrideKey); appid != nil { | ||
490 | ticket = appid.(string) + defaultTicketSuffix | ||
491 | } | ||
492 | } | ||
493 | // Fall back to use background ticket when the request ticket is not available in Flex or dev_appserver. | ||
494 | if ticket == "" { | ||
495 | ticket = DefaultTicket() | ||
496 | } | ||
497 | req := &remotepb.Request{ | ||
498 | ServiceName: &service, | ||
499 | Method: &method, | ||
500 | Request: data, | ||
501 | RequestId: &ticket, | ||
502 | } | ||
503 | hreqBody, err := proto.Marshal(req) | ||
504 | if err != nil { | ||
505 | return err | ||
506 | } | ||
507 | |||
508 | hrespBody, err := c.post(hreqBody, timeout) | ||
509 | if err != nil { | ||
510 | return err | ||
511 | } | ||
512 | |||
513 | res := &remotepb.Response{} | ||
514 | if err := proto.Unmarshal(hrespBody, res); err != nil { | ||
515 | return err | ||
516 | } | ||
517 | if res.RpcError != nil { | ||
518 | ce := &CallError{ | ||
519 | Detail: res.RpcError.GetDetail(), | ||
520 | Code: *res.RpcError.Code, | ||
521 | } | ||
522 | switch remotepb.RpcError_ErrorCode(ce.Code) { | ||
523 | case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED: | ||
524 | ce.Timeout = true | ||
525 | } | ||
526 | return ce | ||
527 | } | ||
528 | if res.ApplicationError != nil { | ||
529 | return &APIError{ | ||
530 | Service: *req.ServiceName, | ||
531 | Detail: res.ApplicationError.GetDetail(), | ||
532 | Code: *res.ApplicationError.Code, | ||
533 | } | ||
534 | } | ||
535 | if res.Exception != nil || res.JavaException != nil { | ||
536 | // This shouldn't happen, but let's be defensive. | ||
537 | return &CallError{ | ||
538 | Detail: "service bridge returned exception", | ||
539 | Code: int32(remotepb.RpcError_UNKNOWN), | ||
540 | } | ||
541 | } | ||
542 | return proto.Unmarshal(res.Response, out) | ||
543 | } | ||
544 | |||
545 | func (c *context) Request() *http.Request { | ||
546 | return c.req | ||
547 | } | ||
548 | |||
549 | func (c *context) addLogLine(ll *logpb.UserAppLogLine) { | ||
550 | // Truncate long log lines. | ||
551 | // TODO(dsymonds): Check if this is still necessary. | ||
552 | const lim = 8 << 10 | ||
553 | if len(*ll.Message) > lim { | ||
554 | suffix := fmt.Sprintf("...(length %d)", len(*ll.Message)) | ||
555 | ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix) | ||
556 | } | ||
557 | |||
558 | c.pendingLogs.Lock() | ||
559 | c.pendingLogs.lines = append(c.pendingLogs.lines, ll) | ||
560 | c.pendingLogs.Unlock() | ||
561 | } | ||
562 | |||
563 | var logLevelName = map[int64]string{ | ||
564 | 0: "DEBUG", | ||
565 | 1: "INFO", | ||
566 | 2: "WARNING", | ||
567 | 3: "ERROR", | ||
568 | 4: "CRITICAL", | ||
569 | } | ||
570 | |||
571 | func logf(c *context, level int64, format string, args ...interface{}) { | ||
572 | if c == nil { | ||
573 | panic("not an App Engine context") | ||
574 | } | ||
575 | s := fmt.Sprintf(format, args...) | ||
576 | s = strings.TrimRight(s, "\n") // Remove any trailing newline characters. | ||
577 | c.addLogLine(&logpb.UserAppLogLine{ | ||
578 | TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3), | ||
579 | Level: &level, | ||
580 | Message: &s, | ||
581 | }) | ||
582 | // Only duplicate log to stderr if not running on App Engine second generation | ||
583 | if !IsSecondGen() { | ||
584 | log.Print(logLevelName[level] + ": " + s) | ||
585 | } | ||
586 | } | ||
587 | |||
588 | // flushLog attempts to flush any pending logs to the appserver. | ||
589 | // It should not be called concurrently. | ||
590 | func (c *context) flushLog(force bool) (flushed bool) { | ||
591 | c.pendingLogs.Lock() | ||
592 | // Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious. | ||
593 | n, rem := 0, 30<<20 | ||
594 | for ; n < len(c.pendingLogs.lines); n++ { | ||
595 | ll := c.pendingLogs.lines[n] | ||
596 | // Each log line will require about 3 bytes of overhead. | ||
597 | nb := proto.Size(ll) + 3 | ||
598 | if nb > rem { | ||
599 | break | ||
600 | } | ||
601 | rem -= nb | ||
602 | } | ||
603 | lines := c.pendingLogs.lines[:n] | ||
604 | c.pendingLogs.lines = c.pendingLogs.lines[n:] | ||
605 | c.pendingLogs.Unlock() | ||
606 | |||
607 | if len(lines) == 0 && !force { | ||
608 | // Nothing to flush. | ||
609 | return false | ||
610 | } | ||
611 | |||
612 | rescueLogs := false | ||
613 | defer func() { | ||
614 | if rescueLogs { | ||
615 | c.pendingLogs.Lock() | ||
616 | c.pendingLogs.lines = append(lines, c.pendingLogs.lines...) | ||
617 | c.pendingLogs.Unlock() | ||
618 | } | ||
619 | }() | ||
620 | |||
621 | buf, err := proto.Marshal(&logpb.UserAppLogGroup{ | ||
622 | LogLine: lines, | ||
623 | }) | ||
624 | if err != nil { | ||
625 | log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err) | ||
626 | rescueLogs = true | ||
627 | return false | ||
628 | } | ||
629 | |||
630 | req := &logpb.FlushRequest{ | ||
631 | Logs: buf, | ||
632 | } | ||
633 | res := &basepb.VoidProto{} | ||
634 | c.pendingLogs.Lock() | ||
635 | c.pendingLogs.flushes++ | ||
636 | c.pendingLogs.Unlock() | ||
637 | if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil { | ||
638 | log.Printf("internal.flushLog: Flush RPC: %v", err) | ||
639 | rescueLogs = true | ||
640 | return false | ||
641 | } | ||
642 | return true | ||
643 | } | ||
644 | |||
645 | const ( | ||
646 | // Log flushing parameters. | ||
647 | flushInterval = 1 * time.Second | ||
648 | forceFlushInterval = 60 * time.Second | ||
649 | ) | ||
650 | |||
651 | func (c *context) logFlusher(stop <-chan int) { | ||
652 | lastFlush := time.Now() | ||
653 | tick := time.NewTicker(flushInterval) | ||
654 | for { | ||
655 | select { | ||
656 | case <-stop: | ||
657 | // Request finished. | ||
658 | tick.Stop() | ||
659 | return | ||
660 | case <-tick.C: | ||
661 | force := time.Now().Sub(lastFlush) > forceFlushInterval | ||
662 | if c.flushLog(force) { | ||
663 | lastFlush = time.Now() | ||
664 | } | ||
665 | } | ||
666 | } | ||
667 | } | ||
668 | |||
669 | func ContextForTesting(req *http.Request) netcontext.Context { | ||
670 | return toContext(&context{req: req}) | ||
671 | } | ||