]>
Commit | Line | Data |
---|---|---|
9b12e4fe JC |
1 | package ioutils |
2 | ||
3 | import ( | |
4 | "errors" | |
5 | "io" | |
6 | "sync" | |
7 | ) | |
8 | ||
9 | // maxCap is the highest capacity to use in byte slices that buffer data. | |
10 | const maxCap = 1e6 | |
11 | ||
12 | // blockThreshold is the minimum number of bytes in the buffer which will cause | |
13 | // a write to BytesPipe to block when allocating a new slice. | |
14 | const blockThreshold = 1e6 | |
15 | ||
16 | // ErrClosed is returned when Write is called on a closed BytesPipe. | |
17 | var ErrClosed = errors.New("write to closed BytesPipe") | |
18 | ||
19 | // BytesPipe is io.ReadWriteCloser which works similarly to pipe(queue). | |
20 | // All written data may be read at most once. Also, BytesPipe allocates | |
21 | // and releases new byte slices to adjust to current needs, so the buffer | |
22 | // won't be overgrown after peak loads. | |
23 | type BytesPipe struct { | |
24 | mu sync.Mutex | |
25 | wait *sync.Cond | |
26 | buf [][]byte // slice of byte-slices of buffered data | |
27 | lastRead int // index in the first slice to a read point | |
28 | bufLen int // length of data buffered over the slices | |
29 | closeErr error // error to return from next Read. set to nil if not closed. | |
30 | } | |
31 | ||
32 | // NewBytesPipe creates new BytesPipe, initialized by specified slice. | |
33 | // If buf is nil, then it will be initialized with slice which cap is 64. | |
34 | // buf will be adjusted in a way that len(buf) == 0, cap(buf) == cap(buf). | |
35 | func NewBytesPipe(buf []byte) *BytesPipe { | |
36 | if cap(buf) == 0 { | |
37 | buf = make([]byte, 0, 64) | |
38 | } | |
39 | bp := &BytesPipe{ | |
40 | buf: [][]byte{buf[:0]}, | |
41 | } | |
42 | bp.wait = sync.NewCond(&bp.mu) | |
43 | return bp | |
44 | } | |
45 | ||
46 | // Write writes p to BytesPipe. | |
47 | // It can allocate new []byte slices in a process of writing. | |
48 | func (bp *BytesPipe) Write(p []byte) (int, error) { | |
49 | bp.mu.Lock() | |
50 | defer bp.mu.Unlock() | |
51 | written := 0 | |
52 | for { | |
53 | if bp.closeErr != nil { | |
54 | return written, ErrClosed | |
55 | } | |
56 | // write data to the last buffer | |
57 | b := bp.buf[len(bp.buf)-1] | |
58 | // copy data to the current empty allocated area | |
59 | n := copy(b[len(b):cap(b)], p) | |
60 | // increment buffered data length | |
61 | bp.bufLen += n | |
62 | // include written data in last buffer | |
63 | bp.buf[len(bp.buf)-1] = b[:len(b)+n] | |
64 | ||
65 | written += n | |
66 | ||
67 | // if there was enough room to write all then break | |
68 | if len(p) == n { | |
69 | break | |
70 | } | |
71 | ||
72 | // more data: write to the next slice | |
73 | p = p[n:] | |
74 | ||
75 | // block if too much data is still in the buffer | |
76 | for bp.bufLen >= blockThreshold { | |
77 | bp.wait.Wait() | |
78 | } | |
79 | ||
80 | // allocate slice that has twice the size of the last unless maximum reached | |
81 | nextCap := 2 * cap(bp.buf[len(bp.buf)-1]) | |
82 | if nextCap > maxCap { | |
83 | nextCap = maxCap | |
84 | } | |
85 | // add new byte slice to the buffers slice and continue writing | |
86 | bp.buf = append(bp.buf, make([]byte, 0, nextCap)) | |
87 | } | |
88 | bp.wait.Broadcast() | |
89 | return written, nil | |
90 | } | |
91 | ||
92 | // CloseWithError causes further reads from a BytesPipe to return immediately. | |
93 | func (bp *BytesPipe) CloseWithError(err error) error { | |
94 | bp.mu.Lock() | |
95 | if err != nil { | |
96 | bp.closeErr = err | |
97 | } else { | |
98 | bp.closeErr = io.EOF | |
99 | } | |
100 | bp.wait.Broadcast() | |
101 | bp.mu.Unlock() | |
102 | return nil | |
103 | } | |
104 | ||
105 | // Close causes further reads from a BytesPipe to return immediately. | |
106 | func (bp *BytesPipe) Close() error { | |
107 | return bp.CloseWithError(nil) | |
108 | } | |
109 | ||
110 | func (bp *BytesPipe) len() int { | |
111 | return bp.bufLen - bp.lastRead | |
112 | } | |
113 | ||
114 | // Read reads bytes from BytesPipe. | |
115 | // Data could be read only once. | |
116 | func (bp *BytesPipe) Read(p []byte) (n int, err error) { | |
117 | bp.mu.Lock() | |
118 | defer bp.mu.Unlock() | |
119 | if bp.len() == 0 { | |
120 | if bp.closeErr != nil { | |
121 | return 0, bp.closeErr | |
122 | } | |
123 | bp.wait.Wait() | |
124 | if bp.len() == 0 && bp.closeErr != nil { | |
125 | return 0, bp.closeErr | |
126 | } | |
127 | } | |
128 | for { | |
129 | read := copy(p, bp.buf[0][bp.lastRead:]) | |
130 | n += read | |
131 | bp.lastRead += read | |
132 | if bp.len() == 0 { | |
133 | // we have read everything. reset to the beginning. | |
134 | bp.lastRead = 0 | |
135 | bp.bufLen -= len(bp.buf[0]) | |
136 | bp.buf[0] = bp.buf[0][:0] | |
137 | break | |
138 | } | |
139 | // break if everything was read | |
140 | if len(p) == read { | |
141 | break | |
142 | } | |
143 | // more buffered data and more asked. read from next slice. | |
144 | p = p[read:] | |
145 | bp.lastRead = 0 | |
146 | bp.bufLen -= len(bp.buf[0]) | |
147 | bp.buf[0] = nil // throw away old slice | |
148 | bp.buf = bp.buf[1:] // switch to next | |
149 | } | |
150 | bp.wait.Broadcast() | |
151 | return | |
152 | } |