diff --git a/lib/pthreadpool/pthreadpool.c b/lib/pthreadpool/pthreadpool.c index 1ef6dccee62..610cfb02f15 100644 --- a/lib/pthreadpool/pthreadpool.c +++ b/lib/pthreadpool/pthreadpool.c @@ -71,9 +71,16 @@ struct pthreadpool { void *signal_fn_private_data; /* - * indicator to worker threads that they should shut down + * indicator to worker threads to stop processing further jobs + * and exit. */ - bool shutdown; + bool stopped; + + /* + * indicator to the last worker thread to free the pool + * resources. + */ + bool destroyed; /* * maximum number of threads @@ -169,7 +176,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, return ret; } - pool->shutdown = false; + pool->stopped = false; + pool->destroyed = false; pool->num_threads = 0; pool->max_threads = max_threads; pool->num_idle = 0; @@ -198,6 +206,10 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, size_t pthreadpool_max_threads(struct pthreadpool *pool) { + if (pool->stopped) { + return 0; + } + return pool->max_threads; } @@ -207,8 +219,18 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool) int unlock_res; size_t ret; + if (pool->stopped) { + return 0; + } + res = pthread_mutex_lock(&pool->mutex); if (res != 0) { + return res; + } + + if (pool->stopped) { + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); return 0; } @@ -378,33 +400,17 @@ static int pthreadpool_free(struct pthreadpool *pool) } /* - * Destroy a thread pool. Wake up all idle threads for exit. The last - * one will free the pool. + * Stop a thread pool. Wake up all idle threads for exit. */ -int pthreadpool_destroy(struct pthreadpool *pool) +static int pthreadpool_stop_locked(struct pthreadpool *pool) { - int ret, ret1; + int ret; - ret = pthread_mutex_lock(&pool->mutex); - if (ret != 0) { - return ret; - } - - if (pool->shutdown) { - ret = pthread_mutex_unlock(&pool->mutex); - assert(ret == 0); - return EBUSY; - } - - pool->shutdown = true; + pool->stopped = true; if (pool->num_threads == 0) { - ret = pthread_mutex_unlock(&pool->mutex); - assert(ret == 0); - - ret = pthreadpool_free(pool); - return ret; + return 0; } /* @@ -413,12 +419,66 @@ int pthreadpool_destroy(struct pthreadpool *pool) ret = pthread_cond_broadcast(&pool->condvar); + return ret; +} + +/* + * Stop a thread pool. Wake up all idle threads for exit. + */ + +int pthreadpool_stop(struct pthreadpool *pool) +{ + int ret, ret1; + + ret = pthread_mutex_lock(&pool->mutex); + if (ret != 0) { + return ret; + } + + if (!pool->stopped) { + ret = pthreadpool_stop_locked(pool); + } + ret1 = pthread_mutex_unlock(&pool->mutex); assert(ret1 == 0); return ret; } +/* + * Destroy a thread pool. Wake up all idle threads for exit. The last + * one will free the pool. + */ + +int pthreadpool_destroy(struct pthreadpool *pool) +{ + int ret, ret1; + bool free_it; + + assert(!pool->destroyed); + + ret = pthread_mutex_lock(&pool->mutex); + if (ret != 0) { + return ret; + } + + pool->destroyed = true; + + if (!pool->stopped) { + ret = pthreadpool_stop_locked(pool); + } + + free_it = (pool->num_threads == 0); + + ret1 = pthread_mutex_unlock(&pool->mutex); + assert(ret1 == 0); + + if (free_it) { + pthreadpool_free(pool); + } + + return ret; +} /* * Prepare for pthread_exit(), pool->mutex must be locked and will be * unlocked here. This is a bit of a layering violation, but here we @@ -431,7 +491,7 @@ static void pthreadpool_server_exit(struct pthreadpool *pool) pool->num_threads -= 1; - free_it = (pool->shutdown && (pool->num_threads == 0)); + free_it = (pool->destroyed && (pool->num_threads == 0)); ret = pthread_mutex_unlock(&pool->mutex); assert(ret == 0); @@ -444,7 +504,7 @@ static void pthreadpool_server_exit(struct pthreadpool *pool) static bool pthreadpool_get_job(struct pthreadpool *p, struct pthreadpool_job *job) { - if (p->shutdown) { + if (p->stopped) { return false; } @@ -527,7 +587,7 @@ static void *pthreadpool_server(void *arg) clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += 1; - while ((pool->num_jobs == 0) && !pool->shutdown) { + while ((pool->num_jobs == 0) && !pool->stopped) { pool->num_idle += 1; res = pthread_cond_timedwait( @@ -605,9 +665,9 @@ static void *pthreadpool_server(void *arg) } } - if (pool->shutdown) { + if (pool->stopped) { /* - * we're asked to shut down, so exit + * we're asked to stop processing jobs, so exit */ pthreadpool_server_exit(pool); return NULL; @@ -666,12 +726,14 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, int res; int unlock_res; + assert(!pool->destroyed); + res = pthread_mutex_lock(&pool->mutex); if (res != 0) { return res; } - if (pool->shutdown) { + if (pool->stopped) { /* * Protect against the pool being shut down while * trying to add a job @@ -761,6 +823,8 @@ size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id, size_t i, j; size_t num = 0; + assert(!pool->destroyed); + res = pthread_mutex_lock(&pool->mutex); if (res != 0) { return res; diff --git a/lib/pthreadpool/pthreadpool.h b/lib/pthreadpool/pthreadpool.h index dd1f9718b23..b4733580e07 100644 --- a/lib/pthreadpool/pthreadpool.h +++ b/lib/pthreadpool/pthreadpool.h @@ -71,9 +71,31 @@ size_t pthreadpool_max_threads(struct pthreadpool *pool); */ size_t pthreadpool_queued_jobs(struct pthreadpool *pool); +/** + * @brief Stop a pthreadpool + * + * Stop a pthreadpool. If jobs are submitted, but not yet active in + * a thread, they won't get executed. If a job has already been + * submitted to a thread, the job function will continue running, and + * the signal function might still be called. + * + * This allows a multi step shutdown using pthreadpool_stop(), + * pthreadpool_cancel_job() and pthreadpool_destroy(). + * + * @param[in] pool The pool to stop + * @return success: 0, failure: errno + * + * @see pthreadpool_cancel_job() + * @see pthreadpool_destroy() + */ +int pthreadpool_stop(struct pthreadpool *pool); + /** * @brief Destroy a pthreadpool * + * This basically implies pthreadpool_stop() if the pool + * isn't already stopped. + * * Destroy a pthreadpool. If jobs are submitted, but not yet active in * a thread, they won't get executed. If a job has already been * submitted to a thread, the job function will continue running, and @@ -84,6 +106,8 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool); * * @param[in] pool The pool to destroy * @return success: 0, failure: errno + * + * @see pthreadpool_stop() */ int pthreadpool_destroy(struct pthreadpool *pool); @@ -125,6 +149,8 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, * @return The number of canceled jobs * * @see pthreadpool_add_job() + * @see pthreadpool_stop() + * @see pthreadpool_destroy() */ size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id, void (*fn)(void *private_data), void *private_data); diff --git a/lib/pthreadpool/pthreadpool_sync.c b/lib/pthreadpool/pthreadpool_sync.c index 837abac54d7..48e6a0ddb60 100644 --- a/lib/pthreadpool/pthreadpool_sync.c +++ b/lib/pthreadpool/pthreadpool_sync.c @@ -22,6 +22,8 @@ #include "pthreadpool.h" struct pthreadpool { + bool stopped; + /* * Indicate job completion */ @@ -45,6 +47,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, if (pool == NULL) { return ENOMEM; } + pool->stopped = false; pool->signal_fn = signal_fn; pool->signal_fn_private_data = signal_fn_private_data; @@ -65,6 +68,10 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool) int pthreadpool_add_job(struct pthreadpool *pool, int job_id, void (*fn)(void *private_data), void *private_data) { + if (pool->stopped) { + return EINVAL; + } + fn(private_data); return pool->signal_fn(job_id, fn, private_data, @@ -77,6 +84,12 @@ size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id, return 0; } +int pthreadpool_stop(struct pthreadpool *pool) +{ + pool->stopped = true; + return 0; +} + int pthreadpool_destroy(struct pthreadpool *pool) { free(pool);