]> git.immae.eu Git - github/fretlink/terraform-provider-statuscake.git/blob - vendor/google.golang.org/grpc/internal/binarylog/sink.go
Upgrade to 0.12
[github/fretlink/terraform-provider-statuscake.git] / vendor / google.golang.org / grpc / internal / binarylog / sink.go
1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 package binarylog
20
21 import (
22 "bufio"
23 "encoding/binary"
24 "fmt"
25 "io"
26 "io/ioutil"
27 "sync"
28 "time"
29
30 "github.com/golang/protobuf/proto"
31 pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
32 "google.golang.org/grpc/grpclog"
33 )
34
35 var (
36 defaultSink Sink = &noopSink{} // TODO(blog): change this default (file in /tmp).
37 )
38
39 // SetDefaultSink sets the sink where binary logs will be written to.
40 //
41 // Not thread safe. Only set during initialization.
42 func SetDefaultSink(s Sink) {
43 if defaultSink != nil {
44 defaultSink.Close()
45 }
46 defaultSink = s
47 }
48
49 // Sink writes log entry into the binary log sink.
50 type Sink interface {
51 // Write will be called to write the log entry into the sink.
52 //
53 // It should be thread-safe so it can be called in parallel.
54 Write(*pb.GrpcLogEntry) error
55 // Close will be called when the Sink is replaced by a new Sink.
56 Close() error
57 }
58
59 type noopSink struct{}
60
61 func (ns *noopSink) Write(*pb.GrpcLogEntry) error { return nil }
62 func (ns *noopSink) Close() error { return nil }
63
64 // newWriterSink creates a binary log sink with the given writer.
65 //
66 // Write() marshalls the proto message and writes it to the given writer. Each
67 // message is prefixed with a 4 byte big endian unsigned integer as the length.
68 //
69 // No buffer is done, Close() doesn't try to close the writer.
70 func newWriterSink(w io.Writer) *writerSink {
71 return &writerSink{out: w}
72 }
73
74 type writerSink struct {
75 out io.Writer
76 }
77
78 func (ws *writerSink) Write(e *pb.GrpcLogEntry) error {
79 b, err := proto.Marshal(e)
80 if err != nil {
81 grpclog.Infof("binary logging: failed to marshal proto message: %v", err)
82 }
83 hdr := make([]byte, 4)
84 binary.BigEndian.PutUint32(hdr, uint32(len(b)))
85 if _, err := ws.out.Write(hdr); err != nil {
86 return err
87 }
88 if _, err := ws.out.Write(b); err != nil {
89 return err
90 }
91 return nil
92 }
93
94 func (ws *writerSink) Close() error { return nil }
95
96 type bufWriteCloserSink struct {
97 mu sync.Mutex
98 closer io.Closer
99 out *writerSink // out is built on buf.
100 buf *bufio.Writer // buf is kept for flush.
101
102 writeStartOnce sync.Once
103 writeTicker *time.Ticker
104 }
105
106 func (fs *bufWriteCloserSink) Write(e *pb.GrpcLogEntry) error {
107 // Start the write loop when Write is called.
108 fs.writeStartOnce.Do(fs.startFlushGoroutine)
109 fs.mu.Lock()
110 if err := fs.out.Write(e); err != nil {
111 fs.mu.Unlock()
112 return err
113 }
114 fs.mu.Unlock()
115 return nil
116 }
117
118 const (
119 bufFlushDuration = 60 * time.Second
120 )
121
122 func (fs *bufWriteCloserSink) startFlushGoroutine() {
123 fs.writeTicker = time.NewTicker(bufFlushDuration)
124 go func() {
125 for range fs.writeTicker.C {
126 fs.mu.Lock()
127 fs.buf.Flush()
128 fs.mu.Unlock()
129 }
130 }()
131 }
132
133 func (fs *bufWriteCloserSink) Close() error {
134 if fs.writeTicker != nil {
135 fs.writeTicker.Stop()
136 }
137 fs.mu.Lock()
138 fs.buf.Flush()
139 fs.closer.Close()
140 fs.out.Close()
141 fs.mu.Unlock()
142 return nil
143 }
144
145 func newBufWriteCloserSink(o io.WriteCloser) Sink {
146 bufW := bufio.NewWriter(o)
147 return &bufWriteCloserSink{
148 closer: o,
149 out: newWriterSink(bufW),
150 buf: bufW,
151 }
152 }
153
154 // NewTempFileSink creates a temp file and returns a Sink that writes to this
155 // file.
156 func NewTempFileSink() (Sink, error) {
157 tempFile, err := ioutil.TempFile("/tmp", "grpcgo_binarylog_*.txt")
158 if err != nil {
159 return nil, fmt.Errorf("failed to create temp file: %v", err)
160 }
161 return newBufWriteCloserSink(tempFile), nil
162 }