blob: 2a7d52033f9c6a7490fa33556d07a6d6c2bda681 [file] [log] [blame]
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// The gitmirror binary watches the specified Gerrit repositories for
// new commits and syncs them to mirror repositories.
//
// It also serves tarballs over HTTP for the build system.
package main
import (
"bytes"
"context"
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/exec"
"path"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"time"
"golang.org/x/build/gerrit"
"golang.org/x/build/internal/gitauth"
"golang.org/x/build/internal/secret"
"golang.org/x/build/maintner"
"golang.org/x/build/maintner/godata"
repospkg "golang.org/x/build/repos"
"golang.org/x/sync/errgroup"
)
// goBase is the URL prefix of the upstream git host. A repository's clone
// URL is formed by appending its Gerrit project name to goBase.
const (
goBase = "https://go.googlesource.com/"
)
// Command-line flags. They are parsed at the top of main.
var (
// flagHTTPAddr is the address for the HTTP server; no server is started when empty.
flagHTTPAddr = flag.String("http", "", "If non-empty, the listen address to run an HTTP server on")
// flagCacheDir is where the git repositories are kept on disk.
flagCacheDir = flag.String("cachedir", "", "git cache directory. If empty a temp directory is made.")
// flagPollInterval is the pause between two polls of Gerrit for branch head changes.
flagPollInterval = flag.Duration("poll", 60*time.Second, "Remote repo poll interval")
// flagMirror enables writing credentials and configuring the mirror remotes.
flagMirror = flag.Bool("mirror", false, "whether to mirror to mirror repos; if disabled, it only runs in HTTP archive server mode")
)
// main parses the flags, optionally starts the HTTP server, prepares the
// local git cache for every known repository, optionally configures the
// mirror remotes, and then starts the per-repository sync loops and the two
// change-notification loops. It never returns.
func main() {
flag.Parse()
if *flagHTTPAddr != "" {
// The server is started before the repositories are cloned. A nil
// handler means http.DefaultServeMux, which the http.Handle and
// http.HandleFunc calls below (and in NewRepo/addRepos) populate.
go func() {
err := http.ListenAndServe(*flagHTTPAddr, nil)
log.Fatalf("http server failed: %v", err)
}()
}
http.HandleFunc("/debug/env", handleDebugEnv)
http.HandleFunc("/debug/goroutines", handleDebugGoroutines)
if err := gitauth.Init(); err != nil {
log.Fatalf("gitauth: %v", err)
}
cacheDir, err := createCacheDir()
if err != nil {
log.Fatalf("creating cache dir: %v", err)
}
m := &mirror{
repos: map[string]*Repo{},
cacheDir: cacheDir,
gerritClient: gerrit.NewClient("https://go-review.googlesource.com", gerrit.NoAuth),
}
http.HandleFunc("/", m.handleRoot)
// addRepos fetches or clones every known repository and only returns
// once all of them are ready (or one of them failed).
if err := m.addRepos(); err != nil {
log.Fatalf("adding repos: %v", err)
}
if *flagMirror {
// The SSH key has to be in place before the first push in Repo.Loop.
if err := writeCredentials(); err != nil {
log.Fatalf("writing ssh credentials: %v", err)
}
if err := m.runMirrors(); err != nil {
log.Fatalf("running mirror: %v", err)
}
}
// One sync loop per repository.
for _, repo := range m.repos {
go repo.Loop()
}
// Two independent sources wake the sync loops up when a repo changes.
go m.pollGerritAndTickle()
go m.subscribeToMaintnerAndTickleLoop()
// Block forever; all work happens in the goroutines started above.
select {}
}
// writeCredentials makes sure an SSH private key is available at
// $HOME/.ssh/id_ed25519.
//
// If that file already exists it is used as is and nothing else happens.
// Otherwise the key is retrieved from the secret manager under
// secret.NameGitHubSSHKey and written to that path with mode 0600, creating
// the .ssh directory with mode 0700 if necessary.
//
// It returns an error if the home directory cannot be determined, if the
// secret cannot be retrieved or is empty, or if writing the key fails.
func writeCredentials() error {
	home, err := os.UserHomeDir()
	if err != nil {
		return err
	}
	sshDir := filepath.Join(home, ".ssh")
	sshKey := filepath.Join(sshDir, "id_ed25519")
	if _, err := os.Stat(sshKey); err == nil {
		log.Printf("Using github ssh key at %v", sshKey)
		return nil
	}

	// Create the secret client only when it is actually needed, so that a
	// machine with a key already on disk does not depend on the secret
	// manager being reachable.
	// NOTE(review): MustNewClient presumably aborts when no credentials are
	// available — confirm against the secret package.
	sc := secret.MustNewClient()
	defer sc.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	privKey, err := sc.Retrieve(ctx, secret.NameGitHubSSHKey)
	if err != nil {
		// Keep the underlying cause; without it a permission problem and a
		// missing secret are indistinguishable in the logs.
		return fmt.Errorf("can't mirror to github without %q GCP secret manager or file %v: %v", secret.NameGitHubSSHKey, sshKey, err)
	}
	if len(privKey) == 0 {
		return fmt.Errorf("can't mirror to github without %q GCP secret manager or file %v: secret is empty", secret.NameGitHubSSHKey, sshKey)
	}
	if err := os.MkdirAll(sshDir, 0700); err != nil {
		return err
	}
	if err := ioutil.WriteFile(sshKey, []byte(privKey+"\n"), 0600); err != nil {
		return err
	}
	log.Printf("Wrote %s from GCP secret manager.", sshKey)
	return nil
}
// createCacheDir returns the directory in which the git repositories are
// cached, creating it if necessary.
//
// With an empty -cachedir flag a fresh temporary directory is created and
// returned. Otherwise the flag value is used: it is created (including
// parents) when it does not exist, and rejected when it exists but is not a
// directory or cannot be inspected.
func createCacheDir() (string, error) {
	if *flagCacheDir == "" {
		dir, err := ioutil.TempDir("", "gitmirror")
		if err != nil {
			return "", fmt.Errorf("creating temporary git cache dir: %v", err)
		}
		// The directory must outlive this call: the caller clones into it.
		// Removing it here (e.g. with a defer) would hand back a path that
		// no longer exists.
		return dir, nil
	}

	fi, err := os.Stat(*flagCacheDir)
	switch {
	case os.IsNotExist(err):
		if err := os.MkdirAll(*flagCacheDir, 0755); err != nil {
			return "", fmt.Errorf("failed to create watcher's git cache dir: %v", err)
		}
	case err != nil:
		return "", fmt.Errorf("invalid -cachedir: %v", err)
	case !fi.IsDir():
		return "", fmt.Errorf("invalid -cachedir=%q; not a directory", *flagCacheDir)
	}
	return *flagCacheDir, nil
}
// A mirror watches Gerrit repositories, fetching the latest commits and
// optionally mirroring them.
type mirror struct {
// repos maps a Gerrit project name to its Repo. It is filled by addRepos
// and read by runMirrors, handleRoot and notifyChanged.
repos map[string]*Repo
// cacheDir is the directory passed to NewRepo as the parent of every
// on-disk repository.
cacheDir string
// gerritClient is used by gerritMetaMap to list projects and their
// master branch heads.
gerritClient *gerrit.Client
}
// addRepos adds known repositories to the mirror.
//
// Every project in repospkg.ByGerritProject is prepared concurrently with
// NewRepo (which fetches or clones it into m.cacheDir), registered as the
// HTTP handler for "/<name>.tar.gz", and stored in m.repos. It waits for all
// of them and returns the first error encountered, if any.
func (m *mirror) addRepos() error {
	var (
		eg errgroup.Group
		// mu serializes the writes to m.repos made by the goroutines below;
		// unsynchronized concurrent map writes are a data race and can abort
		// the program.
		mu sync.Mutex
	)
	for name := range repospkg.ByGerritProject {
		name := name // capture the per-iteration value for the goroutine
		eg.Go(func() error {
			r, err := NewRepo(goBase+name, m.cacheDir)
			if err != nil {
				return err
			}
			http.Handle("/"+name+".tar.gz", r)
			mu.Lock()
			m.repos[name] = r
			mu.Unlock()
			return nil
		})
	}
	return eg.Wait()
}
// runMirrors configures a "github" push remote on every repository in
// m.repos whose metadata in repospkg.ByGerritProject has MirrorToGitHub set.
// Repositories without metadata, or not marked for mirroring, are skipped.
// It stops at, and returns, the first error from adding a remote.
func (m *mirror) runMirrors() error {
	// Only the refs/heads/* and refs/tags/* namespaces are pushed; they
	// correspond to published branches and tags. Internal Gerrit namespaces
	// such as refs/changes/* or refs/users/* are left out because they're
	// not helpful on other hosts.
	pushRefspecs := []string{
		"push = +refs/heads/*:refs/heads/*",
		"push = +refs/tags/*:refs/tags/*",
	}
	for name, repo := range m.repos {
		meta, known := repospkg.ByGerritProject[name]
		if known && meta.MirrorToGitHub {
			remoteURL := "git@github.com:" + meta.GitHubRepo() + ".git"
			if err := repo.addRemote("github", remoteURL, pushRefspecs...); err != nil {
				return fmt.Errorf("adding remote: %v", err)
			}
		}
	}
	return nil
}
// handleRoot serves the index page: one line per repository, sorted by
// name, with a link to its status page and its current status line.
//
//	GET /
//	GET /debug/watcher/
//
// Any other path gets a 404.
func (m *mirror) handleRoot(w http.ResponseWriter, r *http.Request) {
	switch r.URL.Path {
	case "/", "/debug/watcher/":
		// Served below.
	default:
		http.NotFound(w, r)
		return
	}

	// Map iteration order is random; sort for a stable page.
	var sorted []string
	for repoName := range m.repos {
		sorted = append(sorted, repoName)
	}
	sort.Strings(sorted)

	w.Header().Set("Content-Type", "text/html; charset=utf-8")
	fmt.Fprint(w, "<html><body><pre>")
	for _, repoName := range sorted {
		line := m.repos[repoName].statusLine()
		fmt.Fprintf(w, "<a href='/debug/watcher/%s'>%s</a> - %s\n", repoName, repoName, line)
	}
	fmt.Fprint(w, "</pre></body></html>")
}
// A statusEntry is a status string at a specific time.
type statusEntry struct {
	status string
	t      time.Time
}

