| // Copyright 2015 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Package timeseries implements a time series structure for stats collection. |
| package timeseries // import "golang.org/x/net/internal/timeseries" |
| |
| import ( |
| "fmt" |
| "log" |
| "time" |
| ) |
| |
// Number of buckets kept at every level of each series type.
const (
	timeSeriesNumBuckets       = 64
	minuteHourSeriesNumBuckets = 60
)

// Bucket sizes of the levels of each series type, finest first.
var (
	// timeSeriesResolutions lists the bucket sizes used by TimeSeries.
	timeSeriesResolutions = []time.Duration{
		time.Second,
		10 * time.Second,
		time.Minute,
		10 * time.Minute,
		time.Hour,
		6 * time.Hour,
		24 * time.Hour,          // 1 day
		7 * 24 * time.Hour,      // 1 week
		4 * 7 * 24 * time.Hour,  // 4 weeks
		16 * 7 * 24 * time.Hour, // 16 weeks
	}

	// minuteHourSeriesResolutions lists the bucket sizes used by
	// MinuteHourSeries.
	minuteHourSeriesResolutions = []time.Duration{
		time.Second,
		time.Minute,
	}
)
| |
// Observable is implemented by any kind of data that can be aggregated in a
// time series.
type Observable interface {
	// Multiply scales the receiver's data by ratio.
	Multiply(ratio float64)

	// Add accumulates the data of another observation into the receiver.
	Add(other Observable)

	// Clear resets the receiver so that it can be reused.
	Clear()

	// CopyFrom overwrites the receiver with the contents of other.
	CopyFrom(other Observable)
}
| |
| // Float attaches the methods of Observable to a float64. |
| type Float float64 |
| |
| // NewFloat returns a Float. |
| func NewFloat() Observable { |
| f := Float(0) |
| return &f |
| } |
| |
| // String returns the float as a string. |
| func (f *Float) String() string { return fmt.Sprintf("%g", f.Value()) } |
| |
| // Value returns the float's value. |
| func (f *Float) Value() float64 { return float64(*f) } |
| |
| func (f *Float) Multiply(ratio float64) { *f *= Float(ratio) } |
| |
| func (f *Float) Add(other Observable) { |
| o := other.(*Float) |
| *f += *o |
| } |
| |
| func (f *Float) Clear() { *f = 0 } |
| |
| func (f *Float) CopyFrom(other Observable) { |
| o := other.(*Float) |
| *f = *o |
| } |
| |
// Clock is the source of the current time used by a time series.
type Clock interface {
	Time() time.Time
}

// defaultClock is the Clock backed by time.Now.
type defaultClock int

// defaultClockInstance is the shared defaultClock value.
var defaultClockInstance defaultClock

