// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package quic

import (
	"context"
	"fmt"
	"io"
	"math"
	"sync"
	"testing"
)

func TestStreamsCreate(t *testing.T) {
	ctx := canceledContext()
	tc := newTestConn(t, clientSide, permissiveTransportParameters)
	tc.handshake()

	s, err := tc.conn.NewStream(ctx)
	if err != nil {
		t.Fatalf("NewStream: %v", err)
	}
	s.Flush() // open the stream
	tc.wantFrame("created bidirectional stream 0",
		packetType1RTT, debugFrameStream{
			id:   0, // client-initiated, bidi, number 0
			data: []byte{},
		})

	s, err = tc.conn.NewSendOnlyStream(ctx)
	if err != nil {
		t.Fatalf("NewStream: %v", err)
	}
	s.Flush() // open the stream
	tc.wantFrame("created unidirectional stream 0",
		packetType1RTT, debugFrameStream{
			id:   2, // client-initiated, uni, number 0
			data: []byte{},
		})

	s, err = tc.conn.NewStream(ctx)
	if err != nil {
		t.Fatalf("NewStream: %v", err)
	}
	s.Flush() // open the stream
	tc.wantFrame("created bidirectional stream 1",
		packetType1RTT, debugFrameStream{
			id:   4, // client-initiated, uni, number 4
			data: []byte{},
		})
}

func TestStreamsAccept(t *testing.T) {
	ctx := canceledContext()
	tc := newTestConn(t, serverSide)
	tc.handshake()

	tc.writeFrames(packetType1RTT,
		debugFrameStream{
			id: 0, // client-initiated, bidi, number 0
		},
		debugFrameStream{
			id: 2, // client-initiated, uni, number 0
		},
		debugFrameStream{
			id: 4, // client-initiated, bidi, number 1
		})

	for _, accept := range []struct {
		id       streamID
		readOnly bool
	}{
		{0, false},
		{2, true},
		{4, false},
	} {
		s, err := tc.conn.AcceptStream(ctx)
		if err != nil {
			t.Fatalf("conn.AcceptStream() = %v, want stream %v", err, accept.id)
		}
		if got, want := s.id, accept.id; got != want {
			t.Fatalf("conn.AcceptStream() = stream %v, want %v", got, want)
		}
		if got, want := s.IsReadOnly(), accept.readOnly; got != want {
			t.Fatalf("stream %v: s.IsReadOnly() = %v, want %v", accept.id, got, want)
		}
	}

	_, err := tc.conn.AcceptStream(ctx)
	if err != context.Canceled {
		t.Fatalf("conn.AcceptStream() = %v, want context.Canceled", err)
	}
}

func TestStreamsBlockingAccept(t *testing.T) {
	tc := newTestConn(t, serverSide)
	tc.handshake()

	a := runAsync(tc, func(ctx context.Context) (*Stream, error) {
		return tc.conn.AcceptStream(ctx)
	})
	if _, err := a.result(); err != errNotDone {
		tc.t.Fatalf("AcceptStream() = _, %v; want errNotDone", err)
	}

	sid := newStreamID(clientSide, bidiStream, 0)
	tc.writeFrames(packetType1RTT,
		debugFrameStream{
			id: sid,
		})

	s, err := a.result()
	if err != nil {
		t.Fatalf("conn.AcceptStream() = _, %v, want stream", err)
	}
	if got, want := s.id, sid; got != want {
		t.Fatalf("conn.AcceptStream() = stream %v, want %v", got, want)
	}
	if got, want := s.IsReadOnly(), false; got != want {
		t.Fatalf("s.IsReadOnly() = %v, want %v", got, want)
	}
}

