internal/telemetry: change concurrency model

This changes to use a mutex and directly execute the less performance
sensitive telemetry calls (tracing and logging) and then uses a submission
queue only for stats adjustments as those are much more sensitive (but it
should also be easier to keep up with them in bursts)

Fixes golang/go#33692

Change-Id: Ia59a8975f21dfbfcf115be1f1d11b097be8dd9c8
Reviewed-on: https://go-review.googlesource.com/c/tools/+/190737
Run-TryBot: Ian Cottrell <iancottrell@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Rebecca Stambler <rstambler@golang.org>
Reviewed-by: Emmanuel Odeke <emm.odeke@gmail.com>
diff --git a/internal/lsp/debug/serve.go b/internal/lsp/debug/serve.go
index c7aad50..6e53502 100644
--- a/internal/lsp/debug/serve.go
+++ b/internal/lsp/debug/serve.go
@@ -248,18 +248,15 @@
 
 func Render(tmpl *template.Template, fun func(*http.Request) interface{}) func(http.ResponseWriter, *http.Request) {
 	return func(w http.ResponseWriter, r *http.Request) {
-		done := make(chan struct{})
-		export.Do(func() {
-			defer close(done)
-			var data interface{}
-			if fun != nil {
-				data = fun(r)
-			}
-			if err := tmpl.Execute(w, data); err != nil {
-				log.Error(context.Background(), "", err)
-			}
-		})
-		<-done
+		mu.Lock()
+		defer mu.Unlock()
+		var data interface{}
+		if fun != nil {
+			data = fun(r)
+		}
+		if err := tmpl.Execute(w, data); err != nil {
+			log.Error(context.Background(), "", err)
+		}
 	}
 }
 
diff --git a/internal/telemetry/export/export.go b/internal/telemetry/export/export.go
index aff7247..355d46f 100644
--- a/internal/telemetry/export/export.go
+++ b/internal/telemetry/export/export.go
@@ -10,6 +10,7 @@
 import (
 	"context"
 	"os"
+	"sync"
 	"time"
 
 	"golang.org/x/tools/internal/telemetry"
@@ -27,68 +28,65 @@
 	Metric(context.Context, telemetry.MetricData)
 }
 
