maintner: flush protobufs to disk
For now, open and close the file each time we write a protobuf. We can improve
the performance in the future as necessary.
Change-Id: Idcc75df5aa59a604c0c0aeeff6ea0e0ff0f2db8f
Reviewed-on: https://go-review.googlesource.com/37626
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/maintner/maintner.go b/maintner/maintner.go
index 0942abb..cdeed3a 100644
--- a/maintner/maintner.go
+++ b/maintner/maintner.go
@@ -41,7 +41,8 @@
githubUsers map[int64]*githubUser
githubRepos []repoObj
// If true, log new commits
- shouldLog bool
+ shouldLog bool
+ MutationLogger MutationLogger
}
type repoObj struct {
@@ -49,11 +50,12 @@
tokenFile string
}
-func NewCorpus() *Corpus {
+func NewCorpus(logger MutationLogger) *Corpus {
return &Corpus{
- githubIssues: make(map[githubRepo]map[int32]*githubIssue),
- githubUsers: make(map[int64]*githubUser),
- githubRepos: []repoObj{},
+ githubIssues: make(map[githubRepo]map[int32]*githubIssue),
+ githubUsers: make(map[int64]*githubUser),
+ githubRepos: []repoObj{},
+ MutationLogger: logger, // may be nil; processMutation checks for nil before logging
}
}
@@ -149,6 +151,19 @@
}
}
+func (c *Corpus) processMutation(m *maintpb.Mutation) { // runs processMutationLocked(m) while holding c.mu, then optionally logs m
+ c.mu.Lock()
+ c.processMutationLocked(m)
+ c.mu.Unlock() // released before Log is called, so logging happens outside c.mu
+ if c.MutationLogger != nil && c.shouldLog { // logging needs both a non-nil logger and shouldLog set
+ err := c.MutationLogger.Log(m)
+ if err != nil {
+ // TODO: handle errors better; for now a Log failure is only printed and not returned to the caller.
+ fmt.Printf("could not log mutation %v: %v\n", m, err)
+ }
+ }
+}
+
// c.mu must be held.
func (c *Corpus) processMutationLocked(m *maintpb.Mutation) {
if im := m.GithubIssue; im != nil {
@@ -196,7 +211,7 @@
//
// If newMutationFromIssue returns nil, the provided github.Issue is no newer
// than the data we have in the corpus. ci may be nil.
-func newMutationFromIssue(ci *githubIssue, gi *github.Issue, rp githubRepo) *maintpb.GithubIssueMutation {
+func newMutationFromIssue(ci *githubIssue, gi *github.Issue, rp githubRepo) *maintpb.Mutation {
if gi == nil || gi.Number == nil {
panic(fmt.Sprintf("github issue with nil number: %#v", gi))
}
@@ -227,10 +242,10 @@
if gi.Body != nil {
m.Body = *gi.Body
}
- return m
+ return &maintpb.Mutation{GithubIssue: m}
}
if gi.UpdatedAt != nil {
- if gi.UpdatedAt.Before(ci.Updated) {
+ if !gi.UpdatedAt.After(ci.Updated) {
// This data is stale, ignore it.
return nil
}
@@ -243,12 +258,14 @@
if gi.Body != nil && *gi.Body != ci.Body {
m.Body = *gi.Body
}
- return m
+ return &maintpb.Mutation{GithubIssue: m}
}
// getIssue finds an issue in the Corpus or returns nil, false if it is not
// present.
func (c *Corpus) getIssue(rp githubRepo, number int32) (*githubIssue, bool) {
+ c.mu.RLock()
+ defer c.mu.RUnlock()
issueMap, ok := c.githubIssues[rp]
if !ok {
return nil, false
@@ -365,6 +382,7 @@
return err
}
log.Printf("Polled github for %s; err = %v. Sleeping.", rp, err)
+ // TODO: select and listen for context errors
time.Sleep(30 * time.Second)
}
}
@@ -395,18 +413,16 @@
if len(issues) == 0 {
break
}
- c.mu.Lock()
for _, is := range issues {
- fmt.Printf("issue %d: %s\n", is.ID, *is.Title)
gi, _ := c.getIssue(rp, int32(*is.Number))
mp := newMutationFromIssue(gi, is, rp)
if mp == nil {
keepGoing = false
break
}
- c.processGithubIssueMutation(mp)
+ fmt.Printf("modifying %s, issue %d: %s\n", rp, *is.Number, *is.Title)
+ c.processMutation(mp)
}
- c.mu.Unlock()
page++
}
return nil