internal/worker: add /clear-cache endpoint
We sometimes want to clear our Redis-based cache. Adding an endpoint
to the worker will let us do this more simply and with less chance of
error than running the redis CLI.
Change-Id: I855ea5d906f0cb080b4e2f6d5fa279a6a2e0b949
Reviewed-on: https://team-review.git.corp.google.com/c/golang/discovery/+/768539
CI-Result: Cloud Build <devtools-proctor-result-processor@system.gserviceaccount.com>
Reviewed-by: Julie Qiu <julieqiu@google.com>
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 49b29a0..4dc121e 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -88,8 +88,11 @@
sourceClient := source.NewClient(config.SourceTimeout)
fetchQueue := newQueue(ctx, cfg, proxyClient, sourceClient, db)
reportingClient := reportingClient(ctx, cfg)
- redisClient := getRedis(ctx, cfg)
- server, err := worker.NewServer(cfg, db, indexClient, proxyClient, sourceClient, redisClient, fetchQueue, reportingClient, config.TaskIDChangeIntervalWorker, *staticPath)
+ redisHAClient := getHARedis(ctx, cfg)
+ redisCacheClient := getCacheRedis(ctx, cfg)
+ server, err := worker.NewServer(cfg, db, indexClient, proxyClient, sourceClient,
+ redisHAClient, redisCacheClient, fetchQueue, reportingClient,
+ config.TaskIDChangeIntervalWorker, *staticPath)
if err != nil {
log.Fatal(ctx, err)
}
@@ -157,23 +160,31 @@
return queue.NewGCP(cfg, client, queueName)
}
-func getRedis(ctx context.Context, cfg *config.Config) *redis.Client {
+func getHARedis(ctx context.Context, cfg *config.Config) *redis.Client {
+ // We update completions with one big pipeline, so we need long write
+ // timeouts. ReadTimeout is increased only to be consistent with
+ // WriteTimeout.
+ return getRedis(ctx, cfg.RedisHAHost, cfg.RedisHAPort, 5*time.Minute, 5*time.Minute)
+}
+
+func getCacheRedis(ctx context.Context, cfg *config.Config) *redis.Client {
+ return getRedis(ctx, cfg.RedisCacheHost, cfg.RedisCachePort, 0, 0)
+}
+
+func getRedis(ctx context.Context, host, port string, writeTimeout, readTimeout time.Duration) *redis.Client {
+ if host == "" {
+ return nil
+ }
var dialTimeout time.Duration
if dl, ok := ctx.Deadline(); ok {
dialTimeout = time.Until(dl)
}
- if cfg.RedisHAHost != "" {
- return redis.NewClient(&redis.Options{
- Addr: cfg.RedisHAHost + ":" + cfg.RedisHAPort,
- DialTimeout: dialTimeout,
- // We update completions with one big pipeline, so we need long write
- // timeouts. ReadTimeout is increased only to be consistent with
- // WriteTimeout.
- WriteTimeout: 5 * time.Minute,
- ReadTimeout: 5 * time.Minute,
- })
- }
- return nil
+ return redis.NewClient(&redis.Options{
+ Addr: host + ":" + port,
+ DialTimeout: dialTimeout,
+ WriteTimeout: writeTimeout,
+ ReadTimeout: readTimeout,
+ })
}
func reportingClient(ctx context.Context, cfg *config.Config) *errorreporting.Client {
diff --git a/internal/testing/integration/integration_test.go b/internal/testing/integration/integration_test.go
index f3a658b..8eeb04e 100644
--- a/internal/testing/integration/integration_test.go
+++ b/internal/testing/integration/integration_test.go
@@ -79,7 +79,7 @@
queue := queue.NewInMemory(ctx, proxyClient, source.NewClient(1*time.Second), testDB, 10,
worker.FetchAndUpdateState, nil)
- workerServer, err := worker.NewServer(&config.Config{}, testDB, indexClient, proxyClient, source.NewClient(1*time.Second), redisHAClient, queue, nil, 10*time.Minute, "../../../content/static")
+ workerServer, err := worker.NewServer(&config.Config{}, testDB, indexClient, proxyClient, source.NewClient(1*time.Second), redisHAClient, redisCacheClient, queue, nil, 10*time.Minute, "../../../content/static")
if err != nil {
t.Fatal(err)
}
diff --git a/internal/worker/completion.go b/internal/worker/completion.go
index 5ba8a02..7be5860 100644
--- a/internal/worker/completion.go
+++ b/internal/worker/completion.go
@@ -25,7 +25,7 @@
// updates redis auto completion indexes with data from these documents.
func (s *Server) handleUpdateRedisIndexes(w http.ResponseWriter, r *http.Request) error {
ctx := r.Context()
- err := updateRedisIndexes(ctx, s.db.Underlying(), s.redisClient, popularCutoff)
+ err := updateRedisIndexes(ctx, s.db.Underlying(), s.redisHAClient, popularCutoff)
if err != nil {
return err
}
@@ -39,7 +39,7 @@
func updateRedisIndexes(ctx context.Context, db *database.DB, redisClient *redis.Client, cutoff int) (err error) {
defer derrors.Wrap(&err, "updateRedisIndexes")
if redisClient == nil {
- return errors.New("redis client is nil")
+ return errors.New("redis HA client is nil")
}
// For autocompletion, we track two separate "indexes" (sorted sets of
diff --git a/internal/worker/server.go b/internal/worker/server.go
index aba6b32..08293b9 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -41,7 +41,8 @@
indexClient *index.Client
proxyClient *proxy.Client
sourceClient *source.Client
- redisClient *redis.Client
+ redisHAClient *redis.Client
+ redisCacheClient *redis.Client
db *postgres.DB
queue queue.Queue
reportingClient *errorreporting.Client
@@ -56,7 +57,8 @@
indexClient *index.Client,
proxyClient *proxy.Client,
sourceClient *source.Client,
- redisClient *redis.Client,
+ redisHAClient *redis.Client,
+ redisCacheClient *redis.Client,
queue queue.Queue,
reportingClient *errorreporting.Client,
taskIDChangeInterval time.Duration,
@@ -75,7 +77,8 @@
indexClient: indexClient,
proxyClient: proxyClient,
sourceClient: sourceClient,
- redisClient: redisClient,
+ redisHAClient: redisHAClient,
+ redisCacheClient: redisCacheClient,
queue: queue,
reportingClient: reportingClient,
indexTemplate: indexTemplate,
@@ -147,6 +150,9 @@
// "before" query parameter.
handle("/repopulate-search-documents", rmw(s.errorHandler(s.handleRepopulateSearchDocuments)))
+ // manual: clear-cache clears the redis cache.
+ handle("/clear-cache", rmw(s.errorHandler(s.clearCache)))
+
// returns the Worker homepage.
handle("/", http.HandlerFunc(s.handleStatusPage))
}
@@ -480,6 +486,18 @@
return nil
}
+func (s *Server) clearCache(w http.ResponseWriter, r *http.Request) error {
+ if s.redisCacheClient == nil {
+ return errors.New("Redis cache client is not configured")
+ }
+ status := s.redisCacheClient.FlushAll()
+ if status.Err() != nil {
+ return status.Err()
+ }
+ fmt.Fprint(w, "Cache cleared.")
+ return nil
+}
+
// Parse the template for the status page.
func parseTemplate(staticPath string) (*template.Template, error) {
if staticPath == "" {
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index 0eeaa7e..c3cf34b 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -164,7 +164,8 @@
// Use 10 workers to have parallelism consistent with the worker binary.
q := queue.NewInMemory(ctx, proxyClient, sourceClient, testDB, 10, FetchAndUpdateState, nil)
- s, err := NewServer(&config.Config{}, testDB, indexClient, proxyClient, sourceClient, nil, q, nil, 10*time.Minute, "")
+ s, err := NewServer(&config.Config{}, testDB, indexClient, proxyClient, sourceClient,
+ nil, nil, q, nil, 10*time.Minute, "")
if err != nil {
t.Fatal(err)
}