]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | package csm |
2 | ||
3 | import ( | |
4 | "encoding/json" | |
5 | "net" | |
6 | "time" | |
7 | ||
8 | "github.com/aws/aws-sdk-go/aws" | |
9 | "github.com/aws/aws-sdk-go/aws/awserr" | |
10 | "github.com/aws/aws-sdk-go/aws/request" | |
11 | ) | |
12 | ||
// DefaultPort is the port used when no port is specified.
const DefaultPort = "31000"
17 | ||
18 | // Reporter will gather metrics of API requests made and | |
19 | // send those metrics to the CSM endpoint. | |
20 | type Reporter struct { | |
21 | clientID string | |
22 | url string | |
23 | conn net.Conn | |
24 | metricsCh metricChan | |
25 | done chan struct{} | |
26 | } | |
27 | ||
28 | var ( | |
29 | sender *Reporter | |
30 | ) | |
31 | ||
32 | func connect(url string) error { | |
33 | const network = "udp" | |
34 | if err := sender.connect(network, url); err != nil { | |
35 | return err | |
36 | } | |
37 | ||
38 | if sender.done == nil { | |
39 | sender.done = make(chan struct{}) | |
40 | go sender.start() | |
41 | } | |
42 | ||
43 | return nil | |
44 | } | |
45 | ||
46 | func newReporter(clientID, url string) *Reporter { | |
47 | return &Reporter{ | |
48 | clientID: clientID, | |
49 | url: url, | |
50 | metricsCh: newMetricChan(MetricsChannelSize), | |
51 | } | |
52 | } | |
53 | ||
54 | func (rep *Reporter) sendAPICallAttemptMetric(r *request.Request) { | |
55 | if rep == nil { | |
56 | return | |
57 | } | |
58 | ||
59 | now := time.Now() | |
60 | creds, _ := r.Config.Credentials.Get() | |
61 | ||
62 | m := metric{ | |
63 | ClientID: aws.String(rep.clientID), | |
64 | API: aws.String(r.Operation.Name), | |
65 | Service: aws.String(r.ClientInfo.ServiceID), | |
66 | Timestamp: (*metricTime)(&now), | |
67 | UserAgent: aws.String(r.HTTPRequest.Header.Get("User-Agent")), | |
68 | Region: r.Config.Region, | |
69 | Type: aws.String("ApiCallAttempt"), | |
70 | Version: aws.Int(1), | |
71 | ||
72 | XAmzRequestID: aws.String(r.RequestID), | |
73 | ||
74 | AttemptCount: aws.Int(r.RetryCount + 1), | |
75 | AttemptLatency: aws.Int(int(now.Sub(r.AttemptTime).Nanoseconds() / int64(time.Millisecond))), | |
76 | AccessKey: aws.String(creds.AccessKeyID), | |
77 | } | |
78 | ||
79 | if r.HTTPResponse != nil { | |
80 | m.HTTPStatusCode = aws.Int(r.HTTPResponse.StatusCode) | |
81 | } | |
82 | ||
83 | if r.Error != nil { | |
84 | if awserr, ok := r.Error.(awserr.Error); ok { | |
107c1cdb | 85 | m.SetException(getMetricException(awserr)) |
15c0b25d AP |
86 | } |
87 | } | |
88 | ||
107c1cdb | 89 | m.TruncateFields() |
15c0b25d AP |
90 | rep.metricsCh.Push(m) |
91 | } | |
92 | ||
107c1cdb | 93 | func getMetricException(err awserr.Error) metricException { |
15c0b25d AP |
94 | msg := err.Error() |
95 | code := err.Code() | |
96 | ||
97 | switch code { | |
98 | case "RequestError", | |
99 | "SerializationError", | |
100 | request.CanceledErrorCode: | |
107c1cdb ND |
101 | return sdkException{ |
102 | requestException{exception: code, message: msg}, | |
103 | } | |
15c0b25d | 104 | default: |
107c1cdb ND |
105 | return awsException{ |
106 | requestException{exception: code, message: msg}, | |
107 | } | |
15c0b25d AP |
108 | } |
109 | } | |
110 | ||
111 | func (rep *Reporter) sendAPICallMetric(r *request.Request) { | |
112 | if rep == nil { | |
113 | return | |
114 | } | |
115 | ||
116 | now := time.Now() | |
117 | m := metric{ | |
107c1cdb ND |
118 | ClientID: aws.String(rep.clientID), |
119 | API: aws.String(r.Operation.Name), | |
120 | Service: aws.String(r.ClientInfo.ServiceID), | |
121 | Timestamp: (*metricTime)(&now), | |
122 | UserAgent: aws.String(r.HTTPRequest.Header.Get("User-Agent")), | |
123 | Type: aws.String("ApiCall"), | |
124 | AttemptCount: aws.Int(r.RetryCount + 1), | |
125 | Region: r.Config.Region, | |
126 | Latency: aws.Int(int(time.Now().Sub(r.Time) / time.Millisecond)), | |
127 | XAmzRequestID: aws.String(r.RequestID), | |
128 | MaxRetriesExceeded: aws.Int(boolIntValue(r.RetryCount >= r.MaxRetries())), | |
129 | } | |
130 | ||
131 | if r.HTTPResponse != nil { | |
132 | m.FinalHTTPStatusCode = aws.Int(r.HTTPResponse.StatusCode) | |
15c0b25d AP |
133 | } |
134 | ||
107c1cdb ND |
135 | if r.Error != nil { |
136 | if awserr, ok := r.Error.(awserr.Error); ok { | |
137 | m.SetFinalException(getMetricException(awserr)) | |
138 | } | |
139 | } | |
140 | ||
141 | m.TruncateFields() | |
142 | ||
15c0b25d AP |
143 | // TODO: Probably want to figure something out for logging dropped |
144 | // metrics | |
145 | rep.metricsCh.Push(m) | |
146 | } | |
147 | ||
148 | func (rep *Reporter) connect(network, url string) error { | |
149 | if rep.conn != nil { | |
150 | rep.conn.Close() | |
151 | } | |
152 | ||
153 | conn, err := net.Dial(network, url) | |
154 | if err != nil { | |
155 | return awserr.New("UDPError", "Could not connect", err) | |
156 | } | |
157 | ||
158 | rep.conn = conn | |
159 | ||
160 | return nil | |
161 | } | |
162 | ||
163 | func (rep *Reporter) close() { | |
164 | if rep.done != nil { | |
165 | close(rep.done) | |
166 | } | |
167 | ||
168 | rep.metricsCh.Pause() | |
169 | } | |
170 | ||
171 | func (rep *Reporter) start() { | |
172 | defer func() { | |
173 | rep.metricsCh.Pause() | |
174 | }() | |
175 | ||
176 | for { | |
177 | select { | |
178 | case <-rep.done: | |
179 | rep.done = nil | |
180 | return | |
181 | case m := <-rep.metricsCh.ch: | |
182 | // TODO: What to do with this error? Probably should just log | |
183 | b, err := json.Marshal(m) | |
184 | if err != nil { | |
185 | continue | |
186 | } | |
187 | ||
188 | rep.conn.Write(b) | |
189 | } | |
190 | } | |
191 | } | |
192 | ||
193 | // Pause will pause the metric channel preventing any new metrics from | |
194 | // being added. | |
195 | func (rep *Reporter) Pause() { | |
196 | lock.Lock() | |
197 | defer lock.Unlock() | |
198 | ||
199 | if rep == nil { | |
200 | return | |
201 | } | |
202 | ||
203 | rep.close() | |
204 | } | |
205 | ||
206 | // Continue will reopen the metric channel and allow for monitoring | |
207 | // to be resumed. | |
208 | func (rep *Reporter) Continue() { | |
209 | lock.Lock() | |
210 | defer lock.Unlock() | |
211 | if rep == nil { | |
212 | return | |
213 | } | |
214 | ||
215 | if !rep.metricsCh.IsPaused() { | |
216 | return | |
217 | } | |
218 | ||
219 | rep.metricsCh.Continue() | |
220 | } | |
221 | ||
222 | // InjectHandlers will will enable client side metrics and inject the proper | |
223 | // handlers to handle how metrics are sent. | |
224 | // | |
225 | // Example: | |
226 | // // Start must be called in order to inject the correct handlers | |
227 | // r, err := csm.Start("clientID", "127.0.0.1:8094") | |
228 | // if err != nil { | |
229 | // panic(fmt.Errorf("expected no error, but received %v", err)) | |
230 | // } | |
231 | // | |
232 | // sess := session.NewSession() | |
233 | // r.InjectHandlers(&sess.Handlers) | |
234 | // | |
235 | // // create a new service client with our client side metric session | |
236 | // svc := s3.New(sess) | |
237 | func (rep *Reporter) InjectHandlers(handlers *request.Handlers) { | |
238 | if rep == nil { | |
239 | return | |
240 | } | |
241 | ||
107c1cdb ND |
242 | handlers.Complete.PushFrontNamed(request.NamedHandler{ |
243 | Name: APICallMetricHandlerName, | |
244 | Fn: rep.sendAPICallMetric, | |
245 | }) | |
15c0b25d | 246 | |
107c1cdb ND |
247 | handlers.CompleteAttempt.PushFrontNamed(request.NamedHandler{ |
248 | Name: APICallAttemptMetricHandlerName, | |
249 | Fn: rep.sendAPICallAttemptMetric, | |
250 | }) | |
251 | } | |
252 | ||
// boolIntValue returns 1 when b is true and 0 when b is false.
func boolIntValue(b bool) int {
	var v int
	if b {
		v = 1
	}

	return v
}