diff options
author | Jake Champlin <jake.champlin.27@gmail.com> | 2017-06-06 12:40:07 -0400 |
---|---|---|
committer | Jake Champlin <jake.champlin.27@gmail.com> | 2017-06-06 12:40:07 -0400 |
commit | bae9f6d2fd5eb5bc80929bd393932b23f14d7c93 (patch) | |
tree | ca9ab12a7d78b1fc27a8f734729081357ce6d252 /vendor/github.com/hashicorp/terraform/dag/walk.go | |
parent | 254c495b6bebab3fb72a243c4bce858d79e6ee99 (diff) | |
download | terraform-provider-statuscake-bae9f6d2fd5eb5bc80929bd393932b23f14d7c93.tar.gz terraform-provider-statuscake-bae9f6d2fd5eb5bc80929bd393932b23f14d7c93.tar.zst terraform-provider-statuscake-bae9f6d2fd5eb5bc80929bd393932b23f14d7c93.zip |
Initial transfer of provider code
Diffstat (limited to 'vendor/github.com/hashicorp/terraform/dag/walk.go')
-rw-r--r-- | vendor/github.com/hashicorp/terraform/dag/walk.go | 445 |
1 files changed, 445 insertions, 0 deletions
diff --git a/vendor/github.com/hashicorp/terraform/dag/walk.go b/vendor/github.com/hashicorp/terraform/dag/walk.go new file mode 100644 index 0000000..23c87ad --- /dev/null +++ b/vendor/github.com/hashicorp/terraform/dag/walk.go | |||
@@ -0,0 +1,445 @@ | |||
1 | package dag | ||
2 | |||
3 | import ( | ||
4 | "errors" | ||
5 | "fmt" | ||
6 | "log" | ||
7 | "sync" | ||
8 | "time" | ||
9 | |||
10 | "github.com/hashicorp/go-multierror" | ||
11 | ) | ||
12 | |||
13 | // Walker is used to walk every vertex of a graph in parallel. | ||
14 | // | ||
15 | // A vertex will only be walked when the dependencies of that vertex have | ||
16 | // been walked. If two vertices can be walked at the same time, they will be. | ||
17 | // | ||
18 | // Update can be called to update the graph. This can be called even during | ||
19 | // a walk, cahnging vertices/edges mid-walk. This should be done carefully. | ||
20 | // If a vertex is removed but has already been executed, the result of that | ||
21 | // execution (any error) is still returned by Wait. Changing or re-adding | ||
22 | // a vertex that has already executed has no effect. Changing edges of | ||
23 | // a vertex that has already executed has no effect. | ||
24 | // | ||
25 | // Non-parallelism can be enforced by introducing a lock in your callback | ||
26 | // function. However, the goroutine overhead of a walk will remain. | ||
27 | // Walker will create V*2 goroutines (one for each vertex, and dependency | ||
28 | // waiter for each vertex). In general this should be of no concern unless | ||
29 | // there are a huge number of vertices. | ||
30 | // | ||
31 | // The walk is depth first by default. This can be changed with the Reverse | ||
32 | // option. | ||
33 | // | ||
34 | // A single walker is only valid for one graph walk. After the walk is complete | ||
35 | // you must construct a new walker to walk again. State for the walk is never | ||
36 | // deleted in case vertices or edges are changed. | ||
37 | type Walker struct { | ||
38 | // Callback is what is called for each vertex | ||
39 | Callback WalkFunc | ||
40 | |||
41 | // Reverse, if true, causes the source of an edge to depend on a target. | ||
42 | // When false (default), the target depends on the source. | ||
43 | Reverse bool | ||
44 | |||
45 | // changeLock must be held to modify any of the fields below. Only Update | ||
46 | // should modify these fields. Modifying them outside of Update can cause | ||
47 | // serious problems. | ||
48 | changeLock sync.Mutex | ||
49 | vertices Set | ||
50 | edges Set | ||
51 | vertexMap map[Vertex]*walkerVertex | ||
52 | |||
53 | // wait is done when all vertices have executed. It may become "undone" | ||
54 | // if new vertices are added. | ||
55 | wait sync.WaitGroup | ||
56 | |||
57 | // errMap contains the errors recorded so far for execution. Reading | ||
58 | // and writing should hold errLock. | ||
59 | errMap map[Vertex]error | ||
60 | errLock sync.Mutex | ||
61 | } | ||
62 | |||
63 | type walkerVertex struct { | ||
64 | // These should only be set once on initialization and never written again. | ||
65 | // They are not protected by a lock since they don't need to be since | ||
66 | // they are write-once. | ||
67 | |||
68 | // DoneCh is closed when this vertex has completed execution, regardless | ||
69 | // of success. | ||
70 | // | ||
71 | // CancelCh is closed when the vertex should cancel execution. If execution | ||
72 | // is already complete (DoneCh is closed), this has no effect. Otherwise, | ||
73 | // execution is cancelled as quickly as possible. | ||
74 | DoneCh chan struct{} | ||
75 | CancelCh chan struct{} | ||
76 | |||
77 | // Dependency information. Any changes to any of these fields requires | ||
78 | // holding DepsLock. | ||
79 | // | ||
80 | // DepsCh is sent a single value that denotes whether the upstream deps | ||
81 | // were successful (no errors). Any value sent means that the upstream | ||
82 | // dependencies are complete. No other values will ever be sent again. | ||
83 | // | ||
84 | // DepsUpdateCh is closed when there is a new DepsCh set. | ||
85 | DepsCh chan bool | ||
86 | DepsUpdateCh chan struct{} | ||
87 | DepsLock sync.Mutex | ||
88 | |||
89 | // Below is not safe to read/write in parallel. This behavior is | ||
90 | // enforced by changes only happening in Update. Nothing else should | ||
91 | // ever modify these. | ||
92 | deps map[Vertex]chan struct{} | ||
93 | depsCancelCh chan struct{} | ||
94 | } | ||
95 | |||
96 | // errWalkUpstream is used in the errMap of a walk to note that an upstream | ||
97 | // dependency failed so this vertex wasn't run. This is not shown in the final | ||
98 | // user-returned error. | ||
99 | var errWalkUpstream = errors.New("upstream dependency failed") | ||
100 | |||
101 | // Wait waits for the completion of the walk and returns any errors ( | ||
102 | // in the form of a multierror) that occurred. Update should be called | ||
103 | // to populate the walk with vertices and edges prior to calling this. | ||
104 | // | ||
105 | // Wait will return as soon as all currently known vertices are complete. | ||
106 | // If you plan on calling Update with more vertices in the future, you | ||
107 | // should not call Wait until after this is done. | ||
108 | func (w *Walker) Wait() error { | ||
109 | // Wait for completion | ||
110 | w.wait.Wait() | ||
111 | |||
112 | // Grab the error lock | ||
113 | w.errLock.Lock() | ||
114 | defer w.errLock.Unlock() | ||
115 | |||
116 | // Build the error | ||
117 | var result error | ||
118 | for v, err := range w.errMap { | ||
119 | if err != nil && err != errWalkUpstream { | ||
120 | result = multierror.Append(result, fmt.Errorf( | ||
121 | "%s: %s", VertexName(v), err)) | ||
122 | } | ||
123 | } | ||
124 | |||
125 | return result | ||
126 | } | ||
127 | |||
128 | // Update updates the currently executing walk with the given graph. | ||
129 | // This will perform a diff of the vertices and edges and update the walker. | ||
130 | // Already completed vertices remain completed (including any errors during | ||
131 | // their execution). | ||
132 | // | ||
133 | // This returns immediately once the walker is updated; it does not wait | ||
134 | // for completion of the walk. | ||
135 | // | ||
136 | // Multiple Updates can be called in parallel. Update can be called at any | ||
137 | // time during a walk. | ||
138 | func (w *Walker) Update(g *AcyclicGraph) { | ||
139 | var v, e *Set | ||
140 | if g != nil { | ||
141 | v, e = g.vertices, g.edges | ||
142 | } | ||
143 | |||
144 | // Grab the change lock so no more updates happen but also so that | ||
145 | // no new vertices are executed during this time since we may be | ||
146 | // removing them. | ||
147 | w.changeLock.Lock() | ||
148 | defer w.changeLock.Unlock() | ||
149 | |||
150 | // Initialize fields | ||
151 | if w.vertexMap == nil { | ||
152 | w.vertexMap = make(map[Vertex]*walkerVertex) | ||
153 | } | ||
154 | |||
155 | // Calculate all our sets | ||
156 | newEdges := e.Difference(&w.edges) | ||
157 | oldEdges := w.edges.Difference(e) | ||
158 | newVerts := v.Difference(&w.vertices) | ||
159 | oldVerts := w.vertices.Difference(v) | ||
160 | |||
161 | // Add the new vertices | ||
162 | for _, raw := range newVerts.List() { | ||
163 | v := raw.(Vertex) | ||
164 | |||
165 | // Add to the waitgroup so our walk is not done until everything finishes | ||
166 | w.wait.Add(1) | ||
167 | |||
168 | // Add to our own set so we know about it already | ||
169 | log.Printf("[DEBUG] dag/walk: added new vertex: %q", VertexName(v)) | ||
170 | w.vertices.Add(raw) | ||
171 | |||
172 | // Initialize the vertex info | ||
173 | info := &walkerVertex{ | ||
174 | DoneCh: make(chan struct{}), | ||
175 | CancelCh: make(chan struct{}), | ||
176 | deps: make(map[Vertex]chan struct{}), | ||
177 | } | ||
178 | |||
179 | // Add it to the map and kick off the walk | ||
180 | w.vertexMap[v] = info | ||
181 | } | ||
182 | |||
183 | // Remove the old vertices | ||
184 | for _, raw := range oldVerts.List() { | ||
185 | v := raw.(Vertex) | ||
186 | |||
187 | // Get the vertex info so we can cancel it | ||
188 | info, ok := w.vertexMap[v] | ||
189 | if !ok { | ||
190 | // This vertex for some reason was never in our map. This | ||
191 | // shouldn't be possible. | ||
192 | continue | ||
193 | } | ||
194 | |||
195 | // Cancel the vertex | ||
196 | close(info.CancelCh) | ||
197 | |||
198 | // Delete it out of the map | ||
199 | delete(w.vertexMap, v) | ||
200 | |||
201 | log.Printf("[DEBUG] dag/walk: removed vertex: %q", VertexName(v)) | ||
202 | w.vertices.Delete(raw) | ||
203 | } | ||
204 | |||
205 | // Add the new edges | ||
206 | var changedDeps Set | ||
207 | for _, raw := range newEdges.List() { | ||
208 | edge := raw.(Edge) | ||
209 | waiter, dep := w.edgeParts(edge) | ||
210 | |||
211 | // Get the info for the waiter | ||
212 | waiterInfo, ok := w.vertexMap[waiter] | ||
213 | if !ok { | ||
214 | // Vertex doesn't exist... shouldn't be possible but ignore. | ||
215 | continue | ||
216 | } | ||
217 | |||
218 | // Get the info for the dep | ||
219 | depInfo, ok := w.vertexMap[dep] | ||
220 | if !ok { | ||
221 | // Vertex doesn't exist... shouldn't be possible but ignore. | ||
222 | continue | ||
223 | } | ||
224 | |||
225 | // Add the dependency to our waiter | ||
226 | waiterInfo.deps[dep] = depInfo.DoneCh | ||
227 | |||
228 | // Record that the deps changed for this waiter | ||
229 | changedDeps.Add(waiter) | ||
230 | |||
231 | log.Printf( | ||
232 | "[DEBUG] dag/walk: added edge: %q waiting on %q", | ||
233 | VertexName(waiter), VertexName(dep)) | ||
234 | w.edges.Add(raw) | ||
235 | } | ||
236 | |||
237 | // Process reoved edges | ||
238 | for _, raw := range oldEdges.List() { | ||
239 | edge := raw.(Edge) | ||
240 | waiter, dep := w.edgeParts(edge) | ||
241 | |||
242 | // Get the info for the waiter | ||
243 | waiterInfo, ok := w.vertexMap[waiter] | ||
244 | if !ok { | ||
245 | // Vertex doesn't exist... shouldn't be possible but ignore. | ||
246 | continue | ||
247 | } | ||
248 | |||
249 | // Delete the dependency from the waiter | ||
250 | delete(waiterInfo.deps, dep) | ||
251 | |||
252 | // Record that the deps changed for this waiter | ||
253 | changedDeps.Add(waiter) | ||
254 | |||
255 | log.Printf( | ||
256 | "[DEBUG] dag/walk: removed edge: %q waiting on %q", | ||
257 | VertexName(waiter), VertexName(dep)) | ||
258 | w.edges.Delete(raw) | ||
259 | } | ||
260 | |||
261 | // For each vertex with changed dependencies, we need to kick off | ||
262 | // a new waiter and notify the vertex of the changes. | ||
263 | for _, raw := range changedDeps.List() { | ||
264 | v := raw.(Vertex) | ||
265 | info, ok := w.vertexMap[v] | ||
266 | if !ok { | ||
267 | // Vertex doesn't exist... shouldn't be possible but ignore. | ||
268 | continue | ||
269 | } | ||
270 | |||
271 | // Create a new done channel | ||
272 | doneCh := make(chan bool, 1) | ||
273 | |||
274 | // Create the channel we close for cancellation | ||
275 | cancelCh := make(chan struct{}) | ||
276 | |||
277 | // Build a new deps copy | ||
278 | deps := make(map[Vertex]<-chan struct{}) | ||
279 | for k, v := range info.deps { | ||
280 | deps[k] = v | ||
281 | } | ||
282 | |||
283 | // Update the update channel | ||
284 | info.DepsLock.Lock() | ||
285 | if info.DepsUpdateCh != nil { | ||
286 | close(info.DepsUpdateCh) | ||
287 | } | ||
288 | info.DepsCh = doneCh | ||
289 | info.DepsUpdateCh = make(chan struct{}) | ||
290 | info.DepsLock.Unlock() | ||
291 | |||
292 | // Cancel the older waiter | ||
293 | if info.depsCancelCh != nil { | ||
294 | close(info.depsCancelCh) | ||
295 | } | ||
296 | info.depsCancelCh = cancelCh | ||
297 | |||
298 | log.Printf( | ||
299 | "[DEBUG] dag/walk: dependencies changed for %q, sending new deps", | ||
300 | VertexName(v)) | ||
301 | |||
302 | // Start the waiter | ||
303 | go w.waitDeps(v, deps, doneCh, cancelCh) | ||
304 | } | ||
305 | |||
306 | // Start all the new vertices. We do this at the end so that all | ||
307 | // the edge waiters and changes are setup above. | ||
308 | for _, raw := range newVerts.List() { | ||
309 | v := raw.(Vertex) | ||
310 | go w.walkVertex(v, w.vertexMap[v]) | ||
311 | } | ||
312 | } | ||
313 | |||
314 | // edgeParts returns the waiter and the dependency, in that order. | ||
315 | // The waiter is waiting on the dependency. | ||
316 | func (w *Walker) edgeParts(e Edge) (Vertex, Vertex) { | ||
317 | if w.Reverse { | ||
318 | return e.Source(), e.Target() | ||
319 | } | ||
320 | |||
321 | return e.Target(), e.Source() | ||
322 | } | ||
323 | |||
324 | // walkVertex walks a single vertex, waiting for any dependencies before | ||
325 | // executing the callback. | ||
326 | func (w *Walker) walkVertex(v Vertex, info *walkerVertex) { | ||
327 | // When we're done executing, lower the waitgroup count | ||
328 | defer w.wait.Done() | ||
329 | |||
330 | // When we're done, always close our done channel | ||
331 | defer close(info.DoneCh) | ||
332 | |||
333 | // Wait for our dependencies. We create a [closed] deps channel so | ||
334 | // that we can immediately fall through to load our actual DepsCh. | ||
335 | var depsSuccess bool | ||
336 | var depsUpdateCh chan struct{} | ||
337 | depsCh := make(chan bool, 1) | ||
338 | depsCh <- true | ||
339 | close(depsCh) | ||
340 | for { | ||
341 | select { | ||
342 | case <-info.CancelCh: | ||
343 | // Cancel | ||
344 | return | ||
345 | |||
346 | case depsSuccess = <-depsCh: | ||
347 | // Deps complete! Mark as nil to trigger completion handling. | ||
348 | depsCh = nil | ||
349 | |||
350 | case <-depsUpdateCh: | ||
351 | // New deps, reloop | ||
352 | } | ||
353 | |||
354 | // Check if we have updated dependencies. This can happen if the | ||
355 | // dependencies were satisfied exactly prior to an Update occurring. | ||
356 | // In that case, we'd like to take into account new dependencies | ||
357 | // if possible. | ||
358 | info.DepsLock.Lock() | ||
359 | if info.DepsCh != nil { | ||
360 | depsCh = info.DepsCh | ||
361 | info.DepsCh = nil | ||
362 | } | ||
363 | if info.DepsUpdateCh != nil { | ||
364 | depsUpdateCh = info.DepsUpdateCh | ||
365 | } | ||
366 | info.DepsLock.Unlock() | ||
367 | |||
368 | // If we still have no deps channel set, then we're done! | ||
369 | if depsCh == nil { | ||
370 | break | ||
371 | } | ||
372 | } | ||
373 | |||
374 | // If we passed dependencies, we just want to check once more that | ||
375 | // we're not cancelled, since this can happen just as dependencies pass. | ||
376 | select { | ||
377 | case <-info.CancelCh: | ||
378 | // Cancelled during an update while dependencies completed. | ||
379 | return | ||
380 | default: | ||
381 | } | ||
382 | |||
383 | // Run our callback or note that our upstream failed | ||
384 | var err error | ||
385 | if depsSuccess { | ||
386 | log.Printf("[DEBUG] dag/walk: walking %q", VertexName(v)) | ||
387 | err = w.Callback(v) | ||
388 | } else { | ||
389 | log.Printf("[DEBUG] dag/walk: upstream errored, not walking %q", VertexName(v)) | ||
390 | err = errWalkUpstream | ||
391 | } | ||
392 | |||
393 | // Record the error | ||
394 | if err != nil { | ||
395 | w.errLock.Lock() | ||
396 | defer w.errLock.Unlock() | ||
397 | |||
398 | if w.errMap == nil { | ||
399 | w.errMap = make(map[Vertex]error) | ||
400 | } | ||
401 | w.errMap[v] = err | ||
402 | } | ||
403 | } | ||
404 | |||
405 | func (w *Walker) waitDeps( | ||
406 | v Vertex, | ||
407 | deps map[Vertex]<-chan struct{}, | ||
408 | doneCh chan<- bool, | ||
409 | cancelCh <-chan struct{}) { | ||
410 | // For each dependency given to us, wait for it to complete | ||
411 | for dep, depCh := range deps { | ||
412 | DepSatisfied: | ||
413 | for { | ||
414 | select { | ||
415 | case <-depCh: | ||
416 | // Dependency satisfied! | ||
417 | break DepSatisfied | ||
418 | |||
419 | case <-cancelCh: | ||
420 | // Wait cancelled. Note that we didn't satisfy dependencies | ||
421 | // so that anything waiting on us also doesn't run. | ||
422 | doneCh <- false | ||
423 | return | ||
424 | |||
425 | case <-time.After(time.Second * 5): | ||
426 | log.Printf("[DEBUG] dag/walk: vertex %q, waiting for: %q", | ||
427 | VertexName(v), VertexName(dep)) | ||
428 | } | ||
429 | } | ||
430 | } | ||
431 | |||
432 | // Dependencies satisfied! We need to check if any errored | ||
433 | w.errLock.Lock() | ||
434 | defer w.errLock.Unlock() | ||
435 | for dep, _ := range deps { | ||
436 | if w.errMap[dep] != nil { | ||
437 | // One of our dependencies failed, so return false | ||
438 | doneCh <- false | ||
439 | return | ||
440 | } | ||
441 | } | ||
442 | |||
443 | // All dependencies satisfied and successful | ||
444 | doneCh <- true | ||
445 | } | ||