storage: reuse transaction for uploads

Reuse a single transaction for all INSERT statements for a single
upload. This cuts about 50% of the runtime off an invocation of
the tests with -cloud, and about 25% of the runtime off an /upload run
on App Engine.

Change-Id: I90068f60420cfb67ae693f18ed83b341c2e483bd
Reviewed-on: https://go-review.googlesource.com/35069
Reviewed-by: Russ Cox <rsc@golang.org>
diff --git a/storage/app/upload.go b/storage/app/upload.go
index 68a378f..adb2577 100644
--- a/storage/app/upload.go
+++ b/storage/app/upload.go
@@ -99,6 +99,11 @@
 			if err != nil {
 				return nil, err
 			}
+			defer func() {
+				if upload != nil {
+					upload.Abort()
+				}
+			}()
 		}
 
 		// The incoming file needs to be stored in Cloud
@@ -121,11 +126,20 @@
 		fileids = append(fileids, meta["fileid"])
 	}
 
+	if upload == nil {
+		return nil, errors.New("no files processed")
+	}
+	if err := upload.Commit(); err != nil {
+		return nil, err
+	}
+
 	status := &uploadStatus{UploadID: upload.ID, FileIDs: fileids}
 	if a.ViewURLBase != "" {
 		status.ViewURL = a.ViewURLBase + url.QueryEscape(upload.ID)
 	}
 
+	upload = nil
+
 	return status, nil
 }
 
diff --git a/storage/db/db.go b/storage/db/db.go
index 3c43810..0039ab8 100644
--- a/storage/db/db.go
+++ b/storage/db/db.go
@@ -145,12 +145,14 @@
 	recordid int64
 	// db is the underlying database that this upload is going to.
 	db *DB
+	// tx is the transaction used by the upload.
+	tx *sql.Tx
 }
 
 // NewUpload returns an upload for storing new files.
 // All records written to the Upload will have the same upload ID.
 func (db *DB) NewUpload(ctx context.Context) (*Upload, error) {
-	// TODO(quentin): Use a transaction?
+	// TODO(quentin): Use the same transaction as the rest of the upload?
 	res, err := db.insertUpload.Exec()
 	if err != nil {
 		return nil, err
@@ -160,33 +162,27 @@
 	if err != nil {
 		return nil, err
 	}
+	tx, err := db.sql.Begin()
+	if err != nil {
+		return nil, err
+	}
 	return &Upload{
 		ID: fmt.Sprint(i),
 		id: i,
 		db: db,
+		tx: tx,
 	}, nil
 }
 
 // InsertRecord inserts a single record in an existing upload.
-func (u *Upload) InsertRecord(r *benchfmt.Result) (err error) {
-	// TODO(quentin): Use a single transaction for the whole upload?
-	tx, err := u.db.sql.Begin()
-	if err != nil {
-		return err
-	}
-	defer func() {
-		if err != nil {
-			tx.Rollback()
-		} else {
-			err = tx.Commit()
-		}
-	}()
+// If InsertRecord returns a non-nil error, the Upload has failed and u.Abort() must be called.
+func (u *Upload) InsertRecord(r *benchfmt.Result) error {
 	// TODO(quentin): Support multiple lines (slice of results?)
 	var buf bytes.Buffer
 	if err := benchfmt.NewPrinter(&buf).Print(r); err != nil {
 		return err
 	}
-	if _, err = tx.Stmt(u.db.insertRecord).Exec(u.id, u.recordid, buf.Bytes()); err != nil {
+	if _, err := u.tx.Stmt(u.db.insertRecord).Exec(u.id, u.recordid, buf.Bytes()); err != nil {
 		return err
 	}
 	var args []interface{}
@@ -199,7 +195,7 @@
 	if len(args) > 0 {
 		query := "INSERT INTO RecordLabels VALUES " + strings.Repeat("(?, ?, ?, ?), ", len(args)/4)
 		query = strings.TrimSuffix(query, ", ")
-		if _, err := tx.Exec(query, args...); err != nil {
+		if _, err := u.tx.Exec(query, args...); err != nil {
 			return err
 		}
 	}
@@ -207,6 +203,17 @@
 	return nil
 }
 
+// Commit finishes processing the upload.
+func (u *Upload) Commit() error {
+	return u.tx.Commit()
+}
+
+// Abort cleans up resources associated with the upload.
+// It does not attempt to clean up partial database state.
+func (u *Upload) Abort() error {
+	return u.tx.Rollback()
+}
+
 // Query searches for results matching the given query string.
 //
 // The query string is first parsed into quoted words (as in the shell)
diff --git a/storage/db/db_test.go b/storage/db/db_test.go
index 85c4820..a3e7bd9 100644
--- a/storage/db/db_test.go
+++ b/storage/db/db_test.go
@@ -60,6 +60,10 @@
 		t.Fatalf("InsertRecord: %v", err)
 	}
 
+	if err := u.Commit(); err != nil {
+		t.Fatalf("Commit: %v", err)
+	}
+
 	rows, err := DBSQL(db).Query("SELECT UploadId, RecordId, Name, Value FROM RecordLabels")
 	if err != nil {
 		t.Fatalf("sql.Query: %v", err)
@@ -119,6 +123,9 @@
 			t.Fatalf("InsertRecord: %v", err)
 		}
 	}
+	if err := u.Commit(); err != nil {
+		t.Fatalf("Commit: %v", err)
+	}
 
 	tests := []struct {
 		q    string