| // Copyright 2024 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Package timed implements modification-time-indexed storage |
| // that can be accessed in modification-time order as well as key order. |
| // |
| // Timed storage builds on top of a basic key-value storage by |
| // maintaining two actual database entries for each logical entry. |
| // The two actual entries for the logical entry (kind, key) → val are: |
| // |
| // - (kind, key) → (modtime, val) |
| // - (kindByTime, modtime, key) → () |
| // |
| // The “kind” is a key namespace that allows maintaining multiple |
| // independent sets of modification-time-indexed storage in a |
| // single [storage.DB]. |
| // |
| // The “modtime” (of type [DBTime]) is an opaque timestamp representing the time when |
| // the logical entry was last set. Other than being a monotonically |
| // increasing integer, no specific semantics are guaranteed about the |
| // meaning of the dbtime. |
| // |
| // An [Entry] represents a single logical entry: |
| // |
| // type Entry struct { |
| // ModTime DBTime // time entry was written |
| // Kind string // key namespace |
| // Key []byte // key |
| // Val []byte // value |
| // } |
| // |
| // The basic [Get], [Set], [Scan], [Delete], and [DeleteRange] functions |
| // are analogous to those in [storage.DB] |
| // but modified to accommodate the new entries: |
| // |
| // - Get(db, kind, key) returns an *Entry. |
| // The db is the underlying storage being used. |
| // |
| // - Set(db, batch, kind, key, val) adds or replaces an entry. |
| // The db is the underlying storage, which is consulted, |
| // but the actual modifications are written to batch, |
| // so that the two different actual entries can be applied |
| // as a single unit. |
| // |
| // - Scan(db, kind, start, end) returns an iterator yielding *Entry. |
| // |
| // - Delete(db, batch, kind, key) deletes an entry. |
| // |
| // - DeleteRange(db, batch, kind, start, end) deletes a range of entries. |
| // |
| // In addition to these operations, the time index enables one new operation: |
| // |
| // - After(db, kind, dbtime) returns an iterator yielding entries |
| // updated after dbtime, ordered by time of update. |
| // |
| // In typical usage, a client stores the latest e.DBTime it has observed |
| // and then uses that value in a future call to After to find only |
| // recent entries. The [Watcher] encapsulates that pattern and |
| // adds mutual exclusion so that multiple processes or goroutines |
| // using the same Watcher will not run concurrently. |
| package timed |
| |
| import ( |
| "iter" |
| "log/slog" |
| "sync/atomic" |
| "time" |
| |
| "golang.org/x/oscar/internal/storage" |
| "rsc.io/ordered" |
| ) |
| |
| // A DBTime is an opaque timestamp representing the time when |
| // a database entry was last set. Timestamps increase monotonically: |
| // comparing two times from entries of the same kind indicates |
| // which entry was written first. |
| // Otherwise, timestamps are opaque and have no specific meaning. |
| type DBTime int64 |
| |
| var lastTime atomic.Int64 |
| |
| // now returns the current DBTime. |
| // The implementation assumes accurate time-keeping on the systems where it runs, |
| // so that if the program is restarted, the new instance will not see times before |
| // the ones the old instance did. |
| // |
| // Packages storing information in the database can create separate |
| // indexes by DBTime to enable incremental processing of new data. |
| func now() DBTime { |
| for { |
| old := lastTime.Load() |
| t := time.Now().UnixNano() |
| if t <= old { |
| t = old + 1 |
| } |
| if lastTime.CompareAndSwap(old, t) { |
| return DBTime(t) |
| } |
| } |
| } |
| |
| // An Entry is a single entry written to the time-indexed storage. |
| type Entry struct { |
| ModTime DBTime // time entry was written |
| Kind string // key namespace |
| Key []byte // key |
| Val []byte // value |
| } |
| |
| // Set adds to b the database updates to set (kind, key) → val, |
| // including updating the time index. |
| // It returns the DBTime associated with the entry. |
| func Set(db storage.DB, b storage.Batch, kind string, key, val []byte) DBTime { |
| t := now() |
| dkey := append(ordered.Encode(kind), key...) |
| if old, ok := db.Get(dkey); ok { |
| var oldT int64 |
| if _, err := ordered.DecodePrefix(old, &oldT); err != nil { |
| // unreachable unless corrupt storage |
| db.Panic("timed.Set decode old", "dkey", storage.Fmt(dkey), "old", storage.Fmt(old), "err", err) |
| } |
| b.Delete(append(ordered.Encode(kind+"ByTime", oldT), key...)) |
| } |
| b.Set(append(ordered.Encode(kind+"ByTime", int64(t)), key...), nil) |
| b.Set(dkey, append(ordered.Encode(int64(t)), val...)) |
| return t |
| } |
| |
| // Delete adds to b the database updates to delete the value corresponding to (kind, key), if any. |
| func Delete(db storage.DB, b storage.Batch, kind string, key []byte) { |
| dkey := append(ordered.Encode(kind), key...) |
| dval, ok := db.Get(dkey) |
| if !ok { |
| return |
| } |
| var t int64 |
| if _, err := ordered.DecodePrefix(dval, &t); err != nil { |
| // unreachable unless corrupt storage |
| db.Panic("timed.Delete decode dval", "dkey", storage.Fmt(dkey), "dval", storage.Fmt(dval), "err", err) |
| } |
| b.Delete(dkey) |
| b.Delete(append(ordered.Encode(kind+"ByTime", t), key...)) |
| } |
| |
| // Get retrieves the value corresponding to (kind, key). |
| func Get(db storage.DB, kind string, key []byte) (*Entry, bool) { |
| dkey := append(ordered.Encode(kind), key...) |
| dval, ok := db.Get(dkey) |
| if !ok { |
| return nil, false |
| } |
| var t int64 |
| val, err := ordered.DecodePrefix(dval, &t) |
| if err != nil { |
| // unreachable unless corrupt storage |
| db.Panic("GetTimed decode", "dkey", storage.Fmt(dkey), "dval", storage.Fmt(dval), "err", err) |
| } |
| return &Entry{DBTime(t), kind, key, val}, true |
| } |
| |
| // Scan returns an iterator over entries (kind, key) → val with start ≤ key ≤ end. |
| func Scan(db storage.DB, kind string, start, end []byte) iter.Seq[*Entry] { |
| dstart := append(ordered.Encode(kind), start...) |
| dend := append(ordered.Encode(kind), end...) |
| return func(yield func(*Entry) bool) { |
| for dkey, dvalf := range db.Scan(dstart, dend) { |
| dval := dvalf() |
| key, err := ordered.DecodePrefix(dkey, nil) // drop kind |
| if err != nil { |
| // unreachable unless corrupt storage |
| db.Panic("ScanTimed decode", "dkey", storage.Fmt(dkey), "err", err) |
| } |
| var t int64 |
| val, err := ordered.DecodePrefix(dval, &t) |
| if err != nil { |
| // unreachable unless corrupt storage |
| db.Panic("ScanTimed decode", "dkey", storage.Fmt(dkey), "dval", storage.Fmt(dval), "err", err) |
| } |
| if !yield(&Entry{DBTime(t), kind, key, val}) { |
| return |
| } |
| } |
| } |
| } |
| |
| // DeleteRange adds to b the database updates to delete all entries |
| // (kind, key) → val with start ≤ key ≤ end. |
| // To allow deleting an arbitrarily large range, DeleteRange calls |
| // b.MaybeApply after adding the updates for each (kind, key). |
| // The effect is that a large range deletion may not be applied atomically to db. |
| // The caller still needs to use b.Apply to apply the final updates |
| // (or, for small ranges, all of them). |
| func DeleteRange(db storage.DB, b storage.Batch, kind string, start, end []byte) { |
| for e := range Scan(db, kind, start, end) { |
| b.Delete(append(ordered.Encode(kind), e.Key...)) |
| b.Delete(append(ordered.Encode(kind+"ByTime", int64(e.ModTime)), e.Key...)) |
| b.MaybeApply() |
| } |
| } |
| |
| // ScanAfter returns an iterator over entries in the database |
| // of the given kind that were set after DBTime t. |
| // If filter is non-nil, ScanAfter omits entries for which filter(e.Key) returns false |
| // and avoids the overhead of loading e.Val for those entries. |
| func ScanAfter(db storage.DB, kind string, t DBTime, filter func(key []byte) bool) iter.Seq[*Entry] { |
| return func(yield func(*Entry) bool) { |
| start, end := ordered.Encode(kind+"ByTime", int64(t+1)), ordered.Encode(kind+"ByTime", ordered.Inf) |
| for tkey := range db.Scan(start, end) { |
| var t int64 |
| key, err := ordered.DecodePrefix(tkey, nil, &t) // drop kind |
| if err != nil { |
| // unreachable unless corrupt storage |
| db.Panic("storage.After decode", "tkey", storage.Fmt(tkey), "err", err) |
| } |
| if filter != nil && !filter(key) { |
| continue |
| } |
| dkey := append(ordered.Encode(kind), key...) |
| dval, ok := db.Get(dkey) |
| if !ok { |
| // Stale entries might happen if Set is called multiple times |
| // for the same key in a single batch, along with a later Delete, |
| // or Set+Delete in a single batch. Ignore. |
| slog.Info("timed stale missing", "tkey", storage.Fmt(tkey), "dkey", storage.Fmt(dkey)) |
| continue |
| } |
| var t2 int64 |
| val, err := ordered.DecodePrefix(dval, &t2) |
| if err != nil { |
| // unreachable unless corrupt storage |
| db.Panic("storage.After val decode", "key", storage.Fmt(key), "dkey", storage.Fmt(dkey), "val", storage.Fmt(val), "err", err) |
| } |
| if t < t2 { |
| // Stale entries might happen if Set is called multiple times |
| // for the same key in a single batch. |
| // The second Set will not see the first one's time entry to delete it. |
| // These should be rare. |
| // Skip this one and wait until we see the index entry for t2. |
| slog.Info("timed stale out of order", "tkey", storage.Fmt(tkey), "dkey", storage.Fmt(dkey), "dval-time", t2) |
| continue |
| } |
| if t > t2 { |
| // unreachable unless corruption: |
| // new index entries with old data should not happen. |
| db.Panic("timed.ScanAfter mismatch", "tkey", storage.Fmt(tkey), "dkey", storage.Fmt(dkey), "dval", storage.Fmt(dval)) |
| } |
| if !yield(&Entry{DBTime(t), kind, key, val}) { |
| return |
| } |
| } |
| } |
| } |
| |
| // A Watcher is a named cursor over recently modified time-stamped key-value pairs |
| // (written using [Set]). |
| // The state of the cursor is stored in the underlying database so that |
| // it persists across program restarts and is shared by all clients of the |
| // database. |
| // |
| // Across all Watchers with the same db, kind, and name, database locks |
| // are used to ensure that only one at a time can iterate over [Watcher.Recent]. |
| // This provides coordination when multiple instances of a program that |
| // notice there is work to be done and more importantly avoids those |
| // instances conflicting with each other. |
| // |
| // The Watcher's state (most recent dbtime marked old) |
| // is stored in the underlying database using the key |
| // ordered.Encode(kind+"Watcher", name), |
| // and while a Watcher is iterating, it locks a database lock |
| // with the same name as that key. |
| type Watcher[T any] struct { |
| db storage.DB |
| dkey []byte |
| kind string |
| decode func(*Entry) T |
| locked atomic.Bool |
| latest atomic.Int64 // highest known DBTime marked old, for fast retrieval by metrics |
| } |
| |
| // NewWatcher returns a new named Watcher reading keys of the given kind from db. |
| // Use [Watcher.Recent] to iterate over recent entries. |
| // |
| // The name distinguishes this Watcher from other Watchers watching the same kind of keys |
| // for different purposes. |
| // |
| // The Watcher applies decode(e) to each time-stamped Entry to obtain the T returned |
| // in the iteration. |
| func NewWatcher[T any](db storage.DB, name, kind string, decode func(*Entry) T) *Watcher[T] { |
| return &Watcher[T]{ |
| db: db, |
| dkey: ordered.Encode(kind+"Watcher", name), |
| kind: kind, |
| decode: decode, |
| } |
| } |
| |
| func (w *Watcher[T]) lock() { |
| if w.locked.Load() { |
| w.db.Panic("timed.Watcher already locked") |
| } |
| w.db.Lock(string(w.dkey)) |
| w.locked.Store(true) |
| } |
| |
| func (w *Watcher[T]) unlock() { |
| if !w.locked.Load() { |
| w.db.Panic("timed.Watcher not locked") |
| } |
| w.db.Unlock(string(w.dkey)) |
| w.locked.Store(false) |
| } |
| |
| func (w *Watcher[T]) cutoff() DBTime { |
| if !w.locked.Load() { |
| // unreachable unless called wrong in this file |
| w.db.Panic("timed.Watcher not locked") |
| } |
| var t int64 |
| if dval, ok := w.db.Get(w.dkey); ok { |
| if err := ordered.Decode(dval, &t); err != nil { |
| // unreachable unless corrupt storage |
| w.db.Panic("watcher decode", "dval", storage.Fmt(dval), "err", err) |
| } |
| } |
| if w.latest.Load() < t { |
| w.latest.Store(t) |
| } |
| return DBTime(t) |
| } |
| |
| // Recent returns an iterator over recent entries, |
| // meaning those entries set after the last time recorded using [Watcher.MarkOld]. |
| // The events are ordered by the time they were last [Set] (that is, by ModTime). |
| // |
| // When the iterator is invoked or ranged over, it acquires a database lock |
| // corresponding to (db, name, kind), to prevent other racing instances of |
| // the same Watcher from visiting the same entries. |
| // It releases the lock automatically when the iteration ends or is stopped. |
| // |
| // The iterator must be used from only a single goroutine at a time, |
| // or else it will panic reporting “Watcher already locked”. |
| // This means that while an iterator can be used multiple times in |
| // sequence, it cannot be used from multiple goroutines, |
| // nor can a new iteration be started inside an existing one. |
| // (If a different process holds the lock, the iterator will wait for that process. |
| // The in-process lock check aims to diagnose simple deadlocks.) |
| func (w *Watcher[T]) Recent() iter.Seq[T] { |
| return func(yield func(T) bool) { |
| w.lock() |
| defer func() { |
| w.Flush() |
| w.unlock() |
| }() |
| |
| for t := range ScanAfter(w.db, w.kind, w.cutoff(), nil) { |
| if !yield(w.decode(t)) { |
| return |
| } |
| } |
| } |
| } |
| |
| // Restart resets the event watcher so that the next iteration over new events |
| // will start at the earliest possible event. |
| // In effect, Restart undoes all previous calls to MarkOld. |
| // Restart must be not be called during an iteration. |
| func (w *Watcher[T]) Restart() { |
| w.lock() |
| defer w.unlock() |
| |
| w.db.Delete(w.dkey) |
| } |
| |
| // MarkOld marks entries at or before t as “old”, |
| // meaning they will no longer be iterated over by [Watcher.Recent]. |
| // A future call to Recent, perhaps even on a different Watcher |
| // with the same configuration, |
| // will start its iteration starting immediately after time t. |
| // |
| // If a newer time t has already been marked “old” in this watcher, |
| // then MarkOld(t) is a no-op. |
| // |
| // MarkOld must only be called during an iteration over Recent, |
| // so that the database lock corresponding to this Watcher is held. |
| // In the case of a process crash before the iteration completes, |
| // the effect of MarkOld may be lost. |
| // Calling [Watcher.Flush] forces the state change to the underlying database. |
| func (w *Watcher[T]) MarkOld(t DBTime) { |
| if !w.locked.Load() { |
| w.db.Panic("timed.Watcher.MarkOld unlocked") |
| } |
| if t <= w.cutoff() { |
| return |
| } |
| w.db.Set(w.dkey, ordered.Encode(int64(t))) |
| w.latest.Store(int64(t)) |
| } |
| |
| // Flush flushes the definition of recent (changed by MarkOld) to the database. |
| // Flush is called automatically at the end of an iteration, |
| // but it can be called explicitly during a long iteration as well. |
| func (w *Watcher[T]) Flush() { |
| w.db.Flush() |
| } |
| |
| // Latest returns the latest known DBTime marked old by the Watcher. |
| // It does not require the lock to be held. |
| // |
| // Latest may return a stale value. In particular, it will return |
| // 0 until the first call to [Watcher.Recent] or [Watcher.MarkOld] in |
| // the process. |
| func (w *Watcher[T]) Latest() DBTime { |
| return DBTime(w.latest.Load()) |
| } |