maintner: flush protobufs to disk

For now, open and close the file each time we write a protobuf. We can improve
the performance in the future as necessary.

Change-Id: Idcc75df5aa59a604c0c0aeeff6ea0e0ff0f2db8f
Reviewed-on: https://go-review.googlesource.com/37626
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/maintner/logger.go b/maintner/logger.go
new file mode 100644
index 0000000..c672489
--- /dev/null
+++ b/maintner/logger.go
@@ -0,0 +1,63 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package maintner
+
+import (
+	"bytes"
+	"compress/gzip"
+	"fmt"
+	"os"
+	"path/filepath"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+	"golang.org/x/build/maintner/maintpb"
+)
+
+// A MutationLogger records mutations; Log reports failure via its error.
+type MutationLogger interface {
+	Log(*maintpb.Mutation) error
+}
+
+// DiskMutationLogger logs mutations to per-day files in a single directory.
+type DiskMutationLogger struct {
+	directory string // directory in which the per-day mutation files live
+}
+
+// NewDiskMutationLogger returns a logger writing its files into directory.
+func NewDiskMutationLogger(directory string) *DiskMutationLogger {
+	logger := DiskMutationLogger{directory: directory}
+	return &logger
+}
+
+func (d *DiskMutationLogger) filename() string {
+	day := time.Now().UTC().Format("2006-01-02") // one file per UTC calendar day
+	return filepath.Join(d.directory, fmt.Sprintf("maintner-%s.proto.gz", day))
+}
+
+// Log gzips m and appends it to the current UTC day's file, creating it if absent.
+func (d *DiskMutationLogger) Log(m *maintpb.Mutation) error {
+	data, err := proto.Marshal(m) // encode first: a failure here touches no file
+	if err != nil {
+		return err
+	}
+	var buf bytes.Buffer
+	zw := gzip.NewWriter(&buf)
+	if _, err := zw.Write(data); err != nil {
+		return err
+	}
+	if err := zw.Close(); err != nil { // Close flushes the gzip footer into buf
+		return err
+	}
+	f, err := os.OpenFile(d.filename(), os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
+	if err != nil {
+		return err
+	}
+	if _, err := f.Write(buf.Bytes()); err != nil {
+		f.Close() // don't leak the descriptor; the write error is reported
+		return err
+	}
+	return f.Close()
+}
diff --git a/maintner/maintner.go b/maintner/maintner.go
index 0942abb..cdeed3a 100644
--- a/maintner/maintner.go
+++ b/maintner/maintner.go
@@ -41,7 +41,8 @@
 	githubUsers  map[int64]*githubUser
 	githubRepos  []repoObj
 	// If true, log new commits
-	shouldLog bool
+	shouldLog      bool
+	MutationLogger MutationLogger
 }
 
 type repoObj struct {
@@ -49,11 +50,12 @@
 	tokenFile string
 }
 
-func NewCorpus() *Corpus {
+func NewCorpus(logger MutationLogger) *Corpus {
 	return &Corpus{
-		githubIssues: make(map[githubRepo]map[int32]*githubIssue),
-		githubUsers:  make(map[int64]*githubUser),
-		githubRepos:  []repoObj{},
+		githubIssues:   make(map[githubRepo]map[int32]*githubIssue),
+		githubUsers:    make(map[int64]*githubUser),
+		githubRepos:    []repoObj{},
+		MutationLogger: logger, // may be nil: processMutation then skips logging
 	}
 }
 
@@ -149,6 +151,19 @@
 	}
 }
 
