database/sql: add SetMaxOpenConns

Update #4805

Add the ability to set an open connection limit.
Fixed case where the Conn finalCloser was being called with db.mu locked.
Added separate benchmarks for each path for Exec and Query.
Replaced slice based idle pool with list based idle pool.

R=bradfitz
CC=golang-dev
https://golang.org/cl/10726044
diff --git a/src/pkg/database/sql/sql.go b/src/pkg/database/sql/sql.go
index d81f6fe..ae313ca 100644
--- a/src/pkg/database/sql/sql.go
+++ b/src/pkg/database/sql/sql.go
@@ -10,6 +10,7 @@
 package sql
 
 import (
+	"container/list"
 	"database/sql/driver"
 	"errors"
 	"fmt"
@@ -192,12 +193,22 @@
 	driver driver.Driver
 	dsn    string
 
-	mu       sync.Mutex // protects following fields
-	freeConn []*driverConn
+	mu           sync.Mutex // protects following fields
+	freeConn     *list.List // of *driverConn
+	connRequests *list.List // of connRequest
+	numOpen      int
+	pendingOpens int
+	// Used to sygnal the need for new connections
+	// a goroutine running connectionOpener() reads on this chan and
+	// maybeOpenNewConnections sends on the chan (one send per needed connection)
+	// It is closed during db.Close(). The close tells the connectionOpener
+	// goroutine to exit.
+	openerCh chan struct{}
 	closed   bool
 	dep      map[finalCloser]depSet
 	lastPut  map[*driverConn]string // stacktrace of last conn's put; debug only
 	maxIdle  int                    // zero means defaultMaxIdleConns; negative means 0
+	maxOpen  int                    // <= 0 means unlimited
 }
 
 // driverConn wraps a driver.Conn with a mutex, to
@@ -217,6 +228,9 @@
 	inUse      bool
 	onPut      []func() // code (with db.mu held) run when conn is next returned
 	dbmuClosed bool     // same as closed, but guarded by db.mu, for connIfFree
+	// This is the Element returned by db.freeConn.PushFront(conn).
+	// It's used by connIfFree to remove the conn from the freeConn list.
+	listElem *list.Element
 }
 
 func (dc *driverConn) releaseConn(err error) {
@@ -254,15 +268,14 @@
 }
 
 // the dc.db's Mutex is held.
-func (dc *driverConn) closeDBLocked() error {
+func (dc *driverConn) closeDBLocked() func() error {
 	dc.Lock()
+	defer dc.Unlock()
 	if dc.closed {
-		dc.Unlock()
-		return errors.New("sql: duplicate driverConn close")
+		return func() error { return errors.New("sql: duplicate driverConn close") }
 	}
 	dc.closed = true
-	dc.Unlock() // not defer; removeDep finalClose calls may need to lock
-	return dc.db.removeDepLocked(dc, dc)()
+	return dc.db.removeDepLocked(dc, dc)
 }
 
 func (dc *driverConn) Close() error {
@@ -293,8 +306,13 @@
 	err := dc.ci.Close()
 	dc.ci = nil
 	dc.finalClosed = true
-
 	dc.Unlock()
+
+	dc.db.mu.Lock()
+	dc.db.numOpen--
+	dc.db.maybeOpenNewConnections()
+	dc.db.mu.Unlock()
+
 	return err
 }
 
@@ -380,6 +398,13 @@
 	}
 }
 
+// This is the size of the connectionOpener request chan (dn.openerCh).
+// This value should be larger than the maximum typical value
+// used for db.maxOpen. If maxOpen is significantly larger than
+// connectionRequestQueueSize then it is possible for ALL calls into the *DB
+// to block until the connectionOpener can satify the backlog of requests.
+var connectionRequestQueueSize = 1000000
+
 // Open opens a database specified by its database driver name and a
 // driver-specific data source name, usually consisting of at least a
 // database name and connection information.
@@ -398,10 +423,14 @@
 		return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
 	}
 	db := &DB{
-		driver:  driveri,
-		dsn:     dataSourceName,
-		lastPut: make(map[*driverConn]string),
+		driver:   driveri,
+		dsn:      dataSourceName,
+		openerCh: make(chan struct{}, connectionRequestQueueSize),
+		lastPut:  make(map[*driverConn]string),
 	}
+	db.freeConn = list.New()
+	db.connRequests = list.New()
+	go db.connectionOpener()
 	return db, nil
 }
 
