// Copyright 2022 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package queue

import (
	"container/heap"
	"context"
	"sort"
	"sync"
)

// NewQuota returns an initialized *Quota ready for use.
func NewQuota() *Quota {
	return &Quota{
		queue: new(buildletQueue),
	}
}

// Quota manages a queue for a single quota.
type Quota struct {
	mu    sync.Mutex
	queue *buildletQueue
	limit int
	used  int
	// On GCE, other instances run in the same project as buildlet
	// instances. Track those separately, and subtract from available.
	untrackedUsed int
}

func (q *Quota) push(item *Item) {
	defer q.updated()
	q.mu.Lock()
	defer q.mu.Unlock()
	heap.Push(q.queue, item)
}

func (q *Quota) cancel(item *Item) {
	defer q.updated()
	q.mu.Lock()
	defer q.mu.Unlock()
	if item.index != -1 {
		heap.Remove(q.queue, item.index)
	}
}

func (q *Quota) updated() {
	for {
		if q.tryPop() == nil {
			return
		}
	}
}

// tryPop returns a Item if quota is available and unblocks the
// AwaitQueue call.
func (q *Quota) tryPop() *Item {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !(q.queue.Len() != 0 && q.queue.Peek().cost <= q.limit-q.used-q.untrackedUsed) {
		return nil
	}
	b := q.queue.PopBuildlet()
	q.used += b.cost
	b.ready()
	return b
}

// Empty returns true when there are no items in the queue.
func (q *Quota) Empty() bool {
	return q.Len() == 0
}

// Len returns the number of items in the queue.
func (q *Quota) Len() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.queue.Len()
}

// UpdateQuotas updates the limit and used values on the queue.
func (q *Quota) UpdateQuotas(used, limit int) {
	defer q.updated()
	q.mu.Lock()
	defer q.mu.Unlock()
	q.limit = limit
	q.used = used
}

// UpdateLimit updates the limit values on the queue.
func (q *Quota) UpdateLimit(limit int) {
	defer q.updated()
	q.mu.Lock()
	defer q.mu.Unlock()
	q.limit = limit
}

func (q *Quota) UpdateUntracked(n int) {
	defer q.updated()
	q.mu.Lock()
	defer q.mu.Unlock()
	q.untrackedUsed = n
}

// ReturnQuota decrements the used quota value by v.
func (q *Quota) ReturnQuota(v int) {
	defer q.updated()
	q.mu.Lock()
	defer q.mu.Unlock()
	q.used -= v
}

type Usage struct {
	Used          int
	Limit         int
	UntrackedUsed int
}

// Quotas returns the used, limit, and untracked values for the queue.
func (q *Quota) Quotas() Usage {
	q.mu.Lock()
	defer q.mu.Unlock()
	return Usage{
		Used:          q.used,
		Limit:         q.limit,
		UntrackedUsed: q.untrackedUsed,
	}
}

// Enqueue a build and return an Item. See Item's documentation for
// waiting and releasing quota.
func (q *Quota) Enqueue(cost int, si *SchedItem) *Item {
	item := &Item{
		cost:    cost,
		release: func() { q.ReturnQuota(cost) },
		popped:  make(chan struct{}),
		build:   si,
	}
	item.cancel = func() { q.cancel(item) }
	q.push(item)
	return item
}

// AwaitQueue enqueues a build and returns once the item is unblocked
// by quota, by order of minimum priority.
//
// If the provided context is cancelled before popping, the item is
// removed from the queue and an error is returned.
func (q *Quota) AwaitQueue(ctx context.Context, cost int, si *SchedItem) error {
	if err := ctx.Err(); err != nil {
		return ctx.Err()
	}
	return q.Enqueue(cost, si).Await(ctx)
}

type QuotaStats struct {
	Usage
	Items []ItemStats
}

type ItemStats struct {
	Build *SchedItem
	Cost  int
}

func (q *Quota) ToExported() *QuotaStats {
	q.mu.Lock()
	qs := &QuotaStats{
		Usage: Usage{
			Used:          q.used,
			Limit:         q.limit,
			UntrackedUsed: q.untrackedUsed,
		},
		Items: make([]ItemStats, q.queue.Len()),
	}
	for i, item := range *q.queue {
		qs.Items[i].Build = item.SchedItem()
		qs.Items[i].Cost = item.cost
	}
	q.mu.Unlock()

	sort.Slice(qs.Items, func(i, j int) bool {
		return qs.Items[i].Build.Less(qs.Items[j].Build)
	})
	return qs
}

// An Item is something we manage in a priority buildletQueue.
type Item struct {
	build   *SchedItem
	cancel  func()
	cost    int
	popped  chan struct{}
	release func()
	// index is maintained by the heap.Interface methods.
	index int
}

// SchedItem returns a copy of the SchedItem for a build.
func (i *Item) SchedItem() *SchedItem {
	build := *i.build
	return &build
}

// Await blocks until the Item holds the necessary quota amount, or the
// context is cancelled.
//
// On success, the caller must call ReturnQuota() to release the quota.
func (i *Item) Await(ctx context.Context) error {
	if ctx.Err() != nil {
		i.cancel()
		i.ReturnQuota()
		return ctx.Err()
	}
	select {
	case <-ctx.Done():
		i.cancel()
		i.ReturnQuota()
		return ctx.Err()
	case <-i.popped:
		return nil
	}
}

// ReturnQuota returns quota to the Queue. ReturnQuota is a no-op if
// the item has never been popped.
func (i *Item) ReturnQuota() {
	select {
	case <-i.popped:
		i.release()
	default:
		// We haven't been popped yet, nothing to release.
		return
	}
}

func (i *Item) ready() {
	close(i.popped)
}

// A buildletQueue implements heap.Interface and holds Items.
type buildletQueue []*Item

func (q buildletQueue) Len() int { return len(q) }

func (q buildletQueue) Less(i, j int) bool {
	return q[i].build.Less(q[j].build)
}

func (q buildletQueue) Swap(i, j int) {
	q[i], q[j] = q[j], q[i]
	q[i].index = i
	q[j].index = j
}

func (q *buildletQueue) Push(x interface{}) {
	n := len(*q)
	item := x.(*Item)
	item.index = n
	*q = append(*q, item)
}

func (q *buildletQueue) Pop() interface{} {
	old := *q
	n := len(old)
	item := old[n-1]
	old[n-1] = nil  // avoid memory leak
	item.index = -1 // necessary to avoid races in (*Queue).cancel().
	*q = old[0 : n-1]
	return item
}

func (q *buildletQueue) PopBuildlet() *Item {
	return heap.Pop(q).(*Item)
}

func (q buildletQueue) Peek() *Item {
	return q[0]
}
