maintner: add TailNetworkMutationSource and new cmd/maintwatch command
For debugging maintner in production, satisfying curiosity, and
helping optimize gitmirror (golang/go#35977).
Updates golang/go#35977
Change-Id: Ie06d2b3f36d03eea613e60a7e60c14b2c45c89a4
Reviewed-on: https://go-review.googlesource.com/c/build/+/209962
Run-TryBot: Alexander Rakoczy <alex@golang.org>
Reviewed-by: Alexander Rakoczy <alex@golang.org>
diff --git a/maintner/maintwatch/maintwatch.go b/maintner/maintwatch/maintwatch.go
new file mode 100644
index 0000000..bae729c
--- /dev/null
+++ b/maintner/maintwatch/maintwatch.go
@@ -0,0 +1,41 @@
+// Copyright 2019 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// The maintwatch command tails the maintner mutation log.
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "log"
+ "os"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "golang.org/x/build/maintner"
+)
+
+var server = flag.String("server", "https://maintner.golang.org/logs", "maintner server's /logs URL")
+
+func main() {
+ flag.Parse()
+ for {
+ err := maintner.TailNetworkMutationSource(context.Background(), *server, func(e maintner.MutationStreamEvent) error {
+ if e.Err != nil {
+ log.Printf("# ignoring err: %v\n", e.Err)
+ time.Sleep(5 * time.Second)
+ return nil
+ }
+ if e.Mutation != nil {
+ fmt.Println()
+ tm := proto.TextMarshaler{Compact: false}
+ tm.Marshal(os.Stdout, e.Mutation)
+ }
+ return nil
+ })
+ log.Printf("tail error: %v; restarting", err)
+ time.Sleep(time.Second)
+ }
+}
diff --git a/maintner/netsource.go b/maintner/netsource.go
index 356743f..e95cf32 100644
--- a/maintner/netsource.go
+++ b/maintner/netsource.go
@@ -5,6 +5,7 @@
package maintner
import (
+ "bytes"
"context"
"crypto/sha256"
"encoding/json"
@@ -41,17 +42,105 @@
}
}
+// TailNetworkMutationSource calls fn for all new mutations added to the log on server.
+// It ignores prior events.
+// If the server is restarted and its history diverges,
+// TailNetworkMutationSource may return duplicate events. This therefore does not
+// return a MutationSource, so it can't be accidentally misused for important things.
+// TailNetworkMutationSource returns if fn returns an error, or if ctx expires.
+func TailNetworkMutationSource(ctx context.Context, server string, fn func(MutationStreamEvent) error) error {
+ td, err := ioutil.TempDir("", "maintnerwatch")
+ if err != nil {
+ return err
+ }
+ defer os.RemoveAll(td)
+
+ ns := NewNetworkMutationSource(server, td).(*netMutSource)
+ ns.quiet = true
+ getSegs := func(waitSizeNot int64) ([]LogSegmentJSON, error) {
+ for {
+ segs, err := ns.getServerSegments(ctx, waitSizeNot)
+ if err != nil {
+ if err := ctx.Err(); err != nil {
+ return nil, err
+ }
+ // Sleep a minimum of 5 seconds before trying
+ // again. The user's fn function might sleep
+ // longer or shorter.
+ timer := time.NewTimer(5 * time.Second)
+ err := fn(MutationStreamEvent{Err: err})
+ if err != nil {
+ timer.Stop()
+ return nil, err
+ }
+ <-timer.C
+ continue
+ }
+ return segs, nil
+ }
+ }
+
+ // See how long the log is at start. Then we'll only fetch
+ // things after that.
+ segs, err := getSegs(0)
+ if err != nil {
+ return err
+ }
+ segSize := sumJSONSegSize(segs)
+ lastSeg := segs[len(segs)-1]
+ if _, _, err := ns.syncSeg(ctx, lastSeg); err != nil {
+ return err
+ }
+
+ ticker := time.NewTicker(time.Second) // max re-fetch interval
+ defer ticker.Stop()
+ for {
+ segs, err := getSegs(segSize)
+ if err != nil {
+ return err
+ }
+ segSize = sumJSONSegSize(segs)
+
+ for _, seg := range segs {
+ if seg.Number < lastSeg.Number {
+ continue
+ }
+ var off int64
+ if seg.Number == lastSeg.Number {
+ off = lastSeg.Size
+ }
+ _, newData, err := ns.syncSeg(ctx, seg)
+ if err != nil {
+ return err
+ }
+ if err := reclog.ForeachRecord(bytes.NewReader(newData), off, func(off int64, hdr, rec []byte) error {
+ m := new(maintpb.Mutation)
+ if err := proto.Unmarshal(rec, m); err != nil {
+ return err
+ }
+ return fn(MutationStreamEvent{Mutation: m})
+ }); err != nil {
+ return err
+ }
+ }
+ lastSeg = segs[len(segs)-1]
+
+ <-ticker.C
+ }
+}
+
type netMutSource struct {
server string
base *url.URL
cacheDir string
- last []fileSeg
+ last []fileSeg
+ quiet bool // disable verbose logging
// Hooks for testing. If nil, unused:
testHookGetServerSegments func(context.Context, int64) ([]LogSegmentJSON, error)
testHookWaitAfterServerDupData func(context.Context) error
- testHookSyncSeg func(context.Context, LogSegmentJSON) (fileSeg, error)
+ testHookSyncSeg func(context.Context, LogSegmentJSON) (fileSeg, []byte, error)
testHookFilePrefixSum224 func(file string, n int64) string
}
@@ -155,13 +244,16 @@
}
}
-// waitSizeNot optionally specifies that the request should long-poll waiting for the server
-// to have a sum of log segment sizes different than the value specified.
+// getServerSegments fetches the JSON logs handler (ns.server, usually
+// https://maintner.golang.org/logs) and returns the parsed JSON.
+// It sends the "waitsizenot" URL parameter, which if non-zero
+// specifies that the request should long-poll waiting for the server
+// to have a sum of log segment sizes different than the value
+// specified.
func (ns *netMutSource) getServerSegments(ctx context.Context, waitSizeNot int64) ([]LogSegmentJSON, error) {
if fn := ns.testHookGetServerSegments; fn != nil {
return fn(ctx, waitSizeNot)
}
-
logsURL := ns.server
if waitSizeNot > 0 {
logsURL += fmt.Sprintf("?waitsizenot=%d", waitSizeNot)
@@ -223,7 +315,7 @@
var fileSegs []fileSeg
for _, seg := range segs {
- fileSeg, err := ns.syncSeg(ctx, seg)
+ fileSeg, _, err := ns.syncSeg(ctx, seg)
if err != nil {
return nil, fmt.Errorf("syncing segment %d: %v", seg.Number, err)
}
@@ -302,6 +394,13 @@
return
}
+func sumJSONSegSize(segs []LogSegmentJSON) (sum int64) {
+ for _, seg := range segs {
+ sum += seg.Size
+ }
+ return
+}
+
func (ns *netMutSource) sumCommonPrefixSize(a, b []fileSeg) (sum int64) {
for len(a) > 0 && len(b) > 0 {
sa, sb := a[0], b[0]
@@ -381,7 +480,9 @@
size int64
}
-func (ns *netMutSource) syncSeg(ctx context.Context, seg LogSegmentJSON) (fileSeg, error) {
+// syncSeg syncs the provided log segment, returning its on-disk metadata.
+// The newData result is the new data that was added to the segment in this sync.
+func (ns *netMutSource) syncSeg(ctx context.Context, seg LogSegmentJSON) (_ fileSeg, newData []byte, _ error) {
if fn := ns.testHookSyncSeg; fn != nil {
return fn(ctx, seg)
}
@@ -389,7 +490,7 @@
isFinalSeg := !strings.HasPrefix(seg.URL, "https://storage.googleapis.com/")
relURL, err := url.Parse(seg.URL)
if err != nil {
- return fileSeg{}, err
+ return fileSeg{}, nil, err
}
segURL := ns.base.ResolveReference(relURL)
@@ -398,7 +499,7 @@
// Do we already have it? Files named in their final form with the sha224 are considered
// complete and immutable.
if fi, err := os.Stat(frozen); err == nil && fi.Size() == seg.Size {
- return fileSeg{seg: seg.Number, file: frozen, size: fi.Size(), sha224: seg.SHA224}, nil
+ return fileSeg{seg: seg.Number, file: frozen, size: fi.Size(), sha224: seg.SHA224}, nil, nil
}
// See how much data we already have in the partial growing file.
@@ -410,34 +511,36 @@
if !isFinalSeg {
// This was growing for us, but the server started a new growing segment.
if err := os.Rename(partial, frozen); err != nil {
- return fileSeg{}, err
+ return fileSeg{}, nil, err
}
- return fileSeg{seg: seg.Number, file: frozen, sha224: seg.SHA224, size: seg.Size}, nil
+ return fileSeg{seg: seg.Number, file: frozen, sha224: seg.SHA224, size: seg.Size}, nil, nil
}
- return fileSeg{seg: seg.Number, file: partial, sha224: seg.SHA224, size: seg.Size}, nil
+ return fileSeg{seg: seg.Number, file: partial, sha224: seg.SHA224, size: seg.Size}, nil, nil
}
}
// Otherwise, download.
req, err := http.NewRequest("GET", segURL.String(), nil)
if err != nil {
- return fileSeg{}, err
+ return fileSeg{}, nil, err
}
req = req.WithContext(ctx)
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", len(have), seg.Size-1))
- log.Printf("Downloading %d bytes of %s ...", seg.Size-int64(len(have)), segURL)
+ if !ns.quiet {
+ log.Printf("Downloading %d bytes of %s ...", seg.Size-int64(len(have)), segURL)
+ }
res, err := http.DefaultClient.Do(req)
if err != nil {
- return fileSeg{}, err
+ return fileSeg{}, nil, err
}
if res.StatusCode != 200 && res.StatusCode != 206 {
- return fileSeg{}, fmt.Errorf("%s: %s", segURL.String(), res.Status)
+ return fileSeg{}, nil, fmt.Errorf("%s: %s", segURL.String(), res.Status)
}
slurp, err := ioutil.ReadAll(res.Body)
res.Body.Close()
if err != nil {
- return fileSeg{}, err
+ return fileSeg{}, nil, err
}
var newContents []byte
@@ -449,31 +552,38 @@
got224 := fmt.Sprintf("%x", sha256.Sum224(newContents))
if got224 != seg.SHA224 {
if len(have) == 0 {
- return fileSeg{}, errors.New("corrupt download")
+ return fileSeg{}, nil, errors.New("corrupt download")
}
// Try again
os.Remove(partial)
return ns.syncSeg(ctx, seg)
}
+ // TODO: this is a quadratic amount of write I/O as the 16 MB
+ // segment grows. Switch to appending to the existing file,
+ // then perhaps encoding the desired file size into the
+ // filename suffix (instead of just *.growing.mutlog) so
+ // concurrent readers know where to stop.
tf, err := ioutil.TempFile(ns.cacheDir, "tempseg")
if err != nil {
- return fileSeg{}, err
+ return fileSeg{}, nil, err
}
if _, err := tf.Write(newContents); err != nil {
- return fileSeg{}, err
+ return fileSeg{}, nil, err
}
if err := tf.Close(); err != nil {
- return fileSeg{}, err
+ return fileSeg{}, nil, err
}
finalName := partial
if !isFinalSeg {
finalName = frozen
}
if err := os.Rename(tf.Name(), finalName); err != nil {
- return fileSeg{}, err
+ return fileSeg{}, nil, err
}
- log.Printf("wrote %v", finalName)
- return fileSeg{seg: seg.Number, file: finalName, size: seg.Size, sha224: seg.SHA224}, nil
+ if !ns.quiet {
+ log.Printf("wrote %v", finalName)
+ }
+ return fileSeg{seg: seg.Number, file: finalName, size: seg.Size, sha224: seg.SHA224}, slurp, nil
}
type LogSegmentJSON struct {
diff --git a/maintner/netsource_test.go b/maintner/netsource_test.go
index 2920cc8..20e4b9e 100644
--- a/maintner/netsource_test.go
+++ b/maintner/netsource_test.go
@@ -317,13 +317,13 @@
}
return segs, nil
},
- testHookSyncSeg: func(_ context.Context, seg LogSegmentJSON) (fileSeg, error) {
+ testHookSyncSeg: func(_ context.Context, seg LogSegmentJSON) (fileSeg, []byte, error) {
return fileSeg{
seg: seg.Number,
size: seg.Size,
sha224: seg.SHA224,
file: fmt.Sprintf("/fake/%04d.mutlog", seg.Number),
- }, nil
+ }, nil, nil
},
testHookFilePrefixSum224: func(file string, n int64) string {
if tt.prefixSum != "" {
diff --git a/maintner/reclog/reclog.go b/maintner/reclog/reclog.go
index 4911435..a1fd409 100644
--- a/maintner/reclog/reclog.go
+++ b/maintner/reclog/reclog.go
@@ -64,7 +64,8 @@
// ForeachRecord calls fn for each record in r.
// Calls to fn are made serially.
// If fn returns an error, iteration ends and that error is returned.
-// The startOffset be 0 if reading from the beginning of a file.
+// The startOffset is where in the file r represents. It should be 0
+// if reading from the beginning of a file.
func ForeachRecord(r io.Reader, startOffset int64, fn RecordCallback) error {
off := startOffset
br := bufio.NewReader(r)