5
0
mirror of git://git.proxmox.com/git/pxar.git synced 2025-01-19 18:03:35 +03:00

decoder/accessor: allow for split input stream variant

When a pxar archive was encoded using the split stream output
variant, access to the payload of regular files has to be redirected
to the corresponding dedicated input.

Allow to pass the split input variant to the decoder and accessor
instances to handle the split streams accordingly and decode split
stream archives.

Signed-off-by: Christian Ebner <c.ebner@proxmox.com>
This commit is contained in:
Christian Ebner 2024-03-12 15:06:44 +01:00
parent 99d4f646be
commit 3ee98e1072
10 changed files with 206 additions and 78 deletions

View File

@ -9,7 +9,7 @@ async fn main() {
.await .await
.expect("failed to open file"); .expect("failed to open file");
let mut reader = Decoder::from_tokio(file) let mut reader = Decoder::from_tokio(pxar::PxarVariant::Unified(file))
.await .await
.expect("failed to open pxar archive contents"); .expect("failed to open pxar archive contents");

View File

@ -18,7 +18,7 @@ use crate::accessor::{self, cache::Cache, MaybeReady, ReadAt, ReadAtOperation};
use crate::decoder::aio::Decoder; use crate::decoder::aio::Decoder;
use crate::format::GoodbyeItem; use crate::format::GoodbyeItem;
use crate::util; use crate::util;
use crate::Entry; use crate::{Entry, PxarVariant};
use super::sync::{FileReader, FileRefReader}; use super::sync::{FileReader, FileRefReader};
@ -39,7 +39,7 @@ impl<T: FileExt> Accessor<FileReader<T>> {
/// by a blocking file. /// by a blocking file.
#[inline] #[inline]
pub async fn from_file_and_size(input: T, size: u64) -> io::Result<Self> { pub async fn from_file_and_size(input: T, size: u64) -> io::Result<Self> {
Accessor::new(FileReader::new(input), size).await Accessor::new(PxarVariant::Unified(FileReader::new(input)), size).await
} }
} }
@ -75,7 +75,7 @@ where
input: T, input: T,
size: u64, size: u64,
) -> io::Result<Accessor<FileRefReader<T>>> { ) -> io::Result<Accessor<FileRefReader<T>>> {
Accessor::new(FileRefReader::new(input), size).await Accessor::new(PxarVariant::Unified(FileRefReader::new(input)), size).await
} }
} }
@ -85,7 +85,9 @@ impl<T: ReadAt> Accessor<T> {
/// ///
/// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is
/// not allowed to use the `Waker`, as this will cause a `panic!`. /// not allowed to use the `Waker`, as this will cause a `panic!`.
pub async fn new(input: T, size: u64) -> io::Result<Self> { /// Optionally take the file payloads from the provided input stream rather than the regular
/// pxar stream.
pub async fn new(input: PxarVariant<T, (T, u64)>, size: u64) -> io::Result<Self> {
Ok(Self { Ok(Self {
inner: accessor::AccessorImpl::new(input, size).await?, inner: accessor::AccessorImpl::new(input, size).await?,
}) })

View File

@ -19,7 +19,7 @@ use crate::binary_tree_array;
use crate::decoder::{self, DecoderImpl}; use crate::decoder::{self, DecoderImpl};
use crate::format::{self, GoodbyeItem}; use crate::format::{self, GoodbyeItem};
use crate::util; use crate::util;
use crate::{Entry, EntryKind}; use crate::{Entry, EntryKind, PxarVariant};
pub mod aio; pub mod aio;
pub mod cache; pub mod cache;
@ -179,17 +179,22 @@ struct Caches {
/// The random access state machine implementation. /// The random access state machine implementation.
pub(crate) struct AccessorImpl<T> { pub(crate) struct AccessorImpl<T> {
input: T, input: PxarVariant<T, (T, Range<u64>)>,
size: u64, size: u64,
caches: Arc<Caches>, caches: Arc<Caches>,
} }
impl<T: ReadAt> AccessorImpl<T> { impl<T: ReadAt> AccessorImpl<T> {
pub async fn new(input: T, size: u64) -> io::Result<Self> { pub async fn new(input: PxarVariant<T, (T, u64)>, size: u64) -> io::Result<Self> {
if size < (size_of::<GoodbyeItem>() as u64) { if size < (size_of::<GoodbyeItem>() as u64) {
io_bail!("too small to contain a pxar archive"); io_bail!("too small to contain a pxar archive");
} }
let input = input.wrap_multi(
|input| input,
|(payload_input, size)| (payload_input, 0..size),
);
Ok(Self { Ok(Self {
input, input,
size, size,
@ -202,13 +207,14 @@ impl<T: ReadAt> AccessorImpl<T> {
} }
pub async fn open_root_ref(&self) -> io::Result<DirectoryImpl<&dyn ReadAt>> { pub async fn open_root_ref(&self) -> io::Result<DirectoryImpl<&dyn ReadAt>> {
DirectoryImpl::open_at_end( let input = match &self.input {
&self.input as &dyn ReadAt, PxarVariant::Unified(input) => PxarVariant::Unified(input as &dyn ReadAt),
self.size, PxarVariant::Split(input, (payload_input, range)) => PxarVariant::Split(
"/".into(), input as &dyn ReadAt,
Arc::clone(&self.caches), (payload_input as &dyn ReadAt, range.clone()),
) ),
.await };
DirectoryImpl::open_at_end(input, self.size, "/".into(), Arc::clone(&self.caches)).await
} }
pub fn set_goodbye_table_cache( pub fn set_goodbye_table_cache(
@ -224,21 +230,25 @@ impl<T: ReadAt> AccessorImpl<T> {
} }
async fn get_decoder<T: ReadAt>( async fn get_decoder<T: ReadAt>(
input: T, input: PxarVariant<T, (T, Range<u64>)>,
entry_range: Range<u64>, entry_range: Range<u64>,
path: PathBuf, path: PathBuf,
) -> io::Result<DecoderImpl<SeqReadAtAdapter<T>>> { ) -> io::Result<DecoderImpl<SeqReadAtAdapter<T>>> {
DecoderImpl::new_full(SeqReadAtAdapter::new(input, entry_range), path, true).await let input = input.wrap_multi(
|input| SeqReadAtAdapter::new(input, entry_range.clone()),
|(payload_input, range)| SeqReadAtAdapter::new(payload_input, range),
);
DecoderImpl::new_full(input, path, true).await
} }
// NOTE: This performs the Decoder::read_next_item() behavior! Keep in mind when changing! // NOTE: This performs the Decoder::read_next_item() behavior! Keep in mind when changing!
async fn get_decoder_at_filename<T: ReadAt>( async fn get_decoder_at_filename<T: ReadAt>(
input: T, input: PxarVariant<T, (T, Range<u64>)>,
entry_range: Range<u64>, entry_range: Range<u64>,
path: PathBuf, path: PathBuf,
) -> io::Result<(DecoderImpl<SeqReadAtAdapter<T>>, u64)> { ) -> io::Result<(DecoderImpl<SeqReadAtAdapter<T>>, u64)> {
// Read the header, it should be a FILENAME, then skip over it and its length: // Read the header, it should be a FILENAME, then skip over it and its length:
let header: format::Header = read_entry_at(&input, entry_range.start).await?; let header: format::Header = read_entry_at(input.archive(), entry_range.start).await?;
header.check_header_size()?; header.check_header_size()?;
if header.htype != format::PXAR_FILENAME { if header.htype != format::PXAR_FILENAME {
@ -293,6 +303,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
.next() .next()
.await .await
.ok_or_else(|| io_format_err!("unexpected EOF while decoding file entry"))??; .ok_or_else(|| io_format_err!("unexpected EOF while decoding file entry"))??;
Ok(FileEntryImpl { Ok(FileEntryImpl {
input: self.input.clone(), input: self.input.clone(),
entry, entry,
@ -303,7 +314,11 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
/// Allow opening arbitrary contents from a specific range. /// Allow opening arbitrary contents from a specific range.
pub unsafe fn open_contents_at_range(&self, range: Range<u64>) -> FileContentsImpl<T> { pub unsafe fn open_contents_at_range(&self, range: Range<u64>) -> FileContentsImpl<T> {
FileContentsImpl::new(self.input.clone(), range) if let Some((payload_input, _)) = &self.input.payload() {
FileContentsImpl::new(payload_input.clone(), range)
} else {
FileContentsImpl::new(self.input.archive().clone(), range)
}
} }
/// Following a hardlink breaks a couple of conventions we otherwise have, particularly we will /// Following a hardlink breaks a couple of conventions we otherwise have, particularly we will
@ -342,6 +357,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
EntryKind::File { EntryKind::File {
offset: Some(offset), offset: Some(offset),
size, size,
..
} => { } => {
let meta_size = offset - link_offset; let meta_size = offset - link_offset;
let entry_end = link_offset + meta_size + size; let entry_end = link_offset + meta_size + size;
@ -362,7 +378,7 @@ impl<T: Clone + ReadAt> AccessorImpl<T> {
/// The directory random-access state machine implementation. /// The directory random-access state machine implementation.
pub(crate) struct DirectoryImpl<T> { pub(crate) struct DirectoryImpl<T> {
input: T, input: PxarVariant<T, (T, Range<u64>)>,
entry_ofs: u64, entry_ofs: u64,
goodbye_ofs: u64, goodbye_ofs: u64,
size: u64, size: u64,
@ -374,12 +390,12 @@ pub(crate) struct DirectoryImpl<T> {
impl<T: Clone + ReadAt> DirectoryImpl<T> { impl<T: Clone + ReadAt> DirectoryImpl<T> {
/// Open a directory ending at the specified position. /// Open a directory ending at the specified position.
async fn open_at_end( async fn open_at_end(
input: T, input: PxarVariant<T, (T, Range<u64>)>,
end_offset: u64, end_offset: u64,
path: PathBuf, path: PathBuf,
caches: Arc<Caches>, caches: Arc<Caches>,
) -> io::Result<DirectoryImpl<T>> { ) -> io::Result<DirectoryImpl<T>> {
let tail = Self::read_tail_entry(&input, end_offset).await?; let tail = Self::read_tail_entry(input.archive(), end_offset).await?;
if end_offset < tail.size { if end_offset < tail.size {
io_bail!("goodbye tail size out of range"); io_bail!("goodbye tail size out of range");
@ -434,7 +450,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
data.as_mut_ptr() as *mut u8, data.as_mut_ptr() as *mut u8,
len * size_of::<GoodbyeItem>(), len * size_of::<GoodbyeItem>(),
); );
read_exact_at(&self.input, slice, self.table_offset()).await?; read_exact_at(self.input.archive(), slice, self.table_offset()).await?;
} }
Ok(Arc::from(data)) Ok(Arc::from(data))
} }
@ -599,7 +615,8 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
let cursor = self.get_cursor(index).await?; let cursor = self.get_cursor(index).await?;
if cursor.file_name == path { if cursor.file_name == path {
return Ok(Some(cursor.decode_entry().await?)); let entry = cursor.decode_entry().await?;
return Ok(Some(entry));
} }
dup += 1; dup += 1;
@ -645,13 +662,13 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
} }
async fn read_filename_entry(&self, file_ofs: u64) -> io::Result<(PathBuf, u64)> { async fn read_filename_entry(&self, file_ofs: u64) -> io::Result<(PathBuf, u64)> {
let head: format::Header = read_entry_at(&self.input, file_ofs).await?; let head: format::Header = read_entry_at(self.input.archive(), file_ofs).await?;
if head.htype != format::PXAR_FILENAME { if head.htype != format::PXAR_FILENAME {
io_bail!("expected PXAR_FILENAME header, found: {}", head); io_bail!("expected PXAR_FILENAME header, found: {}", head);
} }
let mut path = read_exact_data_at( let mut path = read_exact_data_at(
&self.input, self.input.archive(),
head.content_size() as usize, head.content_size() as usize,
file_ofs + (size_of_val(&head) as u64), file_ofs + (size_of_val(&head) as u64),
) )
@ -681,7 +698,7 @@ impl<T: Clone + ReadAt> DirectoryImpl<T> {
/// A file entry retrieved from a Directory. /// A file entry retrieved from a Directory.
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct FileEntryImpl<T: Clone + ReadAt> { pub(crate) struct FileEntryImpl<T: Clone + ReadAt> {
input: T, input: PxarVariant<T, (T, Range<u64>)>,
entry: Entry, entry: Entry,
entry_range_info: EntryRangeInfo, entry_range_info: EntryRangeInfo,
caches: Arc<Caches>, caches: Arc<Caches>,
@ -711,15 +728,29 @@ impl<T: Clone + ReadAt> FileEntryImpl<T> {
EntryKind::File { EntryKind::File {
size, size,
offset: Some(offset), offset: Some(offset),
payload_offset: None,
} => Ok(Some(offset..(offset + size))), } => Ok(Some(offset..(offset + size))),
// Payload offset beats regular offset if some
EntryKind::File {
size,
offset: Some(_offset),
payload_offset: Some(payload_offset),
} => {
let start_offset = payload_offset + size_of::<format::Header>() as u64;
Ok(Some(start_offset..start_offset + size))
}
_ => Ok(None), _ => Ok(None),
} }
} }
pub async fn contents(&self) -> io::Result<FileContentsImpl<T>> { pub async fn contents(&self) -> io::Result<FileContentsImpl<T>> {
match self.content_range()? { let range = self
Some(range) => Ok(FileContentsImpl::new(self.input.clone(), range)), .content_range()?
None => io_bail!("not a file"), .ok_or_else(|| io_format_err!("not a file"))?;
if let Some((ref payload_input, _)) = self.input.payload() {
Ok(FileContentsImpl::new(payload_input.clone(), range))
} else {
Ok(FileContentsImpl::new(self.input.archive().clone(), range))
} }
} }

View File

@ -12,7 +12,7 @@ use crate::accessor::{self, cache::Cache, MaybeReady, ReadAt, ReadAtOperation};
use crate::decoder::Decoder; use crate::decoder::Decoder;
use crate::format::GoodbyeItem; use crate::format::GoodbyeItem;
use crate::util::poll_result_once; use crate::util::poll_result_once;
use crate::Entry; use crate::{Entry, PxarVariant};
/// Blocking `pxar` random-access decoder. /// Blocking `pxar` random-access decoder.
/// ///
@ -31,7 +31,7 @@ impl<T: FileExt> Accessor<FileReader<T>> {
/// Decode a `pxar` archive from a standard file implementing `FileExt`. /// Decode a `pxar` archive from a standard file implementing `FileExt`.
#[inline] #[inline]
pub fn from_file_and_size(input: T, size: u64) -> io::Result<Self> { pub fn from_file_and_size(input: T, size: u64) -> io::Result<Self> {
Accessor::new(FileReader::new(input), size) Accessor::new(PxarVariant::Unified(FileReader::new(input)), size)
} }
} }
@ -64,7 +64,7 @@ where
{ {
/// Open an `Arc` or `Rc` of `File`. /// Open an `Arc` or `Rc` of `File`.
pub fn from_file_ref_and_size(input: T, size: u64) -> io::Result<Accessor<FileRefReader<T>>> { pub fn from_file_ref_and_size(input: T, size: u64) -> io::Result<Accessor<FileRefReader<T>>> {
Accessor::new(FileRefReader::new(input), size) Accessor::new(PxarVariant::Unified(FileRefReader::new(input)), size)
} }
} }
@ -74,7 +74,7 @@ impl<T: ReadAt> Accessor<T> {
/// ///
/// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is
/// not allowed to use the `Waker`, as this will cause a `panic!`. /// not allowed to use the `Waker`, as this will cause a `panic!`.
pub fn new(input: T, size: u64) -> io::Result<Self> { pub fn new(input: PxarVariant<T, (T, u64)>, size: u64) -> io::Result<Self> {
Ok(Self { Ok(Self {
inner: poll_result_once(accessor::AccessorImpl::new(input, size))?, inner: poll_result_once(accessor::AccessorImpl::new(input, size))?,
}) })

View File

@ -6,7 +6,7 @@ use std::io;
use std::path::Path; use std::path::Path;
use crate::decoder::{self, Contents, SeqRead}; use crate::decoder::{self, Contents, SeqRead};
use crate::Entry; use crate::{Entry, PxarVariant};
/// Asynchronous `pxar` decoder. /// Asynchronous `pxar` decoder.
/// ///
@ -20,8 +20,8 @@ pub struct Decoder<T> {
impl<T: tokio::io::AsyncRead> Decoder<TokioReader<T>> { impl<T: tokio::io::AsyncRead> Decoder<TokioReader<T>> {
/// Decode a `pxar` archive from a `tokio::io::AsyncRead` input. /// Decode a `pxar` archive from a `tokio::io::AsyncRead` input.
#[inline] #[inline]
pub async fn from_tokio(input: T) -> io::Result<Self> { pub async fn from_tokio(input: PxarVariant<T, T>) -> io::Result<Self> {
Decoder::new(TokioReader::new(input)).await Decoder::new(input.wrap(|input| TokioReader::new(input))).await
} }
} }
@ -30,13 +30,16 @@ impl Decoder<TokioReader<tokio::fs::File>> {
/// Decode a `pxar` archive from a `tokio::io::AsyncRead` input. /// Decode a `pxar` archive from a `tokio::io::AsyncRead` input.
#[inline] #[inline]
pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> { pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> {
Decoder::from_tokio(tokio::fs::File::open(path.as_ref()).await?).await Decoder::from_tokio(PxarVariant::Unified(
tokio::fs::File::open(path.as_ref()).await?,
))
.await
} }
} }
impl<T: SeqRead> Decoder<T> { impl<T: SeqRead> Decoder<T> {
/// Create an async decoder from an input implementing our internal read interface. /// Create an async decoder from an input implementing our internal read interface.
pub async fn new(input: T) -> io::Result<Self> { pub async fn new(input: PxarVariant<T, T>) -> io::Result<Self> {
Ok(Self { Ok(Self {
inner: decoder::DecoderImpl::new(input).await?, inner: decoder::DecoderImpl::new(input).await?,
}) })

View File

@ -19,7 +19,7 @@ use endian_trait::Endian;
use crate::format::{self, Header}; use crate::format::{self, Header};
use crate::util::{self, io_err_other}; use crate::util::{self, io_err_other};
use crate::{Entry, EntryKind, Metadata}; use crate::{Entry, EntryKind, Metadata, PxarVariant};
pub mod aio; pub mod aio;
pub mod sync; pub mod sync;
@ -150,13 +150,16 @@ async fn seq_read_entry<T: SeqRead + ?Sized, E: Endian>(input: &mut T) -> io::Re
/// We use `async fn` to implement the decoder state machine so that we can easily plug in both /// We use `async fn` to implement the decoder state machine so that we can easily plug in both
/// synchronous or `async` I/O objects in as input. /// synchronous or `async` I/O objects in as input.
pub(crate) struct DecoderImpl<T> { pub(crate) struct DecoderImpl<T> {
pub(crate) input: T, // Payload of regular files might be provided by a different reader
pub(crate) input: PxarVariant<T, T>,
current_header: Header, current_header: Header,
entry: Entry, entry: Entry,
path_lengths: Vec<usize>, path_lengths: Vec<usize>,
state: State, state: State,
with_goodbye_tables: bool, with_goodbye_tables: bool,
payload_consumed: u64,
/// The random access code uses decoders for sub-ranges which may not end in a `PAYLOAD` for /// The random access code uses decoders for sub-ranges which may not end in a `PAYLOAD` for
/// entries like FIFOs or sockets, so there we explicitly allow an item to terminate with EOF. /// entries like FIFOs or sockets, so there we explicitly allow an item to terminate with EOF.
eof_after_entry: bool, eof_after_entry: bool,
@ -167,6 +170,7 @@ enum State {
Default, Default,
InPayload { InPayload {
offset: u64, offset: u64,
size: u64,
}, },
/// file entries with no data (fifo, socket) /// file entries with no data (fifo, socket)
@ -195,16 +199,16 @@ pub(crate) enum ItemResult {
} }
impl<I: SeqRead> DecoderImpl<I> { impl<I: SeqRead> DecoderImpl<I> {
pub async fn new(input: I) -> io::Result<Self> { pub async fn new(input: PxarVariant<I, I>) -> io::Result<Self> {
Self::new_full(input, "/".into(), false).await Self::new_full(input, "/".into(), false).await
} }
pub(crate) fn input(&self) -> &I { pub(crate) fn input(&self) -> &I {
&self.input self.input.archive()
} }
pub(crate) async fn new_full( pub(crate) async fn new_full(
input: I, input: PxarVariant<I, I>,
path: PathBuf, path: PathBuf,
eof_after_entry: bool, eof_after_entry: bool,
) -> io::Result<Self> { ) -> io::Result<Self> {
@ -219,6 +223,7 @@ impl<I: SeqRead> DecoderImpl<I> {
path_lengths: Vec::new(), path_lengths: Vec::new(),
state: State::Begin, state: State::Begin,
with_goodbye_tables: false, with_goodbye_tables: false,
payload_consumed: 0,
eof_after_entry, eof_after_entry,
}; };
@ -242,9 +247,14 @@ impl<I: SeqRead> DecoderImpl<I> {
// hierarchy and parse the next PXAR_FILENAME or the PXAR_GOODBYE: // hierarchy and parse the next PXAR_FILENAME or the PXAR_GOODBYE:
self.read_next_item().await?; self.read_next_item().await?;
} }
State::InPayload { offset } => { State::InPayload { offset, .. } => {
// We need to skip the current payload first. if self.input.payload().is_some() {
// Update consumed payload as given by the offset referenced by the content reader
self.payload_consumed += offset;
} else {
// Skip remaining payload of current entry in regular stream
self.skip_entry(offset).await?; self.skip_entry(offset).await?;
}
self.read_next_item().await?; self.read_next_item().await?;
} }
State::InGoodbyeTable => { State::InGoodbyeTable => {
@ -300,20 +310,28 @@ impl<I: SeqRead> DecoderImpl<I> {
} }
pub fn content_size(&self) -> Option<u64> { pub fn content_size(&self) -> Option<u64> {
if let State::InPayload { .. } = self.state { if let State::InPayload { size, .. } = self.state {
if self.input.payload().is_some() {
Some(size)
} else {
Some(self.current_header.content_size()) Some(self.current_header.content_size())
}
} else { } else {
None None
} }
} }
pub fn content_reader(&mut self) -> Option<Contents<I>> { pub fn content_reader(&mut self) -> Option<Contents<I>> {
if let State::InPayload { offset } = &mut self.state { if let State::InPayload { offset, size } = &mut self.state {
if self.input.payload().is_some() {
Some(Contents::new( Some(Contents::new(
&mut self.input, self.input.payload_mut().unwrap(),
offset, offset,
self.current_header.content_size(), *size,
)) ))
} else {
Some(Contents::new(self.input.archive_mut(), offset, *size))
}
} else { } else {
None None
} }
@ -357,7 +375,7 @@ impl<I: SeqRead> DecoderImpl<I> {
self.state = State::Default; self.state = State::Default;
self.entry.clear_data(); self.entry.clear_data();
let header: Header = match seq_read_entry_or_eof(&mut self.input).await? { let header: Header = match seq_read_entry_or_eof(self.input.archive_mut()).await? {
None => return Ok(None), None => return Ok(None),
Some(header) => header, Some(header) => header,
}; };
@ -377,11 +395,11 @@ impl<I: SeqRead> DecoderImpl<I> {
} else if header.htype == format::PXAR_ENTRY || header.htype == format::PXAR_ENTRY_V1 { } else if header.htype == format::PXAR_ENTRY || header.htype == format::PXAR_ENTRY_V1 {
if header.htype == format::PXAR_ENTRY { if header.htype == format::PXAR_ENTRY {
self.entry.metadata = Metadata { self.entry.metadata = Metadata {
stat: seq_read_entry(&mut self.input).await?, stat: seq_read_entry(self.input.archive_mut()).await?,
..Default::default() ..Default::default()
}; };
} else if header.htype == format::PXAR_ENTRY_V1 { } else if header.htype == format::PXAR_ENTRY_V1 {
let stat: format::Stat_V1 = seq_read_entry(&mut self.input).await?; let stat: format::Stat_V1 = seq_read_entry(self.input.archive_mut()).await?;
self.entry.metadata = Metadata { self.entry.metadata = Metadata {
stat: stat.into(), stat: stat.into(),
@ -457,7 +475,7 @@ impl<I: SeqRead> DecoderImpl<I> {
) )
}; };
match seq_read_exact_or_eof(&mut self.input, dest).await? { match seq_read_exact_or_eof(self.input.archive_mut(), dest).await? {
Some(()) => { Some(()) => {
self.current_header.check_header_size()?; self.current_header.check_header_size()?;
Ok(Some(())) Ok(Some(()))
@ -527,12 +545,71 @@ impl<I: SeqRead> DecoderImpl<I> {
return Ok(ItemResult::Entry); return Ok(ItemResult::Entry);
} }
format::PXAR_PAYLOAD => { format::PXAR_PAYLOAD => {
let offset = seq_read_position(&mut self.input).await.transpose()?; let offset = seq_read_position(self.input.archive_mut())
.await
.transpose()?;
self.entry.kind = EntryKind::File { self.entry.kind = EntryKind::File {
size: self.current_header.content_size(), size: self.current_header.content_size(),
offset, offset,
payload_offset: None,
};
self.state = State::InPayload {
offset: 0,
size: self.current_header.content_size(),
};
return Ok(ItemResult::Entry);
}
format::PXAR_PAYLOAD_REF => {
let offset = seq_read_position(self.input.archive_mut())
.await
.transpose()?;
let payload_ref = self.read_payload_ref().await?;
if let Some(payload_input) = self.input.payload_mut() {
if seq_read_position(payload_input)
.await
.transpose()?
.is_none()
{
if self.payload_consumed > payload_ref.offset {
io_bail!(
"unexpected offset {}, smaller than already consumed payload {}",
payload_ref.offset,
self.payload_consumed,
);
}
let to_skip = payload_ref.offset - self.payload_consumed;
Self::skip(payload_input, to_skip as usize).await?;
self.payload_consumed += to_skip;
}
let header: Header = seq_read_entry(payload_input).await?;
if header.htype != format::PXAR_PAYLOAD {
io_bail!(
"unexpected header in payload input: expected {} , got {header}",
format::PXAR_PAYLOAD,
);
}
self.payload_consumed += size_of::<Header>() as u64;
if header.content_size() != payload_ref.size {
io_bail!(
"encountered payload size mismatch: got {}, expected {}",
payload_ref.size,
header.content_size(),
);
}
}
self.entry.kind = EntryKind::File {
size: payload_ref.size,
offset,
payload_offset: Some(payload_ref.offset),
};
self.state = State::InPayload {
offset: 0,
size: payload_ref.size,
}; };
self.state = State::InPayload { offset: 0 };
return Ok(ItemResult::Entry); return Ok(ItemResult::Entry);
} }
format::PXAR_FILENAME | format::PXAR_GOODBYE => { format::PXAR_FILENAME | format::PXAR_GOODBYE => {
@ -564,7 +641,7 @@ impl<I: SeqRead> DecoderImpl<I> {
async fn skip_entry(&mut self, offset: u64) -> io::Result<()> { async fn skip_entry(&mut self, offset: u64) -> io::Result<()> {
let len = (self.current_header.content_size() - offset) as usize; let len = (self.current_header.content_size() - offset) as usize;
Self::skip(&mut self.input, len).await Self::skip(self.input.archive_mut(), len).await
} }
async fn skip(input: &mut I, mut len: usize) -> io::Result<()> { async fn skip(input: &mut I, mut len: usize) -> io::Result<()> {
@ -581,7 +658,7 @@ impl<I: SeqRead> DecoderImpl<I> {
async fn read_entry_as_bytes(&mut self) -> io::Result<Vec<u8>> { async fn read_entry_as_bytes(&mut self) -> io::Result<Vec<u8>> {
let size = usize::try_from(self.current_header.content_size()).map_err(io_err_other)?; let size = usize::try_from(self.current_header.content_size()).map_err(io_err_other)?;
let data = seq_read_exact_data(&mut self.input, size).await?; let data = seq_read_exact_data(self.input.archive_mut(), size).await?;
Ok(data) Ok(data)
} }
@ -598,7 +675,7 @@ impl<I: SeqRead> DecoderImpl<I> {
size_of::<T>(), size_of::<T>(),
); );
} }
seq_read_entry(&mut self.input).await seq_read_entry(self.input.archive_mut()).await
} }
// //
@ -630,8 +707,8 @@ impl<I: SeqRead> DecoderImpl<I> {
} }
let data_size = content_size - size_of::<u64>(); let data_size = content_size - size_of::<u64>();
let offset: u64 = seq_read_entry(&mut self.input).await?; let offset: u64 = seq_read_entry(self.input.archive_mut()).await?;
let data = seq_read_exact_data(&mut self.input, data_size).await?; let data = seq_read_exact_data(self.input.archive_mut(), data_size).await?;
Ok(format::Hardlink { offset, data }) Ok(format::Hardlink { offset, data })
} }
@ -667,7 +744,7 @@ impl<I: SeqRead> DecoderImpl<I> {
async fn read_payload_ref(&mut self) -> io::Result<format::PayloadRef> { async fn read_payload_ref(&mut self) -> io::Result<format::PayloadRef> {
self.current_header.check_header_size()?; self.current_header.check_header_size()?;
seq_read_entry(&mut self.input).await seq_read_entry(self.input.archive_mut()).await
} }
} }

View File

@ -7,7 +7,7 @@ use std::task::{Context, Poll};
use crate::decoder::{self, SeqRead}; use crate::decoder::{self, SeqRead};
use crate::util::poll_result_once; use crate::util::poll_result_once;
use crate::Entry; use crate::{Entry, PxarVariant};
/// Blocking `pxar` decoder. /// Blocking `pxar` decoder.
/// ///
@ -25,8 +25,8 @@ pub struct Decoder<T> {
impl<T: io::Read> Decoder<StandardReader<T>> { impl<T: io::Read> Decoder<StandardReader<T>> {
/// Decode a `pxar` archive from a regular `std::io::Read` input. /// Decode a `pxar` archive from a regular `std::io::Read` input.
#[inline] #[inline]
pub fn from_std(input: T) -> io::Result<Self> { pub fn from_std(input: PxarVariant<T, T>) -> io::Result<Self> {
Decoder::new(StandardReader::new(input)) Decoder::new(input.wrap(|i| StandardReader::new(i)))
} }
/// Get a direct reference to the reader contained inside the contained [`StandardReader`]. /// Get a direct reference to the reader contained inside the contained [`StandardReader`].
@ -37,8 +37,15 @@ impl<T: io::Read> Decoder<StandardReader<T>> {
impl Decoder<StandardReader<std::fs::File>> { impl Decoder<StandardReader<std::fs::File>> {
/// Convenience shortcut for `File::open` followed by `Accessor::from_file`. /// Convenience shortcut for `File::open` followed by `Accessor::from_file`.
pub fn open<P: AsRef<Path>>(path: P) -> io::Result<Self> { pub fn open<P: AsRef<Path>>(path: PxarVariant<P, P>) -> io::Result<Self> {
Self::from_std(std::fs::File::open(path.as_ref())?) let input = match path {
PxarVariant::Split(input, payload_input) => PxarVariant::Split(
std::fs::File::open(input)?,
std::fs::File::open(payload_input)?,
),
PxarVariant::Unified(input) => PxarVariant::Unified(std::fs::File::open(input)?),
};
Self::from_std(input)
} }
} }
@ -47,7 +54,9 @@ impl<T: SeqRead> Decoder<T> {
/// ///
/// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is /// Note that the `input`'s `SeqRead` implementation must always return `Poll::Ready` and is
/// not allowed to use the `Waker`, as this will cause a `panic!`. /// not allowed to use the `Waker`, as this will cause a `panic!`.
pub fn new(input: T) -> io::Result<Self> { /// The optional payload input must be used to restore regular file payloads for payload references
/// encountered within the archive.
pub fn new(input: PxarVariant<T, T>) -> io::Result<Self> {
Ok(Self { Ok(Self {
inner: poll_result_once(decoder::DecoderImpl::new(input))?, inner: poll_result_once(decoder::DecoderImpl::new(input))?,
}) })

View File

@ -364,6 +364,9 @@ pub enum EntryKind {
/// The file's byte offset inside the archive, if available. /// The file's byte offset inside the archive, if available.
offset: Option<u64>, offset: Option<u64>,
/// The file's byte offset inside the payload stream, if available.
payload_offset: Option<u64>,
}, },
/// Directory entry. When iterating through an archive, the contents follow next. /// Directory entry. When iterating through an archive, the contents follow next.

View File

@ -94,7 +94,8 @@ fn create_archive() -> io::Result<Vec<u8>> {
fn test_archive() { fn test_archive() {
let archive = create_archive().expect("failed to create test archive"); let archive = create_archive().expect("failed to create test archive");
let mut input = &archive[..]; let mut input = &archive[..];
let mut decoder = decoder::Decoder::from_std(&mut input).expect("failed to create decoder"); let mut decoder = decoder::Decoder::from_std(pxar::PxarVariant::Unified(&mut input))
.expect("failed to create decoder");
let item = decoder let item = decoder
.next() .next()

View File

@ -61,13 +61,15 @@ fn test1() {
// std::fs::write("myarchive.pxar", &file).expect("failed to write out test archive"); // std::fs::write("myarchive.pxar", &file).expect("failed to write out test archive");
let mut input = &file[..]; let mut input = &file[..];
let mut decoder = decoder::Decoder::from_std(&mut input).expect("failed to create decoder"); let mut decoder = decoder::Decoder::from_std(pxar::PxarVariant::Unified(&mut input))
.expect("failed to create decoder");
let decoded_fs = let decoded_fs =
fs::Entry::decode_from(&mut decoder).expect("failed to decode previously encoded archive"); fs::Entry::decode_from(&mut decoder).expect("failed to decode previously encoded archive");
assert_eq!(test_fs, decoded_fs); assert_eq!(test_fs, decoded_fs);
let accessor = accessor::Accessor::new(&file[..], file.len() as u64) let accessor =
accessor::Accessor::new(pxar::PxarVariant::Unified(&file[..]), file.len() as u64)
.expect("failed to create random access reader for encoded archive"); .expect("failed to create random access reader for encoded archive");
check_bunzip2(&accessor); check_bunzip2(&accessor);