// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Pipe adapter to connect code expecting an io.Reader
// with code expecting an io.Writer.

package io

import (
	"io";
	"os";
	"sync";
)

// pipeReturn is the result handed back to a blocked Write over
// the pipe's cw channel: the values Write returns to its caller.
type pipeReturn struct {
	// Number of bytes of the write that the reader consumed.
	n int;
	// Error for the write; nil when the reader consumed the whole block.
	err os.Error;
}

// Shared pipe structure.
// One pipe is shared by the PipeReader and PipeWriter returned by Pipe.
// A Write sends its slice on cr and then waits on cw; Read copies out
// of that slice and replies on cw once the slice has been drained.
type pipe struct {
	rclosed bool;		// Read end closed?  Set by CloseReader.
	rerr os.Error;		// Error supplied to CloseReader (os.EPIPE if none was given).
	wclosed bool;		// Write end closed?  Set by CloseWriter.
	werr os.Error;		// Error supplied to CloseWriter (os.EOF if none was given).
	wpend []byte;		// Unread remainder of the current write; nil when there is none.
	wtot int;		// Bytes consumed so far in current write.
	cr chan []byte;		// Write sends data here; CloseWriter sends nil...
	cw chan pipeReturn;	// ... and Write reads the n, err back from here.
}

// Read copies bytes from the current write block into data and
// returns the number of bytes copied.
//
// If no block is pending, Read waits on p.cr for the next one, unless
// the write end is already marked closed.  A nil block means there is
// nothing more to read, and Read returns 0 together with p.werr.
// Once a block has been consumed completely, Read reports the total
// number of bytes taken from it on p.cw, which is what lets the
// corresponding Write return.
//
// Read returns os.EINVAL if p is nil or the read end has been closed.
func (p *pipe) Read(data []byte) (n int, err os.Error) {
	if p == nil || p.rclosed {
		return 0, os.EINVAL;
	}

	// Continue with the remainder of a partially read block,
	// or wait for the writer to hand over a fresh one.
	block := p.wpend;
	if block == nil {
		if !p.wclosed {
			block = <-p.cr;
		}
		if block == nil {
			return 0, p.werr;
		}
		p.wtot = 0;
	}

	// Copy as many bytes as both slices allow.
	n = len(block);
	if len(data) < n {
		n = len(data);
	}
	for i := 0; i < n; i++ {
		data[i] = block[i];
	}
	p.wtot += n;

	// Part of the block is still unread: keep it for the next Read
	// and leave the writer waiting.
	rest := block[n:len(block)];
	if len(rest) != 0 {
		p.wpend = rest;
		return n, nil;
	}

	// The block is exhausted: complete the write.
	p.wpend = nil;
	p.cw <- pipeReturn{p.wtot, nil};
	p.wtot = 0;
	return n, nil;
}

// Write hands data to the reader over p.cr and then waits on p.cw
// for the outcome, which it returns as n and err.  The outcome is
// posted by Read once the whole block has been consumed, or by
// CloseReader if the read end is closed first.
//
// Write returns os.EINVAL if p is nil or the write end has been
// closed, and the error recorded by CloseReader if the read end has
// been closed.
func (p *pipe) Write(data []byte) (n int, err os.Error) {
	if p == nil || p.wclosed {
		return 0, os.EINVAL;
	}
	if p.rclosed {
		return 0, p.rerr;
	}

	// A nil slice on p.cr is the value CloseWriter uses to wake a
	// blocked Read.  Read returns on a nil block without replying on
	// p.cw, so forwarding a nil data slice unchanged would leave this
	// Write waiting with nothing to wake it short of CloseReader.
	// Send an empty, non-nil slice instead so that the write is
	// acknowledged like any other zero-length write.
	if data == nil {
		data = make([]byte, 0);
	}

	// Send data to reader.
	p.cr <- data;

	// Wait for reader to finish copying it.
	res := <-p.cw;
	return res.n, res.err;
}

// CloseReader marks the read end closed and records rerr as the
// error that writes will receive from then on; a nil rerr is
// recorded as os.EPIPE.  Unless the write end is already closed,
// it also posts a result on p.cw carrying the number of bytes
// consumed so far and the recorded error, which ends a Write that
// is waiting there.
//
// CloseReader returns os.EINVAL if p is nil or the read end was
// already closed, and nil otherwise.
func (p *pipe) CloseReader(rerr os.Error) os.Error {
	if p == nil || p.rclosed {
		return os.EINVAL;
	}

	// Refuse future writes, with the caller's error or the default.
	p.rclosed = true;
	p.rerr = rerr;
	if p.rerr == nil {
		p.rerr = os.EPIPE;
	}

	// With the write end closed there is no write left to interrupt.
	if p.wclosed {
		return nil;
	}

	// Interrupt the current write.
	p.cw <- pipeReturn{p.wtot, p.rerr};
	return nil;
}

