blob: 676691607096bbda572232bf6edc4f6e44cf2f8f [file] [log] [blame]
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package storage
import (
"bytes"
"iter"
"sync"
"rsc.io/ordered"
)
type overlayDB struct {
MemLocker
mu sync.RWMutex
overlay DB
base DB
}
type keyRange struct {
start, end []byte
}
// Start of keys used for the overlay implementation.
const overlayPrefix = "__overlay"
// NewOverlayDB returns a DB that combines overlay with base.
// Reads happen from the overlay first, then the base.
// All writes go to the overlay.
//
// An overlay DB should only be used for testing. It can violate the
// specification of [DB] when a process is writing to the base concurrently.
// Locks held in the overlay will not be observed by the base, so changes
// from other processes can occur even while the process with the overlay
// has the lock.
//
// The overlay DB assumes that all keys are encoded with [rsc.io/ordered].
// The part of the key space beginning with ordered.Encode(overlayPrefix) in the overlay
// DB is reserved for use by the implementation.
func NewOverlayDB(overlay, base DB) DB {
return &overlayDB{
overlay: overlay,
base: base,
}
}
// Get returns the value associated with the key.
func (db *overlayDB) Get(key []byte) (val []byte, ok bool) {
db.mu.RLock()
defer db.mu.RUnlock()
if oval, ok := db.overlay.Get(key); ok {
return oval, true
}
if db.deleted(key) {
return nil, false
}
return db.base.Get(key)
}
// Set sets the value associated with key to val.
func (db *overlayDB) Set(key, val []byte) {
db.mu.Lock()
defer db.mu.Unlock()
db.setLocked(key, val)
}
func (db *overlayDB) setLocked(key, val []byte) {
db.overlay.Set(key, val)
db.unmarkDeleted(key)
}
// Delete deletes any entry with the given key.
func (db *overlayDB) Delete(key []byte) {
db.mu.Lock()
defer db.mu.Unlock()
db.deleteLocked(key)
}
func (db *overlayDB) deleteLocked(key []byte) {
db.overlay.Delete(key)
db.markDeleted(key)
}
// DeleteRange deletes all entries with start ≤ key ≤ end.
func (db *overlayDB) DeleteRange(start, end []byte) {
db.mu.Lock()
defer db.mu.Unlock()
db.deleteRangeLocked(start, end)
}
func (db *overlayDB) deleteRangeLocked(start, end []byte) {
// TODO(maybe): consolidate ranges
db.overlay.DeleteRange(start, end)
db.markRangeDeleted(start, end)
}
// Scan returns an iterator over all key-value pairs
// in the range start ≤ key ≤ end.
// It does not guaranteed a consistent view of the DB (snapshot); keys and values
// may change during the iteration.
func (db *overlayDB) Scan(start, end []byte) iter.Seq2[[]byte, func() []byte] {
return func(yield func([]byte, func() []byte) bool) {
db.mu.RLock()
locked := true
defer func() {
if locked {
db.mu.RUnlock()
}
}()
// Filter out all keys in base that have been deleted.
fbase := filter2(db.base.Scan(start, end),
func(k []byte, v func() []byte) bool { return !db.deleted(k) })
// Merge all the keys in overlay with the undeleted ones in base.
for k, vf := range unionFunc2(db.overlay.Scan(start, end), fbase, bytes.Compare) {
// Release the lock so yield can call methods on db.
db.mu.RUnlock()
locked = false
if !yield(k, vf) {
return
}
db.mu.RLock()
locked = true
}
}
}
// filter2 returns a sequence that consists of all the elements of s for which f returns true.
func filter2[K, V any](s iter.Seq2[K, V], f func(K, V) bool) iter.Seq2[K, V] {
return func(yield func(K, V) bool) {
for k, v := range s {
if !f(k, v) {
continue
}
if !yield(k, v) {
return
}
}
}
}
// unionFunc2 returns an iterator over all elements of s1 and s2, with keys in the same order.
// The keys of s1 and s2 must both be sorted according to cmp.
// If s1 and s2 have the same key, it is yielded once, with s1's value.
func unionFunc2[K, V any](s1, s2 iter.Seq2[K, V], cmp func(K, K) int) iter.Seq2[K, V] {
return func(yield func(K, V) bool) {
next, stop := iter.Pull2(s2)
defer stop()
k2, v2, ok := next()
for k1, v1 := range s1 {
for ok && cmp(k2, k1) < 0 {
if !yield(k2, v2) {
return
}
k2, v2, ok = next()
}
if !yield(k1, v1) {
return
}
if cmp(k1, k2) == 0 {
k2, v2, ok = next()
}
}
for ; ok; k2, v2, ok = next() {
if !yield(k2, v2) {
return
}
}
}
}
// markDeleted marks key as deleted.
// It isn't sufficient to simply delete the key in the overlay, because
// the key may exist in the base as well.
func (db *overlayDB) markDeleted(key []byte) {
tombstone := ordered.Encode(overlayPrefix, ordered.Raw(key))
db.overlay.Set(tombstone, nil)
}
// unmarkDeleted removes from the database the marker that key is deleted.
// It is not strictly necessary to do this when a key is set, but it
// saves some space.
func (db *overlayDB) unmarkDeleted(key []byte) {
tombstone := ordered.Encode(overlayPrefix, ordered.Raw(key))
db.overlay.Delete(tombstone)
}
// markRangeDeleted marks a range of keys as deleted.
// It isn't sufficient to delete each key in the range that appears in base,
// because a key in the range might be added to base but not overlay, and then
// it would be visible.
func (db *overlayDB) markRangeDeleted(start, end []byte) {
// The key for a deleted range is the start of the range.
key := ordered.Encode(overlayPrefix, "ranges", ordered.Raw(start))
db.overlay.Set(key, end)
}
// deleted reports whether key is deleted.
// The result is only valid for db if key is not present in db.overlay;
// keys in db.overlay are not deleted, regardless of what this function reports.
// deleted must be called with the lock held.
func (db *overlayDB) deleted(key []byte) bool {
tombstone := ordered.Encode(overlayPrefix, ordered.Raw(key))
if _, ok := db.overlay.Get(tombstone); ok {
return true
}
// Scan deleted ranges up to where the start of the range is equal to key.
prefix := ordered.Encode(overlayPrefix, "ranges")
for _, vf := range db.overlay.Scan(prefix, append(prefix, ordered.Encode(ordered.Raw(key))...)) {
// We know start <= key, so just compare end >= key.
end := vf()
if bytes.Compare(end, key) >= 0 {
return true
}
}
return false
}
// Batch returns a new batch.
func (db *overlayDB) Batch() Batch {
return &overlayBatch{db: db}
}
// Flush flushes everything to persistent storage.
func (db *overlayDB) Flush() {
// overlay is a memDB and base is never written; nothing to flush.
}
func (db *overlayDB) Close() {
db.base.Close()
db.overlay.Close()
}
func (db *overlayDB) Panic(msg string, args ...any) {
Panic(msg, args...)
}
// An overlayBatch is a Batch for an overlayDB.
type overlayBatch struct {
db *overlayDB // underlying database
ops []func() // operations to apply
}
func (b *overlayBatch) Set(key, val []byte) {
if len(key) == 0 {
b.db.Panic("overlaydb batch set: empty key")
}
k := bytes.Clone(key)
v := bytes.Clone(val)
b.ops = append(b.ops, func() { b.db.setLocked(k, v) })
}
func (b *overlayBatch) Delete(key []byte) {
k := bytes.Clone(key)
b.ops = append(b.ops, func() { b.db.deleteLocked(k) })
}
func (b *overlayBatch) DeleteRange(start, end []byte) {
s := bytes.Clone(start)
e := bytes.Clone(end)
b.ops = append(b.ops, func() { b.db.deleteRangeLocked(s, e) })
}
func (b *overlayBatch) MaybeApply() bool {
return false
}
func (b *overlayBatch) Apply() {
b.db.mu.Lock()
defer b.db.mu.Unlock()
for _, op := range b.ops {
op()
}
b.ops = nil
}