// Copyright 2022 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package worker

import (
	"bufio"
	"context"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"
	"os/exec"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	bq "cloud.google.com/go/bigquery"
	"cloud.google.com/go/storage"
	"golang.org/x/pkgsite-metrics/internal/analysis"
	"golang.org/x/pkgsite-metrics/internal/derrors"
	"golang.org/x/pkgsite-metrics/internal/jobs"
	"golang.org/x/pkgsite-metrics/internal/log"
	"golang.org/x/pkgsite-metrics/internal/queue"
	"golang.org/x/pkgsite-metrics/internal/sandbox"
	"golang.org/x/pkgsite-metrics/internal/scan"
	"golang.org/x/pkgsite-metrics/internal/version"
)

// analysisServer serves the analysis endpoints (scan and enqueue).
// It embeds *Server and adds the state needed to open analysis binaries
// and to remember work versions that have already been read.
type analysisServer struct {
	*Server
	openFile           openFileFunc // Used to open binary files from GCS, except for testing.
	// storedWorkVersions holds work versions read by readWorkVersion, keyed by
	// module, version and binary. readWorkVersion writes it while holding s.mu.
	storedWorkVersions map[analysis.WorkVersionKey]analysis.WorkVersion
}

// newAnalysisServer builds an analysisServer on top of s.
// It fails if s.cfg.BinaryBucket is empty or if a Cloud Storage client cannot
// be created. The bucket handle is used to open analysis binaries.
func newAnalysisServer(ctx context.Context, s *Server) (*analysisServer, error) {
	bucketName := s.cfg.BinaryBucket
	if bucketName == "" {
		return nil, errors.New("missing binary bucket (define GO_ECOSYSTEM_BINARY_BUCKET)")
	}

	client, err := storage.NewClient(ctx)
	if err != nil {
		return nil, err
	}

	as := &analysisServer{
		Server:             s,
		openFile:           gcsOpenFileFunc(ctx, client.Bucket(bucketName)),
		storedWorkVersions: make(map[analysis.WorkVersionKey]analysis.WorkVersion),
	}
	return as, nil
}

// analysisBinariesBucketDir is the directory prefix, within the binary bucket,
// under which analysis binaries are looked up by name.
const analysisBinariesBucketDir = "analysis-binaries"

