1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-03 01:18:10 +03:00

tevent: add tevent_common_fd_mpx infrastructure

Backends may require to map individual tevent_fd instances to
a single low level kernel state (e.g. for epoll).

This generic infrastructure adds helper functions using
a generic (sub)part of struct tevent_fd.

The new code will allow us to support more than 2 tevent_fd
instances per fd, which makes sure all backends can provide
a similar behavior. This will be important when we add
TEVENT_FD_ERROR as a 3rd kind of fd event.

The aim is to use this in order to replace the limited implementation
we already have in tevent_epoll.c.

As these helpers are typically called from within
'void tevent_fd_set_flags(struct tevent_fd *fde, uint16_t flags)'
there's no way to report errors. So in order avoid additional
error handling complexity the helpers try to avoid
any allocations which may fail. It also means the logic in
tevent_epoll.c doesn't have to change much.

These are implemented as static line functions in order to avoid
the function call overhead, which showed up in profiles of the
early implementation.

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
This commit is contained in:
Stefan Metzmacher 2022-11-09 22:48:10 +01:00 committed by Ralph Boehme
parent 95d6600a06
commit b328e99065
4 changed files with 390 additions and 0 deletions

View File

@ -236,6 +236,7 @@ static void epoll_check_reopen(struct epoll_event_context *epoll_ev)
epoll_ev->pid = pid;
epoll_ev->panic_state = &panic_triggered;
for (fde=epoll_ev->ev->fd_events;fde;fde=fde->next) {
tevent_common_fd_mpx_reinit(fde);
fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
}
for (fde=epoll_ev->ev->fd_events;fde;fde=fde->next) {
@ -759,6 +760,7 @@ static int epoll_event_fd_destructor(struct tevent_fd *fde)
int flags = fde->flags;
if (ev == NULL) {
tevent_common_fd_mpx_reinit(fde);
return tevent_common_fd_destructor(fde);
}
@ -789,6 +791,7 @@ static int epoll_event_fd_destructor(struct tevent_fd *fde)
if (epoll_ev->pid != tevent_cached_getpid()) {
epoll_check_reopen(epoll_ev);
if (panic_triggered) {
tevent_common_fd_mpx_reinit(fde);
return tevent_common_fd_destructor(fde);
}
}
@ -796,6 +799,7 @@ static int epoll_event_fd_destructor(struct tevent_fd *fde)
if (mpx_fde != NULL) {
epoll_update_event(epoll_ev, mpx_fde);
if (panic_triggered) {
tevent_common_fd_mpx_reinit(fde);
return tevent_common_fd_destructor(fde);
}
}

View File

@ -47,12 +47,26 @@ const char *tevent_common_fd_str(struct tevent_common_fd_buf *buf,
int tevent_common_fd_destructor(struct tevent_fd *fde)
{
struct tevent_fd *primary = NULL;
if (fde->destroyed) {
tevent_common_check_double_free(fde, "tevent_fd double free");
goto done;
}
fde->destroyed = true;
/*
* The caller should have cleared it from any mpx relationship
*/
primary = tevent_common_fd_mpx_primary(fde);
if (primary != fde) {
tevent_abort(fde->event_ctx,
"tevent_common_fd_destructor: fde not mpx primary");
} else if (fde->mpx.list != NULL) {
tevent_abort(fde->event_ctx,
"tevent_common_fd_destructor: fde has mpx fdes");
}
if (fde->event_ctx) {
tevent_trace_fd_callback(fde->event_ctx, fde, TEVENT_EVENT_TRACE_DETACH);
DLIST_REMOVE(fde->event_ctx->fd_events, fde);
@ -104,6 +118,7 @@ struct tevent_fd *tevent_common_add_fd(struct tevent_context *ev, TALLOC_CTX *me
tevent_trace_fd_callback(fde->event_ctx, fde, TEVENT_EVENT_TRACE_ATTACH);
DLIST_ADD(ev->fd_events, fde);
tevent_common_fd_mpx_reinit(fde);
talloc_set_destructor(fde, tevent_common_fd_destructor);

View File

@ -216,6 +216,14 @@ struct tevent_fd {
void *additional_data;
/* custom tag that can be set by caller */
uint64_t tag;
struct tevent_fd_mpx {
struct tevent_fd_mpx *prev, *next;
struct tevent_fd *fde;
struct tevent_fd *primary;
struct tevent_fd_mpx *list;
uint16_t total_flags;
bool has_mpx;
} mpx;
};
struct tevent_timer {
@ -564,6 +572,11 @@ void tevent_trace_queue_callback(struct tevent_context *ev,
#include "tevent_dlinklist.h"
static inline void tevent_common_fd_mpx_reinit(struct tevent_fd *fde)
{
fde->mpx = (struct tevent_fd_mpx) { .fde = fde, };
}
static inline void tevent_common_fd_disarm(struct tevent_fd *fde)
{
if (fde->event_ctx != NULL) {
@ -572,5 +585,362 @@ static inline void tevent_common_fd_disarm(struct tevent_fd *fde)
DLIST_REMOVE(fde->event_ctx->fd_events, fde);
fde->event_ctx = NULL;
}
tevent_common_fd_mpx_reinit(fde);
fde->wrapper = NULL;
}
/*
* tevent_common_fd_mpx_primary() returns the fde that is responsible
* for the low level state.
*
* By default (when there's no multiplexing) it just returns 'any_fde'.
*
* Note it always returns a valid pointer.
*/
static inline
struct tevent_fd *tevent_common_fd_mpx_primary(struct tevent_fd *any_fde)
{
struct tevent_fd *primary = NULL;
if (any_fde->mpx.primary != NULL) {
primary = any_fde->mpx.primary;
} else {
primary = any_fde;
}
return primary;
}
/*
* tevent_common_fd_mpx_update_flags() needs to be called
* if update_fde->flags has changed. It is needed in
* order to let tevent_common_fd_mpx_flags() return a valid
* result.
*/
static inline
void tevent_common_fd_mpx_update_flags(struct tevent_fd *update_fde)
{
struct tevent_fd *primary = tevent_common_fd_mpx_primary(update_fde);
struct tevent_fd_mpx *mpx = NULL;
uint16_t new_total_flags = 0;
if (!primary->mpx.has_mpx) {
primary->mpx.total_flags = primary->flags;
return;
}
for (mpx = primary->mpx.list; mpx != NULL; mpx = mpx->next) {
struct tevent_fd *mpx_fde = mpx->fde;
/* we don't care that mpx_fde might be == primary */
new_total_flags |= mpx_fde->flags;
}
primary->mpx.total_flags = new_total_flags;
}
/*
* tevent_common_fd_mpx_flags() return the effective flags
* (TEVEND_FD_*) of the primary fde and all multiplexed fdes.
*
* Valid after tevent_common_fd_mpx_update_flags() was called
*/
static inline
uint16_t tevent_common_fd_mpx_flags(struct tevent_fd *any_fde)
{
struct tevent_fd *primary = tevent_common_fd_mpx_primary(any_fde);
return primary->mpx.total_flags;
}
/*
* tevent_common_fd_mpx_clear_writeable() clears TEVENT_FD_WRITE
* from all fdes belonging together.
*/
static inline
void tevent_common_fd_mpx_clear_writeable(struct tevent_fd *any_fde)
{
struct tevent_fd *primary = tevent_common_fd_mpx_primary(any_fde);
struct tevent_fd_mpx *mpx = NULL;
primary->flags &= ~TEVENT_FD_WRITE;
for (mpx = primary->mpx.list; mpx != NULL; mpx = mpx->next) {
struct tevent_fd *mpx_fde = mpx->fde;
/* we don't care that mpx_fde might be == primary */
mpx_fde->flags &= ~TEVENT_FD_WRITE;
}
primary->mpx.total_flags &= ~TEVENT_FD_WRITE;
}
/*
* tevent_common_fd_mpx_additional_flags() modifies
* fde->additional_flags for all fdes belonging together.
*/
static inline
void tevent_common_fd_mpx_additional_flags(struct tevent_fd *any_fde,
uint64_t clear_flags,
uint64_t add_flags)
{
struct tevent_fd *primary = tevent_common_fd_mpx_primary(any_fde);
struct tevent_fd_mpx *mpx = NULL;
primary->additional_flags &= ~clear_flags;
primary->additional_flags |= add_flags;
for (mpx = primary->mpx.list; mpx != NULL; mpx = mpx->next) {
struct tevent_fd *mpx_fde = mpx->fde;
/* we don't care that mpx_fde might be == primary */
mpx_fde->additional_flags &= ~clear_flags;
mpx_fde->additional_flags |= add_flags;
}
}
/*
* tevent_common_fd_mpx_disarm_all() detaches
* all fdes currently belonging together from each other
* and also from the tevent_context, which means their
* handler will never be called again.
*/
static inline
void tevent_common_fd_mpx_disarm_all(struct tevent_fd *any_fde)
{
struct tevent_fd *primary = tevent_common_fd_mpx_primary(any_fde);
struct tevent_fd_mpx *mpx = NULL, *next = NULL;
for (mpx = primary->mpx.list; mpx != NULL; mpx = next) {
struct tevent_fd *mpx_fde = mpx->fde;
next = mpx->next;
DLIST_REMOVE(primary->mpx.list, mpx);
if (mpx_fde == primary) {
/* primary is handled below */
continue;
}
tevent_common_fd_disarm(mpx_fde);
}
tevent_common_fd_disarm(primary);
}
/*
* tevent_common_fd_mpx_select() selects the handler that
* should be called for the given low level event.
*
* Note it's important to pass the primary fde!
*/
static inline
struct tevent_fd *tevent_common_fd_mpx_select(struct tevent_fd *primary,
uint16_t flags,
bool got_error)
{
struct tevent_fd_mpx *mpx = NULL;
struct tevent_fd *selected = NULL;
/* optimize for the single event case. */
if (!primary->mpx.has_mpx) {
/*
* If we got an error, we won't report it if
* the caller only asked for TEVENT_FD_WRITE.
*/
if (got_error && !(primary->flags & TEVENT_FD_READ)) {
return NULL;
}
if (flags & primary->flags) {
return primary;
}
return NULL;
}
for (mpx = primary->mpx.list; mpx != NULL; mpx = mpx->next) {
struct tevent_fd *mpx_fde = mpx->fde;
/*
* If we got an error, we won't report it if
* the caller only asked for TEVENT_FD_WRITE.
*/
if (got_error && !(mpx_fde->flags & TEVENT_FD_READ)) {
continue;
}
if (flags & mpx_fde->flags) {
selected = mpx_fde;
break;
}
}
if (selected == NULL) {
return NULL;
}
/*
* Maintain fairness and demote the just selected fde
*/
DLIST_DEMOTE_SHORT(primary->mpx.list, &selected->mpx);
return selected;
}
/*
* tevent_common_fd_mpx_add() searches for an existing (active) fde
* for the same low level fd and adds the given 'add_fde'
* as multiplexed to the found fde.
*
* If another fde was found it is returned.
* NULL is returned to indicate no match
*/
static inline
struct tevent_fd *tevent_common_fd_mpx_add(struct tevent_fd *add_fde)
{
struct tevent_context *ev = add_fde->event_ctx;
struct tevent_fd *add_primary = tevent_common_fd_mpx_primary(add_fde);
uint16_t add_flags = tevent_common_fd_mpx_flags(add_primary);
struct tevent_fd *mpx_fde = NULL;
struct tevent_fd *mpx_primary = NULL;
struct tevent_fd_mpx *tmp = NULL;
struct tevent_fd_mpx *next = NULL;
/* Find the existing fde that caused the EEXIST error. */
for (mpx_fde = ev->fd_events; mpx_fde; mpx_fde = mpx_fde->next) {
mpx_primary = tevent_common_fd_mpx_primary(mpx_fde);
if (mpx_primary->fd != add_primary->fd) {
mpx_primary = NULL;
continue;
}
if (mpx_primary == add_primary) {
mpx_primary = NULL;
continue;
}
if (add_flags != 0 &&
tevent_common_fd_mpx_flags(mpx_primary) == 0)
{
/*
* only active events should match
*/
mpx_primary = NULL;
continue;
}
break;
}
if (mpx_primary == NULL) {
tevent_debug(ev, TEVENT_DEBUG_FATAL,
"can't find multiplex fde for fd[%d]",
add_fde->fd);
return NULL;
}
/*
* If add_primary is not in it's own list
* we add it in order to simplify the loop below.
*/
if (add_primary->mpx.prev == NULL && add_primary->mpx.next == NULL) {
DLIST_ADD_END(add_primary->mpx.list, &add_primary->mpx);
}
/*
* Add the new mpx_primary to its own list before others,
* if it is not already added.
*/
if (mpx_primary->mpx.prev == NULL && mpx_primary->mpx.next == NULL) {
DLIST_ADD_END(mpx_primary->mpx.list, &mpx_primary->mpx);
}
/*
* Now we clear all entries and move them to the
* new primary
*/
for (tmp = add_primary->mpx.list; tmp != NULL; tmp = next) {
struct tevent_fd *tmp_fde = tmp->fde;
next = tmp->next;
DLIST_REMOVE(add_primary->mpx.list, tmp);
tevent_common_fd_mpx_reinit(tmp_fde);
DLIST_ADD_END(mpx_primary->mpx.list, tmp);
tmp->primary = mpx_primary;
tmp->has_mpx = true;
}
mpx_primary->mpx.has_mpx = true;
return mpx_primary;
}
/*
* tevent_common_fd_mpx_update() calls tevent_common_fd_mpx_update_flags()
* and compares tevent_common_fd_mpx_flags() before and after.
*
* When there's a low level update needed the primary fde,
* otherwise NULL is returned.
*/
static inline
struct tevent_fd *tevent_common_fd_mpx_update(struct tevent_fd *update_fde)
{
struct tevent_fd *primary = tevent_common_fd_mpx_primary(update_fde);
uint16_t old_total_flags;
uint16_t new_total_flags;
old_total_flags = primary->mpx.total_flags;
tevent_common_fd_mpx_update_flags(primary);
new_total_flags = primary->mpx.total_flags;
if (old_total_flags == new_total_flags) {
/* No update needed */
return NULL;
}
return primary;
}
/*
* tevent_common_fd_mpx_remove() removes remove_fde from its possible primary,
* if remove_fde is a primary itself, a new primary is selected.
*
* The remaining primary or NULL is returned.
*/
static inline
struct tevent_fd *tevent_common_fd_mpx_remove(struct tevent_fd *remove_fde)
{
struct tevent_fd *primary = tevent_common_fd_mpx_primary(remove_fde);
struct tevent_fd_mpx *mpx = NULL, *next = NULL;
struct tevent_fd *new_primary = NULL;
DLIST_REMOVE(primary->mpx.list, &remove_fde->mpx);
if (primary != remove_fde) {
tevent_common_fd_mpx_reinit(remove_fde);
return primary;
}
for (mpx = primary->mpx.list; mpx != NULL; mpx = next) {
struct tevent_fd *mpx_fde = mpx->fde;
next = mpx->next;
DLIST_REMOVE(primary->mpx.list, &mpx_fde->mpx);
tevent_common_fd_mpx_reinit(mpx_fde);
mpx->primary = new_primary;
if (new_primary == NULL) {
/*
* Select the first one as the new primary and add
* itself as the first mpx-fde to the mpx list
*/
new_primary = mpx_fde;
DLIST_ADD(new_primary->mpx.list, &mpx_fde->mpx);
continue;
}
new_primary->mpx.has_mpx = true;
mpx->has_mpx = true;
DLIST_ADD_END(new_primary->mpx.list, &mpx_fde->mpx);
}
/* primary == remove_fde */
tevent_common_fd_mpx_reinit(primary);
return new_primary;
}

View File

@ -170,6 +170,7 @@ _PRIVATE_ bool tevent_poll_event_add_fd_internal(struct tevent_context *ev,
size_t num_fdes;
fde->additional_flags = UINT64_MAX;
tevent_common_fd_mpx_reinit(fde);
talloc_set_destructor(fde, poll_event_fd_destructor);
if (fde->flags == 0) {