blob: d774dce3034c1781fa657d82885642b370cb30e9 [file] [log] [blame]
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build darwin || dragonfly || freebsd || netbsd || openbsd
package runtime
// Integrated network poller (kqueue-based implementation).
import (
"internal/goarch"
"runtime/internal/atomic"
"unsafe"
)
// Poller state, initialized once by netpollinit and read-only afterwards
// (except for netpollWakeSig, which is toggled by netpollBreak/netpoll).
var (
	kq int32 = -1 // kqueue file descriptor; -1 until netpollinit succeeds

	netpollBreakRd, netpollBreakWr uintptr // for netpollBreak

	netpollWakeSig atomic.Uint32 // used to avoid duplicate calls of netpollBreak
)
// netpollinit sets up the kqueue instance and the self-pipe used by
// netpollBreak to wake a blocked kevent call. It throws on any failure,
// since the runtime cannot operate without a working poller.
func netpollinit() {
	kq = kqueue()
	if kq < 0 {
		println("runtime: kqueue failed with", -kq)
		throw("runtime: netpollinit failed")
	}
	closeonexec(kq)

	// Create the wakeup pipe. Writing a byte to wr (netpollBreak)
	// makes rd readable and wakes the poller.
	rd, wr, errno := nonblockingPipe()
	if errno != 0 {
		println("runtime: pipe failed with", -errno)
		throw("runtime: pipe failed")
	}

	// Register the read end of the pipe with the kqueue.
	ev := keventt{
		filter: _EVFILT_READ,
		flags:  _EV_ADD,
	}
	*(*uintptr)(unsafe.Pointer(&ev.ident)) = uintptr(rd)
	if n := kevent(kq, &ev, 1, nil, 0, nil); n < 0 {
		println("runtime: kevent failed with", -n)
		throw("runtime: kevent failed")
	}

	netpollBreakRd = uintptr(rd)
	netpollBreakWr = uintptr(wr)
}
// netpollIsPollDescriptor reports whether fd is owned by the poller
// itself: the kqueue descriptor or either end of the wakeup pipe.
func netpollIsPollDescriptor(fd uintptr) bool {
	switch fd {
	case uintptr(kq), netpollBreakRd, netpollBreakWr:
		return true
	}
	return false
}
// netpollopen registers fd with the kqueue, associating it with pd.
// It returns 0 on success or the positive errno from kevent on failure.
func netpollopen(fd uintptr, pd *pollDesc) int32 {
	// Arm both EVFILT_READ and EVFILT_WRITE in edge-triggered mode (EV_CLEAR)
	// for the whole fd lifetime. The notifications are automatically unregistered
	// when fd is closed.
	var events [2]keventt
	*(*uintptr)(unsafe.Pointer(&events[0].ident)) = fd
	events[0].filter = _EVFILT_READ
	events[0].flags = _EV_ADD | _EV_CLEAR
	events[0].fflags = 0
	events[0].data = 0

	// Stash pd in udata so netpoll can recover it from a delivered event.
	if goarch.PtrSize == 4 {
		// We only have a pointer-sized field to store into,
		// so on a 32-bit system we get no sequence protection.
		// TODO(iant): If we notice any problems we could at least
		// steal the low-order 2 bits for a tiny sequence number.
		events[0].udata = (*byte)(unsafe.Pointer(pd))
	} else {
		tagged := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
		events[0].udata = (*byte)(unsafe.Pointer(uintptr(tagged)))
	}

	// The write-filter event is identical apart from the filter itself.
	events[1] = events[0]
	events[1].filter = _EVFILT_WRITE

	if res := kevent(kq, &events[0], 2, nil, 0, nil); res < 0 {
		return -res
	}
	return 0
}
// netpollclose undoes the effect of netpollopen for fd.
// No explicit unregistration is needed: calling close() on fd
// removes any kevents that reference the descriptor.
func netpollclose(fd uintptr) int32 {
	return 0
}
// netpollarm is part of the poller interface but is never called on
// kqueue systems: readiness is armed once, for the fd's lifetime,
// in netpollopen.
func netpollarm(pd *pollDesc, mode int) {
	throw("runtime: unused")
}
// netpollBreak interrupts a kevent.
func netpollBreak() {
	// Failing to cas indicates there is an in-flight wakeup, so we're done here.
	if !netpollWakeSig.CompareAndSwap(0, 1) {
		return
	}

	// Poke the wakeup pipe. Retry on EINTR; a full pipe (EAGAIN)
	// already guarantees a pending wakeup, so treat it as success.
	var one byte
	for {
		written := write(netpollBreakWr, unsafe.Pointer(&one), 1)
		switch {
		case written == 1 || written == -_EAGAIN:
			return
		case written == -_EINTR:
			continue
		default:
			println("runtime: netpollBreak write failed with", -written)
			throw("runtime: netpollBreak write failed")
		}
	}
}
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) (gList, int32) {
	if kq == -1 {
		// Poller not initialized (or init failed); nothing to report.
		return gList{}, 0
	}
	// Translate delay into the timeout argument kevent expects:
	// nil blocks indefinitely, a zeroed timespec polls without blocking.
	var tp *timespec
	var ts timespec
	if delay < 0 {
		tp = nil
	} else if delay == 0 {
		tp = &ts
	} else {
		ts.setNsec(delay)
		if ts.tv_sec > 1e6 {
			// Darwin returns EINVAL if the sleep time is too long.
			ts.tv_sec = 1e6
		}
		tp = &ts
	}
	var events [64]keventt
retry:
	n := kevent(kq, nil, 0, &events[0], int32(len(events)), tp)
	if n < 0 {
		// Ignore the ETIMEDOUT error for now, but try to dive deep and
		// figure out what really happened with n == ETIMEDOUT,
		// see https://go.dev/issue/59679 for details.
		if n != -_EINTR && n != -_ETIMEDOUT {
			println("runtime: kevent on fd", kq, "failed with", -n)
			throw("runtime: netpoll failed")
		}
		// If a timed sleep was interrupted, just return to
		// recalculate how long we should sleep now.
		if delay > 0 {
			return gList{}, 0
		}
		goto retry
	}
	var toRun gList
	delta := int32(0)
	for i := 0; i < int(n); i++ {
		ev := &events[i]
		// An event on the break pipe is a wakeup from netpollBreak,
		// not a network fd becoming ready.
		if uintptr(ev.ident) == netpollBreakRd {
			if ev.filter != _EVFILT_READ {
				println("runtime: netpoll: break fd ready for", ev.filter)
				throw("runtime: netpoll: break fd ready for something unexpected")
			}
			if delay != 0 {
				// netpollBreak could be picked up by a
				// nonblocking poll. Only read the byte
				// if blocking.
				var tmp [16]byte
				read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
				netpollWakeSig.Store(0)
			}
			continue
		}
		// Map the kevent filter onto the poller's 'r'/'w' mode bits.
		var mode int32
		switch ev.filter {
		case _EVFILT_READ:
			mode += 'r'
			// On some systems when the read end of a pipe
			// is closed the write end will not get a
			// _EVFILT_WRITE event, but will get a
			// _EVFILT_READ event with EV_EOF set.
			// Note that setting 'w' here just means that we
			// will wake up a goroutine waiting to write;
			// that goroutine will try the write again,
			// and the appropriate thing will happen based
			// on what that write returns (success, EPIPE, EAGAIN).
			if ev.flags&_EV_EOF != 0 {
				mode += 'w'
			}
		case _EVFILT_WRITE:
			mode += 'w'
		}
		if mode != 0 {
			// Recover the pollDesc that netpollopen stored in udata.
			var pd *pollDesc
			var tag uintptr
			if goarch.PtrSize == 4 {
				// No sequence protection on 32-bit systems.
				// See netpollopen for details.
				pd = (*pollDesc)(unsafe.Pointer(ev.udata))
				tag = 0
			} else {
				tp := taggedPointer(uintptr(unsafe.Pointer(ev.udata)))
				pd = (*pollDesc)(tp.pointer())
				tag = tp.tag()
				if pd.fdseq.Load() != tag {
					// Stale event: the fd was closed and reused
					// since this event was armed. Drop it.
					continue
				}
			}
			pd.setEventErr(ev.flags == _EV_ERROR, tag)
			delta += netpollready(&toRun, pd, mode)
		}
	}
	return toRun, delta
}