From 0f860f712f86ef42c5ce6801faa1d95eff7020a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabian=20Gr=C3=BCnbichler?= Date: Mon, 11 Jan 2021 09:51:21 +0100 Subject: [PATCH] tokio 1.0: update to new tokio-openssl interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit connect/accept are now happening on pinned SslStreams Signed-off-by: Fabian Grünbichler --- src/bin/proxmox-backup-proxy.rs | 27 ++++++++++++++++++++++----- src/server/rest.rs | 4 ++-- src/tools/async_io.rs | 2 +- src/tools/http.rs | 11 +++++------ 4 files changed, 30 insertions(+), 14 deletions(-) diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index 16450244d..c8eb237c0 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -167,7 +167,7 @@ fn accept_connections( mut listener: tokio::net::TcpListener, acceptor: Arc, debug: bool, -) -> tokio::sync::mpsc::Receiver, Error>> { +) -> tokio::sync::mpsc::Receiver>>, Error>> { const MAX_PENDING_ACCEPTS: usize = 1024; @@ -185,7 +185,24 @@ fn accept_connections( sock.set_nodelay(true).unwrap(); let _ = set_tcp_keepalive(sock.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); let acceptor = Arc::clone(&acceptor); - let mut sender = sender.clone(); + + let ssl = match openssl::ssl::Ssl::new(acceptor.context()) { + Ok(ssl) => ssl, + Err(err) => { + eprintln!("failed to create Ssl object from Acceptor context - {}", err); + continue; + }, + }; + let stream = match tokio_openssl::SslStream::new(ssl, sock) { + Ok(stream) => stream, + Err(err) => { + eprintln!("failed to create SslStream using ssl and connection socket - {}", err); + continue; + }, + }; + + let mut stream = Box::pin(stream); + let sender = sender.clone(); if Arc::strong_count(&accept_counter) > MAX_PENDING_ACCEPTS { eprintln!("connection rejected - to many open connections"); @@ -195,13 +212,13 @@ fn accept_connections( let accept_counter = accept_counter.clone(); tokio::spawn(async move { let accept_future = tokio::time::timeout( - Duration::new(10, 0), tokio_openssl::accept(&acceptor, sock)); + Duration::new(10, 0), stream.as_mut().accept()); let result = accept_future.await; match result { - Ok(Ok(connection)) => { - if let Err(_) = sender.send(Ok(connection)).await { + Ok(Ok(())) => { + if let Err(_) = sender.send(Ok(stream)).await { if debug { eprintln!("detect closed connection channel"); } diff --git a/src/server/rest.rs b/src/server/rest.rs index 04bdc5f98..c30d1c920 100644 --- a/src/server/rest.rs +++ b/src/server/rest.rs @@ -65,7 +65,7 @@ impl RestServer { } } -impl tower_service::Service<&tokio_openssl::SslStream> for RestServer { +impl tower_service::Service<&Pin>>> for RestServer { type Response = ApiService; type Error = Error; type Future = Pin> + Send>>; @@ -74,7 +74,7 @@ impl tower_service::Service<&tokio_openssl::SslStream> fo Poll::Ready(Ok(())) } - fn call(&mut self, ctx: &tokio_openssl::SslStream) -> Self::Future { + fn call(&mut self, ctx: &Pin>>) -> Self::Future { match ctx.get_ref().peer_addr() { Err(err) => { future::err(format_err!("unable to get peer address - {}", err)).boxed() diff --git a/src/tools/async_io.rs b/src/tools/async_io.rs index 3a5a6c9aa..997c02fad 100644 --- a/src/tools/async_io.rs +++ b/src/tools/async_io.rs @@ -74,7 +74,7 @@ impl AsyncWrite for EitherStream, + Pin>>, > { fn connected(&self) -> hyper::client::connect::Connected { match self { diff --git a/src/tools/http.rs b/src/tools/http.rs index 130aa3819..47d6e1f6d 100644 --- a/src/tools/http.rs +++ b/src/tools/http.rs @@ -3,6 +3,7 @@ use lazy_static::lazy_static; use std::task::{Context, Poll}; use std::os::unix::io::AsRawFd; use std::collections::HashMap; +use std::pin::Pin; use hyper::{Uri, Body}; use hyper::client::{Client, HttpConnector}; @@ -101,7 +102,7 @@ impl HttpsConnector { type MaybeTlsStream = EitherStream< tokio::net::TcpStream, - tokio_openssl::SslStream, + Pin>>, >; impl hyper::service::Service for HttpsConnector { @@ -123,10 +124,6 @@ impl hyper::service::Service for HttpsConnector { .scheme() .ok_or_else(|| format_err!("missing URL scheme"))? == "https"; - let host = dst - .host() - .ok_or_else(|| format_err!("missing hostname in destination url?"))? - .to_string(); let config = this.ssl_connector.configure(); let dst_str = dst.to_string(); // for error messages @@ -139,7 +136,9 @@ impl hyper::service::Service for HttpsConnector { let _ = set_tcp_keepalive(conn.as_raw_fd(), PROXMOX_BACKUP_TCP_KEEPALIVE_TIME); if is_https { - let conn = tokio_openssl::connect(config?, &host, conn).await?; + let conn: tokio_openssl::SslStream = tokio_openssl::SslStream::new(config?.into_ssl(&dst_str)?, conn)?; + let mut conn = Box::pin(conn); + conn.as_mut().connect().await?; Ok(MaybeTlsStream::Right(conn)) } else { Ok(MaybeTlsStream::Left(conn))