blob: d525e03a03c6b3f316437f7b1a6b16e835969846 [file] [log] [blame]
// Copyright 2022 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build linux || darwin
package queue
import (
"container/heap"
"context"
"sort"
"sync"
)
// NewQuota returns an initialized *Quota ready for use.
func NewQuota() *Quota {
return &Quota{
queue: new(buildletQueue),
}
}
// Quota manages a queue for a single quota.
type Quota struct {
mu sync.Mutex
queue *buildletQueue
limit int
used int
// On GCE, other instances run in the same project as buildlet
// instances. Track those separately, and subtract from available.
untrackedUsed int
}
func (q *Quota) push(item *Item) {
defer q.updated()
q.mu.Lock()
defer q.mu.Unlock()
heap.Push(q.queue, item)
}
func (q *Quota) cancel(item *Item) {
defer q.updated()
q.mu.Lock()
defer q.mu.Unlock()
if item.index != -1 {
heap.Remove(q.queue, item.index)
}
}
func (q *Quota) updated() {
for {
if q.tryPop() == nil {
return
}
}
}
// tryPop returns a Item if quota is available and unblocks the
// AwaitQueue call.
func (q *Quota) tryPop() *Item {
q.mu.Lock()
defer q.mu.Unlock()
if !(q.queue.Len() != 0 && q.queue.Peek().cost <= q.limit-q.used-q.untrackedUsed) {
return nil
}
b := q.queue.PopBuildlet()
q.used += b.cost
b.ready()
return b
}
// Empty returns true when there are no items in the queue.
func (q *Quota) Empty() bool {
return q.Len() == 0
}
// Len returns the number of items in the queue.
func (q *Quota) Len() int {
q.mu.Lock()
defer q.mu.Unlock()
return q.queue.Len()
}
// UpdateQuotas updates the limit and used values on the queue.
func (q *Quota) UpdateQuotas(used, limit int) {
defer q.updated()
q.mu.Lock()
defer q.mu.Unlock()
q.limit = limit
q.used = used
}
// UpdateLimit updates the limit values on the queue.
func (q *Quota) UpdateLimit(limit int) {
defer q.updated()
q.mu.Lock()
defer q.mu.Unlock()
q.limit = limit
}
func (q *Quota) UpdateUntracked(n int) {
defer q.updated()
q.mu.Lock()
defer q.mu.Unlock()
q.untrackedUsed = n
}
// ReturnQuota decrements the used quota value by v.
func (q *Quota) ReturnQuota(v int) {
defer q.updated()
q.mu.Lock()
defer q.mu.Unlock()
q.used -= v
}
type Usage struct {
Used int
Limit int
UntrackedUsed int
}
// Quotas returns the used, limit, and untracked values for the queue.
func (q *Quota) Quotas() Usage {
q.mu.Lock()
defer q.mu.Unlock()
return Usage{
Used: q.used,
Limit: q.limit,
UntrackedUsed: q.untrackedUsed,
}
}
// Enqueue a build and return an Item. See Item's documentation for
// waiting and releasing quota.
func (q *Quota) Enqueue(cost int, si *SchedItem) *Item {
item := &Item{
cost: cost,
release: func() { q.ReturnQuota(cost) },
popped: make(chan struct{}),
build: si,
}
item.cancel = func() { q.cancel(item) }
q.push(item)
return item
}
// AwaitQueue enqueues a build and returns once the item is unblocked
// by quota, by order of minimum priority.
//
// If the provided context is cancelled before popping, the item is
// removed from the queue and an error is returned.
func (q *Quota) AwaitQueue(ctx context.Context, cost int, si *SchedItem) error {
if err := ctx.Err(); err != nil {
return ctx.Err()
}
return q.Enqueue(cost, si).Await(ctx)
}
type QuotaStats struct {
Usage
Items []ItemStats
}
type ItemStats struct {
Build *SchedItem
Cost int
}
func (q *Quota) ToExported() *QuotaStats {
q.mu.Lock()
qs := &QuotaStats{
Usage: Usage{
Used: q.used,
Limit: q.limit,
UntrackedUsed: q.untrackedUsed,
},
Items: make([]ItemStats, q.queue.Len()),
}
for i, item := range *q.queue {
qs.Items[i].Build = item.SchedItem()
qs.Items[i].Cost = item.cost
}
q.mu.Unlock()
sort.Slice(qs.Items, func(i, j int) bool {
return qs.Items[i].Build.Less(qs.Items[j].Build)
})
return qs
}
// An Item is something we manage in a priority buildletQueue.
type Item struct {
build *SchedItem
cancel func()
cost int
popped chan struct{}
release func()
// index is maintained by the heap.Interface methods.
index int
}
// SchedItem returns a copy of the SchedItem for a build.
func (i *Item) SchedItem() *SchedItem {
build := *i.build
return &build
}
// Await blocks until the Item holds the necessary quota amount, or the
// context is cancelled.
//
// On success, the caller must call ReturnQuota() to release the quota.
func (i *Item) Await(ctx context.Context) error {
if ctx.Err() != nil {
i.cancel()
i.ReturnQuota()
return ctx.Err()
}
select {
case <-ctx.Done():
i.cancel()
i.ReturnQuota()
return ctx.Err()
case <-i.popped:
return nil
}
}
// ReturnQuota returns quota to the Queue. ReturnQuota is a no-op if
// the item has never been popped.
func (i *Item) ReturnQuota() {
select {
case <-i.popped:
i.release()
default:
// We haven't been popped yet, nothing to release.
return
}
}
func (i *Item) ready() {
close(i.popped)
}
// A buildletQueue implements heap.Interface and holds Items.
type buildletQueue []*Item
func (q buildletQueue) Len() int { return len(q) }
func (q buildletQueue) Less(i, j int) bool {
return q[i].build.Less(q[j].build)
}
func (q buildletQueue) Swap(i, j int) {
q[i], q[j] = q[j], q[i]
q[i].index = i
q[j].index = j
}
func (q *buildletQueue) Push(x interface{}) {
n := len(*q)
item := x.(*Item)
item.index = n
*q = append(*q, item)
}
func (q *buildletQueue) Pop() interface{} {
old := *q
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // necessary to avoid races in (*Queue).cancel().
*q = old[0 : n-1]
return item
}
func (q *buildletQueue) PopBuildlet() *Item {
return heap.Pop(q).(*Item)
}
func (q buildletQueue) Peek() *Item {
return q[0]
}