blob: 1963fe4ce31f4c8d4df3c1f135d2ff5db3b1e7c1 [file] [log] [blame]
// Copyright 2022 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package sign
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/google/uuid"
"golang.org/x/build/internal/access"
"golang.org/x/build/internal/relui/protos"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Compile-time check that *SigningServer implements Service.
var _ Service = (*SigningServer)(nil)
// SigningServer is a GRPC signing server used to send signing requests and
// signing status requests to a client.
//
// Callers hand requests to the server through the Service methods; each
// request is forwarded to a connected client by UpdateSigningStatus, and the
// matching reply is routed back to the caller by message ID.
type SigningServer struct {
protos.UnimplementedReleaseServiceServer
// requests is a channel of outgoing signing requests to be sent to
// any of the connected signing clients.
requests chan *protos.SigningRequest
// callbackMu guards callback.
callbackMu sync.Mutex
// callback associates a message ID with the function to call with the
// outcome (a client reply or a send error) of the request carrying that ID.
// An entry is removed when it is called or when the waiting caller gives up.
callback map[string]func(*signResponse) // Key is message ID.
}
// NewServer returns a ready-to-use SigningServer with an unbuffered request
// channel and an empty callback table.
func NewServer() *SigningServer {
	srv := new(SigningServer)
	srv.requests = make(chan *protos.SigningRequest)
	srv.callback = make(map[string]func(*signResponse))
	return srv
}
// UpdateSigningStatus serves one bidirectional streaming connection with a
// signing client. Queued signing requests are sent to the client and status
// updates are received from it; requests and responses may interleave in any
// order.
//
// It returns an Unauthenticated status error when IAP information cannot be
// read from the stream context. Otherwise it blocks until both the sending
// and the receiving goroutine have stopped and returns the first error either
// of them reported.
func (rs *SigningServer) UpdateSigningStatus(stream protos.ReleaseService_UpdateSigningStatusServer) error {
	iap, err := access.IAPFromContext(stream.Context())
	if err != nil {
		return status.Errorf(codes.Unauthenticated, "request does not contain the required authentication")
	}

	connectedAt := time.Now()
	log.Printf("SigningServer: a client connected (iap = %+v)\n", iap)

	g, ctx := errgroup.WithContext(stream.Context())

	// sendLoop forwards queued requests to the client until the group
	// context is done or a send fails.
	sendLoop := func() error {
		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case req := <-rs.requests:
				if sendErr := stream.Send(req); sendErr != nil {
					// The request never reached the client; report
					// the failure to whoever is waiting on it.
					rs.callAndDeregister(req.GetMessageId(), &signResponse{err: sendErr})
					return sendErr
				}
			}
		}
	}

	// recvLoop routes every status update from the client to the callback
	// registered for its message ID until receiving fails.
	recvLoop := func() error {
		for {
			update, recvErr := stream.Recv()
			if recvErr != nil {
				return recvErr
			}
			rs.callAndDeregister(update.GetMessageId(), &signResponse{status: update})
		}
	}

	g.Go(sendLoop)
	g.Go(recvLoop)

	err = g.Wait()
	log.Printf("SigningServer: a client disconnected after %v (err = %v)\n", time.Since(connectedAt), err)
	return err
}
// do performs one request/response round trip: it registers a callback for
// the request's message ID, hands the request to a connected client, and
// waits for the matching response.
//
// It blocks until a response (or a send error) is delivered or ctx is done.
// When ctx is done first, the callback registration is removed and ctx.Err()
// is returned. The outcome of every call is logged.
func (rs *SigningServer) do(ctx context.Context, req *protos.SigningRequest) (resp *protos.SigningStatus, err error) {
	start := time.Now()
	id := req.GetMessageId()

	// Log the outcome on the way out; resp and err are the named results.
	defer func() {
		if err != nil {
			log.Printf("SigningServer: communication error %v for message=%q after %v:\n req = %v\n", err, id, time.Since(start), req)
			return
		}
		log.Printf("SigningServer: successfully round-tripped message=%q in %v:\n req = %v\n resp = %v\n", id, time.Since(start), req, resp)
	}()

	// The callback is invoked at most once, so a buffer of one guarantees
	// that it never blocks even if nobody is waiting anymore.
	replies := make(chan *signResponse, 1)
	rs.register(id, func(r *signResponse) { replies <- r })

	// Hand the request to a connected client.
	select {
	case <-ctx.Done():
		rs.deregister(id)
		return nil, ctx.Err()
	case rs.requests <- req:
	}

	// Wait for the matching response.
	select {
	case <-ctx.Done():
		rs.deregister(id)
		return nil, ctx.Err()
	case r := <-replies:
		return r.status, r.err
	}
}
// SignArtifact implements Service.
//
// It sends a sign request for objectURI with build type bt and waits for the
// client's reply. On a Started reply it returns the job ID the client
// reported. A communication error, a Failed reply, or any other reply type is
// returned as an error with an empty job ID.
func (rs *SigningServer) SignArtifact(ctx context.Context, bt BuildType, objectURI []string) (jobID string, _ error) {
	resp, err := rs.do(ctx, &protos.SigningRequest{
		MessageId: uuid.NewString(),
		RequestOneof: &protos.SigningRequest_Sign{Sign: &protos.SignArtifactRequest{
			BuildType: bt.proto(),
			GcsUri:    objectURI,
		}},
	})
	if err != nil {
		return "", err
	}
	// Use the generated getters throughout: they tolerate nil messages, so a
	// malformed reply yields an error or an empty value instead of a panic.
	switch t := resp.GetStatusOneof().(type) {
	case *protos.SigningStatus_Started:
		return t.Started.GetJobId(), nil
	case *protos.SigningStatus_Failed:
		return "", fmt.Errorf("failed to start %v signing on %q: %s", bt, objectURI, t.Failed.GetDescription())
	default:
		return "", fmt.Errorf("unexpected response type %T for a sign request", t)
	}
}
// ArtifactSigningStatus implements Service.
//
// It sends a status request for jobID and translates the client's reply:
//   - Completed: StatusCompleted and the reported object URIs.
//   - Failed:    StatusFailed and the reported description.
//   - NotFound:  StatusNotFound and a generated description.
//   - Running:   StatusRunning and the reported description.
//
// A communication error or any other reply type is returned as an error
// together with StatusUnknown.
func (rs *SigningServer) ArtifactSigningStatus(ctx context.Context, jobID string) (_ Status, desc string, objectURI []string, _ error) {
	resp, err := rs.do(ctx, &protos.SigningRequest{
		MessageId: uuid.NewString(),
		RequestOneof: &protos.SigningRequest_Status{Status: &protos.SignArtifactStatusRequest{
			JobId: jobID,
		}},
	})
	if err != nil {
		return StatusUnknown, "", nil, err
	}
	// The generated getter tolerates a nil message.
	switch t := resp.GetStatusOneof().(type) {
	case *protos.SigningStatus_Completed:
		return StatusCompleted, "", t.Completed.GetGcsUri(), nil
	case *protos.SigningStatus_Failed:
		return StatusFailed, t.Failed.GetDescription(), nil, nil
	case *protos.SigningStatus_NotFound:
		return StatusNotFound, fmt.Sprintf("signing job %q not found", jobID), nil, nil
	case *protos.SigningStatus_Running:
		return StatusRunning, t.Running.GetDescription(), nil, nil
	default:
		// Named constant rather than a bare 0, matching the error path above.
		return StatusUnknown, "", nil, fmt.Errorf("unexpected response type %T for a status request", t)
	}
}
// CancelSigning implements Service.
//
// It sends a cancel request for jobID and waits for the client's reply. Only
// an error from the round trip itself is reported; the content of the reply
// is not inspected.
func (rs *SigningServer) CancelSigning(ctx context.Context, jobID string) error {
	cancel := &protos.SignArtifactCancelRequest{JobId: jobID}
	req := &protos.SigningRequest{
		MessageId:    uuid.NewString(),
		RequestOneof: &protos.SigningRequest_Cancel{Cancel: cancel},
	}
	_, err := rs.do(ctx, req)
	return err
}
// signResponse contains the response and error from a signing request.
// It is the value handed to the callback registered for a message ID.
type signResponse struct {
// status is the status message received from the client for the request.
status *protos.SigningStatus
// err is the error returned when sending the request to the client failed.
err error
}
// register associates messageID with the callback f, replacing any callback
// previously registered under the same ID.
func (rs *SigningServer) register(messageID string, f func(*signResponse)) {
	rs.callbackMu.Lock()
	defer rs.callbackMu.Unlock()
	rs.callback[messageID] = f
}
// deregister removes the callback registered for messageID.
// It is a no-op if no callback is registered under that ID.
func (rs *SigningServer) deregister(messageID string) {
	rs.callbackMu.Lock()
	defer rs.callbackMu.Unlock()
	delete(rs.callback, messageID)
}
// callAndDeregister calls the callback associated with the message ID,
// passing it resp.
// If no callback is registered for the message ID, the response is dropped
// and the miss is logged.
// The callback registration is always removed if it exists, so a callback is
// invoked at most once.
func (rs *SigningServer) callAndDeregister(messageID string, resp *signResponse) {
	// Look up and remove the registration atomically, but do not hold the
	// mutex while running the callback: a callback that blocks or calls back
	// into register/deregister must not stall or deadlock other requests.
	rs.callbackMu.Lock()
	respFunc, ok := rs.callback[messageID]
	delete(rs.callback, messageID)
	rs.callbackMu.Unlock()

	if !ok {
		// drop the message
		log.Printf("SigningServer: caller not found for message=%q", messageID)
		return
	}
	respFunc(resp)
}