mirror of
https://github.com/samba-team/samba.git
synced 2025-02-28 01:58:17 +03:00
lib/tevent: Add a thread-safe tevent backend
Signed-off-by: Stefan Metzmacher <metze@samba.org>
This commit is contained in:
parent
d860aa2cac
commit
fa71f32411
@ -114,6 +114,7 @@ static void tevent_backend_init(void)
|
||||
{
|
||||
tevent_select_init();
|
||||
tevent_poll_init();
|
||||
tevent_poll_mt_init();
|
||||
tevent_standard_init();
|
||||
#ifdef HAVE_EPOLL
|
||||
tevent_epoll_init();
|
||||
|
@ -315,6 +315,7 @@ void tevent_cleanup_pending_signal_handlers(struct tevent_signal *se);
|
||||
bool tevent_standard_init(void);
|
||||
bool tevent_select_init(void);
|
||||
bool tevent_poll_init(void);
|
||||
bool tevent_poll_mt_init(void);
|
||||
#ifdef HAVE_EPOLL
|
||||
bool tevent_epoll_init(void);
|
||||
#endif
|
||||
|
@ -34,7 +34,8 @@ struct poll_event_context {
|
||||
struct tevent_context *ev;
|
||||
|
||||
/*
|
||||
* A DLIST for fresh fde's
|
||||
* A DLIST for fresh fde's added by poll_event_add_fd but not
|
||||
* picked up yet by poll_event_loop_once
|
||||
*/
|
||||
struct tevent_fd *fresh;
|
||||
|
||||
@ -45,6 +46,11 @@ struct poll_event_context {
|
||||
struct tevent_fd **fdes;
|
||||
unsigned num_fds;
|
||||
|
||||
/*
|
||||
* Signal fd to wake the poll() thread
|
||||
*/
|
||||
int signal_fd;
|
||||
|
||||
/* information for exiting from the event loop */
|
||||
int exit_code;
|
||||
};
|
||||
@ -61,17 +67,125 @@ static int poll_event_context_init(struct tevent_context *ev)
|
||||
return -1;
|
||||
}
|
||||
poll_ev->ev = ev;
|
||||
poll_ev->signal_fd = -1;
|
||||
ev->additional_data = poll_ev;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int poll_event_mt_destructor(struct poll_event_context *poll_ev)
|
||||
{
|
||||
if (poll_ev->signal_fd != -1) {
|
||||
close(poll_ev->signal_fd);
|
||||
poll_ev->signal_fd = -1;
|
||||
}
|
||||
if (poll_ev->num_fds == 0) {
|
||||
return 0;
|
||||
}
|
||||
if (poll_ev->fds[0].fd != -1) {
|
||||
close(poll_ev->fds[0].fd);
|
||||
poll_ev->fds[0].fd = -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static bool set_nonblock(int fd)
|
||||
{
|
||||
int val;
|
||||
|
||||
val = fcntl(fd, F_GETFL, 0);
|
||||
if (val == -1) {
|
||||
return false;
|
||||
}
|
||||
val |= O_NONBLOCK;
|
||||
|
||||
return (fcntl(fd, F_SETFL, val) != -1);
|
||||
}
|
||||
|
||||
static int poll_event_context_init_mt(struct tevent_context *ev)
|
||||
{
|
||||
struct poll_event_context *poll_ev;
|
||||
struct pollfd *pfd;
|
||||
int fds[2];
|
||||
int ret;
|
||||
|
||||
ret = poll_event_context_init(ev);
|
||||
if (ret == -1) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
poll_ev = talloc_get_type_abort(
|
||||
ev->additional_data, struct poll_event_context);
|
||||
|
||||
poll_ev->fds = talloc_zero(poll_ev, struct pollfd);
|
||||
if (poll_ev->fds == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
ret = pipe(fds);
|
||||
if (ret == -1) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!set_nonblock(fds[0]) || !set_nonblock(fds[1])) {
|
||||
close(fds[0]);
|
||||
close(fds[1]);
|
||||
return -1;
|
||||
}
|
||||
|
||||
poll_ev->signal_fd = fds[1];
|
||||
|
||||
pfd = &poll_ev->fds[0];
|
||||
pfd->fd = fds[0];
|
||||
pfd->events = (POLLIN|POLLHUP);
|
||||
|
||||
poll_ev->num_fds = 1;
|
||||
|
||||
talloc_set_destructor(poll_ev, poll_event_mt_destructor);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void poll_event_wake_pollthread(struct poll_event_context *poll_ev)
|
||||
{
|
||||
char c;
|
||||
ssize_t ret;
|
||||
|
||||
if (poll_ev->signal_fd == -1) {
|
||||
return;
|
||||
}
|
||||
c = 0;
|
||||
do {
|
||||
ret = write(poll_ev->signal_fd, &c, sizeof(c));
|
||||
} while ((ret == -1) && (errno == EINTR));
|
||||
}
|
||||
|
||||
static void poll_event_drain_signal_fd(struct poll_event_context *poll_ev)
|
||||
{
|
||||
char buf[16];
|
||||
ssize_t ret;
|
||||
int fd;
|
||||
|
||||
if (poll_ev->signal_fd == -1) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (poll_ev->num_fds < 1) {
|
||||
return;
|
||||
}
|
||||
fd = poll_ev->fds[0].fd;
|
||||
|
||||
do {
|
||||
ret = read(fd, buf, sizeof(buf));
|
||||
} while (ret == sizeof(buf));
|
||||
}
|
||||
|
||||
/*
|
||||
destroy an fd_event
|
||||
*/
|
||||
static int poll_event_fd_destructor(struct tevent_fd *fde)
|
||||
{
|
||||
struct tevent_context *ev = fde->event_ctx;
|
||||
struct poll_event_context *poll_ev = NULL;
|
||||
struct poll_event_context *poll_ev;
|
||||
uint64_t del_idx = fde->additional_flags;
|
||||
|
||||
if (ev == NULL) {
|
||||
@ -82,6 +196,7 @@ static int poll_event_fd_destructor(struct tevent_fd *fde)
|
||||
ev->additional_data, struct poll_event_context);
|
||||
|
||||
poll_ev->fdes[del_idx] = NULL;
|
||||
poll_event_wake_pollthread(poll_ev);
|
||||
done:
|
||||
return tevent_common_fd_destructor(fde);
|
||||
}
|
||||
@ -94,6 +209,21 @@ static int poll_fresh_fde_destructor(struct tevent_fd *fde)
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void poll_event_schedule_immediate(struct tevent_immediate *im,
|
||||
struct tevent_context *ev,
|
||||
tevent_immediate_handler_t handler,
|
||||
void *private_data,
|
||||
const char *handler_name,
|
||||
const char *location)
|
||||
{
|
||||
struct poll_event_context *poll_ev = talloc_get_type_abort(
|
||||
ev->additional_data, struct poll_event_context);
|
||||
|
||||
tevent_common_schedule_immediate(im, ev, handler, private_data,
|
||||
handler_name, location);
|
||||
poll_event_wake_pollthread(poll_ev);
|
||||
}
|
||||
|
||||
/*
|
||||
add a fd based event
|
||||
return NULL on failure (memory allocation error)
|
||||
@ -131,6 +261,7 @@ static struct tevent_fd *poll_event_add_fd(struct tevent_context *ev,
|
||||
|
||||
DLIST_ADD(poll_ev->fresh, fde);
|
||||
talloc_set_destructor(fde, poll_fresh_fde_destructor);
|
||||
poll_event_wake_pollthread(poll_ev);
|
||||
|
||||
/*
|
||||
* poll_event_loop_poll will take care of the rest in
|
||||
@ -159,6 +290,7 @@ static void poll_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags)
|
||||
poll_ev->fds[idx].events = pollflags;
|
||||
|
||||
fde->flags = flags;
|
||||
poll_event_wake_pollthread(poll_ev);
|
||||
}
|
||||
|
||||
static bool poll_event_setup_fresh(struct tevent_context *ev,
|
||||
@ -246,6 +378,7 @@ static int poll_event_loop_poll(struct tevent_context *ev,
|
||||
ev->additional_data, struct poll_event_context);
|
||||
int pollrtn;
|
||||
int timeout = -1;
|
||||
unsigned first_fd;
|
||||
unsigned i;
|
||||
|
||||
if (ev->signal_events && tevent_common_check_signal(ev)) {
|
||||
@ -257,6 +390,8 @@ static int poll_event_loop_poll(struct tevent_context *ev,
|
||||
timeout += (tvalp->tv_usec + 999) / 1000;
|
||||
}
|
||||
|
||||
poll_event_drain_signal_fd(poll_ev);
|
||||
|
||||
if (!poll_event_setup_fresh(ev, poll_ev)) {
|
||||
return -1;
|
||||
}
|
||||
@ -283,11 +418,13 @@ static int poll_event_loop_poll(struct tevent_context *ev,
|
||||
return 0;
|
||||
}
|
||||
|
||||
first_fd = (poll_ev->signal_fd != -1) ? 1 : 0;
|
||||
|
||||
/* at least one file descriptor is ready - check
|
||||
which ones and call the handler, being careful to allow
|
||||
the handler to remove itself when called */
|
||||
|
||||
for (i=0; i<poll_ev->num_fds; i++) {
|
||||
for (i=first_fd; i<poll_ev->num_fds; i++) {
|
||||
struct pollfd *pfd;
|
||||
struct tevent_fd *fde;
|
||||
uint16_t flags = 0;
|
||||
@ -379,3 +516,21 @@ _PRIVATE_ bool tevent_poll_init(void)
|
||||
{
|
||||
return tevent_register_backend("poll", &poll_event_ops);
|
||||
}
|
||||
|
||||
static const struct tevent_ops poll_event_mt_ops = {
|
||||
.context_init = poll_event_context_init_mt,
|
||||
.add_fd = poll_event_add_fd,
|
||||
.set_fd_close_fn = tevent_common_fd_set_close_fn,
|
||||
.get_fd_flags = tevent_common_fd_get_flags,
|
||||
.set_fd_flags = poll_event_set_fd_flags,
|
||||
.add_timer = tevent_common_add_timer,
|
||||
.schedule_immediate = poll_event_schedule_immediate,
|
||||
.add_signal = tevent_common_add_signal,
|
||||
.loop_once = poll_event_loop_once,
|
||||
.loop_wait = tevent_common_loop_wait,
|
||||
};
|
||||
|
||||
_PRIVATE_ bool tevent_poll_mt_init(void)
|
||||
{
|
||||
return tevent_register_backend("poll_mt", &poll_event_mt_ops);
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user