// handleScan serves a request to run an analysis binary on one module version
// and record the result.
//
// The request is parsed from the URL under "/analysis/scan". If it belongs to
// a job that has been canceled, the handler returns nil without doing any work.
// Otherwise it copies the named binary from the bucket to a local file, checks
// that the file's hash matches the binary version in the request, and skips the
// scan when the stored work version for this module/version/binary is identical
// to the current one. The local binary is removed before returning.
//
// When the request carries a job ID and a job DB is configured, the job's
// counters (NumStarted, NumFailed, NumSkipped, NumErrored, NumSucceeded) are
// updated on a best-effort basis: failures to update are logged, not returned.
func (s *analysisServer) handleScan(w http.ResponseWriter, r *http.Request) (err error) {
	defer derrors.Wrap(&err, "analysisServer.handleScan")
	ctx := r.Context()

	req, err := analysis.ParseScanRequest(r, "/analysis/scan")
	if err != nil {
		return fmt.Errorf("%w: %v", derrors.InvalidArgument, err)
	}

	// If there is a job and it's canceled, return immediately.
	if req.JobID != "" && s.jobDB != nil {
		job, err := s.jobDB.GetJob(ctx, req.JobID)
		if err != nil {
			log.Errorf(ctx, err, "failed to get job for id %q", req.JobID)
		} else if job.Canceled {
			log.Infof(ctx, "job %q canceled; skipping", req.JobID)
			return nil
		}
	}

	// incrementJob increments name value by 1 for the current job.
	// If there is an error, it logs it instead of failing.
	incrementJob := func(name string) {
		if req.JobID != "" && s.jobDB != nil {
			if err := s.jobDB.Increment(ctx, req.JobID, name, 1); err != nil {
				log.Errorf(ctx, err, "failed to update job for id %q", req.JobID)
			}
		}
	}

	incrementJob("NumStarted")

	// Count the request as failed if the handler returns an error.
	defer func() {
		if err != nil {
			incrementJob("NumFailed")
		}
	}()

	if req.Suffix != "" {
		return fmt.Errorf("%w: analysis: only implemented for whole modules (no suffix)", derrors.InvalidArgument)
	}
	if req.Binary == "" {
		return fmt.Errorf("%w: analysis: missing binary", derrors.InvalidArgument)
	}
	if req.Binary != path.Base(req.Binary) {
		return fmt.Errorf("%w: analysis: binary name contains slashes (must be a basename)", derrors.InvalidArgument)
	}
	localBinaryPath := path.Join(s.cfg.BinaryDir, req.Binary)
	srcPath := path.Join(analysisBinariesBucketDir, req.Binary)
	const executable = true
	if err := copyToLocalFile(localBinaryPath, executable, srcPath, s.openFile); err != nil {
		return err
	}
	defer derrors.Cleanup(&err, func() error { return os.Remove(localBinaryPath) })

	// Make sure the binary we downloaded is the one the request was created for.
	binaryHash, err := hashFile(localBinaryPath)
	if err != nil {
		return err
	}
	if binaryHash != req.BinaryVersion {
		return fmt.Errorf("%w: analysis: for binary %s, hash of download file %s does not match hash in request %s",
			derrors.InvalidArgument, req.Binary, binaryHash, req.BinaryVersion)
	}
	wv := analysis.WorkVersion{
		BinaryArgs:    req.Args,
		WorkerVersion: s.cfg.VersionID,
		SchemaVersion: analysis.SchemaVersion,
		BinaryVersion: binaryHash,
	}

	if err := s.readWorkVersion(ctx, req.Module, req.Version, req.Binary); err != nil {
		return err
	}
	key := analysis.WorkVersionKey{Module: req.Module, Version: req.Version, Binary: req.Binary}
	// readWorkVersion writes storedWorkVersions while holding s.mu, and HTTP
	// handlers may run concurrently, so the read must take the same lock to
	// avoid a data race on the map. A missing entry yields the zero
	// WorkVersion, which never equals wv for a real request.
	s.mu.Lock()
	stored := s.storedWorkVersions[key]
	s.mu.Unlock()
	if wv == stored {
		log.Infof(ctx, "skipping (work version unchanged): %+v", key)
		incrementJob("NumSkipped")
		return nil
	}

	// A scan that completes with an error recorded on the row is still written;
	// it is counted as errored rather than failed.
	row := s.scan(ctx, req, localBinaryPath, wv)
	if err := writeResult(ctx, req.Serve, w, s.bqClient, analysis.TableName, row); err != nil {
		return err
	}
	if row.Error != "" {
		incrementJob("NumErrored")
	} else {
		incrementJob("NumSucceeded")
	}
	return nil
}

// readWorkVersion ensures that the stored work version for the given module,
// version and binary has been looked up, recording it in s.storedWorkVersions
// when one exists. It is a no-op if the key is already present or if there is
// no BigQuery client. The whole operation runs while holding s.mu.
func (s *analysisServer) readWorkVersion(ctx context.Context, modulePath, version, binary string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	key := analysis.WorkVersionKey{Module: modulePath, Version: version, Binary: binary}
	_, known := s.storedWorkVersions[key]
	if known || s.bqClient == nil {
		// Already looked up, or nowhere to read from.
		return nil
	}

	stored, err := analysis.ReadWorkVersion(ctx, s.bqClient, modulePath, version, binary)
	if err != nil {
		return err
	}
	if stored == nil {
		// Nothing recorded for this key.
		return nil
	}
	s.storedWorkVersions[key] = *stored
	return nil
}

