internal/worker: add /poll and /enqueue endpoints.

Add a /poll endpoint that polls the index, but just writes
to module_version_states without enqueuing.

Add an /enqueue endpoint that is identical to /requeue. The name more
accurately reflects that it is for new modules as well as
reprocessing.

Update the worker status page to use the new endpoints.

We'll delete the old endpoints after deploying and changing the
scheduler jobs.

Updates b/158866584.

Change-Id: Id116bf9fd99fa55aaacd71bb4ca6b60770ca8812
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/239480
Reviewed-by: Julie Qiu <julie@golang.org>
diff --git a/content/static/html/worker/index.tmpl b/content/static/html/worker/index.tmpl
index a0a8866..b82b5c8 100644
--- a/content/static/html/worker/index.tmpl
+++ b/content/static/html/worker/index.tmpl
@@ -109,29 +109,29 @@
 </p>
 
 <div class="actions">
-  <form action="/poll-and-queue" method="post" name="queueForm">
-    <button title="Poll the module index for up to 2000 new versions, and enqueue them for processing."
-    onclick="submitForm('queueForm', false); return false">Enqueue From Module Index</button>
-    <input type="number" name="limit" value="10"></input>
-    <output name="result"></output>
-  </form>
-  <form action="/requeue" method="post" name="requeueForm">
-    <button title="Query the discovery database for failed versions, and re-queue them for processing."
-    onclick="submitForm('requeueForm', true); return false">Requeue Failed Versions</button>
-    <input type="number" name="limit" value="10">
-    <output name="result"></output>
-  </form>
-  <form action="/reprocess" method="post" name="reprocessForm">
-    <button title="Mark all versions created before the specified app_version to be reprocessed."
-    onclick="submitForm('reprocessForm', true); return false">Reprocess Versions</button>
-    <input type="text" name="app_version">
-    <output name="result"></output>
-  </form>
-  <form action="/populate-stdlib" method="post" name="populateStdlibForm">
-    <button title="Populates the database with all supported versions of the Go standard library."
-    onclick="submitForm('populateStdlibForm', false); return false">Populate Standard Library</button>
-    <output name="result"></output>
-  </form>
+	<form action="/poll" method="post" name="pollForm">
+		<button title="Poll the module index for up to 2000 new versions."
+      onclick="submitForm('pollForm', false); return false">Poll Module Index</button>
+		<input type="number" name="limit" value="10">
+		<output name="result"></output>
+	</form>
+	<form action="/enqueue" method="post" name="enqueueForm">
+		<button title="Query the discovery database for new or failed versions, and enqueue them for processing."
+      onclick="submitForm('enqueueForm', true); return false">Enqueue New and Failed Versions</button>
+		<input type="number" name="limit" value="10">
+		<output name="result"></output>
+	</form>
+	<form action="/reprocess" method="post" name="reprocessForm">
+		<button title="Mark all versions created before the specified app_version to be reprocessed."
+      onclick="submitForm('reprocessForm', true); return false">Reprocess Versions</button>
+		<input type="text" name="app_version">
+		<output name="result"></output>
+	</form>
+	<form action="/populate-stdlib" method="post" name="populateStdlibForm">
+		<button title="Populates the database with all supported versions of the Go standard library."
+      onclick="submitForm('populateStdlibForm', false); return false">Populate Standard Library</button>
+		<output name="result"></output>
+	</form>
 </div>
 
 <div class="config">
diff --git a/internal/postgres/versionstate.go b/internal/postgres/versionstate.go
index f2bfc99..064fd9c 100644
--- a/internal/postgres/versionstate.go
+++ b/internal/postgres/versionstate.go
@@ -21,7 +21,7 @@
 )
 
 // InsertIndexVersions inserts new versions into the module_version_states
