// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package googlegroups saves Google Groups conversations as HTML
// in a [storage.DB].
// Every Google Group has a unique name. Group conversations are
// identified with a URL of the form
// https://groups.google.com/g/<group name>/c/<conversation id>.
// The URL points to the Google Group web page for the conversation.
// The page contains all individual conversation messages.
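//
// A typical use creates a client, registers one or more groups, and syncs
// them periodically. A minimal sketch (assuming an in-memory database and
// an empty secret database; a real deployment supplies its own):
//
//	lg := slog.Default()
//	db := storage.MemDB()
//	c := googlegroups.New(lg, db, secret.Empty(), http.DefaultClient)
//	if err := c.Add("golang-dev"); err != nil {
//		log.Fatal(err)
//	}
//	if err := c.Sync(context.Background()); err != nil {
//		log.Fatal(err)
//	}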
package googlegroups
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"iter"
"log/slog"
"net/http"
"net/url"
"regexp"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
"golang.org/x/net/html"
"golang.org/x/oscar/internal/crawl"
"golang.org/x/oscar/internal/secret"
"golang.org/x/oscar/internal/storage"
"golang.org/x/oscar/internal/storage/timed"
"rsc.io/ordered"
)
const (
syncGroupKind = "google.SyncGroup"
conversationKind = "google.GroupConversation"
conversationUpdateKind = "google.GroupConversationUpdate"
)
// This package stores timed entries in the database of the form:
//
// ["google.SyncGroup", group] => JSON of groupSync structure
// ["google.GroupConversation", group, URL] => Conversation JSON
// ["google.GroupConversationUpdateByTime", DBTime, group, URL] => []
//
// Google Groups do not have an API for querying groups or conversations.
// Further, iterating through conversations is not possible via web page
// URLs alone; one has to explicitly ask the page for more conversations.
//
// We therefore sync conversations as follows. The algorithm asks the
// group search page for all conversations updated today (the high and
// current watermarks) or earlier. It syncs conversations for today by
// crawling the search page and then proceeds by asking for conversations
// updated yesterday (the updated current watermark) or earlier, and so on.
// Once the search page returns no conversations, the algorithm stops and
// remembers where it initially started from (the low watermark): the next
// invocation of the algorithm will look only for conversations updated
// after that point. Each conversation is represented by its raw HTML.
//
// The algorithm has a limitation: it syncs only the 30 most recently
// updated conversations for any given day, because the Google Groups
// search page shows at most 30 recently updated conversations.
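//
// For illustration, the sync state key for group "golang-dev" is encoded
// with [ordered.Encode] (wrapped below by the o helper):
//
//	key := ordered.Encode("google.SyncGroup", "golang-dev")
//	// equivalently: o(syncGroupKind, "golang-dev")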
// o is short for ordered.Encode.
func o(list ...any) []byte { return ordered.Encode(list...) }
// A Client is a connection to Google Groups, and to the database
// that stores information gathered from the groups.
type Client struct {
slog *slog.Logger
db storage.DB
secret secret.DB
http *http.Client
flushRequested atomic.Bool // flush database to disk when convenient
testing bool
testMu sync.Mutex
testClient *TestingClient
}
// New returns a new client to access Google Groups.
// The client uses the given logger, databases, and HTTP client.
//
// The secret database will look for a secret named "googlegroups"
// whose value has the form user:pass. The secret is not yet used.
func New(lg *slog.Logger, db storage.DB, sdb secret.DB, hc *http.Client) *Client {
return &Client{
slog: lg,
db: db,
secret: sdb,
http: hc,
testing: testing.Testing(),
}
}
// RequestFlush asks sync to flush the database to disk when
// convenient. This may be called concurrently with [Client.Sync].
func (c *Client) RequestFlush() {
c.flushRequested.Store(true)
}
// groupSync records the sync state of a google group,
// such as "golang-dev" or "golang-announcements".
// This is stored in the database.
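//
// For illustration (a sketch; real values use the timeStampLayout format),
// a group in the middle of a sync might look like:
//
//	groupSync{
//		Name:        "golang-dev",
//		LowMark:     "<last day covered by the previous sync>",
//		HighMark:    "<day this sync started>",
//		CurrentMark: "<day the crawl has currently walked back to>",
//	}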
type groupSync struct {
Name string // group name, such as "golang-nuts" or "golang-dev".
LowMark string // low watermark: everything before this has been synced.
HighMark string // high watermark: everything after this has not been synced.
CurrentMark string // current watermark: everything between this and HighMark has been synced.
}
// store stores group into db.
func (group *groupSync) store(db storage.DB) {
db.Set(o(syncGroupKind, group.Name), storage.JSON(group))
}
// Add adds a google group such as "golang-dev" to the database.
// It only adds the group sync metadata.
// The initial data fetch does not happen until [Client.Sync] or
// [Client.SyncGroup] is called.
// If the group is already present, Add does nothing and returns nil.
func (c *Client) Add(group string) error {
key := o(syncGroupKind, group)
if _, ok := c.db.Get(key); ok {
c.slog.Info("googlegroups.Add: already present", "group", group)
return nil
}
grp := &groupSync{
Name: group,
}
c.db.Set(key, storage.JSON(grp))
return nil
}
// Sync syncs the data for all client groups.
func (c *Client) Sync(ctx context.Context) error {
var errs []error
for key := range c.db.Scan(o(syncGroupKind), o(syncGroupKind, ordered.Inf)) {
var group string
if err := ordered.Decode(key, nil, &group); err != nil {
c.db.Panic("ggroups client sync decode", "key", storage.Fmt(key), "err", err)
}
if err := c.SyncGroup(ctx, group); err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
}
// SyncGroup syncs a single group.
func (c *Client) SyncGroup(ctx context.Context, group string) (err error) {
c.slog.Debug("ggroups.SyncGroup", "group", group)
defer func() {
if err != nil {
err = fmt.Errorf("SyncGroup(%q): %w", group, err)
}
}()
key := o(syncGroupKind, group)
skey := string(key)
// Lock the group, so that no one else is syncing concurrently.
c.db.Lock(skey)
defer c.db.Unlock(skey)
// Load sync state.
var grp groupSync
if val, ok := c.db.Get(key); !ok {
return fmt.Errorf("missing group %s", group)
} else if err := json.Unmarshal(val, &grp); err != nil {
return err
}
return c.syncConversations(ctx, &grp)
}
// syncConversations syncs all group conversations updated
// between group.LowMark and group.HighMark.
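//
// For illustration, a sketch of the control flow with symbolic day names
// (the after: clause is omitted when LowMark is empty):
//
//	save(LowMark, today, tomorrow)           // initialize the watermarks
//	search "before:tomorrow after:LowMark"   // conversations updated up to today
//	search "before:today after:LowMark"      // then one day earlier, and so on,
//	...                                      // until a search returns nothing
//	save(yesterday, "", "")                  // the next sync resumes from yesterday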
func (c *Client) syncConversations(ctx context.Context, group *groupSync) (err error) {
save := func(low, high, curr string) {
group.LowMark = low
group.HighMark = high
group.CurrentMark = curr
group.store(c.db)
c.db.Flush()
}
if group.HighMark == "" {
// Since Google Groups intervals are at a day-level
// granularity, we set the current mark to tomorrow,
// so we can analyze updates made today.
save(group.LowMark, now(), tomorrow())
}
c.slog.Info("ggroups sync", "group", group.Name, "low", group.LowMark,
"curr", group.CurrentMark, "high", group.HighMark)
if err := c.syncIntervalConversations(ctx, group); err != nil {
return err
}
// Since Google Groups intervals are at a day-level granularity,
// we set the low mark to the day before the last day we analyzed.
// For instance, the last day will in most cases be today. To
// ensure that we analyze the rest of today on the next
// invocation, we set the low mark to yesterday.
yest, err := prev(group.HighMark)
if err != nil {
return err
}
save(yest, "", "")
return nil
}
// testTomorrow exists for testing purposes, to avoid the
// issue of dealing with the current moment in time.
// For ordinary use this should be the empty string.
// TODO: instead, should we ask the database for its
// definition of tomorrow?
var testTomorrow string
// tomorrow returns the day after the current time in
// timeStampLayout format.
func tomorrow() string {
if testTomorrow != "" {
return testTomorrow
}
return time.Now().Add(24 * time.Hour).Format(timeStampLayout)
}
// now returns the current time in
// timeStampLayout format.
func now() string {
return time.Now().Format(timeStampLayout)
}
// syncIntervalConversations syncs conversations updated between group.LowMark and group.CurrentMark.
func (c *Client) syncIntervalConversations(ctx context.Context, group *groupSync) error {
b := c.db.Batch()
defer func() {
b.Apply()
c.db.Flush()
}()
// We fetch increasingly smaller but overlapping conversation
// intervals in order to ensure termination. Due to this and
// concurrent modifications, we can see the same conversation
// more than once. Keep track of the conversations we have
// already seen.
seen := make(map[string]bool)
saveCurrentMark := func(curr string) {
group.CurrentMark = curr
group.store(c.db)
}
for {
nConversations := 0
c.slog.Info("ggroups interval sync", "group", group.Name, "low", group.LowMark, "curr", group.CurrentMark)
for conv, err := range c.conversations(ctx, group.Name, group.LowMark, group.CurrentMark) {
if err != nil {
return err
}
if err := ctx.Err(); err != nil {
return err
}
nConversations++
if c.flushRequested.Load() {
// Flush database.
b.Apply()
c.db.Flush()
c.flushRequested.Store(false)
}
if seen[conv.URL] {
continue
}
seen[conv.URL] = true
key := o(conversationKind, group.Name, conv.URL)
b.Set(key, storage.JSON(conv))
// Record that the conversation was updated.
timed.Set(c.db, b, conversationUpdateKind, o(group.Name, conv.URL), nil)
b.MaybeApply()
// Flush progress to the database occasionally
// to make sure it is saved before interruption.
if nConversations%10 == 0 {
b.Apply()
c.db.Flush()
}
}
if nConversations == 0 {
break
}
// Since conversations are returned as raw HTML, we
// don't have the actual time they were updated. We
// hence simply decrease the current mark by one day.
pd, err := prev(group.CurrentMark)
if err != nil {
return err
}
saveCurrentMark(pd)
}
return nil
}
// prev accepts a timestamp t in timeStampLayout and
// returns a timestamp for exactly one day before t.
func prev(t string) (string, error) {
tt, err := time.Parse(timeStampLayout, t)
if err != nil {
return "", err
}
return tt.Add(-24 * time.Hour).Format(timeStampLayout), nil
}
// dbSizeLimit is the limit on the value size
// that can be stored to Firestore, in bytes (1 MiB).
// See https://firebase.google.com/docs/firestore/quotas
// for more information.
const dbSizeLimit = 1048576
// testDBSizeLimit overrides dbSizeLimit for testing;
// zero means no override.
var testDBSizeLimit = 0
// conversationSizeLimit returns the size limit, in bytes,
// for a conversation stored in the database.
func conversationSizeLimit() int {
if testDBSizeLimit != 0 {
return testDBSizeLimit
}
return dbSizeLimit
}
// conversations returns an iterator, in reverse chronological order, over
// conversations updated in the interval (after, before).
func (c *Client) conversations(ctx context.Context, group, after, before string) iter.Seq2[*Conversation, error] {
if c.divertChanges() { // testing
return c.testClient.conversations(ctx, group, after, before)
}
return func(yield func(*Conversation, error) bool) {
// Fetch all conversations by crawling the search page of the group.
// Note: this approach has the limitation that only the 30 most recent
// results will be returned.
query := "before:" + before
if after != "" {
query += " after:" + after
}
values := url.Values{"q": []string{query}}
addr := fmt.Sprintf("https://groups.google.com/g/%s/search?%s", group, values.Encode())
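// For example (with symbolic watermarks A and B), the query
// "before:B after:A" for group "golang-dev" yields the crawl root
// https://groups.google.com/g/golang-dev/search?q=before%3AB+after%3AA.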
db := storage.MemDB()
crawler := crawl.New(c.slog, db, c.http)
crawler.Add(addr)
crawler.Allow("https://groups.google.com")
if err := crawler.Run(ctx); err != nil {
yield(nil, err)
return
}
for p := range crawler.PageWatcher("ggroups").Recent() {
// The Google Groups search page contains, among other
// things, links to the messages updating a conversation,
// but not the conversation link itself.
u := conversationLink(p.URL, group)
if !matchesConversation(u, group) {
continue
}
// Fetch the body of the conversation, since p.URL does
// not point to the conversation page itself.
html, err := getHTML(ctx, c.http, u)
if err != nil {
if !yield(nil, err) {
return
}
} else {
title, messages, err := titleAndMessages(c.slog, html)
if err != nil {
// unreachable unless error in crawler or html package
c.db.Panic("ggroups extract messages", "conversation", u, "err", err)
}
conv := &Conversation{
Group: group,
Title: title,
URL: u,
Messages: messages,
}
if len(messages) == 0 {
// In case Google Groups HTML structure changes.
c.slog.Error("ggroups conversation with no messages", "conversation", u)
} else {
if truncate(conv) {
c.slog.Warn("ggroups conversation truncated", "conversation", u)
}
if !yield(conv, nil) {
return
}
}
}
}
return
}
}
// conversationLink attempts to extract the URL of the
// conversation underlying u; otherwise, it returns u.
// A common example of u is the link to the first
// message of the conversation.
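// For example (hypothetical IDs), the crawled message link
//
//	https://groups.google.com/g/golang-dev/g/golang-dev/c/AbC123/m/XyZ789
//
// becomes
//
//	https://groups.google.com/g/golang-dev/c/AbC123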
func conversationLink(u, group string) string {
// Resolution of relative paths in the crawler
// sometimes doubles up the group component.
from := fmt.Sprintf("/g/%s/g/%s/", group, group)
to := fmt.Sprintf("/g/%s/", group)
u = strings.Replace(u, from, to, 1)
return strings.Split(u, "/m/")[0] // remove message suffix
}
// convRegexp is a regular expression that
// matches only Google Group conversation URLs.
var convRegexp = regexp.MustCompile("^https://groups.google.com/g/([^/]+)/c/[a-zA-Z0-9_]+$")
// matchesConversation reports whether u is a
// conversation URL for the group.
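// For example (hypothetical IDs):
//
//	matchesConversation("https://groups.google.com/g/golang-dev/c/AbC123", "golang-dev")          // true
//	matchesConversation("https://groups.google.com/g/golang-dev/c/AbC123/m/XyZ789", "golang-dev") // false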
func matchesConversation(u, group string) bool {
matches := convRegexp.FindAllStringSubmatch(u, -1)
if len(matches) != 1 {
return false
}
match := matches[0]
if len(match) != 2 {
return false
}
return match[1] == group
}
// getHTML uses hc to make an http GET request to u. It returns
// the raw body of the response. It does not follow redirections.
// TODO: extract common logic from crawl or simply use crawl?
func getHTML(ctx context.Context, hc *http.Client, u string) ([]byte, error) {
req, err := http.NewRequestWithContext(ctx, "GET", u, nil)
if err != nil {
return nil, err
}
resp, err := hc.Do(req)
if err != nil {
return nil, err
}
body, err := io.ReadAll(resp.Body)
resp.Body.Close()
if err != nil {
return nil, err
}
if resp.StatusCode != 200 {
return nil, errors.New("http status " + resp.Status)
}
return body, nil
}
// titleAndMessages extracts the conversation title and the
// HTML fragments of h containing the individual
// conversation messages.
// It returns an error if h is not HTML.
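// The relevant part of the page has roughly the following shape
// (illustrative only; the exact markup is not guaranteed):
//
//	<... aria-label="conversation title">
//	  <section>first message</section>
//	  <section>second message</section>
//	</...>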
func titleAndMessages(lg *slog.Logger, h []byte) (string, []string, error) {
root, err := html.Parse(bytes.NewReader(h))
if err != nil {
return "", nil, err
}
// Currently, the Google Groups web page wraps individual
// messages in top-level <section> HTML elements.
sectionNodes := sections(root)
if len(sectionNodes) == 0 {
return "", nil, nil
}
// All sections are grouped together with a parent
// that has "aria-label" set to the conversation
// name. There does not seem to be a more structured
// way of getting the title.
var title string
for _, a := range sectionNodes[0].Parent.Attr {
if a.Key == "aria-label" {
title = a.Val
break
}
}
var sections []string
for _, s := range sectionNodes {
clean(s) // reduce the size of each message
var buf bytes.Buffer
if err := html.Render(&buf, s); err != nil {
lg.Error("ggroups failed section rendering", "err", err)
continue
}
sections = append(sections, buf.String())
}
return title, sections, nil
}
// sections recursively collects all
// elements with section HTML tag.
func sections(n *html.Node) []*html.Node {
var secs []*html.Node
var doSec func(*html.Node)
doSec = func(n *html.Node) {
if n.Type == html.ElementNode && n.Data == "section" {
secs = append(secs, n)
}
for c := n.FirstChild; c != nil; c = c.NextSibling {
doSec(c)
}
}
doSec(n)
return secs
}
// clean recursively removes tag attributes from the descendants of n.
func clean(n *html.Node) {
for c := n.FirstChild; c != nil; c = c.NextSibling {
c.Attr = nil // clean attributes
clean(c)
}
}
// truncate removes trailing messages from c until the
// total size of c is below the limit returned by
// conversationSizeLimit.
// It returns true if truncation happened, false otherwise.
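// For illustration, with a limit of 100 bytes, a conversation whose JSON
// encoding is 130 bytes and whose last message is 50 bytes long drops that
// message, bringing the estimated size to 130 - (50 + 2) = 78 bytes, which
// is below the limit, so truncate returns true.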
func truncate(c *Conversation) bool {
s := size(c)
truncated := false
limit := conversationSizeLimit()
for s >= limit {
if len(c.Messages) == 0 {
// Sanity check: this only happens if the conversation
// title and URL alone already exceed the size limit.
break
}
li := len(c.Messages) - 1
lms := c.Messages[li]
c.Messages = c.Messages[:li]
truncated = true
// Account only for the removed message and its two
// enclosing quotation marks. Conservatively, do not
// subtract potential spaces and commas around the message.
s -= len(lms) + 2
}
return truncated
}
// size is the size of c in its
// db representation, in bytes.
func size(c *Conversation) int {
return len(storage.JSON(c))
}