// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// The gitmirror binary watches the specified Gerrit repositories for
// new commits and syncs them to mirror repositories.
//
// It also serves tarballs over HTTP for the build system.
package main

import (
	"bytes"
	"context"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"

	"golang.org/x/build/gerrit"
	"golang.org/x/build/internal/gitauth"
	"golang.org/x/build/internal/secret"
	"golang.org/x/build/maintner"
	"golang.org/x/build/maintner/godata"
	repospkg "golang.org/x/build/repos"
	"golang.org/x/sync/errgroup"
)

// goBase is the prefix of every watched repository's clone URL; the full URL
// is goBase followed by the Gerrit project name.
const goBase = "https://go.googlesource.com/"

// flagHTTPAddr is the listen address of the status and tarball HTTP server.
var flagHTTPAddr = flag.String("http", "", "If non-empty, the listen address to run an HTTP server on")

// flagCacheDir is where local clones are kept; empty means a temporary directory.
var flagCacheDir = flag.String("cachedir", "", "git cache directory. If empty a temp directory is made.")

// flagPollInterval is how long to sleep between polls of Gerrit for new branch heads.
var flagPollInterval = flag.Duration("poll", 60*time.Second, "Remote repo poll interval")

// flagMirror enables configuring and pushing to mirror remotes at all.
var flagMirror = flag.Bool("mirror", false, "whether to mirror to mirror repos; if disabled, it only runs in HTTP archive server mode")

// flagMirrorGitHub and flagMirrorCSR choose which mirror destinations are
// used when -mirror is set.
var (
	flagMirrorGitHub = flag.Bool("mirror-github", true, "whether to mirror to GitHub when mirroring is enabled")
	flagMirrorCSR    = flag.Bool("mirror-csr", true, "whether to mirror to Cloud Source Repositories when mirroring is enabled")
)

// flagSecretsDir, when non-empty, makes secrets be read from files in this
// directory instead of from the GCP secret manager.
var flagSecretsDir = flag.String("secretsdir", "", "directory to load secrets from instead of GCP")

func main() {
	flag.Parse()

	if *flagHTTPAddr != "" {
		go func() {
			err := http.ListenAndServe(*flagHTTPAddr, nil)
			log.Fatalf("http server failed: %v", err)
		}()
	}
	http.HandleFunc("/debug/env", handleDebugEnv)
	http.HandleFunc("/debug/goroutines", handleDebugGoroutines)

	if err := gitauth.Init(); err != nil {
		log.Fatalf("gitauth: %v", err)
	}

	cacheDir, err := createCacheDir()
	if err != nil {
		log.Fatalf("creating cache dir: %v", err)
	}
	credsDir, err := ioutil.TempDir("", "gitmirror-credentials")
	if err != nil {
		log.Fatalf("creating credentials dir: %v", err)
	}
	defer os.RemoveAll(credsDir)

	m := &gitMirror{
		mux:          http.DefaultServeMux,
		repos:        map[string]*repo{},
		cacheDir:     cacheDir,
		homeDir:      credsDir,
		gerritClient: gerrit.NewClient("https://go-review.googlesource.com", gerrit.NoAuth),
		mirrorGitHub: *flagMirrorGitHub,
		mirrorCSR:    *flagMirrorCSR,
		timeoutScale: 1,
	}
	http.HandleFunc("/", m.handleRoot)

	var eg errgroup.Group
	for _, repo := range repospkg.ByGerritProject {
		r := m.addRepo(repo)
		eg.Go(r.init)
	}
	if err := eg.Wait(); err != nil {
		log.Fatalf("initializing repos: %v", err)
	}

	if *flagMirror {
		if err := writeCredentials(credsDir); err != nil {
			log.Fatalf("writing git credentials: %v", err)
		}
		if err := m.addMirrors(); err != nil {
			log.Fatalf("configuring mirrors: %v", err)
		}
	}

	for _, repo := range m.repos {
		go repo.loop()
	}
	go m.pollGerritAndTickleLoop()
	go m.subscribeToMaintnerAndTickleLoop()

	shutdown := make(chan os.Signal, 1)
	signal.Notify(shutdown, os.Interrupt)
	<-shutdown
}