// scan runs the analysis binary on the requested module version and returns a
// result row describing the outcome. It does not return an error: any failure
// is classified with a derrors sentinel where possible and recorded on the row
// via AddError.
func (s *analysisServer) scan(ctx context.Context, req *analysis.ScanRequest, localBinaryPath string, wv analysis.WorkVersion) *analysis.Result {
	res := &analysis.Result{
		ModulePath:  req.Module,
		Version:     req.Version,
		BinaryName:  req.Binary,
		WorkVersion: wv,
	}
	hasGoMod := true
	scanErr := doScan(ctx, req.Module, req.Version, req.Insecure, func() (err error) {
		// scanInternal writes the module contents into dir; the analysis binary
		// and addSource both read from it. The directory is removed afterwards.
		dir := moduleDir(req.Module, req.Version)
		defer derrors.Cleanup(&err, func() error { return os.RemoveAll(dir) })

		// Recorded for a more precise error breakdown below.
		// NOTE(review): this check runs before scanInternal populates dir, so it
		// presumably reports false unless dir already exists — confirm intent.
		hasGoMod = fileExists(filepath.Join(dir, "go.mod"))

		tree, err := s.scanInternal(ctx, req, localBinaryPath, dir)
		if err != nil {
			return err
		}
		info, err := s.proxyClient.Info(ctx, req.Module, req.Version)
		if err != nil {
			return fmt.Errorf("%w: %v", derrors.ProxyError, err)
		}
		res.Version = info.Version
		res.CommitTime = info.Time
		res.Diagnostics = analysis.JSONTreeToDiagnostics(tree)
		return addSource(ctx, res.Diagnostics, 1)
	})
	if scanErr != nil {
		// Users of the analysis pipeline look at the errors directly, so
		// miscellaneous errors on real modules are kept distinct from those on
		// synthetic (non-module) projects. Otherwise an experiment with many
		// misc errors could suggest a problem with the analysis itself when the
		// synthetic modules are merely outdated.
		//
		// The first matching case wins; kind stays nil when nothing matches.
		var kind error
		switch {
		case isNoModulesSpecified(scanErr):
			// Every non-module project is supposed to be turned into a module,
			// so this should be unreachable. It is kept to catch regressions.
			kind = derrors.LoadPackagesNoGoModError
		case isNoRequiredModule(scanErr):
			kind = derrors.LoadPackagesNoRequiredModuleError
		case isTooManyFiles(scanErr):
			kind = derrors.ScanModuleTooManyOpenFiles
		case isMissingGoSumEntry(scanErr):
			kind = derrors.LoadPackagesMissingGoSumEntryError
		case isReplacingWithLocalPath(scanErr):
			kind = derrors.LoadPackagesImportedLocalError
		case isModVendor(scanErr):
			kind = derrors.LoadVendorError
		case isProxyCacheMiss(scanErr):
			kind = derrors.ProxyError
		case isMemoryIssue(scanErr):
			kind = derrors.ScanModuleMemoryLimitExceeded
		case isBuildIssue(scanErr):
			kind = derrors.LoadPackagesError
		case !hasGoMod:
			// Classify misc errors on synthetic modules separately.
			kind = derrors.ScanSyntheticModuleError
		}
		if kind != nil {
			scanErr = fmt.Errorf("%v: %w", scanErr, kind)
		}
		res.AddError(scanErr)
	}
	res.SortVersion = version.ForSorting(res.Version)
	return res
}

// scanInternal prepares the module in moduleDir and then runs the analysis
// binary on it, returning the binary's JSON output as a tree. Unless the
// request is marked insecure, the binary runs inside a sandbox.
func (s *analysisServer) scanInternal(ctx context.Context, req *analysis.ScanRequest, binaryPath, moduleDir string) (jt analysis.JSONTree, err error) {
	doInit := !req.SkipInit
	err = prepareModule(ctx, req.Module, req.Version, moduleDir, s.proxyClient, req.Insecure, doInit)
	if err != nil {
		return nil, err
	}

	if req.Insecure {
		// No sandbox: run the binary directly.
		return runAnalysisBinary(nil, binaryPath, req.Args, moduleDir)
	}
	sbox := sandbox.New("/bundle")
	sbox.Runsc = "/usr/local/bin/runsc"
	return runAnalysisBinary(sbox, binaryPath, req.Args, moduleDir)
}

