| // Copyright 2022 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package relui |
| |
| import ( |
| "context" |
| "database/sql" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "log" |
| "strings" |
| "time" |
| |
| "github.com/jackc/pgx/v4" |
| "github.com/robfig/cron/v3" |
| "golang.org/x/build/internal/relui/db" |
| "golang.org/x/exp/slices" |
| ) |
| |
// ScheduleType describes when a workflow should run: right away, once
// at some future date, or repeatedly on a cadence.
type ScheduleType string

// ElementID returns the schedule type with every space removed, which
// makes it suitable for use as an HTML element ID.
func (s ScheduleType) ElementID() string {
	const space, nothing = " ", ""
	return strings.ReplaceAll(string(s), space, nothing)
}

// FormField returns the name of the datatype the creation form should
// present to the user for this schedule type. Types without a dedicated
// field, such as ScheduleImmediate, yield the empty string.
func (s ScheduleType) FormField() string {
	if s == ScheduleOnce {
		return "datetime-local"
	}
	if s == ScheduleCron {
		return "cron"
	}
	return ""
}

// The schedule types understood by this package.
const (
	// ScheduleImmediate is the type for a workflow that runs immediately.
	ScheduleImmediate ScheduleType = "Immediate"

	// ScheduleOnce is the type for a workflow that runs at a future date.
	ScheduleOnce ScheduleType = "Future Date"

	// ScheduleCron is the type for a workflow that runs on a cron cadence.
	ScheduleCron ScheduleType = "Cron"
)