// Time returns the current time as reported by time.Now.
func (defaultClock) Time() time.Time {
	now := time.Now()
	return now
}
| |
| // Information kept per level. Each level consists of a circular list of |
| // observations. The start of the level may be derived from end and the |
| // len(buckets) * sizeInMillis. |
| type tsLevel struct { |
| oldest int // index to oldest bucketed Observable |
| newest int // index to newest bucketed Observable |
| end time.Time // end timestamp for this level |
| size time.Duration // duration of the bucketed Observable |
| buckets []Observable // collections of observations |
| provider func() Observable // used for creating new Observable |
| } |
| |
| func (l *tsLevel) Clear() { |
| l.oldest = 0 |
| l.newest = len(l.buckets) - 1 |
| l.end = time.Time{} |
| for i := range l.buckets { |
| if l.buckets[i] != nil { |
| l.buckets[i].Clear() |
| l.buckets[i] = nil |
| } |
| } |
| } |
| |
| func (l *tsLevel) InitLevel(size time.Duration, numBuckets int, f func() Observable) { |
| l.size = size |
| l.provider = f |
| l.buckets = make([]Observable, numBuckets) |
| } |
| |
| // Keeps a sequence of levels. Each level is responsible for storing data at |
| // a given resolution. For example, the first level stores data at a one |
| // minute resolution while the second level stores data at a one hour |
| // resolution. |
| |
| // Each level is represented by a sequence of buckets. Each bucket spans an |
| // interval equal to the resolution of the level. New observations are added |
| // to the last bucket. |
| type timeSeries struct { |
| provider func() Observable // make more Observable |
| numBuckets int // number of buckets in each level |
| levels []*tsLevel // levels of bucketed Observable |
| lastAdd time.Time // time of last Observable tracked |
| total Observable // convenient aggregation of all Observable |
| clock Clock // Clock for getting current time |
| pending Observable // observations not yet bucketed |
| pendingTime time.Time // what time are we keeping in pending |
| dirty bool // if there are pending observations |
| } |
| |
| // init initializes a level according to the supplied criteria. |
| func (ts *timeSeries) init(resolutions []time.Duration, f func() Observable, numBuckets int, clock Clock) { |
| ts.provider = f |
| ts.numBuckets = numBuckets |
| ts.clock = clock |
| ts.levels = make([]*tsLevel, len(resolutions)) |
| |
| for i := range resolutions { |
| if i > 0 && resolutions[i-1] >= resolutions[i] { |
| log.Print("timeseries: resolutions must be monotonically increasing") |
| break |
| } |
| newLevel := new(tsLevel) |
| newLevel.InitLevel(resolutions[i], ts.numBuckets, ts.provider) |
| ts.levels[i] = newLevel |
| } |
| |
| ts.Clear() |
| } |
| |
| // Clear removes all observations from the time series. |
| func (ts *timeSeries) Clear() { |
| ts.lastAdd = time.Time{} |
| ts.total = ts.resetObservation(ts.total) |
| ts.pending = ts.resetObservation(ts.pending) |
| ts.pendingTime = time.Time{} |
| ts.dirty = false |
| |
| for i := range ts.levels { |
| ts.levels[i].Clear() |
| } |
| } |
| |
| // Add records an observation at the current time. |
| func (ts *timeSeries) Add(observation Observable) { |
| ts.AddWithTime(observation, ts.clock.Time()) |
| } |
| |
| // AddWithTime records an observation at the specified time. |
| func (ts *timeSeries) AddWithTime(observation Observable, t time.Time) { |
| |
| smallBucketDuration := ts.levels[0].size |
| |
| if t.After(ts.lastAdd) { |
| ts.lastAdd = t |
| } |
| |
| if t.After(ts.pendingTime) { |
| ts.advance(t) |
| ts.mergePendingUpdates() |
| ts.pendingTime = ts.levels[0].end |
| ts.pending.CopyFrom(observation) |
| ts.dirty = true |
| } else if t.After(ts.pendingTime.Add(-1 * smallBucketDuration)) { |
| // The observation is close enough to go into the pending bucket. |
| // This compensates for clock skewing and small scheduling delays |
| // by letting the update stay in the fast path. |
| ts.pending.Add(observation) |
| ts.dirty = true |
| } else { |
| ts.mergeValue(observation, t) |
| } |
| } |
| |
| // mergeValue inserts the observation at the specified time in the past into all levels. |
| func (ts *timeSeries) mergeValue(observation Observable, t time.Time) { |
| for _, level := range ts.levels { |
| index := (ts.numBuckets - 1) - int(level.end.Sub(t)/level.size) |
| if 0 <= index && index < ts.numBuckets { |
| bucketNumber := (level.oldest + index) % ts.numBuckets |
| if level.buckets[bucketNumber] == nil { |
| level.buckets[bucketNumber] = level.provider() |
| } |
| level.buckets[bucketNumber].Add(observation) |
| } |
| } |
| ts.total.Add(observation) |
| } |
| |
| // mergePendingUpdates applies the pending updates into all levels. |
| func (ts *timeSeries) mergePendingUpdates() { |
| if ts.dirty { |
| ts.mergeValue(ts.pending, ts.pendingTime) |
| ts.pending = ts.resetObservation(ts.pending) |
| ts.dirty = false |
| } |
| } |
| |
| // advance cycles the buckets at each level until the latest bucket in |
| // each level can hold the time specified. |
| func (ts *timeSeries) advance(t time.Time) { |
| if !t.After(ts.levels[0].end) { |
| return |
| } |
| for i := 0; i < len(ts.levels); i++ { |
| level := ts.levels[i] |
| if !level.end.Before(t) { |
| break |
| } |
| |
| // If the time is sufficiently far, just clear the level and advance |
| // directly. |
| if !t.Before(level.end.Add(level.size * time.Duration(ts.numBuckets))) { |
| for _, b := range level.buckets { |
| ts.resetObservation(b) |
| } |
| level.end = time.Unix(0, (t.UnixNano()/level.size.Nanoseconds())*level.size.Nanoseconds()) |
| } |
| |
| for t.After(level.end) { |
| level.end = level.end.Add(level.size) |
| level.newest = level.oldest |
| level.oldest = (level.oldest + 1) % ts.numBuckets |
| ts.resetObservation(level.buckets[level.newest]) |
| } |
| |
| t = level.end |
| } |
| } |
| |
| // Latest returns the sum of the num latest buckets from the level. |
| func (ts *timeSeries) Latest(level, num int) Observable { |
| now := ts.clock.Time() |
| if ts.levels[0].end.Before(now) { |
| ts.advance(now) |
| } |
| |
| ts.mergePendingUpdates() |
| |
| result := ts.provider() |
| l := ts.levels[level] |
| index := l.newest |
| |
| for i := 0; i < num; i++ { |
| if l.buckets[index] != nil { |
| result.Add(l.buckets[index]) |
| } |
| if index == 0 { |
| index = ts.numBuckets |
| } |
| index-- |
| } |
| |
| return result |
| } |
| |
| // LatestBuckets returns a copy of the num latest buckets from level. |
| func (ts *timeSeries) LatestBuckets(level, num int) []Observable { |
| if level < 0 || level > len(ts.levels) { |
| log.Print("timeseries: bad level argument: ", level) |
| return nil |
| } |
| if num < 0 || num >= ts.numBuckets { |
| log.Print("timeseries: bad num argument: ", num) |
| return nil |
| } |
| |
| results := make([]Observable, num) |
| now := ts.clock.Time() |
| if ts.levels[0].end.Before(now) { |
| ts.advance(now) |
| } |
| |
| ts.mergePendingUpdates() |
| |
| l := ts.levels[level] |
| index := l.newest |
| |
| for i := 0; i < num; i++ { |
| result := ts.provider() |
| results[i] = result |
| if l.buckets[index] != nil { |
| result.CopyFrom(l.buckets[index]) |
| } |
| |
| if index == 0 { |
| index = ts.numBuckets |
| } |
| index -= 1 |
| } |
| return results |
| } |
| |
| // ScaleBy updates observations by scaling by factor. |
| func (ts *timeSeries) ScaleBy(factor float64) { |
| for _, l := range ts.levels { |
| for i := 0; i < ts.numBuckets; i++ { |
| l.buckets[i].Multiply(factor) |
| } |
| } |
| |
| ts.total.Multiply(factor) |
| ts.pending.Multiply(factor) |
| } |
| |
| // Range returns the sum of observations added over the specified time range. |
| // If start or finish times don't fall on bucket boundaries of the same |
| // level, then return values are approximate answers. |
| func (ts *timeSeries) Range(start, finish time.Time) Observable { |
| return ts.ComputeRange(start, finish, 1)[0] |
| } |
| |
| // Recent returns the sum of observations from the last delta. |
| func (ts *timeSeries) Recent(delta time.Duration) Observable { |
| now := ts.clock.Time() |
| return ts.Range(now.Add(-delta), now) |
| } |
| |
| // Total returns the total of all observations. |
| func (ts *timeSeries) Total() Observable { |
| ts.mergePendingUpdates() |
| return ts.total |
| } |
| |
| // ComputeRange computes a specified number of values into a slice using |
| // the observations recorded over the specified time period. The return |
| // values are approximate if the start or finish times don't fall on the |
| // bucket boundaries at the same level or if the number of buckets spanning |
| // the range is not an integral multiple of num. |
| func (ts *timeSeries) ComputeRange(start, finish time.Time, num int) []Observable { |
| if start.After(finish) { |
| log.Printf("timeseries: start > finish, %v>%v", start, finish) |
| return nil |
| } |
| |
| if num < 0 { |
| log.Printf("timeseries: num < 0, %v", num) |
| return nil |
| } |
| |
| results := make([]Observable, num) |
| |
| for _, l := range ts.levels { |
| if !start.Before(l.end.Add(-l.size * time.Duration(ts.numBuckets))) { |
| ts.extract(l, start, finish, num, results) |
| return results |
| } |
| } |
| |
| // Failed to find a level that covers the desired range. So just |
| // extract from the last level, even if it doesn't cover the entire |
| // desired range. |
| ts.extract(ts.levels[len(ts.levels)-1], start, finish, num, results) |
| |
| return results |
| } |
| |
| // RecentList returns the specified number of values in slice over the most |
| // recent time period of the specified range. |
| func (ts *timeSeries) RecentList(delta time.Duration, num int) []Observable { |
| if delta < 0 { |
| return nil |
| } |
| now := ts.clock.Time() |
| return ts.ComputeRange(now.Add(-delta), now, num) |
| } |
| |
| // extract returns a slice of specified number of observations from a given |
| // level over a given range. |
| func (ts *timeSeries) extract(l *tsLevel, start, finish time.Time, num int, results []Observable) { |
| ts.mergePendingUpdates() |
| |
| srcInterval := l.size |
| dstInterval := finish.Sub(start) / time.Duration(num) |
| dstStart := start |
| srcStart := l.end.Add(-srcInterval * time.Duration(ts.numBuckets)) |
| |
| srcIndex := 0 |
| |
| // Where should scanning start? |
| if dstStart.After(srcStart) { |
| advance := int(dstStart.Sub(srcStart) / srcInterval) |
| srcIndex += advance |
| srcStart = srcStart.Add(time.Duration(advance) * srcInterval) |
| } |
| |
| // The i'th value is computed as show below. |
| // interval = (finish/start)/num |
| // i'th value = sum of observation in range |
| // [ start + i * interval, |
| // start + (i + 1) * interval ) |
| for i := 0; i < num; i++ { |
| results[i] = ts.resetObservation(results[i]) |
| dstEnd := dstStart.Add(dstInterval) |
| for srcIndex < ts.numBuckets && srcStart.Before(dstEnd) { |
| srcEnd := srcStart.Add(srcInterval) |
| if srcEnd.After(ts.lastAdd) { |
| srcEnd = ts.lastAdd |
| } |
| |
| if !srcEnd.Before(dstStart) { |
| srcValue := l.buckets[(srcIndex+l.oldest)%ts.numBuckets] |
| if !srcStart.Before(dstStart) && !srcEnd.After(dstEnd) { |
| // dst completely contains src. |
| if srcValue != nil { |
| results[i].Add(srcValue) |
| } |
| } else { |
| // dst partially overlaps src. |
| overlapStart := maxTime(srcStart, dstStart) |
| overlapEnd := minTime(srcEnd, dstEnd) |
| base := srcEnd.Sub(srcStart) |
| fraction := overlapEnd.Sub(overlapStart).Seconds() / base.Seconds() |
| |
| used := ts.provider() |
| if srcValue != nil { |
| used.CopyFrom(srcValue) |
| } |
| used.Multiply(fraction) |
| results[i].Add(used) |
| } |
| |
| if srcEnd.After(dstEnd) { |
| break |
| } |
| } |
| srcIndex++ |
| srcStart = srcStart.Add(srcInterval) |
| } |
| dstStart = dstStart.Add(dstInterval) |
| } |
| } |
| |
| // resetObservation clears the content so the struct may be reused. |
| func (ts *timeSeries) resetObservation(observation Observable) Observable { |
| if observation == nil { |
| observation = ts.provider() |
| } else { |
| observation.Clear() |
| } |
| return observation |
| } |
| |
| // TimeSeries tracks data at granularities from 1 second to 16 weeks. |
| type TimeSeries struct { |
| timeSeries |
| } |
| |
| // NewTimeSeries creates a new TimeSeries using the function provided for creating new Observable. |
| func NewTimeSeries(f func() Observable) *TimeSeries { |
| return NewTimeSeriesWithClock(f, defaultClockInstance) |
| } |
| |
| // NewTimeSeriesWithClock creates a new TimeSeries using the function provided for creating new Observable and the clock for |
| // assigning timestamps. |
| func NewTimeSeriesWithClock(f func() Observable, clock Clock) *TimeSeries { |
| ts := new(TimeSeries) |
| ts.timeSeries.init(timeSeriesResolutions, f, timeSeriesNumBuckets, clock) |
| return ts |
| } |
| |
| // MinuteHourSeries tracks data at granularities of 1 minute and 1 hour. |
| type MinuteHourSeries struct { |
| timeSeries |
| } |
| |
| // NewMinuteHourSeries creates a new MinuteHourSeries using the function provided for creating new Observable. |
| func NewMinuteHourSeries(f func() Observable) *MinuteHourSeries { |
| return NewMinuteHourSeriesWithClock(f, defaultClockInstance) |
| } |
| |
| // NewMinuteHourSeriesWithClock creates a new MinuteHourSeries using the function provided for creating new Observable and the clock for |
| // assigning timestamps. |
| func NewMinuteHourSeriesWithClock(f func() Observable, clock Clock) *MinuteHourSeries { |
| ts := new(MinuteHourSeries) |
| ts.timeSeries.init(minuteHourSeriesResolutions, f, |
| minuteHourSeriesNumBuckets, clock) |
| return ts |
| } |
| |
| func (ts *MinuteHourSeries) Minute() Observable { |
| return ts.timeSeries.Latest(0, 60) |
| } |
| |
| func (ts *MinuteHourSeries) Hour() Observable { |
| return ts.timeSeries.Latest(1, 60) |
| } |
| |
// minTime returns the earlier of a and b; when neither is before the other,
// b is returned.
func minTime(a, b time.Time) time.Time {
	if !a.Before(b) {
		return b
	}
	return a
}
| |
// maxTime returns the later of a and b; when neither is after the other, b
// is returned.
func maxTime(a, b time.Time) time.Time {
	if !a.After(b) {
		return b
	}
	return a
}