1 // Copyright 2015 go-dockerclient authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
21 // APIEvents represents events coming from the Docker API.
22 // The fields in the Docker API changed in API version 1.22, and
23 // events for more than images and containers are now fired off.
24 // To maintain forward and backward compatibility, go-dockerclient
25 // replicates the event in both the new and old format as faithfully as possible.
27 // For events that only exist in 1.22 or later, `Status` is filled in as
28 // `"Type:Action"` instead of just `Action` to allow for older clients to
29 // differentiate and not break if they rely on the pre-1.22 Status types.
31 // The transformEvent function can be consulted for more information about how
32 // events are translated between the new and old API formats.
33 type APIEvents struct {
34 // New API Fields in 1.22
// For pre-1.22 events transformEvent fills Action from Status and
// Actor.ID from ID.
35 Action string `json:"action,omitempty"`
36 Type string `json:"type,omitempty"`
37 Actor APIActor `json:"actor,omitempty"`
39 // Old API fields for < 1.22
// For events that arrive without a Status, transformEvent derives it from
// Action (or from Type and Action joined by ":").
40 Status string `json:"status,omitempty"`
41 ID string `json:"id,omitempty"`
42 From string `json:"from,omitempty"`
// Event timestamps. Time is the value updateLastSeen records so that a
// reconnect can ask the API for events since that point.
45 Time int64 `json:"time,omitempty"`
46 TimeNano int64 `json:"timeNano,omitempty"`
49 // APIActor represents an actor that accomplishes something for an event.
// transformEvent keeps it in step with the legacy APIEvents.ID and
// APIEvents.From fields.
50 type APIActor struct {
// ID mirrors APIEvents.ID; transformEvent copies the value in whichever
// direction is needed.
51 ID string `json:"id,omitempty"`
// Attributes holds string key/value pairs; its "image" entry mirrors
// APIEvents.From.
52 Attributes map[string]string `json:"attributes,omitempty"`
// eventMonitoringState is the receiver for the event-monitoring methods in
// this file: listener registration, enabling/disabling monitoring, and the
// monitor loop itself.
55 type eventMonitoringState struct {
// listeners are the send-only channels registered through addListener.
62 listeners []chan<- *APIEvents
// maxMonitorConnRetries caps the number of retries attempted by
// connectWithRetry.
66 maxMonitorConnRetries = 5
// retryInitialWaitTime is the backoff base used by connectWithRetry: it is
// multiplied by 2^retries and the result is slept as a millisecond count.
67 retryInitialWaitTime = 10.
71 // ErrNoListeners is the error returned when no listeners are available
72 // to receive an event.
73 ErrNoListeners = errors.New("no listeners present to receive event")
75 // ErrListenerAlreadyExists is the error returned when the listener already
// exists.
77 ErrListenerAlreadyExists = errors.New("listener already exists for docker events")
79 // EOFEvent is sent when the event listener receives an EOF error.
// eventHijack pushes this exact pointer onto the event channel when the
// stream decoder hits io.EOF or io.ErrUnexpectedEOF.
80 EOFEvent = &APIEvents{
86 // AddEventListener adds a new listener to container events in the Docker API.
88 // The parameter is a channel through which events will be sent.
//
// Event monitoring is enabled first when it is not already running, and the
// listener is then registered with the client's event monitor.
89 func (c *Client) AddEventListener(listener chan<- *APIEvents) error {
// Start the monitor lazily, only when it is not yet enabled.
91 if !c.eventMonitor.isEnabled() {
92 err = c.eventMonitor.enableEventMonitoring(c)
// addListener rejects a channel that is already registered with
// ErrListenerAlreadyExists.
97 err = c.eventMonitor.addListener(listener)
104 // RemoveEventListener removes a listener from the monitor.
105 func (c *Client) RemoveEventListener(listener chan *APIEvents) error {
106 err := c.eventMonitor.removeListener(listener)
110 if len(c.eventMonitor.listeners) == 0 {
111 c.eventMonitor.disableEventMonitoring()
// addListener registers listener with the monitoring state.
//
// It returns ErrListenerAlreadyExists when listenerExists reports that the
// channel is already in the listeners slice; otherwise the channel is
// appended to that slice.
116 func (eventState *eventMonitoringState) addListener(listener chan<- *APIEvents) error {
// The state's lock is released when the function returns.
118 defer eventState.Unlock()
119 if listenerExists(listener, &eventState.listeners) {
120 return ErrListenerAlreadyExists
123 eventState.listeners = append(eventState.listeners, listener)
// removeListener drops listener from the monitoring state.
//
// When listenerExists reports that the channel is registered, a replacement
// slice is built from the current listeners and stored back on the state.
127 func (eventState *eventMonitoringState) removeListener(listener chan<- *APIEvents) error {
// The state's lock is released when the function returns.
129 defer eventState.Unlock()
130 if listenerExists(listener, &eventState.listeners) {
// Build a fresh slice rather than editing the existing one in place.
131 var newListeners []chan<- *APIEvents
132 for _, l := range eventState.listeners {
134 newListeners = append(newListeners, l)
137 eventState.listeners = newListeners
// closeListeners walks every registered listener (presumably closing each
// channel — confirm against the loop body) and then resets the listeners
// slice to nil. disableEventMonitoring calls it.
143 func (eventState *eventMonitoringState) closeListeners() {
144 for _, l := range eventState.listeners {
// Drop all references so the state reports no listeners afterwards.
148 eventState.listeners = nil
// listenerExists scans the slice pointed to by list for the channel a and
// reports the outcome as a bool. addListener and removeListener use it to
// decide whether a channel is already registered.
151 func listenerExists(a chan<- *APIEvents, list *[]chan<- *APIEvents) bool {
152 for _, b := range *list {
// enableEventMonitoring switches event monitoring on if it is not already
// enabled: it marks the state enabled, resets lastSeen to 0, allocates fresh
// event and error channels, and starts monitorEvents in a new goroutine.
// Calling it while monitoring is already enabled skips all of that.
160 func (eventState *eventMonitoringState) enableEventMonitoring(c *Client) error {
// The state's lock is released when the function returns.
162 defer eventState.Unlock()
163 if !eventState.enabled {
164 eventState.enabled = true
// Start from time zero; eventHijack only adds a "since" query when given a
// start time.
165 atomic.StoreInt64(&eventState.lastSeen, 0)
// Events are buffered (capacity 100); the error channel holds one pending
// error.
166 eventState.C = make(chan *APIEvents, 100)
167 eventState.errC = make(chan error, 1)
168 go eventState.monitorEvents(c)
// disableEventMonitoring shuts event monitoring down: it runs closeListeners
// and, if monitoring was enabled, clears the enabled flag and closes the
// error channel.
173 func (eventState *eventMonitoringState) disableEventMonitoring() error {
// The state's lock is released when the function returns.
175 defer eventState.Unlock()
177 eventState.closeListeners()
// Only tear the channels down once; a second call finds enabled == false.
181 if eventState.enabled {
182 eventState.enabled = false
184 close(eventState.errC)
// monitorEvents is the monitoring loop that enableEventMonitoring starts in
// its own goroutine. It polls every 10ms until at least one listener is
// registered, opens the event stream through connectWithRetry, and then
// services the event and error channels for as long as monitoring stays
// enabled.
189 func (eventState *eventMonitoringState) monitorEvents(c *Client) {
// Wait for the first listener before connecting.
191 for eventState.noListeners() {
192 time.Sleep(10 * time.Millisecond)
194 if err = eventState.connectWithRetry(c); err != nil {
195 // terminate if connect failed
196 eventState.disableEventMonitoring()
199 for eventState.isEnabled() {
// A new 100ms timer is created on every pass through the loop.
200 timeout := time.After(100 * time.Millisecond)
202 case ev, ok := <-eventState.C:
207 eventState.disableEventMonitoring()
// Record the event's timestamp, then hand delivery off to a separate
// goroutine so this loop keeps draining the channel.
210 eventState.updateLastSeen(ev)
211 go eventState.sendEvent(ev)
212 case err = <-eventState.errC:
// sendEvent reports ErrNoListeners when nobody is left to receive events.
213 if err == ErrNoListeners {
214 eventState.disableEventMonitoring()
216 } else if err != nil {
// For any other error, start a new monitorEvents goroutine once this
// invocation returns.
217 defer func() { go eventState.monitorEvents(c) }()
// connectWithRetry opens the event stream with eventHijack, starting from the
// last seen event time. While the attempt fails and fewer than
// maxMonitorConnRetries retries have been made, it sleeps
// retryInitialWaitTime * 2^retries milliseconds and tries again.
// The error result is presumably the outcome of the final attempt — confirm
// against the return statement.
226 func (eventState *eventMonitoringState) connectWithRetry(c *Client) error {
// Take the state's current channels for this attempt.
229 eventChan := eventState.C
230 errChan := eventState.errC
232 err := c.eventHijack(atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
233 for ; err != nil && retries < maxMonitorConnRetries; retries++ {
// Exponential backoff: the wait doubles with every retry.
234 waitTime := int64(retryInitialWaitTime * math.Pow(2, float64(retries)))
235 time.Sleep(time.Duration(waitTime) * time.Millisecond)
// Re-read the channels in case the state replaced them while sleeping.
237 eventChan = eventState.C
238 errChan = eventState.errC
240 err = c.eventHijack(atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
// noListeners reports whether the listeners slice is currently empty.
245 func (eventState *eventMonitoringState) noListeners() bool {
// The state's read lock is released when the function returns.
247 defer eventState.RUnlock()
248 return len(eventState.listeners) == 0
// isEnabled reports the current value of the enabled flag.
251 func (eventState *eventMonitoringState) isEnabled() bool {
// The state's read lock is released when the function returns.
253 defer eventState.RUnlock()
254 return eventState.enabled
// sendEvent handles one event on behalf of the registered listeners.
// monitorEvents runs it in a separate goroutine for each event.
//
// Nothing happens unless monitoring is enabled. With monitoring enabled and
// no listeners registered, ErrNoListeners is sent on the error channel;
// otherwise the listeners are iterated (presumably delivering event to each —
// confirm against the loop body).
257 func (eventState *eventMonitoringState) sendEvent(event *APIEvents) {
// The state's read lock is released when the function returns.
259 defer eventState.RUnlock()
// Done is called on the state when the function returns.
261 defer eventState.Done()
262 if eventState.enabled {
263 if len(eventState.listeners) == 0 {
264 eventState.errC <- ErrNoListeners
268 for _, listener := range eventState.listeners {
// updateLastSeen advances lastSeen to e.Time when the event is newer than the
// value currently stored; older or equal timestamps leave it unchanged.
274 func (eventState *eventMonitoringState) updateLastSeen(e *APIEvents) {
// The state's lock is released when the function returns.
276 defer eventState.Unlock()
// lastSeen is read and written with atomic operations, matching how
// connectWithRetry and enableEventMonitoring access it.
277 if atomic.LoadInt64(&eventState.lastSeen) < e.Time {
278 atomic.StoreInt64(&eventState.lastSeen, e.Time)
// eventHijack opens a connection to the Docker endpoint, issues a GET request
// for the event stream, and starts a goroutine that decodes events from the
// response body.
//
// startTime is formatted into the request URI as the "since" query value.
// eventChan is the channel the goroutine feeds (including EOFEvent when the
// stream ends); errChan is the channel the monitor reads errors from.
282 func (c *Client) eventHijack(startTime int64, eventChan chan *APIEvents, errChan chan error) error {
285 uri += fmt.Sprintf("?since=%d", startTime)
// Unix-socket endpoints are addressed by URL path; every other scheme is
// addressed by host.
287 protocol := c.endpointURL.Scheme
288 address := c.endpointURL.Path
289 if protocol != "unix" {
291 address = c.endpointURL.Host
// Dial in plain text unless a TLS configuration is present.
295 if c.TLSConfig == nil {
296 dial, err = c.Dialer.Dial(protocol, address)
298 dial, err = tlsDialWithDialer(c.Dialer, protocol, address, c.TLSConfig)
// NOTE(review): httputil.ClientConn is marked deprecated in the standard
// library documentation — confirm whether a replacement is wanted here.
303 conn := httputil.NewClientConn(dial, nil)
304 req, err := http.NewRequest("GET", uri, nil)
308 res, err := conn.Do(req)
// The stream is consumed in the background; this function does not wait
// for it.
312 go func(res *http.Response, conn *httputil.ClientConn) {
314 defer res.Body.Close()
315 decoder := json.NewDecoder(res.Body)
318 if err = decoder.Decode(&event); err != nil {
319 if err == io.EOF || err == io.ErrUnexpectedEOF {
// Only signal EOF if the monitor is still enabled and still using the
// channel this goroutine was started with.
320 c.eventMonitor.RLock()
321 if c.eventMonitor.enabled && c.eventMonitor.C == eventChan {
322 // Signal that we're exiting.
323 eventChan <- EOFEvent
325 c.eventMonitor.RUnlock()
// Stop once monitoring is disabled or the monitor has moved to another
// channel.
// NOTE(review): c.eventMonitor.C is read here after isEnabled has returned,
// apparently without the read lock that guards the same comparison in the
// EOF path above — confirm this is safe.
333 if !c.eventMonitor.isEnabled() || c.eventMonitor.C != eventChan {
// Fill in both the old and new event fields before the event is used.
336 transformEvent(&event)
343 // transformEvent takes an event, determines which API version produced it,
344 // and then populates the fields of both the old and the new format.
//
// The event is modified in place through the pointer.
345 func transformEvent(event *APIEvents) {
346 // if event version is <= 1.21 there will be no Action and no Type
347 if event.Action == "" && event.Type == "" {
// Old-format event: derive the new fields from the legacy ones.
348 event.Action = event.Status
349 event.Actor.ID = event.ID
350 event.Actor.Attributes = map[string]string{}
// The legacy Status value decides which Type the event is given.
351 switch event.Status {
352 case "delete", "import", "pull", "push", "tag", "untag":
355 event.Type = "container"
// Carry the legacy From value over as the "image" attribute.
356 if event.From != "" {
357 event.Actor.Attributes["image"] = event.From
// New-format event: back-fill the legacy fields that are still empty.
361 if event.Status == "" {
362 if event.Type == "image" || event.Type == "container" {
363 event.Status = event.Action
365 // Because just the Status has been overloaded with different Types
366 // if an event is not for an image or a container, we prepend the type
367 // to avoid problems for people relying on actions being only for
368 // images and containers
369 event.Status = event.Type + ":" + event.Action
373 event.ID = event.Actor.ID
// Reading a missing "image" key yields the empty string, so From stays
// empty when the actor carries no image attribute.
375 if event.From == "" {
376 event.From = event.Actor.Attributes["image"]