12 "github.com/hashicorp/yamux"
15 // RPCServer listens for network connections and then dispenses interface
16 // implementations over net/rpc.
18 // After setting the fields below, they shouldn't be read again directly
19 // from the structure, since the server may be reading/writing them concurrently.
20 type RPCServer struct {
21 Plugins map[string]Plugin
23 // Stdout, Stderr are what this server will use instead of the
24 // normal stdin/out/err. Due to the multi-process nature of our
25 // plugin system, we can't use the normal process values, so we
26 // make our own custom ones that we pipe across.
30 // DoneCh should be set to a non-nil channel that will be closed
31 // when the control requests the RPC server to end.
32 DoneCh chan<- struct{}
37 // Accept accepts connections on a listener and serves requests for
38 // each incoming connection. Accept blocks; the caller typically invokes
39 // it in a go statement.
40 func (s *RPCServer) Accept(lis net.Listener) {
42 conn, err := lis.Accept()
44 log.Printf("[ERR] plugin: plugin server: %s", err)
52 // ServeConn runs a single connection.
54 // ServeConn blocks, serving the connection until the client hangs up.
55 func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) {
56 // First create the yamux server to wrap this connection
57 mux, err := yamux.Server(conn, nil)
60 log.Printf("[ERR] plugin: error creating yamux server: %s", err)
64 // Accept the control connection
65 control, err := mux.Accept()
69 log.Printf("[ERR] plugin: error accepting control connection: %s", err)
75 // Connect the stdstreams (out, err)
76 stdstream := make([]net.Conn, 2)
77 for i, _ := range stdstream {
78 stdstream[i], err = mux.Accept()
81 log.Printf("[ERR] plugin: accepting stream %d: %s", i, err)
86 // Copy std streams out to the proper place
87 go copyStream("stdout", stdstream[0], s.Stdout)
88 go copyStream("stderr", stdstream[1], s.Stderr)
90 // Create the broker and start it up
91 broker := newMuxBroker(mux)
94 // Use the control connection to build the dispenser and serve the
96 server := rpc.NewServer()
97 server.RegisterName("Control", &controlServer{
100 server.RegisterName("Dispenser", &dispenseServer{
104 server.ServeConn(control)
107 // done is called internally by the control server to trigger the
108 // doneCh to close which is listened to by the main process to cleanly
110 func (s *RPCServer) done() {
112 defer s.lock.Unlock()
120 // controlServer exposes control operations (such as Quit) for the RPC server.
121 type controlServer struct {
125 func (c *controlServer) Quit(
126 null bool, response *struct{}) error {
130 // Always respond with an empty struct
131 *response = struct{}{}
136 // dispenseServer dispenses various interface implementations for Terraform.
137 type dispenseServer struct {
139 plugins map[string]Plugin
142 func (d *dispenseServer) Dispense(
143 name string, response *uint32) error {
144 // Find the function to create this implementation
145 p, ok := d.plugins[name]
147 return fmt.Errorf("unknown plugin type: %s", name)
150 // Create the implementation first so we know if there is an error.
151 impl, err := p.Server(d.broker)
153 // We turn the error into an errors error so that it works across RPC
154 return errors.New(err.Error())
157 // Reserve an ID for our implementation
158 id := d.broker.NextId()
161 // Run the rest in a goroutine since it can only happen once this RPC
162 // call returns. We wait for a connection for the plugin implementation
165 conn, err := d.broker.Accept(id)
167 log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err)
171 serve(conn, "Plugin", impl)
177 func serve(conn io.ReadWriteCloser, name string, v interface{}) {
178 server := rpc.NewServer()
179 if err := server.RegisterName(name, v); err != nil {
180 log.Printf("[ERR] go-plugin: plugin dispense error: %s", err)
184 server.ServeConn(conn)