go.talks/pkg/socket: consolidate writes that occur within a small window
R=golang-dev, kamil.kisiel
CC=golang-dev
https://golang.org/cl/12311043
diff --git a/pkg/socket/socket.go b/pkg/socket/socket.go
index 42a351f..e24d21c 100644
--- a/pkg/socket/socket.go
+++ b/pkg/socket/socket.go
@@ -28,6 +28,8 @@
"runtime"
"strconv"
"strings"
+ "sync"
+ "time"
"unicode/utf8"
"code.google.com/p/go.net/websocket"
@@ -40,7 +42,13 @@
// invoked.
var Environ func() []string = os.Environ
-const msgLimit = 1000 // max number of messages to send per session
+const (
+ // The maximum number of messages to send per session (avoid flooding).
+ msgLimit = 1000
+
+ // Batch messages sent in this interval and send as a single message.
+ msgDelay = 10 * time.Millisecond
+)
// Message is the wire format for the websocket connection to the browser.
// It is used for both sending output messages and receiving commands, as
@@ -223,7 +231,8 @@
if err != nil {
m.Body = err.Error()
}
- p.out <- m
+ // Wait for any outstanding reads to finish (potential race here).
+ time.AfterFunc(msgDelay, func() { p.out <- m })
}
// cmd builds an *exec.Cmd that writes its standard output and error to the
@@ -232,8 +241,8 @@
cmd := exec.Command(args[0], args[1:]...)
cmd.Dir = dir
cmd.Env = Environ()
- cmd.Stdout = &messageWriter{p.id, "stdout", p.out}
- cmd.Stderr = &messageWriter{p.id, "stderr", p.out}
+ cmd.Stdout = &messageWriter{id: p.id, kind: "stdout", out: p.out}
+ cmd.Stderr = &messageWriter{id: p.id, kind: "stderr", out: p.out}
return cmd
}
@@ -251,13 +260,31 @@
type messageWriter struct {
id, kind string
out chan<- *Message
+
+ mu sync.Mutex
+ buf []byte
+ send *time.Timer
}
func (w *messageWriter) Write(b []byte) (n int, err error) {
- w.out <- &Message{Id: w.id, Kind: w.kind, Body: safeString(b)}
+ // Buffer writes that occur in a short period to send as one Message.
+ w.mu.Lock()
+ w.buf = append(w.buf, b...)
+ if w.send == nil {
+ w.send = time.AfterFunc(msgDelay, w.sendNow)
+ }
+ w.mu.Unlock()
return len(b), nil
}
+func (w *messageWriter) sendNow() {
+ w.mu.Lock()
+ body := safeString(w.buf)
+ w.buf, w.send = nil, nil
+ w.mu.Unlock()
+ w.out <- &Message{Id: w.id, Kind: w.kind, Body: body}
+}
+
// safeString returns b as a valid UTF-8 string.
func safeString(b []byte) string {
if utf8.Valid(b) {