blob: 60b59df2e36a3f73f39665fc264a0d52329b772e [file] [log] [blame]
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build aix || darwin || dragonfly || freebsd || (js && wasm) || linux || netbsd || openbsd || solaris
// +build aix darwin dragonfly freebsd js,wasm linux netbsd openbsd solaris
package poll
import (
"io"
"sync/atomic"
"syscall"
)
// FD is a file descriptor. The net and os packages use this type as a
// field of a larger type representing a network connection or OS file.
type FD struct {
// Lock Sysfd and serialize access to Read and Write methods.
fdmu fdMutex
// System file descriptor. Immutable until Close; destroy resets it
// to -1 after the descriptor has been closed.
Sysfd int
// I/O poller.
pd pollDesc
// Writev cache.
iovecs *[]syscall.Iovec
// Semaphore signaled when file is closed: destroy releases it and
// Close acquires it.
csema uint32
// Non-zero if this file has been set to blocking mode. Init sets it
// when the descriptor is not pollable or the poller cannot be
// initialized; SetBlocking sets it unconditionally.
isBlocking uint32
// Whether this is a streaming descriptor, as opposed to a
// packet-based descriptor like a UDP socket. Immutable.
IsStream bool
// Whether a zero byte read indicates EOF. This is false for a
// message based socket connection.
ZeroReadIsEOF bool
// Whether this is a file rather than a network socket.
// Init sets it when called with the network name "file".
isFile bool
}
// Init prepares the FD for use. Sysfd must already hold the descriptor.
// Calling Init more than once on the same FD is permitted.
//
// net is a network name as used by the net package (for example "tcp"),
// or the literal "file" for a regular OS file. pollable selects whether
// the descriptor is handed to the runtime network poller.
//
// When pollable is false, or when registering with the poller fails, the
// FD is marked as blocking. A poller registration failure is returned to
// the caller.
func (fd *FD) Init(net string, pollable bool) error {
	// Only the distinction between files and everything else matters.
	if net == "file" {
		fd.isFile = true
	}

	if pollable {
		err := fd.pd.init(fd)
		if err == nil {
			return nil
		}
		// The runtime poller is unavailable for this descriptor,
		// so treat it as operating in blocking mode.
		fd.isBlocking = 1
		return err
	}

	fd.isBlocking = 1
	return nil
}
// destroy closes the file descriptor. This is called when there are
// no remaining references.
//
// It closes the poll descriptor, calls CloseFunc on Sysfd, resets Sysfd
// to -1 and releases csema (which Close acquires while waiting for the
// descriptor to be closed). The returned error is the one reported by
// CloseFunc.
func (fd *FD) destroy() error {
// Poller may want to unregister fd in readiness notification mechanism,
// so this must be executed before CloseFunc.
fd.pd.close()
// We don't use ignoringEINTR here because POSIX does not define
// whether the descriptor is closed if close returns EINTR.
// If the descriptor is indeed closed, using a loop would race
// with some other goroutine opening a new descriptor.
// (The Linux kernel guarantees that it is closed on an EINTR error.)
err := CloseFunc(fd.Sysfd)
// Mark the descriptor as gone, then wake any Close call that is
// blocked on the semaphore.
fd.Sysfd = -1
runtime_Semrelease(&fd.csema)
return err
}
// Close closes the FD. The underlying file descriptor is closed by the
// destroy method when there are no remaining references.
//
// It returns errClosing(fd.isFile) if increfAndClose reports failure;
// otherwise it returns the result of decref.
func (fd *FD) Close() error {
if !fd.fdmu.increfAndClose() {
return errClosing(fd.isFile)
}
// Unblock any I/O. Once it all unblocks and returns,
// so that it cannot be referring to fd.Sysfd anymore,
// the final decref will close fd.Sysfd. This should happen
// fairly quickly, since all the I/O is non-blocking, and any
// attempts to block in the pollDesc will return errClosing(fd.isFile).
fd.pd.evict()
// The call to decref will call destroy if there are no other
// references.
err := fd.decref()
// Wait until the descriptor is closed. If this was the only
// reference, it is already closed. Only wait if the file has
// not been set to blocking mode, as otherwise any current I/O
// may be blocking, and that would block the Close.
// No need for an atomic read of isBlocking, increfAndClose means
// we have exclusive access to fd.
if fd.isBlocking == 0 {
runtime_Semacquire(&fd.csema)
}
return err
}
// SetBlocking switches the file into blocking mode.
//
// It holds a reference on the FD for the duration of the call, records
// the mode change in isBlocking and then clears the non-blocking flag on
// the system descriptor. An error from taking the reference or from
// syscall.SetNonblock is returned.
func (fd *FD) SetBlocking() error {
	err := fd.incref()
	if err != nil {
		return err
	}
	defer fd.decref()

	// The store is atomic so that concurrent SetBlocking calls do not
	// trip the race detector; the value only ever moves from 0 to 1,
	// so there is no real race.
	atomic.StoreUint32(&fd.isBlocking, 1)

	return syscall.SetNonblock(fd.Sysfd, false)
}
// maxRW is the largest number of bytes passed to a single read or write
// system call on a streaming descriptor (IsStream set).
//
// Darwin and FreeBSD can't read or write 2GB+ files at a time,
// even on 64-bit systems.
// The same is true of socket implementations on many systems.
// See golang.org/issue/7812 and golang.org/issue/16266.
// Use 1GB instead of, say, 2GB-1, to keep subsequent reads aligned.
const maxRW = 1 << 30
// Read implements io.Reader.
//
// The read lock is held for the whole call. A zero-length p returns
// (0, nil) without issuing a system call. On streaming descriptors at
// most maxRW bytes are requested per call. When the read reports EAGAIN
// and the descriptor is pollable, Read waits for readiness and retries.
// The final count and error are filtered through eofError; the count is
// always 0 when an error occurred.
func (fd *FD) Read(p []byte) (int, error) {
	if err := fd.readLock(); err != nil {
		return 0, err
	}
	defer fd.readUnlock()

	// A zero byte read returns right away (but only after taking the
	// read lock). Otherwise syscall.Read would yield 0, nil, which
	// looks like io.EOF.
	// TODO(bradfitz): make it wait for readability? (Issue 15735)
	if len(p) == 0 {
		return 0, nil
	}

	if err := fd.pd.prepareRead(fd.isFile); err != nil {
		return 0, err
	}

	if fd.IsStream && len(p) > maxRW {
		p = p[:maxRW]
	}

	for {
		got, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
		if err == nil {
			return got, fd.eofError(got, nil)
		}
		if err == syscall.EAGAIN && fd.pd.pollable() {
			if err = fd.pd.waitRead(fd.isFile); err == nil {
				continue
			}
		}
		// Any byte count reported alongside an error is discarded.
		return 0, fd.eofError(0, err)
	}
}
// Pread wraps the pread system call.
//
// It reads into p at offset off, retrying while the call is interrupted
// (EINTR). On streaming descriptors the request is capped at maxRW
// bytes. The count is forced to 0 when an error occurred, and the result
// is filtered through eofError.
func (fd *FD) Pread(p []byte, off int64) (int, error) {
	// Take a plain reference rather than the read lock: pread carries
	// its own offset, so it is independent of other reads. For the same
	// reason the poller is not involved.
	if err := fd.incref(); err != nil {
		return 0, err
	}

	if fd.IsStream && len(p) > maxRW {
		p = p[:maxRW]
	}

	got, err := syscall.Pread(fd.Sysfd, p, off)
	for err == syscall.EINTR {
		got, err = syscall.Pread(fd.Sysfd, p, off)
	}
	if err != nil {
		got = 0
	}

	// Drop the reference before translating the result.
	fd.decref()
	return got, fd.eofError(got, err)
}
// ReadFrom wraps the recvfrom network call.
//
// It holds the read lock for the whole call, retries when the call is
// interrupted (EINTR), and on EAGAIN waits for readiness when the
// descriptor is pollable. It returns the byte count, the address
// reported by syscall.Recvfrom and an error filtered through eofError.
// The count is 0 whenever an error occurred.
func (fd *FD) ReadFrom(p []byte) (int, syscall.Sockaddr, error) {
	if err := fd.readLock(); err != nil {
		return 0, nil, err
	}
	defer fd.readUnlock()

	if err := fd.pd.prepareRead(fd.isFile); err != nil {
		return 0, nil, err
	}

	for {
		got, addr, err := syscall.Recvfrom(fd.Sysfd, p, 0)
		switch {
		case err == nil:
			// Fall through to the common return below.
		case err == syscall.EINTR:
			continue
		default:
			got = 0
			if err == syscall.EAGAIN && fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		}
		return got, addr, fd.eofError(got, err)
	}
}
// ReadFromInet4 wraps the recvfrom network call for IPv4.
//
// from is handed to syscall.RecvfromInet4 unchanged. The read lock is
// held for the whole call; EINTR is retried, and EAGAIN on a pollable
// descriptor waits for readiness before retrying. The count is 0
// whenever an error occurred, and the error is filtered through
// eofError.
func (fd *FD) ReadFromInet4(p []byte, from *syscall.SockaddrInet4) (int, error) {
	if err := fd.readLock(); err != nil {
		return 0, err
	}
	defer fd.readUnlock()

	if err := fd.pd.prepareRead(fd.isFile); err != nil {
		return 0, err
	}

	for {
		got, err := syscall.RecvfromInet4(fd.Sysfd, p, 0, from)
		switch {
		case err == nil:
			// Fall through to the common return below.
		case err == syscall.EINTR:
			continue
		default:
			got = 0
			if err == syscall.EAGAIN && fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		}
		return got, fd.eofError(got, err)
	}
}
// ReadFromInet6 wraps the recvfrom network call for IPv6.
//
// from is handed to syscall.RecvfromInet6 unchanged. The read lock is
// held for the whole call; EINTR is retried, and EAGAIN on a pollable
// descriptor waits for readiness before retrying. The count is 0
// whenever an error occurred, and the error is filtered through
// eofError.
func (fd *FD) ReadFromInet6(p []byte, from *syscall.SockaddrInet6) (int, error) {
	if err := fd.readLock(); err != nil {
		return 0, err
	}
	defer fd.readUnlock()

	if err := fd.pd.prepareRead(fd.isFile); err != nil {
		return 0, err
	}

	for {
		got, err := syscall.RecvfromInet6(fd.Sysfd, p, 0, from)
		switch {
		case err == nil:
			// Fall through to the common return below.
		case err == syscall.EINTR:
			continue
		default:
			got = 0
			if err == syscall.EAGAIN && fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		}
		return got, fd.eofError(got, err)
	}
}
// ReadMsg wraps the recvmsg network call.
//
// It returns, in order, the values reported by syscall.Recvmsg: the
// number of bytes placed in p, the number of bytes placed in oob, the
// message flags and the source address, followed by an error filtered
// through eofError. EINTR is retried; EAGAIN on a pollable descriptor
// waits for readiness before retrying.
func (fd *FD) ReadMsg(p []byte, oob []byte, flags int) (int, int, int, syscall.Sockaddr, error) {
	if err := fd.readLock(); err != nil {
		return 0, 0, 0, nil, err
	}
	defer fd.readUnlock()

	if err := fd.pd.prepareRead(fd.isFile); err != nil {
		return 0, 0, 0, nil, err
	}

	for {
		got, oobGot, msgFlags, addr, err := syscall.Recvmsg(fd.Sysfd, p, oob, flags)
		if err == syscall.EINTR {
			continue
		}
		// TODO(dfc) should n and oobn be set to 0
		if err == syscall.EAGAIN && fd.pd.pollable() {
			if err = fd.pd.waitRead(fd.isFile); err == nil {
				continue
			}
		}
		return got, oobGot, msgFlags, addr, fd.eofError(got, err)
	}
}
// Write implements io.Writer.
//
// It holds the write lock for the whole call and keeps issuing write
// system calls until all of p has been written or an error occurs. On
// streaming descriptors each call writes at most maxRW bytes. EAGAIN on
// a pollable descriptor waits for writability and retries. A write that
// makes no progress without reporting an error yields
// io.ErrUnexpectedEOF. The returned count is the number of bytes written
// so far.
func (fd *FD) Write(p []byte) (int, error) {
	if err := fd.writeLock(); err != nil {
		return 0, err
	}
	defer fd.writeUnlock()

	if err := fd.pd.prepareWrite(fd.isFile); err != nil {
		return 0, err
	}

	written := 0
	for {
		// Select the unwritten tail, limited to maxRW for streams.
		chunk := p[written:]
		if fd.IsStream && len(chunk) > maxRW {
			chunk = chunk[:maxRW]
		}

		n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, chunk)
		if n > 0 {
			written += n
		}

		switch {
		case written == len(p):
			return written, err
		case err == syscall.EAGAIN && fd.pd.pollable():
			if err = fd.pd.waitWrite(fd.isFile); err != nil {
				return written, err
			}
			// The descriptor is writable again; go around.
		case err != nil:
			return written, err
		case n == 0:
			return written, io.ErrUnexpectedEOF
		}
	}
}
// Pwrite wraps the pwrite system call.
//
// It writes p starting at offset off, advancing the offset by the number
// of bytes already written, until everything is written or an error
// occurs. EINTR is retried. On streaming descriptors each call writes at
// most maxRW bytes. A write that makes no progress without reporting an
// error yields io.ErrUnexpectedEOF. The returned count is the number of
// bytes written so far.
func (fd *FD) Pwrite(p []byte, off int64) (int, error) {
	// Take a plain reference rather than the write lock: pwrite carries
	// its own offset, so it is independent of other writes. For the
	// same reason the poller is not involved.
	if err := fd.incref(); err != nil {
		return 0, err
	}
	defer fd.decref()

	written := 0
	for {
		// Select the unwritten tail, limited to maxRW for streams.
		chunk := p[written:]
		if fd.IsStream && len(chunk) > maxRW {
			chunk = chunk[:maxRW]
		}

		n, err := syscall.Pwrite(fd.Sysfd, chunk, off+int64(written))
		if err == syscall.EINTR {
			continue
		}
		if n > 0 {
			written += n
		}

		switch {
		case written == len(p), err != nil:
			return written, err
		case n == 0:
			return written, io.ErrUnexpectedEOF
		}
	}
}
// WriteToInet4 wraps the sendto network call for IPv4 addresses.
//
// It holds the write lock for the whole call. EINTR is retried; EAGAIN
// on a pollable descriptor waits for writability before retrying. On
// success it reports len(p) bytes written; on failure it returns 0 and
// the error.
func (fd *FD) WriteToInet4(p []byte, sa syscall.SockaddrInet4) (int, error) {
	if err := fd.writeLock(); err != nil {
		return 0, err
	}
	defer fd.writeUnlock()

	if err := fd.pd.prepareWrite(fd.isFile); err != nil {
		return 0, err
	}

	for {
		err := syscall.SendtoInet4(fd.Sysfd, p, 0, sa)
		switch {
		case err == nil:
			return len(p), nil
		case err == syscall.EINTR:
			continue
		case err == syscall.EAGAIN && fd.pd.pollable():
			if err = fd.pd.waitWrite(fd.isFile); err == nil {
				continue
			}
		}
		return 0, err
	}
}
// WriteToInet6 wraps the sendto network call for IPv6 addresses.
//
// It holds the write lock for the whole call. EINTR is retried; EAGAIN
// on a pollable descriptor waits for writability before retrying. On
// success it reports len(p) bytes written; on failure it returns 0 and
// the error.
func (fd *FD) WriteToInet6(p []byte, sa syscall.SockaddrInet6) (int, error) {
	if err := fd.writeLock(); err != nil {
		return 0, err
	}
	defer fd.writeUnlock()

	if err := fd.pd.prepareWrite(fd.isFile); err != nil {
		return 0, err
	}

	for {
		err := syscall.SendtoInet6(fd.Sysfd, p, 0, sa)
		switch {
		case err == nil:
			return len(p), nil
		case err == syscall.EINTR:
			continue
		case err == syscall.EAGAIN && fd.pd.pollable():
			if err = fd.pd.waitWrite(fd.isFile); err == nil {
				continue
			}
		}
		return 0, err
	}
}
// WriteTo wraps the sendto network call.
//
// It holds the write lock for the whole call. EINTR is retried; EAGAIN
// on a pollable descriptor waits for writability before retrying. On
// success it reports len(p) bytes written; on failure it returns 0 and
// the error.
func (fd *FD) WriteTo(p []byte, sa syscall.Sockaddr) (int, error) {
	if err := fd.writeLock(); err != nil {
		return 0, err
	}
	defer fd.writeUnlock()

	if err := fd.pd.prepareWrite(fd.isFile); err != nil {
		return 0, err
	}

	for {
		err := syscall.Sendto(fd.Sysfd, p, 0, sa)
		switch {
		case err == nil:
			return len(p), nil
		case err == syscall.EINTR:
			continue
		case err == syscall.EAGAIN && fd.pd.pollable():
			if err = fd.pd.waitWrite(fd.isFile); err == nil {
				continue
			}
		}
		return 0, err
	}
}
// WriteMsg wraps the sendmsg network call.
//
// It holds the write lock for the whole call. EINTR is retried; EAGAIN
// on a pollable descriptor waits for writability before retrying. On
// success it returns the count reported by syscall.SendmsgN and
// len(oob). On failure it returns the reported count, 0 for the
// out-of-band count, and the error.
func (fd *FD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (int, int, error) {
	if err := fd.writeLock(); err != nil {
		return 0, 0, err
	}
	defer fd.writeUnlock()

	if err := fd.pd.prepareWrite(fd.isFile); err != nil {
		return 0, 0, err
	}

	for {
		sent, err := syscall.SendmsgN(fd.Sysfd, p, oob, sa, 0)
		switch {
		case err == nil:
			return sent, len(oob), nil
		case err == syscall.EINTR:
			continue
		case err == syscall.EAGAIN && fd.pd.pollable():
			if err = fd.pd.waitWrite(fd.isFile); err == nil {
				continue
			}
		}
		return sent, 0, err
	}
}
// Accept wraps the accept network call.
//
// It holds the read lock for the whole call. On success it returns the
// new descriptor, the peer address and an empty call name. EINTR and
// ECONNABORTED are retried; EAGAIN on a pollable descriptor waits for
// readiness before retrying. Any other failure returns -1, a nil
// address, the call name reported by accept, and the error.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
	if err := fd.readLock(); err != nil {
		return -1, nil, "", err
	}
	defer fd.readUnlock()

	if err := fd.pd.prepareRead(fd.isFile); err != nil {
		return -1, nil, "", err
	}

	for {
		newfd, peer, call, err := accept(fd.Sysfd)
		if err == nil {
			return newfd, peer, "", nil
		}

		// ECONNABORTED means a socket on the listen queue was closed
		// before we accepted it; like an interrupted call, that is
		// simply retried.
		if err == syscall.EINTR || err == syscall.ECONNABORTED {
			continue
		}

		if err == syscall.EAGAIN && fd.pd.pollable() {
			if err = fd.pd.waitRead(fd.isFile); err == nil {
				continue
			}
		}
		return -1, nil, call, err
	}
}
// Seek wraps syscall.Seek.
//
// A reference on the FD is held while the system call runs. The result
// of syscall.Seek is returned unchanged; if the reference cannot be
// taken, Seek returns 0 and that error.
func (fd *FD) Seek(offset int64, whence int) (int64, error) {
	err := fd.incref()
	if err != nil {
		return 0, err
	}
	defer fd.decref()

	return syscall.Seek(fd.Sysfd, offset, whence)
}
// ReadDirent wraps syscall.ReadDirent.
// It is treated like an ordinary system call rather than a call that
// tries to fill the buffer.
//
// EINTR is retried; EAGAIN on a pollable descriptor waits for readiness
// before retrying. The count is 0 whenever an error is returned. The
// error is returned as is, without translation to io.EOF.
func (fd *FD) ReadDirent(buf []byte) (int, error) {
	if err := fd.incref(); err != nil {
		return 0, err
	}
	defer fd.decref()

	for {
		got, err := ignoringEINTRIO(syscall.ReadDirent, fd.Sysfd, buf)
		if err == nil {
			return got, nil
		}
		if err == syscall.EAGAIN && fd.pd.pollable() {
			if err = fd.pd.waitRead(fd.isFile); err == nil {
				continue
			}
		}
		// eofError is deliberately not applied: the caller does not
		// expect to see io.EOF.
		return 0, err
	}
}
// Fchmod wraps syscall.Fchmod.
//
// A reference on the FD is held while the system call runs, and the call
// is issued through ignoringEINTR.
func (fd *FD) Fchmod(mode uint32) error {
	err := fd.incref()
	if err != nil {
		return err
	}
	defer fd.decref()

	chmod := func() error {
		return syscall.Fchmod(fd.Sysfd, mode)
	}
	return ignoringEINTR(chmod)
}
// Fchdir wraps syscall.Fchdir.
//
// A reference on the FD is held while the system call runs; its result
// is returned unchanged.
func (fd *FD) Fchdir() error {
	err := fd.incref()
	if err != nil {
		return err
	}
	defer fd.decref()

	return syscall.Fchdir(fd.Sysfd)
}
// Fstat wraps syscall.Fstat.
//
// A reference on the FD is held while the system call runs, and the call
// is issued through ignoringEINTR. s is passed to syscall.Fstat
// unchanged.
func (fd *FD) Fstat(s *syscall.Stat_t) error {
	err := fd.incref()
	if err != nil {
		return err
	}
	defer fd.decref()

	stat := func() error {
		return syscall.Fstat(fd.Sysfd, s)
	}
	return ignoringEINTR(stat)
}
// tryDupCloexec indicates whether F_DUPFD_CLOEXEC should be used.
// If the kernel doesn't support it, this is set to 0.
// It is read and written atomically by DupCloseOnExec, which clears it
// after fcntl reports EINVAL or ENOSYS.
var tryDupCloexec = int32(1)
// DupCloseOnExec dups fd and marks it close-on-exec.
//
// While tryDupCloexec is set (and the platform defines
// F_DUPFD_CLOEXEC), it uses a single fcntl(F_DUPFD_CLOEXEC) call. If
// that call fails with EINVAL or ENOSYS, tryDupCloexec is cleared and
// this call, like all later ones, falls back to dupCloseOnExecOld.
//
// It returns the new descriptor, the name of the system call that failed
// (empty on success) and the error. Any fcntl failure other than EINVAL
// or ENOSYS is returned as (-1, "fcntl", err).
func DupCloseOnExec(fd int) (int, string, error) {
	if syscall.F_DUPFD_CLOEXEC != 0 && atomic.LoadInt32(&tryDupCloexec) == 1 {
		r0, e1 := fcntl(fd, syscall.F_DUPFD_CLOEXEC, 0)
		if e1 == nil {
			return r0, "", nil
		}
		// Compare the error value itself rather than asserting its
		// dynamic type: an unchecked e1.(syscall.Errno) would panic
		// if fcntl ever returned an error that is not an Errno.
		switch e1 {
		case syscall.EINVAL, syscall.ENOSYS:
			// Old kernel, or js/wasm (which returns
			// ENOSYS). Fall back to the portable way from
			// now on.
			atomic.StoreInt32(&tryDupCloexec, 0)
		default:
			return -1, "fcntl", e1
		}
	}
	return dupCloseOnExecOld(fd)
}
// dupCloseOnExecOld is the traditional way to dup an fd and
// set its O_CLOEXEC bit, using two system calls.
//
// syscall.ForkLock is read-locked across both calls. It returns the new
// descriptor with an empty call name on success, or (-1, "dup", err)
// when syscall.Dup fails.
func dupCloseOnExecOld(fd int) (int, string, error) {
	syscall.ForkLock.RLock()
	defer syscall.ForkLock.RUnlock()

	dup, err := syscall.Dup(fd)
	if err == nil {
		syscall.CloseOnExec(dup)
		return dup, "", nil
	}
	return -1, "dup", err
}
// Dup duplicates the file descriptor.
//
// A reference on the FD is held while DupCloseOnExec runs; its results
// are returned unchanged. If the reference cannot be taken, Dup returns
// -1, an empty call name and that error.
func (fd *FD) Dup() (int, string, error) {
	err := fd.incref()
	if err != nil {
		return -1, "", err
	}
	defer fd.decref()

	return DupCloseOnExec(fd.Sysfd)
}
// On Unix variants only, expose the IO event for the net code.
// WaitWrite waits until data can be written to fd.
// It delegates to the poll descriptor's waitWrite.
func (fd *FD) WaitWrite() error {
return fd.pd.waitWrite(fd.isFile)
}
// WriteOnce is for testing only. It makes a single write call.
//
// The write lock is held for the call. Only EINTR is retried (through
// ignoringEINTRIO); the poller is not consulted.
func (fd *FD) WriteOnce(p []byte) (int, error) {
	err := fd.writeLock()
	if err != nil {
		return 0, err
	}
	defer fd.writeUnlock()

	return ignoringEINTRIO(syscall.Write, fd.Sysfd, p)
}
// RawRead invokes the user-defined function f for a read operation.
//
// f is called with the system descriptor while the read lock is held.
// Each time f returns false, RawRead waits for the descriptor to become
// readable and calls f again. It returns nil once f returns true, or the
// first error from locking, preparing or waiting.
func (fd *FD) RawRead(f func(uintptr) bool) error {
	if err := fd.readLock(); err != nil {
		return err
	}
	defer fd.readUnlock()

	if err := fd.pd.prepareRead(fd.isFile); err != nil {
		return err
	}

	for !f(uintptr(fd.Sysfd)) {
		if err := fd.pd.waitRead(fd.isFile); err != nil {
			return err
		}
	}
	return nil
}
// RawWrite invokes the user-defined function f for a write operation.
//
// f is called with the system descriptor while the write lock is held.
// Each time f returns false, RawWrite waits for the descriptor to become
// writable and calls f again. It returns nil once f returns true, or the
// first error from locking, preparing or waiting.
func (fd *FD) RawWrite(f func(uintptr) bool) error {
	if err := fd.writeLock(); err != nil {
		return err
	}
	defer fd.writeUnlock()

	if err := fd.pd.prepareWrite(fd.isFile); err != nil {
		return err
	}

	for !f(uintptr(fd.Sysfd)) {
		if err := fd.pd.waitWrite(fd.isFile); err != nil {
			return err
		}
	}
	return nil
}
// ignoringEINTRIO is like ignoringEINTR, but just for IO calls.
//
// It calls fn(fd, p) repeatedly for as long as fn reports
// syscall.EINTR, and returns the count and error of the first call that
// reports anything else.
func ignoringEINTRIO(fn func(fd int, p []byte) (int, error), fd int, p []byte) (int, error) {
	n, err := fn(fd, p)
	for err == syscall.EINTR {
		n, err = fn(fd, p)
	}
	return n, err
}