maintner: better gerrit commit syncing

Don't update Gerrit refs until after corresponding git commit
mutations are added and processed. Do the batching earlier, so we
always make forward progress.

Also better logging so you can see what's happening.

Change-Id: I493071af7bccec5abf156025389376b9fc182dc8
Reviewed-on: https://go-review.googlesource.com/38672
Reviewed-by: Kevin Burke <kev@inburke.com>
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/maintner/gerrit.go b/maintner/gerrit.go
index 15a619a..9f445bb 100644
--- a/maintner/gerrit.go
+++ b/maintner/gerrit.go
@@ -75,20 +75,27 @@
 }
 
 type gerritCL struct {
-	Hash       gitHash
-	Number     int32
-	Author     *gitPerson
-	AuthorTime time.Time
-	Status     string // "merged", "abandoned", "new"
-	// TODO...
-}
-
-// gerritMetaCommit holds data about the "meta commit" object that Gerrit
-// returns for a given CL.
-type gerritMetaCommit struct {
-	Hash   gitHash
+	// Number is the CL number on the Gerrit
+	// server. (e.g. 1, 2, 3)
 	Number int32
-	Raw    []byte
+
+	// Version is the number of versions of the patchset for this
+	// CL seen so far. It starts at 1.
+	Version int32
+
+	// Commit is the git commit of the latest version of this CL.
+	// Previous versions are available via GerritProject.remote.
+	Commit *gitCommit
+
+	// Meta is the head of the most recent Gerrit "meta" commit
+	// for this CL. This is guaranteed to be a linear history
+	// back to a CL-specific root commit for this meta branch.
+	Meta *gitCommit
+
+	// Status is TODO.
+	// Will be something like "merged", "abandoned", "new"
+	// Or maybe bools.
+	// Status string
 }
 
 // c.mu must be held
@@ -107,15 +114,20 @@
 	project *GerritProject
 }
 
