blob: 1fd90fff1f12b497f31fb0f2ef0f51fcae613a4c [file] [log] [blame]
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package discussion implements a sync mechanism to mirror GitHub
// discussions state into a [storage.DB].
// All the functionality is provided by the [Client], created by [New].
//
// This package stores the following key schemas in the database:
//
// ["discussion.SyncProject", Project] => JSON of [projectSync] structure
// ["discussion.Event", Project, Discussion, API, ID] => [DBTime, Raw(JSON)]
// ["discussion.EventByTime", DBTime, Project, Discussion, API, ID] => []
//
// To reconstruct the history of a given discussion, scan for keys from
// ["discussion.Event", Project, Discussion] to ["discussion.Event", Project, Discussion, ordered.Inf].
//
// The API field is "/discussions", or "/discussions/comments",
// so the first key-value pair is the discussion with its body text and
// metadata.
//
// The IDs are GitHub's and appear to be ordered by creation time within an API,
// so that the comments are time-ordered and the discussions are time-ordered,
// but comments and discussions are not ordered with respect to each other.
// To order them fully, fetch all the events and sort by the time in the JSON.
//
// The JSON is the raw JSON served from GitHub describing the event.
// Storing the raw JSON avoids having to re-download everything if we decide
// another field is of interest to us.
//
// EventByTime is an index of Events by DBTime, which is the time when the
// record was added to the database, which is not necessarily related
// to the time the event occurred. Code that processes new events can
// record which DBTime it has most recently processed and then scan forward in
// the index to learn about new events.
package discussion
import (
"context"
"encoding/json"
"errors"
"fmt"
"iter"
"log/slog"
"slices"
"strconv"
"strings"
"sync"
"time"
"golang.org/x/oscar/internal/docs"
"golang.org/x/oscar/internal/github"
"golang.org/x/oscar/internal/secret"
"golang.org/x/oscar/internal/storage"
"golang.org/x/oscar/internal/storage/timed"
"rsc.io/ordered"
)
// Client is a client for making requests to the
// GitHub GraphQL API and syncing discussion state with
// a [storage.DB].
// Use [New] to create a Client.
type Client struct {
gql *gqlClient // GraphQL client used to fetch discussions and comments
slog *slog.Logger // logger for sync debug output
db storage.DB // database holding project sync state and stored events
// The test* fields are not referenced in this part of the file.
// NOTE(review): presumably testing hooks guarded by testMu — confirm against the testing code.
testMu sync.Mutex
testClient *TestingClient
testEvents map[string]json.RawMessage
}
// New returns a [Client] that talks to the GitHub GraphQL API,
// logs to lg, and keeps its sync state and events in db.
//
// Authentication comes from sdb, which is expected to hold a secret named
// "api.github.com" of the form "user:pass", where user is a user-name
// (ignored by GitHub) and pass is an API token ("ghp_...").
func New(ctx context.Context, lg *slog.Logger, sdb secret.DB, db storage.DB) *Client {
	c := new(Client)
	c.gql = newGQLClient(authClient(ctx, sdb))
	c.slog = lg
	c.db = db
	return c
}
// Compile-time check that *Client implements docs.Source for *Event values.
var _ docs.Source[*Event] = (*Client)(nil)
// DocWatcherID is the name of the watcher returned by [Client.DocWatcher].
const DocWatcherID = "discussiondocs"
// DocWatcher returns the event watcher named [DocWatcherID] ("discussiondocs"),
// as obtained from EventWatcher.
// Implements [docs.Source.DocWatcher].
func (c *Client) DocWatcher() *timed.Watcher[*Event] {
return c.EventWatcher(DocWatcherID)
}
// ToDocs turns an event holding a discussion into a single embeddable
// document, delivered as a one-element iterator.
// The document's ID is the discussion URL; its title and text are the
// cleaned discussion title and body.
// It returns (nil, false) if the event's typed value is not a *Discussion.
// Implements [docs.Source.ToDocs].
func (*Client) ToDocs(e *Event) (iter.Seq[*docs.Doc], bool) {
	disc, isDiscussion := e.Typed.(*Discussion)
	if !isDiscussion {
		return nil, false
	}
	doc := &docs.Doc{
		ID:    disc.URL,
		Title: github.CleanTitle(disc.Title),
		Text:  github.CleanBody(disc.Body),
	}
	return slices.Values([]*docs.Doc{doc}), true
}
// Sync syncs every project that has sync state in the database.
// A failure in one project does not stop the others; all errors
// encountered are returned together via [errors.Join].
func (c *Client) Sync(ctx context.Context) error {
	var failures []error
	lo, hi := o(syncProjectKind), o(syncProjectKind, ordered.Inf)
	for k := range c.db.Scan(lo, hi) {
		// Keys have the form [syncProjectKind, Project]; only the project is needed.
		var name string
		if decodeErr := ordered.Decode(k, nil, &name); decodeErr != nil {
			c.db.Panic("discussion.Client sync decode", "key", storage.Fmt(k), "err", decodeErr)
		}
		if syncErr := c.SyncProject(ctx, name); syncErr != nil {
			failures = append(failures, syncErr)
		}
	}
	return errors.Join(failures...)
}
// SyncProject syncs a single project: first its discussions,
// then its comments. It returns an error if the project has not been
// added, if its stored sync state cannot be loaded, or if either
// sync step fails.
func (c *Client) SyncProject(ctx context.Context, project string) error {
	// Hold the project's lock for the whole sync, so that no one else
	// is syncing it concurrently.
	lock := string(key(project))
	c.db.Lock(lock)
	defer c.db.Unlock(lock)

	// Load sync state.
	state, found, err := load(c.db, project)
	switch {
	case err != nil:
		return err
	case !found:
		return fmt.Errorf("discussion.SyncProject: unknown project: %q", project)
	}

	for _, api := range []string{DiscussionAPI, CommentAPI} {
		if err := c.syncByDate(ctx, state, api); err != nil {
			return err
		}
	}
	return nil
}
// discussionEventsSince returns an iterator over the project's discussion
// events updated strictly after since.
// If fetching or converting a discussion fails, the iterator yields
// that error and stops.
func (c *gqlClient) discussionEventsSince(ctx context.Context, proj *projectSync, since time.Time) iter.Seq2[*Event, error] {
	return func(yield func(*Event, error) bool) {
		for d, fetchErr := range c.discussions(ctx, proj.Owner, proj.Repo) {
			if fetchErr != nil {
				yield(nil, fetchErr)
				return
			}
			ev, convErr := d.toEvent(proj.Project)
			if convErr != nil {
				yield(nil, convErr)
				return
			}
			// Discussions are ordered by update time, so the first one
			// updated at or before since ends the iteration; so does a
			// consumer that asks to stop.
			if !ev.Updated.After(since) || !yield(ev, nil) {
				return
			}
		}
	}
}
// commentEventsSince returns an iterator over the project's comment
// events updated strictly after since.
// If fetching or converting a comment fails, the iterator yields
// that error and stops.
func (c *gqlClient) commentEventsSince(ctx context.Context, proj *projectSync, since time.Time) iter.Seq2[*Event, error] {
	return func(yield func(*Event, error) bool) {
		for cm, fetchErr := range c.comments(ctx, proj.Owner, proj.Repo) {
			if fetchErr != nil {
				yield(nil, fetchErr)
				return
			}
			ev, convErr := cm.toEvent(proj.Project)
			if convErr != nil {
				yield(nil, convErr)
				return
			}
			// Comments don't have a guaranteed order, so an old comment
			// is skipped rather than treated as the end of the stream.
			if !ev.Updated.After(since) {
				continue
			}
			if !yield(ev, nil) {
				return
			}
		}
	}
}
// syncByDate syncs the events for a given project and api.
// It records all new events since the appropriate date (proj.DiscussionDate
// or proj.CommentDate, depending on the API).
// If successful and at least one newer event was seen, it advances that
// date to the latest event date seen and stores the project state.
// If no newer event was seen, the stored date is left untouched, so that
// an empty sync never resets the high-water mark.
//
// On error, events written so far are still applied to the database, but
// the date is not advanced, so the next sync fetches them again.
//
// TODO(tatianabradley): Add partial progress markers, in case a sync is so
// big that it hits a timeout.
func (c *Client) syncByDate(ctx context.Context, proj *projectSync, api string) error {
	var sinceStr *string
	var eventsSince func(context.Context, *projectSync, time.Time) iter.Seq2[*Event, error]
	switch api {
	case DiscussionAPI:
		sinceStr = &proj.DiscussionDate
		eventsSince = c.gql.discussionEventsSince
	case CommentAPI:
		sinceStr = &proj.CommentDate
		eventsSince = c.gql.commentEventsSince
	default:
		// unreachable except bug in this package
		c.db.Panic("unrecognized api", "api", api)
	}

	since, err := parseTime(*sinceStr)
	if err != nil {
		return err
	}

	// Start from the current high-water mark rather than the zero time,
	// so that a sync with no new events cannot move the mark backwards.
	latest := since

	c.slog.Debug("syncing", "api", api, "project", proj.Project, "after", since)

	b := c.db.Batch()
	// Flush whatever has been written, including on the error paths below.
	defer b.Apply()

	for e, err := range eventsSince(ctx, proj, since) {
		if err != nil {
			return err
		}
		c.slog.Debug("syncing discussion event", "project", proj.Project,
			"discussion", e.Discussion, "api", e.API,
			"updated", e.Updated, "value", e.Typed)
		c.writeEvent(b, e)
		b.MaybeApply()
		if e.Updated.After(latest) {
			latest = e.Updated
		}
	}

	// Make sure the events are applied before the date is advanced.
	b.Apply()

	if latest.After(since) {
		*sinceStr = latest.Format(time.RFC3339)
		proj.store(c.db)
	}
	return nil
}
// toEvent wraps a Discussion in an Event belonging to project.
// The discussion number serves as both the event's Discussion and its ID,
// and the event's JSON is the JSON encoding of d.
// It returns an error if d.UpdatedAt cannot be parsed.
func (d *Discussion) toEvent(project string) (*Event, error) {
	when, err := parseTime(d.UpdatedAt)
	if err != nil {
		return nil, err
	}
	ev := &Event{
		Project:    project,
		Discussion: d.Number,
		API:        DiscussionAPI,
		ID:         d.Number,
		JSON:       storage.JSON(d),
		Updated:    when,
	}
	return ev, nil
}
// parseTime parses t as a [time.RFC3339] timestamp.
// The empty string is not an error: it yields the zero time.
func parseTime(t string) (time.Time, error) {
	if t != "" {
		return time.Parse(time.RFC3339, t)
	}
	var zero time.Time
	return zero, nil
}
// toEvent wraps a Comment in an Event belonging to project.
// The event's Discussion is the number at the end of c.DiscussionURL,
// its ID is c.ID(), and its JSON is the JSON encoding of c.
// It returns an error if c.UpdatedAt or c.DiscussionURL cannot be parsed.
func (c *Comment) toEvent(project string) (*Event, error) {
	when, err := parseTime(c.UpdatedAt)
	if err != nil {
		return nil, err
	}
	number, err := parseID(c.DiscussionURL, "/")
	if err != nil {
		return nil, err
	}
	ev := &Event{
		Project:    project,
		Discussion: number,
		API:        CommentAPI,
		ID:         c.ID(),
		JSON:       storage.JSON(c),
		Updated:    when,
	}
	return ev, nil
}
// parseID returns the numerical ID at the end of the URL, right
// after the last instance of sep.
// e.g. parseID("https://example.com/comment/123","/") returns 123.
//
// The separator may be longer than one byte: everything up to and including
// the last occurrence of sep is skipped. If sep does not occur in u, the
// whole of u is parsed as the ID.
//
// An error is returned if the URL is malformed, that is, if the text after
// the last sep is not a base-10 integer that fits in an int64. This includes
// an empty sep, for which nothing follows the last occurrence.
func parseID(u string, sep string) (int64, error) {
	tail := u
	if i := strings.LastIndex(u, sep); i >= 0 {
		// Skip the full separator, not just one byte of it.
		tail = u[i+len(sep):]
	}
	id, err := strconv.ParseInt(tail, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("parseID(%q): %w", u, err)
	}
	return id, nil
}
// writeEvent adds a single event to the batch b via [timed.Set], which
// maintains a time-ordered index of the stored events.
// The stored value is the ordered encoding of the event's raw JSON.
func (c *Client) writeEvent(b storage.Batch, e *Event) {
	k := e.key()
	v := o(ordered.Raw(e.JSON))
	timed.Set(c.db, b, eventKind, k, v)
}
// key returns the db key for an event: the ordered encoding of
// (Project, Discussion, API, ID).
// The event kind is not part of this key; writeEvent passes eventKind
// to timed.Set separately.
func (e *Event) key() []byte {
return o(e.Project, e.Discussion, e.API, e.ID)
}
// Add registers a GitHub project of the form "owner/repo"
// (for example "golang/go") in the database.
// Only the project sync metadata is written; no data is fetched
// until [Sync] or [SyncProject] is called.
// Add returns an error if the project has already been added or
// if project is not of the form "owner/repo".
func (c *Client) Add(project string) error {
	// The error from load is deliberately ignored: load reports ok == true
	// whenever an entry exists, even one that cannot be unmarshaled, and
	// existence is all that matters here.
	_, exists, _ := load(c.db, project)
	if exists {
		return fmt.Errorf("discussion.Add: already added: %q", project)
	}
	owner, repo, err := splitProject(project)
	if err != nil {
		return err
	}
	state := projectSync{Project: project, Owner: owner, Repo: repo}
	state.store(c.db)
	return nil
}
// store writes proj to db as JSON, under the sync key for proj.Project.
func (proj *projectSync) store(db storage.DB) {
	k, v := key(proj.Project), storage.JSON(proj)
	db.Set(k, v)
}
// load fetches the sync state for project from db.
// The boolean result reports whether an entry for the project exists.
// If the entry exists but cannot be unmarshaled, load returns
// (nil, true, err).
func load(db storage.DB, project string) (_ *projectSync, ok bool, _ error) {
	raw, found := db.Get(key(project))
	if !found {
		return nil, false, nil
	}
	state := new(projectSync)
	if err := json.Unmarshal(raw, state); err != nil {
		return nil, true, fmt.Errorf("discussions.load: cannot unmarshal sync for project %s: %w", project, err)
	}
	return state, true, nil
}
// key returns the db key for a discussion project sync:
// the ordered encoding of (syncProjectKind, project).
// The same key, converted to a string, is used by SyncProject as the
// name of the project's lock.
func key(project string) []byte {
return o(syncProjectKind, project)
}
// Kinds of database entries.
const (
// syncProjectKind is the first element of every project sync state key (see key).
syncProjectKind = "discussion.SyncProject"
// eventKind is the kind passed to timed.Set when storing events (see writeEvent).
eventKind = "discussion.Event"
)
// o is short for ordered.Encode: it returns the ordered encoding
// of the given list of values, for use as a database key or value.
func o(list ...any) []byte { return ordered.Encode(list...) }
// splitProject returns the owner and repo for a project of the form
// "owner/repo".
// It returns an error unless project contains exactly one "/" with a
// non-empty owner before it and a non-empty repo after it, so inputs
// such as "", "golang", "/go", "golang/" and "a/b/c" are all rejected.
func splitProject(project string) (owner, repo string, _ error) {
	owner, repo, found := strings.Cut(project, "/")
	if !found || owner == "" || repo == "" || strings.Contains(repo, "/") {
		return "", "", fmt.Errorf("invalid project: %s", project)
	}
	return owner, repo, nil
}
// projectSync is the state of a discussions sync in the DB.
// It is stored as JSON under the key returned by key(Project).
// The date fields are strings in RFC 3339 format; they are empty when
// the project is first added, and an empty date is read as the zero time.
type projectSync struct {
Project string // owner/repo
Owner, Repo string // the two parts of Project, as returned by splitProject
DiscussionDate string // updated time of last synced discussion
CommentDate string // updated time of last synced comment/reply
}