internal/bigquery: add package
An initial bigquery package is added for reading and writing data to
BigQuery.
Change-Id: I3745968de37041a423ec1266551fc0eca3cbabb4
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/464621
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Auto-Submit: Julie Qiu <julieqiu@google.com>
Run-TryBot: Julie Qiu <julieqiu@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Julie Qiu <julieqiu@google.com>
diff --git a/go.mod b/go.mod
index 8e6266d..4729553 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,8 @@
go 1.18
require (
+ cloud.google.com/go v0.105.0
+ cloud.google.com/go/bigquery v1.43.0
cloud.google.com/go/errorreporting v0.1.0
cloud.google.com/go/logging v1.6.1
github.com/GoogleCloudPlatform/opentelemetry-operations-go v1.0.0
@@ -15,18 +17,20 @@
go.opencensus.io v0.24.0
go.opentelemetry.io/otel v1.4.0
go.opentelemetry.io/otel/sdk v1.4.0
+ golang.org/x/exp v0.0.0-20230131160201-f062dba9d201
golang.org/x/exp/event v0.0.0-20220218215828-6cf2b201936e
golang.org/x/mod v0.7.0
golang.org/x/net v0.5.0
golang.org/x/tools v0.5.1-0.20230117180257-8aba49bb5ea2
+ google.golang.org/api v0.103.0
honnef.co/go/tools v0.2.2
mvdan.cc/unparam v0.0.0-20220926085101-66de63301820
)
require (
- cloud.google.com/go v0.105.0 // indirect
cloud.google.com/go/compute v1.12.1 // indirect
cloud.google.com/go/compute/metadata v0.2.1 // indirect
+ cloud.google.com/go/iam v0.7.0 // indirect
cloud.google.com/go/longrunning v0.3.0 // indirect
cloud.google.com/go/monitoring v1.8.0 // indirect
cloud.google.com/go/trace v1.4.0 // indirect
@@ -41,6 +45,7 @@
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
+ github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.7.0 // indirect
github.com/imdario/mergo v0.3.13 // indirect
@@ -60,7 +65,7 @@
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/text v0.6.0 // indirect
- google.golang.org/api v0.103.0 // indirect
+ golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20221201164419-0e50fba7f41c // indirect
google.golang.org/grpc v1.50.1 // indirect
diff --git a/go.sum b/go.sum
index 200485b..4f2b161 100644
--- a/go.sum
+++ b/go.sum
@@ -37,16 +37,21 @@
cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUMb4Nv6dBIg=
cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc=
cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
+cloud.google.com/go/bigquery v1.43.0 h1:u0fvz5ysJBe1jwUPI4LuPwAX+o+6fCUwf3ECeg6eDUQ=
+cloud.google.com/go/bigquery v1.43.0/go.mod h1:ZMQcXHsl+xmU1z36G2jNGZmKp9zNY5BUua5wDgmNCfw=
cloud.google.com/go/compute v0.1.0/go.mod h1:GAesmwr110a34z04OlxYkATPBEfVhkymfTBXtfbBFow=
cloud.google.com/go/compute v1.2.0/go.mod h1:xlogom/6gr8RJGBe7nT2eGsQYAFUbbv8dbC29qE3Xmw=
cloud.google.com/go/compute v1.12.1 h1:gKVJMEyqV5c/UnpzjjQbo3Rjvvqpr9B1DFSbJC4OXr0=
cloud.google.com/go/compute v1.12.1/go.mod h1:e8yNOBcBONZU1vJKCvCoDw/4JQsA0dpM4x/6PIIOocU=
cloud.google.com/go/compute/metadata v0.2.1 h1:efOwf5ymceDhK6PKMnnrTHP4pppY5L22mle96M1yP48=
cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxBkeanZ9wwa75XHJgOM=
+cloud.google.com/go/datacatalog v1.8.0 h1:6kZ4RIOW/uT7QWC5SfPfq/G8sYzr/v+UOmOAxy4Z1TE=
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
cloud.google.com/go/errorreporting v0.1.0 h1:z40EhrjRspplwbpO+9DSnC4kgDokBi94T/gYwtdKL5Q=
cloud.google.com/go/errorreporting v0.1.0/go.mod h1:cZSiBMvrnl0X13pD9DwKf9sQ8Eqy3EzHqkyKBZxiIrM=
+cloud.google.com/go/iam v0.7.0 h1:k4MuwOsS7zGJJ+QfZ5vBK8SgHBAvYN/23BWsiihJ1vs=
+cloud.google.com/go/iam v0.7.0/go.mod h1:H5Br8wRaDGNc8XP3keLc4unfUUZeyH3Sfl9XpQEYOeg=
cloud.google.com/go/logging v1.6.1 h1:ZBsZK+JG+oCDT+vaxwqF2egKNRjz8soXiS6Xv79benI=
cloud.google.com/go/logging v1.6.1/go.mod h1:5ZO0mHHbvm8gEmeEUHrmDlTDSu5imF6MUP9OfilNXBw=
cloud.google.com/go/longrunning v0.3.0 h1:NjljC+FYPV3uh5/OwWT6pVU+doBqMg2x/rZlE+CamDs=
@@ -63,6 +68,7 @@
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
+cloud.google.com/go/storage v1.27.0 h1:YOO045NZI9RKfCj1c5A/ZtuuENUc8OAW+gHdGnDgyMQ=
cloud.google.com/go/trace v1.4.0 h1:qO9eLn2esajC9sxpqp1YKX37nXC3L4BfGnPS0Cx9dYo=
cloud.google.com/go/trace v1.4.0/go.mod h1:UG0v8UBqzusp+z63o7FK74SdFE+AXpCLdFb1rshXG+Y=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
@@ -198,9 +204,11 @@
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
+github.com/google/martian/v3 v3.2.1 h1:d8MncMlErDFTwQGBK1xhv026j9kqhvw1Qv9IbWT1VLQ=
github.com/google/martian/v3 v3.2.1/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
@@ -219,6 +227,8 @@
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
+github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/enterprise-certificate-proxy v0.2.0 h1:y8Yozv7SZtlU//QXbezB6QkpuE6jMD2/gfzk4AftXjs=
github.com/googleapis/enterprise-certificate-proxy v0.2.0/go.mod h1:8C0jb7/mgJe/9KK8Lm7X9ctZC2t60YyIpYEI16jx0Qg=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
@@ -352,6 +362,8 @@
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
+golang.org/x/exp v0.0.0-20230131160201-f062dba9d201 h1:BEABXpNXLEz0WxtA+6CQIz2xkg80e+1zrhWyMcq8VzE=
+golang.org/x/exp v0.0.0-20230131160201-f062dba9d201/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/exp/event v0.0.0-20220218215828-6cf2b201936e h1:K2AuHMC+jaRTzAcivRwKOzjTZ1925Yx4xHMg07YoBQc=
golang.org/x/exp/event v0.0.0-20220218215828-6cf2b201936e/go.mod h1:AVlZHjhWbW/3yOcmKMtJiObwBPJajBlUpQXRijFNrNc=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
@@ -602,6 +614,8 @@
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
+golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE=
google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=
google.golang.org/api v0.8.0/go.mod h1:o4eAsZoiT+ibD93RtjEohWalFOjRDx6CVaqeizhEnKg=
diff --git a/internal/bigquery/bigquery.go b/internal/bigquery/bigquery.go
new file mode 100644
index 0000000..b87fdea
--- /dev/null
+++ b/internal/bigquery/bigquery.go
@@ -0,0 +1,336 @@
+// Copyright 2022 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package bigquery provides a client for reading and writing to BigQuery.
+package bigquery
+
+import (
+ "context"
+ "crypto/sha256"
+ "encoding/hex"
+ "errors"
+ "fmt"
+ "net/http"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+
+ bq "cloud.google.com/go/bigquery"
+ "cloud.google.com/go/civil"
+ "golang.org/x/exp/maps"
+ "golang.org/x/pkgsite-metrics/internal/derrors"
+ "google.golang.org/api/googleapi"
+ "google.golang.org/api/iterator"
+)
+
+// Client is a client for connecting to BigQuery.
+// It pairs an underlying BigQuery client with a handle to a single dataset;
+// table IDs passed to its methods are resolved within that dataset.
+type Client struct {
+ client *bq.Client // underlying BigQuery client; used to run queries
+ dataset *bq.Dataset // handle to the dataset that holds this client's tables
+}
+
+// NewClient creates a new client for connecting to BigQuery, referring to a single dataset.
+// The dataset must already exist: NewClient fetches the dataset's metadata
+// and returns an error if that call fails.
+//
+// If the dataset check fails, the underlying BigQuery client is closed before
+// returning so that its resources are not leaked.
+func NewClient(ctx context.Context, projectID, datasetID string) (_ *Client, err error) {
+	defer derrors.Wrap(&err, "NewClient(ctx, %q, %q)", projectID, datasetID)
+	client, err := bq.NewClient(ctx, projectID)
+	if err != nil {
+		return nil, err
+	}
+	dataset := client.DatasetInProject(projectID, datasetID)
+	if _, err := dataset.Metadata(ctx); err != nil {
+		// The caller never receives the client on this path, so release it
+		// here. The Close error is deliberately dropped: the metadata error
+		// is the one the caller needs to see.
+		_ = client.Close()
+		return nil, err
+	}
+	return &Client{
+		client:  client,
+		dataset: dataset,
+	}, nil
+}
+
+// NewClientCreate behaves like NewClient, except that it first creates the
+// dataset when it does not already exist.
+func NewClientCreate(ctx context.Context, projectID, datasetID string) (_ *Client, err error) {
+	err = CreateDataset(ctx, projectID, datasetID)
+	if err != nil {
+		return nil, err
+	}
+	return NewClient(ctx, projectID, datasetID)
+}
+
+// CreateDataset creates a BigQuery dataset if it does not already exist.
+// An "already exists" response from BigQuery is treated as success.
+//
+// The BigQuery client opened for the call is closed before CreateDataset
+// returns.
+func CreateDataset(ctx context.Context, projectID, datasetID string) (err error) {
+	defer derrors.Wrap(&err, "CreateDataset(%q, %q)", projectID, datasetID)
+	client, err := bq.NewClient(ctx, projectID)
+	if err != nil {
+		return err
+	}
+	// The client is only needed for this one call; release it on every
+	// return path instead of leaking it.
+	defer client.Close()
+
+	dataset := client.DatasetInProject(projectID, datasetID)
+	err = dataset.Create(ctx, &bq.DatasetMetadata{Name: datasetID})
+	if err != nil && !isAlreadyExistsError(err) {
+		return err
+	}
+	return nil
+}
+
+// isNotFoundError reports whether the error, which should come from a cloud.google.com/go/bigquery
+// client call, is a NotFound error. It does so by checking for a googleapi
+// error whose code is HTTP 404.
+func isNotFoundError(err error) bool {
+ return hasCode(err, http.StatusNotFound)
+}
+
+// isAlreadyExistsError reports whether err indicates that the resource being
+// created already exists, by checking for a googleapi error whose code is
+// HTTP 409.
+func isAlreadyExistsError(err error) bool {
+ // The BigQuery API uses 409 for something that exists.
+ return hasCode(err, http.StatusConflict)
+}
+
+// hasCode reports whether err is, or wraps, a *googleapi.Error whose Code
+// field equals code. It returns false for any other kind of error.
+func hasCode(err error, code int) bool {
+	var apiErr *googleapi.Error
+	return errors.As(err, &apiErr) && apiErr.Code == code
+}
+
+// Table returns a handle for the given tableID in the client's dataset.
+// It only builds the handle from the dataset; it takes no context and
+// returns no error.
+func (c *Client) Table(tableID string) *bq.Table {
+ return c.dataset.Table(tableID)
+}
+
+// FullTableName returns the fully-qualified name of the table, in the form
+// "project.dataset.table", suitable for use in queries.
+func (c *Client) FullTableName(tableID string) string {
+	// Same layout as in
+	// https://github.com/googleapis/google-cloud-go/blob/bigquery/v1.43.0/bigquery/table.go#L544.
+	ds := c.dataset
+	return ds.ProjectID + "." + ds.DatasetID + "." + tableID
+}
+
+// CreateTable creates a table with the given name if it doesn't exist.
+// The table's schema is looked up in the package's schema registry; it is an
+// error if no schema has been registered for tableID. An "already exists"
+// response from BigQuery is treated as success.
+func (c *Client) CreateTable(ctx context.Context, tableID string) (err error) {
+	defer derrors.Wrap(&err, "CreateTable(%q)", tableID)
+	schema := tableSchema(tableID)
+	if schema == nil {
+		return fmt.Errorf("no schema registered for table %q", tableID)
+	}
+	meta := &bq.TableMetadata{Schema: schema}
+	if createErr := c.Table(tableID).Create(ctx, meta); createErr != nil && !isAlreadyExistsError(createErr) {
+		return createErr
+	}
+	return nil
+}
+
+// CreateOrUpdateTable creates a table if it does not exist, or updates it if it does.
+// It returns true if it created the table. created is true only when err is
+// nil; a failed creation reports (false, err).
+//
+// When the table already exists, its schema is replaced with the schema
+// registered for tableID; it is an error if no schema is registered.
+func (c *Client) CreateOrUpdateTable(ctx context.Context, tableID string) (created bool, err error) {
+	defer derrors.Wrap(&err, "CreateOrUpdateTable(%q)", tableID)
+	table := c.Table(tableID)
+	meta, err := table.Metadata(ctx) // check if the table already exists
+	if err != nil {
+		if !isNotFoundError(err) {
+			return false, err
+		}
+		// The table is missing: create it. Only claim to have created it if
+		// the creation actually succeeded.
+		if err := c.CreateTable(ctx, tableID); err != nil {
+			return false, err
+		}
+		return true, nil
+	}
+	schema := tableSchema(tableID)
+	if schema == nil {
+		return false, fmt.Errorf("no schema registered for table %q", tableID)
+	}
+	// Pass the ETag from the metadata just read along with the update.
+	// NOTE(review): presumably this makes the update fail if the table changed
+	// in between; confirm against the bigquery Table.Update documentation.
+	_, err = table.Update(ctx, bq.TableMetadataToUpdate{Schema: schema}, meta.ETag)
+	return false, err
+}
+
+// Upload inserts a row into the table.
+func (c *Client) Upload(ctx context.Context, tableID string, row any) (err error) {
+ defer derrors.Wrap(&err, "Upload(ctx, %q)", tableID)
+ u := c.Table(tableID).Inserter()
+ if s, ok := row.(interface{ SetUploadTime(time.Time) }); ok {
+ s.SetUploadTime(time.Now())
+ }
+ return u.Put(ctx, row)
+}
+
+// UploadMany inserts multiple rows into the table.
+// Each row should be a struct pointer. Rows that have a
+// SetUploadTime(time.Time) method are all stamped with the same time before
+// anything is sent.
+//
+// The chunkSize parameter limits the number of rows sent in a single request; this may
+// be necessary to avoid reaching the maximum size of a request.
+// If chunkSize is <= 0, all rows will be sent in one request.
+//
+// When a chunk is rejected with HTTP 413 (request entity too large) and holds
+// more than one row, it is halved and retried. Any other error, or a 413 on a
+// single-row chunk, is returned immediately.
+func UploadMany[T any](ctx context.Context, client *Client, tableID string, rows []T, chunkSize int) (err error) {
+	defer derrors.Wrap(&err, "UploadMany(%q), %d rows, chunkSize=%d", tableID, len(rows), chunkSize)
+
+	// Use one timestamp for the whole batch.
+	uploadTime := time.Now()
+	for i := range rows {
+		if setter, ok := any(rows[i]).(interface{ SetUploadTime(time.Time) }); ok {
+			setter.SetUploadTime(uploadTime)
+		}
+	}
+
+	inserter := client.Table(tableID).Inserter()
+	if chunkSize <= 0 {
+		return inserter.Put(ctx, rows)
+	}
+	for lo := 0; lo < len(rows); {
+		hi := lo + chunkSize
+		if hi > len(rows) {
+			hi = len(rows)
+		}
+		// Keep trying rows[lo:hi], shrinking the window while the server
+		// says the request is too large.
+		for {
+			putErr := inserter.Put(ctx, rows[lo:hi])
+			if putErr == nil {
+				break
+			}
+			if !hasCode(putErr, http.StatusRequestEntityTooLarge) || hi-lo <= 1 {
+				return putErr
+			}
+			// Request too large; reduce this chunk size by half.
+			hi = lo + (hi-lo)/2
+		}
+		lo = hi
+	}
+	return nil
+}
+
+// ForEachRow calls f for each row in the given iterator.
+// It returns as soon as f returns false.
+func ForEachRow[T any](iter *bq.RowIterator, f func(*T) bool) error {
+ for {
+ var row T
+ err := iter.Next(&row)
+ if err == iterator.Done {
+ break
+ }
+ if err != nil {
+ return err
+ }
+ if !f(&row) {
+ break
+ }
+ }
+ return nil
+}
+
+// All collects every row returned by iter into a slice of pointers.
+// If iteration fails, it returns a nil slice and the error.
+func All[T any](iter *bq.RowIterator) ([]*T, error) {
+	var rows []*T
+	collect := func(row *T) bool {
+		rows = append(rows, row)
+		return true // never stop early
+	}
+	if err := ForEachRow(iter, collect); err != nil {
+		return nil, err
+	}
+	return rows, nil
+}
+
+// Query runs the query text q using the underlying BigQuery client and
+// returns an iterator over the resulting rows.
+func (c *Client) Query(ctx context.Context, q string) (*bq.RowIterator, error) {
+	query := c.client.Query(q)
+	return query.Read(ctx)
+}
+
+// NullString constructs a bq.NullString holding s, with Valid set to true.
+func NullString(s string) bq.NullString {
+ return bq.NullString{StringVal: s, Valid: true}
+}
+
+// NullInt constructs a bq.NullInt64 holding i converted to int64, with Valid
+// set to true.
+func NullInt(i int) bq.NullInt64 {
+ return bq.NullInt64{Int64: int64(i), Valid: true}
+}
+
+// NullTime constructs a bq.NullTime from t, with Valid set to true.
+// The stored value is civil.TimeOf(t).
+// NOTE(review): civil.TimeOf presumably keeps only the time of day and drops
+// the date and location; confirm against the civil package documentation.
+func NullTime(t time.Time) bq.NullTime {
+ return bq.NullTime{Time: civil.TimeOf(t), Valid: true}
+}
+
+// schemaVersion derives a fixed-length identifier for a schema: the
+// hex-encoded SHA-256 digest of schemaString(schema). Different schemas
+// therefore yield different strings with high probability.
+func schemaVersion(schema bq.Schema) string {
+	hasher := sha256.New()
+	hasher.Write([]byte(schemaString(schema)))
+	return hex.EncodeToString(hasher.Sum(nil))
+}
+
+// schemaString returns a long, human-readable string summarizing schema.
+func schemaString(schema bq.Schema) string {
+ var b strings.Builder
+ for i, field := range schema {
+ if i > 0 {
+ b.WriteRune(';')
+ }
+ b.WriteString(field.Name)
+ if field.Repeated {
+ b.WriteString(",rep")
+ }
+ if field.Required {
+ b.WriteString(",req")
+ }
+ b.WriteByte(':')
+ if field.Type == bq.RecordFieldType {
+ fmt.Fprintf(&b, "(%s)", schemaString(field.Schema))
+ } else {
+ b.WriteString(string(field.Type))
+ }
+ }
+ return b.String()
+}
+
+// tables is the package-level registry mapping a table ID to the schema used
+// when creating or updating that table. tableMu guards all access to it.
+var (
+ tableMu sync.Mutex
+ tables = map[string]bq.Schema{}
+)
+
+// addTable registers s as the schema for tableID, replacing any schema
+// previously registered under the same ID.
+func addTable(tableID string, s bq.Schema) {
+ tableMu.Lock()
+ defer tableMu.Unlock()
+ tables[tableID] = s
+}
+
+// tableSchema returns the schema associated with the given table,
+// or nil if there is none.
+func tableSchema(tableID string) bq.Schema {
+ tableMu.Lock()
+ defer tableMu.Unlock()
+ // A missing key yields the zero value of bq.Schema, which is nil.
+ return tables[tableID]
+}
+
+// Tables returns the IDs of all tables that have a registered schema, sorted
+// in increasing order.
+func Tables() []string {
+	tableMu.Lock()
+	ids := maps.Keys(tables)
+	tableMu.Unlock()
+
+	// ids is a private copy of the keys, so it can be sorted without the lock.
+	sort.Strings(ids)
+	return ids
+}
+
+// partitionQuery returns a query that returns one row for each distinct value
+// of partitionColumn in tableName.
+// The selected row will be the first one according to the orderings, which
+// should be comma-separated ORDER BY clauses.
+//
+// For example, say the students table holds student names and classes.
+// Then
+//
+// partitionQuery("students", "class", "name ASC")
+//
+// will construct a query returning the student in each class whose name is
+// alphabetically first.
+//
+// tableName is wrapped in backquotes. All three arguments are inserted into
+// the query text verbatim, with no escaping, so they must come from trusted
+// sources.
+//
+// (BigQuery SQL has no DISTINCT ON feature and doesn't allow columns of type RECORD
+// in queries with DISTINCT, so we have to take this approach.)
+func partitionQuery(tableName, partitionColumn, orderings string) string {
+ // This query first organizes the table rows into windows that have the same partitionColumn.
+ // The rows in each window are sorted by the given orderings.
+ // They are then assigned numbers, where 1 is the first row in the window.
+ // Finally, only the first row in each window is chosen.
+ // (ROW_NUMBER guarantees that each row has a distinct number; RANK assigns the
+ // same number to identical rows, which means that the result may still contain
+ // duplicates.)
+ const qf = `
+ SELECT * EXCEPT (rownum)
+ FROM (
+ SELECT *, ROW_NUMBER() OVER (
+ PARTITION BY %s
+ ORDER BY %s
+ ) AS rownum
+ FROM %s
+ ) WHERE rownum = 1
+ `
+
+ // The format verbs are filled in order: PARTITION BY column, ORDER BY
+ // clauses, then the backquoted table name.
+ return fmt.Sprintf(qf, partitionColumn, orderings, "`"+tableName+"`")
+}