1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-22 13:34:15 +03:00

pthreadpool: add pthreadpool_restart_check[_monitor_{fd,drain}]()

This makes it possible to monitor the pthreadpool for exited worker
threads and may restart new threads from the main thread again.

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
This commit is contained in:
Stefan Metzmacher 2018-07-16 14:43:01 +02:00 committed by Ralph Boehme
parent fbafdc99ef
commit 3c4cdb2907
3 changed files with 365 additions and 0 deletions

View File

@ -23,6 +23,7 @@
#include "system/threads.h"
#include "pthreadpool.h"
#include "lib/util/dlinklist.h"
#include "lib/util/blocking.h"
#ifdef NDEBUG
#undef NDEBUG
@ -52,6 +53,8 @@ struct pthreadpool {
*/
pthread_cond_t condvar;
int check_pipefd[2];
/*
* Array of jobs
*/
@ -136,6 +139,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
{
struct pthreadpool *pool;
int ret;
bool ok;
pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
if (pool == NULL) {
@ -153,10 +157,52 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
return ENOMEM;
}
ret = pipe(pool->check_pipefd);
if (ret != 0) {
free(pool->jobs);
free(pool);
return ENOMEM;
}
ok = smb_set_close_on_exec(pool->check_pipefd[0]);
if (!ok) {
close(pool->check_pipefd[0]);
close(pool->check_pipefd[1]);
free(pool->jobs);
free(pool);
return EINVAL;
}
ok = smb_set_close_on_exec(pool->check_pipefd[1]);
if (!ok) {
close(pool->check_pipefd[0]);
close(pool->check_pipefd[1]);
free(pool->jobs);
free(pool);
return EINVAL;
}
ret = set_blocking(pool->check_pipefd[0], true);
if (ret == -1) {
close(pool->check_pipefd[0]);
close(pool->check_pipefd[1]);
free(pool->jobs);
free(pool);
return EINVAL;
}
ret = set_blocking(pool->check_pipefd[1], false);
if (ret == -1) {
close(pool->check_pipefd[0]);
close(pool->check_pipefd[1]);
free(pool->jobs);
free(pool);
return EINVAL;
}
pool->head = pool->num_jobs = 0;
ret = pthread_mutex_init(&pool->mutex, NULL);
if (ret != 0) {
close(pool->check_pipefd[0]);
close(pool->check_pipefd[1]);
free(pool->jobs);
free(pool);
return ret;
@ -165,6 +211,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
ret = pthread_cond_init(&pool->condvar, NULL);
if (ret != 0) {
pthread_mutex_destroy(&pool->mutex);
close(pool->check_pipefd[0]);
close(pool->check_pipefd[1]);
free(pool->jobs);
free(pool);
return ret;
@ -174,6 +222,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
if (ret != 0) {
pthread_cond_destroy(&pool->condvar);
pthread_mutex_destroy(&pool->mutex);
close(pool->check_pipefd[0]);
close(pool->check_pipefd[1]);
free(pool->jobs);
free(pool);
return ret;
@ -196,6 +246,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
pthread_mutex_destroy(&pool->fork_mutex);
pthread_cond_destroy(&pool->condvar);
pthread_mutex_destroy(&pool->mutex);
close(pool->check_pipefd[0]);
close(pool->check_pipefd[1]);
free(pool->jobs);
free(pool);
return ret;
@ -359,6 +411,14 @@ static void pthreadpool_child(void)
pool->head = 0;
pool->num_jobs = 0;
pool->stopped = true;
if (pool->check_pipefd[0] != -1) {
close(pool->check_pipefd[0]);
pool->check_pipefd[0] = -1;
}
if (pool->check_pipefd[1] != -1) {
close(pool->check_pipefd[1]);
pool->check_pipefd[1] = -1;
}
ret = pthread_cond_init(&pool->condvar, NULL);
assert(ret == 0);
@ -421,6 +481,14 @@ static int pthreadpool_free(struct pthreadpool *pool)
return ret2;
}
if (pool->check_pipefd[0] != -1) {
close(pool->check_pipefd[0]);
pool->check_pipefd[0] = -1;
}
if (pool->check_pipefd[1] != -1) {
close(pool->check_pipefd[1]);
pool->check_pipefd[1] = -1;
}
free(pool->jobs);
free(pool);
@ -437,6 +505,15 @@ static int pthreadpool_stop_locked(struct pthreadpool *pool)
pool->stopped = true;
if (pool->check_pipefd[0] != -1) {
close(pool->check_pipefd[0]);
pool->check_pipefd[0] = -1;
}
if (pool->check_pipefd[1] != -1) {
close(pool->check_pipefd[1]);
pool->check_pipefd[1] = -1;
}
if (pool->num_threads == 0) {
return 0;
}
@ -521,6 +598,33 @@ static void pthreadpool_server_exit(struct pthreadpool *pool)
free_it = (pool->destroyed && (pool->num_threads == 0));
while (true) {
uint8_t c = 0;
ssize_t nwritten = 0;
if (pool->check_pipefd[1] == -1) {
break;
}
nwritten = write(pool->check_pipefd[1], &c, 1);
if (nwritten == -1) {
if (errno == EINTR) {
continue;
}
if (errno == EAGAIN) {
break;
}
#ifdef EWOULDBLOCK
if (errno == EWOULDBLOCK) {
break;
}
#endif
/* ignore ... */
}
break;
}
ret = pthread_mutex_unlock(&pool->mutex);
assert(ret == 0);
@ -851,6 +955,183 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
return res;
}
int pthreadpool_restart_check(struct pthreadpool *pool)
{
int res;
int unlock_res;
unsigned possible_threads = 0;
unsigned missing_threads = 0;
assert(!pool->destroyed);
res = pthread_mutex_lock(&pool->mutex);
if (res != 0) {
return res;
}
if (pool->stopped) {
/*
* Protect against the pool being shut down while
* trying to add a job
*/
unlock_res = pthread_mutex_unlock(&pool->mutex);
assert(unlock_res == 0);
return EINVAL;
}
if (pool->num_jobs == 0) {
/*
* This also handles the pool->max_threads == 0 case as it never
* calls pthreadpool_put_job()
*/
unlock_res = pthread_mutex_unlock(&pool->mutex);
assert(unlock_res == 0);
return 0;
}
if (pool->num_idle > 0) {
/*
* We have idle threads and pending jobs,
* this means we better let all threads
* start and check for pending jobs.
*/
res = pthread_cond_broadcast(&pool->condvar);
assert(res == 0);
}
if (pool->num_threads < pool->max_threads) {
possible_threads = pool->max_threads - pool->num_threads;
}
if (pool->num_idle < pool->num_jobs) {
missing_threads = pool->num_jobs - pool->num_idle;
}
missing_threads = MIN(missing_threads, possible_threads);
while (missing_threads > 0) {
res = pthreadpool_create_thread(pool);
if (res != 0) {
break;
}
missing_threads--;
}
if (missing_threads == 0) {
/*
* Ok, we recreated all thread we need.
*/
unlock_res = pthread_mutex_unlock(&pool->mutex);
assert(unlock_res == 0);
return 0;
}
if (pool->num_threads != 0) {
/*
* At least one thread is still available, let
* that one run the queued jobs.
*/
unlock_res = pthread_mutex_unlock(&pool->mutex);
assert(unlock_res == 0);
return 0;
}
/*
* There's no thread available to run any pending jobs.
* The caller may want to cancel the jobs and destroy the pool.
* But that's up to the caller.
*/
unlock_res = pthread_mutex_unlock(&pool->mutex);
assert(unlock_res == 0);
return res;
}
int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool)
{
int fd;
int ret;
bool ok;
if (pool->stopped) {
errno = EINVAL;
return -1;
}
if (pool->check_pipefd[0] == -1) {
errno = ENOSYS;
return -1;
}
fd = dup(pool->check_pipefd[0]);
if (fd == -1) {
return -1;
}
ok = smb_set_close_on_exec(fd);
if (!ok) {
int saved_errno = errno;
close(fd);
errno = saved_errno;
return -1;
}
ret = set_blocking(fd, false);
if (ret == -1) {
int saved_errno = errno;
close(fd);
errno = saved_errno;
return -1;
}
return fd;
}
int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool)
{
if (pool->stopped) {
return EINVAL;
}
if (pool->check_pipefd[0] == -1) {
return ENOSYS;
}
while (true) {
uint8_t buf[128];
ssize_t nread;
nread = read(pool->check_pipefd[0], buf, sizeof(buf));
if (nread == -1) {
if (errno == EINTR) {
continue;
}
if (errno == EAGAIN) {
return 0;
}
#ifdef EWOULDBLOCK
if (errno == EWOULDBLOCK) {
return 0;
}
#endif
if (errno == 0) {
errno = INT_MAX;
}
return errno;
}
if (nread < sizeof(buf)) {
return 0;
}
}
abort();
return INT_MAX;
}
size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data)
{

View File

@ -144,6 +144,70 @@ int pthreadpool_destroy(struct pthreadpool *pool);
int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data);
/**
* @brief Check if the pthreadpool needs a restart.
*
* This checks if there are enough threads to run the already
* queued jobs. This should be called only the callers signal_fn
* (passed to pthreadpool_init()) returned an error, so
* that the job's worker thread exited.
*
* Typically this is called once the file destriptor
* returned by pthreadpool_restart_check_monitor_fd()
* became readable and pthreadpool_restart_check_monitor_drain()
* returned success.
*
* This function tries to restart the missing threads.
*
* @param[in] pool The pool to run the job on
* @return success: 0, failure: errno
*
* @see pthreadpool_restart_check_monitor_fd
* @see pthreadpool_restart_check_monitor_drain
*/
int pthreadpool_restart_check(struct pthreadpool *pool);
/**
* @brief Return a file destriptor that monitors the pool.
*
* If the file destrictor becomes readable,
* the event handler should call pthreadpool_restart_check_monitor_drain().
*
* pthreadpool_restart_check() should also be called once the
* state is drained.
*
* This function returns a fresh fd using dup() each time.
*
* If the pool doesn't require restarts, this function
* returns -1 and sets errno = ENOSYS. The caller
* may ignore that situation.
*
* @param[in] pool The pool to run the job on
* @return success: 0, failure: -1 (set errno)
*
* @see pthreadpool_restart_check_monitor_fd
* @see pthreadpool_restart_check_monitor_drain
*/
int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool);
/**
* @brief Drain the monitor file destriptor of the pool.
*
* If the file destrictor returned by pthreadpool_restart_check_monitor_fd()
* becomes readable, pthreadpool_restart_check_monitor_drain() should be
* called before pthreadpool_restart_check().
*
* If this function returns an error the caller should close
* the file destriptor it got from pthreadpool_restart_check_monitor_fd().
*
* @param[in] pool The pool to run the job on
* @return success: 0, failure: errno
*
* @see pthreadpool_restart_check_monitor_fd
* @see pthreadpool_restart_check
*/
int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool);
/**
* @brief Try to cancel a job in a pthreadpool
*

View File

@ -83,6 +83,26 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
pool->signal_fn_private_data);
}
int pthreadpool_restart_check(struct pthreadpool *pool)
{
if (pool->stopped) {
return EINVAL;
}
return 0;
}
int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool)
{
errno = ENOSYS;
return -1;
}
int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool)
{
return EINVAL;
}
size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data)
{