http: add pipelining to ClientConn, ServerConn

R=rsc, bradfitzwork
CC=golang-dev
https://golang.org/cl/4082044
diff --git a/src/pkg/http/persist.go b/src/pkg/http/persist.go
index 8bfc097..000a420 100644
--- a/src/pkg/http/persist.go
+++ b/src/pkg/http/persist.go
@@ -6,14 +6,17 @@
 
 import (
 	"bufio"
-	"container/list"
 	"io"
 	"net"
+	"net/textproto"
 	"os"
 	"sync"
 )
 
-var ErrPersistEOF = &ProtocolError{"persistent connection closed"}
+var (
+	ErrPersistEOF = &ProtocolError{"persistent connection closed"}
+	ErrPipeline   = &ProtocolError{"pipeline error"}
+)
 
 // A ServerConn reads requests and sends responses over an underlying
 // connection, until the HTTP keepalive logic commands an end. ServerConn
@@ -26,8 +29,10 @@
 	r               *bufio.Reader
 	clsd            bool     // indicates a graceful close
 	re, we          os.Error // read/write errors
-	lastBody        io.ReadCloser
+	lastbody        io.ReadCloser
 	nread, nwritten int
+	pipe            textproto.Pipeline
+	pipereq         map[*Request]uint
 	lk              sync.Mutex // protected read/write to re,we
 }
 
@@ -37,7 +42,7 @@
 	if r == nil {
 		r = bufio.NewReader(c)
 	}
-	return &ServerConn{c: c, r: r}
+	return &ServerConn{c: c, r: r, pipereq: make(map[*Request]uint)}
 }
 
 // Close detaches the ServerConn and returns the underlying connection as well
@@ -57,10 +62,25 @@
 // Read returns the next request on the wire. An ErrPersistEOF is returned if
 // it is gracefully determined that there are no more requests (e.g. after the
 // first request on an HTTP/1.0 connection, or after a Connection:close on a
