]> git.immae.eu Git - github/fretlink/terraform-provider-statuscake.git/blob - vendor/github.com/hashicorp/go-plugin/rpc_server.go
Initial transfer of provider code
[github/fretlink/terraform-provider-statuscake.git] / vendor / github.com / hashicorp / go-plugin / rpc_server.go
1 package plugin
2
3 import (
4 "errors"
5 "fmt"
6 "io"
7 "log"
8 "net"
9 "net/rpc"
10 "sync"
11
12 "github.com/hashicorp/yamux"
13 )
14
15 // RPCServer listens for network connections and then dispenses interface
16 // implementations over net/rpc.
17 //
18 // After setting the fields below, they shouldn't be read again directly
19 // from the structure which may be reading/writing them concurrently.
20 type RPCServer struct {
21 Plugins map[string]Plugin
22
23 // Stdout, Stderr are what this server will use instead of the
24 // normal stdin/out/err. This is because due to the multi-process nature
25 // of our plugin system, we can't use the normal process values so we
26 // make our own custom one we pipe across.
27 Stdout io.Reader
28 Stderr io.Reader
29
30 // DoneCh should be set to a non-nil channel that will be closed
31 // when the control requests the RPC server to end.
32 DoneCh chan<- struct{}
33
34 lock sync.Mutex
35 }
36
37 // Accept accepts connections on a listener and serves requests for
38 // each incoming connection. Accept blocks; the caller typically invokes
39 // it in a go statement.
40 func (s *RPCServer) Accept(lis net.Listener) {
41 for {
42 conn, err := lis.Accept()
43 if err != nil {
44 log.Printf("[ERR] plugin: plugin server: %s", err)
45 return
46 }
47
48 go s.ServeConn(conn)
49 }
50 }
51
52 // ServeConn runs a single connection.
53 //
54 // ServeConn blocks, serving the connection until the client hangs up.
55 func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) {
56 // First create the yamux server to wrap this connection
57 mux, err := yamux.Server(conn, nil)
58 if err != nil {
59 conn.Close()
60 log.Printf("[ERR] plugin: error creating yamux server: %s", err)
61 return
62 }
63
64 // Accept the control connection
65 control, err := mux.Accept()
66 if err != nil {
67 mux.Close()
68 if err != io.EOF {
69 log.Printf("[ERR] plugin: error accepting control connection: %s", err)
70 }
71
72 return
73 }
74
75 // Connect the stdstreams (in, out, err)
76 stdstream := make([]net.Conn, 2)
77 for i, _ := range stdstream {
78 stdstream[i], err = mux.Accept()
79 if err != nil {
80 mux.Close()
81 log.Printf("[ERR] plugin: accepting stream %d: %s", i, err)
82 return
83 }
84 }
85
86 // Copy std streams out to the proper place
87 go copyStream("stdout", stdstream[0], s.Stdout)
88 go copyStream("stderr", stdstream[1], s.Stderr)
89
90 // Create the broker and start it up
91 broker := newMuxBroker(mux)
92 go broker.Run()
93
94 // Use the control connection to build the dispenser and serve the
95 // connection.
96 server := rpc.NewServer()
97 server.RegisterName("Control", &controlServer{
98 server: s,
99 })
100 server.RegisterName("Dispenser", &dispenseServer{
101 broker: broker,
102 plugins: s.Plugins,
103 })
104 server.ServeConn(control)
105 }
106
107 // done is called internally by the control server to trigger the
108 // doneCh to close which is listened to by the main process to cleanly
109 // exit.
110 func (s *RPCServer) done() {
111 s.lock.Lock()
112 defer s.lock.Unlock()
113
114 if s.DoneCh != nil {
115 close(s.DoneCh)
116 s.DoneCh = nil
117 }
118 }
119
120 // dispenseServer dispenses variousinterface implementations for Terraform.
121 type controlServer struct {
122 server *RPCServer
123 }
124
125 func (c *controlServer) Quit(
126 null bool, response *struct{}) error {
127 // End the server
128 c.server.done()
129
130 // Always return true
131 *response = struct{}{}
132
133 return nil
134 }
135
136 // dispenseServer dispenses variousinterface implementations for Terraform.
137 type dispenseServer struct {
138 broker *MuxBroker
139 plugins map[string]Plugin
140 }
141
142 func (d *dispenseServer) Dispense(
143 name string, response *uint32) error {
144 // Find the function to create this implementation
145 p, ok := d.plugins[name]
146 if !ok {
147 return fmt.Errorf("unknown plugin type: %s", name)
148 }
149
150 // Create the implementation first so we know if there is an error.
151 impl, err := p.Server(d.broker)
152 if err != nil {
153 // We turn the error into an errors error so that it works across RPC
154 return errors.New(err.Error())
155 }
156
157 // Reserve an ID for our implementation
158 id := d.broker.NextId()
159 *response = id
160
161 // Run the rest in a goroutine since it can only happen once this RPC
162 // call returns. We wait for a connection for the plugin implementation
163 // and serve it.
164 go func() {
165 conn, err := d.broker.Accept(id)
166 if err != nil {
167 log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err)
168 return
169 }
170
171 serve(conn, "Plugin", impl)
172 }()
173
174 return nil
175 }
176
// serve registers v under the service name on a fresh net/rpc server and
// then serves conn, blocking until that connection is finished.
//
// If v cannot be registered (for example it has no exported methods of a
// suitable RPC signature), the error is logged and conn is closed so that
// the stream is not leaked and the peer is not left waiting on a
// connection that nothing will ever serve.
func serve(conn io.ReadWriteCloser, name string, v interface{}) {
	server := rpc.NewServer()
	if err := server.RegisterName(name, v); err != nil {
		log.Printf("[ERR] go-plugin: plugin dispense error: %s", err)
		// Best-effort close: there is nothing more to do with a close
		// error here, the registration failure is already reported.
		conn.Close()
		return
	}

	server.ServeConn(conn)
}