blob: 8c3d647fb017a97c3e8797c9b807f1fd0136fb03 [file] [log] [blame]
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package jobs
import (
"context"
"time"
"cloud.google.com/go/firestore"
"golang.org/x/pkgsite-metrics/internal/derrors"
"golang.org/x/pkgsite-metrics/internal/fstore"
"google.golang.org/api/iterator"
)
const jobCollection = "Jobs"
type DB struct {
ns *fstore.Namespace
}
// NewDB creates a new database client for jobs.
func NewDB(ctx context.Context, projectID, namespace string) (_ *DB, err error) {
ns, err := fstore.OpenNamespace(ctx, projectID, namespace)
if err != nil {
return nil, err
}
return &DB{ns}, nil
}
// CreateJob creates a new job. It returns an error if a job with the same ID already exists.
func (d *DB) CreateJob(ctx context.Context, j *Job) (err error) {
id := j.ID()
defer derrors.Wrap(&err, "job.DB.CreateJob(%s)", id)
_, err = d.jobRef(id).Create(ctx, j)
return err
}
// DeleteJob deletes the job with the given ID. It does not return an error if the job doesn't exist.
func (d *DB) DeleteJob(ctx context.Context, id string) (err error) {
defer derrors.Wrap(&err, "job.DB.DeleteJob(%s)", id)
_, err = d.jobRef(id).Delete(ctx)
return err
}
// GetJob retrieves the job with the given ID. It returns an error if the job does not exist.
func (d *DB) GetJob(ctx context.Context, id string) (_ *Job, err error) {
defer derrors.Wrap(&err, "job.DB.GetJob(%s)", id)
return fstore.Get[Job](ctx, d.jobRef(id))
}
// UpdateJob gets the job with the given ID, which must exist, then calls f on
// it, then writes it back to the database. These actions occur atomically.
// If f returns an error, that error is returned and no update occurs.
func (d *DB) UpdateJob(ctx context.Context, id string, f func(*Job) error) (err error) {
defer derrors.Wrap(&err, "job.DB.UpdateJob(%s)", id)
return d.ns.Client().RunTransaction(ctx, func(ctx context.Context, tx *firestore.Transaction) error {
docref := d.jobRef(id)
docsnap, err := tx.Get(docref)
if err != nil {
return err
}
j, err := fstore.Decode[Job](docsnap)
if err != nil {
return err
}
if err := f(j); err != nil {
return err
}
return tx.Set(docref, j)
},
firestore.MaxAttempts(firestore.DefaultTransactionMaxAttempts*5))
}
// Increment value named name by n.
func (d *DB) Increment(ctx context.Context, id, name string, n int) (err error) {
defer derrors.Wrap(&err, "job.DB.Increment(%s)", id)
docref := d.jobRef(id)
_, err = docref.Update(ctx, []firestore.Update{
{Path: name, Value: firestore.Increment(n)}, // name will incremented by n
})
return err
}
// ListJobs calls f on each job in the DB, most recently started first.
// f is also passed the time that the job was last updated.
// If f returns a non-nil error, the iteration stops and returns that error.
func (d *DB) ListJobs(ctx context.Context, f func(_ *Job, lastUpdate time.Time) error) (err error) {
defer derrors.Wrap(&err, "job.DB.ListJobs()")
q := d.ns.Collection(jobCollection).OrderBy("StartedAt", firestore.Desc)
iter := q.Documents(ctx)
defer iter.Stop()
for {
docsnap, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return err
}
job, err := fstore.Decode[Job](docsnap)
if err != nil {
return err
}
if err := f(job, docsnap.UpdateTime); err != nil {
return err
}
}
return nil
}
// jobRef returns the DocumentRef for a job with the given ID.
func (d *DB) jobRef(id string) *firestore.DocumentRef {
return d.ns.Collection(jobCollection).Doc(id)
}