blob: fc1f422dec4b98d2d76db48c5175bcb3eaf3c164 [file] [log] [blame]
// +build OMIT
// dedupermain runs the Subscribe example with several duplicate
// subscriptions to demonstrate deduping.
package main
import (
"fmt"
"math/rand"
"time"
)
// STARTITEM OMIT
// Item is a minimal representation of a single RSS entry.
type Item struct {
	Title   string // headline of the entry
	Channel string // name of the feed the entry belongs to
	GUID    string // identifier used to recognise duplicates
}
// STOPITEM OMIT
// STARTFETCHER OMIT
// A Fetcher fetches Items and returns the time when the next fetch should be
// attempted. On failure, Fetch returns a non-nil error.
type Fetcher interface {
// Fetch returns the fetched items, the time at which the next fetch
// should be attempted, and any error encountered.
Fetch() (items []Item, next time.Time, err error)
}
// STOPFETCHER OMIT
// STARTSUBSCRIPTION OMIT
// A Subscription delivers Items over a channel. Close cancels the
// subscription, closes the Updates channel, and returns the last fetch error,
// if any.
type Subscription interface {
// Updates returns the receive-only channel on which Items are delivered.
Updates() <-chan Item
// Close cancels the subscription and reports the last fetch error, if any.
Close() error
}
// STOPSUBSCRIPTION OMIT
// STARTSUBSCRIBE OMIT
// Subscribe creates a Subscription backed by fetcher and starts the
// goroutine that services it. Both of its channels are unbuffered.
func Subscribe(fetcher Fetcher) Subscription {
	s := new(sub)
	s.fetcher = fetcher
	s.updates = make(chan Item)       // for Updates
	s.closing = make(chan chan error) // for Close
	go s.loop()
	return s
}
// STOPSUBSCRIBE OMIT
// sub implements the Subscription interface.
// Its channels are created by Subscribe and serviced by the loop goroutine.
type sub struct {
fetcher Fetcher // fetches items
updates chan Item // sends items to the user
closing chan chan error // for Close: carries the reply channel on which the loop sends its error
}
// STARTUPDATES OMIT
// Updates returns the channel on which s delivers Items. The loop goroutine
// closes this channel when it handles a Close request.
func (s *sub) Updates() <-chan Item {
return s.updates
}
// STOPUPDATES OMIT
// STARTCLOSE OMIT
// STARTCLOSESIG OMIT
// Close asks the loop goroutine to shut down and returns the error it
// reports. Both channels involved are unbuffered, so Close blocks until the
// loop has received the request and sent its reply.
func (s *sub) Close() error {
// STOPCLOSESIG OMIT
errc := make(chan error)
s.closing <- errc // HLchan
return <-errc // HLchan
}
// STOPCLOSE OMIT
// loopCloseOnly is a version of loop that includes only the logic
// that handles Close. It blocks until a Close request arrives, replies with
// err (always nil in this version, since nothing assigns it), closes
// s.updates, and returns.
func (s *sub) loopCloseOnly() {
// STARTCLOSEONLY OMIT
var err error // set when Fetch fails
for {
select {
case errc := <-s.closing: // HLchan
errc <- err // HLchan
close(s.updates) // tells receiver we're done
return
}
}
// STOPCLOSEONLY OMIT
}
// loopFetchOnly is a version of loop that includes only the logic
// that calls Fetch. It never returns, and pending only grows because this
// version has no send case to consume it. After a failed Fetch the next
// attempt is scheduled 10 seconds later; the break there leaves the select,
// not the enclosing for loop.
func (s *sub) loopFetchOnly() {
// STARTFETCHONLY OMIT
var pending []Item // appended by fetch; consumed by send
var next time.Time // initially the zero time (January 1, year 1)
var err error
for {
var fetchDelay time.Duration // initially 0 (no delay)
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
startFetch := time.After(fetchDelay)
select {
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)
}
}
// STOPFETCHONLY OMIT
}
// loopSendOnly is a version of loop that includes only the logic for
// sending items to s.updates. The send case is enabled only while pending is
// non-empty: otherwise updates stays nil, and a send on a nil channel never
// proceeds. Nothing in this version appends to pending, so as written the
// select blocks forever.
func (s *sub) loopSendOnly() {
// STARTSENDONLY OMIT
var pending []Item // appended by fetch; consumed by send
for {
var first Item
var updates chan Item // HLupdates
if len(pending) > 0 {
first = pending[0]
updates = s.updates // enable send case // HLupdates
}
select {
case updates <- first:
pending = pending[1:]
}
}
// STOPSENDONLY OMIT
}
// mergedLoop is a version of loop that combines loopCloseOnly,
// loopFetchOnly, and loopSendOnly. Each iteration recomputes the fetch
// delay and the optional send case, then waits for whichever of Close,
// fetch-time, or send becomes ready. Fetch is called synchronously inside
// the select case, so Close and sends are not serviced while it runs.
// pending has no size cap in this version.
func (s *sub) mergedLoop() {
// STARTFETCHVARS OMIT
var pending []Item
var next time.Time
var err error
// STOPFETCHVARS OMIT
for {
// STARTNOCAP OMIT
var fetchDelay time.Duration
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
startFetch := time.After(fetchDelay)
// STOPNOCAP OMIT
var first Item
var updates chan Item
if len(pending) > 0 {
first = pending[0]
updates = s.updates // enable send case
}
// STARTSELECT OMIT
select {
case errc := <-s.closing: // HLcases
errc <- err
close(s.updates)
return
// STARTFETCHCASE OMIT
case <-startFetch: // HLcases
var fetched []Item
fetched, next, err = s.fetcher.Fetch() // HLfetch
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...) // HLfetch
// STOPFETCHCASE OMIT
case updates <- first: // HLcases
pending = pending[1:]
}
// STOPSELECT OMIT
}
}
// dedupeLoop extends mergedLoop with deduping of fetched items.
// Items whose GUID has already been recorded in seen are dropped. The fetch
// case is enabled only while pending holds fewer than maxPending items; the
// cap is checked before a fetch starts, so a single fetch may still push
// pending past it. seen is never pruned.
func (s *sub) dedupeLoop() {
const maxPending = 10
// STARTSEEN OMIT
var pending []Item
var next time.Time
var err error
var seen = make(map[string]bool) // set of item.GUIDs // HLseen
// STOPSEEN OMIT
for {
// STARTCAP OMIT
var fetchDelay time.Duration
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
var startFetch <-chan time.Time // HLcap
if len(pending) < maxPending { // HLcap
startFetch = time.After(fetchDelay) // enable fetch case // HLcap
} // HLcap
// STOPCAP OMIT
var first Item
var updates chan Item
if len(pending) > 0 {
first = pending[0]
updates = s.updates // enable send case
}
select {
case errc := <-s.closing:
errc <- err
close(s.updates)
return
// STARTDEDUPE OMIT
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch() // HLfetch
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
for _, item := range fetched {
if !seen[item.GUID] { // HLdupe
pending = append(pending, item) // HLdupe
seen[item.GUID] = true // HLdupe
} // HLdupe
}
// STOPDEDUPE OMIT
case updates <- first:
pending = pending[1:]
}
}
}
// loop periodically fetches Items, sends them on s.updates, and exits
// when Close is called. It extends dedupeLoop with logic to run
// Fetch asynchronously.
//
// At most one Fetch runs at a time: a new one is started only while
// fetchDone is nil, and fetchDone is reset to nil when a result is received.
// fetchDone has a buffer of one, so the fetching goroutine can deliver its
// result and exit even if loop has already returned because of Close.
func (s *sub) loop() {
const maxPending = 10
// fetchResult bundles the three values returned by Fetch.
type fetchResult struct {
fetched []Item
next time.Time
err error
}
// STARTFETCHDONE OMIT
var fetchDone chan fetchResult // if non-nil, Fetch is running // HL
// STOPFETCHDONE OMIT
var pending []Item
var next time.Time
var err error
var seen = make(map[string]bool)
for {
var fetchDelay time.Duration
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
// STARTFETCHIF OMIT
var startFetch <-chan time.Time
if fetchDone == nil && len(pending) < maxPending { // HLfetch
startFetch = time.After(fetchDelay) // enable fetch case
}
// STOPFETCHIF OMIT
var first Item
var updates chan Item
if len(pending) > 0 {
first = pending[0]
updates = s.updates // enable send case
}
// STARTFETCHASYNC OMIT
select {
case <-startFetch: // HLfetch
fetchDone = make(chan fetchResult, 1) // HLfetch
go func() {
// These := declarations are local to the goroutine; loop's own next
// and err are updated only when the result is received below.
fetched, next, err := s.fetcher.Fetch()
fetchDone <- fetchResult{fetched, next, err}
}()
case result := <-fetchDone: // HLfetch
fetchDone = nil // HLfetch
// Use result.fetched, result.next, result.err
// STOPFETCHASYNC OMIT
fetched := result.fetched
next, err = result.next, result.err
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
for _, item := range fetched {
if id := item.GUID; !seen[id] { // HLdupe
pending = append(pending, item)
seen[id] = true // HLdupe
}
}
case errc := <-s.closing:
errc <- err
close(s.updates)
return
case updates <- first:
pending = pending[1:]
}
}
}
// naiveMerge is a version of Merge that doesn't quite work right. In
// particular, the goroutines it starts may block forever on m.updates
// if the receiver stops receiving.
type naiveMerge struct {
subs []Subscription // merged subscriptions; each is closed by Close
updates chan Item // receives the items forwarded from every sub
}
// STARTNAIVEMERGE OMIT
// NaiveMerge returns a Subscription that forwards the Items of all subs onto
// one unbuffered channel, using one goroutine per sub. Each goroutine runs
// until its sub's Updates channel is closed; nothing else can stop it while
// it is blocked sending on m.updates.
func NaiveMerge(subs ...Subscription) Subscription {
m := &naiveMerge{
subs: subs,
updates: make(chan Item),
}
// STARTNAIVEMERGELOOP OMIT
for _, sub := range subs {
go func(s Subscription) {
for it := range s.Updates() {
m.updates <- it // HL
}
}(sub)
}
// STOPNAIVEMERGELOOP OMIT
return m
}
// STOPNAIVEMERGE OMIT
// STARTNAIVEMERGECLOSE OMIT
// Close closes every sub in order, then closes m.updates. It returns the
// first non-nil error reported by a sub's Close, or nil if there was none.
// NOTE(review): nothing here waits for the forwarding goroutines started by
// NaiveMerge; one still blocked sending on m.updates would presumably panic
// once the channel is closed — confirm.
func (m *naiveMerge) Close() (err error) {
for _, sub := range m.subs {
if e := sub.Close(); err == nil && e != nil {
err = e
}
}
close(m.updates) // HL
return
}
// STOPNAIVEMERGECLOSE OMIT
// Updates returns the channel carrying the Items forwarded from all subs.
func (m *naiveMerge) Updates() <-chan Item {
return m.updates
}
// merge is the Subscription returned by Merge.
type merge struct {
subs []Subscription // merged subscriptions
updates chan Item // receives the items forwarded from every sub
quit chan struct{} // closed by Close to stop the forwarding goroutines
errs chan error // carries each sub's Close result back to Close
}
// STARTMERGESIG OMIT
// Merge returns a Subscription that merges the item streams from subs.
// Closing the merged subscription closes subs.
//
// One goroutine is started per sub. It alternates between receiving an Item
// from the sub and sending that Item on m.updates. Either step is abandoned
// as soon as m.quit is closed; the goroutine then closes its sub, reports
// the result on m.errs, and exits.
func Merge(subs ...Subscription) Subscription {
// STOPMERGESIG OMIT
m := &merge{
subs: subs,
updates: make(chan Item),
quit: make(chan struct{}),
errs: make(chan error),
}
// STARTMERGE OMIT
for _, sub := range subs {
go func(s Subscription) {
for {
var it Item
select {
case it = <-s.Updates():
case <-m.quit: // HL
m.errs <- s.Close() // HL
return // HL
}
select {
case m.updates <- it:
case <-m.quit: // HL
m.errs <- s.Close() // HL
return // HL
}
}
}(sub)
}
// STOPMERGE OMIT
return m
}
// Updates returns the channel carrying the merged Items. Close closes it
// after all forwarding goroutines have reported back.
func (m *merge) Updates() <-chan Item {
return m.updates
}
// STARTMERGECLOSE OMIT
// Close shuts down the merged subscription. It closes m.quit to signal the
// forwarding goroutines started by Merge, then collects exactly one result
// per sub from m.errs; each goroutine closes its own sub before reporting.
// Only after every goroutine has reported is m.updates closed, so no
// goroutine can still be sending on it.
//
// The returned error is the last non-nil error received from the subs, or
// nil if every sub closed cleanly.
func (m *merge) Close() (err error) {
	close(m.quit) // HL
	for range m.subs {
		if e := <-m.errs; e != nil { // HL
			err = e
		}
	}
	close(m.updates) // HL
	return err
}
// STOPMERGECLOSE OMIT
// NaiveDedupe converts a stream of Items that may contain duplicates
// into one that doesn't. Items are identified by GUID. The returned channel
// is closed once in has been closed and drained. The forwarding goroutine
// has no way to be cancelled; see the BUG note in the body.
func NaiveDedupe(in <-chan Item) <-chan Item {
out := make(chan Item)
go func() {
seen := make(map[string]bool)
for it := range in {
if !seen[it.GUID] {
// BUG: this send blocks if the
// receiver closes the Subscription
// and stops receiving.
out <- it // HL
seen[it.GUID] = true
}
}
close(out)
}()
return out
}
// deduper is the Subscription returned by Dedupe.
type deduper struct {
s Subscription // underlying subscription, which may deliver duplicates
updates chan Item // delivers each distinct Item once
closing chan chan error // for Close: carries the reply channel used by loop
}
// Dedupe wraps s in a Subscription that delivers each Item at most once,
// and starts the goroutine that does the filtering.
func Dedupe(s Subscription) Subscription {
	d := new(deduper)
	d.s = s
	d.updates = make(chan Item)
	d.closing = make(chan chan error)
	go d.loop()
	return d
}
// loop forwards Items from the underlying subscription to d.updates,
// dropping any whose GUID has been forwarded before. It holds at most one
// Item at a time: while an Item is waiting to be sent, receiving is disabled
// by setting the source channel to nil, and vice versa. On Close it closes
// the underlying subscription, replies with that result, closes d.updates,
// and returns.
func (d *deduper) loop() {
	var (
		src     = d.s.Updates() // receive enabled
		held    Item            // item waiting to be sent
		dst     chan Item       // nil: send disabled
		emitted = make(map[string]bool)
	)
	for {
		select {
		case reply := <-d.closing:
			reply <- d.s.Close()
			close(d.updates)
			return
		case dst <- held:
			// Delivered: go back to receiving.
			src = d.s.Updates()
			dst = nil
		case item := <-src:
			if emitted[item.GUID] {
				continue
			}
			// First sighting: hold it and switch to sending.
			held = item
			src = nil
			dst = d.updates
			emitted[item.GUID] = true
		}
	}
}
// Close asks the loop goroutine to shut down and returns the error it
// reports, blocking until the reply arrives.
func (d *deduper) Close() error {
	reply := make(chan error)
	d.closing <- reply
	err := <-reply
	return err
}
// Updates returns the channel on which deduplicated Items are delivered.
// The loop goroutine closes it when it handles a Close request.
func (d *deduper) Updates() <-chan Item {
return d.updates
}
// Fetch returns a Fetcher for Items from domain.
// In this example it always returns the fake fetcher built by fakeFetch.
func Fetch(domain string) Fetcher {
return fakeFetch(domain)
}
// fakeFetch returns a fake Fetcher that uses domain as the Channel of every
// Item it generates.
func fakeFetch(domain string) Fetcher {
return &fakeFetcher{channel: domain}
}
// fakeFetcher is a Fetcher that fabricates one new Item on every Fetch call.
type fakeFetcher struct {
channel string // Channel value given to every generated Item
items []Item // all items generated so far, oldest first
}
// FakeDuplicates causes the fake fetcher to return duplicate items.
// When it is true, every Fetch returns all items generated so far instead of
// only the newest one.
var FakeDuplicates bool
// Fetch fabricates one new Item, records it, and returns it together with a
// randomly chosen time for the next fetch (now plus 0 to 2 seconds, in
// 500ms steps). The Item's title is numbered by how many items were
// generated before it, and its GUID is "<channel>/<title>". When
// FakeDuplicates is set, the whole recorded history is returned rather than
// just the new Item. The error is always nil.
func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) {
	start := time.Now()
	steps := time.Duration(rand.Intn(5))
	next = start.Add(steps * 500 * time.Millisecond)

	title := fmt.Sprintf("Item %d", len(f.items))
	fresh := Item{
		Title:   title,
		Channel: f.channel,
		GUID:    f.channel + "/" + title,
	}
	f.items = append(f.items, fresh)

	if FakeDuplicates {
		return f.items, next, nil
	}
	return []Item{fresh}, next, nil
}
// init seeds the global math/rand source from the current time, so the
// fetch intervals chosen by the fake fetcher vary from run to run.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
// STARTMAIN OMIT
// main subscribes to five fake feeds (three of them for the same domain),
// merges and dedupes them, and prints each Item as it arrives. A timer
// closes the subscription after 3 seconds, which ends the print loop. The
// final panic is deliberate, as its message indicates.
func main() {
// STARTMERGECALL OMIT
// Subscribe to some feeds, and create a merged update stream.
merged := Dedupe(Merge(
Subscribe(Fetch("blog.golang.org")),
Subscribe(Fetch("blog.golang.org")),
Subscribe(Fetch("blog.golang.org")),
Subscribe(Fetch("googleblog.blogspot.com")),
Subscribe(Fetch("googledevelopers.blogspot.com"))))
// STOPMERGECALL OMIT
// Close the subscriptions after some time.
time.AfterFunc(3*time.Second, func() {
fmt.Println("closed:", merged.Close())
})
// Print the stream.
// The range ends when Close closes the Updates channel.
for it := range merged.Updates() {
fmt.Println(it.Channel, it.Title)
}
panic("show me the stacks")
}
// STOPMAIN OMIT