internal/worker: enqueue tasks concurrently
Enqueuing a lot of tasks can take a while, so do it concurrently.
Change-Id: I460ffa9c4287ab32471078d72d34a8695ac7dfff
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/256758
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Julie Qiu <julie@golang.org>
diff --git a/internal/worker/server.go b/internal/worker/server.go
index 54a2ba6..8494f49 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -16,6 +16,7 @@
"reflect"
"strconv"
"strings"
+ "sync"
"time"
"cloud.google.com/go/errorreporting"
@@ -90,6 +91,7 @@
indexTemplate: t1,
versionsTemplate: t2,
}
+
return &Server{
cfg: cfg,
db: scfg.DB,
@@ -370,20 +372,38 @@
span.Annotate([]trace.Attribute{trace.Int64Attribute("modules to fetch", int64(len(modules)))}, "processed limit")
w.Header().Set("Content-Type", "text/plain")
log.Infof(ctx, "Scheduling modules to be fetched: queuing %d modules", len(modules))
- nEnqueued := 0
+
+ // Enqueue concurrently, because sequentially takes a while.
+ const concurrentEnqueues = 10
+ var (
+ mu sync.Mutex
+ nEnqueued, nErrors int
+ )
+ sem := make(chan struct{}, concurrentEnqueues)
for _, m := range modules {
- stats.RecordWithTags(r.Context(),
- []tag.Mutator{tag.Upsert(keyEnqueueStatus, strconv.Itoa(m.Status))},
- enqueueStatus.M(int64(m.Status)))
- enqueued, err := s.queue.ScheduleFetch(ctx, m.ModulePath, m.Version, suffixParam, s.taskIDChangeInterval)
- if err != nil {
- return err
- }
- if enqueued {
- nEnqueued++
- }
+ m := m
+ sem <- struct{}{}
+ go func() {
+ defer func() { <-sem }()
+ enqueued, err := s.queue.ScheduleFetch(ctx, m.ModulePath, m.Version, suffixParam, s.taskIDChangeInterval)
+ mu.Lock()
+ if err != nil {
+ log.Errorf(ctx, "enqueuing: %v", err)
+ nErrors++
+ } else if enqueued {
+ nEnqueued++
+ stats.RecordWithTags(r.Context(),
+ []tag.Mutator{tag.Upsert(keyEnqueueStatus, strconv.Itoa(m.Status))},
+ enqueueStatus.M(int64(m.Status)))
+ }
+ mu.Unlock()
+ }()
}
- log.Infof(ctx, "Successfully scheduled modules to be fetched: %d modules enqueued", nEnqueued)
+ // Wait for goroutines to finish.
+ for i := 0; i < concurrentEnqueues; i++ {
+ sem <- struct{}{}
+ }
+ log.Infof(ctx, "Successfully scheduled modules to be fetched: %d modules enqueued, %d errors", nEnqueued, nErrors)
return nil
}