// Copyright 2021 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package poll_test

import (
	"internal/poll"
	"runtime"
	"sync"
	"sync/atomic"
	"testing"
	"time"
)

var closeHook atomic.Value // func(fd int)

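// init wraps poll.CloseFunc so that any hook installed via closeHook is
// invoked with the file descriptor being closed before the real close runs,
// letting tests observe when pipe descriptors are released.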
func init() {
	closeFunc := poll.CloseFunc
	poll.CloseFunc = func(fd int) (err error) {
		if v := closeHook.Load(); v != nil {
			if hook := v.(func(int)); hook != nil {
				hook(fd)
			}
		}
		return closeFunc(fd)
	}
}

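// TestSplicePipePool checks that pipes returned to the splice pipe pool are
// eventually closed after garbage collection, i.e. that the pool does not
// leak pipe file descriptors.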
func TestSplicePipePool(t *testing.T) {
	if runtime.Compiler == "gccgo" {
		t.Skip("gofrontend conservative stack collection causes this test to fail")
	}

	const N = 64
	var (
		p          *poll.SplicePipe
		ps         []*poll.SplicePipe
		allFDs     []int
		pendingFDs sync.Map // fd → struct{}{}
		err        error
	)

	closeHook.Store(func(fd int) { pendingFDs.Delete(fd) })
	t.Cleanup(func() { closeHook.Store((func(int))(nil)) })

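	// Acquire N pipes from the pool and record their write-end descriptors;
	// the close hook removes each descriptor from pendingFDs once it is closed.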
	for i := 0; i < N; i++ {
		p, _, err = poll.GetPipe()
		if err != nil {
			t.Skipf("failed to create pipe due to error(%v), skip this test", err)
		}
		_, pwfd := poll.GetPipeFds(p)
		allFDs = append(allFDs, pwfd)
		pendingFDs.Store(pwfd, struct{}{})
		ps = append(ps, p)
	}
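	// Return the pipes to the pool and drop all references so the garbage
	// collector can reclaim them and close the underlying descriptors.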
	for _, p = range ps {
		poll.PutPipe(p)
	}
	ps = nil
	p = nil

	// Exploit the timeout of "go test" as a timer for the subsequent verification.
	timeout := 5 * time.Minute
	if deadline, ok := t.Deadline(); ok {
		timeout = deadline.Sub(time.Now())
		timeout -= timeout / 10 // Leave 10% headroom for cleanup.
	}
	expiredTime := time.NewTimer(timeout)
	defer expiredTime.Stop()

	// Trigger garbage collection repeatedly, waiting for all pipes in sync.Pool
	// to either be deallocated and closed, or to time out.
	for {
		runtime.GC()
		time.Sleep(10 * time.Millisecond)

		// Detect whether all pipes are closed properly.
		var leakedFDs []int
		pendingFDs.Range(func(k, v any) bool {
			leakedFDs = append(leakedFDs, k.(int))
			return true
		})
		if len(leakedFDs) == 0 {
			break
		}

		select {
		case <-expiredTime.C:
			t.Logf("all descriptors: %v", allFDs)
			t.Fatalf("leaked descriptors: %v", leakedFDs)
		default:
		}
	}
}

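// BenchmarkSplicePipe compares acquiring and releasing a pipe through the
// pool with creating and destroying one directly.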
func BenchmarkSplicePipe(b *testing.B) {
	b.Run("SplicePipeWithPool", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			p, _, err := poll.GetPipe()
			if err != nil {
				continue
			}
			poll.PutPipe(p)
		}
	})
	b.Run("SplicePipeWithoutPool", func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			p := poll.NewPipe()
			if p == nil {
				b.Skip("newPipe returned nil")
			}
			poll.DestroyPipe(p)
		}
	})
}

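// BenchmarkSplicePipePoolParallel measures pooled pipe acquisition and release
// under concurrent load.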
func BenchmarkSplicePipePoolParallel(b *testing.B) {
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			p, _, err := poll.GetPipe()
			if err != nil {
				continue
			}
			poll.PutPipe(p)
		}
	})
}

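// BenchmarkSplicePipeNativeParallel measures direct pipe creation and
// destruction under concurrent load, bypassing the pool.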
func BenchmarkSplicePipeNativeParallel(b *testing.B) {
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			p := poll.NewPipe()
			if p == nil {
				b.Skip("newPipe returned nil")
			}
			poll.DestroyPipe(p)
		}
	})
}