blob: 18c748c66d5b8e66b2615c23aca92e98b8540805 [file] [log] [blame]
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Command ejobs manages jobs on ecosystem-metrics: it can list, show, start,
// wait for, cancel, and download the results of jobs.
package main
import (
"bytes"
"context"
"crypto/md5"
"debug/buildinfo"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"reflect"
"strings"
"text/tabwriter"
"time"
"unicode"
"cloud.google.com/go/storage"
"golang.org/x/oauth2"
"golang.org/x/pkgsite-metrics/internal/analysis"
"golang.org/x/pkgsite-metrics/internal/jobs"
"google.golang.org/api/impersonate"
"google.golang.org/api/option"
)
const (
	// projectID is the Google Cloud project. It also names the GCS bucket
	// that holds analysis binaries and the domain of the impersonated
	// service account.
	projectID = "go-ecosystem"
	// uploaderMetadataKey is the GCS object metadata key under which the
	// uploading user's $USER is recorded for analysis binaries.
	uploaderMetadataKey = "uploader"
)
// Common flags, parsed before the subcommand name.
var (
	// env selects the worker environment; it becomes the prefix of the
	// worker URL host.
	env = flag.String("env", "prod", "worker environment (dev or prod)")
	// dryRun makes commands print the worker requests and uploads they
	// would perform instead of performing them.
	dryRun = flag.Bool("n", false, "print actions but do not execute them")
)
// Subcommand-specific flag values. They are bound by the flagdefs
// functions in the commands table and parsed after the subcommand name.
var (
	minImporters int           // for start: -min
	waitInterval time.Duration // for wait: -i
	force        bool          // for results: -f
	outfile      string        // for results: -o
)
// commands lists the ejobs subcommands in the order they appear in the
// usage message. A non-nil flagdefs binds the subcommand's own flags to
// package-level variables such as minImporters and waitInterval.
var commands = []command{
	{"list", "",
		// doList only shows jobs started within the past week.
		"list jobs started in the last 7 days",
		doList, nil},
	{"show", "JOBID...",
		// doShow describes any job ID it is given; there is no time filter.
		"display information about jobs",
		doShow, nil},
	{"cancel", "JOBID...",
		"cancel the jobs",
		doCancel, nil},
	{"start", "[-min MIN_IMPORTERS] BINARY ARGS...",
		"start a job",
		doStart,
		func(fs *flag.FlagSet) {
			fs.IntVar(&minImporters, "min", -1,
				"run on modules with at least this many importers (<0: use server default of 10)")
		},
	},
	{"wait", "[-i DURATION] JOBID",
		"do not exit until JOBID is done",
		doWait,
		func(fs *flag.FlagSet) {
			fs.DurationVar(&waitInterval, "i", 0, "display updates at this interval")
		},
	},
	{"results", "[-f] [-o FILE.json] JOBID",
		"download results as JSON",
		doResults,
		func(fs *flag.FlagSet) {
			fs.BoolVar(&force, "f", false, "download even if unfinished")
			fs.StringVar(&outfile, "o", "", "output filename")
		},
	},
}
// command describes one ejobs subcommand.
type command struct {
	name     string                                // subcommand name, matched against the first non-flag argument
	argdoc   string                                // argument synopsis shown in the usage message
	desc     string                                // one-line description shown in the usage message
	run      func(context.Context, []string) error // runs the subcommand with its remaining arguments
	flagdefs func(*flag.FlagSet)                   // defines subcommand-specific flags; nil if there are none
}
// main installs a usage message that documents every subcommand and its
// flags, parses the common flags, and dispatches to run. On error it prints
// the error followed by the usage message and exits with status 2.
func main() {
	flag.Usage = func() {
		out := flag.CommandLine.Output()
		fmt.Fprintln(out, "Usage:")
		for _, cmd := range commands {
			// Write the separator to the same destination as the rest of
			// the usage text (previously it went to stdout).
			fmt.Fprintln(out)
			fmt.Fprintf(out, "ejobs %s %s\n", cmd.name, cmd.argdoc)
			fmt.Fprintf(out, "\t%s\n", cmd.desc)
			if cmd.flagdefs != nil {
				fs := flag.NewFlagSet(cmd.name, flag.ContinueOnError)
				// Keep per-command flag help with the rest of the usage text.
				fs.SetOutput(out)
				cmd.flagdefs(fs)
				fs.Usage()
			}
		}
		fmt.Fprintln(out, "\ncommon flags:")
		flag.PrintDefaults()
	}
	flag.Parse()
	if err := run(context.Background()); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n\n", err)
		flag.Usage()
		os.Exit(2)
	}
}
// workerURL is the base URL of the worker. run sets it to
// "https://<env>-<GO_ECOSYSTEM_WORKER_URL_SUFFIX>" before any command runs.
var workerURL string
// run does the following:
//   - builds workerURL from the -env flag and the
//     GO_ECOSYSTEM_WORKER_URL_SUFFIX environment variable;
//   - looks up the subcommand named by the first non-flag argument;
//   - parses that subcommand's own flags, if any;
//   - invokes the subcommand with the remaining arguments.
func run(ctx context.Context) error {
	wu := os.Getenv("GO_ECOSYSTEM_WORKER_URL_SUFFIX")
	if wu == "" {
		return errors.New("need GO_ECOSYSTEM_WORKER_URL_SUFFIX environment variable")
	}
	workerURL = fmt.Sprintf("https://%s-%s", *env, wu)
	// Report a missing subcommand explicitly instead of `unknown command ""`.
	if flag.NArg() == 0 {
		return errors.New("missing command")
	}
	name := flag.Arg(0)
	for _, cmd := range commands {
		if cmd.name != name {
			continue
		}
		args := flag.Args()[1:]
		if cmd.flagdefs != nil {
			fs := flag.NewFlagSet(cmd.name, flag.ContinueOnError)
			cmd.flagdefs(fs)
			if err := fs.Parse(args); err != nil {
				return err
			}
			args = fs.Args()
		}
		return cmd.run(ctx, args)
	}
	return fmt.Errorf("unknown command %q", name)
}
// doShow prints the fields of every job whose ID appears in args. It stops
// at, and returns, the first error.
func doShow(ctx context.Context, args []string) error {
	tokens, err := identityTokenSource(ctx)
	if err != nil {
		return err
	}
	for i := range args {
		if err := showJob(ctx, args[i], tokens); err != nil {
			return err
		}
	}
	return nil
}
// showJob fetches the description of jobID from the worker. It prints each
// exported field of the job as "Name: value", dropping a leading "Num" from
// field names. In dry-run mode nothing is printed beyond the request itself.
func showJob(ctx context.Context, jobID string, ts oauth2.TokenSource) error {
	job, err := requestJSON[jobs.Job](ctx, "jobs/describe?jobid="+jobID, ts)
	switch {
	case err != nil:
		return err
	case *dryRun:
		return nil
	}
	val := reflect.ValueOf(job).Elem()
	typ := val.Type()
	for i := 0; i < typ.NumField(); i++ {
		field := typ.Field(i)
		if !field.IsExported() {
			continue
		}
		label := strings.TrimPrefix(field.Name, "Num")
		fmt.Printf("%s: %v\n", label, val.Field(i).Interface())
	}
	return nil
}
// doList prints a table of the jobs started within the past week. Each row
// shows the job's ID, user, start time, number of started tasks, number of
// finished tasks (skipped, failed, errored or succeeded), number of
// enqueued tasks, and whether it was canceled.
func doList(ctx context.Context, _ []string) error {
	ts, err := identityTokenSource(ctx)
	if err != nil {
		return err
	}
	joblist, err := requestJSON[[]jobs.Job](ctx, "jobs/list", ts)
	if err != nil {
		return err
	}
	if *dryRun {
		return nil
	}
	cutoff := time.Now().Add(-7 * 24 * time.Hour)
	w := tabwriter.NewWriter(os.Stdout, 2, 8, 1, ' ', 0)
	fmt.Fprintf(w, "ID\tUser\tStart Time\tStarted\tFinished\tTotal\tCanceled\n")
	for _, j := range *joblist {
		if !j.StartedAt.After(cutoff) {
			continue
		}
		finished := j.NumSkipped + j.NumFailed + j.NumErrored + j.NumSucceeded
		fmt.Fprintf(w, "%s\t%s\t%s\t%d\t%d\t%d\t%t\n",
			j.ID(), j.User, j.StartedAt.Format(time.RFC3339),
			j.NumStarted, finished, j.NumEnqueued, j.Canceled)
	}
	return w.Flush()
}
// doCancel asks the worker to cancel each job whose ID appears in args. It
// stops at the first failure. In dry-run mode it prints each request
// instead of sending it.
func doCancel(ctx context.Context, args []string) error {
	ts, err := identityTokenSource(ctx)
	if err != nil {
		return err
	}
	for _, jobID := range args {
		// Escape the ID so characters such as '&' or '+' cannot corrupt the
		// query string. The variable is named u so it does not shadow the
		// net/url package.
		u := workerURL + "/jobs/cancel?jobid=" + url.QueryEscape(jobID)
		if *dryRun {
			fmt.Printf("dryrun: GET %s\n", u)
			continue
		}
		if _, err := httpGet(ctx, u, ts); err != nil {
			return fmt.Errorf("canceling %q: %w", jobID, err)
		}
	}
	return nil
}
// doWait polls the worker until the job named by the single argument has
// finished all of its enqueued tasks.
//
// Polling happens at least once per second. If -i was given, progress is
// printed at that interval. doWait returns ctx.Err() if ctx is canceled
// while waiting. In dry-run mode it makes (prints) a single describe
// request and returns.
func doWait(ctx context.Context, args []string) error {
	if len(args) != 1 {
		return errors.New("wrong number of args: want [-i DURATION] JOB_ID")
	}
	jobID := args[0]
	sleepInterval := waitInterval
	displayUpdates := sleepInterval != 0
	if sleepInterval < time.Second {
		sleepInterval = time.Second
	}
	ts, err := identityTokenSource(ctx)
	if err != nil {
		return err
	}
	start := time.Now()
	for {
		job, err := requestJSON[jobs.Job](ctx, "jobs/describe?jobid="+jobID, ts)
		if err != nil {
			return err
		}
		if *dryRun {
			// requestJSON returns a nil job in dry-run mode; dereferencing
			// it below would panic, and there is nothing to wait for.
			return nil
		}
		done := job.NumFinished()
		if done >= job.NumEnqueued {
			break
		}
		if displayUpdates {
			// NumEnqueued > done here, so the division is safe.
			fmt.Printf("%s: %d/%d completed (%d%%)\n",
				time.Since(start).Round(time.Second), done, job.NumEnqueued, done*100/job.NumEnqueued)
		}
		// Sleep, but give up promptly if the context is canceled.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(sleepInterval):
		}
	}
	fmt.Printf("Job %s finished.\n", jobID)
	return nil
}
// doStart starts an analysis job:
//   - args[0] names a local linux/amd64 Go binary, which is uploaded to GCS
//     if needed.
//   - The remaining args are passed to that binary and must not contain
//     whitespace.
//   - The worker is then asked to enqueue an analysis job that runs the
//     binary, and its response is printed.
//
// If the user declines to overwrite an existing binary on GCS, doStart
// returns nil without starting a job. In dry-run mode it prints the
// enqueue request instead of sending it.
func doStart(ctx context.Context, args []string) error {
	// Validate arguments.
	if len(args) == 0 {
		return errors.New("wrong number of args: want [-min N] BINARY [ARG1 ARG2 ...]")
	}
	binaryFile := args[0]
	fi, err := os.Stat(binaryFile)
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			return fmt.Errorf("%s does not exist", binaryFile)
		}
		return err
	}
	if fi.IsDir() {
		return fmt.Errorf("%s is a directory, not a file", binaryFile)
	}
	if err := checkIsLinuxAmd64(binaryFile); err != nil {
		return err
	}
	// Check args to binary for whitespace, which we don't support: they are
	// sent to the worker joined by single spaces.
	binaryArgs := args[1:]
	for _, arg := range binaryArgs {
		if strings.IndexFunc(arg, unicode.IsSpace) >= 0 {
			return fmt.Errorf("arg %q contains whitespace: not supported", arg)
		}
	}
	// Copy binary to GCS if it's not already there.
	canceled, err := uploadAnalysisBinary(ctx, binaryFile)
	if err != nil {
		return err
	}
	if canceled {
		return nil
	}
	// Ask the server to enqueue scan tasks.
	its, err := identityTokenSource(ctx)
	if err != nil {
		return err
	}
	// Build the query with url.Values so that every parameter is escaped.
	// This includes the binary name and $USER, which previously were
	// interpolated raw.
	q := url.Values{}
	q.Set("binary", filepath.Base(binaryFile))
	q.Set("user", os.Getenv("USER"))
	if len(binaryArgs) > 0 {
		q.Set("args", strings.Join(binaryArgs, " "))
	}
	if minImporters >= 0 {
		q.Set("min", fmt.Sprint(minImporters))
	}
	u := workerURL + "/analysis/enqueue?" + q.Encode()
	if *dryRun {
		fmt.Printf("dryrun: GET %s\n", u)
		return nil
	}
	body, err := httpGet(ctx, u, its)
	if err != nil {
		return err
	}
	fmt.Printf("%s\n", body)
	return nil
}
// checkIsLinuxAmd64 returns nil only if binaryFile is a Go binary whose
// embedded build settings record GOOS=linux and GOARCH=amd64. It returns an
// error if the file cannot be opened, carries no Go build info, or was built
// for another platform; in the last case the error names the recorded
// GOOS/GOARCH.
func checkIsLinuxAmd64(binaryFile string) error {
	f, err := os.Open(binaryFile)
	if err != nil {
		return err
	}
	defer f.Close()
	info, err := buildinfo.Read(f)
	if err != nil {
		return err
	}
	var goos, goarch string
	for _, s := range info.Settings {
		switch s.Key {
		case "GOOS":
			goos = s.Value
		case "GOARCH":
			goarch = s.Value
		}
	}
	if goos == "linux" && goarch == "amd64" {
		return nil
	}
	return fmt.Errorf("binary not built for linux/amd64: GOOS=%s GOARCH=%s", goos, goarch)
}
// uploadAnalysisBinary copies binaryFile to the GCS location used for
// analysis binaries: object "analysis-binaries/<base name of binaryFile>" in
// the projectID bucket, accessed with an impersonated access token.
//
// If an object with that name already exists:
//   - when its MD5 checksum equals the local file's, the upload is skipped
//     and false is returned;
//   - otherwise the user is told when (and, if recorded, by whom) it was
//     last uploaded and is asked whether to overwrite it. Any answer other
//     than "y" or "Y" cancels the upload, and true is returned.
//
// In all other non-error cases false is returned. After a successful upload,
// $USER is recorded in the object's metadata under uploaderMetadataKey on a
// best-effort basis (errors from that update are ignored). In dry-run mode
// it only prints the file it would upload and returns false.
func uploadAnalysisBinary(ctx context.Context, binaryFile string) (canceled bool, err error) {
	if *dryRun {
		fmt.Printf("dryrun: upload analysis binary %s\n", binaryFile)
		return false, nil
	}
	const bucketName = projectID
	binaryName := filepath.Base(binaryFile)
	// GCS object names are slash-separated, hence path (not filepath).
	objectName := path.Join("analysis-binaries", binaryName)
	ts, err := accessTokenSource(ctx)
	if err != nil {
		return false, err
	}
	c, err := storage.NewClient(ctx, option.WithTokenSource(ts))
	if err != nil {
		return false, err
	}
	defer c.Close()
	bucket := c.Bucket(bucketName)
	object := bucket.Object(objectName)
	attrs, err := object.Attrs(ctx)
	if errors.Is(err, storage.ErrObjectNotExist) {
		fmt.Printf("%s binary does not exist on GCS: uploading\n", binaryName)
	} else if err != nil {
		return false, err
	} else if g, w := len(attrs.MD5), md5.Size; g != w {
		// NOTE(review): an existing object without a full MD5 hash is
		// treated as an error rather than re-uploaded — confirm intended.
		return false, fmt.Errorf("len(attrs.MD5) = %d, wanted %d", g, w)
	} else {
		localMD5, err := fileMD5(binaryFile)
		if err != nil {
			return false, err
		}
		if bytes.Equal(localMD5, attrs.MD5) {
			fmt.Printf("Binary %q on GCS has the same checksum: not uploading.\n", binaryName)
			return false, nil
		}
		// The contents differ: ask the user whether to overwrite the
		// existing binary, showing when and by whom it was last uploaded to
		// help them decide.
		updated := attrs.Updated.In(time.Local).Format(time.RFC1123) // use local time zone
		fmt.Printf("The binary %q already exists on GCS.\n", binaryName)
		fmt.Printf("It was last uploaded on %s", updated)
		// Communicate uploader info if available.
		if uploader := attrs.Metadata[uploaderMetadataKey]; uploader != "" {
			fmt.Printf(" by %s", uploader)
		}
		fmt.Println(".")
		fmt.Print("Do you wish to overwrite it? [y/n] ")
		var response string
		// Scanln's error is ignored; only a trimmed "y" or "Y" confirms.
		fmt.Scanln(&response)
		if r := strings.TrimSpace(response); r != "y" && r != "Y" {
			fmt.Println("Cancelling.")
			return true, nil
		}
	}
	fmt.Printf("Uploading.\n")
	if err := copyToGCS(ctx, object, binaryFile); err != nil {
		return false, err
	}
	// Record the uploader so future overwrite prompts can say who uploaded
	// the binary.
	toUpdate := storage.ObjectAttrsToUpdate{
		Metadata: map[string]string{uploaderMetadataKey: os.Getenv("USER")},
	}
	// Use a fresh object handle for the metadata update. This is not a
	// server round-trip. NOTE(review): the original author reported that
	// updating through the handle used for the upload had no effect —
	// confirm.
	object = bucket.Object(objectName)
	object.Update(ctx, toUpdate) // best effort: errors deliberately ignored
	return false, nil
}
// fileMD5 returns the MD5 digest of the named file's contents. The file is
// streamed through the hash rather than read into memory all at once.
func fileMD5(filename string) ([]byte, error) {
	file, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer file.Close()
	h := md5.New()
	if _, err = io.Copy(h, file); err != nil {
		return nil, err
	}
	return h.Sum(nil), nil
}
// copyToGCS uploads the contents of the local file filename to the GCS
// object. The object is written only if the whole file is copied and the
// writer closes successfully. If copying fails, the partial upload is
// aborted by canceling the writer's context instead of being left dangling.
func copyToGCS(ctx context.Context, object *storage.ObjectHandle, filename string) error {
	src, err := os.Open(filename)
	if err != nil {
		return err
	}
	defer src.Close()
	// Canceling the context given to NewWriter aborts the write without
	// saving data. After a successful Close, the deferred cancel is
	// harmless.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	dest := object.NewWriter(ctx)
	if _, err := io.Copy(dest, src); err != nil {
		return err
	}
	return dest.Close()
}
// doResults downloads the analysis results of the job named by the single
// argument. It writes them as tab-indented JSON to stdout, or to the file
// given by -o.
//
// Unless -f is set, it refuses to download results for a job that has not
// finished all of its enqueued tasks. In dry-run mode it prints the
// requests it would make and writes no output. An error closing the output
// file is reported.
func doResults(ctx context.Context, args []string) (err error) {
	if len(args) != 1 {
		return errors.New("wrong number of args: want [-f] [-o FILE.json] JOB_ID")
	}
	jobID := args[0]
	ts, err := identityTokenSource(ctx)
	if err != nil {
		return err
	}
	job, err := requestJSON[jobs.Job](ctx, "jobs/describe?jobid="+jobID, ts)
	if err != nil {
		return err
	}
	// requestJSON returns a nil job in dry-run mode, so only inspect it
	// when a real response was received.
	if !*dryRun && !force {
		if done := job.NumFinished(); done < job.NumEnqueued {
			return fmt.Errorf("job not finished (%d/%d completed); use -f for partial results", done, job.NumEnqueued)
		}
	}
	results, err := requestJSON[[]*analysis.Result](ctx, "jobs/results?jobid="+jobID, ts)
	if err != nil {
		return err
	}
	if *dryRun {
		// Don't create the output file or emit a meaningless "null".
		return nil
	}
	out := os.Stdout
	if outfile != "" {
		out, err = os.Create(outfile)
		if err != nil {
			return err
		}
		defer func() { err = errors.Join(err, out.Close()) }()
	}
	enc := json.NewEncoder(out)
	enc.SetIndent("", "\t")
	return enc.Encode(results)
}
// requestJSON issues a GET for path, relative to workerURL, authenticated
// with a token from ts. It decodes the response body as JSON into a newly
// allocated T.
//
// In dry-run mode it only prints the URL and returns (nil, nil). Callers
// must therefore not dereference the result when *dryRun is set.
func requestJSON[T any](ctx context.Context, path string, ts oauth2.TokenSource) (*T, error) {
	target := workerURL + "/" + path
	if *dryRun {
		fmt.Printf("GET %s\n", target)
		return nil, nil
	}
	body, err := httpGet(ctx, target, ts)
	if err != nil {
		return nil, err
	}
	result := new(T)
	if err := json.Unmarshal(body, result); err != nil {
		return nil, err
	}
	return result, nil
}
// httpGet makes a GET request to url and sets the Authorization header from
// a token obtained from ts. It reads the whole response body.
//
// It returns the body only when the status is 200 OK. For any other status
// it returns an error containing the status line and the body. Read
// failures are wrapped with %w so callers can inspect the cause.
func httpGet(ctx context.Context, url string, ts oauth2.TokenSource) (body []byte, err error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return nil, err
	}
	token, err := ts.Token()
	if err != nil {
		return nil, err
	}
	token.SetAuthHeader(req)
	res, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()
	body, err = io.ReadAll(res.Body)
	if err != nil {
		return nil, fmt.Errorf("reading body (%s): %w", res.Status, err)
	}
	if res.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("%s: %s", res.Status, body)
	}
	return body, nil
}
// serviceAccountEmail is the service account that is impersonated to obtain
// both access tokens (used for GCS) and identity tokens (used for the worker).
var serviceAccountEmail = fmt.Sprintf("impersonate@%s.iam.gserviceaccount.com", projectID)
// accessTokenSource returns a token source that yields OAuth2 access tokens
// with the cloud-platform scope. The tokens are obtained by impersonating
// serviceAccountEmail.
func accessTokenSource(ctx context.Context) (oauth2.TokenSource, error) {
	cfg := impersonate.CredentialsConfig{
		TargetPrincipal: serviceAccountEmail,
		Scopes:          []string{"https://www.googleapis.com/auth/cloud-platform"},
	}
	return impersonate.CredentialsTokenSource(ctx, cfg)
}
// identityTokenSource returns a token source that yields ID tokens whose
// audience is workerURL and which include the email claim. The tokens are
// obtained by impersonating serviceAccountEmail. It reads workerURL, so it
// must be called after run has set it.
func identityTokenSource(ctx context.Context) (oauth2.TokenSource, error) {
	cfg := impersonate.IDTokenConfig{
		Audience:        workerURL,
		TargetPrincipal: serviceAccountEmail,
		IncludeEmail:    true,
	}
	return impersonate.IDTokenSource(ctx, cfg)
}