]> git.immae.eu Git - github/fretlink/terraform-provider-statuscake.git/blob - vendor/github.com/hashicorp/go-plugin/grpc_broker.go
deps: github.com/hashicorp/terraform@sdk-v0.11-with-go-modules
[github/fretlink/terraform-provider-statuscake.git] / vendor / github.com / hashicorp / go-plugin / grpc_broker.go
1 package plugin
2
3 import (
4 "context"
5 "crypto/tls"
6 "errors"
7 "fmt"
8 "log"
9 "net"
10 "sync"
11 "sync/atomic"
12 "time"
13
14 "github.com/oklog/run"
15 "google.golang.org/grpc"
16 "google.golang.org/grpc/credentials"
17 )
18
19 // streamer interface is used in the broker to send/receive connection
20 // information.
21 type streamer interface {
22 Send(*ConnInfo) error
23 Recv() (*ConnInfo, error)
24 Close()
25 }
26
27 // sendErr is used to pass errors back during a send.
28 type sendErr struct {
29 i *ConnInfo
30 ch chan error
31 }
32
33 // gRPCBrokerServer is used by the plugin to start a stream and to send
34 // connection information to/from the plugin. Implements GRPCBrokerServer and
35 // streamer interfaces.
36 type gRPCBrokerServer struct {
37 // send is used to send connection info to the gRPC stream.
38 send chan *sendErr
39
40 // recv is used to receive connection info from the gRPC stream.
41 recv chan *ConnInfo
42
43 // quit closes down the stream.
44 quit chan struct{}
45
46 // o is used to ensure we close the quit channel only once.
47 o sync.Once
48 }
49
50 func newGRPCBrokerServer() *gRPCBrokerServer {
51 return &gRPCBrokerServer{
52 send: make(chan *sendErr),
53 recv: make(chan *ConnInfo),
54 quit: make(chan struct{}),
55 }
56 }
57
58 // StartStream implements the GRPCBrokerServer interface and will block until
59 // the quit channel is closed or the context reports Done. The stream will pass
60 // connection information to/from the client.
61 func (s *gRPCBrokerServer) StartStream(stream GRPCBroker_StartStreamServer) error {
62 doneCh := stream.Context().Done()
63 defer s.Close()
64
65 // Proccess send stream
66 go func() {
67 for {
68 select {
69 case <-doneCh:
70 return
71 case <-s.quit:
72 return
73 case se := <-s.send:
74 err := stream.Send(se.i)
75 se.ch <- err
76 }
77 }
78 }()
79
80 // Process receive stream
81 for {
82 i, err := stream.Recv()
83 if err != nil {
84 return err
85 }
86 select {
87 case <-doneCh:
88 return nil
89 case <-s.quit:
90 return nil
91 case s.recv <- i:
92 }
93 }
94
95 return nil
96 }
97
98 // Send is used by the GRPCBroker to pass connection information into the stream
99 // to the client.
100 func (s *gRPCBrokerServer) Send(i *ConnInfo) error {
101 ch := make(chan error)
102 defer close(ch)
103
104 select {
105 case <-s.quit:
106 return errors.New("broker closed")
107 case s.send <- &sendErr{
108 i: i,
109 ch: ch,
110 }:
111 }
112
113 return <-ch
114 }
115
116 // Recv is used by the GRPCBroker to pass connection information that has been
117 // sent from the client from the stream to the broker.
118 func (s *gRPCBrokerServer) Recv() (*ConnInfo, error) {
119 select {
120 case <-s.quit:
121 return nil, errors.New("broker closed")
122 case i := <-s.recv:
123 return i, nil
124 }
125 }
126
127 // Close closes the quit channel, shutting down the stream.
128 func (s *gRPCBrokerServer) Close() {
129 s.o.Do(func() {
130 close(s.quit)
131 })
132 }
133
134 // gRPCBrokerClientImpl is used by the client to start a stream and to send
135 // connection information to/from the client. Implements GRPCBrokerClient and
136 // streamer interfaces.
137 type gRPCBrokerClientImpl struct {
138 // client is the underlying GRPC client used to make calls to the server.
139 client GRPCBrokerClient
140
141 // send is used to send connection info to the gRPC stream.
142 send chan *sendErr
143
144 // recv is used to receive connection info from the gRPC stream.
145 recv chan *ConnInfo
146
147 // quit closes down the stream.
148 quit chan struct{}
149
150 // o is used to ensure we close the quit channel only once.
151 o sync.Once
152 }
153
154 func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl {
155 return &gRPCBrokerClientImpl{
156 client: NewGRPCBrokerClient(conn),
157 send: make(chan *sendErr),
158 recv: make(chan *ConnInfo),
159 quit: make(chan struct{}),
160 }
161 }
162
163 // StartStream implements the GRPCBrokerClient interface and will block until
164 // the quit channel is closed or the context reports Done. The stream will pass
165 // connection information to/from the plugin.
166 func (s *gRPCBrokerClientImpl) StartStream() error {
167 ctx, cancelFunc := context.WithCancel(context.Background())
168 defer cancelFunc()
169 defer s.Close()
170
171 stream, err := s.client.StartStream(ctx)
172 if err != nil {
173 return err
174 }
175 doneCh := stream.Context().Done()
176
177 go func() {
178 for {
179 select {
180 case <-doneCh:
181 return
182 case <-s.quit:
183 return
184 case se := <-s.send:
185 err := stream.Send(se.i)
186 se.ch <- err
187 }
188 }
189 }()
190
191 for {
192 i, err := stream.Recv()
193 if err != nil {
194 return err
195 }
196 select {
197 case <-doneCh:
198 return nil
199 case <-s.quit:
200 return nil
201 case s.recv <- i:
202 }
203 }
204
205 return nil
206 }
207
208 // Send is used by the GRPCBroker to pass connection information into the stream
209 // to the plugin.
210 func (s *gRPCBrokerClientImpl) Send(i *ConnInfo) error {
211 ch := make(chan error)
212 defer close(ch)
213
214 select {
215 case <-s.quit:
216 return errors.New("broker closed")
217 case s.send <- &sendErr{
218 i: i,
219 ch: ch,
220 }:
221 }
222
223 return <-ch
224 }
225
226 // Recv is used by the GRPCBroker to pass connection information that has been
227 // sent from the plugin to the broker.
228 func (s *gRPCBrokerClientImpl) Recv() (*ConnInfo, error) {
229 select {
230 case <-s.quit:
231 return nil, errors.New("broker closed")
232 case i := <-s.recv:
233 return i, nil
234 }
235 }
236
237 // Close closes the quit channel, shutting down the stream.
238 func (s *gRPCBrokerClientImpl) Close() {
239 s.o.Do(func() {
240 close(s.quit)
241 })
242 }
243
244 // GRPCBroker is responsible for brokering connections by unique ID.
245 //
246 // It is used by plugins to create multiple gRPC connections and data
247 // streams between the plugin process and the host process.
248 //
249 // This allows a plugin to request a channel with a specific ID to connect to
250 // or accept a connection from, and the broker handles the details of
251 // holding these channels open while they're being negotiated.
252 //
253 // The Plugin interface has access to these for both Server and Client.
254 // The broker can be used by either (optionally) to reserve and connect to
255 // new streams. This is useful for complex args and return values,
256 // or anything else you might need a data stream for.
257 type GRPCBroker struct {
258 nextId uint32
259 streamer streamer
260 streams map[uint32]*gRPCBrokerPending
261 tls *tls.Config
262 doneCh chan struct{}
263 o sync.Once
264
265 sync.Mutex
266 }
267
268 type gRPCBrokerPending struct {
269 ch chan *ConnInfo
270 doneCh chan struct{}
271 }
272
273 func newGRPCBroker(s streamer, tls *tls.Config) *GRPCBroker {
274 return &GRPCBroker{
275 streamer: s,
276 streams: make(map[uint32]*gRPCBrokerPending),
277 tls: tls,
278 doneCh: make(chan struct{}),
279 }
280 }
281
282 // Accept accepts a connection by ID.
283 //
284 // This should not be called multiple times with the same ID at one time.
285 func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) {
286 listener, err := serverListener()
287 if err != nil {
288 return nil, err
289 }
290
291 err = b.streamer.Send(&ConnInfo{
292 ServiceId: id,
293 Network: listener.Addr().Network(),
294 Address: listener.Addr().String(),
295 })
296 if err != nil {
297 return nil, err
298 }
299
300 return listener, nil
301 }
302
303 // AcceptAndServe is used to accept a specific stream ID and immediately
304 // serve a gRPC server on that stream ID. This is used to easily serve
305 // complex arguments. Each AcceptAndServe call opens a new listener socket and
306 // sends the connection info down the stream to the dialer. Since a new
307 // connection is opened every call, these calls should be used sparingly.
308 // Multiple gRPC server implementations can be registered to a single
309 // AcceptAndServe call.
310 func (b *GRPCBroker) AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server) {
311 listener, err := b.Accept(id)
312 if err != nil {
313 log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
314 return
315 }
316 defer listener.Close()
317
318 var opts []grpc.ServerOption
319 if b.tls != nil {
320 opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(b.tls))}
321 }
322
323 server := s(opts)
324
325 // Here we use a run group to close this goroutine if the server is shutdown
326 // or the broker is shutdown.
327 var g run.Group
328 {
329 // Serve on the listener, if shutting down call GracefulStop.
330 g.Add(func() error {
331 return server.Serve(listener)
332 }, func(err error) {
333 server.GracefulStop()
334 })
335 }
336 {
337 // block on the closeCh or the doneCh. If we are shutting down close the
338 // closeCh.
339 closeCh := make(chan struct{})
340 g.Add(func() error {
341 select {
342 case <-b.doneCh:
343 case <-closeCh:
344 }
345 return nil
346 }, func(err error) {
347 close(closeCh)
348 })
349 }
350
351 // Block until we are done
352 g.Run()
353 }
354
355 // Close closes the stream and all servers.
356 func (b *GRPCBroker) Close() error {
357 b.streamer.Close()
358 b.o.Do(func() {
359 close(b.doneCh)
360 })
361 return nil
362 }
363
364 // Dial opens a connection by ID.
365 func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) {
366 var c *ConnInfo
367
368 // Open the stream
369 p := b.getStream(id)
370 select {
371 case c = <-p.ch:
372 close(p.doneCh)
373 case <-time.After(5 * time.Second):
374 return nil, fmt.Errorf("timeout waiting for connection info")
375 }
376
377 var addr net.Addr
378 switch c.Network {
379 case "tcp":
380 addr, err = net.ResolveTCPAddr("tcp", c.Address)
381 case "unix":
382 addr, err = net.ResolveUnixAddr("unix", c.Address)
383 default:
384 err = fmt.Errorf("Unknown address type: %s", c.Address)
385 }
386 if err != nil {
387 return nil, err
388 }
389
390 return dialGRPCConn(b.tls, netAddrDialer(addr))
391 }
392
393 // NextId returns a unique ID to use next.
394 //
395 // It is possible for very long-running plugin hosts to wrap this value,
396 // though it would require a very large amount of calls. In practice
397 // we've never seen it happen.
398 func (m *GRPCBroker) NextId() uint32 {
399 return atomic.AddUint32(&m.nextId, 1)
400 }
401
402 // Run starts the brokering and should be executed in a goroutine, since it
403 // blocks forever, or until the session closes.
404 //
405 // Uses of GRPCBroker never need to call this. It is called internally by
406 // the plugin host/client.
407 func (m *GRPCBroker) Run() {
408 for {
409 stream, err := m.streamer.Recv()
410 if err != nil {
411 // Once we receive an error, just exit
412 break
413 }
414
415 // Initialize the waiter
416 p := m.getStream(stream.ServiceId)
417 select {
418 case p.ch <- stream:
419 default:
420 }
421
422 go m.timeoutWait(stream.ServiceId, p)
423 }
424 }
425
426 func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending {
427 m.Lock()
428 defer m.Unlock()
429
430 p, ok := m.streams[id]
431 if ok {
432 return p
433 }
434
435 m.streams[id] = &gRPCBrokerPending{
436 ch: make(chan *ConnInfo, 1),
437 doneCh: make(chan struct{}),
438 }
439 return m.streams[id]
440 }
441
442 func (m *GRPCBroker) timeoutWait(id uint32, p *gRPCBrokerPending) {
443 // Wait for the stream to either be picked up and connected, or
444 // for a timeout.
445 select {
446 case <-p.doneCh:
447 case <-time.After(5 * time.Second):
448 }
449
450 m.Lock()
451 defer m.Unlock()
452
453 // Delete the stream so no one else can grab it
454 delete(m.streams, id)
455 }