blob: 5a1759579e284fec65eaacc320cdf3256f80e89b [file] [log] [blame]
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import (
// synctestNetPipe creates an in-memory, full duplex network connection.
// Read and write timeouts are managed by the synctest group.
// Unlike net.Pipe, the connection is not synchronous.
// Writes are made to a buffer, and return immediately.
// By default, the buffer size is unlimited.
func synctestNetPipe(group *synctestGroup) (r, w *synctestNetConn) {
s1addr := net.TCPAddrFromAddrPort(netip.MustParseAddrPort(""))
s2addr := net.TCPAddrFromAddrPort(netip.MustParseAddrPort(""))
s1 := newSynctestNetConnHalf(s1addr)
s2 := newSynctestNetConnHalf(s2addr)
r = &synctestNetConn{group: group, loc: s1, rem: s2}
w = &synctestNetConn{group: group, loc: s2, rem: s1}
r.peer = w
w.peer = r
return r, w
// A synctestNetConn is one endpoint of the connection created by synctestNetPipe.
type synctestNetConn struct {
group *synctestGroup
// local and remote connection halves.
// Each half contains a buffer.
// Reads pull from the local buffer, and writes push to the remote buffer.
loc, rem *synctestNetConnHalf
// When set, group.Wait is automatically called before reads and after writes.
autoWait bool
// peer is the other endpoint.
peer *synctestNetConn
// Read reads data from the connection.
func (c *synctestNetConn) Read(b []byte) (n int, err error) {
if c.autoWait {
// Peek returns the available unread read buffer,
// without consuming its contents.
func (c *synctestNetConn) Peek() []byte {
if c.autoWait {
return c.loc.peek()
// Write writes data to the connection.
func (c *synctestNetConn) Write(b []byte) (n int, err error) {
if c.autoWait {
return c.rem.write(b)
// IsClosedByPeer reports whether the peer has closed its end of the connection.
func (c *synctestNetConn) IsClosedByPeer() bool {
if c.autoWait {
return c.loc.isClosedByPeer()
// Close closes the connection.
func (c *synctestNetConn) Close() error {
c.loc.setWriteError(errors.New("connection closed by peer"))
if c.autoWait {
return nil
// LocalAddr returns the (fake) local network address.
func (c *synctestNetConn) LocalAddr() net.Addr {
return c.loc.addr
// LocalAddr returns the (fake) remote network address.
func (c *synctestNetConn) RemoteAddr() net.Addr {
return c.rem.addr
// SetDeadline sets the read and write deadlines for the connection.
func (c *synctestNetConn) SetDeadline(t time.Time) error {
return nil
// SetReadDeadline sets the read deadline for the connection.
func (c *synctestNetConn) SetReadDeadline(t time.Time) error {
c.loc.rctx.setDeadline(, t)
return nil
// SetWriteDeadline sets the write deadline for the connection.
func (c *synctestNetConn) SetWriteDeadline(t time.Time) error {
c.rem.wctx.setDeadline(, t)
return nil
// SetReadBufferSize sets the read buffer limit for the connection.
// Writes by the peer will block so long as the buffer is full.
func (c *synctestNetConn) SetReadBufferSize(size int) {
// synctestNetConnHalf is one data flow in the connection created by synctestNetPipe.
// Each half contains a buffer. Writes to the half push to the buffer, and reads pull from it.
type synctestNetConnHalf struct {
addr net.Addr
// Read and write timeouts.
rctx, wctx deadlineContext
// A half can be readable and/or writable.
// These four channels act as a lock,
// and allow waiting for readability/writability.
// When the half is unlocked, exactly one channel contains a value.
// When the half is locked, all channels are empty.
lockr chan struct{} // readable
lockw chan struct{} // writable
lockrw chan struct{} // readable and writable
lockc chan struct{} // neither readable nor writable
bufMax int // maximum buffer size
buf bytes.Buffer
readErr error // error returned by reads
writeErr error // error returned by writes
func newSynctestNetConnHalf(addr net.Addr) *synctestNetConnHalf {
h := &synctestNetConnHalf{
addr: addr,
lockw: make(chan struct{}, 1),
lockr: make(chan struct{}, 1),
lockrw: make(chan struct{}, 1),
lockc: make(chan struct{}, 1),
bufMax: math.MaxInt, // unlimited
return h
func (h *synctestNetConnHalf) lock() {
select {
case <-h.lockw:
case <-h.lockr:
case <-h.lockrw:
case <-h.lockc:
func (h *synctestNetConnHalf) unlock() {
canRead := h.readErr != nil || h.buf.Len() > 0
canWrite := h.writeErr != nil || h.bufMax > h.buf.Len()
switch {
case canRead && canWrite:
h.lockrw <- struct{}{}
case canRead:
h.lockr <- struct{}{}
case canWrite:
h.lockw <- struct{}{}
h.lockc <- struct{}{}
func (h *synctestNetConnHalf) readWaitAndLock() error {
select {
case <-h.lockr:
return nil
case <-h.lockrw:
return nil
ctx := h.rctx.context()
select {
case <-h.lockr:
return nil
case <-h.lockrw:
return nil
case <-ctx.Done():
return context.Cause(ctx)
func (h *synctestNetConnHalf) writeWaitAndLock() error {
select {
case <-h.lockw:
return nil
case <-h.lockrw:
return nil
ctx := h.wctx.context()
select {
case <-h.lockw:
return nil
case <-h.lockrw:
return nil
case <-ctx.Done():
return context.Cause(ctx)
func (h *synctestNetConnHalf) peek() []byte {
defer h.unlock()
return h.buf.Bytes()
func (h *synctestNetConnHalf) isClosedByPeer() bool {
defer h.unlock()
return h.readErr != nil
func (h *synctestNetConnHalf) read(b []byte) (n int, err error) {
if err := h.readWaitAndLock(); err != nil {
return 0, err
defer h.unlock()
if h.buf.Len() == 0 && h.readErr != nil {
return 0, h.readErr
return h.buf.Read(b)
func (h *synctestNetConnHalf) setReadBufferSize(size int) {
defer h.unlock()
h.bufMax = size
func (h *synctestNetConnHalf) write(b []byte) (n int, err error) {
for n < len(b) {
nn, err := h.writePartial(b[n:])
n += nn
if err != nil {
return n, err
return n, nil
func (h *synctestNetConnHalf) writePartial(b []byte) (n int, err error) {
if err := h.writeWaitAndLock(); err != nil {
return 0, err
defer h.unlock()
if h.writeErr != nil {
return 0, h.writeErr
writeMax := h.bufMax - h.buf.Len()
if writeMax < len(b) {
b = b[:writeMax]
return h.buf.Write(b)
func (h *synctestNetConnHalf) setReadError(err error) {
defer h.unlock()
if h.readErr == nil {
h.readErr = err
func (h *synctestNetConnHalf) setWriteError(err error) {
defer h.unlock()
if h.writeErr == nil {
h.writeErr = err
// deadlineContext converts a changable deadline (as in net.Conn.SetDeadline) into a Context.
type deadlineContext struct {
mu sync.Mutex
ctx context.Context
cancel context.CancelCauseFunc
timer timer
// context returns a Context which expires when the deadline does.
func (t *deadlineContext) context() context.Context {
if t.ctx == nil {
t.ctx, t.cancel = context.WithCancelCause(context.Background())
return t.ctx
// setDeadline sets the current deadline.
func (t *deadlineContext) setDeadline(group *synctestGroup, deadline time.Time) {
// If t.ctx is non-nil and t.cancel is nil, then t.ctx was canceled
// and we should create a new one.
if t.ctx == nil || t.cancel == nil {
t.ctx, t.cancel = context.WithCancelCause(context.Background())
// Stop any existing deadline from expiring.
if t.timer != nil {
if deadline.IsZero() {
// No deadline.
if !deadline.After(group.Now()) {
// Deadline has already expired.
t.cancel = nil
if t.timer != nil {
// Reuse existing deadline timer.
// Create a new timer to cancel the context at the deadline.
t.timer = group.AfterFunc(deadline.Sub(group.Now()), func() {
t.cancel = nil