blob: c6c5cb67c77952805fa7674915d8538a272eae4c [file] [log] [blame]
// Copyright 2022 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package relui
import (
"context"
"database/sql"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/robfig/cron/v3"
"golang.org/x/build/internal/relui/db"
)
func mustParseSpec(t *testing.T, spec string) cron.Schedule {
t.Helper()
sched, err := cron.ParseStandard(spec)
if err != nil {
t.Fatalf("cron.ParseStandard(%q) = %q, wanted no error", spec, err)
}
return sched
}
func TestSchedulerCreate(t *testing.T) {
now := time.Now()
cases := []struct {
desc string
sched Schedule
workflowName string
params map[string]any
want db.Schedule
wantEntries []ScheduleEntry
wantErr bool
}{
{
desc: "success: once",
sched: Schedule{Once: now.AddDate(1, 0, 0), Type: ScheduleOnce},
workflowName: "echo",
params: map[string]any{"greeting": "hello", "farewell": "bye"},
want: db.Schedule{
WorkflowName: "echo",
WorkflowParams: sql.NullString{
String: `{"farewell": "bye", "greeting": "hello"}`,
Valid: true,
},
Once: now.AddDate(1, 0, 0),
CreatedAt: now,
UpdatedAt: now,
},
wantEntries: []ScheduleEntry{
{Entry: cron.Entry{
Schedule: &RunOnce{next: now.UTC().AddDate(1, 0, 0)},
Next: now.UTC().AddDate(1, 0, 0),
Job: &WorkflowSchedule{
Schedule: db.Schedule{
WorkflowName: "echo",
WorkflowParams: sql.NullString{
String: `{"farewell": "bye", "greeting": "hello"}`,
Valid: true,
},
Once: now.UTC().AddDate(1, 0, 0),
CreatedAt: now,
UpdatedAt: now,
},
Params: map[string]any{"greeting": "hello", "farewell": "bye"},
},
}},
},
},
{
desc: "success: cron",
sched: Schedule{Cron: "* * * * *", Type: ScheduleCron},
workflowName: "echo",
params: map[string]any{"greeting": "hello", "farewell": "bye"},
want: db.Schedule{
WorkflowName: "echo",
WorkflowParams: sql.NullString{
String: `{"farewell": "bye", "greeting": "hello"}`,
Valid: true,
},
Spec: "* * * * *",
CreatedAt: now,
UpdatedAt: now,
},
wantEntries: []ScheduleEntry{
{Entry: cron.Entry{
Schedule: mustParseSpec(t, "* * * * *"),
Next: now.Add(time.Minute),
Job: &WorkflowSchedule{
Schedule: db.Schedule{
WorkflowName: "echo",
WorkflowParams: sql.NullString{
String: `{"farewell": "bye", "greeting": "hello"}`,
Valid: true,
},
Spec: "* * * * *",
CreatedAt: now,
UpdatedAt: now,
},
Params: map[string]any{"greeting": "hello", "farewell": "bye"},
},
}},
},
},
{
desc: "error: invalid Schedule",
sched: Schedule{Type: ScheduleImmediate},
workflowName: "echo",
params: map[string]any{"greeting": "hello", "farewell": "bye"},
wantErr: true,
wantEntries: []ScheduleEntry{},
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := testDB(ctx, t)
s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{DB: p}))
row, err := s.Create(ctx, c.sched, c.workflowName, c.params)
if (err != nil) != c.wantErr {
t.Fatalf("s.Create(_, %v, %q, %v) = %v, %v, wantErr: %t", c.sched, c.workflowName, c.params, row, err, c.wantErr)
}
if diff := cmp.Diff(c.want, row, cmpopts.EquateApproxTime(time.Minute), cmpopts.IgnoreFields(db.Schedule{}, "ID")); diff != "" {
t.Fatalf("s.Create() mismatch (-want +got):\n%s", diff)
}
got := s.Entries()
diffOpts := []cmp.Option{
cmpopts.EquateApproxTime(time.Minute),
cmpopts.IgnoreFields(db.Schedule{}, "ID"),
cmpopts.IgnoreUnexported(RunOnce{}, WorkflowSchedule{}, time.Location{}),
cmpopts.IgnoreFields(ScheduleEntry{}, "ID", "LastRun.ID", "WrappedJob"),
}
if diff := cmp.Diff(c.wantEntries, got, diffOpts...); diff != "" {
t.Fatalf("s.Entries() mismatch (-want +got):\n%s", diff)
}
})
}
}
func TestSchedulerResume(t *testing.T) {
now := time.Now()
cases := []struct {
desc string
scheds []db.CreateScheduleParams
want []ScheduleEntry
wantParams map[string]any
wantErr bool
}{
{
desc: "success: once",
scheds: []db.CreateScheduleParams{
{
WorkflowName: "echo",
WorkflowParams: sql.NullString{
String: `{"farewell": "bye", "greeting": "hello"}`,
Valid: true,
},
Once: now.UTC().AddDate(1, 0, 0),
CreatedAt: now,
UpdatedAt: now,
},
},
want: []ScheduleEntry{
{Entry: cron.Entry{
Schedule: &RunOnce{next: now.UTC().AddDate(1, 0, 0)},
Next: now.UTC().AddDate(1, 0, 0),
Job: &WorkflowSchedule{
Schedule: db.Schedule{
WorkflowName: "echo",
WorkflowParams: sql.NullString{
String: `{"farewell": "bye", "greeting": "hello"}`,
Valid: true,
},
Once: now.UTC().AddDate(1, 0, 0),
CreatedAt: now,
UpdatedAt: now,
},
Params: map[string]any{"greeting": "hello", "farewell": "bye"},
},
}},
},
},
{
desc: "success: cron",
scheds: []db.CreateScheduleParams{
{
WorkflowName: "echo",
WorkflowParams: sql.NullString{
String: `{"farewell": "bye", "greeting": "hello"}`,
Valid: true,
},
Spec: "* * * * *",
CreatedAt: now,
UpdatedAt: now,
},
},
want: []ScheduleEntry{
{Entry: cron.Entry{
Schedule: mustParseSpec(t, "* * * * *"),
Next: now.Add(time.Minute),
Job: &WorkflowSchedule{
Schedule: db.Schedule{
WorkflowName: "echo",
WorkflowParams: sql.NullString{
String: `{"farewell": "bye", "greeting": "hello"}`,
Valid: true,
},
Spec: "* * * * *",
CreatedAt: now,
UpdatedAt: now,
},
Params: map[string]any{"greeting": "hello", "farewell": "bye"},
},
}},
},
},
{
desc: "skip past RunOnce schedules",
scheds: []db.CreateScheduleParams{
{
WorkflowName: "echo",
WorkflowParams: sql.NullString{
String: `{"farewell": "bye", "greeting": "hello"}`,
Valid: true,
},
Once: time.Now().AddDate(-1, 0, 0),
CreatedAt: now,
UpdatedAt: now,
},
},
want: []ScheduleEntry{},
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := testDB(ctx, t)
q := db.New(p)
s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{DB: p}))
for _, csp := range c.scheds {
if _, err := q.CreateSchedule(ctx, csp); err != nil {
t.Fatalf("q.CreateSchedule(_, %#v) = _, %v, wanted no error", csp, err)
}
}
if err := s.Resume(ctx); err != nil {
t.Errorf("s.Resume() = %v, wanted no error", err)
}
got := s.Entries()
diffOpts := []cmp.Option{
cmpopts.EquateApproxTime(time.Minute),
cmpopts.IgnoreFields(db.Schedule{}, "ID"),
cmpopts.IgnoreUnexported(RunOnce{}, WorkflowSchedule{}, time.Location{}),
cmpopts.IgnoreFields(ScheduleEntry{}, "ID", "LastRun.ID", "WrappedJob"),
}
if diff := cmp.Diff(c.want, got, diffOpts...); diff != "" {
t.Fatalf("s.Entries() mismatch (-want +got):\n%s", diff)
}
})
}
}
func TestScheduleDelete(t *testing.T) {
now := time.Now()
cases := []struct {
desc string
sched Schedule
workflowName string
params map[string]any
wantErr bool
wantEntries []ScheduleEntry
want []db.Schedule
wantWorkflows []db.Workflow
}{
{
desc: "success",
sched: Schedule{Once: now.AddDate(1, 0, 0), Type: ScheduleOnce},
workflowName: "echo",
params: map[string]any{"greeting": "hello", "farewell": "bye"},
wantEntries: []ScheduleEntry{},
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
p := testDB(ctx, t)
q := db.New(p)
s := NewScheduler(p, NewWorker(NewDefinitionHolder(), p, &PGListener{DB: p}))
row, err := s.Create(ctx, c.sched, c.workflowName, c.params)
if err != nil {
t.Fatalf("s.Create(_, %v, %q, %v) = %v, %v, wanted no error", c.sched, c.workflowName, c.params, row, err)
}
// simulate a single run
wfid, err := s.w.StartWorkflow(ctx, c.workflowName, c.params, int(row.ID))
if err != nil {
t.Fatalf("s.w.StartWorkflow(_, %q, %v, %d) = %q, %v, wanted no error", c.workflowName, c.params, row.ID, wfid.String(), err)
}
err = s.Delete(ctx, int(row.ID))
if (err != nil) != c.wantErr {
t.Fatalf("s.Delete(%d) = %v, wantErr: %t", row.ID, err, c.wantErr)
}
entries := s.Entries()
diffOpts := []cmp.Option{
cmpopts.EquateApproxTime(time.Minute),
cmpopts.IgnoreFields(db.Schedule{}, "ID"),
cmpopts.IgnoreUnexported(RunOnce{}, WorkflowSchedule{}, time.Location{}),
cmpopts.IgnoreFields(ScheduleEntry{}, "ID", "LastRun.ID", "WrappedJob"),
}
if c.sched.Type == ScheduleCron {
diffOpts = append(diffOpts, cmpopts.IgnoreFields(ScheduleEntry{}, "Next"))
}
if diff := cmp.Diff(c.wantEntries, entries, diffOpts...); diff != "" {
t.Errorf("s.Entries() mismatch (-want +got):\n%s", diff)
}
got, err := q.Schedules(ctx)
if err != nil {
t.Fatalf("q.Schedules() = %v, %v, wanted no error", got, err)
}
if diff := cmp.Diff(c.want, got, diffOpts...); diff != "" {
t.Errorf("q.Schedules() mismatch (-want +got):\n%s", diff)
}
wfs, err := q.Workflows(ctx)
if err != nil {
t.Fatalf("q.Workflows() = %v, %v, wanted no error", wfs, err)
}
if len(wfs) != 1 {
t.Errorf("len(q.Workflows()) = %d, wanted %d", len(wfs), 1)
}
for _, w := range wfs {
if w.ScheduleID.Int32 == row.ID {
t.Errorf("w.ScheduleID = %d, wanted != %d", w.ScheduleID.Int32, row.ID)
}
}
})
}
}