-// table.
+// table with a status of zero.
 func (db *DB) InsertIndexVersions(ctx context.Context, versions []*internal.IndexVersion) (err error) {
 	defer derrors.Wrap(&err, "InsertIndexVersions(ctx, %v)", versions)
 
diff --git a/internal/testing/integration/integration_test.go b/internal/testing/integration/integration_test.go
index afb1327..a185f40 100644
--- a/internal/testing/integration/integration_test.go
+++ b/internal/testing/integration/integration_test.go
@@ -113,7 +113,12 @@
 	frontendServer.Install(frontendMux.Handle, redisCacheClient)
 	frontendHTTP := httptest.NewServer(frontendMux)
 
-	if _, err := doGet(workerHTTP.URL + "/poll-and-queue"); err != nil {
+	if _, err := doGet(workerHTTP.URL + "/poll"); err != nil {
+		t.Fatal(err)
+	}
+	// TODO: This should really be made deterministic.
+	time.Sleep(100 * time.Millisecond)
+	if _, err := doGet(workerHTTP.URL + "/enqueue"); err != nil {
 		t.Fatal(err)
 	}
 	// TODO: This should really be made deterministic.
diff --git a/internal/worker/server.go b/internal/worker/server.go
index 7097291..264efd8 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -98,21 +98,30 @@
 	if s.reportingClient != nil {
 		rmw = middleware.ErrorReporting(s.reportingClient.Report)
 	}
-	// cloud-scheduler: poll-and-queue polls the Module Index for new versions
+
+	// scheduled: poll polls the Module Index for new versions
+	// that have been published and inserts that metadata into
+	// module_version_states.
+	// This endpoint is intended to be invoked periodically by a scheduler.
+	// See the note about duplicate tasks for "/enqueue" below.
+	handle("/poll", rmw(s.errorHandler(s.handlePollIndex)))
+
+	// TODO: remove after /poll is in production and the scheduler jobs have been changed.
+	// scheduled: poll-and-queue polls the Module Index for new versions
 	// that have been published and inserts that metadata into
 	// module_version_states. It also inserts the version into the task-queue
 	// to to be fetched and processed.
-	// This endpoint is invoked by a Cloud Scheduler job.
+	// This endpoint is intended to be invoked periodically by a scheduler.
 	// See the note about duplicate tasks for "/requeue" below.
 	handle("/poll-and-queue", rmw(s.errorHandler(s.handleIndexAndQueue)))
 
-	// cloud-scheduler: update-imported-by-count updates the imported_by_count for packages
-	// in search_documents where imported_by_count_updated_at is null or
-	// imported_by_count_updated_at < version_updated_at.
-	// This endpoint is invoked by a Cloud Scheduler job.
+	// scheduled: update-imported-by-count updates the imported_by_count for
+	// packages in search_documents where imported_by_count_updated_at is null
+	// or imported_by_count_updated_at < version_updated_at.
+	// This endpoint is intended to be invoked periodically by a scheduler.
 	handle("/update-imported-by-count", rmw(s.errorHandler(s.handleUpdateImportedByCount)))
 
-	// cloud-scheduler: download search document data and update the redis sorted
+	// scheduled: download search document data and update the redis sorted
 	// set(s) used in auto-completion.
 	handle("/update-redis-indexes", rmw(s.errorHandler(s.handleUpdateRedisIndexes)))
 
@@ -121,18 +130,30 @@
 	// request fails for any reason other than an http.StatusInternalServerError,
 	// it will return an http.StatusOK so that the task queue does not retry
 	// fetching module versions that have a terminal error.
-	// This endpoint is invoked by a Cloud Tasks queue.
+	// This endpoint is intended to be invoked by a task queue with semantics like
+	// Google Cloud Task Queues.
 	handle("/fetch/", http.StripPrefix("/fetch", http.HandlerFunc(s.handleFetch)))
 
-	// manual: requeue queries the module_version_states table for the next
+	// scheduled: enqueue queries the module_version_states table for the next
 	// batch of module versions to process, and enqueues them for processing.
 	// Normally this will not cause duplicate processing, because Cloud Tasks
 	// are de-duplicated. That does not apply after a task has been finished or
-	// deleted for one hour (see
+	// deleted for Server.taskIDChangeInterval (see
 	// https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#createtaskrequest,
-	// under "Task De-duplication"). If you cannot wait an hour, you can force
+	// under "Task De-duplication"). If you cannot wait, you can force
 	// duplicate tasks by providing any string as the "suffix" query parameter.
-	handle("/requeue", rmw(s.errorHandler(s.handleRequeue)))
+	handle("/enqueue", rmw(s.errorHandler(s.handleEnqueue)))
+
+	// TODO: remove after /enqueue is in production and the scheduler jobs have been changed.
+	// scheduled: requeue queries the module_version_states table for the next
+	// batch of module versions to process, and enqueues them for processing.
+	// Normally this will not cause duplicate processing, because Cloud Tasks
+	// are de-duplicated. That does not apply after a task has been finished or
+	// deleted for Server.taskIDChangeInterval (see
+	// https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#createtaskrequest,
+	// under "Task De-duplication"). If you cannot wait, you can force
+	// duplicate tasks by providing any string as the "suffix" query parameter.
+	handle("/requeue", rmw(s.errorHandler(s.handleEnqueue)))
 
 	// manual: reprocess sets a reprocess status for all records in the
 	// module_version_states table that were processed by an app_version that
@@ -175,7 +196,7 @@
 // handleRepopulateSearchDocuments repopulates every row in the search_documents table
 // that was last updated before the given time.
 func (s *Server) handleRepopulateSearchDocuments(w http.ResponseWriter, r *http.Request) error {
-	limit := parseIntParam(r, "limit", 100)
+	limit := parseLimitParam(r, 100)
 	beforeParam := r.FormValue("before")
 	if beforeParam == "" {
 		return &serverError{
@@ -272,10 +293,29 @@
 	return parts[0], parts[1], nil
 }
 
+func (s *Server) handlePollIndex(w http.ResponseWriter, r *http.Request) (err error) {
+	defer derrors.Wrap(&err, "handlePollIndex(%q)", r.URL.Path)
+	ctx := r.Context()
+	limit := parseLimitParam(r, 10)
+	since, err := s.db.LatestIndexTimestamp(ctx)
+	if err != nil {
+		return err
+	}
+	versions, err := s.indexClient.GetVersions(ctx, since, limit)
+	if err != nil {
+		return err
+	}
+	if err := s.db.InsertIndexVersions(ctx, versions); err != nil {
+		return err
+	}
+	log.Infof(ctx, "Inserted %d modules from the index", len(versions))
+	return nil
+}
+
 func (s *Server) handleIndexAndQueue(w http.ResponseWriter, r *http.Request) (err error) {
 	defer derrors.Wrap(&err, "handleIndexAndQueue(%q)", r.URL.Path)
 	ctx := r.Context()
-	limit := parseIntParam(r, "limit", 10)
+	limit := parseLimitParam(r, 10)
 	suffixParam := r.FormValue("suffix")
 	since, err := s.db.LatestIndexTimestamp(ctx)
 	if err != nil {
@@ -303,13 +343,13 @@
 	return nil
 }
 
-// handleRequeue queries the module_version_states table for the next
-// batch of module versions to process, and enqueues them for processing.  Note
-// that this may cause duplicate processing.
-func (s *Server) handleRequeue(w http.ResponseWriter, r *http.Request) (err error) {
-	defer derrors.Wrap(&err, "handleRequeue(%q)", r.URL.Path)
+// handleEnqueue queries the module_version_states table for the next batch of
+// module versions to process, and enqueues them for processing. Note that this
+// may cause duplicate processing.
+func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) (err error) {
+	defer derrors.Wrap(&err, "handleEnqueue(%q)", r.URL.Path)
 	ctx := r.Context()
-	limit := parseIntParam(r, "limit", 10)
+	limit := parseLimitParam(r, 10)
 	suffixParam := r.FormValue("suffix") // append to task name to avoid deduplication
 	span := trace.FromContext(r.Context())
 	span.Annotate([]trace.Attribute{trace.Int64Attribute("limit", int64(limit))}, "processed limit")
@@ -320,13 +360,13 @@
 
 	span.Annotate([]trace.Attribute{trace.Int64Attribute("versions to fetch", int64(len(versions)))}, "processed limit")
 	w.Header().Set("Content-Type", "text/plain")
-	log.Infof(ctx, "Scheduling modules to be fetched: requeuing %d modules", len(versions))
+	log.Infof(ctx, "Scheduling modules to be fetched: queuing %d modules", len(versions))
 	for _, v := range versions {
 		if err := s.queue.ScheduleFetch(ctx, v.ModulePath, v.Version, suffixParam, s.taskIDChangeInterval); err != nil {
 			return err
 		}
 	}
-	log.Infof(ctx, "Successfully scheduled modules to be fetched: %d modules requeued", len(versions))
+	log.Infof(ctx, "Successfully scheduled modules to be fetched: %d modules queued", len(versions))
 
 	return nil
 }
@@ -573,10 +613,11 @@
 	return t.In(locNewYork).Format("2006-01-02 15:04:05")
 }
 
-// parseIntParam parses the query parameter with name as in integer. If the
+// parseLimitParam parses the "limit" query parameter as an integer. If the
 // parameter is missing or there is a parse error, it is logged and the default
 // value is returned.
-func parseIntParam(r *http.Request, name string, defaultValue int) int {
+func parseLimitParam(r *http.Request, defaultValue int) int {
+	const name = "limit"
 	param := r.FormValue(name)
 	if param == "" {
 		return defaultValue
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index 372428a..bc92630 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -89,11 +89,11 @@
 		}
 		state = func(version *internal.IndexVersion, code, tryCount, numPackages int) *internal.ModuleVersionState {
 			goModPath := version.Path
-			if code >= 300 {
+			if code == 0 || code >= 300 {
 				goModPath = ""
 			}
 			var n *int
-			if code != http.StatusNotFound {
+			if code != 0 && code != http.StatusNotFound {
 				n = &numPackages
 			}
 			return &internal.ModuleVersionState{
@@ -123,11 +123,22 @@
 		wantBar  *internal.ModuleVersionState
 	}{
 		{
+			label: "poll only",
+			index: []*internal.IndexVersion{fooIndex, barIndex},
+			proxy: []*proxy.TestModule{fooProxy, barProxy},
+			requests: []*http.Request{
+				httptest.NewRequest("POST", "/poll", nil),
+			},
+			wantFoo: fooState(0, 0),
+			wantBar: barState(0, 0),
+		},
+		{
 			label: "full fetch",
 			index: []*internal.IndexVersion{fooIndex, barIndex},
 			proxy: []*proxy.TestModule{fooProxy, barProxy},
 			requests: []*http.Request{
-				httptest.NewRequest("POST", "/poll-and-queue", nil),
+				httptest.NewRequest("POST", "/poll", nil),
+				httptest.NewRequest("POST", "/enqueue", nil),
 			},
 			wantFoo: fooState(http.StatusOK, 1),
 			wantBar: barState(http.StatusOK, 1),
@@ -136,7 +147,8 @@
 			index: []*internal.IndexVersion{fooIndex, barIndex},
 			proxy: []*proxy.TestModule{fooProxy, barProxy},
 			requests: []*http.Request{
-				httptest.NewRequest("POST", "/poll-and-queue?limit=1", nil),
+				httptest.NewRequest("POST", "/poll?limit=1", nil),
+				httptest.NewRequest("POST", "/enqueue", nil),
 			},
 			wantFoo: fooState(http.StatusOK, 1),
 		}, {
@@ -144,7 +156,8 @@
 			index: []*internal.IndexVersion{fooIndex, barIndex},
 			proxy: []*proxy.TestModule{fooProxy},
 			requests: []*http.Request{
-				httptest.NewRequest("POST", "/poll-and-queue", nil),
+				httptest.NewRequest("POST", "/poll", nil),
+				httptest.NewRequest("POST", "/enqueue", nil),
 			},
 			wantFoo: fooState(http.StatusOK, 1),
 			wantBar: barState(http.StatusNotFound, 1),
@@ -238,7 +251,7 @@
 		{"312", 312},
 		{"bad", -1},
 	} {
-		got := parseIntParam(httptest.NewRequest("GET", fmt.Sprintf("/foo?x=%s", test.in), nil), "x", -1)
+		got := parseLimitParam(httptest.NewRequest("GET", fmt.Sprintf("/foo?limit=%s", test.in), nil), -1)
 		if got != test.want {
 			t.Errorf("%q: got %d, want %d", test.in, got, test.want)
 		}