diff options
Diffstat (limited to 'vendor/github.com/aws/aws-sdk-go/aws/csm/reporter.go')
-rw-r--r-- | vendor/github.com/aws/aws-sdk-go/aws/csm/reporter.go | 231 |
1 files changed, 231 insertions, 0 deletions
diff --git a/vendor/github.com/aws/aws-sdk-go/aws/csm/reporter.go b/vendor/github.com/aws/aws-sdk-go/aws/csm/reporter.go new file mode 100644 index 0000000..11082e5 --- /dev/null +++ b/vendor/github.com/aws/aws-sdk-go/aws/csm/reporter.go | |||
@@ -0,0 +1,231 @@ | |||
1 | package csm | ||
2 | |||
3 | import ( | ||
4 | "encoding/json" | ||
5 | "net" | ||
6 | "time" | ||
7 | |||
8 | "github.com/aws/aws-sdk-go/aws" | ||
9 | "github.com/aws/aws-sdk-go/aws/awserr" | ||
10 | "github.com/aws/aws-sdk-go/aws/request" | ||
11 | ) | ||
12 | |||
13 | const ( | ||
14 | // DefaultPort is used when no port is specified | ||
15 | DefaultPort = "31000" | ||
16 | ) | ||
17 | |||
18 | // Reporter will gather metrics of API requests made and | ||
19 | // send those metrics to the CSM endpoint. | ||
20 | type Reporter struct { | ||
21 | clientID string | ||
22 | url string | ||
23 | conn net.Conn | ||
24 | metricsCh metricChan | ||
25 | done chan struct{} | ||
26 | } | ||
27 | |||
28 | var ( | ||
29 | sender *Reporter | ||
30 | ) | ||
31 | |||
32 | func connect(url string) error { | ||
33 | const network = "udp" | ||
34 | if err := sender.connect(network, url); err != nil { | ||
35 | return err | ||
36 | } | ||
37 | |||
38 | if sender.done == nil { | ||
39 | sender.done = make(chan struct{}) | ||
40 | go sender.start() | ||
41 | } | ||
42 | |||
43 | return nil | ||
44 | } | ||
45 | |||
46 | func newReporter(clientID, url string) *Reporter { | ||
47 | return &Reporter{ | ||
48 | clientID: clientID, | ||
49 | url: url, | ||
50 | metricsCh: newMetricChan(MetricsChannelSize), | ||
51 | } | ||
52 | } | ||
53 | |||
54 | func (rep *Reporter) sendAPICallAttemptMetric(r *request.Request) { | ||
55 | if rep == nil { | ||
56 | return | ||
57 | } | ||
58 | |||
59 | now := time.Now() | ||
60 | creds, _ := r.Config.Credentials.Get() | ||
61 | |||
62 | m := metric{ | ||
63 | ClientID: aws.String(rep.clientID), | ||
64 | API: aws.String(r.Operation.Name), | ||
65 | Service: aws.String(r.ClientInfo.ServiceID), | ||
66 | Timestamp: (*metricTime)(&now), | ||
67 | UserAgent: aws.String(r.HTTPRequest.Header.Get("User-Agent")), | ||
68 | Region: r.Config.Region, | ||
69 | Type: aws.String("ApiCallAttempt"), | ||
70 | Version: aws.Int(1), | ||
71 | |||
72 | XAmzRequestID: aws.String(r.RequestID), | ||
73 | |||
74 | AttemptCount: aws.Int(r.RetryCount + 1), | ||
75 | AttemptLatency: aws.Int(int(now.Sub(r.AttemptTime).Nanoseconds() / int64(time.Millisecond))), | ||
76 | AccessKey: aws.String(creds.AccessKeyID), | ||
77 | } | ||
78 | |||
79 | if r.HTTPResponse != nil { | ||
80 | m.HTTPStatusCode = aws.Int(r.HTTPResponse.StatusCode) | ||
81 | } | ||
82 | |||
83 | if r.Error != nil { | ||
84 | if awserr, ok := r.Error.(awserr.Error); ok { | ||
85 | setError(&m, awserr) | ||
86 | } | ||
87 | } | ||
88 | |||
89 | rep.metricsCh.Push(m) | ||
90 | } | ||
91 | |||
92 | func setError(m *metric, err awserr.Error) { | ||
93 | msg := err.Error() | ||
94 | code := err.Code() | ||
95 | |||
96 | switch code { | ||
97 | case "RequestError", | ||
98 | "SerializationError", | ||
99 | request.CanceledErrorCode: | ||
100 | m.SDKException = &code | ||
101 | m.SDKExceptionMessage = &msg | ||
102 | default: | ||
103 | m.AWSException = &code | ||
104 | m.AWSExceptionMessage = &msg | ||
105 | } | ||
106 | } | ||
107 | |||
108 | func (rep *Reporter) sendAPICallMetric(r *request.Request) { | ||
109 | if rep == nil { | ||
110 | return | ||
111 | } | ||
112 | |||
113 | now := time.Now() | ||
114 | m := metric{ | ||
115 | ClientID: aws.String(rep.clientID), | ||
116 | API: aws.String(r.Operation.Name), | ||
117 | Service: aws.String(r.ClientInfo.ServiceID), | ||
118 | Timestamp: (*metricTime)(&now), | ||
119 | Type: aws.String("ApiCall"), | ||
120 | AttemptCount: aws.Int(r.RetryCount + 1), | ||
121 | Latency: aws.Int(int(time.Now().Sub(r.Time) / time.Millisecond)), | ||
122 | XAmzRequestID: aws.String(r.RequestID), | ||
123 | } | ||
124 | |||
125 | // TODO: Probably want to figure something out for logging dropped | ||
126 | // metrics | ||
127 | rep.metricsCh.Push(m) | ||
128 | } | ||
129 | |||
130 | func (rep *Reporter) connect(network, url string) error { | ||
131 | if rep.conn != nil { | ||
132 | rep.conn.Close() | ||
133 | } | ||
134 | |||
135 | conn, err := net.Dial(network, url) | ||
136 | if err != nil { | ||
137 | return awserr.New("UDPError", "Could not connect", err) | ||
138 | } | ||
139 | |||
140 | rep.conn = conn | ||
141 | |||
142 | return nil | ||
143 | } | ||
144 | |||
145 | func (rep *Reporter) close() { | ||
146 | if rep.done != nil { | ||
147 | close(rep.done) | ||
148 | } | ||
149 | |||
150 | rep.metricsCh.Pause() | ||
151 | } | ||
152 | |||
153 | func (rep *Reporter) start() { | ||
154 | defer func() { | ||
155 | rep.metricsCh.Pause() | ||
156 | }() | ||
157 | |||
158 | for { | ||
159 | select { | ||
160 | case <-rep.done: | ||
161 | rep.done = nil | ||
162 | return | ||
163 | case m := <-rep.metricsCh.ch: | ||
164 | // TODO: What to do with this error? Probably should just log | ||
165 | b, err := json.Marshal(m) | ||
166 | if err != nil { | ||
167 | continue | ||
168 | } | ||
169 | |||
170 | rep.conn.Write(b) | ||
171 | } | ||
172 | } | ||
173 | } | ||
174 | |||
175 | // Pause will pause the metric channel preventing any new metrics from | ||
176 | // being added. | ||
177 | func (rep *Reporter) Pause() { | ||
178 | lock.Lock() | ||
179 | defer lock.Unlock() | ||
180 | |||
181 | if rep == nil { | ||
182 | return | ||
183 | } | ||
184 | |||
185 | rep.close() | ||
186 | } | ||
187 | |||
188 | // Continue will reopen the metric channel and allow for monitoring | ||
189 | // to be resumed. | ||
190 | func (rep *Reporter) Continue() { | ||
191 | lock.Lock() | ||
192 | defer lock.Unlock() | ||
193 | if rep == nil { | ||
194 | return | ||
195 | } | ||
196 | |||
197 | if !rep.metricsCh.IsPaused() { | ||
198 | return | ||
199 | } | ||
200 | |||
201 | rep.metricsCh.Continue() | ||
202 | } | ||
203 | |||
204 | // InjectHandlers will will enable client side metrics and inject the proper | ||
205 | // handlers to handle how metrics are sent. | ||
206 | // | ||
207 | // Example: | ||
208 | // // Start must be called in order to inject the correct handlers | ||
209 | // r, err := csm.Start("clientID", "127.0.0.1:8094") | ||
210 | // if err != nil { | ||
211 | // panic(fmt.Errorf("expected no error, but received %v", err)) | ||
212 | // } | ||
213 | // | ||
214 | // sess := session.NewSession() | ||
215 | // r.InjectHandlers(&sess.Handlers) | ||
216 | // | ||
217 | // // create a new service client with our client side metric session | ||
218 | // svc := s3.New(sess) | ||
219 | func (rep *Reporter) InjectHandlers(handlers *request.Handlers) { | ||
220 | if rep == nil { | ||
221 | return | ||
222 | } | ||
223 | |||
224 | apiCallHandler := request.NamedHandler{Name: APICallMetricHandlerName, Fn: rep.sendAPICallMetric} | ||
225 | apiCallAttemptHandler := request.NamedHandler{Name: APICallAttemptMetricHandlerName, Fn: rep.sendAPICallAttemptMetric} | ||
226 | |||
227 | handlers.Complete.PushFrontNamed(apiCallHandler) | ||
228 | handlers.Complete.PushFrontNamed(apiCallAttemptHandler) | ||
229 | |||
230 | handlers.AfterRetry.PushFrontNamed(apiCallAttemptHandler) | ||
231 | } | ||