cmd/relui: create pubsub topic on start

This will create and connect to a pubsub topic for communicating with
relui workers on application start. If the topic already exists, it will
just get a reference to the topic.

For golang/go#40279

Co-authored-by: Carlos Amedee <carlos@golang.org>
Change-Id: Ic173212cd15562b9d1a1cc601d307d5ee1a4e811
Reviewed-on: https://go-review.googlesource.com/c/build/+/257237
Trust: Alexander Rakoczy <alex@golang.org>
Run-TryBot: Alexander Rakoczy <alex@golang.org>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
diff --git a/cmd/relui/main.go b/cmd/relui/main.go
index 4ba50cb..420b9f3 100644
--- a/cmd/relui/main.go
+++ b/cmd/relui/main.go
@@ -6,6 +6,7 @@
 package main
 
 import (
+	"context"
 	"flag"
 	"io/ioutil"
 	"log"
@@ -13,12 +14,17 @@
 	"os"
 	"path/filepath"
 
+	"cloud.google.com/go/pubsub"
 	"github.com/golang/protobuf/proto"
 	reluipb "golang.org/x/build/cmd/relui/protos"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 var (
 	devDataDir = flag.String("dev-data-directory", defaultDevDataDir(), "Development-only directory to use for storage of application state.")
+	projectID  = flag.String("project-id", os.Getenv("PUBSUB_PROJECT_ID"), "Pubsub project ID for communicating with workers. Uses PUBSUB_PROJECT_ID if unset.")
+	topicID    = flag.String("topic-id", "relui-development", "Pubsub topic ID for communicating with workers.")
 )
 
 func main() {
@@ -27,7 +33,12 @@
 	if err := fs.load(); err != nil {
 		log.Fatalf("Error loading state from %q: %v", *devDataDir, err)
 	}
-	s := &server{store: fs, configs: loadWorkflowConfig("./workflows")}
+	ctx := context.Background()
+	s := &server{
+		configs: loadWorkflowConfig("./workflows"),
+		store:   fs,
+		topic:   getTopic(ctx),
+	}
 	http.Handle("/workflows/create", http.HandlerFunc(s.createWorkflowHandler))
 	http.Handle("/workflows/new", http.HandlerFunc(s.newWorkflowHandler))
 	http.Handle("/", fileServerHandler(relativeFile("./static"), http.HandlerFunc(s.homeHandler)))
@@ -39,6 +50,22 @@
 	log.Fatal(http.ListenAndServe(":"+port, http.DefaultServeMux))
 }
 
+// getTopic creates and returns a pubsub topic from the project specified in projectId, which is to be used for
+// communicating with relui workers.
+//
+// It is safe to call if a topic already exists. A reference to the topic will be returned.
+func getTopic(ctx context.Context) *pubsub.Topic {
+	client, err := pubsub.NewClient(ctx, *projectID)
+	if err != nil {
+		log.Fatalf("pubsub.NewClient(_, %q) = %v, wanted no error", *projectID, err)
+	}
+	_, err = client.CreateTopic(ctx, *topicID)
+	if err != nil && status.Code(err) != codes.AlreadyExists {
+		log.Fatalf("client.CreateTopic(_, %q) = %v, wanted no error", *topicID, err)
+	}
+	return client.Topic(*topicID)
+}
+
 // loadWorkflowConfig loads Workflow configuration files from dir. It expects all files to be in textproto format.
 func loadWorkflowConfig(dir string) []*reluipb.Workflow {
 	fs, err := filepath.Glob(filepath.Join(relativeFile(dir), "*.textpb"))
diff --git a/cmd/relui/web.go b/cmd/relui/web.go
index a20d2fd..31cba38 100644
--- a/cmd/relui/web.go
+++ b/cmd/relui/web.go
@@ -15,6 +15,7 @@
 	"path"
 	"path/filepath"
 
+	"cloud.google.com/go/pubsub"
 	"github.com/golang/protobuf/proto"
 	reluipb "golang.org/x/build/cmd/relui/protos"
 )
@@ -55,6 +56,9 @@
 
 	// store is for persisting application state.
 	store store
+
+	// topic is for communicating with relui workers.
+	topic *pubsub.Topic
 }
 
 type homeResponse struct {
diff --git a/go.mod b/go.mod
index 22f943e..8674d2c 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@
 	cloud.google.com/go v0.54.0
 	cloud.google.com/go/bigquery v1.4.0
 	cloud.google.com/go/datastore v1.1.0
+	cloud.google.com/go/pubsub v1.2.0
 	cloud.google.com/go/storage v1.6.0
 	github.com/NYTimes/gziphandler v1.1.1
 	github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 // indirect