blob: b1621435b009ca24e2584402e395c19c475c701d [file]
// Copyright 2026 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package pgqueue
import (
"context"
"errors"
"log"
"os"
"sync"
"testing"
"time"
_ "github.com/jackc/pgx/v4/stdlib"
"golang.org/x/pkgsite/internal/database"
"golang.org/x/pkgsite/internal/derrors"
"golang.org/x/pkgsite/internal/queue"
)
const testDBName = "discovery_pgqueue_test"
var testDB *database.DB
func TestMain(m *testing.M) {
if os.Getenv("GO_DISCOVERY_TESTDB") != "true" {
log.Printf("SKIPPING: GO_DISCOVERY_TESTDB is not set to true")
return
}
if err := database.CreateDBIfNotExists(testDBName); err != nil {
if errors.Is(err, derrors.NotFound) {
log.Printf("SKIPPING: could not connect to DB: %v", err)
return
}
log.Fatal(err)
}
db, err := database.Open("pgx", database.DBConnURI(testDBName), "test")
if err != nil {
log.Fatal(err)
}
testDB = db
os.Exit(m.Run())
}
func setup(t *testing.T) *Queue {
t.Helper()
ctx := context.Background()
if testDB == nil {
t.Skip("test database not available")
}
// Drop and recreate the table for a clean slate.
if _, err := testDB.Exec(ctx, `DROP TABLE IF EXISTS queue_tasks`); err != nil {
t.Fatal(err)
}
q, err := New(ctx, testDB)
if err != nil {
t.Fatal(err)
}
return q
}
func TestScheduleFetch(t *testing.T) {
q := setup(t)
ctx := context.Background()
enqueued, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil)
if err != nil {
t.Fatal(err)
}
if !enqueued {
t.Error("ScheduleFetch() = false, want true")
}
// Same module@version should be deduplicated.
enqueued, err = q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil)
if err != nil {
t.Fatal(err)
}
if enqueued {
t.Error("ScheduleFetch() duplicate = true, want false")
}
}
func TestScheduleFetchWithSuffix(t *testing.T) {
q := setup(t)
ctx := context.Background()
if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
t.Fatal(err)
}
// Same module@version with a suffix should not be deduplicated.
enqueued, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", &queue.Options{Suffix: "reprocess-1"})
if err != nil {
t.Fatal(err)
}
if !enqueued {
t.Error("ScheduleFetch() with suffix = false, want true")
}
}
func TestPollProcessesTasks(t *testing.T) {
q := setup(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
t.Fatal(err)
}
if _, err := q.ScheduleFetch(ctx, "golang.org/x/net", "v0.1.0", nil); err != nil {
t.Fatal(err)
}
var mu sync.Mutex
var processed []string
go q.Poll(ctx, 2, func(ctx context.Context, modulePath, version string) (int, error) {
mu.Lock()
processed = append(processed, modulePath+"@"+version)
mu.Unlock()
return 200, nil
})
// Wait for tasks to be processed.
deadline := time.After(30 * time.Second)
for {
select {
case <-deadline:
t.Fatal("timed out waiting for tasks to be processed")
case <-time.After(100 * time.Millisecond):
mu.Lock()
n := len(processed)
mu.Unlock()
if n == 2 {
cancel()
return
}
}
}
}
func TestPollDeletesTaskOnError(t *testing.T) {
q := setup(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
t.Fatal(err)
}
done := make(chan struct{})
go func() {
q.Poll(ctx, 1, func(ctx context.Context, modulePath, version string) (int, error) {
close(done)
return 500, errors.New("something went wrong")
})
}()
select {
case <-done:
case <-time.After(30 * time.Second):
t.Fatal("timed out waiting for task to be processed")
}
cancel()
// Verify the task was deleted despite the error.
var count int
err := testDB.QueryRow(context.Background(), `SELECT count(*) FROM queue_tasks`).Scan(&count)
if err != nil {
t.Fatal(err)
}
// Allow a brief moment for the delete to complete.
time.Sleep(100 * time.Millisecond)
err = testDB.QueryRow(context.Background(), `SELECT count(*) FROM queue_tasks`).Scan(&count)
if err != nil {
t.Fatal(err)
}
if count != 0 {
t.Errorf("queue_tasks count = %d, want 0", count)
}
}
func TestPollReclaimsStalledTasks(t *testing.T) {
q := setup(t)
ctx := context.Background()
if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
t.Fatal(err)
}
// Simulate a stalled task by setting started_at to the past.
if _, err := testDB.Exec(ctx,
`UPDATE queue_tasks SET started_at = NOW() - INTERVAL '10 minutes'`); err != nil {
t.Fatal(err)
}
pollCtx, cancel := context.WithCancel(ctx)
defer cancel()
done := make(chan struct{})
go func() {
q.Poll(pollCtx, 1, func(ctx context.Context, modulePath, version string) (int, error) {
close(done)
return 200, nil
})
}()
select {
case <-done:
// Task was reclaimed and processed.
case <-time.After(30 * time.Second):
t.Fatal("timed out waiting for stalled task to be reclaimed")
}
cancel()
}
func TestStalledWorkerDeleteIsNoop(t *testing.T) {
// Worker1 claims a task, but stalls. Worker2 claims it later. Worker1
// unstalls and completes, but does not delete the task from the queue: once
// worker2 has the task, only it may do the delete. It unstalls and finishes
// and we assert the task is deleted from the queue.
q := setup(t)
ctx := context.Background()
if _, err := q.ScheduleFetch(ctx, "golang.org/x/text", "v0.3.0", nil); err != nil {
t.Fatal(err)
}
worker1Claimed := make(chan struct{})
worker1Stall := make(chan struct{})
worker1Done := make(chan struct{})
worker2Claimed := make(chan struct{})
worker2Stall := make(chan struct{})
worker2Done := make(chan struct{})
// Worker 1 claims the task and stalls.
go func() {
q.claimAndProcess(ctx, func(ctx context.Context, modulePath, version string) (int, error) {
close(worker1Claimed)
<-worker1Stall
return 200, nil
})
close(worker1Done)
}()
// Wait for worker 1 to enter processFunc, then backdate started_at so the
// task is eligible for reclaim.
<-worker1Claimed
if _, err := testDB.Exec(ctx, `UPDATE queue_tasks SET started_at = NOW() - INTERVAL '10 minutes'`); err != nil {
t.Fatal(err)
}
// Worker 2 reclaims the task and stalls.
go func() {
q.claimAndProcess(ctx, func(ctx context.Context, modulePath, version string) (int, error) {
close(worker2Claimed)
<-worker2Stall
return 200, nil
})
close(worker2Done)
}()
<-worker2Claimed
// Unstall worker 1. Its delete should be a no-op because worker 2 has a
// newer started_at.
close(worker1Stall)
<-worker1Done
var count int
if err := testDB.QueryRow(ctx, `SELECT count(*) FROM queue_tasks`).Scan(&count); err != nil {
t.Fatal(err)
}
if count != 1 {
t.Errorf("after worker 1: task count = %d, want 1 (worker 1 should not have deleted it)", count)
}
// Unstall worker 2. Its delete should succeed.
close(worker2Stall)
<-worker2Done
if err := testDB.QueryRow(ctx, `SELECT count(*) FROM queue_tasks`).Scan(&count); err != nil {
t.Fatal(err)
}
if count != 0 {
t.Errorf("after worker 2: task count = %d, want 0", count)
}
}