// writeCredentials writes a .gitconfig and ssh_config into home, which the
// mirror uses as $HOME for git commands. Depending on -mirror-github and
// -mirror-csr, it also writes the corresponding secret keys there and, for
// CSR, activates the gcloud service account.
// Secret retrieval and the gcloud call share a single 10-second deadline.
func writeCredentials(home string) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	sshConfig := &bytes.Buffer{}
	gitConfig := &bytes.Buffer{}
	sshConfigPath := filepath.Join(home, "ssh_config")
	// ssh ignores $HOME in favor of /etc/passwd, so we need to override ssh_config explicitly.
	fmt.Fprintf(gitConfig, "[core]\n  sshCommand=\"ssh -F %v\"\n", sshConfigPath)

	// GitHub key, used as the SSH identity for github.com.
	if *flagMirrorGitHub {
		privKey, err := retrieveSecret(ctx, secret.NameGitHubSSHKey)
		if err != nil {
			return fmt.Errorf("reading github key from secret manager: %v", err)
		}
		privKeyPath := filepath.Join(home, secret.NameGitHubSSHKey)
		if err := ioutil.WriteFile(privKeyPath, []byte(privKey+"\n"), 0600); err != nil {
			return err
		}
		fmt.Fprintf(sshConfig, "Host github.com\n  IdentityFile %v\n", privKeyPath)
	}

	// gitmirror service key, used to gcloud auth for CSR writes.
	if *flagMirrorCSR {
		serviceKey, err := retrieveSecret(ctx, secret.NameGitMirrorServiceKey)
		if err != nil {
			return fmt.Errorf("reading service key from secret manager: %v", err)
		}
		serviceKeyPath := filepath.Join(home, secret.NameGitMirrorServiceKey)
		if err := ioutil.WriteFile(serviceKeyPath, []byte(serviceKey), 0600); err != nil {
			return err
		}
		gcloud := exec.CommandContext(ctx, "gcloud", "auth", "activate-service-account", "--key-file", serviceKeyPath)
		gcloud.Env = append(os.Environ(), "HOME="+home)
		if out, err := gcloud.CombinedOutput(); err != nil {
			// out is a []byte: %s renders gcloud's text output, whereas %v
			// would print a list of byte values.
			return fmt.Errorf("gcloud auth failed: %v\n%s", err, out)
		}
		fmt.Fprintf(gitConfig, "[credential \"https://source.developers.google.com\"]\n  helper=gcloud.sh\n")
	}

	if err := ioutil.WriteFile(filepath.Join(home, ".gitconfig"), gitConfig.Bytes(), 0600); err != nil {
		return err
	}
	if err := ioutil.WriteFile(sshConfigPath, sshConfig.Bytes(), 0600); err != nil {
		return err
	}

	return nil
}

// retrieveSecret returns the secret called name. If -secretsdir is set, the
// secret is the contents of the file <secretsdir>/<name>. Otherwise it is
// fetched with a secret client that is created and closed for this call.
func retrieveSecret(ctx context.Context, name string) (string, error) {
	if dir := *flagSecretsDir; dir != "" {
		// Named data (not "secret") to avoid shadowing the secret package.
		data, err := ioutil.ReadFile(filepath.Join(dir, name))
		return string(data), err
	}
	client := secret.MustNewClient()
	defer client.Close()
	return client.Retrieve(ctx, name)
}

// createCacheDir returns the directory in which repositories are cloned.
//
// If -cachedir is empty, a new temporary directory is created and returned.
// The caller owns it and is responsible for removing it. Otherwise
// -cachedir is created if it does not exist, and must be a directory if it
// does.
func createCacheDir() (string, error) {
	if *flagCacheDir == "" {
		dir, err := ioutil.TempDir("", "gitmirror")
		if err != nil {
			return "", fmt.Errorf("creating temporary cache dir: %v", err)
		}
		// Do not remove dir here: the caller is about to clone into it.
		return dir, nil
	}

	fi, err := os.Stat(*flagCacheDir)
	switch {
	case os.IsNotExist(err):
		if err := os.MkdirAll(*flagCacheDir, 0755); err != nil {
			return "", fmt.Errorf("failed to create watcher's git cache dir: %v", err)
		}
	case err != nil:
		return "", fmt.Errorf("invalid -cachedir: %v", err)
	case !fi.IsDir():
		return "", fmt.Errorf("invalid -cachedir=%q; not a directory", *flagCacheDir)
	}
	return *flagCacheDir, nil
}

