replication: add replication log files

This commit is contained in:
Dietmar Maurer 2017-06-07 14:24:09 +02:00
parent b94223b028
commit bb5cb56104
2 changed files with 94 additions and 39 deletions

View File

@ -22,6 +22,20 @@ use PVE::ReplicationConfig;
use PVE::ReplicationState;
our $pvesr_lock_path = "/var/lock/pvesr.lck";
our $replicate_logdir = "/var/log/pve/replicate";
# regression tests should overwrite this
sub job_logfile_name {
my ($jobid) = @_;
return "${replicate_logdir}/$jobid";
}
# regression tests should overwrite this
sub get_log_time {
return time();
}
sub job_status {
@ -113,7 +127,7 @@ my $get_next_job = sub {
};
sub remote_prepare_local_job {
my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force) = @_;
my ($ssh_info, $jobid, $vmid, $volumes, $storeid_list, $last_sync, $parent_snapname, $force, $logfunc) = @_;
my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);
my $cmd = [@$ssh_cmd, '--', 'pvesr', 'prepare-local-job', $jobid];
@ -131,7 +145,13 @@ sub remote_prepare_local_job {
$remote_snapshots = JSON::decode_json($line);
};
PVE::Tools::run_command($cmd, outfunc => $parser);
my $logger = sub {
my $line = shift;
chomp $line;
$logfunc->("(remote_prepare_local_job) $line");
};
PVE::Tools::run_command($cmd, outfunc => $parser, errfunc => $logger);
die "prepare remote node failed - no result\n"
if !defined($remote_snapshots);
@ -140,13 +160,19 @@ sub remote_prepare_local_job {
}
sub remote_finalize_local_job {
my ($ssh_info, $jobid, $vmid, $volumes, $last_sync) = @_;
my ($ssh_info, $jobid, $vmid, $volumes, $last_sync, $logfunc) = @_;
my $ssh_cmd = PVE::Cluster::ssh_info_to_command($ssh_info);
my $cmd = [@$ssh_cmd, '--', 'pvesr', 'finalize-local-job', $jobid,
@$volumes, '--last_sync', $last_sync];
PVE::Tools::run_command($cmd);
my $logger = sub {
my $line = shift;
chomp $line;
$logfunc->("(remote_finalize_local_job) $line");
};
PVE::Tools::run_command($cmd, outfunc => $logger, errfunc => $logger);
}
# finds local replication snapshots from $last_sync
@ -167,7 +193,7 @@ sub prepare {
if ($snap eq $snapname || (defined($parent_snapname) && ($snap eq $parent_snapname))) {
$last_snapshots->{$volid}->{$snap} = 1;
} elsif ($snap =~ m/^\Q$prefix\E/) {
$logfunc->("$jobid: delete stale replication snapshot '$snap' on $volid");
$logfunc->("delete stale replication snapshot '$snap' on $volid");
PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snap);
}
}
@ -201,8 +227,6 @@ sub delete_job {
sub replicate {
my ($jobcfg, $state, $start_time, $logfunc) = @_;
$logfunc = sub {} if !$logfunc; # log nothing by default
my $local_node = PVE::INotify::nodename();
die "not implemented - internal error" if $jobcfg->{type} ne 'local';
@ -247,24 +271,24 @@ sub replicate {
my $sorted_volids = [ sort keys %$volumes ];
$logfunc->("$jobid: guest => $vmid, type => $vmtype, running => $running");
$logfunc->("$jobid: volumes => " . join(',', @$sorted_volids));
$logfunc->("guest => $vmid, type => $vmtype, running => $running");
$logfunc->("volumes => " . join(',', @$sorted_volids));
if (my $remove_job = $jobcfg->{remove_job}) {
$logfunc->("$jobid: start job removal - mode '${remove_job}'");
$logfunc->("start job removal - mode '${remove_job}'");
if ($remove_job eq 'full' && $jobcfg->{target} ne $local_node) {
# remove all remote volumes
my $ssh_info = PVE::Cluster::get_ssh_info($jobcfg->{target});
remote_prepare_local_job($ssh_info, $jobid, $vmid, [], $state->{storeid_list}, 0, undef, 1);
remote_prepare_local_job($ssh_info, $jobid, $vmid, [], $state->{storeid_list}, 0, undef, 1, $logfunc);
}
# remove all local replication snapshots (lastsync => 0)
prepare($storecfg, $sorted_volids, $jobid, 0, undef, $logfunc);
delete_job($jobid); # update config
$logfunc->("$jobid: job removed");
$logfunc->("job removed");
return;
}
@ -286,7 +310,7 @@ sub replicate {
# prepare remote side
my $remote_snapshots = remote_prepare_local_job(
$ssh_info, $jobid, $vmid, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, 0);
$ssh_info, $jobid, $vmid, $sorted_volids, $state->{storeid_list}, $last_sync, $parent_snapname, 0, $logfunc);
my $storeid_hash = {};
foreach my $volid (@$sorted_volids) {
@ -297,7 +321,7 @@ sub replicate {
# freeze filesystem for data consistency
if ($qga) {
$logfunc->("$jobid: freeze guest filesystem");
$logfunc->("freeze guest filesystem");
PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-freeze");
}
@ -305,7 +329,7 @@ sub replicate {
my $replicate_snapshots = {};
eval {
foreach my $volid (@$sorted_volids) {
$logfunc->("$jobid: create snapshot '${sync_snapname}' on $volid");
$logfunc->("create snapshot '${sync_snapname}' on $volid");
PVE::Storage::volume_snapshot($storecfg, $volid, $sync_snapname);
$replicate_snapshots->{$volid} = 1;
}
@ -314,7 +338,7 @@ sub replicate {
# unfreeze immediately
if ($qga) {
$logfunc->("$jobid: unfreeze guest filesystem");
$logfunc->("unfreeze guest filesystem");
eval { PVE::QemuServer::vm_mon_cmd($vmid, "guest-fsfreeze-thaw"); };
warn $@ if $@; # ignore errors here, because we cannot fix it anyways
}
@ -322,7 +346,7 @@ sub replicate {
my $cleanup_local_snapshots = sub {
my ($volid_hash, $snapname) = @_;
foreach my $volid (sort keys %$volid_hash) {
$logfunc->("$jobid: delete previous replication snapshot '$snapname' on $volid");
$logfunc->("delete previous replication snapshot '$snapname' on $volid");
eval { PVE::Storage::volume_snapshot_delete($storecfg, $volid, $snapname, $running); };
warn $@ if $@;
}
@ -344,17 +368,17 @@ sub replicate {
if (defined($last_snapshots->{$volid}) && defined($remote_snapshots->{$volid})) {
if ($last_snapshots->{$volid}->{$last_sync_snapname} &&
$remote_snapshots->{$volid}->{$last_sync_snapname}) {
$logfunc->("$jobid: incremental sync '$volid' ($last_sync_snapname => $sync_snapname)");
$logfunc->("incremental sync '$volid' ($last_sync_snapname => $sync_snapname)");
$base_snapname = $last_sync_snapname;
} elsif (defined($parent_snapname) &&
($last_snapshots->{$volid}->{$parent_snapname} &&
$remote_snapshots->{$volid}->{$parent_snapname})) {
$logfunc->("$jobid: incremental sync '$volid' ($parent_snapname => $sync_snapname)");
$logfunc->("incremental sync '$volid' ($parent_snapname => $sync_snapname)");
$base_snapname = $parent_snapname;
}
}
$logfunc->("$jobid: full sync '$volid' ($sync_snapname)") if !defined($base_snapname);
$logfunc->("full sync '$volid' ($sync_snapname)") if !defined($base_snapname);
replicate_volume($ssh_info, $storecfg, $volid, $base_snapname, $sync_snapname, $rate, $insecure);
}
};
@ -370,7 +394,7 @@ sub replicate {
# remove old snapshots because they are no longer needed
$cleanup_local_snapshots->($last_snapshots, $last_sync_snapname);
remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time);
remote_finalize_local_job($ssh_info, $jobid, $vmid, $sorted_volids, $start_time, $logfunc);
die $err if $err;
}
@ -378,6 +402,8 @@ sub replicate {
my $run_replication_nolock = sub {
my ($jobcfg, $iteration, $start_time, $logfunc) = @_;
my $jobid = $jobcfg->{id};
# we normaly write errors into the state file,
# but we also catch unexpected errors and log them to syslog
# (for examply when there are problems writing the state file)
@ -395,10 +421,23 @@ my $run_replication_nolock = sub {
PVE::ReplicationState::write_job_state($jobcfg, $state);
$logfunc->("$jobcfg->{id}: start replication job") if $logfunc;
mkdir $replicate_logdir;
my $logfile = job_logfile_name($jobid);
open(my $logfd, '>', $logfile) ||
die "unable to open replication log '$logfile' - $!\n";
my $logfunc_wrapper = sub {
my ($msg) = @_;
my $ctime = get_log_time();
print $logfd "$ctime $jobid: $msg\n";
$logfunc->("$ctime $jobid: $msg") if $logfunc;
};
$logfunc_wrapper->("start replication job");
eval {
replicate($jobcfg, $state, $start_time, $logfunc);
replicate($jobcfg, $state, $start_time, $logfunc_wrapper);
};
my $err = $@;
@ -411,22 +450,19 @@ my $run_replication_nolock = sub {
$state->{fail_count}++;
$state->{error} = "$err";
PVE::ReplicationState::write_job_state($jobcfg, $state);
my $msg = "$jobcfg->{id}: end replication job with error: $err";
if ($logfunc) {
$logfunc->($msg);
} else {
warn "$msg\n";
}
$logfunc_wrapper->("end replication job with error: $err");
} else {
$logfunc->("$jobcfg->{id}: end replication job") if $logfunc;
$logfunc_wrapper->("end replication job");
$state->{last_sync} = $start_time;
$state->{fail_count} = 0;
delete $state->{error};
PVE::ReplicationState::write_job_state($jobcfg, $state);
}
close($logfd);
};
if (my $err = $@) {
warn "$jobcfg->{id}: got unexpected error - $err";
warn "$jobid: got unexpected replication job error - $err";
}
};

View File

@ -179,7 +179,24 @@ my $mocked_volume_snapshot_delete = sub {
delete $d->{$snap} || die "no such snapshot '$snap' on '$volid'\n";
};
my $pve_replication_module = Test::MockModule->new('PVE::Replication');
my $mocked_job_logfile_name = sub {
my ($jobid) = @_;
return ".mocked_replication_log_$jobid";
};
my $mocked_log_time = 0;
my $mocked_get_log_time = sub {
return $mocked_log_time;
};
sub setup {
$pve_replication_module->mock(job_logfile_name => $mocked_job_logfile_name);
$pve_replication_module->mock(get_log_time => $mocked_get_log_time);
$pve_storage_module->mock(config => sub { return $mocked_storage_config; });
$pve_storage_module->mock(volume_snapshot_list => $mocked_volume_snapshot_list);
$pve_storage_module->mock(volume_snapshot => $mocked_volume_snapshot);
@ -249,18 +266,20 @@ my $status;
sub track_jobs {
my ($ctime) = @_;
$mocked_log_time = $ctime;
my $logmsg = sub {
my ($msg) = @_;
print "$ctime $msg\n";
print $logfh "$ctime $msg\n";
print "$msg\n";
print $logfh "$msg\n";
};
if (!$status) {
$status = PVE::Replication::job_status();
foreach my $jobid (sort keys %$status) {
my $jobcfg = $status->{$jobid};
$logmsg->("$jobid: new job next_sync => $jobcfg->{next_sync}");
$logmsg->("$ctime $jobid: new job next_sync => $jobcfg->{next_sync}");
}
}
@ -271,7 +290,7 @@ sub track_jobs {
# detect removed jobs
foreach my $jobid (sort keys %$status) {
if (!$new->{$jobid}) {
$logmsg->("$jobid: vanished job");
$logmsg->("$ctime $jobid: vanished job");
}
}
@ -279,7 +298,7 @@ sub track_jobs {
my $jobcfg = $new->{$jobid};
my $oldcfg = $status->{$jobid};
if (!$oldcfg) {
$logmsg->("$jobid: new job next_sync => $jobcfg->{next_sync}");
$logmsg->("$ctime $jobid: new job next_sync => $jobcfg->{next_sync}");
next; # no old state to compare
} else {
foreach my $k (qw(target guest vmtype next_sync)) {
@ -288,7 +307,7 @@ sub track_jobs {
$changes .= ', ' if $changes;
$changes .= "$k => $jobcfg->{$k}";
}
$logmsg->("$jobid: changed config $changes") if $changes;
$logmsg->("$ctime $jobid: changed config $changes") if $changes;
}
}
@ -305,7 +324,7 @@ sub track_jobs {
$changes .= "$k => $value";
}
}
$logmsg->("$jobid: changed state $changes") if $changes;
$logmsg->("$ctime $jobid: changed state $changes") if $changes;
my $old_storeid_list = $oldstate->{storeid_list};
my $storeid_list = $state->{storeid_list};
@ -321,7 +340,7 @@ sub track_jobs {
$storeid_list_changes = 1;
}
$logmsg->("$jobid: changed storeid list " . join(',', @$storeid_list))
$logmsg->("$ctime $jobid: changed storeid list " . join(',', @$storeid_list))
if $storeid_list_changes;
}
$status = $new;