1 // Copyright 2015 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
5 // Package timeseries implements a time series structure for stats collection.
6 package timeseries // import "golang.org/x/net/internal/timeseries"
15 timeSeriesNumBuckets = 64
16 minuteHourSeriesNumBuckets = 60
19 var timeSeriesResolutions = []time.Duration{
26 24 * time.Hour, // 1 day
27 7 * 24 * time.Hour, // 1 week
28 4 * 7 * 24 * time.Hour, // 4 weeks
29 16 * 7 * 24 * time.Hour, // 16 weeks
32 var minuteHourSeriesResolutions = []time.Duration{
37 // An Observable is a kind of data that can be aggregated in a time series.
38 type Observable interface {
39 Multiply(ratio float64) // Multiplies the data in self by a given ratio
40 Add(other Observable) // Adds the data from a different observation to self
41 Clear() // Clears the observation so it can be reused.
42 CopyFrom(other Observable) // Copies the contents of a given observation to self
45 // Float attaches the methods of Observable to a float64.
48 // NewFloat returns a Float.
49 func NewFloat() Observable {
54 // String returns the float as a string.
55 func (f *Float) String() string { return fmt.Sprintf("%g", f.Value()) }
57 // Value returns the float's value.
58 func (f *Float) Value() float64 { return float64(*f) }
60 func (f *Float) Multiply(ratio float64) { *f *= Float(ratio) }
62 func (f *Float) Add(other Observable) {
67 func (f *Float) Clear() { *f = 0 }
69 func (f *Float) CopyFrom(other Observable) {
74 // A Clock tells the current time.
75 type Clock interface {
81 var defaultClockInstance defaultClock
83 func (defaultClock) Time() time.Time { return time.Now() }
85 // Information kept per level. Each level consists of a circular list of
86 // observations. The start of the level may be derived from end and the
87 // len(buckets) * size.
89 oldest int // index to oldest bucketed Observable
90 newest int // index to newest bucketed Observable
91 end time.Time // end timestamp for this level
92 size time.Duration // duration of the bucketed Observable
93 buckets []Observable // collections of observations
94 provider func() Observable // used for creating new Observable
97 func (l *tsLevel) Clear() {
99 l.newest = len(l.buckets) - 1
101 for i := range l.buckets {
102 if l.buckets[i] != nil {
109 func (l *tsLevel) InitLevel(size time.Duration, numBuckets int, f func() Observable) {
112 l.buckets = make([]Observable, numBuckets)
115 // Keeps a sequence of levels. Each level is responsible for storing data at
116 // a given resolution. For example, the first level stores data at a one
117 // minute resolution while the second level stores data at a one hour resolution.
120 // Each level is represented by a sequence of buckets. Each bucket spans an
121 // interval equal to the resolution of the level. New observations are added
122 // to the last bucket.
123 type timeSeries struct {
124 provider func() Observable // make more Observable
125 numBuckets int // number of buckets in each level
126 levels []*tsLevel // levels of bucketed Observable
127 lastAdd time.Time // time of last Observable tracked
128 total Observable // convenient aggregation of all Observable
129 clock Clock // Clock for getting current time
130 pending Observable // observations not yet bucketed
131 pendingTime time.Time // what time are we keeping in pending
132 dirty bool // if there are pending observations
135 // init initializes the time series, creating one level per supplied resolution.
136 func (ts *timeSeries) init(resolutions []time.Duration, f func() Observable, numBuckets int, clock Clock) {
138 ts.numBuckets = numBuckets
140 ts.levels = make([]*tsLevel, len(resolutions))
142 for i := range resolutions {
143 if i > 0 && resolutions[i-1] >= resolutions[i] {
144 log.Print("timeseries: resolutions must be monotonically increasing")
147 newLevel := new(tsLevel)
148 newLevel.InitLevel(resolutions[i], ts.numBuckets, ts.provider)
149 ts.levels[i] = newLevel
155 // Clear removes all observations from the time series.
156 func (ts *timeSeries) Clear() {
157 ts.lastAdd = time.Time{}
158 ts.total = ts.resetObservation(ts.total)
159 ts.pending = ts.resetObservation(ts.pending)
160 ts.pendingTime = time.Time{}
163 for i := range ts.levels {
168 // Add records an observation at the current time.
169 func (ts *timeSeries) Add(observation Observable) {
170 ts.AddWithTime(observation, ts.clock.Time())
173 // AddWithTime records an observation at the specified time.
174 func (ts *timeSeries) AddWithTime(observation Observable, t time.Time) {
176 smallBucketDuration := ts.levels[0].size
178 if t.After(ts.lastAdd) {
182 if t.After(ts.pendingTime) {
184 ts.mergePendingUpdates()
185 ts.pendingTime = ts.levels[0].end
186 ts.pending.CopyFrom(observation)
188 } else if t.After(ts.pendingTime.Add(-1 * smallBucketDuration)) {
189 // The observation is close enough to go into the pending bucket.
190 // This compensates for clock skewing and small scheduling delays
191 // by letting the update stay in the fast path.
192 ts.pending.Add(observation)
195 ts.mergeValue(observation, t)
199 // mergeValue inserts the observation at the specified time in the past into all levels.
200 func (ts *timeSeries) mergeValue(observation Observable, t time.Time) {
201 for _, level := range ts.levels {
202 index := (ts.numBuckets - 1) - int(level.end.Sub(t)/level.size)
203 if 0 <= index && index < ts.numBuckets {
204 bucketNumber := (level.oldest + index) % ts.numBuckets
205 if level.buckets[bucketNumber] == nil {
206 level.buckets[bucketNumber] = level.provider()
208 level.buckets[bucketNumber].Add(observation)
211 ts.total.Add(observation)
214 // mergePendingUpdates applies the pending updates into all levels.
215 func (ts *timeSeries) mergePendingUpdates() {
217 ts.mergeValue(ts.pending, ts.pendingTime)
218 ts.pending = ts.resetObservation(ts.pending)
223 // advance cycles the buckets at each level until the latest bucket in
224 // each level can hold the time specified.
225 func (ts *timeSeries) advance(t time.Time) {
226 if !t.After(ts.levels[0].end) {
229 for i := 0; i < len(ts.levels); i++ {
230 level := ts.levels[i]
231 if !level.end.Before(t) {
235 // If the time is sufficiently far, just clear the level and advance directly.
237 if !t.Before(level.end.Add(level.size * time.Duration(ts.numBuckets))) {
238 for _, b := range level.buckets {
239 ts.resetObservation(b)
241 level.end = time.Unix(0, (t.UnixNano()/level.size.Nanoseconds())*level.size.Nanoseconds())
244 for t.After(level.end) {
245 level.end = level.end.Add(level.size)
246 level.newest = level.oldest
247 level.oldest = (level.oldest + 1) % ts.numBuckets
248 ts.resetObservation(level.buckets[level.newest])
255 // Latest returns the sum of the num latest buckets from the level.
256 func (ts *timeSeries) Latest(level, num int) Observable {
257 now := ts.clock.Time()
258 if ts.levels[0].end.Before(now) {
262 ts.mergePendingUpdates()
264 result := ts.provider()
265 l := ts.levels[level]
268 for i := 0; i < num; i++ {
269 if l.buckets[index] != nil {
270 result.Add(l.buckets[index])
273 index = ts.numBuckets
281 // LatestBuckets returns a copy of the num latest buckets from level.
282 func (ts *timeSeries) LatestBuckets(level, num int) []Observable {
283 if level < 0 || level > len(ts.levels) {
284 log.Print("timeseries: bad level argument: ", level)
287 if num < 0 || num >= ts.numBuckets {
288 log.Print("timeseries: bad num argument: ", num)
292 results := make([]Observable, num)
293 now := ts.clock.Time()
294 if ts.levels[0].end.Before(now) {
298 ts.mergePendingUpdates()
300 l := ts.levels[level]
303 for i := 0; i < num; i++ {
304 result := ts.provider()
306 if l.buckets[index] != nil {
307 result.CopyFrom(l.buckets[index])
311 index = ts.numBuckets
318 // ScaleBy updates observations by scaling by factor.
319 func (ts *timeSeries) ScaleBy(factor float64) {
320 for _, l := range ts.levels {
321 for i := 0; i < ts.numBuckets; i++ {
322 l.buckets[i].Multiply(factor)
326 ts.total.Multiply(factor)
327 ts.pending.Multiply(factor)
330 // Range returns the sum of observations added over the specified time range.
331 // If start or finish times don't fall on bucket boundaries of the same
332 // level, then return values are approximate answers.
333 func (ts *timeSeries) Range(start, finish time.Time) Observable {
334 return ts.ComputeRange(start, finish, 1)[0]
337 // Recent returns the sum of observations from the last delta.
338 func (ts *timeSeries) Recent(delta time.Duration) Observable {
339 now := ts.clock.Time()
340 return ts.Range(now.Add(-delta), now)
343 // Total returns the total of all observations.
344 func (ts *timeSeries) Total() Observable {
345 ts.mergePendingUpdates()
349 // ComputeRange computes a specified number of values into a slice using
350 // the observations recorded over the specified time period. The return
351 // values are approximate if the start or finish times don't fall on the
352 // bucket boundaries at the same level or if the number of buckets spanning
353 // the range is not an integral multiple of num.
354 func (ts *timeSeries) ComputeRange(start, finish time.Time, num int) []Observable {
355 if start.After(finish) {
356 log.Printf("timeseries: start > finish, %v>%v", start, finish)
361 log.Printf("timeseries: num < 0, %v", num)
365 results := make([]Observable, num)
367 for _, l := range ts.levels {
368 if !start.Before(l.end.Add(-l.size * time.Duration(ts.numBuckets))) {
369 ts.extract(l, start, finish, num, results)
374 // Failed to find a level that covers the desired range. So just
375 // extract from the last level, even if it doesn't cover the entire range.
377 ts.extract(ts.levels[len(ts.levels)-1], start, finish, num, results)
382 // RecentList returns the specified number of values in slice over the most
383 // recent time period of the specified range.
384 func (ts *timeSeries) RecentList(delta time.Duration, num int) []Observable {
388 now := ts.clock.Time()
389 return ts.ComputeRange(now.Add(-delta), now, num)
392 // extract fills results with the specified number of observations from a
393 // given level over a given range.
394 func (ts *timeSeries) extract(l *tsLevel, start, finish time.Time, num int, results []Observable) {
395 ts.mergePendingUpdates()
397 srcInterval := l.size
398 dstInterval := finish.Sub(start) / time.Duration(num)
400 srcStart := l.end.Add(-srcInterval * time.Duration(ts.numBuckets))
404 // Where should scanning start?
405 if dstStart.After(srcStart) {
406 advance := dstStart.Sub(srcStart) / srcInterval
407 srcIndex += int(advance)
408 srcStart = srcStart.Add(advance * srcInterval)
411 // The i'th value is computed as shown below.
412 // interval = (finish - start)/num
413 // i'th value = sum of observation in range
414 // [ start + i * interval,
415 // start + (i + 1) * interval )
416 for i := 0; i < num; i++ {
417 results[i] = ts.resetObservation(results[i])
418 dstEnd := dstStart.Add(dstInterval)
419 for srcIndex < ts.numBuckets && srcStart.Before(dstEnd) {
420 srcEnd := srcStart.Add(srcInterval)
421 if srcEnd.After(ts.lastAdd) {
425 if !srcEnd.Before(dstStart) {
426 srcValue := l.buckets[(srcIndex+l.oldest)%ts.numBuckets]
427 if !srcStart.Before(dstStart) && !srcEnd.After(dstEnd) {
428 // dst completely contains src.
430 results[i].Add(srcValue)
433 // dst partially overlaps src.
434 overlapStart := maxTime(srcStart, dstStart)
435 overlapEnd := minTime(srcEnd, dstEnd)
436 base := srcEnd.Sub(srcStart)
437 fraction := overlapEnd.Sub(overlapStart).Seconds() / base.Seconds()
439 used := ts.provider()
441 used.CopyFrom(srcValue)
443 used.Multiply(fraction)
447 if srcEnd.After(dstEnd) {
452 srcStart = srcStart.Add(srcInterval)
454 dstStart = dstStart.Add(dstInterval)
458 // resetObservation clears the content so the struct may be reused.
459 func (ts *timeSeries) resetObservation(observation Observable) Observable {
460 if observation == nil {
461 observation = ts.provider()
468 // TimeSeries tracks data at granularities from 1 second to 16 weeks.
469 type TimeSeries struct {
473 // NewTimeSeries creates a new TimeSeries using the function provided for creating new Observable.
474 func NewTimeSeries(f func() Observable) *TimeSeries {
475 return NewTimeSeriesWithClock(f, defaultClockInstance)
478 // NewTimeSeriesWithClock creates a new TimeSeries using the function provided for creating new Observable and the clock for
479 // assigning timestamps.
480 func NewTimeSeriesWithClock(f func() Observable, clock Clock) *TimeSeries {
481 ts := new(TimeSeries)
482 ts.timeSeries.init(timeSeriesResolutions, f, timeSeriesNumBuckets, clock)
486 // MinuteHourSeries tracks data at granularities of 1 minute and 1 hour.
487 type MinuteHourSeries struct {
491 // NewMinuteHourSeries creates a new MinuteHourSeries using the function provided for creating new Observable.
492 func NewMinuteHourSeries(f func() Observable) *MinuteHourSeries {
493 return NewMinuteHourSeriesWithClock(f, defaultClockInstance)
496 // NewMinuteHourSeriesWithClock creates a new MinuteHourSeries using the function provided for creating new Observable and the clock for
497 // assigning timestamps.
498 func NewMinuteHourSeriesWithClock(f func() Observable, clock Clock) *MinuteHourSeries {
499 ts := new(MinuteHourSeries)
500 ts.timeSeries.init(minuteHourSeriesResolutions, f,
501 minuteHourSeriesNumBuckets, clock)
505 func (ts *MinuteHourSeries) Minute() Observable {
506 return ts.timeSeries.Latest(0, 60)
509 func (ts *MinuteHourSeries) Hour() Observable {
510 return ts.timeSeries.Latest(1, 60)
513 func minTime(a, b time.Time) time.Time {
520 func maxTime(a, b time.Time) time.Time {