// A gitMirror watches a set of Gerrit repositories, keeps local clones of
// them up to date, and optionally pushes them to mirror remotes.
type gitMirror struct {
	// mux receives the per-repository HTTP handlers.
	mux *http.ServeMux
	// repos maps a Gerrit project name to its watched repository.
	repos map[string]*repo
	// cacheDir is the parent directory of every local clone.
	cacheDir string
	// homeDir is used as $HOME for all commands, allowing easy configuration overrides.
	homeDir string
	// gerritClient is used to poll Gerrit for branch heads.
	gerritClient *gerrit.Client
	// mirrorGitHub enables pushing to GitHub for repos that request it.
	mirrorGitHub bool
	// mirrorCSR enables pushing to Cloud Source Repositories for repos that request it.
	mirrorCSR bool
	// timeoutScale multiplies the retry and back-off sleeps.
	timeoutScale int
}

// addRepo registers the repository described by meta with m. It builds the
// repo value and installs it as the HTTP handler for both
// /<project>.tar.gz and /debug/watcher/<project>. It then records the repo
// in m.repos. The local clone is not touched; call init on the result for that.
func (m *gitMirror) addRepo(meta *repospkg.Repo) *repo {
	proj := meta.GoGerritProject
	r := &repo{
		mirror:  m,
		meta:    meta,
		name:    proj,
		url:     goBase + proj,
		root:    filepath.Join(m.cacheDir, proj),
		changed: make(chan bool, 1),
	}
	for _, pattern := range []string{"/" + proj + ".tar.gz", "/debug/watcher/" + proj} {
		m.mux.Handle(pattern, r)
	}
	m.repos[proj] = r
	return r
}

// addMirrors configures push remotes for every repository that should be
// mirrored. A "github" remote is added when GitHub mirroring is enabled both
// globally and for the repo; a "csr" remote likewise for Cloud Source
// Repositories. It stops at the first error.
func (m *gitMirror) addMirrors() error {
	const csrBase = "https://source.developers.google.com/p/golang-org/r/"
	for _, r := range m.repos {
		if m.mirrorGitHub && r.meta.MirrorToGitHub {
			githubURL := "git@github.com:" + r.meta.GitHubRepo + ".git"
			if err := r.addRemote("github", githubURL); err != nil {
				return fmt.Errorf("adding GitHub remote: %v", err)
			}
		}
		if !m.mirrorCSR || !r.meta.MirrorToCSR {
			continue
		}
		if err := r.addRemote("csr", csrBase+r.name); err != nil {
			return fmt.Errorf("adding CSR remote: %v", err)
		}
	}
	return nil
}

// GET /
// or:
// GET /debug/watcher/
//
// handleRoot serves an HTML index of all watched repositories, sorted by
// name. Each entry links to the repository's status page and shows its
// one-line health summary. Any other path gets a 404.
func (m *gitMirror) handleRoot(w http.ResponseWriter, r *http.Request) {
	switch r.URL.Path {
	case "/", "/debug/watcher/":
		// Served below.
	default:
		http.NotFound(w, r)
		return
	}

	names := make([]string, 0, len(m.repos))
	for name := range m.repos {
		names = append(names, name)
	}
	sort.Strings(names)

	w.Header().Set("Content-Type", "text/html; charset=utf-8")
	fmt.Fprint(w, "<html><body><pre>")
	for _, name := range names {
		line := m.repos[name].statusLine()
		fmt.Fprintf(w, "<a href='/debug/watcher/%s'>%s</a> - %s\n", name, name, line)
	}
	fmt.Fprint(w, "</pre></body></html>")
}

// A statusEntry is a status string recorded at a specific time.
type statusEntry struct {
	status string
	t      time.Time
}

