blob: a9927969e875091278fd7323996693969a8cc376 [file] [log] [blame]
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package postgres provides functionality for reading and writing to
// the postgres database.
package postgres
import (
	"context"
	"database/sql"
	"errors"
	"time"

	"golang.org/x/pkgsite/internal/database"
	"golang.org/x/pkgsite/internal/derrors"
	"golang.org/x/pkgsite/internal/log"
	"golang.org/x/pkgsite/internal/poller"
)
// DB wraps a *database.DB together with the settings and background state
// used by this package.
type DB struct {
// db is the underlying database handle; it is returned by Underlying and
// closed by Close.
db *database.DB
// bypassLicenseCheck is true for a DB created with NewBypassingLicenseCheck.
bypassLicenseCheck bool
// expoller is the poller constructed in newdb whose getter calls
// getExcludedPrefixes.
expoller *poller.Poller
// cancel cancels the context handed to expoller; Close calls it.
cancel func()
}
// New creates a postgres DB on top of db. The result does not bypass
// license checks.
func New(db *database.DB) *DB {
	const bypassLicenseCheck = false
	return newdb(db, bypassLicenseCheck)
}
// NewBypassingLicenseCheck creates a postgres DB on top of db that bypasses
// license checks. As a result, all data is inserted and returned even for
// non-redistributable modules, packages and directories.
func NewBypassingLicenseCheck(db *database.DB) *DB {
	const bypassLicenseCheck = true
	return newdb(db, bypassLicenseCheck)
}
// startPoller reports whether newdb should poll for excluded prefixes right
// away and then start the periodic poller. It is a variable rather than a
// constant so that it can be changed for testing.
var startPoller = true
// newdb builds a DB around db. bypass is stored as the DB's
// bypassLicenseCheck setting.
//
// It also constructs a poller whose getter is getExcludedPrefixes and whose
// errors are logged. When startPoller is true, the poller is polled once
// immediately and then started with a one-minute argument. The poller's
// context is cancelled by the cancel function stored on the returned DB.
func newdb(db *database.DB, bypass bool) *DB {
	// Getter used by the poller to fetch the current excluded prefixes.
	fetchExcluded := func(ctx context.Context) (interface{}, error) {
		return getExcludedPrefixes(ctx, db)
	}
	// Error hook: polling failures are logged rather than propagated.
	logPollError := func(err error) {
		log.Errorf(context.Background(), "getting excluded prefixes: %v", err)
	}
	exPoller := poller.New([]string(nil), fetchExcluded, logPollError)

	pollCtx, stopPolling := context.WithCancel(context.Background())
	if startPoller {
		exPoller.Poll(pollCtx) // Initialize the state.
		exPoller.Start(pollCtx, time.Minute)
	}

	wrapped := &DB{
		db:                 db,
		bypassLicenseCheck: bypass,
		expoller:           exPoller,
		cancel:             stopPolling,
	}
	return wrapped
}
// Close cancels the context that was given to the excluded-prefix poller and
// then closes the underlying database, returning the error from that close.
func (db *DB) Close() error {
	// Cancel first so the poller's context is done before the database
	// handle goes away.
	db.cancel()

	closeErr := db.db.Close()
	return closeErr
}
// Underlying exposes the *database.DB that db wraps.
func (db *DB) Underlying() *database.DB {
	underlying := db.db
	return underlying
}
// StalenessTimestamp returns the index timestamp of the oldest unprocessed
// module that is newer than the youngest module we have processed. That is,
// let T be the maximum index timestamp of all processed modules (rows whose
// last_processed_at is not NULL). Then this function returns the minimum
// index timestamp strictly greater than T among unprocessed modules (rows
// whose last_processed_at is NULL).
//
// If the query yields no row, which includes the case where no module has
// been processed yet, it returns derrors.NotFound. Any other error from the
// query is returned unchanged.
//
// The name of the function is imprecise: there may be an older unprocessed
// module, if one newer than it has been processed.
//
// We use this function to compute a metric that is a lower bound on the time
// it takes to process a module since it appeared in the index.
func (db *DB) StalenessTimestamp(ctx context.Context) (time.Time, error) {
	var ts time.Time
	err := db.db.QueryRow(ctx, `
SELECT m.index_timestamp
FROM module_version_states m
CROSS JOIN (
-- the index timestamp of the youngest processed module
SELECT index_timestamp
FROM module_version_states
WHERE last_processed_at IS NOT NULL
ORDER BY 1 DESC
LIMIT 1
) yp
WHERE m.index_timestamp > yp.index_timestamp
AND last_processed_at IS NULL
ORDER BY m.index_timestamp ASC
LIMIT 1
`).Scan(&ts)
	switch {
	case err == nil:
		return ts, nil
	case errors.Is(err, sql.ErrNoRows):
		// Use errors.Is rather than == so that a wrapped sql.ErrNoRows is
		// still reported as "not found" instead of leaking through as a
		// generic failure.
		return time.Time{}, derrors.NotFound
	default:
		return time.Time{}, err
	}
}
// collectStrings executes query with args against db and gathers the value
// of every returned row into a slice. The query must select exactly one
// string column. On failure the returned slice is nil and the error is
// annotated via derrors.WrapStack.
func collectStrings(ctx context.Context, db *database.DB, query string, args ...interface{}) (ss []string, err error) {
	defer derrors.WrapStack(&err, "DB.collectStrings(%q)", query)

	var collected []string
	// scanRow reads the single column of the current row and records it.
	scanRow := func(rows *sql.Rows) error {
		var value string
		if scanErr := rows.Scan(&value); scanErr != nil {
			return scanErr
		}
		collected = append(collected, value)
		return nil
	}

	if runErr := db.RunQuery(ctx, query, scanRow, args...); runErr != nil {
		// Discard any partially collected rows on failure.
		return nil, runErr
	}
	return collected, nil
}