14 "github.com/hashicorp/go-plugin/internal/plugin"
16 "github.com/oklog/run"
17 "google.golang.org/grpc"
18 "google.golang.org/grpc/credentials"
21 // streamer interface is used in the broker to send/receive connection
23 type streamer interface {
24 Send(*plugin.ConnInfo) error
25 Recv() (*plugin.ConnInfo, error)
29 // sendErr is used to pass errors back during a send.
35 // gRPCBrokerServer is used by the plugin to start a stream and to send
36 // connection information to/from the plugin. Implements GRPCBrokerServer and
37 // streamer interfaces.
38 type gRPCBrokerServer struct {
39 // send is used to send connection info to the gRPC stream.
42 // recv is used to receive connection info from the gRPC stream.
43 recv chan *plugin.ConnInfo
45 // quit closes down the stream.
48 // o is used to ensure we close the quit channel only once.
52 func newGRPCBrokerServer() *gRPCBrokerServer {
53 return &gRPCBrokerServer{
54 send: make(chan *sendErr),
55 recv: make(chan *plugin.ConnInfo),
56 quit: make(chan struct{}),
60 // StartStream implements the GRPCBrokerServer interface and will block until
61 // the quit channel is closed or the context reports Done. The stream will pass
62 // connection information to/from the client.
63 func (s *gRPCBrokerServer) StartStream(stream plugin.GRPCBroker_StartStreamServer) error {
64 doneCh := stream.Context().Done()
67 // Process send stream
76 err := stream.Send(se.i)
82 // Process receive stream
84 i, err := stream.Recv()
100 // Send is used by the GRPCBroker to pass connection information into the stream
102 func (s *gRPCBrokerServer) Send(i *plugin.ConnInfo) error {
103 ch := make(chan error)
108 return errors.New("broker closed")
109 case s.send <- &sendErr{
118 // Recv is used by the GRPCBroker to pass connection information that has been
119 // sent by the client over the stream to the broker.
120 func (s *gRPCBrokerServer) Recv() (*plugin.ConnInfo, error) {
123 return nil, errors.New("broker closed")
129 // Close closes the quit channel, shutting down the stream.
130 func (s *gRPCBrokerServer) Close() {
136 // gRPCBrokerClientImpl is used by the client to start a stream and to send
137 // connection information to/from the client. Implements GRPCBrokerClient and
138 // streamer interfaces.
139 type gRPCBrokerClientImpl struct {
140 // client is the underlying GRPC client used to make calls to the server.
141 client plugin.GRPCBrokerClient
143 // send is used to send connection info to the gRPC stream.
146 // recv is used to receive connection info from the gRPC stream.
147 recv chan *plugin.ConnInfo
149 // quit closes down the stream.
152 // o is used to ensure we close the quit channel only once.
156 func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl {
157 return &gRPCBrokerClientImpl{
158 client: plugin.NewGRPCBrokerClient(conn),
159 send: make(chan *sendErr),
160 recv: make(chan *plugin.ConnInfo),
161 quit: make(chan struct{}),
165 // StartStream implements the GRPCBrokerClient interface and will block until
166 // the quit channel is closed or the context reports Done. The stream will pass
167 // connection information to/from the plugin.
168 func (s *gRPCBrokerClientImpl) StartStream() error {
169 ctx, cancelFunc := context.WithCancel(context.Background())
173 stream, err := s.client.StartStream(ctx)
177 doneCh := stream.Context().Done()
187 err := stream.Send(se.i)
194 i, err := stream.Recv()
210 // Send is used by the GRPCBroker to pass connection information into the stream
212 func (s *gRPCBrokerClientImpl) Send(i *plugin.ConnInfo) error {
213 ch := make(chan error)
218 return errors.New("broker closed")
219 case s.send <- &sendErr{
228 // Recv is used by the GRPCBroker to pass connection information that has been
229 // sent from the plugin to the broker.
230 func (s *gRPCBrokerClientImpl) Recv() (*plugin.ConnInfo, error) {
233 return nil, errors.New("broker closed")
239 // Close closes the quit channel, shutting down the stream.
240 func (s *gRPCBrokerClientImpl) Close() {
246 // GRPCBroker is responsible for brokering connections by unique ID.
248 // It is used by plugins to create multiple gRPC connections and data
249 // streams between the plugin process and the host process.
251 // This allows a plugin to request a channel with a specific ID to connect to
252 // or accept a connection from, and the broker handles the details of
253 // holding these channels open while they're being negotiated.
255 // The Plugin interface has access to these for both Server and Client.
256 // The broker can be used by either (optionally) to reserve and connect to
257 // new streams. This is useful for complex args and return values,
258 // or anything else you might need a data stream for.
259 type GRPCBroker struct {
262 streams map[uint32]*gRPCBrokerPending
270 type gRPCBrokerPending struct {
271 ch chan *plugin.ConnInfo
275 func newGRPCBroker(s streamer, tls *tls.Config) *GRPCBroker {
278 streams: make(map[uint32]*gRPCBrokerPending),
280 doneCh: make(chan struct{}),
284 // Accept accepts a connection by ID.
286 // This should not be called multiple times with the same ID at one time.
287 func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) {
288 listener, err := serverListener()
293 err = b.streamer.Send(&plugin.ConnInfo{
295 Network: listener.Addr().Network(),
296 Address: listener.Addr().String(),
305 // AcceptAndServe is used to accept a specific stream ID and immediately
306 // serve a gRPC server on that stream ID. This is used to easily serve
307 // complex arguments. Each AcceptAndServe call opens a new listener socket and
308 // sends the connection info down the stream to the dialer. Since a new
309 // connection is opened every call, these calls should be used sparingly.
310 // Multiple gRPC server implementations can be registered to a single
311 // AcceptAndServe call.
312 func (b *GRPCBroker) AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server) {
313 listener, err := b.Accept(id)
315 log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
318 defer listener.Close()
320 var opts []grpc.ServerOption
322 opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(b.tls))}
327 // Here we use a run group to close this goroutine if the server is shutdown
328 // or the broker is shutdown.
331 // Serve on the listener, if shutting down call GracefulStop.
333 return server.Serve(listener)
335 server.GracefulStop()
339 // block on the closeCh or the doneCh. If we are shutting down close the
341 closeCh := make(chan struct{})
353 // Block until we are done
357 // Close closes the stream and all servers.
358 func (b *GRPCBroker) Close() error {
366 // Dial opens a connection by ID.
367 func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) {
368 var c *plugin.ConnInfo
375 case <-time.After(5 * time.Second):
376 return nil, fmt.Errorf("timeout waiting for connection info")
382 addr, err = net.ResolveTCPAddr("tcp", c.Address)
384 addr, err = net.ResolveUnixAddr("unix", c.Address)
386 err = fmt.Errorf("Unknown address type: %s", c.Address)
392 return dialGRPCConn(b.tls, netAddrDialer(addr))
395 // NextId returns a unique ID to use next.
397 // It is possible for very long-running plugin hosts to wrap this value,
398 // though it would require a very large number of calls. In practice
399 // we've never seen it happen.
400 func (m *GRPCBroker) NextId() uint32 {
401 return atomic.AddUint32(&m.nextId, 1)
404 // Run starts the brokering and should be executed in a goroutine, since it
405 // blocks forever, or until the session closes.
407 // Users of GRPCBroker never need to call this. It is called internally by
408 // the plugin host/client.
409 func (m *GRPCBroker) Run() {
411 stream, err := m.streamer.Recv()
413 // Once we receive an error, just exit
417 // Initialize the waiter
418 p := m.getStream(stream.ServiceId)
424 go m.timeoutWait(stream.ServiceId, p)
428 func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending {
432 p, ok := m.streams[id]
437 m.streams[id] = &gRPCBrokerPending{
438 ch: make(chan *plugin.ConnInfo, 1),
439 doneCh: make(chan struct{}),
444 func (m *GRPCBroker) timeoutWait(id uint32, p *gRPCBrokerPending) {
445 // Wait for the stream to either be picked up and connected, or
449 case <-time.After(5 * time.Second):
455 // Delete the stream so no one else can grab it
456 delete(m.streams, id)