func TestStreamsLocalStreamNotCreated(t *testing.T) {
	// "An endpoint MUST terminate the connection with error STREAM_STATE_ERROR
	// if it receives a STREAM frame for a locally initiated stream that has
	// not yet been created [...]"
	// https://www.rfc-editor.org/rfc/rfc9000.html#section-19.8-3
	tc := newTestConn(t, serverSide)
	tc.handshake()

	tc.writeFrames(packetType1RTT,
		debugFrameStream{
			id: 1, // server-initiated, bidi, number 0
		})
	tc.wantFrame("peer sent STREAM frame for an uncreated local stream",
		packetType1RTT, debugFrameConnectionCloseTransport{
			code: errStreamState,
		})
}

func TestStreamsLocalStreamClosed(t *testing.T) {
	tc, s := newTestConnAndLocalStream(t, clientSide, uniStream, permissiveTransportParameters)
	s.CloseWrite()
	tc.wantFrame("FIN for closed stream",
		packetType1RTT, debugFrameStream{
			id:   newStreamID(clientSide, uniStream, 0),
			fin:  true,
			data: []byte{},
		})
	tc.writeAckForAll()

	tc.writeFrames(packetType1RTT, debugFrameStopSending{
		id: newStreamID(clientSide, uniStream, 0),
	})
	tc.wantIdle("frame for finalized stream is ignored")

	// ACKing the last stream packet should have cleaned up the stream.
	// Check that we don't have any state left.
	if got := len(tc.conn.streams.streams); got != 0 {
		t.Fatalf("after close, len(tc.conn.streams.streams) = %v, want 0", got)
	}
	if tc.conn.streams.queueMeta.head != nil {
		t.Fatalf("after close, stream send queue is not empty; should be")
	}
}

func TestStreamsStreamSendOnly(t *testing.T) {
	// "An endpoint MUST terminate the connection with error STREAM_STATE_ERROR
	// if it receives a STREAM frame for a locally initiated stream that has
	// not yet been created [...]"
	// https://www.rfc-editor.org/rfc/rfc9000.html#section-19.8-3
	ctx := canceledContext()
	tc := newTestConn(t, serverSide, permissiveTransportParameters)
	tc.handshake()

	s, err := tc.conn.NewSendOnlyStream(ctx)
	if err != nil {
		t.Fatalf("NewStream: %v", err)
	}
	s.Flush() // open the stream
	tc.wantFrame("created unidirectional stream 0",
		packetType1RTT, debugFrameStream{
			id:   3, // server-initiated, uni, number 0
			data: []byte{},
		})

	tc.writeFrames(packetType1RTT,
		debugFrameStream{
			id: 3, // server-initiated, bidi, number 0
		})
	tc.wantFrame("peer sent STREAM frame for a send-only stream",
		packetType1RTT, debugFrameConnectionCloseTransport{
			code: errStreamState,
		})
}

