From ffb6434485158b216728a7fa8529ff4b36ed345a Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Tue, 27 Aug 2019 13:55:41 +0200 Subject: [PATCH] src/api2/reader.rs: switch to async Signed-off-by: Wolfgang Bumiller --- src/api2/reader.rs | 57 ++++++++++++++++++++++++++-------------------- 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/src/api2/reader.rs b/src/api2/reader.rs index 46d5152e0..99a245cf7 100644 --- a/src/api2/reader.rs +++ b/src/api2/reader.rs @@ -93,30 +93,37 @@ fn upgrade_to_backup_reader_protocol( let abort_future = worker.abort_future(); - let env3 = env.clone(); - - req_body + let req_fut = req_body .on_upgrade() .map_err(Error::from) - .and_then(move |conn| { - env3.debug("protocol upgrade done"); + .and_then({ + let env = env.clone(); + move |conn| { + env.debug("protocol upgrade done"); - let mut http = hyper::server::conn::Http::new(); - http.http2_only(true); - // increase window size: todo - find optiomal size - let window_size = 32*1024*1024; // max = (1 << 31) - 2 - http.http2_initial_stream_window_size(window_size); - http.http2_initial_connection_window_size(window_size); + let mut http = hyper::server::conn::Http::new(); + http.http2_only(true); + // increase window size: todo - find optiomal size + let window_size = 32*1024*1024; // max = (1 << 31) - 2 + http.http2_initial_stream_window_size(window_size); + http.http2_initial_connection_window_size(window_size); - http.serve_connection(conn, service) - .map_err(Error::from) - }) - .select(abort_future.map_err(|_| {}).then(move |_| { bail!("task aborted"); })) - .map_err(|(err, _)| err) - .and_then(move |(_result, _)| { - env.log("reader finished sucessfully"); - Ok(()) + http.serve_connection(conn, service) + .map_err(Error::from) + } + }); + let abort_future = abort_future + .map(|_| Err(format_err!("task aborted"))); + + use futures::future::Either; + futures::future::select(req_fut, abort_future) + .map(|res| match res { + Either::Left((Ok(res), _)) => Ok(res), + Either::Left((Err(err), _)) => Err(err), + Either::Right((Ok(res), _)) => Ok(res), + Either::Right((Err(err), _)) => Err(err), }) + .map_ok(move |_| env.log("reader finished sucessfully")) })?; let response = Response::builder() @@ -182,13 +189,13 @@ fn download_file( .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err))) .and_then(move |file| { env2.log(format!("download {:?}", path3)); - let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()). - map(|bytes| hyper::Chunk::from(bytes.freeze())); + let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()) + .map_ok(|bytes| hyper::Chunk::from(bytes.freeze())); let body = Body::wrap_stream(payload); // fixme: set other headers ? - Ok(Response::builder() + futures::future::ok(Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, "application/octet-stream") .body(body) @@ -229,13 +236,13 @@ fn download_chunk( .map_err(move |err| http_err!(BAD_REQUEST, format!("open file {:?} failed: {}", path2, err))) .and_then(move |file| { env2.debug(format!("download chunk {:?}", path3)); - let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()). - map(|bytes| hyper::Chunk::from(bytes.freeze())); + let payload = tokio::codec::FramedRead::new(file, tokio::codec::BytesCodec::new()) + .map_ok(|bytes| hyper::Chunk::from(bytes.freeze())); let body = Body::wrap_stream(payload); // fixme: set other headers ? - Ok(Response::builder() + futures::future::ok(Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, "application/octet-stream") .body(body)