blob: 064fd9ccce865af7ecdecd8faa7b7e04c681d5a3 [file] [log] [blame]
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package postgres
import (
"context"
"database/sql"
"fmt"
"net/http"
"sort"
"time"
"github.com/lib/pq"
"go.opencensus.io/trace"
"golang.org/x/pkgsite/internal"
"golang.org/x/pkgsite/internal/database"
"golang.org/x/pkgsite/internal/derrors"
"golang.org/x/pkgsite/internal/version"
)
// InsertIndexVersions inserts new versions into the module_version_states
// table with a status of zero.
func (db *DB) InsertIndexVersions(ctx context.Context, versions []*internal.IndexVersion) (err error) {
defer derrors.Wrap(&err, "InsertIndexVersions(ctx, %v)", versions)
var vals []interface{}
for _, v := range versions {
vals = append(vals, v.Path, v.Version, version.ForSorting(v.Version), v.Timestamp, 0, "", "")
}
cols := []string{"module_path", "version", "sort_version", "index_timestamp", "status", "error", "go_mod_path"}
conflictAction := `
ON CONFLICT
(module_path, version)
DO UPDATE SET
index_timestamp=excluded.index_timestamp,
next_processed_after=CURRENT_TIMESTAMP`
return db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error {
return tx.BulkInsert(ctx, "module_version_states", cols, vals, conflictAction)
})
}
// UpsertModuleVersionState inserts or updates the module_version_state table with
// the results of a fetch operation for a given module version.
func (db *DB) UpsertModuleVersionState(ctx context.Context, modulePath, vers, appVersion string, timestamp time.Time, status int, goModPath string, fetchErr error, packageVersionStates []*internal.PackageVersionState) (err error) {
defer derrors.Wrap(&err, "UpsertModuleVersionState(ctx, %q, %q, %q, %s, %d, %q, %v",
modulePath, vers, appVersion, timestamp, status, goModPath, fetchErr)
ctx, span := trace.StartSpan(ctx, "UpsertModuleVersionState")
defer span.End()
var numPackages *int
if !(status >= http.StatusBadRequest && status <= http.StatusNotFound) {
// If a module was fetched a 40x error in this range, we won't know how
// many packages it has.
n := len(packageVersionStates)
numPackages = &n
}
return db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error {
if err := upsertModuleVersionState(ctx, tx, modulePath, vers, appVersion, numPackages, timestamp, status, goModPath, fetchErr); err != nil {
return err
}
if len(packageVersionStates) == 0 {
return nil
}
return upsertPackageVersionStates(ctx, tx, packageVersionStates)
})
}
func upsertModuleVersionState(ctx context.Context, db *database.DB, modulePath, vers, appVersion string, numPackages *int, timestamp time.Time, status int, goModPath string, fetchErr error) (err error) {
defer derrors.Wrap(&err, "upsertModuleVersionState(ctx, %q, %q, %q, %s, %d, %q, %v",
modulePath, vers, appVersion, timestamp, status, goModPath, fetchErr)
ctx, span := trace.StartSpan(ctx, "upsertModuleVersionState")
defer span.End()
var sqlErrorMsg string
if fetchErr != nil {
sqlErrorMsg = fetchErr.Error()
}
result, err := db.Exec(ctx, `
INSERT INTO module_version_states AS mvs (
module_path,
version,
sort_version,
app_version,
index_timestamp,
status,
go_mod_path,
error,
num_packages)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (module_path, version)
DO UPDATE
SET
app_version=excluded.app_version,
status=excluded.status,
go_mod_path=excluded.go_mod_path,
error=excluded.error,
num_packages=excluded.num_packages,
try_count=mvs.try_count+1,
last_processed_at=CURRENT_TIMESTAMP,
-- back off exponentially until 1 hour, then at constant 1-hour intervals
next_processed_after=CASE
WHEN mvs.last_processed_at IS NULL THEN
CURRENT_TIMESTAMP + INTERVAL '1 minute'
WHEN 2*(mvs.next_processed_after - mvs.last_processed_at) < INTERVAL '1 hour' THEN
CURRENT_TIMESTAMP + 2*(mvs.next_processed_after - mvs.last_processed_at)
ELSE
CURRENT_TIMESTAMP + INTERVAL '1 hour'
END;`,
modulePath, vers, version.ForSorting(vers),
appVersion, timestamp, status, goModPath, sqlErrorMsg, numPackages)
if err != nil {
return err
}
affected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("result.RowsAffected(): %v", err)
}
if affected != 1 {
return fmt.Errorf("module version state update affected %d rows, expected exactly 1", affected)
}
return nil
}
func upsertPackageVersionStates(ctx context.Context, db *database.DB, packageVersionStates []*internal.PackageVersionState) (err error) {
defer derrors.Wrap(&err, "upsertPackageVersionStates")
ctx, span := trace.StartSpan(ctx, "upsertPackageVersionStates")
defer span.End()
sort.Slice(packageVersionStates, func(i, j int) bool {
return packageVersionStates[i].PackagePath < packageVersionStates[j].PackagePath
})
var vals []interface{}
for _, pvs := range packageVersionStates {
vals = append(vals, pvs.PackagePath, pvs.ModulePath, pvs.Version, pvs.Status, pvs.Error)
}
return db.BulkInsert(ctx, "package_version_states",
[]string{
"package_path",
"module_path",
"version",
"status",
"error",
},
vals,
`ON CONFLICT (module_path, package_path, version)
DO UPDATE
SET
package_path=excluded.package_path,
module_path=excluded.module_path,
version=excluded.version,
status=excluded.status,
error=excluded.error`)
}
// LatestIndexTimestamp returns the last timestamp successfully inserted into
// the module_version_states table.
func (db *DB) LatestIndexTimestamp(ctx context.Context) (_ time.Time, err error) {
defer derrors.Wrap(&err, "LatestIndexTimestamp(ctx)")
query := `SELECT index_timestamp
FROM module_version_states
ORDER BY index_timestamp DESC
LIMIT 1`
var ts time.Time
row := db.db.QueryRow(ctx, query)
switch err := row.Scan(&ts); err {
case sql.ErrNoRows:
return time.Time{}, nil
case nil:
return ts, nil
default:
return time.Time{}, err
}
}
const moduleVersionStateColumns = `
module_path,
version,
index_timestamp,
created_at,
status,
error,
try_count,
last_processed_at,
next_processed_after,
app_version,
go_mod_path,
num_packages`
// scanModuleVersionState constructs an *internal.ModuleModuleVersionState from the given
// scanner. It expects columns to be in the order of moduleVersionStateColumns.
func scanModuleVersionState(scan func(dest ...interface{}) error) (*internal.ModuleVersionState, error) {
var (
v internal.ModuleVersionState
lastProcessedAt pq.NullTime
numPackages sql.NullInt64
)
if err := scan(&v.ModulePath, &v.Version, &v.IndexTimestamp, &v.CreatedAt, &v.Status, &v.Error,
&v.TryCount, &v.LastProcessedAt, &v.NextProcessedAfter, &v.AppVersion, &v.GoModPath, &numPackages); err != nil {
return nil, err
}
if lastProcessedAt.Valid {
lp := lastProcessedAt.Time
v.LastProcessedAt = &lp
}
if numPackages.Valid {
n := int(numPackages.Int64)
v.NumPackages = &n
}
return &v, nil
}
// queryModuleVersionStates executes a query for ModuleModuleVersionState rows. It expects the
// given queryFormat be a format specifier with exactly one argument: a %s verb
// for the query columns.
func (db *DB) queryModuleVersionStates(ctx context.Context, queryFormat string, args ...interface{}) ([]*internal.ModuleVersionState, error) {
query := fmt.Sprintf(queryFormat, moduleVersionStateColumns)
rows, err := db.db.Query(ctx, query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var versions []*internal.ModuleVersionState
for rows.Next() {
v, err := scanModuleVersionState(rows.Scan)
if err != nil {
return nil, fmt.Errorf("rows.Scan(): %v", err)
}
versions = append(versions, v)
}
return versions, nil
}
// GetRecentFailedVersions returns versions that have most recently failed.
func (db *DB) GetRecentFailedVersions(ctx context.Context, limit int) (_ []*internal.ModuleVersionState, err error) {
defer derrors.Wrap(&err, "GetRecentFailedVersions(ctx, %d)", limit)
queryFormat := `
SELECT %s
FROM
module_version_states
WHERE status=500
ORDER BY last_processed_at DESC
LIMIT $1`
return db.queryModuleVersionStates(ctx, queryFormat, limit)
}
// GetRecentVersions returns recent versions that have been processed.
func (db *DB) GetRecentVersions(ctx context.Context, limit int) (_ []*internal.ModuleVersionState, err error) {
defer derrors.Wrap(&err, "GetRecentVersions(ctx, %d)", limit)
queryFormat := `
SELECT %s
FROM
module_version_states
ORDER BY created_at DESC
LIMIT $1`
return db.queryModuleVersionStates(ctx, queryFormat, limit)
}
// GetModuleVersionState returns the current module version state for
// modulePath and version.
func (db *DB) GetModuleVersionState(ctx context.Context, modulePath, version string) (_ *internal.ModuleVersionState, err error) {
defer derrors.Wrap(&err, "GetModuleVersionState(ctx, %q, %q)", modulePath, version)
query := fmt.Sprintf(`
SELECT %s
FROM
module_version_states
WHERE
module_path = $1
AND version = $2;`, moduleVersionStateColumns)
row := db.db.QueryRow(ctx, query, modulePath, version)
v, err := scanModuleVersionState(row.Scan)
switch err {
case nil:
return v, nil
case sql.ErrNoRows:
return nil, derrors.NotFound
default:
return nil, fmt.Errorf("row.Scan(): %v", err)
}
}
// GetPackageVersionStatesForModule returns the current package version states
// for modulePath and version.
func (db *DB) GetPackageVersionStatesForModule(ctx context.Context, modulePath, version string) (_ []*internal.PackageVersionState, err error) {
defer derrors.Wrap(&err, "GetPackageVersionState(ctx, %q, %q)", modulePath, version)
query := `
SELECT
package_path,
module_path,
version,
status,
error
FROM
package_version_states
WHERE
module_path = $1
AND version = $2;`
var states []*internal.PackageVersionState
collect := func(rows *sql.Rows) error {
var s internal.PackageVersionState
if err := rows.Scan(&s.PackagePath, &s.ModulePath, &s.Version,
&s.Status, &s.Error); err != nil {
return fmt.Errorf("rows.Scan(): %v", err)
}
states = append(states, &s)
return nil
}
if err := db.db.RunQuery(ctx, query, collect, modulePath, version); err != nil {
return nil, err
}
return states, nil
}
// GetPackageVersionState returns the current package version state for
// pkgPath, modulePath and version.
func (db *DB) GetPackageVersionState(ctx context.Context, pkgPath, modulePath, version string) (_ *internal.PackageVersionState, err error) {
defer derrors.Wrap(&err, "GetPackageVersionState(ctx, %q, %q, %q)", pkgPath, modulePath, version)
query := `
SELECT
package_path,
module_path,
version,
status,
error
FROM
package_version_states
WHERE
package_path = $1
AND module_path = $2
AND version = $3;`
var pvs internal.PackageVersionState
err = db.db.QueryRow(ctx, query, pkgPath, modulePath, version).Scan(
&pvs.PackagePath, &pvs.ModulePath, &pvs.Version,
&pvs.Status, &pvs.Error)
switch err {
case nil:
return &pvs, nil
case sql.ErrNoRows:
return nil, derrors.NotFound
default:
return nil, fmt.Errorf("row.Scan(): %v", err)
}
}
// VersionStats holds statistics extracted from the module_version_states
// table.
type VersionStats struct {
LatestTimestamp time.Time
VersionCounts map[int]int // from status to number of rows
}
// GetVersionStats queries the module_version_states table for aggregate
// information about the current state of module versions, grouping them by
// their current status code.
func (db *DB) GetVersionStats(ctx context.Context) (_ *VersionStats, err error) {
defer derrors.Wrap(&err, "GetVersionStats(ctx)")
query := `
SELECT
status,
max(index_timestamp),
count(*)
FROM
module_version_states
GROUP BY status;`
stats := &VersionStats{
VersionCounts: make(map[int]int),
}
err = db.db.RunQuery(ctx, query, func(rows *sql.Rows) error {
var (
status sql.NullInt64
indexTimestamp time.Time
count int
)
if err := rows.Scan(&status, &indexTimestamp, &count); err != nil {
return fmt.Errorf("row.Scan(): %v", err)
}
if indexTimestamp.After(stats.LatestTimestamp) {
stats.LatestTimestamp = indexTimestamp
}
stats.VersionCounts[int(status.Int64)] = count
return nil
})
if err != nil {
return nil, err
}
return stats, nil
}