maintner: replace gzip-based on-disk format with uncompressed format

The log file gets 50% larger, but loads in half the time.

Change-Id: I0b163f7b39269a5b39bbd51a80a4982682e94863
Reviewed-on: https://go-review.googlesource.com/38723
Reviewed-by: Kevin Burke <kev@inburke.com>
diff --git a/maintner/gerrit.go b/maintner/gerrit.go
index edf1bbb..1fa99f5 100644
--- a/maintner/gerrit.go
+++ b/maintner/gerrit.go
@@ -139,13 +139,14 @@
 // called with c.mu Locked
 func (c *Corpus) processGerritMutation(gm *maintpb.GerritMutation) {
 	if c.gerrit == nil {
-		// Untracked.
-		return
+		// TODO: option to ignore mutation if user isn't interested.
+		c.initGerrit()
 	}
 	gp, ok := c.gerrit.projects[gm.Project]
 	if !ok {
-		// Untracked.
-		return
+		// TODO: option to ignore mutation if user isn't interested.
+		// For now, always process the record.
+		gp = c.gerrit.getOrCreateProject(gm.Project)
 	}
 	gp.processMutation(gm)
 }
diff --git a/maintner/logger.go b/maintner/logger.go
index c0653c5..6f71f43 100644
--- a/maintner/logger.go
+++ b/maintner/logger.go
@@ -7,14 +7,13 @@
 import (
 	"bufio"
 	"bytes"
-	"compress/gzip"
 	"context"
 	"fmt"
 	"io"
-	"io/ioutil"
 	"log"
 	"os"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -23,6 +22,21 @@
 	"golang.org/x/build/maintner/maintpb"
 )
 