func TestStreamsWriteQueueFairness(t *testing.T) {
	ctx := canceledContext()
	const dataLen = 1 << 20
	const numStreams = 3
	tc := newTestConn(t, clientSide, func(p *transportParameters) {
		p.initialMaxStreamsBidi = numStreams
		p.initialMaxData = 1<<62 - 1
		p.initialMaxStreamDataBidiRemote = dataLen
	}, func(c *Config) {
		c.MaxStreamWriteBufferSize = dataLen
	})
	tc.handshake()
	tc.ignoreFrame(frameTypeAck)

	// Create a number of streams, and write a bunch of data to them.
	// The streams are not limited by flow control.
	//
	// The first stream we create is going to immediately consume all
	// available congestion window.
	//
	// Once we've created all the remaining streams,
	// we start sending acks back to open up the congestion window.
	// We verify that all streams can make progress.
	data := make([]byte, dataLen)
	var streams []*Stream
	for i := 0; i < numStreams; i++ {
		s, err := tc.conn.NewStream(ctx)
		if err != nil {
			t.Fatal(err)
		}
		streams = append(streams, s)
		if n, err := s.Write(data); n != len(data) || err != nil {
			t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(data))
		}
		// Wait for the stream to finish writing whatever frames it can before
		// congestion control blocks it.
		tc.wait()
	}

	sent := make([]int64, len(streams))
	for {
		p := tc.readPacket()
		if p == nil {
			break
		}
		tc.writeAckForLatest()
		for _, f := range p.frames {
			sf, ok := f.(debugFrameStream)
			if !ok {
				t.Fatalf("got unexpected frame (want STREAM): %v", sf)
			}
			if got, want := sf.off, sent[sf.id.num()]; got != want {
				t.Fatalf("got frame: %v\nwant offset: %v", sf, want)
			}
			sent[sf.id.num()] = sf.off + int64(len(sf.data))
			// Look at the amount of data sent by all streams, excluding the first one.
			// (The first stream got a head start when it consumed the initial window.)
			//
			// We expect that difference between the streams making the most and least progress
			// so far will be less than the maximum datagram size.
			minSent := sent[1]
			maxSent := sent[1]
			for _, s := range sent[2:] {
				minSent = min(minSent, s)
				maxSent = max(maxSent, s)
			}
			const maxDelta = maxUDPPayloadSize
			if d := maxSent - minSent; d > maxDelta {
				t.Fatalf("stream data sent: %v; delta=%v, want delta <= %v", sent, d, maxDelta)
			}
		}
	}
	// Final check that every stream sent the full amount of data expected.
	for num, s := range sent {
		if s != dataLen {
			t.Errorf("stream %v sent %v bytes, want %v", num, s, dataLen)
		}
	}
}

func TestStreamsShutdown(t *testing.T) {
	// These tests verify that a stream is removed from the Conn's map of live streams
	// after it is fully shut down.
	//
	// Each case consists of a setup step, after which one stream should exist,
	// and a shutdown step, after which no streams should remain in the Conn.
	for _, test := range []struct {
		name     string
		side     streamSide
		styp     streamType
		setup    func(*testing.T, *testConn, *Stream)
		shutdown func(*testing.T, *testConn, *Stream)
	}{{
		name: "closed",
		side: localStream,
		styp: uniStream,
		setup: func(t *testing.T, tc *testConn, s *Stream) {
			s.Close()
		},
		shutdown: func(t *testing.T, tc *testConn, s *Stream) {
			tc.writeAckForAll()
		},
	}, {
		name: "local close",
		side: localStream,
		styp: bidiStream,
		setup: func(t *testing.T, tc *testConn, s *Stream) {
			tc.writeFrames(packetType1RTT, debugFrameResetStream{
				id: s.id,
			})
			s.Close()
		},
		shutdown: func(t *testing.T, tc *testConn, s *Stream) {
			tc.writeAckForAll()
		},
	}, {
		name: "remote reset",
		side: localStream,
		styp: bidiStream,
		setup: func(t *testing.T, tc *testConn, s *Stream) {
			s.Close()
			tc.wantIdle("all frames after Close are ignored")
			tc.writeAckForAll()
		},
		shutdown: func(t *testing.T, tc *testConn, s *Stream) {
			tc.writeFrames(packetType1RTT, debugFrameResetStream{
				id: s.id,
			})
		},
	}, {
		name: "local close",
		side: remoteStream,
		styp: uniStream,
		setup: func(t *testing.T, tc *testConn, s *Stream) {
			tc.writeFrames(packetType1RTT, debugFrameStream{
				id:  s.id,
				fin: true,
			})
			if n, err := s.Read(make([]byte, 16)); n != 0 || err != io.EOF {
				t.Errorf("Read() = %v, %v; want 0, io.EOF", n, err)
			}
		},
		shutdown: func(t *testing.T, tc *testConn, s *Stream) {
			s.CloseRead()
		},
	}} {
		name := fmt.Sprintf("%v/%v/%v", test.side, test.styp, test.name)
		t.Run(name, func(t *testing.T) {
			tc, s := newTestConnAndStream(t, serverSide, test.side, test.styp,
				permissiveTransportParameters)
			tc.ignoreFrame(frameTypeStreamBase)
			tc.ignoreFrame(frameTypeStopSending)
			test.setup(t, tc, s)
			tc.wantIdle("conn should be idle after setup")
			if got, want := len(tc.conn.streams.streams), 1; got != want {
				t.Fatalf("after setup: %v streams in Conn's map; want %v", got, want)
			}
			test.shutdown(t, tc, s)
			tc.wantIdle("conn should be idle after shutdown")
			if got, want := len(tc.conn.streams.streams), 0; got != want {
				t.Fatalf("after shutdown: %v streams in Conn's map; want %v", got, want)
			}
		})
	}
}

