internal/worker: add /clear-cache endpoint

We sometimes want to clear our Redis-based cache. Adding an endpoint
to the worker will let us do this more simply and with less chance of
error than running the redis CLI.

Change-Id: I855ea5d906f0cb080b4e2f6d5fa279a6a2e0b949
Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/768539
CI-Result: Cloud Build <devtools-proctor-result-processor@system.gserviceaccount.com>
Reviewed-by: Julie Qiu <julieqiu@google.com>
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 49b29a0..4dc121e 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -88,8 +88,11 @@
 	sourceClient := source.NewClient(config.SourceTimeout)
 	fetchQueue := newQueue(ctx, cfg, proxyClient, sourceClient, db)
 	reportingClient := reportingClient(ctx, cfg)
-	redisClient := getRedis(ctx, cfg)
-	server, err := worker.NewServer(cfg, db, indexClient, proxyClient, sourceClient, redisClient, fetchQueue, reportingClient, config.TaskIDChangeIntervalWorker, *staticPath)
+	redisHAClient := getHARedis(ctx, cfg)
+	redisCacheClient := getCacheRedis(ctx, cfg)
+	server, err := worker.NewServer(cfg, db, indexClient, proxyClient, sourceClient,
+		redisHAClient, redisCacheClient, fetchQueue, reportingClient,
+		config.TaskIDChangeIntervalWorker, *staticPath)
 	if err != nil {
 		log.Fatal(ctx, err)
 	}
@@ -157,23 +160,31 @@
 	return queue.NewGCP(cfg, client, queueName)
 }
 
