switch it all over to epoll

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2019-10-31 09:05:25 +01:00
parent cab6f1e64d
commit 8dd2698556
13 changed files with 150 additions and 57 deletions

View File

@ -12,8 +12,3 @@ failure = { version = "0.1", default-features = false, features = ["std"] }
lazy_static = "1.3" lazy_static = "1.3"
libc = "0.2" libc = "0.2"
nix = "0.15" nix = "0.15"
[dependencies.io-uring]
version = "0.1"
path = "../io-uring"
features = ["runtime"]

View File

@ -3,9 +3,9 @@ use std::sync::Arc;
use failure::Error; use failure::Error;
use nix::errno::Errno; use nix::errno::Errno;
use crate::io::seq_packet::SeqPacketSocket;
use crate::lxcseccomp::ProxyMessageBuffer; use crate::lxcseccomp::ProxyMessageBuffer;
use crate::syscall::{self, Syscall, SyscallStatus}; use crate::syscall::{self, Syscall, SyscallStatus};
use io_uring::socket::SeqPacketSocket;
pub struct Client { pub struct Client {
socket: SeqPacketSocket, socket: SeqPacketSocket,

View File

@ -9,8 +9,7 @@ use std::os::raw::c_int;
use std::os::unix::io::{FromRawFd, IntoRawFd}; use std::os::unix::io::{FromRawFd, IntoRawFd};
use std::panic::UnwindSafe; use std::panic::UnwindSafe;
use io_uring::socket::pipe::{self, Pipe}; use crate::io::pipe::{self, Pipe};
use crate::syscall::SyscallStatus; use crate::syscall::SyscallStatus;
pub async fn forking_syscall<F>(func: F) -> io::Result<SyscallStatus> pub async fn forking_syscall<F>(func: F) -> io::Result<SyscallStatus>
@ -50,7 +49,7 @@ impl Fork {
where where
F: FnOnce() -> io::Result<SyscallStatus> + UnwindSafe, F: FnOnce() -> io::Result<SyscallStatus> + UnwindSafe,
{ {
let (pipe_r, pipe_w) = pipe::pipe_default()?; let (pipe_r, pipe_w) = pipe::pipe()?;
let pid = c_try!(unsafe { libc::fork() }); let pid = c_try!(unsafe { libc::fork() });
if pid == 0 { if pid == 0 {

71
src/io/cmsg.rs Normal file
View File

@ -0,0 +1,71 @@
use std::mem;
pub const fn align(n: usize) -> usize {
(n + mem::size_of::<libc::size_t>() - 1) & !(mem::size_of::<libc::size_t>() - 1)
}
pub const fn space(n: usize) -> usize {
align(mem::size_of::<libc::cmsghdr>()) + align(n)
}
pub const fn capacity<T: Sized>() -> usize {
space(mem::size_of::<T>())
}
pub fn buffer<T: Sized>() -> Vec<u8> {
let capacity = capacity::<T>();
let mut buf = Vec::with_capacity(capacity);
unsafe {
buf.set_len(capacity);
}
buf
}
pub struct RawCmsgIterator<'a> {
buf: &'a [u8],
}
pub struct ControlMessageRef<'a> {
pub cmsg_level: libc::c_int,
pub cmsg_type: libc::c_int,
pub data: &'a [u8],
}
impl<'a> Iterator for RawCmsgIterator<'a> {
type Item = ControlMessageRef<'a>;
fn next(&mut self) -> Option<Self::Item> {
use libc::cmsghdr;
if self.buf.len() < mem::size_of::<cmsghdr>() {
return None;
}
let buf: &'a [u8] = self.buf;
// clippy issue:
#[allow(clippy::cast_ptr_alignment)]
let hdr: cmsghdr = unsafe { std::ptr::read_unaligned(buf.as_ptr() as *const cmsghdr) };
let data_off = mem::size_of::<cmsghdr>();
let data_end = hdr.cmsg_len;
let next_hdr = align(hdr.cmsg_len as usize);
let data = &buf[data_off..data_end];
let item = ControlMessageRef {
cmsg_level: hdr.cmsg_level,
cmsg_type: hdr.cmsg_type,
data,
};
self.buf = if next_hdr >= buf.len() {
&[]
} else {
&buf[next_hdr..]
};
Some(item)
}
}
#[inline]
pub fn iter(buf: &[u8]) -> RawCmsgIterator {
RawCmsgIterator { buf }
}

5
src/io/mod.rs Normal file
View File

@ -0,0 +1,5 @@
pub mod cmsg;
pub mod pipe;
pub mod reactor;
pub mod rw_traits;
pub mod seq_packet;

View File

@ -1,15 +1,17 @@
use std::convert::TryFrom; use std::convert::TryFrom;
use std::io; use std::io;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use crate::error::io_err_other; use crate::error::io_err_other;
use crate::io::reactor::PolledFd;
use crate::io::rw_traits;
use crate::poll_fn::poll_fn; use crate::poll_fn::poll_fn;
use crate::reactor::PolledFd;
use crate::rw_traits;
use crate::tools::Fd; use crate::tools::Fd;
pub use rw_traits::{Read, Write};
pub struct Pipe<RW> { pub struct Pipe<RW> {
fd: PolledFd, fd: PolledFd,
_phantom: PhantomData<RW>, _phantom: PhantomData<RW>,
@ -22,6 +24,13 @@ impl<RW> AsRawFd for Pipe<RW> {
} }
} }
impl<RW> IntoRawFd for Pipe<RW> {
#[inline]
fn into_raw_fd(self) -> RawFd {
self.fd.into_raw_fd()
}
}
pub fn pipe() -> io::Result<(Pipe<rw_traits::Read>, Pipe<rw_traits::Write>)> { pub fn pipe() -> io::Result<(Pipe<rw_traits::Read>, Pipe<rw_traits::Write>)> {
let mut pfd: [RawFd; 2] = [0, 0]; let mut pfd: [RawFd; 2] = [0, 0];
@ -56,6 +65,17 @@ impl<RW: rw_traits::HasRead> Pipe<RW> {
pub async fn read(&mut self, data: &mut [u8]) -> io::Result<usize> { pub async fn read(&mut self, data: &mut [u8]) -> io::Result<usize> {
poll_fn(move |cx| self.poll_read(cx, data)).await poll_fn(move |cx| self.poll_read(cx, data)).await
} }
pub async fn read_exact(&mut self, mut data: &mut [u8]) -> io::Result<()> {
while !data.is_empty() {
match self.read(&mut data[..]).await {
Ok(got) => data = &mut data[got..],
Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue,
Err(err) => return Err(err),
}
}
Ok(())
}
} }
impl<RW: rw_traits::HasWrite> Pipe<RW> { impl<RW: rw_traits::HasWrite> Pipe<RW> {

View File

@ -1,5 +1,5 @@
use std::io; use std::io;
use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, Once}; use std::sync::{Arc, Mutex, Once};
use std::task::{Context, Poll, Waker}; use std::task::{Context, Poll, Waker};
@ -12,7 +12,7 @@ use crate::tools::Fd;
static START: Once = Once::new(); static START: Once = Once::new();
static mut REACTOR: Option<Arc<Reactor>> = None; static mut REACTOR: Option<Arc<Reactor>> = None;
pub fn default() -> Arc<Reactor> { pub fn default_reactor() -> Arc<Reactor> {
START.call_once(|| unsafe { START.call_once(|| unsafe {
let reactor = Reactor::new().expect("setup main epoll reactor"); let reactor = Reactor::new().expect("setup main epoll reactor");
REACTOR = Some(reactor); REACTOR = Some(reactor);
@ -130,10 +130,11 @@ pub struct Registration {
impl Drop for Registration { impl Drop for Registration {
fn drop(&mut self) { fn drop(&mut self) {
let inner = self.inner.as_ref().unwrap(); if let Some(inner) = self.inner.take() {
let reactor = Arc::clone(&inner.reactor); let reactor = Arc::clone(&inner.reactor);
inner.gone.store(true, Ordering::Release); inner.gone.store(true, Ordering::Release);
reactor.deregister(self.inner.take().unwrap()); reactor.deregister(inner);
}
} }
} }
@ -150,8 +151,6 @@ pub struct PolledFd {
registration: Registration, registration: Registration,
} }
// NOTE: For IntoRawFd we'd need to deregister from the reactor explicitly!
impl AsRawFd for PolledFd { impl AsRawFd for PolledFd {
#[inline] #[inline]
fn as_raw_fd(&self) -> RawFd { fn as_raw_fd(&self) -> RawFd {
@ -159,10 +158,22 @@ impl AsRawFd for PolledFd {
} }
} }
impl IntoRawFd for PolledFd {
fn into_raw_fd(mut self) -> RawFd {
let registration = self.registration.inner.take().unwrap();
registration
.reactor
.epoll
.remove_fd(self.as_raw_fd())
.expect("cannot remove PolledFd from epoll instance");
self.fd.into_raw_fd()
}
}
impl PolledFd { impl PolledFd {
pub fn new(fd: Fd) -> io::Result<Self> { pub fn new(fd: Fd) -> io::Result<Self> {
fd.set_nonblocking(true).map_err(io_err_other)?; fd.set_nonblocking(true).map_err(io_err_other)?;
Self::new_with_reactor(fd, crate::reactor::default()) Self::new_with_reactor(fd, self::default_reactor())
} }
pub fn new_with_reactor(fd: Fd, reactor: Arc<Reactor>) -> io::Result<Self> { pub fn new_with_reactor(fd: Fd, reactor: Arc<Reactor>) -> io::Result<Self> {
@ -170,7 +181,7 @@ impl PolledFd {
Ok(Self { fd, registration }) Ok(Self { fd, registration })
} }
pub fn wrap_read<T, F>(&mut self, cx: &mut Context, func: F) -> Poll<io::Result<T>> pub fn wrap_read<T, F>(&self, cx: &mut Context, func: F) -> Poll<io::Result<T>>
where where
F: FnOnce() -> io::Result<T>, F: FnOnce() -> io::Result<T>,
{ {
@ -192,7 +203,7 @@ impl PolledFd {
} }
} }
pub fn wrap_write<T, F>(&mut self, cx: &mut Context, func: F) -> Poll<io::Result<T>> pub fn wrap_write<T, F>(&self, cx: &mut Context, func: F) -> Poll<io::Result<T>>
where where
F: FnOnce() -> io::Result<T>, F: FnOnce() -> io::Result<T>,
{ {

View File

@ -5,9 +5,9 @@ use std::{io, ptr};
use failure::Error; use failure::Error;
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType}; use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
use crate::io::reactor::PolledFd;
use crate::iovec::{IoVec, IoVecMut}; use crate::iovec::{IoVec, IoVecMut};
use crate::poll_fn::poll_fn; use crate::poll_fn::poll_fn;
use crate::reactor::PolledFd;
use crate::tools::AssertSendSync; use crate::tools::AssertSendSync;
use crate::tools::Fd; use crate::tools::Fd;
@ -82,7 +82,7 @@ impl SeqPacketSocket {
} }
pub fn poll_sendmsg( pub fn poll_sendmsg(
&mut self, &self,
cx: &mut Context, cx: &mut Context,
msg: &AssertSendSync<libc::msghdr>, msg: &AssertSendSync<libc::msghdr>,
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
@ -94,7 +94,7 @@ impl SeqPacketSocket {
}) })
} }
pub async fn sendmsg_vectored(&mut self, iov: &[IoVec<'_>]) -> io::Result<usize> { pub async fn sendmsg_vectored(&self, iov: &[IoVec<'_>]) -> io::Result<usize> {
let msg = AssertSendSync(libc::msghdr { let msg = AssertSendSync(libc::msghdr {
msg_name: ptr::null_mut(), msg_name: ptr::null_mut(),
msg_namelen: 0, msg_namelen: 0,
@ -109,7 +109,7 @@ impl SeqPacketSocket {
} }
pub fn poll_recvmsg( pub fn poll_recvmsg(
&mut self, &self,
cx: &mut Context, cx: &mut Context,
msg: &mut AssertSendSync<libc::msghdr>, msg: &mut AssertSendSync<libc::msghdr>,
) -> Poll<io::Result<usize>> { ) -> Poll<io::Result<usize>> {
@ -123,11 +123,11 @@ impl SeqPacketSocket {
// clippy is wrong about this one // clippy is wrong about this one
#[allow(clippy::needless_lifetimes)] #[allow(clippy::needless_lifetimes)]
pub async fn recvmsg_vectored<'a>( pub async fn recvmsg_vectored(
&mut self, &self,
iov: &mut [IoVecMut<'_>], iov: &mut [IoVecMut<'_>],
cmsg_buf: &'a mut [u8], cmsg_buf: &mut [u8],
) -> io::Result<usize> { ) -> io::Result<(usize, usize)> {
let mut msg = AssertSendSync(libc::msghdr { let mut msg = AssertSendSync(libc::msghdr {
msg_name: ptr::null_mut(), msg_name: ptr::null_mut(),
msg_namelen: 0, msg_namelen: 0,
@ -138,7 +138,8 @@ impl SeqPacketSocket {
msg_flags: libc::MSG_CMSG_CLOEXEC, msg_flags: libc::MSG_CMSG_CLOEXEC,
}); });
poll_fn(move |cx| self.poll_recvmsg(cx, &mut msg)).await let data_size = poll_fn(|cx| self.poll_recvmsg(cx, &mut msg)).await?;
Ok((data_size, msg.0.msg_controllen as usize))
} }
#[inline] #[inline]

View File

@ -8,14 +8,16 @@ use std::os::unix::io::{FromRawFd, RawFd};
use std::{io, mem}; use std::{io, mem};
use failure::{bail, format_err, Error}; use failure::{bail, format_err, Error};
use io_uring::socket::SeqPacketSocket;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use libc::pid_t; use libc::pid_t;
use nix::errno::Errno; use nix::errno::Errno;
use crate::io::cmsg;
use crate::io::seq_packet::SeqPacketSocket;
use crate::iovec::{IoVec, IoVecMut};
use crate::process::PidFd; use crate::process::PidFd;
use crate::seccomp::{SeccompNotif, SeccompNotifResp, SeccompNotifSizes}; use crate::seccomp::{SeccompNotif, SeccompNotifResp, SeccompNotifSizes};
use crate::tools::{Fd, FromFd, IoVec, IoVecMut}; use crate::tools::{Fd, FromFd};
/// Seccomp notification proxy message sent by the lxc monitor. /// Seccomp notification proxy message sent by the lxc monitor.
/// ///
@ -132,19 +134,18 @@ impl ProxyMessageBuffer {
self.cookie_buf.set_len(0); self.cookie_buf.set_len(0);
} }
let mut fd_cmsg_buf = io_uring::socket::cmsg::buffer::<[RawFd; 2]>(); let mut fd_cmsg_buf = cmsg::buffer::<[RawFd; 2]>();
let mut res = socket let (datalen, cmsglen) = socket
.recvmsg_vectored(&mut iovec, Some(&mut fd_cmsg_buf)) .recvmsg_vectored(&mut iovec, &mut fd_cmsg_buf)
.await?; .await?;
if res.is_empty() { if datalen == 0 {
return Ok(false); return Ok(false);
} }
self.set_len(res.len())?; self.set_len(datalen)?;
let cmsg = res let cmsg = cmsg::iter(&fd_cmsg_buf[..cmsglen])
.take_control_messages()
.next() .next()
.ok_or_else(|| format_err!("missing file descriptors in message"))?; .ok_or_else(|| format_err!("missing file descriptors in message"))?;
@ -208,7 +209,7 @@ impl ProxyMessageBuffer {
unsafe { io_vec(&self.seccomp_resp) }, unsafe { io_vec(&self.seccomp_resp) },
]; ];
let len = iov.iter().map(|e| e.len()).sum(); let len = iov.iter().map(|e| e.len()).sum();
if socket.sendmsg_vectored(&iov, &[]).await? != len { if socket.sendmsg_vectored(&iov).await? != len {
io_bail!("truncated message?"); io_bail!("truncated message?");
} }
Ok(()) Ok(())

View File

@ -1,5 +1,5 @@
use std::future::Future; use std::future::Future;
use std::io; use std::io as StdIo;
use failure::{bail, format_err, Error}; use failure::{bail, format_err, Error};
use nix::sys::socket::SockAddr; use nix::sys::socket::SockAddr;
@ -14,22 +14,19 @@ pub mod epoll;
pub mod error; pub mod error;
pub mod executor; pub mod executor;
pub mod fork; pub mod fork;
pub mod io;
pub mod iovec; pub mod iovec;
pub mod lxcseccomp; pub mod lxcseccomp;
pub mod nsfd; pub mod nsfd;
pub mod pipe;
pub mod poll_fn; pub mod poll_fn;
pub mod process; pub mod process;
pub mod reactor;
pub mod rw_traits;
pub mod seccomp; pub mod seccomp;
pub mod seq_packet;
pub mod sys_mknod; pub mod sys_mknod;
pub mod sys_quotactl; pub mod sys_quotactl;
pub mod syscall; pub mod syscall;
pub mod tools; pub mod tools;
use io_uring::socket::SeqPacketListener; use crate::io::seq_packet::SeqPacketListener;
static mut EXECUTOR: *mut executor::ThreadPool = std::ptr::null_mut(); static mut EXECUTOR: *mut executor::ThreadPool = std::ptr::null_mut();
@ -61,14 +58,14 @@ async fn do_main() -> Result<(), Error> {
match std::fs::remove_file(&socket_path) { match std::fs::remove_file(&socket_path) {
Ok(_) => (), Ok(_) => (),
Err(ref e) if e.kind() == io::ErrorKind::NotFound => (), // Ok Err(ref e) if e.kind() == StdIo::ErrorKind::NotFound => (), // Ok
Err(e) => bail!("failed to remove previous socket: {}", e), Err(e) => bail!("failed to remove previous socket: {}", e),
} }
let address = let address =
SockAddr::new_unix(socket_path.as_os_str()).expect("cannot create struct sockaddr_un?"); SockAddr::new_unix(socket_path.as_os_str()).expect("cannot create struct sockaddr_un?");
let mut listener = SeqPacketListener::bind_default(&address) let mut listener = SeqPacketListener::bind(&address)
.map_err(|e| format_err!("failed to create listening socket: {}", e))?; .map_err(|e| format_err!("failed to create listening socket: {}", e))?;
loop { loop {
let client = listener.accept().await?; let client = listener.accept().await?;

View File

@ -1,5 +0,0 @@
use crate::rw_traits;
pub struct Pipe<RW> {
fd: PolledFd,
}

View File

@ -5,8 +5,6 @@
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
pub use io_uring::iovec::{IoVec, IoVecMut};
/// Guard a raw file descriptor with a drop handler. This is mostly useful when access to an owned /// Guard a raw file descriptor with a drop handler. This is mostly useful when access to an owned
/// `RawFd` is required without the corresponding handler object (such as when only the file /// `RawFd` is required without the corresponding handler object (such as when only the file
/// descriptor number is required in a closure which may be dropped instead of being executed). /// descriptor number is required in a closure which may be dropped instead of being executed).