// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package gerrit mirrors Gerrit CL state in a [storage.DB].
package gerrit

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"iter"
	"log/slog"
	"net/http"
	"net/url"
	"strconv"
	"sync/atomic"
	"testing"
	"time"

	"golang.org/x/oscar/internal/secret"
	"golang.org/x/oscar/internal/storage"
	"golang.org/x/oscar/internal/storage/timed"
	"rsc.io/ordered"
)

// Kinds of entries this package stores in the database.
// Each is the leading element of the corresponding keys
// (see the key schema comment below).
const (
	syncProjectKind  = "gerrit.SyncProject"  // per-project sync state (projectSync JSON)
	changeKind       = "gerrit.Change"       // latest ChangeInfo JSON for each change
	commentKind      = "gerrit.Comment"      // comments JSON for each change
	changeUpdateKind = "gerrit.ChangeUpdate" // timed stream of change updates
)

// For a Gerrit project we store changes indexed by change number.
// For each change we store the latest known state, as the JSON
// encoding of what Gerrit calls a ChangeInfo entity.
// We also store the comments on the change, as a JSON object
// mapping file names to arrays of Gerrit CommentInfo entities.
//
// We store a simple timed stream of updated change numbers.
// We don't store historical status of changes, as all historical
// information is stored in the ChangeInfo and CommentInfo entities.
//
// The following key schemas are stored in the database:
//
//	["gerrit.SyncProject", Instance, Project] => JSON of projectSync structure
//	["gerrit.Change", Instance, Project, ChangeNumber] => ChangeInfo JSON
//	["gerrit.Comment", Instance, Project, ChangeNumber] => CommentInfo JSON
//	["gerrit.ChangeUpdate", Instance, ChangeNumber, MetaID] => DBTime
//	["gerrit.ChangeUpdateByTime", DBTime, Instance, ChangeNumber, MetaID] => []
//
// A watcher on "gerrit.ChangeUpdate" will see all Gerrit changes,
// and can read the new data from the database.

// Gerrit APIs for searching changes return results only in reverse
// chronological order. Because execution of [Client.Sync] can be
// interrupted by the enclosing environment (for instance, by a Cloud Run
// timeout), we need an algorithm that records partial progress while
// walking backwards in time and can resume where it left off.
//
// The algorithm keeps track of three points in time: low watermark (L),
// high watermark (H), and current watermark (C). [Client.Sync] has
// processed all change updates before L and none after H. The algorithm
// first tries to process change updates in the interval [L, H] by going
// backwards from H. The watermark C is used to remember where in this
// interval the algorithm is currently. This is done so that the algorithm
// can restart in case there is an interruption. Once the algorithm
// processes the [L, H] interval, H becomes the new low watermark, the
// new high watermark is the current moment in time, and C is reset to
// the new high watermark.

// changeOpts is the options we request for a change.
// They are sent as the "o" query parameter by changes when
// listing changes, so every synced ChangeInfo is fetched
// with the same set of options.
var changeOpts = []string{
	"ALL_REVISIONS",
	"DETAILED_ACCOUNTS",
	"LABELS",
	"ALL_COMMITS",
	"MESSAGES",
	"SUBMITTABLE",
	"PARENTS",
}

// o encodes its arguments as a database key using [ordered.Encode].
// The one-letter name keeps key construction compact at call sites.
func o(parts ...any) []byte {
	return ordered.Encode(parts...)
}

// A Client is a connection to a Gerrit instance, and to the database
// that stores information gathered from the instance.
// Clients are created with [New].
type Client struct {
	instance string       // Gerrit host name, such as "go-review.googlesource.com"
	slog     *slog.Logger // logger for sync progress and HTTP requests
	db       storage.DB   // database holding mirrored Gerrit data and sync state
	secret   secret.DB    // secret database; per New's doc comment, not yet used
	http     *http.Client // HTTP client used for Gerrit REST requests

	flushRequested atomic.Bool // flush database to disk when convenient

	// NOTE(review): not referenced in this part of the file; presumably
	// caches Gerrit account data — confirm against its uses.
	ac accountCache

	testing    bool           // set from testing.Testing() by New
	testClient *TestingClient // source of changes and comments when c.divertChanges() is true
}

