storage/db: batch INSERT statements across records
The MySQL protocol requires 1-3 synchronous round-trips for every
INSERT statement; to reduce the overhead, we now batch up 900 label
INSERT statements at a time. This makes a massive difference:
TestQuery previously ran in 108s; with this change, it now runs in 5s.
We were also affected by golang/go#15606; since we now generate a new INSERT
statement for every record, we are sidestepping that issue.
Change-Id: Id7a56c18c0978470542135894a2f2bcf6f7c9dd1
Reviewed-on: https://go-review.googlesource.com/35266
Reviewed-by: Russ Cox <rsc@golang.org>
diff --git a/storage/db/db.go b/storage/db/db.go
index 2c99f3c..385dc09 100644
--- a/storage/db/db.go
+++ b/storage/db/db.go
@@ -32,7 +32,6 @@
// prepared statements
lastUpload *sql.Stmt
insertUpload *sql.Stmt
- insertRecord *sql.Stmt
checkUpload *sql.Stmt
deleteRecords *sql.Stmt
}
@@ -141,10 +140,6 @@
if err != nil {
return err
}
- db.insertRecord, err = db.sql.Prepare("INSERT INTO Records(UploadID, RecordID, Content) VALUES (?, ?, ?)")
- if err != nil {
- return err
- }
db.checkUpload, err = db.sql.Prepare("SELECT 1 FROM Uploads WHERE UploadID = ?")
if err != nil {
return err
@@ -168,6 +163,10 @@
db *DB
// tx is the transaction used by the upload.
tx *sql.Tx
+
+ // pending arguments for flush
+ insertRecordArgs []interface{}
+ insertLabelArgs []interface{}
}
// now is a hook for testing
@@ -273,29 +272,73 @@
if err := benchfmt.NewPrinter(&buf).Print(r); err != nil {
return err
}
- if _, err := u.tx.Stmt(u.db.insertRecord).Exec(u.ID, u.recordid, buf.Bytes()); err != nil {
- return err
- }
- var args []interface{}
+ u.insertRecordArgs = append(u.insertRecordArgs, u.ID, u.recordid, buf.Bytes())
for _, k := range r.Labels.Keys() {
- args = append(args, u.ID, u.recordid, k, r.Labels[k])
+ if err := u.insertLabel(k, r.Labels[k]); err != nil {
+ return err
+ }
}
for _, k := range r.NameLabels.Keys() {
- args = append(args, u.ID, u.recordid, k, r.NameLabels[k])
- }
- if len(args) > 0 {
- query := "INSERT INTO RecordLabels VALUES " + strings.Repeat("(?, ?, ?, ?), ", len(args)/4)
- query = strings.TrimSuffix(query, ", ")
- if _, err := u.tx.Exec(query, args...); err != nil {
+ if err := u.insertLabel(k, r.NameLabels[k]); err != nil {
return err
}
}
u.recordid++
+
+ return nil
+}
+
+// insertLabel queues a label pair for insertion.
+// If there are enough labels queued, flush is called.
+func (u *Upload) insertLabel(key, value string) error {
+ // N.B. sqlite3 has a max of 999 arguments.
+ // https://www.sqlite.org/limits.html#max_variable_number
+ if len(u.insertLabelArgs) >= 990 {
+ if err := u.flush(); err != nil {
+ return err
+ }
+ }
+ u.insertLabelArgs = append(u.insertLabelArgs, u.ID, u.recordid, key, value)
+ return nil
+}
+
+// repeatDelim returns a string consisting of n copies of s with delim between each copy.
+func repeatDelim(s, delim string, n int) string {
+ return strings.TrimSuffix(strings.Repeat(s+delim, n), delim)
+}
+
+// insertMultiple executes a single INSERT statement to insert multiple rows.
+func insertMultiple(tx *sql.Tx, sqlPrefix string, argsPerRow int, args []interface{}) error {
+ if len(args) == 0 {
+ return nil
+ }
+ query := sqlPrefix + repeatDelim("("+repeatDelim("?", ", ", argsPerRow)+")", ", ", len(args)/argsPerRow)
+ _, err := tx.Exec(query, args...)
+ return err
+}
+
+// flush sends INSERT statements for any pending data in u.insertRecordArgs and u.insertLabelArgs.
+func (u *Upload) flush() error {
+ if n := len(u.insertRecordArgs); n > 0 {
+ if err := insertMultiple(u.tx, "INSERT INTO Records(UploadID, RecordID, Content) VALUES ", 3, u.insertRecordArgs); err != nil {
+ return err
+ }
+ u.insertRecordArgs = nil
+ }
+ if n := len(u.insertLabelArgs); n > 0 {
+ if err := insertMultiple(u.tx, "INSERT INTO RecordLabels VALUES ", 4, u.insertLabelArgs); err != nil {
+ return err
+ }
+ u.insertLabelArgs = nil
+ }
return nil
}
// Commit finishes processing the upload.
func (u *Upload) Commit() error {
+ if err := u.flush(); err != nil {
+ return err
+ }
return u.tx.Commit()
}
@@ -465,11 +508,10 @@
// Close closes the database connections, releasing any open resources.
func (db *DB) Close() error {
- if err := db.insertUpload.Close(); err != nil {
- return err
- }
- if err := db.insertRecord.Close(); err != nil {
- return err
+ for _, stmt := range []*sql.Stmt{db.lastUpload, db.insertUpload, db.checkUpload, db.deleteRecords} {
+ if err := stmt.Close(); err != nil {
+ return err
+ }
}
return db.sql.Close()
}