]> git.immae.eu Git - github/fretlink/terraform-provider-statuscake.git/blob - vendor/github.com/hashicorp/terraform/dag/walk.go
Initial transfer of provider code
[github/fretlink/terraform-provider-statuscake.git] / vendor / github.com / hashicorp / terraform / dag / walk.go
1 package dag
2
3 import (
4 "errors"
5 "fmt"
6 "log"
7 "sync"
8 "time"
9
10 "github.com/hashicorp/go-multierror"
11 )
12
13 // Walker is used to walk every vertex of a graph in parallel.
14 //
15 // A vertex will only be walked when the dependencies of that vertex have
16 // been walked. If two vertices can be walked at the same time, they will be.
17 //
18 // Update can be called to update the graph. This can be called even during
19 // a walk, changing vertices/edges mid-walk. This should be done carefully.
20 // If a vertex is removed but has already been executed, the result of that
21 // execution (any error) is still returned by Wait. Changing or re-adding
22 // a vertex that has already executed has no effect. Changing edges of
23 // a vertex that has already executed has no effect.
24 //
25 // Non-parallelism can be enforced by introducing a lock in your callback
26 // function. However, the goroutine overhead of a walk will remain.
27 // Walker will create V*2 goroutines (one for each vertex, and dependency
28 // waiter for each vertex). In general this should be of no concern unless
29 // there are a huge number of vertices.
30 //
31 // The walk is depth first by default. This can be changed with the Reverse
32 // option.
33 //
34 // A single walker is only valid for one graph walk. After the walk is complete
35 // you must construct a new walker to walk again. State for the walk is never
36 // deleted in case vertices or edges are changed.
37 type Walker struct {
38 // Callback is what is called for each vertex
39 Callback WalkFunc
40
41 // Reverse, if true, causes the source of an edge to depend on a target.
42 // When false (default), the target depends on the source.
43 Reverse bool
44
45 // changeLock must be held to modify any of the fields below. Only Update
46 // should modify these fields. Modifying them outside of Update can cause
47 // serious problems.
48 changeLock sync.Mutex
// vertices and edges are the sets Update has applied so far; each Update
// diffs the incoming graph against them. vertexMap holds the per-vertex
// walk state for every vertex currently in vertices.
49 vertices Set
50 edges Set
51 vertexMap map[Vertex]*walkerVertex
52
53 // wait is done when all vertices have executed. It may become "undone"
54 // if new vertices are added.
55 wait sync.WaitGroup
56
57 // errMap contains the errors recorded so far for execution. Reading
58 // and writing should hold errLock. It is created lazily the first time
// an error is recorded, so it may be nil.
59 errMap map[Vertex]error
60 errLock sync.Mutex
61 }
62
// walkerVertex is the per-vertex state a Walker keeps in its vertexMap.
63 type walkerVertex struct {
64 // These should only be set once on initialization and never written again.
65 // They are not protected by a lock since they don't need to be since
66 // they are write-once.
67
68 // DoneCh is closed when this vertex has completed execution, regardless
69 // of success.
70 //
71 // CancelCh is closed when the vertex should cancel execution. If execution
72 // is already complete (DoneCh is closed), this has no effect. Otherwise,
73 // execution is cancelled as quickly as possible.
74 DoneCh chan struct{}
75 CancelCh chan struct{}
76
77 // Dependency information. Any changes to any of these fields requires
78 // holding DepsLock.
79 //
80 // DepsCh is sent a single value that denotes whether the upstream deps
81 // were successful (no errors). Any value sent means that the upstream
82 // dependencies are complete. No other values will ever be sent again.
83 //
84 // DepsUpdateCh is closed when there is a new DepsCh set.
85 DepsCh chan bool
86 DepsUpdateCh chan struct{}
87 DepsLock sync.Mutex
88
89 // Below is not safe to read/write in parallel. This behavior is
90 // enforced by changes only happening in Update. Nothing else should
91 // ever modify these.
//
// deps maps each dependency vertex to that dependency's DoneCh.
// depsCancelCh is the cancel channel of the most recently started
// dependency waiter; Update closes it before installing a new one.
92 deps map[Vertex]chan struct{}
93 depsCancelCh chan struct{}
94 }
95
96 // errWalkUpstream is used in the errMap of a walk to note that an upstream
97 // dependency failed so this vertex wasn't run. This is not shown in the final
98 // user-returned error: Wait skips entries equal to this sentinel. It still
// counts as a failure for anything that depends on the skipped vertex, since
// the dependency check only looks for a non-nil errMap entry.
99 var errWalkUpstream = errors.New("upstream dependency failed")
100
101 // Wait waits for the completion of the walk and returns any errors (
102 // in the form of a multierror) that occurred. Update should be called
103 // to populate the walk with vertices and edges prior to calling this.
104 //
105 // Wait will return as soon as all currently known vertices are complete.
106 // If you plan on calling Update with more vertices in the future, you
107 // should not call Wait until after this is done.
108 func (w *Walker) Wait() error {
109 // Wait for completion
110 w.wait.Wait()
111
112 // Grab the error lock
113 w.errLock.Lock()
114 defer w.errLock.Unlock()
115
116 // Build the error
117 var result error
118 for v, err := range w.errMap {
119 if err != nil && err != errWalkUpstream {
120 result = multierror.Append(result, fmt.Errorf(
121 "%s: %s", VertexName(v), err))
122 }
123 }
124
125 return result
126 }
127
128 // Update updates the currently executing walk with the given graph.
129 // This will perform a diff of the vertices and edges and update the walker.
130 // Already completed vertices remain completed (including any errors during
131 // their execution).
132 //
133 // This returns immediately once the walker is updated; it does not wait
134 // for completion of the walk.
135 //
136 // Multiple Updates can be called in parallel. Update can be called at any
137 // time during a walk.
138 func (w *Walker) Update(g *AcyclicGraph) {
// When g is nil, v and e stay nil and the diffs below run against nil
// sets. NOTE(review): this relies on Set.Difference accepting a nil
// receiver/argument, which is not visible here - confirm.
139 var v, e *Set
140 if g != nil {
141 v, e = g.vertices, g.edges
142 }
143
144 // Grab the change lock so no more updates happen but also so that
145 // no new vertices are executed during this time since we may be
146 // removing them.
147 w.changeLock.Lock()
148 defer w.changeLock.Unlock()
149
150 // Initialize fields
151 if w.vertexMap == nil {
152 w.vertexMap = make(map[Vertex]*walkerVertex)
153 }
154
155 // Calculate all our sets: "new" is what the incoming graph has that the
// walker does not, "old" is what the walker has that the graph dropped.
156 newEdges := e.Difference(&w.edges)
157 oldEdges := w.edges.Difference(e)
158 newVerts := v.Difference(&w.vertices)
159 oldVerts := w.vertices.Difference(v)
160
161 // Add the new vertices. Their goroutines are not started here; that
// happens at the very end, once edges and waiters are in place.
162 for _, raw := range newVerts.List() {
163 v := raw.(Vertex)
164
165 // Add to the waitgroup so our walk is not done until everything finishes
166 w.wait.Add(1)
167
168 // Add to our own set so we know about it already
169 log.Printf("[DEBUG] dag/walk: added new vertex: %q", VertexName(v))
170 w.vertices.Add(raw)
171
172 // Initialize the vertex info
173 info := &walkerVertex{
174 DoneCh: make(chan struct{}),
175 CancelCh: make(chan struct{}),
176 deps: make(map[Vertex]chan struct{}),
177 }
178
179 // Add it to the map; the walk itself is kicked off at the end of Update
180 w.vertexMap[v] = info
181 }
182
183 // Remove the old vertices
184 for _, raw := range oldVerts.List() {
185 v := raw.(Vertex)
186
187 // Get the vertex info so we can cancel it
188 info, ok := w.vertexMap[v]
189 if !ok {
190 // This vertex for some reason was never in our map. This
191 // shouldn't be possible.
192 continue
193 }
194
195 // Cancel the vertex
196 close(info.CancelCh)
197
198 // Delete it out of the map
199 delete(w.vertexMap, v)
200
201 log.Printf("[DEBUG] dag/walk: removed vertex: %q", VertexName(v))
202 w.vertices.Delete(raw)
203 }
204
205 // Add the new edges. changedDeps collects every waiter whose dependency
// set is touched by an added or removed edge.
206 var changedDeps Set
207 for _, raw := range newEdges.List() {
208 edge := raw.(Edge)
209 waiter, dep := w.edgeParts(edge)
210
211 // Get the info for the waiter
212 waiterInfo, ok := w.vertexMap[waiter]
213 if !ok {
214 // Vertex doesn't exist... shouldn't be possible but ignore.
215 continue
216 }
217
218 // Get the info for the dep
219 depInfo, ok := w.vertexMap[dep]
220 if !ok {
221 // Vertex doesn't exist... shouldn't be possible but ignore.
222 continue
223 }
224
225 // Add the dependency to our waiter
226 waiterInfo.deps[dep] = depInfo.DoneCh
227
228 // Record that the deps changed for this waiter
229 changedDeps.Add(waiter)
230
231 log.Printf(
232 "[DEBUG] dag/walk: added edge: %q waiting on %q",
233 VertexName(waiter), VertexName(dep))
234 w.edges.Add(raw)
235 }
236
237 // Process removed edges
238 for _, raw := range oldEdges.List() {
239 edge := raw.(Edge)
240 waiter, dep := w.edgeParts(edge)
241
242 // Get the info for the waiter. Only the waiter is looked up here: the
// dependency side is just a map key to delete.
243 waiterInfo, ok := w.vertexMap[waiter]
244 if !ok {
245 // Vertex doesn't exist... shouldn't be possible but ignore.
246 continue
247 }
248
249 // Delete the dependency from the waiter
250 delete(waiterInfo.deps, dep)
251
252 // Record that the deps changed for this waiter
253 changedDeps.Add(waiter)
254
255 log.Printf(
256 "[DEBUG] dag/walk: removed edge: %q waiting on %q",
257 VertexName(waiter), VertexName(dep))
258 w.edges.Delete(raw)
259 }
260
261 // For each vertex with changed dependencies, we need to kick off
262 // a new waiter and notify the vertex of the changes.
263 for _, raw := range changedDeps.List() {
264 v := raw.(Vertex)
265 info, ok := w.vertexMap[v]
266 if !ok {
267 // Vertex doesn't exist... shouldn't be possible but ignore.
268 continue
269 }
270
271 // Create a new done channel. It is buffered with capacity one so the
// waiter can deliver its single result without a receiver being ready.
272 doneCh := make(chan bool, 1)
273
274 // Create the channel we close for cancellation
275 cancelCh := make(chan struct{})
276
277 // Build a new deps copy so the waiter goroutine works on a snapshot
// that later Updates cannot mutate. The k, v loop variables shadow the
// outer vertex v only inside this copy loop.
278 deps := make(map[Vertex]<-chan struct{})
279 for k, v := range info.deps {
280 deps[k] = v
281 }
282
283 // Update the update channel: closing the previous DepsUpdateCh wakes
// the vertex goroutine so it picks up the new DepsCh.
284 info.DepsLock.Lock()
285 if info.DepsUpdateCh != nil {
286 close(info.DepsUpdateCh)
287 }
288 info.DepsCh = doneCh
289 info.DepsUpdateCh = make(chan struct{})
290 info.DepsLock.Unlock()
291
292 // Cancel the older waiter
293 if info.depsCancelCh != nil {
294 close(info.depsCancelCh)
295 }
296 info.depsCancelCh = cancelCh
297
298 log.Printf(
299 "[DEBUG] dag/walk: dependencies changed for %q, sending new deps",
300 VertexName(v))
301
302 // Start the waiter
303 go w.waitDeps(v, deps, doneCh, cancelCh)
304 }
305
306 // Start all the new vertices. We do this at the end so that all
307 // the edge waiters and changes are setup above.
308 for _, raw := range newVerts.List() {
309 v := raw.(Vertex)
310 go w.walkVertex(v, w.vertexMap[v])
311 }
312 }
313
314 // edgeParts returns the waiter and the dependency, in that order.
315 // The waiter is waiting on the dependency.
316 func (w *Walker) edgeParts(e Edge) (Vertex, Vertex) {
317 if w.Reverse {
318 return e.Source(), e.Target()
319 }
320
321 return e.Target(), e.Source()
322 }
323
324 // walkVertex walks a single vertex, waiting for any dependencies before
325 // executing the callback.
//
// It returns without running the callback or recording anything if
// info.CancelCh is closed first. Otherwise it runs w.Callback when the
// dependency result is true, or records errWalkUpstream when it is false.
// In every case info.DoneCh is closed and the wait group is decremented
// on the way out.
326 func (w *Walker) walkVertex(v Vertex, info *walkerVertex) {
327 // When we're done executing, lower the waitgroup count
328 defer w.wait.Done()
329
330 // When we're done, always close our done channel
331 defer close(info.DoneCh)
332
333 // Wait for our dependencies. We create a [closed] deps channel so
334 // that we can immediately fall through to load our actual DepsCh.
// The placeholder carries true, so a vertex that never gets a real
// DepsCh is treated as having successful dependencies.
335 var depsSuccess bool
336 var depsUpdateCh chan struct{}
337 depsCh := make(chan bool, 1)
338 depsCh <- true
339 close(depsCh)
340 for {
341 select {
342 case <-info.CancelCh:
343 // Cancel
344 return
345
346 case depsSuccess = <-depsCh:
347 // Deps complete! Mark as nil to trigger completion handling.
// A nil channel also disables this case on later iterations.
348 depsCh = nil
349
350 case <-depsUpdateCh:
351 // New deps, reloop. While depsUpdateCh is still nil this case can
// never fire, since receiving from a nil channel blocks forever.
352 }
353
354 // Check if we have updated dependencies. This can happen if the
355 // dependencies were satisfied exactly prior to an Update occurring.
356 // In that case, we'd like to take into account new dependencies
357 // if possible.
358 info.DepsLock.Lock()
359 if info.DepsCh != nil {
// Take the pending channel and clear the field so the same channel
// is not picked up again on a later iteration.
360 depsCh = info.DepsCh
361 info.DepsCh = nil
362 }
363 if info.DepsUpdateCh != nil {
364 depsUpdateCh = info.DepsUpdateCh
365 }
366 info.DepsLock.Unlock()
367
368 // If we still have no deps channel set, then we're done!
369 if depsCh == nil {
370 break
371 }
372 }
373
374 // If we passed dependencies, we just want to check once more that
375 // we're not cancelled, since this can happen just as dependencies pass.
376 select {
377 case <-info.CancelCh:
378 // Cancelled during an update while dependencies completed.
379 return
380 default:
381 }
382
383 // Run our callback or note that our upstream failed
384 var err error
385 if depsSuccess {
386 log.Printf("[DEBUG] dag/walk: walking %q", VertexName(v))
387 err = w.Callback(v)
388 } else {
389 log.Printf("[DEBUG] dag/walk: upstream errored, not walking %q", VertexName(v))
390 err = errWalkUpstream
391 }
392
393 // Record the error. This write happens before the deferred
// close(info.DoneCh) runs, so anything woken by DoneCh can already see
// the entry in errMap.
394 if err != nil {
395 w.errLock.Lock()
396 defer w.errLock.Unlock()
397
398 if w.errMap == nil {
399 w.errMap = make(map[Vertex]error)
400 }
401 w.errMap[v] = err
402 }
403 }
404
405 func (w *Walker) waitDeps(
406 v Vertex,
407 deps map[Vertex]<-chan struct{},
408 doneCh chan<- bool,
409 cancelCh <-chan struct{}) {
410 // For each dependency given to us, wait for it to complete
411 for dep, depCh := range deps {
412 DepSatisfied:
413 for {
414 select {
415 case <-depCh:
416 // Dependency satisfied!
417 break DepSatisfied
418
419 case <-cancelCh:
420 // Wait cancelled. Note that we didn't satisfy dependencies
421 // so that anything waiting on us also doesn't run.
422 doneCh <- false
423 return
424
425 case <-time.After(time.Second * 5):
426 log.Printf("[DEBUG] dag/walk: vertex %q, waiting for: %q",
427 VertexName(v), VertexName(dep))
428 }
429 }
430 }
431
432 // Dependencies satisfied! We need to check if any errored
433 w.errLock.Lock()
434 defer w.errLock.Unlock()
435 for dep, _ := range deps {
436 if w.errMap[dep] != nil {
437 // One of our dependencies failed, so return false
438 doneCh <- false
439 return
440 }
441 }
442
443 // All dependencies satisfied and successful
444 doneCh <- true
445 }