// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Command ejobs supports jobs on ecosystem-metrics.
package main

import (
	"bytes"
	"context"
	"crypto/md5"
	"debug/buildinfo"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io"
	"net/http"
	"os"
	"path"
	"path/filepath"
	"reflect"
	"strconv"
	"strings"
	"text/tabwriter"
	"time"

	"cloud.google.com/go/storage"
	"golang.org/x/oauth2"
	"golang.org/x/pkgsite-metrics/internal/jobs"
	"google.golang.org/api/impersonate"
	"google.golang.org/api/option"
)

const projectID = "go-ecosystem"

// Common flags
var (
	env    = flag.String("env", "prod", "worker environment (dev or prod)")
	dryRun = flag.Bool("n", false, "print actions but do not execute them")
)
var commands = []command{
	{"list", "",
		"list jobs", doList},
	{"show", "JOBID...",
		"display information about jobs in the last 7 days", doShow},
	{"cancel", "JOBID...",
		"cancel the jobs", doCancel},
	{"start", "BINARY [MIN_IMPORTERS]",
		"start a job for a linux/amd64 binary", doStart},
}

type command struct {
	name   string
	argdoc string
	desc   string
	run    func(context.Context, []string) error
}

func main() {
	flag.Usage = func() {
		out := flag.CommandLine.Output()
		fmt.Fprintln(out, "usage:")
		for _, cmd := range commands {
			fmt.Fprintf(out, "  ejobs %s %s\n", cmd.name, cmd.argdoc)
			fmt.Fprintf(out, "\t%s\n", cmd.desc)
		}
		fmt.Fprintln(out, "\ncommon flags:")
		flag.PrintDefaults()
	}

	flag.Parse()
	if err := run(context.Background()); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n\n", err)
		flag.Usage()
		os.Exit(2)
	}
}

var workerURL string

func run(ctx context.Context) error {
	wu := os.Getenv("GO_ECOSYSTEM_WORKER_URL_SUFFIX")
	if wu == "" {
		return errors.New("need GO_ECOSYSTEM_WORKER_URL_SUFFIX environment variable")
	}
	workerURL = fmt.Sprintf("https://%s-%s", *env, wu)
	name := flag.Arg(0)
	for _, cmd := range commands {
		if cmd.name == name {
			return cmd.run(ctx, flag.Args()[1:])
		}
	}
	return fmt.Errorf("unknown command %q", name)
}

func doShow(ctx context.Context, args []string) error {
	ts, err := identityTokenSource(ctx)
	if err != nil {
		return err
	}
	for _, jobID := range args {
		if err := showJob(ctx, jobID, ts); err != nil {
			return err
		}
	}
	return nil
}

func showJob(ctx context.Context, jobID string, ts oauth2.TokenSource) error {
	job, err := requestJSON[jobs.Job](ctx, "jobs/describe?jobid="+jobID, ts)
	if err != nil {
		return err
	}
	if *dryRun {
		return nil
	}
	rj := reflect.ValueOf(job).Elem()
	rt := rj.Type()
	for i := 0; i < rt.NumField(); i++ {
		f := rt.Field(i)
		if f.IsExported() {
			v := rj.FieldByIndex(f.Index)
			name, _ := strings.CutPrefix(f.Name, "Num")
			fmt.Printf("%s: %v\n", name, v.Interface())
		}
	}
	return nil
}

func doList(ctx context.Context, _ []string) error {
	ts, err := identityTokenSource(ctx)
	if err != nil {
		return err
	}
	joblist, err := requestJSON[[]jobs.Job](ctx, "jobs/list", ts)
	if err != nil {
		return err
	}
	if *dryRun {
		return nil
	}
	d7 := -time.Hour * 24 * 7
	weekBefore := time.Now().Add(d7)
	tw := tabwriter.NewWriter(os.Stdout, 2, 8, 1, ' ', 0)
	fmt.Fprintf(tw, "ID\tUser\tStart Time\tStarted\tFinished\tTotal\tCanceled\n")
	for _, j := range *joblist {
		if j.StartedAt.After(weekBefore) {
			fmt.Fprintf(tw, "%s\t%s\t%s\t%d\t%d\t%d\t%t\n",
				j.ID(), j.User, j.StartedAt.Format(time.RFC3339),
				j.NumStarted,
				j.NumSkipped+j.NumFailed+j.NumErrored+j.NumSucceeded,
				j.NumEnqueued,
				j.Canceled)
		}
	}
	return tw.Flush()
}

