]>
Commit | Line | Data |
---|---|---|
bae9f6d2 JC |
1 | package plugin |
2 | ||
import (
	"bufio"
	"context"
	"crypto/subtle"
	"crypto/tls"
	"crypto/x509"
	"encoding/base64"
	"errors"
	"fmt"
	"hash"
	"io"
	"io/ioutil"
	"net"
	"os"
	"os/exec"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	hclog "github.com/hashicorp/go-hclog"
)
27 | ||
28 | // If this is 1, then we've called CleanupClients. This can be used | |
29 | // by plugin RPC implementations to change error behavior since you | |
30 | // can expected network connection errors at this point. This should be | |
31 | // read by using sync/atomic. | |
32 | var Killed uint32 = 0 | |
33 | ||
34 | // This is a slice of the "managed" clients which are cleaned up when | |
35 | // calling Cleanup | |
36 | var managedClients = make([]*Client, 0, 5) | |
37 | var managedClientsLock sync.Mutex | |
38 | ||
// Error types
var (
	// ErrProcessNotFound is returned when a client is instantiated to
	// reattach to an existing process and it isn't found.
	ErrProcessNotFound = errors.New("Reattachment process not found")

	// ErrChecksumsDoNotMatch is returned when binary's checksum doesn't match
	// the one provided in the SecureConfig.
	ErrChecksumsDoNotMatch = errors.New("checksums did not match")

	// ErrSecureConfigNoChecksum is returned when an empty checksum is provided
	// to the SecureConfig.
	ErrSecureConfigNoChecksum = errors.New("no checksum provided")

	// ErrSecureConfigNoHash is returned when a nil Hash object is provided to
	// the SecureConfig.
	ErrSecureConfigNoHash = errors.New("no hash implementation provided")

	// ErrSecureConfigAndReattach is returned when both Reattach and
	// SecureConfig are set.
	ErrSecureConfigAndReattach = errors.New("only one of Reattach or SecureConfig can be set")
)
61 | ||
62 | // Client handles the lifecycle of a plugin application. It launches | |
63 | // plugins, connects to them, dispenses interface implementations, and handles | |
64 | // killing the process. | |
65 | // | |
66 | // Plugin hosts should use one Client for each plugin executable. To | |
67 | // dispense a plugin type, use the `Client.Client` function, and then | |
68 | // cal `Dispense`. This awkward API is mostly historical but is used to split | |
69 | // the client that deals with subprocess management and the client that | |
70 | // does RPC management. | |
71 | // | |
72 | // See NewClient and ClientConfig for using a Client. | |
73 | type Client struct { | |
107c1cdb ND |
74 | config *ClientConfig |
75 | exited bool | |
76 | l sync.Mutex | |
77 | address net.Addr | |
78 | process *os.Process | |
79 | client ClientProtocol | |
80 | protocol Protocol | |
81 | logger hclog.Logger | |
82 | doneCtx context.Context | |
83 | ctxCancel context.CancelFunc | |
84 | negotiatedVersion int | |
85 | ||
86 | // clientWaitGroup is used to manage the lifecycle of the plugin management | |
87 | // goroutines. | |
88 | clientWaitGroup sync.WaitGroup | |
89 | ||
863486a6 AG |
90 | // stderrWaitGroup is used to prevent the command's Wait() function from |
91 | // being called before we've finished reading from the stderr pipe. | |
92 | stderrWaitGroup sync.WaitGroup | |
93 | ||
107c1cdb ND |
94 | // processKilled is used for testing only, to flag when the process was |
95 | // forcefully killed. | |
96 | processKilled bool | |
97 | } | |
98 | ||
99 | // NegotiatedVersion returns the protocol version negotiated with the server. | |
100 | // This is only valid after Start() is called. | |
101 | func (c *Client) NegotiatedVersion() int { | |
102 | return c.negotiatedVersion | |
bae9f6d2 JC |
103 | } |
104 | ||
105 | // ClientConfig is the configuration used to initialize a new | |
106 | // plugin client. After being used to initialize a plugin client, | |
107 | // that configuration must not be modified again. | |
108 | type ClientConfig struct { | |
109 | // HandshakeConfig is the configuration that must match servers. | |
110 | HandshakeConfig | |
111 | ||
112 | // Plugins are the plugins that can be consumed. | |
107c1cdb ND |
113 | // The implied version of this PluginSet is the Handshake.ProtocolVersion. |
114 | Plugins PluginSet | |
115 | ||
116 | // VersionedPlugins is a map of PluginSets for specific protocol versions. | |
117 | // These can be used to negotiate a compatible version between client and | |
118 | // server. If this is set, Handshake.ProtocolVersion is not required. | |
119 | VersionedPlugins map[int]PluginSet | |
bae9f6d2 JC |
120 | |
121 | // One of the following must be set, but not both. | |
122 | // | |
123 | // Cmd is the unstarted subprocess for starting the plugin. If this is | |
124 | // set, then the Client starts the plugin process on its own and connects | |
125 | // to it. | |
126 | // | |
127 | // Reattach is configuration for reattaching to an existing plugin process | |
128 | // that is already running. This isn't common. | |
129 | Cmd *exec.Cmd | |
130 | Reattach *ReattachConfig | |
131 | ||
15c0b25d AP |
132 | // SecureConfig is configuration for verifying the integrity of the |
133 | // executable. It can not be used with Reattach. | |
134 | SecureConfig *SecureConfig | |
135 | ||
136 | // TLSConfig is used to enable TLS on the RPC client. | |
137 | TLSConfig *tls.Config | |
138 | ||
bae9f6d2 JC |
139 | // Managed represents if the client should be managed by the |
140 | // plugin package or not. If true, then by calling CleanupClients, | |
141 | // it will automatically be cleaned up. Otherwise, the client | |
142 | // user is fully responsible for making sure to Kill all plugin | |
143 | // clients. By default the client is _not_ managed. | |
144 | Managed bool | |
145 | ||
146 | // The minimum and maximum port to use for communicating with | |
147 | // the subprocess. If not set, this defaults to 10,000 and 25,000 | |
148 | // respectively. | |
149 | MinPort, MaxPort uint | |
150 | ||
151 | // StartTimeout is the timeout to wait for the plugin to say it | |
152 | // has started successfully. | |
153 | StartTimeout time.Duration | |
154 | ||
155 | // If non-nil, then the stderr of the client will be written to here | |
156 | // (as well as the log). This is the original os.Stderr of the subprocess. | |
157 | // This isn't the output of synced stderr. | |
158 | Stderr io.Writer | |
159 | ||
160 | // SyncStdout, SyncStderr can be set to override the | |
161 | // respective os.Std* values in the plugin. Care should be taken to | |
162 | // avoid races here. If these are nil, then this will automatically be | |
163 | // hooked up to os.Stdin, Stdout, and Stderr, respectively. | |
164 | // | |
165 | // If the default values (nil) are used, then this package will not | |
166 | // sync any of these streams. | |
167 | SyncStdout io.Writer | |
168 | SyncStderr io.Writer | |
15c0b25d AP |
169 | |
170 | // AllowedProtocols is a list of allowed protocols. If this isn't set, | |
171 | // then only netrpc is allowed. This is so that older go-plugin systems | |
172 | // can show friendly errors if they see a plugin with an unknown | |
173 | // protocol. | |
174 | // | |
175 | // By setting this, you can cause an error immediately on plugin start | |
176 | // if an unsupported protocol is used with a good error message. | |
177 | // | |
178 | // If this isn't set at all (nil value), then only net/rpc is accepted. | |
179 | // This is done for legacy reasons. You must explicitly opt-in to | |
180 | // new protocols. | |
181 | AllowedProtocols []Protocol | |
182 | ||
183 | // Logger is the logger that the client will used. If none is provided, | |
184 | // it will default to hclog's default logger. | |
185 | Logger hclog.Logger | |
107c1cdb ND |
186 | |
187 | // AutoMTLS has the client and server automatically negotiate mTLS for | |
188 | // transport authentication. This ensures that only the original client will | |
189 | // be allowed to connect to the server, and all other connections will be | |
190 | // rejected. The client will also refuse to connect to any server that isn't | |
191 | // the original instance started by the client. | |
192 | // | |
193 | // In this mode of operation, the client generates a one-time use tls | |
194 | // certificate, sends the public x.509 certificate to the new server, and | |
195 | // the server generates a one-time use tls certificate, and sends the public | |
196 | // x.509 certificate back to the client. These are used to authenticate all | |
197 | // rpc connections between the client and server. | |
198 | // | |
199 | // Setting AutoMTLS to true implies that the server must support the | |
200 | // protocol, and correctly negotiate the tls certificates, or a connection | |
201 | // failure will result. | |
202 | // | |
203 | // The client should not set TLSConfig, nor should the server set a | |
204 | // TLSProvider, because AutoMTLS implies that a new certificate and tls | |
205 | // configuration will be generated at startup. | |
206 | // | |
207 | // You cannot Reattach to a server with this option enabled. | |
208 | AutoMTLS bool | |
bae9f6d2 JC |
209 | } |
210 | ||
211 | // ReattachConfig is used to configure a client to reattach to an | |
212 | // already-running plugin process. You can retrieve this information by | |
213 | // calling ReattachConfig on Client. | |
214 | type ReattachConfig struct { | |
15c0b25d AP |
215 | Protocol Protocol |
216 | Addr net.Addr | |
217 | Pid int | |
218 | } | |
219 | ||
220 | // SecureConfig is used to configure a client to verify the integrity of an | |
221 | // executable before running. It does this by verifying the checksum is | |
222 | // expected. Hash is used to specify the hashing method to use when checksumming | |
223 | // the file. The configuration is verified by the client by calling the | |
224 | // SecureConfig.Check() function. | |
225 | // | |
226 | // The host process should ensure the checksum was provided by a trusted and | |
227 | // authoritative source. The binary should be installed in such a way that it | |
228 | // can not be modified by an unauthorized user between the time of this check | |
229 | // and the time of execution. | |
230 | type SecureConfig struct { | |
231 | Checksum []byte | |
232 | Hash hash.Hash | |
233 | } | |
234 | ||
235 | // Check takes the filepath to an executable and returns true if the checksum of | |
236 | // the file matches the checksum provided in the SecureConfig. | |
237 | func (s *SecureConfig) Check(filePath string) (bool, error) { | |
238 | if len(s.Checksum) == 0 { | |
239 | return false, ErrSecureConfigNoChecksum | |
240 | } | |
241 | ||
242 | if s.Hash == nil { | |
243 | return false, ErrSecureConfigNoHash | |
244 | } | |
245 | ||
246 | file, err := os.Open(filePath) | |
247 | if err != nil { | |
248 | return false, err | |
249 | } | |
250 | defer file.Close() | |
251 | ||
252 | _, err = io.Copy(s.Hash, file) | |
253 | if err != nil { | |
254 | return false, err | |
255 | } | |
256 | ||
257 | sum := s.Hash.Sum(nil) | |
258 | ||
259 | return subtle.ConstantTimeCompare(sum, s.Checksum) == 1, nil | |
bae9f6d2 JC |
260 | } |
261 | ||
262 | // This makes sure all the managed subprocesses are killed and properly | |
263 | // logged. This should be called before the parent process running the | |
264 | // plugins exits. | |
265 | // | |
266 | // This must only be called _once_. | |
267 | func CleanupClients() { | |
268 | // Set the killed to true so that we don't get unexpected panics | |
269 | atomic.StoreUint32(&Killed, 1) | |
270 | ||
271 | // Kill all the managed clients in parallel and use a WaitGroup | |
272 | // to wait for them all to finish up. | |
273 | var wg sync.WaitGroup | |
274 | managedClientsLock.Lock() | |
275 | for _, client := range managedClients { | |
276 | wg.Add(1) | |
277 | ||
278 | go func(client *Client) { | |
279 | client.Kill() | |
280 | wg.Done() | |
281 | }(client) | |
282 | } | |
283 | managedClientsLock.Unlock() | |
284 | ||
bae9f6d2 JC |
285 | wg.Wait() |
286 | } | |
287 | ||
288 | // Creates a new plugin client which manages the lifecycle of an external | |
289 | // plugin and gets the address for the RPC connection. | |
290 | // | |
291 | // The client must be cleaned up at some point by calling Kill(). If | |
292 | // the client is a managed client (created with NewManagedClient) you | |
293 | // can just call CleanupClients at the end of your program and they will | |
294 | // be properly cleaned. | |
295 | func NewClient(config *ClientConfig) (c *Client) { | |
296 | if config.MinPort == 0 && config.MaxPort == 0 { | |
297 | config.MinPort = 10000 | |
298 | config.MaxPort = 25000 | |
299 | } | |
300 | ||
301 | if config.StartTimeout == 0 { | |
302 | config.StartTimeout = 1 * time.Minute | |
303 | } | |
304 | ||
305 | if config.Stderr == nil { | |
306 | config.Stderr = ioutil.Discard | |
307 | } | |
308 | ||
309 | if config.SyncStdout == nil { | |
310 | config.SyncStdout = ioutil.Discard | |
311 | } | |
312 | if config.SyncStderr == nil { | |
313 | config.SyncStderr = ioutil.Discard | |
314 | } | |
315 | ||
15c0b25d AP |
316 | if config.AllowedProtocols == nil { |
317 | config.AllowedProtocols = []Protocol{ProtocolNetRPC} | |
318 | } | |
319 | ||
320 | if config.Logger == nil { | |
321 | config.Logger = hclog.New(&hclog.LoggerOptions{ | |
322 | Output: hclog.DefaultOutput, | |
323 | Level: hclog.Trace, | |
324 | Name: "plugin", | |
325 | }) | |
326 | } | |
327 | ||
328 | c = &Client{ | |
329 | config: config, | |
330 | logger: config.Logger, | |
331 | } | |
bae9f6d2 JC |
332 | if config.Managed { |
333 | managedClientsLock.Lock() | |
334 | managedClients = append(managedClients, c) | |
335 | managedClientsLock.Unlock() | |
336 | } | |
337 | ||
338 | return | |
339 | } | |
340 | ||
15c0b25d | 341 | // Client returns the protocol client for this connection. |
bae9f6d2 | 342 | // |
15c0b25d AP |
343 | // Subsequent calls to this will return the same client. |
344 | func (c *Client) Client() (ClientProtocol, error) { | |
345 | _, err := c.Start() | |
bae9f6d2 JC |
346 | if err != nil { |
347 | return nil, err | |
348 | } | |
349 | ||
350 | c.l.Lock() | |
351 | defer c.l.Unlock() | |
352 | ||
353 | if c.client != nil { | |
354 | return c.client, nil | |
355 | } | |
356 | ||
15c0b25d AP |
357 | switch c.protocol { |
358 | case ProtocolNetRPC: | |
359 | c.client, err = newRPCClient(c) | |
bae9f6d2 | 360 | |
15c0b25d AP |
361 | case ProtocolGRPC: |
362 | c.client, err = newGRPCClient(c.doneCtx, c) | |
363 | ||
364 | default: | |
365 | return nil, fmt.Errorf("unknown server protocol: %s", c.protocol) | |
bae9f6d2 JC |
366 | } |
367 | ||
bae9f6d2 | 368 | if err != nil { |
bae9f6d2 JC |
369 | c.client = nil |
370 | return nil, err | |
371 | } | |
372 | ||
373 | return c.client, nil | |
374 | } | |
375 | ||
376 | // Tells whether or not the underlying process has exited. | |
377 | func (c *Client) Exited() bool { | |
378 | c.l.Lock() | |
379 | defer c.l.Unlock() | |
380 | return c.exited | |
381 | } | |
382 | ||
107c1cdb ND |
383 | // killed is used in tests to check if a process failed to exit gracefully, and |
384 | // needed to be killed. | |
385 | func (c *Client) killed() bool { | |
386 | c.l.Lock() | |
387 | defer c.l.Unlock() | |
388 | return c.processKilled | |
389 | } | |
390 | ||
bae9f6d2 JC |
391 | // End the executing subprocess (if it is running) and perform any cleanup |
392 | // tasks necessary such as capturing any remaining logs and so on. | |
393 | // | |
394 | // This method blocks until the process successfully exits. | |
395 | // | |
396 | // This method can safely be called multiple times. | |
397 | func (c *Client) Kill() { | |
398 | // Grab a lock to read some private fields. | |
399 | c.l.Lock() | |
400 | process := c.process | |
401 | addr := c.address | |
bae9f6d2 JC |
402 | c.l.Unlock() |
403 | ||
107c1cdb | 404 | // If there is no process, there is nothing to kill. |
bae9f6d2 JC |
405 | if process == nil { |
406 | return | |
407 | } | |
408 | ||
107c1cdb ND |
409 | defer func() { |
410 | // Wait for the all client goroutines to finish. | |
411 | c.clientWaitGroup.Wait() | |
412 | ||
413 | // Make sure there is no reference to the old process after it has been | |
414 | // killed. | |
415 | c.l.Lock() | |
416 | c.process = nil | |
417 | c.l.Unlock() | |
418 | }() | |
419 | ||
bae9f6d2 JC |
420 | // We need to check for address here. It is possible that the plugin |
421 | // started (process != nil) but has no address (addr == nil) if the | |
422 | // plugin failed at startup. If we do have an address, we need to close | |
423 | // the plugin net connections. | |
424 | graceful := false | |
425 | if addr != nil { | |
426 | // Close the client to cleanly exit the process. | |
427 | client, err := c.Client() | |
428 | if err == nil { | |
429 | err = client.Close() | |
430 | ||
431 | // If there is no error, then we attempt to wait for a graceful | |
432 | // exit. If there was an error, we assume that graceful cleanup | |
433 | // won't happen and just force kill. | |
434 | graceful = err == nil | |
435 | if err != nil { | |
436 | // If there was an error just log it. We're going to force | |
437 | // kill in a moment anyways. | |
15c0b25d | 438 | c.logger.Warn("error closing client during Kill", "err", err) |
bae9f6d2 | 439 | } |
107c1cdb ND |
440 | } else { |
441 | c.logger.Error("client", "error", err) | |
bae9f6d2 JC |
442 | } |
443 | } | |
444 | ||
445 | // If we're attempting a graceful exit, then we wait for a short period | |
446 | // of time to allow that to happen. To wait for this we just wait on the | |
447 | // doneCh which would be closed if the process exits. | |
448 | if graceful { | |
449 | select { | |
107c1cdb ND |
450 | case <-c.doneCtx.Done(): |
451 | c.logger.Debug("plugin exited") | |
bae9f6d2 | 452 | return |
107c1cdb | 453 | case <-time.After(2 * time.Second): |
bae9f6d2 JC |
454 | } |
455 | } | |
456 | ||
457 | // If graceful exiting failed, just kill it | |
107c1cdb | 458 | c.logger.Warn("plugin failed to exit gracefully") |
bae9f6d2 JC |
459 | process.Kill() |
460 | ||
107c1cdb ND |
461 | c.l.Lock() |
462 | c.processKilled = true | |
463 | c.l.Unlock() | |
bae9f6d2 JC |
464 | } |
465 | ||
466 | // Starts the underlying subprocess, communicating with it to negotiate | |
467 | // a port for RPC connections, and returning the address to connect via RPC. | |
468 | // | |
469 | // This method is safe to call multiple times. Subsequent calls have no effect. | |
470 | // Once a client has been started once, it cannot be started again, even if | |
471 | // it was killed. | |
472 | func (c *Client) Start() (addr net.Addr, err error) { | |
473 | c.l.Lock() | |
474 | defer c.l.Unlock() | |
475 | ||
476 | if c.address != nil { | |
477 | return c.address, nil | |
478 | } | |
479 | ||
480 | // If one of cmd or reattach isn't set, then it is an error. We wrap | |
481 | // this in a {} for scoping reasons, and hopeful that the escape | |
107c1cdb | 482 | // analysis will pop the stack here. |
bae9f6d2 JC |
483 | { |
484 | cmdSet := c.config.Cmd != nil | |
485 | attachSet := c.config.Reattach != nil | |
15c0b25d | 486 | secureSet := c.config.SecureConfig != nil |
bae9f6d2 JC |
487 | if cmdSet == attachSet { |
488 | return nil, fmt.Errorf("Only one of Cmd or Reattach must be set") | |
489 | } | |
15c0b25d AP |
490 | |
491 | if secureSet && attachSet { | |
492 | return nil, ErrSecureConfigAndReattach | |
493 | } | |
bae9f6d2 JC |
494 | } |
495 | ||
bae9f6d2 | 496 | if c.config.Reattach != nil { |
107c1cdb ND |
497 | return c.reattach() |
498 | } | |
bae9f6d2 | 499 | |
107c1cdb ND |
500 | if c.config.VersionedPlugins == nil { |
501 | c.config.VersionedPlugins = make(map[int]PluginSet) | |
502 | } | |
bae9f6d2 | 503 | |
107c1cdb ND |
504 | // handle all plugins as versioned, using the handshake config as the default. |
505 | version := int(c.config.ProtocolVersion) | |
506 | ||
507 | // Make sure we're not overwriting a real version 0. If ProtocolVersion was | |
508 | // non-zero, then we have to just assume the user made sure that | |
509 | // VersionedPlugins doesn't conflict. | |
510 | if _, ok := c.config.VersionedPlugins[version]; !ok && c.config.Plugins != nil { | |
511 | c.config.VersionedPlugins[version] = c.config.Plugins | |
512 | } | |
513 | ||
514 | var versionStrings []string | |
515 | for v := range c.config.VersionedPlugins { | |
516 | versionStrings = append(versionStrings, strconv.Itoa(v)) | |
bae9f6d2 JC |
517 | } |
518 | ||
519 | env := []string{ | |
520 | fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue), | |
521 | fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort), | |
522 | fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort), | |
107c1cdb | 523 | fmt.Sprintf("PLUGIN_PROTOCOL_VERSIONS=%s", strings.Join(versionStrings, ",")), |
bae9f6d2 JC |
524 | } |
525 | ||
bae9f6d2 JC |
526 | cmd := c.config.Cmd |
527 | cmd.Env = append(cmd.Env, os.Environ()...) | |
528 | cmd.Env = append(cmd.Env, env...) | |
529 | cmd.Stdin = os.Stdin | |
107c1cdb ND |
530 | |
531 | cmdStdout, err := cmd.StdoutPipe() | |
532 | if err != nil { | |
533 | return nil, err | |
534 | } | |
535 | cmdStderr, err := cmd.StderrPipe() | |
536 | if err != nil { | |
537 | return nil, err | |
538 | } | |
bae9f6d2 | 539 | |
15c0b25d AP |
540 | if c.config.SecureConfig != nil { |
541 | if ok, err := c.config.SecureConfig.Check(cmd.Path); err != nil { | |
542 | return nil, fmt.Errorf("error verifying checksum: %s", err) | |
543 | } else if !ok { | |
544 | return nil, ErrChecksumsDoNotMatch | |
545 | } | |
546 | } | |
547 | ||
107c1cdb ND |
548 | // Setup a temporary certificate for client/server mtls, and send the public |
549 | // certificate to the plugin. | |
550 | if c.config.AutoMTLS { | |
551 | c.logger.Info("configuring client automatic mTLS") | |
552 | certPEM, keyPEM, err := generateCert() | |
553 | if err != nil { | |
554 | c.logger.Error("failed to generate client certificate", "error", err) | |
555 | return nil, err | |
556 | } | |
557 | cert, err := tls.X509KeyPair(certPEM, keyPEM) | |
558 | if err != nil { | |
559 | c.logger.Error("failed to parse client certificate", "error", err) | |
560 | return nil, err | |
561 | } | |
562 | ||
563 | cmd.Env = append(cmd.Env, fmt.Sprintf("PLUGIN_CLIENT_CERT=%s", certPEM)) | |
564 | ||
565 | c.config.TLSConfig = &tls.Config{ | |
566 | Certificates: []tls.Certificate{cert}, | |
567 | ServerName: "localhost", | |
568 | } | |
569 | } | |
570 | ||
15c0b25d | 571 | c.logger.Debug("starting plugin", "path", cmd.Path, "args", cmd.Args) |
bae9f6d2 JC |
572 | err = cmd.Start() |
573 | if err != nil { | |
574 | return | |
575 | } | |
576 | ||
577 | // Set the process | |
578 | c.process = cmd.Process | |
107c1cdb | 579 | c.logger.Debug("plugin started", "path", cmd.Path, "pid", c.process.Pid) |
bae9f6d2 JC |
580 | |
581 | // Make sure the command is properly cleaned up if there is an error | |
582 | defer func() { | |
583 | r := recover() | |
584 | ||
585 | if err != nil || r != nil { | |
586 | cmd.Process.Kill() | |
587 | } | |
588 | ||
589 | if r != nil { | |
590 | panic(r) | |
591 | } | |
592 | }() | |
593 | ||
107c1cdb ND |
594 | // Create a context for when we kill |
595 | c.doneCtx, c.ctxCancel = context.WithCancel(context.Background()) | |
596 | ||
863486a6 AG |
597 | // Start goroutine that logs the stderr |
598 | c.clientWaitGroup.Add(1) | |
599 | c.stderrWaitGroup.Add(1) | |
600 | // logStderr calls Done() | |
601 | go c.logStderr(cmdStderr) | |
602 | ||
107c1cdb | 603 | c.clientWaitGroup.Add(1) |
bae9f6d2 | 604 | go func() { |
107c1cdb ND |
605 | // ensure the context is cancelled when we're done |
606 | defer c.ctxCancel() | |
607 | ||
608 | defer c.clientWaitGroup.Done() | |
609 | ||
610 | // get the cmd info early, since the process information will be removed | |
611 | // in Kill. | |
612 | pid := c.process.Pid | |
613 | path := cmd.Path | |
bae9f6d2 | 614 | |
863486a6 AG |
615 | // wait to finish reading from stderr since the stderr pipe reader |
616 | // will be closed by the subsequent call to cmd.Wait(). | |
617 | c.stderrWaitGroup.Wait() | |
618 | ||
bae9f6d2 | 619 | // Wait for the command to end. |
107c1cdb ND |
620 | err := cmd.Wait() |
621 | ||
622 | debugMsgArgs := []interface{}{ | |
623 | "path", path, | |
624 | "pid", pid, | |
625 | } | |
626 | if err != nil { | |
627 | debugMsgArgs = append(debugMsgArgs, | |
628 | []interface{}{"error", err.Error()}...) | |
629 | } | |
bae9f6d2 JC |
630 | |
631 | // Log and make sure to flush the logs write away | |
107c1cdb | 632 | c.logger.Debug("plugin process exited", debugMsgArgs...) |
bae9f6d2 JC |
633 | os.Stderr.Sync() |
634 | ||
bae9f6d2 JC |
635 | // Set that we exited, which takes a lock |
636 | c.l.Lock() | |
637 | defer c.l.Unlock() | |
638 | c.exited = true | |
639 | }() | |
640 | ||
bae9f6d2 JC |
641 | // Start a goroutine that is going to be reading the lines |
642 | // out of stdout | |
107c1cdb ND |
643 | linesCh := make(chan string) |
644 | c.clientWaitGroup.Add(1) | |
bae9f6d2 | 645 | go func() { |
107c1cdb | 646 | defer c.clientWaitGroup.Done() |
bae9f6d2 JC |
647 | defer close(linesCh) |
648 | ||
107c1cdb ND |
649 | scanner := bufio.NewScanner(cmdStdout) |
650 | for scanner.Scan() { | |
651 | linesCh <- scanner.Text() | |
bae9f6d2 JC |
652 | } |
653 | }() | |
654 | ||
655 | // Make sure after we exit we read the lines from stdout forever | |
107c1cdb ND |
656 | // so they don't block since it is a pipe. |
657 | // The scanner goroutine above will close this, but track it with a wait | |
658 | // group for completeness. | |
659 | c.clientWaitGroup.Add(1) | |
bae9f6d2 JC |
660 | defer func() { |
661 | go func() { | |
107c1cdb ND |
662 | defer c.clientWaitGroup.Done() |
663 | for range linesCh { | |
bae9f6d2 JC |
664 | } |
665 | }() | |
666 | }() | |
667 | ||
668 | // Some channels for the next step | |
669 | timeout := time.After(c.config.StartTimeout) | |
670 | ||
671 | // Start looking for the address | |
15c0b25d | 672 | c.logger.Debug("waiting for RPC address", "path", cmd.Path) |
bae9f6d2 JC |
673 | select { |
674 | case <-timeout: | |
675 | err = errors.New("timeout while waiting for plugin to start") | |
107c1cdb | 676 | case <-c.doneCtx.Done(): |
bae9f6d2 | 677 | err = errors.New("plugin exited before we could connect") |
107c1cdb | 678 | case line := <-linesCh: |
bae9f6d2 JC |
679 | // Trim the line and split by "|" in order to get the parts of |
680 | // the output. | |
107c1cdb | 681 | line = strings.TrimSpace(line) |
15c0b25d | 682 | parts := strings.SplitN(line, "|", 6) |
bae9f6d2 JC |
683 | if len(parts) < 4 { |
684 | err = fmt.Errorf( | |
685 | "Unrecognized remote plugin message: %s\n\n"+ | |
686 | "This usually means that the plugin is either invalid or simply\n"+ | |
687 | "needs to be recompiled to support the latest protocol.", line) | |
688 | return | |
689 | } | |
690 | ||
691 | // Check the core protocol. Wrapped in a {} for scoping. | |
692 | { | |
693 | var coreProtocol int64 | |
694 | coreProtocol, err = strconv.ParseInt(parts[0], 10, 0) | |
695 | if err != nil { | |
696 | err = fmt.Errorf("Error parsing core protocol version: %s", err) | |
697 | return | |
698 | } | |
699 | ||
700 | if int(coreProtocol) != CoreProtocolVersion { | |
701 | err = fmt.Errorf("Incompatible core API version with plugin. "+ | |
15c0b25d | 702 | "Plugin version: %s, Core version: %d\n\n"+ |
bae9f6d2 JC |
703 | "To fix this, the plugin usually only needs to be recompiled.\n"+ |
704 | "Please report this to the plugin author.", parts[0], CoreProtocolVersion) | |
705 | return | |
706 | } | |
707 | } | |
708 | ||
107c1cdb ND |
709 | // Test the API version |
710 | version, pluginSet, err := c.checkProtoVersion(parts[1]) | |
bae9f6d2 | 711 | if err != nil { |
107c1cdb | 712 | return addr, err |
bae9f6d2 JC |
713 | } |
714 | ||
107c1cdb ND |
715 | // set the Plugins value to the compatible set, so the version |
716 | // doesn't need to be passed through to the ClientProtocol | |
717 | // implementation. | |
718 | c.config.Plugins = pluginSet | |
719 | c.negotiatedVersion = version | |
720 | c.logger.Debug("using plugin", "version", version) | |
bae9f6d2 JC |
721 | |
722 | switch parts[2] { | |
723 | case "tcp": | |
724 | addr, err = net.ResolveTCPAddr("tcp", parts[3]) | |
725 | case "unix": | |
726 | addr, err = net.ResolveUnixAddr("unix", parts[3]) | |
727 | default: | |
728 | err = fmt.Errorf("Unknown address type: %s", parts[3]) | |
729 | } | |
15c0b25d AP |
730 | |
731 | // If we have a server type, then record that. We default to net/rpc | |
732 | // for backwards compatibility. | |
733 | c.protocol = ProtocolNetRPC | |
734 | if len(parts) >= 5 { | |
735 | c.protocol = Protocol(parts[4]) | |
736 | } | |
737 | ||
738 | found := false | |
739 | for _, p := range c.config.AllowedProtocols { | |
740 | if p == c.protocol { | |
741 | found = true | |
742 | break | |
743 | } | |
744 | } | |
745 | if !found { | |
746 | err = fmt.Errorf("Unsupported plugin protocol %q. Supported: %v", | |
747 | c.protocol, c.config.AllowedProtocols) | |
107c1cdb | 748 | return addr, err |
15c0b25d AP |
749 | } |
750 | ||
107c1cdb ND |
751 | // See if we have a TLS certificate from the server. |
752 | // Checking if the length is > 50 rules out catching the unused "extra" | |
753 | // data returned from some older implementations. | |
754 | if len(parts) >= 6 && len(parts[5]) > 50 { | |
755 | err := c.loadServerCert(parts[5]) | |
756 | if err != nil { | |
757 | return nil, fmt.Errorf("error parsing server cert: %s", err) | |
758 | } | |
759 | } | |
bae9f6d2 JC |
760 | } |
761 | ||
762 | c.address = addr | |
763 | return | |
764 | } | |
765 | ||
107c1cdb ND |
766 | // loadServerCert is used by AutoMTLS to read an x.509 cert returned by the |
767 | // server, and load it as the RootCA for the client TLSConfig. | |
768 | func (c *Client) loadServerCert(cert string) error { | |
769 | certPool := x509.NewCertPool() | |
770 | ||
771 | asn1, err := base64.RawStdEncoding.DecodeString(cert) | |
772 | if err != nil { | |
773 | return err | |
774 | } | |
775 | ||
776 | x509Cert, err := x509.ParseCertificate([]byte(asn1)) | |
777 | if err != nil { | |
778 | return err | |
779 | } | |
780 | ||
781 | certPool.AddCert(x509Cert) | |
782 | ||
783 | c.config.TLSConfig.RootCAs = certPool | |
784 | return nil | |
785 | } | |
786 | ||
787 | func (c *Client) reattach() (net.Addr, error) { | |
788 | // Verify the process still exists. If not, then it is an error | |
789 | p, err := os.FindProcess(c.config.Reattach.Pid) | |
790 | if err != nil { | |
791 | return nil, err | |
792 | } | |
793 | ||
794 | // Attempt to connect to the addr since on Unix systems FindProcess | |
795 | // doesn't actually return an error if it can't find the process. | |
796 | conn, err := net.Dial( | |
797 | c.config.Reattach.Addr.Network(), | |
798 | c.config.Reattach.Addr.String()) | |
799 | if err != nil { | |
800 | p.Kill() | |
801 | return nil, ErrProcessNotFound | |
802 | } | |
803 | conn.Close() | |
804 | ||
805 | // Create a context for when we kill | |
806 | c.doneCtx, c.ctxCancel = context.WithCancel(context.Background()) | |
807 | ||
808 | c.clientWaitGroup.Add(1) | |
809 | // Goroutine to mark exit status | |
810 | go func(pid int) { | |
811 | defer c.clientWaitGroup.Done() | |
812 | ||
813 | // ensure the context is cancelled when we're done | |
814 | defer c.ctxCancel() | |
815 | ||
816 | // Wait for the process to die | |
817 | pidWait(pid) | |
818 | ||
819 | // Log so we can see it | |
820 | c.logger.Debug("reattached plugin process exited") | |
821 | ||
822 | // Mark it | |
823 | c.l.Lock() | |
824 | defer c.l.Unlock() | |
825 | c.exited = true | |
826 | }(p.Pid) | |
827 | ||
828 | // Set the address and process | |
829 | c.address = c.config.Reattach.Addr | |
830 | c.process = p | |
831 | c.protocol = c.config.Reattach.Protocol | |
832 | if c.protocol == "" { | |
833 | // Default the protocol to net/rpc for backwards compatibility | |
834 | c.protocol = ProtocolNetRPC | |
835 | } | |
836 | ||
837 | return c.address, nil | |
838 | } | |
839 | ||
840 | // checkProtoVersion returns the negotiated version and PluginSet. | |
841 | // This returns an error if the server returned an incompatible protocol | |
842 | // version, or an invalid handshake response. | |
843 | func (c *Client) checkProtoVersion(protoVersion string) (int, PluginSet, error) { | |
844 | serverVersion, err := strconv.Atoi(protoVersion) | |
845 | if err != nil { | |
846 | return 0, nil, fmt.Errorf("Error parsing protocol version %q: %s", protoVersion, err) | |
847 | } | |
848 | ||
849 | // record these for the error message | |
850 | var clientVersions []int | |
851 | ||
852 | // all versions, including the legacy ProtocolVersion have been added to | |
853 | // the versions set | |
854 | for version, plugins := range c.config.VersionedPlugins { | |
855 | clientVersions = append(clientVersions, version) | |
856 | ||
857 | if serverVersion != version { | |
858 | continue | |
859 | } | |
860 | return version, plugins, nil | |
861 | } | |
862 | ||
863 | return 0, nil, fmt.Errorf("Incompatible API version with plugin. "+ | |
864 | "Plugin version: %d, Client versions: %d", serverVersion, clientVersions) | |
865 | } | |
866 | ||
bae9f6d2 JC |
867 | // ReattachConfig returns the information that must be provided to NewClient |
868 | // to reattach to the plugin process that this client started. This is | |
869 | // useful for plugins that detach from their parent process. | |
870 | // | |
871 | // If this returns nil then the process hasn't been started yet. Please | |
872 | // call Start or Client before calling this. | |
873 | func (c *Client) ReattachConfig() *ReattachConfig { | |
874 | c.l.Lock() | |
875 | defer c.l.Unlock() | |
876 | ||
877 | if c.address == nil { | |
878 | return nil | |
879 | } | |
880 | ||
881 | if c.config.Cmd != nil && c.config.Cmd.Process == nil { | |
882 | return nil | |
883 | } | |
884 | ||
885 | // If we connected via reattach, just return the information as-is | |
886 | if c.config.Reattach != nil { | |
887 | return c.config.Reattach | |
888 | } | |
889 | ||
890 | return &ReattachConfig{ | |
15c0b25d AP |
891 | Protocol: c.protocol, |
892 | Addr: c.address, | |
893 | Pid: c.config.Cmd.Process.Pid, | |
894 | } | |
895 | } | |
896 | ||
897 | // Protocol returns the protocol of server on the remote end. This will | |
898 | // start the plugin process if it isn't already started. Errors from | |
899 | // starting the plugin are surpressed and ProtocolInvalid is returned. It | |
900 | // is recommended you call Start explicitly before calling Protocol to ensure | |
901 | // no errors occur. | |
902 | func (c *Client) Protocol() Protocol { | |
903 | _, err := c.Start() | |
904 | if err != nil { | |
905 | return ProtocolInvalid | |
906 | } | |
907 | ||
908 | return c.protocol | |
909 | } | |
910 | ||
// netAddrDialer returns a dial function, with the signature expected by
// grpc.WithDialer, that always connects to addr and ignores the address
// string it is given.
//
// The timeout argument is passed to net.DialTimeout, so it bounds how long
// the connection attempt may take; zero means no timeout. TCP connections
// have keep-alive enabled so that idle connections to the plugin don't die.
func netAddrDialer(addr net.Addr) func(string, time.Duration) (net.Conn, error) {
	return func(_ string, timeout time.Duration) (net.Conn, error) {
		// Honor the caller's timeout so a dial to an unresponsive address
		// cannot block past the caller's deadline.
		conn, err := net.DialTimeout(addr.Network(), addr.String(), timeout)
		if err != nil {
			return nil, err
		}
		if tcpConn, ok := conn.(*net.TCPConn); ok {
			// Best-effort: the connection is still usable if keep-alive
			// cannot be enabled, so the error is deliberately ignored.
			tcpConn.SetKeepAlive(true)
		}

		return conn, nil
	}
}
926 | ||
927 | // dialer is compatible with grpc.WithDialer and creates the connection | |
928 | // to the plugin. | |
929 | func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) { | |
930 | conn, err := netAddrDialer(c.address)("", timeout) | |
931 | if err != nil { | |
932 | return nil, err | |
bae9f6d2 | 933 | } |
15c0b25d AP |
934 | |
935 | // If we have a TLS config we wrap our connection. We only do this | |
936 | // for net/rpc since gRPC uses its own mechanism for TLS. | |
937 | if c.protocol == ProtocolNetRPC && c.config.TLSConfig != nil { | |
938 | conn = tls.Client(conn, c.config.TLSConfig) | |
939 | } | |
940 | ||
941 | return conn, nil | |
bae9f6d2 JC |
942 | } |
943 | ||
107c1cdb ND |
944 | var stdErrBufferSize = 64 * 1024 |
945 | ||
bae9f6d2 | 946 | func (c *Client) logStderr(r io.Reader) { |
107c1cdb | 947 | defer c.clientWaitGroup.Done() |
863486a6 | 948 | defer c.stderrWaitGroup.Done() |
107c1cdb ND |
949 | l := c.logger.Named(filepath.Base(c.config.Cmd.Path)) |
950 | ||
951 | reader := bufio.NewReaderSize(r, stdErrBufferSize) | |
952 | // continuation indicates the previous line was a prefix | |
953 | continuation := false | |
954 | ||
bae9f6d2 | 955 | for { |
107c1cdb ND |
956 | line, isPrefix, err := reader.ReadLine() |
957 | switch { | |
958 | case err == io.EOF: | |
959 | return | |
960 | case err != nil: | |
961 | l.Error("reading plugin stderr", "error", err) | |
962 | return | |
963 | } | |
15c0b25d | 964 | |
107c1cdb | 965 | c.config.Stderr.Write(line) |
15c0b25d | 966 | |
107c1cdb ND |
967 | // The line was longer than our max token size, so it's likely |
968 | // incomplete and won't unmarshal. | |
969 | if isPrefix || continuation { | |
970 | l.Debug(string(line)) | |
971 | ||
972 | // if we're finishing a continued line, add the newline back in | |
973 | if !isPrefix { | |
974 | c.config.Stderr.Write([]byte{'\n'}) | |
15c0b25d | 975 | } |
107c1cdb ND |
976 | |
977 | continuation = isPrefix | |
978 | continue | |
bae9f6d2 JC |
979 | } |
980 | ||
107c1cdb ND |
981 | c.config.Stderr.Write([]byte{'\n'}) |
982 | ||
983 | entry, err := parseJSON(line) | |
984 | // If output is not JSON format, print directly to Debug | |
985 | if err != nil { | |
986 | // Attempt to infer the desired log level from the commonly used | |
987 | // string prefixes | |
988 | switch line := string(line); { | |
989 | case strings.HasPrefix(line, "[TRACE]"): | |
990 | l.Trace(line) | |
991 | case strings.HasPrefix(line, "[DEBUG]"): | |
992 | l.Debug(line) | |
993 | case strings.HasPrefix(line, "[INFO]"): | |
994 | l.Info(line) | |
995 | case strings.HasPrefix(line, "[WARN]"): | |
996 | l.Warn(line) | |
997 | case strings.HasPrefix(line, "[ERROR]"): | |
998 | l.Error(line) | |
999 | default: | |
1000 | l.Debug(line) | |
1001 | } | |
1002 | } else { | |
1003 | out := flattenKVPairs(entry.KVPairs) | |
1004 | ||
1005 | out = append(out, "timestamp", entry.Timestamp.Format(hclog.TimeFormat)) | |
1006 | switch hclog.LevelFromString(entry.Level) { | |
1007 | case hclog.Trace: | |
1008 | l.Trace(entry.Message, out...) | |
1009 | case hclog.Debug: | |
1010 | l.Debug(entry.Message, out...) | |
1011 | case hclog.Info: | |
1012 | l.Info(entry.Message, out...) | |
1013 | case hclog.Warn: | |
1014 | l.Warn(entry.Message, out...) | |
1015 | case hclog.Error: | |
1016 | l.Error(entry.Message, out...) | |
1017 | default: | |
1018 | // if there was no log level, it's likely this is unexpected | |
1019 | // json from something other than hclog, and we should output | |
1020 | // it verbatim. | |
1021 | l.Debug(string(line)) | |
1022 | } | |
bae9f6d2 JC |
1023 | } |
1024 | } | |
bae9f6d2 | 1025 | } |