1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-11 05:18:09 +03:00
samba-mirror/lib/tevent/tevent_threads.c
Volker Lendecke f6aaece578 tevent: Add threaded immediate activation
This is infrastructure to improve our async r/w result handling and latency.
The pthreadpool signalling goes through a pipe. This has downsides: The main
event loop has to go through a read on the pipe before it can ship the result.
Also, it is not guaranteed by poll/epoll that the pthreadpool signal pipe is
handled with top priority. When an async pread/pwrite has finished, we should
immediately ship the result to the client, not waiting for anything else.

This patch enables tevent_immediate structs as job signalling. This means a
busy main tevent loop will handle the threaded job completion before any timed
or file descriptor events. Opposite to Jeremy's tevent_thread_proxy this is
done by a modification of the main event loop by looking at a linked list under
a central mutex.

Regarding performance: In a later commit I've created a test that does nothing
but fire one immediate over and over again. If you add a pthread_mutex_lock and
unlock pair in the immediate handler, you lose roughly 25% of rounds per
second, so it is measurable. It is questionable whether that will be measurable
in the real world, but to counter such concerns, activation of immediates needs to go
through a new struct tevent_threaded_context. Only if such a
tevent_threaded_context exists for a tevent context, the main loop takes the
hit to look at the mutex'ed list of finished jobs.

This patch by design does not care about talloc hierarchies. The idea is that
the main thread owning the tevent context creates a chunk of memory and
prepares the tevent_immediate indication job completion. The main thread hands
the memory chunk together with the immediate as a job description over to a
helper thread. The helper thread does its job and upon completion calls
tevent_threaded_schedule_immediate with the already-prepared immediate. From
that point on memory ownership is again transferred to the main thread.

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
2016-08-24 01:33:48 +02:00

493 lines
11 KiB
C

/*
tevent event library.
Copyright (C) Jeremy Allison 2015
** NOTE! The following LGPL license applies to the tevent
** library. This does NOT imply that all of Samba is released
** under the LGPL
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 3 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/
#include "replace.h"
#include "system/filesys.h"
#include "talloc.h"
#include "tevent.h"
#include "tevent_internal.h"
#include "tevent_util.h"
#if defined(HAVE_PTHREAD)
#include <pthread.h>
/*
 * One queued cross-thread request: the handler, its private data and
 * the tevent_immediate that will fire it in the destination loop.
 * Entries sit on tevent_thread_proxy::im_list until scheduled, then
 * on tofree_im_list until freed by free_list_handler().
 */
struct tevent_immediate_list {
struct tevent_immediate_list *next, *prev; /* DLIST linkage */
tevent_immediate_handler_t handler;
struct tevent_immediate *im;
void *private_ptr; /* handler argument; talloc-owned by this entry */
};
/*
 * Per-proxy state shared between helper threads and the thread
 * owning dest_ev_ctx. All list/fd access happens under 'mutex'.
 */
struct tevent_thread_proxy {
pthread_mutex_t mutex;
struct tevent_context *dest_ev_ctx; /* loop that runs the handlers */
int read_fd; /* read end of the wakeup pipe (watched by fde) */
int write_fd; /* write end; helper threads write one byte here */
struct tevent_fd *pipe_read_fde;
/* Pending events list. */
struct tevent_immediate_list *im_list;
/* Completed events list. */
struct tevent_immediate_list *tofree_im_list;
struct tevent_immediate *free_im; /* reschedules free_list_handler() */
};
/*
 * Drain and talloc-free every entry of an immediate list.
 * The head pointer is updated as entries are unlinked, so on
 * return *pp_list_head is NULL.
 */
static void free_im_list(struct tevent_immediate_list **pp_list_head)
{
	while (*pp_list_head != NULL) {
		struct tevent_immediate_list *e = *pp_list_head;

		DLIST_REMOVE(*pp_list_head, e);
		TALLOC_FREE(e);
	}
}
/*
 * Immediate handler that frees the completed-entry list. Runs in the
 * destination event context after the proxied immediates have fired
 * (tp->free_im is scheduled behind them), so the entries are no
 * longer referenced by anyone.
 */
static void free_list_handler(struct tevent_context *ev,
			      struct tevent_immediate *im,
			      void *private_ptr)
{
	struct tevent_thread_proxy *tp = talloc_get_type_abort(
		private_ptr, struct tevent_thread_proxy);

	if (pthread_mutex_lock(&tp->mutex) != 0) {
		abort();
		/* Notreached. */
		return;
	}

	free_im_list(&tp->tofree_im_list);

	if (pthread_mutex_unlock(&tp->mutex) != 0) {
		abort();
		/* Notreached. */
		return;
	}
}
/*
 * Push every queued entry onto the destination event context and move
 * it from the pending list to the to-free list.
 * Called with tp->mutex held (from pipe_read_handler()).
 */
