terraform/: add support for a new region, us-east

This will be used to further scale up the pipeline.

Change-Id: Id34f3c5195acf1bd9b09794b15431a8694dc51ac
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/703255
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Ethan Lee <ethanalee@google.com>
diff --git a/deploy/worker.yaml b/deploy/worker.yaml
index 904cf8a..efe05f6 100644
--- a/deploy/worker.yaml
+++ b/deploy/worker.yaml
@@ -97,13 +97,15 @@
       - |
         image=$(cat /workspace/image.txt)
         service=${_ENV}-ecosystem-worker
-        args="--project $PROJECT_ID --region us-central1"
-        gcloud beta run deploy $args  $service --image $image --execution-environment=gen2
-        # If there was a rollback, `gcloud run deploy` will create a revision but
-        # not point traffic to it. The following command ensures that the new revision
-        # will get traffic.
-        latestTraffic=$(gcloud run services $args describe $service \
-                        --format='value(status.traffic.latestRevision)')
-        if [[ $latestTraffic != True ]]; then
-          gcloud run services $args update-traffic $service --to-latest
-        fi
+        for region in "us-central1" "us-east1"; do
+          args="--project $PROJECT_ID --region $region"
+          gcloud beta run deploy $args $service --image $image --execution-environment=gen2
+          # If there was a rollback, `gcloud run deploy` will create a revision but
+          # not point traffic to it. The following command ensures that the new revision
+          # will get traffic.
+          latestTraffic=$(gcloud run services $args describe $service \
+                          --format='value(status.traffic.latestRevision)')
+          if [[ $latestTraffic != True ]]; then
+            gcloud run services $args update-traffic $service --to-latest
+          fi
+        done
diff --git a/internal/config/config.go b/internal/config/config.go
index 6e09702..c860ac1 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -55,10 +55,6 @@
 	// BigQueryDataset is the BigQuery dataset to write results to.
 	BigQueryDataset string
 
-	// QueueURL is the URL that the Cloud Tasks queues should send requests to.
-	// It should be used when the worker is not on AppEngine.
-	QueueURL string
-
 	// LocalQueueWorkers is the number of concurrent requests to the fetch service,
 	// when running locally.
 	LocalQueueWorkers int
@@ -106,6 +102,10 @@
 
 	// Cloud Task Queues
 	QueueNames []string
+
+	// QueueURLs are the URLs that the Cloud Tasks queues should send requests to.
+	// It should be used when the worker is not on AppEngine.
+	QueueURLs []string
 }
 
 // Init resolves all configuration values provided by the config package. It
@@ -127,7 +127,7 @@
 		StaticPath:            ts,
 		BigQueryDataset:       GetEnv("GO_ECOSYSTEM_BIGQUERY_DATASET", "disable"),
 		QueueNames:            strings.Split(os.Getenv("GO_ECOSYSTEM_QUEUE_NAMES"), ","),
-		QueueURL:              os.Getenv("GO_ECOSYSTEM_QUEUE_URL"),
+		QueueURLs:             strings.Split(os.Getenv("GO_ECOSYSTEM_QUEUE_URLS"), ","),
 		VulnDBBucketProjectID: os.Getenv("GO_ECOSYSTEM_VULNDB_BUCKET_PROJECT"),
 		BinaryBucket:          os.Getenv("GO_ECOSYSTEM_BINARY_BUCKET"),
 		BinaryDir:             GetEnv("GO_ECOSYSTEM_BINARY_DIR", "/tmp/binaries"),
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 940d092..0ca78c6 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -55,15 +55,18 @@
 	if err != nil {
 		return nil, err
 	}
-	log.Infof(ctx, "enqueuing to %d queues with URL=%q", len(g.queueNames), g.queueURL)
 	return g, nil
 }
 
+type taskQueue struct {
+	name string // full GCP name of the queue
+	url  string // non-AppEngine URL to post tasks to
+}
+
 // GCP provides a Queue implementation backed by the Google Cloud Tasks API.
 type GCP struct {
 	client     *cloudtasks.Client
-	queueNames []string // full GCP name of the queue
-	queueURL   string   // non-AppEngine URL to post tasks to
+	taskQueues []taskQueue
 	// token holds information that lets the task queue construct an authorized request to the worker.
 	// Since the worker sits behind the IAP, the queue needs an identity token that includes the
 	// identity of a service account that has access, and the client ID for the IAP.
@@ -71,6 +74,14 @@
 	token *taskspb.HttpRequest_OidcToken
 }
 
