cmd,internal: add initial worker
An initial version of the worker server is added. At the moment, it just
serves an index page.
Change-Id: Ib707cde92ddbfbd7dd6bde9d37e8612f470ed41b
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/464619
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Julie Qiu <julieqiu@google.com>
Reviewed-by: Julie Qiu <julieqiu@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Auto-Submit: Julie Qiu <julieqiu@google.com>
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index cf7301e..b9e90d4 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -2,13 +2,68 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-// Command worker runs the worker server.
+// Command worker runs the go-metrics worker server.
package main
import (
+ "context"
+ "flag"
"fmt"
+ "net/http"
+ "os"
+
+ "golang.org/x/exp/event"
+ "golang.org/x/pkgsite-metrics/internal/config"
+ "golang.org/x/pkgsite-metrics/internal/log"
+ "golang.org/x/pkgsite-metrics/internal/worker"
+)
+
+var (
+ workers = flag.Int("workers", 10, "number of concurrent requests to the fetch service, when running locally")
+ devMode = flag.Bool("dev", false, "enable developer mode (reload templates on each page load, serve non-minified JS/CSS, etc.)")
+ port = flag.String("port", config.GetEnv("PORT", "8080"), "port to listen to")
+ dataset = flag.String("dataset", "", "dataset (overrides GO_ECOSYSTEM_METRICS_BIGQUERY_DATASET env var); use 'disable' for no BQ")
+ insecure = flag.Bool("insecure", false, "bypass sandbox in order to compare with old code")
+ // flag used in call to safehtml/template.TrustedSourceFromFlag
+ _ = flag.String("static", "static", "path to folder containing static files served")
)
func main() {
- fmt.Println("Hello World!")
+ flag.Usage = func() {
+ out := flag.CommandLine.Output()
+ fmt.Fprintln(out, "usage:")
+ fmt.Fprintln(out, "worker FLAGS")
+ fmt.Fprintln(out, " run as a server, listening at the PORT env var")
+ flag.PrintDefaults()
+ }
+
+ flag.Parse()
+ ctx := context.Background()
+ ctx = event.WithExporter(ctx,
+ event.NewExporter(log.NewLineHandler(os.Stderr), nil))
+
+ if err := runServer(ctx); err != nil {
+ log.Fatal(ctx, err)
+ }
+}
+
+func runServer(ctx context.Context) error {
+ cfg, err := config.Init(ctx)
+ if err != nil {
+ return err
+ }
+ cfg.LocalQueueWorkers = *workers
+ cfg.DevMode = *devMode
+ if *dataset != "" {
+ cfg.BigQueryDataset = *dataset
+ }
+ cfg.Insecure = *insecure
+ cfg.Dump(os.Stdout)
+ log.Infof(ctx, "config: project=%s, dataset=%s", cfg.ProjectID, cfg.BigQueryDataset)
+ if _, err := worker.NewServer(ctx, cfg); err != nil {
+ return err
+ }
+ addr := ":" + *port
+ log.Infof(ctx, "Listening on addr http://localhost%s", addr)
+ return fmt.Errorf("listening: %v", http.ListenAndServe(addr, nil))
}
diff --git a/go.mod b/go.mod
index 4729553..41351a8 100644
--- a/go.mod
+++ b/go.mod
@@ -14,6 +14,7 @@
github.com/go-git/go-billy/v5 v5.4.0
github.com/go-git/go-git/v5 v5.5.2
github.com/google/go-cmp v0.5.9
+ github.com/google/safehtml v0.1.0
go.opencensus.io v0.24.0
go.opentelemetry.io/otel v1.4.0
go.opentelemetry.io/otel/sdk v1.4.0
@@ -23,6 +24,7 @@
golang.org/x/net v0.5.0
golang.org/x/tools v0.5.1-0.20230117180257-8aba49bb5ea2
google.golang.org/api v0.103.0
+ google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c
honnef.co/go/tools v0.2.2
mvdan.cc/unparam v0.0.0-20220926085101-66de63301820
)
@@ -67,7 +69,6 @@
golang.org/x/text v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
- google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c // indirect
google.golang.org/grpc v1.50.1 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
diff --git a/go.sum b/go.sum
index 4f2b161..e7be449 100644
--- a/go.sum
+++ b/go.sum
@@ -226,6 +226,8 @@
github.com/google/pprof v0.0.0-20210715191844-86eeefc3e471/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
+github.com/google/safehtml v0.1.0 h1:EwLKo8qawTKfsi0orxcQAZzu07cICaBeFMegAU9eaT8=
+github.com/google/safehtml v0.1.0/go.mod h1:L4KWwDsUJdECRAEpZoBn3O64bQaywRscowZjJAzjHnU=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
diff --git a/internal/config/config.go b/internal/config/config.go
new file mode 100644
index 0000000..f919027
--- /dev/null
+++ b/internal/config/config.go
@@ -0,0 +1,222 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package config
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "flag"
+ "fmt"
+ "io"
+ "net/http"
+ "os"
+ "strconv"
+
+ "github.com/google/safehtml/template"
+ "golang.org/x/net/context/ctxhttp"
+ "golang.org/x/pkgsite-metrics/internal/derrors"
+ mrpb "google.golang.org/genproto/googleapis/api/monitoredres"
+)
+
+// Config holds configuration information for the worker server.
+type Config struct {
+ // ProjectID is the Google Cloud ProjectID where the resources live.
+ ProjectID string
+
+ // Identifier for the version currently running.
+ // We do not use the version ID from Cloud Run (see
+ // https://cloud.google.com/run/docs/reference/container-contract).
+ // Instead, we use the DOCKER_IMAGE environment variable, set
+ // in the Cloud Build deploy file.
+ VersionID string
+
+ // LocationID is the location for the GCP project.
+ LocationID string
+
+ // ServiceID names the Cloud Run service.
+ ServiceID string
+
+ // StaticPath is the directory containing static files.
+ StaticPath template.TrustedSource
+
+ // ServiceAccount is the email of the service account that this process
+ // is running as when on GCP.
+ ServiceAccount string
+
+ // UseErrorReporting determines whether errors go to the Error Reporting API.
+ UseErrorReporting bool
+
+ // BigQuery dataset to write results to.
+ BigQueryDataset string
+
+ // QueueName is the name of the Cloud Tasks queue.
+ QueueName string
+
+ // QueueURL is the URL that the Cloud Tasks queue should send requests to.
+ // It should be used when the worker is not on AppEngine.
+ QueueURL string
+
+ // LocalQueueWorkers is the number of concurrent requests to the fetch service, when running locally.
+ LocalQueueWorkers int
+
+ // MonitoredResource represents the resource that is running the current binary.
+ // It might be a Google AppEngine app, a Cloud Run service, or a Kubernetes pod.
+ // See https://cloud.google.com/monitoring/api/resources for more
+ // details:
+ // "An object representing a resource that can be used for monitoring, logging,
+ // billing, or other purposes. Examples include virtual machine instances,
+ // databases, and storage devices such as disks.""
+ MonitoredResource *mrpb.MonitoredResource
+
+ // DevMode indicates whether the server is running in development mode.
+ DevMode bool
+
+ // The host, port and user of the pkgsite database used to find
+ // modules to scan.
+ PkgsiteDBHost string
+ PkgsiteDBPort string
+ PkgsiteDBName string
+ PkgsiteDBUser string
+ // The name of the Secret Manager secret holding the DB password.
+ PkgsiteDBSecret string
+
+ // Run analysis binaries without sandbox.
+ Insecure bool
+
+ // ProxyURL is the URL of the module proxy.
+ ProxyURL string
+}
+
+// Init resolves all configuration values provided by the config package. It
+// must be called before any configuration values are used.
+func Init(ctx context.Context) (_ *Config, err error) {
+ defer derrors.Wrap(&err, "config.Init(ctx)")
+ // Build a Config from the execution environment, loading some values
+ // from environment variables.
+
+ var ts template.TrustedSource
+ if f := flag.Lookup("static"); f != nil {
+ ts = template.TrustedSourceFromFlag(f.Value)
+ }
+ cfg := &Config{
+ ProjectID: GetEnv("GOOGLE_CLOUD_PROJECT", "go-ecosystem"),
+ ServiceID: "go-ecosystem-worker",
+ VersionID: os.Getenv("DOCKER_IMAGE"),
+ LocationID: "us-central1",
+ StaticPath: ts,
+ BigQueryDataset: GetEnv("GO_ECOSYSTEM_BIGQUERY_DATASET", "test"),
+ QueueName: os.Getenv("GO_ECOSYSTEM_QUEUE_NAME"),
+ QueueURL: os.Getenv("GO_ECOSYSTEM_QUEUE_URL"),
+ PkgsiteDBHost: GetEnv("GO_ECOSYSTEM_PKGSITE_DB_HOST", "localhost"),
+ PkgsiteDBPort: GetEnv("GO_ECOSYSTEM_PKGSITE_DB_PORT", "5432"),
+ PkgsiteDBName: GetEnv("GO_ECOSYSTEM_PKGSITE_DB_NAME", "discovery-db"),
+ PkgsiteDBUser: GetEnv("GO_ECOSYSTEM_PKGSITE_DB_USER", "postgres"),
+ PkgsiteDBSecret: os.Getenv("GO_ECOSYSTEM_PKGSITE_DB_SECRET"),
+ ProxyURL: GetEnv("GO_ECOSYSTEM_PROXY_URL", "https://proxy.golang.org"),
+ }
+ if OnCloudRun() {
+ sa, err := gceMetadata(ctx, "instance/service-accounts/default/email")
+ if err != nil {
+ return nil, err
+ }
+ cfg.ServiceAccount = sa
+ cfg.MonitoredResource = &mrpb.MonitoredResource{
+ Type: "cloud_run_revision",
+ Labels: map[string]string{
+ "project_id": cfg.ProjectID,
+ "service_name": cfg.ServiceID,
+ "revision_name": cfg.VersionID,
+ "configuration_name": os.Getenv("K_CONFIGURATION"),
+ },
+ }
+ cfg.UseErrorReporting = true
+ } else { // running locally, perhaps
+ cfg.MonitoredResource = &mrpb.MonitoredResource{
+ Type: "global",
+ Labels: map[string]string{"project_id": cfg.ProjectID},
+ }
+ }
+ return cfg, nil
+}
+
+// OnCloudRun reports whether the current process is running on Cloud Run.
+func OnCloudRun() bool {
+ // Use the presence of the environment variables provided by Cloud Run.
+ // See https://cloud.google.com/run/docs/reference/container-contract.
+ for _, ev := range []string{"K_SERVICE", "K_REVISION", "K_CONFIGURATION"} {
+ if os.Getenv(ev) == "" {
+ return false
+ }
+ }
+ return true
+}
+
+func (c *Config) Validate() error {
+ if c.ProjectID == "" {
+ return errors.New("missing project")
+ }
+ if c.BigQueryDataset == "" {
+ return errors.New("missing dataset")
+ }
+ return nil
+}
+
+// Dump outputs the current config information to the given Writer.
+func (c *Config) Dump(w io.Writer) error {
+ fmt.Fprint(w, "config: ")
+ enc := json.NewEncoder(w)
+ enc.SetIndent("", " ")
+ return enc.Encode(c)
+}
+
+// GetEnv looks up the given key from the environment, returning its value if
+// it exists, and otherwise returning the given fallback value.
+func GetEnv(key, fallback string) string {
+ if value, ok := os.LookupEnv(key); ok {
+ return value
+ }
+ return fallback
+}
+
+// GetEnvInt performs GetEnv(key, fallback) and parses the
+// result as int. If parsing fails, returns errVal.
+func GetEnvInt(key, fallback string, errVal int) int {
+ v := GetEnv(key, fallback)
+ i, err := strconv.Atoi(v)
+ if err != nil {
+ return errVal
+ }
+ return i
+}
+
+// gceMetadata reads a metadata value from GCE.
+// For the possible values of name, see
+// https://cloud.google.com/appengine/docs/standard/java/accessing-instance-metadata.
+func gceMetadata(ctx context.Context, name string) (_ string, err error) {
+ // See https://cloud.google.com/appengine/docs/standard/java/accessing-instance-metadata.
+ // (This documentation doesn't exist for Golang, but it seems to work).
+ defer derrors.Wrap(&err, "gceMetadata(ctx, %q)", name)
+
+ const metadataURL = "http://metadata.google.internal/computeMetadata/v1/"
+ req, err := http.NewRequest("GET", metadataURL+name, nil)
+ if err != nil {
+ return "", fmt.Errorf("http.NewRequest: %v", err)
+ }
+ req.Header.Set("Metadata-Flavor", "Google")
+ resp, err := ctxhttp.Do(ctx, nil, req)
+ if err != nil {
+ return "", fmt.Errorf("ctxhttp.Do: %v", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ return "", fmt.Errorf("bad status: %s", resp.Status)
+ }
+ bytes, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return "", fmt.Errorf("io.ReadAll: %v", err)
+ }
+ return string(bytes), nil
+}
diff --git a/internal/parse.go b/internal/parse.go
new file mode 100644
index 0000000..2c21f70
--- /dev/null
+++ b/internal/parse.go
@@ -0,0 +1,205 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal
+
+import (
+ "bufio"
+ "fmt"
+ "net/http"
+ "os"
+ "strconv"
+ "strings"
+
+ "golang.org/x/pkgsite-metrics/internal/derrors"
+ "golang.org/x/pkgsite-metrics/internal/version"
+)
+
+// ScanRequest contains information passed
+// to a scan endpoint.
+type ScanRequest struct {
+ Module string
+ Version string
+ Suffix string
+ ImportedBy int
+ Mode string
+ Insecure bool
+
+ // TODO: support optional parameters?
+}
+
+func (s *ScanRequest) URLPathAndParams() string {
+ suf := s.Suffix
+ if suf != "" {
+ suf = "/" + suf
+ }
+ return fmt.Sprintf("%s/@v/%s%s?importedby=%d&mode=%s&insecure=%t", s.Module, s.Version, suf, s.ImportedBy, s.Mode, s.Insecure)
+}
+
+func (s *ScanRequest) Path() string {
+ p := s.Module + "@" + s.Version
+ if s.Suffix != "" {
+ p += "/" + s.Suffix
+ }
+ return p
+}
+
+// ParseScanRequest parses an http request r for an endpoint
+// scanPrefix and produces a corresponding ScanRequest.
+//
+// The module and version should have one of the following three forms:
+// - <module>/@v/<version>
+// - <module>@<version>
+// - <module>/@latest
+//
+// (These are the same forms that the module proxy accepts.)
+func ParseScanRequest(r *http.Request, scanPrefix string) (*ScanRequest, error) {
+ mod, vers, suff, err := ParseModuleVersionSuffix(strings.TrimPrefix(r.URL.Path, scanPrefix))
+ if err != nil {
+ return nil, err
+ }
+ importedBy, err := ParseRequiredIntParam(r, "importedby")
+ if err != nil {
+ return nil, err
+ }
+ insecure, err := ParseOptionalBoolParam(r, "insecure", false)
+ if err != nil {
+ return nil, err
+ }
+ return &ScanRequest{
+ Module: mod,
+ Version: vers,
+ Suffix: suff,
+ ImportedBy: importedBy,
+ Mode: ParseMode(r),
+ Insecure: insecure,
+ }, nil
+}
+
+// ParseModuleVersionSuffix returns the module path, version and suffix described by
+// the argument. The suffix is the part of the path after the version.
+func ParseModuleVersionSuffix(requestPath string) (path, vers, suffix string, err error) {
+ p := strings.TrimPrefix(requestPath, "/")
+ modulePath, versionAndSuffix, found := strings.Cut(p, "@")
+ if !found {
+ return "", "", "", fmt.Errorf("invalid path %q: missing '@'", requestPath)
+ }
+ modulePath = strings.TrimSuffix(modulePath, "/")
+ if modulePath == "" {
+ return "", "", "", fmt.Errorf("invalid path %q: missing module", requestPath)
+ }
+ if strings.HasPrefix(versionAndSuffix, "v/") {
+ versionAndSuffix = versionAndSuffix[2:]
+ }
+ // Now versionAndSuffix begins with a version.
+ version, suffix, _ := strings.Cut(versionAndSuffix, "/")
+ if version == "" {
+ return "", "", "", fmt.Errorf("invalid path %q: missing version", requestPath)
+ }
+ if version[0] != 'v' {
+ version = "v" + version
+ }
+ return modulePath, version, suffix, nil
+}
+
+func ParseRequiredIntParam(r *http.Request, name string) (int, error) {
+ value := r.FormValue(name)
+ if value == "" {
+ return 0, fmt.Errorf("missing query param %q", name)
+ }
+ return ParseIntParam(value, name)
+}
+
+func ParseOptionalIntParam(r *http.Request, name string, def int) (int, error) {
+ value := r.FormValue(name)
+ if value == "" {
+ return def, nil
+ }
+ return ParseIntParam(value, name)
+}
+
+func ParseIntParam(value, name string) (int, error) {
+ n, err := strconv.Atoi(value)
+ if err != nil {
+ return 0, fmt.Errorf("want integer for %q query param, got %q", name, value)
+ }
+ return n, nil
+}
+
+func ParseOptionalBoolParam(r *http.Request, name string, def bool) (bool, error) {
+ s := r.FormValue(name)
+ if s == "" {
+ return def, nil
+ }
+ return strconv.ParseBool(s)
+}
+
+func ParseMode(r *http.Request) string {
+ const name = "mode"
+ // "" is allowed mode as some endpoints
+ // might equate it with their default mode.
+ return r.FormValue(name)
+}
+
+type ModuleSpec struct {
+ Path, Version string
+ ImportedBy int
+}
+
+func ParseCorpusFile(filename string, minImportedByCount int) (ms []ModuleSpec, err error) {
+ defer derrors.Wrap(&err, "parseCorpusFile(%q)", filename)
+ lines, err := ReadFileLines(filename)
+ if err != nil {
+ return nil, err
+ }
+ for _, line := range lines {
+ fields := strings.Fields(line)
+ var path, vers, imps string
+ switch len(fields) {
+ case 2: // no version (temporary)
+ path = fields[0]
+ vers = version.Latest
+ imps = fields[1]
+ case 3:
+ path = fields[0]
+ vers = fields[1]
+ imps = fields[2]
+ default:
+ return nil, fmt.Errorf("wrong number of fields on line %q", line)
+ }
+ n, err := strconv.Atoi(imps)
+ if err != nil {
+ return nil, fmt.Errorf("%v on line %q", err, line)
+ }
+ if n >= minImportedByCount {
+ ms = append(ms, ModuleSpec{Path: path, Version: vers, ImportedBy: n})
+ }
+ }
+ return ms, nil
+}
+
+// ReadFilelines reads and returns the lines from a file.
+// Whitespace on each line is trimmed.
+// Blank lines and lines beginning with '#' are ignored.
+func ReadFileLines(filename string) (lines []string, err error) {
+ defer derrors.Wrap(&err, "readFileLines(%q)", filename)
+ f, err := os.Open(filename)
+ if err != nil {
+ return nil, err
+ }
+ defer f.Close()
+
+ s := bufio.NewScanner(f)
+ for s.Scan() {
+ line := strings.TrimSpace(s.Text())
+ if line == "" || strings.HasPrefix(line, "#") {
+ continue
+ }
+ lines = append(lines, line)
+ }
+ if s.Err() != nil {
+ return nil, s.Err()
+ }
+ return lines, nil
+}
diff --git a/internal/parse_test.go b/internal/parse_test.go
new file mode 100644
index 0000000..74f7934
--- /dev/null
+++ b/internal/parse_test.go
@@ -0,0 +1,164 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal
+
+import (
+ "net/http"
+ "net/url"
+ "testing"
+
+ "github.com/google/go-cmp/cmp"
+ "golang.org/x/pkgsite-metrics/internal/version"
+)
+
+func TestParseScanRequest(t *testing.T) {
+ for _, test := range []struct {
+ name string
+ url string
+ want ScanRequest
+ }{
+ {
+ name: "ValidScanURL",
+ url: "https://worker.com/scan/module/@v/v1.0.0?importedby=50",
+ want: ScanRequest{
+ Module: "module",
+ Version: "v1.0.0",
+ ImportedBy: 50,
+ Mode: "",
+ },
+ },
+ {
+ name: "ValidImportsOnlyScanURL",
+ url: "https://worker.com/scan/module/@v/v1.0.0-abcdefgh?importedby=100&mode=mode1",
+ want: ScanRequest{
+ Module: "module",
+ Version: "v1.0.0-abcdefgh",
+ ImportedBy: 100,
+ Mode: "mode1",
+ },
+ },
+ {
+ name: "Module@Version",
+ url: "https://worker.com/scan/module@v1.2.3?importedby=1",
+ want: ScanRequest{
+ Module: "module",
+ Version: "v1.2.3",
+ ImportedBy: 1,
+ Mode: "",
+ },
+ },
+ {
+ name: "Module@Version suffix",
+ url: "https://worker.com/scan/module@v1.2.3/path/to/dir?importedby=1",
+ want: ScanRequest{
+ Module: "module",
+ Version: "v1.2.3",
+ Suffix: "path/to/dir",
+ ImportedBy: 1,
+ Mode: "",
+ },
+ },
+ } {
+ t.Run(test.name, func(t *testing.T) {
+ u, err := url.Parse(test.url)
+ if err != nil {
+ t.Errorf("url.Parse(%q): %v", test.url, err)
+ }
+ r := &http.Request{URL: u}
+ got, err := ParseScanRequest(r, "/scan")
+ if err != nil {
+ t.Fatal(err)
+ }
+ if g, w := *got, test.want; g != w {
+ t.Errorf("\ngot %+v\nwant %+v", g, w)
+ }
+ })
+ }
+}
+
+func TestParseScanRequestError(t *testing.T) {
+ for _, test := range []struct {
+ name string
+ url string
+ want string
+ }{
+ {
+ name: "InvalidScanURL",
+ url: "/",
+ want: `invalid path "/": missing '@'`,
+ },
+ {
+ name: "InvalidScanURLNoModule",
+ url: "/@v/version",
+ want: `invalid path "/@v/version": missing module`,
+ },
+ {
+ name: "InvalidScanURLNoVersion",
+ url: "/module/@v/",
+ want: `invalid path "/module/@v/": missing version`,
+ },
+ {
+ name: "NoVersion",
+ url: "/module@",
+ want: `invalid path "/module@": missing version`,
+ },
+ {
+ name: "NoVersionSuffix",
+ url: "/module@/suffix",
+ want: `invalid path "/module@/suffix": missing version`,
+ },
+ {
+ name: "MissingImportedBy",
+ url: "/module/@v/v1.0.0",
+ want: `missing query param "importedby"`,
+ },
+ {
+ name: "BadImportedBy",
+ url: "/module@v1?importedby=1a",
+ want: `want integer for "importedby" query param, got "1a"`,
+ },
+ } {
+ t.Run(test.name, func(t *testing.T) {
+ u, err := url.Parse(test.url)
+ if err != nil {
+ t.Errorf("url.Parse(%q): %v", test.url, err)
+ }
+ r := &http.Request{URL: u}
+ if _, err := ParseScanRequest(r, "/scan"); err != nil {
+ if got := err.Error(); got != test.want {
+ t.Fatalf("\ngot %s\nwant %s", got, test.want)
+ }
+ } else {
+ t.Fatalf("error = nil; want = (%v)", test.want)
+ }
+ })
+ }
+}
+
+func TestParseCorpusFile(t *testing.T) {
+ const file = "testdata/modules.txt"
+ got, err := ParseCorpusFile(file, 1)
+ if err != nil {
+ t.Fatal(err)
+ }
+ want := []ModuleSpec{
+ {"m1", "v1.0.0", 18},
+ {"m2", "v2.3.4", 5},
+ {"m3", version.Latest, 1},
+ }
+
+ if !cmp.Equal(got, want) {
+ t.Errorf("\n got %v\nwant %v", got, want)
+ }
+
+ got, err = ParseCorpusFile(file, 10)
+ if err != nil {
+ t.Fatal(err)
+ }
+ want = want[:1]
+ if !cmp.Equal(got, want) {
+ t.Errorf("\n got %v\nwant %v", got, want)
+ }
+}
diff --git a/internal/testdata/modules.txt b/internal/testdata/modules.txt
new file mode 100644
index 0000000..1b2b419
--- /dev/null
+++ b/internal/testdata/modules.txt
@@ -0,0 +1,5 @@
+# test module corpus file
+
+m1 v1.0.0 18
+m2 v2.3.4 5
+m3 1
diff --git a/internal/worker/index.go b/internal/worker/index.go
new file mode 100644
index 0000000..a8cc2e6
--- /dev/null
+++ b/internal/worker/index.go
@@ -0,0 +1,32 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package worker
+
+import (
+ "net/http"
+
+ "golang.org/x/pkgsite-metrics/internal/derrors"
+)
+
+type IndexPage struct {
+ basePage
+}
+
+func (s *Server) handleIndexPage(w http.ResponseWriter, r *http.Request) error {
+ if r.URL.Path != "/" {
+ return derrors.NotFound
+ }
+ tmpl, err := s.maybeLoadTemplate(indexTemplate)
+ if err != nil {
+ return err
+ }
+ return renderPage(r.Context(), w, s.createIndexPage(), tmpl)
+}
+
+func (s *Server) createIndexPage() *IndexPage {
+ return &IndexPage{
+ basePage: newBasePage(),
+ }
+}
diff --git a/internal/worker/server.go b/internal/worker/server.go
new file mode 100644
index 0000000..8ad8b42
--- /dev/null
+++ b/internal/worker/server.go
@@ -0,0 +1,205 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package worker
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "net/http"
+ "os"
+ "path/filepath"
+ "strings"
+ "sync"
+ "time"
+
+ "cloud.google.com/go/errorreporting"
+ "github.com/google/safehtml/template"
+ "golang.org/x/exp/event"
+ "golang.org/x/pkgsite-metrics/internal/bigquery"
+ "golang.org/x/pkgsite-metrics/internal/config"
+ "golang.org/x/pkgsite-metrics/internal/derrors"
+ "golang.org/x/pkgsite-metrics/internal/log"
+ "golang.org/x/pkgsite-metrics/internal/observe"
+ "golang.org/x/pkgsite-metrics/internal/proxy"
+)
+
+type Server struct {
+ cfg *config.Config
+ observer *observe.Observer
+ bqClient *bigquery.Client
+ proxyClient *proxy.Client
+ staticPath template.TrustedSource
+
+ devMode bool
+ mu sync.Mutex
+ templates map[string]*template.Template
+}
+
+var errBQDisabled = &serverError{http.StatusPreconditionRequired, errors.New("BigQuery disabled on this server")}
+
+func NewServer(ctx context.Context, cfg *config.Config) (_ *Server, err error) {
+ defer derrors.WrapAndReport(&err, "NewServer")
+
+ var bq *bigquery.Client
+ if strings.EqualFold(cfg.BigQueryDataset, "disable") {
+ log.Infof(ctx, "BigQuery disabled")
+ } else {
+ bq, err = bigquery.NewClientCreate(ctx, cfg.ProjectID, cfg.BigQueryDataset)
+ if err != nil {
+ return nil, err
+ }
+ for _, tableID := range bigquery.Tables() {
+ created, err := bq.CreateOrUpdateTable(ctx, tableID)
+ if err != nil {
+ return nil, err
+ }
+ verb := "updated"
+ if created {
+ verb = "created"
+ }
+ log.Infof(ctx, "%s table %s\n", verb, tableID)
+ }
+ }
+
+ proxyClient, err := proxy.New(cfg.ProxyURL)
+ if err != nil {
+ return nil, err
+ }
+ s := &Server{
+ cfg: cfg,
+ bqClient: bq,
+ proxyClient: proxyClient,
+ devMode: cfg.DevMode,
+ staticPath: cfg.StaticPath,
+ }
+ if err := s.loadTemplates(); err != nil {
+ return nil, err
+ }
+
+ s.observer, err = observe.NewObserver(ctx, cfg.ProjectID, "go-metrics-worker")
+ if err != nil {
+ return nil, err
+ }
+ // This function will be called for each request.
+ // It lets us install a log handler that knows about the request's
+ // trace ID.
+ s.observer.LogHandlerFunc = func(r *http.Request) event.Handler {
+ traceID := r.Header.Get("X-Cloud-Trace-Context")
+ return log.NewGCPJSONHandler(os.Stderr, traceID)
+ }
+
+ if cfg.UseErrorReporting {
+ reportingClient, err := errorreporting.NewClient(ctx, cfg.ProjectID, errorreporting.Config{
+ ServiceName: cfg.ServiceID,
+ OnError: func(err error) {
+ log.Errorf(ctx, "Error reporting failed: %v", err)
+ },
+ })
+ if err != nil {
+ return nil, err
+ }
+ derrors.SetReportingClient(reportingClient)
+ }
+
+ http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir(s.staticPath.String()))))
+
+ s.handle("/favicon.ico", func(w http.ResponseWriter, r *http.Request) error {
+ http.ServeFile(w, r, filepath.Join(s.staticPath.String(), "favicon.ico"))
+ return nil
+ })
+ s.handle("/", s.handleIndexPage)
+ return s, nil
+}
+
+const metricNamespace = "ecosystem/worker"
+
+type handlerFunc func(w http.ResponseWriter, r *http.Request) error
+
+func (s *Server) handle(pattern string, handler handlerFunc) {
+ h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ start := time.Now()
+ ctx := r.Context()
+ log.With("httpRequest", r).Infof(ctx, "starting %s", r.URL.Path)
+
+ w2 := &responseWriter{ResponseWriter: w}
+ if err := handler(w2, r); err != nil {
+ log.Errorf(ctx, err.Error())
+ derrors.Report(err)
+ s.serveError(ctx, w2, r, err)
+ }
+ log.With(
+ "latency", time.Since(start),
+ "status", translateStatus(w2.status)).
+ Infof(ctx, "request end")
+ })
+ http.Handle(pattern, s.observer.Observe(h))
+}
+
+type serverError struct {
+ status int // HTTP status code
+ err error // wrapped error
+}
+
+func (s *serverError) Error() string {
+ return fmt.Sprintf("%d (%s): %v", s.status, http.StatusText(s.status), s.err)
+}
+
+func (s *Server) serveError(ctx context.Context, w http.ResponseWriter, _ *http.Request, err error) {
+ if errors.Is(err, derrors.InvalidArgument) {
+ err = &serverError{err: err, status: http.StatusBadRequest}
+ }
+ if errors.Is(err, derrors.NotFound) {
+ err = &serverError{err: err, status: http.StatusNotFound}
+ }
+ if errors.Is(err, derrors.BadModule) {
+ err = &serverError{err: err, status: http.StatusNotAcceptable}
+ }
+ serr, ok := err.(*serverError)
+ if !ok {
+ serr = &serverError{status: http.StatusInternalServerError, err: err}
+ }
+ if serr.status == http.StatusInternalServerError {
+ log.Errorf(ctx, serr.err.Error())
+ } else {
+ log.Warningf(ctx, "returning %v", err)
+ }
+ http.Error(w, serr.err.Error(), serr.status)
+}
+
+type responseWriter struct {
+ http.ResponseWriter
+ status int
+}
+
+func (rw *responseWriter) WriteHeader(code int) {
+ rw.status = code
+ rw.ResponseWriter.WriteHeader(code)
+}
+
+func translateStatus(code int) int64 {
+ if code == 0 {
+ return http.StatusOK
+ }
+ return int64(code)
+}
+
+var locNewYork *time.Location
+
+func init() {
+ var err error
+ locNewYork, err = time.LoadLocation("America/New_York")
+ if err != nil {
+ log.Errorf(context.Background(), "time.LoadLocation: %v", err)
+ os.Exit(1)
+ }
+}
+
+func FormatTime(t time.Time) string {
+ if t.IsZero() {
+ return "-"
+ }
+ return t.In(locNewYork).Format("2006-01-02 15:04:05")
+}
diff --git a/internal/worker/templates.go b/internal/worker/templates.go
new file mode 100644
index 0000000..9753459
--- /dev/null
+++ b/internal/worker/templates.go
@@ -0,0 +1,75 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package worker
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "net/http"
+ "strings"
+
+ "github.com/google/safehtml/template"
+ "golang.org/x/pkgsite-metrics/internal/derrors"
+ "golang.org/x/pkgsite-metrics/internal/log"
+)
+
+const (
+ indexTemplate = "worker.tmpl"
+)
+
+type basePage struct {
+}
+
+func newBasePage() basePage {
+ return basePage{}
+}
+
+func (s *Server) loadTemplates() error {
+ index, err := s.parseTemplate(template.TrustedSourceFromConstant(indexTemplate))
+ if err != nil {
+ return err
+ }
+ s.templates = map[string]*template.Template{
+ indexTemplate: index,
+ }
+ return nil
+}
+
+func (s *Server) maybeLoadTemplate(tmplName string) (*template.Template, error) {
+ if s.devMode {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ var err error
+ if err = s.loadTemplates(); err != nil {
+ return nil, fmt.Errorf("error parsing templates: %v", err)
+ }
+ }
+ return s.templates[tmplName], nil
+}
+
+// Parse the template for the status page.
+func (s *Server) parseTemplate(filename template.TrustedSource) (*template.Template, error) {
+ templatePath := template.TrustedSourceJoin(s.staticPath, filename)
+ return template.New(filename.String()).Funcs(template.FuncMap{
+ "timefmt": FormatTime,
+ "commasep": func(s []string) string { return strings.Join(s, ", ") },
+ "round": func(n float64) string { return fmt.Sprintf("%.2f", n) },
+ }).ParseFilesFromTrustedSources(templatePath)
+}
+
+func renderPage(ctx context.Context, w http.ResponseWriter, page interface{}, tmpl *template.Template) (err error) {
+ defer derrors.Wrap(&err, "renderPage")
+ var buf bytes.Buffer
+ if err := tmpl.Execute(&buf, page); err != nil {
+ return err
+ }
+ if _, err := io.Copy(w, &buf); err != nil {
+ log.Infof(ctx, "Error copying buffer to ResponseWriter: %v", err)
+ return err
+ }
+ return nil
+}
diff --git a/static/static.css b/static/static.css
new file mode 100644
index 0000000..1639534
--- /dev/null
+++ b/static/static.css
@@ -0,0 +1,7 @@
+/*
+ * Copyright 2021 The Go Authors. All rights reserved.
+ * Use of this source code is governed by a BSD-style
+ * license that can be found in the LICENSE file.
+ */
+
+@import url('shared/shared.css');
diff --git a/static/worker.tmpl b/static/worker.tmpl
new file mode 100644
index 0000000..aea5637
--- /dev/null
+++ b/static/worker.tmpl
@@ -0,0 +1,16 @@
+<!--
+ Copyright 2022 The Go Authors. All rights reserved.
+ Use of this source code is governed by a BSD-style
+ license that can be found in the LICENSE file.
+-->
+
+<!DOCTYPE html>
+<html lang="en">
+<meta charset="utf-8">
+<link href="/static/static.css" rel="stylesheet">
+<title>Go Metrics</title>
+
+<body>
+ <h1>Hello!</h1>
+</body>
+</html>