storage/db: batch INSERT statements across records

The MySQL protocol requires 1-3 synchronous round-trips for every
INSERT statement; to reduce the overhead, we now batch up 900 label
INSERT statments at a time. This makes a massive difference;
TestQuery previously ran in 108s; with this change, it now runs in 5s.

We were also affected by golang/go#15606; since we now generate a new INSERT
statement for every record, we are sidestepping that issue.

Change-Id: Id7a56c18c0978470542135894a2f2bcf6f7c9dd1
Reviewed-on: https://go-review.googlesource.com/35266
Reviewed-by: Russ Cox <rsc@golang.org>
diff --git a/storage/db/db.go b/storage/db/db.go
index 2c99f3c..385dc09 100644
--- a/storage/db/db.go
+++ b/storage/db/db.go
@@ -32,7 +32,6 @@
 	// prepared statements
 	lastUpload    *sql.Stmt
 	insertUpload  *sql.Stmt
-	insertRecord  *sql.Stmt
 	checkUpload   *sql.Stmt
 	deleteRecords *sql.Stmt
 }
@@ -141,10 +140,6 @@
 	if err != nil {
 		return err
 	}
-	db.insertRecord, err = db.sql.Prepare("INSERT INTO Records(UploadID, RecordID, Content) VALUES (?, ?, ?)")
-	if err != nil {
-		return err
-	}
 	db.checkUpload, err = db.sql.Prepare("SELECT 1 FROM Uploads WHERE UploadID = ?")
 	if err != nil {
 		return err
@@ -168,6 +163,10 @@
 	db *DB
 	// tx is the transaction used by the upload.
 	tx *sql.Tx
+
+	// pending arguments for flush
+	insertRecordArgs []interface{}
+	insertLabelArgs  []interface{}
 }
 
 // now is a hook for testing
@@ -273,29 +272,73 @@
 	if err := benchfmt.NewPrinter(&buf).Print(r); err != nil {
 		return err
 	}
-	if _, err := u.tx.Stmt(u.db.insertRecord).Exec(u.ID, u.recordid, buf.Bytes()); err != nil {
-		return err
-	}
-	var args []interface{}
+	u.insertRecordArgs = append(u.insertRecordArgs, u.ID, u.recordid, buf.Bytes())
 	for _, k := range r.Labels.Keys() {
-		args = append(args, u.ID, u.recordid, k, r.Labels[k])
+		if err := u.insertLabel(k, r.Labels[k]); err != nil {
+			return err
+		}
 	}
 	for _, k := range r.NameLabels.Keys() {
-		args = append(args, u.ID, u.recordid, k, r.NameLabels[k])
-	}
-	if len(args) > 0 {
-		query := "INSERT INTO RecordLabels VALUES " + strings.Repeat("(?, ?, ?, ?), ", len(args)/4)
-		query = strings.TrimSuffix(query, ", ")
-		if _, err := u.tx.Exec(query, args...); err != nil {
+		if err := u.insertLabel(k, r.NameLabels[k]); err != nil {
 			return err
 		}
 	}
 	u.recordid++
+
+	return nil
+}
+
+// insertLabel queues a label pair for insertion.
+// If there are enough labels queued, flush is called.
+func (u *Upload) insertLabel(key, value string) error {
+	// N.B. sqlite3 has a max of 999 arguments.
+	// https://www.sqlite.org/limits.html#max_variable_number
+	if len(u.insertLabelArgs) >= 990 {
+		if err := u.flush(); err != nil {
+			return err
+		}
+	}
+	u.insertLabelArgs = append(u.insertLabelArgs, u.ID, u.recordid, key, value)
+	return nil
+}
+
+// repeatDelim returns a string consisting of n copies of s with delim between each copy.
+func repeatDelim(s, delim string, n int) string {
+	return strings.TrimSuffix(strings.Repeat(s+delim, n), delim)
+}
+
+// insertMultiple executes a single INSERT statement to insert multiple rows.
+func insertMultiple(tx *sql.Tx, sqlPrefix string, argsPerRow int, args []interface{}) error {
+	if len(args) == 0 {
+		return nil
+	}
+	query := sqlPrefix + repeatDelim("("+repeatDelim("?", ", ", argsPerRow)+")", ", ", len(args)/argsPerRow)
+	_, err := tx.Exec(query, args...)
+	return err
+}
+
+// flush sends INSERT statements for any pending data in u.insertRecordArgs and u.insertLabelArgs.
+func (u *Upload) flush() error {
+	if n := len(u.insertRecordArgs); n > 0 {
+		if err := insertMultiple(u.tx, "INSERT INTO Records(UploadID, RecordID, Content) VALUES ", 3, u.insertRecordArgs); err != nil {
+			return err
+		}
+		u.insertRecordArgs = nil
+	}
+	if n := len(u.insertLabelArgs); n > 0 {
+		if err := insertMultiple(u.tx, "INSERT INTO RecordLabels VALUES ", 4, u.insertLabelArgs); err != nil {
+			return err
+		}
+		u.insertLabelArgs = nil
+	}
 	return nil
 }
 
 // Commit finishes processing the upload.
 func (u *Upload) Commit() error {
+	if err := u.flush(); err != nil {
+		return err
+	}
 	return u.tx.Commit()
 }
 
@@ -465,11 +508,10 @@
 
 // Close closes the database connections, releasing any open resources.
 func (db *DB) Close() error {
-	if err := db.insertUpload.Close(); err != nil {
-		return err
-	}
-	if err := db.insertRecord.Close(); err != nil {
-		return err
+	for _, stmt := range []*sql.Stmt{db.lastUpload, db.insertUpload, db.checkUpload, db.deleteRecords} {
+		if err := stmt.Close(); err != nil {
+			return err
+		}
 	}
 	return db.sql.Close()
 }