net: refactor poller into new internal/poll package
This will make it possible to use the poller with the os package.
This is a lot of code movement but the behavior is intended to be
unchanged.
Update #6817.
Update #7903.
Update #15021.
Update #18507.
Change-Id: I1413685928017c32df5654ded73a2643820977ae
Reviewed-on: https://go-review.googlesource.com/36799
Run-TryBot: Ian Lance Taylor <iant@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: David Crawshaw <crawshaw@golang.org>
Reviewed-by: Russ Cox <rsc@golang.org>
diff --git a/src/net/dial.go b/src/net/dial.go
index 50bba5a..0a7da40 100644
--- a/src/net/dial.go
+++ b/src/net/dial.go
@@ -7,6 +7,7 @@
import (
"context"
"internal/nettrace"
+ "internal/poll"
"time"
)
@@ -110,7 +111,7 @@
}
timeRemaining := deadline.Sub(now)
if timeRemaining <= 0 {
- return time.Time{}, errTimeout
+ return time.Time{}, poll.ErrTimeout
}
// Tentatively allocate equal time to each remaining address.
timeout := timeRemaining / time.Duration(addrsRemaining)
diff --git a/src/net/dial_test.go b/src/net/dial_test.go
index 9919d72..9825bc9 100644
--- a/src/net/dial_test.go
+++ b/src/net/dial_test.go
@@ -7,6 +7,7 @@
import (
"bufio"
"context"
+ "internal/poll"
"internal/testenv"
"io"
"net/internal/socktest"
@@ -94,7 +95,7 @@
default:
sw.Set(socktest.FilterConnect, func(so *socktest.Status) (socktest.AfterFilter, error) {
time.Sleep(2 * T)
- return nil, errTimeout
+ return nil, poll.ErrTimeout
})
defer sw.Set(socktest.FilterConnect, nil)
}
@@ -585,8 +586,8 @@
{now, noDeadline, 1, noDeadline, nil},
// Step the clock forward and cross the deadline.
{now.Add(-1 * time.Millisecond), now, 1, now, nil},
- {now.Add(0 * time.Millisecond), now, 1, noDeadline, errTimeout},
- {now.Add(1 * time.Millisecond), now, 1, noDeadline, errTimeout},
+ {now.Add(0 * time.Millisecond), now, 1, noDeadline, poll.ErrTimeout},
+ {now.Add(1 * time.Millisecond), now, 1, noDeadline, poll.ErrTimeout},
}
for i, tt := range testCases {
deadline, err := partialDeadline(tt.now, tt.deadline, tt.addrs)
diff --git a/src/net/dnsclient_unix_test.go b/src/net/dnsclient_unix_test.go
index 85267bb..4464804 100644
--- a/src/net/dnsclient_unix_test.go
+++ b/src/net/dnsclient_unix_test.go
@@ -9,6 +9,7 @@
import (
"context"
"fmt"
+ "internal/poll"
"internal/testenv"
"io/ioutil"
"os"
@@ -767,7 +768,7 @@
if s == "192.0.2.1:53" {
deadline0 = deadline
time.Sleep(10 * time.Millisecond)
- return nil, errTimeout
+ return nil, poll.ErrTimeout
}
if deadline == deadline0 {
diff --git a/src/net/error_posix.go b/src/net/error_posix.go
new file mode 100644
index 0000000..dd9754c
--- /dev/null
+++ b/src/net/error_posix.go
@@ -0,0 +1,21 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris windows
+
+package net
+
+import (
+ "os"
+ "syscall"
+)
+
+// wrapSyscallError takes an error and a syscall name. If the error is
+// a syscall.Errno, it wraps it in an os.SyscallError using the syscall name.
+func wrapSyscallError(name string, err error) error {
+ if _, ok := err.(syscall.Errno); ok {
+ err = os.NewSyscallError(name, err)
+ }
+ return err
+}
diff --git a/src/net/error_test.go b/src/net/error_test.go
index c23da49..61abfae 100644
--- a/src/net/error_test.go
+++ b/src/net/error_test.go
@@ -7,6 +7,7 @@
import (
"context"
"fmt"
+ "internal/poll"
"io"
"io/ioutil"
"net/internal/socktest"
@@ -87,7 +88,7 @@
return nil
}
switch err := nestedErr.(type) {
- case *AddrError, addrinfoErrno, *DNSError, InvalidAddrError, *ParseError, *timeoutError, UnknownNetworkError:
+ case *AddrError, addrinfoErrno, *DNSError, InvalidAddrError, *ParseError, *poll.TimeoutError, UnknownNetworkError:
return nil
case *os.SyscallError:
nestedErr = err.Err
@@ -97,7 +98,7 @@
goto third
}
switch nestedErr {
- case errCanceled, errClosing, errMissingAddress, errNoSuitableAddress,
+ case errCanceled, poll.ErrClosing, errMissingAddress, errNoSuitableAddress,
context.DeadlineExceeded, context.Canceled:
return nil
}
@@ -432,7 +433,7 @@
goto third
}
switch nestedErr {
- case errClosing, errTimeout:
+ case poll.ErrClosing, poll.ErrTimeout:
return nil
}
return fmt.Errorf("unexpected type on 2nd nested level: %T", nestedErr)
@@ -467,14 +468,14 @@
return nil
}
switch err := nestedErr.(type) {
- case *AddrError, addrinfoErrno, *DNSError, InvalidAddrError, *ParseError, *timeoutError, UnknownNetworkError:
+ case *AddrError, addrinfoErrno, *DNSError, InvalidAddrError, *ParseError, *poll.TimeoutError, UnknownNetworkError:
return nil
case *os.SyscallError:
nestedErr = err.Err
goto third
}
switch nestedErr {
- case errCanceled, errClosing, errMissingAddress, errTimeout, ErrWriteToConnected, io.ErrUnexpectedEOF:
+ case errCanceled, poll.ErrClosing, errMissingAddress, poll.ErrTimeout, ErrWriteToConnected, io.ErrUnexpectedEOF:
return nil
}
return fmt.Errorf("unexpected type on 2nd nested level: %T", nestedErr)
@@ -517,7 +518,7 @@
goto third
}
switch nestedErr {
- case errClosing:
+ case poll.ErrClosing:
return nil
}
return fmt.Errorf("unexpected type on 2nd nested level: %T", nestedErr)
@@ -613,7 +614,7 @@
goto third
}
switch nestedErr {
- case errClosing, errTimeout:
+ case poll.ErrClosing, poll.ErrTimeout:
return nil
}
return fmt.Errorf("unexpected type on 2nd nested level: %T", nestedErr)
@@ -692,7 +693,7 @@
goto third
}
switch nestedErr {
- case errClosing:
+ case poll.ErrClosing:
return nil
}
return fmt.Errorf("unexpected type on 2nd nested level: %T", nestedErr)
diff --git a/src/net/fd_io_plan9.go b/src/net/fd_io_plan9.go
deleted file mode 100644
index 76da0c5..0000000
--- a/src/net/fd_io_plan9.go
+++ /dev/null
@@ -1,93 +0,0 @@
-// Copyright 2016 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package net
-
-import (
- "os"
- "runtime"
- "sync"
- "syscall"
-)
-
-// asyncIO implements asynchronous cancelable I/O.
-// An asyncIO represents a single asynchronous Read or Write
-// operation. The result is returned on the result channel.
-// The undergoing I/O system call can either complete or be
-// interrupted by a note.
-type asyncIO struct {
- res chan result
-
- // mu guards the pid field.
- mu sync.Mutex
-
- // pid holds the process id of
- // the process running the IO operation.
- pid int
-}
-
-// result is the return value of a Read or Write operation.
-type result struct {
- n int
- err error
-}
-
-// newAsyncIO returns a new asyncIO that performs an I/O
-// operation by calling fn, which must do one and only one
-// interruptible system call.
-func newAsyncIO(fn func([]byte) (int, error), b []byte) *asyncIO {
- aio := &asyncIO{
- res: make(chan result, 0),
- }
- aio.mu.Lock()
- go func() {
- // Lock the current goroutine to its process
- // and store the pid in io so that Cancel can
- // interrupt it. We ignore the "hangup" signal,
- // so the signal does not take down the entire
- // Go runtime.
- runtime.LockOSThread()
- runtime_ignoreHangup()
- aio.pid = os.Getpid()
- aio.mu.Unlock()
-
- n, err := fn(b)
-
- aio.mu.Lock()
- aio.pid = -1
- runtime_unignoreHangup()
- aio.mu.Unlock()
-
- aio.res <- result{n, err}
- }()
- return aio
-}
-
-var hangupNote os.Signal = syscall.Note("hangup")
-
-// Cancel interrupts the I/O operation, causing
-// the Wait function to return.
-func (aio *asyncIO) Cancel() {
- aio.mu.Lock()
- defer aio.mu.Unlock()
- if aio.pid == -1 {
- return
- }
- proc, err := os.FindProcess(aio.pid)
- if err != nil {
- return
- }
- proc.Signal(hangupNote)
-}
-
-// Wait for the I/O operation to complete.
-func (aio *asyncIO) Wait() (int, error) {
- res := <-aio.res
- return res.n, res.err
-}
-
-// The following functions, provided by the runtime, are used to
-// ignore and unignore the "hangup" signal received by the process.
-func runtime_ignoreHangup()
-func runtime_unignoreHangup()
diff --git a/src/net/fd_mutex.go b/src/net/fd_mutex.go
deleted file mode 100644
index 4591fd1..0000000
--- a/src/net/fd_mutex.go
+++ /dev/null
@@ -1,249 +0,0 @@
-// Copyright 2013 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package net
-
-import "sync/atomic"
-
-// fdMutex is a specialized synchronization primitive that manages
-// lifetime of an fd and serializes access to Read, Write and Close
-// methods on netFD.
-type fdMutex struct {
- state uint64
- rsema uint32
- wsema uint32
-}
-
-// fdMutex.state is organized as follows:
-// 1 bit - whether netFD is closed, if set all subsequent lock operations will fail.
-// 1 bit - lock for read operations.
-// 1 bit - lock for write operations.
-// 20 bits - total number of references (read+write+misc).
-// 20 bits - number of outstanding read waiters.
-// 20 bits - number of outstanding write waiters.
-const (
- mutexClosed = 1 << 0
- mutexRLock = 1 << 1
- mutexWLock = 1 << 2
- mutexRef = 1 << 3
- mutexRefMask = (1<<20 - 1) << 3
- mutexRWait = 1 << 23
- mutexRMask = (1<<20 - 1) << 23
- mutexWWait = 1 << 43
- mutexWMask = (1<<20 - 1) << 43
-)
-
-// Read operations must do rwlock(true)/rwunlock(true).
-//
-// Write operations must do rwlock(false)/rwunlock(false).
-//
-// Misc operations must do incref/decref.
-// Misc operations include functions like setsockopt and setDeadline.
-// They need to use incref/decref to ensure that they operate on the
-// correct fd in presence of a concurrent close call (otherwise fd can
-// be closed under their feet).
-//
-// Close operations must do increfAndClose/decref.
-
-// incref adds a reference to mu.
-// It reports whether mu is available for reading or writing.
-func (mu *fdMutex) incref() bool {
- for {
- old := atomic.LoadUint64(&mu.state)
- if old&mutexClosed != 0 {
- return false
- }
- new := old + mutexRef
- if new&mutexRefMask == 0 {
- panic("net: inconsistent fdMutex")
- }
- if atomic.CompareAndSwapUint64(&mu.state, old, new) {
- return true
- }
- }
-}
-
-// increfAndClose sets the state of mu to closed.
-// It reports whether there is no remaining reference.
-func (mu *fdMutex) increfAndClose() bool {
- for {
- old := atomic.LoadUint64(&mu.state)
- if old&mutexClosed != 0 {
- return false
- }
- // Mark as closed and acquire a reference.
- new := (old | mutexClosed) + mutexRef
- if new&mutexRefMask == 0 {
- panic("net: inconsistent fdMutex")
- }
- // Remove all read and write waiters.
- new &^= mutexRMask | mutexWMask
- if atomic.CompareAndSwapUint64(&mu.state, old, new) {
- // Wake all read and write waiters,
- // they will observe closed flag after wakeup.
- for old&mutexRMask != 0 {
- old -= mutexRWait
- runtime_Semrelease(&mu.rsema)
- }
- for old&mutexWMask != 0 {
- old -= mutexWWait
- runtime_Semrelease(&mu.wsema)
- }
- return true
- }
- }
-}
-
-// decref removes a reference from mu.
-// It reports whether there is no remaining reference.
-func (mu *fdMutex) decref() bool {
- for {
- old := atomic.LoadUint64(&mu.state)
- if old&mutexRefMask == 0 {
- panic("net: inconsistent fdMutex")
- }
- new := old - mutexRef
- if atomic.CompareAndSwapUint64(&mu.state, old, new) {
- return new&(mutexClosed|mutexRefMask) == mutexClosed
- }
- }
-}
-
-// lock adds a reference to mu and locks mu.
-// It reports whether mu is available for reading or writing.
-func (mu *fdMutex) rwlock(read bool) bool {
- var mutexBit, mutexWait, mutexMask uint64
- var mutexSema *uint32
- if read {
- mutexBit = mutexRLock
- mutexWait = mutexRWait
- mutexMask = mutexRMask
- mutexSema = &mu.rsema
- } else {
- mutexBit = mutexWLock
- mutexWait = mutexWWait
- mutexMask = mutexWMask
- mutexSema = &mu.wsema
- }
- for {
- old := atomic.LoadUint64(&mu.state)
- if old&mutexClosed != 0 {
- return false
- }
- var new uint64
- if old&mutexBit == 0 {
- // Lock is free, acquire it.
- new = (old | mutexBit) + mutexRef
- if new&mutexRefMask == 0 {
- panic("net: inconsistent fdMutex")
- }
- } else {
- // Wait for lock.
- new = old + mutexWait
- if new&mutexMask == 0 {
- panic("net: inconsistent fdMutex")
- }
- }
- if atomic.CompareAndSwapUint64(&mu.state, old, new) {
- if old&mutexBit == 0 {
- return true
- }
- runtime_Semacquire(mutexSema)
- // The signaller has subtracted mutexWait.
- }
- }
-}
-
-// unlock removes a reference from mu and unlocks mu.
-// It reports whether there is no remaining reference.
-func (mu *fdMutex) rwunlock(read bool) bool {
- var mutexBit, mutexWait, mutexMask uint64
- var mutexSema *uint32
- if read {
- mutexBit = mutexRLock
- mutexWait = mutexRWait
- mutexMask = mutexRMask
- mutexSema = &mu.rsema
- } else {
- mutexBit = mutexWLock
- mutexWait = mutexWWait
- mutexMask = mutexWMask
- mutexSema = &mu.wsema
- }
- for {
- old := atomic.LoadUint64(&mu.state)
- if old&mutexBit == 0 || old&mutexRefMask == 0 {
- panic("net: inconsistent fdMutex")
- }
- // Drop lock, drop reference and wake read waiter if present.
- new := (old &^ mutexBit) - mutexRef
- if old&mutexMask != 0 {
- new -= mutexWait
- }
- if atomic.CompareAndSwapUint64(&mu.state, old, new) {
- if old&mutexMask != 0 {
- runtime_Semrelease(mutexSema)
- }
- return new&(mutexClosed|mutexRefMask) == mutexClosed
- }
- }
-}
-
-// Implemented in runtime package.
-func runtime_Semacquire(sema *uint32)
-func runtime_Semrelease(sema *uint32)
-
-// incref adds a reference to fd.
-// It returns an error when fd cannot be used.
-func (fd *netFD) incref() error {
- if !fd.fdmu.incref() {
- return errClosing
- }
- return nil
-}
-
-// decref removes a reference from fd.
-// It also closes fd when the state of fd is set to closed and there
-// is no remaining reference.
-func (fd *netFD) decref() {
- if fd.fdmu.decref() {
- fd.destroy()
- }
-}
-
-// readLock adds a reference to fd and locks fd for reading.
-// It returns an error when fd cannot be used for reading.
-func (fd *netFD) readLock() error {
- if !fd.fdmu.rwlock(true) {
- return errClosing
- }
- return nil
-}
-
-// readUnlock removes a reference from fd and unlocks fd for reading.
-// It also closes fd when the state of fd is set to closed and there
-// is no remaining reference.
-func (fd *netFD) readUnlock() {
- if fd.fdmu.rwunlock(true) {
- fd.destroy()
- }
-}
-
-// writeLock adds a reference to fd and locks fd for writing.
-// It returns an error when fd cannot be used for writing.
-func (fd *netFD) writeLock() error {
- if !fd.fdmu.rwlock(false) {
- return errClosing
- }
- return nil
-}
-
-// writeUnlock removes a reference from fd and unlocks fd for writing.
-// It also closes fd when the state of fd is set to closed and there
-// is no remaining reference.
-func (fd *netFD) writeUnlock() {
- if fd.fdmu.rwunlock(false) {
- fd.destroy()
- }
-}
diff --git a/src/net/fd_mutex_test.go b/src/net/fd_mutex_test.go
deleted file mode 100644
index 3542c70..0000000
--- a/src/net/fd_mutex_test.go
+++ /dev/null
@@ -1,195 +0,0 @@
-// Copyright 2013 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package net
-
-import (
- "math/rand"
- "runtime"
- "testing"
- "time"
-)
-
-func TestMutexLock(t *testing.T) {
- var mu fdMutex
-
- if !mu.incref() {
- t.Fatal("broken")
- }
- if mu.decref() {
- t.Fatal("broken")
- }
-
- if !mu.rwlock(true) {
- t.Fatal("broken")
- }
- if mu.rwunlock(true) {
- t.Fatal("broken")
- }
-
- if !mu.rwlock(false) {
- t.Fatal("broken")
- }
- if mu.rwunlock(false) {
- t.Fatal("broken")
- }
-}
-
-func TestMutexClose(t *testing.T) {
- var mu fdMutex
- if !mu.increfAndClose() {
- t.Fatal("broken")
- }
-
- if mu.incref() {
- t.Fatal("broken")
- }
- if mu.rwlock(true) {
- t.Fatal("broken")
- }
- if mu.rwlock(false) {
- t.Fatal("broken")
- }
- if mu.increfAndClose() {
- t.Fatal("broken")
- }
-}
-
-func TestMutexCloseUnblock(t *testing.T) {
- c := make(chan bool)
- var mu fdMutex
- mu.rwlock(true)
- for i := 0; i < 4; i++ {
- go func() {
- if mu.rwlock(true) {
- t.Error("broken")
- return
- }
- c <- true
- }()
- }
- // Concurrent goroutines must not be able to read lock the mutex.
- time.Sleep(time.Millisecond)
- select {
- case <-c:
- t.Fatal("broken")
- default:
- }
- mu.increfAndClose() // Must unblock the readers.
- for i := 0; i < 4; i++ {
- select {
- case <-c:
- case <-time.After(10 * time.Second):
- t.Fatal("broken")
- }
- }
- if mu.decref() {
- t.Fatal("broken")
- }
- if !mu.rwunlock(true) {
- t.Fatal("broken")
- }
-}
-
-func TestMutexPanic(t *testing.T) {
- ensurePanics := func(f func()) {
- defer func() {
- if recover() == nil {
- t.Fatal("does not panic")
- }
- }()
- f()
- }
-
- var mu fdMutex
- ensurePanics(func() { mu.decref() })
- ensurePanics(func() { mu.rwunlock(true) })
- ensurePanics(func() { mu.rwunlock(false) })
-
- ensurePanics(func() { mu.incref(); mu.decref(); mu.decref() })
- ensurePanics(func() { mu.rwlock(true); mu.rwunlock(true); mu.rwunlock(true) })
- ensurePanics(func() { mu.rwlock(false); mu.rwunlock(false); mu.rwunlock(false) })
-
- // ensure that it's still not broken
- mu.incref()
- mu.decref()
- mu.rwlock(true)
- mu.rwunlock(true)
- mu.rwlock(false)
- mu.rwunlock(false)
-}
-
-func TestMutexStress(t *testing.T) {
- P := 8
- N := int(1e6)
- if testing.Short() {
- P = 4
- N = 1e4
- }
- defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P))
- done := make(chan bool)
- var mu fdMutex
- var readState [2]uint64
- var writeState [2]uint64
- for p := 0; p < P; p++ {
- go func() {
- r := rand.New(rand.NewSource(rand.Int63()))
- for i := 0; i < N; i++ {
- switch r.Intn(3) {
- case 0:
- if !mu.incref() {
- t.Error("broken")
- return
- }
- if mu.decref() {
- t.Error("broken")
- return
- }
- case 1:
- if !mu.rwlock(true) {
- t.Error("broken")
- return
- }
- // Ensure that it provides mutual exclusion for readers.
- if readState[0] != readState[1] {
- t.Error("broken")
- return
- }
- readState[0]++
- readState[1]++
- if mu.rwunlock(true) {
- t.Error("broken")
- return
- }
- case 2:
- if !mu.rwlock(false) {
- t.Error("broken")
- return
- }
- // Ensure that it provides mutual exclusion for writers.
- if writeState[0] != writeState[1] {
- t.Error("broken")
- return
- }
- writeState[0]++
- writeState[1]++
- if mu.rwunlock(false) {
- t.Error("broken")
- return
- }
- }
- }
- done <- true
- }()
- }
- for p := 0; p < P; p++ {
- <-done
- }
- if !mu.increfAndClose() {
- t.Fatal("broken")
- }
- if !mu.decref() {
- t.Fatal("broken")
- }
-}
diff --git a/src/net/fd_plan9.go b/src/net/fd_plan9.go
index 300d8c4..7496e36 100644
--- a/src/net/fd_plan9.go
+++ b/src/net/fd_plan9.go
@@ -5,23 +5,15 @@
package net
import (
+ "internal/poll"
"io"
"os"
- "sync/atomic"
"syscall"
- "time"
)
-type atomicBool int32
-
-func (b *atomicBool) isSet() bool { return atomic.LoadInt32((*int32)(b)) != 0 }
-func (b *atomicBool) setFalse() { atomic.StoreInt32((*int32)(b), 0) }
-func (b *atomicBool) setTrue() { atomic.StoreInt32((*int32)(b), 1) }
-
// Network file descriptor.
type netFD struct {
- // locking/lifetime of sysfd + serialize access to Read and Write methods
- fdmu fdMutex
+ pfd poll.FD
// immutable until Close
net string
@@ -30,14 +22,6 @@
listen, ctl, data *os.File
laddr, raddr Addr
isStream bool
-
- // deadlines
- raio *asyncIO
- waio *asyncIO
- rtimer *time.Timer
- wtimer *time.Timer
- rtimedout atomicBool // set true when read deadline has been reached
- wtimedout atomicBool // set true when write deadline has been reached
}
var (
@@ -49,7 +33,7 @@
}
func newFD(net, name string, listen, ctl, data *os.File, laddr, raddr Addr) (*netFD, error) {
- return &netFD{
+ ret := &netFD{
net: net,
n: name,
dir: netdir + "/" + net + "/" + name,
@@ -57,7 +41,9 @@
ctl: ctl, data: data,
laddr: laddr,
raddr: raddr,
- }, nil
+ }
+ ret.pfd.Destroy = ret.destroy
+ return ret, nil
}
func (fd *netFD) init() error {
@@ -99,28 +85,10 @@
}
func (fd *netFD) Read(b []byte) (n int, err error) {
- if fd.rtimedout.isSet() {
- return 0, errTimeout
- }
if !fd.ok() || fd.data == nil {
return 0, syscall.EINVAL
}
- if err := fd.readLock(); err != nil {
- return 0, err
- }
- defer fd.readUnlock()
- if len(b) == 0 {
- return 0, nil
- }
- fd.raio = newAsyncIO(fd.data.Read, b)
- n, err = fd.raio.Wait()
- fd.raio = nil
- if isHangup(err) {
- err = io.EOF
- }
- if isInterrupted(err) {
- err = errTimeout
- }
+ n, err = fd.pfd.Read(fd.data.Read, b)
if fd.net == "udp" && err == io.EOF {
n = 0
err = nil
@@ -129,23 +97,10 @@
}
func (fd *netFD) Write(b []byte) (n int, err error) {
- if fd.wtimedout.isSet() {
- return 0, errTimeout
- }
if !fd.ok() || fd.data == nil {
return 0, syscall.EINVAL
}
- if err := fd.writeLock(); err != nil {
- return 0, err
- }
- defer fd.writeUnlock()
- fd.waio = newAsyncIO(fd.data.Write, b)
- n, err = fd.waio.Wait()
- fd.waio = nil
- if isInterrupted(err) {
- err = errTimeout
- }
- return
+ return fd.pfd.Write(fd.data.Write, b)
}
func (fd *netFD) closeRead() error {
@@ -163,8 +118,8 @@
}
func (fd *netFD) Close() error {
- if !fd.fdmu.increfAndClose() {
- return errClosing
+ if err := fd.pfd.Close(); err != nil {
+ return err
}
if !fd.ok() {
return syscall.EINVAL
@@ -216,77 +171,6 @@
return os.NewFile(uintptr(dfd), s), nil
}
-func (fd *netFD) setDeadline(t time.Time) error {
- return setDeadlineImpl(fd, t, 'r'+'w')
-}
-
-func (fd *netFD) setReadDeadline(t time.Time) error {
- return setDeadlineImpl(fd, t, 'r')
-}
-
-func (fd *netFD) setWriteDeadline(t time.Time) error {
- return setDeadlineImpl(fd, t, 'w')
-}
-
-func setDeadlineImpl(fd *netFD, t time.Time, mode int) error {
- d := t.Sub(time.Now())
- if mode == 'r' || mode == 'r'+'w' {
- fd.rtimedout.setFalse()
- }
- if mode == 'w' || mode == 'r'+'w' {
- fd.wtimedout.setFalse()
- }
- if t.IsZero() || d < 0 {
- // Stop timer
- if mode == 'r' || mode == 'r'+'w' {
- if fd.rtimer != nil {
- fd.rtimer.Stop()
- }
- fd.rtimer = nil
- }
- if mode == 'w' || mode == 'r'+'w' {
- if fd.wtimer != nil {
- fd.wtimer.Stop()
- }
- fd.wtimer = nil
- }
- } else {
- // Interrupt I/O operation once timer has expired
- if mode == 'r' || mode == 'r'+'w' {
- fd.rtimer = time.AfterFunc(d, func() {
- fd.rtimedout.setTrue()
- if fd.raio != nil {
- fd.raio.Cancel()
- }
- })
- }
- if mode == 'w' || mode == 'r'+'w' {
- fd.wtimer = time.AfterFunc(d, func() {
- fd.wtimedout.setTrue()
- if fd.waio != nil {
- fd.waio.Cancel()
- }
- })
- }
- }
- if !t.IsZero() && d < 0 {
- // Interrupt current I/O operation
- if mode == 'r' || mode == 'r'+'w' {
- fd.rtimedout.setTrue()
- if fd.raio != nil {
- fd.raio.Cancel()
- }
- }
- if mode == 'w' || mode == 'r'+'w' {
- fd.wtimedout.setTrue()
- if fd.waio != nil {
- fd.waio.Cancel()
- }
- }
- }
- return nil
-}
-
func setReadBuffer(fd *netFD, bytes int) error {
return syscall.EPLAN9
}
@@ -294,11 +178,3 @@
func setWriteBuffer(fd *netFD, bytes int) error {
return syscall.EPLAN9
}
-
-func isHangup(err error) bool {
- return err != nil && stringsHasSuffix(err.Error(), "Hangup")
-}
-
-func isInterrupted(err error) bool {
- return err != nil && stringsHasSuffix(err.Error(), "interrupted")
-}
diff --git a/src/net/fd_poll_nacl.go b/src/net/fd_poll_nacl.go
deleted file mode 100644
index 8398760..0000000
--- a/src/net/fd_poll_nacl.go
+++ /dev/null
@@ -1,89 +0,0 @@
-// Copyright 2013 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package net
-
-import (
- "runtime"
- "syscall"
- "time"
-)
-
-type pollDesc struct {
- fd *netFD
- closing bool
-}
-
-func (pd *pollDesc) init(fd *netFD) error { pd.fd = fd; return nil }
-
-func (pd *pollDesc) close() {}
-
-func (pd *pollDesc) evict() {
- pd.closing = true
- if pd.fd != nil {
- syscall.StopIO(pd.fd.sysfd)
- runtime.KeepAlive(pd.fd)
- }
-}
-
-func (pd *pollDesc) prepare(mode int) error {
- if pd.closing {
- return errClosing
- }
- return nil
-}
-
-func (pd *pollDesc) prepareRead() error { return pd.prepare('r') }
-
-func (pd *pollDesc) prepareWrite() error { return pd.prepare('w') }
-
-func (pd *pollDesc) wait(mode int) error {
- if pd.closing {
- return errClosing
- }
- return errTimeout
-}
-
-func (pd *pollDesc) waitRead() error { return pd.wait('r') }
-
-func (pd *pollDesc) waitWrite() error { return pd.wait('w') }
-
-func (pd *pollDesc) waitCanceled(mode int) {}
-
-func (pd *pollDesc) waitCanceledRead() {}
-
-func (pd *pollDesc) waitCanceledWrite() {}
-
-func (fd *netFD) setDeadline(t time.Time) error {
- return setDeadlineImpl(fd, t, 'r'+'w')
-}
-
-func (fd *netFD) setReadDeadline(t time.Time) error {
- return setDeadlineImpl(fd, t, 'r')
-}
-
-func (fd *netFD) setWriteDeadline(t time.Time) error {
- return setDeadlineImpl(fd, t, 'w')
-}
-
-func setDeadlineImpl(fd *netFD, t time.Time, mode int) error {
- d := t.UnixNano()
- if t.IsZero() {
- d = 0
- }
- if err := fd.incref(); err != nil {
- return err
- }
- switch mode {
- case 'r':
- syscall.SetReadDeadline(fd.sysfd, d)
- case 'w':
- syscall.SetWriteDeadline(fd.sysfd, d)
- case 'r' + 'w':
- syscall.SetReadDeadline(fd.sysfd, d)
- syscall.SetWriteDeadline(fd.sysfd, d)
- }
- fd.decref()
- return nil
-}
diff --git a/src/net/fd_poll_runtime.go b/src/net/fd_poll_runtime.go
deleted file mode 100644
index 62b69fc..0000000
--- a/src/net/fd_poll_runtime.go
+++ /dev/null
@@ -1,141 +0,0 @@
-// Copyright 2013 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// +build darwin dragonfly freebsd linux netbsd openbsd windows solaris
-
-package net
-
-import (
- "runtime"
- "sync"
- "syscall"
- "time"
-)
-
-// runtimeNano returns the current value of the runtime clock in nanoseconds.
-func runtimeNano() int64
-
-func runtime_pollServerInit()
-func runtime_pollOpen(fd uintptr) (uintptr, int)
-func runtime_pollClose(ctx uintptr)
-func runtime_pollWait(ctx uintptr, mode int) int
-func runtime_pollWaitCanceled(ctx uintptr, mode int) int
-func runtime_pollReset(ctx uintptr, mode int) int
-func runtime_pollSetDeadline(ctx uintptr, d int64, mode int)
-func runtime_pollUnblock(ctx uintptr)
-
-type pollDesc struct {
- runtimeCtx uintptr
-}
-
-var serverInit sync.Once
-
-func (pd *pollDesc) init(fd *netFD) error {
- serverInit.Do(runtime_pollServerInit)
- ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))
- runtime.KeepAlive(fd)
- if errno != 0 {
- return syscall.Errno(errno)
- }
- pd.runtimeCtx = ctx
- return nil
-}
-
-func (pd *pollDesc) close() {
- if pd.runtimeCtx == 0 {
- return
- }
- runtime_pollClose(pd.runtimeCtx)
- pd.runtimeCtx = 0
-}
-
-// Evict evicts fd from the pending list, unblocking any I/O running on fd.
-func (pd *pollDesc) evict() {
- if pd.runtimeCtx == 0 {
- return
- }
- runtime_pollUnblock(pd.runtimeCtx)
-}
-
-func (pd *pollDesc) prepare(mode int) error {
- res := runtime_pollReset(pd.runtimeCtx, mode)
- return convertErr(res)
-}
-
-func (pd *pollDesc) prepareRead() error {
- return pd.prepare('r')
-}
-
-func (pd *pollDesc) prepareWrite() error {
- return pd.prepare('w')
-}
-
-func (pd *pollDesc) wait(mode int) error {
- res := runtime_pollWait(pd.runtimeCtx, mode)
- return convertErr(res)
-}
-
-func (pd *pollDesc) waitRead() error {
- return pd.wait('r')
-}
-
-func (pd *pollDesc) waitWrite() error {
- return pd.wait('w')
-}
-
-func (pd *pollDesc) waitCanceled(mode int) {
- runtime_pollWaitCanceled(pd.runtimeCtx, mode)
-}
-
-func (pd *pollDesc) waitCanceledRead() {
- pd.waitCanceled('r')
-}
-
-func (pd *pollDesc) waitCanceledWrite() {
- pd.waitCanceled('w')
-}
-
-func convertErr(res int) error {
- switch res {
- case 0:
- return nil
- case 1:
- return errClosing
- case 2:
- return errTimeout
- }
- println("unreachable: ", res)
- panic("unreachable")
-}
-
-func (fd *netFD) setDeadline(t time.Time) error {
- return setDeadlineImpl(fd, t, 'r'+'w')
-}
-
-func (fd *netFD) setReadDeadline(t time.Time) error {
- return setDeadlineImpl(fd, t, 'r')
-}
-
-func (fd *netFD) setWriteDeadline(t time.Time) error {
- return setDeadlineImpl(fd, t, 'w')
-}
-
-func setDeadlineImpl(fd *netFD, t time.Time, mode int) error {
- diff := int64(time.Until(t))
- d := runtimeNano() + diff
- if d <= 0 && diff > 0 {
- // If the user has a deadline in the future, but the delay calculation
- // overflows, then set the deadline to the maximum possible value.
- d = 1<<63 - 1
- }
- if t.IsZero() {
- d = 0
- }
- if err := fd.incref(); err != nil {
- return err
- }
- runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
- fd.decref()
- return nil
-}
diff --git a/src/net/fd_posix.go b/src/net/fd_posix.go
deleted file mode 100644
index b4b908a..0000000
--- a/src/net/fd_posix.go
+++ /dev/null
@@ -1,21 +0,0 @@
-// Copyright 2009 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris windows
-
-package net
-
-import (
- "io"
- "syscall"
-)
-
-// eofError returns io.EOF when fd is available for reading end of
-// file.
-func (fd *netFD) eofError(n int, err error) error {
- if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM && fd.sotype != syscall.SOCK_RAW {
- return io.EOF
- }
- return err
-}
diff --git a/src/net/fd_posix_test.go b/src/net/fd_posix_test.go
deleted file mode 100644
index 85711ef..0000000
--- a/src/net/fd_posix_test.go
+++ /dev/null
@@ -1,57 +0,0 @@
-// Copyright 2012 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris windows
-
-package net
-
-import (
- "io"
- "syscall"
- "testing"
-)
-
-var eofErrorTests = []struct {
- n int
- err error
- fd *netFD
- expected error
-}{
- {100, nil, &netFD{sotype: syscall.SOCK_STREAM}, nil},
- {100, io.EOF, &netFD{sotype: syscall.SOCK_STREAM}, io.EOF},
- {100, errClosing, &netFD{sotype: syscall.SOCK_STREAM}, errClosing},
- {0, nil, &netFD{sotype: syscall.SOCK_STREAM}, io.EOF},
- {0, io.EOF, &netFD{sotype: syscall.SOCK_STREAM}, io.EOF},
- {0, errClosing, &netFD{sotype: syscall.SOCK_STREAM}, errClosing},
-
- {100, nil, &netFD{sotype: syscall.SOCK_DGRAM}, nil},
- {100, io.EOF, &netFD{sotype: syscall.SOCK_DGRAM}, io.EOF},
- {100, errClosing, &netFD{sotype: syscall.SOCK_DGRAM}, errClosing},
- {0, nil, &netFD{sotype: syscall.SOCK_DGRAM}, nil},
- {0, io.EOF, &netFD{sotype: syscall.SOCK_DGRAM}, io.EOF},
- {0, errClosing, &netFD{sotype: syscall.SOCK_DGRAM}, errClosing},
-
- {100, nil, &netFD{sotype: syscall.SOCK_SEQPACKET}, nil},
- {100, io.EOF, &netFD{sotype: syscall.SOCK_SEQPACKET}, io.EOF},
- {100, errClosing, &netFD{sotype: syscall.SOCK_SEQPACKET}, errClosing},
- {0, nil, &netFD{sotype: syscall.SOCK_SEQPACKET}, io.EOF},
- {0, io.EOF, &netFD{sotype: syscall.SOCK_SEQPACKET}, io.EOF},
- {0, errClosing, &netFD{sotype: syscall.SOCK_SEQPACKET}, errClosing},
-
- {100, nil, &netFD{sotype: syscall.SOCK_RAW}, nil},
- {100, io.EOF, &netFD{sotype: syscall.SOCK_RAW}, io.EOF},
- {100, errClosing, &netFD{sotype: syscall.SOCK_RAW}, errClosing},
- {0, nil, &netFD{sotype: syscall.SOCK_RAW}, nil},
- {0, io.EOF, &netFD{sotype: syscall.SOCK_RAW}, io.EOF},
- {0, errClosing, &netFD{sotype: syscall.SOCK_RAW}, errClosing},
-}
-
-func TestEOFError(t *testing.T) {
- for _, tt := range eofErrorTests {
- actual := tt.fd.eofError(tt.n, tt.err)
- if actual != tt.expected {
- t.Errorf("eofError(%v, %v, %v): expected %v, actual %v", tt.n, tt.err, tt.fd.sotype, tt.expected, actual)
- }
- }
-}
diff --git a/src/net/fd_unix.go b/src/net/fd_unix.go
index 3c95fc0..9f36069 100644
--- a/src/net/fd_unix.go
+++ b/src/net/fd_unix.go
@@ -8,7 +8,7 @@
import (
"context"
- "io"
+ "internal/poll"
"os"
"runtime"
"sync/atomic"
@@ -17,38 +17,36 @@
// Network file descriptor.
type netFD struct {
- // locking/lifetime of sysfd + serialize access to Read and Write methods
- fdmu fdMutex
+ pfd poll.FD
// immutable until Close
- sysfd int
family int
sotype int
- isStream bool
isConnected bool
net string
laddr Addr
raddr Addr
-
- // writev cache.
- iovecs *[]syscall.Iovec
-
- // wait server
- pd pollDesc
}
func sysInit() {
}
func newFD(sysfd, family, sotype int, net string) (*netFD, error) {
- return &netFD{sysfd: sysfd, family: family, sotype: sotype, net: net, isStream: sotype == syscall.SOCK_STREAM}, nil
+ ret := &netFD{
+ pfd: poll.FD{
+ Sysfd: sysfd,
+ IsStream: sotype == syscall.SOCK_STREAM,
+ ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
+ },
+ family: family,
+ sotype: sotype,
+ net: net,
+ }
+ return ret, nil
}
func (fd *netFD) init() error {
- if err := fd.pd.init(fd); err != nil {
- return err
- }
- return nil
+ return fd.pfd.Init()
}
func (fd *netFD) setAddr(laddr, raddr Addr) {
@@ -72,7 +70,7 @@
// Do not need to call fd.writeLock here,
// because fd is not yet accessible to user,
// so no concurrent operations are possible.
- switch err := connectFunc(fd.sysfd, ra); err {
+ switch err := connectFunc(fd.pfd.Sysfd, ra); err {
case syscall.EINPROGRESS, syscall.EALREADY, syscall.EINTR:
case nil, syscall.EISCONN:
select {
@@ -80,9 +78,10 @@
return mapErr(ctx.Err())
default:
}
- if err := fd.init(); err != nil {
+ if err := fd.pfd.Init(); err != nil {
return err
}
+ runtime.KeepAlive(fd)
return nil
case syscall.EINVAL:
// On Solaris we can see EINVAL if the socket has
@@ -97,12 +96,12 @@
default:
return os.NewSyscallError("connect", err)
}
- if err := fd.init(); err != nil {
+ if err := fd.pfd.Init(); err != nil {
return err
}
if deadline, _ := ctx.Deadline(); !deadline.IsZero() {
- fd.setWriteDeadline(deadline)
- defer fd.setWriteDeadline(noDeadline)
+ fd.pfd.SetWriteDeadline(deadline)
+ defer fd.pfd.SetWriteDeadline(noDeadline)
}
// Start the "interrupter" goroutine, if this context might be canceled.
@@ -119,7 +118,7 @@
defer func() {
close(done)
if ctxErr := <-interruptRes; ctxErr != nil && ret == nil {
- // The interrupter goroutine called setWriteDeadline,
+ // The interrupter goroutine called SetWriteDeadline,
// but the connect code below had returned from
// waitWrite already and did a successful connect (ret
// == nil). Because we've now poisoned the connection
@@ -135,7 +134,7 @@
// Force the runtime's poller to immediately give up
// waiting for writability, unblocking waitWrite
// below.
- fd.setWriteDeadline(aLongTimeAgo)
+ fd.pfd.SetWriteDeadline(aLongTimeAgo)
testHookCanceledDial()
interruptRes <- ctx.Err()
case <-done:
@@ -153,7 +152,7 @@
// SO_ERROR socket option to see if the connection
// succeeded or failed. See issue 7474 for further
// details.
- if err := fd.pd.waitWrite(); err != nil {
+ if err := fd.pfd.WaitWrite(); err != nil {
select {
case <-ctx.Done():
return mapErr(ctx.Err())
@@ -161,7 +160,7 @@
}
return err
}
- nerr, err := getsockoptIntFunc(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_ERROR)
+ nerr, err := getsockoptIntFunc(fd.pfd.Sysfd, syscall.SOL_SOCKET, syscall.SO_ERROR)
if err != nil {
return os.NewSyscallError("getsockopt", err)
}
@@ -174,45 +173,26 @@
// See golang.org/issue/14548.
// On Darwin, multiple connect system calls on
// a non-blocking socket never harm SO_ERROR.
- switch err := connectFunc(fd.sysfd, ra); err {
+ switch err := connectFunc(fd.pfd.Sysfd, ra); err {
case nil, syscall.EISCONN:
return nil
}
default:
return os.NewSyscallError("getsockopt", err)
}
+ runtime.KeepAlive(fd)
}
}
-func (fd *netFD) destroy() {
- // Poller may want to unregister fd in readiness notification mechanism,
- // so this must be executed before closeFunc.
- fd.pd.close()
- closeFunc(fd.sysfd)
- fd.sysfd = -1
- runtime.SetFinalizer(fd, nil)
-}
-
func (fd *netFD) Close() error {
- if !fd.fdmu.increfAndClose() {
- return errClosing
- }
- // Unblock any I/O. Once it all unblocks and returns,
- // so that it cannot be referring to fd.sysfd anymore,
- // the final decref will close fd.sysfd. This should happen
- // fairly quickly, since all the I/O is non-blocking, and any
- // attempts to block in the pollDesc will return errClosing.
- fd.pd.evict()
- fd.decref()
- return nil
+ runtime.SetFinalizer(fd, nil)
+ return fd.pfd.Close()
}
func (fd *netFD) shutdown(how int) error {
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
- return os.NewSyscallError("shutdown", syscall.Shutdown(fd.sysfd, how))
+ err := fd.pfd.Shutdown(how)
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("shutdown", err)
}
func (fd *netFD) closeRead() error {
@@ -224,233 +204,59 @@
}
func (fd *netFD) Read(p []byte) (n int, err error) {
- if err := fd.readLock(); err != nil {
- return 0, err
- }
- defer fd.readUnlock()
- if len(p) == 0 {
- // If the caller wanted a zero byte read, return immediately
- // without trying. (But after acquiring the readLock.) Otherwise
- // syscall.Read returns 0, nil and eofError turns that into
- // io.EOF.
- // TODO(bradfitz): make it wait for readability? (Issue 15735)
- return 0, nil
- }
- if err := fd.pd.prepareRead(); err != nil {
- return 0, err
- }
- if fd.isStream && len(p) > 1<<30 {
- p = p[:1<<30]
- }
- for {
- n, err = syscall.Read(fd.sysfd, p)
- if err != nil {
- n = 0
- if err == syscall.EAGAIN {
- if err = fd.pd.waitRead(); err == nil {
- continue
- }
- }
- }
- err = fd.eofError(n, err)
- break
- }
- if _, ok := err.(syscall.Errno); ok {
- err = os.NewSyscallError("read", err)
- }
- return
+ n, err = fd.pfd.Read(p)
+ runtime.KeepAlive(fd)
+ return n, wrapSyscallError("read", err)
}
func (fd *netFD) readFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
- if err := fd.readLock(); err != nil {
- return 0, nil, err
- }
- defer fd.readUnlock()
- if err := fd.pd.prepareRead(); err != nil {
- return 0, nil, err
- }
- for {
- n, sa, err = syscall.Recvfrom(fd.sysfd, p, 0)
- if err != nil {
- n = 0
- if err == syscall.EAGAIN {
- if err = fd.pd.waitRead(); err == nil {
- continue
- }
- }
- }
- err = fd.eofError(n, err)
- break
- }
- if _, ok := err.(syscall.Errno); ok {
- err = os.NewSyscallError("recvfrom", err)
- }
- return
+ n, sa, err = fd.pfd.RecvFrom(p)
+ runtime.KeepAlive(fd)
+ return n, sa, wrapSyscallError("recvfrom", err)
}
func (fd *netFD) readMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err error) {
- if err := fd.readLock(); err != nil {
- return 0, 0, 0, nil, err
- }
- defer fd.readUnlock()
- if err := fd.pd.prepareRead(); err != nil {
- return 0, 0, 0, nil, err
- }
- for {
- n, oobn, flags, sa, err = syscall.Recvmsg(fd.sysfd, p, oob, 0)
- if err != nil {
- // TODO(dfc) should n and oobn be set to 0
- if err == syscall.EAGAIN {
- if err = fd.pd.waitRead(); err == nil {
- continue
- }
- }
- }
- err = fd.eofError(n, err)
- break
- }
- if _, ok := err.(syscall.Errno); ok {
- err = os.NewSyscallError("recvmsg", err)
- }
- return
+ n, oobn, flags, sa, err = fd.pfd.ReadMsg(p, oob)
+ runtime.KeepAlive(fd)
+ return n, oobn, flags, sa, wrapSyscallError("recvmsg", err)
}
func (fd *netFD) Write(p []byte) (nn int, err error) {
- if err := fd.writeLock(); err != nil {
- return 0, err
- }
- defer fd.writeUnlock()
- if err := fd.pd.prepareWrite(); err != nil {
- return 0, err
- }
- for {
- var n int
- max := len(p)
- if fd.isStream && max-nn > 1<<30 {
- max = nn + 1<<30
- }
- n, err = syscall.Write(fd.sysfd, p[nn:max])
- if n > 0 {
- nn += n
- }
- if nn == len(p) {
- break
- }
- if err == syscall.EAGAIN {
- if err = fd.pd.waitWrite(); err == nil {
- continue
- }
- }
- if err != nil {
- break
- }
- if n == 0 {
- err = io.ErrUnexpectedEOF
- break
- }
- }
- if _, ok := err.(syscall.Errno); ok {
- err = os.NewSyscallError("write", err)
- }
- return nn, err
+ nn, err = fd.pfd.Write(p)
+ runtime.KeepAlive(fd)
+ return nn, wrapSyscallError("write", err)
}
func (fd *netFD) writeTo(p []byte, sa syscall.Sockaddr) (n int, err error) {
- if err := fd.writeLock(); err != nil {
- return 0, err
- }
- defer fd.writeUnlock()
- if err := fd.pd.prepareWrite(); err != nil {
- return 0, err
- }
- for {
- err = syscall.Sendto(fd.sysfd, p, 0, sa)
- if err == syscall.EAGAIN {
- if err = fd.pd.waitWrite(); err == nil {
- continue
- }
- }
- break
- }
- if err == nil {
- n = len(p)
- }
- if _, ok := err.(syscall.Errno); ok {
- err = os.NewSyscallError("sendto", err)
- }
- return
+ n, err = fd.pfd.WriteTo(p, sa)
+ runtime.KeepAlive(fd)
+ return n, wrapSyscallError("sendto", err)
}
func (fd *netFD) writeMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err error) {
- if err := fd.writeLock(); err != nil {
- return 0, 0, err
- }
- defer fd.writeUnlock()
- if err := fd.pd.prepareWrite(); err != nil {
- return 0, 0, err
- }
- for {
- n, err = syscall.SendmsgN(fd.sysfd, p, oob, sa, 0)
- if err == syscall.EAGAIN {
- if err = fd.pd.waitWrite(); err == nil {
- continue
- }
- }
- break
- }
- if err == nil {
- oobn = len(oob)
- }
- if _, ok := err.(syscall.Errno); ok {
- err = os.NewSyscallError("sendmsg", err)
- }
- return
+ n, oobn, err = fd.pfd.WriteMsg(p, oob, sa)
+ runtime.KeepAlive(fd)
+ return n, oobn, wrapSyscallError("sendmsg", err)
}
func (fd *netFD) accept() (netfd *netFD, err error) {
- if err := fd.readLock(); err != nil {
- return nil, err
- }
- defer fd.readUnlock()
-
- var s int
- var rsa syscall.Sockaddr
- if err = fd.pd.prepareRead(); err != nil {
- return nil, err
- }
- for {
- s, rsa, err = accept(fd.sysfd)
- if err != nil {
- nerr, ok := err.(*os.SyscallError)
- if !ok {
- return nil, err
- }
- switch nerr.Err {
- case syscall.EAGAIN:
- if err = fd.pd.waitRead(); err == nil {
- continue
- }
- case syscall.ECONNABORTED:
- // This means that a socket on the
- // listen queue was closed before we
- // Accept()ed it; it's a silly error,
- // so try again.
- continue
- }
- return nil, err
+ d, rsa, errcall, err := fd.pfd.Accept()
+ if err != nil {
+ if errcall != "" {
+ err = wrapSyscallError(errcall, err)
}
- break
+ return nil, err
}
- if netfd, err = newFD(s, fd.family, fd.sotype, fd.net); err != nil {
- closeFunc(s)
+ if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
+ poll.CloseFunc(d)
return nil, err
}
if err = netfd.init(); err != nil {
- fd.Close()
+ netfd.Close()
return nil, err
}
- lsa, _ := syscall.Getsockname(netfd.sysfd)
+ lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
return netfd, nil
}
@@ -503,7 +309,7 @@
}
func (fd *netFD) dup() (f *os.File, err error) {
- ns, err := dupCloseOnExec(fd.sysfd)
+ ns, err := dupCloseOnExec(fd.pfd.Sysfd)
if err != nil {
return nil, err
}
diff --git a/src/net/fd_windows.go b/src/net/fd_windows.go
index a976f2a..2182b73 100644
--- a/src/net/fd_windows.go
+++ b/src/net/fd_windows.go
@@ -6,62 +6,14 @@
import (
"context"
- "internal/race"
+ "internal/poll"
"os"
"runtime"
- "sync"
"syscall"
"unsafe"
)
-var (
- initErr error
- ioSync uint64
-)
-
-// CancelIo Windows API cancels all outstanding IO for a particular
-// socket on current thread. To overcome that limitation, we run
-// special goroutine, locked to OS single thread, that both starts
-// and cancels IO. It means, there are 2 unavoidable thread switches
-// for every IO.
-// Some newer versions of Windows has new CancelIoEx API, that does
-// not have that limitation and can be used from any thread. This
-// package uses CancelIoEx API, if present, otherwise it fallback
-// to CancelIo.
-
-var (
- canCancelIO bool // determines if CancelIoEx API is present
- skipSyncNotif bool
- hasLoadSetFileCompletionNotificationModes bool
-)
-
func sysInit() {
- var d syscall.WSAData
- e := syscall.WSAStartup(uint32(0x202), &d)
- if e != nil {
- initErr = os.NewSyscallError("wsastartup", e)
- }
- canCancelIO = syscall.LoadCancelIoEx() == nil
- hasLoadSetFileCompletionNotificationModes = syscall.LoadSetFileCompletionNotificationModes() == nil
- if hasLoadSetFileCompletionNotificationModes {
- // It's not safe to use FILE_SKIP_COMPLETION_PORT_ON_SUCCESS if non IFS providers are installed:
- // http://support.microsoft.com/kb/2568167
- skipSyncNotif = true
- protos := [2]int32{syscall.IPPROTO_TCP, 0}
- var buf [32]syscall.WSAProtocolInfo
- len := uint32(unsafe.Sizeof(buf))
- n, err := syscall.WSAEnumProtocols(&protos[0], &buf[0], &len)
- if err != nil {
- skipSyncNotif = false
- } else {
- for i := int32(0); i < n; i++ {
- if buf[i].ServiceFlags1&syscall.XP1_IFS_HANDLES == 0 {
- skipSyncNotif = false
- break
- }
- }
- }
- }
}
// canUseConnectEx reports whether we can use the ConnectEx Windows API call
@@ -75,257 +27,39 @@
return false
}
-// operation contains superset of data necessary to perform all async IO.
-type operation struct {
- // Used by IOCP interface, it must be first field
- // of the struct, as our code rely on it.
- o syscall.Overlapped
-
- // fields used by runtime.netpoll
- runtimeCtx uintptr
- mode int32
- errno int32
- qty uint32
-
- // fields used only by net package
- fd *netFD
- errc chan error
- buf syscall.WSABuf
- sa syscall.Sockaddr
- rsa *syscall.RawSockaddrAny
- rsan int32
- handle syscall.Handle
- flags uint32
- bufs []syscall.WSABuf
-}
-
-func (o *operation) InitBuf(buf []byte) {
- o.buf.Len = uint32(len(buf))
- o.buf.Buf = nil
- if len(buf) != 0 {
- o.buf.Buf = &buf[0]
- }
-}
-
-func (o *operation) InitBufs(buf *Buffers) {
- if o.bufs == nil {
- o.bufs = make([]syscall.WSABuf, 0, len(*buf))
- } else {
- o.bufs = o.bufs[:0]
- }
- for _, b := range *buf {
- var p *byte
- if len(b) > 0 {
- p = &b[0]
- }
- o.bufs = append(o.bufs, syscall.WSABuf{Len: uint32(len(b)), Buf: p})
- }
-}
-
-// ClearBufs clears all pointers to Buffers parameter captured
-// by InitBufs, so it can be released by garbage collector.
-func (o *operation) ClearBufs() {
- for i := range o.bufs {
- o.bufs[i].Buf = nil
- }
- o.bufs = o.bufs[:0]
-}
-
-// ioSrv executes net IO requests.
-type ioSrv struct {
- req chan ioSrvReq
-}
-
-type ioSrvReq struct {
- o *operation
- submit func(o *operation) error // if nil, cancel the operation
-}
-
-// ProcessRemoteIO will execute submit IO requests on behalf
-// of other goroutines, all on a single os thread, so it can
-// cancel them later. Results of all operations will be sent
-// back to their requesters via channel supplied in request.
-// It is used only when the CancelIoEx API is unavailable.
-func (s *ioSrv) ProcessRemoteIO() {
- runtime.LockOSThread()
- defer runtime.UnlockOSThread()
- for r := range s.req {
- if r.submit != nil {
- r.o.errc <- r.submit(r.o)
- } else {
- r.o.errc <- syscall.CancelIo(r.o.fd.sysfd)
- }
- }
-}
-
-// ExecIO executes a single IO operation o. It submits and cancels
-// IO in the current thread for systems where Windows CancelIoEx API
-// is available. Alternatively, it passes the request onto
-// runtime netpoll and waits for completion or cancels request.
-func (s *ioSrv) ExecIO(o *operation, name string, submit func(o *operation) error) (int, error) {
- fd := o.fd
- // Notify runtime netpoll about starting IO.
- err := fd.pd.prepare(int(o.mode))
- if err != nil {
- return 0, err
- }
- // Start IO.
- if canCancelIO {
- err = submit(o)
- } else {
- // Send request to a special dedicated thread,
- // so it can stop the IO with CancelIO later.
- s.req <- ioSrvReq{o, submit}
- err = <-o.errc
- }
- switch err {
- case nil:
- // IO completed immediately
- if o.fd.skipSyncNotif {
- // No completion message will follow, so return immediately.
- return int(o.qty), nil
- }
- // Need to get our completion message anyway.
- case syscall.ERROR_IO_PENDING:
- // IO started, and we have to wait for its completion.
- err = nil
- default:
- return 0, err
- }
- // Wait for our request to complete.
- err = fd.pd.wait(int(o.mode))
- if err == nil {
- // All is good. Extract our IO results and return.
- if o.errno != 0 {
- err = syscall.Errno(o.errno)
- return 0, err
- }
- return int(o.qty), nil
- }
- // IO is interrupted by "close" or "timeout"
- netpollErr := err
- switch netpollErr {
- case errClosing, errTimeout:
- // will deal with those.
- default:
- panic("net: unexpected runtime.netpoll error: " + netpollErr.Error())
- }
- // Cancel our request.
- if canCancelIO {
- err := syscall.CancelIoEx(fd.sysfd, &o.o)
- // Assuming ERROR_NOT_FOUND is returned, if IO is completed.
- if err != nil && err != syscall.ERROR_NOT_FOUND {
- // TODO(brainman): maybe do something else, but panic.
- panic(err)
- }
- } else {
- s.req <- ioSrvReq{o, nil}
- <-o.errc
- }
- // Wait for cancelation to complete.
- fd.pd.waitCanceled(int(o.mode))
- if o.errno != 0 {
- err = syscall.Errno(o.errno)
- if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled
- err = netpollErr
- }
- return 0, err
- }
- // We issued a cancelation request. But, it seems, IO operation succeeded
- // before the cancelation request run. We need to treat the IO operation as
- // succeeded (the bytes are actually sent/recv from network).
- return int(o.qty), nil
-}
-
-// Start helper goroutines.
-var rsrv, wsrv *ioSrv
-var onceStartServer sync.Once
-
-func startServer() {
- rsrv = new(ioSrv)
- wsrv = new(ioSrv)
- if !canCancelIO {
- // Only CancelIo API is available. Lets start two special goroutines
- // locked to an OS thread, that both starts and cancels IO. One will
- // process read requests, while other will do writes.
- rsrv.req = make(chan ioSrvReq)
- go rsrv.ProcessRemoteIO()
- wsrv.req = make(chan ioSrvReq)
- go wsrv.ProcessRemoteIO()
- }
-}
-
// Network file descriptor.
type netFD struct {
- // locking/lifetime of sysfd + serialize access to Read and Write methods
- fdmu fdMutex
+ pfd poll.FD
// immutable until Close
- sysfd syscall.Handle
- family int
- sotype int
- isStream bool
- isConnected bool
- skipSyncNotif bool
- net string
- laddr Addr
- raddr Addr
-
- rop operation // read operation
- wop operation // write operation
-
- // wait server
- pd pollDesc
+ family int
+ sotype int
+ isConnected bool
+ net string
+ laddr Addr
+ raddr Addr
}
func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) {
- if initErr != nil {
- return nil, initErr
+ ret := &netFD{
+ pfd: poll.FD{
+ Sysfd: sysfd,
+ IsStream: sotype == syscall.SOCK_STREAM,
+ ZeroReadIsEOF: sotype != syscall.SOCK_DGRAM && sotype != syscall.SOCK_RAW,
+ },
+ family: family,
+ sotype: sotype,
+ net: net,
}
- onceStartServer.Do(startServer)
- return &netFD{sysfd: sysfd, family: family, sotype: sotype, net: net, isStream: sotype == syscall.SOCK_STREAM}, nil
+ return ret, nil
}
func (fd *netFD) init() error {
- if err := fd.pd.init(fd); err != nil {
- return err
+ errcall, err := fd.pfd.Init(fd.net)
+ if errcall != "" {
+ err = wrapSyscallError(errcall, err)
}
- if hasLoadSetFileCompletionNotificationModes {
- // We do not use events, so we can skip them always.
- flags := uint8(syscall.FILE_SKIP_SET_EVENT_ON_HANDLE)
- // It's not safe to skip completion notifications for UDP:
- // http://blogs.technet.com/b/winserverperformance/archive/2008/06/26/designing-applications-for-high-performance-part-iii.aspx
- if skipSyncNotif && fd.net == "tcp" {
- flags |= syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS
- }
- err := syscall.SetFileCompletionNotificationModes(fd.sysfd, flags)
- if err == nil && flags&syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS != 0 {
- fd.skipSyncNotif = true
- }
- }
- // Disable SIO_UDP_CONNRESET behavior.
- // http://support.microsoft.com/kb/263823
- switch fd.net {
- case "udp", "udp4", "udp6":
- ret := uint32(0)
- flag := uint32(0)
- size := uint32(unsafe.Sizeof(flag))
- err := syscall.WSAIoctl(fd.sysfd, syscall.SIO_UDP_CONNRESET, (*byte)(unsafe.Pointer(&flag)), size, nil, 0, &ret, nil, 0)
- if err != nil {
- return os.NewSyscallError("wsaioctl", err)
- }
- }
- fd.rop.mode = 'r'
- fd.wop.mode = 'w'
- fd.rop.fd = fd
- fd.wop.fd = fd
- fd.rop.runtimeCtx = fd.pd.runtimeCtx
- fd.wop.runtimeCtx = fd.pd.runtimeCtx
- if !canCancelIO {
- fd.rop.errc = make(chan error)
- fd.wop.errc = make(chan error)
- }
- return nil
+ return err
}
func (fd *netFD) setAddr(laddr, raddr Addr) {
@@ -342,11 +76,11 @@
return err
}
if deadline, ok := ctx.Deadline(); ok && !deadline.IsZero() {
- fd.setWriteDeadline(deadline)
- defer fd.setWriteDeadline(noDeadline)
+ fd.pfd.SetWriteDeadline(deadline)
+ defer fd.pfd.SetWriteDeadline(noDeadline)
}
if !canUseConnectEx(fd.net) {
- err := connectFunc(fd.sysfd, ra)
+ err := connectFunc(fd.pfd.Sysfd, ra)
return os.NewSyscallError("connect", err)
}
// ConnectEx windows API requires an unconnected, previously bound socket.
@@ -359,13 +93,10 @@
default:
panic("unexpected type in connect")
}
- if err := syscall.Bind(fd.sysfd, la); err != nil {
+ if err := syscall.Bind(fd.pfd.Sysfd, la); err != nil {
return os.NewSyscallError("bind", err)
}
}
- // Call ConnectEx API.
- o := &fd.wop
- o.sa = ra
// Wait for the goroutine converting context.Done into a write timeout
// to exist, otherwise our caller might cancel the context and
@@ -377,16 +108,14 @@
case <-ctx.Done():
// Force the runtime's poller to immediately give
// up waiting for writability.
- fd.setWriteDeadline(aLongTimeAgo)
+ fd.pfd.SetWriteDeadline(aLongTimeAgo)
<-done
case <-done:
}
}()
- _, err := wsrv.ExecIO(o, "ConnectEx", func(o *operation) error {
- return connectExFunc(o.fd.sysfd, o.sa, nil, 0, nil, &o.o)
- })
- if err != nil {
+ // Call ConnectEx API.
+ if err := fd.pfd.ConnectEx(ra); err != nil {
select {
case <-ctx.Done():
return mapErr(ctx.Err())
@@ -398,38 +127,18 @@
}
}
// Refresh socket properties.
- return os.NewSyscallError("setsockopt", syscall.Setsockopt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_UPDATE_CONNECT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd))))
-}
-
-func (fd *netFD) destroy() {
- if fd.sysfd == syscall.InvalidHandle {
- return
- }
- // Poller may want to unregister fd in readiness notification mechanism,
- // so this must be executed before closeFunc.
- fd.pd.close()
- closeFunc(fd.sysfd)
- fd.sysfd = syscall.InvalidHandle
- // no need for a finalizer anymore
- runtime.SetFinalizer(fd, nil)
+ return os.NewSyscallError("setsockopt", syscall.Setsockopt(fd.pfd.Sysfd, syscall.SOL_SOCKET, syscall.SO_UPDATE_CONNECT_CONTEXT, (*byte)(unsafe.Pointer(&fd.pfd.Sysfd)), int32(unsafe.Sizeof(fd.pfd.Sysfd))))
}
func (fd *netFD) Close() error {
- if !fd.fdmu.increfAndClose() {
- return errClosing
- }
- // unblock pending reader and writer
- fd.pd.evict()
- fd.decref()
- return nil
+ runtime.SetFinalizer(fd, nil)
+ return fd.pfd.Close()
}
func (fd *netFD) shutdown(how int) error {
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
- return syscall.Shutdown(fd.sysfd, how)
+ err := fd.pfd.Shutdown(how)
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("shutdown", err)
}
func (fd *netFD) closeRead() error {
@@ -441,72 +150,21 @@
}
func (fd *netFD) Read(buf []byte) (int, error) {
- if err := fd.readLock(); err != nil {
- return 0, err
- }
- defer fd.readUnlock()
- o := &fd.rop
- o.InitBuf(buf)
- n, err := rsrv.ExecIO(o, "WSARecv", func(o *operation) error {
- return syscall.WSARecv(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
- })
- if race.Enabled {
- race.Acquire(unsafe.Pointer(&ioSync))
- }
- if len(buf) != 0 {
- err = fd.eofError(n, err)
- }
- if _, ok := err.(syscall.Errno); ok {
- err = os.NewSyscallError("wsarecv", err)
- }
- return n, err
+ n, err := fd.pfd.Read(buf)
+ runtime.KeepAlive(fd)
+ return n, wrapSyscallError("wsarecv", err)
}
func (fd *netFD) readFrom(buf []byte) (int, syscall.Sockaddr, error) {
- if len(buf) == 0 {
- return 0, nil, nil
- }
- if err := fd.readLock(); err != nil {
- return 0, nil, err
- }
- defer fd.readUnlock()
- o := &fd.rop
- o.InitBuf(buf)
- n, err := rsrv.ExecIO(o, "WSARecvFrom", func(o *operation) error {
- if o.rsa == nil {
- o.rsa = new(syscall.RawSockaddrAny)
- }
- o.rsan = int32(unsafe.Sizeof(*o.rsa))
- return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, o.rsa, &o.rsan, &o.o, nil)
- })
- err = fd.eofError(n, err)
- if _, ok := err.(syscall.Errno); ok {
- err = os.NewSyscallError("wsarecvfrom", err)
- }
- if err != nil {
- return n, nil, err
- }
- sa, _ := o.rsa.Sockaddr()
- return n, sa, nil
+ n, sa, err := fd.pfd.RecvFrom(buf)
+ runtime.KeepAlive(fd)
+ return n, sa, wrapSyscallError("wsarecvfrom", err)
}
func (fd *netFD) Write(buf []byte) (int, error) {
- if err := fd.writeLock(); err != nil {
- return 0, err
- }
- defer fd.writeUnlock()
- if race.Enabled {
- race.ReleaseMerge(unsafe.Pointer(&ioSync))
- }
- o := &fd.wop
- o.InitBuf(buf)
- n, err := wsrv.ExecIO(o, "WSASend", func(o *operation) error {
- return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil)
- })
- if _, ok := err.(syscall.Errno); ok {
- err = os.NewSyscallError("wsasend", err)
- }
- return n, err
+ n, err := fd.pfd.Write(buf)
+ runtime.KeepAlive(fd)
+ return n, wrapSyscallError("wsasend", err)
}
func (c *conn) writeBuffers(v *Buffers) (int64, error) {
@@ -521,61 +179,33 @@
}
func (fd *netFD) writeBuffers(buf *Buffers) (int64, error) {
- if len(*buf) == 0 {
- return 0, nil
- }
- if err := fd.writeLock(); err != nil {
- return 0, err
- }
- defer fd.writeUnlock()
- if race.Enabled {
- race.ReleaseMerge(unsafe.Pointer(&ioSync))
- }
- o := &fd.wop
- o.InitBufs(buf)
- n, err := wsrv.ExecIO(o, "WSASend", func(o *operation) error {
- return syscall.WSASend(o.fd.sysfd, &o.bufs[0], uint32(len(*buf)), &o.qty, 0, &o.o, nil)
- })
- o.ClearBufs()
- if _, ok := err.(syscall.Errno); ok {
- err = os.NewSyscallError("wsasend", err)
- }
- testHookDidWritev(n)
- buf.consume(int64(n))
- return int64(n), err
+ n, err := fd.pfd.Writev((*[][]byte)(buf))
+ runtime.KeepAlive(fd)
+ return n, wrapSyscallError("wsasend", err)
}
func (fd *netFD) writeTo(buf []byte, sa syscall.Sockaddr) (int, error) {
- if len(buf) == 0 {
- return 0, nil
- }
- if err := fd.writeLock(); err != nil {
- return 0, err
- }
- defer fd.writeUnlock()
- o := &fd.wop
- o.InitBuf(buf)
- o.sa = sa
- n, err := wsrv.ExecIO(o, "WSASendto", func(o *operation) error {
- return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil)
- })
- if _, ok := err.(syscall.Errno); ok {
- err = os.NewSyscallError("wsasendto", err)
- }
- return n, err
+ n, err := fd.pfd.WriteTo(buf, sa)
+ runtime.KeepAlive(fd)
+ return n, wrapSyscallError("wsasendto", err)
}
-func (fd *netFD) acceptOne(rawsa []syscall.RawSockaddrAny, o *operation) (*netFD, error) {
- // Get new socket.
- s, err := sysSocket(fd.family, fd.sotype, 0)
+func (fd *netFD) accept() (*netFD, error) {
+ s, rawsa, rsan, errcall, err := fd.pfd.Accept(func() (syscall.Handle, error) {
+ return sysSocket(fd.family, fd.sotype, 0)
+ })
+
if err != nil {
+ if errcall != "" {
+ err = wrapSyscallError(errcall, err)
+ }
return nil, err
}
// Associate our new socket with IOCP.
netfd, err := newFD(s, fd.family, fd.sotype, fd.net)
if err != nil {
- closeFunc(s)
+ poll.CloseFunc(s)
return nil, err
}
if err := netfd.init(); err != nil {
@@ -583,71 +213,11 @@
return nil, err
}
- // Submit accept request.
- o.handle = s
- o.rsan = int32(unsafe.Sizeof(rawsa[0]))
- _, err = rsrv.ExecIO(o, "AcceptEx", func(o *operation) error {
- return acceptFunc(o.fd.sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o)
- })
- if err != nil {
- netfd.Close()
- if _, ok := err.(syscall.Errno); ok {
- err = os.NewSyscallError("acceptex", err)
- }
- return nil, err
- }
-
- // Inherit properties of the listening socket.
- err = syscall.Setsockopt(s, syscall.SOL_SOCKET, syscall.SO_UPDATE_ACCEPT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd)))
- if err != nil {
- netfd.Close()
- return nil, os.NewSyscallError("setsockopt", err)
- }
- runtime.KeepAlive(fd)
- return netfd, nil
-}
-
-func (fd *netFD) accept() (*netFD, error) {
- if err := fd.readLock(); err != nil {
- return nil, err
- }
- defer fd.readUnlock()
-
- o := &fd.rop
- var netfd *netFD
- var err error
- var rawsa [2]syscall.RawSockaddrAny
- for {
- netfd, err = fd.acceptOne(rawsa[:], o)
- if err == nil {
- break
- }
- // Sometimes we see WSAECONNRESET and ERROR_NETNAME_DELETED is
- // returned here. These happen if connection reset is received
- // before AcceptEx could complete. These errors relate to new
- // connection, not to AcceptEx, so ignore broken connection and
- // try AcceptEx again for more connections.
- nerr, ok := err.(*os.SyscallError)
- if !ok {
- return nil, err
- }
- errno, ok := nerr.Err.(syscall.Errno)
- if !ok {
- return nil, err
- }
- switch errno {
- case syscall.ERROR_NETNAME_DELETED, syscall.WSAECONNRESET:
- // ignore these and try again
- default:
- return nil, err
- }
- }
-
// Get local and peer addr out of AcceptEx buffer.
var lrsa, rrsa *syscall.RawSockaddrAny
var llen, rlen int32
syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&rawsa[0])),
- 0, uint32(o.rsan), uint32(o.rsan), &lrsa, &llen, &rrsa, &rlen)
+ 0, rsan, rsan, &lrsa, &llen, &rrsa, &rlen)
lsa, _ := lrsa.Sockaddr()
rsa, _ := rrsa.Sockaddr()
diff --git a/src/net/file_unix.go b/src/net/file_unix.go
index 9e581fc..d67dff8 100644
--- a/src/net/file_unix.go
+++ b/src/net/file_unix.go
@@ -7,6 +7,7 @@
package net
import (
+ "internal/poll"
"os"
"syscall"
)
@@ -17,7 +18,7 @@
return -1, err
}
if err := syscall.SetNonblock(s, true); err != nil {
- closeFunc(s)
+ poll.CloseFunc(s)
return -1, os.NewSyscallError("setnonblock", err)
}
return s, nil
@@ -31,7 +32,7 @@
family := syscall.AF_UNSPEC
sotype, err := syscall.GetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_TYPE)
if err != nil {
- closeFunc(s)
+ poll.CloseFunc(s)
return nil, os.NewSyscallError("getsockopt", err)
}
lsa, _ := syscall.Getsockname(s)
@@ -44,12 +45,12 @@
case *syscall.SockaddrUnix:
family = syscall.AF_UNIX
default:
- closeFunc(s)
+ poll.CloseFunc(s)
return nil, syscall.EPROTONOSUPPORT
}
fd, err := newFD(s, family, sotype, "")
if err != nil {
- closeFunc(s)
+ poll.CloseFunc(s)
return nil, err
}
laddr := fd.addrFunc()(lsa)
diff --git a/src/net/hook_cloexec.go b/src/net/hook_cloexec.go
deleted file mode 100644
index 870f0d7..0000000
--- a/src/net/hook_cloexec.go
+++ /dev/null
@@ -1,14 +0,0 @@
-// Copyright 2015 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// +build freebsd linux
-
-package net
-
-import "syscall"
-
-var (
- // Placeholders for socket system calls.
- accept4Func func(int, int) (int, syscall.Sockaddr, error) = syscall.Accept4
-)
diff --git a/src/net/hook_unix.go b/src/net/hook_unix.go
index cf52567..fee62a9 100644
--- a/src/net/hook_unix.go
+++ b/src/net/hook_unix.go
@@ -13,10 +13,8 @@
testHookCanceledDial = func() {} // for golang.org/issue/16523
// Placeholders for socket system calls.
- socketFunc func(int, int, int) (int, error) = syscall.Socket
- closeFunc func(int) error = syscall.Close
- connectFunc func(int, syscall.Sockaddr) error = syscall.Connect
- listenFunc func(int, int) error = syscall.Listen
- acceptFunc func(int) (int, syscall.Sockaddr, error) = syscall.Accept
- getsockoptIntFunc func(int, int, int) (int, error) = syscall.GetsockoptInt
+ socketFunc func(int, int, int) (int, error) = syscall.Socket
+ connectFunc func(int, syscall.Sockaddr) error = syscall.Connect
+ listenFunc func(int, int) error = syscall.Listen
+ getsockoptIntFunc func(int, int, int) (int, error) = syscall.GetsockoptInt
)
diff --git a/src/net/hook_windows.go b/src/net/hook_windows.go
index 63ea35a..4e64dce 100644
--- a/src/net/hook_windows.go
+++ b/src/net/hook_windows.go
@@ -13,10 +13,7 @@
testHookDialChannel = func() { time.Sleep(time.Millisecond) } // see golang.org/issue/5349
// Placeholders for socket system calls.
- socketFunc func(int, int, int) (syscall.Handle, error) = syscall.Socket
- closeFunc func(syscall.Handle) error = syscall.Closesocket
- connectFunc func(syscall.Handle, syscall.Sockaddr) error = syscall.Connect
- connectExFunc func(syscall.Handle, syscall.Sockaddr, *byte, uint32, *uint32, *syscall.Overlapped) error = syscall.ConnectEx
- listenFunc func(syscall.Handle, int) error = syscall.Listen
- acceptFunc func(syscall.Handle, syscall.Handle, *byte, uint32, uint32, uint32, *uint32, *syscall.Overlapped) error = syscall.AcceptEx
+ socketFunc func(int, int, int) (syscall.Handle, error) = syscall.Socket
+ connectFunc func(syscall.Handle, syscall.Sockaddr) error = syscall.Connect
+ listenFunc func(syscall.Handle, int) error = syscall.Listen
)
diff --git a/src/net/ipsock_plan9.go b/src/net/ipsock_plan9.go
index b7fd344..1cd8fa2 100644
--- a/src/net/ipsock_plan9.go
+++ b/src/net/ipsock_plan9.go
@@ -249,10 +249,10 @@
func (fd *netFD) acceptPlan9() (nfd *netFD, err error) {
defer func() { fixErr(err) }()
- if err := fd.readLock(); err != nil {
+ if err := fd.pfd.ReadLock(); err != nil {
return nil, err
}
- defer fd.readUnlock()
+ defer fd.pfd.ReadUnlock()
listen, err := os.Open(fd.dir + "/listen")
if err != nil {
return nil, err
diff --git a/src/net/ipsock_posix.go b/src/net/ipsock_posix.go
index ff280c3..5cb85f8 100644
--- a/src/net/ipsock_posix.go
+++ b/src/net/ipsock_posix.go
@@ -8,6 +8,7 @@
import (
"context"
+ "internal/poll"
"runtime"
"syscall"
)
@@ -18,7 +19,7 @@
case syscall.EAFNOSUPPORT, syscall.EPROTONOSUPPORT:
return false
case nil:
- closeFunc(s)
+ poll.CloseFunc(s)
}
return true
}
@@ -68,7 +69,7 @@
if err != nil {
continue
}
- defer closeFunc(s)
+ defer poll.CloseFunc(s)
syscall.SetsockoptInt(s, syscall.IPPROTO_IPV6, syscall.IPV6_V6ONLY, probes[i].value)
sa, err := probes[i].laddr.sockaddr(syscall.AF_INET6)
if err != nil {
diff --git a/src/net/main_cloexec_test.go b/src/net/main_cloexec_test.go
index 7903819..ade71a9 100644
--- a/src/net/main_cloexec_test.go
+++ b/src/net/main_cloexec_test.go
@@ -6,6 +6,8 @@
package net
+import "internal/poll"
+
func init() {
extraTestHookInstallers = append(extraTestHookInstallers, installAccept4TestHook)
extraTestHookUninstallers = append(extraTestHookUninstallers, uninstallAccept4TestHook)
@@ -13,13 +15,13 @@
var (
// Placeholders for saving original socket system calls.
- origAccept4 = accept4Func
+ origAccept4 = poll.Accept4Func
)
func installAccept4TestHook() {
- accept4Func = sw.Accept4
+ poll.Accept4Func = sw.Accept4
}
func uninstallAccept4TestHook() {
- accept4Func = origAccept4
+ poll.Accept4Func = origAccept4
}
diff --git a/src/net/main_unix_test.go b/src/net/main_unix_test.go
index 0cc129f..9cfbc8e 100644
--- a/src/net/main_unix_test.go
+++ b/src/net/main_unix_test.go
@@ -6,13 +6,15 @@
package net
+import "internal/poll"
+
var (
// Placeholders for saving original socket system calls.
origSocket = socketFunc
- origClose = closeFunc
+ origClose = poll.CloseFunc
origConnect = connectFunc
origListen = listenFunc
- origAccept = acceptFunc
+ origAccept = poll.AcceptFunc
origGetsockoptInt = getsockoptIntFunc
extraTestHookInstallers []func()
@@ -21,10 +23,10 @@
func installTestHooks() {
socketFunc = sw.Socket
- closeFunc = sw.Close
+ poll.CloseFunc = sw.Close
connectFunc = sw.Connect
listenFunc = sw.Listen
- acceptFunc = sw.Accept
+ poll.AcceptFunc = sw.Accept
getsockoptIntFunc = sw.GetsockoptInt
for _, fn := range extraTestHookInstallers {
@@ -34,10 +36,10 @@
func uninstallTestHooks() {
socketFunc = origSocket
- closeFunc = origClose
+ poll.CloseFunc = origClose
connectFunc = origConnect
listenFunc = origListen
- acceptFunc = origAccept
+ poll.AcceptFunc = origAccept
getsockoptIntFunc = origGetsockoptInt
for _, fn := range extraTestHookUninstallers {
@@ -48,6 +50,6 @@
// forceCloseSockets must be called only from TestMain.
func forceCloseSockets() {
for s := range sw.Sockets() {
- closeFunc(s)
+ poll.CloseFunc(s)
}
}
diff --git a/src/net/main_windows_test.go b/src/net/main_windows_test.go
index 6ea318c..f38a3a0 100644
--- a/src/net/main_windows_test.go
+++ b/src/net/main_windows_test.go
@@ -4,37 +4,39 @@
package net
+import "internal/poll"
+
var (
// Placeholders for saving original socket system calls.
origSocket = socketFunc
- origClosesocket = closeFunc
+ origClosesocket = poll.CloseFunc
origConnect = connectFunc
- origConnectEx = connectExFunc
+ origConnectEx = poll.ConnectExFunc
origListen = listenFunc
- origAccept = acceptFunc
+ origAccept = poll.AcceptFunc
)
func installTestHooks() {
socketFunc = sw.Socket
- closeFunc = sw.Closesocket
+ poll.CloseFunc = sw.Closesocket
connectFunc = sw.Connect
- connectExFunc = sw.ConnectEx
+ poll.ConnectExFunc = sw.ConnectEx
listenFunc = sw.Listen
- acceptFunc = sw.AcceptEx
+ poll.AcceptFunc = sw.AcceptEx
}
func uninstallTestHooks() {
socketFunc = origSocket
- closeFunc = origClosesocket
+ poll.CloseFunc = origClosesocket
connectFunc = origConnect
- connectExFunc = origConnectEx
+ poll.ConnectExFunc = origConnectEx
listenFunc = origListen
- acceptFunc = origAccept
+ poll.AcceptFunc = origAccept
}
// forceCloseSockets must be called only from TestMain.
func forceCloseSockets() {
for s := range sw.Sockets() {
- closeFunc(s)
+ poll.CloseFunc(s)
}
}
diff --git a/src/net/net.go b/src/net/net.go
index 81206ea..9c27f1b 100644
--- a/src/net/net.go
+++ b/src/net/net.go
@@ -81,6 +81,7 @@
import (
"context"
"errors"
+ "internal/poll"
"io"
"os"
"syscall"
@@ -234,7 +235,7 @@
if !c.ok() {
return syscall.EINVAL
}
- if err := c.fd.setDeadline(t); err != nil {
+ if err := c.fd.pfd.SetDeadline(t); err != nil {
return &OpError{Op: "set", Net: c.fd.net, Source: nil, Addr: c.fd.laddr, Err: err}
}
return nil
@@ -245,7 +246,7 @@
if !c.ok() {
return syscall.EINVAL
}
- if err := c.fd.setReadDeadline(t); err != nil {
+ if err := c.fd.pfd.SetReadDeadline(t); err != nil {
return &OpError{Op: "set", Net: c.fd.net, Source: nil, Addr: c.fd.laddr, Err: err}
}
return nil
@@ -256,7 +257,7 @@
if !c.ok() {
return syscall.EINVAL
}
- if err := c.fd.setWriteDeadline(t); err != nil {
+ if err := c.fd.pfd.SetWriteDeadline(t); err != nil {
return &OpError{Op: "set", Net: c.fd.net, Source: nil, Addr: c.fd.laddr, Err: err}
}
return nil
@@ -391,10 +392,8 @@
errMissingAddress = errors.New("missing address")
// For both read and write operations.
- errTimeout error = &timeoutError{}
- errCanceled = errors.New("operation was canceled")
- errClosing = errors.New("use of closed network connection")
- ErrWriteToConnected = errors.New("use of WriteTo with pre-connected connection")
+ errCanceled = errors.New("operation was canceled")
+ ErrWriteToConnected = errors.New("use of WriteTo with pre-connected connection")
)
// mapErr maps from the context errors to the historical internal net
@@ -407,7 +406,7 @@
case context.Canceled:
return errCanceled
case context.DeadlineExceeded:
- return errTimeout
+ return poll.ErrTimeout
default:
return err
}
@@ -502,12 +501,6 @@
return ok && t.Temporary()
}
-type timeoutError struct{}
-
-func (e *timeoutError) Error() string { return "i/o timeout" }
-func (e *timeoutError) Timeout() bool { return true }
-func (e *timeoutError) Temporary() bool { return true }
-
// A ParseError is the error type of literal network address parsers.
type ParseError struct {
// Type is the type of string that was expected, such as
@@ -632,8 +625,6 @@
writeBuffers(*Buffers) (int64, error)
}
-var testHookDidWritev = func(wrote int) {}
-
// Buffers contains zero or more runs of bytes to write.
//
// On certain machines, for certain types of connections, this is
diff --git a/src/net/sendfile_bsd.go b/src/net/sendfile_bsd.go
index 67e80c9..7a2b48c 100644
--- a/src/net/sendfile_bsd.go
+++ b/src/net/sendfile_bsd.go
@@ -7,15 +7,11 @@
package net
import (
+ "internal/poll"
"io"
"os"
- "syscall"
)
-// maxSendfileSize is the largest chunk size we ask the kernel to copy
-// at a time.
-const maxSendfileSize int = 4 << 20
-
// sendFile copies the contents of r to c using the sendfile
// system call to minimize copies.
//
@@ -62,49 +58,10 @@
return 0, err, false
}
- if err := c.writeLock(); err != nil {
- return 0, err, true
- }
- defer c.writeUnlock()
+ written, err = poll.SendFile(&c.pfd, int(f.Fd()), pos, remain)
- dst := c.sysfd
- src := int(f.Fd())
- for remain > 0 {
- n := maxSendfileSize
- if int64(n) > remain {
- n = int(remain)
- }
- pos1 := pos
- n, err1 := syscall.Sendfile(dst, src, &pos1, n)
- if n > 0 {
- pos += int64(n)
- written += int64(n)
- remain -= int64(n)
- }
- if n == 0 && err1 == nil {
- break
- }
- if err1 == syscall.EAGAIN {
- if err1 = c.pd.waitWrite(); err1 == nil {
- continue
- }
- }
- if err1 == syscall.EINTR {
- continue
- }
- if err1 != nil {
- // This includes syscall.ENOSYS (no kernel
- // support) and syscall.EINVAL (fd types which
- // don't implement sendfile)
- err = err1
- break
- }
- }
if lr != nil {
- lr.N = remain
+ lr.N = remain - written
}
- if err != nil {
- err = os.NewSyscallError("sendfile", err)
- }
- return written, err, written > 0
+ return written, wrapSyscallError("sendfile", err), written > 0
}
diff --git a/src/net/sendfile_linux.go b/src/net/sendfile_linux.go
index 7e741f9..c537ea6 100644
--- a/src/net/sendfile_linux.go
+++ b/src/net/sendfile_linux.go
@@ -5,15 +5,11 @@
package net
import (
+ "internal/poll"
"io"
"os"
- "syscall"
)
-// maxSendfileSize is the largest chunk size we ask the kernel to copy
-// at a time.
-const maxSendfileSize int = 4 << 20
-
// sendFile copies the contents of r to c using the sendfile
// system call to minimize copies.
//
@@ -36,44 +32,10 @@
return 0, nil, false
}
- if err := c.writeLock(); err != nil {
- return 0, err, true
- }
- defer c.writeUnlock()
+ written, err = poll.SendFile(&c.pfd, int(f.Fd()), remain)
- dst := c.sysfd
- src := int(f.Fd())
- for remain > 0 {
- n := maxSendfileSize
- if int64(n) > remain {
- n = int(remain)
- }
- n, err1 := syscall.Sendfile(dst, src, nil, n)
- if n > 0 {
- written += int64(n)
- remain -= int64(n)
- }
- if n == 0 && err1 == nil {
- break
- }
- if err1 == syscall.EAGAIN {
- if err1 = c.pd.waitWrite(); err1 == nil {
- continue
- }
- }
- if err1 != nil {
- // This includes syscall.ENOSYS (no kernel
- // support) and syscall.EINVAL (fd types which
- // don't implement sendfile)
- err = err1
- break
- }
- }
if lr != nil {
- lr.N = remain
+ lr.N = remain - written
}
- if err != nil {
- err = os.NewSyscallError("sendfile", err)
- }
- return written, err, written > 0
+ return written, wrapSyscallError("sendfile", err), written > 0
}
diff --git a/src/net/sendfile_solaris.go b/src/net/sendfile_solaris.go
index add70c3..63ca9d4 100644
--- a/src/net/sendfile_solaris.go
+++ b/src/net/sendfile_solaris.go
@@ -5,19 +5,11 @@
package net
import (
+ "internal/poll"
"io"
"os"
- "syscall"
)
-// Not strictly needed, but very helpful for debugging, see issue #10221.
-//go:cgo_import_dynamic _ _ "libsendfile.so"
-//go:cgo_import_dynamic _ _ "libsocket.so"
-
-// maxSendfileSize is the largest chunk size we ask the kernel to copy
-// at a time.
-const maxSendfileSize int = 4 << 20
-
// sendFile copies the contents of r to c using the sendfile
// system call to minimize copies.
//
@@ -62,56 +54,10 @@
return 0, err, false
}
- if err := c.writeLock(); err != nil {
- return 0, err, true
- }
- defer c.writeUnlock()
+ written, err = poll.SendFile(&c.pfd, int(f.Fd()), pos, remain)
- dst := c.sysfd
- src := int(f.Fd())
- for remain > 0 {
- n := maxSendfileSize
- if int64(n) > remain {
- n = int(remain)
- }
- pos1 := pos
- n, err1 := syscall.Sendfile(dst, src, &pos1, n)
- if err1 == syscall.EAGAIN || err1 == syscall.EINTR {
- // partial write may have occurred
- if n = int(pos1 - pos); n == 0 {
- // nothing more to write
- err1 = nil
- }
- }
- if n > 0 {
- pos += int64(n)
- written += int64(n)
- remain -= int64(n)
- }
- if n == 0 && err1 == nil {
- break
- }
- if err1 == syscall.EAGAIN {
- if err1 = c.pd.waitWrite(); err1 == nil {
- continue
- }
- }
- if err1 == syscall.EINTR {
- continue
- }
- if err1 != nil {
- // This includes syscall.ENOSYS (no kernel
- // support) and syscall.EINVAL (fd types which
- // don't implement sendfile)
- err = err1
- break
- }
- }
if lr != nil {
- lr.N = remain
+ lr.N = remain - written
}
- if err != nil {
- err = os.NewSyscallError("sendfile", err)
- }
- return written, err, written > 0
+ return written, wrapSyscallError("sendfile", err), written > 0
}
diff --git a/src/net/sendfile_windows.go b/src/net/sendfile_windows.go
index bc0b7fb..bccd8b1 100644
--- a/src/net/sendfile_windows.go
+++ b/src/net/sendfile_windows.go
@@ -5,6 +5,7 @@
package net
import (
+ "internal/poll"
"io"
"os"
"syscall"
@@ -34,19 +35,10 @@
return 0, nil, false
}
- if err := fd.writeLock(); err != nil {
- return 0, err, true
- }
- defer fd.writeUnlock()
+ done, err := poll.SendFile(&fd.pfd, syscall.Handle(f.Fd()), n)
- o := &fd.wop
- o.qty = uint32(n)
- o.handle = syscall.Handle(f.Fd())
- done, err := wsrv.ExecIO(o, "TransmitFile", func(o *operation) error {
- return syscall.TransmitFile(o.fd.sysfd, o.handle, o.qty, 0, &o.o, nil, syscall.TF_WRITE_BEHIND)
- })
if err != nil {
- return 0, os.NewSyscallError("transmitfile", err), false
+ return 0, wrapSyscallError("transmitfile", err), false
}
if lr != nil {
lr.N -= int64(done)
diff --git a/src/net/sock_cloexec.go b/src/net/sock_cloexec.go
index 616a101..3f5be2d 100644
--- a/src/net/sock_cloexec.go
+++ b/src/net/sock_cloexec.go
@@ -10,6 +10,7 @@
package net
import (
+ "internal/poll"
"os"
"syscall"
)
@@ -42,46 +43,8 @@
return -1, os.NewSyscallError("socket", err)
}
if err = syscall.SetNonblock(s, true); err != nil {
- closeFunc(s)
+ poll.CloseFunc(s)
return -1, os.NewSyscallError("setnonblock", err)
}
return s, nil
}
-
-// Wrapper around the accept system call that marks the returned file
-// descriptor as nonblocking and close-on-exec.
-func accept(s int) (int, syscall.Sockaddr, error) {
- ns, sa, err := accept4Func(s, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
- // On Linux the accept4 system call was introduced in 2.6.28
- // kernel and on FreeBSD it was introduced in 10 kernel. If we
- // get an ENOSYS error on both Linux and FreeBSD, or EINVAL
- // error on Linux, fall back to using accept.
- switch err {
- case nil:
- return ns, sa, nil
- default: // errors other than the ones listed
- return -1, sa, os.NewSyscallError("accept4", err)
- case syscall.ENOSYS: // syscall missing
- case syscall.EINVAL: // some Linux use this instead of ENOSYS
- case syscall.EACCES: // some Linux use this instead of ENOSYS
- case syscall.EFAULT: // some Linux use this instead of ENOSYS
- }
-
- // See ../syscall/exec_unix.go for description of ForkLock.
- // It is probably okay to hold the lock across syscall.Accept
- // because we have put fd.sysfd into non-blocking mode.
- // However, a call to the File method will put it back into
- // blocking mode. We can't take that risk, so no use of ForkLock here.
- ns, sa, err = acceptFunc(s)
- if err == nil {
- syscall.CloseOnExec(ns)
- }
- if err != nil {
- return -1, nil, os.NewSyscallError("accept", err)
- }
- if err = syscall.SetNonblock(ns, true); err != nil {
- closeFunc(ns)
- return -1, nil, os.NewSyscallError("setnonblock", err)
- }
- return ns, sa, nil
-}
diff --git a/src/net/sock_posix.go b/src/net/sock_posix.go
index 16351e1..8985f8f 100644
--- a/src/net/sock_posix.go
+++ b/src/net/sock_posix.go
@@ -8,6 +8,7 @@
import (
"context"
+ "internal/poll"
"os"
"syscall"
)
@@ -43,11 +44,11 @@
return nil, err
}
if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
- closeFunc(s)
+ poll.CloseFunc(s)
return nil, err
}
if fd, err = newFD(s, family, sotype, net); err != nil {
- closeFunc(s)
+ poll.CloseFunc(s)
return nil, err
}
@@ -127,7 +128,7 @@
if lsa, err = laddr.sockaddr(fd.family); err != nil {
return err
} else if lsa != nil {
- if err := syscall.Bind(fd.sysfd, lsa); err != nil {
+ if err := syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
}
@@ -146,8 +147,8 @@
return err
}
}
- lsa, _ = syscall.Getsockname(fd.sysfd)
- if rsa, _ = syscall.Getpeername(fd.sysfd); rsa != nil {
+ lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
+ if rsa, _ = syscall.Getpeername(fd.pfd.Sysfd); rsa != nil {
fd.setAddr(fd.addrFunc()(lsa), fd.addrFunc()(rsa))
} else {
fd.setAddr(fd.addrFunc()(lsa), raddr)
@@ -156,23 +157,23 @@
}
func (fd *netFD) listenStream(laddr sockaddr, backlog int) error {
- if err := setDefaultListenerSockopts(fd.sysfd); err != nil {
+ if err := setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
return err
}
if lsa, err := laddr.sockaddr(fd.family); err != nil {
return err
} else if lsa != nil {
- if err := syscall.Bind(fd.sysfd, lsa); err != nil {
+ if err := syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
}
- if err := listenFunc(fd.sysfd, backlog); err != nil {
+ if err := listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
if err := fd.init(); err != nil {
return err
}
- lsa, _ := syscall.Getsockname(fd.sysfd)
+ lsa, _ := syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
@@ -188,7 +189,7 @@
// multiple UDP listeners that listen on the same UDP
// port to join the same group address.
if addr.IP != nil && addr.IP.IsMulticast() {
- if err := setDefaultMulticastSockopts(fd.sysfd); err != nil {
+ if err := setDefaultMulticastSockopts(fd.pfd.Sysfd); err != nil {
return err
}
addr := *addr
@@ -204,14 +205,14 @@
if lsa, err := laddr.sockaddr(fd.family); err != nil {
return err
} else if lsa != nil {
- if err := syscall.Bind(fd.sysfd, lsa); err != nil {
+ if err := syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
}
if err := fd.init(); err != nil {
return err
}
- lsa, _ := syscall.Getsockname(fd.sysfd)
+ lsa, _ := syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
diff --git a/src/net/sockopt_posix.go b/src/net/sockopt_posix.go
index cd3d562..e8af84f 100644
--- a/src/net/sockopt_posix.go
+++ b/src/net/sockopt_posix.go
@@ -7,7 +7,7 @@
package net
import (
- "os"
+ "runtime"
"syscall"
)
@@ -101,27 +101,21 @@
}
func setReadBuffer(fd *netFD, bytes int) error {
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
- return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_RCVBUF, bytes))
+ err := fd.pfd.SetsockoptInt(syscall.SOL_SOCKET, syscall.SO_RCVBUF, bytes)
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("setsockopt", err)
}
func setWriteBuffer(fd *netFD, bytes int) error {
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
- return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_SNDBUF, bytes))
+ err := fd.pfd.SetsockoptInt(syscall.SOL_SOCKET, syscall.SO_SNDBUF, bytes)
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("setsockopt", err)
}
func setKeepAlive(fd *netFD, keepalive bool) error {
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
- return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, boolint(keepalive)))
+ err := fd.pfd.SetsockoptInt(syscall.SOL_SOCKET, syscall.SO_KEEPALIVE, boolint(keepalive))
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("setsockopt", err)
}
func setLinger(fd *netFD, sec int) error {
@@ -133,9 +127,7 @@
l.Onoff = 0
l.Linger = 0
}
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
- return os.NewSyscallError("setsockopt", syscall.SetsockoptLinger(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_LINGER, &l))
+ err := fd.pfd.SetsockoptLinger(syscall.SOL_SOCKET, syscall.SO_LINGER, &l)
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("setsockopt", err)
}
diff --git a/src/net/sockoptip_bsd.go b/src/net/sockoptip_bsd.go
index b15c639..b11f3a4 100644
--- a/src/net/sockoptip_bsd.go
+++ b/src/net/sockoptip_bsd.go
@@ -7,28 +7,24 @@
package net
import (
- "os"
+ "runtime"
"syscall"
)
func setIPv4MulticastInterface(fd *netFD, ifi *Interface) error {
ip, err := interfaceToIPv4Addr(ifi)
if err != nil {
- return os.NewSyscallError("setsockopt", err)
+ return wrapSyscallError("setsockopt", err)
}
var a [4]byte
copy(a[:], ip.To4())
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
- return os.NewSyscallError("setsockopt", syscall.SetsockoptInet4Addr(fd.sysfd, syscall.IPPROTO_IP, syscall.IP_MULTICAST_IF, a))
+ err = fd.pfd.SetsockoptInet4Addr(syscall.IPPROTO_IP, syscall.IP_MULTICAST_IF, a)
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("setsockopt", err)
}
func setIPv4MulticastLoopback(fd *netFD, v bool) error {
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
- return os.NewSyscallError("setsockopt", syscall.SetsockoptByte(fd.sysfd, syscall.IPPROTO_IP, syscall.IP_MULTICAST_LOOP, byte(boolint(v))))
+ err := fd.pfd.SetsockoptByte(syscall.IPPROTO_IP, syscall.IP_MULTICAST_LOOP, byte(boolint(v)))
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("setsockopt", err)
}
diff --git a/src/net/sockoptip_linux.go b/src/net/sockoptip_linux.go
index c1dcc91..bd7d834 100644
--- a/src/net/sockoptip_linux.go
+++ b/src/net/sockoptip_linux.go
@@ -5,7 +5,7 @@
package net
import (
- "os"
+ "runtime"
"syscall"
)
@@ -15,17 +15,13 @@
v = int32(ifi.Index)
}
mreq := &syscall.IPMreqn{Ifindex: v}
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
- return os.NewSyscallError("setsockopt", syscall.SetsockoptIPMreqn(fd.sysfd, syscall.IPPROTO_IP, syscall.IP_MULTICAST_IF, mreq))
+ err := fd.pfd.SetsockoptIPMreqn(syscall.IPPROTO_IP, syscall.IP_MULTICAST_IF, mreq)
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("setsockopt", err)
}
func setIPv4MulticastLoopback(fd *netFD, v bool) error {
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
- return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_IP, syscall.IP_MULTICAST_LOOP, boolint(v)))
+ err := fd.pfd.SetsockoptInt(syscall.IPPROTO_IP, syscall.IP_MULTICAST_LOOP, boolint(v))
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("setsockopt", err)
}
diff --git a/src/net/sockoptip_posix.go b/src/net/sockoptip_posix.go
index d508860..4e10f2a 100644
--- a/src/net/sockoptip_posix.go
+++ b/src/net/sockoptip_posix.go
@@ -7,7 +7,7 @@
package net
import (
- "os"
+ "runtime"
"syscall"
)
@@ -16,11 +16,9 @@
if err := setIPv4MreqToInterface(mreq, ifi); err != nil {
return err
}
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
- return os.NewSyscallError("setsockopt", syscall.SetsockoptIPMreq(fd.sysfd, syscall.IPPROTO_IP, syscall.IP_ADD_MEMBERSHIP, mreq))
+ err := fd.pfd.SetsockoptIPMreq(syscall.IPPROTO_IP, syscall.IP_ADD_MEMBERSHIP, mreq)
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("setsockopt", err)
}
func setIPv6MulticastInterface(fd *netFD, ifi *Interface) error {
@@ -28,19 +26,15 @@
if ifi != nil {
v = ifi.Index
}
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
- return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_IPV6, syscall.IPV6_MULTICAST_IF, v))
+ err := fd.pfd.SetsockoptInt(syscall.IPPROTO_IPV6, syscall.IPV6_MULTICAST_IF, v)
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("setsockopt", err)
}
func setIPv6MulticastLoopback(fd *netFD, v bool) error {
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
- return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_IPV6, syscall.IPV6_MULTICAST_LOOP, boolint(v)))
+ err := fd.pfd.SetsockoptInt(syscall.IPPROTO_IPV6, syscall.IPV6_MULTICAST_LOOP, boolint(v))
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("setsockopt", err)
}
func joinIPv6Group(fd *netFD, ifi *Interface, ip IP) error {
@@ -49,9 +43,7 @@
if ifi != nil {
mreq.Interface = uint32(ifi.Index)
}
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
- return os.NewSyscallError("setsockopt", syscall.SetsockoptIPv6Mreq(fd.sysfd, syscall.IPPROTO_IPV6, syscall.IPV6_JOIN_GROUP, mreq))
+ err := fd.pfd.SetsockoptIPv6Mreq(syscall.IPPROTO_IPV6, syscall.IPV6_JOIN_GROUP, mreq)
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("setsockopt", err)
}
diff --git a/src/net/sockoptip_windows.go b/src/net/sockoptip_windows.go
index 916debe..6267603 100644
--- a/src/net/sockoptip_windows.go
+++ b/src/net/sockoptip_windows.go
@@ -6,6 +6,7 @@
import (
"os"
+ "runtime"
"syscall"
"unsafe"
)
@@ -17,17 +18,13 @@
}
var a [4]byte
copy(a[:], ip.To4())
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
- return os.NewSyscallError("setsockopt", syscall.Setsockopt(fd.sysfd, syscall.IPPROTO_IP, syscall.IP_MULTICAST_IF, (*byte)(unsafe.Pointer(&a[0])), 4))
+ err = fd.pfd.Setsockopt(syscall.IPPROTO_IP, syscall.IP_MULTICAST_IF, (*byte)(unsafe.Pointer(&a[0])), 4)
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("setsockopt", err)
}
func setIPv4MulticastLoopback(fd *netFD, v bool) error {
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
- return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_IP, syscall.IP_MULTICAST_LOOP, boolint(v)))
+ err := fd.pfd.SetsockoptInt(syscall.IPPROTO_IP, syscall.IP_MULTICAST_LOOP, boolint(v))
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("setsockopt", err)
}
diff --git a/src/net/sys_cloexec.go b/src/net/sys_cloexec.go
index ba266e6..b7a8425 100644
--- a/src/net/sys_cloexec.go
+++ b/src/net/sys_cloexec.go
@@ -10,6 +10,7 @@
package net
import (
+ "internal/poll"
"os"
"syscall"
)
@@ -28,30 +29,8 @@
return -1, os.NewSyscallError("socket", err)
}
if err = syscall.SetNonblock(s, true); err != nil {
- closeFunc(s)
+ poll.CloseFunc(s)
return -1, os.NewSyscallError("setnonblock", err)
}
return s, nil
}
-
-// Wrapper around the accept system call that marks the returned file
-// descriptor as nonblocking and close-on-exec.
-func accept(s int) (int, syscall.Sockaddr, error) {
- // See ../syscall/exec_unix.go for description of ForkLock.
- // It is probably okay to hold the lock across syscall.Accept
- // because we have put fd.sysfd into non-blocking mode.
- // However, a call to the File method will put it back into
- // blocking mode. We can't take that risk, so no use of ForkLock here.
- ns, sa, err := acceptFunc(s)
- if err == nil {
- syscall.CloseOnExec(ns)
- }
- if err != nil {
- return -1, nil, os.NewSyscallError("accept", err)
- }
- if err = syscall.SetNonblock(ns, true); err != nil {
- closeFunc(ns)
- return -1, nil, os.NewSyscallError("setnonblock", err)
- }
- return ns, sa, nil
-}
diff --git a/src/net/tcpsock.go b/src/net/tcpsock.go
index 69731eb..a544a5b 100644
--- a/src/net/tcpsock.go
+++ b/src/net/tcpsock.go
@@ -255,7 +255,7 @@
if !l.ok() {
return syscall.EINVAL
}
- if err := l.fd.setDeadline(t); err != nil {
+ if err := l.fd.pfd.SetDeadline(t); err != nil {
return &OpError{Op: "set", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return nil
diff --git a/src/net/tcpsockopt_darwin.go b/src/net/tcpsockopt_darwin.go
index 0d1310e..7415c76 100644
--- a/src/net/tcpsockopt_darwin.go
+++ b/src/net/tcpsockopt_darwin.go
@@ -5,7 +5,7 @@
package net
import (
- "os"
+ "runtime"
"syscall"
"time"
)
@@ -13,17 +13,15 @@
const sysTCP_KEEPINTVL = 0x101
func setKeepAlivePeriod(fd *netFD, d time.Duration) error {
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
// The kernel expects seconds so round to next highest second.
d += (time.Second - time.Nanosecond)
secs := int(d.Seconds())
- switch err := syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_TCP, sysTCP_KEEPINTVL, secs); err {
+ switch err := fd.pfd.SetsockoptInt(syscall.IPPROTO_TCP, sysTCP_KEEPINTVL, secs); err {
case nil, syscall.ENOPROTOOPT: // OS X 10.7 and earlier don't support this option
default:
- return os.NewSyscallError("setsockopt", err)
+ return wrapSyscallError("setsockopt", err)
}
- return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_TCP, syscall.TCP_KEEPALIVE, secs))
+ err := fd.pfd.SetsockoptInt(syscall.IPPROTO_TCP, syscall.TCP_KEEPALIVE, secs)
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("setsockopt", err)
}
diff --git a/src/net/tcpsockopt_posix.go b/src/net/tcpsockopt_posix.go
index 805b56b..9cef434 100644
--- a/src/net/tcpsockopt_posix.go
+++ b/src/net/tcpsockopt_posix.go
@@ -7,14 +7,12 @@
package net
import (
- "os"
+ "runtime"
"syscall"
)
func setNoDelay(fd *netFD, noDelay bool) error {
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
- return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_TCP, syscall.TCP_NODELAY, boolint(noDelay)))
+ err := fd.pfd.SetsockoptInt(syscall.IPPROTO_TCP, syscall.TCP_NODELAY, boolint(noDelay))
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("setsockopt", err)
}
diff --git a/src/net/tcpsockopt_solaris.go b/src/net/tcpsockopt_solaris.go
index 76285e5..019fe34 100644
--- a/src/net/tcpsockopt_solaris.go
+++ b/src/net/tcpsockopt_solaris.go
@@ -5,16 +5,12 @@
package net
import (
- "os"
+ "runtime"
"syscall"
"time"
)
func setKeepAlivePeriod(fd *netFD, d time.Duration) error {
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
// The kernel expects milliseconds so round to next highest
// millisecond.
d += (time.Millisecond - time.Nanosecond)
@@ -31,5 +27,7 @@
// allocate a constant with a different meaning for the value of
// TCP_KEEPINTVL on illumos.
- return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_TCP, syscall.TCP_KEEPALIVE_THRESHOLD, msecs))
+ err := fd.pfd.SetsockoptInt(syscall.IPPROTO_TCP, syscall.TCP_KEEPALIVE_THRESHOLD, msecs)
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("setsockopt", err)
}
diff --git a/src/net/tcpsockopt_unix.go b/src/net/tcpsockopt_unix.go
index 8d44fb2..c1df660 100644
--- a/src/net/tcpsockopt_unix.go
+++ b/src/net/tcpsockopt_unix.go
@@ -7,21 +7,19 @@
package net
import (
- "os"
+ "runtime"
"syscall"
"time"
)
func setKeepAlivePeriod(fd *netFD, d time.Duration) error {
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
// The kernel expects seconds so round to next highest second.
d += (time.Second - time.Nanosecond)
secs := int(d.Seconds())
- if err := syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, secs); err != nil {
- return os.NewSyscallError("setsockopt", err)
+ if err := fd.pfd.SetsockoptInt(syscall.IPPROTO_TCP, syscall.TCP_KEEPINTVL, secs); err != nil {
+ return wrapSyscallError("setsockopt", err)
}
- return os.NewSyscallError("setsockopt", syscall.SetsockoptInt(fd.sysfd, syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, secs))
+ err := fd.pfd.SetsockoptInt(syscall.IPPROTO_TCP, syscall.TCP_KEEPIDLE, secs)
+ runtime.KeepAlive(fd)
+ return wrapSyscallError("setsockopt", err)
}
diff --git a/src/net/tcpsockopt_windows.go b/src/net/tcpsockopt_windows.go
index 45a4dca..73dead1 100644
--- a/src/net/tcpsockopt_windows.go
+++ b/src/net/tcpsockopt_windows.go
@@ -6,16 +6,13 @@
import (
"os"
+ "runtime"
"syscall"
"time"
"unsafe"
)
func setKeepAlivePeriod(fd *netFD, d time.Duration) error {
- if err := fd.incref(); err != nil {
- return err
- }
- defer fd.decref()
// The kernel expects milliseconds so round to next highest
// millisecond.
d += (time.Millisecond - time.Nanosecond)
@@ -27,6 +24,7 @@
}
ret := uint32(0)
size := uint32(unsafe.Sizeof(ka))
- err := syscall.WSAIoctl(fd.sysfd, syscall.SIO_KEEPALIVE_VALS, (*byte)(unsafe.Pointer(&ka)), size, nil, 0, &ret, nil, 0)
+ err := fd.pfd.WSAIoctl(syscall.SIO_KEEPALIVE_VALS, (*byte)(unsafe.Pointer(&ka)), size, nil, 0, &ret, nil, 0)
+ runtime.KeepAlive(fd)
return os.NewSyscallError("wsaioctl", err)
}
diff --git a/src/net/timeout_test.go b/src/net/timeout_test.go
index 55bbf44..9de7801 100644
--- a/src/net/timeout_test.go
+++ b/src/net/timeout_test.go
@@ -6,6 +6,7 @@
import (
"fmt"
+ "internal/poll"
"internal/testenv"
"io"
"io/ioutil"
@@ -145,9 +146,9 @@
}{
// Tests that accept deadlines in the past work, even if
// there's incoming connections available.
- {-5 * time.Second, [2]error{errTimeout, errTimeout}},
+ {-5 * time.Second, [2]error{poll.ErrTimeout, poll.ErrTimeout}},
- {50 * time.Millisecond, [2]error{nil, errTimeout}},
+ {50 * time.Millisecond, [2]error{nil, poll.ErrTimeout}},
}
func TestAcceptTimeout(t *testing.T) {
@@ -299,9 +300,9 @@
}{
// Tests that read deadlines work, even if there's data ready
// to be read.
- {-5 * time.Second, [2]error{errTimeout, errTimeout}},
+ {-5 * time.Second, [2]error{poll.ErrTimeout, poll.ErrTimeout}},
- {50 * time.Millisecond, [2]error{nil, errTimeout}},
+ {50 * time.Millisecond, [2]error{nil, poll.ErrTimeout}},
}
func TestReadTimeout(t *testing.T) {
@@ -423,9 +424,9 @@
}{
// Tests that read deadlines work, even if there's data ready
// to be read.
- {-5 * time.Second, [2]error{errTimeout, errTimeout}},
+ {-5 * time.Second, [2]error{poll.ErrTimeout, poll.ErrTimeout}},
- {50 * time.Millisecond, [2]error{nil, errTimeout}},
+ {50 * time.Millisecond, [2]error{nil, poll.ErrTimeout}},
}
func TestReadFromTimeout(t *testing.T) {
@@ -496,9 +497,9 @@
}{
// Tests that write deadlines work, even if there's buffer
// space available to write.
- {-5 * time.Second, [2]error{errTimeout, errTimeout}},
+ {-5 * time.Second, [2]error{poll.ErrTimeout, poll.ErrTimeout}},
- {10 * time.Millisecond, [2]error{nil, errTimeout}},
+ {10 * time.Millisecond, [2]error{nil, poll.ErrTimeout}},
}
func TestWriteTimeout(t *testing.T) {
@@ -610,9 +611,9 @@
}{
// Tests that write deadlines work, even if there's buffer
// space available to write.
- {-5 * time.Second, [2]error{errTimeout, errTimeout}},
+ {-5 * time.Second, [2]error{poll.ErrTimeout, poll.ErrTimeout}},
- {10 * time.Millisecond, [2]error{nil, errTimeout}},
+ {10 * time.Millisecond, [2]error{nil, poll.ErrTimeout}},
}
func TestWriteToTimeout(t *testing.T) {
diff --git a/src/net/unixsock.go b/src/net/unixsock.go
index b25d492..d29514e 100644
--- a/src/net/unixsock.go
+++ b/src/net/unixsock.go
@@ -264,7 +264,7 @@
if !l.ok() {
return syscall.EINVAL
}
- if err := l.fd.setDeadline(t); err != nil {
+ if err := l.fd.pfd.SetDeadline(t); err != nil {
return &OpError{Op: "set", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return nil
diff --git a/src/net/writev_test.go b/src/net/writev_test.go
index 7160d28..4c05be4 100644
--- a/src/net/writev_test.go
+++ b/src/net/writev_test.go
@@ -7,6 +7,7 @@
import (
"bytes"
"fmt"
+ "internal/poll"
"io"
"io/ioutil"
"reflect"
@@ -99,13 +100,13 @@
}
func testBuffer_writeTo(t *testing.T, chunks int, useCopy bool) {
- oldHook := testHookDidWritev
- defer func() { testHookDidWritev = oldHook }()
+ oldHook := poll.TestHookDidWritev
+ defer func() { poll.TestHookDidWritev = oldHook }()
var writeLog struct {
sync.Mutex
log []int
}
- testHookDidWritev = func(size int) {
+ poll.TestHookDidWritev = func(size int) {
writeLog.Lock()
writeLog.log = append(writeLog.log, size)
writeLog.Unlock()
diff --git a/src/net/writev_unix.go b/src/net/writev_unix.go
index 174e6bc..bf0fbf8 100644
--- a/src/net/writev_unix.go
+++ b/src/net/writev_unix.go
@@ -7,10 +7,8 @@
package net
import (
- "io"
- "os"
+ "runtime"
"syscall"
- "unsafe"
)
func (c *conn) writeBuffers(v *Buffers) (int64, error) {
@@ -25,71 +23,7 @@
}
func (fd *netFD) writeBuffers(v *Buffers) (n int64, err error) {
- if err := fd.writeLock(); err != nil {
- return 0, err
- }
- defer fd.writeUnlock()
- if err := fd.pd.prepareWrite(); err != nil {
- return 0, err
- }
-
- var iovecs []syscall.Iovec
- if fd.iovecs != nil {
- iovecs = *fd.iovecs
- }
- // TODO: read from sysconf(_SC_IOV_MAX)? The Linux default is
- // 1024 and this seems conservative enough for now. Darwin's
- // UIO_MAXIOV also seems to be 1024.
- maxVec := 1024
-
- for len(*v) > 0 {
- iovecs = iovecs[:0]
- for _, chunk := range *v {
- if len(chunk) == 0 {
- continue
- }
- iovecs = append(iovecs, syscall.Iovec{Base: &chunk[0]})
- if fd.isStream && len(chunk) > 1<<30 {
- iovecs[len(iovecs)-1].SetLen(1 << 30)
- break // continue chunk on next writev
- }
- iovecs[len(iovecs)-1].SetLen(len(chunk))
- if len(iovecs) == maxVec {
- break
- }
- }
- if len(iovecs) == 0 {
- break
- }
- fd.iovecs = &iovecs // cache
-
- wrote, _, e0 := syscall.Syscall(syscall.SYS_WRITEV,
- uintptr(fd.sysfd),
- uintptr(unsafe.Pointer(&iovecs[0])),
- uintptr(len(iovecs)))
- if wrote == ^uintptr(0) {
- wrote = 0
- }
- testHookDidWritev(int(wrote))
- n += int64(wrote)
- v.consume(int64(wrote))
- if e0 == syscall.EAGAIN {
- if err = fd.pd.waitWrite(); err == nil {
- continue
- }
- } else if e0 != 0 {
- err = syscall.Errno(e0)
- }
- if err != nil {
- break
- }
- if n == 0 {
- err = io.ErrUnexpectedEOF
- break
- }
- }
- if _, ok := err.(syscall.Errno); ok {
- err = os.NewSyscallError("writev", err)
- }
- return n, err
+ n, err = fd.pfd.Writev((*[][]byte)(v)) // hand the whole vectored write to fd.pfd, passing v converted to *[][]byte
+ runtime.KeepAlive(fd) // keep fd reachable until the Writev call above has returned
+ return n, wrapSyscallError("writev", err) // helper defined elsewhere; presumably labels syscall errors "writev" as the removed os.NewSyscallError code did (confirm)
}