]>
Commit | Line | Data |
---|---|---|
bae9f6d2 JC |
1 | package plugin |
2 | ||
3 | import ( | |
4 | "bufio" | |
15c0b25d AP |
5 | "context" |
6 | "crypto/subtle" | |
7 | "crypto/tls" | |
107c1cdb ND |
8 | "crypto/x509" |
9 | "encoding/base64" | |
bae9f6d2 JC |
10 | "errors" |
11 | "fmt" | |
15c0b25d | 12 | "hash" |
bae9f6d2 JC |
13 | "io" |
14 | "io/ioutil" | |
bae9f6d2 JC |
15 | "net" |
16 | "os" | |
17 | "os/exec" | |
18 | "path/filepath" | |
19 | "strconv" | |
20 | "strings" | |
21 | "sync" | |
22 | "sync/atomic" | |
23 | "time" | |
15c0b25d AP |
24 | |
25 | hclog "github.com/hashicorp/go-hclog" | |
bae9f6d2 JC |
26 | ) |
27 | ||
// Killed is set to 1 once CleanupClients has been called. Plugin RPC
// implementations can check it to change error behavior, since network
// connection errors are expected at that point. It must be read with
// sync/atomic, e.g. atomic.LoadUint32(&Killed).
var Killed uint32
33 | ||
34 | // This is a slice of the "managed" clients which are cleaned up when | |
35 | // calling Cleanup | |
36 | var managedClients = make([]*Client, 0, 5) | |
37 | var managedClientsLock sync.Mutex | |
38 | ||
// Error types
var (
	// ErrProcessNotFound is returned when a client is instantiated to
	// reattach to an existing process and it isn't found.
	ErrProcessNotFound = errors.New("Reattachment process not found")

	// ErrChecksumsDoNotMatch is returned when binary's checksum doesn't match
	// the one provided in the SecureConfig.
	ErrChecksumsDoNotMatch = errors.New("checksums did not match")

	// ErrSecureConfigNoChecksum is returned when an empty checksum is provided
	// to the SecureConfig.
	ErrSecureConfigNoChecksum = errors.New("no checksum provided")

	// ErrSecureConfigNoHash is returned when a nil Hash object is provided to
	// the SecureConfig.
	ErrSecureConfigNoHash = errors.New("no hash implementation provided")

	// ErrSecureConfigAndReattach is returned when both Reattach and
	// SecureConfig are set.
	ErrSecureConfigAndReattach = errors.New("only one of Reattach or SecureConfig can be set")
)
61 | ||
62 | // Client handles the lifecycle of a plugin application. It launches | |
63 | // plugins, connects to them, dispenses interface implementations, and handles | |
64 | // killing the process. | |
65 | // | |
66 | // Plugin hosts should use one Client for each plugin executable. To | |
67 | // dispense a plugin type, use the `Client.Client` function, and then | |
68 | // cal `Dispense`. This awkward API is mostly historical but is used to split | |
69 | // the client that deals with subprocess management and the client that | |
70 | // does RPC management. | |
71 | // | |
72 | // See NewClient and ClientConfig for using a Client. | |
73 | type Client struct { | |
107c1cdb ND |
74 | config *ClientConfig |
75 | exited bool | |
76 | l sync.Mutex | |
77 | address net.Addr | |
78 | process *os.Process | |
79 | client ClientProtocol | |
80 | protocol Protocol | |
81 | logger hclog.Logger | |
82 | doneCtx context.Context | |
83 | ctxCancel context.CancelFunc | |
84 | negotiatedVersion int | |
85 | ||
86 | // clientWaitGroup is used to manage the lifecycle of the plugin management | |
87 | // goroutines. | |
88 | clientWaitGroup sync.WaitGroup | |
89 | ||
90 | // processKilled is used for testing only, to flag when the process was | |
91 | // forcefully killed. | |
92 | processKilled bool | |
93 | } | |
94 | ||
95 | // NegotiatedVersion returns the protocol version negotiated with the server. | |
96 | // This is only valid after Start() is called. | |
97 | func (c *Client) NegotiatedVersion() int { | |
98 | return c.negotiatedVersion | |
bae9f6d2 JC |
99 | } |
100 | ||
101 | // ClientConfig is the configuration used to initialize a new | |
102 | // plugin client. After being used to initialize a plugin client, | |
103 | // that configuration must not be modified again. | |
104 | type ClientConfig struct { | |
105 | // HandshakeConfig is the configuration that must match servers. | |
106 | HandshakeConfig | |
107 | ||
108 | // Plugins are the plugins that can be consumed. | |
107c1cdb ND |
109 | // The implied version of this PluginSet is the Handshake.ProtocolVersion. |
110 | Plugins PluginSet | |
111 | ||
112 | // VersionedPlugins is a map of PluginSets for specific protocol versions. | |
113 | // These can be used to negotiate a compatible version between client and | |
114 | // server. If this is set, Handshake.ProtocolVersion is not required. | |
115 | VersionedPlugins map[int]PluginSet | |
bae9f6d2 JC |
116 | |
117 | // One of the following must be set, but not both. | |
118 | // | |
119 | // Cmd is the unstarted subprocess for starting the plugin. If this is | |
120 | // set, then the Client starts the plugin process on its own and connects | |
121 | // to it. | |
122 | // | |
123 | // Reattach is configuration for reattaching to an existing plugin process | |
124 | // that is already running. This isn't common. | |
125 | Cmd *exec.Cmd | |
126 | Reattach *ReattachConfig | |
127 | ||
15c0b25d AP |
128 | // SecureConfig is configuration for verifying the integrity of the |
129 | // executable. It can not be used with Reattach. | |
130 | SecureConfig *SecureConfig | |
131 | ||
132 | // TLSConfig is used to enable TLS on the RPC client. | |
133 | TLSConfig *tls.Config | |
134 | ||
bae9f6d2 JC |
135 | // Managed represents if the client should be managed by the |
136 | // plugin package or not. If true, then by calling CleanupClients, | |
137 | // it will automatically be cleaned up. Otherwise, the client | |
138 | // user is fully responsible for making sure to Kill all plugin | |
139 | // clients. By default the client is _not_ managed. | |
140 | Managed bool | |
141 | ||
142 | // The minimum and maximum port to use for communicating with | |
143 | // the subprocess. If not set, this defaults to 10,000 and 25,000 | |
144 | // respectively. | |
145 | MinPort, MaxPort uint | |
146 | ||
147 | // StartTimeout is the timeout to wait for the plugin to say it | |
148 | // has started successfully. | |
149 | StartTimeout time.Duration | |
150 | ||
151 | // If non-nil, then the stderr of the client will be written to here | |
152 | // (as well as the log). This is the original os.Stderr of the subprocess. | |
153 | // This isn't the output of synced stderr. | |
154 | Stderr io.Writer | |
155 | ||
156 | // SyncStdout, SyncStderr can be set to override the | |
157 | // respective os.Std* values in the plugin. Care should be taken to | |
158 | // avoid races here. If these are nil, then this will automatically be | |
159 | // hooked up to os.Stdin, Stdout, and Stderr, respectively. | |
160 | // | |
161 | // If the default values (nil) are used, then this package will not | |
162 | // sync any of these streams. | |
163 | SyncStdout io.Writer | |
164 | SyncStderr io.Writer | |
15c0b25d AP |
165 | |
166 | // AllowedProtocols is a list of allowed protocols. If this isn't set, | |
167 | // then only netrpc is allowed. This is so that older go-plugin systems | |
168 | // can show friendly errors if they see a plugin with an unknown | |
169 | // protocol. | |
170 | // | |
171 | // By setting this, you can cause an error immediately on plugin start | |
172 | // if an unsupported protocol is used with a good error message. | |
173 | // | |
174 | // If this isn't set at all (nil value), then only net/rpc is accepted. | |
175 | // This is done for legacy reasons. You must explicitly opt-in to | |
176 | // new protocols. | |
177 | AllowedProtocols []Protocol | |
178 | ||
179 | // Logger is the logger that the client will used. If none is provided, | |
180 | // it will default to hclog's default logger. | |
181 | Logger hclog.Logger | |
107c1cdb ND |
182 | |
183 | // AutoMTLS has the client and server automatically negotiate mTLS for | |
184 | // transport authentication. This ensures that only the original client will | |
185 | // be allowed to connect to the server, and all other connections will be | |
186 | // rejected. The client will also refuse to connect to any server that isn't | |
187 | // the original instance started by the client. | |
188 | // | |
189 | // In this mode of operation, the client generates a one-time use tls | |
190 | // certificate, sends the public x.509 certificate to the new server, and | |
191 | // the server generates a one-time use tls certificate, and sends the public | |
192 | // x.509 certificate back to the client. These are used to authenticate all | |
193 | // rpc connections between the client and server. | |
194 | // | |
195 | // Setting AutoMTLS to true implies that the server must support the | |
196 | // protocol, and correctly negotiate the tls certificates, or a connection | |
197 | // failure will result. | |
198 | // | |
199 | // The client should not set TLSConfig, nor should the server set a | |
200 | // TLSProvider, because AutoMTLS implies that a new certificate and tls | |
201 | // configuration will be generated at startup. | |
202 | // | |
203 | // You cannot Reattach to a server with this option enabled. | |
204 | AutoMTLS bool | |
bae9f6d2 JC |
205 | } |
206 | ||
207 | // ReattachConfig is used to configure a client to reattach to an | |
208 | // already-running plugin process. You can retrieve this information by | |
209 | // calling ReattachConfig on Client. | |
210 | type ReattachConfig struct { | |
15c0b25d AP |
211 | Protocol Protocol |
212 | Addr net.Addr | |
213 | Pid int | |
214 | } | |
215 | ||
// SecureConfig verifies that a plugin executable has the expected contents
// before it is run. Checksum is the expected digest and Hash is the algorithm
// used to digest the file; SecureConfig.Check performs the comparison.
//
// The host process is responsible for obtaining Checksum from a trusted and
// authoritative source, and for installing the binary so that no
// unauthorized user can modify it between the time it is checked and the
// time it is executed.
type SecureConfig struct {
	Checksum []byte
	Hash     hash.Hash
}
230 | ||
231 | // Check takes the filepath to an executable and returns true if the checksum of | |
232 | // the file matches the checksum provided in the SecureConfig. | |
233 | func (s *SecureConfig) Check(filePath string) (bool, error) { | |
234 | if len(s.Checksum) == 0 { | |
235 | return false, ErrSecureConfigNoChecksum | |
236 | } | |
237 | ||
238 | if s.Hash == nil { | |
239 | return false, ErrSecureConfigNoHash | |
240 | } | |
241 | ||
242 | file, err := os.Open(filePath) | |
243 | if err != nil { | |
244 | return false, err | |
245 | } | |
246 | defer file.Close() | |
247 | ||
248 | _, err = io.Copy(s.Hash, file) | |
249 | if err != nil { | |
250 | return false, err | |
251 | } | |
252 | ||
253 | sum := s.Hash.Sum(nil) | |
254 | ||
255 | return subtle.ConstantTimeCompare(sum, s.Checksum) == 1, nil | |
bae9f6d2 JC |
256 | } |
257 | ||
258 | // This makes sure all the managed subprocesses are killed and properly | |
259 | // logged. This should be called before the parent process running the | |
260 | // plugins exits. | |
261 | // | |
262 | // This must only be called _once_. | |
263 | func CleanupClients() { | |
264 | // Set the killed to true so that we don't get unexpected panics | |
265 | atomic.StoreUint32(&Killed, 1) | |
266 | ||
267 | // Kill all the managed clients in parallel and use a WaitGroup | |
268 | // to wait for them all to finish up. | |
269 | var wg sync.WaitGroup | |
270 | managedClientsLock.Lock() | |
271 | for _, client := range managedClients { | |
272 | wg.Add(1) | |
273 | ||
274 | go func(client *Client) { | |
275 | client.Kill() | |
276 | wg.Done() | |
277 | }(client) | |
278 | } | |
279 | managedClientsLock.Unlock() | |
280 | ||
bae9f6d2 JC |
281 | wg.Wait() |
282 | } | |
283 | ||
284 | // Creates a new plugin client which manages the lifecycle of an external | |
285 | // plugin and gets the address for the RPC connection. | |
286 | // | |
287 | // The client must be cleaned up at some point by calling Kill(). If | |
288 | // the client is a managed client (created with NewManagedClient) you | |
289 | // can just call CleanupClients at the end of your program and they will | |
290 | // be properly cleaned. | |
291 | func NewClient(config *ClientConfig) (c *Client) { | |
292 | if config.MinPort == 0 && config.MaxPort == 0 { | |
293 | config.MinPort = 10000 | |
294 | config.MaxPort = 25000 | |
295 | } | |
296 | ||
297 | if config.StartTimeout == 0 { | |
298 | config.StartTimeout = 1 * time.Minute | |
299 | } | |
300 | ||
301 | if config.Stderr == nil { | |
302 | config.Stderr = ioutil.Discard | |
303 | } | |
304 | ||
305 | if config.SyncStdout == nil { | |
306 | config.SyncStdout = ioutil.Discard | |
307 | } | |
308 | if config.SyncStderr == nil { | |
309 | config.SyncStderr = ioutil.Discard | |
310 | } | |
311 | ||
15c0b25d AP |
312 | if config.AllowedProtocols == nil { |
313 | config.AllowedProtocols = []Protocol{ProtocolNetRPC} | |
314 | } | |
315 | ||
316 | if config.Logger == nil { | |
317 | config.Logger = hclog.New(&hclog.LoggerOptions{ | |
318 | Output: hclog.DefaultOutput, | |
319 | Level: hclog.Trace, | |
320 | Name: "plugin", | |
321 | }) | |
322 | } | |
323 | ||
324 | c = &Client{ | |
325 | config: config, | |
326 | logger: config.Logger, | |
327 | } | |
bae9f6d2 JC |
328 | if config.Managed { |
329 | managedClientsLock.Lock() | |
330 | managedClients = append(managedClients, c) | |
331 | managedClientsLock.Unlock() | |
332 | } | |
333 | ||
334 | return | |
335 | } | |
336 | ||
15c0b25d | 337 | // Client returns the protocol client for this connection. |
bae9f6d2 | 338 | // |
15c0b25d AP |
339 | // Subsequent calls to this will return the same client. |
340 | func (c *Client) Client() (ClientProtocol, error) { | |
341 | _, err := c.Start() | |
bae9f6d2 JC |
342 | if err != nil { |
343 | return nil, err | |
344 | } | |
345 | ||
346 | c.l.Lock() | |
347 | defer c.l.Unlock() | |
348 | ||
349 | if c.client != nil { | |
350 | return c.client, nil | |
351 | } | |
352 | ||
15c0b25d AP |
353 | switch c.protocol { |
354 | case ProtocolNetRPC: | |
355 | c.client, err = newRPCClient(c) | |
bae9f6d2 | 356 | |
15c0b25d AP |
357 | case ProtocolGRPC: |
358 | c.client, err = newGRPCClient(c.doneCtx, c) | |
359 | ||
360 | default: | |
361 | return nil, fmt.Errorf("unknown server protocol: %s", c.protocol) | |
bae9f6d2 JC |
362 | } |
363 | ||
bae9f6d2 | 364 | if err != nil { |
bae9f6d2 JC |
365 | c.client = nil |
366 | return nil, err | |
367 | } | |
368 | ||
369 | return c.client, nil | |
370 | } | |
371 | ||
372 | // Tells whether or not the underlying process has exited. | |
373 | func (c *Client) Exited() bool { | |
374 | c.l.Lock() | |
375 | defer c.l.Unlock() | |
376 | return c.exited | |
377 | } | |
378 | ||
107c1cdb ND |
379 | // killed is used in tests to check if a process failed to exit gracefully, and |
380 | // needed to be killed. | |
381 | func (c *Client) killed() bool { | |
382 | c.l.Lock() | |
383 | defer c.l.Unlock() | |
384 | return c.processKilled | |
385 | } | |
386 | ||
bae9f6d2 JC |
387 | // End the executing subprocess (if it is running) and perform any cleanup |
388 | // tasks necessary such as capturing any remaining logs and so on. | |
389 | // | |
390 | // This method blocks until the process successfully exits. | |
391 | // | |
392 | // This method can safely be called multiple times. | |
393 | func (c *Client) Kill() { | |
394 | // Grab a lock to read some private fields. | |
395 | c.l.Lock() | |
396 | process := c.process | |
397 | addr := c.address | |
bae9f6d2 JC |
398 | c.l.Unlock() |
399 | ||
107c1cdb | 400 | // If there is no process, there is nothing to kill. |
bae9f6d2 JC |
401 | if process == nil { |
402 | return | |
403 | } | |
404 | ||
107c1cdb ND |
405 | defer func() { |
406 | // Wait for the all client goroutines to finish. | |
407 | c.clientWaitGroup.Wait() | |
408 | ||
409 | // Make sure there is no reference to the old process after it has been | |
410 | // killed. | |
411 | c.l.Lock() | |
412 | c.process = nil | |
413 | c.l.Unlock() | |
414 | }() | |
415 | ||
bae9f6d2 JC |
416 | // We need to check for address here. It is possible that the plugin |
417 | // started (process != nil) but has no address (addr == nil) if the | |
418 | // plugin failed at startup. If we do have an address, we need to close | |
419 | // the plugin net connections. | |
420 | graceful := false | |
421 | if addr != nil { | |
422 | // Close the client to cleanly exit the process. | |
423 | client, err := c.Client() | |
424 | if err == nil { | |
425 | err = client.Close() | |
426 | ||
427 | // If there is no error, then we attempt to wait for a graceful | |
428 | // exit. If there was an error, we assume that graceful cleanup | |
429 | // won't happen and just force kill. | |
430 | graceful = err == nil | |
431 | if err != nil { | |
432 | // If there was an error just log it. We're going to force | |
433 | // kill in a moment anyways. | |
15c0b25d | 434 | c.logger.Warn("error closing client during Kill", "err", err) |
bae9f6d2 | 435 | } |
107c1cdb ND |
436 | } else { |
437 | c.logger.Error("client", "error", err) | |
bae9f6d2 JC |
438 | } |
439 | } | |
440 | ||
441 | // If we're attempting a graceful exit, then we wait for a short period | |
442 | // of time to allow that to happen. To wait for this we just wait on the | |
443 | // doneCh which would be closed if the process exits. | |
444 | if graceful { | |
445 | select { | |
107c1cdb ND |
446 | case <-c.doneCtx.Done(): |
447 | c.logger.Debug("plugin exited") | |
bae9f6d2 | 448 | return |
107c1cdb | 449 | case <-time.After(2 * time.Second): |
bae9f6d2 JC |
450 | } |
451 | } | |
452 | ||
453 | // If graceful exiting failed, just kill it | |
107c1cdb | 454 | c.logger.Warn("plugin failed to exit gracefully") |
bae9f6d2 JC |
455 | process.Kill() |
456 | ||
107c1cdb ND |
457 | c.l.Lock() |
458 | c.processKilled = true | |
459 | c.l.Unlock() | |
bae9f6d2 JC |
460 | } |
461 | ||
462 | // Starts the underlying subprocess, communicating with it to negotiate | |
463 | // a port for RPC connections, and returning the address to connect via RPC. | |
464 | // | |
465 | // This method is safe to call multiple times. Subsequent calls have no effect. | |
466 | // Once a client has been started once, it cannot be started again, even if | |
467 | // it was killed. | |
468 | func (c *Client) Start() (addr net.Addr, err error) { | |
469 | c.l.Lock() | |
470 | defer c.l.Unlock() | |
471 | ||
472 | if c.address != nil { | |
473 | return c.address, nil | |
474 | } | |
475 | ||
476 | // If one of cmd or reattach isn't set, then it is an error. We wrap | |
477 | // this in a {} for scoping reasons, and hopeful that the escape | |
107c1cdb | 478 | // analysis will pop the stack here. |
bae9f6d2 JC |
479 | { |
480 | cmdSet := c.config.Cmd != nil | |
481 | attachSet := c.config.Reattach != nil | |
15c0b25d | 482 | secureSet := c.config.SecureConfig != nil |
bae9f6d2 JC |
483 | if cmdSet == attachSet { |
484 | return nil, fmt.Errorf("Only one of Cmd or Reattach must be set") | |
485 | } | |
15c0b25d AP |
486 | |
487 | if secureSet && attachSet { | |
488 | return nil, ErrSecureConfigAndReattach | |
489 | } | |
bae9f6d2 JC |
490 | } |
491 | ||
bae9f6d2 | 492 | if c.config.Reattach != nil { |
107c1cdb ND |
493 | return c.reattach() |
494 | } | |
bae9f6d2 | 495 | |
107c1cdb ND |
496 | if c.config.VersionedPlugins == nil { |
497 | c.config.VersionedPlugins = make(map[int]PluginSet) | |
498 | } | |
bae9f6d2 | 499 | |
107c1cdb ND |
500 | // handle all plugins as versioned, using the handshake config as the default. |
501 | version := int(c.config.ProtocolVersion) | |
502 | ||
503 | // Make sure we're not overwriting a real version 0. If ProtocolVersion was | |
504 | // non-zero, then we have to just assume the user made sure that | |
505 | // VersionedPlugins doesn't conflict. | |
506 | if _, ok := c.config.VersionedPlugins[version]; !ok && c.config.Plugins != nil { | |
507 | c.config.VersionedPlugins[version] = c.config.Plugins | |
508 | } | |
509 | ||
510 | var versionStrings []string | |
511 | for v := range c.config.VersionedPlugins { | |
512 | versionStrings = append(versionStrings, strconv.Itoa(v)) | |
bae9f6d2 JC |
513 | } |
514 | ||
515 | env := []string{ | |
516 | fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue), | |
517 | fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort), | |
518 | fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort), | |
107c1cdb | 519 | fmt.Sprintf("PLUGIN_PROTOCOL_VERSIONS=%s", strings.Join(versionStrings, ",")), |
bae9f6d2 JC |
520 | } |
521 | ||
bae9f6d2 JC |
522 | cmd := c.config.Cmd |
523 | cmd.Env = append(cmd.Env, os.Environ()...) | |
524 | cmd.Env = append(cmd.Env, env...) | |
525 | cmd.Stdin = os.Stdin | |
107c1cdb ND |
526 | |
527 | cmdStdout, err := cmd.StdoutPipe() | |
528 | if err != nil { | |
529 | return nil, err | |
530 | } | |
531 | cmdStderr, err := cmd.StderrPipe() | |
532 | if err != nil { | |
533 | return nil, err | |
534 | } | |
bae9f6d2 | 535 | |
15c0b25d AP |
536 | if c.config.SecureConfig != nil { |
537 | if ok, err := c.config.SecureConfig.Check(cmd.Path); err != nil { | |
538 | return nil, fmt.Errorf("error verifying checksum: %s", err) | |
539 | } else if !ok { | |
540 | return nil, ErrChecksumsDoNotMatch | |
541 | } | |
542 | } | |
543 | ||
107c1cdb ND |
544 | // Setup a temporary certificate for client/server mtls, and send the public |
545 | // certificate to the plugin. | |
546 | if c.config.AutoMTLS { | |
547 | c.logger.Info("configuring client automatic mTLS") | |
548 | certPEM, keyPEM, err := generateCert() | |
549 | if err != nil { | |
550 | c.logger.Error("failed to generate client certificate", "error", err) | |
551 | return nil, err | |
552 | } | |
553 | cert, err := tls.X509KeyPair(certPEM, keyPEM) | |
554 | if err != nil { | |
555 | c.logger.Error("failed to parse client certificate", "error", err) | |
556 | return nil, err | |
557 | } | |
558 | ||
559 | cmd.Env = append(cmd.Env, fmt.Sprintf("PLUGIN_CLIENT_CERT=%s", certPEM)) | |
560 | ||
561 | c.config.TLSConfig = &tls.Config{ | |
562 | Certificates: []tls.Certificate{cert}, | |
563 | ServerName: "localhost", | |
564 | } | |
565 | } | |
566 | ||
15c0b25d | 567 | c.logger.Debug("starting plugin", "path", cmd.Path, "args", cmd.Args) |
bae9f6d2 JC |
568 | err = cmd.Start() |
569 | if err != nil { | |
570 | return | |
571 | } | |
572 | ||
573 | // Set the process | |
574 | c.process = cmd.Process | |
107c1cdb | 575 | c.logger.Debug("plugin started", "path", cmd.Path, "pid", c.process.Pid) |
bae9f6d2 JC |
576 | |
577 | // Make sure the command is properly cleaned up if there is an error | |
578 | defer func() { | |
579 | r := recover() | |
580 | ||
581 | if err != nil || r != nil { | |
582 | cmd.Process.Kill() | |
583 | } | |
584 | ||
585 | if r != nil { | |
586 | panic(r) | |
587 | } | |
588 | }() | |
589 | ||
107c1cdb ND |
590 | // Create a context for when we kill |
591 | c.doneCtx, c.ctxCancel = context.WithCancel(context.Background()) | |
592 | ||
593 | c.clientWaitGroup.Add(1) | |
bae9f6d2 | 594 | go func() { |
107c1cdb ND |
595 | // ensure the context is cancelled when we're done |
596 | defer c.ctxCancel() | |
597 | ||
598 | defer c.clientWaitGroup.Done() | |
599 | ||
600 | // get the cmd info early, since the process information will be removed | |
601 | // in Kill. | |
602 | pid := c.process.Pid | |
603 | path := cmd.Path | |
bae9f6d2 JC |
604 | |
605 | // Wait for the command to end. | |
107c1cdb ND |
606 | err := cmd.Wait() |
607 | ||
608 | debugMsgArgs := []interface{}{ | |
609 | "path", path, | |
610 | "pid", pid, | |
611 | } | |
612 | if err != nil { | |
613 | debugMsgArgs = append(debugMsgArgs, | |
614 | []interface{}{"error", err.Error()}...) | |
615 | } | |
bae9f6d2 JC |
616 | |
617 | // Log and make sure to flush the logs write away | |
107c1cdb | 618 | c.logger.Debug("plugin process exited", debugMsgArgs...) |
bae9f6d2 JC |
619 | os.Stderr.Sync() |
620 | ||
bae9f6d2 JC |
621 | // Set that we exited, which takes a lock |
622 | c.l.Lock() | |
623 | defer c.l.Unlock() | |
624 | c.exited = true | |
625 | }() | |
626 | ||
627 | // Start goroutine that logs the stderr | |
107c1cdb ND |
628 | c.clientWaitGroup.Add(1) |
629 | // logStderr calls Done() | |
630 | go c.logStderr(cmdStderr) | |
bae9f6d2 JC |
631 | |
632 | // Start a goroutine that is going to be reading the lines | |
633 | // out of stdout | |
107c1cdb ND |
634 | linesCh := make(chan string) |
635 | c.clientWaitGroup.Add(1) | |
bae9f6d2 | 636 | go func() { |
107c1cdb | 637 | defer c.clientWaitGroup.Done() |
bae9f6d2 JC |
638 | defer close(linesCh) |
639 | ||
107c1cdb ND |
640 | scanner := bufio.NewScanner(cmdStdout) |
641 | for scanner.Scan() { | |
642 | linesCh <- scanner.Text() | |
bae9f6d2 JC |
643 | } |
644 | }() | |
645 | ||
646 | // Make sure after we exit we read the lines from stdout forever | |
107c1cdb ND |
647 | // so they don't block since it is a pipe. |
648 | // The scanner goroutine above will close this, but track it with a wait | |
649 | // group for completeness. | |
650 | c.clientWaitGroup.Add(1) | |
bae9f6d2 JC |
651 | defer func() { |
652 | go func() { | |
107c1cdb ND |
653 | defer c.clientWaitGroup.Done() |
654 | for range linesCh { | |
bae9f6d2 JC |
655 | } |
656 | }() | |
657 | }() | |
658 | ||
659 | // Some channels for the next step | |
660 | timeout := time.After(c.config.StartTimeout) | |
661 | ||
662 | // Start looking for the address | |
15c0b25d | 663 | c.logger.Debug("waiting for RPC address", "path", cmd.Path) |
bae9f6d2 JC |
664 | select { |
665 | case <-timeout: | |
666 | err = errors.New("timeout while waiting for plugin to start") | |
107c1cdb | 667 | case <-c.doneCtx.Done(): |
bae9f6d2 | 668 | err = errors.New("plugin exited before we could connect") |
107c1cdb | 669 | case line := <-linesCh: |
bae9f6d2 JC |
670 | // Trim the line and split by "|" in order to get the parts of |
671 | // the output. | |
107c1cdb | 672 | line = strings.TrimSpace(line) |
15c0b25d | 673 | parts := strings.SplitN(line, "|", 6) |
bae9f6d2 JC |
674 | if len(parts) < 4 { |
675 | err = fmt.Errorf( | |
676 | "Unrecognized remote plugin message: %s\n\n"+ | |
677 | "This usually means that the plugin is either invalid or simply\n"+ | |
678 | "needs to be recompiled to support the latest protocol.", line) | |
679 | return | |
680 | } | |
681 | ||
682 | // Check the core protocol. Wrapped in a {} for scoping. | |
683 | { | |
684 | var coreProtocol int64 | |
685 | coreProtocol, err = strconv.ParseInt(parts[0], 10, 0) | |
686 | if err != nil { | |
687 | err = fmt.Errorf("Error parsing core protocol version: %s", err) | |
688 | return | |
689 | } | |
690 | ||
691 | if int(coreProtocol) != CoreProtocolVersion { | |
692 | err = fmt.Errorf("Incompatible core API version with plugin. "+ | |
15c0b25d | 693 | "Plugin version: %s, Core version: %d\n\n"+ |
bae9f6d2 JC |
694 | "To fix this, the plugin usually only needs to be recompiled.\n"+ |
695 | "Please report this to the plugin author.", parts[0], CoreProtocolVersion) | |
696 | return | |
697 | } | |
698 | } | |
699 | ||
107c1cdb ND |
700 | // Test the API version |
701 | version, pluginSet, err := c.checkProtoVersion(parts[1]) | |
bae9f6d2 | 702 | if err != nil { |
107c1cdb | 703 | return addr, err |
bae9f6d2 JC |
704 | } |
705 | ||
107c1cdb ND |
706 | // set the Plugins value to the compatible set, so the version |
707 | // doesn't need to be passed through to the ClientProtocol | |
708 | // implementation. | |
709 | c.config.Plugins = pluginSet | |
710 | c.negotiatedVersion = version | |
711 | c.logger.Debug("using plugin", "version", version) | |
bae9f6d2 JC |
712 | |
713 | switch parts[2] { | |
714 | case "tcp": | |
715 | addr, err = net.ResolveTCPAddr("tcp", parts[3]) | |
716 | case "unix": | |
717 | addr, err = net.ResolveUnixAddr("unix", parts[3]) | |
718 | default: | |
719 | err = fmt.Errorf("Unknown address type: %s", parts[3]) | |
720 | } | |
15c0b25d AP |
721 | |
722 | // If we have a server type, then record that. We default to net/rpc | |
723 | // for backwards compatibility. | |
724 | c.protocol = ProtocolNetRPC | |
725 | if len(parts) >= 5 { | |
726 | c.protocol = Protocol(parts[4]) | |
727 | } | |
728 | ||
729 | found := false | |
730 | for _, p := range c.config.AllowedProtocols { | |
731 | if p == c.protocol { | |
732 | found = true | |
733 | break | |
734 | } | |
735 | } | |
736 | if !found { | |
737 | err = fmt.Errorf("Unsupported plugin protocol %q. Supported: %v", | |
738 | c.protocol, c.config.AllowedProtocols) | |
107c1cdb | 739 | return addr, err |
15c0b25d AP |
740 | } |
741 | ||
107c1cdb ND |
742 | // See if we have a TLS certificate from the server. |
743 | // Checking if the length is > 50 rules out catching the unused "extra" | |
744 | // data returned from some older implementations. | |
745 | if len(parts) >= 6 && len(parts[5]) > 50 { | |
746 | err := c.loadServerCert(parts[5]) | |
747 | if err != nil { | |
748 | return nil, fmt.Errorf("error parsing server cert: %s", err) | |
749 | } | |
750 | } | |
bae9f6d2 JC |
751 | } |
752 | ||
753 | c.address = addr | |
754 | return | |
755 | } | |
756 | ||
107c1cdb ND |
757 | // loadServerCert is used by AutoMTLS to read an x.509 cert returned by the |
758 | // server, and load it as the RootCA for the client TLSConfig. | |
759 | func (c *Client) loadServerCert(cert string) error { | |
760 | certPool := x509.NewCertPool() | |
761 | ||
762 | asn1, err := base64.RawStdEncoding.DecodeString(cert) | |
763 | if err != nil { | |
764 | return err | |
765 | } | |
766 | ||
767 | x509Cert, err := x509.ParseCertificate([]byte(asn1)) | |
768 | if err != nil { | |
769 | return err | |
770 | } | |
771 | ||
772 | certPool.AddCert(x509Cert) | |
773 | ||
774 | c.config.TLSConfig.RootCAs = certPool | |
775 | return nil | |
776 | } | |
777 | ||
778 | func (c *Client) reattach() (net.Addr, error) { | |
779 | // Verify the process still exists. If not, then it is an error | |
780 | p, err := os.FindProcess(c.config.Reattach.Pid) | |
781 | if err != nil { | |
782 | return nil, err | |
783 | } | |
784 | ||
785 | // Attempt to connect to the addr since on Unix systems FindProcess | |
786 | // doesn't actually return an error if it can't find the process. | |
787 | conn, err := net.Dial( | |
788 | c.config.Reattach.Addr.Network(), | |
789 | c.config.Reattach.Addr.String()) | |
790 | if err != nil { | |
791 | p.Kill() | |
792 | return nil, ErrProcessNotFound | |
793 | } | |
794 | conn.Close() | |
795 | ||
796 | // Create a context for when we kill | |
797 | c.doneCtx, c.ctxCancel = context.WithCancel(context.Background()) | |
798 | ||
799 | c.clientWaitGroup.Add(1) | |
800 | // Goroutine to mark exit status | |
801 | go func(pid int) { | |
802 | defer c.clientWaitGroup.Done() | |
803 | ||
804 | // ensure the context is cancelled when we're done | |
805 | defer c.ctxCancel() | |
806 | ||
807 | // Wait for the process to die | |
808 | pidWait(pid) | |
809 | ||
810 | // Log so we can see it | |
811 | c.logger.Debug("reattached plugin process exited") | |
812 | ||
813 | // Mark it | |
814 | c.l.Lock() | |
815 | defer c.l.Unlock() | |
816 | c.exited = true | |
817 | }(p.Pid) | |
818 | ||
819 | // Set the address and process | |
820 | c.address = c.config.Reattach.Addr | |
821 | c.process = p | |
822 | c.protocol = c.config.Reattach.Protocol | |
823 | if c.protocol == "" { | |
824 | // Default the protocol to net/rpc for backwards compatibility | |
825 | c.protocol = ProtocolNetRPC | |
826 | } | |
827 | ||
828 | return c.address, nil | |
829 | } | |
830 | ||
831 | // checkProtoVersion returns the negotiated version and PluginSet. | |
832 | // This returns an error if the server returned an incompatible protocol | |
833 | // version, or an invalid handshake response. | |
834 | func (c *Client) checkProtoVersion(protoVersion string) (int, PluginSet, error) { | |
835 | serverVersion, err := strconv.Atoi(protoVersion) | |
836 | if err != nil { | |
837 | return 0, nil, fmt.Errorf("Error parsing protocol version %q: %s", protoVersion, err) | |
838 | } | |
839 | ||
840 | // record these for the error message | |
841 | var clientVersions []int | |
842 | ||
843 | // all versions, including the legacy ProtocolVersion have been added to | |
844 | // the versions set | |
845 | for version, plugins := range c.config.VersionedPlugins { | |
846 | clientVersions = append(clientVersions, version) | |
847 | ||
848 | if serverVersion != version { | |
849 | continue | |
850 | } | |
851 | return version, plugins, nil | |
852 | } | |
853 | ||
854 | return 0, nil, fmt.Errorf("Incompatible API version with plugin. "+ | |
855 | "Plugin version: %d, Client versions: %d", serverVersion, clientVersions) | |
856 | } | |
857 | ||
bae9f6d2 JC |
858 | // ReattachConfig returns the information that must be provided to NewClient |
859 | // to reattach to the plugin process that this client started. This is | |
860 | // useful for plugins that detach from their parent process. | |
861 | // | |
862 | // If this returns nil then the process hasn't been started yet. Please | |
863 | // call Start or Client before calling this. | |
864 | func (c *Client) ReattachConfig() *ReattachConfig { | |
865 | c.l.Lock() | |
866 | defer c.l.Unlock() | |
867 | ||
868 | if c.address == nil { | |
869 | return nil | |
870 | } | |
871 | ||
872 | if c.config.Cmd != nil && c.config.Cmd.Process == nil { | |
873 | return nil | |
874 | } | |
875 | ||
876 | // If we connected via reattach, just return the information as-is | |
877 | if c.config.Reattach != nil { | |
878 | return c.config.Reattach | |
879 | } | |
880 | ||
881 | return &ReattachConfig{ | |
15c0b25d AP |
882 | Protocol: c.protocol, |
883 | Addr: c.address, | |
884 | Pid: c.config.Cmd.Process.Pid, | |
885 | } | |
886 | } | |
887 | ||
888 | // Protocol returns the protocol of server on the remote end. This will | |
889 | // start the plugin process if it isn't already started. Errors from | |
890 | // starting the plugin are surpressed and ProtocolInvalid is returned. It | |
891 | // is recommended you call Start explicitly before calling Protocol to ensure | |
892 | // no errors occur. | |
893 | func (c *Client) Protocol() Protocol { | |
894 | _, err := c.Start() | |
895 | if err != nil { | |
896 | return ProtocolInvalid | |
897 | } | |
898 | ||
899 | return c.protocol | |
900 | } | |
901 | ||
// netAddrDialer returns a dial function that always connects to addr. Its
// func(string, time.Duration) shape matches what grpc.WithDialer expects.
//
// The string argument of the returned function is ignored. A positive
// timeout bounds how long the connection attempt may take. A zero or
// negative timeout means no timeout, which is the same as a plain net.Dial.
// TCP connections have keep-alive enabled so the connection doesn't die.
func netAddrDialer(addr net.Addr) func(string, time.Duration) (net.Conn, error) {
	return func(_ string, timeout time.Duration) (net.Conn, error) {
		// Previously the timeout was ignored. Only apply it when it is
		// positive, so that callers passing 0 (or a non-positive leftover)
		// keep the old "wait as long as the OS allows" behavior.
		var d net.Dialer
		if timeout > 0 {
			d.Timeout = timeout
		}

		// Connect to the plugin.
		conn, err := d.Dial(addr.Network(), addr.String())
		if err != nil {
			return nil, err
		}
		if tcpConn, ok := conn.(*net.TCPConn); ok {
			// Make sure to set keep alive so that the connection doesn't die.
			// Best effort: a failure here is not worth dropping an otherwise
			// working connection, so the error is deliberately ignored.
			_ = tcpConn.SetKeepAlive(true)
		}

		return conn, nil
	}
}
917 | ||
918 | // dialer is compatible with grpc.WithDialer and creates the connection | |
919 | // to the plugin. | |
920 | func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) { | |
921 | conn, err := netAddrDialer(c.address)("", timeout) | |
922 | if err != nil { | |
923 | return nil, err | |
bae9f6d2 | 924 | } |
15c0b25d AP |
925 | |
926 | // If we have a TLS config we wrap our connection. We only do this | |
927 | // for net/rpc since gRPC uses its own mechanism for TLS. | |
928 | if c.protocol == ProtocolNetRPC && c.config.TLSConfig != nil { | |
929 | conn = tls.Client(conn, c.config.TLSConfig) | |
930 | } | |
931 | ||
932 | return conn, nil | |
bae9f6d2 JC |
933 | } |
934 | ||
107c1cdb ND |
935 | var stdErrBufferSize = 64 * 1024 |
936 | ||
bae9f6d2 | 937 | func (c *Client) logStderr(r io.Reader) { |
107c1cdb ND |
938 | defer c.clientWaitGroup.Done() |
939 | l := c.logger.Named(filepath.Base(c.config.Cmd.Path)) | |
940 | ||
941 | reader := bufio.NewReaderSize(r, stdErrBufferSize) | |
942 | // continuation indicates the previous line was a prefix | |
943 | continuation := false | |
944 | ||
bae9f6d2 | 945 | for { |
107c1cdb ND |
946 | line, isPrefix, err := reader.ReadLine() |
947 | switch { | |
948 | case err == io.EOF: | |
949 | return | |
950 | case err != nil: | |
951 | l.Error("reading plugin stderr", "error", err) | |
952 | return | |
953 | } | |
15c0b25d | 954 | |
107c1cdb | 955 | c.config.Stderr.Write(line) |
15c0b25d | 956 | |
107c1cdb ND |
957 | // The line was longer than our max token size, so it's likely |
958 | // incomplete and won't unmarshal. | |
959 | if isPrefix || continuation { | |
960 | l.Debug(string(line)) | |
961 | ||
962 | // if we're finishing a continued line, add the newline back in | |
963 | if !isPrefix { | |
964 | c.config.Stderr.Write([]byte{'\n'}) | |
15c0b25d | 965 | } |
107c1cdb ND |
966 | |
967 | continuation = isPrefix | |
968 | continue | |
bae9f6d2 JC |
969 | } |
970 | ||
107c1cdb ND |
971 | c.config.Stderr.Write([]byte{'\n'}) |
972 | ||
973 | entry, err := parseJSON(line) | |
974 | // If output is not JSON format, print directly to Debug | |
975 | if err != nil { | |
976 | // Attempt to infer the desired log level from the commonly used | |
977 | // string prefixes | |
978 | switch line := string(line); { | |
979 | case strings.HasPrefix(line, "[TRACE]"): | |
980 | l.Trace(line) | |
981 | case strings.HasPrefix(line, "[DEBUG]"): | |
982 | l.Debug(line) | |
983 | case strings.HasPrefix(line, "[INFO]"): | |
984 | l.Info(line) | |
985 | case strings.HasPrefix(line, "[WARN]"): | |
986 | l.Warn(line) | |
987 | case strings.HasPrefix(line, "[ERROR]"): | |
988 | l.Error(line) | |
989 | default: | |
990 | l.Debug(line) | |
991 | } | |
992 | } else { | |
993 | out := flattenKVPairs(entry.KVPairs) | |
994 | ||
995 | out = append(out, "timestamp", entry.Timestamp.Format(hclog.TimeFormat)) | |
996 | switch hclog.LevelFromString(entry.Level) { | |
997 | case hclog.Trace: | |
998 | l.Trace(entry.Message, out...) | |
999 | case hclog.Debug: | |
1000 | l.Debug(entry.Message, out...) | |
1001 | case hclog.Info: | |
1002 | l.Info(entry.Message, out...) | |
1003 | case hclog.Warn: | |
1004 | l.Warn(entry.Message, out...) | |
1005 | case hclog.Error: | |
1006 | l.Error(entry.Message, out...) | |
1007 | default: | |
1008 | // if there was no log level, it's likely this is unexpected | |
1009 | // json from something other than hclog, and we should output | |
1010 | // it verbatim. | |
1011 | l.Debug(string(line)) | |
1012 | } | |
bae9f6d2 JC |
1013 | } |
1014 | } | |
bae9f6d2 | 1015 | } |