internal/worker: factor out FetchAndUpdateState args
Move some common args to a struct.
Change-Id: I4e500bc72f0cc622ec7641568beb5c7dbde1e69a
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/278713
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jamal Carvalho <jamal@golang.org>
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 36fda83..aa8e1b8 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -83,7 +83,12 @@
expg := cmdconfig.ExperimentGetter(ctx, cfg)
fetchQueue, err := queue.New(ctx, cfg, queueName, *workers, expg,
func(ctx context.Context, modulePath, version string) (int, error) {
- code, _, err := worker.FetchAndUpdateState(ctx, modulePath, version, proxyClient, sourceClient, db, cfg.AppVersionLabel())
+ f := &worker.Fetcher{
+ ProxyClient: proxyClient,
+ SourceClient: sourceClient,
+ DB: db,
+ }
+ code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, cfg.AppVersionLabel())
return code, err
})
if err != nil {
diff --git a/internal/testing/integration/integration_test.go b/internal/testing/integration/integration_test.go
index 11905fc..845868d 100644
--- a/internal/testing/integration/integration_test.go
+++ b/internal/testing/integration/integration_test.go
@@ -78,9 +78,13 @@
// TODO: it would be better if InMemory made http requests
// back to worker, rather than calling fetch itself.
- sourceClient := source.NewClient(1 * time.Second)
queue := queue.NewInMemory(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) {
- code, _, err := worker.FetchAndUpdateState(ctx, mpath, version, proxyClient, sourceClient, testDB, "test")
+ f := &worker.Fetcher{
+ ProxyClient: proxyClient,
+ SourceClient: source.NewClient(1 * time.Second),
+ DB: testDB,
+ }
+ code, _, err := f.FetchAndUpdateState(ctx, mpath, version, "test")
return code, err
})
workerServer, err := worker.NewServer(&config.Config{}, worker.ServerConfig{
diff --git a/internal/worker/fetch.go b/internal/worker/fetch.go
index 5861831..32bcdf0 100644
--- a/internal/worker/fetch.go
+++ b/internal/worker/fetch.go
@@ -35,11 +35,18 @@
timings map[string]time.Duration
}
+// A Fetcher holds state for fetching modules.
+type Fetcher struct {
+ ProxyClient *proxy.Client
+ SourceClient *source.Client
+ DB *postgres.DB
+}
+
// FetchAndUpdateState fetches and processes a module version, and then updates
// the module_version_states table according to the result. It returns an HTTP
// status code representing the result of the fetch operation, and a non-nil
// error if this status code is not 200.
-func FetchAndUpdateState(ctx context.Context, modulePath, requestedVersion string, proxyClient *proxy.Client, sourceClient *source.Client, db *postgres.DB, appVersionLabel string) (_ int, resolvedVersion string, err error) {
+func (f *Fetcher) FetchAndUpdateState(ctx context.Context, modulePath, requestedVersion, appVersionLabel string) (_ int, resolvedVersion string, err error) {
defer derrors.Wrap(&err, "FetchAndUpdateState(%q, %q)", modulePath, requestedVersion)
tctx, span := trace.StartSpan(ctx, "FetchAndUpdateState")
@@ -56,14 +63,14 @@
trace.StringAttribute("version", requestedVersion))
defer span.End()
- ft := fetchAndInsertModule(ctx, modulePath, requestedVersion, proxyClient, sourceClient, db)
+ ft := f.fetchAndInsertModule(ctx, modulePath, requestedVersion)
span.AddAttributes(trace.Int64Attribute("numPackages", int64(len(ft.PackageVersionStates))))
// If there were any errors processing the module then we didn't insert it.
// Delete it in case we are reprocessing an existing module.
// However, don't delete if the error was internal, or we are shedding load.
if ft.Status >= 400 && ft.Status < 500 {
- if err := deleteModule(ctx, db, ft); err != nil {
+ if err := deleteModule(ctx, f.DB, ft); err != nil {
log.Error(ctx, err)
ft.Error = err
ft.Status = http.StatusInternalServerError
@@ -73,7 +80,7 @@
}
// Regardless of what the status code is, insert the result into
// version_map, so that a response can be returned for frontend_fetch.
- if err := updateVersionMap(ctx, db, ft); err != nil {
+ if err := updateVersionMap(ctx, f.DB, ft); err != nil {
log.Error(ctx, err)
if ft.Status != http.StatusInternalServerError {
ft.Error = err
@@ -98,7 +105,7 @@
// TODO(golang/go#39628): Split UpsertModuleVersionState into
// InsertModuleVersionState and UpdateModuleVersionState.
start := time.Now()
- err = db.UpsertModuleVersionState(ctx, ft.ModulePath, ft.ResolvedVersion, appVersionLabel,
+ err = f.DB.UpsertModuleVersionState(ctx, ft.ModulePath, ft.ResolvedVersion, appVersionLabel,
time.Time{}, ft.Status, ft.GoModPath, ft.Error, ft.PackageVersionStates)
ft.timings["db.UpsertModuleVersionState"] = time.Since(start)
if err != nil {
@@ -121,7 +128,7 @@
// The given parentCtx is used for tracing, but fetches actually execute in a
// detached context with fixed timeout, so that fetches are allowed to complete
// even for short-lived requests.
-func fetchAndInsertModule(ctx context.Context, modulePath, requestedVersion string, proxyClient *proxy.Client, sourceClient *source.Client, db *postgres.DB) *fetchTask {
+func (f *Fetcher) fetchAndInsertModule(ctx context.Context, modulePath, requestedVersion string) *fetchTask {
ft := &fetchTask{
FetchResult: fetch.FetchResult{
ModulePath: modulePath,
@@ -143,7 +150,7 @@
return ft
}
- exc, err := db.IsExcluded(ctx, modulePath)
+ exc, err := f.DB.IsExcluded(ctx, modulePath)
if err != nil {
ft.Error = err
return ft
@@ -154,7 +161,7 @@
}
start := time.Now()
- fr := fetch.FetchModule(ctx, modulePath, requestedVersion, proxyClient, sourceClient, false)
+ fr := fetch.FetchModule(ctx, modulePath, requestedVersion, f.ProxyClient, f.SourceClient, false)
if fr == nil {
panic("fetch.FetchModule should never return a nil FetchResult")
}
@@ -174,7 +181,7 @@
log.Infof(ctx, "fetch.FetchVersion succeeded for %s@%s", ft.ModulePath, ft.RequestedVersion)
start = time.Now()
- err = db.InsertModule(ctx, ft.Module)
+ err = f.DB.InsertModule(ctx, ft.Module)
ft.timings["db.InsertModule"] = time.Since(start)
if err != nil {
log.Error(ctx, err)
diff --git a/internal/worker/fetch_test.go b/internal/worker/fetch_test.go
index e311a8a..8eb8a1f 100644
--- a/internal/worker/fetch_test.go
+++ b/internal/worker/fetch_test.go
@@ -367,13 +367,12 @@
},
}
+ sourceClient := source.NewClient(sourceTimeout)
+ f := &Fetcher{proxyClient, sourceClient, testDB}
for _, test := range testCases {
t.Run(test.pkg, func(t *testing.T) {
defer postgres.ResetTestDB(testDB, t)
-
- sourceClient := source.NewClient(sourceTimeout)
-
- if _, _, err := FetchAndUpdateState(ctx, test.modulePath, test.version, proxyClient, sourceClient, testDB, testAppVersion); err != nil {
+ if _, _, err := f.FetchAndUpdateState(ctx, test.modulePath, test.version, testAppVersion); err != nil {
t.Fatalf("FetchAndUpdateState(%q, %q, %v, %v, %v): %v", test.modulePath, test.version, proxyClient, sourceClient, testDB, err)
}
diff --git a/internal/worker/fetcherror_test.go b/internal/worker/fetcherror_test.go
index 44547f0..071f5b9 100644
--- a/internal/worker/fetcherror_test.go
+++ b/internal/worker/fetcherror_test.go
@@ -333,8 +333,8 @@
func fetchAndCheckStatus(ctx context.Context, t *testing.T, proxyClient *proxy.Client, modulePath, version string, wantCode int) {
t.Helper()
- sourceClient := source.NewClient(sourceTimeout)
- code, _, err := FetchAndUpdateState(ctx, modulePath, version, proxyClient, sourceClient, testDB, testAppVersion)
+ f := Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB}
+ code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion)
switch code {
case http.StatusOK:
if err != nil {
diff --git a/internal/worker/refetch_test.go b/internal/worker/refetch_test.go
index a845b48..8747842 100644
--- a/internal/worker/refetch_test.go
+++ b/internal/worker/refetch_test.go
@@ -60,8 +60,9 @@
})
defer teardownProxy()
sourceClient := source.NewClient(sourceTimeout)
- if _, _, err := FetchAndUpdateState(ctx, sample.ModulePath, version, proxyClient, sourceClient, testDB, testAppVersion); err != nil {
- t.Fatalf("FetchAndUpdateState(%q, %q, %v, %v, %v): %v", sample.ModulePath, version, proxyClient, sourceClient, testDB, err)
+ f := &Fetcher{proxyClient, sourceClient, testDB}
+ if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion); err != nil {
+ t.Fatalf("FetchAndUpdateState(%q, %q): %v", sample.ModulePath, version, err)
}
if _, err := testDB.GetUnitMeta(ctx, pkgFoo, internal.UnknownModulePath, version); err != nil {
@@ -78,8 +79,9 @@
})
defer teardownProxy()
- if _, _, err := FetchAndUpdateState(ctx, sample.ModulePath, version, proxyClient, sourceClient, testDB, testAppVersion); err != nil {
- t.Fatalf("FetchAndUpdateState(%q, %q, %v, %v, %v): %v", modulePath, version, proxyClient, sourceClient, testDB, err)
+ f = &Fetcher{proxyClient, sourceClient, testDB}
+ if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion); err != nil {
+ t.Fatalf("FetchAndUpdateState(%q, %q): %v", modulePath, version, err)
}
want := &internal.Unit{
UnitMeta: internal.UnitMeta{
@@ -120,7 +122,7 @@
if diff := cmp.Diff(want.UnitMeta, *got,
cmp.AllowUnexported(source.Info{}),
cmpopts.IgnoreFields(licenses.Metadata{}, "Coverage"),
- cmpopts.IgnoreFields(internal.UnitMeta{}, "HasGoMod"),); diff != "" {
+ cmpopts.IgnoreFields(internal.UnitMeta{}, "HasGoMod")); diff != "" {
t.Fatalf("testDB.GetUnitMeta(ctx, %q, %q) mismatch (-want +got):\n%s", want.ModulePath, want.Version, diff)
}
@@ -151,7 +153,8 @@
},
})
defer teardownProxy()
- if _, _, err := FetchAndUpdateState(ctx, modulePath, version, proxyClient, sourceClient, testDB, testAppVersion); !errors.Is(err, derrors.DBModuleInsertInvalid) {
- t.Fatalf("FetchAndUpdateState(%q, %q, %v, %v, %v): %v", modulePath, version, proxyClient, sourceClient, testDB, err)
+ f = &Fetcher{proxyClient, sourceClient, testDB}
+ if _, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion); !errors.Is(err, derrors.DBModuleInsertInvalid) {
+ t.Fatalf("FetchAndUpdateState(%q, %q): %v", modulePath, version, err)
}
}
diff --git a/internal/worker/server.go b/internal/worker/server.go
index c35afdc..fa794aa 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -289,7 +289,12 @@
return err.Error(), http.StatusBadRequest
}
- code, resolvedVersion, err := FetchAndUpdateState(r.Context(), modulePath, requestedVersion, s.proxyClient, s.sourceClient, s.db, s.cfg.AppVersionLabel())
+ f := &Fetcher{
+ ProxyClient: s.proxyClient,
+ SourceClient: s.sourceClient,
+ DB: s.db,
+ }
+ code, resolvedVersion, err := f.FetchAndUpdateState(r.Context(), modulePath, requestedVersion, s.cfg.AppVersionLabel())
if err != nil {
return err.Error(), code
}
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index 0822af8..ef88d5b 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -173,13 +173,12 @@
proxyClient, teardownProxy := proxy.SetupTestClient(t, test.proxy)
defer teardownProxy()
- sourceClient := source.NewClient(sourceTimeout)
-
defer postgres.ResetTestDB(testDB, t)
+ f := &Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB}
// Use 10 workers to have parallelism consistent with the worker binary.
q := queue.NewInMemory(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) {
- code, _, err := FetchAndUpdateState(ctx, mpath, version, proxyClient, sourceClient, testDB, "")
+ code, _, err := f.FetchAndUpdateState(ctx, mpath, version, "")
return code, err
})
@@ -187,7 +186,7 @@
DB: testDB,
IndexClient: indexClient,
ProxyClient: proxyClient,
- SourceClient: sourceClient,
+ SourceClient: f.SourceClient,
Queue: q,
})
if err != nil {