internal/{fetch,worker}: include DB insertion in fetch metrics

Fetch latency and related metrics include the time spent inserting
into the DB, as well as the time to fetch and process the module.

For golang/go#48010

Change-Id: I1d685bd25f1b632b0b20de5b1bfac5003bff0caa
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/346750
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
Reviewed-by: Julie Qiu <julie@golang.org>
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index feeb1d5..821f11a 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -22,7 +22,6 @@
 	"golang.org/x/pkgsite/internal"
 	"golang.org/x/pkgsite/internal/config"
 	"golang.org/x/pkgsite/internal/dcensus"
-	"golang.org/x/pkgsite/internal/fetch"
 	"golang.org/x/pkgsite/internal/index"
 	"golang.org/x/pkgsite/internal/log"
 	"golang.org/x/pkgsite/internal/middleware"
@@ -117,9 +116,9 @@
 		worker.UnprocessedModules,
 		worker.UnprocessedNewModules,
 		worker.SheddedFetchCount,
-		fetch.FetchLatencyDistribution,
-		fetch.FetchResponseCount,
-		fetch.FetchPackageCount)
+		worker.FetchLatencyDistribution,
+		worker.FetchResponseCount,
+		worker.FetchPackageCount)
 	if err := dcensus.Init(cfg, views...); err != nil {
 		log.Fatal(ctx, err)
 	}
diff --git a/internal/fetch/fetch.go b/internal/fetch/fetch.go
index b7adfc0..0d05920 100644
--- a/internal/fetch/fetch.go
+++ b/internal/fetch/fetch.go
@@ -12,19 +12,13 @@
 	"io/fs"
 	"net/http"
 	"sort"
-	"strconv"
 	"strings"
 	"sync"
 	"time"
 
-	"go.opencensus.io/plugin/ochttp"
-	"go.opencensus.io/stats"
-	"go.opencensus.io/stats/view"
-	"go.opencensus.io/tag"
 	"go.opencensus.io/trace"
 	"golang.org/x/mod/modfile"
 	"golang.org/x/pkgsite/internal"
-	"golang.org/x/pkgsite/internal/dcensus"
 	"golang.org/x/pkgsite/internal/derrors"
 	"golang.org/x/pkgsite/internal/licenses"
 	"golang.org/x/pkgsite/internal/log"
@@ -35,44 +29,6 @@
 
 var ErrModuleContainsNoPackages = errors.New("module contains 0 packages")
 
