aboutsummaryrefslogblamecommitdiffhomepage
path: root/vendor/github.com/fsouza/go-dockerclient/event.go
blob: 120cdc9bff2b8afbe4cd55e3078b6d48beffce11 (plain) (tree)


























































































































































































































































































































































































                                                                                                      
// Copyright 2015 go-dockerclient authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package docker

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"net/http"
	"net/http/httputil"
	"sync"
	"sync/atomic"
	"time"
)

// APIEvents represents events coming from the Docker API.
//
// The fields in the Docker API changed in API version 1.22, and
// events for more than images and containers are now fired off.
// To maintain forward and backward compatibility, go-dockerclient
// replicates the event in both the new and old format as faithfully as possible.
//
// For events that only exist in 1.22 or later, `Status` is filled in as
// `"Type:Action"` instead of just `Action` to allow for older clients to
// differentiate and not break if they rely on the pre-1.22 Status types.
//
// The transformEvent function can be consulted for more information about how
// events are translated between the new and old API formats.
type APIEvents struct {
	// New API fields in 1.22. When the daemon sends an old-format event
	// (Action and Type both empty), transformEvent derives these from
	// Status, ID and From.
	Action string   `json:"action,omitempty"`
	Type   string   `json:"type,omitempty"`
	Actor  APIActor `json:"actor,omitempty"`

	// Old API fields for < 1.22. When the daemon sends a new-format event,
	// transformEvent fills in whichever of these are empty from Action, Type
	// and Actor.
	Status string `json:"status,omitempty"`
	ID     string `json:"id,omitempty"`
	From   string `json:"from,omitempty"`

	// Fields in both. Events decoded with Time == 0 are skipped by the stream
	// reader, and the largest Time seen is sent back as the `since` query
	// parameter when the monitor reconnects.
	Time     int64 `json:"time,omitempty"`
	TimeNano int64 `json:"timeNano,omitempty"`
}

// APIActor represents an actor that accomplishes something for an event.
//
// ID identifies the actor and Attributes carries key/value details about it.
// transformEvent reads and writes the "image" attribute when converting
// between the old and new event formats.
type APIActor struct {
	ID         string            `json:"id,omitempty"`
	Attributes map[string]string `json:"attributes,omitempty"`
}

// eventMonitoringState holds the state shared between the event stream
// reader, the dispatch loop (monitorEvents) and the registered listeners.
//
// The embedded RWMutex guards enabled, C, errC and listeners. The embedded
// WaitGroup is incremented once per registered listener (addListener) and
// once per in-flight sendEvent call. lastSeen is read and written with
// sync/atomic and records the largest event Time observed so far.
//
// C carries decoded events (and the EOFEvent sentinel) from the stream
// reader to monitorEvents; errC carries errors to monitorEvents. Both are
// created by enableEventMonitoring and closed by disableEventMonitoring.
type eventMonitoringState struct {
	sync.RWMutex
	sync.WaitGroup
	enabled   bool
	lastSeen  int64
	C         chan *APIEvents
	errC      chan error
	listeners []chan<- *APIEvents
}

// Retry policy used by connectWithRetry.
//
// maxMonitorConnRetries is the number of additional connection attempts made
// after the first one fails. retryInitialWaitTime is the wait before the first
// retry, in milliseconds; it is an untyped floating-point constant because it
// is multiplied by math.Pow(2, retries) to double the wait on every retry.
const (
	maxMonitorConnRetries = 5
	retryInitialWaitTime  = 10.
)

var (
	// ErrNoListeners is the error returned when no listeners are available
	// to receive an event.
	ErrNoListeners = errors.New("no listeners present to receive event")

	// ErrListenerAlreadyExists is the error returned when the listener already
	// exists.
	ErrListenerAlreadyExists = errors.New("listener already exists for docker events")

	// EOFEvent is sent when the event listener receives an EOF error.
	//
	// The stream reader pushes this exact pointer onto the monitor's event
	// channel; monitorEvents recognises it by pointer identity and disables
	// event monitoring instead of forwarding it to listeners.
	EOFEvent = &APIEvents{
		Type:   "EOF",
		Status: "EOF",
	}
)

// AddEventListener registers listener to receive events from the Docker API.
//
// Event monitoring is switched on first when it is not already running. The
// parameter is a send-only channel through which events will be delivered.
// ErrListenerAlreadyExists is returned when the same channel has already been
// registered.
func (c *Client) AddEventListener(listener chan<- *APIEvents) error {
	if !c.eventMonitor.isEnabled() {
		if err := c.eventMonitor.enableEventMonitoring(c); err != nil {
			return err
		}
	}
	return c.eventMonitor.addListener(listener)
}

