]>
Commit | Line | Data |
---|---|---|
9b12e4fe JC |
1 | // Copyright 2015 go-dockerclient authors. All rights reserved. |
2 | // Use of this source code is governed by a BSD-style | |
3 | // license that can be found in the LICENSE file. | |
4 | ||
5 | package docker | |
6 | ||
7 | import ( | |
8 | "encoding/json" | |
9 | "errors" | |
10 | "fmt" | |
11 | "io" | |
12 | "math" | |
13 | "net" | |
14 | "net/http" | |
15 | "net/http/httputil" | |
16 | "sync" | |
17 | "sync/atomic" | |
18 | "time" | |
19 | ) | |
20 | ||
// APIEvents represents events coming from the Docker API.
// The fields in the Docker API changed in API version 1.22, and
// events for more than images and containers are now fired off.
// To maintain forward and backward compatibility, go-dockerclient
// replicates the event in both the new and old format as faithfully as possible.
//
// For events that only exist in 1.22 or later, `Status` is filled in as
// `"Type:Action"` instead of just `Action` to allow for older clients to
// differentiate and not break if they rely on the pre-1.22 Status types.
//
// The transformEvent function can be consulted for more information about how
// events are translated from new/old API formats.
type APIEvents struct {
	// New API Fields in 1.22
	Action string `json:"action,omitempty"`
	Type string `json:"type,omitempty"`
	Actor APIActor `json:"actor,omitempty"`

	// Old API fields for < 1.22
	Status string `json:"status,omitempty"`
	ID string `json:"id,omitempty"`
	From string `json:"from,omitempty"`

	// Fields in both
	Time int64 `json:"time,omitempty"`
	TimeNano int64 `json:"timeNano,omitempty"`
}
48 | ||
// APIActor represents an actor that accomplishes something for an event.
type APIActor struct {
	// ID identifies the actor; transformEvent mirrors it to and from
	// APIEvents.ID.
	ID string `json:"id,omitempty"`
	// Attributes holds key/value details about the actor; transformEvent
	// uses its "image" key to mirror APIEvents.From.
	Attributes map[string]string `json:"attributes,omitempty"`
}
54 | ||
// eventMonitoringState holds the state shared by the event monitor: the
// registered listeners and the channels fed by eventHijack.
//
// The embedded RWMutex is taken by the methods of this type when they read
// or write the fields. The embedded WaitGroup is incremented once per
// registered listener and once per in-flight sendEvent call;
// disableEventMonitoring waits on it before closing the channels.
type eventMonitoringState struct {
	sync.RWMutex
	sync.WaitGroup
	// enabled is set by enableEventMonitoring and cleared by
	// disableEventMonitoring.
	enabled bool
	// lastSeen is the largest APIEvents.Time observed so far; it is accessed
	// with sync/atomic and passed to eventHijack as the start time.
	lastSeen int64
	// C carries events decoded by eventHijack to monitorEvents.
	C chan *APIEvents
	// errC carries errors reported by eventHijack and sendEvent to
	// monitorEvents.
	errC chan error
	// listeners are the channels that sendEvent forwards every event to.
	listeners []chan<- *APIEvents
}
64 | ||
const (
	// maxMonitorConnRetries is the number of retries connectWithRetry makes
	// after the initial connection attempt fails.
	maxMonitorConnRetries = 5
	// retryInitialWaitTime is the wait before the first retry, in
	// milliseconds; connectWithRetry doubles it for each further retry. It is
	// an untyped floating-point constant so that it can be multiplied by the
	// float64 result of math.Pow.
	retryInitialWaitTime = 10.
)
69 | ||
var (
	// ErrNoListeners is the error returned when no listeners are available
	// to receive an event.
	ErrNoListeners = errors.New("no listeners present to receive event")

	// ErrListenerAlreadyExists is the error returned when the listener already
	// exists.
	ErrListenerAlreadyExists = errors.New("listener already exists for docker events")

	// EOFEvent is sent when the event listener receives an EOF error.
	// monitorEvents compares received events against this pointer to detect
	// the end of the stream.
	EOFEvent = &APIEvents{
		Type: "EOF",
		Status: "EOF",
	}
)
85 | ||
86 | // AddEventListener adds a new listener to container events in the Docker API. | |
87 | // | |
88 | // The parameter is a channel through which events will be sent. | |
89 | func (c *Client) AddEventListener(listener chan<- *APIEvents) error { | |
90 | var err error | |
91 | if !c.eventMonitor.isEnabled() { | |
92 | err = c.eventMonitor.enableEventMonitoring(c) | |
93 | if err != nil { | |
94 | return err | |
95 | } | |
96 | } | |
97 | err = c.eventMonitor.addListener(listener) | |
98 | if err != nil { | |
99 | return err | |
100 | } | |
101 | return nil | |
102 | } | |
103 | ||
104 | // RemoveEventListener removes a listener from the monitor. | |
105 | func (c *Client) RemoveEventListener(listener chan *APIEvents) error { | |
106 | err := c.eventMonitor.removeListener(listener) | |
107 | if err != nil { | |
108 | return err | |
109 | } | |
110 | if len(c.eventMonitor.listeners) == 0 { | |
111 | c.eventMonitor.disableEventMonitoring() | |
112 | } | |
113 | return nil | |
114 | } | |
115 | ||
116 | func (eventState *eventMonitoringState) addListener(listener chan<- *APIEvents) error { | |
117 | eventState.Lock() | |
118 | defer eventState.Unlock() | |
119 | if listenerExists(listener, &eventState.listeners) { | |
120 | return ErrListenerAlreadyExists | |
121 | } | |
122 | eventState.Add(1) | |
123 | eventState.listeners = append(eventState.listeners, listener) | |
124 | return nil | |
125 | } | |
126 | ||
127 | func (eventState *eventMonitoringState) removeListener(listener chan<- *APIEvents) error { | |
128 | eventState.Lock() | |
129 | defer eventState.Unlock() | |
130 | if listenerExists(listener, &eventState.listeners) { | |
131 | var newListeners []chan<- *APIEvents | |
132 | for _, l := range eventState.listeners { | |
133 | if l != listener { | |
134 | newListeners = append(newListeners, l) | |
135 | } | |
136 | } | |
137 | eventState.listeners = newListeners | |
138 | eventState.Add(-1) | |
139 | } | |
140 | return nil | |
141 | } | |
142 | ||
143 | func (eventState *eventMonitoringState) closeListeners() { | |
144 | for _, l := range eventState.listeners { | |
145 | close(l) | |
146 | eventState.Add(-1) | |
147 | } | |
148 | eventState.listeners = nil | |
149 | } | |
150 | ||
151 | func listenerExists(a chan<- *APIEvents, list *[]chan<- *APIEvents) bool { | |
152 | for _, b := range *list { | |
153 | if b == a { | |
154 | return true | |
155 | } | |
156 | } | |
157 | return false | |
158 | } | |
159 | ||
160 | func (eventState *eventMonitoringState) enableEventMonitoring(c *Client) error { | |
161 | eventState.Lock() | |
162 | defer eventState.Unlock() | |
163 | if !eventState.enabled { | |
164 | eventState.enabled = true | |
165 | atomic.StoreInt64(&eventState.lastSeen, 0) | |
166 | eventState.C = make(chan *APIEvents, 100) | |
167 | eventState.errC = make(chan error, 1) | |
168 | go eventState.monitorEvents(c) | |
169 | } | |
170 | return nil | |
171 | } | |
172 | ||
173 | func (eventState *eventMonitoringState) disableEventMonitoring() error { | |
174 | eventState.Lock() | |
175 | defer eventState.Unlock() | |
176 | ||
177 | eventState.closeListeners() | |
178 | ||
179 | eventState.Wait() | |
180 | ||
181 | if eventState.enabled { | |
182 | eventState.enabled = false | |
183 | close(eventState.C) | |
184 | close(eventState.errC) | |
185 | } | |
186 | return nil | |
187 | } | |
188 | ||
189 | func (eventState *eventMonitoringState) monitorEvents(c *Client) { | |
190 | var err error | |
191 | for eventState.noListeners() { | |
192 | time.Sleep(10 * time.Millisecond) | |
193 | } | |
194 | if err = eventState.connectWithRetry(c); err != nil { | |
195 | // terminate if connect failed | |
196 | eventState.disableEventMonitoring() | |
197 | return | |
198 | } | |
199 | for eventState.isEnabled() { | |
200 | timeout := time.After(100 * time.Millisecond) | |
201 | select { | |
202 | case ev, ok := <-eventState.C: | |
203 | if !ok { | |
204 | return | |
205 | } | |
206 | if ev == EOFEvent { | |
207 | eventState.disableEventMonitoring() | |
208 | return | |
209 | } | |
210 | eventState.updateLastSeen(ev) | |
211 | go eventState.sendEvent(ev) | |
212 | case err = <-eventState.errC: | |
213 | if err == ErrNoListeners { | |
214 | eventState.disableEventMonitoring() | |
215 | return | |
216 | } else if err != nil { | |
217 | defer func() { go eventState.monitorEvents(c) }() | |
218 | return | |
219 | } | |
220 | case <-timeout: | |
221 | continue | |
222 | } | |
223 | } | |
224 | } | |
225 | ||
226 | func (eventState *eventMonitoringState) connectWithRetry(c *Client) error { | |
227 | var retries int | |
228 | eventState.RLock() | |
229 | eventChan := eventState.C | |
230 | errChan := eventState.errC | |
231 | eventState.RUnlock() | |
232 | err := c.eventHijack(atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan) | |
233 | for ; err != nil && retries < maxMonitorConnRetries; retries++ { | |
234 | waitTime := int64(retryInitialWaitTime * math.Pow(2, float64(retries))) | |
235 | time.Sleep(time.Duration(waitTime) * time.Millisecond) | |
236 | eventState.RLock() | |
237 | eventChan = eventState.C | |
238 | errChan = eventState.errC | |
239 | eventState.RUnlock() | |
240 | err = c.eventHijack(atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan) | |
241 | } | |
242 | return err | |
243 | } | |
244 | ||
245 | func (eventState *eventMonitoringState) noListeners() bool { | |
246 | eventState.RLock() | |
247 | defer eventState.RUnlock() | |
248 | return len(eventState.listeners) == 0 | |
249 | } | |
250 | ||
251 | func (eventState *eventMonitoringState) isEnabled() bool { | |
252 | eventState.RLock() | |
253 | defer eventState.RUnlock() | |
254 | return eventState.enabled | |
255 | } | |
256 | ||
257 | func (eventState *eventMonitoringState) sendEvent(event *APIEvents) { | |
258 | eventState.RLock() | |
259 | defer eventState.RUnlock() | |
260 | eventState.Add(1) | |
261 | defer eventState.Done() | |
262 | if eventState.enabled { | |
263 | if len(eventState.listeners) == 0 { | |
264 | eventState.errC <- ErrNoListeners | |
265 | return | |
266 | } | |
267 | ||
268 | for _, listener := range eventState.listeners { | |
269 | listener <- event | |
270 | } | |
271 | } | |
272 | } | |
273 | ||
274 | func (eventState *eventMonitoringState) updateLastSeen(e *APIEvents) { | |
275 | eventState.Lock() | |
276 | defer eventState.Unlock() | |
277 | if atomic.LoadInt64(&eventState.lastSeen) < e.Time { | |
278 | atomic.StoreInt64(&eventState.lastSeen, e.Time) | |
279 | } | |
280 | } | |
281 | ||
282 | func (c *Client) eventHijack(startTime int64, eventChan chan *APIEvents, errChan chan error) error { | |
283 | uri := "/events" | |
284 | if startTime != 0 { | |
285 | uri += fmt.Sprintf("?since=%d", startTime) | |
286 | } | |
287 | protocol := c.endpointURL.Scheme | |
288 | address := c.endpointURL.Path | |
289 | if protocol != "unix" { | |
290 | protocol = "tcp" | |
291 | address = c.endpointURL.Host | |
292 | } | |
293 | var dial net.Conn | |
294 | var err error | |
295 | if c.TLSConfig == nil { | |
296 | dial, err = c.Dialer.Dial(protocol, address) | |
297 | } else { | |
298 | dial, err = tlsDialWithDialer(c.Dialer, protocol, address, c.TLSConfig) | |
299 | } | |
300 | if err != nil { | |
301 | return err | |
302 | } | |
303 | conn := httputil.NewClientConn(dial, nil) | |
304 | req, err := http.NewRequest("GET", uri, nil) | |
305 | if err != nil { | |
306 | return err | |
307 | } | |
308 | res, err := conn.Do(req) | |
309 | if err != nil { | |
310 | return err | |
311 | } | |
312 | go func(res *http.Response, conn *httputil.ClientConn) { | |
313 | defer conn.Close() | |
314 | defer res.Body.Close() | |
315 | decoder := json.NewDecoder(res.Body) | |
316 | for { | |
317 | var event APIEvents | |
318 | if err = decoder.Decode(&event); err != nil { | |
319 | if err == io.EOF || err == io.ErrUnexpectedEOF { | |
320 | c.eventMonitor.RLock() | |
321 | if c.eventMonitor.enabled && c.eventMonitor.C == eventChan { | |
322 | // Signal that we're exiting. | |
323 | eventChan <- EOFEvent | |
324 | } | |
325 | c.eventMonitor.RUnlock() | |
326 | break | |
327 | } | |
328 | errChan <- err | |
329 | } | |
330 | if event.Time == 0 { | |
331 | continue | |
332 | } | |
333 | if !c.eventMonitor.isEnabled() || c.eventMonitor.C != eventChan { | |
334 | return | |
335 | } | |
336 | transformEvent(&event) | |
337 | eventChan <- &event | |
338 | } | |
339 | }(res, conn) | |
340 | return nil | |
341 | } | |
342 | ||
343 | // transformEvent takes an event and determines what version it is from | |
344 | // then populates both versions of the event | |
345 | func transformEvent(event *APIEvents) { | |
346 | // if event version is <= 1.21 there will be no Action and no Type | |
347 | if event.Action == "" && event.Type == "" { | |
348 | event.Action = event.Status | |
349 | event.Actor.ID = event.ID | |
350 | event.Actor.Attributes = map[string]string{} | |
351 | switch event.Status { | |
352 | case "delete", "import", "pull", "push", "tag", "untag": | |
353 | event.Type = "image" | |
354 | default: | |
355 | event.Type = "container" | |
356 | if event.From != "" { | |
357 | event.Actor.Attributes["image"] = event.From | |
358 | } | |
359 | } | |
360 | } else { | |
361 | if event.Status == "" { | |
362 | if event.Type == "image" || event.Type == "container" { | |
363 | event.Status = event.Action | |
364 | } else { | |
365 | // Because just the Status has been overloaded with different Types | |
366 | // if an event is not for an image or a container, we prepend the type | |
367 | // to avoid problems for people relying on actions being only for | |
368 | // images and containers | |
369 | event.Status = event.Type + ":" + event.Action | |
370 | } | |
371 | } | |
372 | if event.ID == "" { | |
373 | event.ID = event.Actor.ID | |
374 | } | |
375 | if event.From == "" { | |
376 | event.From = event.Actor.Attributes["image"] | |
377 | } | |
378 | } | |
379 | } |