// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package maintner

import (
	"bytes"
	"context"
	"crypto/sha256"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"log"
	"net"
	"net/http"
	"net/url"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"github.com/golang/protobuf/proto"
	"golang.org/x/build/maintner/internal/robustio"
	"golang.org/x/build/maintner/maintpb"
	"golang.org/x/build/maintner/reclog"
)

// NewNetworkMutationSource returns a mutation source from a master server.
// The server argument should be a URL to the JSON logs index.
func NewNetworkMutationSource(server, cacheDir string) MutationSource {
	base, err := url.Parse(server)
	if err != nil {
		panic(fmt.Sprintf("invalid URL: %q", server))
	}
	return &netMutSource{
		server:   server,
		base:     base,
		cacheDir: cacheDir,
	}
}

// TailNetworkMutationSource calls fn for all new mutations added to the log on server.
// Events with the End field set to true are not sent, so all events will
// have exactly one of Mutation or Err fields set to a non-zero value.
// It ignores prior events.
// If the server is restarted and its history diverges,
// TailNetworkMutationSource may return duplicate events. This therefore does not
// return a MutationSource, so it can't be accidentally misused for important things.
// TailNetworkMutationSource returns if fn returns an error, if ctx expires,
// or if it runs into a network error.
func TailNetworkMutationSource(ctx context.Context, server string, fn func(MutationStreamEvent) error) error {
	td, err := os.MkdirTemp("", "maintnertail")
	if err != nil {
		return err
	}
	defer robustio.RemoveAll(td)

	ns := NewNetworkMutationSource(server, td).(*netMutSource)
	ns.quiet = true
	getSegs := func(waitSizeNot int64) ([]LogSegmentJSON, error) {
		for {
			segs, err := ns.getServerSegments(ctx, waitSizeNot)
			if err != nil {
				if err := ctx.Err(); err != nil {
					return nil, err
				}
				// Sleep a minimum of 5 seconds before trying
				// again. The user's fn function might sleep
				// longer or shorter.
				timer := time.NewTimer(5 * time.Second)
				err := fn(MutationStreamEvent{Err: err})
				if err != nil {
					timer.Stop()
					return nil, err
				}
				<-timer.C
				continue
			}
			return segs, nil
		}
	}

	// See how long the log is at start. Then we'll only fetch
	// things after that.
	segs, err := getSegs(0)
	if err != nil {
		return err
	}
	segSize := sumJSONSegSize(segs)
	lastSeg := segs[len(segs)-1]
	if _, _, err := ns.syncSeg(ctx, lastSeg); err != nil {
		return err
	}

	ticker := time.NewTicker(time.Second) // max re-fetch interval
	defer ticker.Stop()
	for {
		segs, err := getSegs(segSize)
		if err != nil {
			return err
		}
		segSize = sumJSONSegSize(segs)

		for _, seg := range segs {
			if seg.Number < lastSeg.Number {
				continue
			}
			var off int64
			if seg.Number == lastSeg.Number {
				off = lastSeg.Size
			}
			_, newData, err := ns.syncSeg(ctx, seg)
			if err != nil {
				return err
			}
			if err := reclog.ForeachRecord(bytes.NewReader(newData), off, func(off int64, hdr, rec []byte) error {
				m := new(maintpb.Mutation)
				if err := proto.Unmarshal(rec, m); err != nil {
					return err
				}
				return fn(MutationStreamEvent{Mutation: m})
			}); err != nil {
				return err
			}
		}
		lastSeg = segs[len(segs)-1]

		<-ticker.C
	}
}

type netMutSource struct {
	server   string
	base     *url.URL
	cacheDir string

	last  []fileSeg
	quiet bool // disable verbose logging

	// Hooks for testing. If nil, unused:
	testHookGetServerSegments func(context.Context, int64) ([]LogSegmentJSON, error)
	testHookSyncSeg           func(context.Context, LogSegmentJSON) (fileSeg, []byte, error)
	testHookOnSplit           func(sumCommon int64)
	testHookFilePrefixSum224  func(file string, n int64) string
}

func (ns *netMutSource) GetMutations(ctx context.Context) <-chan MutationStreamEvent {
	ch := make(chan MutationStreamEvent, 50)
	go func() {
		err := ns.fetchAndSendMutations(ctx, ch)
		final := MutationStreamEvent{Err: err}
		if err == nil {
			final.End = true
		}
		select {
		case ch <- final:
		case <-ctx.Done():
		}
	}()
	return ch
}