static void schedule_immediate_functions(struct tevent_thread_proxy *tp)
{
struct tevent_immediate_list *im_entry = NULL;
struct tevent_immediate_list *im_next = NULL;
for (im_entry = tp->im_list; im_entry; im_entry = im_next) {
/* Save the link before DLIST_REMOVE unlinks the entry. */
im_next = im_entry->next;
DLIST_REMOVE(tp->im_list, im_entry);
tevent_schedule_immediate(im_entry->im,
tp->dest_ev_ctx,
im_entry->handler,
im_entry->private_ptr);
/* Move from pending list to free list. */
DLIST_ADD(tp->tofree_im_list, im_entry);
}
if (tp->tofree_im_list != NULL) {
/*
 * Once the current immediate events
 * are processed, we need to reschedule
 * ourselves to free them. This works
 * as tevent_schedule_immediate()
 * always adds events to the *END* of
 * the immediate events list.
 */
tevent_schedule_immediate(tp->free_im,
tp->dest_ev_ctx,
free_list_handler,
tp);
}
}
/*
 * Wakeup handler for the proxy pipe. Drains all bytes written by
 * tevent_thread_proxy_schedule() callers and, still under the mutex,
 * turns the queued list entries into scheduled immediates.
 */
static void pipe_read_handler(struct tevent_context *ev,
			      struct tevent_fd *fde,
			      uint16_t flags,
			      void *private_ptr)
{
	struct tevent_thread_proxy *tp = talloc_get_type_abort(
		private_ptr, struct tevent_thread_proxy);

	if (pthread_mutex_lock(&tp->mutex) != 0) {
		abort();
		/* Notreached. */
		return;
	}

	/*
	 * Clear out all data in the pipe. The byte values are
	 * irrelevant; a short or failed read (fd is nonblocking)
	 * simply ends the loop.
	 */
	for (;;) {
		char buf[64];

		if (read(tp->read_fd, buf, 64) != 64) {
			break;
		}
	}

	schedule_immediate_functions(tp);

	if (pthread_mutex_unlock(&tp->mutex) != 0) {
		abort();
		/* Notreached. */
		return;
	}
}
/*
 * Talloc destructor for a tevent_thread_proxy. Runs in the thread
 * that owns the destination tevent context.
 *
 * Teardown happens under the mutex so that a concurrent
 * tevent_thread_proxy_schedule() from another thread either
 * completes first or sees write_fd == -1 and ignores the request.
 */
static int tevent_thread_proxy_destructor(struct tevent_thread_proxy *tp)
{
int ret;
ret = pthread_mutex_lock(&tp->mutex);
if (ret != 0) {
abort();
/* Notreached. */
return 0;
}
/* Stop watching the pipe before closing its read end. */
TALLOC_FREE(tp->pipe_read_fde);
if (tp->read_fd != -1) {
(void)close(tp->read_fd);
tp->read_fd = -1;
}
if (tp->write_fd != -1) {
(void)close(tp->write_fd);
tp->write_fd = -1;
}
/* Hmmm. It's probably an error if we get here with
any non-NULL immediate entries.. */
free_im_list(&tp->im_list);
free_im_list(&tp->tofree_im_list);
TALLOC_FREE(tp->free_im);
ret = pthread_mutex_unlock(&tp->mutex);
if (ret != 0) {
abort();
/* Notreached. */
return 0;
}
/* Mutex must be unlocked (and unused) before it can be destroyed. */
ret = pthread_mutex_destroy(&tp->mutex);
if (ret != 0) {
abort();
/* Notreached. */
return 0;
}
return 0;
}
/*
 * Create a struct that can be passed to other threads
 * to allow them to signal the struct tevent_context *
 * passed in.
 *
 * The proxy owns a nonblocking, close-on-exec pipe: helper threads
 * write a byte to its write end, and dest_ev_ctx drains the read end
 * in pipe_read_handler() and schedules the queued immediates.
 * Returns NULL on any failure (allocation, mutex, pipe or fd setup).
 */
