quic: gate and queue synchronization primitives

Add a form of monitor (in the sense of the synchronization primitive)
for controlling access to queues and streams.

We call this a "gate". A gate acts as a mutex and condition variable
with one bit of state. A gate may be locked and unlocked. Lock
operations may optionally block on the gate condition being set.
Unlock operations always record the new value of the condition.

Gates play nicely with contexts.

Unlike traditional condition variables, gates do not suffer from
spurious wakeups: A goroutine waiting for a gate condition is not
woken before the condition is set.

Gates are inspired by the queue design from Bryan Mills's talk,
Rethinking Classical Concurrency Patterns.

Add a queue implemented with a gate.

For golang/go#58547

Change-Id: Ibec6d1f29a2c03a7184fca7392ed5639f96b6485
Reviewed-on: https://go-review.googlesource.com/c/net/+/513955
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Damien Neil <dneil@google.com>
diff --git a/internal/quic/gate.go b/internal/quic/gate.go
new file mode 100644
index 0000000..efb28da
--- /dev/null
+++ b/internal/quic/gate.go
@@ -0,0 +1,104 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build go1.21
+
+package quic
+
+import "context"
+
+// An gate is a monitor (mutex + condition variable) with one bit of state.
+//
+// The condition may be either set or unset.
+// Lock operations may be unconditional, or wait for the condition to be set.
+// Unlock operations record the new state of the condition.
+type gate struct {
+	// When unlocked, exactly one of set or unset contains a value.
+	// When locked, neither chan contains a value.
+	set   chan struct{}
+	unset chan struct{}
+}
+
+func newGate() gate {
+	g := gate{
+		set:   make(chan struct{}, 1),
+		unset: make(chan struct{}, 1),
+	}
+	g.unset <- struct{}{}
+	return g
+}
+
+// lock acquires the gate unconditionally.
+// It reports whether the condition is set.
+func (g *gate) lock() (set bool) {
+	select {
+	case <-g.set:
+		return true
+	case <-g.unset:
+		return false
+	}
+}
+
+// waitAndLock waits until the condition is set before acquiring the gate.
+func (g *gate) waitAndLock() {
+	<-g.set
+}
+
+// waitAndLockContext waits until the condition is set before acquiring the gate.
+// If the context expires, waitAndLockContext returns an error and does not acquire the gate.
+func (g *gate) waitAndLockContext(ctx context.Context) error {
+	select {
+	case <-g.set:
+		return nil
+	default:
+	}
+	select {
+	case <-g.set:
+		return nil
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+}
+
+// waitWithLock releases an acquired gate until the condition is set.
+// The caller must have previously acquired the gate.
+// Upon return from waitWithLock, the gate will still be held.
+// If waitWithLock returns nil, the condition is set.
+func (g *gate) waitWithLock(ctx context.Context) error {
+	g.unlock(false)
+	err := g.waitAndLockContext(ctx)
+	if err != nil {
+		if g.lock() {
+			// The condition was set in between the context expiring
+			// and us reacquiring the gate.
+			err = nil
+		}
+	}
+	return err
+}
+
+// lockIfSet acquires the gate if and only if the condition is set.
+func (g *gate) lockIfSet() (acquired bool) {
+	select {
+	case <-g.set:
+		return true
+	default:
+		return false
+	}
+}
+
+// unlock sets the condition and releases the gate.
+func (g *gate) unlock(set bool) {
+	if set {
+		g.set <- struct{}{}
+	} else {
+		g.unset <- struct{}{}
+	}
+}
+
+// unlock sets the condition to the result of f and releases the gate.
+// Useful in defers.
+func (g *gate) unlockFunc(f func() bool) {
+	g.unlock(f())
+}
diff --git a/internal/quic/gate_test.go b/internal/quic/gate_test.go
new file mode 100644
index 0000000..0122e39
--- /dev/null
+++ b/internal/quic/gate_test.go
@@ -0,0 +1,142 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build go1.21
+
+package quic
+
+import (
+	"context"
+	"testing"
+	"time"
+)
+
+func TestGateLockAndUnlock(t *testing.T) {
+	g := newGate()
+	if set := g.lock(); set {
+		t.Errorf("g.lock() of never-locked gate: true, want false")
+	}
+	unlockedc := make(chan struct{})
+	donec := make(chan struct{})
+	go func() {
+		defer close(donec)
+		set := g.lock()
+		select {
+		case <-unlockedc:
+		default:
+			t.Errorf("g.lock() succeeded while gate was held")
+		}
+		if !set {
+			t.Errorf("g.lock() of set gate: false, want true")
+		}
+		g.unlock(false)
+	}()
+	time.Sleep(1 * time.Millisecond)
+	close(unlockedc)
+	g.unlock(true)
+	<-donec
+	if set := g.lock(); set {
+		t.Errorf("g.lock() of unset gate: true, want false")
+	}
+}
+
+func TestGateWaitAndLock(t *testing.T) {
+	g := newGate()
+	set := false
+	go func() {
+		for i := 0; i < 3; i++ {
+			g.lock()
+			g.unlock(false)
+			time.Sleep(1 * time.Millisecond)
+		}
+		g.lock()
+		set = true
+		g.unlock(true)
+	}()
+	g.waitAndLock()
+	if !set {
+		t.Errorf("g.waitAndLock() returned before gate was set")
+	}
+}
+
+func TestGateWaitAndLockContext(t *testing.T) {
+	g := newGate()
+	// waitAndLockContext is canceled
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		time.Sleep(1 * time.Millisecond)
+		cancel()
+	}()
+	if err := g.waitAndLockContext(ctx); err != context.Canceled {
+		t.Errorf("g.waitAndLockContext() = %v, want context.Canceled", err)
+	}
+	// waitAndLockContext succeeds
+	set := false
+	go func() {
+		time.Sleep(1 * time.Millisecond)
+		g.lock()
+		set = true
+		g.unlock(true)
+	}()
+	if err := g.waitAndLockContext(context.Background()); err != nil {
+		t.Errorf("g.waitAndLockContext() = %v, want nil", err)
+	}
+	if !set {
+		t.Errorf("g.waitAndLockContext() returned before gate was set")
+	}
+	g.unlock(true)
+	// waitAndLockContext succeeds when the gate is set and the context is canceled
+	if err := g.waitAndLockContext(ctx); err != nil {
+		t.Errorf("g.waitAndLockContext() = %v, want nil", err)
+	}
+}
+
+func TestGateWaitWithLock(t *testing.T) {
+	g := newGate()
+	// waitWithLock is canceled
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		time.Sleep(1 * time.Millisecond)
+		cancel()
+	}()
+	g.lock()
+	if err := g.waitWithLock(ctx); err != context.Canceled {
+		t.Errorf("g.waitWithLock() = %v, want context.Canceled", err)
+	}
+	// waitWithLock succeeds
+	set := false
+	go func() {
+		g.lock()
+		set = true
+		g.unlock(true)
+	}()
+	time.Sleep(1 * time.Millisecond)
+	if err := g.waitWithLock(context.Background()); err != nil {
+		t.Errorf("g.waitWithLock() = %v, want nil", err)
+	}
+	if !set {
+		t.Errorf("g.waitWithLock() returned before gate was set")
+	}
+}
+
+func TestGateLockIfSet(t *testing.T) {
+	g := newGate()
+	if locked := g.lockIfSet(); locked {
+		t.Errorf("g.lockIfSet() of unset gate = %v, want false", locked)
+	}
+	g.lock()
+	g.unlock(true)
+	if locked := g.lockIfSet(); !locked {
+		t.Errorf("g.lockIfSet() of set gate = %v, want true", locked)
+	}
+}
+
+func TestGateUnlockFunc(t *testing.T) {
+	g := newGate()
+	go func() {
+		g.lock()
+		defer g.unlockFunc(func() bool { return true })
+	}()
+	g.waitAndLock()
+}
diff --git a/internal/quic/queue.go b/internal/quic/queue.go
new file mode 100644
index 0000000..9bb71ca
--- /dev/null
+++ b/internal/quic/queue.go
@@ -0,0 +1,65 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build go1.21
+
+package quic
+
+import "context"
+
+// A queue is an unbounded queue of some item (new connections and streams).
+type queue[T any] struct {
+	// The gate condition is set if the queue is non-empty or closed.
+	gate gate
+	err  error
+	q    []T
+}
+
+func newQueue[T any]() queue[T] {
+	return queue[T]{gate: newGate()}
+}
+
+// close closes the queue, causing pending and future pop operations
+// to return immediately with err.
+func (q *queue[T]) close(err error) {
+	q.gate.lock()
+	defer q.unlock()
+	if q.err == nil {
+		q.err = err
+	}
+}
+
+// put appends an item to the queue.
+// It returns true if the item was added, false if the queue is closed.
+func (q *queue[T]) put(v T) bool {
+	q.gate.lock()
+	defer q.unlock()
+	if q.err != nil {
+		return false
+	}
+	q.q = append(q.q, v)
+	return true
+}
+
+// get removes the first item from the queue, blocking until ctx is done, an item is available,
+// or the queue is closed.
+func (q *queue[T]) get(ctx context.Context) (T, error) {
+	var zero T
+	if err := q.gate.waitAndLockContext(ctx); err != nil {
+		return zero, err
+	}
+	defer q.unlock()
+	if q.err != nil {
+		return zero, q.err
+	}
+	v := q.q[0]
+	copy(q.q[:], q.q[1:])
+	q.q[len(q.q)-1] = zero
+	q.q = q.q[:len(q.q)-1]
+	return v, nil
+}
+
+func (q *queue[T]) unlock() {
+	q.gate.unlock(q.err != nil || len(q.q) > 0)
+}
diff --git a/internal/quic/queue_test.go b/internal/quic/queue_test.go
new file mode 100644
index 0000000..8debeff
--- /dev/null
+++ b/internal/quic/queue_test.go
@@ -0,0 +1,59 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build go1.21
+
+package quic
+
+import (
+	"context"
+	"io"
+	"testing"
+	"time"
+)
+
+func TestQueue(t *testing.T) {
+	nonblocking, cancel := context.WithCancel(context.Background())
+	cancel()
+
+	q := newQueue[int]()
+	if got, err := q.get(nonblocking); err != context.Canceled {
+		t.Fatalf("q.get() = %v, %v, want nil, contex.Canceled", got, err)
+	}
+
+	if !q.put(1) {
+		t.Fatalf("q.put(1) = false, want true")
+	}
+	if !q.put(2) {
+		t.Fatalf("q.put(2) = false, want true")
+	}
+	if got, err := q.get(nonblocking); got != 1 || err != nil {
+		t.Fatalf("q.get() = %v, %v, want 1, nil", got, err)
+	}
+	if got, err := q.get(nonblocking); got != 2 || err != nil {
+		t.Fatalf("q.get() = %v, %v, want 2, nil", got, err)
+	}
+	if got, err := q.get(nonblocking); err != context.Canceled {
+		t.Fatalf("q.get() = %v, %v, want nil, contex.Canceled", got, err)
+	}
+
+	go func() {
+		time.Sleep(1 * time.Millisecond)
+		q.put(3)
+	}()
+	if got, err := q.get(context.Background()); got != 3 || err != nil {
+		t.Fatalf("q.get() = %v, %v, want 3, nil", got, err)
+	}
+
+	if !q.put(4) {
+		t.Fatalf("q.put(2) = false, want true")
+	}
+	q.close(io.EOF)
+	if got, err := q.get(context.Background()); got != 0 || err != io.EOF {
+		t.Fatalf("q.get() = %v, %v, want 0, io.EOF", got, err)
+	}
+	if q.put(5) {
+		t.Fatalf("q.put(5) = true, want false")
+	}
+}