internal/worker: break dependency between file copying and GCS

Reorganize the code that copies files from a GCS bucket so that it no
longer depends directly on GCS: the copying code now takes an
openFileFunc, which tests can stub. That will make it easier to test.

Also move it to scan.go, because it's not specific to vulncheck.
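
For illustration only (the test name and fake contents below are
hypothetical, not part of this change), a test can now exercise the
copying code with an in-memory openFileFunc instead of a real bucket:

    package worker

    import (
        "io"
        "path/filepath"
        "strings"
        "testing"
    )

    func TestCopyToLocalFile(t *testing.T) {
        // Fake openFileFunc: serves fixed bytes, no GCS involved.
        fake := func(name string) (io.ReadCloser, error) {
            return io.NopCloser(strings.NewReader("fake binary contents")), nil
        }
        dest := filepath.Join(t.TempDir(), "bin")
        if err := copyToLocalFile(dest, true, "analysis-binaries/foo", fake); err != nil {
            t.Fatal(err)
        }
    }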

Change-Id: I9cd169d9577a7a08a7a849a099bd68437d8fa835
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/474535
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Zvonimir Pavlinovic <zpavlinovic@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
diff --git a/internal/worker/analysis.go b/internal/worker/analysis.go
index c0b9326..cace21c 100644
--- a/internal/worker/analysis.go
+++ b/internal/worker/analysis.go
@@ -19,7 +19,6 @@
 	"path/filepath"
 	"strings"
 
-	"cloud.google.com/go/storage"
 	"golang.org/x/pkgsite-metrics/internal/analysis"
 	"golang.org/x/pkgsite-metrics/internal/derrors"
 	"golang.org/x/pkgsite-metrics/internal/log"
@@ -32,6 +31,7 @@
 
 type analysisServer struct {
 	*Server
+	openFile openFileFunc // Opens binary files; reads from GCS in production, stubbed in tests.
 }
 
 const analysisBinariesBucketDir = "analysis-binaries"
@@ -112,7 +112,11 @@
 	} else {
 		destPath = path.Join(sandboxRoot, "binaries", path.Base(req.Binary))
 	}
-	if err := copyBinary(ctx, destPath, req.Binary, s.cfg.BinaryBucket); err != nil {
+	if s.cfg.BinaryBucket == "" {
+		return nil, nil, errors.New("missing binary bucket (define GO_ECOSYSTEM_BINARY_BUCKET)")
+	}
+	srcPath := path.Join(analysisBinariesBucketDir, req.Binary)
+	if err := copyToLocalFile(destPath, true, srcPath, s.openFile); err != nil {
 		return nil, nil, err
 	}
 	binaryHash, err = hashFile(destPath)
@@ -163,22 +167,6 @@
 	return h.Sum(nil), nil
 }
 
-// copyBinary copies a binary from srcPath to destPath.
-// If binaryBucket is non-empty, it reads srcPath from that GCS bucket.
-// If binaryBucket is empty, return an error.
-func copyBinary(ctx context.Context, destPath, srcPath, binaryBucket string) error {
-	if binaryBucket == "" {
-		return errors.New("missing binary bucket (define GO_ECOSYSTEM_BINARY_BUCKET)")
-	}
-	c, err := storage.NewClient(ctx)
-	if err != nil {
-		return err
-	}
-	bucket := c.Bucket(binaryBucket)
-	bucketPath := path.Join(analysisBinariesBucketDir, srcPath)
-	return copyFromGCS(ctx, bucket, bucketPath, destPath, true)
-}
-
 // Run the binary on the module.
 func runAnalysisBinary(sbox *sandbox.Sandbox, binaryPath, reqArgs, moduleDir string) (analysis.JSONTree, error) {
 	args := []string{"-json"}
diff --git a/internal/worker/scan.go b/internal/worker/scan.go
index 7c540b5..a3b5c37 100644
--- a/internal/worker/scan.go
+++ b/internal/worker/scan.go
@@ -9,6 +9,7 @@
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"net/http"
 	"os"
 	"os/exec"
@@ -17,6 +18,7 @@
 	"strings"
 	"sync/atomic"
 
+	"cloud.google.com/go/storage"
 	"golang.org/x/pkgsite-metrics/internal/bigquery"
 	"golang.org/x/pkgsite-metrics/internal/config"
 	"golang.org/x/pkgsite-metrics/internal/derrors"
@@ -155,3 +157,44 @@
 	}
 	return client.Upload(ctx, table, row)
 }
+
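+// An openFileFunc opens the named file for reading. In production it is
+// backed by GCS (see gcsOpenFileFunc); tests can supply their own implementation.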
+type openFileFunc func(filename string) (io.ReadCloser, error)
+
+// copyToLocalFile opens destPath for writing locally, making it executable if specified.
+// It then uses openFile to open srcPath and copies it to the local file.
+func copyToLocalFile(destPath string, executable bool, srcPath string, openFile openFileFunc) (err error) {
+	defer derrors.Wrap(&err, "copyToLocalFile(%q, %q)", destPath, srcPath)
+
+	var mode os.FileMode
+	if executable {
+		mode = 0755
+	} else {
+		mode = 0644
+	}
+	destf, err := os.OpenFile(destPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, mode)
+	if err != nil {
+		return err
+	}
+	rc, err := openFile(srcPath)
+	if err != nil {
+		destf.Close() // don't leak the destination file on error
+		return err
+	}
+	defer rc.Close()
+	return copyAndClose(destf, rc)
+}
+
+// copyAndClose copies r to wc and closes wc.
+func copyAndClose(wc io.WriteCloser, r io.Reader) error {
+	_, err := io.Copy(wc, r)
+	err2 := wc.Close()
+	if err == nil {
+		err = err2
+	}
+	return err
+}
+
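+// gcsOpenFileFunc returns an openFileFunc that reads the named object from bucket.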
+func gcsOpenFileFunc(ctx context.Context, bucket *storage.BucketHandle) openFileFunc {
+	return func(name string) (io.ReadCloser, error) {
+		return bucket.Object(name).NewReader(ctx)
+	}
+}
diff --git a/internal/worker/server.go b/internal/worker/server.go
index c7cd9fd..e3b8c92 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -17,6 +17,7 @@
 	"time"
 
 	"cloud.google.com/go/errorreporting"
+	"cloud.google.com/go/storage"
 	"github.com/google/safehtml/template"
 	"golang.org/x/pkgsite-metrics/internal/bigquery"
 	"golang.org/x/pkgsite-metrics/internal/config"
@@ -187,7 +188,19 @@
 }
 
 func (s *Server) registerAnalysisHandlers(ctx context.Context) error {
-	h := &analysisServer{s}
+	if s.cfg.BinaryBucket == "" {
+		return errors.New("missing binary bucket (define GO_ECOSYSTEM_BINARY_BUCKET)")
+	}
+	c, err := storage.NewClient(ctx)
+	if err != nil {
+		return err
+	}
+	bucket := c.Bucket(s.cfg.BinaryBucket)
+
+	h := &analysisServer{
+		Server:   s,
+		openFile: gcsOpenFileFunc(ctx, bucket),
+	}
 	s.handle("/analysis/scan/", h.handleScan)
 	s.handle("/analysis/enqueue", h.handleEnqueue)
 	return nil
diff --git a/internal/worker/vulncheck_scan.go b/internal/worker/vulncheck_scan.go
index 8bcd142..10bc700 100644
--- a/internal/worker/vulncheck_scan.go
+++ b/internal/worker/vulncheck_scan.go
@@ -400,7 +400,12 @@
 		return nil, err
 	}
 	defer os.Remove(destf.Name())
-	if err := copyFromGCSToWriter(ctx, destf, s.gcsBucket, gcsPathname); err != nil {
+	rc, err := s.gcsBucket.Object(gcsPathname).NewReader(ctx)
+	if err != nil {
+		return nil, err
+	}
+	defer rc.Close()
+	if err := copyAndClose(destf, rc); err != nil {
 		return nil, err
 	}
 	log.Infof(ctx, "%s@%s/%s: running vulncheck in sandbox on %s", modulePath, version, binDir, destf.Name())
@@ -604,7 +609,7 @@
 	log.Debug(ctx, "copying to temp dir",
 		"from", gcsPathname, "module", modulePath, "version", version, "dir", binDir)
 	localPathname := filepath.Join(tempDir, "binary")
-	if err := copyFromGCS(ctx, s.gcsBucket, gcsPathname, localPathname, false); err != nil {
+	if err := copyToLocalFile(localPathname, false, gcsPathname, gcsOpenFileFunc(ctx, s.gcsBucket)); err != nil {
 		return nil, err
 	}
 
@@ -630,26 +635,6 @@
 	return res.Vulns, nil
 }
 
-func copyFromGCS(ctx context.Context, bucket *storage.BucketHandle, srcPath, destPath string, executable bool) (err error) {
-	defer derrors.Wrap(&err, "copyFromGCS(%q, %q)", srcPath, destPath)
-	var mode os.FileMode
-	if executable {
-		mode = 0755
-	} else {
-		mode = 0644
-	}
-	destf, err := os.OpenFile(destPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, mode)
-	if err != nil {
-		return err
-	}
-	err1 := copyFromGCSToWriter(ctx, destf, bucket, srcPath)
-	err2 := destf.Close()
-	if err1 != nil {
-		return err1
-	}
-	return err2
-}
-
 func copyFromGCSToWriter(ctx context.Context, w io.Writer, bucket *storage.BucketHandle, srcPath string) error {
 	gcsReader, err := bucket.Object(srcPath).NewReader(ctx)
 	if err != nil {