internal/workflow: set a watchdog on tasks by default
We have an ongoing problem with gomotes that freeze up during the
release process. I've seen hangs during tests and MSI fetching. Maybe
we're missing a keepalive somewhere, maybe there's a bug in the
coordinator or the buildlet, who knows.
Rather than try to set a deadline everywhere, which seems error-prone
and annoying, add a watchdog to each task that is reset when they log or
explicitly heartbeat.
The predecessors to this CL should make sure that every task logs at a
reasonable frequency. The long pole is long test builders; the cmd/go
tests can take up to 4 minutes, during which there might not be any
output. Set the watchdog to a very generous 10 minutes, and have
AwaitCondition heartbeat at half that interval.
Fixes golang/go#54134.
Change-Id: I921ff9d9b695efac45015846ebd57dbb398aab89
Reviewed-on: https://go-review.googlesource.com/c/build/+/420221
Run-TryBot: Heschi Kreinick <heschi@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Jenny Rakoczy <jenny@golang.org>
diff --git a/internal/task/task.go b/internal/task/task.go
index 129251f..e1a1aa9 100644
--- a/internal/task/task.go
+++ b/internal/task/task.go
@@ -25,16 +25,13 @@
func AwaitCondition[T any](ctx *workflow.TaskContext, period time.Duration, condition func() (T, bool, error)) (T, error) {
pollTimer := time.NewTicker(period / time.Duration(AwaitDivisor))
defer pollTimer.Stop()
- heartbeatTimer := time.NewTicker(time.Minute)
- defer heartbeatTimer.Stop()
for {
select {
case <-ctx.Done():
var zero T
return zero, ctx.Err()
- case <-heartbeatTimer.C:
- // TODO: reset watchdog
case <-pollTimer.C:
+ ctx.ResetWatchdog()
res, done, err := condition()
if done || err != nil {
return res, err
diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go
index 465d0c1..353b786 100644
--- a/internal/workflow/workflow.go
+++ b/internal/workflow/workflow.go
@@ -43,6 +43,7 @@
"fmt"
"reflect"
"strings"
+ "time"
"github.com/google/uuid"
)
@@ -378,17 +379,28 @@
// A TaskContext is a context.Context, plus workflow-related features.
type TaskContext struct {
- context.Context
disableRetries bool
- Logger
+ context.Context
+ Logger Logger
TaskName string
WorkflowID uuid.UUID
+
+ watchdogTimer *time.Timer
+}
+
+func (c *TaskContext) Printf(format string, v ...interface{}) {
+ c.ResetWatchdog()
+ c.Logger.Printf(format, v...)
}
func (c *TaskContext) DisableRetries() {
c.disableRetries = true
}
+func (c *TaskContext) ResetWatchdog() {
+ c.watchdogTimer.Reset(WatchdogDelay)
+}
+
// A Listener is used to notify the workflow host of state changes, for display
// and persistence.
type Listener interface {
@@ -671,20 +683,29 @@
}
// Maximum number of retries. This could be a workflow property.
-const maxRetries = 3
+var MaxRetries = 3
+
+var WatchdogDelay = 10 * time.Minute
func (w *Workflow) runTask(ctx context.Context, listener Listener, state taskState, args []reflect.Value) taskState {
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
tctx := &TaskContext{
- Context: ctx,
- Logger: listener.Logger(w.ID, state.def.name),
- TaskName: state.def.name,
- WorkflowID: w.ID,
+ Context: ctx,
+ Logger: listener.Logger(w.ID, state.def.name),
+ TaskName: state.def.name,
+ WorkflowID: w.ID,
+ watchdogTimer: time.AfterFunc(WatchdogDelay, cancel),
}
+
in := append([]reflect.Value{reflect.ValueOf(tctx)}, args...)
fv := reflect.ValueOf(state.def.f)
out := fv.Call(in)
- if errIdx := len(out) - 1; !out[errIdx].IsNil() {
+ if !tctx.watchdogTimer.Stop() {
+ state.err = fmt.Errorf("task did not log for %v, assumed hung", WatchdogDelay)
+ } else if errIdx := len(out) - 1; !out[errIdx].IsNil() {
state.err = out[errIdx].Interface().(error)
}
state.finished = true
@@ -698,8 +719,8 @@
}
}
- if state.err != nil && !tctx.disableRetries && state.retryCount+1 < maxRetries {
- tctx.Printf("task failed, will retry (%v of %v): %v", state.retryCount+1, maxRetries, state.err)
+ if state.err != nil && !tctx.disableRetries && state.retryCount+1 < MaxRetries {
+ tctx.Printf("task failed, will retry (%v of %v): %v", state.retryCount+1, MaxRetries, state.err)
state = taskState{
def: state.def,
w: state.w,
diff --git a/internal/workflow/workflow_test.go b/internal/workflow/workflow_test.go
index cb4ea48..de061d9 100644
--- a/internal/workflow/workflow_test.go
+++ b/internal/workflow/workflow_test.go
@@ -277,6 +277,50 @@
}
}
+func TestWatchdog(t *testing.T) {
+ t.Run("success", func(t *testing.T) {
+ testWatchdog(t, true)
+ })
+ t.Run("failure", func(t *testing.T) {
+ testWatchdog(t, false)
+ })
+}
+
+func testWatchdog(t *testing.T, success bool) {
+ defer func(r int, d time.Duration) {
+ wf.MaxRetries = r
+ wf.WatchdogDelay = d
+ }(wf.MaxRetries, wf.WatchdogDelay)
+ wf.MaxRetries = 1
+ wf.WatchdogDelay = 750 * time.Millisecond
+
+ maybeLog := func(ctx *wf.TaskContext) (string, error) {
+ select {
+ case <-ctx.Done():
+ return "", ctx.Err()
+ case <-time.After(500 * time.Millisecond):
+ }
+ if success {
+ ctx.Printf("*snore*")
+ }
+ select {
+ case <-ctx.Done():
+ return "", ctx.Err()
+ case <-time.After(500 * time.Millisecond):
+ }
+ return "huh? what?", nil
+ }
+
+ wd := wf.New()
+ wf.Output(wd, "result", wf.Task0(wd, "sleepy", maybeLog))
+
+ w := startWorkflow(t, wd, nil)
+ _, err := w.Run(context.Background(), &verboseListener{t: t})
+ if err == nil != success {
+ t.Errorf("got error %v, wanted success: %v", err, success)
+ }
+}
+
func TestLogging(t *testing.T) {
log := func(ctx *wf.TaskContext, arg string) (string, error) {
ctx.Printf("logging argument: %v", arg)