forked from Proxmox/proxmox
proxmox-shared-memory: shared memory and shared mutex implementation
This commit is contained in:
parent
956e7041fe
commit
9828acd2ef
@ -8,6 +8,7 @@ members = [
|
||||
"proxmox-lang",
|
||||
"proxmox-router",
|
||||
"proxmox-schema",
|
||||
"proxmox-shared-memory",
|
||||
"proxmox-section-config",
|
||||
"proxmox-sortable-macro",
|
||||
"proxmox-tfa",
|
||||
|
1
Makefile
1
Makefile
@ -9,6 +9,7 @@ CRATES = \
|
||||
proxmox-lang \
|
||||
proxmox-router \
|
||||
proxmox-schema \
|
||||
proxmox-shared-memory \
|
||||
proxmox-section-config \
|
||||
proxmox-sortable-macro \
|
||||
proxmox-tfa \
|
||||
|
15
proxmox-shared-memory/Cargo.toml
Normal file
15
proxmox-shared-memory/Cargo.toml
Normal file
@ -0,0 +1,15 @@
|
||||
[package]
|
||||
name = "proxmox-shared-memory"
|
||||
version = "0.1.0"
|
||||
authors = ["Dietmar Maurer <dietmar@proxmox.com>"]
|
||||
edition = "2018"
|
||||
license = "AGPL-3"
|
||||
description = "Shared Memory IPC"
|
||||
|
||||
|
||||
[dependencies]
|
||||
anyhow = "1.0"
|
||||
libc = "0.2"
|
||||
nix = "0.19.1"
|
||||
|
||||
proxmox = { path = "../proxmox", version = "0.15", default-features = false }
|
279
proxmox-shared-memory/src/lib.rs
Normal file
279
proxmox-shared-memory/src/lib.rs
Normal file
@ -0,0 +1,279 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::os::unix::io::AsRawFd;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::fs::File;
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
use std::os::unix::io::FromRawFd;
|
||||
use std::ffi::{CStr, CString};
|
||||
|
||||
use anyhow::{bail, format_err, Error};
|
||||
use nix::fcntl::OFlag;
|
||||
use nix::sys::mman::{MapFlags, ProtFlags};
|
||||
use nix::sys::stat::Mode;
|
||||
use nix::errno::Errno;
|
||||
|
||||
use proxmox::tools::fs::{create_path, CreateOptions};
|
||||
use proxmox::tools::mmap::Mmap;
|
||||
use proxmox::sys::error::SysError;
|
||||
|
||||
mod raw_shared_mutex;
|
||||
|
||||
mod shared_mutex;
|
||||
pub use shared_mutex::*;
|
||||
|
||||
/// Data inside SharedMemory need to implement this trait
|
||||
///
|
||||
/// IMPORTANT: Please use #[repr(C)] for all types implementing this
|
||||
pub trait Init: Sized {
|
||||
/// Make sure the data struicture is initialized. This is called
|
||||
/// after mapping into shared memory. The caller makes sure that
|
||||
/// no other process run this at the same time.
|
||||
fn initialize(this: &mut MaybeUninit<Self>);
|
||||
}
|
||||
|
||||
/// Memory mapped shared memory region
|
||||
///
|
||||
/// This allows access to same memory region for multiple
|
||||
/// processes. You should only use atomic types from 'std::sync::atomic', or
|
||||
/// protect the data with [SharedMutex].
|
||||
///
|
||||
/// SizeOf(T) needs to be a multiple of 4096 (the page size).
|
||||
pub struct SharedMemory<T> {
|
||||
mmap: Mmap<T>,
|
||||
}
|
||||
|
||||
const fn up_to_page_size(n: usize) -> usize {
|
||||
// FIXME: use sysconf(_SC_PAGE_SIZE)
|
||||
(n + 4095) & !4095
|
||||
}
|
||||
|
||||
fn mmap_file<T: Init>(file: &mut File, initialize: bool) -> Result<Mmap<T>, Error> {
|
||||
// map it as MaybeUninit
|
||||
let mut mmap: Mmap<MaybeUninit<T>> = unsafe {
|
||||
Mmap::map_fd(
|
||||
file.as_raw_fd(),
|
||||
0, 1,
|
||||
ProtFlags::PROT_READ | ProtFlags::PROT_WRITE,
|
||||
MapFlags::MAP_SHARED | MapFlags::MAP_NORESERVE | MapFlags::MAP_POPULATE,
|
||||
)?
|
||||
};
|
||||
|
||||
if initialize {
|
||||
Init::initialize(&mut mmap[0]);
|
||||
}
|
||||
|
||||
Ok(unsafe { std::mem::transmute(mmap) })
|
||||
}
|
||||
|
||||
impl <T: Sized + Init> SharedMemory<T> {
|
||||
|
||||
pub fn open(path: &Path, options: CreateOptions) -> Result<Self, Error> {
|
||||
|
||||
let size = std::mem::size_of::<T>();
|
||||
let up_size = up_to_page_size(size);
|
||||
|
||||
if size != up_size {
|
||||
bail!("SharedMemory::open {:?} failed - data size {} is not a multiple of 4096", path, size);
|
||||
}
|
||||
|
||||
let mmap = Self::open_shmem(path, options)?;
|
||||
|
||||
Ok(Self { mmap })
|
||||
}
|
||||
|
||||
pub fn open_shmem<P: AsRef<Path>>(
|
||||
path: P,
|
||||
options: CreateOptions,
|
||||
) -> Result<Mmap<T>, Error> {
|
||||
let path = path.as_ref();
|
||||
|
||||
let dir_name = path
|
||||
.parent()
|
||||
.ok_or_else(|| format_err!("bad path {:?}", path))?
|
||||
.to_owned();
|
||||
|
||||
let statfs = nix::sys::statfs::statfs(&dir_name)?;
|
||||
if statfs.filesystem_type() != nix::sys::statfs::TMPFS_MAGIC {
|
||||
bail!("path {:?} is not on tmpfs", dir_name);
|
||||
}
|
||||
|
||||
let oflag = OFlag::O_RDWR | OFlag::O_CLOEXEC;
|
||||
|
||||
// Try to open existing file
|
||||
match nix::fcntl::open(path, oflag, Mode::empty()) {
|
||||
Ok(fd) => {
|
||||
let mut file = unsafe { File::from_raw_fd(fd) };
|
||||
let mmap = mmap_file(&mut file, false)?;
|
||||
return Ok(mmap);
|
||||
}
|
||||
Err(err) => {
|
||||
if err.not_found() {
|
||||
// fall thrue - try to create the file
|
||||
} else {
|
||||
bail!("open {:?} failed - {}", path, err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// create temporary file using O_TMPFILE
|
||||
let mut file = match nix::fcntl::open(&dir_name, oflag | OFlag::O_TMPFILE, Mode::empty()) {
|
||||
Ok(fd) => {
|
||||
let mut file = unsafe { File::from_raw_fd(fd) };
|
||||
options.apply_to(&mut file, &dir_name)?;
|
||||
file
|
||||
}
|
||||
Err(err) => {
|
||||
bail!("open tmpfile in {:?} failed - {}", dir_name, err);
|
||||
}
|
||||
};
|
||||
|
||||
let size = std::mem::size_of::<T>();
|
||||
let size = up_to_page_size(size);
|
||||
|
||||
nix::unistd::ftruncate(file.as_raw_fd(), size as i64)?;
|
||||
|
||||
// link the file into place:
|
||||
let proc_path = format!("/proc/self/fd/{}\0", file.as_raw_fd());
|
||||
let proc_path = unsafe { CStr::from_bytes_with_nul_unchecked(proc_path.as_bytes()) };
|
||||
|
||||
let mmap = mmap_file(&mut file, true)?;
|
||||
|
||||
let res = {
|
||||
let path = CString::new(path.as_os_str().as_bytes())?;
|
||||
Errno::result(unsafe {
|
||||
libc::linkat(
|
||||
-1,
|
||||
proc_path.as_ptr(),
|
||||
libc::AT_FDCWD,
|
||||
path.as_ptr(),
|
||||
libc::AT_SYMLINK_FOLLOW,
|
||||
)
|
||||
})
|
||||
};
|
||||
|
||||
drop(file); // no longer required
|
||||
|
||||
match res {
|
||||
Ok(_rc) => return Ok(mmap),
|
||||
// if someone else was faster, open the existing file:
|
||||
Err(nix::Error::Sys(Errno::EEXIST)) => {
|
||||
// if opening fails again now, we'll just error...
|
||||
match nix::fcntl::open(path, oflag, Mode::empty()) {
|
||||
Ok(fd) => {
|
||||
let mut file = unsafe { File::from_raw_fd(fd) };
|
||||
let mmap = mmap_file(&mut file, false)?;
|
||||
return Ok(mmap);
|
||||
}
|
||||
Err(err) => bail!("open {:?} failed - {}", path, err),
|
||||
};
|
||||
}
|
||||
Err(err) => return Err(err.into()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn data(&self) -> &T {
|
||||
&self.mmap[0]
|
||||
}
|
||||
|
||||
pub fn data_mut(&mut self) -> &mut T {
|
||||
&mut self.mmap[0]
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
|
||||
use super::*;
|
||||
|
||||
use std::sync::atomic::AtomicU64;
|
||||
|
||||
#[derive(Debug)]
|
||||
#[repr(C)]
|
||||
struct TestData {
|
||||
count: u64,
|
||||
value1: u64,
|
||||
value2: u64,
|
||||
}
|
||||
|
||||
impl Init for TestData {
|
||||
|
||||
fn initialize(this: &mut MaybeUninit<Self>) {
|
||||
this.write(Self {
|
||||
count: 0,
|
||||
value1: 0xffff_ffff_ffff_0000,
|
||||
value2: 0x0000_ffff_ffff_ffff,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
struct SingleMutexData {
|
||||
data: SharedMutex<TestData>,
|
||||
padding: [u8; 4096 - 64],
|
||||
}
|
||||
|
||||
impl Init for SingleMutexData {
|
||||
fn initialize(this: &mut MaybeUninit<Self>) {
|
||||
let me = unsafe { &mut *this.as_mut_ptr() };
|
||||
let data: &mut MaybeUninit<SharedMutex<TestData>> = unsafe { std::mem::transmute(&mut me.data) };
|
||||
Init::initialize(data);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_shared_memory_mutex() -> Result<(), Error> {
|
||||
|
||||
create_path("/run/proxmox-shmem", None, None);
|
||||
|
||||
let shared: SharedMemory<SingleMutexData> =
|
||||
SharedMemory::open(Path::new("/run/proxmox-shmem/test.shm"), CreateOptions::new())?;
|
||||
|
||||
let mut guard = shared.data().data.lock();
|
||||
println!("DATA {:?}", *guard);
|
||||
guard.count += 1;
|
||||
println!("DATA {:?}", *guard);
|
||||
|
||||
//unimplemented!();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[repr(C)]
|
||||
struct MultiMutexData {
|
||||
acount: AtomicU64,
|
||||
block1: SharedMutex<TestData>,
|
||||
block2: SharedMutex<TestData>,
|
||||
padding: [u8; 4096 - 136],
|
||||
}
|
||||
|
||||
impl Init for MultiMutexData {
|
||||
fn initialize(this: &mut MaybeUninit<Self>) {
|
||||
let me = unsafe { &mut *this.as_mut_ptr() };
|
||||
|
||||
let block1: &mut MaybeUninit<SharedMutex<TestData>> = unsafe { std::mem::transmute(&mut me.block1) };
|
||||
Init::initialize(block1);
|
||||
|
||||
let block2: &mut MaybeUninit<SharedMutex<TestData>> = unsafe { std::mem::transmute(&mut me.block2) };
|
||||
Init::initialize(block2);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_shared_memory_multi_mutex() -> Result<(), Error> {
|
||||
|
||||
let shared: SharedMemory<MultiMutexData> =
|
||||
SharedMemory::open(Path::new("/run/proxmox-shmem/test3.shm"), CreateOptions::new())?;
|
||||
|
||||
let mut guard = shared.data().block1.lock();
|
||||
println!("BLOCK1 {:?}", *guard);
|
||||
guard.count += 1;
|
||||
|
||||
let mut guard = shared.data().block2.lock();
|
||||
println!("BLOCK2 {:?}", *guard);
|
||||
guard.count += 2;
|
||||
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
}
|
104
proxmox-shared-memory/src/raw_shared_mutex.rs
Normal file
104
proxmox-shared-memory/src/raw_shared_mutex.rs
Normal file
@ -0,0 +1,104 @@
|
||||
use std::cell::UnsafeCell;
|
||||
use std::mem::MaybeUninit;
|
||||
|
||||
//use anyhow::Error;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct RawSharedMutex {
|
||||
inner: UnsafeCell<libc::pthread_mutex_t>,
|
||||
}
|
||||
|
||||
unsafe impl Send for RawSharedMutex {}
|
||||
unsafe impl Sync for RawSharedMutex {}
|
||||
|
||||
impl RawSharedMutex {
|
||||
|
||||
pub const fn uninitialized() -> Self {
|
||||
Self { inner: UnsafeCell::new(libc::PTHREAD_MUTEX_INITIALIZER) }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn init(&mut self) {
|
||||
let mut attr = MaybeUninit::<libc::pthread_mutexattr_t>::uninit();
|
||||
cvt_nz(libc::pthread_mutexattr_init(attr.as_mut_ptr())).unwrap();
|
||||
let attr = PthreadMutexAttr(&mut attr);
|
||||
cvt_nz(libc::pthread_mutexattr_settype(attr.0.as_mut_ptr(), libc::PTHREAD_MUTEX_NORMAL))
|
||||
.unwrap();
|
||||
cvt_nz(libc::pthread_mutexattr_setpshared(attr.0.as_mut_ptr(), libc::PTHREAD_PROCESS_SHARED))
|
||||
.unwrap();
|
||||
cvt_nz(pthread_mutexattr_setrobust(attr.0.as_mut_ptr(), PTHREAD_MUTEX_ROBUST))
|
||||
.unwrap();
|
||||
cvt_nz(libc::pthread_mutex_init(self.inner.get(), attr.0.as_ptr())).unwrap();
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn lock(&self) {
|
||||
let mut r = libc::pthread_mutex_lock(self.inner.get());
|
||||
if r == libc::EOWNERDEAD {
|
||||
r = pthread_mutex_consistent(self.inner.get());
|
||||
}
|
||||
|
||||
debug_assert_eq!(r, 0);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn unlock(&self) {
|
||||
let r = libc::pthread_mutex_unlock(self.inner.get());
|
||||
debug_assert_eq!(r, 0);
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub unsafe fn try_lock(&self) -> bool {
|
||||
let mut r = libc::pthread_mutex_trylock(self.inner.get());
|
||||
if r == libc::EOWNERDEAD {
|
||||
r = pthread_mutex_consistent(self.inner.get());
|
||||
}
|
||||
|
||||
r == 0
|
||||
}
|
||||
|
||||
/*
|
||||
#[inline]
|
||||
pub unsafe fn destroy(&self) {
|
||||
let r = libc::pthread_mutex_destroy(self.inner.get());
|
||||
debug_assert_eq!(r, 0);
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
|
||||
// Note: copied from rust std::sys::unix::cvt_nz
|
||||
fn cvt_nz(error: libc::c_int) -> std::io::Result<()> {
|
||||
if error == 0 {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(std::io::Error::from_raw_os_error(error))
|
||||
}
|
||||
}
|
||||
|
||||
// Copied from rust standard libs
|
||||
struct PthreadMutexAttr<'a>(&'a mut MaybeUninit<libc::pthread_mutexattr_t>);
|
||||
|
||||
impl Drop for PthreadMutexAttr<'_> {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
let result = libc::pthread_mutexattr_destroy(self.0.as_mut_ptr());
|
||||
debug_assert_eq!(result, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Those thing need rust libc wrapper 0.2.105 (we have 0.2.94), so
|
||||
// we import ourselves
|
||||
|
||||
pub const PTHREAD_MUTEX_ROBUST: libc::c_int = 1;
|
||||
|
||||
#[link(name = "c")]
|
||||
extern {
|
||||
fn pthread_mutexattr_setrobust(
|
||||
attr: *mut libc::pthread_mutexattr_t,
|
||||
robustness: libc::c_int,
|
||||
) -> libc::c_int;
|
||||
|
||||
fn pthread_mutex_consistent(mutex: *mut libc::pthread_mutex_t) -> libc::c_int;
|
||||
}
|
107
proxmox-shared-memory/src/shared_mutex.rs
Normal file
107
proxmox-shared-memory/src/shared_mutex.rs
Normal file
@ -0,0 +1,107 @@
|
||||
use std::cell::UnsafeCell;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::{Deref, DerefMut};
|
||||
|
||||
use crate::Init;
|
||||
use crate::raw_shared_mutex::RawSharedMutex;
|
||||
|
||||
#[derive(Debug)]
|
||||
#[repr(C)]
|
||||
pub struct SharedMutex<T: ?Sized> {
|
||||
inner: RawSharedMutex,
|
||||
data: UnsafeCell<T>,
|
||||
}
|
||||
|
||||
unsafe impl<T: ?Sized + Send> Send for SharedMutex<T> {}
|
||||
unsafe impl<T: ?Sized + Send> Sync for SharedMutex<T> {}
|
||||
|
||||
impl <T: Init> Init for SharedMutex<T> {
|
||||
|
||||
fn initialize(this: &mut MaybeUninit<SharedMutex<T>>) {
|
||||
|
||||
let me = unsafe { &mut *this.as_mut_ptr() };
|
||||
|
||||
me.inner = RawSharedMutex::uninitialized();
|
||||
println!("INITIALIZE MUTEX");
|
||||
unsafe { me.inner.init(); }
|
||||
|
||||
let u: &mut MaybeUninit<T> = unsafe { std::mem::transmute(me.data.get_mut()) };
|
||||
Init::initialize(u);
|
||||
}
|
||||
}
|
||||
|
||||
impl <T: Default> Default for SharedMutex<T> {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
inner: RawSharedMutex::uninitialized(),
|
||||
data: UnsafeCell::new(T::default()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> SharedMutex<T> {
|
||||
|
||||
pub fn lock(&self) -> SharedMutexGuard<'_, T> {
|
||||
|
||||
unsafe {
|
||||
self.inner.lock();
|
||||
SharedMutexGuard::new(self)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_lock(&self) -> Option<SharedMutexGuard<'_, T>> {
|
||||
unsafe {
|
||||
if self.inner.try_lock() {
|
||||
Some(SharedMutexGuard::new(self))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unlock(guard: SharedMutexGuard<'_, T>) {
|
||||
drop(guard);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
pub struct SharedMutexGuard<'a, T: ?Sized + 'a> {
|
||||
lock: &'a SharedMutex<T>,
|
||||
|
||||
_phantom_data: PhantomData<*const ()>, // make it !Send
|
||||
}
|
||||
|
||||
unsafe impl<T: ?Sized + Sync> Sync for SharedMutexGuard<'_, T> {}
|
||||
|
||||
impl<'a, T: ?Sized> SharedMutexGuard<'a, T> {
|
||||
fn new(lock: &'a SharedMutex<T>) -> SharedMutexGuard<'a, T> {
|
||||
SharedMutexGuard {
|
||||
lock,
|
||||
_phantom_data: PhantomData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ?Sized> Deref for SharedMutexGuard<'_, T> {
|
||||
type Target = T;
|
||||
|
||||
fn deref(&self) -> &T {
|
||||
unsafe { &*self.lock.data.get() }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ?Sized> DerefMut for SharedMutexGuard<'_, T> {
|
||||
fn deref_mut(&mut self) -> &mut T {
|
||||
unsafe { &mut *self.lock.data.get() }
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: ?Sized> Drop for SharedMutexGuard<'_, T> {
|
||||
#[inline]
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
self.lock.inner.unlock();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user