// ScheduleTypes lists every defined ScheduleType.
var ScheduleTypes = []ScheduleType{
	ScheduleImmediate,
	ScheduleOnce,
	ScheduleCron,
}
| |
| // Schedule represents the interval on which a job should be run. Only |
| // Type and one other field should be set. |
| type Schedule struct { |
| Once time.Time |
| Cron string |
| Type ScheduleType |
| } |
| |
| func (s Schedule) Parse() (cron.Schedule, error) { |
| if err := s.Valid(); err != nil { |
| return nil, err |
| } |
| switch s.Type { |
| case ScheduleOnce: |
| return &RunOnce{next: s.Once}, nil |
| case ScheduleCron: |
| return cron.ParseStandard(s.Cron) |
| } |
| return nil, fmt.Errorf("unschedulable Schedule.Type %q", s.Type) |
| } |
| |
| func (s Schedule) Valid() error { |
| switch s.Type { |
| case ScheduleOnce: |
| if s.Once.IsZero() { |
| return fmt.Errorf("time not set for %q", ScheduleOnce) |
| } |
| return nil |
| case ScheduleCron: |
| _, err := cron.ParseStandard(s.Cron) |
| return err |
| case ScheduleImmediate: |
| return nil |
| } |
| return fmt.Errorf("invalid ScheduleType %q", s.Type) |
| } |
| |
| func (s *Schedule) setType() { |
| switch { |
| case !s.Once.IsZero(): |
| s.Type = ScheduleOnce |
| case s.Cron != "": |
| s.Type = ScheduleCron |
| default: |
| s.Type = ScheduleImmediate |
| } |
| } |
| |
| // NewScheduler returns a Scheduler ready to run jobs. |
| func NewScheduler(db db.PGDBTX, w *Worker) *Scheduler { |
| c := cron.New() |
| c.Start() |
| return &Scheduler{ |
| w: w, |
| cron: c, |
| db: db, |
| } |
| } |
| |
| type Scheduler struct { |
| w *Worker |
| cron *cron.Cron |
| db db.PGDBTX |
| } |
| |
| // Create schedules a job and records it in the database. |
| func (s *Scheduler) Create(ctx context.Context, sched Schedule, workflowName string, params map[string]any) (row db.Schedule, err error) { |
| def := s.w.dh.Definition(workflowName) |
| if def == nil { |
| return row, fmt.Errorf("no workflow named %q", workflowName) |
| } |
| m, err := json.Marshal(params) |
| if err != nil { |
| return row, err |
| } |
| // Validate parameters against workflow definition before enqueuing. |
| params, err = UnmarshalWorkflow(string(m), def) |
| if err != nil { |
| return row, err |
| } |
| cronSched, err := sched.Parse() |
| if err != nil { |
| return row, err |
| } |
| err = s.db.BeginFunc(ctx, func(tx pgx.Tx) error { |
| now := time.Now() |
| q := db.New(tx) |
| row, err = q.CreateSchedule(ctx, db.CreateScheduleParams{ |
| WorkflowName: workflowName, |
| WorkflowParams: sql.NullString{String: string(m), Valid: len(m) > 0}, |
| Once: sched.Once, |
| Spec: sched.Cron, |
| CreatedAt: now, |
| UpdatedAt: now, |
| }) |
| if err != nil { |
| return err |
| } |
| return nil |
| }) |
| s.cron.Schedule(cronSched, &WorkflowSchedule{Schedule: row, worker: s.w, Params: params}) |
| return row, err |
| } |
| |
| // Resume fetches schedules from the database and schedules them. |
| func (s *Scheduler) Resume(ctx context.Context) error { |
| q := db.New(s.db) |
| rows, err := q.Schedules(ctx) |
| if err != nil { |
| return err |
| } |
| for _, row := range rows { |
| def := s.w.dh.Definition(row.WorkflowName) |
| if def == nil { |
| log.Printf("Unable to schedule %q (schedule.id: %d): no definition found", row.WorkflowName, row.ID) |
| continue |
| } |
| params, err := UnmarshalWorkflow(row.WorkflowParams.String, def) |
| if err != nil { |
| log.Printf("Error in UnmarshalWorkflow(%q, %q) for schedule %d: %q", row.WorkflowParams.String, row.WorkflowName, row.ID, err) |
| continue |
| } |
| sched := Schedule{Once: row.Once, Cron: row.Spec} |
| sched.setType() |
| if sched.Type == ScheduleOnce && row.Once.Before(time.Now()) { |
| log.Printf("Skipping %q Schedule (schedule.id: %d): %q is in the past", sched.Type, row.ID, sched.Once.String()) |
| continue |
| } |
| |
| cronSched, err := sched.Parse() |
| if err != nil { |
| log.Printf("Unable to schedule %q (schedule.id %d): invalid Schedule: %q", row.WorkflowName, row.ID, err) |
| continue |
| } |
| |
| s.cron.Schedule(cronSched, &WorkflowSchedule{ |
| Schedule: row, |
| Params: params, |
| worker: s.w, |
| }) |
| } |
| return nil |
| } |
| |
| // Entries returns a slice of active jobs. |
| // |
| // Entries are filtered by workflowNames. An empty slice returns |
| // all entries. |
| func (s *Scheduler) Entries(workflowNames ...string) []ScheduleEntry { |
| q := db.New(s.db) |
| rows, err := q.SchedulesLastRun(context.Background()) |
| if err != nil { |
| log.Printf("q.SchedulesLastRun() = _, %q, wanted no error", err) |
| } |
| rowMap := make(map[int32]db.SchedulesLastRunRow) |
| for _, row := range rows { |
| rowMap[row.ID] = row |
| } |
| entries := s.cron.Entries() |
| ret := make([]ScheduleEntry, 0, len(entries)) |
| for _, e := range s.cron.Entries() { |
| entry := ScheduleEntry{Entry: e} |
| if len(workflowNames) != 0 && !slices.Contains(workflowNames, entry.WorkflowJob().Schedule.WorkflowName) { |
| continue |
| } |
| if row, ok := rowMap[entry.WorkflowJob().Schedule.ID]; ok { |
| entry.LastRun = row |
| } |
| ret = append(ret, entry) |
| } |
| return ret |
| } |
| |
var (
	// ErrScheduleNotFound is returned by Scheduler.Delete when no active
	// entry matches the requested schedule ID.
	ErrScheduleNotFound = errors.New("schedule not found")
)
| |
| // Delete removes a schedule from the scheduler, preventing subsequent |
| // runs, and deletes the schedule from the database. |
| // |
| // Jobs in progress are not interrupted, but will be prevented from |
| // starting again. |
| func (s *Scheduler) Delete(ctx context.Context, id int) error { |
| entries := s.Entries() |
| i := slices.IndexFunc(entries, func(e ScheduleEntry) bool { return int(e.WorkflowJob().Schedule.ID) == id }) |
| if i == -1 { |
| return ErrScheduleNotFound |
| } |
| entry := entries[i] |
| s.cron.Remove(entry.ID) |
| return s.db.BeginFunc(ctx, func(tx pgx.Tx) error { |
| q := db.New(tx) |
| if _, err := q.ClearWorkflowSchedule(ctx, int32(id)); err != nil { |
| return err |
| } |
| if _, err := q.DeleteSchedule(ctx, int32(id)); err != nil { |
| return err |
| } |
| return nil |
| }) |
| } |
| |
| type ScheduleEntry struct { |
| cron.Entry |
| LastRun db.SchedulesLastRunRow |
| } |
| |
| // type ScheduleEntry cron.Entry |
| |
| // WorkflowJob returns a *WorkflowSchedule for the ScheduleEntry. |
| func (s *ScheduleEntry) WorkflowJob() *WorkflowSchedule { |
| return s.Job.(*WorkflowSchedule) |
| } |
| |
| // WorkflowSchedule represents the data needed to create a Workflow. |
| type WorkflowSchedule struct { |
| Schedule db.Schedule |
| Params map[string]any |
| worker *Worker |
| } |
| |
| // Run starts a Workflow. |
| func (w *WorkflowSchedule) Run() { |
| id, err := w.worker.StartWorkflow(context.Background(), w.Schedule.WorkflowName, w.Params, int(w.Schedule.ID)) |
| log.Printf("StartWorkflow(_, %q, %v, %d) = %q, %q", w.Schedule.WorkflowName, w.Params, w.Schedule.ID, id, err) |
| } |
| |
// RunOnce is a cron.Schedule for running a job at a specific time.
type RunOnce struct {
	// next is the single time at which the job should run.
	next time.Time
}

// Next returns the time the job should run if that time is strictly
// later than t, and the zero time otherwise.
//
// The comparison is strict so that a call with t equal to the run time
// does not hand back the same instant a second time.
func (r *RunOnce) Next(t time.Time) time.Time {
	if !r.next.After(t) {
		return time.Time{}
	}
	return r.next
}
| |
| var _ cron.Schedule = &RunOnce{} |