func TestStreamsCreateAndCloseRemote(t *testing.T) {
	// This test exercises creating new streams in response to frames
	// from the peer, and cleaning up after streams are fully closed.
	//
	// It's overfitted to the current implementation, but works through
	// a number of corner cases in that implementation.
	//
	// Disable verbose logging in this test: It sends a lot of packets,
	// and they're not especially interesting on their own.
	defer func(vv bool) {
		*testVV = vv
	}(*testVV)
	*testVV = false
	ctx := canceledContext()
	tc := newTestConn(t, serverSide, permissiveTransportParameters)
	tc.handshake()
	tc.ignoreFrame(frameTypeAck)
	type op struct {
		id streamID
	}
	type streamOp op
	type resetOp op
	type acceptOp op
	const noStream = math.MaxInt64
	stringID := func(id streamID) string {
		return fmt.Sprintf("%v/%v", id.streamType(), id.num())
	}
	for _, op := range []any{
		"opening bidi/5 implicitly opens bidi/0-4",
		streamOp{newStreamID(clientSide, bidiStream, 5)},
		acceptOp{newStreamID(clientSide, bidiStream, 5)},
		"bidi/3 was implicitly opened",
		streamOp{newStreamID(clientSide, bidiStream, 3)},
		acceptOp{newStreamID(clientSide, bidiStream, 3)},
		resetOp{newStreamID(clientSide, bidiStream, 3)},
		"bidi/3 is done, frames for it are discarded",
		streamOp{newStreamID(clientSide, bidiStream, 3)},
		"open and close some uni streams as well",
		streamOp{newStreamID(clientSide, uniStream, 0)},
		acceptOp{newStreamID(clientSide, uniStream, 0)},
		streamOp{newStreamID(clientSide, uniStream, 1)},
		acceptOp{newStreamID(clientSide, uniStream, 1)},
		streamOp{newStreamID(clientSide, uniStream, 2)},
		acceptOp{newStreamID(clientSide, uniStream, 2)},
		resetOp{newStreamID(clientSide, uniStream, 1)},
		resetOp{newStreamID(clientSide, uniStream, 0)},
		resetOp{newStreamID(clientSide, uniStream, 2)},
		"closing an implicitly opened stream causes us to accept it",
		resetOp{newStreamID(clientSide, bidiStream, 0)},
		acceptOp{newStreamID(clientSide, bidiStream, 0)},
		resetOp{newStreamID(clientSide, bidiStream, 1)},
		acceptOp{newStreamID(clientSide, bidiStream, 1)},
		resetOp{newStreamID(clientSide, bidiStream, 2)},
		acceptOp{newStreamID(clientSide, bidiStream, 2)},
		"stream bidi/3 was reset previously",
		resetOp{newStreamID(clientSide, bidiStream, 3)},
		resetOp{newStreamID(clientSide, bidiStream, 4)},
		acceptOp{newStreamID(clientSide, bidiStream, 4)},
		"stream bidi/5 was reset previously",
		resetOp{newStreamID(clientSide, bidiStream, 5)},
		"stream bidi/6 was not implicitly opened",
		resetOp{newStreamID(clientSide, bidiStream, 6)},
		acceptOp{newStreamID(clientSide, bidiStream, 6)},
	} {
		if _, ok := op.(acceptOp); !ok {
			if s, err := tc.conn.AcceptStream(ctx); err == nil {
				t.Fatalf("accepted stream %v, want none", stringID(s.id))
			}
		}
		switch op := op.(type) {
		case string:
			t.Log("# " + op)
		case streamOp:
			t.Logf("open stream %v", stringID(op.id))
			tc.writeFrames(packetType1RTT, debugFrameStream{
				id: streamID(op.id),
			})
		case resetOp:
			t.Logf("reset stream %v", stringID(op.id))
			tc.writeFrames(packetType1RTT, debugFrameResetStream{
				id: op.id,
			})
		case acceptOp:
			s := tc.acceptStream()
			if s.id != op.id {
				t.Fatalf("accepted stream %v; want stream %v", stringID(s.id), stringID(op.id))
			}
			t.Logf("accepted stream %v", stringID(op.id))
			// Immediately close the stream, so the stream becomes done when the
			// peer closes its end.
			s.Close()
		}
		p := tc.readPacket()
		if p != nil {
			tc.writeFrames(p.ptype, debugFrameAck{
				ranges: []i64range[packetNumber]{{0, p.num + 1}},
			})
		}
	}
	// Every stream should be fully closed now.
	// Check that we don't have any state left.
	if got := len(tc.conn.streams.streams); got != 0 {
		t.Fatalf("after test, len(tc.conn.streams.streams) = %v, want 0", got)
	}
	if tc.conn.streams.queueMeta.head != nil {
		t.Fatalf("after test, stream send queue is not empty; should be")
	}
}

