blob: 128222a7438ca4de279b1a253aeff42c12bedf14 [file] [log] [blame]
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package maintner
import (
"bytes"
"context"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"io"
"io/fs"
"log"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"golang.org/x/build/maintner/internal/robustio"
"golang.org/x/build/maintner/maintpb"
"golang.org/x/build/maintner/reclog"
"google.golang.org/protobuf/proto"
)
// NewNetworkMutationSource returns a mutation source from a master server.
// The server argument should be a URL to the JSON logs index.
// Segment files fetched from the server are stored in cacheDir.
//
// NewNetworkMutationSource panics if server cannot be parsed as a URL;
// the panic message includes the underlying parse error.
func NewNetworkMutationSource(server, cacheDir string) MutationSource {
	base, err := url.Parse(server)
	if err != nil {
		// Include the parse error so the cause is visible in the crash output.
		panic(fmt.Sprintf("invalid URL: %q: %v", server, err))
	}
	return &netMutSource{
		server:   server,
		base:     base,
		cacheDir: cacheDir,
	}
}
// TailNetworkMutationSource calls fn for all new mutations added to the log on server.
// Events with the End field set to true are not sent, so all events will
// have exactly one of Mutation or Err fields set to a non-zero value.
// It ignores prior events.
//
// If the server is restarted and its history diverges,
// TailNetworkMutationSource may return duplicate events. This therefore does not
// return a MutationSource, so it can't be accidentally misused for important things.
//
// Errors fetching the segment index are reported to fn as events with Err
// set and the fetch is retried after at least 5 seconds.
// TailNetworkMutationSource returns if fn returns an error, if ctx expires,
// if the server returns an empty segment index, or if syncing or decoding
// a segment fails.
func TailNetworkMutationSource(ctx context.Context, server string, fn func(MutationStreamEvent) error) error {
	td, err := os.MkdirTemp("", "maintnertail")
	if err != nil {
		return err
	}
	defer robustio.RemoveAll(td)

	ns := NewNetworkMutationSource(server, td).(*netMutSource)
	ns.quiet = true

	// getSegs fetches the segment index, passing waitSizeNot through to
	// getServerSegments. The returned slice is never empty, so callers may
	// index its last element.
	getSegs := func(waitSizeNot int64) ([]LogSegmentJSON, error) {
		for {
			segs, err := ns.getServerSegments(ctx, waitSizeNot)
			if err == nil {
				if len(segs) == 0 {
					// Guard the segs[len(segs)-1] accesses below.
					return nil, errors.New("maintner: server returned no log segments")
				}
				return segs, nil
			}
			if ctxErr := ctx.Err(); ctxErr != nil {
				return nil, ctxErr
			}
			// Sleep a minimum of 5 seconds before trying
			// again. The user's fn function might sleep
			// longer or shorter.
			timer := time.NewTimer(5 * time.Second)
			if fnErr := fn(MutationStreamEvent{Err: err}); fnErr != nil {
				timer.Stop()
				return nil, fnErr
			}
			select {
			case <-timer.C:
			case <-ctx.Done():
				// Don't sit out the rest of the delay once ctx is done.
				timer.Stop()
				return nil, ctx.Err()
			}
		}
	}

	// See how long the log is at start. Then we'll only fetch
	// things after that.
	segs, err := getSegs(0)
	if err != nil {
		return err
	}
	segSize := sumJSONSegSize(segs)
	lastSeg := segs[len(segs)-1]
	if _, _, err := ns.syncSeg(ctx, lastSeg); err != nil {
		return err
	}

	ticker := time.NewTicker(time.Second) // max re-fetch interval
	defer ticker.Stop()
	for {
		segs, err := getSegs(segSize)
		if err != nil {
			return err
		}
		segSize = sumJSONSegSize(segs)

		for _, seg := range segs {
			if seg.Number < lastSeg.Number {
				// Entirely before the point we started tailing from.
				continue
			}
			var off int64
			if seg.Number == lastSeg.Number {
				// Same segment as last time: new records start at its old size.
				off = lastSeg.Size
			}
			_, newData, err := ns.syncSeg(ctx, seg)
			if err != nil {
				return err
			}
			if err := reclog.ForeachRecord(bytes.NewReader(newData), off, func(off int64, hdr, rec []byte) error {
				m := new(maintpb.Mutation)
				if err := proto.Unmarshal(rec, m); err != nil {
					return err
				}
				return fn(MutationStreamEvent{Mutation: m})
			}); err != nil {
				return err
			}
		}
		lastSeg = segs[len(segs)-1]

		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
// netMutSource is the MutationSource returned by NewNetworkMutationSource.
// It reads the mutation log from a server over HTTP and stores the fetched
// log segments as files in a local cache directory.
type netMutSource struct {
// server is the URL of the JSON logs index, as given to NewNetworkMutationSource.
server string
// base is server parsed as a URL; segment URLs are resolved against it.
base *url.URL
// cacheDir is the directory holding the locally stored segment files.
cacheDir string
// last is the segment list recorded by the most recent getNewSegments
// call that fetched new data from the server.
last []fileSeg
quiet bool // disable verbose logging
// Hooks for testing. If nil, unused:
// testHookGetServerSegments, if non-nil, is called instead of the body of getServerSegments.
testHookGetServerSegments func(context.Context, int64) ([]LogSegmentJSON, error)
// testHookSyncSeg, if non-nil, is called instead of the body of syncSeg.
testHookSyncSeg func(context.Context, LogSegmentJSON) (fileSeg, []byte, error)
// testHookOnSplit, if non-nil, is called by getNewSegments just before it returns ErrSplit.
testHookOnSplit func(sumCommon int64)
// testHookFilePrefixSum224, if non-nil, is called instead of the body of filePrefixSum224.
testHookFilePrefixSum224 func(file string, n int64) string
}
// GetMutations starts a goroutine that fetches new mutations and delivers
// them on the returned channel, which is buffered for 50 events.
//
// After the mutations, one final event is sent: it has End set to true if
// the fetch succeeded, or Err set to the failure otherwise. The final event
// is dropped if ctx is done before it can be sent. This function does not
// close the channel.
func (ns *netMutSource) GetMutations(ctx context.Context) <-chan MutationStreamEvent {
	events := make(chan MutationStreamEvent, 50)
	go func() {
		var last MutationStreamEvent
		if err := ns.fetchAndSendMutations(ctx, events); err != nil {
			last.Err = err
		} else {
			last.End = true
		}
		select {
		case <-ctx.Done():
		case events <- last:
		}
	}()
	return events
}
// isNoInternetError reports whether the provided error is because there's no
// network connectivity.
//
// It peels off fetchError, *url.Error and *net.OpError wrappers one layer at
// a time. A *net.DNSError counts as "no internet" only when its message is
// "no such host". Any other error type is logged and reported as false.
func isNoInternetError(err error) bool {
	for err != nil {
		switch e := err.(type) {
		case fetchError:
			err = e.Err
		case *url.Error:
			err = e.Err
		case *net.OpError:
			err = e.Err
		case *net.DNSError:
			// Trashy:
			return e.Err == "no such host"
		default:
			log.Printf("Unknown error type %T: %#v", err, err)
			return false
		}
	}
	return false
}
// locallyCachedSegments builds a segment list from the ".mutlog" files
// already present in ns.cacheDir, without contacting the server.
//
// Starting at segment 0 it collects consecutively numbered frozen segments
// (files named "NNNN.<sha224 hex>.mutlog"). At the first number with no
// frozen file it appends that number's "NNNN.growing.mutlog" file, if there
// is one, and stops. The outcome is logged either way.
func (ns *netMutSource) locallyCachedSegments() (segs []fileSeg, err error) {
	defer func() {
		if err == nil {
			log.Printf("No network connection; using %d locally cached segments.", len(segs))
			return
		}
		log.Printf("No network connection and failed to use local cache: %v", err)
	}()

	entries, err := os.ReadDir(ns.cacheDir)
	if err != nil {
		return nil, err
	}

	// Frozen segment file names all have exactly this length.
	const frozenNameLen = len("0000.6897fab4d3afcda332424b2a2a1a4469021074282bc7be5606aaa221.mutlog")

	var (
		infoByName = make(map[string]fs.FileInfo)
		frozenSum  = make(map[int]string) // segment number -> hex sum taken from the file name
		growing    = make(map[int]bool)   // segment numbers that have a growing file
	)
	for _, entry := range entries {
		name := entry.Name()
		if !strings.HasSuffix(name, ".mutlog") {
			continue
		}
		info, err := entry.Info()
		if err != nil {
			return nil, err
		}
		infoByName[name] = info

		isFrozen := len(name) == frozenNameLen
		if !isFrozen && !strings.HasSuffix(name, ".growing.mutlog") {
			continue
		}
		n, err := strconv.Atoi(name[:4])
		if err != nil {
			continue
		}
		if isFrozen {
			frozenSum[n] = strings.TrimSuffix(name[5:], ".mutlog")
		} else {
			growing[n] = true
		}
	}

	for n := 0; ; n++ {
		sum, isFrozen := frozenSum[n]
		if !isFrozen {
			if growing[n] {
				// The growing file's name carries no checksum, so hash its contents.
				path := filepath.Join(ns.cacheDir, fmt.Sprintf("%04d.growing.mutlog", n))
				data, err := robustio.ReadFile(path)
				if err != nil {
					return nil, err
				}
				segs = append(segs, fileSeg{
					seg:    n,
					file:   path,
					size:   int64(len(data)),
					sha224: fmt.Sprintf("%x", sha256.Sum224(data)),
				})
			}
			return segs, nil
		}
		name := fmt.Sprintf("%04d.%s.mutlog", n, sum)
		segs = append(segs, fileSeg{
			seg:    n,
			file:   filepath.Join(ns.cacheDir, name),
			size:   infoByName[name].Size(),
			sha224: sum,
		})
	}
}
// getServerSegments fetches the JSON logs handler (ns.server, usually
// https://maintner.golang.org/logs) and returns the parsed JSON.
// It sends the "waitsizenot" URL parameter, which specifies that the
// request should long-poll waiting for the server to have a sum of
// log segment sizes different than the value specified. As a result,
// it blocks until the server has new data to send or ctx expires.
//
// A 304 response is treated as a long-poll heartbeat and the request is
// simply issued again.
//
// getServerSegments returns an error that matches fetchError with
// PossiblyRetryable set to true when it has signal that repeating
// the same call after some time may succeed.
func (ns *netMutSource) getServerSegments(ctx context.Context, waitSizeNot int64) ([]LogSegmentJSON, error) {
	if hook := ns.testHookGetServerSegments; hook != nil {
		return hook(ctx, waitSizeNot)
	}

	target := fmt.Sprintf("%s?waitsizenot=%d", ns.server, waitSizeNot)

	// Repeat the request until the server answers with something other
	// than 304 Not Modified.
	var res *http.Response
	for {
		req, err := http.NewRequestWithContext(ctx, "GET", target, nil)
		if err != nil {
			return nil, err
		}
		res, err = http.DefaultClient.Do(req)
		if err != nil {
			return nil, fetchError{Err: err, PossiblyRetryable: true}
		}
		if res.StatusCode != http.StatusNotModified {
			break
		}
		// The server is just heart-beating us to stay within its
		// various deadlines; try again.
		res.Body.Close()
	}
	defer res.Body.Close()

	switch {
	case res.StatusCode/100 == 5:
		// Consider a 5xx server response to possibly succeed later.
		return nil, fetchError{Err: fmt.Errorf("%s: %v", ns.server, res.Status), PossiblyRetryable: true}
	case res.StatusCode != http.StatusOK:
		return nil, fmt.Errorf("%s: %v", ns.server, res.Status)
	}

	body, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, fetchError{Err: err, PossiblyRetryable: true}
	}
	var segs []LogSegmentJSON
	if err := json.Unmarshal(body, &segs); err != nil {
		return nil, fmt.Errorf("unmarshaling %s JSON: %v", ns.server, err)
	}
	return segs, nil
}
// getNewSegments fetches new mutations from the network mutation source.
// It tries to absorb the expected network bumps by trying multiple times,
// and returns an error only when it considers the problem to be terminal.
//
// If there's no internet connectivity from the start, it returns locally
// cached segments that might be available from before. Otherwise it waits
// for internet connectivity to come back and keeps going when it does.
//
// On success it records the full synced segment list in ns.last and returns
// only the part of it that follows what ns.last held before the call.
// It returns ErrSplit if the newly synced data does not extend the
// previously recorded data.
func (ns *netMutSource) getNewSegments(ctx context.Context) ([]fileSeg, error) {
	prevTotal := sumSegSize(ns.last)

	// First, fetch JSON metadata for the segments from the server.
	var index []LogSegmentJSON
indexLoop:
	for attempt := 1; ; {
		segs, err := ns.getServerSegments(ctx, prevTotal)
		var fe fetchError
		switch {
		case isNoInternetError(err):
			if prevTotal == 0 {
				return ns.locallyCachedSegments()
			}
			log.Printf("No internet; blocking.")
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-time.After(15 * time.Second):
				attempt = 1
			}
		case errors.As(err, &fe) && fe.PossiblyRetryable:
			// Fetching the JSON logs handler happens over an unreliable network connection,
			// and will fail at some point. Prefer to try again over reporting a terminal error.
			const maxTries = 5
			if attempt == maxTries {
				// At this point, promote it to a terminal error.
				return nil, fmt.Errorf("after %d attempts, fetching server segments still failed: %v", maxTries, err)
			}
			backoff := time.Duration(attempt*attempt) * time.Second
			log.Printf("fetching server segments did not succeed on attempt %d, will try again in %v: %v", attempt, backoff, err)
			select {
			case <-ctx.Done():
				return nil, ctx.Err()
			case <-time.After(backoff):
				attempt++
			}
		case err != nil:
			return nil, err
		default:
			index = segs
			break indexLoop
		}
	}

	// TODO: optimization: if already on GCE, skip sync to disk part and just
	// read from network. fast & free network inside.

	// Second, fetch the new segments or their fragments
	// that we don't yet have locally.
	var synced []fileSeg
	for _, seg := range index {
	syncLoop:
		for attempt := 1; ; {
			got, _, err := ns.syncSeg(ctx, seg)
			var fe fetchError
			switch {
			case isNoInternetError(err):
				log.Printf("No internet; blocking.")
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
				case <-time.After(15 * time.Second):
					attempt = 1
				}
			case errors.As(err, &fe) && fe.PossiblyRetryable:
				// Syncing a segment fetches a good deal of data over a network connection,
				// and will fail at some point. Be very willing to try again at this layer,
				// since it's much more efficient than having GetMutations return an error
				// and possibly cause a higher level retry to redo significantly more work.
				const maxTries = 10
				if attempt == maxTries {
					// At this point, promote it to a terminal error.
					return nil, fmt.Errorf("after %d attempts, syncing segment %d still failed: %v", maxTries, seg.Number, err)
				}
				backoff := time.Duration(attempt*attempt) * time.Second
				log.Printf("syncing segment %d did not succeed on attempt %d, will try again in %v: %v", seg.Number, attempt, backoff, err)
				select {
				case <-ctx.Done():
					return nil, ctx.Err()
				case <-time.After(backoff):
					attempt++
				}
			case err != nil:
				return nil, err
			default:
				synced = append(synced, got)
				break syncLoop
			}
		}
	}

	// Verify consistency of newly fetched data,
	// and check there is in fact something new.
	common := ns.sumCommonPrefixSize(synced, ns.last)
	if common != prevTotal {
		if hook := ns.testHookOnSplit; hook != nil {
			hook(common)
		}
		// Our history diverged from the source.
		return nil, ErrSplit
	}
	if sumSegSize(synced) == common {
		// Nothing new. This shouldn't happen since the maintnerd server is required to handle
		// the "?waitsizenot=NNN" long polling parameter, so it's a problem if we get here.
		return nil, fmt.Errorf("maintner.netsource: maintnerd server returned unchanged log segments")
	}

	ns.last = synced
	return trimLeadingSegBytes(synced, common), nil
}
// trimLeadingSegBytes returns the segments of in that remain after dropping
// the first trim bytes. Segments that fall entirely within those bytes are
// omitted. The first remaining segment has its skip field set to the number
// of bytes still to be dropped from it. It returns nil if nothing remains.
// The elements of in are not modified.
func trimLeadingSegBytes(in []fileSeg, trim int64) []fileSeg {
	// Count the whole segments that are consumed by trim.
	whole := 0
	for whole < len(in) && trim >= in[whole].size {
		trim -= in[whole].size
		whole++
	}
	rest := in[whole:]
	if len(rest) == 0 {
		return nil
	}
	// Copy before setting skip so the caller's slice stays untouched.
	out := make([]fileSeg, len(rest))
	copy(out, rest)
	out[0].skip = trim
	return out
}
// filePrefixSum224 returns the lowercase hex SHA-224 of the first n bytes of file.
//
// It returns the empty string if file cannot be opened or if n bytes cannot
// be read from it. Those failures are logged, except for a file that does
// not exist.
func (ns *netMutSource) filePrefixSum224(file string, n int64) string {
	if hook := ns.testHookFilePrefixSum224; hook != nil {
		return hook(file, n)
	}

	f, err := os.Open(file)
	switch {
	case err == nil:
		// Opened fine; hash below.
	case os.IsNotExist(err):
		return ""
	default:
		log.Print(err)
		return ""
	}
	defer f.Close()

	digest := sha256.New224()
	if _, err := io.CopyN(digest, f, n); err != nil {
		log.Print(err)
		return ""
	}
	return fmt.Sprintf("%x", digest.Sum(nil))
}
// sumSegSize returns the total of the size fields of segs.
func sumSegSize(segs []fileSeg) (sum int64) {
	for i := range segs {
		sum += segs[i].size
	}
	return sum
}
// sumJSONSegSize returns the total of the Size fields of segs.
func sumJSONSegSize(segs []LogSegmentJSON) (sum int64) {
	for i := range segs {
		sum += segs[i].Size
	}
	return sum
}
// sumCommonPrefixSize computes the size of the longest common prefix of file segments a and b
// that can be found quickly by checking for matching checksums between segment boundaries.
//
// Leading segments with equal checksums count in full. At the first pair
// that differs, the smaller segment also counts if its checksum equals the
// checksum of the same number of leading bytes of the larger segment's file.
func (ns *netMutSource) sumCommonPrefixSize(a, b []fileSeg) (sum int64) {
	// Walk the pairs that match exactly.
	i := 0
	for ; i < len(a) && i < len(b); i++ {
		if a[i].sha224 != b[i].sha224 {
			break
		}
		// Whole chunk in common.
		sum += a[i].size
	}
	if i == len(a) || i == len(b) {
		return sum
	}

	small, big := a[i], b[i]
	if small.size == big.size {
		// If they're the same size but different
		// sums, it must've forked.
		return sum
	}
	// See if one chunk is a prefix of the other.
	if big.size < small.size {
		small, big = big, small
	}
	// Hash the beginning of the bigger one.
	if ns.filePrefixSum224(big.file, small.size) == small.sha224 {
		sum += small.size
	}
	return sum
}
// fetchAndSendMutations fetches new mutations from the network mutation source
// and sends them to ch.
//
// It returns the first error from fetching segments, opening or reading a
// segment file, or decoding a record. It returns ctx.Err() if ctx is done
// while a send to ch is blocked.
func (ns *netMutSource) fetchAndSendMutations(ctx context.Context, ch chan<- MutationStreamEvent) error {
	newSegs, err := ns.getNewSegments(ctx)
	if err != nil {
		return err
	}

	// emit decodes one record and delivers it on ch.
	emit := func(off int64, hdr, rec []byte) error {
		mut := new(maintpb.Mutation)
		if err := proto.Unmarshal(rec, mut); err != nil {
			return err
		}
		select {
		case ch <- MutationStreamEvent{Mutation: mut}:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	// sendSeg streams the not-yet-seen records of one segment file.
	sendSeg := func(seg fileSeg) error {
		f, err := os.Open(seg.file)
		if err != nil {
			return err
		}
		defer f.Close()
		if seg.skip > 0 {
			if _, err := f.Seek(seg.skip, io.SeekStart); err != nil {
				return err
			}
		}
		// Read no further than the recorded segment size.
		unread := io.LimitReader(f, seg.size-seg.skip)
		return reclog.ForeachRecord(unread, seg.skip, emit)
	}

	return foreachFileSeg(newSegs, sendSeg)
}
// foreachFileSeg calls fn for each segment in segs, in order.
// It stops at the first error returned by fn and returns that error.
func foreachFileSeg(segs []fileSeg, fn func(seg fileSeg) error) error {
	for i := 0; i < len(segs); i++ {
		if err := fn(segs[i]); err != nil {
			return err
		}
	}
	return nil
}
// fileSeg describes a log segment stored in a local file.
//
// TODO: add a constructor for this? or simplify it. make it Size +
// File + embedded LogSegmentJSON?
type fileSeg struct {
// seg is the segment number.
seg int
file string // full path
// sha224 is the hex SHA-224 of the segment's contents.
sha224 string
// skip is the number of leading bytes of file to skip before reading
// records; trimLeadingSegBytes sets it.
skip int64
// size is the segment's size in bytes, counted from the start of file
// (so it includes any skipped bytes).
size int64
}
// syncSeg syncs the provided log segment, returning its on-disk metadata.
// The newData result is the new data that was added to the segment in this sync.
//
// A segment already on disk under its checksum-bearing ("frozen") name with
// the expected size is reused as is. Otherwise any bytes held in the
// segment's ".growing" file are reused, the missing tail is downloaded with
// an HTTP Range request, and the result is checked against seg.SHA224
// before it is written to a temporary file and renamed into place. If the
// checksum does not match and local bytes were reused, the growing file is
// removed and the sync is attempted again from scratch.
//
// syncSeg returns an error that matches fetchError with PossiblyRetryable set
// to true when it has signal that repeating the same call after some time may
// succeed.
func (ns *netMutSource) syncSeg(ctx context.Context, seg LogSegmentJSON) (_ fileSeg, newData []byte, _ error) {
	if fn := ns.testHookSyncSeg; fn != nil {
		return fn(ctx, seg)
	}

	// A segment whose URL is not on storage.googleapis.com is kept under
	// the ".growing" name below; every other segment is stored frozen.
	isFinalSeg := !strings.HasPrefix(seg.URL, "https://storage.googleapis.com/")
	relURL, err := url.Parse(seg.URL)
	if err != nil {
		return fileSeg{}, nil, err
	}
	segURL := ns.base.ResolveReference(relURL)

	frozen := filepath.Join(ns.cacheDir, fmt.Sprintf("%04d.%s.mutlog", seg.Number, seg.SHA224))

	// Do we already have it? Files named in their final form with the sha224 are considered
	// complete and immutable.
	if fi, err := os.Stat(frozen); err == nil && fi.Size() == seg.Size {
		return fileSeg{seg: seg.Number, file: frozen, size: fi.Size(), sha224: seg.SHA224}, nil, nil
	}

	// See how much data we already have in the partial growing file.
	// A read error (such as the file not existing) just means we have nothing.
	partial := filepath.Join(ns.cacheDir, fmt.Sprintf("%04d.growing.mutlog", seg.Number))
	have, _ := robustio.ReadFile(partial)
	if int64(len(have)) == seg.Size {
		got224 := fmt.Sprintf("%x", sha256.Sum224(have))
		if got224 == seg.SHA224 {
			if !isFinalSeg {
				// This was growing for us, but the server started a new growing segment.
				if err := robustio.Rename(partial, frozen); err != nil {
					return fileSeg{}, nil, err
				}
				return fileSeg{seg: seg.Number, file: frozen, sha224: seg.SHA224, size: seg.Size}, nil, nil
			}
			return fileSeg{seg: seg.Number, file: partial, sha224: seg.SHA224, size: seg.Size}, nil, nil
		}
	}

	// Otherwise, download new data.
	if int64(len(have)) < seg.Size {
		req, err := http.NewRequestWithContext(ctx, "GET", segURL.String(), nil)
		if err != nil {
			return fileSeg{}, nil, err
		}
		req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", len(have), seg.Size-1))

		if !ns.quiet {
			log.Printf("Downloading %d bytes of %s ...", seg.Size-int64(len(have)), segURL)
		}
		res, err := http.DefaultClient.Do(req)
		if err != nil {
			return fileSeg{}, nil, fetchError{Err: err, PossiblyRetryable: true}
		}
		defer res.Body.Close()
		if res.StatusCode/100 == 5 {
			// Consider a 5xx server response to possibly succeed later.
			return fileSeg{}, nil, fetchError{Err: fmt.Errorf("%s: %s", segURL.String(), res.Status), PossiblyRetryable: true}
		} else if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusPartialContent {
			return fileSeg{}, nil, fmt.Errorf("%s: %s", segURL.String(), res.Status)
		}
		newData, err = io.ReadAll(res.Body)
		if err != nil {
			return fileSeg{}, nil, fetchError{Err: err, PossiblyRetryable: true}
		}
	}

	// Commit to disk.
	var newContents []byte
	if int64(len(newData)) == seg.Size {
		// The response carried the whole segment.
		newContents = newData
	} else if int64(len(have)+len(newData)) == seg.Size {
		newContents = append(have, newData...)
	} else if int64(len(have)) > seg.Size {
		// We have more data than the server; likely because it restarted with uncommitted
		// transactions, and so we're headed towards an ErrSplit. Reuse the longest common
		// prefix as long as its checksum matches.
		newContents = have[:seg.Size]
	}
	// If none of the cases above applied, newContents is empty and the
	// checksum comparison below decides what happens.

	got224 := fmt.Sprintf("%x", sha256.Sum224(newContents))
	if got224 != seg.SHA224 {
		if len(have) == 0 {
			return fileSeg{}, nil, errors.New("corrupt download")
		}
		// Try again. With the growing file gone, the retry starts with no
		// local bytes, so it cannot reach this point a second time.
		os.Remove(partial)
		return ns.syncSeg(ctx, seg)
	}

	// TODO: this is a quadratic amount of write I/O as the 16 MB
	// segment grows. Switch to appending to the existing file,
	// then perhaps encoding the desired file size into the
	// filename suffix (instead of just *.growing.mutlog) so
	// concurrent readers know where to stop.
	tf, err := os.CreateTemp(ns.cacheDir, "tempseg")
	if err != nil {
		return fileSeg{}, nil, err
	}
	// From here on, remove the temp file on every failure path so aborted
	// syncs don't leave "tempseg*" files behind in the cache directory.
	if _, err := tf.Write(newContents); err != nil {
		tf.Close()
		os.Remove(tf.Name())
		return fileSeg{}, nil, err
	}
	if err := tf.Close(); err != nil {
		os.Remove(tf.Name())
		return fileSeg{}, nil, err
	}
	finalName := partial
	if !isFinalSeg {
		finalName = frozen
	}
	if err := robustio.Rename(tf.Name(), finalName); err != nil {
		os.Remove(tf.Name())
		return fileSeg{}, nil, err
	}
	if !ns.quiet {
		log.Printf("wrote %v", finalName)
	}
	return fileSeg{seg: seg.Number, file: finalName, size: seg.Size, sha224: seg.SHA224}, newData, nil
}
// LogSegmentJSON is the JSON form of one mutation log segment. The logs
// index fetched from the server is decoded as an array of these.
type LogSegmentJSON struct {
// Number is the segment number.
Number int `json:"number"`
// Size is the segment's size in bytes.
Size int64 `json:"size"`
// SHA224 is the lowercase hex SHA-224 of the segment's Size bytes.
SHA224 string `json:"sha224"`
// URL is where the segment's bytes can be fetched. It is resolved
// against the URL of the logs index, so it may be relative.
URL string `json:"url"`
}
// fetchError records an error during a fetch operation over an unreliable network.
type fetchError struct {
	// Err is the underlying error. Non-nil.
	Err error

	// PossiblyRetryable indicates whether Err is believed to be possibly caused by a
	// non-terminal network error, such that the caller can expect it may not happen
	// again if it simply tries the same fetch operation again after waiting a bit.
	PossiblyRetryable bool
}

// Error returns the message of the underlying error.
func (e fetchError) Error() string {
	return e.Err.Error()
}

// Unwrap returns the underlying error, so that errors.Is and errors.As can
// see through a fetchError.
func (e fetchError) Unwrap() error {
	return e.Err
}