internal/worker: honor proxyfetch query param

If the fetch handler sees the "proxyfetch" query param
set to "off", it tells internal/fetch to disable
fetching by the proxy.

Change-Id: I1c9303f96f9249a5b420a73451551aba21d9d417
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/278953
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jamal Carvalho <jamal@golang.org>
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index aa8e1b8..155c9a0 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -88,7 +88,7 @@
 				SourceClient: sourceClient,
 				DB:           db,
 			}
-			code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, cfg.AppVersionLabel())
+			code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, cfg.AppVersionLabel(), false)
 			return code, err
 		})
 	if err != nil {
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 03578d1..5890b11 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -137,11 +137,16 @@
 // See https://cloud.google.com/tasks/docs/creating-http-target-tasks.
 const maxCloudTasksTimeout = 30 * time.Minute
 
+const (
+	DisableProxyFetchParam = "proxyfetch"
+	DisableProxyFetchValue = "off"
+)
+
 func (q *GCP) newTaskRequest(modulePath, version, suffix string, disableProxyFetch bool) *taskspb.CreateTaskRequest {
 	taskID := newTaskID(modulePath, version)
 	relativeURI := fmt.Sprintf("/fetch/%s/@v/%s", modulePath, version)
 	if disableProxyFetch {
-		relativeURI += "?proxyfetch=off"
+		relativeURI += fmt.Sprintf("?%s=%s", DisableProxyFetchParam, DisableProxyFetchValue)
 	}
 	task := &taskspb.Task{
 		Name:             fmt.Sprintf("%s/tasks/%s", q.queueName, taskID),
diff --git a/internal/testing/integration/integration_test.go b/internal/testing/integration/integration_test.go
index 845868d..20d5bc5 100644
--- a/internal/testing/integration/integration_test.go
+++ b/internal/testing/integration/integration_test.go
@@ -84,7 +84,7 @@
 			SourceClient: source.NewClient(1 * time.Second),
 			DB:           testDB,
 		}
-		code, _, err := f.FetchAndUpdateState(ctx, mpath, version, "test")
+		code, _, err := f.FetchAndUpdateState(ctx, mpath, version, "test", false)
 		return code, err
 	})
 	workerServer, err := worker.NewServer(&config.Config{}, worker.ServerConfig{
diff --git a/internal/worker/fetch.go b/internal/worker/fetch.go
index 32bcdf0..6f3bbaa 100644
--- a/internal/worker/fetch.go
+++ b/internal/worker/fetch.go
@@ -46,8 +46,8 @@
 // the module_version_states table according to the result. It returns an HTTP
 // status code representing the result of the fetch operation, and a non-nil
 // error if this status code is not 200.
-func (f *Fetcher) FetchAndUpdateState(ctx context.Context, modulePath, requestedVersion, appVersionLabel string) (_ int, resolvedVersion string, err error) {
-	defer derrors.Wrap(&err, "FetchAndUpdateState(%q, %q)", modulePath, requestedVersion)
+func (f *Fetcher) FetchAndUpdateState(ctx context.Context, modulePath, requestedVersion, appVersionLabel string, disableProxyFetch bool) (_ int, resolvedVersion string, err error) {
+	defer derrors.Wrap(&err, "FetchAndUpdateState(%q, %q, %q, %t)", modulePath, requestedVersion, appVersionLabel, disableProxyFetch)
 
 	tctx, span := trace.StartSpan(ctx, "FetchAndUpdateState")
 	ctx = experiment.NewContext(tctx, experiment.FromContext(ctx).Active()...)
@@ -63,7 +63,7 @@
 		trace.StringAttribute("version", requestedVersion))
 	defer span.End()
 
-	ft := f.fetchAndInsertModule(ctx, modulePath, requestedVersion)
+	ft := f.fetchAndInsertModule(ctx, modulePath, requestedVersion, disableProxyFetch)
 	span.AddAttributes(trace.Int64Attribute("numPackages", int64(len(ft.PackageVersionStates))))
 
 	// If there were any errors processing the module then we didn't insert it.
@@ -128,7 +128,7 @@
 // The given parentCtx is used for tracing, but fetches actually execute in a
 // detached context with fixed timeout, so that fetches are allowed to complete
 // even for short-lived requests.
-func (f *Fetcher) fetchAndInsertModule(ctx context.Context, modulePath, requestedVersion string) *fetchTask {
+func (f *Fetcher) fetchAndInsertModule(ctx context.Context, modulePath, requestedVersion string, disableProxyFetch bool) *fetchTask {
 	ft := &fetchTask{
 		FetchResult: fetch.FetchResult{
 			ModulePath:       modulePath,
@@ -161,7 +161,7 @@
 	}
 
 	start := time.Now()
-	fr := fetch.FetchModule(ctx, modulePath, requestedVersion, f.ProxyClient, f.SourceClient, false)
+	fr := fetch.FetchModule(ctx, modulePath, requestedVersion, f.ProxyClient, f.SourceClient, disableProxyFetch)
 	if fr == nil {
 		panic("fetch.FetchModule should never return a nil FetchResult")
 	}
diff --git a/internal/worker/fetch_test.go b/internal/worker/fetch_test.go
index 8eb8a1f..589b315 100644
--- a/internal/worker/fetch_test.go
+++ b/internal/worker/fetch_test.go
@@ -372,7 +372,7 @@
 	for _, test := range testCases {
 		t.Run(test.pkg, func(t *testing.T) {
 			defer postgres.ResetTestDB(testDB, t)
-			if _, _, err := f.FetchAndUpdateState(ctx, test.modulePath, test.version, testAppVersion); err != nil {
+			if _, _, err := f.FetchAndUpdateState(ctx, test.modulePath, test.version, testAppVersion, false); err != nil {
 				t.Fatalf("FetchAndUpdateState(%q, %q, %v, %v, %v): %v", test.modulePath, test.version, proxyClient, sourceClient, testDB, err)
 			}
 
diff --git a/internal/worker/fetcherror_test.go b/internal/worker/fetcherror_test.go
index 071f5b9..5f07947 100644
--- a/internal/worker/fetcherror_test.go
+++ b/internal/worker/fetcherror_test.go
@@ -334,7 +334,7 @@
 func fetchAndCheckStatus(ctx context.Context, t *testing.T, proxyClient *proxy.Client, modulePath, version string, wantCode int) {
 	t.Helper()
 	f := Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB}
-	code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion)
+	code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion, false)
 	switch code {
 	case http.StatusOK:
 		if err != nil {
diff --git a/internal/worker/refetch_test.go b/internal/worker/refetch_test.go
index 8747842..6c5f9f0 100644
--- a/internal/worker/refetch_test.go
+++ b/internal/worker/refetch_test.go
@@ -61,7 +61,7 @@
 	defer teardownProxy()
 	sourceClient := source.NewClient(sourceTimeout)
 	f := &Fetcher{proxyClient, sourceClient, testDB}
-	if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion); err != nil {
+	if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion, false); err != nil {
 		t.Fatalf("FetchAndUpdateState(%q, %q): %v", sample.ModulePath, version, err)
 	}
 
@@ -80,7 +80,7 @@
 	defer teardownProxy()
 
 	f = &Fetcher{proxyClient, sourceClient, testDB}
-	if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion); err != nil {
+	if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion, false); err != nil {
 		t.Fatalf("FetchAndUpdateState(%q, %q): %v", modulePath, version, err)
 	}
 	want := &internal.Unit{
@@ -154,7 +154,7 @@
 	})
 	defer teardownProxy()
 	f = &Fetcher{proxyClient, sourceClient, testDB}
-	if _, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion); !errors.Is(err, derrors.DBModuleInsertInvalid) {
+	if _, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion, false); !errors.Is(err, derrors.DBModuleInsertInvalid) {
 		t.Fatalf("FetchAndUpdateState(%q, %q): %v", modulePath, version, err)
 	}
 }
diff --git a/internal/worker/server.go b/internal/worker/server.go
index fa794aa..52d7ded 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -288,13 +288,13 @@
 	if err != nil {
 		return err.Error(), http.StatusBadRequest
 	}
-
+	disableProxyFetch := r.FormValue(queue.DisableProxyFetchParam) == queue.DisableProxyFetchValue
 	f := &Fetcher{
 		ProxyClient:  s.proxyClient,
 		SourceClient: s.sourceClient,
 		DB:           s.db,
 	}
-	code, resolvedVersion, err := f.FetchAndUpdateState(r.Context(), modulePath, requestedVersion, s.cfg.AppVersionLabel())
+	code, resolvedVersion, err := f.FetchAndUpdateState(r.Context(), modulePath, requestedVersion, s.cfg.AppVersionLabel(), disableProxyFetch)
 	if err != nil {
 		return err.Error(), code
 	}
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index ef88d5b..47e4f64 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -178,7 +178,7 @@
 
 			// Use 10 workers to have parallelism consistent with the worker binary.
 			q := queue.NewInMemory(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) {
-				code, _, err := f.FetchAndUpdateState(ctx, mpath, version, "")
+				code, _, err := f.FetchAndUpdateState(ctx, mpath, version, "", false)
 				return code, err
 			})