// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package gcpdial monitors VM instance groups to let frontends dial
// them directly without going through an internal load balancer.
package gcpdial

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"math/rand"
	"net/http"
	"strings"
	"sync"
	"time"

	compute "google.golang.org/api/compute/v1"
)

type Dialer struct {
	lister instanceLister

	mu            sync.Mutex
	lastInstances []string           // URLs of instances
	prober        map[string]*prober // URL of instance to its prober
	ready         map[string]string  // URL of instance to ready IP
}

type prober struct {
	d       *Dialer
	instURL string
	cancel  func()          // called by Dialer to shut down this dialer
	ctx     context.Context // context that's canceled from above

	pi *parsedInstance

	// owned by the probeLoop goroutine:
	ip      string
	healthy bool
}

func newProber(d *Dialer, instURL string) *prober {
	ctx, cancel := context.WithCancel(context.Background())
	return &prober{
		d:       d,
		instURL: instURL,
		cancel:  cancel,
		ctx:     ctx,
	}
}

func (p *prober) probeLoop() {
	log.Printf("start prober for %s", p.instURL)
	defer log.Printf("end prober for %s", p.instURL)

	pi, err := parseInstance(p.instURL)
	if err != nil {
		log.Printf("gcpdial: prober %s: failed to parse: %v", p.instURL, err)
		return
	}
	p.pi = pi

	t := time.NewTicker(15 * time.Second)
	defer t.Stop()
	for {
		p.probe()
		select {
		case <-p.ctx.Done():
			return
		case <-t.C:
		}
	}
}

func (p *prober) probe() {
	if p.ip == "" && !p.getIP() {
		return
	}
	ctx, cancel := context.WithTimeout(p.ctx, 30*time.Second)
	defer cancel()
	req, err := http.NewRequest("GET", "http://"+p.ip+"/healthz", nil)
	if err != nil {
		log.Printf("gcpdial: prober %s: NewRequest: %v", p.instURL, err)
		return
	}
	req = req.WithContext(ctx)
	res, err := http.DefaultClient.Do(req)
	if res != nil {
		defer res.Body.Close()
		defer io.Copy(ioutil.Discard, res.Body)
	}
	healthy := err == nil && res.StatusCode == http.StatusOK
	if healthy == p.healthy {
		// No change.
		return
	}
	p.healthy = healthy

	p.d.mu.Lock()
	defer p.d.mu.Unlock()
	if healthy {
		if p.d.ready == nil {
			p.d.ready = map[string]string{}
		}
		p.d.ready[p.instURL] = p.ip
		// TODO: possible optimization: trigger
		// Dialer.PickIP waiters to wake up rather
		// than them polling once a second.
	} else {
		delete(p.d.ready, p.instURL)
		var why string
		if err != nil {
			why = err.Error()
		} else {
			why = res.Status
		}
		log.Printf("gcpdial: prober %s: no longer healthy; %v", p.instURL, why)
	}
}

// getIP populates p.ip and reports whether it did so.
func (p *prober) getIP() bool {
	if p.ip != "" {
		return true
	}
	ctx, cancel := context.WithTimeout(p.ctx, 30*time.Second)
	defer cancel()
	svc, err := compute.NewService(ctx)
	if err != nil {
		log.Printf("gcpdial: prober %s: NewService: %v", p.instURL, err)
		return false
	}
	inst, err := svc.Instances.Get(p.pi.Project, p.pi.Zone, p.pi.Name).Context(ctx).Do()
	if err != nil {
		log.Printf("gcpdial: prober %s: Get: %v", p.instURL, err)
		return false
	}
	var ip string
	var other []string
	for _, ni := range inst.NetworkInterfaces {
		if strings.HasPrefix(ni.NetworkIP, "10.") {
			ip = ni.NetworkIP
		} else {
			other = append(other, ni.NetworkIP)
		}
	}
	if ip == "" {
		log.Printf("gcpdial: prober %s: didn't find 10.x.x.x IP; found %q", p.instURL, other)
		return false
	}
	p.ip = ip
	return true
}

// PickIP returns a randomly healthy IP, waiting until one is available, or until ctx expires.
func (d *Dialer) PickIP(ctx context.Context) (ip string, err error) {
	for {
		if ip, ok := d.pickIP(); ok {
			return ip, nil
		}
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-time.After(time.Second):
		}
	}
}