+// The on-DiskMutationLogger format is as follows:
+//
+// The log is a stream of proto3-marshalled *maintpb.Mutation, spread
+// over 1 or more files named maintner-YYYY-MM-DD.mutlog.  Each record
+// begins with the variably-lengthed prefix "REC@XXX+YYY=" where the
+// 0+ XXXX digits are the hex offset on disk (where the 'R' on disk is
+// written) and the 0+ YYY digits are the hex length of the marshalled
+// proto. After the YYY digits there is a '=' byte before the YYY bytes
+// of proto. There is no record footer.
+var (
+	headerPrefix = []byte("REC@")
+	headerSuffix = []byte("=")
+	plus         = []byte("+")
+)
+
 // A MutationLogger logs mutations.
 type MutationLogger interface {
 	Log(*maintpb.Mutation) error
@@ -47,32 +61,40 @@
 // first in lexical order.
 func (d *DiskMutationLogger) filename() string {
 	now := time.Now().UTC()
-	return filepath.Join(d.directory, fmt.Sprintf("maintner-%s.proto.gz", now.Format("2006-01-02")))
+	return filepath.Join(d.directory, fmt.Sprintf("maintner-%s.mutlog", now.Format("2006-01-02")))
 }
 
 // Log will write m to disk. If a mutation file does not exist for the current
 // day, it will be created.
 func (d *DiskMutationLogger) Log(m *maintpb.Mutation) error {
-	d.mu.Lock()
-	defer d.mu.Unlock()
 	data, err := proto.Marshal(m)
 	if err != nil {
 		return err
 	}
-	var buf bytes.Buffer
-	zw := gzip.NewWriter(&buf)
-	if _, err := zw.Write(data); err != nil {
-		return err
-	}
-	if err := zw.Close(); err != nil {
-		return err
-	}
-	// TODO lock the file for writing
+	d.mu.Lock()
+	defer d.mu.Unlock()
+
 	f, err := os.OpenFile(d.filename(), os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
 	if err != nil {
 		return err
 	}
+	off, err := f.Seek(0, io.SeekEnd)
+	if err != nil {
+		return err
+	}
+	st, err := f.Stat()
+	if err != nil {
+		return err
+	}
+	if off != st.Size() {
+		return fmt.Errorf("Size %v != offset %v", st.Size(), off)
+	}
+
+	var buf bytes.Buffer
+	fmt.Fprintf(&buf, "REC@%x+%x=", off, len(data))
+	buf.Write(data)
 	if _, err := f.Write(buf.Bytes()); err != nil {
+		f.Close()
 		return err
 	}
 	return f.Close()
@@ -97,26 +119,54 @@
 			if !strings.HasPrefix(fi.Name(), "maintner-") {
 				return nil
 			}
-			if !strings.HasSuffix(fi.Name(), ".proto.gz") {
+			if !strings.HasSuffix(fi.Name(), ".mutlog") {
 				return nil
 			}
+			var off int64
 			f, err := os.Open(path)
 			if err != nil {
 				return err
 			}
+			defer f.Close()
 			br := bufio.NewReader(f)
-			zr, err := gzip.NewReader(br)
-			if err != nil {
-				return err
-			}
+			var buf bytes.Buffer
 			for {
-				zr.Multistream(false)
-				rec, err := ioutil.ReadAll(zr)
+				startOff := off
+				hdr, err := br.ReadSlice('=')
 				if err != nil {
+					if err == io.EOF && len(hdr) == 0 {
+						return nil
+					}
 					return err
 				}
+				if len(hdr) > 40 {
+					return fmt.Errorf("malformed overlong header %q... at %v, offset %v", hdr[:40], path, startOff)
+				}
+				if !bytes.HasPrefix(hdr, headerPrefix) || !bytes.HasSuffix(hdr, headerSuffix) || bytes.Count(hdr, plus) != 1 {
+					return fmt.Errorf("malformed header %q at %v, offset %v", hdr, path, startOff)
+				}
+				plusPos := bytes.IndexByte(hdr, '+')
+				hdrOff, err := strconv.ParseInt(string(hdr[len(headerPrefix):plusPos]), 16, 64)
+				if err != nil {
+					return fmt.Errorf("malformed header %q (malformed offset) at %v, offset %v", hdr, path, startOff)
+				}
+				if hdrOff != startOff {
+					return fmt.Errorf("malformed header %q with offset %v doesn't match expected offset %v in %v", hdr, hdrOff, startOff, path)
+				}
+				hdrSize, err := strconv.ParseInt(string(hdr[plusPos+1:len(hdr)-1]), 16, 64)
+				if err != nil {
+					return fmt.Errorf("malformed header %q (bad size) at %v, offset %v", hdr, path, startOff)
+				}
+				off += int64(len(hdr))
+
+				buf.Reset()
+				if _, err := io.CopyN(&buf, br, hdrSize); err != nil {
+					return fmt.Errorf("truncated record at offset %v: %v", startOff, err)
+				}
+				off += hdrSize
+
 				m := new(maintpb.Mutation)
-				if err := proto.Unmarshal(rec, m); err != nil {
+				if err := proto.Unmarshal(buf.Bytes(), m); err != nil {
 					return err
 				}
 				select {
@@ -125,18 +175,7 @@
 				case <-ctx.Done():
 					return ctx.Err()
 				}
-				err = zr.Reset(br)
-				if err == io.EOF {
-					break
-				}
-				if err != nil {
-					return err
-				}
 			}
-			if err := f.Close(); err != nil {
-				return err
-			}
-			return zr.Close()
 		})
 		if err != nil {
 			log.Printf("error walking directory %s: %v", d.directory, err)
diff --git a/maintner/maintnerd/maintnerd.go b/maintner/maintnerd/maintnerd.go
index bace35d..53b9142 100644
--- a/maintner/maintnerd/maintnerd.go
+++ b/maintner/maintnerd/maintnerd.go
@@ -26,6 +26,7 @@
 var (
 	listen      = flag.String("listen", "localhost:6343", "listen address")
 	syncQuit    = flag.Bool("sync-and-quit", false, "sync once and quit; don't run a server")
+	initQuit    = flag.Bool("init-and-quit", false, "load the mutation log and quit; don't run a server")
 	verbose     = flag.Bool("verbose", false, "enable verbose debug output")
 	watchGithub = flag.String("watch-github", "", "Comma-separated list of owner/repo pairs to slurp")
 	// TODO: specify gerrit auth via gitcookies or similar
@@ -93,7 +94,7 @@
 
 	var ln net.Listener
 	var err error
-	if !*syncQuit {
+	if !*syncQuit && !*initQuit {
 		ln, err = net.Listen("tcp", *listen)
 		if err != nil {
 			log.Fatal(err)
@@ -117,6 +118,9 @@
 	var ms runtime.MemStats
 	runtime.ReadMemStats(&ms)
 	log.Printf("Loaded data in %v. Memory: %v MB (%v bytes)", initDur, ms.HeapAlloc>>20, ms.HeapAlloc)
+	if *initQuit {
+		return
+	}
 
 	if *syncQuit {
 		if err := corpus.Sync(ctx); err != nil {