http: use Watcher of GracefulShutdown to avoid races

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller
2025-05-22 15:08:11 +02:00
parent 891a116520
commit 48fc0a2a07
2 changed files with 9 additions and 14 deletions

View File

@ -191,7 +191,7 @@ const SUBDIRS: SubdirMap = &[
const ROUTER: Router = Router::new()
.get(&list_subdirs_api_method!(SUBDIRS))
.subdirs(SUBDIRS);
e
async fn run() -> Result<(), Error> {
// we first have to configure the api environment (basedir etc.)
@ -214,15 +214,15 @@ async fn run() -> Result<(), Error> {
([127, 0, 0, 1], 65000).into(),
move |listener: TcpListener| {
Ok(async move {
let graceful = Arc::new(GracefulShutdown::new());
let graceful = GracefulShutdown::new();
loop {
let graceful2 = Arc::clone(&graceful);
tokio::select! {
incoming = listener.accept() => {
log::info!("accepted new connection!");
let (conn, _) = incoming?;
let api_service = rest_server.api_service(&conn)?;
tokio::spawn(async move { let res = api_service.serve(conn, Some(graceful2)).await; log::info!("connection finished: {res:?}") });
let watcher = graceful.watcher();
tokio::spawn(async move { let res = api_service.serve(conn, Some(watcher)).await; log::info!("connection finished: {res:?}") });
},
_shutdown = proxmox_daemon::shutdown_future() => {
log::info!("shutdown future triggered!");
@ -230,11 +230,8 @@ async fn run() -> Result<(), Error> {
}
}
}
log::info!("count {}", Arc::strong_count(&graceful));
if let Some(shutdown) = Arc::into_inner(graceful) {
log::info!("shutting down..");
shutdown.shutdown().await
}
log::info!("shutting down..");
shutdown.shutdown().await
Ok(())
})
},

View File

@ -17,7 +17,7 @@ use hyper::http::request::Parts;
use hyper::{Request, Response, StatusCode};
use hyper_util::rt::{TokioExecutor, TokioIo};
use hyper_util::server::conn;
use hyper_util::server::graceful::GracefulShutdown;
use hyper_util::server::graceful;
use hyper_util::service::TowerToHyperService;
use regex::Regex;
use serde_json::Value;
@ -119,7 +119,7 @@ impl RedirectService {
pub async fn serve<S>(
self,
conn: S,
mut graceful: Option<Arc<GracefulShutdown>>,
mut graceful: Option<graceful::Watcher>,
) -> Result<(), Error>
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
@ -130,7 +130,6 @@ impl RedirectService {
let api_conn = api_conn.serve_connection_with_upgrades(io, api_service);
if let Some(graceful) = graceful.take() {
let api_conn = graceful.watch(api_conn);
drop(graceful);
api_conn.await
} else {
api_conn.await
@ -240,7 +239,7 @@ impl ApiService {
pub async fn serve<S>(
self,
conn: S,
mut graceful: Option<Arc<GracefulShutdown>>,
mut graceful: Option<graceful::Watcher>,
) -> Result<(), Error>
where
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
@ -251,7 +250,6 @@ impl ApiService {
let api_conn = api_conn.serve_connection_with_upgrades(io, api_service);
if let Some(graceful) = graceful.take() {
let api_conn = graceful.watch(api_conn);
drop(graceful);
api_conn.await
} else {
api_conn.await