maintner: flush protobufs to disk

For now, open and close the file each time we write a protobuf. We can improve
the performance in the future as necessary.

Change-Id: Idcc75df5aa59a604c0c0aeeff6ea0e0ff0f2db8f
Reviewed-on: https://go-review.googlesource.com/37626
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/maintner/maintner.go b/maintner/maintner.go
index 0942abb..cdeed3a 100644
--- a/maintner/maintner.go
+++ b/maintner/maintner.go
@@ -41,7 +41,8 @@
 	githubUsers  map[int64]*githubUser
 	githubRepos  []repoObj
 	// If true, log new commits
-	shouldLog bool
+	shouldLog      bool
+	MutationLogger MutationLogger
 }
 
 type repoObj struct {
@@ -49,11 +50,12 @@
 	tokenFile string
 }
 
-func NewCorpus() *Corpus {
+func NewCorpus(logger MutationLogger) *Corpus {
 	return &Corpus{
-		githubIssues: make(map[githubRepo]map[int32]*githubIssue),
-		githubUsers:  make(map[int64]*githubUser),
-		githubRepos:  []repoObj{},
+		githubIssues:   make(map[githubRepo]map[int32]*githubIssue),
+		githubUsers:    make(map[int64]*githubUser),
+		githubRepos:    []repoObj{},
+		MutationLogger: logger,
 	}
 }
 
@@ -149,6 +151,19 @@
 	}
 }
 
+func (c *Corpus) processMutation(m *maintpb.Mutation) {
+	c.mu.Lock()
+	c.processMutationLocked(m)
+	c.mu.Unlock()
+	if c.MutationLogger != nil && c.shouldLog {
+		err := c.MutationLogger.Log(m)
+		if err != nil {
+			// TODO: handle errors better
+			fmt.Printf("could not log mutation %v: %v\n", m, err)
+		}
+	}
+}
+
 // c.mu must be held.
 func (c *Corpus) processMutationLocked(m *maintpb.Mutation) {
 	if im := m.GithubIssue; im != nil {
@@ -196,7 +211,7 @@
 //
 // If newMutationFromIssue returns nil, the provided github.Issue is no newer
 // than the data we have in the corpus. ci may be nil.
-func newMutationFromIssue(ci *githubIssue, gi *github.Issue, rp githubRepo) *maintpb.GithubIssueMutation {
+func newMutationFromIssue(ci *githubIssue, gi *github.Issue, rp githubRepo) *maintpb.Mutation {
 	if gi == nil || gi.Number == nil {
 		panic(fmt.Sprintf("github issue with nil number: %#v", gi))
 	}
@@ -227,10 +242,10 @@
 		if gi.Body != nil {
 			m.Body = *gi.Body
 		}
-		return m
+		return &maintpb.Mutation{GithubIssue: m}
 	}
 	if gi.UpdatedAt != nil {
-		if gi.UpdatedAt.Before(ci.Updated) {
+		if !gi.UpdatedAt.After(ci.Updated) {
 			// This data is stale, ignore it.
 			return nil
 		}
@@ -243,12 +258,14 @@
 	if gi.Body != nil && *gi.Body != ci.Body {
 		m.Body = *gi.Body
 	}
-	return m
+	return &maintpb.Mutation{GithubIssue: m}
 }
 
 // getIssue finds an issue in the Corpus or returns nil, false if it is not
 // present.
 func (c *Corpus) getIssue(rp githubRepo, number int32) (*githubIssue, bool) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
 	issueMap, ok := c.githubIssues[rp]
 	if !ok {
 		return nil, false
@@ -365,6 +382,7 @@
 			return err
 		}
 		log.Printf("Polled github for %s; err = %v. Sleeping.", rp, err)
+		// TODO: select and listen for context errors
 		time.Sleep(30 * time.Second)
 	}
 }
@@ -395,18 +413,16 @@
 		if len(issues) == 0 {
 			break
 		}
-		c.mu.Lock()
 		for _, is := range issues {
-			fmt.Printf("issue %d: %s\n", is.ID, *is.Title)
 			gi, _ := c.getIssue(rp, int32(*is.Number))
 			mp := newMutationFromIssue(gi, is, rp)
 			if mp == nil {
 				keepGoing = false
 				break
 			}
-			c.processGithubIssueMutation(mp)
+			fmt.Printf("modifying %s, issue %d: %s\n", rp, *is.Number, *is.Title)
+			c.processMutation(mp)
 		}
-		c.mu.Unlock()
 		page++
 	}
 	return nil