aboutsummaryrefslogblamecommitdiffhomepage
path: root/vendor/github.com/hashicorp/go-plugin/rpc_server.go
blob: 5bb18dd5db16309f838d4b0f8ddb8f99698e6a08 (plain) (tree)



































                                                                                







                                                 



















































































                                                                                               







                                                                      




























































                                                                                               
package plugin

import (
	"errors"
	"fmt"
	"io"
	"log"
	"net"
	"net/rpc"
	"sync"

	"github.com/hashicorp/yamux"
)

// RPCServer listens for network connections and then dispenses interface
// implementations over net/rpc.
//
// After setting the fields below, they shouldn't be read again directly
// from the structure which may be reading/writing them concurrently.
type RPCServer struct {
	// Plugins maps a plugin name to its Plugin. ServeConn hands this map
	// to the "Dispenser" RPC service, which looks entries up by the name
	// the client requests.
	Plugins map[string]Plugin

	// Stdout, Stderr are the readers this server uses in place of the
	// process's normal stdout/stderr. Because of the multi-process nature
	// of our plugin system we can't use the normal process values, so
	// ServeConn passes each reader to copyStream together with one of the
	// two streams it accepts after the control stream (stdout first, then
	// stderr).
	Stdout io.Reader
	Stderr io.Reader

	// DoneCh should be set to a non-nil channel that will be closed
	// when the control requests the RPC server to end.
	DoneCh chan<- struct{}

	// lock is held by done() while it closes and clears DoneCh, so the
	// channel is closed at most once.
	lock sync.Mutex
}

// Init is part of the ServerProtocol implementation. It does no work and
// always returns a nil error.
func (s *RPCServer) Init() error {
	return nil
}

// Config is part of the ServerProtocol implementation. It always returns
// the empty string.
func (s *RPCServer) Config() string {
	const noConfig = ""
	return noConfig
}

// Serve is part of the ServerProtocol implementation. It accepts
// connections from lis in a loop and serves each one on its own goroutine
// via ServeConn. The first Accept error is logged and ends the loop.
func (s *RPCServer) Serve(lis net.Listener) {
	for {
		conn, acceptErr := lis.Accept()
		if acceptErr == nil {
			// Hand the connection off so the loop can keep accepting.
			go s.ServeConn(conn)
			continue
		}

		log.Printf("[ERR] plugin: plugin server: %s", acceptErr)
		return
	}
}

// ServeConn runs a single connection.
//
// conn is wrapped in a yamux server session, from which three streams are
// accepted in order: the control stream, then one stream each for stdout
// and stderr. s.Stdout and s.Stderr are copied to their streams in the
// background, a mux broker is started on the session, and the "Control"
// and "Dispenser" RPC services are served over the control stream.
//
// Any setup failure is logged and tears the connection down. ServeConn
// blocks, serving the connection until the client hangs up.
func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) {
	// First create the yamux server to wrap this connection
	mux, err := yamux.Server(conn, nil)
	if err != nil {
		conn.Close()
		log.Printf("[ERR] plugin: error creating yamux server: %s", err)
		return
	}

	// Accept the control connection
	control, err := mux.Accept()
	if err != nil {
		mux.Close()
		// io.EOF is not logged; other accept errors are.
		if err != io.EOF {
			log.Printf("[ERR] plugin: error accepting control connection: %s", err)
		}

		return
	}

	// Connect the stdstreams (out, err)
	stdstream := make([]net.Conn, 2)
	for i := range stdstream {
		stdstream[i], err = mux.Accept()
		if err != nil {
			mux.Close()
			log.Printf("[ERR] plugin: accepting stream %d: %s", i, err)
			return
		}
	}

	// Copy std streams out to the proper place
	go copyStream("stdout", stdstream[0], s.Stdout)
	go copyStream("stderr", stdstream[1], s.Stderr)

	// Create the broker and start it up
	broker := newMuxBroker(mux)
	go broker.Run()

	// Use the control connection to build the dispenser and serve the
	// connection. A registration failure would leave the client talking
	// to a server that cannot answer, so it is logged and the session is
	// closed instead of being silently ignored.
	server := rpc.NewServer()
	if err := server.RegisterName("Control", &controlServer{
		server: s,
	}); err != nil {
		mux.Close()
		log.Printf("[ERR] plugin: error registering control server: %s", err)
		return
	}
	if err := server.RegisterName("Dispenser", &dispenseServer{
		broker:  broker,
		plugins: s.Plugins,
	}); err != nil {
		mux.Close()
		log.Printf("[ERR] plugin: error registering dispense server: %s", err)
		return
	}
	server.ServeConn(control)
}