// RemoveEventListener removes a listener from the monitor.
//
// When no listeners remain afterwards, event monitoring is disabled, which
// also closes the monitor's internal channels. Removing a channel that was
// never registered is not an error.
func (c *Client) RemoveEventListener(listener chan *APIEvents) error {
	if err := c.eventMonitor.removeListener(listener); err != nil {
		return err
	}
	// Inspect the listener list through noListeners, which takes the read
	// lock. Reading the slice directly here would race with concurrent
	// addListener/removeListener/closeListeners calls that mutate it.
	if c.eventMonitor.noListeners() {
		return c.eventMonitor.disableEventMonitoring()
	}
	return nil
}

// addListener appends listener to the set of registered channels while
// holding the write lock. It returns ErrListenerAlreadyExists when the channel
// is already registered; otherwise the WaitGroup counter is incremented to
// account for the new listener and nil is returned.
func (eventState *eventMonitoringState) addListener(listener chan<- *APIEvents) error {
	eventState.Lock()
	defer eventState.Unlock()
	for _, registered := range eventState.listeners {
		if registered == listener {
			return ErrListenerAlreadyExists
		}
	}
	eventState.listeners = append(eventState.listeners, listener)
	eventState.Add(1)
	return nil
}

// removeListener drops listener from the set of registered channels while
// holding the write lock and decrements the WaitGroup counter for it. An
// unknown listener is ignored. The returned error is always nil.
func (eventState *eventMonitoringState) removeListener(listener chan<- *APIEvents) error {
	eventState.Lock()
	defer eventState.Unlock()
	if !listenerExists(listener, &eventState.listeners) {
		return nil
	}
	// Build the filtered list from scratch so it stays nil when the last
	// listener is removed.
	var remaining []chan<- *APIEvents
	for _, candidate := range eventState.listeners {
		if candidate == listener {
			continue
		}
		remaining = append(remaining, candidate)
	}
	eventState.listeners = remaining
	eventState.Done()
	return nil
}

// closeListeners closes every registered listener channel, releases the
// WaitGroup count held for each of them and clears the listener list.
//
// It does no locking of its own; its caller in this file,
// disableEventMonitoring, holds the write lock while calling it.
func (eventState *eventMonitoringState) closeListeners() {
	for i := range eventState.listeners {
		close(eventState.listeners[i])
		eventState.Done()
	}
	eventState.listeners = nil
}

// listenerExists reports whether channel a is present in the slice that list
// points to. Channels are compared by identity.
func listenerExists(a chan<- *APIEvents, list *[]chan<- *APIEvents) bool {
	registered := *list
	for i := 0; i < len(registered); i++ {
		if registered[i] == a {
			return true
		}
	}
	return false
}

// enableEventMonitoring switches monitoring on if it is currently off: it
// resets lastSeen, allocates fresh event and error channels and starts the
// monitorEvents goroutine for client c. It is a no-op when monitoring is
// already enabled. The returned error is always nil.
func (eventState *eventMonitoringState) enableEventMonitoring(c *Client) error {
	eventState.Lock()
	defer eventState.Unlock()
	if eventState.enabled {
		return nil
	}
	eventState.C = make(chan *APIEvents, 100)
	eventState.errC = make(chan error, 1)
	atomic.StoreInt64(&eventState.lastSeen, 0)
	eventState.enabled = true
	go eventState.monitorEvents(c)
	return nil
}

// disableEventMonitoring shuts monitoring down: under the write lock it
// closes and forgets every listener channel, waits for the WaitGroup to
// drain and, if monitoring was enabled, marks it disabled and closes the
// event and error channels. The returned error is always nil.
func (eventState *eventMonitoringState) disableEventMonitoring() error {
	eventState.Lock()
	defer eventState.Unlock()

	// Closes each listener channel and releases its WaitGroup count.
	eventState.closeListeners()

	// NOTE(review): every Add/Done visible in this file happens while holding
	// this mutex (read or write), so the counter looks like it is already
	// zero at this point — confirm before relying on Wait as a barrier.
	eventState.Wait()

	// Guard on enabled so C and errC are closed at most once per enable.
	if eventState.enabled {
		eventState.enabled = false
		close(eventState.C)
		close(eventState.errC)
	}
	return nil
}