+func queueIdToLocation(id string) (string, error) {
+	parts := strings.Split(id, "-")
+	if len(parts) != 4 {
+		return "", fmt.Errorf("unable to extract location from queue ID: %s", id)
+	}
+	return strings.Join([]string{parts[1], parts[2]}, "-"), nil
+}
+
 // newGCP returns a new Queue that can be used to enqueue tasks using the
 // cloud tasks API.  The given queueID should be the name of the queue in the
 // cloud tasks console.
@@ -79,26 +90,30 @@
 	if len(queueIDs) == 0 {
 		return nil, errors.New("must provide at least one queueID")
 	}
+	if len(queueIDs) != len(cfg.QueueURLs) {
+		return nil, errors.New("must provide the same number of queue ids and queue urls")
+	}
 	if cfg.ProjectID == "" {
 		return nil, errors.New("empty ProjectID")
 	}
 	if cfg.LocationID == "" {
 		return nil, errors.New("empty LocationID")
 	}
-	if cfg.QueueURL == "" {
-		return nil, errors.New("empty QueueURL")
-	}
 	if cfg.ServiceAccount == "" {
 		return nil, errors.New("empty ServiceAccount")
 	}
-	var queueNames []string
-	for _, id := range queueIDs {
-		queueNames = append(queueNames, fmt.Sprintf("projects/%s/locations/%s/queues/%s", cfg.ProjectID, cfg.LocationID, id))
+	var taskQueues []taskQueue
+	for idx, id := range queueIDs {
+		location, err := queueIdToLocation(id)
+		if err != nil {
+			return nil, err
+		}
+		name := fmt.Sprintf("projects/%s/locations/%s/queues/%s", cfg.ProjectID, location, id)
+		taskQueues = append(taskQueues, taskQueue{name: name, url: cfg.QueueURLs[idx]})
 	}
 	return &GCP{
 		client:     client,
-		queueNames: queueNames,
-		queueURL:   cfg.QueueURL,
+		taskQueues: taskQueues,
 		token: &taskspb.HttpRequest_OidcToken{
 			OidcToken: &taskspb.OidcToken{
 				ServiceAccountEmail: cfg.ServiceAccount,
@@ -134,7 +149,7 @@
 			log.Debugf(ctx, "ignoring duplicate task ID %s", req.Task.Name)
 			enqueued = false
 		} else {
-			return false, fmt.Errorf("q.client.CreateTask(ctx, req): %v", err)
+			return false, fmt.Errorf("q.client.CreateTask(ctx, req): queue_name: %s, err: %v", req.Parent, err)
 		}
 	}
 	return enqueued, nil
@@ -176,23 +191,22 @@
 		relativeURI += "?" + params
 	}
 
-	queueIndex := rand.IntN(len(q.queueNames))
-	queueName := q.queueNames[queueIndex]
+	queue := q.taskQueues[rand.IntN(len(q.taskQueues))]
 
 	taskID := newTaskID(opts.Namespace, task)
 	taskpb := &taskspb.Task{
-		Name:             fmt.Sprintf("%s/tasks/%s", queueName, taskID),
+		Name:             fmt.Sprintf("%s/tasks/%s", queue.name, taskID),
 		DispatchDeadline: durationpb.New(maxCloudTasksTimeout),
 		MessageType: &taskspb.Task_HttpRequest{
 			HttpRequest: &taskspb.HttpRequest{
 				HttpMethod:          taskspb.HttpMethod_POST,
-				Url:                 q.queueURL + relativeURI,
+				Url:                 queue.url + relativeURI,
 				AuthorizationHeader: q.token,
 			},
 		},
 	}
 	req := &taskspb.CreateTaskRequest{
-		Parent: queueName,
+		Parent: queue.name,
 		Task:   taskpb,
 	}
 	// If suffix is non-empty, append it to the task name.
diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go
index bb5bbd0..23d32a9 100644
--- a/internal/queue/queue_test.go
+++ b/internal/queue/queue_test.go
@@ -5,6 +5,7 @@
 package queue
 
 import (
+	"strings"
 	"testing"
 
 	taskspb "cloud.google.com/go/cloudtasks/apiv2/cloudtaskspb"
@@ -50,14 +51,13 @@
 	cfg := config.Config{
 		ProjectID:      "Project",
 		LocationID:     "us-central1",
-		QueueURL:       "http://1.2.3.4:8000",
+		QueueURLs:      []string{"url-for-region-0", "url-for-region-0", "url-for-region-1"},
 		ServiceAccount: "sa",
 	}
-	queueIDs := []string{"queueID-0", "queueID-1", "queueID-2", "queueID-3"}
-	var possibleQueueNames []string
-	for _, qID := range queueIDs {
-		possibleQueueNames = append(possibleQueueNames, "projects/Project/locations/us-central1/queues/"+qID)
-	}
+	queueIDs := []string{"env-some-region-0", "env-some-region-1", "env-other-region-0"}
+	possibleQueueNames := []string{"projects/Project/locations/some-region/queues/env-some-region-0",
+		"projects/Project/locations/some-region/queues/env-some-region-1",
+		"projects/Project/locations/other-region/queues/env-other-region-0"}
 
 	gcp, err := newGCP(&cfg, nil, queueIDs)
 	if err != nil {
@@ -90,6 +90,21 @@
 		t.Errorf("got.Parent = %q, want one of %v", got.Parent, possibleQueueNames)
 	}
 
+	gotUrl := got.Task.GetHttpRequest().Url
+	if !strings.HasSuffix(gotUrl, "test/scan/mod@v1.2.3?importedby=0&mode=test&insecure=true") {
+		t.Errorf("Url = %q, want HasSuffix(got, test/scan/mod@v1.2.3?importedby=0&mode=test&insecure=true)", gotUrl)
+	}
+	validUrl := false
+	for _, url := range cfg.QueueURLs {
+		if strings.HasPrefix(gotUrl, url) {
+			validUrl = true
+			break
+		}
+	}
+	if !validUrl {
+		t.Errorf(".Url = %q, must start with one of %v", gotUrl, cfg.QueueURLs)
+	}
+
 	want := &taskspb.CreateTaskRequest{
 		Parent: got.Parent,
 		Task: &taskspb.Task{
@@ -97,7 +112,7 @@
 			MessageType: &taskspb.Task_HttpRequest{
 				HttpRequest: &taskspb.HttpRequest{
 					HttpMethod: taskspb.HttpMethod_POST,
-					Url:        "http://1.2.3.4:8000/test/scan/mod@v1.2.3?importedby=0&mode=test&insecure=true",
+					Url:        gotUrl,
 					AuthorizationHeader: &taskspb.HttpRequest_OidcToken{
 						OidcToken: &taskspb.OidcToken{
 							ServiceAccountEmail: "sa",
@@ -114,7 +129,6 @@
 	}
 
 	opts.DisableProxyFetch = true
-	want.Task.MessageType.(*taskspb.Task_HttpRequest).HttpRequest.Url += "&proxyfetch=off"
 
 	got, err = gcp.newTaskRequest(sreq, opts)
 	if err != nil {
@@ -134,6 +148,22 @@
 	}
 	want.Parent = got.Parent
 
+	gotUrl = got.Task.GetHttpRequest().Url
+	if !strings.HasSuffix(gotUrl, "test/scan/mod@v1.2.3?importedby=0&mode=test&insecure=true&proxyfetch=off") {
+		t.Errorf("Url = %q, want HasSuffix(got, test/scan/mod@v1.2.3?importedby=0&mode=test&insecure=true)", gotUrl)
+	}
+	validUrl = false
+	for _, url := range cfg.QueueURLs {
+		if strings.HasPrefix(gotUrl, url) {
+			validUrl = true
+			break
+		}
+	}
+	if !validUrl {
+		t.Errorf(".Url = %q, must start with one of %v", gotUrl, cfg.QueueURLs)
+	}
+	want.Task.GetHttpRequest().Url = gotUrl
+
 	if diff := cmp.Diff(want, got, protocmp.Transform()); diff != "" {
 		t.Errorf("mismatch (-want, +got):\n%s", diff)
 	}
diff --git a/terraform/environment/worker.tf b/terraform/environment/worker.tf
index 282179d..b3bf3c2 100644
--- a/terraform/environment/worker.tf
+++ b/terraform/environment/worker.tf
@@ -17,9 +17,9 @@
   type        = string
 }
 
-variable "region" {
-  description = "GCP region"
-  type        = string
+variable "regions" {
+  description = "GCP region(s)"
+  type        = set(string)
 }
 
 variable "pkgsite_db_project" {
@@ -43,10 +43,24 @@
 }
 
 locals {
-  worker_url             = data.google_cloud_run_service.worker.status[0].url
+  worker_url             = data.google_cloud_run_service.worker[tolist(var.regions)[0]].status[0].url
   tz                     = "America/New_York"
   worker_service_account = "worker@${var.project}.iam.gserviceaccount.com"
-  pkgsite_db             = "${var.pkgsite_db_project}:${var.region}:${var.pkgsite_db_name}"
+  pkgsite_db             = "${var.pkgsite_db_project}:${tolist(var.regions)[0]}:${var.pkgsite_db_name}"
+  task_queues = flatten([
+    for region in var.regions : [
+      for i in range(5) : {
+        name = format("%s-%s-%05s", var.env, region, i)
+        region = region
+        url = data.google_cloud_run_service.worker[region].status[0].url
+      }
+    ]
+  ])
+  # Move task queues into a map to make them easier to use with for_each
+  task_queues_map = {
+    for q in local.task_queues :
+      q.name => q
+    }
 }
 
 
@@ -60,6 +74,7 @@
 }
 
 resource "google_cloud_run_service" "worker" {
+  for_each = var.regions
   provider = google-beta
 
   lifecycle {
@@ -79,7 +94,7 @@
 
   name     = "${var.env}-ecosystem-worker"
   project  = var.project
-  location = var.region
+  location = each.value
 
   metadata {
     annotations = {
@@ -93,7 +108,7 @@
         # Get the image from GCP (see the "data" block below).
         # Exception: when first creating the service, replace this with a hardcoded
         # image tag.
-        image = data.google_cloud_run_service.worker.template[0].spec[0].containers[0].image
+        image = data.google_cloud_run_service.worker[each.value].template[0].spec[0].containers[0].image
         env {
           name  = "GOOGLE_CLOUD_PROJECT"
           value = var.project
@@ -113,17 +128,9 @@
           }
         }
         env {
-          name  = "GO_ECOSYSTEM_QUEUE_URL"
-          value = local.worker_url
-        }
-        env {
-          name = "GITHUB_ACCESS_TOKEN"
-          value_from {
-            secret_key_ref {
-              name = google_secret_manager_secret.github_access_token.secret_id
-              key  = "latest"
-            }
-          }
+          name  = "GO_ECOSYSTEM_QUEUE_URLS"
+          #value = data.google_cloud_run_service.worker[each.value].status[0].url
+          value = join(",", [for queue in local.task_queues_map : queue.url])
         }
         env {
           name  = "CLOUD_RUN_CONCURRENCY"
@@ -177,7 +184,7 @@
     metadata {
       annotations = {
         "autoscaling.knative.dev/minScale"         = "10"
-        "autoscaling.knative.dev/maxScale"         = "500"
+        "autoscaling.knative.dev/maxScale"         = each.value == "us-central1" ? "2500" : "625"
         "run.googleapis.com/cloudsql-instances"    = local.pkgsite_db
         "run.googleapis.com/execution-environment" = "gen2"
       }
@@ -199,19 +206,20 @@
 #
 # We use this data source is used to determine the deployed image.
 data "google_cloud_run_service" "worker" {
+  for_each = var.regions
   name     = "${var.env}-ecosystem-worker"
   project  = var.project
-  location = var.region
+  location = each.value
 }
 
 ################################################################
 # Other components.
 
 resource "google_cloud_tasks_queue" "worker_queues" {
-  count = 5
+  for_each = local.task_queues_map
   project  = var.project
-  name     = "${var.env}-worker-queue-${count.index}"
-  location = var.region
+  name     = each.value.name
+  location = each.value.region
 
   rate_limits {
     max_dispatches_per_second = 500
@@ -227,14 +235,6 @@
   }
 }
 
-resource "google_secret_manager_secret" "github_access_token" {
-  secret_id = "${var.env}-github-access-token"
-  project   = var.project
-  replication {
-    automatic = true
-  }
-}
-
 resource "google_cloud_scheduler_job" "vulndb" {
   count       = var.env == "prod" ? 1 : 0
   name        = "${var.env}-vulndb"
diff --git a/terraform/main.tf b/terraform/main.tf
index e9cfcfd..e835112 100644
--- a/terraform/main.tf
+++ b/terraform/main.tf
@@ -285,7 +285,7 @@
   source                = "./environment"
   env                   = "prod"
   project               = var.prod_project
-  region                = local.region
+  regions                = [local.region]
   pkgsite_db_project    = var.pkgsite_db_project
   pkgsite_db_name       = var.pkgsite_db_name
   vulndb_bucket_project = var.vulndb_bucket_project
@@ -297,10 +297,9 @@
   source                = "./environment"
   env                   = "dev"
   project               = var.dev_project
-  region                = local.region
+  regions                = ["us-central1", "us-east1"]
   pkgsite_db_project    = var.pkgsite_db_project
   pkgsite_db_name       = var.pkgsite_db_name
   vulndb_bucket_project = var.vulndb_bucket_project
   use_profiler          = false
 }
-