internal/relui: add GCS and staging support

Plumb through the FS implementations we can use to store scratch files
and stage binaries for signing. Use it to do both, replacing the prior
WorkingFiles interface.

Includes some enhancements to the OS fs implementation, reflecting the
fact that it's more of a test fake for the GCS fs than standalone. In
particular, directories are implicitly created.

Introduce an artifact type, used to track the data we'll eventually need
to register it with the web server. This changes slightly in the next CL.

For golang/go#51797.

Change-Id: Iebf53eb43587f5311e428c319d2f3b44aea9fa97
Reviewed-on: https://go-review.googlesource.com/c/build/+/403056
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
Run-TryBot: Heschi Kreinick <heschi@google.com>
Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/cmd/relui/deployment-prod.yaml b/cmd/relui/deployment-prod.yaml
index 16ba7e2..73a6c5a 100644
--- a/cmd/relui/deployment-prod.yaml
+++ b/cmd/relui/deployment-prod.yaml
@@ -31,6 +31,8 @@
             - "--gerrit-api-secret=secret:symbolic-datum-552/gobot-password"
             - "--twitter-api-secret=secret:symbolic-datum-552/twitter-api-secret"
             - "--builder-master-key=secret:symbolic-datum-552/builder-master-key"
+            - "--scratch-files-base=gs://golang-release-staging/relui-scratch/"
+            - "--staging-files-base=gs://golang-release-staging/"
           ports:
             - containerPort: 444
           env:
diff --git a/cmd/relui/main.go b/cmd/relui/main.go
index 8d85533..30e175b 100644
--- a/cmd/relui/main.go
+++ b/cmd/relui/main.go
@@ -13,10 +13,11 @@
 	"fmt"
 	"io"
 	"log"
+	"math/rand"
 	"net/url"
-	"os"
-	"path/filepath"
+	"time"
 
+	"cloud.google.com/go/storage"
 	"github.com/jackc/pgx/v4/pgxpool"
 	"golang.org/x/build"
 	"golang.org/x/build/buildlet"
@@ -35,9 +36,13 @@
 	downUp      = flag.Bool("migrate-down-up", false, "Run all Up migration steps, then the last down migration step, followed by the final up migration. Exits after completion.")
 	migrateOnly = flag.Bool("migrate-only", false, "Exit after running migrations. Migrations are run by default.")
 	pgConnect   = flag.String("pg-connect", "", "Postgres connection string or URI. If empty, libpq connection defaults are used.")
+
+	scratchFilesBase = flag.String("scratch-files-base", "", "Storage for scratch files. gs://bucket/path or file:///path/to/scratch.")
+	stagingFilesBase = flag.String("staging-files-base", "", "Storage for staging files. gs://bucket/path or file:///path/to/staging.")
 )
 
 func main() {
+	rand.Seed(time.Now().Unix())
 	if err := secret.InitFlagSupport(context.Background()); err != nil {
 		log.Fatalln(err)
 	}
@@ -91,11 +96,20 @@
 		},
 		Instance: build.ProdCoordinator,
 	}
-	log.Printf("Coordinator client: %#v", coordinator)
 	if _, err := coordinator.RemoteBuildlets(); err != nil {
-		log.Printf("Broken coordinator client: %v", err)
+		log.Fatalf("Broken coordinator client: %v", err)
 	}
-	relui.RegisterBuildReleaseWorkflows(dh, &osFiles{"/tmp"}, coordinator.CreateBuildlet)
+	gcsClient, err := storage.NewClient(ctx)
+	if err != nil {
+		log.Fatalf("Could not connect to GCS: %v", err)
+	}
+	releaseTasks := &relui.BuildReleaseTasks{
+		CreateBuildlet: coordinator.CreateBuildlet,
+		GCSClient:      gcsClient,
+		ScratchURL:     *scratchFilesBase,
+		StagingURL:     *stagingFilesBase,
+	}
+	releaseTasks.RegisterBuildReleaseWorkflows(dh)
 	db, err := pgxpool.Connect(ctx, *pgConnect)
 	if err != nil {
 		log.Fatal(err)
@@ -125,15 +139,3 @@
 	io.WriteString(h, principal)
 	return fmt.Sprintf("%x", h.Sum(nil))
 }
