package dag

import (
	"errors"
	"log"
	"sync"
	"time"

	"github.com/hashicorp/terraform/tfdiags"
)

// Walker is used to walk every vertex of a graph in parallel.
//
// A vertex will only be walked when the dependencies of that vertex have
// been walked. If two vertices can be walked at the same time, they will be.
//
// Update can be called to update the graph. This can be called even during
// a walk, changing vertices/edges mid-walk. This should be done carefully.
// If a vertex is removed but has already been executed, the result of that
// execution (any error) is still returned by Wait. Changing or re-adding
// a vertex that has already executed has no effect. Changing edges of
// a vertex that has already executed has no effect.
//
// Non-parallelism can be enforced by introducing a lock in your callback
// function. However, the goroutine overhead of a walk will remain.
// Walker will create V*2 goroutines (one for each vertex, and one dependency
// waiter for each vertex). In general this should be of no concern unless
// there are a huge number of vertices.
//
// The walk is depth first by default. This can be changed with the Reverse
// option.
//
// A single walker is only valid for one graph walk. After the walk is complete
// you must construct a new walker to walk again. State for the walk is never
// deleted in case vertices or edges are changed.
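//
// A minimal usage sketch (illustrative; g is assumed to be an *AcyclicGraph
// built elsewhere):
//
//	w := &Walker{Callback: func(v Vertex) tfdiags.Diagnostics {
//		return nil // visit v here
//	}}
//	w.Update(g)
//	diags := w.Wait()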
type Walker struct {
	// Callback is what is called for each vertex
	Callback WalkFunc

	// Reverse, if true, causes the source of an edge to depend on a target.
	// When false (default), the target depends on the source.
	Reverse bool

	// changeLock must be held to modify any of the fields below. Only Update
	// should modify these fields. Modifying them outside of Update can cause
	// serious problems.
	changeLock sync.Mutex
	vertices   Set
	edges      Set
	vertexMap  map[Vertex]*walkerVertex

	// wait is done when all vertices have executed. It may become "undone"
	// if new vertices are added.
	wait sync.WaitGroup

	// diagsMap contains the diagnostics recorded so far for execution,
	// and upstreamFailed contains all the vertices whose problems were
	// caused by upstream failures, and thus whose diagnostics should be
	// excluded from the final set.
	//
	// Readers and writers of either map must hold diagsLock.
	diagsMap       map[Vertex]tfdiags.Diagnostics
	upstreamFailed map[Vertex]struct{}
	diagsLock      sync.Mutex
}

type walkerVertex struct {
	// These should only be set once on initialization and never written again.
	// They are not protected by a lock because they are write-once.

	// DoneCh is closed when this vertex has completed execution, regardless
	// of success.
	//
	// CancelCh is closed when the vertex should cancel execution. If execution
	// is already complete (DoneCh is closed), this has no effect. Otherwise,
	// execution is cancelled as quickly as possible.
	DoneCh   chan struct{}
	CancelCh chan struct{}

	// Dependency information. Any changes to any of these fields require
	// holding DepsLock.
	//
	// DepsCh is sent a single value that denotes whether the upstream deps
	// were successful (no errors). Any value sent means that the upstream
	// dependencies are complete. No other values will ever be sent again.
	//
	// DepsUpdateCh is closed when there is a new DepsCh set.
	DepsCh       chan bool
	DepsUpdateCh chan struct{}
	DepsLock     sync.Mutex

	// Below is not safe to read/write in parallel. This behavior is
	// enforced by changes only happening in Update. Nothing else should
	// ever modify these.
	deps         map[Vertex]chan struct{}
	depsCancelCh chan struct{}
}

// errWalkUpstream is used in the diagsMap of a walk to note that an upstream
// dependency failed so this vertex wasn't run. This is not shown in the final
// user-returned error.
var errWalkUpstream = errors.New("upstream dependency failed")

// Wait waits for the completion of the walk and returns diagnostics describing
// any problems that arose. Update should be called to populate the walk with
// vertices and edges prior to calling this.
//
// Wait will return as soon as all currently known vertices are complete.
// If you plan on calling Update with more vertices in the future, you
// should not call Wait until after this is done.
func (w *Walker) Wait() tfdiags.Diagnostics {
	// Wait for completion
	w.wait.Wait()

	var diags tfdiags.Diagnostics
	w.diagsLock.Lock()
	for v, vDiags := range w.diagsMap {
		if _, upstream := w.upstreamFailed[v]; upstream {
			// Ignore diagnostics for nodes that had failed upstreams, since
			// the downstream diagnostics are likely to be redundant.
			continue
		}
		diags = diags.Append(vDiags)
	}
	w.diagsLock.Unlock()

	return diags
}
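
// walkAndCollect is a minimal sketch (hypothetical, not part of the original
// file) showing the intended call order: Update populates the walk, then Wait
// blocks until every known vertex has finished and returns the combined
// diagnostics.
func walkAndCollect(g *AcyclicGraph, cb WalkFunc) error {
	w := &Walker{Callback: cb}
	w.Update(g)
	if diags := w.Wait(); diags.HasErrors() {
		return diags.Err()
	}
	return nil
}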

// Update updates the currently executing walk with the given graph.
// This will perform a diff of the vertices and edges and update the walker.
// Already completed vertices remain completed (including any errors during
// their execution).
//
// This returns immediately once the walker is updated; it does not wait
// for completion of the walk.
//
// Multiple Updates can be called in parallel. Update can be called at any
// time during a walk.
func (w *Walker) Update(g *AcyclicGraph) {
	log.Print("[TRACE] dag/walk: updating graph")
	var v, e *Set
	if g != nil {
		v, e = g.vertices, g.edges
	}

	// Grab the change lock so no more updates happen but also so that
	// no new vertices are executed during this time since we may be
	// removing them.
	w.changeLock.Lock()
	defer w.changeLock.Unlock()

	// Initialize fields
	if w.vertexMap == nil {
		w.vertexMap = make(map[Vertex]*walkerVertex)
	}

	// Calculate all our sets
	newEdges := e.Difference(&w.edges)
	oldEdges := w.edges.Difference(e)
	newVerts := v.Difference(&w.vertices)
	oldVerts := w.vertices.Difference(v)

	// Add the new vertices
	for _, raw := range newVerts.List() {
		v := raw.(Vertex)

		// Add to the waitgroup so our walk is not done until everything finishes
		w.wait.Add(1)

		// Add to our own set so we know about it already
		log.Printf("[TRACE] dag/walk: added new vertex: %q", VertexName(v))
		w.vertices.Add(raw)

		// Initialize the vertex info
		info := &walkerVertex{
			DoneCh:   make(chan struct{}),
			CancelCh: make(chan struct{}),
			deps:     make(map[Vertex]chan struct{}),
		}

		// Add it to the map and kick off the walk
		w.vertexMap[v] = info
	}

	// Remove the old vertices
	for _, raw := range oldVerts.List() {
		v := raw.(Vertex)

		// Get the vertex info so we can cancel it
		info, ok := w.vertexMap[v]
		if !ok {
			// This vertex for some reason was never in our map. This
			// shouldn't be possible.
			continue
		}

		// Cancel the vertex
		close(info.CancelCh)

		// Delete it out of the map
		delete(w.vertexMap, v)

		log.Printf("[TRACE] dag/walk: removed vertex: %q", VertexName(v))
		w.vertices.Delete(raw)
	}

	// Add the new edges
	var changedDeps Set
	for _, raw := range newEdges.List() {
		edge := raw.(Edge)
		waiter, dep := w.edgeParts(edge)

		// Get the info for the waiter
		waiterInfo, ok := w.vertexMap[waiter]
		if !ok {
			// Vertex doesn't exist... shouldn't be possible but ignore.
			continue
		}

		// Get the info for the dep
		depInfo, ok := w.vertexMap[dep]
		if !ok {
			// Vertex doesn't exist... shouldn't be possible but ignore.
			continue
		}

		// Add the dependency to our waiter
		waiterInfo.deps[dep] = depInfo.DoneCh

		// Record that the deps changed for this waiter
		changedDeps.Add(waiter)

		log.Printf(
			"[TRACE] dag/walk: added edge: %q waiting on %q",
			VertexName(waiter), VertexName(dep))
		w.edges.Add(raw)
	}

	// Process removed edges
	for _, raw := range oldEdges.List() {
		edge := raw.(Edge)
		waiter, dep := w.edgeParts(edge)

		// Get the info for the waiter
		waiterInfo, ok := w.vertexMap[waiter]
		if !ok {
			// Vertex doesn't exist... shouldn't be possible but ignore.
			continue
		}

		// Delete the dependency from the waiter
		delete(waiterInfo.deps, dep)

		// Record that the deps changed for this waiter
		changedDeps.Add(waiter)

		log.Printf(
			"[TRACE] dag/walk: removed edge: %q waiting on %q",
			VertexName(waiter), VertexName(dep))
		w.edges.Delete(raw)
	}

	// For each vertex with changed dependencies, we need to kick off
	// a new waiter and notify the vertex of the changes.
	for _, raw := range changedDeps.List() {
		v := raw.(Vertex)
		info, ok := w.vertexMap[v]
		if !ok {
			// Vertex doesn't exist... shouldn't be possible but ignore.
			continue
		}

		// Create a new done channel
		doneCh := make(chan bool, 1)

		// Create the channel we close for cancellation
		cancelCh := make(chan struct{})

		// Build a new deps copy
		deps := make(map[Vertex]<-chan struct{})
		for k, v := range info.deps {
			deps[k] = v
		}

		// Update the update channel
		info.DepsLock.Lock()
		if info.DepsUpdateCh != nil {
			close(info.DepsUpdateCh)
		}
		info.DepsCh = doneCh
		info.DepsUpdateCh = make(chan struct{})
		info.DepsLock.Unlock()

		// Cancel the older waiter
		if info.depsCancelCh != nil {
			close(info.depsCancelCh)
		}
		info.depsCancelCh = cancelCh

		log.Printf(
			"[TRACE] dag/walk: dependencies changed for %q, sending new deps",
			VertexName(v))

		// Start the waiter
		go w.waitDeps(v, deps, doneCh, cancelCh)
	}

	// Start all the new vertices. We do this at the end so that all
	// the edge waiters and changes are set up above.
	for _, raw := range newVerts.List() {
		v := raw.(Vertex)
		go w.walkVertex(v, w.vertexMap[v])
	}
}
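
// growWalkExample is a hedged sketch (hypothetical, not part of the original
// file) of calling Update again mid-walk: the second graph is assumed to be
// a superset of the first. New vertices start executing, removed ones are
// cancelled, and already-executed vertices keep their recorded results.
func growWalkExample(w *Walker, initial, grown *AcyclicGraph) tfdiags.Diagnostics {
	w.Update(initial) // kick off the walk with the vertices known so far
	// ... later, once more of the graph is known ...
	w.Update(grown) // diffed against the running walk
	return w.Wait() // returns once all currently known vertices are complete
}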

// edgeParts returns the waiter and the dependency, in that order.
// The waiter is waiting on the dependency.
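//
// For example: with the default Reverse=false, an edge from vertex A
// (source) to vertex B (target) makes B the waiter and A the dependency;
// with Reverse=true, A waits on B instead.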
func (w *Walker) edgeParts(e Edge) (Vertex, Vertex) {
	if w.Reverse {
		return e.Source(), e.Target()
	}

	return e.Target(), e.Source()
}

// walkVertex walks a single vertex, waiting for any dependencies before
// executing the callback.
func (w *Walker) walkVertex(v Vertex, info *walkerVertex) {
	// When we're done executing, lower the waitgroup count
	defer w.wait.Done()

	// When we're done, always close our done channel
	defer close(info.DoneCh)

	// Wait for our dependencies. We create a [closed] deps channel so
	// that we can immediately fall through to load our actual DepsCh.
	var depsSuccess bool
	var depsUpdateCh chan struct{}
	depsCh := make(chan bool, 1)
	depsCh <- true
	close(depsCh)
	for {
		select {
		case <-info.CancelCh:
			// Cancel
			return

		case depsSuccess = <-depsCh:
			// Deps complete! Mark as nil to trigger completion handling.
			depsCh = nil

		case <-depsUpdateCh:
			// New deps, reloop
		}

		// Check if we have updated dependencies. This can happen if the
		// dependencies were satisfied exactly prior to an Update occurring.
		// In that case, we'd like to take into account new dependencies
		// if possible.
		info.DepsLock.Lock()
		if info.DepsCh != nil {
			depsCh = info.DepsCh
			info.DepsCh = nil
		}
		if info.DepsUpdateCh != nil {
			depsUpdateCh = info.DepsUpdateCh
		}
		info.DepsLock.Unlock()

		// If we still have no deps channel set, then we're done!
		if depsCh == nil {
			break
		}
	}

	// If we passed dependencies, we just want to check once more that
	// we're not cancelled, since this can happen just as dependencies pass.
	select {
	case <-info.CancelCh:
		// Cancelled during an update while dependencies completed.
		return
	default:
	}

	// Run our callback or note that our upstream failed
	var diags tfdiags.Diagnostics
	var upstreamFailed bool
	if depsSuccess {
		log.Printf("[TRACE] dag/walk: visiting %q", VertexName(v))
		diags = w.Callback(v)
	} else {
		log.Printf("[TRACE] dag/walk: upstream of %q errored, so skipping", VertexName(v))
		// This won't be displayed to the user because we'll set upstreamFailed,
		// but we need to ensure there's at least one error in here so that
		// the failures will cascade downstream.
		diags = diags.Append(errors.New("upstream dependencies failed"))
		upstreamFailed = true
	}
	// Record the result (we must do this after execution because we mustn't
	// hold diagsLock while visiting a vertex).
	w.diagsLock.Lock()
	if w.diagsMap == nil {
		w.diagsMap = make(map[Vertex]tfdiags.Diagnostics)
	}
	w.diagsMap[v] = diags
	if w.upstreamFailed == nil {
		w.upstreamFailed = make(map[Vertex]struct{})
	}
	if upstreamFailed {
		w.upstreamFailed[v] = struct{}{}
	}
	w.diagsLock.Unlock()
}

// waitDeps waits for all of the given dependencies to complete, then sends
// a single value on doneCh: true if every dependency finished without
// errors, false if any dependency failed or the wait was cancelled via
// cancelCh.
func (w *Walker) waitDeps(
	v Vertex,
	deps map[Vertex]<-chan struct{},
	doneCh chan<- bool,
	cancelCh <-chan struct{}) {

	// For each dependency given to us, wait for it to complete
	for dep, depCh := range deps {
	DepSatisfied:
		for {
			select {
			case <-depCh:
				// Dependency satisfied!
				break DepSatisfied

			case <-cancelCh:
				// Wait cancelled. Note that we didn't satisfy dependencies
				// so that anything waiting on us also doesn't run.
				doneCh <- false
				return

			case <-time.After(time.Second * 5):
				log.Printf("[TRACE] dag/walk: vertex %q is waiting for %q",
					VertexName(v), VertexName(dep))
			}
		}
	}

	// Dependencies satisfied! We need to check if any errored
	w.diagsLock.Lock()
	defer w.diagsLock.Unlock()
	for dep := range deps {
		if w.diagsMap[dep].HasErrors() {
			// One of our dependencies failed, so return false
			doneCh <- false
			return
		}
	}

	// All dependencies satisfied and successful
	doneCh <- true
}
459}