internal/queue: add package

A queue is added for asynchronously processing module scan requests.
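
A rough usage sketch (not part of this change; cfg and sreq stand in for
values from the surrounding server code, and the namespace is illustrative):

	process := func(ctx context.Context, r *internal.ScanRequest) (int, error) {
		// Only the local in-memory queue calls this; the int result is ignored.
		return 0, nil
	}
	q, err := queue.New(ctx, cfg, process)
	if err != nil {
		return err
	}
	enqueued, err := q.EnqueueScan(ctx, sreq, &queue.Options{Namespace: "scan"})
	// enqueued is false when Cloud Tasks reports the task as a duplicate.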

Change-Id: I80e47ed35e381a8b66ebc7055a54685b2641a9e9
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/464617
Auto-Submit: Julie Qiu <julieqiu@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Julie Qiu <julieqiu@google.com>
Reviewed-by: Julie Qiu <julieqiu@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/go.mod b/go.mod
index 41351a8..5e926bc 100644
--- a/go.mod
+++ b/go.mod
@@ -5,6 +5,7 @@
 require (
 	cloud.google.com/go v0.105.0
 	cloud.google.com/go/bigquery v1.43.0
+	cloud.google.com/go/cloudtasks v1.8.0
 	cloud.google.com/go/errorreporting v0.1.0
 	cloud.google.com/go/logging v1.6.1
 	github.com/GoogleCloudPlatform/opentelemetry-operations-go v1.0.0
@@ -25,6 +26,8 @@
 	golang.org/x/tools v0.5.1-0.20230117180257-8aba49bb5ea2
 	google.golang.org/api v0.103.0
 	google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c
+	google.golang.org/grpc v1.50.1
+	google.golang.org/protobuf v1.28.1
 	honnef.co/go/tools v0.2.2
 	mvdan.cc/unparam v0.0.0-20220926085101-66de63301820
 )
@@ -69,7 +72,5 @@
 	golang.org/x/text v0.6.0 // indirect
 	golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
-	google.golang.org/grpc v1.50.1 // indirect
-	google.golang.org/protobuf v1.28.1 // indirect
 	gopkg.in/warnings.v0 v0.1.2 // indirect
 )
diff --git a/go.sum b/go.sum
index e7be449..0ec6ecf 100644
--- a/go.sum
+++ b/go.sum
@@ -39,6 +39,8 @@
 cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
 cloud.google.com/go/bigquery v1.43.0 h1:u0fvz5ysJBe1jwUPI4LuPwAX+o+6fCUwf3ECeg6eDUQ=
 cloud.google.com/go/bigquery v1.43.0/go.mod h1:ZMQcXHsl+xmU1z36G2jNGZmKp9zNY5BUua5wDgmNCfw=
+cloud.google.com/go/cloudtasks v1.8.0 h1:faUiUgXjW8yVZ7XMnKHKm1WE4OldPBUWWfIRN/3z1dc=
+cloud.google.com/go/cloudtasks v1.8.0/go.mod h1:gQXUIwCSOI4yPVK7DgTVFiiP0ZW/eQkydWzwVMdHxrI=
 cloud.google.com/go/compute v0.1.0/go.mod h1:GAesmwr110a34z04OlxYkATPBEfVhkymfTBXtfbBFow=
 cloud.google.com/go/compute v1.2.0/go.mod h1:xlogom/6gr8RJGBe7nT2eGsQYAFUbbv8dbC29qE3Xmw=
 cloud.google.com/go/compute v1.12.1 h1:gKVJMEyqV5c/UnpzjjQbo3Rjvvqpr9B1DFSbJC4OXr0=
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
new file mode 100644
index 0000000..32e84d8
--- /dev/null
+++ b/internal/queue/queue.go
@@ -0,0 +1,296 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package queue provides queue implementations that can be used for
+// asynchronous scheduling of module scan requests.
+package queue
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"hash/fnv"
+	"io"
+	"math"
+	"strings"
+	"time"
+
+	cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
+	"golang.org/x/pkgsite-metrics/internal"
+	"golang.org/x/pkgsite-metrics/internal/config"
+	"golang.org/x/pkgsite-metrics/internal/derrors"
+	"golang.org/x/pkgsite-metrics/internal/log"
+	taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/types/known/durationpb"
+)
+
+// A Queue provides an interface for asynchronous scheduling of scan requests.
+type Queue interface {
+	// Enqueue a scan request.
+	// Reports whether a new task was actually added.
+	EnqueueScan(context.Context, *internal.ScanRequest, *Options) (bool, error)
+}
+
+// New creates a new Queue based on the configuration in cfg. When running
+// locally, it is an in-memory queue using cfg.LocalQueueWorkers concurrent workers.
+func New(ctx context.Context, cfg *config.Config, processFunc inMemoryProcessFunc) (Queue, error) {
+	if !config.OnCloudRun() {
+		return NewInMemory(ctx, cfg.LocalQueueWorkers, processFunc), nil
+	}
+	client, err := cloudtasks.NewClient(ctx)
+	if err != nil {
+		return nil, err
+	}
+	g, err := newGCP(cfg, client, cfg.QueueName)
+	if err != nil {
+		return nil, err
+	}
+	log.Infof(ctx, "enqueuing at %s with queueURL=%q", g.queueName, g.queueURL)
+	return g, nil
+}
+
+// GCP provides a Queue implementation backed by the Google Cloud Tasks
+// API.
+type GCP struct {
+	client    *cloudtasks.Client
+	queueName string // full GCP name of the queue
+	queueURL  string // non-AppEngine URL to post tasks to
+	// token holds information that lets the task queue construct an authorized request to the worker.
+	// Since the worker sits behind the IAP, the queue needs an identity token that includes the
+	// identity of a service account that has access, and the client ID for the IAP.
+	// We use the service account of the current process.
+	token *taskspb.HttpRequest_OidcToken
+}
+
+// newGCP returns a new GCP queue that can be used to enqueue tasks using the
+// Cloud Tasks API. The given queueID should be the name of the queue in the
+// Cloud Tasks console.
+func newGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) (_ *GCP, err error) {
+	defer derrors.Wrap(&err, "newGCP(cfg, client, %q)", queueID)
+	if queueID == "" {
+		return nil, errors.New("empty queueID")
+	}
+	if cfg.ProjectID == "" {
+		return nil, errors.New("empty ProjectID")
+	}
+	if cfg.LocationID == "" {
+		return nil, errors.New("empty LocationID")
+	}
+	if cfg.QueueURL == "" {
+		return nil, errors.New("empty QueueURL")
+	}
+	if cfg.ServiceAccount == "" {
+		return nil, errors.New("empty ServiceAccount")
+	}
+	return &GCP{
+		client:    client,
+		queueName: fmt.Sprintf("projects/%s/locations/%s/queues/%s", cfg.ProjectID, cfg.LocationID, queueID),
+		queueURL:  cfg.QueueURL,
+		token: &taskspb.HttpRequest_OidcToken{
+			OidcToken: &taskspb.OidcToken{
+				ServiceAccountEmail: cfg.ServiceAccount,
+			},
+		},
+	}, nil
+}
+
+// EnqueueScan enqueues a task on GCP to scan the module and version given in
+// sreq. It returns an error if the task request could not be constructed or
+// pushed to GCP. If the task was a duplicate, it returns (false, nil).
+func (q *GCP) EnqueueScan(ctx context.Context, sreq *internal.ScanRequest, opts *Options) (enqueued bool, err error) {
+	defer derrors.WrapStack(&err, "queue.EnqueueScan(%v, %v)", sreq, opts)
+	if opts == nil {
+		opts = &Options{}
+	}
+	// Cloud Tasks enforces an RPC timeout of at most 30s. I couldn't find this
+	// in the documentation, but using a larger value, or no timeout, results in
+	// an InvalidArgument error with the text "The deadline cannot be more than
+	// 30s in the future."
+	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+	defer cancel()
+
+	req, err := q.newTaskRequest(sreq, opts)
+	if err != nil {
+		return false, fmt.Errorf("q.newTaskRequest(sreq, opts): %v", err)
+	}
+
+	enqueued = true
+	if _, err := q.client.CreateTask(ctx, req); err != nil {
+		if status.Code(err) == codes.AlreadyExists {
+			log.Debugf(ctx, "ignoring duplicate task ID %s: %s@%s", req.Task.Name, sreq.Module, sreq.Version)
+			enqueued = false
+		} else {
+			return false, fmt.Errorf("q.client.CreateTask(ctx, req): %v", err)
+		}
+	}
+	return enqueued, nil
+}
+
+// Options is used to provide option arguments for a task queue.
+type Options struct {
+	// Namespace prefixes the URL path.
+	Namespace string
+	// DisableProxyFetch reports whether proxyfetch should be set to off when
+	// making a fetch request.
+	DisableProxyFetch bool
+
+	// TaskNameSuffix is appended to the task name to force reprocessing of
+	// tasks that would normally be de-duplicated.
+	TaskNameSuffix string
+}
+
+// Maximum timeout for HTTP tasks.
+// See https://cloud.google.com/tasks/docs/creating-http-target-tasks.
+const maxCloudTasksTimeout = 30 * time.Minute
+
+const (
+	DisableProxyFetchParam = "proxyfetch"
+	DisableProxyFetchValue = "off"
+)
+
+func (q *GCP) newTaskRequest(sreq *internal.ScanRequest, opts *Options) (_ *taskspb.CreateTaskRequest, err error) {
+	defer derrors.Wrap(&err, "newTaskRequest(%v, %v)", sreq, opts)
+
+	if sreq.Mode == "" {
+		return nil, errors.New("ScanRequest.Mode cannot be empty")
+	}
+	if opts.Namespace == "" {
+		return nil, errors.New("Options.Namespace cannot be empty")
+	}
+	taskID := newTaskID(sreq.Module, sreq.Version)
+	relativeURI := fmt.Sprintf("/%s/scan/%s", opts.Namespace, sreq.URLPathAndParams())
+	var params []string
+	if opts.DisableProxyFetch {
+		params = append(params, fmt.Sprintf("%s=%s", DisableProxyFetchParam, DisableProxyFetchValue))
+	}
+	if len(params) > 0 {
+		relativeURI += fmt.Sprintf("?%s", strings.Join(params, "&"))
+	}
+
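+	// Cloud Tasks de-duplicates tasks by name, so the name controls whether a
+	// module@version is reprocessed; see newTaskID and EnqueueScan.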
+	task := &taskspb.Task{
+		Name:             fmt.Sprintf("%s/tasks/%s", q.queueName, taskID),
+		DispatchDeadline: durationpb.New(maxCloudTasksTimeout),
+	}
+	task.MessageType = &taskspb.Task_HttpRequest{
+		HttpRequest: &taskspb.HttpRequest{
+			HttpMethod:          taskspb.HttpMethod_POST,
+			Url:                 q.queueURL + relativeURI,
+			AuthorizationHeader: q.token,
+		},
+	}
+	req := &taskspb.CreateTaskRequest{
+		Parent: q.queueName,
+		Task:   task,
+	}
+	// Append the TaskNameSuffix (if non-empty) and the mode to the task name.
+	// This lets us force reprocessing of tasks that would normally be de-duplicated.
+	if opts.TaskNameSuffix != "" {
+		req.Task.Name += "-" + opts.TaskNameSuffix
+	}
+	req.Task.Name += "-" + sreq.Mode
+	return req, nil
+}
+
+// Create a task ID for the given module path and version.
+// Task IDs can contain only letters ([A-Za-z]), numbers ([0-9]), hyphens (-), or underscores (_).
+func newTaskID(modulePath, version string) string {
+	mv := modulePath + "@" + version
+	// Compute a hash to use as a prefix, so the task IDs are distributed uniformly.
+	// See https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#task
+	// under "Task De-duplication".
+	hasher := fnv.New32()
+	io.WriteString(hasher, mv)
+	hash := hasher.Sum32() % math.MaxUint16
+	// Escape the name so it contains only valid characters. Do our best to make it readable.
+	var b strings.Builder
+	for _, r := range mv {
+		switch {
+		case r >= 'A' && r <= 'Z' || r >= 'a' && r <= 'z' || r >= '0' && r <= '9' || r == '-':
+			b.WriteRune(r)
+		case r == '_':
+			b.WriteString("__")
+		case r == '/':
+			b.WriteString("_-")
+		case r == '@':
+			b.WriteString("_v")
+		case r == '.':
+			b.WriteString("_o")
+		default:
+			fmt.Fprintf(&b, "_%04x", r)
+		}
+	}
+	return fmt.Sprintf("%04x-%s", hash, &b)
+}
+
+// InMemory is a Queue implementation that schedules in-process scan
+// operations. Unlike the GCP task queue, it will not automatically retry tasks
+// on failure.
+//
+// This should only be used for local development.
+type InMemory struct {
+	queue chan *internal.ScanRequest
+	done  chan struct{}
+}
+
+type inMemoryProcessFunc func(context.Context, *internal.ScanRequest) (int, error)
+
+// NewInMemory creates a new InMemory queue that asynchronously processes
+// scan requests using processFunc. It uses workerCount parallelism to
+// execute these requests.
+func NewInMemory(ctx context.Context, workerCount int, processFunc inMemoryProcessFunc) *InMemory {
+	q := &InMemory{
+		queue: make(chan *internal.ScanRequest, 1000),
+		done:  make(chan struct{}),
+	}
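+	// sem is a counting semaphore: each in-flight worker holds one slot.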
+	sem := make(chan struct{}, workerCount)
+	go func() {
+		for v := range q.queue {
+			select {
+			case <-ctx.Done():
+				return
+			case sem <- struct{}{}:
+			}
+
+			// A worker slot is available: process the request in a
+			// goroutine, releasing the slot when it finishes.
+			go func(r *internal.ScanRequest) {
+				defer func() { <-sem }()
+
+				log.Infof(ctx, "Fetch requested: %v (workerCount = %d)", r, cap(sem))
+
+				fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
+				defer cancel()
+
+				if _, err := processFunc(fetchCtx, r); err != nil {
+					log.Errorf(fetchCtx, "processFunc(%q, %q): %v", r.Path, r.Version, err)
+				}
+			}(v)
+		}
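+		// Wait for all in-flight workers to finish by acquiring every
+		// semaphore slot, then signal that the queue has drained.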
+		for i := 0; i < cap(sem); i++ {
+			select {
+			case <-ctx.Done():
+				panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err()))
+			case sem <- struct{}{}:
+			}
+		}
+		close(q.done)
+	}()
+	return q
+}
+
+// EnqueueScan pushes a scan request onto the local queue to be processed
+// asynchronously.
+func (q *InMemory) EnqueueScan(ctx context.Context, req *internal.ScanRequest, _ *Options) (bool, error) {
+	q.queue <- req
+	return true, nil
+}
+
+// WaitForTesting waits for all queued requests to finish. It should only be
+// used by test code.
+func (q *InMemory) WaitForTesting(ctx context.Context) {
+	close(q.queue)
+	<-q.done
+}
diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go
new file mode 100644
index 0000000..026c9a8
--- /dev/null
+++ b/internal/queue/queue_test.go
@@ -0,0 +1,93 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	"golang.org/x/pkgsite-metrics/internal"
+	"golang.org/x/pkgsite-metrics/internal/config"
+	taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2"
+	"google.golang.org/protobuf/proto"
+	"google.golang.org/protobuf/types/known/durationpb"
+)
+
+func TestNewTaskID(t *testing.T) {
+	for _, test := range []struct {
+		modulePath, version string
+		want                string
+	}{
+		{"m-1", "v2", "acc5-m-1_vv2"},
+		{"my_module", "v1.2.3", "0cb9-my__module_vv1_o2_o3"},
+		{"µπΩ/github.com", "v2.3.4-ß", "a49c-_00b5_03c0_03a9_-github_ocom_vv2_o3_o4-_00df"},
+	} {
+		got := newTaskID(test.modulePath, test.version)
+		if got != test.want {
+			t.Errorf("%s@%s: got %s, want %s", test.modulePath, test.version, got, test.want)
+		}
+	}
+}
+
+func TestNewTaskRequest(t *testing.T) {
+	cfg := config.Config{
+		ProjectID:      "Project",
+		LocationID:     "us-central1",
+		QueueURL:       "http://1.2.3.4:8000",
+		ServiceAccount: "sa",
+	}
+	want := &taskspb.CreateTaskRequest{
+		Parent: "projects/Project/locations/us-central1/queues/queueID",
+		Task: &taskspb.Task{
+			DispatchDeadline: durationpb.New(maxCloudTasksTimeout),
+			MessageType: &taskspb.Task_HttpRequest{
+				HttpRequest: &taskspb.HttpRequest{
+					HttpMethod: taskspb.HttpMethod_POST,
+					Url:        "http://1.2.3.4:8000/test/scan/mod/@v/v1.2.3?importedby=0&mode=test&insecure=true",
+					AuthorizationHeader: &taskspb.HttpRequest_OidcToken{
+						OidcToken: &taskspb.OidcToken{
+							ServiceAccountEmail: "sa",
+						},
+					},
+				},
+			},
+		},
+	}
+	gcp, err := newGCP(&cfg, nil, "queueID")
+	if err != nil {
+		t.Fatal(err)
+	}
+	opts := &Options{
+		Namespace:      "test",
+		TaskNameSuffix: "suf",
+	}
+	sreq := &internal.ScanRequest{
+		Module:     "mod",
+		Version:    "v1.2.3",
+		ImportedBy: 0,
+		Mode:       "test",
+		Insecure:   true,
+	}
+	got, err := gcp.newTaskRequest(sreq, opts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	want.Task.Name = got.Task.Name
+	if diff := cmp.Diff(want, got, cmp.Comparer(proto.Equal)); diff != "" {
+		t.Errorf("mismatch (-want, +got):\n%s", diff)
+	}
+
+	opts.DisableProxyFetch = true
+	want.Task.MessageType.(*taskspb.Task_HttpRequest).HttpRequest.Url += "?proxyfetch=off"
+	got, err = gcp.newTaskRequest(sreq, opts)
+	if err != nil {
+		t.Fatal(err)
+	}
+	want.Task.Name = got.Task.Name
+	if diff := cmp.Diff(want, got, cmp.Comparer(proto.Equal)); diff != "" {
+		t.Errorf("mismatch (-want, +got):\n%s", diff)
+	}
+}