aboutsummaryrefslogblamecommitdiffhomepage
path: root/vendor/github.com/hashicorp/go-plugin/rpc_client.go
blob: f30a4b1d38729d10e375ee56561d510a6a182cca (plain) (tree)
1
2
3
4


              
                    

















                                                                            



































                                                                                 





































































































                                                                                           









                                                                             
package plugin

import (
	"crypto/tls"
	"fmt"
	"io"
	"net"
	"net/rpc"

	"github.com/hashicorp/yamux"
)

// RPCClient connects to an RPCServer over net/rpc to dispense plugin types.
type RPCClient struct {
	broker  *MuxBroker
	control *rpc.Client
	plugins map[string]Plugin

	// These are the streams used for the various stdout/err overrides
	stdout, stderr net.Conn
}

// newRPCClient creates a new RPCClient. The Client argument is expected
// to be successfully started already with a lock held.
func newRPCClient(c *Client) (*RPCClient, error) {
	// Connect to the client
	conn, err := net.Dial(c.address.Network(), c.address.String())
	if err != nil {
		return nil, err
	}
	if tcpConn, ok := conn.(*net.TCPConn); ok {
		// Make sure to set keep alive so that the connection doesn't die
		tcpConn.SetKeepAlive(true)
	}

	if c.config.TLSConfig != nil {
		conn = tls.Client(conn, c.config.TLSConfig)
	}

	// Create the actual RPC client
	result, err := NewRPCClient(conn, c.config.Plugins)
	if err != nil {
		conn.Close()
		return nil, err
	}

	// Begin the stream syncing so that stdin, out, err work properly
	err = result.SyncStreams(
		c.config.SyncStdout,
		c.config.SyncStderr)
	if err != nil {
		result.Close()
		return nil, err
	}

	return result, nil
}

// NewRPCClient creates a client from an already-open connection-like value.
// Dial is typically used instead.
func NewRPCClient(conn io.ReadWriteCloser, plugins map[string]Plugin) (*RPCClient, error) {
	// Create the yamux client so we can multiplex
	mux, err := yamux.Client(conn, nil)
	if err != nil {
		conn.Close()
		return nil, err
	}

	// Connect to the control stream.
	control, err := mux.Open()
	if err != nil {
		mux.Close()
		return nil, err
	}

	// Connect stdout, stderr streams
	stdstream := make([]net.Conn, 2)
	for i, _ := range stdstream {
		stdstream[i], err = mux.Open()
		if err != nil {
			mux.Close()
			return nil, err
		}
	}

	// Create the broker and start it up
	broker := newMuxBroker(mux)
	go broker.Run()

	// Build the client using our broker and control channel.
	return &RPCClient{
		broker:  broker,
		control: rpc.NewClient(control),
		plugins: plugins,
		stdout:  stdstream[0],
		stderr:  stdstream[1],
	}, nil
}

// SyncStreams should be called to enable syncing of stdout,
// stderr with the plugin.
//
// This will return immediately and the syncing will continue to happen
// in the background. You do not need to launch this in a goroutine itself.
//
// This should never be called multiple times.
func (c *RPCClient) SyncStreams(stdout io.Writer, stderr io.Writer) error {
	go copyStream("stdout", stdout, c.stdout)
	go copyStream("stderr", stderr, c.stderr)
	return nil
}

// Close closes the connection. The client is no longer usable after this
// is called.
func (c *RPCClient) Close() error {
	// Call the control channel and ask it to gracefully exit. If this
	// errors, then we save it so that we always return an error but we
	// want to try to close the other channels anyways.
	var empty struct{}
	returnErr := c.control.Call("Control.Quit", true, &empty)

	// Close the other streams we have
	if err := c.control.Close(); err != nil {
		return err
	}
	if err := c.stdout.Close(); err != nil {
		return err
	}
	if err := c.stderr.Close(); err != nil {
		return err
	}
	if err := c.broker.Close(); err != nil {
		return err
	}

	// Return back the error we got from Control.Quit. This is very important
	// since we MUST return non-nil error if this fails so that Client.Kill
	// will properly try a process.Kill.
	return returnErr
}

func (c *RPCClient) Dispense(name string) (interface{}, error) {
	p, ok := c.plugins[name]
	if !ok {
		return nil, fmt.Errorf("unknown plugin type: %s", name)
	}

	var id uint32
	if err := c.control.Call(
		"Dispenser.Dispense", name, &id); err != nil {
		return nil, err
	}

	conn, err := c.broker.Dial(id)
	if err != nil {
		return nil, err
	}

	return p.Client(c.broker, rpc.NewClient(conn))
}

// Ping pings the connection to ensure it is still alive.
//
// The error from the RPC call is returned exactly if you want to inspect
// it for further error analysis. Any error returned from here would indicate
// that the connection to the plugin is not healthy.
func (c *RPCClient) Ping() error {
	var empty struct{}
	return c.control.Call("Control.Ping", true, &empty)
}