storage: reuse transaction for uploads
Reuse a single transaction for all INSERT statements for a single
upload. This cuts about 50% of the runtime off an invocation of
the tests with -cloud, and about 25% of the runtime off an /upload run
on App Engine.
Change-Id: I90068f60420cfb67ae693f18ed83b341c2e483bd
Reviewed-on: https://go-review.googlesource.com/35069
Reviewed-by: Russ Cox <rsc@golang.org>
diff --git a/storage/app/upload.go b/storage/app/upload.go
index 68a378f..adb2577 100644
--- a/storage/app/upload.go
+++ b/storage/app/upload.go
@@ -99,6 +99,11 @@
if err != nil {
return nil, err
}
+ defer func() {
+ if upload != nil {
+ upload.Abort()
+ }
+ }()
}
// The incoming file needs to be stored in Cloud
@@ -121,11 +126,20 @@
fileids = append(fileids, meta["fileid"])
}
+ if upload == nil {
+ return nil, errors.New("no files processed")
+ }
+ if err := upload.Commit(); err != nil {
+ return nil, err
+ }
+
status := &uploadStatus{UploadID: upload.ID, FileIDs: fileids}
if a.ViewURLBase != "" {
status.ViewURL = a.ViewURLBase + url.QueryEscape(upload.ID)
}
+ upload = nil
+
return status, nil
}
diff --git a/storage/db/db.go b/storage/db/db.go
index 3c43810..0039ab8 100644
--- a/storage/db/db.go
+++ b/storage/db/db.go
@@ -145,12 +145,14 @@
recordid int64
// db is the underlying database that this upload is going to.
db *DB
+ // tx is the transaction used by the upload.
+ tx *sql.Tx
}
// NewUpload returns an upload for storing new files.
// All records written to the Upload will have the same upload ID.
func (db *DB) NewUpload(ctx context.Context) (*Upload, error) {
- // TODO(quentin): Use a transaction?
+ // TODO(quentin): Use the same transaction as the rest of the upload?
res, err := db.insertUpload.Exec()
if err != nil {
return nil, err
@@ -160,33 +162,27 @@
if err != nil {
return nil, err
}
+ tx, err := db.sql.Begin()
+ if err != nil {
+ return nil, err
+ }
return &Upload{
ID: fmt.Sprint(i),
id: i,
db: db,
+ tx: tx,
}, nil
}
// InsertRecord inserts a single record in an existing upload.
-func (u *Upload) InsertRecord(r *benchfmt.Result) (err error) {
- // TODO(quentin): Use a single transaction for the whole upload?
- tx, err := u.db.sql.Begin()
- if err != nil {
- return err
- }
- defer func() {
- if err != nil {
- tx.Rollback()
- } else {
- err = tx.Commit()
- }
- }()
+// If InsertRecord returns a non-nil error, the Upload has failed and u.Abort() must be called.
+func (u *Upload) InsertRecord(r *benchfmt.Result) error {
// TODO(quentin): Support multiple lines (slice of results?)
var buf bytes.Buffer
if err := benchfmt.NewPrinter(&buf).Print(r); err != nil {
return err
}
- if _, err = tx.Stmt(u.db.insertRecord).Exec(u.id, u.recordid, buf.Bytes()); err != nil {
+ if _, err := u.tx.Stmt(u.db.insertRecord).Exec(u.id, u.recordid, buf.Bytes()); err != nil {
return err
}
var args []interface{}
@@ -199,7 +195,7 @@
if len(args) > 0 {
query := "INSERT INTO RecordLabels VALUES " + strings.Repeat("(?, ?, ?, ?), ", len(args)/4)
query = strings.TrimSuffix(query, ", ")
- if _, err := tx.Exec(query, args...); err != nil {
+ if _, err := u.tx.Exec(query, args...); err != nil {
return err
}
}
@@ -207,6 +203,17 @@
return nil
}
+// Commit finishes processing the upload.
+func (u *Upload) Commit() error {
+ return u.tx.Commit()
+}
+
+// Abort cleans up resources associated with the upload.
+// It does not attempt to clean up partial database state.
+func (u *Upload) Abort() error {
+ return u.tx.Rollback()
+}
+
// Query searches for results matching the given query string.
//
// The query string is first parsed into quoted words (as in the shell)
diff --git a/storage/db/db_test.go b/storage/db/db_test.go
index 85c4820..a3e7bd9 100644
--- a/storage/db/db_test.go
+++ b/storage/db/db_test.go
@@ -60,6 +60,10 @@
t.Fatalf("InsertRecord: %v", err)
}
+ if err := u.Commit(); err != nil {
+ t.Fatalf("Commit: %v", err)
+ }
+
rows, err := DBSQL(db).Query("SELECT UploadId, RecordId, Name, Value FROM RecordLabels")
if err != nil {
t.Fatalf("sql.Query: %v", err)
@@ -119,6 +123,9 @@
t.Fatalf("InsertRecord: %v", err)
}
}
+ if err := u.Commit(); err != nil {
+ t.Fatalf("Commit: %v", err)
+ }
tests := []struct {
q string