// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

import (
	"bufio"
	"context"
	"encoding/json"
	"errors"
	"flag"
	"fmt"
	"io/fs"
	"log"
	"net/http"
	"os"
	"strings"
	"time"

	cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
	taskspb "cloud.google.com/go/cloudtasks/apiv2/cloudtaskspb"
	"golang.org/x/exp/slog"
	"golang.org/x/mod/semver"
	"golang.org/x/telemetry/godev/internal/config"
	"golang.org/x/telemetry/godev/internal/content"
	ilog "golang.org/x/telemetry/godev/internal/log"
	"golang.org/x/telemetry/godev/internal/middleware"
	"golang.org/x/telemetry/godev/internal/storage"
	tconfig "golang.org/x/telemetry/internal/config"
	contentfs "golang.org/x/telemetry/internal/content"
	"golang.org/x/telemetry/internal/telemetry"
	"golang.org/x/telemetry/internal/unionfs"
)

func main() {
	flag.Parse()
	ctx := context.Background()
	cfg := config.NewConfig()

	if cfg.UseGCS {
		slog.SetDefault(slog.New(ilog.NewGCPLogHandler()))
	}

	buckets, err := storage.NewAPI(ctx, cfg)
	if err != nil {
		log.Fatal(err)
	}
	ucfg, err := tconfig.ReadConfig(cfg.UploadConfig)
	if err != nil {
		log.Fatal(err)
	}
	fsys := fsys(cfg.DevMode)
	cserv := content.Server(fsys)
	mux := http.NewServeMux()

	mux.Handle("/", cserv)
	mux.Handle("/merge/", handleMerge(buckets))
	mux.Handle("/chart/", handleChart(ucfg, buckets))
	mux.Handle("/queue-tasks/", handleTasks(cfg))

	mw := middleware.Chain(
		middleware.Log(slog.Default()),
		middleware.Timeout(cfg.RequestTimeout),
		middleware.RequestSize(cfg.MaxRequestBytes),
		middleware.Recover(),
	)

	fmt.Printf("server listening at http://localhost:%s\n", cfg.WorkerPort)
	log.Fatal(http.ListenAndServe(":"+cfg.WorkerPort, mw(mux)))
}

// handleTasks will populate the task queue that processes report
// data. Cloud Scheduler will be instrumented to call this endpoint
// daily to merge reports and generate chart data. The merge tasks
// will merge the previous weeks reports and the chart tasks will do
// the same minus one day.
// TODO(golang/go#62575): adjust the date range to align with report
// upload cutoff.
func handleTasks(cfg *config.Config) content.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) error {
		now := time.Now().UTC()
		for i := 7; i > 0; i-- {
			date := now.AddDate(0, 0, -1*i).Format("2006-01-02")
			url := cfg.WorkerURL + "/merge/?date=" + date
			if _, err := createHTTPTask(cfg, url); err != nil {
				return err
			}
		}
		for i := 8; i > 1; i-- {
			date := now.AddDate(0, 0, -1*i).Format("2006-01-02")
			url := cfg.WorkerURL + "/chart/?date=" + date
			if _, err := createHTTPTask(cfg, url); err != nil {
				return err
			}
		}
		return nil
	}
}

// createHTTPTask constructs a task with a authorization token
// and HTTP target then adds it to a Queue.
func createHTTPTask(cfg *config.Config, url string) (*taskspb.Task, error) {
	ctx := context.Background()
	client, err := cloudtasks.NewClient(ctx)
	if err != nil {
		return nil, fmt.Errorf("cloudtasks.NewClient: %w", err)
	}
	defer client.Close()

	queuePath := fmt.Sprintf("projects/%s/locations/%s/queues/%s", cfg.ProjectID, cfg.LocationID, cfg.QueueID)
	req := &taskspb.CreateTaskRequest{
		Parent: queuePath,
		Task: &taskspb.Task{
			MessageType: &taskspb.Task_HttpRequest{
				HttpRequest: &taskspb.HttpRequest{
					HttpMethod: taskspb.HttpMethod_POST,
					Url:        url,
					AuthorizationHeader: &taskspb.HttpRequest_OidcToken{
						OidcToken: &taskspb.OidcToken{
							ServiceAccountEmail: cfg.IAPServiceAccount,
							Audience:            cfg.ClientID,
						},
					},
				},
			},
		},
	}

	createdTask, err := client.CreateTask(ctx, req)
	if err != nil {
		return nil, fmt.Errorf("cloudtasks.CreateTask: %w", err)
	}
	return createdTask, nil
}

