// Copyright 2021 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package relui

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
	"github.com/google/uuid"
	"golang.org/x/build/internal/relui/db"
	"golang.org/x/build/internal/workflow"
)

func TestWorkerStartWorkflow(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	dbp := testDB(ctx, t)
	q := db.New(dbp)
	wg := sync.WaitGroup{}
	dh := NewDefinitionHolder()
	w := NewWorker(dh, dbp, &testWorkflowListener{
		Listener:   &PGListener{dbp},
		onFinished: wg.Done,
	})

	wd := newTestEchoWorkflow()
	dh.RegisterDefinition(t.Name(), wd)
	params := map[string]interface{}{"greeting": "greetings", "names": []string{"alice", "bob"}}

	wg.Add(1)
	wfid, err := w.StartWorkflow(ctx, t.Name(), params, 0)
	if err != nil {
		t.Fatalf("w.StartWorkflow(_, %v, %v) = %v, %v, wanted no error", wd, params, wfid, err)
	}
	go w.Run(ctx)
	wg.Wait()

	wfs, err := q.Workflows(ctx)
	if err != nil {
		t.Fatalf("q.Workflows() = %v, %v, wanted no error", wfs, err)
	}
	wantWfs := []db.Workflow{{
		ID: wfid,
		// Params ignored: nondeterministic serialization
		Name:      nullString(t.Name()),
		Output:    `{"echo": "greetings alice bob"}`,
		Finished:  true,
		CreatedAt: time.Now(), // cmpopts.EquateApproxTime
		UpdatedAt: time.Now(), // cmpopts.EquateApproxTime
	}}
	if diff := cmp.Diff(wantWfs, wfs, cmpopts.EquateApproxTime(time.Minute), cmpopts.IgnoreFields(db.Workflow{}, "Params")); diff != "" {
		t.Fatalf("q.Workflows() mismatch (-want +got):\n%s", diff)
	}
	tasks, err := q.TasksForWorkflow(ctx, wfid)
	if err != nil {
		t.Fatalf("q.TasksForWorkflow(_, %v) = %v, %v, wanted no error", wfid, tasks, err)
	}
	want := []db.Task{
		{
			WorkflowID: wfid,
			Name:       "echo",
			Started:    true,
			Finished:   true,
			Result:     nullString(`"greetings alice bob"`),
			Error:      sql.NullString{},
			CreatedAt:  time.Now(), // cmpopts.EquateApproxTime
			UpdatedAt:  time.Now(), // cmpopts.EquateApproxTime
		},
	}
	if diff := cmp.Diff(want, tasks, cmpopts.EquateApproxTime(time.Minute)); diff != "" {
		t.Errorf("q.TasksForWorkflow(_, %q) mismatch (-want +got):\n%s", wfid, diff)
	}
}

func TestWorkerResume(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	dbp := testDB(ctx, t)
	q := db.New(dbp)
	wg := sync.WaitGroup{}
	dh := NewDefinitionHolder()
	w := NewWorker(dh, dbp, &testWorkflowListener{
		Listener:   &PGListener{dbp},
		onFinished: wg.Done,
	})

	wd := newTestEchoWorkflow()
	dh.RegisterDefinition(t.Name(), wd)
	wfid := createUnfinishedEchoWorkflow(t, ctx, q)

	wg.Add(1)
	go w.Run(ctx)
	if err := w.Resume(ctx, wfid); err != nil {
		t.Fatalf("w.Resume(_, %v) = %v, wanted no error", wfid, err)
	}
	wg.Wait()

	tasks, err := q.TasksForWorkflow(ctx, wfid)
	if err != nil {
		t.Fatalf("q.TasksForWorkflow(_, %v) = %v, %v, wanted no error", wfid, tasks, err)
	}
	want := []db.Task{{
		WorkflowID: wfid,
		Name:       "echo",
		Started:    true,
		Finished:   true,
		Result:     nullString(`"hello alice bob"`),
		Error:      sql.NullString{},
		CreatedAt:  time.Now(), // cmpopts.EquateApproxTime
		UpdatedAt:  time.Now(), // cmpopts.EquateApproxTime
	}}
	if diff := cmp.Diff(want, tasks, cmpopts.EquateApproxTime(time.Minute)); diff != "" {
		t.Errorf("q.TasksForWorkflow(_, %q) mismatch (-want +got):\n%s", wfid, diff)
	}
}

