maintner: refactor gcslog and API into separate packages

Fixes golang/go#24786.

Change-Id: I008810a0394c75e7c790165308ff9ef872c77fdc
Reviewed-on: https://go-review.googlesource.com/105935
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/maintner/maintnerd/gcslog.go b/maintner/maintnerd/gcslog/gcslog.go
similarity index 85%
rename from maintner/maintnerd/gcslog.go
rename to maintner/maintnerd/gcslog/gcslog.go
index 28d3e79..6b404f0 100644
--- a/maintner/maintnerd/gcslog.go
+++ b/maintner/maintnerd/gcslog/gcslog.go
@@ -2,9 +2,8 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-// Implementation of maintner.MutationSource and Logger for Google Cloud Storage.
-
-package main
+// Package gcslog is an implementation of maintner.MutationSource and Logger for Google Cloud Storage.
+package gcslog
 
 import (
 	"bytes"
@@ -39,7 +38,12 @@
 
 const flushInterval = 10 * time.Minute
 
-type gcsLog struct {
+// GCSLog implements MutationLogger and MutationSource.
+var _ maintner.MutationLogger = &GCSLog{}
+var _ maintner.MutationSource = &GCSLog{}
+
+// GCSLog logs mutations to GCS.
+type GCSLog struct {
 	sc         *storage.Client
 	bucketName string
 	bucket     *storage.BucketHandle
@@ -70,15 +74,16 @@
 
 // newGCSLogBase returns a new gcsLog instance without any association
 // with Google Cloud Storage.
-func newGCSLogBase() *gcsLog {
-	gl := &gcsLog{
+func newGCSLogBase() *GCSLog {
+	gl := &GCSLog{
 		seg: map[int]gcsLogSegment{},
 	}
 	gl.cond = sync.NewCond(&gl.mu)
 	return gl
 }
 
-func newGCSLog(ctx context.Context, bucketName string) (*gcsLog, error) {
+// NewGCSLog creates a GCSLog that logs mutations to a given GCS bucket.
+func NewGCSLog(ctx context.Context, bucketName string) (*GCSLog, error) {
 	sc, err := storage.NewClient(ctx)
 	if err != nil {
 		return nil, fmt.Errorf("storage.NewClient: %v", err)
@@ -95,7 +100,7 @@
 
 var objnameRx = regexp.MustCompile(`^(\d{4})\.([0-9a-f]{56})\.mutlog$`)
 
-func (gl *gcsLog) initLoad(ctx context.Context) error {
+func (gl *GCSLog) initLoad(ctx context.Context) error {
 	it := gl.bucket.Objects(ctx, nil)
 	maxNum := 0
 	for {
@@ -162,7 +167,7 @@
 	return nil
 }
 
-func (gl *gcsLog) serveLogFile(w http.ResponseWriter, r *http.Request) {
+func (gl *GCSLog) serveLogFile(w http.ResponseWriter, r *http.Request) {
 	if r.Method != "GET" && r.Method != "HEAD" {
 		http.Error(w, "bad method", http.StatusBadRequest)
 		return
@@ -193,7 +198,7 @@
 	http.ServeContent(w, r, "", time.Time{}, strings.NewReader(content))
 }
 
-func (gl *gcsLog) serveJSONLogsIndex(w http.ResponseWriter, r *http.Request) {
+func (gl *GCSLog) serveJSONLogsIndex(w http.ResponseWriter, r *http.Request) {
 	if r.Method != "GET" && r.Method != "HEAD" {
 		http.Error(w, "bad method", http.StatusBadRequest)
 		return
@@ -244,9 +249,9 @@
 	return sum
 }
 
-// waitSizeNot blocks until the sum of gcsLog is not v, or the context expires.
+// waitSizeNot blocks until the sum of GCSLog is not v, or the context expires.
 // It reports whether the size changed.
-func (gl *gcsLog) waitSizeNot(ctx context.Context, v int64) (changed bool) {
+func (gl *GCSLog) waitSizeNot(ctx context.Context, v int64) (changed bool) {
 	returned := make(chan struct{})
 	defer close(returned)
 	go gl.waitSizeNotAwaitContextOrChange(ctx, returned)
@@ -270,7 +275,7 @@
 // It's a goroutine that selects on two channels and calls
 // sync.Cond.Broadcast to wake up the waitSizeNot waiter if the
 // context expires.
-func (gl *gcsLog) waitSizeNotAwaitContextOrChange(ctx context.Context, returned <-chan struct{}) {
+func (gl *GCSLog) waitSizeNotAwaitContextOrChange(ctx context.Context, returned <-chan struct{}) {
 	select {
 	case <-ctx.Done():
 		gl.cond.Broadcast()
@@ -279,7 +284,7 @@
 	}
 }
 
-func (gl *gcsLog) sumSizeLocked() int64 {
+func (gl *GCSLog) sumSizeLocked() int64 {
 	var sum int64
 	for n, seg := range gl.seg {
 		if n != gl.curNum {
@@ -290,7 +295,7 @@
 	return sum
 }
 
-func (gl *gcsLog) getJSONLogs(startSeg int) (segs []maintner.LogSegmentJSON) {
+func (gl *GCSLog) getJSONLogs(startSeg int) (segs []maintner.LogSegmentJSON) {
 	gl.mu.Lock()
 	defer gl.mu.Unlock()
 	if startSeg > gl.curNum || startSeg < 0 {
@@ -317,9 +322,9 @@
 	return
 }
 
-// gcsLogWriter is the io.Writer used to write to gcsLog.logBuf. It
+// gcsLogWriter is the io.Writer used to write to GCSLog.logBuf. It
 // keeps the sha224 in sync. Caller must hold gl.mu.
-type gcsLogWriter struct{ gl *gcsLog }
+type gcsLogWriter struct{ gl *GCSLog }
 
 func (w gcsLogWriter) Write(p []byte) (n int, err error) {
 	gl := w.gl
@@ -337,7 +342,8 @@
 	return len(p), nil
 }
 
-func (gl *gcsLog) Log(m *maintpb.Mutation) error {
+// Log writes m to GCS after the buffer is full or after a periodic flush.
+func (gl *GCSLog) Log(m *maintpb.Mutation) error {
 	data, err := proto.Marshal(m)
 	if err != nil {
 		return err
@@ -372,14 +378,14 @@
 	return nil
 }
 
-func (gl *gcsLog) onFlushTimer() {
+func (gl *GCSLog) onFlushTimer() {
 	log.Printf("flush timer fired.")
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
 	defer cancel()
 	gl.flush(ctx)
 }
 
-func (gl *gcsLog) flush(ctx context.Context) error {
+func (gl *GCSLog) flush(ctx context.Context) error {
 	gl.mu.Lock()
 
 	defer gl.mu.Unlock()
@@ -397,7 +403,7 @@
 	return nil
 }
 
-func (gl *gcsLog) flushLocked(ctx context.Context) error {
+func (gl *GCSLog) flushLocked(ctx context.Context) error {
 	buf := gl.logBuf.Bytes()
 	if len(buf) == 0 {
 		return nil
@@ -443,7 +449,7 @@
 	return nil
 }
 
-func (gl *gcsLog) deleteOldSegment(ctx context.Context, objName string) {
+func (gl *GCSLog) deleteOldSegment(ctx context.Context, objName string) {
 	err := gl.bucket.Object(objName).Delete(ctx)
 	if err != nil {
 		// Can ignore, though. Probably emphemeral, and not critical.
@@ -454,7 +460,7 @@
 	}
 }
 
-func (gl *gcsLog) objectNames() (names []string) {
+func (gl *GCSLog) objectNames() (names []string) {
 	gl.mu.Lock()
 	defer gl.mu.Unlock()
 	for _, seg := range gl.seg {
@@ -464,7 +470,7 @@
 	return
 }
 
-func (gl *gcsLog) foreachSegmentReader(ctx context.Context, fn func(r io.Reader) error) error {
+func (gl *GCSLog) foreachSegmentReader(ctx context.Context, fn func(r io.Reader) error) error {
 	objs := gl.objectNames()
 	for i, obj := range objs {
 		log.Printf("Reading %d/%d: %s ...", i+1, len(objs), obj)
@@ -481,7 +487,10 @@
 	return nil
 }
 
-func (gl *gcsLog) GetMutations(ctx context.Context) <-chan maintner.MutationStreamEvent {
+// GetMutations returns a channel of mutations or related events.
+// The channel will never be closed.
+// All sends on the returned channel should select on the provided context.
+func (gl *GCSLog) GetMutations(ctx context.Context) <-chan maintner.MutationStreamEvent {
 	ch := make(chan maintner.MutationStreamEvent, 50) // buffered: overlap gunzip/unmarshal with loading
 	go func() {
 		err := gl.foreachSegmentReader(ctx, func(r io.Reader) error {
@@ -524,9 +533,8 @@
 	return err
 }
 
-// copyFrom is only used for the one-time migrate from disk-to-GCS
-// code path.
-func (gl *gcsLog) copyFrom(src maintner.MutationSource) error {
+// CopyFrom is only used for the one-time migrate from disk-to-GCS code path.
+func (gl *GCSLog) CopyFrom(src maintner.MutationSource) error {
 	gl.curNum = 0
 	ctx := context.Background()
 	for e := range src.GetMutations(ctx) {
@@ -546,3 +554,9 @@
 	}
 	panic("unexpected channel close")
 }
+
+// RegisterHandlers adds handlers for the default paths (/logs and /logs/).
+func (gl *GCSLog) RegisterHandlers(mux *http.ServeMux) {
+	mux.HandleFunc("/logs", gl.serveJSONLogsIndex)
+	mux.HandleFunc("/logs/", gl.serveLogFile)
+}
diff --git a/maintner/maintnerd/gcslog_test.go b/maintner/maintnerd/gcslog/gcslog_test.go
similarity index 98%
rename from maintner/maintnerd/gcslog_test.go
rename to maintner/maintnerd/gcslog/gcslog_test.go
index 209ca61..fa85fd3 100644
--- a/maintner/maintnerd/gcslog_test.go
+++ b/maintner/maintnerd/gcslog/gcslog_test.go
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-package main
+package gcslog
 
 import (
 	"context"
diff --git a/maintner/maintnerd/api.go b/maintner/maintnerd/maintapi/api.go
similarity index 94%
rename from maintner/maintnerd/api.go
rename to maintner/maintnerd/maintapi/api.go
index 2818bbe..7250961 100644
--- a/maintner/maintnerd/api.go
+++ b/maintner/maintnerd/maintapi/api.go
@@ -2,7 +2,8 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-package main
+// Package maintapi exposes a gRPC maintner service for a given corpus.
+package maintapi
 
 import (
 	"context"
@@ -18,6 +19,11 @@
 	"golang.org/x/build/maintner/maintnerd/apipb"
 )
 
+// NewAPIService creates a gRPC Server that serves the Maintner API for the given corpus.
+func NewAPIService(corpus *maintner.Corpus) apipb.MaintnerServiceServer {
+	return apiService{corpus}
+}
+
 // apiService implements apipb.MaintnerServiceServer using the Corpus c.
 type apiService struct {
 	c *maintner.Corpus
diff --git a/maintner/maintnerd/api_test.go b/maintner/maintnerd/maintapi/api_test.go
similarity index 99%
rename from maintner/maintnerd/api_test.go
rename to maintner/maintnerd/maintapi/api_test.go
index bd6b12d..5d0ba01 100644
--- a/maintner/maintnerd/api_test.go
+++ b/maintner/maintnerd/maintapi/api_test.go
@@ -2,7 +2,7 @@
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
 
-package main
+package maintapi
 
 import (
 	"context"
diff --git a/maintner/maintnerd/maintnerd.go b/maintner/maintnerd/maintnerd.go
index bdc293c..c9492c0 100644
--- a/maintner/maintnerd/maintnerd.go
+++ b/maintner/maintnerd/maintnerd.go
@@ -33,6 +33,8 @@
 	"golang.org/x/build/maintner"
 	"golang.org/x/build/maintner/godata"
 	"golang.org/x/build/maintner/maintnerd/apipb"
+	"golang.org/x/build/maintner/maintnerd/gcslog"
+	"golang.org/x/build/maintner/maintnerd/maintapi"
 	"golang.org/x/crypto/acme/autocert"
 	"golang.org/x/net/http2"
 	"golang.org/x/time/rate"
@@ -155,15 +157,14 @@
 	if *genMut {
 		if *bucket != "" {
 			ctx := context.Background()
-			gl, err := newGCSLog(ctx, *bucket)
+			gl, err := gcslog.NewGCSLog(ctx, *bucket)
 			if err != nil {
 				log.Fatalf("newGCSLog: %v", err)
 			}
-			http.HandleFunc("/logs", gl.serveJSONLogsIndex)
-			http.HandleFunc("/logs/", gl.serveLogFile)
+			gl.RegisterHandlers(http.DefaultServeMux)
 			if *migrateGCSFlag {
 				diskLog := maintner.NewDiskMutationLogger(*dataDir)
-				if err := gl.copyFrom(diskLog); err != nil {
+				if err := gl.CopyFrom(diskLog); err != nil {
 					log.Fatalf("migrate: %v", err)
 				}
 				log.Printf("Success.")
@@ -252,7 +253,7 @@
 	}
 
 	grpcServer := grpc.NewServer()
-	apipb.RegisterMaintnerServiceServer(grpcServer, apiService{corpus})
+	apipb.RegisterMaintnerServiceServer(grpcServer, maintapi.NewAPIService(corpus))
 	http.Handle("/apipb.MaintnerService/", grpcServer)
 
 	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {