http: use Watcher of GracefulShutdown to avoid races
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
@ -191,7 +191,7 @@ const SUBDIRS: SubdirMap = &[
|
||||
const ROUTER: Router = Router::new()
|
||||
.get(&list_subdirs_api_method!(SUBDIRS))
|
||||
.subdirs(SUBDIRS);
|
||||
|
||||
e
|
||||
async fn run() -> Result<(), Error> {
|
||||
// we first have to configure the api environment (basedir etc.)
|
||||
|
||||
@ -214,15 +214,15 @@ async fn run() -> Result<(), Error> {
|
||||
([127, 0, 0, 1], 65000).into(),
|
||||
move |listener: TcpListener| {
|
||||
Ok(async move {
|
||||
let graceful = Arc::new(GracefulShutdown::new());
|
||||
let graceful = GracefulShutdown::new();
|
||||
loop {
|
||||
let graceful2 = Arc::clone(&graceful);
|
||||
tokio::select! {
|
||||
incoming = listener.accept() => {
|
||||
log::info!("accepted new connection!");
|
||||
let (conn, _) = incoming?;
|
||||
let api_service = rest_server.api_service(&conn)?;
|
||||
tokio::spawn(async move { let res = api_service.serve(conn, Some(graceful2)).await; log::info!("connection finished: {res:?}") });
|
||||
let watcher = graceful.watcher();
|
||||
tokio::spawn(async move { let res = api_service.serve(conn, Some(watcher)).await; log::info!("connection finished: {res:?}") });
|
||||
},
|
||||
_shutdown = proxmox_daemon::shutdown_future() => {
|
||||
log::info!("shutdown future triggered!");
|
||||
@ -230,11 +230,8 @@ async fn run() -> Result<(), Error> {
|
||||
}
|
||||
}
|
||||
}
|
||||
log::info!("count {}", Arc::strong_count(&graceful));
|
||||
if let Some(shutdown) = Arc::into_inner(graceful) {
|
||||
log::info!("shutting down..");
|
||||
shutdown.shutdown().await
|
||||
}
|
||||
log::info!("shutting down..");
|
||||
shutdown.shutdown().await
|
||||
Ok(())
|
||||
})
|
||||
},
|
||||
|
@ -17,7 +17,7 @@ use hyper::http::request::Parts;
|
||||
use hyper::{Request, Response, StatusCode};
|
||||
use hyper_util::rt::{TokioExecutor, TokioIo};
|
||||
use hyper_util::server::conn;
|
||||
use hyper_util::server::graceful::GracefulShutdown;
|
||||
use hyper_util::server::graceful;
|
||||
use hyper_util::service::TowerToHyperService;
|
||||
use regex::Regex;
|
||||
use serde_json::Value;
|
||||
@ -119,7 +119,7 @@ impl RedirectService {
|
||||
pub async fn serve<S>(
|
||||
self,
|
||||
conn: S,
|
||||
mut graceful: Option<Arc<GracefulShutdown>>,
|
||||
mut graceful: Option<graceful::Watcher>,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
@ -130,7 +130,6 @@ impl RedirectService {
|
||||
let api_conn = api_conn.serve_connection_with_upgrades(io, api_service);
|
||||
if let Some(graceful) = graceful.take() {
|
||||
let api_conn = graceful.watch(api_conn);
|
||||
drop(graceful);
|
||||
api_conn.await
|
||||
} else {
|
||||
api_conn.await
|
||||
@ -240,7 +239,7 @@ impl ApiService {
|
||||
pub async fn serve<S>(
|
||||
self,
|
||||
conn: S,
|
||||
mut graceful: Option<Arc<GracefulShutdown>>,
|
||||
mut graceful: Option<graceful::Watcher>,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||
@ -251,7 +250,6 @@ impl ApiService {
|
||||
let api_conn = api_conn.serve_connection_with_upgrades(io, api_service);
|
||||
if let Some(graceful) = graceful.take() {
|
||||
let api_conn = graceful.watch(api_conn);
|
||||
drop(graceful);
|
||||
api_conn.await
|
||||
} else {
|
||||
api_conn.await
|
||||
|
Reference in New Issue
Block a user