diff --git a/proxmox-compression/src/deflate/decompression.rs b/proxmox-compression/src/deflate/decompression.rs new file mode 100644 index 00000000..45ed8579 --- /dev/null +++ b/proxmox-compression/src/deflate/decompression.rs @@ -0,0 +1,141 @@ +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use anyhow::Error; +use bytes::Bytes; +use flate2::{Decompress, FlushDecompress}; +use futures::ready; +use futures::stream::Stream; + +use proxmox_io::ByteBuffer; + +#[derive(Eq, PartialEq)] +enum DecoderState { + Reading, + Writing, + Flushing, + Finished, +} + +pub struct DeflateDecoder { + inner: T, + decompressor: Decompress, + buffer: ByteBuffer, + input_buffer: Bytes, + state: DecoderState, +} + +pub struct DeflateDecoderBuilder { + inner: T, + is_zlib: bool, + buffer_size: usize, +} + +impl DeflateDecoderBuilder { + pub fn zlib(mut self, is_zlib: bool) -> Self { + self.is_zlib = is_zlib; + self + } + + pub fn buffer_size(mut self, buffer_size: usize) -> Self { + self.buffer_size = buffer_size; + self + } + + pub fn build(self) -> DeflateDecoder { + DeflateDecoder { + inner: self.inner, + decompressor: Decompress::new(self.is_zlib), + buffer: ByteBuffer::with_capacity(self.buffer_size), + input_buffer: Bytes::new(), + state: DecoderState::Reading, + } + } +} + +impl DeflateDecoder { + pub fn new(inner: T) -> Self { + Self::builder(inner).build() + } + + pub fn builder(inner: T) -> DeflateDecoderBuilder { + DeflateDecoderBuilder { + inner, + is_zlib: false, + buffer_size: super::BUFFER_SIZE, + } + } + + fn decode( + &mut self, + inbuf: &[u8], + flush: FlushDecompress, + ) -> Result<(usize, flate2::Status), io::Error> { + let old_in = self.decompressor.total_in(); + let old_out = self.decompressor.total_out(); + let res = self + .decompressor + .decompress(inbuf, self.buffer.get_free_mut_slice(), flush)?; + let new_in = (self.decompressor.total_in() - old_in) as usize; + let new_out = (self.decompressor.total_out() - old_out) as usize; + self.buffer.add_size(new_out); + + Ok((new_in, res)) + } +} + +impl Stream for DeflateDecoder +where + T: Stream> + Unpin, + O: Into, + E: Into, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + + loop { + match this.state { + DecoderState::Reading => { + if let Some(res) = ready!(Pin::new(&mut this.inner).poll_next(cx)) { + let buf = res.map_err(Into::into)?; + this.input_buffer = buf.into(); + this.state = DecoderState::Writing; + } else { + this.state = DecoderState::Flushing; + } + } + DecoderState::Writing => { + if this.input_buffer.is_empty() { + return Poll::Ready(Some(Err(anyhow::format_err!( + "empty input during write" + )))); + } + let mut buf = this.input_buffer.split_off(0); + let (read, res) = this.decode(&buf[..], FlushDecompress::None)?; + this.input_buffer = buf.split_off(read); + if this.input_buffer.is_empty() { + this.state = DecoderState::Reading; + } + if this.buffer.is_full() || res == flate2::Status::BufError { + let bytes = this.buffer.remove_data(this.buffer.len()).to_vec(); + return Poll::Ready(Some(Ok(bytes.into()))); + } + } + DecoderState::Flushing => { + let (_read, res) = this.decode(&[][..], FlushDecompress::Finish)?; + if !this.buffer.is_empty() { + let bytes = this.buffer.remove_data(this.buffer.len()).to_vec(); + return Poll::Ready(Some(Ok(bytes.into()))); + } + if res == flate2::Status::StreamEnd { + this.state = DecoderState::Finished; + } + } + DecoderState::Finished => return Poll::Ready(None), + } + } + } +} diff --git a/proxmox-compression/src/deflate/mod.rs b/proxmox-compression/src/deflate/mod.rs index 514ccbdc..6867176c 100644 --- a/proxmox-compression/src/deflate/mod.rs +++ b/proxmox-compression/src/deflate/mod.rs @@ -1,5 +1,7 @@ mod compression; +mod decompression; pub use compression::{DeflateEncoder, Level}; +pub use decompression::DeflateDecoder; const BUFFER_SIZE: usize = 8192; diff --git a/proxmox-compression/src/lib.rs b/proxmox-compression/src/lib.rs index 4be643c4..817d2af5 100644 --- a/proxmox-compression/src/lib.rs +++ b/proxmox-compression/src/lib.rs @@ -1,6 +1,6 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -pub use deflate::{DeflateEncoder, Level}; +pub use deflate::{DeflateDecoder, DeflateEncoder, Level}; mod deflate; pub mod tar;