rest-server: connection: clean up accept data flow
This adds the structs `AcceptState` and `AcceptFlags` and adapts relevant method signatures of `AcceptBuilder` accordingly. This makes it easier to add further parameters in the future. Signed-off-by: Max Carrara <m.carrara@proxmox.com>
This commit is contained in:
parent
9f33be3078
commit
0d3e7c8eaf
@ -255,6 +255,16 @@ impl From<(ClientSender, InsecureClientSender)> for Sender {
|
||||
}
|
||||
}
|
||||
|
||||
struct AcceptState {
|
||||
pub socket: InsecureClientStream,
|
||||
pub acceptor: Arc<Mutex<SslAcceptor>>,
|
||||
pub accept_counter: Arc<()>,
|
||||
}
|
||||
|
||||
struct AcceptFlags {
|
||||
pub is_debug: bool,
|
||||
}
|
||||
|
||||
impl AcceptBuilder {
|
||||
async fn accept_connections(
|
||||
self,
|
||||
@ -285,24 +295,26 @@ impl AcceptBuilder {
|
||||
continue;
|
||||
}
|
||||
|
||||
let state = AcceptState {
|
||||
socket,
|
||||
acceptor,
|
||||
accept_counter,
|
||||
};
|
||||
|
||||
let flags = AcceptFlags {
|
||||
is_debug: self.debug,
|
||||
};
|
||||
|
||||
match sender {
|
||||
Sender::Secure(ref secure_sender) => {
|
||||
let accept_future = Self::do_accept_tls(
|
||||
socket,
|
||||
acceptor,
|
||||
accept_counter,
|
||||
self.debug,
|
||||
secure_sender.clone(),
|
||||
);
|
||||
let accept_future = Self::do_accept_tls(state, flags, secure_sender.clone());
|
||||
|
||||
tokio::spawn(accept_future);
|
||||
}
|
||||
Sender::SecureAndInsecure(ref secure_sender, ref insecure_sender) => {
|
||||
let accept_future = Self::do_accept_tls_optional(
|
||||
socket,
|
||||
acceptor,
|
||||
accept_counter,
|
||||
self.debug,
|
||||
state,
|
||||
flags,
|
||||
secure_sender.clone(),
|
||||
insecure_sender.clone(),
|
||||
);
|
||||
@ -343,17 +355,11 @@ impl AcceptBuilder {
|
||||
Ok(socket)
|
||||
}
|
||||
|
||||
async fn do_accept_tls(
|
||||
socket: InsecureClientStream,
|
||||
acceptor: Arc<Mutex<SslAcceptor>>,
|
||||
accept_counter: Arc<()>,
|
||||
debug: bool,
|
||||
secure_sender: ClientSender,
|
||||
) {
|
||||
async fn do_accept_tls(state: AcceptState, flags: AcceptFlags, secure_sender: ClientSender) {
|
||||
let ssl = {
|
||||
// limit acceptor_guard scope
|
||||
// Acceptor can be reloaded using the command socket "reload-certificate" command
|
||||
let acceptor_guard = acceptor.lock().unwrap();
|
||||
let acceptor_guard = state.acceptor.lock().unwrap();
|
||||
|
||||
match openssl::ssl::Ssl::new(acceptor_guard.context()) {
|
||||
Ok(ssl) => ssl,
|
||||
@ -364,7 +370,7 @@ impl AcceptBuilder {
|
||||
}
|
||||
};
|
||||
|
||||
let secure_stream = match tokio_openssl::SslStream::new(ssl, socket) {
|
||||
let secure_stream = match tokio_openssl::SslStream::new(ssl, state.socket) {
|
||||
Ok(stream) => stream,
|
||||
Err(err) => {
|
||||
log::error!("failed to create SslStream using ssl and connection socket - {err}");
|
||||
@ -381,41 +387,39 @@ impl AcceptBuilder {
|
||||
|
||||
match result {
|
||||
Ok(Ok(())) => {
|
||||
if secure_sender.send(Ok(secure_stream)).await.is_err() && debug {
|
||||
if secure_sender.send(Ok(secure_stream)).await.is_err() && flags.is_debug {
|
||||
log::error!("detected closed connection channel");
|
||||
}
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
if debug {
|
||||
if flags.is_debug {
|
||||
log::error!("https handshake failed - {err}");
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
if debug {
|
||||
if flags.is_debug {
|
||||
log::error!("https handshake timeout");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
drop(accept_counter); // decrease reference count
|
||||
drop(state.accept_counter); // decrease reference count
|
||||
}
|
||||
|
||||
async fn do_accept_tls_optional(
|
||||
socket: InsecureClientStream,
|
||||
acceptor: Arc<Mutex<SslAcceptor>>,
|
||||
accept_counter: Arc<()>,
|
||||
debug: bool,
|
||||
state: AcceptState,
|
||||
flags: AcceptFlags,
|
||||
secure_sender: ClientSender,
|
||||
insecure_sender: InsecureClientSender,
|
||||
) {
|
||||
let client_initiates_handshake = {
|
||||
#[cfg(feature = "rate-limited-stream")]
|
||||
let socket = socket.inner();
|
||||
let socket_ref = state.socket.inner();
|
||||
|
||||
#[cfg(not(feature = "rate-limited-stream"))]
|
||||
let socket = &socket;
|
||||
let socket_ref = &state.socket;
|
||||
|
||||
match Self::wait_for_client_tls_handshake(socket).await {
|
||||
match Self::wait_for_client_tls_handshake(socket_ref).await {
|
||||
Ok(initiates_handshake) => initiates_handshake,
|
||||
Err(err) => {
|
||||
log::error!("error checking for TLS handshake: {err}");
|
||||
@ -425,16 +429,16 @@ impl AcceptBuilder {
|
||||
};
|
||||
|
||||
if !client_initiates_handshake {
|
||||
let insecure_stream = Box::pin(socket);
|
||||
let insecure_stream = Box::pin(state.socket);
|
||||
|
||||
if insecure_sender.send(Ok(insecure_stream)).await.is_err() && debug {
|
||||
if insecure_sender.send(Ok(insecure_stream)).await.is_err() && flags.is_debug {
|
||||
log::error!("detected closed connection channel")
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
Self::do_accept_tls(socket, acceptor, accept_counter, debug, secure_sender).await
|
||||
Self::do_accept_tls(state, flags, secure_sender).await
|
||||
}
|
||||
|
||||
async fn wait_for_client_tls_handshake(incoming_stream: &TcpStream) -> Result<bool, Error> {
|
||||
|
Loading…
x
Reference in New Issue
Block a user