aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/github.com/aws/aws-sdk-go/aws/csm/reporter.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/aws/aws-sdk-go/aws/csm/reporter.go')
-rw-r--r--vendor/github.com/aws/aws-sdk-go/aws/csm/reporter.go231
1 files changed, 231 insertions, 0 deletions
diff --git a/vendor/github.com/aws/aws-sdk-go/aws/csm/reporter.go b/vendor/github.com/aws/aws-sdk-go/aws/csm/reporter.go
new file mode 100644
index 0000000..11082e5
--- /dev/null
+++ b/vendor/github.com/aws/aws-sdk-go/aws/csm/reporter.go
@@ -0,0 +1,231 @@
1package csm
2
3import (
4 "encoding/json"
5 "net"
6 "time"
7
8 "github.com/aws/aws-sdk-go/aws"
9 "github.com/aws/aws-sdk-go/aws/awserr"
10 "github.com/aws/aws-sdk-go/aws/request"
11)
12
13const (
14 // DefaultPort is used when no port is specified
15 DefaultPort = "31000"
16)
17
18// Reporter will gather metrics of API requests made and
19// send those metrics to the CSM endpoint.
20type Reporter struct {
21 clientID string
22 url string
23 conn net.Conn
24 metricsCh metricChan
25 done chan struct{}
26}
27
28var (
29 sender *Reporter
30)
31
32func connect(url string) error {
33 const network = "udp"
34 if err := sender.connect(network, url); err != nil {
35 return err
36 }
37
38 if sender.done == nil {
39 sender.done = make(chan struct{})
40 go sender.start()
41 }
42
43 return nil
44}
45
46func newReporter(clientID, url string) *Reporter {
47 return &Reporter{
48 clientID: clientID,
49 url: url,
50 metricsCh: newMetricChan(MetricsChannelSize),
51 }
52}
53
54func (rep *Reporter) sendAPICallAttemptMetric(r *request.Request) {
55 if rep == nil {
56 return
57 }
58
59 now := time.Now()
60 creds, _ := r.Config.Credentials.Get()
61
62 m := metric{
63 ClientID: aws.String(rep.clientID),
64 API: aws.String(r.Operation.Name),
65 Service: aws.String(r.ClientInfo.ServiceID),
66 Timestamp: (*metricTime)(&now),
67 UserAgent: aws.String(r.HTTPRequest.Header.Get("User-Agent")),
68 Region: r.Config.Region,
69 Type: aws.String("ApiCallAttempt"),
70 Version: aws.Int(1),
71
72 XAmzRequestID: aws.String(r.RequestID),
73
74 AttemptCount: aws.Int(r.RetryCount + 1),
75 AttemptLatency: aws.Int(int(now.Sub(r.AttemptTime).Nanoseconds() / int64(time.Millisecond))),
76 AccessKey: aws.String(creds.AccessKeyID),
77 }
78
79 if r.HTTPResponse != nil {
80 m.HTTPStatusCode = aws.Int(r.HTTPResponse.StatusCode)
81 }
82
83 if r.Error != nil {
84 if awserr, ok := r.Error.(awserr.Error); ok {
85 setError(&m, awserr)
86 }
87 }
88
89 rep.metricsCh.Push(m)
90}
91
92func setError(m *metric, err awserr.Error) {
93 msg := err.Error()
94 code := err.Code()
95
96 switch code {
97 case "RequestError",
98 "SerializationError",
99 request.CanceledErrorCode:
100 m.SDKException = &code
101 m.SDKExceptionMessage = &msg
102 default:
103 m.AWSException = &code
104 m.AWSExceptionMessage = &msg
105 }
106}
107
108func (rep *Reporter) sendAPICallMetric(r *request.Request) {
109 if rep == nil {
110 return
111 }
112
113 now := time.Now()
114 m := metric{
115 ClientID: aws.String(rep.clientID),
116 API: aws.String(r.Operation.Name),
117 Service: aws.String(r.ClientInfo.ServiceID),
118 Timestamp: (*metricTime)(&now),
119 Type: aws.String("ApiCall"),
120 AttemptCount: aws.Int(r.RetryCount + 1),
121 Latency: aws.Int(int(time.Now().Sub(r.Time) / time.Millisecond)),
122 XAmzRequestID: aws.String(r.RequestID),
123 }
124
125 // TODO: Probably want to figure something out for logging dropped
126 // metrics
127 rep.metricsCh.Push(m)
128}
129
130func (rep *Reporter) connect(network, url string) error {
131 if rep.conn != nil {
132 rep.conn.Close()
133 }
134
135 conn, err := net.Dial(network, url)
136 if err != nil {
137 return awserr.New("UDPError", "Could not connect", err)
138 }
139
140 rep.conn = conn
141
142 return nil
143}
144
145func (rep *Reporter) close() {
146 if rep.done != nil {
147 close(rep.done)
148 }
149
150 rep.metricsCh.Pause()
151}
152
153func (rep *Reporter) start() {
154 defer func() {
155 rep.metricsCh.Pause()
156 }()
157
158 for {
159 select {
160 case <-rep.done:
161 rep.done = nil
162 return
163 case m := <-rep.metricsCh.ch:
164 // TODO: What to do with this error? Probably should just log
165 b, err := json.Marshal(m)
166 if err != nil {
167 continue
168 }
169
170 rep.conn.Write(b)
171 }
172 }
173}
174
175// Pause will pause the metric channel preventing any new metrics from
176// being added.
177func (rep *Reporter) Pause() {
178 lock.Lock()
179 defer lock.Unlock()
180
181 if rep == nil {
182 return
183 }
184
185 rep.close()
186}
187
188// Continue will reopen the metric channel and allow for monitoring
189// to be resumed.
190func (rep *Reporter) Continue() {
191 lock.Lock()
192 defer lock.Unlock()
193 if rep == nil {
194 return
195 }
196
197 if !rep.metricsCh.IsPaused() {
198 return
199 }
200
201 rep.metricsCh.Continue()
202}
203
204// InjectHandlers will will enable client side metrics and inject the proper
205// handlers to handle how metrics are sent.
206//
207// Example:
208// // Start must be called in order to inject the correct handlers
209// r, err := csm.Start("clientID", "127.0.0.1:8094")
210// if err != nil {
211// panic(fmt.Errorf("expected no error, but received %v", err))
212// }
213//
214// sess := session.NewSession()
215// r.InjectHandlers(&sess.Handlers)
216//
217// // create a new service client with our client side metric session
218// svc := s3.New(sess)
219func (rep *Reporter) InjectHandlers(handlers *request.Handlers) {
220 if rep == nil {
221 return
222 }
223
224 apiCallHandler := request.NamedHandler{Name: APICallMetricHandlerName, Fn: rep.sendAPICallMetric}
225 apiCallAttemptHandler := request.NamedHandler{Name: APICallAttemptMetricHandlerName, Fn: rep.sendAPICallAttemptMetric}
226
227 handlers.Complete.PushFrontNamed(apiCallHandler)
228 handlers.Complete.PushFrontNamed(apiCallAttemptHandler)
229
230 handlers.AfterRetry.PushFrontNamed(apiCallAttemptHandler)
231}