]>
Commit | Line | Data |
---|---|---|
107c1cdb ND |
1 | /* |
2 | * | |
3 | * Copyright 2018 gRPC authors. | |
4 | * | |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); | |
6 | * you may not use this file except in compliance with the License. | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, software | |
12 | * distributed under the License is distributed on an "AS IS" BASIS, | |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
14 | * See the License for the specific language governing permissions and | |
15 | * limitations under the License. | |
16 | * | |
17 | */ | |
18 | ||
19 | package binarylog | |
20 | ||
21 | import ( | |
22 | "net" | |
23 | "strings" | |
24 | "sync/atomic" | |
25 | "time" | |
26 | ||
27 | "github.com/golang/protobuf/proto" | |
28 | "github.com/golang/protobuf/ptypes" | |
29 | pb "google.golang.org/grpc/binarylog/grpc_binarylog_v1" | |
30 | "google.golang.org/grpc/grpclog" | |
31 | "google.golang.org/grpc/metadata" | |
32 | "google.golang.org/grpc/status" | |
33 | ) | |
34 | ||
35 | type callIDGenerator struct { | |
36 | id uint64 | |
37 | } | |
38 | ||
39 | func (g *callIDGenerator) next() uint64 { | |
40 | id := atomic.AddUint64(&g.id, 1) | |
41 | return id | |
42 | } | |
43 | ||
44 | // reset is for testing only, and doesn't need to be thread safe. | |
45 | func (g *callIDGenerator) reset() { | |
46 | g.id = 0 | |
47 | } | |
48 | ||
49 | var idGen callIDGenerator | |
50 | ||
51 | // MethodLogger is the sub-logger for each method. | |
52 | type MethodLogger struct { | |
53 | headerMaxLen, messageMaxLen uint64 | |
54 | ||
55 | callID uint64 | |
56 | idWithinCallGen *callIDGenerator | |
57 | ||
58 | sink Sink // TODO(blog): make this plugable. | |
59 | } | |
60 | ||
61 | func newMethodLogger(h, m uint64) *MethodLogger { | |
62 | return &MethodLogger{ | |
63 | headerMaxLen: h, | |
64 | messageMaxLen: m, | |
65 | ||
66 | callID: idGen.next(), | |
67 | idWithinCallGen: &callIDGenerator{}, | |
68 | ||
69 | sink: defaultSink, // TODO(blog): make it plugable. | |
70 | } | |
71 | } | |
72 | ||
73 | // Log creates a proto binary log entry, and logs it to the sink. | |
74 | func (ml *MethodLogger) Log(c LogEntryConfig) { | |
75 | m := c.toProto() | |
76 | timestamp, _ := ptypes.TimestampProto(time.Now()) | |
77 | m.Timestamp = timestamp | |
78 | m.CallId = ml.callID | |
79 | m.SequenceIdWithinCall = ml.idWithinCallGen.next() | |
80 | ||
81 | switch pay := m.Payload.(type) { | |
82 | case *pb.GrpcLogEntry_ClientHeader: | |
83 | m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata()) | |
84 | case *pb.GrpcLogEntry_ServerHeader: | |
85 | m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata()) | |
86 | case *pb.GrpcLogEntry_Message: | |
87 | m.PayloadTruncated = ml.truncateMessage(pay.Message) | |
88 | } | |
89 | ||
90 | ml.sink.Write(m) | |
91 | } | |
92 | ||
93 | func (ml *MethodLogger) truncateMetadata(mdPb *pb.Metadata) (truncated bool) { | |
94 | if ml.headerMaxLen == maxUInt { | |
95 | return false | |
96 | } | |
97 | var ( | |
98 | bytesLimit = ml.headerMaxLen | |
99 | index int | |
100 | ) | |
101 | // At the end of the loop, index will be the first entry where the total | |
102 | // size is greater than the limit: | |
103 | // | |
104 | // len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr. | |
105 | for ; index < len(mdPb.Entry); index++ { | |
106 | entry := mdPb.Entry[index] | |
107 | if entry.Key == "grpc-trace-bin" { | |
108 | // "grpc-trace-bin" is a special key. It's kept in the log entry, | |
109 | // but not counted towards the size limit. | |
110 | continue | |
111 | } | |
112 | currentEntryLen := uint64(len(entry.Value)) | |
113 | if currentEntryLen > bytesLimit { | |
114 | break | |
115 | } | |
116 | bytesLimit -= currentEntryLen | |
117 | } | |
118 | truncated = index < len(mdPb.Entry) | |
119 | mdPb.Entry = mdPb.Entry[:index] | |
120 | return truncated | |
121 | } | |
122 | ||
123 | func (ml *MethodLogger) truncateMessage(msgPb *pb.Message) (truncated bool) { | |
124 | if ml.messageMaxLen == maxUInt { | |
125 | return false | |
126 | } | |
127 | if ml.messageMaxLen >= uint64(len(msgPb.Data)) { | |
128 | return false | |
129 | } | |
130 | msgPb.Data = msgPb.Data[:ml.messageMaxLen] | |
131 | return true | |
132 | } | |
133 | ||
134 | // LogEntryConfig represents the configuration for binary log entry. | |
135 | type LogEntryConfig interface { | |
136 | toProto() *pb.GrpcLogEntry | |
137 | } | |
138 | ||
139 | // ClientHeader configs the binary log entry to be a ClientHeader entry. | |
140 | type ClientHeader struct { | |
141 | OnClientSide bool | |
142 | Header metadata.MD | |
143 | MethodName string | |
144 | Authority string | |
145 | Timeout time.Duration | |
146 | // PeerAddr is required only when it's on server side. | |
147 | PeerAddr net.Addr | |
148 | } | |
149 | ||
150 | func (c *ClientHeader) toProto() *pb.GrpcLogEntry { | |
151 | // This function doesn't need to set all the fields (e.g. seq ID). The Log | |
152 | // function will set the fields when necessary. | |
153 | clientHeader := &pb.ClientHeader{ | |
154 | Metadata: mdToMetadataProto(c.Header), | |
155 | MethodName: c.MethodName, | |
156 | Authority: c.Authority, | |
157 | } | |
158 | if c.Timeout > 0 { | |
159 | clientHeader.Timeout = ptypes.DurationProto(c.Timeout) | |
160 | } | |
161 | ret := &pb.GrpcLogEntry{ | |
162 | Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER, | |
163 | Payload: &pb.GrpcLogEntry_ClientHeader{ | |
164 | ClientHeader: clientHeader, | |
165 | }, | |
166 | } | |
167 | if c.OnClientSide { | |
168 | ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT | |
169 | } else { | |
170 | ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER | |
171 | } | |
172 | if c.PeerAddr != nil { | |
173 | ret.Peer = addrToProto(c.PeerAddr) | |
174 | } | |
175 | return ret | |
176 | } | |
177 | ||
178 | // ServerHeader configs the binary log entry to be a ServerHeader entry. | |
179 | type ServerHeader struct { | |
180 | OnClientSide bool | |
181 | Header metadata.MD | |
182 | // PeerAddr is required only when it's on client side. | |
183 | PeerAddr net.Addr | |
184 | } | |
185 | ||
186 | func (c *ServerHeader) toProto() *pb.GrpcLogEntry { | |
187 | ret := &pb.GrpcLogEntry{ | |
188 | Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER, | |
189 | Payload: &pb.GrpcLogEntry_ServerHeader{ | |
190 | ServerHeader: &pb.ServerHeader{ | |
191 | Metadata: mdToMetadataProto(c.Header), | |
192 | }, | |
193 | }, | |
194 | } | |
195 | if c.OnClientSide { | |
196 | ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT | |
197 | } else { | |
198 | ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER | |
199 | } | |
200 | if c.PeerAddr != nil { | |
201 | ret.Peer = addrToProto(c.PeerAddr) | |
202 | } | |
203 | return ret | |
204 | } | |
205 | ||
206 | // ClientMessage configs the binary log entry to be a ClientMessage entry. | |
207 | type ClientMessage struct { | |
208 | OnClientSide bool | |
209 | // Message can be a proto.Message or []byte. Other messages formats are not | |
210 | // supported. | |
211 | Message interface{} | |
212 | } | |
213 | ||
214 | func (c *ClientMessage) toProto() *pb.GrpcLogEntry { | |
215 | var ( | |
216 | data []byte | |
217 | err error | |
218 | ) | |
219 | if m, ok := c.Message.(proto.Message); ok { | |
220 | data, err = proto.Marshal(m) | |
221 | if err != nil { | |
222 | grpclog.Infof("binarylogging: failed to marshal proto message: %v", err) | |
223 | } | |
224 | } else if b, ok := c.Message.([]byte); ok { | |
225 | data = b | |
226 | } else { | |
227 | grpclog.Infof("binarylogging: message to log is neither proto.message nor []byte") | |
228 | } | |
229 | ret := &pb.GrpcLogEntry{ | |
230 | Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE, | |
231 | Payload: &pb.GrpcLogEntry_Message{ | |
232 | Message: &pb.Message{ | |
233 | Length: uint32(len(data)), | |
234 | Data: data, | |
235 | }, | |
236 | }, | |
237 | } | |
238 | if c.OnClientSide { | |
239 | ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT | |
240 | } else { | |
241 | ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER | |
242 | } | |
243 | return ret | |
244 | } | |
245 | ||
246 | // ServerMessage configs the binary log entry to be a ServerMessage entry. | |
247 | type ServerMessage struct { | |
248 | OnClientSide bool | |
249 | // Message can be a proto.Message or []byte. Other messages formats are not | |
250 | // supported. | |
251 | Message interface{} | |
252 | } | |
253 | ||
254 | func (c *ServerMessage) toProto() *pb.GrpcLogEntry { | |
255 | var ( | |
256 | data []byte | |
257 | err error | |
258 | ) | |
259 | if m, ok := c.Message.(proto.Message); ok { | |
260 | data, err = proto.Marshal(m) | |
261 | if err != nil { | |
262 | grpclog.Infof("binarylogging: failed to marshal proto message: %v", err) | |
263 | } | |
264 | } else if b, ok := c.Message.([]byte); ok { | |
265 | data = b | |
266 | } else { | |
267 | grpclog.Infof("binarylogging: message to log is neither proto.message nor []byte") | |
268 | } | |
269 | ret := &pb.GrpcLogEntry{ | |
270 | Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE, | |
271 | Payload: &pb.GrpcLogEntry_Message{ | |
272 | Message: &pb.Message{ | |
273 | Length: uint32(len(data)), | |
274 | Data: data, | |
275 | }, | |
276 | }, | |
277 | } | |
278 | if c.OnClientSide { | |
279 | ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT | |
280 | } else { | |
281 | ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER | |
282 | } | |
283 | return ret | |
284 | } | |
285 | ||
286 | // ClientHalfClose configs the binary log entry to be a ClientHalfClose entry. | |
287 | type ClientHalfClose struct { | |
288 | OnClientSide bool | |
289 | } | |
290 | ||
291 | func (c *ClientHalfClose) toProto() *pb.GrpcLogEntry { | |
292 | ret := &pb.GrpcLogEntry{ | |
293 | Type: pb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE, | |
294 | Payload: nil, // No payload here. | |
295 | } | |
296 | if c.OnClientSide { | |
297 | ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT | |
298 | } else { | |
299 | ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER | |
300 | } | |
301 | return ret | |
302 | } | |
303 | ||
304 | // ServerTrailer configs the binary log entry to be a ServerTrailer entry. | |
305 | type ServerTrailer struct { | |
306 | OnClientSide bool | |
307 | Trailer metadata.MD | |
308 | // Err is the status error. | |
309 | Err error | |
310 | // PeerAddr is required only when it's on client side and the RPC is trailer | |
311 | // only. | |
312 | PeerAddr net.Addr | |
313 | } | |
314 | ||
315 | func (c *ServerTrailer) toProto() *pb.GrpcLogEntry { | |
316 | st, ok := status.FromError(c.Err) | |
317 | if !ok { | |
318 | grpclog.Info("binarylogging: error in trailer is not a status error") | |
319 | } | |
320 | var ( | |
321 | detailsBytes []byte | |
322 | err error | |
323 | ) | |
324 | stProto := st.Proto() | |
325 | if stProto != nil && len(stProto.Details) != 0 { | |
326 | detailsBytes, err = proto.Marshal(stProto) | |
327 | if err != nil { | |
328 | grpclog.Infof("binarylogging: failed to marshal status proto: %v", err) | |
329 | } | |
330 | } | |
331 | ret := &pb.GrpcLogEntry{ | |
332 | Type: pb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER, | |
333 | Payload: &pb.GrpcLogEntry_Trailer{ | |
334 | Trailer: &pb.Trailer{ | |
335 | Metadata: mdToMetadataProto(c.Trailer), | |
336 | StatusCode: uint32(st.Code()), | |
337 | StatusMessage: st.Message(), | |
338 | StatusDetails: detailsBytes, | |
339 | }, | |
340 | }, | |
341 | } | |
342 | if c.OnClientSide { | |
343 | ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT | |
344 | } else { | |
345 | ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER | |
346 | } | |
347 | if c.PeerAddr != nil { | |
348 | ret.Peer = addrToProto(c.PeerAddr) | |
349 | } | |
350 | return ret | |
351 | } | |
352 | ||
353 | // Cancel configs the binary log entry to be a Cancel entry. | |
354 | type Cancel struct { | |
355 | OnClientSide bool | |
356 | } | |
357 | ||
358 | func (c *Cancel) toProto() *pb.GrpcLogEntry { | |
359 | ret := &pb.GrpcLogEntry{ | |
360 | Type: pb.GrpcLogEntry_EVENT_TYPE_CANCEL, | |
361 | Payload: nil, | |
362 | } | |
363 | if c.OnClientSide { | |
364 | ret.Logger = pb.GrpcLogEntry_LOGGER_CLIENT | |
365 | } else { | |
366 | ret.Logger = pb.GrpcLogEntry_LOGGER_SERVER | |
367 | } | |
368 | return ret | |
369 | } | |
370 | ||
371 | // metadataKeyOmit returns whether the metadata entry with this key should be | |
372 | // omitted. | |
373 | func metadataKeyOmit(key string) bool { | |
374 | switch key { | |
375 | case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te": | |
376 | return true | |
377 | case "grpc-trace-bin": // grpc-trace-bin is special because it's visiable to users. | |
378 | return false | |
379 | } | |
380 | if strings.HasPrefix(key, "grpc-") { | |
381 | return true | |
382 | } | |
383 | return false | |
384 | } | |
385 | ||
386 | func mdToMetadataProto(md metadata.MD) *pb.Metadata { | |
387 | ret := &pb.Metadata{} | |
388 | for k, vv := range md { | |
389 | if metadataKeyOmit(k) { | |
390 | continue | |
391 | } | |
392 | for _, v := range vv { | |
393 | ret.Entry = append(ret.Entry, | |
394 | &pb.MetadataEntry{ | |
395 | Key: k, | |
396 | Value: []byte(v), | |
397 | }, | |
398 | ) | |
399 | } | |
400 | } | |
401 | return ret | |
402 | } | |
403 | ||
404 | func addrToProto(addr net.Addr) *pb.Address { | |
405 | ret := &pb.Address{} | |
406 | switch a := addr.(type) { | |
407 | case *net.TCPAddr: | |
408 | if a.IP.To4() != nil { | |
409 | ret.Type = pb.Address_TYPE_IPV4 | |
410 | } else if a.IP.To16() != nil { | |
411 | ret.Type = pb.Address_TYPE_IPV6 | |
412 | } else { | |
413 | ret.Type = pb.Address_TYPE_UNKNOWN | |
414 | // Do not set address and port fields. | |
415 | break | |
416 | } | |
417 | ret.Address = a.IP.String() | |
418 | ret.IpPort = uint32(a.Port) | |
419 | case *net.UnixAddr: | |
420 | ret.Type = pb.Address_TYPE_UNIX | |
421 | ret.Address = a.String() | |
422 | default: | |
423 | ret.Type = pb.Address_TYPE_UNKNOWN | |
424 | } | |
425 | return ret | |
426 | } |