maintner: move record reading and formatting code to its own package

It's starting to pollute the package, and seems separable. I also plan
to use it more from elsewhere in upcoming CLs.

Updates golang/go#19866

Change-Id: I7b2add37f74ed42c2f78939924f19d8322179823
Reviewed-on: https://go-review.googlesource.com/40868
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/maintner/logger.go b/maintner/logger.go
index f030c37..8cc4a3f 100644
--- a/maintner/logger.go
+++ b/maintner/logger.go
@@ -5,36 +5,18 @@
 package maintner
 
 import (
-	"bufio"
-	"bytes"
 	"context"
 	"fmt"
-	"io"
 	"log"
 	"os"
 	"path/filepath"
-	"strconv"
 	"strings"
 	"sync"
 	"time"
 
 	"github.com/golang/protobuf/proto"
 	"golang.org/x/build/maintner/maintpb"
-)
-
-// The on-DiskMutationLogger format is as follows:
-//
-// The log is a stream of proto3-marshalled *maintpb.Mutation, spread
-// over 1 or more files named maintner-YYYY-MM-DD.mutlog.  Each record
-// begins with the variably-lengthed prefix "REC@XXX+YYY=" where the
-// 0+ XXXX digits are the hex offset on disk (where the 'R' on disk is
-// written) and the 0+ YYY digits are the hex length of the marshalled
-// proto. After the YYY digits there is a '=' byte before the YYY bytes
-// of proto. There is no record footer.
-var (
-	headerPrefix = []byte("REC@")
-	headerSuffix = []byte("=")
-	plus         = []byte("+")
+	"golang.org/x/build/maintner/reclog"
 )
 
 // A MutationLogger logs mutations.
@@ -73,113 +55,55 @@
 	}
 	d.mu.Lock()
 	defer d.mu.Unlock()
-
-	f, err := os.OpenFile(d.filename(), os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
-	if err != nil {
-		return err
-	}
-	off, err := f.Seek(0, io.SeekEnd)
-	if err != nil {
-		return err
-	}
-	st, err := f.Stat()
-	if err != nil {
-		return err
-	}
-	if off != st.Size() {
-		return fmt.Errorf("Size %v != offset %v", st.Size(), off)
-	}
-
-	var buf bytes.Buffer
-	fmt.Fprintf(&buf, "REC@%x+%x=", off, len(data))
-	buf.Write(data)
-	if _, err := f.Write(buf.Bytes()); err != nil {
-		f.Close()
-		return err
-	}
-	return f.Close()
+	return reclog.AppendRecordToFile(d.filename(), data)
 }
 
