internal/worker: add loadshedder to fetch test
Test fetching with an actual loadshedder. This will catch bugs like
the one fixed by https://golang.org/cl/347109.
Change-Id: I75de9eb22110466ddae8e9c7378844a68322c6c2
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/347395
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jamal Carvalho <jamal@golang.org>
diff --git a/internal/worker/fetch.go b/internal/worker/fetch.go
index 7adb4d4..fe10d8b 100644
--- a/internal/worker/fetch.go
+++ b/internal/worker/fetch.go
@@ -100,6 +100,7 @@
SourceClient *source.Client
DB *postgres.DB
Cache *cache.Cache
+ loadShedder *loadShedder
}
// FetchAndUpdateState fetches and processes a module version, and then updates
@@ -522,7 +523,7 @@
}
func (f *Fetcher) maybeShed(ctx context.Context, modulePath, version string) (func(), int64, error) {
- if zipLoadShedder == nil {
+ if f.loadShedder == nil {
return func() {}, 0, nil
}
zipSize, err := getZipSize(ctx, modulePath, version, f.ProxyClient)
@@ -533,7 +534,7 @@
// We treat zip size as a proxy for the total memory consumed by
// processing a module, and use it to decide whether we can currently
// afford to process a module.
- shouldShed, deferFunc := zipLoadShedder.shouldShed(uint64(zipSize))
+ shouldShed, deferFunc := f.loadShedder.shouldShed(uint64(zipSize))
if shouldShed {
stats.Record(ctx, fetchesShedded.M(1))
return deferFunc, 0, fmt.Errorf("%w: size=%dMi", derrors.SheddingLoad, zipSize/mib)
diff --git a/internal/worker/fetch_test.go b/internal/worker/fetch_test.go
index 19a6077..b029ab3 100644
--- a/internal/worker/fetch_test.go
+++ b/internal/worker/fetch_test.go
@@ -302,7 +302,13 @@
for _, test := range testCases {
t.Run(strings.ReplaceAll(test.pkg+"@"+test.version, "/", " "), func(t *testing.T) {
defer postgres.ResetTestDB(testDB, t)
- f := &Fetcher{proxyClient.WithCache(), sourceClient, testDB, nil}
+ f := &Fetcher{
+ ProxyClient: proxyClient.WithCache(),
+ SourceClient: sourceClient,
+ DB: testDB,
+ Cache: nil,
+ loadShedder: &loadShedder{maxSizeInFlight: 100 * mib},
+ }
if _, _, err := f.FetchAndUpdateState(ctx, test.modulePath, test.version, testAppVersion); err != nil {
t.Fatalf("FetchAndUpdateState(%q, %q, %v, %v, %v): %v", test.modulePath, test.version, proxyClient, sourceClient, testDB, err)
}
@@ -396,7 +402,7 @@
defer teardownProxy()
// With a plain proxy, we download the zip twice.
- f := &Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil}
+ f := &Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil, nil}
if _, _, err := f.FetchAndUpdateState(ctx, "m.com", "v1.0.0", testAppVersion); err != nil {
t.Fatal(err)
}
diff --git a/internal/worker/fetcherror_test.go b/internal/worker/fetcherror_test.go
index f7e34ec..1b45e3f 100644
--- a/internal/worker/fetcherror_test.go
+++ b/internal/worker/fetcherror_test.go
@@ -335,7 +335,7 @@
func fetchAndCheckStatus(ctx context.Context, t *testing.T, proxyClient *proxy.Client, modulePath, version string, wantCode int) {
t.Helper()
- f := Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil}
+ f := Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil, nil}
code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion)
switch code {
case http.StatusOK:
diff --git a/internal/worker/refetch_test.go b/internal/worker/refetch_test.go
index 0309aa8..04920ef 100644
--- a/internal/worker/refetch_test.go
+++ b/internal/worker/refetch_test.go
@@ -60,7 +60,7 @@
})
defer teardownProxy()
sourceClient := source.NewClient(sourceTimeout)
- f := &Fetcher{proxyClient, sourceClient, testDB, nil}
+ f := &Fetcher{proxyClient, sourceClient, testDB, nil, nil}
if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion); err != nil {
t.Fatalf("FetchAndUpdateState(%q, %q): %v", sample.ModulePath, version, err)
}
@@ -79,7 +79,7 @@
})
defer teardownProxy()
- f = &Fetcher{proxyClient, sourceClient, testDB, nil}
+ f = &Fetcher{proxyClient, sourceClient, testDB, nil, nil}
if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion); err != nil {
t.Fatalf("FetchAndUpdateState(%q, %q): %v", modulePath, version, err)
}
@@ -157,7 +157,7 @@
},
})
defer teardownProxy()
- f = &Fetcher{proxyClient, sourceClient, testDB, nil}
+ f = &Fetcher{proxyClient, sourceClient, testDB, nil, nil}
if _, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion); !errors.Is(err, derrors.DBModuleInsertInvalid) {
t.Fatalf("FetchAndUpdateState(%q, %q): %v", modulePath, version, err)
}
diff --git a/internal/worker/server.go b/internal/worker/server.go
index 03d7a8c..e1d4b9d 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -303,6 +303,7 @@
SourceClient: s.sourceClient,
DB: s.db,
Cache: s.cache,
+ loadShedder: zipLoadShedder,
}
if r.FormValue(queue.DisableProxyFetchParam) == queue.DisableProxyFetchValue {
f.ProxyClient = f.ProxyClient.WithFetchDisabled()
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index c439a35..dbcd99c 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -179,7 +179,7 @@
proxyClient, teardownProxy := proxytest.SetupTestClient(t, test.proxy)
defer teardownProxy()
defer postgres.ResetTestDB(testDB, t)
- f := &Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil}
+ f := &Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil, nil}
// Use 10 workers to have parallelism consistent with the worker binary.
q := queue.NewInMemory(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) {