diff --git a/libs/hbb_common/src/udp.rs b/libs/hbb_common/src/udp.rs index c2d5057f2..3532dd1e0 100644 --- a/libs/hbb_common/src/udp.rs +++ b/libs/hbb_common/src/udp.rs @@ -37,7 +37,6 @@ fn new_socket(addr: SocketAddr, reuse: bool, buf_size: usize) -> Result ResultType<()> { + let addr = SocketAddr::from(([0, 0, 0, 0], get_broadcast_port())); + let socket = std::net::UdpSocket::bind(addr)?; + socket.set_read_timeout(Some(std::time::Duration::from_millis(1000)))?; + log::info!("lan discovery listener started"); + loop { + let mut buf = [0; 2048]; + if let Ok((len, addr)) = socket.recv_from(&mut buf) { + if let Ok(msg_in) = Message::parse_from_bytes(&buf[0..len]) { + match msg_in.union { + Some(rendezvous_message::Union::PeerDiscovery(p)) => { + if p.cmd == "ping" { + if let Some(self_addr) = get_ipaddr_by_peer(&addr) { + let mut msg_out = Message::new(); + let peer = PeerDiscovery { + cmd: "pong".to_owned(), + mac: get_mac(&self_addr), + id: Config::get_id(), + hostname: whoami::hostname(), + username: crate::platform::get_active_username(), + platform: whoami::platform().to_string(), + ..Default::default() + }; + msg_out.set_peer_discovery(peer); + socket.send_to(&msg_out.write_to_bytes()?, addr).ok(); + } + } + } + _ => {} + } + } + } + } +} + +#[tokio::main(flavor = "current_thread")] +pub async fn discover() -> ResultType<()> { + let sockets = send_query()?; + let rx = spawn_wait_responses(sockets); + handle_received_peers(rx).await?; + + log::info!("discover ping done"); + Ok(()) +} + +pub fn send_wol(id: String) { + let interfaces = default_net::get_interfaces(); + for peer in &config::LanPeers::load().peers { + if peer.id == id { + for (ip, mac) in peer.ip_mac.iter() { + if let Ok(mac_addr) = mac.parse() { + if let Ok(IpAddr::V4(ip)) = ip.parse() { + for interface in &interfaces { + for ipv4 in &interface.ipv4 { + if (u32::from(ipv4.addr) & u32::from(ipv4.netmask)) + == (u32::from(ip) & u32::from(ipv4.netmask)) + { + allow_err!(wol::send_wol( + mac_addr, + None, + Some(IpAddr::V4(ipv4.addr)) + )); + } + } + } + } + } + } + break; + } + } +} + +#[inline] +fn get_broadcast_port() -> u16 { + (RENDEZVOUS_PORT + 3) as _ +} + +fn get_mac(ip: &IpAddr) -> String { + #[cfg(not(any(target_os = "android", target_os = "ios")))] + if let Ok(mac) = get_mac_by_ip(ip) { + mac.to_string() + } else { + "".to_owned() + } + #[cfg(any(target_os = "android", target_os = "ios"))] + "".to_owned() +} + +fn get_all_ipv4s() -> ResultType> { + let mut ipv4s = Vec::new(); + for interface in default_net::get_interfaces() { + for ipv4 in &interface.ipv4 { + ipv4s.push(ipv4.addr.clone()); + } + } + Ok(ipv4s) +} + +fn get_mac_by_ip(ip: &IpAddr) -> ResultType { + for interface in default_net::get_interfaces() { + match ip { + IpAddr::V4(local_ipv4) => { + if interface.ipv4.iter().any(|x| x.addr == *local_ipv4) { + if let Some(mac_addr) = interface.mac_addr { + return Ok(mac_addr.address()); + } + } + } + IpAddr::V6(local_ipv6) => { + if interface.ipv6.iter().any(|x| x.addr == *local_ipv6) { + if let Some(mac_addr) = interface.mac_addr { + return Ok(mac_addr.address()); + } + } + } + } + } + bail!("No interface found for ip: {:?}", ip); +} + +// Mainly from https://github.com/shellrow/default-net/blob/cf7ca24e7e6e8e566ed32346c9cfddab3f47e2d6/src/interface/shared.rs#L4 +fn get_ipaddr_by_peer(peer: A) -> Option { + let socket = match UdpSocket::bind("0.0.0.0:0") { + Ok(s) => s, + Err(_) => return None, + }; + + match socket.connect(peer) { + Ok(()) => (), + Err(_) => return None, + }; + + match socket.local_addr() { + Ok(addr) => return Some(addr.ip()), + Err(_) => return None, + }; +} + +fn create_broadcast_sockets() -> ResultType> { + let mut sockets = Vec::new(); + for v4_addr in get_all_ipv4s()? { + if v4_addr.is_private() { + let s = UdpSocket::bind(SocketAddr::from((v4_addr, 0)))?; + s.set_broadcast(true)?; + log::debug!("Bind socket to {}", &v4_addr); + sockets.push(s) + } + } + Ok(sockets) +} + +fn send_query() -> ResultType> { + let sockets = create_broadcast_sockets()?; + if sockets.is_empty() { + bail!("Found no ipv4 addresses"); + } + + let mut msg_out = Message::new(); + let peer = PeerDiscovery { + cmd: "ping".to_owned(), + ..Default::default() + }; + msg_out.set_peer_discovery(peer); + let maddr = SocketAddr::from(([255, 255, 255, 255], get_broadcast_port())); + for socket in &sockets { + socket.send_to(&msg_out.write_to_bytes()?, maddr)?; + } + log::info!("discover ping sent"); + Ok(sockets) +} + +fn wait_response( + socket: UdpSocket, + timeout: Option, + tx: UnboundedSender, +) -> ResultType<()> { + let mut last_recv_time = Instant::now(); + + socket.set_read_timeout(timeout)?; + loop { + let mut buf = [0; 2048]; + if let Ok((len, addr)) = socket.recv_from(&mut buf) { + if let Ok(msg_in) = Message::parse_from_bytes(&buf[0..len]) { + match msg_in.union { + Some(rendezvous_message::Union::PeerDiscovery(p)) => { + last_recv_time = Instant::now(); + if p.cmd == "pong" { + let mac = if let Some(self_addr) = get_ipaddr_by_peer(&addr) { + get_mac(&self_addr) + } else { + "".to_owned() + }; + + if mac != p.mac { + allow_err!(tx.send(config::DiscoveryPeer { + id: p.id.clone(), + ip_mac: HashMap::from([ + (addr.ip().to_string(), p.mac.clone(),) + ]), + username: p.username.clone(), + hostname: p.hostname.clone(), + platform: p.platform.clone(), + online: true, + })); + } + } + } + _ => {} + } + } + } + if last_recv_time.elapsed().as_millis() > 3_000 { + break; + } + } + Ok(()) +} + +fn spawn_wait_responses(sockets: Vec) -> UnboundedReceiver { + let (tx, rx) = unbounded_channel::<_>(); + for socket in sockets { + let tx_clone = tx.clone(); + std::thread::spawn(move || { + allow_err!(wait_response( + socket, + Some(std::time::Duration::from_millis(10)), + tx_clone + )); + }); + } + rx +} + +async fn handle_received_peers(mut rx: UnboundedReceiver) -> ResultType<()> { + let mut peers = config::LanPeers::load().peers; + peers.iter_mut().for_each(|peer| { + peer.online = false; + }); + + let mut response_set = HashSet::new(); + let mut last_write_time = Instant::now() - std::time::Duration::from_secs(4); + loop { + tokio::select! { + data = rx.recv() => match data { + Some(mut peer) => { + let in_response_set = !response_set.insert(peer.id.clone()); + if let Some(pos) = peers.iter().position(|x| x.is_same_peer(&peer) ) { + let peer1 = peers.remove(pos); + if in_response_set { + peer.ip_mac.extend(peer1.ip_mac); + peer.online = true; + } + } + peers.insert(0, peer); + if last_write_time.elapsed().as_millis() > 300 { + config::LanPeers::store(&peers); + last_write_time = Instant::now(); + } + } + None => { + break + } + } + } + } + + config::LanPeers::store(&peers); + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index 93cd67738..715a59b5c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,8 @@ mod client; #[cfg(not(any(target_os = "ios")))] mod rendezvous_mediator; #[cfg(not(any(target_os = "ios")))] +mod lan; +#[cfg(not(any(target_os = "ios")))] pub use self::rendezvous_mediator::*; /// cbindgen:ignore pub mod common; diff --git a/src/rendezvous_mediator.rs b/src/rendezvous_mediator.rs index 8da1937a7..6dc7a11c9 100644 --- a/src/rendezvous_mediator.rs +++ b/src/rendezvous_mediator.rs @@ -2,7 +2,7 @@ use crate::server::{check_zombie, new as new_server, ServerPtr}; use hbb_common::{ allow_err, anyhow::bail, - config::{self, Config, REG_INTERVAL, RENDEZVOUS_PORT, RENDEZVOUS_TIMEOUT}, + config::{Config, REG_INTERVAL, RENDEZVOUS_PORT, RENDEZVOUS_TIMEOUT}, futures::future::join_all, log, protobuf::Message as _, @@ -10,15 +10,13 @@ use hbb_common::{ sleep, socket_client, tokio::{ self, select, - sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, time::{interval, Duration}, }, udp::FramedSocket, AddrMangle, IntoTargetAddr, ResultType, TargetAddr, }; use std::{ - collections::{HashMap, HashSet}, - net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket}, + net::SocketAddr, sync::{ atomic::{AtomicBool, Ordering}, Arc, Mutex, @@ -66,7 +64,7 @@ impl RendezvousMediator { #[cfg(not(any(target_os = "android", target_os = "ios")))] if crate::platform::is_installed() { std::thread::spawn(move || { - allow_err!(lan_discovery()); + allow_err!(super::lan::start_listening()); }); } loop { @@ -542,274 +540,3 @@ async fn direct_server(server: ServerPtr) { } } } - -#[inline] -pub fn get_broadcast_port() -> u16 { - (RENDEZVOUS_PORT + 3) as _ -} - -pub fn get_mac(ip: &IpAddr) -> String { - #[cfg(not(any(target_os = "android", target_os = "ios")))] - if let Ok(mac) = get_mac_by_ip(ip) { - mac.to_string() - } else { - "".to_owned() - } - #[cfg(any(target_os = "android", target_os = "ios"))] - "".to_owned() -} - -fn get_all_ipv4s() -> ResultType> { - let mut ipv4s = Vec::new(); - for interface in default_net::get_interfaces() { - for ipv4 in &interface.ipv4 { - ipv4s.push(ipv4.addr.clone()); - } - } - Ok(ipv4s) -} - -fn get_mac_by_ip(ip: &IpAddr) -> ResultType { - for interface in default_net::get_interfaces() { - match ip { - IpAddr::V4(local_ipv4) => { - if interface.ipv4.iter().any(|x| x.addr == *local_ipv4) { - if let Some(mac_addr) = interface.mac_addr { - return Ok(mac_addr.address()); - } - } - } - IpAddr::V6(local_ipv6) => { - if interface.ipv6.iter().any(|x| x.addr == *local_ipv6) { - if let Some(mac_addr) = interface.mac_addr { - return Ok(mac_addr.address()); - } - } - } - } - } - bail!("No interface found for ip: {:?}", ip); -} - -// Mainly from https://github.com/shellrow/default-net/blob/cf7ca24e7e6e8e566ed32346c9cfddab3f47e2d6/src/interface/shared.rs#L4 -pub fn get_ipaddr_by_peer(peer: A) -> Option { - let socket = match UdpSocket::bind("0.0.0.0:0") { - Ok(s) => s, - Err(_) => return None, - }; - - match socket.connect(peer) { - Ok(()) => (), - Err(_) => return None, - }; - - match socket.local_addr() { - Ok(addr) => return Some(addr.ip()), - Err(_) => return None, - }; -} - -fn lan_discovery() -> ResultType<()> { - let addr = SocketAddr::from(([0, 0, 0, 0], get_broadcast_port())); - let socket = std::net::UdpSocket::bind(addr)?; - socket.set_read_timeout(Some(std::time::Duration::from_millis(1000)))?; - log::info!("lan discovery listener started"); - loop { - let mut buf = [0; 2048]; - if let Ok((len, addr)) = socket.recv_from(&mut buf) { - if let Ok(msg_in) = Message::parse_from_bytes(&buf[0..len]) { - match msg_in.union { - Some(rendezvous_message::Union::PeerDiscovery(p)) => { - if p.cmd == "ping" { - if let Some(self_addr) = get_ipaddr_by_peer(&addr) { - let mut msg_out = Message::new(); - let peer = PeerDiscovery { - cmd: "pong".to_owned(), - mac: get_mac(&self_addr), - id: Config::get_id(), - hostname: whoami::hostname(), - username: crate::platform::get_active_username(), - platform: whoami::platform().to_string(), - ..Default::default() - }; - msg_out.set_peer_discovery(peer); - socket.send_to(&msg_out.write_to_bytes()?, addr).ok(); - } - } - } - _ => {} - } - } - } - } -} - -fn create_broadcast_sockets() -> ResultType> { - let mut sockets = Vec::new(); - for v4_addr in get_all_ipv4s()? { - if v4_addr.is_private() { - let s = UdpSocket::bind(SocketAddr::from((v4_addr, 0)))?; - s.set_broadcast(true)?; - log::debug!("Bind socket to {}", &v4_addr); - sockets.push(s) - } - } - Ok(sockets) -} - -fn send_query() -> ResultType> { - let sockets = create_broadcast_sockets()?; - if sockets.is_empty() { - bail!("Found no ipv4 addresses"); - } - - let mut msg_out = Message::new(); - let peer = PeerDiscovery { - cmd: "ping".to_owned(), - ..Default::default() - }; - msg_out.set_peer_discovery(peer); - let maddr = SocketAddr::from(([255, 255, 255, 255], get_broadcast_port())); - for socket in &sockets { - socket.send_to(&msg_out.write_to_bytes()?, maddr)?; - } - log::info!("discover ping sent"); - Ok(sockets) -} - -fn wait_response( - socket: UdpSocket, - timeout: Option, - tx: UnboundedSender, -) -> ResultType<()> { - let mut last_recv_time = Instant::now(); - - socket.set_read_timeout(timeout)?; - loop { - let mut buf = [0; 2048]; - if let Ok((len, addr)) = socket.recv_from(&mut buf) { - if let Ok(msg_in) = Message::parse_from_bytes(&buf[0..len]) { - match msg_in.union { - Some(rendezvous_message::Union::PeerDiscovery(p)) => { - last_recv_time = Instant::now(); - if p.cmd == "pong" { - let mac = if let Some(self_addr) = get_ipaddr_by_peer(&addr) { - get_mac(&self_addr) - } else { - "".to_owned() - }; - - if mac != p.mac { - allow_err!(tx.send(config::DiscoveryPeer { - id: p.id.clone(), - ip_mac: HashMap::from([ - (addr.ip().to_string(), p.mac.clone(),) - ]), - username: p.username.clone(), - hostname: p.hostname.clone(), - platform: p.platform.clone(), - online: true, - })); - } - } - } - _ => {} - } - } - } - if last_recv_time.elapsed().as_millis() > 3_000 { - break; - } - } - Ok(()) -} - -fn spawn_wait_responses(sockets: Vec) -> UnboundedReceiver { - let (tx, rx) = unbounded_channel::<_>(); - for socket in sockets { - let tx_clone = tx.clone(); - std::thread::spawn(move || { - allow_err!(wait_response( - socket, - Some(std::time::Duration::from_millis(10)), - tx_clone - )); - }); - } - rx -} - -async fn handle_received_peers(mut rx: UnboundedReceiver) -> ResultType<()> { - let mut peers = config::LanPeers::load().peers; - peers.iter_mut().for_each(|peer| { - peer.online = false; - }); - - let mut response_set = HashSet::new(); - let mut last_write_time = Instant::now() - std::time::Duration::from_secs(4); - loop { - tokio::select! { - data = rx.recv() => match data { - Some(mut peer) => { - let in_response_set = !response_set.insert(peer.id.clone()); - if let Some(pos) = peers.iter().position(|x| x.is_same_peer(&peer) ) { - let peer1 = peers.remove(pos); - if in_response_set { - peer.ip_mac.extend(peer1.ip_mac); - peer.online = true; - } - } - peers.insert(0, peer); - if last_write_time.elapsed().as_millis() > 300 { - config::LanPeers::store(&peers); - last_write_time = Instant::now(); - } - } - None => { - break - } - } - } - } - - config::LanPeers::store(&peers); - Ok(()) -} - -#[tokio::main(flavor = "current_thread")] -pub async fn discover() -> ResultType<()> { - let sockets = send_query()?; - let rx = spawn_wait_responses(sockets); - handle_received_peers(rx).await?; - - log::info!("discover ping done"); - Ok(()) -} - -pub fn send_wol(id: String) { - let interfaces = default_net::get_interfaces(); - for peer in &config::LanPeers::load().peers { - if peer.id == id { - for (ip, mac) in peer.ip_mac.iter() { - if let Ok(mac_addr) = mac.parse() { - if let Ok(IpAddr::V4(ip)) = ip.parse() { - for interface in &interfaces { - for ipv4 in &interface.ipv4 { - if (u32::from(ipv4.addr) & u32::from(ipv4.netmask)) - == (u32::from(ip) & u32::from(ipv4.netmask)) - { - allow_err!(wol::send_wol( - mac_addr, - None, - Some(IpAddr::V4(ipv4.addr)) - )); - } - } - } - } - } - } - break; - } - } -} diff --git a/src/ui.rs b/src/ui.rs index 084422f6b..2b7ffcc29 100644 --- a/src/ui.rs +++ b/src/ui.rs @@ -548,7 +548,7 @@ impl UI { } fn send_wol(&mut self, id: String) { - crate::rendezvous_mediator::send_wol(id) + crate::lan::send_wol(id) } fn new_remote(&mut self, id: String, remote_type: String) { @@ -695,7 +695,7 @@ impl UI { fn discover(&self) { std::thread::spawn(move || { - allow_err!(crate::rendezvous_mediator::discover()); + allow_err!(crate::lan::discover()); }); }