blob: 8ad8b424491d901a15852e733d8fe26d1a1bd036 [file] [log] [blame]
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package worker
import (
"context"
"errors"
"fmt"
"net/http"
"os"
"path/filepath"
"strings"
"sync"
"time"
"cloud.google.com/go/errorreporting"
"github.com/google/safehtml/template"
"golang.org/x/exp/event"
"golang.org/x/pkgsite-metrics/internal/bigquery"
"golang.org/x/pkgsite-metrics/internal/config"
"golang.org/x/pkgsite-metrics/internal/derrors"
"golang.org/x/pkgsite-metrics/internal/log"
"golang.org/x/pkgsite-metrics/internal/observe"
"golang.org/x/pkgsite-metrics/internal/proxy"
)
// Server holds the state shared by the worker's HTTP handlers: the
// configuration, the observer, the BigQuery and proxy clients, the static
// file location and the template set.
type Server struct {
cfg *config.Config // configuration supplied to NewServer
observer *observe.Observer // wraps every handler registered through handle
bqClient *bigquery.Client // left nil by NewServer when BigQuery is disabled
proxyClient *proxy.Client // created by NewServer from cfg.ProxyURL
staticPath template.TrustedSource // directory served under /static/ and used for favicon.ico
devMode bool // copied from cfg.DevMode
mu sync.Mutex // NOTE(review): presumably guards templates — confirm in loadTemplates
templates map[string]*template.Template // NOTE(review): presumably filled by loadTemplates — confirm
}
// errBQDisabled is a serverError carrying status 428 (Precondition Required)
// and stating that BigQuery is disabled on this server.
var errBQDisabled = &serverError{http.StatusPreconditionRequired, errors.New("BigQuery disabled on this server")}
// NewServer builds the worker Server described by cfg and registers its HTTP
// routes on the default serve mux.
//
// Unless cfg.BigQueryDataset equals "disable" (compared case-insensitively),
// a BigQuery client is opened and every table listed by bigquery.Tables is
// created or updated. NewServer then creates the proxy client, loads the
// templates, installs an observer whose per-request log handler carries the
// request's trace header, optionally sets up error reporting, and registers
// the /static/, /favicon.ico and / routes.
//
// On any failure it returns a nil Server and the error, which is first passed
// through derrors.WrapAndReport.
func NewServer(ctx context.Context, cfg *config.Config) (_ *Server, err error) {
	defer derrors.WrapAndReport(&err, "NewServer")

	var bqClient *bigquery.Client
	if !strings.EqualFold(cfg.BigQueryDataset, "disable") {
		if bqClient, err = bigquery.NewClientCreate(ctx, cfg.ProjectID, cfg.BigQueryDataset); err != nil {
			return nil, err
		}
		for _, id := range bigquery.Tables() {
			isNew, err := bqClient.CreateOrUpdateTable(ctx, id)
			if err != nil {
				return nil, err
			}
			action := "created"
			if !isNew {
				action = "updated"
			}
			log.Infof(ctx, "%s table %s\n", action, id)
		}
	} else {
		log.Infof(ctx, "BigQuery disabled")
	}

	pc, err := proxy.New(cfg.ProxyURL)
	if err != nil {
		return nil, err
	}

	srv := &Server{
		cfg:         cfg,
		bqClient:    bqClient,
		proxyClient: pc,
		devMode:     cfg.DevMode,
		staticPath:  cfg.StaticPath,
	}
	if err = srv.loadTemplates(); err != nil {
		return nil, err
	}

	if srv.observer, err = observe.NewObserver(ctx, cfg.ProjectID, "go-metrics-worker"); err != nil {
		return nil, err
	}
	// Invoked once per request so that the log handler can be tagged with
	// that request's trace ID.
	srv.observer.LogHandlerFunc = func(r *http.Request) event.Handler {
		return log.NewGCPJSONHandler(os.Stderr, r.Header.Get("X-Cloud-Trace-Context"))
	}

	if cfg.UseErrorReporting {
		reportFailure := func(err error) {
			log.Errorf(ctx, "Error reporting failed: %v", err)
		}
		rc, err := errorreporting.NewClient(ctx, cfg.ProjectID, errorreporting.Config{
			ServiceName: cfg.ServiceID,
			OnError:     reportFailure,
		})
		if err != nil {
			return nil, err
		}
		derrors.SetReportingClient(rc)
	}

	// Static assets are registered directly on the default mux, without the
	// logging and observer wrapping that handle adds.
	staticFiles := http.FileServer(http.Dir(srv.staticPath.String()))
	http.Handle("/static/", http.StripPrefix("/static/", staticFiles))
	srv.handle("/favicon.ico", func(w http.ResponseWriter, r *http.Request) error {
		icon := filepath.Join(srv.staticPath.String(), "favicon.ico")
		http.ServeFile(w, r, icon)
		return nil
	})
	srv.handle("/", srv.handleIndexPage)
	return srv, nil
}
// metricNamespace is the namespace string "ecosystem/worker".
// NOTE(review): no use is visible in this part of the file; presumably a
// prefix for metric names — confirm against its callers.
const metricNamespace = "ecosystem/worker"
// handlerFunc is an HTTP handler that signals failure by returning an error.
// Server.handle converts a non-nil error into an HTTP error response.
type handlerFunc func(w http.ResponseWriter, r *http.Request) error
// handle registers handler for pattern on the default serve mux, wrapped with
// request logging, error handling and the server's observer.
//
// For every request it logs the start, calls handler with a status-recording
// response writer and, if handler returns a non-nil error, logs the error,
// reports it through derrors.Report and writes the error response with
// serveError. It finishes by logging the request latency and the recorded
// status (treated as 200 when no status was explicitly written).
func (s *Server) handle(pattern string, handler handlerFunc) {
	h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		start := time.Now()
		ctx := r.Context()
		log.With("httpRequest", r).Infof(ctx, "starting %s", r.URL.Path)
		w2 := &responseWriter{ResponseWriter: w}
		if err := handler(w2, r); err != nil {
			// Pass the error as an argument rather than as the format
			// string: error text can contain '%' (for example from
			// URL-escaped paths), which would be misread as a verb.
			log.Errorf(ctx, "%v", err)
			derrors.Report(err)
			s.serveError(ctx, w2, r, err)
		}
		log.With(
			"latency", time.Since(start),
			"status", translateStatus(w2.status)).
			Infof(ctx, "request end")
	})
	http.Handle(pattern, s.observer.Observe(h))
}
// serverError pairs an error with the HTTP status code that should be sent
// to the client for it.
type serverError struct {
	status int   // HTTP status code to respond with
	err    error // underlying error
}

