internal/relui: create cron-based workflow schedules
Adds functionality to schedule Workflows using a standard cron-spec.
Updates resume to skip RunOnce schedules that are in the past.
For golang/go#54476
Change-Id: I30f5bb45353a88ca9398b8787ee7ea44ae96d7e1
Reviewed-on: https://go-review.googlesource.com/c/build/+/432397
Auto-Submit: Jenny Rakoczy <jenny@golang.org>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Heschi Kreinick <heschi@google.com>
Run-TryBot: Jenny Rakoczy <jenny@golang.org>
diff --git a/internal/relui/schedule.go b/internal/relui/schedule.go
index ef03be3..ee2eb86 100644
--- a/internal/relui/schedule.go
+++ b/internal/relui/schedule.go
@@ -52,10 +52,49 @@
// Schedule represents the interval on which a job should be run. Only
// Type and one other field should be set.
type Schedule struct {
- Once time.Time
- Duration time.Duration
- Cron string
- Type ScheduleType
+ Once time.Time
+ Cron string
+ Type ScheduleType
+}
+
+func (s Schedule) Parse() (cron.Schedule, error) {
+ if err := s.Valid(); err != nil {
+ return nil, err
+ }
+ switch s.Type {
+ case ScheduleOnce:
+ return &RunOnce{next: s.Once}, nil
+ case ScheduleCron:
+ return cron.ParseStandard(s.Cron)
+ }
+ return nil, fmt.Errorf("unschedulable Schedule.Type %q", s.Type)
+}
+
+func (s Schedule) Valid() error {
+ switch s.Type {
+ case ScheduleOnce:
+ if s.Once.IsZero() {
+ return fmt.Errorf("time not set for %q", ScheduleOnce)
+ }
+ return nil
+ case ScheduleCron:
+ _, err := cron.ParseStandard(s.Cron)
+ return err
+ case ScheduleImmediate:
+ return nil
+ }
+ return fmt.Errorf("invalid ScheduleType %q", s.Type)
+}
+
+func (s *Schedule) setType() {
+ switch {
+ case !s.Once.IsZero():
+ s.Type = ScheduleOnce
+ case s.Cron != "":
+ s.Type = ScheduleCron
+ default:
+ s.Type = ScheduleImmediate
+ }
}
// NewScheduler returns a Scheduler ready to run jobs.
@@ -90,6 +129,10 @@
if err != nil {
return row, err
}
+ cronSched, err := sched.Parse()
+ if err != nil {
+ return row, err
+ }
err = s.db.BeginFunc(ctx, func(tx pgx.Tx) error {
now := time.Now()
q := db.New(tx)
@@ -97,6 +140,7 @@
WorkflowName: workflowName,
WorkflowParams: sql.NullString{String: string(m), Valid: len(m) > 0},
Once: sched.Once,
+ Spec: sched.Cron,
CreatedAt: now,
UpdatedAt: now,
})
@@ -105,30 +149,43 @@
}
return nil
})
- s.cron.Schedule(&RunOnce{next: sched.Once}, &WorkflowSchedule{Schedule: row, worker: s.w, Params: params})
+ s.cron.Schedule(cronSched, &WorkflowSchedule{Schedule: row, worker: s.w, Params: params})
return row, err
}
// Resume fetches schedules from the database and schedules them.
func (s *Scheduler) Resume(ctx context.Context) error {
q := db.New(s.db)
- scheds, err := q.Schedules(ctx)
+ rows, err := q.Schedules(ctx)
if err != nil {
return err
}
- for _, sched := range scheds {
- def := s.w.dh.Definition(sched.WorkflowName)
+ for _, row := range rows {
+ def := s.w.dh.Definition(row.WorkflowName)
if def == nil {
- log.Printf("Unable to schedule %q (schedule.id: %d): no definition found", sched.WorkflowName, sched.ID)
+ log.Printf("Unable to schedule %q (schedule.id: %d): no definition found", row.WorkflowName, row.ID)
continue
}
- params, err := UnmarshalWorkflow(sched.WorkflowParams.String, def)
+ params, err := UnmarshalWorkflow(row.WorkflowParams.String, def)
if err != nil {
- log.Printf("Error in UnmarshalWorkflow(%q, %q) for schedule %d: %q", sched.WorkflowParams.String, sched.WorkflowName, sched.ID, err)
+ log.Printf("Error in UnmarshalWorkflow(%q, %q) for schedule %d: %q", row.WorkflowParams.String, row.WorkflowName, row.ID, err)
continue
}
- s.cron.Schedule(&RunOnce{next: sched.Once}, &WorkflowSchedule{
- Schedule: sched,
+ sched := Schedule{Once: row.Once, Cron: row.Spec}
+ sched.setType()
+ if sched.Type == ScheduleOnce && row.Once.Before(time.Now()) {
+ log.Printf("Skipping %q Schedule (schedule.id: %d): %q is in the past", sched.Type, row.ID, sched.Once.String())
+ continue
+ }
+
+ cronSched, err := sched.Parse()
+ if err != nil {
+ log.Printf("Unable to schedule %q (schedule.id %d): invalid Schedule: %q", row.WorkflowName, row.ID, err)
+ continue
+ }
+
+ s.cron.Schedule(cronSched, &WorkflowSchedule{
+ Schedule: row,
Params: params,
worker: s.w,
})
diff --git a/internal/relui/schedule_test.go b/internal/relui/schedule_test.go
index 55e15a5..789c0cf 100644
--- a/internal/relui/schedule_test.go
+++ b/internal/relui/schedule_test.go
@@ -12,9 +12,19 @@
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
+ "github.com/robfig/cron/v3"
"golang.org/x/build/internal/relui/db"
)
+func mustParseSpec(t *testing.T, spec string) cron.Schedule {
+ t.Helper()
+ sched, err := cron.ParseStandard(spec)
+ if err != nil {
+ t.Fatalf("cron.ParseStandard(%q) = %q, wanted no error", spec, err)
+ }
+ return sched
+}
+
func TestSchedulerCreate(t *testing.T) {
now := time.Now()
cases := []struct {
@@ -28,7 +38,7 @@
}{
{
desc: "success: once",
- sched: Schedule{Once: now.AddDate(1, 0, 0)},
+ sched: Schedule{Once: now.AddDate(1, 0, 0), Type: ScheduleOnce},
workflowName: "echo",
params: map[string]any{"greeting": "hello", "farewell": "bye"},
want: db.Schedule{
@@ -61,6 +71,49 @@
},
},
},
+ {
+ desc: "success: cron",
+ sched: Schedule{Cron: "* * * * *", Type: ScheduleCron},
+ workflowName: "echo",
+ params: map[string]any{"greeting": "hello", "farewell": "bye"},
+ want: db.Schedule{
+ WorkflowName: "echo",
+ WorkflowParams: sql.NullString{
+ String: `{"farewell": "bye", "greeting": "hello"}`,
+ Valid: true,
+ },
+ Spec: "* * * * *",
+ CreatedAt: now,
+ UpdatedAt: now,
+ },
+ wantEntries: []ScheduleEntry{
+ {
+ Schedule: mustParseSpec(t, "* * * * *"),
+ Next: now.Add(time.Minute),
+ Job: &WorkflowSchedule{
+ Schedule: db.Schedule{
+ WorkflowName: "echo",
+ WorkflowParams: sql.NullString{
+ String: `{"farewell": "bye", "greeting": "hello"}`,
+ Valid: true,
+ },
+ Spec: "* * * * *",
+ CreatedAt: now,
+ UpdatedAt: now,
+ },
+ Params: map[string]any{"greeting": "hello", "farewell": "bye"},
+ },
+ },
+ },
+ },
+ {
+ desc: "error: invalid Schedule",
+ sched: Schedule{Type: ScheduleImmediate},
+ workflowName: "echo",
+ params: map[string]any{"greeting": "hello", "farewell": "bye"},
+ wantErr: true,
+ wantEntries: []ScheduleEntry{},
+ },
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
@@ -69,8 +122,8 @@
p := testDB(ctx, t)
s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}))
row, err := s.Create(ctx, c.sched, c.workflowName, c.params)
- if err != nil {
- t.Fatalf("s.Create(_, %v, %q, %v) = %v, %v, wanted no error", c.sched, c.workflowName, c.params, row, err)
+ if (err != nil) != c.wantErr {
+ t.Fatalf("s.Create(_, %v, %q, %v) = %v, %v, wantErr: %t", c.sched, c.workflowName, c.params, row, err, c.wantErr)
}
if diff := cmp.Diff(c.want, row, cmpopts.EquateApproxTime(time.Minute), cmpopts.IgnoreFields(db.Schedule{}, "ID")); diff != "" {
t.Fatalf("s.Create() mismatch (-want +got):\n%s", diff)
@@ -80,7 +133,7 @@
diffOpts := []cmp.Option{
cmpopts.EquateApproxTime(time.Minute),
cmpopts.IgnoreFields(db.Schedule{}, "ID"),
- cmpopts.IgnoreUnexported(RunOnce{}, WorkflowSchedule{}),
+ cmpopts.IgnoreUnexported(RunOnce{}, WorkflowSchedule{}, time.Location{}),
cmpopts.IgnoreFields(ScheduleEntry{}, "ID", "WrappedJob"),
}
if diff := cmp.Diff(c.wantEntries, got, diffOpts...); diff != "" {
@@ -133,6 +186,56 @@
},
},
},
+ {
+ desc: "success: cron",
+ scheds: []db.CreateScheduleParams{
+ {
+ WorkflowName: "echo",
+ WorkflowParams: sql.NullString{
+ String: `{"farewell": "bye", "greeting": "hello"}`,
+ Valid: true,
+ },
+ Spec: "* * * * *",
+ CreatedAt: now,
+ UpdatedAt: now,
+ },
+ },
+ want: []ScheduleEntry{
+ {
+ Schedule: mustParseSpec(t, "* * * * *"),
+ Next: now.Add(time.Minute),
+ Job: &WorkflowSchedule{
+ Schedule: db.Schedule{
+ WorkflowName: "echo",
+ WorkflowParams: sql.NullString{
+ String: `{"farewell": "bye", "greeting": "hello"}`,
+ Valid: true,
+ },
+ Spec: "* * * * *",
+ CreatedAt: now,
+ UpdatedAt: now,
+ },
+ Params: map[string]any{"greeting": "hello", "farewell": "bye"},
+ },
+ },
+ },
+ },
+ {
+ desc: "skip past RunOnce schedules",
+ scheds: []db.CreateScheduleParams{
+ {
+ WorkflowName: "echo",
+ WorkflowParams: sql.NullString{
+ String: `{"farewell": "bye", "greeting": "hello"}`,
+ Valid: true,
+ },
+ Once: time.Now().AddDate(-1, 0, 0),
+ CreatedAt: now,
+ UpdatedAt: now,
+ },
+ },
+ want: []ScheduleEntry{},
+ },
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
@@ -155,7 +258,7 @@
diffOpts := []cmp.Option{
cmpopts.EquateApproxTime(time.Minute),
cmpopts.IgnoreFields(db.Schedule{}, "ID"),
- cmpopts.IgnoreUnexported(RunOnce{}, WorkflowSchedule{}),
+ cmpopts.IgnoreUnexported(RunOnce{}, WorkflowSchedule{}, time.Location{}),
cmpopts.IgnoreFields(ScheduleEntry{}, "ID", "WrappedJob"),
}
if diff := cmp.Diff(c.want, got, diffOpts...); diff != "" {
diff --git a/internal/relui/templates/new_workflow.html b/internal/relui/templates/new_workflow.html
index 8d21ec3..956d6d8 100644
--- a/internal/relui/templates/new_workflow.html
+++ b/internal/relui/templates/new_workflow.html
@@ -60,7 +60,7 @@
<div class="NewWorkflow-parameter">
<label for="workflow.schedule.cron">Run on a cron schedule (minute hour day-of-month month day-of-week):</label>
<input type="text" id="workflow.schedule.cron" name="workflow.schedule.cron" placeholder="* * * * *" title="Valid Cron-syntax string"
- pattern="/(\S+ \S+ \S+ \S+ \S+ *)|@(hourly|daily|weekly|monthly|yearly|annually|midnight)/"/>
+ pattern="(\S+ \S+ \S+ \S+ \S+ *)|@(hourly|daily|weekly|monthly|yearly|annually|midnight)"/>
</div>
{{else}}
<div class="NewWorkflow-parameter">
diff --git a/internal/relui/web.go b/internal/relui/web.go
index 9e8fe43..5a240d6 100644
--- a/internal/relui/web.go
+++ b/internal/relui/web.go
@@ -376,14 +376,24 @@
return
}
}
- if r.FormValue("workflow.schedule") == string(ScheduleOnce) {
- t, err := time.ParseInLocation(DatetimeLocalLayout, r.FormValue("workflow.schedule.datetime"), time.UTC)
- if err != nil || t.Before(time.Now()) {
- http.Error(w, fmt.Sprintf("parameter %q parsing error: %v", "workflow.schedule.datetime", err), http.StatusBadRequest)
+ sched := Schedule{Type: ScheduleType(r.FormValue("workflow.schedule"))}
+ if sched.Type != ScheduleImmediate {
+ switch sched.Type {
+ case ScheduleOnce:
+ t, err := time.ParseInLocation(DatetimeLocalLayout, r.FormValue("workflow.schedule.datetime"), time.UTC)
+ if err != nil || t.Before(time.Now()) {
+ http.Error(w, fmt.Sprintf("parameter %q parsing error: %v", "workflow.schedule.datetime", err), http.StatusBadRequest)
+ return
+ }
+ sched.Once = t
+ case ScheduleCron:
+ sched.Cron = r.FormValue("workflow.schedule.cron")
+ }
+ if err := sched.Valid(); err != nil {
+ http.Error(w, fmt.Sprintf("parameter %q parsing error: %v", "workflow.schedule", err), http.StatusBadRequest)
return
}
- _, err = s.scheduler.Create(r.Context(), Schedule{Once: t}, name, params)
- if err != nil {
+ if _, err := s.scheduler.Create(r.Context(), sched, name, params); err != nil {
http.Error(w, fmt.Sprintf("failed to create schedule: %v", err), http.StatusInternalServerError)
return
}
diff --git a/internal/relui/web_test.go b/internal/relui/web_test.go
index 1682491..1382674 100644
--- a/internal/relui/web_test.go
+++ b/internal/relui/web_test.go
@@ -10,6 +10,7 @@
"embed"
"flag"
"fmt"
+ "io"
"io/ioutil"
"log"
"net/http"
@@ -187,13 +188,19 @@
wantCode: http.StatusBadRequest,
},
{
- desc: "invalid workflow name",
- params: url.Values{"workflow.name": []string{"invalid"}},
+ desc: "invalid workflow name",
+ params: url.Values{
+ "workflow.name": []string{"invalid"},
+ "workflow.schedule": []string{string(ScheduleImmediate)},
+ },
wantCode: http.StatusBadRequest,
},
{
- desc: "missing workflow params",
- params: url.Values{"workflow.name": []string{"echo"}},
+ desc: "missing workflow params",
+ params: url.Values{
+ "workflow.name": []string{"echo"},
+ "workflow.schedule": []string{string(ScheduleImmediate)},
+ },
wantCode: http.StatusBadRequest,
},
{
@@ -202,6 +209,7 @@
"workflow.name": []string{"echo"},
"workflow.params.greeting": []string{"hello"},
"workflow.params.farewell": []string{"bye"},
+ "workflow.schedule": []string{string(ScheduleImmediate)},
},
wantCode: http.StatusSeeOther,
wantWorkflows: []db.Workflow{
@@ -255,6 +263,26 @@
},
},
},
+ {
+ desc: "successful creation: schedule cron",
+ params: url.Values{
+ "workflow.name": []string{"echo"},
+ "workflow.params.greeting": []string{"hello"},
+ "workflow.params.farewell": []string{"bye"},
+ "workflow.schedule": []string{string(ScheduleCron)},
+ "workflow.schedule.cron": []string{"0 0 1 1 0"},
+ },
+ wantCode: http.StatusSeeOther,
+ wantSchedules: []db.Schedule{
+ {
+ WorkflowName: "echo",
+ WorkflowParams: nullString(`{"farewell": "bye", "greeting": "hello"}`),
+ Spec: "0 0 1 1 0",
+ CreatedAt: now, // cmpopts.EquateApproxTime
+ UpdatedAt: now, // cmpopts.EquateApproxTime
+ },
+ },
+ },
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
@@ -270,6 +298,10 @@
if resp.StatusCode != c.wantCode {
t.Errorf("rep.StatusCode = %d, wanted %d", resp.StatusCode, c.wantCode)
+ if resp.StatusCode == http.StatusBadRequest {
+ b, _ := io.ReadAll(resp.Body)
+ t.Logf("resp.Body: \n%v", string(b))
+ }
}
if c.wantCode == http.StatusBadRequest {
return