// statusRing is a ring buffer of timestamped status messages. It keeps the
// most recent len(ent) messages; older ones are overwritten. The zero value
// is an empty ring ready for use.
type statusRing struct {
	mu   sync.Mutex      // guards rest
	head int             // next position to fill
	ent  [50]statusEntry // ring buffer of entries; zero time means unpopulated
}

// add records status with the current time, overwriting the oldest entry
// once the ring is full.
func (r *statusRing) add(status string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.ent[r.head] = statusEntry{status, time.Now()}
	r.head = (r.head + 1) % len(r.ent)
}

// foreachDesc calls fn for every populated entry, newest first.
//
// fn runs on a snapshot taken under the lock, after the lock is released,
// so a slow callback (for instance one writing to a network connection)
// does not block concurrent calls to add.
func (r *statusRing) foreachDesc(fn func(statusEntry)) {
	r.mu.Lock()
	snapshot, head := r.ent, r.head
	r.mu.Unlock()

	// Walk backwards from the newest entry (head-1) over all slots. The walk
	// covers exactly len(snapshot) slots, so that on a full ring the oldest
	// entry, which sits at head, is visited too.
	size := len(snapshot)
	for back := 1; back <= size; back++ {
		e := snapshot[(head-back+size)%size]
		if e.t.IsZero() {
			return // reached the unpopulated part of the ring
		}
		fn(e)
	}
}
// Repo represents a repository to be watched.
//
// A Repo is also an http.Handler: it serves its own status page and
// tarballs of individual revisions (see ServeHTTP).
type Repo struct {
root string // on-disk location of the git repo, *cacheDir/name
changed chan bool // sent to when a change comes in; buffered (size 1) so senders never block
status statusRing // recent status messages, shown on the status page
dests []string // destination remotes to mirror to; appended by addRemote, read by Loop
mu sync.Mutex // guards the fields below (see setErr and statusLine)
err error // result of the most recent sync attempt; nil means success
firstBad time.Time // when the current or most recent run of failures began
lastBad time.Time // time of the most recent failure
firstGood time.Time // when the current or most recent run of successes began
lastGood time.Time // time of the most recent success; zero if there never was one
}
// NewRepo prepares a local mirror of the git repository at url inside dir
// and returns a Repo for it.
//
// The on-disk location is dir joined with the last path element of url
// ("go", "net", etc). If that directory already holds a FETCH_HEAD file and
// "git fetch --prune origin" succeeds in it, the existing checkout is
// reused. Otherwise the directory is wiped and re-created with
// "git clone --mirror".
//
// As a side effect the returned Repo is registered on the default HTTP mux
// under "/debug/watcher/<name>". An error is returned only when the clone
// fails.
func NewRepo(url, dir string) (*Repo, error) {
	r := &Repo{
		root:    filepath.Join(dir, path.Base(url)),
		changed: make(chan bool, 1),
	}
	http.Handle("/debug/watcher/"+r.name(), r)

	// A checkout is a candidate for reuse only if it has been fetched into
	// before.
	_, statErr := os.Stat(filepath.Join(r.root, "FETCH_HEAD"))
	reusable := statErr == nil
	if !reusable {
		r.logf("can't reuse git dir, no FETCH_HEAD: %v", statErr)
	}

	if reusable {
		r.setStatus("reusing git dir; running git fetch")
		fetch := exec.Command("git", "fetch", "--prune", "origin")
		fetch.Dir = r.root
		r.logf("running git fetch")
		began := time.Now()
		var errOut bytes.Buffer
		fetch.Stderr = &errOut
		fetchErr := fetch.Run()
		r.logf("ran git fetch in %v", time.Since(began))
		if fetchErr != nil {
			reusable = false
			r.logf("git fetch failed; proceeding to wipe + clone instead; err: %v, stderr: %s", fetchErr, errOut.Bytes())
		}
	}
	if reusable {
		return r, nil
	}

	// Start over from a clean directory.
	r.setStatus("need clone; removing cache root")
	os.RemoveAll(r.root)
	began := time.Now()
	r.setStatus("running fresh git clone --mirror")
	r.logf("cloning %v into %s", url, r.root)
	out, cloneErr := exec.Command("git", "clone", "--mirror", url, r.root).CombinedOutput()
	if cloneErr != nil {
		return nil, fmt.Errorf("cloning %s: %v\n\n%s", url, cloneErr, out)
	}
	r.setStatus("cloned")
	r.logf("cloned in %v", time.Since(began))
	return r, nil
}
// setErr records the outcome of a sync attempt: a nil err is a success,
// anything else a failure. lastGood or lastBad is always updated;
// firstGood or firstBad only when the outcome differs from the previously
// recorded one.
func (r *Repo) setErr(err error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	wasFailing, failing := r.err != nil, err != nil
	flipped := wasFailing != failing
	now := time.Now()
	switch {
	case failing:
		if flipped {
			r.firstBad = now
		}
		r.lastBad = now
	default:
		if flipped {
			r.firstGood = now
		}
		r.lastGood = now
	}
	r.err = err
}
// startTime is when the package was initialized; statusLine uses it to tell
// a repository that is still starting up from one that hangs at start-up.
var startTime = time.Now()

