5
0
mirror of git://git.proxmox.com/git/pxar.git synced 2024-12-22 21:33:50 +03:00

add aio Encoder

Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
Wolfgang Bumiller 2020-03-18 16:10:28 +01:00
parent 05356884d6
commit 5410984044
4 changed files with 319 additions and 4 deletions

View File

@ -25,12 +25,13 @@ tokio = { version = "0.2.10", optional = true, default-features = false }
default = [ "futures-io", "tokio-io" ]
futures-io = [ "futures" ]
tokio-io = [ "tokio" ]
tokio-fs = [ "tokio-io", "tokio/fs" ]
async-example = [
"failure",
"futures-io",
"tokio-io",
"tokio/fs",
"tokio-fs",
"tokio/rt-threaded",
"tokio/io-driver",
"tokio/macros",

View File

@ -17,6 +17,7 @@ use crate::format::{self, GoodbyeItem};
use crate::poll_fn::poll_fn;
use crate::Metadata;
pub mod aio;
pub mod sync;
#[doc(inline)]

309
src/encoder/aio.rs Normal file
View File

@ -0,0 +1,309 @@
//! Asynchronous `pxar` format handling.
use std::io;
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::encoder::{self, SeqWrite};
use crate::Metadata;
// #[cfg(feature = "futures-io")]
// use crate::decoder::aio::FuturesReader;
// #[cfg(feature = "tokio-io")]
// use crate::decoder::aio::TokioReader;
/// Asynchronous `pxar` encoder.
///
/// This is the `async` version of the `pxar` encoder.
#[repr(transparent)]
pub struct Encoder<'a, T: SeqWrite + 'a> {
inner: encoder::EncoderImpl<'a, T>,
}
#[cfg(feature = "futures-io")]
impl<'a, T: futures::io::AsyncWrite + 'a> Encoder<'a, FuturesWriter<T>> {
/// Encode a `pxar` archive into a `futures::io::AsyncWrite` output.
#[inline]
pub async fn from_futures(
output: T,
metadata: &Metadata,
) -> io::Result<Encoder<'a, FuturesWriter<T>>> {
Encoder::new(FuturesWriter::new(output), metadata).await
}
}
#[cfg(feature = "tokio-io")]
impl<'a, T: tokio::io::AsyncWrite + 'a> Encoder<'a, TokioWriter<T>> {
/// Encode a `pxar` archive into a `tokio::io::AsyncWrite` output.
#[inline]
pub async fn from_futures(
output: T,
metadata: &Metadata,
) -> io::Result<Encoder<'a, TokioWriter<T>>> {
Encoder::new(TokioWriter::new(output), metadata).await
}
}
#[cfg(feature = "tokio-fs")]
impl<'a> Encoder<'a, TokioWriter<tokio::fs::File>> {
/// Convenience shortcut for `File::create` followed by `Encoder::from_tokio`.
pub fn create<'b, P: AsRef<Path>>(
path: P,
metadata: &'b Metadata,
) -> io::Result<Encoder<'a, TokioWriter<tokio::fs::File>>> {
Encoder::new(
TokioWriter::new(tokio::fs::File::create(path.as_ref()).await?),
metadata,
).await
}
}
impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
/// Create an asynchronous encoder for an output implementing our internal write interface.
pub async fn new(output: T, metadata: &Metadata) -> io::Result<Encoder<'a, T>> {
Ok(Self {
inner: encoder::EncoderImpl::new(output, metadata).await?,
})
}
/// Create a new regular file in the archive. This returns a `File` object to which the
/// contents have to be written out *completely*. Failing to do so will put the encoder into an
/// error state.
pub async fn create_file<'b, P: AsRef<Path>>(
&'b mut self,
metadata: &Metadata,
file_name: P,
file_size: u64,
) -> io::Result<File<'b>>
where
'a: 'b,
{
Ok(File {
inner: self.inner.create_file(
metadata,
file_name.as_ref(),
file_size,
).await?,
})
}
// /// Convenience shortcut to add a *regular* file by path including its contents to the archive.
// pub async fn add_file<P, F>(
// &mut self,
// metadata: &Metadata,
// file_name: P,
// file_size: u64,
// content: &mut dyn tokio::io::Read,
// ) -> io::Result<()>
// where
// P: AsRef<Path>,
// F: AsAsyncReader,
// {
// self.inner.add_file(
// metadata,
// file_name.as_ref(),
// file_size,
// content.as_async_reader(),
// ).await
// }
/// Create a new subdirectory. Note that the subdirectory has to be finished by calling the
/// `finish()` method, otherwise the entire archive will be in an error state.
pub async fn create_directory<'b, P: AsRef<Path>>(
&'b mut self,
file_name: P,
metadata: &Metadata,
) -> io::Result<Encoder<'b, &'b mut dyn SeqWrite>>
where
'a: 'b,
{
Ok(Encoder {
inner: self.inner.create_directory(file_name.as_ref(), metadata).await?,
})
}
/// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`.
pub async fn finish(self) -> io::Result<()> {
self.inner.finish().await
}
}
#[repr(transparent)]
pub struct File<'a> {
inner: encoder::FileImpl<'a>,
}
#[cfg(feature = "futures-io")]
impl<'a> futures::io::AsyncWrite for File<'a> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, data: &[u8]) -> Poll<io::Result<usize>> {
unsafe { self.map_unchecked_mut(|this| &mut this.inner) }
.poll_write(cx, data)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
unsafe { self.map_unchecked_mut(|this| &mut this.inner) }
.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
unsafe { self.map_unchecked_mut(|this| &mut this.inner) }
.poll_close(cx)
}
}
#[cfg(feature = "tokio-io")]
impl<'a> tokio::io::AsyncWrite for File<'a> {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context, data: &[u8]) -> Poll<io::Result<usize>> {
unsafe { self.map_unchecked_mut(|this| &mut this.inner) }
.poll_write(cx, data)
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
unsafe { self.map_unchecked_mut(|this| &mut this.inner) }
.poll_flush(cx)
}
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
unsafe { self.map_unchecked_mut(|this| &mut this.inner) }
.poll_close(cx)
}
}
/// Pxar encoder write adapter for `futures::io::AsyncWrite`.
#[cfg(feature = "futures-io")]
mod futures_writer {
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::encoder::SeqWrite;
pub struct FuturesWriter<T> {
inner: Option<T>,
position: u64,
}
impl<T: futures::io::AsyncWrite> FuturesWriter<T> {
pub fn new(inner: T) -> Self {
Self { inner: Some(inner), position: 0 }
}
fn inner_mut(self: &mut Self) -> io::Result<Pin<&mut T>> {
let inner = self
.inner
.as_mut()
.ok_or_else(|| io_format_err!("write after close"))?;
Ok(unsafe { Pin::new_unchecked(inner) })
}
fn inner(self: Pin<&mut Self>) -> io::Result<Pin<&mut T>> {
unsafe { self.get_unchecked_mut() }
.inner_mut()
}
}
impl<T: futures::io::AsyncWrite> SeqWrite for FuturesWriter<T> {
fn poll_seq_write(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let this = unsafe { self.get_unchecked_mut() };
let got = ready!(this.inner_mut()?.poll_write(cx, buf))?;
this.position += got as u64;
Poll::Ready(Ok(got))
}
fn poll_position(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<u64>> {
Poll::Ready(Ok(self.as_ref().position))
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
self.inner()?.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
let this = unsafe { self.get_unchecked_mut() };
match this.inner.as_mut() {
None => return Poll::Ready(Ok(())),
Some(inner) => {
ready!(unsafe { Pin::new_unchecked(inner).poll_close(cx) })?;
this.inner = None;
Poll::Ready(Ok(()))
}
}
}
}
}
pub use futures_writer::FuturesWriter;
/// Pxar encoder write adapter for `tokio::io::AsyncWrite`.
#[cfg(feature = "tokio-io")]
mod tokio_writer {
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::encoder::SeqWrite;
pub struct TokioWriter<T> {
inner: Option<T>,
position: u64,
}
impl<T: tokio::io::AsyncWrite> TokioWriter<T> {
pub fn new(inner: T) -> Self {
Self { inner: Some(inner), position: 0 }
}
fn inner_mut(self: &mut Self) -> io::Result<Pin<&mut T>> {
let inner = self
.inner
.as_mut()
.ok_or_else(|| io_format_err!("write after close"))?;
Ok(unsafe { Pin::new_unchecked(inner) })
}
fn inner(self: Pin<&mut Self>) -> io::Result<Pin<&mut T>> {
unsafe { self.get_unchecked_mut() }
.inner_mut()
}
}
impl<T: tokio::io::AsyncWrite> SeqWrite for TokioWriter<T> {
fn poll_seq_write(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let this = unsafe { self.get_unchecked_mut() };
let got = ready!(this.inner_mut()?.poll_write(cx, buf))?;
this.position += got as u64;
Poll::Ready(Ok(got))
}
fn poll_position(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<u64>> {
Poll::Ready(Ok(self.as_ref().position))
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
self.inner()?.poll_flush(cx)
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
let this = unsafe { self.get_unchecked_mut() };
match this.inner.as_mut() {
None => return Poll::Ready(Ok(())),
Some(inner) => {
ready!(unsafe { Pin::new_unchecked(inner).poll_shutdown(cx) })?;
this.inner = None;
Poll::Ready(Ok(()))
}
}
}
}
}
pub use tokio_writer::TokioWriter;

View File

@ -55,8 +55,8 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> {
})
}
/// Create a new regular file to the archive. This returns a `File` object to which the
/// contents have to be written out completely. Failing to do so will put the encoder into an
/// Create a new regular file in the archive. This returns a `File` object to which the
/// contents have to be written out *completely*. Failing to do so will put the encoder into an
/// error state.
pub fn create_file<'b, P: AsRef<Path>>(
&'b mut self,
@ -178,7 +178,11 @@ impl<T: io::Write> SeqWrite for StandardWriter<T> {
let this = unsafe { self.get_unchecked_mut() };
Poll::Ready(match this.inner.as_mut() {
None => Ok(()),
Some(inner) => inner.flush(),
Some(inner) => {
inner.flush()?;
this.inner = None;
Ok(())
}
})
}
}