@@ -422,16 +451,32 @@
 // Close closes the database, releasing any open resources.
 func (db *DB) Close() error {
 	db.mu.Lock()
-	defer db.mu.Unlock()
+	if db.closed { // Make DB.Close idempotent
+		db.mu.Unlock()
+		return nil
+	}
+	close(db.openerCh)
 	var err error
-	for _, dc := range db.freeConn {
-		err1 := dc.closeDBLocked()
+	fns := make([]func() error, 0, db.freeConn.Len())
+	for db.freeConn.Front() != nil {
+		dc := db.freeConn.Front().Value.(*driverConn)
+		dc.listElem = nil
+		fns = append(fns, dc.closeDBLocked())
+		db.freeConn.Remove(db.freeConn.Front())
+	}
+	db.closed = true
+	for db.connRequests.Front() != nil {
+		req := db.connRequests.Front().Value.(connRequest)
+		db.connRequests.Remove(db.connRequests.Front())
+		close(req)
+	}
+	db.mu.Unlock()
+	for _, fn := range fns {
+		err1 := fn()
 		if err1 != nil {
 			err = err1
 		}
 	}
-	db.freeConn = nil
-	db.closed = true
 	return err
 }
 
@@ -453,6 +498,9 @@
 // SetMaxIdleConns sets the maximum number of connections in the idle
 // connection pool.
 //
+// If MaxOpenConns is greater than 0 but less than the new MaxIdleConns
+// then the new MaxIdleConns will be reduced to match the MaxOpenConns limit
+//
 // If n <= 0, no idle connections are retained.
 func (db *DB) SetMaxIdleConns(n int) {
 	db.mu.Lock()
@@ -463,40 +511,148 @@
 		// No idle connections.
 		db.maxIdle = -1
 	}
-	for len(db.freeConn) > 0 && len(db.freeConn) > n {
-		nfree := len(db.freeConn)
-		dc := db.freeConn[nfree-1]
-		db.freeConn[nfree-1] = nil
-		db.freeConn = db.freeConn[:nfree-1]
+	// Make sure maxIdle doesn't exceed maxOpen
+	if db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen {
+		db.maxIdle = db.maxOpen
+	}
+	for db.freeConn.Len() > db.maxIdleConnsLocked() {
+		dc := db.freeConn.Back().Value.(*driverConn)
+		dc.listElem = nil
+		db.freeConn.Remove(db.freeConn.Back())
 		go dc.Close()
 	}
 }
 
+// SetMaxOpenConns sets the maximum number of open connections to the database.
+//
+// If MaxIdleConns is greater than 0 and the new MaxOpenConns is less than
+// MaxIdleConns, then MaxIdleConns will be reduced to match the new
+// MaxOpenConns limit
+//
+// If n <= 0, then there is no limit on the number of open connections.
+// The default is 0 (unlimited).
+func (db *DB) SetMaxOpenConns(n int) {
+	db.mu.Lock()
+	db.maxOpen = n
+	if n < 0 {
+		db.maxOpen = 0
+	}
+	syncMaxIdle := db.maxOpen > 0 && db.maxIdleConnsLocked() > db.maxOpen
+	db.mu.Unlock()
+	if syncMaxIdle {
+		db.SetMaxIdleConns(n)
+	}
+}
+
+// Assumes db.mu is locked.
+// If there are connRequests and the connection limit hasn't been reached,
+// then tell the connectionOpener to open new connections.
+func (db *DB) maybeOpenNewConnections() {
+	numRequests := db.connRequests.Len() - db.pendingOpens
+	if db.maxOpen > 0 {
+		numCanOpen := db.maxOpen - (db.numOpen + db.pendingOpens)
+		if numRequests > numCanOpen {
+			numRequests = numCanOpen
+		}
+	}
+	for numRequests > 0 {
+		db.pendingOpens++
+		numRequests--
+		db.openerCh <- struct{}{}
+	}
+}
+
+// Runs in a seperate goroutine, opens new connections when requested.
+func (db *DB) connectionOpener() {
+	for _ = range db.openerCh {
+		db.openNewConnection()
+	}
+}
+
+// Open one new connection
+func (db *DB) openNewConnection() {
+	ci, err := db.driver.Open(db.dsn)
+	db.mu.Lock()
+	defer db.mu.Unlock()
+	if db.closed {
+		if err == nil {
+			ci.Close()
+		}
+		return
+	}
+	db.pendingOpens--
+	if err != nil {
+		db.putConnDBLocked(nil, err)
+		return
+	}
+	dc := &driverConn{
+		db: db,
+		ci: ci,
+	}
+	db.addDepLocked(dc, dc)
+	db.numOpen++
+	db.putConnDBLocked(dc, err)
+}
+
+// connRequest represents one request for a new connection
+// When there are no idle connections available, DB.conn will create
+// a new connRequest and put it on the db.connRequests list.
+type connRequest chan<- interface{} // takes either a *driverConn or an error
+
+var errDBClosed = errors.New("sql: database is closed")
+
 // conn returns a newly-opened or cached *driverConn
 func (db *DB) conn() (*driverConn, error) {
 	db.mu.Lock()
 	if db.closed {
 		db.mu.Unlock()
-		return nil, errors.New("sql: database is closed")
+		return nil, errDBClosed
 	}
-	if n := len(db.freeConn); n > 0 {
-		conn := db.freeConn[n-1]
-		db.freeConn = db.freeConn[:n-1]
+
+	// If db.maxOpen > 0 and the number of open connections is over the limit
+	// or there are no free connection, then make a request and wait.
+	if db.maxOpen > 0 && (db.numOpen >= db.maxOpen || db.freeConn.Len() == 0) {
+		// Make the connRequest channel. It's buffered so that the
+		// connectionOpener doesn't block while waiting for the req to be read.
+		ch := make(chan interface{}, 1)
+		req := connRequest(ch)
+		db.connRequests.PushBack(req)
+		db.maybeOpenNewConnections()
+		db.mu.Unlock()
+		ret, ok := <-ch
+		if !ok {
+			return nil, errDBClosed
+		}
+		switch ret.(type) {
+		case *driverConn:
+			return ret.(*driverConn), nil
+		case error:
+			return nil, ret.(error)
+		default:
+			panic("sql: Unexpected type passed through connRequest.ch")
+		}
+	}
+
+	if f := db.freeConn.Front(); f != nil {
+		conn := f.Value.(*driverConn)
+		conn.listElem = nil
+		db.freeConn.Remove(f)
 		conn.inUse = true
 		db.mu.Unlock()
 		return conn, nil
 	}
-	db.mu.Unlock()
 
+	db.mu.Unlock()
 	ci, err := db.driver.Open(db.dsn)
 	if err != nil {
 		return nil, err
 	}
+	db.mu.Lock()
+	db.numOpen++
 	dc := &driverConn{
 		db: db,
 		ci: ci,
 	}
-	db.mu.Lock()
 	db.addDepLocked(dc, dc)
 	dc.inUse = true
 	db.mu.Unlock()
@@ -524,12 +680,9 @@
 	if wanted.inUse {
 		return nil, errConnBusy
 	}
-	for i, conn := range db.freeConn {
-		if conn != wanted {
-			continue
-		}
-		db.freeConn[i] = db.freeConn[len(db.freeConn)-1]
-		db.freeConn = db.freeConn[:len(db.freeConn)-1]
+	if wanted.listElem != nil {
+		db.freeConn.Remove(wanted.listElem)
+		wanted.listElem = nil
 		wanted.inUse = true
 		return wanted, nil
 	}
@@ -589,6 +742,10 @@
 
 	if err == driver.ErrBadConn {
 		// Don't reuse bad connections.
+		// Since the conn is considered bad and is being discarded, treat it
+		// as closed. Decrement the open count.
+		db.numOpen--
+		db.maybeOpenNewConnections()
 		db.mu.Unlock()
 		dc.Close()
 		return
@@ -596,14 +753,39 @@
 	if putConnHook != nil {
 		putConnHook(db, dc)
 	}
-	if n := len(db.freeConn); !db.closed && n < db.maxIdleConnsLocked() {
-		db.freeConn = append(db.freeConn, dc)
-		db.mu.Unlock()
-		return
-	}
+	added := db.putConnDBLocked(dc, nil)
 	db.mu.Unlock()
 
-	dc.Close()
+	if !added {
+		dc.Close()
+	}
+}
+
+// Satisfy a connRequest or put the driverConn in the idle pool and return true
+// or return false.
+// putConnDBLocked will satisfy a connRequest if there is one, or it will
+// return the *driverConn to the freeConn list if err != nil and the idle
+// connection limit would not be reached.
+// If err != nil, the value of dc is ignored.
+// If err == nil, then dc must not equal nil.
+// If a connRequest was fullfilled or the *driverConn was placed in the
+// freeConn list, then true is returned, otherwise false is returned.
+func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
+	if db.connRequests.Len() > 0 {
+		req := db.connRequests.Front().Value.(connRequest)
+		db.connRequests.Remove(db.connRequests.Front())
+		if err != nil {
+			req <- err
+		} else {
+			dc.inUse = true
+			req <- dc
+		}
+		return true
+	} else if err == nil && !db.closed && db.maxIdleConnsLocked() > 0 && db.maxIdleConnsLocked() > db.freeConn.Len() {
+		dc.listElem = db.freeConn.PushFront(dc)
+		return true
+	}
+	return false
 }
 
 // Prepare creates a prepared statement for later queries or executions.
@@ -1250,26 +1432,32 @@
 		return s.stickyErr
 	}
 	s.mu.Lock()
-	defer s.mu.Unlock()
 	if s.closed {
+		s.mu.Unlock()
 		return nil
 	}
 	s.closed = true
 
 	if s.tx != nil {
 		s.txsi.Close()
+		s.mu.Unlock()
 		return nil
 	}
+	s.mu.Unlock()
 
 	return s.db.removeDep(s, s)
 }
 
 func (s *Stmt) finalClose() error {
-	for _, v := range s.css {
-		s.db.noteUnusedDriverStatement(v.dc, v.si)
-		v.dc.removeOpenStmt(v.si)
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.css != nil {
+		for _, v := range s.css {
+			s.db.noteUnusedDriverStatement(v.dc, v.si)
+			v.dc.removeOpenStmt(v.si)
+		}
+		s.css = nil
 	}
-	s.css = nil
 	return nil
 }