3 * Copyright 2018 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
30 "github.com/golang/protobuf/proto"
31 pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
32 "google.golang.org/grpc/grpclog"
// defaultSink is the package-level Sink managed by SetDefaultSink. It is
// initialized to a noopSink, whose Write and Close return nil without doing
// anything.
36 defaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
39 // SetDefaultSink sets the sink where binary logs will be written to.
41 // Not thread safe. Only set during initialization.
42 func SetDefaultSink(s Sink) {
// defaultSink starts out as a non-nil noopSink, so unless it was changed
// elsewhere this branch is taken on the first call.
// NOTE(review): presumably the previous sink is closed here before s is
// installed (Sink.Close is documented as being called on replacement) —
// confirm against the full function body.
43 if defaultSink != nil {
49 // Sink is the destination that binary log entries are written to.
51 // Write will be called to write a log entry into the sink.
53 // Implementations should be thread-safe so that Write can be called in parallel.
54 Write(*pb.GrpcLogEntry) error
55 // Close will be called when the Sink is replaced by a new Sink.
59 type noopSink struct{}
61 func (ns *noopSink) Write(*pb.GrpcLogEntry) error { return nil }
62 func (ns *noopSink) Close() error { return nil }
64 // newWriterSink creates a binary log sink with the given writer.
66 // Write() marshalls the proto message and writes it to the given writer. Each
67 // message is prefixed with a 4 byte big endian unsigned integer as the length.
69 // No buffer is done, Close() doesn't try to close the writer.
70 func newWriterSink(w io.Writer) *writerSink {
71 return &writerSink{out: w}
// writerSink is the sink type returned by newWriterSink; its Write method
// sends length-prefixed entries to the writer stored in its out field.
74 type writerSink struct {
// Write marshals e with proto.Marshal and writes it to ws.out as a 4-byte
// big-endian length header followed by the marshaled bytes. The header and
// the payload are sent with two separate calls to ws.out.Write.
78 func (ws *writerSink) Write(e *pb.GrpcLogEntry) error {
79 b, err := proto.Marshal(e)
// NOTE(review): this log call reports a proto.Marshal failure at info
// level. Confirm whether the function also returns at this point, or
// whether execution continues and a header is written for whatever b holds.
81 grpclog.Infof("binary logging: failed to marshal proto message: %v", err)
// Length prefix: len(b) converted to uint32 and stored big-endian.
83 hdr := make([]byte, 4)
84 binary.BigEndian.PutUint32(hdr, uint32(len(b)))
85 if _, err := ws.out.Write(hdr); err != nil {
88 if _, err := ws.out.Write(b); err != nil {
94 func (ws *writerSink) Close() error { return nil }
// bufWriteCloserSink is the sink built by newBufWriteCloserSink: entries are
// written through out into the bufio.Writer buf, and a ticker created on the
// first Write drives the loop in startFlushGoroutine.
96 type bufWriteCloserSink struct {
99 out *writerSink // out is built on buf.
100 buf *bufio.Writer // buf is kept for flush.
// writeStartOnce ensures startFlushGoroutine runs only once, on the first Write.
102 writeStartOnce sync.Once
// writeTicker is created by startFlushGoroutine with period bufFlushDuration
// and stopped by Close; Close checks for nil before stopping it.
103 writeTicker *time.Ticker
// Write forwards e to the underlying writerSink. The first call also runs
// startFlushGoroutine, exactly once, via writeStartOnce.
106 func (fs *bufWriteCloserSink) Write(e *pb.GrpcLogEntry) error {
107 // Start the write loop when Write is called.
108 fs.writeStartOnce.Do(fs.startFlushGoroutine)
110 if err := fs.out.Write(e); err != nil {
// bufFlushDuration is the period of the ticker created in startFlushGoroutine.
119 bufFlushDuration = 60 * time.Second
// startFlushGoroutine creates writeTicker with period bufFlushDuration and
// loops over its ticks. It is invoked through writeStartOnce from Write.
122 func (fs *bufWriteCloserSink) startFlushGoroutine() {
123 fs.writeTicker = time.NewTicker(bufFlushDuration)
// NOTE(review): presumably each tick flushes fs.buf — confirm against the
// loop body. time.Ticker.Stop does not close the ticker's channel, so this
// range loop does not terminate when Close stops the ticker.
125 for range fs.writeTicker.C {
// Close stops the flush ticker if one was started.
// NOTE(review): presumably it also flushes buf and closes the underlying
// writer afterwards — confirm against the full method body.
133 func (fs *bufWriteCloserSink) Close() error {
// writeTicker is assigned by startFlushGoroutine, which Write triggers on its
// first call, so it can still be nil if no Write has happened.
134 if fs.writeTicker != nil {
135 fs.writeTicker.Stop()
// newBufWriteCloserSink wraps o in a bufio.Writer and returns a
// bufWriteCloserSink whose writerSink writes into that buffer.
145 func newBufWriteCloserSink(o io.WriteCloser) Sink {
146 bufW := bufio.NewWriter(o)
147 return &bufWriteCloserSink{
149 out: newWriterSink(bufW),
154 // NewTempFileSink creates a temp file and returns a Sink that writes to it.
//
// The file is created under "/tmp" with the name pattern
// "grpcgo_binarylog_*.txt", and the returned Sink is built with
// newBufWriteCloserSink. An error is returned if the file cannot be created.
156 func NewTempFileSink() (Sink, error) {
157 tempFile, err := ioutil.TempFile("/tmp", "grpcgo_binarylog_*.txt")
159 return nil, fmt.Errorf("failed to create temp file: %v", err)
161 return newBufWriteCloserSink(tempFile), nil