// hashFile returns the hash of the named file's contents, as computed by
// hashReader. Errors are wrapped with the function name and filename.
func hashFile(filename string) (_ string, err error) {
	defer derrors.Wrap(&err, "hashFile(%q)", filename)

	file, err := os.Open(filename)
	if err != nil {
		return "", err
	}
	defer file.Close()

	sum, err := hashReader(file)
	return sum, err
}

// hashReader consumes r until EOF and returns the SHA-256 digest of everything
// read, encoded as lowercase hexadecimal. If reading fails it returns the empty
// string and the read error.
func hashReader(r io.Reader) (string, error) {
	digest := sha256.New()
	_, err := io.Copy(digest, r)
	if err != nil {
		return "", err
	}
	sum := digest.Sum(nil)
	return hex.EncodeToString(sum), nil
}

// runAnalysisBinary runs the analysis binary at binaryPath in moduleDir and
// decodes its standard output as a JSON tree.
//
// The binary is invoked as: -json <reqArgs split on whitespace> ./...
// If sbox is nil the binary runs directly; otherwise it runs in the sandbox.
func runAnalysisBinary(sbox *sandbox.Sandbox, binaryPath, reqArgs, moduleDir string) (analysis.JSONTree, error) {
	userArgs := strings.Fields(reqArgs)
	args := make([]string, 0, len(userArgs)+2)
	args = append(args, "-json")
	args = append(args, userArgs...)
	args = append(args, "./...")

	out, err := runBinaryInDir(sbox, binaryPath, args, moduleDir)
	if err != nil {
		return nil, fmt.Errorf("running analysis binary %s: %s", binaryPath, derrors.IncludeStderr(err))
	}

	var tree analysis.JSONTree
	if err = json.Unmarshal(out, &tree); err != nil {
		return nil, err
	}
	return tree, nil
}

// runBinaryInDir runs the program at path with args, using dir as its working
// directory, and returns its standard output. With a non-nil sbox the command
// is created by the sandbox; otherwise it is run directly with os/exec.
func runBinaryInDir(sbox *sandbox.Sandbox, path string, args []string, dir string) ([]byte, error) {
	if sbox != nil {
		sandboxed := sbox.Command(path, args...)
		sandboxed.Dir = dir
		return sandboxed.Output()
	}
	direct := exec.Command(path, args...)
	direct.Dir = dir
	return direct.Output()
}

// addSource attaches source code excerpts to diagnostics.
//
// For every diagnostic with a non-empty position (file:line:col), it reads the
// reported line plus nContext lines above and below it and stores the text in
// the diagnostic's Source. It then tries to replace the position with a URL to
// the code; if that fails, the failure is logged and the position is kept.
// A position that cannot be parsed, or a source file that cannot be read,
// aborts with an error.
func addSource(ctx context.Context, ds []*analysis.Diagnostic, nContext int) error {
	for _, diag := range ds {
		// Some binaries only collect basic stats, such as the number of
		// occurrences of a pattern, and have no meaningful position to report.
		if diag.Position == "" {
			continue
		}

		file, line, _, err := parsePosition(diag.Position)
		if err != nil {
			return err
		}
		text, err := readSource(file, line, nContext)
		if err != nil {
			return fmt.Errorf("reading %s:%d: %w", file, line, err)
		}
		diag.Source = bq.NullString{StringVal: text, Valid: true}

		link, err := sourceURL(diag.Position, line)
		if err != nil {
			// URL creation failure should not result in an error of the analysis run.
			log.Errorf(ctx, err, "url creation failed for position %s", diag.Position)
			continue
		}
		diag.Position = link
	}
	return nil
}

