blob: 7586f325fba290b0db6d3f99d2686399fc3f6796 [file] [log] [blame]
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build linux || darwin
package gomote
import (
	"context"
	"fmt"
	"log"
	"net/http"
	"sort"
	"strings"
	"time"

	"cloud.google.com/go/storage"
	"github.com/google/uuid"
	"go.chromium.org/luci/swarming/client/swarming"
	swarmpb "go.chromium.org/luci/swarming/proto/api_v2"
	"golang.org/x/build/buildlet"
	"golang.org/x/build/internal"
	"golang.org/x/build/internal/access"
	"golang.org/x/build/internal/coordinator/remote"
	"golang.org/x/build/internal/gomote/protos"
	"golang.org/x/build/internal/rendezvous"
	"golang.org/x/build/internal/swarmclient"
	"golang.org/x/crypto/ssh"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)
// rendezvousClient is the subset of the rendezvous service used by
// SwarmingServer to pair a requested swarming task with the buildlet that
// dials back from it. NewSwarming satisfies it with a *rendezvous.Rendezvous.
type rendezvousClient interface {
	// RegisterInstance announces that a buildlet with the given id is expected.
	// NOTE(review): wait presumably bounds how long the registration stays
	// valid; startNewSwarmingTask passes 10 minutes — confirm.
	RegisterInstance(ctx context.Context, id string, wait time.Duration)
	// WaitForInstance blocks until the buildlet registered under id connects,
	// returning a client for it, or fails.
	WaitForInstance(ctx context.Context, id string) (buildlet.Client, error)
	// DeregisterInstance drops a registration that is no longer wanted.
	DeregisterInstance(ctx context.Context, id string)
	// HandleReverse serves the HTTP side of reverse buildlet connections.
	HandleReverse(w http.ResponseWriter, r *http.Request)
}
// SwarmingServer is a gomote server implementation which supports LUCI swarming bots.
// Each gomote instance it creates runs a buildlet inside a swarming task; the
// buildlet connects back through rendezvous and is then tracked as a session in
// the shared session pool.
type SwarmingServer struct {
// embed the unimplemented server.
// Embedding it provides the GomoteService methods this type does not define itself.
protos.UnimplementedGomoteServiceServer
// bucket is the storage handle for gceBucketName, created by NewSwarming.
// NOTE(review): not used in the code visible here; presumably used by other RPCs in this package.
bucket bucketHandle
// buildlets is the pool of active gomote sessions (added, looked up, renewed, listed, destroyed).
buildlets *remote.SessionPool
// gceBucketName is the name of the storage bucket passed to NewSwarming.
gceBucketName string
// luciConfigClient lists the swarming builders (bots) defined in the LUCI configuration.
luciConfigClient *swarmclient.ConfigClient
// rendezvous pairs a requested swarming task with the buildlet that dials back from it.
rendezvous rendezvousClient
// sshCertificateAuthority is the signer parsed from the CA private key given to NewSwarming.
// NOTE(review): not used in the code visible here; presumably signs SSH certificates elsewhere.
sshCertificateAuthority ssh.Signer
// swarmingClient creates swarming tasks and queries their state.
swarmingClient swarming.Client
}
// NewSwarming creates a gomote server whose instances run on LUCI swarming bots.
//
// rawCAPriKey is the certificate authority's private key, parsed with
// ssh.ParsePrivateKey; if it cannot be parsed an error is returned and no
// server is created. gomoteGCSBucket names the storage bucket opened through
// storageClient. rsp tracks gomote sessions, configClient lists the available
// swarming builders, rdv pairs swarming tasks with their buildlets and
// swarmClient talks to the swarming service.
func NewSwarming(rsp *remote.SessionPool, rawCAPriKey []byte, gomoteGCSBucket string, storageClient *storage.Client, configClient *swarmclient.ConfigClient, rdv *rendezvous.Rendezvous, swarmClient swarming.Client) (*SwarmingServer, error) {
	ca, parseErr := ssh.ParsePrivateKey(rawCAPriKey)
	if parseErr != nil {
		return nil, fmt.Errorf("unable to parse raw certificate authority private key into signer=%w", parseErr)
	}
	srv := &SwarmingServer{
		buildlets:               rsp,
		gceBucketName:           gomoteGCSBucket,
		luciConfigClient:        configClient,
		rendezvous:              rdv,
		sshCertificateAuthority: ca,
		swarmingClient:          swarmClient,
	}
	srv.bucket = storageClient.Bucket(gomoteGCSBucket)
	return srv, nil
}
// Authenticate lets a caller check that its request carries valid IAP
// credentials. It returns an empty response on success and a
// codes.Unauthenticated status otherwise.
func (ss *SwarmingServer) Authenticate(ctx context.Context, req *protos.AuthenticateRequest) (*protos.AuthenticateResponse, error) {
	if _, authErr := access.IAPFromContext(ctx); authErr != nil {
		log.Printf("Authenticate access.IAPFromContext(ctx) = nil, %s", authErr)
		return nil, status.Errorf(codes.Unauthenticated, "request does not contain the required authentication")
	}
	return &protos.AuthenticateResponse{}, nil
}
// CreateInstance creates a gomote instance inside a new swarming task for the
// authenticated caller and streams progress back on stream.
//
// The requested builder type must name a builder in the LUCI "ci" bucket; that
// builder's dimensions select the bot the task runs on. While the task is being
// started, a WAITING response is sent every five seconds. Once the buildlet
// connects, the instance is added to the session pool and a single COMPLETE
// response describing it is sent. All errors are returned as gRPC status errors.
func (ss *SwarmingServer) CreateInstance(req *protos.CreateInstanceRequest, stream protos.GomoteService_CreateInstanceServer) error {
	creds, err := access.IAPFromContext(stream.Context())
	if err != nil {
		log.Printf("CreateInstance access.IAPFromContext(ctx) = nil, %s", err)
		return status.Errorf(codes.Unauthenticated, "request does not contain the required authentication")
	}
	if req.GetBuilderType() == "" {
		return status.Errorf(codes.InvalidArgument, "invalid builder type")
	}
	bots, err := ss.luciConfigClient.ListSwarmingBots(stream.Context())
	if err != nil {
		log.Printf("luciConfigClient.ListSwarmingBots(ctx) = %s", err)
		// Return a gRPC status like every other RPC here; a plain error would
		// reach the client as codes.Unknown.
		return status.Errorf(codes.Internal, "unable to query for bots")
	}
	var botDesc *swarmclient.SwarmingBot
	for _, bot := range bots {
		if bot.BucketName == "ci" && req.GetBuilderType() == bot.Name {
			botDesc = bot
			break
		}
	}
	if botDesc == nil {
		return status.Errorf(codes.InvalidArgument, "unknown builder type")
	}
	userName, err := emailToUser(creds.Email)
	if err != nil {
		return status.Errorf(codes.Internal, "invalid user email format")
	}
	type result struct {
		buildletClient buildlet.Client
		err            error
	}
	// Buffered so the worker goroutine can always deliver its result and exit,
	// even if this RPC has already returned.
	rc := make(chan result, 1)
	// Bot dimensions are "key:value" strings; malformed entries are logged and skipped.
	dimensions := make(map[string]string, len(botDesc.Dimensions))
	for _, bd := range botDesc.Dimensions {
		k, v, ok := strings.Cut(bd, ":")
		if !ok {
			log.Printf("failed dimension cut: %s", bd)
			continue
		}
		dimensions[k] = v
	}
	name := fmt.Sprintf("gomote-%s-%s", userName, uuid.NewString())
	go func() {
		bc, err := ss.startNewSwarmingTask(stream.Context(), name, dimensions, &SwarmOpts{})
		if err != nil {
			log.Printf("startNewSwarmingTask() = %s", err)
		}
		rc <- result{bc, err}
	}()
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-stream.Context().Done():
			return status.Errorf(codes.DeadlineExceeded, "timed out waiting for gomote instance to be created")
		case <-ticker.C:
			err := stream.Send(&protos.CreateInstanceResponse{
				Status:       protos.CreateInstanceResponse_WAITING,
				WaitersAhead: 0, // Not convinced querying for pending jobs is useful
			})
			if err != nil {
				return status.Errorf(codes.Internal, "unable to stream result: %s", err)
			}
		case r := <-rc:
			if r.err != nil {
				log.Printf("error creating gomote buildlet instance=%s: %s", name, r.err)
				return status.Errorf(codes.Internal, "gomote creation failed instance=%s", name)
			}
			gomoteID := ss.buildlets.AddSession(creds.ID, userName, req.GetBuilderType(), "swarming task", r.buildletClient)
			log.Printf("created buildlet %s for %s (%s)", gomoteID, userName, r.buildletClient.String())
			session, err := ss.buildlets.Session(gomoteID)
			if err != nil {
				return status.Errorf(codes.Internal, "unable to query for gomote timeout") // this should never happen
			}
			wd, err := r.buildletClient.WorkDir(stream.Context())
			if err != nil {
				return status.Errorf(codes.Internal, "could not read working dir: %v", err)
			}
			err = stream.Send(&protos.CreateInstanceResponse{
				Instance: &protos.Instance{
					GomoteId:    gomoteID,
					BuilderType: req.GetBuilderType(),
					HostType:    "swarming task",
					Expires:     session.Expires.Unix(),
					WorkingDir:  wd,
				},
				Status:       protos.CreateInstanceResponse_COMPLETE,
				WaitersAhead: 0,
			})
			if err != nil {
				return status.Errorf(codes.Internal, "unable to stream result: %s", err)
			}
			return nil
		}
	}
}
// DestroyInstance destroys the caller's gomote instance. The caller must be
// authenticated and must own the instance; otherwise a gRPC status error
// (Unauthenticated, InvalidArgument, NotFound or PermissionDenied) is returned.
func (ss *SwarmingServer) DestroyInstance(ctx context.Context, req *protos.DestroyInstanceRequest) (*protos.DestroyInstanceResponse, error) {
	creds, authErr := access.IAPFromContext(ctx)
	if authErr != nil {
		log.Printf("DestroyInstance access.IAPFromContext(ctx) = nil, %s", authErr)
		return nil, status.Errorf(codes.Unauthenticated, "request does not contain the required authentication")
	}
	id := req.GetGomoteId()
	if id == "" {
		return nil, status.Errorf(codes.InvalidArgument, "invalid gomote ID")
	}
	// Checks existence and ownership; the helper already returns a gRPC status.
	if _, sesErr := ss.session(id, creds.ID); sesErr != nil {
		return nil, sesErr
	}
	if destroyErr := ss.buildlets.DestroySession(id); destroyErr != nil {
		log.Printf("DestroyInstance remote.DestroySession(%s) = %s", id, destroyErr)
		return nil, status.Errorf(codes.Internal, "unable to destroy gomote instance")
	}
	// TODO(go.dev/issue/63819) consider destroying the bot after the task has ended.
	return &protos.DestroyInstanceResponse{}, nil
}
// ExecuteCommand runs a command on the caller's gomote instance and streams the
// command's output back to the caller as ExecuteCommandResponse messages.
//
// The request's directory, system-level flag, arguments, appended environment
// variables, debug flag and path are passed through to the buildlet. Looking up
// the instance also renews its timeout (via sessionAndClient). A failure to
// start the command or observe it to completion is returned as codes.Aborted;
// a command that ran but failed is returned as codes.Unknown.
//
// NOTE: ImitateHostType is not honored for swarming instances; previously it
// was computed here but never used.
func (ss *SwarmingServer) ExecuteCommand(req *protos.ExecuteCommandRequest, stream protos.GomoteService_ExecuteCommandServer) error {
	creds, err := access.IAPFromContext(stream.Context())
	if err != nil {
		return status.Errorf(codes.Unauthenticated, "request does not contain the required authentication")
	}
	// The session is only needed for the ownership check done by the helper.
	_, bc, err := ss.sessionAndClient(stream.Context(), req.GetGomoteId(), creds.ID)
	if err != nil {
		// the helper function returns meaningful GRPC error.
		return err
	}
	remoteErr, execErr := bc.Exec(stream.Context(), req.GetCommand(), buildlet.ExecOpts{
		Dir:         req.GetDirectory(),
		SystemLevel: req.GetSystemLevel(),
		Output: &streamWriter{writeFunc: func(p []byte) (int, error) {
			err := stream.Send(&protos.ExecuteCommandResponse{
				Output: p,
			})
			if err != nil {
				return 0, fmt.Errorf("unable to send data=%w", err)
			}
			return len(p), nil
		}},
		Args: req.GetArgs(),
		// Previously dropped: honor environment variables the caller asked to append.
		ExtraEnv: req.GetAppendEnvironment(),
		Debug:    req.GetDebug(),
		Path:     req.GetPath(),
	})
	if execErr != nil {
		// there were system errors preventing the command from being started or seen to completion.
		return status.Errorf(codes.Aborted, "unable to execute command: %s", execErr)
	}
	if remoteErr != nil {
		// the command failed remotely
		return status.Errorf(codes.Unknown, "command execution failed: %s", remoteErr)
	}
	return nil
}
// InstanceAlive extends the timeout of the caller's gomote instance. The
// requester must be authenticated and own the instance; failures are returned
// as gRPC status errors.
func (ss *SwarmingServer) InstanceAlive(ctx context.Context, req *protos.InstanceAliveRequest) (*protos.InstanceAliveResponse, error) {
	creds, authErr := access.IAPFromContext(ctx)
	if authErr != nil {
		log.Printf("InstanceAlive access.IAPFromContext(ctx) = nil, %s", authErr)
		return nil, status.Errorf(codes.Unauthenticated, "request does not contain the required authentication")
	}
	id := req.GetGomoteId()
	if id == "" {
		return nil, status.Errorf(codes.InvalidArgument, "invalid gomote ID")
	}
	// Checks existence and ownership; the helper already returns a gRPC status.
	if _, sesErr := ss.session(id, creds.ID); sesErr != nil {
		return nil, sesErr
	}
	if ss.buildlets.RenewTimeout(id) != nil {
		return nil, status.Errorf(codes.Internal, "unable to renew timeout")
	}
	return &protos.InstanceAliveResponse{}, nil
}
// ListSwarmingBuilders lists the swarming builders in the LUCI "ci" bucket whose
// names start with "gotip". The requester must be authenticated. Failures are
// returned as gRPC status errors.
func (ss *SwarmingServer) ListSwarmingBuilders(ctx context.Context, req *protos.ListSwarmingBuildersRequest) (*protos.ListSwarmingBuildersResponse, error) {
	_, err := access.IAPFromContext(ctx)
	if err != nil {
		// Log under this RPC's real name (it previously said "ListSwarmingInstances").
		log.Printf("ListSwarmingBuilders access.IAPFromContext(ctx) = nil, %s", err)
		return nil, status.Errorf(codes.Unauthenticated, "request does not contain the required authentication")
	}
	bots, err := ss.luciConfigClient.ListSwarmingBots(ctx)
	if err != nil {
		log.Printf("luciConfigClient.ListSwarmingBots(ctx) = %s", err)
		return nil, status.Errorf(codes.Internal, "unable to query for bots")
	}
	var builders []string
	for _, bot := range bots {
		if bot.BucketName == "ci" && strings.HasPrefix(bot.Name, "gotip") {
			builders = append(builders, bot.Name)
		}
	}
	return &protos.ListSwarmingBuildersResponse{Builders: builders}, nil
}
// ListInstances will list the gomote instances owned by the requester. The requester must be authenticated.
func (ss *SwarmingServer) ListInstances(ctx context.Context, req *protos.ListInstancesRequest) (*protos.ListInstancesResponse, error) {
creds, err := access.IAPFromContext(ctx)
if err != nil {
log.Printf("ListInstances access.IAPFromContext(ctx) = nil, %s", err)
return nil, status.Errorf(codes.Unauthenticated, "request does not contain the required authentication")
}
res := &protos.ListInstancesResponse{}
for _, s := range ss.buildlets.List() {
if s.OwnerID != creds.ID {
continue
}
res.Instances = append(res.Instances, &protos.Instance{
GomoteId: s.ID,
BuilderType: s.BuilderType,
HostType: s.HostType,
Expires: s.Expires.Unix(),
})
}
return res, nil
}
// session returns the session for gomoteID, provided it is owned by ownerID.
// It returns a codes.NotFound status if no such session exists and a
// codes.PermissionDenied status if it belongs to someone else.
func (ss *SwarmingServer) session(gomoteID, ownerID string) (*remote.Session, error) {
	s, lookupErr := ss.buildlets.Session(gomoteID)
	switch {
	case lookupErr != nil:
		return nil, status.Errorf(codes.NotFound, "specified gomote instance does not exist")
	case s.OwnerID != ownerID:
		return nil, status.Errorf(codes.PermissionDenied, "not allowed to modify this gomote session")
	default:
		return s, nil
	}
}
// sessionAndClient returns the session and buildlet client for gomoteID,
// provided the session is owned by ownerID. Errors are gRPC status errors.
// On success it also asks the pool to keep the instance alive; a failure to do
// so is only logged and does not fail the call.
func (ss *SwarmingServer) sessionAndClient(ctx context.Context, gomoteID, ownerID string) (*remote.Session, buildlet.Client, error) {
	s, sesErr := ss.session(gomoteID, ownerID)
	if sesErr != nil {
		return nil, nil, sesErr
	}
	client, clientErr := ss.buildlets.BuildletClient(gomoteID)
	if clientErr != nil {
		return nil, nil, status.Errorf(codes.NotFound, "specified gomote instance does not exist")
	}
	if kaErr := ss.buildlets.KeepAlive(ctx, gomoteID); kaErr != nil {
		log.Printf("gomote: unable to keep alive %s: %s", gomoteID, kaErr)
	}
	return s, client, nil
}
// SwarmOpts provides additional options for swarming task creation.
// Every hook is optional and a nil hook is skipped. startNewSwarmingTask runs
// them in this order: OnInstanceRegistration, OnInstanceRequested,
// OnInstanceCreated.
type SwarmOpts struct {
// OnInstanceRequested optionally specifies a hook to run synchronously
// after the swarming task has been successfully requested, but before
// waiting for the task to start running.
OnInstanceRequested func()
// OnInstanceCreated optionally specifies a hook to run synchronously
// after the swarming task has been observed running (or completed), but
// before waiting for the buildlet inside it to connect.
OnInstanceCreated func()
// OnInstanceRegistration optionally specifies a hook to run synchronously
// after the instance has been registered in rendezvous, before the
// swarming task is requested.
OnInstanceRegistration func()
}
// startNewSwarmingTask starts a new swarming task on a bot with the buildlet
// running on it. It returns a buildlet client configured to speak to it.
// The request will last as long as the lifetime of the context. The dimensions
// are a set of key value pairs used to describe what instance type to create.
func (ss *SwarmingServer) startNewSwarmingTask(ctx context.Context, name string, dimensions map[string]string, opts *SwarmOpts) (buildlet.Client, error) {
ss.rendezvous.RegisterInstance(ctx, name, 10*time.Minute)
condRun(opts.OnInstanceRegistration)
taskID, err := ss.newSwarmingTask(ctx, name, dimensions, opts)
if err != nil {
ss.rendezvous.DeregisterInstance(ctx, name)
return nil, err
}
log.Printf("gomote: swarming task requested name=%s taskID=%s", name, taskID)
condRun(opts.OnInstanceRequested)
queryCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
var taskUp bool
tryThenPeriodicallyDo(queryCtx, 5*time.Second, func(ctx context.Context, _ time.Time) {
resp, err := ss.swarmingClient.TaskResult(ctx, taskID, &swarming.TaskResultFields{WithPerf: false})
if err != nil {
log.Printf("gomote: unable to query for swarming task state: name=%s taskID=%s %s", name, taskID, err)
return
}
switch taskState := resp.GetState(); taskState {
case swarmpb.TaskState_COMPLETED, swarmpb.TaskState_RUNNING:
taskUp = true
cancel()
case swarmpb.TaskState_EXPIRED, swarmpb.TaskState_INVALID, swarmpb.TaskState_BOT_DIED, swarmpb.TaskState_CANCELED,
swarmpb.TaskState_CLIENT_ERROR, swarmpb.TaskState_KILLED, swarmpb.TaskState_NO_RESOURCE, swarmpb.TaskState_TIMED_OUT:
log.Printf("gomote: swarming task creation failed name=%s state=%s", name, taskState)
cancel()
case swarmpb.TaskState_PENDING:
// continue waiting
default:
log.Printf("gomote: unexpected swarming task state for %s: %s", name, taskState)
}
})
if !taskUp {
ss.rendezvous.DeregisterInstance(ctx, name)
return nil, fmt.Errorf("unable to create swarming task name=%s taskID=%s", name, taskID)
}
condRun(opts.OnInstanceCreated)
bc, err := ss.waitForInstanceOrFailure(ctx, taskID, name)
if err != nil {
ss.rendezvous.DeregisterInstance(ctx, name)
return nil, err
}
return bc, nil
}
// waitForInstanceOrFailure waits until either the buildlet inside swarming task
// taskID connects back through rendezvous under name, or the task is seen in a
// state other than RUNNING that means it will not connect (COMPLETED included).
// It gives up after five minutes or when ctx is done.
//
// On success the connected buildlet client is returned. On failure name is
// deregistered from rendezvous and an error is returned.
func (ss *SwarmingServer) waitForInstanceOrFailure(ctx context.Context, taskID, name string) (buildlet.Client, error) {
	queryCtx, cancel := context.WithTimeout(ctx, 5*time.Minute)
	// Stops both background goroutines below on every return path.
	defer cancel()

	// checkForTaskFailure polls the task state every ten seconds and reports, at
	// most once, that the task reached a terminal state. It stops polling after
	// reporting, and its send never blocks, so the goroutine cannot get stuck
	// on a full errCh if the caller has already moved on.
	checkForTaskFailure := func(pollCtx context.Context) <-chan error {
		errCh := make(chan error, 1)
		go func() {
			stopCtx, stop := context.WithCancel(pollCtx)
			defer stop()
			internal.PeriodicallyDo(stopCtx, 10*time.Second, func(ctx context.Context, _ time.Time) {
				resp, err := ss.swarmingClient.TaskResult(ctx, taskID, &swarming.TaskResultFields{WithPerf: false})
				if err != nil {
					log.Printf("gomote: unable to query for swarming task state: name=%s taskID=%s %s", name, taskID, err)
					return
				}
				switch taskState := resp.GetState(); taskState {
				case swarmpb.TaskState_RUNNING:
					// expected
				case swarmpb.TaskState_EXPIRED, swarmpb.TaskState_INVALID, swarmpb.TaskState_BOT_DIED, swarmpb.TaskState_CANCELED,
					swarmpb.TaskState_CLIENT_ERROR, swarmpb.TaskState_KILLED, swarmpb.TaskState_NO_RESOURCE, swarmpb.TaskState_TIMED_OUT, swarmpb.TaskState_COMPLETED:
					select {
					case errCh <- fmt.Errorf("swarming task creation failed name=%s state=%s", name, taskState):
					default:
						// A failure has already been reported; only the first matters.
					}
					stop()
				default:
					log.Printf("gomote: unexpected swarming task state for %s: %s", name, taskState)
				}
			})
		}()
		return errCh
	}
	type result struct {
		err error
		bc  buildlet.Client
	}
	// getConn waits for the buildlet to connect through rendezvous. The channel
	// is buffered so the goroutine can finish even if nobody receives.
	getConn := func(waitCtx context.Context) <-chan *result {
		ch := make(chan *result, 1)
		go func() {
			bc, err := ss.rendezvous.WaitForInstance(waitCtx, name)
			if err != nil {
				ss.rendezvous.DeregisterInstance(ctx, name)
			}
			ch <- &result{err: err, bc: bc}
		}()
		return ch
	}
	statusChan := checkForTaskFailure(queryCtx)
	resChan := getConn(queryCtx)
	select {
	case err := <-statusChan:
		// Cancel now so a pending WaitForInstance returns promptly.
		cancel()
		// The buildlet may have connected at the same moment the task failed;
		// close any such client instead of leaking its connection.
		go func() {
			if r := <-resChan; r.err == nil && r.bc != nil {
				if cerr := r.bc.Close(); cerr != nil {
					log.Printf("gomote: closing unused buildlet client %q: %s", name, cerr)
				}
			}
		}()
		ss.rendezvous.DeregisterInstance(ctx, name)
		log.Printf("gomote: failed waiting for task to run %q: %s", name, err)
		return nil, err
	case r := <-resChan:
		if r.err != nil {
			log.Printf("gomote: failed to establish connection %q: %s", name, r.err)
			return nil, r.err
		}
		return r.bc, nil
	}
}
// buildletStartup returns the bash script that a gomote swarming task runs. The
// script downloads the buildlet binary for goos-goarch from the go-builder-data
// storage bucket into $GO_TASK_ROOT/bin, where GO_TASK_ROOT is the task's
// working directory. It then starts the buildlet in reverse mode, dialing
// gomotessh.golang.org:443.
//
// curl runs with --fail (-f), so an HTTP error aborts the script. One example is
// a goos-goarch pair with no published buildlet. Without -f, curl would save the
// error page as the "buildlet" binary, and the failure would show up later as an
// opaque exec error. -S keeps curl's error message in the task log despite -s.
func buildletStartup(goos, goarch string) string {
	return fmt.Sprintf(`
export GO_TASK_ROOT=$(pwd) &&
export GO_BIN=$GO_TASK_ROOT/bin &&
mkdir -p $GO_BIN &&
export PATH=$PATH:$GO_BIN &&
curl -sSfL https://storage.googleapis.com/go-builder-data/buildlet.%s-%s --output $GO_BIN/buildlet &&
chmod +x $GO_BIN/buildlet &&
$GO_BIN/buildlet --coordinator=gomotessh.golang.org:443 --reverse-type swarming-task -swarming-bot -halt=false
`, goos, goarch)
}
// createStringPairs converts m into swarming StringPairs, ordered by key. The
// ordering makes the resulting task request deterministic regardless of Go's
// randomized map iteration order.
func createStringPairs(m map[string]string) []*swarmpb.StringPair {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	pairs := make([]*swarmpb.StringPair, 0, len(keys))
	for _, k := range keys {
		pairs = append(pairs, &swarmpb.StringPair{Key: k, Value: m[k]})
	}
	return pairs
}
// platformToGoValues converts a CIPD platform string such as "linux-amd64" or
// "mac-arm64" into the GOOS and GOARCH values used to name buildlet binaries.
//
// CIPD calls macOS "mac" and 32-bit ARM "armv6l"; these are mapped to Go's
// "darwin" and "arm". The OS part is matched case-insensitively, so the
// previously recognized "Mac" spelling keeps working. An error is returned
// unless platform has the form "<os>-<arch>" with both parts non-empty.
func platformToGoValues(platform string) (goos string, goarch string, err error) {
	goos, goarch, ok := strings.Cut(platform, "-")
	if !ok || goos == "" || goarch == "" {
		return "", "", fmt.Errorf("cipd_platform not in proper format=%s", platform)
	}
	if strings.EqualFold(goos, "mac") {
		goos = "darwin"
	}
	switch goarch {
	case "armv6l", "armv7l":
		goarch = "arm"
	}
	return goos, goarch, nil
}
// newSwarmingTask requests a swarming task named name that runs the buildlet
// startup script on a bot matching dimensions, and returns the new task's ID.
//
// dimensions must contain "cipd_platform". Its value is used to pick the CIPD
// tool packages installed for the task and, via platformToGoValues, the
// buildlet binary that the script downloads. The task is created with 86400
// second expiration and execution timeouts. The GOMOTEID environment variable
// is set to name. opts is currently unused.
func (ss *SwarmingServer) newSwarmingTask(ctx context.Context, name string, dimensions map[string]string, opts *SwarmOpts) (string, error) {
	cipdPlatform, ok := dimensions["cipd_platform"]
	if !ok {
		return "", fmt.Errorf("dimensions require cipd_platform: instance=%s", name)
	}
	goos, goarch, err := platformToGoValues(cipdPlatform)
	if err != nil {
		return "", err
	}
	req := &swarmpb.NewTaskRequest{
		ExpirationSecs: 86400,
		Name:           name,
		Priority:       30,
		Properties: &swarmpb.TaskProperties{
			Caches: []*swarmpb.CacheEntry{},
			CipdInput: &swarmpb.CipdInput{
				Packages: []*swarmpb.CipdPackage{
					{Path: "tools/bin", PackageName: "infra/tools/luci-auth/" + cipdPlatform, Version: "latest"},
					{Path: "tools", PackageName: "golang/bootstrap-go/" + cipdPlatform, Version: "latest"},
					{Path: "tools", PackageName: "infra/3pp/tools/gcloud/" + cipdPlatform, Version: "latest"},
					{Path: "tools", PackageName: "infra/3pp/tools/cpython3/" + cipdPlatform, Version: "latest"},
				},
			},
			EnvPrefixes: []*swarmpb.StringListPair{
				{Key: "PATH", Value: []string{"tools/bin"}},
			},
			Command:     []string{"bash", "-cx", buildletStartup(goos, goarch)},
			RelativeCwd: "",
			Dimensions:  createStringPairs(dimensions),
			Env: []*swarmpb.StringPair{
				// Element type is implied by the slice type (gofmt -s).
				{Key: "GOMOTEID", Value: name},
			},
			ExecutionTimeoutSecs: 86400,
		},
		ServiceAccount: "coordinator-builder@golang-ci-luci.iam.gserviceaccount.com",
		Realm:          "golang:ci",
	}
	taskMD, err := ss.swarmingClient.NewTask(ctx, req)
	if err != nil {
		log.Printf("gomote: swarming task creation failed name=%s: %s", name, err)
		return "", fmt.Errorf("unable to start task: %w", err)
	}
	log.Printf("gomote: task created: id=%s https://chromium-swarm.appspot.com/task?id=%s", taskMD.TaskId, taskMD.TaskId)
	return taskMD.TaskId, nil
}
// condRun calls fn if it is non-nil and does nothing otherwise. It is used to
// invoke the optional SwarmOpts hooks.
func condRun(fn func()) {
	if fn == nil {
		return
	}
	fn()
}
// tryThenPeriodicallyDo calls f once right away, so callers do not wait a
// full period for the first result, and then hands f to
// internal.PeriodicallyDo to be called every period until ctx is cancelled.
// The first call happens even if ctx is already done.
func tryThenPeriodicallyDo(ctx context.Context, period time.Duration, f func(context.Context, time.Time)) {
	first := time.Now()
	f(ctx, first)
	internal.PeriodicallyDo(ctx, period, f)
}