io.Pipe
assorted underscore cleanup
R=r
DELTA=488 (410 added, 3 deleted, 75 changed)
OCL=25070
CL=25070
diff --git a/src/lib/io/pipe.go b/src/lib/io/pipe.go
new file mode 100644
index 0000000..736f497
--- /dev/null
+++ b/src/lib/io/pipe.go
@@ -0,0 +1,188 @@
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Pipe adapter to connect code expecting an io.Read
+// with code expecting an io.Write.
+
+package io
+
+import (
+ "io";
+ "os";
+ "sync";
+)
+
+type pipeReturn struct { // Result of one Write, passed from the read side back to the writer over pipe.cw.
+ n int; // Byte count that Write returns.
+ err *os.Error; // Error that Write returns; this file sends nil or os.EPIPE.
+}
+
+// State shared by both ends of a pipe; pipeRead and pipeWrite each hold a pointer to it.
+type pipe struct {
+ rclosed bool; // Read end closed? Set by CloseReader.
+ wclosed bool; // Write end closed? Set by CloseWriter.
+ wpend []byte; // Unread remainder of the current write; nil when no write is pending.
+ wtot int; // Bytes consumed so far in current write.
+ cr chan []byte; // Write sends its data slice here; CloseWriter sends nil.
+ cw chan pipeReturn; // Read (or CloseReader) sends the write's n, err back here.
+}
+
+func (p *pipe) Read(data []byte) (n int, err *os.Error) { // Copies up to len(data) bytes of the pending write into data.
+ if p == nil || p.rclosed {
+ return 0, os.EINVAL;
+ }
+
+ // No write block in progress: wait for the next one unless the write end has closed.
+ if p.wpend == nil {
+ if !p.wclosed {
+ p.wpend = <-p.cr; // Receives Write's data, or the nil that CloseWriter sends.
+ }
+ if p.wpend == nil {
+ return 0, nil; // Nothing pending (nil arrives when the write end closes).
+ }
+ p.wtot = 0;
+ }
+
+ // Copy as much of the current write block as fits in data.
+ n = len(data);
+ if n > len(p.wpend) {
+ n = len(p.wpend);
+ }
+ for i := 0; i < n; i++ {
+ data[i] = p.wpend[i];
+ }
+ p.wtot += n;
+ p.wpend = p.wpend[n:len(p.wpend)]; // Keep only the bytes not yet copied out.
+
+ // If write block is done, finish the write by sending the total consumed to the waiting Write.
+ if len(p.wpend) == 0 {
+ p.wpend = nil;
+ p.cw <- pipeReturn(p.wtot, nil);
+ p.wtot = 0;
+ }
+
+ return n, nil;
+}
+
+func (p *pipe) Write(data []byte) (n int, err *os.Error) { // Hands data to the read side and waits for the resulting n, err.
+ if p == nil || p.wclosed {
+ return 0, os.EINVAL;
+ }
+ if p.rclosed {
+ return 0, os.EPIPE;
+ }
+
+ // Send data to reader.
+ p.cr <- data; // NOTE(review): Read treats nil as CloseWriter's sentinel and sends no reply, so Write(nil) appears to wait on cw - confirm callers never pass nil.
+
+ // Wait for Read to consume all of it, or for CloseReader to abort the write with os.EPIPE.
+ res := <-p.cw;
+ return res.n, res.err;
+}
+
+func (p *pipe) CloseReader() *os.Error { // Closes the read end; returns os.EINVAL if p is nil or the read end is already closed.
+ if p == nil || p.rclosed {
+ return os.EINVAL;
+ }
+
+ // Stop any future writes: Write returns os.EPIPE once rclosed is set.
+ p.rclosed = true;
+
+ // Stop the current write by delivering its partial byte count together with os.EPIPE.
+ if !p.wclosed {
+ p.cw <- pipeReturn(p.wtot, os.EPIPE); // Sent whenever the write end is still open, whether or not a Write is waiting.
+ }
+
+ return nil;
+}
+
+func (p *pipe) CloseWriter() *os.Error { // Closes the write end; returns os.EINVAL if p is nil or the write end is already closed.
+ if p == nil || p.wclosed {
+ return os.EINVAL;
+ }
+
+ // Stop any future reads: with wclosed set, Read no longer waits on cr.
+ p.wclosed = true;
+
+ // Stop the current read: a nil slice on cr makes Read return 0, nil.
+ if !p.rclosed {
+ p.cr <- nil;
+ }
+
+ return nil;
+}
+
+// Read/write halves of the pipe.
+// They are separate structures for two reasons:
+// 1. If one end becomes garbage without being Closed,
+// its finisher can Close so that the other end
+// does not hang indefinitely.
+// 2. Clients cannot use interface conversions on the
+// read end to find the Write method, and vice versa.
+
+// Read half of pipe.
+type pipeRead struct {
+ lock sync.Mutex; // Held for the duration of each Read and Close call on this half.
+ p *pipe; // Shared pipe state; assigned in Pipe.
+}
+
+func (r *pipeRead) Read(data []byte) (n int, err *os.Error) { // Calls the shared pipe's Read while holding r.lock.
+ r.lock.Lock();
+ defer r.lock.Unlock(); // Released only after the underlying Read returns.
+
+ return r.p.Read(data);
+}
+
+func (r *pipeRead) Close() *os.Error { // Closes the read end via CloseReader while holding r.lock.
+ r.lock.Lock();
+ defer r.lock.Unlock(); // Same lock as Read, so Close waits for a Read in progress on this half.
+
+ return r.p.CloseReader();
+}
+
+func (r *pipeRead) finish() { // Closes the read end; the error from Close is dropped.
+ r.Close(); // NOTE(review): presumably the finisher hook for an abandoned read end; no caller is visible here.
+}
+
+// Write half of pipe.
+type pipeWrite struct {
+ lock sync.Mutex; // Held for the duration of each Write and Close call on this half.
+ p *pipe; // Shared pipe state; assigned in Pipe.
+}
+
+func (w *pipeWrite) Write(data []byte) (n int, err *os.Error) { // Calls the shared pipe's Write while holding w.lock.
+ w.lock.Lock();
+ defer w.lock.Unlock(); // Released only after the underlying Write returns.
+
+ return w.p.Write(data);
+}
+
+func (w *pipeWrite) Close() *os.Error { // Closes the write end via CloseWriter while holding w.lock.
+ w.lock.Lock();
+ defer w.lock.Unlock(); // Same lock as Write, so Close waits for a Write in progress on this half.
+
+ return w.p.CloseWriter();
+}
+
+func (w *pipeWrite) finish() { // Closes the write end; the error from Close is dropped.
+ w.Close(); // NOTE(review): presumably the finisher hook for an abandoned write end; no caller is visible here.
+}
+
+// Pipe creates a synchronous in-memory pipe and returns its read end and its write end.
+// Reads on one end are matched by writes on the other.
+// A Write does not return until all of its data has been
+// read or the read end is closed. Reads return
+// any available data or block until the next write
+// or the write end is closed.
+func Pipe() (io.ReadClose, io.WriteClose) {
+ p := new(pipe);
+ p.cr = make(chan []byte, 1); // Capacity 1; presumably so CloseWriter's nil can be queued with no Read waiting - confirm.
+ p.cw = make(chan pipeReturn, 1); // Capacity 1; presumably so CloseReader's result can be queued with no Write waiting - confirm.
+ r := new(pipeRead);
+ r.p = p; // Both halves point at the same pipe state.
+ w := new(pipeWrite);
+ w.p = p;
+ return r, w;
+}
+