| // Copyright 2018 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package poll |
| |
| import ( |
| "runtime" |
| "sync" |
| "syscall" |
| "unsafe" |
| ) |
| |
// spliceNonblock is the flag value passed to splice(2) to make the
// call non-blocking.
const spliceNonblock = 0x2

// maxSpliceSize caps how many bytes Splice asks the kernel to move in
// one splice(2) call. The data travels through a pipe, and 1MB is the
// default maximum pipe buffer size (see /proc/sys/fs/pipe-max-size).
const maxSpliceSize = 1 << 20
| |
| // Splice transfers at most remain bytes of data from src to dst, using the |
| // splice system call to minimize copies of data from and to userspace. |
| // |
| // Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer. |
| // src and dst must both be stream-oriented sockets. |
| // |
| // If err != nil, sc is the system call which caused the error. |
| func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) { |
| p, sc, err := getPipe() |
| if err != nil { |
| return 0, false, sc, err |
| } |
| defer putPipe(p) |
| var inPipe, n int |
| for err == nil && remain > 0 { |
| max := maxSpliceSize |
| if int64(max) > remain { |
| max = int(remain) |
| } |
| inPipe, err = spliceDrain(p.wfd, src, max) |
| // The operation is considered handled if splice returns no |
| // error, or an error other than EINVAL. An EINVAL means the |
| // kernel does not support splice for the socket type of src. |
| // The failed syscall does not consume any data so it is safe |
| // to fall back to a generic copy. |
| // |
| // spliceDrain should never return EAGAIN, so if err != nil, |
| // Splice cannot continue. |
| // |
| // If inPipe == 0 && err == nil, src is at EOF, and the |
| // transfer is complete. |
| handled = handled || (err != syscall.EINVAL) |
| if err != nil || inPipe == 0 { |
| break |
| } |
| p.data += inPipe |
| |
| n, err = splicePump(dst, p.rfd, inPipe) |
| if n > 0 { |
| written += int64(n) |
| remain -= int64(n) |
| p.data -= n |
| } |
| } |
| if err != nil { |
| return written, handled, "splice", err |
| } |
| return written, true, "", nil |
| } |
| |
| // spliceDrain moves data from a socket to a pipe. |
| // |
| // Invariant: when entering spliceDrain, the pipe is empty. It is either in its |
| // initial state, or splicePump has emptied it previously. |
| // |
| // Given this, spliceDrain can reasonably assume that the pipe is ready for |
| // writing, so if splice returns EAGAIN, it must be because the socket is not |
| // ready for reading. |
| // |
| // If spliceDrain returns (0, nil), src is at EOF. |
| func spliceDrain(pipefd int, sock *FD, max int) (int, error) { |
| if err := sock.readLock(); err != nil { |
| return 0, err |
| } |
| defer sock.readUnlock() |
| if err := sock.pd.prepareRead(sock.isFile); err != nil { |
| return 0, err |
| } |
| for { |
| n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock) |
| if err == syscall.EINTR { |
| continue |
| } |
| if err != syscall.EAGAIN { |
| return n, err |
| } |
| if err := sock.pd.waitRead(sock.isFile); err != nil { |
| return n, err |
| } |
| } |
| } |
| |
| // splicePump moves all the buffered data from a pipe to a socket. |
| // |
| // Invariant: when entering splicePump, there are exactly inPipe |
| // bytes of data in the pipe, from a previous call to spliceDrain. |
| // |
| // By analogy to the condition from spliceDrain, splicePump |
| // only needs to poll the socket for readiness, if splice returns |
| // EAGAIN. |
| // |
| // If splicePump cannot move all the data in a single call to |
| // splice(2), it loops over the buffered data until it has written |
| // all of it to the socket. This behavior is similar to the Write |
| // step of an io.Copy in userspace. |
| func splicePump(sock *FD, pipefd int, inPipe int) (int, error) { |
| if err := sock.writeLock(); err != nil { |
| return 0, err |
| } |
| defer sock.writeUnlock() |
| if err := sock.pd.prepareWrite(sock.isFile); err != nil { |
| return 0, err |
| } |
| written := 0 |
| for inPipe > 0 { |
| n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock) |
| // Here, the condition n == 0 && err == nil should never be |
| // observed, since Splice controls the write side of the pipe. |
| if n > 0 { |
| inPipe -= n |
| written += n |
| continue |
| } |
| if err != syscall.EAGAIN { |
| return written, err |
| } |
| if err := sock.pd.waitWrite(sock.isFile); err != nil { |
| return written, err |
| } |
| } |
| return written, nil |
| } |
| |
// splice is a thin wrapper around the splice system call that moves up
// to max bytes from the descriptor in to the descriptor out. Both offset
// arguments are passed as nil, since the current implementation only
// uses splice on sockets and pipes. The byte count is narrowed to int
// because callers never request more in one call than fits in an int32.
func splice(out int, in int, max int, flags int) (int, error) {
	moved, err := syscall.Splice(in, nil, out, nil, max, flags)
	return int(moved), err
}
| |
type (
	// splicePipeFields holds the state of one pipe used by Splice.
	splicePipeFields struct {
		rfd  int // read end of the pipe
		wfd  int // write end of the pipe
		data int // bytes Splice has put into the pipe and not yet taken out
	}

	// splicePipe is splicePipeFields plus padding.
	splicePipe struct {
		splicePipeFields

		// A finalizer is attached to these values, so pad the struct
		// to keep it large enough to stay out of the tiny allocator.
		_ [24 - unsafe.Sizeof(splicePipeFields{})%24]byte
	}
)
| |
| // splicePipePool caches pipes to avoid high-frequency construction and destruction of pipe buffers. |
| // The garbage collector will free all pipes in the sync.Pool periodically, thus we need to set up |
| // a finalizer for each pipe to close its file descriptors before the actual GC. |
| var splicePipePool = sync.Pool{New: newPoolPipe} |
| |
| func newPoolPipe() any { |
| // Discard the error which occurred during the creation of pipe buffer, |
| // redirecting the data transmission to the conventional way utilizing read() + write() as a fallback. |
| p := newPipe() |
| if p == nil { |
| return nil |
| } |
| runtime.SetFinalizer(p, destroyPipe) |
| return p |
| } |
| |
| // getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from the cache. |
| // |
| // Note that it may fail to create a new pipe buffer by newPipe(), in which case getPipe() will return a generic error |
| // and system call name splice in a string as the indication. |
| func getPipe() (*splicePipe, string, error) { |
| v := splicePipePool.Get() |
| if v == nil { |
| return nil, "splice", syscall.EINVAL |
| } |
| return v.(*splicePipe), "", nil |
| } |
| |
| func putPipe(p *splicePipe) { |
| // If there is still data left in the pipe, |
| // then close and discard it instead of putting it back into the pool. |
| if p.data != 0 { |
| runtime.SetFinalizer(p, nil) |
| destroyPipe(p) |
| return |
| } |
| splicePipePool.Put(p) |
| } |
| |
| // newPipe sets up a pipe for a splice operation. |
| func newPipe() *splicePipe { |
| var fds [2]int |
| if err := syscall.Pipe2(fds[:], syscall.O_CLOEXEC|syscall.O_NONBLOCK); err != nil { |
| return nil |
| } |
| |
| // Splice will loop writing maxSpliceSize bytes from the source to the pipe, |
| // and then write those bytes from the pipe to the destination. |
| // Set the pipe buffer size to maxSpliceSize to optimize that. |
| // Ignore errors here, as a smaller buffer size will work, |
| // although it will require more system calls. |
| fcntl(fds[0], syscall.F_SETPIPE_SZ, maxSpliceSize) |
| |
| return &splicePipe{splicePipeFields: splicePipeFields{rfd: fds[0], wfd: fds[1]}} |
| } |
| |
| // destroyPipe destroys a pipe. |
| func destroyPipe(p *splicePipe) { |
| CloseFunc(p.rfd) |
| CloseFunc(p.wfd) |
| } |