| // +build ignore,OMIT |
| |
| // The frontend command runs a Google server that combines results |
| // from multiple backends. |
| package main |
| |
| import ( |
| "flag" |
| "log" |
| "net" |
| "net/http" |
| _ "net/http/pprof" |
| "strings" |
| "sync" |
| |
| "golang.org/x/net/context" |
| pb "golang.org/x/talks/content/2015/gotham-grpc/search" |
| "google.golang.org/grpc" |
| ) |
| |
| var ( |
| backends = flag.String("backends", "localhost:36061,localhost:36062", "comma-separated backend server addresses") |
| ) |
| |
| type server struct { |
| backends []pb.GoogleClient |
| } |
| |
| // Search issues Search RPCs in parallel to the backends and returns |
| // the first result. |
| func (s *server) Search(ctx context.Context, req *pb.Request) (*pb.Result, error) { // HL |
| c := make(chan result, len(s.backends)) |
| for _, b := range s.backends { |
| go func(backend pb.GoogleClient) { // HL |
| res, err := backend.Search(ctx, req) // HL |
| c <- result{res, err} // HL |
| }(b) // HL |
| } |
| first := <-c // HL |
| return first.res, first.err // HL |
| } |
| |
| type result struct { |
| res *pb.Result |
| err error |
| } |
| |
| // Watch runs Watch RPCs in parallel on the backends and returns a |
| // merged stream of results. |
| func (s *server) Watch(req *pb.Request, stream pb.Google_WatchServer) error { // HL |
| ctx := stream.Context() |
| c := make(chan result) // HL |
| var wg sync.WaitGroup |
| for _, b := range s.backends { |
| wg.Add(1) |
| go func(backend pb.GoogleClient) { // HL |
| defer wg.Done() // HL |
| watchBackend(ctx, backend, req, c) // HL |
| }(b) // HL |
| } |
| go func() { |
| wg.Wait() |
| close(c) // HL |
| }() |
| for res := range c { // HL |
| if res.err != nil { |
| return res.err |
| } |
| if err := stream.Send(res.res); err != nil { // HL |
| return err // HL |
| } // HL |
| } |
| return nil |
| } |
| |
| // watchBackend runs Watch on a single backend and sends results on c. |
| // watchBackend returns when ctx.Done is closed or stream.Recv fails. |
| func watchBackend(ctx context.Context, backend pb.GoogleClient, req *pb.Request, c chan<- result) { |
| stream, err := backend.Watch(ctx, req) // HL |
| if err != nil { |
| select { |
| case c <- result{err: err}: // HL |
| case <-ctx.Done(): |
| } |
| return |
| } |
| for { |
| res, err := stream.Recv() // HL |
| select { |
| case c <- result{res, err}: // HL |
| if err != nil { |
| return |
| } |
| case <-ctx.Done(): |
| return |
| } |
| } |
| } |
| |
| func main() { |
| flag.Parse() |
| go http.ListenAndServe(":36660", nil) // HTTP debugging |
| lis, err := net.Listen("tcp", ":36060") // RPC port |
| if err != nil { |
| log.Fatalf("failed to listen: %v", err) |
| } |
| s := new(server) |
| for _, addr := range strings.Split(*backends, ",") { |
| conn, err := grpc.Dial(addr, grpc.WithInsecure()) |
| if err != nil { |
| log.Fatalf("fail to dial: %v", err) |
| } |
| client := pb.NewGoogleClient(conn) |
| s.backends = append(s.backends, client) |
| } |
| g := grpc.NewServer() |
| pb.RegisterGoogleServer(g, s) |
| g.Serve(lis) |
| } |