blob: 3427a555408ba4c128dc55b23c8fdfd44ce0d690 [file] [log] [blame]
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package firestore implements [storage.DB] and [storage.VectorDB]
// using Google Cloud's [Firestore] service.
//
// A brief introduction to Firestore: it is a document DB with hierarchical keys.
// A key is a string of the form "coll1/id1/coll2/id2/.../collN/idN",
// where each coll names a "collection" and each id names a "document" in that collection.
// Each document is a set of key-value pairs. In Go, a document is represented
// by a map[string]any, or a struct with exported fields:
// the Go Firestore client provides convenience functions for converting documents to and from structs.
//
// The two database implementations in this package use three collections:
// - The "locks" collection holds documents representing the locks used by [DB.Lock] and [DB.Unlock].
// - The "values" collection holds the key-value pairs for [DB]. Keys are byte slices but document
// names are strings. We hex-encode the keys to obtain strings with the same ordering.
// - The "vectors" collection holds embedding vectors for [VectorDB].
//
// [Firestore]: https://cloud.google.com/firestore
package firestore
import (
"context"
"iter"
"log/slog"
"reflect"
"cloud.google.com/go/firestore"
"golang.org/x/oscar/internal/storage"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// DBOptions is the configuration for a [DB] or [VectorDB].
// Any field left at its zero value is replaced by a default when the database is opened.
type DBOptions struct {
	// ProjectID is the Google Cloud project to use.
	// If empty, attempt to detect the project.
	ProjectID string

	// Database is the Firestore database to use.
	// If empty, use the default database.
	Database string

	// Logger receives error logs. If nil, slog.Default() is used.
	Logger *slog.Logger

	// ClientOptions are passed through to the Firestore client.
	ClientOptions []option.ClientOption
}
// fstore implements operations common to both [storage.DB] and [storage.VectorDB].
type fstore struct {
	// client is the Firestore client that every operation goes through.
	client *firestore.Client
	// slog is the logger used by Panic before it panics.
	slog *slog.Logger
}
// newFstore returns an fstore backed by a new Firestore client configured by dbopts.
//
// A nil dbopts is treated like a zero DBOptions. Empty fields are filled with
// defaults (detected project, default database, slog.Default()) in a private
// copy, so the caller's DBOptions is never modified.
//
// The client does not connect until it is used, so newFstore fetches a probe
// document to surface configuration problems immediately. A NotFound result
// counts as success. If the probe fails in any other way, the client is closed
// and the probe error is returned.
func newFstore(ctx context.Context, dbopts *DBOptions) (*fstore, error) {
	// Work on a copy: writing defaults into *dbopts would surprise callers that
	// reuse or inspect their options.
	var opts DBOptions
	if dbopts != nil {
		opts = *dbopts
	}
	if opts.ProjectID == "" {
		opts.ProjectID = firestore.DetectProjectID
	}
	if opts.Database == "" {
		opts.Database = firestore.DefaultDatabaseID
	}
	if opts.Logger == nil {
		opts.Logger = slog.Default()
	}
	client, err := firestore.NewClientWithDatabase(ctx, opts.ProjectID, opts.Database, opts.ClientOptions...)
	if err != nil {
		return nil, err
	}
	// The client doesn't actually connect until something is done, so get a document
	// to see if there's a problem.
	if _, err := client.Doc("c/d").Get(ctx); err != nil && !isNotFound(err) {
		// Don't leak the client. The probe error is the one worth reporting,
		// so the close error is deliberately ignored.
		_ = client.Close()
		return nil, err
	}
	return &fstore{client: client, slog: opts.Logger}, nil
}
// Flush does nothing: Firestore operations do not require flushing.
func (f *fstore) Flush() {}
// Close closes the underlying Firestore client.
// A close failure is reported through f.Panic.
func (f *fstore) Close() {
	err := f.client.Close()
	if err == nil {
		return
	}
	// unreachable except for bad DB
	f.Panic("firestore close", "err", err)
}
// Panic logs msg and args at error level on f's logger and then
// passes the same msg and args to [storage.Panic].
func (f *fstore) Panic(msg string, args ...any) {
	logger := f.slog
	logger.Error(msg, args...)
	storage.Panic(msg, args...)
}
// get returns the document with ID id in collection coll, or nil if it does not exist.
// When tx is non-nil the read is part of that transaction; otherwise it is a
// standalone read. Any error other than NotFound is reported through f.Panic.
func (f *fstore) get(tx *firestore.Transaction, coll, id string) *firestore.DocumentSnapshot {
	ref := f.client.Collection(coll).Doc(id)
	var (
		snap *firestore.DocumentSnapshot
		err  error
	)
	switch {
	case tx != nil:
		snap, err = tx.Get(ref)
	default:
		snap, err = ref.Get(context.TODO())
	}
	switch {
	case err == nil:
		return snap
	case isNotFound(err):
		return nil
	}
	// unreachable except for bad DB
	f.Panic("firestore get", "collection", coll, "key", id, "err", err)
	return snap
}
// set stores value as the document with ID key in collection coll.
// When tx is non-nil the write is queued on that transaction; otherwise it is
// performed immediately. Errors are reported through f.Panic.
func (f *fstore) set(tx *firestore.Transaction, coll, key string, value any) {
	ref := f.client.Collection(coll).Doc(key)
	var err error
	switch {
	case tx != nil:
		err = tx.Set(ref, value)
	default:
		_, err = ref.Set(context.TODO(), value)
	}
	if err == nil {
		return
	}
	// unreachable except for bad DB
	f.Panic("firestore set", "collection", coll, "key", key, "err", err)
}
// delete removes the document with ID key from collection coll.
// When tx is non-nil the deletion is queued on that transaction; otherwise it is
// performed immediately. Deleting a document that doesn't exist is not an error.
// Other errors are reported through f.Panic.
func (f *fstore) delete(tx *firestore.Transaction, coll, key string) {
	ref := f.client.Collection(coll).Doc(key)
	var err error
	switch {
	case tx != nil:
		err = tx.Delete(ref)
	default:
		_, err = ref.Delete(context.TODO())
	}
	if err == nil {
		return
	}
	// unreachable except for bad DB
	f.Panic("firestore delete", "collection", coll, "key", key, "err", err)
}
// deleteRange deletes all the documents in the collection coll whose IDs are between
// start and end, inclusive.
//
// The deletions are not transactional. They are queued on a BulkWriter while the
// range is scanned. deleteRange returns after every deletion has completed, and
// any failure is reported through f.Panic.
func (f *fstore) deleteRange(coll, start, end string) {
	// pendingDelete pairs a queued deletion with its document ID for error reporting.
	type pendingDelete struct {
		id  string
		job *firestore.BulkWriterJob
	}
	var pending []pendingDelete

	bw := f.client.BulkWriter(context.TODO())
	for ds := range f.scan(nil, coll, start, end) {
		job, err := bw.Delete(ds.Ref)
		if err != nil {
			// unreachable except for bad DB
			f.Panic("firestore delete range", "collection", coll, "err", err)
		}
		pending = append(pending, pendingDelete{id: ds.Ref.ID, job: job})
	}
	// End sends any remaining writes and waits for all of them to finish.
	bw.End()

	// Delete's error only covers queuing the write. Whether each deletion actually
	// succeeded is reported by its job, so check them all. Without this check,
	// failed deletions would be silently ignored.
	for _, p := range pending {
		if _, err := p.job.Results(); err != nil {
			// unreachable except for bad DB
			f.Panic("firestore delete range", "collection", coll, "key", p.id, "err", err)
		}
	}
}
// runTransaction runs fn inside a Firestore transaction.
// fn has no error result, so the adapter passed to RunTransaction always returns nil.
// An error from RunTransaction itself is reported through f.Panic.
func (f *fstore) runTransaction(fn func(ctx context.Context, tx *firestore.Transaction)) {
	adapter := func(ctx context.Context, tx *firestore.Transaction) error {
		fn(ctx, tx)
		return nil
	}
	if err := f.client.RunTransaction(context.TODO(), adapter); err != nil {
		// unreachable except for bad DB
		f.Panic("firestore transaction", "err", err)
	}
}
// scan returns an iterator over the documents in the collection coll whose IDs are
// between start and end, inclusive, in ascending ID order.
// If tx is non-nil, the query runs inside that transaction.
//
// The Firestore query is started each time the returned sequence is ranged over,
// and it is stopped when that iteration ends. As a result, a sequence that is
// never used holds no iterator, and ranging over it again starts a fresh query
// instead of reusing a stopped one. Query errors are reported through f.Panic.
func (f *fstore) scan(tx *firestore.Transaction, coll, start, end string) iter.Seq[*firestore.DocumentSnapshot] {
	query := f.client.Collection(coll).
		OrderBy(firestore.DocumentID, firestore.Asc).
		StartAt(start).
		EndAt(end)
	return func(yield func(*firestore.DocumentSnapshot) bool) {
		// Name the iterator "it" so it does not shadow the iter package.
		var it *firestore.DocumentIterator
		if tx == nil {
			it = query.Documents(context.TODO())
		} else {
			it = tx.Documents(query)
		}
		defer it.Stop()
		for {
			ds, err := it.Next()
			if err == iterator.Done {
				return
			}
			if err != nil {
				// unreachable except for bad DB
				f.Panic("firestore scan", "collection", coll, "err", err)
			}
			if !yield(ds) {
				return
			}
		}
	}
}
// dataTo decodes the contents of ds into a new value of type T and returns it.
// A decoding failure is reported through f.Panic, tagged with T's reflect.Type.
func dataTo[T any](f *fstore, ds *firestore.DocumentSnapshot) T {
	var out T
	err := ds.DataTo(&out)
	if err != nil {
		// unreachable except for bad DB
		f.Panic("dataTo", "type", reflect.TypeFor[T](), "err", err)
	}
	return out
}
// batch implements [storage.Batch].
// It accumulates operations and applies them together in one transaction.
// All of a batch's operations must occur within the same collection.
type batch struct {
	// f is the store the batch is applied to.
	f *fstore
	// coll is the collection that every operation acts on.
	coll string
	// ops holds the pending operations, in the order they were added.
	ops []*op
	// size is the approximate size of ops, used to decide when to apply.
	size int
	// hasDeleteRange is true when at least one op is a deleteRange.
	hasDeleteRange bool
}
// newBatch returns an empty batch whose operations all target collection coll.
// Its size starts at the fixed per-request overhead.
func (f *fstore) newBatch(coll string) *batch {
	b := &batch{f: f, coll: coll}
	b.size = fixedSize
	return b
}
// An op is a single batch operation: a set, a delete, or a deleteRange.
type op struct {
	// id is the document ID for a set or delete, or the range start for a deleteRange.
	id string
	// value is the document value for a set. It is nil for a delete or deleteRange.
	value any

	// The remaining fields are used only by deleteRange.

	// end is the inclusive end of the range.
	end string
	// deleteIDs lists the IDs to delete. It is determined inside the transaction.
	deleteIDs []string
}
// Approximate proto sizes in bytes, determined empirically.
// A batch's estimated size is fixedSize plus, for each operation,
// perWriteSize plus the lengths of its IDs and value.
const (
	// fixedSize is the estimated overhead of the request itself.
	fixedSize = 300

	// perWriteSize is the estimated overhead of each individual write.
	perWriteSize = 5

	// maxSize is the estimated size above which a batch is applied.
	// Firestore supposedly has a limit of 10 MiB per request
	// (see https://cloud.google.com/firestore/quotas#writes_and_transactions).
	// However, exceeding 4 MiB was often observed to fail
	// with a "too big" error.
	maxSize = 4 * 1024 * 1024
)
// set adds a set operation to the batch. It is the caller's responsibility to
// estimate the size of val and pass it as valSize.
// val must be non-nil, because apply treats an op with a nil value as a deletion.
func (b *batch) set(id string, val any, valSize int) {
	if val == nil {
		panic("firestore batch set: nil value")
	}
	b.size += perWriteSize + len(id) + valSize
	b.ops = append(b.ops, &op{id: id, value: val})
}
// delete adds an operation that deletes the document with ID id.
func (b *batch) delete(id string) {
	b.size += perWriteSize + len(id)
	b.ops = append(b.ops, &op{id: id})
}
// deleteRange adds an operation that deletes every document whose ID is between
// start and end, inclusive. Because the documents in the range are not known
// until the batch is applied, adding one makes maybeApply apply the batch.
//
// If end sorts before start, the range is empty and nothing is added. In
// particular, this keeps a range with an empty end from being stored with
// end == "". apply uses a non-empty end to recognize deleteRange ops, so such an
// op would be misread as a single-document delete of start.
func (b *batch) deleteRange(start, end string) {
	if end < start {
		return
	}
	b.ops = append(b.ops, &op{id: start, end: end})
	b.size += perWriteSize + len(start) + len(end)
	b.hasDeleteRange = true
}
// maybeApply applies the batch if it is big enough, and reports whether it did.
func (b *batch) maybeApply() bool {
	// Apply if the batch is large, or if there is even one deleteRange.
	// We don't know how many documents are in the range, and each one
	// is a separate operation in the transaction, so be conservative.
	shouldApply := b.hasDeleteRange || b.size > maxSize
	if !shouldApply {
		return false
	}
	b.apply()
	return true
}
// apply applies the batch by executing all its operations, in order, in a single
// transaction. It then resets the batch to empty so it can be reused.
// An empty batch is reset without starting a transaction.
func (b *batch) apply() {
	if len(b.ops) > 0 {
		b.f.runTransaction(func(ctx context.Context, tx *firestore.Transaction) {
			// Reads must happen before writes in a transaction, and we can only delete
			// a range by querying the documents.
			// So read all the ranges first, and store the IDs in their respective ops
			// to preserve deletion order.
			//
			// RunTransaction may call this function more than once, for example after
			// the transaction is aborted due to contention. Discard IDs recorded by an
			// earlier attempt, so each attempt deletes exactly the documents it read,
			// with no stale or duplicate IDs.
			for _, op := range b.ops {
				if op.end != "" {
					op.deleteIDs = nil
					for ds := range b.f.scan(tx, b.coll, op.id, op.end) {
						op.deleteIDs = append(op.deleteIDs, ds.Ref.ID)
					}
				}
			}
			// Execute each op inside the transaction.
			// NOTE(review): the range reads above see the database as it was before this
			// batch. A document first created by an earlier set in this batch is therefore
			// not removed by a later deleteRange covering it. Confirm whether
			// storage.Batch requires strictly sequential semantics here.
			for _, op := range b.ops {
				switch {
				case op.value != nil: // set
					b.f.set(tx, b.coll, op.id, op.value)
				case op.end == "": // delete
					b.f.delete(tx, b.coll, op.id)
				default: // deleteRange
					for _, id := range op.deleteIDs {
						b.f.delete(tx, b.coll, id)
					}
				}
			}
		})
	}
	b.ops = nil
	b.size = fixedSize
	// Also clear hasDeleteRange. Otherwise every later maybeApply would apply the
	// batch immediately, however small it is.
	b.hasDeleteRange = false
}
// isNotFound reports whether err, as returned by the Firestore client,
// carries the gRPC NotFound status code.
func isNotFound(err error) bool {
	switch status.Code(err) {
	case codes.NotFound:
		return true
	default:
		return false
	}
}
// isTimeout reports whether err, as returned by the Firestore client,
// carries the gRPC DeadlineExceeded status code, which indicates a timeout.
func isTimeout(err error) bool {
	switch status.Code(err) {
	case codes.DeadlineExceeded:
		return true
	default:
		return false
	}
}