* implement Linux epoll for polling i/o
* isolate OS-specific polling goop in Pollster type
* move generic poll loop out of fd_darwin.go into fd.go

R=r
DELTA=782  (448 added, 281 deleted, 53 changed)
OCL=16108
CL=16119
diff --git a/src/lib/net/fd.go b/src/lib/net/fd.go
new file mode 100644
index 0000000..a93dac0
--- /dev/null
+++ b/src/lib/net/fd.go
@@ -0,0 +1,287 @@
+// Copyright 2009 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// TODO(rsc): All the prints in this file should go to standard error.
+
+package net
+
+import (
+	"net";
+	"once";
+	"os";
+	"syscall";
+)
+
+// Network file descriptor.  Only intended to be used internally,
+// but have to export to make it available in other files implementing package net.
+export type FD struct {
+	// immutable until Close
+	fd int64;	// raw descriptor number; Close sets it to -1
+	osfd *os.FD;	// os-level handle used for Read/Write/Close; nil after Close
+	cr *chan *FD;	// PollServer.Run sends here when a read wait is satisfied
+	cw *chan *FD;	// PollServer.Run sends here when a write wait is satisfied
+
+	// owned by fd wait server
+	ncr, ncw int;	// read/write waits registered by PollServer.AddFD and not yet signalled
+}
+
+// Make reads and writes on fd return EAGAIN instead of blocking.
+func SetNonblock(fd int64) *os.Error {
+	mode, errno := syscall.fcntl(fd, syscall.F_GETFL, 0)	// fetch the current status flags
+	if errno != 0 {
+		return os.ErrnoToError(errno)
+	}
+	mode, errno = syscall.fcntl(fd, syscall.F_SETFL, mode | syscall.O_NONBLOCK)	// same flags plus O_NONBLOCK
+	if errno != 0 {
+		return os.ErrnoToError(errno)
+	}
+	return nil
+}
+
+
+// A PollServer helps FDs determine when to retry a non-blocking
+// read or write after they get EAGAIN.  When an FD needs to wait,
+// send the fd on s.cr (for a read) or s.cw (for a write) to pass the
+// request to the poll server.  Then receive on fd.cr/fd.cw.
+// When the PollServer finds that i/o on FD should be possible
+// again, it will send fd on fd.cr/fd.cw to wake any waiting processes.
+// This protocol is implemented as s.WaitRead() and s.WaitWrite().
+//
+// There is one subtlety: when sending on s.cr/s.cw, the
+// poll server is probably in a system call, waiting for an fd
+// to become ready.  It's not looking at the request channels.
+// To resolve this, the poll server waits not just on the FDs it has
+// been given but also its own pipe.  After sending on the
+// buffered channel s.cr/s.cw, WaitRead/WaitWrite writes a
+// byte to the pipe, causing the PollServer's poll system call to
+// return.  In response to the pipe being readable, the PollServer
+// re-polls its request channels.
+//
+// Note that the ordering is "send request" and then "wake up server".
+// If the operations were reversed, there would be a race: the poll
+// server might wake up and look at the request channel, see that it
+// was empty, and go back to sleep, all before the requester managed
+// to send the request.  Because the send must complete before the wakeup,
+// the request channel must be buffered.  A buffer of size 1 is sufficient
+// for any request load.  If many processes are trying to submit requests,
+// one will succeed, the PollServer will read the request, and then the
+// channel will be empty for the next process's request.  A larger buffer
+// might help batch requests.
+
+type PollServer struct {
+	cr, cw *chan *FD;	// buffered >= 1
+	pr, pw *os.FD;	// wakeup pipe: Wakeup writes to pw, Run drains pr
+	pending *map[int64] *FD;	// waiting FDs keyed by fd<<1, plus 1 for a write wait
+	poll *Pollster;	// low-level OS hooks
+}
+func (s *PollServer) Run();
+
+func NewPollServer() (s *PollServer, err *os.Error) {	// builds a server and starts its Run loop; returns nil, err on failure
+	s = new(PollServer);
+	s.cr = new(chan *FD, 1);	// request channels are buffered with capacity 1
+	s.cw = new(chan *FD, 1);
+	if s.pr, s.pw, err = os.Pipe(); err != nil {
+		return nil, err
+	}
+	if err = SetNonblock(s.pr.fd); err != nil {
+	Error:	// shared cleanup: later failures jump here to close both pipe ends
+		s.pr.Close();
+		s.pw.Close()
+		return nil, err
+	}
+	if err = SetNonblock(s.pw.fd); err != nil {
+		goto Error
+	}
+	if s.poll, err = NewPollster(); err != nil {
+		goto Error
+	}
+	if err = s.poll.AddFD(s.pr.fd, 'r', true); err != nil {	// watch the read end of the wakeup pipe
+		s.poll.Close()
+		goto Error
+	}
+	s.pending = new(map[int64] *FD)
+	go s.Run()	// the poll loop runs in its own goroutine
+	return s, nil
+}
+
+func (s *PollServer) AddFD(fd *FD, mode int) {	// registers a wait on fd; mode 'r' is a read, anything else counts as a write
+	if err := s.poll.AddFD(fd.fd, mode, false); err != nil {
+		print("PollServer AddFD: ", err.String(), "\n")
+		return	// NOTE(review): fd is not recorded in pending here, so Run will not signal it — confirm intended
+	}
+
+	key := fd.fd << 1	// low bit of the key separates read (0) from write (1) waits
+	if mode == 'r' {
+		fd.ncr++
+	} else {
+		fd.ncw++
+		key++
+	}
+	s.pending[key] = fd
+}
+
+func (s *PollServer) LookupFD(fd int64, mode int) *FD {	// returns and unregisters the FD waiting on (fd, mode), or nil
+	slot := fd << 1	// same keying as AddFD: descriptor shifted left, low bit set for 'w'
+	if mode == 'w' {
+		slot++
+	}
+	waiter, found := s.pending[slot]
+	if !found {
+		return nil
+	}
+	s.pending[slot] = nil, false	// remove the entry from the pending map
+	return waiter
+}
+
+func (s *PollServer) Run() {	// poll loop; started as a goroutine by NewPollServer, exits when WaitFD fails
+	var scratch [100]byte;
+	for {
+		fd, mode, err := s.poll.WaitFD();
+		if err != nil {
+			print("PollServer WaitFD: ", err.String(), "\n")
+			return
+		}
+		if fd == s.pr.fd {
+			// Drain our wakeup pipe.
+			for nn, e := s.pr.Read(&scratch); nn > 0; {
+				nn, e = s.pr.Read(&scratch)
+			}
+
+			// Read from channels
+			for fd, ok := <-s.cr; ok; fd, ok = <-s.cr {
+				s.AddFD(fd, 'r')
+			}
+			for fd, ok := <-s.cw; ok; fd, ok = <-s.cw {
+				s.AddFD(fd, 'w')
+			}
+		} else {
+			netfd := s.LookupFD(fd, mode)
+			if netfd == nil {	// nothing pending for this fd/mode: report the raw descriptor, not the nil netfd
+				print("PollServer: unexpected wakeup for fd=", fd, " mode=", string(mode), "\n")
+				continue
+			}
+			if mode == 'r' {
+				for netfd.ncr > 0 {	// one send on cr per read wait counted by AddFD
+					netfd.ncr--
+					netfd.cr <- netfd
+				}
+			} else {
+				for netfd.ncw > 0 {	// one send on cw per write wait counted by AddFD
+					netfd.ncw--
+					netfd.cw <- netfd
+				}
+			}
+		}
+	}
+}
+
+func (s *PollServer) Wakeup() {	// makes the server's pipe readable so Run re-checks its request channels
+	var b [1]byte;
+	s.pw.Write(&b)	// Run discards the bytes it drains; the write result is ignored here
+}
+
+func (s *PollServer) WaitRead(fd *FD) {	// blocks until the poll server sends on fd.cr
+	s.cr <- fd;	// queue the request first, then wake the server (ordering explained in the PollServer comment)
+	s.Wakeup();
+	<-fd.cr
+}
+
+func (s *PollServer) WaitWrite(fd *FD) {	// blocks until the poll server sends on fd.cw
+	s.cw <- fd;	// queue on the write channel, before the wakeup, so Run calls AddFD(fd, 'w')
+	s.Wakeup();
+	<-fd.cw	// Run signals write readiness on fd.cw, not fd.cr
+}
+
+
+// Network FD methods.
+// All the network FDs use a single PollServer.
+
+var pollserver *PollServer
+
+func StartServer() {	// initializes the shared pollserver; NewFD invokes it through once.Do
+	p, err := NewPollServer()
+	if err != nil {
+		print("Start PollServer: ", err.String(), "\n")
+	}
+	pollserver = p	// stays nil when NewPollServer failed, since it returns nil, err
+}
+
+export func NewFD(fd int64) (f *FD, err *os.Error) {	// wraps descriptor fd as a non-blocking network FD
+	if pollserver == nil {
+		once.Do(&StartServer);	// start the shared PollServer on first use
+	}
+	if err = SetNonblock(fd); err != nil {
+		return nil, err
+	}
+	f = new(FD);
+	f.fd = fd;
+	f.osfd = os.NewFD(fd);
+	f.cr = new(chan *FD, 1);	// notification channels are buffered with capacity 1
+	f.cw = new(chan *FD, 1);
+	return f, nil
+}
+
+func (fd *FD) Close() *os.Error {	// closes the underlying os.FD; os.EINVAL if nil or already closed
+	if fd == nil || fd.osfd == nil {
+		return os.EINVAL
+	}
+	e := fd.osfd.Close();
+	fd.osfd = nil;	// a nil osfd makes later Read/Write/Accept/Close return os.EINVAL
+	fd.fd = -1
+	return e
+}
+
+func (fd *FD) Read(p *[]byte) (n int, err *os.Error) {	// reads into p, waiting on the poll server while the read returns EAGAIN
+	if fd == nil || fd.osfd == nil {
+		return -1, os.EINVAL
+	}
+	n, err = fd.osfd.Read(p)
+	for err == os.EAGAIN {	// would block: wait for the poll server's signal, then retry
+		pollserver.WaitRead(fd);
+		n, err = fd.osfd.Read(p)
+	}
+	return n, err
+}
+
+func (fd *FD) Write(p *[]byte) (n int, err *os.Error) {	// writes p, returning the total bytes written and the last error
+	if fd == nil || fd.osfd == nil {
+		return -1, os.EINVAL
+	}
+	err = nil;
+	nn := 0	// running total of bytes written
+	for nn < len(p) && err == nil {	// continue until all of p is written or a write fails
+		n, err = fd.osfd.Write(p[nn:len(p)]);
+		for err == os.EAGAIN {	// would block: wait for the poll server's signal, then retry
+			pollserver.WaitWrite(fd);
+			n, err = fd.osfd.Write(p[nn:len(p)])
+		}
+		if n > 0 {
+			nn += n
+		}
+		if n == 0 {	// a zero-byte write ends the loop
+			break
+		}
+	}
+	return nn, err
+}
+
+func (fd *FD) Accept(sa *syscall.Sockaddr) (nfd *FD, err *os.Error) {	// accepts a connection on fd and wraps it with NewFD
+	if fd == nil || fd.osfd == nil {
+		return nil, os.EINVAL
+	}
+	s, e := syscall.accept(fd.fd, sa)
+	for e == syscall.EAGAIN {	// would block: wait via WaitRead, then retry
+		pollserver.WaitRead(fd);
+		s, e = syscall.accept(fd.fd, sa)
+	}
+	if e != 0 {
+		return nil, os.ErrnoToError(e)
+	}
+	if nfd, err = NewFD(s); err != nil {
+		syscall.close(s)	// close the accepted descriptor when it cannot be wrapped
+		return nil, err
+	}
+	return nfd, nil
+}
+