asyncify pxar create_archive
...to take advantage of the aio::Encoder from the pxar create. Rather straightforward conversion, but does require getting rid of references in the Archiver struct, and thus has to be given the Mutex for the catalog directly. The callback is boxed. archive_dir_contents can call itself recursively, and thus needs to return a boxed future. Users are adjusted, namely PxarBackupStream is converted to use an Abortable future instead of a thread so it supports async in its handler function, and the pxar bin create_archive is converted to an async API function. One test case is made to just use 'block_on'. Signed-off-by: Stefan Reiter <s.reiter@proxmox.com> Signed-off-by: Wolfgang Bumiller <w.bumiller@proxmox.com>
This commit is contained in:
parent
a42212fc1e
commit
6afb60abf5
@ -295,7 +295,7 @@ fn extract_archive(
|
||||
)]
|
||||
/// Create a new .pxar archive.
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn create_archive(
|
||||
async fn create_archive(
|
||||
archive: String,
|
||||
source: String,
|
||||
verbose: bool,
|
||||
@ -376,7 +376,7 @@ fn create_archive(
|
||||
dir,
|
||||
writer,
|
||||
feature_flags,
|
||||
|path| {
|
||||
move |path| {
|
||||
if verbose {
|
||||
println!("{:?}", path);
|
||||
}
|
||||
@ -384,7 +384,7 @@ fn create_archive(
|
||||
},
|
||||
None,
|
||||
options,
|
||||
)?;
|
||||
).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -4,10 +4,10 @@ use std::path::Path;
|
||||
use std::pin::Pin;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::task::{Context, Poll};
|
||||
use std::thread;
|
||||
|
||||
use anyhow::{format_err, Error};
|
||||
use futures::stream::Stream;
|
||||
use futures::future::{Abortable, AbortHandle};
|
||||
use nix::dir::Dir;
|
||||
use nix::fcntl::OFlag;
|
||||
use nix::sys::stat::Mode;
|
||||
@ -21,14 +21,14 @@ use crate::backup::CatalogWriter;
|
||||
/// consumer.
|
||||
pub struct PxarBackupStream {
|
||||
rx: Option<std::sync::mpsc::Receiver<Result<Vec<u8>, Error>>>,
|
||||
child: Option<thread::JoinHandle<()>>,
|
||||
handle: Option<AbortHandle>,
|
||||
error: Arc<Mutex<Option<String>>>,
|
||||
}
|
||||
|
||||
impl Drop for PxarBackupStream {
|
||||
fn drop(&mut self) {
|
||||
self.rx = None;
|
||||
self.child.take().unwrap().join().unwrap();
|
||||
self.handle.take().unwrap().abort();
|
||||
}
|
||||
}
|
||||
|
||||
@ -43,12 +43,8 @@ impl PxarBackupStream {
|
||||
let buffer_size = 256 * 1024;
|
||||
|
||||
let error = Arc::new(Mutex::new(None));
|
||||
let child = std::thread::Builder::new()
|
||||
.name("PxarBackupStream".to_string())
|
||||
.spawn({
|
||||
let error = Arc::clone(&error);
|
||||
move || {
|
||||
let mut catalog_guard = catalog.lock().unwrap();
|
||||
let error2 = Arc::clone(&error);
|
||||
let handler = async move {
|
||||
let writer = std::io::BufWriter::with_capacity(
|
||||
buffer_size,
|
||||
crate::tools::StdChannelWriter::new(tx),
|
||||
@ -61,24 +57,27 @@ impl PxarBackupStream {
|
||||
dir,
|
||||
writer,
|
||||
crate::pxar::Flags::DEFAULT,
|
||||
|path| {
|
||||
move |path| {
|
||||
if verbose {
|
||||
println!("{:?}", path);
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
Some(&mut *catalog_guard),
|
||||
Some(catalog),
|
||||
options,
|
||||
) {
|
||||
let mut error = error.lock().unwrap();
|
||||
).await {
|
||||
let mut error = error2.lock().unwrap();
|
||||
*error = Some(err.to_string());
|
||||
}
|
||||
}
|
||||
})?;
|
||||
};
|
||||
|
||||
let (handle, registration) = AbortHandle::new_pair();
|
||||
let future = Abortable::new(handler, registration);
|
||||
tokio::spawn(future);
|
||||
|
||||
Ok(Self {
|
||||
rx: Some(rx),
|
||||
child: Some(child),
|
||||
handle: Some(handle),
|
||||
error,
|
||||
})
|
||||
}
|
||||
|
@ -5,16 +5,19 @@ use std::io::{self, Read, Write};
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use anyhow::{bail, format_err, Error};
|
||||
use nix::dir::Dir;
|
||||
use nix::errno::Errno;
|
||||
use nix::fcntl::OFlag;
|
||||
use nix::sys::stat::{FileStat, Mode};
|
||||
use futures::future::BoxFuture;
|
||||
use futures::FutureExt;
|
||||
|
||||
use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
|
||||
use pxar::Metadata;
|
||||
use pxar::encoder::LinkOffset;
|
||||
use pxar::encoder::{SeqWrite, LinkOffset};
|
||||
|
||||
use proxmox::c_str;
|
||||
use proxmox::sys::error::SysError;
|
||||
@ -129,13 +132,13 @@ impl std::io::Write for ErrorReporter {
|
||||
}
|
||||
}
|
||||
|
||||
struct Archiver<'a, 'b> {
|
||||
struct Archiver {
|
||||
feature_flags: Flags,
|
||||
fs_feature_flags: Flags,
|
||||
fs_magic: i64,
|
||||
patterns: Vec<MatchEntry>,
|
||||
callback: &'a mut dyn FnMut(&Path) -> Result<(), Error>,
|
||||
catalog: Option<&'b mut dyn BackupCatalogWriter>,
|
||||
callback: Box<dyn FnMut(&Path) -> Result<(), Error> + Send>,
|
||||
catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
|
||||
path: PathBuf,
|
||||
entry_counter: usize,
|
||||
entry_limit: usize,
|
||||
@ -147,19 +150,19 @@ struct Archiver<'a, 'b> {
|
||||
file_copy_buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
type Encoder<'a, 'b> = pxar::encoder::Encoder<'a, &'b mut dyn pxar::encoder::SeqWrite>;
|
||||
type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
|
||||
|
||||
pub fn create_archive<T, F>(
|
||||
pub async fn create_archive<T, F>(
|
||||
source_dir: Dir,
|
||||
mut writer: T,
|
||||
feature_flags: Flags,
|
||||
mut callback: F,
|
||||
catalog: Option<&mut dyn BackupCatalogWriter>,
|
||||
callback: F,
|
||||
catalog: Option<Arc<Mutex<dyn BackupCatalogWriter + Send>>>,
|
||||
options: PxarCreateOptions,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
T: pxar::encoder::SeqWrite,
|
||||
F: FnMut(&Path) -> Result<(), Error>,
|
||||
T: SeqWrite + Send,
|
||||
F: FnMut(&Path) -> Result<(), Error> + Send + 'static,
|
||||
{
|
||||
let fs_magic = detect_fs_type(source_dir.as_raw_fd())?;
|
||||
if is_virtual_file_system(fs_magic) {
|
||||
@ -182,8 +185,7 @@ where
|
||||
set.insert(stat.st_dev);
|
||||
}
|
||||
|
||||
let writer = &mut writer as &mut dyn pxar::encoder::SeqWrite;
|
||||
let mut encoder = Encoder::new(writer, &metadata)?;
|
||||
let mut encoder = Encoder::new(&mut writer, &metadata).await?;
|
||||
|
||||
let mut patterns = options.patterns;
|
||||
|
||||
@ -199,7 +201,7 @@ where
|
||||
feature_flags,
|
||||
fs_feature_flags,
|
||||
fs_magic,
|
||||
callback: &mut callback,
|
||||
callback: Box::new(callback),
|
||||
patterns,
|
||||
catalog,
|
||||
path: PathBuf::new(),
|
||||
@ -213,8 +215,8 @@ where
|
||||
file_copy_buffer: vec::undefined(4 * 1024 * 1024),
|
||||
};
|
||||
|
||||
archiver.archive_dir_contents(&mut encoder, source_dir, true)?;
|
||||
encoder.finish()?;
|
||||
archiver.archive_dir_contents(&mut encoder, source_dir, true).await?;
|
||||
encoder.finish().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -224,7 +226,7 @@ struct FileListEntry {
|
||||
stat: FileStat,
|
||||
}
|
||||
|
||||
impl<'a, 'b> Archiver<'a, 'b> {
|
||||
impl Archiver {
|
||||
/// Get the currently effective feature flags. (Requested flags masked by the file system
|
||||
/// feature flags).
|
||||
fn flags(&self) -> Flags {
|
||||
@ -239,12 +241,13 @@ impl<'a, 'b> Archiver<'a, 'b> {
|
||||
}
|
||||
}
|
||||
|
||||
fn archive_dir_contents(
|
||||
&mut self,
|
||||
encoder: &mut Encoder,
|
||||
fn archive_dir_contents<'a, 'b, T: SeqWrite + Send>(
|
||||
&'a mut self,
|
||||
encoder: &'a mut Encoder<'b, T>,
|
||||
mut dir: Dir,
|
||||
is_root: bool,
|
||||
) -> Result<(), Error> {
|
||||
) -> BoxFuture<'a, Result<(), Error>> {
|
||||
async move {
|
||||
let entry_counter = self.entry_counter;
|
||||
|
||||
let old_patterns_count = self.patterns.len();
|
||||
@ -268,13 +271,13 @@ impl<'a, 'b> Archiver<'a, 'b> {
|
||||
let file_name = file_entry.name.to_bytes();
|
||||
|
||||
if is_root && file_name == b".pxarexclude-cli" {
|
||||
self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)?;
|
||||
self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count).await?;
|
||||
continue;
|
||||
}
|
||||
|
||||
(self.callback)(&file_entry.path)?;
|
||||
self.path = file_entry.path;
|
||||
self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat)
|
||||
self.add_entry(encoder, dir_fd, &file_entry.name, &file_entry.stat).await
|
||||
.map_err(|err| self.wrap_err(err))?;
|
||||
}
|
||||
self.path = old_path;
|
||||
@ -282,6 +285,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
|
||||
self.patterns.truncate(old_patterns_count);
|
||||
|
||||
Ok(())
|
||||
}.boxed()
|
||||
}
|
||||
|
||||
/// openat() wrapper which allows but logs `EACCES` and turns `ENOENT` into `None`.
|
||||
@ -396,23 +400,22 @@ impl<'a, 'b> Archiver<'a, 'b> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn encode_pxarexclude_cli(
|
||||
async fn encode_pxarexclude_cli<T: SeqWrite + Send>(
|
||||
&mut self,
|
||||
encoder: &mut Encoder,
|
||||
encoder: &mut Encoder<'_, T>,
|
||||
file_name: &CStr,
|
||||
patterns_count: usize,
|
||||
) -> Result<(), Error> {
|
||||
let content = generate_pxar_excludes_cli(&self.patterns[..patterns_count]);
|
||||
|
||||
if let Some(ref mut catalog) = self.catalog {
|
||||
catalog.add_file(file_name, content.len() as u64, 0)?;
|
||||
if let Some(ref catalog) = self.catalog {
|
||||
catalog.lock().unwrap().add_file(file_name, content.len() as u64, 0)?;
|
||||
}
|
||||
|
||||
let mut metadata = Metadata::default();
|
||||
metadata.stat.mode = pxar::format::mode::IFREG | 0o600;
|
||||
|
||||
let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64)?;
|
||||
file.write_all(&content)?;
|
||||
let mut file = encoder.create_file(&metadata, ".pxarexclude-cli", content.len() as u64).await?;
|
||||
file.write_all(&content).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -502,9 +505,9 @@ impl<'a, 'b> Archiver<'a, 'b> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_entry(
|
||||
async fn add_entry<T: SeqWrite + Send>(
|
||||
&mut self,
|
||||
encoder: &mut Encoder,
|
||||
encoder: &mut Encoder<'_, T>,
|
||||
parent: RawFd,
|
||||
c_file_name: &CStr,
|
||||
stat: &FileStat,
|
||||
@ -550,23 +553,23 @@ impl<'a, 'b> Archiver<'a, 'b> {
|
||||
|
||||
if stat.st_nlink > 1 {
|
||||
if let Some((path, offset)) = self.hardlinks.get(&link_info) {
|
||||
if let Some(ref mut catalog) = self.catalog {
|
||||
catalog.add_hardlink(c_file_name)?;
|
||||
if let Some(ref catalog) = self.catalog {
|
||||
catalog.lock().unwrap().add_hardlink(c_file_name)?;
|
||||
}
|
||||
|
||||
encoder.add_hardlink(file_name, path, *offset)?;
|
||||
encoder.add_hardlink(file_name, path, *offset).await?;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
let file_size = stat.st_size as u64;
|
||||
if let Some(ref mut catalog) = self.catalog {
|
||||
catalog.add_file(c_file_name, file_size, stat.st_mtime)?;
|
||||
if let Some(ref catalog) = self.catalog {
|
||||
catalog.lock().unwrap().add_file(c_file_name, file_size, stat.st_mtime)?;
|
||||
}
|
||||
|
||||
let offset: LinkOffset =
|
||||
self.add_regular_file(encoder, fd, file_name, &metadata, file_size)?;
|
||||
self.add_regular_file(encoder, fd, file_name, &metadata, file_size).await?;
|
||||
|
||||
if stat.st_nlink > 1 {
|
||||
self.hardlinks.insert(link_info, (self.path.clone(), offset));
|
||||
@ -577,49 +580,49 @@ impl<'a, 'b> Archiver<'a, 'b> {
|
||||
mode::IFDIR => {
|
||||
let dir = Dir::from_fd(fd.into_raw_fd())?;
|
||||
|
||||
if let Some(ref mut catalog) = self.catalog {
|
||||
catalog.start_directory(c_file_name)?;
|
||||
if let Some(ref catalog) = self.catalog {
|
||||
catalog.lock().unwrap().start_directory(c_file_name)?;
|
||||
}
|
||||
let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat);
|
||||
if let Some(ref mut catalog) = self.catalog {
|
||||
catalog.end_directory()?;
|
||||
let result = self.add_directory(encoder, dir, c_file_name, &metadata, stat).await;
|
||||
if let Some(ref catalog) = self.catalog {
|
||||
catalog.lock().unwrap().end_directory()?;
|
||||
}
|
||||
result
|
||||
}
|
||||
mode::IFSOCK => {
|
||||
if let Some(ref mut catalog) = self.catalog {
|
||||
catalog.add_socket(c_file_name)?;
|
||||
if let Some(ref catalog) = self.catalog {
|
||||
catalog.lock().unwrap().add_socket(c_file_name)?;
|
||||
}
|
||||
|
||||
Ok(encoder.add_socket(&metadata, file_name)?)
|
||||
Ok(encoder.add_socket(&metadata, file_name).await?)
|
||||
}
|
||||
mode::IFIFO => {
|
||||
if let Some(ref mut catalog) = self.catalog {
|
||||
catalog.add_fifo(c_file_name)?;
|
||||
if let Some(ref catalog) = self.catalog {
|
||||
catalog.lock().unwrap().add_fifo(c_file_name)?;
|
||||
}
|
||||
|
||||
Ok(encoder.add_fifo(&metadata, file_name)?)
|
||||
Ok(encoder.add_fifo(&metadata, file_name).await?)
|
||||
}
|
||||
mode::IFLNK => {
|
||||
if let Some(ref mut catalog) = self.catalog {
|
||||
catalog.add_symlink(c_file_name)?;
|
||||
if let Some(ref catalog) = self.catalog {
|
||||
catalog.lock().unwrap().add_symlink(c_file_name)?;
|
||||
}
|
||||
|
||||
self.add_symlink(encoder, fd, file_name, &metadata)
|
||||
self.add_symlink(encoder, fd, file_name, &metadata).await
|
||||
}
|
||||
mode::IFBLK => {
|
||||
if let Some(ref mut catalog) = self.catalog {
|
||||
catalog.add_block_device(c_file_name)?;
|
||||
if let Some(ref catalog) = self.catalog {
|
||||
catalog.lock().unwrap().add_block_device(c_file_name)?;
|
||||
}
|
||||
|
||||
self.add_device(encoder, file_name, &metadata, &stat)
|
||||
self.add_device(encoder, file_name, &metadata, &stat).await
|
||||
}
|
||||
mode::IFCHR => {
|
||||
if let Some(ref mut catalog) = self.catalog {
|
||||
catalog.add_char_device(c_file_name)?;
|
||||
if let Some(ref catalog) = self.catalog {
|
||||
catalog.lock().unwrap().add_char_device(c_file_name)?;
|
||||
}
|
||||
|
||||
self.add_device(encoder, file_name, &metadata, &stat)
|
||||
self.add_device(encoder, file_name, &metadata, &stat).await
|
||||
}
|
||||
other => bail!(
|
||||
"encountered unknown file type: 0x{:x} (0o{:o})",
|
||||
@ -629,9 +632,9 @@ impl<'a, 'b> Archiver<'a, 'b> {
|
||||
}
|
||||
}
|
||||
|
||||
fn add_directory(
|
||||
async fn add_directory<T: SeqWrite + Send>(
|
||||
&mut self,
|
||||
encoder: &mut Encoder,
|
||||
encoder: &mut Encoder<'_, T>,
|
||||
dir: Dir,
|
||||
dir_name: &CStr,
|
||||
metadata: &Metadata,
|
||||
@ -639,7 +642,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
|
||||
) -> Result<(), Error> {
|
||||
let dir_name = OsStr::from_bytes(dir_name.to_bytes());
|
||||
|
||||
let mut encoder = encoder.create_directory(dir_name, &metadata)?;
|
||||
let mut encoder = encoder.create_directory(dir_name, &metadata).await?;
|
||||
|
||||
let old_fs_magic = self.fs_magic;
|
||||
let old_fs_feature_flags = self.fs_feature_flags;
|
||||
@ -662,20 +665,20 @@ impl<'a, 'b> Archiver<'a, 'b> {
|
||||
writeln!(self.logger, "skipping mount point: {:?}", self.path)?;
|
||||
Ok(())
|
||||
} else {
|
||||
self.archive_dir_contents(&mut encoder, dir, false)
|
||||
self.archive_dir_contents(&mut encoder, dir, false).await
|
||||
};
|
||||
|
||||
self.fs_magic = old_fs_magic;
|
||||
self.fs_feature_flags = old_fs_feature_flags;
|
||||
self.current_st_dev = old_st_dev;
|
||||
|
||||
encoder.finish()?;
|
||||
encoder.finish().await?;
|
||||
result
|
||||
}
|
||||
|
||||
fn add_regular_file(
|
||||
async fn add_regular_file<T: SeqWrite + Send>(
|
||||
&mut self,
|
||||
encoder: &mut Encoder,
|
||||
encoder: &mut Encoder<'_, T>,
|
||||
fd: Fd,
|
||||
file_name: &Path,
|
||||
metadata: &Metadata,
|
||||
@ -683,7 +686,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
|
||||
) -> Result<LinkOffset, Error> {
|
||||
let mut file = unsafe { std::fs::File::from_raw_fd(fd.into_raw_fd()) };
|
||||
let mut remaining = file_size;
|
||||
let mut out = encoder.create_file(metadata, file_name, file_size)?;
|
||||
let mut out = encoder.create_file(metadata, file_name, file_size).await?;
|
||||
while remaining != 0 {
|
||||
let mut got = match file.read(&mut self.file_copy_buffer[..]) {
|
||||
Ok(0) => break,
|
||||
@ -695,7 +698,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
|
||||
self.report_file_grew_while_reading()?;
|
||||
got = remaining as usize;
|
||||
}
|
||||
out.write_all(&self.file_copy_buffer[..got])?;
|
||||
out.write_all(&self.file_copy_buffer[..got]).await?;
|
||||
remaining -= got as u64;
|
||||
}
|
||||
if remaining > 0 {
|
||||
@ -704,7 +707,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
|
||||
vec::clear(&mut self.file_copy_buffer[..to_zero]);
|
||||
while remaining != 0 {
|
||||
let fill = remaining.min(self.file_copy_buffer.len() as u64) as usize;
|
||||
out.write_all(&self.file_copy_buffer[..fill])?;
|
||||
out.write_all(&self.file_copy_buffer[..fill]).await?;
|
||||
remaining -= fill as u64;
|
||||
}
|
||||
}
|
||||
@ -712,21 +715,21 @@ impl<'a, 'b> Archiver<'a, 'b> {
|
||||
Ok(out.file_offset())
|
||||
}
|
||||
|
||||
fn add_symlink(
|
||||
async fn add_symlink<T: SeqWrite + Send>(
|
||||
&mut self,
|
||||
encoder: &mut Encoder,
|
||||
encoder: &mut Encoder<'_, T>,
|
||||
fd: Fd,
|
||||
file_name: &Path,
|
||||
metadata: &Metadata,
|
||||
) -> Result<(), Error> {
|
||||
let dest = nix::fcntl::readlinkat(fd.as_raw_fd(), &b""[..])?;
|
||||
encoder.add_symlink(metadata, file_name, dest)?;
|
||||
encoder.add_symlink(metadata, file_name, dest).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn add_device(
|
||||
async fn add_device<T: SeqWrite + Send>(
|
||||
&mut self,
|
||||
encoder: &mut Encoder,
|
||||
encoder: &mut Encoder<'_, T>,
|
||||
file_name: &Path,
|
||||
metadata: &Metadata,
|
||||
stat: &FileStat,
|
||||
@ -735,7 +738,7 @@ impl<'a, 'b> Archiver<'a, 'b> {
|
||||
metadata,
|
||||
file_name,
|
||||
pxar::format::Device::from_dev_t(stat.st_rdev),
|
||||
)?)
|
||||
).await?)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -30,14 +30,15 @@ fn run_test(dir_name: &str) -> Result<(), Error> {
|
||||
..PxarCreateOptions::default()
|
||||
};
|
||||
|
||||
create_archive(
|
||||
let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
rt.block_on(create_archive(
|
||||
dir,
|
||||
writer,
|
||||
Flags::DEFAULT,
|
||||
|_| Ok(()),
|
||||
None,
|
||||
options,
|
||||
)?;
|
||||
))?;
|
||||
|
||||
Command::new("cmp")
|
||||
.arg("--verbose")
|
||||
|
Loading…
Reference in New Issue
Block a user