internal/telemetry/export/ocagent: attach timestamps to metrics

This change attaches start timestamps to timeseries and end
timestamps to the points in each timeseries. Int64Data,
Float64Data, HistogramInt64Data, and HistogramFloat64Data
have also had an EndTime field added to keep track of the last
time the metric was updated.

What works:
* Start and end timestamps will now be attached to timeseries.

What does not work yet:
* MetricDescriptors will not have a unit attached.
* No labels will be attached to timeseries.
* Distributions will not have SumOfSquaredDeviation attached.

Updates golang/go#33819

Change-Id: I692e1676bb1e31de26c1f799b96428fc9a55d6c7
Reviewed-on: https://go-review.googlesource.com/c/tools/+/203060
Run-TryBot: Emmanuel Odeke <emm.odeke@gmail.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Emmanuel Odeke <emm.odeke@gmail.com>
diff --git a/internal/telemetry/export/ocagent/metrics.go b/internal/telemetry/export/ocagent/metrics.go
index 11c2a83..6cb824d 100644
--- a/internal/telemetry/export/ocagent/metrics.go
+++ b/internal/telemetry/export/ocagent/metrics.go
@@ -6,6 +6,7 @@
 
 import (
 	"fmt"
+	"time"
 
 	"golang.org/x/tools/internal/telemetry"
 	"golang.org/x/tools/internal/telemetry/export/ocagent/wire"
@@ -95,17 +96,18 @@
 
 // dataToTimeseries returns a slice of *wire.TimeSeries based on the
 // points in data.
-func dataToTimeseries(data telemetry.MetricData) []*wire.TimeSeries {
+func dataToTimeseries(data telemetry.MetricData, start time.Time) []*wire.TimeSeries {
 	if data == nil {
 		return nil
 	}
 
 	numRows := numRows(data)
+	startTimestamp := convertTimestamp(start)
 	timeseries := make([]*wire.TimeSeries, 0, numRows)
 
 	for i := 0; i < numRows; i++ {
 		timeseries = append(timeseries, &wire.TimeSeries{
-			// TODO: attach StartTimestamp
+			StartTimestamp: &startTimestamp,
 			// TODO: labels?
 			Points: dataToPoints(data, i),
 		})
@@ -135,21 +137,23 @@
 func dataToPoints(data telemetry.MetricData, i int) []*wire.Point {
 	switch d := data.(type) {
 	case *metric.Int64Data:
+		timestamp := convertTimestamp(*d.EndTime)
 		return []*wire.Point{
 			{
 				Value: wire.PointInt64Value{
 					Int64Value: d.Rows[i],
 				},
-				// TODO: attach Timestamp
+				Timestamp: &timestamp,
 			},
 		}
 	case *metric.Float64Data:
+		timestamp := convertTimestamp(*d.EndTime)
 		return []*wire.Point{
 			{
 				Value: wire.PointDoubleValue{
 					DoubleValue: d.Rows[i],
 				},
-				// TODO: attach Timestamp
+				Timestamp: &timestamp,
 			},
 		}
 	case *metric.HistogramInt64Data:
@@ -158,10 +162,10 @@
 		for i, val := range d.Info.Buckets {
 			bucketBounds[i] = float64(val)
 		}
-		return distributionToPoints(row.Values, row.Count, float64(row.Sum), bucketBounds)
+		return distributionToPoints(row.Values, row.Count, float64(row.Sum), bucketBounds, *d.EndTime)
 	case *metric.HistogramFloat64Data:
 		row := d.Rows[i]
-		return distributionToPoints(row.Values, row.Count, row.Sum, d.Info.Buckets)
+		return distributionToPoints(row.Values, row.Count, row.Sum, d.Info.Buckets, *d.EndTime)
 	}
 
 	return nil
@@ -170,13 +174,14 @@
 // distributionToPoints returns an array of *wire.Points containing a
 // wire.PointDistributionValue representing a distribution with the
 // supplied counts, count, and sum.
-func distributionToPoints(counts []int64, count int64, sum float64, bucketBounds []float64) []*wire.Point {
+func distributionToPoints(counts []int64, count int64, sum float64, bucketBounds []float64, end time.Time) []*wire.Point {
 	buckets := make([]*wire.Bucket, len(counts))
 	for i := 0; i < len(counts); i++ {
 		buckets[i] = &wire.Bucket{
 			Count: counts[i],
 		}
 	}
+	timestamp := convertTimestamp(end)
 	return []*wire.Point{
 		{
 			Value: wire.PointDistributionValue{
@@ -190,6 +195,7 @@
 					},
 				},
 			},
+			Timestamp: &timestamp,
 		},
 	}
 }
diff --git a/internal/telemetry/export/ocagent/metrics_test.go b/internal/telemetry/export/ocagent/metrics_test.go
index 85e90dd..033a60e 100644
--- a/internal/telemetry/export/ocagent/metrics_test.go
+++ b/internal/telemetry/export/ocagent/metrics_test.go
@@ -3,6 +3,7 @@
 import (
 	"reflect"
 	"testing"
+	"time"
 
 	"golang.org/x/tools/internal/telemetry"
 	"golang.org/x/tools/internal/telemetry/export/ocagent/wire"
@@ -318,14 +319,22 @@
 }
 
 func TestDataToTimeseries(t *testing.T) {
+	epoch := time.Unix(0, 0)
+	epochTimestamp := convertTimestamp(epoch)
+
+	end := time.Unix(30, 0)
+	endTimestamp := convertTimestamp(end)
+
 	tests := []struct {
-		name string
-		data telemetry.MetricData
-		want []*wire.TimeSeries
+		name  string
+		data  telemetry.MetricData
+		start time.Time
+		want  []*wire.TimeSeries
 	}{
 		{
 			"nil data",
 			nil,
+			time.Time{},
 			nil,
 		},
 		{
@@ -336,28 +345,36 @@
 					2,
 					3,
 				},
+				EndTime: &end,
 			},
+			epoch,
 			[]*wire.TimeSeries{
 				&wire.TimeSeries{
 					Points: []*wire.Point{
 						&wire.Point{
-							Value: wire.PointInt64Value{Int64Value: 1},
+							Value:     wire.PointInt64Value{Int64Value: 1},
+							Timestamp: &endTimestamp,
 						},
 					},
+					StartTimestamp: &epochTimestamp,
 				},
 				&wire.TimeSeries{
 					Points: []*wire.Point{
 						&wire.Point{
-							Value: wire.PointInt64Value{Int64Value: 2},
+							Value:     wire.PointInt64Value{Int64Value: 2},
+							Timestamp: &endTimestamp,
 						},
 					},
+					StartTimestamp: &epochTimestamp,
 				},
 				&wire.TimeSeries{
 					Points: []*wire.Point{
 						&wire.Point{
-							Value: wire.PointInt64Value{Int64Value: 3},
+							Value:     wire.PointInt64Value{Int64Value: 3},
+							Timestamp: &endTimestamp,
 						},
 					},
+					StartTimestamp: &epochTimestamp,
 				},
 			},
 		},
@@ -368,21 +385,27 @@
 					1.5,
 					4.5,
 				},
+				EndTime: &end,
 			},
+			epoch,
 			[]*wire.TimeSeries{
 				&wire.TimeSeries{
 					Points: []*wire.Point{
 						&wire.Point{
-							Value: wire.PointDoubleValue{DoubleValue: 1.5},
+							Value:     wire.PointDoubleValue{DoubleValue: 1.5},
+							Timestamp: &endTimestamp,
 						},
 					},
+					StartTimestamp: &epochTimestamp,
 				},
 				&wire.TimeSeries{
 					Points: []*wire.Point{
 						&wire.Point{
-							Value: wire.PointDoubleValue{DoubleValue: 4.5},
+							Value:     wire.PointDoubleValue{DoubleValue: 4.5},
+							Timestamp: &endTimestamp,
 						},
 					},
+					StartTimestamp: &epochTimestamp,
 				},
 			},
 		},
@@ -405,7 +428,9 @@
 						0, 5, 10,
 					},
 				},
