internal/relui: create cron-based workflow schedules

Adds functionality to schedule Workflows using a standard cron-spec.
Updates resume to skip RunOnce schedules that are in the past.

For golang/go#54476

Change-Id: I30f5bb45353a88ca9398b8787ee7ea44ae96d7e1
Reviewed-on: https://go-review.googlesource.com/c/build/+/432397
Auto-Submit: Jenny Rakoczy <jenny@golang.org>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Heschi Kreinick <heschi@google.com>
Run-TryBot: Jenny Rakoczy <jenny@golang.org>
diff --git a/internal/relui/schedule.go b/internal/relui/schedule.go
index ef03be3..ee2eb86 100644
--- a/internal/relui/schedule.go
+++ b/internal/relui/schedule.go
@@ -52,10 +52,49 @@
 // Schedule represents the interval on which a job should be run. Only
 // Type and one other field should be set.
 type Schedule struct {
-	Once     time.Time
-	Duration time.Duration
-	Cron     string
-	Type     ScheduleType
+	Once time.Time
+	Cron string
+	Type ScheduleType
+}
+
+func (s Schedule) Parse() (cron.Schedule, error) {
+	if err := s.Valid(); err != nil {
+		return nil, err
+	}
+	switch s.Type {
+	case ScheduleOnce:
+		return &RunOnce{next: s.Once}, nil
+	case ScheduleCron:
+		return cron.ParseStandard(s.Cron)
+	}
+	return nil, fmt.Errorf("unschedulable Schedule.Type %q", s.Type)
+}
+
+func (s Schedule) Valid() error {
+	switch s.Type {
+	case ScheduleOnce:
+		if s.Once.IsZero() {
+			return fmt.Errorf("time not set for %q", ScheduleOnce)
+		}
+		return nil
+	case ScheduleCron:
+		_, err := cron.ParseStandard(s.Cron)
+		return err
+	case ScheduleImmediate:
+		return nil
+	}
+	return fmt.Errorf("invalid ScheduleType %q", s.Type)
+}
+
+func (s *Schedule) setType() {
+	switch {
+	case !s.Once.IsZero():
+		s.Type = ScheduleOnce
+	case s.Cron != "":
+		s.Type = ScheduleCron
+	default:
+		s.Type = ScheduleImmediate
+	}
 }
 
 // NewScheduler returns a Scheduler ready to run jobs.
@@ -90,6 +129,10 @@
 	if err != nil {
 		return row, err
 	}
+	cronSched, err := sched.Parse()
+	if err != nil {
+		return row, err
+	}
 	err = s.db.BeginFunc(ctx, func(tx pgx.Tx) error {
 		now := time.Now()
 		q := db.New(tx)
@@ -97,6 +140,7 @@
 			WorkflowName:   workflowName,
 			WorkflowParams: sql.NullString{String: string(m), Valid: len(m) > 0},
 			Once:           sched.Once,
+			Spec:           sched.Cron,
 			CreatedAt:      now,
 			UpdatedAt:      now,
 		})
@@ -105,30 +149,43 @@
 		}
 		return nil
 	})
