internal/pkgsitedb: add package

Package pkgsitedb is added for connecting to the pkgsite database.

Change-Id: Icb1d3e6b901b5525854fc1e824f2efcbc6d6b274
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/464901
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Julie Qiu <julieqiu@google.com>
Reviewed-by: Julie Qiu <julieqiu@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/go.mod b/go.mod
index 5e926bc..a922848 100644
--- a/go.mod
+++ b/go.mod
@@ -8,6 +8,7 @@
 	cloud.google.com/go/cloudtasks v1.8.0
 	cloud.google.com/go/errorreporting v0.1.0
 	cloud.google.com/go/logging v1.6.1
+	cloud.google.com/go/secretmanager v1.9.0
 	github.com/GoogleCloudPlatform/opentelemetry-operations-go v1.0.0
 	github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.26.0
 	github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.0.0
@@ -16,6 +17,7 @@
 	github.com/go-git/go-git/v5 v5.5.2
 	github.com/google/go-cmp v0.5.9
 	github.com/google/safehtml v0.1.0
+	github.com/lib/pq v1.10.7
 	go.opencensus.io v0.24.0
 	go.opentelemetry.io/otel v1.4.0
 	go.opentelemetry.io/otel/sdk v1.4.0
diff --git a/go.sum b/go.sum
index 0ec6ecf..4bbfa8e 100644
--- a/go.sum
+++ b/go.sum
@@ -65,6 +65,8 @@
 cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
 cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
 cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
+cloud.google.com/go/secretmanager v1.9.0 h1:xE6uXljAC1kCR8iadt9+/blg1fvSbmenlsDN4fT9gqw=
+cloud.google.com/go/secretmanager v1.9.0/go.mod h1:b71qH2l1yHmWQHt9LC80akm86mX8AL6X1MA01dW8ht4=
 cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
 cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos=
 cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
@@ -265,6 +267,8 @@
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
+github.com/lib/pq v1.10.7 h1:p7ZhMD+KsSRozJr34udlUrhboJwWAgCg34+/ZZNvZZw=
+github.com/lib/pq v1.10.7/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
 github.com/matryer/is v1.2.0 h1:92UTHpy8CDwaJ08GqLDzhhuixiBUUD1p3AU6PHddz4A=
 github.com/matryer/is v1.2.0/go.mod h1:2fLPjFQM9rhQ15aVEtbuwhJinnOqrmgXPNdZsdwlWXA=
 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
