storage/app: parse incoming benchmark records

This change parses incoming benchmark records in parallel with writing
them to Google Cloud Storage. It does not yet attempt to insert the
parsed records into Cloud SQL.
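
The upload body is run through an io.TeeReader so the same bytes are
parsed for benchmark records and streamed to storage in one pass. As a
rough, self-contained sketch of that pattern (bufio.Scanner and a
bytes.Buffer stand in here for the package's benchmark reader and the
Cloud Storage writer):

	package main

	import (
		"bufio"
		"bytes"
		"fmt"
		"io"
		"strings"
	)

	func main() {
		body := strings.NewReader("BenchmarkFoo 1 2 ns/op\n")

		// Stand-in for the Cloud Storage object writer (a.FS.NewWriter below).
		var stored bytes.Buffer

		// io.TeeReader copies every byte the parser reads into stored,
		// so the upload is persisted and parsed in a single pass.
		sc := bufio.NewScanner(io.TeeReader(body, &stored))

		records := 0
		for sc.Scan() {
			// The real change hands the stream to NewBenchmarkReader
			// and calls br.Next() instead of counting lines.
			records++
		}
		if err := sc.Err(); err != nil {
			panic(err)
		}
		fmt.Printf("parsed %d records, stored %d bytes\n", records, stored.Len())
	}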

Change-Id: I250b334569b8d59f2366025db5c56add51b96bd6
Reviewed-on: https://go-review.googlesource.com/34628
Reviewed-by: Russ Cox <rsc@golang.org>
diff --git a/storage/app/upload.go b/storage/app/upload.go
index 7943f9e..b9c56be 100644
--- a/storage/app/upload.go
+++ b/storage/app/upload.go
@@ -6,6 +6,7 @@
 
 import (
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"mime/multipart"
@@ -67,8 +68,7 @@
 // writes them to the filesystem, and indexes their content.
 func (a *App) processUpload(ctx context.Context, mr *multipart.Reader) (*uploadStatus, error) {
 	var uploadid string
-
-	var status uploadStatus
+	var fileids []string
 
 	for i := 0; ; i++ {
 		p, err := mr.NextPart()
@@ -87,48 +87,74 @@
 			if err != nil {
 				return nil, err
 			}
-			status.UploadID = uploadid
 		}
 
 		// The incoming file needs to be stored in Cloud
 		// Storage and it also needs to be indexed. If the file
 		// is invalid (contains no valid records) it needs to
 		// be rejected and the Cloud Storage upload aborted.
-		// TODO(quentin): We might as well do these in parallel.
 
 		meta := fileMetadata(ctx, uploadid, i)
 
-		fw, err := a.FS.NewWriter(ctx, fmt.Sprintf("uploads/%s.txt", meta["fileid"]), meta)
-		if err != nil {
+		// We need to do two things with the incoming data:
+		// - Write it to permanent storage via a.FS
+		// - Write index records to a.DB
+		// AND if anything fails, attempt to clean up both the
+		// FS and the index records.
+
+		if err := a.indexFile(ctx, p, meta); err != nil {
 			return nil, err
 		}
 
-		var keys []string
-		for k := range meta {
-			keys = append(keys, k)
-		}
-		sort.Strings(keys)
-		for _, k := range keys {
-			if _, err := fmt.Fprintf(fw, "%s: %s\n", k, meta[k]); err != nil {
-				fw.CloseWithError(err)
-				return nil, err
-			}
-		}
-
-		if _, err := io.Copy(fw, p); err != nil {
-			fw.CloseWithError(err)
-			return nil, err
-		}
-		// TODO(quentin): Write records to database
-
-		if err := fw.Close(); err != nil {
-			return nil, err
-		}
-
-		status.FileIDs = append(status.FileIDs, meta["fileid"])
+		fileids = append(fileids, meta["fileid"])
 	}
 
-	return &status, nil
+	return &uploadStatus{uploadid, fileids}, nil
+}
+
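+// indexFile writes the metadata and contents of p to the filesystem and
+// parses the contained benchmark records, returning an error if none are
+// found. The parsed records are not yet inserted into the database.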
+func (a *App) indexFile(ctx context.Context, p io.Reader, meta map[string]string) (err error) {
+	fw, err := a.FS.NewWriter(ctx, fmt.Sprintf("uploads/%s.txt", meta["fileid"]), meta)
+	if err != nil {
+		return err
+	}
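+	// If anything below fails, close fw with the error so the storage
+	// upload can be aborted; otherwise Close completes the write.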
+	defer func() {
+		if err != nil {
+			fw.CloseWithError(err)
+		} else {
+			err = fw.Close()
+		}
+	}()
+	var keys []string
+	for k := range meta {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+	for _, k := range keys {
+		if _, err := fmt.Fprintf(fw, "%s: %s\n", k, meta[k]); err != nil {
+			return err
+		}
+	}
+
+	// TODO(quentin): Add a separate goroutine and buffer for writes to fw?
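+	// Tee the incoming data so it is written to fw while being parsed below.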
+	tr := io.TeeReader(p, fw)
+	br := NewBenchmarkReader(tr)
+	br.AddLabels(meta)
+	i := 0
+	for {
+		result, err := br.Next()
+		if err != nil {
+			if err != io.EOF {
+				return err
+			}
+			if i == 0 {
+				return errors.New("no valid benchmark lines found")
+			}
+			return nil
+		}
+		i++
+		// TODO(quentin): Write records to database
+		_ = result
+	}
 }
 
 // fileMetadata returns the extra metadata fields associated with an