| // Copyright 2024 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| package http2 |
| |
| import ( |
| "context" |
| "sync" |
| "time" |
| ) |
| |
| // testSyncHooks coordinates goroutines in tests. |
| // |
| // For example, a call to ClientConn.RoundTrip involves several goroutines, including: |
| // - the goroutine running RoundTrip; |
| // - the clientStream.doRequest goroutine, which writes the request; and |
| // - the clientStream.readLoop goroutine, which reads the response. |
| // |
| // Using testSyncHooks, a test can start a RoundTrip and identify when all these goroutines |
| // are blocked waiting for some condition such as reading the Request.Body or waiting for |
| // flow control to become available. |
| // |
| // The testSyncHooks also manage timers and synthetic time in tests. |
| // This permits us to, for example, start a request and cause it to time out waiting for |
| // response headers without resorting to time.Sleep calls. |
| type testSyncHooks struct { |
| // active/inactive act as a mutex and condition variable. |
| // |
| // - neither chan contains a value: testSyncHooks is locked. |
| // - active contains a value: unlocked, and at least one goroutine is not blocked |
| // - inactive contains a value: unlocked, and all goroutines are blocked |
| active chan struct{} |
| inactive chan struct{} |
| |
| // goroutine counts |
| total int // total goroutines |
| condwait map[*sync.Cond]int // blocked in sync.Cond.Wait |
| blocked []*testBlockedGoroutine // otherwise blocked |
| |
| // fake time |
| now time.Time |
| timers []*fakeTimer |
| |
| // Transport testing: Report various events. |
| newclientconn func(*ClientConn) |
| newstream func(*clientStream) |
| } |
| |
| // testBlockedGoroutine is a blocked goroutine. |
| type testBlockedGoroutine struct { |
| f func() bool // blocked until f returns true |
| ch chan struct{} // closed when unblocked |
| } |
| |
| func newTestSyncHooks() *testSyncHooks { |
| h := &testSyncHooks{ |
| active: make(chan struct{}, 1), |
| inactive: make(chan struct{}, 1), |
| condwait: map[*sync.Cond]int{}, |
| } |
| h.inactive <- struct{}{} |
| h.now = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC) |
| return h |
| } |
| |
| // lock acquires the testSyncHooks mutex. |
| func (h *testSyncHooks) lock() { |
| select { |
| case <-h.active: |
| case <-h.inactive: |
| } |
| } |
| |
| // waitInactive waits for all goroutines to become inactive. |
| func (h *testSyncHooks) waitInactive() { |
| for { |
| <-h.inactive |
| if !h.unlock() { |
| break |
| } |
| } |
| } |
| |
| // unlock releases the testSyncHooks mutex. |
| // It reports whether any goroutines are active. |
| func (h *testSyncHooks) unlock() (active bool) { |
| // Look for a blocked goroutine which can be unblocked. |
| blocked := h.blocked[:0] |
| unblocked := false |
| for _, b := range h.blocked { |
| if !unblocked && b.f() { |
| unblocked = true |
| close(b.ch) |
| } else { |
| blocked = append(blocked, b) |
| } |
| } |
| h.blocked = blocked |
| |
| // Count goroutines blocked on condition variables. |
| condwait := 0 |
| for _, count := range h.condwait { |
| condwait += count |
| } |
| |
| if h.total > condwait+len(blocked) { |
| h.active <- struct{}{} |
| return true |
| } else { |
| h.inactive <- struct{}{} |
| return false |
| } |
| } |
| |
| // goRun starts a new goroutine. |
| func (h *testSyncHooks) goRun(f func()) { |
| h.lock() |
| h.total++ |
| h.unlock() |
| go func() { |
| defer func() { |
| h.lock() |
| h.total-- |
| h.unlock() |
| }() |
| f() |
| }() |
| } |
| |
| // blockUntil indicates that a goroutine is blocked waiting for some condition to become true. |
| // It waits until f returns true before proceeding. |
| // |
| // Example usage: |
| // |
| // h.blockUntil(func() bool { |
| // // Is the context done yet? |
| // select { |
| // case <-ctx.Done(): |
| // default: |
| // return false |
| // } |
| // return true |
| // }) |
| // // Wait for the context to become done. |
| // <-ctx.Done() |
| // |
| // The function f passed to blockUntil must be non-blocking and idempotent. |
| func (h *testSyncHooks) blockUntil(f func() bool) { |
| if f() { |
| return |
| } |
| ch := make(chan struct{}) |
| h.lock() |
| h.blocked = append(h.blocked, &testBlockedGoroutine{ |
| f: f, |
| ch: ch, |
| }) |
| h.unlock() |
| <-ch |
| } |
| |
| // broadcast is sync.Cond.Broadcast. |
| func (h *testSyncHooks) condBroadcast(cond *sync.Cond) { |
| h.lock() |
| delete(h.condwait, cond) |
| h.unlock() |
| cond.Broadcast() |
| } |
| |
| // broadcast is sync.Cond.Wait. |
| func (h *testSyncHooks) condWait(cond *sync.Cond) { |
| h.lock() |
| h.condwait[cond]++ |
| h.unlock() |
| } |
| |
| // newTimer creates a new fake timer. |
| func (h *testSyncHooks) newTimer(d time.Duration) timer { |
| h.lock() |
| defer h.unlock() |
| t := &fakeTimer{ |
| hooks: h, |
| when: h.now.Add(d), |
| c: make(chan time.Time), |
| } |
| h.timers = append(h.timers, t) |
| return t |
| } |
| |
| // afterFunc creates a new fake AfterFunc timer. |
| func (h *testSyncHooks) afterFunc(d time.Duration, f func()) timer { |
| h.lock() |
| defer h.unlock() |
| t := &fakeTimer{ |
| hooks: h, |
| when: h.now.Add(d), |
| f: f, |
| } |
| h.timers = append(h.timers, t) |
| return t |
| } |
| |
| func (h *testSyncHooks) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) { |
| ctx, cancel := context.WithCancel(ctx) |
| t := h.afterFunc(d, cancel) |
| return ctx, func() { |
| t.Stop() |
| cancel() |
| } |
| } |
| |
| func (h *testSyncHooks) timeUntilEvent() time.Duration { |
| h.lock() |
| defer h.unlock() |
| var next time.Time |
| for _, t := range h.timers { |
| if next.IsZero() || t.when.Before(next) { |
| next = t.when |
| } |
| } |
| if d := next.Sub(h.now); d > 0 { |
| return d |
| } |
| return 0 |
| } |
| |
| // advance advances time and causes synthetic timers to fire. |
| func (h *testSyncHooks) advance(d time.Duration) { |
| h.lock() |
| defer h.unlock() |
| h.now = h.now.Add(d) |
| timers := h.timers[:0] |
| for _, t := range h.timers { |
| t := t // remove after go.mod depends on go1.22 |
| t.mu.Lock() |
| switch { |
| case t.when.After(h.now): |
| timers = append(timers, t) |
| case t.when.IsZero(): |
| // stopped timer |
| default: |
| t.when = time.Time{} |
| if t.c != nil { |
| close(t.c) |
| } |
| if t.f != nil { |
| h.total++ |
| go func() { |
| defer func() { |
| h.lock() |
| h.total-- |
| h.unlock() |
| }() |
| t.f() |
| }() |
| } |
| } |
| t.mu.Unlock() |
| } |
| h.timers = timers |
| } |
| |
| // A timer wraps a time.Timer, or a synthetic equivalent in tests. |
| // Unlike time.Timer, timer is single-use: The timer channel is closed when the timer expires. |
| type timer interface { |
| C() <-chan time.Time |
| Stop() bool |
| Reset(d time.Duration) bool |
| } |
| |
| // timeTimer implements timer using real time. |
| type timeTimer struct { |
| t *time.Timer |
| c chan time.Time |
| } |
| |
| // newTimeTimer creates a new timer using real time. |
| func newTimeTimer(d time.Duration) timer { |
| ch := make(chan time.Time) |
| t := time.AfterFunc(d, func() { |
| close(ch) |
| }) |
| return &timeTimer{t, ch} |
| } |
| |
| // newTimeAfterFunc creates an AfterFunc timer using real time. |
| func newTimeAfterFunc(d time.Duration, f func()) timer { |
| return &timeTimer{ |
| t: time.AfterFunc(d, f), |
| } |
| } |
| |
| func (t timeTimer) C() <-chan time.Time { return t.c } |
| func (t timeTimer) Stop() bool { return t.t.Stop() } |
| func (t timeTimer) Reset(d time.Duration) bool { return t.t.Reset(d) } |
| |
| // fakeTimer implements timer using fake time. |
| type fakeTimer struct { |
| hooks *testSyncHooks |
| |
| mu sync.Mutex |
| when time.Time // when the timer will fire |
| c chan time.Time // closed when the timer fires; mutually exclusive with f |
| f func() // called when the timer fires; mutually exclusive with c |
| } |
| |
| func (t *fakeTimer) C() <-chan time.Time { return t.c } |
| |
| func (t *fakeTimer) Stop() bool { |
| t.mu.Lock() |
| defer t.mu.Unlock() |
| stopped := t.when.IsZero() |
| t.when = time.Time{} |
| return stopped |
| } |
| |
| func (t *fakeTimer) Reset(d time.Duration) bool { |
| if t.c != nil || t.f == nil { |
| panic("fakeTimer only supports Reset on AfterFunc timers") |
| } |
| t.mu.Lock() |
| defer t.mu.Unlock() |
| t.hooks.lock() |
| defer t.hooks.unlock() |
| active := !t.when.IsZero() |
| t.when = t.hooks.now.Add(d) |
| if !active { |
| t.hooks.timers = append(t.hooks.timers, t) |
| } |
| return active |
| } |