| // Copyright 2010 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package net |
| |
| import ( |
| "os" |
| "runtime" |
| "sync" |
| "syscall" |
| "time" |
| "unsafe" |
| ) |
| |
| type InvalidConnError struct{} |
| |
| func (e *InvalidConnError) String() string { return "invalid net.Conn" } |
| func (e *InvalidConnError) Temporary() bool { return false } |
| func (e *InvalidConnError) Timeout() bool { return false } |
| |
| var initErr os.Error |
| |
| func init() { |
| var d syscall.WSAData |
| e := syscall.WSAStartup(uint32(0x101), &d) |
| if e != 0 { |
| initErr = os.NewSyscallError("WSAStartup", e) |
| } |
| } |
| |
| func closesocket(s int) (errno int) { |
| return syscall.Closesocket(int32(s)) |
| } |
| |
| // Interface for all io operations. |
| type anOpIface interface { |
| Op() *anOp |
| Name() string |
| Submit() (errno int) |
| } |
| |
| // IO completion result parameters. |
| type ioResult struct { |
| qty uint32 |
| err int |
| } |
| |
| // anOp implements functionality common to all io operations. |
| type anOp struct { |
| // Used by IOCP interface, it must be first field |
| // of the struct, as our code rely on it. |
| o syscall.Overlapped |
| |
| resultc chan ioResult // io completion results |
| errnoc chan int // io submit / cancel operation errors |
| fd *netFD |
| } |
| |
| func (o *anOp) Init(fd *netFD) { |
| o.fd = fd |
| o.resultc = make(chan ioResult, 1) |
| o.errnoc = make(chan int) |
| } |
| |
| func (o *anOp) Op() *anOp { |
| return o |
| } |
| |
| // bufOp is used by io operations that read / write |
| // data from / to client buffer. |
| type bufOp struct { |
| anOp |
| buf syscall.WSABuf |
| } |
| |
| func (o *bufOp) Init(fd *netFD, buf []byte) { |
| o.anOp.Init(fd) |
| o.buf.Len = uint32(len(buf)) |
| if len(buf) == 0 { |
| o.buf.Buf = nil |
| } else { |
| o.buf.Buf = (*byte)(unsafe.Pointer(&buf[0])) |
| } |
| } |
| |
| // resultSrv will retreive all io completion results from |
| // iocp and send them to the correspondent waiting client |
| // goroutine via channel supplied in the request. |
| type resultSrv struct { |
| iocp int32 |
| } |
| |
| func (s *resultSrv) Run() { |
| var o *syscall.Overlapped |
| var key uint32 |
| var r ioResult |
| for { |
| r.err = syscall.GetQueuedCompletionStatus(s.iocp, &(r.qty), &key, &o, syscall.INFINITE) |
| switch { |
| case r.err == 0: |
| // Dequeued successfully completed io packet. |
| case r.err == syscall.WAIT_TIMEOUT && o == nil: |
| // Wait has timed out (should not happen now, but might be used in the future). |
| panic("GetQueuedCompletionStatus timed out") |
| case o == nil: |
| // Failed to dequeue anything -> report the error. |
| panic("GetQueuedCompletionStatus failed " + syscall.Errstr(r.err)) |
| default: |
| // Dequeued failed io packet. |
| } |
| (*anOp)(unsafe.Pointer(o)).resultc <- r |
| } |
| } |
| |
| |
| // ioSrv executes net io requests. |
| type ioSrv struct { |
| submchan chan anOpIface // submit io requests |
| canchan chan anOpIface // cancel io requests |
| } |
| |
| // ProcessRemoteIO will execute submit io requests on behalf |
| // of other goroutines, all on a single os thread, so it can |
| // cancel them later. Results of all operations will be sent |
| // back to their requesters via channel supplied in request. |
| func (s *ioSrv) ProcessRemoteIO() { |
| runtime.LockOSThread() |
| defer runtime.UnlockOSThread() |
| for { |
| select { |
| case o := <-s.submchan: |
| o.Op().errnoc <- o.Submit() |
| case o := <-s.canchan: |
| o.Op().errnoc <- syscall.CancelIo(uint32(o.Op().fd.sysfd)) |
| } |
| } |
| } |
| |
| // ExecIO executes a single io operation. It either executes it |
| // inline, or, if timeouts are employed, passes the request onto |
| // a special goroutine and waits for completion or cancels request. |
| func (s *ioSrv) ExecIO(oi anOpIface, deadline_delta int64) (n int, err os.Error) { |
| var e int |
| o := oi.Op() |
| if deadline_delta > 0 { |
| // Send request to a special dedicated thread, |
| // so it can stop the io with CancelIO later. |
| s.submchan <- oi |
| e = <-o.errnoc |
| } else { |
| e = oi.Submit() |
| } |
| switch e { |
| case 0: |
| // IO completed immediately, but we need to get our completion message anyway. |
| case syscall.ERROR_IO_PENDING: |
| // IO started, and we have to wait for it's completion. |
| default: |
| return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, os.Errno(e)} |
| } |
| // Wait for our request to complete. |
| var r ioResult |
| if deadline_delta > 0 { |
| select { |
| case r = <-o.resultc: |
| case <-time.After(deadline_delta): |
| s.canchan <- oi |
| <-o.errnoc |
| r = <-o.resultc |
| if r.err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled |
| r.err = syscall.EWOULDBLOCK |
| } |
| } |
| } else { |
| r = <-o.resultc |
| } |
| if r.err != 0 { |
| err = &OpError{oi.Name(), o.fd.net, o.fd.laddr, os.Errno(r.err)} |
| } |
| return int(r.qty), err |
| } |
| |
| // Start helper goroutines. |
| var resultsrv *resultSrv |
| var iosrv *ioSrv |
| var onceStartServer sync.Once |
| |
| func startServer() { |
| resultsrv = new(resultSrv) |
| var errno int |
| resultsrv.iocp, errno = syscall.CreateIoCompletionPort(-1, 0, 0, 1) |
| if errno != 0 { |
| panic("CreateIoCompletionPort failed " + syscall.Errstr(errno)) |
| } |
| go resultsrv.Run() |
| |
| iosrv = new(ioSrv) |
| iosrv.submchan = make(chan anOpIface) |
| iosrv.canchan = make(chan anOpIface) |
| go iosrv.ProcessRemoteIO() |
| } |
| |
| // Network file descriptor. |
| type netFD struct { |
| // locking/lifetime of sysfd |
| sysmu sync.Mutex |
| sysref int |
| closing bool |
| |
| // immutable until Close |
| sysfd int |
| family int |
| proto int |
| net string |
| laddr Addr |
| raddr Addr |
| |
| // owned by client |
| rdeadline_delta int64 |
| rdeadline int64 |
| rio sync.Mutex |
| wdeadline_delta int64 |
| wdeadline int64 |
| wio sync.Mutex |
| } |
| |
| func allocFD(fd, family, proto int, net string, laddr, raddr Addr) (f *netFD) { |
| f = &netFD{ |
| sysfd: fd, |
| family: family, |
| proto: proto, |
| net: net, |
| laddr: laddr, |
| raddr: raddr, |
| } |
| runtime.SetFinalizer(f, (*netFD).Close) |
| return f |
| } |
| |
| func newFD(fd, family, proto int, net string, laddr, raddr Addr) (f *netFD, err os.Error) { |
| if initErr != nil { |
| return nil, initErr |
| } |
| onceStartServer.Do(startServer) |
| // Associate our socket with resultsrv.iocp. |
| if _, e := syscall.CreateIoCompletionPort(int32(fd), resultsrv.iocp, 0, 0); e != 0 { |
| return nil, &OpError{"CreateIoCompletionPort", net, laddr, os.Errno(e)} |
| } |
| return allocFD(fd, family, proto, net, laddr, raddr), nil |
| } |
| |
| // Add a reference to this fd. |
| func (fd *netFD) incref() { |
| fd.sysmu.Lock() |
| fd.sysref++ |
| fd.sysmu.Unlock() |
| } |
| |
| // Remove a reference to this FD and close if we've been asked to do so (and |
| // there are no references left. |
| func (fd *netFD) decref() { |
| fd.sysmu.Lock() |
| fd.sysref-- |
| if fd.closing && fd.sysref == 0 && fd.sysfd >= 0 { |
| // In case the user has set linger, switch to blocking mode so |
| // the close blocks. As long as this doesn't happen often, we |
| // can handle the extra OS processes. Otherwise we'll need to |
| // use the resultsrv for Close too. Sigh. |
| syscall.SetNonblock(fd.sysfd, false) |
| closesocket(fd.sysfd) |
| fd.sysfd = -1 |
| // no need for a finalizer anymore |
| runtime.SetFinalizer(fd, nil) |
| } |
| fd.sysmu.Unlock() |
| } |
| |
| func (fd *netFD) Close() os.Error { |
| if fd == nil || fd.sysfd == -1 { |
| return os.EINVAL |
| } |
| |
| fd.incref() |
| syscall.Shutdown(fd.sysfd, syscall.SHUT_RDWR) |
| fd.closing = true |
| fd.decref() |
| return nil |
| } |
| |
| // Read from network. |
| |
| type readOp struct { |
| bufOp |
| } |
| |
| func (o *readOp) Submit() (errno int) { |
| var d, f uint32 |
| return syscall.WSARecv(uint32(o.fd.sysfd), &o.buf, 1, &d, &f, &o.o, nil) |
| } |
| |
| func (o *readOp) Name() string { |
| return "WSARecv" |
| } |
| |
| func (fd *netFD) Read(buf []byte) (n int, err os.Error) { |
| if fd == nil { |
| return 0, os.EINVAL |
| } |
| fd.rio.Lock() |
| defer fd.rio.Unlock() |
| fd.incref() |
| defer fd.decref() |
| if fd.sysfd == -1 { |
| return 0, os.EINVAL |
| } |
| var o readOp |
| o.Init(fd, buf) |
| n, err = iosrv.ExecIO(&o, fd.rdeadline_delta) |
| if err == nil && n == 0 { |
| err = os.EOF |
| } |
| return |
| } |
| |
| // ReadFrom from network. |
| |
| type readFromOp struct { |
| bufOp |
| rsa syscall.RawSockaddrAny |
| } |
| |
| func (o *readFromOp) Submit() (errno int) { |
| var d, f uint32 |
| l := int32(unsafe.Sizeof(o.rsa)) |
| return syscall.WSARecvFrom(uint32(o.fd.sysfd), &o.buf, 1, &d, &f, &o.rsa, &l, &o.o, nil) |
| } |
| |
| func (o *readFromOp) Name() string { |
| return "WSARecvFrom" |
| } |
| |
| func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err os.Error) { |
| if fd == nil { |
| return 0, nil, os.EINVAL |
| } |
| if len(buf) == 0 { |
| return 0, nil, nil |
| } |
| fd.rio.Lock() |
| defer fd.rio.Unlock() |
| fd.incref() |
| defer fd.decref() |
| if fd.sysfd == -1 { |
| return 0, nil, os.EINVAL |
| } |
| var o readFromOp |
| o.Init(fd, buf) |
| n, err = iosrv.ExecIO(&o, fd.rdeadline_delta) |
| sa, _ = o.rsa.Sockaddr() |
| return |
| } |
| |
| // Write to network. |
| |
| type writeOp struct { |
| bufOp |
| } |
| |
| func (o *writeOp) Submit() (errno int) { |
| var d uint32 |
| return syscall.WSASend(uint32(o.fd.sysfd), &o.buf, 1, &d, 0, &o.o, nil) |
| } |
| |
| func (o *writeOp) Name() string { |
| return "WSASend" |
| } |
| |
| func (fd *netFD) Write(buf []byte) (n int, err os.Error) { |
| if fd == nil { |
| return 0, os.EINVAL |
| } |
| fd.wio.Lock() |
| defer fd.wio.Unlock() |
| fd.incref() |
| defer fd.decref() |
| if fd.sysfd == -1 { |
| return 0, os.EINVAL |
| } |
| var o writeOp |
| o.Init(fd, buf) |
| return iosrv.ExecIO(&o, fd.wdeadline_delta) |
| } |
| |
| // WriteTo to network. |
| |
| type writeToOp struct { |
| bufOp |
| sa syscall.Sockaddr |
| } |
| |
| func (o *writeToOp) Submit() (errno int) { |
| var d uint32 |
| return syscall.WSASendto(uint32(o.fd.sysfd), &o.buf, 1, &d, 0, o.sa, &o.o, nil) |
| } |
| |
| func (o *writeToOp) Name() string { |
| return "WSASendto" |
| } |
| |
| func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (n int, err os.Error) { |
| if fd == nil { |
| return 0, os.EINVAL |
| } |
| if len(buf) == 0 { |
| return 0, nil |
| } |
| fd.wio.Lock() |
| defer fd.wio.Unlock() |
| fd.incref() |
| defer fd.decref() |
| if fd.sysfd == -1 { |
| return 0, os.EINVAL |
| } |
| var o writeToOp |
| o.Init(fd, buf) |
| o.sa = sa |
| return iosrv.ExecIO(&o, fd.wdeadline_delta) |
| } |
| |
| // Accept new network connections. |
| |
| type acceptOp struct { |
| anOp |
| newsock int |
| attrs [2]syscall.RawSockaddrAny // space for local and remote address only |
| } |
| |
| func (o *acceptOp) Submit() (errno int) { |
| var d uint32 |
| l := uint32(unsafe.Sizeof(o.attrs[0])) |
| return syscall.AcceptEx(uint32(o.fd.sysfd), uint32(o.newsock), |
| (*byte)(unsafe.Pointer(&o.attrs[0])), 0, l, l, &d, &o.o) |
| } |
| |
| func (o *acceptOp) Name() string { |
| return "AcceptEx" |
| } |
| |
| func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (nfd *netFD, err os.Error) { |
| if fd == nil || fd.sysfd == -1 { |
| return nil, os.EINVAL |
| } |
| fd.incref() |
| defer fd.decref() |
| |
| // Get new socket. |
| // See ../syscall/exec.go for description of ForkLock. |
| syscall.ForkLock.RLock() |
| s, e := syscall.Socket(fd.family, fd.proto, 0) |
| if e != 0 { |
| syscall.ForkLock.RUnlock() |
| return nil, os.Errno(e) |
| } |
| syscall.CloseOnExec(s) |
| syscall.ForkLock.RUnlock() |
| |
| // Associate our new socket with IOCP. |
| onceStartServer.Do(startServer) |
| if _, e = syscall.CreateIoCompletionPort(int32(s), resultsrv.iocp, 0, 0); e != 0 { |
| return nil, &OpError{"CreateIoCompletionPort", fd.net, fd.laddr, os.Errno(e)} |
| } |
| |
| // Submit accept request. |
| var o acceptOp |
| o.Init(fd) |
| o.newsock = s |
| _, err = iosrv.ExecIO(&o, 0) |
| if err != nil { |
| closesocket(s) |
| return nil, err |
| } |
| |
| // Inherit properties of the listening socket. |
| e = syscall.SetsockoptInt(s, syscall.SOL_SOCKET, syscall.SO_UPDATE_ACCEPT_CONTEXT, fd.sysfd) |
| if e != 0 { |
| closesocket(s) |
| return nil, err |
| } |
| |
| // Get local and peer addr out of AcceptEx buffer. |
| var lrsa, rrsa *syscall.RawSockaddrAny |
| var llen, rlen int32 |
| l := uint32(unsafe.Sizeof(*lrsa)) |
| syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&o.attrs[0])), |
| 0, l, l, &lrsa, &llen, &rrsa, &rlen) |
| lsa, _ := lrsa.Sockaddr() |
| rsa, _ := rrsa.Sockaddr() |
| |
| return allocFD(s, fd.family, fd.proto, fd.net, toAddr(lsa), toAddr(rsa)), nil |
| } |
| |
| // Not implemeted functions. |
| |
| func (fd *netFD) dup() (f *os.File, err os.Error) { |
| // TODO: Implement this |
| return nil, os.NewSyscallError("dup", syscall.EWINDOWS) |
| } |
| |
| func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err os.Error) { |
| return 0, 0, 0, nil, os.EAFNOSUPPORT |
| } |
| |
| func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err os.Error) { |
| return 0, 0, os.EAFNOSUPPORT |
| } |