// TODO: monitor duration and processed data volume.
func handleMerge(s *storage.API) content.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) error {
		ctx := r.Context()
		date := r.URL.Query().Get("date")
		if _, err := time.Parse("2006-01-02", date); err != nil {
			return content.Error(err, http.StatusBadRequest)
		}
		it := s.Upload.Objects(ctx, date)
		mergeWriter, err := s.Merge.Object(date + ".json").NewWriter(ctx)
		if err != nil {
			return err
		}
		defer mergeWriter.Close()
		encoder := json.NewEncoder(mergeWriter)
		var count int
		for {
			obj, err := it.Next()
			if errors.Is(err, storage.ErrObjectIteratorDone) {
				break
			}
			if err != nil {
				return err
			}
			count++
			reader, err := s.Upload.Object(obj).NewReader(ctx)
			if err != nil {
				return err
			}
			defer reader.Close()
			var report telemetry.Report
			if err := json.NewDecoder(reader).Decode(&report); err != nil {
				return err
			}
			if err := encoder.Encode(report); err != nil {
				return err
			}
			if err := reader.Close(); err != nil {
				return err
			}
		}
		if err := mergeWriter.Close(); err != nil {
			return err
		}
		msg := fmt.Sprintf("merged %d reports into %s/%s", count, s.Merge.URI(), date)
		return content.Text(w, msg, http.StatusOK)
	}
}

func handleChart(cfg *tconfig.Config, s *storage.API) content.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) error {
		ctx := r.Context()
		// TODO: use start date and end date to create a timeseries of data.
		date := r.URL.Query().Get("date")
		if _, err := time.Parse("2006-01-02", date); err != nil {
			return content.Error(err, http.StatusBadRequest)
		}
		in, err := s.Merge.Object(date + ".json").NewReader(ctx)
		if errors.Is(err, storage.ErrObjectNotExist) {
			return content.Error(err, http.StatusNotFound)
		}
		if err != nil {
			return err
		}
		defer in.Close()

		var reports []*telemetry.Report
		var xs []float64
		scanner := bufio.NewScanner(in)
		for scanner.Scan() {
			var report telemetry.Report
			if err := json.Unmarshal(scanner.Bytes(), &report); err != nil {
				return err
			}
			reports = append(reports, &report)
			xs = append(xs, report.X)
		}
		if err := in.Close(); err != nil {
			return err
		}

		data := nest(reports)
		charts := charts(cfg, date, data, xs)
		obj := fmt.Sprintf("%s.json", date)
		out, err := s.Chart.Object(obj).NewWriter(ctx)
		if err != nil {
			return err
		}
		defer out.Close()

		if err := json.NewEncoder(out).Encode(charts); err != nil {
			return err
		}
		if err := out.Close(); err != nil {
			return err
		}

		msg := fmt.Sprintf("processed %d reports into %s", len(reports), s.Chart.URI()+"/"+obj)
		return content.Text(w, msg, http.StatusOK)
	}
}

type chartdata struct {
	DateRange  [2]string
	Programs   []*program
	NumReports int
}

type program struct {
	ID     string
	Name   string
	Charts []*chart
}

type chart struct {
	ID   string
	Name string
	Type string
	Data []*datum
}

func (c *chart) String() string {
	bytes, _ := json.Marshal(c)
	return string(bytes)
}

type datum struct {
	Week  string
	Key   string
	Value float64
}

func charts(cfg *tconfig.Config, date string, d data, xs []float64) *chartdata {
	result := &chartdata{DateRange: [2]string{date, date}, NumReports: len(xs)}
	for _, p := range cfg.Programs {
		prog := &program{ID: "charts:" + p.Name, Name: p.Name}
		result.Programs = append(result.Programs, prog)
		var charts []*chart
		if !telemetry.IsToolchainProgram(p.Name) {
			charts = append(charts, partition(d, p.Name, "Version", p.Versions))
		}
		charts = append(charts,
			partition(d, p.Name, "GOOS", cfg.GOOS),
			partition(d, p.Name, "GOARCH", cfg.GOARCH),
			partition(d, p.Name, "GoVersion", cfg.GoVersion))
		for _, c := range p.Counters {
			// TODO: add support for histogram counters by getting the counter type
			// from the chart config.
			charts = append(charts, partition(d, p.Name, c.Name, tconfig.Expand(c.Name)))
		}
		for _, p := range charts {
			if p != nil {
				prog.Charts = append(prog.Charts, p)
			}
		}
	}
	return result
}

