| // Copyright 2024 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Package pebble implements a storage.DB using Pebble, |
| // a production-quality key-value database from CockroachDB. |
| // A pebble database can only be opened by one process at a time. |
| package pebble |
| |
| import ( |
| "bytes" |
| "cmp" |
| "iter" |
| "log/slog" |
| |
| "github.com/cockroachdb/pebble" |
| "golang.org/x/oscar/internal/storage" |
| ) |
| |
| // Open opens an existing Pebble database in the named directory. |
| // The database must already exist. |
| func Open(lg *slog.Logger, dir string) (storage.DB, error) { |
| return open(lg, dir, &pebble.Options{ErrorIfNotExists: true}) |
| } |
| |
| // Create creates a new Pebble database in the named directory. |
| // The database (and directory) must not already exist. |
| func Create(lg *slog.Logger, dir string) (storage.DB, error) { |
| return open(lg, dir, &pebble.Options{ErrorIfExists: true}) |
| } |
| |
| func open(lg *slog.Logger, dir string, opts *pebble.Options) (storage.DB, error) { |
| p, err := pebble.Open(dir, opts) |
| if err != nil { |
| lg.Error("pebble open", "dir", dir, "create", opts.ErrorIfExists, "err", err) |
| return nil, err |
| } |
| return &db{p: p, slog: lg}, nil |
| } |
| |
| type db struct { |
| p *pebble.DB |
| m storage.MemLocker |
| slog *slog.Logger |
| } |
| |
| type batch struct { |
| db *db |
| b *pebble.Batch |
| } |
| |
| func (d *db) Lock(key string) { |
| d.m.Lock(key) |
| } |
| |
| func (d *db) Unlock(key string) { |
| d.m.Unlock(key) |
| } |
| |
| func (d *db) get(key []byte, yield func(val []byte)) { |
| v, c, err := d.p.Get(key) |
| if err == pebble.ErrNotFound { |
| return |
| } |
| if err != nil { |
| // unreachable except db error |
| d.Panic("pebble get", "key", storage.Fmt(key), "err", err) |
| } |
| yield(v) |
| c.Close() |
| } |
| |
| func (d *db) Get(key []byte) (val []byte, ok bool) { |
| d.get(key, func(v []byte) { |
| val = bytes.Clone(v) |
| ok = true |
| }) |
| return |
| } |
| |
| var ( |
| sync = &pebble.WriteOptions{Sync: true} |
| noSync = &pebble.WriteOptions{Sync: false} |
| ) |
| |
| func (d *db) Panic(msg string, args ...any) { |
| d.slog.Error(msg, args...) |
| storage.Panic(msg, args...) |
| } |
| |
| func (d *db) Set(key, val []byte) { |
| if len(key) == 0 { |
| d.Panic("pebble set: empty key") |
| } |
| if err := d.p.Set(key, val, noSync); err != nil { |
| // unreachable except db error |
| d.Panic("pebble set", "key", storage.Fmt(key), "val", storage.Fmt(val), "err", err) |
| } |
| } |
| |
| func (d *db) Delete(key []byte) { |
| if err := d.p.Delete(key, noSync); err != nil { |
| // unreachable except db error |
| d.Panic("pebble delete", "key", storage.Fmt(key), "err", err) |
| } |
| } |
| |
| func (d *db) DeleteRange(start, end []byte) { |
| err := cmp.Or( |
| d.p.DeleteRange(start, end, noSync), |
| d.p.Delete(end, noSync), |
| ) |
| if err != nil { |
| // unreachable except db error |
| d.Panic("pebble delete range", "start", storage.Fmt(start), "end", storage.Fmt(end), "err", err) |
| } |
| } |
| |
| func (d *db) Flush() { |
| if err := d.p.Flush(); err != nil { |
| // unreachable except db error |
| d.Panic("pebble flush", "err", err) |
| } |
| } |
| |
| func (d *db) Close() { |
| if err := d.p.Close(); err != nil { |
| // unreachable except db error |
| d.Panic("pebble close", "err", err) |
| } |
| } |
| |
| func (d *db) Scan(start, end []byte) iter.Seq2[[]byte, func() []byte] { |
| start = bytes.Clone(start) |
| end = bytes.Clone(end) |
| return func(yield func(key []byte, val func() []byte) bool) { |
| // Note: Pebble's UpperBound is non-inclusive (not included in the scan) |
| // but we want to include the key end in the scan, |
| // so do not use UpperBound; we check during the iteration instead. |
| iter, err := d.p.NewIter(&pebble.IterOptions{ |
| LowerBound: start, |
| }) |
| if err != nil { |
| // unreachable except db error |
| d.Panic("pebble new iterator", "start", storage.Fmt(start), "err", err) |
| } |
| defer iter.Close() |
| for iter.First(); iter.Valid(); iter.Next() { |
| key := iter.Key() |
| if bytes.Compare(key, end) > 0 { |
| break |
| } |
| val := func() []byte { |
| v, err := iter.ValueAndErr() |
| if err != nil { |
| // unreachable except db error |
| d.Panic("pebble iterator value", "key", storage.Fmt(key), "err", err) |
| } |
| return v |
| } |
| if !yield(key, val) { |
| return |
| } |
| } |
| } |
| } |
| |
| func (d *db) Batch() storage.Batch { |
| return &batch{d, d.p.NewBatch()} |
| } |
| |
| func (b *batch) Set(key, val []byte) { |
| if len(key) == 0 { |
| b.db.Panic("pebble batch set: empty key") |
| } |
| if err := b.b.Set(key, val, noSync); err != nil { |
| // unreachable except db error |
| b.db.Panic("pebble batch set", "key", storage.Fmt(key), "val", storage.Fmt(val), "err", err) |
| } |
| } |
| |
| func (b *batch) Delete(key []byte) { |
| if err := b.b.Delete(key, noSync); err != nil { |
| // unreachable except db error |
| b.db.Panic("pebble batch delete", "key", storage.Fmt(key), "err", err) |
| } |
| } |
| |
| func (b *batch) DeleteRange(start, end []byte) { |
| err := cmp.Or( |
| b.b.DeleteRange(start, end, noSync), |
| b.b.Delete(end, noSync), |
| ) |
| if err != nil { |
| // unreachable except db error |
| b.db.Panic("pebble batch delete range", "start", storage.Fmt(start), "end", storage.Fmt(end), "err", err) |
| } |
| } |
| |
| // Pebble imposes a higher maximum batch size (4GB), but 100MB is fine. |
| // That's also what storage.Batch's interface definition says is a “typical limit”. |
| const maxBatch = 100e6 |
| |
| func (b *batch) MaybeApply() bool { |
| if b.b.Len() > maxBatch { |
| b.Apply() |
| return true |
| } |
| return false |
| } |
| |
| func (b *batch) Apply() { |
| if err := b.db.p.Apply(b.b, noSync); err != nil { |
| // unreachable except db error |
| b.db.Panic("pebble batch apply", "err", err) |
| } |
| b.b.Reset() |
| } |