// monitorEvents is the dispatch loop run in its own goroutine by
// enableEventMonitoring. It waits for a first listener, connects to the
// daemon's event stream via connectWithRetry, and then forwards events from
// eventState.C to the listeners until monitoring is disabled, the stream
// ends, or an error is reported on eventState.errC.
func (eventState *eventMonitoringState) monitorEvents(c *Client) {
	var err error
	// Poll until at least one listener has been registered.
	// NOTE(review): this loop does not check isEnabled, so it keeps polling
	// if monitoring is disabled before any listener shows up — confirm
	// whether that is intended.
	for eventState.noListeners() {
		time.Sleep(10 * time.Millisecond)
	}
	if err = eventState.connectWithRetry(c); err != nil {
		// terminate if connect failed
		eventState.disableEventMonitoring()
		return
	}
	for eventState.isEnabled() {
		// The timeout bounds each select so the loop condition is re-checked
		// even when no events or errors arrive.
		timeout := time.After(100 * time.Millisecond)
		select {
		case ev, ok := <-eventState.C:
			if !ok {
				// The channel was closed by disableEventMonitoring.
				return
			}
			if ev == EOFEvent {
				// The stream reader hit EOF: shut everything down.
				eventState.disableEventMonitoring()
				return
			}
			eventState.updateLastSeen(ev)
			// Deliver in a separate goroutine so a slow listener does not
			// stall this loop.
			go eventState.sendEvent(ev)
		case err = <-eventState.errC:
			if err == ErrNoListeners {
				eventState.disableEventMonitoring()
				return
			} else if err != nil {
				// Any other error: hand over to a fresh monitor goroutine,
				// which reconnects, and let this one exit.
				defer func() { go eventState.monitorEvents(c) }()
				return
			}
			// A nil error (for example from a closed errC) falls through
			// and the loop condition is evaluated again.
		case <-timeout:
			continue
		}
	}
}

// connectWithRetry opens the event stream through c.eventHijack, retrying up
// to maxMonitorConnRetries more times when an attempt fails. The wait before
// retry number k (counting from zero) is retryInitialWaitTime * 2^k
// milliseconds. The channels and lastSeen are re-read before every attempt.
// It returns nil on success or the error of the final attempt.
func (eventState *eventMonitoringState) connectWithRetry(c *Client) error {
	for attempt := 0; ; attempt++ {
		eventState.RLock()
		events, errs := eventState.C, eventState.errC
		eventState.RUnlock()

		since := atomic.LoadInt64(&eventState.lastSeen)
		err := c.eventHijack(since, events, errs)
		if err == nil || attempt >= maxMonitorConnRetries {
			return err
		}

		backoff := int64(retryInitialWaitTime * math.Pow(2, float64(attempt)))
		time.Sleep(time.Duration(backoff) * time.Millisecond)
	}
}

// noListeners reports, under the read lock, whether the listener list is
// currently empty.
func (eventState *eventMonitoringState) noListeners() bool {
	eventState.RLock()
	empty := len(eventState.listeners) == 0
	eventState.RUnlock()
	return empty
}

// isEnabled reports, under the read lock, whether event monitoring is
// currently switched on.
func (eventState *eventMonitoringState) isEnabled() bool {
	eventState.RLock()
	on := eventState.enabled
	eventState.RUnlock()
	return on
}

// sendEvent delivers event to every registered listener, in registration
// order, while holding the read lock and a WaitGroup count. Nothing is sent
// when monitoring is disabled. When monitoring is enabled but no listener is
// registered, ErrNoListeners is pushed onto the error channel instead.
func (eventState *eventMonitoringState) sendEvent(event *APIEvents) {
	eventState.RLock()
	defer eventState.RUnlock()
	eventState.Add(1)
	defer eventState.Done()

	if !eventState.enabled {
		return
	}
	targets := eventState.listeners
	if len(targets) == 0 {
		eventState.errC <- ErrNoListeners
		return
	}
	for _, out := range targets {
		out <- event
	}
}

// updateLastSeen raises lastSeen to e.Time when e is newer than anything
// recorded so far; older or equal timestamps leave it untouched. The update
// is made under the write lock using atomic loads and stores.
func (eventState *eventMonitoringState) updateLastSeen(e *APIEvents) {
	eventState.Lock()
	defer eventState.Unlock()
	if e.Time > atomic.LoadInt64(&eventState.lastSeen) {
		atomic.StoreInt64(&eventState.lastSeen, e.Time)
	}
}