func histogram(dat data, program, name string, counters []string, xs []float64) *chart {
	count := &chart{
		ID:   "charts:" + program + ":" + name,
		Name: name,
		Type: "histogram",
	}
	for wk := range dat {
		for _, c := range counters {
			prefix, _ := splitCounterName(c)
			pk := programKey{program}
			gk := graphKey{prefix}
			ck := counterKey{c}

			// TODO: consider using pre-computed distribution buckets to reduce the size
			// of the output.
			for _, x := range xs {
				xk := xKey{x}
				v := dat[wk][pk][gk][ck][xk]
				total := dat[wk][pk][gk][counterKey{gk.prefix}][xk]
				_, bucket := splitCounterName(c)
				if total == 0 {
					d := &datum{
						Week: wk.date,
						Key:  bucket,
					}
					count.Data = append(count.Data, d)
					continue
				}
				d := &datum{
					Week:  wk.date,
					Key:   bucket,
					Value: float64(v) / float64(total),
				}
				count.Data = append(count.Data, d)
			}
		}
	}
	return count
}

// partition builds a chart for the program and the counter. It can return nil
// if there is no data for the counter in dat.
func partition(dat data, program, counterPrefix string, counters []string) *chart {
	count := &chart{
		ID:   "charts:" + program + ":" + counterPrefix,
		Name: counterPrefix,
		Type: "partition",
	}
	pk := programKey{program}
	prefix, _ := splitCounterName(counterPrefix)
	gk := graphKey{prefix}
	for wk := range dat {
		// TODO: when should this be number of reports?
		// total := len(xs)
		total := len(dat[wk][pk][gk][counterKey{gk.prefix}])
		if total == 0 {
			return nil
		}
		// We group versions into major minor buckets, we must skip
		// major minor versions we've already added to the dataset.
		seen := make(map[string]bool)
		for _, b := range counters {
			// TODO(hyangah): let caller normalize names in counters.
			counter := normalizeCounterName(counterPrefix, b)
			if seen[counter] {
				continue
			}
			seen[counter] = true
			ck := counterKey{counter}
			// number of reports where count prefix:bucket > 0
			n := len(dat[wk][pk][gk][ck])
			_, bucket := splitCounterName(counter)
			d := &datum{
				Week:  wk.date,
				Key:   bucket,
				Value: float64(n) / float64(total),
			}
			count.Data = append(count.Data, d)
		}
	}
	return count
}

type weekKey struct {
	date string
}

type programKey struct {
	program string
}

type graphKey struct {
	prefix string
}

type counterKey struct {
	counter string
}

type xKey struct {
	X float64
}

type data map[weekKey]map[programKey]map[graphKey]map[counterKey]map[xKey]int64

// Names of special counters.
// Unlike other counters, these are constructed from the metadata in the report.
const (
	versionCounter   = "Version"
	goosCounter      = "GOOS"
	goarchCounter    = "GOARCH"
	goversionCounter = "GoVersion"
)

// nest groups the report data by week, program, prefix, counter, and x value
// summing together counter values for each program report in a report.
func nest(reports []*telemetry.Report) data {
	result := make(data)
	for _, r := range reports {
		for _, p := range r.Programs {
			result.writeCount(r.Week, p.Program, versionCounter, p.Version, r.X, 1)
			result.writeCount(r.Week, p.Program, goosCounter, p.GOOS, r.X, 1)
			result.writeCount(r.Week, p.Program, goarchCounter, p.GOARCH, r.X, 1)
			result.writeCount(r.Week, p.Program, goversionCounter, p.GoVersion, r.X, 1)
			for c, value := range p.Counters {
				prefix, _ := splitCounterName(c)
				result.writeCount(r.Week, p.Program, prefix, c, r.X, value)
			}
		}
	}
	return result
}

// readCount reads the count value based on the input keys.
// Return nil if any key does not exist.
func (d data) readCount(week, program, prefix, counter string, x float64) (int64, error) {
	wk := weekKey{week}
	if _, ok := d[wk]; !ok {
		return -1, fmt.Errorf("missing weekKey %q", week)
	}
	pk := programKey{program}
	if _, ok := d[wk][pk]; !ok {
		return -1, fmt.Errorf("missing programKey %q", program)
	}
	gk := graphKey{prefix}
	if _, ok := d[wk][pk][gk]; !ok {
		return -1, fmt.Errorf("missing graphKey key %q", prefix)
	}
	ck := counterKey{counter}
	if _, ok := d[wk][pk][gk][ck]; !ok {
		return -1, fmt.Errorf("missing counterKey %v", counter)
	}
	return d[wk][pk][gk][ck][xKey{x}], nil
}

