internal/queue,frontend,worker: enqueue tasks disabling proxy fetch
If the status of a task to be enqueued indicates reprocessing,
then enqueue the task with a URL whose query param indicates that
proxy fetching should be disabled.
Change-Id: Id7255e861c1dea87d131d83151eb1a46df0ea4ff
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/278712
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jamal Carvalho <jamal@golang.org>
diff --git a/internal/frontend/fetch.go b/internal/frontend/fetch.go
index 0c20eae..fce071e 100644
--- a/internal/frontend/fetch.go
+++ b/internal/frontend/fetch.go
@@ -188,7 +188,7 @@
// A row for this modulePath and requestedVersion combination does not
// exist in version_map. Enqueue the module version to be fetched.
- if _, err := s.queue.ScheduleFetch(ctx, modulePath, requestedVersion, ""); err != nil {
+ if _, err := s.queue.ScheduleFetch(ctx, modulePath, requestedVersion, "", false); err != nil {
fr.err = err
fr.status = http.StatusInternalServerError
}
diff --git a/internal/frontend/unit.go b/internal/frontend/unit.go
index 0728449..cf545f1 100644
--- a/internal/frontend/unit.go
+++ b/internal/frontend/unit.go
@@ -108,7 +108,7 @@
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
- if _, err := s.queue.ScheduleFetch(ctx, info.modulePath, info.requestedVersion, ""); err != nil {
+ if _, err := s.queue.ScheduleFetch(ctx, info.modulePath, info.requestedVersion, "", false); err != nil {
log.Errorf(ctx, "serveDetails(%q): %v", r.URL.Path, err)
}
}()
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 891df96..03578d1 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -30,7 +30,7 @@
// A Queue provides an interface for asynchronous scheduling of fetch actions.
type Queue interface {
- ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (bool, error)
+ ScheduleFetch(ctx context.Context, modulePath, version, suffix string, disableProxyFetch bool) (bool, error)
}
// New creates a new Queue with name queueName based on the configuration
@@ -114,13 +114,13 @@
// ScheduleFetch enqueues a task on GCP to fetch the given modulePath and
// version. It returns an error if there was an error hashing the task name, or
// an error pushing the task to GCP. If the task was a duplicate, it returns (false, nil).
-func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (enqueued bool, err error) {
+func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, disableProxyFetch bool) (enqueued bool, err error) {
// the new taskqueue API requires a deadline of <= 30s
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
defer derrors.Wrap(&err, "queue.ScheduleFetch(%q, %q, %q)", modulePath, version, suffix)
- req := q.newTaskRequest(modulePath, version, suffix)
+ req := q.newTaskRequest(modulePath, version, suffix, disableProxyFetch)
enqueued = true
if _, err := q.client.CreateTask(ctx, req); err != nil {
if status.Code(err) == codes.AlreadyExists {
@@ -137,9 +137,12 @@
// See https://cloud.google.com/tasks/docs/creating-http-target-tasks.
const maxCloudTasksTimeout = 30 * time.Minute
-func (q *GCP) newTaskRequest(modulePath, version, suffix string) *taskspb.CreateTaskRequest {
+func (q *GCP) newTaskRequest(modulePath, version, suffix string, disableProxyFetch bool) *taskspb.CreateTaskRequest {
taskID := newTaskID(modulePath, version)
relativeURI := fmt.Sprintf("/fetch/%s/@v/%s", modulePath, version)
+ if disableProxyFetch {
+ relativeURI += "?proxyfetch=off"
+ }
task := &taskspb.Task{
Name: fmt.Sprintf("%s/tasks/%s", q.queueName, taskID),
DispatchDeadline: ptypes.DurationProto(maxCloudTasksTimeout),
@@ -250,7 +253,7 @@
// ScheduleFetch pushes a fetch task into the local queue to be processed
// asynchronously.
-func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (bool, error) {
+func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, _ string, _ bool) (bool, error) {
q.queue <- moduleVersion{modulePath, version}
return true, nil
}
diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go
index b4afd2f..54a6e15 100644
--- a/internal/queue/queue_test.go
+++ b/internal/queue/queue_test.go
@@ -60,9 +60,17 @@
if err != nil {
t.Fatal(err)
}
- got := gcp.newTaskRequest("mod", "v1.2.3", "suf")
+ got := gcp.newTaskRequest("mod", "v1.2.3", "suf", false)
want.Task.Name = got.Task.Name
if diff := cmp.Diff(want, got, cmp.Comparer(proto.Equal)); diff != "" {
t.Errorf("mismatch (-want, +got):\n%s", diff)
}
+
+ want.Task.MessageType.(*taskspb.Task_HttpRequest).HttpRequest.Url += "?proxyfetch=off"
+ got = gcp.newTaskRequest("mod", "v1.2.3", "suf", true)
+ want.Task.Name = got.Task.Name
+ if diff := cmp.Diff(want, got, cmp.Comparer(proto.Equal)); diff != "" {
+ t.Errorf("mismatch (-want, +got):\n%s", diff)
+ }
+
}
diff --git a/internal/worker/server.go b/internal/worker/server.go
index b53d8cc..c35afdc 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -387,7 +387,8 @@
sem <- struct{}{}
go func() {
defer func() { <-sem }()
- enqueued, err := s.queue.ScheduleFetch(ctx, m.ModulePath, m.Version, suffixParam)
+ enqueued, err := s.queue.ScheduleFetch(ctx, m.ModulePath, m.Version, suffixParam,
+ shouldDisableProxyFetch(m))
mu.Lock()
if err != nil {
log.Errorf(ctx, "enqueuing: %v", err)
@@ -407,6 +408,12 @@
return nil
}
+func shouldDisableProxyFetch(m *internal.ModuleVersionState) bool {
+ // Don't ask the proxy to fetch if this module is being reprocessed.
+ // We use codes 52x and 54x for reprocessing.
+ return m.Status/10 == 52 || m.Status/10 == 54
+}
+
// handleHTMLPage returns an HTML page using a template from s.templates.
func (s *Server) handleHTMLPage(f func(w http.ResponseWriter, r *http.Request) error) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
@@ -434,7 +441,7 @@
return "", err
}
for _, v := range versions {
- if _, err := s.queue.ScheduleFetch(ctx, stdlib.ModulePath, v, suffix); err != nil {
+ if _, err := s.queue.ScheduleFetch(ctx, stdlib.ModulePath, v, suffix, false); err != nil {
return "", fmt.Errorf("error scheduling fetch for %s: %w", v, err)
}
}
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index 592fc96..0822af8 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -324,6 +324,31 @@
}
}
+func TestShouldDisableProxyFetch(t *testing.T) {
+ for _, test := range []struct {
+ status int
+ want bool
+ }{
+ {200, false},
+ {490, false},
+ {500, false},
+ {520, true},
+ {542, true},
+ {580, false},
+ } {
+
+ got := shouldDisableProxyFetch(&internal.ModuleVersionState{
+ ModulePath: "m",
+ Version: "v1.2.3",
+ Status: test.status,
+ })
+ if got != test.want {
+ t.Errorf("status %d: got %t, want %t", test.status, got, test.want)
+ }
+
+ }
+}
+
type fakeTransport struct{}
func (fakeTransport) RoundTrip(*http.Request) (*http.Response, error) {