// isNoInternetError reports whether the provided error is because there's no
// network connectivity.
func isNoInternetError(err error) bool {
	if err == nil {
		return false
	}
	switch err := err.(type) {
	case fetchError:
		return isNoInternetError(err.Err)
	case *url.Error:
		return isNoInternetError(err.Err)
	case *net.OpError:
		return isNoInternetError(err.Err)
	case *net.DNSError:
		// Trashy:
		return err.Err == "no such host"
	default:
		log.Printf("Unknown error type %T: %#v", err, err)
		return false
	}
}

func (ns *netMutSource) locallyCachedSegments() (segs []fileSeg, err error) {
	defer func() {
		if err != nil {
			log.Printf("No network connection and failed to use local cache: %v", err)
		} else {
			log.Printf("No network connection; using %d locally cached segments.", len(segs))
		}
	}()
	des, err := os.ReadDir(ns.cacheDir)
	if err != nil {
		return nil, err
	}
	fiMap := map[string]fs.FileInfo{}
	segHex := map[int]string{}
	segGrowing := map[int]bool{}
	for _, de := range des {
		name := de.Name()
		if !strings.HasSuffix(name, ".mutlog") {
			continue
		}
		fi, err := de.Info()
		if err != nil {
			return nil, err
		}
		fiMap[name] = fi

		if len(name) == len("0000.6897fab4d3afcda332424b2a2a1a4469021074282bc7be5606aaa221.mutlog") {
			num, err := strconv.Atoi(name[:4])
			if err != nil {
				continue
			}
			segHex[num] = strings.TrimSuffix(name[5:], ".mutlog")
		} else if strings.HasSuffix(name, ".growing.mutlog") {
			num, err := strconv.Atoi(name[:4])
			if err != nil {
				continue
			}
			segGrowing[num] = true
		}
	}
	for num := 0; ; num++ {
		if hex, ok := segHex[num]; ok {
			name := fmt.Sprintf("%04d.%s.mutlog", num, hex)
			segs = append(segs, fileSeg{
				seg:    num,
				file:   filepath.Join(ns.cacheDir, name),
				size:   fiMap[name].Size(),
				sha224: hex,
			})
			continue
		}
		if segGrowing[num] {
			name := fmt.Sprintf("%04d.growing.mutlog", num)
			slurp, err := robustio.ReadFile(filepath.Join(ns.cacheDir, name))
			if err != nil {
				return nil, err
			}
			segs = append(segs, fileSeg{
				seg:    num,
				file:   filepath.Join(ns.cacheDir, name),
				size:   int64(len(slurp)),
				sha224: fmt.Sprintf("%x", sha256.Sum224(slurp)),
			})
		}
		return segs, nil
	}
}

// getServerSegments fetches the JSON logs handler (ns.server, usually
// https://maintner.golang.org/logs) and returns the parsed JSON.
// It sends the "waitsizenot" URL parameter, which specifies that the
// request should long-poll waiting for the server to have a sum of
// log segment sizes different than the value specified. As a result,
// it blocks until the server has new data to send or ctx expires.
//
// getServerSegments returns an error that matches fetchError with
// PossiblyRetryable set to true when it has signal that repeating
// the same call after some time may succeed.
func (ns *netMutSource) getServerSegments(ctx context.Context, waitSizeNot int64) ([]LogSegmentJSON, error) {
	if fn := ns.testHookGetServerSegments; fn != nil {
		return fn(ctx, waitSizeNot)
	}
	logsURL := fmt.Sprintf("%s?waitsizenot=%d", ns.server, waitSizeNot)
	for {
		req, err := http.NewRequestWithContext(ctx, "GET", logsURL, nil)
		if err != nil {
			return nil, err
		}
		res, err := http.DefaultClient.Do(req)
		if err != nil {
			return nil, fetchError{Err: err, PossiblyRetryable: true}
		}
		// When we're doing a long poll and the server replies
		// with a 304 response, that means the server is just
		// heart-beating us and trying to get a response back
		// within its various deadlines. But we should just
		// try again.
		if res.StatusCode == http.StatusNotModified {
			res.Body.Close()
			continue
		}
		defer res.Body.Close()
		if res.StatusCode/100 == 5 {
			// Consider a 5xx server response to possibly succeed later.
			return nil, fetchError{Err: fmt.Errorf("%s: %v", ns.server, res.Status), PossiblyRetryable: true}
		} else if res.StatusCode != http.StatusOK {
			return nil, fmt.Errorf("%s: %v", ns.server, res.Status)
		}
		b, err := io.ReadAll(res.Body)
		if err != nil {
			return nil, fetchError{Err: err, PossiblyRetryable: true}
		}
		var segs []LogSegmentJSON
		err = json.Unmarshal(b, &segs)
		if err != nil {
			return nil, fmt.Errorf("unmarshaling %s JSON: %v", ns.server, err)
		}
		return segs, nil
	}
}

