maintner: replace gzip-based on-disk format with uncompressed format
The log file gets 50% larger, but loads in half the time.
Change-Id: I0b163f7b39269a5b39bbd51a80a4982682e94863
Reviewed-on: https://go-review.googlesource.com/38723
Reviewed-by: Kevin Burke <kev@inburke.com>
diff --git a/maintner/gerrit.go b/maintner/gerrit.go
index edf1bbb..1fa99f5 100644
--- a/maintner/gerrit.go
+++ b/maintner/gerrit.go
@@ -139,13 +139,14 @@
// called with c.mu Locked
func (c *Corpus) processGerritMutation(gm *maintpb.GerritMutation) {
if c.gerrit == nil {
- // Untracked.
- return
+ // TODO: option to ignore mutation if user isn't interested.
+ c.initGerrit()
}
gp, ok := c.gerrit.projects[gm.Project]
if !ok {
- // Untracked.
- return
+ // TODO: option to ignore mutation if user isn't interested.
+ // For now, always process the record.
+ gp = c.gerrit.getOrCreateProject(gm.Project)
}
gp.processMutation(gm)
}
diff --git a/maintner/logger.go b/maintner/logger.go
index c0653c5..6f71f43 100644
--- a/maintner/logger.go
+++ b/maintner/logger.go
@@ -7,14 +7,13 @@
import (
"bufio"
"bytes"
- "compress/gzip"
"context"
"fmt"
"io"
- "io/ioutil"
"log"
"os"
"path/filepath"
+ "strconv"
"strings"
"sync"
"time"
@@ -23,6 +22,21 @@
"golang.org/x/build/maintner/maintpb"
)
+// The DiskMutationLogger on-disk format is as follows:
+//
+// The log is a stream of proto3-marshalled *maintpb.Mutation, spread
+// over 1 or more files named maintner-YYYY-MM-DD.mutlog. Each record
+// begins with the variable-length prefix "REC@XXX+YYY=" where the
+// 1+ XXX digits are the hex offset on disk (where the 'R' on disk is
+// written) and the 1+ YYY digits are the hex length of the marshalled
+// proto. After the YYY digits there is a '=' byte before the YYY bytes
+// of proto. There is no record footer.
+var (
+ headerPrefix = []byte("REC@")
+ headerSuffix = []byte("=")
+ plus = []byte("+")
+)
+
// A MutationLogger logs mutations.
type MutationLogger interface {
Log(*maintpb.Mutation) error
@@ -47,32 +61,40 @@
// first in lexical order.
func (d *DiskMutationLogger) filename() string {
now := time.Now().UTC()
- return filepath.Join(d.directory, fmt.Sprintf("maintner-%s.proto.gz", now.Format("2006-01-02")))
+ return filepath.Join(d.directory, fmt.Sprintf("maintner-%s.mutlog", now.Format("2006-01-02")))
}
// Log will write m to disk. If a mutation file does not exist for the current
// day, it will be created.
func (d *DiskMutationLogger) Log(m *maintpb.Mutation) error {
- d.mu.Lock()
- defer d.mu.Unlock()
data, err := proto.Marshal(m)
if err != nil {
return err
}
- var buf bytes.Buffer
- zw := gzip.NewWriter(&buf)
- if _, err := zw.Write(data); err != nil {
- return err
- }
- if err := zw.Close(); err != nil {
- return err
- }
- // TODO lock the file for writing
+ d.mu.Lock()
+ defer d.mu.Unlock()
+
f, err := os.OpenFile(d.filename(), os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
if err != nil {
return err
}
+ off, err := f.Seek(0, io.SeekEnd)
+ if err != nil {
+ return err
+ }
+ st, err := f.Stat()
+ if err != nil {
+ return err
+ }
+ if off != st.Size() {
+ return fmt.Errorf("Size %v != offset %v", st.Size(), off)
+ }
+
+ var buf bytes.Buffer
+ fmt.Fprintf(&buf, "REC@%x+%x=", off, len(data))
+ buf.Write(data)
if _, err := f.Write(buf.Bytes()); err != nil {
+ f.Close()
return err
}
return f.Close()
@@ -97,26 +119,54 @@
if !strings.HasPrefix(fi.Name(), "maintner-") {
return nil
}
- if !strings.HasSuffix(fi.Name(), ".proto.gz") {
+ if !strings.HasSuffix(fi.Name(), ".mutlog") {
return nil
}
+ var off int64
f, err := os.Open(path)
if err != nil {
return err
}
+ defer f.Close()
br := bufio.NewReader(f)
- zr, err := gzip.NewReader(br)
- if err != nil {
- return err
- }
+ var buf bytes.Buffer
for {
- zr.Multistream(false)
- rec, err := ioutil.ReadAll(zr)
+ startOff := off
+ hdr, err := br.ReadSlice('=')
if err != nil {
+ if err == io.EOF && len(hdr) == 0 {
+ return nil
+ }
return err
}
+ if len(hdr) > 40 {
+ return fmt.Errorf("malformed overlong header %q... at %v, offset %v", hdr[:40], path, startOff)
+ }
+ if !bytes.HasPrefix(hdr, headerPrefix) || !bytes.HasSuffix(hdr, headerSuffix) || bytes.Count(hdr, plus) != 1 {
+ return fmt.Errorf("malformed header %q at %v, offset %v", hdr, path, startOff)
+ }
+ plusPos := bytes.IndexByte(hdr, '+')
+ hdrOff, err := strconv.ParseInt(string(hdr[len(headerPrefix):plusPos]), 16, 64)
+ if err != nil {
+ return fmt.Errorf("malformed header %q (malformed offset) at %v, offset %v", hdr, path, startOff)
+ }
+ if hdrOff != startOff {
+ return fmt.Errorf("malformed header %q with offset %v doesn't match expected offset %v in %v", hdr, hdrOff, startOff, path)
+ }
+ hdrSize, err := strconv.ParseInt(string(hdr[plusPos+1:len(hdr)-1]), 16, 64)
+ if err != nil {
+ return fmt.Errorf("malformed header %q (bad size) at %v, offset %v", hdr, path, startOff)
+ }
+ off += int64(len(hdr))
+
+ buf.Reset()
+ if _, err := io.CopyN(&buf, br, hdrSize); err != nil {
+ return fmt.Errorf("truncated record at offset %v: %v", startOff, err)
+ }
+ off += hdrSize
+
m := new(maintpb.Mutation)
- if err := proto.Unmarshal(rec, m); err != nil {
+ if err := proto.Unmarshal(buf.Bytes(), m); err != nil {
return err
}
select {
@@ -125,18 +175,7 @@
case <-ctx.Done():
return ctx.Err()
}
- err = zr.Reset(br)
- if err == io.EOF {
- break
- }
- if err != nil {
- return err
- }
}
- if err := f.Close(); err != nil {
- return err
- }
- return zr.Close()
})
if err != nil {
log.Printf("error walking directory %s: %v", d.directory, err)
diff --git a/maintner/maintnerd/maintnerd.go b/maintner/maintnerd/maintnerd.go
index bace35d..53b9142 100644
--- a/maintner/maintnerd/maintnerd.go
+++ b/maintner/maintnerd/maintnerd.go
@@ -26,6 +26,7 @@
var (
listen = flag.String("listen", "localhost:6343", "listen address")
syncQuit = flag.Bool("sync-and-quit", false, "sync once and quit; don't run a server")
+ initQuit = flag.Bool("init-and-quit", false, "load the mutation log and quit; don't run a server")
verbose = flag.Bool("verbose", false, "enable verbose debug output")
watchGithub = flag.String("watch-github", "", "Comma-separated list of owner/repo pairs to slurp")
// TODO: specify gerrit auth via gitcookies or similar
@@ -93,7 +94,7 @@
var ln net.Listener
var err error
- if !*syncQuit {
+ if !*syncQuit && !*initQuit {
ln, err = net.Listen("tcp", *listen)
if err != nil {
log.Fatal(err)
@@ -117,6 +118,9 @@
var ms runtime.MemStats
runtime.ReadMemStats(&ms)
log.Printf("Loaded data in %v. Memory: %v MB (%v bytes)", initDur, ms.HeapAlloc>>20, ms.HeapAlloc)
+ if *initQuit {
+ return
+ }
if *syncQuit {
if err := corpus.Sync(ctx); err != nil {