internal/worker: break dependency between file copying and GCS
Reorganize the code that copies files from a GCS bucket to remove
the direct dependence on GCS. That will enable easier testing.
Also move it to scan.go, because it's not specific to vulncheck.
Change-Id: I9cd169d9577a7a08a7a849a099bd68437d8fa835
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/474535
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Zvonimir Pavlinovic <zpavlinovic@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
diff --git a/internal/worker/analysis.go b/internal/worker/analysis.go
index c0b9326..cace21c 100644
--- a/internal/worker/analysis.go
+++ b/internal/worker/analysis.go
@@ -19,7 +19,6 @@
"path/filepath"
"strings"
- "cloud.google.com/go/storage"
"golang.org/x/pkgsite-metrics/internal/analysis"
"golang.org/x/pkgsite-metrics/internal/derrors"
"golang.org/x/pkgsite-metrics/internal/log"
@@ -32,6 +31,7 @@
type analysisServer struct {
*Server
+ openFile openFileFunc // Used to open binary files from GCS, except for testing.
}
const analysisBinariesBucketDir = "analysis-binaries"
@@ -112,7 +112,11 @@
} else {
destPath = path.Join(sandboxRoot, "binaries", path.Base(req.Binary))
}
- if err := copyBinary(ctx, destPath, req.Binary, s.cfg.BinaryBucket); err != nil {
+ if s.cfg.BinaryBucket == "" {
+ return nil, nil, errors.New("missing binary bucket (define GO_ECOSYSTEM_BINARY_BUCKET)")
+ }
+ srcPath := path.Join(analysisBinariesBucketDir, req.Binary)
+ if err := copyToLocalFile(destPath, true, srcPath, s.openFile); err != nil {
return nil, nil, err
}
binaryHash, err = hashFile(destPath)
@@ -163,22 +167,6 @@
return h.Sum(nil), nil
}
-// copyBinary copies a binary from srcPath to destPath.
-// If binaryBucket is non-empty, it reads srcPath from that GCS bucket.
-// If binaryBucket is empty, return an error.
-func copyBinary(ctx context.Context, destPath, srcPath, binaryBucket string) error {
- if binaryBucket == "" {
- return errors.New("missing binary bucket (define GO_ECOSYSTEM_BINARY_BUCKET)")
- }
- c, err := storage.NewClient(ctx)
- if err != nil {
- return err
- }
- bucket := c.Bucket(binaryBucket)
- bucketPath := path.Join(analysisBinariesBucketDir, srcPath)
- return copyFromGCS(ctx, bucket, bucketPath, destPath, true)
-}
-
// Run the binary on the module.
func runAnalysisBinary(sbox *sandbox.Sandbox, binaryPath, reqArgs, moduleDir string) (analysis.JSONTree, error) {
args := []string{"-json"}
diff --git a/internal/worker/scan.go b/internal/worker/scan.go
index 7c540b5..a3b5c37 100644
--- a/internal/worker/scan.go
+++ b/internal/worker/scan.go
@@ -9,6 +9,7 @@
"encoding/json"
"errors"
"fmt"
+ "io"
"net/http"
"os"
"os/exec"
@@ -17,6 +18,7 @@
"strings"
"sync/atomic"
+ "cloud.google.com/go/storage"
"golang.org/x/pkgsite-metrics/internal/bigquery"
"golang.org/x/pkgsite-metrics/internal/config"
"golang.org/x/pkgsite-metrics/internal/derrors"
@@ -155,3 +157,44 @@
}
return client.Upload(ctx, table, row)
}
+
+type openFileFunc func(filename string) (io.ReadCloser, error)
+
+// copyToLocalFile opens destPath for writing locally, making it executable if specified.
+// It then uses openFile to open srcPath and copies it to the local file.
+func copyToLocalFile(destPath string, executable bool, srcPath string, openFile openFileFunc) (err error) {
+	defer derrors.Wrap(&err, "copyToLocalFile(%q, %q)", destPath, srcPath)
+	// 0755 so copied binaries can be executed; 0644 for plain data files.
+	var mode os.FileMode
+	if executable {
+		mode = 0755
+	} else {
+		mode = 0644
+	}
+	destf, err := os.OpenFile(destPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, mode)
+	if err != nil {
+		return err
+	}
+	rc, err := openFile(srcPath)
+	if err != nil {
+		destf.Close() // don't leak the descriptor when the source can't be opened
+		return err
+	}
+	defer rc.Close()
+	return copyAndClose(destf, rc)
+}
+
+// copyAndClose copies all of r to wc and then closes wc; a copy error takes precedence over a close error.
+func copyAndClose(wc io.WriteCloser, r io.Reader) error {
+	_, err := io.Copy(wc, r)
+	err2 := wc.Close() // always close, even if the copy failed
+	if err == nil {
+		err = err2
+	}
+	return err
+}
+
+func gcsOpenFileFunc(ctx context.Context, bucket *storage.BucketHandle) openFileFunc { // adapts a GCS bucket to the openFileFunc seam used for testing
+	return func(name string) (io.ReadCloser, error) {
+		return bucket.Object(name).NewReader(ctx) // streams the object; callers must Close the reader
+	}
+}
diff --git a/internal/worker/server.go b/internal/worker/server.go
index c7cd9fd..e3b8c92 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -17,6 +17,7 @@
"time"
"cloud.google.com/go/errorreporting"
+ "cloud.google.com/go/storage"
"github.com/google/safehtml/template"
"golang.org/x/pkgsite-metrics/internal/bigquery"
"golang.org/x/pkgsite-metrics/internal/config"
@@ -187,7 +188,19 @@
}
func (s *Server) registerAnalysisHandlers(ctx context.Context) error {
- h := &analysisServer{s}
+ if s.cfg.BinaryBucket == "" {
+ return errors.New("missing binary bucket (define GO_ECOSYSTEM_BINARY_BUCKET)")
+ }
+ c, err := storage.NewClient(ctx)
+ if err != nil {
+ return err
+ }
+ bucket := c.Bucket(s.cfg.BinaryBucket)
+
+ h := &analysisServer{
+ Server: s,
+ openFile: gcsOpenFileFunc(ctx, bucket),
+ }
s.handle("/analysis/scan/", h.handleScan)
s.handle("/analysis/enqueue", h.handleEnqueue)
return nil
diff --git a/internal/worker/vulncheck_scan.go b/internal/worker/vulncheck_scan.go
index 8bcd142..10bc700 100644
--- a/internal/worker/vulncheck_scan.go
+++ b/internal/worker/vulncheck_scan.go
@@ -400,7 +400,12 @@
return nil, err
}
defer os.Remove(destf.Name())
- if err := copyFromGCSToWriter(ctx, destf, s.gcsBucket, gcsPathname); err != nil {
+ rc, err := s.gcsBucket.Object(gcsPathname).NewReader(ctx)
+ if err != nil {
+ return nil, err
+ }
+ defer rc.Close()
+ if err := copyAndClose(destf, rc); err != nil {
return nil, err
}
log.Infof(ctx, "%s@%s/%s: running vulncheck in sandbox on %s", modulePath, version, binDir, destf.Name())
@@ -604,7 +609,7 @@
log.Debug(ctx, "copying to temp dir",
"from", gcsPathname, "module", modulePath, "version", version, "dir", binDir)
localPathname := filepath.Join(tempDir, "binary")
- if err := copyFromGCS(ctx, s.gcsBucket, gcsPathname, localPathname, false); err != nil {
+ if err := copyToLocalFile(localPathname, false, gcsPathname, gcsOpenFileFunc(ctx, s.gcsBucket)); err != nil {
return nil, err
}
@@ -630,26 +635,6 @@
return res.Vulns, nil
}
-func copyFromGCS(ctx context.Context, bucket *storage.BucketHandle, srcPath, destPath string, executable bool) (err error) {
- defer derrors.Wrap(&err, "copyFromGCS(%q, %q)", srcPath, destPath)
- var mode os.FileMode
- if executable {
- mode = 0755
- } else {
- mode = 0644
- }
- destf, err := os.OpenFile(destPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, mode)
- if err != nil {
- return err
- }
- err1 := copyFromGCSToWriter(ctx, destf, bucket, srcPath)
- err2 := destf.Close()
- if err1 != nil {
- return err1
- }
- return err2
-}
-
func copyFromGCSToWriter(ctx context.Context, w io.Writer, bucket *storage.BucketHandle, srcPath string) error {
gcsReader, err := bucket.Object(srcPath).NewReader(ctx)
if err != nil {