// getNewSegments fetches new mutations from the network mutation source.
// It tries to absorb the expected network bumps by trying multiple times,
// and returns an error only when it considers the problem to be terminal.
//
// If there's no internet connectivity from the start, it returns locally
// cached segments that might be available from before. Otherwise it waits
// for internet connectivity to come back and keeps going when it does.
func (ns *netMutSource) getNewSegments(ctx context.Context) ([]fileSeg, error) {
	sumLast := sumSegSize(ns.last)

	// First, fetch JSON metadata for the segments from the server.
	var serverSegs []LogSegmentJSON
	for try := 1; ; {
		segs, err := ns.getServerSegments(ctx, sumLast)
		if isNoInternetError(err) {
			if sumLast == 0 {
				return ns.locallyCachedSegments()
			}
			log.Printf("No internet; blocking.")
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-time.After(15 * time.Second):
				try = 1
				continue
			}
		} else if fe := (fetchError{}); errors.As(err, &fe) && fe.PossiblyRetryable {
			// Fetching the JSON logs handler happens over an unreliable network connection,
			// and will fail at some point. Prefer to try again over reporting a terminal error.
			const maxTries = 5
			if try == maxTries {
				// At this point, promote it to a terminal error.
				return nil, fmt.Errorf("after %d attempts, fetching server segments still failed: %v", maxTries, err)
			}
			someDelay := time.Duration(try*try) * time.Second
			log.Printf("fetching server segments did not succeed on attempt %d, will try again in %v: %v", try, someDelay, err)
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-time.After(someDelay):
				try++
				continue
			}
		} else if err != nil {
			return nil, err
		}
		serverSegs = segs
		break
	}
	// TODO: optimization: if already on GCE, skip sync to disk part and just
	// read from network. fast & free network inside.

	// Second, fetch the new segments or their fragments
	// that we don't yet have locally.
	var fileSegs []fileSeg
	for _, seg := range serverSegs {
		for try := 1; ; {
			fileSeg, _, err := ns.syncSeg(ctx, seg)
			if isNoInternetError(err) {
				log.Printf("No internet; blocking.")
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
				case <-time.After(15 * time.Second):
					try = 1
					continue
				}
			} else if fe := (fetchError{}); errors.As(err, &fe) && fe.PossiblyRetryable {
				// Syncing a segment fetches a good deal of data over a network connection,
				// and will fail at some point. Be very willing to try again at this layer,
				// since it's much more efficient than having GetMutations return an error
				// and possibly cause a higher level retry to redo significantly more work.
				const maxTries = 10
				if try == maxTries {
					// At this point, promote it to a terminal error.
					return nil, fmt.Errorf("after %d attempts, syncing segment %d still failed: %v", maxTries, seg.Number, err)
				}
				someDelay := time.Duration(try*try) * time.Second
				log.Printf("syncing segment %d did not succeed on attempt %d, will try again in %v: %v", seg.Number, try, someDelay, err)
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
				case <-time.After(someDelay):
					try++
					continue
				}
			} else if err != nil {
				return nil, err
			}
			fileSegs = append(fileSegs, fileSeg)
			break
		}
	}

	// Verify consistency of newly fetched data,
	// and check there is in fact something new.
	sumCommon := ns.sumCommonPrefixSize(fileSegs, ns.last)
	if sumCommon != sumLast {
		if fn := ns.testHookOnSplit; fn != nil {
			fn(sumCommon)
		}
		// Our history diverged from the source.
		return nil, ErrSplit
	} else if sumCur := sumSegSize(fileSegs); sumCommon == sumCur {
		// Nothing new. This shouldn't happen since the maintnerd server is required to handle
		// the "?waitsizenot=NNN" long polling parameter, so it's a problem if we get here.
		return nil, fmt.Errorf("maintner.netsource: maintnerd server returned unchanged log segments")
	}
	ns.last = fileSegs

	newSegs := trimLeadingSegBytes(fileSegs, sumCommon)
	return newSegs, nil
}

func trimLeadingSegBytes(in []fileSeg, trim int64) []fileSeg {
	// First trim off whole segments, sharing the same underlying memory.
	for len(in) > 0 && trim >= in[0].size {
		trim -= in[0].size
		in = in[1:]
	}
	if len(in) == 0 {
		return nil
	}
	// Now copy, since we'll be modifying the first element.
	out := append([]fileSeg(nil), in...)
	out[0].skip = trim
	return out
}

