blob: 84d5520f149cc64e0721a1f5d9a05a2985bf87f4 [file] [log] [blame]
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package firestore implements [storage.DB] and [storage.VectorDB]
// using Google Cloud's [Firestore] service.
//
// A brief introduction to Firestore: it is a document DB with hierarchical keys.
// A key is a string of the form "coll1/id1/coll2/id2/.../collN/idN",
// where the colls are called "collections" and the values are called "documents".
// Each document is a set of key-value pairs. In Go, a document is represented
// by a map[string]any, or a struct with exported fields:
// the Go Firestore client provides convenience functions for converting documents to and from structs.
//
// The two database implementations in this package use three collections:
// - The "locks" collection holds documents representing the locks used by [DB.Lock] and [DB.Unlock].
// - The "values" collection holds the key-value pairs for [DB]. Keys are byte slices but document
// names are strings. We hex-encode the keys to obtain strings with the same ordering.
// - The "vectors" collection holds embedding vectors for [VectorDB].
//
// [Firestore]: https://cloud.google.com/firestore
package firestore
import (
"context"
"errors"
"iter"
"log/slog"
"reflect"
"cloud.google.com/go/firestore"
"golang.org/x/oscar/internal/gcp/grpcerrors"
"golang.org/x/oscar/internal/storage"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)
// fstore implements operations common to both [storage.DB] and [storage.VectorDB].
// It bundles the Firestore client with the logger used by Panic and the
// page size used by scan.
type fstore struct {
client *firestore.Client // client used for every Firestore operation
slog *slog.Logger // logger that Panic writes to
docQueryLimit int // number of docs to fetch in a single query (see scan)
}
// docQueryLimit is the default limit on the number of docs
// to process in a single firestore query (see scan); newFstore
// copies it into the docQueryLimit field of every fstore. Its value
// has been experimentally shown sufficient to avoid timeouts
// in practice for all but extreme cases. Assuming fetching
// a single item takes 500ms on average, a page of docQueryLimit
// (100) items takes about 50 seconds, which fits within a single
// minute while also leaving some extra buffer time.
const docQueryLimit = 100
// newFstore creates an fstore backed by the named Firestore database in the
// GCP project projectID. An empty database selects firestore.DefaultDatabaseID.
// The opts are passed through to the Firestore client constructor, and lg is
// the logger that the fstore's Panic method writes to.
//
// It returns an error if projectID is empty, if the client cannot be created,
// or if an initial probe of the database fails with anything other than
// "not found".
func newFstore(ctx context.Context, lg *slog.Logger, projectID, database string, opts []option.ClientOption) (*fstore, error) {
	if projectID == "" {
		return nil, errors.New("firestore: empty projectID")
	}
	if database == "" {
		database = firestore.DefaultDatabaseID
	}
	client, err := firestore.NewClientWithDatabase(ctx, projectID, database, opts...)
	if err != nil {
		return nil, err
	}
	// The client doesn't actually connect until something is done, so get a document
	// to see if there's a problem.
	if _, err := client.Doc("c/d").Get(ctx); err != nil && !grpcerrors.IsNotFound(err) {
		// The client is not handed to the caller on this path, so release it
		// here. Closing is best-effort: the probe error is the one worth
		// reporting, so a failure to close is deliberately ignored.
		_ = client.Close()
		return nil, err
	}
	return &fstore{client: client, slog: lg, docQueryLimit: docQueryLimit}, nil
}
// Flush does nothing.
func (f *fstore) Flush() {
// Firestore operations do not require flushing.
}
// Close closes the underlying Firestore client.
// A failure to close is reported through f.Panic.
func (f *fstore) Close() {
	err := f.client.Close()
	if err == nil {
		return
	}
	// unreachable except for bad DB
	f.Panic("firestore close", "err", err)
}
// Panic logs msg and args as an error on f's logger
// and then passes the same msg and args to [storage.Panic].
func (f *fstore) Panic(msg string, args ...any) {
f.slog.Error(msg, args...)
storage.Panic(msg, args...)
}
// get fetches the document named id from the collection coll.
// When tx is non-nil the read is performed through the transaction;
// otherwise it is a standalone read.
// It returns nil if the document does not exist. Any other failure
// is reported through f.Panic.
func (f *fstore) get(tx *firestore.Transaction, coll *firestore.CollectionRef, id string) *firestore.DocumentSnapshot {
	ref := coll.Doc(id)
	// fetch performs the read either inside or outside the transaction.
	fetch := func() (*firestore.DocumentSnapshot, error) {
		if tx != nil {
			return tx.Get(ref)
		}
		return ref.Get(context.TODO())
	}
	snap, err := fetch()
	switch {
	case err == nil:
		return snap
	case grpcerrors.IsNotFound(err):
		return nil
	}
	// unreachable except for bad DB
	f.Panic("firestore get", "collection", coll, "key", id, "err", err)
	return snap
}
// set stores value as the document named key in the collection coll,
// replacing whatever was there.
// When tx is non-nil the write is performed through the transaction;
// otherwise it is a standalone write.
// An unusable document reference or a failed write is reported through f.Panic.
func (f *fstore) set(tx *firestore.Transaction, coll *firestore.CollectionRef, key string, value any) {
	ref := coll.Doc(key)
	if ref == nil {
		f.Panic("firestore set bad doc ref args", "collection", coll, "key", key)
	}
	// write performs the set either inside or outside the transaction.
	write := func() error {
		if tx != nil {
			return tx.Set(ref, value)
		}
		_, err := ref.Set(context.TODO(), value)
		return err
	}
	if err := write(); err != nil {
		// unreachable except for bad DB
		f.Panic("firestore set", "collection", coll, "key", key, "err", err)
	}
}
// delete removes the document named key from the collection coll.
// When tx is non-nil the removal is performed through the transaction;
// otherwise it is a standalone write.
// An empty key is ignored. Deleting a document that doesn't exist is not
// an error. A failed delete is reported through f.Panic.
func (f *fstore) delete(tx *firestore.Transaction, coll *firestore.CollectionRef, key string) {
	if key == "" {
		return
	}
	ref := coll.Doc(key)
	// remove performs the delete either inside or outside the transaction.
	remove := func() error {
		if tx != nil {
			return tx.Delete(ref)
		}
		_, err := ref.Delete(context.TODO())
		return err
	}
	if err := remove(); err != nil {
		// unreachable except for bad DB
		f.Panic("firestore delete", "collection", coll, "key", key, "err", err)
	}
}
// deleteRange deletes every document in the collection coll whose ID lies
// between start and end, inclusive. The documents are found with scan
// (outside any transaction) and their deletions are submitted through a
// Firestore BulkWriter.
func (f *fstore) deleteRange(coll *firestore.CollectionRef, start, end string) {
	writer := f.client.BulkWriter(context.TODO())
	docs := f.scan(nil, coll, start, end)
	for snap := range docs {
		// NOTE(review): the job returned by Delete is discarded, so only the
		// error from submitting the delete is checked here — confirm whether
		// the per-write results need to be inspected as well.
		_, err := writer.Delete(snap.Ref)
		if err == nil {
			continue
		}
		// unreachable except for bad DB
		f.Panic("firestore delete range", "collection", coll, "err", err)
	}
	writer.End()
}
// runTransaction runs fn inside a Firestore transaction.
// fn cannot report failure, so the transaction body always returns nil;
// an error from the transaction itself is reported through f.Panic.
func (f *fstore) runTransaction(fn func(ctx context.Context, tx *firestore.Transaction)) {
	// body adapts fn to the error-returning signature RunTransaction expects.
	body := func(ctx context.Context, tx *firestore.Transaction) error {
		fn(ctx, tx)
		return nil
	}
	if err := f.client.RunTransaction(context.TODO(), body); err != nil {
		// unreachable except for bad DB
		f.Panic("firestore transaction", "err", err)
	}
}
// scan returns an iterator over the documents in the collection coll whose IDs are
// between start and end, inclusive, in ascending order of document ID.
// An empty string for start or end indicates the beginning of the collection,
// so an empty end yields no documents at all.
// If tx is non-nil, the queries run inside the transaction.
//
// Documents are fetched in pages of at most f.docQueryLimit so that no single
// query runs long enough to time out. The returned sequence keeps its own
// cursor, so ranging over it again restarts from start. A query failure is
// reported through f.Panic.
func (f *fstore) scan(tx *firestore.Transaction, coll *firestore.CollectionRef, start, end string) iter.Seq[*firestore.DocumentSnapshot] {
	if end == "" {
		// The iteration ends at the start of the collection, so there is nothing to iterate over.
		return func(yield func(*firestore.DocumentSnapshot) bool) {}
	}
	// newPage starts the query for the page of documents beginning at from.
	newPage := func(from string) *firestore.DocumentIterator {
		query := coll.OrderBy(firestore.DocumentID, firestore.Asc).Limit(f.docQueryLimit).EndAt(end)
		if from != "" {
			query = query.StartAt(from)
		}
		if tx == nil {
			return query.Documents(context.TODO())
		}
		return tx.Documents(query)
	}
	return func(yield func(*firestore.DocumentSnapshot) bool) {
		// readPage yields every document of the page beginning at from.
		// It reports how many documents the page held, the ID of the last one,
		// and whether the consumer wants to keep iterating.
		// It is a separate function so that the page's iterator is stopped on
		// every exit path, including early termination by the consumer.
		readPage := func(from string) (n int, last string, more bool) {
			it := newPage(from)
			defer it.Stop()
			for { // should iterate up to f.docQueryLimit number of times
				ds, err := it.Next()
				if err == iterator.Done {
					return n, last, true
				}
				if err != nil {
					// Unreachable except for bad DB or potential 60 seconds timeout
					// syncdb: 22:38:20 ERROR firestore scan collection=projects/oscar-go-1/databases/prod/documents/values err="rpc error: code = Unavailable desc = Query timed out. Please try either limiting the entities scanned, or run with an updated index configuration."
					// The timeout should not happen now with Query.Limit(f.docQueryLimit).
					f.Panic("firestore scan", "collection", coll.Path, "err", err)
				}
				n++
				last = ds.Ref.ID
				if !yield(ds) {
					return n, last, false
				}
			}
		}
		// from is local to this invocation; the captured start is left
		// untouched so the sequence can be ranged over more than once.
		from := start
		for {
			n, last, more := readPage(from)
			if !more || n < f.docQueryLimit { // consumer stopped, or no more things to fetch
				return
			}
			from = keyAfter(last)
		}
	}
}
// dataTo decodes the contents of the document snapshot ds into a new value
// of type T and returns it. A decoding failure is reported through f.Panic.
func dataTo[T any](f *fstore, ds *firestore.DocumentSnapshot) T {
	var out T
	err := ds.DataTo(&out)
	if err != nil {
		// unreachable except for bad DB
		f.Panic("dataTo", "type", reflect.TypeFor[T](), "err", err)
	}
	return out
}
// batch implements [storage.Batch].
// All of a batch's operations must occur within the same collection.
// Operations are queued in ops and executed together by apply.
type batch struct {
f *fstore // store whose transaction and helpers apply uses
coll *firestore.CollectionRef // the collection every op applies to
ops []*op // queued operations, in the order they were added
size int // approximate size of ops
hasDeleteRange bool // at least one op is a deleteRange
}
// newBatch returns an empty batch whose operations apply to the collection coll.
// Its size starts at fixedSize.
func (f *fstore) newBatch(coll *firestore.CollectionRef) *batch {
	b := new(batch)
	b.f = f
	b.coll = coll
	b.size = fixedSize
	return b
}
// An op is a single batch operation.
// apply treats an op with a non-nil value as a set, an op with a nil value
// and an empty end as a delete, and any other op as a deleteRange.
type op struct {
id string // or start, for deleteRange
value any // nil for delete, deleteRange
// for deleteRange:
end string
deleteIDs []string // the list of IDs to delete, determined inside the transaction
}
// Approximate proto sizes, determined empirically.
const (
// fixedSize is the size an empty batch starts with (see newBatch and apply).
fixedSize = 300
// perWriteSize is added to the batch size for every queued operation,
// on top of the lengths of its IDs and the size of its value.
perWriteSize = 5
// Firestore supposedly has a limit of 10 MiB per request
// (see https://cloud.google.com/firestore/quotas#writes_and_transactions).
// However, I found that exceeding 4 MiB often failed
// with a "too big" error.
// maybeApply applies the batch once its approximate size reaches maxSize.
maxSize = 4 << 20
// Transactions tend to time out if they perform too many operations.
// maybeApply applies the batch once it holds maxOps operations.
maxOps = 10_000
)
// set queues an operation that stores val as the document named id.
// valSize is the caller's estimate of the size of val; it is added to the
// batch's approximate size. A nil val is reported through Panic.
func (b *batch) set(id string, val any, valSize int) {
	if val == nil {
		b.f.Panic("firestore batch set: nil value")
	}
	write := &op{id: id, value: val}
	b.ops = append(b.ops, write)
	b.size += valSize + len(id) + perWriteSize
}
// delete queues an operation that removes the document named id.
// An empty id is ignored.
func (b *batch) delete(id string) {
	if id == "" {
		return
	}
	b.size += perWriteSize + len(id)
	b.ops = append(b.ops, &op{id: id})
}
// deleteRange adds a deleteRange operation to the batch. When the batch is
// applied, the operation deletes every document whose ID is between start
// and end, inclusive.
//
// An empty end denotes an empty range (see scan), so nothing is queued for it.
// Queuing it would be wrong as well as useless: apply recognizes a deleteRange
// by its non-empty end, so the op would be mistaken for a delete of the single
// document named start.
func (b *batch) deleteRange(start, end string) {
	if end == "" {
		return
	}
	b.ops = append(b.ops, &op{id: start, end: end})
	b.size += perWriteSize + len(start) + len(end)
	b.hasDeleteRange = true
}
// maybeApply applies the batch if it has grown big enough,
// and reports whether it did so.
func (b *batch) maybeApply() bool {
	// A single deleteRange is enough to trigger an apply: we don't know how
	// many documents are in the range, and each one is a separate operation
	// in the transaction, so be conservative. Otherwise apply only once the
	// batch is large, by size or by number of operations.
	switch {
	case b.hasDeleteRange, b.size >= maxSize, len(b.ops) >= maxOps:
		b.apply()
		return true
	default:
		return false
	}
}
// apply applies the batch by executing all its operations in a single
// transaction, then empties the batch so that it can be reused.
//
// The transaction body may be run more than once if the transaction is
// retried, so everything it derives from the database is recomputed on
// each attempt.
func (b *batch) apply() {
	b.f.runTransaction(func(ctx context.Context, tx *firestore.Transaction) {
		// Reads must happen before writes in a transaction, and we can only delete
		// a range by querying the documents.
		// So read all the ranges first, and store the IDs in their respective ops
		// to preserve deletion order.
		for _, o := range b.ops {
			if o.end == "" {
				continue
			}
			// Discard IDs collected by an earlier attempt of this transaction,
			// so they are neither duplicated nor stale.
			o.deleteIDs = o.deleteIDs[:0]
			for ds := range b.f.scan(tx, b.coll, o.id, o.end) {
				o.deleteIDs = append(o.deleteIDs, ds.Ref.ID)
			}
		}
		// Execute each op inside the transaction.
		for _, o := range b.ops {
			switch {
			case o.value != nil: // set
				b.f.set(tx, b.coll, o.id, o.value)
			case o.end == "": // delete
				b.f.delete(tx, b.coll, o.id)
			default: // deleteRange
				for _, id := range o.deleteIDs {
					b.f.delete(tx, b.coll, id)
				}
			}
		}
	})
	// Return the batch to its empty state. hasDeleteRange must be cleared along
	// with ops; otherwise every later maybeApply would apply immediately, even
	// for a batch holding a single small operation.
	b.ops = nil
	b.size = fixedSize
	b.hasDeleteRange = false
}