| # Go Concurrency Patterns: Pipelines and cancellation |
| 13 Mar 2014 |
| Tags: concurrency, pipelines, cancellation |
| Summary: How to use Go's concurrency to build data-processing pipelines. |
| |
| Sameer Ajmani |
| |
| ## Introduction |
| |
| Go's concurrency primitives make it easy to construct streaming data pipelines |
| that make efficient use of I/O and multiple CPUs. This article presents |
| examples of such pipelines, highlights subtleties that arise when operations |
| fail, and introduces techniques for dealing with failures cleanly. |
| |
| ## What is a pipeline? |
| |
| There's no formal definition of a pipeline in Go; it's just one of many kinds of |
| concurrent programs. Informally, a pipeline is a series of _stages_ connected |
| by channels, where each stage is a group of goroutines running the same |
| function. In each stage, the goroutines |
| |
| - receive values from _upstream_ via _inbound_ channels |
| - perform some function on that data, usually producing new values |
| - send values _downstream_ via _outbound_ channels |
| |
| Each stage has any number of inbound and outbound channels, except the |
| first and last stages, which have only outbound or inbound channels, |
| respectively. The first stage is sometimes called the _source_ or |
| _producer_; the last stage, the _sink_ or _consumer_. |
| |
| We'll begin with a simple example pipeline to explain the ideas and techniques. |
| Later, we'll present a more realistic example. |
| |
| ## Squaring numbers |
| |
| Consider a pipeline with three stages. |
| |
| The first stage, `gen`, is a function that converts a list of integers to a |
| channel that emits the integers in the list. The `gen` function starts a |
| goroutine that sends the integers on the channel and closes the channel when all |
| the values have been sent: |
| |
| .code pipelines/square.go /func gen/,/^}/ |
| |
| The second stage, `sq`, receives integers from a channel and returns a |
| channel that emits the square of each received integer. After the |
| inbound channel is closed and this stage has sent all the values |
| downstream, it closes the outbound channel: |
| |
| .code pipelines/square.go /func sq/,/^}/ |
| |
| The `main` function sets up the pipeline and runs the final stage: it receives |
| values from the second stage and prints each one, until the channel is closed: |
| |
| .code pipelines/square.go /func main/,/^}/ |
| |
| Since `sq` has the same type for its inbound and outbound channels, we |
| can compose it any number of times. We can also rewrite `main` as a |
| range loop, like the other stages: |
| |
| .code pipelines/square2.go /func main/,/^}/ |
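| |
| As a rough, self-contained sketch of the stages described above (not |
| necessarily identical to the code in `pipelines/square.go`), the pipeline might |
| look like this. The `collect` helper is a hypothetical addition used only to |
| gather the output into a slice: |
| |
```go
package main

import "fmt"

// gen emits each of nums on a fresh channel, then closes the channel.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// sq receives integers from in and emits the square of each one.
// It closes its outbound channel once in is closed and drained.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// collect is a hypothetical helper that drains a channel into a slice.
func collect(in <-chan int) []int {
	var got []int
	for v := range in {
		got = append(got, v)
	}
	return got
}

func main() {
	// sq composes with itself: 2 -> 4 -> 16 and 3 -> 9 -> 81.
	fmt.Println(collect(sq(sq(gen(2, 3))))) // prints [16 81]
}
```
| |
| Because every channel here is unbuffered and each stage runs in a single |
| goroutine, values leave the pipeline in the order they entered it. |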
| |
| ## Fan-out, fan-in |
| |
| Multiple functions can read from the same channel until that channel is closed; |
| this is called _fan-out_. This provides a way to distribute work amongst a group |
| of workers to parallelize CPU use and I/O. |
| |
| A function can read from multiple inputs and proceed until all are closed by |
| multiplexing the input channels onto a single channel that's closed when all the |
| inputs are closed. This is called _fan-in_. |
| |
| We can change our pipeline to run two instances of `sq`, each reading from the |
| same input channel. We introduce a new function, `merge`, to fan in the |
| results: |
| |
| .code pipelines/sqfan.go /func main/,/^}/ |
| |
| The `merge` function converts a list of channels to a single channel by starting |
| a goroutine for each inbound channel that copies the values to the sole outbound |
| channel. Once all the `output` goroutines have been started, `merge` starts one |
| more goroutine to close the outbound channel after all sends on that channel are |
| done. |
| |
| Sends on a closed channel panic, so it's important to ensure all sends |
| are done before calling close. The |
| [`sync.WaitGroup`](https://golang.org/pkg/sync/#WaitGroup) type |
| provides a simple way to arrange this synchronization: |
| |
| .code pipelines/sqfan.go /func merge/,/^}/ |
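| |
| The fan-in described above can be sketched as follows. This is a minimal |
| illustration rather than the exact contents of `pipelines/sqfan.go`; the `emit` |
| and `mergedSorted` helpers are hypothetical names introduced for the example. |
| The results are sorted because the order in which merged values arrive is not |
| deterministic: |
| |
```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// emit is a hypothetical source stage: it sends nums, then closes its channel.
func emit(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// merge fans in any number of inbound channels onto one outbound channel.
func merge(cs ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// output copies values from one inbound channel until it is closed.
	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			out <- n
		}
	}
	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Close out only after every output goroutine has finished sending.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// mergedSorted is a hypothetical helper that drains merge and sorts the result.
func mergedSorted(cs ...<-chan int) []int {
	var got []int
	for v := range merge(cs...) {
		got = append(got, v)
	}
	sort.Ints(got)
	return got
}

func main() {
	fmt.Println(mergedSorted(emit(1, 2), emit(3), emit())) // prints [1 2 3]
}
```
| |
| Note that `wg.Add` is called before the goroutines start, so `wg.Wait` cannot |
| return before every `output` goroutine has been counted. |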
| |
| ## Stopping short |
| |
| There is a pattern to our pipeline functions: |
| |
| - stages close their outbound channels when all the send operations are done. |
| - stages keep receiving values from inbound channels until those channels are closed. |
| |
| This pattern allows each receiving stage to be written as a `range` loop and |
| ensures that all goroutines exit once all values have been successfully sent |
| downstream. |
| |
| But in real pipelines, stages don't always receive all the inbound |
| values. Sometimes this is by design: the receiver may only need a |
| subset of values to make progress. More often, a stage exits early |
| because an inbound value represents an error in an earlier stage. In |
| either case the receiver should not have to wait for the remaining |
| values to arrive, and we want earlier stages to stop producing values |
| that later stages don't need. |
| |
| In our example pipeline, if a stage fails to consume all the inbound values, the |
| goroutines attempting to send those values will block indefinitely: |
| |
| .code pipelines/sqleak.go /first value/,/^}/ |
| |
| This is a resource leak: goroutines consume memory and runtime resources, and |
| heap references in goroutine stacks keep data from being garbage collected. |
| Goroutines are not garbage collected; they must exit on their own. |
| |
| We need to arrange for the upstream stages of our pipeline to exit even when the |
| downstream stages fail to receive all the inbound values. One way to do this is |
| to change the outbound channels to have a buffer. A buffer can hold a fixed |
| number of values; send operations complete immediately if there's room in the |
| buffer: |
| |
|     c := make(chan int, 2) // buffer size 2 |
|     c <- 1  // succeeds immediately |
|     c <- 2  // succeeds immediately |
|     c <- 3  // blocks until another goroutine does <-c and receives 1 |
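| |
| One way to observe this behaviour without actually blocking is a non-blocking |
| send. The sketch below uses a hypothetical helper, `trySend`, which is not part |
| of the original examples; it reports whether a send could complete immediately: |
| |
```go
package main

import "fmt"

// trySend reports whether v could be sent on c without blocking.
// The default case runs when the buffer is full and no receiver is waiting.
func trySend(c chan int, v int) bool {
	select {
	case c <- v:
		return true
	default:
		return false
	}
}

func main() {
	c := make(chan int, 2) // buffer size 2

	fmt.Println(trySend(c, 1)) // true: room in the buffer
	fmt.Println(trySend(c, 2)) // true: room in the buffer
	fmt.Println(trySend(c, 3)) // false: buffer is full

	<-c // receives 1 and frees one slot

	fmt.Println(trySend(c, 3)) // true: room again
}
```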
| |
| When the number of values to be sent is known at channel creation time, a buffer |
| can simplify the code. For example, we can rewrite `gen` to copy the list of |
| integers into a buffered channel and avoid creating a new goroutine: |
| |
| .code pipelines/sqbuffer.go /func gen/,/^}/ |
| |
| Returning to the blocked goroutines in our pipeline, we might consider adding a |
| buffer to the outbound channel returned by `merge`: |
| |
| .code pipelines/sqbuffer.go /func merge/,/unchanged/ |
| |
| While this fixes the blocked goroutine in this program, this is bad code. The |
| choice of buffer size of 1 here depends on knowing the number of values `merge` |
| will receive and the number of values downstream stages will consume. This is |
| fragile: if we pass an additional value to `gen`, or if the downstream stage |
| reads any fewer values, we will again have blocked goroutines. |
| |
| Instead, we need to provide a way for downstream stages to indicate to the |
| senders that they will stop accepting input. |
| |
| ## Explicit cancellation |
| |
| When `main` decides to exit without receiving all the values from |
| `out`, it must tell the goroutines in the upstream stages to abandon |
| the values they're trying to send. It does so by sending values on a |
| channel called `done`. It sends two values since there are |
| potentially two blocked senders: |
| |
| .code pipelines/sqdone1.go /func main/,/^}/ |
| |
| The sending goroutines replace their send operation with a `select` statement |
| that proceeds either when the send on `out` happens or when they receive a value |
| from `done`. The value type of `done` is the empty struct because the value |
| doesn't matter: it is the receive event that indicates the send on `out` should |
| be abandoned. The `output` goroutines continue looping on their inbound |
| channel, `c`, so the upstream stages are not blocked. (We'll discuss in a moment |
| how to allow this loop to return early.) |
| |
| .code pipelines/sqdone1.go /func merge/,/unchanged/ |
| |
| This approach has a problem: _each_ downstream receiver needs to know the number |
| of potentially blocked upstream senders and arrange to signal those senders on |
| early return. Keeping track of these counts is tedious and error-prone. |
| |
| We need a way to tell an unknown and unbounded number of goroutines to |
| stop sending their values downstream. In Go, we can do this by |
| closing a channel, because |
| [a receive operation on a closed channel can always proceed immediately, yielding the element type's zero value.](https://golang.org/ref/spec#Receive_operator) |
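| |
| To illustrate that property in isolation, the following sketch starts several |
| goroutines that block receiving from a channel and then releases all of them |
| with a single `close`. The `broadcast` function and the `woke` channel are |
| hypothetical names used only for this illustration: |
| |
```go
package main

import "fmt"

// broadcast starts n goroutines that block on done, closes done once,
// and returns how many goroutines were released by that single close.
func broadcast(n int) int {
	done := make(chan struct{})
	woke := make(chan struct{}, n)

	for i := 0; i < n; i++ {
		go func() {
			<-done // proceeds as soon as done is closed
			woke <- struct{}{}
		}()
	}

	close(done) // one close releases every waiting receiver

	count := 0
	for i := 0; i < n; i++ {
		<-woke
		count++
	}
	return count
}

func main() {
	fmt.Println(broadcast(5)) // prints 5
}
```
| |
| Sending values instead would require one send per waiting goroutine, which is |
| exactly the bookkeeping the close avoids. |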
| |
| This means that `main` can unblock all the senders simply by closing |
| the `done` channel. This close is effectively a broadcast signal to |
| the senders. We extend _each_ of our pipeline functions to accept |
| `done` as a parameter and arrange for the close to happen via a |
| `defer` statement, so that all return paths from `main` will signal |
| the pipeline stages to exit. |
| |
| .code pipelines/sqdone3.go /func main/,/^}/ |
| |
| Each of our pipeline stages is now free to return as soon as `done` is closed. |
| The `output` routine in `merge` can return without draining its inbound channel, |
| since it knows the upstream sender, `sq`, will stop attempting to send when |
| `done` is closed. `output` ensures `wg.Done` is called on all return paths via |
| a `defer` statement: |
| |
| .code pipelines/sqdone3.go /func merge/,/unchanged/ |
| |
| Similarly, `sq` can return as soon as `done` is closed. `sq` ensures its `out` |
| channel is closed on all return paths via a `defer` statement: |
| |
| .code pipelines/sqdone3.go /func sq/,/^}/ |
| |
| Here are the guidelines for pipeline construction: |
| |
| - stages close their outbound channels when all the send operations are done. |
| - stages keep receiving values from inbound channels until those channels are closed or the senders are unblocked. |
| |
| Pipelines unblock senders either by ensuring there's enough buffer for all the |
| values that are sent or by explicitly signalling senders when the receiver may |
| abandon the channel. |
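| |
| Put together, the guidelines might be applied as in the sketch below. It is an |
| illustration under stated assumptions, not a copy of `pipelines/sqdone3.go`: |
| the `firstSquare` function is a hypothetical consumer that takes only one value |
| and relies on a deferred `close(done)` to release the upstream stages: |
| |
```go
package main

import "fmt"

// gen emits nums until they are exhausted or done is closed.
func gen(done <-chan struct{}, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			select {
			case out <- n:
			case <-done:
				return
			}
		}
	}()
	return out
}

// sq squares values from in until in is closed or done is closed.
func sq(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // runs on every return path
		for n := range in {
			select {
			case out <- n * n:
			case <-done:
				return
			}
		}
	}()
	return out
}

// firstSquare consumes a single value and abandons the rest of the pipeline.
func firstSquare(nums ...int) int {
	done := make(chan struct{})
	defer close(done) // tells gen and sq to stop sending

	return <-sq(done, gen(done, nums...))
}

func main() {
	fmt.Println(firstSquare(3, 4, 5)) // prints 9
}
```
| |
| Without the `done` cases, the goroutines in `gen` and `sq` would stay blocked on |
| their sends after `firstSquare` returns. |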
| |
| ## Digesting a tree |
| |
| Let's consider a more realistic pipeline. |
| |
| MD5 is a message-digest algorithm that's useful as a file checksum. The command |
| line utility `md5sum` prints digest values for a list of files. |
| |
|     % md5sum *.go |
|     d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go |
|     ee869afd31f83cbb2d10ee81b2b831dc  parallel.go |
|     b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go |
| |
| Our example program is like `md5sum` but instead takes a single directory as an |
| argument and prints the digest values for each regular file under that |
| directory, sorted by path name. |
| |
|     % go run serial.go . |
|     d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go |
|     ee869afd31f83cbb2d10ee81b2b831dc  parallel.go |
|     b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go |
| |
| The main function of our program invokes a helper function `MD5All`, which |
| returns a map from path name to digest value, then sorts and prints the results: |
| |
| .code pipelines/serial.go /func main/,/^}/ |
| |
| The `MD5All` function is the focus of our discussion. In |
| [serial.go](pipelines/serial.go), the implementation uses no concurrency and |
| simply reads and sums each file as it walks the tree. |
| |
| .code pipelines/serial.go /MD5All/,/^}/ |
| |
| ## Parallel digestion |
| |
| In [parallel.go](pipelines/parallel.go), we split `MD5All` into a two-stage |
| pipeline. The first stage, `sumFiles`, walks the tree, digests each file in |
| a new goroutine, and sends the results on a channel with value type `result`: |
| |
| .code pipelines/parallel.go /type result/,/}/ HLresult |
| |
| `sumFiles` returns two channels: one for the `results` and another for the error |
| returned by `filepath.Walk`. The walk function starts a new goroutine to |
| process each regular file, then checks `done`. If `done` is closed, the walk |
| stops immediately: |
| |
| .code pipelines/parallel.go /func sumFiles/,/^}/ |
| |
| `MD5All` receives the digest values from `c`. `MD5All` returns early on error, |
| closing `done` via a `defer`: |
| |
| .code pipelines/parallel.go /func MD5All/,/^}/ HLdone |
| |
| ## Bounded parallelism |
| |
| The `MD5All` implementation in [parallel.go](pipelines/parallel.go) |
| starts a new goroutine for each file. In a directory with many large |
| files, this may allocate more memory than is available on the machine. |
| |
| We can limit these allocations by bounding the number of files read in |
| parallel. In [bounded.go](pipelines/bounded.go), we do this by |
| creating a fixed number of goroutines for reading files. Our pipeline |
| now has three stages: walk the tree, read and digest the files, and |
| collect the digests. |
| |
| The first stage, `walkFiles`, emits the paths of regular files in the tree: |
| |
| .code pipelines/bounded.go /func walkFiles/,/^}/ |
| |
| The middle stage starts a fixed number of `digester` goroutines that receive |
| file names from `paths` and send `results` on channel `c`: |
| |
| .code pipelines/bounded.go /func digester/,/^}/ HLpaths |
| |
| Unlike our previous examples, `digester` does not close its output channel, as |
| multiple goroutines are sending on a shared channel. Instead, code in `MD5All` |
| arranges for the channel to be closed when all the `digesters` are done: |
| |
| .code pipelines/bounded.go /fixed number/,/End of pipeline/ HLc |
| |
| We could instead have each digester create and return its own output |
| channel, but then we would need additional goroutines to fan-in the |
| results. |
| |
| The final stage receives all the `results` from `c`, then checks the |
| error from `errc`. This check cannot happen any earlier, since before |
| this point, `walkFiles` may block sending values downstream: |
| |
| .code pipelines/bounded.go /m := make/,/^}/ HLerrc |
| |
| ## Conclusion |
| |
| This article has presented techniques for constructing streaming data pipelines |
| in Go. Dealing with failures in such pipelines is tricky, since each stage in |
| the pipeline may block attempting to send values downstream, and the downstream |
| stages may no longer care about the incoming data. We showed how closing a |
| channel can broadcast a "done" signal to all the goroutines started by a |
| pipeline and defined guidelines for constructing pipelines correctly. |
| |
| Further reading: |
| |
| - [Go Concurrency Patterns](https://talks.golang.org/2012/concurrency.slide#1) |
| ([video](https://www.youtube.com/watch?v=f6kdp27TYZs)) presents the basics |
| of Go's concurrency primitives and several ways to apply them. |
| - [Advanced Go Concurrency Patterns](https://blog.golang.org/advanced-go-concurrency-patterns) |
| ([video](http://www.youtube.com/watch?v=QDDwwePbDtw)) covers more complex |
| uses of Go's primitives, |
| especially `select`. |
| - Douglas McIlroy's paper [Squinting at Power Series](https://swtch.com/~rsc/thread/squint.pdf) |
| shows how Go-like concurrency provides elegant support for complex calculations. |