// New returns a Client for the Gerrit instance with the given host
// name, such as "go-review.googlesource.com". The client logs to lg,
// stores its data in db, and makes HTTP requests with hc.
//
// The secret database sdb will be consulted for a secret named after
// the instance, with a value of the form user:pass. This is not yet used.
func New(instance string, lg *slog.Logger, db storage.DB, sdb secret.DB, hc *http.Client) *Client {
	c := new(Client)
	c.instance = instance
	c.slog = lg
	c.db = db
	c.secret = sdb
	c.http = hc
	c.testing = testing.Testing()
	return c
}

// RequestFlush sets a flag asking a Gerrit sync to flush the database
// to disk at its next convenient point. The flag stays set until a
// sync observes it, so a request made between syncs is honored by the
// next one. RequestFlush may be called concurrently with Sync.
func (c *Client) RequestFlush() {
	c.flushRequested.Store(true)
}

// projectSync records the sync state of a Gerrit project within
// an instance, such as "go" or "website".
// This is stored in the database as JSON and decoded with
// encoding/json, so renaming or retyping fields would make
// previously stored state unreadable.
type projectSync struct {
	Instance    string // instance host name, "go-review.googlesource.com"
	Name        string // project name, such as "go" or "oscar".
	LowMark     string // low watermark L, in gerrit timestamp layout
	HighMark    string // high watermark H, in gerrit timestamp layout; "" after an interval is closed
	CurrentMark string // current watermark C, in gerrit timestamp layout
	// Skip is used to guarantee partial progress in the
	// case there are more change updates happening at
	// the CurrentMark across the change batch boundaries.
	// It counts the already-processed change updates at exactly
	// CurrentMark and is passed to Gerrit as the number of
	// matching results to skip.
	Skip int
}

// store writes proj, JSON-encoded, to db under the
// ["gerrit.SyncProject", Instance, Name] key.
func (proj *projectSync) store(db storage.DB) {
	key := o(syncProjectKind, proj.Instance, proj.Name)
	db.Set(key, storage.JSON(proj))
}

// Add registers a Gerrit project, such as "go" or "oscar", by storing
// its initial (empty) sync state in the database. No change data is
// fetched until [Client.Sync] or [Client.SyncProject] is called.
// Add returns an error if the project has already been added.
func (c *Client) Add(project string) error {
	key := o(syncProjectKind, c.instance, project)
	if _, exists := c.db.Get(key); exists {
		return fmt.Errorf("gerrit.Add: already added: %q", project)
	}
	initial := projectSync{Instance: c.instance, Name: project}
	c.db.Set(key, storage.JSON(&initial))
	return nil
}

// Sync syncs the data for every project added (with [Client.Add]) to
// this client's instance. A failure in one project does not stop the
// others; all per-project errors are returned joined together.
func (c *Client) Sync(ctx context.Context) error {
	lo := o(syncProjectKind, c.instance)
	hi := o(syncProjectKind, c.instance, ordered.Inf)

	var errs []error
	for key := range c.db.Scan(lo, hi) {
		// Keys are [kind, instance, project]; only the project is needed.
		var project string
		if err := ordered.Decode(key, nil, nil, &project); err != nil {
			c.db.Panic("gerrit client sync decode", "key", storage.Fmt(key), "err", err)
		}
		if err := c.SyncProject(ctx, project); err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...)
}

// SyncProject syncs a single project that was previously added with
// [Client.Add]. It holds the project's database lock for the duration
// of the sync. Any error is prefixed with the project name.
func (c *Client) SyncProject(ctx context.Context, project string) (err error) {
	c.slog.Debug("gerrit.SyncProject", "project", project)
	defer func() {
		if err != nil {
			err = fmt.Errorf("SyncProject(%q): %w", project, err)
		}
	}()

	key := o(syncProjectKind, c.instance, project)

	// Lock the project so that no one else syncs it concurrently.
	lockName := string(key)
	c.db.Lock(lockName)
	defer c.db.Unlock(lockName)

	// Load the stored sync state.
	val, ok := c.db.Get(key)
	if !ok {
		return fmt.Errorf("missing project %s", project)
	}
	var proj projectSync
	if err := json.Unmarshal(val, &proj); err != nil {
		return err
	}

	return c.syncChanges(ctx, &proj)
}

