cmd,internal: add initial worker

An initial version of the worker server is added. At the moment, it just
serves an index page.

Change-Id: Ib707cde92ddbfbd7dd6bde9d37e8612f470ed41b
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/464619
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Julie Qiu <julieqiu@google.com>
Reviewed-by: Julie Qiu <julieqiu@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Auto-Submit: Julie Qiu <julieqiu@google.com>
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index cf7301e..b9e90d4 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -2,13 +2,68 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-// Command worker runs the worker server.
+// Command worker runs the go-metrics worker server.
 package main
 
 import (
+	"context"
+	"flag"
 	"fmt"
+	"net/http"
+	"os"
+
+	"golang.org/x/exp/event"
+	"golang.org/x/pkgsite-metrics/internal/config"
+	"golang.org/x/pkgsite-metrics/internal/log"
+	"golang.org/x/pkgsite-metrics/internal/worker"
+)
+
+var (
+	workers  = flag.Int("workers", 10, "number of concurrent requests to the fetch service, when running locally")
+	devMode  = flag.Bool("dev", false, "enable developer mode (reload templates on each page load, serve non-minified JS/CSS, etc.)")
+	port     = flag.String("port", config.GetEnv("PORT", "8080"), "port to listen to")
+	dataset  = flag.String("dataset", "", "dataset (overrides GO_ECOSYSTEM_METRICS_BIGQUERY_DATASET env var); use 'disable' for no BQ")
+	insecure = flag.Bool("insecure", false, "bypass sandbox in order to compare with old code")
+	// flag used in call to safehtml/template.TrustedSourceFromFlag
+	_ = flag.String("static", "static", "path to folder containing static files served")
 )
 
 func main() {
-	fmt.Println("Hello World!")
+	flag.Usage = func() {
+		out := flag.CommandLine.Output()
+		fmt.Fprintln(out, "usage:")
+		fmt.Fprintln(out, "worker FLAGS")
+		fmt.Fprintln(out, "  run as a server, listening at the PORT env var")
+		flag.PrintDefaults()
+	}
+
+	flag.Parse()
+	ctx := context.Background()
+	ctx = event.WithExporter(ctx,
+		event.NewExporter(log.NewLineHandler(os.Stderr), nil))
+
+	if err := runServer(ctx); err != nil {
+		log.Fatal(ctx, err)
+	}
+}
+
+func runServer(ctx context.Context) error {
+	cfg, err := config.Init(ctx)
+	if err != nil {
+		return err
+	}
+	cfg.LocalQueueWorkers = *workers
+	cfg.DevMode = *devMode
+	if *dataset != "" {
+		cfg.BigQueryDataset = *dataset
+	}
+	cfg.Insecure = *insecure
+	cfg.Dump(os.Stdout)
+	log.Infof(ctx, "config: project=%s, dataset=%s", cfg.ProjectID, cfg.BigQueryDataset)
+	if _, err := worker.NewServer(ctx, cfg); err != nil {
+		return err
+	}
+	addr := ":" + *port
+	log.Infof(ctx, "Listening on addr http://localhost%s", addr)
+	return fmt.Errorf("listening: %v", http.ListenAndServe(addr, nil))
 }
diff --git a/go.mod b/go.mod
index 4729553..41351a8 100644
--- a/go.mod
+++ b/go.mod
@@ -14,6 +14,7 @@
 	github.com/go-git/go-billy/v5 v5.4.0
 	github.com/go-git/go-git/v5 v5.5.2
 	github.com/google/go-cmp v0.5.9
+	github.com/google/safehtml v0.1.0
 	go.opencensus.io v0.24.0
 	go.opentelemetry.io/otel v1.4.0
 	go.opentelemetry.io/otel/sdk v1.4.0
@@ -23,6 +24,7 @@
 	golang.org/x/net v0.5.0
 	golang.org/x/tools v0.5.1-0.20230117180257-8aba49bb5ea2
 	google.golang.org/api v0.103.0
+	google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c
 	honnef.co/go/tools v0.2.2
 	mvdan.cc/unparam v0.0.0-20220926085101-66de63301820
 )
@@ -67,7 +69,6 @@
 	golang.org/x/text v0.6.0 // indirect
 	golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
-	google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c // indirect
 	google.golang.org/grpc v1.50.1 // indirect
 	google.golang.org/protobuf v1.28.1 // indirect
 	gopkg.in/warnings.v0 v0.1.2 // indirect
diff --git a/go.sum b/go.sum
index 4f2b161..e7be449 100644
--- a/go.sum
+++ b/go.sum
@@ -226,6 +226,8 @@
 github.com/google/pprof v0.0.0-20210715191844-86eeefc3e471/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
 github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
 github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
