]> git.immae.eu Git - github/fretlink/terraform-provider-statuscake.git/blob - vendor/github.com/hashicorp/go-plugin/mux_broker.go
Initial transfer of provider code
[github/fretlink/terraform-provider-statuscake.git] / vendor / github.com / hashicorp / go-plugin / mux_broker.go
1 package plugin
2
3 import (
4 "encoding/binary"
5 "fmt"
6 "log"
7 "net"
8 "sync"
9 "sync/atomic"
10 "time"
11
12 "github.com/hashicorp/yamux"
13 )
14
15 // MuxBroker is responsible for brokering multiplexed connections by unique ID.
16 //
17 // It is used by plugins to multiplex multiple RPC connections and data
18 // streams on top of a single connection between the plugin process and the
19 // host process.
20 //
21 // This allows a plugin to request a channel with a specific ID to connect to
22 // or accept a connection from, and the broker handles the details of
23 // holding these channels open while they're being negotiated.
24 //
25 // The Plugin interface has access to these for both Server and Client.
26 // The broker can be used by either (optionally) to reserve and connect to
27 // new multiplexed streams. This is useful for complex args and return values,
28 // or anything else you might need a data stream for.
29 type MuxBroker struct {
30 nextId uint32
31 session *yamux.Session
32 streams map[uint32]*muxBrokerPending
33
34 sync.Mutex
35 }
36
// muxBrokerPending tracks a single stream ID while it is being negotiated
// between the side that dials it and the side that accepts it.
type muxBrokerPending struct {
	// ch hands the accepted stream from Run over to Accept.
	ch chan net.Conn

	// doneCh is closed by Accept once it has taken the connection from ch.
	doneCh chan struct{}
}
41
42 func newMuxBroker(s *yamux.Session) *MuxBroker {
43 return &MuxBroker{
44 session: s,
45 streams: make(map[uint32]*muxBrokerPending),
46 }
47 }
48
49 // Accept accepts a connection by ID.
50 //
51 // This should not be called multiple times with the same ID at one time.
52 func (m *MuxBroker) Accept(id uint32) (net.Conn, error) {
53 var c net.Conn
54 p := m.getStream(id)
55 select {
56 case c = <-p.ch:
57 close(p.doneCh)
58 case <-time.After(5 * time.Second):
59 m.Lock()
60 defer m.Unlock()
61 delete(m.streams, id)
62
63 return nil, fmt.Errorf("timeout waiting for accept")
64 }
65
66 // Ack our connection
67 if err := binary.Write(c, binary.LittleEndian, id); err != nil {
68 c.Close()
69 return nil, err
70 }
71
72 return c, nil
73 }
74
75 // AcceptAndServe is used to accept a specific stream ID and immediately
76 // serve an RPC server on that stream ID. This is used to easily serve
77 // complex arguments.
78 //
79 // The served interface is always registered to the "Plugin" name.
80 func (m *MuxBroker) AcceptAndServe(id uint32, v interface{}) {
81 conn, err := m.Accept(id)
82 if err != nil {
83 log.Printf("[ERR] plugin: plugin acceptAndServe error: %s", err)
84 return
85 }
86
87 serve(conn, "Plugin", v)
88 }
89
90 // Close closes the connection and all sub-connections.
91 func (m *MuxBroker) Close() error {
92 return m.session.Close()
93 }
94
95 // Dial opens a connection by ID.
96 func (m *MuxBroker) Dial(id uint32) (net.Conn, error) {
97 // Open the stream
98 stream, err := m.session.OpenStream()
99 if err != nil {
100 return nil, err
101 }
102
103 // Write the stream ID onto the wire.
104 if err := binary.Write(stream, binary.LittleEndian, id); err != nil {
105 stream.Close()
106 return nil, err
107 }
108
109 // Read the ack that we connected. Then we're off!
110 var ack uint32
111 if err := binary.Read(stream, binary.LittleEndian, &ack); err != nil {
112 stream.Close()
113 return nil, err
114 }
115 if ack != id {
116 stream.Close()
117 return nil, fmt.Errorf("bad ack: %d (expected %d)", ack, id)
118 }
119
120 return stream, nil
121 }
122
123 // NextId returns a unique ID to use next.
124 //
125 // It is possible for very long-running plugin hosts to wrap this value,
126 // though it would require a very large amount of RPC calls. In practice
127 // we've never seen it happen.
128 func (m *MuxBroker) NextId() uint32 {
129 return atomic.AddUint32(&m.nextId, 1)
130 }
131
132 // Run starts the brokering and should be executed in a goroutine, since it
133 // blocks forever, or until the session closes.
134 //
135 // Uses of MuxBroker never need to call this. It is called internally by
136 // the plugin host/client.
137 func (m *MuxBroker) Run() {
138 for {
139 stream, err := m.session.AcceptStream()
140 if err != nil {
141 // Once we receive an error, just exit
142 break
143 }
144
145 // Read the stream ID from the stream
146 var id uint32
147 if err := binary.Read(stream, binary.LittleEndian, &id); err != nil {
148 stream.Close()
149 continue
150 }
151
152 // Initialize the waiter
153 p := m.getStream(id)
154 select {
155 case p.ch <- stream:
156 default:
157 }
158
159 // Wait for a timeout
160 go m.timeoutWait(id, p)
161 }
162 }
163
164 func (m *MuxBroker) getStream(id uint32) *muxBrokerPending {
165 m.Lock()
166 defer m.Unlock()
167
168 p, ok := m.streams[id]
169 if ok {
170 return p
171 }
172
173 m.streams[id] = &muxBrokerPending{
174 ch: make(chan net.Conn, 1),
175 doneCh: make(chan struct{}),
176 }
177 return m.streams[id]
178 }
179
180 func (m *MuxBroker) timeoutWait(id uint32, p *muxBrokerPending) {
181 // Wait for the stream to either be picked up and connected, or
182 // for a timeout.
183 timeout := false
184 select {
185 case <-p.doneCh:
186 case <-time.After(5 * time.Second):
187 timeout = true
188 }
189
190 m.Lock()
191 defer m.Unlock()
192
193 // Delete the stream so no one else can grab it
194 delete(m.streams, id)
195
196 // If we timed out, then check if we have a channel in the buffer,
197 // and if so, close it.
198 if timeout {
199 select {
200 case s := <-p.ch:
201 s.Close()
202 }
203 }
204 }