// done closes DoneCh, which the main process listens on in order to exit
// cleanly. It is called internally by the control server. The channel is
// cleared after being closed, so further calls (or a call with DoneCh
// never set) do nothing.
func (s *RPCServer) done() {
	s.lock.Lock()
	defer s.lock.Unlock()

	// Nothing to signal: either never configured or already closed.
	if s.DoneCh == nil {
		return
	}

	close(s.DoneCh)
	s.DoneCh = nil
}

// controlServer is the RPC service registered under the name "Control".
// It exposes Ping and Quit so a client can check the connection and ask
// the server to shut down.
type controlServer struct {
	// server is the RPCServer whose done() is invoked by Quit.
	server *RPCServer
}

// Ping lets a client verify that the connection (and likely the plugin
// binary behind it) is still alive. The argument is ignored; the reply is
// always an empty struct with a nil error.
func (c *controlServer) Ping(null bool, response *struct{}) error {
	var empty struct{}
	*response = empty
	return nil
}

// Quit asks the owning RPCServer to end by calling its done method. The
// argument is ignored; the reply is always an empty struct with a nil
// error.
func (c *controlServer) Quit(null bool, response *struct{}) error {
	// Signal shutdown before filling in the reply.
	c.server.done()

	var empty struct{}
	*response = empty
	return nil
}

// dispenseServer is the RPC service registered under the name "Dispenser".
// It dispenses various interface implementations: a client names a plugin
// and receives the broker stream ID on which that plugin's implementation
// will be served.
type dispenseServer struct {
	// broker allocates stream IDs and accepts the per-plugin connections.
	broker *MuxBroker
	// plugins maps a plugin name to the Plugin that builds its server side.
	plugins map[string]Plugin
}

// Dispense looks up the plugin registered under name, builds its server
// implementation, and stores in response the broker stream ID on which
// that implementation will be served.
//
// It returns an error when name is not a known plugin or when the plugin
// fails to build its implementation; in both cases no ID is reserved and
// response is left untouched.
func (d *dispenseServer) Dispense(
	name string, response *uint32) error {
	plug, known := d.plugins[name]
	if !known {
		return fmt.Errorf("unknown plugin type: %s", name)
	}

	// Build the implementation before reserving anything so a failure
	// can be reported to the caller directly.
	impl, buildErr := plug.Server(d.broker)
	if buildErr != nil {
		// Re-wrap as a plain errors error so that it works across RPC.
		return errors.New(buildErr.Error())
	}

	// Reserve an ID for the implementation and report it to the client.
	streamID := d.broker.NextId()
	*response = streamID

	// The client can only connect once this RPC call has returned, so
	// waiting for its connection and serving it happens in the background.
	go func(id uint32) {
		conn, acceptErr := d.broker.Accept(id)
		if acceptErr != nil {
			log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, acceptErr)
			return
		}

		serve(conn, "Plugin", impl)
	}(streamID)

	return nil
}

// serve registers v under name on a fresh net/rpc server and then serves
// conn with it, blocking until ServeConn returns. If registration fails
// the error is logged and conn is neither served nor closed.
func serve(conn io.ReadWriteCloser, name string, v interface{}) {
	rpcServer := rpc.NewServer()

	regErr := rpcServer.RegisterName(name, v)
	if regErr == nil {
		rpcServer.ServeConn(conn)
		return
	}

	log.Printf("[ERR] go-plugin: plugin dispense error: %s", regErr)
}