// statusRing is a fixed-size ring buffer of timestamped status messages.
// All access goes through its mutex, so it is safe for concurrent use.
type statusRing struct {
	mu   sync.Mutex      // guards rest
	head int             // next position to fill
	ent  [50]statusEntry // ring buffer of entries; zero time means unpopulated
}

// add records status with the current time. Once the buffer is full, it
// overwrites the oldest entry.
func (r *statusRing) add(status string) {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.ent[r.head] = statusEntry{status, time.Now()}
	r.head = (r.head + 1) % len(r.ent)
}

// foreachDesc calls fn on every recorded entry, newest first.
//
// The entries are copied under the lock and fn runs after the lock is
// released. A slow fn (such as one writing to an HTTP client) therefore
// cannot block add, and fn may itself call add.
func (r *statusRing) foreachDesc(fn func(statusEntry)) {
	r.mu.Lock()
	ents, head := r.ent, r.head // array copy: a consistent snapshot
	r.mu.Unlock()

	// Walk backwards from the most recent write, visiting each slot at most
	// once. Iterating exactly len(ents) times includes the oldest entry
	// (at index head) when the buffer is full.
	i := head
	for n := 0; n < len(ents); n++ {
		i--
		if i < 0 {
			i = len(ents) - 1
		}
		if ents[i].t.IsZero() {
			return // reached the unpopulated part of the buffer
		}
		fn(ents[i])
	}
}

// repo represents a repository to be watched: a local mirror clone of one
// Gerrit project, the remotes it is pushed to, and its recent health.
type repo struct {
	// name is the Gerrit project name; url is its clone URL (goBase + name).
	// status holds recent progress messages shown on the repo's status page,
	// and mirror is the gitMirror that owns this repo.
	name    string
	url     string
	root    string // on-disk location of the git repo, *cacheDir/name
	meta    *repospkg.Repo
	changed chan bool // sent to when a change comes in
	status  statusRing
	dests   []string // destination remotes to mirror to
	mirror  *gitMirror

	// mu guards err and the timestamps below. setErr updates them after
	// every loopOnce, and statusLine reads them. firstBad/firstGood mark
	// the start of the current failing/succeeding streak; lastBad/lastGood
	// mark the most recent failure/success.
	mu        sync.Mutex
	err       error
	firstBad  time.Time
	lastBad   time.Time
	firstGood time.Time
	lastGood  time.Time
}

// init sets up the repo's local clone at r.root.
//
// An existing clone (recognized by its FETCH_HEAD file) is reused if
// "git fetch --prune origin" succeeds. Otherwise the directory is removed
// and the repository is cloned afresh with --mirror. The returned error
// describes a failed removal or clone.
func (r *repo) init() error {
	canReuse := true
	if _, err := os.Stat(filepath.Join(r.root, "FETCH_HEAD")); err != nil {
		canReuse = false
		r.logf("can't reuse git dir, no FETCH_HEAD: %v", err)
	}
	if canReuse {
		r.setStatus("reusing git dir; running git fetch")
		if _, _, err := r.runGitLogged("fetch", "--prune", "origin"); err != nil {
			canReuse = false
			r.logf("git fetch failed; proceeding to wipe + clone instead")
		}
	}
	if !canReuse {
		r.setStatus("need clone; removing cache root")
		// If the old directory can't be removed, the clone below would fail
		// with a less helpful "already exists" error; report the real cause.
		if err := os.RemoveAll(r.root); err != nil {
			return fmt.Errorf("removing %s: %v", r.root, err)
		}
		if _, _, err := r.runGitLogged("clone", "--mirror", r.url, r.root); err != nil {
			return fmt.Errorf("cloning %s: %v", r.url, err)
		}
		r.setStatus("cloned")
	}
	return nil
}

// runGitLogged runs git like runGitQuiet, and logs the command before it
// starts and the elapsed time after it finishes. On failure the log also
// includes the error and stderr.
func (r *repo) runGitLogged(args ...string) ([]byte, []byte, error) {
	began := time.Now()
	r.logf("running git %s", args)
	stdout, stderr, err := r.runGitQuiet(args...)
	if err != nil {
		r.logf("git %s failed after %v: %v\nstderr: %v\n", args, time.Since(began), err, string(stderr))
		return stdout, stderr, err
	}
	r.logf("ran git %s in %v", args, time.Since(began))
	return stdout, stderr, nil
}