// syncChanges brings proj up to date and stores its sync state.
//
// It first finishes the interval [proj.LowMark, proj.HighMark],
// opening one that ends now if the previous call closed its interval.
// While an interval turns out to contain change updates, it opens
// the next interval [old high mark, now] and syncs that too. When an
// interval has no updates, it closes it by making the high mark the
// new low mark and clearing the high mark, ready for the next call.
// Change data itself is written by syncIntervalChanges.
func (c *Client) syncChanges(ctx context.Context, proj *projectSync) (err error) {
	// setMarks records the interval [low, high] with the current
	// mark reset to high, then persists and flushes proj.
	setMarks := func(low, high string) {
		proj.LowMark, proj.HighMark, proj.CurrentMark = low, high, high
		proj.Skip = 0
		proj.store(c.db)
		c.db.Flush()
	}

	// An empty high mark means the previous interval was closed
	// successfully; open a new one ending now.
	if proj.HighMark == "" {
		setMarks(proj.LowMark, now())
	}

	for more := true; more; {
		c.slog.Info("gerrit sync interval", "project", proj.Name, "low", proj.LowMark,
			"curr", proj.CurrentMark, "skip", proj.Skip, "high", proj.HighMark)
		var syncErr error
		if more, syncErr = c.syncIntervalChanges(ctx, proj); syncErr != nil {
			return syncErr
		}
		if more {
			// Updates kept arriving; sync [old high mark, now] next.
			setMarks(proj.HighMark, now())
		}
	}

	// Prepare for the next invocation: the high mark becomes the low mark.
	setMarks(proj.HighMark, "")
	return nil
}

// testNow exists for testing purposes, to avoid the
// issue of dealing with the current moment in time.
// When it is non-empty, now returns it verbatim instead of the
// current time, so it should use the gerrit timestamp layout.
// For ordinary use this should be empty string.
// TODO: instead, should we ask database for its
// definition of now?
var testNow string

// now returns the current time formatted with timeStampLayout,
// Gerrit's timestamp format. When testNow is set, it is returned
// instead so that tests see a fixed time.
func now() string {
	if testNow == "" {
		return time.Now().Format(timeStampLayout)
	}
	return testNow
}

