]>
Commit | Line | Data |
---|---|---|
1 | package dag | |
2 | ||
3 | import ( | |
4 | "errors" | |
5 | "fmt" | |
6 | "log" | |
7 | "sync" | |
8 | "time" | |
9 | ||
10 | "github.com/hashicorp/go-multierror" | |
11 | ) | |
12 | ||
13 | // Walker is used to walk every vertex of a graph in parallel. | |
14 | // | |
15 | // A vertex will only be walked when the dependencies of that vertex have | |
16 | // been walked. If two vertices can be walked at the same time, they will be. | |
17 | // | |
18 | // Update can be called to update the graph. This can be called even during | |
19 | // a walk, cahnging vertices/edges mid-walk. This should be done carefully. | |
20 | // If a vertex is removed but has already been executed, the result of that | |
21 | // execution (any error) is still returned by Wait. Changing or re-adding | |
22 | // a vertex that has already executed has no effect. Changing edges of | |
23 | // a vertex that has already executed has no effect. | |
24 | // | |
25 | // Non-parallelism can be enforced by introducing a lock in your callback | |
26 | // function. However, the goroutine overhead of a walk will remain. | |
27 | // Walker will create V*2 goroutines (one for each vertex, and dependency | |
28 | // waiter for each vertex). In general this should be of no concern unless | |
29 | // there are a huge number of vertices. | |
30 | // | |
31 | // The walk is depth first by default. This can be changed with the Reverse | |
32 | // option. | |
33 | // | |
34 | // A single walker is only valid for one graph walk. After the walk is complete | |
35 | // you must construct a new walker to walk again. State for the walk is never | |
36 | // deleted in case vertices or edges are changed. | |
37 | type Walker struct { | |
38 | // Callback is what is called for each vertex | |
39 | Callback WalkFunc | |
40 | ||
41 | // Reverse, if true, causes the source of an edge to depend on a target. | |
42 | // When false (default), the target depends on the source. | |
43 | Reverse bool | |
44 | ||
45 | // changeLock must be held to modify any of the fields below. Only Update | |
46 | // should modify these fields. Modifying them outside of Update can cause | |
47 | // serious problems. | |
48 | changeLock sync.Mutex | |
49 | vertices Set | |
50 | edges Set | |
51 | vertexMap map[Vertex]*walkerVertex | |
52 | ||
53 | // wait is done when all vertices have executed. It may become "undone" | |
54 | // if new vertices are added. | |
55 | wait sync.WaitGroup | |
56 | ||
57 | // errMap contains the errors recorded so far for execution. Reading | |
58 | // and writing should hold errLock. | |
59 | errMap map[Vertex]error | |
60 | errLock sync.Mutex | |
61 | } | |
62 | ||
63 | type walkerVertex struct { | |
64 | // These should only be set once on initialization and never written again. | |
65 | // They are not protected by a lock since they don't need to be since | |
66 | // they are write-once. | |
67 | ||
68 | // DoneCh is closed when this vertex has completed execution, regardless | |
69 | // of success. | |
70 | // | |
71 | // CancelCh is closed when the vertex should cancel execution. If execution | |
72 | // is already complete (DoneCh is closed), this has no effect. Otherwise, | |
73 | // execution is cancelled as quickly as possible. | |
74 | DoneCh chan struct{} | |
75 | CancelCh chan struct{} | |
76 | ||
77 | // Dependency information. Any changes to any of these fields requires | |
78 | // holding DepsLock. | |
79 | // | |
80 | // DepsCh is sent a single value that denotes whether the upstream deps | |
81 | // were successful (no errors). Any value sent means that the upstream | |
82 | // dependencies are complete. No other values will ever be sent again. | |
83 | // | |
84 | // DepsUpdateCh is closed when there is a new DepsCh set. | |
85 | DepsCh chan bool | |
86 | DepsUpdateCh chan struct{} | |
87 | DepsLock sync.Mutex | |
88 | ||
89 | // Below is not safe to read/write in parallel. This behavior is | |
90 | // enforced by changes only happening in Update. Nothing else should | |
91 | // ever modify these. | |
92 | deps map[Vertex]chan struct{} | |
93 | depsCancelCh chan struct{} | |
94 | } | |
95 | ||
var (
	// errWalkUpstream is recorded in the walker's error map for a vertex that
	// was skipped because at least one of its dependencies failed. Because it
	// is non-nil, the skip cascades to that vertex's own dependents. Wait
	// leaves it out of the returned error, so only root-cause errors are
	// reported to the user.
	errWalkUpstream = errors.New("upstream dependency failed")
)
100 | ||
101 | // Wait waits for the completion of the walk and returns any errors ( | |
102 | // in the form of a multierror) that occurred. Update should be called | |
103 | // to populate the walk with vertices and edges prior to calling this. | |
104 | // | |
105 | // Wait will return as soon as all currently known vertices are complete. | |
106 | // If you plan on calling Update with more vertices in the future, you | |
107 | // should not call Wait until after this is done. | |
108 | func (w *Walker) Wait() error { | |
109 | // Wait for completion | |
110 | w.wait.Wait() | |
111 | ||
112 | // Grab the error lock | |
113 | w.errLock.Lock() | |
114 | defer w.errLock.Unlock() | |
115 | ||
116 | // Build the error | |
117 | var result error | |
118 | for v, err := range w.errMap { | |
119 | if err != nil && err != errWalkUpstream { | |
120 | result = multierror.Append(result, fmt.Errorf( | |
121 | "%s: %s", VertexName(v), err)) | |
122 | } | |
123 | } | |
124 | ||
125 | return result | |
126 | } | |
127 | ||
128 | // Update updates the currently executing walk with the given graph. | |
129 | // This will perform a diff of the vertices and edges and update the walker. | |
130 | // Already completed vertices remain completed (including any errors during | |
131 | // their execution). | |
132 | // | |
133 | // This returns immediately once the walker is updated; it does not wait | |
134 | // for completion of the walk. | |
135 | // | |
136 | // Multiple Updates can be called in parallel. Update can be called at any | |
137 | // time during a walk. | |
138 | func (w *Walker) Update(g *AcyclicGraph) { | |
139 | var v, e *Set | |
140 | if g != nil { | |
141 | v, e = g.vertices, g.edges | |
142 | } | |
143 | ||
144 | // Grab the change lock so no more updates happen but also so that | |
145 | // no new vertices are executed during this time since we may be | |
146 | // removing them. | |
147 | w.changeLock.Lock() | |
148 | defer w.changeLock.Unlock() | |
149 | ||
150 | // Initialize fields | |
151 | if w.vertexMap == nil { | |
152 | w.vertexMap = make(map[Vertex]*walkerVertex) | |
153 | } | |
154 | ||
155 | // Calculate all our sets | |
156 | newEdges := e.Difference(&w.edges) | |
157 | oldEdges := w.edges.Difference(e) | |
158 | newVerts := v.Difference(&w.vertices) | |
159 | oldVerts := w.vertices.Difference(v) | |
160 | ||
161 | // Add the new vertices | |
162 | for _, raw := range newVerts.List() { | |
163 | v := raw.(Vertex) | |
164 | ||
165 | // Add to the waitgroup so our walk is not done until everything finishes | |
166 | w.wait.Add(1) | |
167 | ||
168 | // Add to our own set so we know about it already | |
169 | log.Printf("[DEBUG] dag/walk: added new vertex: %q", VertexName(v)) | |
170 | w.vertices.Add(raw) | |
171 | ||
172 | // Initialize the vertex info | |
173 | info := &walkerVertex{ | |
174 | DoneCh: make(chan struct{}), | |
175 | CancelCh: make(chan struct{}), | |
176 | deps: make(map[Vertex]chan struct{}), | |
177 | } | |
178 | ||
179 | // Add it to the map and kick off the walk | |
180 | w.vertexMap[v] = info | |
181 | } | |
182 | ||
183 | // Remove the old vertices | |
184 | for _, raw := range oldVerts.List() { | |
185 | v := raw.(Vertex) | |
186 | ||
187 | // Get the vertex info so we can cancel it | |
188 | info, ok := w.vertexMap[v] | |
189 | if !ok { | |
190 | // This vertex for some reason was never in our map. This | |
191 | // shouldn't be possible. | |
192 | continue | |
193 | } | |
194 | ||
195 | // Cancel the vertex | |
196 | close(info.CancelCh) | |
197 | ||
198 | // Delete it out of the map | |
199 | delete(w.vertexMap, v) | |
200 | ||
201 | log.Printf("[DEBUG] dag/walk: removed vertex: %q", VertexName(v)) | |
202 | w.vertices.Delete(raw) | |
203 | } | |
204 | ||
205 | // Add the new edges | |
206 | var changedDeps Set | |
207 | for _, raw := range newEdges.List() { | |
208 | edge := raw.(Edge) | |
209 | waiter, dep := w.edgeParts(edge) | |
210 | ||
211 | // Get the info for the waiter | |
212 | waiterInfo, ok := w.vertexMap[waiter] | |
213 | if !ok { | |
214 | // Vertex doesn't exist... shouldn't be possible but ignore. | |
215 | continue | |
216 | } | |
217 | ||
218 | // Get the info for the dep | |
219 | depInfo, ok := w.vertexMap[dep] | |
220 | if !ok { | |
221 | // Vertex doesn't exist... shouldn't be possible but ignore. | |
222 | continue | |
223 | } | |
224 | ||
225 | // Add the dependency to our waiter | |
226 | waiterInfo.deps[dep] = depInfo.DoneCh | |
227 | ||
228 | // Record that the deps changed for this waiter | |
229 | changedDeps.Add(waiter) | |
230 | ||
231 | log.Printf( | |
232 | "[DEBUG] dag/walk: added edge: %q waiting on %q", | |
233 | VertexName(waiter), VertexName(dep)) | |
234 | w.edges.Add(raw) | |
235 | } | |
236 | ||
237 | // Process reoved edges | |
238 | for _, raw := range oldEdges.List() { | |
239 | edge := raw.(Edge) | |
240 | waiter, dep := w.edgeParts(edge) | |
241 | ||
242 | // Get the info for the waiter | |
243 | waiterInfo, ok := w.vertexMap[waiter] | |
244 | if !ok { | |
245 | // Vertex doesn't exist... shouldn't be possible but ignore. | |
246 | continue | |
247 | } | |
248 | ||
249 | // Delete the dependency from the waiter | |
250 | delete(waiterInfo.deps, dep) | |
251 | ||
252 | // Record that the deps changed for this waiter | |
253 | changedDeps.Add(waiter) | |
254 | ||
255 | log.Printf( | |
256 | "[DEBUG] dag/walk: removed edge: %q waiting on %q", | |
257 | VertexName(waiter), VertexName(dep)) | |
258 | w.edges.Delete(raw) | |
259 | } | |
260 | ||
261 | // For each vertex with changed dependencies, we need to kick off | |
262 | // a new waiter and notify the vertex of the changes. | |
263 | for _, raw := range changedDeps.List() { | |
264 | v := raw.(Vertex) | |
265 | info, ok := w.vertexMap[v] | |
266 | if !ok { | |
267 | // Vertex doesn't exist... shouldn't be possible but ignore. | |
268 | continue | |
269 | } | |
270 | ||
271 | // Create a new done channel | |
272 | doneCh := make(chan bool, 1) | |
273 | ||
274 | // Create the channel we close for cancellation | |
275 | cancelCh := make(chan struct{}) | |
276 | ||
277 | // Build a new deps copy | |
278 | deps := make(map[Vertex]<-chan struct{}) | |
279 | for k, v := range info.deps { | |
280 | deps[k] = v | |
281 | } | |
282 | ||
283 | // Update the update channel | |
284 | info.DepsLock.Lock() | |
285 | if info.DepsUpdateCh != nil { | |
286 | close(info.DepsUpdateCh) | |
287 | } | |
288 | info.DepsCh = doneCh | |
289 | info.DepsUpdateCh = make(chan struct{}) | |
290 | info.DepsLock.Unlock() | |
291 | ||
292 | // Cancel the older waiter | |
293 | if info.depsCancelCh != nil { | |
294 | close(info.depsCancelCh) | |
295 | } | |
296 | info.depsCancelCh = cancelCh | |
297 | ||
298 | log.Printf( | |
299 | "[DEBUG] dag/walk: dependencies changed for %q, sending new deps", | |
300 | VertexName(v)) | |
301 | ||
302 | // Start the waiter | |
303 | go w.waitDeps(v, deps, doneCh, cancelCh) | |
304 | } | |
305 | ||
306 | // Start all the new vertices. We do this at the end so that all | |
307 | // the edge waiters and changes are setup above. | |
308 | for _, raw := range newVerts.List() { | |
309 | v := raw.(Vertex) | |
310 | go w.walkVertex(v, w.vertexMap[v]) | |
311 | } | |
312 | } | |
313 | ||
314 | // edgeParts returns the waiter and the dependency, in that order. | |
315 | // The waiter is waiting on the dependency. | |
316 | func (w *Walker) edgeParts(e Edge) (Vertex, Vertex) { | |
317 | if w.Reverse { | |
318 | return e.Source(), e.Target() | |
319 | } | |
320 | ||
321 | return e.Target(), e.Source() | |
322 | } | |
323 | ||
324 | // walkVertex walks a single vertex, waiting for any dependencies before | |
325 | // executing the callback. | |
326 | func (w *Walker) walkVertex(v Vertex, info *walkerVertex) { | |
327 | // When we're done executing, lower the waitgroup count | |
328 | defer w.wait.Done() | |
329 | ||
330 | // When we're done, always close our done channel | |
331 | defer close(info.DoneCh) | |
332 | ||
333 | // Wait for our dependencies. We create a [closed] deps channel so | |
334 | // that we can immediately fall through to load our actual DepsCh. | |
335 | var depsSuccess bool | |
336 | var depsUpdateCh chan struct{} | |
337 | depsCh := make(chan bool, 1) | |
338 | depsCh <- true | |
339 | close(depsCh) | |
340 | for { | |
341 | select { | |
342 | case <-info.CancelCh: | |
343 | // Cancel | |
344 | return | |
345 | ||
346 | case depsSuccess = <-depsCh: | |
347 | // Deps complete! Mark as nil to trigger completion handling. | |
348 | depsCh = nil | |
349 | ||
350 | case <-depsUpdateCh: | |
351 | // New deps, reloop | |
352 | } | |
353 | ||
354 | // Check if we have updated dependencies. This can happen if the | |
355 | // dependencies were satisfied exactly prior to an Update occurring. | |
356 | // In that case, we'd like to take into account new dependencies | |
357 | // if possible. | |
358 | info.DepsLock.Lock() | |
359 | if info.DepsCh != nil { | |
360 | depsCh = info.DepsCh | |
361 | info.DepsCh = nil | |
362 | } | |
363 | if info.DepsUpdateCh != nil { | |
364 | depsUpdateCh = info.DepsUpdateCh | |
365 | } | |
366 | info.DepsLock.Unlock() | |
367 | ||
368 | // If we still have no deps channel set, then we're done! | |
369 | if depsCh == nil { | |
370 | break | |
371 | } | |
372 | } | |
373 | ||
374 | // If we passed dependencies, we just want to check once more that | |
375 | // we're not cancelled, since this can happen just as dependencies pass. | |
376 | select { | |
377 | case <-info.CancelCh: | |
378 | // Cancelled during an update while dependencies completed. | |
379 | return | |
380 | default: | |
381 | } | |
382 | ||
383 | // Run our callback or note that our upstream failed | |
384 | var err error | |
385 | if depsSuccess { | |
386 | log.Printf("[DEBUG] dag/walk: walking %q", VertexName(v)) | |
387 | err = w.Callback(v) | |
388 | } else { | |
389 | log.Printf("[DEBUG] dag/walk: upstream errored, not walking %q", VertexName(v)) | |
390 | err = errWalkUpstream | |
391 | } | |
392 | ||
393 | // Record the error | |
394 | if err != nil { | |
395 | w.errLock.Lock() | |
396 | defer w.errLock.Unlock() | |
397 | ||
398 | if w.errMap == nil { | |
399 | w.errMap = make(map[Vertex]error) | |
400 | } | |
401 | w.errMap[v] = err | |
402 | } | |
403 | } | |
404 | ||
405 | func (w *Walker) waitDeps( | |
406 | v Vertex, | |
407 | deps map[Vertex]<-chan struct{}, | |
408 | doneCh chan<- bool, | |
409 | cancelCh <-chan struct{}) { | |
410 | // For each dependency given to us, wait for it to complete | |
411 | for dep, depCh := range deps { | |
412 | DepSatisfied: | |
413 | for { | |
414 | select { | |
415 | case <-depCh: | |
416 | // Dependency satisfied! | |
417 | break DepSatisfied | |
418 | ||
419 | case <-cancelCh: | |
420 | // Wait cancelled. Note that we didn't satisfy dependencies | |
421 | // so that anything waiting on us also doesn't run. | |
422 | doneCh <- false | |
423 | return | |
424 | ||
425 | case <-time.After(time.Second * 5): | |
426 | log.Printf("[DEBUG] dag/walk: vertex %q, waiting for: %q", | |
427 | VertexName(v), VertexName(dep)) | |
428 | } | |
429 | } | |
430 | } | |
431 | ||
432 | // Dependencies satisfied! We need to check if any errored | |
433 | w.errLock.Lock() | |
434 | defer w.errLock.Unlock() | |
435 | for dep, _ := range deps { | |
436 | if w.errMap[dep] != nil { | |
437 | // One of our dependencies failed, so return false | |
438 | doneCh <- false | |
439 | return | |
440 | } | |
441 | } | |
442 | ||
443 | // All dependencies satisfied and successful | |
444 | doneCh <- true | |
445 | } |