// runGitQuiet runs "git args..." with a 10-minute timeout, without logging.
// It runs in r.root, except for "clone", whose target does not exist yet and
// so runs from "/". $HOME is set to the mirror's homeDir so git picks up the
// configuration written there. It returns the captured stdout, stderr, and
// the error from running the command.
func (r *repo) runGitQuiet(args ...string) ([]byte, []byte, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
	defer cancel()

	dir := r.root
	if args[0] == "clone" {
		dir = "/"
	}

	var stdout, stderr bytes.Buffer
	cmd := exec.CommandContext(ctx, "git", args...)
	cmd.Dir = dir
	cmd.Env = append(os.Environ(), "HOME="+r.mirror.homeDir)
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr
	runErr := cmd.Run()
	return stdout.Bytes(), stderr.Bytes(), runErr
}

// setErr records the outcome of a loop iteration (nil for success). It always
// updates lastGood or lastBad to now. On a transition between success and
// failure, it also starts a new streak by setting firstGood or firstBad.
func (r *repo) setErr(err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	now := time.Now()
	wasOK, isOK := r.err == nil, err == nil
	flipped := wasOK != isOK
	if isOK {
		if flipped {
			r.firstGood = now
		}
		r.lastGood = now
	} else {
		if flipped {
			r.firstBad = now
		}
		r.lastBad = now
	}
	r.err = err
}

// startTime is when the process started. statusLine uses it to tell a slow
// start-up apart from a hang.
var startTime = time.Now()

// statusLine summarizes the repository's health in one line, based on the
// error and timestamps recorded by setErr.
//
// If the repo has never succeeded, it reports "broken" when the latest
// attempt failed, "ok; starting up" during the first 5 minutes after process
// start, and "hung" after that. If the repo is currently failing, it reports
// "broken for" the time since the last success. If the repo is currently
// succeeding, it reports "hung?" after 6 minutes with no success,
// "ok; recent failure" if a failure occurred within the last hour, and "ok"
// otherwise.
func (r *repo) statusLine() string {
	r.mu.Lock()
	defer r.mu.Unlock()

	neverSucceeded := r.lastGood.IsZero()
	switch {
	case neverSucceeded && r.err != nil:
		return fmt.Sprintf("broken; permanently? always failing, for %v", time.Since(r.firstBad))
	case neverSucceeded && time.Since(startTime) < 5*time.Minute:
		return "ok; starting up, no report yet"
	case neverSucceeded:
		return fmt.Sprintf("hung; hang at start-up? no report since start %v ago", time.Since(startTime))
	case r.err != nil:
		return fmt.Sprintf("broken for %v", time.Since(r.lastGood))
	}

	if sinceGood := time.Since(r.lastGood); sinceGood > 6*time.Minute {
		return fmt.Sprintf("hung? no activity since last success %v ago", sinceGood)
	}
	if r.lastBad.After(time.Now().Add(-1 * time.Hour)) {
		return fmt.Sprintf("ok; recent failure %v ago", time.Since(r.lastBad))
	}
	return "ok"
}

// setStatus appends status, stamped with the current time, to the repo's
// status history shown on its /debug/watcher/ page.
func (r *repo) setStatus(status string) {
	history := &r.status
	history.add(status)
}

// addRemote configures a push remote called name that points at url. It
// writes a remotes/<name> file inside the mirror clone at r.root. On
// success it records name in r.dests so loopOnce pushes to it.
func (r *repo) addRemote(name, url string) error {
	if err := os.MkdirAll(filepath.Join(r.root, "remotes"), 0777); err != nil {
		return err
	}
	// We want to include only the refs/heads/* and refs/tags/* namespaces
	// in the mirrors. They correspond to published branches and tags.
	// Leave out internal Gerrit namespaces such as refs/changes/*,
	// refs/users/*, etc., because they're not helpful on other hosts.
	remote := "URL: " + url + "\n" +
		"Push: +refs/heads/*:refs/heads/*\n" +
		"Push: +refs/tags/*:refs/tags/*\n"
	// This is a plain configuration file: it needs neither the execute
	// bit nor group/world write permission.
	if err := ioutil.WriteFile(filepath.Join(r.root, "remotes", name), []byte(remote), 0644); err != nil {
		return err
	}
	// Only record the destination once its configuration exists, so a
	// failed setup never leaves a push target without a remote.
	r.dests = append(r.dests, name)
	return nil
}