-var (
-	fetchLatency = stats.Float64(
-		"go-discovery/worker/fetch-latency",
-		"Latency of a fetch request.",
-		stats.UnitSeconds,
-	)
-	fetchedPackages = stats.Int64(
-		"go-discovery/worker/fetch-package-count",
-		"Count of successfully fetched packages.",
-		stats.UnitDimensionless,
-	)
-
-	// FetchLatencyDistribution aggregates frontend fetch request
-	// latency by status code. It does not count shedded requests.
-	FetchLatencyDistribution = &view.View{
-		Name:        "go-discovery/worker/fetch-latency",
-		Measure:     fetchLatency,
-		Aggregation: ochttp.DefaultLatencyDistribution,
-		Description: "Fetch latency by result status.",
-		TagKeys:     []tag.Key{dcensus.KeyStatus},
-	}
-	// FetchResponseCount counts fetch responses by status.
-	FetchResponseCount = &view.View{
-		Name:        "go-discovery/worker/fetch-count",
-		Measure:     fetchLatency,
-		Aggregation: view.Count(),
-		Description: "Fetch request count by result status",
-		TagKeys:     []tag.Key{dcensus.KeyStatus},
-	}
-	// FetchPackageCount counts how many packages were successfully fetched.
-	FetchPackageCount = &view.View{
-		Name:        "go-discovery/worker/fetch-package-count",
-		Measure:     fetchedPackages,
-		Aggregation: view.Count(),
-		Description: "Count of packages successfully fetched",
-	}
-)
-
 type FetchResult struct {
 	ModulePath       string
 	RequestedVersion string
@@ -96,15 +52,6 @@
 //
 // Even if err is non-nil, the result may contain useful information, like the go.mod path.
 func FetchModule(ctx context.Context, modulePath, requestedVersion string, mg ModuleGetter, sourceClient *source.Client) (fr *FetchResult) {
-	start := time.Now()
-	defer func() {
-		latency := float64(time.Since(start).Seconds())
-		dcensus.RecordWithTag(ctx, dcensus.KeyStatus, strconv.Itoa(fr.Status), fetchLatency.M(latency))
-		if fr.Status < 300 {
-			stats.Record(ctx, fetchedPackages.M(int64(len(fr.PackageVersionStates))))
-		}
-	}()
-
 	fr = &FetchResult{
 		ModulePath:       modulePath,
 		RequestedVersion: requestedVersion,
diff --git a/internal/worker/fetch.go b/internal/worker/fetch.go
index 5610ea4..d8faff6 100644
--- a/internal/worker/fetch.go
+++ b/internal/worker/fetch.go
@@ -11,18 +11,22 @@
 	"math"
 	"net/http"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
 	"unicode/utf8"
 
+	"go.opencensus.io/plugin/ochttp"
 	"go.opencensus.io/stats"
 	"go.opencensus.io/stats/view"
+	"go.opencensus.io/tag"
 	"go.opencensus.io/trace"
 	"golang.org/x/mod/semver"
 	"golang.org/x/pkgsite/internal"
 	"golang.org/x/pkgsite/internal/cache"
 	"golang.org/x/pkgsite/internal/config"
+	"golang.org/x/pkgsite/internal/dcensus"
 	"golang.org/x/pkgsite/internal/derrors"
 	"golang.org/x/pkgsite/internal/experiment"
 	"golang.org/x/pkgsite/internal/fetch"
@@ -39,6 +43,41 @@
 		"Count of shedded fetches.",
 		stats.UnitDimensionless,
 	)
+	fetchLatency = stats.Float64(
+		"go-discovery/worker/fetch-latency",
+		"Latency of a fetch request.",
+		stats.UnitSeconds,
+	)
+	fetchedPackages = stats.Int64(
+		"go-discovery/worker/fetch-package-count",
+		"Count of successfully fetched packages.",
+		stats.UnitDimensionless,
+	)
+
+	// FetchLatencyDistribution aggregates frontend fetch request
+	// latency by status code. It does not count shedded requests.
+	FetchLatencyDistribution = &view.View{
+		Name:        "go-discovery/worker/fetch-latency",
+		Measure:     fetchLatency,
+		Aggregation: ochttp.DefaultLatencyDistribution,
+		Description: "Fetch latency by result status.",
+		TagKeys:     []tag.Key{dcensus.KeyStatus},
+	}
+	// FetchResponseCount counts fetch responses by status.
+	FetchResponseCount = &view.View{
+		Name:        "go-discovery/worker/fetch-count",
+		Measure:     fetchLatency,
+		Aggregation: view.Count(),
+		Description: "Fetch request count by result status",
+		TagKeys:     []tag.Key{dcensus.KeyStatus},
+	}
+	// FetchPackageCount counts how many packages were successfully fetched.
+	FetchPackageCount = &view.View{
+		Name:        "go-discovery/worker/fetch-package-count",
+		Measure:     fetchedPackages,
+		Aggregation: view.Count(),
+		Description: "Count of packages successfully fetched",
+	}
 
 	// SheddedFetchCount counts the number of fetches that were shedded.
 	SheddedFetchCount = &view.View{
@@ -67,11 +106,22 @@
 // the module_version_states table according to the result. It returns an HTTP
 // status code representing the result of the fetch operation, and a non-nil
 // error if this status code is not 200.
-func (f *Fetcher) FetchAndUpdateState(ctx context.Context, modulePath, requestedVersion, appVersionLabel string) (_ int, resolvedVersion string, err error) {
+func (f *Fetcher) FetchAndUpdateState(ctx context.Context, modulePath, requestedVersion, appVersionLabel string) (status int, resolvedVersion string, err error) {
 	defer derrors.Wrap(&err, "FetchAndUpdateState(%q, %q, %q)", modulePath, requestedVersion, appVersionLabel)
 	tctx, span := trace.StartSpan(ctx, "FetchAndUpdateState")
 	ctx = experiment.NewContext(tctx, experiment.FromContext(ctx).Active()...)
 	ctx = log.NewContextWithLabel(ctx, "fetch", modulePath+"@"+requestedVersion)
+
+	start := time.Now()
+	var nPackages int64
+	defer func() {
+		latency := float64(time.Since(start).Seconds())
+		dcensus.RecordWithTag(ctx, dcensus.KeyStatus, strconv.Itoa(status), fetchLatency.M(latency))
+		if status < 300 {
+			stats.Record(ctx, fetchedPackages.M(nPackages))
+		}
+	}()
+
 	if !utf8.ValidString(modulePath) {
 		log.Errorf(ctx, "module path %q is not valid UTF-8", modulePath)
 	}
@@ -108,7 +158,8 @@
 		return derrors.ToStatus(err), "", err
 	}
 	ft := f.fetchAndInsertModule(ctx, modulePath, requestedVersion, lmv)
-	span.AddAttributes(trace.Int64Attribute("numPackages", int64(len(ft.PackageVersionStates))))
+	nPackages = int64(len(ft.PackageVersionStates))
+	span.AddAttributes(trace.Int64Attribute("numPackages", nPackages))
 
 	// If there were any errors processing the module then we didn't insert it.
 	// Delete it in case we are reprocessing an existing module.
@@ -160,7 +211,7 @@
 	// action.
 	// TODO(golang/go#39628): Split UpsertModuleVersionState into
 	// InsertModuleVersionState and UpdateModuleVersionState.
-	start := time.Now()
+	startUpdate := time.Now()
 	mvs := &postgres.ModuleVersionStateForUpsert{
 		ModulePath:           ft.ModulePath,
 		Version:              ft.ResolvedVersion,
@@ -172,7 +223,7 @@
 		PackageVersionStates: ft.PackageVersionStates,
 	}
 	err = f.DB.UpsertModuleVersionState(ctx, mvs)
-	ft.timings["db.UpsertModuleVersionState"] = time.Since(start)
+	ft.timings["db.UpsertModuleVersionState"] = time.Since(startUpdate)
 	if err != nil {
 		log.Error(ctx, err)
 		if ft.Error != nil {