maintner: move record reading and formatting code to its own package
It's starting to pollute the package, and seems separable. I also plan
to use it more from elsewhere in upcoming CLs.
Updates golang/go#19866
Change-Id: I7b2add37f74ed42c2f78939924f19d8322179823
Reviewed-on: https://go-review.googlesource.com/40868
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/maintner/logger.go b/maintner/logger.go
index f030c37..8cc4a3f 100644
--- a/maintner/logger.go
+++ b/maintner/logger.go
@@ -5,36 +5,18 @@
package maintner
import (
- "bufio"
- "bytes"
"context"
"fmt"
- "io"
"log"
"os"
"path/filepath"
- "strconv"
"strings"
"sync"
"time"
"github.com/golang/protobuf/proto"
"golang.org/x/build/maintner/maintpb"
-)
-
-// The on-DiskMutationLogger format is as follows:
-//
-// The log is a stream of proto3-marshalled *maintpb.Mutation, spread
-// over 1 or more files named maintner-YYYY-MM-DD.mutlog. Each record
-// begins with the variably-lengthed prefix "REC@XXX+YYY=" where the
-// 0+ XXXX digits are the hex offset on disk (where the 'R' on disk is
-// written) and the 0+ YYY digits are the hex length of the marshalled
-// proto. After the YYY digits there is a '=' byte before the YYY bytes
-// of proto. There is no record footer.
-var (
- headerPrefix = []byte("REC@")
- headerSuffix = []byte("=")
- plus = []byte("+")
+ "golang.org/x/build/maintner/reclog"
)
// A MutationLogger logs mutations.
@@ -73,113 +55,55 @@
}
d.mu.Lock()
defer d.mu.Unlock()
-
- f, err := os.OpenFile(d.filename(), os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
- if err != nil {
- return err
- }
- off, err := f.Seek(0, io.SeekEnd)
- if err != nil {
- return err
- }
- st, err := f.Stat()
- if err != nil {
- return err
- }
- if off != st.Size() {
- return fmt.Errorf("Size %v != offset %v", st.Size(), off)
- }
-
- var buf bytes.Buffer
- fmt.Fprintf(&buf, "REC@%x+%x=", off, len(data))
- buf.Write(data)
- if _, err := f.Write(buf.Bytes()); err != nil {
- f.Close()
- return err
- }
- return f.Close()
+ return reclog.AppendRecordToFile(d.filename(), data)
}
-func (d *DiskMutationLogger) GetMutations(ctx context.Context) <-chan *maintpb.Mutation {
+func (d *DiskMutationLogger) ForeachFile(fn func(fullPath string, fi os.FileInfo) error) error {
d.mu.RLock()
defer d.mu.RUnlock()
- ch := make(chan *maintpb.Mutation, 50) // buffered: overlap gunzip/unmarshal with loading
if d.directory == "" {
panic("empty directory")
}
+ // Walk guarantees that files are walked in lexical order, which we depend on.
+ return filepath.Walk(d.directory, func(path string, fi os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
+ if fi.IsDir() && path != filepath.Clean(d.directory) {
+ return filepath.SkipDir
+ }
+ if !strings.HasPrefix(fi.Name(), "maintner-") {
+ return nil
+ }
+ if !strings.HasSuffix(fi.Name(), ".mutlog") {
+ return nil
+ }
+ return fn(path, fi)
+ })
+}
+
+func (d *DiskMutationLogger) GetMutations(ctx context.Context) <-chan *maintpb.Mutation {
+ ch := make(chan *maintpb.Mutation, 50) // buffered: overlap gunzip/unmarshal with loading
go func() {
- // Walk guarantees that files are walked in lexical order, which we depend on.
- err := filepath.Walk(d.directory, func(path string, fi os.FileInfo, err error) error {
- if err != nil {
- return err
- }
- if fi.IsDir() && path != filepath.Clean(d.directory) {
- return filepath.SkipDir
- }
- if !strings.HasPrefix(fi.Name(), "maintner-") {
- return nil
- }
- if !strings.HasSuffix(fi.Name(), ".mutlog") {
- return nil
- }
- var off int64
- f, err := os.Open(path)
- if err != nil {
- return err
- }
- defer f.Close()
- br := bufio.NewReader(f)
- var buf bytes.Buffer
- for {
- startOff := off
- hdr, err := br.ReadSlice('=')
- if err != nil {
- if err == io.EOF && len(hdr) == 0 {
- return nil
- }
- return err
- }
- if len(hdr) > 40 {
- return fmt.Errorf("malformed overlong header %q... at %v, offset %v", hdr[:40], path, startOff)
- }
- if !bytes.HasPrefix(hdr, headerPrefix) || !bytes.HasSuffix(hdr, headerSuffix) || bytes.Count(hdr, plus) != 1 {
- return fmt.Errorf("malformed header %q at %v, offset %v", hdr, path, startOff)
- }
- plusPos := bytes.IndexByte(hdr, '+')
- hdrOff, err := strconv.ParseInt(string(hdr[len(headerPrefix):plusPos]), 16, 64)
- if err != nil {
- return fmt.Errorf("malformed header %q (malformed offset) at %v, offset %v", hdr, path, startOff)
- }
- if hdrOff != startOff {
- return fmt.Errorf("malformed header %q with offset %v doesn't match expected offset %v in %v", hdr, hdrOff, startOff, path)
- }
- hdrSize, err := strconv.ParseInt(string(hdr[plusPos+1:len(hdr)-1]), 16, 64)
- if err != nil {
- return fmt.Errorf("malformed header %q (bad size) at %v, offset %v", hdr, path, startOff)
- }
- off += int64(len(hdr))
-
- buf.Reset()
- if _, err := io.CopyN(&buf, br, hdrSize); err != nil {
- return fmt.Errorf("truncated record at offset %v: %v", startOff, err)
- }
- off += hdrSize
-
+ defer close(ch)
+ err := d.ForeachFile(func(fullPath string, fi os.FileInfo) error {
+ log.Printf("File %s has size %v", fullPath, fi.Size())
+ return reclog.ForeachFileRecord(fullPath, func(off int64, hdr, rec []byte) error {
m := new(maintpb.Mutation)
- if err := proto.Unmarshal(buf.Bytes(), m); err != nil {
+ if err := proto.Unmarshal(rec, m); err != nil {
return err
}
select {
case ch <- m:
+ return nil
case <-ctx.Done():
return ctx.Err()
}
- }
+ })
})
if err != nil {
- log.Printf("error walking directory %s: %v", d.directory, err)
+ panic(err)
}
- close(ch)
}()
return ch
}
diff --git a/maintner/reclog/reclog.go b/maintner/reclog/reclog.go
new file mode 100644
index 0000000..1a45e6e
--- /dev/null
+++ b/maintner/reclog/reclog.go
@@ -0,0 +1,147 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package reclog contains readers and writers for a record wrapper
+// format used by maintner.
+package reclog
+
+import (
+ "bufio"
+ "bytes"
+ "fmt"
+ "io"
+ "os"
+ "strconv"
+)
+
+// The reclog format is as follows:
+//
+// The log is a series of binary blobs. Each record begins with the
+// variably-lengthed prefix "REC@XXX+YYY=" where the 0+ XXXX digits
+// are the hex offset on disk (where the 'R' on disk is written) and
+// the 0+ YYY digits are the hex length of the blob. After the YYY
+// digits there is a '=' byte before the YYY bytes of blob. There is
+// no record footer.
+var (
+ headerPrefix = []byte("REC@")
+ headerSuffix = []byte("=")
+ plus = []byte("+")
+)
+
+// RecordCallback is the callback signature accepted by
+// ForeachFileRecord and ForeachRecord, which read the mutation log
+// format used by DiskMutationLogger.
+//
+// Offset is the offset in the logical of physical file.
+// hdr and bytes are only valid until the function returns
+// and must not be retained.
+//
+// hdr is the record header, in the form "REC@c765c9a+1d3=" (REC@ <hex
+// offset> + <hex len(rec)> + '=').
+//
+// rec is the proto3 binary marshalled representation of
+// *maintpb.Mutation.
+//
+// If the callback returns an error, iteration stops.
+type RecordCallback func(off int64, hdr, rec []byte) error
+
+// ForeachFileRecord calls fn for each record in the named file.
+// Calls to fn are made serially.
+// If fn returns an error, iteration ends and that error is returned.
+func ForeachFileRecord(path string, fn RecordCallback) error {
+ f, err := os.Open(path)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ if err := ForeachRecord(f, fn); err != nil {
+ return fmt.Errorf("error in %s: %v", path, err)
+ }
+ return nil
+}
+
+// ForeachRecord calls fn for each record in r.
+// Calls to fn are made serially.
+// If fn returns an error, iteration ends and that error is returned.
+func ForeachRecord(r io.Reader, fn RecordCallback) error {
+ var off int64
+ br := bufio.NewReader(r)
+ var buf bytes.Buffer
+ var hdrBuf bytes.Buffer
+ for {
+ startOff := off
+ hdr, err := br.ReadSlice('=')
+ if err != nil {
+ if err == io.EOF && len(hdr) == 0 {
+ return nil
+ }
+ return err
+ }
+ if len(hdr) > 40 {
+ return fmt.Errorf("malformed overlong header %q at offset %v", hdr[:40], startOff)
+ }
+ hdrBuf.Reset()
+ hdrBuf.Write(hdr)
+ if !bytes.HasPrefix(hdr, headerPrefix) || !bytes.HasSuffix(hdr, headerSuffix) || bytes.Count(hdr, plus) != 1 {
+ return fmt.Errorf("malformed header %q at offset %v", hdr, startOff)
+ }
+ plusPos := bytes.IndexByte(hdr, '+')
+ hdrOff, err := strconv.ParseInt(string(hdr[len(headerPrefix):plusPos]), 16, 64)
+ if err != nil {
+ return fmt.Errorf("malformed header %q (malformed offset) at offset %v", hdr, startOff)
+ }
+ if hdrOff != startOff {
+ return fmt.Errorf("malformed header %q with offset %v doesn't match expected offset %v", hdr, hdrOff, startOff)
+ }
+ hdrSize, err := strconv.ParseInt(string(hdr[plusPos+1:len(hdr)-1]), 16, 64)
+ if err != nil {
+ return fmt.Errorf("malformed header %q (bad size) at offset %v", hdr, startOff)
+ }
+ off += int64(len(hdr))
+
+ buf.Reset()
+ if _, err := io.CopyN(&buf, br, hdrSize); err != nil {
+ return fmt.Errorf("truncated record at offset %v: %v", startOff, err)
+ }
+ off += hdrSize
+ if err := fn(startOff, hdrBuf.Bytes(), buf.Bytes()); err != nil {
+ return err
+ }
+ }
+}
+
+// AppendRecordToFile opens the named filename for append (creating it
+// if necessary) and adds the provided data record to the end.
+// The caller is responsible for file locking.
+func AppendRecordToFile(filename string, data []byte) error {
+ f, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
+ if err != nil {
+ return err
+ }
+ off, err := f.Seek(0, io.SeekEnd)
+ if err != nil {
+ return err
+ }
+ st, err := f.Stat()
+ if err != nil {
+ return err
+ }
+ if off != st.Size() {
+ return fmt.Errorf("Size %v != offset %v", st.Size(), off)
+ }
+ if err := WriteRecord(f, off, data); err != nil {
+ f.Close()
+ return err
+ }
+ return f.Close()
+}
+
+// WriteRecord writes the record data to w, formatting the record
+// wrapper with the given offset off. It is the caller's
+// responsibility to pass the correct offset. Exactly one Write
+// call will be made to w.
+func WriteRecord(w io.Writer, off int64, data []byte) error {
+ _, err := fmt.Fprintf(w, "REC@%x+%x=%s", off, len(data), data)
+ return err
+}