// loop runs loopOnce forever: it fetches, pushes to any mirror destinations,
// then waits. After a failure it retries following a 10-second back-off
// (scaled by timeoutScale). After a success it waits for a change
// notification on r.changed, or at most 5 minutes, before running again.
func (r *repo) loop() {
	for {
		if err := r.loopOnce(); err != nil {
			time.Sleep(time.Duration(r.mirror.timeoutScale) * 10 * time.Second)
			continue
		}

		// Keep a slow fallback timer in case the mechanism that tickles
		// r.changed stops working for some reason.
		fallback := time.NewTimer(5 * time.Minute)
		select {
		case <-fallback.C:
			r.setStatus("poll timer fired")
		case <-r.changed:
			fallback.Stop()
			r.setStatus("got update tickle")
		}
	}
}

// loopOnce fetches from origin and then pushes to every configured
// destination, stopping at the first failure. The outcome is logged and
// recorded with setErr. On success the status becomes "waiting".
func (r *repo) loopOnce() error {
	fail := func(op string, err error) error {
		r.logf("%s failed: %v", op, err)
		r.setErr(err)
		return err
	}

	if err := r.fetch(); err != nil {
		return fail("fetch", err)
	}
	for _, dest := range r.dests {
		if err := r.push(dest); err != nil {
			return fail("push", err)
		}
	}
	r.setErr(nil)
	r.setStatus("waiting")
	return nil
}

// logf logs a message prefixed with "<repo name>: ". format and args are
// interpreted as by fmt.Sprintf.
func (r *repo) logf(format string, args ...interface{}) {
	// Pass the name as an argument instead of splicing it into the format
	// string, so a '%' in the name cannot be misread as a verb.
	log.Printf("%s: %s", r.name, fmt.Sprintf(format, args...))
}

// fetch runs "git fetch --prune origin" in the repository root, making up to
// three attempts in case a failure is transient. Errors include git's
// stderr, and the outcome is recorded in the repo's status history.
func (r *repo) fetch() error {
	err := r.try(3, func(attempt int) error {
		r.setStatus(fmt.Sprintf("running git fetch origin, attempt %d", attempt))
		_, stderr, err := r.runGitLogged("fetch", "--prune", "origin")
		if err == nil {
			return nil
		}
		return fmt.Errorf("%v\n\n%s", err, stderr)
	})
	status := "ran git fetch"
	if err != nil {
		status = "git fetch failed"
	}
	r.setStatus(status)
	return err
}

// push runs "git push -f --mirror dest" in the repository root, making up to
// three attempts in case a failure is transient. Errors include git's
// stderr, and the outcome is recorded in the repo's status history.
func (r *repo) push(dest string) error {
	err := r.try(3, func(attempt int) error {
		r.setStatus(fmt.Sprintf("syncing to %v, attempt %d", dest, attempt))
		_, stderr, err := r.runGitLogged("push", "-f", "--mirror", dest)
		if err == nil {
			return nil
		}
		return fmt.Errorf("%v\n\n%s", err, stderr)
	})
	status := "did sync to " + dest
	if err != nil {
		status = "sync to " + dest + " failed"
	}
	r.setStatus(status)
	return err
}

