aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/github.com/hashicorp/go-plugin/grpc_broker.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/hashicorp/go-plugin/grpc_broker.go')
-rw-r--r--vendor/github.com/hashicorp/go-plugin/grpc_broker.go455
1 files changed, 455 insertions, 0 deletions
diff --git a/vendor/github.com/hashicorp/go-plugin/grpc_broker.go b/vendor/github.com/hashicorp/go-plugin/grpc_broker.go
new file mode 100644
index 0000000..49fd21c
--- /dev/null
+++ b/vendor/github.com/hashicorp/go-plugin/grpc_broker.go
@@ -0,0 +1,455 @@
1package plugin
2
3import (
4 "context"
5 "crypto/tls"
6 "errors"
7 "fmt"
8 "log"
9 "net"
10 "sync"
11 "sync/atomic"
12 "time"
13
14 "github.com/oklog/run"
15 "google.golang.org/grpc"
16 "google.golang.org/grpc/credentials"
17)
18
19// streamer interface is used in the broker to send/receive connection
20// information.
21type streamer interface {
22 Send(*ConnInfo) error
23 Recv() (*ConnInfo, error)
24 Close()
25}
26
27// sendErr is used to pass errors back during a send.
28type sendErr struct {
29 i *ConnInfo
30 ch chan error
31}
32
33// gRPCBrokerServer is used by the plugin to start a stream and to send
34// connection information to/from the plugin. Implements GRPCBrokerServer and
35// streamer interfaces.
36type gRPCBrokerServer struct {
37 // send is used to send connection info to the gRPC stream.
38 send chan *sendErr
39
40 // recv is used to receive connection info from the gRPC stream.
41 recv chan *ConnInfo
42
43 // quit closes down the stream.
44 quit chan struct{}
45
46 // o is used to ensure we close the quit channel only once.
47 o sync.Once
48}
49
50func newGRPCBrokerServer() *gRPCBrokerServer {
51 return &gRPCBrokerServer{
52 send: make(chan *sendErr),
53 recv: make(chan *ConnInfo),
54 quit: make(chan struct{}),
55 }
56}
57
58// StartStream implements the GRPCBrokerServer interface and will block until
59// the quit channel is closed or the context reports Done. The stream will pass
60// connection information to/from the client.
61func (s *gRPCBrokerServer) StartStream(stream GRPCBroker_StartStreamServer) error {
62 doneCh := stream.Context().Done()
63 defer s.Close()
64
65 // Proccess send stream
66 go func() {
67 for {
68 select {
69 case <-doneCh:
70 return
71 case <-s.quit:
72 return
73 case se := <-s.send:
74 err := stream.Send(se.i)
75 se.ch <- err
76 }
77 }
78 }()
79
80 // Process receive stream
81 for {
82 i, err := stream.Recv()
83 if err != nil {
84 return err
85 }
86 select {
87 case <-doneCh:
88 return nil
89 case <-s.quit:
90 return nil
91 case s.recv <- i:
92 }
93 }
94
95 return nil
96}
97
98// Send is used by the GRPCBroker to pass connection information into the stream
99// to the client.
100func (s *gRPCBrokerServer) Send(i *ConnInfo) error {
101 ch := make(chan error)
102 defer close(ch)
103
104 select {
105 case <-s.quit:
106 return errors.New("broker closed")
107 case s.send <- &sendErr{
108 i: i,
109 ch: ch,
110 }:
111 }
112
113 return <-ch
114}
115
116// Recv is used by the GRPCBroker to pass connection information that has been
117// sent from the client from the stream to the broker.
118func (s *gRPCBrokerServer) Recv() (*ConnInfo, error) {
119 select {
120 case <-s.quit:
121 return nil, errors.New("broker closed")
122 case i := <-s.recv:
123 return i, nil
124 }
125}
126
127// Close closes the quit channel, shutting down the stream.
128func (s *gRPCBrokerServer) Close() {
129 s.o.Do(func() {
130 close(s.quit)
131 })
132}
133
134// gRPCBrokerClientImpl is used by the client to start a stream and to send
135// connection information to/from the client. Implements GRPCBrokerClient and
136// streamer interfaces.
137type gRPCBrokerClientImpl struct {
138 // client is the underlying GRPC client used to make calls to the server.
139 client GRPCBrokerClient
140
141 // send is used to send connection info to the gRPC stream.
142 send chan *sendErr
143
144 // recv is used to receive connection info from the gRPC stream.
145 recv chan *ConnInfo
146
147 // quit closes down the stream.
148 quit chan struct{}
149
150 // o is used to ensure we close the quit channel only once.
151 o sync.Once
152}
153
154func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl {
155 return &gRPCBrokerClientImpl{
156 client: NewGRPCBrokerClient(conn),
157 send: make(chan *sendErr),
158 recv: make(chan *ConnInfo),
159 quit: make(chan struct{}),
160 }
161}
162
163// StartStream implements the GRPCBrokerClient interface and will block until
164// the quit channel is closed or the context reports Done. The stream will pass
165// connection information to/from the plugin.
166func (s *gRPCBrokerClientImpl) StartStream() error {
167 ctx, cancelFunc := context.WithCancel(context.Background())
168 defer cancelFunc()
169 defer s.Close()
170
171 stream, err := s.client.StartStream(ctx)
172 if err != nil {
173 return err
174 }
175 doneCh := stream.Context().Done()
176
177 go func() {
178 for {
179 select {
180 case <-doneCh:
181 return
182 case <-s.quit:
183 return
184 case se := <-s.send:
185 err := stream.Send(se.i)
186 se.ch <- err
187 }
188 }
189 }()
190
191 for {
192 i, err := stream.Recv()
193 if err != nil {
194 return err
195 }
196 select {
197 case <-doneCh:
198 return nil
199 case <-s.quit:
200 return nil
201 case s.recv <- i:
202 }
203 }
204
205 return nil
206}
207
208// Send is used by the GRPCBroker to pass connection information into the stream
209// to the plugin.
210func (s *gRPCBrokerClientImpl) Send(i *ConnInfo) error {
211 ch := make(chan error)
212 defer close(ch)
213
214 select {
215 case <-s.quit:
216 return errors.New("broker closed")
217 case s.send <- &sendErr{
218 i: i,
219 ch: ch,
220 }:
221 }
222
223 return <-ch
224}
225
226// Recv is used by the GRPCBroker to pass connection information that has been
227// sent from the plugin to the broker.
228func (s *gRPCBrokerClientImpl) Recv() (*ConnInfo, error) {
229 select {
230 case <-s.quit:
231 return nil, errors.New("broker closed")
232 case i := <-s.recv:
233 return i, nil
234 }
235}
236
237// Close closes the quit channel, shutting down the stream.
238func (s *gRPCBrokerClientImpl) Close() {
239 s.o.Do(func() {
240 close(s.quit)
241 })
242}
243
244// GRPCBroker is responsible for brokering connections by unique ID.
245//
246// It is used by plugins to create multiple gRPC connections and data
247// streams between the plugin process and the host process.
248//
249// This allows a plugin to request a channel with a specific ID to connect to
250// or accept a connection from, and the broker handles the details of
251// holding these channels open while they're being negotiated.
252//
253// The Plugin interface has access to these for both Server and Client.
254// The broker can be used by either (optionally) to reserve and connect to
255// new streams. This is useful for complex args and return values,
256// or anything else you might need a data stream for.
257type GRPCBroker struct {
258 nextId uint32
259 streamer streamer
260 streams map[uint32]*gRPCBrokerPending
261 tls *tls.Config
262 doneCh chan struct{}
263 o sync.Once
264
265 sync.Mutex
266}
267
268type gRPCBrokerPending struct {
269 ch chan *ConnInfo
270 doneCh chan struct{}
271}
272
273func newGRPCBroker(s streamer, tls *tls.Config) *GRPCBroker {
274 return &GRPCBroker{
275 streamer: s,
276 streams: make(map[uint32]*gRPCBrokerPending),
277 tls: tls,
278 doneCh: make(chan struct{}),
279 }
280}
281
282// Accept accepts a connection by ID.
283//
284// This should not be called multiple times with the same ID at one time.
285func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) {
286 listener, err := serverListener()
287 if err != nil {
288 return nil, err
289 }
290
291 err = b.streamer.Send(&ConnInfo{
292 ServiceId: id,
293 Network: listener.Addr().Network(),
294 Address: listener.Addr().String(),
295 })
296 if err != nil {
297 return nil, err
298 }
299
300 return listener, nil
301}
302
303// AcceptAndServe is used to accept a specific stream ID and immediately
304// serve a gRPC server on that stream ID. This is used to easily serve
305// complex arguments. Each AcceptAndServe call opens a new listener socket and
306// sends the connection info down the stream to the dialer. Since a new
307// connection is opened every call, these calls should be used sparingly.
308// Multiple gRPC server implementations can be registered to a single
309// AcceptAndServe call.
310func (b *GRPCBroker) AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server) {
311 listener, err := b.Accept(id)
312 if err != nil {
313 log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
314 return
315 }
316 defer listener.Close()
317
318 var opts []grpc.ServerOption
319 if b.tls != nil {
320 opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(b.tls))}
321 }
322
323 server := s(opts)
324
325 // Here we use a run group to close this goroutine if the server is shutdown
326 // or the broker is shutdown.
327 var g run.Group
328 {
329 // Serve on the listener, if shutting down call GracefulStop.
330 g.Add(func() error {
331 return server.Serve(listener)
332 }, func(err error) {
333 server.GracefulStop()
334 })
335 }
336 {
337 // block on the closeCh or the doneCh. If we are shutting down close the
338 // closeCh.
339 closeCh := make(chan struct{})
340 g.Add(func() error {
341 select {
342 case <-b.doneCh:
343 case <-closeCh:
344 }
345 return nil
346 }, func(err error) {
347 close(closeCh)
348 })
349 }
350
351 // Block until we are done
352 g.Run()
353}
354
355// Close closes the stream and all servers.
356func (b *GRPCBroker) Close() error {
357 b.streamer.Close()
358 b.o.Do(func() {
359 close(b.doneCh)
360 })
361 return nil
362}
363
364// Dial opens a connection by ID.
365func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) {
366 var c *ConnInfo
367
368 // Open the stream
369 p := b.getStream(id)
370 select {
371 case c = <-p.ch:
372 close(p.doneCh)
373 case <-time.After(5 * time.Second):
374 return nil, fmt.Errorf("timeout waiting for connection info")
375 }
376
377 var addr net.Addr
378 switch c.Network {
379 case "tcp":
380 addr, err = net.ResolveTCPAddr("tcp", c.Address)
381 case "unix":
382 addr, err = net.ResolveUnixAddr("unix", c.Address)
383 default:
384 err = fmt.Errorf("Unknown address type: %s", c.Address)
385 }
386 if err != nil {
387 return nil, err
388 }
389
390 return dialGRPCConn(b.tls, netAddrDialer(addr))
391}
392
393// NextId returns a unique ID to use next.
394//
395// It is possible for very long-running plugin hosts to wrap this value,
396// though it would require a very large amount of calls. In practice
397// we've never seen it happen.
398func (m *GRPCBroker) NextId() uint32 {
399 return atomic.AddUint32(&m.nextId, 1)
400}
401
402// Run starts the brokering and should be executed in a goroutine, since it
403// blocks forever, or until the session closes.
404//
405// Uses of GRPCBroker never need to call this. It is called internally by
406// the plugin host/client.
407func (m *GRPCBroker) Run() {
408 for {
409 stream, err := m.streamer.Recv()
410 if err != nil {
411 // Once we receive an error, just exit
412 break
413 }
414
415 // Initialize the waiter
416 p := m.getStream(stream.ServiceId)
417 select {
418 case p.ch <- stream:
419 default:
420 }
421
422 go m.timeoutWait(stream.ServiceId, p)
423 }
424}
425
426func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending {
427 m.Lock()
428 defer m.Unlock()
429
430 p, ok := m.streams[id]
431 if ok {
432 return p
433 }
434
435 m.streams[id] = &gRPCBrokerPending{
436 ch: make(chan *ConnInfo, 1),
437 doneCh: make(chan struct{}),
438 }
439 return m.streams[id]
440}
441
442func (m *GRPCBroker) timeoutWait(id uint32, p *gRPCBrokerPending) {
443 // Wait for the stream to either be picked up and connected, or
444 // for a timeout.
445 select {
446 case <-p.doneCh:
447 case <-time.After(5 * time.Second):
448 }
449
450 m.Lock()
451 defer m.Unlock()
452
453 // Delete the stream so no one else can grab it
454 delete(m.streams, id)
455}