blob: 49905315a01b4c0b79ca0882b650c28d5736b255 [file] [log] [blame] [edit]
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a MIT
// license that can be found in the LICENSE file.
package main
import (
"os"
"runtime"
"runtime/pprof"
"sync"
"time"
)
func init() {
register("Serving2137", Serving2137)
}
type token_serving2137 struct{}
type request_serving2137 struct {
lock *sync.Mutex
accepted chan bool
}
type Breaker_serving2137 struct {
pendingRequests chan token_serving2137
activeRequests chan token_serving2137
}
func (b *Breaker_serving2137) Maybe(thunk func()) bool {
var t token_serving2137
select {
default:
// Pending request queue is full. Report failure.
return false
case b.pendingRequests <- t:
// Pending request has capacity.
// Wait for capacity in the active queue.
b.activeRequests <- t
// Defer releasing capacity in the active and pending request queue.
defer func() {
<-b.activeRequests
runtime.Gosched()
<-b.pendingRequests
}()
// Do the thing.
thunk()
// Report success
return true
}
}
func (b *Breaker_serving2137) concurrentRequest() request_serving2137 {
r := request_serving2137{lock: &sync.Mutex{}, accepted: make(chan bool, 1)}
r.lock.Lock()
var start sync.WaitGroup
start.Add(1)
go func() { // G2, G3
start.Done()
runtime.Gosched()
ok := b.Maybe(func() {
// Will block on locked mutex.
r.lock.Lock()
runtime.Gosched()
r.lock.Unlock()
})
r.accepted <- ok
}()
start.Wait() // Ensure that the go func has had a chance to execute.
return r
}
// Perform n requests against the breaker, returning mutexes for each
// request which succeeded, and a slice of bools for all requests.
func (b *Breaker_serving2137) concurrentRequests(n int) []request_serving2137 {
requests := make([]request_serving2137, n)
for i := range requests {
requests[i] = b.concurrentRequest()
}
return requests
}
func NewBreaker_serving2137(queueDepth, maxConcurrency int32) *Breaker_serving2137 {
return &Breaker_serving2137{
pendingRequests: make(chan token_serving2137, queueDepth+maxConcurrency),
activeRequests: make(chan token_serving2137, maxConcurrency),
}
}
func unlock_serving2137(req request_serving2137) {
req.lock.Unlock()
runtime.Gosched()
// Verify that function has completed
ok := <-req.accepted
runtime.Gosched()
// Requeue for next usage
req.accepted <- ok
}
func unlockAll_serving2137(requests []request_serving2137) {
for _, lc := range requests {
unlock_serving2137(lc)
}
}
func Serving2137() {
prof := pprof.Lookup("goroutineleak")
defer func() {
time.Sleep(100 * time.Millisecond)
prof.WriteTo(os.Stdout, 2)
}()
for i := 0; i < 1000; i++ {
go func() {
b := NewBreaker_serving2137(1, 1)
locks := b.concurrentRequests(2) // G1
unlockAll_serving2137(locks)
}()
}
}