go.talks: add Advanced Go Concurrency Patterns

R=snanda
CC=golang-dev
https://golang.org/cl/9626044
diff --git a/2013/advconc.slide b/2013/advconc.slide
new file mode 100644
index 0000000..3f3869f
--- /dev/null
+++ b/2013/advconc.slide
@@ -0,0 +1,384 @@
+Advanced Go Concurrency Patterns
+
+Sameer Ajmani
+Google
+http://profiles.google.com/ajmani
+@Sajma
+http://golang.org
+
+* Video
+
+This talk was presented at Google I/O in May 2013.
+
+.link https://www.youtube.com/watch?v=QDDwwePbDtw Watch the talk on YouTube
+
+* Get ready
+
+.image advconc/gopherswim.jpg 400 400
+
+* Go supports concurrency
+
+In the language and runtime, not a library.
+
+This changes how you structure your programs.
+
+* Goroutines and Channels
+
+Goroutines are independently executing functions in the same address space.
+
+   go f()
+   go g(1, 2)
+
+Channels are typed values that allow goroutines to synchronize and exchange information.
+
+  c := make(chan int)
+  go func() { c <- 3 }()
+  n := <-c
+
+For more on the basics, watch [[http://talks.golang.org/2012/concurrency.slide#1][Go Concurrency Patterns (Pike, 2012)]].
+
+* Example: ping-pong
+
+.play advconc/pingpong1.go /STARTMAIN1/,/STOPMAIN1/
+
+* Deadlock detection
+
+.play advconc/pingpongdeadlock.go /STARTMAIN1/,/STOPMAIN1/
+
+* Panic dumps the stacks
+
+.play advconc/pingpongpanic.go /STARTMAIN1/,/STOPMAIN1/
+
+* It's easy to go, but how to stop?
+
+Long-lived programs need to clean up.
+
+Let's look at how to write programs that handle communication, periodic events, and cancellation.
+
+The core is Go's `select` statement: like a `switch`, but the decision is made based on the ability to communicate.
+
+  select {
+  case xc <- x:
+      // sent x on xc
+  case y := <-yc:
+      // received y from yc
+  }
+
+* Example: feed reader
+
+My favorite feed reader disappeared.  I need a new one.
+
+Why not write one?
+
+Where do we start?
+
+* Find an RSS client
+
+Searching [[http://godoc.org][godoc.org]] for *"rss"* turns up several hits, including one that provides:
+
+  // Fetch fetches Items for uri and returns the time when the next
+  // fetch should be attempted.  On failure, Fetch returns an error.
+  func Fetch(uri string) (items []Item, next time.Time, err error)
+
+  type Item struct {
+      Title, Channel, GUID string // a subset of RSS fields
+  }
+
+But I want a stream:
+
+  <-chan Item
+
+And I want multiple subscriptions.
+
+* Here's what we have
+
+  type Fetcher interface {
+      Fetch() (items []Item, next time.Time, err error)
+  }
+
+  func Fetch(domain string) Fetcher {...} // fetches Items from domain
+
+* Here's what we want
+
+  type Subscription interface {
+      Updates() <-chan Item // stream of Items
+      Close() error         // shuts down the stream
+  }
+
+  func Subscribe(fetcher Fetcher) Subscription {...} // converts Fetches to a stream
+
+  func Merge(subs ...Subscription) Subscription {...} // merges several streams
+
+* Example
+
+.play advconc/fakemain.go /func main/,/^}/
+
+* Subscribe
+
+`Subscribe` creates a new `Subscription` that repeatedly fetches items until `Close` is called.
+
+  func Subscribe(fetcher Fetcher) Subscription {
+      s := &sub{
+          fetcher: fetcher,
+          updates: make(chan Item), // for Updates
+      }
+      go s.loop()
+      return s
+  }
+
+  // sub implements the Subscription interface.
+  type sub struct {
+      fetcher Fetcher   // fetches items
+      updates chan Item // delivers items to the user
+  }
+
+  // loop fetches items using s.fetcher and sends them
+  // on s.updates.  loop exits when s.Close is called.
+  func (s *sub) loop() {...}
+
+* Implementing Subscription
+
+To implement the `Subscription` interface, define `Updates` and `Close`.
+
+.code advconc/reader/reader.go /func.* Updates/,/^}/
+
+  func (s *sub) Close() error {
+      // TODO: make loop exit
+      // TODO: find out about any error
+      return err
+  }
+
+* What does loop do?
+
+- periodically call `Fetch`
+- send fetched items on the `Updates` channel
+- exit when `Close` is called, reporting any error
+
+* Naive Implementation
+
+# Not quite enough room for this; retry after format change:
+# .play advconc/naivemain.go /naiveSub\) loop/,/^}/
+# also on subsequent slides.
+
+.play advconc/naivemain.go /STARTNAIVE/,/STOPNAIVE/
+.code advconc/naivemain.go /naiveSub\) Close/,/^}/
+
+* Bug 1: unsynchronized access to s.closed/s.err
+
+.code advconc/naivemain.go /STARTNAIVE/,/STOPNAIVE/ HLsync
+.code advconc/naivemain.go /naiveSub\) Close/,/^}/ HLsync
+
+* Race Detector
+
+  go run -race naivemain.go
+
+# original is 400x1500
+.image advconc/race.png 150 562
+.play advconc/naivemain.go /STARTNAIVE/,/s.err/ HLsync
+.code advconc/naivemain.go /naiveSub\) Close/,/^}/ HLsync
+
+#* Race demo
+#.play go1.1/race.go
+
+* Bug 2: time.Sleep may keep loop running
+
+.code advconc/naivemain.go /STARTNAIVE/,/STOPNAIVE/ HLsleep
+
+* Bug 3: loop may block forever on s.updates
+
+.code advconc/naivemain.go /STARTNAIVE/,/STOPNAIVE/ HLsend
+
+* Solution
+
+Change the body of `loop` to a `select` with three cases:
+
+- `Close` was called
+- it's time to call `Fetch`
+- send an item on `s.updates`
+
+* Structure: for-select loop
+
+`loop` runs in its own goroutine.
+
+`select` lets `loop` avoid blocking indefinitely in any one state.
+
+  func (s *sub) loop() {
+      ... declare mutable state ...
+      for {
+          ... set up channels for cases ...
+          select {
+          case <-c1:
+              ... read/write state ...
+          case c2 <- x:
+              ... read/write state ...
+          case y := <-c3:
+              ... read/write state ...
+          }
+      }
+  }
+
+The cases interact via local state in `loop`.
+
+* Case 1: Close
+
+`Close` communicates with `loop` via `s.closing`.
+
+  type sub struct {
+      closing chan chan error
+  }
+
+The service (`loop`) listens for requests on its channel (`s.closing`).
+
+The client (`Close`) sends a request on `s.closing`: _exit_and_reply_with_the_error_
+
+In this case, the only thing in the request is the _reply_channel_.
+
+* Case 1: Close
+
+`Close` asks `loop` to exit and waits for a response.
+
+.code advconc/reader/reader.go /\*sub\) Close/,/^}/ HLchan
+
+`loop` handles `Close` by replying with the `Fetch` error and exiting.
+
+.code advconc/reader/reader.go /STARTCLOSEONLY /,/STOPCLOSEONLY / HLchan
+
+* Case 2: Fetch
+
+Schedule the next `Fetch` after some delay.
+
+.code advconc/reader/reader.go /STARTFETCHONLY /,/STOPFETCHONLY /
+
+* Case 3: Send
+
+Send the fetched items, one at a time.
+
+	var pending []Item // appended by fetch; consumed by send
+	for {
+		select {
+		case s.updates <- pending[0]:
+			pending = pending[1:]
+		}
+	}
+
+Whoops. This crashes.
+
+.image advconc/gopherswrench.jpg 200 337
+
+* Select and nil channels
+
+Sends and receives on nil channels block.
+
+Select never selects a blocking case.
+
+.play advconc/nilselect.go /func main/,/^}/
+
+* Case 3: Send (fixed)
+
+Enable send only when pending is non-empty.
+
+.code advconc/reader/reader.go /STARTSENDONLY /,/STOPSENDONLY / HLupdates
+
+* Select
+
+Put the three cases together:
+
+.code advconc/reader/reader.go /STARTSELECT /,/STOPSELECT /
+
+The cases interact via `err`, `next`, and `pending`.
+
+No locks, no condition variables, no callbacks.
+
+* Bugs fixed
+
+- Bug 1: unsynchronized access to `s.closed` and `s.err`
+- Bug 2: `time.Sleep` may keep loop running
+- Bug 3: `loop` may block forever sending on `s.updates`
+
+.code advconc/reader/reader.go /STARTSELECT /,/STOPSELECT / HLcases
+
+* We can improve loop further
+
+* Issue: Fetch may return duplicates
+
+.code advconc/reader/reader.go /STARTFETCHVARS /,/STOPFETCHVARS / HLfetch
+.code advconc/reader/reader.go /STARTFETCHCASE /,/STOPFETCHCASE / HLfetch
+
+* Fix: Filter items before adding to pending
+
+.code advconc/reader/reader.go /STARTSEEN /,/STOPSEEN / HLseen
+.code advconc/reader/reader.go /STARTDEDUPE /,/STOPDEDUPE / HLdupe
+
+* Issue: Pending queue grows without bound
+
+.code advconc/reader/reader.go /STARTDEDUPE /,/STOPDEDUPE / HLdupe
+
+* Fix: Disable fetch case when too many items are pending
+
+  const maxPending = 10
+
+.code advconc/reader/reader.go /STARTCAP /,/STOPCAP / HLcap
+
+Could instead drop older items from the head of `pending`.
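+
+A rough sketch of that alternative (hypothetical, not the code in `reader.go`): trim the head before appending, so the newest items win.
+
+  // hypothetical: bound pending by dropping the oldest item
+  if len(pending) == maxPending {
+      pending = pending[1:]
+  }
+  pending = append(pending, item)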
+
+* Issue: Loop blocks on Fetch
+
+.code advconc/reader/reader.go /STARTDEDUPE /,/STOPDEDUPE / HLfetch
+
+* Fix: Run Fetch asynchronously
+
+Add a new `select` case for `fetchDone`.
+
+  type fetchResult struct{ fetched []Item; next time.Time; err error }
+
+.code advconc/reader/reader.go /STARTFETCHDONE /,/STOPFETCHDONE / HLfetch
+.code advconc/reader/reader.go /STARTFETCHIF /,/STOPFETCHIF / HLfetch
+.code advconc/reader/reader.go /STARTFETCHASYNC /,/STOPFETCHASYNC / HLfetch
+
+* Implemented Subscribe
+
+Responsive. Cleans up. Easy to read and change.
+
+Three techniques:
+
+- `for-select` loop
+- service channel, reply channels (`chan`chan`error`)
+- `nil` channels in `select` cases
+
+More details online, including `Merge`.
+
+.image advconc/gopherhat.jpg 200 158
+
+* Conclusion
+
+Concurrent programming can be tricky.
+
+Go makes it easier:
+
+- channels convey data, timer events, cancellation signals
+- goroutines serialize access to local mutable state
+- stack traces & deadlock detector
+- race detector
+
+.image advconc/race.png 200 750
+
+* Links
+
+Go Concurrency Patterns (2012)
+
+.link http://talks.golang.org/2012/concurrency.slide
+
+Concurrency is not parallelism
+
+.link http://golang.org/s/concurrency-is-not-parallelism
+
+Share memory by communicating
+
+.link http://golang.org/doc/codewalk/sharemem
+
+Go Tour (learn Go in your browser)
+
+.link http://tour.golang.org
+
+*Go*Team*Fireside*Chat*
+5:20pm Room 2
diff --git a/2013/advconc/buffer.go b/2013/advconc/buffer.go
new file mode 100644
index 0000000..0cd8d9a
--- /dev/null
+++ b/2013/advconc/buffer.go
@@ -0,0 +1,43 @@
+package main
+
+import (
+	"fmt"
+)
+
+func main() {
+	in, out := make(chan int), make(chan int)
+	go buffer(in, out)
+	for i := 0; i < 10; i++ {
+		in <- i
+	}
+	close(in)
+	for i := range out {
+		fmt.Println(i)
+	}
+}
+
+// buffer provides an unbounded buffer between in and out.  buffer
+// exits when in is closed and all items in the buffer have been sent
+// to out, at which point it closes out.
+func buffer(in <-chan int, out chan<- int) {
+	var buf []int
+	for in != nil || len(buf) > 0 {
+		var i int
+		var c chan<- int
+		if len(buf) > 0 {
+			i = buf[0]
+			c = out // enable send case
+		}
+		select {
+		case n, ok := <-in:
+			if ok {
+				buf = append(buf, n)
+			} else {
+				in = nil // disable receive case
+			}
+		case c <- i:
+			buf = buf[1:]
+		}
+	}
+	close(out)
+}
diff --git a/2013/advconc/dedupermain.go b/2013/advconc/dedupermain.go
new file mode 100644
index 0000000..898c175
--- /dev/null
+++ b/2013/advconc/dedupermain.go
@@ -0,0 +1,41 @@
+package main
+
+import (
+	"fmt"
+	"math/rand"
+	"time"
+
+	. "code.google.com/p/go.talks/2013/reader"
+)
+
+func init() {
+	rand.Seed(time.Now().UnixNano())
+	FakeFetch = true
+}
+
+// STARTMAIN OMIT
+func main() {
+	// STARTMERGECALL OMIT
+	// Subscribe to some feeds, and create a merged update stream.
+	merged := Dedupe(Merge(
+		Subscribe(Fetch("blog.golang.org")),
+		Subscribe(Fetch("blog.golang.org")),
+		Subscribe(Fetch("blog.golang.org")),
+		Subscribe(Fetch("googleblog.blogspot.com")),
+		Subscribe(Fetch("googledevelopers.blogspot.com"))))
+	// STOPMERGECALL OMIT
+
+	// Close the subscriptions after some time.
+	time.AfterFunc(3*time.Second, func() {
+		fmt.Println("closed:", merged.Close())
+	})
+
+	// Print the stream.
+	for it := range merged.Updates() {
+		fmt.Println(it.Channel, it.Title)
+	}
+
+	panic("show me the stacks")
+}
+
+// STOPMAIN OMIT
diff --git a/2013/advconc/fakemain.go b/2013/advconc/fakemain.go
new file mode 100644
index 0000000..bb1d44f
--- /dev/null
+++ b/2013/advconc/fakemain.go
@@ -0,0 +1,39 @@
+package main
+
+import (
+	"fmt"
+	"math/rand"
+	"time"
+
+	. "code.google.com/p/go.talks/2013/reader"
+)
+
+func init() {
+	rand.Seed(time.Now().UnixNano())
+	FakeFetch = true
+}
+
+// STARTMAIN OMIT
+func main() {
+	// STARTMERGECALL OMIT
+	// Subscribe to some feeds, and create a merged update stream.
+	merged := Merge(
+		Subscribe(Fetch("blog.golang.org")),
+		Subscribe(Fetch("googleblog.blogspot.com")),
+		Subscribe(Fetch("googledevelopers.blogspot.com")))
+	// STOPMERGECALL OMIT
+
+	// Close the subscriptions after some time.
+	time.AfterFunc(3*time.Second, func() {
+		fmt.Println("closed:", merged.Close())
+	})
+
+	// Print the stream.
+	for it := range merged.Updates() {
+		fmt.Println(it.Channel, it.Title)
+	}
+
+	panic("show me the stacks")
+}
+
+// STOPMAIN OMIT
diff --git a/2013/advconc/fakemainnomerge.go b/2013/advconc/fakemainnomerge.go
new file mode 100644
index 0000000..c5bca2a
--- /dev/null
+++ b/2013/advconc/fakemainnomerge.go
@@ -0,0 +1,33 @@
+package main
+
+import (
+	"fmt"
+	"math/rand"
+	"time"
+
+	. "code.google.com/p/go.talks/2013/reader"
+)
+
+func init() {
+	rand.Seed(time.Now().UnixNano())
+	FakeFetch = true
+}
+
+// STARTMAIN OMIT
+func main() {
+	sub := Subscribe(Fetch("blog.golang.org"))
+
+	// Close the subscription after some time.
+	time.AfterFunc(3*time.Second, func() {
+		fmt.Println("closed:", sub.Close())
+	})
+
+	// Print the stream.
+	for it := range sub.Updates() {
+		fmt.Println(it.Channel, it.Title)
+	}
+
+	panic("show me the stacks")
+}
+
+// STOPMAIN OMIT
diff --git a/2013/advconc/gopherhat.jpg b/2013/advconc/gopherhat.jpg
new file mode 100644
index 0000000..f34d7b3
--- /dev/null
+++ b/2013/advconc/gopherhat.jpg
Binary files differ
diff --git a/2013/advconc/gopherrunning.jpg b/2013/advconc/gopherrunning.jpg
new file mode 100644
index 0000000..eeeddf1
--- /dev/null
+++ b/2013/advconc/gopherrunning.jpg
Binary files differ
diff --git a/2013/advconc/gopherswim.jpg b/2013/advconc/gopherswim.jpg
new file mode 100644
index 0000000..2f32877
--- /dev/null
+++ b/2013/advconc/gopherswim.jpg
Binary files differ
diff --git a/2013/advconc/gopherswrench.jpg b/2013/advconc/gopherswrench.jpg
new file mode 100644
index 0000000..93005f4
--- /dev/null
+++ b/2013/advconc/gopherswrench.jpg
Binary files differ
diff --git a/2013/advconc/naivemain.go b/2013/advconc/naivemain.go
new file mode 100644
index 0000000..072608f
--- /dev/null
+++ b/2013/advconc/naivemain.go
@@ -0,0 +1,85 @@
+package main
+
+import (
+	"fmt"
+	"math/rand"
+	"time"
+
+	. "code.google.com/p/go.talks/2013/reader"
+)
+
+func NaiveSubscribe(fetcher Fetcher) Subscription {
+	s := &naiveSub{
+		fetcher: fetcher,
+		updates: make(chan Item),
+	}
+	go s.loop()
+	return s
+}
+
+type naiveSub struct {
+	fetcher Fetcher
+	updates chan Item
+	closed  bool
+	err     error
+}
+
+func (s *naiveSub) Updates() <-chan Item {
+	return s.updates
+}
+
+func (s *naiveSub) loop() {
+	// STARTNAIVE OMIT
+	for {
+		if s.closed { // HLsync
+			close(s.updates)
+			return
+		}
+		items, next, err := s.fetcher.Fetch()
+		if err != nil {
+			s.err = err                  // HLsync
+			time.Sleep(10 * time.Second) // HLsleep
+			continue
+		}
+		for _, item := range items {
+			s.updates <- item // HLsend
+		}
+		if now := time.Now(); next.After(now) {
+			time.Sleep(next.Sub(now)) // HLsleep
+		}
+	}
+	// STOPNAIVE OMIT
+}
+
+func (s *naiveSub) Close() error {
+	s.closed = true // HLsync
+	return s.err    // HLsync
+}
+
+func init() {
+	rand.Seed(time.Now().UnixNano())
+	FakeFetch = true
+}
+
+func main() {
+	// Subscribe to some feeds, and create a merged update stream.
+	merged := Merge(
+		NaiveSubscribe(Fetch("blog.golang.org")),
+		NaiveSubscribe(Fetch("googleblog.blogspot.com")),
+		NaiveSubscribe(Fetch("googledevelopers.blogspot.com")))
+
+	// Close the subscriptions after some time.
+	time.AfterFunc(3*time.Second, func() {
+		fmt.Println("closed:", merged.Close())
+	})
+
+	// Print the stream.
+	for it := range merged.Updates() {
+		fmt.Println(it.Channel, it.Title)
+	}
+
+	// The loops are still running.  Let the race detector notice.
+	time.Sleep(1 * time.Second)
+
+	panic("show me the stacks")
+}
diff --git a/2013/advconc/naivemainnomerge.go b/2013/advconc/naivemainnomerge.go
new file mode 100644
index 0000000..9564160
--- /dev/null
+++ b/2013/advconc/naivemainnomerge.go
@@ -0,0 +1,30 @@
+package main
+
+import (
+	"fmt"
+	"math/rand"
+	"time"
+
+	. "code.google.com/p/go.talks/2013/reader"
+)
+
+func init() {
+	rand.Seed(time.Now().UnixNano())
+	FakeFetch = true
+}
+
+func main() {
+	sub := NaiveSubscribe(Fetch("blog.golang.org"))
+
+	// Close the subscription after some time.
+	time.AfterFunc(3*time.Second, func() {
+		fmt.Println("closed:", sub.Close())
+	})
+
+	// Print the stream.
+	for it := range sub.Updates() {
+		fmt.Println(it.Channel, it.Title)
+	}
+
+	panic("show me the stacks")
+}
diff --git a/2013/advconc/nilselect.go b/2013/advconc/nilselect.go
new file mode 100644
index 0000000..f84b612
--- /dev/null
+++ b/2013/advconc/nilselect.go
@@ -0,0 +1,30 @@
+package main
+
+import (
+	"fmt"
+	"math/rand"
+	"time"
+)
+
+func init() {
+	rand.Seed(time.Now().UnixNano())
+}
+
+func main() {
+	a, b := make(chan string), make(chan string)
+	go func() { a <- "a" }()
+	go func() { b <- "b" }()
+	if rand.Intn(2) == 0 {
+		a = nil // HL
+		fmt.Println("nil a")
+	} else {
+		b = nil // HL
+		fmt.Println("nil b")
+	}
+	select {
+	case s := <-a:
+		fmt.Println("got", s)
+	case s := <-b:
+		fmt.Println("got", s)
+	}
+}
diff --git a/2013/advconc/pingpong1.go b/2013/advconc/pingpong1.go
new file mode 100644
index 0000000..80acfff
--- /dev/null
+++ b/2013/advconc/pingpong1.go
@@ -0,0 +1,31 @@
+package main
+
+import (
+	"fmt"
+	"time"
+)
+
+// STARTMAIN1 OMIT
+type Ball struct{ hits int }
+
+func main() {
+	table := make(chan *Ball)
+	go player("ping", table)
+	go player("pong", table)
+
+	table <- new(Ball) // game on; toss the ball
+	time.Sleep(1 * time.Second)
+	<-table // game over; grab the ball
+}
+
+func player(name string, table chan *Ball) {
+	for {
+		ball := <-table
+		ball.hits++
+		fmt.Println(name, ball.hits)
+		time.Sleep(100 * time.Millisecond)
+		table <- ball
+	}
+}
+
+// STOPMAIN1 OMIT
diff --git a/2013/advconc/pingpong2.go b/2013/advconc/pingpong2.go
new file mode 100644
index 0000000..0aca88f
--- /dev/null
+++ b/2013/advconc/pingpong2.go
@@ -0,0 +1,42 @@
+package main
+
+import (
+	"fmt"
+	"time"
+)
+
+// STARTMAIN1 OMIT
+type Ball struct{}
+
+func main() {
+	table := make(chan Ball)
+	stop1, stop2 := make(chan int), make(chan int)
+	go player("ping", table, stop1)
+	go player("pong", table, stop2)
+
+	table <- Ball{}
+	time.Sleep(1 * time.Second)
+	stop1 <- 1
+	stop2 <- 1
+	<-stop1
+	<-stop2
+}
+
+// STOPMAIN1 OMIT
+
+// STARTPLAYER1 OMIT
+func player(name string, table chan Ball, stop chan int) {
+	for {
+		select {
+		case ball := <-table:
+			fmt.Println(name)
+			time.Sleep(100 * time.Millisecond)
+			// The other player may already have stopped, so offer the
+			// ball and watch for a stop request at the same time.
+			select {
+			case table <- ball:
+			case <-stop:
+				fmt.Println(name, "done")
+				stop <- 1
+				return
+			}
+		case <-stop:
+			fmt.Println(name, "done")
+			stop <- 1
+			return
+		}
+	}
+}
+
+// STOPPLAYER1 OMIT
diff --git a/2013/advconc/pingpongdeadlock.go b/2013/advconc/pingpongdeadlock.go
new file mode 100644
index 0000000..4acbb6b
--- /dev/null
+++ b/2013/advconc/pingpongdeadlock.go
@@ -0,0 +1,31 @@
+package main
+
+import (
+	"fmt"
+	"time"
+)
+
+// STARTMAIN1 OMIT
+type Ball struct{ hits int }
+
+func main() {
+	table := make(chan *Ball)
+	go player("ping", table)
+	go player("pong", table)
+
+	// table <- new(Ball) // game on; toss the ball // HL
+	time.Sleep(1 * time.Second)
+	<-table // game over; grab the ball
+}
+
+func player(name string, table chan *Ball) {
+	for {
+		ball := <-table
+		ball.hits++
+		fmt.Println(name, ball.hits)
+		time.Sleep(100 * time.Millisecond)
+		table <- ball
+	}
+}
+
+// STOPMAIN1 OMIT
diff --git a/2013/advconc/pingpongpanic.go b/2013/advconc/pingpongpanic.go
new file mode 100644
index 0000000..ce6f4b3
--- /dev/null
+++ b/2013/advconc/pingpongpanic.go
@@ -0,0 +1,33 @@
+package main
+
+import (
+	"fmt"
+	"time"
+)
+
+// STARTMAIN1 OMIT
+type Ball struct{ hits int }
+
+func main() {
+	table := make(chan *Ball)
+	go player("ping", table)
+	go player("pong", table)
+
+	table <- new(Ball) // game on; toss the ball
+	time.Sleep(1 * time.Second)
+	<-table // game over; grab the ball
+
+	panic("show me the stacks") // HL
+}
+
+func player(name string, table chan *Ball) {
+	for {
+		ball := <-table
+		ball.hits++
+		fmt.Println(name, ball.hits)
+		time.Sleep(100 * time.Millisecond)
+		table <- ball
+	}
+}
+
+// STOPMAIN1 OMIT
diff --git a/2013/advconc/race.out b/2013/advconc/race.out
new file mode 100644
index 0000000..955794f
--- /dev/null
+++ b/2013/advconc/race.out
@@ -0,0 +1,62 @@
+googleblog.blogspot.com Item 0
+googleblog.blogspot.com Item 1
+googledevelopers.blogspot.com Item 0
+blog.golang.org Item 0
+googledevelopers.blogspot.com Item 1
+blog.golang.org Item 1
+blog.golang.org Item 2
+googledevelopers.blogspot.com Item 2
+googledevelopers.blogspot.com Item 3
+googleblog.blogspot.com Item 2
+googledevelopers.blogspot.com Item 4
+==================
+WARNING: DATA RACE
+Read by goroutine 4: // HL
+  code.google.com/p/go.talks/2013/reader.(*naiveSub).loop() // HL
+      /Users/sameer/gocode/src/code.google.com/p/go.talks/2013/reader/reader.go:105 +0x46
+  gosched0()
+      /Users/sameer/go/src/pkg/runtime/proc.c:1218 +0x9f
+
+Previous write by goroutine 7: // HL
+  code.google.com/p/go.talks/2013/reader.(*naiveSub).Close() // HL
+      /Users/sameer/gocode/src/code.google.com/p/go.talks/2013/reader/reader.go:125 +0x38
+  code.google.com/p/go.talks/2013/reader.func·002()
+      /Users/sameer/gocode/src/code.google.com/p/go.talks/2013/reader/reader.go:353 +0x1da
+  gosched0()
+      /Users/sameer/go/src/pkg/runtime/proc.c:1218 +0x9f
+
+Goroutine 4 (running) created at:
+  code.google.com/p/go.talks/2013/reader.NaiveSubscribe()
+      /Users/sameer/gocode/src/code.google.com/p/go.talks/2013/reader/reader.go:87 +0xff
+  main.main()
+      /Users/sameer/gocode/src/code.google.com/p/go.talks/2013/advconc/naivemain.go:18 +0xca
+  runtime.main()
+      /Users/sameer/go/src/pkg/runtime/proc.c:182 +0x91
+
+Goroutine 7 (finished) created at:
+  code.google.com/p/go.talks/2013/reader.Merge()
+      /Users/sameer/gocode/src/code.google.com/p/go.talks/2013/reader/reader.go:357 +0x1f5
+  main.main()
+      /Users/sameer/gocode/src/code.google.com/p/go.talks/2013/advconc/naivemain.go:20 +0x261
+  runtime.main()
+      /Users/sameer/go/src/pkg/runtime/proc.c:182 +0x91
+
+==================
+googleblog.blogspot.com Item 3
+closed: <nil>
+panic: show me the stacks
+
+goroutine 1 [running]:
+main.main()
+	/Users/sameer/gocode/src/code.google.com/p/go.talks/2013/advconc/naivemain.go:33 +0x4f5
+
+goroutine 2 [syscall]:
+
+goroutine 5 [sleep]:
+time.Sleep(0xee5ad23)
+	/Users/sameer/go/src/pkg/runtime/ztime_darwin_amd64.c:19 +0x2f
+code.google.com/p/go.talks/2013/reader.(*naiveSub).loop(0xc2000aea50)
+	/Users/sameer/gocode/src/code.google.com/p/go.talks/2013/reader/reader.go:119 +0x2a3
+created by code.google.com/p/go.talks/2013/reader.NaiveSubscribe
+	/Users/sameer/gocode/src/code.google.com/p/go.talks/2013/reader/reader.go:87 +0xff
+exit status 2
diff --git a/2013/advconc/race.png b/2013/advconc/race.png
new file mode 100644
index 0000000..8934a64
--- /dev/null
+++ b/2013/advconc/race.png
Binary files differ
diff --git a/2013/advconc/reader/reader.go b/2013/advconc/reader/reader.go
new file mode 100644
index 0000000..980f2bd
--- /dev/null
+++ b/2013/advconc/reader/reader.go
@@ -0,0 +1,572 @@
+package reader
+
+import (
+	"fmt"
+	"math/rand"
+	"time"
+
+	rss "github.com/jteeuwen/go-pkg-rss"
+)
+
+// STARTITEM OMIT
+// An Item is a stripped-down RSS item.
+type Item struct{ Title, Channel, GUID string }
+
+// STOPITEM OMIT
+
+// STARTFETCHER OMIT
+// A Fetcher fetches Items and returns the time when the next fetch should be
+// attempted.  On failure, Fetch returns a non-nil error.
+type Fetcher interface {
+	Fetch() (items []Item, next time.Time, err error)
+}
+
+// STOPFETCHER OMIT
+
+// STARTSUBSCRIPTION OMIT
+// A Subscription delivers Items over a channel.  Close cancels the
+// subscription, closes the Updates channel, and returns the last fetch error,
+// if any.
+type Subscription interface {
+	Updates() <-chan Item
+	Close() error
+}
+
+// STOPSUBSCRIPTION OMIT
+
+// STARTSUBSCRIBE OMIT
+// Subscribe returns a new Subscription that uses fetcher to fetch Items.
+func Subscribe(fetcher Fetcher) Subscription {
+	s := &sub{
+		fetcher: fetcher,
+		updates: make(chan Item),       // for Updates
+		closing: make(chan chan error), // for Close
+	}
+	go s.loop()
+	return s
+}
+
+// STOPSUBSCRIBE OMIT
+
+// sub implements the Subscription interface.
+type sub struct {
+	fetcher Fetcher         // fetches items
+	updates chan Item       // sends items to the user
+	closing chan chan error // for Close
+}
+
+// STARTUPDATES OMIT
+func (s *sub) Updates() <-chan Item {
+	return s.updates
+}
+
+// STOPUPDATES OMIT
+
+// STARTCLOSE OMIT
+// STARTCLOSESIG OMIT
+func (s *sub) Close() error {
+	// STOPCLOSESIG OMIT
+	errc := make(chan error)
+	s.closing <- errc // HLchan
+	return <-errc     // HLchan
+}
+
+// STOPCLOSE OMIT
+
+// loopCloseOnly is a version of loop that includes only the logic
+// that handles Close.
+func (s *sub) loopCloseOnly() {
+	// STARTCLOSEONLY OMIT
+	var err error // set when Fetch fails
+	for {
+		select {
+		case errc := <-s.closing: // HLchan
+			errc <- err      // HLchan
+			close(s.updates) // tells receiver we're done
+			return
+		}
+	}
+	// STOPCLOSEONLY OMIT
+}
+
+// loopFetchOnly is a version of loop that includes only the logic
+// that calls Fetch.
+func (s *sub) loopFetchOnly() {
+	// STARTFETCHONLY OMIT
+	var pending []Item // appended by fetch; consumed by send
+	var next time.Time // initially January 1, year 0
+	var err error
+	for {
+		var fetchDelay time.Duration // initially 0 (no delay)
+		if now := time.Now(); next.After(now) {
+			fetchDelay = next.Sub(now)
+		}
+		startFetch := time.After(fetchDelay)
+
+		select {
+		case <-startFetch:
+			var fetched []Item
+			fetched, next, err = s.fetcher.Fetch()
+			if err != nil {
+				next = time.Now().Add(10 * time.Second)
+				break
+			}
+			pending = append(pending, fetched...)
+		}
+	}
+	// STOPFETCHONLY OMIT
+}
+
+// loopSendOnly is a version of loop that includes only the logic for
+// sending items to s.updates.
+func (s *sub) loopSendOnly() {
+	// STARTSENDONLY OMIT
+	var pending []Item // appended by fetch; consumed by send
+	for {
+		var first Item
+		var updates chan Item // HLupdates
+		if len(pending) > 0 {
+			first = pending[0]
+			updates = s.updates // enable send case // HLupdates
+		}
+
+		select {
+		case updates <- first:
+			pending = pending[1:]
+		}
+	}
+	// STOPSENDONLY OMIT
+}
+
+// mergedLoop is a version of loop that combines loopCloseOnly,
+// loopFetchOnly, and loopSendOnly.
+func (s *sub) mergedLoop() {
+	// STARTFETCHVARS OMIT
+	var pending []Item
+	var next time.Time
+	var err error
+	// STOPFETCHVARS OMIT
+	for {
+		// STARTNOCAP OMIT
+		var fetchDelay time.Duration
+		if now := time.Now(); next.After(now) {
+			fetchDelay = next.Sub(now)
+		}
+		startFetch := time.After(fetchDelay)
+		// STOPNOCAP OMIT
+		var first Item
+		var updates chan Item
+		if len(pending) > 0 {
+			first = pending[0]
+			updates = s.updates // enable send case
+		}
+
+		// STARTSELECT OMIT
+		select {
+		case errc := <-s.closing: // HLcases
+			errc <- err
+			close(s.updates)
+			return
+			// STARTFETCHCASE OMIT
+		case <-startFetch: // HLcases
+			var fetched []Item
+			fetched, next, err = s.fetcher.Fetch() // HLfetch
+			if err != nil {
+				next = time.Now().Add(10 * time.Second)
+				break
+			}
+			pending = append(pending, fetched...) // HLfetch
+			// STOPFETCHCASE OMIT
+		case updates <- first: // HLcases
+			pending = pending[1:]
+		}
+		// STOPSELECT OMIT
+	}
+}
+
+// dedupeLoop extends mergedLoop with deduping of fetched items.
+func (s *sub) dedupeLoop() {
+	const maxPending = 10
+	// STARTSEEN OMIT
+	var pending []Item
+	var next time.Time
+	var err error
+	var seen = make(map[string]bool) // set of item.GUIDs // HLseen
+	// STOPSEEN OMIT
+	for {
+		// STARTCAP OMIT
+		var fetchDelay time.Duration
+		if now := time.Now(); next.After(now) {
+			fetchDelay = next.Sub(now)
+		}
+		var startFetch <-chan time.Time // HLcap
+		if len(pending) < maxPending {  // HLcap
+			startFetch = time.After(fetchDelay) // enable fetch case  // HLcap
+		} // HLcap
+		// STOPCAP OMIT
+		var first Item
+		var updates chan Item
+		if len(pending) > 0 {
+			first = pending[0]
+			updates = s.updates // enable send case
+		}
+		select {
+		case errc := <-s.closing:
+			errc <- err
+			close(s.updates)
+			return
+		// STARTDEDUPE OMIT
+		case <-startFetch:
+			var fetched []Item
+			fetched, next, err = s.fetcher.Fetch() // HLfetch
+			if err != nil {
+				next = time.Now().Add(10 * time.Second)
+				break
+			}
+			for _, item := range fetched {
+				if !seen[item.GUID] { // HLdupe
+					pending = append(pending, item) // HLdupe
+					seen[item.GUID] = true          // HLdupe
+				} // HLdupe
+			}
+			// STOPDEDUPE OMIT
+		case updates <- first:
+			pending = pending[1:]
+		}
+	}
+}
+
+// loop periodically fetches Items, sends them on s.updates, and exits
+// when Close is called.  It extends dedupeLoop with logic to run
+// Fetch asynchronously.
+func (s *sub) loop() {
+	const maxPending = 10
+	type fetchResult struct {
+		fetched []Item
+		next    time.Time
+		err     error
+	}
+	// STARTFETCHDONE OMIT
+	var fetchDone chan fetchResult // if non-nil, Fetch is running // HL
+	// STOPFETCHDONE OMIT
+	var pending []Item
+	var next time.Time
+	var err error
+	var seen = make(map[string]bool)
+	for {
+		var fetchDelay time.Duration
+		if now := time.Now(); next.After(now) {
+			fetchDelay = next.Sub(now)
+		}
+		// STARTFETCHIF OMIT
+		var startFetch <-chan time.Time
+		if fetchDone == nil && len(pending) < maxPending { // HLfetch
+			startFetch = time.After(fetchDelay) // enable fetch case
+		}
+		// STOPFETCHIF OMIT
+		var first Item
+		var updates chan Item
+		if len(pending) > 0 {
+			first = pending[0]
+			updates = s.updates // enable send case
+		}
+		// STARTFETCHASYNC OMIT
+		select {
+		case <-startFetch: // HLfetch
+			fetchDone = make(chan fetchResult, 1) // HLfetch
+			go func() {
+				fetched, next, err := s.fetcher.Fetch()
+				fetchDone <- fetchResult{fetched, next, err}
+			}()
+		case result := <-fetchDone: // HLfetch
+			fetchDone = nil // HLfetch
+			// Use result.fetched, result.next, result.err
+			// STOPFETCHASYNC OMIT
+			fetched := result.fetched
+			next, err = result.next, result.err
+			if err != nil {
+				next = time.Now().Add(10 * time.Second)
+				break
+			}
+			for _, item := range fetched {
+				if id := item.GUID; !seen[id] { // HLdupe
+					pending = append(pending, item)
+					seen[id] = true // HLdupe
+				}
+			}
+		case errc := <-s.closing:
+			errc <- err
+			close(s.updates)
+			return
+		case updates <- first:
+			pending = pending[1:]
+		}
+	}
+}
+
+// naiveMerge is a version of Merge that doesn't quite work right.  In
+// particular, the goroutines it starts may block forever on m.updates
+// if the receiver stops receiving.
+type naiveMerge struct {
+	subs    []Subscription
+	updates chan Item
+}
+
+// STARTNAIVEMERGE OMIT
+func NaiveMerge(subs ...Subscription) Subscription {
+	m := &naiveMerge{
+		subs:    subs,
+		updates: make(chan Item),
+	}
+	// STARTNAIVEMERGELOOP OMIT
+	for _, sub := range subs {
+		go func(s Subscription) {
+			for it := range s.Updates() {
+				m.updates <- it // HL
+			}
+		}(sub)
+	}
+	// STOPNAIVEMERGELOOP OMIT
+	return m
+}
+
+// STOPNAIVEMERGE OMIT
+
+// STARTNAIVEMERGECLOSE OMIT
+func (m *naiveMerge) Close() (err error) {
+	for _, sub := range m.subs {
+		if e := sub.Close(); err == nil && e != nil {
+			err = e
+		}
+	}
+	close(m.updates) // HL
+	return
+}
+
+// STOPNAIVEMERGECLOSE OMIT
+
+func (m *naiveMerge) Updates() <-chan Item {
+	return m.updates
+}
+
+type merge struct {
+	subs    []Subscription
+	updates chan Item
+	quit    chan struct{}
+	errs    chan error
+}
+
+// STARTMERGESIG OMIT
+// Merge returns a Subscription that merges the item streams from subs.
+// Closing the merged subscription closes subs.
+func Merge(subs ...Subscription) Subscription {
+	// STOPMERGESIG OMIT
+	m := &merge{
+		subs:    subs,
+		updates: make(chan Item),
+		quit:    make(chan struct{}),
+		errs:    make(chan error),
+	}
+	// STARTMERGE OMIT
+	for _, sub := range subs {
+		go func(s Subscription) {
+			for {
+				var it Item
+				select {
+				case it = <-s.Updates():
+				case <-m.quit: // HL
+					m.errs <- s.Close() // HL
+					return              // HL
+				}
+				select {
+				case m.updates <- it:
+				case <-m.quit: // HL
+					m.errs <- s.Close() // HL
+					return              // HL
+				}
+			}
+		}(sub)
+	}
+	// STOPMERGE OMIT
+	return m
+}
+
+func (m *merge) Updates() <-chan Item {
+	return m.updates
+}
+
+// STARTMERGECLOSE OMIT
+func (m *merge) Close() (err error) {
+	close(m.quit) // HL
+	for _ = range m.subs {
+		if e := <-m.errs; e != nil { // HL
+			err = e
+		}
+	}
+	close(m.updates) // HL
+	return
+}
+
+// STOPMERGECLOSE OMIT
+
+// NaiveDedupe converts a stream of Items that may contain duplicates
+// into one that doesn't.
+func NaiveDedupe(in <-chan Item) <-chan Item {
+	out := make(chan Item)
+	go func() {
+		seen := make(map[string]bool)
+		for it := range in {
+			if !seen[it.GUID] {
+				// BUG: this send blocks if the
+				// receiver closes the Subscription
+				// and stops receiving.
+				out <- it // HL
+				seen[it.GUID] = true
+			}
+		}
+		close(out)
+	}()
+	return out
+}
+
+type deduper struct {
+	s       Subscription
+	updates chan Item
+	closing chan chan error
+}
+
+// Dedupe converts a Subscription that may send duplicate Items into
+// one that doesn't.
+func Dedupe(s Subscription) Subscription {
+	d := &deduper{
+		s:       s,
+		updates: make(chan Item),
+		closing: make(chan chan error),
+	}
+	go d.loop()
+	return d
+}
+
+func (d *deduper) loop() {
+	in := d.s.Updates() // enable receive
+	var pending Item
+	var out chan Item // disable send
+	seen := make(map[string]bool)
+	for {
+		select {
+		case it := <-in:
+			if !seen[it.GUID] {
+				pending = it
+				in = nil        // disable receive
+				out = d.updates // enable send
+				seen[it.GUID] = true
+			}
+		case out <- pending:
+			in = d.s.Updates() // enable receive
+			out = nil          // disable send
+		case errc := <-d.closing:
+			err := d.s.Close()
+			errc <- err
+			close(d.updates)
+			return
+		}
+	}
+}
+
+func (d *deduper) Close() error {
+	errc := make(chan error)
+	d.closing <- errc
+	return <-errc
+}
+
+func (d *deduper) Updates() <-chan Item {
+	return d.updates
+}
+
+// FakeFetch causes Fetch to use a fake fetcher instead of the real
+// one.
+var FakeFetch bool
+
+// Fetch returns a Fetcher for Items from domain.
+func Fetch(domain string) Fetcher {
+	if FakeFetch {
+		return fakeFetch(domain)
+	}
+	return realFetch(domain)
+}
+
+func fakeFetch(domain string) Fetcher {
+	return &fakeFetcher{channel: domain}
+}
+
+type fakeFetcher struct {
+	channel string
+	items   []Item
+}
+
+// FakeDuplicates causes the fake fetcher to return duplicate items.
+var FakeDuplicates bool
+
+func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) {
+	now := time.Now()
+	next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond)
+	item := Item{
+		Channel: f.channel,
+		Title:   fmt.Sprintf("Item %d", len(f.items)),
+	}
+	item.GUID = item.Channel + "/" + item.Title
+	f.items = append(f.items, item)
+	if FakeDuplicates {
+		items = f.items
+	} else {
+		items = []Item{item}
+	}
+	return
+}
+
+// realFetch returns a fetcher for the specified blogger domain.
+func realFetch(domain string) Fetcher {
+	return NewFetcher(fmt.Sprintf("http://%s/feeds/posts/default?alt=rss", domain))
+}
+
+type fetcher struct {
+	uri   string
+	feed  *rss.Feed
+	items []Item
+}
+
+// NewFetcher returns a Fetcher for uri.
+func NewFetcher(uri string) Fetcher {
+	f := &fetcher{
+		uri: uri,
+	}
+	newChans := func(feed *rss.Feed, chans []*rss.Channel) {}
+	newItems := func(feed *rss.Feed, ch *rss.Channel, items []*rss.Item) {
+		for _, it := range items {
+			f.items = append(f.items, Item{
+				Channel: ch.Title,
+				GUID:    it.Guid,
+				Title:   it.Title,
+			})
+		}
+	}
+	f.feed = rss.New(1 /*minimum interval in minutes*/, true /*respect limit*/, newChans, newItems)
+	return f
+}
+
+func (f *fetcher) Fetch() (items []Item, next time.Time, err error) {
+	fmt.Println("fetching", f.uri)
+	if err = f.feed.Fetch(f.uri, nil); err != nil {
+		return
+	}
+	items = f.items
+	f.items = nil
+	next = time.Now().Add(time.Duration(f.feed.SecondsTillUpdate()) * time.Second)
+	return
+}
+
+// TODO: in a longer talk: move the Subscribe function onto a Reader type, to
+// support dynamically adding and removing Subscriptions.  Reader should dedupe.
+
+// TODO: in a longer talk: make successive Subscribe calls for the same uri
+// share the same underlying Subscription, but provide duplicate streams.