cmd/coordinator: stream build log output
We no longer need to scrub the logs for the builder key since we don't
ever send the key to the buildlets.
Change-Id: If1017bd1927fa7ecb9d912eebe1f3cce2271b1a8
Reviewed-on: https://go-review.googlesource.com/6426
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/cmd/coordinator/coordinator.go b/cmd/coordinator/coordinator.go
index 24471f5..71c74a5 100644
--- a/cmd/coordinator/coordinator.go
+++ b/cmd/coordinator/coordinator.go
@@ -502,11 +502,21 @@
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
writeStatusHeader(w, st)
- io.WriteString(w, st.logs())
- // TODO: if st is still building, stream them to the user with
- // http.Flusher.Flush and CloseNotifier and registering interest
- // of new writes with the buildStatus. Will require moving the
- // BUILDERKEY scrubbing into the Write method.
+ logs := st.watchLogs()
+ defer st.unregisterWatcher(logs)
+ closed := w.(http.CloseNotifier).CloseNotify()
+ for {
+ select {
+ case b, ok := <-logs:
+ if !ok {
+ return
+ }
+ w.Write(b)
+ w.(http.Flusher).Flush()
+ case <-closed:
+ return
+ }
+ }
}
func handleDebugGoroutines(w http.ResponseWriter, r *http.Request) {
@@ -1132,6 +1142,7 @@
succeeded bool // set when done
output bytes.Buffer // stdout and stderr
events []eventAndTime
+ watcher []*logWatcher
}
func (st *buildStatus) setDone(succeeded bool) {
@@ -1139,6 +1150,7 @@
defer st.mu.Unlock()
st.succeeded = succeeded
st.done = time.Now()
+ st.notifyWatchersLocked(true)
}
func (st *buildStatus) isRunning() bool {
@@ -1223,10 +1235,8 @@
func (st *buildStatus) logs() string {
st.mu.Lock()
- logs := st.output.String()
- st.mu.Unlock()
- key := builderKey(st.name)
- return strings.Replace(string(logs), key, "BUILDERKEY", -1)
+ defer st.mu.Unlock()
+ return st.output.String()
}
func (st *buildStatus) Write(p []byte) (n int, err error) {
@@ -1238,9 +1248,71 @@
p = p[:maxBufferSize-st.output.Len()]
}
st.output.Write(p) // bytes.Buffer can't fail
+ st.notifyWatchersLocked(false)
return plen, nil
}
+// logWatcher holds the state of a client watching the logs of a running build.
+type logWatcher struct {
+ ch chan []byte
+ offset int // Offset of seen logs (offset == len(buf) means "up to date")
+}
+
+// watchLogs returns a channel on which the build's logs is sent.
+// When the build is complete the channel is closed.
+func (st *buildStatus) watchLogs() <-chan []byte {
+ st.mu.Lock()
+ defer st.mu.Unlock()
+
+ ch := make(chan []byte, 10) // room for a few log writes
+ ch <- st.output.Bytes()
+ if !st.isRunning() {
+ close(ch)
+ return ch
+ }
+
+ st.watcher = append(st.watcher, &logWatcher{
+ ch: ch,
+ offset: st.output.Len(),
+ })
+ return ch
+}
+
+// unregisterWatcher removes the provided channel from the list of watchers,
+// so that it receives no further log data.
+func (st *buildStatus) unregisterWatcher(ch <-chan []byte) {
+ st.mu.Lock()
+ defer st.mu.Unlock()
+
+ for i, w := range st.watcher {
+ if w.ch == ch {
+ st.watcher = append(st.watcher[:i], st.watcher[i+1:]...)
+ break
+ }
+ }
+}
+
+// notifyWatchersLocked pushes any new log data to watching clients.
+// If done is true it closes any watcher channels.
+//
+// NOTE: st.mu must be held.
+func (st *buildStatus) notifyWatchersLocked(done bool) {
+ l := st.output.Len()
+ for _, w := range st.watcher {
+ if w.offset < l {
+ select {
+ case w.ch <- st.output.Bytes()[w.offset:]:
+ w.offset = l
+ default:
+ // If the receiver isn't ready, drop the write.
+ }
+ }
+ if done {
+ close(w.ch)
+ }
+ }
+}
+
// Stop any previous go-commit-watcher Docker tasks, so they don't
// pile up upon restarts of the coordinator.
func stopWatchers() {