From 9adda25e00eefae9fa6efdf0c2fc60a5fc33ce78 Mon Sep 17 00:00:00 2001 From: ClSlaid Date: Mon, 16 Oct 2023 00:51:12 +0800 Subject: [PATCH] patch: simplify FUSE Signed-off-by: ClSlaid --- Cargo.lock | 1 - libs/clipboard/Cargo.toml | 1 - libs/clipboard/src/platform/fuse.rs | 539 +++++++---------------- libs/clipboard/src/platform/linux/mod.rs | 288 ++++-------- libs/clipboard/src/platform/linux/x11.rs | 144 +++++- libs/clipboard/src/platform/mod.rs | 12 +- 6 files changed, 387 insertions(+), 598 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f82bd25c3..088d4708a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -941,7 +941,6 @@ dependencies = [ "parking_lot", "percent-encoding", "rand 0.8.5", - "rayon", "serde 1.0.163", "serde_derive", "thiserror", diff --git a/libs/clipboard/Cargo.toml b/libs/clipboard/Cargo.toml index 42bb06a1d..3de02f768 100644 --- a/libs/clipboard/Cargo.toml +++ b/libs/clipboard/Cargo.toml @@ -23,7 +23,6 @@ x11rb = {version = "0.12", features = ["all-extensions"]} rand = {version = "0.8"} fuser = {version = "0.13"} libc = {version = "0.2"} -rayon = {version = "1.7"} dashmap = "5.5" percent-encoding = "2.3" utf16string = "0.2" diff --git a/libs/clipboard/src/platform/fuse.rs b/libs/clipboard/src/platform/fuse.rs index 606237d8b..f641b2cc3 100644 --- a/libs/clipboard/src/platform/fuse.rs +++ b/libs/clipboard/src/platform/fuse.rs @@ -24,22 +24,21 @@ use std::{ path::{Path, PathBuf}, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, + mpsc::{Receiver, Sender}, Arc, }, time::{Duration, SystemTime}, }; -use dashmap::DashMap; -use fuser::{ReplyDirectory, Request, FUSE_ROOT_ID}; +use fuser::{ReplyDirectory, FUSE_ROOT_ID}; use hbb_common::{ bytes::{Buf, Bytes}, log, }; -use parking_lot::{Condvar, Mutex, RwLock}; -use rayon::prelude::*; +use parking_lot::{Condvar, Mutex}; use utf16string::WStr; -use crate::{ClipboardFile, CliprdrError}; +use crate::{send_data, ClipboardFile, CliprdrError}; #[cfg(target_os = "linux")] use super::LDAP_EPOCH_DELTA; @@ -54,175 +53,71 @@ const PERM_READ: u16 = 0o444; /// max length of file name const MAX_NAME_LEN: usize = 255; -// fuse server state -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum Status { - // active and ready for all incoming requests - Active, - // marking and waiting for all FDs to be closed - // only serve read requests - Gc, - // gc completes - // serve no requests - GcComplete, - // fetching new files from remote - // serve no requests - // this state is to make sure only one fetching is running - Fetching, - // fetched, building new FS - Building, -} - -#[derive(Debug, Default)] -struct PendingRequest { - content: Mutex>, - cvar: Condvar, -} - -impl PendingRequest { - pub fn new() -> Self { - Self { - content: Mutex::new(None), - cvar: Condvar::new(), - } - } - - pub fn recv_timeout(&self, timeout: Duration) -> Result { - let mut guard = self.content.lock(); - let res = self.cvar.wait_for(&mut guard, timeout); - if res.timed_out() { - Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout")) - } else { - let content = guard.take(); - match content { - Some(content) => Ok(content), - None => Err(std::io::Error::new(std::io::ErrorKind::Other, "no content")), - } - } - } - - pub fn set(&self, content: ClipboardFile) { - let mut guard = self.content.lock(); - let _ = guard.insert(content); - self.cvar.notify_all(); - } -} - -/// clipboard message dispatcher -#[derive(Debug, Default)] -struct CliprdrTxnDispatcher { - txn_handler: DashMap<(i32, Option), Arc>, -} - -impl CliprdrTxnDispatcher { - pub fn send(&self, conn_id: i32, request: ClipboardFile) -> Arc { - let stream_id = match &request { - ClipboardFile::FormatDataRequest { .. } => None, - ClipboardFile::FileContentsRequest { stream_id, .. } => Some(stream_id), - _ => unreachable!(), - }; - - let req = Arc::new(PendingRequest::new()); - self.txn_handler - .insert((conn_id, stream_id.copied()), req.clone()); - - log::debug!( - "send request to conn_id={}, stream_id={:?}", - conn_id, - stream_id - ); - crate::send_data(conn_id, request); - req - } - - pub fn recv(&self, conn_id: i32, response: ClipboardFile) { - let stream_id = match &response { - ClipboardFile::FormatDataResponse { .. } => None, - ClipboardFile::FileContentsResponse { stream_id, .. } => Some(stream_id), - _ => unreachable!(), - }; - let key = (conn_id, stream_id.cloned()); - log::debug!("recv response for {:?}", key); - match self.txn_handler.remove(&key) { - Some((_, tx)) => tx.set(response), - None => log::warn!("no request found for {:?}", key), - } - } -} - -/// this is a proxy type -/// to avoid occupy FuseServer with &mut self +/// fuse client +/// this is a proxy to the fuse server #[derive(Debug)] -pub(crate) struct FuseClient { - server: Arc, -} - -impl FuseClient { - pub fn new(server: Arc) -> Self { - Self { server } - } +pub struct FuseClient { + server: Arc>, } impl fuser::Filesystem for FuseClient { fn init( &mut self, - _req: &fuser::Request<'_>, - _config: &mut fuser::KernelConfig, + req: &fuser::Request<'_>, + config: &mut fuser::KernelConfig, ) -> Result<(), libc::c_int> { - log::debug!("init fuse server"); - - self.server.init(); - Ok(()) + let mut server = self.server.lock(); + server.init(req, config) } fn lookup( &mut self, - _req: &Request, + req: &fuser::Request<'_>, parent: u64, name: &std::ffi::OsStr, reply: fuser::ReplyEntry, ) { - log::debug!("lookup: parent={}, name={:?}", parent, name); - self.server.look_up(parent, name, reply) + let mut server = self.server.lock(); + server.lookup(req, parent, name, reply) } - fn opendir(&mut self, _req: &Request<'_>, ino: u64, flags: i32, reply: fuser::ReplyOpen) { - log::debug!("opendir: ino={}, flags={}", ino, flags); - self.server.opendir(ino, flags, reply) + fn opendir(&mut self, req: &fuser::Request<'_>, ino: u64, flags: i32, reply: fuser::ReplyOpen) { + let mut server = self.server.lock(); + server.opendir(req, ino, flags, reply) } fn readdir( &mut self, - _req: &Request<'_>, + req: &fuser::Request<'_>, ino: u64, fh: u64, offset: i64, - reply: ReplyDirectory, + reply: fuser::ReplyDirectory, ) { - log::debug!("readdir: ino={}, fh={}, offset={}", ino, fh, offset); - self.server.readdir(ino, fh, offset, reply) + let mut server = self.server.lock(); + server.readdir(req, ino, fh, offset, reply) } fn releasedir( &mut self, - _req: &Request<'_>, + req: &fuser::Request<'_>, ino: u64, fh: u64, - flags: i32, + _flags: i32, reply: fuser::ReplyEmpty, ) { - log::debug!("releasedir: ino={}, fh={}, flags={}", ino, fh, flags); - self.server.releasedir(ino, fh, flags, reply) + let mut server = self.server.lock(); + server.releasedir(req, ino, fh, _flags, reply) } - fn open(&mut self, _req: &Request<'_>, ino: u64, flags: i32, reply: fuser::ReplyOpen) { - log::debug!("open: ino={}, flags={}", ino, flags); - self.server.open(ino, flags, reply) + fn open(&mut self, req: &fuser::Request<'_>, ino: u64, flags: i32, reply: fuser::ReplyOpen) { + let mut server = self.server.lock(); + server.open(req, ino, flags, reply) } fn read( &mut self, - _req: &Request<'_>, + req: &fuser::Request<'_>, ino: u64, fh: u64, offset: i64, @@ -231,31 +126,22 @@ impl fuser::Filesystem for FuseClient { lock_owner: Option, reply: fuser::ReplyData, ) { - log::debug!( - "read: ino={}, fh={}, offset={}, size={}, flags={}", - ino, - fh, - offset, - size, - flags - ); - self.server - .read(ino, fh, offset, size, flags, lock_owner, reply) + let mut server = self.server.lock(); + server.read(req, ino, fh, offset, size, flags, lock_owner, reply) } fn release( &mut self, - _req: &Request<'_>, + req: &fuser::Request<'_>, ino: u64, fh: u64, - flags: i32, - lock_owner: Option, - flush: bool, + _flags: i32, + _lock_owner: Option, + _flush: bool, reply: fuser::ReplyEmpty, ) { - log::debug!("release: ino={}, fh={}, flush={}", ino, fh, flush); - self.server - .release(ino, fh, flags, lock_owner, flush, reply) + let mut server = self.server.lock(); + server.release(req, ino, fh, _flags, _lock_owner, _flush, reply) } } @@ -263,58 +149,75 @@ impl fuser::Filesystem for FuseClient { /// provides a read-only file system #[derive(Debug)] pub(crate) struct FuseServer { - status: RwLock, - dispatcher: CliprdrTxnDispatcher, - // timeout - // current files - // inode mapping: - // 1 -> root (parent of all files) - // 2~n+1 -> nth file in the list (n is the length of the list) - // 0 | n+2.. -> not found - // Note that the file tree is pre-ordered - files: RwLock>, + generation: AtomicU64, + files: Vec, // file handle counter file_handle_counter: AtomicU64, - // file system generations - generation: AtomicU64, // timeout timeout: Duration, + // file read reply channel + tx: Sender, + // file read reply channel + rx: Receiver, } impl FuseServer { /// create a new fuse server pub fn new(timeout: Duration) -> Self { + let (tx, rx) = std::sync::mpsc::channel(); Self { - status: RwLock::new(Status::Active), - dispatcher: CliprdrTxnDispatcher::default(), - files: RwLock::new(Vec::new()), - file_handle_counter: AtomicU64::new(0), generation: AtomicU64::new(0), + files: Vec::new(), + file_handle_counter: AtomicU64::new(0), timeout, + rx, + tx, } } - pub fn client(self: &Arc) -> FuseClient { - FuseClient::new(self.clone()) + pub fn client(server: Arc>) -> FuseClient { + FuseClient { server } } +} - pub fn init(&self) { - let mut w_guard = self.files.write(); - if w_guard.is_empty() { +impl FuseServer { + pub fn serve(&mut self, reply: ClipboardFile) -> Result<(), CliprdrError> { + self.tx.send(reply).map_err(|e| { + log::error!("failed to serve cliprdr reply from endpoint: {:?}", e); + CliprdrError::ClipboardInternalError + })?; + Ok(()) + } +} + +impl fuser::Filesystem for FuseServer { + fn init( + &mut self, + _req: &fuser::Request<'_>, + _config: &mut fuser::KernelConfig, + ) -> Result<(), libc::c_int> { + if self.files.is_empty() { // create a root file let root = FuseNode::new_root(); - w_guard.push(root); + self.files.push(root); } + Ok(()) } - pub fn look_up(&self, parent: u64, name: &std::ffi::OsStr, reply: fuser::ReplyEntry) { + fn lookup( + &mut self, + _req: &fuser::Request<'_>, + parent: u64, + name: &std::ffi::OsStr, + reply: fuser::ReplyEntry, + ) { if name.len() > MAX_NAME_LEN { log::debug!("fuse: name too long"); reply.error(libc::ENAMETOOLONG); return; } - let entries = self.files.read(); + let entries = &self.files; let generation = self.generation.load(Ordering::Relaxed); @@ -353,8 +256,14 @@ impl FuseServer { return; } - pub fn opendir(&self, ino: u64, flags: i32, reply: fuser::ReplyOpen) { - let files = self.files.read(); + fn opendir( + &mut self, + _req: &fuser::Request<'_>, + ino: u64, + flags: i32, + reply: fuser::ReplyOpen, + ) { + let files = &self.files; let Some(entry) = files.get(ino as usize - 1) else { reply.error(libc::ENOENT); log::error!("fuse: opendir: entry not found"); @@ -383,8 +292,15 @@ impl FuseServer { return; } - pub fn readdir(&self, ino: u64, fh: u64, offset: i64, mut reply: ReplyDirectory) { - let files = self.files.read(); + fn readdir( + &mut self, + _req: &fuser::Request<'_>, + ino: u64, + fh: u64, + offset: i64, + mut reply: ReplyDirectory, + ) { + let files = &self.files; let Some(entry) = files.get(ino as usize - 1) else { reply.error(libc::ENOENT); log::error!("fuse: readdir: entry not found"); @@ -429,8 +345,15 @@ impl FuseServer { return; } - pub fn releasedir(&self, ino: u64, fh: u64, _flags: i32, reply: fuser::ReplyEmpty) { - let files = self.files.read(); + fn releasedir( + &mut self, + _req: &fuser::Request<'_>, + ino: u64, + fh: u64, + _flags: i32, + reply: fuser::ReplyEmpty, + ) { + let files = &self.files; let Some(entry) = files.get(ino as usize - 1) else { reply.error(libc::ENOENT); log::error!("fuse: releasedir: entry not found"); @@ -452,8 +375,8 @@ impl FuseServer { return; } - pub fn open(&self, ino: u64, flags: i32, reply: fuser::ReplyOpen) { - let files = self.files.read(); + fn open(&mut self, _req: &fuser::Request<'_>, ino: u64, flags: i32, reply: fuser::ReplyOpen) { + let files = &self.files; let Some(entry) = files.get(ino as usize - 1) else { reply.error(libc::ENOENT); log::error!("fuse: open: entry not found"); @@ -485,8 +408,9 @@ impl FuseServer { return; } - pub fn read( - &self, + fn read( + &mut self, + _req: &fuser::Request<'_>, ino: u64, fh: u64, offset: i64, @@ -495,7 +419,7 @@ impl FuseServer { _lock_owner: Option, reply: fuser::ReplyData, ) { - let files = self.files.read(); + let files = &self.files; let Some(entry) = files.get(ino as usize - 1) else { reply.error(libc::ENOENT); log::error!("fuse: read: entry not found"); @@ -536,8 +460,9 @@ impl FuseServer { reply.data(bytes.as_slice()); } - pub fn release( - &self, + fn release( + &mut self, + _req: &fuser::Request<'_>, ino: u64, fh: u64, _flags: i32, @@ -545,7 +470,7 @@ impl FuseServer { _flush: bool, reply: fuser::ReplyEmpty, ) { - let files = self.files.read(); + let files = &self.files; let Some(entry) = files.get(ino as usize - 1) else { reply.error(libc::ENOENT); log::error!("fuse: release: entry not found"); @@ -560,10 +485,12 @@ impl FuseServer { reply.ok(); return; } +} +impl FuseServer { // get files and directory path right in root of FUSE fs pub fn list_root(&self) -> Vec { - let files = self.files.read(); + let files = &self.files; let children = &files[0].children; let mut paths = Vec::with_capacity(children.len()); for idx in children.iter().copied() { @@ -572,59 +499,19 @@ impl FuseServer { paths } - /// gc filesystem - fn gc_files(&self) { - { - let mut status = self.status.write(); - - // received update after fetching complete - // should fetch again - if *status == Status::Building { - *status = Status::GcComplete; - return; - } - - // really update only when: - // running: Active - if *status != Status::Active { - return; - } - *status = Status::Gc; - } - - let mut old = self.files.write(); - let _ = old.par_iter_mut().fold(|| (), |_, f| f.gc()); - - let mut status = self.status.write(); - *status = Status::GcComplete; - } - /// fetch file list from remote fn sync_file_system( - &self, + &mut self, conn_id: i32, file_group_format_id: i32, _file_contents_format_id: i32, ) -> Result { - { - let mut status = self.status.write(); - if *status != Status::GcComplete { - return Ok(false); - } - *status = Status::Fetching; - } - - // request file list - let request = ClipboardFile::FormatDataRequest { - requested_format_id: file_group_format_id, - }; - let rx = self.dispatcher.send(conn_id, request); - let resp = rx.recv_timeout(self.timeout); + let resp = self.send_sync_fs_request(conn_id, file_group_format_id, self.timeout)?; let descs = match resp { - Ok(ClipboardFile::FormatDataResponse { + ClipboardFile::FormatDataResponse { msg_flags, format_data, - }) => { + } => { if msg_flags != 0x1 { log::error!("clipboard FUSE server: received unexpected response flags"); return Err(CliprdrError::ClipboardInternalError); @@ -633,155 +520,61 @@ impl FuseServer { descs } - Ok(_) => { + _ => { log::error!("clipboard FUSE server: received unexpected response type"); - // rollback status - let mut status = self.status.write(); - *status = Status::GcComplete; - - return Err(CliprdrError::ClipboardInternalError); - } - Err(e) => { - log::error!("clipboard FUSE server: failed to fetch file list, {:?}", e); - // rollback status - let mut status = self.status.write(); - *status = Status::GcComplete; return Err(CliprdrError::ClipboardInternalError); } }; - { - // fetch successful, start building - let mut status = self.status.write(); - *status = Status::Building; - } - let mut new_tree = FuseNode::build_tree(descs)?; let res = new_tree - .par_iter_mut() + .iter_mut() .filter(|f_node| f_node.is_file() && f_node.attributes.size == 0) - .fold(|| Ok(()), |_, f_node| self.sync_node_size(f_node)) - .find_last(|p| p.is_err()); + .try_for_each(|f_node| self.sync_node_size(f_node)); - if res.is_some() { - // rollback status on failure - let mut status = self.status.write(); - if *status == Status::Building { - *status = Status::GcComplete; - } - - log::error!("clipboard FUSE server: failed to fetch file size"); + if let Err(err) = res { + log::error!( + "clipboard FUSE server: failed to fetch file size: {:?}", + err + ); return Err(CliprdrError::ClipboardInternalError); } // replace current file system - let mut old = self.files.write(); - { - let mut status = self.status.write(); - if *status != Status::Building { - // build interrupted, meaning fetched data is outdated - // do not replace - return Ok(false); - } - *status = Status::Active; - } - *old = new_tree; + self.files = new_tree; self.generation.fetch_add(1, Ordering::Relaxed); Ok(true) } - /// replace current files with new files, cucurrently - /// - /// # Note - /// - /// This function should allow concurrent calls. In short, the server can handle multiple update_file calles - /// at a short period of time and make sure it call RPCs as few and late as possible. - /// - /// ## Function Phases - /// - /// ### clear phase - /// - /// - just mark all files to be deleted, all new `open` operations will be denied - /// - current FDs will not be affected, listing (in this level of directory) and reading operations can still be performed. - /// - this will return only when all FDs are closed, or some unexpected error occurs - /// - after all FDs are closed and no more FDs can be opened, dropping the current file list will be safe - /// - /// ### request phase - /// - /// - after all FDs are closed, send a format data request to the clipboard server - /// - /// ### replace phase - /// - /// - after all FDs are closed, the file list will be replaced with the new file list - /// - /// ## Concurrent calls - /// - /// ### server is Active - /// - /// threads calling this function may win getting the write lock on server.status: - /// - the winner will start [clear phase], changing the server to Gc. - /// - the loser or later comming threads calling `server.gc_files` will return directly. - /// - /// movement: Active -> Gc - /// - /// ### server is Gc - /// - /// this indicates there must be exactly one thread running in [clear phase]. - /// - the thread will run `server.sync_file_system` after this phase - /// - other threads try to call `server.gc_files` will return directly - /// - other threads try to call `server.sync_file_system` will return directly - /// - no other threads could be running `server.sync_file_system` - /// - /// after all, only one thread will successfully complete the [clear phase], and that thread will try to complete the whole updating. - /// - /// movement: Gc -> GcComplete - /// - /// ### server is GcComplete - /// - /// This indicates there must be at least one thread trying to call `server.sync_file_system`. - /// threads will trying to get the write lock of status. - /// - the winner will set status to Fetching. - /// - the latter threads get the write lock, only to find the status is not `GcComplete`, return directly. - /// - there might be threads trying to call `server.gc_files`, but will return directly and call `server.sync_file_system`. - /// - /// movement: GcComplete -> Fetching - /// - /// ### server is Fetching - /// - /// This indicates there must be exactly one thread running in `server.sync_file_system`, in its fetching phase. - /// - any other threads calling this function will return directly. - /// - after fetching finishes, it will set status to Building - /// - timeout may reach, then we rollback - /// - /// movement: Fetching -> Building - /// failure: Fetching -> GcComplete - /// - /// ### server is Building - /// - /// The reason why we have this status is to prevent requesting outdated data. - /// There should be exactly one thread start running [replace phase] and might be other threads trying to call `gc_files` - /// - if the building phase is finished, the thread will set status to Active, and other threads may run [clear phase] - /// - if the building phase is interrupted, the thread will quit, and other threads will skip the clear phase, try to fetch directly. - /// - /// movements: Building -> Active, Building -> GcComplete - /// - pub fn update_files( + fn send_sync_fs_request( &self, conn_id: i32, file_group_format_id: i32, + timeout: std::time::Duration, + ) -> Result { + // request file list + let data = ClipboardFile::FormatDataRequest { + requested_format_id: file_group_format_id, + }; + send_data(conn_id, data); + self.rx.recv_timeout(timeout).map_err(|e| { + log::error!("failed to receive file list from channel: {:?}", e); + CliprdrError::ClipboardInternalError + }) + } + + pub fn update_files( + &mut self, + conn_id: i32, + file_group_format_id: i32, file_contents_format_id: i32, ) -> Result { - self.gc_files(); self.sync_file_system(conn_id, file_group_format_id, file_contents_format_id) } - pub fn recv(&self, conn_id: i32, clip_file: ClipboardFile) { - self.dispatcher.recv(conn_id, clip_file) - } - /// allocate a new file descriptor fn alloc_fd(&self) -> u64 { self.file_handle_counter.fetch_add(1, Ordering::Relaxed) @@ -807,7 +600,7 @@ impl FuseServer { clip_data_id: 0, }; - let rx = self.dispatcher.send(node.conn_id, request); + send_data(node.conn_id, request); log::debug!( "waiting for metadata sync reply for {:?} on channel {}", @@ -815,8 +608,10 @@ impl FuseServer { node.conn_id ); - let reply = rx.recv_timeout(self.timeout)?; - + let reply = self + .rx + .recv_timeout(self.timeout) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::TimedOut, e))?; log::debug!( "got metadata sync reply for {:?} on channel {}", node.name, @@ -897,7 +692,7 @@ impl FuseServer { clip_data_id: 0, }; - let rx = self.dispatcher.send(node.conn_id, request); + send_data(node.conn_id, request); log::debug!( "waiting for read reply for {:?} on stream: {}", @@ -905,7 +700,10 @@ impl FuseServer { node.stream_id ); - let reply = rx.recv_timeout(self.timeout)?; + let reply = self.rx.recv_timeout(self.timeout).map_err(|e| { + log::error!("failed to receive file list from channel: {:?}", e); + std::io::Error::new(std::io::ErrorKind::TimedOut, e) + })?; match reply { ClipboardFile::FileContentsResponse { @@ -961,7 +759,7 @@ impl FileDescription { // skip reserved 32 bytes bytes.advance(32); let attributes = bytes.get_u32_le(); - // skip reserverd 16 bytes + // skip reserved 16 bytes bytes.advance(16); // last write time from 1601-01-01 00:00:00, in 100ns let last_write_time = bytes.get_u64_le(); @@ -1126,11 +924,6 @@ impl FuseNode { self.file_handlers.marked() } - /// mark all files to be deleted - pub fn gc(&mut self) { - self.file_handlers.mark_and_wait() - } - pub fn add_handler(&self, fh: u64) { self.file_handlers.add_handler(fh) } @@ -1336,18 +1129,6 @@ impl FileHandles { self.handlers.lock().push(fh); } - // wait till gc completes - pub fn mark_and_wait(&self) { - let mut handlers = self.handlers.lock(); - self.gc.store(true, Ordering::Relaxed); - loop { - if handlers.is_empty() { - return; - } - self.waiter.wait(&mut handlers); - } - } - pub fn marked(&self) -> bool { self.gc.load(Ordering::Relaxed) } diff --git a/libs/clipboard/src/platform/linux/mod.rs b/libs/clipboard/src/platform/linux/mod.rs index cd3bbe2b5..a006e1705 100644 --- a/libs/clipboard/src/platform/linux/mod.rs +++ b/libs/clipboard/src/platform/linux/mod.rs @@ -3,10 +3,7 @@ use std::{ fs::File, os::unix::prelude::FileExt, path::{Path, PathBuf}, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, + sync::Arc, time::{Duration, SystemTime}, }; @@ -20,7 +17,7 @@ use lazy_static::lazy_static; use parking_lot::{Mutex, RwLock}; use utf16string::WString; -use crate::{send_data, send_data_to_all, ClipboardFile, CliprdrError, CliprdrServiceContext}; +use crate::{send_data, ClipboardFile, CliprdrError, CliprdrServiceContext}; use super::{fuse::FuseServer, LDAP_EPOCH_DELTA}; @@ -47,13 +44,16 @@ fn add_remote_format(local_name: &str, remote_id: i32) { } trait SysClipboard: Send + Sync { - fn wait_file_list(&self) -> Result>, CliprdrError>; fn set_file_list(&self, paths: &[PathBuf]) -> Result<(), CliprdrError>; fn stop(&self); fn start(&self); + /// send to 0 will send to all channels + fn send_format_list(&self, conn_id: i32) -> Result<(), CliprdrError>; + /// send to 0 will send to all channels + fn send_file_list(&self, conn_id: i32) -> Result<(), CliprdrError>; } -fn get_sys_clipboard() -> Result, CliprdrError> { +fn get_sys_clipboard(ignore_path: &PathBuf) -> Result, CliprdrError> { #[cfg(feature = "wayland")] { unimplemented!() @@ -61,7 +61,7 @@ fn get_sys_clipboard() -> Result, CliprdrError> { #[cfg(not(feature = "wayland"))] { pub use x11::*; - let x11_clip = X11Clipboard::new()?; + let x11_clip = X11Clipboard::new(ignore_path)?; Ok(Box::new(x11_clip) as Box<_>) } } @@ -300,38 +300,14 @@ enum FileContentsRequest { }, } -/// this is a proxy type for the clipboard context -pub struct CliprdrClient { - pub context: Arc, -} - -impl Drop for CliprdrClient { - fn drop(&mut self) { - self.context.ref_decrease(); - } -} - -impl CliprdrServiceContext for CliprdrClient { - fn set_is_stopped(&mut self) -> Result<(), CliprdrError> { - self.context.set_is_stopped() - } - - fn empty_clipboard(&mut self, conn_id: i32) -> Result { - self.context.empty_clipboard(conn_id) - } - - fn server_clip_file(&mut self, conn_id: i32, msg: ClipboardFile) -> Result<(), CliprdrError> { - self.context.serve(conn_id, msg) - } -} - pub struct ClipboardContext { - pub client_count: AtomicUsize, pub fuse_mount_point: PathBuf, - fuse_server: Arc, + fuse_handle: Mutex>, + + fuse_server: Arc>, + file_list: RwLock>, clipboard: Arc, - fuse_handle: Mutex>, } impl ClipboardContext { @@ -342,14 +318,13 @@ impl ClipboardContext { CliprdrError::CliprdrInit })?; - let fuse_server = Arc::new(FuseServer::new(timeout)); + let fuse_server = Arc::new(Mutex::new(FuseServer::new(timeout))); - let clipboard = get_sys_clipboard()?; - let clipboard = Arc::from(clipboard); + let clipboard = get_sys_clipboard(&fuse_mount_point)?; + let clipboard = Arc::from(clipboard) as Arc<_>; let file_list = RwLock::new(vec![]); Ok(Self { - client_count: AtomicUsize::new(0), fuse_mount_point, fuse_server, fuse_handle: Mutex::new(None), @@ -358,60 +333,48 @@ impl ClipboardContext { }) } - pub fn client(self: Arc) -> Result { - if self.client_count.fetch_add(1, Ordering::Relaxed) == 0 { - let mut fuse_handle = self.fuse_handle.lock(); - - if fuse_handle.is_none() { - let mount_path = &self.fuse_mount_point; - - let mnt_opts = [ - MountOption::FSName("rustdesk-cliprdr-fs".to_string()), - MountOption::RO, - MountOption::NoAtime, - ]; - log::info!( - "mounting clipboard FUSE to {}", - self.fuse_mount_point.display() - ); - - let new_handle = - fuser::spawn_mount2(self.fuse_server.client(), mount_path, &mnt_opts).map_err( - |e| { - log::error!("failed to mount cliprdr fuse: {:?}", e); - CliprdrError::CliprdrInit - }, - )?; - *fuse_handle = Some(new_handle); - } - - self.clipboard.start(); - let clip = self.clone(); - std::thread::spawn(move || { - let res = clip.listen_clipboard(); - if let Err(e) = res { - log::error!("failed to listen clipboard: {:?}", e); - } - log::info!("stopped listening clipboard"); - }); - } - - Ok(CliprdrClient { context: self }) - } - - /// set context to be inactive - pub fn ref_decrease(&self) { - if self.client_count.fetch_sub(1, Ordering::Relaxed) > 1 { - return; + pub fn run(&self) -> Result<(), CliprdrError> { + if !self.is_stopped() { + return Ok(()); } let mut fuse_handle = self.fuse_handle.lock(); - if let Some(fuse_handle) = fuse_handle.take() { - log::debug!("unmounting clipboard FUSE"); - fuse_handle.join(); - } - self.clipboard.stop(); - std::fs::remove_dir(&self.fuse_mount_point).unwrap(); + + let mount_path = &self.fuse_mount_point; + + let mnt_opts = [ + MountOption::FSName("rustdesk-cliprdr-fs".to_string()), + MountOption::RO, + MountOption::NoAtime, + ]; + log::info!( + "mounting clipboard FUSE to {}", + self.fuse_mount_point.display() + ); + + let new_handle = fuser::spawn_mount2( + FuseServer::client(self.fuse_server.clone()), + mount_path, + &mnt_opts, + ) + .map_err(|e| { + log::error!("failed to mount cliprdr fuse: {:?}", e); + CliprdrError::CliprdrInit + })?; + *fuse_handle = Some(new_handle); + + let clipboard = self.clipboard.clone(); + + std::thread::spawn(move || { + log::debug!("start listening clipboard"); + clipboard.start(); + }); + + Ok(()) + } + + pub fn stop(&self) -> Result<(), CliprdrError> { + self.set_is_stopped() } /// set clipboard data from file list @@ -422,108 +385,6 @@ impl ClipboardContext { self.clipboard.set_file_list(&paths) } - pub fn listen_clipboard(&self) -> Result<(), CliprdrError> { - log::debug!("start listening clipboard"); - while let Some(v) = self.clipboard.wait_file_list()? { - let filtered: Vec<_> = v - .into_iter() - .filter(|pb| !pb.starts_with(&self.fuse_mount_point)) - .collect(); - log::debug!("clipboard file list update (filtered): {:?}", filtered); - if filtered.is_empty() { - continue; - } - log::debug!("send file list update to remote"); - - // construct format list update and send - let data = ClipboardFile::FormatList { - format_list: vec![ - ( - FILEDESCRIPTOR_FORMAT_ID, - FILEDESCRIPTORW_FORMAT_NAME.to_string(), - ), - (FILECONTENTS_FORMAT_ID, FILECONTENTS_FORMAT_NAME.to_string()), - ], - }; - - send_data_to_all(data); - log::debug!("format list update sent"); - } - Ok(()) - } - - fn send_format_list(&self, conn_id: i32) -> Result<(), CliprdrError> { - log::debug!("send format list to remote, conn={}", conn_id); - let data = self.clipboard.wait_file_list()?; - if data.is_none() { - log::debug!("clipboard disabled, skip sending"); - return Ok(()); - } - let data = data.unwrap(); - - let filtered: Vec<_> = data - .into_iter() - .filter(|pb| !pb.starts_with(&self.fuse_mount_point)) - .collect(); - if filtered.is_empty() { - log::debug!("no files in format list, skip sending"); - return Ok(()); - } - - let format_list = ClipboardFile::FormatList { - format_list: vec![ - ( - FILEDESCRIPTOR_FORMAT_ID, - FILEDESCRIPTORW_FORMAT_NAME.to_string(), - ), - (FILECONTENTS_FORMAT_ID, FILECONTENTS_FORMAT_NAME.to_string()), - ], - }; - - send_data(conn_id, format_list); - log::debug!("format list to remote dispatched, conn={}", conn_id); - Ok(()) - } - - fn send_file_list(&self, conn_id: i32) -> Result<(), CliprdrError> { - log::debug!("send file list to remote, conn={}", conn_id); - let data = self.clipboard.wait_file_list()?; - if data.is_none() { - log::debug!("clipboard disabled, skip sending"); - return Ok(()); - } - - let data = data.unwrap(); - let filtered: Vec<_> = data - .into_iter() - .filter(|pb| !pb.starts_with(&self.fuse_mount_point)) - .collect(); - - let files = construct_file_list(filtered.as_slice())?; - - let mut data = BytesMut::with_capacity(4 + 592 * files.len()); - data.put_u32_le(filtered.len() as u32); - for file in files.iter() { - data.put(file.as_bin().as_slice()); - } - - { - let mut w_list = self.file_list.write(); - *w_list = files; - } - - let format_data = data.to_vec(); - - send_data( - conn_id, - ClipboardFile::FormatDataResponse { - msg_flags: 1, - format_data, - }, - ); - Ok(()) - } - fn serve_file_contents( &self, conn_id: i32, @@ -663,6 +524,7 @@ impl ClipboardContext { if let Some(fuse_handle) = self.fuse_handle.lock().take() { fuse_handle.join(); } + self.clipboard.stop(); Ok(()) } @@ -673,8 +535,11 @@ impl ClipboardContext { return Ok(true); } - self.fuse_server - .update_files(conn_id, FILEDESCRIPTOR_FORMAT_ID, FILECONTENTS_FORMAT_ID) + self.fuse_server.lock().update_files( + conn_id, + FILEDESCRIPTOR_FORMAT_ID, + FILECONTENTS_FORMAT_ID, + ) } pub fn serve(&self, conn_id: i32, msg: ClipboardFile) -> Result<(), CliprdrError> { @@ -691,7 +556,7 @@ impl ClipboardContext { // ignore capabilities for now - self.send_file_list(0)?; + self.clipboard.send_file_list(0)?; Ok(()) } @@ -722,14 +587,17 @@ impl ClipboardContext { add_remote_format(FILECONTENTS_FORMAT_NAME, file_contents_id); add_remote_format(FILEDESCRIPTORW_FORMAT_NAME, file_descriptor_id); - self.fuse_server - .update_files(conn_id, file_descriptor_id, file_contents_id)?; + self.fuse_server.lock().update_files( + conn_id, + file_descriptor_id, + file_contents_id, + )?; Ok(()) } ClipboardFile::FormatListResponse { msg_flags } => { log::debug!("server_format_list_response called"); if msg_flags != 0x1 { - self.send_format_list(conn_id) + self.clipboard.send_format_list(conn_id) } else { Ok(()) } @@ -749,7 +617,7 @@ impl ClipboardContext { }; if format == FILEDESCRIPTORW_FORMAT_NAME { - self.send_file_list(requested_format_id)?; + self.clipboard.send_file_list(conn_id)?; } else if format == FILECONTENTS_FORMAT_NAME { log::error!( "try to read file contents with FormatDataRequest from conn={}", @@ -769,16 +637,17 @@ impl ClipboardContext { ClipboardFile::FormatDataResponse { .. } => { // we don't know its corresponding request, no resend can be performed log::debug!("server_format_data_response called"); + let mut fuse_server = self.fuse_server.lock(); - self.fuse_server.recv(conn_id, msg); - let paths = self.fuse_server.list_root(); + fuse_server.serve(msg)?; + let paths = fuse_server.list_root(); self.set_clipboard(&paths)?; Ok(()) } ClipboardFile::FileContentsResponse { .. } => { log::debug!("server_file_contents_response called"); // we don't know its corresponding request, no resend can be performed - self.fuse_server.recv(conn_id, msg); + self.fuse_server.lock().serve(msg)?; Ok(()) } ClipboardFile::FileContentsRequest { @@ -818,6 +687,19 @@ impl ClipboardContext { } } +impl CliprdrServiceContext for ClipboardContext { + fn set_is_stopped(&mut self) -> Result<(), CliprdrError> { + self.stop() + } + fn empty_clipboard(&mut self, _conn_id: i32) -> Result { + self.clipboard.set_file_list(&[])?; + Ok(true) + } + fn server_clip_file(&mut self, conn_id: i32, msg: ClipboardFile) -> Result<(), CliprdrError> { + self.serve(conn_id, msg) + } +} + fn resp_format_data_failure(conn_id: i32) { let data = ClipboardFile::FormatDataResponse { msg_flags: 0x2, diff --git a/libs/clipboard/src/platform/linux/x11.rs b/libs/clipboard/src/platform/linux/x11.rs index 81aaa498d..093c3827d 100644 --- a/libs/clipboard/src/platform/linux/x11.rs +++ b/libs/clipboard/src/platform/linux/x11.rs @@ -3,12 +3,21 @@ use std::{ sync::atomic::{AtomicBool, Ordering}, }; -use hbb_common::log; +use hbb_common::{ + bytes::{BufMut, BytesMut}, + log, +}; use once_cell::sync::OnceCell; use x11_clipboard::Clipboard; use x11rb::protocol::xproto::Atom; -use crate::CliprdrError; +use crate::{ + platform::linux::{ + construct_file_list, FILECONTENTS_FORMAT_ID, FILECONTENTS_FORMAT_NAME, + FILEDESCRIPTORW_FORMAT_NAME, FILEDESCRIPTOR_FORMAT_ID, + }, + send_data, ClipboardFile, CliprdrError, +}; use super::{encode_path_to_uri, parse_plain_uri_list, SysClipboard}; @@ -20,12 +29,13 @@ fn get_clip() -> Result<&'static Clipboard, CliprdrError> { pub struct X11Clipboard { stop: AtomicBool, + ignore_path: PathBuf, text_uri_list: Atom, gnome_copied_files: Atom, } impl X11Clipboard { - pub fn new() -> Result { + pub fn new(ignore_path: &PathBuf) -> Result { let clipboard = get_clip()?; let text_uri_list = clipboard .setter @@ -36,6 +46,7 @@ impl X11Clipboard { .get_atom("x-special/gnome-copied-files") .map_err(|_| CliprdrError::CliprdrInit)?; Ok(Self { + ignore_path: ignore_path.to_owned(), stop: AtomicBool::new(false), text_uri_list, gnome_copied_files, @@ -58,9 +69,7 @@ impl X11Clipboard { .store_batch(clip, batch) .map_err(|_| CliprdrError::ClipboardInternalError) } -} -impl SysClipboard for X11Clipboard { fn wait_file_list(&self) -> Result>, CliprdrError> { if self.stop.load(Ordering::Relaxed) { return Ok(None); @@ -70,7 +79,16 @@ impl SysClipboard for X11Clipboard { let p = parse_plain_uri_list(v)?; Ok(Some(p)) } +} +impl X11Clipboard { + #[inline] + fn is_stopped(&self) -> bool { + self.stop.load(Ordering::Relaxed) + } +} + +impl SysClipboard for X11Clipboard { fn set_file_list(&self, paths: &[PathBuf]) -> Result<(), CliprdrError> { let uri_list: Vec = paths.iter().map(|pb| encode_path_to_uri(pb)).collect(); let uri_list = uri_list.join("\n"); @@ -90,5 +108,121 @@ impl SysClipboard for X11Clipboard { fn start(&self) { self.stop.store(false, Ordering::Relaxed); + + while let Ok(sth) = self.wait_file_list() { + if self.is_stopped() { + break; + } + + let Some(paths) = sth else { + std::thread::sleep(std::time::Duration::from_millis(100)); + continue; + }; + + let filtered = paths + .into_iter() + .filter(|pb| !pb.starts_with(&self.ignore_path)) + .collect::>(); + + if filtered.is_empty() { + std::thread::sleep(std::time::Duration::from_millis(100)); + continue; + } + + // send update to server + log::debug!("clipboard updated: {:?}", filtered); + + if let Err(e) = send_format_list(0) { + log::warn!("failed to send format list: {}", e); + } + + std::thread::sleep(std::time::Duration::from_millis(100)); + } + } + + fn send_format_list(&self, conn_id: i32) -> Result<(), CliprdrError> { + if self.is_stopped() { + log::debug!("clipboard stopped, skip sending"); + return Ok(()); + } + + let Some(paths) = self.wait_file_list()? else { + log::debug!("no files in format list, skip sending"); + return Ok(()); + }; + + let filtered: Vec<_> = paths + .into_iter() + .filter(|pb| !pb.starts_with(&self.ignore_path)) + .collect(); + + if filtered.is_empty() { + log::debug!("no files in format list, skip sending"); + return Ok(()); + } + + send_format_list(conn_id) + } + + fn send_file_list(&self, conn_id: i32) -> Result<(), CliprdrError> { + if self.is_stopped() { + log::debug!("clipboard stopped, skip sending"); + return Ok(()); + } + let Some(paths) = self.wait_file_list()? else { + log::debug!("no files in format list, skip sending"); + return Ok(()); + }; + + let filtered: Vec<_> = paths + .into_iter() + .filter(|pb| !pb.starts_with(&self.ignore_path)) + .collect(); + + if filtered.is_empty() { + log::debug!("no files in format list, skip sending"); + return Ok(()); + } + + send_file_list(filtered, conn_id) } } + +fn send_format_list(conn_id: i32) -> Result<(), CliprdrError> { + log::debug!("send format list to remote, conn={}", conn_id); + let format_list = ClipboardFile::FormatList { + format_list: vec![ + ( + FILEDESCRIPTOR_FORMAT_ID, + FILEDESCRIPTORW_FORMAT_NAME.to_string(), + ), + (FILECONTENTS_FORMAT_ID, FILECONTENTS_FORMAT_NAME.to_string()), + ], + }; + + send_data(conn_id, format_list); + log::debug!("format list to remote dispatched, conn={}", conn_id); + Ok(()) +} + +fn send_file_list(paths: Vec, conn_id: i32) -> Result<(), CliprdrError> { + log::debug!("send file list to remote, conn={}", conn_id); + let files = construct_file_list(paths.as_slice())?; + + let mut data = BytesMut::with_capacity(4 + 592 * files.len()); + data.put_u32_le(paths.len() as u32); + for file in files.iter() { + data.put(file.as_bin().as_slice()); + } + + let format_data = data.to_vec(); + + send_data( + conn_id, + ClipboardFile::FormatDataResponse { + msg_flags: 1, + format_data, + }, + ); + Ok(()) +} diff --git a/libs/clipboard/src/platform/mod.rs b/libs/clipboard/src/platform/mod.rs index c6e855143..604055905 100644 --- a/libs/clipboard/src/platform/mod.rs +++ b/libs/clipboard/src/platform/mod.rs @@ -25,9 +25,7 @@ pub fn create_cliprdr_context( _enable_others: bool, response_wait_timeout_secs: u32, ) -> crate::ResultType> { - use std::sync::Arc; - - use hbb_common::{anyhow, log}; + use hbb_common::log; if !enable_files { return Ok(Box::new(DummyCliprdrContext {}) as Box<_>); @@ -53,13 +51,9 @@ pub fn create_cliprdr_context( tmp_path }; - let linux_ctx = Arc::new(linux::ClipboardContext::new(timeout, rd_mnt)?); - let client = linux_ctx.client().map_err(|e| { - log::error!("create clipboard client: {:?}", e); - anyhow::anyhow!("create clipboard client: {:?}", e) - })?; + let linux_ctx = linux::ClipboardContext::new(timeout, rd_mnt)?; - Ok(Box::new(client) as Box<_>) + Ok(Box::new(linux_ctx) as Box<_>) } struct DummyCliprdrContext {}