internal/worker: add loadshedder to fetch test

Test fetching with an actual loadshedder. This will catch bugs like
the one fixed by https://golang.org/cl/347109.

Change-Id: I75de9eb22110466ddae8e9c7378844a68322c6c2
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/347395
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jamal Carvalho <jamal@golang.org>
diff --git a/internal/worker/fetch.go b/internal/worker/fetch.go
index 7adb4d4..fe10d8b 100644
--- a/internal/worker/fetch.go
+++ b/internal/worker/fetch.go
@@ -100,6 +100,7 @@
 	SourceClient *source.Client
 	DB           *postgres.DB
 	Cache        *cache.Cache
+	loadShedder  *loadShedder
 }
 
 // FetchAndUpdateState fetches and processes a module version, and then updates
@@ -522,7 +523,7 @@
 }
 
 func (f *Fetcher) maybeShed(ctx context.Context, modulePath, version string) (func(), int64, error) {
-	if zipLoadShedder == nil {
+	if f.loadShedder == nil {
 		return func() {}, 0, nil
 	}
 	zipSize, err := getZipSize(ctx, modulePath, version, f.ProxyClient)
@@ -533,7 +534,7 @@
 	// We treat zip size as a proxy for the total memory consumed by
 	// processing a module, and use it to decide whether we can currently
 	// afford to process a module.
-	shouldShed, deferFunc := zipLoadShedder.shouldShed(uint64(zipSize))
+	shouldShed, deferFunc := f.loadShedder.shouldShed(uint64(zipSize))
 	if shouldShed {
 		stats.Record(ctx, fetchesShedded.M(1))
 		return deferFunc, 0, fmt.Errorf("%w: size=%dMi", derrors.SheddingLoad, zipSize/mib)
diff --git a/internal/worker/fetch_test.go b/internal/worker/fetch_test.go
index 19a6077..b029ab3 100644
--- a/internal/worker/fetch_test.go
+++ b/internal/worker/fetch_test.go
@@ -302,7 +302,13 @@
 	for _, test := range testCases {
 		t.Run(strings.ReplaceAll(test.pkg+"@"+test.version, "/", " "), func(t *testing.T) {
 			defer postgres.ResetTestDB(testDB, t)
-			f := &Fetcher{proxyClient.WithCache(), sourceClient, testDB, nil}
+			f := &Fetcher{
+				ProxyClient:  proxyClient.WithCache(),
+				SourceClient: sourceClient,
+				DB:           testDB,
+				Cache:        nil,
+				loadShedder:  &loadShedder{maxSizeInFlight: 100 * mib},
+			}
 			if _, _, err := f.FetchAndUpdateState(ctx, test.modulePath, test.version, testAppVersion); err != nil {
 				t.Fatalf("FetchAndUpdateState(%q, %q, %v, %v, %v): %v", test.modulePath, test.version, proxyClient, sourceClient, testDB, err)
 			}
@@ -396,7 +402,7 @@
 	defer teardownProxy()
 
 	// With a plain proxy, we download the zip twice.
-	f := &Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil}
+	f := &Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil, nil}
 	if _, _, err := f.FetchAndUpdateState(ctx, "m.com", "v1.0.0", testAppVersion); err != nil {
 		t.Fatal(err)
 	}
diff --git a/internal/worker/fetcherror_test.go b/internal/worker/fetcherror_test.go
index f7e34ec..1b45e3f 100644
--- a/internal/worker/fetcherror_test.go
+++ b/internal/worker/fetcherror_test.go
@@ -335,7 +335,7 @@
 
 func fetchAndCheckStatus(ctx context.Context, t *testing.T, proxyClient *proxy.Client, modulePath, version string, wantCode int) {
 	t.Helper()
-	f := Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil}
+	f := Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil, nil}
 	code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion)
 	switch code {
 	case http.StatusOK:
diff --git a/internal/worker/refetch_test.go b/internal/worker/refetch_test.go
index 0309aa8..04920ef 100644
--- a/internal/worker/refetch_test.go
+++ b/internal/worker/refetch_test.go
@@ -60,7 +60,7 @@
 	})
 	defer teardownProxy()
 	sourceClient := source.NewClient(sourceTimeout)
-	f := &Fetcher{proxyClient, sourceClient, testDB, nil}
+	f := &Fetcher{proxyClient, sourceClient, testDB, nil, nil}
 	if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion); err != nil {
 		t.Fatalf("FetchAndUpdateState(%q, %q): %v", sample.ModulePath, version, err)
 	}
@@ -79,7 +79,7 @@
 	})
 	defer teardownProxy()
 
-	f = &Fetcher{proxyClient, sourceClient, testDB, nil}
+	f = &Fetcher{proxyClient, sourceClient, testDB, nil, nil}
 	if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion); err != nil {
 		t.Fatalf("FetchAndUpdateState(%q, %q): %v", modulePath, version, err)
 	}
@@ -157,7 +157,7 @@
 		},
 	})
 	defer teardownProxy()
-	f = &Fetcher{proxyClient, sourceClient, testDB, nil}
+	f = &Fetcher{proxyClient, sourceClient, testDB, nil, nil}
 	if _, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion); !errors.Is(err, derrors.DBModuleInsertInvalid) {
 		t.Fatalf("FetchAndUpdateState(%q, %q): %v", modulePath, version, err)
 	}
diff --git a/internal/worker/server.go b/internal/worker/server.go
index 03d7a8c..e1d4b9d 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -303,6 +303,7 @@
 		SourceClient: s.sourceClient,
 		DB:           s.db,
 		Cache:        s.cache,
+		loadShedder:  zipLoadShedder,
 	}
 	if r.FormValue(queue.DisableProxyFetchParam) == queue.DisableProxyFetchValue {
 		f.ProxyClient = f.ProxyClient.WithFetchDisabled()
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index c439a35..dbcd99c 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -179,7 +179,7 @@
 			proxyClient, teardownProxy := proxytest.SetupTestClient(t, test.proxy)
 			defer teardownProxy()
 			defer postgres.ResetTestDB(testDB, t)
-			f := &Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil}
+			f := &Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil, nil}
 
 			// Use 10 workers to have parallelism consistent with the worker binary.
 			q := queue.NewInMemory(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) {