blob: e18a5c65601ea0873c74ced66fc76d043f661b61 [file] [log] [blame]
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Reindex repopulates the perfdata SQL database from the original data files in Google Cloud Storage.
//
// Usage:
//
// reindex [-v] [-db foo.bar/baz] [-bucket name] prefix...
//
// Reindex reindexes all the uploads with IDs starting with the given prefixes.
package main
import (
"context"
"flag"
"fmt"
"io"
"log"
"os"
"strings"
"cloud.google.com/go/storage"
_ "github.com/GoogleCloudPlatform/cloudsql-proxy/proxy/dialers/mysql"
"golang.org/x/perf/storage/benchfmt"
"golang.org/x/perf/storage/db"
"google.golang.org/api/iterator"
)
// Command-line flags.
var (
// dbName is the data source name handed to db.OpenSQL for the "mysql" driver.
dbName = flag.String("db", "root:@cloudsql(golang-org:us-central1:golang-org)/perfdata?interpolateParams=true", "connect to MySQL `database`")
// bucket is the name of the Google Cloud Storage bucket whose objects are listed and read.
bucket = flag.String("bucket", "golang-perfdata", "read from Google Cloud Storage `bucket`")
// verbose switches the logger to timestamped output and logs each upload as it is reindexed.
verbose = flag.Bool("v", false, "verbose")
)
// usage writes the command synopsis to standard error, prints the flag
// defaults via flag.PrintDefaults, and exits with status 2. It is installed
// as flag.Usage by main.
func usage() {
	const synopsis = `Usage of reindex:
reindex [flags] prefix...
`
	fmt.Fprint(os.Stderr, synopsis)
	flag.PrintDefaults()
	os.Exit(2)
}
// main parses the command line, opens the SQL database and a Google Cloud
// Storage client, and reindexes every upload whose ID starts with one of the
// prefixes given as arguments.
//
// For each prefix it lists the objects named "uploads/<prefix>...", groups
// them by upload ID (the path component following "uploads/"), and calls
// reindex once per upload with that upload's object names. Any error is
// fatal.
func main() {
	log.SetPrefix("reindex: ")
	log.SetFlags(0)
	flag.Usage = usage
	flag.Parse()
	if *verbose {
		log.SetFlags(log.Ldate | log.Ltime | log.Lmicroseconds)
	}

	ctx := context.Background()

	prefixes := flag.Args()
	if len(prefixes) == 0 {
		log.Fatal("no prefixes to reindex")
	}

	d, err := db.OpenSQL("mysql", *dbName)
	if err != nil {
		log.Fatal(err)
	}
	defer d.Close()

	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	// Named bkt rather than bucket so the package-level flag is not shadowed.
	bkt := client.Bucket(*bucket)

	for _, prefix := range prefixes {
		if strings.Contains(prefix, "/") {
			log.Fatalf("prefix %q cannot contain /", prefix)
		}

		it := bkt.Objects(ctx, &storage.Query{Prefix: "uploads/" + prefix})

		// files accumulates the object names of the upload currently being
		// collected; it is flushed to reindex whenever the upload ID changes.
		// This relies on the listing returning all objects of one upload
		// consecutively.
		var lastUploadID string
		var files []string
		for {
			objAttrs, err := it.Next()
			if err == iterator.Done {
				break
			}
			if err != nil {
				log.Fatal(err)
			}

			name := strings.TrimPrefix(objAttrs.Name, "uploads/")
			slash := strings.Index(name, "/")
			if slash < 0 {
				// The object is not inside an upload directory. Skip it;
				// slicing name[:slash] with slash == -1 would panic.
				log.Printf("ignoring file %q", objAttrs.Name)
				continue
			}

			uploadID := name[:slash]
			if lastUploadID != "" && uploadID != lastUploadID {
				if err := reindex(ctx, d, bkt, lastUploadID, files); err != nil {
					log.Fatal(err)
				}
				files = nil
			}
			files = append(files, objAttrs.Name)
			lastUploadID = uploadID
		}

		// Flush the final upload of this prefix.
		if len(files) > 0 {
			if err := reindex(ctx, d, bkt, lastUploadID, files); err != nil {
				log.Fatal(err)
			}
		}
	}
}
// reindex replaces the database contents of the upload identified by uploadID
// with the records read from the named objects in bucket.
//
// It starts a replacement upload with db.ReplaceUpload, feeds each entry of
// files through reindexOne in order, and returns the result of Commit. The
// first error encountered is returned immediately, without calling Commit.
// When the -v flag is set, the upload ID is logged before work begins.
func reindex(ctx context.Context, db *db.DB, bucket *storage.BucketHandle, uploadID string, files []string) error {
	if *verbose {
		log.Printf("reindexing %q", uploadID)
	}

	upload, err := db.ReplaceUpload(uploadID)
	if err != nil {
		return err
	}

	for i := range files {
		err = reindexOne(ctx, upload, bucket, files[i])
		if err != nil {
			return err
		}
	}

	return upload.Commit()
}
// reindexOne opens the object called name in bucket, reads it with a
// benchfmt reader, and passes every result to u.InsertRecord.
//
// It returns nil once the reader reports io.EOF, and otherwise the first
// error from opening the object, reading a result, or inserting a record.
// The object reader is closed before returning.
func reindexOne(ctx context.Context, u *db.Upload, bucket *storage.BucketHandle, name string) error {
	obj, err := bucket.Object(name).NewReader(ctx)
	if err != nil {
		return err
	}
	defer obj.Close()

	results := benchfmt.NewReader(obj)
	for {
		rec, err := results.Next()
		switch {
		case err == io.EOF:
			// End of the object: everything was inserted.
			return nil
		case err != nil:
			return err
		}

		if err := u.InsertRecord(rec); err != nil {
			return err
		}
	}
}