pipe: implementation #3; this time for sure!
Added goroutine; got simpler.

Fixes deadlock when doing Read+Close
or Write+Close on same end.

R=r, cw
CC=golang-dev
https://golang.org/cl/994043
diff --git a/src/pkg/io/pipe.go b/src/pkg/io/pipe.go
index fe70634..79221bd 100644
--- a/src/pkg/io/pipe.go
+++ b/src/pkg/io/pipe.go
@@ -9,114 +9,190 @@
 
 import (
 	"os"
+	"runtime"
 	"sync"
 )
 
+type pipeResult struct {
+	n   int
+	err os.Error
+}
+
 // Shared pipe structure.
 type pipe struct {
-	rclosed bool        // Read end closed?
-	rerr    os.Error    // Error supplied to CloseReader
-	wclosed bool        // Write end closed?
-	werr    os.Error    // Error supplied to CloseWriter
-	wpend   []byte      // Written data waiting to be read.
-	wtot    int         // Bytes consumed so far in current write.
-	cw      chan []byte // Write sends data here...
-	cr      chan bool   // ... and reads a done notification from here.
+	// Reader sends on r1, receives on r2.
+	// Writer does the same on w1, w2.
+	r1, w1 chan []byte
+	r2, w2 chan pipeResult
+
+	rclose chan os.Error // read close; error to return to writers
+	wclose chan os.Error // write close; error to return to readers
+
+	done chan int // read or write half is done
 }
 
-func (p *pipe) Read(data []byte) (n int, err os.Error) {
-	if p.rclosed {
-		return 0, os.EINVAL
-	}
+func (p *pipe) run() {
+	var (
+		rb    []byte      // pending Read
+		wb    []byte      // pending Write
+		wn    int         // amount written so far from wb
+		rerr  os.Error    // if read end is closed, error to send to writers
+		werr  os.Error    // if write end is closed, error to send to readers
+		r1    chan []byte // p.r1 or nil depending on whether Read is ok
+		w1    chan []byte // p.w1 or nil depending on whether Write is ok
+		ndone int
+	)
 
-	// Wait for next write block if necessary.
-	if p.wpend == nil {
-		if !closed(p.cw) {
-			p.wpend = <-p.cw
+	// Read and Write are enabled at the start.
+	r1 = p.r1
+	w1 = p.w1
+
+	for {
+		select {
+		case <-p.done:
+			if ndone++; ndone == 2 {
+				// both reader and writer are gone
+				return
+			}
+			continue
+		case rerr = <-p.rclose:
+			if w1 == nil {
+				// finish pending Write
+				p.w2 <- pipeResult{wn, rerr}
+				wn = 0
+				w1 = p.w1 // allow another Write
+			}
+			if r1 == nil {
+				// Close of read side during Read.
+				// finish pending Read with os.EINVAL.
+				p.r2 <- pipeResult{0, os.EINVAL}
+				r1 = p.r1 // allow another Read
+			}
+			continue
+		case werr = <-p.wclose:
+			if r1 == nil {
+				// finish pending Read
+				p.r2 <- pipeResult{0, werr}
+				r1 = p.r1 // allow another Read
+			}
+			if w1 == nil {
+				// Close of write side during Write.
+				// finish pending Write with os.EINVAL.
+				p.w2 <- pipeResult{wn, os.EINVAL}
+				wn = 0
+				w1 = p.w1 // allow another Write
+			}
+			continue
+		case rb = <-r1:
+			if werr != nil {
+				// write end is closed
+				p.r2 <- pipeResult{0, werr}
+				continue
+			}
+			r1 = nil // disable Read until this one is done
+		case wb = <-w1:
+			if rerr != nil {
+				// read end is closed
+				p.w2 <- pipeResult{0, rerr}
+				continue
+			}
+			w1 = nil // disable Write until this one is done
 		}
-		if closed(p.cw) {
-			return 0, p.werr
+
+		if r1 == nil && w1 == nil {
+			// Have rb and wb.  Execute.
+			n := copy(rb, wb)
+			wn += n
+			wb = wb[n:]
+
+			// Finish Read.
+			p.r2 <- pipeResult{n, nil}
+			r1 = p.r1 // allow another Read
+
+			// Maybe finish Write.
+			if len(wb) == 0 {
+				p.w2 <- pipeResult{wn, nil}
+				wn = 0
+				w1 = p.w1 // allow another Write
+			}
 		}
-		p.wtot = 0
 	}
-
-	// Read from current write block.
-	n = copy(data, p.wpend)
-	p.wtot += n
-	p.wpend = p.wpend[n:]
-
-	// If write block is done, finish the write.
-	if len(p.wpend) == 0 {
-		p.wpend = nil
-		p.cr <- true
-		p.wtot = 0
-	}
-
-	return n, nil
-}
-
-func (p *pipe) Write(data []byte) (n int, err os.Error) {
-	if p.wclosed {
-		return 0, os.EINVAL
-	}
-	if closed(p.cr) {
-		return 0, p.rerr
-	}
-
-	// Send write to reader.
-	p.cw <- data
-
-	// Wait for reader to finish copying it.
-	<-p.cr
-	if closed(p.cr) {
-		_, _ = <-p.cw // undo send if reader is gone
-		return 0, p.rerr
-	}
-	return len(data), nil
-}
-
-func (p *pipe) CloseReader(rerr os.Error) os.Error {
-	if p.rclosed {
-		return os.EINVAL
-	}
-	p.rclosed = true
-
-	// Wake up writes.
-	if rerr == nil {
-		rerr = os.EPIPE
-	}
-	p.rerr = rerr
-	close(p.cr)
-	return nil
-}
-
-func (p *pipe) CloseWriter(werr os.Error) os.Error {
-	if p.wclosed {
-		return os.EINVAL
-	}
-	p.wclosed = true
-
-	// Wake up reads.
-	if werr == nil {
-		werr = os.EOF
-	}
-	p.werr = werr
-	close(p.cw)
-	return nil
 }
 
 // Read/write halves of the pipe.
 // They are separate structures for two reasons:
 //  1.  If one end becomes garbage without being Closed,
-//      its finisher can Close so that the other end
+//      its finalizer can Close so that the other end
 //      does not hang indefinitely.
 //  2.  Clients cannot use interface conversions on the
 //      read end to find the Write method, and vice versa.
 
+type pipeHalf struct {
+	c1     chan []byte
+	c2     chan pipeResult
+	cclose chan os.Error
+	done   chan int
+
+	lock   sync.Mutex
+	closed bool
+
+	io       sync.Mutex
+	ioclosed bool
+}
+
+func (p *pipeHalf) rw(data []byte) (n int, err os.Error) {
+	// Run i/o operation.
+	// Check ioclosed flag under lock to make sure we're still allowed to do i/o.
+	p.io.Lock()
+	defer p.io.Unlock()
+	if p.ioclosed {
+		return 0, os.EINVAL
+	}
+	p.c1 <- data
+	res := <-p.c2
+	return res.n, res.err
+}
+
+func (p *pipeHalf) close(err os.Error) os.Error {
+	// Close pipe half.
+	// Only first call to close does anything.
+	p.lock.Lock()
+	if p.closed {
+		p.lock.Unlock()
+		return os.EINVAL
+	}
+	p.closed = true
+	p.lock.Unlock()
+
+	// First, send the close notification.
+	p.cclose <- err
+
+	// Runner has now finished any rw operation pending on
+	// this half with os.EINVAL.  Cut off future rw operations
+	// by setting ioclosed flag.
+	p.io.Lock()
+	p.ioclosed = true
+	p.io.Unlock()
+
+	// With ioclosed set, there will be no more rw operations
+	// working on the channels.
+	// Tell the runner we won't be bothering it anymore.
+	p.done <- 1
+
+	// Successfully torn down; can disable finalizer.
+	runtime.SetFinalizer(p, nil)
+
+	return nil
+}
+
+func (p *pipeHalf) finalizer() {
+	p.close(os.EINVAL)
+}
+
+
 // A PipeReader is the read half of a pipe.
 type PipeReader struct {
-	lock sync.Mutex
-	p    *pipe
+	pipeHalf
 }
 
 // Read implements the standard Read interface:
@@ -125,36 +201,27 @@
 // If the write end is closed with an error, that error is
 // returned as err; otherwise err is nil.
 func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
-	r.lock.Lock()
-	defer r.lock.Unlock()
-
-	return r.p.Read(data)
+	return r.rw(data)
 }
 
 // Close closes the reader; subsequent writes to the
 // write half of the pipe will return the error os.EPIPE.
 func (r *PipeReader) Close() os.Error {
-	r.lock.Lock()
-	defer r.lock.Unlock()
-
-	return r.p.CloseReader(nil)
+	return r.CloseWithError(nil)
 }
 
 // CloseWithError closes the reader; subsequent writes
-// to the write half of the pipe will return the error rerr.
-func (r *PipeReader) CloseWithError(rerr os.Error) os.Error {
-	r.lock.Lock()
-	defer r.lock.Unlock()
-
-	return r.p.CloseReader(rerr)
+// to the write half of the pipe will return the error err.
+func (r *PipeReader) CloseWithError(err os.Error) os.Error {
+	if err == nil {
+		err = os.EPIPE
+	}
+	return r.close(err)
 }
 
-func (r *PipeReader) finish() { r.Close() }
-
-// Write half of pipe.
+// A PipeWriter is the write half of a pipe.
 type PipeWriter struct {
-	lock sync.Mutex
-	p    *pipe
+	pipeHalf
 }
 
 // Write implements the standard Write interface:
@@ -163,32 +230,24 @@
 // If the read end is closed with an error, that err is
 // returned as err; otherwise err is os.EPIPE.
 func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
-	w.lock.Lock()
-	defer w.lock.Unlock()
-
-	return w.p.Write(data)
+	return w.rw(data)
 }
 
 // Close closes the writer; subsequent reads from the
-// read half of the pipe will return no bytes and a nil error.
+// read half of the pipe will return no bytes and os.EOF.
 func (w *PipeWriter) Close() os.Error {
-	w.lock.Lock()
-	defer w.lock.Unlock()
-
-	return w.p.CloseWriter(nil)
+	return w.CloseWithError(nil)
 }
 
 // CloseWithError closes the writer; subsequent reads from the
-// read half of the pipe will return no bytes and the error werr.
-func (w *PipeWriter) CloseWithError(werr os.Error) os.Error {
-	w.lock.Lock()
-	defer w.lock.Unlock()
-
-	return w.p.CloseWriter(werr)
+// read half of the pipe will return no bytes and the error err.
+func (w *PipeWriter) CloseWithError(err os.Error) os.Error {
+	if err == nil {
+		err = os.EOF
+	}
+	return w.close(err)
 }
 
-func (w *PipeWriter) finish() { w.Close() }
-
 // Pipe creates a synchronous in-memory pipe.
 // It can be used to connect code expecting an io.Reader
 // with code expecting an io.Writer.
@@ -196,8 +255,39 @@
 // copying data directly between the two; there is no internal buffering.
 func Pipe() (*PipeReader, *PipeWriter) {
 	p := &pipe{
-		cw: make(chan []byte, 1),
-		cr: make(chan bool, 1),
+		r1:     make(chan []byte),
+		r2:     make(chan pipeResult),
+		w1:     make(chan []byte),
+		w2:     make(chan pipeResult),
+		rclose: make(chan os.Error),
+		wclose: make(chan os.Error),
+		done:   make(chan int),
 	}
-	return &PipeReader{p: p}, &PipeWriter{p: p}
+	go p.run()
+
+	// NOTE: Cannot use composite literal here:
+	//	pipeHalf{c1: p.r1, c2: p.r2, cclose: p.rclose, done: p.done}
+	// because this implicitly copies the pipeHalf, which copies the inner mutex.
+
+	r := new(PipeReader)
+	r.c1 = p.r1
+	r.c2 = p.r2
+	r.cclose = p.rclose
+	r.done = p.done
+	// TODO(rsc): Should be able to write
+	//	runtime.SetFinalizer(r, (*PipeReader).finalizer)
+	// but 6g doesn't see the finalizer method.
+	runtime.SetFinalizer(&r.pipeHalf, (*pipeHalf).finalizer)
+
+	w := new(PipeWriter)
+	w.c1 = p.w1
+	w.c2 = p.w2
+	w.cclose = p.wclose
+	w.done = p.done
+	// TODO(rsc): Should be able to write
+	//	runtime.SetFinalizer(w, (*PipeWriter).finalizer)
+	// but 6g doesn't see the finalizer method.
+	runtime.SetFinalizer(&w.pipeHalf, (*pipeHalf).finalizer)
+
+	return r, w
 }