Fix/ios query onlines (#9134)

* fix: ios query onlines

Signed-off-by: fufesou <linlong1266@gmail.com>

* comments

Signed-off-by: fufesou <linlong1266@gmail.com>

---------

Signed-off-by: fufesou <linlong1266@gmail.com>
This commit is contained in:
fufesou 2024-08-21 18:21:25 +08:00 committed by GitHub
parent e3cce2824d
commit f300d797e2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 152 additions and 144 deletions

View File

@ -3407,3 +3407,153 @@ async fn hc_connection_(
}
Ok(())
}
pub mod peer_online {
use hbb_common::{
anyhow::bail,
config::{Config, CONNECT_TIMEOUT, READ_TIMEOUT},
log,
rendezvous_proto::*,
sleep,
socket_client::connect_tcp,
tcp::FramedStream,
ResultType,
};
use std::time::Instant;
pub async fn query_online_states<F: FnOnce(Vec<String>, Vec<String>)>(ids: Vec<String>, f: F) {
let test = false;
if test {
sleep(1.5).await;
let mut onlines = ids;
let offlines = onlines.drain((onlines.len() / 2)..).collect();
f(onlines, offlines)
} else {
let query_begin = Instant::now();
let query_timeout = std::time::Duration::from_millis(3_000);
loop {
match query_online_states_(&ids, query_timeout).await {
Ok((onlines, offlines)) => {
f(onlines, offlines);
break;
}
Err(e) => {
log::debug!("{}", &e);
}
}
if query_begin.elapsed() > query_timeout {
log::debug!(
"query onlines timeout {:?} ({:?})",
query_begin.elapsed(),
query_timeout
);
break;
}
sleep(1.5).await;
}
}
}
async fn create_online_stream() -> ResultType<FramedStream> {
let (rendezvous_server, _servers, _contained) =
crate::get_rendezvous_server(READ_TIMEOUT).await;
let tmp: Vec<&str> = rendezvous_server.split(":").collect();
if tmp.len() != 2 {
bail!("Invalid server address: {}", rendezvous_server);
}
let port: u16 = tmp[1].parse()?;
if port == 0 {
bail!("Invalid server address: {}", rendezvous_server);
}
let online_server = format!("{}:{}", tmp[0], port - 1);
connect_tcp(online_server, CONNECT_TIMEOUT).await
}
async fn query_online_states_(
ids: &Vec<String>,
timeout: std::time::Duration,
) -> ResultType<(Vec<String>, Vec<String>)> {
let query_begin = Instant::now();
let mut msg_out = RendezvousMessage::new();
msg_out.set_online_request(OnlineRequest {
id: Config::get_id(),
peers: ids.clone(),
..Default::default()
});
loop {
let mut socket = match create_online_stream().await {
Ok(s) => s,
Err(e) => {
log::debug!("Failed to create peers online stream, {e}");
return Ok((vec![], ids.clone()));
}
};
// TODO: Use long connections to avoid socket creation
// If we use a Arc<Mutex<Option<FramedStream>>> to hold and reuse the previous socket,
// we may face the following error:
// An established connection was aborted by the software in your host machine. (os error 10053)
if let Err(e) = socket.send(&msg_out).await {
log::debug!("Failed to send peers online states query, {e}");
return Ok((vec![], ids.clone()));
}
if let Some(msg_in) =
crate::common::get_next_nonkeyexchange_msg(&mut socket, None).await
{
match msg_in.union {
Some(rendezvous_message::Union::OnlineResponse(online_response)) => {
let states = online_response.states;
let mut onlines = Vec::new();
let mut offlines = Vec::new();
for i in 0..ids.len() {
// bytes index from left to right
let bit_value = 0x01 << (7 - i % 8);
if (states[i / 8] & bit_value) == bit_value {
onlines.push(ids[i].clone());
} else {
offlines.push(ids[i].clone());
}
}
return Ok((onlines, offlines));
}
_ => {
// ignore
}
}
} else {
// TODO: Make sure socket closed?
bail!("Online stream receives None");
}
if query_begin.elapsed() > timeout {
bail!("Try query onlines timeout {:?}", &timeout);
}
sleep(300.0).await;
}
}
#[cfg(test)]
mod tests {
use hbb_common::tokio;
#[tokio::test]
async fn test_query_onlines() {
super::query_online_states(
vec![
"152183996".to_owned(),
"165782066".to_owned(),
"155323351".to_owned(),
"460952777".to_owned(),
],
|onlines: Vec<String>, offlines: Vec<String>| {
println!("onlines: {:?}, offlines: {:?}", &onlines, &offlines);
},
)
.await;
}
}
}

View File

