blob: d5674ebf0dfd8e8b53a9f6783c5ca8b2c14c5c25 [file] [log] [blame]
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package db provides the high-level database interface for the
// storage app.
package db
import (
"bytes"
"database/sql"
"fmt"
"io"
"regexp"
"sort"
"strconv"
"strings"
"text/template"
"time"
"golang.org/x/net/context"
"golang.org/x/perf/storage"
"golang.org/x/perf/storage/benchfmt"
"golang.org/x/perf/storage/query"
)
// TODO(quentin): Add Context to every function when App Engine supports Go >=1.8.
// DB is a high-level interface to a database for the storage
// app. It's safe for concurrent use by multiple goroutines.
type DB struct {
sql *sql.DB // underlying database connection
driverName string // name of underlying driver for SQL differences
// prepared statements
lastUpload *sql.Stmt
insertUpload *sql.Stmt
checkUpload *sql.Stmt
deleteRecords *sql.Stmt
}
// OpenSQL creates a DB backed by a SQL database. The parameters are
// the same as the parameters for sql.Open. Only mysql and sqlite3 are
// explicitly supported; other database engines will receive MySQL
// query syntax which may or may not be compatible.
func OpenSQL(driverName, dataSourceName string) (*DB, error) {
db, err := sql.Open(driverName, dataSourceName)
if err != nil {
return nil, err
}
if hook := openHooks[driverName]; hook != nil {
if err := hook(db); err != nil {
return nil, err
}
}
d := &DB{sql: db, driverName: driverName}
if err := d.createTables(driverName); err != nil {
return nil, err
}
if err := d.prepareStatements(driverName); err != nil {
return nil, err
}
return d, nil
}
var openHooks = make(map[string]func(*sql.DB) error)
// RegisterOpenHook registers a hook to be called after opening a connection to driverName.
// This is used by the sqlite3 package to register a ConnectHook.
// It must be called from an init function.
func RegisterOpenHook(driverName string, hook func(*sql.DB) error) {
openHooks[driverName] = hook
}
// createTmpl is the template used to prepare the CREATE statements
// for the database. It is evaluated with . as a map containing one
// entry whose key is the driver name.
var createTmpl = template.Must(template.New("create").Parse(`
CREATE TABLE IF NOT EXISTS Uploads (
UploadID VARCHAR(20) PRIMARY KEY,
Day VARCHAR(8),
Seq BIGINT UNSIGNED
{{if not .sqlite3}}
, Index (Day, Seq)
{{end}}
);
{{if .sqlite3}}
CREATE INDEX IF NOT EXISTS UploadDaySeq ON Uploads(Day, Seq);
{{end}}
CREATE TABLE IF NOT EXISTS Records (
UploadID VARCHAR(20) NOT NULL,
RecordID BIGINT UNSIGNED NOT NULL,
Content BLOB NOT NULL,
PRIMARY KEY (UploadID, RecordID),
FOREIGN KEY (UploadID) REFERENCES Uploads(UploadID) ON UPDATE CASCADE ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS RecordLabels (
UploadID VARCHAR(20) NOT NULL,
RecordID BIGINT UNSIGNED NOT NULL,
Name VARCHAR(255) NOT NULL,
Value VARCHAR(8192) NOT NULL,
{{if not .sqlite3}}
Index (Name(100), Value(100)),
{{end}}
PRIMARY KEY (UploadID, RecordID, Name),
FOREIGN KEY (UploadID, RecordID) REFERENCES Records(UploadID, RecordID) ON UPDATE CASCADE ON DELETE CASCADE
);
{{if .sqlite3}}
CREATE INDEX IF NOT EXISTS RecordLabelsNameValue ON RecordLabels(Name, Value);
{{end}}
`))
// createTables creates any missing tables on the connection in
// db.sql. driverName is the same driver name passed to sql.Open and
// is used to select the correct syntax.
func (db *DB) createTables(driverName string) error {
var buf bytes.Buffer
if err := createTmpl.Execute(&buf, map[string]bool{driverName: true}); err != nil {
return err
}
for _, q := range strings.Split(buf.String(), ";") {
if strings.TrimSpace(q) == "" {
continue
}
if _, err := db.sql.Exec(q); err != nil {
return fmt.Errorf("create table: %v", err)
}
}
return nil
}
// prepareStatements calls db.sql.Prepare on reusable SQL statements.
func (db *DB) prepareStatements(driverName string) error {
var err error
query := "SELECT UploadID FROM Uploads ORDER BY Day DESC, Seq DESC LIMIT 1"
if driverName != "sqlite3" {
query += " FOR UPDATE"
}
db.lastUpload, err = db.sql.Prepare(query)
if err != nil {
return err
}
db.insertUpload, err = db.sql.Prepare("INSERT INTO Uploads(UploadID, Day, Seq) VALUES (?, ?, ?)")
if err != nil {
return err
}
db.checkUpload, err = db.sql.Prepare("SELECT 1 FROM Uploads WHERE UploadID = ?")
if err != nil {
return err
}
db.deleteRecords, err = db.sql.Prepare("DELETE FROM Records WHERE UploadID = ?")
if err != nil {
return err
}
return nil
}
// An Upload is a collection of files that share an upload ID.
type Upload struct {
// ID is the value of the "upload" key that should be
// associated with every record in this upload.
ID string
// recordid is the index of the next record to insert.
recordid int64
// db is the underlying database that this upload is going to.
db *DB
// tx is the transaction used by the upload.
tx *sql.Tx
// pending arguments for flush
insertRecordArgs []interface{}
insertLabelArgs []interface{}
lastResult *benchfmt.Result
}
// now is a hook for testing
var now = time.Now
// ReplaceUpload removes the records associated with id if any and
// allows insertion of new records.
func (db *DB) ReplaceUpload(id string) (*Upload, error) {
if _, err := db.deleteRecords.Exec(id); err != nil {
return nil, err
}
var found bool
err := db.checkUpload.QueryRow(id).Scan(&found)
switch err {
case sql.ErrNoRows:
var day sql.NullString
var num sql.NullInt64
if m := regexp.MustCompile(`^(\d+)\.(\d+)$`).FindStringSubmatch(id); m != nil {
day.Valid, num.Valid = true, true
day.String = m[1]
num.Int64, _ = strconv.ParseInt(m[2], 10, 64)
}
if _, err := db.insertUpload.Exec(id, day, num); err != nil {
return nil, err
}
case nil:
default:
return nil, err
}
tx, err := db.sql.Begin()
if err != nil {
return nil, err
}
u := &Upload{
ID: id,
db: db,
tx: tx,
}
return u, nil
}
// NewUpload returns an upload for storing new files.
// All records written to the Upload will have the same upload ID.
func (db *DB) NewUpload(ctx context.Context) (*Upload, error) {
day := now().UTC().Format("20060102")
num := 0
tx, err := db.sql.Begin()
if err != nil {
return nil, err
}
defer func() {
if tx != nil {
tx.Rollback()
}
}()
var lastID string
err = tx.Stmt(db.lastUpload).QueryRow().Scan(&lastID)
switch err {
case sql.ErrNoRows:
case nil:
if strings.HasPrefix(lastID, day) {
num, err = strconv.Atoi(lastID[len(day)+1:])
if err != nil {
return nil, err
}
}
default:
return nil, err
}
num++
id := fmt.Sprintf("%s.%d", day, num)
_, err = tx.Stmt(db.insertUpload).Exec(id, day, num)
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
tx = nil
utx, err := db.sql.Begin()
if err != nil {
return nil, err
}
u := &Upload{
ID: id,
db: db,
tx: utx,
}
return u, nil
}
// InsertRecord inserts a single record in an existing upload.
// If InsertRecord returns a non-nil error, the Upload has failed and u.Abort() must be called.
func (u *Upload) InsertRecord(r *benchfmt.Result) error {
if u.lastResult != nil && u.lastResult.SameLabels(r) {
data := u.insertRecordArgs[len(u.insertRecordArgs)-1].([]byte)
data = append(data, r.Content...)
data = append(data, '\n')
u.insertRecordArgs[len(u.insertRecordArgs)-1] = data
return nil
}
// TODO(quentin): Support multiple lines (slice of results?)
var buf bytes.Buffer
if err := benchfmt.NewPrinter(&buf).Print(r); err != nil {
return err
}
u.lastResult = r
u.insertRecordArgs = append(u.insertRecordArgs, u.ID, u.recordid, buf.Bytes())
for _, k := range r.Labels.Keys() {
if err := u.insertLabel(k, r.Labels[k]); err != nil {
return err
}
}
for _, k := range r.NameLabels.Keys() {
if err := u.insertLabel(k, r.NameLabels[k]); err != nil {
return err
}
}
u.recordid++
return nil
}
// insertLabel queues a label pair for insertion.
// If there are enough labels queued, flush is called.
func (u *Upload) insertLabel(key, value string) error {
// N.B. sqlite3 has a max of 999 arguments.
// https://www.sqlite.org/limits.html#max_variable_number
if len(u.insertLabelArgs) >= 990 {
if err := u.flush(); err != nil {
return err
}
}
u.insertLabelArgs = append(u.insertLabelArgs, u.ID, u.recordid, key, value)
return nil
}
// repeatDelim returns a string consisting of n copies of s with delim between each copy.
func repeatDelim(s, delim string, n int) string {
return strings.TrimSuffix(strings.Repeat(s+delim, n), delim)
}
// insertMultiple executes a single INSERT statement to insert multiple rows.
func insertMultiple(tx *sql.Tx, sqlPrefix string, argsPerRow int, args []interface{}) error {
if len(args) == 0 {
return nil
}
query := sqlPrefix + repeatDelim("("+repeatDelim("?", ", ", argsPerRow)+")", ", ", len(args)/argsPerRow)
_, err := tx.Exec(query, args...)
return err
}
// flush sends INSERT statements for any pending data in u.insertRecordArgs and u.insertLabelArgs.
func (u *Upload) flush() error {
if n := len(u.insertRecordArgs); n > 0 {
if err := insertMultiple(u.tx, "INSERT INTO Records(UploadID, RecordID, Content) VALUES ", 3, u.insertRecordArgs); err != nil {
return err
}
u.insertRecordArgs = nil
}
if n := len(u.insertLabelArgs); n > 0 {
if err := insertMultiple(u.tx, "INSERT INTO RecordLabels VALUES ", 4, u.insertLabelArgs); err != nil {
return err
}
u.insertLabelArgs = nil
}
u.lastResult = nil
return nil
}
// Commit finishes processing the upload.
func (u *Upload) Commit() error {
if err := u.flush(); err != nil {
return err
}
return u.tx.Commit()
}
// Abort cleans up resources associated with the upload.
// It does not attempt to clean up partial database state.
func (u *Upload) Abort() error {
return u.tx.Rollback()
}
// parseQuery parses a query into a slice of SQL subselects and a slice of arguments.
// The subselects must be joined with INNER JOIN in the order returned.
func parseQuery(q string) (sql []string, args []interface{}, err error) {
var keys []string
parts := make(map[string]part)
for _, word := range query.SplitWords(q) {
p, err := parseWord(word)
if err != nil {
return nil, nil, err
}
if _, ok := parts[p.key]; ok {
parts[p.key], err = parts[p.key].merge(p)
if err != nil {
return nil, nil, err
}
} else {
keys = append(keys, p.key)
parts[p.key] = p
}
}
// Process each key
sort.Strings(keys)
for _, key := range keys {
s, a, err := parts[key].sql()
if err != nil {
return nil, nil, err
}
sql = append(sql, s)
args = append(args, a...)
}
return
}
// Query searches for results matching the given query string.
//
// The query string is first parsed into quoted words (as in the shell)
// and then each word must be formatted as one of the following:
// key:value - exact match on label "key" = "value"
// key>value - value greater than (useful for dates)
// key<value - value less than (also useful for dates)
func (db *DB) Query(q string) *Query {
ret := &Query{q: q}
query := "SELECT r.Content FROM "
sql, args, err := parseQuery(q)
if err != nil {
ret.err = err
return ret
}
for i, part := range sql {
if i > 0 {
query += " INNER JOIN "
}
query += fmt.Sprintf("(%s) t%d", part, i)
if i > 0 {
query += " USING (UploadID, RecordID)"
}
}
if len(sql) > 0 {
query += " LEFT JOIN"
}
query += " Records r"
if len(sql) > 0 {
query += " USING (UploadID, RecordID)"
}
ret.sqlQuery, ret.sqlArgs = query, args
ret.rows, ret.err = db.sql.Query(query, args...)
return ret
}
// Query is the result of a query.
// Use Next to advance through the rows, making sure to call Close when done:
//
// q := db.Query("key:value")
// defer q.Close()
// for q.Next() {
// res := q.Result()
// ...
// }
// err = q.Err() // get any error encountered during iteration
// ...
type Query struct {
rows *sql.Rows
// for Debug
q string
sqlQuery string
sqlArgs []interface{}
// from last call to Next
br *benchfmt.Reader
err error
}
// Debug returns the human-readable state of the query.
func (q *Query) Debug() string {
ret := fmt.Sprintf("q=%q", q.q)
if q.sqlQuery != "" || len(q.sqlArgs) > 0 {
ret += fmt.Sprintf(" sql={%q %#v}", q.sqlQuery, q.sqlArgs)
}
if q.err != nil {
ret += fmt.Sprintf(" err=%v", q.err)
}
return ret
}
// Next prepares the next result for reading with the Result
// method. It returns false when there are no more results, either by
// reaching the end of the input or an error.
func (q *Query) Next() bool {
if q.err != nil {
return false
}
if q.br != nil {
if q.br.Next() {
return true
}
q.err = q.br.Err()
if q.err != nil {
return false
}
}
if !q.rows.Next() {
return false
}
var content []byte
q.err = q.rows.Scan(&content)
if q.err != nil {
return false
}
q.br = benchfmt.NewReader(bytes.NewReader(content))
if !q.br.Next() {
q.err = q.br.Err()
if q.err == nil {
q.err = io.ErrUnexpectedEOF
}
return false
}
return q.err == nil
}
// Result returns the most recent result generated by a call to Next.
func (q *Query) Result() *benchfmt.Result {
return q.br.Result()
}
// Err returns the error state of the query.
func (q *Query) Err() error {
if q.err == io.EOF {
return nil
}
return q.err
}
// Close frees resources associated with the query.
func (q *Query) Close() error {
if q.rows != nil {
return q.rows.Close()
}
return q.Err()
}
// CountUploads returns the number of uploads in the database.
func (db *DB) CountUploads() (int, error) {
var uploads int
err := db.sql.QueryRow("SELECT COUNT(*) FROM Uploads").Scan(&uploads)
return uploads, err
}
// Close closes the database connections, releasing any open resources.
func (db *DB) Close() error {
for _, stmt := range []*sql.Stmt{db.lastUpload, db.insertUpload, db.checkUpload, db.deleteRecords} {
if err := stmt.Close(); err != nil {
return err
}
}
return db.sql.Close()
}
// UploadList is the result of ListUploads.
// Use Next to advance through the rows, making sure to call Close when done:
//
// q := db.ListUploads("key:value")
// defer q.Close()
// for q.Next() {
// info := q.Info()
// ...
// }
// err = q.Err() // get any error encountered during iteration
// ...
type UploadList struct {
rows *sql.Rows
extraLabels []string
// for Debug
q string
sqlQuery string
sqlArgs []interface{}
// from last call to Next
count int
uploadID string
labelValues []sql.NullString
err error
}
// Debug returns the human-readable state of ul.
func (ul *UploadList) Debug() string {
ret := fmt.Sprintf("q=%q", ul.q)
if ul.sqlQuery != "" || len(ul.sqlArgs) > 0 {
ret += fmt.Sprintf(" sql={%q %#v}", ul.sqlQuery, ul.sqlArgs)
}
if ul.err != nil {
ret += fmt.Sprintf(" err=%v", ul.err)
}
return ret
}
// ListUploads searches for uploads containing results matching the given query string.
// The query may be empty, in which case all uploads will be returned.
// For each label in extraLabels, one unspecified record's value will be obtained for each upload.
// If limit is non-zero, only the limit most recent uploads will be returned.
func (db *DB) ListUploads(q string, extraLabels []string, limit int) *UploadList {
ret := &UploadList{q: q, extraLabels: extraLabels}
var args []interface{}
query := "SELECT j.UploadID, rCount"
for i, label := range extraLabels {
query += fmt.Sprintf(", (SELECT l%d.Value FROM RecordLabels l%d WHERE l%d.UploadID = j.UploadID AND Name = ? LIMIT 1)", i, i, i)
args = append(args, label)
}
sql, qArgs, err := parseQuery(q)
if err != nil {
ret.err = err
return ret
}
if len(sql) == 0 {
// Optimize empty query.
query += " FROM (SELECT UploadID, (SELECT COUNT(*) FROM Records r WHERE r.UploadID = u.UploadID) AS rCount FROM Uploads u "
switch db.driverName {
case "sqlite3":
query += "WHERE"
default:
query += "HAVING"
}
query += " rCount > 0 ORDER BY u.Day DESC, u.Seq DESC, u.UploadID DESC"
if limit != 0 {
query += fmt.Sprintf(" LIMIT %d", limit)
}
query += ") j"
} else {
// Join individual queries.
query += " FROM (SELECT UploadID, COUNT(*) as rCount FROM "
args = append(args, qArgs...)
for i, part := range sql {
if i > 0 {
query += " INNER JOIN "
}
query += fmt.Sprintf("(%s) t%d", part, i)
if i > 0 {
query += " USING (UploadID, RecordID)"
}
}
query += " LEFT JOIN Records r USING (UploadID, RecordID)"
query += " GROUP BY UploadID) j LEFT JOIN Uploads u USING (UploadID) ORDER BY u.Day DESC, u.Seq DESC, u.UploadID DESC"
if limit != 0 {
query += fmt.Sprintf(" LIMIT %d", limit)
}
}
ret.sqlQuery, ret.sqlArgs = query, args
ret.rows, ret.err = db.sql.Query(query, args...)
return ret
}
// Next prepares the next result for reading with the Result
// method. It returns false when there are no more results, either by
// reaching the end of the input or an error.
func (ul *UploadList) Next() bool {
if ul.err != nil {
return false
}
if !ul.rows.Next() {
return false
}
args := []interface{}{&ul.uploadID, &ul.count}
ul.labelValues = make([]sql.NullString, len(ul.extraLabels))
for i := range ul.labelValues {
args = append(args, &ul.labelValues[i])
}
ul.err = ul.rows.Scan(args...)
if ul.err != nil {
return false
}
return ul.err == nil
}
// Info returns the most recent UploadInfo generated by a call to Next.
func (ul *UploadList) Info() storage.UploadInfo {
l := make(benchfmt.Labels)
for i := range ul.extraLabels {
if ul.labelValues[i].Valid {
l[ul.extraLabels[i]] = ul.labelValues[i].String
}
}
return storage.UploadInfo{UploadID: ul.uploadID, Count: ul.count, LabelValues: l}
}
// Err returns the error state of the query.
func (ul *UploadList) Err() error {
return ul.err
}
// Close frees resources associated with the query.
func (ul *UploadList) Close() error {
if ul.rows != nil {
return ul.rows.Close()
}
return ul.err
}