| // Copyright 2024 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package http_test |
| |
| import ( |
| "bytes" |
| "context" |
| "internal/synctest" |
| "io" |
| "math" |
| "net" |
| "net/netip" |
| "os" |
| "sync" |
| "time" |
| ) |
| |
| func fakeNetListen() *fakeNetListener { |
| li := &fakeNetListener{ |
| setc: make(chan struct{}, 1), |
| unsetc: make(chan struct{}, 1), |
| addr: netip.MustParseAddrPort("127.0.0.1:8000"), |
| locPort: 10000, |
| } |
| li.unsetc <- struct{}{} |
| return li |
| } |
| |
| type fakeNetListener struct { |
| setc, unsetc chan struct{} |
| queue []net.Conn |
| closed bool |
| addr netip.AddrPort |
| locPort uint16 |
| |
| onDial func() // called when making a new connection |
| onClose func(*fakeNetConn) // called when closing a connection |
| |
| trackConns bool // set this to record all created conns |
| conns []*fakeNetConn |
| } |
| |
| func (li *fakeNetListener) lock() { |
| select { |
| case <-li.setc: |
| case <-li.unsetc: |
| } |
| } |
| |
| func (li *fakeNetListener) unlock() { |
| if li.closed || len(li.queue) > 0 { |
| li.setc <- struct{}{} |
| } else { |
| li.unsetc <- struct{}{} |
| } |
| } |
| |
| func (li *fakeNetListener) connect() *fakeNetConn { |
| if li.onDial != nil { |
| li.onDial() |
| } |
| li.lock() |
| defer li.unlock() |
| locAddr := netip.AddrPortFrom(netip.AddrFrom4([4]byte{127, 0, 0, 1}), li.locPort) |
| li.locPort++ |
| c0, c1 := fakeNetPipe(li.addr, locAddr) |
| c0.onClose = li.onClose |
| c1.onClose = li.onClose |
| li.queue = append(li.queue, c0) |
| if li.trackConns { |
| li.conns = append(li.conns, c0) |
| } |
| return c1 |
| } |
| |
| func (li *fakeNetListener) Accept() (net.Conn, error) { |
| <-li.setc |
| defer li.unlock() |
| if li.closed { |
| return nil, net.ErrClosed |
| } |
| c := li.queue[0] |
| li.queue = li.queue[1:] |
| return c, nil |
| } |
| |
| func (li *fakeNetListener) Close() error { |
| li.lock() |
| defer li.unlock() |
| li.closed = true |
| return nil |
| } |
| |
| func (li *fakeNetListener) Addr() net.Addr { |
| return net.TCPAddrFromAddrPort(li.addr) |
| } |
| |
| // fakeNetPipe creates an in-memory, full duplex network connection. |
| // |
| // Unlike net.Pipe, the connection is not synchronous. |
| // Writes are made to a buffer, and return immediately. |
| // By default, the buffer size is unlimited. |
| func fakeNetPipe(s1ap, s2ap netip.AddrPort) (r, w *fakeNetConn) { |
| s1addr := net.TCPAddrFromAddrPort(s1ap) |
| s2addr := net.TCPAddrFromAddrPort(s2ap) |
| s1 := newSynctestNetConnHalf(s1addr) |
| s2 := newSynctestNetConnHalf(s2addr) |
| c1 := &fakeNetConn{loc: s1, rem: s2} |
| c2 := &fakeNetConn{loc: s2, rem: s1} |
| c1.peer = c2 |
| c2.peer = c1 |
| return c1, c2 |
| } |
| |
| // A fakeNetConn is one endpoint of the connection created by fakeNetPipe. |
| type fakeNetConn struct { |
| // local and remote connection halves. |
| // Each half contains a buffer. |
| // Reads pull from the local buffer, and writes push to the remote buffer. |
| loc, rem *fakeNetConnHalf |
| |
| // When set, synctest.Wait is automatically called before reads and after writes. |
| autoWait bool |
| |
| // peer is the other endpoint. |
| peer *fakeNetConn |
| |
| onClose func(*fakeNetConn) // called when closing |
| } |
| |
| // Read reads data from the connection. |
| func (c *fakeNetConn) Read(b []byte) (n int, err error) { |
| if c.autoWait { |
| synctest.Wait() |
| } |
| return c.loc.read(b) |
| } |
| |
| // Peek returns the available unread read buffer, |
| // without consuming its contents. |
| func (c *fakeNetConn) Peek() []byte { |
| if c.autoWait { |
| synctest.Wait() |
| } |
| return c.loc.peek() |
| } |
| |
| // Write writes data to the connection. |
| func (c *fakeNetConn) Write(b []byte) (n int, err error) { |
| if c.autoWait { |
| defer synctest.Wait() |
| } |
| return c.rem.write(b) |
| } |
| |
| // IsClosed reports whether the peer has closed its end of the connection. |
| func (c *fakeNetConn) IsClosedByPeer() bool { |
| if c.autoWait { |
| synctest.Wait() |
| } |
| c.rem.lock() |
| defer c.rem.unlock() |
| // If the remote half of the conn is returning ErrClosed, |
| // the peer has closed the connection. |
| return c.rem.readErr == net.ErrClosed |
| } |
| |
| // Close closes the connection. |
| func (c *fakeNetConn) Close() error { |
| if c.onClose != nil { |
| c.onClose(c) |
| } |
| // Local half of the conn is now closed. |
| c.loc.lock() |
| c.loc.writeErr = net.ErrClosed |
| c.loc.readErr = net.ErrClosed |
| c.loc.buf.Reset() |
| c.loc.unlock() |
| // Remote half of the connection reads EOF after reading any remaining data. |
| c.rem.lock() |
| if c.rem.readErr != nil { |
| c.rem.readErr = io.EOF |
| } |
| c.rem.unlock() |
| if c.autoWait { |
| synctest.Wait() |
| } |
| return nil |
| } |
| |
| // LocalAddr returns the (fake) local network address. |
| func (c *fakeNetConn) LocalAddr() net.Addr { |
| return c.loc.addr |
| } |
| |
| // LocalAddr returns the (fake) remote network address. |
| func (c *fakeNetConn) RemoteAddr() net.Addr { |
| return c.rem.addr |
| } |
| |
| // SetDeadline sets the read and write deadlines for the connection. |
| func (c *fakeNetConn) SetDeadline(t time.Time) error { |
| c.SetReadDeadline(t) |
| c.SetWriteDeadline(t) |
| return nil |
| } |
| |
| // SetReadDeadline sets the read deadline for the connection. |
| func (c *fakeNetConn) SetReadDeadline(t time.Time) error { |
| c.loc.rctx.setDeadline(t) |
| return nil |
| } |
| |
| // SetWriteDeadline sets the write deadline for the connection. |
| func (c *fakeNetConn) SetWriteDeadline(t time.Time) error { |
| c.rem.wctx.setDeadline(t) |
| return nil |
| } |
| |
| // SetReadBufferSize sets the read buffer limit for the connection. |
| // Writes by the peer will block so long as the buffer is full. |
| func (c *fakeNetConn) SetReadBufferSize(size int) { |
| c.loc.setReadBufferSize(size) |
| } |
| |
| // fakeNetConnHalf is one data flow in the connection created by fakeNetPipe. |
| // Each half contains a buffer. Writes to the half push to the buffer, and reads pull from it. |
| type fakeNetConnHalf struct { |
| addr net.Addr |
| |
| // Read and write timeouts. |
| rctx, wctx deadlineContext |
| |
| // A half can be readable and/or writable. |
| // |
| // These four channels act as a lock, |
| // and allow waiting for readability/writability. |
| // When the half is unlocked, exactly one channel contains a value. |
| // When the half is locked, all channels are empty. |
| lockr chan struct{} // readable |
| lockw chan struct{} // writable |
| lockrw chan struct{} // readable and writable |
| lockc chan struct{} // neither readable nor writable |
| |
| bufMax int // maximum buffer size |
| buf bytes.Buffer |
| readErr error // error returned by reads |
| writeErr error // error returned by writes |
| } |
| |
| func newSynctestNetConnHalf(addr net.Addr) *fakeNetConnHalf { |
| h := &fakeNetConnHalf{ |
| addr: addr, |
| lockw: make(chan struct{}, 1), |
| lockr: make(chan struct{}, 1), |
| lockrw: make(chan struct{}, 1), |
| lockc: make(chan struct{}, 1), |
| bufMax: math.MaxInt, // unlimited |
| } |
| h.unlock() |
| return h |
| } |
| |
| // lock locks h. |
| func (h *fakeNetConnHalf) lock() { |
| select { |
| case <-h.lockw: // writable |
| case <-h.lockr: // readable |
| case <-h.lockrw: // readable and writable |
| case <-h.lockc: // neither readable nor writable |
| } |
| } |
| |
| // h unlocks h. |
| func (h *fakeNetConnHalf) unlock() { |
| canRead := h.readErr != nil || h.buf.Len() > 0 |
| canWrite := h.writeErr != nil || h.bufMax > h.buf.Len() |
| switch { |
| case canRead && canWrite: |
| h.lockrw <- struct{}{} // readable and writable |
| case canRead: |
| h.lockr <- struct{}{} // readable |
| case canWrite: |
| h.lockw <- struct{}{} // writable |
| default: |
| h.lockc <- struct{}{} // neither readable nor writable |
| } |
| } |
| |
| // waitAndLockForRead waits until h is readable and locks it. |
| func (h *fakeNetConnHalf) waitAndLockForRead() error { |
| // First a non-blocking select to see if we can make immediate progress. |
| // This permits using a canceled context for a non-blocking operation. |
| select { |
| case <-h.lockr: |
| return nil // readable |
| case <-h.lockrw: |
| return nil // readable and writable |
| default: |
| } |
| ctx := h.rctx.context() |
| select { |
| case <-h.lockr: |
| return nil // readable |
| case <-h.lockrw: |
| return nil // readable and writable |
| case <-ctx.Done(): |
| return context.Cause(ctx) |
| } |
| } |
| |
| // waitAndLockForWrite waits until h is writable and locks it. |
| func (h *fakeNetConnHalf) waitAndLockForWrite() error { |
| // First a non-blocking select to see if we can make immediate progress. |
| // This permits using a canceled context for a non-blocking operation. |
| select { |
| case <-h.lockw: |
| return nil // writable |
| case <-h.lockrw: |
| return nil // readable and writable |
| default: |
| } |
| ctx := h.wctx.context() |
| select { |
| case <-h.lockw: |
| return nil // writable |
| case <-h.lockrw: |
| return nil // readable and writable |
| case <-ctx.Done(): |
| return context.Cause(ctx) |
| } |
| } |
| |
| func (h *fakeNetConnHalf) peek() []byte { |
| h.lock() |
| defer h.unlock() |
| return h.buf.Bytes() |
| } |
| |
| func (h *fakeNetConnHalf) read(b []byte) (n int, err error) { |
| if err := h.waitAndLockForRead(); err != nil { |
| return 0, err |
| } |
| defer h.unlock() |
| if h.buf.Len() == 0 && h.readErr != nil { |
| return 0, h.readErr |
| } |
| return h.buf.Read(b) |
| } |
| |
| func (h *fakeNetConnHalf) setReadBufferSize(size int) { |
| h.lock() |
| defer h.unlock() |
| h.bufMax = size |
| } |
| |
| func (h *fakeNetConnHalf) write(b []byte) (n int, err error) { |
| for n < len(b) { |
| nn, err := h.writePartial(b[n:]) |
| n += nn |
| if err != nil { |
| return n, err |
| } |
| } |
| return n, nil |
| } |
| |
| func (h *fakeNetConnHalf) writePartial(b []byte) (n int, err error) { |
| if err := h.waitAndLockForWrite(); err != nil { |
| return 0, err |
| } |
| defer h.unlock() |
| if h.writeErr != nil { |
| return 0, h.writeErr |
| } |
| writeMax := h.bufMax - h.buf.Len() |
| if writeMax < len(b) { |
| b = b[:writeMax] |
| } |
| return h.buf.Write(b) |
| } |
| |
| // deadlineContext converts a changable deadline (as in net.Conn.SetDeadline) into a Context. |
| type deadlineContext struct { |
| mu sync.Mutex |
| ctx context.Context |
| cancel context.CancelCauseFunc |
| timer *time.Timer |
| } |
| |
| // context returns a Context which expires when the deadline does. |
| func (t *deadlineContext) context() context.Context { |
| t.mu.Lock() |
| defer t.mu.Unlock() |
| if t.ctx == nil { |
| t.ctx, t.cancel = context.WithCancelCause(context.Background()) |
| } |
| return t.ctx |
| } |
| |
| // setDeadline sets the current deadline. |
| func (t *deadlineContext) setDeadline(deadline time.Time) { |
| t.mu.Lock() |
| defer t.mu.Unlock() |
| // If t.ctx is non-nil and t.cancel is nil, then t.ctx was canceled |
| // and we should create a new one. |
| if t.ctx == nil || t.cancel == nil { |
| t.ctx, t.cancel = context.WithCancelCause(context.Background()) |
| } |
| // Stop any existing deadline from expiring. |
| if t.timer != nil { |
| t.timer.Stop() |
| } |
| if deadline.IsZero() { |
| // No deadline. |
| return |
| } |
| now := time.Now() |
| if !deadline.After(now) { |
| // Deadline has already expired. |
| t.cancel(os.ErrDeadlineExceeded) |
| t.cancel = nil |
| return |
| } |
| if t.timer != nil { |
| // Reuse existing deadline timer. |
| t.timer.Reset(deadline.Sub(now)) |
| return |
| } |
| // Create a new timer to cancel the context at the deadline. |
| t.timer = time.AfterFunc(deadline.Sub(now), func() { |
| t.mu.Lock() |
| defer t.mu.Unlock() |
| t.cancel(os.ErrDeadlineExceeded) |
| t.cancel = nil |
| }) |
| } |