blob: 35b5480944c6cbf12adbf6b0a30c9605fd1b8af1 [file] [log] [blame]
// +build ignore,OMIT
// The frontend command runs a Google server that combines results
// from multiple backends.
package main
import (
"flag"
"log"
"net"
"net/http"
_ "net/http/pprof"
"strings"
"sync"
"golang.org/x/net/context"
pb "golang.org/x/talks/content/2015/gotham-grpc/search"
"google.golang.org/grpc"
)
var (
backends = flag.String("backends", "localhost:36061,localhost:36062", "comma-separated backend server addresses")
)
type server struct {
backends []pb.GoogleClient
}
// Search issues Search RPCs in parallel to the backends and returns
// the first result.
func (s *server) Search(ctx context.Context, req *pb.Request) (*pb.Result, error) { // HL
c := make(chan result, len(s.backends))
for _, b := range s.backends {
go func(backend pb.GoogleClient) { // HL
res, err := backend.Search(ctx, req) // HL
c <- result{res, err} // HL
}(b) // HL
}
first := <-c // HL
return first.res, first.err // HL
}
type result struct {
res *pb.Result
err error
}
// Watch runs Watch RPCs in parallel on the backends and returns a
// merged stream of results.
func (s *server) Watch(req *pb.Request, stream pb.Google_WatchServer) error { // HL
ctx := stream.Context()
c := make(chan result) // HL
var wg sync.WaitGroup
for _, b := range s.backends {
wg.Add(1)
go func(backend pb.GoogleClient) { // HL
defer wg.Done() // HL
watchBackend(ctx, backend, req, c) // HL
}(b) // HL
}
go func() {
wg.Wait()
close(c) // HL
}()
for res := range c { // HL
if res.err != nil {
return res.err
}
if err := stream.Send(res.res); err != nil { // HL
return err // HL
} // HL
}
return nil
}
// watchBackend runs Watch on a single backend and sends results on c.
// watchBackend returns when ctx.Done is closed or stream.Recv fails.
func watchBackend(ctx context.Context, backend pb.GoogleClient, req *pb.Request, c chan<- result) {
stream, err := backend.Watch(ctx, req) // HL
if err != nil {
select {
case c <- result{err: err}: // HL
case <-ctx.Done():
}
return
}
for {
res, err := stream.Recv() // HL
select {
case c <- result{res, err}: // HL
if err != nil {
return
}
case <-ctx.Done():
return
}
}
}
func main() {
flag.Parse()
go http.ListenAndServe(":36660", nil) // HTTP debugging
lis, err := net.Listen("tcp", ":36060") // RPC port
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := new(server)
for _, addr := range strings.Split(*backends, ",") {
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
log.Fatalf("fail to dial: %v", err)
}
client := pb.NewGoogleClient(conn)
s.backends = append(s.backends, client)
}
g := grpc.NewServer()
pb.RegisterGoogleServer(g, s)
g.Serve(lis)
}