blob: 7f04c37023bc419029800dbb7a551be52b68fdc4 [file] [log] [blame]
// Copyright 2022 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package bigquery provides a client for reading and writing to BigQuery.
package bigquery
import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"net/http"
"sort"
"strings"
"sync"
"time"
bq "cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
"golang.org/x/pkgsite-metrics/internal/derrors"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
)
// Client is a client for connecting to BigQuery.
// It is bound to a single dataset within a single project.
type Client struct {
	client  *bq.Client  // underlying BigQuery client
	dataset *bq.Dataset // the dataset all table operations refer to
	// deleteDatasetOnClose makes Close delete the dataset and its contents;
	// set only by NewClientForTesting.
	deleteDatasetOnClose bool
}
// NewClientCreate creates a new client for connecting to BigQuery, referring
// to a single dataset. It creates the dataset if it doesn't exist.
func NewClientCreate(ctx context.Context, projectID, datasetID string) (_ *Client, err error) {
	// Ensure the dataset exists before constructing the client, which
	// verifies that the dataset is accessible.
	if err = CreateDataset(ctx, projectID, datasetID); err != nil {
		return nil, err
	}
	return newClient(ctx, projectID, datasetID)
}
// newClient constructs a Client for an existing, accessible dataset.
func newClient(ctx context.Context, projectID, datasetID string) (_ *Client, err error) {
	defer derrors.Wrap(&err, "New(ctx, %q, %q)", projectID, datasetID)
	bqc, err := bq.NewClient(ctx, projectID)
	if err != nil {
		return nil, err
	}
	ds := bqc.DatasetInProject(projectID, datasetID)
	// Verify up front that the dataset exists and is accessible.
	if _, err := ds.Metadata(ctx); err != nil {
		return nil, err
	}
	return &Client{client: bqc, dataset: ds}, nil
}
// Close closes the client, first deleting the dataset with its contents if
// the client was created by NewClientForTesting. Both errors, if any, are
// joined into the result.
func (c *Client) Close() error {
	var delErr error
	if c.deleteDatasetOnClose {
		delErr = c.dataset.DeleteWithContents(context.Background())
	}
	return errors.Join(delErr, c.client.Close())
}
// NewClientForTesting creates a client with a new, unique dataset. Closing the client deletes the dataset.
func NewClientForTesting(ctx context.Context, projectID, testID string) (*Client, error) {
	// Use a 24-hour timestamp ("15", not the 12-hour "03", which has no
	// AM/PM marker) so dataset IDs created in the morning and evening of
	// the same day cannot collide.
	dsID := fmt.Sprintf("test_%s_%s", testID, time.Now().Format("20060102T150405"))
	c, err := NewClientCreate(ctx, projectID, dsID)
	if err != nil {
		return nil, err
	}
	// Mark the dataset for deletion when the client is closed.
	c.deleteDatasetOnClose = true
	return c, nil
}
// CreateDataset creates a BigQuery dataset if it does not already exist.
func CreateDataset(ctx context.Context, projectID, datasetID string) (err error) {
	defer derrors.Wrap(&err, "CreateDataset(%q, %q)", projectID, datasetID)
	client, err := bq.NewClient(ctx, projectID)
	if err != nil {
		return err
	}
	// The client is only needed for the duration of this call; close it so
	// its underlying connections are released (the original leaked it).
	// This defer runs before the derrors.Wrap defer, so a Close error is
	// joined in and then wrapped.
	defer func() { err = errors.Join(err, client.Close()) }()
	dataset := client.DatasetInProject(projectID, datasetID)
	// If the dataset already exists, do not try to create it. This avoids
	// generating confusing error messages in logs.
	if _, err := dataset.Metadata(ctx); err == nil {
		return nil
	} else if !isNotFoundError(err) {
		return err
	}
	// Tolerate already-exists here as a sanity check and in case another
	// process created the dataset between the Metadata and Create calls.
	if err := dataset.Create(ctx, &bq.DatasetMetadata{Name: datasetID}); err != nil && !isAlreadyExistsError(err) {
		return err
	}
	return nil
}
// isNotFoundError reports whether the error, which should come from a cloud.google.com/go/bigquery
// client call, is a NotFound (HTTP 404) error.
func isNotFoundError(err error) bool {
	return hasCode(err, http.StatusNotFound)
}
// isAlreadyExistsError reports whether the error indicates that the resource
// being created already exists.
func isAlreadyExistsError(err error) bool {
	// The BigQuery API uses 409 Conflict for something that exists.
	return hasCode(err, http.StatusConflict)
}
// isRaceChangeError reports whether the error indicates that a resource was
// modified concurrently by someone else.
func isRaceChangeError(err error) bool {
	// Changing the table in between update calls will make the
	// Etag invalid and result in a PreconditionFailed (HTTP 412) error.
	return hasCode(err, http.StatusPreconditionFailed)
}
// hasCode reports whether err wraps a *googleapi.Error whose HTTP status
// code equals code. It returns false for nil and non-googleapi errors.
func hasCode(err error, code int) bool {
	var gerr *googleapi.Error
	return errors.As(err, &gerr) && gerr.Code == code
}
// Dataset returns the underlying client dataset.
func (c *Client) Dataset() *bq.Dataset {
	return c.dataset
}
// Table returns a handle for the given tableID in the client's dataset.
// It does not check whether the table exists.
func (c *Client) Table(tableID string) *bq.Table {
	return c.dataset.Table(tableID)
}
// FullTableName returns the fully-qualified "project.dataset.table" name of
// the table, suitable for use in queries.
func (c *Client) FullTableName(tableID string) string {
	// Format from https://github.com/googleapis/google-cloud-go/blob/bigquery/v1.43.0/bigquery/table.go#L544.
	return strings.Join([]string{c.dataset.ProjectID, c.dataset.DatasetID, tableID}, ".")
}
// CreateOrUpdateTable creates a table if it does not exist, or updates it if it does.
// It returns true if it created the table.
// The table's schema must previously have been registered with AddTable.
func (c *Client) CreateOrUpdateTable(ctx context.Context, tableID string) (created bool, err error) {
	defer derrors.Wrap(&err, "CreateOrUpdateTable(%q)", tableID)
	schema := TableSchema(tableID)
	if schema == nil {
		return false, fmt.Errorf("no schema registered for table %q", tableID)
	}
	meta, err := c.Table(tableID).Metadata(ctx) // check if the table already exists
	if err != nil {
		if !isNotFoundError(err) {
			return false, err
		}
		// The table doesn't exist; create it with the registered schema.
		return true, c.Table(tableID).Create(ctx, &bq.TableMetadata{Schema: schema})
	}
	if SchemaVersion(schema) == SchemaVersion(meta.Schema) {
		// The schemas are the same, so we don't need to do anything. In fact, any
		// update, even an idempotent one, will result in table patching that counts
		// towards quota limits for table metadata updates.
		return false, nil
	}
	_, err = c.Table(tableID).Update(ctx, bq.TableMetadataToUpdate{Schema: schema}, meta.ETag)
	// There is a race condition if multiple threads of control call this function concurrently:
	// The table may have changed since Metadata was called above. This error is harmless: it
	// just means that someone else updated the table before us. Ignore it.
	if isAlreadyExistsError(err) || isRaceChangeError(err) {
		// check already-exists error for sanity and in case of races
		return false, nil
	}
	return false, err
}
// A Row is something that can be uploaded to BigQuery.
// SetUploadTime is called by Upload and UploadMany to stamp the row with the
// time at which it was uploaded.
type Row interface {
	SetUploadTime(time.Time)
}
// Upload stamps row with the current time and inserts it into the table.
func (c *Client) Upload(ctx context.Context, tableID string, row Row) (err error) {
	defer derrors.Wrap(&err, "Upload(ctx, %q)", tableID)
	row.SetUploadTime(time.Now())
	return c.Table(tableID).Inserter().Put(ctx, row)
}
// UploadMany inserts multiple rows into the table.
// Each row should be a struct pointer.
// The chunkSize parameter limits the number of rows sent in a single request; this may
// be necessary to avoid reaching the maximum size of a request.
// If chunkSize is <= 0, all rows will be sent in one request.
func UploadMany[T Row](ctx context.Context, client *Client, tableID string, rows []T, chunkSize int) (err error) {
	defer derrors.Wrap(&err, "UploadMany(%q), %d rows, chunkSize=%d", tableID, len(rows), chunkSize)
	now := time.Now()
	// Set upload time.
	for _, r := range rows {
		r.SetUploadTime(now)
	}
	ins := client.Table(tableID).Inserter()
	if chunkSize <= 0 {
		return ins.Put(ctx, rows)
	}
	start := 0
	for start < len(rows) {
		// Tentatively try rows[start:end]; end may be reduced below if the
		// request turns out to be too large.
		end := start + chunkSize
		if end > len(rows) {
			end = len(rows)
		}
		for {
			if err := ins.Put(ctx, rows[start:end]); err == nil {
				break
			} else if hasCode(err, http.StatusRequestEntityTooLarge) && end-start > 1 {
				// Request too large; reduce this chunk size by half.
				// The end-start > 1 guard ensures progress: a single row
				// that is itself too large returns the error below.
				end = start + (end-start)/2
				continue
			} else {
				return err
			}
		}
		// Advance past whatever chunk was successfully sent.
		start = end
	}
	return nil
}
// ForEachRow calls f for each row in the given iterator.
// It returns as soon as f returns false, or when the iterator is exhausted.
// A non-nil error from the iterator is returned immediately.
func ForEachRow[T any](iter *bq.RowIterator, f func(*T) bool) error {
	for {
		var row T
		switch err := iter.Next(&row); {
		case err == iterator.Done:
			return nil
		case err != nil:
			return err
		}
		if !f(&row) {
			return nil
		}
	}
}
// All returns all rows returned by iter.
func All[T any](iter *bq.RowIterator) ([]*T, error) {
	var rows []*T
	// Collect every row; the callback always returns true so iteration
	// runs to completion.
	collect := func(r *T) bool {
		rows = append(rows, r)
		return true
	}
	if err := ForEachRow(iter, collect); err != nil {
		return nil, err
	}
	return rows, nil
}
// Query runs the given SQL query string and returns an iterator over the
// resulting rows.
func (c *Client) Query(ctx context.Context, q string) (*bq.RowIterator, error) {
	query := c.client.Query(q)
	return query.Read(ctx)
}
// NullFloat constructs a valid (non-NULL) bq.NullFloat64 holding f.
func NullFloat(f float64) bq.NullFloat64 {
	return bq.NullFloat64{Float64: f, Valid: true}
}
// NullString constructs a valid (non-NULL) bq.NullString holding s.
func NullString(s string) bq.NullString {
	return bq.NullString{StringVal: s, Valid: true}
}
// NullInt constructs a valid (non-NULL) bq.NullInt64 from an int.
func NullInt(i int) bq.NullInt64 {
	return bq.NullInt64{Int64: int64(i), Valid: true}
}
// NullTime constructs a valid (non-NULL) bq.NullTime from t.
// Note that civil.TimeOf keeps only the time of day; the date portion of t
// is discarded.
func NullTime(t time.Time) bq.NullTime {
	return bq.NullTime{Time: civil.TimeOf(t), Valid: true}
}
// SchemaVersion computes a relatively short string from a schema, such that
// different schemas result in different strings with high probability.
// It is the hex-encoded SHA-256 of SchemaString(schema).
func SchemaVersion(schema bq.Schema) string {
	digest := sha256.Sum256([]byte(SchemaString(schema)))
	return hex.EncodeToString(digest[:])
}
// SchemaString returns a long, human-readable string summarizing schema.
// Fields are rendered as "name[,rep][,req]:TYPE", separated by semicolons,
// with nested record schemas parenthesized.
func SchemaString(schema bq.Schema) string {
	// Order of columns does not matter in relational algebra, so sort a
	// copy of the fields by column name to get a canonical form.
	fields := make([]*bq.FieldSchema, len(schema))
	copy(fields, schema)
	sort.SliceStable(fields, func(i, j int) bool {
		return fields[i].Name < fields[j].Name // fields cannot have the same name
	})
	var sb strings.Builder
	for i, f := range fields {
		if i > 0 {
			sb.WriteByte(';')
		}
		sb.WriteString(f.Name)
		if f.Repeated {
			sb.WriteString(",rep")
		}
		if f.Required {
			sb.WriteString(",req")
		}
		sb.WriteByte(':')
		if f.Type == bq.RecordFieldType {
			// Recurse into nested record schemas.
			fmt.Fprintf(&sb, "(%s)", SchemaString(f.Schema))
		} else {
			sb.WriteString(string(f.Type))
		}
	}
	return sb.String()
}
// tables maps a table ID to its registered schema; guarded by tableMu
// because AddTable and TableSchema may be called concurrently.
var (
	tableMu sync.Mutex
	tables  = map[string]bq.Schema{}
)
// AddTable records the schema for a table, so table creation just needs the name.
// It is safe for concurrent use.
func AddTable(tableID string, s bq.Schema) {
	tableMu.Lock()
	defer tableMu.Unlock()
	tables[tableID] = s
}
// TableSchema returns the schema associated with the given table,
// or nil if there is none.
// It is safe for concurrent use.
func TableSchema(tableID string) bq.Schema {
	tableMu.Lock()
	defer tableMu.Unlock()
	return tables[tableID]
}
// PartitionQuery describes a query that returns one row for each distinct value
// of the partition columns in the given table.
//
// The selected row will be the first one according to the OrderBy clauses.
//
// For example, say the students table holds student names and classes.
// Then
//
//	PartitionQuery{
//		Table: "students",
//		PartitionOn: "class",
//		OrderBy: "name ASC",
//	}.String()
//
// will construct a query returning the student in each class whose name is
// alphabetically first.
//
// (BigQuery SQL has no DISTINCT ON feature and doesn't allow columns of type RECORD
// in queries with DISTINCT, so we have to take this approach.)
type PartitionQuery struct {
	From        string // should use full table name
	Columns     string // comma-separated columns to select, or "*" ("" => "*")
	PartitionOn string // comma-separated columns defining the partition
	Where       string // WHERE clause
	OrderBy     string // text after ORDER BY: comma-separated columns, each
	// optionally followed by DESC or ASC
}

// String renders the query as BigQuery SQL.
//
// The query groups the table rows into windows sharing the same partition
// columns, sorts each window by the given orderings, numbers the rows
// (1 = first in its window), and keeps only row number 1 from each window.
// (ROW_NUMBER guarantees that each row has a distinct number; RANK assigns the
// same number to identical rows, which means that the result may still contain
// duplicates.)
func (q PartitionQuery) String() string {
	const qf = `
SELECT * EXCEPT (rownum)
FROM (
SELECT %s, ROW_NUMBER() OVER (
PARTITION BY %s
ORDER BY %s
) AS rownum
FROM %s
%s
) WHERE rownum = 1
`
	// An empty column list means "select everything".
	cols := "*"
	if q.Columns != "" {
		cols = q.Columns
	}
	// Prefix the optional filter with WHERE only when one was given.
	var where string
	if q.Where != "" {
		where = "WHERE " + q.Where
	}
	return fmt.Sprintf(qf, cols, q.PartitionOn, q.OrderBy, q.From, where)
}
// InferSchema is a copy of cloud.google.com/go/bigquery.InferSchema so
// users don't have to import cloud.google.com/go/bigquery just to get it.
var InferSchema = bq.InferSchema