blob: 79847140510228c7cfa852222f7de63e34c69da3 [file] [log] [blame]
// Copyright 2026 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package inmemqueue
import (
"context"
"errors"
"sync"
"testing"
)
func TestInMemory_ScheduleAndProcess(t *testing.T) {
ctx := context.Background()
var processed int
var mu sync.Mutex
processFunc := func(ctx context.Context, path, version string) (int, error) {
mu.Lock()
processed++
mu.Unlock()
return 200, nil
}
q := New(ctx, 2, nil, processFunc)
tests := []struct {
path string
version string
}{
{"example.com/mod1", "v1.0.0"},
{"example.com/mod2", "v1.1.0"},
{"example.com/mod3", "v2.0.0"},
}
for _, tc := range tests {
ok, err := q.ScheduleFetch(ctx, tc.path, tc.version, nil)
if err != nil {
t.Fatalf("ScheduleFetch(%q, %q) returned error: %v", tc.path, tc.version, err)
}
if !ok {
t.Errorf("ScheduleFetch(%q, %q) = false, want true", tc.path, tc.version)
}
}
q.WaitForTesting(ctx)
if got, want := processed, len(tests); got != want {
t.Errorf("processed tasks = %d, want %d", got, want)
}
}
func TestInMemory_ProcessFuncError(t *testing.T) {
ctx := context.Background()
processFunc := func(ctx context.Context, path, version string) (int, error) {
return 500, errors.New("simulated fetch error")
}
q := New(ctx, 1, nil, processFunc)
_, err := q.ScheduleFetch(ctx, "example.com/error-mod", "v1.0.0", nil)
if err != nil {
t.Fatalf("ScheduleFetch returned error: %v", err)
}
// This should complete without panicking or deadlocking.
q.WaitForTesting(ctx)
}
func TestInMemory_WorkerConcurrency(t *testing.T) {
ctx := context.Background()
workerCount := 2
var active, max int
var mu sync.Mutex
var wg sync.WaitGroup
blockCh := make(chan struct{})
processFunc := func(ctx context.Context, path, version string) (int, error) {
mu.Lock()
active++
if active > max {
max = active
}
mu.Unlock()
<-blockCh
mu.Lock()
active--
mu.Unlock()
wg.Done()
return 200, nil
}
q := New(ctx, workerCount, nil, processFunc)
taskCount := 5
wg.Add(taskCount)
for range taskCount {
_, err := q.ScheduleFetch(ctx, "example.com/concurrent", "v1.0.0", nil)
if err != nil {
t.Fatalf("ScheduleFetch returned error: %v", err)
}
}
close(blockCh)
wg.Wait()
q.WaitForTesting(ctx)
if max > workerCount {
t.Errorf("max concurrent workers = %d, want <= %d", max, workerCount)
}
}