-
-type osFiles struct {
-	basePath string
-}
-
-func (f *osFiles) Create(path string) (io.WriteCloser, error) {
-	return os.Create(filepath.Join(f.basePath, path))
-}
-
-func (f *osFiles) Open(path string) (io.ReadCloser, error) {
-	return os.Open(filepath.Join(f.basePath, path))
-}
diff --git a/internal/gcsfs/gcsfs.go b/internal/gcsfs/gcsfs.go
index 88234f9..c44e176 100644
--- a/internal/gcsfs/gcsfs.go
+++ b/internal/gcsfs/gcsfs.go
@@ -8,8 +8,10 @@
 import (
 	"context"
 	"errors"
+	"fmt"
 	"io"
 	"io/fs"
+	"net/url"
 	"path"
 	"strings"
 	"time"
@@ -18,6 +20,30 @@
 	"google.golang.org/api/iterator"
 )
 
+// FromURL creates a new FS from a file:// or gs:// URL.
+// client is only used for gs:// URLs and can be nil otherwise.
+func FromURL(ctx context.Context, client *storage.Client, base string) (fs.FS, error) {
+	u, err := url.Parse(base)
+	if err != nil {
+		return nil, err
+	}
+	switch u.Scheme {
+	case "gs":
+		if u.Host == "" {
+			return nil, fmt.Errorf("missing bucket in %q", base)
+		}
+		fsys := NewFS(ctx, client, u.Host)
+		if prefix := strings.TrimPrefix(u.Path, "/"); prefix != "" {
+			return fs.Sub(fsys, prefix)
+		}
+		return fsys, nil
+	case "file":
+		return DirFS(u.Path), nil
+	default:
+		return nil, fmt.Errorf("unsupported scheme %q", u.Scheme)
+	}
+}
+
 // Create creates a new file on fsys, which must be a CreateFS.
 func Create(fsys fs.FS, name string) (WriteFile, error) {
 	cfs, ok := fsys.(CreateFS)
diff --git a/internal/gcsfs/osfs.go b/internal/gcsfs/osfs.go
index f9da94e..84ea66a 100644
--- a/internal/gcsfs/osfs.go
+++ b/internal/gcsfs/osfs.go
@@ -10,6 +10,8 @@
 var _ = fs.FS((*dirFS)(nil))
 var _ = CreateFS((*dirFS)(nil))
 
+// DirFS is a variant of os.DirFS that supports file creation and is a suitable
+// test fake for the GCS FS.
 func DirFS(dir string) fs.FS {
 	return dirFS(dir)
 }
@@ -53,7 +55,11 @@
 	if !fs.ValidPath(name) || runtime.GOOS == "windows" && containsAny(name, `\:`) {
 		return nil, &fs.PathError{Op: "open", Path: name, Err: fs.ErrInvalid}
 	}
-	f, err := os.Create(string(dir) + "/" + name)
+	fullName := path.Join(string(dir), name)
+	if err := os.MkdirAll(path.Dir(fullName), 0700); err != nil {
+		return nil, err
+	}
+	f, err := os.Create(fullName)
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/relui/workflows.go b/internal/relui/workflows.go
index 7e11c19..f6ae866 100644
--- a/internal/relui/workflows.go
+++ b/internal/relui/workflows.go
@@ -5,13 +5,18 @@
 package relui
 
 import (
+	"crypto/sha256"
 	"fmt"
 	"io"
+	"math/rand"
+	"path"
 	"strings"
 	"sync"
 
+	"cloud.google.com/go/storage"
 	"golang.org/x/build/buildlet"
 	"golang.org/x/build/dashboard"
+	"golang.org/x/build/internal/gcsfs"
 	"golang.org/x/build/internal/releasetargets"
 	"golang.org/x/build/internal/task"
 	"golang.org/x/build/internal/workflow"
@@ -176,13 +181,13 @@
 	return arg, nil
 }
 
-func RegisterBuildReleaseWorkflows(h *DefinitionHolder, files WorkingFiles, createBuildlet func(string) (buildlet.Client, error)) {
-	go117, err := newBuildReleaseWorkflow("go1.17", files, createBuildlet)
+func (tasks *BuildReleaseTasks) RegisterBuildReleaseWorkflows(h *DefinitionHolder) {
+	go117, err := tasks.newBuildReleaseWorkflow("go1.17")
 	if err != nil {
 		panic(err)
 	}
 	h.RegisterDefinition("Release Go 1.17", go117)
-	go118, err := newBuildReleaseWorkflow("go1.18", files, createBuildlet)
+	go118, err := tasks.newBuildReleaseWorkflow("go1.18")
 	if err != nil {
 		panic(err)
 	}
@@ -190,7 +195,7 @@
 
 }
 
-func newBuildReleaseWorkflow(majorVersion string, files WorkingFiles, createBuildlet func(string) (buildlet.Client, error)) (*workflow.Definition, error) {
+func (tasks *BuildReleaseTasks) newBuildReleaseWorkflow(majorVersion string) (*workflow.Definition, error) {
 	wd := workflow.New()
 	targets, ok := releasetargets.TargetsForVersion(majorVersion)
 	if !ok {
@@ -199,7 +204,6 @@
 	version := wd.Parameter(workflow.Parameter{Name: "Version", Example: "go1.10.1"})
 	revision := wd.Parameter(workflow.Parameter{Name: "Revision", Example: "release-branch.go1.10"})
 	skipTests := wd.Parameter(workflow.Parameter{Name: "Targets to skip testing (space-separated target names or 'all') (optional)"})
-	tasks := buildReleaseTasks{files, createBuildlet}
 
 	source := wd.Task("Build source archive", tasks.buildSource, revision, version)
 	// Artifact file paths.
@@ -230,44 +234,46 @@
 			testResults = append(testResults, long)
 		}
 	}
-	// Eventually we need to upload artifacts and perhaps summarize test results.
+	// Eventually we need to sign artifacts and perhaps summarize test results.
 	// For now, just mush them all together.
-	results := wd.Task("Combine results", combineResults, wd.Slice(artifacts), wd.Slice(testResults))
+	stagedArtifacts := wd.Task("Stage artifacts for signing", tasks.copyToStaging, wd.Slice(artifacts))
+	results := wd.Task("Combine results", combineResults, stagedArtifacts, wd.Slice(testResults))
 	wd.Output("Build results", results)
 	return wd, nil
 }
 
-// buildReleaseTasks serves as an adapter to the various build tasks in the task package.
-type buildReleaseTasks struct {
-	fs             WorkingFiles
-	createBuildlet func(string) (buildlet.Client, error)
+// BuildReleaseTasks serves as an adapter to the various build tasks in the task package.
+type BuildReleaseTasks struct {
+	GCSClient              *storage.Client
+	ScratchURL, StagingURL string
+	CreateBuildlet         func(string) (buildlet.Client, error)
 }
 
-func (b *buildReleaseTasks) buildSource(ctx *workflow.TaskContext, revision, version string) (string, error) {
-	return b.runBuildStep(ctx, nil, "", "", "source.tar.gz", func(_ *task.BuildletStep, _ io.Reader, w io.Writer) error {
+func (b *BuildReleaseTasks) buildSource(ctx *workflow.TaskContext, revision, version string) (artifact, error) {
+	return b.runBuildStep(ctx, nil, "", artifact{}, "source.tar.gz", func(_ *task.BuildletStep, _ io.Reader, w io.Writer) error {
 		return task.WriteSourceArchive(ctx, revision, version, w)
 	})
 }
 
-func (b *buildReleaseTasks) buildBinary(ctx *workflow.TaskContext, target *releasetargets.Target, source string) (string, error) {
+func (b *BuildReleaseTasks) buildBinary(ctx *workflow.TaskContext, target *releasetargets.Target, source artifact) (artifact, error) {
 	return b.runBuildStep(ctx, target, target.Builder, source, target.Name+"-binary.tar.gz", func(bs *task.BuildletStep, r io.Reader, w io.Writer) error {
 		return bs.BuildBinary(ctx, r, w)
 	})
 }
 
-func (b *buildReleaseTasks) buildMSI(ctx *workflow.TaskContext, target *releasetargets.Target, binary string) (string, error) {
+func (b *BuildReleaseTasks) buildMSI(ctx *workflow.TaskContext, target *releasetargets.Target, binary artifact) (artifact, error) {
 	return b.runBuildStep(ctx, target, target.Builder, binary, target.Name+".msi", func(bs *task.BuildletStep, r io.Reader, w io.Writer) error {
 		return bs.BuildMSI(ctx, r, w)
 	})
 }
 
-func (b *buildReleaseTasks) convertToZip(ctx *workflow.TaskContext, target *releasetargets.Target, binary string) (string, error) {
+func (b *BuildReleaseTasks) convertToZip(ctx *workflow.TaskContext, target *releasetargets.Target, binary artifact) (artifact, error) {
 	return b.runBuildStep(ctx, nil, "", binary, target.Name+".zip", func(_ *task.BuildletStep, r io.Reader, w io.Writer) error {
 		return task.ConvertTGZToZIP(r, w)
 	})
 }
 
-func (b *buildReleaseTasks) runTests(ctx *workflow.TaskContext, target *releasetargets.Target, buildlet, skipTests, binary string) (string, error) {
+func (b *BuildReleaseTasks) runTests(ctx *workflow.TaskContext, target *releasetargets.Target, buildlet, skipTests string, binary artifact) (string, error) {
 	skipped := skipTests == "all"
 	skipTargets := strings.Fields(skipTests)
 	for _, skip := range skipTargets {
@@ -277,11 +283,12 @@
 	}
 	if skipped {
 		ctx.Printf("Skipping test")
-		return "", nil
+		return "skipped", nil
 	}
-	return b.runBuildStep(ctx, target, buildlet, binary, "", func(bs *task.BuildletStep, r io.Reader, _ io.Writer) error {
+	_, err := b.runBuildStep(ctx, target, buildlet, binary, "", func(bs *task.BuildletStep, r io.Reader, _ io.Writer) error {
 		return bs.TestTarget(ctx, r)
 	})
+	return "", err
 }
 
 // runBuildStep is a convenience function that manages resources a build step might need.
@@ -289,27 +296,29 @@
 // If inputName is specified, it will be opened and passed as a Reader to f.
 // If outputName is specified, a unique filename will be generated based off it, the file
 // will be opened and passed as a Writer to f, and its name will be returned as the result.
-func (b *buildReleaseTasks) runBuildStep(
+func (b *BuildReleaseTasks) runBuildStep(
 	ctx *workflow.TaskContext,
 	target *releasetargets.Target,
-	buildletName, inputName, outputName string,
+	buildletName string,
+	input artifact,
+	outputBase string,
 	f func(*task.BuildletStep, io.Reader, io.Writer) error,
-) (string, error) {
+) (artifact, error) {
 	if (target == nil) != (buildletName == "") {
-		return "", fmt.Errorf("target and buildlet must be specified together")
+		return artifact{}, fmt.Errorf("target and buildlet must be specified together")
 	}
 
 	var step *task.BuildletStep
 	if target != nil {
 		ctx.Printf("Creating buildlet %v.", buildletName)
-		client, err := b.createBuildlet(buildletName)
+		client, err := b.CreateBuildlet(buildletName)
 		if err != nil {
-			return "", err
+			return artifact{}, err
 		}
 		defer client.Close()
 		buildConfig, ok := dashboard.Builders[buildletName]
 		if !ok {
-			return "", fmt.Errorf("unknown builder: %v", buildConfig)
+			return artifact{}, fmt.Errorf("unknown builder: %v", buildConfig)
 		}
 		step = &task.BuildletStep{
 			Target:      target,
@@ -320,46 +329,115 @@
 		ctx.Printf("Buildlet ready.")
 	}
 
+	scratchFS, err := gcsfs.FromURL(ctx, b.GCSClient, b.ScratchURL)
+	if err != nil {
+		return artifact{}, err
+	}
 	var in io.ReadCloser
-	var err error
-	if inputName != "" {
-		in, err = b.fs.Open(inputName)
+	if input.scratchPath != "" {
+		in, err = scratchFS.Open(input.scratchPath)
 		if err != nil {
-			return "", err
+			return artifact{}, err
 		}
 		defer in.Close()
 	}
 	var out io.WriteCloser
-	if outputName != "" {
-		out, err = b.fs.Create(outputName)
+	var outputName string
+	hash := sha256.New()
+	size := &sizeWriter{}
+	var multiOut io.Writer
+	if outputBase != "" {
+		outputName = fmt.Sprintf("%v/%v-%v", ctx.WorkflowID.String(), outputBase, rand.Int63())
+		out, err = gcsfs.Create(scratchFS, outputName)
 		if err != nil {
-			return "", err
+			return artifact{}, err
 		}
 		defer out.Close()
+		multiOut = io.MultiWriter(out, hash, size)
 	}
-	if err := f(step, in, out); err != nil {
-		return "", err
+	// Hide in's Close method from the task, which may assert it to Closer.
+	nopIn := io.NopCloser(in)
+	if err := f(step, nopIn, multiOut); err != nil {
+		return artifact{}, err
 	}
 	if step != nil {
 		if err := step.Buildlet.Close(); err != nil {
-			return "", err
+			return artifact{}, err
 		}
 	}
-	// Don't check the error from in.Close: the steps may assert it down to
-	// Closer and close it themselves. (See https://pkg.go.dev/net/http#NewRequestWithContext.)
+	if in != nil {
+		if err := in.Close(); err != nil {
+			return artifact{}, err
+		}
+	}
 	if out != nil {
 		if err := out.Close(); err != nil {
-			return "", err
+			return artifact{}, err
 		}
 	}
-	return outputName, nil
+	return artifact{
+		target:      target,
+		scratchPath: outputName,
+		filename:    outputBase,
+		sha256:      fmt.Sprintf("%x", string(hash.Sum([]byte(nil)))),
+		size:        size.size,
+	}, nil
 }
 
-func combineResults(ctx *workflow.TaskContext, artifacts, tests []string) (string, error) {
-	return strings.Join(artifacts, "\n") + strings.Join(tests, "\n"), nil
+type artifact struct {
+	target      *releasetargets.Target
+	scratchPath string
+	stagingPath string
+	filename    string
+	sha256      string
+	size        int
 }
 
-type WorkingFiles interface {
-	Create(string) (io.WriteCloser, error)
-	Open(string) (io.ReadCloser, error)
+type sizeWriter struct {
+	size int
+}
+
+func (w *sizeWriter) Write(p []byte) (n int, err error) {
+	w.size += len(p)
+	return len(p), nil
+}
+
+func combineResults(ctx *workflow.TaskContext, artifacts []artifact, tests []string) (string, error) {
+	return fmt.Sprintf("%#v\n\n", artifacts) + strings.Join(tests, "\n"), nil
+}
+
+func (tasks *BuildReleaseTasks) copyToStaging(ctx *workflow.TaskContext, artifacts []artifact) ([]artifact, error) {
+	scratchFS, err := gcsfs.FromURL(ctx, tasks.GCSClient, tasks.ScratchURL)
+	if err != nil {
+		return nil, err
+	}
+	stagingFS, err := gcsfs.FromURL(ctx, tasks.GCSClient, tasks.StagingURL)
+	if err != nil {
+		return nil, err
+	}
+	var stagedArtifacts []artifact
+	for _, a := range artifacts {
+		staged := a
+		staged.stagingPath = path.Join(ctx.WorkflowID.String(), a.filename)
+		stagedArtifacts = append(stagedArtifacts, staged)
+
+		in, err := scratchFS.Open(a.scratchPath)
+		if err != nil {
+			return nil, err
+		}
+		out, err := gcsfs.Create(stagingFS, staged.stagingPath)
+		if err != nil {
+			return nil, err
+		}
+		if _, err := io.Copy(out, in); err != nil {
+			return nil, err
+		}
+		if err := in.Close(); err != nil {
+			return nil, err
+		}
+		if err := out.Close(); err != nil {
+			return nil, err
+		}
+	}
+	return artifacts, nil
 }
diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go
index 13035f3..068076b 100644
--- a/internal/workflow/workflow.go
+++ b/internal/workflow/workflow.go
@@ -254,6 +254,7 @@
 type TaskContext struct {
 	context.Context
 	Logger
+	WorkflowID uuid.UUID
 }
 
 // A Listener is used to notify the workflow host of state changes, for display
@@ -520,7 +521,11 @@
 }
 
 func (w *Workflow) runTask(ctx context.Context, listener Listener, state taskState, args []reflect.Value) taskState {
-	tctx := &TaskContext{Context: ctx, Logger: listener.Logger(w.ID, state.def.name)}
+	tctx := &TaskContext{
+		Context:    ctx,
+		Logger:     listener.Logger(w.ID, state.def.name),
+		WorkflowID: w.ID,
+	}
 	in := append([]reflect.Value{reflect.ValueOf(tctx)}, args...)
 	out := reflect.ValueOf(state.def.f).Call(in)
 	var err error