compression: add flush_window to DeflateEncoder

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2024-09-03 12:54:59 +02:00
parent b22de1864a
commit 78c29b33ef

View File

@ -33,6 +33,9 @@ pub struct DeflateEncoder<T> {
buffer: ByteBuffer,
input_buffer: Bytes,
state: EncoderState,
/// This is the current amount of sent data and the window size used for intermittent flushing
/// of the compression layer.
flush_window: Option<(usize, usize)>,
}
pub struct DeflateEncoderBuilder<T> {
@ -40,24 +43,32 @@ pub struct DeflateEncoderBuilder<T> {
is_zlib: bool,
buffer_size: usize,
level: Level,
flush_window: Option<usize>,
}
impl<T> DeflateEncoderBuilder<T> {
pub fn zlib(mut self, is_zlib: bool) -> Self {
pub const fn zlib(mut self, is_zlib: bool) -> Self {
self.is_zlib = is_zlib;
self
}
pub fn level(mut self, level: Level) -> Self {
pub const fn level(mut self, level: Level) -> Self {
self.level = level;
self
}
pub fn buffer_size(mut self, buffer_size: usize) -> Self {
pub const fn buffer_size(mut self, buffer_size: usize) -> Self {
self.buffer_size = buffer_size;
self
}
/// If set, this is the number of bytes after which the compressor will be notified to flush
/// some data, so the compression produces more steady output and a little earlier.
pub const fn flush_window(mut self, flush_window: Option<usize>) -> Self {
self.flush_window = flush_window;
self
}
pub fn build(self) -> DeflateEncoder<T> {
let level = match self.level {
Level::Fastest => Compression::fast(),
@ -72,6 +83,7 @@ impl<T> DeflateEncoderBuilder<T> {
buffer: ByteBuffer::with_capacity(self.buffer_size),
input_buffer: Bytes::new(),
state: EncoderState::Reading,
flush_window: self.flush_window.map(|n| (0, n)),
}
}
}
@ -87,6 +99,7 @@ impl<T> DeflateEncoder<T> {
is_zlib: false,
buffer_size: super::BUFFER_SIZE,
level: Level::Default,
flush_window: None,
}
}
@ -202,9 +215,25 @@ where
if this.input_buffer.is_empty() {
return Poll::Ready(Some(Err(io_format_err!("empty input during write"))));
}
// in stream mode we'd like to flush regularly as the library's inner buffer is
// quite huge...
let flush_mode = match &mut this.flush_window {
Some((at, window)) if *at >= *window => {
*at = 0;
FlushCompress::Sync
}
_ => FlushCompress::None,
};
let mut buf = this.input_buffer.split_off(0);
let (read, res) = this.encode(&buf[..], FlushCompress::None)?;
let (read, res) = this.encode(&buf[..], flush_mode)?;
this.input_buffer = buf.split_off(read);
if let Some((at, _window)) = &mut this.flush_window {
*at = at.saturating_add(read);
}
if this.input_buffer.is_empty() {
this.state = EncoderState::Reading;
}