mirror of
https://github.com/samba-team/samba.git
synced 2025-01-06 13:18:07 +03:00
d94b9c8124
Signed-off-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Ralph Boehme <slow@samba.org>
602 lines
13 KiB
C
602 lines
13 KiB
C
/*
|
|
tevent event library.
|
|
|
|
Copyright (C) Jeremy Allison 2015
|
|
|
|
** NOTE! The following LGPL license applies to the tevent
|
|
** library. This does NOT imply that all of Samba is released
|
|
** under the LGPL
|
|
|
|
This library is free software; you can redistribute it and/or
|
|
modify it under the terms of the GNU Lesser General Public
|
|
License as published by the Free Software Foundation; either
|
|
version 3 of the License, or (at your option) any later version.
|
|
|
|
This library is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
Lesser General Public License for more details.
|
|
|
|
You should have received a copy of the GNU Lesser General Public
|
|
License along with this library; if not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include "replace.h"
|
|
#include "system/filesys.h"
|
|
#include "talloc.h"
|
|
#include "tevent.h"
|
|
#include "tevent_internal.h"
|
|
#include "tevent_util.h"
|
|
|
|
#ifdef HAVE_PTHREAD
|
|
#include "system/threads.h"
|
|
|
|
struct tevent_immediate_list {
|
|
struct tevent_immediate_list *next, *prev;
|
|
tevent_immediate_handler_t handler;
|
|
struct tevent_immediate *im;
|
|
void *private_ptr;
|
|
};
|
|
|
|
struct tevent_thread_proxy {
|
|
pthread_mutex_t mutex;
|
|
struct tevent_context *dest_ev_ctx;
|
|
int read_fd;
|
|
int write_fd;
|
|
struct tevent_fd *pipe_read_fde;
|
|
/* Pending events list. */
|
|
struct tevent_immediate_list *im_list;
|
|
/* Completed events list. */
|
|
struct tevent_immediate_list *tofree_im_list;
|
|
struct tevent_immediate *free_im;
|
|
};
|
|
|
|
static void free_im_list(struct tevent_immediate_list **pp_list_head)
|
|
{
|
|
struct tevent_immediate_list *im_entry = NULL;
|
|
struct tevent_immediate_list *im_next = NULL;
|
|
|
|
for (im_entry = *pp_list_head; im_entry; im_entry = im_next) {
|
|
im_next = im_entry->next;
|
|
DLIST_REMOVE(*pp_list_head, im_entry);
|
|
TALLOC_FREE(im_entry);
|
|
}
|
|
}
|
|
|
|
static void free_list_handler(struct tevent_context *ev,
|
|
struct tevent_immediate *im,
|
|
void *private_ptr)
|
|
{
|
|
struct tevent_thread_proxy *tp =
|
|
talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
|
|
int ret;
|
|
|
|
ret = pthread_mutex_lock(&tp->mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
/* Notreached. */
|
|
return;
|
|
}
|
|
|
|
free_im_list(&tp->tofree_im_list);
|
|
|
|
ret = pthread_mutex_unlock(&tp->mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
/* Notreached. */
|
|
return;
|
|
}
|
|
}
|
|
|
|
static void schedule_immediate_functions(struct tevent_thread_proxy *tp)
|
|
{
|
|
struct tevent_immediate_list *im_entry = NULL;
|
|
struct tevent_immediate_list *im_next = NULL;
|
|
|
|
for (im_entry = tp->im_list; im_entry; im_entry = im_next) {
|
|
im_next = im_entry->next;
|
|
DLIST_REMOVE(tp->im_list, im_entry);
|
|
|
|
tevent_schedule_immediate(im_entry->im,
|
|
tp->dest_ev_ctx,
|
|
im_entry->handler,
|
|
im_entry->private_ptr);
|
|
|
|
/* Move from pending list to free list. */
|
|
DLIST_ADD(tp->tofree_im_list, im_entry);
|
|
}
|
|
if (tp->tofree_im_list != NULL) {
|
|
/*
|
|
* Once the current immediate events
|
|
* are processed, we need to reschedule
|
|
* ourselves to free them. This works
|
|
* as tevent_schedule_immediate()
|
|
* always adds events to the *END* of
|
|
* the immediate events list.
|
|
*/
|
|
tevent_schedule_immediate(tp->free_im,
|
|
tp->dest_ev_ctx,
|
|
free_list_handler,
|
|
tp);
|
|
}
|
|
}
|
|
|
|
static void pipe_read_handler(struct tevent_context *ev,
|
|
struct tevent_fd *fde,
|
|
uint16_t flags,
|
|
void *private_ptr)
|
|
{
|
|
struct tevent_thread_proxy *tp =
|
|
talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
|
|
ssize_t len = 64;
|
|
int ret;
|
|
|
|
ret = pthread_mutex_lock(&tp->mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
/* Notreached. */
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* Clear out all data in the pipe. We
|
|
* don't really care if this returns -1.
|
|
*/
|
|
while (len == 64) {
|
|
char buf[64];
|
|
len = read(tp->read_fd, buf, 64);
|
|
};
|
|
|
|
schedule_immediate_functions(tp);
|
|
|
|
ret = pthread_mutex_unlock(&tp->mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
/* Notreached. */
|
|
return;
|
|
}
|
|
}
|
|
|
|
static int tevent_thread_proxy_destructor(struct tevent_thread_proxy *tp)
|
|
{
|
|
int ret;
|
|
|
|
ret = pthread_mutex_lock(&tp->mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
/* Notreached. */
|
|
return 0;
|
|
}
|
|
|
|
TALLOC_FREE(tp->pipe_read_fde);
|
|
|
|
if (tp->read_fd != -1) {
|
|
(void)close(tp->read_fd);
|
|
tp->read_fd = -1;
|
|
}
|
|
if (tp->write_fd != -1) {
|
|
(void)close(tp->write_fd);
|
|
tp->write_fd = -1;
|
|
}
|
|
|
|
/* Hmmm. It's probably an error if we get here with
|
|
any non-NULL immediate entries.. */
|
|
|
|
free_im_list(&tp->im_list);
|
|
free_im_list(&tp->tofree_im_list);
|
|
|
|
TALLOC_FREE(tp->free_im);
|
|
|
|
ret = pthread_mutex_unlock(&tp->mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
/* Notreached. */
|
|
return 0;
|
|
}
|
|
|
|
ret = pthread_mutex_destroy(&tp->mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
/* Notreached. */
|
|
return 0;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
/*
|
|
* Create a struct that can be passed to other threads
|
|
* to allow them to signal the struct tevent_context *
|
|
* passed in.
|
|
*/
|
|
|
|
struct tevent_thread_proxy *tevent_thread_proxy_create(
|
|
struct tevent_context *dest_ev_ctx)
|
|
{
|
|
int ret;
|
|
int pipefds[2];
|
|
struct tevent_thread_proxy *tp;
|
|
|
|
if (dest_ev_ctx->wrapper.glue != NULL) {
|
|
/*
|
|
* stacking of wrappers is not supported
|
|
*/
|
|
tevent_debug(dest_ev_ctx->wrapper.glue->main_ev,
|
|
TEVENT_DEBUG_FATAL,
|
|
"%s() not allowed on a wrapper context\n",
|
|
__func__);
|
|
errno = EINVAL;
|
|
return NULL;
|
|
}
|
|
|
|
tp = talloc_zero(dest_ev_ctx, struct tevent_thread_proxy);
|
|
if (tp == NULL) {
|
|
return NULL;
|
|
}
|
|
|
|
ret = pthread_mutex_init(&tp->mutex, NULL);
|
|
if (ret != 0) {
|
|
goto fail;
|
|
}
|
|
|
|
tp->dest_ev_ctx = dest_ev_ctx;
|
|
tp->read_fd = -1;
|
|
tp->write_fd = -1;
|
|
|
|
talloc_set_destructor(tp, tevent_thread_proxy_destructor);
|
|
|
|
ret = pipe(pipefds);
|
|
if (ret == -1) {
|
|
goto fail;
|
|
}
|
|
|
|
tp->read_fd = pipefds[0];
|
|
tp->write_fd = pipefds[1];
|
|
|
|
ret = ev_set_blocking(pipefds[0], false);
|
|
if (ret != 0) {
|
|
goto fail;
|
|
}
|
|
ret = ev_set_blocking(pipefds[1], false);
|
|
if (ret != 0) {
|
|
goto fail;
|
|
}
|
|
if (!ev_set_close_on_exec(pipefds[0])) {
|
|
goto fail;
|
|
}
|
|
if (!ev_set_close_on_exec(pipefds[1])) {
|
|
goto fail;
|
|
}
|
|
|
|
tp->pipe_read_fde = tevent_add_fd(dest_ev_ctx,
|
|
tp,
|
|
tp->read_fd,
|
|
TEVENT_FD_READ,
|
|
pipe_read_handler,
|
|
tp);
|
|
if (tp->pipe_read_fde == NULL) {
|
|
goto fail;
|
|
}
|
|
|
|
/*
|
|
* Create an immediate event to free
|
|
* completed lists.
|
|
*/
|
|
tp->free_im = tevent_create_immediate(tp);
|
|
if (tp->free_im == NULL) {
|
|
goto fail;
|
|
}
|
|
|
|
return tp;
|
|
|
|
fail:
|
|
|
|
TALLOC_FREE(tp);
|
|
return NULL;
|
|
}
|
|
|
|
/*
|
|
* This function schedules an immediate event to be called with argument
|
|
* *pp_private in the thread context of dest_ev_ctx. Caller doesn't
|
|
* wait for activation to take place, this is simply fire-and-forget.
|
|
*
|
|
* pp_im must be a pointer to an immediate event talloced on
|
|
* a context owned by the calling thread, or the NULL context.
|
|
* Ownership of *pp_im will be transferred to the tevent library.
|
|
*
|
|
* pp_private can be null, or contents of *pp_private must be
|
|
* talloc'ed memory on a context owned by the calling thread
|
|
* or the NULL context. If non-null, ownership of *pp_private will
|
|
* be transferred to the tevent library.
|
|
*
|
|
* If you want to return a message, have the destination use the
|
|
* same function call to send back to the caller.
|
|
*/
|
|
|
|
|
|
void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
|
|
struct tevent_immediate **pp_im,
|
|
tevent_immediate_handler_t handler,
|
|
void *pp_private_data)
|
|
{
|
|
struct tevent_immediate_list *im_entry;
|
|
int ret;
|
|
char c;
|
|
ssize_t written;
|
|
|
|
ret = pthread_mutex_lock(&tp->mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
/* Notreached. */
|
|
return;
|
|
}
|
|
|
|
if (tp->write_fd == -1) {
|
|
/* In the process of being destroyed. Ignore. */
|
|
goto end;
|
|
}
|
|
|
|
/* Create a new immediate_list entry. MUST BE ON THE NULL CONTEXT */
|
|
im_entry = talloc_zero(NULL, struct tevent_immediate_list);
|
|
if (im_entry == NULL) {
|
|
goto end;
|
|
}
|
|
|
|
im_entry->handler = handler;
|
|
im_entry->im = talloc_move(im_entry, pp_im);
|
|
|
|
if (pp_private_data != NULL) {
|
|
void **pptr = (void **)pp_private_data;
|
|
im_entry->private_ptr = talloc_move(im_entry, pptr);
|
|
}
|
|
|
|
DLIST_ADD(tp->im_list, im_entry);
|
|
|
|
/* And notify the dest_ev_ctx to wake up. */
|
|
c = '\0';
|
|
do {
|
|
written = write(tp->write_fd, &c, 1);
|
|
} while (written == -1 && errno == EINTR);
|
|
|
|
end:
|
|
|
|
ret = pthread_mutex_unlock(&tp->mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
/* Notreached. */
|
|
}
|
|
}
|
|
#else
|
|
/* !HAVE_PTHREAD */
|
|
struct tevent_thread_proxy *tevent_thread_proxy_create(
|
|
struct tevent_context *dest_ev_ctx)
|
|
{
|
|
errno = ENOSYS;
|
|
return NULL;
|
|
}
|
|
|
|
void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
|
|
struct tevent_immediate **pp_im,
|
|
tevent_immediate_handler_t handler,
|
|
void *pp_private_data)
|
|
{
|
|
;
|
|
}
|
|
#endif
|
|
|
|
static int tevent_threaded_context_destructor(
|
|
struct tevent_threaded_context *tctx)
|
|
{
|
|
struct tevent_context *main_ev = tevent_wrapper_main_ev(tctx->event_ctx);
|
|
int ret;
|
|
|
|
if (main_ev != NULL) {
|
|
DLIST_REMOVE(main_ev->threaded_contexts, tctx);
|
|
}
|
|
|
|
/*
|
|
* We have to coordinate with _tevent_threaded_schedule_immediate's
|
|
* unlock of the event_ctx_mutex. We're in the main thread here,
|
|
* and we can be scheduled before the helper thread finalizes its
|
|
* call _tevent_threaded_schedule_immediate. This means we would
|
|
* pthreadpool_destroy a locked mutex, which is illegal.
|
|
*/
|
|
ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
}
|
|
|
|
ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
}
|
|
|
|
ret = pthread_mutex_destroy(&tctx->event_ctx_mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
struct tevent_threaded_context *tevent_threaded_context_create(
|
|
TALLOC_CTX *mem_ctx, struct tevent_context *ev)
|
|
{
|
|
#ifdef HAVE_PTHREAD
|
|
struct tevent_context *main_ev = tevent_wrapper_main_ev(ev);
|
|
struct tevent_threaded_context *tctx;
|
|
int ret;
|
|
|
|
ret = tevent_common_wakeup_init(main_ev);
|
|
if (ret != 0) {
|
|
errno = ret;
|
|
return NULL;
|
|
}
|
|
|
|
tctx = talloc(mem_ctx, struct tevent_threaded_context);
|
|
if (tctx == NULL) {
|
|
return NULL;
|
|
}
|
|
tctx->event_ctx = ev;
|
|
|
|
ret = pthread_mutex_init(&tctx->event_ctx_mutex, NULL);
|
|
if (ret != 0) {
|
|
TALLOC_FREE(tctx);
|
|
return NULL;
|
|
}
|
|
|
|
DLIST_ADD(main_ev->threaded_contexts, tctx);
|
|
talloc_set_destructor(tctx, tevent_threaded_context_destructor);
|
|
|
|
return tctx;
|
|
#else
|
|
errno = ENOSYS;
|
|
return NULL;
|
|
#endif
|
|
}
|
|
|
|
static int tevent_threaded_schedule_immediate_destructor(struct tevent_immediate *im)
|
|
{
|
|
if (im->event_ctx != NULL) {
|
|
abort();
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
|
|
struct tevent_immediate *im,
|
|
tevent_immediate_handler_t handler,
|
|
void *private_data,
|
|
const char *handler_name,
|
|
const char *location)
|
|
{
|
|
#ifdef HAVE_PTHREAD
|
|
const char *create_location = im->create_location;
|
|
struct tevent_context *main_ev = NULL;
|
|
struct tevent_wrapper_glue *glue = NULL;
|
|
int ret, wakeup_fd;
|
|
|
|
ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
}
|
|
|
|
if (tctx->event_ctx == NULL) {
|
|
/*
|
|
* Our event context is already gone.
|
|
*/
|
|
ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
}
|
|
return;
|
|
}
|
|
|
|
glue = tctx->event_ctx->wrapper.glue;
|
|
|
|
if ((im->event_ctx != NULL) || (handler == NULL)) {
|
|
abort();
|
|
}
|
|
if (im->destroyed) {
|
|
abort();
|
|
}
|
|
if (im->busy) {
|
|
abort();
|
|
}
|
|
|
|
main_ev = tevent_wrapper_main_ev(tctx->event_ctx);
|
|
|
|
*im = (struct tevent_immediate) {
|
|
.event_ctx = tctx->event_ctx,
|
|
.wrapper = glue,
|
|
.handler = handler,
|
|
.private_data = private_data,
|
|
.handler_name = handler_name,
|
|
.create_location = create_location,
|
|
.schedule_location = location,
|
|
};
|
|
|
|
/*
|
|
* Make sure the event won't be destroyed while
|
|
* it's part of the ev->scheduled_immediates list.
|
|
* _tevent_schedule_immediate() will reset the destructor
|
|
* in tevent_common_threaded_activate_immediate().
|
|
*/
|
|
talloc_set_destructor(im, tevent_threaded_schedule_immediate_destructor);
|
|
|
|
ret = pthread_mutex_lock(&main_ev->scheduled_mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
}
|
|
|
|
DLIST_ADD_END(main_ev->scheduled_immediates, im);
|
|
wakeup_fd = main_ev->wakeup_fd;
|
|
|
|
ret = pthread_mutex_unlock(&main_ev->scheduled_mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
}
|
|
|
|
ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
}
|
|
|
|
/*
|
|
* We might want to wake up the main thread under the lock. We
|
|
* had a slightly similar situation in pthreadpool, changed
|
|
* with 1c4284c7395f23. This is not exactly the same, as the
|
|
* wakeup is only a last-resort thing in case the main thread
|
|
* is sleeping. Doing the wakeup under the lock can easily
|
|
* lead to a contended mutex, which is much more expensive
|
|
* than a noncontended one. So I'd opt for the lower footprint
|
|
* initially. Maybe we have to change that later.
|
|
*/
|
|
tevent_common_wakeup_fd(wakeup_fd);
|
|
#else
|
|
/*
|
|
* tevent_threaded_context_create() returned NULL with ENOSYS...
|
|
*/
|
|
abort();
|
|
#endif
|
|
}
|
|
|
|
void tevent_common_threaded_activate_immediate(struct tevent_context *ev)
|
|
{
|
|
#ifdef HAVE_PTHREAD
|
|
int ret;
|
|
ret = pthread_mutex_lock(&ev->scheduled_mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
}
|
|
|
|
while (ev->scheduled_immediates != NULL) {
|
|
struct tevent_immediate *im = ev->scheduled_immediates;
|
|
struct tevent_immediate copy = *im;
|
|
|
|
DLIST_REMOVE(ev->scheduled_immediates, im);
|
|
|
|
TEVENT_DEBUG(ev, TEVENT_DEBUG_TRACE,
|
|
"Schedule immediate event \"%s\": %p from thread into main\n",
|
|
im->handler_name, im);
|
|
im->handler_name = NULL;
|
|
_tevent_schedule_immediate(im,
|
|
ev,
|
|
copy.handler,
|
|
copy.private_data,
|
|
copy.handler_name,
|
|
copy.schedule_location);
|
|
}
|
|
|
|
ret = pthread_mutex_unlock(&ev->scheduled_mutex);
|
|
if (ret != 0) {
|
|
abort();
|
|
}
|
|
#else
|
|
/*
|
|
* tevent_threaded_context_create() returned NULL with ENOSYS...
|
|
*/
|
|
abort();
|
|
#endif
|
|
}
|