// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build go1.21

package quic

import (
	"context"
	"errors"
	"fmt"
	"path/filepath"
	"runtime"
	"sync"
)

// asyncTestState permits handling asynchronous operations in a synchronous test.
//
// For example, a test may want to write to a stream and observe that
// STREAM frames are sent with the contents of the write in response
// to MAX_STREAM_DATA frames received from the peer.
// The Stream.Write is an asynchronous operation, but the test is simpler
// if we can start the write, observe the first STREAM frame sent,
// send a MAX_STREAM_DATA frame, observe the next STREAM frame sent, etc.
//
// We do this by instrumenting points where operations can block.
// We start async operations like Write in a goroutine,
// and wait for the operation to either finish or hit a blocking point.
// When the connection event loop is idle, we check a list of
// blocked operations to see if any can be woken.
type asyncTestState struct {
	mu      sync.Mutex
	notify  chan struct{}
	blocked map[*blockedAsync]struct{}
}

// An asyncOp is an asynchronous operation that results in (T, error).
type asyncOp[T any] struct {
	v   T
	err error

	caller     string
	state      *asyncTestState
	donec      chan struct{}
	cancelFunc context.CancelFunc
}

// cancel cancels the async operation's context, and waits for
// the operation to complete.
func (a *asyncOp[T]) cancel() {
	select {
	case <-a.donec:
		return // already done
	default:
	}
	a.cancelFunc()
	<-a.state.notify
	select {
	case <-a.donec:
	default:
		panic(fmt.Errorf("%v: async op failed to finish after being canceled", a.caller))
	}
}

var errNotDone = errors.New("async op is not done")

// result returns the result of the async operation.
// It returns errNotDone if the operation is still in progress.
//
// Note that unlike a traditional async/await, this doesn't block
// waiting for the operation to complete. Since tests have full
// control over the progress of operations, an asyncOp can only
// become done in reaction to the test taking some action.
func (a *asyncOp[T]) result() (v T, err error) {
	select {
	case <-a.donec:
		return a.v, a.err
	default:
		return v, errNotDone
	}
}

// A blockedAsync is a blocked async operation.
type blockedAsync struct {
	// Exactly one of these will be set, depending on the type of blocked operation.
	g  *gate
	ch <-chan struct{}

	donec chan struct{} // closed when the operation is unblocked
}

type asyncContextKey struct{}

// runAsync starts an asynchronous operation.
//
// The function f should call a blocking function such as
// Stream.Write or Conn.AcceptStream and return its result.
// It must use the provided context.
func runAsync[T any](ts *testConn, f func(context.Context) (T, error)) *asyncOp[T] {
	as := &ts.asyncTestState
	if as.notify == nil {
		as.notify = make(chan struct{})
		as.blocked = make(map[*blockedAsync]struct{})
	}
	_, file, line, _ := runtime.Caller(1)
	ctx := context.WithValue(context.Background(), asyncContextKey{}, true)
	ctx, cancel := context.WithCancel(ctx)
	a := &asyncOp[T]{
		state:      as,
		caller:     fmt.Sprintf("%v:%v", filepath.Base(file), line),
		donec:      make(chan struct{}),
		cancelFunc: cancel,
	}
	go func() {
		a.v, a.err = f(ctx)
		close(a.donec)
		as.notify <- struct{}{}
	}()
	ts.t.Cleanup(func() {
		if _, err := a.result(); err == errNotDone {
			ts.t.Errorf("%v: async operation is still executing at end of test", a.caller)
			a.cancel()
		}
	})
	// Wait for the operation to either finish or block.
	<-as.notify
	return a
}

// waitAndLockGate replaces gate.waitAndLock in tests.
func (as *asyncTestState) waitAndLockGate(ctx context.Context, g *gate) error {
	if g.lockIfSet() {
		// Gate can be acquired without blocking.
		return nil
	}
	return as.block(ctx, &blockedAsync{
		g: g,
	})
}

// waitOnDone replaces receiving from a chan struct{} in tests.
func (as *asyncTestState) waitOnDone(ctx context.Context, ch <-chan struct{}) error {
	select {
	case <-ch:
		return nil // read without blocking
	default:
	}
	return as.block(ctx, &blockedAsync{
		ch: ch,
	})
}

// block waits for a blocked async operation to complete.
func (as *asyncTestState) block(ctx context.Context, b *blockedAsync) error {
	if err := ctx.Err(); err != nil {
		// Context has already expired.
		return err
	}
	if ctx.Value(asyncContextKey{}) == nil {
		// Context is not one that we've created, and hasn't expired.
		// This probably indicates that we've tried to perform a
		// blocking operation without using the async test harness here,
		// which may have unpredictable results.
		panic("blocking async point with unexpected Context")
	}
	b.donec = make(chan struct{})
	// Record this as a pending blocking operation.
	as.mu.Lock()
	as.blocked[b] = struct{}{}
	as.mu.Unlock()
	// Notify the creator of the operation that we're blocked,
	// and wait to be woken up.
	as.notify <- struct{}{}
	select {
	case <-b.donec:
	case <-ctx.Done():
		return ctx.Err()
	}
	return nil
}

// wakeAsync tries to wake up a blocked async operation.
// It returns true if one was woken, false otherwise.
func (as *asyncTestState) wakeAsync() bool {
	as.mu.Lock()
	var woken *blockedAsync
	for w := range as.blocked {
		switch {
		case w.g != nil:
			if w.g.lockIfSet() {
				woken = w
			}
		case w.ch != nil:
			select {
			case <-w.ch:
				woken = w
			default:
			}
		}
		if woken != nil {
			delete(as.blocked, woken)
			break
		}
	}
	as.mu.Unlock()
	if woken == nil {
		return false
	}
	close(woken.donec)
	<-as.notify // must not hold as.mu while blocked here
	return true
}
