git.immae.eu Git - github/fretlink/terraform-provider-statuscake.git/blob - vendor/github.com/fsouza/go-dockerclient/event.go
provider: Ensured Go 1.11 in TravisCI and README
[github/fretlink/terraform-provider-statuscake.git] / vendor / github.com / fsouza / go-dockerclient / event.go
1 // Copyright 2015 go-dockerclient authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4
5 package docker
6
7 import (
8 "encoding/json"
9 "errors"
10 "fmt"
11 "io"
12 "math"
13 "net"
14 "net/http"
15 "net/http/httputil"
16 "sync"
17 "sync/atomic"
18 "time"
19 )
20
21 // APIEvents represents events coming from the Docker API
22 // The fields in the Docker API changed in API version 1.22, and
23 // events for more than images and containers are now fired off.
24 // To maintain forward and backward compatibility, go-dockerclient
25 // replicates the event in both the new and old format as faithfully as possible.
26 //
27 // For events that only exist in 1.22 in later, `Status` is filled in as
28 // `"Type:Action"` instead of just `Action` to allow for older clients to
29 // differentiate and not break if they rely on the pre-1.22 Status types.
30 //
31 // The transformEvent method can be consulted for more information about how
32 // events are translated from new/old API formats
33 type APIEvents struct {
34 // New API Fields in 1.22
35 Action string `json:"action,omitempty"`
36 Type string `json:"type,omitempty"`
37 Actor APIActor `json:"actor,omitempty"`
38
39 // Old API fields for < 1.22
40 Status string `json:"status,omitempty"`
41 ID string `json:"id,omitempty"`
42 From string `json:"from,omitempty"`
43
44 // Fields in both
45 Time int64 `json:"time,omitempty"`
46 TimeNano int64 `json:"timeNano,omitempty"`
47 }
48
// APIActor describes the actor attached to an event: its identifier plus a
// free-form set of string attributes.
type APIActor struct {
	// ID is the identifier of the actor.
	ID string `json:"id,omitempty"`
	// Attributes holds key/value metadata about the actor.
	Attributes map[string]string `json:"attributes,omitempty"`
}
54
55 type eventMonitoringState struct {
56 sync.RWMutex
57 sync.WaitGroup
58 enabled bool
59 lastSeen int64
60 C chan *APIEvents
61 errC chan error
62 listeners []chan<- *APIEvents
63 }
64
const (
	// maxMonitorConnRetries bounds how many times connectWithRetry retries
	// after the first failed connection attempt.
	maxMonitorConnRetries = 5

	// retryInitialWaitTime is the base wait, in milliseconds, that
	// connectWithRetry doubles on every retry.
	retryInitialWaitTime = 10.0
)
69
70 var (
71 // ErrNoListeners is the error returned when no listeners are available
72 // to receive an event.
73 ErrNoListeners = errors.New("no listeners present to receive event")
74
75 // ErrListenerAlreadyExists is the error returned when the listerner already
76 // exists.
77 ErrListenerAlreadyExists = errors.New("listener already exists for docker events")
78
79 // EOFEvent is sent when the event listener receives an EOF error.
80 EOFEvent = &APIEvents{
81 Type: "EOF",
82 Status: "EOF",
83 }
84 )
85
86 // AddEventListener adds a new listener to container events in the Docker API.
87 //
88 // The parameter is a channel through which events will be sent.
89 func (c *Client) AddEventListener(listener chan<- *APIEvents) error {
90 var err error
91 if !c.eventMonitor.isEnabled() {
92 err = c.eventMonitor.enableEventMonitoring(c)
93 if err != nil {
94 return err
95 }
96 }
97 err = c.eventMonitor.addListener(listener)
98 if err != nil {
99 return err
100 }
101 return nil
102 }
103
104 // RemoveEventListener removes a listener from the monitor.
105 func (c *Client) RemoveEventListener(listener chan *APIEvents) error {
106 err := c.eventMonitor.removeListener(listener)
107 if err != nil {
108 return err
109 }
110 if len(c.eventMonitor.listeners) == 0 {
111 c.eventMonitor.disableEventMonitoring()
112 }
113 return nil
114 }
115
116 func (eventState *eventMonitoringState) addListener(listener chan<- *APIEvents) error {
117 eventState.Lock()
118 defer eventState.Unlock()
119 if listenerExists(listener, &eventState.listeners) {
120 return ErrListenerAlreadyExists
121 }
122 eventState.Add(1)
123 eventState.listeners = append(eventState.listeners, listener)
124 return nil
125 }
126
127 func (eventState *eventMonitoringState) removeListener(listener chan<- *APIEvents) error {
128 eventState.Lock()
129 defer eventState.Unlock()
130 if listenerExists(listener, &eventState.listeners) {
131 var newListeners []chan<- *APIEvents
132 for _, l := range eventState.listeners {
133 if l != listener {
134 newListeners = append(newListeners, l)
135 }
136 }
137 eventState.listeners = newListeners
138 eventState.Add(-1)
139 }
140 return nil
141 }
142
143 func (eventState *eventMonitoringState) closeListeners() {
144 for _, l := range eventState.listeners {
145 close(l)
146 eventState.Add(-1)
147 }
148 eventState.listeners = nil
149 }
150
151 func listenerExists(a chan<- *APIEvents, list *[]chan<- *APIEvents) bool {
152 for _, b := range *list {
153 if b == a {
154 return true
155 }
156 }
157 return false
158 }
159
160 func (eventState *eventMonitoringState) enableEventMonitoring(c *Client) error {
161 eventState.Lock()
162 defer eventState.Unlock()
163 if !eventState.enabled {
164 eventState.enabled = true
165 atomic.StoreInt64(&eventState.lastSeen, 0)
166 eventState.C = make(chan *APIEvents, 100)
167 eventState.errC = make(chan error, 1)
168 go eventState.monitorEvents(c)
169 }
170 return nil
171 }
172
173 func (eventState *eventMonitoringState) disableEventMonitoring() error {
174 eventState.Lock()
175 defer eventState.Unlock()
176
177 eventState.closeListeners()
178
179 eventState.Wait()
180
181 if eventState.enabled {
182 eventState.enabled = false
183 close(eventState.C)
184 close(eventState.errC)
185 }
186 return nil
187 }
188
189 func (eventState *eventMonitoringState) monitorEvents(c *Client) {
190 var err error
191 for eventState.noListeners() {
192 time.Sleep(10 * time.Millisecond)
193 }
194 if err = eventState.connectWithRetry(c); err != nil {
195 // terminate if connect failed
196 eventState.disableEventMonitoring()
197 return
198 }
199 for eventState.isEnabled() {
200 timeout := time.After(100 * time.Millisecond)
201 select {
202 case ev, ok := <-eventState.C:
203 if !ok {
204 return
205 }
206 if ev == EOFEvent {
207 eventState.disableEventMonitoring()
208 return
209 }
210 eventState.updateLastSeen(ev)
211 go eventState.sendEvent(ev)
212 case err = <-eventState.errC:
213 if err == ErrNoListeners {
214 eventState.disableEventMonitoring()
215 return
216 } else if err != nil {
217 defer func() { go eventState.monitorEvents(c) }()
218 return
219 }
220 case <-timeout:
221 continue
222 }
223 }
224 }
225
226 func (eventState *eventMonitoringState) connectWithRetry(c *Client) error {
227 var retries int
228 eventState.RLock()
229 eventChan := eventState.C
230 errChan := eventState.errC
231 eventState.RUnlock()
232 err := c.eventHijack(atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
233 for ; err != nil && retries < maxMonitorConnRetries; retries++ {
234 waitTime := int64(retryInitialWaitTime * math.Pow(2, float64(retries)))
235 time.Sleep(time.Duration(waitTime) * time.Millisecond)
236 eventState.RLock()
237 eventChan = eventState.C
238 errChan = eventState.errC
239 eventState.RUnlock()
240 err = c.eventHijack(atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
241 }
242 return err
243 }
244
245 func (eventState *eventMonitoringState) noListeners() bool {
246 eventState.RLock()
247 defer eventState.RUnlock()
248 return len(eventState.listeners) == 0
249 }
250
251 func (eventState *eventMonitoringState) isEnabled() bool {
252 eventState.RLock()
253 defer eventState.RUnlock()
254 return eventState.enabled
255 }
256
257 func (eventState *eventMonitoringState) sendEvent(event *APIEvents) {
258 eventState.RLock()
259 defer eventState.RUnlock()
260 eventState.Add(1)
261 defer eventState.Done()
262 if eventState.enabled {
263 if len(eventState.listeners) == 0 {
264 eventState.errC <- ErrNoListeners
265 return
266 }
267
268 for _, listener := range eventState.listeners {
269 listener <- event
270 }
271 }
272 }
273
274 func (eventState *eventMonitoringState) updateLastSeen(e *APIEvents) {
275 eventState.Lock()
276 defer eventState.Unlock()
277 if atomic.LoadInt64(&eventState.lastSeen) < e.Time {
278 atomic.StoreInt64(&eventState.lastSeen, e.Time)
279 }
280 }
281
282 func (c *Client) eventHijack(startTime int64, eventChan chan *APIEvents, errChan chan error) error {
283 uri := "/events"
284 if startTime != 0 {
285 uri += fmt.Sprintf("?since=%d", startTime)
286 }
287 protocol := c.endpointURL.Scheme
288 address := c.endpointURL.Path
289 if protocol != "unix" {
290 protocol = "tcp"
291 address = c.endpointURL.Host
292 }
293 var dial net.Conn
294 var err error
295 if c.TLSConfig == nil {
296 dial, err = c.Dialer.Dial(protocol, address)
297 } else {
298 dial, err = tlsDialWithDialer(c.Dialer, protocol, address, c.TLSConfig)
299 }
300 if err != nil {
301 return err
302 }
303 conn := httputil.NewClientConn(dial, nil)
304 req, err := http.NewRequest("GET", uri, nil)
305 if err != nil {
306 return err
307 }
308 res, err := conn.Do(req)
309 if err != nil {
310 return err
311 }
312 go func(res *http.Response, conn *httputil.ClientConn) {
313 defer conn.Close()
314 defer res.Body.Close()
315 decoder := json.NewDecoder(res.Body)
316 for {
317 var event APIEvents
318 if err = decoder.Decode(&event); err != nil {
319 if err == io.EOF || err == io.ErrUnexpectedEOF {
320 c.eventMonitor.RLock()
321 if c.eventMonitor.enabled && c.eventMonitor.C == eventChan {
322 // Signal that we're exiting.
323 eventChan <- EOFEvent
324 }
325 c.eventMonitor.RUnlock()
326 break
327 }
328 errChan <- err
329 }
330 if event.Time == 0 {
331 continue
332 }
333 if !c.eventMonitor.isEnabled() || c.eventMonitor.C != eventChan {
334 return
335 }
336 transformEvent(&event)
337 eventChan <- &event
338 }
339 }(res, conn)
340 return nil
341 }
342
343 // transformEvent takes an event and determines what version it is from
344 // then populates both versions of the event
345 func transformEvent(event *APIEvents) {
346 // if event version is <= 1.21 there will be no Action and no Type
347 if event.Action == "" && event.Type == "" {
348 event.Action = event.Status
349 event.Actor.ID = event.ID
350 event.Actor.Attributes = map[string]string{}
351 switch event.Status {
352 case "delete", "import", "pull", "push", "tag", "untag":
353 event.Type = "image"
354 default:
355 event.Type = "container"
356 if event.From != "" {
357 event.Actor.Attributes["image"] = event.From
358 }
359 }
360 } else {
361 if event.Status == "" {
362 if event.Type == "image" || event.Type == "container" {
363 event.Status = event.Action
364 } else {
365 // Because just the Status has been overloaded with different Types
366 // if an event is not for an image or a container, we prepend the type
367 // to avoid problems for people relying on actions being only for
368 // images and containers
369 event.Status = event.Type + ":" + event.Action
370 }
371 }
372 if event.ID == "" {
373 event.ID = event.Actor.ID
374 }
375 if event.From == "" {
376 event.From = event.Actor.Attributes["image"]
377 }
378 }
379 }