vdi: split server.rs into server.rs + console.rs + connection.rs
This commit is contained in:
parent
c14619ef1f
commit
bbc6c98775
1
vdi/host/src/connection.rs
Normal file
1
vdi/host/src/connection.rs
Normal file
@ -0,0 +1 @@
|
||||
use hbb_common::{message_proto::*, tokio, ResultType};
|
193
vdi/host/src/console.rs
Normal file
193
vdi/host/src/console.rs
Normal file
@ -0,0 +1,193 @@
|
||||
use hbb_common::{log, tokio, ResultType};
|
||||
use image::GenericImage;
|
||||
use qemu_display::{Console, ConsoleListenerHandler, MouseButton};
|
||||
use std::{collections::HashSet, sync::Arc, time};
|
||||
pub use tokio::sync::{mpsc, Mutex};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Event {
|
||||
ConsoleUpdate((i32, i32, i32, i32)),
|
||||
Disconnected,
|
||||
}
|
||||
|
||||
const PIXMAN_X8R8G8B8: u32 = 0x20020888;
|
||||
pub type BgraImage = image::ImageBuffer<image::Rgba<u8>, Vec<u8>>;
|
||||
#[derive(Debug)]
|
||||
pub struct ConsoleListener {
|
||||
pub image: Arc<Mutex<BgraImage>>,
|
||||
pub tx: mpsc::UnboundedSender<Event>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ConsoleListenerHandler for ConsoleListener {
|
||||
async fn scanout(&mut self, s: qemu_display::Scanout) {
|
||||
*self.image.lock().await = image_from_vec(s.format, s.width, s.height, s.stride, s.data);
|
||||
}
|
||||
|
||||
async fn update(&mut self, u: qemu_display::Update) {
|
||||
let update = image_from_vec(u.format, u.w as _, u.h as _, u.stride, u.data);
|
||||
let mut image = self.image.lock().await;
|
||||
if (u.x, u.y) == (0, 0) && update.dimensions() == image.dimensions() {
|
||||
*image = update;
|
||||
} else {
|
||||
image.copy_from(&update, u.x as _, u.y as _).unwrap();
|
||||
}
|
||||
self.tx
|
||||
.send(Event::ConsoleUpdate((u.x, u.y, u.w, u.h)))
|
||||
.ok();
|
||||
}
|
||||
|
||||
async fn scanout_dmabuf(&mut self, _scanout: qemu_display::ScanoutDMABUF) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn update_dmabuf(&mut self, _update: qemu_display::UpdateDMABUF) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn mouse_set(&mut self, set: qemu_display::MouseSet) {
|
||||
dbg!(set);
|
||||
}
|
||||
|
||||
async fn cursor_define(&mut self, cursor: qemu_display::Cursor) {
|
||||
dbg!(cursor);
|
||||
}
|
||||
|
||||
fn disconnected(&mut self) {
|
||||
dbg!();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(derivative::Derivative)]
|
||||
#[derivative(Debug)]
|
||||
pub struct Client {
|
||||
#[derivative(Debug = "ignore")]
|
||||
console: Arc<Mutex<Console>>,
|
||||
last_update: Option<time::Instant>,
|
||||
has_update: bool,
|
||||
req_update: bool,
|
||||
last_buttons: HashSet<MouseButton>,
|
||||
dimensions: (u16, u16),
|
||||
image: Arc<Mutex<BgraImage>>,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
pub fn new(console: Arc<Mutex<Console>>, image: Arc<Mutex<BgraImage>>) -> Self {
|
||||
Self {
|
||||
console,
|
||||
image,
|
||||
last_update: None,
|
||||
has_update: false,
|
||||
req_update: false,
|
||||
last_buttons: HashSet::new(),
|
||||
dimensions: (0, 0),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_pending(&self) -> bool {
|
||||
self.has_update && self.req_update
|
||||
}
|
||||
|
||||
pub async fn key_event(&self, qnum: u32, down: bool) -> ResultType<()> {
|
||||
let console = self.console.lock().await;
|
||||
if down {
|
||||
console.keyboard.press(qnum).await?;
|
||||
} else {
|
||||
console.keyboard.release(qnum).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn desktop_resize(&mut self) -> ResultType<()> {
|
||||
let image = self.image.lock().await;
|
||||
let (width, height) = (image.width() as _, image.height() as _);
|
||||
if (width, height) == self.dimensions {
|
||||
return Ok(());
|
||||
}
|
||||
self.dimensions = (width, height);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn send_framebuffer_update(&mut self) -> ResultType<()> {
|
||||
self.desktop_resize().await?;
|
||||
if self.has_update && self.req_update {
|
||||
if let Some(last_update) = self.last_update {
|
||||
if last_update.elapsed().as_millis() < 10 {
|
||||
log::info!("TODO: <10ms, could delay update..")
|
||||
}
|
||||
}
|
||||
// self.server.send_framebuffer_update(&self.vnc_server)?;
|
||||
self.last_update = Some(time::Instant::now());
|
||||
self.has_update = false;
|
||||
self.req_update = false;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn handle_event(&mut self, event: Option<Event>) -> ResultType<bool> {
|
||||
match event {
|
||||
Some(Event::ConsoleUpdate(_)) => {
|
||||
self.has_update = true;
|
||||
}
|
||||
Some(Event::Disconnected) => {
|
||||
return Ok(false);
|
||||
}
|
||||
None => {
|
||||
self.send_framebuffer_update().await?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
|
||||
fn image_from_vec(format: u32, width: u32, height: u32, stride: u32, data: Vec<u8>) -> BgraImage {
|
||||
if format != PIXMAN_X8R8G8B8 {
|
||||
todo!("unhandled pixman format: {}", format)
|
||||
}
|
||||
if cfg!(target_endian = "big") {
|
||||
todo!("pixman/image in big endian")
|
||||
}
|
||||
let layout = image::flat::SampleLayout {
|
||||
channels: 4,
|
||||
channel_stride: 1,
|
||||
width,
|
||||
width_stride: 4,
|
||||
height,
|
||||
height_stride: stride as _,
|
||||
};
|
||||
let samples = image::flat::FlatSamples {
|
||||
samples: data,
|
||||
layout,
|
||||
color_hint: None,
|
||||
};
|
||||
samples
|
||||
.try_into_buffer::<image::Rgba<u8>>()
|
||||
.or_else::<&str, _>(|(_err, samples)| {
|
||||
let view = samples.as_view::<image::Rgba<u8>>().unwrap();
|
||||
let mut img = BgraImage::new(width, height);
|
||||
img.copy_from(&view, 0, 0).unwrap();
|
||||
Ok(img)
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
fn button_mask_to_set(mask: u8) -> HashSet<MouseButton> {
|
||||
let mut set = HashSet::new();
|
||||
if mask & 0b0000_0001 != 0 {
|
||||
set.insert(MouseButton::Left);
|
||||
}
|
||||
if mask & 0b0000_0010 != 0 {
|
||||
set.insert(MouseButton::Middle);
|
||||
}
|
||||
if mask & 0b0000_0100 != 0 {
|
||||
set.insert(MouseButton::Right);
|
||||
}
|
||||
if mask & 0b0000_1000 != 0 {
|
||||
set.insert(MouseButton::WheelUp);
|
||||
}
|
||||
if mask & 0b0001_0000 != 0 {
|
||||
set.insert(MouseButton::WheelDown);
|
||||
}
|
||||
set
|
||||
}
|
@ -1 +1,3 @@
|
||||
pub mod server;
|
||||
mod console;
|
||||
mod connection;
|
||||
|
@ -1,23 +1,15 @@
|
||||
use clap::Parser;
|
||||
use hbb_common::{
|
||||
anyhow::{anyhow, Context},
|
||||
log,
|
||||
message_proto::*,
|
||||
tokio, ResultType,
|
||||
};
|
||||
use image::GenericImage;
|
||||
use qemu_display::{Console, ConsoleListenerHandler, MouseButton, VMProxy};
|
||||
use hbb_common::{anyhow::Context, log, tokio, ResultType};
|
||||
use qemu_display::{Console, VMProxy};
|
||||
use std::{
|
||||
borrow::Borrow,
|
||||
collections::HashSet,
|
||||
error::Error,
|
||||
io,
|
||||
iter::FromIterator,
|
||||
net::{TcpListener, TcpStream},
|
||||
sync::{mpsc, Arc, Mutex},
|
||||
thread, time,
|
||||
sync::Arc,
|
||||
thread,
|
||||
};
|
||||
|
||||
use crate::console::*;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
pub struct SocketAddrArgs {
|
||||
/// IP address
|
||||
@ -43,155 +35,12 @@ struct Cli {
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum Event {
|
||||
ConsoleUpdate((i32, i32, i32, i32)),
|
||||
Disconnected,
|
||||
}
|
||||
|
||||
const PIXMAN_X8R8G8B8: u32 = 0x20020888;
|
||||
type BgraImage = image::ImageBuffer<image::Rgba<u8>, Vec<u8>>;
|
||||
|
||||
#[derive(derivative::Derivative)]
|
||||
#[derivative(Debug)]
|
||||
struct Client {
|
||||
#[derivative(Debug = "ignore")]
|
||||
server: Server,
|
||||
share: bool,
|
||||
last_update: Option<time::Instant>,
|
||||
has_update: bool,
|
||||
req_update: bool,
|
||||
last_buttons: HashSet<MouseButton>,
|
||||
dimensions: (u16, u16),
|
||||
}
|
||||
|
||||
impl Client {
|
||||
fn new(server: Server, share: bool) -> Self {
|
||||
Self {
|
||||
server,
|
||||
share,
|
||||
last_update: None,
|
||||
has_update: false,
|
||||
req_update: false,
|
||||
last_buttons: HashSet::new(),
|
||||
dimensions: (0, 0),
|
||||
}
|
||||
}
|
||||
|
||||
fn update_pending(&self) -> bool {
|
||||
self.has_update && self.req_update
|
||||
}
|
||||
|
||||
async fn key_event(&self, qnum: u32, down: bool) -> ResultType<()> {
|
||||
let inner = self.server.inner.lock().unwrap();
|
||||
if down {
|
||||
inner.console.keyboard.press(qnum).await?;
|
||||
} else {
|
||||
inner.console.keyboard.release(qnum).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn desktop_resize(&mut self) -> ResultType<()> {
|
||||
let (width, height) = self.server.dimensions();
|
||||
if (width, height) == self.dimensions {
|
||||
return Ok(());
|
||||
}
|
||||
self.dimensions = (width, height);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn send_framebuffer_update(&mut self) -> ResultType<()> {
|
||||
self.desktop_resize()?;
|
||||
if self.has_update && self.req_update {
|
||||
if let Some(last_update) = self.last_update {
|
||||
if last_update.elapsed().as_millis() < 10 {
|
||||
println!("TODO: <10ms, could delay update..")
|
||||
}
|
||||
}
|
||||
// self.server.send_framebuffer_update(&self.vnc_server)?;
|
||||
self.last_update = Some(time::Instant::now());
|
||||
self.has_update = false;
|
||||
self.req_update = false;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_event(&mut self, event: Option<Event>) -> ResultType<bool> {
|
||||
match event {
|
||||
Some(Event::ConsoleUpdate(_)) => {
|
||||
self.has_update = true;
|
||||
}
|
||||
Some(Event::Disconnected) => {
|
||||
return Ok(false);
|
||||
}
|
||||
None => {
|
||||
self.send_framebuffer_update()?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ConsoleListener {
|
||||
server: Server,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ConsoleListenerHandler for ConsoleListener {
|
||||
async fn scanout(&mut self, s: qemu_display::Scanout) {
|
||||
let mut inner = self.server.inner.lock().unwrap();
|
||||
inner.image = image_from_vec(s.format, s.width, s.height, s.stride, s.data);
|
||||
}
|
||||
|
||||
async fn update(&mut self, u: qemu_display::Update) {
|
||||
let mut inner = self.server.inner.lock().unwrap();
|
||||
let update = image_from_vec(u.format, u.w as _, u.h as _, u.stride, u.data);
|
||||
if (u.x, u.y) == (0, 0) && update.dimensions() == inner.image.dimensions() {
|
||||
inner.image = update;
|
||||
} else {
|
||||
inner.image.copy_from(&update, u.x as _, u.y as _).unwrap();
|
||||
}
|
||||
inner
|
||||
.tx
|
||||
.send(Event::ConsoleUpdate((u.x, u.y, u.w, u.h)))
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
async fn scanout_dmabuf(&mut self, _scanout: qemu_display::ScanoutDMABUF) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn update_dmabuf(&mut self, _update: qemu_display::UpdateDMABUF) {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn mouse_set(&mut self, set: qemu_display::MouseSet) {
|
||||
dbg!(set);
|
||||
}
|
||||
|
||||
async fn cursor_define(&mut self, cursor: qemu_display::Cursor) {
|
||||
dbg!(cursor);
|
||||
}
|
||||
|
||||
fn disconnected(&mut self) {
|
||||
dbg!();
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct ServerInner {
|
||||
console: Console,
|
||||
image: BgraImage,
|
||||
tx: mpsc::Sender<Event>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
struct Server {
|
||||
vm_name: String,
|
||||
rx: Arc<Mutex<mpsc::Receiver<Event>>>,
|
||||
inner: Arc<Mutex<ServerInner>>,
|
||||
rx: mpsc::UnboundedReceiver<Event>,
|
||||
tx: mpsc::UnboundedSender<Event>,
|
||||
image: Arc<Mutex<BgraImage>>,
|
||||
console: Arc<Mutex<Console>>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
@ -199,118 +48,73 @@ impl Server {
|
||||
let width = console.width().await?;
|
||||
let height = console.height().await?;
|
||||
let image = BgraImage::new(width as _, height as _);
|
||||
let (tx, rx) = mpsc::channel();
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
Ok(Self {
|
||||
vm_name,
|
||||
rx: Arc::new(Mutex::new(rx)),
|
||||
inner: Arc::new(Mutex::new(ServerInner { console, image, tx })),
|
||||
rx,
|
||||
image: Arc::new(Mutex::new(image)),
|
||||
tx,
|
||||
console: Arc::new(Mutex::new(console)),
|
||||
})
|
||||
}
|
||||
|
||||
fn stop_console(&self) -> ResultType<()> {
|
||||
let mut inner = self.inner.lock().unwrap();
|
||||
inner.console.unregister_listener();
|
||||
async fn stop_console(&self) -> ResultType<()> {
|
||||
self.console.lock().await.unregister_listener();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn run_console(&self) -> ResultType<()> {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
inner
|
||||
.console
|
||||
self.console
|
||||
.lock()
|
||||
.await
|
||||
.register_listener(ConsoleListener {
|
||||
server: self.clone(),
|
||||
image: self.image.clone(),
|
||||
tx: self.tx.clone(),
|
||||
})
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn dimensions(&self) -> (u16, u16) {
|
||||
let inner = self.inner.lock().unwrap();
|
||||
(inner.image.width() as u16, inner.image.height() as u16)
|
||||
async fn dimensions(&self) -> (u16, u16) {
|
||||
let image = self.image.lock().await;
|
||||
(image.width() as u16, image.height() as u16)
|
||||
}
|
||||
|
||||
async fn handle_connection(&self, stream: TcpStream) -> ResultType<()> {
|
||||
let (width, height) = self.dimensions();
|
||||
async fn handle_connection(&mut self, stream: TcpStream) -> ResultType<()> {
|
||||
let (width, height) = self.dimensions().await;
|
||||
|
||||
let tx = self.inner.lock().unwrap().tx.clone();
|
||||
let tx = self.tx.clone();
|
||||
let _client_thread = thread::spawn(move || loop {});
|
||||
|
||||
let mut client = Client::new(self.clone(), true);
|
||||
let mut client = Client::new(self.console.clone(), self.image.clone());
|
||||
self.run_console().await?;
|
||||
let rx = self.rx.lock().unwrap();
|
||||
loop {
|
||||
let ev = if client.update_pending() {
|
||||
match rx.try_recv() {
|
||||
match self.rx.try_recv() {
|
||||
Ok(e) => Some(e),
|
||||
Err(mpsc::TryRecvError::Empty) => None,
|
||||
Err(mpsc::error::TryRecvError::Empty) => None,
|
||||
Err(e) => {
|
||||
return Err(e.into());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Some(rx.recv()?)
|
||||
Some(
|
||||
self.rx
|
||||
.recv()
|
||||
.await
|
||||
.context("Channel closed unexpectedly")?,
|
||||
)
|
||||
};
|
||||
if !client.handle_event(ev).await? {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
self.stop_console()?;
|
||||
self.stop_console().await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn button_mask_to_set(mask: u8) -> HashSet<MouseButton> {
|
||||
let mut set = HashSet::new();
|
||||
if mask & 0b0000_0001 != 0 {
|
||||
set.insert(MouseButton::Left);
|
||||
}
|
||||
if mask & 0b0000_0010 != 0 {
|
||||
set.insert(MouseButton::Middle);
|
||||
}
|
||||
if mask & 0b0000_0100 != 0 {
|
||||
set.insert(MouseButton::Right);
|
||||
}
|
||||
if mask & 0b0000_1000 != 0 {
|
||||
set.insert(MouseButton::WheelUp);
|
||||
}
|
||||
if mask & 0b0001_0000 != 0 {
|
||||
set.insert(MouseButton::WheelDown);
|
||||
}
|
||||
set
|
||||
}
|
||||
|
||||
fn image_from_vec(format: u32, width: u32, height: u32, stride: u32, data: Vec<u8>) -> BgraImage {
|
||||
if format != PIXMAN_X8R8G8B8 {
|
||||
todo!("unhandled pixman format: {}", format)
|
||||
}
|
||||
if cfg!(target_endian = "big") {
|
||||
todo!("pixman/image in big endian")
|
||||
}
|
||||
let layout = image::flat::SampleLayout {
|
||||
channels: 4,
|
||||
channel_stride: 1,
|
||||
width,
|
||||
width_stride: 4,
|
||||
height,
|
||||
height_stride: stride as _,
|
||||
};
|
||||
let samples = image::flat::FlatSamples {
|
||||
samples: data,
|
||||
layout,
|
||||
color_hint: None,
|
||||
};
|
||||
samples
|
||||
.try_into_buffer::<image::Rgba<u8>>()
|
||||
.or_else::<&str, _>(|(_err, samples)| {
|
||||
let view = samples.as_view::<image::Rgba<u8>>().unwrap();
|
||||
let mut img = BgraImage::new(width, height);
|
||||
img.copy_from(&view, 0, 0).unwrap();
|
||||
Ok(img)
|
||||
})
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
pub async fn run() -> ResultType<()> {
|
||||
let args = Cli::parse();
|
||||
@ -329,10 +133,9 @@ pub async fn run() -> ResultType<()> {
|
||||
let console = Console::new(&dbus.into(), 0)
|
||||
.await
|
||||
.context("Failed to get the console")?;
|
||||
let server = Server::new(format!("qemu-rustdesk ({})", vm_name), console).await?;
|
||||
let mut server = Server::new(format!("qemu-rustdesk ({})", vm_name), console).await?;
|
||||
for stream in listener.incoming() {
|
||||
let stream = stream?;
|
||||
let server = server.clone();
|
||||
if let Err(err) = server.handle_connection(stream).await {
|
||||
log::error!("Connection closed: {err}");
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user