aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/github.com/fsouza/go-dockerclient/external/github.com/docker/docker/pkg/ioutils/bytespipe.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/fsouza/go-dockerclient/external/github.com/docker/docker/pkg/ioutils/bytespipe.go')
-rw-r--r--vendor/github.com/fsouza/go-dockerclient/external/github.com/docker/docker/pkg/ioutils/bytespipe.go152
1 files changed, 0 insertions, 152 deletions
diff --git a/vendor/github.com/fsouza/go-dockerclient/external/github.com/docker/docker/pkg/ioutils/bytespipe.go b/vendor/github.com/fsouza/go-dockerclient/external/github.com/docker/docker/pkg/ioutils/bytespipe.go
deleted file mode 100644
index e263c28..0000000
--- a/vendor/github.com/fsouza/go-dockerclient/external/github.com/docker/docker/pkg/ioutils/bytespipe.go
+++ /dev/null
@@ -1,152 +0,0 @@
1package ioutils
2
3import (
4 "errors"
5 "io"
6 "sync"
7)
8
9// maxCap is the highest capacity to use in byte slices that buffer data.
10const maxCap = 1e6
11
12// blockThreshold is the minimum number of bytes in the buffer which will cause
13// a write to BytesPipe to block when allocating a new slice.
14const blockThreshold = 1e6
15
16// ErrClosed is returned when Write is called on a closed BytesPipe.
17var ErrClosed = errors.New("write to closed BytesPipe")
18
19// BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue).
20// All written data may be read at most once. Also, BytesPipe allocates
21// and releases new byte slices to adjust to current needs, so the buffer
22// won't be overgrown after peak loads.
23type BytesPipe struct {
24 mu sync.Mutex
25 wait *sync.Cond
26 buf [][]byte // slice of byte-slices of buffered data
27 lastRead int // index in the first slice to a read point
28 bufLen int // length of data buffered over the slices
29 closeErr error // error to return from next Read. set to nil if not closed.
30}
31
32// NewBytesPipe creates new BytesPipe, initialized by specified slice.
33// If buf is nil, then it will be initialized with slice which cap is 64.
34// buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf).
35func NewBytesPipe(buf []byte) *BytesPipe {
36 if cap(buf) == 0 {
37 buf = make([]byte, 0, 64)
38 }
39 bp := &BytesPipe{
40 buf: [][]byte{buf[:0]},
41 }
42 bp.wait = sync.NewCond(&bp.mu)
43 return bp
44}
45
46// Write writes p to BytesPipe.
47// It can allocate new []byte slices in a process of writing.
48func (bp *BytesPipe) Write(p []byte) (int, error) {
49 bp.mu.Lock()
50 defer bp.mu.Unlock()
51 written := 0
52 for {
53 if bp.closeErr != nil {
54 return written, ErrClosed
55 }
56 // write data to the last buffer
57 b := bp.buf[len(bp.buf)-1]
58 // copy data to the current empty allocated area
59 n := copy(b[len(b):cap(b)], p)
60 // increment buffered data length
61 bp.bufLen += n
62 // include written data in last buffer
63 bp.buf[len(bp.buf)-1] = b[:len(b)+n]
64
65 written += n
66
67 // if there was enough room to write all then break
68 if len(p) == n {
69 break
70 }
71
72 // more data: write to the next slice
73 p = p[n:]
74
75 // block if too much data is still in the buffer
76 for bp.bufLen >= blockThreshold {
77 bp.wait.Wait()
78 }
79
80 // allocate slice that has twice the size of the last unless maximum reached
81 nextCap := 2 * cap(bp.buf[len(bp.buf)-1])
82 if nextCap > maxCap {
83 nextCap = maxCap
84 }
85 // add new byte slice to the buffers slice and continue writing
86 bp.buf = append(bp.buf, make([]byte, 0, nextCap))
87 }
88 bp.wait.Broadcast()
89 return written, nil
90}
91
92// CloseWithError causes further reads from a BytesPipe to return immediately.
93func (bp *BytesPipe) CloseWithError(err error) error {
94 bp.mu.Lock()
95 if err != nil {
96 bp.closeErr = err
97 } else {
98 bp.closeErr = io.EOF
99 }
100 bp.wait.Broadcast()
101 bp.mu.Unlock()
102 return nil
103}
104
105// Close causes further reads from a BytesPipe to return immediately.
106func (bp *BytesPipe) Close() error {
107 return bp.CloseWithError(nil)
108}
109
110func (bp *BytesPipe) len() int {
111 return bp.bufLen - bp.lastRead
112}
113
114// Read reads bytes from BytesPipe.
115// Data could be read only once.
116func (bp *BytesPipe) Read(p []byte) (n int, err error) {
117 bp.mu.Lock()
118 defer bp.mu.Unlock()
119 if bp.len() == 0 {
120 if bp.closeErr != nil {
121 return 0, bp.closeErr
122 }
123 bp.wait.Wait()
124 if bp.len() == 0 && bp.closeErr != nil {
125 return 0, bp.closeErr
126 }
127 }
128 for {
129 read := copy(p, bp.buf[0][bp.lastRead:])
130 n += read
131 bp.lastRead += read
132 if bp.len() == 0 {
133 // we have read everything. reset to the beginning.
134 bp.lastRead = 0
135 bp.bufLen -= len(bp.buf[0])
136 bp.buf[0] = bp.buf[0][:0]
137 break
138 }
139 // break if everything was read
140 if len(p) == read {
141 break
142 }
143 // more buffered data and more asked. read from next slice.
144 p = p[read:]
145 bp.lastRead = 0
146 bp.bufLen -= len(bp.buf[0])
147 bp.buf[0] = nil // throw away old slice
148 bp.buf = bp.buf[1:] // switch to next
149 }
150 bp.wait.Broadcast()
151 return
152}