From bae9f6d2fd5eb5bc80929bd393932b23f14d7c93 Mon Sep 17 00:00:00 2001 From: Jake Champlin Date: Tue, 6 Jun 2017 12:40:07 -0400 Subject: Initial transfer of provider code --- vendor/github.com/hashicorp/terraform/dag/walk.go | 445 ++++++++++++++++++++++ 1 file changed, 445 insertions(+) create mode 100644 vendor/github.com/hashicorp/terraform/dag/walk.go (limited to 'vendor/github.com/hashicorp/terraform/dag/walk.go') diff --git a/vendor/github.com/hashicorp/terraform/dag/walk.go b/vendor/github.com/hashicorp/terraform/dag/walk.go new file mode 100644 index 0000000..23c87ad --- /dev/null +++ b/vendor/github.com/hashicorp/terraform/dag/walk.go @@ -0,0 +1,445 @@ +package dag + +import ( + "errors" + "fmt" + "log" + "sync" + "time" + + "github.com/hashicorp/go-multierror" +) + +// Walker is used to walk every vertex of a graph in parallel. +// +// A vertex will only be walked when the dependencies of that vertex have +// been walked. If two vertices can be walked at the same time, they will be. +// +// Update can be called to update the graph. This can be called even during +// a walk, cahnging vertices/edges mid-walk. This should be done carefully. +// If a vertex is removed but has already been executed, the result of that +// execution (any error) is still returned by Wait. Changing or re-adding +// a vertex that has already executed has no effect. Changing edges of +// a vertex that has already executed has no effect. +// +// Non-parallelism can be enforced by introducing a lock in your callback +// function. However, the goroutine overhead of a walk will remain. +// Walker will create V*2 goroutines (one for each vertex, and dependency +// waiter for each vertex). In general this should be of no concern unless +// there are a huge number of vertices. +// +// The walk is depth first by default. This can be changed with the Reverse +// option. +// +// A single walker is only valid for one graph walk. After the walk is complete +// you must construct a new walker to walk again. State for the walk is never +// deleted in case vertices or edges are changed. +type Walker struct { + // Callback is what is called for each vertex + Callback WalkFunc + + // Reverse, if true, causes the source of an edge to depend on a target. + // When false (default), the target depends on the source. + Reverse bool + + // changeLock must be held to modify any of the fields below. Only Update + // should modify these fields. Modifying them outside of Update can cause + // serious problems. + changeLock sync.Mutex + vertices Set + edges Set + vertexMap map[Vertex]*walkerVertex + + // wait is done when all vertices have executed. It may become "undone" + // if new vertices are added. + wait sync.WaitGroup + + // errMap contains the errors recorded so far for execution. Reading + // and writing should hold errLock. + errMap map[Vertex]error + errLock sync.Mutex +} + +type walkerVertex struct { + // These should only be set once on initialization and never written again. + // They are not protected by a lock since they don't need to be since + // they are write-once. + + // DoneCh is closed when this vertex has completed execution, regardless + // of success. + // + // CancelCh is closed when the vertex should cancel execution. If execution + // is already complete (DoneCh is closed), this has no effect. Otherwise, + // execution is cancelled as quickly as possible. + DoneCh chan struct{} + CancelCh chan struct{} + + // Dependency information. Any changes to any of these fields requires + // holding DepsLock. + // + // DepsCh is sent a single value that denotes whether the upstream deps + // were successful (no errors). Any value sent means that the upstream + // dependencies are complete. No other values will ever be sent again. + // + // DepsUpdateCh is closed when there is a new DepsCh set. + DepsCh chan bool + DepsUpdateCh chan struct{} + DepsLock sync.Mutex + + // Below is not safe to read/write in parallel. This behavior is + // enforced by changes only happening in Update. Nothing else should + // ever modify these. + deps map[Vertex]chan struct{} + depsCancelCh chan struct{} +} + +// errWalkUpstream is used in the errMap of a walk to note that an upstream +// dependency failed so this vertex wasn't run. This is not shown in the final +// user-returned error. +var errWalkUpstream = errors.New("upstream dependency failed") + +// Wait waits for the completion of the walk and returns any errors ( +// in the form of a multierror) that occurred. Update should be called +// to populate the walk with vertices and edges prior to calling this. +// +// Wait will return as soon as all currently known vertices are complete. +// If you plan on calling Update with more vertices in the future, you +// should not call Wait until after this is done. +func (w *Walker) Wait() error { + // Wait for completion + w.wait.Wait() + + // Grab the error lock + w.errLock.Lock() + defer w.errLock.Unlock() + + // Build the error + var result error + for v, err := range w.errMap { + if err != nil && err != errWalkUpstream { + result = multierror.Append(result, fmt.Errorf( + "%s: %s", VertexName(v), err)) + } + } + + return result +} + +// Update updates the currently executing walk with the given graph. +// This will perform a diff of the vertices and edges and update the walker. +// Already completed vertices remain completed (including any errors during +// their execution). +// +// This returns immediately once the walker is updated; it does not wait +// for completion of the walk. +// +// Multiple Updates can be called in parallel. Update can be called at any +// time during a walk. +func (w *Walker) Update(g *AcyclicGraph) { + var v, e *Set + if g != nil { + v, e = g.vertices, g.edges + } + + // Grab the change lock so no more updates happen but also so that + // no new vertices are executed during this time since we may be + // removing them. + w.changeLock.Lock() + defer w.changeLock.Unlock() + + // Initialize fields + if w.vertexMap == nil { + w.vertexMap = make(map[Vertex]*walkerVertex) + } + + // Calculate all our sets + newEdges := e.Difference(&w.edges) + oldEdges := w.edges.Difference(e) + newVerts := v.Difference(&w.vertices) + oldVerts := w.vertices.Difference(v) + + // Add the new vertices + for _, raw := range newVerts.List() { + v := raw.(Vertex) + + // Add to the waitgroup so our walk is not done until everything finishes + w.wait.Add(1) + + // Add to our own set so we know about it already + log.Printf("[DEBUG] dag/walk: added new vertex: %q", VertexName(v)) + w.vertices.Add(raw) + + // Initialize the vertex info + info := &walkerVertex{ + DoneCh: make(chan struct{}), + CancelCh: make(chan struct{}), + deps: make(map[Vertex]chan struct{}), + } + + // Add it to the map and kick off the walk + w.vertexMap[v] = info + } + + // Remove the old vertices + for _, raw := range oldVerts.List() { + v := raw.(Vertex) + + // Get the vertex info so we can cancel it + info, ok := w.vertexMap[v] + if !ok { + // This vertex for some reason was never in our map. This + // shouldn't be possible. + continue + } + + // Cancel the vertex + close(info.CancelCh) + + // Delete it out of the map + delete(w.vertexMap, v) + + log.Printf("[DEBUG] dag/walk: removed vertex: %q", VertexName(v)) + w.vertices.Delete(raw) + } + + // Add the new edges + var changedDeps Set + for _, raw := range newEdges.List() { + edge := raw.(Edge) + waiter, dep := w.edgeParts(edge) + + // Get the info for the waiter + waiterInfo, ok := w.vertexMap[waiter] + if !ok { + // Vertex doesn't exist... shouldn't be possible but ignore. + continue + } + + // Get the info for the dep + depInfo, ok := w.vertexMap[dep] + if !ok { + // Vertex doesn't exist... shouldn't be possible but ignore. + continue + } + + // Add the dependency to our waiter + waiterInfo.deps[dep] = depInfo.DoneCh + + // Record that the deps changed for this waiter + changedDeps.Add(waiter) + + log.Printf( + "[DEBUG] dag/walk: added edge: %q waiting on %q", + VertexName(waiter), VertexName(dep)) + w.edges.Add(raw) + } + + // Process reoved edges + for _, raw := range oldEdges.List() { + edge := raw.(Edge) + waiter, dep := w.edgeParts(edge) + + // Get the info for the waiter + waiterInfo, ok := w.vertexMap[waiter] + if !ok { + // Vertex doesn't exist... shouldn't be possible but ignore. + continue + } + + // Delete the dependency from the waiter + delete(waiterInfo.deps, dep) + + // Record that the deps changed for this waiter + changedDeps.Add(waiter) + + log.Printf( + "[DEBUG] dag/walk: removed edge: %q waiting on %q", + VertexName(waiter), VertexName(dep)) + w.edges.Delete(raw) + } + + // For each vertex with changed dependencies, we need to kick off + // a new waiter and notify the vertex of the changes. + for _, raw := range changedDeps.List() { + v := raw.(Vertex) + info, ok := w.vertexMap[v] + if !ok { + // Vertex doesn't exist... shouldn't be possible but ignore. + continue + } + + // Create a new done channel + doneCh := make(chan bool, 1) + + // Create the channel we close for cancellation + cancelCh := make(chan struct{}) + + // Build a new deps copy + deps := make(map[Vertex]<-chan struct{}) + for k, v := range info.deps { + deps[k] = v + } + + // Update the update channel + info.DepsLock.Lock() + if info.DepsUpdateCh != nil { + close(info.DepsUpdateCh) + } + info.DepsCh = doneCh + info.DepsUpdateCh = make(chan struct{}) + info.DepsLock.Unlock() + + // Cancel the older waiter + if info.depsCancelCh != nil { + close(info.depsCancelCh) + } + info.depsCancelCh = cancelCh + + log.Printf( + "[DEBUG] dag/walk: dependencies changed for %q, sending new deps", + VertexName(v)) + + // Start the waiter + go w.waitDeps(v, deps, doneCh, cancelCh) + } + + // Start all the new vertices. We do this at the end so that all + // the edge waiters and changes are setup above. + for _, raw := range newVerts.List() { + v := raw.(Vertex) + go w.walkVertex(v, w.vertexMap[v]) + } +} + +// edgeParts returns the waiter and the dependency, in that order. +// The waiter is waiting on the dependency. +func (w *Walker) edgeParts(e Edge) (Vertex, Vertex) { + if w.Reverse { + return e.Source(), e.Target() + } + + return e.Target(), e.Source() +} + +// walkVertex walks a single vertex, waiting for any dependencies before +// executing the callback. +func (w *Walker) walkVertex(v Vertex, info *walkerVertex) { + // When we're done executing, lower the waitgroup count + defer w.wait.Done() + + // When we're done, always close our done channel + defer close(info.DoneCh) + + // Wait for our dependencies. We create a [closed] deps channel so + // that we can immediately fall through to load our actual DepsCh. + var depsSuccess bool + var depsUpdateCh chan struct{} + depsCh := make(chan bool, 1) + depsCh <- true + close(depsCh) + for { + select { + case <-info.CancelCh: + // Cancel + return + + case depsSuccess = <-depsCh: + // Deps complete! Mark as nil to trigger completion handling. + depsCh = nil + + case <-depsUpdateCh: + // New deps, reloop + } + + // Check if we have updated dependencies. This can happen if the + // dependencies were satisfied exactly prior to an Update occurring. + // In that case, we'd like to take into account new dependencies + // if possible. + info.DepsLock.Lock() + if info.DepsCh != nil { + depsCh = info.DepsCh + info.DepsCh = nil + } + if info.DepsUpdateCh != nil { + depsUpdateCh = info.DepsUpdateCh + } + info.DepsLock.Unlock() + + // If we still have no deps channel set, then we're done! + if depsCh == nil { + break + } + } + + // If we passed dependencies, we just want to check once more that + // we're not cancelled, since this can happen just as dependencies pass. + select { + case <-info.CancelCh: + // Cancelled during an update while dependencies completed. + return + default: + } + + // Run our callback or note that our upstream failed + var err error + if depsSuccess { + log.Printf("[DEBUG] dag/walk: walking %q", VertexName(v)) + err = w.Callback(v) + } else { + log.Printf("[DEBUG] dag/walk: upstream errored, not walking %q", VertexName(v)) + err = errWalkUpstream + } + + // Record the error + if err != nil { + w.errLock.Lock() + defer w.errLock.Unlock() + + if w.errMap == nil { + w.errMap = make(map[Vertex]error) + } + w.errMap[v] = err + } +} + +func (w *Walker) waitDeps( + v Vertex, + deps map[Vertex]<-chan struct{}, + doneCh chan<- bool, + cancelCh <-chan struct{}) { + // For each dependency given to us, wait for it to complete + for dep, depCh := range deps { + DepSatisfied: + for { + select { + case <-depCh: + // Dependency satisfied! + break DepSatisfied + + case <-cancelCh: + // Wait cancelled. Note that we didn't satisfy dependencies + // so that anything waiting on us also doesn't run. + doneCh <- false + return + + case <-time.After(time.Second * 5): + log.Printf("[DEBUG] dag/walk: vertex %q, waiting for: %q", + VertexName(v), VertexName(dep)) + } + } + } + + // Dependencies satisfied! We need to check if any errored + w.errLock.Lock() + defer w.errLock.Unlock() + for dep, _ := range deps { + if w.errMap[dep] != nil { + // One of our dependencies failed, so return false + doneCh <- false + return + } + } + + // All dependencies satisfied and successful + doneCh <- true +} -- cgit v1.2.3