|  | // Copyright 2009 The Go Authors. All rights reserved. | 
|  | // Use of this source code is governed by a BSD-style | 
|  | // license that can be found in the LICENSE file. | 
|  |  | 
|  | // Pipe adapter to connect code expecting an io.Reader | 
|  | // with code expecting an io.Writer. | 
|  |  | 
|  | package io | 
|  |  | 
|  | import ( | 
|  | "errors" | 
|  | "sync" | 
|  | ) | 
|  |  | 
|  | // ErrClosedPipe is the error used for read or write operations on a closed pipe. | 
|  | var ErrClosedPipe = errors.New("io: read/write on closed pipe") | 
|  |  | 
|  | type pipeResult struct { | 
|  | n   int | 
|  | err error | 
|  | } | 
|  |  | 
|  | // A pipe is the shared pipe structure underlying PipeReader and PipeWriter. | 
|  | type pipe struct { | 
|  | rl    sync.Mutex // gates readers one at a time | 
|  | wl    sync.Mutex // gates writers one at a time | 
|  | l     sync.Mutex // protects remaining fields | 
|  | data  []byte     // data remaining in pending write | 
|  | rwait sync.Cond  // waiting reader | 
|  | wwait sync.Cond  // waiting writer | 
|  | rerr  error      // if reader closed, error to give writes | 
|  | werr  error      // if writer closed, error to give reads | 
|  | } | 
|  |  | 
|  | func (p *pipe) read(b []byte) (n int, err error) { | 
|  | // One reader at a time. | 
|  | p.rl.Lock() | 
|  | defer p.rl.Unlock() | 
|  |  | 
|  | p.l.Lock() | 
|  | defer p.l.Unlock() | 
|  | for { | 
|  | if p.rerr != nil { | 
|  | return 0, ErrClosedPipe | 
|  | } | 
|  | if p.data != nil { | 
|  | break | 
|  | } | 
|  | if p.werr != nil { | 
|  | return 0, p.werr | 
|  | } | 
|  | p.rwait.Wait() | 
|  | } | 
|  | n = copy(b, p.data) | 
|  | p.data = p.data[n:] | 
|  | if len(p.data) == 0 { | 
|  | p.data = nil | 
|  | p.wwait.Signal() | 
|  | } | 
|  | return | 
|  | } | 
|  |  | 
|  | var zero [0]byte | 
|  |  | 
|  | func (p *pipe) write(b []byte) (n int, err error) { | 
|  | // pipe uses nil to mean not available | 
|  | if b == nil { | 
|  | b = zero[:] | 
|  | } | 
|  |  | 
|  | // One writer at a time. | 
|  | p.wl.Lock() | 
|  | defer p.wl.Unlock() | 
|  |  | 
|  | p.l.Lock() | 
|  | defer p.l.Unlock() | 
|  | if p.werr != nil { | 
|  | err = ErrClosedPipe | 
|  | return | 
|  | } | 
|  | p.data = b | 
|  | p.rwait.Signal() | 
|  | for { | 
|  | if p.data == nil { | 
|  | break | 
|  | } | 
|  | if p.rerr != nil { | 
|  | err = p.rerr | 
|  | break | 
|  | } | 
|  | if p.werr != nil { | 
|  | err = ErrClosedPipe | 
|  | } | 
|  | p.wwait.Wait() | 
|  | } | 
|  | n = len(b) - len(p.data) | 
|  | p.data = nil // in case of rerr or werr | 
|  | return | 
|  | } | 
|  |  | 
|  | func (p *pipe) rclose(err error) { | 
|  | if err == nil { | 
|  | err = ErrClosedPipe | 
|  | } | 
|  | p.l.Lock() | 
|  | defer p.l.Unlock() | 
|  | p.rerr = err | 
|  | p.rwait.Signal() | 
|  | p.wwait.Signal() | 
|  | } | 
|  |  | 
|  | func (p *pipe) wclose(err error) { | 
|  | if err == nil { | 
|  | err = EOF | 
|  | } | 
|  | p.l.Lock() | 
|  | defer p.l.Unlock() | 
|  | p.werr = err | 
|  | p.rwait.Signal() | 
|  | p.wwait.Signal() | 
|  | } | 
|  |  | 
|  | // A PipeReader is the read half of a pipe. | 
|  | type PipeReader struct { | 
|  | p *pipe | 
|  | } | 
|  |  | 
|  | // Read implements the standard Read interface: | 
|  | // it reads data from the pipe, blocking until a writer | 
|  | // arrives or the write end is closed. | 
|  | // If the write end is closed with an error, that error is | 
|  | // returned as err; otherwise err is EOF. | 
|  | func (r *PipeReader) Read(data []byte) (n int, err error) { | 
|  | return r.p.read(data) | 
|  | } | 
|  |  | 
|  | // Close closes the reader; subsequent writes to the | 
|  | // write half of the pipe will return the error ErrClosedPipe. | 
|  | func (r *PipeReader) Close() error { | 
|  | return r.CloseWithError(nil) | 
|  | } | 
|  |  | 
|  | // CloseWithError closes the reader; subsequent writes | 
|  | // to the write half of the pipe will return the error err. | 
|  | func (r *PipeReader) CloseWithError(err error) error { | 
|  | r.p.rclose(err) | 
|  | return nil | 
|  | } | 
|  |  | 
|  | // A PipeWriter is the write half of a pipe. | 
|  | type PipeWriter struct { | 
|  | p *pipe | 
|  | } | 
|  |  | 
|  | // Write implements the standard Write interface: | 
|  | // it writes data to the pipe, blocking until readers | 
|  | // have consumed all the data or the read end is closed. | 
|  | // If the read end is closed with an error, that err is | 
|  | // returned as err; otherwise err is ErrClosedPipe. | 
|  | func (w *PipeWriter) Write(data []byte) (n int, err error) { | 
|  | return w.p.write(data) | 
|  | } | 
|  |  | 
|  | // Close closes the writer; subsequent reads from the | 
|  | // read half of the pipe will return no bytes and EOF. | 
|  | func (w *PipeWriter) Close() error { | 
|  | return w.CloseWithError(nil) | 
|  | } | 
|  |  | 
|  | // CloseWithError closes the writer; subsequent reads from the | 
|  | // read half of the pipe will return no bytes and the error err. | 
|  | func (w *PipeWriter) CloseWithError(err error) error { | 
|  | w.p.wclose(err) | 
|  | return nil | 
|  | } | 
|  |  | 
|  | // Pipe creates a synchronous in-memory pipe. | 
|  | // It can be used to connect code expecting an io.Reader | 
|  | // with code expecting an io.Writer. | 
|  | // Reads on one end are matched with writes on the other, | 
|  | // copying data directly between the two; there is no internal buffering. | 
|  | // It is safe to call Read and Write in parallel with each other or with | 
|  | // Close. Close will complete once pending I/O is done. Parallel calls to | 
|  | // Read, and parallel calls to Write, are also safe: | 
|  | // the individual calls will be gated sequentially. | 
|  | func Pipe() (*PipeReader, *PipeWriter) { | 
|  | p := new(pipe) | 
|  | p.rwait.L = &p.l | 
|  | p.wwait.L = &p.l | 
|  | r := &PipeReader{p} | 
|  | w := &PipeWriter{p} | 
|  | return r, w | 
|  | } |