// parsePosition splits a diagnostic position of the form file:line:col into
// its parts. The column and line are taken from the end of the string, so the
// file part may itself contain colons. It returns an error if either of the
// two trailing colons is missing or if line or col is not an integer.
func parsePosition(pos string) (file string, line, col int, err error) {
	defer derrors.Wrap(&err, "parsePosition(%q)", pos)

	// Peel the column off the end.
	colSep := strings.LastIndexByte(pos, ':')
	if colSep == -1 {
		return "", 0, 0, errors.New("missing colon")
	}
	colNum, err := strconv.Atoi(pos[colSep+1:])
	if err != nil {
		return "", 0, 0, err
	}

	// Then the line; whatever remains is the file.
	fileAndLine := pos[:colSep]
	lineSep := strings.LastIndexByte(fileAndLine, ':')
	if lineSep == -1 {
		return "", 0, 0, errors.New("missing second colon")
	}
	lineNum, err := strconv.Atoi(fileAndLine[lineSep+1:])
	if err != nil {
		return "", 0, 0, err
	}
	return fileAndLine[:lineSep], lineNum, colNum, nil
}

// sourceURL builds a go-mod-viewer URL for the file named in position pos,
// with line as the highlighted line anchor.
//
// pos must start with modulesDir followed by a slash; the part between that
// prefix and the first colon becomes the URL path. An error is returned if the
// prefix is absent or if no colon follows it.
func sourceURL(pos string, line int) (string, error) {
	prefix := modulesDir + "/"
	if !strings.HasPrefix(pos, prefix) {
		return "", errors.New("unexpected prefix")
	}
	rel := pos[len(prefix):]

	colon := strings.IndexByte(rel, ':')
	if colon == -1 {
		return "", errors.New("missing colon in position")
	}
	return fmt.Sprintf("https://go-mod-viewer.appspot.com/%s#L%d", rel[:colon], line), nil
}

// readSource returns the given line (1-based) from the file, along with
// nContext lines above and below it, joined by "\n".
//
// Lines are read with a bufio.Reader rather than a bufio.Scanner, so a line
// longer than bufio.MaxScanTokenSize (as can occur in generated or minified
// sources) does not fail with bufio.ErrTooLong. Line terminators ("\n" or
// "\r\n") are stripped. Reading stops as soon as the last line of the window
// has been read. If the file has fewer lines than requested, whatever part of
// the window exists is returned; a window entirely outside the file yields the
// empty string.
func readSource(file string, line int, nContext int) (_ string, err error) {
	defer derrors.Wrap(&err, "readSource(%q, %d, %d)", file, line, nContext)
	f, err := os.Open(file)
	if err != nil {
		return "", err
	}
	defer f.Close()

	first, last := line-nContext, line+nContext
	r := bufio.NewReader(f)
	var lines []string
	// n is the 1-based number of the line being read.
	for n := 1; n <= last; n++ {
		text, rerr := r.ReadString('\n')
		if rerr != nil && rerr != io.EOF {
			return "", rerr
		}
		if rerr == io.EOF && text == "" {
			break // no more lines
		}
		if n >= first {
			// Strip the terminator the same way bufio.ScanLines does.
			text = strings.TrimSuffix(text, "\n")
			text = strings.TrimSuffix(text, "\r")
			lines = append(lines, text)
		}
		if rerr == io.EOF {
			break // the final line had no terminator
		}
	}
	return strings.Join(lines, "\n"), nil
}