func TestWorkerResumeMissingDefinition(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	dbp := testDB(ctx, t)
	q := db.New(dbp)
	w := NewWorker(NewDefinitionHolder(), dbp, &PGListener{dbp})

	cwp := db.CreateWorkflowParams{ID: uuid.New(), Name: nullString(t.Name()), Params: nullString("{}")}
	if wf, err := q.CreateWorkflow(ctx, cwp); err != nil {
		t.Fatalf("q.CreateWorkflow(_, %v) = %v, %v, wanted no error", cwp, wf, err)
	}

	if err := w.Resume(ctx, cwp.ID); err == nil {
		t.Fatalf("w.Resume(_, %q) = %v, wanted error", cwp.ID, err)
	}
}

func TestWorkflowResumeAll(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	dbp := testDB(ctx, t)
	q := db.New(dbp)
	wg := sync.WaitGroup{}
	dh := NewDefinitionHolder()
	w := NewWorker(dh, dbp, &testWorkflowListener{
		Listener:   &PGListener{dbp},
		onFinished: wg.Done,
	})

	wd := newTestEchoWorkflow()
	dh.RegisterDefinition(t.Name(), wd)
	wfid1 := createUnfinishedEchoWorkflow(t, ctx, q)
	wfid2 := createUnfinishedEchoWorkflow(t, ctx, q)

	wg.Add(2)
	go w.Run(ctx)
	if err := w.ResumeAll(ctx); err != nil {
		t.Fatalf("w.ResumeAll() = %v, wanted no error", err)
	}
	wg.Wait()

	tasks, err := q.Tasks(ctx)
	if err != nil {
		t.Fatalf("q.Tasks() = %v, %v, wanted no error", tasks, err)
	}
	want := []db.TasksRow{
		{
			WorkflowID:       wfid1,
			Name:             "echo",
			Started:          true,
			Finished:         true,
			Result:           nullString(`"hello alice bob"`),
			Error:            sql.NullString{},
			CreatedAt:        time.Now(), // cmpopts.EquateApproxTime
			UpdatedAt:        time.Now(), // cmpopts.EquateApproxTime
			MostRecentUpdate: time.Now(),
		},
		{
			WorkflowID:       wfid2,
			Name:             "echo",
			Started:          true,
			Finished:         true,
			Result:           nullString(`"hello alice bob"`),
			Error:            sql.NullString{},
			CreatedAt:        time.Now(), // cmpopts.EquateApproxTime
			UpdatedAt:        time.Now(), // cmpopts.EquateApproxTime
			MostRecentUpdate: time.Now(),
		},
	}
	sort := cmpopts.SortSlices(func(x, y db.TasksRow) bool {
		return x.WorkflowID.String() < y.WorkflowID.String()
	})
	if diff := cmp.Diff(want, tasks, cmpopts.EquateApproxTime(time.Minute), sort); diff != "" {
		t.Errorf("q.Tasks() mismatch (-want +got):\n%s", diff)
	}
}

