proxmox_async: rustfmt

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2022-02-02 12:55:01 +01:00
parent 807a70cecc
commit de891b1f76
7 changed files with 86 additions and 83 deletions

View File

@ -1,6 +1,6 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use std::sync::mpsc::Receiver;
use std::task::{Context, Poll};
use futures::stream::Stream;
@ -15,7 +15,7 @@ impl<T> Stream for StdChannelStream<T> {
fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
match block_in_place(|| self.0.recv()) {
Ok(data) => Poll::Ready(Some(data)),
Err(_) => Poll::Ready(None),// channel closed
Err(_) => Poll::Ready(None), // channel closed
}
}
}

View File

@ -12,11 +12,12 @@ pub struct WrappedReaderStream<R: Read + Unpin> {
buffer: Vec<u8>,
}
impl <R: Read + Unpin> WrappedReaderStream<R> {
impl<R: Read + Unpin> WrappedReaderStream<R> {
pub fn new(reader: R) -> Self {
let mut buffer = Vec::with_capacity(64*1024);
unsafe { buffer.set_len(buffer.capacity()); }
let mut buffer = Vec::with_capacity(64 * 1024);
unsafe {
buffer.set_len(buffer.capacity());
}
Self { reader, buffer }
}
}
@ -49,9 +50,7 @@ mod test {
#[test]
fn test_wrapped_stream_reader() -> Result<(), Error> {
crate::runtime::main(async {
run_wrapped_stream_reader_test().await
})
crate::runtime::main(async { run_wrapped_stream_reader_test().await })
}
struct DummyReader(usize);

View File

@ -15,8 +15,7 @@ pub struct BroadcastData<T> {
listeners: Vec<oneshot::Sender<Result<T, Error>>>,
}
impl <T: Clone> BroadcastData<T> {
impl<T: Clone> BroadcastData<T> {
pub fn new() -> Self {
Self {
result: None,
@ -25,16 +24,19 @@ impl <T: Clone> BroadcastData<T> {
}
pub fn notify_listeners(&mut self, result: Result<T, String>) {
self.result = Some(result.clone());
loop {
match self.listeners.pop() {
None => { break; },
Some(ch) => {
match &result {
Ok(result) => { let _ = ch.send(Ok(result.clone())); },
Err(err) => { let _ = ch.send(Err(format_err!("{}", err))); },
None => {
break;
}
Some(ch) => match &result {
Ok(result) => {
let _ = ch.send(Ok(result.clone()));
}
Err(err) => {
let _ = ch.send(Err(format_err!("{}", err)));
}
},
}
@ -45,7 +47,7 @@ impl <T: Clone> BroadcastData<T> {
use futures::future::{ok, Either};
match &self.result {
None => {},
None => {}
Some(Ok(result)) => return Either::Left(ok(result.clone())),
Some(Err(err)) => return Either::Left(futures::future::err(format_err!("{}", err))),
}
@ -54,13 +56,11 @@ impl <T: Clone> BroadcastData<T> {
self.listeners.push(tx);
Either::Right(rx
.map(|res| match res {
Ok(Ok(t)) => Ok(t),
Ok(Err(e)) => Err(e),
Err(e) => Err(Error::from(e)),
})
)
Either::Right(rx.map(|res| match res {
Ok(Ok(t)) => Ok(t),
Ok(Err(e)) => Err(e),
Err(e) => Err(Error::from(e)),
}))
}
}
@ -85,40 +85,35 @@ impl<T: Clone + Send + 'static> BroadcastFuture<T> {
broadcast: BroadcastData::new(),
future: Some(Pin::from(source)),
};
Self { inner: Arc::new(Mutex::new(inner)) }
Self {
inner: Arc::new(Mutex::new(inner)),
}
}
/// Creates a new instance with a oneshot channel as trigger
pub fn new_oneshot() -> (Self, oneshot::Sender<Result<T, Error>>) {
let (tx, rx) = oneshot::channel::<Result<T, Error>>();
let rx = rx
.map_err(Error::from)
.and_then(futures::future::ready);
let rx = rx.map_err(Error::from).and_then(futures::future::ready);
(Self::new(Box::new(rx)), tx)
}
fn notify_listeners(
inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
result: Result<T, String>,
) {
fn notify_listeners(inner: Arc<Mutex<BroadCastFutureBinding<T>>>, result: Result<T, String>) {
let mut data = inner.lock().unwrap();
data.broadcast.notify_listeners(result);
}
fn spawn(inner: Arc<Mutex<BroadCastFutureBinding<T>>>) -> impl Future<Output = Result<T, Error>> {
fn spawn(
inner: Arc<Mutex<BroadCastFutureBinding<T>>>,
) -> impl Future<Output = Result<T, Error>> {
let mut data = inner.lock().unwrap();
if let Some(source) = data.future.take() {
let inner1 = inner.clone();
let task = source.map(move |value| {
match value {
Ok(value) => Self::notify_listeners(inner1, Ok(value)),
Err(err) => Self::notify_listeners(inner1, Err(err.to_string())),
}
let task = source.map(move |value| match value {
Ok(value) => Self::notify_listeners(inner1, Ok(value)),
Err(err) => Self::notify_listeners(inner1, Err(err.to_string())),
});
tokio::spawn(task);
}
@ -137,22 +132,28 @@ impl<T: Clone + Send + 'static> BroadcastFuture<T> {
fn test_broadcast_future() {
use std::sync::atomic::{AtomicUsize, Ordering};
static CHECKSUM: AtomicUsize = AtomicUsize::new(0);
static CHECKSUM: AtomicUsize = AtomicUsize::new(0);
let (sender, trigger) = BroadcastFuture::new_oneshot();
let receiver1 = sender.listen()
let receiver1 = sender
.listen()
.map_ok(|res| {
CHECKSUM.fetch_add(res, Ordering::SeqCst);
})
.map_err(|err| { panic!("got error {}", err); })
.map_err(|err| {
panic!("got error {}", err);
})
.map(|_| ());
let receiver2 = sender.listen()
let receiver2 = sender
.listen()
.map_ok(|res| {
CHECKSUM.fetch_add(res*2, Ordering::SeqCst);
CHECKSUM.fetch_add(res * 2, Ordering::SeqCst);
})
.map_err(|err| {
panic!("got error {}", err);
})
.map_err(|err| { panic!("got error {}", err); })
.map(|_| ());
let rt = tokio::runtime::Runtime::new().unwrap();
@ -170,12 +171,17 @@ fn test_broadcast_future() {
assert_eq!(result, 3);
// the result stays available until the BroadcastFuture is dropped
rt.block_on(sender.listen()
.map_ok(|res| {
CHECKSUM.fetch_add(res*4, Ordering::SeqCst);
})
.map_err(|err| { panic!("got error {}", err); })
.map(|_| ()));
rt.block_on(
sender
.listen()
.map_ok(|res| {
CHECKSUM.fetch_add(res * 4, Ordering::SeqCst);
})
.map_err(|err| {
panic!("got error {}", err);
})
.map(|_| ()),
);
let result = CHECKSUM.load(Ordering::SeqCst);
assert_eq!(result, 7);

View File

@ -9,8 +9,8 @@ use futures::ready;
use futures::stream::Stream;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use proxmox_sys::io_format_err;
use proxmox_io::ByteBuffer;
use proxmox_sys::io_format_err;
const BUFFER_SIZE: usize = 8192;
@ -98,7 +98,8 @@ impl DeflateEncoder<Vec<u8>> {
let mut buffer = Vec::with_capacity(size_hint);
reader.read_to_end(&mut buffer).await?;
self.inner.reserve(size_hint); // should be enough since we want smalller files
self.compressor.compress_vec(&buffer[..], &mut self.inner, FlushCompress::Finish)?;
self.compressor
.compress_vec(&buffer[..], &mut self.inner, FlushCompress::Finish)?;
Ok(())
}
}
@ -144,7 +145,7 @@ impl<T: AsyncWrite + Unpin> DeflateEncoder<T> {
impl<T, O> Stream for DeflateEncoder<T>
where
T: Stream<Item = Result<O, io::Error>> + Unpin,
O: Into<Bytes>
O: Into<Bytes>,
{
type Item = Result<Bytes, io::Error>;

View File

@ -1,19 +1,19 @@
//! Wrappers between async readers and streams.
use std::io;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use anyhow::{Error, Result};
use tokio::io::{AsyncWrite};
use tokio::sync::mpsc::Sender;
use futures::ready;
use futures::future::FutureExt;
use futures::ready;
use tokio::io::AsyncWrite;
use tokio::sync::mpsc::Sender;
use proxmox_sys::io_format_err;
use proxmox_sys::error::io_err_other;
use proxmox_io::ByteBuffer;
use proxmox_sys::error::io_err_other;
use proxmox_sys::io_format_err;
/// Wrapper around tokio::sync::mpsc::Sender, which implements Write
pub struct AsyncChannelWriter {

View File

@ -4,9 +4,9 @@ use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};
use futures::ready;
use futures::stream::Stream;
use tokio::io::{AsyncRead, ReadBuf};
/// Wrapper struct to convert an [AsyncRead] into a [Stream]
pub struct AsyncReaderStream<R: AsyncRead + Unpin> {
@ -14,17 +14,20 @@ pub struct AsyncReaderStream<R: AsyncRead + Unpin> {
buffer: Vec<u8>,
}
impl <R: AsyncRead + Unpin> AsyncReaderStream<R> {
impl<R: AsyncRead + Unpin> AsyncReaderStream<R> {
pub fn new(reader: R) -> Self {
let mut buffer = Vec::with_capacity(64*1024);
unsafe { buffer.set_len(buffer.capacity()); }
let mut buffer = Vec::with_capacity(64 * 1024);
unsafe {
buffer.set_len(buffer.capacity());
}
Self { reader, buffer }
}
pub fn with_buffer_size(reader: R, buffer_size: usize) -> Self {
let mut buffer = Vec::with_capacity(buffer_size);
unsafe { buffer.set_len(buffer.capacity()); }
unsafe {
buffer.set_len(buffer.capacity());
}
Self { reader, buffer }
}
}

View File

@ -625,8 +625,8 @@ pub async fn zip_directory<W>(target: W, source: &Path) -> Result<(), Error>
where
W: AsyncWrite + Unpin + Send,
{
use walkdir::WalkDir;
use std::os::unix::fs::MetadataExt;
use walkdir::WalkDir;
let base_path = source.parent().unwrap_or_else(|| Path::new("/"));
let mut encoder = ZipEncoder::new(target);
@ -640,28 +640,22 @@ where
if let Err(err) = async move {
let entry_path_no_base = entry.path().strip_prefix(base_path)?;
let metadata = entry.metadata()?;
let mtime = match metadata.modified().unwrap_or_else(|_| SystemTime::now()).duration_since(SystemTime::UNIX_EPOCH) {
let mtime = match metadata
.modified()
.unwrap_or_else(|_| SystemTime::now())
.duration_since(SystemTime::UNIX_EPOCH)
{
Ok(dur) => dur.as_secs() as i64,
Err(time_error) => -(time_error.duration().as_secs() as i64)
Err(time_error) => -(time_error.duration().as_secs() as i64),
};
let mode = metadata.mode() as u16;
if entry.file_type().is_file() {
let file = tokio::fs::File::open(entry.path()).await?;
let ze = ZipEntry::new(
&entry_path_no_base,
mtime,
mode,
true,
);
let ze = ZipEntry::new(&entry_path_no_base, mtime, mode, true);
encoder.add_entry(ze, Some(file)).await?;
} else if entry.file_type().is_dir() {
let ze = ZipEntry::new(
&entry_path_no_base,
mtime,
mode,
false,
);
let ze = ZipEntry::new(&entry_path_no_base, mtime, mode, false);
let content: Option<tokio::fs::File> = None;
encoder.add_entry(ze, content).await?;
}