blob: 546a97b0f9648fb4e16018d29d5b44d2538be91b [file] [log] [blame]
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package worker
import (
"context"
"fmt"
"net/http"
"sort"
"strings"
"time"
"go.opencensus.io/trace"
"golang.org/x/mod/semver"
"golang.org/x/pkgsite/internal"
"golang.org/x/pkgsite/internal/derrors"
"golang.org/x/pkgsite/internal/experiment"
"golang.org/x/pkgsite/internal/fetch"
"golang.org/x/pkgsite/internal/log"
"golang.org/x/pkgsite/internal/postgres"
"golang.org/x/pkgsite/internal/proxy"
"golang.org/x/pkgsite/internal/source"
)
const (
// Indicates that although we have a valid module, some packages could not be processed.
hasIncompletePackagesCode = 290
hasIncompletePackagesDesc = "incomplete packages"
)
// ProxyRemoved is a set of module@version that have been removed from the proxy,
// even though they are still in the index.
var ProxyRemoved = map[string]bool{}
// fetchTask represents the result of a fetch task that was processed.
type fetchTask struct {
fetch.FetchResult
timings map[string]time.Duration
}
// FetchAndUpdateState fetches and processes a module version, and then updates
// the module_version_states table according to the result. It returns an HTTP
// status code representing the result of the fetch operation, and a non-nil
// error if this status code is not 200.
func FetchAndUpdateState(ctx context.Context, modulePath, requestedVersion string, proxyClient *proxy.Client, sourceClient *source.Client, db *postgres.DB, appVersionLabel string) (_ int, err error) {
defer derrors.Wrap(&err, "FetchAndUpdateState(%q, %q)", modulePath, requestedVersion)
tctx, span := trace.StartSpan(ctx, "FetchAndUpdateState")
ctx = experiment.NewContext(tctx, experiment.FromContext(ctx))
ctx = log.NewContextWithLabel(ctx, "fetch", modulePath+"@"+requestedVersion)
span.AddAttributes(
trace.StringAttribute("modulePath", modulePath),
trace.StringAttribute("version", requestedVersion))
defer span.End()
ft := fetchAndInsertModule(ctx, modulePath, requestedVersion, proxyClient, sourceClient, db)
span.AddAttributes(trace.Int64Attribute("numPackages", int64(len(ft.PackageVersionStates))))
dbErr := updateVersionMapAndDeleteModulesWithErrors(ctx, db, ft)
if dbErr != nil {
log.Error(ctx, dbErr)
ft.Error = dbErr
ft.Status = http.StatusInternalServerError
}
if !semver.IsValid(ft.ResolvedVersion) {
return ft.Status, ft.Error
}
// Update the module_version_states table with the new status of
// module@version. This must happen last, because if it succeeds with a
// code < 500 but a later action fails, we will never retry the later
// action.
// TODO(golang/go#39628): Split UpsertModuleVersionState into
// InsertModuleVersionState and UpdateModuleVersionState.
start := time.Now()
err = db.UpsertModuleVersionState(ctx, ft.ModulePath, ft.ResolvedVersion, appVersionLabel,
time.Time{}, ft.Status, ft.GoModPath, ft.Error, ft.PackageVersionStates)
ft.timings["db.UpsertModuleVersionState"] = time.Since(start)
if err != nil {
log.Error(ctx, err)
if ft.Error != nil {
ft.Status = http.StatusInternalServerError
ft.Error = fmt.Errorf("db.UpsertModuleVersionState: %v, original error: %v", err, ft.Error)
}
logTaskResult(ctx, ft, "Failed to update module version state")
return http.StatusInternalServerError, ft.Error
}
logTaskResult(ctx, ft, "Updated module version state")
return ft.Status, ft.Error
}
// fetchAndInsertModule fetches the given module version from the module proxy
// or (in the case of the standard library) from the Go repo and writes the
// resulting data to the database.
//
// The given parentCtx is used for tracing, but fetches actually execute in a
// detached context with fixed timeout, so that fetches are allowed to complete
// even for short-lived requests.
func fetchAndInsertModule(ctx context.Context, modulePath, requestedVersion string, proxyClient *proxy.Client, sourceClient *source.Client, db *postgres.DB) *fetchTask {
ft := &fetchTask{
FetchResult: fetch.FetchResult{
ModulePath: modulePath,
RequestedVersion: requestedVersion,
},
timings: map[string]time.Duration{},
}
defer func() {
derrors.Wrap(&ft.Error, "fetchAndInsertModule(%q, %q)", modulePath, requestedVersion)
if ft.Error != nil {
ft.Status = derrors.ToHTTPStatus(ft.Error)
ft.ResolvedVersion = requestedVersion
}
}()
if ProxyRemoved[modulePath+"@"+requestedVersion] {
log.Infof(ctx, "not fetching %s@%s because it is on the ProxyRemoved list", modulePath, requestedVersion)
ft.Error = derrors.Excluded
return ft
}
exc, err := db.IsExcluded(ctx, modulePath)
if err != nil {
ft.Error = err
return ft
}
if exc {
ft.Error = derrors.Excluded
return ft
}
start := time.Now()
fr := fetch.FetchModule(ctx, modulePath, requestedVersion, proxyClient, sourceClient)
if fr == nil {
panic("fetch.FetchModule should never return a nil FetchResult")
}
ft.FetchResult = *fr
ft.timings["fetch.FetchModule"] = time.Since(start)
if ft.Error != nil {
logf := log.Errorf
if ft.Status < 500 {
logf = log.Infof
}
logf(ctx, "Error executing fetch: %v (code %d)", ft.Error, ft.Status)
return ft
}
log.Infof(ctx, "fetch.FetchVersion succeeded for %s@%s", ft.ModulePath, ft.RequestedVersion)
start = time.Now()
err = db.InsertModule(ctx, ft.Module)
ft.timings["db.InsertModule"] = time.Since(start)
if err != nil {
log.Error(ctx, err)
ft.Status = derrors.ToHTTPStatus(err)
ft.Error = err
return ft
}
log.Infof(ctx, "db.InsertModule succeeded for %s@%s", ft.ModulePath, ft.RequestedVersion)
return ft
}
func updateVersionMapAndDeleteModulesWithErrors(ctx context.Context, db *postgres.DB, ft *fetchTask) (err error) {
defer derrors.Wrap(&err, "updateVersionMapAndDeleteModulesWithErrors(%q, %q, %q, %d, %v)",
ft.ModulePath, ft.RequestedVersion, ft.ResolvedVersion, ft.Status, ft.Error)
ctx, span := trace.StartSpan(ctx, "worker.updateFetchResult")
defer span.End()
var errMsg string
if ft.Error != nil {
errMsg = ft.Error.Error()
}
vm := &internal.VersionMap{
ModulePath: ft.ModulePath,
RequestedVersion: ft.RequestedVersion,
ResolvedVersion: ft.ResolvedVersion,
Status: ft.Status,
GoModPath: ft.GoModPath,
Error: errMsg,
}
start := time.Now()
err = db.UpsertVersionMap(ctx, vm)
ft.timings["db.UpsertVersionMap"] = time.Since(start)
if err != nil {
return err
}
if !semver.IsValid(vm.ResolvedVersion) {
// If the requestedVersion was not successfully resolved, at
// this point it will be the same as the resolvedVersion.
// No additional tables need to be updated.
return nil
}
// If there were any errors processing the module then we didn't insert it.
// Delete it in case we are reprocessing an existing module.
if vm.Status > 400 {
log.Infof(ctx, "%s@%s: code=%d, deleting", vm.ModulePath, vm.ResolvedVersion, vm.Status)
start = time.Now()
err = db.DeleteModule(ctx, vm.ModulePath, vm.ResolvedVersion)
ft.timings["db.DeleteModule"] = time.Since(start)
if err != nil {
return err
}
}
// If this was an alternative path (ft.Status == 491) and there is an older
// version in search_documents, delete it. This is the case where a module's
// canonical path was changed by the addition of a go.mod file. For example,
// versions of logrus before it acquired a go.mod file could have the path
// github.com/Sirupsen/logrus, but once the go.mod file specifies that the
// path is all lower-case, the old versions should not show up in search. We
// still leave their pages in the database so users of those old versions
// can still view documentation.
if vm.Status == derrors.ToHTTPStatus(derrors.AlternativeModule) {
log.Infof(ctx, "%s@%s: code=491, deleting older version from search", vm.ModulePath, vm.ResolvedVersion)
start = time.Now()
err = db.DeleteOlderVersionFromSearchDocuments(ctx, vm.ModulePath, vm.ResolvedVersion)
ft.timings["db.DeleteOlderVersionFromSearchDocuments"] = time.Since(start)
if err != nil {
return err
}
}
return nil
}
func logTaskResult(ctx context.Context, ft *fetchTask, prefix string) {
var times []string
for k, v := range ft.timings {
times = append(times, fmt.Sprintf("%s=%.3fs", k, v.Seconds()))
}
sort.Strings(times)
msg := strings.Join(times, ", ")
logf := log.Infof
if ft.Status == http.StatusInternalServerError {
logf = log.Errorf
}
logf(ctx, "%s for %s@%s: code=%d, num_packages=%d, err=%v; timings: %s",
prefix, ft.ModulePath, ft.ResolvedVersion, ft.Status, len(ft.PackageVersionStates), ft.Error, msg)
}