maintner: flush protobufs to disk
For now, open and close the file each time we write a protobuf. We can improve
the performance in the future as necessary.
Change-Id: Idcc75df5aa59a604c0c0aeeff6ea0e0ff0f2db8f
Reviewed-on: https://go-review.googlesource.com/37626
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/maintner/logger.go b/maintner/logger.go
new file mode 100644
index 0000000..c672489
--- /dev/null
+++ b/maintner/logger.go
@@ -0,0 +1,84 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package maintner
+
+import (
+	"bytes"
+	"compress/gzip"
+	"fmt"
+	"os"
+	"path/filepath"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+	"golang.org/x/build/maintner/maintpb"
+)
+
+// A MutationLogger logs mutations.
+type MutationLogger interface {
+	Log(*maintpb.Mutation) error
+}
+
+// DiskMutationLogger logs mutations to disk, appending them to one file per
+// UTC day in a single directory.
+type DiskMutationLogger struct {
+	directory string
+}
+
+// Compile-time check that DiskMutationLogger implements MutationLogger.
+var _ MutationLogger = (*DiskMutationLogger)(nil)
+
+// NewDiskMutationLogger creates a new DiskMutationLogger, which will write
+// mutations to files in the given directory. The directory itself must
+// already exist; only the per-day files inside it are created on demand.
+func NewDiskMutationLogger(directory string) *DiskMutationLogger {
+	return &DiskMutationLogger{directory: directory}
+}
+
+// filename returns the path of the mutation file for the current UTC day,
+// for example <directory>/maintner-2017-03-01.proto.gz.
+func (d *DiskMutationLogger) filename() string {
+	now := time.Now().UTC()
+	return filepath.Join(d.directory, fmt.Sprintf("maintner-%s.proto.gz", now.Format("2006-01-02")))
+}
+
+// Log appends m to the mutation file for the current UTC day, creating the
+// file if it does not exist.
+//
+// Each call writes m as a separate gzip member. The members concatenate into a
+// valid gzip stream, but the marshaled protobufs are not length-delimited, so
+// a reader needs to decode one member at a time (see gzip.Reader.Multistream)
+// to recover individual mutations.
+//
+// The file is opened and closed on every call, which keeps the rollover to the
+// next day's file trivial at some cost in throughput.
+func (d *DiskMutationLogger) Log(m *maintpb.Mutation) error {
+	// Encode fully before opening the file so that a marshaling or
+	// compression failure neither creates nor modifies the day's file.
+	data, err := proto.Marshal(m)
+	if err != nil {
+		return err
+	}
+	var buf bytes.Buffer
+	zw := gzip.NewWriter(&buf)
+	if _, err := zw.Write(data); err != nil {
+		return err
+	}
+	if err := zw.Close(); err != nil {
+		return err
+	}
+
+	f, err := os.OpenFile(d.filename(), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
+	if err != nil {
+		return err
+	}
+	if _, err := f.Write(buf.Bytes()); err != nil {
+		// Close so the descriptor does not leak; the write error is
+		// the one worth reporting.
+		f.Close()
+		return err
+	}
+	return f.Close()
+}
diff --git a/maintner/maintner.go b/maintner/maintner.go
index 0942abb..cdeed3a 100644
--- a/maintner/maintner.go
+++ b/maintner/maintner.go
@@ -41,7 +41,8 @@
githubUsers map[int64]*githubUser
githubRepos []repoObj
// If true, log new commits
- shouldLog bool
+ shouldLog bool
+ MutationLogger MutationLogger
}
type repoObj struct {
@@ -49,11 +50,12 @@
tokenFile string
}
-func NewCorpus() *Corpus {
+// NewCorpus returns a new, empty Corpus. Mutations it processes may be
+// recorded with logger, which may be nil to disable recording.
+func NewCorpus(logger MutationLogger) *Corpus {
 return &Corpus{
- githubIssues: make(map[githubRepo]map[int32]*githubIssue),
- githubUsers: make(map[int64]*githubUser),
- githubRepos: []repoObj{},
+ githubIssues: make(map[githubRepo]map[int32]*githubIssue),
+ githubUsers: make(map[int64]*githubUser),
+ githubRepos: []repoObj{},
+ MutationLogger: logger,
 }
 }
@@ -149,6 +151,19 @@
}
}
+// processMutation applies m under c.mu and then, outside the lock, logs it if enabled.
+func (c *Corpus) processMutation(m *maintpb.Mutation) {
+	c.mu.Lock()
+	c.processMutationLocked(m)
+	c.mu.Unlock()
+	if c.MutationLogger != nil && c.shouldLog {
+		if err := c.MutationLogger.Log(m); err != nil {
+			// TODO: handle errors better
+			log.Printf("could not log mutation %v: %v", m, err)
+		}
+	}
+}
+
// c.mu must be held.
func (c *Corpus) processMutationLocked(m *maintpb.Mutation) {
if im := m.GithubIssue; im != nil {
@@ -196,7 +211,7 @@
//
// If newMutationFromIssue returns nil, the provided github.Issue is no newer
// than the data we have in the corpus. ci may be nil.
-func newMutationFromIssue(ci *githubIssue, gi *github.Issue, rp githubRepo) *maintpb.GithubIssueMutation {
+func newMutationFromIssue(ci *githubIssue, gi *github.Issue, rp githubRepo) *maintpb.Mutation {
if gi == nil || gi.Number == nil {
panic(fmt.Sprintf("github issue with nil number: %#v", gi))
}
@@ -227,10 +242,10 @@
if gi.Body != nil {
m.Body = *gi.Body
}
- return m
+ return &maintpb.Mutation{GithubIssue: m}
}
if gi.UpdatedAt != nil {
- if gi.UpdatedAt.Before(ci.Updated) {
+ if !gi.UpdatedAt.After(ci.Updated) {
// This data is stale, ignore it.
return nil
}
@@ -243,12 +258,14 @@
if gi.Body != nil && *gi.Body != ci.Body {
m.Body = *gi.Body
}
- return m
+ return &maintpb.Mutation{GithubIssue: m}
}
// getIssue finds an issue in the Corpus or returns nil, false if it is not
// present.
func (c *Corpus) getIssue(rp githubRepo, number int32) (*githubIssue, bool) {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
issueMap, ok := c.githubIssues[rp]
if !ok {
return nil, false
@@ -365,6 +382,7 @@
return err
}
log.Printf("Polled github for %s; err = %v. Sleeping.", rp, err)
+ // TODO: select and listen for context errors
time.Sleep(30 * time.Second)
}
}
@@ -395,18 +413,16 @@
if len(issues) == 0 {
break
}
- c.mu.Lock()
for _, is := range issues {
- fmt.Printf("issue %d: %s\n", is.ID, *is.Title)
gi, _ := c.getIssue(rp, int32(*is.Number))
mp := newMutationFromIssue(gi, is, rp)
if mp == nil {
keepGoing = false
break
}
- c.processGithubIssueMutation(mp)
+ fmt.Printf("modifying %s, issue %d: %s\n", rp, *is.Number, *is.Title)
+ c.processMutation(mp)
}
- c.mu.Unlock()
page++
}
return nil
diff --git a/maintner/maintner_test.go b/maintner/maintner_test.go
index 2bafac5..b3364ca 100644
--- a/maintner/maintner_test.go
+++ b/maintner/maintner_test.go
@@ -15,6 +15,18 @@
"golang.org/x/build/maintner/maintpb"
)
+// dummyMutationLogger is a MutationLogger that keeps mutations in memory
+// instead of writing them anywhere, for use by tests that need a Corpus.
+type dummyMutationLogger struct {
+	Mutations []*maintpb.Mutation
+}
+
+// Log appends m to d.Mutations and always succeeds.
+func (d *dummyMutationLogger) Log(m *maintpb.Mutation) error {
+	d.Mutations = append(d.Mutations, m)
+	return nil
+}
+
type mutationTest struct {
corpus *Corpus
want *Corpus
@@ -23,7 +35,7 @@
func (mt mutationTest) test(t *testing.T, muts ...*maintpb.Mutation) {
c := mt.corpus
if c == nil {
- c = NewCorpus()
+ c = NewCorpus(&dummyMutationLogger{})
}
for _, m := range muts {
c.processMutationLocked(m)
@@ -44,7 +56,7 @@
}
func TestProcessMutation_Github_NewIssue(t *testing.T) {
- c := NewCorpus()
+ c := NewCorpus(&dummyMutationLogger{})
c.githubUsers = map[int64]*githubUser{
100: &githubUser{
Login: "gopherbot",
@@ -79,7 +91,7 @@
func TestProcessMutation_OldIssue(t *testing.T) {
// process a mutation with an Updated timestamp older than the existing
// issue.
- c := NewCorpus()
+ c := NewCorpus(&dummyMutationLogger{})
c.githubUsers = map[int64]*githubUser{
100: &githubUser{
Login: "gopherbot",
@@ -136,14 +148,14 @@
State: github.String("closed"),
}
is := newMutationFromIssue(nil, gh, githubRepo("golang/go"))
- want := &maintpb.GithubIssueMutation{
+ want := &maintpb.Mutation{GithubIssue: &maintpb.GithubIssueMutation{
Owner: "golang",
Repo: "go",
Number: 5,
Body: "body of the issue",
Created: tp1,
Updated: tp2,
- }
+ }}
if !reflect.DeepEqual(is, want) {
t.Errorf("issue mismatch\n got: %#v\nwant: %#v", is, want)
}
diff --git a/maintner/maintnerd/maintnerd.go b/maintner/maintnerd/maintnerd.go
index e66bf48..c10a959 100644
--- a/maintner/maintnerd/maintnerd.go
+++ b/maintner/maintnerd/maintnerd.go
@@ -33,12 +33,15 @@
 var (
 listen = flag.String("listen", ":6343", "listen address")
 watchGithub = flag.String("watch-github", "", "Comma separated list of owner/repo pairs to slurp")
+	// dataDir is handed to maintner.NewDiskMutationLogger; when empty, mutation
+	// files are written to the current working directory.
+ dataDir = flag.String("data-dir", "", "Local directory to write protobuf files to")
 )
func main() {
flag.Parse()
pairs := strings.Split(*watchGithub, ",")
- corpus := maintner.NewCorpus()
+ // TODO switch based on flags, for now only local file sync works
+ logger := maintner.NewDiskMutationLogger(*dataDir)
+ corpus := maintner.NewCorpus(logger)
for _, pair := range pairs {
splits := strings.SplitN(pair, "/", 2)
if len(splits) != 2 || splits[1] == "" {