use new replication helpers from pve-guest-common

Dietmar Maurer 2017-06-12 06:59:21 +02:00
parent 7feb60e968
commit 7519e92caa
5 changed files with 19 additions and 125 deletions
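
In short, the helpers that previously lived in pve-manager's PVE::Replication are now consumed from pve-guest-common: job_status() and get_next_job() move to PVE::ReplicationState, delete_job() moves to PVE::ReplicationConfig, and the log helpers (job_logfile_name(), $replicate_logdir) move to PVE::ReplicationState as well. As orientation only (not part of the commit, and assuming the pve-guest-common Perl modules are installed), a minimal sketch of a caller after this change, mirroring the API status handler further down:

use strict;
use warnings;
use PVE::ReplicationState;

# list the local replication jobs together with their runtime state
my $jobs = PVE::ReplicationState::job_status();
foreach my $id (sort keys %$jobs) {
    my $job = $jobs->{$id};
    # job_status() augments each config entry with state, id, vmtype and next_sync
    printf "%s: guest=%s next_sync=%s\n", $id, $job->{guest}, $job->{next_sync};
}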

View File

@@ -7,6 +7,7 @@ use PVE::JSONSchema qw(get_standard_option);
use PVE::RPCEnvironment;
use PVE::ProcFSTools;
use PVE::ReplicationConfig;
use PVE::ReplicationState;
use PVE::Replication;
use PVE::QemuConfig;
use PVE::QemuServer;
@@ -80,7 +81,7 @@ sub run_jobs {
my $code = sub {
my $start_time = $now // time();
while (my $jobcfg = PVE::Replication::get_next_job($iteration, $start_time)) {
while (my $jobcfg = PVE::ReplicationState::get_next_job($iteration, $start_time)) {
my $guest_class = $lookup_guest_class->($jobcfg->{vmtype});
PVE::Replication::run_replication($guest_class, $jobcfg, $iteration, $start_time, $logfunc, 1);
$start_time = $now // time();
@@ -150,7 +151,7 @@ __PACKAGE__->register_method ({
my $rpcenv = PVE::RPCEnvironment::get();
my $authuser = $rpcenv->get_user();
my $jobs = PVE::Replication::job_status();
my $jobs = PVE::ReplicationState::job_status();
my $res = [];
foreach my $id (sort keys %$jobs) {
@@ -223,7 +224,7 @@ __PACKAGE__->register_method ({
my $rpcenv = PVE::RPCEnvironment::get();
my $authuser = $rpcenv->get_user();
my $jobs = PVE::Replication::job_status();
my $jobs = PVE::ReplicationState::job_status();
my $jobid = $param->{id};
my $jobcfg = $jobs->{$jobid};

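For context, a condensed sketch of the run_jobs() flow after the hunk above. It is not part of the commit; $lookup_guest_class and $logfunc are simplified stand-ins for what the API module actually uses:

use strict;
use warnings;
use PVE::ReplicationState;
use PVE::Replication;
use PVE::QemuConfig;
use PVE::LXC::Config;

my $iteration = time();
my $start_time = $iteration;
my $logfunc = sub { print "$_[0]\n"; };

# simplified guest-class lookup; the real helper lives in the API module
my $lookup_guest_class = sub {
    my ($vmtype) = @_;
    return $vmtype eq 'qemu' ? 'PVE::QemuConfig' : 'PVE::LXC::Config';
};

# run every job that is currently due, in the order get_next_job() hands them out
while (my $jobcfg = PVE::ReplicationState::get_next_job($iteration, $start_time)) {
    my $guest_class = $lookup_guest_class->($jobcfg->{vmtype});
    PVE::Replication::run_replication($guest_class, $jobcfg, $iteration, $start_time, $logfunc, 1);
    $start_time = time();
}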
View File

@@ -9,22 +9,12 @@ use Time::HiRes qw(gettimeofday tv_interval);
use PVE::INotify;
use PVE::ProcFSTools;
use PVE::Tools;
use PVE::CalendarEvent;
use PVE::Cluster;
use PVE::AbstractConfig;
use PVE::Storage;
use PVE::GuestHelpers;
use PVE::ReplicationConfig;
use PVE::ReplicationState;
our $replicate_logdir = "/var/log/pve/replicate";
# regression tests should overwrite this
sub job_logfile_name {
my ($jobid) = @_;
return "${replicate_logdir}/$jobid";
}
# regression tests should overwrite this
sub get_log_time {
@@ -32,95 +22,6 @@ sub get_log_time {
return time();
}
sub job_status {
my $local_node = PVE::INotify::nodename();
my $jobs = {};
my $stateobj = PVE::ReplicationState::read_state();
my $cfg = PVE::ReplicationConfig->new();
my $vms = PVE::Cluster::get_vmlist();
foreach my $jobid (sort keys %{$cfg->{ids}}) {
my $jobcfg = $cfg->{ids}->{$jobid};
my $vmid = $jobcfg->{guest};
die "internal error - not implemented" if $jobcfg->{type} ne 'local';
# skip non existing vms
next if !$vms->{ids}->{$vmid};
# only consider guest on local node
next if $vms->{ids}->{$vmid}->{node} ne $local_node;
if (!$jobcfg->{remove_job}) {
# never sync to local node
next if $jobcfg->{target} eq $local_node;
next if $jobcfg->{disable};
}
my $state = PVE::ReplicationState::extract_job_state($stateobj, $jobcfg);
$jobcfg->{state} = $state;
$jobcfg->{id} = $jobid;
$jobcfg->{vmtype} = $vms->{ids}->{$vmid}->{type};
my $next_sync = 0;
if ($jobcfg->{remove_job}) {
$next_sync = 1; # lowest possible value
# todo: consider fail_count? How many retries?
} else {
if (my $fail_count = $state->{fail_count}) {
if ($fail_count < 3) {
$next_sync = $state->{last_try} + 5*60*$fail_count;
}
} else {
my $schedule = $jobcfg->{schedule} || '*/15';
my $calspec = PVE::CalendarEvent::parse_calendar_event($schedule);
$next_sync = PVE::CalendarEvent::compute_next_event($calspec, $state->{last_try}) // 0;
}
}
$jobcfg->{next_sync} = $next_sync;
$jobs->{$jobid} = $jobcfg;
}
return $jobs;
}
sub get_next_job {
my ($iteration, $start_time) = @_;
my $jobs = job_status();
my $sort_func = sub {
my $joba = $jobs->{$a};
my $jobb = $jobs->{$b};
my $sa = $joba->{state};
my $sb = $jobb->{state};
my $res = $sa->{last_iteration} cmp $sb->{last_iteration};
return $res if $res != 0;
$res = $joba->{next_sync} <=> $jobb->{next_sync};
return $res if $res != 0;
return $joba->{guest} <=> $jobb->{guest};
};
foreach my $jobid (sort $sort_func keys %$jobs) {
my $jobcfg = $jobs->{$jobid};
next if $jobcfg->{state}->{last_iteration} >= $iteration;
if ($jobcfg->{next_sync} && ($start_time >= $jobcfg->{next_sync})) {
return $jobcfg;
}
}
return undef;
}
sub remote_prepare_local_job {
my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;
@@ -209,17 +110,6 @@ sub replicate_volume {
$base_snapshot, $sync_snapname);
}
sub delete_job {
my ($jobid) = @_;
my $code = sub {
my $cfg = PVE::ReplicationConfig->new();
delete $cfg->{ids}->{$jobid};
$cfg->write();
};
PVE::ReplicationConfig::lock($code);
}
sub replicate {
my ($guest_class, $jobcfg, $state, $start_time, $logfunc) = @_;
@@ -271,7 +161,7 @@ sub replicate {
# remove all local replication snapshots (lastsync => 0)
prepare($storecfg, $sorted_volids, $jobid, 0, undef, $logfunc);
delete_job($jobid); # update config
PVE::ReplicationConfig::delete_job($jobid); # update config
$logfunc->("job removed");
return;
@@ -403,8 +293,8 @@ my $run_replication_nolock = sub {
PVE::ReplicationState::write_job_state($jobcfg, $state);
mkdir $replicate_logdir;
my $logfile = job_logfile_name($jobid);
mkdir $PVE::ReplicationState::replicate_logdir;
my $logfile = PVE::ReplicationState::job_logfile_name($jobid);
open(my $logfd, '>', $logfile) ||
die "unable to open replication log '$logfile' - $!\n";

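The big chunk removed above is the job scheduling logic, which now lives in pve-guest-common's PVE::ReplicationState. Read in isolation, the rule for when a job becomes due is compact; the following condensed sketch simply mirrors the removed job_status() code and is included here as illustration only:

use strict;
use warnings;
use PVE::CalendarEvent;

# when is a replication job due to run next? (mirrors the removed code above)
sub compute_next_sync {
    my ($jobcfg, $state) = @_;

    # jobs flagged for removal should run as soon as possible
    return 1 if $jobcfg->{remove_job};

    if (my $fail_count = $state->{fail_count}) {
        # back off 5 minutes per failed attempt; after 3 failures stop retrying
        return $fail_count < 3 ? $state->{last_try} + 5*60*$fail_count : 0;
    }

    # otherwise follow the configured calendar event (default: every 15 minutes)
    my $calspec = PVE::CalendarEvent::parse_calendar_event($jobcfg->{schedule} || '*/15');
    return PVE::CalendarEvent::compute_next_event($calspec, $state->{last_try}) // 0;
}

A job is then picked up by get_next_job() once it has not yet run in the current iteration and $start_time has reached its next_sync, as the removed sort-and-select loop above shows.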
View File

@@ -27,7 +27,8 @@ our $mocked_nodename = 'node1';
our $mocked_replication_jobs = {};
my $pve_replicationconfig = Test::MockModule->new('PVE::ReplicationConfig');
my $pve_replication_config_module = Test::MockModule->new('PVE::ReplicationConfig');
my $pve_replication_state_module = Test::MockModule->new('PVE::ReplicationState');
our $mocked_vm_configs = {};
@@ -110,7 +111,7 @@ my $mocked_lxc_load_conf = sub {
my $pve_lxc_config_module = Test::MockModule->new('PVE::LXC::Config');
my $mocked_replication_config = sub {
my $mocked_replication_config_new = sub {
my $res = clone($mocked_replication_jobs);
@@ -203,7 +204,7 @@ my $mocked_get_log_time = sub {
};
sub setup {
$pve_replication_module->mock(job_logfile_name => $mocked_job_logfile_name);
$pve_replication_state_module->mock(job_logfile_name => $mocked_job_logfile_name);
$pve_replication_module->mock(get_log_time => $mocked_get_log_time);
$pve_storage_module->mock(config => sub { return $mocked_storage_config; });
@@ -211,7 +212,7 @@ sub setup {
$pve_storage_module->mock(volume_snapshot => $mocked_volume_snapshot);
$pve_storage_module->mock(volume_snapshot_delete => $mocked_volume_snapshot_delete);
$pve_replicationconfig->mock(new => $mocked_replication_config);
$pve_replication_config_module->mock(new => $mocked_replication_config_new);
$pve_qemuserver_module->mock(check_running => sub { return 0; });
$pve_qemuconfig_module->mock(load_config => $mocked_qemu_load_conf);
@@ -285,7 +286,7 @@ sub track_jobs {
};
if (!$status) {
$status = PVE::Replication::job_status();
$status = PVE::ReplicationState::job_status();
foreach my $jobid (sort keys %$status) {
my $jobcfg = $status->{$jobid};
$logmsg->("$ctime $jobid: new job next_sync => $jobcfg->{next_sync}");
@@ -294,7 +295,7 @@ sub track_jobs {
PVE::API2::Replication::run_jobs($ctime, $logmsg);
my $new = PVE::Replication::job_status();
my $new = PVE::ReplicationState::job_status();
# detect removed jobs
foreach my $jobid (sort keys %$status) {

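The test environment above only has to follow the helpers to their new packages; the mocking mechanism itself is unchanged. For readers unfamiliar with it, a self-contained sketch of the Test::MockModule pattern these tests rely on (the /tmp path is just a placeholder value, not taken from the test code):

use strict;
use warnings;
use Test::MockModule;

# override a single sub in another package for the lifetime of $mock
my $mock = Test::MockModule->new('PVE::ReplicationState');
$mock->mock(job_logfile_name => sub {
    my ($jobid) = @_;
    return "/tmp/replication-test/$jobid.log";   # keep test logs out of /var/log/pve
});

# ... code under test calling PVE::ReplicationState::job_logfile_name() ...

# the original sub is restored when $mock goes out of scope (or via $mock->unmock)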
View File

@@ -75,9 +75,11 @@ my $mocked_delete_job = sub {
delete $ReplicationTestEnv::mocked_replication_jobs->{$jobid};
};
my $pve_replication_config_module = Test::MockModule->new('PVE::ReplicationConfig');
$pve_replication_config_module->mock(delete_job => $mocked_delete_job);
my $pve_replication_module = Test::MockModule->new('PVE::Replication');
$pve_replication_module->mock(
delete_job => $mocked_delete_job,
remote_prepare_local_job => $mocked_remote_prepare_local_job,
remote_finalize_local_job => $mocked_remote_finalize_local_job,
replicate_volume => $mocked_replicate_volume);

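This file and the next one make the same adjustment on the test side: delete_job is now mocked on PVE::ReplicationConfig, matching the call site changed in PVE/Replication.pm above, while the remote_* and replicate_volume mocks stay on PVE::Replication. What the relocated helper is expected to do can be read off the removed local delete_job; a condensed sketch, for illustration only (the job id is a placeholder):

use strict;
use warnings;
use PVE::ReplicationConfig;

my $jobid = '100-0';   # placeholder job id

# drop the job entry from the replication configuration under the config lock,
# mirroring the local delete_job that was removed above
PVE::ReplicationConfig::lock(sub {
    my $cfg = PVE::ReplicationConfig->new();
    delete $cfg->{ids}->{$jobid};
    $cfg->write();
});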
View File

@@ -21,8 +21,8 @@ my $mocked_delete_job = sub {
delete $ReplicationTestEnv::mocked_replication_jobs->{$jobid};
};
my $pve_replication_module = Test::MockModule->new('PVE::Replication');
$pve_replication_module->mock(
my $pve_replication_config_module = Test::MockModule->new('PVE::ReplicationConfig');
$pve_replication_config_module->mock(
delete_job => $mocked_delete_job);
my $testjob = {