| // Copyright 2010 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package net |
| |
| import ( |
| "io" |
| "os" |
| "sync" |
| "time" |
| ) |
| |
// pipeDeadline tracks a single, resettable timeout and exposes it as a
// channel that is closed once the timeout has been exceeded.
type pipeDeadline struct {
	mu     sync.Mutex    // Guards timer and cancel
	timer  *time.Timer   // Pending expiry timer; nil when none is armed
	cancel chan struct{} // Closed on expiry; must be non-nil
}
| |
| func makePipeDeadline() pipeDeadline { |
| return pipeDeadline{cancel: make(chan struct{})} |
| } |
| |
| // set sets the point in time when the deadline will time out. |
| // A timeout event is signaled by closing the channel returned by waiter. |
| // Once a timeout has occurred, the deadline can be refreshed by specifying a |
| // t value in the future. |
| // |
| // A zero value for t prevents timeout. |
| func (d *pipeDeadline) set(t time.Time) { |
| d.mu.Lock() |
| defer d.mu.Unlock() |
| |
| if d.timer != nil && !d.timer.Stop() { |
| <-d.cancel // Wait for the timer callback to finish and close cancel |
| } |
| d.timer = nil |
| |
| // Time is zero, then there is no deadline. |
| closed := isClosedChan(d.cancel) |
| if t.IsZero() { |
| if closed { |
| d.cancel = make(chan struct{}) |
| } |
| return |
| } |
| |
| // Time in the future, setup a timer to cancel in the future. |
| if dur := time.Until(t); dur > 0 { |
| if closed { |
| d.cancel = make(chan struct{}) |
| } |
| d.timer = time.AfterFunc(dur, func() { |
| close(d.cancel) |
| }) |
| return |
| } |
| |
| // Time in the past, so close immediately. |
| if !closed { |
| close(d.cancel) |
| } |
| } |
| |
| // wait returns a channel that is closed when the deadline is exceeded. |
| func (d *pipeDeadline) wait() chan struct{} { |
| d.mu.Lock() |
| defer d.mu.Unlock() |
| return d.cancel |
| } |
| |
// isClosedChan reports, without blocking, whether a receive on c would
// succeed right now, which for a signal-only channel means it is closed.
func isClosedChan(c <-chan struct{}) bool {
	ready := false
	select {
	case <-c:
		ready = true
	default:
		// Nothing to receive; the channel is still open (or nil).
	}
	return ready
}
| |
// pipeAddr is the address reported by both ends of a pipe connection.
type pipeAddr struct{}

// Network returns the network name, "pipe".
func (pipeAddr) Network() string {
	return "pipe"
}

