singleflight: copy from internal/singleflight in standard library

Updates golang/go#17338

Change-Id: Icf1972cdec0bbc9b3142141d9e706c07f312efc0
Reviewed-on: https://go-review.googlesource.com/30292
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
diff --git a/singleflight/singleflight.go b/singleflight/singleflight.go
new file mode 100644
index 0000000..9a4f8d5
--- /dev/null
+++ b/singleflight/singleflight.go
@@ -0,0 +1,111 @@
+// Copyright 2013 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package singleflight provides a duplicate function call suppression
+// mechanism.
+package singleflight // import "golang.org/x/sync/singleflight"
+
+import "sync"
+
+// call is an in-flight or completed singleflight.Do call
+type call struct {
+	wg sync.WaitGroup
+
+	// These fields are written once before the WaitGroup is done
+	// and are only read after the WaitGroup is done.
+	val interface{}
+	err error
+
+	// These fields are read and written with the singleflight
+	// mutex held before the WaitGroup is done, and are read but
+	// not written after the WaitGroup is done.
+	dups  int
+	chans []chan<- Result
+}
+
+// Group represents a class of work and forms a namespace in
+// which units of work can be executed with duplicate suppression.
+type Group struct {
+	mu sync.Mutex       // protects m
+	m  map[string]*call // lazily initialized
+}
+
+// Result holds the results of Do, so they can be passed
+// on a channel.
+type Result struct {
+	Val    interface{}
+	Err    error
+	Shared bool
+}
+
+// Do executes and returns the results of the given function, making
+// sure that only one execution is in-flight for a given key at a
+// time. If a duplicate comes in, the duplicate caller waits for the
+// original to complete and receives the same results.
+// The return value shared indicates whether v was given to multiple callers.
+func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
+	g.mu.Lock()
+	if g.m == nil {
+		g.m = make(map[string]*call)
+	}
+	if c, ok := g.m[key]; ok {
+		c.dups++
+		g.mu.Unlock()
+		c.wg.Wait()
+		return c.val, c.err, true
+	}
+	c := new(call)
+	c.wg.Add(1)
+	g.m[key] = c
+	g.mu.Unlock()
+
+	g.doCall(c, key, fn)
+	return c.val, c.err, c.dups > 0
+}
+
+// DoChan is like Do but returns a channel that will receive the
+// results when they are ready.
+func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
+	ch := make(chan Result, 1)
+	g.mu.Lock()
+	if g.m == nil {
+		g.m = make(map[string]*call)
+	}
+	if c, ok := g.m[key]; ok {
+		c.dups++
+		c.chans = append(c.chans, ch)
+		g.mu.Unlock()
+		return ch
+	}
+	c := &call{chans: []chan<- Result{ch}}
+	c.wg.Add(1)
+	g.m[key] = c
+	g.mu.Unlock()
+
+	go g.doCall(c, key, fn)
+
+	return ch
+}
+
+// doCall handles the single call for a key.
+func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
+	c.val, c.err = fn()
+	c.wg.Done()
+
+	g.mu.Lock()
+	delete(g.m, key)
+	for _, ch := range c.chans {
+		ch <- Result{c.val, c.err, c.dups > 0}
+	}
+	g.mu.Unlock()
+}
+
+// Forget tells the singleflight to forget about a key.  Future calls
+// to Do for this key will call the function rather than waiting for
+// an earlier call to complete.
+func (g *Group) Forget(key string) {
+	g.mu.Lock()
+	delete(g.m, key)
+	g.mu.Unlock()
+}
diff --git a/singleflight/singleflight_test.go b/singleflight/singleflight_test.go
new file mode 100644
index 0000000..5e6f1b3
--- /dev/null
+++ b/singleflight/singleflight_test.go
@@ -0,0 +1,87 @@
+// Copyright 2013 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package singleflight
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+func TestDo(t *testing.T) {
+	var g Group
+	v, err, _ := g.Do("key", func() (interface{}, error) {
+		return "bar", nil
+	})
+	if got, want := fmt.Sprintf("%v (%T)", v, v), "bar (string)"; got != want {
+		t.Errorf("Do = %v; want %v", got, want)
+	}
+	if err != nil {
+		t.Errorf("Do error = %v", err)
+	}
+}
+
+func TestDoErr(t *testing.T) {
+	var g Group
+	someErr := errors.New("Some error")
+	v, err, _ := g.Do("key", func() (interface{}, error) {
+		return nil, someErr
+	})
+	if err != someErr {
+		t.Errorf("Do error = %v; want someErr %v", err, someErr)
+	}
+	if v != nil {
+		t.Errorf("unexpected non-nil value %#v", v)
+	}
+}
+
+func TestDoDupSuppress(t *testing.T) {
+	var g Group
+	var wg1, wg2 sync.WaitGroup
+	c := make(chan string, 1)
+	var calls int32
+	fn := func() (interface{}, error) {
+		if atomic.AddInt32(&calls, 1) == 1 {
+			// First invocation.
+			wg1.Done()
+		}
+		v := <-c
+		c <- v // pump; make available for any future calls
+
+		time.Sleep(10 * time.Millisecond) // let more goroutines enter Do
+
+		return v, nil
+	}
+
+	const n = 10
+	wg1.Add(1)
+	for i := 0; i < n; i++ {
+		wg1.Add(1)
+		wg2.Add(1)
+		go func() {
+			defer wg2.Done()
+			wg1.Done()
+			v, err, _ := g.Do("key", fn)
+			if err != nil {
+				t.Errorf("Do error: %v", err)
+				return
+			}
+			if s, _ := v.(string); s != "bar" {
+				t.Errorf("Do = %T %v; want %q", v, v, "bar")
+			}
+		}()
+	}
+	wg1.Wait()
+	// At least one goroutine is in fn now and all of them have at
+	// least reached the line before the Do.
+	c <- "bar"
+	wg2.Wait()
+	if got := atomic.LoadInt32(&calls); got <= 0 || got >= n {
+		t.Errorf("number of calls = %d; want over 0 and less than %d", got, n)
+	}
+}