internal/telemtry: changed to a simpler threading model for stats
this is not a final solution, but it makes it easier to debug
and reason about, and does not require a go routine or buffered
channel
Change-Id: I758758ac80fcd525ab5264e34c48941766a8db11
Reviewed-on: https://go-review.googlesource.com/c/tools/+/208664
Run-TryBot: Ian Cottrell <iancottrell@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Emmanuel Odeke <emm.odeke@gmail.com>
diff --git a/internal/telemetry/stats/stats.go b/internal/telemetry/stats/stats.go
index e6eb364..f3d3ede 100644
--- a/internal/telemetry/stats/stats.go
+++ b/internal/telemetry/stats/stats.go
@@ -9,6 +9,7 @@
import (
"context"
+ "sync"
"time"
"golang.org/x/tools/internal/telemetry/unit"
@@ -19,6 +20,7 @@
name string
description string
unit unit.Unit
+ mu sync.Mutex
subscribers []Int64Subscriber
}
@@ -27,6 +29,7 @@
name string
description string
unit unit.Unit
+ mu sync.Mutex
subscribers []Float64Subscriber
}
@@ -66,16 +69,20 @@
func (m *Int64Measure) Unit() unit.Unit { return m.unit }
// Subscribe adds a new subscriber to this measure.
-func (m *Int64Measure) Subscribe(s Int64Subscriber) { m.subscribers = append(m.subscribers, s) }
+func (m *Int64Measure) Subscribe(s Int64Subscriber) {
+ m.mu.Lock()
+ m.subscribers = append(m.subscribers, s)
+ m.mu.Unlock()
+}
// Record delivers a new value to the subscribers of this measure.
func (m *Int64Measure) Record(ctx context.Context, value int64) {
at := time.Now()
- do(func() {
- for _, s := range m.subscribers {
- s(ctx, m, value, at)
- }
- })
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ for _, s := range m.subscribers {
+ s(ctx, m, value, at)
+ }
}
// Name returns the name this measure was given on construction.
@@ -88,14 +95,18 @@
func (m *Float64Measure) Unit() unit.Unit { return m.unit }
// Subscribe adds a new subscriber to this measure.
-func (m *Float64Measure) Subscribe(s Float64Subscriber) { m.subscribers = append(m.subscribers, s) }
+func (m *Float64Measure) Subscribe(s Float64Subscriber) {
+ m.mu.Lock()
+ m.subscribers = append(m.subscribers, s)
+ m.mu.Unlock()
+}
// Record delivers a new value to the subscribers of this measure.
func (m *Float64Measure) Record(ctx context.Context, value float64) {
at := time.Now()
- do(func() {
- for _, s := range m.subscribers {
- s(ctx, m, value, at)
- }
- })
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ for _, s := range m.subscribers {
+ s(ctx, m, value, at)
+ }
}
diff --git a/internal/telemetry/stats/worker.go b/internal/telemetry/stats/worker.go
deleted file mode 100644
index e690a2c..0000000
--- a/internal/telemetry/stats/worker.go
+++ /dev/null
@@ -1,40 +0,0 @@
-// Copyright 2019 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package stats
-
-import (
- "fmt"
- "os"
-)
-
-var (
- // TODO: Think about whether this is the right concurrency model, and what
- // TODO: the queue length should be
- workQueue = make(chan func(), 1000)
-)
-
-func init() {
- go func() {
- for task := range workQueue {
- task()
- }
- }()
-}
-
-// do adds a task to the list of things to work on in the background.
-// All tasks will be handled in submission order, and no two tasks will happen
-// concurrently so they do not need to do any kind of locking.
-// It is safe however to call Do concurrently.
-// No promises are made about when the tasks will be performed.
-// This function may block, but in general it will return very quickly and
-// before the task has been run.
-func do(task func()) {
- select {
- case workQueue <- task:
- default:
- fmt.Fprint(os.Stderr, "work queue is full\n")
- workQueue <- task
- }
-}