blob: b87fdea8807fb94d66d59027c256d0dd9721d102 [file] [log] [blame]
// Copyright 2022 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package bigquery provides a client for reading and writing to BigQuery.
package bigquery
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"net/http"
"sort"
"strings"
"sync"
"time"
bq "cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
"golang.org/x/exp/maps"
"golang.org/x/pkgsite-metrics/internal/derrors"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
)
// Client is a client for connecting to BigQuery.
// It pairs an underlying BigQuery API client with the single dataset
// that table operations are resolved against.
type Client struct {
// client is the underlying BigQuery API client; Query runs queries through it.
client *bq.Client
// dataset is the dataset used by Table and FullTableName to resolve table IDs.
dataset *bq.Dataset
}
// NewClient creates a new client for connecting to BigQuery, referring to a single dataset.
// The dataset must already exist: its metadata is fetched to verify that, and
// any error from that lookup is returned.
//
// If the lookup fails, the underlying BigQuery client is closed before
// returning, so a failed call does not leave a client open.
func NewClient(ctx context.Context, projectID, datasetID string) (_ *Client, err error) {
	defer derrors.Wrap(&err, "New(ctx, %q, %q)", projectID, datasetID)
	client, err := bq.NewClient(ctx, projectID)
	if err != nil {
		return nil, err
	}
	dataset := client.DatasetInProject(projectID, datasetID)
	if _, err := dataset.Metadata(ctx); err != nil {
		// The client is not handed to the caller on this path, so release it
		// here. The Metadata error is the one worth reporting; a Close error
		// would only obscure it.
		_ = client.Close()
		return nil, err
	}
	return &Client{
		client:  client,
		dataset: dataset,
	}, nil
}
// NewClientCreate behaves like NewClient, except that it first makes sure the
// dataset exists by calling CreateDataset. If that step fails, its error is
// returned and no client is constructed.
func NewClientCreate(ctx context.Context, projectID, datasetID string) (_ *Client, err error) {
	err = CreateDataset(ctx, projectID, datasetID)
	if err != nil {
		return nil, err
	}
	return NewClient(ctx, projectID, datasetID)
}
// CreateDataset creates a BigQuery dataset if it does not already exist.
// An "already exists" response from BigQuery is treated as success; every
// other error is returned.
//
// The function opens its own temporary BigQuery client for projectID and
// closes it before returning.
func CreateDataset(ctx context.Context, projectID, datasetID string) (err error) {
	defer derrors.Wrap(&err, "CreateDataset(%q, %q)", projectID, datasetID)
	client, err := bq.NewClient(ctx, projectID)
	if err != nil {
		return err
	}
	// The client is only needed for this one call; close it so that it is
	// not leaked. A Close error is deliberately ignored: it says nothing
	// about whether the dataset was created.
	defer func() { _ = client.Close() }()

	dataset := client.DatasetInProject(projectID, datasetID)
	err = dataset.Create(ctx, &bq.DatasetMetadata{Name: datasetID})
	if err != nil && !isAlreadyExistsError(err) {
		return err
	}
	return nil
}
// isNotFoundError reports whether the error, which should come from a cloud.google.com/go/bigquery
// client call, is a NotFound error.
// The check is delegated to hasCode with http.StatusNotFound (404).
func isNotFoundError(err error) bool {
return hasCode(err, http.StatusNotFound)
}
// isAlreadyExistsError reports whether err indicates that the resource being
// created already exists. The check is delegated to hasCode with
// http.StatusConflict (409).
func isAlreadyExistsError(err error) bool {
// The BigQuery API uses 409 for something that exists.
return hasCode(err, http.StatusConflict)
}
// hasCode reports whether err is, or wraps, a *googleapi.Error whose Code
// field equals code. It returns false for any error that does not contain a
// *googleapi.Error.
func hasCode(err error, code int) bool {
	var apiErr *googleapi.Error
	return errors.As(err, &apiErr) && apiErr.Code == code
}
// Table returns a handle for the given tableID in the client's dataset.
// The handle is obtained from the dataset the client was constructed with.
func (c *Client) Table(tableID string) *bq.Table {
return c.dataset.Table(tableID)
}
// FullTableName returns the fully-qualified name of the table, suitable for
// use in queries. The result has the form "project.dataset.table", built from
// the client's dataset and the given tableID.
func (c *Client) FullTableName(tableID string) string {
	// Same layout as
	// https://github.com/googleapis/google-cloud-go/blob/bigquery/v1.43.0/bigquery/table.go#L544.
	ds := c.dataset
	return ds.ProjectID + "." + ds.DatasetID + "." + tableID
}
// CreateTable creates a table with the given name if it doesn't exist.
// The table's schema is the one registered for tableID; it is an error if no
// schema has been registered. An "already exists" response from BigQuery is
// treated as success.
func (c *Client) CreateTable(ctx context.Context, tableID string) (err error) {
	defer derrors.Wrap(&err, "CreateTable(%q)", tableID)

	registered := tableSchema(tableID)
	if registered == nil {
		return fmt.Errorf("no schema registered for table %q", tableID)
	}

	createErr := c.Table(tableID).Create(ctx, &bq.TableMetadata{Schema: registered})
	if createErr == nil || isAlreadyExistsError(createErr) {
		return nil
	}
	return createErr
}
// CreateOrUpdateTable creates a table if it does not exist, or updates it if it does.
// It returns true only if it created the table; whenever err is non-nil,
// created is false.
//
// Existence is determined by fetching the table's metadata. A NotFound error
// leads to CreateTable; any other metadata error is returned. For an existing
// table, the schema registered for tableID is sent in an Update call together
// with the ETag from the fetched metadata. It is an error if no schema is
// registered for tableID.
func (c *Client) CreateOrUpdateTable(ctx context.Context, tableID string) (created bool, err error) {
	defer derrors.Wrap(&err, "CreateOrUpdateTable(%q)", tableID)
	meta, err := c.Table(tableID).Metadata(ctx) // check if the table already exists
	if err != nil {
		if !isNotFoundError(err) {
			return false, err
		}
		if err := c.CreateTable(ctx, tableID); err != nil {
			// Do not claim the table was created when creation failed.
			return false, err
		}
		return true, nil
	}
	schema := tableSchema(tableID)
	if schema == nil {
		return false, fmt.Errorf("no schema registered for table %q", tableID)
	}
	_, err = c.Table(tableID).Update(ctx, bq.TableMetadataToUpdate{Schema: schema}, meta.ETag)
	return false, err
}
// Upload inserts a single row into the table named by tableID.
// If row has a SetUploadTime(time.Time) method, it is called with the current
// time before the row is sent.
func (c *Client) Upload(ctx context.Context, tableID string, row any) (err error) {
	defer derrors.Wrap(&err, "Upload(ctx, %q)", tableID)

	inserter := c.Table(tableID).Inserter()
	stamper, canStamp := row.(interface{ SetUploadTime(time.Time) })
	if canStamp {
		stamper.SetUploadTime(time.Now())
	}
	return inserter.Put(ctx, row)
}
// UploadMany inserts multiple rows into the table.
// Each row should be a struct pointer.
// Every row that has a SetUploadTime(time.Time) method is stamped with the
// same time, taken once at the start of the call.
//
// The chunkSize parameter limits the number of rows sent in a single request; this may
// be necessary to avoid reaching the maximum size of a request.
// If chunkSize is <= 0, all rows will be sent in one request.
//
// When a chunk is rejected with HTTP 413 (request entity too large) and holds
// more than one row, it is halved and retried. Any other error, or a 413 for a
// single-row chunk, stops the upload and is returned.
func UploadMany[T any](ctx context.Context, client *Client, tableID string, rows []T, chunkSize int) (err error) {
	defer derrors.Wrap(&err, "UploadMany(%q), %d rows, chunkSize=%d", tableID, len(rows), chunkSize)

	// Stamp all rows with one shared upload time.
	stamp := time.Now()
	for i := range rows {
		if stamper, ok := any(rows[i]).(interface{ SetUploadTime(time.Time) }); ok {
			stamper.SetUploadTime(stamp)
		}
	}

	inserter := client.Table(tableID).Inserter()
	if chunkSize <= 0 {
		return inserter.Put(ctx, rows)
	}

	total := len(rows)
	for lo := 0; lo < total; {
		hi := lo + chunkSize
		if hi > total {
			hi = total
		}
		putErr := inserter.Put(ctx, rows[lo:hi])
		for putErr != nil {
			if !hasCode(putErr, http.StatusRequestEntityTooLarge) || hi-lo <= 1 {
				return putErr
			}
			// Request too large; retry with half as many rows.
			hi = lo + (hi-lo)/2
			putErr = inserter.Put(ctx, rows[lo:hi])
		}
		// Continue after the rows that were actually sent.
		lo = hi
	}
	return nil
}
// ForEachRow reads rows from iter one at a time, decoding each into a fresh T
// and passing a pointer to it to f.
// Iteration ends with a nil error when the iterator is exhausted or as soon as
// f returns false. An error from the iterator ends iteration and is returned.
func ForEachRow[T any](iter *bq.RowIterator, f func(*T) bool) error {
	for {
		var row T
		err := iter.Next(&row)
		switch {
		case err == iterator.Done:
			return nil
		case err != nil:
			return err
		case !f(&row):
			return nil
		}
	}
}
// All drains iter and returns a pointer to every row it produced, in order.
// If the iterator reports an error, All returns nil and that error.
func All[T any](iter *bq.RowIterator) ([]*T, error) {
	var rows []*T
	collect := func(row *T) bool {
		rows = append(rows, row)
		return true // never stop early
	}
	if err := ForEachRow(iter, collect); err != nil {
		return nil, err
	}
	return rows, nil
}
// Query runs the query string q using the underlying BigQuery client and
// returns an iterator over the result rows.
func (c *Client) Query(ctx context.Context, q string) (*bq.RowIterator, error) {
	query := c.client.Query(q)
	return query.Read(ctx)
}
// NullString returns a valid (non-NULL) bq.NullString holding s.
func NullString(s string) bq.NullString {
	var ns bq.NullString
	ns.StringVal = s
	ns.Valid = true
	return ns
}
// NullInt returns a valid (non-NULL) bq.NullInt64 holding i converted to int64.
func NullInt(i int) bq.NullInt64 {
	var ni bq.NullInt64
	ni.Int64 = int64(i)
	ni.Valid = true
	return ni
}
// NullTime returns a valid (non-NULL) bq.NullTime holding the civil time of t,
// as computed by civil.TimeOf.
func NullTime(t time.Time) bq.NullTime {
	var nt bq.NullTime
	nt.Time = civil.TimeOf(t)
	nt.Valid = true
	return nt
}
// schemaVersion computes a relatively short string from a schema, such that
// different schemas result in different strings with high probability.
// The string is the hex-encoded SHA-256 digest of schemaString(schema).
func schemaVersion(schema bq.Schema) string {
	hasher := sha256.New()
	hasher.Write([]byte(schemaString(schema)))
	return hex.EncodeToString(hasher.Sum(nil))
}
// schemaString returns a long, human-readable string summarizing schema.
//
// Fields are separated by ';'. Each field is written as its name, followed by
// ",rep" if it is repeated and ",req" if it is required, then ':' and the
// field type. A record field's type is written as the parenthesized
// schemaString of its nested schema.
func schemaString(schema bq.Schema) string {
	var sb strings.Builder
	for idx, f := range schema {
		if idx != 0 {
			sb.WriteByte(';')
		}
		sb.WriteString(f.Name)
		if f.Repeated {
			sb.WriteString(",rep")
		}
		if f.Required {
			sb.WriteString(",req")
		}
		sb.WriteByte(':')
		if f.Type != bq.RecordFieldType {
			sb.WriteString(string(f.Type))
			continue
		}
		// Records are summarized recursively.
		sb.WriteByte('(')
		sb.WriteString(schemaString(f.Schema))
		sb.WriteByte(')')
	}
	return sb.String()
}
// Registry of table schemas, keyed by table ID.
var (
// tableMu guards tables.
tableMu sync.Mutex
// tables maps a table ID to the schema registered for it via addTable.
tables = map[string]bq.Schema{}
)
// addTable registers s as the schema for tableID, replacing any schema
// previously registered under the same ID. It holds tableMu while updating
// the registry.
func addTable(tableID string, s bq.Schema) {
tableMu.Lock()
defer tableMu.Unlock()
tables[tableID] = s
}
// tableSchema looks up the schema registered for tableID.
// It returns nil when no schema has been registered under that ID.
func tableSchema(tableID string) bq.Schema {
	tableMu.Lock()
	defer tableMu.Unlock()

	schema, found := tables[tableID]
	if !found {
		return nil
	}
	return schema
}
// Tables returns all the tables used by the worker: the IDs of every table
// with a registered schema, sorted in increasing order.
func Tables() []string {
	// Hold the lock only long enough to snapshot the keys; sorting works on
	// the private copy.
	tableMu.Lock()
	ids := maps.Keys(tables)
	tableMu.Unlock()

	sort.Strings(ids)
	return ids
}
// partitionQuery builds a query that yields exactly one row for each distinct
// value of partitionColumn in tableName.
// Within each group of rows sharing a partitionColumn value, the row chosen is
// the first according to orderings, which should be comma-separated ORDER BY
// clauses.
//
// For example, say the students table holds student names and classes.
// Then
//
//	partitionQuery("students", "class", "name ASC")
//
// will construct a query returning the student in each class whose name is
// alphabetically first.
//
// (BigQuery SQL has no DISTINCT ON feature and doesn't allow columns of type RECORD
// in queries with DISTINCT, so we have to take this approach.)
func partitionQuery(tableName, partitionColumn, orderings string) string {
	// How the query works:
	//   1. Rows are grouped into windows that share the same partitionColumn.
	//   2. Each window is sorted by the given orderings.
	//   3. Rows are numbered within their window, starting at 1.
	//   4. Only row number 1 of each window is kept.
	// ROW_NUMBER is used rather than RANK because it gives every row a
	// distinct number; RANK gives identical rows the same number, so the
	// result could still contain duplicates.
	const queryFormat = `
SELECT * EXCEPT (rownum)
FROM (
SELECT *, ROW_NUMBER() OVER (
PARTITION BY %s
ORDER BY %s
) AS rownum
FROM %s
) WHERE rownum = 1
`
	// The table name is wrapped in backticks before being substituted.
	quotedTable := "`" + tableName + "`"
	return fmt.Sprintf(queryFormat, partitionColumn, orderings, quotedTable)
}