aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/github.com/fsouza/go-dockerclient/event.go
diff options
context:
space:
mode:
authorJake Champlin <jake@gnu.space>2017-06-09 17:54:32 +0000
committerJake Champlin <jake@gnu.space>2017-06-09 17:54:32 +0000
commit9b12e4fe6f3c95986f1f3ec791636c58ca7e7583 (patch)
tree38f5f12bec0e488a12f0459a7356e6b7de7d8f84 /vendor/github.com/fsouza/go-dockerclient/event.go
parentcec3de8a3bcaffd21dedd1bf42da4b490cae7e16 (diff)
downloadterraform-provider-statuscake-9b12e4fe6f3c95986f1f3ec791636c58ca7e7583.tar.gz
terraform-provider-statuscake-9b12e4fe6f3c95986f1f3ec791636c58ca7e7583.tar.zst
terraform-provider-statuscake-9b12e4fe6f3c95986f1f3ec791636c58ca7e7583.zip
Transfer of provider code
Diffstat (limited to 'vendor/github.com/fsouza/go-dockerclient/event.go')
-rw-r--r--vendor/github.com/fsouza/go-dockerclient/event.go379
1 files changed, 379 insertions, 0 deletions
diff --git a/vendor/github.com/fsouza/go-dockerclient/event.go b/vendor/github.com/fsouza/go-dockerclient/event.go
new file mode 100644
index 0000000..120cdc9
--- /dev/null
+++ b/vendor/github.com/fsouza/go-dockerclient/event.go
@@ -0,0 +1,379 @@
1// Copyright 2015 go-dockerclient authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package docker
6
7import (
8 "encoding/json"
9 "errors"
10 "fmt"
11 "io"
12 "math"
13 "net"
14 "net/http"
15 "net/http/httputil"
16 "sync"
17 "sync/atomic"
18 "time"
19)
20
21// APIEvents represents events coming from the Docker API
22// The fields in the Docker API changed in API version 1.22, and
23// events for more than images and containers are now fired off.
24// To maintain forward and backward compatibility, go-dockerclient
25// replicates the event in both the new and old format as faithfully as possible.
26//
27// For events that only exist in 1.22 in later, `Status` is filled in as
28// `"Type:Action"` instead of just `Action` to allow for older clients to
29// differentiate and not break if they rely on the pre-1.22 Status types.
30//
31// The transformEvent method can be consulted for more information about how
32// events are translated from new/old API formats
33type APIEvents struct {
34 // New API Fields in 1.22
35 Action string `json:"action,omitempty"`
36 Type string `json:"type,omitempty"`
37 Actor APIActor `json:"actor,omitempty"`
38
39 // Old API fields for < 1.22
40 Status string `json:"status,omitempty"`
41 ID string `json:"id,omitempty"`
42 From string `json:"from,omitempty"`
43
44 // Fields in both
45 Time int64 `json:"time,omitempty"`
46 TimeNano int64 `json:"timeNano,omitempty"`
47}
48
49// APIActor represents an actor that accomplishes something for an event
50type APIActor struct {
51 ID string `json:"id,omitempty"`
52 Attributes map[string]string `json:"attributes,omitempty"`
53}
54
55type eventMonitoringState struct {
56 sync.RWMutex
57 sync.WaitGroup
58 enabled bool
59 lastSeen int64
60 C chan *APIEvents
61 errC chan error
62 listeners []chan<- *APIEvents
63}
64
65const (
66 maxMonitorConnRetries = 5
67 retryInitialWaitTime = 10.
68)
69
70var (
71 // ErrNoListeners is the error returned when no listeners are available
72 // to receive an event.
73 ErrNoListeners = errors.New("no listeners present to receive event")
74
75 // ErrListenerAlreadyExists is the error returned when the listerner already
76 // exists.
77 ErrListenerAlreadyExists = errors.New("listener already exists for docker events")
78
79 // EOFEvent is sent when the event listener receives an EOF error.
80 EOFEvent = &APIEvents{
81 Type: "EOF",
82 Status: "EOF",
83 }
84)
85
86// AddEventListener adds a new listener to container events in the Docker API.
87//
88// The parameter is a channel through which events will be sent.
89func (c *Client) AddEventListener(listener chan<- *APIEvents) error {
90 var err error
91 if !c.eventMonitor.isEnabled() {
92 err = c.eventMonitor.enableEventMonitoring(c)
93 if err != nil {
94 return err
95 }
96 }
97 err = c.eventMonitor.addListener(listener)
98 if err != nil {
99 return err
100 }
101 return nil
102}
103
104// RemoveEventListener removes a listener from the monitor.
105func (c *Client) RemoveEventListener(listener chan *APIEvents) error {
106 err := c.eventMonitor.removeListener(listener)
107 if err != nil {
108 return err
109 }
110 if len(c.eventMonitor.listeners) == 0 {
111 c.eventMonitor.disableEventMonitoring()
112 }
113 return nil
114}
115
116func (eventState *eventMonitoringState) addListener(listener chan<- *APIEvents) error {
117 eventState.Lock()
118 defer eventState.Unlock()
119 if listenerExists(listener, &eventState.listeners) {
120 return ErrListenerAlreadyExists
121 }
122 eventState.Add(1)
123 eventState.listeners = append(eventState.listeners, listener)
124 return nil
125}
126
127func (eventState *eventMonitoringState) removeListener(listener chan<- *APIEvents) error {
128 eventState.Lock()
129 defer eventState.Unlock()
130 if listenerExists(listener, &eventState.listeners) {
131 var newListeners []chan<- *APIEvents
132 for _, l := range eventState.listeners {
133 if l != listener {
134 newListeners = append(newListeners, l)
135 }
136 }
137 eventState.listeners = newListeners
138 eventState.Add(-1)
139 }
140 return nil
141}
142
143func (eventState *eventMonitoringState) closeListeners() {
144 for _, l := range eventState.listeners {
145 close(l)
146 eventState.Add(-1)
147 }
148 eventState.listeners = nil
149}
150
151func listenerExists(a chan<- *APIEvents, list *[]chan<- *APIEvents) bool {
152 for _, b := range *list {
153 if b == a {
154 return true
155 }
156 }
157 return false
158}
159
160func (eventState *eventMonitoringState) enableEventMonitoring(c *Client) error {
161 eventState.Lock()
162 defer eventState.Unlock()
163 if !eventState.enabled {
164 eventState.enabled = true
165 atomic.StoreInt64(&eventState.lastSeen, 0)
166 eventState.C = make(chan *APIEvents, 100)
167 eventState.errC = make(chan error, 1)
168 go eventState.monitorEvents(c)
169 }
170 return nil
171}
172
173func (eventState *eventMonitoringState) disableEventMonitoring() error {
174 eventState.Lock()
175 defer eventState.Unlock()
176
177 eventState.closeListeners()
178
179 eventState.Wait()
180
181 if eventState.enabled {
182 eventState.enabled = false
183 close(eventState.C)
184 close(eventState.errC)
185 }
186 return nil
187}
188
189func (eventState *eventMonitoringState) monitorEvents(c *Client) {
190 var err error
191 for eventState.noListeners() {
192 time.Sleep(10 * time.Millisecond)
193 }
194 if err = eventState.connectWithRetry(c); err != nil {
195 // terminate if connect failed
196 eventState.disableEventMonitoring()
197 return
198 }
199 for eventState.isEnabled() {
200 timeout := time.After(100 * time.Millisecond)
201 select {
202 case ev, ok := <-eventState.C:
203 if !ok {
204 return
205 }
206 if ev == EOFEvent {
207 eventState.disableEventMonitoring()
208 return
209 }
210 eventState.updateLastSeen(ev)
211 go eventState.sendEvent(ev)
212 case err = <-eventState.errC:
213 if err == ErrNoListeners {
214 eventState.disableEventMonitoring()
215 return
216 } else if err != nil {
217 defer func() { go eventState.monitorEvents(c) }()
218 return
219 }
220 case <-timeout:
221 continue
222 }
223 }
224}
225
226func (eventState *eventMonitoringState) connectWithRetry(c *Client) error {
227 var retries int
228 eventState.RLock()
229 eventChan := eventState.C
230 errChan := eventState.errC
231 eventState.RUnlock()
232 err := c.eventHijack(atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
233 for ; err != nil && retries < maxMonitorConnRetries; retries++ {
234 waitTime := int64(retryInitialWaitTime * math.Pow(2, float64(retries)))
235 time.Sleep(time.Duration(waitTime) * time.Millisecond)
236 eventState.RLock()
237 eventChan = eventState.C
238 errChan = eventState.errC
239 eventState.RUnlock()
240 err = c.eventHijack(atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
241 }
242 return err
243}
244
245func (eventState *eventMonitoringState) noListeners() bool {
246 eventState.RLock()
247 defer eventState.RUnlock()
248 return len(eventState.listeners) == 0
249}
250
251func (eventState *eventMonitoringState) isEnabled() bool {
252 eventState.RLock()
253 defer eventState.RUnlock()
254 return eventState.enabled
255}
256
257func (eventState *eventMonitoringState) sendEvent(event *APIEvents) {
258 eventState.RLock()
259 defer eventState.RUnlock()
260 eventState.Add(1)
261 defer eventState.Done()
262 if eventState.enabled {
263 if len(eventState.listeners) == 0 {
264 eventState.errC <- ErrNoListeners
265 return
266 }
267
268 for _, listener := range eventState.listeners {
269 listener <- event
270 }
271 }
272}
273
274func (eventState *eventMonitoringState) updateLastSeen(e *APIEvents) {
275 eventState.Lock()
276 defer eventState.Unlock()
277 if atomic.LoadInt64(&eventState.lastSeen) < e.Time {
278 atomic.StoreInt64(&eventState.lastSeen, e.Time)
279 }
280}
281
282func (c *Client) eventHijack(startTime int64, eventChan chan *APIEvents, errChan chan error) error {
283 uri := "/events"
284 if startTime != 0 {
285 uri += fmt.Sprintf("?since=%d", startTime)
286 }
287 protocol := c.endpointURL.Scheme
288 address := c.endpointURL.Path
289 if protocol != "unix" {
290 protocol = "tcp"
291 address = c.endpointURL.Host
292 }
293 var dial net.Conn
294 var err error
295 if c.TLSConfig == nil {
296 dial, err = c.Dialer.Dial(protocol, address)
297 } else {
298 dial, err = tlsDialWithDialer(c.Dialer, protocol, address, c.TLSConfig)
299 }
300 if err != nil {
301 return err
302 }
303 conn := httputil.NewClientConn(dial, nil)
304 req, err := http.NewRequest("GET", uri, nil)
305 if err != nil {
306 return err
307 }
308 res, err := conn.Do(req)
309 if err != nil {
310 return err
311 }
312 go func(res *http.Response, conn *httputil.ClientConn) {
313 defer conn.Close()
314 defer res.Body.Close()
315 decoder := json.NewDecoder(res.Body)
316 for {
317 var event APIEvents
318 if err = decoder.Decode(&event); err != nil {
319 if err == io.EOF || err == io.ErrUnexpectedEOF {
320 c.eventMonitor.RLock()
321 if c.eventMonitor.enabled && c.eventMonitor.C == eventChan {
322 // Signal that we're exiting.
323 eventChan <- EOFEvent
324 }
325 c.eventMonitor.RUnlock()
326 break
327 }
328 errChan <- err
329 }
330 if event.Time == 0 {
331 continue
332 }
333 if !c.eventMonitor.isEnabled() || c.eventMonitor.C != eventChan {
334 return
335 }
336 transformEvent(&event)
337 eventChan <- &event
338 }
339 }(res, conn)
340 return nil
341}
342
343// transformEvent takes an event and determines what version it is from
344// then populates both versions of the event
345func transformEvent(event *APIEvents) {
346 // if event version is <= 1.21 there will be no Action and no Type
347 if event.Action == "" && event.Type == "" {
348 event.Action = event.Status
349 event.Actor.ID = event.ID
350 event.Actor.Attributes = map[string]string{}
351 switch event.Status {
352 case "delete", "import", "pull", "push", "tag", "untag":
353 event.Type = "image"
354 default:
355 event.Type = "container"
356 if event.From != "" {
357 event.Actor.Attributes["image"] = event.From
358 }
359 }
360 } else {
361 if event.Status == "" {
362 if event.Type == "image" || event.Type == "container" {
363 event.Status = event.Action
364 } else {
365 // Because just the Status has been overloaded with different Types
366 // if an event is not for an image or a container, we prepend the type
367 // to avoid problems for people relying on actions being only for
368 // images and containers
369 event.Status = event.Type + ":" + event.Action
370 }
371 }
372 if event.ID == "" {
373 event.ID = event.Actor.ID
374 }
375 if event.From == "" {
376 event.From = event.Actor.Attributes["image"]
377 }
378 }
379}