1
0
mirror of https://github.com/samba-team/samba.git synced 2025-02-02 09:47:23 +03:00

pthreadpool: implement pthreadpool_tevent_wrapper_create() infrastructure

This can be used implement a generic per thread impersonation
for thread pools.

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
This commit is contained in:
Stefan Metzmacher 2018-04-20 17:12:07 +02:00 committed by Ralph Boehme
parent 3c4cdb2907
commit f9745d8b52
2 changed files with 433 additions and 1 deletions

View File

@ -103,6 +103,8 @@ struct pthreadpool_tevent_glue {
/* Tuple we are keeping track of in this list. */
struct tevent_context *ev;
struct tevent_threaded_context *tctx;
/* recheck monitor fd event */
struct tevent_fd *fde;
/* Pointer to link object owned by *ev. */
struct pthreadpool_tevent_glue_ev_link *ev_link;
/* active jobs */
@ -122,11 +124,33 @@ struct pthreadpool_tevent_glue_ev_link {
struct pthreadpool_tevent_glue *glue;
};
struct pthreadpool_tevent_wrapper {
struct pthreadpool_tevent *main_tp;
struct pthreadpool_tevent *wrap_tp;
const struct pthreadpool_tevent_wrapper_ops *ops;
void *private_state;
bool force_per_thread_cwd;
};
struct pthreadpool_tevent {
struct pthreadpool_tevent *prev, *next;
struct pthreadpool *pool;
struct pthreadpool_tevent_glue *glue_list;
struct pthreadpool_tevent_job *jobs;
struct {
/*
* This is used on the main context
*/
struct pthreadpool_tevent *list;
/*
* This is used on the wrapper context
*/
struct pthreadpool_tevent_wrapper *ctx;
} wrapper;
};
struct pthreadpool_tevent_job_state {
@ -141,6 +165,7 @@ struct pthreadpool_tevent_job {
struct pthreadpool_tevent_job *prev, *next;
struct pthreadpool_tevent *pool;
struct pthreadpool_tevent_wrapper *wrapper;
struct pthreadpool_tevent_job_state *state;
struct tevent_immediate *im;
@ -180,6 +205,15 @@ struct pthreadpool_tevent_job {
*/
bool started;
/*
* 'wrapper'
* set before calling the wrapper before_job() or
* after_job() hooks.
* unset again check the hook finished.
* (only written by job thread!)
*/
bool wrapper;
/*
* 'executed'
* set once the job function returned.
@ -209,6 +243,18 @@ struct pthreadpool_tevent_job {
* (only written by job thread!)
*/
bool signaled;
/*
* 'exit_thread'
* maybe set during pthreadpool_tevent_job_fn()
* if some wrapper related code generated an error
* and the environment isn't safe anymore.
*
* In such a case pthreadpool_tevent_job_signal()
* will pick this up and therminate the current
* worker thread by returning -1.
*/
bool exit_thread; /* only written/read by job thread! */
} needs_fence;
bool per_thread_cwd;
@ -267,8 +313,22 @@ int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
return 0;
}
static struct pthreadpool_tevent *pthreadpool_tevent_unwrap(
struct pthreadpool_tevent *pool)
{
struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
if (wrapper != NULL) {
return wrapper->main_tp;
}
return pool;
}
size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
{
pool = pthreadpool_tevent_unwrap(pool);
if (pool->pool == NULL) {
return 0;
}
@ -278,6 +338,8 @@ size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
{
pool = pthreadpool_tevent_unwrap(pool);
if (pool->pool == NULL) {
return 0;
}
@ -287,6 +349,14 @@ size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool)
{
struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
if (wrapper != NULL && wrapper->force_per_thread_cwd) {
return true;
}
pool = pthreadpool_tevent_unwrap(pool);
if (pool->pool == NULL) {
return false;
}
@ -298,21 +368,94 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
{
struct pthreadpool_tevent_job *job = NULL;
struct pthreadpool_tevent_job *njob = NULL;
struct pthreadpool_tevent *wrap_tp = NULL;
struct pthreadpool_tevent *nwrap_tp = NULL;
struct pthreadpool_tevent_glue *glue = NULL;
int ret;
if (pool->wrapper.ctx != NULL) {
struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
pool->wrapper.ctx = NULL;
pool = wrapper->main_tp;
DLIST_REMOVE(pool->wrapper.list, wrapper->wrap_tp);
for (job = pool->jobs; job != NULL; job = njob) {
njob = job->next;
if (job->wrapper != wrapper) {
continue;
}
/*
* This removes the job from the list
*
* Note that it waits in case
* the wrapper hooks are currently
* executing on the job.
*/
pthreadpool_tevent_job_orphan(job);
}
/*
* At this point we're sure that no job
* still references the pthreadpool_tevent_wrapper
* structure, so we can free it.
*/
TALLOC_FREE(wrapper);
pthreadpool_tevent_cleanup_orphaned_jobs();
return 0;
}
if (pool->pool == NULL) {
/*
* A dangling wrapper without main_tp.
*/
return 0;
}
ret = pthreadpool_stop(pool->pool);
if (ret != 0) {
return ret;
}
/*
* orphan all jobs (including wrapper jobs)
*/
for (job = pool->jobs; job != NULL; job = njob) {
njob = job->next;
/* The job this removes it from the list */
/*
* The job this removes it from the list
*
* Note that it waits in case
* the wrapper hooks are currently
* executing on the job (thread).
*/
pthreadpool_tevent_job_orphan(job);
}
/*
* cleanup all existing wrappers, remember we just orphaned
* all jobs (including the once of the wrappers).
*
* So we just mark as broken, so that
* pthreadpool_tevent_job_send() won't accept new jobs.
*/
for (wrap_tp = pool->wrapper.list; wrap_tp != NULL; wrap_tp = nwrap_tp) {
nwrap_tp = wrap_tp->next;
/*
* Just mark them as broken, so that we can't
* get more jobs.
*/
TALLOC_FREE(wrap_tp->wrapper.ctx);
DLIST_REMOVE(pool->wrapper.list, wrap_tp);
}
/*
* Delete all the registered
* tevent_context/tevent_threaded_context
@ -335,12 +478,93 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
return 0;
}
struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create(
struct pthreadpool_tevent *main_tp,
TALLOC_CTX *mem_ctx,
const struct pthreadpool_tevent_wrapper_ops *ops,
void *pstate,
size_t psize,
const char *type,
const char *location)
{
void **ppstate = (void **)pstate;
struct pthreadpool_tevent *wrap_tp = NULL;
struct pthreadpool_tevent_wrapper *wrapper = NULL;
pthreadpool_tevent_cleanup_orphaned_jobs();
if (main_tp->wrapper.ctx != NULL) {
/*
* stacking of wrappers is not supported
*/
errno = EINVAL;
return NULL;
}
if (main_tp->pool == NULL) {
/*
* The pool is no longer valid,
* most likely it was a wrapper context
* where the main pool was destroyed.
*/
errno = EINVAL;
return NULL;
}
wrap_tp = talloc_zero(mem_ctx, struct pthreadpool_tevent);
if (wrap_tp == NULL) {
return NULL;
}
wrapper = talloc_zero(wrap_tp, struct pthreadpool_tevent_wrapper);
if (wrapper == NULL) {
TALLOC_FREE(wrap_tp);
return NULL;
}
wrapper->main_tp = main_tp;
wrapper->wrap_tp = wrap_tp;
wrapper->ops = ops;
wrapper->private_state = talloc_zero_size(wrapper, psize);
if (wrapper->private_state == NULL) {
TALLOC_FREE(wrap_tp);
return NULL;
}
talloc_set_name_const(wrapper->private_state, type);
wrap_tp->wrapper.ctx = wrapper;
DLIST_ADD_END(main_tp->wrapper.list, wrap_tp);
talloc_set_destructor(wrap_tp, pthreadpool_tevent_destructor);
*ppstate = wrapper->private_state;
return wrap_tp;
}
void pthreadpool_tevent_force_per_thread_cwd(struct pthreadpool_tevent *pool,
const void *private_state)
{
struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
if (wrapper == NULL) {
abort();
}
if (wrapper->private_state != private_state) {
abort();
}
wrapper->force_per_thread_cwd = true;
}
static int pthreadpool_tevent_glue_destructor(
struct pthreadpool_tevent_glue *glue)
{
struct pthreadpool_tevent_job_state *state = NULL;
struct pthreadpool_tevent_job_state *nstate = NULL;
TALLOC_FREE(glue->fde);
for (state = glue->states; state != NULL; state = nstate) {
nstate = state->next;
@ -381,6 +605,59 @@ static int pthreadpool_tevent_glue_link_destructor(
return 0;
}
static void pthreadpool_tevent_glue_monitor(struct tevent_context *ev,
struct tevent_fd *fde,
uint16_t flags,
void *private_data)
{
struct pthreadpool_tevent_glue *glue =
talloc_get_type_abort(private_data,
struct pthreadpool_tevent_glue);
struct pthreadpool_tevent_job *job = NULL;
struct pthreadpool_tevent_job *njob = NULL;
int ret = -1;
ret = pthreadpool_restart_check_monitor_drain(glue->pool->pool);
if (ret != 0) {
TALLOC_FREE(glue->fde);
}
ret = pthreadpool_restart_check(glue->pool->pool);
if (ret == 0) {
/*
* success...
*/
goto done;
}
/*
* There's a problem and the pool
* has not a single thread available
* for pending jobs, so we can only
* stop the jobs and return an error.
* This is similar to a failure from
* pthreadpool_add_job().
*/
for (job = glue->pool->jobs; job != NULL; job = njob) {
njob = job->next;
tevent_req_defer_callback(job->state->req,
job->state->ev);
tevent_req_error(job->state->req, ret);
}
done:
if (glue->states == NULL) {
/*
* If the glue doesn't have any pending jobs
* we remove the glue.
*
* In order to remove the fd event.
*/
TALLOC_FREE(glue);
}
}
static int pthreadpool_tevent_register_ev(
struct pthreadpool_tevent *pool,
struct pthreadpool_tevent_job_state *state)
@ -388,6 +665,7 @@ static int pthreadpool_tevent_register_ev(
struct tevent_context *ev = state->ev;
struct pthreadpool_tevent_glue *glue = NULL;
struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
int monitor_fd = -1;
/*
* See if this tevent_context was already registered by
@ -420,6 +698,28 @@ static int pthreadpool_tevent_register_ev(
};
talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);
monitor_fd = pthreadpool_restart_check_monitor_fd(pool->pool);
if (monitor_fd == -1 && errno != ENOSYS) {
int saved_errno = errno;
TALLOC_FREE(glue);
return saved_errno;
}
if (monitor_fd != -1) {
glue->fde = tevent_add_fd(ev,
glue,
monitor_fd,
TEVENT_FD_READ,
pthreadpool_tevent_glue_monitor,
glue);
if (glue->fde == NULL) {
close(monitor_fd);
TALLOC_FREE(glue);
return ENOMEM;
}
tevent_fd_set_auto_close(glue->fde);
}
/*
* Now allocate the link object to the event context. Note this
* is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
@ -559,6 +859,24 @@ static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
abort();
}
/*
* Once we marked the request as 'orphaned'
* we spin/loop if 'wrapper' is marked as active.
*
* We need to wait until the wrapper hook finished
* before we can set job->wrapper = NULL.
*
* This is some kind of spinlock, but with
* 1 millisecond sleeps in between, in order
* to give the thread more cpu time to finish.
*/
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
while (job->needs_fence.wrapper) {
poll(NULL, 0, 1);
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
}
job->wrapper = NULL;
/*
* Once we marked the request as 'orphaned'
* we spin/loop if it's already marked
@ -673,9 +991,14 @@ struct tevent_req *pthreadpool_tevent_job_send(
struct pthreadpool_tevent_job_state *state = NULL;
struct pthreadpool_tevent_job *job = NULL;
int ret;
struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
pthreadpool_tevent_cleanup_orphaned_jobs();
if (wrapper != NULL) {
pool = wrapper->main_tp;
}
req = tevent_req_create(mem_ctx, &state,
struct pthreadpool_tevent_job_state);
if (req == NULL) {
@ -705,6 +1028,7 @@ struct tevent_req *pthreadpool_tevent_job_send(
return tevent_req_post(req, ev);
}
job->pool = pool;
job->wrapper = wrapper;
job->fn = fn;
job->private_data = private_data;
job->im = tevent_create_immediate(state->job);
@ -803,15 +1127,73 @@ static void pthreadpool_tevent_job_fn(void *private_data)
struct pthreadpool_tevent_job *job =
talloc_get_type_abort(private_data,
struct pthreadpool_tevent_job);
struct pthreadpool_tevent_wrapper *wrapper = NULL;
current_job = job;
job->needs_fence.started = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
if (job->needs_fence.orphaned) {
current_job = NULL;
return;
}
wrapper = job->wrapper;
if (wrapper != NULL) {
bool ok;
job->needs_fence.wrapper = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
if (job->needs_fence.orphaned) {
job->needs_fence.wrapper = false;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
current_job = NULL;
return;
}
ok = wrapper->ops->before_job(wrapper->wrap_tp,
wrapper->private_state,
wrapper->main_tp,
__location__);
job->needs_fence.wrapper = false;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
if (!ok) {
job->needs_fence.exit_thread = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
current_job = NULL;
return;
}
}
job->fn(job->private_data);
job->needs_fence.executed = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
if (wrapper != NULL) {
bool ok;
job->needs_fence.wrapper = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
if (job->needs_fence.orphaned) {
job->needs_fence.wrapper = false;
job->needs_fence.exit_thread = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
current_job = NULL;
return;
}
ok = wrapper->ops->after_job(wrapper->wrap_tp,
wrapper->private_state,
wrapper->main_tp,
__location__);
job->needs_fence.wrapper = false;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
if (!ok) {
job->needs_fence.exit_thread = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
current_job = NULL;
return;
}
}
current_job = NULL;
}
@ -830,6 +1212,15 @@ static int pthreadpool_tevent_job_signal(int jobid,
/* Request already gone */
job->needs_fence.dropped = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
if (job->needs_fence.exit_thread) {
/*
* A problem with the wrapper the current job/worker
* thread needs to terminate.
*
* The pthreadpool_tevent is already gone.
*/
return -1;
}
return 0;
}
@ -855,6 +1246,15 @@ static int pthreadpool_tevent_job_signal(int jobid,
job->needs_fence.signaled = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
if (job->needs_fence.exit_thread) {
/*
* A problem with the wrapper the current job/worker
* thread needs to terminate.
*
* The pthreadpool_tevent is already gone.
*/
return -1;
}
return 0;
}

View File

@ -29,6 +29,38 @@ struct pthreadpool_tevent;
int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
struct pthreadpool_tevent **presult);
struct pthreadpool_tevent_wrapper_ops {
const char *name;
bool (*before_job)(struct pthreadpool_tevent *wrap_tp,
void *private_state,
struct pthreadpool_tevent *main_tp,
const char *location);
bool (*after_job)(struct pthreadpool_tevent *wrap_tp,
void *private_state,
struct pthreadpool_tevent *main_tp,
const char *location);
};
struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create(
struct pthreadpool_tevent *main_tp,
TALLOC_CTX *mem_ctx,
const struct pthreadpool_tevent_wrapper_ops *ops,
void *pstate,
size_t psize,
const char *type,
const char *location);
#define pthreadpool_tevent_wrapper_create(main_tp, mem_ctx, ops, state, type) \
_pthreadpool_tevent_wrapper_create(main_tp, mem_ctx, ops, \
state, sizeof(type), #type, __location__)
/*
* this can only be called directly after
* pthreadpool_tevent_wrapper_create()
*/
void pthreadpool_tevent_force_per_thread_cwd(struct pthreadpool_tevent *pool,
const void *private_state);
size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool);
size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool);
bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool);