blob: e41a69ee69e163fda4a509747e52734c817872a2 [file] [log] [blame]
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package postgres
import (
// Search instrumentation: one latency measure, the tag key that labels it,
// and two views that aggregate that measure per search query type.
var (
// SearchLatency holds observed latency in individual search queries.
// hedgedSearch records it in milliseconds, and only for searches that
// produced a usable result.
SearchLatency = stats.Float64(
"Latency of a search query.",
// SearchSource is a census tag for search query types.
// hedgedSearch sets it to the source of the response it ends up using.
SearchSource = tag.MustNewKey("search.source")
// SearchLatencyDistribution aggregates search request latency by search
// query type.
SearchLatencyDistribution = &view.View{
Name: "go-discovery/search/latency",
Measure: SearchLatency,
Aggregation: ochttp.DefaultLatencyDistribution,
Description: "Search latency, by result source query type.",
TagKeys: []tag.Key{SearchSource},
// SearchResponseCount counts search responses by search query type.
// It is a count aggregation over the SearchLatency measure, so a response is
// counted whenever a latency is recorded for it.
SearchResponseCount = &view.View{
Name: "go-discovery/search/count",
Measure: SearchLatency,
Aggregation: view.Count(),
Description: "Search count, by result source query type.",
TagKeys: []tag.Key{SearchSource},
// searchResponse is used for internal bookkeeping when fanning-out search
// request to multiple different search queries.
// Each searcher returns one searchResponse; failures are carried in the err
// field rather than as a separate return value.
type searchResponse struct {
// source is a unique identifier for the search query type (e.g. 'deep',
// 'popular-8'), to be used in logging and reporting.
source string
// results are partially filled out from only the search_documents table.
// hedgedSearch fills in the remaining package data afterwards by calling
// addPackageDataToSearchResults.
results []*internal.SearchResult
// err indicates a technical failure of the search query, or that results are
// not provably complete.
err error
// uncounted reports whether this response is missing total result counts. If
// uncounted is true, search will wait for either the hyperloglog count
// estimate, or for an alternate search method to return with
// uncounted=false.
// popularSearch always sets this to true; deepSearch leaves it false because
// its query also selects a total count.
uncounted bool
// searchEvent is used to log structured information about search events for
// later analysis. A 'search event' occurs when a searcher or count estimate
// returns.
// hedgedSearch passes one searchEvent to log.Info for each goroutine that
// finishes.
type searchEvent struct {
// Type is either the searcher name or 'estimate' (the count estimate).
Type string
// Latency is the duration that the operation took.
Latency time.Duration
// Err is the error returned by the operation, if any.
Err error
// A searcher is used to execute a single search request.
// It has the shape of a method expression on *DB: the receiver is passed
// explicitly as the first argument, ahead of the context. Failures are
// reported through the err field of the returned searchResponse.
type searcher func(db *DB, ctx context.Context, q string, limit, offset int) searchResponse
// The searchers used by Search.
// The map is handed to hedgedSearch, which ranges over the values only; the
// name reported in logs and metrics comes from each response's source field.
var searchers = map[string]searcher{
"popular": (*DB).popularSearch,
"deep": (*DB).deepSearch,
// Search executes two search requests concurrently:
// - a sequential scan of packages in descending order of popularity.
// - all packages ("deep" search) using an inverted index to filter to search
// terms.
// The sequential scan takes significantly less time when searching for very
// common terms (e.g. "errors", "cloud", or "kubernetes"), due to its ability
// to exit early once the requested page of search results is provably
// complete.
// Because 0 <= ts_rank() <= 1, we know that the highest score of any unscanned
// package is ln(e+N), where N is imported_by_count of the package we are
// currently considering. Therefore if the lowest scoring result of popular
// search is greater than ln(e+N), we know that we haven't missed any results
// and can return the search result immediately, cancelling other searches.
// On the other hand, if the popular search is slow, it is likely that the
// search term is infrequent, and deep search will be fast due to our inverted
// gin index on search tokens.
// The gap in this optimization is search terms that are very frequent, but
// rarely relevant: "int" or "package", for example. In these cases we'll pay
// the penalty of a deep search that scans nearly every package.
func (db *DB) Search(ctx context.Context, q string, limit, offset int) (_ []*internal.SearchResult, err error) {
defer derrors.Wrap(&err, "DB.Search(ctx, %q, %d, %d)", q, limit, offset)
return db.hedgedSearch(ctx, q, limit, offset, searchers, nil)
// Penalties to search scores, applied as multipliers to the score.
// Both values are formatted into scoreExpr and are also passed as query
// arguments by popularSearch.
const (
// Module license is non-redistributable.
nonRedistributablePenalty = 0.5
// Module does not have a go.mod file.
// Start this off gently (close to 1), but consider lowering
// it as time goes by and more of the ecosystem converts to modules.
noGoModPenalty = 0.8
// scoreExpr is the expression that computes the search score.
// It is the product of:
// - The Postgres ts_rank score, based on the relevance of the document to the query.
// - The log of the module's popularity, estimated by the number of importing packages.
// The log factor contains exp(1) so that it is always >= 1. Taking the log
// of imported_by_count instead of using it directly makes the effect less
// dramatic: being 2x as popular only has an additive effect.
// - A penalty factor for non-redistributable modules, since a lot of
// details cannot be displayed.
// - A penalty factor for modules without a go.mod file. A NULL has_go_mod is
// treated as true, so such rows are not penalized.
// The first argument to ts_rank is an array of weights for the four tsvector sections,
// in the order D, C, B, A.
// The weights below match the defaults except for B (the PostgreSQL default
// for B is 0.4).
// The expression refers to the search query as $1, so any statement that
// embeds it must bind the query text as its first parameter.
var scoreExpr = fmt.Sprintf(`
ts_rank('{0.1, 0.2, 1.0, 1.0}', tsv_search_tokens, websearch_to_tsquery($1)) *
ln(exp(1)+imported_by_count) *
CASE WHEN redistributable THEN 1 ELSE %f END *
CASE WHEN COALESCE(has_go_mod, true) THEN 1 ELSE %f END
`, nonRedistributablePenalty, noGoModPenalty)
// hedgedSearch executes multiple search methods and returns the first
// available result.
//
// Every searcher in searchers is started in its own goroutine, alongside one
// goroutine that runs estimateResultsCount. All of them share a context that
// is cancelled when hedgedSearch returns. If the response that is chosen is
// uncounted, hedgedSearch additionally waits for whichever comes first: a
// counted, error-free response from another searcher; the count estimate; or
// ctx being done. The chosen results are enriched by
// addPackageDataToSearchResults before they are returned.
//
// q is the search query; limit and offset are forwarded unchanged to every
// searcher.
//
// The optional guardTestResult func may be used to allow tests to control the
// order in which search results are returned.
func (db *DB) hedgedSearch(ctx context.Context, q string, limit, offset int, searchers map[string]searcher, guardTestResult func(string) func()) ([]*internal.SearchResult, error) {
searchStart := time.Now()
// The buffer has one slot per searcher, so every searcher goroutine can
// deliver its response without blocking, even after this function returned.
responses := make(chan searchResponse, len(searchers))
// cancel all unfinished searches when a result (or error) is returned. The
// effectiveness of this depends on the database driver.
searchCtx, cancel := context.WithCancel(ctx)
defer cancel()
// Asynchronously query for the estimated result count.
// The channel is buffered so the estimate goroutine never blocks on its send.
estimateChan := make(chan estimateResponse, 1)
go func() {
start := time.Now()
estimateResp := db.estimateResultsCount(searchCtx, q)
// Logging uses the caller's ctx, not the cancellable searchCtx.
log.Info(ctx, searchEvent{
Type: "estimate",
Latency: time.Since(start),
Err: estimateResp.err,
if guardTestResult != nil {
// guardTestResult itself is called right here; the func it returns is
// deferred and therefore runs when this goroutine exits.
defer guardTestResult("estimate")()
estimateChan <- estimateResp
// Fan out our search requests.
for _, s := range searchers {
// Copy the loop variable so that each goroutine calls its own searcher.
s := s
go func() {
start := time.Now()
resp := s(db, searchCtx, q, limit, offset)
log.Info(ctx, searchEvent{
Type: resp.source,
Latency: time.Since(start),
Err: resp.err,
if guardTestResult != nil {
defer guardTestResult(resp.source)()
responses <- resp
var resp searchResponse
// Read at most one response per searcher.
// NOTE(review): no statement that leaves this loop on success is visible in
// this view, although the code below treats resp as the chosen response —
// confirm against the full file.
for range searchers {
resp = <-responses
if resp.err == nil {
log.Infof(ctx, "initial search response from searcher %s", resp.source)
} else {
log.Errorf(ctx, "error from searcher %s: %v", resp.source, resp.err)
// resp is the last response that was read; if it still carries an error the
// whole search is reported as failed.
if resp.err != nil {
return nil, fmt.Errorf("all searchers failed: %v", resp.err)
if resp.uncounted {
// Since the response is uncounted, we should wait for either the count
// estimate to return, or for the first counted response.
for {
select {
case nextResp := <-responses:
if nextResp.err == nil && !nextResp.uncounted {
log.Infof(ctx, "using counted search results from searcher %s", nextResp.source)
// use this response since it is counted.
resp = nextResp
// NOTE(review): the declaration of the `loop` label is not visible in this
// view.
break loop
case estr := <-estimateChan:
if estr.err != nil {
return nil, fmt.Errorf("error getting estimated count: %v", estr.err)
log.Info(ctx, "using count estimate")
// Stamp the estimate on every result and flag it as approximate.
for _, r := range resp.results {
// TODO(b/141182438): this is a hack: once search has been fully
// replaced with fastsearch, change the return signature of this
// function to separate result-level data from this metadata.
r.NumResults = estr.estimate
r.Approximate = true
break loop
//case <-responses:
case <-ctx.Done():
// NOTE(review): ctx.Done() is also closed on cancellation, not only when a
// deadline passes, so this message can be imprecise.
return nil, fmt.Errorf("context deadline exceeded while waiting for estimated result count")
// cancel proactively here: we've got the search result we need.
// NOTE(review): no explicit cancel() call is visible at this point in this
// view; only the deferred cancel above is.
// latency is only recorded for valid search results, as fast failures could
// skew the latency distribution.
// Note that this latency measurement might differ meaningfully from the
// resp.Latency, if time was spent waiting for the result count estimate.
latency := float64(time.Since(searchStart)) / float64(time.Millisecond)
stats.RecordWithTags(ctx, []tag.Mutator{
tag.Upsert(SearchSource, resp.source),
}, SearchLatency.M(latency))
// To avoid fighting with the query planner, our searches only hit the
// search_documents table and we enrich after getting the results. In the
// future, we may want to fully denormalize and put all search data in the
// search_documents table.
if err := db.addPackageDataToSearchResults(ctx, resp.results); err != nil {
return nil, err
return resp.results, nil
// hllRegisterCount is the number of hyperloglog registers that search
// documents are partitioned into. It is formatted into both hllQuery and
// upsertSearchStatement.
// NOTE(review): upsertSearchStatement masks a hash with hllRegisterCount-1,
// which presumably requires this value to stay a power of two — confirm
// before changing it.
const hllRegisterCount = 128
// hllQuery estimates search result counts using the hyperloglog algorithm.
// Here's how this works:
// 1) Search documents have been partitioned ~evenly into hllRegisterCount
// registers, using the hll_register column. For each hll_register, compute
// the maximum number of leading zeros of any element in the register
// matching our search query. This is the slowest part of the query, but
// since we have an index on (hll_register, hll_leading_zeros desc), we can
// parallelize this and it should be very quick if the density of search
// results is high. To achieve this parallelization, we use a trick of
// selecting a subselected value from generate_series(0, hllRegisterCount-1).
// If there are NO search results in a register, the 'zeros' column will be
// NULL.
// 2) From the results of (1), proceed following the 'Practical
// Considerations' in the wikipedia page above:
// Specifically, use linear counting when E < (5/2)m and there are empty
// registers.
// This should work for any register count >= 128. If we are to decrease this
// register count, we should adjust the estimate for a_m below according to
// the formulas in the wikipedia article above.
//
// Format arguments: %[1]d is hllRegisterCount and %[2]s is scoreExpr. The
// search query is bound as $1; estimateResultsCount scans the single
// resulting value into a nullable integer.
// NOTE(review): the link to the wikipedia page referred to above is not
// present in this view of the file.
var hllQuery = fmt.Sprintf(`
WITH hll_data AS (
SELECT hll_leading_zeros
FROM search_documents
%[2]s *
CASE WHEN tsv_search_tokens @@ websearch_to_tsquery($1) THEN 1 ELSE 0 END
) > 0.1
AND hll_register=generate_series
ORDER BY hll_leading_zeros DESC
) t
) zeros
FROM generate_series(0,%[1]d-1)
nonempty_registers as (SELECT zeros FROM hll_data WHERE zeros IS NOT NULL)
-- use linear counting when there are not enough results, and there is at
-- least one empty register, per 'Practical Considerations'.
CASE WHEN result_count < 2.5 * %[1]d AND empty_register_count > 0
THEN ((0.7213 / (1 + 1.079 / %[1]d)) * (%[1]d *
log(2, (%[1]d::numeric) / empty_register_count)))::int
ELSE result_count END AS approx_count
(0.7213 / (1 + 1.079 / %[1]d)) * -- estimate for a_m
pow(%[1]d, 2) * -- m^2
(1/((%[1]d - count(1)) + SUM(POW(2, -1 * (zeros+1))))) -- Z
)::int AS result_count,
%[1]d - count(1) AS empty_register_count
FROM nonempty_registers
) d`, hllRegisterCount, scoreExpr)
// estimateResponse is the outcome of estimateResultsCount, shaped so that it
// can be sent over a channel by hedgedSearch.
type estimateResponse struct {
// estimate is the approximate number of results for the query.
estimate uint64
// err is non-nil when the estimate could not be read from the database.
err error
// EstimateResultsCount uses the hyperloglog algorithm to estimate the number
// of results for the given search term.
func (db *DB) estimateResultsCount(ctx context.Context, q string) estimateResponse {
row := db.db.QueryRow(ctx, hllQuery, q)
var estimate sql.NullInt64
if err := row.Scan(&estimate); err != nil {
return estimateResponse{err: fmt.Errorf("row.Scan(): %v", err)}
// If estimate is NULL, then we didn't find *any* results, so should return
// zero (the default).
return estimateResponse{estimate: uint64(estimate.Int64)}
// deepSearch searches all packages for the query. It is slower, but results
// are always valid.
//
// q is bound as $1, which is also the parameter scoreExpr refers to; limit
// and offset are passed as the second and third query arguments. Every
// result has NumResults filled from the query's windowed total, so the
// response is returned with uncounted left false. On a query error the
// partial results are discarded and the error is carried in the response.
func (db *DB) deepSearch(ctx context.Context, q string, limit, offset int) searchResponse {
query := fmt.Sprintf(`
SELECT *, COUNT(*) OVER() AS total
(%s) AS score
WHERE tsv_search_tokens @@ websearch_to_tsquery($1)
score DESC,
commit_time DESC,
) r
WHERE r.score > 0.1
OFFSET $3`, scoreExpr)
var results []*internal.SearchResult
// collect copies one row into a new SearchResult; the column order of the
// query has to match the Scan destinations below.
collect := func(rows *sql.Rows) error {
var r internal.SearchResult
if err := rows.Scan(&r.PackagePath, &r.Version, &r.ModulePath, &r.CommitTime,
&r.NumImportedBy, &r.Score, &r.NumResults); err != nil {
return fmt.Errorf("rows.Scan(): %v", err)
results = append(results, &r)
return nil
err := db.db.RunQuery(ctx, query, collect, q, limit, offset)
if err != nil {
// Do not hand back rows collected before the failure.
results = nil
return searchResponse{
source: "deep",
results: results,
err: err,
// popularSearch runs the popular_search SQL function for searchQuery.
//
// The function receives, in order: the search query, limit, offset,
// nonRedistributablePenalty and noGoModPenalty. The rows it returns carry no
// total result count, so the response is always marked uncounted. On a query
// error the partial results are discarded and the error is carried in the
// response.
func (db *DB) popularSearch(ctx context.Context, searchQuery string, limit, offset int) searchResponse {
query := `
FROM popular_search($1, $2, $3, $4, $5)`
var results []*internal.SearchResult
// collect copies one row into a new SearchResult. Unlike deepSearch there is
// no NumResults column to scan.
collect := func(rows *sql.Rows) error {
var r internal.SearchResult
if err := rows.Scan(&r.PackagePath, &r.Version, &r.ModulePath, &r.CommitTime,
&r.NumImportedBy, &r.Score); err != nil {
return fmt.Errorf("rows.Scan(): %v", err)
results = append(results, &r)
return nil
err := db.db.RunQuery(ctx, query, collect, searchQuery, limit, offset, nonRedistributablePenalty, noGoModPenalty)
if err != nil {
// Do not hand back rows collected before the failure.
results = nil
return searchResponse{
source: "popular",
results: results,
err: err,
uncounted: true,
// addPackageDataToSearchResults adds package information to SearchResults that is not stored
// in the search_documents table.
//
// It looks up each result's (path, version, module path) triple and fills in
// Name, Synopsis and Licenses on the matching entries of results in place.
// An empty results slice is a no-op. A returned row whose path matches none
// of the results is reported as an error.
func (db *DB) addPackageDataToSearchResults(ctx context.Context, results []*internal.SearchResult) (err error) {
// NOTE(review): the label below says "DB.enrichResults", which does not match
// this function's name; it looks like a leftover from a rename.
defer derrors.Wrap(&err, "DB.enrichResults(results)")
if len(results) == 0 {
return nil
var (
keys []string
// resultMap tracks PackagePath->SearchResult, to allow joining with the
// returned package data.
resultMap = make(map[string]*internal.SearchResult)
for _, r := range results {
resultMap[r.PackagePath] = r
// The triple is embedded in the statement text, so every component is
// quoted as a SQL literal first.
key := fmt.Sprintf("(%s, %s, %s)", pq.QuoteLiteral(r.PackagePath),
pq.QuoteLiteral(r.Version), pq.QuoteLiteral(r.ModulePath))
keys = append(keys, key)
query := fmt.Sprintf(`
(path, version, module_path) IN (%s)`, strings.Join(keys, ","))
collect := func(rows *sql.Rows) error {
var (
path, name, synopsis string
licenseTypes []string
if err := rows.Scan(&path, &name, &synopsis, pq.Array(&licenseTypes)); err != nil {
return fmt.Errorf("rows.Scan(): %v", err)
r, ok := resultMap[path]
if !ok {
return fmt.Errorf("BUG: unexpected package path: %q", path)
r.Name = name
r.Synopsis = synopsis
// Empty license type strings are dropped rather than copied over.
for _, l := range licenseTypes {
if l != "" {
r.Licenses = append(r.Licenses, l)
return nil
return db.db.RunQuery(ctx, query, collect)
// upsertSearchStatement is the SQL statement that UpsertSearchDocument
// executes to insert or update a row in search_documents.
//
// It is executed with five arguments, in order: the package path ($1), the
// space-joined path tokens ($2), and the three text sections returned by
// SearchDocumentSections. The only format argument, %[1]d, is
// hllRegisterCount.
// NOTE(review): parts of the statement are not visible in this view, so the
// column list and the update clause cannot be documented from here.
var upsertSearchStatement = fmt.Sprintf(`
INSERT INTO search_documents (
SETWEIGHT(TO_TSVECTOR('path_tokens', $2), 'A') ||
hll_hash(p.path) & (%[1]d - 1),
packages p
modules m
p.module_path = m.module_path
AND p.version = m.version
p.path = $1
-- Order the versions by release then prerelease.
-- The default version should be the first release
-- version available, if one exists.
m.version_type = 'release' DESC,
m.sort_version DESC,
m.module_path DESC
ON CONFLICT (package_path)
-- the hll fields are functions of path, so they don't change
CASE WHEN excluded.version = search_documents.version
THEN search_documents.version_updated_at
;`, hllRegisterCount)
// UpsertSearchDocuments adds search information for mod to the search_documents table.
// It calls UpsertSearchDocument for the packages in mod.Packages, passing the
// module's README path and contents along with each package's path and
// synopsis, and returns the first error it encounters.
// NOTE(review): the body of the isInternalPackage check is not visible in
// this view; presumably internal packages are skipped — confirm against the
// full file.
func (db *DB) UpsertSearchDocuments(ctx context.Context, mod *internal.Module) (err error) {
defer derrors.Wrap(&err, "UpsertSearchDocuments(ctx, %q)", mod.ModulePath)
for _, pkg := range mod.Packages {
if isInternalPackage(pkg.Path) {
err := db.UpsertSearchDocument(ctx, upsertSearchDocumentArgs{
PackagePath: pkg.Path,
ModulePath: mod.ModulePath,
Synopsis: pkg.Synopsis,
ReadmeFilePath: mod.ReadmeFilePath,
ReadmeContents: mod.ReadmeContents,
if err != nil {
return err
return nil
// upsertSearchDocumentArgs bundles the inputs of UpsertSearchDocument. The
// same struct is produced by GetPackagesForSearchDocumentUpsert.
type upsertSearchDocumentArgs struct {
// PackagePath is the import path of the package being indexed.
PackagePath string
// ModulePath is the path of the module that contains the package.
ModulePath string
// Synopsis is the package synopsis.
Synopsis string
// ReadmeFilePath and ReadmeContents describe the module's README.
// UpsertSearchDocument ignores both unless PackagePath equals ModulePath.
ReadmeFilePath string
ReadmeContents string
// UpsertSearchDocument inserts a row for each package in the module, if that
// package is the latest version and is not internal.
// The given module should have already been validated via a call to
// validateModule.
func (db *DB) UpsertSearchDocument(ctx context.Context, args upsertSearchDocumentArgs) (err error) {
defer derrors.Wrap(&err, "UpsertSearchDocument(ctx, %q, %q)", args.PackagePath, args.ModulePath)
// Only summarize the README if the package and module have the same path.
if args.PackagePath != args.ModulePath {
args.ReadmeFilePath = ""
args.ReadmeContents = ""
pathTokens := strings.Join(GeneratePathTokens(args.PackagePath), " ")
sectionB, sectionC, sectionD := SearchDocumentSections(args.Synopsis, args.ReadmeFilePath, args.ReadmeContents)
_, err = db.db.Exec(ctx, upsertSearchStatement, args.PackagePath, pathTokens, sectionB, sectionC, sectionD)
return err
// GetPackagesForSearchDocumentUpsert fetches search information for packages in search_documents
// whose update time is before the given time.
//
// Each row is joined with its module to pick up the README path and
// contents, and is returned as an upsertSearchDocumentArgs ready to be passed
// to UpsertSearchDocument. before and limit are passed to the query as its
// first and second arguments; on error the returned slice is nil.
// NOTE(review): the part of the query that consumes limit is not visible in
// this view.
func (db *DB) GetPackagesForSearchDocumentUpsert(ctx context.Context, before time.Time, limit int) (argsList []upsertSearchDocumentArgs, err error) {
defer derrors.Wrap(&err, "GetPackagesForSearchDocumentUpsert(ctx, %s, %d)", before, limit)
query := `
SELECT sd.package_path, sd.module_path, sd.synopsis, m.readme_file_path, m.readme_contents
FROM search_documents sd
INNER JOIN modules m
USING (module_path, version)
WHERE sd.updated_at < $1
collect := func(rows *sql.Rows) error {
var a upsertSearchDocumentArgs
if err := rows.Scan(&a.PackagePath, &a.ModulePath, &a.Synopsis, &a.ReadmeFilePath, &a.ReadmeContents); err != nil {
return err
argsList = append(argsList, a)
return nil
if err := db.db.RunQuery(ctx, query, collect, before, limit); err != nil {
return nil, err
return argsList, nil
// UpdateSearchDocumentsImportedByCount updates imported_by_count and
// imported_by_count_updated_at.
// It does so by completely recalculating the imported-by counts
// from the imports_unique table.
// UpdateSearchDocumentsImportedByCount returns the number of rows updated.
func (db *DB) UpdateSearchDocumentsImportedByCount(ctx context.Context) (nUpdated int64, err error) {
defer derrors.Wrap(&err, "UpdateSearchDocumentsImportedByCount(ctx)")
searchPackages, err := db.getSearchPackages(ctx)
if err != nil {
return 0, err
counts, err := db.computeImportedByCounts(ctx, searchPackages)
if err != nil {
return 0, err
err = db.db.Transact(ctx, func(tx *database.DB) error {
if err := insertImportedByCounts(ctx, tx, counts); err != nil {
return err
if err := compareImportedByCounts(ctx, tx); err != nil {
return err
nUpdated, err = updateImportedByCounts(ctx, tx)
return err
return nUpdated, err
// getSearchPackages returns the set of package paths that are in the search_documents table.
func (db *DB) getSearchPackages(ctx context.Context) (set map[string]bool, err error) {
defer derrors.Wrap(&err, "DB.getSearchPackages(ctx)")
set = map[string]bool{}
err = db.db.RunQuery(ctx, `SELECT package_path FROM search_documents`, func(rows *sql.Rows) error {
var p string
if err := rows.Scan(&p); err != nil {
return err
set[p] = true
return nil
if err != nil {
return nil, err
return set, nil
// computeImportedByCounts builds a map of imported-by counts from
// (from_path, from_module_path, to_path) rows read from the database.
//
// searchDocsPackages is the set of package paths present in
// search_documents; per the comments in the loop, an importer outside that
// set is not counted, and neither is an importer that lives in the same
// module as the package it imports. The same-module test is an
// approximation: it checks whether from_module_path is a path prefix of
// to_path, with a special case for the standard library.
// The rows are iterated manually, so they are closed with a deferred Close
// and rows.Err is checked once the loop ends.
// NOTE(review): the query text, the statements inside the two skip
// conditions and the statement that updates counts are not visible in this
// view — confirm against the full file.
func (db *DB) computeImportedByCounts(ctx context.Context, searchDocsPackages map[string]bool) (counts map[string]int, err error) {
defer derrors.Wrap(&err, "db.computeImportedByCounts(ctx)")
counts = map[string]int{}
// Get all (from_path, to_path) pairs, deduped.
// Also get the from_path's module path.
rows, err := db.db.Query(ctx, `
from_path, from_module_path, to_path
from_path, from_module_path, to_path;
if err != nil {
return nil, err
defer rows.Close()
for rows.Next() {
var from, fromMod, to string
if err := rows.Scan(&from, &fromMod, &to); err != nil {
return nil, err
// Don't count an importer if it's not in search_documents.
if !searchDocsPackages[from] {
// Don't count an importer if it's in the same module as what it's importing.
// Approximate that check by seeing if from_module_path is a prefix of to_path.
// (In some cases, e.g. when to_path is in a nested module, that is not correct.)
if (fromMod == stdlib.ModulePath && stdlib.Contains(to)) || strings.HasPrefix(to+"/", fromMod+"/") {
if err := rows.Err(); err != nil {
return nil, err
return counts, nil
// insertImportedByCounts creates the temporary table
// computed_imported_by_counts and bulk-inserts counts into it, one
// (package_path, imported_by_count) row per map entry.
//
// db is the handle the statements run on; UpdateSearchDocumentsImportedByCount
// passes its transaction here. The map is flattened into a single value slice
// with two values per row, in the same order as the column list given to
// BulkInsert. A failure to create the table is returned prefixed with
// "CREATE TABLE: ".
// NOTE(review): the end of the CREATE TABLE statement is not visible in this
// view, so the table's lifetime cannot be documented from here.
func insertImportedByCounts(ctx context.Context, db *database.DB, counts map[string]int) (err error) {
defer derrors.Wrap(&err, "insertImportedByCounts(ctx, db, counts)")
const createTableQuery = `
CREATE TEMPORARY TABLE computed_imported_by_counts (
package_path TEXT NOT NULL,
imported_by_count INTEGER DEFAULT 0 NOT NULL
if _, err := db.Exec(ctx, createTableQuery); err != nil {
return fmt.Errorf("CREATE TABLE: %v", err)
values := make([]interface{}, 0, 2*len(counts))
for p, c := range counts {
values = append(values, p, c)
columns := []string{"package_path", "imported_by_count"}
return db.BulkInsert(ctx, "computed_imported_by_counts", columns, values, "")
// compareImportedByCounts logs a summary of how the freshly computed
// imported-by counts differ from the ones currently in search_documents.
//
// It reads rows that join search_documents with computed_imported_by_counts
// on package_path, scanning a path, the old count and the new count from
// each. It then logs four figures: the number of matching rows, how many
// will change, how many currently have a zero count, and how many non-zero
// rows change by more than changeThreshold (5%). Only a query and log calls
// are visible here; the function appears to be purely informational.
// NOTE(review): the query text and the statements that increment total,
// zero, change and diff are not visible in this view — confirm against the
// full file.
func compareImportedByCounts(ctx context.Context, db *database.DB) (err error) {
defer derrors.Wrap(&err, "compareImportedByCounts(ctx, tx)")
query := `
search_documents s
computed_imported_by_counts c
s.package_path = c.package_path
// Compute some info about the changes to import-by counts.
const changeThreshold = 0.05 // count how many counts change by at least this fraction
var total, zero, change, diff int
err = db.RunQuery(ctx, query, func(rows *sql.Rows) error {
var path string
var old, new int
if err := rows.Scan(&path, &old, &new); err != nil {
return err
if old != new {
if old == 0 {
return nil
fracDiff := math.Abs(float64(new-old)) / float64(old)
if fracDiff > changeThreshold {
return nil
if err != nil {
return err
log.Infof(ctx, "%6d total rows in search_documents match computed_imported_by_counts", total)
log.Infof(ctx, "%6d will change", change)
log.Infof(ctx, "%6d currently have a zero imported-by count", zero)
log.Infof(ctx, "%6d of the non-zero rows will change by more than %d%%", diff, int(changeThreshold*100))
return nil
// updateImportedByCounts updates the imported_by_count column in search_documents
// for every package in computed_imported_by_counts.
// A row is updated even if the value doesn't change, so that the imported_by_count_updated_at
// column is set.
// Note that if a package is never imported, its imported_by_count column will
// be the default (0) and its imported_by_count_updated_at column will never be set.
//
// It returns the number of rows affected as reported by the database. Both
// the update and the RowsAffected call can fail; each failure is returned
// with its own message prefix.
func updateImportedByCounts(ctx context.Context, db *database.DB) (int64, error) {
const updateStmt = `
UPDATE search_documents s
imported_by_count = c.imported_by_count,
imported_by_count_updated_at = CURRENT_TIMESTAMP
FROM computed_imported_by_counts c
WHERE s.package_path = c.package_path;`
res, err := db.Exec(ctx, updateStmt)
if err != nil {
return 0, fmt.Errorf("error updating imported_by_count and imported_by_count_updated_at for search documents: %v", err)
n, err := res.RowsAffected()
if err != nil {
return 0, fmt.Errorf("RowsAffected: %v", err)
return n, nil
// Lookup sets used by GeneratePathTokens to decide which parts of a package
// path's first element are worth indexing.
var (
// commonHostnames is the set of host names that GeneratePathTokens checks
// the first path element against.
// NOTE(review): every key below reads as the empty string in this view.
// Duplicate constant keys are not valid in a Go map literal, so the host
// names were presumably lost when this text was extracted — restore them
// from version control.
commonHostnames = map[string]bool{
"": true,
"": true,
"": true,
"": true,
"": true,
"": true,
"": true,
"": true,
"": true,
"": true,
"": true,
// commonHostParts is the set of dot-separated path-element parts that
// GeneratePathTokens does not index on their own.
commonHostParts = map[string]bool{
"code": true,
"git": true,
"gitlab": true,
"go": true,
"google": true,
"www": true,
// GeneratePathTokens returns the subPaths and path token parts that will be
// indexed for search, which includes (1) the packagePath (2) all sub-paths of
// the packagePath (3) all parts for a path element that is delimited by a dash
// and (4) all parts of a path element that is delimited by a dot, except for
// the last element.
//
// Leading and trailing slashes of packagePath are ignored. Tokens are
// gathered in a set, so each appears once; empty tokens are dropped. The
// result is built by ranging over that set, so its order is unspecified.
func GeneratePathTokens(packagePath string) []string {
packagePath = strings.Trim(packagePath, "/")
subPathSet := make(map[string]bool)
parts := strings.Split(packagePath, "/")
for i, part := range parts {
// A dashed element contributes each of its dash-separated pieces.
dashParts := strings.Split(part, "-")
if len(dashParts) > 1 {
for _, p := range dashParts {
subPathSet[p] = true
// Add every sub-path of two or more elements that starts at element i.
for j := i + 2; j <= len(parts); j++ {
p := strings.Join(parts[i:j], "/")
p = strings.Trim(p, "/")
subPathSet[p] = true
// NOTE(review): the statement guarded by this condition is not visible in
// this view; going by the comment below, a common host name in first
// position is presumably skipped — confirm against the full file.
if i == 0 && commonHostnames[part] {
// Only index host names if they are not part of commonHostnames.
// Note that because "SELECT to_tsvector('')"
// will return "" as one of its tokens, the common host
// name will still be indexed until we change the pg search_config.
// TODO(b/141318673).
subPathSet[part] = true
// The slice below leaves out the final dot-separated piece.
dotParts := strings.Split(part, ".")
if len(dotParts) > 1 {
for _, p := range dotParts[:len(dotParts)-1] {
if !commonHostParts[p] {
// If the host is not in commonHostnames, we want to
// index each element up to the extension. For example,
// if the host is, we want to index sigs
// and k8s. Skip common host parts.
subPathSet[p] = true
var subPaths []string
for sp := range subPathSet {
if len(sp) > 0 {
subPaths = append(subPaths, sp)
return subPaths
// isInternalPackage reports whether any slash-separated element of path is
// exactly "internal". The comparison is case-sensitive and matches whole
// elements only, so "internals" or "Internal" do not count.
func isInternalPackage(path string) bool {
	elems := strings.Split(path, "/")
	for i := range elems {
		if elems[i] == "internal" {
			return true
		}
	}
	return false
}
// DeleteOlderVersionFromSearchDocuments deletes from search_documents every package with
// the given module path whose version is older than the given version.
// It is used when fetching a module with an alternative path. See internal/worker/fetch.go:fetchAndUpdateState.
func (db *DB) DeleteOlderVersionFromSearchDocuments(ctx context.Context, modulePath, version string) (err error) {
defer derrors.Wrap(&err, "DeleteOlderVersionFromSearchDocuments(ctx, %q, %q)", modulePath, version)
return db.db.Transact(ctx, func(tx *database.DB) error {
// Collect all package paths in search_documents with the given module path
// and an older version. (package_path is the primary key of search_documents.)
var ppaths []string
query := `
SELECT package_path, version
FROM search_documents
WHERE module_path = $1
err := tx.RunQuery(ctx, query, func(rows *sql.Rows) error {
var ppath, v string
if err := rows.Scan(&ppath, &v); err != nil {
return err
if semver.Compare(v, version) < 0 {
ppaths = append(ppaths, ppath)
return nil
}, modulePath)
if err != nil {
return err
if len(ppaths) == 0 {
return nil
// Delete all of those paths.
q := fmt.Sprintf(`DELETE FROM search_documents WHERE package_path IN ('%s')`, strings.Join(ppaths, `', '`))
res, err := tx.Exec(ctx, q)
if err != nil {
return err
n, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("RowsAffected: %v", err)
log.Infof(ctx, "deleted %d rows from search_documents", n)
return nil