-func getRedis(ctx context.Context, cfg *config.Config) *redis.Client {
+func getHARedis(ctx context.Context, cfg *config.Config) *redis.Client {
+	// We update completions with one big pipeline, so we need long write
+	// timeouts. ReadTimeout is increased only to be consistent with
+	// WriteTimeout.
+	return getRedis(ctx, cfg.RedisHAHost, cfg.RedisHAPort, 5*time.Minute, 5*time.Minute)
+}
+
+func getCacheRedis(ctx context.Context, cfg *config.Config) *redis.Client {
+	return getRedis(ctx, cfg.RedisCacheHost, cfg.RedisCachePort, 0, 0)
+}
+
+func getRedis(ctx context.Context, host, port string, writeTimeout, readTimeout time.Duration) *redis.Client {
+	if host == "" {
+		return nil
+	}
 	var dialTimeout time.Duration
 	if dl, ok := ctx.Deadline(); ok {
 		dialTimeout = time.Until(dl)
 	}
-	if cfg.RedisHAHost != "" {
-		return redis.NewClient(&redis.Options{
-			Addr:        cfg.RedisHAHost + ":" + cfg.RedisHAPort,
-			DialTimeout: dialTimeout,
-			// We update completions with one big pipeline, so we need long write
-			// timeouts. ReadTimeout is increased only to be consistent with
-			// WriteTimeout.
-			WriteTimeout: 5 * time.Minute,
-			ReadTimeout:  5 * time.Minute,
-		})
-	}
-	return nil
+	return redis.NewClient(&redis.Options{
+		Addr:         host + ":" + port,
+		DialTimeout:  dialTimeout,
+		WriteTimeout: writeTimeout,
+		ReadTimeout:  readTimeout,
+	})
 }
 
 func reportingClient(ctx context.Context, cfg *config.Config) *errorreporting.Client {
diff --git a/internal/testing/integration/integration_test.go b/internal/testing/integration/integration_test.go
index f3a658b..8eeb04e 100644
--- a/internal/testing/integration/integration_test.go
+++ b/internal/testing/integration/integration_test.go
@@ -79,7 +79,7 @@
 	queue := queue.NewInMemory(ctx, proxyClient, source.NewClient(1*time.Second), testDB, 10,
 		worker.FetchAndUpdateState, nil)
 
-	workerServer, err := worker.NewServer(&config.Config{}, testDB, indexClient, proxyClient, source.NewClient(1*time.Second), redisHAClient, queue, nil, 10*time.Minute, "../../../content/static")
+	workerServer, err := worker.NewServer(&config.Config{}, testDB, indexClient, proxyClient, source.NewClient(1*time.Second), redisHAClient, redisCacheClient, queue, nil, 10*time.Minute, "../../../content/static")
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/internal/worker/completion.go b/internal/worker/completion.go
index 5ba8a02..7be5860 100644
--- a/internal/worker/completion.go
+++ b/internal/worker/completion.go
@@ -25,7 +25,7 @@
 // updates redis auto completion indexes with data from these documents.
 func (s *Server) handleUpdateRedisIndexes(w http.ResponseWriter, r *http.Request) error {
 	ctx := r.Context()
-	err := updateRedisIndexes(ctx, s.db.Underlying(), s.redisClient, popularCutoff)
+	err := updateRedisIndexes(ctx, s.db.Underlying(), s.redisHAClient, popularCutoff)
 	if err != nil {
 		return err
 	}
@@ -39,7 +39,7 @@
 func updateRedisIndexes(ctx context.Context, db *database.DB, redisClient *redis.Client, cutoff int) (err error) {
 	defer derrors.Wrap(&err, "updateRedisIndexes")
 	if redisClient == nil {
-		return errors.New("redis client is nil")
+		return errors.New("redis HA client is nil")
 	}
 
 	// For autocompletion, we track two separate "indexes" (sorted sets of
diff --git a/internal/worker/server.go b/internal/worker/server.go
index aba6b32..08293b9 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -41,7 +41,8 @@
 	indexClient          *index.Client
 	proxyClient          *proxy.Client
 	sourceClient         *source.Client
-	redisClient          *redis.Client
+	redisHAClient        *redis.Client
+	redisCacheClient     *redis.Client
 	db                   *postgres.DB
 	queue                queue.Queue
 	reportingClient      *errorreporting.Client
@@ -56,7 +57,8 @@
 	indexClient *index.Client,
 	proxyClient *proxy.Client,
 	sourceClient *source.Client,
-	redisClient *redis.Client,
+	redisHAClient *redis.Client,
+	redisCacheClient *redis.Client,
 	queue queue.Queue,
 	reportingClient *errorreporting.Client,
 	taskIDChangeInterval time.Duration,
@@ -75,7 +77,8 @@
 		indexClient:          indexClient,
 		proxyClient:          proxyClient,
 		sourceClient:         sourceClient,
-		redisClient:          redisClient,
+		redisHAClient:        redisHAClient,
+		redisCacheClient:     redisCacheClient,
 		queue:                queue,
 		reportingClient:      reportingClient,
 		indexTemplate:        indexTemplate,
@@ -147,6 +150,9 @@
 	// "before" query parameter.
 	handle("/repopulate-search-documents", rmw(s.errorHandler(s.handleRepopulateSearchDocuments)))
 
+	// manual: clear-cache clears the redis cache.
+	handle("/clear-cache", rmw(s.errorHandler(s.clearCache)))
+
 	// returns the Worker homepage.
 	handle("/", http.HandlerFunc(s.handleStatusPage))
 }
@@ -480,6 +486,18 @@
 	return nil
 }
 
+func (s *Server) clearCache(w http.ResponseWriter, r *http.Request) error {
+	if s.redisCacheClient == nil {
+		return errors.New("redis cache client is not configured")
+	}
+	status := s.redisCacheClient.FlushAll()
+	if status.Err() != nil {
+		return status.Err()
+	}
+	fmt.Fprint(w, "Cache cleared.")
+	return nil
+}
+
 // Parse the template for the status page.
 func parseTemplate(staticPath string) (*template.Template, error) {
 	if staticPath == "" {
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index 0eeaa7e..c3cf34b 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -164,7 +164,8 @@
 			// Use 10 workers to have parallelism consistent with the worker binary.
 			q := queue.NewInMemory(ctx, proxyClient, sourceClient, testDB, 10, FetchAndUpdateState, nil)
 
-			s, err := NewServer(&config.Config{}, testDB, indexClient, proxyClient, sourceClient, nil, q, nil, 10*time.Minute, "")
+			s, err := NewServer(&config.Config{}, testDB, indexClient, proxyClient, sourceClient,
+				nil, nil, q, nil, 10*time.Minute, "")
 			if err != nil {
 				t.Fatal(err)
 			}