// statusLine returns a one-line, human-readable summary of the repository's
// health, derived from the outcome and timestamps recorded by setErr.
func (r *Repo) statusLine() string {
	r.mu.Lock()
	defer r.mu.Unlock()

	neverGood := r.lastGood.IsZero()
	switch {
	case neverGood && r.err != nil:
		return fmt.Sprintf("broken; permanently? always failing, for %v", time.Since(r.firstBad))
	case neverGood && time.Since(startTime) < 5*time.Minute:
		return "ok; starting up, no report yet"
	case neverGood:
		return fmt.Sprintf("hung; hang at start-up? no report since start %v ago", time.Since(startTime))
	case r.err != nil:
		return fmt.Sprintf("broken for %v", time.Since(r.lastGood))
	}

	// Currently healthy; qualify with staleness or a recent failure.
	if sinceGood := time.Since(r.lastGood); sinceGood > 6*time.Minute {
		return fmt.Sprintf("hung? no activity since last success %v ago", sinceGood)
	}
	hourAgo := time.Now().Add(-1 * time.Hour)
	if r.lastBad.After(hourAgo) {
		return fmt.Sprintf("ok; recent failure %v ago", time.Since(r.lastBad))
	}
	return "ok"
}
// setStatus appends status, timestamped with the current time, to the
// repository's ring of recent status messages.
func (r *Repo) setStatus(status string) {
	ring := &r.status
	ring.add(status)
}
// addRemote registers name as a mirror destination and (re)defines the git
// remote of that name in the repository's config file.
//
// Any existing remote called name is removed first with "git remote
// remove"; its absence is not an error. A new [remote "name"] section with
// the given url is then appended to <root>/config, followed by one
// tab-indented line per entry of opts.
func (r *Repo) addRemote(name, url string, opts ...string) error {
	r.dests = append(r.dests, name)

	remove := exec.Command("git", "remote", "remove", name)
	remove.Dir = r.root
	if err := remove.Run(); err != nil {
		// Exit status 2 means not found, which is fine.
		exitErr, isExit := err.(*exec.ExitError)
		if !isExit || exitErr.ExitCode() != 2 {
			return err
		}
	}

	cfg, err := os.OpenFile(filepath.Join(r.root, "config"), os.O_WRONLY|os.O_APPEND, os.ModePerm)
	if err != nil {
		return err
	}
	// writeSection appends the remote's section header, url and options.
	writeSection := func() error {
		if _, err := fmt.Fprintf(cfg, "\n[remote %q]\n\turl = %v\n", name, url); err != nil {
			return err
		}
		for _, opt := range opts {
			if _, err := fmt.Fprintf(cfg, "\t%s\n", opt); err != nil {
				return err
			}
		}
		return nil
	}
	if err := writeSection(); err != nil {
		cfg.Close()
		return err
	}
	return cfg.Close()
}
// Loop runs forever, repeatedly fetching the repository and pushing it to
// every configured destination.
//
// After a failed fetch or push the error is logged and recorded, and the
// whole cycle is retried after 10 seconds. After a fully successful cycle
// it waits until either a change notification arrives on r.changed or five
// minutes have passed.
func (r *Repo) Loop() {
	// syncOnce performs one fetch+push cycle and names the stage that
	// failed, if any.
	syncOnce := func() (stage string, err error) {
		if err := r.fetch(); err != nil {
			return "fetch", err
		}
		for _, dest := range r.dests {
			if err := r.push(dest); err != nil {
				return "push", err
			}
		}
		return "", nil
	}

	for {
		if stage, err := syncOnce(); err != nil {
			r.logf("%s failed in repo loop: %v", stage, err)
			r.setErr(err)
			time.Sleep(10 * time.Second)
			continue
		}
		r.setErr(nil)
		r.setStatus("waiting")

		// A slow fallback timer still runs, just in case the mechanism
		// that tickles r.changed breaks for some reason.
		fallback := time.NewTimer(5 * time.Minute)
		select {
		case <-fallback.C:
			r.setStatus("poll timer fired")
		case <-r.changed:
			r.setStatus("got update tickle")
			fallback.Stop()
		}
	}
}
// name returns the repository's short name, which is the last element of
// its on-disk root path.
func (r *Repo) name() string {
	base := filepath.Base(r.root)
	return base
}
// logf logs a formatted message through the standard logger, prefixed with
// the repository name and ": ".
func (r *Repo) logf(format string, args ...interface{}) {
	prefixed := r.name() + ": " + format
	log.Printf(prefixed, args...)
}
// fetch runs "git fetch --prune origin" in the repository root.
// It makes up to three attempts (see try), in case a failure was only
// transient; each attempt is limited to ten minutes. The outcome is recorded
// as a status message and the last attempt's error, if any, is returned.
func (r *Repo) fetch() error {
	attemptFetch := func(attempt int) error {
		r.setStatus(fmt.Sprintf("running git fetch origin, attempt %d", attempt))
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
		defer cancel()
		cmd := exec.CommandContext(ctx, "git", "fetch", "--prune", "origin")
		cmd.Dir = r.root
		out, runErr := cmd.CombinedOutput()
		if runErr == nil {
			return nil
		}
		// Attach git's output so the failure is diagnosable from the log.
		detailed := fmt.Errorf("%v\n\n%s", runErr, out)
		r.logf("git fetch: %v", detailed)
		return detailed
	}

	err := try(3, attemptFetch)
	outcome := "ran git fetch"
	if err != nil {
		outcome = "git fetch failed"
	}
	r.setStatus(outcome)
	return err
}
// push runs "git push -f --mirror dest" in the repository root.
// It makes up to three attempts (see try), in case a failure was only
// transient. The outcome is recorded as a status message and the last
// attempt's error, if any, is returned.
func (r *Repo) push(dest string) error {
	attemptPush := func(attempt int) error {
		r.setStatus(fmt.Sprintf("syncing to %v, attempt %d", dest, attempt))
		cmd := exec.Command("git", "push", "-f", "--mirror", dest)
		cmd.Dir = r.root
		out, runErr := cmd.CombinedOutput()
		if runErr == nil {
			return nil
		}
		// Attach git's output so the failure is diagnosable from the log.
		detailed := fmt.Errorf("%v\n\n%s", runErr, out)
		r.logf("git push failed: %v", detailed)
		return detailed
	}

	err := try(3, attemptPush)
	outcome := "did sync to " + dest
	if err != nil {
		outcome = "sync to " + dest + " failed"
	}
	r.setStatus(outcome)
	return err
}
// hasRev reports whether "git cat-file -t rev" succeeds in the repository,
// i.e. whether the repo contains the commit-ish rev. A command aborted
// because ctx ended counts as "not present".
func (r *Repo) hasRev(ctx context.Context, rev string) bool {
	probe := exec.CommandContext(ctx, "git", "cat-file", "-t", rev)
	probe.Dir = r.root
	err := probe.Run()
	return err == nil
}
// testHookArchiveCmd, if non-nil, is used by r.archive in place of
// exec.CommandContext to create the "git archive" command.
var testHookArchiveCmd func(context.Context, string, ...string) *exec.Cmd
// testHookFetchCmd, if non-nil, is used by r.fetchRev in place of
// exec.CommandContext to create the "git fetch" command.
var testHookFetchCmd func(context.Context, string, ...string) *exec.Cmd
// archive runs "git archive --format=tgz rev" in the repository root and
// returns its standard output, the gzip-compressed tarball of rev. The
// command is built by testHookArchiveCmd when that hook is set.
func (r *Repo) archive(ctx context.Context, rev string) ([]byte, error) {
	newCmd := exec.CommandContext
	if testHookArchiveCmd != nil {
		newCmd = testHookArchiveCmd
	}
	cmd := newCmd(ctx, "git", "archive", "--format=tgz", rev)
	cmd.Dir = r.root
	return cmd.Output()
}
// fetchRev runs "git fetch remote rev" in the repository root to obtain a
// single revision. The command is built by testHookFetchCmd when that hook
// is set.
func (r *Repo) fetchRev(ctx context.Context, remote, rev string) error {
	newCmd := exec.CommandContext
	if testHookFetchCmd != nil {
		newCmd = testHookFetchCmd
	}
	cmd := newCmd(ctx, "git", "fetch", remote, rev)
	cmd.Dir = r.root
	return cmd.Run()
}
// fetchRevIfNeeded makes sure rev is available locally: if the repository
// does not already contain it, it is fetched from the "origin" remote.
func (r *Repo) fetchRevIfNeeded(ctx context.Context, rev string) error {
	if !r.hasRev(ctx, rev) {
		r.logf("attempting to fetch missing revision %s from origin", rev)
		return r.fetchRev(ctx, "origin", rev)
	}
	return nil
}
// ServeHTTP serves the repository's two endpoints:
//
//	GET /<name>.tar.gz?rev=<rev>
//	GET /debug/watcher/<name>
//
// The first returns a gzip-compressed tarball of the tree at rev, fetching
// rev from origin first when it is not present locally. The second returns
// the status page.
//
// Methods other than GET and HEAD, a missing rev, and a rev that starts
// with "-" are answered with 400 Bad Request. A failing "git archive"
// yields a 500 carrying the error text. Fetching and archiving together are
// limited to 30 seconds.
func (r *Repo) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if req.Method != "GET" && req.Method != "HEAD" {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	if strings.HasPrefix(req.URL.Path, "/debug/watcher/") {
		r.serveStatus(w, req)
		return
	}
	rev := req.FormValue("rev")
	// rev comes from the client and ends up on git command lines. A value
	// starting with "-" would be parsed by git as an option (for example
	// --upload-pack for fetch or --output for archive) rather than as a
	// revision, so reject it outright.
	if rev == "" || strings.HasPrefix(rev, "-") {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	ctx, cancel := context.WithTimeout(req.Context(), 30*time.Second)
	defer cancel()
	if err := r.fetchRevIfNeeded(ctx, rev); err != nil {
		// Try the archive anyway, it might work.
		r.logf("error fetching revision %s: %v", rev, err)
	}
	tgz, err := r.archive(ctx, rev)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Length", strconv.Itoa(len(tgz)))
	w.Header().Set("Content-Type", "application/x-compressed")
	w.Write(tgz)
}
// serveStatus writes an HTML page listing the repository's recorded status
// messages, newest first. Each line shows the UTC timestamp, how long ago
// that was (rounded to seconds), and the message.
func (r *Repo) serveStatus(w http.ResponseWriter, req *http.Request) {
	repoName := r.name()
	w.Header().Set("Content-Type", "text/html")
	fmt.Fprintf(w, "<html><head><title>watcher: %s</title><body><h1>watcher status for repo: %q</h1>\n",
		repoName, repoName)
	fmt.Fprint(w, "<pre>\n")

	// Round "now" once so every line's age is measured from the same instant.
	now := time.Now().Round(time.Second)
	printEntry := func(ent statusEntry) {
		stamp := ent.t.In(time.UTC).Format(time.RFC3339)
		age := now.Sub(ent.t.Round(time.Second)).String() + " ago"
		fmt.Fprintf(w, "%v %-20s %v\n", stamp, age, ent.status)
	}
	r.status.foreachDesc(printEntry)

	fmt.Fprint(w, "\n</pre></body></html>")
}
// try calls fn up to n times, stopping at the first call that returns nil.
// fn receives the zero-based attempt number. Before attempt k it sleeps for
// k*5 seconds (linear back-off; no delay before the first attempt).
// It returns nil on success or when n <= 0, and otherwise the error of the
// last attempt.
func try(n int, fn func(attempt int) error) error {
	var lastErr error
	for attempt := 0; attempt < n; attempt++ {
		backoff := time.Duration(attempt) * 5 * time.Second
		time.Sleep(backoff)
		lastErr = fn(attempt)
		if lastErr == nil {
			return nil
		}
	}
	return lastErr
}
// notifyChanged signals the sync loop of the repository called name that
// something changed upstream. Unknown names are ignored. The send never
// blocks: if a notification is already pending, this one is dropped.
func (m *mirror) notifyChanged(name string) {
	if repo, known := m.repos[name]; known {
		select {
		case repo.changed <- true:
		default:
			// A wake-up is already queued; one is enough.
		}
	}
}
// pollGerritAndTickle runs forever, asking Gerrit every *flagPollInterval
// for the current master head of each project. Whenever a head differs from
// the one seen on the previous successful poll (which includes the very
// first poll), the corresponding repository is notified so that its sync
// loop wakes up if it is waiting. A failed poll is logged and skipped.
func (m *mirror) pollGerritAndTickle() {
	lastSeen := make(map[string]string) // repo -> last seen hash
	for ; ; time.Sleep(*flagPollInterval) {
		heads, err := m.gerritMetaMap()
		if err != nil {
			log.Printf("pollGerritAndTickle: gerritMetaMap failed, skipping: %v", err)
			continue
		}
		for repo, hash := range heads {
			if lastSeen[repo] == hash {
				continue
			}
			lastSeen[repo] = hash
			m.notifyChanged(repo)
		}
	}
}
// subscribeToMaintnerAndTickleLoop keeps the maintner subscription alive
// forever: it calls subscribeToMaintnerAndTickle in a loop and, when that
// returns an error, logs it and waits 30 seconds before subscribing again.
func (m *mirror) subscribeToMaintnerAndTickleLoop() {
	for {
		err := m.subscribeToMaintnerAndTickle()
		if err == nil {
			continue
		}
		log.Printf("maintner loop: %v; retrying in 30 seconds", err)
		time.Sleep(30 * time.Second)
	}
}
// subscribeToMaintnerAndTickle tails the maintner mutation stream of
// godata.Server and notifies the matching repository for every Gerrit
// mutation whose project starts with "go.googlesource.com/". When the tail
// ends it is logged and restarted, at most once every 10 seconds. As
// written the function never returns.
func (m *mirror) subscribeToMaintnerAndTickle() error {
	const projectPrefix = "go.googlesource.com/"

	// onEvent handles one event from the stream. Returning the event's own
	// error passes stream failures on to TailNetworkMutationSource.
	onEvent := func(e maintner.MutationStreamEvent) error {
		if mut := e.Mutation; mut != nil && mut.Gerrit != nil {
			project := mut.Gerrit.Project
			if strings.HasPrefix(project, projectPrefix) {
				log.Printf("maintner refs for %s changed", project)
				m.notifyChanged(strings.TrimPrefix(project, projectPrefix))
			}
		}
		return e.Err
	}

	ctx := context.Background()
	retryTicker := time.NewTicker(10 * time.Second)
	defer retryTicker.Stop() // we never return, though
	for {
		err := maintner.TailNetworkMutationSource(ctx, godata.Server, onEvent)
		log.Printf("maintner tail error: %v; sleeping+restarting", err)
		// Prevent the retry loop from spinning faster than once every 10
		// seconds, while usually retrying immediately when we have been
		// running for a while already (a tick is then already pending).
		<-retryTicker.C
	}
}
// gerritMetaMap asks Gerrit for all projects and returns a map from repo
// name (e.g. "go") to the hash of its master branch. Projects without a
// master branch are left out. The request is limited to 30 seconds.
func (m *mirror) gerritMetaMap() (map[string]string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	projects, err := m.gerritClient.GetProjects(ctx, "master")
	if err != nil {
		return nil, fmt.Errorf("gerritClient.GetProjects: %v", err)
	}
	heads := make(map[string]string)
	for repo, info := range projects {
		hash, hasMaster := info.Branches["master"]
		if !hasMaster {
			continue
		}
		heads[repo] = hash
	}
	return heads, nil
}
// handleDebugGoroutines writes the stack traces of all goroutines as plain
// text. The dump is truncated to the first 1 MiB.
//
//	GET /debug/goroutines
func handleDebugGoroutines(w http.ResponseWriter, r *http.Request) {
	const maxDump = 1 << 20
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	dump := make([]byte, maxDump)
	n := runtime.Stack(dump, true)
	w.Write(dump[:n])
}
// handleDebugEnv writes the process environment as plain text, one
// KEY=value pair per line.
//
//	GET /debug/env
//
// NOTE(review): this exposes every environment variable to anyone who can
// reach the HTTP server — confirm none of them carries a secret.
func handleDebugEnv(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
	env := os.Environ()
	for i := range env {
		fmt.Fprintln(w, env[i])
	}
}