From f9745d8b5234091c38e93ed57a255120b61f3ad7 Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Fri, 20 Apr 2018 17:12:07 +0200 Subject: [PATCH] pthreadpool: implement pthreadpool_tevent_wrapper_create() infrastructure This can be used implement a generic per thread impersonation for thread pools. Signed-off-by: Stefan Metzmacher Reviewed-by: Ralph Boehme --- lib/pthreadpool/pthreadpool_tevent.c | 402 ++++++++++++++++++++++++++- lib/pthreadpool/pthreadpool_tevent.h | 32 +++ 2 files changed, 433 insertions(+), 1 deletion(-) diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c index 01e8586b384..19b1e6d9650 100644 --- a/lib/pthreadpool/pthreadpool_tevent.c +++ b/lib/pthreadpool/pthreadpool_tevent.c @@ -103,6 +103,8 @@ struct pthreadpool_tevent_glue { /* Tuple we are keeping track of in this list. */ struct tevent_context *ev; struct tevent_threaded_context *tctx; + /* recheck monitor fd event */ + struct tevent_fd *fde; /* Pointer to link object owned by *ev. */ struct pthreadpool_tevent_glue_ev_link *ev_link; /* active jobs */ @@ -122,11 +124,33 @@ struct pthreadpool_tevent_glue_ev_link { struct pthreadpool_tevent_glue *glue; }; +struct pthreadpool_tevent_wrapper { + struct pthreadpool_tevent *main_tp; + struct pthreadpool_tevent *wrap_tp; + const struct pthreadpool_tevent_wrapper_ops *ops; + void *private_state; + bool force_per_thread_cwd; +}; + struct pthreadpool_tevent { + struct pthreadpool_tevent *prev, *next; + struct pthreadpool *pool; struct pthreadpool_tevent_glue *glue_list; struct pthreadpool_tevent_job *jobs; + + struct { + /* + * This is used on the main context + */ + struct pthreadpool_tevent *list; + + /* + * This is used on the wrapper context + */ + struct pthreadpool_tevent_wrapper *ctx; + } wrapper; }; struct pthreadpool_tevent_job_state { @@ -141,6 +165,7 @@ struct pthreadpool_tevent_job { struct pthreadpool_tevent_job *prev, *next; struct pthreadpool_tevent *pool; + struct pthreadpool_tevent_wrapper *wrapper; struct pthreadpool_tevent_job_state *state; struct tevent_immediate *im; @@ -180,6 +205,15 @@ struct pthreadpool_tevent_job { */ bool started; + /* + * 'wrapper' + * set before calling the wrapper before_job() or + * after_job() hooks. + * unset again check the hook finished. + * (only written by job thread!) + */ + bool wrapper; + /* * 'executed' * set once the job function returned. @@ -209,6 +243,18 @@ struct pthreadpool_tevent_job { * (only written by job thread!) */ bool signaled; + + /* + * 'exit_thread' + * maybe set during pthreadpool_tevent_job_fn() + * if some wrapper related code generated an error + * and the environment isn't safe anymore. + * + * In such a case pthreadpool_tevent_job_signal() + * will pick this up and therminate the current + * worker thread by returning -1. + */ + bool exit_thread; /* only written/read by job thread! */ } needs_fence; bool per_thread_cwd; @@ -267,8 +313,22 @@ int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads, return 0; } +static struct pthreadpool_tevent *pthreadpool_tevent_unwrap( + struct pthreadpool_tevent *pool) +{ + struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx; + + if (wrapper != NULL) { + return wrapper->main_tp; + } + + return pool; +} + size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool) { + pool = pthreadpool_tevent_unwrap(pool); + if (pool->pool == NULL) { return 0; } @@ -278,6 +338,8 @@ size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool) size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool) { + pool = pthreadpool_tevent_unwrap(pool); + if (pool->pool == NULL) { return 0; } @@ -287,6 +349,14 @@ size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool) bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool) { + struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx; + + if (wrapper != NULL && wrapper->force_per_thread_cwd) { + return true; + } + + pool = pthreadpool_tevent_unwrap(pool); + if (pool->pool == NULL) { return false; } @@ -298,21 +368,94 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool) { struct pthreadpool_tevent_job *job = NULL; struct pthreadpool_tevent_job *njob = NULL; + struct pthreadpool_tevent *wrap_tp = NULL; + struct pthreadpool_tevent *nwrap_tp = NULL; struct pthreadpool_tevent_glue *glue = NULL; int ret; + if (pool->wrapper.ctx != NULL) { + struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx; + + pool->wrapper.ctx = NULL; + pool = wrapper->main_tp; + + DLIST_REMOVE(pool->wrapper.list, wrapper->wrap_tp); + + for (job = pool->jobs; job != NULL; job = njob) { + njob = job->next; + + if (job->wrapper != wrapper) { + continue; + } + + /* + * This removes the job from the list + * + * Note that it waits in case + * the wrapper hooks are currently + * executing on the job. + */ + pthreadpool_tevent_job_orphan(job); + } + + /* + * At this point we're sure that no job + * still references the pthreadpool_tevent_wrapper + * structure, so we can free it. + */ + TALLOC_FREE(wrapper); + + pthreadpool_tevent_cleanup_orphaned_jobs(); + return 0; + } + + if (pool->pool == NULL) { + /* + * A dangling wrapper without main_tp. + */ + return 0; + } + ret = pthreadpool_stop(pool->pool); if (ret != 0) { return ret; } + /* + * orphan all jobs (including wrapper jobs) + */ for (job = pool->jobs; job != NULL; job = njob) { njob = job->next; - /* The job this removes it from the list */ + /* + * The job this removes it from the list + * + * Note that it waits in case + * the wrapper hooks are currently + * executing on the job (thread). + */ pthreadpool_tevent_job_orphan(job); } + /* + * cleanup all existing wrappers, remember we just orphaned + * all jobs (including the once of the wrappers). + * + * So we just mark as broken, so that + * pthreadpool_tevent_job_send() won't accept new jobs. + */ + for (wrap_tp = pool->wrapper.list; wrap_tp != NULL; wrap_tp = nwrap_tp) { + nwrap_tp = wrap_tp->next; + + /* + * Just mark them as broken, so that we can't + * get more jobs. + */ + TALLOC_FREE(wrap_tp->wrapper.ctx); + + DLIST_REMOVE(pool->wrapper.list, wrap_tp); + } + /* * Delete all the registered * tevent_context/tevent_threaded_context @@ -335,12 +478,93 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool) return 0; } +struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create( + struct pthreadpool_tevent *main_tp, + TALLOC_CTX *mem_ctx, + const struct pthreadpool_tevent_wrapper_ops *ops, + void *pstate, + size_t psize, + const char *type, + const char *location) +{ + void **ppstate = (void **)pstate; + struct pthreadpool_tevent *wrap_tp = NULL; + struct pthreadpool_tevent_wrapper *wrapper = NULL; + + pthreadpool_tevent_cleanup_orphaned_jobs(); + + if (main_tp->wrapper.ctx != NULL) { + /* + * stacking of wrappers is not supported + */ + errno = EINVAL; + return NULL; + } + + if (main_tp->pool == NULL) { + /* + * The pool is no longer valid, + * most likely it was a wrapper context + * where the main pool was destroyed. + */ + errno = EINVAL; + return NULL; + } + + wrap_tp = talloc_zero(mem_ctx, struct pthreadpool_tevent); + if (wrap_tp == NULL) { + return NULL; + } + + wrapper = talloc_zero(wrap_tp, struct pthreadpool_tevent_wrapper); + if (wrapper == NULL) { + TALLOC_FREE(wrap_tp); + return NULL; + } + wrapper->main_tp = main_tp; + wrapper->wrap_tp = wrap_tp; + wrapper->ops = ops; + wrapper->private_state = talloc_zero_size(wrapper, psize); + if (wrapper->private_state == NULL) { + TALLOC_FREE(wrap_tp); + return NULL; + } + talloc_set_name_const(wrapper->private_state, type); + + wrap_tp->wrapper.ctx = wrapper; + + DLIST_ADD_END(main_tp->wrapper.list, wrap_tp); + + talloc_set_destructor(wrap_tp, pthreadpool_tevent_destructor); + + *ppstate = wrapper->private_state; + return wrap_tp; +} + +void pthreadpool_tevent_force_per_thread_cwd(struct pthreadpool_tevent *pool, + const void *private_state) +{ + struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx; + + if (wrapper == NULL) { + abort(); + } + + if (wrapper->private_state != private_state) { + abort(); + } + + wrapper->force_per_thread_cwd = true; +} + static int pthreadpool_tevent_glue_destructor( struct pthreadpool_tevent_glue *glue) { struct pthreadpool_tevent_job_state *state = NULL; struct pthreadpool_tevent_job_state *nstate = NULL; + TALLOC_FREE(glue->fde); + for (state = glue->states; state != NULL; state = nstate) { nstate = state->next; @@ -381,6 +605,59 @@ static int pthreadpool_tevent_glue_link_destructor( return 0; } +static void pthreadpool_tevent_glue_monitor(struct tevent_context *ev, + struct tevent_fd *fde, + uint16_t flags, + void *private_data) +{ + struct pthreadpool_tevent_glue *glue = + talloc_get_type_abort(private_data, + struct pthreadpool_tevent_glue); + struct pthreadpool_tevent_job *job = NULL; + struct pthreadpool_tevent_job *njob = NULL; + int ret = -1; + + ret = pthreadpool_restart_check_monitor_drain(glue->pool->pool); + if (ret != 0) { + TALLOC_FREE(glue->fde); + } + + ret = pthreadpool_restart_check(glue->pool->pool); + if (ret == 0) { + /* + * success... + */ + goto done; + } + + /* + * There's a problem and the pool + * has not a single thread available + * for pending jobs, so we can only + * stop the jobs and return an error. + * This is similar to a failure from + * pthreadpool_add_job(). + */ + for (job = glue->pool->jobs; job != NULL; job = njob) { + njob = job->next; + + tevent_req_defer_callback(job->state->req, + job->state->ev); + tevent_req_error(job->state->req, ret); + } + +done: + if (glue->states == NULL) { + /* + * If the glue doesn't have any pending jobs + * we remove the glue. + * + * In order to remove the fd event. + */ + TALLOC_FREE(glue); + } +} + static int pthreadpool_tevent_register_ev( struct pthreadpool_tevent *pool, struct pthreadpool_tevent_job_state *state) @@ -388,6 +665,7 @@ static int pthreadpool_tevent_register_ev( struct tevent_context *ev = state->ev; struct pthreadpool_tevent_glue *glue = NULL; struct pthreadpool_tevent_glue_ev_link *ev_link = NULL; + int monitor_fd = -1; /* * See if this tevent_context was already registered by @@ -420,6 +698,28 @@ static int pthreadpool_tevent_register_ev( }; talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor); + monitor_fd = pthreadpool_restart_check_monitor_fd(pool->pool); + if (monitor_fd == -1 && errno != ENOSYS) { + int saved_errno = errno; + TALLOC_FREE(glue); + return saved_errno; + } + + if (monitor_fd != -1) { + glue->fde = tevent_add_fd(ev, + glue, + monitor_fd, + TEVENT_FD_READ, + pthreadpool_tevent_glue_monitor, + glue); + if (glue->fde == NULL) { + close(monitor_fd); + TALLOC_FREE(glue); + return ENOMEM; + } + tevent_fd_set_auto_close(glue->fde); + } + /* * Now allocate the link object to the event context. Note this * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event @@ -559,6 +859,24 @@ static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job) abort(); } + /* + * Once we marked the request as 'orphaned' + * we spin/loop if 'wrapper' is marked as active. + * + * We need to wait until the wrapper hook finished + * before we can set job->wrapper = NULL. + * + * This is some kind of spinlock, but with + * 1 millisecond sleeps in between, in order + * to give the thread more cpu time to finish. + */ + PTHREAD_TEVENT_JOB_THREAD_FENCE(job); + while (job->needs_fence.wrapper) { + poll(NULL, 0, 1); + PTHREAD_TEVENT_JOB_THREAD_FENCE(job); + } + job->wrapper = NULL; + /* * Once we marked the request as 'orphaned' * we spin/loop if it's already marked @@ -673,9 +991,14 @@ struct tevent_req *pthreadpool_tevent_job_send( struct pthreadpool_tevent_job_state *state = NULL; struct pthreadpool_tevent_job *job = NULL; int ret; + struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx; pthreadpool_tevent_cleanup_orphaned_jobs(); + if (wrapper != NULL) { + pool = wrapper->main_tp; + } + req = tevent_req_create(mem_ctx, &state, struct pthreadpool_tevent_job_state); if (req == NULL) { @@ -705,6 +1028,7 @@ struct tevent_req *pthreadpool_tevent_job_send( return tevent_req_post(req, ev); } job->pool = pool; + job->wrapper = wrapper; job->fn = fn; job->private_data = private_data; job->im = tevent_create_immediate(state->job); @@ -803,15 +1127,73 @@ static void pthreadpool_tevent_job_fn(void *private_data) struct pthreadpool_tevent_job *job = talloc_get_type_abort(private_data, struct pthreadpool_tevent_job); + struct pthreadpool_tevent_wrapper *wrapper = NULL; current_job = job; job->needs_fence.started = true; PTHREAD_TEVENT_JOB_THREAD_FENCE(job); + if (job->needs_fence.orphaned) { + current_job = NULL; + return; + } + + wrapper = job->wrapper; + if (wrapper != NULL) { + bool ok; + + job->needs_fence.wrapper = true; + PTHREAD_TEVENT_JOB_THREAD_FENCE(job); + if (job->needs_fence.orphaned) { + job->needs_fence.wrapper = false; + PTHREAD_TEVENT_JOB_THREAD_FENCE(job); + current_job = NULL; + return; + } + ok = wrapper->ops->before_job(wrapper->wrap_tp, + wrapper->private_state, + wrapper->main_tp, + __location__); + job->needs_fence.wrapper = false; + PTHREAD_TEVENT_JOB_THREAD_FENCE(job); + if (!ok) { + job->needs_fence.exit_thread = true; + PTHREAD_TEVENT_JOB_THREAD_FENCE(job); + current_job = NULL; + return; + } + } job->fn(job->private_data); job->needs_fence.executed = true; PTHREAD_TEVENT_JOB_THREAD_FENCE(job); + + if (wrapper != NULL) { + bool ok; + + job->needs_fence.wrapper = true; + PTHREAD_TEVENT_JOB_THREAD_FENCE(job); + if (job->needs_fence.orphaned) { + job->needs_fence.wrapper = false; + job->needs_fence.exit_thread = true; + PTHREAD_TEVENT_JOB_THREAD_FENCE(job); + current_job = NULL; + return; + } + ok = wrapper->ops->after_job(wrapper->wrap_tp, + wrapper->private_state, + wrapper->main_tp, + __location__); + job->needs_fence.wrapper = false; + PTHREAD_TEVENT_JOB_THREAD_FENCE(job); + if (!ok) { + job->needs_fence.exit_thread = true; + PTHREAD_TEVENT_JOB_THREAD_FENCE(job); + current_job = NULL; + return; + } + } + current_job = NULL; } @@ -830,6 +1212,15 @@ static int pthreadpool_tevent_job_signal(int jobid, /* Request already gone */ job->needs_fence.dropped = true; PTHREAD_TEVENT_JOB_THREAD_FENCE(job); + if (job->needs_fence.exit_thread) { + /* + * A problem with the wrapper the current job/worker + * thread needs to terminate. + * + * The pthreadpool_tevent is already gone. + */ + return -1; + } return 0; } @@ -855,6 +1246,15 @@ static int pthreadpool_tevent_job_signal(int jobid, job->needs_fence.signaled = true; PTHREAD_TEVENT_JOB_THREAD_FENCE(job); + if (job->needs_fence.exit_thread) { + /* + * A problem with the wrapper the current job/worker + * thread needs to terminate. + * + * The pthreadpool_tevent is already gone. + */ + return -1; + } return 0; } diff --git a/lib/pthreadpool/pthreadpool_tevent.h b/lib/pthreadpool/pthreadpool_tevent.h index ff2ab7cfb73..6c939fc1d2d 100644 --- a/lib/pthreadpool/pthreadpool_tevent.h +++ b/lib/pthreadpool/pthreadpool_tevent.h @@ -29,6 +29,38 @@ struct pthreadpool_tevent; int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads, struct pthreadpool_tevent **presult); +struct pthreadpool_tevent_wrapper_ops { + const char *name; + + bool (*before_job)(struct pthreadpool_tevent *wrap_tp, + void *private_state, + struct pthreadpool_tevent *main_tp, + const char *location); + bool (*after_job)(struct pthreadpool_tevent *wrap_tp, + void *private_state, + struct pthreadpool_tevent *main_tp, + const char *location); +}; + +struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create( + struct pthreadpool_tevent *main_tp, + TALLOC_CTX *mem_ctx, + const struct pthreadpool_tevent_wrapper_ops *ops, + void *pstate, + size_t psize, + const char *type, + const char *location); +#define pthreadpool_tevent_wrapper_create(main_tp, mem_ctx, ops, state, type) \ + _pthreadpool_tevent_wrapper_create(main_tp, mem_ctx, ops, \ + state, sizeof(type), #type, __location__) + +/* + * this can only be called directly after + * pthreadpool_tevent_wrapper_create() + */ +void pthreadpool_tevent_force_per_thread_cwd(struct pthreadpool_tevent *pool, + const void *private_state); + size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool); size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool); bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool);