internal: add frontend fetch source
Frontend fetch requests now have a source=fetch query param.
This query param will be used in the next CL to update
module_version_states correctly.
Change-Id: I658ccea9fba7583838f8ccf733e2862966aef86d
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/348816
Trust: Julie Qiu <julie@golang.org>
Run-TryBot: Julie Qiu <julie@golang.org>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
diff --git a/internal/frontend/fetch.go b/internal/frontend/fetch.go
index 5e00260..0599b48 100644
--- a/internal/frontend/fetch.go
+++ b/internal/frontend/fetch.go
@@ -27,6 +27,7 @@
"golang.org/x/pkgsite/internal/log"
"golang.org/x/pkgsite/internal/postgres"
"golang.org/x/pkgsite/internal/proxy"
+ "golang.org/x/pkgsite/internal/queue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/stdlib"
"golang.org/x/pkgsite/internal/version"
@@ -196,7 +197,8 @@
// A row for this modulePath and requestedVersion combination does not
// exist in version_map. Enqueue the module version to be fetched.
- if _, err := s.queue.ScheduleFetch(ctx, modulePath, requestedVersion, nil); err != nil {
+ opts := &queue.Options{Source: queue.SourceFrontendValue}
+ if _, err := s.queue.ScheduleFetch(ctx, modulePath, requestedVersion, opts); err != nil {
fr.err = err
fr.status = http.StatusInternalServerError
}
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index b8f9e76..8c1c37b 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -152,6 +152,10 @@
// Suffix is used to force reprocessing of tasks that would normally be
// de-duplicated. It is appended to the task name.
Suffix string
+
+ // Source is the source that requested the task to be queued. It is
+ // either "frontend" or the empty string if it is the worker.
+ Source string
}
// Maximum timeout for HTTP tasks.
@@ -161,14 +165,24 @@
const (
DisableProxyFetchParam = "proxyfetch"
DisableProxyFetchValue = "off"
+ SourceParam = "source"
+ SourceFrontendValue = "frontend"
)
func (q *GCP) newTaskRequest(modulePath, version string, opts *Options) *taskspb.CreateTaskRequest {
taskID := newTaskID(modulePath, version)
relativeURI := fmt.Sprintf("/fetch/%s/@v/%s", modulePath, version)
- if opts.DisableProxyFetch {
- relativeURI += fmt.Sprintf("?%s=%s", DisableProxyFetchParam, DisableProxyFetchValue)
+ var params []string
+ if opts.Source != "" {
+ params = append(params, fmt.Sprintf("%s=%s", SourceParam, opts.Source))
}
+ if opts.DisableProxyFetch {
+ params = append(params, fmt.Sprintf("%s=%s", DisableProxyFetchParam, DisableProxyFetchValue))
+ }
+ if len(params) > 0 {
+ relativeURI += fmt.Sprintf("?%s", strings.Join(params, "&"))
+ }
+
task := &taskspb.Task{
Name: fmt.Sprintf("%s/tasks/%s", q.queueName, taskID),
DispatchDeadline: ptypes.DurationProto(maxCloudTasksTimeout),
diff --git a/internal/worker/fetch.go b/internal/worker/fetch.go
index b7909d1..1e6c370 100644
--- a/internal/worker/fetch.go
+++ b/internal/worker/fetch.go
@@ -99,6 +99,7 @@
DB *postgres.DB
Cache *cache.Cache
loadShedder *loadShedder
+ Source string
}
// FetchAndUpdateState fetches and processes a module version, and then updates
diff --git a/internal/worker/fetch_test.go b/internal/worker/fetch_test.go
index b029ab3..200a20b 100644
--- a/internal/worker/fetch_test.go
+++ b/internal/worker/fetch_test.go
@@ -402,7 +402,7 @@
defer teardownProxy()
// With a plain proxy, we download the zip twice.
- f := &Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil, nil}
+ f := &Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil, nil, ""}
if _, _, err := f.FetchAndUpdateState(ctx, "m.com", "v1.0.0", testAppVersion); err != nil {
t.Fatal(err)
}
diff --git a/internal/worker/fetcherror_test.go b/internal/worker/fetcherror_test.go
index 1b45e3f..e130112 100644
--- a/internal/worker/fetcherror_test.go
+++ b/internal/worker/fetcherror_test.go
@@ -335,7 +335,7 @@
func fetchAndCheckStatus(ctx context.Context, t *testing.T, proxyClient *proxy.Client, modulePath, version string, wantCode int) {
t.Helper()
- f := Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil, nil}
+ f := Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil, nil, ""}
code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion)
switch code {
case http.StatusOK:
diff --git a/internal/worker/refetch_test.go b/internal/worker/refetch_test.go
index 04920ef..aa55154 100644
--- a/internal/worker/refetch_test.go
+++ b/internal/worker/refetch_test.go
@@ -60,7 +60,7 @@
})
defer teardownProxy()
sourceClient := source.NewClient(sourceTimeout)
- f := &Fetcher{proxyClient, sourceClient, testDB, nil, nil}
+ f := &Fetcher{proxyClient, sourceClient, testDB, nil, nil, ""}
if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion); err != nil {
t.Fatalf("FetchAndUpdateState(%q, %q): %v", sample.ModulePath, version, err)
}
@@ -79,7 +79,7 @@
})
defer teardownProxy()
- f = &Fetcher{proxyClient, sourceClient, testDB, nil, nil}
+ f = &Fetcher{proxyClient, sourceClient, testDB, nil, nil, ""}
if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion); err != nil {
t.Fatalf("FetchAndUpdateState(%q, %q): %v", modulePath, version, err)
}
@@ -157,7 +157,7 @@
},
})
defer teardownProxy()
- f = &Fetcher{proxyClient, sourceClient, testDB, nil, nil}
+ f = &Fetcher{proxyClient, sourceClient, testDB, nil, nil, ""}
if _, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion); !errors.Is(err, derrors.DBModuleInsertInvalid) {
t.Fatalf("FetchAndUpdateState(%q, %q): %v", modulePath, version, err)
}
diff --git a/internal/worker/server.go b/internal/worker/server.go
index 2bad80c..dd57f8c 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -322,6 +322,9 @@
if r.FormValue(queue.DisableProxyFetchParam) == queue.DisableProxyFetchValue {
f.ProxyClient = f.ProxyClient.WithFetchDisabled()
}
+ if r.FormValue(queue.SourceParam) == queue.SourceFrontendValue {
+ f.Source = queue.SourceFrontendValue
+ }
code, resolvedVersion, err := f.FetchAndUpdateState(ctx, modulePath, requestedVersion, s.cfg.AppVersionLabel())
if code == http.StatusInternalServerError {
s.reportError(ctx, err, w, r)
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index dbcd99c..d1ccd0a 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -179,7 +179,7 @@
proxyClient, teardownProxy := proxytest.SetupTestClient(t, test.proxy)
defer teardownProxy()
defer postgres.ResetTestDB(testDB, t)
- f := &Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil, nil}
+ f := &Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB, nil, nil, ""}
// Use 10 workers to have parallelism consistent with the worker binary.
q := queue.NewInMemory(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) {