// Error formats the error as "<code> (<status text>): <underlying error>".
func (s *serverError) Error() string {
	text := http.StatusText(s.status)
	return fmt.Sprintf("%d (%s): %v", s.status, text, s.err)
}
// serveError writes an HTTP error response for err to w and logs it.
//
// The status code is chosen as follows:
//   - derrors.InvalidArgument maps to 400 (Bad Request)
//   - derrors.NotFound maps to 404 (Not Found)
//   - derrors.BadModule maps to 406 (Not Acceptable)
//   - a *serverError anywhere in err's chain supplies its own status
//   - anything else maps to 500 (Internal Server Error)
//
// Responses with status 500 are logged as errors; all others are logged as
// warnings. The response body is the text of the underlying error.
func (s *Server) serveError(ctx context.Context, w http.ResponseWriter, _ *http.Request, err error) {
	if errors.Is(err, derrors.InvalidArgument) {
		err = &serverError{err: err, status: http.StatusBadRequest}
	}
	if errors.Is(err, derrors.NotFound) {
		err = &serverError{err: err, status: http.StatusNotFound}
	}
	if errors.Is(err, derrors.BadModule) {
		err = &serverError{err: err, status: http.StatusNotAcceptable}
	}
	// Use errors.As rather than a direct type assertion so that a
	// *serverError that was wrapped with extra context on its way up the
	// call stack keeps its intended status instead of degrading to a 500.
	var serr *serverError
	if !errors.As(err, &serr) {
		serr = &serverError{status: http.StatusInternalServerError, err: err}
	}
	if serr.status == http.StatusInternalServerError {
		// The error text is an argument, never the format string: it may
		// contain '%' characters.
		log.Errorf(ctx, "%v", serr.err)
	} else {
		log.Warningf(ctx, "returning %v", err)
	}
	http.Error(w, serr.err.Error(), serr.status)
}
// responseWriter wraps an http.ResponseWriter and remembers the status code
// passed to WriteHeader. status stays 0 until WriteHeader is called.
type responseWriter struct {
	http.ResponseWriter
	status int // last code given to WriteHeader; 0 if never called
}

// WriteHeader records statusCode and then forwards it to the wrapped
// http.ResponseWriter.
func (rw *responseWriter) WriteHeader(statusCode int) {
	rw.status = statusCode
	rw.ResponseWriter.WriteHeader(statusCode)
}
// translateStatus converts a recorded status code to int64, mapping 0 (no
// status recorded) to 200 (OK).
func translateStatus(code int) int64 {
	status := int64(code)
	if status == 0 {
		status = http.StatusOK
	}
	return status
}
// locNewYork is the America/New_York time zone, loaded once in init and used
// by FormatTime.
var locNewYork *time.Location

// init loads locNewYork. If the zone cannot be loaded, it logs the error and
// exits the process with status 1.
func init() {
	loc, loadErr := time.LoadLocation("America/New_York")
	if loadErr != nil {
		log.Errorf(context.Background(), "time.LoadLocation: %v", loadErr)
		os.Exit(1)
	}
	locNewYork = loc
}
// FormatTime renders t in the America/New_York time zone using the layout
// "2006-01-02 15:04:05". The zero time is rendered as "-".
func FormatTime(t time.Time) string {
	if !t.IsZero() {
		return t.In(locNewYork).Format("2006-01-02 15:04:05")
	}
	return "-"
}