]>
Commit | Line | Data |
---|---|---|
bae9f6d2 JC |
1 | package resource |
2 | ||
3 | import ( | |
4 | "log" | |
5 | "time" | |
6 | ) | |
7 | ||
// refreshGracePeriod is how long WaitForState keeps listening for a final
// result from the refresh loop after the configured Timeout has elapsed.
// It is a variable rather than a constant so it can be reassigned.
var refreshGracePeriod = 30 * time.Second
9 | ||
// StateRefreshFunc is the callback type StateChangeConf uses to re-read the
// item whose state change is being awaited.
//
// Each call yields three values:
//
//   - `result` is an arbitrary object; the value from the final refresh is
//     what the wait ultimately hands back to the caller (for example, an EC2
//     instance as it looked after the last refresh).
//   - `state` is the most recently observed state of that object.
//   - `err` is any error encountered while refreshing the state.
type StateRefreshFunc func() (result interface{}, state string, err error)
21 | ||
22 | // StateChangeConf is the configuration struct used for `WaitForState`. | |
23 | type StateChangeConf struct { | |
24 | Delay time.Duration // Wait this time before starting checks | |
25 | Pending []string // States that are "allowed" and will continue trying | |
26 | Refresh StateRefreshFunc // Refreshes the current state | |
27 | Target []string // Target state | |
28 | Timeout time.Duration // The amount of time to wait before timeout | |
29 | MinTimeout time.Duration // Smallest time to wait before refreshes | |
30 | PollInterval time.Duration // Override MinTimeout/backoff and only poll this often | |
31 | NotFoundChecks int // Number of times to allow not found | |
32 | ||
33 | // This is to work around inconsistent APIs | |
34 | ContinuousTargetOccurence int // Number of times the Target state has to occur continuously | |
35 | } | |
36 | ||
37 | // WaitForState watches an object and waits for it to achieve the state | |
38 | // specified in the configuration using the specified Refresh() func, | |
39 | // waiting the number of seconds specified in the timeout configuration. | |
40 | // | |
41 | // If the Refresh function returns a error, exit immediately with that error. | |
42 | // | |
43 | // If the Refresh function returns a state other than the Target state or one | |
44 | // listed in Pending, return immediately with an error. | |
45 | // | |
46 | // If the Timeout is exceeded before reaching the Target state, return an | |
47 | // error. | |
48 | // | |
49 | // Otherwise, result the result of the first call to the Refresh function to | |
50 | // reach the target state. | |
51 | func (conf *StateChangeConf) WaitForState() (interface{}, error) { | |
52 | log.Printf("[DEBUG] Waiting for state to become: %s", conf.Target) | |
53 | ||
54 | notfoundTick := 0 | |
55 | targetOccurence := 0 | |
56 | ||
57 | // Set a default for times to check for not found | |
58 | if conf.NotFoundChecks == 0 { | |
59 | conf.NotFoundChecks = 20 | |
60 | } | |
61 | ||
62 | if conf.ContinuousTargetOccurence == 0 { | |
63 | conf.ContinuousTargetOccurence = 1 | |
64 | } | |
65 | ||
66 | type Result struct { | |
67 | Result interface{} | |
68 | State string | |
69 | Error error | |
70 | Done bool | |
71 | } | |
72 | ||
73 | // Read every result from the refresh loop, waiting for a positive result.Done. | |
74 | resCh := make(chan Result, 1) | |
75 | // cancellation channel for the refresh loop | |
76 | cancelCh := make(chan struct{}) | |
77 | ||
78 | result := Result{} | |
79 | ||
80 | go func() { | |
81 | defer close(resCh) | |
82 | ||
83 | time.Sleep(conf.Delay) | |
84 | ||
85 | // start with 0 delay for the first loop | |
86 | var wait time.Duration | |
87 | ||
88 | for { | |
89 | // store the last result | |
90 | resCh <- result | |
91 | ||
92 | // wait and watch for cancellation | |
93 | select { | |
94 | case <-cancelCh: | |
95 | return | |
96 | case <-time.After(wait): | |
97 | // first round had no wait | |
98 | if wait == 0 { | |
99 | wait = 100 * time.Millisecond | |
100 | } | |
101 | } | |
102 | ||
103 | res, currentState, err := conf.Refresh() | |
104 | result = Result{ | |
105 | Result: res, | |
106 | State: currentState, | |
107 | Error: err, | |
108 | } | |
109 | ||
110 | if err != nil { | |
111 | resCh <- result | |
112 | return | |
113 | } | |
114 | ||
115 | // If we're waiting for the absence of a thing, then return | |
116 | if res == nil && len(conf.Target) == 0 { | |
117 | targetOccurence++ | |
118 | if conf.ContinuousTargetOccurence == targetOccurence { | |
119 | result.Done = true | |
120 | resCh <- result | |
121 | return | |
122 | } | |
123 | continue | |
124 | } | |
125 | ||
126 | if res == nil { | |
127 | // If we didn't find the resource, check if we have been | |
128 | // not finding it for awhile, and if so, report an error. | |
129 | notfoundTick++ | |
130 | if notfoundTick > conf.NotFoundChecks { | |
131 | result.Error = &NotFoundError{ | |
132 | LastError: err, | |
133 | Retries: notfoundTick, | |
134 | } | |
135 | resCh <- result | |
136 | return | |
137 | } | |
138 | } else { | |
139 | // Reset the counter for when a resource isn't found | |
140 | notfoundTick = 0 | |
141 | found := false | |
142 | ||
143 | for _, allowed := range conf.Target { | |
144 | if currentState == allowed { | |
145 | found = true | |
146 | targetOccurence++ | |
147 | if conf.ContinuousTargetOccurence == targetOccurence { | |
148 | result.Done = true | |
149 | resCh <- result | |
150 | return | |
151 | } | |
152 | continue | |
153 | } | |
154 | } | |
155 | ||
156 | for _, allowed := range conf.Pending { | |
157 | if currentState == allowed { | |
158 | found = true | |
159 | targetOccurence = 0 | |
160 | break | |
161 | } | |
162 | } | |
163 | ||
164 | if !found && len(conf.Pending) > 0 { | |
165 | result.Error = &UnexpectedStateError{ | |
166 | LastError: err, | |
167 | State: result.State, | |
168 | ExpectedState: conf.Target, | |
169 | } | |
170 | resCh <- result | |
171 | return | |
172 | } | |
173 | } | |
174 | ||
175 | // Wait between refreshes using exponential backoff, except when | |
176 | // waiting for the target state to reoccur. | |
177 | if targetOccurence == 0 { | |
178 | wait *= 2 | |
179 | } | |
180 | ||
181 | // If a poll interval has been specified, choose that interval. | |
182 | // Otherwise bound the default value. | |
183 | if conf.PollInterval > 0 && conf.PollInterval < 180*time.Second { | |
184 | wait = conf.PollInterval | |
185 | } else { | |
186 | if wait < conf.MinTimeout { | |
187 | wait = conf.MinTimeout | |
188 | } else if wait > 10*time.Second { | |
189 | wait = 10 * time.Second | |
190 | } | |
191 | } | |
192 | ||
193 | log.Printf("[TRACE] Waiting %s before next try", wait) | |
194 | } | |
195 | }() | |
196 | ||
197 | // store the last value result from the refresh loop | |
198 | lastResult := Result{} | |
199 | ||
200 | timeout := time.After(conf.Timeout) | |
201 | for { | |
202 | select { | |
203 | case r, ok := <-resCh: | |
204 | // channel closed, so return the last result | |
205 | if !ok { | |
206 | return lastResult.Result, lastResult.Error | |
207 | } | |
208 | ||
209 | // we reached the intended state | |
210 | if r.Done { | |
211 | return r.Result, r.Error | |
212 | } | |
213 | ||
214 | // still waiting, store the last result | |
215 | lastResult = r | |
216 | ||
217 | case <-timeout: | |
218 | log.Printf("[WARN] WaitForState timeout after %s", conf.Timeout) | |
219 | log.Printf("[WARN] WaitForState starting %s refresh grace period", refreshGracePeriod) | |
220 | ||
221 | // cancel the goroutine and start our grace period timer | |
222 | close(cancelCh) | |
223 | timeout := time.After(refreshGracePeriod) | |
224 | ||
225 | // we need a for loop and a label to break on, because we may have | |
226 | // an extra response value to read, but still want to wait for the | |
227 | // channel to close. | |
228 | forSelect: | |
229 | for { | |
230 | select { | |
231 | case r, ok := <-resCh: | |
232 | if r.Done { | |
233 | // the last refresh loop reached the desired state | |
234 | return r.Result, r.Error | |
235 | } | |
236 | ||
237 | if !ok { | |
238 | // the goroutine returned | |
239 | break forSelect | |
240 | } | |
241 | ||
242 | // target state not reached, save the result for the | |
243 | // TimeoutError and wait for the channel to close | |
244 | lastResult = r | |
245 | case <-timeout: | |
246 | log.Println("[ERROR] WaitForState exceeded refresh grace period") | |
247 | break forSelect | |
248 | } | |
249 | } | |
250 | ||
251 | return nil, &TimeoutError{ | |
252 | LastError: lastResult.Error, | |
253 | LastState: lastResult.State, | |
254 | Timeout: conf.Timeout, | |
255 | ExpectedState: conf.Target, | |
256 | } | |
257 | } | |
258 | } | |
259 | } |