+github.com/google/safehtml v0.1.0 h1:EwLKo8qawTKfsi0orxcQAZzu07cICaBeFMegAU9eaT8=
+github.com/google/safehtml v0.1.0/go.mod h1:L4KWwDsUJdECRAEpZoBn3O64bQaywRscowZjJAzjHnU=
 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
 github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
diff --git a/internal/config/config.go b/internal/config/config.go
new file mode 100644
index 0000000..f919027
--- /dev/null
+++ b/internal/config/config.go
@@ -0,0 +1,222 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package config
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"flag"
+	"fmt"
+	"io"
+	"net/http"
+	"os"
+	"strconv"
+
+	"github.com/google/safehtml/template"
+	"golang.org/x/net/context/ctxhttp"
+	"golang.org/x/pkgsite-metrics/internal/derrors"
+	mrpb "google.golang.org/genproto/googleapis/api/monitoredres"
+)
+
+// Config holds configuration information for the worker server.
+type Config struct {
+	// ProjectID is the Google Cloud ProjectID where the resources live.
+	ProjectID string
+
+	// Identifier for the version currently running.
+	// We do not use the version ID from Cloud Run (see
+	// https://cloud.google.com/run/docs/reference/container-contract).
+	// Instead, we use the DOCKER_IMAGE environment variable, set
+	// in the Cloud Build deploy file.
+	VersionID string
+
+	// LocationID is the location for the GCP project.
+	LocationID string
+
+	// ServiceID names the Cloud Run service.
+	ServiceID string
+
+	// StaticPath is the directory containing static files.
+	StaticPath template.TrustedSource
+
+	// ServiceAccount is the email of the service account that this process
+	// is running as when on GCP.
+	ServiceAccount string
+
+	// UseErrorReporting determines whether errors go to the Error Reporting API.
+	UseErrorReporting bool
+
+	// BigQuery dataset to write results to.
+	BigQueryDataset string
+
+	// QueueName is the name of the Cloud Tasks queue.
+	QueueName string
+
+	// QueueURL is the URL that the Cloud Tasks queue should send requests to.
+	// It should be used when the worker is not on AppEngine.
+	QueueURL string
+
+	// LocalQueueWorkers is the number of concurrent requests to the fetch service, when running locally.
+	LocalQueueWorkers int
+
+	// MonitoredResource represents the resource that is running the current binary.
+	// It might be a Google AppEngine app, a Cloud Run service, or a Kubernetes pod.
+	// See https://cloud.google.com/monitoring/api/resources for more
+	// details:
+	// "An object representing a resource that can be used for monitoring, logging,
+	// billing, or other purposes. Examples include virtual machine instances,
+	// databases, and storage devices such as disks.""
+	MonitoredResource *mrpb.MonitoredResource
+
+	// DevMode indicates whether the server is running in development mode.
+	DevMode bool
+
+	// The host, port and user of the pkgsite database used to find
+	// modules to scan.
+	PkgsiteDBHost string
+	PkgsiteDBPort string
+	PkgsiteDBName string
+	PkgsiteDBUser string
+	// The name of the Secret Manager secret holding the DB password.
+	PkgsiteDBSecret string
+
+	// Run analysis binaries without sandbox.
+	Insecure bool
+
+	// ProxyURL is the URL of the module proxy.
+	ProxyURL string
+}
+
+// Init resolves all configuration values provided by the config package. It
+// must be called before any configuration values are used.
+func Init(ctx context.Context) (_ *Config, err error) {
+	defer derrors.Wrap(&err, "config.Init(ctx)")
+	// Build a Config from the execution environment, loading some values
+	// from environment variables.
+
+	var ts template.TrustedSource
+	if f := flag.Lookup("static"); f != nil {
+		ts = template.TrustedSourceFromFlag(f.Value)
+	}
+	cfg := &Config{
+		ProjectID:       GetEnv("GOOGLE_CLOUD_PROJECT", "go-ecosystem"),
+		ServiceID:       "go-ecosystem-worker",
+		VersionID:       os.Getenv("DOCKER_IMAGE"),
+		LocationID:      "us-central1",
+		StaticPath:      ts,
+		BigQueryDataset: GetEnv("GO_ECOSYSTEM_BIGQUERY_DATASET", "test"),
+		QueueName:       os.Getenv("GO_ECOSYSTEM_QUEUE_NAME"),
+		QueueURL:        os.Getenv("GO_ECOSYSTEM_QUEUE_URL"),
+		PkgsiteDBHost:   GetEnv("GO_ECOSYSTEM_PKGSITE_DB_HOST", "localhost"),
+		PkgsiteDBPort:   GetEnv("GO_ECOSYSTEM_PKGSITE_DB_PORT", "5432"),
+		PkgsiteDBName:   GetEnv("GO_ECOSYSTEM_PKGSITE_DB_NAME", "discovery-db"),
+		PkgsiteDBUser:   GetEnv("GO_ECOSYSTEM_PKGSITE_DB_USER", "postgres"),
+		PkgsiteDBSecret: os.Getenv("GO_ECOSYSTEM_PKGSITE_DB_SECRET"),
+		ProxyURL:        GetEnv("GO_ECOSYSTEM_PROXY_URL", "https://proxy.golang.org"),
+	}
+	if OnCloudRun() {
+		sa, err := gceMetadata(ctx, "instance/service-accounts/default/email")
+		if err != nil {
+			return nil, err
+		}
+		cfg.ServiceAccount = sa
+		cfg.MonitoredResource = &mrpb.MonitoredResource{
+			Type: "cloud_run_revision",
+			Labels: map[string]string{
+				"project_id":         cfg.ProjectID,
+				"service_name":       cfg.ServiceID,
+				"revision_name":      cfg.VersionID,
+				"configuration_name": os.Getenv("K_CONFIGURATION"),
+			},
+		}
+		cfg.UseErrorReporting = true
+	} else { // running locally, perhaps
+		cfg.MonitoredResource = &mrpb.MonitoredResource{
+			Type:   "global",
+			Labels: map[string]string{"project_id": cfg.ProjectID},
+		}
+	}
+	return cfg, nil
+}
+
+// OnCloudRun reports whether the current process is running on Cloud Run.
+func OnCloudRun() bool {
+	// Use the presence of the environment variables provided by Cloud Run.
+	// See https://cloud.google.com/run/docs/reference/container-contract.
+	for _, ev := range []string{"K_SERVICE", "K_REVISION", "K_CONFIGURATION"} {
+		if os.Getenv(ev) == "" {
+			return false
+		}
+	}
+	return true
+}
+
+func (c *Config) Validate() error {
+	if c.ProjectID == "" {
+		return errors.New("missing project")
+	}
+	if c.BigQueryDataset == "" {
+		return errors.New("missing dataset")
+	}
+	return nil
+}
+
+// Dump outputs the current config information to the given Writer.
+func (c *Config) Dump(w io.Writer) error {
+	fmt.Fprint(w, "config: ")
+	enc := json.NewEncoder(w)
+	enc.SetIndent("", "    ")
+	return enc.Encode(c)
+}
+
+// GetEnv looks up the given key from the environment, returning its value if
+// it exists, and otherwise returning the given fallback value.
+func GetEnv(key, fallback string) string {
+	if value, ok := os.LookupEnv(key); ok {
+		return value
+	}
+	return fallback
+}
+
+// GetEnvInt performs GetEnv(key, fallback) and parses the
+// result as int. If parsing fails, returns errVal.
+func GetEnvInt(key, fallback string, errVal int) int {
+	v := GetEnv(key, fallback)
+	i, err := strconv.Atoi(v)
+	if err != nil {
+		return errVal
+	}
+	return i
+}
+
+// gceMetadata reads a metadata value from GCE.
+// For the possible values of name, see
+// https://cloud.google.com/appengine/docs/standard/java/accessing-instance-metadata.
+func gceMetadata(ctx context.Context, name string) (_ string, err error) {
+	// See https://cloud.google.com/appengine/docs/standard/java/accessing-instance-metadata.
+	// (This documentation doesn't exist for Golang, but it seems to work).
+	defer derrors.Wrap(&err, "gceMetadata(ctx, %q)", name)
+
+	const metadataURL = "http://metadata.google.internal/computeMetadata/v1/"
+	req, err := http.NewRequest("GET", metadataURL+name, nil)
+	if err != nil {
+		return "", fmt.Errorf("http.NewRequest: %v", err)
+	}
+	req.Header.Set("Metadata-Flavor", "Google")
+	resp, err := ctxhttp.Do(ctx, nil, req)
+	if err != nil {
+		return "", fmt.Errorf("ctxhttp.Do: %v", err)
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		return "", fmt.Errorf("bad status: %s", resp.Status)
+	}
+	bytes, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return "", fmt.Errorf("io.ReadAll: %v", err)
+	}
+	return string(bytes), nil
+}
diff --git a/internal/parse.go b/internal/parse.go
new file mode 100644
index 0000000..2c21f70
--- /dev/null
+++ b/internal/parse.go
@@ -0,0 +1,205 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal
+
+import (
+	"bufio"
+	"fmt"
+	"net/http"
+	"os"
+	"strconv"
+	"strings"
+
+	"golang.org/x/pkgsite-metrics/internal/derrors"
+	"golang.org/x/pkgsite-metrics/internal/version"
+)
+
+// ScanRequest contains information passed
+// to a scan endpoint.
+type ScanRequest struct {
+	Module     string
+	Version    string
+	Suffix     string
+	ImportedBy int
+	Mode       string
+	Insecure   bool
+
+	// TODO: support optional parameters?
+}
+
+func (s *ScanRequest) URLPathAndParams() string {
+	suf := s.Suffix
+	if suf != "" {
+		suf = "/" + suf
+	}
+	return fmt.Sprintf("%s/@v/%s%s?importedby=%d&mode=%s&insecure=%t", s.Module, s.Version, suf, s.ImportedBy, s.Mode, s.Insecure)
+}
+
+func (s *ScanRequest) Path() string {
+	p := s.Module + "@" + s.Version
+	if s.Suffix != "" {
+		p += "/" + s.Suffix
+	}
+	return p
+}
+
+// ParseScanRequest parses an http request r for an endpoint
+// scanPrefix and produces a corresponding ScanRequest.
+//
+// The module and version should have one of the following three forms:
+//   - <module>/@v/<version>
+//   - <module>@<version>
+//   - <module>/@latest
+//
+// (These are the same forms that the module proxy accepts.)
+func ParseScanRequest(r *http.Request, scanPrefix string) (*ScanRequest, error) {
+	mod, vers, suff, err := ParseModuleVersionSuffix(strings.TrimPrefix(r.URL.Path, scanPrefix))
+	if err != nil {
+		return nil, err
+	}
+	importedBy, err := ParseRequiredIntParam(r, "importedby")
+	if err != nil {
+		return nil, err
+	}
+	insecure, err := ParseOptionalBoolParam(r, "insecure", false)
+	if err != nil {
+		return nil, err
+	}
+	return &ScanRequest{
+		Module:     mod,
+		Version:    vers,
+		Suffix:     suff,
+		ImportedBy: importedBy,
+		Mode:       ParseMode(r),
+		Insecure:   insecure,
+	}, nil
+}
+
+// ParseModuleVersionSuffix returns the module path, version and suffix described by
+// the argument. The suffix is the part of the path after the version.
+func ParseModuleVersionSuffix(requestPath string) (path, vers, suffix string, err error) {
+	p := strings.TrimPrefix(requestPath, "/")
+	modulePath, versionAndSuffix, found := strings.Cut(p, "@")
+	if !found {
+		return "", "", "", fmt.Errorf("invalid path %q: missing '@'", requestPath)
+	}
+	modulePath = strings.TrimSuffix(modulePath, "/")
+	if modulePath == "" {
+		return "", "", "", fmt.Errorf("invalid path %q: missing module", requestPath)
+	}
+	if strings.HasPrefix(versionAndSuffix, "v/") {
+		versionAndSuffix = versionAndSuffix[2:]
+	}
+	// Now versionAndSuffix begins with a version.
+	version, suffix, _ := strings.Cut(versionAndSuffix, "/")
+	if version == "" {
+		return "", "", "", fmt.Errorf("invalid path %q: missing version", requestPath)
+	}
+	if version[0] != 'v' {
+		version = "v" + version
+	}
+	return modulePath, version, suffix, nil
+}
+
+func ParseRequiredIntParam(r *http.Request, name string) (int, error) {
+	value := r.FormValue(name)
+	if value == "" {
+		return 0, fmt.Errorf("missing query param %q", name)
+	}
+	return ParseIntParam(value, name)
+}
+
+func ParseOptionalIntParam(r *http.Request, name string, def int) (int, error) {
+	value := r.FormValue(name)
+	if value == "" {
+		return def, nil
+	}
+	return ParseIntParam(value, name)
+}
+
+func ParseIntParam(value, name string) (int, error) {
+	n, err := strconv.Atoi(value)
+	if err != nil {
+		return 0, fmt.Errorf("want integer for %q query param, got %q", name, value)
+	}
+	return n, nil
+}
+
+func ParseOptionalBoolParam(r *http.Request, name string, def bool) (bool, error) {
+	s := r.FormValue(name)
+	if s == "" {
+		return def, nil
+	}
+	return strconv.ParseBool(s)
+}
+
+func ParseMode(r *http.Request) string {
+	const name = "mode"
+	// "" is allowed mode as some endpoints
+	// might equate it with their default mode.
+	return r.FormValue(name)
+}
+
+type ModuleSpec struct {
+	Path, Version string
+	ImportedBy    int
+}
+
+func ParseCorpusFile(filename string, minImportedByCount int) (ms []ModuleSpec, err error) {
+	defer derrors.Wrap(&err, "parseCorpusFile(%q)", filename)
+	lines, err := ReadFileLines(filename)
+	if err != nil {
+		return nil, err
+	}
+	for _, line := range lines {
+		fields := strings.Fields(line)
+		var path, vers, imps string
+		switch len(fields) {
+		case 2: // no version (temporary)
+			path = fields[0]
+			vers = version.Latest
+			imps = fields[1]
+		case 3:
+			path = fields[0]
+			vers = fields[1]
+			imps = fields[2]
+		default:
+			return nil, fmt.Errorf("wrong number of fields on line %q", line)
+		}
+		n, err := strconv.Atoi(imps)
+		if err != nil {
+			return nil, fmt.Errorf("%v on line %q", err, line)
+		}
+		if n >= minImportedByCount {
+			ms = append(ms, ModuleSpec{Path: path, Version: vers, ImportedBy: n})
+		}
+	}
+	return ms, nil
+}
+
+// ReadFilelines reads and returns the lines from a file.
+// Whitespace on each line is trimmed.
+// Blank lines and lines beginning with '#' are ignored.
+func ReadFileLines(filename string) (lines []string, err error) {
+	defer derrors.Wrap(&err, "readFileLines(%q)", filename)
+	f, err := os.Open(filename)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	s := bufio.NewScanner(f)
+	for s.Scan() {
+		line := strings.TrimSpace(s.Text())
+		if line == "" || strings.HasPrefix(line, "#") {
+			continue
+		}
+		lines = append(lines, line)
+	}
+	if s.Err() != nil {
+		return nil, s.Err()
+	}
+	return lines, nil
+}
diff --git a/internal/parse_test.go b/internal/parse_test.go
new file mode 100644
index 0000000..74f7934
--- /dev/null
+++ b/internal/parse_test.go
@@ -0,0 +1,164 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal
+
+import (
+	"net/http"
+	"net/url"
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	"golang.org/x/pkgsite-metrics/internal/version"
+)
+
+func TestParseScanRequest(t *testing.T) {
+	for _, test := range []struct {
+		name string
+		url  string
+		want ScanRequest
+	}{
+		{
+			name: "ValidScanURL",
+			url:  "https://worker.com/scan/module/@v/v1.0.0?importedby=50",
+			want: ScanRequest{
+				Module:     "module",
+				Version:    "v1.0.0",
+				ImportedBy: 50,
+				Mode:       "",
+			},
+		},
+		{
+			name: "ValidImportsOnlyScanURL",
+			url:  "https://worker.com/scan/module/@v/v1.0.0-abcdefgh?importedby=100&mode=mode1",
+			want: ScanRequest{
+				Module:     "module",
+				Version:    "v1.0.0-abcdefgh",
+				ImportedBy: 100,
+				Mode:       "mode1",
+			},
+		},
+		{
+			name: "Module@Version",
+			url:  "https://worker.com/scan/module@v1.2.3?importedby=1",
+			want: ScanRequest{
+				Module:     "module",
+				Version:    "v1.2.3",
+				ImportedBy: 1,
+				Mode:       "",
+			},
+		},
+		{
+			name: "Module@Version suffix",
+			url:  "https://worker.com/scan/module@v1.2.3/path/to/dir?importedby=1",
+			want: ScanRequest{
+				Module:     "module",
+				Version:    "v1.2.3",
+				Suffix:     "path/to/dir",
+				ImportedBy: 1,
+				Mode:       "",
+			},
+		},
+	} {
+		t.Run(test.name, func(t *testing.T) {
+			u, err := url.Parse(test.url)
+			if err != nil {
+				t.Errorf("url.Parse(%q): %v", test.url, err)
+			}
+			r := &http.Request{URL: u}
+			got, err := ParseScanRequest(r, "/scan")
+			if err != nil {
+				t.Fatal(err)
+			}
+			if g, w := *got, test.want; g != w {
+				t.Errorf("\ngot  %+v\nwant %+v", g, w)
+			}
+		})
+	}
+}
+
+func TestParseScanRequestError(t *testing.T) {
+	for _, test := range []struct {
+		name string
+		url  string
+		want string
+	}{
+		{
+			name: "InvalidScanURL",
+			url:  "/",
+			want: `invalid path "/": missing '@'`,
+		},
+		{
+			name: "InvalidScanURLNoModule",
+			url:  "/@v/version",
+			want: `invalid path "/@v/version": missing module`,
+		},
+		{
+			name: "InvalidScanURLNoVersion",
+			url:  "/module/@v/",
+			want: `invalid path "/module/@v/": missing version`,
+		},
+		{
+			name: "NoVersion",
+			url:  "/module@",
+			want: `invalid path "/module@": missing version`,
+		},
+		{
+			name: "NoVersionSuffix",
+			url:  "/module@/suffix",
+			want: `invalid path "/module@/suffix": missing version`,
+		},
+		{
+			name: "MissingImportedBy",
+			url:  "/module/@v/v1.0.0",
+			want: `missing query param "importedby"`,
+		},
+		{
+			name: "BadImportedBy",
+			url:  "/module@v1?importedby=1a",
+			want: `want integer for "importedby" query param, got "1a"`,
+		},
+	} {
+		t.Run(test.name, func(t *testing.T) {
+			u, err := url.Parse(test.url)
+			if err != nil {
+				t.Errorf("url.Parse(%q): %v", test.url, err)
+			}
+			r := &http.Request{URL: u}
+			if _, err := ParseScanRequest(r, "/scan"); err != nil {
+				if got := err.Error(); got != test.want {
+					t.Fatalf("\ngot  %s\nwant %s", got, test.want)
+				}
+			} else {
+				t.Fatalf("error = nil; want = (%v)", test.want)
+			}
+		})
+	}
+}
+
+func TestParseCorpusFile(t *testing.T) {
+	const file = "testdata/modules.txt"
+	got, err := ParseCorpusFile(file, 1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	want := []ModuleSpec{
+		{"m1", "v1.0.0", 18},
+		{"m2", "v2.3.4", 5},
+		{"m3", version.Latest, 1},
+	}
+
+	if !cmp.Equal(got, want) {
+		t.Errorf("\n got %v\nwant %v", got, want)
+	}
+
+	got, err = ParseCorpusFile(file, 10)
+	if err != nil {
+		t.Fatal(err)
+	}
+	want = want[:1]
+	if !cmp.Equal(got, want) {
+		t.Errorf("\n got %v\nwant %v", got, want)
+	}
+}
diff --git a/internal/testdata/modules.txt b/internal/testdata/modules.txt
new file mode 100644
index 0000000..1b2b419
--- /dev/null
+++ b/internal/testdata/modules.txt
@@ -0,0 +1,5 @@
+# test module corpus file
+
+m1 v1.0.0 18
+m2 v2.3.4 5
+m3  1
diff --git a/internal/worker/index.go b/internal/worker/index.go
new file mode 100644
index 0000000..a8cc2e6
--- /dev/null
+++ b/internal/worker/index.go
@@ -0,0 +1,32 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package worker
+
+import (
+	"net/http"
+
+	"golang.org/x/pkgsite-metrics/internal/derrors"
+)
+
+type IndexPage struct {
+	basePage
+}
+
+func (s *Server) handleIndexPage(w http.ResponseWriter, r *http.Request) error {
+	if r.URL.Path != "/" {
+		return derrors.NotFound
+	}
+	tmpl, err := s.maybeLoadTemplate(indexTemplate)
+	if err != nil {
+		return err
+	}
+	return renderPage(r.Context(), w, s.createIndexPage(), tmpl)
+}
+
+func (s *Server) createIndexPage() *IndexPage {
+	return &IndexPage{
+		basePage: newBasePage(),
+	}
+}
diff --git a/internal/worker/server.go b/internal/worker/server.go
new file mode 100644
index 0000000..8ad8b42
--- /dev/null
+++ b/internal/worker/server.go
@@ -0,0 +1,205 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package worker
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"net/http"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+	"time"
+
+	"cloud.google.com/go/errorreporting"
+	"github.com/google/safehtml/template"
+	"golang.org/x/exp/event"
+	"golang.org/x/pkgsite-metrics/internal/bigquery"
+	"golang.org/x/pkgsite-metrics/internal/config"
+	"golang.org/x/pkgsite-metrics/internal/derrors"
+	"golang.org/x/pkgsite-metrics/internal/log"
+	"golang.org/x/pkgsite-metrics/internal/observe"
+	"golang.org/x/pkgsite-metrics/internal/proxy"
+)
+
+type Server struct {
+	cfg         *config.Config
+	observer    *observe.Observer
+	bqClient    *bigquery.Client
+	proxyClient *proxy.Client
+	staticPath  template.TrustedSource
+
+	devMode   bool
+	mu        sync.Mutex
+	templates map[string]*template.Template
+}
+
+var errBQDisabled = &serverError{http.StatusPreconditionRequired, errors.New("BigQuery disabled on this server")}
+
+func NewServer(ctx context.Context, cfg *config.Config) (_ *Server, err error) {
+	defer derrors.WrapAndReport(&err, "NewServer")
+
+	var bq *bigquery.Client
+	if strings.EqualFold(cfg.BigQueryDataset, "disable") {
+		log.Infof(ctx, "BigQuery disabled")
+	} else {
+		bq, err = bigquery.NewClientCreate(ctx, cfg.ProjectID, cfg.BigQueryDataset)
+		if err != nil {
+			return nil, err
+		}
+		for _, tableID := range bigquery.Tables() {
+			created, err := bq.CreateOrUpdateTable(ctx, tableID)
+			if err != nil {
+				return nil, err
+			}
+			verb := "updated"
+			if created {
+				verb = "created"
+			}
+			log.Infof(ctx, "%s table %s\n", verb, tableID)
+		}
+	}
+
+	proxyClient, err := proxy.New(cfg.ProxyURL)
+	if err != nil {
+		return nil, err
+	}
+	s := &Server{
+		cfg:         cfg,
+		bqClient:    bq,
+		proxyClient: proxyClient,
+		devMode:     cfg.DevMode,
+		staticPath:  cfg.StaticPath,
+	}
+	if err := s.loadTemplates(); err != nil {
+		return nil, err
+	}
+
+	s.observer, err = observe.NewObserver(ctx, cfg.ProjectID, "go-metrics-worker")
+	if err != nil {
+		return nil, err
+	}
+	// This function will be called for each request.
+	// It lets us install a log handler that knows about the request's
+	// trace ID.
+	s.observer.LogHandlerFunc = func(r *http.Request) event.Handler {
+		traceID := r.Header.Get("X-Cloud-Trace-Context")
+		return log.NewGCPJSONHandler(os.Stderr, traceID)
+	}
+
+	if cfg.UseErrorReporting {
+		reportingClient, err := errorreporting.NewClient(ctx, cfg.ProjectID, errorreporting.Config{
+			ServiceName: cfg.ServiceID,
+			OnError: func(err error) {
+				log.Errorf(ctx, "Error reporting failed: %v", err)
+			},
+		})
+		if err != nil {
+			return nil, err
+		}
+		derrors.SetReportingClient(reportingClient)
+	}
+
+	http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir(s.staticPath.String()))))
+
+	s.handle("/favicon.ico", func(w http.ResponseWriter, r *http.Request) error {
+		http.ServeFile(w, r, filepath.Join(s.staticPath.String(), "favicon.ico"))
+		return nil
+	})
+	s.handle("/", s.handleIndexPage)
+	return s, nil
+}
+
+const metricNamespace = "ecosystem/worker"
+
+type handlerFunc func(w http.ResponseWriter, r *http.Request) error
+
+func (s *Server) handle(pattern string, handler handlerFunc) {
+	h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		start := time.Now()
+		ctx := r.Context()
+		log.With("httpRequest", r).Infof(ctx, "starting %s", r.URL.Path)
+
+		w2 := &responseWriter{ResponseWriter: w}
+		if err := handler(w2, r); err != nil {
+			log.Errorf(ctx, err.Error())
+			derrors.Report(err)
+			s.serveError(ctx, w2, r, err)
+		}
+		log.With(
+			"latency", time.Since(start),
+			"status", translateStatus(w2.status)).
+			Infof(ctx, "request end")
+	})
+	http.Handle(pattern, s.observer.Observe(h))
+}
+
+type serverError struct {
+	status int   // HTTP status code
+	err    error // wrapped error
+}
+
+func (s *serverError) Error() string {
+	return fmt.Sprintf("%d (%s): %v", s.status, http.StatusText(s.status), s.err)
+}
+
+func (s *Server) serveError(ctx context.Context, w http.ResponseWriter, _ *http.Request, err error) {
+	if errors.Is(err, derrors.InvalidArgument) {
+		err = &serverError{err: err, status: http.StatusBadRequest}
+	}
+	if errors.Is(err, derrors.NotFound) {
+		err = &serverError{err: err, status: http.StatusNotFound}
+	}
+	if errors.Is(err, derrors.BadModule) {
+		err = &serverError{err: err, status: http.StatusNotAcceptable}
+	}
+	serr, ok := err.(*serverError)
+	if !ok {
+		serr = &serverError{status: http.StatusInternalServerError, err: err}
+	}
+	if serr.status == http.StatusInternalServerError {
+		log.Errorf(ctx, serr.err.Error())
+	} else {
+		log.Warningf(ctx, "returning %v", err)
+	}
+	http.Error(w, serr.err.Error(), serr.status)
+}
+
+type responseWriter struct {
+	http.ResponseWriter
+	status int
+}
+
+func (rw *responseWriter) WriteHeader(code int) {
+	rw.status = code
+	rw.ResponseWriter.WriteHeader(code)
+}
+
+func translateStatus(code int) int64 {
+	if code == 0 {
+		return http.StatusOK
+	}
+	return int64(code)
+}
+
+var locNewYork *time.Location
+
+func init() {
+	var err error
+	locNewYork, err = time.LoadLocation("America/New_York")
+	if err != nil {
+		log.Errorf(context.Background(), "time.LoadLocation: %v", err)
+		os.Exit(1)
+	}
+}
+
+func FormatTime(t time.Time) string {
+	if t.IsZero() {
+		return "-"
+	}
+	return t.In(locNewYork).Format("2006-01-02 15:04:05")
+}
diff --git a/internal/worker/templates.go b/internal/worker/templates.go
new file mode 100644
index 0000000..9753459
--- /dev/null
+++ b/internal/worker/templates.go
@@ -0,0 +1,75 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package worker
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"strings"
+
+	"github.com/google/safehtml/template"
+	"golang.org/x/pkgsite-metrics/internal/derrors"
+	"golang.org/x/pkgsite-metrics/internal/log"
+)
+
+const (
+	indexTemplate = "worker.tmpl"
+)
+
+type basePage struct {
+}
+
+func newBasePage() basePage {
+	return basePage{}
+}
+
+func (s *Server) loadTemplates() error {
+	index, err := s.parseTemplate(template.TrustedSourceFromConstant(indexTemplate))
+	if err != nil {
+		return err
+	}
+	s.templates = map[string]*template.Template{
+		indexTemplate: index,
+	}
+	return nil
+}
+
+func (s *Server) maybeLoadTemplate(tmplName string) (*template.Template, error) {
+	if s.devMode {
+		s.mu.Lock()
+		defer s.mu.Unlock()
+		var err error
+		if err = s.loadTemplates(); err != nil {
+			return nil, fmt.Errorf("error parsing templates: %v", err)
+		}
+	}
+	return s.templates[tmplName], nil
+}
+
+// Parse the template for the status page.
+func (s *Server) parseTemplate(filename template.TrustedSource) (*template.Template, error) {
+	templatePath := template.TrustedSourceJoin(s.staticPath, filename)
+	return template.New(filename.String()).Funcs(template.FuncMap{
+		"timefmt":  FormatTime,
+		"commasep": func(s []string) string { return strings.Join(s, ", ") },
+		"round":    func(n float64) string { return fmt.Sprintf("%.2f", n) },
+	}).ParseFilesFromTrustedSources(templatePath)
+}
+
+func renderPage(ctx context.Context, w http.ResponseWriter, page interface{}, tmpl *template.Template) (err error) {
+	defer derrors.Wrap(&err, "renderPage")
+	var buf bytes.Buffer
+	if err := tmpl.Execute(&buf, page); err != nil {
+		return err
+	}
+	if _, err := io.Copy(w, &buf); err != nil {
+		log.Infof(ctx, "Error copying buffer to ResponseWriter: %v", err)
+		return err
+	}
+	return nil
+}
diff --git a/static/static.css b/static/static.css
new file mode 100644
index 0000000..1639534
--- /dev/null
+++ b/static/static.css
@@ -0,0 +1,7 @@
+/*
+ * Copyright 2021 The Go Authors. All rights reserved.
+ * Use of this source code is governed by a BSD-style
+ * license that can be found in the LICENSE file.
+ */
+
+@import url('shared/shared.css');
diff --git a/static/worker.tmpl b/static/worker.tmpl
new file mode 100644
index 0000000..aea5637
--- /dev/null
+++ b/static/worker.tmpl
@@ -0,0 +1,16 @@
+<!--
+  Copyright 2022 The Go Authors. All rights reserved.
+  Use of this source code is governed by a BSD-style
+  license that can be found in the LICENSE file.
+-->
+
+<!DOCTYPE html>
+<html lang="en">
+<meta charset="utf-8">
+<link href="/static/static.css" rel="stylesheet">
+<title>Go Metrics</title>
+
+<body>
+  <h1>Hello!</h1>
+</body>
+</html>