internal/queue: add package
A queue is added for processing scan requests for modules.
Change-Id: I80e47ed35e381a8b66ebc7055a54685b2641a9e9
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/464617
Auto-Submit: Julie Qiu <julieqiu@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Julie Qiu <julieqiu@google.com>
Reviewed-by: Julie Qiu <julieqiu@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/go.mod b/go.mod
index 41351a8..5e926bc 100644
--- a/go.mod
+++ b/go.mod
@@ -5,6 +5,7 @@
require (
cloud.google.com/go v0.105.0
cloud.google.com/go/bigquery v1.43.0
+ cloud.google.com/go/cloudtasks v1.8.0
cloud.google.com/go/errorreporting v0.1.0
cloud.google.com/go/logging v1.6.1
github.com/GoogleCloudPlatform/opentelemetry-operations-go v1.0.0
@@ -25,6 +26,8 @@
golang.org/x/tools v0.5.1-0.20230117180257-8aba49bb5ea2
google.golang.org/api v0.103.0
google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c
+ google.golang.org/grpc v1.50.1
+ google.golang.org/protobuf v1.28.1
honnef.co/go/tools v0.2.2
mvdan.cc/unparam v0.0.0-20220926085101-66de63301820
)
@@ -69,7 +72,5 @@
golang.org/x/text v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
- google.golang.org/grpc v1.50.1 // indirect
- google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
)
diff --git a/go.sum b/go.sum
index e7be449..0ec6ecf 100644
--- a/go.sum
+++ b/go.sum
@@ -39,6 +39,8 @@
cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
cloud.google.com/go/bigquery v1.43.0 h1:u0fvz5ysJBe1jwUPI4LuPwAX+o+6fCUwf3ECeg6eDUQ=
cloud.google.com/go/bigquery v1.43.0/go.mod h1:ZMQcXHsl+xmU1z36G2jNGZmKp9zNY5BUua5wDgmNCfw=
+cloud.google.com/go/cloudtasks v1.8.0 h1:faUiUgXjW8yVZ7XMnKHKm1WE4OldPBUWWfIRN/3z1dc=
+cloud.google.com/go/cloudtasks v1.8.0/go.mod h1:gQXUIwCSOI4yPVK7DgTVFiiP0ZW/eQkydWzwVMdHxrI=
cloud.google.com/go/compute v0.1.0/go.mod h1:GAesmwr110a34z04OlxYkATPBEfVhkymfTBXtfbBFow=
cloud.google.com/go/compute v1.2.0/go.mod h1:xlogom/6gr8RJGBe7nT2eGsQYAFUbbv8dbC29qE3Xmw=
cloud.google.com/go/compute v1.12.1 h1:gKVJMEyqV5c/UnpzjjQbo3Rjvvqpr9B1DFSbJC4OXr0=
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
new file mode 100644
index 0000000..32e84d8
--- /dev/null
+++ b/internal/queue/queue.go
@@ -0,0 +1,296 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package queue provides queue implementations that can be used for
+// asynchronous scheduling of scan requests.
+package queue
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "hash/fnv"
+ "io"
+ "math"
+ "strings"
+ "time"
+
+ cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
+ "golang.org/x/pkgsite-metrics/internal"
+ "golang.org/x/pkgsite-metrics/internal/config"
+ "golang.org/x/pkgsite-metrics/internal/derrors"
+ "golang.org/x/pkgsite-metrics/internal/log"
+ taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "google.golang.org/protobuf/types/known/durationpb"
+)
+
+// A Queue provides an interface for asynchronous scheduling of scan requests.
+type Queue interface {
+ // EnqueueScan enqueues a scan request.
+ // It reports whether a new task was actually added.
+ EnqueueScan(context.Context, *internal.ScanRequest, *Options) (bool, error)
+}
+
+// New creates a new Queue based on the configuration in cfg. When config.OnCloudRun reports true it is backed by the
+// Cloud Tasks queue named by cfg.QueueName; otherwise it is an InMemory queue that runs processFunc on cfg.LocalQueueWorkers concurrent workers.
+func New(ctx context.Context, cfg *config.Config, processFunc inMemoryProcessFunc) (Queue, error) {
+ if !config.OnCloudRun() {
+ return NewInMemory(ctx, cfg.LocalQueueWorkers, processFunc), nil
+ }
+ client, err := cloudtasks.NewClient(ctx)
+ if err != nil {
+ return nil, err
+ }
+ g, err := newGCP(cfg, client, cfg.QueueName)
+ if err != nil {
+ return nil, err
+ }
+ log.Infof(ctx, "enqueuing at %s with queueURL=%q", g.queueName, g.queueURL)
+ return g, nil
+}
+
+// GCP provides a Queue implementation backed by the Google Cloud Tasks
+// API.
+type GCP struct {
+ client *cloudtasks.Client // Cloud Tasks client used to create tasks
+ queueName string // full GCP name of the queue: projects/<project>/locations/<location>/queues/<queue ID>
+ queueURL string // non-AppEngine URL to post tasks to; each task's relative URI is appended to it
+ // token holds information that lets the task queue construct an authorized request to the worker.
+ // Since the worker sits behind the IAP, the queue needs an identity token that includes the
+ // identity of a service account that has access, and the client ID for the IAP.
+ // We use the service account of the current process.
+ token *taskspb.HttpRequest_OidcToken
+}
+
+// newGCP returns a new GCP queue that can be used to enqueue tasks using the
+// cloud tasks API. The given queueID should be the name of the queue in the
+// cloud tasks console. It returns an error if queueID, or cfg's ProjectID, LocationID, QueueURL or ServiceAccount, is empty.
+func newGCP(cfg *config.Config, client *cloudtasks.Client, queueID string) (_ *GCP, err error) {
+ defer derrors.Wrap(&err, "newGCP(cfg, client, %q)", queueID)
+ if queueID == "" {
+ return nil, errors.New("empty queueID")
+ }
+ if cfg.ProjectID == "" {
+ return nil, errors.New("empty ProjectID")
+ }
+ if cfg.LocationID == "" {
+ return nil, errors.New("empty LocationID")
+ }
+ if cfg.QueueURL == "" {
+ return nil, errors.New("empty QueueURL")
+ }
+ if cfg.ServiceAccount == "" {
+ return nil, errors.New("empty ServiceAccount")
+ }
+ return &GCP{
+ client: client,
+ queueName: fmt.Sprintf("projects/%s/locations/%s/queues/%s", cfg.ProjectID, cfg.LocationID, queueID),
+ queueURL: cfg.QueueURL,
+ token: &taskspb.HttpRequest_OidcToken{
+ OidcToken: &taskspb.OidcToken{
+ ServiceAccountEmail: cfg.ServiceAccount,
+ },
+ },
+ }, nil
+}
+
+// EnqueueScan enqueues a task on GCP to scan the module and version in sreq. A nil opts is
+// treated as an empty Options. It returns (true, nil) if a new task was created and (false, nil) if the task was a
+// duplicate. It returns an error if the task request cannot be built or if creating the task fails for any other reason.
+func (q *GCP) EnqueueScan(ctx context.Context, sreq *internal.ScanRequest, opts *Options) (enqueued bool, err error) {
+	defer derrors.WrapStack(&err, "queue.EnqueueScan(%v, %v)", sreq, opts)
+	if opts == nil {
+		opts = &Options{}
+	}
+	// Cloud Tasks enforces an RPC timeout of at most 30s. I couldn't find this
+	// in the documentation, but using a larger value, or no timeout, results in
+	// an InvalidArgument error with the text "The deadline cannot be more than
+	// 30s in the future."
+	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
+	defer cancel()
+
+	req, err := q.newTaskRequest(sreq, opts)
+	if err != nil {
+		return false, fmt.Errorf("q.newTaskRequest(sreq, opts): %w", err)
+	}
+
+	if _, err := q.client.CreateTask(ctx, req); err != nil {
+		if status.Code(err) != codes.AlreadyExists {
+			return false, fmt.Errorf("q.client.CreateTask(ctx, req): %w", err)
+		}
+		// Cloud Tasks rejected the task because one with the same name already exists.
+		// That is expected de-duplication, not a failure, so report "not enqueued" without an error.
+		log.Debugf(ctx, "ignoring duplicate task ID %s: %s@%s", req.Task.Name, sreq.Module, sreq.Version)
+		return false, nil
+	}
+	return true, nil
+}
+
+// Options is used to provide option arguments for a task queue.
+type Options struct {
+ // Namespace prefixes the URL path: GCP tasks are posted to "/<Namespace>/scan/...". The GCP queue rejects an empty Namespace.
+ Namespace string
+ // DisableProxyFetch reports whether proxyfetch should be set to off when
+ // making a fetch request. When true, the GCP queue appends "proxyfetch=off" to the task URL.
+ DisableProxyFetch bool
+
+ // TaskNameSuffix is appended to the task name to force reprocessing of
+ // tasks that would normally be de-duplicated.
+ TaskNameSuffix string
+}
+
+// maxCloudTasksTimeout is the maximum timeout for HTTP tasks; it is used as the DispatchDeadline of every task.
+// See https://cloud.google.com/tasks/docs/creating-http-target-tasks.
+const maxCloudTasksTimeout = 30 * time.Minute
+
+const (
+ DisableProxyFetchParam = "proxyfetch" // name of the query parameter added to the task URL when Options.DisableProxyFetch is set
+ DisableProxyFetchValue = "off" // value given to the DisableProxyFetchParam query parameter
+)
+
+// newTaskRequest builds the CreateTaskRequest for an HTTP POST task that asks the worker at
+// q.queueURL to scan sreq. It returns an error if sreq.Mode or opts.Namespace is empty.
+func (q *GCP) newTaskRequest(sreq *internal.ScanRequest, opts *Options) (_ *taskspb.CreateTaskRequest, err error) {
+	defer derrors.Wrap(&err, "newTaskRequest(%v, %v)", sreq, opts)
+
+	switch {
+	case sreq.Mode == "":
+		return nil, errors.New("ScanRequest.Mode cannot be empty")
+	case opts.Namespace == "":
+		return nil, errors.New("Options.Namespace cannot be empty")
+	}
+
+	// Task name: <queue>/tasks/<task ID>[-<suffix>]-<mode>. The optional suffix and the mode
+	// let us force reprocessing of tasks that would normally be de-duplicated.
+	name := fmt.Sprintf("%s/tasks/%s", q.queueName, newTaskID(sreq.Module, sreq.Version))
+	if opts.TaskNameSuffix != "" {
+		name += "-" + opts.TaskNameSuffix
+	}
+	name += "-" + sreq.Mode
+
+	// NOTE(review): URLPathAndParams can already carry a query string (it does in TestNewTaskRequest);
+	// in that case the "?" added below yields a URL containing two "?" — confirm this is intended.
+	target := q.queueURL + fmt.Sprintf("/%s/scan/%s", opts.Namespace, sreq.URLPathAndParams())
+	if opts.DisableProxyFetch {
+		target += "?" + DisableProxyFetchParam + "=" + DisableProxyFetchValue
+	}
+
+	return &taskspb.CreateTaskRequest{
+		Parent: q.queueName,
+		Task: &taskspb.Task{
+			Name:             name,
+			DispatchDeadline: durationpb.New(maxCloudTasksTimeout),
+			MessageType: &taskspb.Task_HttpRequest{
+				HttpRequest: &taskspb.HttpRequest{
+					HttpMethod:          taskspb.HttpMethod_POST,
+					Url:                 target,
+					AuthorizationHeader: q.token,
+				},
+			},
+		},
+	}, nil
+}
+
+// newTaskID returns a Cloud Tasks task ID for the given module path and version.
+// Task IDs can contain only letters ([A-Za-z]), numbers ([0-9]), hyphens (-), or underscores (_).
+func newTaskID(modulePath, version string) string {
+	name := modulePath + "@" + version
+	// Prefix a hash so task IDs are distributed uniformly; see "Task De-duplication" at https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#task
+	h := fnv.New32()
+	io.WriteString(h, name)
+	prefix := h.Sum32() % math.MaxUint16
+	// Escape the name so only valid characters remain, keeping it readable: "_" starts every escape sequence.
+	var escaped strings.Builder
+	for _, r := range name {
+		switch r {
+		case '_':
+			escaped.WriteString("__")
+		case '/':
+			escaped.WriteString("_-")
+		case '@':
+			escaped.WriteString("_v")
+		case '.':
+			escaped.WriteString("_o")
+		default:
+			if r == '-' || ('0' <= r && r <= '9') || ('a' <= r && r <= 'z') || ('A' <= r && r <= 'Z') {
+				escaped.WriteRune(r)
+			} else {
+				fmt.Fprintf(&escaped, "_%04x", r)
+			}
+		}
+	}
+	return fmt.Sprintf("%04x-%s", prefix, escaped.String())
+}
+
+// InMemory is a Queue implementation that processes scan requests
+// in-process. Unlike the GCP task queue, it will not automatically retry tasks
+// on failure.
+//
+// This should only be used for local development.
+type InMemory struct {
+ queue chan *internal.ScanRequest // requests waiting to be handed to a worker
+ done chan struct{} // closed after queue has been closed and drained and all workers have finished
+}
+
+type inMemoryProcessFunc func(context.Context, *internal.ScanRequest) (int, error) // called once per dequeued request; InMemory ignores the int result
+
+// NewInMemory creates a new InMemory that asynchronously calls processFunc on each
+// enqueued request. At most workerCount calls run at a time, each with a 5-minute timeout; errors are logged, not retried.
+// NOTE(review): if ctx is done while requests are being dequeued, the dispatch goroutine returns without closing q.done, so WaitForTesting would block forever — confirm intended.
+func NewInMemory(ctx context.Context, workerCount int, processFunc inMemoryProcessFunc) *InMemory {
+ q := &InMemory{
+ queue: make(chan *internal.ScanRequest, 1000),
+ done: make(chan struct{}),
+ }
+ sem := make(chan struct{}, workerCount)
+ go func() {
+ for v := range q.queue {
+ select {
+ case <-ctx.Done():
+ return
+ case sem <- struct{}{}:
+ }
+
+ // A worker slot has been acquired: run processFunc in its own goroutine.
+ // The slot is released when that goroutine finishes.
+ go func(r *internal.ScanRequest) {
+ defer func() { <-sem }()
+
+ log.Infof(ctx, "Fetch requested: %v (workerCount = %d)", r, cap(sem))
+
+ fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
+ defer cancel()
+
+ if _, err := processFunc(fetchCtx, r); err != nil {
+ log.Errorf(fetchCtx, "processFunc(%q, %q): %v", r.Path, r.Version, err)
+ }
+ }(v)
+ }
+ for i := 0; i < cap(sem); i++ { // q.queue was closed and drained: take every slot to wait for running workers
+ select {
+ case <-ctx.Done():
+ panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err()))
+ case sem <- struct{}{}:
+ }
+ }
+ close(q.done)
+ }()
+ return q
+}
+
+// EnqueueScan pushes a scan request into the local queue to be processed
+// asynchronously. It ignores ctx and the options, always returns (true, nil), and blocks while the queue's buffer is full.
+func (q *InMemory) EnqueueScan(ctx context.Context, req *internal.ScanRequest, _ *Options) (bool, error) {
+ q.queue <- req
+ return true, nil
+}
+
+// WaitForTesting closes the queue and waits for all queued requests to finish. It should only be
+// used by test code. Nothing may be enqueued after it is called (the channel is closed). The ctx argument is unused.
+func (q *InMemory) WaitForTesting(ctx context.Context) {
+ close(q.queue)
+ <-q.done
+}
diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go
new file mode 100644
index 0000000..026c9a8
--- /dev/null
+++ b/internal/queue/queue_test.go
@@ -0,0 +1,93 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+ "testing"
+
+ "github.com/google/go-cmp/cmp"
+ "golang.org/x/pkgsite-metrics/internal"
+ "golang.org/x/pkgsite-metrics/internal/config"
+ taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2"
+ "google.golang.org/protobuf/proto"
+ "google.golang.org/protobuf/types/known/durationpb"
+)
+
+func TestNewTaskID(t *testing.T) { // checks newTaskID's hash prefix and character escaping against known outputs
+ for _, test := range []struct {
+ modulePath, version string
+ want string
+ }{
+ {"m-1", "v2", "acc5-m-1_vv2"}, // '@' becomes "_v"; letters, digits and '-' are kept as is
+ {"my_module", "v1.2.3", "0cb9-my__module_vv1_o2_o3"}, // '_' becomes "__" and '.' becomes "_o"
+ {"µπΩ/github.com", "v2.3.4-ß", "a49c-_00b5_03c0_03a9_-github_ocom_vv2_o3_o4-_00df"}, // '/' becomes "_-"; other runes become "_" plus four hex digits
+ } {
+ got := newTaskID(test.modulePath, test.version)
+ if got != test.want {
+ t.Errorf("%s@%s: got %s, want %s", test.modulePath, test.version, got, test.want)
+ }
+ }
+}
+
+func TestNewTaskRequest(t *testing.T) { // checks the request built by newTaskRequest, without and then with DisableProxyFetch
+ cfg := config.Config{
+ ProjectID: "Project",
+ LocationID: "us-central1",
+ QueueURL: "http://1.2.3.4:8000",
+ ServiceAccount: "sa",
+ }
+ want := &taskspb.CreateTaskRequest{
+ Parent: "projects/Project/locations/us-central1/queues/queueID",
+ Task: &taskspb.Task{
+ DispatchDeadline: durationpb.New(maxCloudTasksTimeout),
+ MessageType: &taskspb.Task_HttpRequest{
+ HttpRequest: &taskspb.HttpRequest{
+ HttpMethod: taskspb.HttpMethod_POST,
+ Url: "http://1.2.3.4:8000/test/scan/mod/@v/v1.2.3?importedby=0&mode=test&insecure=true",
+ AuthorizationHeader: &taskspb.HttpRequest_OidcToken{
+ OidcToken: &taskspb.OidcToken{
+ ServiceAccountEmail: "sa",
+ },
+ },
+ },
+ },
+ },
+ }
+ gcp, err := newGCP(&cfg, nil, "queueID")
+ if err != nil {
+ t.Fatal(err)
+ }
+ opts := &Options{
+ Namespace: "test",
+ TaskNameSuffix: "suf",
+ }
+ sreq := &internal.ScanRequest{
+ Module: "mod",
+ Version: "v1.2.3",
+ ImportedBy: 0,
+ Mode: "test",
+ Insecure: true,
+ }
+ got, err := gcp.newTaskRequest(sreq, opts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ want.Task.Name = got.Task.Name // the name is copied from got, so the task name itself is not checked here
+ if diff := cmp.Diff(want, got, cmp.Comparer(proto.Equal)); diff != "" {
+ t.Errorf("mismatch (-want, +got):\n%s", diff)
+ }
+
+ opts.DisableProxyFetch = true // second case: the same request with proxy fetch disabled
+ want.Task.MessageType.(*taskspb.Task_HttpRequest).HttpRequest.Url += "?proxyfetch=off" // NOTE(review): the URL already has a query string, so this expects a second "?" — confirm intended
+ got, err = gcp.newTaskRequest(sreq, opts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ want.Task.Name = got.Task.Name // again copied from got, so not checked
+ if diff := cmp.Diff(want, got, cmp.Comparer(proto.Equal)); diff != "" {
+ t.Errorf("mismatch (-want, +got):\n%s", diff)
+ }
+
+}