diff --git a/Cargo.toml b/Cargo.toml index f1d53e4b4..2f2c2a813 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,7 +57,7 @@ proxmox = { version = "0.11.5", features = [ "sortable-macro", "api-macro" ] } #proxmox = { git = "git://git.proxmox.com/git/proxmox", version = "0.1.2", features = [ "sortable-macro", "api-macro" ] } #proxmox = { path = "../proxmox/proxmox", features = [ "sortable-macro", "api-macro" ] } proxmox-fuse = "0.1.1" -proxmox-http = { version = "0.1.0", path = "../proxmox/proxmox-http", features = [ "http-helpers", "websocket" ] } +proxmox-http = { version = "0.1.0", path = "../proxmox/proxmox-http", features = [ "client", "http-helpers", "websocket" ] } pxar = { version = "0.10.1", features = [ "tokio-io" ] } #pxar = { path = "../pxar", features = [ "tokio-io" ] } regex = "1.2" diff --git a/src/client/http_client.rs b/src/client/http_client.rs index 056f30e53..4c5484818 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -20,6 +20,9 @@ use proxmox::{ tools::fs::{file_get_json, replace_file, CreateOptions}, }; +use proxmox_http::http::client::HttpsConnector; +use proxmox_http::http::helpers::build_authority; + use super::pipe_to_stream::PipeToSendStream; use crate::api2::types::{Authid, Userid}; use crate::tools::{ @@ -27,10 +30,6 @@ use crate::tools::{ BroadcastFuture, DEFAULT_ENCODE_SET, PROXMOX_BACKUP_TCP_KEEPALIVE_TIME, - http::{ - build_authority, - HttpsConnector, - }, }; /// Timeout used for several HTTP operations that are expected to finish quickly but may block in diff --git a/src/tools.rs b/src/tools.rs index 8a1d0bc76..eb9a9fd0d 100644 --- a/src/tools.rs +++ b/src/tools.rs @@ -31,7 +31,6 @@ pub mod disks; pub mod format; pub mod fs; pub mod fuse_loop; -pub mod http; mod simple_http_client; pub use simple_http_client::SimpleHttp; diff --git a/src/tools/http.rs b/src/tools/http.rs index b99d26a15..dba8dc985 100644 --- a/src/tools/http.rs +++ b/src/tools/http.rs @@ -20,200 +20,3 @@ use tokio_openssl::SslStream; use proxmox::sys::linux::socket::set_tcp_keepalive; use proxmox_http::http::{MaybeTlsStream, ProxyConfig}; - -// Build a http::uri::Authority ("host:port"), use '[..]' around IPv6 addresses -pub(crate) fn build_authority(host: &str, port: u16) -> Result { - let bytes = host.as_bytes(); - let len = bytes.len(); - let authority = if len > 3 && bytes.contains(&b':') && bytes[0] != b'[' && bytes[len-1] != b']' { - format!("[{}]:{}", host, port).parse()? - } else { - format!("{}:{}", host, port).parse()? - }; - Ok(authority) -} - -#[derive(Clone)] -pub struct HttpsConnector { - connector: HttpConnector, - ssl_connector: Arc, - proxy: Option, - tcp_keepalive: u32, -} - -impl HttpsConnector { - pub fn with_connector(mut connector: HttpConnector, ssl_connector: SslConnector, tcp_keepalive: u32) -> Self { - connector.enforce_http(false); - Self { - connector, - ssl_connector: Arc::new(ssl_connector), - proxy: None, - tcp_keepalive, - } - } - - pub fn set_proxy(&mut self, proxy: ProxyConfig) { - self.proxy = Some(proxy); - } - - async fn secure_stream( - tcp_stream: TcpStream, - ssl_connector: &SslConnector, - host: &str, - ) -> Result, Error> { - let config = ssl_connector.configure()?; - let mut conn: SslStream = SslStream::new(config.into_ssl(host)?, tcp_stream)?; - Pin::new(&mut conn).connect().await?; - Ok(MaybeTlsStream::Secured(conn)) - } - - fn parse_status_line(status_line: &str) -> Result<(), Error> { - if !(status_line.starts_with("HTTP/1.1 200") || status_line.starts_with("HTTP/1.0 200")) { - bail!("proxy connect failed - invalid status: {}", status_line) - } - Ok(()) - } - - async fn parse_connect_response( - stream: &mut R, - ) -> Result<(), Error> { - - let mut data: Vec = Vec::new(); - let mut buffer = [0u8; 256]; - const END_MARK: &[u8; 4] = b"\r\n\r\n"; - - 'outer: loop { - let n = stream.read(&mut buffer[..]).await?; - if n == 0 { break; } - let search_start = if data.len() > END_MARK.len() { data.len() - END_MARK.len() + 1 } else { 0 }; - data.extend(&buffer[..n]); - if data.len() >= END_MARK.len() { - if let Some(pos) = data[search_start..].windows(END_MARK.len()).position(|w| w == END_MARK) { - let response = String::from_utf8_lossy(&data); - let status_line = match response.split("\r\n").next() { - Some(status) => status, - None => bail!("missing newline"), - }; - Self::parse_status_line(status_line)?; - - if pos != data.len() - END_MARK.len() { - bail!("unexpected data after connect response"); - } - break 'outer; - } - } - if data.len() > 1024*32 { // max 32K (random chosen limit) - bail!("too many bytes"); - } - } - Ok(()) - } -} - -impl hyper::service::Service for HttpsConnector { - type Response = MaybeTlsStream; - type Error = Error; - #[allow(clippy::type_complexity)] - type Future = Pin> + Send + 'static>>; - - fn poll_ready(&mut self, ctx: &mut Context<'_>) -> Poll> { - self.connector - .poll_ready(ctx) - .map_err(|err| err.into()) - } - - fn call(&mut self, dst: Uri) -> Self::Future { - let mut connector = self.connector.clone(); - let ssl_connector = Arc::clone(&self.ssl_connector); - let is_https = dst.scheme() == Some(&http::uri::Scheme::HTTPS); - let host = match dst.host() { - Some(host) => host.to_owned(), - None => { - return futures::future::err(format_err!("missing URL scheme")).boxed(); - } - }; - let port = dst.port_u16().unwrap_or(if is_https { 443 } else { 80 }); - let keepalive = self.tcp_keepalive; - - if let Some(ref proxy) = self.proxy { - - let use_connect = is_https || proxy.force_connect; - - let proxy_authority = match build_authority(&proxy.host, proxy.port) { - Ok(authority) => authority, - Err(err) => return futures::future::err(err).boxed(), - }; - - let proxy_uri = match Uri::builder() - .scheme("http") - .authority(proxy_authority.as_str()) - .path_and_query("/") - .build() - { - Ok(uri) => uri, - Err(err) => return futures::future::err(err.into()).boxed(), - }; - - let authorization = proxy.authorization.clone(); - - if use_connect { - async move { - - let mut tcp_stream = connector - .call(proxy_uri) - .await - .map_err(|err| format_err!("error connecting to {} - {}", proxy_authority, err))?; - - let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive); - - let mut connect_request = format!("CONNECT {0}:{1} HTTP/1.1\r\n", host, port); - if let Some(authorization) = authorization { - connect_request.push_str(&format!( - "Proxy-Authorization: Basic {}\r\n", - base64::encode(authorization), - )); - } - connect_request.push_str(&format!("Host: {0}:{1}\r\n\r\n", host, port)); - - tcp_stream.write_all(connect_request.as_bytes()).await?; - tcp_stream.flush().await?; - - Self::parse_connect_response(&mut tcp_stream).await?; - - if is_https { - Self::secure_stream(tcp_stream, &ssl_connector, &host).await - } else { - Ok(MaybeTlsStream::Normal(tcp_stream)) - } - }.boxed() - } else { - async move { - let tcp_stream = connector - .call(proxy_uri) - .await - .map_err(|err| format_err!("error connecting to {} - {}", proxy_authority, err))?; - - let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive); - - Ok(MaybeTlsStream::Proxied(tcp_stream)) - }.boxed() - } - } else { - async move { - let dst_str = dst.to_string(); // for error messages - let tcp_stream = connector - .call(dst) - .await - .map_err(|err| format_err!("error connecting to {} - {}", dst_str, err))?; - - let _ = set_tcp_keepalive(tcp_stream.as_raw_fd(), keepalive); - - if is_https { - Self::secure_stream(tcp_stream, &ssl_connector, &host).await - } else { - Ok(MaybeTlsStream::Normal(tcp_stream)) - } - }.boxed() - } - } -} diff --git a/src/tools/simple_http_client.rs b/src/tools/simple_http_client.rs index 729711c89..fa3eadf40 100644 --- a/src/tools/simple_http_client.rs +++ b/src/tools/simple_http_client.rs @@ -7,10 +7,12 @@ use http::{Request, Response, HeaderValue}; use openssl::ssl::{SslConnector, SslMethod}; use futures::*; -use proxmox_http::http::ProxyConfig; +use proxmox_http::http::{ + ProxyConfig, + client::HttpsConnector, +}; use crate::tools::PROXMOX_BACKUP_TCP_KEEPALIVE_TIME; -use crate::tools::http::HttpsConnector; /// Asyncrounous HTTP client implementation pub struct SimpleHttp {