+				EndTime: &end,
 			},
+			epoch,
 			[]*wire.TimeSeries{
 				&wire.TimeSeries{
 					Points: []*wire.Point{
@@ -432,8 +457,10 @@
 									},
 								},
 							},
+							Timestamp: &endTimestamp,
 						},
 					},
+					StartTimestamp: &epochTimestamp,
 				},
 			},
 		},
@@ -455,7 +482,9 @@
 						0, 5,
 					},
 				},
+				EndTime: &end,
 			},
+			epoch,
 			[]*wire.TimeSeries{
 				&wire.TimeSeries{
 					Points: []*wire.Point{
@@ -479,8 +508,10 @@
 									},
 								},
 							},
+							Timestamp: &endTimestamp,
 						},
 					},
+					StartTimestamp: &epochTimestamp,
 				},
 			},
 		},
@@ -488,7 +519,7 @@
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			got := dataToTimeseries(tt.data)
+			got := dataToTimeseries(tt.data, tt.start)
 			if !reflect.DeepEqual(got, tt.want) {
 				t.Fatalf("Got:\n%s\nWant:\n%s", marshaled(got), marshaled(tt.want))
 			}
@@ -559,11 +590,15 @@
 }
 
 func TestDataToPoints(t *testing.T) {
+	end := time.Unix(30, 0)
+	endTimestamp := convertTimestamp(end)
+
 	int64Data := &metric.Int64Data{
 		Rows: []int64{
 			0,
 			10,
 		},
+		EndTime: &end,
 	}
 
 	float64Data := &metric.Float64Data{
@@ -571,6 +606,7 @@
 			0.5,
 			0.25,
 		},
+		EndTime: &end,
 	}
 
 	histogramInt64Data := &metric.HistogramInt64Data{
@@ -599,6 +635,7 @@
 				0, 5, 10,
 			},
 		},
