1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-23 17:34:34 +03:00

pthreadpool: split out a pthreadpool_stop() from pthreadpool_destroy()

This can be used in combination with pthreadpool_cancel_job() to
implement a multi step shutdown of the pool.

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
This commit is contained in:
Stefan Metzmacher 2018-04-25 14:03:30 +02:00
parent 5976841614
commit f19552e239
3 changed files with 133 additions and 30 deletions

View File

@ -71,9 +71,16 @@ struct pthreadpool {
void *signal_fn_private_data;
/*
* indicator to worker threads that they should shut down
* indicator to worker threads to stop processing further jobs
* and exit.
*/
bool shutdown;
bool stopped;
/*
* indicator to the last worker thread to free the pool
* resources.
*/
bool destroyed;
/*
* maximum number of threads
@ -169,7 +176,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
return ret;
}
pool->shutdown = false;
pool->stopped = false;
pool->destroyed = false;
pool->num_threads = 0;
pool->max_threads = max_threads;
pool->num_idle = 0;
@ -198,6 +206,10 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
size_t pthreadpool_max_threads(struct pthreadpool *pool)
{
if (pool->stopped) {
return 0;
}
return pool->max_threads;
}
@ -207,8 +219,18 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
int unlock_res;
size_t ret;
if (pool->stopped) {
return 0;
}
res = pthread_mutex_lock(&pool->mutex);
if (res != 0) {
return res;
}
if (pool->stopped) {
unlock_res = pthread_mutex_unlock(&pool->mutex);
assert(unlock_res == 0);
return 0;
}
@ -378,33 +400,17 @@ static int pthreadpool_free(struct pthreadpool *pool)
}
/*
* Destroy a thread pool. Wake up all idle threads for exit. The last
* one will free the pool.
* Stop a thread pool. Wake up all idle threads for exit.
*/
int pthreadpool_destroy(struct pthreadpool *pool)
static int pthreadpool_stop_locked(struct pthreadpool *pool)
{
int ret, ret1;
int ret;
ret = pthread_mutex_lock(&pool->mutex);
if (ret != 0) {
return ret;
}
if (pool->shutdown) {
ret = pthread_mutex_unlock(&pool->mutex);
assert(ret == 0);
return EBUSY;
}
pool->shutdown = true;
pool->stopped = true;
if (pool->num_threads == 0) {
ret = pthread_mutex_unlock(&pool->mutex);
assert(ret == 0);
ret = pthreadpool_free(pool);
return ret;
return 0;
}
/*
@ -413,12 +419,66 @@ int pthreadpool_destroy(struct pthreadpool *pool)
ret = pthread_cond_broadcast(&pool->condvar);
return ret;
}
/*
* Stop a thread pool. Wake up all idle threads for exit.
*/
int pthreadpool_stop(struct pthreadpool *pool)
{
int ret, ret1;
ret = pthread_mutex_lock(&pool->mutex);
if (ret != 0) {
return ret;
}
if (!pool->stopped) {
ret = pthreadpool_stop_locked(pool);
}
ret1 = pthread_mutex_unlock(&pool->mutex);
assert(ret1 == 0);
return ret;
}
/*
* Destroy a thread pool. Wake up all idle threads for exit. The last
* one will free the pool.
*/
int pthreadpool_destroy(struct pthreadpool *pool)
{
int ret, ret1;
bool free_it;
assert(!pool->destroyed);
ret = pthread_mutex_lock(&pool->mutex);
if (ret != 0) {
return ret;
}
pool->destroyed = true;
if (!pool->stopped) {
ret = pthreadpool_stop_locked(pool);
}
free_it = (pool->num_threads == 0);
ret1 = pthread_mutex_unlock(&pool->mutex);
assert(ret1 == 0);
if (free_it) {
pthreadpool_free(pool);
}
return ret;
}
/*
* Prepare for pthread_exit(), pool->mutex must be locked and will be
* unlocked here. This is a bit of a layering violation, but here we
@ -431,7 +491,7 @@ static void pthreadpool_server_exit(struct pthreadpool *pool)
pool->num_threads -= 1;
free_it = (pool->shutdown && (pool->num_threads == 0));
free_it = (pool->destroyed && (pool->num_threads == 0));
ret = pthread_mutex_unlock(&pool->mutex);
assert(ret == 0);
@ -444,7 +504,7 @@ static void pthreadpool_server_exit(struct pthreadpool *pool)
static bool pthreadpool_get_job(struct pthreadpool *p,
struct pthreadpool_job *job)
{
if (p->shutdown) {
if (p->stopped) {
return false;
}
@ -527,7 +587,7 @@ static void *pthreadpool_server(void *arg)
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += 1;
while ((pool->num_jobs == 0) && !pool->shutdown) {
while ((pool->num_jobs == 0) && !pool->stopped) {
pool->num_idle += 1;
res = pthread_cond_timedwait(
@ -605,9 +665,9 @@ static void *pthreadpool_server(void *arg)
}
}
if (pool->shutdown) {
if (pool->stopped) {
/*
* we're asked to shut down, so exit
* we're asked to stop processing jobs, so exit
*/
pthreadpool_server_exit(pool);
return NULL;
@ -666,12 +726,14 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
int res;
int unlock_res;
assert(!pool->destroyed);
res = pthread_mutex_lock(&pool->mutex);
if (res != 0) {
return res;
}
if (pool->shutdown) {
if (pool->stopped) {
/*
* Protect against the pool being shut down while
* trying to add a job
@ -761,6 +823,8 @@ size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
size_t i, j;
size_t num = 0;
assert(!pool->destroyed);
res = pthread_mutex_lock(&pool->mutex);
if (res != 0) {
return res;

View File

@ -71,9 +71,31 @@ size_t pthreadpool_max_threads(struct pthreadpool *pool);
*/
size_t pthreadpool_queued_jobs(struct pthreadpool *pool);
/**
* @brief Stop a pthreadpool
*
* Stop a pthreadpool. If jobs are submitted, but not yet active in
* a thread, they won't get executed. If a job has already been
* submitted to a thread, the job function will continue running, and
* the signal function might still be called.
*
* This allows a multi step shutdown using pthreadpool_stop(),
* pthreadpool_cancel_job() and pthreadpool_destroy().
*
* @param[in] pool The pool to stop
* @return success: 0, failure: errno
*
* @see pthreadpool_cancel_job()
* @see pthreadpool_destroy()
*/
int pthreadpool_stop(struct pthreadpool *pool);
/**
* @brief Destroy a pthreadpool
*
* This basically implies pthreadpool_stop() if the pool
* isn't already stopped.
*
* Destroy a pthreadpool. If jobs are submitted, but not yet active in
* a thread, they won't get executed. If a job has already been
* submitted to a thread, the job function will continue running, and
@ -84,6 +106,8 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool);
*
* @param[in] pool The pool to destroy
* @return success: 0, failure: errno
*
* @see pthreadpool_stop()
*/
int pthreadpool_destroy(struct pthreadpool *pool);
@ -125,6 +149,8 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
* @return The number of canceled jobs
*
* @see pthreadpool_add_job()
* @see pthreadpool_stop()
* @see pthreadpool_destroy()
*/
size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data);

View File

@ -22,6 +22,8 @@
#include "pthreadpool.h"
struct pthreadpool {
bool stopped;
/*
* Indicate job completion
*/
@ -45,6 +47,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
if (pool == NULL) {
return ENOMEM;
}
pool->stopped = false;
pool->signal_fn = signal_fn;
pool->signal_fn_private_data = signal_fn_private_data;
@ -65,6 +68,10 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data)
{
if (pool->stopped) {
return EINVAL;
}
fn(private_data);
return pool->signal_fn(job_id, fn, private_data,
@ -77,6 +84,12 @@ size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
return 0;
}
int pthreadpool_stop(struct pthreadpool *pool)
{
pool->stopped = true;
return 0;
}
int pthreadpool_destroy(struct pthreadpool *pool)
{
free(pool);