maintner: add TailNetworkMutationSource and new cmd/maintwatch command

For debugging maintner in production, curiosity, and to optimize
gitmirror (golang/go#35977).

Updates golang/go#35977

Change-Id: Ie06d2b3f36d03eea613e60a7e60c14b2c45c89a4
Reviewed-on: https://go-review.googlesource.com/c/build/+/209962
Run-TryBot: Alexander Rakoczy <alex@golang.org>
Reviewed-by: Alexander Rakoczy <alex@golang.org>
diff --git a/maintner/maintwatch/maintwatch.go b/maintner/maintwatch/maintwatch.go
new file mode 100644
index 0000000..bae729c
--- /dev/null
+++ b/maintner/maintwatch/maintwatch.go
@@ -0,0 +1,41 @@
+// Copyright 2019 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// The maintwatch commands tails the maintner mutation log.
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"log"
+	"os"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+	"golang.org/x/build/maintner"
+)
+
+var server = flag.String("server", "https://maintner.golang.org/logs", "maintner server's /logs URL")
+
+func main() {
+	flag.Parse()
+	for {
+		err := maintner.TailNetworkMutationSource(context.Background(), *server, func(e maintner.MutationStreamEvent) error {
+			if e.Err != nil {
+				log.Printf("# ignoring err: %v\n", e.Err)
+				time.Sleep(5 * time.Second)
+				return nil
+			}
+			if e.Mutation != nil {
+				fmt.Println()
+				tm := proto.TextMarshaler{Compact: false}
+				tm.Marshal(os.Stdout, e.Mutation)
+			}
+			return nil
+		})
+		log.Printf("tail error: %v; restarting", err)
+		time.Sleep(time.Second)
+	}
+}
diff --git a/maintner/netsource.go b/maintner/netsource.go
index 356743f..e95cf32 100644
--- a/maintner/netsource.go
+++ b/maintner/netsource.go
@@ -5,6 +5,7 @@
 package maintner
 
 import (
+	"bytes"
 	"context"
 	"crypto/sha256"
 	"encoding/json"
@@ -41,17 +42,105 @@
 	}
 }
 
+// TailNetworkMutationSource calls fn for all new mutations added to the log on server.
+// It ignores prior events.
+// If the server is restarted and its history diverges,
+// TailNetworkMutationSource may return duplicate events. This therefore does not
+// return a MutationSource, so it can't be accidentally misused for important things.
+// TailNetworkMutationSource returns if fn returns an error, or if ctx expires.
+func TailNetworkMutationSource(ctx context.Context, server string, fn func(MutationStreamEvent) error) error {
+	td, err := ioutil.TempDir("", "maintnerwatch")
+	if err != nil {
+		return err
+	}
+	defer os.RemoveAll(td)
+
+	ns := NewNetworkMutationSource(server, td).(*netMutSource)
+	ns.quiet = true
+	getSegs := func(waitSizeNot int64) ([]LogSegmentJSON, error) {
+		for {
+			segs, err := ns.getServerSegments(ctx, waitSizeNot)
+			if err != nil {
+				if err := ctx.Err(); err != nil {
+					return nil, err
+				}
+				// Sleep a minimum of 5 seconds before trying
+				// again. The user's fn function might sleep
+				// longer or shorter.
+				timer := time.NewTimer(5 * time.Second)
+				err := fn(MutationStreamEvent{Err: err})
+				if err != nil {
+					timer.Stop()
+					return nil, err
+				}
+				<-timer.C
+				continue
+			}
+			return segs, nil
+		}
+	}
+
+	// See how long the log is at start. Then we'll only fetch
+	// things after that.
+	segs, err := getSegs(0)
+	if err != nil {
+		return err
+	}
+	segSize := sumJSONSegSize(segs)
+	lastSeg := segs[len(segs)-1]
+	if _, _, err := ns.syncSeg(ctx, lastSeg); err != nil {
+		return err
+	}
+
+	ticker := time.NewTicker(time.Second) // max re-fetch interval
+	defer ticker.Stop()
+	for {
+		segs, err := getSegs(segSize)
+		if err != nil {
+			return err
+		}
+		segSize = sumJSONSegSize(segs)
+
+		for _, seg := range segs {
+			if seg.Number < lastSeg.Number {
+				continue
+			}
+			var off int64
+			if seg.Number == lastSeg.Number {
+				off = lastSeg.Size
+			}
+			_, newData, err := ns.syncSeg(ctx, seg)
+			if err != nil {
+				return err
+			}
+			if err := reclog.ForeachRecord(bytes.NewReader(newData), off, func(off int64, hdr, rec []byte) error {
+				m := new(maintpb.Mutation)
+				if err := proto.Unmarshal(rec, m); err != nil {
+					return err
+				}
+				return fn(MutationStreamEvent{Mutation: m})
+			}); err != nil {
+				return err
+			}
+		}
+		lastSeg = segs[len(segs)-1]
+
+		<-ticker.C
+	}
+}
+
 type netMutSource struct {
 	server   string
 	base     *url.URL
 	cacheDir string
 
-	last []fileSeg
+	last  []fileSeg
+	quiet bool // disable verbose logging
 
 	// Hooks for testing. If nil, unused:
 	testHookGetServerSegments      func(context.Context, int64) ([]LogSegmentJSON, error)
 	testHookWaitAfterServerDupData func(context.Context) error
-	testHookSyncSeg                func(context.Context, LogSegmentJSON) (fileSeg, error)
+	testHookSyncSeg                func(context.Context, LogSegmentJSON) (fileSeg, []byte, error)
 	testHookFilePrefixSum224       func(file string, n int64) string
 }
 
