internal/task: log buildlet output in chunks

Rather than buffering buildlet output and only showing it in case of
error, stream it to a user-supplied Writer. In relui, buffer 10
seconds of output, or up to 1MB, then dump everything up to the most
recent newline to the logger.

For golang/go#54134.

Change-Id: Ie0eb9af3aa6c3455afd03b358722924e38058063
Reviewed-on: https://go-review.googlesource.com/c/build/+/420541
Reviewed-by: Jenny Rakoczy <jenny@golang.org>
Run-TryBot: Heschi Kreinick <heschi@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/cmd/release/release.go b/cmd/release/release.go
index b4cdab2..adab397 100644
--- a/cmd/release/release.go
+++ b/cmd/release/release.go
@@ -151,7 +151,7 @@
 			Target:      target,
 			Buildlet:    client,
 			BuildConfig: buildConfig,
-			Watch:       watch,
+			LogWriter:   os.Stdout,
 		}
 		if err := f(buildletStep); err != nil {
 			return err
diff --git a/internal/relui/workflows.go b/internal/relui/workflows.go
index f448f00..7dadbbb 100644
--- a/internal/relui/workflows.go
+++ b/internal/relui/workflows.go
@@ -761,11 +761,13 @@
 			return artifact{}, err
 		}
 		defer client.Close()
+		w := &logWriter{logger: ctx.Logger}
+		go w.run(ctx)
 		step = &task.BuildletStep{
 			Target:      target,
 			Buildlet:    client,
 			BuildConfig: build,
-			Watch:       true,
+			LogWriter:   w,
 		}
 		ctx.Printf("Buildlet ready.")
 	}
@@ -866,6 +868,62 @@
 	return len(p), nil
 }
 
+type logWriter struct {
+	flushTicker *time.Ticker
+
+	mu     sync.Mutex
+	buf    []byte
+	logger wf.Logger
+}
+
+func (w *logWriter) Write(b []byte) (int, error) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+
+	w.buf = append(w.buf, b...)
+	if len(w.buf) > 1<<20 {
+		w.flushLocked(false)
+		w.flushTicker.Reset(10 * time.Second)
+	}
+	return len(b), nil
+}
+
+func (w *logWriter) flush(force bool) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	w.flushLocked(force)
+}
+
+func (w *logWriter) flushLocked(force bool) {
+	if len(w.buf) == 0 {
+		return
+	}
+	log, rest := w.buf, []byte(nil)
+	if !force {
+		nl := bytes.LastIndexByte(w.buf, '\n')
+		if nl == -1 {
+			return
+		}
+		log, rest = w.buf[:nl], w.buf[nl+1:]
+	}
+	w.logger.Printf("\n%s", string(log))
+	w.buf = append([]byte(nil), rest...) // don't leak
+}
+
+func (w *logWriter) run(ctx context.Context) {
+	w.flushTicker = time.NewTicker(10 * time.Second)
+	defer w.flushTicker.Stop()
+	for {
+		select {
+		case <-w.flushTicker.C:
+			w.flush(false)
+		case <-ctx.Done():
+			w.flush(true)
+			return
+		}
+	}
+}
+
 func (tasks *BuildReleaseTasks) startSigningCommand(ctx *wf.TaskContext, version string) (string, error) {
 	args := fmt.Sprintf("--relui_staging=%q", tasks.ScratchURL+"/"+signingStagingDir(ctx, version))
 	ctx.Printf("run signer with " + args)
diff --git a/internal/task/buildrelease.go b/internal/task/buildrelease.go
index 355ab3e..0a93f12 100644
--- a/internal/task/buildrelease.go
+++ b/internal/task/buildrelease.go
@@ -7,14 +7,12 @@
 import (
 	"archive/tar"
 	"archive/zip"
-	"bytes"
 	"compress/gzip"
 	"context"
 	_ "embed"
 	"fmt"
 	"io"
 	"net/http"
-	"os"
 	"path"
 	"regexp"
 	"strings"
@@ -219,7 +217,7 @@
 	Target      *releasetargets.Target
 	Buildlet    buildlet.Client
 	BuildConfig *dashboard.BuildConfig
-	Watch       bool
+	LogWriter   io.Writer
 }
 
 // BuildBinary builds a binary distribution from sourceArchive and writes it to out.
@@ -394,19 +392,15 @@
 	// Set up build environment. The caller's environment wins if there's a conflict.
 	env := append(b.BuildConfig.Env(), "GOPATH="+work+"/gopath")
 	env = append(env, opts.ExtraEnv...)
-	out := &bytes.Buffer{}
-	opts.Output = out
+	opts.Output = b.LogWriter
 	opts.ExtraEnv = env
 	opts.Args = args
-	if b.Watch {
-		opts.Output = io.MultiWriter(opts.Output, os.Stdout)
-	}
 	remoteErr, execErr := b.Buildlet.Exec(ctx, cmd, opts)
 	if execErr != nil {
 		return execErr
 	}
 	if remoteErr != nil {
-		return fmt.Errorf("Command %v %s failed: %v\nOutput:\n%v", cmd, args, remoteErr, out)
+		return fmt.Errorf("Command %v %s failed: %v", cmd, args, remoteErr)
 	}
 
 	return nil