// fetchRevIfNeeded ensures that rev exists in the local clone. If
// "git cat-file -e" cannot find it, it fetches rev from origin.
//
// rev is passed to git as a positional argument. A value beginning with '-'
// would be parsed as an option instead, so such values (and the empty
// string) are rejected. If ctx is already done, the fetch is skipped.
//
// NOTE(review): ctx is not propagated to the git commands themselves; they
// are bounded only by runGitQuiet's own timeout.
func (r *repo) fetchRevIfNeeded(ctx context.Context, rev string) error {
	if rev == "" || strings.HasPrefix(rev, "-") {
		return fmt.Errorf("invalid revision %q", rev)
	}
	if _, _, err := r.runGitQuiet("cat-file", "-e", rev); err == nil {
		return nil
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	r.logf("attempting to fetch missing revision %s from origin", rev)
	_, _, err := r.runGitLogged("fetch", "origin", rev)
	return err
}

// GET /<name>.tar.gz
// GET /debug/watcher/<name>
//
// ServeHTTP serves the repository's status page under /debug/watcher/.
// Otherwise it serves a gzipped tarball of the revision given by the "rev"
// form value, fetching that revision from origin first if it is missing
// locally. Only GET and HEAD are accepted; a missing or invalid rev yields
// 400.
func (r *repo) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method != "GET" && req.Method != "HEAD" {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	if strings.HasPrefix(req.URL.Path, "/debug/watcher/") {
		r.serveStatus(w, req)
		return
	}
	rev := req.FormValue("rev")
	// rev is untrusted input and is passed to git as a positional argument.
	// A value starting with '-' would be parsed as an option instead, e.g.
	// "--output=<path>" for git archive or "--upload-pack=<cmd>" for
	// git fetch. Reject it along with an empty rev.
	if rev == "" || strings.HasPrefix(rev, "-") {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
	defer cancel()
	if err := r.fetchRevIfNeeded(ctx, rev); err != nil {
		// Try the archive anyway, it might work.
		// %q keeps untrusted input from injecting newlines into the log.
		r.logf("error fetching revision %q: %v", rev, err)
	}
	tgz, _, err := r.runGitQuiet("archive", "--format=tgz", rev)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Length", strconv.Itoa(len(tgz)))
	w.Header().Set("Content-Type", "application/x-compressed")
	w.Write(tgz)
}

// serveStatus writes an HTML page listing the repository's recent status
// messages, newest first. Each line shows the UTC timestamp, the message's
// age (rounded to seconds), and the message itself.
func (r *repo) serveStatus(w http.ResponseWriter, req *http.Request) {
	w.Header().Set("Content-Type", "text/html; charset=utf-8")
	// Close <head> before opening <body> so the document is well-formed.
	fmt.Fprintf(w, "<html><head><title>watcher: %s</title></head><body><h1>watcher status for repo: %q</h1>\n",
		r.name, r.name)
	fmt.Fprintf(w, "<pre>\n")
	nowRound := time.Now().Round(time.Second)
	r.status.foreachDesc(func(ent statusEntry) {
		fmt.Fprintf(w, "%v   %-20s %v\n",
			ent.t.In(time.UTC).Format(time.RFC3339),
			nowRound.Sub(ent.t.Round(time.Second)).String()+" ago",
			ent.status)
	})
	fmt.Fprintf(w, "\n</pre></body></html>")
}

// try calls fn up to n times, passing the zero-based attempt number, and
// stops at the first success. Before attempt k it sleeps k*5s, scaled by
// timeoutScale (linear back-off; the first attempt runs immediately). It
// returns nil on success, otherwise fn's last error.
func (r *repo) try(n int, fn func(attempt int) error) error {
	var err error
	for attempt := 0; attempt < n; attempt++ {
		backoff := time.Duration(attempt) * 5 * time.Second * time.Duration(r.mirror.timeoutScale)
		time.Sleep(backoff)
		err = fn(attempt)
		if err == nil {
			return nil
		}
	}
	return err
}

// notifyChanged wakes the loop of the named repository, if it is watched.
// The send never blocks. changed has a buffer of one, so if a wake-up is
// already pending, this one is merged into it.
func (m *gitMirror) notifyChanged(name string) {
	r, watched := m.repos[name]
	if !watched {
		return
	}
	select {
	case r.changed <- true:
	default:
		// A wake-up is already queued; the loop will see this change too.
	}
}

// pollGerritAndTickleLoop polls Gerrit for the master branch head of every
// project, every -poll interval, forever. Whenever a project's head differs
// from the last one seen, it wakes that repository's loop via notifyChanged.
// Poll failures are logged and skipped until the next interval.
func (m *gitMirror) pollGerritAndTickleLoop() {
	lastHash := make(map[string]string) // repo name -> last seen master hash
	for ; ; time.Sleep(*flagPollInterval) {
		heads, err := m.gerritMetaMap()
		if err != nil {
			log.Printf("pollGerritAndTickle: gerritMetaMap failed, skipping: %v", err)
			heads = nil
		}
		for name, hash := range heads {
			if lastHash[name] == hash {
				continue
			}
			lastHash[name] = hash
			m.notifyChanged(name)
		}
	}
}

// subscribeToMaintnerAndTickleLoop subscribes to maintner.golang.org
// and watches for any ref changes in realtime. If the subscription returns
// an error, it is logged and the subscription restarts after 30 seconds.
func (m *gitMirror) subscribeToMaintnerAndTickleLoop() {
	for {
		err := m.subscribeToMaintnerAndTickle()
		if err == nil {
			continue
		}
		log.Printf("maintner loop: %v; retrying in 30 seconds", err)
		time.Sleep(30 * time.Second)
	}
}

// subscribeToMaintnerAndTickle tails the maintner mutation stream from
// godata.Server. It calls notifyChanged for every Gerrit mutation on a
// go.googlesource.com project. When the tail ends, it logs the error and
// restarts. It never returns.
func (m *gitMirror) subscribeToMaintnerAndTickle() error {
	const projPrefix = "go.googlesource.com/"
	ctx := context.Background()

	// The ticker caps restarts at one per 10 seconds. Because a tick is
	// buffered while we are tailing, a restart after a long-running tail
	// usually happens immediately.
	throttle := time.NewTicker(10 * time.Second)
	defer throttle.Stop() // never reached in practice: the loop below does not exit

	onEvent := func(e maintner.MutationStreamEvent) error {
		if e.Mutation == nil || e.Mutation.Gerrit == nil {
			return e.Err
		}
		gm := e.Mutation.Gerrit
		if strings.HasPrefix(gm.Project, projPrefix) {
			log.Printf("maintner refs for %s changed", gm.Project)
			m.notifyChanged(strings.TrimPrefix(gm.Project, projPrefix))
		}
		return e.Err
	}

	for {
		err := maintner.TailNetworkMutationSource(ctx, godata.Server, onEvent)
		log.Printf("maintner tail error: %v; sleeping+restarting", err)
		<-throttle.C
	}
}

// gerritMetaMap returns the map from repo name (e.g. "go") to its latest
// master hash, as reported by Gerrit within a 30-second deadline. Projects
// without a master branch are omitted.
func (m *gitMirror) gerritMetaMap() (map[string]string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	projects, err := m.gerritClient.GetProjects(ctx, "master")
	if err != nil {
		return nil, fmt.Errorf("gerritClient.GetProjects: %v", err)
	}
	heads := make(map[string]string, len(projects))
	for name, info := range projects {
		hash, ok := info.Branches["master"]
		if !ok {
			continue
		}
		heads[name] = hash
	}
	return heads, nil
}

// GET /debug/goroutines
//
// handleDebugGoroutines writes a plain-text stack dump of all goroutines.
// The buffer starts at 1 MiB and doubles until the dump fits, up to 64 MiB,
// so large dumps are not silently truncated.
func handleDebugGoroutines(w http.ResponseWriter, r *http.Request) {
	const maxDump = 64 << 20
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	buf := make([]byte, 1<<20)
	for {
		n := runtime.Stack(buf, true)
		// runtime.Stack truncates silently to len(buf). A full buffer means
		// the dump may have been cut off, so retry with more room unless
		// we have hit the size cap.
		if n < len(buf) || len(buf) >= maxDump {
			w.Write(buf[:n])
			return
		}
		buf = make([]byte, 2*len(buf))
	}
}

// GET /debug/env
//
// handleDebugEnv writes the process environment as plain text, one
// KEY=VALUE entry per line.
//
// NOTE(review): this exposes the whole environment, which may include
// credentials; confirm the -http listener is not publicly reachable.
func handleDebugEnv(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	env := os.Environ()
	for i := range env {
		fmt.Fprintln(w, env[i])
	}
}
