]> git.immae.eu Git - github/fretlink/terraform-provider-statuscake.git/blob - vendor/google.golang.org/grpc/test/bufconn/bufconn.go
Upgrade to 0.12
[github/fretlink/terraform-provider-statuscake.git] / vendor / google.golang.org / grpc / test / bufconn / bufconn.go
1 /*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 // Package bufconn provides a net.Conn implemented by a buffer and related
20 // dialing and listening functionality.
21 package bufconn
22
23 import (
24 "fmt"
25 "io"
26 "net"
27 "sync"
28 "time"
29 )
30
// Listener is a net.Listener whose connections are local, buffered net.Conns
// produced by pairing calls to its Accept and Dial methods.
type Listener struct {
	mu   sync.Mutex    // held by Close while it checks and closes done
	sz   int           // buffer size passed to newPipe for each direction of a connection
	ch   chan net.Conn // carries the server half of a connection from Dial to Accept
	done chan struct{} // closed by Close; unblocks pending Accept and Dial calls
}

// errClosed is the error Accept and Dial return once the Listener is closed.
var errClosed = fmt.Errorf("Closed")
41
42 // Listen returns a Listener that can only be contacted by its own Dialers and
43 // creates buffered connections between the two.
44 func Listen(sz int) *Listener {
45 return &Listener{sz: sz, ch: make(chan net.Conn), done: make(chan struct{})}
46 }
47
48 // Accept blocks until Dial is called, then returns a net.Conn for the server
49 // half of the connection.
50 func (l *Listener) Accept() (net.Conn, error) {
51 select {
52 case <-l.done:
53 return nil, errClosed
54 case c := <-l.ch:
55 return c, nil
56 }
57 }
58
59 // Close stops the listener.
60 func (l *Listener) Close() error {
61 l.mu.Lock()
62 defer l.mu.Unlock()
63 select {
64 case <-l.done:
65 // Already closed.
66 break
67 default:
68 close(l.done)
69 }
70 return nil
71 }
72
73 // Addr reports the address of the listener.
74 func (l *Listener) Addr() net.Addr { return addr{} }
75
76 // Dial creates an in-memory full-duplex network connection, unblocks Accept by
77 // providing it the server half of the connection, and returns the client half
78 // of the connection.
79 func (l *Listener) Dial() (net.Conn, error) {
80 p1, p2 := newPipe(l.sz), newPipe(l.sz)
81 select {
82 case <-l.done:
83 return nil, errClosed
84 case l.ch <- &conn{p1, p2}:
85 return &conn{p2, p1}, nil
86 }
87 }
88
// pipe is a fixed-capacity, in-memory byte queue that carries one direction
// of a conn. Every method locks mu before touching the other fields.
type pipe struct {
	mu sync.Mutex

	// buf contains the data in the pipe.  It is a ring buffer of fixed capacity,
	// with r and w pointing to the offset to read and write, respectively.
	//
	// Data is read between [r, w) and written to [w, r), wrapping around the end
	// of the slice if necessary.
	//
	// The buffer is empty if r == len(buf), otherwise if r == w, it is full.
	//
	// w and r are always in the range [0, cap(buf)) and [0, len(buf)].
	buf  []byte
	w, r int

	wwait sync.Cond // writers wait here while the buffer is full
	rwait sync.Cond // readers wait here while the buffer is empty

	closed      bool // set by Close; fails both Read and Write
	writeClosed bool // set by closeWrite; fails Write, Read drains then returns io.EOF
}

// newPipe returns an empty pipe that can buffer up to sz bytes.
//
// A size below 1 is raised to 1. With a zero-capacity buffer Write could never
// copy a byte and would loop forever while holding mu, and a negative capacity
// would make the allocation panic.
func newPipe(sz int) *pipe {
	if sz < 1 {
		sz = 1
	}
	p := &pipe{buf: make([]byte, 0, sz)}
	p.wwait.L = &p.mu
	p.rwait.L = &p.mu
	return p
}

// empty reports whether the pipe holds no unread data. The caller must hold mu.
func (p *pipe) empty() bool {
	return p.r == len(p.buf)
}

// full reports whether the pipe has no room for more data. The caller must
// hold mu.
func (p *pipe) full() bool {
	return p.r < len(p.buf) && p.r == p.w
}

// Read copies buffered data into b and returns the number of bytes copied,
// blocking while the pipe is empty.
//
// It returns io.ErrClosedPipe once Close has been called, even if data is
// still buffered, and io.EOF once closeWrite has been called and the buffer
// has been drained. A single call only copies the contiguous region
// buf[r:len(buf)], so data that wraps around the end of the ring takes more
// than one call to read.
func (p *pipe) Read(b []byte) (n int, err error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	// Block until p has data.
	for {
		if p.closed {
			return 0, io.ErrClosedPipe
		}
		if !p.empty() {
			break
		}
		if p.writeClosed {
			return 0, io.EOF
		}
		p.rwait.Wait()
	}
	wasFull := p.full()

	n = copy(b, p.buf[p.r:len(p.buf)])
	p.r += n
	if p.r == cap(p.buf) {
		// The read offset hit the end of the ring: wrap it to the start and
		// shrink buf so that len(buf) marks the end of the data written there.
		p.r = 0
		p.buf = p.buf[:p.w]
	}

	// Signal a blocked writer, if any.
	if wasFull {
		p.wwait.Signal()
	}

	return n, nil
}

// Write copies all of b into the pipe, blocking whenever the pipe is full
// until a Read makes room.
//
// If the pipe is closed (Close or closeWrite) before everything has been
// copied, Write stops and returns io.ErrClosedPipe together with the number of
// bytes that were already copied in, so that n reflects what a reader may
// still receive.
func (p *pipe) Write(b []byte) (n int, err error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.closed {
		return 0, io.ErrClosedPipe
	}
	for len(b) > 0 {
		// Block until p is not full.
		for {
			if p.closed || p.writeClosed {
				// Report the bytes accepted so far rather than 0.
				return n, io.ErrClosedPipe
			}
			if !p.full() {
				break
			}
			p.wwait.Wait()
		}
		wasEmpty := p.empty()

		// The writable region runs from w to the end of the ring, or only up
		// to r when the write offset has already wrapped behind the reader.
		end := cap(p.buf)
		if p.w < p.r {
			end = p.r
		}
		x := copy(p.buf[p.w:end], b)
		b = b[x:]
		n += x
		p.w += x
		if p.w > len(p.buf) {
			// Grow len(buf) to cover the newly written bytes.
			p.buf = p.buf[:p.w]
		}
		if p.w == cap(p.buf) {
			p.w = 0
		}

		// Signal a blocked reader, if any.
		if wasEmpty {
			p.rwait.Signal()
		}
	}
	return n, nil
}

// Close marks the pipe closed and wakes every blocked reader and writer so
// that they return io.ErrClosedPipe. It always returns nil.
func (p *pipe) Close() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.closed = true
	// Signal all blocked readers and writers to return an error.
	p.rwait.Broadcast()
	p.wwait.Broadcast()
	return nil
}

// closeWrite marks the write side of the pipe as finished and wakes every
// blocked reader and writer. Writers then fail with io.ErrClosedPipe, while
// readers keep receiving buffered data and get io.EOF once it is drained. It
// always returns nil.
func (p *pipe) closeWrite() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.writeClosed = true
	// Wake blocked readers and writers so they re-check the pipe state.
	p.rwait.Broadcast()
	p.wwait.Broadcast()
	return nil
}
220
221 type conn struct {
222 io.Reader
223 io.Writer
224 }
225
226 func (c *conn) Close() error {
227 err1 := c.Reader.(*pipe).Close()
228 err2 := c.Writer.(*pipe).closeWrite()
229 if err1 != nil {
230 return err1
231 }
232 return err2
233 }
234
235 func (*conn) LocalAddr() net.Addr { return addr{} }
236 func (*conn) RemoteAddr() net.Addr { return addr{} }
237 func (c *conn) SetDeadline(t time.Time) error { return fmt.Errorf("unsupported") }
238 func (c *conn) SetReadDeadline(t time.Time) error { return fmt.Errorf("unsupported") }
239 func (c *conn) SetWriteDeadline(t time.Time) error { return fmt.Errorf("unsupported") }
240
// addr is the placeholder address reported by Listener.Addr and by both ends
// of every conn. It carries no data.
type addr struct{}

// Network returns the network name, "bufconn".
func (addr) Network() string {
	return "bufconn"
}

// String returns the address in string form, "bufconn".
func (addr) String() string {
	return "bufconn"
}