internal/queue,frontend,worker: enqueue tasks disabling proxy fetch

If the status of a module version to be enqueued indicates reprocessing
(codes 52x and 54x), then enqueue its fetch task with a URL whose query
param (proxyfetch=off) indicates that proxy fetching should be disabled.

Change-Id: Id7255e861c1dea87d131d83151eb1a46df0ea4ff
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/278712
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jamal Carvalho <jamal@golang.org>
diff --git a/internal/frontend/fetch.go b/internal/frontend/fetch.go
index 0c20eae..fce071e 100644
--- a/internal/frontend/fetch.go
+++ b/internal/frontend/fetch.go
@@ -188,7 +188,7 @@
 
 			// A row for this modulePath and requestedVersion combination does not
 			// exist in version_map. Enqueue the module version to be fetched.
-			if _, err := s.queue.ScheduleFetch(ctx, modulePath, requestedVersion, ""); err != nil {
+			if _, err := s.queue.ScheduleFetch(ctx, modulePath, requestedVersion, "", false); err != nil {
 				fr.err = err
 				fr.status = http.StatusInternalServerError
 			}
diff --git a/internal/frontend/unit.go b/internal/frontend/unit.go
index 0728449..cf545f1 100644
--- a/internal/frontend/unit.go
+++ b/internal/frontend/unit.go
@@ -108,7 +108,7 @@
 		go func() {
 			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
 			defer cancel()
-			if _, err := s.queue.ScheduleFetch(ctx, info.modulePath, info.requestedVersion, ""); err != nil {
+			if _, err := s.queue.ScheduleFetch(ctx, info.modulePath, info.requestedVersion, "", false); err != nil {
 				log.Errorf(ctx, "serveDetails(%q): %v", r.URL.Path, err)
 			}
 		}()
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 891df96..03578d1 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -30,7 +30,7 @@
 
 // A Queue provides an interface for asynchronous scheduling of fetch actions.
 type Queue interface {
-	ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (bool, error)
+	ScheduleFetch(ctx context.Context, modulePath, version, suffix string, disableProxyFetch bool) (bool, error)
 }
 
 // New creates a new Queue with name queueName based on the configuration
@@ -114,13 +114,13 @@
 // ScheduleFetch enqueues a task on GCP to fetch the given modulePath and
 // version. It returns an error if there was an error hashing the task name, or
 // an error pushing the task to GCP. If the task was a duplicate, it returns (false, nil).
-func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (enqueued bool, err error) {
+func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, disableProxyFetch bool) (enqueued bool, err error) {
 	// the new taskqueue API requires a deadline of <= 30s
 	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
 	defer cancel()
 	defer derrors.Wrap(&err, "queue.ScheduleFetch(%q, %q, %q)", modulePath, version, suffix)
 
-	req := q.newTaskRequest(modulePath, version, suffix)
+	req := q.newTaskRequest(modulePath, version, suffix, disableProxyFetch)
 	enqueued = true
 	if _, err := q.client.CreateTask(ctx, req); err != nil {
 		if status.Code(err) == codes.AlreadyExists {
@@ -137,9 +137,12 @@
 // See https://cloud.google.com/tasks/docs/creating-http-target-tasks.
 const maxCloudTasksTimeout = 30 * time.Minute
 
-func (q *GCP) newTaskRequest(modulePath, version, suffix string) *taskspb.CreateTaskRequest {
+func (q *GCP) newTaskRequest(modulePath, version, suffix string, disableProxyFetch bool) *taskspb.CreateTaskRequest {
 	taskID := newTaskID(modulePath, version)
 	relativeURI := fmt.Sprintf("/fetch/%s/@v/%s", modulePath, version)
+	if disableProxyFetch {
+		relativeURI += "?proxyfetch=off"
+	}
 	task := &taskspb.Task{
 		Name:             fmt.Sprintf("%s/tasks/%s", q.queueName, taskID),
 		DispatchDeadline: ptypes.DurationProto(maxCloudTasksTimeout),
@@ -250,7 +253,7 @@
 
 // ScheduleFetch pushes a fetch task into the local queue to be processed
 // asynchronously.
-func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, suffix string) (bool, error) {
+func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, _ string, _ bool) (bool, error) {
 	q.queue <- moduleVersion{modulePath, version}
 	return true, nil
 }
diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go
index b4afd2f..54a6e15 100644
--- a/internal/queue/queue_test.go
+++ b/internal/queue/queue_test.go
@@ -60,9 +60,17 @@
 	if err != nil {
 		t.Fatal(err)
 	}
-	got := gcp.newTaskRequest("mod", "v1.2.3", "suf")
+	got := gcp.newTaskRequest("mod", "v1.2.3", "suf", false)
 	want.Task.Name = got.Task.Name
 	if diff := cmp.Diff(want, got, cmp.Comparer(proto.Equal)); diff != "" {
 		t.Errorf("mismatch (-want, +got):\n%s", diff)
 	}
+
+	want.Task.MessageType.(*taskspb.Task_HttpRequest).HttpRequest.Url += "?proxyfetch=off"
+	got = gcp.newTaskRequest("mod", "v1.2.3", "suf", true)
+	want.Task.Name = got.Task.Name
+	if diff := cmp.Diff(want, got, cmp.Comparer(proto.Equal)); diff != "" {
+		t.Errorf("mismatch (-want, +got):\n%s", diff)
+	}
+
 }
diff --git a/internal/worker/server.go b/internal/worker/server.go
index b53d8cc..c35afdc 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -387,7 +387,8 @@
 		sem <- struct{}{}
 		go func() {
 			defer func() { <-sem }()
-			enqueued, err := s.queue.ScheduleFetch(ctx, m.ModulePath, m.Version, suffixParam)
+			enqueued, err := s.queue.ScheduleFetch(ctx, m.ModulePath, m.Version, suffixParam,
+				shouldDisableProxyFetch(m))
 			mu.Lock()
 			if err != nil {
 				log.Errorf(ctx, "enqueuing: %v", err)
@@ -407,6 +408,12 @@
 	return nil
 }
 
+func shouldDisableProxyFetch(m *internal.ModuleVersionState) bool {
+	// Report true when m is being reprocessed, so the proxy is not asked to fetch it.
+	// Reprocessing uses status codes 52x and 54x; dividing by 10 matches 520-529 and 540-549.
+	return m.Status/10 == 52 || m.Status/10 == 54
+}
+
 // handleHTMLPage returns an HTML page using a template from s.templates.
 func (s *Server) handleHTMLPage(f func(w http.ResponseWriter, r *http.Request) error) http.HandlerFunc {
 	return func(w http.ResponseWriter, r *http.Request) {
@@ -434,7 +441,7 @@
 		return "", err
 	}
 	for _, v := range versions {
-		if _, err := s.queue.ScheduleFetch(ctx, stdlib.ModulePath, v, suffix); err != nil {
+		if _, err := s.queue.ScheduleFetch(ctx, stdlib.ModulePath, v, suffix, false); err != nil {
 			return "", fmt.Errorf("error scheduling fetch for %s: %w", v, err)
 		}
 	}
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index 592fc96..0822af8 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -324,6 +324,31 @@
 	}
 }
 
+func TestShouldDisableProxyFetch(t *testing.T) {
+	for _, test := range []struct {
+		status int
+		want   bool
+	}{
+		{200, false},
+		{490, false},
+		{500, false},
+		{520, true},
+		{542, true},
+		{580, false},
+	} {
+		// Only the 52x and 54x statuses in the table above expect true.
+		got := shouldDisableProxyFetch(&internal.ModuleVersionState{
+			ModulePath: "m",
+			Version:    "v1.2.3",
+			Status:     test.status,
+		})
+		if got != test.want {
+			t.Errorf("status %d: got %t, want %t", test.status, got, test.want)
+		}
+
+	}
+}
+
 type fakeTransport struct{}
 
 func (fakeTransport) RoundTrip(*http.Request) (*http.Response, error) {