internal/bigquery: add package

An initial bigquery package is added for reading and writing data to
BigQuery.

Change-Id: I3745968de37041a423ec1266551fc0eca3cbabb4
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/464621
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Auto-Submit: Julie Qiu <julieqiu@google.com>
Run-TryBot: Julie Qiu <julieqiu@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Julie Qiu <julieqiu@google.com>
diff --git a/go.mod b/go.mod
index 8e6266d..4729553 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,8 @@
 go 1.18
 
 require (
+	cloud.google.com/go v0.105.0
+	cloud.google.com/go/bigquery v1.43.0
 	cloud.google.com/go/errorreporting v0.1.0
 	cloud.google.com/go/logging v1.6.1
 	github.com/GoogleCloudPlatform/opentelemetry-operations-go v1.0.0
@@ -15,18 +17,20 @@
 	go.opencensus.io v0.24.0
 	go.opentelemetry.io/otel v1.4.0
 	go.opentelemetry.io/otel/sdk v1.4.0
+	golang.org/x/exp v0.0.0-20230131160201-f062dba9d201
 	golang.org/x/exp/event v0.0.0-20220218215828-6cf2b201936e
 	golang.org/x/mod v0.7.0
 	golang.org/x/net v0.5.0
 	golang.org/x/tools v0.5.1-0.20230117180257-8aba49bb5ea2
+	google.golang.org/api v0.103.0
 	honnef.co/go/tools v0.2.2
 	mvdan.cc/unparam v0.0.0-20220926085101-66de63301820
 )
 
 require (
-	cloud.google.com/go v0.105.0 // indirect
 	cloud.google.com/go/compute v1.12.1 // indirect
 	cloud.google.com/go/compute/metadata v0.2.1 // indirect
+	cloud.google.com/go/iam v0.7.0 // indirect
 	cloud.google.com/go/longrunning v0.3.0 // indirect
 	cloud.google.com/go/monitoring v1.8.0 // indirect
 	cloud.google.com/go/trace v1.4.0 // indirect
@@ -41,6 +45,7 @@
 	github.com/go-logr/stdr v1.2.2 // indirect
 	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
 	github.com/golang/protobuf v1.5.2 // indirect
+	github.com/google/uuid v1.3.0 // indirect
 	github.com/googleapis/enterprise-certificate-proxy v0.2.0 // indirect
 	github.com/googleapis/gax-go/v2 v2.7.0 // indirect
 	github.com/imdario/mergo v0.3.13 // indirect
@@ -60,7 +65,7 @@
 	golang.org/x/sync v0.1.0 // indirect
 	golang.org/x/sys v0.4.0 // indirect
 	golang.org/x/text v0.6.0 // indirect
-	google.golang.org/api v0.103.0 // indirect
+	golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
 	google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c // indirect
 	google.golang.org/grpc v1.50.1 // indirect
diff --git a/go.sum b/go.sum
index 200485b..4f2b161 100644
--- a/go.sum
+++ b/go.sum
@@ -37,16 +37,21 @@
 cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg=
 cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc=
 cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
+cloud.google.com/go/bigquery v1.43.0 h1:u0fvz5ysJBe1jwUPI4LuPwAX+o+6fCUwf3ECeg6eDUQ=
+cloud.google.com/go/bigquery v1.43.0/go.mod h1:ZMQcXHsl+xmU1z36G2jNGZmKp9zNY5BUua5wDgmNCfw=
 cloud.google.com/go/compute v0.1.0/go.mod h1:GAesmwr110a34z04OlxYkATPBEfVhkymfTBXtfbBFow=
 cloud.google.com/go/compute v1.2.0/go.mod h1:xlogom/6gr8RJGBe7nT2eGsQYAFUbbv8dbC29qE3Xmw=
 cloud.google.com/go/compute v1.12.1 h1:gKVJMEyqV5c/UnpzjjQbo3Rjvvqpr9B1DFSbJC4OXr0=
 cloud.google.com/go/compute v1.12.1/go.mod h1:e8yNOBcBONZU1vJKCvCoDw/4JQsA0dpM4x/6PIIOocU=
 cloud.google.com/go/compute/metadata v0.2.1 h1:efOwf5ymceDhK6PKMnnrTHP4pppY5L22mle96M1yP48=
 cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxBkeanZ9wwa75XHJgOM=
+cloud.google.com/go/datacatalog v1.8.0 h1:6kZ4RIOW/uT7QWC5SfPfq/G8sYzr/v+UOmOAxy4Z1TE=
 cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
 cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
 cloud.google.com/go/errorreporting v0.1.0 h1:z40EhrjRspplwbpO+9DSnC4kgDokBi94T/gYwtdKL5Q=
 cloud.google.com/go/errorreporting v0.1.0/go.mod h1:cZSiBMvrnl0X13pD9DwKf9sQ8Eqy3EzHqkyKBZxiIrM=
+cloud.google.com/go/iam v0.7.0 h1:k4MuwOsS7zGJJ+QfZ5vBK8SgHBAvYN/23BWsiihJ1vs=
+cloud.google.com/go/iam v0.7.0/go.mod h1:H5Br8wRaDGNc8XP3keLc4unfUUZeyH3Sfl9XpQEYOeg=
 cloud.google.com/go/logging v1.6.1 h1:ZBsZK+JG+oCDT+vaxwqF2egKNRjz8soXiS6Xv79benI=
 cloud.google.com/go/logging v1.6.1/go.mod h1:5ZO0mHHbvm8gEmeEUHrmDlTDSu5imF6MUP9OfilNXBw=
 cloud.google.com/go/longrunning v0.3.0 h1:NjljC+FYPV3uh5/OwWT6pVU+doBqMg2x/rZlE+CamDs=
@@ -63,6 +68,7 @@
 cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
 cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
+cloud.google.com/go/storage v1.27.0 h1:YOO045NZI9RKfCj1c5A/ZtuuENUc8OAW+gHdGnDgyMQ=
 cloud.google.com/go/trace v1.4.0 h1:qO9eLn2esajC9sxpqp1YKX37nXC3L4BfGnPS0Cx9dYo=
 cloud.google.com/go/trace v1.4.0/go.mod h1:UG0v8UBqzusp+z63o7FK74SdFE+AXpCLdFb1rshXG+Y=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
@@ -198,9 +204,11 @@
 github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
 github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
 github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
 github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
 github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
 github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
+github.com/google/martian/v3 v3.2.1 h1:d8MncMlErDFTwQGBK1xhv026j9kqhvw1Qv9IbWT1VLQ=
 github.com/google/martian/v3 v3.2.1/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk=
 github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
 github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
@@ -219,6 +227,8 @@
 github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
 github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
+github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/googleapis/enterprise-certificate-proxy v0.2.0 h1:y8Yozv7SZtlU//QXbezB6QkpuE6jMD2/gfzk4AftXjs=
 github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg=
 github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
@@ -352,6 +362,8 @@
 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
 golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
 golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
+golang.org/x/exp v0.0.0-20230131160201-f062dba9d201 h1:BEABXpNXLEz0WxtA+6CQIz2xkg80e+1zrhWyMcq8VzE=
+golang.org/x/exp v0.0.0-20230131160201-f062dba9d201/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
 golang.org/x/exp/event v0.0.0-20220218215828-6cf2b201936e h1:K2AuHMC+jaRTzAcivRwKOzjTZ1925Yx4xHMg07YoBQc=
 golang.org/x/exp/event v0.0.0-20220218215828-6cf2b201936e/go.mod h1:AVlZHjhWbW/3yOcmKMtJiObwBPJajBlUpQXRijFNrNc=
 golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
@@ -602,6 +614,8 @@
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
+golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
 google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
 google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
 google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
diff --git a/internal/bigquery/bigquery.go b/internal/bigquery/bigquery.go
new file mode 100644
index 0000000..b87fdea
--- /dev/null
+++ b/internal/bigquery/bigquery.go
@@ -0,0 +1,336 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package bigquery provides a client for reading and writing to BigQuery.
+package bigquery
+
+import (
+	"context"
+	"crypto/sha256"
+	"encoding/hex"
+	"errors"
+	"fmt"
+	"net/http"
+	"sort"
+	"strings"
+	"sync"
+	"time"
+
+	bq "cloud.google.com/go/bigquery"
+	"cloud.google.com/go/civil"
+	"golang.org/x/exp/maps"
+	"golang.org/x/pkgsite-metrics/internal/derrors"
+	"google.golang.org/api/googleapi"
+	"google.golang.org/api/iterator"
+)
+
+// Client is a client for connecting to BigQuery.
+type Client struct {
+	client  *bq.Client
+	dataset *bq.Dataset
+}
+
+// NewClient creates a new client for connecting to BigQuery, referring to a single dataset.
+// The dataset must already exist.
+func NewClient(ctx context.Context, projectID, datasetID string) (_ *Client, err error) {
+	defer derrors.Wrap(&err, "New(ctx, %q, %q)", projectID, datasetID)
+	client, err := bq.NewClient(ctx, projectID)
+	if err != nil {
+		return nil, err
+	}
+	dataset := client.DatasetInProject(projectID, datasetID)
+	if _, err := dataset.Metadata(ctx); err != nil {
+		return nil, err
+	}
+	return &Client{
+		client:  client,
+		dataset: dataset,
+	}, nil
+}
+
+// NewClientCreate is like NewClient, but it creates the dataset if it doesn't exist.
+func NewClientCreate(ctx context.Context, projectID, datasetID string) (_ *Client, err error) {
+	if err := CreateDataset(ctx, projectID, datasetID); err != nil {
+		return nil, err
+	}
+	return NewClient(ctx, projectID, datasetID)
+}
+
+// CreateDataset creates a BigQuery dataset if it does not already exist.
+func CreateDataset(ctx context.Context, projectID, datasetID string) (err error) {
+	defer derrors.Wrap(&err, "CreateDataset(%q, %q)", projectID, datasetID)
+	client, err := bq.NewClient(ctx, projectID)
+	if err != nil {
+		return err
+	}
+	dataset := client.DatasetInProject(projectID, datasetID)
+	err = dataset.Create(ctx, &bq.DatasetMetadata{Name: datasetID})
+	if err != nil && !isAlreadyExistsError(err) {
+		return err
+	}
+	return nil
+}
+
+// isNotFoundError reports whether the error, which should come from a cloud.google.com/go/bigquery
+// client call, is a NotFound error.
+func isNotFoundError(err error) bool {
+	return hasCode(err, http.StatusNotFound)
+}
+
+func isAlreadyExistsError(err error) bool {
+	// The BigQuery API uses 409 for something that exists.
+	return hasCode(err, http.StatusConflict)
+}
+
+func hasCode(err error, code int) bool {
+	var gerr *googleapi.Error
+	if !errors.As(err, &gerr) {
+		return false
+	}
+	return gerr.Code == code
+}
+
+// Table returns a handle for the given tableID in the client's dataset.
+func (c *Client) Table(tableID string) *bq.Table {
+	return c.dataset.Table(tableID)
+}
+
+// FullTableName returns the fully-qualified name of the table, suitable for
+// use in queries.
+func (c *Client) FullTableName(tableID string) string {
+	// From https://github.com/googleapis/google-cloud-go/blob/bigquery/v1.43.0/bigquery/table.go#L544.
+	return fmt.Sprintf("%s.%s.%s", c.dataset.ProjectID, c.dataset.DatasetID, tableID)
+}
+
+// CreateTable creates a table with the given name if it doesn't exist.
+func (c *Client) CreateTable(ctx context.Context, tableID string) (err error) {
+	defer derrors.Wrap(&err, "CreateTable(%q)", tableID)
+	schema := tableSchema(tableID)
+	if schema == nil {
+		return fmt.Errorf("no schema registered for table %q", tableID)
+	}
+	err = c.Table(tableID).Create(ctx, &bq.TableMetadata{Schema: schema})
+	if err != nil && !isAlreadyExistsError(err) {
+		return err
+	}
+	return nil
+}
+
+// CreateOrUpdateTable creates a table if it does not exist, or updates it if it does.
+// It returns true if it created the table.
+func (c *Client) CreateOrUpdateTable(ctx context.Context, tableID string) (created bool, err error) {
+	defer derrors.Wrap(&err, "CreateOrUpdateTable(%q)", tableID)
+	meta, err := c.Table(tableID).Metadata(ctx) // check if the table already exists
+	if err != nil {
+		if !isNotFoundError(err) {
+			return false, err
+		}
+		return true, c.CreateTable(ctx, tableID)
+	}
+	schema := tableSchema(tableID)
+	if schema == nil {
+		return false, fmt.Errorf("no schema registered for table %q", tableID)
+	}
+	_, err = c.Table(tableID).Update(ctx, bq.TableMetadataToUpdate{Schema: schema}, meta.ETag)
+	return false, err
+}
+
+// Upload inserts a row into the table.
+func (c *Client) Upload(ctx context.Context, tableID string, row any) (err error) {
+	defer derrors.Wrap(&err, "Upload(ctx, %q)", tableID)
+	u := c.Table(tableID).Inserter()
+	if s, ok := row.(interface{ SetUploadTime(time.Time) }); ok {
+		s.SetUploadTime(time.Now())
+	}
+	return u.Put(ctx, row)
+}
+
+// UploadMany inserts multiple rows into the table.
+// Each row should be a struct pointer.
+// The chunkSize parameter limits the number of rows sent in a single request; this may
+// be necessary to avoid reaching the maximum size of a request.
+// If chunkSize is <= 0, all rows will be sent in one request.
+func UploadMany[T any](ctx context.Context, client *Client, tableID string, rows []T, chunkSize int) (err error) {
+	defer derrors.Wrap(&err, "UploadMany(%q), %d rows, chunkSize=%d", tableID, len(rows), chunkSize)
+
+	now := time.Now()
+	// Set upload time.
+	for _, r := range rows {
+		if s, ok := any(r).(interface{ SetUploadTime(time.Time) }); ok {
+			s.SetUploadTime(now)
+		}
+	}
+
+	ins := client.Table(tableID).Inserter()
+	if chunkSize <= 0 {
+		return ins.Put(ctx, rows)
+	}
+	start := 0
+	for start < len(rows) {
+		end := start + chunkSize
+		if end > len(rows) {
+			end = len(rows)
+		}
+		for {
+			if err := ins.Put(ctx, rows[start:end]); err == nil {
+				break
+			} else if hasCode(err, http.StatusRequestEntityTooLarge) && end-start > 1 {
+				// Request too large; reduce this chunk size by half.
+				end = start + (end-start)/2
+				continue
+			} else {
+				return err
+			}
+		}
+		start = end
+	}
+	return nil
+}
+
+// ForEachRow calls f for each row in the given iterator.
+// It returns as soon as f returns false.
+func ForEachRow[T any](iter *bq.RowIterator, f func(*T) bool) error {
+	for {
+		var row T
+		err := iter.Next(&row)
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			return err
+		}
+		if !f(&row) {
+			break
+		}
+	}
+	return nil
+}
+
+// All returns all rows returned by iter.
+func All[T any](iter *bq.RowIterator) ([]*T, error) {
+	var ts []*T
+	err := ForEachRow(iter, func(t *T) bool {
+		ts = append(ts, t)
+		return true
+	})
+	if err != nil {
+		return nil, err
+	}
+	return ts, nil
+}
+
+func (c *Client) Query(ctx context.Context, q string) (*bq.RowIterator, error) {
+	return c.client.Query(q).Read(ctx)
+}
+
+// NullString constructs a bq.NullString.
+func NullString(s string) bq.NullString {
+	return bq.NullString{StringVal: s, Valid: true}
+}
+
+// NullInt constructs a bq.NullInt.
+func NullInt(i int) bq.NullInt64 {
+	return bq.NullInt64{Int64: int64(i), Valid: true}
+}
+
+// NullTime constructs a bq.NullTime.
+func NullTime(t time.Time) bq.NullTime {
+	return bq.NullTime{Time: civil.TimeOf(t), Valid: true}
+}
+
+// schemaVersion computes a relatively short string from a schema, such that
+// different schemas result in different strings with high probability.
+func schemaVersion(schema bq.Schema) string {
+	hash := sha256.Sum256([]byte(schemaString(schema)))
+	return hex.EncodeToString(hash[:])
+}
+
+// schemaString returns a long, human-readable string summarizing schema.
+func schemaString(schema bq.Schema) string {
+	var b strings.Builder
+	for i, field := range schema {
+		if i > 0 {
+			b.WriteRune(';')
+		}
+		b.WriteString(field.Name)
+		if field.Repeated {
+			b.WriteString(",rep")
+		}
+		if field.Required {
+			b.WriteString(",req")
+		}
+		b.WriteByte(':')
+		if field.Type == bq.RecordFieldType {
+			fmt.Fprintf(&b, "(%s)", schemaString(field.Schema))
+		} else {
+			b.WriteString(string(field.Type))
+		}
+	}
+	return b.String()
+}
+
+var (
+	tableMu sync.Mutex
+	tables  = map[string]bq.Schema{}
+)
+
+func addTable(tableID string, s bq.Schema) {
+	tableMu.Lock()
+	defer tableMu.Unlock()
+	tables[tableID] = s
+}
+
+// tableSchema returns the schema associated with the given table,
+// or nil if there is none.
+func tableSchema(tableID string) bq.Schema {
+	tableMu.Lock()
+	defer tableMu.Unlock()
+	return tables[tableID]
+}
+
+// Tables returns all the tables used by the worker.
+func Tables() []string {
+	tableMu.Lock()
+	defer tableMu.Unlock()
+	tableIDs := maps.Keys(tables)
+	sort.Strings(tableIDs)
+	return tableIDs
+}
+
+// partitionQuery returns a query that returns one row for each distinct value
+// of partitionColumn in tableName.
+// The selected row will be the first one according to the orderings, which
+// should be comma-separated ORDER BY clauses.
+//
+// For example, say the students table holds student names and classes.
+// Then
+//
+//	partitionQuery("students", "class", "name ASC")
+//
+// will construct a query returning the student in each class whose name is
+// alphabetically first.
+//
+// (BigQuery SQL has no DISTINCT ON feature and doesn't allow columns of type RECORD
+// in queries with DISTINCT, so we have to take this approach.)
+func partitionQuery(tableName, partitionColumn, orderings string) string {
+	// This query first organizes the table rows into windows that have the same partitionColumn.
+	// The rows in each window are sorted by the given orderings.
+	// They are then assigned numbers, where 1 is the first row in the window.
+	// Finally, only the first row in each window is chosen.
+	// (ROW_NUMBER guarantees that each row has a distinct number; RANK assigns the
+	// same number to identical rows, which means that the result may still contain
+	// duplicates.)
+	const qf = `
+		SELECT * EXCEPT (rownum)
+		FROM (
+			SELECT *, ROW_NUMBER() OVER (
+				PARTITION BY %s
+				ORDER BY %s
+			) AS rownum
+			FROM %s
+		) WHERE rownum = 1
+	`
+
+	return fmt.Sprintf(qf, partitionColumn, orderings, "`"+tableName+"`")
+}