|  | // Copyright 2010 The Go Authors. All rights reserved. | 
|  | // Use of this source code is governed by a BSD-style | 
|  | // license that can be found in the LICENSE file. | 
|  |  | 
|  | package net | 
|  |  | 
|  | import ( | 
|  | "io" | 
|  | "os" | 
|  | "sync" | 
|  | "time" | 
|  | ) | 
|  |  | 
|  | // pipeDeadline is an abstraction for handling timeouts. | 
|  | type pipeDeadline struct { | 
|  | mu     sync.Mutex // Guards timer and cancel | 
|  | timer  *time.Timer | 
|  | cancel chan struct{} // Must be non-nil | 
|  | } | 
|  |  | 
|  | func makePipeDeadline() pipeDeadline { | 
|  | return pipeDeadline{cancel: make(chan struct{})} | 
|  | } | 
|  |  | 
|  | // set sets the point in time when the deadline will time out. | 
|  | // A timeout event is signaled by closing the channel returned by waiter. | 
|  | // Once a timeout has occurred, the deadline can be refreshed by specifying a | 
|  | // t value in the future. | 
|  | // | 
|  | // A zero value for t prevents timeout. | 
|  | func (d *pipeDeadline) set(t time.Time) { | 
|  | d.mu.Lock() | 
|  | defer d.mu.Unlock() | 
|  |  | 
|  | if d.timer != nil && !d.timer.Stop() { | 
|  | <-d.cancel // Wait for the timer callback to finish and close cancel | 
|  | } | 
|  | d.timer = nil | 
|  |  | 
|  | // Time is zero, then there is no deadline. | 
|  | closed := isClosedChan(d.cancel) | 
|  | if t.IsZero() { | 
|  | if closed { | 
|  | d.cancel = make(chan struct{}) | 
|  | } | 
|  | return | 
|  | } | 
|  |  | 
|  | // Time in the future, setup a timer to cancel in the future. | 
|  | if dur := time.Until(t); dur > 0 { | 
|  | if closed { | 
|  | d.cancel = make(chan struct{}) | 
|  | } | 
|  | d.timer = time.AfterFunc(dur, func() { | 
|  | close(d.cancel) | 
|  | }) | 
|  | return | 
|  | } | 
|  |  | 
|  | // Time in the past, so close immediately. | 
|  | if !closed { | 
|  | close(d.cancel) | 
|  | } | 
|  | } | 
|  |  | 
|  | // wait returns a channel that is closed when the deadline is exceeded. | 
|  | func (d *pipeDeadline) wait() chan struct{} { | 
|  | d.mu.Lock() | 
|  | defer d.mu.Unlock() | 
|  | return d.cancel | 
|  | } | 
|  |  | 
|  | func isClosedChan(c <-chan struct{}) bool { | 
|  | select { | 
|  | case <-c: | 
|  | return true | 
|  | default: | 
|  | return false | 
|  | } | 
|  | } | 
|  |  | 
|  | type pipeAddr struct{} | 
|  |  | 
|  | func (pipeAddr) Network() string { return "pipe" } | 
|  | func (pipeAddr) String() string  { return "pipe" } | 
|  |  | 
|  | type pipe struct { | 
|  | wrMu sync.Mutex // Serialize Write operations | 
|  |  | 
|  | // Used by local Read to interact with remote Write. | 
|  | // Successful receive on rdRx is always followed by send on rdTx. | 
|  | rdRx <-chan []byte | 
|  | rdTx chan<- int | 
|  |  | 
|  | // Used by local Write to interact with remote Read. | 
|  | // Successful send on wrTx is always followed by receive on wrRx. | 
|  | wrTx chan<- []byte | 
|  | wrRx <-chan int | 
|  |  | 
|  | once       sync.Once // Protects closing localDone | 
|  | localDone  chan struct{} | 
|  | remoteDone <-chan struct{} | 
|  |  | 
|  | readDeadline  pipeDeadline | 
|  | writeDeadline pipeDeadline | 
|  | } | 
|  |  | 
|  | // Pipe creates a synchronous, in-memory, full duplex | 
|  | // network connection; both ends implement the Conn interface. | 
|  | // Reads on one end are matched with writes on the other, | 
|  | // copying data directly between the two; there is no internal | 
|  | // buffering. | 
|  | func Pipe() (Conn, Conn) { | 
|  | cb1 := make(chan []byte) | 
|  | cb2 := make(chan []byte) | 
|  | cn1 := make(chan int) | 
|  | cn2 := make(chan int) | 
|  | done1 := make(chan struct{}) | 
|  | done2 := make(chan struct{}) | 
|  |  | 
|  | p1 := &pipe{ | 
|  | rdRx: cb1, rdTx: cn1, | 
|  | wrTx: cb2, wrRx: cn2, | 
|  | localDone: done1, remoteDone: done2, | 
|  | readDeadline:  makePipeDeadline(), | 
|  | writeDeadline: makePipeDeadline(), | 
|  | } | 
|  | p2 := &pipe{ | 
|  | rdRx: cb2, rdTx: cn2, | 
|  | wrTx: cb1, wrRx: cn1, | 
|  | localDone: done2, remoteDone: done1, | 
|  | readDeadline:  makePipeDeadline(), | 
|  | writeDeadline: makePipeDeadline(), | 
|  | } | 
|  | return p1, p2 | 
|  | } | 
|  |  | 
|  | func (*pipe) LocalAddr() Addr  { return pipeAddr{} } | 
|  | func (*pipe) RemoteAddr() Addr { return pipeAddr{} } | 
|  |  | 
|  | func (p *pipe) Read(b []byte) (int, error) { | 
|  | n, err := p.read(b) | 
|  | if err != nil && err != io.EOF && err != io.ErrClosedPipe { | 
|  | err = &OpError{Op: "read", Net: "pipe", Err: err} | 
|  | } | 
|  | return n, err | 
|  | } | 
|  |  | 
|  | func (p *pipe) read(b []byte) (n int, err error) { | 
|  | switch { | 
|  | case isClosedChan(p.localDone): | 
|  | return 0, io.ErrClosedPipe | 
|  | case isClosedChan(p.remoteDone): | 
|  | return 0, io.EOF | 
|  | case isClosedChan(p.readDeadline.wait()): | 
|  | return 0, os.ErrDeadlineExceeded | 
|  | } | 
|  |  | 
|  | select { | 
|  | case bw := <-p.rdRx: | 
|  | nr := copy(b, bw) | 
|  | p.rdTx <- nr | 
|  | return nr, nil | 
|  | case <-p.localDone: | 
|  | return 0, io.ErrClosedPipe | 
|  | case <-p.remoteDone: | 
|  | return 0, io.EOF | 
|  | case <-p.readDeadline.wait(): | 
|  | return 0, os.ErrDeadlineExceeded | 
|  | } | 
|  | } | 
|  |  | 
|  | func (p *pipe) Write(b []byte) (int, error) { | 
|  | n, err := p.write(b) | 
|  | if err != nil && err != io.ErrClosedPipe { | 
|  | err = &OpError{Op: "write", Net: "pipe", Err: err} | 
|  | } | 
|  | return n, err | 
|  | } | 
|  |  | 
|  | func (p *pipe) write(b []byte) (n int, err error) { | 
|  | switch { | 
|  | case isClosedChan(p.localDone): | 
|  | return 0, io.ErrClosedPipe | 
|  | case isClosedChan(p.remoteDone): | 
|  | return 0, io.ErrClosedPipe | 
|  | case isClosedChan(p.writeDeadline.wait()): | 
|  | return 0, os.ErrDeadlineExceeded | 
|  | } | 
|  |  | 
|  | p.wrMu.Lock() // Ensure entirety of b is written together | 
|  | defer p.wrMu.Unlock() | 
|  | for once := true; once || len(b) > 0; once = false { | 
|  | select { | 
|  | case p.wrTx <- b: | 
|  | nw := <-p.wrRx | 
|  | b = b[nw:] | 
|  | n += nw | 
|  | case <-p.localDone: | 
|  | return n, io.ErrClosedPipe | 
|  | case <-p.remoteDone: | 
|  | return n, io.ErrClosedPipe | 
|  | case <-p.writeDeadline.wait(): | 
|  | return n, os.ErrDeadlineExceeded | 
|  | } | 
|  | } | 
|  | return n, nil | 
|  | } | 
|  |  | 
|  | func (p *pipe) SetDeadline(t time.Time) error { | 
|  | if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { | 
|  | return io.ErrClosedPipe | 
|  | } | 
|  | p.readDeadline.set(t) | 
|  | p.writeDeadline.set(t) | 
|  | return nil | 
|  | } | 
|  |  | 
|  | func (p *pipe) SetReadDeadline(t time.Time) error { | 
|  | if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { | 
|  | return io.ErrClosedPipe | 
|  | } | 
|  | p.readDeadline.set(t) | 
|  | return nil | 
|  | } | 
|  |  | 
|  | func (p *pipe) SetWriteDeadline(t time.Time) error { | 
|  | if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { | 
|  | return io.ErrClosedPipe | 
|  | } | 
|  | p.writeDeadline.set(t) | 
|  | return nil | 
|  | } | 
|  |  | 
|  | func (p *pipe) Close() error { | 
|  | p.once.Do(func() { close(p.localDone) }) | 
|  | return nil | 
|  | } |