package dag

import (
	"errors"
	"log"
	"sync"
	"time"

	"github.com/hashicorp/terraform/tfdiags"
)

// Walker is used to walk every vertex of a graph in parallel.
//
// A vertex will only be walked when the dependencies of that vertex have
// been walked. If two vertices can be walked at the same time, they will be.
//
// Update can be called to update the graph. This can be called even during
// a walk, changing vertices/edges mid-walk. This should be done carefully.
// If a vertex is removed but has already been executed, the result of that
// execution (any error) is still returned by Wait. Changing or re-adding
// a vertex that has already executed has no effect. Changing edges of
// a vertex that has already executed has no effect.
//
// Non-parallelism can be enforced by introducing a lock in your callback
// function. However, the goroutine overhead of a walk will remain.
// Walker will create V*2 goroutines (one for each vertex, and one dependency
// waiter for each vertex). In general this should be of no concern unless
// there is a huge number of vertices.
//
// The walk is depth first by default. This can be changed with the Reverse
// option.
//
// A single walker is only valid for one graph walk. After the walk is complete
// you must construct a new walker to walk again. State for the walk is never
// deleted in case vertices or edges are changed.
type Walker struct {
	// Callback is what is called for each vertex
	Callback WalkFunc

	// Reverse, if true, causes the source of an edge to depend on a target.
	// When false (default), the target depends on the source.
	Reverse bool

	// changeLock must be held to modify any of the fields below. Only Update
	// should modify these fields. Modifying them outside of Update can cause
	// serious problems.
	changeLock sync.Mutex
	vertices   Set
	edges      Set
	vertexMap  map[Vertex]*walkerVertex

	// wait is done when all vertices have executed. It may become "undone"
	// if new vertices are added.
	wait sync.WaitGroup

	// diagsMap contains the diagnostics recorded so far for execution,
	// and upstreamFailed contains all the vertices whose problems were
	// caused by upstream failures, and thus whose diagnostics should be
	// excluded from the final set.
	//
	// Readers and writers of either map must hold diagsLock.
	diagsMap       map[Vertex]tfdiags.Diagnostics
	upstreamFailed map[Vertex]struct{}
	diagsLock      sync.Mutex
}
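
// A minimal usage sketch (illustrative only; assumes a caller-built
// *AcyclicGraph named g and a callback that is safe to run concurrently):
//
//	w := &Walker{
//		Callback: func(v Vertex) tfdiags.Diagnostics {
//			log.Printf("visiting %s", VertexName(v))
//			return nil
//		},
//	}
//	w.Update(g)       // populate the walk; new vertices begin executing
//	diags := w.Wait() // block until all currently known vertices finish
//	if diags.HasErrors() {
//		// handle the aggregated per-vertex diagnostics
//	}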
66 | ||
67 | type walkerVertex struct { | |
68 | // These should only be set once on initialization and never written again. | |
69 | // They are not protected by a lock since they don't need to be since | |
70 | // they are write-once. | |
71 | ||
72 | // DoneCh is closed when this vertex has completed execution, regardless | |
73 | // of success. | |
74 | // | |
75 | // CancelCh is closed when the vertex should cancel execution. If execution | |
76 | // is already complete (DoneCh is closed), this has no effect. Otherwise, | |
77 | // execution is cancelled as quickly as possible. | |
78 | DoneCh chan struct{} | |
79 | CancelCh chan struct{} | |
80 | ||
81 | // Dependency information. Any changes to any of these fields requires | |
82 | // holding DepsLock. | |
83 | // | |
84 | // DepsCh is sent a single value that denotes whether the upstream deps | |
85 | // were successful (no errors). Any value sent means that the upstream | |
86 | // dependencies are complete. No other values will ever be sent again. | |
87 | // | |
88 | // DepsUpdateCh is closed when there is a new DepsCh set. | |
89 | DepsCh chan bool | |
90 | DepsUpdateCh chan struct{} | |
91 | DepsLock sync.Mutex | |
92 | ||
93 | // Below is not safe to read/write in parallel. This behavior is | |
94 | // enforced by changes only happening in Update. Nothing else should | |
95 | // ever modify these. | |
96 | deps map[Vertex]chan struct{} | |
97 | depsCancelCh chan struct{} | |
98 | } | |
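
// Illustration of the dependency handshake (a simplified consumer sketch;
// the authoritative logic lives in walkVertex below):
//
//	select {
//	case ok := <-info.DepsCh:
//		// Exactly one value is ever sent on a given DepsCh; ok reports
//		// whether all upstream dependencies succeeded.
//	case <-info.DepsUpdateCh:
//		// Update installed a fresh DepsCh: reload it under DepsLock and
//		// wait again.
//	}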
99 | ||
100 | // errWalkUpstream is used in the errMap of a walk to note that an upstream | |
101 | // dependency failed so this vertex wasn't run. This is not shown in the final | |
102 | // user-returned error. | |
103 | var errWalkUpstream = errors.New("upstream dependency failed") | |

// Wait waits for the completion of the walk and returns diagnostics describing
// any problems that arose. Update should be called to populate the walk with
// vertices and edges prior to calling this.
//
// Wait will return as soon as all currently known vertices are complete.
// If you plan on calling Update with more vertices in the future, you
// should not call Wait until after this is done.
func (w *Walker) Wait() tfdiags.Diagnostics {
	// Wait for completion
	w.wait.Wait()

	var diags tfdiags.Diagnostics
	w.diagsLock.Lock()
	for v, vDiags := range w.diagsMap {
		if _, upstream := w.upstreamFailed[v]; upstream {
			// Ignore diagnostics for nodes that had failed upstreams, since
			// the downstream diagnostics are likely to be redundant.
			continue
		}
		diags = diags.Append(vDiags)
	}
	w.diagsLock.Unlock()

	return diags
}
130 | ||
131 | // Update updates the currently executing walk with the given graph. | |
132 | // This will perform a diff of the vertices and edges and update the walker. | |
133 | // Already completed vertices remain completed (including any errors during | |
134 | // their execution). | |
135 | // | |
136 | // This returns immediately once the walker is updated; it does not wait | |
137 | // for completion of the walk. | |
138 | // | |
139 | // Multiple Updates can be called in parallel. Update can be called at any | |
140 | // time during a walk. | |
141 | func (w *Walker) Update(g *AcyclicGraph) { | |
142 | log.Print("[TRACE] dag/walk: updating graph") | |
143 | var v, e *Set | |
144 | if g != nil { | |
145 | v, e = g.vertices, g.edges | |
146 | } | |
147 | ||
148 | // Grab the change lock so no more updates happen but also so that | |
149 | // no new vertices are executed during this time since we may be | |
150 | // removing them. | |
151 | w.changeLock.Lock() | |
152 | defer w.changeLock.Unlock() | |
153 | ||
154 | // Initialize fields | |
155 | if w.vertexMap == nil { | |
156 | w.vertexMap = make(map[Vertex]*walkerVertex) | |
157 | } | |
158 | ||
159 | // Calculate all our sets | |
160 | newEdges := e.Difference(&w.edges) | |
161 | oldEdges := w.edges.Difference(e) | |
162 | newVerts := v.Difference(&w.vertices) | |
163 | oldVerts := w.vertices.Difference(v) | |
164 | ||
165 | // Add the new vertices | |
166 | for _, raw := range newVerts.List() { | |
167 | v := raw.(Vertex) | |
168 | ||
169 | // Add to the waitgroup so our walk is not done until everything finishes | |
170 | w.wait.Add(1) | |
171 | ||
172 | // Add to our own set so we know about it already | |
173 | log.Printf("[TRACE] dag/walk: added new vertex: %q", VertexName(v)) | |
174 | w.vertices.Add(raw) | |
175 | ||
176 | // Initialize the vertex info | |
177 | info := &walkerVertex{ | |
178 | DoneCh: make(chan struct{}), | |
179 | CancelCh: make(chan struct{}), | |
180 | deps: make(map[Vertex]chan struct{}), | |
181 | } | |
182 | ||
183 | // Add it to the map and kick off the walk | |
184 | w.vertexMap[v] = info | |
185 | } | |
186 | ||
187 | // Remove the old vertices | |
188 | for _, raw := range oldVerts.List() { | |
189 | v := raw.(Vertex) | |
190 | ||
191 | // Get the vertex info so we can cancel it | |
192 | info, ok := w.vertexMap[v] | |
193 | if !ok { | |
194 | // This vertex for some reason was never in our map. This | |
195 | // shouldn't be possible. | |
196 | continue | |
197 | } | |
198 | ||
199 | // Cancel the vertex | |
200 | close(info.CancelCh) | |
201 | ||
202 | // Delete it out of the map | |
203 | delete(w.vertexMap, v) | |
204 | ||
205 | log.Printf("[TRACE] dag/walk: removed vertex: %q", VertexName(v)) | |
206 | w.vertices.Delete(raw) | |
207 | } | |
208 | ||
209 | // Add the new edges | |
210 | var changedDeps Set | |
211 | for _, raw := range newEdges.List() { | |
212 | edge := raw.(Edge) | |
213 | waiter, dep := w.edgeParts(edge) | |
214 | ||
215 | // Get the info for the waiter | |
216 | waiterInfo, ok := w.vertexMap[waiter] | |
217 | if !ok { | |
218 | // Vertex doesn't exist... shouldn't be possible but ignore. | |
219 | continue | |
220 | } | |
221 | ||
222 | // Get the info for the dep | |
223 | depInfo, ok := w.vertexMap[dep] | |
224 | if !ok { | |
225 | // Vertex doesn't exist... shouldn't be possible but ignore. | |
226 | continue | |
227 | } | |
228 | ||
229 | // Add the dependency to our waiter | |
230 | waiterInfo.deps[dep] = depInfo.DoneCh | |
231 | ||
232 | // Record that the deps changed for this waiter | |
233 | changedDeps.Add(waiter) | |
234 | ||
235 | log.Printf( | |
236 | "[TRACE] dag/walk: added edge: %q waiting on %q", | |
237 | VertexName(waiter), VertexName(dep)) | |
238 | w.edges.Add(raw) | |
239 | } | |
240 | ||
241 | // Process reoved edges | |
	for _, raw := range oldEdges.List() {
		edge := raw.(Edge)
		waiter, dep := w.edgeParts(edge)

		// Get the info for the waiter
		waiterInfo, ok := w.vertexMap[waiter]
		if !ok {
			// Vertex doesn't exist... shouldn't be possible but ignore.
			continue
		}

		// Delete the dependency from the waiter
		delete(waiterInfo.deps, dep)

		// Record that the deps changed for this waiter
		changedDeps.Add(waiter)

		log.Printf(
			"[TRACE] dag/walk: removed edge: %q waiting on %q",
			VertexName(waiter), VertexName(dep))
		w.edges.Delete(raw)
	}

	// For each vertex with changed dependencies, we need to kick off
	// a new waiter and notify the vertex of the changes.
	for _, raw := range changedDeps.List() {
		v := raw.(Vertex)
		info, ok := w.vertexMap[v]
		if !ok {
			// Vertex doesn't exist... shouldn't be possible but ignore.
			continue
		}

		// Create a new done channel
		doneCh := make(chan bool, 1)

		// Create the channel we close for cancellation
		cancelCh := make(chan struct{})

		// Build a new deps copy
		deps := make(map[Vertex]<-chan struct{})
		for k, v := range info.deps {
			deps[k] = v
		}

		// Update the update channel
		info.DepsLock.Lock()
		if info.DepsUpdateCh != nil {
			close(info.DepsUpdateCh)
		}
		info.DepsCh = doneCh
		info.DepsUpdateCh = make(chan struct{})
		info.DepsLock.Unlock()

		// Cancel the older waiter
		if info.depsCancelCh != nil {
			close(info.depsCancelCh)
		}
		info.depsCancelCh = cancelCh

		log.Printf(
			"[TRACE] dag/walk: dependencies changed for %q, sending new deps",
			VertexName(v))

		// Start the waiter
		go w.waitDeps(v, deps, doneCh, cancelCh)
	}
309 | ||
310 | // Start all the new vertices. We do this at the end so that all | |
311 | // the edge waiters and changes are setup above. | |
312 | for _, raw := range newVerts.List() { | |
313 | v := raw.(Vertex) | |
314 | go w.walkVertex(v, w.vertexMap[v]) | |
315 | } | |
316 | } | |
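
// Because Update diffs against the walker's current state, it can be called
// again mid-walk. A hypothetical sequence (g1 and g2 are caller-built
// *AcyclicGraph values):
//
//	w.Update(g1)      // start walking g1's vertices
//	w.Update(g2)      // diff vs. g1: added vertices start, removed ones are cancelled
//	diags := w.Wait() // waits for everything the walker currently knows about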
317 | ||
318 | // edgeParts returns the waiter and the dependency, in that order. | |
319 | // The waiter is waiting on the dependency. | |
320 | func (w *Walker) edgeParts(e Edge) (Vertex, Vertex) { | |
321 | if w.Reverse { | |
322 | return e.Source(), e.Target() | |
323 | } | |
324 | ||
325 | return e.Target(), e.Source() | |
326 | } | |
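
// For example (hypothetical vertices a and b joined by the edge a -> b):
//
//	e := BasicEdge(a, b)
//	waiter, dep := w.edgeParts(e)
//	// Reverse == false: waiter == b, dep == a (target waits on source)
//	// Reverse == true:  waiter == a, dep == b (source waits on target)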
327 | ||
328 | // walkVertex walks a single vertex, waiting for any dependencies before | |
329 | // executing the callback. | |
330 | func (w *Walker) walkVertex(v Vertex, info *walkerVertex) { | |
331 | // When we're done executing, lower the waitgroup count | |
332 | defer w.wait.Done() | |
333 | ||
334 | // When we're done, always close our done channel | |
335 | defer close(info.DoneCh) | |
336 | ||
337 | // Wait for our dependencies. We create a [closed] deps channel so | |
338 | // that we can immediately fall through to load our actual DepsCh. | |
339 | var depsSuccess bool | |
340 | var depsUpdateCh chan struct{} | |
341 | depsCh := make(chan bool, 1) | |
342 | depsCh <- true | |
343 | close(depsCh) | |
344 | for { | |
345 | select { | |
346 | case <-info.CancelCh: | |
347 | // Cancel | |
348 | return | |
349 | ||
350 | case depsSuccess = <-depsCh: | |
351 | // Deps complete! Mark as nil to trigger completion handling. | |
352 | depsCh = nil | |
353 | ||
354 | case <-depsUpdateCh: | |
355 | // New deps, reloop | |
356 | } | |
357 | ||
358 | // Check if we have updated dependencies. This can happen if the | |
359 | // dependencies were satisfied exactly prior to an Update occurring. | |
360 | // In that case, we'd like to take into account new dependencies | |
361 | // if possible. | |
362 | info.DepsLock.Lock() | |
363 | if info.DepsCh != nil { | |
364 | depsCh = info.DepsCh | |
365 | info.DepsCh = nil | |
366 | } | |
367 | if info.DepsUpdateCh != nil { | |
368 | depsUpdateCh = info.DepsUpdateCh | |
369 | } | |
370 | info.DepsLock.Unlock() | |
371 | ||
372 | // If we still have no deps channel set, then we're done! | |
373 | if depsCh == nil { | |
374 | break | |
375 | } | |
376 | } | |
377 | ||
378 | // If we passed dependencies, we just want to check once more that | |
379 | // we're not cancelled, since this can happen just as dependencies pass. | |
380 | select { | |
381 | case <-info.CancelCh: | |
382 | // Cancelled during an update while dependencies completed. | |
383 | return | |
384 | default: | |
385 | } | |
386 | ||
387 | // Run our callback or note that our upstream failed | |
388 | var diags tfdiags.Diagnostics | |
389 | var upstreamFailed bool | |
390 | if depsSuccess { | |
391 | log.Printf("[TRACE] dag/walk: visiting %q", VertexName(v)) | |
392 | diags = w.Callback(v) | |
393 | } else { | |
394 | log.Printf("[TRACE] dag/walk: upstream of %q errored, so skipping", VertexName(v)) | |
395 | // This won't be displayed to the user because we'll set upstreamFailed, | |
396 | // but we need to ensure there's at least one error in here so that | |
397 | // the failures will cascade downstream. | |
398 | diags = diags.Append(errors.New("upstream dependencies failed")) | |
399 | upstreamFailed = true | |
400 | } | |
401 | ||
402 | // Record the result (we must do this after execution because we mustn't | |
403 | // hold diagsLock while visiting a vertex.) | |
404 | w.diagsLock.Lock() | |
405 | if w.diagsMap == nil { | |
406 | w.diagsMap = make(map[Vertex]tfdiags.Diagnostics) | |
407 | } | |
408 | w.diagsMap[v] = diags | |
409 | if w.upstreamFailed == nil { | |
410 | w.upstreamFailed = make(map[Vertex]struct{}) | |
411 | } | |
412 | if upstreamFailed { | |
413 | w.upstreamFailed[v] = struct{}{} | |
414 | } | |
415 | w.diagsLock.Unlock() | |
416 | } | |
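
// The pre-filled, closed depsCh above relies on standard channel semantics:
// a buffered value is drained first, and only then do receives report the
// zero value. A standalone sketch:
//
//	ch := make(chan bool, 1)
//	ch <- true
//	close(ch)
//	v, ok := <-ch // v == true, ok == true: the buffered value
//	v, ok = <-ch  // v == false, ok == false: closed and empty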
417 | ||
418 | func (w *Walker) waitDeps( | |
419 | v Vertex, | |
420 | deps map[Vertex]<-chan struct{}, | |
421 | doneCh chan<- bool, | |
422 | cancelCh <-chan struct{}) { | |
423 | ||
424 | // For each dependency given to us, wait for it to complete | |
425 | for dep, depCh := range deps { | |
426 | DepSatisfied: | |
427 | for { | |
428 | select { | |
429 | case <-depCh: | |
430 | // Dependency satisfied! | |
431 | break DepSatisfied | |
432 | ||
433 | case <-cancelCh: | |
434 | // Wait cancelled. Note that we didn't satisfy dependencies | |
435 | // so that anything waiting on us also doesn't run. | |
436 | doneCh <- false | |
437 | return | |
438 | ||
439 | case <-time.After(time.Second * 5): | |
440 | log.Printf("[TRACE] dag/walk: vertex %q is waiting for %q", | |
441 | VertexName(v), VertexName(dep)) | |
442 | } | |
443 | } | |
444 | } | |
445 | ||
446 | // Dependencies satisfied! We need to check if any errored | |
447 | w.diagsLock.Lock() | |
448 | defer w.diagsLock.Unlock() | |
449 | for dep := range deps { | |
450 | if w.diagsMap[dep].HasErrors() { | |
451 | // One of our dependencies failed, so return false | |
452 | doneCh <- false | |
453 | return | |
454 | } | |
455 | } | |
456 | ||
457 | // All dependencies satisfied and successful | |
458 | doneCh <- true | |
459 | } |
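
// Note on the contract above: waitDeps sends exactly one bool on doneCh:
// false on cancellation or if any dependency recorded error diagnostics,
// true otherwise. This matches walkVertex's expectation of a single value
// per DepsCh.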