| // naivemain runs the Subscribe example with the naive Subscribe |
| // implementation and a fake RSS fetcher. |
| // +build ignore,OMIT |
| |
| package main |
| |
| import ( |
| "fmt" |
| "math/rand" |
| "time" |
| ) |
| |
| // STARTITEM OMIT |
// Item is a minimal representation of a single RSS entry.
type Item struct {
	Title   string // title of the entry
	Channel string // name of the feed the entry belongs to
	GUID    string // identifier used to recognise duplicate entries
}
| |
| // STOPITEM OMIT |
| |
| // STARTFETCHER OMIT |
| // A Fetcher fetches Items and returns the time when the next fetch should be |
| // attempted. On failure, Fetch returns a non-nil error. |
| type Fetcher interface { |
| Fetch() (items []Item, next time.Time, err error) |
| } |
| |
| // STOPFETCHER OMIT |
| |
| // STARTSUBSCRIPTION OMIT |
| // A Subscription delivers Items over a channel. Close cancels the |
| // subscription, closes the Updates channel, and returns the last fetch error, |
| // if any. |
| type Subscription interface { |
| Updates() <-chan Item |
| Close() error |
| } |
| |
| // STOPSUBSCRIPTION OMIT |
| |
| // STARTSUBSCRIBE OMIT |
| // Subscribe returns a new Subscription that uses fetcher to fetch Items. |
| func Subscribe(fetcher Fetcher) Subscription { |
| s := &sub{ |
| fetcher: fetcher, |
| updates: make(chan Item), // for Updates |
| closing: make(chan chan error), // for Close |
| } |
| go s.loop() |
| return s |
| } |
| |
| // STOPSUBSCRIBE OMIT |
| |
| // sub implements the Subscription interface. |
| type sub struct { |
| fetcher Fetcher // fetches items |
| updates chan Item // sends items to the user |
| closing chan chan error // for Close |
| } |
| |
| // STARTUPDATES OMIT |
| func (s *sub) Updates() <-chan Item { |
| return s.updates |
| } |
| |
| // STOPUPDATES OMIT |
| |
| // STARTCLOSE OMIT |
| // STARTCLOSESIG OMIT |
| func (s *sub) Close() error { |
| // STOPCLOSESIG OMIT |
| errc := make(chan error) |
| s.closing <- errc // HLchan |
| return <-errc // HLchan |
| } |
| |
| // STOPCLOSE OMIT |
| |
| // loopCloseOnly is a version of loop that includes only the logic |
| // that handles Close. |
| func (s *sub) loopCloseOnly() { |
| // STARTCLOSEONLY OMIT |
| var err error // set when Fetch fails |
| for { |
| select { |
| case errc := <-s.closing: // HLchan |
| errc <- err // HLchan |
| close(s.updates) // tells receiver we're done |
| return |
| } |
| } |
| // STOPCLOSEONLY OMIT |
| } |
| |
| // loopFetchOnly is a version of loop that includes only the logic |
| // that calls Fetch. |
| func (s *sub) loopFetchOnly() { |
| // STARTFETCHONLY OMIT |
| var pending []Item // appended by fetch; consumed by send |
| var next time.Time // initially January 1, year 0 |
| var err error |
| for { |
| var fetchDelay time.Duration // initally 0 (no delay) |
| if now := time.Now(); next.After(now) { |
| fetchDelay = next.Sub(now) |
| } |
| startFetch := time.After(fetchDelay) |
| |
| select { |
| case <-startFetch: |
| var fetched []Item |
| fetched, next, err = s.fetcher.Fetch() |
| if err != nil { |
| next = time.Now().Add(10 * time.Second) |
| break |
| } |
| pending = append(pending, fetched...) |
| } |
| } |
| // STOPFETCHONLY OMIT |
| } |
| |
| // loopSendOnly is a version of loop that includes only the logic for |
| // sending items to s.updates. |
| func (s *sub) loopSendOnly() { |
| // STARTSENDONLY OMIT |
| var pending []Item // appended by fetch; consumed by send |
| for { |
| var first Item |
| var updates chan Item // HLupdates |
| if len(pending) > 0 { |
| first = pending[0] |
| updates = s.updates // enable send case // HLupdates |
| } |
| |
| select { |
| case updates <- first: |
| pending = pending[1:] |
| } |
| } |
| // STOPSENDONLY OMIT |
| } |
| |
| // mergedLoop is a version of loop that combines loopCloseOnly, |
| // loopFetchOnly, and loopSendOnly. |
| func (s *sub) mergedLoop() { |
| // STARTFETCHVARS OMIT |
| var pending []Item |
| var next time.Time |
| var err error |
| // STOPFETCHVARS OMIT |
| for { |
| // STARTNOCAP OMIT |
| var fetchDelay time.Duration |
| if now := time.Now(); next.After(now) { |
| fetchDelay = next.Sub(now) |
| } |
| startFetch := time.After(fetchDelay) |
| // STOPNOCAP OMIT |
| var first Item |
| var updates chan Item |
| if len(pending) > 0 { |
| first = pending[0] |
| updates = s.updates // enable send case |
| } |
| |
| // STARTSELECT OMIT |
| select { |
| case errc := <-s.closing: // HLcases |
| errc <- err |
| close(s.updates) |
| return |
| // STARTFETCHCASE OMIT |
| case <-startFetch: // HLcases |
| var fetched []Item |
| fetched, next, err = s.fetcher.Fetch() // HLfetch |
| if err != nil { |
| next = time.Now().Add(10 * time.Second) |
| break |
| } |
| pending = append(pending, fetched...) // HLfetch |
| // STOPFETCHCASE OMIT |
| case updates <- first: // HLcases |
| pending = pending[1:] |
| } |
| // STOPSELECT OMIT |
| } |
| } |
| |
| // dedupeLoop extends mergedLoop with deduping of fetched items. |
| func (s *sub) dedupeLoop() { |
| const maxPending = 10 |
| // STARTSEEN OMIT |
| var pending []Item |
| var next time.Time |
| var err error |
| var seen = make(map[string]bool) // set of item.GUIDs // HLseen |
| // STOPSEEN OMIT |
| for { |
| // STARTCAP OMIT |
| var fetchDelay time.Duration |
| if now := time.Now(); next.After(now) { |
| fetchDelay = next.Sub(now) |
| } |
| var startFetch <-chan time.Time // HLcap |
| if len(pending) < maxPending { // HLcap |
| startFetch = time.After(fetchDelay) // enable fetch case // HLcap |
| } // HLcap |
| // STOPCAP OMIT |
| var first Item |
| var updates chan Item |
| if len(pending) > 0 { |
| first = pending[0] |
| updates = s.updates // enable send case |
| } |
| select { |
| case errc := <-s.closing: |
| errc <- err |
| close(s.updates) |
| return |
| // STARTDEDUPE OMIT |
| case <-startFetch: |
| var fetched []Item |
| fetched, next, err = s.fetcher.Fetch() // HLfetch |
| if err != nil { |
| next = time.Now().Add(10 * time.Second) |
| break |
| } |
| for _, item := range fetched { |
| if !seen[item.GUID] { // HLdupe |
| pending = append(pending, item) // HLdupe |
| seen[item.GUID] = true // HLdupe |
| } // HLdupe |
| } |
| // STOPDEDUPE OMIT |
| case updates <- first: |
| pending = pending[1:] |
| } |
| } |
| } |
| |
| // loop periodically fecthes Items, sends them on s.updates, and exits |
| // when Close is called. It extends dedupeLoop with logic to run |
| // Fetch asynchronously. |
| func (s *sub) loop() { |
| const maxPending = 10 |
| type fetchResult struct { |
| fetched []Item |
| next time.Time |
| err error |
| } |
| // STARTFETCHDONE OMIT |
| var fetchDone chan fetchResult // if non-nil, Fetch is running // HL |
| // STOPFETCHDONE OMIT |
| var pending []Item |
| var next time.Time |
| var err error |
| var seen = make(map[string]bool) |
| for { |
| var fetchDelay time.Duration |
| if now := time.Now(); next.After(now) { |
| fetchDelay = next.Sub(now) |
| } |
| // STARTFETCHIF OMIT |
| var startFetch <-chan time.Time |
| if fetchDone == nil && len(pending) < maxPending { // HLfetch |
| startFetch = time.After(fetchDelay) // enable fetch case |
| } |
| // STOPFETCHIF OMIT |
| var first Item |
| var updates chan Item |
| if len(pending) > 0 { |
| first = pending[0] |
| updates = s.updates // enable send case |
| } |
| // STARTFETCHASYNC OMIT |
| select { |
| case <-startFetch: // HLfetch |
| fetchDone = make(chan fetchResult, 1) // HLfetch |
| go func() { |
| fetched, next, err := s.fetcher.Fetch() |
| fetchDone <- fetchResult{fetched, next, err} |
| }() |
| case result := <-fetchDone: // HLfetch |
| fetchDone = nil // HLfetch |
| // Use result.fetched, result.next, result.err |
| // STOPFETCHASYNC OMIT |
| fetched := result.fetched |
| next, err = result.next, result.err |
| if err != nil { |
| next = time.Now().Add(10 * time.Second) |
| break |
| } |
| for _, item := range fetched { |
| if id := item.GUID; !seen[id] { // HLdupe |
| pending = append(pending, item) |
| seen[id] = true // HLdupe |
| } |
| } |
| case errc := <-s.closing: |
| errc <- err |
| close(s.updates) |
| return |
| case updates <- first: |
| pending = pending[1:] |
| } |
| } |
| } |
| |
| // naiveMerge is a version of Merge that doesn't quite work right. In |
| // particular, the goroutines it starts may block forever on m.updates |
| // if the receiver stops receiving. |
| type naiveMerge struct { |
| subs []Subscription |
| updates chan Item |
| } |
| |
| // STARTNAIVEMERGE OMIT |
| func NaiveMerge(subs ...Subscription) Subscription { |
| m := &naiveMerge{ |
| subs: subs, |
| updates: make(chan Item), |
| } |
| // STARTNAIVEMERGELOOP OMIT |
| for _, sub := range subs { |
| go func(s Subscription) { |
| for it := range s.Updates() { |
| m.updates <- it // HL |
| } |
| }(sub) |
| } |
| // STOPNAIVEMERGELOOP OMIT |
| return m |
| } |
| |
| // STOPNAIVEMERGE OMIT |
| |
| // STARTNAIVEMERGECLOSE OMIT |
| func (m *naiveMerge) Close() (err error) { |
| for _, sub := range m.subs { |
| if e := sub.Close(); err == nil && e != nil { |
| err = e |
| } |
| } |
| close(m.updates) // HL |
| return |
| } |
| |
| // STOPNAIVEMERGECLOSE OMIT |
| |
| func (m *naiveMerge) Updates() <-chan Item { |
| return m.updates |
| } |
| |
| type merge struct { |
| subs []Subscription |
| updates chan Item |
| quit chan struct{} |
| errs chan error |
| } |
| |
| // STARTMERGESIG OMIT |
| // Merge returns a Subscription that merges the item streams from subs. |
| // Closing the merged subscription closes subs. |
| func Merge(subs ...Subscription) Subscription { |
| // STOPMERGESIG OMIT |
| m := &merge{ |
| subs: subs, |
| updates: make(chan Item), |
| quit: make(chan struct{}), |
| errs: make(chan error), |
| } |
| // STARTMERGE OMIT |
| for _, sub := range subs { |
| go func(s Subscription) { |
| for { |
| var it Item |
| select { |
| case it = <-s.Updates(): |
| case <-m.quit: // HL |
| m.errs <- s.Close() // HL |
| return // HL |
| } |
| select { |
| case m.updates <- it: |
| case <-m.quit: // HL |
| m.errs <- s.Close() // HL |
| return // HL |
| } |
| } |
| }(sub) |
| } |
| // STOPMERGE OMIT |
| return m |
| } |
| |
| func (m *merge) Updates() <-chan Item { |
| return m.updates |
| } |
| |
| // STARTMERGECLOSE OMIT |
| func (m *merge) Close() (err error) { |
| close(m.quit) // HL |
| for _ = range m.subs { |
| if e := <-m.errs; e != nil { // HL |
| err = e |
| } |
| } |
| close(m.updates) // HL |
| return |
| } |
| |
| // STOPMERGECLOSE OMIT |
| |
| // NaiveDedupe converts a stream of Items that may contain duplicates |
| // into one that doesn't. |
| func NaiveDedupe(in <-chan Item) <-chan Item { |
| out := make(chan Item) |
| go func() { |
| seen := make(map[string]bool) |
| for it := range in { |
| if !seen[it.GUID] { |
| // BUG: this send blocks if the |
| // receiver closes the Subscription |
| // and stops receiving. |
| out <- it // HL |
| seen[it.GUID] = true |
| } |
| } |
| close(out) |
| }() |
| return out |
| } |
| |
| type deduper struct { |
| s Subscription |
| updates chan Item |
| closing chan chan error |
| } |
| |
| // Dedupe converts a Subscription that may send duplicate Items into |
| // one that doesn't. |
| func Dedupe(s Subscription) Subscription { |
| d := &deduper{ |
| s: s, |
| updates: make(chan Item), |
| closing: make(chan chan error), |
| } |
| go d.loop() |
| return d |
| } |
| |
| func (d *deduper) loop() { |
| in := d.s.Updates() // enable receive |
| var pending Item |
| var out chan Item // disable send |
| seen := make(map[string]bool) |
| for { |
| select { |
| case it := <-in: |
| if !seen[it.GUID] { |
| pending = it |
| in = nil // disable receive |
| out = d.updates // enable send |
| seen[it.GUID] = true |
| } |
| case out <- pending: |
| in = d.s.Updates() // enable receive |
| out = nil // disable send |
| case errc := <-d.closing: |
| err := d.s.Close() |
| errc <- err |
| close(d.updates) |
| return |
| } |
| } |
| } |
| |
| func (d *deduper) Close() error { |
| errc := make(chan error) |
| d.closing <- errc |
| return <-errc |
| } |
| |
| func (d *deduper) Updates() <-chan Item { |
| return d.updates |
| } |
| |
| // Fetch returns a Fetcher for Items from domain. |
| func Fetch(domain string) Fetcher { |
| return fakeFetch(domain) |
| } |
| |
| func fakeFetch(domain string) Fetcher { |
| return &fakeFetcher{channel: domain} |
| } |
| |
| type fakeFetcher struct { |
| channel string |
| items []Item |
| } |
| |
// FakeDuplicates, when true, makes the fake fetcher return every item it
// has generated so far on each Fetch, so earlier items come back as
// duplicates.
var FakeDuplicates = false
| |
| func (f *fakeFetcher) Fetch() (items []Item, next time.Time, err error) { |
| now := time.Now() |
| next = now.Add(time.Duration(rand.Intn(5)) * 500 * time.Millisecond) |
| item := Item{ |
| Channel: f.channel, |
| Title: fmt.Sprintf("Item %d", len(f.items)), |
| } |
| item.GUID = item.Channel + "/" + item.Title |
| f.items = append(f.items, item) |
| if FakeDuplicates { |
| items = f.items |
| } else { |
| items = []Item{item} |
| } |
| return |
| } |
| |
| func NaiveSubscribe(fetcher Fetcher) Subscription { |
| s := &naiveSub{ |
| fetcher: fetcher, |
| updates: make(chan Item), |
| } |
| go s.loop() |
| return s |
| } |
| |
| type naiveSub struct { |
| fetcher Fetcher |
| updates chan Item |
| closed bool |
| err error |
| } |
| |
| func (s *naiveSub) Updates() <-chan Item { |
| return s.updates |
| } |
| |
| func (s *naiveSub) loop() { |
| // STARTNAIVE OMIT |
| for { |
| if s.closed { // HLsync |
| close(s.updates) |
| return |
| } |
| items, next, err := s.fetcher.Fetch() |
| if err != nil { |
| s.err = err // HLsync |
| time.Sleep(10 * time.Second) // HLsleep |
| continue |
| } |
| for _, item := range items { |
| s.updates <- item // HLsend |
| } |
| if now := time.Now(); next.After(now) { |
| time.Sleep(next.Sub(now)) // HLsleep |
| } |
| } |
| // STOPNAIVE OMIT |
| } |
| |
| func (s *naiveSub) Close() error { |
| s.closed = true // HLsync |
| return s.err // HLsync |
| } |
| |
// init seeds math/rand's default source from the current time, so the
// random delays chosen by the fake fetcher differ between runs.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
| |
| func main() { |
| // Subscribe to some feeds, and create a merged update stream. |
| merged := Merge( |
| NaiveSubscribe(Fetch("blog.golang.org")), |
| NaiveSubscribe(Fetch("googleblog.blogspot.com")), |
| NaiveSubscribe(Fetch("googledevelopers.blogspot.com"))) |
| |
| // Close the subscriptions after some time. |
| time.AfterFunc(3*time.Second, func() { |
| fmt.Println("closed:", merged.Close()) |
| }) |
| |
| // Print the stream. |
| for it := range merged.Updates() { |
| fmt.Println(it.Channel, it.Title) |
| } |
| |
| // The loops are still running. Let the race detector notice. |
| time.Sleep(1 * time.Second) |
| |
| panic("show me the stacks") |
| } |