9 "github.com/hashicorp/yamux"
12 // RPCClient connects to an RPCServer over net/rpc to dispense plugin types.
13 type RPCClient struct {
16 plugins map[string]Plugin
18 // These are the streams used for the various stdout/err overrides
19 stdout, stderr net.Conn
22 // NewRPCClient creates a client from an already-open connection-like value.
23 // Dial is typically used instead.
24 func NewRPCClient(conn io.ReadWriteCloser, plugins map[string]Plugin) (*RPCClient, error) {
25 // Create the yamux client so we can multiplex
26 mux, err := yamux.Client(conn, nil)
32 // Connect to the control stream.
33 control, err := mux.Open()
39 // Connect stdout, stderr streams
40 stdstream := make([]net.Conn, 2)
41 for i, _ := range stdstream {
42 stdstream[i], err = mux.Open()
49 // Create the broker and start it up
50 broker := newMuxBroker(mux)
53 // Build the client using our broker and control channel.
56 control: rpc.NewClient(control),
63 // SyncStreams should be called to enable syncing of stdout,
64 // stderr with the plugin.
66 // This will return immediately and the syncing will continue to happen
67 // in the background. You do not need to launch this in a goroutine itself.
69 // This should never be called multiple times.
70 func (c *RPCClient) SyncStreams(stdout io.Writer, stderr io.Writer) error {
71 go copyStream("stdout", stdout, c.stdout)
72 go copyStream("stderr", stderr, c.stderr)
76 // Close closes the connection. The client is no longer usable after this
78 func (c *RPCClient) Close() error {
79 // Call the control channel and ask it to gracefully exit. If this
80 // errors, then we save it so that we always return an error but we
81 // want to try to close the other channels anyways.
83 returnErr := c.control.Call("Control.Quit", true, &empty)
85 // Close the other streams we have
86 if err := c.control.Close(); err != nil {
89 if err := c.stdout.Close(); err != nil {
92 if err := c.stderr.Close(); err != nil {
95 if err := c.broker.Close(); err != nil {
99 // Return back the error we got from Control.Quit. This is very important
100 // since we MUST return non-nil error if this fails so that Client.Kill
101 // will properly try a process.Kill.
105 func (c *RPCClient) Dispense(name string) (interface{}, error) {
106 p, ok := c.plugins[name]
108 return nil, fmt.Errorf("unknown plugin type: %s", name)
112 if err := c.control.Call(
113 "Dispenser.Dispense", name, &id); err != nil {
117 conn, err := c.broker.Dial(id)
122 return p.Client(c.broker, rpc.NewClient(conn))