blob: 9ffa0ea24f38b382c390a12077f8b12dbe2664ed [file] [log] [blame]
// Copyright 2022 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package relui
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log"
"strings"
"sync"
"time"
"github.com/jackc/pgx/v4"
"github.com/robfig/cron/v3"
"golang.org/x/build/internal/relui/db"
"golang.org/x/exp/slices"
)
// ScheduleType determines whether a workflow runs immediately or on
// some future date or cadence.
type ScheduleType string
// ElementID returns a string suitable for a HTML element ID.
func (s ScheduleType) ElementID() string {
return strings.ReplaceAll(string(s), " ", "")
}
// FormField returns a string representing which datatype to present
// the user on the creation form.
func (s ScheduleType) FormField() string {
switch s {
case ScheduleCron:
return "cron"
case ScheduleOnce:
return "datetime-local"
}
return ""
}
const (
ScheduleImmediate ScheduleType = "Immediate"
ScheduleOnce ScheduleType = "Future Date"
ScheduleCron ScheduleType = "Cron"
)
var (
ScheduleTypes = []ScheduleType{ScheduleImmediate, ScheduleOnce, ScheduleCron}
)
// Schedule represents the interval on which a job should be run. Only
// Type and one other field should be set.
type Schedule struct {
Once time.Time
Cron string
Type ScheduleType
}
func (s Schedule) Parse() (cron.Schedule, error) {
if err := s.Valid(); err != nil {
return nil, err
}
switch s.Type {
case ScheduleOnce:
return &RunOnce{next: s.Once}, nil
case ScheduleCron:
return cron.ParseStandard(s.Cron)
}
return nil, fmt.Errorf("unschedulable Schedule.Type %q", s.Type)
}
func (s Schedule) Valid() error {
switch s.Type {
case ScheduleOnce:
if s.Once.IsZero() {
return fmt.Errorf("time not set for %q", ScheduleOnce)
}
return nil
case ScheduleCron:
_, err := cron.ParseStandard(s.Cron)
return err
case ScheduleImmediate:
return nil
}
return fmt.Errorf("invalid ScheduleType %q", s.Type)
}
func (s *Schedule) setType() {
switch {
case !s.Once.IsZero():
s.Type = ScheduleOnce
case s.Cron != "":
s.Type = ScheduleCron
default:
s.Type = ScheduleImmediate
}
}
// NewScheduler returns a Scheduler ready to run jobs.
func NewScheduler(db db.PGDBTX, w *Worker) *Scheduler {
c := cron.New()
c.Start()
return &Scheduler{
w: w,
cron: c,
db: db,
}
}
type Scheduler struct {
w *Worker
cron *cron.Cron
db db.PGDBTX
// failed is populated by Resume with schedules that
// failed to resume due to a change in workflow definition.
failedMu sync.Mutex
failed []FailedToScheduleEntry
}
// Create schedules a job and records it in the database.
func (s *Scheduler) Create(ctx context.Context, sched Schedule, workflowName string, params map[string]any) (row db.Schedule, err error) {
def := s.w.dh.Definition(workflowName)
if def == nil {
return row, fmt.Errorf("no workflow named %q", workflowName)
}
m, err := json.Marshal(params)
if err != nil {
return row, err
}
// Validate parameters against workflow definition before enqueuing.
params, err = UnmarshalWorkflow(string(m), def)
if err != nil {
return row, err
}
cronSched, err := sched.Parse()
if err != nil {
return row, err
}
err = s.db.BeginFunc(ctx, func(tx pgx.Tx) error {
now := time.Now()
q := db.New(tx)
row, err = q.CreateSchedule(ctx, db.CreateScheduleParams{
WorkflowName: workflowName,
WorkflowParams: sql.NullString{String: string(m), Valid: len(m) > 0},
Once: sched.Once,
Spec: sched.Cron,
CreatedAt: now,
UpdatedAt: now,
})
if err != nil {
return err
}
return nil
})
s.cron.Schedule(cronSched, &WorkflowSchedule{Schedule: row, worker: s.w, Params: params})
return row, err
}
// Resume fetches schedules from the database and schedules them.
func (s *Scheduler) Resume(ctx context.Context) error {
q := db.New(s.db)
rows, err := q.Schedules(ctx)
if err != nil {
return err
}
for _, row := range rows {
def := s.w.dh.Definition(row.WorkflowName)
if def == nil {
log.Printf("Unable to schedule %q (schedule.id: %d): no definition found", row.WorkflowName, row.ID)
continue
}
sched := Schedule{Once: row.Once, Cron: row.Spec}
sched.setType()
if sched.Type == ScheduleOnce && row.Once.Before(time.Now()) {
log.Printf("Skipping %q Schedule (schedule.id: %d): %q is in the past", sched.Type, row.ID, sched.Once.String())
continue
}
cronSched, err := sched.Parse()
if err != nil {
log.Printf("Unable to schedule %q (schedule.id %d): invalid Schedule: %q", row.WorkflowName, row.ID, err)
continue
}
params, err := UnmarshalWorkflow(row.WorkflowParams.String, def)
if err != nil {
log.Printf("Error in UnmarshalWorkflow(%q, %q) for schedule %d: %q", row.WorkflowParams.String, row.WorkflowName, row.ID, err)
s.failed = append(s.failed, FailedToScheduleEntry{
WorkflowSchedule: WorkflowSchedule{Schedule: row},
ParsedSchedule: cronSched,
})
continue
}
s.cron.Schedule(cronSched, &WorkflowSchedule{
Schedule: row,
Params: params,
worker: s.w,
})
}
return nil
}
// Entries returns a list of scheduled jobs, and a list of jobs that failed to schedule.
//
// The scheduled jobs are filtered by workflowNames, if provided, otherwise all scheduled
// jobs are included.
func (s *Scheduler) Entries(ctx context.Context, workflowNames ...string) ([]ScheduleEntry, []FailedToScheduleEntry) {
q := db.New(s.db)
rows, err := q.SchedulesLastRun(ctx)
if err != nil {
log.Println("leaving out last runs of scheduled workflows because SchedulesLastRun failed:", err)
rows = nil
}
rowMap := make(map[int32]db.SchedulesLastRunRow)
for _, row := range rows {
rowMap[row.ID] = row
}
entries := s.cron.Entries()
ses := make([]ScheduleEntry, 0, len(entries))
for _, e := range s.cron.Entries() {
entry := ScheduleEntry{Entry: e}
if len(workflowNames) != 0 && !slices.Contains(workflowNames, entry.WorkflowJob().Schedule.WorkflowName) {
continue
}
if row, ok := rowMap[entry.WorkflowJob().Schedule.ID]; ok {
entry.LastRun = row
}
ses = append(ses, entry)
}
s.failedMu.Lock()
fes := slices.Clone(s.failed)
s.failedMu.Unlock()
for i, fe := range fes {
if row, ok := rowMap[fe.Schedule.ID]; ok {
fes[i].LastRun = row
}
}
return ses, fes
}
var ErrScheduleNotFound = errors.New("schedule not found")
// Delete removes a schedule from the scheduler, preventing subsequent
// runs, and deletes the schedule from the database.
//
// Jobs in progress are not interrupted, but will be prevented from
// starting again.
func (s *Scheduler) Delete(ctx context.Context, id int) error {
// Look for a schedule with the given id, which may have been either successfully or unsuccessfully scheduled,
// and delete it from memory.
entries := s.cron.Entries()
s.failedMu.Lock()
defer s.failedMu.Unlock()
i := slices.IndexFunc(entries, func(e cron.Entry) bool { return int(e.Job.(*WorkflowSchedule).Schedule.ID) == id })
j := slices.IndexFunc(s.failed, func(e FailedToScheduleEntry) bool { return int(e.Schedule.ID) == id })
switch {
case i >= 0 && j == -1:
entry := entries[i]
s.cron.Remove(entry.ID)
case j >= 0 && i == -1:
s.failed = slices.Delete(s.failed, j, j+1)
case i == -1 && j == -1:
// No such schedule found.
return ErrScheduleNotFound
default:
return fmt.Errorf("internal error: Scheduler.Delete: impossible case i == %d, j == %d", i, j)
}
// Next, delete the schedule with the specified id from the database.
return s.db.BeginFunc(ctx, func(tx pgx.Tx) error {
q := db.New(tx)
if _, err := q.ClearWorkflowSchedule(ctx, int32(id)); err != nil {
return err
}
if _, err := q.DeleteSchedule(ctx, int32(id)); err != nil {
return err
}
return nil
})
}
type ScheduleEntry struct {
// Entry is the cron entry.
//
// Its Job field holds a *WorkflowSchedule that describes the scheduled
// workflow and how to start running a new instance of it.
cron.Entry
LastRun db.SchedulesLastRunRow
}
type FailedToScheduleEntry struct {
WorkflowSchedule
ParsedSchedule cron.Schedule
LastRun db.SchedulesLastRunRow
}
func (e FailedToScheduleEntry) Next() time.Time { return e.ParsedSchedule.Next(time.Now()) }
// WorkflowJob returns a *WorkflowSchedule for the ScheduleEntry.
func (s *ScheduleEntry) WorkflowJob() *WorkflowSchedule {
return s.Job.(*WorkflowSchedule)
}
// WorkflowSchedule represents the data needed to create a Workflow.
type WorkflowSchedule struct {
Schedule db.Schedule
Params map[string]any
worker *Worker
}
// Run starts a Workflow.
func (w *WorkflowSchedule) Run() {
id, err := w.worker.StartWorkflow(context.Background(), w.Schedule.WorkflowName, w.Params, int(w.Schedule.ID))
log.Printf("StartWorkflow(_, %q, %v, %d) = %q, %q", w.Schedule.WorkflowName, w.Params, w.Schedule.ID, id, err)
}
// ScheduleDesc returns a description of the schedule.
func (w WorkflowSchedule) ScheduleDesc() string {
switch {
case !w.Schedule.Once.IsZero():
return "Run once on the future date."
case w.Schedule.Spec != "":
return fmt.Sprintf("Using the cron schedule %q.", w.Schedule.Spec)
default:
return ""
}
}
// ParamDesc returns a description of the parameters.
func (w WorkflowSchedule) ParamDesc() string {
if w.Params == nil {
// If parameters couldn't be unmarshaled, it's because the workflow definition changed.
// The best description we can offer are the originally marshaled parameters, indented.
var buf bytes.Buffer
err := json.Indent(&buf, []byte(w.Schedule.WorkflowParams.String), "", "\t")
if err != nil {
return "failed to indent workflow parameters: " + err.Error()
}
return buf.String()
}
b, err := json.MarshalIndent(w.Params, "", "\t")
if err != nil {
return "failed to marshal workflow parameters: " + err.Error()
}
return string(b)
}
// RunOnce is a cron.Schedule for running a job at a specific time.
type RunOnce struct {
next time.Time
}
// Next returns the next time a job should run.
func (r *RunOnce) Next(t time.Time) time.Time {
if t.After(r.next) {
return time.Time{}
}
return r.next
}
var _ cron.Schedule = &RunOnce{}