blob: a1fd40906df43a4e24b4a5ef6a533bcf29c010ca [file] [log] [blame]
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package reclog contains readers and writers for a record wrapper
// format used by maintner.
package reclog
import (
"bufio"
"bytes"
"fmt"
"io"
"os"
"strconv"
)
// The reclog format is as follows:
//
// The log is a series of binary blobs. Each record begins with the
// variably-lengthed prefix "REC@XXX+YYY=" where the 0+ XXXX digits
// are the hex offset on disk (where the 'R' on disk is written) and
// the 0+ YYY digits are the hex length of the blob. After the YYY
// digits there is a '=' byte before the YYY bytes of blob. There is
// no record footer.
var (
headerPrefix = []byte("REC@")
headerSuffix = []byte("=")
plus = []byte("+")
)
// RecordCallback is the callback signature accepted by
// ForeachFileRecord and ForeachRecord, which read the mutation log
// format used by DiskMutationLogger.
//
// Offset is the offset in the logical of physical file.
// hdr and bytes are only valid until the function returns
// and must not be retained.
//
// hdr is the record header, in the form "REC@c765c9a+1d3=" (REC@ <hex
// offset> + <hex len(rec)> + '=').
//
// rec is the proto3 binary marshalled representation of
// *maintpb.Mutation.
//
// If the callback returns an error, iteration stops.
type RecordCallback func(off int64, hdr, rec []byte) error
// ForeachFileRecord calls fn for each record in the named file.
// Calls to fn are made serially.
// If fn returns an error, iteration ends and that error is returned.
func ForeachFileRecord(path string, fn RecordCallback) error {
f, err := os.Open(path)
if err != nil {
return err
}
defer f.Close()
if err := ForeachRecord(f, 0, fn); err != nil {
return fmt.Errorf("error in %s: %v", path, err)
}
return nil
}
// ForeachRecord calls fn for each record in r.
// Calls to fn are made serially.
// If fn returns an error, iteration ends and that error is returned.
// The startOffset is where in the file r represents. It should be 0
// if reading from the beginning of a file.
func ForeachRecord(r io.Reader, startOffset int64, fn RecordCallback) error {
off := startOffset
br := bufio.NewReader(r)
var buf bytes.Buffer
var hdrBuf bytes.Buffer
for {
startOff := off
hdr, err := br.ReadSlice('=')
if err != nil {
if err == io.EOF && len(hdr) == 0 {
return nil
}
return err
}
if len(hdr) > 40 {
return fmt.Errorf("malformed overlong header %q at offset %v", hdr[:40], startOff)
}
hdrBuf.Reset()
hdrBuf.Write(hdr)
if !bytes.HasPrefix(hdr, headerPrefix) || !bytes.HasSuffix(hdr, headerSuffix) || bytes.Count(hdr, plus) != 1 {
return fmt.Errorf("malformed header %q at offset %v", hdr, startOff)
}
plusPos := bytes.IndexByte(hdr, '+')
hdrOff, err := strconv.ParseInt(string(hdr[len(headerPrefix):plusPos]), 16, 64)
if err != nil {
return fmt.Errorf("malformed header %q (malformed offset) at offset %v", hdr, startOff)
}
if hdrOff != startOff {
return fmt.Errorf("malformed header %q with offset %v doesn't match expected offset %v", hdr, hdrOff, startOff)
}
hdrSize, err := strconv.ParseInt(string(hdr[plusPos+1:len(hdr)-1]), 16, 64)
if err != nil {
return fmt.Errorf("malformed header %q (bad size) at offset %v", hdr, startOff)
}
off += int64(len(hdr))
buf.Reset()
if _, err := io.CopyN(&buf, br, hdrSize); err != nil {
return fmt.Errorf("truncated record at offset %v: %v", startOff, err)
}
off += hdrSize
if err := fn(startOff, hdrBuf.Bytes(), buf.Bytes()); err != nil {
return err
}
}
}
// AppendRecordToFile opens the named filename for append (creating it
// if necessary) and adds the provided data record to the end.
// The caller is responsible for file locking.
func AppendRecordToFile(filename string, data []byte) error {
f, err := os.OpenFile(filename, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0600)
if err != nil {
return err
}
off, err := f.Seek(0, io.SeekEnd)
if err != nil {
return err
}
st, err := f.Stat()
if err != nil {
return err
}
if off != st.Size() {
return fmt.Errorf("Size %v != offset %v", st.Size(), off)
}
if err := WriteRecord(f, off, data); err != nil {
f.Close()
return err
}
return f.Close()
}
// WriteRecord writes the record data to w, formatting the record
// wrapper with the given offset off. It is the caller's
// responsibility to pass the correct offset. Exactly one Write
// call will be made to w.
func WriteRecord(w io.Writer, off int64, data []byte) error {
_, err := fmt.Fprintf(w, "REC@%x+%x=%s", off, len(data), data)
return err
}