mirror of
https://github.com/samba-team/samba.git
synced 2025-01-11 05:18:09 +03:00
pthreadpool: maintain a list of job_states on each pthreadpool_tevent_glue
We should avoid traversing a linked list within a thread without holding a mutex! Using a mutex would be very tricky as we'll likely deadlock with the mutexes at the raw pthreadpool layer. So we use somekind of spinlock using atomic_thread_fence in order to protect the access to job->state->glue->{tctx,ev} in pthreadpool_tevent_job_signal(). Signed-off-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Ralph Boehme <slow@samba.org>
This commit is contained in:
parent
9b73fda926
commit
aa9b64eccf
@ -18,6 +18,7 @@
|
||||
*/
|
||||
|
||||
#include "replace.h"
|
||||
#include "system/select.h"
|
||||
#include "system/threads.h"
|
||||
#include "pthreadpool_tevent.h"
|
||||
#include "pthreadpool.h"
|
||||
@ -104,6 +105,8 @@ struct pthreadpool_tevent_glue {
|
||||
struct tevent_threaded_context *tctx;
|
||||
/* Pointer to link object owned by *ev. */
|
||||
struct pthreadpool_tevent_glue_ev_link *ev_link;
|
||||
/* active jobs */
|
||||
struct pthreadpool_tevent_job_state *states;
|
||||
};
|
||||
|
||||
/*
|
||||
@ -127,6 +130,8 @@ struct pthreadpool_tevent {
|
||||
};
|
||||
|
||||
struct pthreadpool_tevent_job_state {
|
||||
struct pthreadpool_tevent_job_state *prev, *next;
|
||||
struct pthreadpool_tevent_glue *glue;
|
||||
struct tevent_context *ev;
|
||||
struct tevent_req *req;
|
||||
struct pthreadpool_tevent_job *job;
|
||||
@ -322,6 +327,16 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
|
||||
static int pthreadpool_tevent_glue_destructor(
|
||||
struct pthreadpool_tevent_glue *glue)
|
||||
{
|
||||
struct pthreadpool_tevent_job_state *state = NULL;
|
||||
struct pthreadpool_tevent_job_state *nstate = NULL;
|
||||
|
||||
for (state = glue->states; state != NULL; state = nstate) {
|
||||
nstate = state->next;
|
||||
|
||||
/* The job this removes it from the list */
|
||||
pthreadpool_tevent_job_orphan(state->job);
|
||||
}
|
||||
|
||||
if (glue->pool->glue_list != NULL) {
|
||||
DLIST_REMOVE(glue->pool->glue_list, glue);
|
||||
}
|
||||
@ -355,9 +370,11 @@ static int pthreadpool_tevent_glue_link_destructor(
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
|
||||
struct tevent_context *ev)
|
||||
static int pthreadpool_tevent_register_ev(
|
||||
struct pthreadpool_tevent *pool,
|
||||
struct pthreadpool_tevent_job_state *state)
|
||||
{
|
||||
struct tevent_context *ev = state->ev;
|
||||
struct pthreadpool_tevent_glue *glue = NULL;
|
||||
struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
|
||||
|
||||
@ -368,7 +385,9 @@ static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
|
||||
* pair.
|
||||
*/
|
||||
for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
|
||||
if (glue->ev == ev) {
|
||||
if (glue->ev == state->ev) {
|
||||
state->glue = glue;
|
||||
DLIST_ADD_END(glue->states, state);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@ -416,6 +435,9 @@ static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
|
||||
}
|
||||
#endif
|
||||
|
||||
state->glue = glue;
|
||||
DLIST_ADD_END(glue->states, state);
|
||||
|
||||
DLIST_ADD(pool->glue_list, glue);
|
||||
return 0;
|
||||
}
|
||||
@ -431,7 +453,7 @@ static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
|
||||
/*
|
||||
* We should never be called with needs_fence.orphaned == false.
|
||||
* Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
|
||||
* after detaching from the request state and pool list.
|
||||
* after detaching from the request state, glue and pool list.
|
||||
*/
|
||||
if (!job->needs_fence.orphaned) {
|
||||
abort();
|
||||
@ -509,6 +531,42 @@ static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
|
||||
abort();
|
||||
}
|
||||
|
||||
/*
|
||||
* Once we marked the request as 'orphaned'
|
||||
* we spin/loop if it's already marked
|
||||
* as 'finished' (which means that
|
||||
* pthreadpool_tevent_job_signal() was entered.
|
||||
* If it saw 'orphaned' it will exit after setting
|
||||
* 'dropped', otherwise it dereferences
|
||||
* job->state->glue->{tctx,ev} until it exited
|
||||
* after setting 'signaled'.
|
||||
*
|
||||
* We need to close this potential gab before
|
||||
* we can set job->state = NULL.
|
||||
*
|
||||
* This is some kind of spinlock, but with
|
||||
* 1 millisecond sleeps in between, in order
|
||||
* to give the thread more cpu time to finish.
|
||||
*/
|
||||
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
|
||||
while (job->needs_fence.finished) {
|
||||
if (job->needs_fence.dropped) {
|
||||
break;
|
||||
}
|
||||
if (job->needs_fence.signaled) {
|
||||
break;
|
||||
}
|
||||
poll(NULL, 0, 1);
|
||||
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
|
||||
}
|
||||
|
||||
/*
|
||||
* Once the gab is closed, we can remove
|
||||
* the glue link.
|
||||
*/
|
||||
DLIST_REMOVE(job->state->glue->states, job->state);
|
||||
job->state->glue = NULL;
|
||||
|
||||
/*
|
||||
* We need to reparent to a long term context.
|
||||
* And detach from the request state.
|
||||
@ -561,6 +619,10 @@ static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
|
||||
* The job request is not scheduled in the pool
|
||||
* yet or anymore.
|
||||
*/
|
||||
if (state->glue != NULL) {
|
||||
DLIST_REMOVE(state->glue->states, state);
|
||||
state->glue = NULL;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@ -605,7 +667,7 @@ struct tevent_req *pthreadpool_tevent_job_send(
|
||||
return tevent_req_post(req, ev);
|
||||
}
|
||||
|
||||
ret = pthreadpool_tevent_register_ev(pool, ev);
|
||||
ret = pthreadpool_tevent_register_ev(pool, state);
|
||||
if (tevent_req_error(req, ret)) {
|
||||
return tevent_req_post(req, ev);
|
||||
}
|
||||
@ -718,9 +780,6 @@ static int pthreadpool_tevent_job_signal(int jobid,
|
||||
struct pthreadpool_tevent_job *job =
|
||||
talloc_get_type_abort(job_private_data,
|
||||
struct pthreadpool_tevent_job);
|
||||
struct pthreadpool_tevent_job_state *state = job->state;
|
||||
struct tevent_threaded_context *tctx = NULL;
|
||||
struct pthreadpool_tevent_glue *g = NULL;
|
||||
|
||||
job->needs_fence.finished = true;
|
||||
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
|
||||
@ -731,27 +790,22 @@ static int pthreadpool_tevent_job_signal(int jobid,
|
||||
return 0;
|
||||
}
|
||||
|
||||
#ifdef HAVE_PTHREAD
|
||||
for (g = job->pool->glue_list; g != NULL; g = g->next) {
|
||||
if (g->ev == state->ev) {
|
||||
tctx = g->tctx;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (tctx == NULL) {
|
||||
abort();
|
||||
}
|
||||
#endif
|
||||
|
||||
if (tctx != NULL) {
|
||||
/*
|
||||
* state and state->glue are valid,
|
||||
* see the job->needs_fence.finished
|
||||
* "spinlock" loop in
|
||||
* pthreadpool_tevent_job_orphan()
|
||||
*/
|
||||
if (job->state->glue->tctx != NULL) {
|
||||
/* with HAVE_PTHREAD */
|
||||
tevent_threaded_schedule_immediate(tctx, job->im,
|
||||
tevent_threaded_schedule_immediate(job->state->glue->tctx,
|
||||
job->im,
|
||||
pthreadpool_tevent_job_done,
|
||||
job);
|
||||
} else {
|
||||
/* without HAVE_PTHREAD */
|
||||
tevent_schedule_immediate(job->im, state->ev,
|
||||
tevent_schedule_immediate(job->im,
|
||||
job->state->glue->ev,
|
||||
pthreadpool_tevent_job_done,
|
||||
job);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user