blob: 00be8efa2e2058908ad29dc0171fc4d5af6f4536 [file] [log] [blame]
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Pipe adapter to connect code expecting an io.Reader
// with code expecting an io.Writer.
package io
import (
"os"
"sync"
)
type pipeResult struct {
n int
err os.Error
}
// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
type pipe struct {
rl sync.Mutex // gates readers one at a time
wl sync.Mutex // gates writers one at a time
l sync.Mutex // protects remaining fields
data []byte // data remaining in pending write
rwait sync.Cond // waiting reader
wwait sync.Cond // waiting writer
rerr os.Error // if reader closed, error to give writes
werr os.Error // if writer closed, error to give reads
}
func (p *pipe) read(b []byte) (n int, err os.Error) {
// One reader at a time.
p.rl.Lock()
defer p.rl.Unlock()
p.l.Lock()
defer p.l.Unlock()
for {
if p.rerr != nil {
return 0, os.EINVAL
}
if p.data != nil {
break
}
if p.werr != nil {
return 0, p.werr
}
p.rwait.Wait()
}
n = copy(b, p.data)
p.data = p.data[n:]
if len(p.data) == 0 {
p.data = nil
p.wwait.Signal()
}
return
}
var zero [0]byte
func (p *pipe) write(b []byte) (n int, err os.Error) {
// pipe uses nil to mean not available
if b == nil {
b = zero[:]
}
// One writer at a time.
p.wl.Lock()
defer p.wl.Unlock()
p.l.Lock()
defer p.l.Unlock()
p.data = b
p.rwait.Signal()
for {
if p.data == nil {
break
}
if p.rerr != nil {
err = p.rerr
break
}
if p.werr != nil {
err = os.EINVAL
}
p.wwait.Wait()
}
n = len(b) - len(p.data)
p.data = nil // in case of rerr or werr
return
}
func (p *pipe) rclose(err os.Error) {
if err == nil {
err = os.EPIPE
}
p.l.Lock()
defer p.l.Unlock()
p.rerr = err
p.rwait.Signal()
p.wwait.Signal()
}
func (p *pipe) wclose(err os.Error) {
if err == nil {
err = os.EOF
}
p.l.Lock()
defer p.l.Unlock()
p.werr = err
p.rwait.Signal()
p.wwait.Signal()
}
// A PipeReader is the read half of a pipe.
type PipeReader struct {
p *pipe
}
// Read implements the standard Read interface:
// it reads data from the pipe, blocking until a writer
// arrives or the write end is closed.
// If the write end is closed with an error, that error is
// returned as err; otherwise err is os.EOF.
func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
return r.p.read(data)
}
// Close closes the reader; subsequent writes to the
// write half of the pipe will return the error os.EPIPE.
func (r *PipeReader) Close() os.Error {
return r.CloseWithError(nil)
}
// CloseWithError closes the reader; subsequent writes
// to the write half of the pipe will return the error err.
func (r *PipeReader) CloseWithError(err os.Error) os.Error {
r.p.rclose(err)
return nil
}
// A PipeWriter is the write half of a pipe.
type PipeWriter struct {
p *pipe
}
// Write implements the standard Write interface:
// it writes data to the pipe, blocking until readers
// have consumed all the data or the read end is closed.
// If the read end is closed with an error, that err is
// returned as err; otherwise err is os.EPIPE.
func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
return w.p.write(data)
}
// Close closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and os.EOF.
func (w *PipeWriter) Close() os.Error {
return w.CloseWithError(nil)
}
// CloseWithError closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and the error err.
func (w *PipeWriter) CloseWithError(err os.Error) os.Error {
w.p.wclose(err)
return nil
}
// Pipe creates a synchronous in-memory pipe.
// It can be used to connect code expecting an io.Reader
// with code expecting an io.Writer.
// Reads on one end are matched with writes on the other,
// copying data directly between the two; there is no internal buffering.
func Pipe() (*PipeReader, *PipeWriter) {
p := new(pipe)
p.rwait.L = &p.l
p.wwait.L = &p.l
r := &PipeReader{p}
w := &PipeWriter{p}
return r, w
}