// syncIntervalChanges syncs change updates in [proj.LowMark, proj.CurrentMark],
// walking backwards in time from proj.CurrentMark. It repeatedly asks
// Gerrit for the next batch of changes. It stores each new or modified
// change together with its comments, and records the update in the
// change-update stream. After each processed change it advances
// proj.CurrentMark and proj.Skip. It stops when a query returns no
// changes.
// It reports whether there were any change updates in the interval.
//
// Pending writes are applied and flushed when it returns, including on
// error, so progress made so far survives an interruption.
func (c *Client) syncIntervalChanges(ctx context.Context, proj *projectSync) (some bool, err error) {
	b := c.db.Batch()
	defer func() {
		// Apply and flush even when returning an error, so that the
		// changes and current mark recorded so far are not lost.
		b.Apply()
		c.db.Flush()
	}()

	// When we need to fetch multiple lists of changes,
	// concurrent modifications can cause us to see the
	// same change more than once. Keep track of the changes
	// we've already seen.
	cache := make(map[int]json.RawMessage)

	// seen reports whether change (number changeNum, meta ID metaID)
	// matches a version already seen during this call or already
	// stored in the database, in which case there is nothing to write.
	seen := func(change json.RawMessage, changeNum int, metaID string) (bool, error) {
		if oldChange, ok := cache[changeNum]; ok {
			same, err := sameChangeInfo(change, metaID, oldChange)
			if err != nil {
				return false, err
			}
			if same {
				// Nothing has changed.
				return true, nil
			}
		}
		cache[changeNum] = change

		key := o(changeKind, c.instance, proj.Name, changeNum)
		if oldChange, ok := c.db.Get(key); ok {
			same, err := sameChangeInfo(change, metaID, oldChange)
			if err != nil {
				return false, err
			}
			if same {
				// Nothing has changed.
				return true, nil
			}
		}
		return false, nil
	}

	// saveCurrentMark records progress through the interval in proj
	// and stores it in the database.
	saveCurrentMark := func(curr string, skip int) {
		proj.CurrentMark = curr
		proj.Skip = skip
		proj.store(c.db)
	}

	for {
		nChanges := 0
		for change, err := range c.changes(ctx, proj.Name, proj.LowMark, proj.CurrentMark, proj.Skip) {
			if err != nil {
				return false, err
			}
			if err := ctx.Err(); err != nil {
				return false, err
			}

			some = true
			nChanges++

			// Clear the request with Swap before flushing (rather than
			// Load now and Store(false) afterwards) so that a flush
			// requested while this one is running is not lost.
			if c.flushRequested.Swap(false) {
				b.Apply()
				c.db.Flush()
			}

			// Change is a Gerrit ChangeInfo in JSON form.
			// Pull out the fields we need in a single decode.
			var info struct {
				Number  int    `json:"_number"`
				MetaID  string `json:"meta_rev_id"`
				Updated string `json:"updated"`
			}
			if err := json.Unmarshal(change, &info); err != nil {
				return false, err
			}
			changeNum := info.Number
			metaID := info.MetaID
			if changeNum == 0 {
				return false, fmt.Errorf("missing _number field in %q", change)
			}
			if metaID == "" {
				return false, fmt.Errorf("missing meta_rev_id field in change %d: %q", changeNum, change)
			}

			same, err := seen(change, changeNum, metaID)
			if err != nil {
				return false, err
			}
			if !same {
				key := o(changeKind, c.instance, proj.Name, changeNum)
				b.Set(key, change)
				if err := c.syncComments(ctx, b, proj.Name, changeNum); err != nil {
					return false, err
				}

				// Record that the change was updated.
				timed.Set(c.db, b, changeUpdateKind, o(c.instance, changeNum, metaID), nil)
			}

			b.MaybeApply()

			// Save the update time of the most recent change to
			// proj.CurrentMark only once we have successfully
			// processed the change.
			// Gerrit intervals are inclusive. Update proj.Skip
			// to avoid re-fetching processed change updates
			// happening at the same gerrit timestamp at the
			// boundaries of the outer loop.
			if info.Updated == proj.CurrentMark {
				saveCurrentMark(info.Updated, proj.Skip+1)
			} else {
				saveCurrentMark(info.Updated, 1)
			}

			// Flush progress to the database occasionally
			// to make sure it is saved before interruption.
			if nChanges%100 == 0 {
				b.Apply()
				c.db.Flush()
			}
		}

		// There were no changes in the interval [proj.LowMark, proj.CurrentMark],
		// which means we are done with the interval.
		if nChanges == 0 {
			return some, nil
		}
	}
}

// syncComments fetches the comments on change changeNum and stages them
// in b under the ["gerrit.Comment", instance, project, changeNum] key.
// Normally the stored value is the raw JSON returned by Gerrit's
// /changes/{changeNum}/comments endpoint. When changes are diverted to
// the test client, it is the test client's comments for the change,
// attached to a single file named "file".
func (c *Client) syncComments(ctx context.Context, b storage.Batch, project string, changeNum int) error {
	var obj json.RawMessage
	if c.divertChanges() { // testing
		cms := c.testClient.comments[changeNum]
		obj = storage.JSON(map[string][]*CommentInfo{"file": cms}) // attach comments to a single file
	} else {
		// Named addr rather than url to avoid shadowing the net/url package.
		addr := "https://" + c.instance + "/changes/" + strconv.Itoa(changeNum) + "/comments"
		if err := c.get(ctx, addr, &obj); err != nil {
			return fmt.Errorf("fetching comments for change %d: %w", changeNum, err)
		}
	}

	key := o(commentKind, c.instance, project, changeNum)
	b.Set(key, obj)
	return nil
}

// gerritQueryLimit is the maximum number of changes requested by a
// single change query; changes sends it as the "limit:" search operator.
const gerritQueryLimit = 500 // gerrit returns up to 500 changes at a time.

