blob: 8f821a9c66a57115412407bec0d5e54e3ffb7853 [file] [log] [blame]
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Pipe adapter to connect code expecting an io.Read
// with code expecting an io.Write.
package io
import (
"os"
"sync"
)
type pipeReturn struct {
n int
err os.Error
}
// Shared pipe structure.
type pipe struct {
rclosed bool // Read end closed?
rerr os.Error // Error supplied to CloseReader
wclosed bool // Write end closed?
werr os.Error // Error supplied to CloseWriter
wpend []byte // Written data waiting to be read.
wtot int // Bytes consumed so far in current write.
cr chan []byte // Write sends data here...
cw chan pipeReturn // ... and reads the n, err back from here.
}
func (p *pipe) Read(data []byte) (n int, err os.Error) {
if p == nil || p.rclosed {
return 0, os.EINVAL
}
// Wait for next write block if necessary.
if p.wpend == nil {
if !p.wclosed {
p.wpend = <-p.cr
}
if p.wpend == nil {
return 0, p.werr
}
p.wtot = 0
}
// Read from current write block.
n = len(data)
if n > len(p.wpend) {
n = len(p.wpend)
}
for i := 0; i < n; i++ {
data[i] = p.wpend[i]
}
p.wtot += n
p.wpend = p.wpend[n:]
// If write block is done, finish the write.
if len(p.wpend) == 0 {
p.wpend = nil
p.cw <- pipeReturn{p.wtot, nil}
p.wtot = 0
}
return n, nil
}
func (p *pipe) Write(data []byte) (n int, err os.Error) {
if p == nil || p.wclosed {
return 0, os.EINVAL
}
if p.rclosed {
return 0, p.rerr
}
// Send data to reader.
p.cr <- data
// Wait for reader to finish copying it.
res := <-p.cw
return res.n, res.err
}
func (p *pipe) CloseReader(rerr os.Error) os.Error {
if p == nil || p.rclosed {
return os.EINVAL
}
// Stop any future writes.
p.rclosed = true
if rerr == nil {
rerr = os.EPIPE
}
p.rerr = rerr
// Stop the current write.
if !p.wclosed {
p.cw <- pipeReturn{p.wtot, rerr}
}
return nil
}
func (p *pipe) CloseWriter(werr os.Error) os.Error {
if werr == nil {
werr = os.EOF
}
if p == nil || p.wclosed {
return os.EINVAL
}
// Stop any future reads.
p.wclosed = true
p.werr = werr
// Stop the current read.
if !p.rclosed {
p.cr <- nil
}
return nil
}
// Read/write halves of the pipe.
// They are separate structures for two reasons:
// 1. If one end becomes garbage without being Closed,
// its finisher can Close so that the other end
// does not hang indefinitely.
// 2. Clients cannot use interface conversions on the
// read end to find the Write method, and vice versa.
// A PipeReader is the read half of a pipe.
type PipeReader struct {
lock sync.Mutex
p *pipe
}
// Read implements the standard Read interface:
// it reads data from the pipe, blocking until a writer
// arrives or the write end is closed.
// If the write end is closed with an error, that error is
// returned as err; otherwise err is nil.
func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
r.lock.Lock()
defer r.lock.Unlock()
return r.p.Read(data)
}
// Close closes the reader; subsequent writes to the
// write half of the pipe will return the error os.EPIPE.
func (r *PipeReader) Close() os.Error {
r.lock.Lock()
defer r.lock.Unlock()
return r.p.CloseReader(nil)
}
// CloseWithError closes the reader; subsequent writes
// to the write half of the pipe will return the error rerr.
func (r *PipeReader) CloseWithError(rerr os.Error) os.Error {
r.lock.Lock()
defer r.lock.Unlock()
return r.p.CloseReader(rerr)
}
func (r *PipeReader) finish() { r.Close() }
// Write half of pipe.
type PipeWriter struct {
lock sync.Mutex
p *pipe
}
// Write implements the standard Write interface:
// it writes data to the pipe, blocking until readers
// have consumed all the data or the read end is closed.
// If the read end is closed with an error, that err is
// returned as err; otherwise err is os.EPIPE.
func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
w.lock.Lock()
defer w.lock.Unlock()
return w.p.Write(data)
}
// Close closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and a nil error.
func (w *PipeWriter) Close() os.Error {
w.lock.Lock()
defer w.lock.Unlock()
return w.p.CloseWriter(nil)
}
// CloseWithError closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and the error werr.
func (w *PipeWriter) CloseWithError(werr os.Error) os.Error {
w.lock.Lock()
defer w.lock.Unlock()
return w.p.CloseWriter(werr)
}
func (w *PipeWriter) finish() { w.Close() }
// Pipe creates a synchronous in-memory pipe.
// It can be used to connect code expecting an io.Reader
// with code expecting an io.Writer.
// Reads on one end are matched with writes on the other,
// copying data directly between the two; there is no internal buffering.
func Pipe() (*PipeReader, *PipeWriter) {
p := new(pipe)
p.cr = make(chan []byte, 1)
p.cw = make(chan pipeReturn, 1)
r := new(PipeReader)
r.p = p
w := new(PipeWriter)
w.p = p
return r, w
}