lan_discovery_WOL: fix udp set_nonblocking twice and lan refactor

Signed-off-by: fufesou <shuanglongchen@yeah.net>
This commit is contained in:
fufesou 2022-07-15 20:39:42 +08:00
parent 13406cd359
commit 3613f27afc
5 changed files with 298 additions and 279 deletions

View File

@ -37,7 +37,6 @@ fn new_socket(addr: SocketAddr, reuse: bool, buf_size: usize) -> Result<Socket,
addr,
socket.recv_buffer_size()
);
socket.set_nonblocking(true)?;
socket.bind(&addr.into())?;
Ok(socket)
}

291
src/lan.rs Normal file
View File

@ -0,0 +1,291 @@
use hbb_common::{
allow_err,
anyhow::bail,
config::{self, Config, RENDEZVOUS_PORT},
log,
protobuf::Message as _,
rendezvous_proto::*,
tokio::{
self,
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
},
ResultType,
};
use std::{
collections::{HashMap, HashSet},
net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket},
time::Instant,
};
type Message = RendezvousMessage;
pub(super) fn start_listening() -> ResultType<()> {
let addr = SocketAddr::from(([0, 0, 0, 0], get_broadcast_port()));
let socket = std::net::UdpSocket::bind(addr)?;
socket.set_read_timeout(Some(std::time::Duration::from_millis(1000)))?;
log::info!("lan discovery listener started");
loop {
let mut buf = [0; 2048];
if let Ok((len, addr)) = socket.recv_from(&mut buf) {
if let Ok(msg_in) = Message::parse_from_bytes(&buf[0..len]) {
match msg_in.union {
Some(rendezvous_message::Union::PeerDiscovery(p)) => {
if p.cmd == "ping" {
if let Some(self_addr) = get_ipaddr_by_peer(&addr) {
let mut msg_out = Message::new();
let peer = PeerDiscovery {
cmd: "pong".to_owned(),
mac: get_mac(&self_addr),
id: Config::get_id(),
hostname: whoami::hostname(),
username: crate::platform::get_active_username(),
platform: whoami::platform().to_string(),
..Default::default()
};
msg_out.set_peer_discovery(peer);
socket.send_to(&msg_out.write_to_bytes()?, addr).ok();
}
}
}
_ => {}
}
}
}
}
}
#[tokio::main(flavor = "current_thread")]
pub async fn discover() -> ResultType<()> {
let sockets = send_query()?;
let rx = spawn_wait_responses(sockets);
handle_received_peers(rx).await?;
log::info!("discover ping done");
Ok(())
}
pub fn send_wol(id: String) {
let interfaces = default_net::get_interfaces();
for peer in &config::LanPeers::load().peers {
if peer.id == id {
for (ip, mac) in peer.ip_mac.iter() {
if let Ok(mac_addr) = mac.parse() {
if let Ok(IpAddr::V4(ip)) = ip.parse() {
for interface in &interfaces {
for ipv4 in &interface.ipv4 {
if (u32::from(ipv4.addr) & u32::from(ipv4.netmask))
== (u32::from(ip) & u32::from(ipv4.netmask))
{
allow_err!(wol::send_wol(
mac_addr,
None,
Some(IpAddr::V4(ipv4.addr))
));
}
}
}
}
}
}
break;
}
}
}
#[inline]
fn get_broadcast_port() -> u16 {
(RENDEZVOUS_PORT + 3) as _
}
fn get_mac(ip: &IpAddr) -> String {
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if let Ok(mac) = get_mac_by_ip(ip) {
mac.to_string()
} else {
"".to_owned()
}
#[cfg(any(target_os = "android", target_os = "ios"))]
"".to_owned()
}
fn get_all_ipv4s() -> ResultType<Vec<Ipv4Addr>> {
let mut ipv4s = Vec::new();
for interface in default_net::get_interfaces() {
for ipv4 in &interface.ipv4 {
ipv4s.push(ipv4.addr.clone());
}
}
Ok(ipv4s)
}
fn get_mac_by_ip(ip: &IpAddr) -> ResultType<String> {
for interface in default_net::get_interfaces() {
match ip {
IpAddr::V4(local_ipv4) => {
if interface.ipv4.iter().any(|x| x.addr == *local_ipv4) {
if let Some(mac_addr) = interface.mac_addr {
return Ok(mac_addr.address());
}
}
}
IpAddr::V6(local_ipv6) => {
if interface.ipv6.iter().any(|x| x.addr == *local_ipv6) {
if let Some(mac_addr) = interface.mac_addr {
return Ok(mac_addr.address());
}
}
}
}
}
bail!("No interface found for ip: {:?}", ip);
}
// Mainly from https://github.com/shellrow/default-net/blob/cf7ca24e7e6e8e566ed32346c9cfddab3f47e2d6/src/interface/shared.rs#L4
fn get_ipaddr_by_peer<A: ToSocketAddrs>(peer: A) -> Option<IpAddr> {
let socket = match UdpSocket::bind("0.0.0.0:0") {
Ok(s) => s,
Err(_) => return None,
};
match socket.connect(peer) {
Ok(()) => (),
Err(_) => return None,
};
match socket.local_addr() {
Ok(addr) => return Some(addr.ip()),
Err(_) => return None,
};
}
fn create_broadcast_sockets() -> ResultType<Vec<UdpSocket>> {
let mut sockets = Vec::new();
for v4_addr in get_all_ipv4s()? {
if v4_addr.is_private() {
let s = UdpSocket::bind(SocketAddr::from((v4_addr, 0)))?;
s.set_broadcast(true)?;
log::debug!("Bind socket to {}", &v4_addr);
sockets.push(s)
}
}
Ok(sockets)
}
fn send_query() -> ResultType<Vec<UdpSocket>> {
let sockets = create_broadcast_sockets()?;
if sockets.is_empty() {
bail!("Found no ipv4 addresses");
}
let mut msg_out = Message::new();
let peer = PeerDiscovery {
cmd: "ping".to_owned(),
..Default::default()
};
msg_out.set_peer_discovery(peer);
let maddr = SocketAddr::from(([255, 255, 255, 255], get_broadcast_port()));
for socket in &sockets {
socket.send_to(&msg_out.write_to_bytes()?, maddr)?;
}
log::info!("discover ping sent");
Ok(sockets)
}
fn wait_response(
socket: UdpSocket,
timeout: Option<std::time::Duration>,
tx: UnboundedSender<config::DiscoveryPeer>,
) -> ResultType<()> {
let mut last_recv_time = Instant::now();
socket.set_read_timeout(timeout)?;
loop {
let mut buf = [0; 2048];
if let Ok((len, addr)) = socket.recv_from(&mut buf) {
if let Ok(msg_in) = Message::parse_from_bytes(&buf[0..len]) {
match msg_in.union {
Some(rendezvous_message::Union::PeerDiscovery(p)) => {
last_recv_time = Instant::now();
if p.cmd == "pong" {
let mac = if let Some(self_addr) = get_ipaddr_by_peer(&addr) {
get_mac(&self_addr)
} else {
"".to_owned()
};
if mac != p.mac {
allow_err!(tx.send(config::DiscoveryPeer {
id: p.id.clone(),
ip_mac: HashMap::from([
(addr.ip().to_string(), p.mac.clone(),)
]),
username: p.username.clone(),
hostname: p.hostname.clone(),
platform: p.platform.clone(),
online: true,
}));
}
}
}
_ => {}
}
}
}
if last_recv_time.elapsed().as_millis() > 3_000 {
break;
}
}
Ok(())
}
fn spawn_wait_responses(sockets: Vec<UdpSocket>) -> UnboundedReceiver<config::DiscoveryPeer> {
let (tx, rx) = unbounded_channel::<_>();
for socket in sockets {
let tx_clone = tx.clone();
std::thread::spawn(move || {
allow_err!(wait_response(
socket,
Some(std::time::Duration::from_millis(10)),
tx_clone
));
});
}
rx
}
async fn handle_received_peers(mut rx: UnboundedReceiver<config::DiscoveryPeer>) -> ResultType<()> {
let mut peers = config::LanPeers::load().peers;
peers.iter_mut().for_each(|peer| {
peer.online = false;
});
let mut response_set = HashSet::new();
let mut last_write_time = Instant::now() - std::time::Duration::from_secs(4);
loop {
tokio::select! {
data = rx.recv() => match data {
Some(mut peer) => {
let in_response_set = !response_set.insert(peer.id.clone());
if let Some(pos) = peers.iter().position(|x| x.is_same_peer(&peer) ) {
let peer1 = peers.remove(pos);
if in_response_set {
peer.ip_mac.extend(peer1.ip_mac);
peer.online = true;
}
}
peers.insert(0, peer);
if last_write_time.elapsed().as_millis() > 300 {
config::LanPeers::store(&peers);
last_write_time = Instant::now();
}
}
None => {
break
}
}
}
}
config::LanPeers::store(&peers);
Ok(())
}

