blob: 63a8fbc44898a202ee57613f8d64443d3074b21b [file] [log] [blame]
// Copyright 2010 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package net
import (
"os"
"runtime"
"sync"
"syscall"
"time"
"unsafe"
)
// InvalidConnError is the error type describing an invalid net.Conn.
type InvalidConnError struct{}

// String returns the fixed description of the error.
func (err *InvalidConnError) String() string {
	return "invalid net.Conn"
}

// Temporary always reports false.
func (err *InvalidConnError) Temporary() bool {
	return false
}

// Timeout always reports false.
func (err *InvalidConnError) Timeout() bool {
	return false
}
// initErr holds the error from WSAStartup, if any. newFD returns it
// instead of creating a descriptor when it is non-nil.
var initErr os.Error

// init calls WSAStartup with 0x101 as the requested version word and
// records a failure in initErr rather than aborting the program.
func init() {
	var data syscall.WSAData
	if errno := syscall.WSAStartup(uint32(0x101), &data); errno != 0 {
		initErr = os.NewSyscallError("WSAStartup", errno)
	}
}
// closesocket closes the socket handle s and returns the errno
// reported by syscall.Closesocket.
func closesocket(s int) (errno int) {
	errno = syscall.Closesocket(int32(s))
	return errno
}
// Interface for all io operations. ioSrv accepts values of this type
// on its submit and cancel channels and in ExecIO.
type anOpIface interface {
// Op returns the anOp state (channels and fd) of the operation.
Op() *anOp
// Name returns the operation name; ExecIO uses it when building an OpError.
Name() string
// Submit starts the operation and returns the resulting errno.
Submit() (errno int)
}
// IO completion result parameters. resultSrv.Run fills one in from
// GetQueuedCompletionStatus and sends it on the operation's resultc.
type ioResult struct {
qty uint32 // quantity reported by GetQueuedCompletionStatus; ExecIO returns it as n
err int // errno from GetQueuedCompletionStatus; 0 means the packet completed successfully
}
// anOp carries the state shared by every io operation: the overlapped
// structure handed to the OS, the channels used to report back to the
// requester, and the descriptor the operation acts on.
type anOp struct {
	// Used by the IOCP interface; it must be the first field
	// of the struct, as our code relies on it.
	o syscall.Overlapped

	resultc chan ioResult // io completion results
	errnoc  chan int      // io submit / cancel operation errors
	fd      *netFD
}

// Init binds the operation to fd and creates its channels. resultc is
// buffered with capacity 1; errnoc is unbuffered.
func (o *anOp) Init(fd *netFD) {
	o.resultc, o.errnoc = make(chan ioResult, 1), make(chan int)
	o.fd = fd
}

// Op returns the receiver, so that types embedding anOp satisfy anOpIface.
func (o *anOp) Op() *anOp { return o }
// bufOp is the base of io operations that read data into, or write
// data from, a client-supplied buffer.
type bufOp struct {
	anOp
	buf syscall.WSABuf
}

// Init initialises the embedded anOp with fd and points the WSABuf at
// buf. An empty buf yields a nil data pointer and zero length.
func (o *bufOp) Init(fd *netFD, buf []byte) {
	o.anOp.Init(fd)
	o.buf.Buf = nil
	if len(buf) > 0 {
		// &buf[0] is only valid for a non-empty slice.
		o.buf.Buf = (*byte)(unsafe.Pointer(&buf[0]))
	}
	o.buf.Len = uint32(len(buf))
}
// resultSrv retrieves all io completion results from iocp and sends
// each one to the corresponding waiting client goroutine via the
// channel supplied in the request.
type resultSrv struct {
	iocp int32
}

// Run loops forever dequeuing completion packets from s.iocp and
// forwarding each result on the resultc channel of the operation that
// owns the packet. It panics if nothing could be dequeued.
func (s *resultSrv) Run() {
	var (
		overlapped *syscall.Overlapped
		key        uint32
		res        ioResult
	)
	for {
		res.err = syscall.GetQueuedCompletionStatus(s.iocp, &res.qty, &key, &overlapped, syscall.INFINITE)
		if res.err != 0 && overlapped == nil {
			// Nothing was dequeued.
			if res.err == syscall.WAIT_TIMEOUT {
				// Wait has timed out (should not happen now, but might be used in the future).
				panic("GetQueuedCompletionStatus timed out")
			}
			// Failed to dequeue anything -> report the error.
			panic("GetQueuedCompletionStatus failed " + syscall.Errstr(res.err))
		}
		// Either a successfully completed packet (res.err == 0) or a
		// failed io packet was dequeued. The Overlapped is the first
		// field of anOp, so its address is also the address of the anOp.
		op := (*anOp)(unsafe.Pointer(overlapped))
		op.resultc <- res
	}
}
// ioSrv executes net io requests.
type ioSrv struct {
	submchan chan anOpIface // submit io requests
	canchan  chan anOpIface // cancel io requests
}

