aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/github.com/hashicorp/go-plugin/client.go
diff options
context:
space:
mode:
authorJake Champlin <jake.champlin.27@gmail.com>2017-06-06 12:40:07 -0400
committerJake Champlin <jake.champlin.27@gmail.com>2017-06-06 12:40:07 -0400
commitbae9f6d2fd5eb5bc80929bd393932b23f14d7c93 (patch)
treeca9ab12a7d78b1fc27a8f734729081357ce6d252 /vendor/github.com/hashicorp/go-plugin/client.go
parent254c495b6bebab3fb72a243c4bce858d79e6ee99 (diff)
downloadterraform-provider-statuscake-bae9f6d2fd5eb5bc80929bd393932b23f14d7c93.tar.gz
terraform-provider-statuscake-bae9f6d2fd5eb5bc80929bd393932b23f14d7c93.tar.zst
terraform-provider-statuscake-bae9f6d2fd5eb5bc80929bd393932b23f14d7c93.zip
Initial transfer of provider code
Diffstat (limited to 'vendor/github.com/hashicorp/go-plugin/client.go')
-rw-r--r--vendor/github.com/hashicorp/go-plugin/client.go581
1 files changed, 581 insertions, 0 deletions
diff --git a/vendor/github.com/hashicorp/go-plugin/client.go b/vendor/github.com/hashicorp/go-plugin/client.go
new file mode 100644
index 0000000..9f8a0f2
--- /dev/null
+++ b/vendor/github.com/hashicorp/go-plugin/client.go
@@ -0,0 +1,581 @@
1package plugin
2
3import (
4 "bufio"
5 "errors"
6 "fmt"
7 "io"
8 "io/ioutil"
9 "log"
10 "net"
11 "os"
12 "os/exec"
13 "path/filepath"
14 "strconv"
15 "strings"
16 "sync"
17 "sync/atomic"
18 "time"
19 "unicode"
20)
21
22// If this is 1, then we've called CleanupClients. This can be used
23// by plugin RPC implementations to change error behavior since you
24// can expected network connection errors at this point. This should be
25// read by using sync/atomic.
26var Killed uint32 = 0
27
28// This is a slice of the "managed" clients which are cleaned up when
29// calling Cleanup
30var managedClients = make([]*Client, 0, 5)
31var managedClientsLock sync.Mutex
32
33// Error types
34var (
35 // ErrProcessNotFound is returned when a client is instantiated to
36 // reattach to an existing process and it isn't found.
37 ErrProcessNotFound = errors.New("Reattachment process not found")
38)
39
40// Client handles the lifecycle of a plugin application. It launches
41// plugins, connects to them, dispenses interface implementations, and handles
42// killing the process.
43//
44// Plugin hosts should use one Client for each plugin executable. To
45// dispense a plugin type, use the `Client.Client` function, and then
46// cal `Dispense`. This awkward API is mostly historical but is used to split
47// the client that deals with subprocess management and the client that
48// does RPC management.
49//
50// See NewClient and ClientConfig for using a Client.
51type Client struct {
52 config *ClientConfig
53 exited bool
54 doneLogging chan struct{}
55 l sync.Mutex
56 address net.Addr
57 process *os.Process
58 client *RPCClient
59}
60
61// ClientConfig is the configuration used to initialize a new
62// plugin client. After being used to initialize a plugin client,
63// that configuration must not be modified again.
64type ClientConfig struct {
65 // HandshakeConfig is the configuration that must match servers.
66 HandshakeConfig
67
68 // Plugins are the plugins that can be consumed.
69 Plugins map[string]Plugin
70
71 // One of the following must be set, but not both.
72 //
73 // Cmd is the unstarted subprocess for starting the plugin. If this is
74 // set, then the Client starts the plugin process on its own and connects
75 // to it.
76 //
77 // Reattach is configuration for reattaching to an existing plugin process
78 // that is already running. This isn't common.
79 Cmd *exec.Cmd
80 Reattach *ReattachConfig
81
82 // Managed represents if the client should be managed by the
83 // plugin package or not. If true, then by calling CleanupClients,
84 // it will automatically be cleaned up. Otherwise, the client
85 // user is fully responsible for making sure to Kill all plugin
86 // clients. By default the client is _not_ managed.
87 Managed bool
88
89 // The minimum and maximum port to use for communicating with
90 // the subprocess. If not set, this defaults to 10,000 and 25,000
91 // respectively.
92 MinPort, MaxPort uint
93
94 // StartTimeout is the timeout to wait for the plugin to say it
95 // has started successfully.
96 StartTimeout time.Duration
97
98 // If non-nil, then the stderr of the client will be written to here
99 // (as well as the log). This is the original os.Stderr of the subprocess.
100 // This isn't the output of synced stderr.
101 Stderr io.Writer
102
103 // SyncStdout, SyncStderr can be set to override the
104 // respective os.Std* values in the plugin. Care should be taken to
105 // avoid races here. If these are nil, then this will automatically be
106 // hooked up to os.Stdin, Stdout, and Stderr, respectively.
107 //
108 // If the default values (nil) are used, then this package will not
109 // sync any of these streams.
110 SyncStdout io.Writer
111 SyncStderr io.Writer
112}
113
114// ReattachConfig is used to configure a client to reattach to an
115// already-running plugin process. You can retrieve this information by
116// calling ReattachConfig on Client.
117type ReattachConfig struct {
118 Addr net.Addr
119 Pid int
120}
121
122// This makes sure all the managed subprocesses are killed and properly
123// logged. This should be called before the parent process running the
124// plugins exits.
125//
126// This must only be called _once_.
127func CleanupClients() {
128 // Set the killed to true so that we don't get unexpected panics
129 atomic.StoreUint32(&Killed, 1)
130
131 // Kill all the managed clients in parallel and use a WaitGroup
132 // to wait for them all to finish up.
133 var wg sync.WaitGroup
134 managedClientsLock.Lock()
135 for _, client := range managedClients {
136 wg.Add(1)
137
138 go func(client *Client) {
139 client.Kill()
140 wg.Done()
141 }(client)
142 }
143 managedClientsLock.Unlock()
144
145 log.Println("[DEBUG] plugin: waiting for all plugin processes to complete...")
146 wg.Wait()
147}
148
149// Creates a new plugin client which manages the lifecycle of an external
150// plugin and gets the address for the RPC connection.
151//
152// The client must be cleaned up at some point by calling Kill(). If
153// the client is a managed client (created with NewManagedClient) you
154// can just call CleanupClients at the end of your program and they will
155// be properly cleaned.
156func NewClient(config *ClientConfig) (c *Client) {
157 if config.MinPort == 0 && config.MaxPort == 0 {
158 config.MinPort = 10000
159 config.MaxPort = 25000
160 }
161
162 if config.StartTimeout == 0 {
163 config.StartTimeout = 1 * time.Minute
164 }
165
166 if config.Stderr == nil {
167 config.Stderr = ioutil.Discard
168 }
169
170 if config.SyncStdout == nil {
171 config.SyncStdout = ioutil.Discard
172 }
173 if config.SyncStderr == nil {
174 config.SyncStderr = ioutil.Discard
175 }
176
177 c = &Client{config: config}
178 if config.Managed {
179 managedClientsLock.Lock()
180 managedClients = append(managedClients, c)
181 managedClientsLock.Unlock()
182 }
183
184 return
185}
186
187// Client returns an RPC client for the plugin.
188//
189// Subsequent calls to this will return the same RPC client.
190func (c *Client) Client() (*RPCClient, error) {
191 addr, err := c.Start()
192 if err != nil {
193 return nil, err
194 }
195
196 c.l.Lock()
197 defer c.l.Unlock()
198
199 if c.client != nil {
200 return c.client, nil
201 }
202
203 // Connect to the client
204 conn, err := net.Dial(addr.Network(), addr.String())
205 if err != nil {
206 return nil, err
207 }
208 if tcpConn, ok := conn.(*net.TCPConn); ok {
209 // Make sure to set keep alive so that the connection doesn't die
210 tcpConn.SetKeepAlive(true)
211 }
212
213 // Create the actual RPC client
214 c.client, err = NewRPCClient(conn, c.config.Plugins)
215 if err != nil {
216 conn.Close()
217 return nil, err
218 }
219
220 // Begin the stream syncing so that stdin, out, err work properly
221 err = c.client.SyncStreams(
222 c.config.SyncStdout,
223 c.config.SyncStderr)
224 if err != nil {
225 c.client.Close()
226 c.client = nil
227 return nil, err
228 }
229
230 return c.client, nil
231}
232
233// Tells whether or not the underlying process has exited.
234func (c *Client) Exited() bool {
235 c.l.Lock()
236 defer c.l.Unlock()
237 return c.exited
238}
239
240// End the executing subprocess (if it is running) and perform any cleanup
241// tasks necessary such as capturing any remaining logs and so on.
242//
243// This method blocks until the process successfully exits.
244//
245// This method can safely be called multiple times.
246func (c *Client) Kill() {
247 // Grab a lock to read some private fields.
248 c.l.Lock()
249 process := c.process
250 addr := c.address
251 doneCh := c.doneLogging
252 c.l.Unlock()
253
254 // If there is no process, we never started anything. Nothing to kill.
255 if process == nil {
256 return
257 }
258
259 // We need to check for address here. It is possible that the plugin
260 // started (process != nil) but has no address (addr == nil) if the
261 // plugin failed at startup. If we do have an address, we need to close
262 // the plugin net connections.
263 graceful := false
264 if addr != nil {
265 // Close the client to cleanly exit the process.
266 client, err := c.Client()
267 if err == nil {
268 err = client.Close()
269
270 // If there is no error, then we attempt to wait for a graceful
271 // exit. If there was an error, we assume that graceful cleanup
272 // won't happen and just force kill.
273 graceful = err == nil
274 if err != nil {
275 // If there was an error just log it. We're going to force
276 // kill in a moment anyways.
277 log.Printf(
278 "[WARN] plugin: error closing client during Kill: %s", err)
279 }
280 }
281 }
282
283 // If we're attempting a graceful exit, then we wait for a short period
284 // of time to allow that to happen. To wait for this we just wait on the
285 // doneCh which would be closed if the process exits.
286 if graceful {
287 select {
288 case <-doneCh:
289 return
290 case <-time.After(250 * time.Millisecond):
291 }
292 }
293
294 // If graceful exiting failed, just kill it
295 process.Kill()
296
297 // Wait for the client to finish logging so we have a complete log
298 <-doneCh
299}
300
301// Starts the underlying subprocess, communicating with it to negotiate
302// a port for RPC connections, and returning the address to connect via RPC.
303//
304// This method is safe to call multiple times. Subsequent calls have no effect.
305// Once a client has been started once, it cannot be started again, even if
306// it was killed.
307func (c *Client) Start() (addr net.Addr, err error) {
308 c.l.Lock()
309 defer c.l.Unlock()
310
311 if c.address != nil {
312 return c.address, nil
313 }
314
315 // If one of cmd or reattach isn't set, then it is an error. We wrap
316 // this in a {} for scoping reasons, and hopeful that the escape
317 // analysis will pop the stock here.
318 {
319 cmdSet := c.config.Cmd != nil
320 attachSet := c.config.Reattach != nil
321 if cmdSet == attachSet {
322 return nil, fmt.Errorf("Only one of Cmd or Reattach must be set")
323 }
324 }
325
326 // Create the logging channel for when we kill
327 c.doneLogging = make(chan struct{})
328
329 if c.config.Reattach != nil {
330 // Verify the process still exists. If not, then it is an error
331 p, err := os.FindProcess(c.config.Reattach.Pid)
332 if err != nil {
333 return nil, err
334 }
335
336 // Attempt to connect to the addr since on Unix systems FindProcess
337 // doesn't actually return an error if it can't find the process.
338 conn, err := net.Dial(
339 c.config.Reattach.Addr.Network(),
340 c.config.Reattach.Addr.String())
341 if err != nil {
342 p.Kill()
343 return nil, ErrProcessNotFound
344 }
345 conn.Close()
346
347 // Goroutine to mark exit status
348 go func(pid int) {
349 // Wait for the process to die
350 pidWait(pid)
351
352 // Log so we can see it
353 log.Printf("[DEBUG] plugin: reattached plugin process exited\n")
354
355 // Mark it
356 c.l.Lock()
357 defer c.l.Unlock()
358 c.exited = true
359
360 // Close the logging channel since that doesn't work on reattach
361 close(c.doneLogging)
362 }(p.Pid)
363
364 // Set the address and process
365 c.address = c.config.Reattach.Addr
366 c.process = p
367
368 return c.address, nil
369 }
370
371 env := []string{
372 fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue),
373 fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort),
374 fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort),
375 }
376
377 stdout_r, stdout_w := io.Pipe()
378 stderr_r, stderr_w := io.Pipe()
379
380 cmd := c.config.Cmd
381 cmd.Env = append(cmd.Env, os.Environ()...)
382 cmd.Env = append(cmd.Env, env...)
383 cmd.Stdin = os.Stdin
384 cmd.Stderr = stderr_w
385 cmd.Stdout = stdout_w
386
387 log.Printf("[DEBUG] plugin: starting plugin: %s %#v", cmd.Path, cmd.Args)
388 err = cmd.Start()
389 if err != nil {
390 return
391 }
392
393 // Set the process
394 c.process = cmd.Process
395
396 // Make sure the command is properly cleaned up if there is an error
397 defer func() {
398 r := recover()
399
400 if err != nil || r != nil {
401 cmd.Process.Kill()
402 }
403
404 if r != nil {
405 panic(r)
406 }
407 }()
408
409 // Start goroutine to wait for process to exit
410 exitCh := make(chan struct{})
411 go func() {
412 // Make sure we close the write end of our stderr/stdout so
413 // that the readers send EOF properly.
414 defer stderr_w.Close()
415 defer stdout_w.Close()
416
417 // Wait for the command to end.
418 cmd.Wait()
419
420 // Log and make sure to flush the logs write away
421 log.Printf("[DEBUG] plugin: %s: plugin process exited\n", cmd.Path)
422 os.Stderr.Sync()
423
424 // Mark that we exited
425 close(exitCh)
426
427 // Set that we exited, which takes a lock
428 c.l.Lock()
429 defer c.l.Unlock()
430 c.exited = true
431 }()
432
433 // Start goroutine that logs the stderr
434 go c.logStderr(stderr_r)
435
436 // Start a goroutine that is going to be reading the lines
437 // out of stdout
438 linesCh := make(chan []byte)
439 go func() {
440 defer close(linesCh)
441
442 buf := bufio.NewReader(stdout_r)
443 for {
444 line, err := buf.ReadBytes('\n')
445 if line != nil {
446 linesCh <- line
447 }
448
449 if err == io.EOF {
450 return
451 }
452 }
453 }()
454
455 // Make sure after we exit we read the lines from stdout forever
456 // so they don't block since it is an io.Pipe
457 defer func() {
458 go func() {
459 for _ = range linesCh {
460 }
461 }()
462 }()
463
464 // Some channels for the next step
465 timeout := time.After(c.config.StartTimeout)
466
467 // Start looking for the address
468 log.Printf("[DEBUG] plugin: waiting for RPC address for: %s", cmd.Path)
469 select {
470 case <-timeout:
471 err = errors.New("timeout while waiting for plugin to start")
472 case <-exitCh:
473 err = errors.New("plugin exited before we could connect")
474 case lineBytes := <-linesCh:
475 // Trim the line and split by "|" in order to get the parts of
476 // the output.
477 line := strings.TrimSpace(string(lineBytes))
478 parts := strings.SplitN(line, "|", 4)
479 if len(parts) < 4 {
480 err = fmt.Errorf(
481 "Unrecognized remote plugin message: %s\n\n"+
482 "This usually means that the plugin is either invalid or simply\n"+
483 "needs to be recompiled to support the latest protocol.", line)
484 return
485 }
486
487 // Check the core protocol. Wrapped in a {} for scoping.
488 {
489 var coreProtocol int64
490 coreProtocol, err = strconv.ParseInt(parts[0], 10, 0)
491 if err != nil {
492 err = fmt.Errorf("Error parsing core protocol version: %s", err)
493 return
494 }
495
496 if int(coreProtocol) != CoreProtocolVersion {
497 err = fmt.Errorf("Incompatible core API version with plugin. "+
498 "Plugin version: %s, Ours: %d\n\n"+
499 "To fix this, the plugin usually only needs to be recompiled.\n"+
500 "Please report this to the plugin author.", parts[0], CoreProtocolVersion)
501 return
502 }
503 }
504
505 // Parse the protocol version
506 var protocol int64
507 protocol, err = strconv.ParseInt(parts[1], 10, 0)
508 if err != nil {
509 err = fmt.Errorf("Error parsing protocol version: %s", err)
510 return
511 }
512
513 // Test the API version
514 if uint(protocol) != c.config.ProtocolVersion {
515 err = fmt.Errorf("Incompatible API version with plugin. "+
516 "Plugin version: %s, Ours: %d", parts[1], c.config.ProtocolVersion)
517 return
518 }
519
520 switch parts[2] {
521 case "tcp":
522 addr, err = net.ResolveTCPAddr("tcp", parts[3])
523 case "unix":
524 addr, err = net.ResolveUnixAddr("unix", parts[3])
525 default:
526 err = fmt.Errorf("Unknown address type: %s", parts[3])
527 }
528 }
529
530 c.address = addr
531 return
532}
533
534// ReattachConfig returns the information that must be provided to NewClient
535// to reattach to the plugin process that this client started. This is
536// useful for plugins that detach from their parent process.
537//
538// If this returns nil then the process hasn't been started yet. Please
539// call Start or Client before calling this.
540func (c *Client) ReattachConfig() *ReattachConfig {
541 c.l.Lock()
542 defer c.l.Unlock()
543
544 if c.address == nil {
545 return nil
546 }
547
548 if c.config.Cmd != nil && c.config.Cmd.Process == nil {
549 return nil
550 }
551
552 // If we connected via reattach, just return the information as-is
553 if c.config.Reattach != nil {
554 return c.config.Reattach
555 }
556
557 return &ReattachConfig{
558 Addr: c.address,
559 Pid: c.config.Cmd.Process.Pid,
560 }
561}
562
563func (c *Client) logStderr(r io.Reader) {
564 bufR := bufio.NewReader(r)
565 for {
566 line, err := bufR.ReadString('\n')
567 if line != "" {
568 c.config.Stderr.Write([]byte(line))
569
570 line = strings.TrimRightFunc(line, unicode.IsSpace)
571 log.Printf("[DEBUG] plugin: %s: %s", filepath.Base(c.config.Cmd.Path), line)
572 }
573
574 if err == io.EOF {
575 break
576 }
577 }
578
579 // Flag that we've completed logging for others
580 close(c.doneLogging)
581}