// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Pipe adapter to connect code expecting an io.Reader
// with code expecting an io.Writer.

package io

import (
	"os"
	"sync"
)

// pipeReturn is the result of one Write call. The read side
// (Read or CloseReader) sends it on pipe.cw to the waiting Write,
// which returns its fields to the caller.
type pipeReturn struct {
	n   int      // Bytes of the write consumed by the reader.
	err os.Error // Error for Write to return; nil when Read finished the block.
}

// pipe is the structure shared by the read and write halves.
// Its methods take no lock themselves; PipeReader and PipeWriter
// each hold their own mutex around the calls they forward.
type pipe struct {
	rclosed bool            // Read end closed?
	rerr    os.Error        // Error supplied to CloseReader (os.EPIPE if none was given).
	wclosed bool            // Write end closed?
	werr    os.Error        // Error supplied to CloseWriter (os.EOF if none was given).
	wpend   []byte          // Written data waiting to be read; nil when no block is pending.
	wtot    int             // Bytes consumed so far in current write.
	cr      chan []byte     // Write sends data here; CloseWriter sends nil.
	cw      chan pipeReturn // Read and CloseReader send the write's n, err back here.
}

// Read copies up to len(data) bytes of the current write block into data
// and returns the number of bytes copied.
//
// If no block is pending, Read first waits on p.cr for the next one,
// unless the write end is already closed. Once a block has been fully
// consumed, Read reports the total back to the waiting Write on p.cw.
//
// It returns os.EINVAL if p is nil or the read end is closed, and
// (0, p.werr) when there is no more data because the write end closed.
func (p *pipe) Read(data []byte) (n int, err os.Error) {
	if p == nil || p.rclosed {
		return 0, os.EINVAL
	}

	// Wait for next write block if necessary.
	if p.wpend == nil {
		if !p.wclosed {
			p.wpend = <-p.cr
		}
		// Still nil: either the write end was already closed, or the
		// value received was the nil that CloseWriter sends on p.cr.
		if p.wpend == nil {
			return 0, p.werr
		}
		p.wtot = 0
	}

	// Read from current write block. copy transfers
	// min(len(data), len(p.wpend)) bytes and returns that count.
	n = copy(data, p.wpend)
	p.wtot += n
	p.wpend = p.wpend[n:]

	// If write block is done, finish the write.
	if len(p.wpend) == 0 {
		p.wpend = nil
		p.cw <- pipeReturn{p.wtot, nil}
		p.wtot = 0
	}

	return n, nil
}

// Write hands data to the reader over p.cr and then blocks on p.cw
// until the read side reports how much of it was consumed.
//
// It returns os.EINVAL if p is nil or the write end is closed, and
// (0, p.rerr) if the read end is already closed. Otherwise it returns
// the byte count and error sent back by Read or CloseReader.
func (p *pipe) Write(data []byte) (n int, err os.Error) {
	if p == nil || p.wclosed {
		return 0, os.EINVAL
	}
	if p.rclosed {
		return 0, p.rerr
	}

	// A nil slice on p.cr is what CloseWriter sends to mean "no more
	// data": Read returns on it without replying on p.cw, which would
	// leave this Write blocked forever. Substitute an empty, non-nil
	// slice so a nil write is handled like any other zero-length write.
	if data == nil {
		data = make([]byte, 0)
	}

	// Send data to reader.
	p.cr <- data

	// Wait for reader to finish copying it.
	res := <-p.cw
	return res.n, res.err
}

// CloseReader marks the read end closed and records rerr as the error
// that writes will receive; a nil rerr is recorded as os.EPIPE.
// Unless the write end is already closed, it also sends the bytes
// consumed so far together with that error on p.cw, which ends a
// Write that is waiting there.
// It returns os.EINVAL if p is nil or the read end is already closed.
func (p *pipe) CloseReader(rerr os.Error) os.Error {
	if p == nil || p.rclosed {
		return os.EINVAL
	}

	// Writers always get a non-nil error once the reader is gone.
	if rerr == nil {
		rerr = os.EPIPE
	}

	// Refuse future writes.
	p.rclosed = true
	p.rerr = rerr

	// Nothing to interrupt if the writer has already closed.
	if p.wclosed {
		return nil
	}

	// Report the partial count and the error to the current write.
	p.cw <- pipeReturn{n: p.wtot, err: rerr}
	return nil
}

