3 * Copyright 2017 gRPC authors.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
19 // Package bufconn provides a net.Conn implemented by a buffer and related
20 // dialing and listening functionality.
31 // Listener implements a net.Listener that creates local, buffered net.Conns
32 // via its Accept and Dial method.
33 type Listener struct {
40 var errClosed = fmt.Errorf("Closed")
42 // Listen returns a Listener that can only be contacted by its own Dialers and
43 // creates buffered connections between the two.
44 func Listen(sz int) *Listener {
45 return &Listener{sz: sz, ch: make(chan net.Conn), done: make(chan struct{})}
48 // Accept blocks until Dial is called, then returns a net.Conn for the server
49 // half of the connection.
50 func (l *Listener) Accept() (net.Conn, error) {
59 // Close stops the listener.
60 func (l *Listener) Close() error {
73 // Addr reports the address of the listener.
74 func (l *Listener) Addr() net.Addr { return addr{} }
76 // Dial creates an in-memory full-duplex network connection, unblocks Accept by
77 // providing it the server half of the connection, and returns the client half
79 func (l *Listener) Dial() (net.Conn, error) {
80 p1, p2 := newPipe(l.sz), newPipe(l.sz)
84 case l.ch <- &conn{p1, p2}:
85 return &conn{p2, p1}, nil
92 // buf contains the data in the pipe. It is a ring buffer of fixed capacity,
93 // with r and w pointing to the offset to read and write, respsectively.
95 // Data is read between [r, w) and written to [w, r), wrapping around the end
96 // of the slice if necessary.
98 // The buffer is empty if r == len(buf), otherwise if r == w, it is full.
100 // w and r are always in the range [0, cap(buf)) and [0, len(buf)].
111 func newPipe(sz int) *pipe {
112 p := &pipe{buf: make([]byte, 0, sz)}
118 func (p *pipe) empty() bool {
119 return p.r == len(p.buf)
122 func (p *pipe) full() bool {
123 return p.r < len(p.buf) && p.r == p.w
126 func (p *pipe) Read(b []byte) (n int, err error) {
129 // Block until p has data.
132 return 0, io.ErrClosedPipe
144 n = copy(b, p.buf[p.r:len(p.buf)])
146 if p.r == cap(p.buf) {
151 // Signal a blocked writer, if any
159 func (p *pipe) Write(b []byte) (n int, err error) {
163 return 0, io.ErrClosedPipe
166 // Block until p is not full.
168 if p.closed || p.writeClosed {
169 return 0, io.ErrClosedPipe
176 wasEmpty := p.empty()
182 x := copy(p.buf[p.w:end], b)
186 if p.w > len(p.buf) {
189 if p.w == cap(p.buf) {
193 // Signal a blocked reader, if any.
201 func (p *pipe) Close() error {
205 // Signal all blocked readers and writers to return an error.
211 func (p *pipe) closeWrite() error {
215 // Signal all blocked readers and writers to return an error.
226 func (c *conn) Close() error {
227 err1 := c.Reader.(*pipe).Close()
228 err2 := c.Writer.(*pipe).closeWrite()
235 func (*conn) LocalAddr() net.Addr { return addr{} }
236 func (*conn) RemoteAddr() net.Addr { return addr{} }
237 func (c *conn) SetDeadline(t time.Time) error { return fmt.Errorf("unsupported") }
238 func (c *conn) SetReadDeadline(t time.Time) error { return fmt.Errorf("unsupported") }
239 func (c *conn) SetWriteDeadline(t time.Time) error { return fmt.Errorf("unsupported") }
243 func (addr) Network() string { return "bufconn" }
244 func (addr) String() string { return "bufconn" }