internal/{fetch,worker}: fetch info covers DB insertion
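Move the FetchInfo tracking, used for the worker home page, from
internal/fetch to internal/worker, so that the time span recorded for
each fetch includes DB insertion as well as the fetch itself.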
For golang/go#48010
Change-Id: Id2ba42b96ebc0a93d7a13c7013ace0c9860a2e11
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/346809
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Julie Qiu <julie@golang.org>
diff --git a/internal/fetch/fetch.go b/internal/fetch/fetch.go
index 0d05920..e65e538 100644
--- a/internal/fetch/fetch.go
+++ b/internal/fetch/fetch.go
@@ -11,9 +11,7 @@
"fmt"
"io/fs"
"net/http"
- "sort"
"strings"
- "sync"
"time"
"go.opencensus.io/trace"
@@ -58,7 +56,7 @@
}
defer derrors.Wrap(&fr.Error, "FetchModule(%q, %q)", modulePath, requestedVersion)
- fi, err := fetchModule(ctx, fr, mg, sourceClient)
+ err := fetchModule(ctx, fr, mg, sourceClient)
fr.Error = err
if err != nil {
fr.Status = derrors.ToStatus(fr.Error)
@@ -66,44 +64,31 @@
if fr.Status == 0 {
fr.Status = http.StatusOK
}
- if fi != nil {
- finishFetchInfo(fi, fr.Status, fr.Error)
- }
return fr
}
-func fetchModule(ctx context.Context, fr *FetchResult, mg ModuleGetter, sourceClient *source.Client) (*FetchInfo, error) {
+func fetchModule(ctx context.Context, fr *FetchResult, mg ModuleGetter, sourceClient *source.Client) error {
info, err := GetInfo(ctx, fr.ModulePath, fr.RequestedVersion, mg)
if err != nil {
- return nil, err
+ return err
}
fr.ResolvedVersion = info.Version
commitTime := info.Time
- // TODO(golang/go#48010): move fetch info to the worker.
- fi := &FetchInfo{
- ModulePath: fr.ModulePath,
- Version: fr.ResolvedVersion,
- ZipSize: uint64(0),
- Start: time.Now(),
- }
- startFetchInfo(fi)
-
var contentDir fs.FS
if fr.ModulePath == stdlib.ModulePath {
var resolvedVersion string
contentDir, resolvedVersion, commitTime, err = stdlib.ContentDir(fr.RequestedVersion)
if err != nil {
- return fi, err
+ return err
}
// If the requested version is a branch name like "master" or "main", we cannot
// determine the right resolved version until we start working with the repo.
fr.ResolvedVersion = resolvedVersion
- fi.Version = resolvedVersion
} else {
contentDir, err = mg.ContentDir(ctx, fr.ModulePath, fr.ResolvedVersion)
if err != nil {
- return fi, err
+ return err
}
}
@@ -121,7 +106,7 @@
var goModBytes []byte
fr.GoModPath, goModBytes, err = getGoModPath(ctx, fr.ModulePath, fr.ResolvedVersion, mg)
if err != nil {
- return fi, err
+ return err
}
// If there is no go.mod file in the zip, try another way to detect
@@ -131,21 +116,21 @@
if !fr.HasGoMod {
forkedModule, err := forkedFrom(contentDir, fr.ModulePath, fr.ResolvedVersion)
if err != nil {
- return fi, err
+ return err
}
if forkedModule != "" {
- return fi, fmt.Errorf("forked from %s: %w", forkedModule, derrors.AlternativeModule)
+ return fmt.Errorf("forked from %s: %w", forkedModule, derrors.AlternativeModule)
}
}
mod, pvs, err := processModuleContents(ctx, fr.ModulePath, fr.ResolvedVersion, fr.RequestedVersion, commitTime, contentDir, sourceClient)
if err != nil {
- return fi, err
+ return err
}
mod.HasGoMod = fr.HasGoMod
if goModBytes != nil {
if err := processGoModFile(goModBytes, mod); err != nil {
- return fi, fmt.Errorf("%v: %w", err.Error(), derrors.BadModule)
+ return fmt.Errorf("%v: %w", err.Error(), derrors.BadModule)
}
}
fr.Module = mod
@@ -155,7 +140,7 @@
fr.Status = derrors.ToStatus(derrors.HasIncompletePackages)
}
}
- return fi, nil
+ return nil
}
// GetInfo returns the result of a request to the proxy .info endpoint. If
@@ -277,71 +262,3 @@
}
return false, ""
}
-
-type FetchInfo struct {
- ModulePath string
- Version string
- ZipSize uint64
- Start time.Time
- Finish time.Time
- Status int
- Error error
-}
-
-var (
- fetchInfoMu sync.Mutex
- fetchInfoMap = map[*FetchInfo]struct{}{}
-)
-
-func init() {
- const linger = time.Minute
- go func() {
- for {
- now := time.Now()
- fetchInfoMu.Lock()
- for fi := range fetchInfoMap {
- if !fi.Finish.IsZero() && now.Sub(fi.Finish) > linger {
- delete(fetchInfoMap, fi)
- }
- }
- fetchInfoMu.Unlock()
- time.Sleep(linger)
- }
- }()
-}
-
-func startFetchInfo(fi *FetchInfo) {
- fetchInfoMu.Lock()
- defer fetchInfoMu.Unlock()
- fetchInfoMap[fi] = struct{}{}
-}
-
-func finishFetchInfo(fi *FetchInfo, status int, err error) {
- fetchInfoMu.Lock()
- defer fetchInfoMu.Unlock()
- fi.Finish = time.Now()
- fi.Status = status
- fi.Error = err
-}
-
-// FetchInfos returns information about all fetches in progress,
-// sorted by start time.
-func FetchInfos() []*FetchInfo {
- var fis []*FetchInfo
- fetchInfoMu.Lock()
- for fi := range fetchInfoMap {
- // Copy to avoid races on Status and Error when read by
- // worker home page.
- cfi := *fi
- fis = append(fis, &cfi)
- }
- fetchInfoMu.Unlock()
- // Order first by done-ness, then by age.
- sort.Slice(fis, func(i, j int) bool {
- if (fis[i].Status == 0) == (fis[j].Status == 0) {
- return fis[i].Start.Before(fis[j].Start)
- }
- return fis[i].Status == 0
- })
- return fis
-}
diff --git a/internal/worker/fetch.go b/internal/worker/fetch.go
index d8faff6..0c6a533 100644
--- a/internal/worker/fetch.go
+++ b/internal/worker/fetch.go
@@ -137,12 +137,21 @@
defer span.End()
// If we're overloaded, shed load by not processing this module.
- deferFunc, err := f.maybeShed(ctx, modulePath, requestedVersion)
+ deferFunc, zipSize, err := f.maybeShed(ctx, modulePath, requestedVersion)
defer deferFunc()
if err != nil {
return derrors.ToStatus(err), "", err
}
+ fi := &FetchInfo{
+ ModulePath: modulePath,
+ Version: requestedVersion,
+ ZipSize: uint64(zipSize),
+ Start: time.Now(),
+ }
+ startFetchInfo(fi)
+ defer func() { finishFetchInfo(fi, status, err) }()
+
// Begin by htting the proxy's info endpoint. That will make the proxy aware
// of the version if it isn't already, as can happen when we arrive here via
// frontend fetch. We ignore both the error and the information itself at
@@ -496,13 +505,13 @@
return f.DB.UpdateLatestModuleVersions(ctx, lmv)
}
-func (f *Fetcher) maybeShed(ctx context.Context, modulePath, version string) (func(), error) {
+func (f *Fetcher) maybeShed(ctx context.Context, modulePath, version string) (func(), int64, error) {
if zipLoadShedder == nil {
- return func() {}, nil
+ return func() {}, 0, nil
}
zipSize, err := getZipSize(ctx, modulePath, version, f.ProxyClient)
if err != nil {
- return func() {}, err
+ return func() {}, 0, err
}
// Load shed or mark module as too large.
// We treat zip size as a proxy for the total memory consumed by
@@ -511,14 +520,14 @@
shouldShed, deferFunc := zipLoadShedder.shouldShed(uint64(zipSize))
if shouldShed {
stats.Record(ctx, fetchesShedded.M(1))
- return deferFunc, fmt.Errorf("%w: size=%dMi", derrors.SheddingLoad, zipSize/mib)
+ return deferFunc, 0, fmt.Errorf("%w: size=%dMi", derrors.SheddingLoad, zipSize/mib)
}
if zipSize > maxModuleZipSize {
log.Warningf(ctx, "FetchModule: %s@%s zip size %dMi exceeds max %dMi",
modulePath, version, zipSize/mib, maxModuleZipSize/mib)
- return deferFunc, derrors.ModuleTooLarge
+ return deferFunc, 0, derrors.ModuleTooLarge
}
- return deferFunc, nil
+ return deferFunc, zipSize, nil
}
func getZipSize(ctx context.Context, modulePath, resolvedVersion string, prox *proxy.Client) (_ int64, err error) {
diff --git a/internal/worker/fetchinfo.go b/internal/worker/fetchinfo.go
new file mode 100644
index 0000000..2052e42
--- /dev/null
+++ b/internal/worker/fetchinfo.go
@@ -0,0 +1,83 @@
+// Copyright 2021 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package worker
+
+import (
+ "sort"
+ "sync"
+ "time"
+)
+
+// FetchInfo describes a fetch in progress, or completed.
+// It is used to display information on the worker home page.
+type FetchInfo struct {
+ ModulePath string
+ Version string
+ ZipSize uint64
+ Start time.Time
+ Finish time.Time
+ Status int
+ Error error
+}
+
+var (
+ fetchInfoMu sync.Mutex
+ fetchInfoMap = map[*FetchInfo]struct{}{}
+)
+
+func init() {
+ // Start a goroutine to remove FetchInfos that have been finished for a
+ // while.
+ const linger = time.Minute
+ go func() {
+ for {
+ now := time.Now()
+ fetchInfoMu.Lock()
+ for fi := range fetchInfoMap {
+ if !fi.Finish.IsZero() && now.Sub(fi.Finish) > linger {
+ delete(fetchInfoMap, fi)
+ }
+ }
+ fetchInfoMu.Unlock()
+ time.Sleep(linger)
+ }
+ }()
+}
+
+func startFetchInfo(fi *FetchInfo) {
+ fetchInfoMu.Lock()
+ defer fetchInfoMu.Unlock()
+ fetchInfoMap[fi] = struct{}{}
+}
+
+func finishFetchInfo(fi *FetchInfo, status int, err error) {
+ fetchInfoMu.Lock()
+ defer fetchInfoMu.Unlock()
+ fi.Finish = time.Now()
+ fi.Status = status
+ fi.Error = err
+}
+
+// FetchInfos returns copies of all tracked fetches, in progress or recently
+// finished, ordered with zero-Status (unfinished) ones first, then by start time.
+func FetchInfos() []*FetchInfo {
+ var fis []*FetchInfo
+ fetchInfoMu.Lock()
+ for fi := range fetchInfoMap {
+ // Copy to avoid races on Status and Error when read by
+ // worker home page.
+ cfi := *fi
+ fis = append(fis, &cfi)
+ }
+ fetchInfoMu.Unlock()
+ // Order first by done-ness, then by age.
+ sort.Slice(fis, func(i, j int) bool {
+ if (fis[i].Status == 0) == (fis[j].Status == 0) {
+ return fis[i].Start.Before(fis[j].Start)
+ }
+ return fis[i].Status == 0
+ })
+ return fis
+}
diff --git a/internal/worker/pages.go b/internal/worker/pages.go
index 1c56ee7..7a62644 100644
--- a/internal/worker/pages.go
+++ b/internal/worker/pages.go
@@ -20,7 +20,6 @@
"golang.org/x/pkgsite/internal"
"golang.org/x/pkgsite/internal/config"
"golang.org/x/pkgsite/internal/derrors"
- "golang.org/x/pkgsite/internal/fetch"
"golang.org/x/pkgsite/internal/log"
"golang.org/x/pkgsite/internal/postgres"
"golang.org/x/sync/errgroup"
@@ -99,7 +98,7 @@
ProcessStats processMemStats
SystemStats systemMemStats
CgroupStats map[string]uint64
- Fetches []*fetch.FetchInfo
+ Fetches []*FetchInfo
LogsURL string
}{
Config: s.cfg,
@@ -115,7 +114,7 @@
ProcessStats: pms,
SystemStats: sms,
CgroupStats: getCgroupMemStats(),
- Fetches: fetch.FetchInfos(),
+ Fetches: FetchInfos(),
LogsURL: logsURL,
}
return renderPage(ctx, w, page, s.templates[indexTemplate])