diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs index c1dee2b8a..934562574 100644 --- a/src/bin/proxmox-backup-api.rs +++ b/src/bin/proxmox-backup-api.rs @@ -52,6 +52,8 @@ async fn run() -> Result<(), Error> { let mut config = server::ApiConfig::new( buildcfg::JS_DIR, &proxmox_backup::api2::ROUTER, RpcEnvironmentType::PRIVILEGED)?; + let mut commando_sock = server::CommandoSocket::new(server::our_ctrl_sock()); + config.enable_file_log(buildcfg::API_ACCESS_LOG_FN)?; let rest_server = RestServer::new(config); @@ -79,7 +81,8 @@ async fn run() -> Result<(), Error> { daemon::systemd_notify(daemon::SystemdNotify::Ready)?; let init_result: Result<(), Error> = try_block!({ - server::create_task_control_socket()?; + server::register_task_control_commands(&mut commando_sock)?; + commando_sock.spawn()?; server::server_state_init()?; Ok(()) }); diff --git a/src/bin/proxmox-backup-proxy.rs b/src/bin/proxmox-backup-proxy.rs index 392545040..ce67faeea 100644 --- a/src/bin/proxmox-backup-proxy.rs +++ b/src/bin/proxmox-backup-proxy.rs @@ -94,6 +94,8 @@ async fn run() -> Result<(), Error> { config.register_template("index", &indexpath)?; config.register_template("console", "/usr/share/pve-xtermjs/index.html.hbs")?; + let mut commando_sock = server::CommandoSocket::new(server::our_ctrl_sock()); + config.enable_file_log(buildcfg::API_ACCESS_LOG_FN)?; let rest_server = RestServer::new(config); @@ -146,7 +148,8 @@ async fn run() -> Result<(), Error> { daemon::systemd_notify(daemon::SystemdNotify::Ready)?; let init_result: Result<(), Error> = try_block!({ - server::create_task_control_socket()?; + server::register_task_control_commands(&mut commando_sock)?; + commando_sock.spawn()?; server::server_state_init()?; Ok(()) }); diff --git a/src/server.rs b/src/server.rs index f0db500e2..aa4b57ec6 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,6 +4,34 @@ //! services. We want async IO, so this is built on top of //! tokio/hyper. +use lazy_static::lazy_static; +use nix::unistd::Pid; + +use proxmox::sys::linux::procfs::PidStat; + +use crate::buildcfg; + +lazy_static! { + static ref PID: i32 = unsafe { libc::getpid() }; + static ref PSTART: u64 = PidStat::read_from_pid(Pid::from_raw(*PID)).unwrap().starttime; +} + +pub fn pid() -> i32 { + *PID +} + +pub fn pstart() -> u64 { + *PSTART +} + +pub fn ctrl_sock_from_pid(pid: i32) -> String { + format!("\0{}/control-{}.sock", buildcfg::PROXMOX_BACKUP_RUN_DIR, pid) +} + +pub fn our_ctrl_sock() -> String { + ctrl_sock_from_pid(*PID) +} + mod environment; pub use environment::*; diff --git a/src/server/worker_task.rs b/src/server/worker_task.rs index db0d7d2b1..ff8530584 100644 --- a/src/server/worker_task.rs +++ b/src/server/worker_task.rs @@ -8,7 +8,6 @@ use std::sync::{Arc, Mutex}; use anyhow::{bail, format_err, Error}; use futures::*; use lazy_static::lazy_static; -use nix::unistd::Pid; use serde_json::{json, Value}; use serde::{Serialize, Deserialize}; use tokio::sync::oneshot; @@ -20,6 +19,7 @@ use proxmox::tools::fs::{create_path, open_file_locked, replace_file, CreateOpti use super::UPID; use crate::buildcfg; +use crate::server; use crate::tools::logrotate::{LogRotate, LogRotateFiles}; use crate::tools::{FileLogger, FileLogOptions}; use crate::api2::types::Authid; @@ -35,16 +35,16 @@ pub const PROXMOX_BACKUP_ARCHIVE_TASK_FN: &str = taskdir!("/archive"); lazy_static! { static ref WORKER_TASK_LIST: Mutex>> = Mutex::new(HashMap::new()); +} - static ref MY_PID: i32 = unsafe { libc::getpid() }; - static ref MY_PID_PSTART: u64 = procfs::PidStat::read_from_pid(Pid::from_raw(*MY_PID)) - .unwrap() - .starttime; +/// checks if the task UPID refers to a worker from this process +fn is_local_worker(upid: &UPID) -> bool { + upid.pid == server::pid() && upid.pstart == server::pstart() } /// Test if the task is still running pub async fn worker_is_active(upid: &UPID) -> Result { - if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) { + if is_local_worker(upid) { return Ok(WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id)); } @@ -52,15 +52,12 @@ pub async fn worker_is_active(upid: &UPID) -> Result { return Ok(false); } - let socketname = format!( - "\0{}/proxmox-task-control-{}.sock", buildcfg::PROXMOX_BACKUP_RUN_DIR, upid.pid); - + let sock = server::ctrl_sock_from_pid(upid.pid); let cmd = json!({ - "command": "status", + "command": "worker-task-status", "upid": upid.to_string(), }); - - let status = super::send_command(socketname, cmd).await?; + let status = super::send_command(sock, cmd).await?; if let Some(active) = status.as_bool() { Ok(active) @@ -71,69 +68,48 @@ pub async fn worker_is_active(upid: &UPID) -> Result { /// Test if the task is still running (fast but inaccurate implementation) /// -/// If the task is spanned from a different process, we simply return if +/// If the task is spawned from a different process, we simply return if /// that process is still running. This information is good enough to detect /// stale tasks... pub fn worker_is_active_local(upid: &UPID) -> bool { - if (upid.pid == *MY_PID) && (upid.pstart == *MY_PID_PSTART) { + if is_local_worker(upid) { WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id) } else { procfs::check_process_running_pstart(upid.pid, upid.pstart).is_some() } } -pub fn create_task_control_socket() -> Result<(), Error> { - - let socketname = format!( - "\0{}/proxmox-task-control-{}.sock", buildcfg::PROXMOX_BACKUP_RUN_DIR, *MY_PID); - - let control_future = super::create_control_socket(socketname, |param| { - let param = param - .as_object() - .ok_or_else(|| format_err!("unable to parse parameters (expected json object)"))?; - if param.keys().count() != 2 { bail!("wrong number of parameters"); } - - let command = param["command"] - .as_str() - .ok_or_else(|| format_err!("unable to parse parameters (missing command)"))?; - - // we have only two commands for now - if !(command == "abort-task" || command == "status") { - bail!("got unknown command '{}'", command); - } - - let upid_str = param["upid"] - .as_str() - .ok_or_else(|| format_err!("unable to parse parameters (missing upid)"))?; - - let upid = upid_str.parse::()?; - - if !(upid.pid == *MY_PID && upid.pstart == *MY_PID_PSTART) { +pub fn register_task_control_commands( + commando_sock: &mut super::CommandoSocket, +) -> Result<(), Error> { + fn get_upid(args: Option<&Value>) -> Result { + let args = if let Some(args) = args { args } else { bail!("missing args") }; + let upid = match args.get("upid") { + Some(Value::String(upid)) => upid.parse::()?, + None => bail!("no upid in args"), + _ => bail!("unable to parse upid"), + }; + if !is_local_worker(&upid) { bail!("upid does not belong to this process"); } + Ok(upid) + } - let hash = WORKER_TASK_LIST.lock().unwrap(); + commando_sock.register_command("worker-task-abort".into(), move |args| { + let upid = get_upid(args)?; - match command { - "abort-task" => { - if let Some(ref worker) = hash.get(&upid.task_id) { - worker.request_abort(); - } else { - // assume task is already stopped - } - Ok(Value::Null) - } - "status" => { - let active = hash.contains_key(&upid.task_id); - Ok(active.into()) - } - _ => { - bail!("got unknown command '{}'", command); - } + if let Some(ref worker) = WORKER_TASK_LIST.lock().unwrap().get(&upid.task_id) { + worker.request_abort(); } + Ok(Value::Null) })?; + commando_sock.register_command("worker-task-status".into(), move |args| { + let upid = get_upid(args)?; - tokio::spawn(control_future); + let active = WORKER_TASK_LIST.lock().unwrap().contains_key(&upid.task_id); + + Ok(active.into()) + })?; Ok(()) } @@ -148,17 +124,12 @@ pub fn abort_worker_async(upid: UPID) { pub async fn abort_worker(upid: UPID) -> Result<(), Error> { - let target_pid = upid.pid; - - let socketname = format!( - "\0{}/proxmox-task-control-{}.sock", buildcfg::PROXMOX_BACKUP_RUN_DIR, target_pid); - + let sock = server::ctrl_sock_from_pid(upid.pid); let cmd = json!({ - "command": "abort-task", + "command": "worker-task-abort", "upid": upid.to_string(), }); - - super::send_command(socketname, cmd).map_ok(|_| ()).await + super::send_command(sock, cmd).map_ok(|_| ()).await } fn parse_worker_status_line(line: &str) -> Result<(String, UPID, Option), Error> { diff --git a/tests/worker-task-abort.rs b/tests/worker-task-abort.rs index 3cb41e320..5b73c8b97 100644 --- a/tests/worker-task-abort.rs +++ b/tests/worker-task-abort.rs @@ -42,8 +42,10 @@ fn worker_task_abort() -> Result<(), Error> { let mut rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async move { + let mut commando_sock = server::CommandoSocket::new(server::our_ctrl_sock()); + let init_result: Result<(), Error> = try_block!({ - server::create_task_control_socket()?; + server::register_task_control_commands(&mut commando_sock)?; server::server_state_init()?; Ok(()) });