+		EndTime: &end,
 	}
 
 	histogramFloat64Data := &metric.HistogramFloat64Data{
@@ -627,6 +664,7 @@
 				0, 5, 10,
 			},
 		},
+		EndTime: &end,
 	}
 
 	tests := []struct {
@@ -650,6 +688,7 @@
 					Value: wire.PointInt64Value{
 						Int64Value: 0,
 					},
+					Timestamp: &endTimestamp,
 				},
 			},
 		},
@@ -662,6 +701,7 @@
 					Value: wire.PointInt64Value{
 						Int64Value: 10,
 					},
+					Timestamp: &endTimestamp,
 				},
 			},
 		},
@@ -674,6 +714,7 @@
 					Value: wire.PointDoubleValue{
 						DoubleValue: 0.5,
 					},
+					Timestamp: &endTimestamp,
 				},
 			},
 		},
@@ -686,6 +727,7 @@
 					Value: wire.PointDoubleValue{
 						DoubleValue: 0.25,
 					},
+					Timestamp: &endTimestamp,
 				},
 			},
 		},
@@ -717,6 +759,7 @@
 							},
 						},
 					},
+					Timestamp: &endTimestamp,
 				},
 			},
 		},
@@ -748,6 +791,7 @@
 							},
 						},
 					},
+					Timestamp: &endTimestamp,
 				},
 			},
 		},
@@ -779,6 +823,7 @@
 							},
 						},
 					},
+					Timestamp: &endTimestamp,
 				},
 			},
 		},
@@ -810,6 +855,7 @@
 							},
 						},
 					},
+					Timestamp: &endTimestamp,
 				},
 			},
 		},
@@ -826,12 +872,16 @@
 }
 
 func TestDistributionToPoints(t *testing.T) {
+	end := time.Unix(30, 0)
+	endTimestamp := convertTimestamp(end)
+
 	tests := []struct {
 		name    string
 		counts  []int64
 		count   int64
 		sum     float64
 		buckets []float64
+		end     time.Time
 		want    []*wire.Point
 	}{
 		{
@@ -846,6 +896,7 @@
 			buckets: []float64{
 				0, 5, 10,
 			},
+			end: end,
 			want: []*wire.Point{
 				{
 					Value: wire.PointDistributionValue{
@@ -871,6 +922,7 @@
 							},
 						},
 					},
+					Timestamp: &endTimestamp,
 				},
 			},
 		},