// filePrefixSum224 returns the lowercase hex SHA-224 of the first n bytes of file.
func (ns *netMutSource) filePrefixSum224(file string, n int64) string {
	if fn := ns.testHookFilePrefixSum224; fn != nil {
		return fn(file, n)
	}
	f, err := os.Open(file)
	if err != nil {
		if !os.IsNotExist(err) {
			log.Print(err)
		}
		return ""
	}
	defer f.Close()
	h := sha256.New224()
	_, err = io.CopyN(h, f, n)
	if err != nil {
		log.Print(err)
		return ""
	}
	return fmt.Sprintf("%x", h.Sum(nil))
}

func sumSegSize(segs []fileSeg) (sum int64) {
	for _, seg := range segs {
		sum += seg.size
	}
	return
}

func sumJSONSegSize(segs []LogSegmentJSON) (sum int64) {
	for _, seg := range segs {
		sum += seg.Size
	}
	return
}

// sumCommonPrefixSize computes the size of the longest common prefix of file segments a and b
// that can be found quickly by checking for matching checksums between segment boundaries.
func (ns *netMutSource) sumCommonPrefixSize(a, b []fileSeg) (sum int64) {
	for len(a) > 0 && len(b) > 0 {
		sa, sb := a[0], b[0]
		if sa.sha224 == sb.sha224 {
			// Whole chunk in common.
			sum += sa.size
			a, b = a[1:], b[1:]
			continue
		}
		if sa.size == sb.size {
			// If they're the same size but different
			// sums, it must've forked.
			return
		}
		// See if one chunk is a prefix of the other.
		// Make sa be the smaller one.
		if sb.size < sa.size {
			sa, sb = sb, sa
		}
		// Hash the beginning of the bigger size.
		bPrefixSum := ns.filePrefixSum224(sb.file, sa.size)
		if bPrefixSum == sa.sha224 {
			sum += sa.size
		}
		break
	}
	return
}

