blob: 00b580c78f1c16e898082ed9e33d43329b378eef [file] [log] [blame]
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package queue provides a queue interface that can be used for
// asynchronous scheduling of fetch actions.
package queue
import (
"context"
"fmt"
"time"
"golang.org/x/pkgsite/internal"
"golang.org/x/pkgsite/internal/experiment"
"golang.org/x/pkgsite/internal/log"
)
// A Queue provides an interface for asynchronous scheduling of fetch actions.
type Queue interface {
// ScheduleFetch requests that the module at modulePath and version be
// fetched asynchronously; opts carries optional settings for the task.
// NOTE(review): the meaning of the boolean result is not defined in this
// file; it presumably reports whether a task was enqueued. Confirm
// against the implementations.
ScheduleFetch(ctx context.Context, modulePath, version string, opts *Options) (bool, error)
}
// Options is used to provide option arguments for a task queue.
// The InMemory queue in this file ignores its Options argument.
type Options struct {
// DisableProxyFetch reports whether proxyfetch should be set to off when
// making a fetch request.
DisableProxyFetch bool
// Suffix is used to force reprocessing of tasks that would normally be
// de-duplicated. It is appended to the task name.
Suffix string
// Source is the source that requested the task to be queued. It is
// either "frontend" or the empty string if it is the worker.
// NOTE(review): a SourceWorkerValue constant ("worker") is also declared
// in this file; confirm whether the worker sends that value or the empty
// string.
Source string
}
// Parameter names and values associated with fetch requests.
// NOTE(review): these are presumably attached to the request a queued task
// makes; the code that consumes them is not in this file, so confirm there.
const (
// DisableProxyFetchParam and DisableProxyFetchValue are the parameter
// name and value that correspond to Options.DisableProxyFetch, i.e.
// setting proxyfetch to off.
DisableProxyFetchParam = "proxyfetch"
DisableProxyFetchValue = "off"
// SourceParam is the parameter name that corresponds to Options.Source;
// SourceFrontendValue and SourceWorkerValue are values for it.
SourceParam = "source"
SourceFrontendValue = "frontend"
SourceWorkerValue = "worker"
)
// InMemory is a Queue implementation that schedules in-process fetch
// operations. Unlike the GCP task queue, it will not automatically retry tasks
// on failure.
//
// This should only be used for local development.
type InMemory struct {
// queue holds the module versions waiting to be processed. ScheduleFetch
// sends on it and WaitForTesting closes it.
queue chan internal.Modver
// done is closed by the goroutine started in NewInMemory to tell
// WaitForTesting that processing is complete.
done chan struct{}
// experiments is the list passed to NewInMemory. It is stored here but
// not read anywhere else in this file.
experiments []string
}
// InMemoryProcessFunc is the function an InMemory queue runs for each
// scheduled task. NewInMemory calls it with the task's context, module path
// and version, logs a non-nil error, and discards the int result.
// NOTE(review): the int is presumably a status code; confirm with the
// functions passed in by callers.
type InMemoryProcessFunc func(context.Context, string, string) (int, error)
// NewInMemory creates a new InMemory queue that runs processFunc for every
// scheduled module version, with at most workerCount calls in flight at once.
// A workerCount below 1 is treated as 1, because a semaphore without capacity
// could never admit a task.
//
// Each call to processFunc receives a context derived from ctx that carries
// the given experiments and expires after five minutes. An error returned by
// processFunc is logged and otherwise ignored; tasks are never retried.
//
// Cancelling ctx stops the dispatcher: tasks still waiting in the queue are
// dropped and the queue reports itself as done so that WaitForTesting does not
// block forever. If ctx is done while the dispatcher is waiting for the last
// workers after the queue has been closed, the dispatcher panics.
func NewInMemory(ctx context.Context, workerCount int, experiments []string, processFunc InMemoryProcessFunc) *InMemory {
	if workerCount < 1 {
		workerCount = 1
	}
	q := &InMemory{
		queue:       make(chan internal.Modver, 1000),
		experiments: experiments,
		done:        make(chan struct{}),
	}
	// sem is a counting semaphore: a slot is taken before a task starts and
	// released when the task finishes.
	sem := make(chan struct{}, workerCount)
	go func() {
		// Close done on every exit path, including cancellation, so a
		// waiter is never left blocked on a dispatcher that has gone away.
		defer close(q.done)
		for v := range q.queue {
			// Wait for a free worker slot, giving up if ctx is done.
			select {
			case <-ctx.Done():
				return
			case sem <- struct{}{}:
			}
			// A slot is held: process the task in its own goroutine and
			// release the slot when it returns.
			go func(v internal.Modver) {
				defer func() { <-sem }()
				log.Infof(ctx, "Fetch requested: %s (workerCount = %d)", v, cap(sem))
				fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
				defer cancel()
				fetchCtx = experiment.NewContext(fetchCtx, experiments...)
				if _, err := processFunc(fetchCtx, v.Path, v.Version); err != nil {
					log.Error(fetchCtx, err)
				}
			}(v)
		}
		// The queue has been closed and emptied. Acquiring every slot
		// succeeds only once all running tasks have released theirs.
		for i := 0; i < cap(sem); i++ {
			select {
			case <-ctx.Done():
				panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err()))
			case sem <- struct{}{}:
			}
		}
	}()
	return q
}
// ScheduleFetch pushes a fetch task for modulePath at version into the local
// queue to be processed asynchronously. The options argument is ignored.
//
// It returns true and a nil error once the task has been added. If the
// queue's buffer is full, it waits for room until ctx is done, in which case
// it returns false and ctx.Err(). A context that is already done does not
// prevent a task from being added when there is room.
//
// ScheduleFetch must not be called after WaitForTesting, because sending on
// the closed queue panics.
func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version string, _ *Options) (bool, error) {
	mv := internal.Modver{Path: modulePath, Version: version}
	// Fast path: when the buffer has room, enqueue without consulting ctx so
	// that the non-full case behaves exactly as an unconditional send.
	select {
	case q.queue <- mv:
		return true, nil
	default:
	}
	// The buffer is full: wait for room, but let the caller give up.
	select {
	case q.queue <- mv:
		return true, nil
	case <-ctx.Done():
		return false, ctx.Err()
	}
}
// WaitForTesting waits for all queued requests to finish. It should only be
// used by test code.
//
// It closes the queue, so it must be called at most once and no
// ScheduleFetch call may follow it: closing a closed channel and sending on
// a closed channel both panic. The ctx argument is currently unused.
func (q *InMemory) WaitForTesting(ctx context.Context) {
// Closing the queue lets the goroutine started by NewInMemory leave its
// receive loop once the tasks already buffered have been handed out.
close(q.queue)
// Block until that goroutine closes done.
<-q.done
}