func TestStreamsCreateConcurrency(t *testing.T) {
	cli, srv := newLocalConnPair(t, &Config{}, &Config{})

	srvdone := make(chan int)
	go func() {
		defer close(srvdone)
		for streams := 0; ; streams++ {
			s, err := srv.AcceptStream(context.Background())
			if err != nil {
				srvdone <- streams
				return
			}
			s.Close()
		}
	}()

	var wg sync.WaitGroup
	const concurrency = 10
	const streams = 10
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < streams; j++ {
				s, err := cli.NewStream(context.Background())
				if err != nil {
					t.Errorf("NewStream: %v", err)
					return
				}
				s.Flush()
				_, err = io.ReadAll(s)
				if err != nil {
					t.Errorf("ReadFull: %v", err)
				}
				s.Close()
			}
		}()
	}
	wg.Wait()

	cli.Abort(nil)
	srv.Abort(nil)
	if got, want := <-srvdone, concurrency*streams; got != want {
		t.Errorf("accepted %v streams, want %v", got, want)
	}
}

func TestStreamsPTOWithImplicitStream(t *testing.T) {
	ctx := canceledContext()
	tc := newTestConn(t, serverSide, permissiveTransportParameters)
	tc.handshake()
	tc.ignoreFrame(frameTypeAck)

	// Peer creates stream 1, and implicitly creates stream 0.
	tc.writeFrames(packetType1RTT, debugFrameStream{
		id: newStreamID(clientSide, bidiStream, 1),
	})

	// We accept stream 1 and write data to it.
	data := []byte("data")
	s, err := tc.conn.AcceptStream(ctx)
	if err != nil {
		t.Fatalf("conn.AcceptStream() = %v, want stream", err)
	}
	s.Write(data)
	s.Flush()
	tc.wantFrame("data written to stream",
		packetType1RTT, debugFrameStream{
			id:   newStreamID(clientSide, bidiStream, 1),
			data: data,
		})

	// PTO expires, and the data is resent.
	const pto = true
	tc.triggerLossOrPTO(packetType1RTT, true)
	tc.wantFrame("data resent after PTO expires",
		packetType1RTT, debugFrameStream{
			id:   newStreamID(clientSide, bidiStream, 1),
			data: data,
		})
}
