diff --git a/examples/h2client.rs b/examples/h2client.rs index 87a6e3262..8551af873 100644 --- a/examples/h2client.rs +++ b/examples/h2client.rs @@ -2,7 +2,7 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use anyhow::{Error}; +use anyhow::Error; use futures::future::TryFutureExt; use futures::stream::Stream; use tokio::net::TcpStream; @@ -38,11 +38,11 @@ impl Future for Process { this.body.flow_control().release_capacity(chunk.len())?; this.bytes += chunk.len(); // println!("GOT FRAME {}", chunk.len()); - }, + } Some(Err(err)) => return Poll::Ready(Err(Error::from(err))), None => { this.trailers = true; - }, + } } } } @@ -52,7 +52,6 @@ impl Future for Process { fn send_request( mut client: h2::client::SendRequest, ) -> impl Future> { - println!("sending request"); let request = http::Request::builder() @@ -62,11 +61,11 @@ fn send_request( let (response, _stream) = client.send_request(request, true).unwrap(); - response - .map_err(Error::from) - .and_then(|response| { - Process { body: response.into_body(), trailers: false, bytes: 0 } - }) + response.map_err(Error::from).and_then(|response| Process { + body: response.into_body(), + trailers: false, + bytes: 0, + }) } fn main() -> Result<(), Error> { @@ -74,16 +73,15 @@ fn main() -> Result<(), Error> { } async fn run() -> Result<(), Error> { - let start = std::time::SystemTime::now(); - let conn = TcpStream::connect(std::net::SocketAddr::from(([127,0,0,1], 8008))) - .await?; + let conn = TcpStream::connect(std::net::SocketAddr::from(([127, 0, 0, 1], 8008))).await?; + conn.set_nodelay(true).unwrap(); let (client, h2) = h2::client::Builder::new() - .initial_connection_window_size(1024*1024*1024) - .initial_window_size(1024*1024*1024) - .max_frame_size(4*1024*1024) + .initial_connection_window_size(1024 * 1024 * 1024) + .initial_window_size(1024 * 1024 * 1024) + .max_frame_size(4 * 1024 * 1024) .handshake(conn) .await?; @@ -99,10 +97,13 @@ async fn run() -> Result<(), Error> { } let elapsed = start.elapsed().unwrap(); - let elapsed = (elapsed.as_secs() as f64) + - (elapsed.subsec_millis() as f64)/1000.0; + let elapsed = (elapsed.as_secs() as f64) + (elapsed.subsec_millis() as f64) / 1000.0; - println!("Downloaded {} bytes, {} MB/s", bytes, (bytes as f64)/(elapsed*1024.0*1024.0)); + println!( + "Downloaded {} bytes, {} MB/s", + bytes, + (bytes as f64) / (elapsed * 1024.0 * 1024.0) + ); Ok(()) } diff --git a/examples/h2s-client.rs b/examples/h2s-client.rs index 0d6beaef1..a3b0d22cd 100644 --- a/examples/h2s-client.rs +++ b/examples/h2s-client.rs @@ -5,6 +5,7 @@ use std::task::{Context, Poll}; use anyhow::{format_err, Error}; use futures::future::TryFutureExt; use futures::stream::Stream; +use tokio::net::TcpStream; // Simple H2 client to test H2 download speed using h2s-server.rs @@ -37,11 +38,11 @@ impl Future for Process { this.body.flow_control().release_capacity(chunk.len())?; this.bytes += chunk.len(); // println!("GOT FRAME {}", chunk.len()); - }, + } Some(Err(err)) => return Poll::Ready(Err(Error::from(err))), None => { this.trailers = true; - }, + } } } } @@ -60,11 +61,11 @@ fn send_request( let (response, _stream) = client.send_request(request, true).unwrap(); - response - .map_err(Error::from) - .and_then(|response| { - Process { body: response.into_body(), trailers: false, bytes: 0 } - }) + response.map_err(Error::from).and_then(|response| Process { + body: response.into_body(), + trailers: false, + bytes: 0, + }) } fn main() -> Result<(), Error> { @@ -74,57 +75,51 @@ fn main() -> Result<(), Error> { async fn run() -> Result<(), Error> { let start = std::time::SystemTime::now(); - let conn = - tokio::net::TcpStream::connect(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?; - + let conn = TcpStream::connect(std::net::SocketAddr::from(([127, 0, 0, 1], 8008))).await?; conn.set_nodelay(true).unwrap(); - conn.set_recv_buffer_size(1024*1024).unwrap(); use openssl::ssl::{SslConnector, SslMethod}; let mut ssl_connector_builder = SslConnector::builder(SslMethod::tls()).unwrap(); ssl_connector_builder.set_verify(openssl::ssl::SslVerifyMode::NONE); - let conn = - tokio_openssl::connect( - ssl_connector_builder.build().configure()?, - "localhost", - conn, - ) + let ssl = ssl_connector_builder + .build() + .configure()? + .into_ssl("localhost")?; + + let conn = tokio_openssl::SslStream::new(ssl, conn)?; + let mut conn = Box::pin(conn); + conn.as_mut() + .connect() .await .map_err(|err| format_err!("connect failed - {}", err))?; let (client, h2) = h2::client::Builder::new() - .initial_connection_window_size(1024*1024*1024) - .initial_window_size(1024*1024*1024) - .max_frame_size(4*1024*1024) + .initial_connection_window_size(1024 * 1024 * 1024) + .initial_window_size(1024 * 1024 * 1024) + .max_frame_size(4 * 1024 * 1024) .handshake(conn) .await?; - // Spawn a task to run the conn... tokio::spawn(async move { - if let Err(e) = h2.await { - println!("GOT ERR={:?}", e); + if let Err(err) = h2.await { + println!("GOT ERR={:?}", err); } }); let mut bytes = 0; - for _ in 0..100 { - match send_request(client.clone()).await { - Ok(b) => { - bytes += b; - } - Err(e) => { - println!("ERROR {}", e); - return Ok(()); - } - } + for _ in 0..2000 { + bytes += send_request(client.clone()).await?; } let elapsed = start.elapsed().unwrap(); - let elapsed = (elapsed.as_secs() as f64) + - (elapsed.subsec_millis() as f64)/1000.0; + let elapsed = (elapsed.as_secs() as f64) + (elapsed.subsec_millis() as f64) / 1000.0; - println!("Downloaded {} bytes, {} MB/s", bytes, (bytes as f64)/(elapsed*1024.0*1024.0)); + println!( + "Downloaded {} bytes, {} MB/s", + bytes, + (bytes as f64) / (elapsed * 1024.0 * 1024.0) + ); Ok(()) } diff --git a/examples/h2s-server.rs b/examples/h2s-server.rs index 8481b8add..4357fe45b 100644 --- a/examples/h2s-server.rs +++ b/examples/h2s-server.rs @@ -2,14 +2,12 @@ use std::sync::Arc; use anyhow::{format_err, Error}; use futures::*; -use hyper::{Request, Response, Body}; -use openssl::ssl::{SslMethod, SslAcceptor, SslFiletype}; +use hyper::{Body, Request, Response}; +use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod}; use tokio::net::{TcpListener, TcpStream}; use proxmox_backup::configdir; -// Simple H2 server to test H2 speed with h2s-client.rs - fn main() -> Result<(), Error> { proxmox_backup::tools::runtime::main(run()) } @@ -19,38 +17,38 @@ async fn run() -> Result<(), Error> { let cert_path = configdir!("/proxy.pem"); let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); - acceptor.set_private_key_file(key_path, SslFiletype::PEM) + acceptor + .set_private_key_file(key_path, SslFiletype::PEM) .map_err(|err| format_err!("unable to read proxy key {} - {}", key_path, err))?; - acceptor.set_certificate_chain_file(cert_path) + acceptor + .set_certificate_chain_file(cert_path) .map_err(|err| format_err!("unable to read proxy cert {} - {}", cert_path, err))?; acceptor.check_private_key().unwrap(); let acceptor = Arc::new(acceptor.build()); - let mut listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?; + let listener = TcpListener::bind(std::net::SocketAddr::from(([127, 0, 0, 1], 8008))).await?; println!("listening on {:?}", listener.local_addr()); loop { let (socket, _addr) = listener.accept().await?; - tokio::spawn(handle_connection(socket, Arc::clone(&acceptor)) - .map(|res| { - if let Err(err) = res { - eprintln!("Error: {}", err); - } - })); + tokio::spawn(handle_connection(socket, Arc::clone(&acceptor)).map(|res| { + if let Err(err) = res { + eprintln!("Error: {}", err); + } + })); } } -async fn handle_connection( - socket: TcpStream, - acceptor: Arc, -) -> Result<(), Error> { +async fn handle_connection(socket: TcpStream, acceptor: Arc) -> Result<(), Error> { socket.set_nodelay(true).unwrap(); - socket.set_send_buffer_size(1024*1024).unwrap(); - socket.set_recv_buffer_size(1024*1024).unwrap(); - let socket = tokio_openssl::accept(acceptor.as_ref(), socket).await?; + let ssl = openssl::ssl::Ssl::new(acceptor.context())?; + let stream = tokio_openssl::SslStream::new(ssl, socket)?; + let mut stream = Box::pin(stream); + + stream.as_mut().accept().await?; let mut http = hyper::server::conn::Http::new(); http.http2_only(true); @@ -61,7 +59,7 @@ async fn handle_connection( let service = hyper::service::service_fn(|_req: Request| { println!("Got request"); - let buffer = vec![65u8; 1024*1024]; // nonsense [A,A,A,A...] + let buffer = vec![65u8; 4 * 1024 * 1024]; // nonsense [A,A,A,A...] let body = Body::from(buffer); let response = Response::builder() @@ -72,7 +70,7 @@ async fn handle_connection( future::ok::<_, Error>(response) }); - http.serve_connection(socket, service) + http.serve_connection(stream, service) .map_err(Error::from) .await?; diff --git a/examples/h2server.rs b/examples/h2server.rs index 1669347f3..1b06557c9 100644 --- a/examples/h2server.rs +++ b/examples/h2server.rs @@ -1,51 +1,55 @@ -use anyhow::{Error}; +use anyhow::Error; use futures::*; +use hyper::{Body, Request, Response}; -// Simple H2 server to test H2 speed with h2client.rs - -use tokio::net::TcpListener; -use tokio::io::{AsyncRead, AsyncWrite}; - -use proxmox_backup::client::pipe_to_stream::PipeToSendStream; +use tokio::net::{TcpListener, TcpStream}; fn main() -> Result<(), Error> { proxmox_backup::tools::runtime::main(run()) } async fn run() -> Result<(), Error> { - let mut listener = TcpListener::bind(std::net::SocketAddr::from(([127,0,0,1], 8008))).await?; + let listener = TcpListener::bind(std::net::SocketAddr::from(([127, 0, 0, 1], 8008))).await?; println!("listening on {:?}", listener.local_addr()); loop { let (socket, _addr) = listener.accept().await?; - tokio::spawn(handle_connection(socket) - .map(|res| { - if let Err(err) = res { - eprintln!("Error: {}", err); - } - })); + tokio::spawn(handle_connection(socket).map(|res| { + if let Err(err) = res { + eprintln!("Error: {}", err); + } + })); } } -async fn handle_connection(socket: T) -> Result<(), Error> { - let mut conn = h2::server::handshake(socket).await?; +async fn handle_connection(socket: TcpStream) -> Result<(), Error> { + socket.set_nodelay(true).unwrap(); - println!("H2 connection bound"); + let mut http = hyper::server::conn::Http::new(); + http.http2_only(true); + // increase window size: todo - find optiomal size + let max_window_size = (1 << 31) - 2; + http.http2_initial_stream_window_size(max_window_size); + http.http2_initial_connection_window_size(max_window_size); - while let Some((request, mut respond)) = conn.try_next().await? { - println!("GOT request: {:?}", request); + let service = hyper::service::service_fn(|_req: Request| { + println!("Got request"); + let buffer = vec![65u8; 4 * 1024 * 1024]; // nonsense [A,A,A,A...] + let body = Body::from(buffer); - let response = http::Response::builder() + let response = Response::builder() .status(http::StatusCode::OK) - .body(()) + .header(http::header::CONTENT_TYPE, "application/octet-stream") + .body(body) .unwrap(); + future::ok::<_, Error>(response) + }); - let send = respond.send_response(response, false).unwrap(); - let data = vec![65u8; 1024*1024]; - PipeToSendStream::new(bytes::Bytes::from(data), send).await?; - println!("DATA SENT"); - } + http.serve_connection(socket, service) + .map_err(Error::from) + .await?; + println!("H2 connection CLOSE !"); Ok(()) }