mirror of
git://git.proxmox.com/git/pve-zsync.git
synced 2025-02-08 09:57:56 +03:00
Refactor locking
This introduces a new locked() mechanism allowing to enclose locked sections in a cleaner way. There's only two types of locks namely one for state and cron (they are always read together and almost always written together) and one for sync. Signed-off-by: Fabian Ebner <f.ebner@proxmox.com>
This commit is contained in:
parent
f1616f20eb
commit
96ed817693
221
pve-zsync
221
pve-zsync
@ -18,7 +18,6 @@ my $PATH = "/usr/sbin";
|
||||
my $PVE_DIR = "/etc/pve/local";
|
||||
my $QEMU_CONF = "${PVE_DIR}/qemu-server";
|
||||
my $LXC_CONF = "${PVE_DIR}/lxc";
|
||||
my $LOCKFILE = "$CONFIG_PATH/${PROGNAME}.lock";
|
||||
my $PROG_PATH = "$PATH/${PROGNAME}";
|
||||
my $INTERVAL = 15;
|
||||
my $DEBUG;
|
||||
@ -110,14 +109,20 @@ sub cut_target_width {
|
||||
return "$head/" . $path . "/$tail";
|
||||
}
|
||||
|
||||
sub lock {
|
||||
my ($fh) = @_;
|
||||
flock($fh, LOCK_EX) || die "Can't lock config - $!\n";
|
||||
}
|
||||
sub locked {
|
||||
my ($lock_fn, $code) = @_;
|
||||
|
||||
sub unlock {
|
||||
my ($fh) = @_;
|
||||
flock($fh, LOCK_UN) || die "Can't unlock config- $!\n";
|
||||
my $lock_fh = IO::File->new("> $lock_fn");
|
||||
|
||||
flock($lock_fh, LOCK_EX) || die "Couldn't acquire lock - $!\n";
|
||||
my $res = eval { $code->() };
|
||||
my $err = $@;
|
||||
|
||||
flock($lock_fh, LOCK_UN) || warn "Error unlocking - $!\n";
|
||||
die "$err" if $err;
|
||||
|
||||
close($lock_fh);
|
||||
return $res;
|
||||
}
|
||||
|
||||
sub get_status {
|
||||
@ -342,7 +347,6 @@ sub update_state {
|
||||
|
||||
$in_fh = IO::File->new("< $STATE");
|
||||
die "Could not open file $STATE: $!\n" if !$in_fh;
|
||||
lock($in_fh);
|
||||
$text = <$in_fh>;
|
||||
};
|
||||
|
||||
@ -395,7 +399,6 @@ sub update_cron {
|
||||
|
||||
my $fh = IO::File->new("< $CRONJOBS");
|
||||
die "Could not open file $CRONJOBS: $!\n" if !$fh;
|
||||
lock($fh);
|
||||
|
||||
my @test = <$fh>;
|
||||
|
||||
@ -502,43 +505,45 @@ sub vm_exists {
|
||||
sub init {
|
||||
my ($param) = @_;
|
||||
|
||||
my $cfg = read_cron();
|
||||
locked("$CONFIG_PATH/cron_and_state.lock", sub {
|
||||
my $cfg = read_cron();
|
||||
|
||||
my $job = param_to_job($param);
|
||||
my $job = param_to_job($param);
|
||||
|
||||
$job->{state} = "ok";
|
||||
$job->{lsync} = 0;
|
||||
$job->{state} = "ok";
|
||||
$job->{lsync} = 0;
|
||||
|
||||
my $source = parse_target($param->{source});
|
||||
my $dest = parse_target($param->{dest});
|
||||
my $source = parse_target($param->{source});
|
||||
my $dest = parse_target($param->{dest});
|
||||
|
||||
if (my $ip = $dest->{ip}) {
|
||||
run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]);
|
||||
}
|
||||
if (my $ip = $dest->{ip}) {
|
||||
run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{dest_user}\@$ip"]);
|
||||
}
|
||||
|
||||
if (my $ip = $source->{ip}) {
|
||||
run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]);
|
||||
}
|
||||
if (my $ip = $source->{ip}) {
|
||||
run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "$param->{source_user}\@$ip"]);
|
||||
}
|
||||
|
||||
die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user});
|
||||
die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest, $param->{dest_user});
|
||||
|
||||
if (!defined($source->{vmid})) {
|
||||
die "Pool $source->{all} does not exists\n" if !check_pool_exists($source, $param->{source_user});
|
||||
}
|
||||
if (!defined($source->{vmid})) {
|
||||
die "Pool $source->{all} does not exists\n" if !check_pool_exists($source, $param->{source_user});
|
||||
}
|
||||
|
||||
my $vm_type = vm_exists($source, $param->{source_user});
|
||||
$job->{vm_type} = $vm_type;
|
||||
$source->{vm_type} = $vm_type;
|
||||
my $vm_type = vm_exists($source, $param->{source_user});
|
||||
$job->{vm_type} = $vm_type;
|
||||
$source->{vm_type} = $vm_type;
|
||||
|
||||
die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
|
||||
die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;
|
||||
|
||||
die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
|
||||
die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};
|
||||
|
||||
#check if vm has zfs disks if not die;
|
||||
get_disks($source, $param->{source_user}) if $source->{vmid};
|
||||
#check if vm has zfs disks if not die;
|
||||
get_disks($source, $param->{source_user}) if $source->{vmid};
|
||||
|
||||
update_cron($job);
|
||||
update_state($job);
|
||||
update_cron($job);
|
||||
update_state($job);
|
||||
}); #cron and state lock
|
||||
|
||||
eval {
|
||||
sync($param) if !$param->{skip};
|
||||
@ -568,96 +573,92 @@ sub get_job {
|
||||
sub destroy_job {
|
||||
my ($param) = @_;
|
||||
|
||||
my $job = get_job($param);
|
||||
$job->{state} = "del";
|
||||
locked("$CONFIG_PATH/cron_and_state.lock", sub {
|
||||
my $job = get_job($param);
|
||||
$job->{state} = "del";
|
||||
|
||||
update_cron($job);
|
||||
update_state($job);
|
||||
update_cron($job);
|
||||
update_state($job);
|
||||
});
|
||||
}
|
||||
|
||||
sub sync {
|
||||
my ($param) = @_;
|
||||
|
||||
my $lock_fh = IO::File->new("> $LOCKFILE");
|
||||
die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
|
||||
lock($lock_fh);
|
||||
locked("$CONFIG_PATH/sync.lock", sub {
|
||||
|
||||
my $date = get_date();
|
||||
my $job;
|
||||
eval {
|
||||
$job = get_job($param);
|
||||
};
|
||||
my $date = get_date();
|
||||
my $job;
|
||||
eval {
|
||||
$job = get_job($param);
|
||||
};
|
||||
|
||||
if ($job && defined($job->{state}) && $job->{state} eq "syncing") {
|
||||
die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
|
||||
}
|
||||
if ($job && defined($job->{state}) && $job->{state} eq "syncing") {
|
||||
die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
|
||||
}
|
||||
|
||||
my $dest = parse_target($param->{dest});
|
||||
my $source = parse_target($param->{source});
|
||||
my $dest = parse_target($param->{dest});
|
||||
my $source = parse_target($param->{source});
|
||||
|
||||
my $sync_path = sub {
|
||||
my ($source, $dest, $job, $param, $date) = @_;
|
||||
my $sync_path = sub {
|
||||
my ($source, $dest, $job, $param, $date) = @_;
|
||||
|
||||
($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});
|
||||
($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name}, $param->{source_user});
|
||||
|
||||
snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});
|
||||
snapshot_add($source, $dest, $param->{name}, $date, $param->{source_user}, $param->{dest_user});
|
||||
|
||||
send_image($source, $dest, $param);
|
||||
send_image($source, $dest, $param);
|
||||
|
||||
snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $source->{old_snap});
|
||||
snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}, $param->{source_user}, $param->{dest_user}) if ($source->{destroy} && $source->{old_snap});
|
||||
|
||||
};
|
||||
};
|
||||
|
||||
my $vm_type = vm_exists($source, $param->{source_user});
|
||||
$source->{vm_type} = $vm_type;
|
||||
my $vm_type = vm_exists($source, $param->{source_user});
|
||||
$source->{vm_type} = $vm_type;
|
||||
|
||||
if ($job) {
|
||||
$job->{state} = "syncing";
|
||||
$job->{vm_type} = $vm_type if !$job->{vm_type};
|
||||
update_state($job);
|
||||
}
|
||||
if ($job) {
|
||||
$job->{state} = "syncing";
|
||||
$job->{vm_type} = $vm_type if !$job->{vm_type};
|
||||
locked("$CONFIG_PATH/cron_and_state.lock", sub { update_state($job); });
|
||||
}
|
||||
|
||||
eval{
|
||||
if ($source->{vmid}) {
|
||||
die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
|
||||
die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
|
||||
my $disks = get_disks($source, $param->{source_user});
|
||||
eval{
|
||||
if ($source->{vmid}) {
|
||||
die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
|
||||
die "source-user has to be root for syncing VMs\n" if ($param->{source_user} ne "root");
|
||||
my $disks = get_disks($source, $param->{source_user});
|
||||
|
||||
foreach my $disk (sort keys %{$disks}) {
|
||||
$source->{all} = $disks->{$disk}->{all};
|
||||
$source->{pool} = $disks->{$disk}->{pool};
|
||||
$source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
|
||||
$source->{last_part} = $disks->{$disk}->{last_part};
|
||||
foreach my $disk (sort keys %{$disks}) {
|
||||
$source->{all} = $disks->{$disk}->{all};
|
||||
$source->{pool} = $disks->{$disk}->{pool};
|
||||
$source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
|
||||
$source->{last_part} = $disks->{$disk}->{last_part};
|
||||
&$sync_path($source, $dest, $job, $param, $date);
|
||||
}
|
||||
if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
|
||||
send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
|
||||
} else {
|
||||
send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
|
||||
}
|
||||
} else {
|
||||
&$sync_path($source, $dest, $job, $param, $date);
|
||||
}
|
||||
if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
|
||||
send_config($source, $dest,'ssh', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
|
||||
} else {
|
||||
send_config($source, $dest,'local', $param->{source_user}, $param->{dest_user}, $param->{dest_config_path});
|
||||
};
|
||||
if (my $err = $@) {
|
||||
if ($job) {
|
||||
$job->{state} = "error";
|
||||
locked("$CONFIG_PATH/cron_and_state.lock", sub { update_state($job); });
|
||||
print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
|
||||
}
|
||||
} else {
|
||||
&$sync_path($source, $dest, $job, $param, $date);
|
||||
die "$err\n";
|
||||
}
|
||||
};
|
||||
if(my $err = $@) {
|
||||
|
||||
if ($job) {
|
||||
$job->{state} = "error";
|
||||
update_state($job);
|
||||
unlock($lock_fh);
|
||||
close($lock_fh);
|
||||
print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
|
||||
$job->{state} = "ok";
|
||||
$job->{lsync} = $date;
|
||||
locked("$CONFIG_PATH/cron_and_state.lock", sub { update_state($job); });
|
||||
}
|
||||
die "$err\n";
|
||||
}
|
||||
|
||||
if ($job) {
|
||||
$job->{state} = "ok";
|
||||
$job->{lsync} = $date;
|
||||
update_state($job);
|
||||
}
|
||||
|
||||
unlock($lock_fh);
|
||||
close($lock_fh);
|
||||
}); #sync lock
|
||||
}
|
||||
|
||||
sub snapshot_get{
|
||||
@ -1031,19 +1032,23 @@ sub status {
|
||||
sub enable_job {
|
||||
my ($param) = @_;
|
||||
|
||||
my $job = get_job($param);
|
||||
$job->{state} = "ok";
|
||||
update_state($job);
|
||||
update_cron($job);
|
||||
locked("$CONFIG_PATH/cron_and_state.lock", sub {
|
||||
my $job = get_job($param);
|
||||
$job->{state} = "ok";
|
||||
update_state($job);
|
||||
update_cron($job);
|
||||
});
|
||||
}
|
||||
|
||||
sub disable_job {
|
||||
my ($param) = @_;
|
||||
|
||||
my $job = get_job($param);
|
||||
$job->{state} = "stopped";
|
||||
update_state($job);
|
||||
update_cron($job);
|
||||
locked("$CONFIG_PATH/cron_and_state.lock", sub {
|
||||
my $job = get_job($param);
|
||||
$job->{state} = "stopped";
|
||||
update_state($job);
|
||||
update_cron($job);
|
||||
});
|
||||
}
|
||||
|
||||
my $cmd_help = {
|
||||
|
Loading…
x
Reference in New Issue
Block a user