From 8a8a47031611ed786d0747518f3e880871290ac2 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Mon, 16 Dec 2019 10:06:26 +0100 Subject: [PATCH] src/bin/proxmox-backup-client.rs: use new ApiHandler::Async --- src/bin/proxmox-backup-client.rs | 468 +++++++++++++++++-------------- 1 file changed, 263 insertions(+), 205 deletions(-) diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs index 4908de8b..d224ec91 100644 --- a/src/bin/proxmox-backup-client.rs +++ b/src/bin/proxmox-backup-client.rs @@ -10,7 +10,7 @@ use std::os::unix::fs::OpenOptionsExt; use proxmox::{sortable, identity}; use proxmox::tools::fs::{file_get_contents, file_get_json, file_set_contents, image_size}; -use proxmox::api::{ApiHandler, ApiMethod, RpcEnvironment}; +use proxmox::api::{ApiFuture, ApiHandler, ApiMethod, RpcEnvironment}; use proxmox::api::schema::*; use proxmox::api::cli::*; use proxmox::api::api; @@ -233,11 +233,18 @@ fn strip_server_file_expenstion(name: &str) -> String { } } -fn list_backup_groups( +fn list_backup_groups<'a>( param: Value, _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { + _rpcenv: &'a mut dyn RpcEnvironment, +) -> ApiFuture<'a> { + + async move { + list_backup_groups_async(param).await + }.boxed() +} + +async fn list_backup_groups_async(param: Value) -> Result { let repo = extract_repository_from_value(¶m)?; @@ -245,9 +252,7 @@ fn list_backup_groups( let path = format!("api2/json/admin/datastore/{}/groups", repo.store()); - let mut result = async_main(async move { - client.get(&path, None).await - })?; + let mut result = client.get(&path, None).await?; record_repository(&repo); @@ -311,11 +316,18 @@ fn list_backup_groups( Ok(Value::Null) } -fn list_snapshots( +fn list_snapshots<'a>( param: Value, _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { + _rpcenv: &'a mut dyn RpcEnvironment, +) -> ApiFuture<'a> { + + async move { + list_snapshots_async(param).await + }.boxed() +} + +async fn list_snapshots_async(param: Value) -> Result { let repo = extract_repository_from_value(¶m)?; @@ -332,9 +344,7 @@ fn list_snapshots( args["backup-id"] = group.backup_id().into(); } - let result = async_main(async move { - client.get(&path, Some(args)).await - })?; + let result = client.get(&path, Some(args)).await?; record_repository(&repo); @@ -381,11 +391,18 @@ fn list_snapshots( Ok(Value::Null) } -fn forget_snapshots( +fn forget_snapshots<'a>( param: Value, _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { + _rpcenv: &'a mut dyn RpcEnvironment, +) -> ApiFuture<'a> { + + async move { + forget_snapshots_async(param).await + }.boxed() +} + +async fn forget_snapshots_async(param: Value) -> Result { let repo = extract_repository_from_value(¶m)?; @@ -396,29 +413,34 @@ fn forget_snapshots( let path = format!("api2/json/admin/datastore/{}/snapshots", repo.store()); - let result = async_main(async move { - client.delete(&path, Some(json!({ - "backup-type": snapshot.group().backup_type(), - "backup-id": snapshot.group().backup_id(), - "backup-time": snapshot.backup_time().timestamp(), - }))).await - })?; + let result = client.delete(&path, Some(json!({ + "backup-type": snapshot.group().backup_type(), + "backup-id": snapshot.group().backup_id(), + "backup-time": snapshot.backup_time().timestamp(), + }))).await?; record_repository(&repo); Ok(result) } -fn api_login( +fn api_login<'a>( param: Value, _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { + _rpcenv: &'a mut dyn RpcEnvironment, +) -> ApiFuture<'a> { + + async move { + api_login_async(param).await + }.boxed() +} + +async fn api_login_async(param: Value) -> Result { let repo = extract_repository_from_value(¶m)?; let client = HttpClient::new(repo.host(), repo.user(), None)?; - async_main(async move { client.login().await })?; + client.login().await?; record_repository(&repo); @@ -438,11 +460,18 @@ fn api_logout( Ok(Value::Null) } -fn dump_catalog( +fn dump_catalog<'a>( param: Value, _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { + _rpcenv: &'a mut dyn RpcEnvironment, +) -> ApiFuture<'a> { + + async move { + dump_catalog_async(param).await + }.boxed() +} + +async fn dump_catalog_async(param: Value) -> Result { let repo = extract_repository_from_value(¶m)?; @@ -461,55 +490,58 @@ fn dump_catalog( let client = HttpClient::new(repo.host(), repo.user(), None)?; - async_main(async move { - let client = BackupReader::start( - client, - crypt_config.clone(), - repo.store(), - &snapshot.group().backup_type(), - &snapshot.group().backup_id(), - snapshot.backup_time(), - true, - ).await?; + let client = BackupReader::start( + client, + crypt_config.clone(), + repo.store(), + &snapshot.group().backup_type(), + &snapshot.group().backup_id(), + snapshot.backup_time(), + true, + ).await?; - let manifest = client.download_manifest().await?; + let manifest = client.download_manifest().await?; - let index = client.download_dynamic_index(&manifest, CATALOG_NAME).await?; + let index = client.download_dynamic_index(&manifest, CATALOG_NAME).await?; - let most_used = index.find_most_used_chunks(8); + let most_used = index.find_most_used_chunks(8); - let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used); + let chunk_reader = RemoteChunkReader::new(client.clone(), crypt_config, most_used); - let mut reader = BufferedDynamicReader::new(index, chunk_reader); + let mut reader = BufferedDynamicReader::new(index, chunk_reader); - let mut catalogfile = std::fs::OpenOptions::new() - .write(true) - .read(true) - .custom_flags(libc::O_TMPFILE) - .open("/tmp")?; + let mut catalogfile = std::fs::OpenOptions::new() + .write(true) + .read(true) + .custom_flags(libc::O_TMPFILE) + .open("/tmp")?; - std::io::copy(&mut reader, &mut catalogfile) - .map_err(|err| format_err!("unable to download catalog - {}", err))?; + std::io::copy(&mut reader, &mut catalogfile) + .map_err(|err| format_err!("unable to download catalog - {}", err))?; - catalogfile.seek(SeekFrom::Start(0))?; + catalogfile.seek(SeekFrom::Start(0))?; - let mut catalog_reader = CatalogReader::new(catalogfile); + let mut catalog_reader = CatalogReader::new(catalogfile); - catalog_reader.dump()?; + catalog_reader.dump()?; - record_repository(&repo); - - Ok::<(), Error>(()) - })?; + record_repository(&repo); Ok(Value::Null) } -fn list_snapshot_files( +fn list_snapshot_files<'a>( param: Value, _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { + _rpcenv: &'a mut dyn RpcEnvironment, +) -> ApiFuture<'a> { + + async move { + list_snapshot_files_async(param).await + }.boxed() +} + +async fn list_snapshot_files_async(param: Value) -> Result { let repo = extract_repository_from_value(¶m)?; @@ -522,13 +554,11 @@ fn list_snapshot_files( let path = format!("api2/json/admin/datastore/{}/files", repo.store()); - let mut result = async_main(async move { - client.get(&path, Some(json!({ - "backup-type": snapshot.group().backup_type(), - "backup-id": snapshot.group().backup_id(), - "backup-time": snapshot.backup_time().timestamp(), - }))).await - })?; + let mut result = client.get(&path, Some(json!({ + "backup-type": snapshot.group().backup_type(), + "backup-id": snapshot.group().backup_id(), + "backup-time": snapshot.backup_time().timestamp(), + }))).await?; record_repository(&repo); @@ -549,7 +579,18 @@ fn list_snapshot_files( Ok(Value::Null) } -fn start_garbage_collection( +fn start_garbage_collection<'a>( + param: Value, + info: &'static ApiMethod, + rpcenv: &'a mut dyn RpcEnvironment, +) -> ApiFuture<'a> { + + async move { + start_garbage_collection_async(param, info, rpcenv).await + }.boxed() +} + +async fn start_garbage_collection_async( param: Value, _info: &ApiMethod, _rpcenv: &mut dyn RpcEnvironment, @@ -562,13 +603,11 @@ fn start_garbage_collection( let path = format!("api2/json/admin/datastore/{}/gc", repo.store()); - async_main(async { - let result = client.post(&path, None).await?; + let result = client.post(&path, None).await?; - record_repository(&repo); + record_repository(&repo); - view_task_result(client, result, &output_format).await - })?; + view_task_result(client, result, &output_format).await?; Ok(Value::Null) } @@ -615,7 +654,18 @@ fn spawn_catalog_upload( Ok((catalog, catalog_result_rx)) } -fn create_backup( +fn create_backup<'a>( + param: Value, + info: &'static ApiMethod, + rpcenv: &'a mut dyn RpcEnvironment, +) -> ApiFuture<'a> { + + async move { + create_backup_async(param, info, rpcenv).await + }.boxed() +} + +async fn create_backup_async( param: Value, _info: &ApiMethod, _rpcenv: &mut dyn RpcEnvironment, @@ -752,122 +802,120 @@ fn create_backup( } }; - async_main(async move { - let client = BackupWriter::start( - client, - repo.store(), - backup_type, - &backup_id, - backup_time, - verbose, - ).await?; + let client = BackupWriter::start( + client, + repo.store(), + backup_type, + &backup_id, + backup_time, + verbose, + ).await?; - let snapshot = BackupDir::new(backup_type, backup_id, backup_time.timestamp()); - let mut manifest = BackupManifest::new(snapshot); + let snapshot = BackupDir::new(backup_type, backup_id, backup_time.timestamp()); + let mut manifest = BackupManifest::new(snapshot); - let (catalog, catalog_result_rx) = spawn_catalog_upload(client.clone(), crypt_config.clone())?; + let (catalog, catalog_result_rx) = spawn_catalog_upload(client.clone(), crypt_config.clone())?; - for (backup_type, filename, target, size) in upload_list { - match backup_type { - BackupType::CONFIG => { - println!("Upload config file '{}' to '{:?}' as {}", filename, repo, target); - let stats = client - .upload_blob_from_file(&filename, &target, crypt_config.clone(), true) - .await?; - manifest.add_file(target, stats.size, stats.csum); - } - BackupType::LOGFILE => { // fixme: remove - not needed anymore ? - println!("Upload log file '{}' to '{:?}' as {}", filename, repo, target); - let stats = client - .upload_blob_from_file(&filename, &target, crypt_config.clone(), true) - .await?; - manifest.add_file(target, stats.size, stats.csum); - } - BackupType::PXAR => { - println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target); - catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?; - let stats = backup_directory( - &client, - &filename, - &target, - chunk_size_opt, - devices.clone(), - verbose, - skip_lost_and_found, - crypt_config.clone(), - catalog.clone(), - ).await?; - manifest.add_file(target, stats.size, stats.csum); - catalog.lock().unwrap().end_directory()?; - } - BackupType::IMAGE => { - println!("Upload image '{}' to '{:?}' as {}", filename, repo, target); - let stats = backup_image( - &client, - &filename, - &target, - size, - chunk_size_opt, - verbose, - crypt_config.clone(), - ).await?; - manifest.add_file(target, stats.size, stats.csum); - } + for (backup_type, filename, target, size) in upload_list { + match backup_type { + BackupType::CONFIG => { + println!("Upload config file '{}' to '{:?}' as {}", filename, repo, target); + let stats = client + .upload_blob_from_file(&filename, &target, crypt_config.clone(), true) + .await?; + manifest.add_file(target, stats.size, stats.csum); + } + BackupType::LOGFILE => { // fixme: remove - not needed anymore ? + println!("Upload log file '{}' to '{:?}' as {}", filename, repo, target); + let stats = client + .upload_blob_from_file(&filename, &target, crypt_config.clone(), true) + .await?; + manifest.add_file(target, stats.size, stats.csum); + } + BackupType::PXAR => { + println!("Upload directory '{}' to '{:?}' as {}", filename, repo, target); + catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?; + let stats = backup_directory( + &client, + &filename, + &target, + chunk_size_opt, + devices.clone(), + verbose, + skip_lost_and_found, + crypt_config.clone(), + catalog.clone(), + ).await?; + manifest.add_file(target, stats.size, stats.csum); + catalog.lock().unwrap().end_directory()?; + } + BackupType::IMAGE => { + println!("Upload image '{}' to '{:?}' as {}", filename, repo, target); + let stats = backup_image( + &client, + &filename, + &target, + size, + chunk_size_opt, + verbose, + crypt_config.clone(), + ).await?; + manifest.add_file(target, stats.size, stats.csum); } } + } - // finalize and upload catalog - if upload_catalog { - let mutex = Arc::try_unwrap(catalog) - .map_err(|_| format_err!("unable to get catalog (still used)"))?; - let mut catalog = mutex.into_inner().unwrap(); + // finalize and upload catalog + if upload_catalog { + let mutex = Arc::try_unwrap(catalog) + .map_err(|_| format_err!("unable to get catalog (still used)"))?; + let mut catalog = mutex.into_inner().unwrap(); - catalog.finish()?; + catalog.finish()?; - drop(catalog); // close upload stream + drop(catalog); // close upload stream - let stats = catalog_result_rx.await??; + let stats = catalog_result_rx.await??; - manifest.add_file(CATALOG_NAME.to_owned(), stats.size, stats.csum); - } + manifest.add_file(CATALOG_NAME.to_owned(), stats.size, stats.csum); + } - if let Some(rsa_encrypted_key) = rsa_encrypted_key { - let target = "rsa-encrypted.key"; - println!("Upload RSA encoded key to '{:?}' as {}", repo, target); - let stats = client - .upload_blob_from_data(rsa_encrypted_key, target, None, false, false) - .await?; - manifest.add_file(format!("{}.blob", target), stats.size, stats.csum); - - // openssl rsautl -decrypt -inkey master-private.pem -in rsa-encrypted.key -out t - /* - let mut buffer2 = vec![0u8; rsa.size() as usize]; - let pem_data = file_get_contents("master-private.pem")?; - let rsa = openssl::rsa::Rsa::private_key_from_pem(&pem_data)?; - let len = rsa.private_decrypt(&buffer, &mut buffer2, openssl::rsa::Padding::PKCS1)?; - println!("TEST {} {:?}", len, buffer2); - */ - } - - // create manifest (index.json) - let manifest = manifest.into_json(); - - println!("Upload index.json to '{:?}'", repo); - let manifest = serde_json::to_string_pretty(&manifest)?.into(); - client - .upload_blob_from_data(manifest, MANIFEST_BLOB_NAME, crypt_config.clone(), true, true) + if let Some(rsa_encrypted_key) = rsa_encrypted_key { + let target = "rsa-encrypted.key"; + println!("Upload RSA encoded key to '{:?}' as {}", repo, target); + let stats = client + .upload_blob_from_data(rsa_encrypted_key, target, None, false, false) .await?; + manifest.add_file(format!("{}.blob", target), stats.size, stats.csum); - client.finish().await?; + // openssl rsautl -decrypt -inkey master-private.pem -in rsa-encrypted.key -out t + /* + let mut buffer2 = vec![0u8; rsa.size() as usize]; + let pem_data = file_get_contents("master-private.pem")?; + let rsa = openssl::rsa::Rsa::private_key_from_pem(&pem_data)?; + let len = rsa.private_decrypt(&buffer, &mut buffer2, openssl::rsa::Padding::PKCS1)?; + println!("TEST {} {:?}", len, buffer2); + */ + } - let end_time = Local::now(); - let elapsed = end_time.signed_duration_since(start_time); - println!("Duration: {}", elapsed); + // create manifest (index.json) + let manifest = manifest.into_json(); - println!("End Time: {}", end_time.to_rfc3339_opts(chrono::SecondsFormat::Secs, false)); + println!("Upload index.json to '{:?}'", repo); + let manifest = serde_json::to_string_pretty(&manifest)?.into(); + client + .upload_blob_from_data(manifest, MANIFEST_BLOB_NAME, crypt_config.clone(), true, true) + .await?; - Ok(Value::Null) - }) + client.finish().await?; + + let end_time = Local::now(); + let elapsed = end_time.signed_duration_since(start_time); + println!("Duration: {}", elapsed); + + println!("End Time: {}", end_time.to_rfc3339_opts(chrono::SecondsFormat::Secs, false)); + + Ok(Value::Null) } fn complete_backup_source(arg: &str, param: &HashMap) -> Vec { @@ -891,14 +939,6 @@ fn complete_backup_source(arg: &str, param: &HashMap) -> Vec Result { - async_main(restore_do(param)) -} - fn dump_image( client: Arc, crypt_config: Option>, @@ -944,6 +984,17 @@ fn dump_image( Ok(()) } +fn restore<'a>( + param: Value, + _info: &ApiMethod, + _rpcenv: &'a mut dyn RpcEnvironment, +) -> ApiFuture<'a> { + + async move { + restore_do(param).await + }.boxed() +} + async fn restore_do(param: Value) -> Result { let repo = extract_repository_from_value(¶m)?; @@ -1103,11 +1154,17 @@ async fn restore_do(param: Value) -> Result { Ok(Value::Null) } -fn upload_log( +fn upload_log<'a>( param: Value, _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { + _rpcenv: &'a mut dyn RpcEnvironment, +) -> ApiFuture<'a> { + async move { + upload_log_async(param).await + }.boxed() +} + +async fn upload_log_async(param: Value) -> Result { let logfile = tools::required_string_param(¶m, "logfile")?; let repo = extract_repository_from_value(¶m)?; @@ -1144,9 +1201,7 @@ fn upload_log( let body = hyper::Body::from(raw_data); - async_main(async move { - client.upload("application/octet-stream", body, &path, Some(args)).await - }) + client.upload("application/octet-stream", body, &path, Some(args)).await } fn prune( @@ -1791,12 +1846,15 @@ async fn mount_do(param: Value, pipe: Option) -> Result { Ok(Value::Null) } -fn catalog_shell( +fn catalog_shell<'a>( param: Value, _info: &ApiMethod, - _rpcenv: &mut dyn RpcEnvironment, -) -> Result { - async_main(catalog_shell_async(param)) + _rpcenv: &'a mut dyn RpcEnvironment, +) -> ApiFuture<'a> { + + async move { + catalog_shell_async(param).await + }.boxed() } async fn catalog_shell_async(param: Value) -> Result { @@ -1910,7 +1968,7 @@ fn catalog_mgmt_cli() -> CliCommandMap { #[sortable] const API_METHOD_SHELL: ApiMethod = ApiMethod::new( - &ApiHandler::Sync(&catalog_shell), + &ApiHandler::Async(&catalog_shell), &ObjectSchema::new( "Shell to interactively inspect and restore snapshots.", &sorted!([ @@ -1930,7 +1988,7 @@ fn catalog_mgmt_cli() -> CliCommandMap { #[sortable] const API_METHOD_DUMP_CATALOG: ApiMethod = ApiMethod::new( - &ApiHandler::Sync(&dump_catalog), + &ApiHandler::Async(&dump_catalog), &ObjectSchema::new( "Dump catalog.", &sorted!([ @@ -2097,7 +2155,7 @@ fn main() { #[sortable] const API_METHOD_CREATE_BACKUP: ApiMethod = ApiMethod::new( - &ApiHandler::Sync(&create_backup), + &ApiHandler::Async(&create_backup), &ObjectSchema::new( "Create (host) backup.", &sorted!([ @@ -2178,7 +2236,7 @@ fn main() { #[sortable] const API_METHOD_UPLOAD_LOG: ApiMethod = ApiMethod::new( - &ApiHandler::Sync(&upload_log), + &ApiHandler::Async(&upload_log), &ObjectSchema::new( "Upload backup log file.", &sorted!([ @@ -2215,7 +2273,7 @@ fn main() { #[sortable] const API_METHOD_LIST_BACKUP_GROUPS: ApiMethod = ApiMethod::new( - &ApiHandler::Sync(&list_backup_groups), + &ApiHandler::Async(&list_backup_groups), &ObjectSchema::new( "List backup groups.", &sorted!([ @@ -2230,7 +2288,7 @@ fn main() { #[sortable] const API_METHOD_LIST_SNAPSHOTS: ApiMethod = ApiMethod::new( - &ApiHandler::Sync(&list_snapshots), + &ApiHandler::Async(&list_snapshots), &ObjectSchema::new( "List backup snapshots.", &sorted!([ @@ -2248,7 +2306,7 @@ fn main() { #[sortable] const API_METHOD_FORGET_SNAPSHOTS: ApiMethod = ApiMethod::new( - &ApiHandler::Sync(&forget_snapshots), + &ApiHandler::Async(&forget_snapshots), &ObjectSchema::new( "Forget (remove) backup snapshots.", &sorted!([ @@ -2265,7 +2323,7 @@ fn main() { #[sortable] const API_METHOD_START_GARBAGE_COLLECTION: ApiMethod = ApiMethod::new( - &ApiHandler::Sync(&start_garbage_collection), + &ApiHandler::Async(&start_garbage_collection), &ObjectSchema::new( "Start garbage collection for a specific repository.", &sorted!([ ("repository", true, &REPO_URL_SCHEMA) ]), @@ -2277,7 +2335,7 @@ fn main() { #[sortable] const API_METHOD_RESTORE: ApiMethod = ApiMethod::new( - &ApiHandler::Sync(&restore), + &ApiHandler::Async(&restore), &ObjectSchema::new( "Restore backup repository.", &sorted!([ @@ -2323,7 +2381,7 @@ We do not extraxt '.pxar' archives when writing to stdandard output. #[sortable] const API_METHOD_LIST_SNAPSHOT_FILES: ApiMethod = ApiMethod::new( - &ApiHandler::Sync(&list_snapshot_files), + &ApiHandler::Async(&list_snapshot_files), &ObjectSchema::new( "List snapshot files.", &sorted!([ @@ -2377,7 +2435,7 @@ We do not extraxt '.pxar' archives when writing to stdandard output. #[sortable] const API_METHOD_API_LOGIN: ApiMethod = ApiMethod::new( - &ApiHandler::Sync(&api_login), + &ApiHandler::Async(&api_login), &ObjectSchema::new( "Try to login. If successful, store ticket.", &sorted!([ ("repository", true, &REPO_URL_SCHEMA) ]),