pipe implementation

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2019-10-30 14:22:41 +01:00
parent 86b8386760
commit f4c536439a
3 changed files with 84 additions and 31 deletions

View File

@ -16,6 +16,7 @@ pub mod executor;
pub mod fork;
pub mod lxcseccomp;
pub mod nsfd;
pub mod pipe;
pub mod poll_fn;
pub mod process;
pub mod reactor;

74
src/pipe.rs Normal file
View File

@ -0,0 +1,74 @@
use std::convert::TryFrom;
use std::io;
use std::marker::PhantomData;
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::task::{Context, Poll};
use crate::error::io_err_other;
use crate::poll_fn::poll_fn;
use crate::reactor::PolledFd;
use crate::rw_traits;
use crate::tools::Fd;
pub struct Pipe<RW> {
fd: PolledFd,
_phantom: PhantomData<RW>,
}
impl<RW> AsRawFd for Pipe<RW> {
#[inline]
fn as_raw_fd(&self) -> RawFd {
self.fd.as_raw_fd()
}
}
pub fn pipe() -> io::Result<(Pipe<rw_traits::Read>, Pipe<rw_traits::Write>)> {
let mut pfd: [RawFd; 2] = [0, 0];
c_try!(unsafe { libc::pipe2(pfd.as_mut_ptr(), libc::O_CLOEXEC) });
let (fd_in, fd_out) = unsafe { (Fd::from_raw_fd(pfd[0]), Fd::from_raw_fd(pfd[1])) };
let fd_in = PolledFd::new(fd_in)?;
let fd_out = PolledFd::new(fd_out)?;
Ok((
Pipe {
fd: fd_in,
_phantom: PhantomData,
},
Pipe {
fd: fd_out,
_phantom: PhantomData,
},
))
}
impl<RW: rw_traits::HasRead> Pipe<RW> {
pub fn poll_read(&mut self, cx: &mut Context, data: &mut [u8]) -> Poll<io::Result<usize>> {
let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?;
let fd = self.fd.as_raw_fd();
self.fd.wrap_read(cx, || {
c_result!(unsafe { libc::read(fd, data.as_mut_ptr() as *mut libc::c_void, size) })
.map(|res| res as usize)
})
}
pub async fn read(&mut self, data: &mut [u8]) -> io::Result<usize> {
poll_fn(move |cx| self.poll_read(cx, data)).await
}
}
impl<RW: rw_traits::HasWrite> Pipe<RW> {
pub fn poll_write(&mut self, data: &[u8], cx: &mut Context) -> Poll<io::Result<usize>> {
let fd = self.fd.as_raw_fd();
let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?;
self.fd.wrap_write(cx, || {
c_result!(unsafe { libc::write(fd, data.as_ptr() as *const libc::c_void, size) })
.map(|res| res as usize)
})
}
pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
poll_fn(move |cx| self.poll_write(data, cx)).await
}
}

View File

@ -1,4 +1,3 @@
use std::convert::TryFrom;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicBool, Ordering};
@ -7,8 +6,6 @@ use std::task::{Context, Poll, Waker};
use std::thread::JoinHandle;
use crate::epoll::{Epoll, EpollEvent, EPOLLERR, EPOLLET, EPOLLHUP, EPOLLIN, EPOLLOUT};
use crate::error::io_err_other;
use crate::poll_fn::poll_fn;
use crate::tools::Fd;
static START: Once = Once::new();
@ -152,6 +149,15 @@ pub struct PolledFd {
registration: Registration,
}
// NOTE: For IntoRawFd we'd need to deregister from the reactor explicitly!
impl AsRawFd for PolledFd {
#[inline]
fn as_raw_fd(&self) -> RawFd {
self.fd.as_raw_fd()
}
}
impl PolledFd {
pub fn new(fd: Fd) -> io::Result<Self> {
Self::new_with_reactor(fd, crate::reactor::default())
@ -206,31 +212,3 @@ impl PolledFd {
}
}
}
impl PolledFd {
pub fn poll_read(&mut self, cx: &mut Context, data: &mut [u8]) -> Poll<io::Result<usize>> {
let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?;
let fd = self.fd.as_raw_fd();
self.wrap_read(cx, || {
c_result!(unsafe { libc::read(fd, data.as_mut_ptr() as *mut libc::c_void, size) })
.map(|res| res as usize)
})
}
pub async fn read(&mut self, data: &mut [u8]) -> io::Result<usize> {
poll_fn(move |cx| self.poll_read(cx, data)).await
}
pub fn poll_write(&mut self, data: &[u8], cx: &mut Context) -> Poll<io::Result<usize>> {
let fd = self.fd.as_raw_fd();
let size = libc::size_t::try_from(data.len()).map_err(io_err_other)?;
self.wrap_write(cx, || {
c_result!(unsafe { libc::write(fd, data.as_ptr() as *const libc::c_void, size) })
.map(|res| res as usize)
})
}
pub async fn write(&mut self, data: &[u8]) -> io::Result<usize> {
poll_fn(move |cx| self.poll_write(data, cx)).await
}
}