struct tevent_thread_proxy *tevent_thread_proxy_create(
struct tevent_context *dest_ev_ctx)
{
int ret;
int pipefds[2];
struct tevent_thread_proxy *tp;
tp = talloc_zero(dest_ev_ctx, struct tevent_thread_proxy);
if (tp == NULL) {
return NULL;
}
ret = pthread_mutex_init(&tp->mutex, NULL);
if (ret != 0) {
/* Destructor not yet armed: plain free is correct here. */
goto fail;
}
tp->dest_ev_ctx = dest_ev_ctx;
/* Mark fds invalid so the destructor knows what to close. */
tp->read_fd = -1;
tp->write_fd = -1;
/* From here on, the destructor performs all cleanup on failure. */
talloc_set_destructor(tp, tevent_thread_proxy_destructor);
ret = pipe(pipefds);
if (ret == -1) {
goto fail;
}
tp->read_fd = pipefds[0];
tp->write_fd = pipefds[1];
/* Nonblocking on both ends: writers and the drain loop never stall. */
ret = ev_set_blocking(pipefds[0], false);
if (ret != 0) {
goto fail;
}
ret = ev_set_blocking(pipefds[1], false);
if (ret != 0) {
goto fail;
}
if (!ev_set_close_on_exec(pipefds[0])) {
goto fail;
}
if (!ev_set_close_on_exec(pipefds[1])) {
goto fail;
}
tp->pipe_read_fde = tevent_add_fd(dest_ev_ctx,
tp,
tp->read_fd,
TEVENT_FD_READ,
pipe_read_handler,
tp);
if (tp->pipe_read_fde == NULL) {
goto fail;
}
/*
 * Create an immediate event to free
 * completed lists.
 */
tp->free_im = tevent_create_immediate(tp);
if (tp->free_im == NULL) {
goto fail;
}
return tp;
fail:
TALLOC_FREE(tp);
return NULL;
}
/*
 * This function schedules an immediate event to be called with argument
 * *pp_private in the thread context of dest_ev_ctx. Caller doesn't
 * wait for activation to take place, this is simply fire-and-forget.
 *
 * pp_im must be a pointer to an immediate event talloced on
 * a context owned by the calling thread, or the NULL context.
 * Ownership of *pp_im will be transferred to the tevent library.
 *
 * pp_private can be null, or contents of *pp_private must be
 * talloc'ed memory on a context owned by the calling thread
 * or the NULL context. If non-null, ownership of *pp_private will
 * be transferred to the tevent library.
 *
 * If you want to return a message, have the destination use the
 * same function call to send back to the caller.
 */
void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
struct tevent_immediate **pp_im,
tevent_immediate_handler_t handler,
void *pp_private_data)
{
struct tevent_immediate_list *im_entry;
int ret;
char c;
ssize_t written;
ret = pthread_mutex_lock(&tp->mutex);
if (ret != 0) {
abort();
/* Notreached. */
return;
}
if (tp->write_fd == -1) {
/* In the process of being destroyed. Ignore. */
goto end;
}
/* Create a new immediate_list entry. MUST BE ON THE NULL CONTEXT */
im_entry = talloc_zero(NULL, struct tevent_immediate_list);
if (im_entry == NULL) {
/* Out of memory: silently drop the request (fire-and-forget). */
goto end;
}
im_entry->handler = handler;
/* Take ownership of the caller's immediate; *pp_im becomes NULL. */
im_entry->im = talloc_move(im_entry, pp_im);
if (pp_private_data != NULL) {
void **pptr = (void **)pp_private_data;
/* Take ownership of the caller's private data as well. */
im_entry->private_ptr = talloc_move(im_entry, pptr);
}
DLIST_ADD(tp->im_list, im_entry);
/* And notify the dest_ev_ctx to wake up. */
c = '\0';
do {
written = write(tp->write_fd, &c, 1);
} while (written == -1 && errno == EINTR);
/*
 * NOTE(review): other write() failures are ignored. If the
 * nonblocking pipe is full (EAGAIN), bytes already in the pipe
 * presumably wake the destination anyway — confirm before relying
 * on that elsewhere.
 */
end:
ret = pthread_mutex_unlock(&tp->mutex);
if (ret != 0) {
abort();
/* Notreached. */
}
}
#else
/* !HAVE_PTHREAD */
/* Without pthread support thread proxies cannot exist: always fail. */
struct tevent_thread_proxy *tevent_thread_proxy_create(
struct tevent_context *dest_ev_ctx)
{
errno = ENOSYS;
return NULL;
}
/*
 * No-op stub: tevent_thread_proxy_create() always fails without
 * pthread support, so there is no valid tp to schedule on.
 */
