package plugin
import (
+ "crypto/tls"
+ "encoding/base64"
"errors"
"fmt"
"io/ioutil"
"runtime"
"strconv"
"sync/atomic"
+
+ "github.com/hashicorp/go-hclog"
+
+ "google.golang.org/grpc"
)
// CoreProtocolVersion is the ProtocolVersion of the plugin system itself.
// HandshakeConfig is the configuration that must match clients.
HandshakeConfig
+ // TLSProvider is a function that returns a configured tls.Config.
+ TLSProvider func() (*tls.Config, error)
+
// Plugins are the plugins that are served.
Plugins map[string]Plugin
+
+ // GRPCServer should be non-nil to enable serving the plugins over
+ // gRPC. This is a function to create the server when needed with the
+ // given server options. The server options populated by go-plugin will
+ // be for TLS if set. You may modify the input slice.
+ //
+ // Note that the grpc.Server will automatically be registered with
+ // the gRPC health checking service. This is not optional since go-plugin
+ // relies on this to implement Ping().
+ GRPCServer func([]grpc.ServerOption) *grpc.Server
+
+ // Logger is used to pass a logger into the server. If none is provided the
+ // server will create a default logger.
+ Logger hclog.Logger
+}
+
+// Protocol returns the protocol that this server should speak.
+func (c *ServeConfig) Protocol() Protocol {
+ result := ProtocolNetRPC
+ if c.GRPCServer != nil {
+ result = ProtocolGRPC
+ }
+
+ return result
}
// Serve serves the plugins given by ServeConfig.
//
// Serve doesn't return until the plugin is done being executed. Any
-// errors will be outputted to the log.
+// errors will be outputted to os.Stderr.
//
// This is the method that plugins should call in their main() functions.
func Serve(opts *ServeConfig) {
// Logging goes to the original stderr
log.SetOutput(os.Stderr)
+ logger := opts.Logger
+ if logger == nil {
+ // internal logger to os.Stderr
+ logger = hclog.New(&hclog.LoggerOptions{
+ Level: hclog.Trace,
+ Output: os.Stderr,
+ JSONFormat: true,
+ })
+ }
+
// Create our new stdout, stderr files. These will override our built-in
// stdout/stderr so that it works across the stream boundary.
stdout_r, stdout_w, err := os.Pipe()
// Register a listener so we can accept a connection
listener, err := serverListener()
if err != nil {
- log.Printf("[ERR] plugin: plugin init: %s", err)
+ logger.Error("plugin init error", "error", err)
return
}
- defer listener.Close()
+
+ // Close the listener on return. We wrap this in a func() on purpose
+ // because the "listener" reference may change to TLS.
+ defer func() {
+ listener.Close()
+ }()
+
+ var tlsConfig *tls.Config
+ if opts.TLSProvider != nil {
+ tlsConfig, err = opts.TLSProvider()
+ if err != nil {
+ logger.Error("plugin tls init", "error", err)
+ return
+ }
+ }
// Create the channel to tell us when we're done
doneCh := make(chan struct{})
- // Create the RPC server to dispense
- server := &RPCServer{
- Plugins: opts.Plugins,
- Stdout: stdout_r,
- Stderr: stderr_r,
- DoneCh: doneCh,
+ // Build the server type
+ var server ServerProtocol
+ switch opts.Protocol() {
+ case ProtocolNetRPC:
+ // If we have a TLS configuration then we wrap the listener
+ // ourselves and do it at that level.
+ if tlsConfig != nil {
+ listener = tls.NewListener(listener, tlsConfig)
+ }
+
+ // Create the RPC server to dispense
+ server = &RPCServer{
+ Plugins: opts.Plugins,
+ Stdout: stdout_r,
+ Stderr: stderr_r,
+ DoneCh: doneCh,
+ }
+
+ case ProtocolGRPC:
+ // Create the gRPC server
+ server = &GRPCServer{
+ Plugins: opts.Plugins,
+ Server: opts.GRPCServer,
+ TLS: tlsConfig,
+ Stdout: stdout_r,
+ Stderr: stderr_r,
+ DoneCh: doneCh,
+ }
+
+ default:
+ panic("unknown server protocol: " + opts.Protocol())
}
+ // Initialize the servers
+ if err := server.Init(); err != nil {
+ logger.Error("protocol init", "error", err)
+ return
+ }
+
+ // Build the extra configuration
+ extra := ""
+ if v := server.Config(); v != "" {
+ extra = base64.StdEncoding.EncodeToString([]byte(v))
+ }
+ if extra != "" {
+ extra = "|" + extra
+ }
+
+ logger.Debug("plugin address", "network", listener.Addr().Network(), "address", listener.Addr().String())
+
// Output the address and service name to stdout so that core can bring it up.
- log.Printf("[DEBUG] plugin: plugin address: %s %s\n",
- listener.Addr().Network(), listener.Addr().String())
- fmt.Printf("%d|%d|%s|%s\n",
+ fmt.Printf("%d|%d|%s|%s|%s%s\n",
CoreProtocolVersion,
opts.ProtocolVersion,
listener.Addr().Network(),
- listener.Addr().String())
+ listener.Addr().String(),
+ opts.Protocol(),
+ extra)
os.Stdout.Sync()
// Eat the interrupts
for {
<-ch
newCount := atomic.AddInt32(&count, 1)
- log.Printf(
- "[DEBUG] plugin: received interrupt signal (count: %d). Ignoring.",
- newCount)
+ logger.Debug("plugin received interrupt signal, ignoring", "count", newCount)
}
}()
os.Stdout = stdout_w
os.Stderr = stderr_w
- // Serve
- go server.Accept(listener)
-
- // Wait for the graceful exit
+ // Accept connections and wait for completion
+ go server.Serve(listener)
<-doneCh
}