-func (d *DiskMutationLogger) GetMutations(ctx context.Context) <-chan *maintpb.Mutation {
+func (d *DiskMutationLogger) ForeachFile(fn func(fullPath string, fi os.FileInfo) error) error {
 	d.mu.RLock()
 	defer d.mu.RUnlock()
-	ch := make(chan *maintpb.Mutation, 50) // buffered: overlap gunzip/unmarshal with loading
 	if d.directory == "" {
 		panic("empty directory")
 	}
+	// Walk guarantees that files are walked in lexical order, which we depend on.
+	return filepath.Walk(d.directory, func(path string, fi os.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		if fi.IsDir() && path != filepath.Clean(d.directory) {
+			return filepath.SkipDir
+		}
+		if !strings.HasPrefix(fi.Name(), "maintner-") {
+			return nil
+		}
+		if !strings.HasSuffix(fi.Name(), ".mutlog") {
+			return nil
+		}
+		return fn(path, fi)
+	})
+}
+
+func (d *DiskMutationLogger) GetMutations(ctx context.Context) <-chan *maintpb.Mutation {
+	ch := make(chan *maintpb.Mutation, 50) // buffered: overlap gunzip/unmarshal with loading
 	go func() {
-		// Walk guarantees that files are walked in lexical order, which we depend on.
-		err := filepath.Walk(d.directory, func(path string, fi os.FileInfo, err error) error {
-			if err != nil {
-				return err
-			}
-			if fi.IsDir() && path != filepath.Clean(d.directory) {
-				return filepath.SkipDir
-			}
-			if !strings.HasPrefix(fi.Name(), "maintner-") {
-				return nil
-			}
-			if !strings.HasSuffix(fi.Name(), ".mutlog") {
-				return nil
-			}
-			var off int64
-			f, err := os.Open(path)
-			if err != nil {
-				return err
-			}
-			defer f.Close()
-			br := bufio.NewReader(f)
-			var buf bytes.Buffer
-			for {
-				startOff := off
-				hdr, err := br.ReadSlice('=')
-				if err != nil {
-					if err == io.EOF && len(hdr) == 0 {
-						return nil
-					}
-					return err
-				}
-				if len(hdr) > 40 {
-					return fmt.Errorf("malformed overlong header %q... at %v, offset %v", hdr[:40], path, startOff)
-				}
-				if !bytes.HasPrefix(hdr, headerPrefix) || !bytes.HasSuffix(hdr, headerSuffix) || bytes.Count(hdr, plus) != 1 {
-					return fmt.Errorf("malformed header %q at %v, offset %v", hdr, path, startOff)
-				}
-				plusPos := bytes.IndexByte(hdr, '+')
-				hdrOff, err := strconv.ParseInt(string(hdr[len(headerPrefix):plusPos]), 16, 64)
-				if err != nil {
-					return fmt.Errorf("malformed header %q (malformed offset) at %v, offset %v", hdr, path, startOff)
-				}
-				if hdrOff != startOff {
-					return fmt.Errorf("malformed header %q with offset %v doesn't match expected offset %v in %v", hdr, hdrOff, startOff, path)
-				}
-				hdrSize, err := strconv.ParseInt(string(hdr[plusPos+1:len(hdr)-1]), 16, 64)
-				if err != nil {
-					return fmt.Errorf("malformed header %q (bad size) at %v, offset %v", hdr, path, startOff)
-				}
-				off += int64(len(hdr))
-
-				buf.Reset()
-				if _, err := io.CopyN(&buf, br, hdrSize); err != nil {
-					return fmt.Errorf("truncated record at offset %v: %v", startOff, err)
-				}
-				off += hdrSize
-
+		defer close(ch)
+		err := d.ForeachFile(func(fullPath string, fi os.FileInfo) error {
+			log.Printf("File %s has size %v", fullPath, fi.Size())
+			return reclog.ForeachFileRecord(fullPath, func(off int64, hdr, rec []byte) error {
 				m := new(maintpb.Mutation)
-				if err := proto.Unmarshal(buf.Bytes(), m); err != nil {
+				if err := proto.Unmarshal(rec, m); err != nil {
 					return err
 				}
 				select {
 				case ch <- m:
+					return nil
 				case <-ctx.Done():
 					return ctx.Err()
 				}
-			}
+			})
 		})
 		if err != nil {
-			log.Printf("error walking directory %s: %v", d.directory, err)
+			panic(err)
 		}
-		close(ch)
 	}()
 	return ch
 }
diff --git a/maintner/reclog/reclog.go b/maintner/reclog/reclog.go
new file mode 100644
index 0000000..1a45e6e
--- /dev/null
+++ b/maintner/reclog/reclog.go
@@ -0,0 +1,147 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package reclog contains readers and writers for a record wrapper
+// format used by maintner.
+package reclog
+
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"io"
+	"os"
+	"strconv"
+)
+
+// The reclog format is as follows:
+//
+// The log is a series of binary blobs. Each record begins with the
+// variably-lengthed prefix "REC@XXX+YYY=" where the 0+ XXXX digits
+// are the hex offset on disk (where the 'R' on disk is written) and
+// the 0+ YYY digits are the hex length of the blob. After the YYY
+// digits there is a '=' byte before the YYY bytes of blob. There is
+// no record footer.
+var (
+	headerPrefix = []byte("REC@")
+	headerSuffix = []byte("=")
+	plus         = []byte("+")
+)
+
+// RecordCallback is the callback signature accepted by
+// ForeachFileRecord and ForeachRecord, which read the mutation log
+// format used by DiskMutationLogger.
+//
+// Offset is the offset in the logical of physical file.
+// hdr and bytes are only valid until the function returns
+// and must not be retained.
+//
+// hdr is the record header, in the form "REC@c765c9a+1d3=" (REC@ <hex
+// offset> + <hex len(rec)> + '=').
+//
+// rec is the proto3 binary marshalled representation of
+// *maintpb.Mutation.
+//
+// If the callback returns an error, iteration stops.
+type RecordCallback func(off int64, hdr, rec []byte) error
+
+// ForeachFileRecord calls fn for each record in the named file.
+// Calls to fn are made serially.
+// If fn returns an error, iteration ends and that error is returned.
+func ForeachFileRecord(path string, fn RecordCallback) error {
+	f, err := os.Open(path)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+	if err := ForeachRecord(f, fn); err != nil {
+		return fmt.Errorf("error in %s: %v", path, err)
+	}
+	return nil
+}
+
+// ForeachRecord calls fn for each record in r.
+// Calls to fn are made serially.
+// If fn returns an error, iteration ends and that error is returned.
+func ForeachRecord(r io.Reader, fn RecordCallback) error {
+	var off int64
+	br := bufio.NewReader(r)
+	var buf bytes.Buffer
+	var hdrBuf bytes.Buffer
+	for {
+		startOff := off
+		hdr, err := br.ReadSlice('=')
+		if err != nil {
+			if err == io.EOF && len(hdr) == 0 {
+				return nil
+			}
+			return err
+		}
+		if len(hdr) > 40 {
+			return fmt.Errorf("malformed overlong header %q at offset %v", hdr[:40], startOff)
+		}
+		hdrBuf.Reset()
+		hdrBuf.Write(hdr)
+		if !bytes.HasPrefix(hdr, headerPrefix) || !bytes.HasSuffix(hdr, headerSuffix) || bytes.Count(hdr, plus) != 1 {
+			return fmt.Errorf("malformed header %q at offset %v", hdr, startOff)
+		}
+		plusPos := bytes.IndexByte(hdr, '+')
+		hdrOff, err := strconv.ParseInt(string(hdr[len(headerPrefix):plusPos]), 16, 64)
+		if err != nil {
+			return fmt.Errorf("malformed header %q (malformed offset) at offset %v", hdr, startOff)
+		}
+		if hdrOff != startOff {
+			return fmt.Errorf("malformed header %q with offset %v doesn't match expected offset %v", hdr, hdrOff, startOff)
+		}
+		hdrSize, err := strconv.ParseInt(string(hdr[plusPos+1:len(hdr)-1]), 16, 64)
+		if err != nil {
+			return fmt.Errorf("malformed header %q (bad size) at offset %v", hdr, startOff)
+		}
+		off += int64(len(hdr))
+
+		buf.Reset()
+		if _, err := io.CopyN(&buf, br, hdrSize); err != nil {
+			return fmt.Errorf("truncated record at offset %v: %v", startOff, err)
+		}
+		off += hdrSize
+		if err := fn(startOff, hdrBuf.Bytes(), buf.Bytes()); err != nil {
+			return err
+		}
+	}
+}
+
+// AppendRecordToFile opens the named filename for append (creating it
+// if necessary) and adds the provided data record to the end.
+// The caller is responsible for file locking.
+func AppendRecordToFile(filename string, data []byte) error {
+	f, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
+	if err != nil {
+		return err
+	}
+	off, err := f.Seek(0, io.SeekEnd)
+	if err != nil {
+		return err
+	}
+	st, err := f.Stat()
+	if err != nil {
+		return err
+	}
+	if off != st.Size() {
+		return fmt.Errorf("Size %v != offset %v", st.Size(), off)
+	}
+	if err := WriteRecord(f, off, data); err != nil {
+		f.Close()
+		return err
+	}
+	return f.Close()
+}
+
+// WriteRecord writes the record data to w, formatting the record
+// wrapper with the given offset off. It is the caller's
+// responsibility to pass the correct offset. Exactly one Write
+// call will be made to w.
+func WriteRecord(w io.Writer, off int64, data []byte) error {
+	_, err := fmt.Fprintf(w, "REC@%x+%x=%s", off, len(data), data)
+	return err
+}