internal/googlegroups: embed conversations

Only the first message of each conversation is used as a document.

Updates golang/oscar#27

Change-Id: I0829fc09444f49ea7ddd4ec87730d3a19c07971c
Reviewed-on: https://go-review.googlesource.com/c/oscar/+/624455
Reviewed-by: Hyang-Ah Hana Kim <hyangah@gmail.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
diff --git a/internal/googlegroups/data.go b/internal/googlegroups/data.go
index 5e23d82..ee9b33d 100644
--- a/internal/googlegroups/data.go
+++ b/internal/googlegroups/data.go
@@ -53,7 +53,7 @@
 	// page of the conversation. The page
 	// contains conversation messages.
 	URL string
-	// Messages are raw html data that contain
+	// Messages are raw HTML data that contain
 	// individual conversation messages obtained
 	// from URL.
 	Messages []string
@@ -72,18 +72,18 @@
 
 // ConversationWatcher returns a new [timed.Watcher] with the given name.
 // It picks up where any previous Watcher of the same name left off.
-func (c *Client) ConversationWatcher(name string) *timed.Watcher[ConversationEvent] {
-	return timed.NewWatcher(c.slog, c.db, name, conversationKind, c.decodeConversationEvent)
+func (c *Client) ConversationWatcher(name string) *timed.Watcher[*ConversationEvent] {
+	return timed.NewWatcher(c.slog, c.db, name, conversationUpdateKind, c.decodeConversationEvent)
 }
 
 // decodeConversationEvent decodes a conversationKind [timed.Entry] into
 // a conversation event.
-func (c *Client) decodeConversationEvent(t *timed.Entry) ConversationEvent {
+func (c *Client) decodeConversationEvent(t *timed.Entry) *ConversationEvent {
 	ce := ConversationEvent{
 		DBTime: t.ModTime,
 	}
-	if err := ordered.Decode(t.Key, &ce.Group, &ce.URL, nil); err != nil {
+	if err := ordered.Decode(t.Key, &ce.Group, &ce.URL); err != nil {
 		c.db.Panic("ggroups conversation event decode", "key", storage.Fmt(t.Key), "err", err)
 	}
-	return ce
+	return &ce
 }
diff --git a/internal/googlegroups/embed.go b/internal/googlegroups/embed.go
new file mode 100644
index 0000000..2dff6b6
--- /dev/null
+++ b/internal/googlegroups/embed.go
@@ -0,0 +1,66 @@
+// Copyright 2024 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package googlegroups
+
+import (
+	"encoding/json"
+	"iter"
+	"slices"
+
+	"golang.org/x/oscar/internal/docs"
+	"golang.org/x/oscar/internal/storage/timed"
+)
+
+var _ docs.Source[*ConversationEvent] = (*Client)(nil)
+
+const DocWatcherID = "ggroupsrelateddocs"
+
+// DocWatcher returns the change event watcher with name "ggroupsrelateddocs".
+// Implements [docs.Source.DocWatcher].
+func (c *Client) DocWatcher() *timed.Watcher[*ConversationEvent] {
+	return c.ConversationWatcher(DocWatcherID)
+}
+
+// LastWritten implements [docs.Entry.LastWritten].
+func (ce *ConversationEvent) LastWritten() timed.DBTime {
+	return ce.DBTime
+}
+
+// ToDocs converts a ConversationEvent to an embeddable document (wrapped
+// as an iterator).
+//
+// The document holds the HTML of the first message of a Google Groups
+// conversation; its ID is the conversation URL, which has the form:
+//
+//	https://groups.google.com/g/<group>/c/<conversation>
+//
+// ToDocs returns (nil, false) if the conversation cannot be found in
+// the client's db or its stored JSON cannot be decoded.
+//
+// Implements [docs.Source.ToDocs].
+func (c *Client) ToDocs(ce *ConversationEvent) (iter.Seq[*docs.Doc], bool) {
+	key := o(conversationKind, ce.Group, ce.URL)
+	val, ok := c.db.Get(key)
+	if !ok {
+		c.slog.Error("ggroups.ToDocs cannot find conversation", "URL", ce.URL)
+		return nil, false
+	}
+	var conv Conversation
+	if err := json.Unmarshal(val, &conv); err != nil {
+		c.slog.Error("ggroups.ToDocs conversation decode failure", "URL", ce.URL, "err", err)
+		return nil, false
+	}
+
+	title := conv.Title
+	if title == "" {
+		title = conv.URL // fall back to the URL so the doc always has a title
+	}
+	// Embed only the first conversation message.
+	return slices.Values([]*docs.Doc{{
+		ID:    conv.URL,
+		Title: title,
+		Text:  conv.Messages[0],
+	}}), true
+}
diff --git a/internal/googlegroups/sync.go b/internal/googlegroups/sync.go
index 0866097..2ea668c 100644
--- a/internal/googlegroups/sync.go
+++ b/internal/googlegroups/sync.go
@@ -33,18 +33,21 @@
 	"golang.org/x/oscar/internal/crawl"
 	"golang.org/x/oscar/internal/secret"
 	"golang.org/x/oscar/internal/storage"
+	"golang.org/x/oscar/internal/storage/timed"
 	"rsc.io/ordered"
 )
 
 const (
-	syncGroupKind    = "google.SyncGroup"
-	conversationKind = "google.GroupConversation"
+	syncGroupKind          = "google.SyncGroup"
+	conversationKind       = "google.GroupConversation"
+	conversationUpdateKind = "google.GroupConversationUpdate"
 )
 
 // This package stores timed entries in the database of the form:
 //
 //	["google.SyncGroup", group] => JSON of groupSync structure
 //	["google.GroupConversation", group, URL] => Conversation JSON
+//	["google.GroupConversationUpdateByTime", DBTime, group, URL] => []
 //
 // Google Groups do not have an API for querying groups or conversations.
 // Further, iterating through conversations via web page is not possible
@@ -281,6 +284,8 @@
 
 			key := o(conversationKind, group.Name, conv.URL)
 			b.Set(key, storage.JSON(conv))
+			// Record that the conversation was updated.
+			timed.Set(c.db, b, conversationUpdateKind, o(group.Name, conv.URL), nil)
 
 			b.MaybeApply()
 
@@ -381,6 +386,9 @@
 				}
 				if truncate(conv) {
 					c.slog.Warn("ggroups conversation truncated", "conversation", u)
+				} else if len(messages) == 0 {
+					// Can happen if the Google Groups HTML structure changes and no messages are extracted.
+					c.slog.Error("ggroups conversation with no messages", "conversation", u)
 				} else {
 					yield(conv, nil)
 				}
diff --git a/internal/googlegroups/sync_test.go b/internal/googlegroups/sync_test.go
index 20098e7..c186e12 100644
--- a/internal/googlegroups/sync_test.go
+++ b/internal/googlegroups/sync_test.go
@@ -164,7 +164,7 @@
 
 			tc := c.Testing()
 			tc.setLimit(d.limit)
-			check(tc.LoadTxtar("testdata/convs.txt"))
+			check(tc.LoadTxtar("testdata/interrupt_convs.txt"))
 
 			err := c.Sync(ctx)
 			if d.wantInterrupt {
diff --git a/internal/googlegroups/syncdocs_test.go b/internal/googlegroups/syncdocs_test.go
new file mode 100644
index 0000000..926bb92
--- /dev/null
+++ b/internal/googlegroups/syncdocs_test.go
@@ -0,0 +1,79 @@
+// Copyright 2024 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package googlegroups
+
+import (
+	"context"
+	"testing"
+
+	"golang.org/x/oscar/internal/docs"
+	"golang.org/x/oscar/internal/storage"
+	"golang.org/x/oscar/internal/testutil"
+)
+
+func TestSyncGoogleGroupsDocs(t *testing.T) {
+	check := testutil.Checker(t)
+	lg := testutil.Slogger(t)
+	db := storage.MemDB()
+
+	c := New(lg, db, nil, nil)
+	check(c.Add("test"))
+
+	tc := c.Testing()
+	tc.setLimit(1000)
+	check(tc.LoadTxtar("testdata/convs.txt"))
+	// Sync the changes so the watcher has
+	// something to work with.
+	check(c.Sync(context.Background()))
+
+	dc := docs.New(lg, db)
+	docs.Sync(dc, c)
+
+	var want = []string{
+		"https://groups.google.com/g/test/c/1",
+		"https://groups.google.com/g/test/c/2",
+		"https://groups.google.com/g/test/c/3",
+	}
+	for d := range dc.Docs("") {
+		if len(want) == 0 {
+			t.Fatalf("unexpected extra doc: %s", d.ID)
+		}
+		if d.ID != want[0] {
+			t.Fatalf("doc mismatch: have %s, want %s", d.ID, want[0])
+		}
+		want = want[1:]
+		if d.ID == ch1 {
+			if d.Title != ch1Title {
+				t.Errorf("#1 Title = %q, want %q", d.Title, ch1Title)
+			}
+			if d.Text != ch1Text {
+				t.Errorf("#1 Text = %q, want %q", d.Text, ch1Text)
+			}
+		}
+	}
+	if len(want) > 0 {
+		t.Fatalf("missing docs: %v", want)
+	}
+
+	dc.Add("https://groups.google.com/g/test/c/1", "OLD TITLE", "OLD TEXT")
+	docs.Sync(dc, c)
+	d, _ := dc.Get(ch1)
+	if d.Title != "OLD TITLE" || d.Text != "OLD TEXT" {
+		t.Errorf("Sync rewrote #1: Title=%q Text=%q, want OLD TITLE, OLD TEXT", d.Title, d.Text)
+	}
+
+	docs.Restart(c)
+	docs.Sync(dc, c)
+	d, _ = dc.Get(ch1)
+	if d.Title == "OLD TITLE" || d.Text == "OLD TEXT" {
+		t.Errorf("Restart+Sync did not rewrite #1: Title=%q Text=%q", d.Title, d.Text)
+	}
+}
+
+var (
+	ch1      = "https://groups.google.com/g/test/c/1"
+	ch1Title = "goroutines"
+	ch1Text  = "Opening message for conversation 1"
+)
diff --git a/internal/googlegroups/testdata/convs.txt b/internal/googlegroups/testdata/convs.txt
index 63fd041..da66e8f 100644
--- a/internal/googlegroups/testdata/convs.txt
+++ b/internal/googlegroups/testdata/convs.txt
@@ -1,41 +1,19 @@
 -- conversation#1 --
 Group: test
+Title: goroutines
 URL: https://groups.google.com/g/test/c/1
 Updated: 2024-10-20
-HTML: update
+HTML: Opening message for conversation 1
 -- conversation#2 --
 Group: test
+Title: errors
 URL: https://groups.google.com/g/test/c/2
 Updated: 2024-10-19
-HTML: update
+HTML: Opening message for conversation 2
 -- conversation#3 --
-Group: different-test
-URL: https://groups.google.com/g/different-test/c/3
+Group: test
+Title: channels
+URL: https://groups.google.com/g/test/c/2
+URL: https://groups.google.com/g/test/c/3
 Updated: 2024-10-19
-HTML: update
--- conversation#4 --
-Group: test
-URL: https://groups.google.com/g/test/c/4
-Updated: 2024-10-19
-HTML: update
--- conversation#5 --
-Group: test
-URL: https://groups.google.com/g/test/c/5
-Updated: 2024-10-19
-HTML: update
--- conversation#6 --
-Group: test
-URL: https://groups.google.com/g/test/c/6
-Updated: 2024-10-18
-HTML: update
--- conversation#7 --
-Group: test
-URL: https://groups.google.com/g/test/c/7
-Updated: 2024-10-18
-HTML: update
-interrupt: true
--- conversation#8 --
-Group: test
-URL: https://groups.google.com/g/test/c/8
-Updated: 2024-10-17
-HTML: update
+HTML: Opening message for conversation 3
diff --git a/internal/googlegroups/testdata/interrupt_convs.txt b/internal/googlegroups/testdata/interrupt_convs.txt
new file mode 100644
index 0000000..63fd041
--- /dev/null
+++ b/internal/googlegroups/testdata/interrupt_convs.txt
@@ -0,0 +1,41 @@
+-- conversation#1 --
+Group: test
+URL: https://groups.google.com/g/test/c/1
+Updated: 2024-10-20
+HTML: update
+-- conversation#2 --
+Group: test
+URL: https://groups.google.com/g/test/c/2
+Updated: 2024-10-19
+HTML: update
+-- conversation#3 --
+Group: different-test
+URL: https://groups.google.com/g/different-test/c/3
+Updated: 2024-10-19
+HTML: update
+-- conversation#4 --
+Group: test
+URL: https://groups.google.com/g/test/c/4
+Updated: 2024-10-19
+HTML: update
+-- conversation#5 --
+Group: test
+URL: https://groups.google.com/g/test/c/5
+Updated: 2024-10-19
+HTML: update
+-- conversation#6 --
+Group: test
+URL: https://groups.google.com/g/test/c/6
+Updated: 2024-10-18
+HTML: update
+-- conversation#7 --
+Group: test
+URL: https://groups.google.com/g/test/c/7
+Updated: 2024-10-18
+HTML: update
+interrupt: true
+-- conversation#8 --
+Group: test
+URL: https://groups.google.com/g/test/c/8
+Updated: 2024-10-17
+HTML: update
diff --git a/internal/googlegroups/testing.go b/internal/googlegroups/testing.go
index 10271dc..a85fb8b 100644
--- a/internal/googlegroups/testing.go
+++ b/internal/googlegroups/testing.go
@@ -118,6 +118,8 @@
 			switch key {
 			case "Group":
 				c.Group = val
+			case "Title":
+				c.Title = val
 			case "URL":
 				c.URL = val
 			case "HTML":
diff --git a/internal/googlegroups/testing_test.go b/internal/googlegroups/testing_test.go
index 593a699..22d96cc 100644
--- a/internal/googlegroups/testing_test.go
+++ b/internal/googlegroups/testing_test.go
@@ -18,8 +18,8 @@
 
 	tc := c.Testing()
 	check(tc.LoadTxtar("testdata/convs.txt"))
-	if len(tc.convs) != 8 {
-		t.Errorf("want 8 conversations; got %d", len(tc.convs))
+	if len(tc.convs) != 3 {
+		t.Errorf("want 3 conversations; got %d", len(tc.convs))
 	}
 }
 
@@ -30,7 +30,7 @@
 	c := New(nil, nil, nil, nil)
 	tc := c.Testing()
 	tc.setLimit(1000) // grab everything in one batch
-	check(tc.LoadTxtar("testdata/convs.txt"))
+	check(tc.LoadTxtar("testdata/interrupt_convs.txt"))
 
 	cnt := 0
 	// There should be three conversations matching the criteria.