void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
struct tevent_immediate **pp_im,
tevent_immediate_handler_t handler,
void *pp_private_data)
{
;
}
#endif
/*
 * Talloc destructor: unhook this threaded context from its event
 * context's list, if it is still attached.
 */
static int tevent_threaded_context_destructor(
	struct tevent_threaded_context *tctx)
{
	if (tctx->event_ctx == NULL) {
		return 0;
	}
	DLIST_REMOVE(tctx->event_ctx->threaded_contexts, tctx);
	return 0;
}
/*
 * Create a tevent_threaded_context that lets helper threads activate
 * immediates on 'ev' via tevent_threaded_schedule_immediate().
 *
 * Registering at least one threaded context is what makes the main
 * loop start looking at the mutex-protected scheduled-immediates
 * list. Returns NULL with errno set on failure (always ENOSYS
 * without pthread support).
 */
struct tevent_threaded_context *tevent_threaded_context_create(
TALLOC_CTX *mem_ctx, struct tevent_context *ev)
{
#ifdef HAVE_PTHREAD
struct tevent_threaded_context *tctx;
int ret;
/* Make sure the wakeup mechanism for 'ev' exists. */
ret = tevent_common_wakeup_init(ev);
if (ret != 0) {
errno = ret;
return NULL;
}
tctx = talloc(mem_ctx, struct tevent_threaded_context);
if (tctx == NULL) {
return NULL;
}
tctx->event_ctx = ev;
DLIST_ADD(ev->threaded_contexts, tctx);
talloc_set_destructor(tctx, tevent_threaded_context_destructor);
return tctx;
#else
errno = ENOSYS;
return NULL;
#endif
}
/*
 * Activate a pre-created immediate from a helper thread.
 *
 * 'im' must not already be scheduled (im->event_ctx must be NULL)
 * and 'handler' must be non-NULL; both are enforced with abort().
 * All immediate fields are filled in before the entry is published
 * on ev->scheduled_immediates under scheduled_mutex, so the main
 * thread only ever sees fully-initialized entries.
 */
void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
struct tevent_immediate *im,
tevent_immediate_handler_t handler,
void *private_data,
const char *handler_name,
const char *location)
{
#ifdef HAVE_PTHREAD
struct tevent_context *ev = tctx->event_ctx;
int ret;
if ((im->event_ctx != NULL) || (handler == NULL)) {
abort();
}
im->event_ctx = ev;
im->handler = handler;
im->private_data = private_data;
im->handler_name = handler_name;
im->schedule_location = location;
im->cancel_fn = NULL;
im->additional_data = NULL;
ret = pthread_mutex_lock(&ev->scheduled_mutex);
if (ret != 0) {
abort();
}
DLIST_ADD_END(ev->scheduled_immediates, im);
ret = pthread_mutex_unlock(&ev->scheduled_mutex);
if (ret != 0) {
abort();
}
/*
 * We might want to wake up the main thread under the lock. We
 * had a slightly similar situation in pthreadpool, changed
 * with 1c4284c7395f23. This is not exactly the same, as the
 * wakeup is only a last-resort thing in case the main thread
 * is sleeping. Doing the wakeup under the lock can easily
 * lead to a contended mutex, which is much more expensive
 * than a noncontended one. So I'd opt for the lower footprint
 * initially. Maybe we have to change that later.
 */
tevent_common_wakeup(ev);
#else
/*
 * tevent_threaded_context_create() returned NULL with ENOSYS...
 */
abort();
#endif
}
/*
 * Called from the main event loop to move immediates published by
 * helper threads (via tevent_threaded_schedule_immediate()) from the
 * mutex-protected scheduled list onto the regular immediate_events
 * list, where they run ahead of timer and fd events.
 */
void tevent_common_threaded_activate_immediate(struct tevent_context *ev)
{
#ifdef HAVE_PTHREAD
int ret;
ret = pthread_mutex_lock(&ev->scheduled_mutex);
if (ret != 0) {
abort();
}
/* Drain the scheduled list, preserving FIFO order via ADD_END. */
while (ev->scheduled_immediates != NULL) {
struct tevent_immediate *im = ev->scheduled_immediates;
DLIST_REMOVE(ev->scheduled_immediates, im);
DLIST_ADD_END(ev->immediate_events, im);
}
ret = pthread_mutex_unlock(&ev->scheduled_mutex);
if (ret != 0) {
abort();
}
#else
/*
 * tevent_threaded_context_create() returned NULL with ENOSYS...
 */
abort();
#endif
}