1# Go Plugin System over RPC
3`go-plugin` is a Go (golang) plugin system over RPC. It is the plugin system
4that has been in use by HashiCorp tooling for over 3 years. While initially
5created for [Packer](https://www.packer.io), it has since been used by
6[Terraform](https://www.terraform.io) and [Otto](https://www.ottoproject.io),
7with plans to also use it for [Nomad](https://www.nomadproject.io) and
10While the plugin system is over RPC, it is currently only designed to work
11over a local [reliable] network. Plugins over a real network are not supported
12and will lead to unexpected behavior.
14This plugin system has been used on millions of machines across many different
15projects and has proven to be battle hardened and ready for production use.
17## Features
19The HashiCorp plugin system supports a number of features:
21**Plugins are Go interface implementations.** This makes writing and consuming
22plugins feel very natural. To a plugin author: you just implement an
23interface as if it were going to run in the same process. For a plugin user:
24you just use and call functions on an interface as if it were in the same
25process. This plugin system handles the communication in between.
27**Complex arguments and return values are supported.** This library
28provides APIs for handling complex arguments and return values such
29as interfaces, `io.Reader/Writer`, etc. We do this by giving you a library
30(`MuxBroker`) for creating new connections between the client/server to
31serve additional interfaces or transfer raw data.
33**Bidirectional communication.** Because the plugin system supports
34complex arguments, the host process can send it interface implementations
35and the plugin can call back into the host process.
37**Built-in Logging.** Any plugins that use the `log` standard library
38will have log data automatically sent to the host process. The host
39process will mirror this output prefixed with the path to the plugin
40binary. This makes debugging with plugins simple.
42**Protocol Versioning.** A very basic "protocol version" is supported that
43can be incremented to invalidate any previous plugins. This is useful when
44interface signatures are changing, protocol level changes are necessary,
45etc. When a protocol version is incompatible, a human friendly error
46message is shown to the end user.
48**Stdout/Stderr Syncing.** While plugins are subprocesses, they can continue
49to use stdout/stderr as usual and the output will get mirrored back to
50the host process. The host process can control what `io.Writer` these
51streams go to to prevent this from happening.
53**TTY Preservation.** Plugin subprocesses are connected to the identical
54stdin file descriptor as the host process, allowing software that requires
55a TTY to work. For example, a plugin can execute `ssh` and even though there
56are multiple subprocesses and RPC happening, it will look and act perfectly
57to the end user.
59**Host upgrade while a plugin is running.** Plugins can be "reattached"
60so that the host process can be upgraded while the plugin is still running.
61This requires the host/plugin to know this is possible and daemonize
62properly. `NewClient` takes a `ReattachConfig` to determine if and how to
65## Architecture
67The HashiCorp plugin system works by launching subprocesses and communicating
68over RPC (using standard `net/rpc`). A single connection is made between
69any plugin and the host process, and we use a
70[connection multiplexing](https://github.com/hashicorp/yamux)
71library to multiplex any other connections on top.
73This architecture has a number of benefits:
75 * Plugins can't crash your host process: A panic in a plugin doesn't
76 panic the plugin user.
78 * Plugins are very easy to write: just write a Go application and `go build`.
79 Theoretically you could also use another language as long as it can
80 communicate the Go `net/rpc` protocol but this hasn't yet been tried.
82 * Plugins are very easy to install: just put the binary in a location where
83 the host will find it (depends on the host but this library also provides
84 helpers), and the plugin host handles the rest.
86 * Plugins can be relatively secure: The plugin only has access to the
87 interfaces and args given to it, not to the entire memory space of the
88 process. More security features are planned (see the coming soon section
89 below).
91## Usage
93To use the plugin system, you must take the following steps. These are
94high-level steps that must be done. Examples are available in the
95`examples/` directory.
97 1. Choose the interface(s) you want to expose for plugins.
99 2. For each interface, implement an implementation of that interface
100 that communicates over an `*rpc.Client` (from the standard `net/rpc`
101 package) for every function call. Likewise, implement the RPC server
102 struct this communicates to which is then communicating to a real,
103 concrete implementation.
105 3. Create a `Plugin` implementation that knows how to create the RPC
106 client/server for a given plugin type.
108 4. Plugin authors call `plugin.Serve` to serve a plugin from the
109 `main` function.
111 5. Plugin users use `plugin.Client` to launch a subprocess and request
112 an interface implementation over RPC.
114That's it! In practice, step 2 is the most tedious and time consuming step.
115Even so, it isn't very difficult and you can see examples in the `examples/`
116directory as well as throughout our various open source projects.
118For complete API documentation, see [GoDoc](https://godoc.org/github.com/hashicorp/go-plugin).
120## Roadmap
122Our plugin system is constantly evolving. As we use the plugin system for
123new projects or for new features in existing projects, we constantly find
124improvements we can make.
126At this point in time, the roadmap for the plugin system is:
128**Cryptographically Secure Plugins.** We'll implement signing plugins
129and loading signed plugins in order to allow Vault to make use of multi-process
130in a secure way.
132**Semantic Versioning.** Plugins will be able to implement a semantic version.
133This plugin system will give host processes a system for constraining
134versions. This is in addition to the protocol versioning already present
135which is more for larger underlying changes.
137**Plugin fetching.** We will integrate with [go-getter](https://github.com/hashicorp/go-getter)
138to support automatic download + install of plugins. Paired with cryptographically
139secure plugins (above), we can make this a safe operation for an amazing
140user experience.
142## What About Shared Libraries?
144When we started using plugins (late 2012, early 2013), plugins over RPC
145were the only option since Go didn't support dynamic library loading. Today,
146Go still doesn't support dynamic library loading, but they do intend to.
147Since 2012, our plugin system has stabilized from millions of users using it,
148and has many benefits we've come to value greatly.
150For example, we intend to use this plugin system in
151[Vault](https://www.vaultproject.io), and dynamic library loading will
152simply never be acceptable in Vault for security reasons. That is an extreme
153example, but we believe our library system has more upsides than downsides
154over dynamic library loading and since we've had it built and tested for years,
155we'll likely continue to use it.
157Shared libraries have one major advantage over our system which is much
158higher performance. In real world scenarios across our various tools,
159we've never required any more performance out of our plugin system and it
160has seen very high throughput, so this isn't a concern for us at the moment.
1package plugin
3import (
4 "bufio"
5 "errors"
6 "fmt"
7 "io"
8 "io/ioutil"
9 "log"
10 "net"
11 "os"
12 "os/exec"
13 "path/filepath"
14 "strconv"
15 "strings"
16 "sync"
17 "sync/atomic"
18 "time"
19 "unicode"
22// If this is 1, then we've called CleanupClients. This can be used
23// by plugin RPC implementations to change error behavior since you
24// can expected network connection errors at this point. This should be
25// read by using sync/atomic.
26var Killed uint32 = 0
28// This is a slice of the "managed" clients which are cleaned up when
29// calling Cleanup
30var managedClients = make([]*Client, 0, 5)
31var managedClientsLock sync.Mutex
33// Error types
34var (
35 // ErrProcessNotFound is returned when a client is instantiated to
36 // reattach to an existing process and it isn't found.
37 ErrProcessNotFound = errors.New("Reattachment process not found")
40// Client handles the lifecycle of a plugin application. It launches
41// plugins, connects to them, dispenses interface implementations, and handles
42// killing the process.
44// Plugin hosts should use one Client for each plugin executable. To
45// dispense a plugin type, use the `Client.Client` function, and then
46// cal `Dispense`. This awkward API is mostly historical but is used to split
47// the client that deals with subprocess management and the client that
48// does RPC management.
50// See NewClient and ClientConfig for using a Client.
51type Client struct {
52 config *ClientConfig
53 exited bool
54 doneLogging chan struct{}
55 l sync.Mutex
56 address net.Addr
57 process *os.Process
58 client *RPCClient
61// ClientConfig is the configuration used to initialize a new
62// plugin client. After being used to initialize a plugin client,
63// that configuration must not be modified again.
64type ClientConfig struct {
65 // HandshakeConfig is the configuration that must match servers.
66 HandshakeConfig
68 // Plugins are the plugins that can be consumed.
69 Plugins map[string]Plugin
71 // One of the following must be set, but not both.
72 //
73 // Cmd is the unstarted subprocess for starting the plugin. If this is
74 // set, then the Client starts the plugin process on its own and connects
75 // to it.
76 //
77 // Reattach is configuration for reattaching to an existing plugin process
78 // that is already running. This isn't common.
79 Cmd *exec.Cmd
80 Reattach *ReattachConfig
82 // Managed represents if the client should be managed by the
83 // plugin package or not. If true, then by calling CleanupClients,
84 // it will automatically be cleaned up. Otherwise, the client
85 // user is fully responsible for making sure to Kill all plugin
86 // clients. By default the client is _not_ managed.
87 Managed bool
89 // The minimum and maximum port to use for communicating with
90 // the subprocess. If not set, this defaults to 10,000 and 25,000
91 // respectively.
92 MinPort, MaxPort uint
94 // StartTimeout is the timeout to wait for the plugin to say it
95 // has started successfully.
96 StartTimeout time.Duration
98 // If non-nil, then the stderr of the client will be written to here
99 // (as well as the log). This is the original os.Stderr of the subprocess.
100 // This isn't the output of synced stderr.
101 Stderr io.Writer
103 // SyncStdout, SyncStderr can be set to override the
104 // respective os.Std* values in the plugin. Care should be taken to
105 // avoid races here. If these are nil, then this will automatically be
106 // hooked up to os.Stdin, Stdout, and Stderr, respectively.
107 //
108 // If the default values (nil) are used, then this package will not
109 // sync any of these streams.
110 SyncStdout io.Writer
111 SyncStderr io.Writer
114// ReattachConfig is used to configure a client to reattach to an
115// already-running plugin process. You can retrieve this information by
116// calling ReattachConfig on Client.
117type ReattachConfig struct {
118 Addr net.Addr
119 Pid int
122// This makes sure all the managed subprocesses are killed and properly
123// logged. This should be called before the parent process running the
124// plugins exits.
126// This must only be called _once_.
127func CleanupClients() {
128 // Set the killed to true so that we don't get unexpected panics
129 atomic.StoreUint32(&Killed, 1)
131 // Kill all the managed clients in parallel and use a WaitGroup
132 // to wait for them all to finish up.
133 var wg sync.WaitGroup
134 managedClientsLock.Lock()
135 for _, client := range managedClients {
136 wg.Add(1)
138 go func(client *Client) {
139 client.Kill()
140 wg.Done()
141 }(client)
142 }
143 managedClientsLock.Unlock()
145 log.Println("[DEBUG] plugin: waiting for all plugin processes to complete...")
146 wg.Wait()
149// Creates a new plugin client which manages the lifecycle of an external
150// plugin and gets the address for the RPC connection.
152// The client must be cleaned up at some point by calling Kill(). If
153// the client is a managed client (created with NewManagedClient) you
154// can just call CleanupClients at the end of your program and they will
155// be properly cleaned.
156func NewClient(config *ClientConfig) (c *Client) {
157 if config.MinPort == 0 && config.MaxPort == 0 {
158 config.MinPort = 10000
159 config.MaxPort = 25000
160 }
162 if config.StartTimeout == 0 {
163 config.StartTimeout = 1 * time.Minute
164 }
166 if config.Stderr == nil {
167 config.Stderr = ioutil.Discard
168 }
170 if config.SyncStdout == nil {
171 config.SyncStdout = ioutil.Discard
172 }
173 if config.SyncStderr == nil {
174 config.SyncStderr = ioutil.Discard
175 }
177 c = &Client{config: config}
178 if config.Managed {
179 managedClientsLock.Lock()
180 managedClients = append(managedClients, c)
181 managedClientsLock.Unlock()
182 }
184 return
187// Client returns an RPC client for the plugin.
189// Subsequent calls to this will return the same RPC client.
190func (c *Client) Client() (*RPCClient, error) {
191 addr, err := c.Start()
192 if err != nil {
193 return nil, err
194 }
196 c.l.Lock()
197 defer c.l.Unlock()
199 if c.client != nil {
200 return c.client, nil
201 }
203 // Connect to the client
204 conn, err := net.Dial(addr.Network(), addr.String())
205 if err != nil {
206 return nil, err
207 }
208 if tcpConn, ok := conn.(*net.TCPConn); ok {
209 // Make sure to set keep alive so that the connection doesn't die
210 tcpConn.SetKeepAlive(true)
211 }
213 // Create the actual RPC client
214 c.client, err = NewRPCClient(conn, c.config.Plugins)
215 if err != nil {
216 conn.Close()
217 return nil, err
218 }
220 // Begin the stream syncing so that stdin, out, err work properly
221 err = c.client.SyncStreams(
222 c.config.SyncStdout,
223 c.config.SyncStderr)
224 if err != nil {
225 c.client.Close()
226 c.client = nil
227 return nil, err
228 }
230 return c.client, nil
233// Tells whether or not the underlying process has exited.
234func (c *Client) Exited() bool {
235 c.l.Lock()
236 defer c.l.Unlock()
237 return c.exited
240// End the executing subprocess (if it is running) and perform any cleanup
241// tasks necessary such as capturing any remaining logs and so on.
243// This method blocks until the process successfully exits.
245// This method can safely be called multiple times.
246func (c *Client) Kill() {
247 // Grab a lock to read some private fields.
248 c.l.Lock()
249 process := c.process
250 addr := c.address
251 doneCh := c.doneLogging
252 c.l.Unlock()
254 // If there is no process, we never started anything. Nothing to kill.
255 if process == nil {
256 return
257 }
259 // We need to check for address here. It is possible that the plugin
260 // started (process != nil) but has no address (addr == nil) if the
261 // plugin failed at startup. If we do have an address, we need to close
262 // the plugin net connections.
263 graceful := false
264 if addr != nil {
265 // Close the client to cleanly exit the process.
266 client, err := c.Client()
267 if err == nil {
268 err = client.Close()
270 // If there is no error, then we attempt to wait for a graceful
271 // exit. If there was an error, we assume that graceful cleanup
272 // won't happen and just force kill.
273 graceful = err == nil
274 if err != nil {
275 // If there was an error just log it. We're going to force
276 // kill in a moment anyways.
277 log.Printf(
278 "[WARN] plugin: error closing client during Kill: %s", err)
279 }
280 }
281 }
283 // If we're attempting a graceful exit, then we wait for a short period
284 // of time to allow that to happen. To wait for this we just wait on the
285 // doneCh which would be closed if the process exits.
286 if graceful {
287 select {
288 case <-doneCh:
289 return
290 case <-time.After(250 * time.Millisecond):
291 }
292 }
294 // If graceful exiting failed, just kill it
295 process.Kill()
297 // Wait for the client to finish logging so we have a complete log
298 <-doneCh
301// Starts the underlying subprocess, communicating with it to negotiate
302// a port for RPC connections, and returning the address to connect via RPC.
304// This method is safe to call multiple times. Subsequent calls have no effect.
305// Once a client has been started once, it cannot be started again, even if
306// it was killed.
307func (c *Client) Start() (addr net.Addr, err error) {
308 c.l.Lock()
309 defer c.l.Unlock()
311 if c.address != nil {
312 return c.address, nil
313 }
315 // If one of cmd or reattach isn't set, then it is an error. We wrap
316 // this in a {} for scoping reasons, and hopeful that the escape
317 // analysis will pop the stock here.
318 {
319 cmdSet := c.config.Cmd != nil
320 attachSet := c.config.Reattach != nil
321 if cmdSet == attachSet {
322 return nil, fmt.Errorf("Only one of Cmd or Reattach must be set")
323 }
324 }
326 // Create the logging channel for when we kill
327 c.doneLogging = make(chan struct{})
329 if c.config.Reattach != nil {
330 // Verify the process still exists. If not, then it is an error
331 p, err := os.FindProcess(c.config.Reattach.Pid)
332 if err != nil {
333 return nil, err
334 }
336 // Attempt to connect to the addr since on Unix systems FindProcess
337 // doesn't actually return an error if it can't find the process.
338 conn, err := net.Dial(
339 c.config.Reattach.Addr.Network(),
340 c.config.Reattach.Addr.String())
341 if err != nil {
342 p.Kill()
343 return nil, ErrProcessNotFound
344 }
345 conn.Close()
347 // Goroutine to mark exit status
348 go func(pid int) {
349 // Wait for the process to die
350 pidWait(pid)
352 // Log so we can see it
353 log.Printf("[DEBUG] plugin: reattached plugin process exited\n")
355 // Mark it
356 c.l.Lock()
357 defer c.l.Unlock()
358 c.exited = true
360 // Close the logging channel since that doesn't work on reattach
361 close(c.doneLogging)
362 }(p.Pid)
364 // Set the address and process
365 c.address = c.config.Reattach.Addr
366 c.process = p
368 return c.address, nil
369 }
371 env := []string{
372 fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue),
373 fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort),
374 fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort),
375 }
377 stdout_r, stdout_w := io.Pipe()
378 stderr_r, stderr_w := io.Pipe()
380 cmd := c.config.Cmd
381 cmd.Env = append(cmd.Env, os.Environ()...)
382 cmd.Env = append(cmd.Env, env...)
383 cmd.Stdin = os.Stdin
384 cmd.Stderr = stderr_w
385 cmd.Stdout = stdout_w
387 log.Printf("[DEBUG] plugin: starting plugin: %s %#v", cmd.Path, cmd.Args)
388 err = cmd.Start()
389 if err != nil {
390 return
391 }
393 // Set the process
394 c.process = cmd.Process
396 // Make sure the command is properly cleaned up if there is an error
397 defer func() {
398 r := recover()
400 if err != nil || r != nil {
401 cmd.Process.Kill()
402 }
404 if r != nil {
405 panic(r)
406 }
407 }()
409 // Start goroutine to wait for process to exit
410 exitCh := make(chan struct{})
411 go func() {
412 // Make sure we close the write end of our stderr/stdout so
413 // that the readers send EOF properly.
414 defer stderr_w.Close()
415 defer stdout_w.Close()
417 // Wait for the command to end.
418 cmd.Wait()
420 // Log and make sure to flush the logs write away
421 log.Printf("[DEBUG] plugin: %s: plugin process exited\n", cmd.Path)
422 os.Stderr.Sync()
424 // Mark that we exited
425 close(exitCh)
427 // Set that we exited, which takes a lock
428 c.l.Lock()
429 defer c.l.Unlock()
430 c.exited = true
431 }()
433 // Start goroutine that logs the stderr
434 go c.logStderr(stderr_r)
436 // Start a goroutine that is going to be reading the lines
437 // out of stdout
438 linesCh := make(chan []byte)
439 go func() {
440 defer close(linesCh)
442 buf := bufio.NewReader(stdout_r)
443 for {
444 line, err := buf.ReadBytes('\n')
445 if line != nil {
446 linesCh <- line
447 }
449 if err == io.EOF {
450 return
451 }
452 }
453 }()
455 // Make sure after we exit we read the lines from stdout forever
456 // so they don't block since it is an io.Pipe
457 defer func() {
458 go func() {
459 for _ = range linesCh {
460 }
461 }()
462 }()
464 // Some channels for the next step
465 timeout := time.After(c.config.StartTimeout)
467 // Start looking for the address
468 log.Printf("[DEBUG] plugin: waiting for RPC address for: %s", cmd.Path)
469 select {
470 case <-timeout:
471 err = errors.New("timeout while waiting for plugin to start")
472 case <-exitCh:
473 err = errors.New("plugin exited before we could connect")
474 case lineBytes := <-linesCh:
475 // Trim the line and split by "|" in order to get the parts of
476 // the output.
477 line := strings.TrimSpace(string(lineBytes))
478 parts := strings.SplitN(line, "|", 4)
479 if len(parts) < 4 {
480 err = fmt.Errorf(
481 "Unrecognized remote plugin message: %s\n\n"+
482 "This usually means that the plugin is either invalid or simply\n"+
483 "needs to be recompiled to support the latest protocol.", line)
484 return
485 }
487 // Check the core protocol. Wrapped in a {} for scoping.
488 {
489 var coreProtocol int64
490 coreProtocol, err = strconv.ParseInt(parts[0], 10, 0)
491 if err != nil {
492 err = fmt.Errorf("Error parsing core protocol version: %s", err)
493 return
494 }
496 if int(coreProtocol) != CoreProtocolVersion {
497 err = fmt.Errorf("Incompatible core API version with plugin. "+
498 "Plugin version: %s, Ours: %d\n\n"+
499 "To fix this, the plugin usually only needs to be recompiled.\n"+
500 "Please report this to the plugin author.", parts[0], CoreProtocolVersion)
501 return
502 }
503 }
505 // Parse the protocol version
506 var protocol int64
507 protocol, err = strconv.ParseInt(parts[1], 10, 0)
508 if err != nil {
509 err = fmt.Errorf("Error parsing protocol version: %s", err)
510 return
511 }
513 // Test the API version
514 if uint(protocol) != c.config.ProtocolVersion {
515 err = fmt.Errorf("Incompatible API version with plugin. "+
516 "Plugin version: %s, Ours: %d", parts[1], c.config.ProtocolVersion)
517 return
518 }
520 switch parts[2] {
521 case "tcp":
522 addr, err = net.ResolveTCPAddr("tcp", parts[3])
523 case "unix":
524 addr, err = net.ResolveUnixAddr("unix", parts[3])
525 default:
526 err = fmt.Errorf("Unknown address type: %s", parts[3])
527 }
528 }
530 c.address = addr
531 return
534// ReattachConfig returns the information that must be provided to NewClient
535// to reattach to the plugin process that this client started. This is
536// useful for plugins that detach from their parent process.
538// If this returns nil then the process hasn't been started yet. Please
539// call Start or Client before calling this.
540func (c *Client) ReattachConfig() *ReattachConfig {
541 c.l.Lock()
542 defer c.l.Unlock()
544 if c.address == nil {
545 return nil
546 }
548 if c.config.Cmd != nil && c.config.Cmd.Process == nil {
549 return nil
550 }
552 // If we connected via reattach, just return the information as-is
553 if c.config.Reattach != nil {
554 return c.config.Reattach
555 }
557 return &ReattachConfig{
558 Addr: c.address,
559 Pid: c.config.Cmd.Process.Pid,
560 }
563func (c *Client) logStderr(r io.Reader) {
564 bufR := bufio.NewReader(r)
565 for {
566 line, err := bufR.ReadString('\n')
567 if line != "" {
568 c.config.Stderr.Write([]byte(line))
570 line = strings.TrimRightFunc(line, unicode.IsSpace)
571 log.Printf("[DEBUG] plugin: %s: %s", filepath.Base(c.config.Cmd.Path), line)
572 }
574 if err == io.EOF {
575 break
576 }
577 }
579 // Flag that we've completed logging for others
580 close(c.doneLogging)
1package plugin
3import (
4 "path/filepath"
7// Discover discovers plugins that are in a given directory.
9// The directory doesn't need to be absolute. For example, "." will work fine.
11// This currently assumes any file matching the glob is a plugin.
12// In the future this may be smarter about checking that a file is
13// executable and so on.
15// TODO: test
16func Discover(glob, dir string) ([]string, error) {
17 var err error
19 // Make the directory absolute if it isn't already
20 if !filepath.IsAbs(dir) {
21 dir, err = filepath.Abs(dir)
22 if err != nil {
23 return nil, err
24 }
25 }
27 return filepath.Glob(filepath.Join(dir, glob))
1package plugin
3// This is a type that wraps error types so that they can be messaged
4// across RPC channels. Since "error" is an interface, we can't always
5// gob-encode the underlying structure. This is a valid error interface
6// implementer that we will push across.
7type BasicError struct {
8 Message string
11// NewBasicError is used to create a BasicError.
13// err is allowed to be nil.
14func NewBasicError(err error) *BasicError {
15 if err == nil {
16 return nil
17 }
19 return &BasicError{err.Error()}
22func (e *BasicError) Error() string {
23 return e.Message
1package plugin
3import (
4 "encoding/binary"
5 "fmt"
6 "log"
7 "net"
8 "sync"
9 "sync/atomic"
10 "time"
12 "github.com/hashicorp/yamux"
15// MuxBroker is responsible for brokering multiplexed connections by unique ID.
17// It is used by plugins to multiplex multiple RPC connections and data
18// streams on top of a single connection between the plugin process and the
19// host process.
21// This allows a plugin to request a channel with a specific ID to connect to
22// or accept a connection from, and the broker handles the details of
23// holding these channels open while they're being negotiated.
25// The Plugin interface has access to these for both Server and Client.
26// The broker can be used by either (optionally) to reserve and connect to
27// new multiplexed streams. This is useful for complex args and return values,
28// or anything else you might need a data stream for.
29type MuxBroker struct {
30 nextId uint32
31 session *yamux.Session
32 streams map[uint32]*muxBrokerPending
34 sync.Mutex
37type muxBrokerPending struct {
38 ch chan net.Conn
39 doneCh chan struct{}
42func newMuxBroker(s *yamux.Session) *MuxBroker {
43 return &MuxBroker{
44 session: s,
45 streams: make(map[uint32]*muxBrokerPending),
46 }
49// Accept accepts a connection by ID.
51// This should not be called multiple times with the same ID at one time.
52func (m *MuxBroker) Accept(id uint32) (net.Conn, error) {
53 var c net.Conn
54 p := m.getStream(id)
55 select {
56 case c = <-p.ch:
57 close(p.doneCh)
58 case <-time.After(5 * time.Second):
59 m.Lock()
60 defer m.Unlock()
61 delete(m.streams, id)
63 return nil, fmt.Errorf("timeout waiting for accept")
64 }
66 // Ack our connection
67 if err := binary.Write(c, binary.LittleEndian, id); err != nil {
68 c.Close()
69 return nil, err
70 }
72 return c, nil
75// AcceptAndServe is used to accept a specific stream ID and immediately
76// serve an RPC server on that stream ID. This is used to easily serve
77// complex arguments.
79// The served interface is always registered to the "Plugin" name.
80func (m *MuxBroker) AcceptAndServe(id uint32, v interface{}) {
81 conn, err := m.Accept(id)
82 if err != nil {
83 log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
84 return
85 }
87 serve(conn, "Plugin", v)
90// Close closes the connection and all sub-connections.
91func (m *MuxBroker) Close() error {
92 return m.session.Close()
95// Dial opens a connection by ID.
96func (m *MuxBroker) Dial(id uint32) (net.Conn, error) {
97 // Open the stream
98 stream, err := m.session.OpenStream()
99 if err != nil {
100 return nil, err
101 }
103 // Write the stream ID onto the wire.
104 if err := binary.Write(stream, binary.LittleEndian, id); err != nil {
105 stream.Close()
106 return nil, err
107 }
109 // Read the ack that we connected. Then we're off!
110 var ack uint32
111 if err := binary.Read(stream, binary.LittleEndian, &ack); err != nil {
112 stream.Close()
113 return nil, err
114 }
115 if ack != id {
116 stream.Close()
117 return nil, fmt.Errorf("bad ack: %d (expected %d)", ack, id)
118 }
120 return stream, nil
123// NextId returns a unique ID to use next.
125// It is possible for very long-running plugin hosts to wrap this value,
126// though it would require a very large amount of RPC calls. In practice
127// we've never seen it happen.
128func (m *MuxBroker) NextId() uint32 {
129 return atomic.AddUint32(&m.nextId, 1)
132// Run starts the brokering and should be executed in a goroutine, since it
133// blocks forever, or until the session closes.
135// Uses of MuxBroker never need to call this. It is called internally by
136// the plugin host/client.
137func (m *MuxBroker) Run() {
138 for {
139 stream, err := m.session.AcceptStream()
140 if err != nil {
141 // Once we receive an error, just exit
142 break
143 }
145 // Read the stream ID from the stream
146 var id uint32
147 if err := binary.Read(stream, binary.LittleEndian, &id); err != nil {
148 stream.Close()
149 continue
150 }
152 // Initialize the waiter
153 p := m.getStream(id)
154 select {
155 case p.ch <- stream:
156 default:
157 }
159 // Wait for a timeout
160 go m.timeoutWait(id, p)
161 }
164func (m *MuxBroker) getStream(id uint32) *muxBrokerPending {
165 m.Lock()
166 defer m.Unlock()
168 p, ok := m.streams[id]
169 if ok {
170 return p
171 }
173 m.streams[id] = &muxBrokerPending{
174 ch: make(chan net.Conn, 1),
175 doneCh: make(chan struct{}),
176 }
177 return m.streams[id]
180func (m *MuxBroker) timeoutWait(id uint32, p *muxBrokerPending) {
181 // Wait for the stream to either be picked up and connected, or
182 // for a timeout.
183 timeout := false
184 select {
185 case <-p.doneCh:
186 case <-time.After(5 * time.Second):
187 timeout = true
188 }
190 m.Lock()
191 defer m.Unlock()
193 // Delete the stream so no one else can grab it
194 delete(m.streams, id)
196 // If we timed out, then check if we have a channel in the buffer,
197 // and if so, close it.
198 if timeout {
199 select {
200 case s := <-p.ch:
201 s.Close()
202 }
203 }
1// The plugin package exposes functions and helpers for communicating to
2// plugins which are implemented as standalone binary applications.
4// plugin.Client fully manages the lifecycle of executing the application,
5// connecting to it, and returning the RPC client for dispensing plugins.
7// plugin.Serve fully manages listeners to expose an RPC server from a binary
8// that plugin.Client can connect to.
9package plugin
11import (
12 "net/rpc"
15// Plugin is the interface that is implemented to serve/connect to an
16// inteface implementation.
17type Plugin interface {
18 // Server should return the RPC server compatible struct to serve
19 // the methods that the Client calls over net/rpc.
20 Server(*MuxBroker) (interface{}, error)
22 // Client returns an interface implementation for the plugin you're
23 // serving that communicates to the server end of the plugin.
24 Client(*MuxBroker, *rpc.Client) (interface{}, error)
1package plugin
3import (
4 "time"
7// pidAlive checks whether a pid is alive.
8func pidAlive(pid int) bool {
9 return _pidAlive(pid)
12// pidWait blocks for a process to exit.
13func pidWait(pid int) error {
14 ticker := time.NewTicker(1 * time.Second)
15 defer ticker.Stop()
17 for range ticker.C {
18 if !pidAlive(pid) {
19 break
20 }
21 }
23 return nil
1// +build !windows
3package plugin
5import (
6 "os"
7 "syscall"
10// _pidAlive tests whether a process is alive or not by sending it Signal 0,
11// since Go otherwise has no way to test this.
12func _pidAlive(pid int) bool {
13 proc, err := os.FindProcess(pid)
14 if err == nil {
15 err = proc.Signal(syscall.Signal(0))
16 }
18 return err == nil
1package plugin
3import (
4 "syscall"
7const (
8 // Weird name but matches the MSDN docs
9 exit_STILL_ACTIVE = 259
11 processDesiredAccess = syscall.STANDARD_RIGHTS_READ |
13 syscall.SYNCHRONIZE
16// _pidAlive tests whether a process is alive or not
17func _pidAlive(pid int) bool {
18 h, err := syscall.OpenProcess(processDesiredAccess, false, uint32(pid))
19 if err != nil {
20 return false
21 }
23 var ec uint32
24 if e := syscall.GetExitCodeProcess(h, &ec); e != nil {
25 return false
26 }
28 return ec == exit_STILL_ACTIVE
1package plugin
3import (
4 "fmt"
5 "io"
6 "net"
7 "net/rpc"
9 "github.com/hashicorp/yamux"
12// RPCClient connects to an RPCServer over net/rpc to dispense plugin types.
13type RPCClient struct {
14 broker *MuxBroker
15 control *rpc.Client
16 plugins map[string]Plugin
18 // These are the streams used for the various stdout/err overrides
19 stdout, stderr net.Conn
22// NewRPCClient creates a client from an already-open connection-like value.
23// Dial is typically used instead.
24func NewRPCClient(conn io.ReadWriteCloser, plugins map[string]Plugin) (*RPCClient, error) {
25 // Create the yamux client so we can multiplex
26 mux, err := yamux.Client(conn, nil)
27 if err != nil {
28 conn.Close()
29 return nil, err
30 }
32 // Connect to the control stream.
33 control, err := mux.Open()
34 if err != nil {
35 mux.Close()
36 return nil, err
37 }
39 // Connect stdout, stderr streams
40 stdstream := make([]net.Conn, 2)
41 for i, _ := range stdstream {
42 stdstream[i], err = mux.Open()
43 if err != nil {
44 mux.Close()
45 return nil, err
46 }
47 }
49 // Create the broker and start it up
50 broker := newMuxBroker(mux)
51 go broker.Run()
53 // Build the client using our broker and control channel.
54 return &RPCClient{
55 broker: broker,
56 control: rpc.NewClient(control),
57 plugins: plugins,
58 stdout: stdstream[0],
59 stderr: stdstream[1],
60 }, nil
63// SyncStreams should be called to enable syncing of stdout,
64// stderr with the plugin.
66// This will return immediately and the syncing will continue to happen
67// in the background. You do not need to launch this in a goroutine itself.
69// This should never be called multiple times.
70func (c *RPCClient) SyncStreams(stdout io.Writer, stderr io.Writer) error {
71 go copyStream("stdout", stdout, c.stdout)
72 go copyStream("stderr", stderr, c.stderr)
73 return nil
76// Close closes the connection. The client is no longer usable after this
77// is called.
78func (c *RPCClient) Close() error {
79 // Call the control channel and ask it to gracefully exit. If this
80 // errors, then we save it so that we always return an error but we
81 // want to try to close the other channels anyways.
82 var empty struct{}
83 returnErr := c.control.Call("Control.Quit", true, &empty)
85 // Close the other streams we have
86 if err := c.control.Close(); err != nil {
87 return err
88 }
89 if err := c.stdout.Close(); err != nil {
90 return err
91 }
92 if err := c.stderr.Close(); err != nil {
93 return err
94 }
95 if err := c.broker.Close(); err != nil {
96 return err
97 }
99 // Return back the error we got from Control.Quit. This is very important
100 // since we MUST return non-nil error if this fails so that Client.Kill
101 // will properly try a process.Kill.
102 return returnErr
105func (c *RPCClient) Dispense(name string) (interface{}, error) {
106 p, ok := c.plugins[name]
107 if !ok {
108 return nil, fmt.Errorf("unknown plugin type: %s", name)
109 }
111 var id uint32
112 if err := c.control.Call(
113 "Dispenser.Dispense", name, &id); err != nil {
114 return nil, err
115 }
117 conn, err := c.broker.Dial(id)
118 if err != nil {
119 return nil, err
120 }
122 return p.Client(c.broker, rpc.NewClient(conn))
1package plugin
3import (
4 "errors"
5 "fmt"
6 "io"
7 "log"
8 "net"
9 "net/rpc"
10 "sync"
12 "github.com/hashicorp/yamux"
15// RPCServer listens for network connections and then dispenses interface
16// implementations over net/rpc.
18// After setting the fields below, they shouldn't be read again directly
19// from the structure which may be reading/writing them concurrently.
20type RPCServer struct {
21 Plugins map[string]Plugin
23 // Stdout, Stderr are what this server will use instead of the
24 // normal stdin/out/err. This is because due to the multi-process nature
25 // of our plugin system, we can't use the normal process values so we
26 // make our own custom one we pipe across.
27 Stdout io.Reader
28 Stderr io.Reader
30 // DoneCh should be set to a non-nil channel that will be closed
31 // when the control requests the RPC server to end.
32 DoneCh chan<- struct{}
34 lock sync.Mutex
37// Accept accepts connections on a listener and serves requests for
38// each incoming connection. Accept blocks; the caller typically invokes
39// it in a go statement.
40func (s *RPCServer) Accept(lis net.Listener) {
41 for {
42 conn, err := lis.Accept()
43 if err != nil {
44 log.Printf("[ERR] plugin: plugin server: %s", err)
45 return
46 }
48 go s.ServeConn(conn)
49 }
52// ServeConn runs a single connection.
54// ServeConn blocks, serving the connection until the client hangs up.
55func (s *RPCServer) ServeConn(conn io.ReadWriteCloser) {
56 // First create the yamux server to wrap this connection
57 mux, err := yamux.Server(conn, nil)
58 if err != nil {
59 conn.Close()
60 log.Printf("[ERR] plugin: error creating yamux server: %s", err)
61 return
62 }
64 // Accept the control connection
65 control, err := mux.Accept()
66 if err != nil {
67 mux.Close()
68 if err != io.EOF {
69 log.Printf("[ERR] plugin: error accepting control connection: %s", err)
70 }
72 return
73 }
75 // Connect the stdstreams (in, out, err)
76 stdstream := make([]net.Conn, 2)
77 for i, _ := range stdstream {
78 stdstream[i], err = mux.Accept()
79 if err != nil {
80 mux.Close()
81 log.Printf("[ERR] plugin: accepting stream %d: %s", i, err)
82 return
83 }
84 }
86 // Copy std streams out to the proper place
87 go copyStream("stdout", stdstream[0], s.Stdout)
88 go copyStream("stderr", stdstream[1], s.Stderr)
90 // Create the broker and start it up
91 broker := newMuxBroker(mux)
92 go broker.Run()
94 // Use the control connection to build the dispenser and serve the
95 // connection.
96 server := rpc.NewServer()
97 server.RegisterName("Control", &controlServer{
98 server: s,
99 })
100 server.RegisterName("Dispenser", &dispenseServer{
101 broker: broker,
102 plugins: s.Plugins,
103 })
104 server.ServeConn(control)
107// done is called internally by the control server to trigger the
108// doneCh to close which is listened to by the main process to cleanly
109// exit.
110func (s *RPCServer) done() {
111 s.lock.Lock()
112 defer s.lock.Unlock()
114 if s.DoneCh != nil {
115 close(s.DoneCh)
116 s.DoneCh = nil
117 }
120// dispenseServer dispenses variousinterface implementations for Terraform.
121type controlServer struct {
122 server *RPCServer
125func (c *controlServer) Quit(
126 null bool, response *struct{}) error {
127 // End the server
128 c.server.done()
130 // Always return true
131 *response = struct{}{}
133 return nil
136// dispenseServer dispenses variousinterface implementations for Terraform.
137type dispenseServer struct {
138 broker *MuxBroker
139 plugins map[string]Plugin
142func (d *dispenseServer) Dispense(
143 name string, response *uint32) error {
144 // Find the function to create this implementation
145 p, ok := d.plugins[name]
146 if !ok {
147 return fmt.Errorf("unknown plugin type: %s", name)
148 }
150 // Create the implementation first so we know if there is an error.
151 impl, err := p.Server(d.broker)
152 if err != nil {
153 // We turn the error into an errors error so that it works across RPC
154 return errors.New(err.Error())
155 }
157 // Reserve an ID for our implementation
158 id := d.broker.NextId()
159 *response = id
161 // Run the rest in a goroutine since it can only happen once this RPC
162 // call returns. We wait for a connection for the plugin implementation
163 // and serve it.
164 go func() {
165 conn, err := d.broker.Accept(id)
166 if err != nil {
167 log.Printf("[ERR] go-plugin: plugin dispense error: %s: %s", name, err)
168 return
169 }
171 serve(conn, "Plugin", impl)
172 }()
174 return nil
177func serve(conn io.ReadWriteCloser, name string, v interface{}) {
178 server := rpc.NewServer()
179 if err := server.RegisterName(name, v); err != nil {
180 log.Printf("[ERR] go-plugin: plugin dispense error: %s", err)
181 return
182 }
184 server.ServeConn(conn)
1package plugin
3import (
4 "errors"
5 "fmt"
6 "io/ioutil"
7 "log"
8 "net"
9 "os"
10 "os/signal"
11 "runtime"
12 "strconv"
13 "sync/atomic"
16// CoreProtocolVersion is the ProtocolVersion of the plugin system itself.
17// We will increment this whenever we change any protocol behavior. This
18// will invalidate any prior plugins but will at least allow us to iterate
19// on the core in a safe way. We will do our best to do this very
20// infrequently.
21const CoreProtocolVersion = 1
23// HandshakeConfig is the configuration used by client and servers to
24// handshake before starting a plugin connection. This is embedded by
25// both ServeConfig and ClientConfig.
27// In practice, the plugin host creates a HandshakeConfig that is exported
28// and plugins then can easily consume it.
29type HandshakeConfig struct {
30 // ProtocolVersion is the version that clients must match on to
31 // agree they can communicate. This should match the ProtocolVersion
32 // set on ClientConfig when using a plugin.
33 ProtocolVersion uint
35 // MagicCookieKey and value are used as a very basic verification
36 // that a plugin is intended to be launched. This is not a security
37 // measure, just a UX feature. If the magic cookie doesn't match,
38 // we show human-friendly output.
39 MagicCookieKey string
40 MagicCookieValue string
43// ServeConfig configures what sorts of plugins are served.
44type ServeConfig struct {
45 // HandshakeConfig is the configuration that must match clients.
46 HandshakeConfig
48 // Plugins are the plugins that are served.
49 Plugins map[string]Plugin
52// Serve serves the plugins given by ServeConfig.
54// Serve doesn't return until the plugin is done being executed. Any
55// errors will be outputted to the log.
57// This is the method that plugins should call in their main() functions.
58func Serve(opts *ServeConfig) {
59 // Validate the handshake config
60 if opts.MagicCookieKey == "" || opts.MagicCookieValue == "" {
61 fmt.Fprintf(os.Stderr,
62 "Misconfigured ServeConfig given to serve this plugin: no magic cookie\n"+
63 "key or value was set. Please notify the plugin author and report\n"+
64 "this as a bug.\n")
65 os.Exit(1)
66 }
68 // First check the cookie
69 if os.Getenv(opts.MagicCookieKey) != opts.MagicCookieValue {
70 fmt.Fprintf(os.Stderr,
71 "This binary is a plugin. These are not meant to be executed directly.\n"+
72 "Please execute the program that consumes these plugins, which will\n"+
73 "load any plugins automatically\n")
74 os.Exit(1)
75 }
77 // Logging goes to the original stderr
78 log.SetOutput(os.Stderr)
80 // Create our new stdout, stderr files. These will override our built-in
81 // stdout/stderr so that it works across the stream boundary.
82 stdout_r, stdout_w, err := os.Pipe()
83 if err != nil {
84 fmt.Fprintf(os.Stderr, "Error preparing plugin: %s\n", err)
85 os.Exit(1)
86 }
87 stderr_r, stderr_w, err := os.Pipe()
88 if err != nil {
89 fmt.Fprintf(os.Stderr, "Error preparing plugin: %s\n", err)
90 os.Exit(1)
91 }
93 // Register a listener so we can accept a connection
94 listener, err := serverListener()
95 if err != nil {
96 log.Printf("[ERR] plugin: plugin init: %s", err)
97 return
98 }
99 defer listener.Close()
101 // Create the channel to tell us when we're done
102 doneCh := make(chan struct{})
104 // Create the RPC server to dispense
105 server := &RPCServer{
106 Plugins: opts.Plugins,
107 Stdout: stdout_r,
108 Stderr: stderr_r,
109 DoneCh: doneCh,
110 }
112 // Output the address and service name to stdout so that core can bring it up.
113 log.Printf("[DEBUG] plugin: plugin address: %s %s\n",
114 listener.Addr().Network(), listener.Addr().String())
115 fmt.Printf("%d|%d|%s|%s\n",
116 CoreProtocolVersion,
117 opts.ProtocolVersion,
118 listener.Addr().Network(),
119 listener.Addr().String())
120 os.Stdout.Sync()
122 // Eat the interrupts
123 ch := make(chan os.Signal, 1)
124 signal.Notify(ch, os.Interrupt)
125 go func() {
126 var count int32 = 0
127 for {
128 <-ch
129 newCount := atomic.AddInt32(&count, 1)
130 log.Printf(
131 "[DEBUG] plugin: received interrupt signal (count: %d). Ignoring.",
132 newCount)
133 }
134 }()
136 // Set our new out, err
137 os.Stdout = stdout_w
138 os.Stderr = stderr_w
140 // Serve
141 go server.Accept(listener)
143 // Wait for the graceful exit
144 <-doneCh
147func serverListener() (net.Listener, error) {
148 if runtime.GOOS == "windows" {
149 return serverListener_tcp()
150 }
152 return serverListener_unix()
155func serverListener_tcp() (net.Listener, error) {
156 minPort, err := strconv.ParseInt(os.Getenv("PLUGIN_MIN_PORT"), 10, 32)
157 if err != nil {
158 return nil, err
159 }
161 maxPort, err := strconv.ParseInt(os.Getenv("PLUGIN_MAX_PORT"), 10, 32)
162 if err != nil {
163 return nil, err
164 }
166 for port := minPort; port <= maxPort; port++ {
167 address := fmt.Sprintf("", port)
168 listener, err := net.Listen("tcp", address)
169 if err == nil {
170 return listener, nil
171 }
172 }
174 return nil, errors.New("Couldn't bind plugin TCP listener")
177func serverListener_unix() (net.Listener, error) {
178 tf, err := ioutil.TempFile("", "plugin")
179 if err != nil {
180 return nil, err
181 }
182 path := tf.Name()
184 // Close the file and remove it because it has to not exist for
185 // the domain socket.
186 if err := tf.Close(); err != nil {
187 return nil, err
188 }
189 if err := os.Remove(path); err != nil {
190 return nil, err
191 }
193 l, err := net.Listen("unix", path)
194 if err != nil {
195 return nil, err
196 }
198 // Wrap the listener in rmListener so that the Unix domain socket file
199 // is removed on close.
200 return &rmListener{
201 Listener: l,
202 Path: path,
203 }, nil
206// rmListener is an implementation of net.Listener that forwards most
207// calls to the listener but also removes a file as part of the close. We
208// use this to cleanup the unix domain socket on close.
209type rmListener struct {
210 net.Listener
211 Path string
214func (l *rmListener) Close() error {
215 // Close the listener itself
216 if err := l.Listener.Close(); err != nil {
217 return err
218 }
220 // Remove the file
221 return os.Remove(l.Path)
1package plugin
3import (
4 "fmt"
5 "os"
8// ServeMuxMap is the type that is used to configure ServeMux
9type ServeMuxMap map[string]*ServeConfig
11// ServeMux is like Serve, but serves multiple types of plugins determined
12// by the argument given on the command-line.
14// This command doesn't return until the plugin is done being executed. Any
15// errors are logged or output to stderr.
16func ServeMux(m ServeMuxMap) {
17 if len(os.Args) != 2 {
18 fmt.Fprintf(os.Stderr,
19 "Invoked improperly. This is an internal command that shouldn't\n"+
20 "be manually invoked.\n")
21 os.Exit(1)
22 }
24 opts, ok := m[os.Args[1]]
25 if !ok {
26 fmt.Fprintf(os.Stderr, "Unknown plugin: %s\n", os.Args[1])
27 os.Exit(1)
28 }
30 Serve(opts)
1package plugin
3import (
4 "io"
5 "log"
8func copyStream(name string, dst io.Writer, src io.Reader) {
9 if src == nil {
10 panic(name + ": src is nil")
11 }
12 if dst == nil {
13 panic(name + ": dst is nil")
14 }
15 if _, err := io.Copy(dst, src); err != nil && err != io.EOF {
16 log.Printf("[ERR] plugin: stream copy '%s' error: %s", name, err)
17 }
1package plugin
3import (
4 "bytes"
5 "net"
6 "net/rpc"
7 "testing"
10// The testing file contains test helpers that you can use outside of
11// this package for making it easier to test plugins themselves.
13// TestConn is a helper function for returning a client and server
14// net.Conn connected to each other.
15func TestConn(t *testing.T) (net.Conn, net.Conn) {
16 // Listen to any local port. This listener will be closed
17 // after a single connection is established.
18 l, err := net.Listen("tcp", "")
19 if err != nil {
20 t.Fatalf("err: %s", err)
21 }
23 // Start a goroutine to accept our client connection
24 var serverConn net.Conn
25 doneCh := make(chan struct{})
26 go func() {
27 defer close(doneCh)
28 defer l.Close()
29 var err error
30 serverConn, err = l.Accept()
31 if err != nil {
32 t.Fatalf("err: %s", err)
33 }
34 }()
36 // Connect to the server
37 clientConn, err := net.Dial("tcp", l.Addr().String())
38 if err != nil {
39 t.Fatalf("err: %s", err)
40 }
42 // Wait for the server side to acknowledge it has connected
43 <-doneCh
45 return clientConn, serverConn
48// TestRPCConn returns a rpc client and server connected to each other.
49func TestRPCConn(t *testing.T) (*rpc.Client, *rpc.Server) {
50 clientConn, serverConn := TestConn(t)
52 server := rpc.NewServer()
53 go server.ServeConn(serverConn)
55 client := rpc.NewClient(clientConn)
56 return client, server
59// TestPluginRPCConn returns a plugin RPC client and server that are connected
60// together and configured.
61func TestPluginRPCConn(t *testing.T, ps map[string]Plugin) (*RPCClient, *RPCServer) {
62 // Create two net.Conns we can use to shuttle our control connection
63 clientConn, serverConn := TestConn(t)
65 // Start up the server
66 server := &RPCServer{Plugins: ps, Stdout: new(bytes.Buffer), Stderr: new(bytes.Buffer)}
67 go server.ServeConn(serverConn)
69 // Connect the client to the server
70 client, err := NewRPCClient(clientConn, ps)
71 if err != nil {
72 t.Fatalf("err: %s", err)
73 }
75 return client, server