maintner: retry network operations that may fail in getNewSegments

On the internet, servers generally don't go out of their way to ensure
graceful stream shutdowns, preferring instead to rely on clients to
receive a network error and decide to repeat the same request.

netMutSource implements the MutationSource interface, whose GetMutations
method should return a final MutationStreamEvent with a non-nil Err only
when the mutation source encounters a fatal error. Most callers correctly
handle such fatal errors with log.Fatalln or equivalent.
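
To make that contract concrete, here is a minimal sketch of the caller
side, not part of this CL. MutationSource, MutationStreamEvent, and
maintpb are from this repository; drain and apply are hypothetical names
used only for illustration.

	package example // illustration only

	import (
		"context"
		"log"

		"golang.org/x/build/maintner"
		"golang.org/x/build/maintner/maintpb"
	)

	// drain consumes GetMutations and treats a non-nil Err as fatal,
	// the way maintner-based programs typically do.
	func drain(ctx context.Context, src maintner.MutationSource, apply func(*maintpb.Mutation)) {
		for e := range src.GetMutations(ctx) {
			if e.Err != nil {
				log.Fatalln("mutation source:", e.Err) // terminal for this stream
			}
			if e.End {
				return // caught up with everything the source currently has
			}
			apply(e.Mutation)
		}
	}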

In practice, the netMutSource.getNewSegments method does a lot of work
to ensure it doesn't give up prematurely, so most errors it returns
really are fatal. However, it relied on HTTP requests to a remote server
always succeeding on the first try. That is a mistake.

This change augments the getServerSegments and syncSeg methods to
annotate errors that they believe might be due to transient network
problems, and adds a non-zero number of retries to getNewSegments.
That is a good layer at which to retry, since it throws away the least
amount of successful progress made so far, much less than completely
restarting a maintner-based program and having it reload the corpus.
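
The retry shape is roughly the following sketch, simplified from the
code in this CL. doFetch stands in for getServerSegments or syncSeg,
fetchError mirrors the type added in netsource.go, and maxTries and the
quadratic backoff match the getServerSegments path.

	package example // illustration only

	import (
		"context"
		"errors"
		"fmt"
		"time"
	)

	// fetchError mirrors the type added in netsource.go by this CL.
	type fetchError struct {
		Err               error
		PossiblyRetryable bool
	}

	func (e fetchError) Error() string { return e.Err.Error() }
	func (e fetchError) Unwrap() error { return e.Err }

	// fetchWithRetry retries only errors marked as possibly retryable,
	// waiting try*try seconds between attempts, up to maxTries attempts.
	func fetchWithRetry(ctx context.Context, doFetch func(context.Context) error) error {
		const maxTries = 5
		for try := 1; ; {
			err := doFetch(ctx)
			if fe := (fetchError{}); errors.As(err, &fe) && fe.PossiblyRetryable {
				if try == maxTries {
					return fmt.Errorf("after %d attempts, fetch still failed: %v", maxTries, err)
				}
				select {
				case <-ctx.Done():
					return ctx.Err()
				case <-time.After(time.Duration(try*try) * time.Second):
					try++
					continue
				}
			}
			return err // nil on success, or a terminal error
		}
	}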

While here, start relying on the maintnerd server supporting long
polling, which it has since 2017 (CL 42871). That simplifies the code
by allowing the top-level loop and its nesting to be deleted.
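
For reference, the long poll works by passing the client's current sum
of log segment sizes in the "waitsizenot" URL parameter; the server
holds the request until its sum differs, sending 304 responses as
heartbeats in the meantime. A simplified sketch of the client side,
mirroring the names used in getServerSegments:

	package example // illustration only

	import (
		"context"
		"fmt"
		"net/http"
	)

	// longPoll blocks until the server has data whose total size differs
	// from waitSizeNot, treating 304 responses as heartbeats.
	func longPoll(ctx context.Context, server string, waitSizeNot int64) (*http.Response, error) {
		logsURL := fmt.Sprintf("%s?waitsizenot=%d", server, waitSizeNot)
		for {
			req, err := http.NewRequestWithContext(ctx, "GET", logsURL, nil)
			if err != nil {
				return nil, err
			}
			res, err := http.DefaultClient.Do(req)
			if err != nil {
				return nil, err
			}
			if res.StatusCode == http.StatusNotModified {
				res.Body.Close() // heartbeat; keep waiting
				continue
			}
			return res, nil
		}
	}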

Updates golang/go#52048.

Change-Id: I8148109f97365697beab5575358069adfa095f0e
Reviewed-on: https://go-review.googlesource.com/c/build/+/414174
Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
Reviewed-by: Alex Rakoczy <jenny@golang.org>
TryBot-Result: Gopher Robot <gobot@golang.org>
Run-TryBot: Dmitri Shuralyov <dmitshur@golang.org>
Auto-Submit: Dmitri Shuralyov <dmitshur@golang.org>
diff --git a/maintner/godata/godata_test.go b/maintner/godata/godata_test.go
index eca159d..b0fe2d5 100644
--- a/maintner/godata/godata_test.go
+++ b/maintner/godata/godata_test.go
@@ -39,7 +39,7 @@
 
 func getGoData(tb testing.TB) *maintner.Corpus {
 	if testing.Short() {
-		tb.Skip("not running tests requiring large download in short mode")
+		tb.Skip("skipping test requiring large download in short mode")
 	}
 	corpusMu.Lock()
 	defer corpusMu.Unlock()
diff --git a/maintner/maintner.go b/maintner/maintner.go
index 3788b99..7f611e5 100644
--- a/maintner/maintner.go
+++ b/maintner/maintner.go
@@ -244,7 +244,7 @@
 // the latest state from the MutationSource passed earlier to
 // Initialize. It does not return until there's either a new change or
 // the context expires.
-// If Update returns ErrSplit, the corpus can longer be updated.
+// If Update returns ErrSplit, the corpus can no longer be updated.
 //
 // Update must not be called concurrently with any other Update calls. If
 // reading the corpus concurrently while the corpus is updating, you must hold
diff --git a/maintner/maintnerd/maintapi/api_test.go b/maintner/maintnerd/maintapi/api_test.go
index d7d35ed..81f7468 100644
--- a/maintner/maintnerd/maintapi/api_test.go
+++ b/maintner/maintnerd/maintapi/api_test.go
@@ -430,7 +430,7 @@
 
 func getGoData(tb testing.TB) *maintner.Corpus {
 	if testing.Short() {
-		tb.Skip("skipping maintner godata test in short mode")
+		tb.Skip("skipping test requiring large download in short mode")
 	}
 	corpusMu.Lock()
 	defer corpusMu.Unlock()
diff --git a/maintner/netsource.go b/maintner/netsource.go
index 4c6ca8f..87a531d 100644
--- a/maintner/netsource.go
+++ b/maintner/netsource.go
@@ -50,7 +50,8 @@
 // If the server is restarted and its history diverges,
 // TailNetworkMutationSource may return duplicate events. This therefore does not
 // return a MutationSource, so it can't be accidentally misused for important things.
-// TailNetworkMutationSource returns if fn returns an error, or if ctx expires.
+// TailNetworkMutationSource returns if fn returns an error, if ctx expires,
+// or if it runs into a network error.
 func TailNetworkMutationSource(ctx context.Context, server string, fn func(MutationStreamEvent) error) error {
 	td, err := ioutil.TempDir("", "maintnertail")
 	if err != nil {
@@ -141,16 +142,15 @@
 	quiet bool // disable verbose logging
 
 	// Hooks for testing. If nil, unused:
-	testHookGetServerSegments      func(context.Context, int64) ([]LogSegmentJSON, error)
-	testHookWaitAfterServerDupData func(context.Context) error
-	testHookSyncSeg                func(context.Context, LogSegmentJSON) (fileSeg, []byte, error)
-	testHookFilePrefixSum224       func(file string, n int64) string
+	testHookGetServerSegments func(context.Context, int64) ([]LogSegmentJSON, error)
+	testHookSyncSeg           func(context.Context, LogSegmentJSON) (fileSeg, []byte, error)
+	testHookFilePrefixSum224  func(file string, n int64) string
 }
 
 func (ns *netMutSource) GetMutations(ctx context.Context) <-chan MutationStreamEvent {
 	ch := make(chan MutationStreamEvent, 50)
 	go func() {
-		err := ns.sendMutations(ctx, ch)
+		err := ns.fetchAndSendMutations(ctx, ch)
 		final := MutationStreamEvent{Err: err}
 		if err == nil {
 			final.End = true
@@ -170,6 +170,8 @@
 		return false
 	}
 	switch err := err.(type) {
+	case fetchError:
+		return isNoInternetError(err.Err)
 	case *url.Error:
 		return isNoInternetError(err.Err)
 	case *net.OpError:
@@ -248,55 +250,71 @@
 }
 
 // getServerSegments fetches the JSON logs handler (ns.server, usually
-// https://maintner.golang.org/logs) and returns the parsed the JSON.
-// It sends the "waitsizenot" URL parameter, which if non-zero
-// specifies that the request should long-poll waiting for the server
-// to have a sum of log segment sizes different than the value
-// specified.
+// https://maintner.golang.org/logs) and returns the parsed JSON.
+// It sends the "waitsizenot" URL parameter, which specifies that the
+// request should long-poll waiting for the server to have a sum of
+// log segment sizes different than the value specified. As a result,
+// it blocks until the server has new data to send or ctx expires.
+//
+// getServerSegments returns an error that matches fetchError with
+// PossiblyRetryable set to true when it has signal that repeating
+// the same call after some time may succeed.
 func (ns *netMutSource) getServerSegments(ctx context.Context, waitSizeNot int64) ([]LogSegmentJSON, error) {
 	if fn := ns.testHookGetServerSegments; fn != nil {
 		return fn(ctx, waitSizeNot)
 	}
-	logsURL := ns.server
-	if waitSizeNot > 0 {
-		logsURL += fmt.Sprintf("?waitsizenot=%d", waitSizeNot)
-	}
+	logsURL := fmt.Sprintf("%s?waitsizenot=%d", ns.server, waitSizeNot)
 	for {
-		req, err := http.NewRequest("GET", logsURL, nil)
+		req, err := http.NewRequestWithContext(ctx, "GET", logsURL, nil)
 		if err != nil {
 			return nil, err
 		}
-		req = req.WithContext(ctx)
 		res, err := http.DefaultClient.Do(req)
 		if err != nil {
-			return nil, err
+			return nil, fetchError{Err: err, PossiblyRetryable: true}
 		}
-		// If we're doing a long poll and the server replies
+		// When we're doing a long poll and the server replies
 		// with a 304 response, that means the server is just
 		// heart-beating us and trying to get a response back
 		// within its various deadlines. But we should just
 		// try again.
-		if waitSizeNot > 0 && res.StatusCode == http.StatusNotModified {
+		if res.StatusCode == http.StatusNotModified {
 			res.Body.Close()
 			continue
 		}
 		defer res.Body.Close()
-		if res.StatusCode != 200 {
+		if res.StatusCode/100 == 5 {
+			// Consider a 5xx server response to possibly succeed later.
+			return nil, fetchError{Err: fmt.Errorf("%s: %v", ns.server, res.Status), PossiblyRetryable: true}
+		} else if res.StatusCode != http.StatusOK {
 			return nil, fmt.Errorf("%s: %v", ns.server, res.Status)
 		}
-		var segs []LogSegmentJSON
-		err = json.NewDecoder(res.Body).Decode(&segs)
+		b, err := io.ReadAll(res.Body)
 		if err != nil {
-			return nil, fmt.Errorf("decoding %s JSON: %v", ns.server, err)
+			return nil, fetchError{Err: err, PossiblyRetryable: true}
+		}
+		var segs []LogSegmentJSON
+		err = json.Unmarshal(b, &segs)
+		if err != nil {
+			return nil, fmt.Errorf("unmarshaling %s JSON: %v", ns.server, err)
 		}
 		return segs, nil
 	}
 }
 
+// getNewSegments fetches new mutations from the network mutation source.
+// It tries to absorb the expected network bumps by trying multiple times,
+// and returns an error only when it considers the problem to be terminal.
+//
+// If there's no internet connectivity from the start, it returns locally
+// cached segments that might be available from before. Otherwise it waits
+// for internet connectivity to come back and keeps going when it does.
 func (ns *netMutSource) getNewSegments(ctx context.Context) ([]fileSeg, error) {
-	for {
-		sumLast := sumSegSize(ns.last)
+	sumLast := sumSegSize(ns.last)
 
+	// First, fetch JSON metadata for the segments from the server.
+	var serverSegs []LogSegmentJSON
+	for try := 1; ; {
 		segs, err := ns.getServerSegments(ctx, sumLast)
 		if isNoInternetError(err) {
 			if sumLast == 0 {
@@ -307,50 +325,92 @@
 			case <-ctx.Done():
 				return nil, ctx.Err()
 			case <-time.After(15 * time.Second):
+				try = 1
 				continue
 			}
-		}
-		if err != nil {
-			return nil, err
-		}
-		// TODO: optimization: if already on GCE, skip sync to disk part and just
-		// read from network. fast & free network inside.
-
-		var fileSegs []fileSeg
-		for _, seg := range segs {
-			fileSeg, _, err := ns.syncSeg(ctx, seg)
-			if err != nil {
-				return nil, fmt.Errorf("syncing segment %d: %v", seg.Number, err)
+		} else if fe := (fetchError{}); errors.As(err, &fe) && fe.PossiblyRetryable {
+			// Fetching the JSON logs handler happens over an unreliable network connection,
+			// and will fail at some point. Prefer to try again over reporting a terminal error.
+			const maxTries = 5
+			if try == maxTries {
+				// At this point, promote it to a terminal error.
+				return nil, fmt.Errorf("after %d attempts, fetching server segments still failed: %v", maxTries, err)
 			}
-			fileSegs = append(fileSegs, fileSeg)
-		}
-		sumCommon := ns.sumCommonPrefixSize(fileSegs, ns.last)
-		if sumLast != sumCommon {
-			return nil, ErrSplit
-		}
-		sumCur := sumSegSize(fileSegs)
-		if sumCommon == sumCur {
-			// Nothing new. This shouldn't happen once the
-			// server is updated to respect the
-			// "?waitsizenot=NNN" long polling parameter.
-			// But keep this brief pause as a backup to
-			// prevent spinning and because clients &
-			// servers won't be updated simultaneously.
-			if ns.testHookGetServerSegments == nil {
-				log.Printf("maintner.netsource: server returned unchanged log segments; old server?")
-			}
+			someDelay := time.Duration(try*try) * time.Second
+			log.Printf("fetching server segments did not succeed on attempt %d, will try again in %v: %v", try, someDelay, err)
 			select {
 			case <-ctx.Done():
 				return nil, ctx.Err()
-			case <-time.After(1 * time.Second):
+			case <-time.After(someDelay):
+				try++
+				continue
 			}
-			continue
+		} else if err != nil {
+			return nil, err
 		}
-		ns.last = fileSegs
-
-		newSegs := trimLeadingSegBytes(fileSegs, sumCommon)
-		return newSegs, nil
+		serverSegs = segs
+		break
 	}
+	// TODO: optimization: if already on GCE, skip sync to disk part and just
+	// read from network. fast & free network inside.
+
+	// Second, fetch the new segments or their fragments
+	// that we don't yet have locally.
+	var fileSegs []fileSeg
+	for _, seg := range serverSegs {
+		for try := 1; ; {
+			fileSeg, _, err := ns.syncSeg(ctx, seg)
+			if isNoInternetError(err) {
+				log.Printf("No internet; blocking.")
+				select {
+				case <-ctx.Done():
+					return nil, ctx.Err()
+				case <-time.After(15 * time.Second):
+					try = 1
+					continue
+				}
+			} else if fe := (fetchError{}); errors.As(err, &fe) && fe.PossiblyRetryable {
+				// Syncing a segment fetches a good deal of data over a network connection,
+				// and will fail at some point. Be very willing to try again at this layer,
+				// since it's much more efficient than having GetMutations return an error
+				// and possibly cause a higher level retry to redo significantly more work.
+				const maxTries = 10
+				if try == maxTries {
+					// At this point, promote it to a terminal error.
+					return nil, fmt.Errorf("after %d attempts, syncing segment %d still failed: %v", maxTries, seg.Number, err)
+				}
+				someDelay := time.Duration(try*try) * time.Second
+				log.Printf("syncing segment %d did not succeed on attempt %d, will try again in %v: %v", seg.Number, try, someDelay, err)
+				select {
+				case <-ctx.Done():
+					return nil, ctx.Err()
+				case <-time.After(someDelay):
+					try++
+					continue
+				}
+			} else if err != nil {
+				return nil, err
+			}
+			fileSegs = append(fileSegs, fileSeg)
+			break
+		}
+	}
+
+	// Verify consistency of newly fetched data,
+	// and check there is in fact something new.
+	sumCommon := ns.sumCommonPrefixSize(fileSegs, ns.last)
+	if sumCommon != sumLast {
+		// Our history diverged from the source.
+		return nil, ErrSplit
+	} else if sumCur := sumSegSize(fileSegs); sumCommon == sumCur {
+		// Nothing new. This shouldn't happen since the maintnerd server is required to handle
+		// the "?waitsizenot=NNN" long polling parameter, so it's a problem if we get here.
+		return nil, fmt.Errorf("maintner.netsource: maintnerd server returned unchanged log segments")
+	}
+	ns.last = fileSegs
+
+	newSegs := trimLeadingSegBytes(fileSegs, sumCommon)
+	return newSegs, nil
 }
 
 func trimLeadingSegBytes(in []fileSeg, trim int64) []fileSeg {
@@ -433,7 +493,9 @@
 	return
 }
 
-func (ns *netMutSource) sendMutations(ctx context.Context, ch chan<- MutationStreamEvent) error {
+// fetchAndSendMutations fetches new mutations from the network mutation source
+// and sends them to ch.
+func (ns *netMutSource) fetchAndSendMutations(ctx context.Context, ch chan<- MutationStreamEvent) error {
 	newSegs, err := ns.getNewSegments(ctx)
 	if err != nil {
 		return err
@@ -485,6 +547,10 @@
 
 // syncSeg syncs the provided log segment, returning its on-disk metadata.
 // The newData result is the new data that was added to the segment in this sync.
+//
+// syncSeg returns an error that matches fetchError with PossiblyRetryable set
+// to true when it has signal that repeating the same call after some time may
+// succeed.
 func (ns *netMutSource) syncSeg(ctx context.Context, seg LogSegmentJSON) (_ fileSeg, newData []byte, _ error) {
 	if fn := ns.testHookSyncSeg; fn != nil {
 		return fn(ctx, seg)
@@ -523,11 +589,10 @@
 	}
 
 	// Otherwise, download.
-	req, err := http.NewRequest("GET", segURL.String(), nil)
+	req, err := http.NewRequestWithContext(ctx, "GET", segURL.String(), nil)
 	if err != nil {
 		return fileSeg{}, nil, err
 	}
-	req = req.WithContext(ctx)
 	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", len(have), seg.Size-1))
 
 	if !ns.quiet {
@@ -535,15 +600,19 @@
 	}
 	res, err := http.DefaultClient.Do(req)
 	if err != nil {
-		return fileSeg{}, nil, err
+		return fileSeg{}, nil, fetchError{Err: err, PossiblyRetryable: true}
 	}
-	if res.StatusCode != 200 && res.StatusCode != 206 {
+	defer res.Body.Close()
+	if res.StatusCode/100 == 5 {
+		// Consider a 5xx server response to possibly succeed later.
+		return fileSeg{}, nil, fetchError{Err: fmt.Errorf("%s: %s", segURL.String(), res.Status), PossiblyRetryable: true}
+	} else if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusPartialContent {
 		return fileSeg{}, nil, fmt.Errorf("%s: %s", segURL.String(), res.Status)
 	}
-	slurp, err := ioutil.ReadAll(res.Body)
+	slurp, err := io.ReadAll(res.Body)
 	res.Body.Close()
 	if err != nil {
-		return fileSeg{}, nil, err
+		return fileSeg{}, nil, fetchError{Err: err, PossiblyRetryable: true}
 	}
 
 	var newContents []byte
@@ -557,7 +626,7 @@
 		if len(have) == 0 {
 			return fileSeg{}, nil, errors.New("corrupt download")
 		}
-		// Try again
+		// Try again.
 		os.Remove(partial)
 		return ns.syncSeg(ctx, seg)
 	}
@@ -595,3 +664,16 @@
 	SHA224 string `json:"sha224"`
 	URL    string `json:"url"`
 }
+
+// fetchError records an error during a fetch operation over an unreliable network.
+type fetchError struct {
+	Err error // Non-nil.
+
+	// PossiblyRetryable indicates whether Err is believed to be possibly caused by a
+	// non-terminal network error, such that the caller can expect it may not happen
+	// again if it simply tries the same fetch operation again after waiting a bit.
+	PossiblyRetryable bool
+}
+
+func (e fetchError) Error() string { return e.Err.Error() }
+func (e fetchError) Unwrap() error { return e.Err }
diff --git a/maintner/netsource_test.go b/maintner/netsource_test.go
index 20e4b9e..96123c9 100644
--- a/maintner/netsource_test.go
+++ b/maintner/netsource_test.go
@@ -155,8 +155,9 @@
 		// If empty, prefixSum calls are errors.
 		prefixSum string
 
-		want      []fileSeg
-		wantSplit bool
+		want          []fileSeg
+		wantSplit     bool
+		wantUnchanged bool
 	}
 	tests := []testCase{
 		{
@@ -226,22 +227,20 @@
 			},
 		},
 		{
-			name: "incremental_with_sleep",
+			name: "faulty_server_returns_no_new_data",
 			lastSegs: []fileSeg{
 				{seg: 1, size: 101, sha224: "abc", file: "/fake/0001.mutlog"},
 			},
 			serverSegs: [][]LogSegmentJSON{
 				[]LogSegmentJSON{
-					{Number: 1, Size: 101, SHA224: "abc"},
+					{Number: 1, Size: 101, SHA224: "abc"}, // Same as lastSegs, results in unchanged error.
 				},
 				[]LogSegmentJSON{
 					{Number: 1, Size: 101, SHA224: "abc"},
 					{Number: 2, Size: 102, SHA224: "def"},
 				},
 			},
-			want: []fileSeg{
-				{seg: 2, size: 102, sha224: "def", skip: 0, file: "/fake/0002.mutlog"},
-			},
+			wantUnchanged: true,
 		},
 		{
 			name: "split_error_diff_first_seg_same_size",
@@ -300,13 +299,13 @@
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			serverSegCalls := 0
-			waits := 0
+			syncSegCalls := 0
 			ns := &netMutSource{
 				last: tt.lastSegs,
 				testHookGetServerSegments: func(_ context.Context, waitSizeNot int64) (segs []LogSegmentJSON, err error) {
 					serverSegCalls++
-					if serverSegCalls > 10 {
-						t.Fatalf("infinite loop calling getServerSegments? num wait calls = %v", waits)
+					if serverSegCalls%2 == 1 {
+						return nil, fetchError{PossiblyRetryable: true, Err: fmt.Errorf("fake error to simulate the internet saying 'not this time' every now and then")}
 					}
 					if len(tt.serverSegs) == 0 {
 						return nil, nil
@@ -318,6 +317,10 @@
 					return segs, nil
 				},
 				testHookSyncSeg: func(_ context.Context, seg LogSegmentJSON) (fileSeg, []byte, error) {
+					syncSegCalls++
+					if syncSegCalls%3 == 1 {
+						return fileSeg{}, nil, fetchError{PossiblyRetryable: true, Err: fmt.Errorf("fake error to simulate the internet saying 'not this time' every now and then")}
+					}
 					return fileSeg{
 						seg:    seg.Number,
 						size:   seg.Size,
@@ -334,12 +337,19 @@
 				},
 			}
 			got, err := ns.getNewSegments(context.Background())
-			if tt.wantSplit && err == ErrSplit {
+			if tt.wantSplit {
+				if err != ErrSplit {
+					t.Fatalf("wanted ErrSplit; got %+v, %v", got, err)
+				}
 				// Success.
 				return
 			}
-			if tt.wantSplit {
-				t.Fatalf("wanted ErrSplit; got %+v, %v", got, err)
+			if tt.wantUnchanged {
+				if err == nil || err.Error() != "maintner.netsource: maintnerd server returned unchanged log segments" {
+					t.Fatalf("wanted unchanged; got %+v, %v", got, err)
+				}
+				// Success.
+				return
 			}
 			if err != nil {
 				t.Fatal(err)