| // Copyright 2026 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Package inmemqueue provides an in-memory queue implementation that can be |
| // used for scheduling of fetch actions. |
| package inmemqueue |
| |
| import ( |
| "context" |
| "fmt" |
| "time" |
| |
| "golang.org/x/pkgsite/internal" |
| "golang.org/x/pkgsite/internal/experiment" |
| "golang.org/x/pkgsite/internal/log" |
| "golang.org/x/pkgsite/internal/queue" |
| ) |
| |
| // InMemory is a Queue implementation that schedules in-process fetch |
| // operations. Unlike the GCP task queue, it will not automatically retry tasks |
| // on failure. |
| // |
| // This should only be used for local development. |
| type InMemory struct { |
| queue chan internal.Modver |
| done chan struct{} |
| experiments []string |
| } |
| |
// InMemoryProcessFunc is the callback an InMemory queue invokes for every
// scheduled task. It receives a context together with the module path and
// version that were given to ScheduleFetch. The queue logs a non-nil error and
// discards the int result.
type InMemoryProcessFunc func(ctx context.Context, modulePath, version string) (int, error)
| |
| // New creates a new InMemory that asynchronously fetches from proxyClient and |
| // stores in db. It uses workerCount parallelism to execute these fetches. |
| func New(ctx context.Context, workerCount int, experiments []string, processFunc InMemoryProcessFunc) *InMemory { |
| q := &InMemory{ |
| queue: make(chan internal.Modver, 1000), |
| experiments: experiments, |
| done: make(chan struct{}), |
| } |
| sem := make(chan struct{}, workerCount) |
| go func() { |
| for v := range q.queue { |
| select { |
| case <-ctx.Done(): |
| return |
| case sem <- struct{}{}: |
| } |
| |
| // If a worker is available, make a request to the fetch service inside a |
| // goroutine and wait for it to finish. |
| go func(v internal.Modver) { |
| defer func() { <-sem }() |
| |
| log.Infof(ctx, "Fetch requested: %s (workerCount = %d)", v, cap(sem)) |
| |
| fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) |
| fetchCtx = experiment.NewContext(fetchCtx, experiments...) |
| defer cancel() |
| |
| if _, err := processFunc(fetchCtx, v.Path, v.Version); err != nil { |
| log.Error(fetchCtx, err) |
| } |
| }(v) |
| } |
| for i := 0; i < cap(sem); i++ { |
| select { |
| case <-ctx.Done(): |
| panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err())) |
| case sem <- struct{}{}: |
| } |
| } |
| close(q.done) |
| }() |
| return q |
| } |
| |
| // ScheduleFetch pushes a fetch task into the local queue to be processed |
| // asynchronously. |
| func (q *InMemory) ScheduleFetch(ctx context.Context, modulePath, version string, _ *queue.Options) (bool, error) { |
| q.queue <- internal.Modver{Path: modulePath, Version: version} |
| return true, nil |
| } |
| |
| // WaitForTesting waits for all queued requests to finish. It should only be |
| // used by test code. |
| func (q *InMemory) WaitForTesting(ctx context.Context) { |
| close(q.queue) |
| <-q.done |
| } |