-	s.cron.Schedule(&RunOnce{next: sched.Once}, &WorkflowSchedule{Schedule: row, worker: s.w, Params: params})
+	s.cron.Schedule(cronSched, &WorkflowSchedule{Schedule: row, worker: s.w, Params: params})
 	return row, err
 }
 
 // Resume fetches schedules from the database and schedules them.
 func (s *Scheduler) Resume(ctx context.Context) error {
 	q := db.New(s.db)
-	scheds, err := q.Schedules(ctx)
+	rows, err := q.Schedules(ctx)
 	if err != nil {
 		return err
 	}
-	for _, sched := range scheds {
-		def := s.w.dh.Definition(sched.WorkflowName)
+	for _, row := range rows {
+		def := s.w.dh.Definition(row.WorkflowName)
 		if def == nil {
-			log.Printf("Unable to schedule %q (schedule.id: %d): no definition found", sched.WorkflowName, sched.ID)
+			log.Printf("Unable to schedule %q (schedule.id: %d): no definition found", row.WorkflowName, row.ID)
 			continue
 		}
-		params, err := UnmarshalWorkflow(sched.WorkflowParams.String, def)
+		params, err := UnmarshalWorkflow(row.WorkflowParams.String, def)
 		if err != nil {
-			log.Printf("Error in UnmarshalWorkflow(%q, %q) for schedule %d: %q", sched.WorkflowParams.String, sched.WorkflowName, sched.ID, err)
+			log.Printf("Error in UnmarshalWorkflow(%q, %q) for schedule %d: %q", row.WorkflowParams.String, row.WorkflowName, row.ID, err)
 			continue
 		}
-		s.cron.Schedule(&RunOnce{next: sched.Once}, &WorkflowSchedule{
-			Schedule: sched,
+		sched := Schedule{Once: row.Once, Cron: row.Spec}
+		sched.setType()
+		if sched.Type == ScheduleOnce && row.Once.Before(time.Now()) {
+			log.Printf("Skipping %q Schedule (schedule.id: %d): %q is in the past", sched.Type, row.ID, sched.Once.String())
+			continue
+		}
+
+		cronSched, err := sched.Parse()
+		if err != nil {
+			log.Printf("Unable to schedule %q (schedule.id %d): invalid Schedule: %q", row.WorkflowName, row.ID, err)
+			continue
+		}
+
+		s.cron.Schedule(cronSched, &WorkflowSchedule{
+			Schedule: row,
 			Params:   params,
 			worker:   s.w,
 		})
diff --git a/internal/relui/schedule_test.go b/internal/relui/schedule_test.go
index 55e15a5..789c0cf 100644
--- a/internal/relui/schedule_test.go
+++ b/internal/relui/schedule_test.go
@@ -12,9 +12,19 @@
 
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
+	"github.com/robfig/cron/v3"
 	"golang.org/x/build/internal/relui/db"
 )
 
+func mustParseSpec(t *testing.T, spec string) cron.Schedule {
+	t.Helper()
+	sched, err := cron.ParseStandard(spec)
+	if err != nil {
+		t.Fatalf("cron.ParseStandard(%q) = %q, wanted no error", spec, err)
+	}
+	return sched
+}
+
 func TestSchedulerCreate(t *testing.T) {
 	now := time.Now()
 	cases := []struct {
@@ -28,7 +38,7 @@
 	}{
 		{
 			desc:         "success: once",
-			sched:        Schedule{Once: now.AddDate(1, 0, 0)},
+			sched:        Schedule{Once: now.AddDate(1, 0, 0), Type: ScheduleOnce},
 			workflowName: "echo",
 			params:       map[string]any{"greeting": "hello", "farewell": "bye"},
 			want: db.Schedule{
@@ -61,6 +71,49 @@
 				},
 			},
 		},
+		{
+			desc:         "success: cron",
+			sched:        Schedule{Cron: "* * * * *", Type: ScheduleCron},
+			workflowName: "echo",
+			params:       map[string]any{"greeting": "hello", "farewell": "bye"},
+			want: db.Schedule{
+				WorkflowName: "echo",
+				WorkflowParams: sql.NullString{
+					String: `{"farewell": "bye", "greeting": "hello"}`,
+					Valid:  true,
+				},
+				Spec:      "* * * * *",
+				CreatedAt: now,
+				UpdatedAt: now,
+			},
+			wantEntries: []ScheduleEntry{
+				{
+					Schedule: mustParseSpec(t, "* * * * *"),
+					Next:     now.Add(time.Minute),
+					Job: &WorkflowSchedule{
+						Schedule: db.Schedule{
+							WorkflowName: "echo",
+							WorkflowParams: sql.NullString{
+								String: `{"farewell": "bye", "greeting": "hello"}`,
+								Valid:  true,
+							},
+							Spec:      "* * * * *",
+							CreatedAt: now,
+							UpdatedAt: now,
+						},
+						Params: map[string]any{"greeting": "hello", "farewell": "bye"},
+					},
+				},
+			},
+		},
+		{
+			desc:         "error: invalid Schedule",
+			sched:        Schedule{Type: ScheduleImmediate},
+			workflowName: "echo",
+			params:       map[string]any{"greeting": "hello", "farewell": "bye"},
+			wantErr:      true,
+			wantEntries:  []ScheduleEntry{},
+		},
 	}
 	for _, c := range cases {
 		t.Run(c.desc, func(t *testing.T) {
@@ -69,8 +122,8 @@
 			p := testDB(ctx, t)
 			s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}))
 			row, err := s.Create(ctx, c.sched, c.workflowName, c.params)
-			if err != nil {
-				t.Fatalf("s.Create(_, %v, %q, %v) = %v, %v, wanted no error", c.sched, c.workflowName, c.params, row, err)
+			if (err != nil) != c.wantErr {
+				t.Fatalf("s.Create(_, %v, %q, %v) = %v, %v, wantErr: %t", c.sched, c.workflowName, c.params, row, err, c.wantErr)
 			}
 			if diff := cmp.Diff(c.want, row, cmpopts.EquateApproxTime(time.Minute), cmpopts.IgnoreFields(db.Schedule{}, "ID")); diff != "" {
 				t.Fatalf("s.Create() mismatch (-want +got):\n%s", diff)
@@ -80,7 +133,7 @@
 			diffOpts := []cmp.Option{
 				cmpopts.EquateApproxTime(time.Minute),
 				cmpopts.IgnoreFields(db.Schedule{}, "ID"),
-				cmpopts.IgnoreUnexported(RunOnce{}, WorkflowSchedule{}),
+				cmpopts.IgnoreUnexported(RunOnce{}, WorkflowSchedule{}, time.Location{}),
 				cmpopts.IgnoreFields(ScheduleEntry{}, "ID", "WrappedJob"),
 			}
 			if diff := cmp.Diff(c.wantEntries, got, diffOpts...); diff != "" {
@@ -133,6 +186,56 @@
 				},
 			},
 		},
+		{
+			desc: "success: cron",
+			scheds: []db.CreateScheduleParams{
+				{
+					WorkflowName: "echo",
+					WorkflowParams: sql.NullString{
+						String: `{"farewell": "bye", "greeting": "hello"}`,
+						Valid:  true,
+					},
+					Spec:      "* * * * *",
+					CreatedAt: now,
+					UpdatedAt: now,
+				},
+			},
+			want: []ScheduleEntry{
+				{
+					Schedule: mustParseSpec(t, "* * * * *"),
+					Next:     now.Add(time.Minute),
+					Job: &WorkflowSchedule{
+						Schedule: db.Schedule{
+							WorkflowName: "echo",
+							WorkflowParams: sql.NullString{
+								String: `{"farewell": "bye", "greeting": "hello"}`,
+								Valid:  true,
+							},
+							Spec:      "* * * * *",
+							CreatedAt: now,
+							UpdatedAt: now,
+						},
+						Params: map[string]any{"greeting": "hello", "farewell": "bye"},
+					},
+				},
+			},
+		},
+		{
+			desc: "skip past RunOnce schedules",
+			scheds: []db.CreateScheduleParams{
+				{
+					WorkflowName: "echo",
+					WorkflowParams: sql.NullString{
+						String: `{"farewell": "bye", "greeting": "hello"}`,
+						Valid:  true,
+					},
+					Once:      time.Now().AddDate(-1, 0, 0),
+					CreatedAt: now,
+					UpdatedAt: now,
+				},
+			},
+			want: []ScheduleEntry{},
+		},
 	}
 	for _, c := range cases {
 		t.Run(c.desc, func(t *testing.T) {
@@ -155,7 +258,7 @@
 			diffOpts := []cmp.Option{
 				cmpopts.EquateApproxTime(time.Minute),
 				cmpopts.IgnoreFields(db.Schedule{}, "ID"),
-				cmpopts.IgnoreUnexported(RunOnce{}, WorkflowSchedule{}),
+				cmpopts.IgnoreUnexported(RunOnce{}, WorkflowSchedule{}, time.Location{}),
 				cmpopts.IgnoreFields(ScheduleEntry{}, "ID", "WrappedJob"),
 			}
 			if diff := cmp.Diff(c.want, got, diffOpts...); diff != "" {
diff --git a/internal/relui/templates/new_workflow.html b/internal/relui/templates/new_workflow.html
index 8d21ec3..956d6d8 100644
--- a/internal/relui/templates/new_workflow.html
+++ b/internal/relui/templates/new_workflow.html
@@ -60,7 +60,7 @@
                 <div class="NewWorkflow-parameter">
                   <label for="workflow.schedule.cron">Run on a cron schedule (minute hour day-of-month month day-of-week):</label>
                   <input type="text" id="workflow.schedule.cron" name="workflow.schedule.cron" placeholder="* * * * *" title="Valid Cron-syntax string"
-                         pattern="/(\S+ \S+ \S+ \S+ \S+ *)|@(hourly|daily|weekly|monthly|yearly|annually|midnight)/"/>
+                         pattern="(\S+ \S+ \S+ \S+ \S+ *)|@(hourly|daily|weekly|monthly|yearly|annually|midnight)"/>
                 </div>
               {{else}}
                 <div class="NewWorkflow-parameter">
diff --git a/internal/relui/web.go b/internal/relui/web.go
index 9e8fe43..5a240d6 100644
--- a/internal/relui/web.go
+++ b/internal/relui/web.go
@@ -376,14 +376,24 @@
 			return
 		}
 	}
-	if r.FormValue("workflow.schedule") == string(ScheduleOnce) {
-		t, err := time.ParseInLocation(DatetimeLocalLayout, r.FormValue("workflow.schedule.datetime"), time.UTC)
-		if err != nil || t.Before(time.Now()) {
-			http.Error(w, fmt.Sprintf("parameter %q parsing error: %v", "workflow.schedule.datetime", err), http.StatusBadRequest)
+	sched := Schedule{Type: ScheduleType(r.FormValue("workflow.schedule"))}
+	if sched.Type != ScheduleImmediate {
+		switch sched.Type {
+		case ScheduleOnce:
+			t, err := time.ParseInLocation(DatetimeLocalLayout, r.FormValue("workflow.schedule.datetime"), time.UTC)
+			if err != nil || t.Before(time.Now()) {
+				http.Error(w, fmt.Sprintf("parameter %q parsing error: %v", "workflow.schedule.datetime", err), http.StatusBadRequest)
+				return
+			}
+			sched.Once = t
+		case ScheduleCron:
+			sched.Cron = r.FormValue("workflow.schedule.cron")
+		}
+		if err := sched.Valid(); err != nil {
+			http.Error(w, fmt.Sprintf("parameter %q parsing error: %v", "workflow.schedule", err), http.StatusBadRequest)
 			return
 		}
-		_, err = s.scheduler.Create(r.Context(), Schedule{Once: t}, name, params)
-		if err != nil {
+		if _, err := s.scheduler.Create(r.Context(), sched, name, params); err != nil {
 			http.Error(w, fmt.Sprintf("failed to create schedule: %v", err), http.StatusInternalServerError)
 			return
 		}
diff --git a/internal/relui/web_test.go b/internal/relui/web_test.go
index 1682491..1382674 100644
--- a/internal/relui/web_test.go
+++ b/internal/relui/web_test.go
@@ -10,6 +10,7 @@
 	"embed"
 	"flag"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"log"
 	"net/http"
@@ -187,13 +188,19 @@
 			wantCode: http.StatusBadRequest,
 		},
 		{
-			desc:     "invalid workflow name",
-			params:   url.Values{"workflow.name": []string{"invalid"}},
+			desc: "invalid workflow name",
+			params: url.Values{
+				"workflow.name":     []string{"invalid"},
+				"workflow.schedule": []string{string(ScheduleImmediate)},
+			},
 			wantCode: http.StatusBadRequest,
 		},
 		{
-			desc:     "missing workflow params",
-			params:   url.Values{"workflow.name": []string{"echo"}},
+			desc: "missing workflow params",
+			params: url.Values{
+				"workflow.name":     []string{"echo"},
+				"workflow.schedule": []string{string(ScheduleImmediate)},
+			},
 			wantCode: http.StatusBadRequest,
 		},
 		{
@@ -202,6 +209,7 @@
 				"workflow.name":            []string{"echo"},
 				"workflow.params.greeting": []string{"hello"},
 				"workflow.params.farewell": []string{"bye"},
+				"workflow.schedule":        []string{string(ScheduleImmediate)},
 			},
 			wantCode: http.StatusSeeOther,
 			wantWorkflows: []db.Workflow{
@@ -255,6 +263,26 @@
 				},
 			},
 		},
+		{
+			desc: "successful creation: schedule cron",
+			params: url.Values{
+				"workflow.name":            []string{"echo"},
+				"workflow.params.greeting": []string{"hello"},
+				"workflow.params.farewell": []string{"bye"},
+				"workflow.schedule":        []string{string(ScheduleCron)},
+				"workflow.schedule.cron":   []string{"0 0 1 1 0"},
+			},
+			wantCode: http.StatusSeeOther,
+			wantSchedules: []db.Schedule{
+				{
+					WorkflowName:   "echo",
+					WorkflowParams: nullString(`{"farewell": "bye", "greeting": "hello"}`),
+					Spec:           "0 0 1 1 0",
+					CreatedAt:      now, // cmpopts.EquateApproxTime
+					UpdatedAt:      now, // cmpopts.EquateApproxTime
+				},
+			},
+		},
 	}
 	for _, c := range cases {
 		t.Run(c.desc, func(t *testing.T) {
@@ -270,6 +298,10 @@
 
 			if resp.StatusCode != c.wantCode {
 				t.Errorf("rep.StatusCode = %d, wanted %d", resp.StatusCode, c.wantCode)
+				if resp.StatusCode == http.StatusBadRequest {
+					b, _ := io.ReadAll(resp.Body)
+					t.Logf("resp.Body: \n%v", string(b))
+				}
 			}
 			if c.wantCode == http.StatusBadRequest {
 				return