blob: 0d66e03f75cf85f1d73f0bc1970121519b732f0e [file] [log] [blame]
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package github implements sync mechanism to mirror GitHub issue state
// into a [storage.DB] as well as code to inspect that state and to make
// issue changes on GitHub.
// All the functionality is provided by the [Client], created by [New].
package github
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"iter"
"log/slog"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"testing"
"time"
"golang.org/x/oscar/internal/secret"
"golang.org/x/oscar/internal/storage"
"golang.org/x/oscar/internal/storage/timed"
"rsc.io/ordered"
)
const (
eventKind = "github.Event"
syncProjectKind = "github.SyncProject"
)
// This package stores the following key schemas in the database:
//
// ["github.SyncProject", Project] => JSON of projectSync structure
// ["github.Event", Project, Issue, Type, API, ID] => [DBTime, Raw(JSON)]
// ["github.EventByTime", DBTime, Project, Issue, Type, API, ID] => []
//
// To reconstruct the history of a given issue, scan for keys from
// ["github.Event", Project, Issue] to ["github.Event", Project, Issue, ordered.Inf].
//
// The API field is "/issues", "/issues/comments", or "/issues/events",
// so the first key-value pair is the issue creation event with the issue body text.
//
// The IDs are GitHub's and appear to be ordered by time within an API,
// so that the comments are time-ordered and the events are time-ordered,
// but comments and events are not ordered with respect to each other.
// To order them fully, fetch all the events and sort by the time in the JSON.
//
// The JSON is the raw JSON served from GitHub describing the event.
// Storing the raw JSON avoids having to re-download everything if we decide
// another field is of interest to us.
//
// EventByTime is an index of Events by DBTime, which is the time when the
// record was added to the database. Code that processes new events can
// record which DBTime it has most recently processed and then scan forward in
// the index to learn about new events.
// o is short for ordered.Encode.
func o(list ...any) []byte { return ordered.Encode(list...) }
// Scrub is a scrubber for use with [rsc.io/httprr] when writing
// tests that access GitHub through an httprr.RecordReplay.
// It removes auth credentials from the request.
func Scrub(req *http.Request) error {
req.Header.Del("Authorization")
return nil
}
// A Client is a connection to GitHub state in a database and on GitHub itself.
type Client struct {
slog *slog.Logger
db storage.DB
secret secret.DB
http *http.Client
testing bool
testMu sync.Mutex
testClient *TestingClient
testEdits []*TestingEdit
testEvents map[string]json.RawMessage
}
// New returns a new client that uses the given logger, databases, and HTTP client.
//
// The secret database is expected to have a secret named "api.github.com" of the
// form "user:pass" where user is a user-name (ignored by GitHub) and pass is an API token
// ("ghp_...").
func New(lg *slog.Logger, db storage.DB, sdb secret.DB, hc *http.Client) *Client {
return &Client{
slog: lg,
db: db,
secret: sdb,
http: hc,
testing: testing.Testing(),
}
}
// A projectSync is per-GitHub project ("owner/repo") sync state stored in the database.
type projectSync struct {
Name string // owner/repo
EventETag string
EventID int64
IssueDate string
CommentDate string
RefillID int64
FullSyncActive bool
FullSyncIssue int64
}
// store stores proj into db.
func (proj *projectSync) store(db storage.DB) {
db.Set(o(syncProjectKind, proj.Name), storage.JSON(proj))
}
// Add adds a GitHub project of the form
// "owner/repo" (for example "golang/go")
// to the database.
// It only adds the project sync metadata.
// The initial data fetch does not happen until [Sync] or [SyncProject] is called.
// Add returns an error if the project has already been added.
func (c *Client) Add(project string) error {
key := o(syncProjectKind, project)
if _, ok := c.db.Get(key); ok {
return fmt.Errorf("github.Add: already added: %q", project)
}
c.db.Set(key, storage.JSON(&projectSync{Name: project}))
return nil
}
// Sync syncs all projects.
func (c *Client) Sync(ctx context.Context) error {
var errs []error
for key, _ := range c.db.Scan(o(syncProjectKind), o(syncProjectKind, ordered.Inf)) {
var project string
if err := ordered.Decode(key, nil, &project); err != nil {
c.db.Panic("github client sync decode", "key", storage.Fmt(key), "err", err)
}
if err := c.SyncProject(ctx, project); err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
}
// If testFullSyncStop is non-nil, then SyncProject returns this error
// after each event is processed, to allow testing that interrupted syncs
// save state and can make progress.
var testFullSyncStop error
// SyncProject syncs a single project.
func (c *Client) SyncProject(ctx context.Context, project string) (err error) {
c.slog.Debug("github.SyncProject", "project", project)
defer func() {
if err != nil {
err = fmt.Errorf("SyncProject(%q): %w", project, err)
}
}()
key := o(syncProjectKind, project)
skey := string(key)
// Lock the project, so that no one else is sync'ing
// the project at the same time.
c.db.Lock(skey)
defer c.db.Unlock(skey)
// Load sync state.
var proj projectSync
if val, ok := c.db.Get(key); !ok {
return fmt.Errorf("missing project")
} else if err := json.Unmarshal(val, &proj); err != nil {
return err
}
// Sync issues, comments, events.
if err := c.syncIssues(ctx, &proj); err != nil {
return err
}
if err := c.syncIssueComments(ctx, &proj); err != nil {
return err
}
// See syncIssueEvents doc comment for details about this dance.
// The incremental event sync only works up to a certain number
// of events. To initialize a repo, we need a “full sync” that scans one
// issue at a time. We also need the full sync if we fall too far behind
// by not syncing for many days.
if proj.EventID == 0 || proj.FullSyncActive {
// Full scan.
if proj.EventID == 0 {
proj.FullSyncActive = true
proj.FullSyncIssue = 0
proj.store(c.db)
if err := c.syncIssueEvents(ctx, &proj, 0, true); err != nil {
return err
}
}
if err := c.syncIssues(ctx, &proj); err != nil {
return err
}
for key, _ := range c.db.Scan(o(eventKind, project), o(eventKind, project, ordered.Inf)) {
var issue int64
if _, err := ordered.DecodePrefix(key, nil, nil, &issue); err != nil {
return err
}
if issue <= proj.FullSyncIssue {
continue
}
if err := c.syncIssueEvents(ctx, &proj, issue, false); err != nil {
return err
}
proj.FullSyncIssue = issue
proj.store(c.db)
if testFullSyncStop != nil {
return testFullSyncStop
}
}
// Fall through to incremental scan to clean up.
proj.FullSyncActive = false
proj.store(c.db)
}
// Incremental scan.
if err := c.syncIssueEvents(ctx, &proj, 0, false); err != nil {
return err
}
return nil
}
// syncIssues syncs the issues for a given project.
// It records all new issues since proj.IssueDate.
// If successful, it updates proj.IssueDate to the latest issue date seen.
func (c *Client) syncIssues(ctx context.Context, proj *projectSync) error {
return c.syncByDate(ctx, proj, "/issues")
}
// syncIssueComments sync the issue comments for a given project.
// It records all new issue comments since proj.CommentDate.
// If successful, it updates proj.CommentDate to the latest comment date seen.
func (c *Client) syncIssueComments(ctx context.Context, proj *projectSync) error {
return c.syncByDate(ctx, proj, "/issues/comments")
}
// syncByDate downloads and saves issues or issue comments since
// the date specified in proj (proj.IssueDate or proj.CommentDate).
// api is "/issues" for issues or "/issues/comments" for issue comments.
// syncByDate updates the proj date with the new latest date seen
// before any error.
func (c *Client) syncByDate(ctx context.Context, proj *projectSync, api string) error {
Restart:
// For these APIs, we can ask GitHub for the event stream in increasing time order,
// so we can iterate through all the events, saving the latest time we have seen,
// and pick up where we left off.
var since *string
values := url.Values{
"sort": {"updated"},
"direction": {"asc"},
"page": {"1"},
}
switch api {
default:
panic("downloadByDate api: " + api)
case "/issues":
since = &proj.IssueDate
values["state"] = []string{"all"}
values["per_page"] = []string{"100"}
case "/issues/comments":
since = &proj.CommentDate
}
if *since != "" {
values["since"] = []string{*since}
}
b := c.db.Batch()
defer b.Apply()
urlStr := "https://api.github.com/repos/" + proj.Name + api + "?" + values.Encode()
npage := 0
defer proj.store(c.db)
for pg, err := range c.pages(ctx, urlStr, "") {
if err != nil {
return err
}
for _, raw := range pg.body {
var meta struct {
ID int64 `json:"id"`
Updated string `json:"updated_at"`
Number int64 `json:"number"` // for /issues feed
IssueURL string `json:"issue_url"` // for /issues/comments feed
CreatedAt string `json:"created_at"`
}
if err := json.Unmarshal(raw, &meta); err != nil {
return fmt.Errorf("parsing JSON: %v", err)
}
if meta.ID == 0 {
return fmt.Errorf("parsing message: no id: %s", string(raw))
}
if meta.Updated == "" {
return fmt.Errorf("parsing JSON: no updated_at: %s", string(raw))
}
switch api {
default:
c.db.Panic("github downloadByDate bad api", "api", api)
case "/issues":
if meta.Number == 0 {
return fmt.Errorf("parsing message: no number: %s", string(raw))
}
case "/issues/comments":
n, err := strconv.ParseInt(meta.IssueURL[strings.LastIndex(meta.IssueURL, "/")+1:], 10, 64)
if err != nil {
return fmt.Errorf("invalid comment URL: %s", meta.IssueURL)
}
meta.Number = n
}
c.writeEvent(b, proj.Name, meta.Number, api, meta.ID, raw)
b.MaybeApply()
*since = meta.Updated
}
b.Apply()
proj.store(c.db) // update *since
// GitHub stops returning results after 1000 pages.
// After 500 pages, restart pagination with a new since value.
// TODO: Write a test.
if npage++; npage >= 500 {
goto Restart
}
}
return nil
}
// syncIssueEvents downloads and saves new issue events in the given project.
//
// The /issues/events API does not have a "since time T" option:
// it lists events going backward in time, so we have to read backward
// until we find an event we've seen before. Only then can we update
// the "most recent seen event" proj.EventID, since otherwise there
// is still an unread gap between what we saw before and what we've
// seen so far. If syncIssueEvents is able to read back far enough, it
// updates proj.EventID and proj.EventETag.
//
// If onlySetLatest is true, syncIssueEvents does not store any events
// but does write down the most recent proj.EventID and proj.EventETag.
// (This doesn't make sense yet but keep reading.)
//
// The /issues/events API has a limit to how far back it can read,
// so if it has been a very long time since we last updated
// (or if this is the first sync), we may not reach the last known event.
// In that case, we have to resort to syncing each event separately
// using /issues/n/events, under the assumption that any one issue
// will be short enough to iterate its entire event list.
//
// If issue > 0, the sync uses /issues/n/events and does not update
// proj.EventID or proj.EventETag. The caller is expected to be looping
// over all events in this case.
//
// To sync all issues events in a repo with too many events, the full sync sequence is:
//
// - c.syncIssueEvents(proj, 0, true) to set EventID and EventETag
// to the current latest event in the repo, establishing a lower bound on what
// we will record during the next steps.
//
// - c.syncIssues(proj) to get the full list of issues.
//
// - c.syncIssueEvents(proj, issue, false) for every issue found by syncIssues.
//
// Now the database should contain all the events with id <= proj.EventID
// but it may also contain newer ones.
//
// - c.syncIssueEvents(proj, 0, false) to read any events since the beginning of the sync.
//
// Now the database should contain all events up to the new proj.EventID.
func (c *Client) syncIssueEvents(ctx context.Context, proj *projectSync, issue int64, onlySetLatest bool) error {
if issue > 0 && onlySetLatest {
panic("syncIssueEvents misuse")
}
values := url.Values{
"page": {"1"},
"per_page": {"100"},
}
var api = "/issues/events"
if issue > 0 {
api = fmt.Sprintf("/issues/%d/events", issue)
}
urlStr := "https://api.github.com/repos/" + proj.Name + api + "?" + values.Encode()
var (
firstID int64
firstETag string
lastID int64
stopped bool
)
b := c.db.Batch()
defer b.Apply()
Pages:
for pg, err := range c.pages(ctx, urlStr, proj.EventETag) {
if err == errNotModified {
return nil
}
if err != nil {
return err
}
for _, raw := range pg.body {
var meta struct {
ID int64 `json:"id"`
URL string `json:"url"`
Issue struct {
Number int64 `json:"number"`
} `json:"issue"`
}
if err := json.Unmarshal(raw, &meta); err != nil {
return fmt.Errorf("parsing JSON: %v", err)
}
if issue > 0 {
meta.Issue.Number = issue
} else if meta.Issue.Number == 0 {
return fmt.Errorf("parsing message: no issue number: %s", string(raw))
}
if meta.ID == 0 {
return fmt.Errorf("parsing message: no id: %s", string(raw))
}
if firstID == 0 {
firstID = meta.ID
firstETag = pg.resp.Header.Get("Etag")
}
lastID = meta.ID
if issue == 0 && (onlySetLatest || proj.EventID != 0 && meta.ID <= proj.EventID) {
stopped = true
break Pages
}
c.writeEvent(b, proj.Name, meta.Issue.Number, "/issues/events", meta.ID, raw)
b.MaybeApply()
}
}
if issue == 0 && lastID != 0 && !stopped {
return fmt.Errorf("lost sync: missing event IDs between %d and %d", proj.EventID, lastID)
}
if issue == 0 {
proj.EventID = firstID
proj.EventETag = firstETag
proj.store(c.db)
}
return nil
}
// writeEvent writes a single event to the database using SetTimed, to maintain a time-ordered index.
func (c *Client) writeEvent(b storage.Batch, project string, issue int64, api string, id int64, raw json.RawMessage) {
timed.Set(c.db, b, eventKind, o(project, issue, api, id), o(ordered.Raw(raw)))
}
// errNotModified is returned by get when an etag is being used
// and the server returns a 304 not modified response.
var errNotModified = errors.New("304 not modified")
// get fetches url and decodes the body as JSON into obj.
//
// If etag is non-empty, the request includes an If-None-Match: etag header
// and get returns errNotModified if the server says the object is unmodified
// since that etag.
//
// get uses the api.github.com secret if available.
// Otherwise it makes an unauthenticated request.
func (c *Client) get(ctx context.Context, url, etag string, obj any) (*http.Response, error) {
if c.divertEdits() {
c.testMu.Lock()
js := c.testEvents[url]
c.testMu.Unlock()
if js != nil {
resp := &http.Response{
StatusCode: 200,
Status: "200 OK",
}
return resp, json.Unmarshal(js, obj)
}
}
auth, _ := c.secret.Get("api.github.com")
if _, pass, ok := strings.Cut(auth, ":"); ok {
// Accept token as "password" in user:pass from netrc secret store
auth = pass
}
nrate := 0
nfail := 0
Redo:
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+auth)
if etag != "" {
req.Header.Set("If-None-Match", etag)
}
resp, err := c.http.Do(req)
if err != nil {
return nil, err
}
data, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, fmt.Errorf("reading body: %v", err)
}
if resp.StatusCode != http.StatusOK {
if resp.StatusCode == http.StatusNotModified { // 304
return nil, errNotModified
}
if c.rateLimit(resp) {
if nrate++; nrate > 20 {
return nil, fmt.Errorf("%s # too many rate limits\n%s", resp.Status, data)
}
goto Redo
}
if resp.StatusCode == http.StatusInternalServerError || resp.StatusCode == http.StatusBadGateway { // 500, 502
c.slog.Error("github get server failure", "code", resp.StatusCode, "status", resp.Status, "body", string(data))
if nfail++; nfail < 3 {
time.Sleep(time.Duration(nfail) * 2 * time.Second)
goto Redo
}
}
return nil, fmt.Errorf("%s\n%s", resp.Status, data)
}
return resp, json.Unmarshal(data, obj)
}
// A page is an HTTP response with a body that is a JSON array of objects.
// The objects are not decoded (they are json.RawMessages).
type page struct {
resp *http.Response
body []json.RawMessage
}
// pages returns a paginated result starting at url and using etag.
// If pages encounters an error, it yields nil, err.
func (c *Client) pages(ctx context.Context, url, etag string) iter.Seq2[*page, error] {
return func(yield func(*page, error) bool) {
for url != "" {
var body []json.RawMessage
resp, err := c.get(ctx, url, etag, &body)
if err != nil {
yield(nil, err)
return
}
if !yield(&page{resp, body}, nil) {
return
}
url = findNext(resp.Header.Get("Link"))
}
}
}
// findNext finds the "next" URL in the Link header value.
// Sample Link header:
//
// Link: <https://api.github.com/repositories/26468385/issues?direction=asc&page=2&per_page=100&sort=updated&state=all>; rel="next", <https://api.github.com/repositories/26468385/issues?direction=asc&page=2&per_page=100&sort=updated&state=all>; rel="last"
//
// The format is a comma-separated list of <LINK>; rel="type".
func findNext(link string) string {
for link != "" {
link = strings.TrimSpace(link)
if !strings.HasPrefix(link, "<") {
break
}
i := strings.Index(link, ">")
if i < 0 {
break
}
linkURL := link[1:i]
link = strings.TrimSpace(link[i+1:])
for strings.HasPrefix(link, ";") {
link = strings.TrimSpace(link[1:])
i := strings.Index(link, ";")
j := strings.Index(link, ",")
if i < 0 || j >= 0 && j < i {
i = j
}
if i < 0 {
i = len(link)
}
attr := strings.TrimSpace(link[:i])
if attr == `rel="next"` {
return linkURL
}
link = link[i:]
}
if !strings.HasPrefix(link, ",") {
break
}
link = strings.TrimSpace(link[1:])
}
return ""
}
// rateLimit looks at the response to decide whether a rate limit has been applied.
// If so, rateLimit sleeps until the time specified in the response, plus a bit extra.
// rateLimit reports whether this was a rate-limit response.
func (c *Client) rateLimit(resp *http.Response) bool {
if resp.StatusCode != http.StatusForbidden || resp.Header.Get("X-Ratelimit-Remaining") != "0" {
return false
}
n, _ := strconv.Atoi(resp.Header.Get("X-Ratelimit-Reset"))
if n == 0 {
return false
}
t := time.Unix(int64(n), 0)
now := time.Now()
const timeSkew = 1 * time.Minute
delay := t.Sub(now) + timeSkew
if delay < 0 {
// We are being asked to sleep until a time that already happened.
// If it happened recently (within timeSkew), then maybe our clock
// is just out of sync with GitHub's clock.
// But if it happened long ago, we are probably reading an old HTTP trace.
// In that case, return false (we didn't find a valid ratelimit).
return false
}
c.slog.Info("github ratelimit", "reset", t.Format(time.RFC3339),
"limit", resp.Header.Get("X-Ratelimit-Limit"),
"remaining", resp.Header.Get("X-Ratelimit-Remaining"),
"used", resp.Header.Get("X-Ratelimit-Used"))
time.Sleep(delay)
return true
}