| // Copyright 2015 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Package pargzip contains a parallel gzip writer implementation. By |
| // compressing each chunk of data in parallel, all the CPUs on the |
| // machine can be used, at a slight loss of compression efficiency. |
| package pargzip |
| |
| import ( |
| "bufio" |
| "bytes" |
| "compress/gzip" |
| "io" |
| "runtime" |
| "strings" |
| "sync" |
| ) |
| |
| // A Writer is an io.WriteCloser. |
| // Writes to a Writer are compressed and written to w. |
| // |
| // Any exported fields may only be mutated before the first call to |
| // Write. |
| type Writer struct { |
| // ChunkSize is the number of bytes to gzip at once. |
| // The default from NewWriter is 1MB. |
| ChunkSize int |
| |
| // Parallel is the number of chunks to compress in parallel. |
| // The default from NewWriter is runtime.NumCPU(). |
| Parallel int |
| |
| w io.Writer |
| bw *bufio.Writer |
| |
| allWritten chan struct{} // when writing goroutine ends |
| wasWriteErr chan struct{} // closed after 'err' set |
| |
| sem chan bool // semaphore bounding compressions in flight |
| chunkc chan *writeChunk // closed on Close |
| |
| mu sync.Mutex // guards following |
| closed bool |
| err error // sticky write error |
| } |
| |
| type writeChunk struct { |
| zw *Writer |
| p string // uncompressed |
| |
| donec chan struct{} // closed on completion |
| |
| // one of following is set: |
| z []byte // compressed |
| err error // exec error |
| } |
| |
| // compress runs the gzip child process. |
| // It runs in its own goroutine. |
| func (c *writeChunk) compress() (err error) { |
| defer func() { |
| if err != nil { |
| c.err = err |
| } |
| close(c.donec) |
| <-c.zw.sem |
| }() |
| var zbuf bytes.Buffer |
| zw := gzip.NewWriter(&zbuf) |
| if _, err := io.Copy(zw, strings.NewReader(c.p)); err != nil { |
| return err |
| } |
| if err := zw.Close(); err != nil { |
| return err |
| } |
| c.z = zbuf.Bytes() |
| return nil |
| } |
| |
| // NewWriter returns a new Writer. |
| // Writes to the returned writer are compressed and written to w. |
| // |
| // It is the caller's responsibility to call Close on the WriteCloser |
| // when done. Writes may be buffered and not flushed until Close. |
| // |
| // Any fields on Writer may only be modified before the first call to |
| // Write. |
| func NewWriter(w io.Writer) *Writer { |
| return &Writer{ |
| w: w, |
| allWritten: make(chan struct{}), |
| wasWriteErr: make(chan struct{}), |
| |
| ChunkSize: 1 << 20, |
| Parallel: runtime.NumCPU(), |
| } |
| } |
| |
| func (w *Writer) didInit() bool { return w.bw != nil } |
| |
| func (w *Writer) init() { |
| w.bw = bufio.NewWriterSize(newChunkWriter{w}, w.ChunkSize) |
| w.chunkc = make(chan *writeChunk, w.Parallel+1) |
| w.sem = make(chan bool, w.Parallel) |
| go func() { |
| defer close(w.allWritten) |
| for c := range w.chunkc { |
| if err := w.writeCompressedChunk(c); err != nil { |
| close(w.wasWriteErr) |
| return |
| } |
| } |
| }() |
| } |
| |
| func (w *Writer) startChunk(p []byte) { |
| w.sem <- true // block until we can begin |
| c := &writeChunk{ |
| zw: w, |
| p: string(p), // string, since the bufio.Writer owns the slice |
| donec: make(chan struct{}), |
| } |
| go c.compress() // receives from w.sem |
| select { |
| case w.chunkc <- c: |
| case <-w.wasWriteErr: |
| // Discard chunks that come after any chunk that failed |
| // to write. |
| } |
| } |
| |
| func (w *Writer) writeCompressedChunk(c *writeChunk) (err error) { |
| defer func() { |
| if err != nil { |
| w.mu.Lock() |
| defer w.mu.Unlock() |
| if w.err == nil { |
| w.err = err |
| } |
| } |
| }() |
| <-c.donec |
| if c.err != nil { |
| return c.err |
| } |
| _, err = w.w.Write(c.z) |
| return |
| } |
| |
| func (w *Writer) Write(p []byte) (n int, err error) { |
| if !w.didInit() { |
| w.init() |
| } |
| return w.bw.Write(p) |
| } |
| |
| func (w *Writer) Close() error { |
| w.mu.Lock() |
| err, wasClosed := w.err, w.closed |
| w.closed = true |
| w.mu.Unlock() |
| if wasClosed { |
| return nil |
| } |
| if !w.didInit() { |
| return nil |
| } |
| if err != nil { |
| return err |
| } |
| |
| w.bw.Flush() |
| close(w.chunkc) |
| <-w.allWritten // wait for writing goroutine to end |
| |
| w.mu.Lock() |
| err = w.err |
| w.mu.Unlock() |
| return err |
| } |
| |
| // newChunkWriter gets large chunks to compress and write to zw. |
| type newChunkWriter struct { |
| zw *Writer |
| } |
| |
| func (cw newChunkWriter) Write(p []byte) (n int, err error) { |
| n = len(p) |
| max := cw.zw.ChunkSize |
| for len(p) > 0 { |
| chunk := p |
| if len(chunk) > max { |
| chunk = chunk[:max] |
| } |
| p = p[len(chunk):] |
| cw.zw.startChunk(chunk) |
| } |
| return |
| } |