]>
Commit | Line | Data |
---|---|---|
15c0b25d AP |
1 | package plugin |
2 | ||
3 | import ( | |
4 | "context" | |
5 | "crypto/tls" | |
6 | "errors" | |
7 | "fmt" | |
8 | "log" | |
9 | "net" | |
10 | "sync" | |
11 | "sync/atomic" | |
12 | "time" | |
13 | ||
107c1cdb ND |
14 | "github.com/hashicorp/go-plugin/internal/plugin" |
15 | ||
15c0b25d AP |
16 | "github.com/oklog/run" |
17 | "google.golang.org/grpc" | |
18 | "google.golang.org/grpc/credentials" | |
19 | ) | |
20 | ||
21 | // streamer interface is used in the broker to send/receive connection | |
22 | // information. | |
23 | type streamer interface { | |
107c1cdb ND |
24 | Send(*plugin.ConnInfo) error |
25 | Recv() (*plugin.ConnInfo, error) | |
15c0b25d AP |
26 | Close() |
27 | } | |
28 | ||
29 | // sendErr is used to pass errors back during a send. | |
30 | type sendErr struct { | |
107c1cdb | 31 | i *plugin.ConnInfo |
15c0b25d AP |
32 | ch chan error |
33 | } | |
34 | ||
35 | // gRPCBrokerServer is used by the plugin to start a stream and to send | |
36 | // connection information to/from the plugin. Implements GRPCBrokerServer and | |
37 | // streamer interfaces. | |
38 | type gRPCBrokerServer struct { | |
39 | // send is used to send connection info to the gRPC stream. | |
40 | send chan *sendErr | |
41 | ||
42 | // recv is used to receive connection info from the gRPC stream. | |
107c1cdb | 43 | recv chan *plugin.ConnInfo |
15c0b25d AP |
44 | |
45 | // quit closes down the stream. | |
46 | quit chan struct{} | |
47 | ||
48 | // o is used to ensure we close the quit channel only once. | |
49 | o sync.Once | |
50 | } | |
51 | ||
52 | func newGRPCBrokerServer() *gRPCBrokerServer { | |
53 | return &gRPCBrokerServer{ | |
54 | send: make(chan *sendErr), | |
107c1cdb | 55 | recv: make(chan *plugin.ConnInfo), |
15c0b25d AP |
56 | quit: make(chan struct{}), |
57 | } | |
58 | } | |
59 | ||
60 | // StartStream implements the GRPCBrokerServer interface and will block until | |
61 | // the quit channel is closed or the context reports Done. The stream will pass | |
62 | // connection information to/from the client. | |
107c1cdb | 63 | func (s *gRPCBrokerServer) StartStream(stream plugin.GRPCBroker_StartStreamServer) error { |
15c0b25d AP |
64 | doneCh := stream.Context().Done() |
65 | defer s.Close() | |
66 | ||
67 | // Proccess send stream | |
68 | go func() { | |
69 | for { | |
70 | select { | |
71 | case <-doneCh: | |
72 | return | |
73 | case <-s.quit: | |
74 | return | |
75 | case se := <-s.send: | |
76 | err := stream.Send(se.i) | |
77 | se.ch <- err | |
78 | } | |
79 | } | |
80 | }() | |
81 | ||
82 | // Process receive stream | |
83 | for { | |
84 | i, err := stream.Recv() | |
85 | if err != nil { | |
86 | return err | |
87 | } | |
88 | select { | |
89 | case <-doneCh: | |
90 | return nil | |
91 | case <-s.quit: | |
92 | return nil | |
93 | case s.recv <- i: | |
94 | } | |
95 | } | |
96 | ||
97 | return nil | |
98 | } | |
99 | ||
100 | // Send is used by the GRPCBroker to pass connection information into the stream | |
101 | // to the client. | |
107c1cdb | 102 | func (s *gRPCBrokerServer) Send(i *plugin.ConnInfo) error { |
15c0b25d AP |
103 | ch := make(chan error) |
104 | defer close(ch) | |
105 | ||
106 | select { | |
107 | case <-s.quit: | |
108 | return errors.New("broker closed") | |
109 | case s.send <- &sendErr{ | |
110 | i: i, | |
111 | ch: ch, | |
112 | }: | |
113 | } | |
114 | ||
115 | return <-ch | |
116 | } | |
117 | ||
118 | // Recv is used by the GRPCBroker to pass connection information that has been | |
119 | // sent from the client from the stream to the broker. | |
107c1cdb | 120 | func (s *gRPCBrokerServer) Recv() (*plugin.ConnInfo, error) { |
15c0b25d AP |
121 | select { |
122 | case <-s.quit: | |
123 | return nil, errors.New("broker closed") | |
124 | case i := <-s.recv: | |
125 | return i, nil | |
126 | } | |
127 | } | |
128 | ||
129 | // Close closes the quit channel, shutting down the stream. | |
130 | func (s *gRPCBrokerServer) Close() { | |
131 | s.o.Do(func() { | |
132 | close(s.quit) | |
133 | }) | |
134 | } | |
135 | ||
136 | // gRPCBrokerClientImpl is used by the client to start a stream and to send | |
137 | // connection information to/from the client. Implements GRPCBrokerClient and | |
138 | // streamer interfaces. | |
139 | type gRPCBrokerClientImpl struct { | |
140 | // client is the underlying GRPC client used to make calls to the server. | |
107c1cdb | 141 | client plugin.GRPCBrokerClient |
15c0b25d AP |
142 | |
143 | // send is used to send connection info to the gRPC stream. | |
144 | send chan *sendErr | |
145 | ||
146 | // recv is used to receive connection info from the gRPC stream. | |
107c1cdb | 147 | recv chan *plugin.ConnInfo |
15c0b25d AP |
148 | |
149 | // quit closes down the stream. | |
150 | quit chan struct{} | |
151 | ||
152 | // o is used to ensure we close the quit channel only once. | |
153 | o sync.Once | |
154 | } | |
155 | ||
156 | func newGRPCBrokerClient(conn *grpc.ClientConn) *gRPCBrokerClientImpl { | |
157 | return &gRPCBrokerClientImpl{ | |
107c1cdb | 158 | client: plugin.NewGRPCBrokerClient(conn), |
15c0b25d | 159 | send: make(chan *sendErr), |
107c1cdb | 160 | recv: make(chan *plugin.ConnInfo), |
15c0b25d AP |
161 | quit: make(chan struct{}), |
162 | } | |
163 | } | |
164 | ||
165 | // StartStream implements the GRPCBrokerClient interface and will block until | |
166 | // the quit channel is closed or the context reports Done. The stream will pass | |
167 | // connection information to/from the plugin. | |
168 | func (s *gRPCBrokerClientImpl) StartStream() error { | |
169 | ctx, cancelFunc := context.WithCancel(context.Background()) | |
170 | defer cancelFunc() | |
171 | defer s.Close() | |
172 | ||
173 | stream, err := s.client.StartStream(ctx) | |
174 | if err != nil { | |
175 | return err | |
176 | } | |
177 | doneCh := stream.Context().Done() | |
178 | ||
179 | go func() { | |
180 | for { | |
181 | select { | |
182 | case <-doneCh: | |
183 | return | |
184 | case <-s.quit: | |
185 | return | |
186 | case se := <-s.send: | |
187 | err := stream.Send(se.i) | |
188 | se.ch <- err | |
189 | } | |
190 | } | |
191 | }() | |
192 | ||
193 | for { | |
194 | i, err := stream.Recv() | |
195 | if err != nil { | |
196 | return err | |
197 | } | |
198 | select { | |
199 | case <-doneCh: | |
200 | return nil | |
201 | case <-s.quit: | |
202 | return nil | |
203 | case s.recv <- i: | |
204 | } | |
205 | } | |
206 | ||
207 | return nil | |
208 | } | |
209 | ||
210 | // Send is used by the GRPCBroker to pass connection information into the stream | |
211 | // to the plugin. | |
107c1cdb | 212 | func (s *gRPCBrokerClientImpl) Send(i *plugin.ConnInfo) error { |
15c0b25d AP |
213 | ch := make(chan error) |
214 | defer close(ch) | |
215 | ||
216 | select { | |
217 | case <-s.quit: | |
218 | return errors.New("broker closed") | |
219 | case s.send <- &sendErr{ | |
220 | i: i, | |
221 | ch: ch, | |
222 | }: | |
223 | } | |
224 | ||
225 | return <-ch | |
226 | } | |
227 | ||
228 | // Recv is used by the GRPCBroker to pass connection information that has been | |
229 | // sent from the plugin to the broker. | |
107c1cdb | 230 | func (s *gRPCBrokerClientImpl) Recv() (*plugin.ConnInfo, error) { |
15c0b25d AP |
231 | select { |
232 | case <-s.quit: | |
233 | return nil, errors.New("broker closed") | |
234 | case i := <-s.recv: | |
235 | return i, nil | |
236 | } | |
237 | } | |
238 | ||
239 | // Close closes the quit channel, shutting down the stream. | |
240 | func (s *gRPCBrokerClientImpl) Close() { | |
241 | s.o.Do(func() { | |
242 | close(s.quit) | |
243 | }) | |
244 | } | |
245 | ||
246 | // GRPCBroker is responsible for brokering connections by unique ID. | |
247 | // | |
248 | // It is used by plugins to create multiple gRPC connections and data | |
249 | // streams between the plugin process and the host process. | |
250 | // | |
251 | // This allows a plugin to request a channel with a specific ID to connect to | |
252 | // or accept a connection from, and the broker handles the details of | |
253 | // holding these channels open while they're being negotiated. | |
254 | // | |
255 | // The Plugin interface has access to these for both Server and Client. | |
256 | // The broker can be used by either (optionally) to reserve and connect to | |
257 | // new streams. This is useful for complex args and return values, | |
258 | // or anything else you might need a data stream for. | |
259 | type GRPCBroker struct { | |
260 | nextId uint32 | |
261 | streamer streamer | |
262 | streams map[uint32]*gRPCBrokerPending | |
263 | tls *tls.Config | |
264 | doneCh chan struct{} | |
265 | o sync.Once | |
266 | ||
267 | sync.Mutex | |
268 | } | |
269 | ||
270 | type gRPCBrokerPending struct { | |
107c1cdb | 271 | ch chan *plugin.ConnInfo |
15c0b25d AP |
272 | doneCh chan struct{} |
273 | } | |
274 | ||
275 | func newGRPCBroker(s streamer, tls *tls.Config) *GRPCBroker { | |
276 | return &GRPCBroker{ | |
277 | streamer: s, | |
278 | streams: make(map[uint32]*gRPCBrokerPending), | |
279 | tls: tls, | |
280 | doneCh: make(chan struct{}), | |
281 | } | |
282 | } | |
283 | ||
284 | // Accept accepts a connection by ID. | |
285 | // | |
286 | // This should not be called multiple times with the same ID at one time. | |
287 | func (b *GRPCBroker) Accept(id uint32) (net.Listener, error) { | |
288 | listener, err := serverListener() | |
289 | if err != nil { | |
290 | return nil, err | |
291 | } | |
292 | ||
107c1cdb | 293 | err = b.streamer.Send(&plugin.ConnInfo{ |
15c0b25d AP |
294 | ServiceId: id, |
295 | Network: listener.Addr().Network(), | |
296 | Address: listener.Addr().String(), | |
297 | }) | |
298 | if err != nil { | |
299 | return nil, err | |
300 | } | |
301 | ||
302 | return listener, nil | |
303 | } | |
304 | ||
305 | // AcceptAndServe is used to accept a specific stream ID and immediately | |
306 | // serve a gRPC server on that stream ID. This is used to easily serve | |
307 | // complex arguments. Each AcceptAndServe call opens a new listener socket and | |
308 | // sends the connection info down the stream to the dialer. Since a new | |
309 | // connection is opened every call, these calls should be used sparingly. | |
310 | // Multiple gRPC server implementations can be registered to a single | |
311 | // AcceptAndServe call. | |
312 | func (b *GRPCBroker) AcceptAndServe(id uint32, s func([]grpc.ServerOption) *grpc.Server) { | |
313 | listener, err := b.Accept(id) | |
314 | if err != nil { | |
315 | log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err) | |
316 | return | |
317 | } | |
318 | defer listener.Close() | |
319 | ||
320 | var opts []grpc.ServerOption | |
321 | if b.tls != nil { | |
322 | opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(b.tls))} | |
323 | } | |
324 | ||
325 | server := s(opts) | |
326 | ||
327 | // Here we use a run group to close this goroutine if the server is shutdown | |
328 | // or the broker is shutdown. | |
329 | var g run.Group | |
330 | { | |
331 | // Serve on the listener, if shutting down call GracefulStop. | |
332 | g.Add(func() error { | |
333 | return server.Serve(listener) | |
334 | }, func(err error) { | |
335 | server.GracefulStop() | |
336 | }) | |
337 | } | |
338 | { | |
339 | // block on the closeCh or the doneCh. If we are shutting down close the | |
340 | // closeCh. | |
341 | closeCh := make(chan struct{}) | |
342 | g.Add(func() error { | |
343 | select { | |
344 | case <-b.doneCh: | |
345 | case <-closeCh: | |
346 | } | |
347 | return nil | |
348 | }, func(err error) { | |
349 | close(closeCh) | |
350 | }) | |
351 | } | |
352 | ||
353 | // Block until we are done | |
354 | g.Run() | |
355 | } | |
356 | ||
357 | // Close closes the stream and all servers. | |
358 | func (b *GRPCBroker) Close() error { | |
359 | b.streamer.Close() | |
360 | b.o.Do(func() { | |
361 | close(b.doneCh) | |
362 | }) | |
363 | return nil | |
364 | } | |
365 | ||
366 | // Dial opens a connection by ID. | |
367 | func (b *GRPCBroker) Dial(id uint32) (conn *grpc.ClientConn, err error) { | |
107c1cdb | 368 | var c *plugin.ConnInfo |
15c0b25d AP |
369 | |
370 | // Open the stream | |
371 | p := b.getStream(id) | |
372 | select { | |
373 | case c = <-p.ch: | |
374 | close(p.doneCh) | |
375 | case <-time.After(5 * time.Second): | |
376 | return nil, fmt.Errorf("timeout waiting for connection info") | |
377 | } | |
378 | ||
379 | var addr net.Addr | |
380 | switch c.Network { | |
381 | case "tcp": | |
382 | addr, err = net.ResolveTCPAddr("tcp", c.Address) | |
383 | case "unix": | |
384 | addr, err = net.ResolveUnixAddr("unix", c.Address) | |
385 | default: | |
386 | err = fmt.Errorf("Unknown address type: %s", c.Address) | |
387 | } | |
388 | if err != nil { | |
389 | return nil, err | |
390 | } | |
391 | ||
392 | return dialGRPCConn(b.tls, netAddrDialer(addr)) | |
393 | } | |
394 | ||
395 | // NextId returns a unique ID to use next. | |
396 | // | |
397 | // It is possible for very long-running plugin hosts to wrap this value, | |
398 | // though it would require a very large amount of calls. In practice | |
399 | // we've never seen it happen. | |
400 | func (m *GRPCBroker) NextId() uint32 { | |
401 | return atomic.AddUint32(&m.nextId, 1) | |
402 | } | |
403 | ||
404 | // Run starts the brokering and should be executed in a goroutine, since it | |
405 | // blocks forever, or until the session closes. | |
406 | // | |
407 | // Uses of GRPCBroker never need to call this. It is called internally by | |
408 | // the plugin host/client. | |
409 | func (m *GRPCBroker) Run() { | |
410 | for { | |
411 | stream, err := m.streamer.Recv() | |
412 | if err != nil { | |
413 | // Once we receive an error, just exit | |
414 | break | |
415 | } | |
416 | ||
417 | // Initialize the waiter | |
418 | p := m.getStream(stream.ServiceId) | |
419 | select { | |
420 | case p.ch <- stream: | |
421 | default: | |
422 | } | |
423 | ||
424 | go m.timeoutWait(stream.ServiceId, p) | |
425 | } | |
426 | } | |
427 | ||
428 | func (m *GRPCBroker) getStream(id uint32) *gRPCBrokerPending { | |
429 | m.Lock() | |
430 | defer m.Unlock() | |
431 | ||
432 | p, ok := m.streams[id] | |
433 | if ok { | |
434 | return p | |
435 | } | |
436 | ||
437 | m.streams[id] = &gRPCBrokerPending{ | |
107c1cdb | 438 | ch: make(chan *plugin.ConnInfo, 1), |
15c0b25d AP |
439 | doneCh: make(chan struct{}), |
440 | } | |
441 | return m.streams[id] | |
442 | } | |
443 | ||
444 | func (m *GRPCBroker) timeoutWait(id uint32, p *gRPCBrokerPending) { | |
445 | // Wait for the stream to either be picked up and connected, or | |
446 | // for a timeout. | |
447 | select { | |
448 | case <-p.doneCh: | |
449 | case <-time.After(5 * time.Second): | |
450 | } | |
451 | ||
452 | m.Lock() | |
453 | defer m.Unlock() | |
454 | ||
455 | // Delete the stream so no one else can grab it | |
456 | delete(m.streams, id) | |
457 | } |