// ProcessRemoteIO executes submit and cancel requests on behalf of
// other goroutines, all on a single os thread, so that an operation
// can be cancelled later. The errno of each submit or cancel is sent
// back to the requester on the operation's errnoc channel.
func (s *ioSrv) ProcessRemoteIO() {
	runtime.LockOSThread()
	defer runtime.UnlockOSThread()
	for {
		select {
		case req := <-s.submchan:
			errno := req.Submit()
			req.Op().errnoc <- errno
		case req := <-s.canchan:
			op := req.Op()
			op.errnoc <- syscall.CancelIo(uint32(op.fd.sysfd))
		}
	}
}
// ExecIO executes a single io operation. It either executes it
// inline, or, if timeouts are employed, passes the request onto
// a special goroutine and waits for completion or cancels request.
//
// oi is the operation to run. A deadline_delta greater than zero enables
// the timeout path and is passed unchanged to time.After (presumably
// nanoseconds -- confirm against the time package in use).
//
// It returns the quantity reported in the completion result as n. A submit
// or completion failure is returned as an *OpError wrapping the errno.
func (s *ioSrv) ExecIO(oi anOpIface, deadline_delta int64) (n int, err os.Error) {
var e int
o := oi.Op()
if deadline_delta > 0 {
// Send request to a special dedicated thread,
// so it can stop the io with CancelIO later.
s.submchan <- oi
e = <-o.errnoc
} else {
e = oi.Submit()
}
switch e {
case 0:
// IO completed immediately, but we need to get our completion message anyway.
case syscall.ERROR_IO_PENDING:
// IO started, and we have to wait for its completion.
default:
// Submit failed outright; no completion result is awaited.
return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, os.Errno(e)}
}
// Wait for our request to complete.
var r ioResult
if deadline_delta > 0 {
select {
case r = <-o.resultc:
case <-time.After(deadline_delta):
// Timed out: ask the io goroutine to cancel the operation, wait for
// its reply (the cancel errno is discarded), then collect the result.
s.canchan <- oi
<-o.errnoc
r = <-o.resultc
if r.err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled
r.err = syscall.EWOULDBLOCK
}
}
} else {
r = <-o.resultc
}
if r.err != 0 {
err = &OpError{oi.Name(), o.fd.net, o.fd.laddr, os.Errno(r.err)}
}
return int(r.qty), err
}
// Helper goroutine state, created once by startServer.
var (
	resultsrv       *resultSrv
	iosrv           *ioSrv
	onceStartServer sync.Once
)

