]> git.immae.eu Git - github/fretlink/terraform-provider-statuscake.git/blame - vendor/github.com/hashicorp/go-plugin/grpc_broker.go
Upgrade to 0.12
[github/fretlink/terraform-provider-statuscake.git] / vendor / github.com / hashicorp / go-plugin / grpc_broker.go
CommitLineData
15c0b25d
AP
1package plugin
2
3import (
4 "context"
5 "crypto/tls"
6 "errors"
7 "fmt"
8 "log"
9 "net"
10 "sync"
11 "sync/atomic"
12 "time"
13
107c1cdb
ND
14 "github.com/hashicorp/go-plugin/internal/plugin"
15
15c0b25d
AP
16 "github.com/oklog/run"
17 "google.golang.org/grpc"
18 "google.golang.org/grpc/credentials"
19)
20
21// streamer interface is used in the broker to send/receive connection
22// information.
23type streamer interface {
107c1cdb
ND
24 Send(*plugin.ConnInfo) error
25 Recv() (*plugin.ConnInfo, error)
15c0b25d
AP
26 Close()
27}
28
29// sendErr is used to pass errors back during a send.
30type sendErr struct {
107c1cdb 31 i *plugin.ConnInfo
15c0b25d
AP
32 ch chan error
33}
34
35// gRPCBrokerServer is used by the plugin to start a stream and to send
36// connection information to/from the plugin. Implements GRPCBrokerServer and
37// streamer interfaces.
38type gRPCBrokerServer struct {
39 // send is used to send connection info to the gRPC stream.
40 send chan *sendErr
41
42 // recv is used to receive connection info from the gRPC stream.
107c1cdb 43 recv chan *plugin.ConnInfo
15c0b25d
AP
44
45 // quit closes down the stream.
46 quit chan struct{}
47
48 // o is used to ensure we close the quit channel only once.
49 o sync.Once
50}
51
52func newGRPCBrokerServer() *gRPCBrokerServer {
53 return &gRPCBrokerServer{
54 send: make(chan *sendErr),
107c1cdb 55 recv: make(chan *plugin.ConnInfo),
15c0b25d
AP
56 quit: make(chan struct{}),
57 }
58}
59
60// StartStream implements the GRPCBrokerServer interface and will block until
61// the quit channel is closed or the context reports Done. The stream will pass
62// connection information to/from the client.
107c1cdb 63func (s *gRPCBrokerServer) StartStream(stream plugin.GRPCBroker_StartStreamServer) error {
15c0b25d
AP
64 doneCh := stream.Context().Done()
65 defer s.Close()
66
67 // Proccess send stream
68 go func() {
69 for {
70 select {
71 case <-doneCh:
72 return
73 case <-s.quit:
74 return
75 case se := <-s.send:
76 err := stream.Send(se.i)
77 se.ch <- err
78 }
79 }
80 }()
81
82 // Process receive stream
83 for {
84 i, err := stream.Recv()
85 if err != nil {
86 return err
87 }
88 select {
89 case <-doneCh:
90 return nil
91 case <-s.quit:
92 return nil
93 case s.recv <- i:
94 }
95 }
96
97 return nil
98}
99
100// Send is used by the GRPCBroker to pass connection information into the stream
101// to the client.
107c1cdb 102func (s *gRPCBrokerServer) Send(i *plugin.ConnInfo) error {
15c0b25d
AP
103 ch := make(chan error)
104 defer close(ch)
105
106 select {
107 case <-s.quit:
108 return errors.New("broker closed")
109 case s.send <- &sendErr{
110 i: i,
111 ch: ch,
112 }:
113 }
114
115 return <-ch
116}
117
118// Recv is used by the GRPCBroker to pass connection information that has been
119// sent from the client from the stream to the broker.
107c1cdb 120func (s *gRPCBrokerServer) Recv() (*plugin.ConnInfo, error) {
15c0b25d
AP
121 select {
122 case <-s.quit:
123 return nil, errors.New("broker closed")
124 case i := <-s.recv:
125 return i, nil
126 }
127}
128
129// Close closes the quit channel, shutting down the stream.
130func (s *gRPCBrokerServer) Close() {
131 s.o.Do(func() {
132 close(s.quit)
133 })
134}
135
136// gRPCBrokerClientImpl is used by the client to start a stream and to send
137// connection information to/from the client. Implements GRPCBrokerClient and
138// streamer interfaces.
139type gRPCBrokerClientImpl struct {
140 // client is the underlying GRPC client used to make calls to the server.
107c1cdb 141 client plugin.GRPCBrokerClient
15c0b25d
AP
142
143 // send is used to send connection info to the gRPC stream.
144 send chan *sendErr
145
146 // recv is used to receive connection info from the gRPC stream.
107c1cdb 147 recv chan *plugin.ConnInfo
15c0b25d
AP
148
149 // quit closes down the stream.
150 quit chan struct{}
151
152 // o is used to ensure we close the quit channel only once.
153 o sync.Once
154}
155
156func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl {
157 return &gRPCBrokerClientImpl{
107c1cdb 158 client: plugin.NewGRPCBrokerClient(conn),
15c0b25d 159 send: make(chan *sendErr),
107c1cdb 160 recv: make(chan *plugin.ConnInfo),
15c0b25d
AP
161 quit: make(chan struct{}),
162 }
163}
164
165// StartStream implements the GRPCBrokerClient interface and will block until
166// the quit channel is closed or the context reports Done. The stream will pass
167// connection information to/from the plugin.
168func (s *gRPCBrokerClientImpl) StartStream() error {
169 ctx, cancelFunc := context.WithCancel(context.Background())
170 defer cancelFunc()
171 defer s.Close()
172
173 stream, err := s.client.StartStream(ctx)
174 if err != nil {
175 return err
176 }
177 doneCh := stream.Context().Done()
178
179 go func() {
180 for {
181 select {
182 case <-doneCh:
183 return
184 case <-s.quit:
185 return
186 case se := <-s.send:
187 err := stream.Send(se.i)
188 se.ch <- err
189 }
190 }
191 }()
192
193 for {
194 i, err := stream.Recv()
195 if err != nil {
196 return err
197 }
198 select {
199 case <-doneCh:
200 return nil
201 case <-s.quit:
202 return nil
203 case s.recv <- i:
204 }
205 }
206
207 return nil
208}
209
210// Send is used by the GRPCBroker to pass connection information into the stream
211// to the plugin.
107c1cdb 212func (s *gRPCBrokerClientImpl) Send(i *plugin.ConnInfo) error {
15c0b25d
AP
213 ch := make(chan error)
214 defer close(ch)
215
216 select {
217 case <-s.quit:
218 return errors.New("broker closed")
219 case s.send <- &sendErr{
220 i: i,
221 ch: ch,
222 }:
223 }
224
225 return <-ch
226}
227
228// Recv is used by the GRPCBroker to pass connection information that has been
229// sent from the plugin to the broker.
107c1cdb 230func (s *gRPCBrokerClientImpl) Recv() (*plugin.ConnInfo, error) {
15c0b25d
AP
231 select {
232 case <-s.quit:
233 return nil, errors.New("broker closed")
234 case i := <-s.recv:
235 return i, nil
236 }
237}
238
239// Close closes the quit channel, shutting down the stream.
240func (s *gRPCBrokerClientImpl) Close() {
241 s.o.Do(func() {
242 close(s.quit)
243 })
244}
245
246// GRPCBroker is responsible for brokering connections by unique ID.
247//
248// It is used by plugins to create multiple gRPC connections and data
249// streams between the plugin process and the host process.
250//
251// This allows a plugin to request a channel with a specific ID to connect to
252// or accept a connection from, and the broker handles the details of
253// holding these channels open while they're being negotiated.
254//
255// The Plugin interface has access to these for both Server and Client.
256// The broker can be used by either (optionally) to reserve and connect to
257// new streams. This is useful for complex args and return values,
258// or anything else you might need a data stream for.
259type GRPCBroker struct {
260 nextId uint32
261 streamer streamer
262 streams map[uint32]*gRPCBrokerPending
263 tls *tls.Config
264 doneCh chan struct{}
265 o sync.Once
266
267 sync.Mutex
268}
269
270type gRPCBrokerPending struct {
107c1cdb 271 ch chan *plugin.ConnInfo
15c0b25d
AP
272 doneCh chan struct{}
273}
274
275func newGRPCBroker(s streamer, tls *tls.Config) *GRPCBroker {
276 return &GRPCBroker{
277 streamer: s,
278 streams: make(map[uint32]*gRPCBrokerPending),
279 tls: tls,
280 doneCh: make(chan struct{}),
281 }
282}
283
284// Accept accepts a connection by ID.
285//
286// This should not be called multiple times with the same ID at one time.
287func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) {
288 listener, err := serverListener()
289 if err != nil {
290 return nil, err
291 }
292
107c1cdb 293 err = b.streamer.Send(&plugin.ConnInfo{
15c0b25d
AP
294 ServiceId: id,
295 Network: listener.Addr().Network(),
296 Address: listener.Addr().String(),
297 })
298 if err != nil {
299 return nil, err
300 }
301
302 return listener, nil
303}
304
305// AcceptAndServe is used to accept a specific stream ID and immediately
306// serve a gRPC server on that stream ID. This is used to easily serve
307// complex arguments. Each AcceptAndServe call opens a new listener socket and
308// sends the connection info down the stream to the dialer. Since a new
309// connection is opened every call, these calls should be used sparingly.
310// Multiple gRPC server implementations can be registered to a single
311// AcceptAndServe call.
312func (b *GRPCBroker) AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server) {
313 listener, err := b.Accept(id)
314 if err != nil {
315 log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
316 return
317 }
318 defer listener.Close()
319
320 var opts []grpc.ServerOption
321 if b.tls != nil {
322 opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(b.tls))}
323 }
324
325 server := s(opts)
326
327 // Here we use a run group to close this goroutine if the server is shutdown
328 // or the broker is shutdown.
329 var g run.Group
330 {
331 // Serve on the listener, if shutting down call GracefulStop.
332 g.Add(func() error {
333 return server.Serve(listener)
334 }, func(err error) {
335 server.GracefulStop()
336 })
337 }
338 {
339 // block on the closeCh or the doneCh. If we are shutting down close the
340 // closeCh.
341 closeCh := make(chan struct{})
342 g.Add(func() error {
343 select {
344 case <-b.doneCh:
345 case <-closeCh:
346 }
347 return nil
348 }, func(err error) {
349 close(closeCh)
350 })
351 }
352
353 // Block until we are done
354 g.Run()
355}
356
357// Close closes the stream and all servers.
358func (b *GRPCBroker) Close() error {
359 b.streamer.Close()
360 b.o.Do(func() {
361 close(b.doneCh)
362 })
363 return nil
364}
365
366// Dial opens a connection by ID.
367func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) {
107c1cdb 368 var c *plugin.ConnInfo
15c0b25d
AP
369
370 // Open the stream
371 p := b.getStream(id)
372 select {
373 case c = <-p.ch:
374 close(p.doneCh)
375 case <-time.After(5 * time.Second):
376 return nil, fmt.Errorf("timeout waiting for connection info")
377 }
378
379 var addr net.Addr
380 switch c.Network {
381 case "tcp":
382 addr, err = net.ResolveTCPAddr("tcp", c.Address)
383 case "unix":
384 addr, err = net.ResolveUnixAddr("unix", c.Address)
385 default:
386 err = fmt.Errorf("Unknown address type: %s", c.Address)
387 }
388 if err != nil {
389 return nil, err
390 }
391
392 return dialGRPCConn(b.tls, netAddrDialer(addr))
393}
394
395// NextId returns a unique ID to use next.
396//
397// It is possible for very long-running plugin hosts to wrap this value,
398// though it would require a very large amount of calls. In practice
399// we've never seen it happen.
400func (m *GRPCBroker) NextId() uint32 {
401 return atomic.AddUint32(&m.nextId, 1)
402}
403
404// Run starts the brokering and should be executed in a goroutine, since it
405// blocks forever, or until the session closes.
406//
407// Uses of GRPCBroker never need to call this. It is called internally by
408// the plugin host/client.
409func (m *GRPCBroker) Run() {
410 for {
411 stream, err := m.streamer.Recv()
412 if err != nil {
413 // Once we receive an error, just exit
414 break
415 }
416
417 // Initialize the waiter
418 p := m.getStream(stream.ServiceId)
419 select {
420 case p.ch <- stream:
421 default:
422 }
423
424 go m.timeoutWait(stream.ServiceId, p)
425 }
426}
427
428func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending {
429 m.Lock()
430 defer m.Unlock()
431
432 p, ok := m.streams[id]
433 if ok {
434 return p
435 }
436
437 m.streams[id] = &gRPCBrokerPending{
107c1cdb 438 ch: make(chan *plugin.ConnInfo, 1),
15c0b25d
AP
439 doneCh: make(chan struct{}),
440 }
441 return m.streams[id]
442}
443
444func (m *GRPCBroker) timeoutWait(id uint32, p *gRPCBrokerPending) {
445 // Wait for the stream to either be picked up and connected, or
446 // for a timeout.
447 select {
448 case <-p.doneCh:
449 case <-time.After(5 * time.Second):
450 }
451
452 m.Lock()
453 defer m.Unlock()
454
455 // Delete the stream so no one else can grab it
456 delete(m.streams, id)
457}