]>
Commit | Line | Data |
---|---|---|
107c1cdb ND |
1 | /* |
2 | * | |
3 | * Copyright 2018 gRPC authors. | |
4 | * | |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | * | |
17 | */ | |
18 | ||
19 | package binarylog | |
20 | ||
21 | import ( | |
22 | "bufio" | |
23 | "encoding/binary" | |
24 | "fmt" | |
25 | "io" | |
26 | "io/ioutil" | |
27 | "sync" | |
28 | "time" | |
29 | ||
30 | "github.com/golang/protobuf/proto" | |
31 | pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" | |
32 | "google.golang.org/grpc/grpclog" | |
33 | ) | |
34 | ||
35 | var ( | |
36 | defaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp). | |
37 | ) | |
38 | ||
39 | // SetDefaultSink sets the sink where binary logs will be written to. | |
40 | // | |
41 | // Not thread safe. Only set during initialization. | |
42 | func SetDefaultSink(s Sink) { | |
43 | if defaultSink != nil { | |
44 | defaultSink.Close() | |
45 | } | |
46 | defaultSink = s | |
47 | } | |
48 | ||
49 | // Sink writes log entry into the binary log sink. | |
50 | type Sink interface { | |
51 | // Write will be called to write the log entry into the sink. | |
52 | // | |
53 | // It should be thread-safe so it can be called in parallel. | |
54 | Write(*pb.GrpcLogEntry) error | |
55 | // Close will be called when the Sink is replaced by a new Sink. | |
56 | Close() error | |
57 | } | |
58 | ||
59 | type noopSink struct{} | |
60 | ||
61 | func (ns *noopSink) Write(*pb.GrpcLogEntry) error { return nil } | |
62 | func (ns *noopSink) Close() error { return nil } | |
63 | ||
// newWriterSink creates a binary log sink with the given writer.
//
// Write() marshals the proto message and writes it to the given writer. Each
// message is prefixed with a 4 byte big endian unsigned integer as the length.
//
// No buffering is done, and Close() doesn't try to close the writer.
func newWriterSink(w io.Writer) *writerSink {
	sink := new(writerSink)
	sink.out = w
	return sink
}

// writerSink holds the io.Writer that log entries are written to.
type writerSink struct {
	out io.Writer
}
77 | ||
78 | func (ws *writerSink) Write(e *pb.GrpcLogEntry) error { | |
79 | b, err := proto.Marshal(e) | |
80 | if err != nil { | |
81 | grpclog.Infof("binary logging: failed to marshal proto message: %v", err) | |
82 | } | |
83 | hdr := make([]byte, 4) | |
84 | binary.BigEndian.PutUint32(hdr, uint32(len(b))) | |
85 | if _, err := ws.out.Write(hdr); err != nil { | |
86 | return err | |
87 | } | |
88 | if _, err := ws.out.Write(b); err != nil { | |
89 | return err | |
90 | } | |
91 | return nil | |
92 | } | |
93 | ||
94 | func (ws *writerSink) Close() error { return nil } | |
95 | ||
96 | type bufWriteCloserSink struct { | |
97 | mu sync.Mutex | |
98 | closer io.Closer | |
99 | out *writerSink // out is built on buf. | |
100 | buf *bufio.Writer // buf is kept for flush. | |
101 | ||
102 | writeStartOnce sync.Once | |
103 | writeTicker *time.Ticker | |
104 | } | |
105 | ||
106 | func (fs *bufWriteCloserSink) Write(e *pb.GrpcLogEntry) error { | |
107 | // Start the write loop when Write is called. | |
108 | fs.writeStartOnce.Do(fs.startFlushGoroutine) | |
109 | fs.mu.Lock() | |
110 | if err := fs.out.Write(e); err != nil { | |
111 | fs.mu.Unlock() | |
112 | return err | |
113 | } | |
114 | fs.mu.Unlock() | |
115 | return nil | |
116 | } | |
117 | ||
118 | const ( | |
119 | bufFlushDuration = 60 * time.Second | |
120 | ) | |
121 | ||
122 | func (fs *bufWriteCloserSink) startFlushGoroutine() { | |
123 | fs.writeTicker = time.NewTicker(bufFlushDuration) | |
124 | go func() { | |
125 | for range fs.writeTicker.C { | |
126 | fs.mu.Lock() | |
127 | fs.buf.Flush() | |
128 | fs.mu.Unlock() | |
129 | } | |
130 | }() | |
131 | } | |
132 | ||
133 | func (fs *bufWriteCloserSink) Close() error { | |
134 | if fs.writeTicker != nil { | |
135 | fs.writeTicker.Stop() | |
136 | } | |
137 | fs.mu.Lock() | |
138 | fs.buf.Flush() | |
139 | fs.closer.Close() | |
140 | fs.out.Close() | |
141 | fs.mu.Unlock() | |
142 | return nil | |
143 | } | |
144 | ||
145 | func newBufWriteCloserSink(o io.WriteCloser) Sink { | |
146 | bufW := bufio.NewWriter(o) | |
147 | return &bufWriteCloserSink{ | |
148 | closer: o, | |
149 | out: newWriterSink(bufW), | |
150 | buf: bufW, | |
151 | } | |
152 | } | |
153 | ||
154 | // NewTempFileSink creates a temp file and returns a Sink that writes to this | |
155 | // file. | |
156 | func NewTempFileSink() (Sink, error) { | |
157 | tempFile, err := ioutil.TempFile("/tmp", "grpcgo_binarylog_*.txt") | |
158 | if err != nil { | |
159 | return nil, fmt.Errorf("failed to create temp file: %v", err) | |
160 | } | |
161 | return newBufWriteCloserSink(tempFile), nil | |
162 | } |