blob: 72c6d21cc2cc3a931d10692ef68136624c95e1d3 [file] [log] [blame]
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// The fetch command runs a server that fetches modules from a proxy and writes
// them to the discovery database.
package main
import (
"bufio"
"context"
"flag"
"net/http"
"os"
"strconv"
"strings"
"time"
cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
"cloud.google.com/go/errorreporting"
"cloud.google.com/go/profiler"
"github.com/go-redis/redis/v7"
"golang.org/x/pkgsite/internal/config"
"golang.org/x/pkgsite/internal/database"
"golang.org/x/pkgsite/internal/dcensus"
"golang.org/x/pkgsite/internal/experiment"
"golang.org/x/pkgsite/internal/index"
"golang.org/x/pkgsite/internal/queue"
"golang.org/x/pkgsite/internal/source"
"golang.org/x/pkgsite/internal/worker"
"golang.org/x/pkgsite/internal/log"
"golang.org/x/pkgsite/internal/middleware"
"golang.org/x/pkgsite/internal/postgres"
"golang.org/x/pkgsite/internal/proxy"
"contrib.go.opencensus.io/integrations/ocsql"
)
// Settings for the worker binary, read from the environment and from
// command-line flags.
var (
// timeout is the HTTP handler timeout in minutes, read from
// GO_DISCOVERY_WORKER_TIMEOUT_MINUTES with "10" passed as the fallback. It
// is kept as a string here; main converts it with strconv.Atoi.
timeout = config.GetEnv("GO_DISCOVERY_WORKER_TIMEOUT_MINUTES", "10")
// queueName is read from GO_DISCOVERY_WORKER_TASK_QUEUE. newQueue requires
// it to be non-empty when running on AppEngine and passes it to
// queue.NewGCP.
queueName = config.GetEnv("GO_DISCOVERY_WORKER_TASK_QUEUE", "")
// workers is passed to queue.NewInMemory by newQueue when not running on
// AppEngine.
workers = flag.Int("workers", 10, "number of concurrent requests to the fetch service, when running locally")
// staticPath is handed to worker.NewServer as ServerConfig.StaticPath.
staticPath = flag.String("static", "content/static", "path to folder containing static files served")
)
// main configures and runs the worker server. In order, it: parses flags and
// loads the configuration, optionally starts the profiler, loads the list of
// proxy-removed module versions, opens the database through an
// OpenCensus-instrumented driver, seeds the excluded prefixes, builds the
// index, proxy, source and Redis clients plus the fetch queue, installs the
// worker handlers on a router wrapped in request middleware, and finally
// serves HTTP. Any setup failure terminates the process via log.Fatal.
func main() {
	flag.Parse()
	ctx := context.Background()
	cfg, err := config.Init(ctx)
	if err != nil {
		log.Fatal(ctx, err)
	}
	cfg.Dump(os.Stderr)
	if cfg.UseProfiler {
		if err := profiler.Start(profiler.Config{}); err != nil {
			log.Fatalf(ctx, "profiler.Start: %v", err)
		}
	}
	readProxyRemoved(ctx)

	// Wrap the postgres driver with OpenCensus instrumentation.
	driverName, err := ocsql.Register("postgres", ocsql.WithAllTraceOptions())
	if err != nil {
		log.Fatalf(ctx, "unable to register the ocsql driver: %v\n", err)
	}
	ddb, err := database.Open(driverName, cfg.DBConnInfo())
	if err != nil {
		log.Fatalf(ctx, "database.Open: %v", err)
	}
	db := postgres.New(ddb)
	defer db.Close()
	populateExcluded(ctx, db)

	indexClient, err := index.New(cfg.IndexURL)
	if err != nil {
		log.Fatal(ctx, err)
	}
	proxyClient, err := proxy.New(cfg.ProxyURL)
	if err != nil {
		log.Fatal(ctx, err)
	}
	sourceClient := source.NewClient(config.SourceTimeout)
	fetchQueue := newQueue(ctx, cfg, proxyClient, sourceClient, db)
	reportingClient := reportingClient(ctx, cfg)
	redisHAClient := getHARedis(ctx, cfg)
	redisCacheClient := getCacheRedis(ctx, cfg)
	server, err := worker.NewServer(cfg, worker.ServerConfig{
		DB:                   db,
		IndexClient:          indexClient,
		ProxyClient:          proxyClient,
		SourceClient:         sourceClient,
		RedisHAClient:        redisHAClient,
		RedisCacheClient:     redisCacheClient,
		Queue:                fetchQueue,
		ReportingClient:      reportingClient,
		TaskIDChangeInterval: config.TaskIDChangeIntervalWorker,
		StaticPath:           *staticPath,
	})
	if err != nil {
		log.Fatal(ctx, err)
	}
	router := dcensus.NewRouter(nil)
	server.Install(router.Handle)

	views := append(dcensus.ClientViews, dcensus.ServerViews...)
	if err := dcensus.Init(cfg, views...); err != nil {
		log.Fatal(ctx, err)
	}
	// We are not currently forwarding any ports on AppEngine, so serving debug
	// information is broken.
	if !cfg.OnAppEngine() {
		dcensusServer, err := dcensus.NewServer()
		if err != nil {
			log.Fatal(ctx, err)
		}
		debugAddr := cfg.DebugAddr("localhost:8001")
		go func() {
			// ListenAndServe only returns on failure (for example when the
			// port is already in use). Log it rather than dropping it, but do
			// not exit: the debug endpoint is not required for the worker to
			// do its job.
			if err := http.ListenAndServe(debugAddr, dcensusServer); err != nil {
				log.Errorf(ctx, "debug server: http.ListenAndServe(%q): %v", debugAddr, err)
			}
		}()
	}

	// The timeout is configured in minutes as a string; see the timeout var.
	handlerTimeout, err := strconv.Atoi(timeout)
	if err != nil {
		log.Fatalf(ctx, "strconv.Atoi(%q): %v", timeout, err)
	}
	requestLogger := logger(ctx, cfg)
	experimenter, err := middleware.NewExperimenter(ctx, 1*time.Minute, db, requestLogger)
	if err != nil {
		log.Fatal(ctx, err)
	}
	mw := middleware.Chain(
		middleware.RequestLog(requestLogger),
		middleware.Timeout(time.Duration(handlerTimeout)*time.Minute),
		middleware.Experiment(experimenter),
	)
	http.Handle("/", mw(router))

	addr := cfg.HostAddr("localhost:8000")
	log.Infof(ctx, "Listening on addr %s", addr)
	log.Fatal(ctx, http.ListenAndServe(addr, nil))
}
// newQueue returns the queue used to schedule fetches.
//
// On AppEngine it returns a Cloud Tasks backed queue (queue.NewGCP) and
// requires GO_DISCOVERY_WORKER_TASK_QUEUE to be set. Otherwise it returns an
// in-memory queue running *workers concurrent fetches, with every experiment
// whose Rollout is greater than zero marked active. Failures are fatal.
func newQueue(ctx context.Context, cfg *config.Config, proxyClient *proxy.Client, sourceClient *source.Client, db *postgres.DB) queue.Queue {
	if cfg.OnAppEngine() {
		if queueName == "" {
			log.Fatal(ctx, "missing queue: must set GO_DISCOVERY_WORKER_TASK_QUEUE env var")
		}
		tasksClient, err := cloudtasks.NewClient(ctx)
		if err != nil {
			log.Fatal(ctx, err)
		}
		return queue.NewGCP(cfg, tasksClient, queueName)
	}

	// Running locally: enable every experiment that has any rollout at all.
	all, err := db.GetExperiments(ctx)
	if err != nil {
		log.Fatal(ctx, err)
	}
	active := map[string]bool{}
	for _, exp := range all {
		if exp.Rollout <= 0 {
			continue
		}
		active[exp.Name] = true
	}
	return queue.NewInMemory(ctx, proxyClient, sourceClient, db, *workers,
		worker.FetchAndUpdateState, experiment.NewSet(active))
}
// getHARedis returns a client for the Redis instance at
// cfg.RedisHAHost:cfg.RedisHAPort, or nil if no host is configured.
func getHARedis(ctx context.Context, cfg *config.Config) *redis.Client {
	// Completions are updated with one big pipeline, which needs a long write
	// timeout. The read timeout is raised only so the two stay consistent.
	const pipelineTimeout = 5 * time.Minute
	return getRedis(ctx, cfg.RedisHAHost, cfg.RedisHAPort, pipelineTimeout, pipelineTimeout)
}
// getCacheRedis returns a client for the Redis instance at
// cfg.RedisCacheHost:cfg.RedisCachePort, or nil if no host is configured.
// Both timeouts are passed as zero.
// NOTE(review): presumably go-redis then applies its own defaults — confirm.
func getCacheRedis(ctx context.Context, cfg *config.Config) *redis.Client {
	host, port := cfg.RedisCacheHost, cfg.RedisCachePort
	return getRedis(ctx, host, port, 0, 0)
}
// getRedis builds a Redis client for host:port with the given write and read
// timeouts. It returns nil when host is empty. If ctx carries a deadline, the
// time remaining until that deadline is used as the dial timeout; otherwise
// the dial timeout is left at its zero value.
func getRedis(ctx context.Context, host, port string, writeTimeout, readTimeout time.Duration) *redis.Client {
	if host == "" {
		return nil
	}
	opts := &redis.Options{
		Addr:         host + ":" + port,
		WriteTimeout: writeTimeout,
		ReadTimeout:  readTimeout,
	}
	if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
		opts.DialTimeout = time.Until(deadline)
	}
	return redis.NewClient(opts)
}
// reportingClient returns an error reporting client for cfg.ProjectID when
// running on AppEngine, and nil otherwise. Failure to create the client is
// fatal; failures while reporting are logged.
func reportingClient(ctx context.Context, cfg *config.Config) *errorreporting.Client {
	if !cfg.OnAppEngine() {
		return nil
	}
	reportingCfg := errorreporting.Config{
		ServiceName: cfg.ServiceID,
		OnError: func(reportErr error) {
			log.Errorf(ctx, "Error reporting failed: %v", reportErr)
		},
	}
	client, err := errorreporting.NewClient(ctx, cfg.ProjectID, reportingCfg)
	if err != nil {
		log.Fatal(ctx, err)
	}
	return client
}
// logger returns the request logger for the middleware chain: the logger
// returned by log.UseStackdriver (log name "worker-log") on AppEngine, and a
// middleware.LocalLogger everywhere else. Failure to set up Stackdriver
// logging is fatal.
func logger(ctx context.Context, cfg *config.Config) middleware.Logger {
	if !cfg.OnAppEngine() {
		return middleware.LocalLogger{}
	}
	stackdriver, err := log.UseStackdriver(ctx, cfg, "worker-log")
	if err != nil {
		log.Fatal(ctx, err)
	}
	return stackdriver
}
// readProxyRemoved loads the module versions that should be ignored because
// they are in the index but not stored in the proxy, and records each one in
// worker.ProxyRemoved.
//
// The file is named by the GO_DISCOVERY_PROXY_REMOVED environment variable;
// if that is unset nothing happens. Each line of the file has the form
//
//	module@version
//
// A file that cannot be read is fatal.
func readProxyRemoved(ctx context.Context) {
	path := config.GetEnv("GO_DISCOVERY_PROXY_REMOVED", "")
	if path == "" {
		return
	}
	moduleVersions, err := readFileLines(path)
	if err != nil {
		log.Fatal(ctx, err)
	}
	for i := range moduleVersions {
		worker.ProxyRemoved[moduleVersions[i]] = true
	}
	log.Infof(ctx, "read %d excluded module versions from %s", len(worker.ProxyRemoved), path)
}
// populateExcluded reads the file named by the GO_DISCOVERY_EXCLUDED_FILENAME
// environment variable and inserts each prefix it lists into the database
// via db.InsertExcludedPrefix, unless db.IsExcluded reports that it is
// already there. If the variable is unset nothing happens.
//
// Each line of the file is a prefix, followed by a space or tab, followed by
// a non-empty reason. Inserted rows are attributed to $USER, or to "etl" when
// USER is unset. A malformed line or any database error is fatal.
func populateExcluded(ctx context.Context, db *postgres.DB) {
	path := config.GetEnv("GO_DISCOVERY_EXCLUDED_FILENAME", "")
	if path == "" {
		return
	}
	entries, err := readFileLines(path)
	if err != nil {
		log.Fatal(ctx, err)
	}

	creator := os.Getenv("USER")
	if creator == "" {
		creator = "etl"
	}

	for _, entry := range entries {
		// Split at the first space or tab: prefix before, reason after.
		var prefix, reason string
		if sep := strings.IndexAny(entry, " \t"); sep >= 0 {
			prefix, reason = entry[:sep], strings.TrimSpace(entry[sep+1:])
		}
		if reason == "" {
			log.Fatalf(ctx, "missing reason in %s, line %q", path, entry)
		}

		alreadyExcluded, err := db.IsExcluded(ctx, prefix)
		if err != nil {
			log.Fatalf(ctx, "db.IsExcluded(%q): %v", prefix, err)
		}
		if alreadyExcluded {
			continue
		}
		if err := db.InsertExcludedPrefix(ctx, prefix, creator, reason); err != nil {
			log.Fatalf(ctx, "db.InsertExcludedPrefix(%q, %q, %q): %v", prefix, creator, reason, err)
		}
	}
}
// readFileLines returns the lines of the named file with surrounding
// whitespace removed. Lines that are empty after trimming, and lines whose
// first non-blank character is '#', are skipped. If the file cannot be opened
// or scanning fails, it returns a nil slice and the error.
func readFileLines(filename string) ([]string, error) {
	file, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	var kept []string
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		text := strings.TrimSpace(scanner.Text())
		if text != "" && !strings.HasPrefix(text, "#") {
			kept = append(kept, text)
		}
	}
	if scanErr := scanner.Err(); scanErr != nil {
		return nil, scanErr
	}
	return kept, nil
}