1 // Copyright 2009 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
5 // Pipe adapter to connect code expecting an io.Reader
6 // with code expecting an io.Writer.
15 // onceError is an object that will only store an error once.
16 type onceError struct {
17 sync.Mutex // guards following
21 func (a *onceError) Store(err error) {
29 func (a *onceError) Load() error {
35 // ErrClosedPipe is the error used for read or write operations on a closed pipe.
36 var ErrClosedPipe = errors.New("io: read/write on closed pipe")
38 // A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
40 wrMu sync.Mutex // Serializes Write operations
44 once sync.Once // Protects closing done
50 func (p *pipe) Read(b []byte) (n int, err error) {
53 return 0, p.readCloseError()
63 return 0, p.readCloseError()
67 func (p *pipe) readCloseError() error {
69 if werr := p.werr.Load(); rerr == nil && werr != nil {
75 func (p *pipe) CloseRead(err error) error {
80 p.once.Do(func() { close(p.done) })
84 func (p *pipe) Write(b []byte) (n int, err error) {
87 return 0, p.writeCloseError()
93 for once := true; once || len(b) > 0; once = false {
100 return n, p.writeCloseError()
106 func (p *pipe) writeCloseError() error {
107 werr := p.werr.Load()
108 if rerr := p.rerr.Load(); werr == nil && rerr != nil {
114 func (p *pipe) CloseWrite(err error) error {
119 p.once.Do(func() { close(p.done) })
123 // A PipeReader is the read half of a pipe.
124 type PipeReader struct {
128 // Read implements the standard Read interface:
129 // it reads data from the pipe, blocking until a writer
130 // arrives or the write end is closed.
131 // If the write end is closed with an error, that error is
132 // returned as err; otherwise err is EOF.
133 func (r *PipeReader) Read(data []byte) (n int, err error) {
134 return r.p.Read(data)
137 // Close closes the reader; subsequent writes to the
138 // write half of the pipe will return the error ErrClosedPipe.
139 func (r *PipeReader) Close() error {
140 return r.CloseWithError(nil)
143 // CloseWithError closes the reader; subsequent writes
144 // to the write half of the pipe will return the error err.
146 // CloseWithError never overwrites the previous error if it exists
147 // and always returns nil.
148 func (r *PipeReader) CloseWithError(err error) error {
149 return r.p.CloseRead(err)
152 // A PipeWriter is the write half of a pipe.
153 type PipeWriter struct {
157 // Write implements the standard Write interface:
158 // it writes data to the pipe, blocking until one or more readers
159 // have consumed all the data or the read end is closed.
160 // If the read end is closed with an error, that err is
161 // returned as err; otherwise err is ErrClosedPipe.
162 func (w *PipeWriter) Write(data []byte) (n int, err error) {
163 return w.p.Write(data)
166 // Close closes the writer; subsequent reads from the
167 // read half of the pipe will return no bytes and EOF.
168 func (w *PipeWriter) Close() error {
169 return w.CloseWithError(nil)
172 // CloseWithError closes the writer; subsequent reads from the
173 // read half of the pipe will return no bytes and the error err,
174 // or EOF if err is nil.
176 // CloseWithError never overwrites the previous error if it exists
177 // and always returns nil.
178 func (w *PipeWriter) CloseWithError(err error) error {
179 return w.p.CloseWrite(err)
182 // Pipe creates a synchronous in-memory pipe.
183 // It can be used to connect code expecting an io.Reader
184 // with code expecting an io.Writer.
186 // Reads and Writes on the pipe are matched one to one
187 // except when multiple Reads are needed to consume a single Write.
188 // That is, each Write to the PipeWriter blocks until it has satisfied
189 // one or more Reads from the PipeReader that fully consume
191 // The data is copied directly from the Write to the corresponding
192 // Read (or Reads); there is no internal buffering.
194 // It is safe to call Read and Write in parallel with each other or with Close.
195 // Parallel calls to Read and parallel calls to Write are also safe:
196 // the individual calls will be gated sequentially.
197 func Pipe() (*PipeReader, *PipeWriter) {
199 wrCh: make(chan []byte),
200 rdCh: make(chan int),
201 done: make(chan struct{}),
203 return &PipeReader{p}, &PipeWriter{p}