// changes returns an iterator, in reverse chronological order, over at
// most gerritQueryLimit changes in the Gerrit project that were most
// recently updated in the interval [after, before]. An empty after or
// before leaves that end of the interval open.
// The first skip matching changes are omitted (skip is sent to Gerrit
// as the S parameter when positive).
// If the query fails, the iterator yields a single nil change together
// with the error.
func (c *Client) changes(ctx context.Context, project, after, before string, skip int) iter.Seq2[json.RawMessage, error] {
	if c.divertChanges() { // testing
		return c.testClient.changes(ctx, project, after, before, skip)
	}

	return func(yield func(json.RawMessage, error) bool) {
		baseURL := "https://" + c.instance + "/changes"

		values := url.Values{
			"o": changeOpts,
		}
		query := "p:" + project
		if after != "" {
			query += " after:" + quote(after) // precise timestamps have spaces and need quotes
		}
		if before != "" {
			query += " before:" + quote(before) // precise timestamps have spaces and need quotes
		}
		query += " limit:" + strconv.Itoa(gerritQueryLimit)
		values.Set("q", query)
		if skip > 0 {
			values.Set("S", strconv.Itoa(skip))
		}
		addr := baseURL + "?" + values.Encode()

		var body []json.RawMessage
		if err := c.get(ctx, addr, &body); err != nil {
			yield(nil, err)
			return
		}

		for _, change := range body {
			if !yield(change, nil) {
				return
			}
		}
	}
}

// quote returns t as a double-quoted Go string literal, suitable for
// a Gerrit search operand containing spaces. If t already parses as a
// Go quoted literal (per strconv.Unquote), it is returned unchanged.
func quote(t string) string {
	if _, err := strconv.Unquote(t); err == nil {
		return t // already quoted
	}
	return strconv.Quote(t)
}

// sameChangeInfo reports whether the JSON-encoded ChangeInfo values a
// and b describe the same change state. aMetaID is the meta ID of a.
//
// Byte-identical inputs are always the same. Otherwise the meta ID of b
// (its "meta_rev_id" field) is compared with aMetaID, because Gerrit
// does not emit ChangeInfo fields in a consistent order (for example,
// "reviewers" has been seen to change order). It returns an error if b
// is not valid JSON or has no meta ID.
func sameChangeInfo(a []byte, aMetaID string, b []byte) (bool, error) {
	if bytes.Equal(a, b) {
		return true, nil
	}

	var other struct {
		MetaID string `json:"meta_rev_id"`
	}
	if err := json.Unmarshal(b, &other); err != nil {
		return false, err
	}
	switch other.MetaID {
	case "":
		return false, errors.New("missing meta ID")
	default:
		return other.MetaID == aMetaID, nil
	}
}

// get fetches addr with an HTTP GET and decodes the JSON response body
// into obj.
//
// Gerrit prefixes its JSON responses with a non-JSON first line, which
// is discarded before decoding. A 429 Too Many Requests response is
// retried with exponential backoff (starting at one second, capped at
// one minute), up to 20 retries. The wait between retries ends early,
// returning ctx.Err(), if ctx is done. Transport errors are returned
// without retrying. Any other non-200 status is returned as an error
// containing the status line and response body.
func (c *Client) get(ctx context.Context, addr string, obj any) error {
	c.slog.Info("gerrit GET", "addr", addr)

	tries := 0
	backoff := 1 * time.Second
	for {
		req, err := http.NewRequestWithContext(ctx, "GET", addr, nil)
		if err != nil {
			return err
		}
		resp, err := c.http.Do(req)
		if err != nil {
			return err
		}

		if resp.StatusCode == http.StatusOK {
			// Skip the XSSI-protection line that Gerrit puts before
			// the JSON. ReadBytes, unlike ReadSlice, cannot fail with
			// bufio.ErrBufferFull if that line is unexpectedly long.
			buf := bufio.NewReader(resp.Body)
			_, err = buf.ReadBytes('\n')
			if err == nil {
				err = json.NewDecoder(buf).Decode(obj)
			}
			// Close explicitly rather than defer inside the retry loop.
			resp.Body.Close()
			return err
		}

		data, err := io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			return fmt.Errorf("reading body: %w", err)
		}

		if resp.StatusCode != http.StatusTooManyRequests {
			return fmt.Errorf("%s\n%s", resp.Status, data)
		}

		tries++
		if tries > 20 {
			return errors.New("too many requests")
		}
		c.slog.Info("gerrit too many requests",
			"try", tries,
			"sleep", backoff,
			"body", string(data))

		// Wait out the backoff, but stop promptly if the caller's
		// context is canceled (time.Sleep would ignore it).
		timer := time.NewTimer(backoff)
		select {
		case <-ctx.Done():
			timer.Stop()
			return ctx.Err()
		case <-timer.C:
		}
		backoff = min(backoff*2, 1*time.Minute)
	}
}