// CloseWriter marks the write end closed and records werr as the
// error that reads will receive once no data is left; a nil werr is
// recorded as os.EOF.  Unless the read end is already closed, it
// also sends a nil block on p.cr, which Read treats as the end of
// the data.
//
// CloseWriter returns os.EINVAL if p is nil or the write end was
// already closed, and nil otherwise.
func (p *pipe) CloseWriter(werr os.Error) os.Error {
	if p == nil || p.wclosed {
		return os.EINVAL;
	}
	if werr == nil {
		werr = os.EOF;
	}

	// Refuse future reads of new data.
	p.wclosed = true;
	p.werr = werr;

	// With the read end closed there is no read left to interrupt.
	if p.rclosed {
		return nil;
	}

	// Interrupt the current read.
	p.cr <- nil;
	return nil;
}

// Read/write halves of the pipe.
// They are separate structures for two reasons:
//  1.  If one end becomes garbage without being Closed,
//      its finisher can Close so that the other end
//      does not hang indefinitely.
//  2.  Clients cannot use interface conversions on the
//      read end to find the Write method, and vice versa.

// A PipeReader is the read half of a pipe.
type PipeReader struct {
	// lock is held for the duration of Read, Close and CloseWithError,
	// so calls on this half run one at a time.
	lock sync.Mutex;
	// p is the pipe state shared with the write half.
	p *pipe;
}

// Read implements the standard Read interface:
// it reads data from the pipe, blocking until a writer
// arrives or the write end is closed.
// Once the write end is closed and no data is left, Read returns
// no bytes and the error the write end was closed with, which is
// os.EOF if it was closed without one.
// Read returns os.EINVAL if the read end has been closed.
// The reader's lock is held for the whole call.
func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
	r.lock.Lock();
	defer r.lock.Unlock();

	n, err = r.p.Read(data);
	return n, err;
}

// Close closes the reader; subsequent writes to the
// write half of the pipe will return the error os.EPIPE.
// It is equivalent to CloseWithError(nil), and returns os.EINVAL
// if the reader was already closed.
func (r *PipeReader) Close() os.Error {
	// CloseWithError takes the reader's lock and substitutes
	// os.EPIPE for the nil error.
	return r.CloseWithError(nil);
}

// CloseWithError closes the reader; subsequent writes
// to the write half of the pipe will return the error rerr,
// or os.EPIPE if rerr is nil.
// It returns os.EINVAL if the reader was already closed.
func (r *PipeReader) CloseWithError(rerr os.Error) os.Error {
	r.lock.Lock();
	defer r.lock.Unlock();

	err := r.p.CloseReader(rerr);
	return err;
}

// finish closes the reader without an error, exactly as Close does,
// and discards the result.
// NOTE(review): presumably this is the hook run when the reader becomes
// garbage without having been closed; confirm how it is invoked.
func (r *PipeReader) finish() {
	r.CloseWithError(nil);
}

// A PipeWriter is the write half of a pipe.
type PipeWriter struct {
	// lock is held for the duration of Write, Close and CloseWithError,
	// so calls on this half run one at a time.
	lock sync.Mutex;
	// p is the pipe state shared with the read half.
	p *pipe;
}

// Write implements the standard Write interface:
// it writes data to the pipe, blocking until readers
// have consumed all the data or the read end is closed.
// If all the data is consumed, err is nil.
// If the read end is closed with an error, that error is
// returned as err; if it is closed without one, err is os.EPIPE.
// Write returns os.EINVAL if the write end has been closed.
// The writer's lock is held for the whole call.
func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
	w.lock.Lock();
	defer w.lock.Unlock();

	n, err = w.p.Write(data);
	return n, err;
}

// Close closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and the error os.EOF.
// It is equivalent to CloseWithError(nil), and returns os.EINVAL
// if the writer was already closed.
func (w *PipeWriter) Close() os.Error {
	// CloseWithError takes the writer's lock and substitutes
	// os.EOF for the nil error.
	return w.CloseWithError(nil);
}

// CloseWithError closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and the error werr,
// or os.EOF if werr is nil.
// It returns os.EINVAL if the writer was already closed.
func (w *PipeWriter) CloseWithError(werr os.Error) os.Error {
	w.lock.Lock();
	defer w.lock.Unlock();

	err := w.p.CloseWriter(werr);
	return err;
}

// finish closes the writer without an error, exactly as Close does,
// and discards the result.
// NOTE(review): presumably this is the hook run when the writer becomes
// garbage without having been closed; confirm how it is invoked.
func (w *PipeWriter) finish() {
	w.CloseWithError(nil);
}

// Pipe creates a synchronous in-memory pipe and returns its two halves.
// It can be used to connect code expecting an io.Reader
// with code expecting an io.Writer.
// Reads on one end are matched with writes on the other, and bytes are
// copied straight from the writer's slice into the reader's; the data
// itself is not buffered internally.
// Both halves share one pipe value whose two channels each have
// a capacity of one.
func Pipe() (*PipeReader, *PipeWriter) {
	// State shared by both halves.
	shared := new(pipe);
	shared.cr = make(chan []byte, 1);
	shared.cw = make(chan pipeReturn, 1);

	rd := new(PipeReader);
	wr := new(PipeWriter);
	rd.p = shared;
	wr.p = shared;
	return rd, wr;
}

