blob: eaca2baa7a7050f09b8a05ab701ffd6ede72e538 [file] [log] [blame]
// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package etl
import (
"context"
"errors"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"go.opencensus.io/trace"
"golang.org/x/discovery/internal"
"golang.org/x/discovery/internal/derrors"
"golang.org/x/discovery/internal/index"
"golang.org/x/discovery/internal/postgres"
"golang.org/x/discovery/internal/proxy"
"golang.org/x/discovery/internal/sample"
"golang.org/x/xerrors"
)
const testTimeout = 30 * time.Second
var testDB *postgres.DB
func TestMain(m *testing.M) {
httpClient = &http.Client{Transport: fakeTransport{}}
postgres.RunDBTests("discovery_etl_test", m, &testDB)
}
type fakeTransport struct{}
func (fakeTransport) RoundTrip(*http.Request) (*http.Response, error) {
return nil, errors.New("bad")
}
type debugExporter struct {
t *testing.T
}
func (e debugExporter) ExportSpan(s *trace.SpanData) {
e.t.Logf("⚡ %s: %v", s.Name, s.EndTime.Sub(s.StartTime))
}
func setupTraceDebugging(t *testing.T) {
trace.RegisterExporter(debugExporter{t})
trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
}
func TestETL(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
defer cancel()
setupTraceDebugging(t)
appVersionLabel = "20190000t000000"
var (
start = sample.NowTruncated()
fooIndex = &internal.IndexVersion{
Path: "foo.com/foo",
Timestamp: start,
Version: "v1.0.0",
}
barIndex = &internal.IndexVersion{
Path: "foo.com/bar",
Timestamp: start.Add(time.Second),
Version: "v0.0.1",
}
fooProxy = proxy.NewTestVersion(t, fooIndex.Path, fooIndex.Version, map[string]string{
"foo.go": "package foo\nconst Foo = \"Foo\"",
})
barProxy = proxy.NewTestVersion(t, barIndex.Path, barIndex.Version, map[string]string{
"bar.go": "package bar\nconst Bar = \"Bar\"",
})
state = func(version *internal.IndexVersion, code, tryCount int) *internal.VersionState {
status := &code
if code == 0 {
status = nil
}
return &internal.VersionState{
ModulePath: version.Path,
IndexTimestamp: version.Timestamp,
Status: status,
TryCount: tryCount,
Version: version.Version,
AppVersion: appVersionLabel,
}
}
fooState = func(code, tryCount int) *internal.VersionState {
return state(fooIndex, code, tryCount)
}
barState = func(code, tryCount int) *internal.VersionState {
return state(barIndex, code, tryCount)
}
)
tests := []struct {
label string
index []*internal.IndexVersion
proxy []*proxy.TestVersion
requests []*http.Request
wantFoo *internal.VersionState
wantBar *internal.VersionState
}{
{
label: "full fetch",
index: []*internal.IndexVersion{fooIndex, barIndex},
proxy: []*proxy.TestVersion{fooProxy, barProxy},
requests: []*http.Request{
httptest.NewRequest("POST", "/poll-and-queue", nil),
},
wantFoo: fooState(http.StatusOK, 1),
wantBar: barState(http.StatusOK, 1),
}, {
label: "partial fetch",
index: []*internal.IndexVersion{fooIndex, barIndex},
proxy: []*proxy.TestVersion{fooProxy, barProxy},
requests: []*http.Request{
httptest.NewRequest("POST", "/poll-and-queue?limit=1", nil),
},
wantFoo: fooState(http.StatusOK, 1),
}, {
label: "fetch with errors",
index: []*internal.IndexVersion{fooIndex, barIndex},
proxy: []*proxy.TestVersion{fooProxy},
requests: []*http.Request{
httptest.NewRequest("POST", "/poll-and-queue", nil),
},
wantFoo: fooState(http.StatusOK, 1),
wantBar: barState(http.StatusNotFound, 1),
},
}
for _, test := range tests {
t.Run(test.label, func(t *testing.T) {
indexClient, teardownIndex := index.SetupTestIndex(t, test.index)
defer teardownIndex()
proxyClient, teardownProxy := proxy.SetupTestProxy(t, test.proxy)
defer teardownProxy()
defer postgres.ResetTestDB(testDB, t)
// Use 10 workers to have parallelism consistent with the etl binary.
queue := NewInMemoryQueue(ctx, proxyClient, testDB, 10)
s, err := NewServer(testDB, indexClient, proxyClient, queue, nil, "")
if err != nil {
t.Fatal(err)
}
mux := http.NewServeMux()
s.Install(mux.Handle)
for _, r := range test.requests {
w := httptest.NewRecorder()
mux.ServeHTTP(w, r)
if got, want := w.Code, http.StatusOK; got != want {
t.Fatalf("Code = %d, want %d", got, want)
}
}
// Sleep to hopefully allow the work to begin processing, at which point
// waitForTesting will successfully block until it is complete.
// Experimentally this was not flaky with even 10ms sleep, but we bump to
// 100ms to be extra careful.
time.Sleep(100 * time.Millisecond)
queue.WaitForTesting(ctx)
// To avoid being a change detector, only look at ModulePath, Version,
// Timestamp, and Status.
ignore := cmpopts.IgnoreFields(internal.VersionState{},
"CreatedAt", "NextProcessedAfter", "LastProcessedAt", "Error")
got, err := testDB.GetVersionState(ctx, fooIndex.Path, fooIndex.Version)
if err == nil {
if diff := cmp.Diff(test.wantFoo, got, ignore); diff != "" {
t.Errorf("testDB.GetVersionState(ctx, %q, %q) mismatch (-want +got):\n%s",
fooIndex.Path, fooIndex.Version, diff)
}
} else if test.wantFoo == nil {
if !xerrors.Is(err, derrors.NotFound) {
t.Errorf("expected Not Found error for foo, got %v", err)
}
} else {
t.Fatal(err)
}
got, err = testDB.GetVersionState(ctx, barIndex.Path, barIndex.Version)
if err == nil {
if diff := cmp.Diff(test.wantBar, got, ignore); diff != "" {
t.Errorf("testDB.GetVersionState(ctx, %q, %q) mismatch (-want +got):\n%s",
barIndex.Path, barIndex.Version, diff)
}
} else if test.wantBar == nil {
if !xerrors.Is(err, derrors.NotFound) {
t.Errorf("expected Not Found error for bar, got %v", err)
}
} else {
t.Fatal(err)
}
})
}
}
func TestParseIntParam(t *testing.T) {
for _, test := range []struct {
in string
want int
}{
{"", -1},
{"-1", -1},
{"312", 312},
{"bad", -1},
} {
got := parseIntParam(httptest.NewRequest("GET", fmt.Sprintf("/foo?x=%s", test.in), nil), "x", -1)
if got != test.want {
t.Errorf("%q: got %d, want %d", test.in, got, test.want)
}
}
}