blob: 8a5099b20a6a6a1e5c34e5055bac3a90cbe9bc6b [file] [log] [blame]
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package queue provides queue interface and an in-memory
// implementation that can be used for asynchronous scheduling
// of fetch actions.
package queue
import (
	"context"
	"fmt"
	"sync"
	"time"
)
// A Task can produce information needed for Cloud Tasks.
// A Task can produce information needed for Cloud Tasks.
//
// The three accessors together describe the HTTP request that a queue
// implementation issues to process the task.
type Task interface {
	Name() string   // Human-readable string for the task. Need not be unique.
	Path() string   // URL path
	Params() string // URL (escaped) query params
}
// A Queue provides an interface for asynchronous scheduling of tasks.
// A Queue provides an interface for asynchronous scheduling of tasks.
type Queue interface {
	// Enqueue enqueues a task request.
	// It reports whether a new task was actually added to the queue.
	// (Implementations may de-duplicate tasks; see Options.TaskNameSuffix
	// for a way to force reprocessing.)
	Enqueue(context.Context, Task, *Options) (bool, error)
}
// Options is used to provide option arguments for a task queue.
type Options struct {
// TaskNameSuffix is appended to the task name to force reprocessing of
// tasks that would normally be de-duplicated.
TaskNameSuffix string
}
// Metadata is data needed to create a Cloud Task Queue.
type Metadata struct {
Project string // Name of the gaby project.
Location string // Location of the queue (e.g., us-central1).
QueueName string // Unique ID of the queue.
QueueURL string // URL of the Cloud Run service.
ServiceAccount string // Email of the service account associated with the project.
}
// InMemory is a Queue implementation that schedules in-process fetch
// operations. Unlike the GCP task queue, it will not automatically
// retry tasks on failure.
//
// This should only be used for local development and testing.
// InMemory is a Queue implementation that schedules in-process fetch
// operations. Unlike the GCP task queue, it will not automatically
// retry tasks on failure.
//
// This should only be used for local development and testing.
type InMemory struct {
	queue chan Task     // buffered channel of pending tasks; closed by Wait.
	done  chan struct{} // closed by the dispatcher once all workers finish.
	errs  []error       // errors returned by processFunc; read via Errors after Wait.
}
// NewInMemory creates a new InMemory that asynchronously schedules
// tasks and executes processFunc on them. It uses workerCount parallelism
// to accomplish this.
// NewInMemory creates a new InMemory that asynchronously schedules
// tasks and executes processFunc on them. It uses workerCount parallelism
// to accomplish this.
//
// Each task runs with a 5-minute timeout derived from ctx. Errors
// returned by processFunc are collected and can be retrieved with
// Errors after Wait returns.
func NewInMemory(ctx context.Context, workerCount int, processFunc func(context.Context, Task) error) *InMemory {
	q := &InMemory{
		queue: make(chan Task, 1000),
		done:  make(chan struct{}),
	}
	// errsMu guards q.errs: up to workerCount goroutines may record a
	// processFunc failure concurrently. (The original appended without
	// synchronization, which is a data race under -race.)
	var errsMu sync.Mutex
	// sem is a counting semaphore bounding in-flight workers.
	sem := make(chan struct{}, workerCount)
	go func() {
		for v := range q.queue {
			select {
			case <-ctx.Done():
				// NOTE: returning here leaves q.done open, so a
				// subsequent Wait will block; callers are expected not
				// to Wait on a cancelled context.
				return
			case sem <- struct{}{}:
			}
			// If a worker is available, spawn a task in a
			// goroutine and wait for it to finish.
			go func(t Task) {
				defer func() { <-sem }()
				fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
				defer cancel()
				if err := processFunc(fetchCtx, t); err != nil {
					errsMu.Lock()
					q.errs = append(q.errs, err)
					errsMu.Unlock()
				}
			}(v)
		}
		// The queue was closed (by Wait). Acquire every semaphore slot:
		// once we hold all of them, no worker goroutine is still running.
		for i := 0; i < cap(sem); i++ {
			select {
			case <-ctx.Done():
				// If context is cancelled here, there is no way for us to
				// do cleanup. We panic here since there is no other way to
				// report an error.
				panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err()))
			case sem <- struct{}{}:
			}
		}
		close(q.done)
	}()
	return q
}
// Enqueue pushes a scan task into the local queue to be processed
// asynchronously.
// Enqueue pushes a scan task into the local queue to be processed
// asynchronously. It reports (true, nil) once the task is queued, or
// (false, ctx.Err()) if ctx is cancelled before space is available.
//
// Enqueue must not be called after Wait: the queue channel is closed
// by Wait and sending on it would panic.
func (q *InMemory) Enqueue(ctx context.Context, task Task, _ *Options) (bool, error) {
	// The original sent unconditionally, which blocks forever (ignoring
	// ctx) when the 1000-element buffer is full. Respect cancellation.
	select {
	case q.queue <- task:
		return true, nil
	case <-ctx.Done():
		return false, ctx.Err()
	}
}
// Wait waits for all queued requests to finish.
func (q *InMemory) Wait(ctx context.Context) {
close(q.queue)
<-q.done
}
// Errors returns the errors encountered while processing tasks.
//
// It must only be called after Wait returns; until then workers may
// still be appending to the slice. The returned slice is the internal
// one, not a copy.
func (q *InMemory) Errors() []error {
	return q.errs
}