diff --git a/Cargo.toml b/Cargo.toml index 37993a5..11ae64a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,12 +25,13 @@ tokio = { version = "0.2.10", optional = true, default-features = false } default = [ "futures-io", "tokio-io" ] futures-io = [ "futures" ] tokio-io = [ "tokio" ] +tokio-fs = [ "tokio-io", "tokio/fs" ] async-example = [ "failure", "futures-io", "tokio-io", - "tokio/fs", + "tokio-fs", "tokio/rt-threaded", "tokio/io-driver", "tokio/macros", diff --git a/src/encoder.rs b/src/encoder.rs index 9474e71..804aa17 100644 --- a/src/encoder.rs +++ b/src/encoder.rs @@ -17,6 +17,7 @@ use crate::format::{self, GoodbyeItem}; use crate::poll_fn::poll_fn; use crate::Metadata; +pub mod aio; pub mod sync; #[doc(inline)] diff --git a/src/encoder/aio.rs b/src/encoder/aio.rs new file mode 100644 index 0000000..f234cf2 --- /dev/null +++ b/src/encoder/aio.rs @@ -0,0 +1,309 @@ +//! Asynchronous `pxar` format handling. + +use std::io; +use std::path::Path; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::encoder::{self, SeqWrite}; +use crate::Metadata; + +// #[cfg(feature = "futures-io")] +// use crate::decoder::aio::FuturesReader; +// #[cfg(feature = "tokio-io")] +// use crate::decoder::aio::TokioReader; + +/// Asynchronous `pxar` encoder. +/// +/// This is the `async` version of the `pxar` encoder. +#[repr(transparent)] +pub struct Encoder<'a, T: SeqWrite + 'a> { + inner: encoder::EncoderImpl<'a, T>, +} + +#[cfg(feature = "futures-io")] +impl<'a, T: futures::io::AsyncWrite + 'a> Encoder<'a, FuturesWriter> { + /// Encode a `pxar` archive into a `futures::io::AsyncWrite` output. + #[inline] + pub async fn from_futures( + output: T, + metadata: &Metadata, + ) -> io::Result>> { + Encoder::new(FuturesWriter::new(output), metadata).await + } +} + +#[cfg(feature = "tokio-io")] +impl<'a, T: tokio::io::AsyncWrite + 'a> Encoder<'a, TokioWriter> { + /// Encode a `pxar` archive into a `tokio::io::AsyncWrite` output. + #[inline] + pub async fn from_futures( + output: T, + metadata: &Metadata, + ) -> io::Result>> { + Encoder::new(TokioWriter::new(output), metadata).await + } +} + +#[cfg(feature = "tokio-fs")] +impl<'a> Encoder<'a, TokioWriter> { + /// Convenience shortcut for `File::create` followed by `Encoder::from_tokio`. + pub fn create<'b, P: AsRef>( + path: P, + metadata: &'b Metadata, + ) -> io::Result>> { + Encoder::new( + TokioWriter::new(tokio::fs::File::create(path.as_ref()).await?), + metadata, + ).await + } +} + +impl<'a, T: SeqWrite + 'a> Encoder<'a, T> { + /// Create an asynchronous encoder for an output implementing our internal write interface. + pub async fn new(output: T, metadata: &Metadata) -> io::Result> { + Ok(Self { + inner: encoder::EncoderImpl::new(output, metadata).await?, + }) + } + + /// Create a new regular file in the archive. This returns a `File` object to which the + /// contents have to be written out *completely*. Failing to do so will put the encoder into an + /// error state. + pub async fn create_file<'b, P: AsRef>( + &'b mut self, + metadata: &Metadata, + file_name: P, + file_size: u64, + ) -> io::Result> + where + 'a: 'b, + { + Ok(File { + inner: self.inner.create_file( + metadata, + file_name.as_ref(), + file_size, + ).await?, + }) + } + + // /// Convenience shortcut to add a *regular* file by path including its contents to the archive. + // pub async fn add_file( + // &mut self, + // metadata: &Metadata, + // file_name: P, + // file_size: u64, + // content: &mut dyn tokio::io::Read, + // ) -> io::Result<()> + // where + // P: AsRef, + // F: AsAsyncReader, + // { + // self.inner.add_file( + // metadata, + // file_name.as_ref(), + // file_size, + // content.as_async_reader(), + // ).await + // } + + /// Create a new subdirectory. Note that the subdirectory has to be finished by calling the + /// `finish()` method, otherwise the entire archive will be in an error state. + pub async fn create_directory<'b, P: AsRef>( + &'b mut self, + file_name: P, + metadata: &Metadata, + ) -> io::Result> + where + 'a: 'b, + { + Ok(Encoder { + inner: self.inner.create_directory(file_name.as_ref(), metadata).await?, + }) + } + + /// Finish this directory. This is mandatory, otherwise the `Drop` handler will `panic!`. + pub async fn finish(self) -> io::Result<()> { + self.inner.finish().await + } +} + +#[repr(transparent)] +pub struct File<'a> { + inner: encoder::FileImpl<'a>, +} + +#[cfg(feature = "futures-io")] +impl<'a> futures::io::AsyncWrite for File<'a> { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, data: &[u8]) -> Poll> { + unsafe { self.map_unchecked_mut(|this| &mut this.inner) } + .poll_write(cx, data) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + unsafe { self.map_unchecked_mut(|this| &mut this.inner) } + .poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + unsafe { self.map_unchecked_mut(|this| &mut this.inner) } + .poll_close(cx) + } +} + +#[cfg(feature = "tokio-io")] +impl<'a> tokio::io::AsyncWrite for File<'a> { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context, data: &[u8]) -> Poll> { + unsafe { self.map_unchecked_mut(|this| &mut this.inner) } + .poll_write(cx, data) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + unsafe { self.map_unchecked_mut(|this| &mut this.inner) } + .poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + unsafe { self.map_unchecked_mut(|this| &mut this.inner) } + .poll_close(cx) + } +} + +/// Pxar encoder write adapter for `futures::io::AsyncWrite`. +#[cfg(feature = "futures-io")] +mod futures_writer { + use std::io; + use std::pin::Pin; + use std::task::{Context, Poll}; + + use crate::encoder::SeqWrite; + + pub struct FuturesWriter { + inner: Option, + position: u64, + } + + impl FuturesWriter { + pub fn new(inner: T) -> Self { + Self { inner: Some(inner), position: 0 } + } + + fn inner_mut(self: &mut Self) -> io::Result> { + let inner = self + .inner + .as_mut() + .ok_or_else(|| io_format_err!("write after close"))?; + Ok(unsafe { Pin::new_unchecked(inner) }) + } + + fn inner(self: Pin<&mut Self>) -> io::Result> { + unsafe { self.get_unchecked_mut() } + .inner_mut() + } + } + + impl SeqWrite for FuturesWriter { + fn poll_seq_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll> { + let this = unsafe { self.get_unchecked_mut() }; + let got = ready!(this.inner_mut()?.poll_write(cx, buf))?; + this.position += got as u64; + Poll::Ready(Ok(got)) + } + + fn poll_position(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(self.as_ref().position)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.inner()?.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = unsafe { self.get_unchecked_mut() }; + match this.inner.as_mut() { + None => return Poll::Ready(Ok(())), + Some(inner) => { + ready!(unsafe { Pin::new_unchecked(inner).poll_close(cx) })?; + this.inner = None; + Poll::Ready(Ok(())) + } + } + } + } +} + +pub use futures_writer::FuturesWriter; + +/// Pxar encoder write adapter for `tokio::io::AsyncWrite`. +#[cfg(feature = "tokio-io")] +mod tokio_writer { + use std::io; + use std::pin::Pin; + use std::task::{Context, Poll}; + + use crate::encoder::SeqWrite; + + pub struct TokioWriter { + inner: Option, + position: u64, + } + + impl TokioWriter { + pub fn new(inner: T) -> Self { + Self { inner: Some(inner), position: 0 } + } + + fn inner_mut(self: &mut Self) -> io::Result> { + let inner = self + .inner + .as_mut() + .ok_or_else(|| io_format_err!("write after close"))?; + Ok(unsafe { Pin::new_unchecked(inner) }) + } + + fn inner(self: Pin<&mut Self>) -> io::Result> { + unsafe { self.get_unchecked_mut() } + .inner_mut() + } + } + + impl SeqWrite for TokioWriter { + fn poll_seq_write( + self: Pin<&mut Self>, + cx: &mut Context, + buf: &[u8], + ) -> Poll> { + let this = unsafe { self.get_unchecked_mut() }; + let got = ready!(this.inner_mut()?.poll_write(cx, buf))?; + this.position += got as u64; + Poll::Ready(Ok(got)) + } + + fn poll_position(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { + Poll::Ready(Ok(self.as_ref().position)) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + self.inner()?.poll_flush(cx) + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = unsafe { self.get_unchecked_mut() }; + match this.inner.as_mut() { + None => return Poll::Ready(Ok(())), + Some(inner) => { + ready!(unsafe { Pin::new_unchecked(inner).poll_shutdown(cx) })?; + this.inner = None; + Poll::Ready(Ok(())) + } + } + } + } +} + +pub use tokio_writer::TokioWriter; diff --git a/src/encoder/sync.rs b/src/encoder/sync.rs index 7374c06..1c145a7 100644 --- a/src/encoder/sync.rs +++ b/src/encoder/sync.rs @@ -55,8 +55,8 @@ impl<'a, T: SeqWrite + 'a> Encoder<'a, T> { }) } - /// Create a new regular file to the archive. This returns a `File` object to which the - /// contents have to be written out completely. Failing to do so will put the encoder into an + /// Create a new regular file in the archive. This returns a `File` object to which the + /// contents have to be written out *completely*. Failing to do so will put the encoder into an /// error state. pub fn create_file<'b, P: AsRef>( &'b mut self, @@ -178,7 +178,11 @@ impl SeqWrite for StandardWriter { let this = unsafe { self.get_unchecked_mut() }; Poll::Ready(match this.inner.as_mut() { None => Ok(()), - Some(inner) => inner.flush(), + Some(inner) => { + inner.flush()?; + this.inner = None; + Ok(()) + } }) } }