blob: 0b0da999de6710217cd3b27541279346efb2b906 [file] [log] [blame]
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package postgres
import (
"context"
"database/sql"
"fmt"
"net/http"
"strconv"
"golang.org/x/pkgsite/internal"
"golang.org/x/pkgsite/internal/derrors"
"golang.org/x/pkgsite/internal/log"
)
// UpdateModuleVersionStatesForReprocessing marks modules to be reprocessed
// that were processed prior to the provided appVersion.
//
// For each of the statuses http.StatusOK, HasIncompletePackages, BadModule
// and AlternativeModule, every row of module_version_states with that status
// and app_version < appVersion is updated: its status becomes
// derrors.ToReprocessStatus(status), next_processed_after is set to the
// current timestamp and last_processed_at is cleared. Each status is handled
// by a separate UPDATE statement; the first error stops the loop and is
// returned, wrapped with the function name and appVersion.
func (db *DB) UpdateModuleVersionStatesForReprocessing(ctx context.Context, appVersion string) (err error) {
	defer derrors.Wrap(&err, "UpdateModuleVersionStatesForReprocessing(ctx, %q)", appVersion)

	// The statement text is the same for every status; only the bound
	// parameters ($1 appVersion, $2 new status, $3 old status) change.
	const query = `UPDATE module_version_states
SET
status = $2,
next_processed_after = CURRENT_TIMESTAMP,
last_processed_at = NULL
WHERE
app_version < $1
AND status = $3;`

	statuses := []int{
		http.StatusOK,
		derrors.ToHTTPStatus(derrors.HasIncompletePackages),
		derrors.ToHTTPStatus(derrors.BadModule),
		derrors.ToHTTPStatus(derrors.AlternativeModule),
	}
	for _, status := range statuses {
		reprocessStatus := derrors.ToReprocessStatus(status)
		result, err := db.db.Exec(ctx, query, appVersion, reprocessStatus, status)
		if err != nil {
			return err
		}
		affected, err := result.RowsAffected()
		if err != nil {
			// Wrap with %w so callers can still inspect the driver error.
			return fmt.Errorf("result.RowsAffected(): %w", err)
		}
		log.Infof(ctx,
			"Updated module_version_states with status=%d and app_version < %q to status=%d; %d affected",
			status, appVersion, reprocessStatus, affected)
	}
	return nil
}
var (
// largeModulePackageThreshold represents the package threshold at which it
// becomes difficult to process packages. Modules with more than this number
// of packages are generally different versions or forks of kubernetes,
// aws-sdk-go, azure-sdk-go, and bilibili.
//
// constructRequeueQuery uses it to restrict every query except the
// remainder query to rows with COALESCE(num_packages, 0) below this value.
largeModulePackageThreshold = 1500
// largeModulesLimit represents the number of large modules that we are
// willing to enqueue at a given time. GetNextModulesToFetch truncates the
// result of the remainder query to at most this many entries.
largeModulesLimit = 100
)
// GetNextModulesToFetch returns the next batch of modules that need to be
// processed. We prioritize modules based on (1) whether it is the latest version,
// (2) if it is an alternative module, and (3) the number of packages it has.
// We want to leave time-consuming modules until the end and process them at
// a slower rate to reduce database load and timeouts. We also want to leave
// alternative modules towards the end, since these will incur unnecessary
// deletes otherwise.
//
// The queries are tried in a fixed order and the first one that yields any
// rows determines the result:
//
//  1. latest versions with status ReprocessStatusOK or ReprocessHasIncompletePackages
//  2. latest versions with status ReprocessBadModule or ReprocessAlternative
//  3. all versions with status ReprocessStatusOK or ReprocessHasIncompletePackages
//  4. all versions with status ReprocessBadModule or ReprocessAlternative
//  5. the remainder query (see getModuleVersionStatesRemainder)
//
// Each query is limited to limit rows; the remainder batch is additionally
// capped at largeModulesLimit entries. If no query returns rows, the result
// is an empty slice and a nil error.
func (db *DB) GetNextModulesToFetch(ctx context.Context, limit int) (_ []*internal.ModuleVersionState, err error) {
	defer derrors.Wrap(&err, "GetNextModulesToFetch(ctx, %d)", limit)

	var mvs []*internal.ModuleVersionState

	// collect appends one scanned row to mvs. It is shared by all stages,
	// which is safe because we return as soon as a stage produces rows.
	collect := func(rows *sql.Rows) error {
		mv, err := scanModuleVersionState(rows.Scan)
		if err != nil {
			return err
		}
		mvs = append(mvs, mv)
		return nil
	}

	// fmtIntp renders a possibly-nil num_packages value for the log line.
	fmtIntp := func(p *int) string {
		if p == nil {
			return "NULL"
		}
		return strconv.Itoa(*p)
	}

	succeeded := []int{
		derrors.ToHTTPStatus(derrors.ReprocessStatusOK),
		derrors.ToHTTPStatus(derrors.ReprocessHasIncompletePackages),
	}
	badOrAlternative := []int{
		derrors.ToHTTPStatus(derrors.ReprocessBadModule),
		derrors.ToHTTPStatus(derrors.ReprocessAlternative),
	}
	stages := []struct {
		query    string
		statuses []int
	}{
		{query: getLatestModuleVersionStates, statuses: succeeded},
		{query: getLatestModuleVersionStates, statuses: badOrAlternative},
		{query: getModuleVersionStates, statuses: succeeded},
		{query: getModuleVersionStates, statuses: badOrAlternative},
		{query: getModuleVersionStatesRemainder},
	}

	for _, next := range stages {
		query := constructRequeueQuery(next.query, next.statuses)
		if err := db.db.RunQuery(ctx, query, collect, limit); err != nil {
			return nil, err
		}
		if len(mvs) == 0 {
			// Nothing at this priority level; fall through to the next one.
			continue
		}

		var msg string
		switch next.query {
		case getModuleVersionStatesRemainder:
			msg = fmt.Sprintf("modules with status=0 or status=500 or num_packages > %d", largeModulePackageThreshold)
			// Large or failing modules are enqueued at a slower rate.
			if len(mvs) > largeModulesLimit {
				mvs = mvs[:largeModulesLimit]
			}
		case getLatestModuleVersionStates:
			msg = "latest version of modules"
		default:
			msg = "non-latest version of modules"
		}

		start, end := mvs[0], mvs[len(mvs)-1]
		pkgRange := fmt.Sprintf("%s <= num_packages <= %s", fmtIntp(start.NumPackages), fmtIntp(end.NumPackages))
		// Pass the format and its arguments straight to log.Infof so that
		// database values are never interpreted as a format string.
		log.Infof(ctx, "GetNextModulesToFetch (%s): num_modules=%d; statuses=%v; %s; start_module=%q; end_module=%q",
			msg, len(mvs), next.statuses, pkgRange,
			fmt.Sprintf("%s/@v/%s", start.ModulePath, start.Version),
			fmt.Sprintf("%s/@v/%s", end.ModulePath, end.Version))
		return mvs, nil
	}

	log.Infof(ctx, "No modules to requeue")
	return mvs, nil
}
// constructRequeueQuery fills in one of the requeue query templates. The
// template's first %s receives moduleVersionStateColumns and its second %s
// receives a WHERE clause that always requires next_processed_after to be in
// the past.
//
// For getModuleVersionStatesRemainder the clause additionally selects rows
// with status >= 500 or status = 0, and statuses is ignored. For any other
// template it restricts rows to fewer than largeModulePackageThreshold
// packages (NULL counted as 0) and to the given statuses, OR-ed together.
func constructRequeueQuery(baseQuery string, statuses []int) string {
	const due = "WHERE next_processed_after < CURRENT_TIMESTAMP"

	if baseQuery == getModuleVersionStatesRemainder {
		return fmt.Sprintf(baseQuery, moduleVersionStateColumns, due+" AND (status >= 500 OR status=0)")
	}

	// Build "status=A OR status=B ..." by emitting the separator before
	// every element except the first.
	var statusExpr string
	for idx, code := range statuses {
		if idx > 0 {
			statusExpr += " OR "
		}
		statusExpr += fmt.Sprintf("status=%d", code)
	}

	sizeCond := fmt.Sprintf(" AND COALESCE(num_packages, 0) < %d", largeModulePackageThreshold)
	clause := due + sizeCond + " AND (" + statusExpr + ")"
	return fmt.Sprintf(baseQuery, moduleVersionStateColumns, clause)
}
// getLatestModuleVersionStates is a query template that selects rows of
// module_version_states belonging to the latest version of each module
// path; results are ordered by number of packages, then sort_version
// (descending), then module path.
//
// The first %s is replaced by the column list and the second %s by the
// WHERE clause (see constructRequeueQuery); $1 is the row limit. The status
// filter is not part of this template: it comes from the supplied WHERE
// clause.
//
// When picking the latest version we also want to prefer release to
// non-release versions. A sort_version will end in '~' if it is a
// release, and that is larger than any other character
// that can occur in a sort_version.
// So if we sort first by the last character in
// sort_version, then by sort_version itself, we will
// get releases before non-releases. To implement that
// two-level ordering in a MAX, we construct an array
// of the two strings.
// Arrays are ordered lexicographically, so MAX will do
// just what we want.
const getLatestModuleVersionStates = `
SELECT %s
FROM (
SELECT s.*
FROM module_version_states s
INNER JOIN (
SELECT module_path,
MAX(ARRAY[right(sort_version, 1), sort_version]) AS mv
FROM module_version_states
GROUP BY 1
) m
ON
s.module_path = m.module_path
AND s.sort_version = m.mv[2]
-- WHERE clause
%s
ORDER BY
num_packages,
sort_version DESC,
module_path
LIMIT $1
) foo`
// getModuleVersionStates is a query template used to get non-latest versions
// to be reprocessed. GetNextModulesToFetch runs it first for modules that
// previously succeeded, then for bad and alternative modules.
//
// The template itself does not exclude latest versions; it is only reached
// once the getLatestModuleVersionStates queries have returned no rows. The
// first %s is replaced by the column list and the second %s by the WHERE
// clause (see constructRequeueQuery); $1 is the row limit. Rows are ordered
// by number of packages, then sort_version (descending), then module path.
const getModuleVersionStates = `
SELECT %s
FROM module_version_states
-- WHERE clause
%s
ORDER BY
num_packages,
sort_version DESC,
module_path
LIMIT $1`
// getModuleVersionStatesRemainder is the query template for the last stage
// of GetNextModulesToFetch. constructRequeueQuery pairs it with a WHERE
// clause selecting rows with status >= 500 or status = 0, without the
// package-count restriction applied to the other templates.
//
// The first %s is replaced by the column list and the second %s by the
// WHERE clause; $1 is the row limit. Rows are ordered by status bucket
// (status 0 first, then 520/521, then 540/541, then everything else),
// then by number of packages with NULL treated as 0, then sort_version
// (descending), then module path.
const getModuleVersionStatesRemainder = `
SELECT %s
FROM module_version_states
-- WHERE clause
%s
ORDER BY
CASE WHEN status=0 THEN 0
WHEN (status=520 OR status=521) THEN 1
WHEN (status=540 OR status=541) THEN 2
ELSE 3 END,
COALESCE(num_packages, 0),
sort_version DESC,
module_path
LIMIT $1`