// startServer creates the io completion port and starts the two helper
// goroutines: resultsrv.Run and iosrv.ProcessRemoteIO. It panics if
// the completion port cannot be created. Callers run it through
// onceStartServer.
func startServer() {
	srv := new(resultSrv)
	resultsrv = srv
	var errno int
	if srv.iocp, errno = syscall.CreateIoCompletionPort(-1, 0, 0, 1); errno != 0 {
		panic("CreateIoCompletionPort failed " + syscall.Errstr(errno))
	}
	go srv.Run()

	iosrv = &ioSrv{
		submchan: make(chan anOpIface),
		canchan:  make(chan anOpIface),
	}
	go iosrv.ProcessRemoteIO()
}
// Network file descriptor.
type netFD struct {
// locking/lifetime of sysfd
sysmu sync.Mutex // held by incref and decref while they update sysref
sysref int // reference count: incremented by incref, decremented by decref
closing bool // set by Close; decref closes the socket once sysref drops to 0
// immutable until Close
sysfd int // socket handle; decref sets it to -1 after closing it
family int
proto int
net string
laddr Addr
raddr Addr
// owned by client
rdeadline_delta int64 // passed to ExecIO as the timeout by Read and ReadFrom
rdeadline int64
rio sync.Mutex // held for the duration of Read and ReadFrom
wdeadline_delta int64 // passed to ExecIO as the timeout by Write and WriteTo
wdeadline int64
wio sync.Mutex // held for the duration of Write and WriteTo
}
// allocFD builds a netFD from the given socket handle and attributes,
// and registers (*netFD).Close as its finalizer. It does not associate
// the socket with the completion port; newFD and accept do that.
func allocFD(fd, family, proto int, net string, laddr, raddr Addr) (f *netFD) {
	f = new(netFD)
	f.sysfd, f.family, f.proto = fd, family, proto
	f.net = net
	f.laddr, f.raddr = laddr, raddr
	runtime.SetFinalizer(f, (*netFD).Close)
	return f
}
// newFD wraps the socket handle fd in a netFD. It returns initErr if
// Winsock start-up failed, starts the helper goroutines on first use,
// and associates the socket with the shared io completion port. A
// failed association is returned as an *OpError.
func newFD(fd, family, proto int, net string, laddr, raddr Addr) (f *netFD, err os.Error) {
	if initErr != nil {
		return nil, initErr
	}
	onceStartServer.Do(startServer)

	// Associate our socket with resultsrv.iocp.
	_, errno := syscall.CreateIoCompletionPort(int32(fd), resultsrv.iocp, 0, 0)
	if errno != 0 {
		err = &OpError{"CreateIoCompletionPort", net, laddr, os.Errno(errno)}
		return nil, err
	}
	f = allocFD(fd, family, proto, net, laddr, raddr)
	return f, nil
}
// incref adds a reference to this fd, under sysmu.
func (fd *netFD) incref() {
	fd.sysmu.Lock()
	defer fd.sysmu.Unlock()
	fd.sysref++
}
// decref removes a reference to this fd and closes the socket if we
// have been asked to do so (and there are no references left).
func (fd *netFD) decref() {
	fd.sysmu.Lock()
	defer fd.sysmu.Unlock()

	fd.sysref--
	if !fd.closing || fd.sysref != 0 || fd.sysfd < 0 {
		// Not closing, still referenced, or already closed.
		return
	}

	// In case the user has set linger, switch to blocking mode so
	// the close blocks. As long as this doesn't happen often, we
	// can handle the extra OS processes. Otherwise we'll need to
	// use the resultsrv for Close too. Sigh.
	syscall.SetNonblock(fd.sysfd, false)
	closesocket(fd.sysfd)
	fd.sysfd = -1
	// No need for a finalizer anymore.
	runtime.SetFinalizer(fd, nil)
}
// Close shuts down both directions of the socket and marks the
// descriptor as closing. The handle itself is released by decref once
// the last reference is dropped, which may be this call's own decref
// or that of an io operation still in flight.
//
// It returns os.EINVAL if fd is nil or already closed, and nil otherwise.
func (fd *netFD) Close() os.Error {
	if fd == nil || fd.sysfd == -1 {
		return os.EINVAL
	}
	fd.incref()
	// Best effort: the result of Shutdown is deliberately ignored.
	syscall.Shutdown(fd.sysfd, syscall.SHUT_RDWR)
	// decref reads closing while holding sysmu, possibly from another
	// goroutine finishing an io operation, so the write must be made
	// under the same lock to avoid a data race.
	fd.sysmu.Lock()
	fd.closing = true
	fd.sysmu.Unlock()
	fd.decref()
	return nil
}
// readOp is the overlapped receive operation behind netFD.Read.
type readOp struct {
	bufOp
}

// Submit issues the overlapped WSARecv on the operation's socket and
// returns its errno.
func (o *readOp) Submit() (errno int) {
	var recvd, flags uint32
	errno = syscall.WSARecv(uint32(o.fd.sysfd), &o.buf, 1, &recvd, &flags, &o.o, nil)
	return errno
}

// Name returns the syscall name reported in errors.
func (o *readOp) Name() string { return "WSARecv" }

