From d2e24173d0d87e840e41e22dc1a74b588322979e Mon Sep 17 00:00:00 2001 From: Kingtous Date: Sun, 12 Feb 2023 10:28:04 +0800 Subject: [PATCH] opt: read uint8list directly from rust codes --- flutter/lib/models/model.dart | 30 +++--------------- flutter/lib/models/native_model.dart | 39 +++++++++++++++++------ src/client.rs | 8 ++--- src/flutter.rs | 47 +++++++++++++++++++--------- src/ui/remote.rs | 8 +++-- src/ui_session_interface.rs | 7 +++-- 6 files changed, 78 insertions(+), 61 deletions(-) diff --git a/flutter/lib/models/model.dart b/flutter/lib/models/model.dart index e09a99875..8cf90eba9 100644 --- a/flutter/lib/models/model.dart +++ b/flutter/lib/models/model.dart @@ -417,8 +417,6 @@ class ImageModel with ChangeNotifier { String id = ''; - int decodeCount = 0; - WeakReference parent; final List _callbacksOnFirstImage = []; @@ -439,20 +437,16 @@ class ImageModel with ChangeNotifier { } } - if (decodeCount >= 1) { - return; - } - final pid = parent.target?.id; - decodeCount += 1; ui.decodeImageFromPixels( rgba, parent.target?.ffiModel.display.width ?? 0, parent.target?.ffiModel.display.height ?? 0, isWeb ? ui.PixelFormat.rgba8888 : ui.PixelFormat.bgra8888, (image) { - decodeCount -= 1; if (parent.target?.id != pid) return; try { + // Unlock the rgba memory from rust codes. + platformFFI.nextRgba(id); // my throw exception, because the listener maybe already dispose update(image); } catch (e) { @@ -1370,8 +1364,6 @@ class FFI { final cb = ffiModel.startEventListener(id); () async { // Preserved for the rgba data. - Pointer? buffer; - int? bufferSize; await for (final message in stream) { if (message is EventToUI_Event) { try { @@ -1383,29 +1375,15 @@ class FFI { } else if (message is EventToUI_Rgba) { // Fetch the image buffer from rust codes. final sz = platformFFI.getRgbaSize(id); - if (sz == null) { + if (sz == null || sz == 0) { return; } - // The buffer does not exists or the bufferSize is not - // equal to the required size. - if (buffer == null || bufferSize != sz) { - // reallocate buffer - if (buffer != null) { - malloc.free(buffer); - } - buffer = malloc.allocate(sz); - bufferSize = sz; - } - final rgba = platformFFI.getRgba(id, buffer, bufferSize!); + final rgba = platformFFI.getRgba(id, sz); if (rgba != null) { imageModel.onRgba(rgba); } } } - // Free the buffer allocated on the heap. - if (buffer != null) { - malloc.free(buffer); - } }(); // every instance will bind a stream this.id = id; diff --git a/flutter/lib/models/native_model.dart b/flutter/lib/models/native_model.dart index 588c3646f..ba62b775e 100644 --- a/flutter/lib/models/native_model.dart +++ b/flutter/lib/models/native_model.dart @@ -9,6 +9,7 @@ import 'package:ffi/ffi.dart'; import 'package:flutter/foundation.dart'; import 'package:flutter/services.dart'; import 'package:flutter_hbb/consts.dart'; +import 'package:get/get.dart'; import 'package:package_info_plus/package_info_plus.dart'; import 'package:path_provider/path_provider.dart'; import 'package:win32/win32.dart' as win32; @@ -23,10 +24,11 @@ class RgbaFrame extends Struct { } typedef F2 = Pointer Function(Pointer, Pointer); -typedef F3 = Void Function(Pointer, Pointer); -typedef F3Dart = void Function(Pointer, Pointer); +typedef F3 = Pointer Function(Pointer); typedef F4 = Uint64 Function(Pointer); typedef F4Dart = int Function(Pointer); +typedef F5 = Void Function(Pointer); +typedef F5Dart = void Function(Pointer); typedef HandleEvent = Future Function(Map evt); /// FFI wrapper around the native Rust core. @@ -47,8 +49,9 @@ class PlatformFFI { final _toAndroidChannel = const MethodChannel('mChannel'); RustdeskImpl get ffiBind => _ffiBind; - F3Dart? _session_get_rgba; + F3? _session_get_rgba; F4Dart? _session_get_rgba_size; + F5Dart? _session_next_rgba; static get localeName => Platform.localeName; @@ -97,13 +100,19 @@ class PlatformFFI { return res; } - Uint8List? getRgba(String id, Pointer buffer, int bufSize) { + Uint8List? getRgba(String id, int bufSize) { if (_session_get_rgba == null) return null; var a = id.toNativeUtf8(); - _session_get_rgba!(a, buffer); - final data = buffer.asTypedList(bufSize); - malloc.free(a); - return data; + try { + final buffer = _session_get_rgba!(a); + if (buffer == nullptr) { + return null; + } + final data = buffer.asTypedList(bufSize); + return data; + } finally { + malloc.free(a); + } } int? getRgbaSize(String id) { @@ -114,6 +123,13 @@ class PlatformFFI { return bufferSize; } + void nextRgba(String id) { + if (_session_next_rgba == null) return; + final a = id.toNativeUtf8(); + _session_next_rgba!(a); + malloc.free(a); + } + /// Init the FFI class, loads the native Rust core library. Future init(String appType) async { _appType = appType; @@ -129,8 +145,11 @@ class PlatformFFI { debugPrint('initializing FFI $_appType'); try { _translate = dylib.lookupFunction('translate'); - _session_get_rgba = dylib.lookupFunction("session_get_rgba"); - _session_get_rgba_size = dylib.lookupFunction("session_get_rgba_size"); + _session_get_rgba = dylib.lookupFunction("session_get_rgba"); + _session_get_rgba_size = + dylib.lookupFunction("session_get_rgba_size"); + _session_next_rgba = + dylib.lookupFunction("session_next_rgba"); try { // SYSTEM user failed _dir = (await getApplicationDocumentsDirectory()).path; diff --git a/src/client.rs b/src/client.rs index c6e0a759f..a21592578 100644 --- a/src/client.rs +++ b/src/client.rs @@ -817,7 +817,7 @@ impl AudioHandler { pub struct VideoHandler { decoder: Decoder, latency_controller: Arc>, - pub rgb: Arc>>, + pub rgb: Vec, recorder: Arc>>, record: bool, } @@ -850,7 +850,7 @@ impl VideoHandler { } match &vf.union { Some(frame) => { - let res = self.decoder.handle_video_frame(frame, &mut self.rgb.write().unwrap()); + let res = self.decoder.handle_video_frame(frame, &mut self.rgb); if self.record { self.recorder .lock() @@ -1545,7 +1545,7 @@ pub type MediaSender = mpsc::Sender; /// * `video_callback` - The callback for video frame. Being called when a video frame is ready. pub fn start_video_audio_threads(video_callback: F) -> (MediaSender, MediaSender) where - F: 'static + FnMut(Arc>>) + Send, + F: 'static + FnMut(&mut Vec) + Send, { let (video_sender, video_receiver) = mpsc::channel::(); let mut video_callback = video_callback; @@ -1560,7 +1560,7 @@ where match data { MediaData::VideoFrame(vf) => { if let Ok(true) = video_handler.handle_frame(vf) { - video_callback(video_handler.rgb.clone()); + video_callback(&mut video_handler.rgb); } } MediaData::Reset => { diff --git a/src/flutter.rs b/src/flutter.rs index bb6f85bb9..a60e379f9 100644 --- a/src/flutter.rs +++ b/src/flutter.rs @@ -15,7 +15,7 @@ use std::{ os::raw::{c_char, c_int}, sync::{Arc, RwLock}, }; -use libc::memcpy; +use std::sync::atomic::{AtomicBool, Ordering}; pub(super) const APP_TYPE_MAIN: &str = "main"; pub(super) const APP_TYPE_CM: &str = "cm"; @@ -111,8 +111,10 @@ pub unsafe extern "C" fn free_c_args(ptr: *mut *mut c_char, len: c_int) { #[derive(Default, Clone)] pub struct FlutterHandler { pub event_stream: Arc>>>, + // SAFETY: [rgba] is guarded by [rgba_valid], and it's safe to reach [rgba] with `rgba_valid == true`. + // We must check the `rgba_valid` before reading [rgba]. pub rgba: Arc>>, - pub rgba_valid: Arc> + pub rgba_valid: Arc } impl FlutterHandler { @@ -291,18 +293,18 @@ impl InvokeUiSession for FlutterHandler { // unused in flutter fn adapt_size(&self) {} - fn on_rgba(&self, data: Arc>>) { + fn on_rgba(&self, data: &mut Vec) { // If the current rgba is not fetched by flutter, i.e., is valid. // We give up sending a new event to flutter. - if *self.rgba_valid.read().unwrap() { + if self.rgba_valid.load(Ordering::Relaxed) { return; } + self.rgba_valid.store(true, Ordering::Relaxed); // Return the rgba buffer to the video handler for reusing allocated rgba buffer. - std::mem::swap::>(data.write().unwrap().as_mut(), self.rgba.write().unwrap().as_mut()); + std::mem::swap::>(data, &mut *self.rgba.write().unwrap()); if let Some(stream) = &*self.event_stream.read().unwrap() { stream.add(EventToUI::Rgba); } - let _ = std::mem::replace(&mut *self.rgba_valid.write().unwrap(), true); } fn set_peer_info(&self, pi: &PeerInfo) { @@ -421,13 +423,17 @@ impl InvokeUiSession for FlutterHandler { self.push_event("on_voice_call_incoming", [].into()); } - fn get_rgba(&mut self, buffer: *mut u8) { - // [Safety] - // * It must be ensures the buffer has enough space to place the whole rgba. - let max_len = self.rgba.read().unwrap().len(); - unsafe { std::ptr::copy_nonoverlapping(self.rgba.read().unwrap().as_ptr(), buffer, max_len)}; - // mark the rgba has been taken from flutter. - let _ = std::mem::replace(&mut *self.rgba_valid.write().unwrap(), false); + #[inline] + fn get_rgba(&self) -> *const u8 { + if self.rgba_valid.load(Ordering::Relaxed) { + return self.rgba.read().unwrap().as_ptr(); + } + std::ptr::null_mut() + } + + #[inline] + fn next_rgba(&mut self) { + self.rgba_valid.store(false, Ordering::Relaxed); } } @@ -668,11 +674,22 @@ pub fn session_get_rgba_size(id: *const char) -> usize { } #[no_mangle] -pub fn session_get_rgba(id: *const char, buffer: *mut u8) { +pub fn session_get_rgba(id: *const char) -> *const u8 { let id = unsafe { std::ffi::CStr::from_ptr(id as _) }; if let Ok(id) = id.to_str() { if let Some(session) = SESSIONS.write().unwrap().get_mut(id) { - return session.get_rgba(buffer); + return session.get_rgba(); + } + } + std::ptr::null() +} + +#[no_mangle] +pub fn session_next_rgba(id: *const char) { + let id = unsafe { std::ffi::CStr::from_ptr(id as _) }; + if let Ok(id) = id.to_str() { + if let Some(session) = SESSIONS.write().unwrap().get_mut(id) { + return session.next_rgba(); } } } \ No newline at end of file diff --git a/src/ui/remote.rs b/src/ui/remote.rs index ecf96ab32..e44e31401 100644 --- a/src/ui/remote.rs +++ b/src/ui/remote.rs @@ -203,12 +203,12 @@ impl InvokeUiSession for SciterHandler { self.call("adaptSize", &make_args!()); } - fn on_rgba(&self, data: Arc>>) { + fn on_rgba(&self, data: &mut Vec) { VIDEO .lock() .unwrap() .as_mut() - .map(|v| v.render_frame(data.read().unwrap().as_ref()).ok()); + .map(|v| v.render_frame(data).ok()); } fn set_peer_info(&self, pi: &PeerInfo) { @@ -286,7 +286,9 @@ impl InvokeUiSession for SciterHandler { } /// RGBA is directly rendered by [on_rgba]. No need to store the rgba for the sciter ui. - fn get_rgba(&mut self, _buffer: *mut u8) {} + fn get_rgba(&self) -> *const u8 { std::ptr::null() } + + fn next_rgba(&mut self) {} } pub struct SciterSession(Session); diff --git a/src/ui_session_interface.rs b/src/ui_session_interface.rs index 85deb68c2..25c15f52f 100644 --- a/src/ui_session_interface.rs +++ b/src/ui_session_interface.rs @@ -712,7 +712,7 @@ pub trait InvokeUiSession: Send + Sync + Clone + 'static + Sized + Default { fn update_block_input_state(&self, on: bool); fn job_progress(&self, id: i32, file_num: i32, speed: f64, finished_size: f64); fn adapt_size(&self); - fn on_rgba(&self, data: Arc>>); + fn on_rgba(&self, data: &mut Vec); fn msgbox(&self, msgtype: &str, title: &str, text: &str, link: &str, retry: bool); #[cfg(any(target_os = "android", target_os = "ios"))] fn clipboard(&self, content: String); @@ -722,7 +722,8 @@ pub trait InvokeUiSession: Send + Sync + Clone + 'static + Sized + Default { fn on_voice_call_closed(&self, reason: &str); fn on_voice_call_waiting(&self); fn on_voice_call_incoming(&self); - fn get_rgba(&mut self, buffer: *mut u8); + fn get_rgba(&self) -> *const u8; + fn next_rgba(&mut self); } impl Deref for Session { @@ -957,7 +958,7 @@ pub async fn io_loop(handler: Session) { let frame_count = Arc::new(AtomicUsize::new(0)); let frame_count_cl = frame_count.clone(); let ui_handler = handler.ui_handler.clone(); - let (video_sender, audio_sender) = start_video_audio_threads(move |data: Arc>> | { + let (video_sender, audio_sender) = start_video_audio_threads(move |data: &mut Vec | { frame_count_cl.fetch_add(1, Ordering::Relaxed); ui_handler.on_rgba(data); });