| // run -gcflags=-G=3 |
| |
| // Copyright 2021 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Package chans provides utility functions for working with channels. |
| package main |
| |
| import ( |
| "context" |
| "fmt" |
| "runtime" |
| "sort" |
| "sync" |
| "time" |
| ) |
| |
| // _Equal reports whether two slices are equal: the same length and all |
| // elements equal. All floating point NaNs are considered equal. |
| func _SliceEqual[Elem comparable](s1, s2 []Elem) bool { |
| if len(s1) != len(s2) { |
| return false |
| } |
| for i, v1 := range s1 { |
| v2 := s2[i] |
| if v1 != v2 { |
| isNaN := func(f Elem) bool { return f != f } |
| if !isNaN(v1) || !isNaN(v2) { |
| return false |
| } |
| } |
| } |
| return true |
| } |
| |
| // _ReadAll reads from c until the channel is closed or the context is |
| // canceled, returning all the values read. |
| func _ReadAll[Elem any](ctx context.Context, c <-chan Elem) []Elem { |
| var r []Elem |
| for { |
| select { |
| case <-ctx.Done(): |
| return r |
| case v, ok := <-c: |
| if !ok { |
| return r |
| } |
| r = append(r, v) |
| } |
| } |
| } |
| |
| // _Merge merges two channels into a single channel. |
| // This will leave a goroutine running until either both channels are closed |
| // or the context is canceled, at which point the returned channel is closed. |
| func _Merge[Elem any](ctx context.Context, c1, c2 <-chan Elem) <-chan Elem { |
| r := make(chan Elem) |
| go func(ctx context.Context, c1, c2 <-chan Elem, r chan<- Elem) { |
| defer close(r) |
| for c1 != nil || c2 != nil { |
| select { |
| case <-ctx.Done(): |
| return |
| case v1, ok := <-c1: |
| if ok { |
| r <- v1 |
| } else { |
| c1 = nil |
| } |
| case v2, ok := <-c2: |
| if ok { |
| r <- v2 |
| } else { |
| c2 = nil |
| } |
| } |
| } |
| }(ctx, c1, c2, r) |
| return r |
| } |
| |
| // _Filter calls f on each value read from c. If f returns true the value |
| // is sent on the returned channel. This will leave a goroutine running |
| // until c is closed or the context is canceled, at which point the |
| // returned channel is closed. |
| func _Filter[Elem any](ctx context.Context, c <-chan Elem, f func(Elem) bool) <-chan Elem { |
| r := make(chan Elem) |
| go func(ctx context.Context, c <-chan Elem, f func(Elem) bool, r chan<- Elem) { |
| defer close(r) |
| for { |
| select { |
| case <-ctx.Done(): |
| return |
| case v, ok := <-c: |
| if !ok { |
| return |
| } |
| if f(v) { |
| r <- v |
| } |
| } |
| } |
| }(ctx, c, f, r) |
| return r |
| } |
| |
| // _Sink returns a channel that discards all values sent to it. |
| // This will leave a goroutine running until the context is canceled |
| // or the returned channel is closed. |
| func _Sink[Elem any](ctx context.Context) chan<- Elem { |
| r := make(chan Elem) |
| go func(ctx context.Context, r <-chan Elem) { |
| for { |
| select { |
| case <-ctx.Done(): |
| return |
| case _, ok := <-r: |
| if !ok { |
| return |
| } |
| } |
| } |
| }(ctx, r) |
| return r |
| } |
| |
| // An Exclusive is a value that may only be used by a single goroutine |
| // at a time. This is implemented using channels rather than a mutex. |
| type _Exclusive[Val any] struct { |
| c chan Val |
| } |
| |
| // _MakeExclusive makes an initialized exclusive value. |
| func _MakeExclusive[Val any](initial Val) *_Exclusive[Val] { |
| r := &_Exclusive[Val]{ |
| c: make(chan Val, 1), |
| } |
| r.c <- initial |
| return r |
| } |
| |
| // _Acquire acquires the exclusive value for private use. |
| // It must be released using the Release method. |
| func (e *_Exclusive[Val]) Acquire() Val { |
| return <-e.c |
| } |
| |
| // TryAcquire attempts to acquire the value. The ok result reports whether |
| // the value was acquired. If the value is acquired, it must be released |
| // using the Release method. |
| func (e *_Exclusive[Val]) TryAcquire() (v Val, ok bool) { |
| select { |
| case r := <-e.c: |
| return r, true |
| default: |
| return v, false |
| } |
| } |
| |
| // Release updates and releases the value. |
| // This method panics if the value has not been acquired. |
| func (e *_Exclusive[Val]) Release(v Val) { |
| select { |
| case e.c <- v: |
| default: |
| panic("_Exclusive Release without Acquire") |
| } |
| } |
| |
| // Ranger returns a Sender and a Receiver. The Receiver provides a |
| // Next method to retrieve values. The Sender provides a Send method |
| // to send values and a Close method to stop sending values. The Next |
| // method indicates when the Sender has been closed, and the Send |
| // method indicates when the Receiver has been freed. |
| // |
| // This is a convenient way to exit a goroutine sending values when |
| // the receiver stops reading them. |
| func _Ranger[Elem any]() (*_Sender[Elem], *_Receiver[Elem]) { |
| c := make(chan Elem) |
| d := make(chan struct{}) |
| s := &_Sender[Elem]{ |
| values: c, |
| done: d, |
| } |
| r := &_Receiver[Elem] { |
| values: c, |
| done: d, |
| } |
| runtime.SetFinalizer(r, (*_Receiver[Elem]).finalize) |
| return s, r |
| } |
| |
| // A _Sender is used to send values to a Receiver. |
| type _Sender[Elem any] struct { |
| values chan<- Elem |
| done <-chan struct{} |
| } |
| |
| // Send sends a value to the receiver. It reports whether the value was sent. |
| // The value will not be sent if the context is closed or the receiver |
| // is freed. |
| func (s *_Sender[Elem]) Send(ctx context.Context, v Elem) bool { |
| select { |
| case <-ctx.Done(): |
| return false |
| case s.values <- v: |
| return true |
| case <-s.done: |
| return false |
| } |
| } |
| |
| // Close tells the receiver that no more values will arrive. |
| // After Close is called, the _Sender may no longer be used. |
| func (s *_Sender[Elem]) Close() { |
| close(s.values) |
| } |
| |
| // A _Receiver receives values from a _Sender. |
| type _Receiver[Elem any] struct { |
| values <-chan Elem |
| done chan<- struct{} |
| } |
| |
| // Next returns the next value from the channel. The bool result indicates |
| // whether the value is valid. |
| func (r *_Receiver[Elem]) Next(ctx context.Context) (v Elem, ok bool) { |
| select { |
| case <-ctx.Done(): |
| case v, ok = <-r.values: |
| } |
| return v, ok |
| } |
| |
| // finalize is a finalizer for the receiver. |
| func (r *_Receiver[Elem]) finalize() { |
| close(r.done) |
| } |
| |
| func TestReadAll() { |
| c := make(chan int) |
| go func() { |
| c <- 4 |
| c <- 2 |
| c <- 5 |
| close(c) |
| }() |
| got := _ReadAll(context.Background(), c) |
| want := []int{4, 2, 5} |
| if !_SliceEqual(got, want) { |
| panic(fmt.Sprintf("_ReadAll returned %v, want %v", got, want)) |
| } |
| } |
| |
| func TestMerge() { |
| c1 := make(chan int) |
| c2 := make(chan int) |
| go func() { |
| c1 <- 1 |
| c1 <- 3 |
| c1 <- 5 |
| close(c1) |
| }() |
| go func() { |
| c2 <- 2 |
| c2 <- 4 |
| c2 <- 6 |
| close(c2) |
| }() |
| ctx := context.Background() |
| got := _ReadAll(ctx, _Merge(ctx, c1, c2)) |
| sort.Ints(got) |
| want := []int{1, 2, 3, 4, 5, 6} |
| if !_SliceEqual(got, want) { |
| panic(fmt.Sprintf("_Merge returned %v, want %v", got, want)) |
| } |
| } |
| |
| func TestFilter() { |
| c := make(chan int) |
| go func() { |
| c <- 1 |
| c <- 2 |
| c <- 3 |
| close(c) |
| }() |
| even := func(i int) bool { return i%2 == 0 } |
| ctx := context.Background() |
| got := _ReadAll(ctx, _Filter(ctx, c, even)) |
| want := []int{2} |
| if !_SliceEqual(got, want) { |
| panic(fmt.Sprintf("_Filter returned %v, want %v", got, want)) |
| } |
| } |
| |
| func TestSink() { |
| c := _Sink[int](context.Background()) |
| after := time.NewTimer(time.Minute) |
| defer after.Stop() |
| send := func(v int) { |
| select { |
| case c <- v: |
| case <-after.C: |
| panic("timed out sending to _Sink") |
| } |
| } |
| send(1) |
| send(2) |
| send(3) |
| close(c) |
| } |
| |
| func TestExclusive() { |
| val := 0 |
| ex := _MakeExclusive(&val) |
| |
| var wg sync.WaitGroup |
| f := func() { |
| defer wg.Done() |
| for i := 0; i < 10; i++ { |
| p := ex.Acquire() |
| (*p)++ |
| ex.Release(p) |
| } |
| } |
| |
| wg.Add(2) |
| go f() |
| go f() |
| |
| wg.Wait() |
| if val != 20 { |
| panic(fmt.Sprintf("after Acquire/Release loop got %d, want 20", val)) |
| } |
| } |
| |
| func TestExclusiveTry() { |
| s := "" |
| ex := _MakeExclusive(&s) |
| p, ok := ex.TryAcquire() |
| if !ok { |
| panic("TryAcquire failed") |
| } |
| *p = "a" |
| |
| var wg sync.WaitGroup |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| _, ok := ex.TryAcquire() |
| if ok { |
| panic(fmt.Sprintf("TryAcquire succeeded unexpectedly")) |
| } |
| }() |
| wg.Wait() |
| |
| ex.Release(p) |
| |
| p, ok = ex.TryAcquire() |
| if !ok { |
| panic(fmt.Sprintf("TryAcquire failed")) |
| } |
| } |
| |
| func TestRanger() { |
| s, r := _Ranger[int]() |
| |
| ctx := context.Background() |
| go func() { |
| // Receive one value then exit. |
| v, ok := r.Next(ctx) |
| if !ok { |
| panic(fmt.Sprintf("did not receive any values")) |
| } else if v != 1 { |
| panic(fmt.Sprintf("received %d, want 1", v)) |
| } |
| }() |
| |
| c1 := make(chan bool) |
| c2 := make(chan bool) |
| go func() { |
| defer close(c2) |
| if !s.Send(ctx, 1) { |
| panic(fmt.Sprintf("Send failed unexpectedly")) |
| } |
| close(c1) |
| if s.Send(ctx, 2) { |
| panic(fmt.Sprintf("Send succeeded unexpectedly")) |
| } |
| }() |
| |
| <-c1 |
| |
| // Force a garbage collection to try to get the finalizers to run. |
| runtime.GC() |
| |
| select { |
| case <-c2: |
| case <-time.After(time.Minute): |
| panic("_Ranger Send should have failed, but timed out") |
| } |
| } |
| |
| func main() { |
| TestReadAll() |
| TestMerge() |
| TestFilter() |
| TestSink() |
| TestExclusive() |
| TestExclusiveTry() |
| TestRanger() |
| } |