| // Copyright 2019 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package postgres |
| |
| import ( |
| "context" |
| "database/sql" |
| "fmt" |
| "net/http" |
| "sort" |
| "time" |
| |
| "github.com/lib/pq" |
| "go.opencensus.io/trace" |
| "golang.org/x/pkgsite/internal" |
| "golang.org/x/pkgsite/internal/database" |
| "golang.org/x/pkgsite/internal/derrors" |
| "golang.org/x/pkgsite/internal/version" |
| ) |
| |
| // InsertIndexVersions inserts new versions into the module_version_states |
| // table with a status of zero. |
| func (db *DB) InsertIndexVersions(ctx context.Context, versions []*internal.IndexVersion) (err error) { |
| defer derrors.WrapStack(&err, "InsertIndexVersions(ctx, %v)", versions) |
| var vals []interface{} |
| for _, v := range versions { |
| vals = append(vals, v.Path, v.Version, version.ForSorting(v.Version), v.Timestamp, 0, "", "", version.IsIncompatible(v.Version)) |
| } |
| cols := []string{"module_path", "version", "sort_version", "index_timestamp", "status", "error", "go_mod_path", "incompatible"} |
| conflictAction := ` |
| ON CONFLICT |
| (module_path, version) |
| DO UPDATE SET |
| index_timestamp=excluded.index_timestamp, |
| next_processed_after=CURRENT_TIMESTAMP |
| ` |
| return db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error { |
| var updates [][2]string // (module_path, version) to update status |
| err := tx.BulkInsertReturning(ctx, "module_version_states", cols, vals, conflictAction, |
| []string{"module_path", "version", "status"}, |
| func(rows *sql.Rows) error { |
| var ( |
| mod, ver string |
| status int |
| ) |
| if err := rows.Scan(&mod, &ver, &status); err != nil { |
| return err |
| } |
| // Update a module's status to 0 if it wasn't found previously. |
| // See https://golang.org/issue/46117. |
| if status == http.StatusNotFound { |
| updates = append(updates, [2]string{mod, ver}) |
| } |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| // We don't have a BulkUpdate function that works for us here |
| // (database.BulkUpdate can use only one column as a key). But we expect |
| // very few of these, so it's fine to run them individually. |
| for _, mv := range updates { |
| _, err = tx.Exec(ctx, ` |
| UPDATE module_version_states |
| SET status = 0 |
| WHERE module_path = $1 AND version = $2 |
| `, mv[0], mv[1]) |
| if err != nil { |
| return err |
| } |
| } |
| return nil |
| }) |
| } |
| |
// ModuleVersionStateForUpsert holds the outcome of fetching a single module
// version, as recorded by UpsertModuleVersionState.
type ModuleVersionStateForUpsert struct {
	ModulePath string
	Version    string
	// AppVersion is written to the app_version column.
	AppVersion string
	// Timestamp is written to index_timestamp when a new row is inserted; it
	// is not changed when an existing row is updated.
	Timestamp time.Time
	// Status is an HTTP-style status code describing the fetch result (it is
	// compared against net/http status constants).
	Status    int
	HasGoMod  bool
	GoModPath string
	// FetchErr is the fetch error, if any. Its message is written to the
	// error column; a nil FetchErr is written as the empty string.
	FetchErr error
	// PackageVersionStates are upserted into package_version_states. Their
	// count is recorded as num_packages unless Status is in 400-404.
	PackageVersionStates []*internal.PackageVersionState
}
| |
| // UpsertModuleVersionState inserts or updates the module_version_state table with |
| // the results of a fetch operation for a given module version. |
| func (db *DB) UpsertModuleVersionState(ctx context.Context, mvs *ModuleVersionStateForUpsert) (err error) { |
| defer derrors.WrapStack(&err, "UpsertModuleVersionState(ctx, %s@%s)", mvs.ModulePath, mvs.Version) |
| ctx, span := trace.StartSpan(ctx, "UpsertModuleVersionState") |
| defer span.End() |
| |
| var numPackages *int |
| if !(mvs.Status >= http.StatusBadRequest && mvs.Status <= http.StatusNotFound) { |
| // If a module was fetched a 40x error in this range, we won't know how |
| // many packages it has. |
| n := len(mvs.PackageVersionStates) |
| numPackages = &n |
| } |
| |
| return db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error { |
| if err := upsertModuleVersionState(ctx, tx, numPackages, mvs); err != nil { |
| return err |
| } |
| // Sync modules.status if the module exists in the modules table. |
| if err := updateModulesStatus(ctx, tx, mvs.ModulePath, mvs.Version, mvs.Status); err != nil { |
| return err |
| } |
| if len(mvs.PackageVersionStates) == 0 { |
| return nil |
| } |
| return upsertPackageVersionStates(ctx, tx, mvs.PackageVersionStates) |
| }) |
| } |
| |
| func upsertModuleVersionState(ctx context.Context, db *database.DB, numPackages *int, mvs *ModuleVersionStateForUpsert) (err error) { |
| defer derrors.WrapStack(&err, "upsertModuleVersionState(%q, %q, ...)", mvs.ModulePath, mvs.Version) |
| ctx, span := trace.StartSpan(ctx, "upsertModuleVersionState") |
| defer span.End() |
| |
| var sqlErrorMsg string |
| if mvs.FetchErr != nil { |
| sqlErrorMsg = mvs.FetchErr.Error() |
| } |
| |
| affected, err := db.Exec(ctx, ` |
| INSERT INTO module_version_states AS mvs ( |
| module_path, |
| version, |
| sort_version, |
| app_version, |
| index_timestamp, |
| status, |
| has_go_mod, |
| go_mod_path, |
| error, |
| num_packages, |
| incompatible) |
| VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) |
| ON CONFLICT (module_path, version) |
| DO UPDATE |
| SET |
| app_version=excluded.app_version, |
| status=excluded.status, |
| has_go_mod=excluded.has_go_mod, |
| go_mod_path=excluded.go_mod_path, |
| error=excluded.error, |
| num_packages=excluded.num_packages, |
| try_count=mvs.try_count+1, |
| last_processed_at=CURRENT_TIMESTAMP, |
| -- back off exponentially until 1 hour, then at constant 1-hour intervals |
| next_processed_after=CASE |
| WHEN mvs.last_processed_at IS NULL THEN |
| CURRENT_TIMESTAMP + INTERVAL '1 minute' |
| WHEN 2*(mvs.next_processed_after - mvs.last_processed_at) < INTERVAL '1 hour' THEN |
| CURRENT_TIMESTAMP + 2*(mvs.next_processed_after - mvs.last_processed_at) |
| ELSE |
| CURRENT_TIMESTAMP + INTERVAL '1 hour' |
| END;`, |
| mvs.ModulePath, mvs.Version, version.ForSorting(mvs.Version), |
| mvs.AppVersion, mvs.Timestamp, mvs.Status, mvs.HasGoMod, mvs.GoModPath, sqlErrorMsg, numPackages, |
| version.IsIncompatible(mvs.Version)) |
| if err != nil { |
| return err |
| } |
| if affected != 1 { |
| return fmt.Errorf("module version state update affected %d rows, expected exactly 1", affected) |
| } |
| return nil |
| } |
| |
| // updateModulesStatus updates the status of the module with the given modulePath |
| // and version, if it exists, in the modules table. |
| func updateModulesStatus(ctx context.Context, db *database.DB, modulePath, resolvedVersion string, status int) (err error) { |
| defer derrors.WrapStack(&err, "updateModulesStatus(%q, %q, %d)", modulePath, resolvedVersion, status) |
| |
| query := `UPDATE modules |
| SET |
| status = $1 |
| WHERE |
| module_path = $2 |
| AND version = $3;` |
| affected, err := db.Exec(ctx, query, status, modulePath, resolvedVersion) |
| if err != nil { |
| return err |
| } |
| if affected > 1 { |
| return fmt.Errorf("module status update affected %d rows, expected at most 1", affected) |
| } |
| return nil |
| } |
| |
| // UpdateModuleVersionStatus updates the status and error fields of a module version. |
| func (db *DB) UpdateModuleVersionStatus(ctx context.Context, modulePath, version string, status int, error string) (err error) { |
| defer derrors.WrapStack(&err, "UpdateModuleVersionStatus(%q, %q, %d)", modulePath, version, status) |
| |
| query := ` |
| UPDATE module_version_states |
| SET status = $3, error = $4 |
| WHERE module_path = $1 AND version = $2 |
| ` |
| _, err = db.db.Exec(ctx, query, modulePath, version, status, error) |
| return err |
| } |
| |
| func upsertPackageVersionStates(ctx context.Context, db *database.DB, packageVersionStates []*internal.PackageVersionState) (err error) { |
| defer derrors.WrapStack(&err, "upsertPackageVersionStates") |
| ctx, span := trace.StartSpan(ctx, "upsertPackageVersionStates") |
| defer span.End() |
| |
| sort.Slice(packageVersionStates, func(i, j int) bool { |
| return packageVersionStates[i].PackagePath < packageVersionStates[j].PackagePath |
| }) |
| var vals []interface{} |
| for _, pvs := range packageVersionStates { |
| vals = append(vals, pvs.PackagePath, pvs.ModulePath, pvs.Version, pvs.Status, pvs.Error) |
| } |
| return db.BulkInsert(ctx, "package_version_states", |
| []string{ |
| "package_path", |
| "module_path", |
| "version", |
| "status", |
| "error", |
| }, |
| vals, |
| `ON CONFLICT (module_path, package_path, version) |
| DO UPDATE |
| SET |
| package_path=excluded.package_path, |
| module_path=excluded.module_path, |
| version=excluded.version, |
| status=excluded.status, |
| error=excluded.error`) |
| } |
| |
| // LatestIndexTimestamp returns the last timestamp successfully inserted into |
| // the module_version_states table. |
| func (db *DB) LatestIndexTimestamp(ctx context.Context) (_ time.Time, err error) { |
| defer derrors.WrapStack(&err, "LatestIndexTimestamp(ctx)") |
| |
| query := `SELECT index_timestamp |
| FROM module_version_states |
| ORDER BY index_timestamp DESC |
| LIMIT 1` |
| |
| var ts time.Time |
| row := db.db.QueryRow(ctx, query) |
| switch err := row.Scan(&ts); err { |
| case sql.ErrNoRows: |
| return time.Time{}, nil |
| case nil: |
| return ts, nil |
| default: |
| return time.Time{}, err |
| } |
| } |
| |
// moduleVersionStateColumns is the ordered list of module_version_states
// columns that scanModuleVersionState expects. Keep the two in sync: adding,
// removing, or reordering a column here requires the same change to the
// scan destinations there.
const moduleVersionStateColumns = `
	module_path,
	version,
	index_timestamp,
	created_at,
	status,
	error,
	try_count,
	last_processed_at,
	next_processed_after,
	app_version,
	has_go_mod,
	go_mod_path,
	num_packages`