@@ -878,7 +930,7 @@
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			got := distributionToPoints(tt.counts, tt.count, tt.sum, tt.buckets)
+			got := distributionToPoints(tt.counts, tt.count, tt.sum, tt.buckets, tt.end)
 			if !reflect.DeepEqual(got, tt.want) {
 				t.Fatalf("Got:\n%s\nWant:\n%s", marshaled(got), marshaled(tt.want))
 			}
diff --git a/internal/telemetry/export/ocagent/ocagent.go b/internal/telemetry/export/ocagent/ocagent.go
index d1ebfd2..cad7e13 100644
--- a/internal/telemetry/export/ocagent/ocagent.go
+++ b/internal/telemetry/export/ocagent/ocagent.go
@@ -114,7 +114,7 @@
 func (e *exporter) Metric(ctx context.Context, data telemetry.MetricData) {
 	e.mu.Lock()
 	defer e.mu.Unlock()
-	e.metrics = append(e.metrics, convertMetric(data))
+	e.metrics = append(e.metrics, convertMetric(data, e.config.Start))
 }
 
 func (e *exporter) Flush() {
@@ -202,9 +202,9 @@
 	return result
 }
 
-func convertMetric(data telemetry.MetricData) *wire.Metric {
+func convertMetric(data telemetry.MetricData, start time.Time) *wire.Metric {
 	descriptor := dataToMetricDescriptor(data)
-	timeseries := dataToTimeseries(data)
+	timeseries := dataToTimeseries(data, start)
 
 	if descriptor == nil && timeseries == nil {
 		return nil
diff --git a/internal/telemetry/metric/metric.go b/internal/telemetry/metric/metric.go
index 0333450..98f0bd8 100644
--- a/internal/telemetry/metric/metric.go
+++ b/internal/telemetry/metric/metric.go
@@ -131,6 +131,8 @@
 	IsGauge bool
 	// Rows holds the per group values for the metric.
 	Rows []int64
+	// End is the last time this metric was updated.
+	EndTime *time.Time
 
 	groups []telemetry.TagList
 }
@@ -143,6 +145,8 @@
 	IsGauge bool
 	// Rows holds the per group values for the metric.
 	Rows []float64
+	// End is the last time this metric was updated.
+	EndTime *time.Time
 
 	groups []telemetry.TagList
 }
@@ -153,6 +157,8 @@
 	Info *HistogramInt64
 	// Rows holds the per group values for the metric.
 	Rows []*HistogramInt64Row
+	// End is the last time this metric was updated.
+	EndTime *time.Time
 
 	groups []telemetry.TagList
 }
@@ -177,6 +183,8 @@
 	Info *HistogramFloat64
 	// Rows holds the per group values for the metric.
 	Rows []*HistogramFloat64Row
+	// End is the last time this metric was updated.
+	EndTime *time.Time
 
 	groups []telemetry.TagList
 }
@@ -215,7 +223,7 @@
 func (data *Int64Data) Handle() string              { return data.Info.Name }
 func (data *Int64Data) Groups() []telemetry.TagList { return data.groups }
 