-var exporter = LogWriter(os.Stderr, true)
-
-func SetExporter(setter func(Exporter) Exporter) {
-	Do(func() {
-		exporter = setter(exporter)
-	})
-}
+var (
+	exporterMu sync.Mutex
+	exporter   = LogWriter(os.Stderr, true)
+)
 
 func AddExporters(e ...Exporter) {
-	Do(func() {
-		exporter = Multi(append([]Exporter{exporter}, e...)...)
-	})
+	exporterMu.Lock()
+	defer exporterMu.Unlock()
+	exporter = Multi(append([]Exporter{exporter}, e...)...)
 }
 
 func StartSpan(ctx context.Context, span *telemetry.Span, at time.Time) {
-	Do(func() {
-		span.Start = at
-		exporter.StartSpan(ctx, span)
-	})
+	exporterMu.Lock()
+	defer exporterMu.Unlock()
+	span.Start = at
+	exporter.StartSpan(ctx, span)
 }
 
 func FinishSpan(ctx context.Context, span *telemetry.Span, at time.Time) {
-	Do(func() {
-		span.Finish = at
-		exporter.FinishSpan(ctx, span)
-	})
+	exporterMu.Lock()
+	defer exporterMu.Unlock()
+	span.Finish = at
+	exporter.FinishSpan(ctx, span)
 }
 
 func Tag(ctx context.Context, at time.Time, tags telemetry.TagList) {
-	Do(func() {
-		// If context has a span we need to add the tags to it
-		span := telemetry.GetSpan(ctx)
-		if span == nil {
-			return
-		}
-		if span.Start.IsZero() {
-			// span still being created, tag it directly
-			span.Tags = append(span.Tags, tags...)
-			return
-		}
-		// span in progress, add an event to the span
-		span.Events = append(span.Events, telemetry.Event{
-			At:   at,
-			Tags: tags,
-		})
+	exporterMu.Lock()
+	defer exporterMu.Unlock()
+	// If context has a span we need to add the tags to it
+	span := telemetry.GetSpan(ctx)
+	if span == nil {
+		return
+	}
+	if span.Start.IsZero() {
+		// span still being created, tag it directly
+		span.Tags = append(span.Tags, tags...)
+		return
+	}
+	// span in progress, add an event to the span
+	span.Events = append(span.Events, telemetry.Event{
+		At:   at,
+		Tags: tags,
 	})
 }
 
 func Log(ctx context.Context, event telemetry.Event) {
-	Do(func() {
-		// If context has a span we need to add the event to it
-		span := telemetry.GetSpan(ctx)
-		if span != nil {
-			span.Events = append(span.Events, event)
-		}
-		// and now also hand the event of to the current observer
-		exporter.Log(ctx, event)
-	})
+	exporterMu.Lock()
+	defer exporterMu.Unlock()
+	// If context has a span we need to add the event to it
+	span := telemetry.GetSpan(ctx)
+	if span != nil {
+		span.Events = append(span.Events, event)
+	}
+	// and now also hand the event of to the current observer
+	exporter.Log(ctx, event)
 }
 
 func Metric(ctx context.Context, data telemetry.MetricData) {
-	Do(func() {
-		exporter.Metric(ctx, data)
-	})
+	exporterMu.Lock()
+	defer exporterMu.Unlock()
+	exporter.Metric(ctx, data)
 }
diff --git a/internal/telemetry/export/ocagent/ocagent.go b/internal/telemetry/export/ocagent/ocagent.go
index ef3b5a7..c0a323a 100644
--- a/internal/telemetry/export/ocagent/ocagent.go
+++ b/internal/telemetry/export/ocagent/ocagent.go
@@ -14,6 +14,7 @@
 	"fmt"
 	"net/http"
 	"os"
+	"sync"
 	"time"
 
 	"golang.org/x/tools/internal/telemetry"
@@ -26,6 +27,7 @@
 const exportRate = 2 * time.Second
 
 type exporter struct {
+	mu      sync.Mutex
 	address string
 	node    *wire.Node
 	spans   []*wire.Span
@@ -63,9 +65,7 @@
 	}
 	go func() {
 		for _ = range time.Tick(exportRate) {
-			export.Do(func() {
-				exporter.flush()
-			})
+			exporter.flush()
 		}
 	}()
 	return exporter
@@ -74,16 +74,22 @@
 func (e *exporter) StartSpan(ctx context.Context, span *telemetry.Span) {}
 
 func (e *exporter) FinishSpan(ctx context.Context, span *telemetry.Span) {
+	e.mu.Lock()
+	defer e.mu.Unlock()
 	e.spans = append(e.spans, convertSpan(span))
 }
 
 func (e *exporter) Log(context.Context, telemetry.Event) {}
 
 func (e *exporter) Metric(ctx context.Context, data telemetry.MetricData) {
+	e.mu.Lock()
+	defer e.mu.Unlock()
 	e.metrics = append(e.metrics, convertMetric(data))
 }
 
 func (e *exporter) flush() {
+	e.mu.Lock()
+	defer e.mu.Unlock()
 	spans := e.spans
 	e.spans = nil
 	metrics := e.metrics
diff --git a/internal/telemetry/export/prometheus/prometheus.go b/internal/telemetry/export/prometheus/prometheus.go
index 57cc8d9..903c685 100644
--- a/internal/telemetry/export/prometheus/prometheus.go
+++ b/internal/telemetry/export/prometheus/prometheus.go
@@ -10,9 +10,9 @@
 	"fmt"
 	"net/http"
 	"sort"
+	"sync"
 
 	"golang.org/x/tools/internal/telemetry"
-	"golang.org/x/tools/internal/telemetry/export"
 	"golang.org/x/tools/internal/telemetry/metric"
 )
 
@@ -21,6 +21,7 @@
 }
 
 type Exporter struct {
+	mu      sync.Mutex
 	metrics []telemetry.MetricData
 }
 
@@ -28,6 +29,8 @@
 func (e *Exporter) FinishSpan(ctx context.Context, span *telemetry.Span) {}
 func (e *Exporter) Log(ctx context.Context, event telemetry.Event)       {}
 func (e *Exporter) Metric(ctx context.Context, data telemetry.MetricData) {
+	e.mu.Lock()
+	defer e.mu.Unlock()
 	name := data.Handle()
 	// We keep the metrics in name sorted order so the page is stable and easy
 	// to read. We do this with an insertion sort rather than sorting the list
@@ -76,48 +79,45 @@
 }
 
 func (e *Exporter) Serve(w http.ResponseWriter, r *http.Request) {
-	done := make(chan struct{})
-	export.Do(func() {
-		defer close(done)
-		for _, data := range e.metrics {
-			switch data := data.(type) {
-			case *metric.Int64Data:
-				e.header(w, data.Info.Name, data.Info.Description, data.IsGauge, false)
-				for i, group := range data.Groups() {
-					e.row(w, data.Info.Name, group, "", data.Rows[i])
-				}
+	e.mu.Lock()
+	defer e.mu.Unlock()
+	for _, data := range e.metrics {
+		switch data := data.(type) {
+		case *metric.Int64Data:
+			e.header(w, data.Info.Name, data.Info.Description, data.IsGauge, false)
+			for i, group := range data.Groups() {
+				e.row(w, data.Info.Name, group, "", data.Rows[i])
+			}
 
-			case *metric.Float64Data:
-				e.header(w, data.Info.Name, data.Info.Description, data.IsGauge, false)
-				for i, group := range data.Groups() {
-					e.row(w, data.Info.Name, group, "", data.Rows[i])
-				}
+		case *metric.Float64Data:
+			e.header(w, data.Info.Name, data.Info.Description, data.IsGauge, false)
+			for i, group := range data.Groups() {
+				e.row(w, data.Info.Name, group, "", data.Rows[i])
+			}
 
-			case *metric.HistogramInt64Data:
-				e.header(w, data.Info.Name, data.Info.Description, false, true)
-				for i, group := range data.Groups() {
-					row := data.Rows[i]
-					for j, b := range data.Info.Buckets {
-						e.row(w, data.Info.Name+"_bucket", group, fmt.Sprintf(`le="%v"`, b), row.Values[j])
-					}
-					e.row(w, data.Info.Name+"_bucket", group, `le="+Inf"`, row.Count)
-					e.row(w, data.Info.Name+"_count", group, "", row.Count)
-					e.row(w, data.Info.Name+"_sum", group, "", row.Sum)
+		case *metric.HistogramInt64Data:
+			e.header(w, data.Info.Name, data.Info.Description, false, true)
+			for i, group := range data.Groups() {
+				row := data.Rows[i]
+				for j, b := range data.Info.Buckets {
+					e.row(w, data.Info.Name+"_bucket", group, fmt.Sprintf(`le="%v"`, b), row.Values[j])
 				}
+				e.row(w, data.Info.Name+"_bucket", group, `le="+Inf"`, row.Count)
+				e.row(w, data.Info.Name+"_count", group, "", row.Count)
+				e.row(w, data.Info.Name+"_sum", group, "", row.Sum)
+			}
 
-			case *metric.HistogramFloat64Data:
-				e.header(w, data.Info.Name, data.Info.Description, false, true)
-				for i, group := range data.Groups() {
-					row := data.Rows[i]
-					for j, b := range data.Info.Buckets {
-						e.row(w, data.Info.Name+"_bucket", group, fmt.Sprintf(`le="%v"`, b), row.Values[j])
-					}
-					e.row(w, data.Info.Name+"_bucket", group, `le="+Inf"`, row.Count)
-					e.row(w, data.Info.Name+"_count", group, "", row.Count)
-					e.row(w, data.Info.Name+"_sum", group, "", row.Sum)
+		case *metric.HistogramFloat64Data:
+			e.header(w, data.Info.Name, data.Info.Description, false, true)
+			for i, group := range data.Groups() {
+				row := data.Rows[i]
+				for j, b := range data.Info.Buckets {
+					e.row(w, data.Info.Name+"_bucket", group, fmt.Sprintf(`le="%v"`, b), row.Values[j])
 				}
+				e.row(w, data.Info.Name+"_bucket", group, `le="+Inf"`, row.Count)
+				e.row(w, data.Info.Name+"_count", group, "", row.Count)
+				e.row(w, data.Info.Name+"_sum", group, "", row.Sum)
 			}
 		}
-	})
-	<-done
+	}
 }
diff --git a/internal/telemetry/metric/metric.go b/internal/telemetry/metric/metric.go
index 28a9a8e..64aa572 100644
--- a/internal/telemetry/metric/metric.go
+++ b/internal/telemetry/metric/metric.go
@@ -215,21 +215,19 @@
 func (data *Int64Data) Groups() []telemetry.TagList { return data.groups }
 
 func (data *Int64Data) modify(ctx context.Context, f func(v int64) int64) {
-	export.Do(func() {
-		index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
-		old := data.Rows
-		if insert {
-			data.Rows = make([]int64, len(old)+1)
-			copy(data.Rows, old[:index])
-			copy(data.Rows[index+1:], old[index:])
-		} else {
-			data.Rows = make([]int64, len(old))
-			copy(data.Rows, old)
-		}
-		data.Rows[index] = f(data.Rows[index])
-		frozen := *data
-		export.Metric(ctx, &frozen)
-	})
+	index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
+	old := data.Rows
+	if insert {
+		data.Rows = make([]int64, len(old)+1)
+		copy(data.Rows, old[:index])
+		copy(data.Rows[index+1:], old[index:])
+	} else {
+		data.Rows = make([]int64, len(old))
+		copy(data.Rows, old)
+	}
+	data.Rows[index] = f(data.Rows[index])
+	frozen := *data
+	export.Metric(ctx, &frozen)
 }
 
 func (data *Int64Data) countInt64(ctx context.Context, measure *stats.Int64Measure, value int64) {
@@ -252,21 +250,19 @@
 func (data *Float64Data) Groups() []telemetry.TagList { return data.groups }
 
 func (data *Float64Data) modify(ctx context.Context, f func(v float64) float64) {
-	export.Do(func() {
-		index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
-		old := data.Rows
-		if insert {
-			data.Rows = make([]float64, len(old)+1)
-			copy(data.Rows, old[:index])
-			copy(data.Rows[index+1:], old[index:])
-		} else {
-			data.Rows = make([]float64, len(old))
-			copy(data.Rows, old)
-		}
-		data.Rows[index] = f(data.Rows[index])
-		frozen := *data
-		export.Metric(ctx, &frozen)
-	})
+	index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
+	old := data.Rows
+	if insert {
+		data.Rows = make([]float64, len(old)+1)
+		copy(data.Rows, old[:index])
+		copy(data.Rows[index+1:], old[index:])
+	} else {
+		data.Rows = make([]float64, len(old))
+		copy(data.Rows, old)
+	}
+	data.Rows[index] = f(data.Rows[index])
+	frozen := *data
+	export.Metric(ctx, &frozen)
 }
 
 func (data *Float64Data) sum(ctx context.Context, measure *stats.Float64Measure, value float64) {
@@ -281,27 +277,25 @@
 func (data *HistogramInt64Data) Groups() []telemetry.TagList { return data.groups }
 
 func (data *HistogramInt64Data) modify(ctx context.Context, f func(v *HistogramInt64Row)) {
-	export.Do(func() {
-		index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
-		old := data.Rows
-		var v HistogramInt64Row
-		if insert {
-			data.Rows = make([]*HistogramInt64Row, len(old)+1)
-			copy(data.Rows, old[:index])
-			copy(data.Rows[index+1:], old[index:])
-		} else {
-			data.Rows = make([]*HistogramInt64Row, len(old))
-			copy(data.Rows, old)
-			v = *data.Rows[index]
-		}
-		oldValues := v.Values
-		v.Values = make([]int64, len(data.Info.Buckets))
-		copy(v.Values, oldValues)
-		f(&v)
-		data.Rows[index] = &v
-		frozen := *data
-		export.Metric(ctx, &frozen)
-	})
+	index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
+	old := data.Rows
+	var v HistogramInt64Row
+	if insert {
+		data.Rows = make([]*HistogramInt64Row, len(old)+1)
+		copy(data.Rows, old[:index])
+		copy(data.Rows[index+1:], old[index:])
+	} else {
+		data.Rows = make([]*HistogramInt64Row, len(old))
+		copy(data.Rows, old)
+		v = *data.Rows[index]
+	}
+	oldValues := v.Values
+	v.Values = make([]int64, len(data.Info.Buckets))
+	copy(v.Values, oldValues)
+	f(&v)
+	data.Rows[index] = &v
+	frozen := *data
+	export.Metric(ctx, &frozen)
 }
 
 func (data *HistogramInt64Data) record(ctx context.Context, measure *stats.Int64Measure, value int64) {
@@ -326,27 +320,25 @@
 func (data *HistogramFloat64Data) Groups() []telemetry.TagList { return data.groups }
 
 func (data *HistogramFloat64Data) modify(ctx context.Context, f func(v *HistogramFloat64Row)) {
-	export.Do(func() {
-		index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
-		old := data.Rows
-		var v HistogramFloat64Row
-		if insert {
-			data.Rows = make([]*HistogramFloat64Row, len(old)+1)
-			copy(data.Rows, old[:index])
-			copy(data.Rows[index+1:], old[index:])
-		} else {
-			data.Rows = make([]*HistogramFloat64Row, len(old))
-			copy(data.Rows, old)
-			v = *data.Rows[index]
-		}
-		oldValues := v.Values
-		v.Values = make([]int64, len(data.Info.Buckets))
-		copy(v.Values, oldValues)
-		f(&v)
-		data.Rows[index] = &v
-		frozen := *data
-		export.Metric(ctx, &frozen)
-	})
+	index, insert := getGroup(ctx, &data.groups, data.Info.Keys)
+	old := data.Rows
+	var v HistogramFloat64Row
+	if insert {
+		data.Rows = make([]*HistogramFloat64Row, len(old)+1)
+		copy(data.Rows, old[:index])
+		copy(data.Rows[index+1:], old[index:])
+	} else {
+		data.Rows = make([]*HistogramFloat64Row, len(old))
+		copy(data.Rows, old)
+		v = *data.Rows[index]
+	}
+	oldValues := v.Values
+	v.Values = make([]int64, len(data.Info.Buckets))
+	copy(v.Values, oldValues)
+	f(&v)
+	data.Rows[index] = &v
+	frozen := *data
+	export.Metric(ctx, &frozen)
 }
 
 func (data *HistogramFloat64Data) record(ctx context.Context, measure *stats.Float64Measure, value float64) {
diff --git a/internal/telemetry/stats/stats.go b/internal/telemetry/stats/stats.go
index 807944f..a2b97d5 100644
--- a/internal/telemetry/stats/stats.go
+++ b/internal/telemetry/stats/stats.go
@@ -69,9 +69,11 @@
 
 // Record delivers a new value to the subscribers of this measure.
 func (m *Int64Measure) Record(ctx context.Context, value int64) {
-	for _, s := range m.subscribers {
-		s(ctx, m, value)
-	}
+	do(func() {
+		for _, s := range m.subscribers {
+			s(ctx, m, value)
+		}
+	})
 }
 
 // Name returns the name this measure was given on construction.
@@ -88,7 +90,9 @@
 
 // Record delivers a new value to the subscribers of this measure.
 func (m *Float64Measure) Record(ctx context.Context, value float64) {
-	for _, s := range m.subscribers {
-		s(ctx, m, value)
-	}
+	do(func() {
+		for _, s := range m.subscribers {
+			s(ctx, m, value)
+		}
+	})
 }
diff --git a/internal/telemetry/export/worker.go b/internal/telemetry/stats/worker.go
similarity index 89%
rename from internal/telemetry/export/worker.go
rename to internal/telemetry/stats/worker.go
index 0458fdd..e690a2c 100644
--- a/internal/telemetry/export/worker.go
+++ b/internal/telemetry/stats/worker.go
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-package export
+package stats
 
 import (
 	"fmt"
@@ -23,14 +23,14 @@
 	}()
 }
 
-// Do adds a task to the list of things to work on in the background.
+// do adds a task to the list of things to work on in the background.
 // All tasks will be handled in submission order, and no two tasks will happen
 // concurrently so they do not need to do any kind of locking.
 // It is safe however to call Do concurrently.
 // No promises are made about when the tasks will be performed.
 // This function may block, but in general it will return very quickly and
 // before the task has been run.
-func Do(task func()) {
+func do(task func()) {
 	select {
 	case workQueue <- task:
 	default: