]>
Commit | Line | Data |
---|---|---|
bae9f6d2 JC |
1 | package plugin |
2 | ||
3 | import ( | |
4 | "errors" | |
5 | "fmt" | |
6 | "io" | |
7 | "log" | |
8 | "net" | |
9 | "net/rpc" | |
10 | "sync" | |
11 | ||
12 | "github.com/hashicorp/yamux" | |
13 | ) | |
14 | ||
15 | // RPCServer listens for network connections and then dispenses interface | |
16 | // implementations over net/rpc. | |
17 | // | |
18 | // After setting the fields below, they shouldn't be read again directly | |
19 | // from the structure which may be reading/writing them concurrently. | |
20 | type RPCServer struct { | |
21 | Plugins map[string]Plugin | |
22 | ||
23 | // Stdout, Stderr are what this server will use instead of the | |
24 | // normal stdin/out/err. This is because due to the multi-process nature | |
25 | // of our plugin system, we can't use the normal process values so we | |
26 | // make our own custom one we pipe across. | |
27 | Stdout io.Reader | |
28 | Stderr io.Reader | |
29 | ||
30 | // DoneCh should be set to a non-nil channel that will be closed | |
31 | // when the control requests the RPC server to end. | |
32 | DoneCh chan<- struct{} | |
33 | ||
34 | lock sync.Mutex | |
35 | } | |
36 | ||
37 | // Accept accepts connections on a listener and serves requests for | |
38 | // each incoming connection. Accept blocks; the caller typically invokes | |
39 | // it in a go statement. | |
40 | func (s *RPCServer) Accept(lis net.Listener) { | |
41 | for { | |
42 | conn, err := lis.Accept() | |
43 | if err != nil { | |
44 | log.Printf("[ERR] plugin: plugin server: %s", err) | |
45 | return | |
46 | } | |
47 | ||
48 | go s.ServeConn(conn) | |
49 | } | |
50 | } | |
51 | ||
52 | // ServeConn runs a single connection. | |
53 | // | |
54 | // ServeConn blocks, serving the connection until the client hangs up. | |
55 | func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) { | |
56 | // First create the yamux server to wrap this connection | |
57 | mux, err := yamux.Server(conn, nil) | |
58 | if err != nil { | |
59 | conn.Close() | |
60 | log.Printf("[ERR] plugin: error creating yamux server: %s", err) | |
61 | return | |
62 | } | |
63 | ||
64 | // Accept the control connection | |
65 | control, err := mux.Accept() | |
66 | if err != nil { | |
67 | mux.Close() | |
68 | if err != io.EOF { | |
69 | log.Printf("[ERR] plugin: error accepting control connection: %s", err) | |
70 | } | |
71 | ||
72 | return | |
73 | } | |
74 | ||
75 | // Connect the stdstreams (in, out, err) | |
76 | stdstream := make([]net.Conn, 2) | |
77 | for i, _ := range stdstream { | |
78 | stdstream[i], err = mux.Accept() | |
79 | if err != nil { | |
80 | mux.Close() | |
81 | log.Printf("[ERR] plugin: accepting stream %d: %s", i, err) | |
82 | return | |
83 | } | |
84 | } | |
85 | ||
86 | // Copy std streams out to the proper place | |
87 | go copyStream("stdout", stdstream[0], s.Stdout) | |
88 | go copyStream("stderr", stdstream[1], s.Stderr) | |
89 | ||
90 | // Create the broker and start it up | |
91 | broker := newMuxBroker(mux) | |
92 | go broker.Run() | |
93 | ||
94 | // Use the control connection to build the dispenser and serve the | |
95 | // connection. | |
96 | server := rpc.NewServer() | |
97 | server.RegisterName("Control", &controlServer{ | |
98 | server: s, | |
99 | }) | |
100 | server.RegisterName("Dispenser", &dispenseServer{ | |
101 | broker: broker, | |
102 | plugins: s.Plugins, | |
103 | }) | |
104 | server.ServeConn(control) | |
105 | } | |
106 | ||
107 | // done is called internally by the control server to trigger the | |
108 | // doneCh to close which is listened to by the main process to cleanly | |
109 | // exit. | |
110 | func (s *RPCServer) done() { | |
111 | s.lock.Lock() | |
112 | defer s.lock.Unlock() | |
113 | ||
114 | if s.DoneCh != nil { | |
115 | close(s.DoneCh) | |
116 | s.DoneCh = nil | |
117 | } | |
118 | } | |
119 | ||
120 | // dispenseServer dispenses variousinterface implementations for Terraform. | |
121 | type controlServer struct { | |
122 | server *RPCServer | |
123 | } | |
124 | ||
125 | func (c *controlServer) Quit( | |
126 | null bool, response *struct{}) error { | |
127 | // End the server | |
128 | c.server.done() | |
129 | ||
130 | // Always return true | |
131 | *response = struct{}{} | |
132 | ||
133 | return nil | |
134 | } | |
135 | ||
136 | // dispenseServer dispenses variousinterface implementations for Terraform. | |
137 | type dispenseServer struct { | |
138 | broker *MuxBroker | |
139 | plugins map[string]Plugin | |
140 | } | |
141 | ||
142 | func (d *dispenseServer) Dispense( | |
143 | name string, response *uint32) error { | |
144 | // Find the function to create this implementation | |
145 | p, ok := d.plugins[name] | |
146 | if !ok { | |
147 | return fmt.Errorf("unknown plugin type: %s", name) | |
148 | } | |
149 | ||
150 | // Create the implementation first so we know if there is an error. | |
151 | impl, err := p.Server(d.broker) | |
152 | if err != nil { | |
153 | // We turn the error into an errors error so that it works across RPC | |
154 | return errors.New(err.Error()) | |
155 | } | |
156 | ||
157 | // Reserve an ID for our implementation | |
158 | id := d.broker.NextId() | |
159 | *response = id | |
160 | ||
161 | // Run the rest in a goroutine since it can only happen once this RPC | |
162 | // call returns. We wait for a connection for the plugin implementation | |
163 | // and serve it. | |
164 | go func() { | |
165 | conn, err := d.broker.Accept(id) | |
166 | if err != nil { | |
167 | log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err) | |
168 | return | |
169 | } | |
170 | ||
171 | serve(conn, "Plugin", impl) | |
172 | }() | |
173 | ||
174 | return nil | |
175 | } | |
176 | ||
177 | func serve(conn io.ReadWriteCloser, name string, v interface{}) { | |
178 | server := rpc.NewServer() | |
179 | if err := server.RegisterName(name, v); err != nil { | |
180 | log.Printf("[ERR] go-plugin: plugin dispense error: %s", err) | |
181 | return | |
182 | } | |
183 | ||
184 | server.ServeConn(conn) | |
185 | } |