blob: cb7d127acfbf9aaac2918c07b2247a1437c09654 [file] [log] [blame]
Advanced Go Concurrency Patterns
Sameer Ajmani
Google
http://profiles.google.com/ajmani
@Sajma
http://golang.org
* Video
This talk was presented at Google I/O in May 2013.
.link https://www.youtube.com/watch?v=QDDwwePbDtw Watch the talk on YouTube
* Get ready
.image advconc/gopherswim.jpg 400 400
* Go supports concurrency
In the language and runtime, not a library.
This changes how you structure your programs.
* Goroutines and Channels
Goroutines are independently executing functions in the same address space.
go f()
go g(1, 2)
Channels are typed values that allow goroutines to synchronize and exchange information.
c := make(chan int)
go func() { c <- 3 }()
n := <-c
For more on the basics, watch [[http://talks.golang.org/2012/concurrency.slide#1][Go Concurrency Patterns (Pike, 2012)]].
* Example: ping-pong
.play advconc/pingpong/pingpong.go /STARTMAIN1/,/STOPMAIN1/
* Deadlock detection
.play advconc/pingpongdeadlock/pingpongdeadlock.go /STARTMAIN1/,/STOPMAIN1/
* Panic dumps the stacks
.play advconc/pingpongpanic/pingpongpanic.go /STARTMAIN1/,/STOPMAIN1/
* It's easy to go, but how to stop?
Long-lived programs need to clean up.
Let's look at how to write programs that handle communication, periodic events, and cancellation.
The core is Go's `select` statement: like a `switch`, but the decision is made based on the ability to communicate.
select {
case xc <- x:
// sent x on xc
case y := <-yc:
// received y from yc
}
* Example: feed reader
My favorite feed reader disappeared. I need a new one.
Why not write one?
Where do we start?
* Find an RSS client
Searching [[http://godoc.org][godoc.org]] for *"rss"* turns up several hits, including one that provides:
// Fetch fetches Items for uri and returns the time when the next
// fetch should be attempted. On failure, Fetch returns an error.
func Fetch(uri string) (items []Item, next time.Time, err error)
type Item struct{
Title, Channel, GUID string // a subset of RSS fields
}
But I want a stream:
<-chan Item
And I want multiple subscriptions.
* Here's what we have
type Fetcher interface {
Fetch() (items []Item, next time.Time, err error)
}
func Fetch(domain string) Fetcher {...} // fetches Items from domain
* Here's what we want
type Subscription interface {
Updates() <-chan Item // stream of Items
Close() error // shuts down the stream
}
func Subscribe(fetcher Fetcher) Subscription {...} // converts Fetches to a stream
func Merge(subs ...Subscription) Subscription {...} // merges several streams
* Example
.play advconc/fakemain/fakemain.go /func main/,/^}/
* Subscribe
`Subscribe` creates a new `Subscription` that repeatedly fetches items until `Close` is called.
func Subscribe(fetcher Fetcher) Subscription {
s := &sub{
fetcher: fetcher,
updates: make(chan Item), // for Updates
}
go s.loop()
return s
}
// sub implements the Subscription interface.
type sub struct {
fetcher Fetcher // fetches items
updates chan Item // delivers items to the user
}
// loop fetches items using s.fetcher and sends them
// on s.updates. loop exits when s.Close is called.
func (s *sub) loop() {...}
* Implementing Subscription
To implement the `Subscription` interface, define `Updates` and `Close`.
.code advconc/fakemain/fakemain.go /func.* Updates/,/^}/
func (s *sub) Close() error {
// TODO: make loop exit
// TODO: find out about any error
return err
}
* What does loop do?
- periodically call `Fetch`
- send fetched items on the `Updates` channel
- exit when `Close` is called, reporting any error
* Naive Implementation
# Not quite enough room for this; retry after format change:
# .play advconc/naivemain/naivemain.go /naiveSub\) loop/,/^}/
# also on subsequent slides.
.play advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE /
.code advconc/naivemain/naivemain.go /naiveSub\) Close/,/^}/
* Bug 1: unsynchronized access to s.closed/s.err
.code advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE / HLsync
.code advconc/naivemain/naivemain.go /naiveSub\) Close/,/^}/ HLsync
* Race Detector
go run -race naivemain.go
# original is 400x1500
.image advconc/race.png 150 562
.play advconc/naivemain/naivemain.go /STARTNAIVE /,/s.err/ HLsync
.code advconc/naivemain/naivemain.go /naiveSub\) Close/,/^}/ HLsync
* Bug 2: time.Sleep may keep loop running
.code advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE / HLsleep
* Bug 3: loop may block forever on s.updates
.code advconc/naivemain/naivemain.go /STARTNAIVE /,/STOPNAIVE / HLsend
* Solution
Change the body of `loop` to a `select` with three cases:
- `Close` was called
- it's time to call `Fetch`
- send an item on `s.updates`
* Structure: for-select loop
`loop` runs in its own goroutine.
`select` lets `loop` avoid blocking indefinitely in any one state.
func (s *sub) loop() {
... declare mutable state ...
for {
... set up channels for cases ...
select {
case <-c1:
... read/write state ...
case c2 <- x:
... read/write state ...
case y := <-c3:
... read/write state ...
}
}
}
The cases interact via local state in `loop`.
* Case 1: Close
`Close` communicates with `loop` via `s.closing`.
type sub struct {
closing chan chan error
}
The service (`loop`) listens for requests on its channel (`s.closing`).
The client (`Close`) sends a request on `s.closing`: _exit_and_reply_with_the_error_
In this case, the only thing in the request is the _reply_channel_.
* Case 1: Close
`Close` asks loop to exit and waits for a response.
.code advconc/fakemain/fakemain.go /\*sub\) Close/,/^}/ HLchan
`loop` handles `Close` by replying with the `Fetch` error and exiting.
.code advconc/fakemain/fakemain.go /STARTCLOSEONLY /,/STOPCLOSEONLY / HLchan
* Case 2: Fetch
Schedule the next `Fetch` after some delay.
.code advconc/fakemain/fakemain.go /STARTFETCHONLY /,/STOPFETCHONLY /
* Case 3: Send
Send the fetched items, one at a time.
var pending []Item // appended by fetch; consumed by send
for {
select {
case s.updates <- pending[0]:
pending = pending[1:]
}
}
Whoops. This crashes.
.image advconc/gopherswrench.jpg 200 337
* Select and nil channels
Sends and receives on nil channels block.
Select never selects a blocking case.
.play advconc/nilselect/nilselect.go /func main/,/^}/
* Case 3: Send (fixed)
Enable send only when pending is non-empty.
.code advconc/fakemain/fakemain.go /STARTSENDONLY /,/STOPSENDONLY / HLupdates
* Select
Put the three cases together:
.code advconc/fakemain/fakemain.go /STARTSELECT /,/STOPSELECT /
The cases interact via `err`, `next`, and `pending`.
No locks, no condition variables, no callbacks.
* Bugs fixed
- Bug 1: unsynchronized access to `s.closed` and `s.err`
- Bug 2: `time.Sleep` may keep loop running
- Bug 3: `loop` may block forever sending on `s.updates`
.code advconc/fakemain/fakemain.go /STARTSELECT /,/STOPSELECT / HLcases
* We can improve loop further
* Issue: Fetch may return duplicates
.code advconc/fakemain/fakemain.go /STARTFETCHVARS /,/STOPFETCHVARS / HLfetch
.code advconc/fakemain/fakemain.go /STARTFETCHCASE /,/STOPFETCHCASE / HLfetch
* Fix: Filter items before adding to pending
.code advconc/fakemain/fakemain.go /STARTSEEN /,/STOPSEEN / HLseen
.code advconc/fakemain/fakemain.go /STARTDEDUPE /,/STOPDEDUPE / HLdupe
* Issue: Pending queue grows without bound
.code advconc/fakemain/fakemain.go /STARTDEDUPE /,/STOPDEDUPE / HLdupe
* Fix: Disable fetch case when too much pending
const maxPending = 10
.code advconc/fakemain/fakemain.go /STARTCAP /,/STOPCAP / HLcap
Could instead drop older items from the head of `pending`.
* Issue: Loop blocks on Fetch
.code advconc/fakemain/fakemain.go /STARTDEDUPE /,/STOPDEDUPE / HLfetch
* Fix: Run Fetch asynchronously
Add a new `select` case for `fetchDone`.
type fetchResult struct{ fetched []Item; next time.Time; err error }
.code advconc/fakemain/fakemain.go /STARTFETCHDONE /,/STOPFETCHDONE / HLfetch
.code advconc/fakemain/fakemain.go /STARTFETCHIF /,/STOPFETCHIF / HLfetch
.code advconc/fakemain/fakemain.go /STARTFETCHASYNC /,/STOPFETCHASYNC / HLfetch
* Implemented Subscribe
Responsive. Cleans up. Easy to read and change.
Three techniques:
- `for-select` loop
- service channel, reply channels (`chan`chan`error`)
- `nil` channels in `select` cases
More details online, including `Merge`.
.image advconc/gopherhat.jpg 200 158
* Conclusion
Concurrent programming can be tricky.
Go makes it easier:
- channels convey data, timer events, cancellation signals
- goroutines serialize access to local mutable state
- stack traces & deadlock detector
- race detector
.image advconc/race.png 200 750
* Links
Go Concurrency Patterns (2012)
.link http://talks.golang.org/2012/concurrency.slide
Concurrency is not parallelism
.link http://golang.org/s/concurrency-is-not-parallelism
Share memory by communicating
.link http://golang.org/doc/codewalk/sharemem
Go Tour (learn Go in your browser)
.link http://tour.golang.org