maintner, maintner/maintnerd: support long polling for log changes

Addresses a TODO in the code and removes the old 2-second poll loop.
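
For illustration (the sizes here are made up), a client exchange with
the new long-poll parameter looks like:

    GET /logs?startseg=0&waitsizenot=1234
    => 304 Not Modified (no size change in ~55s; the client retries)
    => 200 OK, X-Sum-Segment-Size header, JSON list of log segments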

Change-Id: Id698307bd7404e8ca3946fa16621674cca2eca6b
Reviewed-on: https://go-review.googlesource.com/42871
Reviewed-by: Kevin Burke <kev@inburke.com>
diff --git a/maintner/maintnerd/Dockerfile.0 b/maintner/maintnerd/Dockerfile.0
index 783ecf7..76b4cb2 100644
--- a/maintner/maintnerd/Dockerfile.0
+++ b/maintner/maintnerd/Dockerfile.0
@@ -28,9 +28,21 @@
 RUN go get -d cloud.google.com/go/storage && \
     cd /go/src/cloud.google.com/go && git reset --hard 2f1da5d762c81a12c516bfb8a9ede96f42750361
 
-RUN go get -d golang.org/x/crypto/acme/autocert && \
+RUN go get golang.org/x/crypto/acme/autocert && \
     cd /go/src/golang.org/x/crypto && git reset --hard c7af5bf2638a1164f2eb5467c39c6cffbd13a02e
 
+# Optimization to speed up the COPY and go install steps later. This
+# go install isn't required for correctness.
+RUN go install golang.org/x/oauth2 \
+        golang.org/x/net/context \
+        github.com/google/go-github/github \
+        github.com/gregjones/httpcache \
+        go4.org/types \
+        golang.org/x/sync/errgroup \
+        github.com/golang/protobuf/proto \
+        cloud.google.com/go/storage \
+        golang.org/x/crypto/acme/autocert
+
 COPY . /go/src/golang.org/x/build/
 
 RUN go install -ldflags "-linkmode=external -extldflags '-static -pthread'" golang.org/x/build/maintner/maintnerd
diff --git a/maintner/maintnerd/gcslog.go b/maintner/maintnerd/gcslog.go
index fe46002..28d3e79 100644
--- a/maintner/maintnerd/gcslog.go
+++ b/maintner/maintnerd/gcslog.go
@@ -45,6 +45,7 @@
 	bucket     *storage.BucketHandle
 
 	mu         sync.Mutex // guards the following
+	cond       *sync.Cond // broadcast when the log size changes
 	seg        map[int]gcsLogSegment
 	curNum     int
 	logBuf     bytes.Buffer
@@ -67,17 +68,25 @@
 	return fmt.Sprintf("{gcsLogSegment num=%v, size=%v, sha=%v, created=%v}", s.num, s.size, s.sha224, s.created.Format(time.RFC3339))
 }
 
+// newGCSLogBase returns a new gcsLog instance without any association
+// with Google Cloud Storage.
+func newGCSLogBase() *gcsLog {
+	gl := &gcsLog{
+		seg: map[int]gcsLogSegment{},
+	}
+	gl.cond = sync.NewCond(&gl.mu)
+	return gl
+}
+
 func newGCSLog(ctx context.Context, bucketName string) (*gcsLog, error) {
 	sc, err := storage.NewClient(ctx)
 	if err != nil {
 		return nil, fmt.Errorf("storage.NewClient: %v", err)
 	}
-	gl := &gcsLog{
-		sc:         sc,
-		bucketName: bucketName,
-		bucket:     sc.Bucket(bucketName),
-		seg:        map[int]gcsLogSegment{},
-	}
+	gl := newGCSLogBase()
+	gl.sc = sc
+	gl.bucketName = bucketName
+	gl.bucket = sc.Bucket(bucketName)
 	if err := gl.initLoad(ctx); err != nil {
 		return nil, err
 	}
@@ -192,16 +201,95 @@
 
 	startSeg, _ := strconv.Atoi(r.FormValue("startseg"))
 	if startSeg < 0 {
-		http.Error(w, "bad seg", http.StatusBadRequest)
+		http.Error(w, "bad startseg", http.StatusBadRequest)
 		return
 	}
+
+	// Long poll if the request contains a non-zero waitsizenot parameter.
+	// The client's 'waitsizenot' value is the sum of the segment
+	// sizes it already knows about; it's waiting for something new.
+	if s := r.FormValue("waitsizenot"); s != "" {
+		oldSize, err := strconv.ParseInt(s, 10, 64)
+		if err != nil || oldSize < 0 {
+			http.Error(w, "bad waitsizenot", http.StatusBadRequest)
+			return
+		}
+		// Return a 304 if there's no activity in just under a minute.
+		// This keeps some occasional activity on the TCP connection
+		// so we (and any proxies) know it's alive, and can fit
+		// within reasonable read/write deadlines on either side.
+		ctx, cancel := context.WithTimeout(r.Context(), 55*time.Second)
+		defer cancel()
+		changed := gl.waitSizeNot(ctx, oldSize)
+		if !changed {
+			w.WriteHeader(http.StatusNotModified)
+			return
+		}
+	}
+
 	segs := gl.getJSONLogs(startSeg)
 
 	w.Header().Set("Content-Type", "application/json")
+	w.Header().Set("X-Sum-Segment-Size", fmt.Sprint(sumSegmentSizes(segs)))
+
 	body, _ := json.MarshalIndent(segs, "", "\t")
 	w.Write(body)
 }
 
+// sumSegmentSizes returns the sum of each seg.Size in segs.
+func sumSegmentSizes(segs []maintner.LogSegmentJSON) (sum int64) {
+	for _, seg := range segs {
+		sum += seg.Size
+	}
+	return sum
+}
+
+// waitSizeNot blocks until the total log size is not v, or the context expires.
+// It reports whether the size changed.
+func (gl *gcsLog) waitSizeNot(ctx context.Context, v int64) (changed bool) {
+	returned := make(chan struct{})
+	defer close(returned)
+	go gl.waitSizeNotAwaitContextOrChange(ctx, returned)
+	gl.mu.Lock()
+	defer gl.mu.Unlock()
+	for {
+		if curSize := gl.sumSizeLocked(); curSize != v {
+			log.Printf("waitSize fired. from %d => %d", v, curSize)
+			return true
+		}
+		select {
+		case <-ctx.Done():
+			return false
+		default:
+			gl.cond.Wait()
+		}
+	}
+}
+
+// waitSizeNotAwaitContextOrChange is part of waitSizeNot.
+// It's a goroutine that selects on two channels and calls
+// sync.Cond.Broadcast to wake up the waitSizeNot waiter if the
+// context expires.
+func (gl *gcsLog) waitSizeNotAwaitContextOrChange(ctx context.Context, returned <-chan struct{}) {
+	select {
+	case <-ctx.Done():
+		gl.cond.Broadcast()
+	case <-returned:
+		// No need to do a wakeup. Caller is already gone.
+	}
+}
+
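+// sumSizeLocked returns the total log size: the sizes of all
+// segments except the current one, plus the bytes in logBuf
+// (which holds the current segment's data). gl.mu must be held.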
+func (gl *gcsLog) sumSizeLocked() int64 {
+	var sum int64
+	for n, seg := range gl.seg {
+		if n != gl.curNum {
+			sum += seg.size
+		}
+	}
+	sum += int64(gl.logBuf.Len())
+	return sum
+}
+
 func (gl *gcsLog) getJSONLogs(startSeg int) (segs []maintner.LogSegmentJSON) {
 	gl.mu.Lock()
 	defer gl.mu.Unlock()
@@ -272,6 +360,7 @@
 	if err := reclog.WriteRecord(gcsLogWriter{gl}, int64(gl.logBuf.Len()), data); err != nil {
 		return err
 	}
+	gl.cond.Broadcast() // wake any long-polling subscribers
 
 	// Otherwise schedule a periodic flush.
 	if gl.flushTimer == nil {
diff --git a/maintner/maintnerd/gcslog_test.go b/maintner/maintnerd/gcslog_test.go
new file mode 100644
index 0000000..209ca61
--- /dev/null
+++ b/maintner/maintnerd/gcslog_test.go
@@ -0,0 +1,44 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"golang.org/x/build/maintner/maintpb"
+)
+
+func TestGCSLogWakeup_Timeout(t *testing.T) {
+	testGCSLogWakeup(t, false)
+}
+
+func TestGCSLogWakeup_Activity(t *testing.T) {
+	testGCSLogWakeup(t, true)
+}
+
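+// testGCSLogWakeup checks that waitSizeNot reports true when a
+// mutation is logged and false when the context times out first.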
+func testGCSLogWakeup(t *testing.T, activity bool) {
+	gl := newGCSLogBase()
+	waitc := make(chan bool, 1)
+	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer cancel()
+	go func() {
+		waitc <- gl.waitSizeNot(ctx, 0)
+	}()
+	if activity {
+		if err := gl.Log(new(maintpb.Mutation)); err != nil {
+			t.Fatal(err)
+		}
+	}
+	select {
+	case got := <-waitc:
+		if got != activity {
+			t.Errorf("changed = %v; want %v", got, activity)
+		}
+	case <-time.After(2 * time.Second):
+		t.Errorf("timeout")
+	}
+}
diff --git a/maintner/maintnerd/maintnerd.go b/maintner/maintnerd/maintnerd.go
index 90b7ce7..1081ca3 100644
--- a/maintner/maintnerd/maintnerd.go
+++ b/maintner/maintnerd/maintnerd.go
@@ -186,7 +186,19 @@
 			http.NotFound(w, r)
 			return
 		}
-		io.WriteString(w, "<html><body>This is <a href='https://godoc.org/golang.org/x/build/maintner/maintnerd'>maintnerd</a>, the <a href='https://godoc.org/golang.org/x/build/maintner'>maintner</a> server.</body>")
+		io.WriteString(w, `<html>
+<body>
+<p>
+  This is <a href='https://godoc.org/golang.org/x/build/maintner/maintnerd'>maintnerd</a>,
+  the <a href='https://godoc.org/golang.org/x/build/maintner'>maintner</a> server.
+  See the <a href='https://godoc.org/golang.org/x/build/maintner/godata'>godata package</a> for
+  a client.
+</p>
+<ul>
+   <li><a href='/logs'>/logs</a>
+</ul>
+</body></html>
+`)
 	})
 
 	errc := make(chan error)
diff --git a/maintner/netsource.go b/maintner/netsource.go
index af1e717..df91149 100644
--- a/maintner/netsource.go
+++ b/maintner/netsource.go
@@ -47,10 +47,10 @@
 	last []fileSeg
 
 	// Hooks for testing. If nil, unused:
-	testHookGetServerSegments          func(context.Context) ([]LogSegmentJSON, error)
-	testHookWaitForServerSegmentUpdate func(context.Context) error
-	testHookSyncSeg                    func(context.Context, LogSegmentJSON) (fileSeg, error)
-	testHookFilePrefixSum224           func(file string, n int64) string
+	testHookGetServerSegments      func(context.Context, int64) ([]LogSegmentJSON, error)
+	testHookWaitAfterServerDupData func(context.Context) error
+	testHookSyncSeg                func(context.Context, LogSegmentJSON) (fileSeg, error)
+	testHookFilePrefixSum224       func(file string, n int64) string
 }
 
 func (ns *netMutSource) GetMutations(ctx context.Context) <-chan MutationStreamEvent {
@@ -69,60 +69,54 @@
 	return ch
 }
 
-func (ns *netMutSource) waitForServerSegmentUpdate(ctx context.Context) error {
-	if fn := ns.testHookWaitForServerSegmentUpdate; fn != nil {
-		return fn(ctx)
-	}
-
-	// TODO: few second sleep is dumb. make it
-	// subscribe to pubsubhelper? maybe the
-	// server's response header should reference
-	// its pubsubhelper server URL. but then we
-	// can't assume activity means it'll be picked
-	// up right away. so maybe wait for activity,
-	// and then poll every second for 10 seconds
-	// or so, or until there's changes, and then
-	// go back to every 2 second polling or
-	// something. or maybe the maintnerd server should
-	// have its own long poll functionality.
-	// for now, just 2 second polling:
-	select {
-	case <-time.After(2 * time.Second):
-		return nil
-	case <-ctx.Done():
-		return ctx.Err()
-	}
-}
-
-func (ns *netMutSource) getServerSegments(ctx context.Context) ([]LogSegmentJSON, error) {
+// waitSizeNot optionally specifies that the request should long poll, waiting for the
+// server to have a sum of log segment sizes different from the value specified.
+func (ns *netMutSource) getServerSegments(ctx context.Context, waitSizeNot int64) ([]LogSegmentJSON, error) {
 	if fn := ns.testHookGetServerSegments; fn != nil {
-		return fn(ctx)
+		return fn(ctx, waitSizeNot)
 	}
 
-	req, err := http.NewRequest("GET", ns.server, nil)
-	if err != nil {
-		return nil, err
+	logsURL := ns.server
+	if waitSizeNot > 0 {
+		logsURL += fmt.Sprintf("?waitsizenot=%d", waitSizeNot)
 	}
-	req = req.WithContext(ctx)
-	res, err := http.DefaultClient.Do(req)
-	if err != nil {
-		return nil, err
+	for {
+		req, err := http.NewRequest("GET", logsURL, nil)
+		if err != nil {
+			return nil, err
+		}
+		req = req.WithContext(ctx)
+		res, err := http.DefaultClient.Do(req)
+		if err != nil {
+			return nil, err
+		}
+		// If we're doing a long poll and the server replies
+		// with a 304 response, the server is just heart-beating
+		// us and trying to get a response back within its
+		// various deadlines. Just loop and try again.
+		if waitSizeNot > 0 && res.StatusCode == http.StatusNotModified {
+			res.Body.Close()
+			continue
+		}
+		defer res.Body.Close()
+		if res.StatusCode != 200 {
+			return nil, fmt.Errorf("%s: %v", ns.server, res.Status)
+		}
+		var segs []LogSegmentJSON
+		err = json.NewDecoder(res.Body).Decode(&segs)
+		if err != nil {
+			return nil, fmt.Errorf("decoding %s JSON: %v", ns.server, err)
+		}
+		return segs, nil
 	}
-	defer res.Body.Close()
-	if res.StatusCode != 200 {
-		return nil, fmt.Errorf("%s: %v", ns.server, res.Status)
-	}
-	var segs []LogSegmentJSON
-	err = json.NewDecoder(res.Body).Decode(&segs)
-	if err != nil {
-		return nil, fmt.Errorf("decoding %s JSON: %v", ns.server, err)
-	}
-	return segs, nil
 }
 
 func (ns *netMutSource) getNewSegments(ctx context.Context) ([]fileSeg, error) {
 	for {
-		segs, err := ns.getServerSegments(ctx)
+		sumLast := sumSegSize(ns.last)
+
+		segs, err := ns.getServerSegments(ctx, sumLast)
 		if err != nil {
 			return nil, err
 		}
@@ -137,16 +131,25 @@
 			}
 			fileSegs = append(fileSegs, fileSeg)
 		}
-		sumLast := sumSegSize(ns.last)
 		sumCommon := ns.sumCommonPrefixSize(fileSegs, ns.last)
 		if sumLast != sumCommon {
 			return nil, ErrSplit
 		}
 		sumCur := sumSegSize(fileSegs)
 		if sumCommon == sumCur {
-			// Nothing new. Wait.
-			if err := ns.waitForServerSegmentUpdate(ctx); err != nil {
-				return nil, err
+			// Nothing new. This shouldn't happen once the
+			// server is updated to respect the
+			// "?waitsizenot=NNN" long polling parameter.
+			// But keep this brief pause as a backup to
+			// prevent spinning and because clients &
+			// servers won't be updated simultaneously.
+			if ns.testHookGetServerSegments == nil {
+				log.Printf("maintner.netsource: server returned unchanged log segments; old server?")
+			}
+			select {
+			case <-ctx.Done():
+				return nil, ctx.Err()
+			case <-time.After(1 * time.Second):
 			}
 			continue
 		}
diff --git a/maintner/netsource_test.go b/maintner/netsource_test.go
index 7ac896b..2920cc8 100644
--- a/maintner/netsource_test.go
+++ b/maintner/netsource_test.go
@@ -155,7 +155,6 @@
 		// If empty, prefixSum calls are errors.
 		prefixSum string
 
-		wantWaits int
 		want      []fileSeg
 		wantSplit bool
 	}
@@ -240,7 +239,6 @@
 					{Number: 2, Size: 102, SHA224: "def"},
 				},
 			},
-			wantWaits: 1,
 			want: []fileSeg{
 				{seg: 2, size: 102, sha224: "def", skip: 0, file: "/fake/0002.mutlog"},
 			},
@@ -305,7 +303,7 @@
 			waits := 0
 			ns := &netMutSource{
 				last: tt.lastSegs,
-				testHookGetServerSegments: func(context.Context) (segs []LogSegmentJSON, err error) {
+				testHookGetServerSegments: func(_ context.Context, waitSizeNot int64) (segs []LogSegmentJSON, err error) {
 					serverSegCalls++
 					if serverSegCalls > 10 {
 						t.Fatalf("infinite loop calling getServerSegments? num wait calls = %v", waits)
@@ -319,10 +317,6 @@
 					}
 					return segs, nil
 				},
-				testHookWaitForServerSegmentUpdate: func(context.Context) error {
-					waits++
-					return nil
-				},
 				testHookSyncSeg: func(_ context.Context, seg LogSegmentJSON) (fileSeg, error) {
 					return fileSeg{
 						seg:    seg.Number,
@@ -353,10 +347,6 @@
 			if !reflect.DeepEqual(got, tt.want) {
 				t.Errorf("mismatch\n got: %+v\nwant: %+v\n", got, tt.want)
 			}
-			if tt.wantWaits != waits {
-				t.Errorf("wait calls = %v; want %v", waits, tt.wantWaits)
-			}
-
 		})
 	}
 }