gddo-server: publish crawl events to a Pub/Sub feed

Change-Id: I55d798a8be5ca1b05375d57d011eda3444b0b415
Reviewed-on: https://go-review.googlesource.com/114775
Reviewed-by: Tuo Shan <shantuo@google.com>
diff --git a/gddo-server/config.go b/gddo-server/config.go
index 199f262..7317ddf 100644
--- a/gddo-server/config.go
+++ b/gddo-server/config.go
@@ -65,6 +65,9 @@
 	ConfigGithubToken        = "github_token"
 	ConfigGithubClientID     = "github_client_id"
 	ConfigGithubClientSecret = "github_client_secret"
+
+	// Pub/Sub Config
+	ConfigCrawlPubSubTopic = "crawl-events"
 )
 
 func loadConfig(ctx context.Context, args []string) (*viper.Viper, error) {
diff --git a/gddo-server/crawl.go b/gddo-server/crawl.go
index c79e27a..d85352e 100644
--- a/gddo-server/crawl.go
+++ b/gddo-server/crawl.go
@@ -8,12 +8,15 @@
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"log"
 	"regexp"
 	"strings"
 	"time"
 
+	"cloud.google.com/go/pubsub"
+
 	"github.com/golang/gddo/doc"
 	"github.com/golang/gddo/gosrc"
 )
@@ -22,6 +25,27 @@
 	testdataPat = regexp.MustCompile(`/testdata(?:/|$)`)
 )
 
+// crawlNote is a message sent to Pub/Sub when a crawl occurs.
+// It is encoded with encoding/gob, so changes should match its
+// compatibility requirements.
+type crawlNote struct {
+	ImportPath string
+}
+
+func (s *server) publishCrawl(ctx context.Context, importPath string) {
+	if s.crawlTopic == nil {
+		return
+	}
+
+	note := &crawlNote{ImportPath: importPath}
+	b, err := json.Marshal(note)
+	if err != nil {
+		log.Printf("Encoding crawlNote: %v", err)
+		return
+	}
+	s.crawlTopic.Publish(ctx, &pubsub.Message{Data: b})
+}
+
 // crawlDoc fetches the package documentation from the VCS and updates the database.
 func (s *server) crawlDoc(ctx context.Context, source string, importPath string, pdoc *doc.Package, hasSubdirs bool, nextCrawl time.Time) (*doc.Package, error) {
 	message := []interface{}{source}
@@ -85,6 +109,7 @@
 		if err := s.put(ctx, pdoc, nextCrawl); err != nil {
 			log.Println(err)
 		}
+		s.publishCrawl(ctx, importPath)
 		return pdoc, nil
 	} else if e, ok := err.(gosrc.NotModifiedError); ok {
 		if pdoc.Status == gosrc.Active && !s.isActivePkg(importPath, e.Status) {
@@ -103,6 +128,7 @@
 				log.Printf("ERROR db.SetNextCrawl(%q): %v", importPath, err)
 			}
 		}
+		s.publishCrawl(ctx, importPath)
 		return pdoc, nil
 	} else if e, ok := err.(gosrc.NotFoundError); ok {
 		message = append(message, "notfound:", e)
diff --git a/gddo-server/main.go b/gddo-server/main.go
index 2f080fe..2137cb9 100644
--- a/gddo-server/main.go
+++ b/gddo-server/main.go
@@ -29,6 +29,7 @@
 	"time"
 
 	"cloud.google.com/go/logging"
+	"cloud.google.com/go/pubsub"
 	"cloud.google.com/go/trace"
 	"github.com/spf13/viper"
 
@@ -838,6 +839,7 @@
 	gceLogger   *GCELogger
 	templates   templateMap
 	traceClient *trace.Client
+	crawlTopic  *pubsub.Topic
 
 	statusPNG http.Handler
 	statusSVG http.Handler
@@ -861,6 +863,13 @@
 			return nil, err
 		}
 		s.traceClient.SetSamplingPolicy(sp)
+
+		// This topic should be created in the cloud console.
+		ps, err := pubsub.NewClient(ctx, proj)
+		if err != nil {
+			return nil, err
+		}
+		s.crawlTopic = ps.Topic(ConfigCrawlPubSubTopic)
 	}
 
 	assets := v.GetString(ConfigAssetsDir)