mirror of
https://github.com/systemd/systemd-stable.git
synced 2025-01-26 10:03:40 +03:00
core: change BoundBy= dependency handling to be processed by a deferred work queue
So far StopWhenUnneeded= handling and UpheldBy= handling was already processed by a queue that is dispatched in a deferred mode of operation instead of instantly. This changes BoundBy= handling to be processed the same way. This should ensure that all *event*-to-job propagation is done directly from unit_notify(), while all *state*-to-job propagation is done from a deferred work queue, quite systematically. The work queue is submitted to by unit_notify() too. Key really is the difference between event and state: some jobs shall be queued one-time on events (think: OnFailure= + OnSuccess= and similar), others shall be queued continuously when a specific state is in effect (think: UpheldBy=). The latter cases are usually effect of the combination of states of a few units (e.g. StopWhenUnneeded= checks wether any of the Wants=/Requires=/… deps are still up before acting), and hence it makes sense to trigger them to be run after an individual unit's state changed, but process them on a queue that runs whenever there's nothing else to do that ensures the decision on them is only taken after all jobs/queued IO events are dispatched, and things settled, so that it makes sense to come to a combined conclusion. If we'd dispatch this work immediately inside of unit_notify() we'd always act instantly, even though another event from another unit that is already queued might make the work unnecessary or invalid. This is mostly a commit to make things philosophically clean. It does not add features, but it should make corner cases more robust.
This commit is contained in:
parent
116654d2cf
commit
56c5959202
@ -1347,6 +1347,44 @@ static unsigned manager_dispatch_start_when_upheld_queue(Manager *m) {
|
||||
return n;
|
||||
}
|
||||
|
||||
static unsigned manager_dispatch_stop_when_bound_queue(Manager *m) {
|
||||
unsigned n = 0;
|
||||
Unit *u;
|
||||
int r;
|
||||
|
||||
assert(m);
|
||||
|
||||
while ((u = m->stop_when_bound_queue)) {
|
||||
_cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL;
|
||||
Unit *culprit = NULL;
|
||||
|
||||
assert(u->in_stop_when_bound_queue);
|
||||
LIST_REMOVE(stop_when_bound_queue, m->stop_when_bound_queue, u);
|
||||
u->in_stop_when_bound_queue = false;
|
||||
|
||||
n++;
|
||||
|
||||
if (!unit_is_bound_by_inactive(u, &culprit))
|
||||
continue;
|
||||
|
||||
log_unit_debug(u, "Unit is stopped because bound to inactive unit %s.", culprit->id);
|
||||
|
||||
/* If stopping a unit fails continuously we might enter a stop loop here, hence stop acting on the
|
||||
* service being unnecessary after a while. */
|
||||
|
||||
if (!ratelimit_below(&u->auto_start_stop_ratelimit)) {
|
||||
log_unit_warning(u, "Unit needs to be stopped because it is bound to inactive unit %s it, but not stopping since we tried this too often recently.", culprit->id);
|
||||
continue;
|
||||
}
|
||||
|
||||
r = manager_add_job(u->manager, JOB_STOP, u, JOB_REPLACE, NULL, &error, NULL);
|
||||
if (r < 0)
|
||||
log_unit_warning_errno(u, r, "Failed to enqueue stop job, ignoring: %s", bus_error_message(&error, r));
|
||||
}
|
||||
|
||||
return n;
|
||||
}
|
||||
|
||||
static void manager_clear_jobs_and_units(Manager *m) {
|
||||
Unit *u;
|
||||
|
||||
@ -1366,6 +1404,7 @@ static void manager_clear_jobs_and_units(Manager *m) {
|
||||
assert(!m->gc_job_queue);
|
||||
assert(!m->stop_when_unneeded_queue);
|
||||
assert(!m->start_when_upheld_queue);
|
||||
assert(!m->stop_when_bound_queue);
|
||||
|
||||
assert(hashmap_isempty(m->jobs));
|
||||
assert(hashmap_isempty(m->units));
|
||||
@ -2996,6 +3035,9 @@ int manager_loop(Manager *m) {
|
||||
if (manager_dispatch_start_when_upheld_queue(m) > 0)
|
||||
continue;
|
||||
|
||||
if (manager_dispatch_stop_when_bound_queue(m) > 0)
|
||||
continue;
|
||||
|
||||
if (manager_dispatch_stop_when_unneeded_queue(m) > 0)
|
||||
continue;
|
||||
|
||||
|
@ -190,6 +190,9 @@ struct Manager {
|
||||
/* Units which are upheld by another other which we might need to act on */
|
||||
LIST_HEAD(Unit, start_when_upheld_queue);
|
||||
|
||||
/* Units that have BindsTo= another unit, and might need to be shutdown because the bound unit is not active. */
|
||||
LIST_HEAD(Unit, stop_when_bound_queue);
|
||||
|
||||
sd_event *event;
|
||||
|
||||
/* This maps PIDs we care about to units that are interested in. We allow multiple units to he interested in
|
||||
|
@ -62,6 +62,7 @@ static const UnitDependencyAtom atom_map[_UNIT_DEPENDENCY_MAX] = {
|
||||
UNIT_ATOM_PROPAGATE_RESTART |
|
||||
UNIT_ATOM_PROPAGATE_START_FAILURE |
|
||||
UNIT_ATOM_PINS_STOP_WHEN_UNNEEDED |
|
||||
UNIT_ATOM_ADD_CANNOT_BE_ACTIVE_WITHOUT_QUEUE |
|
||||
UNIT_ATOM_DEFAULT_TARGET_DEPENDENCIES,
|
||||
|
||||
[UNIT_UPHELD_BY] = UNIT_ATOM_START_STEADILY |
|
||||
@ -172,7 +173,9 @@ UnitDependency unit_dependency_from_unique_atom(UnitDependencyAtom atom) {
|
||||
UNIT_ATOM_PROPAGATE_RESTART |
|
||||
UNIT_ATOM_PROPAGATE_START_FAILURE |
|
||||
UNIT_ATOM_PINS_STOP_WHEN_UNNEEDED |
|
||||
UNIT_ATOM_ADD_CANNOT_BE_ACTIVE_WITHOUT_QUEUE |
|
||||
UNIT_ATOM_DEFAULT_TARGET_DEPENDENCIES:
|
||||
case UNIT_ATOM_ADD_CANNOT_BE_ACTIVE_WITHOUT_QUEUE:
|
||||
return UNIT_BOUND_BY;
|
||||
|
||||
case UNIT_ATOM_START_STEADILY |
|
||||
|
@ -31,6 +31,8 @@ typedef enum UnitDependencyAtom {
|
||||
|
||||
/* Stop our unit if the other unit happens to inactive */
|
||||
UNIT_ATOM_CANNOT_BE_ACTIVE_WITHOUT = UINT64_C(1) << 7,
|
||||
/* If our unit enters inactive state, add the other unit to the BoundBy= queue */
|
||||
UNIT_ATOM_ADD_CANNOT_BE_ACTIVE_WITHOUT_QUEUE = UINT64_C(1) << 8,
|
||||
|
||||
/* Start this unit whenever we find it inactive and the other unit active */
|
||||
UNIT_ATOM_START_STEADILY = UINT64_C(1) << 9,
|
||||
|
106
src/core/unit.c
106
src/core/unit.c
@ -523,6 +523,22 @@ void unit_submit_to_start_when_upheld_queue(Unit *u) {
|
||||
u->in_start_when_upheld_queue = true;
|
||||
}
|
||||
|
||||
void unit_submit_to_stop_when_bound_queue(Unit *u) {
|
||||
assert(u);
|
||||
|
||||
if (u->in_stop_when_bound_queue)
|
||||
return;
|
||||
|
||||
if (!UNIT_IS_ACTIVE_OR_RELOADING(unit_active_state(u)))
|
||||
return;
|
||||
|
||||
if (!unit_has_dependency(u, UNIT_ATOM_CANNOT_BE_ACTIVE_WITHOUT, NULL))
|
||||
return;
|
||||
|
||||
LIST_PREPEND(stop_when_bound_queue, u->manager->stop_when_bound_queue, u);
|
||||
u->in_stop_when_bound_queue = true;
|
||||
}
|
||||
|
||||
static void unit_clear_dependencies(Unit *u) {
|
||||
assert(u);
|
||||
|
||||
@ -738,6 +754,9 @@ Unit* unit_free(Unit *u) {
|
||||
if (u->in_start_when_upheld_queue)
|
||||
LIST_REMOVE(start_when_upheld_queue, u->manager->start_when_upheld_queue, u);
|
||||
|
||||
if (u->in_stop_when_bound_queue)
|
||||
LIST_REMOVE(stop_when_bound_queue, u->manager->stop_when_bound_queue, u);
|
||||
|
||||
safe_close(u->ip_accounting_ingress_map_fd);
|
||||
safe_close(u->ip_accounting_egress_map_fd);
|
||||
|
||||
@ -2060,6 +2079,38 @@ bool unit_is_upheld_by_active(Unit *u, Unit **ret_culprit) {
|
||||
return false;
|
||||
}
|
||||
|
||||
bool unit_is_bound_by_inactive(Unit *u, Unit **ret_culprit) {
|
||||
Unit *other;
|
||||
|
||||
assert(u);
|
||||
|
||||
/* Checks whether this unit is bound to another unit that is inactive, i.e. whether we should stop
|
||||
* because the other unit is down.*/
|
||||
|
||||
if (unit_active_state(u) != UNIT_ACTIVE || u->job) {
|
||||
/* Don't clean up while the unit is transitioning or is even inactive. */
|
||||
if (ret_culprit)
|
||||
*ret_culprit = NULL;
|
||||
return false;
|
||||
}
|
||||
|
||||
UNIT_FOREACH_DEPENDENCY(other, u, UNIT_ATOM_CANNOT_BE_ACTIVE_WITHOUT) {
|
||||
if (other->job)
|
||||
continue;
|
||||
|
||||
if (UNIT_IS_INACTIVE_OR_FAILED(unit_active_state(other))) {
|
||||
if (*ret_culprit)
|
||||
*ret_culprit = other;
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
if (ret_culprit)
|
||||
*ret_culprit = NULL;
|
||||
return false;
|
||||
}
|
||||
|
||||
static void check_unneeded_dependencies(Unit *u) {
|
||||
Unit *other;
|
||||
assert(u);
|
||||
@ -2080,53 +2131,14 @@ static void check_uphold_dependencies(Unit *u) {
|
||||
unit_submit_to_start_when_upheld_queue(other);
|
||||
}
|
||||
|
||||
static void unit_check_binds_to(Unit *u) {
|
||||
_cleanup_(sd_bus_error_free) sd_bus_error error = SD_BUS_ERROR_NULL;
|
||||
bool stop = false;
|
||||
static void check_bound_by_dependencies(Unit *u) {
|
||||
Unit *other;
|
||||
int r;
|
||||
|
||||
assert(u);
|
||||
|
||||
if (u->job)
|
||||
return;
|
||||
/* Add all units this unit depends on to the queue that processes BindsTo= stop behaviour. */
|
||||
|
||||
if (unit_active_state(u) != UNIT_ACTIVE)
|
||||
return;
|
||||
|
||||
UNIT_FOREACH_DEPENDENCY(other, u, UNIT_ATOM_CANNOT_BE_ACTIVE_WITHOUT) {
|
||||
if (other->job)
|
||||
continue;
|
||||
|
||||
if (!other->coldplugged)
|
||||
/* We might yet create a job for the other unit… */
|
||||
continue;
|
||||
|
||||
if (!UNIT_IS_INACTIVE_OR_FAILED(unit_active_state(other)))
|
||||
continue;
|
||||
|
||||
stop = true;
|
||||
break;
|
||||
}
|
||||
|
||||
if (!stop)
|
||||
return;
|
||||
|
||||
/* If stopping a unit fails continuously we might enter a stop
|
||||
* loop here, hence stop acting on the service being
|
||||
* unnecessary after a while. */
|
||||
if (!ratelimit_below(&u->auto_start_stop_ratelimit)) {
|
||||
log_unit_warning(u, "Unit is bound to inactive unit %s, but not stopping since we tried this too often recently.", other->id);
|
||||
return;
|
||||
}
|
||||
|
||||
assert(other);
|
||||
log_unit_info(u, "Unit is bound to inactive unit %s. Stopping, too.", other->id);
|
||||
|
||||
/* A unit we need to run is gone. Sniff. Let's stop this. */
|
||||
r = manager_add_job(u->manager, JOB_STOP, u, JOB_FAIL, NULL, &error, NULL);
|
||||
if (r < 0)
|
||||
log_unit_warning_errno(u, r, "Failed to enqueue stop job, ignoring: %s", bus_error_message(&error, r));
|
||||
UNIT_FOREACH_DEPENDENCY(other, u, UNIT_ATOM_ADD_CANNOT_BE_ACTIVE_WITHOUT_QUEUE)
|
||||
unit_submit_to_stop_when_bound_queue(other);
|
||||
}
|
||||
|
||||
static void retroactively_start_dependencies(Unit *u) {
|
||||
@ -2654,9 +2666,11 @@ void unit_notify(Unit *u, UnitActiveState os, UnitActiveState ns, UnitNotifyFlag
|
||||
retroactively_stop_dependencies(u);
|
||||
}
|
||||
|
||||
/* Stop unneeded units regardless if going down was expected or not */
|
||||
if (UNIT_IS_INACTIVE_OR_FAILED(ns))
|
||||
/* Stop unneeded units and bound-by units regardless if going down was expected or not */
|
||||
if (UNIT_IS_INACTIVE_OR_FAILED(ns)) {
|
||||
check_unneeded_dependencies(u);
|
||||
check_bound_by_dependencies(u);
|
||||
}
|
||||
|
||||
/* Start uphold units regardless if going up was expected or not */
|
||||
if (UNIT_IS_ACTIVE_OR_RELOADING(ns))
|
||||
@ -2703,7 +2717,7 @@ void unit_notify(Unit *u, UnitActiveState os, UnitActiveState ns, UnitNotifyFlag
|
||||
/* Maybe we finished startup, but something we needed has vanished? Let's die then. (This happens when
|
||||
* something BindsTo= to a Type=oneshot unit, as these units go directly from starting to inactive,
|
||||
* without ever entering started.) */
|
||||
unit_check_binds_to(u);
|
||||
unit_submit_to_stop_when_bound_queue(u);
|
||||
|
||||
if (os != UNIT_FAILED && ns == UNIT_FAILED) {
|
||||
reason = strjoina("unit ", u->id, " failed");
|
||||
|
@ -235,6 +235,9 @@ typedef struct Unit {
|
||||
/* Queue of units that have an Uphold= dependency from some other unit, and should be checked for starting */
|
||||
LIST_FIELDS(Unit, start_when_upheld_queue);
|
||||
|
||||
/* Queue of units that have a BindTo= dependency on some other unit, and should possibly be shut down */
|
||||
LIST_FIELDS(Unit, stop_when_bound_queue);
|
||||
|
||||
/* PIDs we keep an eye on. Note that a unit might have many
|
||||
* more, but these are the ones we care enough about to
|
||||
* process SIGCHLD for */
|
||||
@ -387,6 +390,7 @@ typedef struct Unit {
|
||||
bool in_target_deps_queue:1;
|
||||
bool in_stop_when_unneeded_queue:1;
|
||||
bool in_start_when_upheld_queue:1;
|
||||
bool in_stop_when_bound_queue:1;
|
||||
|
||||
bool sent_dbus_new_signal:1;
|
||||
|
||||
@ -745,6 +749,7 @@ void unit_add_to_gc_queue(Unit *u);
|
||||
void unit_add_to_target_deps_queue(Unit *u);
|
||||
void unit_submit_to_stop_when_unneeded_queue(Unit *u);
|
||||
void unit_submit_to_start_when_upheld_queue(Unit *u);
|
||||
void unit_submit_to_stop_when_bound_queue(Unit *u);
|
||||
|
||||
int unit_merge(Unit *u, Unit *other);
|
||||
int unit_merge_by_name(Unit *u, const char *other);
|
||||
@ -875,6 +880,7 @@ bool unit_is_pristine(Unit *u);
|
||||
|
||||
bool unit_is_unneeded(Unit *u);
|
||||
bool unit_is_upheld_by_active(Unit *u, Unit **ret_culprit);
|
||||
bool unit_is_bound_by_inactive(Unit *u, Unit **ret_culprit);
|
||||
|
||||
pid_t unit_control_pid(Unit *u);
|
||||
pid_t unit_main_pid(Unit *u);
|
||||
|
Loading…
x
Reference in New Issue
Block a user