blob: 6f71f43a59bfcd546122ac131ccb232fd8d366ce [file] [log] [blame]
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package maintner
import (
// The on-DiskMutationLogger format is as follows:
// The log is a stream of proto3-marshalled *maintpb.Mutation, spread
// over 1 or more files named maintner-YYYY-MM-DD.mutlog. Each record
// begins with the variably-lengthed prefix "REC@XXX+YYY=" where the
// 0+ XXXX digits are the hex offset on disk (where the 'R' on disk is
// written) and the 0+ YYY digits are the hex length of the marshalled
// proto. After the YYY digits there is a '=' byte before the YYY bytes
// of proto. There is no record footer.
var (
headerPrefix = []byte("REC@")
headerSuffix = []byte("=")
plus = []byte("+")
// A MutationLogger logs mutations.
type MutationLogger interface {
Log(*maintpb.Mutation) error
// DiskMutationLogger logs mutations to disk.
type DiskMutationLogger struct {
directory string
mu sync.RWMutex
// NewDiskMutationLogger creates a new DiskMutationLogger, which will create
// mutations in the given directory.
func NewDiskMutationLogger(directory string) *DiskMutationLogger {
if directory == "" {
panic("empty directory")
return &DiskMutationLogger{directory: directory}
// filename returns the filename to write to. The oldest filename must come
// first in lexical order.
func (d *DiskMutationLogger) filename() string {
now := time.Now().UTC()
return filepath.Join(, fmt.Sprintf("maintner-%s.mutlog", now.Format("2006-01-02")))
// Log will write m to disk. If a mutation file does not exist for the current
// day, it will be created.
func (d *DiskMutationLogger) Log(m *maintpb.Mutation) error {
data, err := proto.Marshal(m)
if err != nil {
return err
f, err := os.OpenFile(d.filename(), os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
if err != nil {
return err
off, err := f.Seek(0, io.SeekEnd)
if err != nil {
return err
st, err := f.Stat()
if err != nil {
return err
if off != st.Size() {
return fmt.Errorf("Size %v != offset %v", st.Size(), off)
var buf bytes.Buffer
fmt.Fprintf(&buf, "REC@%x+%x=", off, len(data))
if _, err := f.Write(buf.Bytes()); err != nil {
return err
return f.Close()
func (d *DiskMutationLogger) GetMutations(ctx context.Context) <-chan *maintpb.Mutation {
ch := make(chan *maintpb.Mutation, 50) // buffered: overlap gunzip/unmarshal with loading
if == "" {
panic("empty directory")
go func() {
// Walk guarantees that files are walked in lexical order, which we depend on.
err := filepath.Walk(, func(path string, fi os.FileInfo, err error) error {
if err != nil {
return err
if fi.IsDir() && path != filepath.Clean( {
return filepath.SkipDir
if !strings.HasPrefix(fi.Name(), "maintner-") {
return nil
if !strings.HasSuffix(fi.Name(), ".mutlog") {
return nil
var off int64
f, err := os.Open(path)
if err != nil {
return err
defer f.Close()
br := bufio.NewReader(f)
var buf bytes.Buffer
for {
startOff := off
hdr, err := br.ReadSlice('=')
if err != nil {
if err == io.EOF && len(hdr) == 0 {
return nil
return err
if len(hdr) > 40 {
return fmt.Errorf("malformed overlong header %q... at %v, offset %v", hdr[:40], path, startOff)
if !bytes.HasPrefix(hdr, headerPrefix) || !bytes.HasSuffix(hdr, headerSuffix) || bytes.Count(hdr, plus) != 1 {
return fmt.Errorf("malformed header %q at %v, offset %v", hdr, path, startOff)
plusPos := bytes.IndexByte(hdr, '+')
hdrOff, err := strconv.ParseInt(string(hdr[len(headerPrefix):plusPos]), 16, 64)
if err != nil {
return fmt.Errorf("malformed header %q (malformed offset) at %v, offset %v", hdr, path, startOff)
if hdrOff != startOff {
return fmt.Errorf("malformed header %q with offset %v doesn't match expected offset %v in %v", hdr, hdrOff, startOff, path)
hdrSize, err := strconv.ParseInt(string(hdr[plusPos+1:len(hdr)-1]), 16, 64)
if err != nil {
return fmt.Errorf("malformed header %q (bad size) at %v, offset %v", hdr, path, startOff)
off += int64(len(hdr))
if _, err := io.CopyN(&buf, br, hdrSize); err != nil {
return fmt.Errorf("truncated record at offset %v: %v", startOff, err)
off += hdrSize
m := new(maintpb.Mutation)
if err := proto.Unmarshal(buf.Bytes(), m); err != nil {
return err
select {
case ch <- m:
case <-ctx.Done():
return ctx.Err()
if err != nil {
log.Printf("error walking directory %s: %v",, err)
return ch