@ -2093,8 +2093,7 @@ pub(super) mod async_tasks {
ids = rx_onlines.recv() => {
match ids {
Some(_ids) => {
#[cfg(not(any(target_os = "ios")))]
crate::rendezvous_mediator::query_online_states(_ids, handle_query_onlines).await
crate::client::peer_online::query_online_states(_ids, handle_query_onlines).await
}
None => {
break;

View File

@ -12,10 +12,7 @@ use uuid::Uuid;
use hbb_common::{
allow_err,
anyhow::{self, bail},
config::{
self, keys::*, option2bool, Config, CONNECT_TIMEOUT, READ_TIMEOUT, REG_INTERVAL,
RENDEZVOUS_PORT,
},
config::{self, keys::*, option2bool, Config, CONNECT_TIMEOUT, REG_INTERVAL, RENDEZVOUS_PORT},
futures::future::join_all,
log,
protobuf::Message as _,
@ -703,123 +700,6 @@ async fn direct_server(server: ServerPtr) {
}
}
pub async fn query_online_states<F: FnOnce(Vec<String>, Vec<String>)>(ids: Vec<String>, f: F) {
let test = false;
if test {
sleep(1.5).await;
let mut onlines = ids;
let offlines = onlines.drain((onlines.len() / 2)..).collect();
f(onlines, offlines)
} else {
let query_begin = Instant::now();
let query_timeout = std::time::Duration::from_millis(3_000);
loop {
if SHOULD_EXIT.load(Ordering::SeqCst) {
break;
}
match query_online_states_(&ids, query_timeout).await {
Ok((onlines, offlines)) => {
f(onlines, offlines);
break;
}
Err(e) => {
log::debug!("{}", &e);
}
}
if query_begin.elapsed() > query_timeout {
log::debug!(
"query onlines timeout {:?} ({:?})",
query_begin.elapsed(),
query_timeout
);
break;
}
sleep(1.5).await;
}
}
}
async fn create_online_stream() -> ResultType<FramedStream> {
let (rendezvous_server, _servers, _contained) =
crate::get_rendezvous_server(READ_TIMEOUT).await;
let tmp: Vec<&str> = rendezvous_server.split(":").collect();
if tmp.len() != 2 {
bail!("Invalid server address: {}", rendezvous_server);
}
let port: u16 = tmp[1].parse()?;
if port == 0 {
bail!("Invalid server address: {}", rendezvous_server);
}
let online_server = format!("{}:{}", tmp[0], port - 1);
connect_tcp(online_server, CONNECT_TIMEOUT).await
}
async fn query_online_states_(
ids: &Vec<String>,
timeout: std::time::Duration,
) -> ResultType<(Vec<String>, Vec<String>)> {
let query_begin = Instant::now();
let mut msg_out = RendezvousMessage::new();
msg_out.set_online_request(OnlineRequest {
id: Config::get_id(),
peers: ids.clone(),
..Default::default()
});
loop {
if SHOULD_EXIT.load(Ordering::SeqCst) {
// No need to care about onlines
return Ok((Vec::new(), Vec::new()));
}
let mut socket = match create_online_stream().await {
Ok(s) => s,
Err(e) => {
log::debug!("Failed to create peers online stream, {e}");
return Ok((vec![], ids.clone()));
}
};
if let Err(e) = socket.send(&msg_out).await {
log::debug!("Failed to send peers online states query, {e}");
return Ok((vec![], ids.clone()));
}
if let Some(msg_in) = crate::common::get_next_nonkeyexchange_msg(&mut socket, None).await {
match msg_in.union {
Some(rendezvous_message::Union::OnlineResponse(online_response)) => {
let states = online_response.states;
let mut onlines = Vec::new();
let mut offlines = Vec::new();
for i in 0..ids.len() {
// bytes index from left to right
let bit_value = 0x01 << (7 - i % 8);
if (states[i / 8] & bit_value) == bit_value {
onlines.push(ids[i].clone());
} else {
offlines.push(ids[i].clone());
}
}
return Ok((onlines, offlines));
}
_ => {
// ignore
}
}
} else {
// TODO: Make sure socket closed?
bail!("Online stream receives None");
}
if query_begin.elapsed() > timeout {
bail!("Try query onlines timeout {:?}", &timeout);
}
sleep(300.0).await;
}
}
enum Sink<'a> {
Framed(&'a mut FramedSocket, &'a TargetAddr<'a>),
Stream(&'a mut FramedStream),
@ -833,24 +713,3 @@ impl Sink<'_> {
}
}
}
#[cfg(test)]
mod tests {
use hbb_common::tokio;
#[tokio::test]
async fn test_query_onlines() {
super::query_online_states(
vec![
"152183996".to_owned(),
"165782066".to_owned(),
"155323351".to_owned(),
"460952777".to_owned(),
],
|onlines: Vec<String>, offlines: Vec<String>| {
println!("onlines: {:?}, offlines: {:?}", &onlines, &offlines);
},
)
.await;
}
}