Advanced Go Concurrency Patterns

Sameer Ajmani
Google
http://profiles.google.com/ajmani
@Sajma
http://golang.org

* Video

This talk was presented at Google I/O in May 2013.

.link https://www.youtube.com/watch?v=QDDwwePbDtw Watch the talk on YouTube

* Get ready

.image advconc/gopherswim.jpg 400 400

* Go supports concurrency

In the language and runtime, not a library.

This changes how you structure your programs.

* Goroutines and Channels

Goroutines are independently executing functions in the same address space.

   go f()
   go g(1, 2)

Channels are typed values that allow goroutines to synchronize and exchange information.

  c := make(chan int)
  go func() { c <- 3 }()
  n := <-c

For more on the basics, watch [[http://go.dev/talks/2012/concurrency.slide#1][Go Concurrency Patterns (Pike, 2012)]].

* Example: ping-pong

.play advconc/pingpong/pingpong.go /STARTMAIN1/,/STOPMAIN1/

* Deadlock detection

.play advconc/pingpongdeadlock/pingpongdeadlock.go /STARTMAIN1/,/STOPMAIN1/

* Panic dumps the stacks

.play advconc/pingpongpanic/pingpongpanic.go /STARTMAIN1/,/STOPMAIN1/

* It's easy to go, but how to stop?

Long-lived programs need to clean up.

Let's look at how to write programs that handle communication, periodic events, and cancellation.

The core is Go's `select` statement: like a `switch`, but the decision is made based on the ability to communicate.

  select {
  case xc <- x:
      // sent x on xc
  case y := <-yc:
      // received y from yc
  }

* Example: feed reader

My favorite feed reader disappeared.  I need a new one.

Why not write one?

Where do we start?

* Find an RSS client

Searching [[http://godoc.org][godoc.org]] for *"rss"* turns up several hits, including one that provides:

  // Fetch fetches Items for uri and returns the time when the next
  // fetch should be attempted.  On failure, Fetch returns an error.
  func Fetch(uri string) (items []Item, next time.Time, err error)

  type Item struct{
      Title, Channel, GUID string // a subset of RSS fields
  }

But I want a stream:

  <-chan Item

And I want multiple subscriptions.

* Here's what we have

  type Fetcher interface {
      Fetch() (items []Item, next time.Time, err error)
  }

  func Fetch(domain string) Fetcher {...} // fetches Items from domain

* Here's what we want

  type Subscription interface {
      Updates() <-chan Item // stream of Items
      Close() error         // shuts down the stream
  }

  func Subscribe(fetcher Fetcher) Subscription {...} // converts Fetches to a stream

  func Merge(subs ...Subscription) Subscription {...} // merges several streams

* Example

.play advconc/fakemain/fakemain.go /func main/,/^}/

* Subscribe

`Subscribe` creates a new `Subscription` that repeatedly fetches items until `Close` is called.

  func Subscribe(fetcher Fetcher) Subscription {
      s := &sub{
          fetcher: fetcher,
          updates: make(chan Item), // for Updates
      }
      go s.loop()
      return s
  }

  // sub implements the Subscription interface.
  type sub struct {
      fetcher Fetcher   // fetches items
      updates chan Item // delivers items to the user
  }

  // loop fetches items using s.fetcher and sends them
  // on s.updates.  loop exits when s.Close is called.
  func (s *sub) loop() {...}

* Implementing Subscription

To implement the `Subscription` interface, define `Updates` and `Close`.

.code advconc/fakemain/fakemain.go /func.* Updates/,/^}/

  func (s *sub) Close() error {
      // TODO: make loop exit
      // TODO: find out about any error
      return err
  }

* What does loop do?

- periodically call `Fetch`
- send fetched items on the `Updates` channel
- exit when `Close` is called, reporting any error

* Naive Implementation

# Not quite enough room for this; retry after format change:
# .play advconc/naivemain/naivemain.go /naiveSub\) loop/,/^}/
# also on subsequent slides.

.play advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE /
.code advconc/naivemain/naivemain.go /naiveSub\) Close/,/^}/

* Bug 1: unsynchronized access to s.closed/s.err

.code advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE / HLsync
.code advconc/naivemain/naivemain.go /naiveSub\) Close/,/^}/ HLsync

* Race Detector

  go run -race naivemain.go

# original is 400x1500
.image advconc/race.png 150 562
.play advconc/naivemain/naivemain.go /STARTNAIVE /,/s.err/ HLsync
.code advconc/naivemain/naivemain.go /naiveSub\) Close/,/^}/ HLsync

* Bug 2: time.Sleep may keep loop running

.code advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE / HLsleep

* Bug 3: loop may block forever on s.updates

.code advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE / HLsend

* Solution

Change the body of `loop` to a `select` with three cases:

- `Close` was called
- it's time to call `Fetch`
- send an item on `s.updates`

* Structure: for-select loop

`loop` runs in its own goroutine.

