| Advanced Go Concurrency Patterns |
| |
| Sameer Ajmani |
| Google |
| http://profiles.google.com/ajmani |
| @Sajma |
| https://go.dev |
| |
| * Video |
| |
| This talk was presented at Google I/O in May 2013. |
| |
| .link https://www.youtube.com/watch?v=QDDwwePbDtw Watch the talk on YouTube |
| |
| * Get ready |
| |
| .image advconc/gopherswim.jpg 400 400 |
| |
| * Go supports concurrency |
| |
| In the language and runtime, not a library. |
| |
| This changes how you structure your programs. |
| |
| * Goroutines and Channels |
| |
| Goroutines are independently executing functions in the same address space. |
| |
| go f() |
| go g(1, 2) |
| |
| Channels are typed values that allow goroutines to synchronize and exchange information. |
| |
| c := make(chan int) |
| go func() { c <- 3 }() |
| n := <-c |
| |
| For more on the basics, watch [[/talks/2012/concurrency.slide#1][Go Concurrency Patterns (Pike, 2012)]]. |
| |
| * Example: ping-pong |
| |
| .play advconc/pingpong/pingpong.go /STARTMAIN1/,/STOPMAIN1/ |
| |
| * Deadlock detection |
| |
| .play advconc/pingpongdeadlock/pingpongdeadlock.go /STARTMAIN1/,/STOPMAIN1/ |
| |
| * Panic dumps the stacks |
| |
| .play advconc/pingpongpanic/pingpongpanic.go /STARTMAIN1/,/STOPMAIN1/ |
| |
| * It's easy to go, but how to stop? |
| |
| Long-lived programs need to clean up. |
| |
| Let's look at how to write programs that handle communication, periodic events, and cancellation. |
| |
| The core is Go's `select` statement: like a `switch`, but the decision is made based on the ability to communicate. |
| |
| select { |
| case xc <- x: |
| // sent x on xc |
| case y := <-yc: |
| // received y from yc |
| } |
| |
| * Example: feed reader |
| |
| My favorite feed reader disappeared. I need a new one. |
| |
| Why not write one? |
| |
| Where do we start? |
| |
| * Find an RSS client |
| |
| Searching [[https://pkg.go.dev][pkg.go.dev]] for *"rss"* turns up several hits, including one that provides: |
| |
| // Fetch fetches Items for uri and returns the time when the next |
| // fetch should be attempted. On failure, Fetch returns an error. |
| func Fetch(uri string) (items []Item, next time.Time, err error) |
| |
| type Item struct{ |
| Title, Channel, GUID string // a subset of RSS fields |
| } |
| |
| But I want a stream: |
| |
| <-chan Item |
| |
| And I want multiple subscriptions. |
| |
| * Here's what we have |
| |
| type Fetcher interface { |
| Fetch() (items []Item, next time.Time, err error) |
| } |
| |
| func Fetch(domain string) Fetcher {...} // fetches Items from domain |
| |
| * Here's what we want |
| |
| type Subscription interface { |
| Updates() <-chan Item // stream of Items |
| Close() error // shuts down the stream |
| } |
| |
| func Subscribe(fetcher Fetcher) Subscription {...} // converts Fetches to a stream |
| |
| func Merge(subs ...Subscription) Subscription {...} // merges several streams |
| |
| * Example |
| |
| .play advconc/fakemain/fakemain.go /func main/,/^}/ |
| |
| * Subscribe |
| |
| `Subscribe` creates a new `Subscription` that repeatedly fetches items until `Close` is called. |
| |
| func Subscribe(fetcher Fetcher) Subscription { |
| s := &sub{ |
| fetcher: fetcher, |
| updates: make(chan Item), // for Updates |
| } |
| go s.loop() |
| return s |
| } |
| |
| // sub implements the Subscription interface. |
| type sub struct { |
| fetcher Fetcher // fetches items |
| updates chan Item // delivers items to the user |
| } |
| |
| // loop fetches items using s.fetcher and sends them |
| // on s.updates. loop exits when s.Close is called. |
| func (s *sub) loop() {...} |
| |
| * Implementing Subscription |
| |
| To implement the `Subscription` interface, define `Updates` and `Close`. |
| |
| .code advconc/fakemain/fakemain.go /func.* Updates/,/^}/ |
| |
| func (s *sub) Close() error { |
| // TODO: make loop exit |
| // TODO: find out about any error |
| return err |
| } |
| |
| * What does loop do? |
| |
| - periodically call `Fetch` |
| - send fetched items on the `Updates` channel |
| - exit when `Close` is called, reporting any error |
| |
| * Naive Implementation |
| |
| # Not quite enough room for this; retry after format change: |
| # .play advconc/naivemain/naivemain.go /naiveSub\) loop/,/^}/ |
| # also on subsequent slides. |
| |
| .play advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE / |
| .code advconc/naivemain/naivemain.go /naiveSub\) Close/,/^}/ |
| |
| * Bug 1: unsynchronized access to s.closed/s.err |
| |
| .code advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE / HLsync |
| .code advconc/naivemain/naivemain.go /naiveSub\) Close/,/^}/ HLsync |
| |
| * Race Detector |
| |
| go run -race naivemain.go |
| |
| # original is 400x1500 |
| .image advconc/race.png 150 562 |
| .play advconc/naivemain/naivemain.go /STARTNAIVE /,/s.err/ HLsync |
| .code advconc/naivemain/naivemain.go /naiveSub\) Close/,/^}/ HLsync |
| |
| * Bug 2: time.Sleep may keep loop running |
| |
| .code advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE / HLsleep |
| |
| * Bug 3: loop may block forever on s.updates |
| |
| .code advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE / HLsend |
| |
| * Solution |
| |
| Change the body of `loop` to a `select` with three cases: |
| |
| - `Close` was called |
| - it's time to call `Fetch` |
| - send an item on `s.updates` |
| |
| * Structure: for-select loop |
| |
| `loop` runs in its own goroutine. |
| |
| `select` lets `loop` avoid blocking indefinitely in any one state. |
| |
| func (s *sub) loop() { |
| ... declare mutable state ... |
| for { |
| ... set up channels for cases ... |
| select { |
| case <-c1: |
| ... read/write state ... |
| case c2 <- x: |
| ... read/write state ... |
| case y := <-c3: |
| ... read/write state ... |
| } |
| } |
| } |
| |
| The cases interact via local state in `loop`. |
| |
| * Case 1: Close |
| |
| `Close` communicates with `loop` via `s.closing`. |
| |
| type sub struct { |
| closing chan chan error |
| } |
| |
| The service (`loop`) listens for requests on its channel (`s.closing`). |
| |
| The client (`Close`) sends a request on `s.closing`: _exit_and_reply_with_the_error_ |
| |
| In this case, the only thing in the request is the _reply_channel_. |
| |
| * Case 1: Close |
| |
| `Close` asks loop to exit and waits for a response. |
| |
| .code advconc/fakemain/fakemain.go /\*sub\) Close/,/^}/ HLchan |
| |
| `loop` handles `Close` by replying with the `Fetch` error and exiting. |
| |
| .code advconc/fakemain/fakemain.go /STARTCLOSEONLY /,/STOPCLOSEONLY / HLchan |
| |
| * Case 2: Fetch |
| |
| Schedule the next `Fetch` after some delay. |
| |
| .code advconc/fakemain/fakemain.go /STARTFETCHONLY /,/STOPFETCHONLY / |
| |
| * Case 3: Send |
| |
| Send the fetched items, one at a time. |
| |
| var pending []Item // appended by fetch; consumed by send |
| for { |
| select { |
| case s.updates <- pending[0]: |
| pending = pending[1:] |
| } |
| } |
| |
| Whoops. This crashes. |
| |
| .image advconc/gopherswrench.jpg 200 337 |
| |
| * Select and nil channels |
| |
| Sends and receives on nil channels block. |
| |
| Select never selects a blocking case. |
| |
| .play advconc/nilselect/nilselect.go /func main/,/^}/ |
| |
| * Case 3: Send (fixed) |
| |
| Enable send only when pending is non-empty. |
| |
| .code advconc/fakemain/fakemain.go /STARTSENDONLY /,/STOPSENDONLY / HLupdates |
| |
| * Select |
| |
| Put the three cases together: |
| |
| .code advconc/fakemain/fakemain.go /STARTSELECT /,/STOPSELECT / |
| |
| The cases interact via `err`, `next`, and `pending`. |
| |
| No locks, no condition variables, no callbacks. |
| |
| * Bugs fixed |
| |
| - Bug 1: unsynchronized access to `s.closed` and `s.err` |
| - Bug 2: `time.Sleep` may keep loop running |
| - Bug 3: `loop` may block forever sending on `s.updates` |
| |
| .code advconc/fakemain/fakemain.go /STARTSELECT /,/STOPSELECT / HLcases |
| |
| * We can improve loop further |
| |
| * Issue: Fetch may return duplicates |
| |
| .code advconc/fakemain/fakemain.go /STARTFETCHVARS /,/STOPFETCHVARS / HLfetch |
| .code advconc/fakemain/fakemain.go /STARTFETCHCASE /,/STOPFETCHCASE / HLfetch |
| |
| * Fix: Filter items before adding to pending |
| |
| .code advconc/fakemain/fakemain.go /STARTSEEN /,/STOPSEEN / HLseen |
| .code advconc/fakemain/fakemain.go /STARTDEDUPE /,/STOPDEDUPE / HLdupe |
| |
| * Issue: Pending queue grows without bound |
| |
| .code advconc/fakemain/fakemain.go /STARTDEDUPE /,/STOPDEDUPE / HLdupe |
| |
| * Fix: Disable fetch case when too much pending |
| |
| const maxPending = 10 |
| |
| .code advconc/fakemain/fakemain.go /STARTCAP /,/STOPCAP / HLcap |
| |
| Could instead drop older items from the head of `pending`. |
| |
| * Issue: Loop blocks on Fetch |
| |
| .code advconc/fakemain/fakemain.go /STARTDEDUPE /,/STOPDEDUPE / HLfetch |
| |
| * Fix: Run Fetch asynchronously |
| |
| Add a new `select` case for `fetchDone`. |
| |
| type fetchResult struct{ fetched []Item; next time.Time; err error } |
| |
| .code advconc/fakemain/fakemain.go /STARTFETCHDONE /,/STOPFETCHDONE / HLfetch |
| .code advconc/fakemain/fakemain.go /STARTFETCHIF /,/STOPFETCHIF / HLfetch |
| .code advconc/fakemain/fakemain.go /STARTFETCHASYNC /,/STOPFETCHASYNC / HLfetch |
| |
| * Implemented Subscribe |
| |
| Responsive. Cleans up. Easy to read and change. |
| |
| Three techniques: |
| |
| - `for-select` loop |
| - service channel, reply channels (`chan`chan`error`) |
| - `nil` channels in `select` cases |
| |
| More details online, including `Merge`. |
| |
| .image advconc/gopherhat.jpg 200 158 |
| |
| * Conclusion |
| |
| Concurrent programming can be tricky. |
| |
| Go makes it easier: |
| |
| - channels convey data, timer events, cancellation signals |
| - goroutines serialize access to local mutable state |
| - stack traces & deadlock detector |
| - race detector |
| |
| .image advconc/race.png 200 750 |
| |
| * Links |
| |
| Go Concurrency Patterns (2012) |
| |
| .link /talks/2012/concurrency.slide go.dev/talks/2012/concurrency.slide |
| |
| Concurrency is not parallelism |
| |
| .link /s/concurrency-is-not-parallelism go.dev/s/concurrency-is-not-parallelism |
| |
| Share memory by communicating |
| |
| .link /doc/codewalk/sharemem go.dev/doc/codewalk/sharemem |
| |
| Go Tour (learn Go in your browser) |
| |
| .link /tour/ go.dev/tour |