cmd/relui: create pubsub topic on start
This will create and connect to a pubsub topic for communicating with
relui workers on application start. If the topic already exists, it will
just get a reference to the topic.
For golang/go#40279
Co-authored-by: Carlos Amedee <carlos@golang.org>
Change-Id: Ic173212cd15562b9d1a1cc601d307d5ee1a4e811
Reviewed-on: https://go-review.googlesource.com/c/build/+/257237
Trust: Alexander Rakoczy <alex@golang.org>
Run-TryBot: Alexander Rakoczy <alex@golang.org>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
diff --git a/cmd/relui/main.go b/cmd/relui/main.go
index 4ba50cb..420b9f3 100644
--- a/cmd/relui/main.go
+++ b/cmd/relui/main.go
@@ -6,6 +6,7 @@
package main
import (
+ "context"
"flag"
"io/ioutil"
"log"
@@ -13,12 +14,17 @@
"os"
"path/filepath"
+ "cloud.google.com/go/pubsub"
"github.com/golang/protobuf/proto"
reluipb "golang.org/x/build/cmd/relui/protos"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
var (
devDataDir = flag.String("dev-data-directory", defaultDevDataDir(), "Development-only directory to use for storage of application state.")
+ projectID = flag.String("project-id", os.Getenv("PUBSUB_PROJECT_ID"), "Pubsub project ID for communicating with workers. Uses PUBSUB_PROJECT_ID if unset.")
+ topicID = flag.String("topic-id", "relui-development", "Pubsub topic ID for communicating with workers.")
)
func main() {
@@ -27,7 +33,12 @@
if err := fs.load(); err != nil {
log.Fatalf("Error loading state from %q: %v", *devDataDir, err)
}
- s := &server{store: fs, configs: loadWorkflowConfig("./workflows")}
+ ctx := context.Background()
+ s := &server{
+ configs: loadWorkflowConfig("./workflows"),
+ store: fs,
+ topic: getTopic(ctx),
+ }
http.Handle("/workflows/create", http.HandlerFunc(s.createWorkflowHandler))
http.Handle("/workflows/new", http.HandlerFunc(s.newWorkflowHandler))
http.Handle("/", fileServerHandler(relativeFile("./static"), http.HandlerFunc(s.homeHandler)))
@@ -39,6 +50,22 @@
log.Fatal(http.ListenAndServe(":"+port, http.DefaultServeMux))
}
+// getTopic creates and returns a pubsub topic from the project specified in projectId, which is to be used for
+// communicating with relui workers.
+//
+// It is safe to call if a topic already exists. A reference to the topic will be returned.
+func getTopic(ctx context.Context) *pubsub.Topic {
+ client, err := pubsub.NewClient(ctx, *projectID)
+ if err != nil {
+ log.Fatalf("pubsub.NewClient(_, %q) = %v, wanted no error", *projectID, err)
+ }
+ _, err = client.CreateTopic(ctx, *topicID)
+ if err != nil && status.Code(err) != codes.AlreadyExists {
+ log.Fatalf("client.CreateTopic(_, %q) = %v, wanted no error", *topicID, err)
+ }
+ return client.Topic(*topicID)
+}
+
// loadWorkflowConfig loads Workflow configuration files from dir. It expects all files to be in textproto format.
func loadWorkflowConfig(dir string) []*reluipb.Workflow {
fs, err := filepath.Glob(filepath.Join(relativeFile(dir), "*.textpb"))
diff --git a/cmd/relui/web.go b/cmd/relui/web.go
index a20d2fd..31cba38 100644
--- a/cmd/relui/web.go
+++ b/cmd/relui/web.go
@@ -15,6 +15,7 @@
"path"
"path/filepath"
+ "cloud.google.com/go/pubsub"
"github.com/golang/protobuf/proto"
reluipb "golang.org/x/build/cmd/relui/protos"
)
@@ -55,6 +56,9 @@
// store is for persisting application state.
store store
+
+ // topic is for communicating with relui workers.
+ topic *pubsub.Topic
}
type homeResponse struct {
diff --git a/go.mod b/go.mod
index 22f943e..8674d2c 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@
cloud.google.com/go v0.54.0
cloud.google.com/go/bigquery v1.4.0
cloud.google.com/go/datastore v1.1.0
+ cloud.google.com/go/pubsub v1.2.0
cloud.google.com/go/storage v1.6.0
github.com/NYTimes/gziphandler v1.1.1
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239 // indirect