-// AddGerrit adds the Gerrit project with the given URL to the corpus.
-func (c *Corpus) AddGerrit(gerritURL string) {
+// AddGerrit adds the Gerrit project with the given project to the corpus.
+// The provided string should be of the form "hostname/project", without a scheme
+// or trailing slash.
+func (c *Corpus) AddGerrit(gerritProj string) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
-	if strings.Count(gerritURL, "/") != 1 {
-		panic(fmt.Sprintf("gerrit URL %q expected to contain exactly 1 slash", gerritURL))
+	if strings.Count(gerritProj, "/") != 1 {
+		panic(fmt.Sprintf("gerrit project argument %q expected to contain exactly 1 slash", gerritProj))
 	}
 	c.initGerrit()
-	project := c.gerrit.getOrCreateProject(gerritURL)
+	if _, dup := c.gerrit.projects[gerritProj]; dup {
+		panic("duplicated watched gerrit project " + gerritProj)
+	}
+	project := c.gerrit.getOrCreateProject(gerritProj)
 	if project == nil {
 		panic("gerrit project not created")
 	}
@@ -140,22 +152,36 @@
 
 // called with c.mu Locked
 func (gp *GerritProject) processMutation(gm *maintpb.GerritMutation) {
+	c := gp.gerrit.c
+
 	for _, refp := range gm.Refs {
 		m := rxChangeRef.FindStringSubmatch(refp.Ref)
 		if m == nil {
 			continue
 		}
-		cl, err := strconv.ParseInt(m[1], 10, 32)
+		clNum64, err := strconv.ParseInt(m[1], 10, 32)
 		version, ok := gerritVersionNumber(m[2])
 		if !ok || err != nil {
 			continue
 		}
 		hash := gitHashFromHexStr(refp.Sha1)
-		gp.remote[gerritCLVersion{int32(cl), version}] = hash
-		gp.markNeededCommit(hash)
+		gc, ok := c.gitCommit[hash]
+		if !ok {
+			gp.logf("ERROR: ref %v references unknown hash %v; ignoring", refp, hash)
+			continue
+		}
+		clv := gerritCLVersion{int32(clNum64), version}
+		gp.remote[clv] = hash
+		cl := gp.getOrCreateCL(clv.CLNumber)
+		if clv.Version == 0 {
+			cl.Meta = gc
+		} else {
+			cl.Commit = gc
+			cl.Version = clv.Version
+		}
+		gp.logf("Ref %+v => %v", clv, hash)
 	}
 
-	c := gp.gerrit.c
 	for _, commitp := range gm.Commits {
 		gc, err := c.processGitCommit(commitp)
 		if err != nil {
@@ -183,6 +209,19 @@
 	gp.need[hash] = true
 }
 
+// c.mu must be held
+func (gp *GerritProject) getOrCreateCL(num int32) *gerritCL {
+	cl, ok := gp.cls[num]
+	if ok {
+		return cl
+	}
+	cl = &gerritCL{
+		Number: num,
+	}
+	gp.cls[num] = cl
+	return cl
+}
+
 func gerritVersionNumber(s string) (version int32, ok bool) {
 	if s == "meta" {
 		return 0, true
@@ -236,13 +275,6 @@
 }
 
 func (gp *GerritProject) syncOnce(ctx context.Context) error {
-	if err := gp.syncRefs(ctx); err != nil {
-		return err
-	}
-	return gp.syncCommits(ctx)
-}
-
-func (gp *GerritProject) syncRefs(ctx context.Context) error {
 	c := gp.gerrit.c
 
 	fetchCtx, cancel := context.WithTimeout(ctx, time.Minute)
@@ -265,6 +297,7 @@
 	var toFetch []gitHash
 
 	bs := bufio.NewScanner(bytes.NewReader(out))
+	c.mu.RLock() // to access gp.remote; okay because ls-remote output all in out already
 	for bs.Scan() {
 		m := rxRemoteRef.FindSubmatch(bs.Bytes())
 		if m == nil {
@@ -278,9 +311,7 @@
 		sha1 := m[1]
 		hash := gitHashFromHex(sha1)
 
-		c.mu.RLock()
 		curHash := gp.remote[gerritCLVersion{int32(clNum), version}]
-		c.mu.RUnlock()
 
 		if curHash != hash {
 			toFetch = append(toFetch, hash)
@@ -290,37 +321,63 @@
 			})
 		}
 	}
+	c.mu.RUnlock()
 	if err := bs.Err(); err != nil {
 		return err
 	}
 	if len(changedRefs) == 0 {
 		return nil
 	}
-	gp.logf("%d new refs; fetching...", len(changedRefs))
-	if err := gp.fetchHashes(ctx, toFetch); err != nil {
-		return err
-	}
-	gp.logf("fetched %d new refs.", len(changedRefs))
+	gp.logf("%d new refs", len(changedRefs))
+	const batchSize = 250
+	for len(toFetch) > 0 {
+		batch := toFetch
+		if len(batch) > batchSize {
+			batch = batch[:batchSize]
+		}
+		if err := gp.fetchHashes(ctx, batch); err != nil {
+			return err
+		}
 
-	c.addMutation(&maintpb.Mutation{
-		Gerrit: &maintpb.GerritMutation{
-			Project: gp.proj,
-			Refs:    changedRefs,
-		},
-	})
+		c.mu.Lock()
+		for _, hash := range batch {
+			gp.markNeededCommit(hash)
+		}
+		c.mu.Unlock()
+
+		n, err := gp.syncCommits(ctx)
+		if err != nil {
+			return err
+		}
+		toFetch = toFetch[len(batch):]
+		gp.logf("synced %v commits for %d new hashes, %d hashes remain", n, len(batch), len(toFetch))
+
+		c.addMutation(&maintpb.Mutation{
+			Gerrit: &maintpb.GerritMutation{
+				Project: gp.proj,
+				Refs:    changedRefs[:len(batch)],
+			}})
+		changedRefs = changedRefs[len(batch):]
+	}
 	return nil
 }
 
-func (gp *GerritProject) syncCommits(ctx context.Context) error {
+func (gp *GerritProject) syncCommits(ctx context.Context) (n int, err error) {
 	c := gp.gerrit.c
+	lastLog := time.Now()
 	for {
 		hash := gp.commitToIndex()
 		if hash == nil {
-			return nil
+			return n, nil
+		}
+		now := time.Now()
+		if lastLog.Before(now.Add(-1 * time.Second)) {
+			lastLog = now
+			gp.logf("parsing commits (%v done)", n)
 		}
 		commit, err := parseCommitFromGit(gp.gitDir, hash)
 		if err != nil {
-			return err
+			return n, err
 		}
 		c.addMutation(&maintpb.Mutation{
 			Gerrit: &maintpb.GerritMutation{
@@ -328,6 +385,7 @@
 				Commits: []*maintpb.GitCommit{commit},
 			},
 		})
+		n++
 	}
 }
 
@@ -346,75 +404,19 @@
 	statusSpace = []byte("Status: ")
 )
 
-// newMutationFromCL generates a GerritCLMutation using the smallest possible
-// diff between a (the state we have in memory) and b (the current Gerrit
-// state).
-//
-// If newMutationFromCL returns nil, the provided gerrit CL is no newer than
-// the data we have in the corpus. 'a' may be nil.
-func (gp *GerritProject) newMutationFromCL(a *gerritCL, b *gerritMetaCommit) *maintpb.Mutation {
-	if b == nil {
-		panic("newMutationFromCL: provided nil gerritCL")
+func (gp *GerritProject) fetchHashes(ctx context.Context, hashes []gitHash) error {
+	args := []string{"fetch", "--quiet", "origin"}
+	for _, hash := range hashes {
+		args = append(args, hash.String())
 	}
-	if a == nil {
-		var sha1 string
-		switch b.Hash.(type) {
-		case gitSHA1:
-			sha1 = b.Hash.String()
-		default:
-			panic(fmt.Sprintf("unsupported git hash type %T", b.Hash))
-		}
-		_ = sha1
-		panic("TODO")
-		return &maintpb.Mutation{
-			Gerrit: &maintpb.GerritMutation{
-				Project: gp.proj,
-			},
-		}
-	}
-	// TODO: update the existing proto
-	return nil
-}
-
-// updateCL updates the local CL.
-func (gp *GerritProject) updateCL(ctx context.Context, clNum int32, hash gitHash) error {
-	cmd := exec.CommandContext(ctx, "git", "cat-file", "-p", hash.String())
+	gp.logf("fetching %v hashes...", len(hashes))
+	cmd := exec.CommandContext(ctx, "git", args...)
 	cmd.Dir = gp.gitDir
-	buf, errBuf := new(bytes.Buffer), new(bytes.Buffer)
-	cmd.Stdout = buf
-	cmd.Stderr = errBuf
-	if err := cmd.Run(); err != nil {
+	if out, err := cmd.CombinedOutput(); err != nil {
+		log.Printf("error fetching %d hashes from gerrit project %s: %s", len(hashes), gp.proj, out)
 		return err
 	}
-	cl := &gerritMetaCommit{
-		Number: clNum,
-		Hash:   hash,
-		Raw:    buf.Bytes(),
-	}
-	proto := gp.newMutationFromCL(gp.cls[clNum], cl)
-	gp.gerrit.c.addMutation(proto)
-	return nil
-}
-
-func (gp *GerritProject) fetchHashes(ctx context.Context, hashes []gitHash) error {
-	for len(hashes) > 0 {
-		batch := hashes
-		if len(batch) > 500 {
-			batch = batch[:500]
-		}
-		hashes = hashes[len(batch):]
-
-		args := []string{"fetch", "--quiet", "origin"}
-		for _, hash := range batch {
-			args = append(args, hash.String())
-		}
-		cmd := exec.CommandContext(ctx, "git", args...)
-		cmd.Dir = gp.gitDir
-		if out, err := cmd.CombinedOutput(); err != nil {
-			log.Printf("error fetching %d hashes from gerrit project %s: %s", len(batch), gp.proj, out)
-			return err
-		}
-	}
+	gp.logf("fetched %v hashes.", len(hashes))
 	return nil
 }
 
diff --git a/maintner/git.go b/maintner/git.go
index aac538e..2e790de 100644
--- a/maintner/git.go
+++ b/maintner/git.go
@@ -136,9 +136,6 @@
 			}
 		}
 	}
-	log.Printf("TODO: poll %v from %v", conf.repo, conf.dir)
-	select {} // TODO(bradfitz): actuall poll
-	return nil
 }
 
 // returns nil if no work.
diff --git a/maintner/github.go b/maintner/github.go
index 1014954..9c5c2ca 100644
--- a/maintner/github.go
+++ b/maintner/github.go
@@ -1524,7 +1524,6 @@
 			remain--
 		}
 	}
-	return nil
 }
 
 func (p *githubRepoPoller) syncCommentsOnIssue(ctx context.Context, issueNum int32) error {
@@ -1659,8 +1658,6 @@
 			remain--
 		}
 	}
-	p.logf("event sync: done.")
-	return nil
 }
 
 func (p *githubRepoPoller) syncEventsOnIssue(ctx context.Context, issueNum int32) error {