internal/telemetry: change concurrency model
This changes to use a mutex and directly execute the less performance
sensitive telemetry calls (tracing and logging) and then uses a submission
queue only for stats adjustments as those are much more sensitive (but it
should also be easier to keep up with them in bursts)
Fixes golang/go#33692
Change-Id: Ia59a8975f21dfbfcf115be1f1d11b097be8dd9c8
Reviewed-on: https://go-review.googlesource.com/c/tools/+/190737
Run-TryBot: Ian Cottrell <iancottrell@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Rebecca Stambler <rstambler@golang.org>
Reviewed-by: Emmanuel Odeke <emm.odeke@gmail.com>
diff --git a/internal/lsp/debug/serve.go b/internal/lsp/debug/serve.go
index c7aad50..6e53502 100644
--- a/internal/lsp/debug/serve.go
+++ b/internal/lsp/debug/serve.go
@@ -248,18 +248,15 @@
func Render(tmpl *template.Template, fun func(*http.Request) interface{}) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
- done := make(chan struct{})
- export.Do(func() {
- defer close(done)
- var data interface{}
- if fun != nil {
- data = fun(r)
- }
- if err := tmpl.Execute(w, data); err != nil {
- log.Error(context.Background(), "", err)
- }
- })
- <-done
+ mu.Lock()
+ defer mu.Unlock()
+ var data interface{}
+ if fun != nil {
+ data = fun(r)
+ }
+ if err := tmpl.Execute(w, data); err != nil {
+ log.Error(context.Background(), "", err)
+ }
}
}
diff --git a/internal/telemetry/export/export.go b/internal/telemetry/export/export.go
index aff7247..355d46f 100644
--- a/internal/telemetry/export/export.go
+++ b/internal/telemetry/export/export.go
@@ -10,6 +10,7 @@
import (
"context"
"os"
+ "sync"
"time"
"golang.org/x/tools/internal/telemetry"
@@ -27,68 +28,65 @@
Metric(context.Context, telemetry.MetricData)
}
-var exporter = LogWriter(os.Stderr, true)
-
-func SetExporter(setter func(Exporter) Exporter) {
- Do(func() {
- exporter = setter(exporter)
- })
-}
+var (
+ exporterMu sync.Mutex
+ exporter = LogWriter(os.Stderr, true)
+)
func AddExporters(e ...Exporter) {
- Do(func() {
- exporter = Multi(append([]Exporter{exporter}, e...)...)
- })
+ exporterMu.Lock()
+ defer exporterMu.Unlock()
+ exporter = Multi(append([]Exporter{exporter}, e...)...)
}
func StartSpan(ctx context.Context, span *telemetry.Span, at time.Time) {
- Do(func() {
- span.Start = at
- exporter.StartSpan(ctx, span)
- })
+ exporterMu.Lock()
+ defer exporterMu.Unlock()
+ span.Start = at
+ exporter.StartSpan(ctx, span)
}
func FinishSpan(ctx context.Context, span *telemetry.Span, at time.Time) {
- Do(func() {
- span.Finish = at
- exporter.FinishSpan(ctx, span)
- })
+ exporterMu.Lock()
+ defer exporterMu.Unlock()
+ span.Finish = at
+ exporter.FinishSpan(ctx, span)
}
func Tag(ctx context.Context, at time.Time, tags telemetry.TagList) {
- Do(func() {
- // If context has a span we need to add the tags to it
- span := telemetry.GetSpan(ctx)
- if span == nil {
- return
- }
- if span.Start.IsZero() {
- // span still being created, tag it directly
- span.Tags = append(span.Tags, tags...)
- return
- }
- // span in progress, add an event to the span
- span.Events = append(span.Events, telemetry.Event{
- At: at,
- Tags: tags,
- })
+ exporterMu.Lock()
+ defer exporterMu.Unlock()
+ // If context has a span we need to add the tags to it
+ span := telemetry.GetSpan(ctx)
+ if span == nil {
+ return
+ }
+ if span.Start.IsZero() {
+ // span still being created, tag it directly
+ span.Tags = append(span.Tags, tags...)
+ return
+ }
+ // span in progress, add an event to the span
+ span.Events = append(span.Events, telemetry.Event{
+ At: at,
+ Tags: tags,
})
}
func Log(ctx context.Context, event telemetry.Event) {
- Do(func() {
- // If context has a span we need to add the event to it
- span := telemetry.GetSpan(ctx)
- if span != nil {
- span.Events = append(span.Events, event)
- }
- // and now also hand the event of to the current observer
- exporter.Log(ctx, event)
- })
+ exporterMu.Lock()
+ defer exporterMu.Unlock()
+ // If context has a span we need to add the event to it
+ span := telemetry.GetSpan(ctx)
+ if span != nil {
+ span.Events = append(span.Events, event)
+ }
+ // and now also hand the event of to the current observer
+ exporter.Log(ctx, event)
}
func Metric(ctx context.Context, data telemetry.MetricData) {
- Do(func() {
- exporter.Metric(ctx, data)
- })
+ exporterMu.Lock()
+ defer exporterMu.Unlock()
+ exporter.Metric(ctx, data)
}
diff --git a/internal/telemetry/export/ocagent/ocagent.go b/internal/telemetry/export/ocagent/ocagent.go
index ef3b5a7..c0a323a 100644
--- a/internal/telemetry/export/ocagent/ocagent.go
+++ b/internal/telemetry/export/ocagent/ocagent.go
@@ -14,6 +14,7 @@
"fmt"
"net/http"
"os"
+ "sync"
"time"
"golang.org/x/tools/internal/telemetry"
@@ -26,6 +27,7 @@
const exportRate = 2 * time.Second
type exporter struct {
+ mu sync.Mutex
address string
node *wire.Node
spans []*wire.Span
@@ -63,9 +65,7 @@
}
go func() {
for _ = range time.Tick(exportRate) {
- export.Do(func() {
- exporter.flush()
- })
+ exporter.flush()
}
}()
return exporter
@@ -74,16 +74,22 @@
func (e *exporter) StartSpan(ctx context.Context, span *telemetry.Span) {}
func (e *exporter) FinishSpan(ctx context.Context, span *telemetry.Span) {
+ e.mu.Lock()
+ defer e.mu.Unlock()
e.spans = append(e.spans, convertSpan(span))
}
func (e *exporter) Log(context.Context, telemetry.Event) {}
func (e *exporter) Metric(ctx context.Context, data telemetry.MetricData) {
+ e.mu.Lock()
+ defer e.mu.Unlock()
e.metrics = append(e.metrics, convertMetric(data))
}
func (e *exporter) flush() {
+ e.mu.Lock()
+ defer e.mu.Unlock()
spans := e.spans
e.spans = nil
metrics := e.metrics
diff --git a/internal/telemetry/export/prometheus/prometheus.go b/internal/telemetry/export/prometheus/prometheus.go
index 57cc8d9..903c685 100644
--- a/internal/telemetry/export/prometheus/prometheus.go
+++ b/internal/telemetry/export/prometheus/prometheus.go
@@ -10,9 +10,9 @@
"fmt"
"net/http"
"sort"
+ "sync"
"golang.org/x/tools/internal/telemetry"
- "golang.org/x/tools/internal/telemetry/export"
"golang.org/x/tools/internal/telemetry/metric"
)
@@ -21,6 +21,7 @@
}
type Exporter struct {
+ mu sync.Mutex
metrics []telemetry.MetricData
}
@@ -28,6 +29,8 @@
func (e *Exporter) FinishSpan(ctx context.Context, span *telemetry.Span) {}
func (e *Exporter) Log(ctx context.Context, event telemetry.Event) {}
func (e *Exporter) Metric(ctx context.Context, data telemetry.MetricData) {
+ e.mu.Lock()
+ defer e.mu.Unlock()
name := data.Handle()
// We keep the metrics in name sorted order so the page is stable and easy
// to read. We do this with an insertion sort rather than sorting the list
@@ -76,48 +79,45 @@
}
func (e *Exporter) Serve(w http.ResponseWriter, r *http.Request) {
- done := make(chan struct{})
- export.Do(func() {
- defer close(done)
- for _, data := range e.metrics {
- switch data := data.(type) {
- case *metric.Int64Data:
- e.header(w, data.Info.Name, data.Info.Description, data.IsGauge, false)
- for i, group := range data.Groups() {
- e.row(w, data.Info.Name, group, "", data.Rows[i])
- }
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ for _, data := range e.metrics {
+ switch data := data.(type) {
+ case *metric.Int64Data:
+ e.header(w, data.Info.Name, data.Info.Description, data.IsGauge, false)
+ for i, group := range data.Groups() {
+ e.row(w, data.Info.Name, group, "", data.Rows[i])
+ }
- case *metric.Float64Data:
- e.header(w, data.Info.Name, data.Info.Description, data.IsGauge, false)
- for i, group := range data.Groups() {
- e.row(w, data.Info.Name, group, "", data.Rows[i])
- }
+ case *metric.Float64Data:
+ e.header(w, data.Info.Name, data.Info.Description, data.IsGauge, false)
+ for i, group := range data.Groups() {
+ e.row(w, data.Info.Name, group, "", data.Rows[i])
+ }
- case *metric.HistogramInt64Data:
- e.header(w, data.Info.Name, data.Info.Description, false, true)
- for i, group := range data.Groups() {
- row := data.Rows[i]
- for j, b := range data.Info.Buckets {
- e.row(w, data.Info.Name+"_bucket", group, fmt.Sprintf(`le="%v"`, b), row.Values[j])
- }
- e.row(w, data.Info.Name+"_bucket", group, `le="+Inf"`, row.Count)
- e.row(w, data.Info.Name+"_count", group, "", row.Count)
- e.row(w, data.Info.Name+"_sum", group, "", row.Sum)
+ case *metric.HistogramInt64Data:
+ e.header(w, data.Info.Name, data.Info.Description, false, true)
+ for i, group := range data.Groups() {
+ row := data.Rows[i]
+ for j, b := range data.Info.Buckets {
+ e.row(w, data.Info.Name+"_bucket", group, fmt.Sprintf(`le="%v"`, b), row.Values[j])
}
+ e.row(w, data.Info.Name+"_bucket", group, `le="+Inf"`, row.Count)
+ e.row(w, data.Info.Name+"_count", group, "", row.Count)
+ e.row(w, data.Info.Name+"_sum", group, "", row.Sum)
+ }
- case *metric.HistogramFloat64Data:
- e.header(w, data.Info.Name, data.Info.Description, false, true)
- for i, group := range data.Groups() {
- row := data.Rows[i]
- for j, b := range data.Info.Buckets {
- e.row(w, data.Info.Name+"_bucket", group, fmt.Sprintf(`le="%v"`, b), row.Values[j])
- }
- e.row(w, data.Info.Name+"_bucket", group, `le="+Inf"`, row.Count)
- e.row(w, data.Info.Name+"_count", group, "", row.Count)
- e.row(w, data.Info.Name+"_sum", group, "", row.Sum)
+ case *metric.HistogramFloat64Data:
+ e.header(w, data.Info.Name, data.Info.Description, false, true)
+ for i, group := range data.Groups() {
+ row := data.Rows[i]
+ for j, b := range data.Info.Buckets {
+ e.row(w, data.Info.Name+"_bucket", group, fmt.Sprintf(`le="%v"`, b), row.Values[j])
}
+ e.row(w, data.Info.Name+"_bucket", group, `le="+Inf"`, row.Count)
+ e.row(w, data.Info.Name+"_count", group, "", row.Count)
+ e.row(w, data.Info.Name+"_sum", group, "", row.Sum)
}
}
- })
- <-done
+ }
}
diff --git a/internal/telemetry/metric/metric.go b/internal/telemetry/metric/metric.go
index 28a9a8e..64aa572 100644
--- a/internal/telemetry/metric/metric.go
+++ b/internal/telemetry/metric/metric.go
@@ -215,21 +215,19 @@
func (data *Int64Data) Groups() []telemetry.TagList { return data.groups }
func (data *Int64Data) modify(ctx context.Context, f func(v int64) int64) {
- export.Do(func() {
- index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
- old := data.Rows
- if insert {
- data.Rows = make([]int64, len(old)+1)
- copy(data.Rows, old[:index])
- copy(data.Rows[index+1:], old[index:])
- } else {
- data.Rows = make([]int64, len(old))
- copy(data.Rows, old)
- }
- data.Rows[index] = f(data.Rows[index])
- frozen := *data
- export.Metric(ctx, &frozen)
- })
+ index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
+ old := data.Rows
+ if insert {
+ data.Rows = make([]int64, len(old)+1)
+ copy(data.Rows, old[:index])
+ copy(data.Rows[index+1:], old[index:])
+ } else {
+ data.Rows = make([]int64, len(old))
+ copy(data.Rows, old)
+ }
+ data.Rows[index] = f(data.Rows[index])
+ frozen := *data
+ export.Metric(ctx, &frozen)
}
func (data *Int64Data) countInt64(ctx context.Context, measure *stats.Int64Measure, value int64) {
@@ -252,21 +250,19 @@
func (data *Float64Data) Groups() []telemetry.TagList { return data.groups }
func (data *Float64Data) modify(ctx context.Context, f func(v float64) float64) {
- export.Do(func() {
- index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
- old := data.Rows
- if insert {
- data.Rows = make([]float64, len(old)+1)
- copy(data.Rows, old[:index])
- copy(data.Rows[index+1:], old[index:])
- } else {
- data.Rows = make([]float64, len(old))
- copy(data.Rows, old)
- }
- data.Rows[index] = f(data.Rows[index])
- frozen := *data
- export.Metric(ctx, &frozen)
- })
+ index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
+ old := data.Rows
+ if insert {
+ data.Rows = make([]float64, len(old)+1)
+ copy(data.Rows, old[:index])
+ copy(data.Rows[index+1:], old[index:])
+ } else {
+ data.Rows = make([]float64, len(old))
+ copy(data.Rows, old)
+ }
+ data.Rows[index] = f(data.Rows[index])
+ frozen := *data
+ export.Metric(ctx, &frozen)
}
func (data *Float64Data) sum(ctx context.Context, measure *stats.Float64Measure, value float64) {
@@ -281,27 +277,25 @@
func (data *HistogramInt64Data) Groups() []telemetry.TagList { return data.groups }
func (data *HistogramInt64Data) modify(ctx context.Context, f func(v *HistogramInt64Row)) {
- export.Do(func() {
- index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
- old := data.Rows
- var v HistogramInt64Row
- if insert {
- data.Rows = make([]*HistogramInt64Row, len(old)+1)
- copy(data.Rows, old[:index])
- copy(data.Rows[index+1:], old[index:])
- } else {
- data.Rows = make([]*HistogramInt64Row, len(old))
- copy(data.Rows, old)
- v = *data.Rows[index]
- }
- oldValues := v.Values
- v.Values = make([]int64, len(data.Info.Buckets))
- copy(v.Values, oldValues)
- f(&v)
- data.Rows[index] = &v
- frozen := *data
- export.Metric(ctx, &frozen)
- })
+ index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
+ old := data.Rows
+ var v HistogramInt64Row
+ if insert {
+ data.Rows = make([]*HistogramInt64Row, len(old)+1)
+ copy(data.Rows, old[:index])
+ copy(data.Rows[index+1:], old[index:])
+ } else {
+ data.Rows = make([]*HistogramInt64Row, len(old))
+ copy(data.Rows, old)
+ v = *data.Rows[index]
+ }
+ oldValues := v.Values
+ v.Values = make([]int64, len(data.Info.Buckets))
+ copy(v.Values, oldValues)
+ f(&v)
+ data.Rows[index] = &v
+ frozen := *data
+ export.Metric(ctx, &frozen)
}
func (data *HistogramInt64Data) record(ctx context.Context, measure *stats.Int64Measure, value int64) {
@@ -326,27 +320,25 @@
func (data *HistogramFloat64Data) Groups() []telemetry.TagList { return data.groups }
func (data *HistogramFloat64Data) modify(ctx context.Context, f func(v *HistogramFloat64Row)) {
- export.Do(func() {
- index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
- old := data.Rows
- var v HistogramFloat64Row
- if insert {
- data.Rows = make([]*HistogramFloat64Row, len(old)+1)
- copy(data.Rows, old[:index])
- copy(data.Rows[index+1:], old[index:])
- } else {
- data.Rows = make([]*HistogramFloat64Row, len(old))
- copy(data.Rows, old)
- v = *data.Rows[index]
- }
- oldValues := v.Values
- v.Values = make([]int64, len(data.Info.Buckets))
- copy(v.Values, oldValues)
- f(&v)
- data.Rows[index] = &v
- frozen := *data
- export.Metric(ctx, &frozen)
- })
+ index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
+ old := data.Rows
+ var v HistogramFloat64Row
+ if insert {
+ data.Rows = make([]*HistogramFloat64Row, len(old)+1)
+ copy(data.Rows, old[:index])
+ copy(data.Rows[index+1:], old[index:])
+ } else {
+ data.Rows = make([]*HistogramFloat64Row, len(old))
+ copy(data.Rows, old)
+ v = *data.Rows[index]
+ }
+ oldValues := v.Values
+ v.Values = make([]int64, len(data.Info.Buckets))
+ copy(v.Values, oldValues)
+ f(&v)
+ data.Rows[index] = &v
+ frozen := *data
+ export.Metric(ctx, &frozen)
}
func (data *HistogramFloat64Data) record(ctx context.Context, measure *stats.Float64Measure, value float64) {
diff --git a/internal/telemetry/stats/stats.go b/internal/telemetry/stats/stats.go
index 807944f..a2b97d5 100644
--- a/internal/telemetry/stats/stats.go
+++ b/internal/telemetry/stats/stats.go
@@ -69,9 +69,11 @@
// Record delivers a new value to the subscribers of this measure.
func (m *Int64Measure) Record(ctx context.Context, value int64) {
- for _, s := range m.subscribers {
- s(ctx, m, value)
- }
+ do(func() {
+ for _, s := range m.subscribers {
+ s(ctx, m, value)
+ }
+ })
}
// Name returns the name this measure was given on construction.
@@ -88,7 +90,9 @@
// Record delivers a new value to the subscribers of this measure.
func (m *Float64Measure) Record(ctx context.Context, value float64) {
- for _, s := range m.subscribers {
- s(ctx, m, value)
- }
+ do(func() {
+ for _, s := range m.subscribers {
+ s(ctx, m, value)
+ }
+ })
}
diff --git a/internal/telemetry/export/worker.go b/internal/telemetry/stats/worker.go
similarity index 89%
rename from internal/telemetry/export/worker.go
rename to internal/telemetry/stats/worker.go
index 0458fdd..e690a2c 100644
--- a/internal/telemetry/export/worker.go
+++ b/internal/telemetry/stats/worker.go
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package export
+package stats
import (
"fmt"
@@ -23,14 +23,14 @@
}()
}
-// Do adds a task to the list of things to work on in the background.
+// do adds a task to the list of things to work on in the background.
// All tasks will be handled in submission order, and no two tasks will happen
// concurrently so they do not need to do any kind of locking.
// It is safe however to call Do concurrently.
// No promises are made about when the tasks will be performed.
// This function may block, but in general it will return very quickly and
// before the task has been run.
-func Do(task func()) {
+func do(task func()) {
select {
case workQueue <- task:
default: