internal/queue: add Options

queue.Options is now used to pass optional arguments to ScheduleFetch.

Change-Id: I4e00a91ad7d2cfa2ccf669d643a21fda6b3b3fb8
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/348815
Trust: Julie Qiu <julie@golang.org>
Run-TryBot: Julie Qiu <julie@golang.org>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
diff --git a/internal/frontend/fetch.go b/internal/frontend/fetch.go
index f329d86..5e00260 100644
--- a/internal/frontend/fetch.go
+++ b/internal/frontend/fetch.go
@@ -196,7 +196,7 @@
 
 			// A row for this modulePath and requestedVersion combination does not
 			// exist in version_map. Enqueue the module version to be fetched.
-			if _, err := s.queue.ScheduleFetch(ctx, modulePath, requestedVersion, "", false); err != nil {
+			if _, err := s.queue.ScheduleFetch(ctx, modulePath, requestedVersion, nil); err != nil {
 				fr.err = err
 				fr.status = http.StatusInternalServerError
 			}
diff --git a/internal/frontend/unit.go b/internal/frontend/unit.go
index eddcd7b..057bf95 100644
--- a/internal/frontend/unit.go
+++ b/internal/frontend/unit.go
@@ -143,7 +143,7 @@
 			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
 			defer cancel()
 			log.Infof(ctx, "serveUnitPage: Scheduling %q@%q to be fetched", um.ModulePath, info.requestedVersion)
-			if _, err := s.queue.ScheduleFetch(ctx, um.ModulePath, info.requestedVersion, "", false); err != nil {
+			if _, err := s.queue.ScheduleFetch(ctx, um.ModulePath, info.requestedVersion, nil); err != nil {
 				log.Errorf(ctx, "serveUnitPage(%q): scheduling fetch for %q@%q: %v",
 					r.URL.Path, um.ModulePath, info.requestedVersion, err)
 			}
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 8905e1f..b8f9e76 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -31,7 +31,7 @@
 
 // A Queue provides an interface for asynchronous scheduling of fetch actions.
 type Queue interface {
-	ScheduleFetch(ctx context.Context, modulePath, version, suffix string, disableProxyFetch bool) (bool, error)
+	ScheduleFetch(ctx context.Context, modulePath, version string, opts *Options) (bool, error)
 }
 
 // New creates a new Queue with name queueName based on the configuration
@@ -115,9 +115,11 @@
 // ScheduleFetch enqueues a task on GCP to fetch the given modulePath and
 // version. It returns an error if there was an error hashing the task name, or
 // an error pushing the task to GCP. If the task was a duplicate, it returns (false, nil).
-func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version, suffix string, disableProxyFetch bool) (enqueued bool, err error) {
-	defer derrors.WrapStack(&err, "queue.ScheduleFetch(%q, %q, %q)", modulePath, version, suffix)
-
+func (q *GCP) ScheduleFetch(ctx context.Context, modulePath, version string, opts *Options) (enqueued bool, err error) {
+	defer derrors.WrapStack(&err, "queue.ScheduleFetch(%q, %q, %v)", modulePath, version, opts)
+	if opts == nil {
+		opts = &Options{}
+	}
 	// Cloud Tasks enforces an RPC timeout of at most 30s. I couldn't find this
 	// in the documentation, but using a larger value, or no timeout, results in
 	// an InvalidArgument error with the text "The deadline cannot be more than
@@ -128,7 +130,7 @@
 	if modulePath == internal.UnknownModulePath {
 		return false, errors.New("given unknown module path")
 	}
-	req := q.newTaskRequest(modulePath, version, suffix, disableProxyFetch)
+	req := q.newTaskRequest(modulePath, version, opts)
 	enqueued = true
 	if _, err := q.client.CreateTask(ctx, req); err != nil {
 		if status.Code(err) == codes.AlreadyExists {
@@ -141,6 +143,17 @@
 	return enqueued, nil
 }
 
+// Options is used to provide option arguments for a task queue.
+type Options struct {
+	// DisableProxyFetch reports whether proxyfetch should be set to off when
+	// making a fetch request.
+	DisableProxyFetch bool
+
+	// Suffix is used to force reprocessing of tasks that would normally be
+	// de-duplicated. It is appended to the task name.
+	Suffix string
+}
+
 // Maximum timeout for HTTP tasks.
 // See https://cloud.google.com/tasks/docs/creating-http-target-tasks.
 const maxCloudTasksTimeout = 30 * time.Minute
@@ -150,10 +163,10 @@
 	DisableProxyFetchValue = "off"
 )
 
-func (q *GCP) newTaskRequest(modulePath, version, suffix string, disableProxyFetch bool) *taskspb.CreateTaskRequest {
+func (q *GCP) newTaskRequest(modulePath, version string, opts *Options) *taskspb.CreateTaskRequest {
 	taskID := newTaskID(modulePath, version)
 	relativeURI := fmt.Sprintf("/fetch/%s/@v/%s", modulePath, version)
-	if disableProxyFetch {
+	if opts.DisableProxyFetch {
 		relativeURI += fmt.Sprintf("?%s=%s", DisableProxyFetchParam, DisableProxyFetchValue)
 	}
 	task := &taskspb.Task{
@@ -173,8 +186,8 @@
 	}
 	// If suffix is non-empty, append it to the task name. This lets us force reprocessing
 	// of tasks that would normally be de-duplicated.
-	if suffix != "" {
-		req.Task.Name += "-" + suffix
+	if opts.Suffix != "" {
+		req.Task.Name += "-" + opts.Suffix
 	}
 	return req
 }
@@ -271,7 +284,7 @@
 
 // ScheduleFetch pushes a fetch task into the local queue to be processed
 // asynchronously.
-func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version, _ string, _ bool) (bool, error) {
+func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version string, _ *Options) (bool, error) {
 	q.queue <- internal.Modver{Path: modulePath, Version: version}
 	return true, nil
 }
diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go
index 54a6e15..80a5067 100644
--- a/internal/queue/queue_test.go
+++ b/internal/queue/queue_test.go
@@ -60,14 +60,18 @@
 	if err != nil {
 		t.Fatal(err)
 	}
-	got := gcp.newTaskRequest("mod", "v1.2.3", "suf", false)
+	opts := &Options{
+		Suffix: "suf",
+	}
+	got := gcp.newTaskRequest("mod", "v1.2.3", opts)
 	want.Task.Name = got.Task.Name
 	if diff := cmp.Diff(want, got, cmp.Comparer(proto.Equal)); diff != "" {
 		t.Errorf("mismatch (-want, +got):\n%s", diff)
 	}
 
 	want.Task.MessageType.(*taskspb.Task_HttpRequest).HttpRequest.Url += "?proxyfetch=off"
-	got = gcp.newTaskRequest("mod", "v1.2.3", "suf", true)
+	opts.DisableProxyFetch = true
+	got = gcp.newTaskRequest("mod", "v1.2.3", opts)
 	want.Task.Name = got.Task.Name
 	if diff := cmp.Diff(want, got, cmp.Comparer(proto.Equal)); diff != "" {
 		t.Errorf("mismatch (-want, +got):\n%s", diff)
diff --git a/internal/worker/server.go b/internal/worker/server.go
index e1d4b9d..899caa3 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -440,11 +440,14 @@
 	sem := make(chan struct{}, concurrentEnqueues)
 	for _, m := range modules {
 		m := m
+		opts := queue.Options{
+			Suffix:            suffixParam,
+			DisableProxyFetch: shouldDisableProxyFetch(m),
+		}
 		sem <- struct{}{}
 		go func() {
 			defer func() { <-sem }()
-			enqueued, err := s.queue.ScheduleFetch(ctx, m.ModulePath, m.Version, suffixParam,
-				shouldDisableProxyFetch(m))
+			enqueued, err := s.queue.ScheduleFetch(ctx, m.ModulePath, m.Version, &opts)
 			mu.Lock()
 			if err != nil {
 				log.Errorf(ctx, "enqueuing: %v", err)
@@ -492,7 +495,7 @@
 			return err
 		}
 		if vm.ResolvedVersion != resolvedVersion {
-			if _, err := s.queue.ScheduleFetch(r.Context(), stdlib.ModulePath, requestedVersion, "", false); err != nil {
+			if _, err := s.queue.ScheduleFetch(r.Context(), stdlib.ModulePath, requestedVersion, nil); err != nil {
 				return fmt.Errorf("error scheduling fetch for %s: %w", requestedVersion, err)
 			}
 		}
@@ -517,7 +520,10 @@
 		return "", err
 	}
 	for _, v := range versions {
-		if _, err := s.queue.ScheduleFetch(ctx, stdlib.ModulePath, v, suffix, false); err != nil {
+		opts := &queue.Options{
+			Suffix: suffix,
+		}
+		if _, err := s.queue.ScheduleFetch(ctx, stdlib.ModulePath, v, opts); err != nil {
 			return "", fmt.Errorf("error scheduling fetch for %s: %w", v, err)
 		}
 	}