From aa9b64eccfd037941512bad108c4e3946714a502 Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Fri, 22 Jun 2018 17:14:31 +0200 Subject: [PATCH] pthreadpool: maintain a list of job_states on each pthreadpool_tevent_glue We should avoid traversing a linked list within a thread without holding a mutex! Using a mutex would be very tricky as we'll likely deadlock with the mutexes at the raw pthreadpool layer. So we use some kind of spinlock using atomic_thread_fence in order to protect the access to job->state->glue->{tctx,ev} in pthreadpool_tevent_job_signal(). Signed-off-by: Stefan Metzmacher Reviewed-by: Ralph Boehme --- lib/pthreadpool/pthreadpool_tevent.c | 102 ++++++++++++++++++++------- 1 file changed, 78 insertions(+), 24 deletions(-) diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c index 821d13b0236..3b502a7cc5a 100644 --- a/lib/pthreadpool/pthreadpool_tevent.c +++ b/lib/pthreadpool/pthreadpool_tevent.c @@ -18,6 +18,7 @@ */ #include "replace.h" +#include "system/select.h" #include "system/threads.h" #include "pthreadpool_tevent.h" #include "pthreadpool.h" @@ -104,6 +105,8 @@ struct pthreadpool_tevent_glue { struct tevent_threaded_context *tctx; /* Pointer to link object owned by *ev. 
*/ struct pthreadpool_tevent_glue_ev_link *ev_link; + /* active jobs */ + struct pthreadpool_tevent_job_state *states; }; /* @@ -127,6 +130,8 @@ struct pthreadpool_tevent { }; struct pthreadpool_tevent_job_state { + struct pthreadpool_tevent_job_state *prev, *next; + struct pthreadpool_tevent_glue *glue; struct tevent_context *ev; struct tevent_req *req; struct pthreadpool_tevent_job *job; @@ -322,6 +327,16 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool) static int pthreadpool_tevent_glue_destructor( struct pthreadpool_tevent_glue *glue) { + struct pthreadpool_tevent_job_state *state = NULL; + struct pthreadpool_tevent_job_state *nstate = NULL; + + for (state = glue->states; state != NULL; state = nstate) { + nstate = state->next; + + /* Orphaning the job removes it from the list */ + pthreadpool_tevent_job_orphan(state->job); + } + if (glue->pool->glue_list != NULL) { DLIST_REMOVE(glue->pool->glue_list, glue); } @@ -355,9 +370,11 @@ static int pthreadpool_tevent_glue_link_destructor( return 0; } -static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool, - struct tevent_context *ev) +static int pthreadpool_tevent_register_ev( + struct pthreadpool_tevent *pool, + struct pthreadpool_tevent_job_state *state) { + struct tevent_context *ev = state->ev; struct pthreadpool_tevent_glue *glue = NULL; struct pthreadpool_tevent_glue_ev_link *ev_link = NULL; @@ -368,7 +385,9 @@ static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool, * pair. 
*/ for (glue = pool->glue_list; glue != NULL; glue = glue->next) { - if (glue->ev == ev) { + if (glue->ev == state->ev) { + state->glue = glue; + DLIST_ADD_END(glue->states, state); return 0; } } @@ -416,6 +435,9 @@ static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool, } #endif + state->glue = glue; + DLIST_ADD_END(glue->states, state); + DLIST_ADD(pool->glue_list, glue); return 0; } @@ -431,7 +453,7 @@ static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job) /* * We should never be called with needs_fence.orphaned == false. * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job) - * after detaching from the request state and pool list. + * after detaching from the request state, glue and pool list. */ if (!job->needs_fence.orphaned) { abort(); } @@ -509,6 +531,42 @@ static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job) abort(); } + /* + * Once we marked the request as 'orphaned' + * we spin/loop if it's already marked + * as 'finished' (which means that + * pthreadpool_tevent_job_signal() was entered). + * If it saw 'orphaned' it will exit after setting + * 'dropped', otherwise it dereferences + * job->state->glue->{tctx,ev} until it exited + * after setting 'signaled'. + * + * We need to close this potential gap before + * we can set job->state = NULL. + * + * This is some kind of spinlock, but with + * 1 millisecond sleeps in between, in order + * to give the thread more cpu time to finish. + */ + PTHREAD_TEVENT_JOB_THREAD_FENCE(job); + while (job->needs_fence.finished) { + if (job->needs_fence.dropped) { + break; + } + if (job->needs_fence.signaled) { + break; + } + poll(NULL, 0, 1); + PTHREAD_TEVENT_JOB_THREAD_FENCE(job); + } + + /* + * Once the gap is closed, we can remove + * the glue link. + */ + DLIST_REMOVE(job->state->glue->states, job->state); + job->state->glue = NULL; + /* * We need to reparent to a long term context. * And detach from the request state. 
@@ -561,6 +619,10 @@ static void pthreadpool_tevent_job_cleanup(struct tevent_req *req, * The job request is not scheduled in the pool * yet or anymore. */ + if (state->glue != NULL) { + DLIST_REMOVE(state->glue->states, state); + state->glue = NULL; + } return; } @@ -605,7 +667,7 @@ struct tevent_req *pthreadpool_tevent_job_send( return tevent_req_post(req, ev); } - ret = pthreadpool_tevent_register_ev(pool, ev); + ret = pthreadpool_tevent_register_ev(pool, state); if (tevent_req_error(req, ret)) { return tevent_req_post(req, ev); } @@ -718,9 +780,6 @@ static int pthreadpool_tevent_job_signal(int jobid, struct pthreadpool_tevent_job *job = talloc_get_type_abort(job_private_data, struct pthreadpool_tevent_job); - struct pthreadpool_tevent_job_state *state = job->state; - struct tevent_threaded_context *tctx = NULL; - struct pthreadpool_tevent_glue *g = NULL; job->needs_fence.finished = true; PTHREAD_TEVENT_JOB_THREAD_FENCE(job); @@ -731,27 +790,22 @@ static int pthreadpool_tevent_job_signal(int jobid, return 0; } -#ifdef HAVE_PTHREAD - for (g = job->pool->glue_list; g != NULL; g = g->next) { - if (g->ev == state->ev) { - tctx = g->tctx; - break; - } - } - - if (tctx == NULL) { - abort(); - } -#endif - - if (tctx != NULL) { + /* + * state and state->glue are valid, + * see the job->needs_fence.finished + * "spinlock" loop in + * pthreadpool_tevent_job_orphan() + */ + if (job->state->glue->tctx != NULL) { /* with HAVE_PTHREAD */ - tevent_threaded_schedule_immediate(tctx, job->im, + tevent_threaded_schedule_immediate(job->state->glue->tctx, + job->im, pthreadpool_tevent_job_done, job); } else { /* without HAVE_PTHREAD */ - tevent_schedule_immediate(job->im, state->ev, + tevent_schedule_immediate(job->im, + job->state->glue->ev, pthreadpool_tevent_job_done, job); }