internal/worker: enqueue tasks concurrently

Enqueuing a lot of tasks can take a while, so do it concurrently.

Change-Id: I460ffa9c4287ab32471078d72d34a8695ac7dfff
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/256758
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Julie Qiu <julie@golang.org>
diff --git a/internal/worker/server.go b/internal/worker/server.go
index 54a2ba6..8494f49 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -16,6 +16,7 @@
 	"reflect"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"cloud.google.com/go/errorreporting"
@@ -90,6 +91,7 @@
 		indexTemplate:    t1,
 		versionsTemplate: t2,
 	}
+
 	return &Server{
 		cfg:                  cfg,
 		db:                   scfg.DB,
@@ -370,20 +372,38 @@
 	span.Annotate([]trace.Attribute{trace.Int64Attribute("modules to fetch", int64(len(modules)))}, "processed limit")
 	w.Header().Set("Content-Type", "text/plain")
 	log.Infof(ctx, "Scheduling modules to be fetched: queuing %d modules", len(modules))
-	nEnqueued := 0
+
+	// Enqueue concurrently, because sequentially takes a while.
+	const concurrentEnqueues = 10
+	var (
+		mu                 sync.Mutex
+		nEnqueued, nErrors int
+	)
+	sem := make(chan struct{}, concurrentEnqueues)
 	for _, m := range modules {
-		stats.RecordWithTags(r.Context(),
-			[]tag.Mutator{tag.Upsert(keyEnqueueStatus, strconv.Itoa(m.Status))},
-			enqueueStatus.M(int64(m.Status)))
-		enqueued, err := s.queue.ScheduleFetch(ctx, m.ModulePath, m.Version, suffixParam, s.taskIDChangeInterval)
-		if err != nil {
-			return err
-		}
-		if enqueued {
-			nEnqueued++
-		}
+		m := m
+		sem <- struct{}{}
+		go func() {
+			defer func() { <-sem }()
+			enqueued, err := s.queue.ScheduleFetch(ctx, m.ModulePath, m.Version, suffixParam, s.taskIDChangeInterval)
+			mu.Lock()
+			if err != nil {
+				log.Errorf(ctx, "enqueuing: %v", err)
+				nErrors++
+			} else if enqueued {
+				nEnqueued++
+				stats.RecordWithTags(r.Context(),
+					[]tag.Mutator{tag.Upsert(keyEnqueueStatus, strconv.Itoa(m.Status))},
+					enqueueStatus.M(int64(m.Status)))
+			}
+			mu.Unlock()
+		}()
 	}
-	log.Infof(ctx, "Successfully scheduled modules to be fetched: %d modules enqueued", nEnqueued)
+	// Wait for goroutines to finish.
+	for i := 0; i < concurrentEnqueues; i++ {
+		sem <- struct{}{}
+	}
+	log.Infof(ctx, "Successfully scheduled modules to be fetched: %d modules enqueued, %d errors", nEnqueued, nErrors)
 	return nil
 }