// CloseWriter marks the write end closed and records werr as the error
// that reads will receive once no data is pending; a nil werr is
// recorded as os.EOF.
// Unless the read end is already closed, it also sends nil on p.cr,
// which is the value Read interprets as "no more data".
// It returns os.EINVAL if p is nil or the write end is already closed.
func (p *pipe) CloseWriter(werr os.Error) os.Error {
	if p == nil || p.wclosed {
		return os.EINVAL
	}

	// A plain close looks like end of file to the reader.
	if werr == nil {
		werr = os.EOF
	}

	// Refuse future reads of new data.
	p.wclosed, p.werr = true, werr

	// Nobody to wake if the reader has already closed.
	if p.rclosed {
		return nil
	}

	// Wake the current read.
	p.cr <- nil
	return nil
}

// Read/write halves of the pipe.
// They are separate structures for two reasons:
//  1.  If one end becomes garbage without being Closed,
//      its finisher can Close so that the other end
//      does not hang indefinitely.
//  2.  Clients cannot use interface conversions on the
//      read end to find the Write method, and vice versa.

// A PipeReader is the read half of a pipe.
type PipeReader struct {
	lock sync.Mutex // Held for the duration of each Read, Close and CloseWithError call.
	p    *pipe      // Pipe state shared with the PipeWriter created by the same Pipe call.
}

// Read implements the standard Read interface:
// it reads data from the pipe, blocking until a writer
// arrives or the write end is closed.
// If the write end is closed with an error, that error is
// returned as err; if it is closed without one, err is os.EOF.
// Read on a reader that has been closed returns os.EINVAL.
func (r *PipeReader) Read(data []byte) (n int, err os.Error) {
	r.lock.Lock()
	defer r.lock.Unlock()

	return r.p.Read(data)
}

// Close closes the read half of the pipe. Writes made to the
// write half afterwards fail with the error os.EPIPE.
// Closing a reader that is already closed returns os.EINVAL.
func (r *PipeReader) Close() os.Error {
	r.lock.Lock()
	defer r.lock.Unlock()

	// A nil error makes CloseReader fall back to os.EPIPE.
	err := r.p.CloseReader(nil)
	return err
}

// CloseWithError closes the read half of the pipe. Writes made to
// the write half afterwards fail with the error rerr, or with
// os.EPIPE if rerr is nil.
// Closing a reader that is already closed returns os.EINVAL.
func (r *PipeReader) CloseWithError(rerr os.Error) os.Error {
	r.lock.Lock()
	defer r.lock.Unlock()

	err := r.p.CloseReader(rerr)
	return err
}

// finish closes the reader and discards the result of Close.
func (r *PipeReader) finish() {
	r.Close()
}

// A PipeWriter is the write half of a pipe.
type PipeWriter struct {
	lock sync.Mutex // Held for the duration of each Write, Close and CloseWithError call.
	p    *pipe      // Pipe state shared with the PipeReader created by the same Pipe call.
}

// Write implements the standard Write interface:
// it hands data to the pipe and blocks until the read half has
// consumed all of it or has been closed.
// If the read half is closed with an error, that error is returned
// as err; if it is closed without one, err is os.EPIPE.
// Write on a writer that has been closed returns os.EINVAL.
func (w *PipeWriter) Write(data []byte) (n int, err os.Error) {
	w.lock.Lock()
	defer w.lock.Unlock()

	n, err = w.p.Write(data)
	return n, err
}

// Close closes the writer; subsequent reads from the
// read half of the pipe will return no bytes and the error os.EOF.
// Closing a writer that is already closed returns os.EINVAL.
func (w *PipeWriter) Close() os.Error {
	w.lock.Lock()
	defer w.lock.Unlock()

	// A nil error makes CloseWriter fall back to os.EOF.
	return w.p.CloseWriter(nil)
}

// CloseWithError closes the write half of the pipe. Reads from the
// read half afterwards return no bytes and the error werr, or
// os.EOF if werr is nil.
// Closing a writer that is already closed returns os.EINVAL.
func (w *PipeWriter) CloseWithError(werr os.Error) os.Error {
	w.lock.Lock()
	defer w.lock.Unlock()

	err := w.p.CloseWriter(werr)
	return err
}

// finish closes the writer and discards the result of Close.
func (w *PipeWriter) finish() {
	w.Close()
}

// Pipe creates a synchronous in-memory pipe and returns its two halves.
// It can be used to connect code expecting an io.Reader
// with code expecting an io.Writer.
// Each Write is matched with one or more Reads on the other end, and
// the bytes are copied straight from the writer's slice into the
// reader's; the pipe keeps no data buffer of its own.
func Pipe() (*PipeReader, *PipeWriter) {
	// Both channels have room for one value.
	// NOTE(review): presumably so that the sends made by CloseReader
	// and CloseWriter do not block when the other side is not
	// waiting - confirm.
	shared := &pipe{
		cr: make(chan []byte, 1),
		cw: make(chan pipeReturn, 1),
	}
	return &PipeReader{p: shared}, &PipeWriter{p: shared}
}