+// processMutation applies m under c.mu and then, if enabled, logs it.
+func (c *Corpus) processMutation(m *maintpb.Mutation) {
+	c.mu.Lock()
+	c.processMutationLocked(m)
+	c.mu.Unlock()
+	if c.MutationLogger == nil || !c.shouldLog {
+		return
+	}
+	if err := c.MutationLogger.Log(m); err != nil { // TODO: handle errors better
+		fmt.Printf("could not log mutation %v: %v\n", m, err)
+	}
+}
+
 // c.mu must be held.
 func (c *Corpus) processMutationLocked(m *maintpb.Mutation) {
 	if im := m.GithubIssue; im != nil {
@@ -196,7 +211,7 @@
 //
 // If newMutationFromIssue returns nil, the provided github.Issue is no newer
 // than the data we have in the corpus. ci may be nil.
-func newMutationFromIssue(ci *githubIssue, gi *github.Issue, rp githubRepo) *maintpb.GithubIssueMutation {
+func newMutationFromIssue(ci *githubIssue, gi *github.Issue, rp githubRepo) *maintpb.Mutation {
 	if gi == nil || gi.Number == nil {
 		panic(fmt.Sprintf("github issue with nil number: %#v", gi))
 	}
@@ -227,10 +242,10 @@
 		if gi.Body != nil {
 			m.Body = *gi.Body
 		}
-		return m
+		return &maintpb.Mutation{GithubIssue: m}
 	}
 	if gi.UpdatedAt != nil {
-		if gi.UpdatedAt.Before(ci.Updated) {
+		if !gi.UpdatedAt.After(ci.Updated) {
 			// This data is stale, ignore it.
 			return nil
 		}
@@ -243,12 +258,14 @@
 	if gi.Body != nil && *gi.Body != ci.Body {
 		m.Body = *gi.Body
 	}
-	return m
+	return &maintpb.Mutation{GithubIssue: m}
 }
 
 // getIssue finds an issue in the Corpus or returns nil, false if it is not
 // present.
 func (c *Corpus) getIssue(rp githubRepo, number int32) (*githubIssue, bool) {
+	c.mu.RLock()
+	defer c.mu.RUnlock()
 	issueMap, ok := c.githubIssues[rp]
 	if !ok {
 		return nil, false
@@ -365,6 +382,7 @@
 			return err
 		}
 		log.Printf("Polled github for %s; err = %v. Sleeping.", rp, err)
+		// TODO: select and listen for context errors
 		time.Sleep(30 * time.Second)
 	}
 }
@@ -395,18 +413,16 @@
 		if len(issues) == 0 {
 			break
 		}
-		c.mu.Lock()
 		for _, is := range issues {
-			fmt.Printf("issue %d: %s\n", is.ID, *is.Title)
 			gi, _ := c.getIssue(rp, int32(*is.Number))
 			mp := newMutationFromIssue(gi, is, rp)
 			if mp == nil {
 				keepGoing = false
 				break
 			}
-			c.processGithubIssueMutation(mp)
+			fmt.Printf("modifying %s, issue %d: %s\n", rp, *is.Number, *is.Title)
+			c.processMutation(mp)
 		}
-		c.mu.Unlock()
 		page++
 	}
 	return nil
diff --git a/maintner/maintner_test.go b/maintner/maintner_test.go
index 2bafac5..b3364ca 100644
--- a/maintner/maintner_test.go
+++ b/maintner/maintner_test.go
@@ -15,6 +15,18 @@
 	"golang.org/x/build/maintner/maintpb"
 )
 
+// dummyMutationLogger is an in-memory MutationLogger that keeps what it is given.
+type dummyMutationLogger struct {
+	Mutations []*maintpb.Mutation // every mutation passed to Log, in call order
+}
+
+// Log appends m to d.Mutations and always returns nil. append allocates the
+// slice on first use, so the nil case needs no special handling.
+func (d *dummyMutationLogger) Log(m *maintpb.Mutation) error {
+	d.Mutations = append(d.Mutations, m)
+	return nil
+}
+
 type mutationTest struct {
 	corpus *Corpus
 	want   *Corpus
@@ -23,7 +35,7 @@
 func (mt mutationTest) test(t *testing.T, muts ...*maintpb.Mutation) {
 	c := mt.corpus
 	if c == nil {
-		c = NewCorpus()
+		c = NewCorpus(&dummyMutationLogger{})
 	}
 	for _, m := range muts {
 		c.processMutationLocked(m)
@@ -44,7 +56,7 @@
 }
 
 func TestProcessMutation_Github_NewIssue(t *testing.T) {
-	c := NewCorpus()
+	c := NewCorpus(&dummyMutationLogger{})
 	c.githubUsers = map[int64]*githubUser{
 		100: &githubUser{
 			Login: "gopherbot",
@@ -79,7 +91,7 @@
 func TestProcessMutation_OldIssue(t *testing.T) {
 	// process a mutation with an Updated timestamp older than the existing
 	// issue.
-	c := NewCorpus()
+	c := NewCorpus(&dummyMutationLogger{})
 	c.githubUsers = map[int64]*githubUser{
 		100: &githubUser{
 			Login: "gopherbot",
@@ -136,14 +148,14 @@
 		State:     github.String("closed"),
 	}
 	is := newMutationFromIssue(nil, gh, githubRepo("golang/go"))
-	want := &maintpb.GithubIssueMutation{
+	want := &maintpb.Mutation{GithubIssue: &maintpb.GithubIssueMutation{
 		Owner:   "golang",
 		Repo:    "go",
 		Number:  5,
 		Body:    "body of the issue",
 		Created: tp1,
 		Updated: tp2,
-	}
+	}}
 	if !reflect.DeepEqual(is, want) {
 		t.Errorf("issue mismatch\n got: %#v\nwant: %#v", is, want)
 	}
diff --git a/maintner/maintnerd/maintnerd.go b/maintner/maintnerd/maintnerd.go
index e66bf48..c10a959 100644
--- a/maintner/maintnerd/maintnerd.go
+++ b/maintner/maintnerd/maintnerd.go
@@ -33,12 +33,15 @@
 var (
 	listen      = flag.String("listen", ":6343", "listen address")
 	watchGithub = flag.String("watch-github", "", "Comma separated list of owner/repo pairs to slurp")
+	dataDir     = flag.String("data-dir", "", "Local directory to write protobuf files to")
 )
 
 func main() {
 	flag.Parse()
 	pairs := strings.Split(*watchGithub, ",")
-	corpus := maintner.NewCorpus()
+	// TODO switch based on flags, for now only local file sync works
+	logger := maintner.NewDiskMutationLogger(*dataDir)
+	corpus := maintner.NewCorpus(logger)
 	for _, pair := range pairs {
 		splits := strings.SplitN(pair, "/", 2)
 		if len(splits) != 2 || splits[1] == "" {