// blob: 29bcaab4140f7c5999c6597e464a51d146975e0a [file] [log] [blame]
// Copyright 2021 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package poll_test
import (
"internal/poll"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
)
// closeHook optionally holds a func(fd int). When a non-nil function is
// stored, the poll.CloseFunc wrapper installed by init calls it with each
// descriptor before delegating to the original close function.
var closeHook atomic.Value // func(fd int)

// init replaces poll.CloseFunc with a wrapper that first reports the
// descriptor to closeHook (if a hook is installed) and then calls the
// close function that was in place before.
func init() {
	origClose := poll.CloseFunc

	// notify runs the currently installed hook, if there is one.
	notify := func(fd int) {
		stored := closeHook.Load()
		if stored == nil {
			// Nothing has ever been stored.
			return
		}
		// A typed nil func may be stored to disable the hook.
		if hook := stored.(func(int)); hook != nil {
			hook(fd)
		}
	}

	poll.CloseFunc = func(fd int) error {
		notify(fd)
		return origClose(fd)
	}
}
// TestSplicePipePool checks that pipes handed back with poll.PutPipe are
// eventually closed. It obtains N pipes, records the write-end descriptor of
// each one, returns them all, and then forces garbage collection repeatedly
// until the close hook has observed every recorded descriptor or the time
// budget is exhausted, in which case the remaining descriptors are reported
// as leaked.
func TestSplicePipePool(t *testing.T) {
	const N = 64
	var (
		p          *poll.SplicePipe
		ps         []*poll.SplicePipe
		allFDs     []int
		pendingFDs sync.Map // fd → struct{}{}
		err        error
	)

	// Each close removes its descriptor from the pending set; the hook is
	// reset to a nil func when the test ends.
	closeHook.Store(func(fd int) { pendingFDs.Delete(fd) })
	t.Cleanup(func() { closeHook.Store((func(int))(nil)) })

	for i := 0; i < N; i++ {
		p, _, err = poll.GetPipe()
		if err != nil {
			t.Skipf("failed to create pipe due to error(%v), skip this test", err)
		}
		_, pwfd := poll.GetPipeFds(p)
		allFDs = append(allFDs, pwfd)
		pendingFDs.Store(pwfd, struct{}{})
		ps = append(ps, p)
	}
	for _, p = range ps {
		poll.PutPipe(p)
	}
	// Drop this function's own references to the pipes.
	ps = nil
	p = nil

	// Exploit the timeout of "go test" as a timer for the subsequent verification.
	timeout := 5 * time.Minute
	if deadline, ok := t.Deadline(); ok {
		timeout = time.Until(deadline)
		timeout -= timeout / 10 // Leave 10% headroom for cleanup.
	}
	expiredTime := time.NewTimer(timeout)
	defer expiredTime.Stop()

	// Trigger garbage collection repeatedly, waiting for all pipes in sync.Pool
	// to either be deallocated and closed, or to time out.
	for {
		runtime.GC()
		time.Sleep(10 * time.Millisecond)

		// Detect whether all pipes are closed properly.
		var leakedFDs []int
		pendingFDs.Range(func(k, v any) bool {
			leakedFDs = append(leakedFDs, k.(int))
			return true
		})
		if len(leakedFDs) == 0 {
			break
		}

		// Non-blocking check of the timer: fail only once the budget is
		// spent, otherwise go around for another GC cycle.
		select {
		case <-expiredTime.C:
			t.Logf("all descriptors: %v", allFDs)
			t.Fatalf("leaked descriptors: %v", leakedFDs)
		default:
		}
	}
}
// BenchmarkSplicePipe runs two sequential sub-benchmarks: one that obtains
// and returns a pipe through poll.GetPipe/poll.PutPipe, and one that creates
// and destroys a pipe directly through poll.NewPipe/poll.DestroyPipe.
func BenchmarkSplicePipe(b *testing.B) {
	// pooled takes a pipe and hands it straight back; iterations on which
	// GetPipe reports an error do nothing further.
	pooled := func(b *testing.B) {
		for iter := 0; iter < b.N; iter++ {
			if pipe, _, err := poll.GetPipe(); err == nil {
				poll.PutPipe(pipe)
			}
		}
	}

	// unpooled creates a fresh pipe each iteration and destroys it, skipping
	// the sub-benchmark if no pipe could be created.
	unpooled := func(b *testing.B) {
		for iter := 0; iter < b.N; iter++ {
			pipe := poll.NewPipe()
			if pipe == nil {
				b.Skip("newPipe returned nil")
			}
			poll.DestroyPipe(pipe)
		}
	}

	b.Run("SplicePipeWithPool", pooled)
	b.Run("SplicePipeWithoutPool", unpooled)
}
// BenchmarkSplicePipePoolParallel measures obtaining a pipe with poll.GetPipe
// and returning it with poll.PutPipe from the goroutines started by
// b.RunParallel. Iterations on which GetPipe reports an error do nothing
// further.
func BenchmarkSplicePipePoolParallel(b *testing.B) {
	worker := func(pb *testing.PB) {
		for pb.Next() {
			if pipe, _, err := poll.GetPipe(); err == nil {
				poll.PutPipe(pipe)
			}
		}
	}
	b.RunParallel(worker)
}
// BenchmarkSplicePipeNativeParallel measures creating a pipe with
// poll.NewPipe and destroying it with poll.DestroyPipe, without going through
// the pool, from the goroutines started by b.RunParallel. The benchmark is
// skipped if any worker fails to create a pipe.
func BenchmarkSplicePipeNativeParallel(b *testing.B) {
	// Skip must be called from the goroutine running the benchmark, not from
	// the workers RunParallel starts. A worker therefore only records the
	// failure and stops; the skip happens after RunParallel has returned.
	var newPipeFailed int32
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			p := poll.NewPipe()
			if p == nil {
				atomic.StoreInt32(&newPipeFailed, 1)
				return
			}
			poll.DestroyPipe(p)
		}
	})
	if atomic.LoadInt32(&newPipeFailed) != 0 {
		b.Skip("newPipe returned nil")
	}
}