@@ -155,13 +244,16 @@
 	}
 }
 
-// waitSizeNot optionally specifies that the request should long-poll waiting for the server
-// to have a sum of log segment sizes different than the value specified.
+// getServerSegments fetches the JSON logs handler (ns.server, usually
+// https://maintner.golang.org/logs) and returns the parsed the JSON.
+// It sends the "waitsizenot" URL parameter, which if non-zero
+// specifies that the request should long-poll waiting for the server
+// to have a sum of log segment sizes different than the value
+// specified.
 func (ns *netMutSource) getServerSegments(ctx context.Context, waitSizeNot int64) ([]LogSegmentJSON, error) {
 	if fn := ns.testHookGetServerSegments; fn != nil {
 		return fn(ctx, waitSizeNot)
 	}
-
 	logsURL := ns.server
 	if waitSizeNot > 0 {
 		logsURL += fmt.Sprintf("?waitsizenot=%d", waitSizeNot)
@@ -223,7 +315,7 @@
 
 		var fileSegs []fileSeg
 		for _, seg := range segs {
-			fileSeg, err := ns.syncSeg(ctx, seg)
+			fileSeg, _, err := ns.syncSeg(ctx, seg)
 			if err != nil {
 				return nil, fmt.Errorf("syncing segment %d: %v", seg.Number, err)
 			}
@@ -302,6 +394,13 @@
 	return
 }
 
+func sumJSONSegSize(segs []LogSegmentJSON) (sum int64) {
+	for _, seg := range segs {
+		sum += seg.Size
+	}
+	return
+}
+
 func (ns *netMutSource) sumCommonPrefixSize(a, b []fileSeg) (sum int64) {
 	for len(a) > 0 && len(b) > 0 {
 		sa, sb := a[0], b[0]
@@ -381,7 +480,9 @@
 	size   int64
 }
 
-func (ns *netMutSource) syncSeg(ctx context.Context, seg LogSegmentJSON) (fileSeg, error) {
+// syncSeg syncs the provided log segment, returning its on-disk metadata.
+// The newData result is the new data that was added to the segment in this sync.
+func (ns *netMutSource) syncSeg(ctx context.Context, seg LogSegmentJSON) (_ fileSeg, newData []byte, _ error) {
 	if fn := ns.testHookSyncSeg; fn != nil {
 		return fn(ctx, seg)
 	}
@@ -389,7 +490,7 @@
 	isFinalSeg := !strings.HasPrefix(seg.URL, "https://storage.googleapis.com/")
 	relURL, err := url.Parse(seg.URL)
 	if err != nil {
-		return fileSeg{}, err
+		return fileSeg{}, nil, err
 	}
 	segURL := ns.base.ResolveReference(relURL)
 
@@ -398,7 +499,7 @@
 	// Do we already have it? Files named in their final form with the sha224 are considered
 	// complete and immutable.
 	if fi, err := os.Stat(frozen); err == nil && fi.Size() == seg.Size {
-		return fileSeg{seg: seg.Number, file: frozen, size: fi.Size(), sha224: seg.SHA224}, nil
+		return fileSeg{seg: seg.Number, file: frozen, size: fi.Size(), sha224: seg.SHA224}, nil, nil
 	}
 
 	// See how much data we already have in the partial growing file.
@@ -410,34 +511,36 @@
 			if !isFinalSeg {
 				// This was growing for us, but the server started a new growing segment.
 				if err := os.Rename(partial, frozen); err != nil {
-					return fileSeg{}, err
+					return fileSeg{}, nil, err
 				}
-				return fileSeg{seg: seg.Number, file: frozen, sha224: seg.SHA224, size: seg.Size}, nil
+				return fileSeg{seg: seg.Number, file: frozen, sha224: seg.SHA224, size: seg.Size}, nil, nil
 			}
-			return fileSeg{seg: seg.Number, file: partial, sha224: seg.SHA224, size: seg.Size}, nil
+			return fileSeg{seg: seg.Number, file: partial, sha224: seg.SHA224, size: seg.Size}, nil, nil
 		}
 	}
 
 	// Otherwise, download.
 	req, err := http.NewRequest("GET", segURL.String(), nil)
 	if err != nil {
-		return fileSeg{}, err
+		return fileSeg{}, nil, err
 	}
 	req = req.WithContext(ctx)
 	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", len(have), seg.Size-1))
 
-	log.Printf("Downloading %d bytes of %s ...", seg.Size-int64(len(have)), segURL)
+	if !ns.quiet {
+		log.Printf("Downloading %d bytes of %s ...", seg.Size-int64(len(have)), segURL)
+	}
 	res, err := http.DefaultClient.Do(req)
 	if err != nil {
-		return fileSeg{}, err
+		return fileSeg{}, nil, err
 	}
 	if res.StatusCode != 200 && res.StatusCode != 206 {
-		return fileSeg{}, fmt.Errorf("%s: %s", segURL.String(), res.Status)
+		return fileSeg{}, nil, fmt.Errorf("%s: %s", segURL.String(), res.Status)
 	}
 	slurp, err := ioutil.ReadAll(res.Body)
 	res.Body.Close()
 	if err != nil {
-		return fileSeg{}, err
+		return fileSeg{}, nil, err
 	}
 
 	var newContents []byte
@@ -449,31 +552,38 @@
 	got224 := fmt.Sprintf("%x", sha256.Sum224(newContents))
 	if got224 != seg.SHA224 {
 		if len(have) == 0 {
-			return fileSeg{}, errors.New("corrupt download")
+			return fileSeg{}, nil, errors.New("corrupt download")
 		}
 		// Try again
 		os.Remove(partial)
 		return ns.syncSeg(ctx, seg)
 	}
+	// TODO: this is a quadratic amount of write I/O as the 16 MB
+	// segment grows. Switch to appending to the existing file,
+	// then perhaps encoding the desired file size into the
+	// filename suffix (instead of just *.growing.mutlog) so
+	// concurrent readers know where to stop.
 	tf, err := ioutil.TempFile(ns.cacheDir, "tempseg")
 	if err != nil {
-		return fileSeg{}, err
+		return fileSeg{}, nil, err
 	}
 	if _, err := tf.Write(newContents); err != nil {
-		return fileSeg{}, err
+		return fileSeg{}, nil, err
 	}
 	if err := tf.Close(); err != nil {
-		return fileSeg{}, err
+		return fileSeg{}, nil, err
 	}
 	finalName := partial
 	if !isFinalSeg {
 		finalName = frozen
 	}
 	if err := os.Rename(tf.Name(), finalName); err != nil {
-		return fileSeg{}, err
+		return fileSeg{}, nil, err
 	}
-	log.Printf("wrote %v", finalName)
-	return fileSeg{seg: seg.Number, file: finalName, size: seg.Size, sha224: seg.SHA224}, nil
+	if !ns.quiet {
+		log.Printf("wrote %v", finalName)
+	}
+	return fileSeg{seg: seg.Number, file: finalName, size: seg.Size, sha224: seg.SHA224}, slurp, nil
 }
 
 type LogSegmentJSON struct {
diff --git a/maintner/netsource_test.go b/maintner/netsource_test.go
index 2920cc8..20e4b9e 100644
--- a/maintner/netsource_test.go
+++ b/maintner/netsource_test.go
@@ -317,13 +317,13 @@
 					}
 					return segs, nil
 				},
-				testHookSyncSeg: func(_ context.Context, seg LogSegmentJSON) (fileSeg, error) {
+				testHookSyncSeg: func(_ context.Context, seg LogSegmentJSON) (fileSeg, []byte, error) {
 					return fileSeg{
 						seg:    seg.Number,
 						size:   seg.Size,
 						sha224: seg.SHA224,
 						file:   fmt.Sprintf("/fake/%04d.mutlog", seg.Number),
-					}, nil
+					}, nil, nil
 				},
 				testHookFilePrefixSum224: func(file string, n int64) string {
 					if tt.prefixSum != "" {
diff --git a/maintner/reclog/reclog.go b/maintner/reclog/reclog.go
index 4911435..a1fd409 100644
--- a/maintner/reclog/reclog.go
+++ b/maintner/reclog/reclog.go
@@ -64,7 +64,8 @@
 // ForeachRecord calls fn for each record in r.
 // Calls to fn are made serially.
 // If fn returns an error, iteration ends and that error is returned.
-// The startOffset be 0 if reading from the beginning of a file.
+// The startOffset is where in the file r represents. It should be 0
+// if reading from the beginning of a file.
 func ForeachRecord(r io.Reader, startOffset int64, fn RecordCallback) error {
 	off := startOffset
 	br := bufio.NewReader(r)