blob: f6aa8b9ddfe178e5d64d6a7f4620f604d2bf58b3 [file] [log] [blame]
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
/*
* Project: kubernetes
* Issue or PR : https://github.com/kubernetes/kubernetes/pull/13135
* Buggy version: 6ced66249d4fd2a81e86b4a71d8df0139fe5ceae
* fix commit-id: a12b7edc42c5c06a2e7d9f381975658692951d5a
* Flaky: 93/100
*/
package main
import (
"os"
"runtime/pprof"
"sync"
"time"
)
func init() {
register("Kubernetes13135", Kubernetes13135)
}
var (
StopChannel_kubernetes13135 chan struct{}
)
func Util_kubernetes13135(f func(), period time.Duration, stopCh <-chan struct{}) {
for {
select {
case <-stopCh:
return
default:
}
func() {
f()
}()
time.Sleep(period)
}
}
type Store_kubernetes13135 interface {
Add(obj interface{})
Replace(obj interface{})
}
type Reflector_kubernetes13135 struct {
store Store_kubernetes13135
}
func (r *Reflector_kubernetes13135) ListAndWatch(stopCh <-chan struct{}) error {
r.syncWith()
return nil
}
func NewReflector_kubernetes13135(store Store_kubernetes13135) *Reflector_kubernetes13135 {
return &Reflector_kubernetes13135{
store: store,
}
}
func (r *Reflector_kubernetes13135) syncWith() {
r.store.Replace(nil)
}
type Cacher_kubernetes13135 struct {
sync.Mutex
initialized sync.WaitGroup
initOnce sync.Once
watchCache *WatchCache_kubernetes13135
reflector *Reflector_kubernetes13135
}
func (c *Cacher_kubernetes13135) processEvent() {
c.Lock()
defer c.Unlock()
}
func (c *Cacher_kubernetes13135) startCaching(stopChannel <-chan struct{}) {
c.Lock()
for {
err := c.reflector.ListAndWatch(stopChannel)
if err == nil {
break
}
}
}
type WatchCache_kubernetes13135 struct {
sync.RWMutex
onReplace func()
onEvent func()
}
func (w *WatchCache_kubernetes13135) SetOnEvent(onEvent func()) {
w.Lock()
defer w.Unlock()
w.onEvent = onEvent
}
func (w *WatchCache_kubernetes13135) SetOnReplace(onReplace func()) {
w.Lock()
defer w.Unlock()
w.onReplace = onReplace
}
func (w *WatchCache_kubernetes13135) processEvent() {
w.Lock()
defer w.Unlock()
if w.onEvent != nil {
w.onEvent()
}
}
func (w *WatchCache_kubernetes13135) Add(obj interface{}) {
w.processEvent()
}
func (w *WatchCache_kubernetes13135) Replace(obj interface{}) {
w.Lock()
defer w.Unlock()
if w.onReplace != nil {
w.onReplace()
}
}
func NewCacher_kubernetes13135(stopCh <-chan struct{}) *Cacher_kubernetes13135 {
watchCache := &WatchCache_kubernetes13135{}
cacher := &Cacher_kubernetes13135{
initialized: sync.WaitGroup{},
watchCache: watchCache,
reflector: NewReflector_kubernetes13135(watchCache),
}
cacher.initialized.Add(1)
watchCache.SetOnReplace(func() {
cacher.initOnce.Do(func() { cacher.initialized.Done() })
cacher.Unlock()
})
watchCache.SetOnEvent(cacher.processEvent)
go Util_kubernetes13135(func() { cacher.startCaching(stopCh) }, 0, stopCh) // G2
cacher.initialized.Wait()
return cacher
}
func Kubernetes13135() {
prof := pprof.Lookup("goroutineleak")
defer func() {
time.Sleep(100 * time.Millisecond)
prof.WriteTo(os.Stdout, 2)
}()
StopChannel_kubernetes13135 = make(chan struct{})
for i := 0; i < 50; i++ {
go func() {
// Should create a local channel. Using a single global channel
// concurrently will cause a deadlock which does not actually exist
// in the original microbenchmark.
StopChannel_kubernetes13135 := make(chan struct{})
c := NewCacher_kubernetes13135(StopChannel_kubernetes13135) // G1
go c.watchCache.Add(nil) // G3
go close(StopChannel_kubernetes13135)
}()
}
}