blob: b5d08f974e48831bd81e46edbcad5ddfdfbb29a4 [file] [log] [blame]
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package rendezvous
import (
"context"
"fmt"
"log"
"net"
"net/http"
"sync"
"time"
"golang.org/x/build/buildlet"
"golang.org/x/build/internal"
"golang.org/x/build/revdial/v2"
"google.golang.org/api/idtoken"
)
// result contains the result for a waiting instance registration.
type result struct {
bc buildlet.Client
err error
}
// entry contains the elements needed to process an instance registration.
type entry struct {
deadline time.Time
ch chan *result
}
// TokenValidator verifies if a token is valid.
type TokenValidator func(ctx context.Context, jwt string) bool
// Rendezvous waits for buildlets to connect, verifies they are valid instances
// and passes the connection to the waiting caller.
type Rendezvous struct {
mu sync.Mutex
m map[string]*entry
validator TokenValidator
}
// Option is an optional configuration setting.
type Option func(*Rendezvous)
// OptionValidator changes the verifier used by Rendezvous.
func OptionValidator(v TokenValidator) Option {
return func(rdv *Rendezvous) {
rdv.validator = v
}
}
// New creates a Rendezvous element. The context that is passed in should be non-canceled
// during the lifetime of the running service.
func New(ctx context.Context, opts ...Option) *Rendezvous {
rdv := &Rendezvous{
m: make(map[string]*entry),
validator: validateLUCIIDToken,
}
for _, opt := range opts {
opt(rdv)
}
go internal.PeriodicallyDo(ctx, 10*time.Second, func(ctx context.Context, t time.Time) {
rdv.purgeExpiredRegistrations()
})
return rdv
}
// purgeExpiredRegistrations will purge expired registrations.
func (rdv *Rendezvous) purgeExpiredRegistrations() {
rdv.mu.Lock()
for id, ent := range rdv.m {
if time.Now().After(ent.deadline) {
log.Printf("rendezvous: stopped waiting for instance=%q due to timeout", id)
ent.ch <- &result{err: fmt.Errorf("timed out waiting for rendezvous client=%q", id)}
delete(rdv.m, id)
}
}
rdv.mu.Unlock()
}
// RegisterInstance notes an instance and waits for that instance to connect to the handler. An
// instance must be registered before the instance can attempt to connect. If an instance does
// not connect before the end of the wait period, the instance will not be able to connect.
func (rdv *Rendezvous) RegisterInstance(ctx context.Context, id string, wait time.Duration) {
rdv.mu.Lock()
rdv.m[id] = &entry{
deadline: time.Now().Add(wait),
ch: make(chan *result, 1),
}
rdv.mu.Unlock()
}
// DeregisterInstance removes the registration for an instance which has been
// previously registered.
func (rdv *Rendezvous) DeregisterInstance(ctx context.Context, id string) {
rdv.mu.Lock()
delete(rdv.m, id)
rdv.mu.Unlock()
}
// WaitForInstance waits for the registered instance to successfully connect. It waits for the
// lifetime of the context. If the instance is not registered or has exceeded the timeout period,
// it will immediately return an error.
func (rdv *Rendezvous) WaitForInstance(ctx context.Context, id string) (buildlet.Client, error) {
rdv.mu.Lock()
e, ok := rdv.m[id]
rdv.mu.Unlock()
if !ok {
return nil, fmt.Errorf("instance not found: name=%q", id)
}
select {
case <-ctx.Done():
rdv.mu.Lock()
delete(rdv.m, id)
rdv.mu.Unlock()
return nil, fmt.Errorf("context timeout waiting for rendezvous client=%q: %w", id, ctx.Err())
case res := <-e.ch:
rdv.mu.Lock()
delete(rdv.m, id)
close(e.ch)
rdv.mu.Unlock()
return res.bc, res.err
}
}
const (
// HeaderID is the HTTP header used for passing the gomote ID.
HeaderID = "X-Go-Gomote-ID"
// HeaderToken is the HTTP header used for passing in the authentication token.
HeaderToken = "X-Go-Swarming-Auth-Token"
// HeaderHostname is the HTTP header used for passing in the hostname.
HeaderHostname = "X-Go-Hostname"
)
// HandleReverse handles HTTP requests from the buildlet and passes the connection to
// the waiter.
func (rdv *Rendezvous) HandleReverse(w http.ResponseWriter, r *http.Request) {
if r.TLS == nil {
http.Error(w, "buildlet registration requires SSL", http.StatusInternalServerError)
return
}
var (
id = r.Header.Get(HeaderID)
authToken = r.Header.Get(HeaderToken)
hostname = r.Header.Get(HeaderHostname)
)
if hostname == "" {
http.Error(w, "missing X-Go-Hostname header", http.StatusBadRequest)
return
}
if id == "" {
http.Error(w, "missing X-Go-Gomote-ID header", http.StatusBadRequest)
return
}
if authToken == "" {
http.Error(w, "missing X-Go-Swarming-Auth-Token header", http.StatusBadRequest)
return
}
rdv.mu.Lock()
res, ok := rdv.m[id]
rdv.mu.Unlock()
if !ok {
http.Error(w, "not expecting buildlet client", http.StatusPreconditionFailed)
return
}
if !rdv.validator(r.Context(), authToken) {
log.Printf("rendezvous: Unable to validate authentication token id=%s", id)
http.Error(w, "invalid authentication Token", http.StatusPreconditionFailed)
return
}
hj, ok := w.(http.Hijacker)
if !ok {
http.Error(w, "webserver does not support hijacking", http.StatusHTTPVersionNotSupported)
return
}
conn, _, err := hj.Hijack()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
res.ch <- &result{err: err}
return
}
bc, err := connToClient(conn, hostname, "swarming_task")
if err != nil {
log.Printf("rendezvous: unable to create buildlet client: %s", err)
conn.Close()
res.ch <- &result{err: err}
return
}
res.ch <- &result{bc: bc}
}
func connToClient(conn net.Conn, hostname, hostType string) (buildlet.Client, error) {
if err := (&http.Response{StatusCode: http.StatusSwitchingProtocols, Proto: "HTTP/1.1"}).Write(conn); err != nil {
log.Printf("gomote: error writing upgrade response to reverse buildlet %s (%s) at %s: %v", hostname, hostType, conn.RemoteAddr(), err)
conn.Close()
return nil, err
}
revDialer := revdial.NewDialer(conn, "/revdial")
revDialerDone := revDialer.Done()
dialer := revDialer.Dial
client := buildlet.NewClient(conn.RemoteAddr().String(), buildlet.NoKeyPair)
client.SetHTTPClient(&http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return dialer(ctx)
},
},
})
client.SetDialer(dialer)
client.SetDescription(fmt.Sprintf("reverse peer %s/%s for host type %v", hostname, conn.RemoteAddr(), hostType))
var isDead struct {
sync.Mutex
v bool
}
client.SetOnHeartbeatFailure(func() {
isDead.Lock()
isDead.v = true
isDead.Unlock()
conn.Close()
})
// If the reverse dialer (which is always reading from the
// conn detects that the remote went away, close the buildlet
// client proactively.
go func() {
<-revDialerDone
isDead.Lock()
defer isDead.Unlock()
if !isDead.v {
client.Close()
}
}()
tstatus := time.Now()
status, err := client.Status(context.Background())
if err != nil {
log.Printf("Reverse connection %s/%s for %s did not answer status after %v: %v",
hostname, conn.RemoteAddr(), hostType, time.Since(tstatus), err)
conn.Close()
return nil, err
}
log.Printf("Buildlet %s/%s: %+v for %s", hostname, conn.RemoteAddr(), status, hostType)
return client, nil
}
// validateLUCIIDToken verifies that the token is valid and contains the expected fields.
func validateLUCIIDToken(ctx context.Context, jwt string) bool {
payload, err := idtoken.Validate(ctx, jwt, "https://gomote.golang.org")
if err != nil {
log.Printf("rendezvous: unable to validate authentication token: %s", err)
return false
}
if payload.Issuer != "https://accounts.google.com" {
log.Printf("rendezvous: incorrect issuer: %q", payload.Issuer)
return false
}
if payload.Expires+30 < time.Now().Unix() || payload.IssuedAt-30 > time.Now().Unix() {
log.Printf("rendezvous: Bad JWT times: expires %v, issued %v", time.Unix(payload.Expires, 0), time.Unix(payload.IssuedAt, 0))
return false
}
email, ok := payload.Claims["email"]
if !ok || email != "coordinator-builder@golang-ci-luci.iam.gserviceaccount.com" {
log.Printf("rendezvous: incorrect email=%s", email)
return false
}
emailVerified, ok := payload.Claims["email_verified"].(bool)
if !ok || !emailVerified {
log.Printf("rendezvous: email unverified email=%s", email)
return false
}
return true
}