internal/{fetch,worker}: fetch info covers DB insertion

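Move the FetchInfo bookkeeping, used for the worker home page, from
internal/fetch to internal/worker, so that each recorded fetch covers
the whole operation, including DB insertion. The zip size computed for
load shedding is now recorded in the FetchInfo as well.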

For golang/go#48010

Change-Id: Id2ba42b96ebc0a93d7a13c7013ace0c9860a2e11
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/346809
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Julie Qiu <julie@golang.org>
diff --git a/internal/fetch/fetch.go b/internal/fetch/fetch.go
index 0d05920..e65e538 100644
--- a/internal/fetch/fetch.go
+++ b/internal/fetch/fetch.go
@@ -11,9 +11,7 @@
 	"fmt"
 	"io/fs"
 	"net/http"
-	"sort"
 	"strings"
-	"sync"
 	"time"
 
 	"go.opencensus.io/trace"
@@ -58,7 +56,7 @@
 	}
 	defer derrors.Wrap(&fr.Error, "FetchModule(%q, %q)", modulePath, requestedVersion)
 
-	fi, err := fetchModule(ctx, fr, mg, sourceClient)
+	err := fetchModule(ctx, fr, mg, sourceClient)
 	fr.Error = err
 	if err != nil {
 		fr.Status = derrors.ToStatus(fr.Error)
@@ -66,44 +64,31 @@
 	if fr.Status == 0 {
 		fr.Status = http.StatusOK
 	}
-	if fi != nil {
-		finishFetchInfo(fi, fr.Status, fr.Error)
-	}
 	return fr
 }
 
-func fetchModule(ctx context.Context, fr *FetchResult, mg ModuleGetter, sourceClient *source.Client) (*FetchInfo, error) {
+func fetchModule(ctx context.Context, fr *FetchResult, mg ModuleGetter, sourceClient *source.Client) error {
 	info, err := GetInfo(ctx, fr.ModulePath, fr.RequestedVersion, mg)
 	if err != nil {
-		return nil, err
+		return err
 	}
 	fr.ResolvedVersion = info.Version
 	commitTime := info.Time
 
-	// TODO(golang/go#48010): move fetch info to the worker.
-	fi := &FetchInfo{
-		ModulePath: fr.ModulePath,
-		Version:    fr.ResolvedVersion,
-		ZipSize:    uint64(0),
-		Start:      time.Now(),
-	}
-	startFetchInfo(fi)
-
 	var contentDir fs.FS
 	if fr.ModulePath == stdlib.ModulePath {
 		var resolvedVersion string
 		contentDir, resolvedVersion, commitTime, err = stdlib.ContentDir(fr.RequestedVersion)
 		if err != nil {
-			return fi, err
+			return err
 		}
 		// If the requested version is a branch name like "master" or "main", we cannot
 		// determine the right resolved version until we start working with the repo.
 		fr.ResolvedVersion = resolvedVersion
-		fi.Version = resolvedVersion
 	} else {
 		contentDir, err = mg.ContentDir(ctx, fr.ModulePath, fr.ResolvedVersion)
 		if err != nil {
-			return fi, err
+			return err
 		}
 	}
 
@@ -121,7 +106,7 @@
 	var goModBytes []byte
 	fr.GoModPath, goModBytes, err = getGoModPath(ctx, fr.ModulePath, fr.ResolvedVersion, mg)
 	if err != nil {
-		return fi, err
+		return err
 	}
 
 	// If there is no go.mod file in the zip, try another way to detect
@@ -131,21 +116,21 @@
 	if !fr.HasGoMod {
 		forkedModule, err := forkedFrom(contentDir, fr.ModulePath, fr.ResolvedVersion)
 		if err != nil {
-			return fi, err
+			return err
 		}
 		if forkedModule != "" {
-			return fi, fmt.Errorf("forked from %s: %w", forkedModule, derrors.AlternativeModule)
+			return fmt.Errorf("forked from %s: %w", forkedModule, derrors.AlternativeModule)
 		}
 	}
 
 	mod, pvs, err := processModuleContents(ctx, fr.ModulePath, fr.ResolvedVersion, fr.RequestedVersion, commitTime, contentDir, sourceClient)
 	if err != nil {
-		return fi, err
+		return err
 	}
 	mod.HasGoMod = fr.HasGoMod
 	if goModBytes != nil {
 		if err := processGoModFile(goModBytes, mod); err != nil {
-			return fi, fmt.Errorf("%v: %w", err.Error(), derrors.BadModule)
+			return fmt.Errorf("%v: %w", err.Error(), derrors.BadModule)
 		}
 	}
 	fr.Module = mod
@@ -155,7 +140,7 @@
 			fr.Status = derrors.ToStatus(derrors.HasIncompletePackages)
 		}
 	}
-	return fi, nil
+	return nil
 }
 
 // GetInfo returns the result of a request to the proxy .info endpoint. If
@@ -277,71 +262,3 @@
 	}
 	return false, ""
 }
-
-type FetchInfo struct {
-	ModulePath string
-	Version    string
-	ZipSize    uint64
-	Start      time.Time
-	Finish     time.Time
-	Status     int
-	Error      error
-}
-
-var (
-	fetchInfoMu  sync.Mutex
-	fetchInfoMap = map[*FetchInfo]struct{}{}
-)
-
-func init() {
-	const linger = time.Minute
-	go func() {
-		for {
-			now := time.Now()
-			fetchInfoMu.Lock()
-			for fi := range fetchInfoMap {
-				if !fi.Finish.IsZero() && now.Sub(fi.Finish) > linger {
-					delete(fetchInfoMap, fi)
-				}
-			}
-			fetchInfoMu.Unlock()
-			time.Sleep(linger)
-		}
-	}()
-}
-
-func startFetchInfo(fi *FetchInfo) {
-	fetchInfoMu.Lock()
-	defer fetchInfoMu.Unlock()
-	fetchInfoMap[fi] = struct{}{}
-}
-
-func finishFetchInfo(fi *FetchInfo, status int, err error) {
-	fetchInfoMu.Lock()
-	defer fetchInfoMu.Unlock()
-	fi.Finish = time.Now()
-	fi.Status = status
-	fi.Error = err
-}
-
-// FetchInfos returns information about all fetches in progress,
-// sorted by start time.
-func FetchInfos() []*FetchInfo {
-	var fis []*FetchInfo
-	fetchInfoMu.Lock()
-	for fi := range fetchInfoMap {
-		// Copy to avoid races on Status and Error when read by
-		// worker home page.
-		cfi := *fi
-		fis = append(fis, &cfi)
-	}
-	fetchInfoMu.Unlock()
-	// Order first by done-ness, then by age.
-	sort.Slice(fis, func(i, j int) bool {
-		if (fis[i].Status == 0) == (fis[j].Status == 0) {
-			return fis[i].Start.Before(fis[j].Start)
-		}
-		return fis[i].Status == 0
-	})
-	return fis
-}
diff --git a/internal/worker/fetch.go b/internal/worker/fetch.go
index d8faff6..0c6a533 100644
--- a/internal/worker/fetch.go
+++ b/internal/worker/fetch.go
@@ -137,12 +137,21 @@
 	defer span.End()
 
 	// If we're overloaded, shed load by not processing this module.
-	deferFunc, err := f.maybeShed(ctx, modulePath, requestedVersion)
+	deferFunc, zipSize, err := f.maybeShed(ctx, modulePath, requestedVersion)
 	defer deferFunc()
 	if err != nil {
 		return derrors.ToStatus(err), "", err
 	}
 
+	fi := &FetchInfo{
+		ModulePath: modulePath,
+		Version:    requestedVersion,
+		ZipSize:    uint64(zipSize),
+		Start:      time.Now(),
+	}
+	startFetchInfo(fi)
+	defer func() { finishFetchInfo(fi, status, err) }()
+
 	// Begin by htting the proxy's info endpoint. That will make the proxy aware
 	// of the version if it isn't already, as can happen when we arrive here via
 	// frontend fetch. We ignore both the error and the information itself at
@@ -496,13 +505,13 @@
 	return f.DB.UpdateLatestModuleVersions(ctx, lmv)
 }
 
-func (f *Fetcher) maybeShed(ctx context.Context, modulePath, version string) (func(), error) {
+func (f *Fetcher) maybeShed(ctx context.Context, modulePath, version string) (func(), int64, error) {
 	if zipLoadShedder == nil {
-		return func() {}, nil
+		return func() {}, 0, nil
 	}
 	zipSize, err := getZipSize(ctx, modulePath, version, f.ProxyClient)
 	if err != nil {
-		return func() {}, err
+		return func() {}, 0, err
 	}
 	// Load shed or mark module as too large.
 	// We treat zip size as a proxy for the total memory consumed by
@@ -511,14 +520,14 @@
 	shouldShed, deferFunc := zipLoadShedder.shouldShed(uint64(zipSize))
 	if shouldShed {
 		stats.Record(ctx, fetchesShedded.M(1))
-		return deferFunc, fmt.Errorf("%w: size=%dMi", derrors.SheddingLoad, zipSize/mib)
+		return deferFunc, 0, fmt.Errorf("%w: size=%dMi", derrors.SheddingLoad, zipSize/mib)
 	}
 	if zipSize > maxModuleZipSize {
 		log.Warningf(ctx, "FetchModule: %s@%s zip size %dMi exceeds max %dMi",
 			modulePath, version, zipSize/mib, maxModuleZipSize/mib)
-		return deferFunc, derrors.ModuleTooLarge
+		return deferFunc, 0, derrors.ModuleTooLarge
 	}
-	return deferFunc, nil
+	return deferFunc, zipSize, nil
 }
 
 func getZipSize(ctx context.Context, modulePath, resolvedVersion string, prox *proxy.Client) (_ int64, err error) {
diff --git a/internal/worker/fetchinfo.go b/internal/worker/fetchinfo.go
new file mode 100644
index 0000000..2052e42
--- /dev/null
+++ b/internal/worker/fetchinfo.go
@@ -0,0 +1,83 @@
+// Copyright 2021 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package worker
+
+import (
+	"sort"
+	"sync"
+	"time"
+)
+
+// FetchInfo describes a fetch in progress, or completed.
+// It is used to display information on the worker home page.
+type FetchInfo struct {
+	ModulePath string
+	Version    string
+	ZipSize    uint64
+	Start      time.Time
+	Finish     time.Time
+	Status     int
+	Error      error
+}
+
+var (
+	fetchInfoMu  sync.Mutex
+	fetchInfoMap = map[*FetchInfo]struct{}{}
+)
+
+func init() {
+	// Start a goroutine to remove FetchInfos that have been finished for a
+	// while.
+	const linger = time.Minute
+	go func() {
+		for {
+			now := time.Now()
+			fetchInfoMu.Lock()
+			for fi := range fetchInfoMap {
+				if !fi.Finish.IsZero() && now.Sub(fi.Finish) > linger {
+					delete(fetchInfoMap, fi)
+				}
+			}
+			fetchInfoMu.Unlock()
+			time.Sleep(linger)
+		}
+	}()
+}
+
+func startFetchInfo(fi *FetchInfo) {
+	fetchInfoMu.Lock()
+	defer fetchInfoMu.Unlock()
+	fetchInfoMap[fi] = struct{}{}
+}
+
+func finishFetchInfo(fi *FetchInfo, status int, err error) {
+	fetchInfoMu.Lock()
+	defer fetchInfoMu.Unlock()
+	fi.Finish = time.Now()
+	fi.Status = status
+	fi.Error = err
+}
+
+// FetchInfos returns copies of all tracked fetches, in progress or finished
+// but not yet removed; unfinished (Status == 0) come first, then by start time.
+func FetchInfos() []*FetchInfo {
+	var fis []*FetchInfo
+	fetchInfoMu.Lock()
+	for fi := range fetchInfoMap {
+		// Copy to avoid races on Status and Error when read by
+		// worker home page.
+		cfi := *fi
+		fis = append(fis, &cfi)
+	}
+	fetchInfoMu.Unlock()
+	// Order first by done-ness, then by age.
+	sort.Slice(fis, func(i, j int) bool {
+		if (fis[i].Status == 0) == (fis[j].Status == 0) {
+			return fis[i].Start.Before(fis[j].Start)
+		}
+		return fis[i].Status == 0
+	})
+	return fis
+}
diff --git a/internal/worker/pages.go b/internal/worker/pages.go
index 1c56ee7..7a62644 100644
--- a/internal/worker/pages.go
+++ b/internal/worker/pages.go
@@ -20,7 +20,6 @@
 	"golang.org/x/pkgsite/internal"
 	"golang.org/x/pkgsite/internal/config"
 	"golang.org/x/pkgsite/internal/derrors"
-	"golang.org/x/pkgsite/internal/fetch"
 	"golang.org/x/pkgsite/internal/log"
 	"golang.org/x/pkgsite/internal/postgres"
 	"golang.org/x/sync/errgroup"
@@ -99,7 +98,7 @@
 		ProcessStats    processMemStats
 		SystemStats     systemMemStats
 		CgroupStats     map[string]uint64
-		Fetches         []*fetch.FetchInfo
+		Fetches         []*FetchInfo
 		LogsURL         string
 	}{
 		Config:         s.cfg,
@@ -115,7 +114,7 @@
 		ProcessStats:   pms,
 		SystemStats:    sms,
 		CgroupStats:    getCgroupMemStats(),
-		Fetches:        fetch.FetchInfos(),
+		Fetches:        FetchInfos(),
 		LogsURL:        logsURL,
 	}
 	return renderPage(ctx, w, page, s.templates[indexTemplate])