proxmox-async: add SenderWriter helper

this wraps around a tokio Sender for Vec<u8>, but implements a blocking
write. We can use thas as an adapter for something that only takes a
writer, and can read from it asynchonously

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Dominik Csapak 2022-04-08 11:56:00 +02:00 committed by Wolfgang Bumiller
parent 4cdeee64dc
commit 9471ba9969
2 changed files with 50 additions and 0 deletions

View File

@ -9,3 +9,6 @@ pub use tokio_writer_adapter::TokioWriterAdapter;
mod wrapped_reader_stream;
pub use wrapped_reader_stream::WrappedReaderStream;
mod sender_writer;
pub use sender_writer::SenderWriter;

View File

@ -0,0 +1,47 @@
use std::io;
use anyhow::Error;
use tokio::sync::mpsc::Sender;
/// Wrapper struct around [`tokio::sync::mpsc::Sender`] for `Result<Vec<u8>, Error>` that implements [`std::io::Write`]
pub struct SenderWriter {
sender: Sender<Result<Vec<u8>, Error>>,
}
impl SenderWriter {
pub fn from_sender(sender: tokio::sync::mpsc::Sender<Result<Vec<u8>, Error>>) -> Self {
Self { sender }
}
fn write_impl(&mut self, buf: &[u8]) -> io::Result<usize> {
if let Err(err) = self.sender.blocking_send(Ok(buf.to_vec())) {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!("could not send: {}", err),
));
}
Ok(buf.len())
}
fn flush_impl(&mut self) -> io::Result<()> {
Ok(())
}
}
impl io::Write for SenderWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.write_impl(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.flush_impl()
}
}
impl Drop for SenderWriter {
fn drop(&mut self) {
// ignore errors
let _ = self.flush_impl();
}
}