terraform/: add support for a new region, us-east1
This will be used to further scale up the pipeline.
Change-Id: Id34f3c5195acf1bd9b09794b15431a8694dc51ac
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/703255
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Ethan Lee <ethanalee@google.com>
diff --git a/deploy/worker.yaml b/deploy/worker.yaml
index 904cf8a..efe05f6 100644
--- a/deploy/worker.yaml
+++ b/deploy/worker.yaml
@@ -97,13 +97,15 @@
- |
image=$(cat /workspace/image.txt)
service=${_ENV}-ecosystem-worker
- args="--project $PROJECT_ID --region us-central1"
- gcloud beta run deploy $args $service --image $image --execution-environment=gen2
- # If there was a rollback, `gcloud run deploy` will create a revision but
- # not point traffic to it. The following command ensures that the new revision
- # will get traffic.
- latestTraffic=$(gcloud run services $args describe $service \
- --format='value(status.traffic.latestRevision)')
- if [[ $latestTraffic != True ]]; then
- gcloud run services $args update-traffic $service --to-latest
- fi
+ for region in "us-central1" "us-east1"; do
+ args="--project $PROJECT_ID --region $region"
+ gcloud beta run deploy $args $service --image $image --execution-environment=gen2
+ # If there was a rollback, `gcloud run deploy` will create a revision but
+ # not point traffic to it. The following command ensures that the new revision
+ # will get traffic.
+ latestTraffic=$(gcloud run services $args describe $service \
+ --format='value(status.traffic.latestRevision)')
+ if [[ $latestTraffic != True ]]; then
+ gcloud run services $args update-traffic $service --to-latest
+ fi
+ done
diff --git a/internal/config/config.go b/internal/config/config.go
index 6e09702..c860ac1 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -55,10 +55,6 @@
// BigQueryDataset is the BigQuery dataset to write results to.
BigQueryDataset string
- // QueueURL is the URL that the Cloud Tasks queues should send requests to.
- // It should be used when the worker is not on AppEngine.
- QueueURL string
-
// LocalQueueWorkers is the number of concurrent requests to the fetch service,
// when running locally.
LocalQueueWorkers int
@@ -106,6 +102,10 @@
// Cloud Task Queues
QueueNames []string
+
+ // QueueURLs are the URLs that the Cloud Tasks queues should send requests to.
+	// They should be used when the worker is not on AppEngine.
+ QueueURLs []string
}
// Init resolves all configuration values provided by the config package. It
@@ -127,7 +127,7 @@
StaticPath: ts,
BigQueryDataset: GetEnv("GO_ECOSYSTEM_BIGQUERY_DATASET", "disable"),
QueueNames: strings.Split(os.Getenv("GO_ECOSYSTEM_QUEUE_NAMES"), ","),
- QueueURL: os.Getenv("GO_ECOSYSTEM_QUEUE_URL"),
+ QueueURLs: strings.Split(os.Getenv("GO_ECOSYSTEM_QUEUE_URLS"), ","),
VulnDBBucketProjectID: os.Getenv("GO_ECOSYSTEM_VULNDB_BUCKET_PROJECT"),
BinaryBucket: os.Getenv("GO_ECOSYSTEM_BINARY_BUCKET"),
BinaryDir: GetEnv("GO_ECOSYSTEM_BINARY_DIR", "/tmp/binaries"),
diff --git a/internal/queue/queue.go b/internal/queue/queue.go
index 940d092..0ca78c6 100644
--- a/internal/queue/queue.go
+++ b/internal/queue/queue.go
@@ -55,15 +55,18 @@
if err != nil {
return nil, err
}
- log.Infof(ctx, "enqueuing to %d queues with URL=%q", len(g.queueNames), g.queueURL)
return g, nil
}
+type taskQueue struct {
+ name string // full GCP name of the queue
+ url string // non-AppEngine URL to post tasks to
+}
+
// GCP provides a Queue implementation backed by the Google Cloud Tasks API.
type GCP struct {
client *cloudtasks.Client
- queueNames []string // full GCP name of the queue
- queueURL string // non-AppEngine URL to post tasks to
+ taskQueues []taskQueue
// token holds information that lets the task queue construct an authorized request to the worker.
// Since the worker sits behind the IAP, the queue needs an identity token that includes the
// identity of a service account that has access, and the client ID for the IAP.
@@ -71,6 +74,14 @@
token *taskspb.HttpRequest_OidcToken
}
+func queueIdToLocation(id string) (string, error) {
+ parts := strings.Split(id, "-")
+ if len(parts) != 4 {
+ return "", fmt.Errorf("unable to extract location from queue ID: %s", id)
+ }
+ return strings.Join([]string{parts[1], parts[2]}, "-"), nil
+}
+
// newGCP returns a new Queue that can be used to enqueue tasks using the
// cloud tasks API. The given queueID should be the name of the queue in the
// cloud tasks console.
@@ -79,26 +90,30 @@
if len(queueIDs) == 0 {
return nil, errors.New("must provide at least one queueID")
}
+ if len(queueIDs) != len(cfg.QueueURLs) {
+ return nil, errors.New("must provide the same number of queue ids and queue urls")
+ }
if cfg.ProjectID == "" {
return nil, errors.New("empty ProjectID")
}
if cfg.LocationID == "" {
return nil, errors.New("empty LocationID")
}
- if cfg.QueueURL == "" {
- return nil, errors.New("empty QueueURL")
- }
if cfg.ServiceAccount == "" {
return nil, errors.New("empty ServiceAccount")
}
- var queueNames []string
- for _, id := range queueIDs {
- queueNames = append(queueNames, fmt.Sprintf("projects/%s/locations/%s/queues/%s", cfg.ProjectID, cfg.LocationID, id))
+ var taskQueues []taskQueue
+ for idx, id := range queueIDs {
+ location, err := queueIdToLocation(id)
+ if err != nil {
+ return nil, err
+ }
+ name := fmt.Sprintf("projects/%s/locations/%s/queues/%s", cfg.ProjectID, location, id)
+ taskQueues = append(taskQueues, taskQueue{name: name, url: cfg.QueueURLs[idx]})
}
return &GCP{
client: client,
- queueNames: queueNames,
- queueURL: cfg.QueueURL,
+ taskQueues: taskQueues,
token: &taskspb.HttpRequest_OidcToken{
OidcToken: &taskspb.OidcToken{
ServiceAccountEmail: cfg.ServiceAccount,
@@ -134,7 +149,7 @@
log.Debugf(ctx, "ignoring duplicate task ID %s", req.Task.Name)
enqueued = false
} else {
- return false, fmt.Errorf("q.client.CreateTask(ctx, req): %v", err)
+ return false, fmt.Errorf("q.client.CreateTask(ctx, req): queue_name: %s, err: %v", req.Parent, err)
}
}
return enqueued, nil
@@ -176,23 +191,22 @@
relativeURI += "?" + params
}
- queueIndex := rand.IntN(len(q.queueNames))
- queueName := q.queueNames[queueIndex]
+ queue := q.taskQueues[rand.IntN(len(q.taskQueues))]
taskID := newTaskID(opts.Namespace, task)
taskpb := &taskspb.Task{
- Name: fmt.Sprintf("%s/tasks/%s", queueName, taskID),
+ Name: fmt.Sprintf("%s/tasks/%s", queue.name, taskID),
DispatchDeadline: durationpb.New(maxCloudTasksTimeout),
MessageType: &taskspb.Task_HttpRequest{
HttpRequest: &taskspb.HttpRequest{
HttpMethod: taskspb.HttpMethod_POST,
- Url: q.queueURL + relativeURI,
+ Url: queue.url + relativeURI,
AuthorizationHeader: q.token,
},
},
}
req := &taskspb.CreateTaskRequest{
- Parent: queueName,
+ Parent: queue.name,
Task: taskpb,
}
// If suffix is non-empty, append it to the task name.
diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go
index bb5bbd0..23d32a9 100644
--- a/internal/queue/queue_test.go
+++ b/internal/queue/queue_test.go
@@ -5,6 +5,7 @@
package queue
import (
+ "strings"
"testing"
taskspb "cloud.google.com/go/cloudtasks/apiv2/cloudtaskspb"
@@ -50,14 +51,13 @@
cfg := config.Config{
ProjectID: "Project",
LocationID: "us-central1",
- QueueURL: "http://1.2.3.4:8000",
+ QueueURLs: []string{"url-for-region-0", "url-for-region-0", "url-for-region-1"},
ServiceAccount: "sa",
}
- queueIDs := []string{"queueID-0", "queueID-1", "queueID-2", "queueID-3"}
- var possibleQueueNames []string
- for _, qID := range queueIDs {
- possibleQueueNames = append(possibleQueueNames, "projects/Project/locations/us-central1/queues/"+qID)
- }
+ queueIDs := []string{"env-some-region-0", "env-some-region-1", "env-other-region-0"}
+ possibleQueueNames := []string{"projects/Project/locations/some-region/queues/env-some-region-0",
+ "projects/Project/locations/some-region/queues/env-some-region-1",
+ "projects/Project/locations/other-region/queues/env-other-region-0"}
gcp, err := newGCP(&cfg, nil, queueIDs)
if err != nil {
@@ -90,6 +90,21 @@
t.Errorf("got.Parent = %q, want one of %v", got.Parent, possibleQueueNames)
}
+ gotUrl := got.Task.GetHttpRequest().Url
+ if !strings.HasSuffix(gotUrl, "test/scan/mod@v1.2.3?importedby=0&mode=test&insecure=true") {
+ t.Errorf("Url = %q, want HasSuffix(got, test/scan/mod@v1.2.3?importedby=0&mode=test&insecure=true)", gotUrl)
+ }
+ validUrl := false
+ for _, url := range cfg.QueueURLs {
+ if strings.HasPrefix(gotUrl, url) {
+ validUrl = true
+ break
+ }
+ }
+ if !validUrl {
+ t.Errorf(".Url = %q, must start with one of %v", gotUrl, cfg.QueueURLs)
+ }
+
want := &taskspb.CreateTaskRequest{
Parent: got.Parent,
Task: &taskspb.Task{
@@ -97,7 +112,7 @@
MessageType: &taskspb.Task_HttpRequest{
HttpRequest: &taskspb.HttpRequest{
HttpMethod: taskspb.HttpMethod_POST,
- Url: "http://1.2.3.4:8000/test/scan/mod@v1.2.3?importedby=0&mode=test&insecure=true",
+ Url: gotUrl,
AuthorizationHeader: &taskspb.HttpRequest_OidcToken{
OidcToken: &taskspb.OidcToken{
ServiceAccountEmail: "sa",
@@ -114,7 +129,6 @@
}
opts.DisableProxyFetch = true
- want.Task.MessageType.(*taskspb.Task_HttpRequest).HttpRequest.Url += "&proxyfetch=off"
got, err = gcp.newTaskRequest(sreq, opts)
if err != nil {
@@ -134,6 +148,22 @@
}
want.Parent = got.Parent
+ gotUrl = got.Task.GetHttpRequest().Url
+ if !strings.HasSuffix(gotUrl, "test/scan/mod@v1.2.3?importedby=0&mode=test&insecure=true&proxyfetch=off") {
+		t.Errorf("Url = %q, want HasSuffix(got, test/scan/mod@v1.2.3?importedby=0&mode=test&insecure=true&proxyfetch=off)", gotUrl)
+ }
+ validUrl = false
+ for _, url := range cfg.QueueURLs {
+ if strings.HasPrefix(gotUrl, url) {
+ validUrl = true
+ break
+ }
+ }
+ if !validUrl {
+ t.Errorf(".Url = %q, must start with one of %v", gotUrl, cfg.QueueURLs)
+ }
+ want.Task.GetHttpRequest().Url = gotUrl
+
if diff := cmp.Diff(want, got, protocmp.Transform()); diff != "" {
t.Errorf("mismatch (-want, +got):\n%s", diff)
}
diff --git a/terraform/environment/worker.tf b/terraform/environment/worker.tf
index 282179d..b3bf3c2 100644
--- a/terraform/environment/worker.tf
+++ b/terraform/environment/worker.tf
@@ -17,9 +17,9 @@
type = string
}
-variable "region" {
- description = "GCP region"
- type = string
+variable "regions" {
+ description = "GCP region(s)"
+ type = set(string)
}
variable "pkgsite_db_project" {
@@ -43,10 +43,24 @@
}
locals {
- worker_url = data.google_cloud_run_service.worker.status[0].url
+ worker_url = data.google_cloud_run_service.worker[tolist(var.regions)[0]].status[0].url
tz = "America/New_York"
worker_service_account = "worker@${var.project}.iam.gserviceaccount.com"
- pkgsite_db = "${var.pkgsite_db_project}:${var.region}:${var.pkgsite_db_name}"
+ pkgsite_db = "${var.pkgsite_db_project}:${tolist(var.regions)[0]}:${var.pkgsite_db_name}"
+ task_queues = flatten([
+ for region in var.regions : [
+ for i in range(5) : {
+ name = format("%s-%s-%05s", var.env, region, i)
+ region = region
+ url = data.google_cloud_run_service.worker[region].status[0].url
+ }
+ ]
+ ])
+ # Move task queues into a map to make them easier to use with for_each
+ task_queues_map = {
+ for q in local.task_queues :
+ q.name => q
+ }
}
@@ -60,6 +74,7 @@
}
resource "google_cloud_run_service" "worker" {
+ for_each = var.regions
provider = google-beta
lifecycle {
@@ -79,7 +94,7 @@
name = "${var.env}-ecosystem-worker"
project = var.project
- location = var.region
+ location = each.value
metadata {
annotations = {
@@ -93,7 +108,7 @@
# Get the image from GCP (see the "data" block below).
# Exception: when first creating the service, replace this with a hardcoded
# image tag.
- image = data.google_cloud_run_service.worker.template[0].spec[0].containers[0].image
+ image = data.google_cloud_run_service.worker[each.value].template[0].spec[0].containers[0].image
env {
name = "GOOGLE_CLOUD_PROJECT"
value = var.project
@@ -113,17 +128,9 @@
}
}
env {
- name = "GO_ECOSYSTEM_QUEUE_URL"
- value = local.worker_url
- }
- env {
- name = "GITHUB_ACCESS_TOKEN"
- value_from {
- secret_key_ref {
- name = google_secret_manager_secret.github_access_token.secret_id
- key = "latest"
- }
- }
+ name = "GO_ECOSYSTEM_QUEUE_URLS"
+ #value = data.google_cloud_run_service.worker[each.value].status[0].url
+ value = join(",", [for queue in local.task_queues_map : queue.url])
}
env {
name = "CLOUD_RUN_CONCURRENCY"
@@ -177,7 +184,7 @@
metadata {
annotations = {
"autoscaling.knative.dev/minScale" = "10"
- "autoscaling.knative.dev/maxScale" = "500"
+ "autoscaling.knative.dev/maxScale" = each.value == "us-central1" ? "2500" : "625"
"run.googleapis.com/cloudsql-instances" = local.pkgsite_db
"run.googleapis.com/execution-environment" = "gen2"
}
@@ -199,19 +206,20 @@
#
# We use this data source is used to determine the deployed image.
data "google_cloud_run_service" "worker" {
+ for_each = var.regions
name = "${var.env}-ecosystem-worker"
project = var.project
- location = var.region
+ location = each.value
}
################################################################
# Other components.
resource "google_cloud_tasks_queue" "worker_queues" {
- count = 5
+ for_each = local.task_queues_map
project = var.project
- name = "${var.env}-worker-queue-${count.index}"
- location = var.region
+ name = each.value.name
+ location = each.value.region
rate_limits {
max_dispatches_per_second = 500
@@ -227,14 +235,6 @@
}
}
-resource "google_secret_manager_secret" "github_access_token" {
- secret_id = "${var.env}-github-access-token"
- project = var.project
- replication {
- automatic = true
- }
-}
-
resource "google_cloud_scheduler_job" "vulndb" {
count = var.env == "prod" ? 1 : 0
name = "${var.env}-vulndb"
diff --git a/terraform/main.tf b/terraform/main.tf
index e9cfcfd..e835112 100644
--- a/terraform/main.tf
+++ b/terraform/main.tf
@@ -285,7 +285,7 @@
source = "./environment"
env = "prod"
project = var.prod_project
- region = local.region
+ regions = [local.region]
pkgsite_db_project = var.pkgsite_db_project
pkgsite_db_name = var.pkgsite_db_name
vulndb_bucket_project = var.vulndb_bucket_project
@@ -297,10 +297,9 @@
source = "./environment"
env = "dev"
project = var.dev_project
- region = local.region
+ regions = ["us-central1", "us-east1"]
pkgsite_db_project = var.pkgsite_db_project
pkgsite_db_name = var.pkgsite_db_name
vulndb_bucket_project = var.vulndb_bucket_project
use_profiler = false
}
-