internal: add frontend fetch source

Frontend fetch requests now have a source=fetch query param.

This query param will be used in the next CL to update
module_version_states correctly.

Change-Id: I658ccea9fba7583838f8ccf733e2862966aef86d
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/348816
Trust: Julie Qiu <julie@golang.org>
Run-TryBot: Julie Qiu <julie@golang.org>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
diff --git a/internal/frontend/fetch.go b/internal/frontend/fetch.go
index 5e00260..0599b48 100644
--- a/internal/frontend/fetch.go
+++ b/internal/frontend/fetch.go
@@ -27,6 +27,7 @@
 	"golang.org/x/pkgsite/internal/log"
 	"golang.org/x/pkgsite/internal/postgres"
 	"golang.org/x/pkgsite/internal/proxy"
+	"golang.org/x/pkgsite/internal/queue"
 	"golang.org/x/pkgsite/internal/source"
 	"golang.org/x/pkgsite/internal/stdlib"
 	"golang.org/x/pkgsite/internal/version"
@@ -196,7 +197,8 @@
 
 			// A row for this modulePath and requestedVersion combination does not
 			// exist in version_map. Enqueue the module version to be fetched.
-			if _, err := s.queue.ScheduleFetch(ctx, modulePath, requestedVersion, nil); err != nil {
+			opts := &queue.Options{Source: queue.SourceFrontendValue}
+			if _, err := s.queue.ScheduleFetch(ctx, modulePath, requestedVersion, opts); err != nil {
 				fr.err = err
 				fr.status = http.StatusInternalServerError
 			}
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index b8f9e76..8c1c37b 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -152,6 +152,10 @@
 	// Suffix is used to force reprocessing of tasks that would normally be
 	// de-duplicated. It is appended to the task name.
 	Suffix string
+
+	// Source is the source that requested the task to be queued. It is
+	// either "frontend" or the empty string if it is the worker.
+	Source string
 }
 
 // Maximum timeout for HTTP tasks.
@@ -161,14 +165,24 @@
 const (
 	DisableProxyFetchParam = "proxyfetch"
 	DisableProxyFetchValue = "off"
+	SourceParam            = "source"
+	SourceFrontendValue    = "frontend"
 )
 
 func (q *GCP) newTaskRequest(modulePath, version string, opts *Options) *taskspb.CreateTaskRequest {
 	taskID := newTaskID(modulePath, version)
 	relativeURI := fmt.Sprintf("/fetch/%s/@v/%s", modulePath, version)
-	if opts.DisableProxyFetch {
-		relativeURI += fmt.Sprintf("?%s=%s", DisableProxyFetchParam, DisableProxyFetchValue)
+	var params []string
+	if opts.Source != "" {
+		params = append(params, fmt.Sprintf("%s=%s", SourceParam, opts.Source))
 	}
+	if opts.DisableProxyFetch {
+		params = append(params, fmt.Sprintf("%s=%s", DisableProxyFetchParam, DisableProxyFetchValue))
+	}
+	if len(params) > 0 {
+		relativeURI += fmt.Sprintf("?%s", strings.Join(params, "&"))
+	}
+
 	task := &taskspb.Task{
 		Name:             fmt.Sprintf("%s/tasks/%s", q.queueName, taskID),
 		DispatchDeadline: ptypes.DurationProto(maxCloudTasksTimeout),
diff --git a/internal/worker/fetch.go b/internal/worker/fetch.go
index b7909d1..1e6c370 100644
--- a/internal/worker/fetch.go
+++ b/internal/worker/fetch.go
@@ -99,6 +99,7 @@
 	DB           *postgres.DB
 	Cache        *cache.Cache
 	loadShedder  *loadShedder
+	Source       string
 }
 
 // FetchAndUpdateState fetches and processes a module version, and then updates
diff --git a/internal/worker/fetch_test.go b/internal/worker/fetch_test.go
index b029ab3..200a20b 100644
--- a/internal/worker/fetch_test.go
+++ b/internal/worker/fetch_test.go
@@ -402,7 +402,7 @@
 	defer teardownProxy()
 
 	// With a plain proxy, we download the zip twice.
-	f := &Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil, nil}
+	f := &Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil, nil, ""}
 	if _, _, err := f.FetchAndUpdateState(ctx, "m.com", "v1.0.0", testAppVersion); err != nil {
 		t.Fatal(err)
 	}
diff --git a/internal/worker/fetcherror_test.go b/internal/worker/fetcherror_test.go
index 1b45e3f..e130112 100644
--- a/internal/worker/fetcherror_test.go
+++ b/internal/worker/fetcherror_test.go
@@ -335,7 +335,7 @@
 
 func fetchAndCheckStatus(ctx context.Context, t *testing.T, proxyClient *proxy.Client, modulePath, version string, wantCode int) {
 	t.Helper()
-	f := Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil, nil}
+	f := Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil, nil, ""}
 	code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion)
 	switch code {
 	case http.StatusOK:
diff --git a/internal/worker/refetch_test.go b/internal/worker/refetch_test.go
index 04920ef..aa55154 100644
--- a/internal/worker/refetch_test.go
+++ b/internal/worker/refetch_test.go
@@ -60,7 +60,7 @@
 	})
 	defer teardownProxy()
 	sourceClient := source.NewClient(sourceTimeout)
-	f := &Fetcher{proxyClient, sourceClient, testDB, nil, nil}
+	f := &Fetcher{proxyClient, sourceClient, testDB, nil, nil, ""}
 	if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion); err != nil {
 		t.Fatalf("FetchAndUpdateState(%q, %q): %v", sample.ModulePath, version, err)
 	}
@@ -79,7 +79,7 @@
 	})
 	defer teardownProxy()
 
-	f = &Fetcher{proxyClient, sourceClient, testDB, nil, nil}
+	f = &Fetcher{proxyClient, sourceClient, testDB, nil, nil, ""}
 	if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion); err != nil {
 		t.Fatalf("FetchAndUpdateState(%q, %q): %v", modulePath, version, err)
 	}
@@ -157,7 +157,7 @@
 		},
 	})
 	defer teardownProxy()
-	f = &Fetcher{proxyClient, sourceClient, testDB, nil, nil}
+	f = &Fetcher{proxyClient, sourceClient, testDB, nil, nil, ""}
 	if _, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion); !errors.Is(err, derrors.DBModuleInsertInvalid) {
 		t.Fatalf("FetchAndUpdateState(%q, %q): %v", modulePath, version, err)
 	}
diff --git a/internal/worker/server.go b/internal/worker/server.go
index 2bad80c..dd57f8c 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -322,6 +322,9 @@
 	if r.FormValue(queue.DisableProxyFetchParam) == queue.DisableProxyFetchValue {
 		f.ProxyClient = f.ProxyClient.WithFetchDisabled()
 	}
+	if r.FormValue(queue.SourceParam) == queue.SourceFrontendValue {
+		f.Source = queue.SourceFrontendValue
+	}
 	code, resolvedVersion, err := f.FetchAndUpdateState(ctx, modulePath, requestedVersion, s.cfg.AppVersionLabel())
 	if code == http.StatusInternalServerError {
 		s.reportError(ctx, err, w, r)
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index dbcd99c..d1ccd0a 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -179,7 +179,7 @@
 			proxyClient, teardownProxy := proxytest.SetupTestClient(t, test.proxy)
 			defer teardownProxy()
 			defer postgres.ResetTestDB(testDB, t)
-			f := &Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil, nil}
+			f := &Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil, nil, ""}
 
 			// Use 10 workers to have parallelism consistent with the worker binary.
 			q := queue.NewInMemory(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) {