maintner, maintner/maintnerd: support long polling for log changes
Addresses a TODO in the code, and removes the old 2 second poll loop.
Change-Id: Id698307bd7404e8ca3946fa16621674cca2eca6b
Reviewed-on: https://go-review.googlesource.com/42871
Reviewed-by: Kevin Burke <kev@inburke.com>
diff --git a/maintner/maintnerd/Dockerfile.0 b/maintner/maintnerd/Dockerfile.0
index 783ecf7..76b4cb2 100644
--- a/maintner/maintnerd/Dockerfile.0
+++ b/maintner/maintnerd/Dockerfile.0
@@ -28,9 +28,21 @@
RUN go get -d cloud.google.com/go/storage && \
cd /go/src/cloud.google.com/go && git reset --hard 2f1da5d762c81a12c516bfb8a9ede96f42750361
-RUN go get -d golang.org/x/crypto/acme/autocert && \
+RUN go get golang.org/x/crypto/acme/autocert && \
cd /go/src/golang.org/x/crypto && git reset --hard c7af5bf2638a1164f2eb5467c39c6cffbd13a02e
+# Optimization to speed COPY+go install steps later. This go install
+# isn't required for correctness.
+RUN go install golang.org/x/oauth2 \
+ golang.org/x/net/context \
+ github.com/google/go-github/github \
+ github.com/gregjones/httpcache \
+ go4.org/types \
+ golang.org/x/sync/errgroup \
+ github.com/golang/protobuf/proto \
+ cloud.google.com/go/storage \
+ golang.org/x/crypto/acme/autocert
+
COPY . /go/src/golang.org/x/build/
RUN go install -ldflags "-linkmode=external -extldflags '-static -pthread'" golang.org/x/build/maintner/maintnerd
diff --git a/maintner/maintnerd/gcslog.go b/maintner/maintnerd/gcslog.go
index fe46002..28d3e79 100644
--- a/maintner/maintnerd/gcslog.go
+++ b/maintner/maintnerd/gcslog.go
@@ -45,6 +45,7 @@
bucket *storage.BucketHandle
mu sync.Mutex // guards the following
+ cond *sync.Cond
seg map[int]gcsLogSegment
curNum int
logBuf bytes.Buffer
@@ -67,17 +68,25 @@
return fmt.Sprintf("{gcsLogSegment num=%v, size=%v, sha=%v, created=%v}", s.num, s.size, s.sha224, s.created.Format(time.RFC3339))
}
+// newGCSLogBase returns a new gcsLog instance without any association
+// with Google Cloud Storage.
+func newGCSLogBase() *gcsLog {
+ gl := &gcsLog{
+ seg: map[int]gcsLogSegment{},
+ }
+ gl.cond = sync.NewCond(&gl.mu)
+ return gl
+}
+
func newGCSLog(ctx context.Context, bucketName string) (*gcsLog, error) {
sc, err := storage.NewClient(ctx)
if err != nil {
return nil, fmt.Errorf("storage.NewClient: %v", err)
}
- gl := &gcsLog{
- sc: sc,
- bucketName: bucketName,
- bucket: sc.Bucket(bucketName),
- seg: map[int]gcsLogSegment{},
- }
+ gl := newGCSLogBase()
+ gl.sc = sc
+ gl.bucketName = bucketName
+ gl.bucket = sc.Bucket(bucketName)
if err := gl.initLoad(ctx); err != nil {
return nil, err
}
@@ -192,16 +201,95 @@
startSeg, _ := strconv.Atoi(r.FormValue("startseg"))
if startSeg < 0 {
- http.Error(w, "bad seg", http.StatusBadRequest)
+ http.Error(w, "bad startseg", http.StatusBadRequest)
return
}
+
+ // Long poll if request contains a non-empty waitsizenot parameter.
+ // The client's provided 'waitsizenot' value is the sum of the segment
+ // sizes they already know. They're waiting for something new.
+ if s := r.FormValue("waitsizenot"); s != "" {
+ oldSize, err := strconv.ParseInt(s, 10, 64)
+ if err != nil || oldSize < 0 {
+ http.Error(w, "bad waitsizenot", http.StatusBadRequest)
+ return
+ }
+ // Return a 304 if there's no activity in just under a minute.
+ // This keeps some occasional activity on the TCP connection
+ // so we (and any proxies) know it's alive, and can fit
+ // within reasonable read/write deadlines on either side.
+ ctx, cancel := context.WithTimeout(r.Context(), 55*time.Second)
+ defer cancel()
+ changed := gl.waitSizeNot(ctx, oldSize)
+ if !changed {
+ w.WriteHeader(http.StatusNotModified)
+ return
+ }
+ }
+
segs := gl.getJSONLogs(startSeg)
w.Header().Set("Content-Type", "application/json")
+ w.Header().Set("X-Sum-Segment-Size", fmt.Sprint(sumSegmentSizes(segs)))
+
body, _ := json.MarshalIndent(segs, "", "\t")
w.Write(body)
}
+// sumSegmentSizes returns the sum of each seg.Size in segs.
+func sumSegmentSizes(segs []maintner.LogSegmentJSON) (sum int64) {
+ for _, seg := range segs {
+ sum += seg.Size
+ }
+ return sum
+}
+
+// waitSizeNot blocks until the sum of gcsLog's segment sizes is not v, or the context expires.
+// It reports whether the size changed.
+func (gl *gcsLog) waitSizeNot(ctx context.Context, v int64) (changed bool) {
+ returned := make(chan struct{})
+ defer close(returned)
+ go gl.waitSizeNotAwaitContextOrChange(ctx, returned)
+ gl.mu.Lock()
+ defer gl.mu.Unlock()
+ for {
+ if curSize := gl.sumSizeLocked(); curSize != v {
+ log.Printf("waitSize fired. from %d => %d", v, curSize)
+ return true
+ }
+ select {
+ case <-ctx.Done():
+ return false
+ default:
+ gl.cond.Wait()
+ }
+ }
+}
+
+// waitSizeNotAwaitContextOrChange is part of waitSizeNot.
+// It's a goroutine that selects on two channels and calls
+// sync.Cond.Broadcast to wake up the waitSizeNot waiter if the
+// context expires.
+func (gl *gcsLog) waitSizeNotAwaitContextOrChange(ctx context.Context, returned <-chan struct{}) {
+ select {
+ case <-ctx.Done():
+ gl.cond.Broadcast()
+ case <-returned:
+ // No need to do a wakeup. Caller is already gone.
+ }
+}
+
+func (gl *gcsLog) sumSizeLocked() int64 {
+ var sum int64
+ for n, seg := range gl.seg {
+ if n != gl.curNum {
+ sum += seg.size
+ }
+ }
+ sum += int64(gl.logBuf.Len())
+ return sum
+}
+
func (gl *gcsLog) getJSONLogs(startSeg int) (segs []maintner.LogSegmentJSON) {
gl.mu.Lock()
defer gl.mu.Unlock()
@@ -272,6 +360,7 @@
if err := reclog.WriteRecord(gcsLogWriter{gl}, int64(gl.logBuf.Len()), data); err != nil {
return err
}
+ gl.cond.Broadcast() // wake any long-polling subscribers
// Otherwise schedule a periodic flush.
if gl.flushTimer == nil {
diff --git a/maintner/maintnerd/gcslog_test.go b/maintner/maintnerd/gcslog_test.go
new file mode 100644
index 0000000..209ca61
--- /dev/null
+++ b/maintner/maintnerd/gcslog_test.go
@@ -0,0 +1,44 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "golang.org/x/build/maintner/maintpb"
+)
+
+func TestGCSLogWakeup_Timeout(t *testing.T) {
+ testGCSLogWakeup(t, false)
+}
+
+func TestGCSLogWakeup_Activity(t *testing.T) {
+ testGCSLogWakeup(t, true)
+}
+
+func testGCSLogWakeup(t *testing.T, activity bool) {
+ gl := newGCSLogBase()
+ waitc := make(chan bool, 1)
+ ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+ defer cancel()
+ go func() {
+ waitc <- gl.waitSizeNot(ctx, 0)
+ }()
+ if activity {
+ if err := gl.Log(new(maintpb.Mutation)); err != nil {
+ t.Fatal(err)
+ }
+ }
+ select {
+ case got := <-waitc:
+ if got != activity {
+ t.Errorf("changed = %v; want %v", got, activity)
+ }
+ case <-time.After(2 * time.Second):
+ t.Errorf("timeout")
+ }
+}
diff --git a/maintner/maintnerd/maintnerd.go b/maintner/maintnerd/maintnerd.go
index 90b7ce7..1081ca3 100644
--- a/maintner/maintnerd/maintnerd.go
+++ b/maintner/maintnerd/maintnerd.go
@@ -186,7 +186,19 @@
http.NotFound(w, r)
return
}
- io.WriteString(w, "<html><body>This is <a href='https://godoc.org/golang.org/x/build/maintner/maintnerd'>maintnerd</a>, the <a href='https://godoc.org/golang.org/x/build/maintner'>maintner</a> server.</body>")
+ io.WriteString(w, `<html>
+<body>
+<p>
+ This is <a href='https://godoc.org/golang.org/x/build/maintner/maintnerd'>maintnerd</a>,
+ the <a href='https://godoc.org/golang.org/x/build/maintner'>maintner</a> server.
+ See the <a href='https://godoc.org/golang.org/x/build/maintner/godata'>godata package</a> for
+ a client.
+</p>
+<ul>
+ <li><a href='/logs'>/logs</a>
+</ul>
+</body></html>
+`)
})
errc := make(chan error)
diff --git a/maintner/netsource.go b/maintner/netsource.go
index af1e717..df91149 100644
--- a/maintner/netsource.go
+++ b/maintner/netsource.go
@@ -47,10 +47,10 @@
last []fileSeg
// Hooks for testing. If nil, unused:
- testHookGetServerSegments func(context.Context) ([]LogSegmentJSON, error)
- testHookWaitForServerSegmentUpdate func(context.Context) error
- testHookSyncSeg func(context.Context, LogSegmentJSON) (fileSeg, error)
- testHookFilePrefixSum224 func(file string, n int64) string
+ testHookGetServerSegments func(context.Context, int64) ([]LogSegmentJSON, error)
+ testHookWaitAfterServerDupData func(context.Context) error
+ testHookSyncSeg func(context.Context, LogSegmentJSON) (fileSeg, error)
+ testHookFilePrefixSum224 func(file string, n int64) string
}
func (ns *netMutSource) GetMutations(ctx context.Context) <-chan MutationStreamEvent {
@@ -69,60 +69,54 @@
return ch
}
-func (ns *netMutSource) waitForServerSegmentUpdate(ctx context.Context) error {
- if fn := ns.testHookWaitForServerSegmentUpdate; fn != nil {
- return fn(ctx)
- }
-
- // TODO: few second sleep is dumb. make it
- // subscribe to pubsubhelper? maybe the
- // server's response header should reference
- // its pubsubhelper server URL. but then we
- // can't assume activity means it'll be picked
- // up right away. so maybe wait for activity,
- // and then poll every second for 10 seconds
- // or so, or until there's changes, and then
- // go back to every 2 second polling or
- // something. or maybe the maintnerd server should
- // have its own long poll functionality.
- // for now, just 2 second polling:
- select {
- case <-time.After(2 * time.Second):
- return nil
- case <-ctx.Done():
- return ctx.Err()
- }
-}
-
-func (ns *netMutSource) getServerSegments(ctx context.Context) ([]LogSegmentJSON, error) {
+// waitSizeNot optionally specifies that the request should long-poll waiting for the server
+// to have a sum of log segment sizes different than the value specified.
+func (ns *netMutSource) getServerSegments(ctx context.Context, waitSizeNot int64) ([]LogSegmentJSON, error) {
if fn := ns.testHookGetServerSegments; fn != nil {
- return fn(ctx)
+ return fn(ctx, waitSizeNot)
}
- req, err := http.NewRequest("GET", ns.server, nil)
- if err != nil {
- return nil, err
+ logsURL := ns.server
+ if waitSizeNot > 0 {
+ logsURL += fmt.Sprintf("?waitsizenot=%d", waitSizeNot)
}
- req = req.WithContext(ctx)
- res, err := http.DefaultClient.Do(req)
- if err != nil {
- return nil, err
+ for {
+ req, err := http.NewRequest("GET", logsURL, nil)
+ if err != nil {
+ return nil, err
+ }
+ req = req.WithContext(ctx)
+ res, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return nil, err
+ }
+ // If we're doing a long poll and the server replies
+ // with a 304 response, that means the server is just
+ // heart-beating us and trying to get a response back
+ // within its various deadlines. But we should just
+ // try again.
+ if waitSizeNot > 0 && res.StatusCode == http.StatusNotModified {
+ res.Body.Close()
+ continue
+ }
+ defer res.Body.Close()
+ if res.StatusCode != 200 {
+ return nil, fmt.Errorf("%s: %v", ns.server, res.Status)
+ }
+ var segs []LogSegmentJSON
+ err = json.NewDecoder(res.Body).Decode(&segs)
+ if err != nil {
+ return nil, fmt.Errorf("decoding %s JSON: %v", ns.server, err)
+ }
+ return segs, nil
}
- defer res.Body.Close()
- if res.StatusCode != 200 {
- return nil, fmt.Errorf("%s: %v", ns.server, res.Status)
- }
- var segs []LogSegmentJSON
- err = json.NewDecoder(res.Body).Decode(&segs)
- if err != nil {
- return nil, fmt.Errorf("decoding %s JSON: %v", ns.server, err)
- }
- return segs, nil
}
func (ns *netMutSource) getNewSegments(ctx context.Context) ([]fileSeg, error) {
for {
- segs, err := ns.getServerSegments(ctx)
+ sumLast := sumSegSize(ns.last)
+
+ segs, err := ns.getServerSegments(ctx, sumLast)
if err != nil {
return nil, err
}
@@ -137,16 +131,25 @@
}
fileSegs = append(fileSegs, fileSeg)
}
- sumLast := sumSegSize(ns.last)
sumCommon := ns.sumCommonPrefixSize(fileSegs, ns.last)
if sumLast != sumCommon {
return nil, ErrSplit
}
sumCur := sumSegSize(fileSegs)
if sumCommon == sumCur {
- // Nothing new. Wait.
- if err := ns.waitForServerSegmentUpdate(ctx); err != nil {
- return nil, err
+ // Nothing new. This shouldn't happen once the
+ // server is updated to respect the
+ // "?waitsizenot=NNN" long polling parameter.
+ // But keep this brief pause as a backup to
+ // prevent spinning and because clients &
+ // servers won't be updated simultaneously.
+ if ns.testHookGetServerSegments == nil {
+ log.Printf("maintner.netsource: server returned unchanged log segments; old server?")
+ }
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case <-time.After(1 * time.Second):
}
continue
}
diff --git a/maintner/netsource_test.go b/maintner/netsource_test.go
index 7ac896b..2920cc8 100644
--- a/maintner/netsource_test.go
+++ b/maintner/netsource_test.go
@@ -155,7 +155,6 @@
// If empty, prefixSum calls are errors.
prefixSum string
- wantWaits int
want []fileSeg
wantSplit bool
}
@@ -240,7 +239,6 @@
{Number: 2, Size: 102, SHA224: "def"},
},
},
- wantWaits: 1,
want: []fileSeg{
{seg: 2, size: 102, sha224: "def", skip: 0, file: "/fake/0002.mutlog"},
},
@@ -305,7 +303,7 @@
waits := 0
ns := &netMutSource{
last: tt.lastSegs,
- testHookGetServerSegments: func(context.Context) (segs []LogSegmentJSON, err error) {
+ testHookGetServerSegments: func(_ context.Context, waitSizeNot int64) (segs []LogSegmentJSON, err error) {
serverSegCalls++
if serverSegCalls > 10 {
t.Fatalf("infinite loop calling getServerSegments? num wait calls = %v", waits)
@@ -319,10 +317,6 @@
}
return segs, nil
},
- testHookWaitForServerSegmentUpdate: func(context.Context) error {
- waits++
- return nil
- },
testHookSyncSeg: func(_ context.Context, seg LogSegmentJSON) (fileSeg, error) {
return fileSeg{
seg: seg.Number,
@@ -353,10 +347,6 @@
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("mismatch\n got: %+v\nwant: %+v\n", got, tt.want)
}
- if tt.wantWaits != waits {
- t.Errorf("wait calls = %v; want %v", waits, tt.wantWaits)
- }
-
})
}
}