|  | // Copyright 2015 The Go Authors. All rights reserved. | 
|  | // Use of this source code is governed by a BSD-style | 
|  | // license that can be found in the LICENSE file. | 
|  |  | 
|  | // Package pargzip contains a parallel gzip writer implementation.  By | 
|  | // compressing each chunk of data in parallel, all the CPUs on the | 
|  | // machine can be used, at a slight loss of compression efficiency. | 
|  | package pargzip | 
|  |  | 
|  | import ( | 
|  | "bufio" | 
|  | "bytes" | 
|  | "compress/gzip" | 
|  | "io" | 
|  | "runtime" | 
|  | "strings" | 
|  | "sync" | 
|  | ) | 
|  |  | 
|  | // A Writer is an io.WriteCloser. | 
|  | // Writes to a Writer are compressed and written to w. | 
|  | // | 
|  | // Any exported fields may only be mutated before the first call to | 
|  | // Write. | 
|  | type Writer struct { | 
|  | // ChunkSize is the number of bytes to gzip at once. | 
|  | // The default from NewWriter is 1MB. | 
|  | ChunkSize int | 
|  |  | 
|  | // Parallel is the number of chunks to compress in parallel. | 
|  | // The default from NewWriter is runtime.NumCPU(). | 
|  | Parallel int | 
|  |  | 
|  | w  io.Writer | 
|  | bw *bufio.Writer | 
|  |  | 
|  | allWritten  chan struct{} // when writing goroutine ends | 
|  | wasWriteErr chan struct{} // closed after 'err' set | 
|  |  | 
|  | sem    chan bool        // semaphore bounding compressions in flight | 
|  | chunkc chan *writeChunk // closed on Close | 
|  |  | 
|  | mu     sync.Mutex // guards following | 
|  | closed bool | 
|  | err    error // sticky write error | 
|  | } | 
|  |  | 
|  | type writeChunk struct { | 
|  | zw *Writer | 
|  | p  string // uncompressed | 
|  |  | 
|  | donec chan struct{} // closed on completion | 
|  |  | 
|  | // one of following is set: | 
|  | z   []byte // compressed | 
|  | err error  // exec error | 
|  | } | 
|  |  | 
|  | // compress runs the gzip child process. | 
|  | // It runs in its own goroutine. | 
|  | func (c *writeChunk) compress() (err error) { | 
|  | defer func() { | 
|  | if err != nil { | 
|  | c.err = err | 
|  | } | 
|  | close(c.donec) | 
|  | <-c.zw.sem | 
|  | }() | 
|  | var zbuf bytes.Buffer | 
|  | zw := gzip.NewWriter(&zbuf) | 
|  | if _, err := io.Copy(zw, strings.NewReader(c.p)); err != nil { | 
|  | return err | 
|  | } | 
|  | if err := zw.Close(); err != nil { | 
|  | return err | 
|  | } | 
|  | c.z = zbuf.Bytes() | 
|  | return nil | 
|  | } | 
|  |  | 
|  | // NewWriter returns a new Writer. | 
|  | // Writes to the returned writer are compressed and written to w. | 
|  | // | 
|  | // It is the caller's responsibility to call Close on the WriteCloser | 
|  | // when done. Writes may be buffered and not flushed until Close. | 
|  | // | 
|  | // Any fields on Writer may only be modified before the first call to | 
|  | // Write. | 
|  | func NewWriter(w io.Writer) *Writer { | 
|  | return &Writer{ | 
|  | w:           w, | 
|  | allWritten:  make(chan struct{}), | 
|  | wasWriteErr: make(chan struct{}), | 
|  |  | 
|  | ChunkSize: 1 << 20, | 
|  | Parallel:  runtime.NumCPU(), | 
|  | } | 
|  | } | 
|  |  | 
|  | func (w *Writer) didInit() bool { return w.bw != nil } | 
|  |  | 
|  | func (w *Writer) init() { | 
|  | w.bw = bufio.NewWriterSize(newChunkWriter{w}, w.ChunkSize) | 
|  | w.chunkc = make(chan *writeChunk, w.Parallel+1) | 
|  | w.sem = make(chan bool, w.Parallel) | 
|  | go func() { | 
|  | defer close(w.allWritten) | 
|  | for c := range w.chunkc { | 
|  | if err := w.writeCompressedChunk(c); err != nil { | 
|  | close(w.wasWriteErr) | 
|  | return | 
|  | } | 
|  | } | 
|  | }() | 
|  | } | 
|  |  | 
|  | func (w *Writer) startChunk(p []byte) { | 
|  | w.sem <- true // block until we can begin | 
|  | c := &writeChunk{ | 
|  | zw:    w, | 
|  | p:     string(p), // string, since the bufio.Writer owns the slice | 
|  | donec: make(chan struct{}), | 
|  | } | 
|  | go c.compress() // receives from w.sem | 
|  | select { | 
|  | case w.chunkc <- c: | 
|  | case <-w.wasWriteErr: | 
|  | // Discard chunks that come after any chunk that failed | 
|  | // to write. | 
|  | } | 
|  | } | 
|  |  | 
|  | func (w *Writer) writeCompressedChunk(c *writeChunk) (err error) { | 
|  | defer func() { | 
|  | if err != nil { | 
|  | w.mu.Lock() | 
|  | defer w.mu.Unlock() | 
|  | if w.err == nil { | 
|  | w.err = err | 
|  | } | 
|  | } | 
|  | }() | 
|  | <-c.donec | 
|  | if c.err != nil { | 
|  | return c.err | 
|  | } | 
|  | _, err = w.w.Write(c.z) | 
|  | return | 
|  | } | 
|  |  | 
|  | func (w *Writer) Write(p []byte) (n int, err error) { | 
|  | if !w.didInit() { | 
|  | w.init() | 
|  | } | 
|  | return w.bw.Write(p) | 
|  | } | 
|  |  | 
|  | func (w *Writer) Close() error { | 
|  | w.mu.Lock() | 
|  | err, wasClosed := w.err, w.closed | 
|  | w.closed = true | 
|  | w.mu.Unlock() | 
|  | if wasClosed { | 
|  | return nil | 
|  | } | 
|  | if !w.didInit() { | 
|  | return nil | 
|  | } | 
|  | if err != nil { | 
|  | return err | 
|  | } | 
|  |  | 
|  | w.bw.Flush() | 
|  | close(w.chunkc) | 
|  | <-w.allWritten // wait for writing goroutine to end | 
|  |  | 
|  | w.mu.Lock() | 
|  | err = w.err | 
|  | w.mu.Unlock() | 
|  | return err | 
|  | } | 
|  |  | 
|  | // newChunkWriter gets large chunks to compress and write to zw. | 
|  | type newChunkWriter struct { | 
|  | zw *Writer | 
|  | } | 
|  |  | 
|  | func (cw newChunkWriter) Write(p []byte) (n int, err error) { | 
|  | n = len(p) | 
|  | max := cw.zw.ChunkSize | 
|  | for len(p) > 0 { | 
|  | chunk := p | 
|  | if len(chunk) > max { | 
|  | chunk = chunk[:max] | 
|  | } | 
|  | p = p[len(chunk):] | 
|  | cw.zw.startChunk(chunk) | 
|  | } | 
|  | return | 
|  | } |