| |
| // scanModuleVersionState constructs an *internal.ModuleModuleVersionState from the given |
| // scanner. It expects columns to be in the order of moduleVersionStateColumns. |
| func scanModuleVersionState(scan func(dest ...interface{}) error) (*internal.ModuleVersionState, error) { |
| var ( |
| v internal.ModuleVersionState |
| lastProcessedAt pq.NullTime |
| numPackages sql.NullInt64 |
| hasGoMod sql.NullBool |
| ) |
| if err := scan(&v.ModulePath, &v.Version, &v.IndexTimestamp, &v.CreatedAt, &v.Status, &v.Error, |
| &v.TryCount, &v.LastProcessedAt, &v.NextProcessedAfter, &v.AppVersion, &hasGoMod, &v.GoModPath, |
| &numPackages); err != nil { |
| return nil, err |
| } |
| if lastProcessedAt.Valid { |
| lp := lastProcessedAt.Time |
| v.LastProcessedAt = &lp |
| } |
| if hasGoMod.Valid { |
| v.HasGoMod = hasGoMod.Bool |
| } |
| if numPackages.Valid { |
| n := int(numPackages.Int64) |
| v.NumPackages = &n |
| } |
| return &v, nil |
| } |
| |
| // queryModuleVersionStates executes a query for ModuleModuleVersionState rows. It expects the |
| // given queryFormat be a format specifier with exactly one argument: a %s verb |
| // for the query columns. |
| func (db *DB) queryModuleVersionStates(ctx context.Context, queryFormat string, args ...interface{}) ([]*internal.ModuleVersionState, error) { |
| query := fmt.Sprintf(queryFormat, moduleVersionStateColumns) |
| rows, err := db.db.Query(ctx, query, args...) |
| if err != nil { |
| return nil, err |
| } |
| defer rows.Close() |
| |
| var versions []*internal.ModuleVersionState |
| for rows.Next() { |
| v, err := scanModuleVersionState(rows.Scan) |
| if err != nil { |
| return nil, fmt.Errorf("rows.Scan(): %v", err) |
| } |
| versions = append(versions, v) |
| } |
| |
| return versions, nil |
| } |
| |
| // GetRecentFailedVersions returns versions that have most recently failed. |
| func (db *DB) GetRecentFailedVersions(ctx context.Context, limit int) (_ []*internal.ModuleVersionState, err error) { |
| defer derrors.WrapStack(&err, "GetRecentFailedVersions(ctx, %d)", limit) |
| |
| queryFormat := ` |
| SELECT %s |
| FROM |
| module_version_states |
| WHERE status=500 |
| ORDER BY last_processed_at DESC |
| LIMIT $1` |
| return db.queryModuleVersionStates(ctx, queryFormat, limit) |
| } |
| |
| // GetRecentVersions returns recent versions that have been processed. |
| func (db *DB) GetRecentVersions(ctx context.Context, limit int) (_ []*internal.ModuleVersionState, err error) { |
| defer derrors.WrapStack(&err, "GetRecentVersions(ctx, %d)", limit) |
| |
| queryFormat := ` |
| SELECT %s |
| FROM |
| module_version_states |
| ORDER BY created_at DESC |
| LIMIT $1` |
| return db.queryModuleVersionStates(ctx, queryFormat, limit) |
| } |
| |
| // GetModuleVersionState returns the current module version state for |
| // modulePath and version. |
| func (db *DB) GetModuleVersionState(ctx context.Context, modulePath, resolvedVersion string) (_ *internal.ModuleVersionState, err error) { |
| defer derrors.WrapStack(&err, "GetModuleVersionState(ctx, %q, %q)", modulePath, resolvedVersion) |
| |
| query := fmt.Sprintf(` |
| SELECT %s |
| FROM |
| module_version_states |
| WHERE |
| module_path = $1 |
| AND version = $2;`, moduleVersionStateColumns) |
| |
| row := db.db.QueryRow(ctx, query, modulePath, resolvedVersion) |
| v, err := scanModuleVersionState(row.Scan) |
| switch err { |
| case nil: |
| return v, nil |
| case sql.ErrNoRows: |
| return nil, derrors.NotFound |
| default: |
| return nil, fmt.Errorf("row.Scan(): %v", err) |
| } |
| } |
| |
| // GetPackageVersionStatesForModule returns the current package version states |
| // for modulePath and version. |
| func (db *DB) GetPackageVersionStatesForModule(ctx context.Context, modulePath, resolvedVersion string) (_ []*internal.PackageVersionState, err error) { |
| defer derrors.WrapStack(&err, "GetPackageVersionState(ctx, %q, %q)", modulePath, resolvedVersion) |
| |
| query := ` |
| SELECT |
| package_path, |
| module_path, |
| version, |
| status, |
| error |
| FROM |
| package_version_states |
| WHERE |
| module_path = $1 |
| AND version = $2;` |
| |
| var states []*internal.PackageVersionState |
| collect := func(rows *sql.Rows) error { |
| var s internal.PackageVersionState |
| if err := rows.Scan(&s.PackagePath, &s.ModulePath, &s.Version, |
| &s.Status, &s.Error); err != nil { |
| return fmt.Errorf("rows.Scan(): %v", err) |
| } |
| states = append(states, &s) |
| return nil |
| } |
| if err := db.db.RunQuery(ctx, query, collect, modulePath, resolvedVersion); err != nil { |
| return nil, err |
| } |
| return states, nil |
| } |
| |
| // GetPackageVersionState returns the current package version state for |
| // pkgPath, modulePath and version. |
| func (db *DB) GetPackageVersionState(ctx context.Context, pkgPath, modulePath, resolvedVersion string) (_ *internal.PackageVersionState, err error) { |
| defer derrors.WrapStack(&err, "GetPackageVersionState(ctx, %q, %q, %q)", pkgPath, modulePath, resolvedVersion) |
| |
| query := ` |
| SELECT |
| package_path, |
| module_path, |
| version, |
| status, |
| error |
| FROM |
| package_version_states |
| WHERE |
| package_path = $1 |
| AND module_path = $2 |
| AND version = $3;` |
| |
| var pvs internal.PackageVersionState |
| err = db.db.QueryRow(ctx, query, pkgPath, modulePath, resolvedVersion).Scan( |
| &pvs.PackagePath, &pvs.ModulePath, &pvs.Version, |
| &pvs.Status, &pvs.Error) |
| switch err { |
| case nil: |
| return &pvs, nil |
| case sql.ErrNoRows: |
| return nil, derrors.NotFound |
| default: |
| return nil, fmt.Errorf("row.Scan(): %v", err) |
| } |
| } |
| |
// VersionStats holds statistics extracted from the module_version_states
// table by GetVersionStats.
type VersionStats struct {
	// LatestTimestamp is the largest index_timestamp across all rows.
	LatestTimestamp time.Time
	VersionCounts   map[int]int // from status to number of rows
}
| |
| // GetVersionStats queries the module_version_states table for aggregate |
| // information about the current state of module versions, grouping them by |
| // their current status code. |
| func (db *DB) GetVersionStats(ctx context.Context) (_ *VersionStats, err error) { |
| defer derrors.WrapStack(&err, "GetVersionStats(ctx)") |
| |
| query := ` |
| SELECT |
| status, |
| max(index_timestamp), |
| count(*) |
| FROM |
| module_version_states |
| GROUP BY status;` |
| |
| stats := &VersionStats{ |
| VersionCounts: make(map[int]int), |
| } |
| err = db.db.RunQuery(ctx, query, func(rows *sql.Rows) error { |
| var ( |
| status sql.NullInt64 |
| indexTimestamp time.Time |
| count int |
| ) |
| if err := rows.Scan(&status, &indexTimestamp, &count); err != nil { |
| return fmt.Errorf("row.Scan(): %v", err) |
| } |
| if indexTimestamp.After(stats.LatestTimestamp) { |
| stats.LatestTimestamp = indexTimestamp |
| } |
| stats.VersionCounts[int(status.Int64)] = count |
| return nil |
| }) |
| if err != nil { |
| return nil, err |
| } |
| return stats, nil |
| } |
| |
| // HasGoMod reports whether a given module version has a go.mod file. |
| // It returns a NotFound error if it can't find any information. |
| func (db *DB) HasGoMod(ctx context.Context, modulePath, version string) (has bool, err error) { |
| defer derrors.WrapStack(&err, "HasGoMod(%q, %q)", modulePath, version) |
| |
| // Check the module_version_states table. It has information about |
| // every module we've seen. Ignore rows with status == 0 because |
| // they haven't been processed yet. |
| var hasP *bool |
| err = db.db.QueryRow(ctx, ` |
| SELECT has_go_mod |
| FROM module_version_states |
| WHERE module_path = $1 |
| AND version = $2 |
| AND status != 0 |
| `, modulePath, version).Scan(&hasP) |
| if err == sql.ErrNoRows { |
| return false, derrors.NotFound |
| } |
| if err != nil { |
| return false, err |
| } |
| if hasP != nil { |
| return *hasP, nil |
| } |
| // the has_go_mod column hasn't been populated yet. |
| // Fall back to the modules table. |
| // This can be removed when all rows have been populated and |
| // module_version_states.has_go_mod is migrated to NOT NULL. |
| err = db.db.QueryRow(ctx, ` |
| SELECT has_go_mod |
| FROM modules |
| WHERE module_path = $1 AND version = $2 |
| `, modulePath, version).Scan(&has) |
| if err == sql.ErrNoRows { |
| return false, derrors.NotFound |
| } |
| if err != nil { |
| return false, err |
| } |
| return has, nil |
| } |