Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2019-10-30 13:42:29 +01:00
parent a3afcf22af
commit 9ebd1972eb
2 changed files with 27 additions and 15 deletions

View File

@ -1,6 +1,7 @@
use std::convert::TryFrom;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread::JoinHandle;
@ -12,7 +13,7 @@ use crate::tools::Fd;
pub struct Reactor {
epoll: Arc<Epoll>,
removed: Mutex<Vec<(RawFd, Box<RegistrationInner>)>>,
removed: Mutex<Vec<Box<RegistrationInner>>>,
thread: Mutex<Option<JoinHandle<()>>>,
}
@ -49,16 +50,16 @@ impl Reactor {
for i in 0..count {
self.handle_event(&buf[i]);
}
// After going through the events we can release memory associated with already closed
// file descriptors:
self.removed.lock().unwrap().clear();
}
}
fn handle_event(&self, event: &EpollEvent) {
let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) };
for (fd, _) in self.removed.lock().unwrap().iter() {
// This fd is already being dropped, don't touch it!
if *fd == registration.fd {
return;
}
if registration.gone.load(Ordering::Acquire) {
return;
}
if 0 != (event.events & EPOLLIN) {
@ -88,8 +89,7 @@ impl Reactor {
pub fn register(self: Arc<Self>, fd: RawFd) -> io::Result<Registration> {
let mut inner = Box::new(RegistrationInner {
fd,
//ready: AtomicU32::new(0),
gone: AtomicBool::new(false),
reactor: Arc::clone(&self),
read_waker: Mutex::new(None),
write_waker: Mutex::new(None),
@ -109,10 +109,7 @@ impl Reactor {
}
fn deregister(&self, registration: Box<RegistrationInner>) {
self.removed
.lock()
.unwrap()
.push((registration.fd, registration));
self.removed.lock().unwrap().push(registration);
}
}
@ -124,15 +121,16 @@ pub struct Registration {
impl Drop for Registration {
fn drop(&mut self) {
let reactor = Arc::clone(&self.inner.as_ref().unwrap().reactor);
let inner = self.inner.as_ref().unwrap();
let reactor = Arc::clone(&inner.reactor);
inner.gone.store(true, Ordering::Release);
reactor.deregister(self.inner.take().unwrap());
}
}
// This is accessed by the reactor
struct RegistrationInner {
fd: RawFd,
//ready: AtomicU32,
gone: AtomicBool,
reactor: Arc<Reactor>,
read_waker: Mutex<Option<Waker>>,
write_waker: Mutex<Option<Waker>>,

View File

@ -21,6 +21,20 @@ impl FromRawFd for Fd {
}
}
impl Fd {
pub fn set_nonblocking(&self, nb: bool) -> std::io::Result<()> {
let fd = self.as_raw_fd();
let flags = c_try!(unsafe { libc::fcntl(fd, libc::F_GETFL) });
let flags = if nb {
flags | libc::O_NONBLOCK
} else {
flags & !libc::O_NONBLOCK
};
c_try!(unsafe { libc::fcntl(fd, libc::F_SETFL, flags) });
Ok(())
}
}
/// Byte vector utilities.
pub mod vec {
/// Create an uninitialized byte vector of a specific size.