netchan: do not block sends; implement flow control.
When data is received for a channel, but that channel
is not ready to receive it, the central run() loop
is currently blocked, but this can lead to deadlock
and interference between independent channels.
This CL adds an explicit buffer size to netchan
channels (an API change) - the sender will not
send values until the buffer is non empty.
The protocol changes to send ids rather than channel names
because acks can still be sent after a channel is hung up,
we we need an identifier that can be ignored.
R=r, rsc
CC=golang-dev
https://golang.org/cl/2447042
diff --git a/src/pkg/netchan/netchan_test.go b/src/pkg/netchan/netchan_test.go
index 766c4c4..2134297 100644
--- a/src/pkg/netchan/netchan_test.go
+++ b/src/pkg/netchan/netchan_test.go
@@ -15,7 +15,7 @@
const base = 23
-func exportSend(exp *Exporter, n int, t *testing.T) {
+func exportSend(exp *Exporter, n int, t *testing.T, done chan bool) {
ch := make(chan int)
err := exp.Export("exportedSend", ch, Send)
if err != nil {
@@ -26,6 +26,9 @@
ch <- base+i
}
close(ch)
+ if done != nil {
+ done <- true
+ }
}()
}
@@ -50,9 +53,9 @@
}
}
-func importSend(imp *Importer, n int, t *testing.T) {
+func importSend(imp *Importer, n int, t *testing.T, done chan bool) {
ch := make(chan int)
- err := imp.ImportNValues("exportedRecv", ch, Send, count)
+ err := imp.ImportNValues("exportedRecv", ch, Send, 3, -1)
if err != nil {
t.Fatal("importSend:", err)
}
@@ -61,12 +64,15 @@
ch <- base+i
}
close(ch)
+ if done != nil {
+ done <- true
+ }
}()
}
func importReceive(imp *Importer, t *testing.T, done chan bool) {
ch := make(chan int)
- err := imp.ImportNValues("exportedSend", ch, Recv, count)
+ err := imp.ImportNValues("exportedSend", ch, Recv, 3, count)
if err != nil {
t.Fatal("importReceive:", err)
}
@@ -78,7 +84,7 @@
}
break
}
- if v != 23+i {
+ if v != base+i {
t.Errorf("importReceive: bad value: expected %d+%d=%d; got %+d", base, i, base+i, v)
}
}
@@ -96,7 +102,7 @@
if err != nil {
t.Fatal("new importer:", err)
}
- exportSend(exp, count, t)
+ exportSend(exp, count, t, nil)
importReceive(imp, t, nil)
}
@@ -116,7 +122,7 @@
done <- true
}()
<-expDone
- importSend(imp, count, t)
+ importSend(imp, count, t, nil)
<-done
}
@@ -129,7 +135,7 @@
if err != nil {
t.Fatal("new importer:", err)
}
- exportSend(exp, closeCount, t)
+ exportSend(exp, closeCount, t, nil)
importReceive(imp, t, nil)
}
@@ -149,7 +155,7 @@
done <- true
}()
<-expDone
- importSend(imp, closeCount, t)
+ importSend(imp, closeCount, t, nil)
<-done
}
@@ -172,7 +178,7 @@
close(ch)
// Now try to import a different channel.
ch = make(chan int)
- err = imp.Import("notAChannel", ch, Recv)
+ err = imp.Import("notAChannel", ch, Recv, 1)
if err != nil {
t.Fatal("import:", err)
}
@@ -204,7 +210,7 @@
}
done := make(chan bool)
go func() {
- exportSend(exp, closeCount, t)
+ exportSend(exp, closeCount, t, nil)
done <- true
}()
<-done
@@ -224,7 +230,7 @@
t.Fatal("new importer:", err)
}
done := make(chan bool)
- exportSend(exp, closeCount, t)
+ exportSend(exp, closeCount, t, nil)
go importReceive(imp, t, done)
exp.Sync(0)
<-done
@@ -248,7 +254,7 @@
}
// Prepare to receive two values. We'll actually deliver only one.
ich := make(chan int)
- err = imp.ImportNValues("exportedSend", ich, Recv, 2)
+ err = imp.ImportNValues("exportedSend", ich, Recv, 1, 2)
if err != nil {
t.Fatal("import exportedSend:", err)
}
@@ -285,7 +291,7 @@
}
// Prepare to Send two values. We'll actually deliver only one.
ich := make(chan int)
- err = imp.ImportNValues("exportedRecv", ich, Send, 2)
+ err = imp.ImportNValues("exportedRecv", ich, Send, 1, 2)
if err != nil {
t.Fatal("import exportedRecv:", err)
}
@@ -304,10 +310,70 @@
}
}
+// loop back exportedRecv to exportedSend,
+// but receive a value from ctlch before starting the loop.
+func exportLoopback(exp *Exporter, t *testing.T) {
+ inch := make(chan int)
+ if err := exp.Export("exportedRecv", inch, Recv); err != nil {
+ t.Fatal("exportRecv")
+ }
+
+ outch := make(chan int)
+ if err := exp.Export("exportedSend", outch, Send); err != nil {
+ t.Fatal("exportSend")
+ }
+
+ ctlch := make(chan int)
+ if err := exp.Export("exportedCtl", ctlch, Recv); err != nil {
+ t.Fatal("exportRecv")
+ }
+
+ go func() {
+ <-ctlch
+ for i := 0; i < count; i++ {
+ x := <-inch
+ if x != base+i {
+ t.Errorf("exportLoopback expected %d; got %d", i, x)
+ }
+ outch <- x
+ }
+ }()
+}
+
+// This test checks that channel operations can proceed
+// even when other concurrent operations are blocked.
+func TestIndependentSends(t *testing.T) {
+ exp, err := NewExporter("tcp", "127.0.0.1:0")
+ if err != nil {
+ t.Fatal("new exporter:", err)
+ }
+ imp, err := NewImporter("tcp", exp.Addr().String())
+ if err != nil {
+ t.Fatal("new importer:", err)
+ }
+
+ exportLoopback(exp, t)
+
+ importSend(imp, count, t, nil)
+ done := make(chan bool)
+ go importReceive(imp, t, done)
+
+ // wait for export side to try to deliver some values.
+ time.Sleep(0.25e9)
+
+ ctlch := make(chan int)
+ if err := imp.ImportNValues("exportedCtl", ctlch, Send, 1, 1); err != nil {
+ t.Fatal("importSend:", err)
+ }
+ ctlch <- 0
+
+ <-done
+}
+
// This test cross-connects a pair of exporter/importer pairs.
type value struct {
- i int
- source string
+ I int
+ Source string
}
func TestCrossConnect(t *testing.T) {
@@ -353,13 +419,13 @@
// Import side of cross-traffic.
func crossImport(i1, i2 *Importer, t *testing.T) {
s := make(chan value)
- err := i2.Import("exportedReceive", s, Send)
+ err := i2.Import("exportedReceive", s, Send, 2)
if err != nil {
t.Fatal("import of exportedReceive:", err)
}
r := make(chan value)
- err = i1.Import("exportedSend", r, Recv)
+ err = i1.Import("exportedSend", r, Recv, 2)
if err != nil {
t.Fatal("import of exported Send:", err)
}
@@ -374,10 +440,76 @@
case s <- value{si, name}:
si++
case v := <-r:
- if v.i != ri {
+ if v.I != ri {
t.Errorf("loop: bad value: expected %d, hello; got %+v", ri, v)
}
ri++
}
}
}
+
+const flowCount = 100
+
+// test flow control from exporter to importer.
+func TestExportFlowControl(t *testing.T) {
+ exp, err := NewExporter("tcp", "127.0.0.1:0")
+ if err != nil {
+ t.Fatal("new exporter:", err)
+ }
+ imp, err := NewImporter("tcp", exp.Addr().String())
+ if err != nil {
+ t.Fatal("new importer:", err)
+ }
+
+ sendDone := make(chan bool, 1)
+ exportSend(exp, flowCount, t, sendDone)
+
+ ch := make(chan int)
+ err = imp.ImportNValues("exportedSend", ch, Recv, 20, -1)
+ if err != nil {
+ t.Fatal("importReceive:", err)
+ }
+
+ testFlow(sendDone, ch, flowCount, t)
+}
+
+// test flow control from importer to exporter.
+func TestImportFlowControl(t *testing.T) {
+ exp, err := NewExporter("tcp", "127.0.0.1:0")
+ if err != nil {
+ t.Fatal("new exporter:", err)
+ }
+ imp, err := NewImporter("tcp", exp.Addr().String())
+ if err != nil {
+ t.Fatal("new importer:", err)
+ }
+
+ ch := make(chan int)
+ err = exp.Export("exportedRecv", ch, Recv)
+ if err != nil {
+ t.Fatal("importReceive:", err)
+ }
+
+ sendDone := make(chan bool, 1)
+ importSend(imp, flowCount, t, sendDone)
+ testFlow(sendDone, ch, flowCount, t)
+}
+
+func testFlow(sendDone chan bool, ch <-chan int, N int, t *testing.T) {
+ go func() {
+ time.Sleep(1e9)
+ sendDone <- false
+ }()
+
+ if <-sendDone {
+ t.Fatal("send did not block")
+ }
+ n := 0
+ for i := range ch {
+ t.Log("after blocking, got value ", i)
+ n++
+ }
+ if n != N {
+ t.Fatalf("expected %d values; got %d", N, n)
+ }
+}