internal/workflow: set a watchdog on tasks by default

We have an ongoing problem with gomotes that freeze up during the
release process. I've seen hangs during tests and MSI fetching. Maybe
we're missing a keepalive somewhere, maybe there's a bug in the
coordinator or the buildlet, who knows.

Rather than try to set a deadline everywhere, which seems error-prone
and annoying, add a watchdog to each task that is reset whenever the
task logs or explicitly heartbeats.
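
Concretely, the mechanism is roughly the sketch below (simplified; the
real change is in TaskContext and runTask in the diff):

	// Each task's context is canceled if the watchdog ever fires, and
	// logging through the TaskContext pushes the deadline back out.
	ctx, cancel := context.WithCancel(ctx)
	tctx := &TaskContext{
		Context:       ctx,
		watchdogTimer: time.AfterFunc(WatchdogDelay, cancel),
		// other fields as before
	}
	tctx.Printf("still alive") // Printf calls ResetWatchdog, then logs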

The predecessors to this CL should make sure that every task logs at a
reasonable frequency. The long pole is long test builders; the cmd/go
tests can take up to 4 minutes, during which there might not be any
output. Set the watchdog to a very generous 10 minutes, and have
AwaitCondition reset it on every poll.
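
Callers mostly shouldn't have to think about this. A hypothetical
poller (checkMSIReady and the 30-second period are made up for
illustration) would look like:

	// AwaitCondition resets the watchdog on every poll, so quiet waits
	// like this are safe as long as the poll period is well under 10m.
	msi, err := task.AwaitCondition(ctx, 30*time.Second,
		func() (string, bool, error) { return checkMSIReady() })

	// Code that goes quiet without polling can reset it by hand:
	ctx.ResetWatchdog()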

Fixes golang/go#54134.

Change-Id: I921ff9d9b695efac45015846ebd57dbb398aab89
Reviewed-on: https://go-review.googlesource.com/c/build/+/420221
Run-TryBot: Heschi Kreinick <heschi@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Jenny Rakoczy <jenny@golang.org>
diff --git a/internal/task/task.go b/internal/task/task.go
index 129251f..e1a1aa9 100644
--- a/internal/task/task.go
+++ b/internal/task/task.go
@@ -25,16 +25,13 @@
 func AwaitCondition[T any](ctx *workflow.TaskContext, period time.Duration, condition func() (T, bool, error)) (T, error) {
 	pollTimer := time.NewTicker(period / time.Duration(AwaitDivisor))
 	defer pollTimer.Stop()
-	heartbeatTimer := time.NewTicker(time.Minute)
-	defer heartbeatTimer.Stop()
 	for {
 		select {
 		case <-ctx.Done():
 			var zero T
 			return zero, ctx.Err()
-		case <-heartbeatTimer.C:
-			// TODO: reset watchdog
 		case <-pollTimer.C:
+			ctx.ResetWatchdog()
 			res, done, err := condition()
 			if done || err != nil {
 				return res, err
diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go
index 465d0c1..353b786 100644
--- a/internal/workflow/workflow.go
+++ b/internal/workflow/workflow.go
@@ -43,6 +43,7 @@
 	"fmt"
 	"reflect"
 	"strings"
+	"time"
 
 	"github.com/google/uuid"
 )
@@ -378,17 +379,28 @@
 
 // A TaskContext is a context.Context, plus workflow-related features.
 type TaskContext struct {
-	context.Context
 	disableRetries bool
-	Logger
+	context.Context
+	Logger     Logger
 	TaskName   string
 	WorkflowID uuid.UUID
+
+	watchdogTimer *time.Timer
+}
+
+// Printf logs to the task's logger and resets its watchdog timer.
+func (c *TaskContext) Printf(format string, v ...interface{}) {
+	c.ResetWatchdog()
+	c.Logger.Printf(format, v...)
 }
 
 func (c *TaskContext) DisableRetries() {
 	c.disableRetries = true
 }
 
+func (c *TaskContext) ResetWatchdog() {
+	c.watchdogTimer.Reset(WatchdogDelay)
+}
+
 // A Listener is used to notify the workflow host of state changes, for display
 // and persistence.
 type Listener interface {
@@ -671,20 +683,29 @@
 }
 
 // Maximum number of retries. This could be a workflow property.
-const maxRetries = 3
+var MaxRetries = 3
+
+// WatchdogDelay is how long a task may run without logging anything
+// before it is assumed hung and its context is canceled.
+var WatchdogDelay = 10 * time.Minute
 
 func (w *Workflow) runTask(ctx context.Context, listener Listener, state taskState, args []reflect.Value) taskState {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	tctx := &TaskContext{
-		Context:    ctx,
-		Logger:     listener.Logger(w.ID, state.def.name),
-		TaskName:   state.def.name,
-		WorkflowID: w.ID,
+		Context:       ctx,
+		Logger:        listener.Logger(w.ID, state.def.name),
+		TaskName:      state.def.name,
+		WorkflowID:    w.ID,
+		watchdogTimer: time.AfterFunc(WatchdogDelay, cancel),
 	}
+
 	in := append([]reflect.Value{reflect.ValueOf(tctx)}, args...)
 	fv := reflect.ValueOf(state.def.f)
 	out := fv.Call(in)
 
-	if errIdx := len(out) - 1; !out[errIdx].IsNil() {
+	// Stop reports false if the watchdog has already fired and canceled
+	// the task's context; treat that as the task hanging rather than
+	// trusting whatever it returned after cancellation.
+	if !tctx.watchdogTimer.Stop() {
+		state.err = fmt.Errorf("task did not log for %v, assumed hung", WatchdogDelay)
+	} else if errIdx := len(out) - 1; !out[errIdx].IsNil() {
 		state.err = out[errIdx].Interface().(error)
 	}
 	state.finished = true
@@ -698,8 +719,8 @@
 		}
 	}
 
-	if state.err != nil && !tctx.disableRetries && state.retryCount+1 < maxRetries {
-		tctx.Printf("task failed, will retry (%v of %v): %v", state.retryCount+1, maxRetries, state.err)
+	if state.err != nil && !tctx.disableRetries && state.retryCount+1 < MaxRetries {
+		tctx.Printf("task failed, will retry (%v of %v): %v", state.retryCount+1, MaxRetries, state.err)
 		state = taskState{
 			def:        state.def,
 			w:          state.w,
diff --git a/internal/workflow/workflow_test.go b/internal/workflow/workflow_test.go
index cb4ea48..de061d9 100644
--- a/internal/workflow/workflow_test.go
+++ b/internal/workflow/workflow_test.go
@@ -277,6 +277,50 @@
 	}
 }
 
+func TestWatchdog(t *testing.T) {
+	t.Run("success", func(t *testing.T) {
+		testWatchdog(t, true)
+	})
+	t.Run("failure", func(t *testing.T) {
+		testWatchdog(t, false)
+	})
+}
+
+func testWatchdog(t *testing.T, success bool) {
+	defer func(r int, d time.Duration) {
+		wf.MaxRetries = r
+		wf.WatchdogDelay = d
+	}(wf.MaxRetries, wf.WatchdogDelay)
+	wf.MaxRetries = 1
+	wf.WatchdogDelay = 750 * time.Millisecond
+
+	// maybeLog sleeps for two 500ms stretches against a 750ms watchdog,
+	// so it only survives if it logs in between the two.
+	maybeLog := func(ctx *wf.TaskContext) (string, error) {
+		select {
+		case <-ctx.Done():
+			return "", ctx.Err()
+		case <-time.After(500 * time.Millisecond):
+		}
+		if success {
+			ctx.Printf("*snore*")
+		}
+		select {
+		case <-ctx.Done():
+			return "", ctx.Err()
+		case <-time.After(500 * time.Millisecond):
+		}
+		return "huh? what?", nil
+	}
+
+	wd := wf.New()
+	wf.Output(wd, "result", wf.Task0(wd, "sleepy", maybeLog))
+
+	w := startWorkflow(t, wd, nil)
+	_, err := w.Run(context.Background(), &verboseListener{t: t})
+	if (err == nil) != success {
+		t.Errorf("got error %v, wanted success: %v", err, success)
+	}
+}
+
 func TestLogging(t *testing.T) {
 	log := func(ctx *wf.TaskContext, arg string) (string, error) {
 		ctx.Printf("logging argument: %v", arg)