diff --git a/src/libsystemd/sd-event/sd-event.c b/src/libsystemd/sd-event/sd-event.c index a35878cd8f4..8f74b141015 100644 --- a/src/libsystemd/sd-event/sd-event.c +++ b/src/libsystemd/sd-event/sd-event.c @@ -1039,7 +1039,7 @@ static int source_set_pending(sd_event_source *s, bool b) { } } - return 0; + return 1; } static sd_event_source *source_new(sd_event *e, bool floating, EventSourceType type) { @@ -3116,11 +3116,19 @@ static int process_timer( return 0; } -static int process_child(sd_event *e) { +static int process_child(sd_event *e, int64_t threshold, int64_t *ret_min_priority) { + int64_t min_priority = threshold; + bool something_new = false; sd_event_source *s; int r; assert(e); + assert(ret_min_priority); + + if (!e->need_process_child) { + *ret_min_priority = min_priority; + return 0; + } e->need_process_child = false; @@ -3145,6 +3153,9 @@ static int process_child(sd_event *e) { HASHMAP_FOREACH(s, e->child_sources) { assert(s->type == SOURCE_CHILD); + if (s->priority > threshold) + continue; + if (s->pending) continue; @@ -3181,10 +3192,15 @@ static int process_child(sd_event *e) { r = source_set_pending(s, true); if (r < 0) return r; + if (r > 0) { + something_new = true; + min_priority = MIN(min_priority, s->priority); + } } } - return 0; + *ret_min_priority = min_priority; + return something_new; } static int process_pidfd(sd_event *e, sd_event_source *s, uint32_t revents) { @@ -3214,13 +3230,13 @@ static int process_pidfd(sd_event *e, sd_event_source *s, uint32_t revents) { return source_set_pending(s, true); } -static int process_signal(sd_event *e, struct signal_data *d, uint32_t events) { - bool read_one = false; +static int process_signal(sd_event *e, struct signal_data *d, uint32_t events, int64_t *min_priority) { int r; assert(e); assert(d); assert_return(events == EPOLLIN, -EIO); + assert(min_priority); /* If there's a signal queued on this priority and SIGCHLD is on this priority too, then make sure to recheck the @@ -3246,7 +3262,7 @@ 
static int process_signal(sd_event *e, struct signal_data *d, uint32_t events) { n = read(d->fd, &si, sizeof(si)); if (n < 0) { if (IN_SET(errno, EAGAIN, EINTR)) - return read_one; + return 0; return -errno; } @@ -3256,8 +3272,6 @@ static int process_signal(sd_event *e, struct signal_data *d, uint32_t events) { assert(SIGNAL_VALID(si.ssi_signo)); - read_one = true; - if (e->signal_sources) s = e->signal_sources[si.ssi_signo]; if (!s) @@ -3271,12 +3285,16 @@ static int process_signal(sd_event *e, struct signal_data *d, uint32_t events) { r = source_set_pending(s, true); if (r < 0) return r; + if (r > 0 && *min_priority >= s->priority) { + *min_priority = s->priority; + return 1; /* an event source with smaller priority is queued. */ + } - return 1; + return 0; } } -static int event_inotify_data_read(sd_event *e, struct inotify_data *d, uint32_t revents) { +static int event_inotify_data_read(sd_event *e, struct inotify_data *d, uint32_t revents, int64_t threshold) { ssize_t n; assert(e); @@ -3292,6 +3310,9 @@ static int event_inotify_data_read(sd_event *e, struct inotify_data *d, uint32_t if (d->buffer_filled > 0) return 0; + if (d->priority > threshold) + return 0; + n = read(d->fd, &d->buffer, sizeof(d->buffer)); if (n < 0) { if (IN_SET(errno, EAGAIN, EINTR)) @@ -3831,20 +3852,14 @@ static int epoll_wait_usec( return r; } -_public_ int sd_event_wait(sd_event *e, uint64_t timeout) { +static int process_epoll(sd_event *e, usec_t timeout, int64_t threshold, int64_t *ret_min_priority) { + int64_t min_priority = threshold; + bool something_new = false; size_t n_event_queue, m; int r; - assert_return(e, -EINVAL); - assert_return(e = event_resolve(e), -ENOPKG); - assert_return(!event_pid_changed(e), -ECHILD); - assert_return(e->state != SD_EVENT_FINISHED, -ESTALE); - assert_return(e->state == SD_EVENT_ARMED, -EBUSY); - - if (e->exit_requested) { - e->state = SD_EVENT_PENDING; - return 1; - } + assert(e); + assert(ret_min_priority); n_event_queue = MAX(e->n_sources, 1u); 
if (!GREEDY_REALLOC(e->event_queue, e->event_queue_allocated, n_event_queue)) @@ -3856,12 +3871,8 @@ _public_ int sd_event_wait(sd_event *e, uint64_t timeout) { for (;;) { r = epoll_wait_usec(e->epoll_fd, e->event_queue, e->event_queue_allocated, timeout); - if (r == -EINTR) { - e->state = SD_EVENT_PENDING; - return 1; - } if (r < 0) - goto finish; + return r; m = (size_t) r; @@ -3877,7 +3888,9 @@ _public_ int sd_event_wait(sd_event *e, uint64_t timeout) { timeout = 0; } - triple_timestamp_get(&e->timestamp); + /* Set timestamp only when this is called first time. */ + if (threshold == INT64_MAX) + triple_timestamp_get(&e->timestamp); for (size_t i = 0; i < m; i++) { @@ -3893,6 +3906,11 @@ _public_ int sd_event_wait(sd_event *e, uint64_t timeout) { assert(s); + if (s->priority > threshold) + continue; + + min_priority = MIN(min_priority, s->priority); + switch (s->type) { case SOURCE_IO: @@ -3920,19 +3938,75 @@ _public_ int sd_event_wait(sd_event *e, uint64_t timeout) { } case WAKEUP_SIGNAL_DATA: - r = process_signal(e, e->event_queue[i].data.ptr, e->event_queue[i].events); + r = process_signal(e, e->event_queue[i].data.ptr, e->event_queue[i].events, &min_priority); break; case WAKEUP_INOTIFY_DATA: - r = event_inotify_data_read(e, e->event_queue[i].data.ptr, e->event_queue[i].events); + r = event_inotify_data_read(e, e->event_queue[i].data.ptr, e->event_queue[i].events, threshold); break; default: assert_not_reached("Invalid wake-up pointer"); } } + if (r < 0) + return r; + if (r > 0) + something_new = true; + } + + *ret_min_priority = min_priority; + return something_new; +} + +_public_ int sd_event_wait(sd_event *e, uint64_t timeout) { + int r; + + assert_return(e, -EINVAL); + assert_return(e = event_resolve(e), -ENOPKG); + assert_return(!event_pid_changed(e), -ECHILD); + assert_return(e->state != SD_EVENT_FINISHED, -ESTALE); + assert_return(e->state == SD_EVENT_ARMED, -EBUSY); + + if (e->exit_requested) { + e->state = SD_EVENT_PENDING; + return 1; + } + + for 
(int64_t threshold = INT64_MAX; ; threshold--) { + int64_t epoll_min_priority, child_min_priority; + + /* It is possible that new epoll (especially IO) and child events are + * triggered just after the process_epoll() call but before process_child(), and the new IO + * events may have higher priority than the child events. To salvage these events, + * let's call epoll_wait() again, but accept only events with strictly higher priority than + * those seen in the previous iteration. See issue https://github.com/systemd/systemd/issues/18190 and comments + * https://github.com/systemd/systemd/pull/18750#issuecomment-785801085 + * https://github.com/systemd/systemd/pull/18922#issuecomment-792825226 */ + + r = process_epoll(e, timeout, threshold, &epoll_min_priority); + if (r == -EINTR) { + e->state = SD_EVENT_PENDING; + return 1; + } if (r < 0) goto finish; + if (r == 0 && threshold < INT64_MAX) + /* No new epoll event. */ + break; + + r = process_child(e, threshold, &child_min_priority); + if (r < 0) + goto finish; + if (r == 0) + /* No new child event. */ + break; + + threshold = MIN(epoll_min_priority, child_min_priority); + if (threshold == INT64_MIN) + break; + + timeout = 0; } r = process_watchdog(e); @@ -3959,19 +4033,12 @@ _public_ int sd_event_wait(sd_event *e, uint64_t timeout) { if (r < 0) goto finish; - if (e->need_process_child) { - r = process_child(e); - if (r < 0) - goto finish; - } - r = process_inotify(e); if (r < 0) goto finish; if (event_next_pending(e)) { e->state = SD_EVENT_PENDING; - return 1; }