mirror of
https://github.com/samba-team/samba.git
synced 2025-08-03 04:22:09 +03:00
pthreadpool: Use detached threads
So far we used joinable threads. This prevented pthreadpool_destroy with blocked threads. This patch converts pthreadpool to detached threads. Now pthreadpool_destroy does not have to wait for the idle threads to finish, it can immediately return. pthreadpool_destroy will tell all threads to exit, and the last active thread will actually free(pthreadpool). Signed-off-by: Volker Lendecke <vl@samba.org> Reviewed-by: Jeremy Allison <jra@samba.org>
This commit is contained in:
committed by
Jeremy Allison
parent
2d6b6c2446
commit
77b7dea2d9
@ -89,12 +89,6 @@ struct pthreadpool {
|
||||
* Number of idle threads
|
||||
*/
|
||||
int num_idle;
|
||||
|
||||
/*
|
||||
* An array of threads that require joining.
|
||||
*/
|
||||
int num_exited;
|
||||
pthread_t *exited; /* We alloc more */
|
||||
};
|
||||
|
||||
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
@ -152,8 +146,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
|
||||
|
||||
pool->shutdown = false;
|
||||
pool->num_threads = 0;
|
||||
pool->num_exited = 0;
|
||||
pool->exited = NULL;
|
||||
pool->max_threads = max_threads;
|
||||
pool->num_idle = 0;
|
||||
|
||||
@ -220,11 +212,6 @@ static void pthreadpool_child(void)
|
||||
pool = DLIST_PREV(pool)) {
|
||||
|
||||
pool->num_threads = 0;
|
||||
|
||||
pool->num_exited = 0;
|
||||
free(pool->exited);
|
||||
pool->exited = NULL;
|
||||
|
||||
pool->num_idle = 0;
|
||||
pool->head = 0;
|
||||
pool->num_jobs = 0;
|
||||
@ -243,94 +230,13 @@ static void pthreadpool_prep_atfork(void)
|
||||
pthreadpool_child);
|
||||
}
|
||||
|
||||
/*
|
||||
* Do a pthread_join() on all children that have exited, pool->mutex must be
|
||||
* locked
|
||||
*/
|
||||
static void pthreadpool_join_children(struct pthreadpool *pool)
|
||||
{
|
||||
int i;
|
||||
|
||||
for (i=0; i<pool->num_exited; i++) {
|
||||
int ret;
|
||||
|
||||
ret = pthread_join(pool->exited[i], NULL);
|
||||
if (ret != 0) {
|
||||
/*
|
||||
* Severe internal error, we can't do much but
|
||||
* abort here.
|
||||
*/
|
||||
abort();
|
||||
}
|
||||
}
|
||||
pool->num_exited = 0;
|
||||
|
||||
/*
|
||||
* Deliberately not free and NULL pool->exited. That will be
|
||||
* re-used by realloc later.
|
||||
*/
|
||||
}
|
||||
|
||||
/*
|
||||
* Destroy a thread pool, finishing all threads working for it
|
||||
*/
|
||||
|
||||
int pthreadpool_destroy(struct pthreadpool *pool)
|
||||
static int pthreadpool_free(struct pthreadpool *pool)
|
||||
{
|
||||
int ret, ret1;
|
||||
|
||||
ret = pthread_mutex_lock(&pool->mutex);
|
||||
if (ret != 0) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
if ((pool->num_jobs != 0) || pool->shutdown) {
|
||||
ret = pthread_mutex_unlock(&pool->mutex);
|
||||
assert(ret == 0);
|
||||
return EBUSY;
|
||||
}
|
||||
|
||||
if (pool->num_threads > 0) {
|
||||
/*
|
||||
* We have active threads, tell them to finish, wait for that.
|
||||
*/
|
||||
|
||||
pool->shutdown = true;
|
||||
|
||||
if (pool->num_idle > 0) {
|
||||
/*
|
||||
* Wake the idle threads. They will find
|
||||
* pool->shutdown to be set and exit themselves
|
||||
*/
|
||||
ret = pthread_cond_broadcast(&pool->condvar);
|
||||
if (ret != 0) {
|
||||
pthread_mutex_unlock(&pool->mutex);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
|
||||
while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
|
||||
|
||||
if (pool->num_exited > 0) {
|
||||
pthreadpool_join_children(pool);
|
||||
continue;
|
||||
}
|
||||
/*
|
||||
* A thread that shuts down will also signal
|
||||
* pool->condvar
|
||||
*/
|
||||
ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
|
||||
if (ret != 0) {
|
||||
pthread_mutex_unlock(&pool->mutex);
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ret = pthread_mutex_unlock(&pool->mutex);
|
||||
if (ret != 0) {
|
||||
return ret;
|
||||
}
|
||||
assert(ret == 0);
|
||||
|
||||
ret = pthread_mutex_destroy(&pool->mutex);
|
||||
ret1 = pthread_cond_destroy(&pool->condvar);
|
||||
|
||||
@ -349,7 +255,6 @@ int pthreadpool_destroy(struct pthreadpool *pool)
|
||||
ret = pthread_mutex_unlock(&pthreadpools_mutex);
|
||||
assert(ret == 0);
|
||||
|
||||
free(pool->exited);
|
||||
free(pool->jobs);
|
||||
free(pool);
|
||||
|
||||
@ -357,25 +262,62 @@ int pthreadpool_destroy(struct pthreadpool *pool)
|
||||
}
|
||||
|
||||
/*
|
||||
* Prepare for pthread_exit(), pool->mutex must be locked
|
||||
* Destroy a thread pool. Wake up all idle threads for exit. The last
|
||||
* one will free the pool.
|
||||
*/
|
||||
|
||||
int pthreadpool_destroy(struct pthreadpool *pool)
|
||||
{
|
||||
int ret, ret1;
|
||||
|
||||
ret = pthread_mutex_lock(&pool->mutex);
|
||||
if (ret != 0) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (pool->num_threads == 0) {
|
||||
ret = pthreadpool_free(pool);
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (pool->shutdown) {
|
||||
ret = pthread_mutex_unlock(&pool->mutex);
|
||||
assert(ret == 0);
|
||||
return EBUSY;
|
||||
}
|
||||
|
||||
/*
|
||||
* We have active threads, tell them to finish.
|
||||
*/
|
||||
|
||||
pool->shutdown = true;
|
||||
|
||||
ret = pthread_cond_broadcast(&pool->condvar);
|
||||
|
||||
ret1 = pthread_mutex_unlock(&pool->mutex);
|
||||
assert(ret1 == 0);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
/*
|
||||
* Prepare for pthread_exit(), pool->mutex must be locked and will be
|
||||
* unlocked here. This is a bit of a layering violation, but here we
|
||||
* also take care of removing the pool if we're the last thread.
|
||||
*/
|
||||
static void pthreadpool_server_exit(struct pthreadpool *pool)
|
||||
{
|
||||
pthread_t *exited;
|
||||
int ret;
|
||||
|
||||
pool->num_threads -= 1;
|
||||
|
||||
exited = (pthread_t *)realloc(
|
||||
pool->exited, sizeof(pthread_t) * (pool->num_exited + 1));
|
||||
|
||||
if (exited == NULL) {
|
||||
/* lost a thread status */
|
||||
if (pool->shutdown && (pool->num_threads == 0)) {
|
||||
pthreadpool_free(pool);
|
||||
return;
|
||||
}
|
||||
pool->exited = exited;
|
||||
|
||||
pool->exited[pool->num_exited] = pthread_self();
|
||||
pool->num_exited += 1;
|
||||
ret = pthread_mutex_unlock(&pool->mutex);
|
||||
assert(ret == 0);
|
||||
}
|
||||
|
||||
static bool pthreadpool_get_job(struct pthreadpool *p,
|
||||
@ -470,7 +412,6 @@ static void *pthreadpool_server(void *arg)
|
||||
* us. Exit.
|
||||
*/
|
||||
pthreadpool_server_exit(pool);
|
||||
pthread_mutex_unlock(&pool->mutex);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -500,7 +441,6 @@ static void *pthreadpool_server(void *arg)
|
||||
|
||||
if (ret != 0) {
|
||||
pthreadpool_server_exit(pool);
|
||||
pthread_mutex_unlock(&pool->mutex);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
@ -511,16 +451,6 @@ static void *pthreadpool_server(void *arg)
|
||||
* exit
|
||||
*/
|
||||
pthreadpool_server_exit(pool);
|
||||
|
||||
if (pool->num_threads == 0) {
|
||||
/*
|
||||
* Ping the main thread waiting for all of us
|
||||
* workers to have quit.
|
||||
*/
|
||||
pthread_cond_broadcast(&pool->condvar);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&pool->mutex);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
@ -529,6 +459,7 @@ static void *pthreadpool_server(void *arg)
|
||||
int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
|
||||
void (*fn)(void *private_data), void *private_data)
|
||||
{
|
||||
pthread_attr_t thread_attr;
|
||||
pthread_t thread_id;
|
||||
int res;
|
||||
sigset_t mask, omask;
|
||||
@ -548,11 +479,6 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
|
||||
return EINVAL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Just some cleanup under the mutex
|
||||
*/
|
||||
pthreadpool_join_children(pool);
|
||||
|
||||
/*
|
||||
* Add job to the end of the queue
|
||||
*/
|
||||
@ -585,20 +511,37 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
|
||||
|
||||
sigfillset(&mask);
|
||||
|
||||
res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
|
||||
res = pthread_attr_init(&thread_attr);
|
||||
if (res != 0) {
|
||||
pthread_mutex_unlock(&pool->mutex);
|
||||
return res;
|
||||
}
|
||||
|
||||
res = pthread_attr_setdetachstate(
|
||||
&thread_attr, PTHREAD_CREATE_DETACHED);
|
||||
if (res != 0) {
|
||||
pthread_attr_destroy(&thread_attr);
|
||||
pthread_mutex_unlock(&pool->mutex);
|
||||
return res;
|
||||
}
|
||||
|
||||
res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
|
||||
if (res != 0) {
|
||||
pthread_attr_destroy(&thread_attr);
|
||||
pthread_mutex_unlock(&pool->mutex);
|
||||
return res;
|
||||
}
|
||||
|
||||
res = pthread_create(&thread_id, NULL, pthreadpool_server,
|
||||
(void *)pool);
|
||||
(void *)pool);
|
||||
if (res == 0) {
|
||||
pool->num_threads += 1;
|
||||
}
|
||||
|
||||
assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
|
||||
|
||||
pthread_attr_destroy(&thread_attr);
|
||||
|
||||
pthread_mutex_unlock(&pool->mutex);
|
||||
return res;
|
||||
}
|
||||
|
@ -53,8 +53,13 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
|
||||
/**
|
||||
* @brief Destroy a pthreadpool
|
||||
*
|
||||
* Destroy a pthreadpool. If jobs are still active, this returns
|
||||
* EBUSY.
|
||||
* Destroy a pthreadpool. If jobs are submitted, but not yet active in
|
||||
* a thread, they won't get executed. If a job has already been
|
||||
* submitted to a thread, the job function will continue running, and
|
||||
* the signal function might still be called. The caller of
|
||||
* pthreadpool_init must make sure the required resources are still
|
||||
* around when the pool is destroyed with pending jobs. The last
|
||||
* thread to exit will finally free() the pool memory.
|
||||
*
|
||||
* @param[in] pool The pool to destroy
|
||||
* @return success: 0, failure: errno
|
||||
|
Reference in New Issue
Block a user