// eventHijack opens a dedicated connection to the Docker endpoint, issues
// GET /events on it and starts a goroutine that decodes the JSON event stream.
//
// startTime, when non-zero, is sent as the `since` query parameter. Decoded
// events are normalised with transformEvent and sent on eventChan. On EOF the
// reader sends EOFEvent on eventChan (if monitoring is still enabled on that
// channel) and stops. Any other decode error is sent on errChan and the
// reader stops, because monitorEvents reacts to such an error by starting a
// replacement monitor with its own connection.
//
// The returned error covers only connection set-up (dial, request creation,
// sending the request); the connection is closed on every such failure.
func (c *Client) eventHijack(startTime int64, eventChan chan *APIEvents, errChan chan error) error {
	uri := "/events"
	if startTime != 0 {
		uri += fmt.Sprintf("?since=%d", startTime)
	}
	// Unix sockets are addressed by path; everything else is dialed as TCP
	// using the host part of the endpoint URL.
	protocol := c.endpointURL.Scheme
	address := c.endpointURL.Path
	if protocol != "unix" {
		protocol = "tcp"
		address = c.endpointURL.Host
	}
	var dial net.Conn
	var err error
	if c.TLSConfig == nil {
		dial, err = c.Dialer.Dial(protocol, address)
	} else {
		dial, err = tlsDialWithDialer(c.Dialer, protocol, address, c.TLSConfig)
	}
	if err != nil {
		return err
	}
	conn := httputil.NewClientConn(dial, nil)
	req, err := http.NewRequest("GET", uri, nil)
	if err != nil {
		// Best-effort close: the set-up error is the one worth reporting.
		conn.Close()
		return err
	}
	res, err := conn.Do(req)
	if err != nil {
		// Closing the ClientConn also closes the dialed connection, so
		// nothing is leaked when the request cannot be completed.
		conn.Close()
		return err
	}
	go func(res *http.Response, conn *httputil.ClientConn) {
		defer conn.Close()
		defer res.Body.Close()
		decoder := json.NewDecoder(res.Body)
		for {
			var event APIEvents
			// Use a goroutine-local error rather than writing to the
			// enclosing function's variable after it has returned.
			if err := decoder.Decode(&event); err != nil {
				if err == io.EOF || err == io.ErrUnexpectedEOF {
					c.eventMonitor.RLock()
					if c.eventMonitor.enabled && c.eventMonitor.C == eventChan {
						// Signal that we're exiting.
						eventChan <- EOFEvent
					}
					c.eventMonitor.RUnlock()
					return
				}
				// Report the failure once and stop reading. The monitor
				// reconnects on error, so continuing here would leave a
				// second reader on the same channel and, for persistent
				// decoder errors, resend the same error indefinitely.
				errChan <- err
				return
			}
			// Skip messages that carry no timestamp.
			if event.Time == 0 {
				continue
			}
			// Stop when monitoring was disabled or restarted with a
			// different event channel.
			if !c.eventMonitor.isEnabled() || c.eventMonitor.C != eventChan {
				return
			}
			transformEvent(&event)
			eventChan <- &event
		}
	}(res, conn)
	return nil
}

// transformEvent works out which API format an event arrived in and fills in
// the fields of the other format, so that both representations are populated.
//
// An event with neither Action nor Type is treated as the old (<= 1.21)
// format: Action and Actor are derived from Status, ID and From, and Type is
// "image" for the image statuses listed below and "container" otherwise.
// Any other event is treated as the new format: empty Status, ID and From are
// filled in from Action, Type and Actor.
func transformEvent(event *APIEvents) {
	legacy := event.Action == "" && event.Type == ""
	if legacy {
		event.Action = event.Status
		event.Actor.ID = event.ID
		event.Actor.Attributes = map[string]string{}
		switch event.Status {
		case "delete", "import", "pull", "push", "tag", "untag":
			event.Type = "image"
			return
		}
		event.Type = "container"
		if event.From != "" {
			event.Actor.Attributes["image"] = event.From
		}
		return
	}

	if event.Status == "" {
		switch event.Type {
		case "image", "container":
			event.Status = event.Action
		default:
			// Status used to be only about images and containers, so for
			// every other type the type is prepended to keep clients that
			// match on the old status values from misreading the event.
			event.Status = event.Type + ":" + event.Action
		}
	}
	if event.ID == "" {
		event.ID = event.Actor.ID
	}
	if event.From == "" {
		event.From = event.Actor.Attributes["image"]
	}
}