blob: 14cabb705003d99efa643e47be99efbab6aa5b27 [file] [log] [blame]
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package postgres
import (
"context"
"database/sql"
"fmt"
"net/http"
"strconv"
"golang.org/x/pkgsite/internal"
"golang.org/x/pkgsite/internal/derrors"
"golang.org/x/pkgsite/internal/log"
)
// UpdateModuleVersionStatesForReprocessing marks modules to be reprocessed
// that were processed prior to the provided appVersion.
func (db *DB) UpdateModuleVersionStatesForReprocessing(ctx context.Context, appVersion string) (err error) {
defer derrors.Wrap(&err, "UpdateModuleVersionStatesForReprocessing(ctx, %q)", appVersion)
for _, status := range []int{
http.StatusOK,
derrors.ToHTTPStatus(derrors.HasIncompletePackages),
derrors.ToHTTPStatus(derrors.BadModule),
derrors.ToHTTPStatus(derrors.AlternativeModule),
} {
query := `UPDATE module_version_states
SET
status = $2,
next_processed_after = CURRENT_TIMESTAMP,
last_processed_at = NULL
WHERE
app_version < $1
AND status = $3;`
result, err := db.db.Exec(ctx, query, appVersion,
derrors.ToReprocessStatus(status), status)
if err != nil {
return err
}
affected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("result.RowsAffected(): %v", err)
}
log.Infof(ctx,
"Updated module_version_states with status=%d and app_version < %q to status=%d; %d affected",
status, appVersion, derrors.ToReprocessStatus(status), affected)
}
return nil
}
// largeModulePackageThresold represents the package threshold at which it
// becomes difficult to process packages. Modules with more than this number
// of packages are generally different versions or forks of kubernetes,
// aws-sdk-go, azure-sdk-go, and bilibili.
const largeModulePackageThreshold = 1500
// largeModulesLimit represents the number of large modules that we are
// willing to enqueue at a given time.
// var for testing.
var largeModulesLimit = 100
// GetNextModulesToFetch returns the next batch of modules that need to be
// processed. We prioritize modules based on (1) whether it has status zero
// (never processed), (2) whether it is the latest version, (3) if it is an
// alternative module, and (4) the number of packages it has. We want to leave
// time-consuming modules until the end and process them at a slower rate to
// reduce database load and timeouts. We also want to leave alternative modules
// towards the end, since these will incur unnecessary deletes otherwise.
func (db *DB) GetNextModulesToFetch(ctx context.Context, limit int) (_ []*internal.ModuleVersionState, err error) {
defer derrors.Wrap(&err, "GetNextModulesToFetch(ctx, %d)", limit)
var mvs []*internal.ModuleVersionState
query := fmt.Sprintf(nextModulesToProcessQuery, moduleVersionStateColumns)
collect := func(rows *sql.Rows) error {
// Scan the last two columns separately; they are in the query only for sorting.
scan := func(dests ...interface{}) error {
var latest bool
var npkg int
return rows.Scan(append(dests, &latest, &npkg)...)
}
mv, err := scanModuleVersionState(scan)
if err != nil {
return err
}
mvs = append(mvs, mv)
return nil
}
if err := db.db.RunQuery(ctx, query, collect, largeModulePackageThreshold, limit); err != nil {
return nil, err
}
if len(mvs) == 0 {
log.Infof(ctx, "No modules to requeue")
} else {
fmtIntp := func(p *int) string {
if p == nil {
return "NULL"
}
return strconv.Itoa(*p)
}
start := mvs[0]
end := mvs[len(mvs)-1]
pkgRange := fmt.Sprintf("%s <= num_packages <= %s", fmtIntp(start.NumPackages), fmtIntp(end.NumPackages))
log.Infof(ctx, "GetNextModulesToFetch: num_modules=%d; %s; start_module=%q; end_module=%q",
len(mvs), pkgRange,
fmt.Sprintf("%s/@v/%s", start.ModulePath, start.Version),
fmt.Sprintf("%s/@v/%s", end.ModulePath, end.Version))
}
// Don't return more than largeModulesLimit of modules that have more than
// largeModulePackageThreshold packages.
nLarge := 0
for i, m := range mvs {
if m.NumPackages != nil && *m.NumPackages >= largeModulePackageThreshold {
nLarge++
}
if nLarge > largeModulesLimit {
return mvs[:i], nil
}
}
return mvs, nil
}
const nextModulesToProcessQuery = `
-- Make a table of the latest versions of each module.
WITH latest_versions AS (
SELECT DISTINCT ON (module_path) module_path, version
FROM module_version_states
ORDER BY
module_path,
right(sort_version, 1) = '~' DESC, -- prefer release versions
sort_version DESC
)
SELECT %s, latest, npkg
FROM (
SELECT
%[1]s,
((module_path, version) IN (SELECT * FROM latest_versions)) AS latest,
COALESCE(num_packages, 0) AS npkg
FROM module_version_states
) s
WHERE next_processed_after < CURRENT_TIMESTAMP
AND (status = 0 OR status >= 500)
ORDER BY
CASE
-- new modules
WHEN status = 0 THEN 0
WHEN npkg < $1 THEN -- non-large modules
CASE
WHEN latest THEN -- latest version
CASE
-- with ReprocessStatusOK or ReprocessHasIncompletePackages
WHEN status = 520 OR status = 521 THEN 1
-- with ReprocessBadModule or ReprocessAlternative
WHEN status = 540 OR status = 541 THEN 2
ELSE 9
END
ELSE -- non-latest version
CASE
WHEN status = 520 OR status = 521 THEN 3
WHEN status = 540 OR status = 541 THEN 4
ELSE 9
END
END
ELSE -- large modules
CASE
WHEN latest THEN -- latest version
CASE
WHEN status = 520 OR status = 521 THEN 5
WHEN status = 540 OR status = 541 THEN 6
ELSE 9
END
ELSE -- non-latest version
CASE
WHEN status = 520 OR status = 521 THEN 7
WHEN status = 540 OR status = 541 THEN 8
ELSE 9
END
END
END,
npkg, -- prefer fewer packages
module_path,
version -- for reproducibility
LIMIT $2
`