// fetchAndSendMutations fetches new mutations from the network mutation source
// and sends them to ch.
func (ns *netMutSource) fetchAndSendMutations(ctx context.Context, ch chan<- MutationStreamEvent) error {
	newSegs, err := ns.getNewSegments(ctx)
	if err != nil {
		return err
	}
	return foreachFileSeg(newSegs, func(seg fileSeg) error {
		f, err := os.Open(seg.file)
		if err != nil {
			return err
		}
		defer f.Close()
		if seg.skip > 0 {
			if _, err := f.Seek(seg.skip, io.SeekStart); err != nil {
				return err
			}
		}
		return reclog.ForeachRecord(io.LimitReader(f, seg.size-seg.skip), seg.skip, func(off int64, hdr, rec []byte) error {
			m := new(maintpb.Mutation)
			if err := proto.Unmarshal(rec, m); err != nil {
				return err
			}
			select {
			case ch <- MutationStreamEvent{Mutation: m}:
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		})
	})
}

func foreachFileSeg(segs []fileSeg, fn func(seg fileSeg) error) error {
	for _, seg := range segs {
		if err := fn(seg); err != nil {
			return err
		}
	}
	return nil
}

// TODO: add a constructor for this? or simplify it. make it Size +
// File + embedded LogSegmentJSON?
type fileSeg struct {
	seg    int
	file   string // full path
	sha224 string
	skip   int64
	size   int64
}

// syncSeg syncs the provided log segment, returning its on-disk metadata.
// The newData result is the new data that was added to the segment in this sync.
//
// syncSeg returns an error that matches fetchError with PossiblyRetryable set
// to true when it has signal that repeating the same call after some time may
// succeed.
func (ns *netMutSource) syncSeg(ctx context.Context, seg LogSegmentJSON) (_ fileSeg, newData []byte, _ error) {
	if fn := ns.testHookSyncSeg; fn != nil {
		return fn(ctx, seg)
	}

	isFinalSeg := !strings.HasPrefix(seg.URL, "https://storage.googleapis.com/")
	relURL, err := url.Parse(seg.URL)
	if err != nil {
		return fileSeg{}, nil, err
	}
	segURL := ns.base.ResolveReference(relURL)

	frozen := filepath.Join(ns.cacheDir, fmt.Sprintf("%04d.%s.mutlog", seg.Number, seg.SHA224))

	// Do we already have it? Files named in their final form with the sha224 are considered
	// complete and immutable.
	if fi, err := os.Stat(frozen); err == nil && fi.Size() == seg.Size {
		return fileSeg{seg: seg.Number, file: frozen, size: fi.Size(), sha224: seg.SHA224}, nil, nil
	}

	// See how much data we already have in the partial growing file.
	partial := filepath.Join(ns.cacheDir, fmt.Sprintf("%04d.growing.mutlog", seg.Number))
	have, _ := robustio.ReadFile(partial)
	if int64(len(have)) == seg.Size {
		got224 := fmt.Sprintf("%x", sha256.Sum224(have))
		if got224 == seg.SHA224 {
			if !isFinalSeg {
				// This was growing for us, but the server started a new growing segment.
				if err := robustio.Rename(partial, frozen); err != nil {
					return fileSeg{}, nil, err
				}
				return fileSeg{seg: seg.Number, file: frozen, sha224: seg.SHA224, size: seg.Size}, nil, nil
			}
			return fileSeg{seg: seg.Number, file: partial, sha224: seg.SHA224, size: seg.Size}, nil, nil
		}
	}

	// Otherwise, download new data.
	if int64(len(have)) < seg.Size {
		req, err := http.NewRequestWithContext(ctx, "GET", segURL.String(), nil)
		if err != nil {
			return fileSeg{}, nil, err
		}
		req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", len(have), seg.Size-1))

		if !ns.quiet {
			log.Printf("Downloading %d bytes of %s ...", seg.Size-int64(len(have)), segURL)
		}
		res, err := http.DefaultClient.Do(req)
		if err != nil {
			return fileSeg{}, nil, fetchError{Err: err, PossiblyRetryable: true}
		}
		defer res.Body.Close()
		if res.StatusCode/100 == 5 {
			// Consider a 5xx server response to possibly succeed later.
			return fileSeg{}, nil, fetchError{Err: fmt.Errorf("%s: %s", segURL.String(), res.Status), PossiblyRetryable: true}
		} else if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusPartialContent {
			return fileSeg{}, nil, fmt.Errorf("%s: %s", segURL.String(), res.Status)
		}
		newData, err = io.ReadAll(res.Body)
		res.Body.Close()
		if err != nil {
			return fileSeg{}, nil, fetchError{Err: err, PossiblyRetryable: true}
		}
	}

	// Commit to disk.
	var newContents []byte
	if int64(len(newData)) == seg.Size {
		newContents = newData
	} else if int64(len(have)+len(newData)) == seg.Size {
		newContents = append(have, newData...)
	} else if int64(len(have)) > seg.Size {
		// We have more data than the server; likely because it restarted with uncommitted
		// transactions, and so we're headed towards an ErrSplit. Reuse the longest common
		// prefix as long as its checksum matches.
		newContents = have[:seg.Size]
	}
	got224 := fmt.Sprintf("%x", sha256.Sum224(newContents))
	if got224 != seg.SHA224 {
		if len(have) == 0 {
			return fileSeg{}, nil, errors.New("corrupt download")
		}
		// Try again.
		os.Remove(partial)
		return ns.syncSeg(ctx, seg)
	}
	// TODO: this is a quadratic amount of write I/O as the 16 MB
	// segment grows. Switch to appending to the existing file,
	// then perhaps encoding the desired file size into the
	// filename suffix (instead of just *.growing.mutlog) so
	// concurrent readers know where to stop.
	tf, err := os.CreateTemp(ns.cacheDir, "tempseg")
	if err != nil {
		return fileSeg{}, nil, err
	}
	if _, err := tf.Write(newContents); err != nil {
		return fileSeg{}, nil, err
	}
	if err := tf.Close(); err != nil {
		return fileSeg{}, nil, err
	}
	finalName := partial
	if !isFinalSeg {
		finalName = frozen
	}
	if err := robustio.Rename(tf.Name(), finalName); err != nil {
		return fileSeg{}, nil, err
	}
	if !ns.quiet {
		log.Printf("wrote %v", finalName)
	}
	return fileSeg{seg: seg.Number, file: finalName, size: seg.Size, sha224: seg.SHA224}, newData, nil
}

type LogSegmentJSON struct {
	Number int    `json:"number"`
	Size   int64  `json:"size"`
	SHA224 string `json:"sha224"`
	URL    string `json:"url"`
}

// fetchError records an error during a fetch operation over an unreliable network.
type fetchError struct {
	Err error // Non-nil.

	// PossiblyRetryable indicates whether Err is believed to be possibly caused by a
	// non-terminal network error, such that the caller can expect it may not happen
	// again if it simply tries the same fetch operation again after waiting a bit.
	PossiblyRetryable bool
}

func (e fetchError) Error() string { return e.Err.Error() }
func (e fetchError) Unwrap() error { return e.Err }