func doCancel(ctx context.Context, args []string) error {
	ts, err := identityTokenSource(ctx)
	if err != nil {
		return err
	}
	for _, jobID := range args {
		url := workerURL + "/jobs/cancel?jobid=" + jobID
		if *dryRun {
			fmt.Printf("dryrun: GET %s\n", url)
			continue
		}
		if _, err := httpGet(ctx, url, ts); err != nil {
			return fmt.Errorf("canceling %q: %w", jobID, err)
		}
	}
	return nil
}

func doStart(ctx context.Context, args []string) error {
	// Validate arguments.
	if len(args) < 1 || len(args) > 2 {
		return errors.New("wrong number of args: want BINARY [MIN_IMPORTERS]")
	}
	min := -1
	if len(args) > 1 {
		m, err := strconv.Atoi(args[1])
		if err != nil {
			return err
		}
		if m < 0 {
			return errors.New("MIN_IMPORTERS cannot be negative")
		}
		min = m
	}
	binaryFile := args[0]
	if fi, err := os.Stat(binaryFile); err != nil {
		if errors.Is(err, os.ErrNotExist) {
			return fmt.Errorf("%s does not exist", binaryFile)
		}
		return err
	} else if fi.IsDir() {
		return fmt.Errorf("%s is a directory, not a file", binaryFile)
	} else if err := checkIsLinuxAmd64(binaryFile); err != nil {
		return err
	}

	// Copy binary to GCS if it's not already there.
	if err := uploadAnalysisBinary(ctx, binaryFile); err != nil {
		return err
	}

	// Ask the server to enqueue scan tasks.
	its, err := identityTokenSource(ctx)
	if err != nil {
		return err
	}
	url := fmt.Sprintf("%s/analysis/enqueue?binary=%s&user=%s", workerURL, filepath.Base(binaryFile), os.Getenv("USER"))
	if min >= 0 {
		url += fmt.Sprintf("&min=%d", min)
	}
	if *dryRun {
		fmt.Printf("dryrun: GET %s\n", url)
		return nil
	}
	body, err := httpGet(ctx, url, its)
	if err != nil {
		return err
	}
	fmt.Printf("%s\n", body)
	return nil
}

// checkIsLinuxAmd64 checks if binaryFile is a linux/amd64 Go
// binary. If not, returns an error with appropriate message.
// Otherwise, returns nil.
func checkIsLinuxAmd64(binaryFile string) error {
	bin, err := os.Open(binaryFile)
	if err != nil {
		return err
	}
	defer bin.Close()

	bi, err := buildinfo.Read(bin)
	if err != nil {
		return err
	}

	var goos, goarch string
	for _, setting := range bi.Settings {
		if setting.Key == "GOOS" {
			goos = setting.Value
		} else if setting.Key == "GOARCH" {
			goarch = setting.Value
		}
	}

	if goos != "linux" || goarch != "amd64" {
		return fmt.Errorf("binary not built for linux/amd64: GOOS=%s GOARCH=%s", goos, goarch)
	}
	return nil
}

