|  | // +build OMIT | 
|  |  | 
|  | package main | 
|  |  | 
|  | import ( | 
|  | "fmt" | 
|  | "sync" | 
|  | ) | 
|  |  | 
// gen returns a channel that has already been loaded with every value in
// nums and then closed, so a receiver drains the values in order and then
// sees the close.
//
// The channel's buffer is sized to len(nums), which means none of the sends
// below can block; for that reason done is accepted but never consulted.
func gen(done <-chan struct{}, nums ...int) <-chan int {
	ch := make(chan int, len(nums))
	// The deferred close runs after every value has been buffered.
	defer close(ch)
	for i := range nums {
		ch <- nums[i]
	}
	return ch
}
|  |  | 
// sq starts a goroutine that reads integers from in and sends the square of
// each one on the returned channel. The goroutine stops when in is closed,
// or when done is closed while it is waiting to send a result; in both
// cases the returned channel is closed on the way out.
func sq(done <-chan struct{}, in <-chan int) <-chan int {
	squares := make(chan int)
	worker := func() {
		defer close(squares) // HL
		for {
			v, ok := <-in
			if !ok {
				// Upstream is exhausted.
				return
			}
			// Either hand off the result or give up if the pipeline
			// has been cancelled.
			select {
			case <-done:
				return // HL
			case squares <- v * v:
			}
		}
	}
	go worker()
	return squares
}
|  |  | 
// merge fans the input channels in cs into a single returned channel. One
// goroutine per input forwards values until that input is closed or done is
// closed; once every forwarding goroutine has finished, the returned channel
// is closed.
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	merged := make(chan int)
	var pending sync.WaitGroup

	// output forwards everything it receives from src to merged. It gives
	// up early if done is closed while a send is pending, and always
	// reports completion to the wait group.
	output := func(src <-chan int) {
		defer pending.Done() // HL
		for {
			v, ok := <-src
			if !ok {
				return
			}
			select {
			case <-done:
				return // HL
			case merged <- v:
			}
		}
	}
	// ... the rest is unchanged ...

	// Register every forwarder before any of them (or the closer below)
	// starts, so Wait cannot return prematurely.
	pending.Add(len(cs))
	for i := range cs {
		go output(cs[i])
	}

	// Close merged only after all forwarders have returned; no goroutine
	// sends on it after that point.
	go func() {
		defer close(merged)
		pending.Wait()
	}()
	return merged
}
|  |  | 
|  | func main() { | 
|  | // Set up a done channel that's shared by the whole pipeline, | 
|  | // and close that channel when this pipeline exits, as a signal | 
|  | // for all the goroutines we started to exit. | 
|  | done := make(chan struct{}) // HL | 
|  | defer close(done)           // HL | 
|  |  | 
|  | in := gen(done, 2, 3) | 
|  |  | 
|  | // Distribute the sq work across two goroutines that both read from in. | 
|  | c1 := sq(done, in) | 
|  | c2 := sq(done, in) | 
|  |  | 
|  | // Consume the first value from output. | 
|  | out := merge(done, c1, c2) | 
|  | fmt.Println(<-out) // 4 or 9 | 
|  |  | 
|  | // done will be closed by the deferred call. // HL | 
|  | } |