aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/github.com/fsouza/go-dockerclient/external/github.com/docker/docker/pkg/ioutils/bytespipe.go
blob: e263c284f09f2bfe2e180519e9554c8a4c189a5d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package ioutils

import (
	"errors"
	"io"
	"sync"
)

// maxCap is the highest capacity to use in byte slices that buffer data.
const maxCap = 1e6

// blockThreshold is the minimum number of bytes in the buffer which will cause
// a write to BytesPipe to block when allocating a new slice.
const blockThreshold = 1e6

// ErrClosed is returned when Write is called on a closed BytesPipe.
var ErrClosed = errors.New("write to closed BytesPipe")

// BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue).
// All written data may be read at most once. Also, BytesPipe allocates
// and releases new byte slices to adjust to current needs, so the buffer
// won't be overgrown after peak loads.
type BytesPipe struct {
	mu       sync.Mutex
	wait     *sync.Cond
	buf      [][]byte // slice of byte-slices of buffered data
	lastRead int      // index in the first slice to a read point
	bufLen   int      // length of data buffered over the slices
	closeErr error    // error to return from next Read. set to nil if not closed.
}

// NewBytesPipe creates new BytesPipe, initialized by specified slice.
// If buf is nil, then it will be initialized with slice which cap is 64.
// buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf).
func NewBytesPipe(buf []byte) *BytesPipe {
	if cap(buf) == 0 {
		buf = make([]byte, 0, 64)
	}
	bp := &BytesPipe{
		buf: [][]byte{buf[:0]},
	}
	bp.wait = sync.NewCond(&bp.mu)
	return bp
}

// Write writes p to BytesPipe.
// It can allocate new []byte slices in a process of writing.
func (bp *BytesPipe) Write(p []byte) (int, error) {
	bp.mu.Lock()
	defer bp.mu.Unlock()
	written := 0
	for {
		if bp.closeErr != nil {
			return written, ErrClosed
		}
		// write data to the last buffer
		b := bp.buf[len(bp.buf)-1]
		// copy data to the current empty allocated area
		n := copy(b[len(b):cap(b)], p)
		// increment buffered data length
		bp.bufLen += n
		// include written data in last buffer
		bp.buf[len(bp.buf)-1] = b[:len(b)+n]

		written += n

		// if there was enough room to write all then break
		if len(p) == n {
			break
		}

		// more data: write to the next slice
		p = p[n:]

		// block if too much data is still in the buffer
		for bp.bufLen >= blockThreshold {
			bp.wait.Wait()
		}

		// allocate slice that has twice the size of the last unless maximum reached
		nextCap := 2 * cap(bp.buf[len(bp.buf)-1])
		if nextCap > maxCap {
			nextCap = maxCap
		}
		// add new byte slice to the buffers slice and continue writing
		bp.buf = append(bp.buf, make([]byte, 0, nextCap))
	}
	bp.wait.Broadcast()
	return written, nil
}

// CloseWithError causes further reads from a BytesPipe to return immediately.
func (bp *BytesPipe) CloseWithError(err error) error {
	bp.mu.Lock()
	if err != nil {
		bp.closeErr = err
	} else {
		bp.closeErr = io.EOF
	}
	bp.wait.Broadcast()
	bp.mu.Unlock()
	return nil
}

// Close causes further reads from a BytesPipe to return immediately.
func (bp *BytesPipe) Close() error {
	return bp.CloseWithError(nil)
}

func (bp *BytesPipe) len() int {
	return bp.bufLen - bp.lastRead
}

// Read reads bytes from BytesPipe.
// Data could be read only once.
func (bp *BytesPipe) Read(p []byte) (n int, err error) {
	bp.mu.Lock()
	defer bp.mu.Unlock()
	if bp.len() == 0 {
		if bp.closeErr != nil {
			return 0, bp.closeErr
		}
		bp.wait.Wait()
		if bp.len() == 0 && bp.closeErr != nil {
			return 0, bp.closeErr
		}
	}
	for {
		read := copy(p, bp.buf[0][bp.lastRead:])
		n += read
		bp.lastRead += read
		if bp.len() == 0 {
			// we have read everything. reset to the beginning.
			bp.lastRead = 0
			bp.bufLen -= len(bp.buf[0])
			bp.buf[0] = bp.buf[0][:0]
			break
		}
		// break if everything was read
		if len(p) == read {
			break
		}
		// more buffered data and more asked. read from next slice.
		p = p[read:]
		bp.lastRead = 0
		bp.bufLen -= len(bp.buf[0])
		bp.buf[0] = nil     // throw away old slice
		bp.buf = bp.buf[1:] // switch to next
	}
	bp.wait.Broadcast()
	return
}