// uploadAnalysisBinary copies binaryFile to the GCS location used for
// analysis binaries.
// As an optimization, it skips the upload if the file is already on GCS
// and has the same checksum as the local file.
func uploadAnalysisBinary(ctx context.Context, binaryFile string) error {
	if *dryRun {
		fmt.Printf("dryrun: upload analysis binary %s\n", binaryFile)
		return nil
	}
	const bucketName = projectID
	binaryName := filepath.Base(binaryFile)
	objectName := path.Join("analysis-binaries", binaryName)

	ts, err := accessTokenSource(ctx)
	if err != nil {
		return err
	}
	c, err := storage.NewClient(ctx, option.WithTokenSource(ts))
	if err != nil {
		return err
	}
	defer c.Close()
	bucket := c.Bucket(bucketName)
	object := bucket.Object(objectName)
	attrs, err := object.Attrs(ctx)
	if errors.Is(err, storage.ErrObjectNotExist) {
		fmt.Printf("%s does not exist, uploading\n", object.ObjectName())
	} else if err != nil {
		return err
	} else if g, w := len(attrs.MD5), md5.Size; g != w {
		return fmt.Errorf("len(attrs.MD5) = %d, wanted %d", g, w)
	} else {
		localMD5, err := fileMD5(binaryFile)
		if err != nil {
			return err
		}
		if bytes.Equal(localMD5, attrs.MD5) {
			fmt.Printf("%s already on GCS with same checksum; not uploading\n", binaryFile)
			return nil
		} else {
			fmt.Printf("binary %s exists on GCS but hashes don't match; uploading\n", binaryName)
		}
	}
	fmt.Printf("copying %s to %s\n", binaryFile, object.ObjectName())
	return copyToGCS(ctx, object, binaryFile)
}

// fileMD5 computes the MD5 checksum of the given file.
func fileMD5(filename string) ([]byte, error) {
	f, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	hash := md5.New()
	if _, err := io.Copy(hash, f); err != nil {
		return nil, err
	}
	return hash.Sum(nil)[:], nil
}

// copyToLocalFile copies the filename to the GCS object.
func copyToGCS(ctx context.Context, object *storage.ObjectHandle, filename string) error {
	src, err := os.Open(filename)
	if err != nil {
		return err
	}
	defer src.Close()
	dest := object.NewWriter(ctx)
	if _, err := io.Copy(dest, src); err != nil {
		return err
	}
	return dest.Close()
}

// requestJSON requests the path from the worker, then reads the returned body
// and unmarshals it as JSON.
func requestJSON[T any](ctx context.Context, path string, ts oauth2.TokenSource) (*T, error) {
	url := workerURL + "/" + path
	if *dryRun {
		fmt.Printf("GET %s\n", url)
		return nil, nil
	}
	body, err := httpGet(ctx, url, ts)
	if err != nil {
		return nil, err
	}
	var t T
	if err := json.Unmarshal(body, &t); err != nil {
		return nil, err
	}
	return &t, nil
}

// httpGet makes a GET request to the given URL with the given identity token.
// It reads the body and returns the HTTP response and the body.
func httpGet(ctx context.Context, url string, ts oauth2.TokenSource) (body []byte, err error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	token, err := ts.Token()
	if err != nil {
		return nil, err
	}
	token.SetAuthHeader(req)
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()
	body, err = io.ReadAll(res.Body)
	if err != nil {
		return nil, fmt.Errorf("reading body (%s): %v", res.Status, err)
	}
	if res.StatusCode != 200 {
		return nil, fmt.Errorf("%s: %s", res.Status, body)
	}
	return body, nil
}

var serviceAccountEmail = fmt.Sprintf("impersonate@%s.iam.gserviceaccount.com", projectID)

func accessTokenSource(ctx context.Context) (oauth2.TokenSource, error) {
	return impersonate.CredentialsTokenSource(ctx, impersonate.CredentialsConfig{
		TargetPrincipal: serviceAccountEmail,
		Scopes:          []string{"https://www.googleapis.com/auth/cloud-platform"},
	})
}

func identityTokenSource(ctx context.Context) (oauth2.TokenSource, error) {
	return impersonate.IDTokenSource(ctx, impersonate.IDTokenConfig{
		TargetPrincipal: serviceAccountEmail,
		Audience:        workerURL,
		IncludeEmail:    true,
	})
}
