blob: f65354a7f253b65a4c1718b3dae1908c55229efe [file] [log] [blame]
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
4
// Pipe adapter to connect code expecting an io.Reader
// with code expecting an io.Writer.
Russ Cox78906c32009-02-16 16:32:30 -08007
8package io
9
Rob Pike929203a2012-02-06 15:09:50 +110010import (
11 "errors"
12 "sync"
13)
Russ Coxc06cf032011-11-01 21:48:52 -040014
// ErrClosedPipe is the sentinel error reported by read and write
// operations attempted on a closed pipe.
var ErrClosedPipe error = errors.New("io: read/write on closed pipe")
Russ Cox78906c32009-02-16 16:32:30 -080017
// pipeResult pairs a count with an error, the same shape as the (n, err)
// results returned by the pipe's read and write methods.
//
// NOTE(review): nothing in the visible part of this file refers to this
// type; confirm whether it is still needed.
type pipeResult struct {
	n   int   // count (presumably bytes transferred; confirm against users)
	err error // error accompanying the count, if any
}
22
// pipe is the state shared by the two halves of a pipe: the PipeReader and
// PipeWriter returned together by Pipe both point at the same pipe value.
type pipe struct {
	// rl and wl serialize callers, so that at most one read and at most
	// one write are in progress at any time.
	rl, wl sync.Mutex

	// l guards every field below. Pipe also installs it as the Locker
	// of both condition variables.
	l sync.Mutex

	// data is the unconsumed remainder of the pending write;
	// nil means no write is pending.
	data []byte

	// rwait is where a blocked reader waits, wwait where a blocked
	// writer waits.
	rwait, wwait sync.Cond

	// rerr is set once the read end is closed and is the error handed
	// to writes; werr is set once the write end is closed and is the
	// error handed to reads.
	rerr, werr error
}
34
Russ Coxc06cf032011-11-01 21:48:52 -040035func (p *pipe) read(b []byte) (n int, err error) {
Russ Cox6d6f3382011-03-07 10:37:28 -050036 // One reader at a time.
37 p.rl.Lock()
38 defer p.rl.Unlock()
Russ Cox78906c32009-02-16 16:32:30 -080039
Russ Cox6d6f3382011-03-07 10:37:28 -050040 p.l.Lock()
41 defer p.l.Unlock()
Russ Coxcc62bed2010-04-27 10:17:17 -070042 for {
Russ Cox6d6f3382011-03-07 10:37:28 -050043 if p.rerr != nil {
Russ Coxc06cf032011-11-01 21:48:52 -040044 return 0, ErrClosedPipe
Russ Cox78906c32009-02-16 16:32:30 -080045 }
Russ Cox6d6f3382011-03-07 10:37:28 -050046 if p.data != nil {
47 break
Russ Cox78906c32009-02-16 16:32:30 -080048 }
Russ Cox6d6f3382011-03-07 10:37:28 -050049 if p.werr != nil {
50 return 0, p.werr
51 }
52 p.rwait.Wait()
Russ Cox78906c32009-02-16 16:32:30 -080053 }
Russ Cox6d6f3382011-03-07 10:37:28 -050054 n = copy(b, p.data)
55 p.data = p.data[n:]
56 if len(p.data) == 0 {
57 p.data = nil
58 p.wwait.Signal()
Russ Coxcc62bed2010-04-27 10:17:17 -070059 }
Russ Cox6d6f3382011-03-07 10:37:28 -050060 return
Russ Coxcc62bed2010-04-27 10:17:17 -070061}
62
// zero is an empty array. write slices it to obtain a non-nil,
// zero-length stand-in for a nil argument.
var zero = [0]byte{}
64
Russ Coxc06cf032011-11-01 21:48:52 -040065func (p *pipe) write(b []byte) (n int, err error) {
Russ Cox6d6f3382011-03-07 10:37:28 -050066 // pipe uses nil to mean not available
67 if b == nil {
68 b = zero[:]
Russ Coxcc62bed2010-04-27 10:17:17 -070069 }
Russ Coxcc62bed2010-04-27 10:17:17 -070070
Russ Cox6d6f3382011-03-07 10:37:28 -050071 // One writer at a time.
72 p.wl.Lock()
73 defer p.wl.Unlock()
Russ Coxcc62bed2010-04-27 10:17:17 -070074
Russ Cox6d6f3382011-03-07 10:37:28 -050075 p.l.Lock()
76 defer p.l.Unlock()
Rick Arnold4be93852013-08-13 11:04:09 -070077 if p.werr != nil {
78 err = ErrClosedPipe
79 return
80 }
Russ Cox6d6f3382011-03-07 10:37:28 -050081 p.data = b
82 p.rwait.Signal()
83 for {
84 if p.data == nil {
85 break
86 }
87 if p.rerr != nil {
88 err = p.rerr
89 break
90 }
91 if p.werr != nil {
Russ Coxc06cf032011-11-01 21:48:52 -040092 err = ErrClosedPipe
Russ Cox6d6f3382011-03-07 10:37:28 -050093 }
94 p.wwait.Wait()
95 }
96 n = len(b) - len(p.data)
97 p.data = nil // in case of rerr or werr
98 return
Russ Coxcc62bed2010-04-27 10:17:17 -070099}
100
Russ Coxc06cf032011-11-01 21:48:52 -0400101func (p *pipe) rclose(err error) {
Russ Cox6d6f3382011-03-07 10:37:28 -0500102 if err == nil {
Russ Coxc06cf032011-11-01 21:48:52 -0400103 err = ErrClosedPipe
Russ Cox6d6f3382011-03-07 10:37:28 -0500104 }
105 p.l.Lock()
106 defer p.l.Unlock()
107 p.rerr = err
108 p.rwait.Signal()
109 p.wwait.Signal()
Russ Coxcc62bed2010-04-27 10:17:17 -0700110}
111
Russ Coxc06cf032011-11-01 21:48:52 -0400112func (p *pipe) wclose(err error) {
Russ Cox6d6f3382011-03-07 10:37:28 -0500113 if err == nil {
Russ Coxc06cf032011-11-01 21:48:52 -0400114 err = EOF
Russ Cox6d6f3382011-03-07 10:37:28 -0500115 }
116 p.l.Lock()
117 defer p.l.Unlock()
118 p.werr = err
119 p.rwait.Signal()
120 p.wwait.Signal()
121}
Russ Coxcc62bed2010-04-27 10:17:17 -0700122
Russ Cox6defc252009-06-06 21:51:05 -0700123// A PipeReader is the read half of a pipe.
124type PipeReader struct {
Russ Cox6d6f3382011-03-07 10:37:28 -0500125 p *pipe
Russ Cox78906c32009-02-16 16:32:30 -0800126}
127
Russ Cox6defc252009-06-06 21:51:05 -0700128// Read implements the standard Read interface:
129// it reads data from the pipe, blocking until a writer
130// arrives or the write end is closed.
131// If the write end is closed with an error, that error is
Russ Coxc06cf032011-11-01 21:48:52 -0400132// returned as err; otherwise err is EOF.
133func (r *PipeReader) Read(data []byte) (n int, err error) {
Russ Cox6d6f3382011-03-07 10:37:28 -0500134 return r.p.read(data)
Russ Cox78906c32009-02-16 16:32:30 -0800135}
136
Russ Cox6defc252009-06-06 21:51:05 -0700137// Close closes the reader; subsequent writes to the
Russ Coxc06cf032011-11-01 21:48:52 -0400138// write half of the pipe will return the error ErrClosedPipe.
139func (r *PipeReader) Close() error {
Russ Coxcc62bed2010-04-27 10:17:17 -0700140 return r.CloseWithError(nil)
Russ Cox78906c32009-02-16 16:32:30 -0800141}
142
Russ Cox6defc252009-06-06 21:51:05 -0700143// CloseWithError closes the reader; subsequent writes
Russ Coxcc62bed2010-04-27 10:17:17 -0700144// to the write half of the pipe will return the error err.
Russ Coxc06cf032011-11-01 21:48:52 -0400145func (r *PipeReader) CloseWithError(err error) error {
Russ Cox6d6f3382011-03-07 10:37:28 -0500146 r.p.rclose(err)
147 return nil
Russ Cox6defc252009-06-06 21:51:05 -0700148}
149
Russ Coxcc62bed2010-04-27 10:17:17 -0700150// A PipeWriter is the write half of a pipe.
Russ Cox6defc252009-06-06 21:51:05 -0700151type PipeWriter struct {
Russ Cox6d6f3382011-03-07 10:37:28 -0500152 p *pipe
Russ Cox78906c32009-02-16 16:32:30 -0800153}
154
Russ Cox6defc252009-06-06 21:51:05 -0700155// Write implements the standard Write interface:
156// it writes data to the pipe, blocking until readers
157// have consumed all the data or the read end is closed.
158// If the read end is closed with an error, that err is
Russ Coxc06cf032011-11-01 21:48:52 -0400159// returned as err; otherwise err is ErrClosedPipe.
160func (w *PipeWriter) Write(data []byte) (n int, err error) {
Russ Cox6d6f3382011-03-07 10:37:28 -0500161 return w.p.write(data)
Russ Cox78906c32009-02-16 16:32:30 -0800162}
163
Russ Cox6defc252009-06-06 21:51:05 -0700164// Close closes the writer; subsequent reads from the
Russ Coxc06cf032011-11-01 21:48:52 -0400165// read half of the pipe will return no bytes and EOF.
166func (w *PipeWriter) Close() error {
Russ Coxcc62bed2010-04-27 10:17:17 -0700167 return w.CloseWithError(nil)
Russ Cox78906c32009-02-16 16:32:30 -0800168}
169
Russ Cox6defc252009-06-06 21:51:05 -0700170// CloseWithError closes the writer; subsequent reads from the
Russ Coxcc62bed2010-04-27 10:17:17 -0700171// read half of the pipe will return no bytes and the error err.
Russ Coxc06cf032011-11-01 21:48:52 -0400172func (w *PipeWriter) CloseWithError(err error) error {
Russ Cox6d6f3382011-03-07 10:37:28 -0500173 w.p.wclose(err)
174 return nil
Russ Cox6defc252009-06-06 21:51:05 -0700175}
176
Rob Pike7bb335c2009-03-06 03:43:44 -0800177// Pipe creates a synchronous in-memory pipe.
Russ Cox6defc252009-06-06 21:51:05 -0700178// It can be used to connect code expecting an io.Reader
Rob Pikec8b47c62009-05-08 11:22:57 -0700179// with code expecting an io.Writer.
Russ Cox6defc252009-06-06 21:51:05 -0700180// Reads on one end are matched with writes on the other,
181// copying data directly between the two; there is no internal buffering.
Rob Pike5a5279e2012-03-01 11:24:13 +1100182// It is safe to call Read and Write in parallel with each other or with
183// Close. Close will complete once pending I/O is done. Parallel calls to
184// Read, and parallel calls to Write, are also safe:
Robert Griesemerde7361b2012-03-02 11:15:45 -0800185// the individual calls will be gated sequentially.
Russ Cox6defc252009-06-06 21:51:05 -0700186func Pipe() (*PipeReader, *PipeWriter) {
Russ Cox6d6f3382011-03-07 10:37:28 -0500187 p := new(pipe)
188 p.rwait.L = &p.l
189 p.wwait.L = &p.l
190 r := &PipeReader{p}
191 w := &PipeWriter{p}
Russ Coxcc62bed2010-04-27 10:17:17 -0700192 return r, w
Russ Cox78906c32009-02-16 16:32:30 -0800193}