// writeCount writes the counter values to the result. When a report contains
// multiple program reports for the same program, the value of the counters
// in that report are summed together.
func (d data) writeCount(week, program, prefix, counter string, x float64, value int64) {
	wk := weekKey{week}
	if _, ok := d[wk]; !ok {
		d[wk] = make(map[programKey]map[graphKey]map[counterKey]map[xKey]int64)
	}
	pk := programKey{program}
	if _, ok := d[wk][pk]; !ok {
		d[wk][pk] = make(map[graphKey]map[counterKey]map[xKey]int64)
	}
	gk := graphKey{prefix}
	if _, ok := d[wk][pk][gk]; !ok {
		d[wk][pk][gk] = make(map[counterKey]map[xKey]int64)
	}
	// TODO(hyangah): let caller pass the normalized counter name.
	counter = normalizeCounterName(prefix, counter)
	ck := counterKey{counter}
	if _, ok := d[wk][pk][gk][ck]; !ok {
		d[wk][pk][gk][ck] = make(map[xKey]int64)
	}
	xk := xKey{x}
	d[wk][pk][gk][ck][xk] += value
	// record the total for all counters with the prefix
	// as the bucket name.
	if prefix != counter {
		ck = counterKey{prefix}
		if _, ok := d[wk][pk][gk][ck]; !ok {
			d[wk][pk][gk][ck] = make(map[xKey]int64)
		}
		d[wk][pk][gk][ck][xk] += value
	}
}

// normalizeCounterName normalizes the counter name.
// More specifically, program version, goos, goarch, and goVersion
// are not a real counter, but information from the metadata in the report.
// This function constructs pseudo counter names to handle them
// like other normal counters in aggregation and chart drawing.
// To limit the cardinality of version and goVersion, this function
// uses only major and minor version numbers in the pseudo-counter names.
// If the counter is a normal counter name, it is returned as is.
func normalizeCounterName(prefix, counter string) string {
	switch prefix {
	case versionCounter:
		if counter == "devel" {
			return prefix + ":" + counter
		}
		if strings.HasPrefix(counter, "go") {
			return prefix + ":" + goMajorMinor(counter)
		}
		return prefix + ":" + semver.MajorMinor(counter)
	case goosCounter:
		return prefix + ":" + counter
	case goarchCounter:
		return prefix + ":" + counter
	case goversionCounter:
		return prefix + ":" + goMajorMinor(counter)
	}
	return counter
}

// splitCounterName gets splits the prefix and bucket splitCounterName of a counter name
// or a bucket name. For an input with no bucket part prefix and bucket
// are the same.
func splitCounterName(name string) (prefix, bucket string) {
	prefix, bucket, found := strings.Cut(name, ":")
	if !found {
		bucket = prefix
	}
	return prefix, bucket
}

// goMajorMinor gets the go<Maj>,<Min> version for a given go version.
// For example, go1.20.1 -> go1.20.
// TODO(hyangah): replace with go/version.Lang (available from go1.22)
// after our builders stop running go1.21.
func goMajorMinor(v string) string {
	v = v[2:]
	maj, x, ok := cutInt(v)
	if !ok {
		return ""
	}
	x = x[1:]
	min, _, ok := cutInt(x)
	if !ok {
		return ""
	}
	return fmt.Sprintf("go%s.%s", maj, min)
}

// cutInt scans the leading decimal number at the start of x to an integer
// and returns that value and the rest of the string.
func cutInt(x string) (n, rest string, ok bool) {
	i := 0
	for i < len(x) && '0' <= x[i] && x[i] <= '9' {
		i++
	}
	if i == 0 || x[0] == '0' && i != 1 {
		return "", "", false
	}
	return x[:i], x[i:], true
}
func fsys(fromOS bool) fs.FS {
	var f fs.FS = contentfs.FS
	if fromOS {
		f = os.DirFS("internal/content")
	}
	f, err := unionfs.Sub(f, "worker", "shared")
	if err != nil {
		log.Fatal(err)
	}
	return f
}
