aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/github.com/hashicorp/go-plugin/client.go
diff options
context:
space:
mode:
authorNathan Dench <ndenc2@gmail.com>2019-05-24 15:16:44 +1000
committerNathan Dench <ndenc2@gmail.com>2019-05-24 15:16:44 +1000
commit107c1cdb09c575aa2f61d97f48d8587eb6bada4c (patch)
treeca7d008643efc555c388baeaf1d986e0b6b3e28c /vendor/github.com/hashicorp/go-plugin/client.go
parent844b5a68d8af4791755b8f0ad293cc99f5959183 (diff)
downloadterraform-provider-statuscake-107c1cdb09c575aa2f61d97f48d8587eb6bada4c.tar.gz
terraform-provider-statuscake-107c1cdb09c575aa2f61d97f48d8587eb6bada4c.tar.zst
terraform-provider-statuscake-107c1cdb09c575aa2f61d97f48d8587eb6bada4c.zip
Upgrade to 0.12
Diffstat (limited to 'vendor/github.com/hashicorp/go-plugin/client.go')
-rw-r--r--vendor/github.com/hashicorp/go-plugin/client.go523
1 files changed, 372 insertions, 151 deletions
diff --git a/vendor/github.com/hashicorp/go-plugin/client.go b/vendor/github.com/hashicorp/go-plugin/client.go
index b3e3b78..679e10a 100644
--- a/vendor/github.com/hashicorp/go-plugin/client.go
+++ b/vendor/github.com/hashicorp/go-plugin/client.go
@@ -5,12 +5,13 @@ import (
5 "context" 5 "context"
6 "crypto/subtle" 6 "crypto/subtle"
7 "crypto/tls" 7 "crypto/tls"
8 "crypto/x509"
9 "encoding/base64"
8 "errors" 10 "errors"
9 "fmt" 11 "fmt"
10 "hash" 12 "hash"
11 "io" 13 "io"
12 "io/ioutil" 14 "io/ioutil"
13 "log"
14 "net" 15 "net"
15 "os" 16 "os"
16 "os/exec" 17 "os/exec"
@@ -20,7 +21,6 @@ import (
20 "sync" 21 "sync"
21 "sync/atomic" 22 "sync/atomic"
22 "time" 23 "time"
23 "unicode"
24 24
25 hclog "github.com/hashicorp/go-hclog" 25 hclog "github.com/hashicorp/go-hclog"
26) 26)
@@ -71,16 +71,31 @@ var (
71// 71//
72// See NewClient and ClientConfig for using a Client. 72// See NewClient and ClientConfig for using a Client.
73type Client struct { 73type Client struct {
74 config *ClientConfig 74 config *ClientConfig
75 exited bool 75 exited bool
76 doneLogging chan struct{} 76 l sync.Mutex
77 l sync.Mutex 77 address net.Addr
78 address net.Addr 78 process *os.Process
79 process *os.Process 79 client ClientProtocol
80 client ClientProtocol 80 protocol Protocol
81 protocol Protocol 81 logger hclog.Logger
82 logger hclog.Logger 82 doneCtx context.Context
83 doneCtx context.Context 83 ctxCancel context.CancelFunc
84 negotiatedVersion int
85
86 // clientWaitGroup is used to manage the lifecycle of the plugin management
87 // goroutines.
88 clientWaitGroup sync.WaitGroup
89
90 // processKilled is used for testing only, to flag when the process was
91 // forcefully killed.
92 processKilled bool
93}
94
95// NegotiatedVersion returns the protocol version negotiated with the server.
96// This is only valid after Start() is called.
97func (c *Client) NegotiatedVersion() int {
98 return c.negotiatedVersion
84} 99}
85 100
86// ClientConfig is the configuration used to initialize a new 101// ClientConfig is the configuration used to initialize a new
@@ -91,7 +106,13 @@ type ClientConfig struct {
91 HandshakeConfig 106 HandshakeConfig
92 107
93 // Plugins are the plugins that can be consumed. 108 // Plugins are the plugins that can be consumed.
94 Plugins map[string]Plugin 109 // The implied version of this PluginSet is the Handshake.ProtocolVersion.
110 Plugins PluginSet
111
112 // VersionedPlugins is a map of PluginSets for specific protocol versions.
113 // These can be used to negotiate a compatible version between client and
114 // server. If this is set, Handshake.ProtocolVersion is not required.
115 VersionedPlugins map[int]PluginSet
95 116
96 // One of the following must be set, but not both. 117 // One of the following must be set, but not both.
97 // 118 //
@@ -158,6 +179,29 @@ type ClientConfig struct {
158 // Logger is the logger that the client will used. If none is provided, 179 // Logger is the logger that the client will used. If none is provided,
159 // it will default to hclog's default logger. 180 // it will default to hclog's default logger.
160 Logger hclog.Logger 181 Logger hclog.Logger
182
183 // AutoMTLS has the client and server automatically negotiate mTLS for
184 // transport authentication. This ensures that only the original client will
185 // be allowed to connect to the server, and all other connections will be
186 // rejected. The client will also refuse to connect to any server that isn't
187 // the original instance started by the client.
188 //
189 // In this mode of operation, the client generates a one-time use tls
190 // certificate, sends the public x.509 certificate to the new server, and
191 // the server generates a one-time use tls certificate, and sends the public
192 // x.509 certificate back to the client. These are used to authenticate all
193 // rpc connections between the client and server.
194 //
195 // Setting AutoMTLS to true implies that the server must support the
196 // protocol, and correctly negotiate the tls certificates, or a connection
197 // failure will result.
198 //
199 // The client should not set TLSConfig, nor should the server set a
200 // TLSProvider, because AutoMTLS implies that a new certificate and tls
201 // configuration will be generated at startup.
202 //
203 // You cannot Reattach to a server with this option enabled.
204 AutoMTLS bool
161} 205}
162 206
163// ReattachConfig is used to configure a client to reattach to an 207// ReattachConfig is used to configure a client to reattach to an
@@ -234,7 +278,6 @@ func CleanupClients() {
234 } 278 }
235 managedClientsLock.Unlock() 279 managedClientsLock.Unlock()
236 280
237 log.Println("[DEBUG] plugin: waiting for all plugin processes to complete...")
238 wg.Wait() 281 wg.Wait()
239} 282}
240 283
@@ -333,6 +376,14 @@ func (c *Client) Exited() bool {
333 return c.exited 376 return c.exited
334} 377}
335 378
379// killed is used in tests to check if a process failed to exit gracefully, and
380// needed to be killed.
381func (c *Client) killed() bool {
382 c.l.Lock()
383 defer c.l.Unlock()
384 return c.processKilled
385}
386
336// End the executing subprocess (if it is running) and perform any cleanup 387// End the executing subprocess (if it is running) and perform any cleanup
337// tasks necessary such as capturing any remaining logs and so on. 388// tasks necessary such as capturing any remaining logs and so on.
338// 389//
@@ -344,14 +395,24 @@ func (c *Client) Kill() {
344 c.l.Lock() 395 c.l.Lock()
345 process := c.process 396 process := c.process
346 addr := c.address 397 addr := c.address
347 doneCh := c.doneLogging
348 c.l.Unlock() 398 c.l.Unlock()
349 399
350 // If there is no process, we never started anything. Nothing to kill. 400 // If there is no process, there is nothing to kill.
351 if process == nil { 401 if process == nil {
352 return 402 return
353 } 403 }
354 404
405 defer func() {
406 // Wait for the all client goroutines to finish.
407 c.clientWaitGroup.Wait()
408
409 // Make sure there is no reference to the old process after it has been
410 // killed.
411 c.l.Lock()
412 c.process = nil
413 c.l.Unlock()
414 }()
415
355 // We need to check for address here. It is possible that the plugin 416 // We need to check for address here. It is possible that the plugin
356 // started (process != nil) but has no address (addr == nil) if the 417 // started (process != nil) but has no address (addr == nil) if the
357 // plugin failed at startup. If we do have an address, we need to close 418 // plugin failed at startup. If we do have an address, we need to close
@@ -372,6 +433,8 @@ func (c *Client) Kill() {
372 // kill in a moment anyways. 433 // kill in a moment anyways.
373 c.logger.Warn("error closing client during Kill", "err", err) 434 c.logger.Warn("error closing client during Kill", "err", err)
374 } 435 }
436 } else {
437 c.logger.Error("client", "error", err)
375 } 438 }
376 } 439 }
377 440
@@ -380,17 +443,20 @@ func (c *Client) Kill() {
380 // doneCh which would be closed if the process exits. 443 // doneCh which would be closed if the process exits.
381 if graceful { 444 if graceful {
382 select { 445 select {
383 case <-doneCh: 446 case <-c.doneCtx.Done():
447 c.logger.Debug("plugin exited")
384 return 448 return
385 case <-time.After(250 * time.Millisecond): 449 case <-time.After(2 * time.Second):
386 } 450 }
387 } 451 }
388 452
389 // If graceful exiting failed, just kill it 453 // If graceful exiting failed, just kill it
454 c.logger.Warn("plugin failed to exit gracefully")
390 process.Kill() 455 process.Kill()
391 456
392 // Wait for the client to finish logging so we have a complete log 457 c.l.Lock()
393 <-doneCh 458 c.processKilled = true
459 c.l.Unlock()
394} 460}
395 461
396// Starts the underlying subprocess, communicating with it to negotiate 462// Starts the underlying subprocess, communicating with it to negotiate
@@ -409,7 +475,7 @@ func (c *Client) Start() (addr net.Addr, err error) {
409 475
410 // If one of cmd or reattach isn't set, then it is an error. We wrap 476 // If one of cmd or reattach isn't set, then it is an error. We wrap
411 // this in a {} for scoping reasons, and hopeful that the escape 477 // this in a {} for scoping reasons, and hopeful that the escape
412 // analysis will pop the stock here. 478 // analysis will pop the stack here.
413 { 479 {
414 cmdSet := c.config.Cmd != nil 480 cmdSet := c.config.Cmd != nil
415 attachSet := c.config.Reattach != nil 481 attachSet := c.config.Reattach != nil
@@ -423,77 +489,49 @@ func (c *Client) Start() (addr net.Addr, err error) {
423 } 489 }
424 } 490 }
425 491
426 // Create the logging channel for when we kill
427 c.doneLogging = make(chan struct{})
428 // Create a context for when we kill
429 var ctxCancel context.CancelFunc
430 c.doneCtx, ctxCancel = context.WithCancel(context.Background())
431
432 if c.config.Reattach != nil { 492 if c.config.Reattach != nil {
433 // Verify the process still exists. If not, then it is an error 493 return c.reattach()
434 p, err := os.FindProcess(c.config.Reattach.Pid) 494 }
435 if err != nil {
436 return nil, err
437 }
438 495
439 // Attempt to connect to the addr since on Unix systems FindProcess 496 if c.config.VersionedPlugins == nil {
440 // doesn't actually return an error if it can't find the process. 497 c.config.VersionedPlugins = make(map[int]PluginSet)
441 conn, err := net.Dial( 498 }
442 c.config.Reattach.Addr.Network(),
443 c.config.Reattach.Addr.String())
444 if err != nil {
445 p.Kill()
446 return nil, ErrProcessNotFound
447 }
448 conn.Close()
449
450 // Goroutine to mark exit status
451 go func(pid int) {
452 // Wait for the process to die
453 pidWait(pid)
454
455 // Log so we can see it
456 c.logger.Debug("reattached plugin process exited")
457
458 // Mark it
459 c.l.Lock()
460 defer c.l.Unlock()
461 c.exited = true
462
463 // Close the logging channel since that doesn't work on reattach
464 close(c.doneLogging)
465
466 // Cancel the context
467 ctxCancel()
468 }(p.Pid)
469
470 // Set the address and process
471 c.address = c.config.Reattach.Addr
472 c.process = p
473 c.protocol = c.config.Reattach.Protocol
474 if c.protocol == "" {
475 // Default the protocol to net/rpc for backwards compatibility
476 c.protocol = ProtocolNetRPC
477 }
478 499
479 return c.address, nil 500 // handle all plugins as versioned, using the handshake config as the default.
501 version := int(c.config.ProtocolVersion)
502
503 // Make sure we're not overwriting a real version 0. If ProtocolVersion was
504 // non-zero, then we have to just assume the user made sure that
505 // VersionedPlugins doesn't conflict.
506 if _, ok := c.config.VersionedPlugins[version]; !ok && c.config.Plugins != nil {
507 c.config.VersionedPlugins[version] = c.config.Plugins
508 }
509
510 var versionStrings []string
511 for v := range c.config.VersionedPlugins {
512 versionStrings = append(versionStrings, strconv.Itoa(v))
480 } 513 }
481 514
482 env := []string{ 515 env := []string{
483 fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue), 516 fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue),
484 fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort), 517 fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort),
485 fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort), 518 fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort),
519 fmt.Sprintf("PLUGIN_PROTOCOL_VERSIONS=%s", strings.Join(versionStrings, ",")),
486 } 520 }
487 521
488 stdout_r, stdout_w := io.Pipe()
489 stderr_r, stderr_w := io.Pipe()
490
491 cmd := c.config.Cmd 522 cmd := c.config.Cmd
492 cmd.Env = append(cmd.Env, os.Environ()...) 523 cmd.Env = append(cmd.Env, os.Environ()...)
493 cmd.Env = append(cmd.Env, env...) 524 cmd.Env = append(cmd.Env, env...)
494 cmd.Stdin = os.Stdin 525 cmd.Stdin = os.Stdin
495 cmd.Stderr = stderr_w 526
496 cmd.Stdout = stdout_w 527 cmdStdout, err := cmd.StdoutPipe()
528 if err != nil {
529 return nil, err
530 }
531 cmdStderr, err := cmd.StderrPipe()
532 if err != nil {
533 return nil, err
534 }
497 535
498 if c.config.SecureConfig != nil { 536 if c.config.SecureConfig != nil {
499 if ok, err := c.config.SecureConfig.Check(cmd.Path); err != nil { 537 if ok, err := c.config.SecureConfig.Check(cmd.Path); err != nil {
@@ -503,6 +541,29 @@ func (c *Client) Start() (addr net.Addr, err error) {
503 } 541 }
504 } 542 }
505 543
544 // Setup a temporary certificate for client/server mtls, and send the public
545 // certificate to the plugin.
546 if c.config.AutoMTLS {
547 c.logger.Info("configuring client automatic mTLS")
548 certPEM, keyPEM, err := generateCert()
549 if err != nil {
550 c.logger.Error("failed to generate client certificate", "error", err)
551 return nil, err
552 }
553 cert, err := tls.X509KeyPair(certPEM, keyPEM)
554 if err != nil {
555 c.logger.Error("failed to parse client certificate", "error", err)
556 return nil, err
557 }
558
559 cmd.Env = append(cmd.Env, fmt.Sprintf("PLUGIN_CLIENT_CERT=%s", certPEM))
560
561 c.config.TLSConfig = &tls.Config{
562 Certificates: []tls.Certificate{cert},
563 ServerName: "localhost",
564 }
565 }
566
506 c.logger.Debug("starting plugin", "path", cmd.Path, "args", cmd.Args) 567 c.logger.Debug("starting plugin", "path", cmd.Path, "args", cmd.Args)
507 err = cmd.Start() 568 err = cmd.Start()
508 if err != nil { 569 if err != nil {
@@ -511,6 +572,7 @@ func (c *Client) Start() (addr net.Addr, err error) {
511 572
512 // Set the process 573 // Set the process
513 c.process = cmd.Process 574 c.process = cmd.Process
575 c.logger.Debug("plugin started", "path", cmd.Path, "pid", c.process.Pid)
514 576
515 // Make sure the command is properly cleaned up if there is an error 577 // Make sure the command is properly cleaned up if there is an error
516 defer func() { 578 defer func() {
@@ -525,27 +587,37 @@ func (c *Client) Start() (addr net.Addr, err error) {
525 } 587 }
526 }() 588 }()
527 589
528 // Start goroutine to wait for process to exit 590 // Create a context for when we kill
529 exitCh := make(chan struct{}) 591 c.doneCtx, c.ctxCancel = context.WithCancel(context.Background())
592
593 c.clientWaitGroup.Add(1)
530 go func() { 594 go func() {
531 // Make sure we close the write end of our stderr/stdout so 595 // ensure the context is cancelled when we're done
532 // that the readers send EOF properly. 596 defer c.ctxCancel()
533 defer stderr_w.Close() 597
534 defer stdout_w.Close() 598 defer c.clientWaitGroup.Done()
599
600 // get the cmd info early, since the process information will be removed
601 // in Kill.
602 pid := c.process.Pid
603 path := cmd.Path
535 604
536 // Wait for the command to end. 605 // Wait for the command to end.
537 cmd.Wait() 606 err := cmd.Wait()
607
608 debugMsgArgs := []interface{}{
609 "path", path,
610 "pid", pid,
611 }
612 if err != nil {
613 debugMsgArgs = append(debugMsgArgs,
614 []interface{}{"error", err.Error()}...)
615 }
538 616
539 // Log and make sure to flush the logs write away 617 // Log and make sure to flush the logs write away
540 c.logger.Debug("plugin process exited", "path", cmd.Path) 618 c.logger.Debug("plugin process exited", debugMsgArgs...)
541 os.Stderr.Sync() 619 os.Stderr.Sync()
542 620
543 // Mark that we exited
544 close(exitCh)
545
546 // Cancel the context, marking that we exited
547 ctxCancel()
548
549 // Set that we exited, which takes a lock 621 // Set that we exited, which takes a lock
550 c.l.Lock() 622 c.l.Lock()
551 defer c.l.Unlock() 623 defer c.l.Unlock()
@@ -553,32 +625,33 @@ func (c *Client) Start() (addr net.Addr, err error) {
553 }() 625 }()
554 626
555 // Start goroutine that logs the stderr 627 // Start goroutine that logs the stderr
556 go c.logStderr(stderr_r) 628 c.clientWaitGroup.Add(1)
629 // logStderr calls Done()
630 go c.logStderr(cmdStderr)
557 631
558 // Start a goroutine that is going to be reading the lines 632 // Start a goroutine that is going to be reading the lines
559 // out of stdout 633 // out of stdout
560 linesCh := make(chan []byte) 634 linesCh := make(chan string)
635 c.clientWaitGroup.Add(1)
561 go func() { 636 go func() {
637 defer c.clientWaitGroup.Done()
562 defer close(linesCh) 638 defer close(linesCh)
563 639
564 buf := bufio.NewReader(stdout_r) 640 scanner := bufio.NewScanner(cmdStdout)
565 for { 641 for scanner.Scan() {
566 line, err := buf.ReadBytes('\n') 642 linesCh <- scanner.Text()
567 if line != nil {
568 linesCh <- line
569 }
570
571 if err == io.EOF {
572 return
573 }
574 } 643 }
575 }() 644 }()
576 645
577 // Make sure after we exit we read the lines from stdout forever 646 // Make sure after we exit we read the lines from stdout forever
578 // so they don't block since it is an io.Pipe 647 // so they don't block since it is a pipe.
648 // The scanner goroutine above will close this, but track it with a wait
649 // group for completeness.
650 c.clientWaitGroup.Add(1)
579 defer func() { 651 defer func() {
580 go func() { 652 go func() {
581 for _ = range linesCh { 653 defer c.clientWaitGroup.Done()
654 for range linesCh {
582 } 655 }
583 }() 656 }()
584 }() 657 }()
@@ -591,12 +664,12 @@ func (c *Client) Start() (addr net.Addr, err error) {
591 select { 664 select {
592 case <-timeout: 665 case <-timeout:
593 err = errors.New("timeout while waiting for plugin to start") 666 err = errors.New("timeout while waiting for plugin to start")
594 case <-exitCh: 667 case <-c.doneCtx.Done():
595 err = errors.New("plugin exited before we could connect") 668 err = errors.New("plugin exited before we could connect")
596 case lineBytes := <-linesCh: 669 case line := <-linesCh:
597 // Trim the line and split by "|" in order to get the parts of 670 // Trim the line and split by "|" in order to get the parts of
598 // the output. 671 // the output.
599 line := strings.TrimSpace(string(lineBytes)) 672 line = strings.TrimSpace(line)
600 parts := strings.SplitN(line, "|", 6) 673 parts := strings.SplitN(line, "|", 6)
601 if len(parts) < 4 { 674 if len(parts) < 4 {
602 err = fmt.Errorf( 675 err = fmt.Errorf(
@@ -624,20 +697,18 @@ func (c *Client) Start() (addr net.Addr, err error) {
624 } 697 }
625 } 698 }
626 699
627 // Parse the protocol version 700 // Test the API version
628 var protocol int64 701 version, pluginSet, err := c.checkProtoVersion(parts[1])
629 protocol, err = strconv.ParseInt(parts[1], 10, 0)
630 if err != nil { 702 if err != nil {
631 err = fmt.Errorf("Error parsing protocol version: %s", err) 703 return addr, err
632 return
633 } 704 }
634 705
635 // Test the API version 706 // set the Plugins value to the compatible set, so the version
636 if uint(protocol) != c.config.ProtocolVersion { 707 // doesn't need to be passed through to the ClientProtocol
637 err = fmt.Errorf("Incompatible API version with plugin. "+ 708 // implementation.
638 "Plugin version: %s, Core version: %d", parts[1], c.config.ProtocolVersion) 709 c.config.Plugins = pluginSet
639 return 710 c.negotiatedVersion = version
640 } 711 c.logger.Debug("using plugin", "version", version)
641 712
642 switch parts[2] { 713 switch parts[2] {
643 case "tcp": 714 case "tcp":
@@ -665,15 +736,125 @@ func (c *Client) Start() (addr net.Addr, err error) {
665 if !found { 736 if !found {
666 err = fmt.Errorf("Unsupported plugin protocol %q. Supported: %v", 737 err = fmt.Errorf("Unsupported plugin protocol %q. Supported: %v",
667 c.protocol, c.config.AllowedProtocols) 738 c.protocol, c.config.AllowedProtocols)
668 return 739 return addr, err
669 } 740 }
670 741
742 // See if we have a TLS certificate from the server.
743 // Checking if the length is > 50 rules out catching the unused "extra"
744 // data returned from some older implementations.
745 if len(parts) >= 6 && len(parts[5]) > 50 {
746 err := c.loadServerCert(parts[5])
747 if err != nil {
748 return nil, fmt.Errorf("error parsing server cert: %s", err)
749 }
750 }
671 } 751 }
672 752
673 c.address = addr 753 c.address = addr
674 return 754 return
675} 755}
676 756
757// loadServerCert is used by AutoMTLS to read an x.509 cert returned by the
758// server, and load it as the RootCA for the client TLSConfig.
759func (c *Client) loadServerCert(cert string) error {
760 certPool := x509.NewCertPool()
761
762 asn1, err := base64.RawStdEncoding.DecodeString(cert)
763 if err != nil {
764 return err
765 }
766
767 x509Cert, err := x509.ParseCertificate([]byte(asn1))
768 if err != nil {
769 return err
770 }
771
772 certPool.AddCert(x509Cert)
773
774 c.config.TLSConfig.RootCAs = certPool
775 return nil
776}
777
778func (c *Client) reattach() (net.Addr, error) {
779 // Verify the process still exists. If not, then it is an error
780 p, err := os.FindProcess(c.config.Reattach.Pid)
781 if err != nil {
782 return nil, err
783 }
784
785 // Attempt to connect to the addr since on Unix systems FindProcess
786 // doesn't actually return an error if it can't find the process.
787 conn, err := net.Dial(
788 c.config.Reattach.Addr.Network(),
789 c.config.Reattach.Addr.String())
790 if err != nil {
791 p.Kill()
792 return nil, ErrProcessNotFound
793 }
794 conn.Close()
795
796 // Create a context for when we kill
797 c.doneCtx, c.ctxCancel = context.WithCancel(context.Background())
798
799 c.clientWaitGroup.Add(1)
800 // Goroutine to mark exit status
801 go func(pid int) {
802 defer c.clientWaitGroup.Done()
803
804 // ensure the context is cancelled when we're done
805 defer c.ctxCancel()
806
807 // Wait for the process to die
808 pidWait(pid)
809
810 // Log so we can see it
811 c.logger.Debug("reattached plugin process exited")
812
813 // Mark it
814 c.l.Lock()
815 defer c.l.Unlock()
816 c.exited = true
817 }(p.Pid)
818
819 // Set the address and process
820 c.address = c.config.Reattach.Addr
821 c.process = p
822 c.protocol = c.config.Reattach.Protocol
823 if c.protocol == "" {
824 // Default the protocol to net/rpc for backwards compatibility
825 c.protocol = ProtocolNetRPC
826 }
827
828 return c.address, nil
829}
830
831// checkProtoVersion returns the negotiated version and PluginSet.
832// This returns an error if the server returned an incompatible protocol
833// version, or an invalid handshake response.
834func (c *Client) checkProtoVersion(protoVersion string) (int, PluginSet, error) {
835 serverVersion, err := strconv.Atoi(protoVersion)
836 if err != nil {
837 return 0, nil, fmt.Errorf("Error parsing protocol version %q: %s", protoVersion, err)
838 }
839
840 // record these for the error message
841 var clientVersions []int
842
843 // all versions, including the legacy ProtocolVersion have been added to
844 // the versions set
845 for version, plugins := range c.config.VersionedPlugins {
846 clientVersions = append(clientVersions, version)
847
848 if serverVersion != version {
849 continue
850 }
851 return version, plugins, nil
852 }
853
854 return 0, nil, fmt.Errorf("Incompatible API version with plugin. "+
855 "Plugin version: %d, Client versions: %d", serverVersion, clientVersions)
856}
857
677// ReattachConfig returns the information that must be provided to NewClient 858// ReattachConfig returns the information that must be provided to NewClient
678// to reattach to the plugin process that this client started. This is 859// to reattach to the plugin process that this client started. This is
679// useful for plugins that detach from their parent process. 860// useful for plugins that detach from their parent process.
@@ -751,44 +932,84 @@ func (c *Client) dialer(_ string, timeout time.Duration) (net.Conn, error) {
751 return conn, nil 932 return conn, nil
752} 933}
753 934
935var stdErrBufferSize = 64 * 1024
936
754func (c *Client) logStderr(r io.Reader) { 937func (c *Client) logStderr(r io.Reader) {
755 bufR := bufio.NewReader(r) 938 defer c.clientWaitGroup.Done()
939 l := c.logger.Named(filepath.Base(c.config.Cmd.Path))
940
941 reader := bufio.NewReaderSize(r, stdErrBufferSize)
942 // continuation indicates the previous line was a prefix
943 continuation := false
944
756 for { 945 for {
757 line, err := bufR.ReadString('\n') 946 line, isPrefix, err := reader.ReadLine()
758 if line != "" { 947 switch {
759 c.config.Stderr.Write([]byte(line)) 948 case err == io.EOF:
760 line = strings.TrimRightFunc(line, unicode.IsSpace) 949 return
950 case err != nil:
951 l.Error("reading plugin stderr", "error", err)
952 return
953 }
761 954
762 l := c.logger.Named(filepath.Base(c.config.Cmd.Path)) 955 c.config.Stderr.Write(line)
763 956
764 entry, err := parseJSON(line) 957 // The line was longer than our max token size, so it's likely
765 // If output is not JSON format, print directly to Debug 958 // incomplete and won't unmarshal.
766 if err != nil { 959 if isPrefix || continuation {
767 l.Debug(line) 960 l.Debug(string(line))
768 } else { 961
769 out := flattenKVPairs(entry.KVPairs) 962 // if we're finishing a continued line, add the newline back in
770 963 if !isPrefix {
771 l = l.With("timestamp", entry.Timestamp.Format(hclog.TimeFormat)) 964 c.config.Stderr.Write([]byte{'\n'})
772 switch hclog.LevelFromString(entry.Level) {
773 case hclog.Trace:
774 l.Trace(entry.Message, out...)
775 case hclog.Debug:
776 l.Debug(entry.Message, out...)
777 case hclog.Info:
778 l.Info(entry.Message, out...)
779 case hclog.Warn:
780 l.Warn(entry.Message, out...)
781 case hclog.Error:
782 l.Error(entry.Message, out...)
783 }
784 } 965 }
966
967 continuation = isPrefix
968 continue
785 } 969 }
786 970
787 if err == io.EOF { 971 c.config.Stderr.Write([]byte{'\n'})
788 break 972
973 entry, err := parseJSON(line)
974 // If output is not JSON format, print directly to Debug
975 if err != nil {
976 // Attempt to infer the desired log level from the commonly used
977 // string prefixes
978 switch line := string(line); {
979 case strings.HasPrefix(line, "[TRACE]"):
980 l.Trace(line)
981 case strings.HasPrefix(line, "[DEBUG]"):
982 l.Debug(line)
983 case strings.HasPrefix(line, "[INFO]"):
984 l.Info(line)
985 case strings.HasPrefix(line, "[WARN]"):
986 l.Warn(line)
987 case strings.HasPrefix(line, "[ERROR]"):
988 l.Error(line)
989 default:
990 l.Debug(line)
991 }
992 } else {
993 out := flattenKVPairs(entry.KVPairs)
994
995 out = append(out, "timestamp", entry.Timestamp.Format(hclog.TimeFormat))
996 switch hclog.LevelFromString(entry.Level) {
997 case hclog.Trace:
998 l.Trace(entry.Message, out...)
999 case hclog.Debug:
1000 l.Debug(entry.Message, out...)
1001 case hclog.Info:
1002 l.Info(entry.Message, out...)
1003 case hclog.Warn:
1004 l.Warn(entry.Message, out...)
1005 case hclog.Error:
1006 l.Error(entry.Message, out...)
1007 default:
1008 // if there was no log level, it's likely this is unexpected
1009 // json from something other than hclog, and we should output
1010 // it verbatim.
1011 l.Debug(string(line))
1012 }
789 } 1013 }
790 } 1014 }
791
792 // Flag that we've completed logging for others
793 close(c.doneLogging)
794} 1015}