]> git.immae.eu Git - github/fretlink/terraform-provider-statuscake.git/blob - vendor/github.com/hashicorp/go-plugin/rpc_server.go
Merge pull request #27 from terraform-providers/go-modules-2019-02-22
[github/fretlink/terraform-provider-statuscake.git] / vendor / github.com / hashicorp / go-plugin / rpc_server.go
1 package plugin
2
3 import (
4 "errors"
5 "fmt"
6 "io"
7 "log"
8 "net"
9 "net/rpc"
10 "sync"
11
12 "github.com/hashicorp/yamux"
13 )
14
15 // RPCServer listens for network connections and then dispenses interface
16 // implementations over net/rpc.
17 //
18 // After setting the fields below, they shouldn't be read again directly
19 // from the structure which may be reading/writing them concurrently.
20 type RPCServer struct {
21 Plugins map[string]Plugin
22
23 // Stdout, Stderr are what this server will use instead of the
24 // normal stdin/out/err. This is because due to the multi-process nature
25 // of our plugin system, we can't use the normal process values so we
26 // make our own custom one we pipe across.
27 Stdout io.Reader
28 Stderr io.Reader
29
30 // DoneCh should be set to a non-nil channel that will be closed
31 // when the control requests the RPC server to end.
32 DoneCh chan<- struct{}
33
34 lock sync.Mutex
35 }
36
37 // ServerProtocol impl.
38 func (s *RPCServer) Init() error { return nil }
39
40 // ServerProtocol impl.
41 func (s *RPCServer) Config() string { return "" }
42
43 // ServerProtocol impl.
44 func (s *RPCServer) Serve(lis net.Listener) {
45 for {
46 conn, err := lis.Accept()
47 if err != nil {
48 log.Printf("[ERR] plugin: plugin server: %s", err)
49 return
50 }
51
52 go s.ServeConn(conn)
53 }
54 }
55
56 // ServeConn runs a single connection.
57 //
58 // ServeConn blocks, serving the connection until the client hangs up.
59 func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) {
60 // First create the yamux server to wrap this connection
61 mux, err := yamux.Server(conn, nil)
62 if err != nil {
63 conn.Close()
64 log.Printf("[ERR] plugin: error creating yamux server: %s", err)
65 return
66 }
67
68 // Accept the control connection
69 control, err := mux.Accept()
70 if err != nil {
71 mux.Close()
72 if err != io.EOF {
73 log.Printf("[ERR] plugin: error accepting control connection: %s", err)
74 }
75
76 return
77 }
78
79 // Connect the stdstreams (in, out, err)
80 stdstream := make([]net.Conn, 2)
81 for i, _ := range stdstream {
82 stdstream[i], err = mux.Accept()
83 if err != nil {
84 mux.Close()
85 log.Printf("[ERR] plugin: accepting stream %d: %s", i, err)
86 return
87 }
88 }
89
90 // Copy std streams out to the proper place
91 go copyStream("stdout", stdstream[0], s.Stdout)
92 go copyStream("stderr", stdstream[1], s.Stderr)
93
94 // Create the broker and start it up
95 broker := newMuxBroker(mux)
96 go broker.Run()
97
98 // Use the control connection to build the dispenser and serve the
99 // connection.
100 server := rpc.NewServer()
101 server.RegisterName("Control", &controlServer{
102 server: s,
103 })
104 server.RegisterName("Dispenser", &dispenseServer{
105 broker: broker,
106 plugins: s.Plugins,
107 })
108 server.ServeConn(control)
109 }
110
111 // done is called internally by the control server to trigger the
112 // doneCh to close which is listened to by the main process to cleanly
113 // exit.
114 func (s *RPCServer) done() {
115 s.lock.Lock()
116 defer s.lock.Unlock()
117
118 if s.DoneCh != nil {
119 close(s.DoneCh)
120 s.DoneCh = nil
121 }
122 }
123
124 // dispenseServer dispenses variousinterface implementations for Terraform.
125 type controlServer struct {
126 server *RPCServer
127 }
128
129 // Ping can be called to verify the connection (and likely the binary)
130 // is still alive to a plugin.
131 func (c *controlServer) Ping(
132 null bool, response *struct{}) error {
133 *response = struct{}{}
134 return nil
135 }
136
137 func (c *controlServer) Quit(
138 null bool, response *struct{}) error {
139 // End the server
140 c.server.done()
141
142 // Always return true
143 *response = struct{}{}
144
145 return nil
146 }
147
148 // dispenseServer dispenses variousinterface implementations for Terraform.
149 type dispenseServer struct {
150 broker *MuxBroker
151 plugins map[string]Plugin
152 }
153
154 func (d *dispenseServer) Dispense(
155 name string, response *uint32) error {
156 // Find the function to create this implementation
157 p, ok := d.plugins[name]
158 if !ok {
159 return fmt.Errorf("unknown plugin type: %s", name)
160 }
161
162 // Create the implementation first so we know if there is an error.
163 impl, err := p.Server(d.broker)
164 if err != nil {
165 // We turn the error into an errors error so that it works across RPC
166 return errors.New(err.Error())
167 }
168
169 // Reserve an ID for our implementation
170 id := d.broker.NextId()
171 *response = id
172
173 // Run the rest in a goroutine since it can only happen once this RPC
174 // call returns. We wait for a connection for the plugin implementation
175 // and serve it.
176 go func() {
177 conn, err := d.broker.Accept(id)
178 if err != nil {
179 log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err)
180 return
181 }
182
183 serve(conn, "Plugin", impl)
184 }()
185
186 return nil
187 }
188
// serve registers v under the RPC service name on a fresh net/rpc server
// and then serves conn, blocking until the connection is finished.
//
// If v cannot be registered, the error is logged and conn is closed
// before returning, so the stream is not leaked.
func serve(conn io.ReadWriteCloser, name string, v interface{}) {
	server := rpc.NewServer()
	if err := server.RegisterName(name, v); err != nil {
		// net/rpc only closes the connection once ServeConn finishes.
		// That call is never reached on this path, so close it here.
		conn.Close()
		log.Printf("[ERR] go-plugin: plugin dispense error: %s", err)
		return
	}

	server.ServeConn(conn)
}