"sync/atomic"
"time"
+ "github.com/hashicorp/go-plugin/internal/plugin"
+
"github.com/oklog/run"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
// streamer interface is used in the broker to send/receive connection
// information.
type streamer interface {
- Send(*ConnInfo) error
- Recv() (*ConnInfo, error)
+ Send(*plugin.ConnInfo) error
+ Recv() (*plugin.ConnInfo, error)
Close()
}
// sendErr is used to pass errors back during a send.
type sendErr struct {
- i *ConnInfo
+ i *plugin.ConnInfo
ch chan error
}
send chan *sendErr
// recv is used to receive connection info from the gRPC stream.
- recv chan *ConnInfo
+ recv chan *plugin.ConnInfo
// quit closes down the stream.
quit chan struct{}
func newGRPCBrokerServer() *gRPCBrokerServer {
return &gRPCBrokerServer{
send: make(chan *sendErr),
- recv: make(chan *ConnInfo),
+ recv: make(chan *plugin.ConnInfo),
quit: make(chan struct{}),
}
}
// StartStream implements the GRPCBrokerServer interface and will block until
// the quit channel is closed or the context reports Done. The stream will pass
// connection information to/from the client.
-func (s *gRPCBrokerServer) StartStream(stream GRPCBroker_StartStreamServer) error {
+func (s *gRPCBrokerServer) StartStream(stream plugin.GRPCBroker_StartStreamServer) error {
doneCh := stream.Context().Done()
defer s.Close()
// Send is used by the GRPCBroker to pass connection information into the stream
// to the client.
-func (s *gRPCBrokerServer) Send(i *ConnInfo) error {
+func (s *gRPCBrokerServer) Send(i *plugin.ConnInfo) error {
ch := make(chan error)
defer close(ch)
// Recv is used by the GRPCBroker to pass connection information that has been
// sent from the client from the stream to the broker.
-func (s *gRPCBrokerServer) Recv() (*ConnInfo, error) {
+func (s *gRPCBrokerServer) Recv() (*plugin.ConnInfo, error) {
select {
case <-s.quit:
return nil, errors.New("broker closed")
// streamer interfaces.
type gRPCBrokerClientImpl struct {
// client is the underlying GRPC client used to make calls to the server.
- client GRPCBrokerClient
+ client plugin.GRPCBrokerClient
// send is used to send connection info to the gRPC stream.
send chan *sendErr
// recv is used to receive connection info from the gRPC stream.
- recv chan *ConnInfo
+ recv chan *plugin.ConnInfo
// quit closes down the stream.
quit chan struct{}
func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl {
return &gRPCBrokerClientImpl{
- client: NewGRPCBrokerClient(conn),
+ client: plugin.NewGRPCBrokerClient(conn),
send: make(chan *sendErr),
- recv: make(chan *ConnInfo),
+ recv: make(chan *plugin.ConnInfo),
quit: make(chan struct{}),
}
}
// Send is used by the GRPCBroker to pass connection information into the stream
// to the plugin.
-func (s *gRPCBrokerClientImpl) Send(i *ConnInfo) error {
+func (s *gRPCBrokerClientImpl) Send(i *plugin.ConnInfo) error {
ch := make(chan error)
defer close(ch)
// Recv is used by the GRPCBroker to pass connection information that has been
// sent from the plugin to the broker.
-func (s *gRPCBrokerClientImpl) Recv() (*ConnInfo, error) {
+func (s *gRPCBrokerClientImpl) Recv() (*plugin.ConnInfo, error) {
select {
case <-s.quit:
return nil, errors.New("broker closed")
}
type gRPCBrokerPending struct {
- ch chan *ConnInfo
+ ch chan *plugin.ConnInfo
doneCh chan struct{}
}
return nil, err
}
- err = b.streamer.Send(&ConnInfo{
+ err = b.streamer.Send(&plugin.ConnInfo{
ServiceId: id,
Network: listener.Addr().Network(),
Address: listener.Addr().String(),
// Dial opens a connection by ID.
func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) {
- var c *ConnInfo
+ var c *plugin.ConnInfo
// Open the stream
p := b.getStream(id)
}
m.streams[id] = &gRPCBrokerPending{
- ch: make(chan *ConnInfo, 1),
+ ch: make(chan *plugin.ConnInfo, 1),
doneCh: make(chan struct{}),
}
return m.streams[id]