proxmox-compression: make ZstdEncoder stream a bit more generic

by not requiring the 'anyhow::Error' type from the source, but only
that it implements 'Into<Error>'. This way, we can also accept a stream
that produces e.g. an io::Error

Signed-off-by: Dominik Csapak <d.csapak@proxmox.com>
This commit is contained in:
Dominik Csapak 2022-05-31 13:17:20 +02:00 committed by Wolfgang Bumiller
parent b5accff750
commit 38db37dc5f

View File

@ -32,10 +32,11 @@ pub struct ZstdEncoder<'a, T> {
state: EncoderState,
}
impl<'a, T, O> ZstdEncoder<'a, T>
impl<'a, T, O, E> ZstdEncoder<'a, T>
where
T: Stream<Item = Result<O, Error>> + Unpin,
T: Stream<Item = Result<O, E>> + Unpin,
O: Into<Bytes>,
E: Into<Error>,
{
/// Returns a new [ZstdEncoder] with default level 3
pub fn new(inner: T) -> Result<Self, io::Error> {
@ -79,10 +80,11 @@ impl<'a, T> ZstdEncoder<'a, T> {
}
}
impl<'a, T, O> Stream for ZstdEncoder<'a, T>
impl<'a, T, O, E> Stream for ZstdEncoder<'a, T>
where
T: Stream<Item = Result<O, Error>> + Unpin,
T: Stream<Item = Result<O, E>> + Unpin,
O: Into<Bytes>,
E: Into<Error>,
{
type Item = Result<Bytes, Error>;
@ -93,7 +95,10 @@ where
match this.state {
EncoderState::Reading => {
if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) {
let buf = res?;
let buf = match res {
Ok(buf) => buf,
Err(err) => return Poll::Ready(Some(Err(err.into()))),
};
this.input_buffer = buf.into();
this.state = EncoderState::Writing;
} else {