diff --git a/internal/pkgsitedb/db.go b/internal/pkgsitedb/db.go
new file mode 100644
index 0000000..103a10d
--- /dev/null
+++ b/internal/pkgsitedb/db.go
@@ -0,0 +1,95 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package pkgsitedb provides functionality for connecting to the pkgsite
+// database.
+package pkgsitedb
+
+import (
+	"context"
+	"database/sql"
+	"fmt"
+	"regexp"
+
+	_ "github.com/lib/pq"
+
+	secretmanager "cloud.google.com/go/secretmanager/apiv1"
+	"golang.org/x/pkgsite-metrics/internal/config"
+	"golang.org/x/pkgsite-metrics/internal/derrors"
+	"golang.org/x/pkgsite-metrics/internal/scan"
+	smpb "google.golang.org/genproto/googleapis/cloud/secretmanager/v1"
+)
+
+// Open connects to the pkgsite database described by cfg and verifies the connection with a ping.
+func Open(ctx context.Context, cfg *config.Config) (_ *sql.DB, err error) {
+	defer derrors.Wrap(&err, "Open")
+	password, err := getPasswordSecret(ctx, cfg.PkgsiteDBSecret) // the password is stored as a secret, not in cfg
+	if err != nil {
+		return nil, err
+	}
+	connString := fmt.Sprintf("user='%s' password='%s' host='%s' port=%s dbname='%s' sslmode='disable'",
+		cfg.PkgsiteDBUser, password, cfg.PkgsiteDBHost, cfg.PkgsiteDBPort, cfg.PkgsiteDBName)
+	defer derrors.Wrap(&err, "openPkgsiteDB, connString=%q", redactPassword(connString))
+	db, err := sql.Open("postgres", connString)
+	if err != nil {
+		return nil, err
+	}
+	if err := db.PingContext(ctx); err != nil {
+		db.Close() // release the handle; the ping failure is the error worth reporting
+		return nil, err
+	}
+	return db, nil
+}
+
+var passwordRegexp = regexp.MustCompile(`password=('(\\.|[^'\\])*'|\S+)`) // single-quoted value (spaces allowed) or bare token
+// redactPassword replaces the password value in a key=value connection string with REDACTED.
+func redactPassword(dbinfo string) string {
+	return passwordRegexp.ReplaceAllLiteralString(dbinfo, "password=REDACTED")
+}
+
+// getPasswordSecret returns the payload of the latest version of the named Secret Manager secret.
+func getPasswordSecret(ctx context.Context, secretFullName string) (_ string, err error) {
+	defer derrors.Wrap(&err, "getPasswordSecret(ctx, %q)", secretFullName)
+	sm, err := secretmanager.NewClient(ctx)
+	if err != nil {
+		return "", err
+	}
+	defer sm.Close()
+	// Ask for the "latest" version rather than a pinned one.
+	req := &smpb.AccessSecretVersionRequest{Name: secretFullName + "/versions/latest"}
+	resp, err := sm.AccessSecretVersion(ctx, req)
+	if err != nil {
+		return "", err
+	}
+	return string(resp.Payload.Data), nil
+}
+
+// ModuleSpecs returns the module path, version and largest imported_by_count of
+// every module version in the search_documents table of the given pkgsite DB
+// whose largest imported_by_count is at least minImportedByCount, highest first.
+func ModuleSpecs(ctx context.Context, db *sql.DB, minImportedByCount int) (specs []scan.ModuleSpec, err error) {
+	defer derrors.Wrap(&err, "moduleSpecsFromDB")
+	const query = `
+		SELECT module_path, version, max(imported_by_count)
+		FROM search_documents
+		GROUP BY module_path, version
+		HAVING max(imported_by_count) >= $1
+		ORDER by max(imported_by_count) desc`
+	rows, err := db.QueryContext(ctx, query, minImportedByCount)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+	for rows.Next() {
+		var ms scan.ModuleSpec
+		if err = rows.Scan(&ms.Path, &ms.Version, &ms.ImportedBy); err != nil {
+			return nil, err
+		}
+		specs = append(specs, ms)
+	}
+	if err = rows.Err(); err != nil {
+		return nil, err
+	}
+	return specs, nil
+}
diff --git a/internal/pkgsitedb/db_test.go b/internal/pkgsitedb/db_test.go
new file mode 100644
index 0000000..9f7581a
--- /dev/null
+++ b/internal/pkgsitedb/db_test.go
@@ -0,0 +1,62 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package pkgsitedb
+
+import (
+	// The blank import of github.com/lib/pq below registers the postgres database driver.
+	"context"
+	"database/sql"
+	"flag"
+	"fmt"
+	"net/url"
+	"strings"
+	"testing"
+
+	_ "github.com/lib/pq"
+)
+
+// Pass -db to test against a local database (host 127.0.0.1).
+var dbInfo = flag.String("db", "",
+	"DB info for testing in the form 'name=NAME&port=PORT&user=USER&password=PW'")
+
+// TestModuleSpecs runs ModuleSpecs against a local database; it is skipped unless -db is given.
+func TestModuleSpecs(t *testing.T) {
+	if *dbInfo == "" {
+		t.Skip("missing -db")
+	}
+	info := map[string]string{}
+	for _, kv := range strings.Split(*dbInfo, "&") {
+		k, v, ok := strings.Cut(kv, "=")
+		if !ok {
+			t.Fatalf("%q is not in the form 'key=value'", kv)
+		}
+		info[k] = v
+	}
+
+	const host = "127.0.0.1"
+	ctx := context.Background()
+	// Escape every user-supplied component so special characters cannot corrupt the URL.
+	dbinfo := fmt.Sprintf("postgres://%s/%s?sslmode=disable&user=%s&password=%s&port=%s&timezone=UTC",
+		host, url.PathEscape(info["name"]), url.QueryEscape(info["user"]), url.QueryEscape(info["password"]),
+		url.QueryEscape(info["port"]))
+	db, err := sql.Open("postgres", dbinfo)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer db.Close()
+	if err := db.PingContext(ctx); err != nil {
+		t.Fatal(err)
+	}
+	got, err := ModuleSpecs(ctx, db, 1000)
+	if err != nil {
+		t.Fatal(err)
+	}
+	t.Logf("got %d module specs from %s", len(got), info["name"])
+	if got, want := len(got), 100; got < want {
+		t.Errorf("got %d results, expected at least %d", got, want)
+	}
+	for _, g := range got {
+		t.Logf("%s  %s", g.Path, g.Version) // use the test log so output is tied to this test
+	}
+}
diff --git a/internal/worker/enqueue.go b/internal/worker/enqueue.go
new file mode 100644
index 0000000..20f6867
--- /dev/null
+++ b/internal/worker/enqueue.go
@@ -0,0 +1,89 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package worker
+
+import (
+	"context"
+	"sync"
+
+	"golang.org/x/pkgsite-metrics/internal/config"
+	"golang.org/x/pkgsite-metrics/internal/derrors"
+	"golang.org/x/pkgsite-metrics/internal/log"
+	"golang.org/x/pkgsite-metrics/internal/pkgsitedb"
+	"golang.org/x/pkgsite-metrics/internal/queue"
+	"golang.org/x/pkgsite-metrics/internal/scan"
+)
+
+const defaultMinImportedByCount = 10 // NOTE(review): not referenced in the visible hunks; presumably the fallback imported-by threshold — confirm against callers
+
+func readModules(ctx context.Context, cfg *config.Config, file string, minImpCount int) ([]scan.ModuleSpec, error) {
+	if file == "" { // no corpus file given, so the pkgsite database is the source
+		log.Infof(ctx, "reading modules from DB %s", cfg.PkgsiteDBName)
+		return readFromDB(ctx, cfg, minImpCount)
+	}
+	log.Infof(ctx, "reading modules from file %s", file)
+	return scan.ParseCorpusFile(file, minImpCount)
+}
+
+func readFromDB(ctx context.Context, cfg *config.Config, minImportedByCount int) ([]scan.ModuleSpec, error) {
+	conn, openErr := pkgsitedb.Open(ctx, cfg)
+	if openErr != nil {
+		return nil, openErr
+	}
+	defer conn.Close() // the handle is only needed for this single query
+	return pkgsitedb.ModuleSpecs(ctx, conn, minImportedByCount)
+}
+
+// enqueueModules enqueues a scan for every request in sreqs except the "std" module, running at most
+// concurrentEnqueues calls at once. Failures are logged and counted; the function always returns nil.
+func enqueueModules(ctx context.Context, sreqs []*scan.Request, q queue.Queue, opts *queue.Options) (err error) {
+	defer derrors.Wrap(&err, "enqueueModules")
+	// Enqueue concurrently, because sequentially takes a while; sem bounds the goroutines in flight.
+	const concurrentEnqueues = 10
+	var (
+		mu                 sync.Mutex
+		nEnqueued, nErrors int
+	)
+	sem := make(chan struct{}, concurrentEnqueues)
+	for _, sreq := range sreqs {
+		if sreq.Module == "std" {
+			continue // ignore the standard library (checked first so it is not logged as enqueued)
+		}
+		log.Infof(ctx, "enqueuing: %s", sreq.URLPathAndParams())
+		sreq := sreq // per-iteration copy for the goroutine (needed before Go 1.22)
+		sem <- struct{}{}
+		go func() {
+			defer func() { <-sem }()
+			enqueued, err := q.EnqueueScan(ctx, sreq, opts)
+			mu.Lock()
+			if err != nil {
+				log.Errorf(ctx, "enqueuing: %v", err)
+				nErrors++
+			} else if enqueued {
+				nEnqueued++
+			}
+			mu.Unlock()
+		}()
+	}
+	// Wait for goroutines to finish: sem can be filled to capacity only once every worker released its slot.
+	for i := 0; i < concurrentEnqueues; i++ {
+		sem <- struct{}{}
+	}
+	log.Infof(ctx, "Successfully scheduled modules to be fetched: %d modules enqueued, %d errors", nEnqueued, nErrors)
+	return nil
+}
+
+// moduleSpecsToScanRequests converts each module spec into a scan request that
+// carries the spec's path, version and imported-by count together with mode.
+func moduleSpecsToScanRequests(modspecs []scan.ModuleSpec, mode string) []*scan.Request {
+	var reqs []*scan.Request // stays nil when modspecs is empty
+	for i := range modspecs {
+		spec := modspecs[i]
+		req := &scan.Request{Module: spec.Path, Version: spec.Version}
+		req.ImportedBy, req.Mode = spec.ImportedBy, mode
+		reqs = append(reqs, req)
+	}
+	return reqs
+}
diff --git a/internal/worker/pkgsitedb.go b/internal/worker/pkgsitedb.go
new file mode 100644
index 0000000..33cec31
--- /dev/null
+++ b/internal/worker/pkgsitedb.go
@@ -0,0 +1,27 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package worker
+
+import (
+	"fmt"
+	"net/http"
+
+	_ "github.com/lib/pq"
+	"golang.org/x/pkgsite-metrics/internal/pkgsitedb"
+)
+
+func (s *Server) handleTestDB(w http.ResponseWriter, r *http.Request) error {
+	db, err := pkgsitedb.Open(r.Context(), s.cfg)
+	if err != nil {
+		return err
+	}
+	defer db.Close() // each request opens its own handle, so release it when the handler returns
+	specs, err := pkgsitedb.ModuleSpecs(r.Context(), db, 100)
+	if err != nil {
+		return err
+	}
+	fmt.Fprintf(w, "got %d modules with over 100 importers", len(specs)) // one-line summary written to the response
+	return nil
+}