// handleEnqueue serves a request to enqueue analysis scan tasks for a set of
// modules.
//
// It parses the enqueue parameters, hashes the named analysis binary as stored
// in the bucket, reads the module list, and enqueues the resulting tasks in the
// "analysis" namespace. If a user is given, a job is created to track the
// tasks; failure to create it is reported in the response text but does not
// abort enqueuing. If enqueuing fails, the job associated with this request
// (if any) is deleted and an error is returned. On success a short status line
// is written to w.
func (s *analysisServer) handleEnqueue(w http.ResponseWriter, r *http.Request) (err error) {
	defer derrors.Wrap(&err, "analysisServer.handleEnqueue")
	ctx := r.Context()
	params := &analysis.EnqueueParams{Min: defaultMinImportedByCount}
	if err := scan.ParseParams(r, params); err != nil {
		return fmt.Errorf("%w: %v", derrors.InvalidArgument, err)
	}
	if params.Binary == "" {
		return fmt.Errorf("%w: analysis: missing binary", derrors.InvalidArgument)
	}
	if params.Binary != path.Base(params.Binary) {
		return fmt.Errorf("%w: analysis: binary name contains slashes (must be a basename)", derrors.InvalidArgument)
	}

	// Hash the binary now so every task carries the version it was enqueued for.
	srcPath := path.Join(analysisBinariesBucketDir, params.Binary)
	rc, err := s.openFile(srcPath)
	if err != nil {
		return err
	}
	defer rc.Close()
	binaryHash, err := hashReader(rc)
	if err != nil {
		return err
	}
	mods, err := readModules(ctx, s.cfg, params.File, params.Min)
	if err != nil {
		return err
	}

	// If a user was provided, create a Job.
	var jobID string
	sj := "" // suffix of the response line describing the job, if any
	if params.User != "" {
		job := jobs.NewJob(params.User, time.Now(), r.URL.String(), params.Binary, binaryHash, params.Args)
		jobID = job.ID()
		if err := s.jobDB.CreateJob(ctx, job); err != nil {
			sj = fmt.Sprintf(", but could not create job: %v", err)
		} else {
			sj = ", job ID is " + jobID
		}
	}

	tasks := createAnalysisQueueTasks(params, jobID, binaryHash, mods)
	err = enqueueTasks(ctx, tasks, s.queue,
		&queue.Options{Namespace: "analysis", TaskNameSuffix: params.Suffix})
	if err != nil {
		// Only clean up when this request actually has a job; without a user
		// there is no job ID and the job DB must not be touched.
		if jobID != "" {
			if err := s.jobDB.DeleteJob(ctx, jobID); err != nil {
				log.Errorf(ctx, err, "failed to delete job upon unsuccessful enqueuing")
			}
		}
		return fmt.Errorf("enequeue failed: %w", err)
	}
	if jobID != "" {
		// Best effort: the tasks are already enqueued, so a failed counter
		// update is logged rather than returned.
		if err := s.jobDB.Increment(ctx, jobID, "NumEnqueued", len(tasks)); err != nil {
			log.Errorf(ctx, err, "failed to update job for id %q", jobID)
		}
	}
	// Communicate enqueue status for better usability.
	fmt.Fprintf(w, "enqueued %d analysis tasks successfully%s\n", len(tasks), sj)
	return nil
}

// createAnalysisQueueTasks returns one scan request per module in mods, in the
// same order. Each request carries the module's path, version and ImportedBy
// value together with the binary name, arguments, Insecure and SkipInit
// settings from params, the given binaryVersion, and jobID. It returns nil
// when mods is empty.
func createAnalysisQueueTasks(params *analysis.EnqueueParams, jobID string, binaryVersion string, mods []scan.ModuleSpec) []queue.Task {
	if len(mods) == 0 {
		return nil
	}
	tasks := make([]queue.Task, len(mods))
	for i, m := range mods {
		tasks[i] = &analysis.ScanRequest{
			ModuleURLPath: scan.ModuleURLPath{
				Module:  m.Path,
				Version: m.Version,
			},
			ScanParams: analysis.ScanParams{
				Binary:        params.Binary,
				BinaryVersion: binaryVersion,
				Args:          params.Args,
				ImportedBy:    m.ImportedBy,
				Insecure:      params.Insecure,
				JobID:         jobID,
				SkipInit:      params.SkipInit,
			},
		}
	}
	return tasks
}