// String returns the textual form of the address, "pipe".
func (pipeAddr) String() string {
	return "pipe"
}
| |
| type pipe struct { |
| wrMu sync.Mutex // Serialize Write operations |
| |
| // Used by local Read to interact with remote Write. |
| // Successful receive on rdRx is always followed by send on rdTx. |
| rdRx <-chan []byte |
| rdTx chan<- int |
| |
| // Used by local Write to interact with remote Read. |
| // Successful send on wrTx is always followed by receive on wrRx. |
| wrTx chan<- []byte |
| wrRx <-chan int |
| |
| once sync.Once // Protects closing localDone |
| localDone chan struct{} |
| remoteDone <-chan struct{} |
| |
| readDeadline pipeDeadline |
| writeDeadline pipeDeadline |
| } |
| |
| // Pipe creates a synchronous, in-memory, full duplex |
| // network connection; both ends implement the Conn interface. |
| // Reads on one end are matched with writes on the other, |
| // copying data directly between the two; there is no internal |
| // buffering. |
| func Pipe() (Conn, Conn) { |
| cb1 := make(chan []byte) |
| cb2 := make(chan []byte) |
| cn1 := make(chan int) |
| cn2 := make(chan int) |
| done1 := make(chan struct{}) |
| done2 := make(chan struct{}) |
| |
| p1 := &pipe{ |
| rdRx: cb1, rdTx: cn1, |
| wrTx: cb2, wrRx: cn2, |
| localDone: done1, remoteDone: done2, |
| readDeadline: makePipeDeadline(), |
| writeDeadline: makePipeDeadline(), |
| } |
| p2 := &pipe{ |
| rdRx: cb2, rdTx: cn2, |
| wrTx: cb1, wrRx: cn1, |
| localDone: done2, remoteDone: done1, |
| readDeadline: makePipeDeadline(), |
| writeDeadline: makePipeDeadline(), |
| } |
| return p1, p2 |
| } |
| |
| func (*pipe) LocalAddr() Addr { return pipeAddr{} } |
| func (*pipe) RemoteAddr() Addr { return pipeAddr{} } |
| |
| func (p *pipe) Read(b []byte) (int, error) { |
| n, err := p.read(b) |
| if err != nil && err != io.EOF && err != io.ErrClosedPipe { |
| err = &OpError{Op: "read", Net: "pipe", Err: err} |
| } |
| return n, err |
| } |
| |
| func (p *pipe) read(b []byte) (n int, err error) { |
| switch { |
| case isClosedChan(p.localDone): |
| return 0, io.ErrClosedPipe |
| case isClosedChan(p.remoteDone): |
| return 0, io.EOF |
| case isClosedChan(p.readDeadline.wait()): |
| return 0, os.ErrDeadlineExceeded |
| } |
| |
| select { |
| case bw := <-p.rdRx: |
| nr := copy(b, bw) |
| p.rdTx <- nr |
| return nr, nil |
| case <-p.localDone: |
| return 0, io.ErrClosedPipe |
| case <-p.remoteDone: |
| return 0, io.EOF |
| case <-p.readDeadline.wait(): |
| return 0, os.ErrDeadlineExceeded |
| } |
| } |
| |
| func (p *pipe) Write(b []byte) (int, error) { |
| n, err := p.write(b) |
| if err != nil && err != io.ErrClosedPipe { |
| err = &OpError{Op: "write", Net: "pipe", Err: err} |
| } |
| return n, err |
| } |
| |
| func (p *pipe) write(b []byte) (n int, err error) { |
| switch { |
| case isClosedChan(p.localDone): |
| return 0, io.ErrClosedPipe |
| case isClosedChan(p.remoteDone): |
| return 0, io.ErrClosedPipe |
| case isClosedChan(p.writeDeadline.wait()): |
| return 0, os.ErrDeadlineExceeded |
| } |
| |
| p.wrMu.Lock() // Ensure entirety of b is written together |
| defer p.wrMu.Unlock() |
| for once := true; once || len(b) > 0; once = false { |
| select { |
| case p.wrTx <- b: |
| nw := <-p.wrRx |
| b = b[nw:] |
| n += nw |
| case <-p.localDone: |
| return n, io.ErrClosedPipe |
| case <-p.remoteDone: |
| return n, io.ErrClosedPipe |
| case <-p.writeDeadline.wait(): |
| return n, os.ErrDeadlineExceeded |
| } |
| } |
| return n, nil |
| } |
| |
| func (p *pipe) SetDeadline(t time.Time) error { |
| if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { |
| return io.ErrClosedPipe |
| } |
| p.readDeadline.set(t) |
| p.writeDeadline.set(t) |
| return nil |
| } |
| |
| func (p *pipe) SetReadDeadline(t time.Time) error { |
| if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { |
| return io.ErrClosedPipe |
| } |
| p.readDeadline.set(t) |
| return nil |
| } |
| |
| func (p *pipe) SetWriteDeadline(t time.Time) error { |
| if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { |
| return io.ErrClosedPipe |
| } |
| p.writeDeadline.set(t) |
| return nil |
| } |
| |
| func (p *pipe) Close() error { |
| p.once.Do(func() { close(p.localDone) }) |
| return nil |
| } |