cmd/coordinator: stream build log output

We no longer need to scrub the logs for the builder key since we don't
ever send the key to the buildlets.

Change-Id: If1017bd1927fa7ecb9d912eebe1f3cce2271b1a8
Reviewed-on: https://go-review.googlesource.com/6426
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/cmd/coordinator/coordinator.go b/cmd/coordinator/coordinator.go
index 24471f5..71c74a5 100644
--- a/cmd/coordinator/coordinator.go
+++ b/cmd/coordinator/coordinator.go
@@ -502,11 +502,21 @@
 	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
 	writeStatusHeader(w, st)
 
-	io.WriteString(w, st.logs())
-	// TODO: if st is still building, stream them to the user with
-	// http.Flusher.Flush and CloseNotifier and registering interest
-	// of new writes with the buildStatus. Will require moving the
-	// BUILDERKEY scrubbing into the Write method.
+	logs := st.watchLogs()
+	defer st.unregisterWatcher(logs)
+	closed := w.(http.CloseNotifier).CloseNotify()
+	for {
+		select {
+		case b, ok := <-logs:
+			if !ok {
+				return
+			}
+			w.Write(b)
+			w.(http.Flusher).Flush()
+		case <-closed:
+			return
+		}
+	}
 }
 
 func handleDebugGoroutines(w http.ResponseWriter, r *http.Request) {
@@ -1132,6 +1142,7 @@
 	succeeded bool         // set when done
 	output    bytes.Buffer // stdout and stderr
 	events    []eventAndTime
+	watcher   []*logWatcher
 }
 
 func (st *buildStatus) setDone(succeeded bool) {
@@ -1139,6 +1150,7 @@
 	defer st.mu.Unlock()
 	st.succeeded = succeeded
 	st.done = time.Now()
+	st.notifyWatchersLocked(true)
 }
 
 func (st *buildStatus) isRunning() bool {
@@ -1223,10 +1235,8 @@
 
 func (st *buildStatus) logs() string {
 	st.mu.Lock()
-	logs := st.output.String()
-	st.mu.Unlock()
-	key := builderKey(st.name)
-	return strings.Replace(string(logs), key, "BUILDERKEY", -1)
+	defer st.mu.Unlock()
+	return st.output.String()
 }
 
 func (st *buildStatus) Write(p []byte) (n int, err error) {
@@ -1238,9 +1248,71 @@
 		p = p[:maxBufferSize-st.output.Len()]
 	}
 	st.output.Write(p) // bytes.Buffer can't fail
+	st.notifyWatchersLocked(false)
 	return plen, nil
 }
 
+// logWatcher holds the state of a client watching the logs of a running build.
+type logWatcher struct {
+	ch     chan []byte
+	offset int // Offset of seen logs (offset == len(buf) means "up to date")
+}
+
+// watchLogs returns a channel on which the build's logs is sent.
+// When the build is complete the channel is closed.
+func (st *buildStatus) watchLogs() <-chan []byte {
+	st.mu.Lock()
+	defer st.mu.Unlock()
+
+	ch := make(chan []byte, 10) // room for a few log writes
+	ch <- st.output.Bytes()
+	if !st.isRunning() {
+		close(ch)
+		return ch
+	}
+
+	st.watcher = append(st.watcher, &logWatcher{
+		ch:     ch,
+		offset: st.output.Len(),
+	})
+	return ch
+}
+
+// unregisterWatcher removes the provided channel from the list of watchers,
+// so that it receives no further log data.
+func (st *buildStatus) unregisterWatcher(ch <-chan []byte) {
+	st.mu.Lock()
+	defer st.mu.Unlock()
+
+	for i, w := range st.watcher {
+		if w.ch == ch {
+			st.watcher = append(st.watcher[:i], st.watcher[i+1:]...)
+			break
+		}
+	}
+}
+
+// notifyWatchersLocked pushes any new log data to watching clients.
+// If done is true it closes any watcher channels.
+//
+// NOTE: st.mu must be held.
+func (st *buildStatus) notifyWatchersLocked(done bool) {
+	l := st.output.Len()
+	for _, w := range st.watcher {
+		if w.offset < l {
+			select {
+			case w.ch <- st.output.Bytes()[w.offset:]:
+				w.offset = l
+			default:
+				// If the receiver isn't ready, drop the write.
+			}
+		}
+		if done {
+			close(w.ch)
+		}
+	}
+}
+
 // Stop any previous go-commit-watcher Docker tasks, so they don't
 // pile up upon restarts of the coordinator.
 func stopWatchers() {