internal/worker: factor out FetchAndUpdateState args

Move some common args to a struct.

Change-Id: I4e500bc72f0cc622ec7641568beb5c7dbde1e69a
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/278713
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jamal Carvalho <jamal@golang.org>
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 36fda83..aa8e1b8 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -83,7 +83,12 @@
 	expg := cmdconfig.ExperimentGetter(ctx, cfg)
 	fetchQueue, err := queue.New(ctx, cfg, queueName, *workers, expg,
 		func(ctx context.Context, modulePath, version string) (int, error) {
-			code, _, err := worker.FetchAndUpdateState(ctx, modulePath, version, proxyClient, sourceClient, db, cfg.AppVersionLabel())
+			f := &worker.Fetcher{
+				ProxyClient:  proxyClient,
+				SourceClient: sourceClient,
+				DB:           db,
+			}
+			code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, cfg.AppVersionLabel())
 			return code, err
 		})
 	if err != nil {
diff --git a/internal/testing/integration/integration_test.go b/internal/testing/integration/integration_test.go
index 11905fc..845868d 100644
--- a/internal/testing/integration/integration_test.go
+++ b/internal/testing/integration/integration_test.go
@@ -78,9 +78,13 @@
 
 	// TODO: it would be better if InMemory made http requests
 	// back to worker, rather than calling fetch itself.
-	sourceClient := source.NewClient(1 * time.Second)
 	queue := queue.NewInMemory(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) {
-		code, _, err := worker.FetchAndUpdateState(ctx, mpath, version, proxyClient, sourceClient, testDB, "test")
+		f := &worker.Fetcher{
+			ProxyClient:  proxyClient,
+			SourceClient: source.NewClient(1 * time.Second),
+			DB:           testDB,
+		}
+		code, _, err := f.FetchAndUpdateState(ctx, mpath, version, "test")
 		return code, err
 	})
 	workerServer, err := worker.NewServer(&config.Config{}, worker.ServerConfig{
diff --git a/internal/worker/fetch.go b/internal/worker/fetch.go
index 5861831..32bcdf0 100644
--- a/internal/worker/fetch.go
+++ b/internal/worker/fetch.go
@@ -35,11 +35,18 @@
 	timings map[string]time.Duration
 }
 
+// A Fetcher holds state for fetching modules.
+type Fetcher struct {
+	ProxyClient  *proxy.Client
+	SourceClient *source.Client
+	DB           *postgres.DB
+}
+
 // FetchAndUpdateState fetches and processes a module version, and then updates
 // the module_version_states table according to the result. It returns an HTTP
 // status code representing the result of the fetch operation, and a non-nil
 // error if this status code is not 200.
-func FetchAndUpdateState(ctx context.Context, modulePath, requestedVersion string, proxyClient *proxy.Client, sourceClient *source.Client, db *postgres.DB, appVersionLabel string) (_ int, resolvedVersion string, err error) {
+func (f *Fetcher) FetchAndUpdateState(ctx context.Context, modulePath, requestedVersion, appVersionLabel string) (_ int, resolvedVersion string, err error) {
 	defer derrors.Wrap(&err, "FetchAndUpdateState(%q, %q)", modulePath, requestedVersion)
 
 	tctx, span := trace.StartSpan(ctx, "FetchAndUpdateState")
@@ -56,14 +63,14 @@
 		trace.StringAttribute("version", requestedVersion))
 	defer span.End()
 
-	ft := fetchAndInsertModule(ctx, modulePath, requestedVersion, proxyClient, sourceClient, db)
+	ft := f.fetchAndInsertModule(ctx, modulePath, requestedVersion)
 	span.AddAttributes(trace.Int64Attribute("numPackages", int64(len(ft.PackageVersionStates))))
 
 	// If there were any errors processing the module then we didn't insert it.
 	// Delete it in case we are reprocessing an existing module.
 	// However, don't delete if the error was internal, or we are shedding load.
 	if ft.Status >= 400 && ft.Status < 500 {
-		if err := deleteModule(ctx, db, ft); err != nil {
+		if err := deleteModule(ctx, f.DB, ft); err != nil {
 			log.Error(ctx, err)
 			ft.Error = err
 			ft.Status = http.StatusInternalServerError
@@ -73,7 +80,7 @@
 	}
 	// Regardless of what the status code is, insert the result into
 	// version_map, so that a response can be returned for frontend_fetch.
-	if err := updateVersionMap(ctx, db, ft); err != nil {
+	if err := updateVersionMap(ctx, f.DB, ft); err != nil {
 		log.Error(ctx, err)
 		if ft.Status != http.StatusInternalServerError {
 			ft.Error = err
@@ -98,7 +105,7 @@
 	// TODO(golang/go#39628): Split UpsertModuleVersionState into
 	// InsertModuleVersionState and UpdateModuleVersionState.
 	start := time.Now()
-	err = db.UpsertModuleVersionState(ctx, ft.ModulePath, ft.ResolvedVersion, appVersionLabel,
+	err = f.DB.UpsertModuleVersionState(ctx, ft.ModulePath, ft.ResolvedVersion, appVersionLabel,
 		time.Time{}, ft.Status, ft.GoModPath, ft.Error, ft.PackageVersionStates)
 	ft.timings["db.UpsertModuleVersionState"] = time.Since(start)
 	if err != nil {
@@ -121,7 +128,7 @@
 // The given parentCtx is used for tracing, but fetches actually execute in a
 // detached context with fixed timeout, so that fetches are allowed to complete
 // even for short-lived requests.
-func fetchAndInsertModule(ctx context.Context, modulePath, requestedVersion string, proxyClient *proxy.Client, sourceClient *source.Client, db *postgres.DB) *fetchTask {
+func (f *Fetcher) fetchAndInsertModule(ctx context.Context, modulePath, requestedVersion string) *fetchTask {
 	ft := &fetchTask{
 		FetchResult: fetch.FetchResult{
 			ModulePath:       modulePath,
@@ -143,7 +150,7 @@
 		return ft
 	}
 
-	exc, err := db.IsExcluded(ctx, modulePath)
+	exc, err := f.DB.IsExcluded(ctx, modulePath)
 	if err != nil {
 		ft.Error = err
 		return ft
@@ -154,7 +161,7 @@
 	}
 
 	start := time.Now()
-	fr := fetch.FetchModule(ctx, modulePath, requestedVersion, proxyClient, sourceClient, false)
+	fr := fetch.FetchModule(ctx, modulePath, requestedVersion, f.ProxyClient, f.SourceClient, false)
 	if fr == nil {
 		panic("fetch.FetchModule should never return a nil FetchResult")
 	}
@@ -174,7 +181,7 @@
 	log.Infof(ctx, "fetch.FetchVersion succeeded for %s@%s", ft.ModulePath, ft.RequestedVersion)
 
 	start = time.Now()
-	err = db.InsertModule(ctx, ft.Module)
+	err = f.DB.InsertModule(ctx, ft.Module)
 	ft.timings["db.InsertModule"] = time.Since(start)
 	if err != nil {
 		log.Error(ctx, err)
diff --git a/internal/worker/fetch_test.go b/internal/worker/fetch_test.go
index e311a8a..8eb8a1f 100644
--- a/internal/worker/fetch_test.go
+++ b/internal/worker/fetch_test.go
@@ -367,13 +367,12 @@
 		},
 	}
 
+	sourceClient := source.NewClient(sourceTimeout)
+	f := &Fetcher{proxyClient, sourceClient, testDB}
 	for _, test := range testCases {
 		t.Run(test.pkg, func(t *testing.T) {
 			defer postgres.ResetTestDB(testDB, t)
-
-			sourceClient := source.NewClient(sourceTimeout)
-
-			if _, _, err := FetchAndUpdateState(ctx, test.modulePath, test.version, proxyClient, sourceClient, testDB, testAppVersion); err != nil {
+			if _, _, err := f.FetchAndUpdateState(ctx, test.modulePath, test.version, testAppVersion); err != nil {
 				t.Fatalf("FetchAndUpdateState(%q, %q, %v, %v, %v): %v", test.modulePath, test.version, proxyClient, sourceClient, testDB, err)
 			}
 
diff --git a/internal/worker/fetcherror_test.go b/internal/worker/fetcherror_test.go
index 44547f0..071f5b9 100644
--- a/internal/worker/fetcherror_test.go
+++ b/internal/worker/fetcherror_test.go
@@ -333,8 +333,8 @@
 
 func fetchAndCheckStatus(ctx context.Context, t *testing.T, proxyClient *proxy.Client, modulePath, version string, wantCode int) {
 	t.Helper()
-	sourceClient := source.NewClient(sourceTimeout)
-	code, _, err := FetchAndUpdateState(ctx, modulePath, version, proxyClient, sourceClient, testDB, testAppVersion)
+	f := Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB}
+	code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion)
 	switch code {
 	case http.StatusOK:
 		if err != nil {
diff --git a/internal/worker/refetch_test.go b/internal/worker/refetch_test.go
index a845b48..8747842 100644
--- a/internal/worker/refetch_test.go
+++ b/internal/worker/refetch_test.go
@@ -60,8 +60,9 @@
 	})
 	defer teardownProxy()
 	sourceClient := source.NewClient(sourceTimeout)
-	if _, _, err := FetchAndUpdateState(ctx, sample.ModulePath, version, proxyClient, sourceClient, testDB, testAppVersion); err != nil {
-		t.Fatalf("FetchAndUpdateState(%q, %q, %v, %v, %v): %v", sample.ModulePath, version, proxyClient, sourceClient, testDB, err)
+	f := &Fetcher{proxyClient, sourceClient, testDB}
+	if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion); err != nil {
+		t.Fatalf("FetchAndUpdateState(%q, %q): %v", sample.ModulePath, version, err)
 	}
 
 	if _, err := testDB.GetUnitMeta(ctx, pkgFoo, internal.UnknownModulePath, version); err != nil {
@@ -78,8 +79,9 @@
 	})
 	defer teardownProxy()
 
-	if _, _, err := FetchAndUpdateState(ctx, sample.ModulePath, version, proxyClient, sourceClient, testDB, testAppVersion); err != nil {
-		t.Fatalf("FetchAndUpdateState(%q, %q, %v, %v, %v): %v", modulePath, version, proxyClient, sourceClient, testDB, err)
+	f = &Fetcher{proxyClient, sourceClient, testDB}
+	if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion); err != nil {
+		t.Fatalf("FetchAndUpdateState(%q, %q): %v", modulePath, version, err)
 	}
 	want := &internal.Unit{
 		UnitMeta: internal.UnitMeta{
@@ -120,7 +122,7 @@
 	if diff := cmp.Diff(want.UnitMeta, *got,
 		cmp.AllowUnexported(source.Info{}),
 		cmpopts.IgnoreFields(licenses.Metadata{}, "Coverage"),
-		cmpopts.IgnoreFields(internal.UnitMeta{}, "HasGoMod"),); diff != "" {
+		cmpopts.IgnoreFields(internal.UnitMeta{}, "HasGoMod")); diff != "" {
 		t.Fatalf("testDB.GetUnitMeta(ctx, %q, %q) mismatch (-want +got):\n%s", want.ModulePath, want.Version, diff)
 	}
 
@@ -151,7 +153,8 @@
 		},
 	})
 	defer teardownProxy()
-	if _, _, err := FetchAndUpdateState(ctx, modulePath, version, proxyClient, sourceClient, testDB, testAppVersion); !errors.Is(err, derrors.DBModuleInsertInvalid) {
-		t.Fatalf("FetchAndUpdateState(%q, %q, %v, %v, %v): %v", modulePath, version, proxyClient, sourceClient, testDB, err)
+	f = &Fetcher{proxyClient, sourceClient, testDB}
+	if _, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion); !errors.Is(err, derrors.DBModuleInsertInvalid) {
+		t.Fatalf("FetchAndUpdateState(%q, %q): %v", modulePath, version, err)
 	}
 }
diff --git a/internal/worker/server.go b/internal/worker/server.go
index c35afdc..fa794aa 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -289,7 +289,12 @@
 		return err.Error(), http.StatusBadRequest
 	}
 
-	code, resolvedVersion, err := FetchAndUpdateState(r.Context(), modulePath, requestedVersion, s.proxyClient, s.sourceClient, s.db, s.cfg.AppVersionLabel())
+	f := &Fetcher{
+		ProxyClient:  s.proxyClient,
+		SourceClient: s.sourceClient,
+		DB:           s.db,
+	}
+	code, resolvedVersion, err := f.FetchAndUpdateState(r.Context(), modulePath, requestedVersion, s.cfg.AppVersionLabel())
 	if err != nil {
 		return err.Error(), code
 	}
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index 0822af8..ef88d5b 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -173,13 +173,12 @@
 
 			proxyClient, teardownProxy := proxy.SetupTestClient(t, test.proxy)
 			defer teardownProxy()
-			sourceClient := source.NewClient(sourceTimeout)
-
 			defer postgres.ResetTestDB(testDB, t)
+			f := &Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB}
 
 			// Use 10 workers to have parallelism consistent with the worker binary.
 			q := queue.NewInMemory(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) {
-				code, _, err := FetchAndUpdateState(ctx, mpath, version, proxyClient, sourceClient, testDB, "")
+				code, _, err := f.FetchAndUpdateState(ctx, mpath, version, "")
 				return code, err
 			})
 
@@ -187,7 +186,7 @@
 				DB:           testDB,
 				IndexClient:  indexClient,
 				ProxyClient:  proxyClient,
-				SourceClient: sourceClient,
+				SourceClient: f.SourceClient,
 				Queue:        q,
 			})
 			if err != nil {