gddo-server: publish crawl events to a Pub/Sub feed
Change-Id: I55d798a8be5ca1b05375d57d011eda3444b0b415
Reviewed-on: https://go-review.googlesource.com/114775
Reviewed-by: Tuo Shan <shantuo@google.com>
diff --git a/gddo-server/config.go b/gddo-server/config.go
index 199f262..7317ddf 100644
--- a/gddo-server/config.go
+++ b/gddo-server/config.go
@@ -65,6 +65,9 @@
ConfigGithubToken = "github_token"
ConfigGithubClientID = "github_client_id"
ConfigGithubClientSecret = "github_client_secret"
+
+ // Pub/Sub Config
+ ConfigCrawlPubSubTopic = "crawl-events"
)
func loadConfig(ctx context.Context, args []string) (*viper.Viper, error) {
diff --git a/gddo-server/crawl.go b/gddo-server/crawl.go
index c79e27a..d85352e 100644
--- a/gddo-server/crawl.go
+++ b/gddo-server/crawl.go
@@ -8,12 +8,15 @@
import (
"context"
+ "encoding/json"
"fmt"
"log"
"regexp"
"strings"
"time"
+ "cloud.google.com/go/pubsub"
+
"github.com/golang/gddo/doc"
"github.com/golang/gddo/gosrc"
)
@@ -22,6 +25,27 @@
testdataPat = regexp.MustCompile(`/testdata(?:/|$)`)
)
+// crawlNote is a message sent to Pub/Sub when a crawl occurs.
+// It is encoded with encoding/gob, so changes should match its
+// compatibility requirements.
+type crawlNote struct {
+ ImportPath string
+}
+
+func (s *server) publishCrawl(ctx context.Context, importPath string) {
+ if s.crawlTopic == nil {
+ return
+ }
+
+ note := &crawlNote{ImportPath: importPath}
+ b, err := json.Marshal(note)
+ if err != nil {
+ log.Printf("Encoding crawlNote: %v", err)
+ return
+ }
+ s.crawlTopic.Publish(ctx, &pubsub.Message{Data: b})
+}
+
// crawlDoc fetches the package documentation from the VCS and updates the database.
func (s *server) crawlDoc(ctx context.Context, source string, importPath string, pdoc *doc.Package, hasSubdirs bool, nextCrawl time.Time) (*doc.Package, error) {
message := []interface{}{source}
@@ -85,6 +109,7 @@
if err := s.put(ctx, pdoc, nextCrawl); err != nil {
log.Println(err)
}
+ s.publishCrawl(ctx, importPath)
return pdoc, nil
} else if e, ok := err.(gosrc.NotModifiedError); ok {
if pdoc.Status == gosrc.Active && !s.isActivePkg(importPath, e.Status) {
@@ -103,6 +128,7 @@
log.Printf("ERROR db.SetNextCrawl(%q): %v", importPath, err)
}
}
+ s.publishCrawl(ctx, importPath)
return pdoc, nil
} else if e, ok := err.(gosrc.NotFoundError); ok {
message = append(message, "notfound:", e)
diff --git a/gddo-server/main.go b/gddo-server/main.go
index 2f080fe..2137cb9 100644
--- a/gddo-server/main.go
+++ b/gddo-server/main.go
@@ -29,6 +29,7 @@
"time"
"cloud.google.com/go/logging"
+ "cloud.google.com/go/pubsub"
"cloud.google.com/go/trace"
"github.com/spf13/viper"
@@ -838,6 +839,7 @@
gceLogger *GCELogger
templates templateMap
traceClient *trace.Client
+ crawlTopic *pubsub.Topic
statusPNG http.Handler
statusSVG http.Handler
@@ -861,6 +863,13 @@
return nil, err
}
s.traceClient.SetSamplingPolicy(sp)
+
+ // This topic should be created in the cloud console.
+ ps, err := pubsub.NewClient(ctx, proj)
+ if err != nil {
+ return nil, err
+ }
+ s.crawlTopic = ps.Topic(ConfigCrawlPubSubTopic)
}
assets := v.GetString(ConfigAssetsDir)