From bbc6c98775de9a21f45260fdceba05b8d7c5bcd5 Mon Sep 17 00:00:00 2001 From: rustdesk Date: Wed, 22 Mar 2023 14:13:24 +0800 Subject: [PATCH] vdi: split server.rs into server.rs + console.rs + connection.rs --- vdi/host/src/connection.rs | 1 + vdi/host/src/console.rs | 193 ++++++++++++++++++++++++++ vdi/host/src/lib.rs | 2 + vdi/host/src/server.rs | 275 ++++++------------------------------- 4 files changed, 235 insertions(+), 236 deletions(-) create mode 100644 vdi/host/src/connection.rs create mode 100644 vdi/host/src/console.rs diff --git a/vdi/host/src/connection.rs b/vdi/host/src/connection.rs new file mode 100644 index 000000000..f416ce4e0 --- /dev/null +++ b/vdi/host/src/connection.rs @@ -0,0 +1 @@ +use hbb_common::{message_proto::*, tokio, ResultType}; diff --git a/vdi/host/src/console.rs b/vdi/host/src/console.rs new file mode 100644 index 000000000..29dcbbfc9 --- /dev/null +++ b/vdi/host/src/console.rs @@ -0,0 +1,193 @@ +use hbb_common::{log, tokio, ResultType}; +use image::GenericImage; +use qemu_display::{Console, ConsoleListenerHandler, MouseButton}; +use std::{collections::HashSet, sync::Arc, time}; +pub use tokio::sync::{mpsc, Mutex}; + +#[derive(Debug)] +pub enum Event { + ConsoleUpdate((i32, i32, i32, i32)), + Disconnected, +} + +const PIXMAN_X8R8G8B8: u32 = 0x20020888; +pub type BgraImage = image::ImageBuffer, Vec>; +#[derive(Debug)] +pub struct ConsoleListener { + pub image: Arc>, + pub tx: mpsc::UnboundedSender, +} + +#[async_trait::async_trait] +impl ConsoleListenerHandler for ConsoleListener { + async fn scanout(&mut self, s: qemu_display::Scanout) { + *self.image.lock().await = image_from_vec(s.format, s.width, s.height, s.stride, s.data); + } + + async fn update(&mut self, u: qemu_display::Update) { + let update = image_from_vec(u.format, u.w as _, u.h as _, u.stride, u.data); + let mut image = self.image.lock().await; + if (u.x, u.y) == (0, 0) && update.dimensions() == image.dimensions() { + *image = update; + } else { + image.copy_from(&update, u.x as _, u.y as _).unwrap(); + } + self.tx + .send(Event::ConsoleUpdate((u.x, u.y, u.w, u.h))) + .ok(); + } + + async fn scanout_dmabuf(&mut self, _scanout: qemu_display::ScanoutDMABUF) { + unimplemented!() + } + + async fn update_dmabuf(&mut self, _update: qemu_display::UpdateDMABUF) { + unimplemented!() + } + + async fn mouse_set(&mut self, set: qemu_display::MouseSet) { + dbg!(set); + } + + async fn cursor_define(&mut self, cursor: qemu_display::Cursor) { + dbg!(cursor); + } + + fn disconnected(&mut self) { + dbg!(); + } +} + +#[derive(derivative::Derivative)] +#[derivative(Debug)] +pub struct Client { + #[derivative(Debug = "ignore")] + console: Arc>, + last_update: Option, + has_update: bool, + req_update: bool, + last_buttons: HashSet, + dimensions: (u16, u16), + image: Arc>, +} + +impl Client { + pub fn new(console: Arc>, image: Arc>) -> Self { + Self { + console, + image, + last_update: None, + has_update: false, + req_update: false, + last_buttons: HashSet::new(), + dimensions: (0, 0), + } + } + + pub fn update_pending(&self) -> bool { + self.has_update && self.req_update + } + + pub async fn key_event(&self, qnum: u32, down: bool) -> ResultType<()> { + let console = self.console.lock().await; + if down { + console.keyboard.press(qnum).await?; + } else { + console.keyboard.release(qnum).await?; + } + Ok(()) + } + + pub async fn desktop_resize(&mut self) -> ResultType<()> { + let image = self.image.lock().await; + let (width, height) = (image.width() as _, image.height() as _); + if (width, height) == self.dimensions { + return Ok(()); + } + self.dimensions = (width, height); + Ok(()) + } + + pub async fn send_framebuffer_update(&mut self) -> ResultType<()> { + self.desktop_resize().await?; + if self.has_update && self.req_update { + if let Some(last_update) = self.last_update { + if last_update.elapsed().as_millis() < 10 { + log::info!("TODO: <10ms, could delay update..") + } + } + // self.server.send_framebuffer_update(&self.vnc_server)?; + self.last_update = Some(time::Instant::now()); + self.has_update = false; + self.req_update = false; + } + Ok(()) + } + + pub async fn handle_event(&mut self, event: Option) -> ResultType { + match event { + Some(Event::ConsoleUpdate(_)) => { + self.has_update = true; + } + Some(Event::Disconnected) => { + return Ok(false); + } + None => { + self.send_framebuffer_update().await?; + } + } + + Ok(true) + } +} + +fn image_from_vec(format: u32, width: u32, height: u32, stride: u32, data: Vec) -> BgraImage { + if format != PIXMAN_X8R8G8B8 { + todo!("unhandled pixman format: {}", format) + } + if cfg!(target_endian = "big") { + todo!("pixman/image in big endian") + } + let layout = image::flat::SampleLayout { + channels: 4, + channel_stride: 1, + width, + width_stride: 4, + height, + height_stride: stride as _, + }; + let samples = image::flat::FlatSamples { + samples: data, + layout, + color_hint: None, + }; + samples + .try_into_buffer::>() + .or_else::<&str, _>(|(_err, samples)| { + let view = samples.as_view::>().unwrap(); + let mut img = BgraImage::new(width, height); + img.copy_from(&view, 0, 0).unwrap(); + Ok(img) + }) + .unwrap() +} + +fn button_mask_to_set(mask: u8) -> HashSet { + let mut set = HashSet::new(); + if mask & 0b0000_0001 != 0 { + set.insert(MouseButton::Left); + } + if mask & 0b0000_0010 != 0 { + set.insert(MouseButton::Middle); + } + if mask & 0b0000_0100 != 0 { + set.insert(MouseButton::Right); + } + if mask & 0b0000_1000 != 0 { + set.insert(MouseButton::WheelUp); + } + if mask & 0b0001_0000 != 0 { + set.insert(MouseButton::WheelDown); + } + set +} diff --git a/vdi/host/src/lib.rs b/vdi/host/src/lib.rs index 74f47ad34..e9f8d7ed3 100644 --- a/vdi/host/src/lib.rs +++ b/vdi/host/src/lib.rs @@ -1 +1,3 @@ pub mod server; +mod console; +mod connection; diff --git a/vdi/host/src/server.rs b/vdi/host/src/server.rs index 12f4526c9..5fd28d2d7 100644 --- a/vdi/host/src/server.rs +++ b/vdi/host/src/server.rs @@ -1,23 +1,15 @@ use clap::Parser; -use hbb_common::{ - anyhow::{anyhow, Context}, - log, - message_proto::*, - tokio, ResultType, -}; -use image::GenericImage; -use qemu_display::{Console, ConsoleListenerHandler, MouseButton, VMProxy}; +use hbb_common::{anyhow::Context, log, tokio, ResultType}; +use qemu_display::{Console, VMProxy}; use std::{ borrow::Borrow, - collections::HashSet, - error::Error, - io, - iter::FromIterator, net::{TcpListener, TcpStream}, - sync::{mpsc, Arc, Mutex}, - thread, time, + sync::Arc, + thread, }; +use crate::console::*; + #[derive(Parser, Debug)] pub struct SocketAddrArgs { /// IP address @@ -43,155 +35,12 @@ struct Cli { } #[derive(Debug)] -enum Event { - ConsoleUpdate((i32, i32, i32, i32)), - Disconnected, -} - -const PIXMAN_X8R8G8B8: u32 = 0x20020888; -type BgraImage = image::ImageBuffer, Vec>; - -#[derive(derivative::Derivative)] -#[derivative(Debug)] -struct Client { - #[derivative(Debug = "ignore")] - server: Server, - share: bool, - last_update: Option, - has_update: bool, - req_update: bool, - last_buttons: HashSet, - dimensions: (u16, u16), -} - -impl Client { - fn new(server: Server, share: bool) -> Self { - Self { - server, - share, - last_update: None, - has_update: false, - req_update: false, - last_buttons: HashSet::new(), - dimensions: (0, 0), - } - } - - fn update_pending(&self) -> bool { - self.has_update && self.req_update - } - - async fn key_event(&self, qnum: u32, down: bool) -> ResultType<()> { - let inner = self.server.inner.lock().unwrap(); - if down { - inner.console.keyboard.press(qnum).await?; - } else { - inner.console.keyboard.release(qnum).await?; - } - Ok(()) - } - - fn desktop_resize(&mut self) -> ResultType<()> { - let (width, height) = self.server.dimensions(); - if (width, height) == self.dimensions { - return Ok(()); - } - self.dimensions = (width, height); - Ok(()) - } - - fn send_framebuffer_update(&mut self) -> ResultType<()> { - self.desktop_resize()?; - if self.has_update && self.req_update { - if let Some(last_update) = self.last_update { - if last_update.elapsed().as_millis() < 10 { - println!("TODO: <10ms, could delay update..") - } - } - // self.server.send_framebuffer_update(&self.vnc_server)?; - self.last_update = Some(time::Instant::now()); - self.has_update = false; - self.req_update = false; - } - Ok(()) - } - - async fn handle_event(&mut self, event: Option) -> ResultType { - match event { - Some(Event::ConsoleUpdate(_)) => { - self.has_update = true; - } - Some(Event::Disconnected) => { - return Ok(false); - } - None => { - self.send_framebuffer_update()?; - } - } - - Ok(true) - } -} - -#[derive(Debug)] -struct ConsoleListener { - server: Server, -} - -#[async_trait::async_trait] -impl ConsoleListenerHandler for ConsoleListener { - async fn scanout(&mut self, s: qemu_display::Scanout) { - let mut inner = self.server.inner.lock().unwrap(); - inner.image = image_from_vec(s.format, s.width, s.height, s.stride, s.data); - } - - async fn update(&mut self, u: qemu_display::Update) { - let mut inner = self.server.inner.lock().unwrap(); - let update = image_from_vec(u.format, u.w as _, u.h as _, u.stride, u.data); - if (u.x, u.y) == (0, 0) && update.dimensions() == inner.image.dimensions() { - inner.image = update; - } else { - inner.image.copy_from(&update, u.x as _, u.y as _).unwrap(); - } - inner - .tx - .send(Event::ConsoleUpdate((u.x, u.y, u.w, u.h))) - .unwrap(); - } - - async fn scanout_dmabuf(&mut self, _scanout: qemu_display::ScanoutDMABUF) { - unimplemented!() - } - - async fn update_dmabuf(&mut self, _update: qemu_display::UpdateDMABUF) { - unimplemented!() - } - - async fn mouse_set(&mut self, set: qemu_display::MouseSet) { - dbg!(set); - } - - async fn cursor_define(&mut self, cursor: qemu_display::Cursor) { - dbg!(cursor); - } - - fn disconnected(&mut self) { - dbg!(); - } -} - -#[derive(Debug)] -struct ServerInner { - console: Console, - image: BgraImage, - tx: mpsc::Sender, -} - -#[derive(Clone, Debug)] struct Server { vm_name: String, - rx: Arc>>, - inner: Arc>, + rx: mpsc::UnboundedReceiver, + tx: mpsc::UnboundedSender, + image: Arc>, + console: Arc>, } impl Server { @@ -199,118 +48,73 @@ impl Server { let width = console.width().await?; let height = console.height().await?; let image = BgraImage::new(width as _, height as _); - let (tx, rx) = mpsc::channel(); + let (tx, rx) = mpsc::unbounded_channel(); Ok(Self { vm_name, - rx: Arc::new(Mutex::new(rx)), - inner: Arc::new(Mutex::new(ServerInner { console, image, tx })), + rx, + image: Arc::new(Mutex::new(image)), + tx, + console: Arc::new(Mutex::new(console)), }) } - fn stop_console(&self) -> ResultType<()> { - let mut inner = self.inner.lock().unwrap(); - inner.console.unregister_listener(); + async fn stop_console(&self) -> ResultType<()> { + self.console.lock().await.unregister_listener(); Ok(()) } async fn run_console(&self) -> ResultType<()> { - let inner = self.inner.lock().unwrap(); - inner - .console + self.console + .lock() + .await .register_listener(ConsoleListener { - server: self.clone(), + image: self.image.clone(), + tx: self.tx.clone(), }) .await?; Ok(()) } - fn dimensions(&self) -> (u16, u16) { - let inner = self.inner.lock().unwrap(); - (inner.image.width() as u16, inner.image.height() as u16) + async fn dimensions(&self) -> (u16, u16) { + let image = self.image.lock().await; + (image.width() as u16, image.height() as u16) } - async fn handle_connection(&self, stream: TcpStream) -> ResultType<()> { - let (width, height) = self.dimensions(); + async fn handle_connection(&mut self, stream: TcpStream) -> ResultType<()> { + let (width, height) = self.dimensions().await; - let tx = self.inner.lock().unwrap().tx.clone(); + let tx = self.tx.clone(); let _client_thread = thread::spawn(move || loop {}); - let mut client = Client::new(self.clone(), true); + let mut client = Client::new(self.console.clone(), self.image.clone()); self.run_console().await?; - let rx = self.rx.lock().unwrap(); loop { let ev = if client.update_pending() { - match rx.try_recv() { + match self.rx.try_recv() { Ok(e) => Some(e), - Err(mpsc::TryRecvError::Empty) => None, + Err(mpsc::error::TryRecvError::Empty) => None, Err(e) => { return Err(e.into()); } } } else { - Some(rx.recv()?) + Some( + self.rx + .recv() + .await + .context("Channel closed unexpectedly")?, + ) }; if !client.handle_event(ev).await? { break; } } - self.stop_console()?; + self.stop_console().await?; Ok(()) } } -fn button_mask_to_set(mask: u8) -> HashSet { - let mut set = HashSet::new(); - if mask & 0b0000_0001 != 0 { - set.insert(MouseButton::Left); - } - if mask & 0b0000_0010 != 0 { - set.insert(MouseButton::Middle); - } - if mask & 0b0000_0100 != 0 { - set.insert(MouseButton::Right); - } - if mask & 0b0000_1000 != 0 { - set.insert(MouseButton::WheelUp); - } - if mask & 0b0001_0000 != 0 { - set.insert(MouseButton::WheelDown); - } - set -} - -fn image_from_vec(format: u32, width: u32, height: u32, stride: u32, data: Vec) -> BgraImage { - if format != PIXMAN_X8R8G8B8 { - todo!("unhandled pixman format: {}", format) - } - if cfg!(target_endian = "big") { - todo!("pixman/image in big endian") - } - let layout = image::flat::SampleLayout { - channels: 4, - channel_stride: 1, - width, - width_stride: 4, - height, - height_stride: stride as _, - }; - let samples = image::flat::FlatSamples { - samples: data, - layout, - color_hint: None, - }; - samples - .try_into_buffer::>() - .or_else::<&str, _>(|(_err, samples)| { - let view = samples.as_view::>().unwrap(); - let mut img = BgraImage::new(width, height); - img.copy_from(&view, 0, 0).unwrap(); - Ok(img) - }) - .unwrap() -} - #[tokio::main] pub async fn run() -> ResultType<()> { let args = Cli::parse(); @@ -329,10 +133,9 @@ pub async fn run() -> ResultType<()> { let console = Console::new(&dbus.into(), 0) .await .context("Failed to get the console")?; - let server = Server::new(format!("qemu-rustdesk ({})", vm_name), console).await?; + let mut server = Server::new(format!("qemu-rustdesk ({})", vm_name), console).await?; for stream in listener.incoming() { let stream = stream?; - let server = server.clone(); if let Err(err) = server.handle_connection(stream).await { log::error!("Connection closed: {err}"); }