quic: add pipe type

Streams (including CRYPTO streams) are an ordered byte sequence.

Both outgoing and incoming streams require random access to a portion
of that sequence. Outbound packets may be lost, requiring us to
resend the data in the lost packet. Inbound packets may arrive out
of order.

Add a "pipe" type as a building block for both inbound and outbound
streams. A pipe is a window into a portion of a stream, permitting
random read and write access within that window (unlike bufio.Reader
or bufio.Writer).

Pipes are implemented as a linked list of blocks.

Block sizes are uniform and allocations are pooled,
avoiding non-pool allocations in the steady state.

Pipe memory consumption is proportional to the current window,
and goes to zero when the window has been fully consumed
(unlike bytes.Buffer).

For golang/go#58547

Change-Id: I0c16707552c9c46f31055daea2396590a924fc60
Reviewed-on: https://go-review.googlesource.com/c/net/+/510615
Run-TryBot: Damien Neil <dneil@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/internal/quic/pipe.go b/internal/quic/pipe.go
new file mode 100644
index 0000000..978a4f3
--- /dev/null
+++ b/internal/quic/pipe.go
@@ -0,0 +1,149 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build go1.21
+
+package quic
+
+import (
+	"sync"
+)
+
+// A pipe is a byte buffer used in implementing streams.
+//
+// A pipe contains a window of stream data.
+// Random access reads and writes are supported within the window.
+// Writing past the end of the window extends it.
+// Data may be discarded from the start of the pipe, advancing the window.
+type pipe struct {
+	start int64
+	end   int64
+	head  *pipebuf
+	tail  *pipebuf
+}
+
+type pipebuf struct {
+	off  int64
+	b    []byte
+	next *pipebuf
+}
+
+func (pb *pipebuf) end() int64 {
+	return pb.off + int64(len(pb.b))
+}
+
+var pipebufPool = sync.Pool{
+	New: func() any {
+		return &pipebuf{
+			b: make([]byte, 4096),
+		}
+	},
+}
+
+func newPipebuf() *pipebuf {
+	return pipebufPool.Get().(*pipebuf)
+}
+
+func (b *pipebuf) recycle() {
+	b.off = 0
+	b.next = nil
+	pipebufPool.Put(b)
+}
+
+// writeAt writes len(b) bytes to the pipe at offset off.
+//
+// Writes to offsets before p.start are discarded.
+// Writes to offsets after p.end extend the pipe window.
+func (p *pipe) writeAt(b []byte, off int64) {
+	end := off + int64(len(b))
+	if end > p.end {
+		p.end = end
+	} else if end <= p.start {
+		return
+	}
+
+	if off < p.start {
+		// Discard the portion of b which falls before p.start.
+		trim := p.start - off
+		b = b[trim:]
+		off = p.start
+	}
+
+	if p.head == nil {
+		p.head = newPipebuf()
+		p.head.off = p.start
+		p.tail = p.head
+	}
+	pb := p.head
+	if off >= p.tail.off {
+		// Common case: Writing past the end of the pipe.
+		pb = p.tail
+	}
+	for {
+		pboff := off - pb.off
+		if pboff < int64(len(pb.b)) {
+			n := copy(pb.b[pboff:], b)
+			if n == len(b) {
+				return
+			}
+			off += int64(n)
+			b = b[n:]
+		}
+		if pb.next == nil {
+			pb.next = newPipebuf()
+			pb.next.off = pb.off + int64(len(pb.b))
+			p.tail = pb.next
+		}
+		pb = pb.next
+	}
+}
+
+// copy copies len(b) bytes into b starting from off.
+// The pipe must contain [off, off+len(b)).
+func (p *pipe) copy(off int64, b []byte) {
+	dst := b[:0]
+	p.read(off, len(b), func(c []byte) error {
+		dst = append(dst, c...)
+		return nil
+	})
+}
+
+// read calls f with the data in [off, off+n)
+// The data may be provided sequentially across multiple calls to f.
+func (p *pipe) read(off int64, n int, f func([]byte) error) error {
+	if off < p.start {
+		panic("invalid read range")
+	}
+	for pb := p.head; pb != nil && n > 0; pb = pb.next {
+		if off >= pb.end() {
+			continue
+		}
+		b := pb.b[off-pb.off:]
+		if len(b) > n {
+			b = b[:n]
+		}
+		off += int64(len(b))
+		n -= len(b)
+		if err := f(b); err != nil {
+			return err
+		}
+	}
+	if n > 0 {
+		panic("invalid read range")
+	}
+	return nil
+}
+
+// discardBefore discards all data prior to off.
+func (p *pipe) discardBefore(off int64) {
+	for p.head != nil && p.head.end() < off {
+		head := p.head
+		p.head = p.head.next
+		head.recycle()
+	}
+	if p.head == nil {
+		p.tail = nil
+	}
+	p.start = off
+}
diff --git a/internal/quic/pipe_test.go b/internal/quic/pipe_test.go
new file mode 100644
index 0000000..7a05ff4
--- /dev/null
+++ b/internal/quic/pipe_test.go
@@ -0,0 +1,95 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build go1.21
+
+package quic
+
+import (
+	"bytes"
+	"math/rand"
+	"testing"
+)
+
+func TestPipeWrites(t *testing.T) {
+	type writeOp struct {
+		start, end int64
+	}
+	type discardBeforeOp struct {
+		off int64
+	}
+	type op any
+	src := make([]byte, 65536)
+	rand.New(rand.NewSource(0)).Read(src)
+	for _, test := range []struct {
+		desc string
+		ops  []op
+	}{{
+		desc: "sequential writes",
+		ops: []op{
+			writeOp{0, 1024},
+			writeOp{1024, 4096},
+			writeOp{4096, 65536},
+		},
+	}, {
+		desc: "disordered overlapping writes",
+		ops: []op{
+			writeOp{2000, 8000},
+			writeOp{0, 3000},
+			writeOp{7000, 12000},
+		},
+	}, {
+		desc: "write to discarded region",
+		ops: []op{
+			writeOp{0, 65536},
+			discardBeforeOp{32768},
+			writeOp{0, 1000},
+			writeOp{3000, 5000},
+			writeOp{0, 32768},
+		},
+	}, {
+		desc: "write overlaps discarded region",
+		ops: []op{
+			discardBeforeOp{10000},
+			writeOp{0, 20000},
+		},
+	}, {
+		desc: "discard everything",
+		ops: []op{
+			writeOp{0, 10000},
+			discardBeforeOp{10000},
+			writeOp{10000, 20000},
+		},
+	}} {
+		var p pipe
+		var wantset rangeset[int64]
+		var wantStart, wantEnd int64
+		for i, o := range test.ops {
+			switch o := o.(type) {
+			case writeOp:
+				p.writeAt(src[o.start:o.end], o.start)
+				wantset.add(o.start, o.end)
+				wantset.sub(0, wantStart)
+				if o.end > wantEnd {
+					wantEnd = o.end
+				}
+			case discardBeforeOp:
+				p.discardBefore(o.off)
+				wantset.sub(0, o.off)
+				wantStart = o.off
+			}
+			if p.start != wantStart || p.end != wantEnd {
+				t.Errorf("%v: after %#v p contains [%v,%v), want [%v,%v)", test.desc, test.ops[:i+1], p.start, p.end, wantStart, wantEnd)
+			}
+			for _, r := range wantset {
+				want := src[r.start:][:r.size()]
+				got := make([]byte, r.size())
+				p.copy(r.start, got)
+				if !bytes.Equal(got, want) {
+					t.Errorf("%v after %#v, mismatch in data in %v", test.desc, test.ops[:i+1], r)
+				}
+			}
+		}
+	}
+}