Refact. Replace all tokio::time::interval() (#7173)

* Refact. Replace all `tokio::time::interval()`

Signed-off-by: fufesou <shuanglongchen@yeah.net>

* Refact Better min_interval for `ThrottledInterval`.

Signed-off-by: fufesou <shuanglongchen@yeah.net>

---------

Signed-off-by: fufesou <shuanglongchen@yeah.net>
This commit is contained in:
fufesou 2024-02-18 21:18:00 +08:00 committed by GitHub
parent 5fdcc748e1
commit 8c108065eb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 153 additions and 39 deletions

View File

@ -30,7 +30,7 @@ use hbb_common::{
tokio::{
self,
sync::mpsc,
time::{self, Duration, Instant, Interval},
time::{self, Duration, Instant},
},
Stream,
};
@ -62,7 +62,7 @@ pub struct Remote<T: InvokeUiSession> {
read_jobs: Vec<fs::TransferJob>,
write_jobs: Vec<fs::TransferJob>,
remove_jobs: HashMap<i32, RemoveJob>,
timer: Interval,
timer: crate::RustDeskInterval,
last_update_jobs_status: (Instant, HashMap<i32, u64>),
is_connected: bool,
first_frame: bool,
@ -99,7 +99,7 @@ impl<T: InvokeUiSession> Remote<T> {
read_jobs: Vec::new(),
write_jobs: Vec::new(),
remove_jobs: Default::default(),
timer: time::interval(SEC30),
timer: crate::rustdesk_interval(time::interval(SEC30)),
last_update_jobs_status: (Instant::now(), Default::default()),
is_connected: false,
first_frame: false,
@ -170,7 +170,7 @@ impl<T: InvokeUiSession> Remote<T> {
#[cfg(any(target_os = "windows", target_os = "linux", target_os = "macos"))]
let mut rx_clip_client = rx_clip_client_lock.lock().await;
let mut status_timer = time::interval(Duration::new(1, 0));
let mut status_timer = crate::rustdesk_interval(time::interval(Duration::new(1, 0)));
let mut fps_instant = Instant::now();
loop {
@ -228,7 +228,7 @@ impl<T: InvokeUiSession> Remote<T> {
}
self.update_jobs_status();
} else {
self.timer = time::interval_at(Instant::now() + SEC30, SEC30);
self.timer = crate::rustdesk_interval(time::interval_at(Instant::now() + SEC30, SEC30));
}
}
_ = status_timer.tick() => {
@ -537,7 +537,7 @@ impl<T: InvokeUiSession> Remote<T> {
}
let total_size = job.total_size();
self.read_jobs.push(job);
self.timer = time::interval(MILLI1);
self.timer = crate::rustdesk_interval(time::interval(MILLI1));
allow_err!(
peer.send(&fs::new_receive(id, to, file_num, files, total_size))
.await
@ -597,7 +597,7 @@ impl<T: InvokeUiSession> Remote<T> {
);
job.is_last_job = true;
self.read_jobs.push(job);
self.timer = time::interval(MILLI1);
self.timer = crate::rustdesk_interval(time::interval(MILLI1));
}
}
}

View File

@ -1,6 +1,7 @@
use std::{
future::Future,
sync::{Arc, Mutex, RwLock},
task::Poll,
};
#[derive(Debug, Eq, PartialEq)]
@ -132,15 +133,20 @@ use hbb_common::{
bytes::Bytes,
compress::compress as compress_func,
config::{self, Config, CONNECT_TIMEOUT, READ_TIMEOUT},
futures_util::future::poll_fn,
get_version_number, log,
message_proto::*,
protobuf::Enum,
protobuf::Message as _,
protobuf::{Enum, Message as _},
rendezvous_proto::*,
socket_client,
sodiumoxide::crypto::{box_, secretbox, sign},
tcp::FramedStream,
timeout, tokio, ResultType,
timeout,
tokio::{
self,
time::{Duration, Instant, Interval},
},
ResultType,
};
// #[cfg(any(target_os = "android", target_os = "ios", feature = "cli"))]
use hbb_common::{config::RENDEZVOUS_PORT, futures::future::join_all};
@ -1335,3 +1341,118 @@ pub fn using_public_server() -> bool {
&& crate::get_custom_rendezvous_server(get_option("custom-rendezvous-server")).is_empty()
}
pub struct ThrottledInterval {
interval: Interval,
last_tick: Instant,
min_interval: Duration,
}
impl ThrottledInterval {
pub fn new(i: Interval) -> ThrottledInterval {
let period = i.period();
ThrottledInterval {
interval: i,
last_tick: Instant::now() - period * 2,
min_interval: Duration::from_secs_f64(period.as_secs_f64() * 0.9),
}
}
pub async fn tick(&mut self) -> Instant {
loop {
let instant = poll_fn(|cx| self.poll_tick(cx));
if let Some(instant) = instant.await {
return instant;
}
}
}
pub fn poll_tick(&mut self, cx: &mut std::task::Context<'_>) -> Poll<Option<Instant>> {
match self.interval.poll_tick(cx) {
Poll::Ready(instant) => {
if self.last_tick.elapsed() >= self.min_interval {
self.last_tick = Instant::now();
Poll::Ready(Some(instant))
} else {
// This call is required since tokio 1.27
cx.waker().wake_by_ref();
Poll::Ready(None)
}
}
Poll::Pending => {
Poll::Pending
},
}
}
}
pub type RustDeskInterval = ThrottledInterval;
#[inline]
pub fn rustdesk_interval(i: Interval) -> ThrottledInterval {
ThrottledInterval::new(i)
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{format::StrftimeItems, Local};
use hbb_common::tokio::{
self,
time::{interval, sleep, Duration},
};
use std::collections::HashSet;
#[tokio::test]
async fn test_tokio_time_interval() {
let mut timer = interval(Duration::from_secs(1));
let mut times = Vec::new();
sleep(Duration::from_secs(3)).await;
loop {
tokio::select! {
_ = timer.tick() => {
let format = StrftimeItems::new("%Y-%m-%d %H:%M:%S");
times.push(Local::now().format_with_items(format).to_string());
if times.len() == 5 {
break;
}
}
}
}
let times2: HashSet<String> = HashSet::from_iter(times.clone());
assert_eq!(times.len(), times2.len() + 3);
}
#[allow(non_snake_case)]
#[tokio::test]
async fn test_RustDesk_interval() {
let mut timer = rustdesk_interval(interval(Duration::from_secs(1)));
let mut times = Vec::new();
sleep(Duration::from_secs(3)).await;
loop {
tokio::select! {
_ = timer.tick() => {
let format = StrftimeItems::new("%Y-%m-%d %H:%M:%S");
times.push(Local::now().format_with_items(format).to_string());
if times.len() == 5 {
break;
}
}
}
}
let times2: HashSet<String> = HashSet::from_iter(times.clone());
assert_eq!(times.len(), times2.len());
}
#[test]
fn test_duration_multiplication() {
let dur = Duration::from_secs(1);
assert_eq!(dur * 2, Duration::from_secs(2));
assert_eq!(Duration::from_secs_f64(dur.as_secs_f64() * 0.9), Duration::from_millis(900));
assert_eq!(Duration::from_secs_f64(dur.as_secs_f64() * 0.923), Duration::from_millis(923));
assert_eq!(Duration::from_secs_f64(dur.as_secs_f64() * 0.923 * 1e-3), Duration::from_micros(923));
assert_eq!(Duration::from_secs_f64(dur.as_secs_f64() * 0.923 * 1e-6), Duration::from_nanos(923));
assert_eq!(Duration::from_secs_f64(dur.as_secs_f64() * 0.923 * 1e-9), Duration::from_nanos(1));
assert_eq!(Duration::from_secs_f64(dur.as_secs_f64() * 0.923 * 1e-10), Duration::from_nanos(0));
}
}

View File

@ -51,7 +51,7 @@ pub struct StrategyOptions {
#[cfg(not(any(target_os = "ios")))]
#[tokio::main(flavor = "current_thread")]
async fn start_hbbs_sync_async() {
let mut interval = tokio::time::interval_at(Instant::now() + TIME_CONN, TIME_CONN);
let mut interval = crate::rustdesk_interval(tokio::time::interval_at(Instant::now() + TIME_CONN, TIME_CONN));
let mut last_sent: Option<Instant> = None;
let mut info_uploaded: (bool, String, Option<Instant>) = (false, "".to_owned(), None);
loop {

View File

@ -116,6 +116,7 @@ mod test {
host: "server.example.net".to_owned(),
key: "".to_owned(),
api: "".to_owned(),
relay: "".to_owned(),
}
);
assert_eq!(
@ -124,6 +125,7 @@ mod test {
host: "server.example.net".to_owned(),
key: "".to_owned(),
api: "".to_owned(),
relay: "".to_owned(),
}
);
// key in these tests is "foobar.,2" base64 encoded
@ -136,6 +138,7 @@ mod test {
host: "server.example.net".to_owned(),
key: "Zm9vYmFyLiwyCg==".to_owned(),
api: "abc".to_owned(),
relay: "".to_owned(),
}
);
assert_eq!(
@ -145,6 +148,7 @@ mod test {
host: "server.example.net".to_owned(),
key: "Zm9vYmFyLiwyCg==".to_owned(),
api: "".to_owned(),
relay: "".to_owned(),
}
);
}

View File

@ -152,8 +152,7 @@ impl RendezvousMediator {
keep_alive: DEFAULT_KEEP_ALIVE,
};
let mut timer = interval(TIMER_OUT);
let mut last_timer: Option<Instant> = None;
let mut timer = crate::rustdesk_interval(interval(TIMER_OUT));
const MIN_REG_TIMEOUT: i64 = 3_000;
const MAX_REG_TIMEOUT: i64 = 30_000;
let mut reg_timeout = MIN_REG_TIMEOUT;
@ -215,11 +214,6 @@ impl RendezvousMediator {
break;
}
let now = Some(Instant::now());
if last_timer.map(|x| x.elapsed() < TIMER_OUT).unwrap_or(false) {
// a workaround of tokio timer bug
continue;
}
last_timer = now;
let expired = last_register_resp.map(|x| x.elapsed().as_millis() as i64 >= REG_INTERVAL).unwrap_or(true);
let timeout = last_register_sent.map(|x| x.elapsed().as_millis() as i64 >= reg_timeout).unwrap_or(false);
// temporarily disable exponential backoff for android before we add wakeup trigger to force connect in android
@ -342,7 +336,7 @@ impl RendezvousMediator {
host_prefix: Self::get_host_prefix(&host),
keep_alive: DEFAULT_KEEP_ALIVE,
};
let mut timer = interval(TIMER_OUT);
let mut timer = crate::rustdesk_interval(interval(TIMER_OUT));
let mut last_register_sent: Option<Instant> = None;
let mut last_recv_msg = Instant::now();
// we won't support connecting to multiple rendzvous servers any more, so we can use a global variable here.

View File

@ -41,7 +41,7 @@ use hbb_common::{
tokio::{
net::TcpStream,
sync::mpsc,
time::{self, Duration, Instant, Interval},
time::{self, Duration, Instant},
},
tokio_util::codec::{BytesCodec, Framed},
};
@ -175,8 +175,8 @@ pub struct Connection {
server: super::ServerPtrWeak,
hash: Hash,
read_jobs: Vec<fs::TransferJob>,
timer: Interval,
file_timer: Interval,
timer: crate::RustDeskInterval,
file_timer: crate::RustDeskInterval,
file_transfer: Option<(String, bool)>,
port_forward_socket: Option<Framed<TcpStream, BytesCodec>>,
port_forward_address: String,
@ -327,8 +327,8 @@ impl Connection {
server,
hash,
read_jobs: Vec::new(),
timer: time::interval(SEC30),
file_timer: time::interval(SEC30),
timer: crate::rustdesk_interval(time::interval(SEC30)),
file_timer: crate::rustdesk_interval(time::interval(SEC30)),
file_transfer: None,
port_forward_socket: None,
port_forward_address: "".to_owned(),
@ -419,7 +419,7 @@ impl Connection {
if !conn.block_input {
conn.send_permission(Permission::BlockInput, false).await;
}
let mut test_delay_timer = time::interval(TEST_DELAY_TIMEOUT);
let mut test_delay_timer = crate::rustdesk_interval(time::interval(TEST_DELAY_TIMEOUT));
let mut last_recv_time = Instant::now();
conn.stream.set_send_timeout(
@ -432,7 +432,7 @@ impl Connection {
#[cfg(not(any(target_os = "android", target_os = "ios")))]
std::thread::spawn(move || Self::handle_input(_rx_input, tx_cloned));
let mut second_timer = time::interval(Duration::from_secs(1));
let mut second_timer = crate::rustdesk_interval(time::interval(Duration::from_secs(1)));
loop {
tokio::select! {
@ -608,7 +608,7 @@ impl Connection {
}
}
} else {
conn.file_timer = time::interval_at(Instant::now() + SEC30, SEC30);
conn.file_timer = crate::rustdesk_interval(time::interval_at(Instant::now() + SEC30, SEC30));
}
}
Ok(conns) = hbbs_rx.recv() => {
@ -2054,7 +2054,8 @@ impl Connection {
job.is_remote = true;
job.conn_id = self.inner.id();
self.read_jobs.push(job);
self.file_timer = time::interval(MILLI1);
self.file_timer =
crate::rustdesk_interval(time::interval(MILLI1));
self.post_file_audit(
FileAuditType::RemoteSend,
&s.path,

View File

@ -442,7 +442,8 @@ pub mod server {
match ipc::connect(1000, postfix).await {
Ok(mut stream) => {
let mut timer = tokio::time::interval(Duration::from_secs(1));
let mut timer =
crate::rustdesk_interval(tokio::time::interval(Duration::from_secs(1)));
let mut nack = 0;
loop {
tokio::select! {
@ -777,7 +778,7 @@ pub mod client {
tokio::spawn(async move {
let mut stream = Connection::new(stream);
let postfix = postfix.to_owned();
let mut timer = tokio::time::interval(Duration::from_secs(1));
let mut timer = crate::rustdesk_interval(tokio::time::interval(Duration::from_secs(1)));
let mut nack = 0;
let mut rx = rx_clone.lock().await;
loop {

View File

@ -173,7 +173,7 @@ async fn start_query_session_count(sender: std::sync::mpsc::Sender<Data>) {
let mut last_count = 0;
loop {
if let Ok(mut c) = crate::ipc::connect(1000, "").await {
let mut timer = tokio::time::interval(Duration::from_secs(1));
let mut timer = crate::rustdesk_interval(tokio::time::interval(Duration::from_secs(1)));
loop {
tokio::select! {
res = c.next() => {

View File

@ -1038,9 +1038,7 @@ async fn check_connect_status_(reconnect: bool, rx: mpsc::UnboundedReceiver<ipc:
loop {
if let Ok(mut c) = ipc::connect(1000, "").await {
const TIMER_OUT: time::Duration = time::Duration::from_secs(1);
let mut timer = time::interval(TIMER_OUT);
let mut last_timer = time::Instant::now() - TIMER_OUT * 2;
let mut timer = crate::rustdesk_interval(time::interval(time::Duration::from_secs(1)));
loop {
tokio::select! {
res = c.next() => {
@ -1108,11 +1106,6 @@ async fn check_connect_status_(reconnect: bool, rx: mpsc::UnboundedReceiver<ipc:
allow_err!(c.send(&data).await);
}
_ = timer.tick() => {
if last_timer.elapsed() < TIMER_OUT {
continue;
}
last_timer = time::Instant::now();
c.send(&ipc::Data::OnlineStatus(None)).await.ok();
c.send(&ipc::Data::Options(None)).await.ok();
c.send(&ipc::Data::Config(("id".to_owned(), None))).await.ok();