func (d *Dialer) pickIP() (string, bool) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if len(d.ready) == 0 {
		return "", false
	}
	num := rand.Intn(len(d.ready))
	for _, v := range d.ready {
		if num > 0 {
			num--
			continue
		}
		return v, true
	}
	panic("not reachable")
}

func (d *Dialer) poll() {
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		res, err := d.lister.ListInstances(ctx)
		cancel()
		if err != nil {
			log.Printf("gcpdial: polling %v: %v", d.lister, err)
			time.Sleep(10 * time.Second)
			continue
		}

		want := map[string]bool{} // the res []string turned into a set
		for _, instURL := range res {
			want[instURL] = true
		}

		d.mu.Lock()
		// Stop + remove any health check probers that no longer appear in the
		// instance group.
		for instURL, prober := range d.prober {
			if !want[instURL] {
				prober.cancel()
				delete(d.prober, instURL)
			}
		}
		// And start any new health check probers that are newly added
		// (or newly known at least) to the instance group.
		for _, instURL := range res {
			if _, ok := d.prober[instURL]; ok {
				continue
			}
			p := newProber(d, instURL)
			go p.probeLoop()
			if d.prober == nil {
				d.prober = map[string]*prober{}
			}
			d.prober[instURL] = p
		}
		d.lastInstances = res
		d.mu.Unlock()
	}
}

// NewRegionInstanceGroupDialer returns a new dialer that dials named
// regional instance group in the provided project and region.
//
// It begins polling immediately, and there's no way to stop it.
// (Until we need one)
func NewRegionInstanceGroupDialer(project, region, group string) *Dialer {
	d := &Dialer{
		lister: regionInstanceGroupLister{project, region, group},
	}
	go d.poll()
	return d
}

// instanceLister is something that can list the current set of VMs.
//
// The idea is that we'll have both zonal and regional instance group listers,
// but currently we only have regionInstanceGroupLister below.
type instanceLister interface {
	// ListInstances returns a list of instances in their API URL form.
	//
	// The API URL form is parseable by the parseInstance func. See its docs.
	ListInstances(context.Context) ([]string, error)
}

// regionInstanceGroupLister is an instanceLister implementation that watches a regional
// instance group for changes to its set of VMs.
type regionInstanceGroupLister struct {
	project, region, group string
}

func (rig regionInstanceGroupLister) ListInstances(ctx context.Context) (ret []string, err error) {
	svc, err := compute.NewService(ctx)
	if err != nil {
		return nil, err
	}
	rigSvc := svc.RegionInstanceGroups
	insts, err := rigSvc.ListInstances(rig.project, rig.region, rig.group, &compute.RegionInstanceGroupsListInstancesRequest{
		InstanceState: "RUNNING",
		PortName:      "", // all
	}).Context(ctx).MaxResults(500).Do()
	if err != nil {
		return nil, err
	}
	// TODO: pagination for really large sets? Currently we truncate the results
	// to the first 500 VMs, which seems like plenty for now.
	// 500 is the maximum the API supports; see:
	// https://pkg.go.dev/google.golang.org/api/compute/v1?tab=doc#RegionInstanceGroupsListInstancesCall.MaxResults
	for _, it := range insts.Items {
		ret = append(ret, it.Instance)
	}
	return ret, nil
}

// parsedInstance contains the project, zone, and name of a VM.
type parsedInstance struct {
	Project, Zone, Name string
}

// parseInstance parses e.g. "https://www.googleapis.com/compute/v1/projects/golang-org/zones/us-central1-c/instances/playsandbox-7sj8" into its parts.
func parseInstance(u string) (*parsedInstance, error) {
	const pfx = "https://www.googleapis.com/compute/v1/projects/"
	if !strings.HasPrefix(u, pfx) {
		return nil, fmt.Errorf("failed to parse instance %q; doesn't begin with %q", u, pfx)
	}
	u = u[len(pfx):] // "golang-org/zones/us-central1-c/instances/playsandbox-7sj8"
	f := strings.Split(u, "/")
	if len(f) != 5 || f[1] != "zones" || f[3] != "instances" {
		return nil, fmt.Errorf("failed to parse instance %q; unexpected format", u)
	}
	return &parsedInstance{f[0], f[2], f[4]}, nil
}
