go.talks: add Advanced Go Concurrency Patterns
R=snanda
CC=golang-dev
https://golang.org/cl/9626044
diff --git a/2013/advconc.slide b/2013/advconc.slide
new file mode 100644
index 0000000..3f3869f
--- /dev/null
+++ b/2013/advconc.slide
@@ -0,0 +1,384 @@
+Advanced Go Concurrency Patterns
+
+Sameer Ajmani
+Google
+http://profiles.google.com/ajmani
+@Sajma
+http://golang.org
+
+* Video
+
+This talk was presented at Google I/O in May 2013.
+
+.link https://www.youtube.com/watch?v=QDDwwePbDtw Watch the talk on YouTube
+
+* Get ready
+
+.image advconc/gopherswim.jpg 400 400
+
+* Go supports concurrency
+
+In the language and runtime, not a library.
+
+This changes how you structure your programs.
+
+* Goroutines and Channels
+
+Goroutines are independently executing functions in the same address space.
+
+ go f()
+ go g(1, 2)
+
+Channels are typed values that allow goroutines to synchronize and exchange information.
+
+ c := make(chan int)
+ go func() { c <- 3 }()
+ n := <-c
+
+For more on the basics, watch [[http://talks.golang.org/2012/concurrency.slide#1][Go Concurrency Patterns (Pike, 2012)]].
+
+* Example: ping-pong
+
+.play advconc/pingpong1.go /STARTMAIN1/,/STOPMAIN1/
+
+* Deadlock detection
+
+.play advconc/pingpongdeadlock.go /STARTMAIN1/,/STOPMAIN1/
+
+* Panic dumps the stacks
+
+.play advconc/pingpongpanic.go /STARTMAIN1/,/STOPMAIN1/
+
+* It's easy to go, but how to stop?
+
+Long-lived programs need to clean up.
+
+Let's look at how to write programs that handle communication, periodic events, and cancellation.
+
+The core is Go's `select` statement: like a `switch`, but the decision is made based on the ability to communicate.
+
+ select {
+ case xc <- x:
+ // sent x on xc
+ case y := <-yc:
+ // received y from yc
+ }
+
+* Example: feed reader
+
+My favorite feed reader disappeared. I need a new one.
+
+Why not write one?
+
+Where do we start?
+
+* Find an RSS client
+
+Searching [[http://godoc.org][godoc.org]] for *"rss"* turns up several hits, including one that provides:
+
+ // Fetch fetches Items for uri and returns the time when the next
+ // fetch should be attempted. On failure, Fetch returns an error.
+ func Fetch(uri string) (items []Item, next time.Time, err error)
+
+ type Item struct{
+ Title, Channel, GUID string // a subset of RSS fields
+ }
+
+But I want a stream:
+
+ <-chan Item
+
+And I want multiple subscriptions.
+
+* Here's what we have
+
+ type Fetcher interface {
+ Fetch() (items []Item, next time.Time, err error)
+ }
+
+ func Fetch(domain string) Fetcher {...} // fetches Items from domain
+
+* Here's what we want
+
+ type Subscription interface {
+ Updates() <-chan Item // stream of Items
+ Close() error // shuts down the stream
+ }
+
+ func Subscribe(fetcher Fetcher) Subscription {...} // converts Fetches to a stream
+
+ func Merge(subs ...Subscription) Subscription {...} // merges several streams
+
+* Example
+
+.play advconc/fakemain.go /func main/,/^}/
+
+* Subscribe
+
+`Subscribe` creates a new `Subscription` that repeatedly fetches items until `Close` is called.
+
+ func Subscribe(fetcher Fetcher) Subscription {
+ s := &sub{
+ fetcher: fetcher,
+ updates: make(chan Item), // for Updates
+ }
+ go s.loop()
+ return s
+ }
+
+ // sub implements the Subscription interface.
+ type sub struct {
+ fetcher Fetcher // fetches items
+ updates chan Item // delivers items to the user
+ }
+
+ // loop fetches items using s.fetcher and sends them
+ // on s.updates. loop exits when s.Close is called.
+ func (s *sub) loop() {...}
+
+* Implementing Subscription
+
+To implement the `Subscription` interface, define `Updates` and `Close`.
+
+.code advconc/reader/reader.go /func.* Updates/,/^}/
+
+ func (s *sub) Close() error {
+ // TODO: make loop exit
+ // TODO: find out about any error
+ return err
+ }
+
+* What does loop do?
+
+- periodically call `Fetch`
+- send fetched items on the `Updates` channel
+- exit when `Close` is called, reporting any error
+
+* Naive Implementation
+
+# Not quite enough room for this; retry after format change:
+# .play advconc/naivemain.go /naiveSub\) loop/,/^}/
+# also on subsequent slides.
+
+.play advconc/naivemain.go /STARTNAIVE/,/STOPNAIVE/
+.code advconc/naivemain.go /naiveSub\) Close/,/^}/
+
+* Bug 1: unsynchronized access to s.closed/s.err
+
+.code advconc/naivemain.go /STARTNAIVE/,/STOPNAIVE/ HLsync
+.code advconc/naivemain.go /naiveSub\) Close/,/^}/ HLsync
+
+* Race Detector
+
+ go run -race naivemain.go
+
+# original is 400x1500
+.image advconc/race.png 150 562
+.play advconc/naivemain.go /STARTNAIVE/,/s.err/ HLsync
+.code advconc/naivemain.go /naiveSub\) Close/,/^}/ HLsync
+
+#* Race demo
+#.play go1.1/race.go
+
+* Bug 2: time.Sleep may keep loop running
+
+.code advconc/naivemain.go /STARTNAIVE/,/STOPNAIVE/ HLsleep
+
+* Bug 3: loop may block forever on s.updates
+
+.code advconc/naivemain.go /STARTNAIVE/,/STOPNAIVE/ HLsend
+
+* Solution
+
+Change the body of `loop` to a `select` with three cases:
+
+- `Close` was called
+- it's time to call `Fetch`
+- send an item on `s.updates`
+
+* Structure: for-select loop
+
+`loop` runs in its own goroutine.
+
+`select` lets `loop` avoid blocking indefinitely in any one state.
+
+ func (s *sub) loop() {
+ ... declare mutable state ...
+ for {
+ ... set up channels for cases ...
+ select {
+ case <-c1:
+ ... read/write state ...
+ case c2 <- x:
+ ... read/write state ...
+ case y := <-c3:
+ ... read/write state ...
+ }
+ }
+ }
+
+The cases interact via local state in `loop`.
+
+* Case 1: Close
+
+`Close` communicates with `loop` via `s.closing`.
+
+ type sub struct {
+ closing chan chan error
+ }
+
+The service (`loop`) listens for requests on its channel (`s.closing`).
+
+The client (`Close`) sends a request on `s.closing`: _exit_and_reply_with_the_error_
+
+In this case, the only thing in the request is the _reply_channel_.
+
+* Case 1: Close
+
+`Close` asks loop to exit and waits for a response.
+
+.code advconc/reader/reader.go /\*sub\) Close/,/^}/ HLchan
+
+`loop` handles `Close` by replying with the `Fetch` error and exiting.
+
+.code advconc/reader/reader.go /STARTCLOSEONLY /,/STOPCLOSEONLY / HLchan
+
+* Case 2: Fetch
+
+Schedule the next `Fetch` after some delay.
+
+.code advconc/reader/reader.go /STARTFETCHONLY /,/STOPFETCHONLY /
+
+* Case 3: Send
+
+Send the fetched items, one at a time.
+
+ var pending []Item // appended by fetch; consumed by send
+ for {
+ select {
+ case s.updates <- pending[0]:
+ pending = pending[1:]
+ }
+ }
+
+Whoops. This crashes.
+
+.image advconc/gopherswrench.jpg 200 337
+
+* Select and nil channels
+
+Sends and receives on nil channels block.
+
+Select never selects a blocking case.
+
+.play advconc/nilselect.go /func main/,/^}/
+
+* Case 3: Send (fixed)
+
+Enable send only when pending is non-empty.
+
+.code advconc/reader/reader.go /STARTSENDONLY /,/STOPSENDONLY / HLupdates
+
+* Select
+
+Put the three cases together:
+
+.code advconc/reader/reader.go /STARTSELECT /,/STOPSELECT /
+
+The cases interact via `err`, `next`, and `pending`.
+
+No locks, no condition variables, no callbacks.
+
+* Bugs fixed
+
+- Bug 1: unsynchronized access to `s.closed` and `s.err`
+- Bug 2: `time.Sleep` may keep loop running
+- Bug 3: `loop` may block forever sending on `s.updates`
+
+.code advconc/reader/reader.go /STARTSELECT /,/STOPSELECT / HLcases
+
+* We can improve loop further
+
+* Issue: Fetch may return duplicates
+
+.code advconc/reader/reader.go /STARTFETCHVARS /,/STOPFETCHVARS / HLfetch
+.code advconc/reader/reader.go /STARTFETCHCASE /,/STOPFETCHCASE / HLfetch
+
+* Fix: Filter items before adding to pending
+
+.code advconc/reader/reader.go /STARTSEEN /,/STOPSEEN / HLseen
+.code advconc/reader/reader.go /STARTDEDUPE /,/STOPDEDUPE / HLdupe
+
+* Issue: Pending queue grows without bound
+
+.code advconc/reader/reader.go /STARTDEDUPE /,/STOPDEDUPE / HLdupe
+
+* Fix: Disable fetch case when too much pending
+
+ const maxPending = 10
+
+.code advconc/reader/reader.go /STARTCAP /,/STOPCAP / HLcap
+
+Could instead drop older items from the head of `pending`.
+
+* Issue: Loop blocks on Fetch
+
+.code advconc/reader/reader.go /STARTDEDUPE /,/STOPDEDUPE / HLfetch
+
+* Fix: Run Fetch asynchronously
+
+Add a new `select` case for `fetchDone`.
+
+ type fetchResult struct{ fetched []Item; next time.Time; err error }
+
+.code advconc/reader/reader.go /STARTFETCHDONE /,/STOPFETCHDONE / HLfetch
+.code advconc/reader/reader.go /STARTFETCHIF /,/STOPFETCHIF / HLfetch
+.code advconc/reader/reader.go /STARTFETCHASYNC /,/STOPFETCHASYNC / HLfetch
+
+* Implemented Subscribe
+
+Responsive. Cleans up. Easy to read and change.
+
+Three techniques:
+
+- `for-select` loop
+- service channel, reply channels (`chan`chan`error`)
+- `nil` channels in `select` cases
+
+More details online, including `Merge`.
+
+.image advconc/gopherhat.jpg 200 158
+
+* Conclusion
+
+Concurrent programming can be tricky.
+
+Go makes it easier:
+
+- channels convey data, timer events, cancellation signals
+- goroutines serialize access to local mutable state
+- stack traces & deadlock detector
+- race detector
+
+.image advconc/race.png 200 750
+
+* Links
+
+Go Concurrency Patterns (2012)
+
+.link http://talks.golang.org/2012/concurrency.slide
+
+Concurrency is not parallelism
+
+.link http://golang.org/s/concurrency-is-not-parallelism
+
+Share memory by communicating
+
+.link http://golang.org/doc/codewalk/sharemem
+
+Go Tour (learn Go in your browser)
+
+.link http://tour.golang.org
+
+*Go*Team*Fireside*Chat*
+5:20pm Room 2
diff --git a/2013/advconc/buffer.go b/2013/advconc/buffer.go
new file mode 100644
index 0000000..0cd8d9a
--- /dev/null
+++ b/2013/advconc/buffer.go
@@ -0,0 +1,43 @@
+package main
+
+import (
+ "fmt"
+)
+
+func main() {
+ in, out := make(chan int), make(chan int)
+ go buffer(in, out)
+ for i := 0; i < 10; i++ {
+ in <- i
+ }
+ close(in)
+ for i := range out {
+ fmt.Println(i)
+ }
+}
+
+// buffer provides an unbounded buffer between in and out. buffer
+// exits when in is closed and all items in the buffer have been sent
+// to out, at which point it closes out.
+func buffer(in <-chan int, out chan<- int) {
+ var buf []int
+ for in != nil || len(buf) > 0 {
+ var i int
+ var c chan<- int
+ if len(buf) > 0 {
+ i = buf[0]
+ c = out // enable send case
+ }
+ select {
+ case n, ok := <-in:
+ if ok {
+ buf = append(buf, n)
+ } else {
+ in = nil // disable receive case
+ }
+ case c <- i:
+ buf = buf[1:]
+ }
+ }
+ close(out)
+}
diff --git a/2013/advconc/dedupermain.go b/2013/advconc/dedupermain.go
new file mode 100644
index 0000000..898c175
--- /dev/null
+++ b/2013/advconc/dedupermain.go
@@ -0,0 +1,41 @@
+package main
+
+import (
+ "fmt"
+ "math/rand"
+ "time"
+
+ . "code.google.com/p/go.talks/2013/reader"
+)
+
+func init() {
+ rand.Seed(time.Now().UnixNano())
+ FakeFetch = true
+}
+
+// STARTMAIN OMIT
+func main() {
+ // STARTMERGECALL OMIT
+ // Subscribe to some feeds, and create a merged update stream.
+ merged := Dedupe(Merge(
+ Subscribe(Fetch("blog.golang.org")),
+ Subscribe(Fetch("blog.golang.org")),
+ Subscribe(Fetch("blog.golang.org")),
+ Subscribe(Fetch("googleblog.blogspot.com")),
+ Subscribe(Fetch("googledevelopers.blogspot.com"))))
+ // STOPMERGECALL OMIT
+
+ // Close the subscriptions after some time.
+ time.AfterFunc(3*time.Second, func() {
+ fmt.Println("closed:", merged.Close())
+ })
+
+ // Print the stream.
+ for it := range merged.Updates() {
+ fmt.Println(it.Channel, it.Title)
+ }
+
+ panic("show me the stacks")
+}
+
+// STOPMAIN OMIT
diff --git a/2013/advconc/fakemain.go b/2013/advconc/fakemain.go
new file mode 100644
index 0000000..bb1d44f
--- /dev/null
+++ b/2013/advconc/fakemain.go
@@ -0,0 +1,39 @@
+package main
+
+import (
+ "fmt"
+ "math/rand"
+ "time"
+
+ . "code.google.com/p/go.talks/2013/reader"
+)
+
+func init() {
+ rand.Seed(time.Now().UnixNano())
+ FakeFetch = true
+}
+
+// STARTMAIN OMIT
+func main() {
+ // STARTMERGECALL OMIT
+ // Subscribe to some feeds, and create a merged update stream.
+ merged := Merge(
+ Subscribe(Fetch("blog.golang.org")),
+ Subscribe(Fetch("googleblog.blogspot.com")),
+ Subscribe(Fetch("googledevelopers.blogspot.com")))
+ // STOPMERGECALL OMIT
+
+ // Close the subscriptions after some time.
+ time.AfterFunc(3*time.Second, func() {
+ fmt.Println("closed:", merged.Close())
+ })
+
+ // Print the stream.
+ for it := range merged.Updates() {
+ fmt.Println(it.Channel, it.Title)
+ }
+
+ panic("show me the stacks")
+}
+
+// STOPMAIN OMIT
diff --git a/2013/advconc/fakemainnomerge.go b/2013/advconc/fakemainnomerge.go
new file mode 100644
index 0000000..c5bca2a
--- /dev/null
+++ b/2013/advconc/fakemainnomerge.go
@@ -0,0 +1,33 @@
+package main
+
+import (
+ "fmt"
+ "math/rand"
+ "time"
+
+ . "code.google.com/p/go.talks/2013/reader"
+)
+
+func init() {
+ rand.Seed(time.Now().UnixNano())
+ FakeFetch = true
+}
+
+// STARTMAIN OMIT
+func main() {
+ sub := Subscribe(Fetch("blog.golang.org"))
+
+ // Close the subscription after some time.
+ time.AfterFunc(3*time.Second, func() {
+ fmt.Println("closed:", sub.Close())
+ })
+
+ // Print the stream.
+ for it := range sub.Updates() {
+ fmt.Println(it.Channel, it.Title)
+ }
+
+ panic("show me the stacks")
+}
+
+// STOPMAIN OMIT
diff --git a/2013/advconc/gopherhat.jpg b/2013/advconc/gopherhat.jpg
new file mode 100644
index 0000000..f34d7b3
--- /dev/null
+++ b/2013/advconc/gopherhat.jpg
Binary files differ
diff --git a/2013/advconc/gopherrunning.jpg b/2013/advconc/gopherrunning.jpg
new file mode 100644
index 0000000..eeeddf1
--- /dev/null
+++ b/2013/advconc/gopherrunning.jpg
Binary files differ
diff --git a/2013/advconc/gopherswim.jpg b/2013/advconc/gopherswim.jpg
new file mode 100644
index 0000000..2f32877
--- /dev/null
+++ b/2013/advconc/gopherswim.jpg
Binary files differ
diff --git a/2013/advconc/gopherswrench.jpg b/2013/advconc/gopherswrench.jpg
new file mode 100644
index 0000000..93005f4
--- /dev/null
+++ b/2013/advconc/gopherswrench.jpg
Binary files differ
diff --git a/2013/advconc/naivemain.go b/2013/advconc/naivemain.go
new file mode 100644
index 0000000..072608f
--- /dev/null
+++ b/2013/advconc/naivemain.go
@@ -0,0 +1,85 @@
+package main
+
+import (
+ "fmt"
+ "math/rand"
+ "time"
+
+ . "code.google.com/p/go.talks/2013/reader"
+)
+
+func NaiveSubscribe(fetcher Fetcher) Subscription {
+ s := &naiveSub{
+ fetcher: fetcher,
+ updates: make(chan Item),
+ }
+ go s.loop()
+ return s
+}
+
+type naiveSub struct {
+ fetcher Fetcher
+ updates chan Item
+ closed bool
+ err error
+}
+
+func (s *naiveSub) Updates() <-chan Item {
+ return s.updates
+}
+
+func (s *naiveSub) loop() {
+ // STARTNAIVE OMIT
+ for {
+ if s.closed { // HLsync
+ close(s.updates)
+ return
+ }
+ items, next, err := s.fetcher.Fetch()
+ if err != nil {
+ s.err = err // HLsync
+ time.Sleep(10 * time.Second) // HLsleep
+ continue
+ }
+ for _, item := range items {
+ s.updates <- item // HLsend
+ }
+ if now := time.Now(); next.After(now) {
+ time.Sleep(next.Sub(now)) // HLsleep
+ }
+ }
+ // STOPNAIVE OMIT
+}
+
+func (s *naiveSub) Close() error {
+ s.closed = true // HLsync
+ return s.err // HLsync
+}
+
+func init() {
+ rand.Seed(time.Now().UnixNano())
+ FakeFetch = true
+}
+
+func main() {
+ // Subscribe to some feeds, and create a merged update stream.
+ merged := Merge(
+ NaiveSubscribe(Fetch("blog.golang.org")),
+ NaiveSubscribe(Fetch("googleblog.blogspot.com")),
+ NaiveSubscribe(Fetch("googledevelopers.blogspot.com")))
+
+ // Close the subscriptions after some time.
+ time.AfterFunc(3*time.Second, func() {
+ fmt.Println("closed:", merged.Close())
+ })
+
+ // Print the stream.
+ for it := range merged.Updates() {
+ fmt.Println(it.Channel, it.Title)
+ }
+
+ // The loops are still running. Let the race detector notice.
+ time.Sleep(1 * time.Second)
+
+ panic("show me the stacks")
+}
diff --git a/2013/advconc/naivemainnomerge.go b/2013/advconc/naivemainnomerge.go
new file mode 100644
index 0000000..9564160
--- /dev/null
+++ b/2013/advconc/naivemainnomerge.go
@@ -0,0 +1,30 @@
+package main
+
+import (
+ "fmt"
+ "math/rand"
+ "time"
+
+ . "code.google.com/p/go.talks/2013/reader"
+)
+
+func init() {
+ rand.Seed(time.Now().UnixNano())
+ FakeFetch = true
+}
+
+func main() {
+ sub := NaiveSubscribe(Fetch("blog.golang.org"))
+
+ // Close the subscription after some time.
+ time.AfterFunc(3*time.Second, func() {
+ fmt.Println("closed:", sub.Close())
+ })
+
+ // Print the stream.
+ for it := range sub.Updates() {
+ fmt.Println(it.Channel, it.Title)
+ }
+
+ panic("show me the stacks")
+}
diff --git a/2013/advconc/nilselect.go b/2013/advconc/nilselect.go
new file mode 100644
index 0000000..f84b612
--- /dev/null
+++ b/2013/advconc/nilselect.go
@@ -0,0 +1,30 @@
+package main
+
+import (
+ "fmt"
+ "math/rand"
+ "time"
+)
+
+func init() {
+ rand.Seed(time.Now().UnixNano())
+}
+
+func main() {
+ a, b := make(chan string), make(chan string)
+ go func() { a <- "a" }()
+ go func() { b <- "b" }()
+ if rand.Intn(2) == 0 {
+ a = nil // HL
+ fmt.Println("nil a")
+ } else {
+ b = nil // HL
+ fmt.Println("nil b")
+ }
+ select {
+ case s := <-a:
+ fmt.Println("got", s)
+ case s := <-b:
+ fmt.Println("got", s)
+ }
+}
diff --git a/2013/advconc/pingpong1.go b/2013/advconc/pingpong1.go
new file mode 100644
index 0000000..80acfff
--- /dev/null
+++ b/2013/advconc/pingpong1.go
@@ -0,0 +1,31 @@
+package main
+
+import (
+ "fmt"
+ "time"
+)
+
+// STARTMAIN1 OMIT
+type Ball struct{ hits int }
+
+func main() {
+ table := make(chan *Ball)
+ go player("ping", table)
+ go player("pong", table)
+
+ table <- new(Ball) // game on; toss the ball
+ time.Sleep(1 * time.Second)
+ <-table // game over; grab the ball
+}
+
+func player(name string, table chan *Ball) {
+ for {
+ ball := <-table
+ ball.hits++
+ fmt.Println(name, ball.hits)
+ time.Sleep(100 * time.Millisecond)
+ table <- ball
+ }
+}
+
+// STOPMAIN1 OMIT
diff --git a/2013/advconc/pingpong2.go b/2013/advconc/pingpong2.go
new file mode 100644
index 0000000..0aca88f
--- /dev/null
+++ b/2013/advconc/pingpong2.go
@@ -0,0 +1,42 @@
+package main
+
+import (
+ "fmt"
+ "time"
+)
+
+// STARTMAIN1 OMIT
+type Ball struct{}
+
+func main() {
+ table := make(chan Ball)
+ stop1, stop2 := make(chan int), make(chan int)
+ go player("ping", table, stop1)
+ go player("pong", table, stop2)
+
+ table <- Ball{}
+ time.Sleep(1 * time.Second)
+ stop1 <- 1
+ stop2 <- 1
+ <-stop1
+ <-stop2
+}
+
+// STOPMAIN1 OMIT
+
+// STARTPLAYER1 OMIT
+func player(name string, table chan Ball, stop chan int) {
+ for {
+ select {
+ case ball := <-table:
+ fmt.Println(name)
+ time.Sleep(100 * time.Millisecond)
+ table <- ball
+ case <-stop:
+ fmt.Println(name, "done")
+ stop <- 1
+ }
+ }
+}
+
+// STOPPLAYER1 OMIT
diff --git a/2013/advconc/pingpongdeadlock.go b/2013/advconc/pingpongdeadlock.go
new file mode 100644
index 0000000..4acbb6b
--- /dev/null
+++ b/2013/advconc/pingpongdeadlock.go
@@ -0,0 +1,31 @@
+package main
+
+import (
+ "fmt"
+ "time"
+)
+
+// STARTMAIN1 OMIT
+type Ball struct{ hits int }
+
+func main() {
+ table := make(chan *Ball)
+ go player("ping", table)
+ go player("pong", table)
+
+ // table <- new(Ball) // game on; toss the ball // HL
+ time.Sleep(1 * time.Second)
+ <-table // game over; grab the ball
+}
+
+func player(name string, table chan *Ball) {
+ for {
+ ball := <-table
+ ball.hits++
+ fmt.Println(name, ball.hits)
+ time.Sleep(100 * time.Millisecond)
+ table <- ball
+ }
+}
+
+// STOPMAIN1 OMIT
diff --git a/2013/advconc/pingpongpanic.go b/2013/advconc/pingpongpanic.go
new file mode 100644
index 0000000..ce6f4b3
--- /dev/null
+++ b/2013/advconc/pingpongpanic.go
@@ -0,0 +1,33 @@
+package main
+
+import (
+ "fmt"
+ "time"
+)
+
+// STARTMAIN1 OMIT
+type Ball struct{ hits int }
+
+func main() {
+ table := make(chan *Ball)
+ go player("ping", table)
+ go player("pong", table)
+
+ table <- new(Ball) // game on; toss the ball
+ time.Sleep(1 * time.Second)
+ <-table // game over; grab the ball
+
+ panic("show me the stacks") // HL
+}
+
+func player(name string, table chan *Ball) {
+ for {
+ ball := <-table
+ ball.hits++
+ fmt.Println(name, ball.hits)
+ time.Sleep(100 * time.Millisecond)
+ table <- ball
+ }
+}
+
+// STOPMAIN1 OMIT
diff --git a/2013/advconc/race.out b/2013/advconc/race.out
new file mode 100644
index 0000000..955794f
--- /dev/null
+++ b/2013/advconc/race.out
@@ -0,0 +1,62 @@
+googleblog.blogspot.com Item 0
+googleblog.blogspot.com Item 1
+googledevelopers.blogspot.com Item 0
+blog.golang.org Item 0
+googledevelopers.blogspot.com Item 1
+blog.golang.org Item 1
+blog.golang.org Item 2
+googledevelopers.blogspot.com Item 2
+googledevelopers.blogspot.com Item 3
+googleblog.blogspot.com Item 2
+googledevelopers.blogspot.com Item 4
+==================
+WARNING: DATA RACE
+Read by goroutine 4: // HL
+ code.google.com/p/go.talks/2013/reader.(*naiveSub).loop() // HL
+ /Users/sameer/gocode/src/code.google.com/p/go.talks/2013/reader/reader.go:105 +0x46
+ gosched0()
+ /Users/sameer/go/src/pkg/runtime/proc.c:1218 +0x9f
+
+Previous write by goroutine 7: // HL
+ code.google.com/p/go.talks/2013/reader.(*naiveSub).Close() // HL
+ /Users/sameer/gocode/src/code.google.com/p/go.talks/2013/reader/reader.go:125 +0x38
+ code.google.com/p/go.talks/2013/reader.func·002()
+ /Users/sameer/gocode/src/code.google.com/p/go.talks/2013/reader/reader.go:353 +0x1da
+ gosched0()
+ /Users/sameer/go/src/pkg/runtime/proc.c:1218 +0x9f
+
+Goroutine 4 (running) created at:
+ code.google.com/p/go.talks/2013/reader.NaiveSubscribe()
+ /Users/sameer/gocode/src/code.google.com/p/go.talks/2013/reader/reader.go:87 +0xff
+ main.main()
+ /Users/sameer/gocode/src/code.google.com/p/go.talks/2013/advconc/naivemain.go:18 +0xca
+ runtime.main()
+ /Users/sameer/go/src/pkg/runtime/proc.c:182 +0x91
+
+Goroutine 7 (finished) created at:
+ code.google.com/p/go.talks/2013/reader.Merge()
+ /Users/sameer/gocode/src/code.google.com/p/go.talks/2013/reader/reader.go:357 +0x1f5
+ main.main()
+ /Users/sameer/gocode/src/code.google.com/p/go.talks/2013/advconc/naivemain.go:20 +0x261
+ runtime.main()
+ /Users/sameer/go/src/pkg/runtime/proc.c:182 +0x91
+
+==================
+googleblog.blogspot.com Item 3
+closed: <nil>
+panic: show me the stacks
+
+goroutine 1 [running]:
+main.main()
+ /Users/sameer/gocode/src/code.google.com/p/go.talks/2013/advconc/naivemain.go:33 +0x4f5
+
+goroutine 2 [syscall]:
+
+goroutine 5 [sleep]:
+time.Sleep(0xee5ad23)
+ /Users/sameer/go/src/pkg/runtime/ztime_darwin_amd64.c:19 +0x2f
+code.google.com/p/go.talks/2013/reader.(*naiveSub).loop(0xc2000aea50)
+ /Users/sameer/gocode/src/code.google.com/p/go.talks/2013/reader/reader.go:119 +0x2a3
+created by code.google.com/p/go.talks/2013/reader.NaiveSubscribe
+ /Users/sameer/gocode/src/code.google.com/p/go.talks/2013/reader/reader.go:87 +0xff
+exit status 2
diff --git a/2013/advconc/race.png b/2013/advconc/race.png
new file mode 100644
index 0000000..8934a64
--- /dev/null
+++ b/2013/advconc/race.png
Binary files differ
diff --git a/2013/advconc/reader/reader.go b/2013/advconc/reader/reader.go
new file mode 100644
index 0000000..980f2bd
--- /dev/null
+++ b/2013/advconc/reader/reader.go
@@ -0,0 +1,572 @@
+package reader
+
+import (
+ "fmt"
+ "math/rand"
+ "time"
+
+ rss "github.com/jteeuwen/go-pkg-rss"
+)
+
+// STARTITEM OMIT
+// An Item is a stripped-down RSS item.
+type Item struct{ Title, Channel, GUID string }
+
+// STOPITEM OMIT
+
+// STARTFETCHER OMIT
+// A Fetcher fetches Items and returns the time when the next fetch should be
+// attempted. On failure, Fetch returns a non-nil error.
+type Fetcher interface {
+ Fetch() (items []Item, next time.Time, err error)
+}
+
+// STOPFETCHER OMIT
+
+// STARTSUBSCRIPTION OMIT
+// A Subscription delivers Items over a channel. Close cancels the
+// subscription, closes the Updates channel, and returns the last fetch error,
+// if any.
+type Subscription interface {
+ Updates() <-chan Item
+ Close() error
+}
+
+// STOPSUBSCRIPTION OMIT
+
+// STARTSUBSCRIBE OMIT
+// Subscribe returns a new Subscription that uses fetcher to fetch Items.
+func Subscribe(fetcher Fetcher) Subscription {
+ s := &sub{
+ fetcher: fetcher,
+ updates: make(chan Item), // for Updates
+ closing: make(chan chan error), // for Close
+ }
+ go s.loop()
+ return s
+}
+
+// STOPSUBSCRIBE OMIT
+
+// sub implements the Subscription interface.
+type sub struct {
+ fetcher Fetcher // fetches items
+ updates chan Item // sends items to the user
+ closing chan chan error // for Close
+}
+
+// STARTUPDATES OMIT
+func (s *sub) Updates() <-chan Item {
+ return s.updates
+}
+
+// STOPUPDATES OMIT
+
+// STARTCLOSE OMIT
+// STARTCLOSESIG OMIT
+func (s *sub) Close() error {
+ // STOPCLOSESIG OMIT
+ errc := make(chan error)
+ s.closing <- errc // HLchan
+ return <-errc // HLchan
+}
+
+// STOPCLOSE OMIT
+
+// loopCloseOnly is a version of loop that includes only the logic
+// that handles Close.
+func (s *sub) loopCloseOnly() {
+ // STARTCLOSEONLY OMIT
+ var err error // set when Fetch fails
+ for {
+ select {
+ case errc := <-s.closing: // HLchan
+ errc <- err // HLchan
+ close(s.updates) // tells receiver we're done
+ return
+ }
+ }
+ // STOPCLOSEONLY OMIT
+}
+
+// loopFetchOnly is a version of loop that includes only the logic
+// that calls Fetch.
+func (s *sub) loopFetchOnly() {
+ // STARTFETCHONLY OMIT
+ var pending []Item // appended by fetch; consumed by send
+ var next time.Time // initially January 1, year 0
+ var err error
+ for {
+ var fetchDelay time.Duration // initally 0 (no delay)
+ if now := time.Now(); next.After(now) {
+ fetchDelay = next.Sub(now)
+ }
+ startFetch := time.After(fetchDelay)
+
+ select {
+ case <-startFetch:
+ var fetched []Item
+ fetched, next, err = s.fetcher.Fetch()
+ if err != nil {
+ next = time.Now().Add(10 * time.Second)
+ break
+ }
+ pending = append(pending, fetched...)
+ }
+ }
+ // STOPFETCHONLY OMIT
+}
+
+// loopSendOnly is a version of loop that includes only the logic for
+// sending items to s.updates.
+func (s *sub) loopSendOnly() {
+ // STARTSENDONLY OMIT
+ var pending []Item // appended by fetch; consumed by send
+ for {
+ var first Item
+ var updates chan Item // HLupdates
+ if len(pending) > 0 {
+ first = pending[0]
+ updates = s.updates // enable send case // HLupdates
+ }
+
+ select {
+ case updates <- first:
+ pending = pending[1:]
+ }
+ }
+ // STOPSENDONLY OMIT
+}
+
+// mergedLoop is a version of loop that combines loopCloseOnly,
+// loopFetchOnly, and loopSendOnly.
+func (s *sub) mergedLoop() {
+ // STARTFETCHVARS OMIT
+ var pending []Item
+ var next time.Time
+ var err error
+ // STOPFETCHVARS OMIT
+ for {
+ // STARTNOCAP OMIT
+ var fetchDelay time.Duration
+ if now := time.Now(); next.After(now) {
+ fetchDelay = next.Sub(now)
+ }
+ startFetch := time.After(fetchDelay)
+ // STOPNOCAP OMIT
+ var first Item
+ var updates chan Item
+ if len(pending) > 0 {
+ first = pending[0]
+ updates = s.updates // enable send case
+ }
+
+ // STARTSELECT OMIT
+ select {
+ case errc := <-s.closing: // HLcases
+ errc <- err
+ close(s.updates)
+ return
+ // STARTFETCHCASE OMIT
+ case <-startFetch: // HLcases
+ var fetched []Item
+ fetched, next, err = s.fetcher.Fetch() // HLfetch
+ if err != nil {
+ next = time.Now().Add(10 * time.Second)
+ break
+ }
+ pending = append(pending, fetched...) // HLfetch
+ // STOPFETCHCASE OMIT
+ case updates <- first: // HLcases
+ pending = pending[1:]
+ }
+ // STOPSELECT OMIT
+ }
+}
+
+// dedupeLoop extends mergedLoop with deduping of fetched items.
+func (s *sub) dedupeLoop() {
+ const maxPending = 10
+ // STARTSEEN OMIT
+ var pending []Item
+ var next time.Time
+ var err error
+ var seen = make(map[string]bool) // set of item.GUIDs // HLseen
+ // STOPSEEN OMIT
+ for {
+ // STARTCAP OMIT
+ var fetchDelay time.Duration
+ if now := time.Now(); next.After(now) {
+ fetchDelay = next.Sub(now)
+ }
+ var startFetch <-chan time.Time // HLcap
+ if len(pending) < maxPending { // HLcap
+ startFetch = time.After(fetchDelay) // enable fetch case // HLcap
+ } // HLcap
+ // STOPCAP OMIT
+ var first Item
+ var updates chan Item
+ if len(pending) > 0 {
+ first = pending[0]
+ updates = s.updates // enable send case
+ }
+ select {
+ case errc := <-s.closing:
+ errc <- err
+ close(s.updates)
+ return
+ // STARTDEDUPE OMIT
+ case <-startFetch:
+ var fetched []Item
+ fetched, next, err = s.fetcher.Fetch() // HLfetch
+ if err != nil {
+ next = time.Now().Add(10 * time.Second)
+ break
+ }
+ for _, item := range fetched {
+ if !seen[item.GUID] { // HLdupe
+ pending = append(pending, item) // HLdupe
+ seen[item.GUID] = true // HLdupe
+ } // HLdupe
+ }
+ // STOPDEDUPE OMIT
+ case updates <- first:
+ pending = pending[1:]
+ }
+ }
+}
+
+// loop periodically fecthes Items, sends them on s.updates, and exits
+// when Close is called. It extends dedupeLoop with logic to run
+// Fetch asynchronously.
+func (s *sub) loop() {
+ const maxPending = 10
+ type fetchResult struct {
+ fetched []Item
+ next time.Time
+ err error
+ }
+ // STARTFETCHDONE OMIT
+ var fetchDone chan fetchResult // if non-nil, Fetch is running // HL
+ // STOPFETCHDONE OMIT
+ var pending []Item
+ var next time.Time
+ var err error
+ var seen = make(map[string]bool)
+ for {
+ var fetchDelay time.Duration
+ if now := time.Now(); next.After(now) {
+ fetchDelay = next.Sub(now)
+ }
+ // STARTFETCHIF OMIT
+ var startFetch <-chan time.Time
+ if fetchDone == nil && len(pending) < maxPending { // HLfetch
+ startFetch = time.After(fetchDelay) // enable fetch case
+ }
+ // STOPFETCHIF OMIT
+ var first Item
+ var updates chan Item
+ if len(pending) > 0 {
+ first = pending[0]
+ updates = s.updates // enable send case
+ }
+ // STARTFETCHASYNC OMIT
+ select {
+ case <-startFetch: // HLfetch
+ fetchDone = make(chan fetchResult, 1) // HLfetch
+ go func() {
+ fetched, next, err := s.fetcher.Fetch()
+ fetchDone <- fetchResult{fetched, next, err}
+ }()
+ case result := <-fetchDone: // HLfetch
+ fetchDone = nil // HLfetch
+ // Use result.fetched, result.next, result.err
+ // STOPFETCHASYNC OMIT
+ fetched := result.fetched
+ next, err = result.next, result.err
+ if err != nil {
+ next = time.Now().Add(10 * time.Second)
+ break
+ }
+ for _, item := range fetched {
+ if id := item.GUID; !seen[id] { // HLdupe
+ pending = append(pending, item)
+ seen[id] = true // HLdupe
+ }
+ }
+ case errc := <-s.closing:
+ errc <- err
+ close(s.updates)
+ return
+ case updates <- first:
+ pending = pending[1:]
+ }
+ }
+}
+
+// naiveMerge is a version of Merge that doesn't quite work right. In
+// particular, the goroutines it starts may block forever on m.updates
+// if the receiver stops receiving.
+type naiveMerge struct {
+ subs []Subscription
+ updates chan Item
+}
+
+// STARTNAIVEMERGE OMIT
+func NaiveMerge(subs ...Subscription) Subscription {
+ m := &naiveMerge{
+ subs: subs,
+ updates: make(chan Item),
+ }
+ // STARTNAIVEMERGELOOP OMIT
+ for _, sub := range subs {
+ go func(s Subscription) {
+ for it := range s.Updates() {
+ m.updates <- it // HL
+ }
+ }(sub)
+ }
+ // STOPNAIVEMERGELOOP OMIT
+ return m
+}
+
+// STOPNAIVEMERGE OMIT
+
+// STARTNAIVEMERGECLOSE OMIT
+func (m *naiveMerge) Close() (err error) {
+ for _, sub := range m.subs {
+ if e := sub.Close(); err == nil && e != nil {
+ err = e
+ }
+ }
+ close(m.updates) // HL
+ return
+}
+
+// STOPNAIVEMERGECLOSE OMIT
+
+func (m *naiveMerge) Updates() <-chan Item {
+ return m.updates
+}
+
+type merge struct {
+ subs []Subscription
+ updates chan Item
+ quit chan struct{}
+ errs chan error
+}
+
+// STARTMERGESIG OMIT
+// Merge returns a Subscription that merges the item streams from subs.
+// Closing the merged subscription closes subs.
+func Merge(subs ...Subscription) Subscription {
+ // STOPMERGESIG OMIT
+ m := &merge{
+ subs: subs,
+ updates: make(chan Item),
+ quit: make(chan struct{}),
+ errs: make(chan error),
+ }
+ // STARTMERGE OMIT
+ for _, sub := range subs {
+ go func(s Subscription) {
+ for {
+ var it Item
+ select {
+ case it = <-s.Updates():
+ case <-m.quit: // HL
+ m.errs <- s.Close() // HL
+ return // HL
+ }
+ select {
+ case m.updates <- it:
+ case <-m.quit: // HL
+ m.errs <- s.Close() // HL
+ return // HL
+ }
+ }
+ }(sub)
+ }
+ // STOPMERGE OMIT
+ return m
+}
+
+func (m *merge) Updates() <-chan Item {
+ return m.updates
+}
+
+// STARTMERGECLOSE OMIT
+func (m *merge) Close() (err error) {
+ close(m.quit) // HL
+ for _ = range m.subs {
+ if e := <-m.errs; e != nil { // HL
+ err = e
+ }
+ }
+ close(m.updates) // HL
+ return
+}
+
+// STOPMERGECLOSE OMIT
+
+// NaiveDedupe converts a stream of Items that may contain duplicates
+// into one that doesn't.
+func NaiveDedupe(in <-chan Item) <-chan Item {
+ out := make(chan Item)
+ go func() {
+ seen := make(map[string]bool)
+ for it := range in {
+ if !seen[it.GUID] {
+ // BUG: this send blocks if the
+ // receiver closes the Subscription
+ // and stops receiving.
+ out <- it // HL
+ seen[it.GUID] = true
+ }
+ }
+ close(out)
+ }()
+ return out
+}
+
+type deduper struct {
+ s Subscription
+ updates chan Item
+ closing chan chan error
+}
+
+// Dedupe converts a Subscription that may send duplicate Items into
+// one that doesn't.
+func Dedupe(s Subscription) Subscription {
+ d := &deduper{
+ s: s,
+ updates: make(chan Item),
+ closing: make(chan chan error),
+ }
+ go d.loop()
+ return d
+}
+
+func (d *deduper) loop() {
+ in := d.s.Updates() // enable receive
+ var pending Item
+ var out chan Item // disable send
+ seen := make(map[string]bool)
+ for {
+ select {
+ case it := <-in:
+ if !seen[it.GUID] {
+ pending = it
+ in = nil // disable receive
+ out = d.updates // enable send
+ seen[it.GUID] = true
+ }
+ case out <- pending:
+ in = d.s.Updates() // enable receive
+ out = nil // disable send
+ case errc := <-d.closing:
+ err := d.s.Close()
+ errc <- err
+ close(d.updates)
+ return
+ }
+ }
+}
+
+func (d *deduper) Close() error {
+ errc := make(chan error)
+ d.closing <- errc
+ return <-errc
+}
+
+func (d *deduper) Updates() <-chan Item {
+ return d.updates
+}
+
+// FakeFetch causes Fetch to use a fake fetcher instead of the real
+// one.
+var FakeFetch bool
+
+// Fetch returns a Fetcher for Items from domain.
+func Fetch(domain string) Fetcher {
+ if FakeFetch {
+ return fakeFetch(domain)
+ }
+ return realFetch(domain)
+}
+
+func fakeFetch(domain string) Fetcher {
+ return &fakeFetcher{channel: domain}
+}
+
+type fakeFetcher struct {
+ channel string
+ items []Item
+}
+
+// FakeDuplicates causes the fake fetcher to return duplicate items.
+var FakeDuplicates bool
+
+func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) {
+ now := time.Now()
+ next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond)
+ item := Item{
+ Channel: f.channel,
+ Title: fmt.Sprintf("Item %d", len(f.items)),
+ }
+ item.GUID = item.Channel + "/" + item.Title
+ f.items = append(f.items, item)
+ if FakeDuplicates {
+ items = f.items
+ } else {
+ items = []Item{item}
+ }
+ return
+}
+
+// realFetch returns a fetcher for the specified blogger domain.
+func realFetch(domain string) Fetcher {
+ return NewFetcher(fmt.Sprintf("http://%s/feeds/posts/default?alt=rss", domain))
+}
+
+type fetcher struct {
+ uri string
+ feed *rss.Feed
+ items []Item
+}
+
+// NewFetcher returns a Fetcher for uri.
+func NewFetcher(uri string) Fetcher {
+ f := &fetcher{
+ uri: uri,
+ }
+ newChans := func(feed *rss.Feed, chans []*rss.Channel) {}
+ newItems := func(feed *rss.Feed, ch *rss.Channel, items []*rss.Item) {
+ for _, it := range items {
+ f.items = append(f.items, Item{
+ Channel: ch.Title,
+ GUID: it.Guid,
+ Title: it.Title,
+ })
+ }
+ }
+ f.feed = rss.New(1 /*minimum interval in minutes*/, true /*respect limit*/, newChans, newItems)
+ return f
+}
+
+func (f *fetcher) Fetch() (items []Item, next time.Time, err error) {
+ fmt.Println("fetching", f.uri)
+ if err = f.feed.Fetch(f.uri, nil); err != nil {
+ return
+ }
+ items = f.items
+ f.items = nil
+ next = time.Now().Add(time.Duration(f.feed.SecondsTillUpdate()) * time.Second)
+ return
+}
+
+// TODO: in a longer talk: move the Subscribe function onto a Reader type, to
+// support dynamically adding and removing Subscriptions. Reader should dedupe.
+
+// TODO: in a longer talk: make successive Subscribe calls for the same uri
+// share the same underlying Subscription, but provide duplicate streams.