func TestWorkflowResumeRetry(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	dbp := testDB(ctx, t)
	dh := NewDefinitionHolder()
	w := NewWorker(dh, dbp, &PGListener{dbp})

	counter := 0
	blockingChan := make(chan bool)
	wd := workflow.New()
	nothing := workflow.Task0(wd, "needs retry", func(ctx context.Context) (string, error) {
		// Send twice so that the test can stop us mid-execution.
		for i := 0; i < 2; i++ {
			select {
			case <-ctx.Done():
				return "", ctx.Err()
			case blockingChan <- true:
				counter++
			}
		}
		if counter > 4 {
			return "", nil
		}
		return "", errors.New("expected")
	})
	workflow.Output(wd, "nothing", nothing)
	dh.RegisterDefinition(t.Name(), wd)

	// Run the workflow. It will try the task up to 3 times. Stop the worker
	// during its second run, then resume it and verify the task retries.
	go func() {
		for i := 0; i < 3; i++ {
			<-blockingChan
		}
		cancel()
	}()
	wfid, err := w.StartWorkflow(ctx, t.Name(), nil, 0)
	if err != nil {
		t.Fatalf("w.StartWorkflow(_, %v, %v) = %v, %v, wanted no error", wd, nil, wfid, err)
	}
	w.Run(ctx)
	if counter != 3 {
		t.Fatalf("task sent %v times, wanted 3", counter)
	}

	t.Log("Restarting worker")
	ctx, cancel = context.WithCancel(context.Background())
	defer cancel()
	wfDone := make(chan bool, 1)
	w = NewWorker(dh, dbp, &testWorkflowListener{
		Listener:   &PGListener{dbp},
		onFinished: func() { wfDone <- true },
	})

	go func() {
		for {
			select {
			case <-blockingChan:
			case <-ctx.Done():
				return
			}
		}
	}()
	go w.Run(ctx)
	if err := w.Resume(ctx, wfid); err != nil {
		t.Fatalf("w.Resume(_, %v) = %v, wanted no error", wfid, err)
	}
	<-wfDone
}

func newTestEchoWorkflow() *workflow.Definition {
	wd := workflow.New()
	echo := func(ctx context.Context, greeting string, names []string) (string, error) {
		return fmt.Sprintf("%v %v", greeting, strings.Join(names, " ")), nil
	}
	greeting := workflow.Param(wd, workflow.ParamDef[string]{Name: "greeting"})
	names := workflow.Param(wd, workflow.ParamDef[[]string]{Name: "names", ParamType: workflow.SliceShort})
	workflow.Output(wd, "echo", workflow.Task2(wd, "echo", echo, greeting, names))
	return wd
}

func createUnfinishedEchoWorkflow(t *testing.T, ctx context.Context, q *db.Queries) uuid.UUID {
	t.Helper()
	cwp := db.CreateWorkflowParams{ID: uuid.New(), Name: nullString(t.Name()), Params: nullString(`{"greeting": "hello", "names": ["alice", "bob"]}`)}
	if wf, err := q.CreateWorkflow(ctx, cwp); err != nil {
		t.Fatalf("q.CreateWorkflow(_, %v) = %v, %v, wanted no error", cwp, wf, err)
	}
	cwt := db.CreateTaskParams{WorkflowID: cwp.ID, Name: "echo", Result: nullString("null"), CreatedAt: time.Now()}
	if wt, err := q.CreateTask(ctx, cwt); err != nil {
		t.Fatalf("q.CreateWorkflowTask(_, %v) = %v, %v, wanted no error", cwt, wt, err)
	}
	return cwp.ID
}

type testWorkflowListener struct {
	Listener

	onFinished func()
}

func (t *testWorkflowListener) WorkflowFinished(ctx context.Context, wfid uuid.UUID, outputs map[string]interface{}, err error) error {
	defer t.onFinished()
	return t.Listener.WorkflowFinished(ctx, wfid, outputs, err)
}

type unimplementedListener struct {
}

func (u *unimplementedListener) TaskStateChanged(uuid.UUID, string, *workflow.TaskState) error {
	return errors.New("method TaskStateChanged not implemented")
}

func (u *unimplementedListener) Logger(uuid.UUID, string) workflow.Logger {
	return log.Default()
}

func (u *unimplementedListener) WorkflowStarted(context.Context, uuid.UUID, string, map[string]interface{}, int) error {
	return errors.New("method WorkflowStarted not implemented")
}

func (u *unimplementedListener) WorkflowFinished(context.Context, uuid.UUID, map[string]interface{}, error) error {
	return errors.New("method WorkflowFinished not implemented")
}
