internal/worker: honor proxyfetch query param
If the fetch handler sees the "proxyfetch" query param
set to "off", it tells internal/fetch to disable
fetching by the proxy.
Change-Id: I1c9303f96f9249a5b420a73451551aba21d9d417
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/278953
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jamal Carvalho <jamal@golang.org>
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index aa8e1b8..155c9a0 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -88,7 +88,7 @@
SourceClient: sourceClient,
DB: db,
}
- code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, cfg.AppVersionLabel())
+ code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, cfg.AppVersionLabel(), false)
return code, err
})
if err != nil {
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 03578d1..5890b11 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -137,11 +137,16 @@
// See https://cloud.google.com/tasks/docs/creating-http-target-tasks.
const maxCloudTasksTimeout = 30 * time.Minute
+const (
+ DisableProxyFetchParam = "proxyfetch"
+ DisableProxyFetchValue = "off"
+)
+
func (q *GCP) newTaskRequest(modulePath, version, suffix string, disableProxyFetch bool) *taskspb.CreateTaskRequest {
taskID := newTaskID(modulePath, version)
relativeURI := fmt.Sprintf("/fetch/%s/@v/%s", modulePath, version)
if disableProxyFetch {
- relativeURI += "?proxyfetch=off"
+ relativeURI += fmt.Sprintf("?%s=%s", DisableProxyFetchParam, DisableProxyFetchValue)
}
task := &taskspb.Task{
Name: fmt.Sprintf("%s/tasks/%s", q.queueName, taskID),
diff --git a/internal/testing/integration/integration_test.go b/internal/testing/integration/integration_test.go
index 845868d..20d5bc5 100644
--- a/internal/testing/integration/integration_test.go
+++ b/internal/testing/integration/integration_test.go
@@ -84,7 +84,7 @@
SourceClient: source.NewClient(1 * time.Second),
DB: testDB,
}
- code, _, err := f.FetchAndUpdateState(ctx, mpath, version, "test")
+ code, _, err := f.FetchAndUpdateState(ctx, mpath, version, "test", false)
return code, err
})
workerServer, err := worker.NewServer(&config.Config{}, worker.ServerConfig{
diff --git a/internal/worker/fetch.go b/internal/worker/fetch.go
index 32bcdf0..6f3bbaa 100644
--- a/internal/worker/fetch.go
+++ b/internal/worker/fetch.go
@@ -46,8 +46,8 @@
// the module_version_states table according to the result. It returns an HTTP
// status code representing the result of the fetch operation, and a non-nil
// error if this status code is not 200.
-func (f *Fetcher) FetchAndUpdateState(ctx context.Context, modulePath, requestedVersion, appVersionLabel string) (_ int, resolvedVersion string, err error) {
- defer derrors.Wrap(&err, "FetchAndUpdateState(%q, %q)", modulePath, requestedVersion)
+func (f *Fetcher) FetchAndUpdateState(ctx context.Context, modulePath, requestedVersion, appVersionLabel string, disableProxyFetch bool) (_ int, resolvedVersion string, err error) {
+ defer derrors.Wrap(&err, "FetchAndUpdateState(%q, %q, %q, %t)", modulePath, requestedVersion, appVersionLabel, disableProxyFetch)
tctx, span := trace.StartSpan(ctx, "FetchAndUpdateState")
ctx = experiment.NewContext(tctx, experiment.FromContext(ctx).Active()...)
@@ -63,7 +63,7 @@
trace.StringAttribute("version", requestedVersion))
defer span.End()
- ft := f.fetchAndInsertModule(ctx, modulePath, requestedVersion)
+ ft := f.fetchAndInsertModule(ctx, modulePath, requestedVersion, disableProxyFetch)
span.AddAttributes(trace.Int64Attribute("numPackages", int64(len(ft.PackageVersionStates))))
// If there were any errors processing the module then we didn't insert it.
@@ -128,7 +128,7 @@
// The given parentCtx is used for tracing, but fetches actually execute in a
// detached context with fixed timeout, so that fetches are allowed to complete
// even for short-lived requests.
-func (f *Fetcher) fetchAndInsertModule(ctx context.Context, modulePath, requestedVersion string) *fetchTask {
+func (f *Fetcher) fetchAndInsertModule(ctx context.Context, modulePath, requestedVersion string, disableProxyFetch bool) *fetchTask {
ft := &fetchTask{
FetchResult: fetch.FetchResult{
ModulePath: modulePath,
@@ -161,7 +161,7 @@
}
start := time.Now()
- fr := fetch.FetchModule(ctx, modulePath, requestedVersion, f.ProxyClient, f.SourceClient, false)
+ fr := fetch.FetchModule(ctx, modulePath, requestedVersion, f.ProxyClient, f.SourceClient, disableProxyFetch)
if fr == nil {
panic("fetch.FetchModule should never return a nil FetchResult")
}
diff --git a/internal/worker/fetch_test.go b/internal/worker/fetch_test.go
index 8eb8a1f..589b315 100644
--- a/internal/worker/fetch_test.go
+++ b/internal/worker/fetch_test.go
@@ -372,7 +372,7 @@
for _, test := range testCases {
t.Run(test.pkg, func(t *testing.T) {
defer postgres.ResetTestDB(testDB, t)
- if _, _, err := f.FetchAndUpdateState(ctx, test.modulePath, test.version, testAppVersion); err != nil {
+ if _, _, err := f.FetchAndUpdateState(ctx, test.modulePath, test.version, testAppVersion, false); err != nil {
t.Fatalf("FetchAndUpdateState(%q, %q, %v, %v, %v): %v", test.modulePath, test.version, proxyClient, sourceClient, testDB, err)
}
diff --git a/internal/worker/fetcherror_test.go b/internal/worker/fetcherror_test.go
index 071f5b9..5f07947 100644
--- a/internal/worker/fetcherror_test.go
+++ b/internal/worker/fetcherror_test.go
@@ -334,7 +334,7 @@
func fetchAndCheckStatus(ctx context.Context, t *testing.T, proxyClient *proxy.Client, modulePath, version string, wantCode int) {
t.Helper()
f := Fetcher{proxyClient, source.NewClient(sourceTimeout), testDB}
- code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion)
+ code, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion, false)
switch code {
case http.StatusOK:
if err != nil {
diff --git a/internal/worker/refetch_test.go b/internal/worker/refetch_test.go
index 8747842..6c5f9f0 100644
--- a/internal/worker/refetch_test.go
+++ b/internal/worker/refetch_test.go
@@ -61,7 +61,7 @@
defer teardownProxy()
sourceClient := source.NewClient(sourceTimeout)
f := &Fetcher{proxyClient, sourceClient, testDB}
- if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion); err != nil {
+ if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion, false); err != nil {
t.Fatalf("FetchAndUpdateState(%q, %q): %v", sample.ModulePath, version, err)
}
@@ -80,7 +80,7 @@
defer teardownProxy()
f = &Fetcher{proxyClient, sourceClient, testDB}
- if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion); err != nil {
+ if _, _, err := f.FetchAndUpdateState(ctx, sample.ModulePath, version, testAppVersion, false); err != nil {
t.Fatalf("FetchAndUpdateState(%q, %q): %v", modulePath, version, err)
}
want := &internal.Unit{
@@ -154,7 +154,7 @@
})
defer teardownProxy()
f = &Fetcher{proxyClient, sourceClient, testDB}
- if _, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion); !errors.Is(err, derrors.DBModuleInsertInvalid) {
+ if _, _, err := f.FetchAndUpdateState(ctx, modulePath, version, testAppVersion, false); !errors.Is(err, derrors.DBModuleInsertInvalid) {
t.Fatalf("FetchAndUpdateState(%q, %q): %v", modulePath, version, err)
}
}
diff --git a/internal/worker/server.go b/internal/worker/server.go
index fa794aa..52d7ded 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -288,13 +288,13 @@
if err != nil {
return err.Error(), http.StatusBadRequest
}
-
+ disableProxyFetch := r.FormValue(queue.DisableProxyFetchParam) == queue.DisableProxyFetchValue
f := &Fetcher{
ProxyClient: s.proxyClient,
SourceClient: s.sourceClient,
DB: s.db,
}
- code, resolvedVersion, err := f.FetchAndUpdateState(r.Context(), modulePath, requestedVersion, s.cfg.AppVersionLabel())
+ code, resolvedVersion, err := f.FetchAndUpdateState(r.Context(), modulePath, requestedVersion, s.cfg.AppVersionLabel(), disableProxyFetch)
if err != nil {
return err.Error(), code
}
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index ef88d5b..47e4f64 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -178,7 +178,7 @@
// Use 10 workers to have parallelism consistent with the worker binary.
q := queue.NewInMemory(ctx, 10, nil, func(ctx context.Context, mpath, version string) (int, error) {
- code, _, err := f.FetchAndUpdateState(ctx, mpath, version, "")
+ code, _, err := f.FetchAndUpdateState(ctx, mpath, version, "", false)
return code, err
})