Document the Conn interface better, in preparation
for per-method interface documentation
by mkdoc.pl.
Implement timeouts on network reads
and use them in the DNS client.
Also add locks on I/O to ensure that concurrent writes
are not interleaved.
R=r
DELTA=340 (272 added, 25 deleted, 43 changed)
OCL=25799
CL=25874
diff --git a/src/lib/net/fd.go b/src/lib/net/fd.go
index 501a3f3..7509231 100644
--- a/src/lib/net/fd.go
+++ b/src/lib/net/fd.go
@@ -10,6 +10,7 @@
"net";
"once";
"os";
+ "sync";
"syscall";
)
@@ -24,6 +25,14 @@
laddr string;
raddr string;
+ // owned by client
+ rdeadline_delta int64;
+ rdeadline int64;
+ rio sync.Mutex;
+ wdeadline_delta int64;
+ wdeadline int64;
+ wio sync.Mutex;
+
// owned by fd wait server
ncr, ncw int;
}
@@ -41,6 +50,14 @@
return nil
}
+// setBlock makes reads/writes on fd blocking by clearing O_NONBLOCK; last gasp, so no error checking.
+func setBlock(fd int64) {
+ flags, e := syscall.Fcntl(fd, syscall.F_GETFL, 0); // fetch the current file status flags
+ if e != 0 {
+ return; // could not read the flags; give up silently
+ }
+ syscall.Fcntl(fd, syscall.F_SETFL, flags & ^syscall.O_NONBLOCK); // write flags back with O_NONBLOCK cleared; result deliberately ignored
+}
// A pollServer helps FDs determine when to retry a non-blocking
// read or write after they get EAGAIN. When an FD needs to wait,
@@ -76,6 +93,7 @@
pr, pw *os.FD;
pending map[int64] *netFD;
poll *pollster; // low-level OS hooks
+ deadline int64; // next deadline (nsec since 1970)
}
func (s *pollServer) Run();
@@ -109,18 +127,24 @@
func (s *pollServer) AddFD(fd *netFD, mode int) {
if err := s.poll.AddFD(fd.fd, mode, false); err != nil {
- print("pollServer AddFD: ", err.String(), "\n");
+ panicln("pollServer AddFD ", fd.fd, ": ", err.String(), "\n"); // registration failure is now fatal instead of just printed
return
}
+ var t int64; // deadline that applies to this registration's mode
key := fd.fd << 1;
if mode == 'r' {
fd.ncr++;
+ t = fd.rdeadline; // read registrations use the read deadline
} else {
fd.ncw++;
key++;
+ t = fd.wdeadline; // all other modes use the write deadline
}
- s.pending[key] = fd
+ s.pending[key] = fd;
+ if t > 0 && (s.deadline == 0 || t < s.deadline) { // only positive deadlines count; keep s.deadline at the earliest one (0 = none set)
+ s.deadline = t;
+ }
}
func (s *pollServer) LookupFD(fd int64, mode int) *netFD {
@@ -136,14 +160,88 @@
return netfd
}
+func (s *pollServer) WakeFD(fd *netFD, mode int) { // WakeFD sends fd on fd.cr (mode 'r') or fd.cw (any other mode) until the matching counter reaches zero.
+ if mode == 'r' {
+ for fd.ncr > 0 { // ncr is incremented by AddFD for each 'r' registration; one send per count
+ fd.ncr--;
+ fd.cr <- fd
+ }
+ } else {
+ for fd.ncw > 0 { // ncw is incremented by AddFD for each non-'r' registration; one send per count
+ fd.ncw--;
+ fd.cw <- fd
+ }
+ }
+}
+
+func (s *pollServer) Now() int64 { // Now returns the time reported by os.Time as one nanosecond count; it panics if os.Time fails.
+ sec, nsec, err := os.Time();
+ if err != nil {
+ panic("net: os.Time: ", err.String());
+ }
+ nsec += sec * 1e9; // fold the whole seconds into the nanosecond total
+ return nsec;
+}
+
+func (s *pollServer) CheckDeadlines() { // CheckDeadlines expires pending entries whose deadline has passed, wakes their waiters, and recomputes s.deadline.
+ now := s.Now();
+ // TODO(rsc): This will need to be handled more efficiently,
+ // probably with a heap indexed by wakeup time.
+
+ var next_deadline int64; // earliest deadline still in the future; stays 0 if none is found
+ for key, fd := range s.pending {
+ var t int64;
+ var mode int;
+ if key&1 == 0 { // AddFD builds keys as fd<<1 and adds 1 for non-read modes, so the low bit selects the mode
+ mode = 'r';
+ } else {
+ mode = 'w';
+ }
+ if mode == 'r' {
+ t = fd.rdeadline;
+ } else {
+ t = fd.wdeadline;
+ }
+ if t > 0 { // only positive values are treated as active deadlines
+ if t <= now {
+ s.pending[key] = nil, false; // remove this entry from the pending map
+ if mode == 'r' {
+ s.poll.DelFD(fd.fd, mode);
+ fd.rdeadline = -1; // negative marks the read as timed out; the read retry loop requires rdeadline >= 0
+ } else {
+ s.poll.DelFD(fd.fd, mode);
+ fd.wdeadline = -1; // negative marks the write as timed out; the write retry path requires wdeadline >= 0
+ }
+ s.WakeFD(fd, mode); // release the goroutines waiting on this fd for this mode
+ } else if next_deadline == 0 || t < next_deadline { // track the minimum of the unexpired deadlines
+ next_deadline = t;
+ }
+ }
+ }
+ s.deadline = next_deadline; // 0 when no positive, unexpired deadline remains
+}
+
func (s *pollServer) Run() {
var scratch [100]byte;
for {
- fd, mode, err := s.poll.WaitFD();
+ var t = s.deadline;
+ if t > 0 {
+ t = t - s.Now();
+ if t < 0 {
+ s.CheckDeadlines();
+ continue;
+ }
+ }
+ fd, mode, err := s.poll.WaitFD(t);
if err != nil {
print("pollServer WaitFD: ", err.String(), "\n");
return
}
+ if fd < 0 {
+ // Timeout happened.
+ s.CheckDeadlines();
+ continue;
+ }
if fd == s.pr.Fd() {
// Drain our wakeup pipe.
for nn, e := s.pr.Read(scratch); nn > 0; {
@@ -163,17 +261,7 @@
print("pollServer: unexpected wakeup for fd=", netfd, " mode=", string(mode), "\n");
continue
}
- if mode == 'r' {
- for netfd.ncr > 0 {
- netfd.ncr--;
- netfd.cr <- netfd
- }
- } else {
- for netfd.ncw > 0 {
- netfd.ncw--;
- netfd.cw <- netfd
- }
- }
+ s.WakeFD(netfd, mode);
}
}
}
@@ -231,6 +319,15 @@
if fd == nil || fd.osfd == nil {
return os.EINVAL
}
+
+ // In case the user has set linger,
+ // switch to blocking mode so the close blocks.
+ // As long as this doesn't happen often,
+ // we can handle the extra OS processes.
+ // Otherwise we'll need to use the pollserver
+ // for Close too. Sigh.
+ setBlock(fd.osfd.Fd());
+
e := fd.osfd.Close();
fd.osfd = nil;
fd.fd = -1;
@@ -241,8 +338,15 @@
if fd == nil || fd.osfd == nil {
return -1, os.EINVAL
}
+ fd.rio.Lock();
+ defer fd.rio.Unlock();
+ if fd.rdeadline_delta > 0 {
+ fd.rdeadline = pollserver.Now() + fd.rdeadline_delta;
+ } else {
+ fd.rdeadline = 0;
+ }
n, err = fd.osfd.Read(p);
- for err == os.EAGAIN {
+ for err == os.EAGAIN && fd.rdeadline >= 0 {
pollserver.WaitRead(fd);
n, err = fd.osfd.Read(p)
}
@@ -253,21 +357,29 @@
if fd == nil || fd.osfd == nil {
return -1, os.EINVAL
}
- // TODO(rsc): Lock fd while writing to avoid interlacing writes.
+ fd.wio.Lock();
+ defer fd.wio.Unlock();
+ if fd.wdeadline_delta > 0 {
+ fd.wdeadline = pollserver.Now() + fd.wdeadline_delta;
+ } else {
+ fd.wdeadline = 0;
+ }
err = nil;
nn := 0;
- for nn < len(p) && err == nil {
- // TODO(rsc): If os.FD.Write loops, have to use syscall instead.
+ for nn < len(p) {
n, err = fd.osfd.Write(p[nn:len(p)]);
- for err == os.EAGAIN {
- pollserver.WaitWrite(fd);
- n, err = fd.osfd.Write(p[nn:len(p)])
- }
if n > 0 {
nn += n
}
- if n == 0 {
- break
+ if nn == len(p) {
+ break;
+ }
+ if err == os.EAGAIN && fd.wdeadline >= 0 {
+ pollserver.WaitWrite(fd);
+ continue;
+ }
+ if n == 0 || err != nil {
+ break;
}
}
return nn, err