]>
Commit | Line | Data |
---|---|---|
bae9f6d2 JC |
1 | package plugin |
2 | ||
3 | import ( | |
4 | "errors" | |
5 | "fmt" | |
6 | "io" | |
7 | "log" | |
8 | "net" | |
9 | "net/rpc" | |
10 | "sync" | |
11 | ||
12 | "github.com/hashicorp/yamux" | |
13 | ) | |
14 | ||
15 | // RPCServer listens for network connections and then dispenses interface | |
16 | // implementations over net/rpc. | |
17 | // | |
18 | // After setting the fields below, they shouldn't be read again directly | |
19 | // from the structure which may be reading/writing them concurrently. | |
20 | type RPCServer struct { | |
21 | Plugins map[string]Plugin | |
22 | ||
23 | // Stdout, Stderr are what this server will use instead of the | |
24 | // normal stdin/out/err. This is because due to the multi-process nature | |
25 | // of our plugin system, we can't use the normal process values so we | |
26 | // make our own custom one we pipe across. | |
27 | Stdout io.Reader | |
28 | Stderr io.Reader | |
29 | ||
30 | // DoneCh should be set to a non-nil channel that will be closed | |
31 | // when the control requests the RPC server to end. | |
32 | DoneCh chan<- struct{} | |
33 | ||
34 | lock sync.Mutex | |
35 | } | |
36 | ||
15c0b25d AP |
37 | // ServerProtocol impl. |
38 | func (s *RPCServer) Init() error { return nil } | |
39 | ||
40 | // ServerProtocol impl. | |
41 | func (s *RPCServer) Config() string { return "" } | |
42 | ||
43 | // ServerProtocol impl. | |
44 | func (s *RPCServer) Serve(lis net.Listener) { | |
bae9f6d2 JC |
45 | for { |
46 | conn, err := lis.Accept() | |
47 | if err != nil { | |
48 | log.Printf("[ERR] plugin: plugin server: %s", err) | |
49 | return | |
50 | } | |
51 | ||
52 | go s.ServeConn(conn) | |
53 | } | |
54 | } | |
55 | ||
56 | // ServeConn runs a single connection. | |
57 | // | |
58 | // ServeConn blocks, serving the connection until the client hangs up. | |
59 | func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) { | |
60 | // First create the yamux server to wrap this connection | |
61 | mux, err := yamux.Server(conn, nil) | |
62 | if err != nil { | |
63 | conn.Close() | |
64 | log.Printf("[ERR] plugin: error creating yamux server: %s", err) | |
65 | return | |
66 | } | |
67 | ||
68 | // Accept the control connection | |
69 | control, err := mux.Accept() | |
70 | if err != nil { | |
71 | mux.Close() | |
72 | if err != io.EOF { | |
73 | log.Printf("[ERR] plugin: error accepting control connection: %s", err) | |
74 | } | |
75 | ||
76 | return | |
77 | } | |
78 | ||
79 | // Connect the stdstreams (in, out, err) | |
80 | stdstream := make([]net.Conn, 2) | |
81 | for i, _ := range stdstream { | |
82 | stdstream[i], err = mux.Accept() | |
83 | if err != nil { | |
84 | mux.Close() | |
85 | log.Printf("[ERR] plugin: accepting stream %d: %s", i, err) | |
86 | return | |
87 | } | |
88 | } | |
89 | ||
90 | // Copy std streams out to the proper place | |
91 | go copyStream("stdout", stdstream[0], s.Stdout) | |
92 | go copyStream("stderr", stdstream[1], s.Stderr) | |
93 | ||
94 | // Create the broker and start it up | |
95 | broker := newMuxBroker(mux) | |
96 | go broker.Run() | |
97 | ||
98 | // Use the control connection to build the dispenser and serve the | |
99 | // connection. | |
100 | server := rpc.NewServer() | |
101 | server.RegisterName("Control", &controlServer{ | |
102 | server: s, | |
103 | }) | |
104 | server.RegisterName("Dispenser", &dispenseServer{ | |
105 | broker: broker, | |
106 | plugins: s.Plugins, | |
107 | }) | |
108 | server.ServeConn(control) | |
109 | } | |
110 | ||
111 | // done is called internally by the control server to trigger the | |
112 | // doneCh to close which is listened to by the main process to cleanly | |
113 | // exit. | |
114 | func (s *RPCServer) done() { | |
115 | s.lock.Lock() | |
116 | defer s.lock.Unlock() | |
117 | ||
118 | if s.DoneCh != nil { | |
119 | close(s.DoneCh) | |
120 | s.DoneCh = nil | |
121 | } | |
122 | } | |
123 | ||
124 | // dispenseServer dispenses variousinterface implementations for Terraform. | |
125 | type controlServer struct { | |
126 | server *RPCServer | |
127 | } | |
128 | ||
15c0b25d AP |
129 | // Ping can be called to verify the connection (and likely the binary) |
130 | // is still alive to a plugin. | |
131 | func (c *controlServer) Ping( | |
132 | null bool, response *struct{}) error { | |
133 | *response = struct{}{} | |
134 | return nil | |
135 | } | |
136 | ||
bae9f6d2 JC |
137 | func (c *controlServer) Quit( |
138 | null bool, response *struct{}) error { | |
139 | // End the server | |
140 | c.server.done() | |
141 | ||
142 | // Always return true | |
143 | *response = struct{}{} | |
144 | ||
145 | return nil | |
146 | } | |
147 | ||
148 | // dispenseServer dispenses variousinterface implementations for Terraform. | |
149 | type dispenseServer struct { | |
150 | broker *MuxBroker | |
151 | plugins map[string]Plugin | |
152 | } | |
153 | ||
154 | func (d *dispenseServer) Dispense( | |
155 | name string, response *uint32) error { | |
156 | // Find the function to create this implementation | |
157 | p, ok := d.plugins[name] | |
158 | if !ok { | |
159 | return fmt.Errorf("unknown plugin type: %s", name) | |
160 | } | |
161 | ||
162 | // Create the implementation first so we know if there is an error. | |
163 | impl, err := p.Server(d.broker) | |
164 | if err != nil { | |
165 | // We turn the error into an errors error so that it works across RPC | |
166 | return errors.New(err.Error()) | |
167 | } | |
168 | ||
169 | // Reserve an ID for our implementation | |
170 | id := d.broker.NextId() | |
171 | *response = id | |
172 | ||
173 | // Run the rest in a goroutine since it can only happen once this RPC | |
174 | // call returns. We wait for a connection for the plugin implementation | |
175 | // and serve it. | |
176 | go func() { | |
177 | conn, err := d.broker.Accept(id) | |
178 | if err != nil { | |
179 | log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err) | |
180 | return | |
181 | } | |
182 | ||
183 | serve(conn, "Plugin", impl) | |
184 | }() | |
185 | ||
186 | return nil | |
187 | } | |
188 | ||
// serve registers v under name on a fresh net/rpc server and then serves
// conn with it, blocking until the client hangs up. If the registration
// fails, the error is logged and serve returns without using conn.
func serve(conn io.ReadWriteCloser, name string, v interface{}) {
	rpcServer := rpc.NewServer()

	regErr := rpcServer.RegisterName(name, v)
	if regErr != nil {
		log.Printf("[ERR] go-plugin: plugin dispense error: %s", regErr)
		return
	}

	rpcServer.ServeConn(conn)
}