handle dropped fds

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2019-10-30 12:34:27 +01:00
parent beb2f986f9
commit f757af32da

View File

@ -1,8 +1,7 @@
use std::convert::TryFrom;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::thread::JoinHandle;
@ -21,29 +20,33 @@ pub const READY_ERR: u32 = 0b100;
pub struct Reactor {
epoll: Arc<Epoll>,
removed: Mutex<Vec<(RawFd, Box<RegistrationInner>)>>,
thread: JoinHandle<()>,
thread: Mutex<Option<JoinHandle<()>>>,
}
impl Reactor {
pub fn new() -> io::Result<Arc<Self>> {
let epoll = Arc::new(Epoll::new()?);
let handle = std::thread::spawn({
let epoll = Arc::clone(&epoll);
move || Self::thread_main(epoll)
});
Ok(Arc::new(Self {
let this = Arc::new(Reactor {
epoll,
removed: Mutex::new(Vec::new()),
thread: handle,
}))
thread: Mutex::new(None),
});
let handle = std::thread::spawn({
let this = Arc::clone(&this);
move || this.thread_main()
});
this.thread.lock().unwrap().replace(handle);
Ok(this)
}
fn thread_main(epoll: Arc<Epoll>) {
fn thread_main(self: Arc<Self>) {
let mut buf: [EpollEvent; 16] = unsafe { std::mem::zeroed() };
loop {
let count = match epoll.wait(&mut buf, None) {
let count = match self.epoll.wait(&mut buf, None) {
Ok(count) => count,
Err(err) => {
eprintln!("error in epoll loop: {}", err);
@ -51,13 +54,20 @@ impl Reactor {
}
};
for i in 0..count {
Self::handle_event(&buf[i]);
self.handle_event(&buf[i]);
}
}
}
fn handle_event(event: &EpollEvent) {
fn handle_event(&self, event: &EpollEvent) {
let registration = unsafe { &mut *(event.r#u64 as *mut RegistrationInner) };
for (fd, _) in self.removed.lock().unwrap().iter() {
// This fd is already being dropped, don't touch it!
if *fd == registration.fd {
return;
}
}
if 0 != (event.events & EPOLLIN) {
//let _prev = registration.ready.fetch_or(READY_IN, Ordering::AcqRel);
if let Some(waker) = registration.read_waker.lock().unwrap().take() {