diff options
Diffstat (limited to 'vendor/github.com/hashicorp/go-plugin/mux_broker.go')
-rw-r--r-- | vendor/github.com/hashicorp/go-plugin/mux_broker.go | 204 |
1 files changed, 204 insertions, 0 deletions
diff --git a/vendor/github.com/hashicorp/go-plugin/mux_broker.go b/vendor/github.com/hashicorp/go-plugin/mux_broker.go new file mode 100644 index 0000000..01c45ad --- /dev/null +++ b/vendor/github.com/hashicorp/go-plugin/mux_broker.go | |||
@@ -0,0 +1,204 @@ | |||
1 | package plugin | ||
2 | |||
3 | import ( | ||
4 | "encoding/binary" | ||
5 | "fmt" | ||
6 | "log" | ||
7 | "net" | ||
8 | "sync" | ||
9 | "sync/atomic" | ||
10 | "time" | ||
11 | |||
12 | "github.com/hashicorp/yamux" | ||
13 | ) | ||
14 | |||
15 | // MuxBroker is responsible for brokering multiplexed connections by unique ID. | ||
16 | // | ||
17 | // It is used by plugins to multiplex multiple RPC connections and data | ||
18 | // streams on top of a single connection between the plugin process and the | ||
19 | // host process. | ||
20 | // | ||
21 | // This allows a plugin to request a channel with a specific ID to connect to | ||
22 | // or accept a connection from, and the broker handles the details of | ||
23 | // holding these channels open while they're being negotiated. | ||
24 | // | ||
25 | // The Plugin interface has access to these for both Server and Client. | ||
26 | // The broker can be used by either (optionally) to reserve and connect to | ||
27 | // new multiplexed streams. This is useful for complex args and return values, | ||
28 | // or anything else you might need a data stream for. | ||
29 | type MuxBroker struct { | ||
30 | nextId uint32 | ||
31 | session *yamux.Session | ||
32 | streams map[uint32]*muxBrokerPending | ||
33 | |||
34 | sync.Mutex | ||
35 | } | ||
36 | |||
37 | type muxBrokerPending struct { | ||
38 | ch chan net.Conn | ||
39 | doneCh chan struct{} | ||
40 | } | ||
41 | |||
42 | func newMuxBroker(s *yamux.Session) *MuxBroker { | ||
43 | return &MuxBroker{ | ||
44 | session: s, | ||
45 | streams: make(map[uint32]*muxBrokerPending), | ||
46 | } | ||
47 | } | ||
48 | |||
49 | // Accept accepts a connection by ID. | ||
50 | // | ||
51 | // This should not be called multiple times with the same ID at one time. | ||
52 | func (m *MuxBroker) Accept(id uint32) (net.Conn, error) { | ||
53 | var c net.Conn | ||
54 | p := m.getStream(id) | ||
55 | select { | ||
56 | case c = <-p.ch: | ||
57 | close(p.doneCh) | ||
58 | case <-time.After(5 * time.Second): | ||
59 | m.Lock() | ||
60 | defer m.Unlock() | ||
61 | delete(m.streams, id) | ||
62 | |||
63 | return nil, fmt.Errorf("timeout waiting for accept") | ||
64 | } | ||
65 | |||
66 | // Ack our connection | ||
67 | if err := binary.Write(c, binary.LittleEndian, id); err != nil { | ||
68 | c.Close() | ||
69 | return nil, err | ||
70 | } | ||
71 | |||
72 | return c, nil | ||
73 | } | ||
74 | |||
75 | // AcceptAndServe is used to accept a specific stream ID and immediately | ||
76 | // serve an RPC server on that stream ID. This is used to easily serve | ||
77 | // complex arguments. | ||
78 | // | ||
79 | // The served interface is always registered to the "Plugin" name. | ||
80 | func (m *MuxBroker) AcceptAndServe(id uint32, v interface{}) { | ||
81 | conn, err := m.Accept(id) | ||
82 | if err != nil { | ||
83 | log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err) | ||
84 | return | ||
85 | } | ||
86 | |||
87 | serve(conn, "Plugin", v) | ||
88 | } | ||
89 | |||
90 | // Close closes the connection and all sub-connections. | ||
91 | func (m *MuxBroker) Close() error { | ||
92 | return m.session.Close() | ||
93 | } | ||
94 | |||
95 | // Dial opens a connection by ID. | ||
96 | func (m *MuxBroker) Dial(id uint32) (net.Conn, error) { | ||
97 | // Open the stream | ||
98 | stream, err := m.session.OpenStream() | ||
99 | if err != nil { | ||
100 | return nil, err | ||
101 | } | ||
102 | |||
103 | // Write the stream ID onto the wire. | ||
104 | if err := binary.Write(stream, binary.LittleEndian, id); err != nil { | ||
105 | stream.Close() | ||
106 | return nil, err | ||
107 | } | ||
108 | |||
109 | // Read the ack that we connected. Then we're off! | ||
110 | var ack uint32 | ||
111 | if err := binary.Read(stream, binary.LittleEndian, &ack); err != nil { | ||
112 | stream.Close() | ||
113 | return nil, err | ||
114 | } | ||
115 | if ack != id { | ||
116 | stream.Close() | ||
117 | return nil, fmt.Errorf("bad ack: %d (expected %d)", ack, id) | ||
118 | } | ||
119 | |||
120 | return stream, nil | ||
121 | } | ||
122 | |||
123 | // NextId returns a unique ID to use next. | ||
124 | // | ||
125 | // It is possible for very long-running plugin hosts to wrap this value, | ||
126 | // though it would require a very large amount of RPC calls. In practice | ||
127 | // we've never seen it happen. | ||
128 | func (m *MuxBroker) NextId() uint32 { | ||
129 | return atomic.AddUint32(&m.nextId, 1) | ||
130 | } | ||
131 | |||
132 | // Run starts the brokering and should be executed in a goroutine, since it | ||
133 | // blocks forever, or until the session closes. | ||
134 | // | ||
135 | // Uses of MuxBroker never need to call this. It is called internally by | ||
136 | // the plugin host/client. | ||
137 | func (m *MuxBroker) Run() { | ||
138 | for { | ||
139 | stream, err := m.session.AcceptStream() | ||
140 | if err != nil { | ||
141 | // Once we receive an error, just exit | ||
142 | break | ||
143 | } | ||
144 | |||
145 | // Read the stream ID from the stream | ||
146 | var id uint32 | ||
147 | if err := binary.Read(stream, binary.LittleEndian, &id); err != nil { | ||
148 | stream.Close() | ||
149 | continue | ||
150 | } | ||
151 | |||
152 | // Initialize the waiter | ||
153 | p := m.getStream(id) | ||
154 | select { | ||
155 | case p.ch <- stream: | ||
156 | default: | ||
157 | } | ||
158 | |||
159 | // Wait for a timeout | ||
160 | go m.timeoutWait(id, p) | ||
161 | } | ||
162 | } | ||
163 | |||
164 | func (m *MuxBroker) getStream(id uint32) *muxBrokerPending { | ||
165 | m.Lock() | ||
166 | defer m.Unlock() | ||
167 | |||
168 | p, ok := m.streams[id] | ||
169 | if ok { | ||
170 | return p | ||
171 | } | ||
172 | |||
173 | m.streams[id] = &muxBrokerPending{ | ||
174 | ch: make(chan net.Conn, 1), | ||
175 | doneCh: make(chan struct{}), | ||
176 | } | ||
177 | return m.streams[id] | ||
178 | } | ||
179 | |||
180 | func (m *MuxBroker) timeoutWait(id uint32, p *muxBrokerPending) { | ||
181 | // Wait for the stream to either be picked up and connected, or | ||
182 | // for a timeout. | ||
183 | timeout := false | ||
184 | select { | ||
185 | case <-p.doneCh: | ||
186 | case <-time.After(5 * time.Second): | ||
187 | timeout = true | ||
188 | } | ||
189 | |||
190 | m.Lock() | ||
191 | defer m.Unlock() | ||
192 | |||
193 | // Delete the stream so no one else can grab it | ||
194 | delete(m.streams, id) | ||
195 | |||
196 | // If we timed out, then check if we have a channel in the buffer, | ||
197 | // and if so, close it. | ||
198 | if timeout { | ||
199 | select { | ||
200 | case s := <-p.ch: | ||
201 | s.Close() | ||
202 | } | ||
203 | } | ||
204 | } | ||