// Read reads from the network into buf, honouring fd.rdeadline_delta
// as the timeout. Reads are serialized by fd.rio. It returns os.EINVAL
// for a nil or closed fd, and os.EOF when the operation completes
// without error and without data.
func (fd *netFD) Read(buf []byte) (n int, err os.Error) {
	if fd == nil {
		return 0, os.EINVAL
	}
	fd.rio.Lock()
	defer fd.rio.Unlock()
	fd.incref()
	defer fd.decref()
	if fd.sysfd == -1 {
		return 0, os.EINVAL
	}

	op := new(readOp)
	op.Init(fd, buf)
	n, err = iosrv.ExecIO(op, fd.rdeadline_delta)
	if n == 0 && err == nil {
		return n, os.EOF
	}
	return n, err
}
// readFromOp is the overlapped receive operation behind
// netFD.ReadFrom. Besides the data buffer it owns the storage for the
// sender address and for that address's length.
type readFromOp struct {
	bufOp
	rsa syscall.RawSockaddrAny
	// rsan is the in/out length of rsa passed to WSARecvFrom. It lives
	// in the operation rather than in a local of Submit because the
	// memory must stay valid until the overlapped operation completes,
	// which can be long after Submit has returned.
	rsan int32
}

// Submit issues the overlapped WSARecvFrom on the operation's socket
// and returns its errno.
func (o *readFromOp) Submit() (errno int) {
	var d, f uint32
	o.rsan = int32(unsafe.Sizeof(o.rsa))
	return syscall.WSARecvFrom(uint32(o.fd.sysfd), &o.buf, 1, &d, &f, &o.rsa, &o.rsan, &o.o, nil)
}

// Name returns the syscall name reported in errors.
func (o *readFromOp) Name() string {
	return "WSARecvFrom"
}

// ReadFrom reads from the network into buf and returns the sender's
// address, honouring fd.rdeadline_delta as the timeout. Reads are
// serialized by fd.rio. It returns os.EINVAL for a nil or closed fd
// and (0, nil, nil) for an empty buf.
func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err os.Error) {
	if fd == nil {
		return 0, nil, os.EINVAL
	}
	if len(buf) == 0 {
		return 0, nil, nil
	}
	fd.rio.Lock()
	defer fd.rio.Unlock()
	fd.incref()
	defer fd.decref()
	if fd.sysfd == -1 {
		return 0, nil, os.EINVAL
	}
	// o stays referenced by this frame until ExecIO returns, which keeps
	// rsa and rsan valid for the whole lifetime of the overlapped call.
	var o readFromOp
	o.Init(fd, buf)
	n, err = iosrv.ExecIO(&o, fd.rdeadline_delta)
	// The address conversion error is ignored; sa is whatever Sockaddr
	// yields for the received raw address.
	sa, _ = o.rsa.Sockaddr()
	return
}
// writeOp is the overlapped send operation behind netFD.Write.
type writeOp struct {
	bufOp
}

// Submit issues the overlapped WSASend on the operation's socket and
// returns its errno.
func (o *writeOp) Submit() (errno int) {
	var sent uint32
	errno = syscall.WSASend(uint32(o.fd.sysfd), &o.buf, 1, &sent, 0, &o.o, nil)
	return errno
}

// Name returns the syscall name reported in errors.
func (o *writeOp) Name() string { return "WSASend" }

// Write writes buf to the network, honouring fd.wdeadline_delta as the
// timeout. Writes are serialized by fd.wio. It returns os.EINVAL for a
// nil or closed fd.
func (fd *netFD) Write(buf []byte) (n int, err os.Error) {
	if fd == nil {
		return 0, os.EINVAL
	}
	fd.wio.Lock()
	defer fd.wio.Unlock()
	fd.incref()
	defer fd.decref()
	if fd.sysfd == -1 {
		return 0, os.EINVAL
	}

	op := new(writeOp)
	op.Init(fd, buf)
	n, err = iosrv.ExecIO(op, fd.wdeadline_delta)
	return n, err
}
// writeToOp is the overlapped send operation behind netFD.WriteTo; sa
// is the destination address.
type writeToOp struct {
	bufOp
	sa syscall.Sockaddr
}

// Submit issues the overlapped WSASendto on the operation's socket and
// returns its errno.
func (o *writeToOp) Submit() (errno int) {
	var sent uint32
	errno = syscall.WSASendto(uint32(o.fd.sysfd), &o.buf, 1, &sent, 0, o.sa, &o.o, nil)
	return errno
}

// Name returns the syscall name reported in errors.
func (o *writeToOp) Name() string { return "WSASendto" }

