maintner: refactor gcslog and API into separate packages
Fixes golang/go#24786.
Change-Id: I008810a0394c75e7c790165308ff9ef872c77fdc
Reviewed-on: https://go-review.googlesource.com/105935
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/maintner/maintnerd/gcslog.go b/maintner/maintnerd/gcslog/gcslog.go
similarity index 85%
rename from maintner/maintnerd/gcslog.go
rename to maintner/maintnerd/gcslog/gcslog.go
index 28d3e79..6b404f0 100644
--- a/maintner/maintnerd/gcslog.go
+++ b/maintner/maintnerd/gcslog/gcslog.go
@@ -2,9 +2,8 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-// Implementation of maintner.MutationSource and Logger for Google Cloud Storage.
-
-package main
+// Package gcslog is an implementation of maintner.MutationSource and Logger for Google Cloud Storage.
+package gcslog
import (
"bytes"
@@ -39,7 +38,12 @@
const flushInterval = 10 * time.Minute
-type gcsLog struct {
+// GCSLog implements MutationLogger and MutationSource.
+var _ maintner.MutationLogger = &GCSLog{}
+var _ maintner.MutationSource = &GCSLog{}
+
+// GCSLog logs mutations to GCS.
+type GCSLog struct {
sc *storage.Client
bucketName string
bucket *storage.BucketHandle
@@ -70,15 +74,16 @@
// newGCSLogBase returns a new gcsLog instance without any association
// with Google Cloud Storage.
-func newGCSLogBase() *gcsLog {
- gl := &gcsLog{
+func newGCSLogBase() *GCSLog {
+ gl := &GCSLog{
seg: map[int]gcsLogSegment{},
}
gl.cond = sync.NewCond(&gl.mu)
return gl
}
-func newGCSLog(ctx context.Context, bucketName string) (*gcsLog, error) {
+// NewGCSLog creates a GCSLog that logs mutations to a given GCS bucket.
+func NewGCSLog(ctx context.Context, bucketName string) (*GCSLog, error) {
sc, err := storage.NewClient(ctx)
if err != nil {
return nil, fmt.Errorf("storage.NewClient: %v", err)
@@ -95,7 +100,7 @@
var objnameRx = regexp.MustCompile(`^(\d{4})\.([0-9a-f]{56})\.mutlog$`)
-func (gl *gcsLog) initLoad(ctx context.Context) error {
+func (gl *GCSLog) initLoad(ctx context.Context) error {
it := gl.bucket.Objects(ctx, nil)
maxNum := 0
for {
@@ -162,7 +167,7 @@
return nil
}
-func (gl *gcsLog) serveLogFile(w http.ResponseWriter, r *http.Request) {
+func (gl *GCSLog) serveLogFile(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" && r.Method != "HEAD" {
http.Error(w, "bad method", http.StatusBadRequest)
return
@@ -193,7 +198,7 @@
http.ServeContent(w, r, "", time.Time{}, strings.NewReader(content))
}
-func (gl *gcsLog) serveJSONLogsIndex(w http.ResponseWriter, r *http.Request) {
+func (gl *GCSLog) serveJSONLogsIndex(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" && r.Method != "HEAD" {
http.Error(w, "bad method", http.StatusBadRequest)
return
@@ -244,9 +249,9 @@
return sum
}
-// waitSizeNot blocks until the sum of gcsLog is not v, or the context expires.
+// waitSizeNot blocks until the sum of GCSLog is not v, or the context expires.
// It reports whether the size changed.
-func (gl *gcsLog) waitSizeNot(ctx context.Context, v int64) (changed bool) {
+func (gl *GCSLog) waitSizeNot(ctx context.Context, v int64) (changed bool) {
returned := make(chan struct{})
defer close(returned)
go gl.waitSizeNotAwaitContextOrChange(ctx, returned)
@@ -270,7 +275,7 @@
// It's a goroutine that selects on two channels and calls
// sync.Cond.Broadcast to wake up the waitSizeNot waiter if the
// context expires.
-func (gl *gcsLog) waitSizeNotAwaitContextOrChange(ctx context.Context, returned <-chan struct{}) {
+func (gl *GCSLog) waitSizeNotAwaitContextOrChange(ctx context.Context, returned <-chan struct{}) {
select {
case <-ctx.Done():
gl.cond.Broadcast()
@@ -279,7 +284,7 @@
}
}
-func (gl *gcsLog) sumSizeLocked() int64 {
+func (gl *GCSLog) sumSizeLocked() int64 {
var sum int64
for n, seg := range gl.seg {
if n != gl.curNum {
@@ -290,7 +295,7 @@
return sum
}
-func (gl *gcsLog) getJSONLogs(startSeg int) (segs []maintner.LogSegmentJSON) {
+func (gl *GCSLog) getJSONLogs(startSeg int) (segs []maintner.LogSegmentJSON) {
gl.mu.Lock()
defer gl.mu.Unlock()
if startSeg > gl.curNum || startSeg < 0 {
@@ -317,9 +322,9 @@
return
}
-// gcsLogWriter is the io.Writer used to write to gcsLog.logBuf. It
+// gcsLogWriter is the io.Writer used to write to GCSLog.logBuf. It
// keeps the sha224 in sync. Caller must hold gl.mu.
-type gcsLogWriter struct{ gl *gcsLog }
+type gcsLogWriter struct{ gl *GCSLog }
func (w gcsLogWriter) Write(p []byte) (n int, err error) {
gl := w.gl
@@ -337,7 +342,8 @@
return len(p), nil
}
-func (gl *gcsLog) Log(m *maintpb.Mutation) error {
+// Log writes m to GCS after the buffer is full or after a periodic flush.
+func (gl *GCSLog) Log(m *maintpb.Mutation) error {
data, err := proto.Marshal(m)
if err != nil {
return err
@@ -372,14 +378,14 @@
return nil
}
-func (gl *gcsLog) onFlushTimer() {
+func (gl *GCSLog) onFlushTimer() {
log.Printf("flush timer fired.")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
gl.flush(ctx)
}
-func (gl *gcsLog) flush(ctx context.Context) error {
+func (gl *GCSLog) flush(ctx context.Context) error {
gl.mu.Lock()
defer gl.mu.Unlock()
@@ -397,7 +403,7 @@
return nil
}
-func (gl *gcsLog) flushLocked(ctx context.Context) error {
+func (gl *GCSLog) flushLocked(ctx context.Context) error {
buf := gl.logBuf.Bytes()
if len(buf) == 0 {
return nil
@@ -443,7 +449,7 @@
return nil
}
-func (gl *gcsLog) deleteOldSegment(ctx context.Context, objName string) {
+func (gl *GCSLog) deleteOldSegment(ctx context.Context, objName string) {
err := gl.bucket.Object(objName).Delete(ctx)
if err != nil {
// Can ignore, though. Probably emphemeral, and not critical.
@@ -454,7 +460,7 @@
}
}
-func (gl *gcsLog) objectNames() (names []string) {
+func (gl *GCSLog) objectNames() (names []string) {
gl.mu.Lock()
defer gl.mu.Unlock()
for _, seg := range gl.seg {
@@ -464,7 +470,7 @@
return
}
-func (gl *gcsLog) foreachSegmentReader(ctx context.Context, fn func(r io.Reader) error) error {
+func (gl *GCSLog) foreachSegmentReader(ctx context.Context, fn func(r io.Reader) error) error {
objs := gl.objectNames()
for i, obj := range objs {
log.Printf("Reading %d/%d: %s ...", i+1, len(objs), obj)
@@ -481,7 +487,10 @@
return nil
}
-func (gl *gcsLog) GetMutations(ctx context.Context) <-chan maintner.MutationStreamEvent {
+// GetMutations returns a channel of mutations or related events.
+// The channel will never be closed.
+// All sends on the returned channel should select on the provided context.
+func (gl *GCSLog) GetMutations(ctx context.Context) <-chan maintner.MutationStreamEvent {
ch := make(chan maintner.MutationStreamEvent, 50) // buffered: overlap gunzip/unmarshal with loading
go func() {
err := gl.foreachSegmentReader(ctx, func(r io.Reader) error {
@@ -524,9 +533,8 @@
return err
}
-// copyFrom is only used for the one-time migrate from disk-to-GCS
-// code path.
-func (gl *gcsLog) copyFrom(src maintner.MutationSource) error {
+// CopyFrom is only used for the one-time migrate from disk-to-GCS code path.
+func (gl *GCSLog) CopyFrom(src maintner.MutationSource) error {
gl.curNum = 0
ctx := context.Background()
for e := range src.GetMutations(ctx) {
@@ -546,3 +554,9 @@
}
panic("unexpected channel close")
}
+
+// RegisterHandlers adds handlers for the default paths (/logs and /logs/).
+func (gl *GCSLog) RegisterHandlers(mux *http.ServeMux) {
+ mux.HandleFunc("/logs", gl.serveJSONLogsIndex)
+ mux.HandleFunc("/logs/", gl.serveLogFile)
+}
diff --git a/maintner/maintnerd/gcslog_test.go b/maintner/maintnerd/gcslog/gcslog_test.go
similarity index 98%
rename from maintner/maintnerd/gcslog_test.go
rename to maintner/maintnerd/gcslog/gcslog_test.go
index 209ca61..fa85fd3 100644
--- a/maintner/maintnerd/gcslog_test.go
+++ b/maintner/maintnerd/gcslog/gcslog_test.go
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package main
+package gcslog
import (
"context"
diff --git a/maintner/maintnerd/api.go b/maintner/maintnerd/maintapi/api.go
similarity index 94%
rename from maintner/maintnerd/api.go
rename to maintner/maintnerd/maintapi/api.go
index 2818bbe..7250961 100644
--- a/maintner/maintnerd/api.go
+++ b/maintner/maintnerd/maintapi/api.go
@@ -2,7 +2,8 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package main
+// Package maintapi exposes a gRPC maintner service for a given corpus.
+package maintapi
import (
"context"
@@ -18,6 +19,11 @@
"golang.org/x/build/maintner/maintnerd/apipb"
)
+// NewAPIService creates a gRPC Server that serves the Maintner API for the given corpus.
+func NewAPIService(corpus *maintner.Corpus) apipb.MaintnerServiceServer {
+ return apiService{corpus}
+}
+
// apiService implements apipb.MaintnerServiceServer using the Corpus c.
type apiService struct {
c *maintner.Corpus
diff --git a/maintner/maintnerd/api_test.go b/maintner/maintnerd/maintapi/api_test.go
similarity index 99%
rename from maintner/maintnerd/api_test.go
rename to maintner/maintnerd/maintapi/api_test.go
index bd6b12d..5d0ba01 100644
--- a/maintner/maintnerd/api_test.go
+++ b/maintner/maintnerd/maintapi/api_test.go
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package main
+package maintapi
import (
"context"
diff --git a/maintner/maintnerd/maintnerd.go b/maintner/maintnerd/maintnerd.go
index bdc293c..c9492c0 100644
--- a/maintner/maintnerd/maintnerd.go
+++ b/maintner/maintnerd/maintnerd.go
@@ -33,6 +33,8 @@
"golang.org/x/build/maintner"
"golang.org/x/build/maintner/godata"
"golang.org/x/build/maintner/maintnerd/apipb"
+ "golang.org/x/build/maintner/maintnerd/gcslog"
+ "golang.org/x/build/maintner/maintnerd/maintapi"
"golang.org/x/crypto/acme/autocert"
"golang.org/x/net/http2"
"golang.org/x/time/rate"
@@ -155,15 +157,14 @@
if *genMut {
if *bucket != "" {
ctx := context.Background()
- gl, err := newGCSLog(ctx, *bucket)
+ gl, err := gcslog.NewGCSLog(ctx, *bucket)
if err != nil {
log.Fatalf("newGCSLog: %v", err)
}
- http.HandleFunc("/logs", gl.serveJSONLogsIndex)
- http.HandleFunc("/logs/", gl.serveLogFile)
+ gl.RegisterHandlers(http.DefaultServeMux)
if *migrateGCSFlag {
diskLog := maintner.NewDiskMutationLogger(*dataDir)
- if err := gl.copyFrom(diskLog); err != nil {
+ if err := gl.CopyFrom(diskLog); err != nil {
log.Fatalf("migrate: %v", err)
}
log.Printf("Success.")
@@ -252,7 +253,7 @@
}
grpcServer := grpc.NewServer()
- apipb.RegisterMaintnerServiceServer(grpcServer, apiService{corpus})
+ apipb.RegisterMaintnerServiceServer(grpcServer, maintapi.NewAPIService(corpus))
http.Handle("/apipb.MaintnerService/", grpcServer)
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {