io: revised Pipe implementation

* renamed channels to say what gets sent
* use channel closed status instead of racy check of boolean

R=nigeltao_golang
CC=golang-dev
https://golang.org/cl/196065
diff --git a/src/pkg/io/pipe.go b/src/pkg/io/pipe.go
index 909989a..fe70634 100644
--- a/src/pkg/io/pipe.go
+++ b/src/pkg/io/pipe.go
@@ -2,8 +2,8 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-// Pipe adapter to connect code expecting an io.Read
-// with code expecting an io.Write.
+// Pipe adapter to connect code expecting an io.Reader
+// with code expecting an io.Writer.
 
 package io
 
@@ -12,54 +12,43 @@
 	"sync"
 )
 
-type pipeReturn struct {
-	n   int
-	err os.Error
-}
-
 // Shared pipe structure.
 type pipe struct {
-	rclosed bool            // Read end closed?
-	rerr    os.Error        // Error supplied to CloseReader
-	wclosed bool            // Write end closed?
-	werr    os.Error        // Error supplied to CloseWriter
-	wpend   []byte          // Written data waiting to be read.
-	wtot    int             // Bytes consumed so far in current write.
-	cr      chan []byte     // Write sends data here...
-	cw      chan pipeReturn // ... and reads the n, err back from here.
+	rclosed bool        // Read end closed?
+	rerr    os.Error    // Error supplied to CloseReader
+	wclosed bool        // Write end closed?
+	werr    os.Error    // Error supplied to CloseWriter
+	wpend   []byte      // Written data waiting to be read.
+	wtot    int         // Bytes consumed so far in current write.
+	cw      chan []byte // Write sends data here...
+	cr      chan bool   // ... and reads a done notification from here.
 }
 
 func (p *pipe) Read(data []byte) (n int, err os.Error) {
-	if p == nil || p.rclosed {
+	if p.rclosed {
 		return 0, os.EINVAL
 	}
 
 	// Wait for next write block if necessary.
 	if p.wpend == nil {
-		if !p.wclosed {
-			p.wpend = <-p.cr
+		if !closed(p.cw) {
+			p.wpend = <-p.cw
 		}
-		if p.wclosed {
+		if closed(p.cw) {
 			return 0, p.werr
 		}
 		p.wtot = 0
 	}
 
 	// Read from current write block.
-	n = len(data)
-	if n > len(p.wpend) {
-		n = len(p.wpend)
-	}
-	for i := 0; i < n; i++ {
-		data[i] = p.wpend[i]
-	}
+	n = copy(data, p.wpend)
 	p.wtot += n
 	p.wpend = p.wpend[n:]
 
 	// If write block is done, finish the write.
 	if len(p.wpend) == 0 {
 		p.wpend = nil
-		p.cw <- pipeReturn{p.wtot, nil}
+		p.cr <- true
 		p.wtot = 0
 	}
 
@@ -67,58 +56,52 @@
 }
 
 func (p *pipe) Write(data []byte) (n int, err os.Error) {
-	if p == nil || p.wclosed {
+	if p.wclosed {
 		return 0, os.EINVAL
 	}
-	if p.rclosed {
+	if closed(p.cr) {
 		return 0, p.rerr
 	}
 
-	// Send data to reader.
-	p.cr <- data
+	// Send write to reader.
+	p.cw <- data
 
 	// Wait for reader to finish copying it.
-	res := <-p.cw
-	return res.n, res.err
+	<-p.cr
+	if closed(p.cr) {
+		_, _ = <-p.cw // undo send if reader is gone
+		return 0, p.rerr
+	}
+	return len(data), nil
 }
 
 func (p *pipe) CloseReader(rerr os.Error) os.Error {
-	if p == nil || p.rclosed {
+	if p.rclosed {
 		return os.EINVAL
 	}
-
-	// Stop any future writes.
 	p.rclosed = true
+
+	// Wake up writes.
 	if rerr == nil {
 		rerr = os.EPIPE
 	}
 	p.rerr = rerr
-
-	// Stop the current write.
-	if !p.wclosed {
-		p.cw <- pipeReturn{p.wtot, rerr}
-	}
-
+	close(p.cr)
 	return nil
 }
 
 func (p *pipe) CloseWriter(werr os.Error) os.Error {
+	if p.wclosed {
+		return os.EINVAL
+	}
+	p.wclosed = true
+
+	// Wake up reads.
 	if werr == nil {
 		werr = os.EOF
 	}
-	if p == nil || p.wclosed {
-		return os.EINVAL
-	}
-
-	// Stop any future reads.
-	p.wclosed = true
 	p.werr = werr
-
-	// Stop the current read.
-	if !p.rclosed {
-		p.cr <- nil
-	}
-
+	close(p.cw)
 	return nil
 }
 
@@ -212,12 +195,9 @@
 // Reads on one end are matched with writes on the other,
 // copying data directly between the two; there is no internal buffering.
 func Pipe() (*PipeReader, *PipeWriter) {
-	p := new(pipe)
-	p.cr = make(chan []byte, 1)
-	p.cw = make(chan pipeReturn, 1)
-	r := new(PipeReader)
-	r.p = p
-	w := new(PipeWriter)
-	w.p = p
-	return r, w
+	p := &pipe{
+		cw: make(chan []byte, 1),
+		cr: make(chan bool, 1),
+	}
+	return &PipeReader{p: p}, &PipeWriter{p: p}
 }