diff options
Diffstat (limited to 'vendor/github.com/hashicorp/go-plugin/client.go')
-rw-r--r-- | vendor/github.com/hashicorp/go-plugin/client.go | 523 |
1 files changed, 372 insertions, 151 deletions
diff --git a/vendor/github.com/hashicorp/go-plugin/client.go b/vendor/github.com/hashicorp/go-plugin/client.go index b3e3b78..679e10a 100644 --- a/vendor/github.com/hashicorp/go-plugin/client.go +++ b/vendor/github.com/hashicorp/go-plugin/client.go | |||
@@ -5,12 +5,13 @@ import ( | |||
5 | "context" | 5 | "context" |
6 | "crypto/subtle" | 6 | "crypto/subtle" |
7 | "crypto/tls" | 7 | "crypto/tls" |
8 | "crypto/x509" | ||
9 | "encoding/base64" | ||
8 | "errors" | 10 | "errors" |
9 | "fmt" | 11 | "fmt" |
10 | "hash" | 12 | "hash" |
11 | "io" | 13 | "io" |
12 | "io/ioutil" | 14 | "io/ioutil" |
13 | "log" | ||
14 | "net" | 15 | "net" |
15 | "os" | 16 | "os" |
16 | "os/exec" | 17 | "os/exec" |
@@ -20,7 +21,6 @@ import ( | |||
20 | "sync" | 21 | "sync" |
21 | "sync/atomic" | 22 | "sync/atomic" |
22 | "time" | 23 | "time" |
23 | "unicode" | ||
24 | 24 | ||
25 | hclog "github.com/hashicorp/go-hclog" | 25 | hclog "github.com/hashicorp/go-hclog" |
26 | ) | 26 | ) |
@@ -71,16 +71,31 @@ var ( | |||
71 | // | 71 | // |
72 | // See NewClient and ClientConfig for using a Client. | 72 | // See NewClient and ClientConfig for using a Client. |
73 | type Client struct { | 73 | type Client struct { |
74 | config *ClientConfig | 74 | config *ClientConfig |
75 | exited bool | 75 | exited bool |
76 | doneLogging chan struct{} | 76 | l sync.Mutex |
77 | l sync.Mutex | 77 | address net.Addr |
78 | address net.Addr | 78 | process *os.Process |
79 | process *os.Process | 79 | client ClientProtocol |
80 | client ClientProtocol | 80 | protocol Protocol |
81 | protocol Protocol | 81 | logger hclog.Logger |
82 | logger hclog.Logger | 82 | doneCtx context.Context |
83 | doneCtx context.Context | 83 | ctxCancel context.CancelFunc |
84 | negotiatedVersion int | ||
85 | |||
86 | // clientWaitGroup is used to manage the lifecycle of the plugin management | ||
87 | // goroutines. | ||
88 | clientWaitGroup sync.WaitGroup | ||
89 | |||
90 | // processKilled is used for testing only, to flag when the process was | ||
91 | // forcefully killed. | ||
92 | processKilled bool | ||
93 | } | ||
94 | |||
95 | // NegotiatedVersion returns the protocol version negotiated with the server. | ||
96 | // This is only valid after Start() is called. | ||
97 | func (c *Client) NegotiatedVersion() int { | ||
98 | return c.negotiatedVersion | ||
84 | } | 99 | } |
85 | 100 | ||
86 | // ClientConfig is the configuration used to initialize a new | 101 | // ClientConfig is the configuration used to initialize a new |
@@ -91,7 +106,13 @@ type ClientConfig struct { | |||
91 | HandshakeConfig | 106 | HandshakeConfig |
92 | 107 | ||
93 | // Plugins are the plugins that can be consumed. | 108 | // Plugins are the plugins that can be consumed. |
94 | Plugins map[string]Plugin | 109 | // The implied version of this PluginSet is the Handshake.ProtocolVersion. |
110 | Plugins PluginSet | ||
111 | |||
112 | // VersionedPlugins is a map of PluginSets for specific protocol versions. | ||
113 | // These can be used to negotiate a compatible version between client and | ||
114 | // server. If this is set, Handshake.ProtocolVersion is not required. | ||
115 | VersionedPlugins map[int]PluginSet | ||
95 | 116 | ||
96 | // One of the following must be set, but not both. | 117 | // One of the following must be set, but not both. |
97 | // | 118 | // |
@@ -158,6 +179,29 @@ type ClientConfig struct { | |||
158 | // Logger is the logger that the client will used. If none is provided, | 179 | // Logger is the logger that the client will used. If none is provided, |
159 | // it will default to hclog's default logger. | 180 | // it will default to hclog's default logger. |
160 | Logger hclog.Logger | 181 | Logger hclog.Logger |
182 | |||
183 | // AutoMTLS has the client and server automatically negotiate mTLS for | ||
184 | // transport authentication. This ensures that only the original client will | ||
185 | // be allowed to connect to the server, and all other connections will be | ||
186 | // rejected. The client will also refuse to connect to any server that isn't | ||
187 | // the original instance started by the client. | ||
188 | // | ||
189 | // In this mode of operation, the client generates a one-time use tls | ||
190 | // certificate, sends the public x.509 certificate to the new server, and | ||
191 | // the server generates a one-time use tls certificate, and sends the public | ||
192 | // x.509 certificate back to the client. These are used to authenticate all | ||
193 | // rpc connections between the client and server. | ||
194 | // | ||
195 | // Setting AutoMTLS to true implies that the server must support the | ||
196 | // protocol, and correctly negotiate the tls certificates, or a connection | ||
197 | // failure will result. | ||
198 | // | ||
199 | // The client should not set TLSConfig, nor should the server set a | ||
200 | // TLSProvider, because AutoMTLS implies that a new certificate and tls | ||
201 | // configuration will be generated at startup. | ||
202 | // | ||
203 | // You cannot Reattach to a server with this option enabled. | ||
204 | AutoMTLS bool | ||
161 | } | 205 | } |
162 | 206 | ||
163 | // ReattachConfig is used to configure a client to reattach to an | 207 | // ReattachConfig is used to configure a client to reattach to an |
@@ -234,7 +278,6 @@ func CleanupClients() { | |||
234 | } | 278 | } |
235 | managedClientsLock.Unlock() | 279 | managedClientsLock.Unlock() |
236 | 280 | ||
237 | log.Println("[DEBUG] plugin: waiting for all plugin processes to complete...") | ||
238 | wg.Wait() | 281 | wg.Wait() |
239 | } | 282 | } |
240 | 283 | ||
@@ -333,6 +376,14 @@ func (c *Client) Exited() bool { | |||
333 | return c.exited | 376 | return c.exited |
334 | } | 377 | } |
335 | 378 | ||
379 | // killed is used in tests to check if a process failed to exit gracefully, and | ||
380 | // needed to be killed. | ||
381 | func (c *Client) killed() bool { | ||
382 | c.l.Lock() | ||
383 | defer c.l.Unlock() | ||
384 | return c.processKilled | ||
385 | } | ||
386 | |||
336 | // End the executing subprocess (if it is running) and perform any cleanup | 387 | // End the executing subprocess (if it is running) and perform any cleanup |
337 | // tasks necessary such as capturing any remaining logs and so on. | 388 | // tasks necessary such as capturing any remaining logs and so on. |
338 | // | 389 | // |
@@ -344,14 +395,24 @@ func (c *Client) Kill() { | |||
344 | c.l.Lock() | 395 | c.l.Lock() |
345 | process := c.process | 396 | process := c.process |
346 | addr := c.address | 397 | addr := c.address |
347 | doneCh := c.doneLogging | ||
348 | c.l.Unlock() | 398 | c.l.Unlock() |
349 | 399 | ||
350 | // If there is no process, we never started anything. Nothing to kill. | 400 | // If there is no process, there is nothing to kill. |
351 | if process == nil { | 401 | if process == nil { |
352 | return | 402 | return |
353 | } | 403 | } |
354 | 404 | ||
405 | defer func() { | ||
406 | // Wait for the all client goroutines to finish. | ||
407 | c.clientWaitGroup.Wait() | ||
408 | |||
409 | // Make sure there is no reference to the old process after it has been | ||
410 | // killed. | ||
411 | c.l.Lock() | ||
412 | c.process = nil | ||
413 | c.l.Unlock() | ||
414 | }() | ||
415 | |||
355 | // We need to check for address here. It is possible that the plugin | 416 | // We need to check for address here. It is possible that the plugin |
356 | // started (process != nil) but has no address (addr == nil) if the | 417 | // started (process != nil) but has no address (addr == nil) if the |
357 | // plugin failed at startup. If we do have an address, we need to close | 418 | // plugin failed at startup. If we do have an address, we need to close |
@@ -372,6 +433,8 @@ func (c *Client) Kill() { | |||
372 | // kill in a moment anyways. | 433 | // kill in a moment anyways. |
373 | c.logger.Warn("error closing client during Kill", "err", err) | 434 | c.logger.Warn("error closing client during Kill", "err", err) |
374 | } | 435 | } |
436 | } else { | ||
437 | c.logger.Error("client", "error", err) | ||
375 | } | 438 | } |
376 | } | 439 | } |
377 | 440 | ||
@@ -380,17 +443,20 @@ func (c *Client) Kill() { | |||
380 | // doneCh which would be closed if the process exits. | 443 | // doneCh which would be closed if the process exits. |
381 | if graceful { | 444 | if graceful { |
382 | select { | 445 | select { |
383 | case <-doneCh: | 446 | case <-c.doneCtx.Done(): |
447 | c.logger.Debug("plugin exited") | ||
384 | return | 448 | return |
385 | case <-time.After(250 * time.Millisecond): | 449 | case <-time.After(2 * time.Second): |
386 | } | 450 | } |
387 | } | 451 | } |
388 | 452 | ||
389 | // If graceful exiting failed, just kill it | 453 | // If graceful exiting failed, just kill it |
454 | c.logger.Warn("plugin failed to exit gracefully") | ||
390 | process.Kill() | 455 | process.Kill() |
391 | 456 | ||
392 | // Wait for the client to finish logging so we have a complete log | 457 | c.l.Lock() |
393 | <-doneCh | 458 | c.processKilled = true |
459 | c.l.Unlock() | ||
394 | } | 460 | } |
395 | 461 | ||
396 | // Starts the underlying subprocess, communicating with it to negotiate | 462 | // Starts the underlying subprocess, communicating with it to negotiate |
@@ -409,7 +475,7 @@ func (c *Client) Start() (addr net.Addr, err error) { | |||
409 | 475 | ||
410 | // If one of cmd or reattach isn't set, then it is an error. We wrap | 476 | // If one of cmd or reattach isn't set, then it is an error. We wrap |
411 | // this in a {} for scoping reasons, and hopeful that the escape | 477 | // this in a {} for scoping reasons, and hopeful that the escape |
412 | // analysis will pop the stock here. | 478 | // analysis will pop the stack here. |
413 | { | 479 | { |
414 | cmdSet := c.config.Cmd != nil | 480 | cmdSet := c.config.Cmd != nil |
415 | attachSet := c.config.Reattach != nil | 481 | attachSet := c.config.Reattach != nil |
@@ -423,77 +489,49 @@ func (c *Client) Start() (addr net.Addr, err error) { | |||
423 | } | 489 | } |
424 | } | 490 | } |
425 | 491 | ||
426 | // Create the logging channel for when we kill | ||
427 | c.doneLogging = make(chan struct{}) | ||
428 | // Create a context for when we kill | ||
429 | var ctxCancel context.CancelFunc | ||
430 | c.doneCtx, ctxCancel = context.WithCancel(context.Background()) | ||
431 | |||
432 | if c.config.Reattach != nil { | 492 | if c.config.Reattach != nil { |
433 | // Verify the process still exists. If not, then it is an error | 493 | return c.reattach() |
434 | p, err := os.FindProcess(c.config.Reattach.Pid) | 494 | } |
435 | if err != nil { | ||
436 | return nil, err | ||
437 | } | ||
438 | 495 | ||
439 | // Attempt to connect to the addr since on Unix systems FindProcess | 496 | if c.config.VersionedPlugins == nil { |
440 | // doesn't actually return an error if it can't find the process. | 497 | c.config.VersionedPlugins = make(map[int]PluginSet) |
441 | conn, err := net.Dial( | 498 | } |
442 | c.config.Reattach.Addr.Network(), | ||
443 | c.config.Reattach.Addr.String()) | ||
444 | if err != nil { | ||
445 | p.Kill() | ||
446 | return nil, ErrProcessNotFound | ||
447 | } | ||
448 | conn.Close() | ||
449 | |||
450 | // Goroutine to mark exit status | ||
451 | go func(pid int) { | ||
452 | // Wait for the process to die | ||
453 | pidWait(pid) | ||
454 | |||
455 | // Log so we can see it | ||
456 | c.logger.Debug("reattached plugin process exited") | ||
457 | |||
458 | // Mark it | ||
459 | c.l.Lock() | ||
460 | defer c.l.Unlock() | ||
461 | c.exited = true | ||
462 | |||
463 | // Close the logging channel since that doesn't work on reattach | ||
464 | close(c.doneLogging) | ||
465 | |||
466 | // Cancel the context | ||
467 | ctxCancel() | ||
468 | }(p.Pid) | ||
469 | |||
470 | // Set the address and process | ||
471 | c.address = c.config.Reattach.Addr | ||
472 | c.process = p | ||
473 | c.protocol = c.config.Reattach.Protocol | ||
474 | if c.protocol == "" { | ||
475 | // Default the protocol to net/rpc for backwards compatibility | ||
476 | c.protocol = ProtocolNetRPC | ||
477 | } | ||
478 | 499 | ||
479 | return c.address, nil | 500 | // handle all plugins as versioned, using the handshake config as the default. |
501 | version := int(c.config.ProtocolVersion) | ||
502 | |||
503 | // Make sure we're not overwriting a real version 0. If ProtocolVersion was | ||
504 | // non-zero, then we have to just assume the user made sure that | ||
505 | // VersionedPlugins doesn't conflict. | ||
506 | if _, ok := c.config.VersionedPlugins[version]; !ok && c.config.Plugins != nil { | ||
507 | c.config.VersionedPlugins[version] = c.config.Plugins | ||
508 | } | ||
509 | |||
510 | var versionStrings []string | ||
511 | for v := range c.config.VersionedPlugins { | ||
512 | versionStrings = append(versionStrings, strconv.Itoa(v)) | ||
480 | } | 513 | } |
481 | 514 | ||
482 | env := []string{ | 515 | env := []string{ |
483 | fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue), | 516 | fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue), |
484 | fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort), | 517 | fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort), |
485 | fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort), | 518 | fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort), |
519 | fmt.Sprintf("PLUGIN_PROTOCOL_VERSIONS=%s", strings.Join(versionStrings, ",")), | ||
486 | } | 520 | } |
487 | 521 | ||
488 | stdout_r, stdout_w := io.Pipe() | ||
489 | stderr_r, stderr_w := io.Pipe() | ||
490 | |||
491 | cmd := c.config.Cmd | 522 | cmd := c.config.Cmd |
492 | cmd.Env = append(cmd.Env, os.Environ()...) | 523 | cmd.Env = append(cmd.Env, os.Environ()...) |
493 | cmd.Env = append(cmd.Env, env...) | 524 | cmd.Env = append(cmd.Env, env...) |
494 | cmd.Stdin = os.Stdin | 525 | cmd.Stdin = os.Stdin |
495 | cmd.Stderr = stderr_w | 526 | |
496 | cmd.Stdout = stdout_w | 527 | cmdStdout, err := cmd.StdoutPipe() |
528 | if err != nil { | ||
529 | return nil, err | ||
530 | } | ||
531 | cmdStderr, err := cmd.StderrPipe() | ||
532 | if err != nil { | ||
533 | return nil, err | ||
534 | } | ||
497 | 535 | ||
498 | if c.config.SecureConfig != nil { | 536 | if c.config.SecureConfig != nil { |
499 | if ok, err := c.config.SecureConfig.Check(cmd.Path); err != nil { | 537 | if ok, err := c.config.SecureConfig.Check(cmd.Path); err != nil { |
@@ -503,6 +541,29 @@ func (c *Client) Start() (addr net.Addr, err error) { | |||
503 | } | 541 | } |
504 | } | 542 | } |
505 | 543 | ||
544 | // Setup a temporary certificate for client/server mtls, and send the public | ||
545 | // certificate to the plugin. | ||
546 | if c.config.AutoMTLS { | ||
547 | c.logger.Info("configuring client automatic mTLS") | ||
548 | certPEM, keyPEM, err := generateCert() | ||
549 | if err != nil { | ||
550 | c.logger.Error("failed to generate client certificate", "error", err) | ||
551 | return nil, err | ||
552 | } | ||
553 | cert, err := tls.X509KeyPair(certPEM, keyPEM) | ||
554 | if err != nil { | ||
555 | c.logger.Error("failed to parse client certificate", "error", err) | ||
556 | return nil, err | ||
557 | } | ||
558 | |||
559 | cmd.Env = append(cmd.Env, fmt.Sprintf("PLUGIN_CLIENT_CERT=%s", certPEM)) | ||
560 | |||
561 | c.config.TLSConfig = &tls.Config{ | ||
562 | Certificates: []tls.Certificate{cert}, | ||
563 | ServerName: "localhost", | ||
564 | } | ||
565 | } | ||
566 | |||
506 | c.logger.Debug("starting plugin", "path", cmd.Path, "args", cmd.Args) | 567 | c.logger.Debug("starting plugin", "path", cmd.Path, "args", cmd.Args) |
507 | err = cmd.Start() | 568 | err = cmd.Start() |
508 | if err != nil { | 569 | if err != nil { |
@@ -511,6 +572,7 @@ func (c *Client) Start() (addr net.Addr, err error) { | |||
511 | 572 | ||
512 | // Set the process | 573 | // Set the process |
513 | c.process = cmd.Process | 574 | c.process = cmd.Process |
575 | c.logger.Debug("plugin started", "path", cmd.Path, "pid", c.process.Pid) | ||
514 | 576 | ||
515 | // Make sure the command is properly cleaned up if there is an error | 577 | // Make sure the command is properly cleaned up if there is an error |
516 | defer func() { | 578 | defer func() { |
@@ -525,27 +587,37 @@ func (c *Client) Start() (addr net.Addr, err error) { | |||
525 | } | 587 | } |
526 | }() | 588 | }() |
527 | 589 | ||
528 | // Start goroutine to wait for process to exit | 590 | // Create a context for when we kill |
529 | exitCh := make(chan struct{}) | 591 | c.doneCtx, c.ctxCancel = context.WithCancel(context.Background()) |
592 | |||
593 | c.clientWaitGroup.Add(1) | ||
530 | go func() { | 594 | go func() { |
531 | // Make sure we close the write end of our stderr/stdout so | 595 | // ensure the context is cancelled when we're done |
532 | // that the readers send EOF properly. | 596 | defer c.ctxCancel() |
533 | defer stderr_w.Close() | 597 | |
534 | defer stdout_w.Close() | 598 | defer c.clientWaitGroup.Done() |
599 | |||
600 | // get the cmd info early, since the process information will be removed | ||
601 | // in Kill. | ||
602 | pid := c.process.Pid | ||
603 | path := cmd.Path | ||
535 | 604 | ||
536 | // Wait for the command to end. | 605 | // Wait for the command to end. |
537 | cmd.Wait() | 606 | err := cmd.Wait() |
607 | |||
608 | debugMsgArgs := []interface{}{ | ||
609 | "path", path, | ||
610 | "pid", pid, | ||
611 | } | ||
612 | if err != nil { | ||
613 | debugMsgArgs = append(debugMsgArgs, | ||
614 | []interface{}{"error", err.Error()}...) | ||
615 | } | ||
538 | 616 | ||
539 | // Log and make sure to flush the logs write away | 617 | // Log and make sure to flush the logs write away |
540 | c.logger.Debug("plugin process exited", "path", cmd.Path) | 618 | c.logger.Debug("plugin process exited", debugMsgArgs...) |
541 | os.Stderr.Sync() | 619 | os.Stderr.Sync() |
542 | 620 | ||
543 | // Mark that we exited | ||
544 | close(exitCh) | ||
545 | |||
546 | // Cancel the context, marking that we exited | ||
547 | ctxCancel() | ||
548 | |||
549 | // Set that we exited, which takes a lock | 621 | // Set that we exited, which takes a lock |
550 | c.l.Lock() | 622 | c.l.Lock() |
551 | defer c.l.Unlock() | 623 | defer c.l.Unlock() |
@@ -553,32 +625,33 @@ func (c *Client) Start() (addr net.Addr, err error) { | |||
553 | }() | 625 | }() |
554 | 626 | ||
555 | // Start goroutine that logs the stderr | 627 | // Start goroutine that logs the stderr |
556 | go c.logStderr(stderr_r) | 628 | c.clientWaitGroup.Add(1) |
629 | // logStderr calls Done() | ||
630 | go c.logStderr(cmdStderr) | ||
557 | 631 | ||
558 | // Start a goroutine that is going to be reading the lines | 632 | // Start a goroutine that is going to be reading the lines |
559 | // out of stdout | 633 | // out of stdout |
560 | linesCh := make(chan []byte) | 634 | linesCh := make(chan string) |
635 | c.clientWaitGroup.Add(1) | ||
561 | go func() { | 636 | go func() { |
637 | defer c.clientWaitGroup.Done() | ||
562 | defer close(linesCh) | 638 | defer close(linesCh) |
563 | 639 | ||
564 | buf := bufio.NewReader(stdout_r) | 640 | scanner := bufio.NewScanner(cmdStdout) |
565 | for { | 641 | for scanner.Scan() { |
566 | line, err := buf.ReadBytes('\n') | 642 | linesCh <- scanner.Text() |
567 | if line != nil { | ||
568 | linesCh <- line | ||
569 | } | ||
570 | |||
571 | if err == io.EOF { | ||
572 | return | ||
573 | } | ||
574 | } | 643 | } |
575 | }() | 644 | }() |
576 | 645 | ||
577 | // Make sure after we exit we read the lines from stdout forever | 646 | // Make sure after we exit we read the lines from stdout forever |
578 | // so they don't block since it is an io.Pipe | 647 | // so they don't block since it is a pipe. |
648 | // The scanner goroutine above will close this, but track it with a wait | ||
649 | // group for completeness. | ||
650 | c.clientWaitGroup.Add(1) | ||
579 | defer func() { | 651 | defer func() { |
580 | go func() { | 652 | go func() { |
581 | for _ = range linesCh { | 653 | defer c.clientWaitGroup.Done() |
654 | for range linesCh { | ||
582 | } | 655 | } |
583 | }() | 656 | }() |
584 | }() | 657 | }() |
@@ -591,12 +664,12 @@ func (c *Client) Start() (addr net.Addr, err error) { | |||
591 | select { | 664 | select { |
592 | case <-timeout: | 665 | case <-timeout: |
593 | err = errors.New("timeout while waiting for plugin to start") | 666 | err = errors.New("timeout while waiting for plugin to start") |
594 | case <-exitCh: | 667 | case <-c.doneCtx.Done(): |
595 | err = errors.New("plugin exited before we could connect") | 668 | err = errors.New("plugin exited before we could connect") |
596 | case lineBytes := <-linesCh: | 669 | case line := <-linesCh: |
597 | // Trim the line and split by "|" in order to get the parts of | 670 | // Trim the line and split by "|" in order to get the parts of |
598 | // the output. | 671 | // the output. |
599 | line := strings.TrimSpace(string(lineBytes)) | 672 | line = strings.TrimSpace(line) |
600 | parts := strings.SplitN(line, "|", 6) | 673 | parts := strings.SplitN(line, "|", 6) |
601 | if len(parts) < 4 { | 674 | if len(parts) < 4 { |
602 | err = fmt.Errorf( | 675 | err = fmt.Errorf( |
@@ -624,20 +697,18 @@ func (c *Client) Start() (addr net.Addr, err error) { | |||
624 | } | 697 | } |
625 | } | 698 | } |
626 | 699 | ||
627 | // Parse the protocol version | 700 | // Test the API version |
628 | var protocol int64 | 701 | version, pluginSet, err := c.checkProtoVersion(parts[1]) |
629 | protocol, err = strconv.ParseInt(parts[1], 10, 0) | ||
630 | if err != nil { | 702 | if err != nil { |
631 | err = fmt.Errorf("Error parsing protocol version: %s", err) | 703 | return addr, err |
632 | return | ||
633 | } | 704 | } |
634 | 705 | ||
635 | // Test the API version | 706 | // set the Plugins value to the compatible set, so the version |
636 | if uint(protocol) != c.config.ProtocolVersion { | 707 | // doesn't need to be passed through to the ClientProtocol |
637 | err = fmt.Errorf("Incompatible API version with plugin. "+ | 708 | // implementation. |
638 | "Plugin version: %s, Core version: %d", parts[1], c.config.ProtocolVersion) | 709 | c.config.Plugins = pluginSet |
639 | return | 710 | c.negotiatedVersion = version |
640 | } | 711 | c.logger.Debug("using plugin", "version", version) |
641 | 712 | ||
642 | switch parts[2] { | 713 | switch parts[2] { |
643 | case "tcp": | 714 | case "tcp": |
@@ -665,15 +736,125 @@ func (c *Client) Start() (addr net.Addr, err error) { | |||
665 | if !found { | 736 | if !found { |
666 | err = fmt.Errorf("Unsupported plugin protocol %q. Supported: %v", | 737 | err = fmt.Errorf("Unsupported plugin protocol %q. Supported: %v", |
667 | c.protocol, c.config.AllowedProtocols) | 738 | c.protocol, c.config.AllowedProtocols) |
668 | return | 739 | return addr, err |
669 | } | 740 | } |
670 | 741 | ||
742 | // See if we have a TLS certificate from the server. | ||
743 | // Checking if the length is > 50 rules out catching the unused "extra" | ||
744 | // data returned from some older implementations. | ||
745 | if len(parts) >= 6 && len(parts[5]) > 50 { | ||
746 | err := c.loadServerCert(parts[5]) | ||
747 | if err != nil { | ||
748 | return nil, fmt.Errorf("error parsing server cert: %s", err) | ||
749 | } | ||
750 | } | ||
671 | } | 751 | } |
672 | 752 | ||
673 | c.address = addr | 753 | c.address = addr |
674 | return | 754 | return |
675 | } | 755 | } |
676 | 756 | ||
757 | // loadServerCert is used by AutoMTLS to read an x.509 cert returned by the | ||
758 | // server, and load it as the RootCA for the client TLSConfig. | ||
759 | func (c *Client) loadServerCert(cert string) error { | ||
760 | certPool := x509.NewCertPool() | ||
761 | |||
762 | asn1, err := base64.RawStdEncoding.DecodeString(cert) | ||
763 | if err != nil { | ||
764 | return err | ||
765 | } | ||
766 | |||
767 | x509Cert, err := x509.ParseCertificate([]byte(asn1)) | ||
768 | if err != nil { | ||
769 | return err | ||
770 | } | ||
771 | |||
772 | certPool.AddCert(x509Cert) | ||
773 | |||
774 | c.config.TLSConfig.RootCAs = certPool | ||
775 | return nil | ||
776 | } | ||
777 | |||
778 | func (c *Client) reattach() (net.Addr, error) { | ||
779 | // Verify the process still exists. If not, then it is an error | ||
780 | p, err := os.FindProcess(c.config.Reattach.Pid) | ||
781 | if err != nil { | ||
782 | return nil, err | ||
783 | } | ||
784 | |||
785 | // Attempt to connect to the addr since on Unix systems FindProcess | ||
786 | // doesn't actually return an error if it can't find the process. | ||
787 | conn, err := net.Dial( | ||
788 | c.config.Reattach.Addr.Network(), | ||
789 | c.config.Reattach.Addr.String()) | ||
790 | if err != nil { | ||
791 | p.Kill() | ||
792 | return nil, ErrProcessNotFound | ||
793 | } | ||
794 | conn.Close() | ||
795 | |||
796 | // Create a context for when we kill | ||
797 | c.doneCtx, c.ctxCancel = context.WithCancel(context.Background()) | ||
798 | |||
799 | c.clientWaitGroup.Add(1) | ||
800 | // Goroutine to mark exit status | ||
801 | go func(pid int) { | ||
802 | defer c.clientWaitGroup.Done() | ||
803 | |||
804 | // ensure the context is cancelled when we're done | ||
805 | defer c.ctxCancel() | ||
806 | |||
807 | // Wait for the process to die | ||
808 | pidWait(pid) | ||
809 | |||
810 | // Log so we can see it | ||
811 | c.logger.Debug("reattached plugin process exited") | ||
812 | |||
813 | // Mark it | ||
814 | c.l.Lock() | ||
815 | defer c.l.Unlock() | ||
816 | c.exited = true | ||
817 | }(p.Pid) | ||
818 | |||
819 | // Set the address and process | ||
820 | c.address = c.config.Reattach.Addr | ||
821 | c.process = p | ||
822 | c.protocol = c.config.Reattach.Protocol | ||
823 | if c.protocol == "" { | ||
824 | // Default the protocol to net/rpc for backwards compatibility | ||
825 | c.protocol = ProtocolNetRPC | ||
826 | } | ||
827 | |||
828 | return c.address, nil | ||
829 | } | ||
830 | |||
831 | // checkProtoVersion returns the negotiated version and PluginSet. | ||
832 | // This returns an error if the server returned an incompatible protocol | ||
833 | // version, or an invalid handshake response. | ||
834 | func (c *Client) checkProtoVersion(protoVersion string) (int, PluginSet, error) { | ||
835 | serverVersion, err := strconv.Atoi(protoVersion) | ||
836 | if err != nil { | ||
837 | return 0, nil, fmt.Errorf("Error parsing protocol version %q: %s", protoVersion, err) | ||
838 | } | ||
839 | |||
840 | // record these for the error message | ||
841 | var clientVersions []int | ||
842 | |||
843 | // all versions, including the legacy ProtocolVersion have been added to | ||
844 | // the versions set | ||
845 | for version, plugins := range c.config.VersionedPlugins { | ||
846 | clientVersions = append(clientVersions, version) | ||
847 | |||
848 | if serverVersion != version { | ||
849 | continue | ||
850 | } | ||
851 | return version, plugins, nil | ||
852 | } | ||
853 | |||
854 | return 0, nil, fmt.Errorf("Incompatible API version with plugin. "+ | ||
855 | "Plugin version: %d, Client versions: %d", serverVersion, clientVersions) | ||
856 | } | ||
857 | |||
677 | // ReattachConfig returns the information that must be provided to NewClient | 858 | // ReattachConfig returns the information that must be provided to NewClient |
678 | // to reattach to the plugin process that this client started. This is | 859 | // to reattach to the plugin process that this client started. This is |
679 | // useful for plugins that detach from their parent process. | 860 | // useful for plugins that detach from their parent process. |
@@ -751,44 +932,84 @@ func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) { | |||
751 | return conn, nil | 932 | return conn, nil |
752 | } | 933 | } |
753 | 934 | ||
935 | var stdErrBufferSize = 64 * 1024 | ||
936 | |||
754 | func (c *Client) logStderr(r io.Reader) { | 937 | func (c *Client) logStderr(r io.Reader) { |
755 | bufR := bufio.NewReader(r) | 938 | defer c.clientWaitGroup.Done() |
939 | l := c.logger.Named(filepath.Base(c.config.Cmd.Path)) | ||
940 | |||
941 | reader := bufio.NewReaderSize(r, stdErrBufferSize) | ||
942 | // continuation indicates the previous line was a prefix | ||
943 | continuation := false | ||
944 | |||
756 | for { | 945 | for { |
757 | line, err := bufR.ReadString('\n') | 946 | line, isPrefix, err := reader.ReadLine() |
758 | if line != "" { | 947 | switch { |
759 | c.config.Stderr.Write([]byte(line)) | 948 | case err == io.EOF: |
760 | line = strings.TrimRightFunc(line, unicode.IsSpace) | 949 | return |
950 | case err != nil: | ||
951 | l.Error("reading plugin stderr", "error", err) | ||
952 | return | ||
953 | } | ||
761 | 954 | ||
762 | l := c.logger.Named(filepath.Base(c.config.Cmd.Path)) | 955 | c.config.Stderr.Write(line) |
763 | 956 | ||
764 | entry, err := parseJSON(line) | 957 | // The line was longer than our max token size, so it's likely |
765 | // If output is not JSON format, print directly to Debug | 958 | // incomplete and won't unmarshal. |
766 | if err != nil { | 959 | if isPrefix || continuation { |
767 | l.Debug(line) | 960 | l.Debug(string(line)) |
768 | } else { | 961 | |
769 | out := flattenKVPairs(entry.KVPairs) | 962 | // if we're finishing a continued line, add the newline back in |
770 | 963 | if !isPrefix { | |
771 | l = l.With("timestamp", entry.Timestamp.Format(hclog.TimeFormat)) | 964 | c.config.Stderr.Write([]byte{'\n'}) |
772 | switch hclog.LevelFromString(entry.Level) { | ||
773 | case hclog.Trace: | ||
774 | l.Trace(entry.Message, out...) | ||
775 | case hclog.Debug: | ||
776 | l.Debug(entry.Message, out...) | ||
777 | case hclog.Info: | ||
778 | l.Info(entry.Message, out...) | ||
779 | case hclog.Warn: | ||
780 | l.Warn(entry.Message, out...) | ||
781 | case hclog.Error: | ||
782 | l.Error(entry.Message, out...) | ||
783 | } | ||
784 | } | 965 | } |
966 | |||
967 | continuation = isPrefix | ||
968 | continue | ||
785 | } | 969 | } |
786 | 970 | ||
787 | if err == io.EOF { | 971 | c.config.Stderr.Write([]byte{'\n'}) |
788 | break | 972 | |
973 | entry, err := parseJSON(line) | ||
974 | // If output is not JSON format, print directly to Debug | ||
975 | if err != nil { | ||
976 | // Attempt to infer the desired log level from the commonly used | ||
977 | // string prefixes | ||
978 | switch line := string(line); { | ||
979 | case strings.HasPrefix(line, "[TRACE]"): | ||
980 | l.Trace(line) | ||
981 | case strings.HasPrefix(line, "[DEBUG]"): | ||
982 | l.Debug(line) | ||
983 | case strings.HasPrefix(line, "[INFO]"): | ||
984 | l.Info(line) | ||
985 | case strings.HasPrefix(line, "[WARN]"): | ||
986 | l.Warn(line) | ||
987 | case strings.HasPrefix(line, "[ERROR]"): | ||
988 | l.Error(line) | ||
989 | default: | ||
990 | l.Debug(line) | ||
991 | } | ||
992 | } else { | ||
993 | out := flattenKVPairs(entry.KVPairs) | ||
994 | |||
995 | out = append(out, "timestamp", entry.Timestamp.Format(hclog.TimeFormat)) | ||
996 | switch hclog.LevelFromString(entry.Level) { | ||
997 | case hclog.Trace: | ||
998 | l.Trace(entry.Message, out...) | ||
999 | case hclog.Debug: | ||
1000 | l.Debug(entry.Message, out...) | ||
1001 | case hclog.Info: | ||
1002 | l.Info(entry.Message, out...) | ||
1003 | case hclog.Warn: | ||
1004 | l.Warn(entry.Message, out...) | ||
1005 | case hclog.Error: | ||
1006 | l.Error(entry.Message, out...) | ||
1007 | default: | ||
1008 | // if there was no log level, it's likely this is unexpected | ||
1009 | // json from something other than hclog, and we should output | ||
1010 | // it verbatim. | ||
1011 | l.Debug(string(line)) | ||
1012 | } | ||
789 | } | 1013 | } |
790 | } | 1014 | } |
791 | |||
792 | // Flag that we've completed logging for others | ||
793 | close(c.doneLogging) | ||
794 | } | 1015 | } |