// WriteTo writes buf to the address sa, honouring fd.wdeadline_delta
// as the timeout. Writes are serialized by fd.wio. It returns
// os.EINVAL for a nil or closed fd and (0, nil) for an empty buf.
func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (n int, err os.Error) {
	switch {
	case fd == nil:
		return 0, os.EINVAL
	case len(buf) == 0:
		return 0, nil
	}
	fd.wio.Lock()
	defer fd.wio.Unlock()
	fd.incref()
	defer fd.decref()
	if fd.sysfd == -1 {
		return 0, os.EINVAL
	}

	op := new(writeToOp)
	op.Init(fd, buf)
	op.sa = sa
	n, err = iosrv.ExecIO(op, fd.wdeadline_delta)
	return n, err
}
// acceptOp is the overlapped AcceptEx operation behind netFD.accept.
type acceptOp struct {
	anOp
	newsock int                       // pre-created socket that receives the connection
	attrs   [2]syscall.RawSockaddrAny // space for local and remote address only
}

// Submit issues the overlapped AcceptEx on the listening socket and
// returns its errno. No receive data is requested (length 0); the
// buffer holds only the local and remote addresses.
func (o *acceptOp) Submit() (errno int) {
	var d uint32
	l := uint32(unsafe.Sizeof(o.attrs[0]))
	return syscall.AcceptEx(uint32(o.fd.sysfd), uint32(o.newsock),
		(*byte)(unsafe.Pointer(&o.attrs[0])), 0, l, l, &d, &o.o)
}

// Name returns the syscall name reported in errors.
func (o *acceptOp) Name() string {
	return "AcceptEx"
}

// accept waits for a connection on the listening descriptor fd and
// returns a new netFD for it. toAddr converts the raw local and peer
// socket addresses into Addr values.
//
// It returns os.EINVAL for a nil or closed fd. On every failure after
// the new socket has been created, that socket is closed and a non-nil
// error is returned.
func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (nfd *netFD, err os.Error) {
	if fd == nil || fd.sysfd == -1 {
		return nil, os.EINVAL
	}
	fd.incref()
	defer fd.decref()

	// Get new socket.
	// See ../syscall/exec.go for description of ForkLock.
	syscall.ForkLock.RLock()
	s, e := syscall.Socket(fd.family, fd.proto, 0)
	if e != 0 {
		syscall.ForkLock.RUnlock()
		return nil, os.Errno(e)
	}
	syscall.CloseOnExec(s)
	syscall.ForkLock.RUnlock()

	// Associate our new socket with IOCP.
	onceStartServer.Do(startServer)
	if _, e = syscall.CreateIoCompletionPort(int32(s), resultsrv.iocp, 0, 0); e != 0 {
		// Do not leak the freshly created socket.
		closesocket(s)
		return nil, &OpError{"CreateIoCompletionPort", fd.net, fd.laddr, os.Errno(e)}
	}

	// Submit accept request.
	var o acceptOp
	o.Init(fd)
	o.newsock = s
	_, err = iosrv.ExecIO(&o, 0)
	if err != nil {
		closesocket(s)
		return nil, err
	}

	// Inherit properties of the listening socket.
	e = syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_UPDATE_ACCEPT_CONTEXT, fd.sysfd)
	if e != 0 {
		closesocket(s)
		// err is nil at this point, so the errno must be wrapped here;
		// returning err would report success with a nil netFD.
		return nil, &OpError{"Setsockopt", fd.net, fd.laddr, os.Errno(e)}
	}

	// Get local and peer addr out of AcceptEx buffer.
	var lrsa, rrsa *syscall.RawSockaddrAny
	var llen, rlen int32
	l := uint32(unsafe.Sizeof(*lrsa))
	syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&o.attrs[0])),
		0, l, l, &lrsa, &llen, &rrsa, &rlen)
	lsa, _ := lrsa.Sockaddr()
	rsa, _ := rrsa.Sockaddr()
	return allocFD(s, fd.family, fd.proto, fd.net, toAddr(lsa), toAddr(rsa)), nil
}
// Not implemented functions.

// dup is not implemented; it always fails with a "dup" syscall error
// wrapping syscall.EWINDOWS.
func (fd *netFD) dup() (f *os.File, err os.Error) {
	// TODO: Implement this
	err = os.NewSyscallError("dup", syscall.EWINDOWS)
	return nil, err
}

// ReadMsg is not implemented; it always fails with os.EAFNOSUPPORT.
func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err os.Error) {
	err = os.EAFNOSUPPORT
	return 0, 0, 0, nil, err
}

// WriteMsg is not implemented; it always fails with os.EAFNOSUPPORT.
func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err os.Error) {
	err = os.EAFNOSUPPORT
	return 0, 0, err
}