package PVE::Service::pvescheduler;

use strict;
use warnings;

use POSIX qw(WNOHANG);

use PVE::Jobs;
use PVE::SafeSyslog;

use PVE::API2::Replication;

use PVE::Daemon;
use base qw(PVE::Daemon);

my $cmdline = [$0, @ARGV];
my %daemon_options = (stop_wait_time => 180, max_workers => 0);
my $daemon = __PACKAGE__->new('pvescheduler', $cmdline, %daemon_options);

my @JOB_TYPES = qw(replication jobs);

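# Return an array-ref with the PIDs of all currently forked job workers,
# across all job types, or undef if no worker is running.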
my sub running_job_pids : prototype($) {
    my ($self) = @_;

    my $pids = [ map { keys $_->%* } values $self->{jobs}->%* ];
    return scalar($pids->@*) ? $pids : undef;
}

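# Reap finished worker children without blocking and drop them from the
# per-type PID bookkeeping; called from the SIGCHLD handler installed in run().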
my sub finish_jobs : prototype($) {
    my ($self) = @_;
    for my $type (@JOB_TYPES) {
        for my $cpid (keys $self->{jobs}->{$type}->%*) {
            if (my $waitpid = waitpid($cpid, WNOHANG)) {
                delete $self->{jobs}->{$type}->{$cpid} if $waitpid == $cpid || $waitpid == -1;
            }
        }
    }
}

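# On SIGHUP, hand the still-running worker PIDs over to the re-executed daemon
# via the environment so the new instance can adopt (and later reap) them.
# The value is a list of "type:pid" entries, e.g. "replication:12345;jobs:12346;"
# (the PIDs here are purely illustrative).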
sub hup {
    my ($self) = @_;

    my $old_workers = "";
    for my $type (@JOB_TYPES) {
        my $worker = $self->{jobs}->{$type} // next;
        $old_workers .= "$type:$_;" for keys $worker->%*;
    }
    $ENV{"PVE_DAEMON_WORKER_PIDS"} = $old_workers;
    $self->{got_hup_signal} = 1;
}

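# Main daemon loop: adopt workers from a previous run (after a reload), fork one
# worker per job type roughly once a minute, and on shutdown wait for the
# remaining workers to terminate.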
sub run {
    my ($self) = @_;

    my $jobs = {};
    $self->{jobs} = $jobs;

    # modelled after PVE::Daemon's logic, but with the job type added to each PID entry
    if (my $wpids = $ENV{PVE_DAEMON_WORKER_PIDS}) {
        print STDERR "got workers from previous daemon run: $wpids\n"; # FIXME: only log on debug?
        for my $pid (split(';', $wpids)) {
            if ($pid =~ m/^(\w+):(\d+)$/) { # check & untaint
                $self->{jobs}->{$1}->{$2} = 1;
            } else {
                warn "could not parse previous pid entry '$pid', ignoring\n";
            }
        }
    }

    my $old_sig_chld = $SIG{CHLD};
    local $SIG{CHLD} = sub {
        local ($@, $!, $?); # do not overwrite error vars
        finish_jobs($self);
        $old_sig_chld->(@_) if $old_sig_chld;
    };

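    # Fork a worker for the given job type and remember its PID; the parent never
    # waits here, reaping happens asynchronously via the SIGCHLD handler above.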
    my $fork = sub {
        my ($type, $sub) = @_;

        # don't fork again if the previous iteration still runs
        # FIXME: some job types may handle this better themselves or just not care - make configurable
        return if scalar(keys $self->{jobs}->{$type}->%*);

        my $child = fork();
        if (!defined($child)) {
            die "fork failed: $!\n";
        } elsif ($child == 0) {
            $self->after_fork_cleanup();
            eval {
                $sub->();
            };
            if (my $err = $@) {
                syslog('err', "$type: $err");
            }
            POSIX::_exit(0);
        }

        $jobs->{$type}->{$child} = 1;
    };

    my $first_run = 1;

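    # Spawn one worker per job type: replication jobs and the generic job
    # framework (PVE::Jobs) each run in their own child process.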
    my $run_jobs = sub {
        # TODO: actually integrate replication into PVE::Jobs and do not always fork here; we could
        # do the state lookup and check if there's new work scheduled before doing so, e.g., by
        # extending the PVE::Jobs interface:
        # my $scheduled_jobs = PVE::Jobs::get_pending() or return;
        # forked { PVE::Jobs::run_jobs($scheduled_jobs) }

        $fork->('replication', sub {
            PVE::API2::Replication::run_jobs(undef, sub {}, 0, 1);
        });

        $fork->('jobs', sub {
            PVE::Jobs::run_jobs($first_run);
        });

        $first_run = 0;
    };

    PVE::Jobs::setup_dirs();

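    # Main scheduling loop. Starting $count at 1000 makes the minute-boundary
    # alignment branch below run already on the first iteration.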
    for (my $count = 1000;; $count++) {
        return if $self->{got_hup_signal}; # keep workers running, PVE::Daemon re-execs us on return
        last if $self->{shutdown_request}; # exit main-run loop for shutdown

        $run_jobs->();

        my $sleep_time = 60;
        if ($count >= 1000) {
            # Job schedule has minute precision, so try running near the minute boundary.
            my ($current_seconds) = localtime;
            $sleep_time = (60 - $current_seconds) if (60 - $current_seconds >= 5);
            $count = 0;
        }

        my $slept = 0; # SIGCHLD interrupts sleep, so we need to keep track
        while ($slept < $sleep_time) {
            last if $self->{shutdown_request} || $self->{got_hup_signal};
            $slept += sleep($sleep_time - $slept);
            # TODO: check if there's new work to do, e.g., if a job finished
            # that had a longer runtime than its run period
        }
    }

    # NOTE: we only get here on shutdown_request, so we already sent a TERM to all job-types
    my $timeout = 0;
    while (my $pids = running_job_pids($self)) {
        kill 'TERM', $pids->@*; # send TERM to all workers at once, possible thundering herd - FIXME?

        finish_jobs($self);

        # some jobs have a lock timeout of 60s, wait a bit more for graceful termination
        last if $timeout > 75;
        $timeout += sleep(3);
    }

    if (my $pids = running_job_pids($self)) {
        syslog('warn', "unresponsive job-worker, killing now: " . join(', ', $pids->@*));
        kill 'KILL', $pids->@*;
    }
}

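# Shutdown hook (invoked by the PVE::Daemon base class): forward SIGTERM to all
# running job workers and let the main loop in run() exit.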
sub shutdown {
    my ($self) = @_;

    syslog('info', 'got shutdown request, signal running jobs to stop');

    for my $jobs (values $self->{jobs}->%*) {
        kill 'TERM', keys $jobs->%*;
    }
    $self->{shutdown_request} = 1;
}

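# Register the standard daemon commands (start/stop/restart/status); $cmddef
# follows the usual PVE::CLIHandler convention for the pvescheduler CLI entry point.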
$daemon->register_start_command();
$daemon->register_stop_command();
$daemon->register_restart_command(1);
$daemon->register_status_command();

our $cmddef = {
    start => [ __PACKAGE__, 'start', [] ],
    stop => [ __PACKAGE__, 'stop', [] ],
    restart => [ __PACKAGE__, 'restart', [] ],
    status => [ __PACKAGE__, 'status', [], undef, sub { print shift . "\n"; } ],
};

1;