opt: read uint8list directly from rust codes

This commit is contained in:
Kingtous 2023-02-12 10:28:04 +08:00
parent 01d30bce9e
commit d2e24173d0
6 changed files with 78 additions and 61 deletions

View File

@ -417,8 +417,6 @@ class ImageModel with ChangeNotifier {
String id = '';
int decodeCount = 0;
WeakReference<FFI> parent;
final List<Function(String)> _callbacksOnFirstImage = [];
@ -439,20 +437,16 @@ class ImageModel with ChangeNotifier {
}
}
if (decodeCount >= 1) {
return;
}
final pid = parent.target?.id;
decodeCount += 1;
ui.decodeImageFromPixels(
rgba,
parent.target?.ffiModel.display.width ?? 0,
parent.target?.ffiModel.display.height ?? 0,
isWeb ? ui.PixelFormat.rgba8888 : ui.PixelFormat.bgra8888, (image) {
decodeCount -= 1;
if (parent.target?.id != pid) return;
try {
// Unlock the rgba memory from rust codes.
platformFFI.nextRgba(id);
// my throw exception, because the listener maybe already dispose
update(image);
} catch (e) {
@ -1370,8 +1364,6 @@ class FFI {
final cb = ffiModel.startEventListener(id);
() async {
// Preserved for the rgba data.
Pointer<Uint8>? buffer;
int? bufferSize;
await for (final message in stream) {
if (message is EventToUI_Event) {
try {
@ -1383,29 +1375,15 @@ class FFI {
} else if (message is EventToUI_Rgba) {
// Fetch the image buffer from rust codes.
final sz = platformFFI.getRgbaSize(id);
if (sz == null) {
if (sz == null || sz == 0) {
return;
}
// The buffer does not exists or the bufferSize is not
// equal to the required size.
if (buffer == null || bufferSize != sz) {
// reallocate buffer
if (buffer != null) {
malloc.free(buffer);
}
buffer = malloc.allocate(sz);
bufferSize = sz;
}
final rgba = platformFFI.getRgba(id, buffer, bufferSize!);
final rgba = platformFFI.getRgba(id, sz);
if (rgba != null) {
imageModel.onRgba(rgba);
}
}
}
// Free the buffer allocated on the heap.
if (buffer != null) {
malloc.free(buffer);
}
}();
// every instance will bind a stream
this.id = id;

View File

@ -9,6 +9,7 @@ import 'package:ffi/ffi.dart';
import 'package:flutter/foundation.dart';
import 'package:flutter/services.dart';
import 'package:flutter_hbb/consts.dart';
import 'package:get/get.dart';
import 'package:package_info_plus/package_info_plus.dart';
import 'package:path_provider/path_provider.dart';
import 'package:win32/win32.dart' as win32;
@ -23,10 +24,11 @@ class RgbaFrame extends Struct {
}
typedef F2 = Pointer<Utf8> Function(Pointer<Utf8>, Pointer<Utf8>);
typedef F3 = Void Function(Pointer<Utf8>, Pointer<Uint8>);
typedef F3Dart = void Function(Pointer<Utf8>, Pointer<Uint8>);
typedef F3 = Pointer<Uint8> Function(Pointer<Utf8>);
typedef F4 = Uint64 Function(Pointer<Utf8>);
typedef F4Dart = int Function(Pointer<Utf8>);
typedef F5 = Void Function(Pointer<Utf8>);
typedef F5Dart = void Function(Pointer<Utf8>);
typedef HandleEvent = Future<void> Function(Map<String, dynamic> evt);
/// FFI wrapper around the native Rust core.
@ -47,8 +49,9 @@ class PlatformFFI {
final _toAndroidChannel = const MethodChannel('mChannel');
RustdeskImpl get ffiBind => _ffiBind;
F3Dart? _session_get_rgba;
F3? _session_get_rgba;
F4Dart? _session_get_rgba_size;
F5Dart? _session_next_rgba;
static get localeName => Platform.localeName;
@ -97,13 +100,19 @@ class PlatformFFI {
return res;
}
Uint8List? getRgba(String id, Pointer<Uint8> buffer, int bufSize) {
Uint8List? getRgba(String id, int bufSize) {
if (_session_get_rgba == null) return null;
var a = id.toNativeUtf8();
_session_get_rgba!(a, buffer);
final data = buffer.asTypedList(bufSize);
malloc.free(a);
return data;
try {
final buffer = _session_get_rgba!(a);
if (buffer == nullptr) {
return null;
}
final data = buffer.asTypedList(bufSize);
return data;
} finally {
malloc.free(a);
}
}
int? getRgbaSize(String id) {
@ -114,6 +123,13 @@ class PlatformFFI {
return bufferSize;
}
void nextRgba(String id) {
if (_session_next_rgba == null) return;
final a = id.toNativeUtf8();
_session_next_rgba!(a);
malloc.free(a);
}
/// Init the FFI class, loads the native Rust core library.
Future<void> init(String appType) async {
_appType = appType;
@ -129,8 +145,11 @@ class PlatformFFI {
debugPrint('initializing FFI $_appType');
try {
_translate = dylib.lookupFunction<F2, F2>('translate');
_session_get_rgba = dylib.lookupFunction<F3, F3Dart>("session_get_rgba");
_session_get_rgba_size = dylib.lookupFunction<F4, F4Dart>("session_get_rgba_size");
_session_get_rgba = dylib.lookupFunction<F3, F3>("session_get_rgba");
_session_get_rgba_size =
dylib.lookupFunction<F4, F4Dart>("session_get_rgba_size");
_session_next_rgba =
dylib.lookupFunction<F5, F5Dart>("session_next_rgba");
try {
// SYSTEM user failed
_dir = (await getApplicationDocumentsDirectory()).path;

View File

@ -817,7 +817,7 @@ impl AudioHandler {
pub struct VideoHandler {
decoder: Decoder,
latency_controller: Arc<Mutex<LatencyController>>,
pub rgb: Arc<RwLock<Vec<u8>>>,
pub rgb: Vec<u8>,
recorder: Arc<Mutex<Option<Recorder>>>,
record: bool,
}
@ -850,7 +850,7 @@ impl VideoHandler {
}
match &vf.union {
Some(frame) => {
let res = self.decoder.handle_video_frame(frame, &mut self.rgb.write().unwrap());
let res = self.decoder.handle_video_frame(frame, &mut self.rgb);
if self.record {
self.recorder
.lock()
@ -1545,7 +1545,7 @@ pub type MediaSender = mpsc::Sender<MediaData>;
/// * `video_callback` - The callback for video frame. Being called when a video frame is ready.
pub fn start_video_audio_threads<F>(video_callback: F) -> (MediaSender, MediaSender)
where
F: 'static + FnMut(Arc<RwLock<Vec<u8>>>) + Send,
F: 'static + FnMut(&mut Vec<u8>) + Send,
{
let (video_sender, video_receiver) = mpsc::channel::<MediaData>();
let mut video_callback = video_callback;
@ -1560,7 +1560,7 @@ where
match data {
MediaData::VideoFrame(vf) => {
if let Ok(true) = video_handler.handle_frame(vf) {
video_callback(video_handler.rgb.clone());
video_callback(&mut video_handler.rgb);
}
}
MediaData::Reset => {

View File

@ -15,7 +15,7 @@ use std::{
os::raw::{c_char, c_int},
sync::{Arc, RwLock},
};
use libc::memcpy;
use std::sync::atomic::{AtomicBool, Ordering};
pub(super) const APP_TYPE_MAIN: &str = "main";
pub(super) const APP_TYPE_CM: &str = "cm";
@ -111,8 +111,10 @@ pub unsafe extern "C" fn free_c_args(ptr: *mut *mut c_char, len: c_int) {
#[derive(Default, Clone)]
pub struct FlutterHandler {
pub event_stream: Arc<RwLock<Option<StreamSink<EventToUI>>>>,
// SAFETY: [rgba] is guarded by [rgba_valid], and it's safe to reach [rgba] with `rgba_valid == true`.
// We must check the `rgba_valid` before reading [rgba].
pub rgba: Arc<RwLock<Vec<u8>>>,
pub rgba_valid: Arc<RwLock<bool>>
pub rgba_valid: Arc<AtomicBool>
}
impl FlutterHandler {
@ -291,18 +293,18 @@ impl InvokeUiSession for FlutterHandler {
// unused in flutter
fn adapt_size(&self) {}
fn on_rgba(&self, data: Arc<RwLock<Vec<u8>>>) {
fn on_rgba(&self, data: &mut Vec<u8>) {
// If the current rgba is not fetched by flutter, i.e., is valid.
// We give up sending a new event to flutter.
if *self.rgba_valid.read().unwrap() {
if self.rgba_valid.load(Ordering::Relaxed) {
return;
}
self.rgba_valid.store(true, Ordering::Relaxed);
// Return the rgba buffer to the video handler for reusing allocated rgba buffer.
std::mem::swap::<Vec<u8>>(data.write().unwrap().as_mut(), self.rgba.write().unwrap().as_mut());
std::mem::swap::<Vec<u8>>(data, &mut *self.rgba.write().unwrap());
if let Some(stream) = &*self.event_stream.read().unwrap() {
stream.add(EventToUI::Rgba);
}
let _ = std::mem::replace(&mut *self.rgba_valid.write().unwrap(), true);
}
fn set_peer_info(&self, pi: &PeerInfo) {
@ -421,13 +423,17 @@ impl InvokeUiSession for FlutterHandler {
self.push_event("on_voice_call_incoming", [].into());
}
fn get_rgba(&mut self, buffer: *mut u8) {
// [Safety]
// * It must be ensures the buffer has enough space to place the whole rgba.
let max_len = self.rgba.read().unwrap().len();
unsafe { std::ptr::copy_nonoverlapping(self.rgba.read().unwrap().as_ptr(), buffer, max_len)};
// mark the rgba has been taken from flutter.
let _ = std::mem::replace(&mut *self.rgba_valid.write().unwrap(), false);
#[inline]
fn get_rgba(&self) -> *const u8 {
if self.rgba_valid.load(Ordering::Relaxed) {
return self.rgba.read().unwrap().as_ptr();
}
std::ptr::null_mut()
}
#[inline]
fn next_rgba(&mut self) {
self.rgba_valid.store(false, Ordering::Relaxed);
}
}
@ -668,11 +674,22 @@ pub fn session_get_rgba_size(id: *const char) -> usize {
}
#[no_mangle]
pub fn session_get_rgba(id: *const char, buffer: *mut u8) {
pub fn session_get_rgba(id: *const char) -> *const u8 {
let id = unsafe { std::ffi::CStr::from_ptr(id as _) };
if let Ok(id) = id.to_str() {
if let Some(session) = SESSIONS.write().unwrap().get_mut(id) {
return session.get_rgba(buffer);
return session.get_rgba();
}
}
std::ptr::null()
}
#[no_mangle]
pub fn session_next_rgba(id: *const char) {
let id = unsafe { std::ffi::CStr::from_ptr(id as _) };
if let Ok(id) = id.to_str() {
if let Some(session) = SESSIONS.write().unwrap().get_mut(id) {
return session.next_rgba();
}
}
}

View File

@ -203,12 +203,12 @@ impl InvokeUiSession for SciterHandler {
self.call("adaptSize", &make_args!());
}
fn on_rgba(&self, data: Arc<RwLock<Vec<u8>>>) {
fn on_rgba(&self, data: &mut Vec<u8>) {
VIDEO
.lock()
.unwrap()
.as_mut()
.map(|v| v.render_frame(data.read().unwrap().as_ref()).ok());
.map(|v| v.render_frame(data).ok());
}
fn set_peer_info(&self, pi: &PeerInfo) {
@ -286,7 +286,9 @@ impl InvokeUiSession for SciterHandler {
}
/// RGBA is directly rendered by [on_rgba]. No need to store the rgba for the sciter ui.
fn get_rgba(&mut self, _buffer: *mut u8) {}
fn get_rgba(&self) -> *const u8 { std::ptr::null() }
fn next_rgba(&mut self) {}
}
pub struct SciterSession(Session<SciterHandler>);

View File

@ -712,7 +712,7 @@ pub trait InvokeUiSession: Send + Sync + Clone + 'static + Sized + Default {
fn update_block_input_state(&self, on: bool);
fn job_progress(&self, id: i32, file_num: i32, speed: f64, finished_size: f64);
fn adapt_size(&self);
fn on_rgba(&self, data: Arc<RwLock<Vec<u8>>>);
fn on_rgba(&self, data: &mut Vec<u8>);
fn msgbox(&self, msgtype: &str, title: &str, text: &str, link: &str, retry: bool);
#[cfg(any(target_os = "android", target_os = "ios"))]
fn clipboard(&self, content: String);
@ -722,7 +722,8 @@ pub trait InvokeUiSession: Send + Sync + Clone + 'static + Sized + Default {
fn on_voice_call_closed(&self, reason: &str);
fn on_voice_call_waiting(&self);
fn on_voice_call_incoming(&self);
fn get_rgba(&mut self, buffer: *mut u8);
fn get_rgba(&self) -> *const u8;
fn next_rgba(&mut self);
}
impl<T: InvokeUiSession> Deref for Session<T> {
@ -957,7 +958,7 @@ pub async fn io_loop<T: InvokeUiSession>(handler: Session<T>) {
let frame_count = Arc::new(AtomicUsize::new(0));
let frame_count_cl = frame_count.clone();
let ui_handler = handler.ui_handler.clone();
let (video_sender, audio_sender) = start_video_audio_threads(move |data: Arc<RwLock<Vec<u8>>> | {
let (video_sender, audio_sender) = start_video_audio_threads(move |data: &mut Vec<u8> | {
frame_count_cl.fetch_add(1, Ordering::Relaxed);
ui_handler.on_rgba(data);
});