| // Copyright 2010 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package net |
| |
| import ( |
| "context" |
| "internal/race" |
| "os" |
| "runtime" |
| "sync" |
| "syscall" |
| "unsafe" |
| ) |
| |
| var ( |
| initErr error |
| ioSync uint64 |
| ) |
| |
| // CancelIo Windows API cancels all outstanding IO for a particular |
| // socket on current thread. To overcome that limitation, we run |
| // special goroutine, locked to OS single thread, that both starts |
| // and cancels IO. It means, there are 2 unavoidable thread switches |
| // for every IO. |
| // Some newer versions of Windows has new CancelIoEx API, that does |
| // not have that limitation and can be used from any thread. This |
| // package uses CancelIoEx API, if present, otherwise it fallback |
| // to CancelIo. |
| |
| var ( |
| canCancelIO bool // determines if CancelIoEx API is present |
| skipSyncNotif bool |
| hasLoadSetFileCompletionNotificationModes bool |
| ) |
| |
| func sysInit() { |
| var d syscall.WSAData |
| e := syscall.WSAStartup(uint32(0x202), &d) |
| if e != nil { |
| initErr = os.NewSyscallError("wsastartup", e) |
| } |
| canCancelIO = syscall.LoadCancelIoEx() == nil |
| hasLoadSetFileCompletionNotificationModes = syscall.LoadSetFileCompletionNotificationModes() == nil |
| if hasLoadSetFileCompletionNotificationModes { |
| // It's not safe to use FILE_SKIP_COMPLETION_PORT_ON_SUCCESS if non IFS providers are installed: |
| // http://support.microsoft.com/kb/2568167 |
| skipSyncNotif = true |
| protos := [2]int32{syscall.IPPROTO_TCP, 0} |
| var buf [32]syscall.WSAProtocolInfo |
| len := uint32(unsafe.Sizeof(buf)) |
| n, err := syscall.WSAEnumProtocols(&protos[0], &buf[0], &len) |
| if err != nil { |
| skipSyncNotif = false |
| } else { |
| for i := int32(0); i < n; i++ { |
| if buf[i].ServiceFlags1&syscall.XP1_IFS_HANDLES == 0 { |
| skipSyncNotif = false |
| break |
| } |
| } |
| } |
| } |
| } |
| |
| // canUseConnectEx reports whether we can use the ConnectEx Windows API call |
| // for the given network type. |
| func canUseConnectEx(net string) bool { |
| switch net { |
| case "tcp", "tcp4", "tcp6": |
| return true |
| } |
| // ConnectEx windows API does not support connectionless sockets. |
| return false |
| } |
| |
| // operation contains superset of data necessary to perform all async IO. |
| type operation struct { |
| // Used by IOCP interface, it must be first field |
| // of the struct, as our code rely on it. |
| o syscall.Overlapped |
| |
| // fields used by runtime.netpoll |
| runtimeCtx uintptr |
| mode int32 |
| errno int32 |
| qty uint32 |
| |
| // fields used only by net package |
| fd *netFD |
| errc chan error |
| buf syscall.WSABuf |
| sa syscall.Sockaddr |
| rsa *syscall.RawSockaddrAny |
| rsan int32 |
| handle syscall.Handle |
| flags uint32 |
| bufs []syscall.WSABuf |
| } |
| |
| func (o *operation) InitBuf(buf []byte) { |
| o.buf.Len = uint32(len(buf)) |
| o.buf.Buf = nil |
| if len(buf) != 0 { |
| o.buf.Buf = &buf[0] |
| } |
| } |
| |
| func (o *operation) InitBufs(buf *Buffers) { |
| if o.bufs == nil { |
| o.bufs = make([]syscall.WSABuf, 0, len(*buf)) |
| } else { |
| o.bufs = o.bufs[:0] |
| } |
| for _, b := range *buf { |
| var p *byte |
| if len(b) > 0 { |
| p = &b[0] |
| } |
| o.bufs = append(o.bufs, syscall.WSABuf{Len: uint32(len(b)), Buf: p}) |
| } |
| } |
| |
| // ClearBufs clears all pointers to Buffers parameter captured |
| // by InitBufs, so it can be released by garbage collector. |
| func (o *operation) ClearBufs() { |
| for i := range o.bufs { |
| o.bufs[i].Buf = nil |
| } |
| o.bufs = o.bufs[:0] |
| } |
| |
| // ioSrv executes net IO requests. |
| type ioSrv struct { |
| req chan ioSrvReq |
| } |
| |
| type ioSrvReq struct { |
| o *operation |
| submit func(o *operation) error // if nil, cancel the operation |
| } |
| |
| // ProcessRemoteIO will execute submit IO requests on behalf |
| // of other goroutines, all on a single os thread, so it can |
| // cancel them later. Results of all operations will be sent |
| // back to their requesters via channel supplied in request. |
| // It is used only when the CancelIoEx API is unavailable. |
| func (s *ioSrv) ProcessRemoteIO() { |
| runtime.LockOSThread() |
| defer runtime.UnlockOSThread() |
| for r := range s.req { |
| if r.submit != nil { |
| r.o.errc <- r.submit(r.o) |
| } else { |
| r.o.errc <- syscall.CancelIo(r.o.fd.sysfd) |
| } |
| } |
| } |
| |
| // ExecIO executes a single IO operation o. It submits and cancels |
| // IO in the current thread for systems where Windows CancelIoEx API |
| // is available. Alternatively, it passes the request onto |
| // runtime netpoll and waits for completion or cancels request. |
| func (s *ioSrv) ExecIO(o *operation, name string, submit func(o *operation) error) (int, error) { |
| fd := o.fd |
| // Notify runtime netpoll about starting IO. |
| err := fd.pd.prepare(int(o.mode)) |
| if err != nil { |
| return 0, err |
| } |
| // Start IO. |
| if canCancelIO { |
| err = submit(o) |
| } else { |
| // Send request to a special dedicated thread, |
| // so it can stop the IO with CancelIO later. |
| s.req <- ioSrvReq{o, submit} |
| err = <-o.errc |
| } |
| switch err { |
| case nil: |
| // IO completed immediately |
| if o.fd.skipSyncNotif { |
| // No completion message will follow, so return immediately. |
| return int(o.qty), nil |
| } |
| // Need to get our completion message anyway. |
| case syscall.ERROR_IO_PENDING: |
| // IO started, and we have to wait for its completion. |
| err = nil |
| default: |
| return 0, err |
| } |
| // Wait for our request to complete. |
| err = fd.pd.wait(int(o.mode)) |
| if err == nil { |
| // All is good. Extract our IO results and return. |
| if o.errno != 0 { |
| err = syscall.Errno(o.errno) |
| return 0, err |
| } |
| return int(o.qty), nil |
| } |
| // IO is interrupted by "close" or "timeout" |
| netpollErr := err |
| switch netpollErr { |
| case errClosing, errTimeout: |
| // will deal with those. |
| default: |
| panic("net: unexpected runtime.netpoll error: " + netpollErr.Error()) |
| } |
| // Cancel our request. |
| if canCancelIO { |
| err := syscall.CancelIoEx(fd.sysfd, &o.o) |
| // Assuming ERROR_NOT_FOUND is returned, if IO is completed. |
| if err != nil && err != syscall.ERROR_NOT_FOUND { |
| // TODO(brainman): maybe do something else, but panic. |
| panic(err) |
| } |
| } else { |
| s.req <- ioSrvReq{o, nil} |
| <-o.errc |
| } |
| // Wait for cancelation to complete. |
| fd.pd.waitCanceled(int(o.mode)) |
| if o.errno != 0 { |
| err = syscall.Errno(o.errno) |
| if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled |
| err = netpollErr |
| } |
| return 0, err |
| } |
| // We issued a cancelation request. But, it seems, IO operation succeeded |
| // before the cancelation request run. We need to treat the IO operation as |
| // succeeded (the bytes are actually sent/recv from network). |
| return int(o.qty), nil |
| } |
| |
| // Start helper goroutines. |
| var rsrv, wsrv *ioSrv |
| var onceStartServer sync.Once |
| |
| func startServer() { |
| rsrv = new(ioSrv) |
| wsrv = new(ioSrv) |
| if !canCancelIO { |
| // Only CancelIo API is available. Lets start two special goroutines |
| // locked to an OS thread, that both starts and cancels IO. One will |
| // process read requests, while other will do writes. |
| rsrv.req = make(chan ioSrvReq) |
| go rsrv.ProcessRemoteIO() |
| wsrv.req = make(chan ioSrvReq) |
| go wsrv.ProcessRemoteIO() |
| } |
| } |
| |
| // Network file descriptor. |
| type netFD struct { |
| // locking/lifetime of sysfd + serialize access to Read and Write methods |
| fdmu fdMutex |
| |
| // immutable until Close |
| sysfd syscall.Handle |
| family int |
| sotype int |
| isStream bool |
| isConnected bool |
| skipSyncNotif bool |
| net string |
| laddr Addr |
| raddr Addr |
| |
| rop operation // read operation |
| wop operation // write operation |
| |
| // wait server |
| pd pollDesc |
| } |
| |
| func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) { |
| if initErr != nil { |
| return nil, initErr |
| } |
| onceStartServer.Do(startServer) |
| return &netFD{sysfd: sysfd, family: family, sotype: sotype, net: net, isStream: sotype == syscall.SOCK_STREAM}, nil |
| } |
| |
| func (fd *netFD) init() error { |
| if err := fd.pd.init(fd); err != nil { |
| return err |
| } |
| if hasLoadSetFileCompletionNotificationModes { |
| // We do not use events, so we can skip them always. |
| flags := uint8(syscall.FILE_SKIP_SET_EVENT_ON_HANDLE) |
| // It's not safe to skip completion notifications for UDP: |
| // http://blogs.technet.com/b/winserverperformance/archive/2008/06/26/designing-applications-for-high-performance-part-iii.aspx |
| if skipSyncNotif && fd.net == "tcp" { |
| flags |= syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS |
| } |
| err := syscall.SetFileCompletionNotificationModes(fd.sysfd, flags) |
| if err == nil && flags&syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS != 0 { |
| fd.skipSyncNotif = true |
| } |
| } |
| // Disable SIO_UDP_CONNRESET behavior. |
| // http://support.microsoft.com/kb/263823 |
| switch fd.net { |
| case "udp", "udp4", "udp6": |
| ret := uint32(0) |
| flag := uint32(0) |
| size := uint32(unsafe.Sizeof(flag)) |
| err := syscall.WSAIoctl(fd.sysfd, syscall.SIO_UDP_CONNRESET, (*byte)(unsafe.Pointer(&flag)), size, nil, 0, &ret, nil, 0) |
| if err != nil { |
| return os.NewSyscallError("wsaioctl", err) |
| } |
| } |
| fd.rop.mode = 'r' |
| fd.wop.mode = 'w' |
| fd.rop.fd = fd |
| fd.wop.fd = fd |
| fd.rop.runtimeCtx = fd.pd.runtimeCtx |
| fd.wop.runtimeCtx = fd.pd.runtimeCtx |
| if !canCancelIO { |
| fd.rop.errc = make(chan error) |
| fd.wop.errc = make(chan error) |
| } |
| return nil |
| } |
| |
| func (fd *netFD) setAddr(laddr, raddr Addr) { |
| fd.laddr = laddr |
| fd.raddr = raddr |
| runtime.SetFinalizer(fd, (*netFD).Close) |
| } |
| |
| func (fd *netFD) connect(ctx context.Context, la, ra syscall.Sockaddr) error { |
| // Do not need to call fd.writeLock here, |
| // because fd is not yet accessible to user, |
| // so no concurrent operations are possible. |
| if err := fd.init(); err != nil { |
| return err |
| } |
| if deadline, ok := ctx.Deadline(); ok && !deadline.IsZero() { |
| fd.setWriteDeadline(deadline) |
| defer fd.setWriteDeadline(noDeadline) |
| } |
| if !canUseConnectEx(fd.net) { |
| err := connectFunc(fd.sysfd, ra) |
| return os.NewSyscallError("connect", err) |
| } |
| // ConnectEx windows API requires an unconnected, previously bound socket. |
| if la == nil { |
| switch ra.(type) { |
| case *syscall.SockaddrInet4: |
| la = &syscall.SockaddrInet4{} |
| case *syscall.SockaddrInet6: |
| la = &syscall.SockaddrInet6{} |
| default: |
| panic("unexpected type in connect") |
| } |
| if err := syscall.Bind(fd.sysfd, la); err != nil { |
| return os.NewSyscallError("bind", err) |
| } |
| } |
| // Call ConnectEx API. |
| o := &fd.wop |
| o.sa = ra |
| |
| // Wait for the goroutine converting context.Done into a write timeout |
| // to exist, otherwise our caller might cancel the context and |
| // cause fd.setWriteDeadline(aLongTimeAgo) to cancel a successful dial. |
| done := make(chan bool) // must be unbuffered |
| defer func() { done <- true }() |
| go func() { |
| select { |
| case <-ctx.Done(): |
| // Force the runtime's poller to immediately give |
| // up waiting for writability. |
| fd.setWriteDeadline(aLongTimeAgo) |
| <-done |
| case <-done: |
| } |
| }() |
| |
| _, err := wsrv.ExecIO(o, "ConnectEx", func(o *operation) error { |
| return connectExFunc(o.fd.sysfd, o.sa, nil, 0, nil, &o.o) |
| }) |
| if err != nil { |
| select { |
| case <-ctx.Done(): |
| return mapErr(ctx.Err()) |
| default: |
| if _, ok := err.(syscall.Errno); ok { |
| err = os.NewSyscallError("connectex", err) |
| } |
| return err |
| } |
| } |
| // Refresh socket properties. |
| return os.NewSyscallError("setsockopt", syscall.Setsockopt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_UPDATE_CONNECT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd)))) |
| } |
| |
| func (fd *netFD) destroy() { |
| if fd.sysfd == syscall.InvalidHandle { |
| return |
| } |
| // Poller may want to unregister fd in readiness notification mechanism, |
| // so this must be executed before closeFunc. |
| fd.pd.close() |
| closeFunc(fd.sysfd) |
| fd.sysfd = syscall.InvalidHandle |
| // no need for a finalizer anymore |
| runtime.SetFinalizer(fd, nil) |
| } |
| |
| func (fd *netFD) Close() error { |
| if !fd.fdmu.increfAndClose() { |
| return errClosing |
| } |
| // unblock pending reader and writer |
| fd.pd.evict() |
| fd.decref() |
| return nil |
| } |
| |
| func (fd *netFD) shutdown(how int) error { |
| if err := fd.incref(); err != nil { |
| return err |
| } |
| defer fd.decref() |
| return syscall.Shutdown(fd.sysfd, how) |
| } |
| |
| func (fd *netFD) closeRead() error { |
| return fd.shutdown(syscall.SHUT_RD) |
| } |
| |
| func (fd *netFD) closeWrite() error { |
| return fd.shutdown(syscall.SHUT_WR) |
| } |
| |
| func (fd *netFD) Read(buf []byte) (int, error) { |
| if err := fd.readLock(); err != nil { |
| return 0, err |
| } |
| defer fd.readUnlock() |
| o := &fd.rop |
| o.InitBuf(buf) |
| n, err := rsrv.ExecIO(o, "WSARecv", func(o *operation) error { |
| return syscall.WSARecv(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil) |
| }) |
| if race.Enabled { |
| race.Acquire(unsafe.Pointer(&ioSync)) |
| } |
| if len(buf) != 0 { |
| err = fd.eofError(n, err) |
| } |
| if _, ok := err.(syscall.Errno); ok { |
| err = os.NewSyscallError("wsarecv", err) |
| } |
| return n, err |
| } |
| |
| func (fd *netFD) readFrom(buf []byte) (int, syscall.Sockaddr, error) { |
| if len(buf) == 0 { |
| return 0, nil, nil |
| } |
| if err := fd.readLock(); err != nil { |
| return 0, nil, err |
| } |
| defer fd.readUnlock() |
| o := &fd.rop |
| o.InitBuf(buf) |
| n, err := rsrv.ExecIO(o, "WSARecvFrom", func(o *operation) error { |
| if o.rsa == nil { |
| o.rsa = new(syscall.RawSockaddrAny) |
| } |
| o.rsan = int32(unsafe.Sizeof(*o.rsa)) |
| return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, o.rsa, &o.rsan, &o.o, nil) |
| }) |
| err = fd.eofError(n, err) |
| if _, ok := err.(syscall.Errno); ok { |
| err = os.NewSyscallError("wsarecvfrom", err) |
| } |
| if err != nil { |
| return n, nil, err |
| } |
| sa, _ := o.rsa.Sockaddr() |
| return n, sa, nil |
| } |
| |
| func (fd *netFD) Write(buf []byte) (int, error) { |
| if err := fd.writeLock(); err != nil { |
| return 0, err |
| } |
| defer fd.writeUnlock() |
| if race.Enabled { |
| race.ReleaseMerge(unsafe.Pointer(&ioSync)) |
| } |
| o := &fd.wop |
| o.InitBuf(buf) |
| n, err := wsrv.ExecIO(o, "WSASend", func(o *operation) error { |
| return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil) |
| }) |
| if _, ok := err.(syscall.Errno); ok { |
| err = os.NewSyscallError("wsasend", err) |
| } |
| return n, err |
| } |
| |
| func (c *conn) writeBuffers(v *Buffers) (int64, error) { |
| if !c.ok() { |
| return 0, syscall.EINVAL |
| } |
| n, err := c.fd.writeBuffers(v) |
| if err != nil { |
| return n, &OpError{Op: "WSASend", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err} |
| } |
| return n, nil |
| } |
| |
| func (fd *netFD) writeBuffers(buf *Buffers) (int64, error) { |
| if len(*buf) == 0 { |
| return 0, nil |
| } |
| if err := fd.writeLock(); err != nil { |
| return 0, err |
| } |
| defer fd.writeUnlock() |
| if race.Enabled { |
| race.ReleaseMerge(unsafe.Pointer(&ioSync)) |
| } |
| o := &fd.wop |
| o.InitBufs(buf) |
| n, err := wsrv.ExecIO(o, "WSASend", func(o *operation) error { |
| return syscall.WSASend(o.fd.sysfd, &o.bufs[0], uint32(len(*buf)), &o.qty, 0, &o.o, nil) |
| }) |
| o.ClearBufs() |
| if _, ok := err.(syscall.Errno); ok { |
| err = os.NewSyscallError("wsasend", err) |
| } |
| testHookDidWritev(n) |
| buf.consume(int64(n)) |
| return int64(n), err |
| } |
| |
| func (fd *netFD) writeTo(buf []byte, sa syscall.Sockaddr) (int, error) { |
| if len(buf) == 0 { |
| return 0, nil |
| } |
| if err := fd.writeLock(); err != nil { |
| return 0, err |
| } |
| defer fd.writeUnlock() |
| o := &fd.wop |
| o.InitBuf(buf) |
| o.sa = sa |
| n, err := wsrv.ExecIO(o, "WSASendto", func(o *operation) error { |
| return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil) |
| }) |
| if _, ok := err.(syscall.Errno); ok { |
| err = os.NewSyscallError("wsasendto", err) |
| } |
| return n, err |
| } |
| |
| func (fd *netFD) acceptOne(rawsa []syscall.RawSockaddrAny, o *operation) (*netFD, error) { |
| // Get new socket. |
| s, err := sysSocket(fd.family, fd.sotype, 0) |
| if err != nil { |
| return nil, err |
| } |
| |
| // Associate our new socket with IOCP. |
| netfd, err := newFD(s, fd.family, fd.sotype, fd.net) |
| if err != nil { |
| closeFunc(s) |
| return nil, err |
| } |
| if err := netfd.init(); err != nil { |
| fd.Close() |
| return nil, err |
| } |
| |
| // Submit accept request. |
| o.handle = s |
| o.rsan = int32(unsafe.Sizeof(rawsa[0])) |
| _, err = rsrv.ExecIO(o, "AcceptEx", func(o *operation) error { |
| return acceptFunc(o.fd.sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o) |
| }) |
| if err != nil { |
| netfd.Close() |
| if _, ok := err.(syscall.Errno); ok { |
| err = os.NewSyscallError("acceptex", err) |
| } |
| return nil, err |
| } |
| |
| // Inherit properties of the listening socket. |
| err = syscall.Setsockopt(s, syscall.SOL_SOCKET, syscall.SO_UPDATE_ACCEPT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd))) |
| if err != nil { |
| netfd.Close() |
| return nil, os.NewSyscallError("setsockopt", err) |
| } |
| runtime.KeepAlive(fd) |
| return netfd, nil |
| } |
| |
| func (fd *netFD) accept() (*netFD, error) { |
| if err := fd.readLock(); err != nil { |
| return nil, err |
| } |
| defer fd.readUnlock() |
| |
| o := &fd.rop |
| var netfd *netFD |
| var err error |
| var rawsa [2]syscall.RawSockaddrAny |
| for { |
| netfd, err = fd.acceptOne(rawsa[:], o) |
| if err == nil { |
| break |
| } |
| // Sometimes we see WSAECONNRESET and ERROR_NETNAME_DELETED is |
| // returned here. These happen if connection reset is received |
| // before AcceptEx could complete. These errors relate to new |
| // connection, not to AcceptEx, so ignore broken connection and |
| // try AcceptEx again for more connections. |
| nerr, ok := err.(*os.SyscallError) |
| if !ok { |
| return nil, err |
| } |
| errno, ok := nerr.Err.(syscall.Errno) |
| if !ok { |
| return nil, err |
| } |
| switch errno { |
| case syscall.ERROR_NETNAME_DELETED, syscall.WSAECONNRESET: |
| // ignore these and try again |
| default: |
| return nil, err |
| } |
| } |
| |
| // Get local and peer addr out of AcceptEx buffer. |
| var lrsa, rrsa *syscall.RawSockaddrAny |
| var llen, rlen int32 |
| syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&rawsa[0])), |
| 0, uint32(o.rsan), uint32(o.rsan), &lrsa, &llen, &rrsa, &rlen) |
| lsa, _ := lrsa.Sockaddr() |
| rsa, _ := rrsa.Sockaddr() |
| |
| netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa)) |
| return netfd, nil |
| } |
| |
| // Unimplemented functions. |
| |
| func (fd *netFD) dup() (*os.File, error) { |
| // TODO: Implement this |
| return nil, syscall.EWINDOWS |
| } |
| |
| func (fd *netFD) readMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err error) { |
| return 0, 0, 0, nil, syscall.EWINDOWS |
| } |
| |
| func (fd *netFD) writeMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err error) { |
| return 0, 0, syscall.EWINDOWS |
| } |