`select` lets `loop` avoid blocking indefinitely in any one state.

  func (s *sub) loop() {
      ... declare mutable state ...
      for {
          ... set up channels for cases ...
          select {
          case <-c1:
              ... read/write state ...
          case c2 <- x:
              ... read/write state ...
          case y := <-c3:
              ... read/write state ...
          }
      }
  }

The cases interact via local state in `loop`.

* Case 1: Close

`Close` communicates with `loop` via `s.closing`.

  type sub struct {
      closing chan chan error
  }

The service (`loop`) listens for requests on its channel (`s.closing`).

The client (`Close`) sends a request on `s.closing`: _exit_and_reply_with_the_error_

In this case, the only thing in the request is the _reply_channel_.

* Case 1: Close

`Close` asks loop to exit and waits for a response.

.code advconc/fakemain/fakemain.go /\*sub\) Close/,/^}/ HLchan

`loop` handles `Close` by replying with the `Fetch` error and exiting.

.code advconc/fakemain/fakemain.go /STARTCLOSEONLY /,/STOPCLOSEONLY / HLchan

* Case 2: Fetch

Schedule the next `Fetch` after some delay.

.code advconc/fakemain/fakemain.go /STARTFETCHONLY /,/STOPFETCHONLY /

* Case 3: Send

Send the fetched items, one at a time.

	var pending []Item // appended by fetch; consumed by send
	for {
		select {
		case s.updates <- pending[0]:
			pending = pending[1:]
		}
	}

Whoops. This crashes.

.image advconc/gopherswrench.jpg 200 337

* Select and nil channels

Sends and receives on nil channels block.

Select never selects a blocking case.

.play advconc/nilselect/nilselect.go /func main/,/^}/

* Case 3: Send (fixed)

Enable send only when pending is non-empty.

.code advconc/fakemain/fakemain.go /STARTSENDONLY /,/STOPSENDONLY / HLupdates

* Select

Put the three cases together:

.code advconc/fakemain/fakemain.go /STARTSELECT /,/STOPSELECT /

The cases interact via `err`, `next`, and `pending`.

No locks, no condition variables, no callbacks.

* Bugs fixed

- Bug 1: unsynchronized access to `s.closed` and `s.err`
- Bug 2: `time.Sleep` may keep loop running
- Bug 3: `loop` may block forever sending on `s.updates`

.code advconc/fakemain/fakemain.go /STARTSELECT /,/STOPSELECT / HLcases

* We can improve loop further

* Issue: Fetch may return duplicates

.code advconc/fakemain/fakemain.go /STARTFETCHVARS /,/STOPFETCHVARS / HLfetch
.code advconc/fakemain/fakemain.go /STARTFETCHCASE /,/STOPFETCHCASE / HLfetch

* Fix: Filter items before adding to pending

.code advconc/fakemain/fakemain.go /STARTSEEN /,/STOPSEEN / HLseen
.code advconc/fakemain/fakemain.go /STARTDEDUPE /,/STOPDEDUPE / HLdupe

* Issue: Pending queue grows without bound

.code advconc/fakemain/fakemain.go /STARTDEDUPE /,/STOPDEDUPE / HLdupe

* Fix: Disable fetch case when too much pending

  const maxPending = 10

.code advconc/fakemain/fakemain.go /STARTCAP /,/STOPCAP / HLcap

Could instead drop older items from the head of `pending`.

* Issue: Loop blocks on Fetch

.code advconc/fakemain/fakemain.go /STARTDEDUPE /,/STOPDEDUPE / HLfetch

* Fix: Run Fetch asynchronously

Add a new `select` case for `fetchDone`.

  type fetchResult struct{ fetched []Item; next time.Time; err error }

.code advconc/fakemain/fakemain.go /STARTFETCHDONE /,/STOPFETCHDONE / HLfetch
.code advconc/fakemain/fakemain.go /STARTFETCHIF /,/STOPFETCHIF / HLfetch
.code advconc/fakemain/fakemain.go /STARTFETCHASYNC /,/STOPFETCHASYNC / HLfetch

* Implemented Subscribe

Responsive. Cleans up. Easy to read and change.

Three techniques:

- `for-select` loop
- service channel, reply channels (`chan`chan`error`)
- `nil` channels in `select` cases

More details online, including `Merge`.

.image advconc/gopherhat.jpg 200 158

* Conclusion

Concurrent programming can be tricky.

Go makes it easier:

- channels convey data, timer events, cancellation signals
- goroutines serialize access to local mutable state
- stack traces & deadlock detector
- race detector

.image advconc/race.png 200 750

* Links

Go Concurrency Patterns (2012)

.link http://go.dev/talks/2012/concurrency.slide

Concurrency is not parallelism

.link http://golang.org/s/concurrency-is-not-parallelism

Share memory by communicating

.link http://golang.org/doc/codewalk/sharemem

Go Tour (learn Go in your browser)

.link http://tour.golang.org
