]>
Commit | Line | Data |
---|---|---|
bae9f6d2 JC |
1 | package plugin |
2 | ||
3 | import ( | |
15c0b25d | 4 | "crypto/tls" |
bae9f6d2 JC |
5 | "fmt" |
6 | "io" | |
7 | "net" | |
8 | "net/rpc" | |
9 | ||
10 | "github.com/hashicorp/yamux" | |
11 | ) | |
12 | ||
13 | // RPCClient connects to an RPCServer over net/rpc to dispense plugin types. | |
14 | type RPCClient struct { | |
15 | broker *MuxBroker | |
16 | control *rpc.Client | |
17 | plugins map[string]Plugin | |
18 | ||
19 | // These are the streams used for the various stdout/err overrides | |
20 | stdout, stderr net.Conn | |
21 | } | |
22 | ||
15c0b25d AP |
23 | // newRPCClient creates a new RPCClient. The Client argument is expected |
24 | // to be successfully started already with a lock held. | |
25 | func newRPCClient(c *Client) (*RPCClient, error) { | |
26 | // Connect to the client | |
27 | conn, err := net.Dial(c.address.Network(), c.address.String()) | |
28 | if err != nil { | |
29 | return nil, err | |
30 | } | |
31 | if tcpConn, ok := conn.(*net.TCPConn); ok { | |
32 | // Make sure to set keep alive so that the connection doesn't die | |
33 | tcpConn.SetKeepAlive(true) | |
34 | } | |
35 | ||
36 | if c.config.TLSConfig != nil { | |
37 | conn = tls.Client(conn, c.config.TLSConfig) | |
38 | } | |
39 | ||
40 | // Create the actual RPC client | |
41 | result, err := NewRPCClient(conn, c.config.Plugins) | |
42 | if err != nil { | |
43 | conn.Close() | |
44 | return nil, err | |
45 | } | |
46 | ||
47 | // Begin the stream syncing so that stdin, out, err work properly | |
48 | err = result.SyncStreams( | |
49 | c.config.SyncStdout, | |
50 | c.config.SyncStderr) | |
51 | if err != nil { | |
52 | result.Close() | |
53 | return nil, err | |
54 | } | |
55 | ||
56 | return result, nil | |
57 | } | |
58 | ||
bae9f6d2 JC |
59 | // NewRPCClient creates a client from an already-open connection-like value. |
60 | // Dial is typically used instead. | |
61 | func NewRPCClient(conn io.ReadWriteCloser, plugins map[string]Plugin) (*RPCClient, error) { | |
62 | // Create the yamux client so we can multiplex | |
63 | mux, err := yamux.Client(conn, nil) | |
64 | if err != nil { | |
65 | conn.Close() | |
66 | return nil, err | |
67 | } | |
68 | ||
69 | // Connect to the control stream. | |
70 | control, err := mux.Open() | |
71 | if err != nil { | |
72 | mux.Close() | |
73 | return nil, err | |
74 | } | |
75 | ||
76 | // Connect stdout, stderr streams | |
77 | stdstream := make([]net.Conn, 2) | |
78 | for i, _ := range stdstream { | |
79 | stdstream[i], err = mux.Open() | |
80 | if err != nil { | |
81 | mux.Close() | |
82 | return nil, err | |
83 | } | |
84 | } | |
85 | ||
86 | // Create the broker and start it up | |
87 | broker := newMuxBroker(mux) | |
88 | go broker.Run() | |
89 | ||
90 | // Build the client using our broker and control channel. | |
91 | return &RPCClient{ | |
92 | broker: broker, | |
93 | control: rpc.NewClient(control), | |
94 | plugins: plugins, | |
95 | stdout: stdstream[0], | |
96 | stderr: stdstream[1], | |
97 | }, nil | |
98 | } | |
99 | ||
100 | // SyncStreams should be called to enable syncing of stdout, | |
101 | // stderr with the plugin. | |
102 | // | |
103 | // This will return immediately and the syncing will continue to happen | |
104 | // in the background. You do not need to launch this in a goroutine itself. | |
105 | // | |
106 | // This should never be called multiple times. | |
107 | func (c *RPCClient) SyncStreams(stdout io.Writer, stderr io.Writer) error { | |
108 | go copyStream("stdout", stdout, c.stdout) | |
109 | go copyStream("stderr", stderr, c.stderr) | |
110 | return nil | |
111 | } | |
112 | ||
113 | // Close closes the connection. The client is no longer usable after this | |
114 | // is called. | |
115 | func (c *RPCClient) Close() error { | |
116 | // Call the control channel and ask it to gracefully exit. If this | |
117 | // errors, then we save it so that we always return an error but we | |
118 | // want to try to close the other channels anyways. | |
119 | var empty struct{} | |
120 | returnErr := c.control.Call("Control.Quit", true, &empty) | |
121 | ||
122 | // Close the other streams we have | |
123 | if err := c.control.Close(); err != nil { | |
124 | return err | |
125 | } | |
126 | if err := c.stdout.Close(); err != nil { | |
127 | return err | |
128 | } | |
129 | if err := c.stderr.Close(); err != nil { | |
130 | return err | |
131 | } | |
132 | if err := c.broker.Close(); err != nil { | |
133 | return err | |
134 | } | |
135 | ||
136 | // Return back the error we got from Control.Quit. This is very important | |
137 | // since we MUST return non-nil error if this fails so that Client.Kill | |
138 | // will properly try a process.Kill. | |
139 | return returnErr | |
140 | } | |
141 | ||
142 | func (c *RPCClient) Dispense(name string) (interface{}, error) { | |
143 | p, ok := c.plugins[name] | |
144 | if !ok { | |
145 | return nil, fmt.Errorf("unknown plugin type: %s", name) | |
146 | } | |
147 | ||
148 | var id uint32 | |
149 | if err := c.control.Call( | |
150 | "Dispenser.Dispense", name, &id); err != nil { | |
151 | return nil, err | |
152 | } | |
153 | ||
154 | conn, err := c.broker.Dial(id) | |
155 | if err != nil { | |
156 | return nil, err | |
157 | } | |
158 | ||
159 | return p.Client(c.broker, rpc.NewClient(conn)) | |
160 | } | |
15c0b25d AP |
161 | |
162 | // Ping pings the connection to ensure it is still alive. | |
163 | // | |
164 | // The error from the RPC call is returned exactly if you want to inspect | |
165 | // it for further error analysis. Any error returned from here would indicate | |
166 | // that the connection to the plugin is not healthy. | |
167 | func (c *RPCClient) Ping() error { | |
168 | var empty struct{} | |
169 | return c.control.Call("Control.Ping", true, &empty) | |
170 | } |