View File

@ -12,6 +12,8 @@ mod client;
#[cfg(not(any(target_os = "ios")))]
mod rendezvous_mediator;
#[cfg(not(any(target_os = "ios")))]
mod lan;
#[cfg(not(any(target_os = "ios")))]
pub use self::rendezvous_mediator::*;
/// cbindgen:ignore
pub mod common;

View File

@ -2,7 +2,7 @@ use crate::server::{check_zombie, new as new_server, ServerPtr};
use hbb_common::{
allow_err,
anyhow::bail,
config::{self, Config, REG_INTERVAL, RENDEZVOUS_PORT, RENDEZVOUS_TIMEOUT},
config::{Config, REG_INTERVAL, RENDEZVOUS_PORT, RENDEZVOUS_TIMEOUT},
futures::future::join_all,
log,
protobuf::Message as _,
@ -10,15 +10,13 @@ use hbb_common::{
sleep, socket_client,
tokio::{
self, select,
sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
time::{interval, Duration},
},
udp::FramedSocket,
AddrMangle, IntoTargetAddr, ResultType, TargetAddr,
};
use std::{
collections::{HashMap, HashSet},
net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket},
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
@ -66,7 +64,7 @@ impl RendezvousMediator {
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if crate::platform::is_installed() {
std::thread::spawn(move || {
allow_err!(lan_discovery());
allow_err!(super::lan::start_listening());
});
}
loop {
@ -542,274 +540,3 @@ async fn direct_server(server: ServerPtr) {
}
}
}
#[inline]
pub fn get_broadcast_port() -> u16 {
(RENDEZVOUS_PORT + 3) as _
}
pub fn get_mac(ip: &IpAddr) -> String {
#[cfg(not(any(target_os = "android", target_os = "ios")))]
if let Ok(mac) = get_mac_by_ip(ip) {
mac.to_string()
} else {
"".to_owned()
}
#[cfg(any(target_os = "android", target_os = "ios"))]
"".to_owned()
}
fn get_all_ipv4s() -> ResultType<Vec<Ipv4Addr>> {
let mut ipv4s = Vec::new();
for interface in default_net::get_interfaces() {
for ipv4 in &interface.ipv4 {
ipv4s.push(ipv4.addr.clone());
}
}
Ok(ipv4s)
}
fn get_mac_by_ip(ip: &IpAddr) -> ResultType<String> {
for interface in default_net::get_interfaces() {
match ip {
IpAddr::V4(local_ipv4) => {
if interface.ipv4.iter().any(|x| x.addr == *local_ipv4) {
if let Some(mac_addr) = interface.mac_addr {
return Ok(mac_addr.address());
}
}
}
IpAddr::V6(local_ipv6) => {
if interface.ipv6.iter().any(|x| x.addr == *local_ipv6) {
if let Some(mac_addr) = interface.mac_addr {
return Ok(mac_addr.address());
}
}
}
}
}
bail!("No interface found for ip: {:?}", ip);
}
// Mainly from https://github.com/shellrow/default-net/blob/cf7ca24e7e6e8e566ed32346c9cfddab3f47e2d6/src/interface/shared.rs#L4
pub fn get_ipaddr_by_peer<A: ToSocketAddrs>(peer: A) -> Option<IpAddr> {
let socket = match UdpSocket::bind("0.0.0.0:0") {
Ok(s) => s,
Err(_) => return None,
};
match socket.connect(peer) {
Ok(()) => (),
Err(_) => return None,
};
match socket.local_addr() {
Ok(addr) => return Some(addr.ip()),
Err(_) => return None,
};
}
fn lan_discovery() -> ResultType<()> {
let addr = SocketAddr::from(([0, 0, 0, 0], get_broadcast_port()));
let socket = std::net::UdpSocket::bind(addr)?;
socket.set_read_timeout(Some(std::time::Duration::from_millis(1000)))?;
log::info!("lan discovery listener started");
loop {
let mut buf = [0; 2048];
if let Ok((len, addr)) = socket.recv_from(&mut buf) {
if let Ok(msg_in) = Message::parse_from_bytes(&buf[0..len]) {
match msg_in.union {
Some(rendezvous_message::Union::PeerDiscovery(p)) => {
if p.cmd == "ping" {
if let Some(self_addr) = get_ipaddr_by_peer(&addr) {
let mut msg_out = Message::new();
let peer = PeerDiscovery {
cmd: "pong".to_owned(),
mac: get_mac(&self_addr),
id: Config::get_id(),
hostname: whoami::hostname(),
username: crate::platform::get_active_username(),
platform: whoami::platform().to_string(),
..Default::default()
};
msg_out.set_peer_discovery(peer);
socket.send_to(&msg_out.write_to_bytes()?, addr).ok();
}
}
}
_ => {}
}
}
}
}
}
fn create_broadcast_sockets() -> ResultType<Vec<UdpSocket>> {
let mut sockets = Vec::new();
for v4_addr in get_all_ipv4s()? {
if v4_addr.is_private() {
let s = UdpSocket::bind(SocketAddr::from((v4_addr, 0)))?;
s.set_broadcast(true)?;
log::debug!("Bind socket to {}", &v4_addr);
sockets.push(s)
}
}
Ok(sockets)
}
fn send_query() -> ResultType<Vec<UdpSocket>> {
let sockets = create_broadcast_sockets()?;
if sockets.is_empty() {
bail!("Found no ipv4 addresses");
}
let mut msg_out = Message::new();
let peer = PeerDiscovery {
cmd: "ping".to_owned(),
..Default::default()
};
msg_out.set_peer_discovery(peer);
let maddr = SocketAddr::from(([255, 255, 255, 255], get_broadcast_port()));
for socket in &sockets {
socket.send_to(&msg_out.write_to_bytes()?, maddr)?;
}
log::info!("discover ping sent");
Ok(sockets)
}
fn wait_response(
socket: UdpSocket,
timeout: Option<std::time::Duration>,
tx: UnboundedSender<config::DiscoveryPeer>,
) -> ResultType<()> {
let mut last_recv_time = Instant::now();
socket.set_read_timeout(timeout)?;
loop {
let mut buf = [0; 2048];
if let Ok((len, addr)) = socket.recv_from(&mut buf) {
if let Ok(msg_in) = Message::parse_from_bytes(&buf[0..len]) {
match msg_in.union {
Some(rendezvous_message::Union::PeerDiscovery(p)) => {
last_recv_time = Instant::now();
if p.cmd == "pong" {
let mac = if let Some(self_addr) = get_ipaddr_by_peer(&addr) {
get_mac(&self_addr)
} else {
"".to_owned()
};
if mac != p.mac {
allow_err!(tx.send(config::DiscoveryPeer {
id: p.id.clone(),
ip_mac: HashMap::from([
(addr.ip().to_string(), p.mac.clone(),)
]),
username: p.username.clone(),
hostname: p.hostname.clone(),
platform: p.platform.clone(),
online: true,
}));
}
}
}
_ => {}
}
}
}
if last_recv_time.elapsed().as_millis() > 3_000 {
break;
}
}
Ok(())
}
fn spawn_wait_responses(sockets: Vec<UdpSocket>) -> UnboundedReceiver<config::DiscoveryPeer> {
let (tx, rx) = unbounded_channel::<_>();
for socket in sockets {
let tx_clone = tx.clone();
std::thread::spawn(move || {
allow_err!(wait_response(
socket,
Some(std::time::Duration::from_millis(10)),
tx_clone
));
});
}
rx
}
async fn handle_received_peers(mut rx: UnboundedReceiver<config::DiscoveryPeer>) -> ResultType<()> {
let mut peers = config::LanPeers::load().peers;
peers.iter_mut().for_each(|peer| {
peer.online = false;
});
let mut response_set = HashSet::new();
let mut last_write_time = Instant::now() - std::time::Duration::from_secs(4);
loop {
tokio::select! {
data = rx.recv() => match data {
Some(mut peer) => {
let in_response_set = !response_set.insert(peer.id.clone());
if let Some(pos) = peers.iter().position(|x| x.is_same_peer(&peer) ) {
let peer1 = peers.remove(pos);
if in_response_set {
peer.ip_mac.extend(peer1.ip_mac);
peer.online = true;
}
}
peers.insert(0, peer);
if last_write_time.elapsed().as_millis() > 300 {
config::LanPeers::store(&peers);
last_write_time = Instant::now();
}
}
None => {
break
}
}
}
}
config::LanPeers::store(&peers);
Ok(())
}
#[tokio::main(flavor = "current_thread")]
pub async fn discover() -> ResultType<()> {
let sockets = send_query()?;
let rx = spawn_wait_responses(sockets);
handle_received_peers(rx).await?;
log::info!("discover ping done");
Ok(())
}
pub fn send_wol(id: String) {
let interfaces = default_net::get_interfaces();
for peer in &config::LanPeers::load().peers {
if peer.id == id {
for (ip, mac) in peer.ip_mac.iter() {
if let Ok(mac_addr) = mac.parse() {
if let Ok(IpAddr::V4(ip)) = ip.parse() {
for interface in &interfaces {
for ipv4 in &interface.ipv4 {
if (u32::from(ipv4.addr) & u32::from(ipv4.netmask))
== (u32::from(ip) & u32::from(ipv4.netmask))
{
allow_err!(wol::send_wol(
mac_addr,
None,
Some(IpAddr::V4(ipv4.addr))
));
}
}
}
}
}
}
break;
}
}
}

View File

@ -548,7 +548,7 @@ impl UI {
}
fn send_wol(&mut self, id: String) {
crate::rendezvous_mediator::send_wol(id)
crate::lan::send_wol(id)
}
fn new_remote(&mut self, id: String, remote_type: String) {
@ -695,7 +695,7 @@ impl UI {
fn discover(&self) {
std::thread::spawn(move || {
allow_err!(crate::rendezvous_mediator::discover());
allow_err!(crate::lan::discover());
});
}