-// HTTP/1.1 connection). Read can be called concurrently with Write, but not
-// with another Read.
+// HTTP/1.1 connection).
 func (sc *ServerConn) Read() (req *Request, err os.Error) {
 
+	// Ensure ordered execution of Reads and Writes
+	id := sc.pipe.Next()
+	sc.pipe.StartRequest(id)
+	defer func() {
+		sc.pipe.EndRequest(id)
+		if req == nil {
+			sc.pipe.StartResponse(id)
+			sc.pipe.EndResponse(id)
+		} else {
+			// Remember the pipeline id of this request
+			sc.lk.Lock()
+			sc.pipereq[req] = id
+			sc.lk.Unlock()
+		}
+	}()
+
 	sc.lk.Lock()
 	if sc.we != nil { // no point receiving if write-side broken or closed
 		defer sc.lk.Unlock()
@@ -73,12 +93,12 @@
 	sc.lk.Unlock()
 
 	// Make sure body is fully consumed, even if user does not call body.Close
-	if sc.lastBody != nil {
+	if sc.lastbody != nil {
 		// body.Close is assumed to be idempotent and multiple calls to
 		// it should return the error that its first invokation
 		// returned.
-		err = sc.lastBody.Close()
-		sc.lastBody = nil
+		err = sc.lastbody.Close()
+		sc.lastbody = nil
 		if err != nil {
 			sc.lk.Lock()
 			defer sc.lk.Unlock()
@@ -102,7 +122,7 @@
 			return
 		}
 	}
-	sc.lastBody = req.Body
+	sc.lastbody = req.Body
 	sc.nread++
 	if req.Close {
 		sc.lk.Lock()
@@ -121,11 +141,24 @@
 	return sc.nread - sc.nwritten
 }
 
-// Write writes a repsonse. To close the connection gracefully, set the
+// Write writes resp in response to req. To close the connection gracefully, set the
 // Response.Close field to true. Write should be considered operational until
 // it returns an error, regardless of any errors returned on the Read side.
-// Write can be called concurrently with Read, but not with another Write.
-func (sc *ServerConn) Write(resp *Response) os.Error {
+func (sc *ServerConn) Write(req *Request, resp *Response) os.Error {
+
+	// Retrieve the pipeline ID of this request/response pair
+	sc.lk.Lock()
+	id, ok := sc.pipereq[req]
+	sc.pipereq[req] = 0, false
+	if !ok {
+		sc.lk.Unlock()
+		return ErrPipeline
+	}
+	sc.lk.Unlock()
+
+	// Ensure pipeline order
+	sc.pipe.StartResponse(id)
+	defer sc.pipe.EndResponse(id)
 
 	sc.lk.Lock()
 	if sc.we != nil {
@@ -166,10 +199,11 @@
 	c               net.Conn
 	r               *bufio.Reader
 	re, we          os.Error // read/write errors
-	lastBody        io.ReadCloser
+	lastbody        io.ReadCloser
 	nread, nwritten int
-	reqm            list.List  // request methods in order of execution
-	lk              sync.Mutex // protects read/write to reqm,re,we
+	pipe            textproto.Pipeline
+	pipereq         map[*Request]uint
+	lk              sync.Mutex // protects read/write to re,we,pipereq,etc.
 }
 
 // NewClientConn returns a new ClientConn reading and writing c.  If r is not
@@ -178,7 +212,7 @@
 	if r == nil {
 		r = bufio.NewReader(c)
 	}
-	return &ClientConn{c: c, r: r}
+	return &ClientConn{c: c, r: r, pipereq: make(map[*Request]uint)}
 }
 
 // Close detaches the ClientConn and returns the underlying connection as well
@@ -191,7 +225,6 @@
 	r = cc.r
 	cc.c = nil
 	cc.r = nil
-	cc.reqm.Init()
 	cc.lk.Unlock()
 	return
 }
@@ -201,8 +234,23 @@
 // keepalive connection is logically closed after this request and the opposing
 // server is informed. An ErrUnexpectedEOF indicates the remote closed the
 // underlying TCP connection, which is usually considered as graceful close.
-// Write can be called concurrently with Read, but not with another Write.
-func (cc *ClientConn) Write(req *Request) os.Error {
+func (cc *ClientConn) Write(req *Request) (err os.Error) {
+
+	// Ensure ordered execution of Writes
+	id := cc.pipe.Next()
+	cc.pipe.StartRequest(id)
+	defer func() {
+		cc.pipe.EndRequest(id)
+		if err != nil {
+			cc.pipe.StartResponse(id)
+			cc.pipe.EndResponse(id)
+		} else {
+			// Remember the pipeline id of this request
+			cc.lk.Lock()
+			cc.pipereq[req] = id
+			cc.lk.Unlock()
+		}
+	}()
 
 	cc.lk.Lock()
 	if cc.re != nil { // no point sending if read-side closed or broken
@@ -223,7 +271,7 @@
 		cc.lk.Unlock()
 	}
 
-	err := req.Write(cc.c)
+	err = req.Write(cc.c)
 	if err != nil {
 		cc.lk.Lock()
 		defer cc.lk.Unlock()
@@ -231,9 +279,6 @@
 		return err
 	}
 	cc.nwritten++
-	cc.lk.Lock()
-	cc.reqm.PushBack(req.Method)
-	cc.lk.Unlock()
 
 	return nil
 }
@@ -250,7 +295,21 @@
 // returned together with an ErrPersistEOF, which means that the remote
 // requested that this be the last request serviced. Read can be called
 // concurrently with Write, but not with another Read.
-func (cc *ClientConn) Read() (resp *Response, err os.Error) {
+func (cc *ClientConn) Read(req *Request) (resp *Response, err os.Error) {
+
+	// Retrieve the pipeline ID of this request/response pair
+	cc.lk.Lock()
+	id, ok := cc.pipereq[req]
+	cc.pipereq[req] = 0, false
+	if !ok {
+		cc.lk.Unlock()
+		return nil, ErrPipeline
+	}
+	cc.lk.Unlock()
+
+	// Ensure pipeline order
+	cc.pipe.StartResponse(id)
+	defer cc.pipe.EndResponse(id)
 
 	cc.lk.Lock()
 	if cc.re != nil {
@@ -259,17 +318,13 @@
 	}
 	cc.lk.Unlock()
 
-	if cc.nread >= cc.nwritten {
-		return nil, os.NewError("persist client pipe count")
-	}
-
 	// Make sure body is fully consumed, even if user does not call body.Close
-	if cc.lastBody != nil {
+	if cc.lastbody != nil {
 		// body.Close is assumed to be idempotent and multiple calls to
 		// it should return the error that its first invokation
 		// returned.
-		err = cc.lastBody.Close()
-		cc.lastBody = nil
+		err = cc.lastbody.Close()
+		cc.lastbody = nil
 		if err != nil {
 			cc.lk.Lock()
 			defer cc.lk.Unlock()
@@ -278,18 +333,14 @@
 		}
 	}
 
-	cc.lk.Lock()
-	m := cc.reqm.Front()
-	cc.reqm.Remove(m)
-	cc.lk.Unlock()
-	resp, err = ReadResponse(cc.r, m.Value.(string))
+	resp, err = ReadResponse(cc.r, req.Method)
 	if err != nil {
 		cc.lk.Lock()
 		defer cc.lk.Unlock()
 		cc.re = err
 		return
 	}
-	cc.lastBody = resp.Body
+	cc.lastbody = resp.Body
 
 	cc.nread++
 
@@ -301,3 +352,12 @@
 	}
 	return
 }
+
+// Do is a convenience method that writes a request and reads a response.
+func (cc *ClientConn) Do(req *Request) (resp *Response, err os.Error) {
+	err = cc.Write(req)
+	if err != nil {
+		return
+	}
+	return cc.Read(req)
+}