-func (data *Int64Data) modify(ctx context.Context, f func(v int64) int64) {
+func (data *Int64Data) modify(ctx context.Context, at time.Time, f func(v int64) int64) {
 	index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
 	old := data.Rows
 	if insert {
@@ -227,34 +235,31 @@
 		copy(data.Rows, old)
 	}
 	data.Rows[index] = f(data.Rows[index])
+	data.EndTime = &at
 	frozen := *data
 	export.Metric(ctx, &frozen)
 }
 
 func (data *Int64Data) countInt64(ctx context.Context, measure *stats.Int64Measure, value int64, at time.Time) {
-	// TODO: Use at.
-	data.modify(ctx, func(v int64) int64 { return v + 1 })
+	data.modify(ctx, at, func(v int64) int64 { return v + 1 })
 }
 
 func (data *Int64Data) countFloat64(ctx context.Context, measure *stats.Float64Measure, value float64, at time.Time) {
-	// TODO: Use at.
-	data.modify(ctx, func(v int64) int64 { return v + 1 })
+	data.modify(ctx, at, func(v int64) int64 { return v + 1 })
 }
 
 func (data *Int64Data) sum(ctx context.Context, measure *stats.Int64Measure, value int64, at time.Time) {
-	// TODO: Use at.
-	data.modify(ctx, func(v int64) int64 { return v + value })
+	data.modify(ctx, at, func(v int64) int64 { return v + value })
 }
 
 func (data *Int64Data) latest(ctx context.Context, measure *stats.Int64Measure, value int64, at time.Time) {
-	// TODO: Use at.
-	data.modify(ctx, func(v int64) int64 { return value })
+	data.modify(ctx, at, func(v int64) int64 { return value })
 }
 
 func (data *Float64Data) Handle() string              { return data.Info.Name }
 func (data *Float64Data) Groups() []telemetry.TagList { return data.groups }
 
-func (data *Float64Data) modify(ctx context.Context, f func(v float64) float64) {
+func (data *Float64Data) modify(ctx context.Context, at time.Time, f func(v float64) float64) {
 	index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
 	old := data.Rows
 	if insert {
@@ -266,23 +271,23 @@
 		copy(data.Rows, old)
 	}
 	data.Rows[index] = f(data.Rows[index])
+	data.EndTime = &at
 	frozen := *data
 	export.Metric(ctx, &frozen)
 }
 
 func (data *Float64Data) sum(ctx context.Context, measure *stats.Float64Measure, value float64, at time.Time) {
-	data.modify(ctx, func(v float64) float64 { return v + value })
+	data.modify(ctx, at, func(v float64) float64 { return v + value })
 }
 
 func (data *Float64Data) latest(ctx context.Context, measure *stats.Float64Measure, value float64, at time.Time) {
-	// TODO: Use at.
-	data.modify(ctx, func(v float64) float64 { return value })
+	data.modify(ctx, at, func(v float64) float64 { return value })
 }
 
 func (data *HistogramInt64Data) Handle() string              { return data.Info.Name }
 func (data *HistogramInt64Data) Groups() []telemetry.TagList { return data.groups }
 
-func (data *HistogramInt64Data) modify(ctx context.Context, f func(v *HistogramInt64Row)) {
+func (data *HistogramInt64Data) modify(ctx context.Context, at time.Time, f func(v *HistogramInt64Row)) {
 	index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
 	old := data.Rows
 	var v HistogramInt64Row
@@ -300,13 +305,13 @@
 	copy(v.Values, oldValues)
 	f(&v)
 	data.Rows[index] = &v
+	data.EndTime = &at
 	frozen := *data
 	export.Metric(ctx, &frozen)
 }
 
 func (data *HistogramInt64Data) record(ctx context.Context, measure *stats.Int64Measure, value int64, at time.Time) {
-	// TODO: Use at.
-	data.modify(ctx, func(v *HistogramInt64Row) {
+	data.modify(ctx, at, func(v *HistogramInt64Row) {
 		v.Sum += value
 		if v.Min > value || v.Count == 0 {
 			v.Min = value
@@ -326,7 +331,7 @@
 func (data *HistogramFloat64Data) Handle() string              { return data.Info.Name }
 func (data *HistogramFloat64Data) Groups() []telemetry.TagList { return data.groups }
 
-func (data *HistogramFloat64Data) modify(ctx context.Context, f func(v *HistogramFloat64Row)) {
+func (data *HistogramFloat64Data) modify(ctx context.Context, at time.Time, f func(v *HistogramFloat64Row)) {
 	index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
 	old := data.Rows
 	var v HistogramFloat64Row
@@ -344,13 +349,13 @@
 	copy(v.Values, oldValues)
 	f(&v)
 	data.Rows[index] = &v
+	data.EndTime = &at
 	frozen := *data
 	export.Metric(ctx, &frozen)
 }
 
 func (data *HistogramFloat64Data) record(ctx context.Context, measure *stats.Float64Measure, value float64, at time.Time) {
-	// TODO: Use at.
-	data.modify(ctx, func(v *HistogramFloat64Row) {
+	data.modify(ctx, at, func(v *HistogramFloat64Row) {
 		v.Sum += value
 		if v.Min > value || v.Count == 0 {
 			v.Min = value