9 // maxCap is the highest capacity to use in byte slices that buffer data.
12 // blockThreshold is the minimum number of buffered bytes at which a write to
13 // BytesPipe blocks instead of immediately allocating a new slice.
// Write loops while bp.bufLen >= blockThreshold before it appends another slice.
14 const blockThreshold = 1e6
16 // ErrClosed is the error Write returns when it is called on a closed BytesPipe,
// that is, when the pipe's closeErr field is non-nil.
17 var ErrClosed = errors.New("write to closed BytesPipe")
19 // BytesPipe is an io.ReadWriteCloser that works like a pipe (queue):
20 // all written data may be read at most once. BytesPipe also allocates
21 // and releases byte slices to adjust to current needs, so the buffer
22 // does not stay overgrown after peak loads.
23 type BytesPipe struct {
26 buf [][]byte // buffered data as a list of byte slices; Read drains buf[0], Write fills the last slice
27 lastRead int // read offset into the first slice (buf[0])
28 bufLen int // length of data buffered over the slices; unread amount is bufLen - lastRead
29 closeErr error // error to return from the next Read; nil while the pipe is not closed
32 // NewBytesPipe creates a new BytesPipe whose first buffer is the given slice.
33 // If buf is nil, a fresh slice with capacity 64 is used instead.
34 // buf is re-sliced to length 0, so its capacity is kept and writing starts at its beginning.
35 func NewBytesPipe(buf []byte) *BytesPipe {
37 buf = make([]byte, 0, 64)
40 buf: [][]byte{buf[:0]},
// the condition variable is built on the pipe's own mutex
42 bp.wait = sync.NewCond(&bp.mu)
46 // Write writes p to BytesPipe, filling the unused capacity of the last buffer
47 // and appending new []byte slices when more room is needed.
// It returns ErrClosed if the pipe has been closed (closeErr is non-nil).
48 func (bp *BytesPipe) Write(p []byte) (int, error) {
53 if bp.closeErr != nil {
54 return written, ErrClosed
56 // write data to the last buffer
57 b := bp.buf[len(bp.buf)-1]
58 // copy as much of p as fits into the unused capacity of b
59 n := copy(b[len(b):cap(b)], p)
60 // increment buffered data length
62 // extend the last buffer's length to cover the bytes just copied
63 bp.buf[len(bp.buf)-1] = b[:len(b)+n]
67 // if there was enough room to write all then break
72 // more data: write to the next slice
75 // block if too much data is still in the buffer
76 for bp.bufLen >= blockThreshold {
80 // allocate slice that has twice the size of the last unless maximum reached
81 nextCap := 2 * cap(bp.buf[len(bp.buf)-1])
85 // add the new, empty byte slice to the list of buffers and continue writing
86 bp.buf = append(bp.buf, make([]byte, 0, nextCap))
92 // CloseWithError closes the BytesPipe so that further reads return immediately.
// NOTE(review): presumably err is what subsequent Reads report (Read returns
// bp.closeErr when it is non-nil) — confirm, including how a nil err is handled.
93 func (bp *BytesPipe) CloseWithError(err error) error {
105 // Close causes further reads from a BytesPipe to return immediately.
// It calls CloseWithError with a nil error and returns that call's result.
106 func (bp *BytesPipe) Close() error {
107 return bp.CloseWithError(nil)
// len returns the number of buffered bytes that have not been read yet:
// the buffered length bufLen minus the read offset lastRead.
110 func (bp *BytesPipe) len() int {
111 return bp.bufLen - bp.lastRead
114 // Read reads buffered bytes from BytesPipe into p.
115 // Data can be read only once; slices that have been consumed are dropped as reading advances.
// When the pipe has been closed, Read can return 0 together with closeErr.
116 func (bp *BytesPipe) Read(p []byte) (n int, err error) {
120 if bp.closeErr != nil {
121 return 0, bp.closeErr
124 if bp.len() == 0 && bp.closeErr != nil {
125 return 0, bp.closeErr
// copy from the unread tail of the first buffer
129 read := copy(p, bp.buf[0][bp.lastRead:])
133 // we have read everything. reset to the beginning.
135 bp.bufLen -= len(bp.buf[0])
136 bp.buf[0] = bp.buf[0][:0] // keep the slice and its capacity for later writes
139 // break if everything was read
143 // more buffered data and more asked. read from next slice.
146 bp.bufLen -= len(bp.buf[0])
147 bp.buf[0] = nil // throw away old slice
148 bp.buf = bp.buf[1:] // switch to next