diff --git a/lib/pthreadpool/pthreadpool.c b/lib/pthreadpool/pthreadpool.c index d482c159941..c2bafd52c08 100644 --- a/lib/pthreadpool/pthreadpool.c +++ b/lib/pthreadpool/pthreadpool.c @@ -24,7 +24,6 @@ #include "system/filesys.h" #include "pthreadpool.h" #include "lib/util/dlinklist.h" -#include "lib/util/blocking.h" #ifdef NDEBUG #undef NDEBUG @@ -54,8 +53,6 @@ struct pthreadpool { */ pthread_cond_t condvar; - int check_pipefd[2]; - /* * Array of jobs */ @@ -140,7 +137,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, { struct pthreadpool *pool; int ret; - bool ok; pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool)); if (pool == NULL) { @@ -158,52 +154,10 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, return ENOMEM; } - ret = pipe(pool->check_pipefd); - if (ret != 0) { - free(pool->jobs); - free(pool); - return ENOMEM; - } - - ok = smb_set_close_on_exec(pool->check_pipefd[0]); - if (!ok) { - close(pool->check_pipefd[0]); - close(pool->check_pipefd[1]); - free(pool->jobs); - free(pool); - return EINVAL; - } - ok = smb_set_close_on_exec(pool->check_pipefd[1]); - if (!ok) { - close(pool->check_pipefd[0]); - close(pool->check_pipefd[1]); - free(pool->jobs); - free(pool); - return EINVAL; - } - ret = set_blocking(pool->check_pipefd[0], true); - if (ret == -1) { - close(pool->check_pipefd[0]); - close(pool->check_pipefd[1]); - free(pool->jobs); - free(pool); - return EINVAL; - } - ret = set_blocking(pool->check_pipefd[1], false); - if (ret == -1) { - close(pool->check_pipefd[0]); - close(pool->check_pipefd[1]); - free(pool->jobs); - free(pool); - return EINVAL; - } - pool->head = pool->num_jobs = 0; ret = pthread_mutex_init(&pool->mutex, NULL); if (ret != 0) { - close(pool->check_pipefd[0]); - close(pool->check_pipefd[1]); free(pool->jobs); free(pool); return ret; @@ -212,8 +166,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, ret = pthread_cond_init(&pool->condvar, NULL); if (ret != 0) { pthread_mutex_destroy(&pool->mutex); - close(pool->check_pipefd[0]); - close(pool->check_pipefd[1]); free(pool->jobs); free(pool); return ret; @@ -223,8 +175,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, if (ret != 0) { pthread_cond_destroy(&pool->condvar); pthread_mutex_destroy(&pool->mutex); - close(pool->check_pipefd[0]); - close(pool->check_pipefd[1]); free(pool->jobs); free(pool); return ret; @@ -247,8 +197,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, pthread_mutex_destroy(&pool->fork_mutex); pthread_cond_destroy(&pool->condvar); pthread_mutex_destroy(&pool->mutex); - close(pool->check_pipefd[0]); - close(pool->check_pipefd[1]); free(pool->jobs); free(pool); return ret; @@ -412,14 +360,6 @@ static void pthreadpool_child(void) pool->head = 0; pool->num_jobs = 0; pool->stopped = true; - if (pool->check_pipefd[0] != -1) { - close(pool->check_pipefd[0]); - pool->check_pipefd[0] = -1; - } - if (pool->check_pipefd[1] != -1) { - close(pool->check_pipefd[1]); - pool->check_pipefd[1] = -1; - } ret = pthread_cond_init(&pool->condvar, NULL); assert(ret == 0); @@ -482,14 +422,6 @@ static int pthreadpool_free(struct pthreadpool *pool) return ret2; } - if (pool->check_pipefd[0] != -1) { - close(pool->check_pipefd[0]); - pool->check_pipefd[0] = -1; - } - if (pool->check_pipefd[1] != -1) { - close(pool->check_pipefd[1]); - pool->check_pipefd[1] = -1; - } free(pool->jobs); free(pool); @@ -506,15 +438,6 @@ static int pthreadpool_stop_locked(struct pthreadpool *pool) pool->stopped = true; - if (pool->check_pipefd[0] != -1) { - close(pool->check_pipefd[0]); - pool->check_pipefd[0] = -1; - } - if (pool->check_pipefd[1] != -1) { - close(pool->check_pipefd[1]); - pool->check_pipefd[1] = -1; - } - if (pool->num_threads == 0) { return 0; } @@ -599,33 +522,6 @@ static void pthreadpool_server_exit(struct pthreadpool *pool) free_it = (pool->destroyed && (pool->num_threads == 0)); - while (true) { - uint8_t c = 0; - ssize_t nwritten = 0; - - if (pool->check_pipefd[1] == -1) { - break; - } - - nwritten = write(pool->check_pipefd[1], &c, 1); - if (nwritten == -1) { - if (errno == EINTR) { - continue; - } - if (errno == EAGAIN) { - break; - } -#ifdef EWOULDBLOCK - if (errno == EWOULDBLOCK) { - break; - } -#endif - /* ignore ... */ - } - - break; - } - ret = pthread_mutex_unlock(&pool->mutex); assert(ret == 0); @@ -956,183 +852,6 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, return res; } -int pthreadpool_restart_check(struct pthreadpool *pool) -{ - int res; - int unlock_res; - unsigned possible_threads = 0; - unsigned missing_threads = 0; - - assert(!pool->destroyed); - - res = pthread_mutex_lock(&pool->mutex); - if (res != 0) { - return res; - } - - if (pool->stopped) { - /* - * Protect against the pool being shut down while - * trying to add a job - */ - unlock_res = pthread_mutex_unlock(&pool->mutex); - assert(unlock_res == 0); - return EINVAL; - } - - if (pool->num_jobs == 0) { - /* - * This also handles the pool->max_threads == 0 case as it never - * calls pthreadpool_put_job() - */ - unlock_res = pthread_mutex_unlock(&pool->mutex); - assert(unlock_res == 0); - return 0; - } - - if (pool->num_idle > 0) { - /* - * We have idle threads and pending jobs, - * this means we better let all threads - * start and check for pending jobs. - */ - res = pthread_cond_broadcast(&pool->condvar); - assert(res == 0); - } - - if (pool->num_threads < pool->max_threads) { - possible_threads = pool->max_threads - pool->num_threads; - } - - if (pool->num_idle < pool->num_jobs) { - missing_threads = pool->num_jobs - pool->num_idle; - } - - missing_threads = MIN(missing_threads, possible_threads); - - while (missing_threads > 0) { - - res = pthreadpool_create_thread(pool); - if (res != 0) { - break; - } - - missing_threads--; - } - - if (missing_threads == 0) { - /* - * Ok, we recreated all thread we need. - */ - unlock_res = pthread_mutex_unlock(&pool->mutex); - assert(unlock_res == 0); - return 0; - } - - if (pool->num_threads != 0) { - /* - * At least one thread is still available, let - * that one run the queued jobs. - */ - unlock_res = pthread_mutex_unlock(&pool->mutex); - assert(unlock_res == 0); - return 0; - } - - /* - * There's no thread available to run any pending jobs. - * The caller may want to cancel the jobs and destroy the pool. - * But that's up to the caller. - */ - unlock_res = pthread_mutex_unlock(&pool->mutex); - assert(unlock_res == 0); - - return res; -} - -int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool) -{ - int fd; - int ret; - bool ok; - - if (pool->stopped) { - errno = EINVAL; - return -1; - } - - if (pool->check_pipefd[0] == -1) { - errno = ENOSYS; - return -1; - } - - fd = dup(pool->check_pipefd[0]); - if (fd == -1) { - return -1; - } - - ok = smb_set_close_on_exec(fd); - if (!ok) { - int saved_errno = errno; - close(fd); - errno = saved_errno; - return -1; - } - - ret = set_blocking(fd, false); - if (ret == -1) { - int saved_errno = errno; - close(fd); - errno = saved_errno; - return -1; - } - - return fd; -} - -int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool) -{ - if (pool->stopped) { - return EINVAL; - } - - if (pool->check_pipefd[0] == -1) { - return ENOSYS; - } - - while (true) { - uint8_t buf[128]; - ssize_t nread; - - nread = read(pool->check_pipefd[0], buf, sizeof(buf)); - if (nread == -1) { - if (errno == EINTR) { - continue; - } - if (errno == EAGAIN) { - return 0; - } -#ifdef EWOULDBLOCK - if (errno == EWOULDBLOCK) { - return 0; - } -#endif - if (errno == 0) { - errno = INT_MAX; - } - - return errno; - } - - if (nread < sizeof(buf)) { - return 0; - } - } - - abort(); - return INT_MAX; -} - size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id, void (*fn)(void *private_data), void *private_data) { diff --git a/lib/pthreadpool/pthreadpool.h b/lib/pthreadpool/pthreadpool.h index 543567ceaf7..d8daf9e4519 100644 --- a/lib/pthreadpool/pthreadpool.h +++ b/lib/pthreadpool/pthreadpool.h @@ -144,70 +144,6 @@ int pthreadpool_destroy(struct pthreadpool *pool); int pthreadpool_add_job(struct pthreadpool *pool, int job_id, void (*fn)(void *private_data), void *private_data); -/** - * @brief Check if the pthreadpool needs a restart. - * - * This checks if there are enough threads to run the already - * queued jobs. This should be called only the callers signal_fn - * (passed to pthreadpool_init()) returned an error, so - * that the job's worker thread exited. - * - * Typically this is called once the file destriptor - * returned by pthreadpool_restart_check_monitor_fd() - * became readable and pthreadpool_restart_check_monitor_drain() - * returned success. - * - * This function tries to restart the missing threads. - * - * @param[in] pool The pool to run the job on - * @return success: 0, failure: errno - * - * @see pthreadpool_restart_check_monitor_fd - * @see pthreadpool_restart_check_monitor_drain - */ -int pthreadpool_restart_check(struct pthreadpool *pool); - -/** - * @brief Return a file destriptor that monitors the pool. - * - * If the file destrictor becomes readable, - * the event handler should call pthreadpool_restart_check_monitor_drain(). - * - * pthreadpool_restart_check() should also be called once the - * state is drained. - * - * This function returns a fresh fd using dup() each time. - * - * If the pool doesn't require restarts, this function - * returns -1 and sets errno = ENOSYS. The caller - * may ignore that situation. - * - * @param[in] pool The pool to run the job on - * @return success: 0, failure: -1 (set errno) - * - * @see pthreadpool_restart_check_monitor_fd - * @see pthreadpool_restart_check_monitor_drain - */ -int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool); - -/** - * @brief Drain the monitor file destriptor of the pool. - * - * If the file destrictor returned by pthreadpool_restart_check_monitor_fd() - * becomes readable, pthreadpool_restart_check_monitor_drain() should be - * called before pthreadpool_restart_check(). - * - * If this function returns an error the caller should close - * the file destriptor it got from pthreadpool_restart_check_monitor_fd(). - * - * @param[in] pool The pool to run the job on - * @return success: 0, failure: errno - * - * @see pthreadpool_restart_check_monitor_fd - * @see pthreadpool_restart_check - */ -int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool); - /** * @brief Try to cancel a job in a pthreadpool * diff --git a/lib/pthreadpool/pthreadpool_sync.c b/lib/pthreadpool/pthreadpool_sync.c index a476ea712c3..2ed6f36dbbc 100644 --- a/lib/pthreadpool/pthreadpool_sync.c +++ b/lib/pthreadpool/pthreadpool_sync.c @@ -83,26 +83,6 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, pool->signal_fn_private_data); } -int pthreadpool_restart_check(struct pthreadpool *pool) -{ - if (pool->stopped) { - return EINVAL; - } - - return 0; -} - -int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool) -{ - errno = ENOSYS; - return -1; -} - -int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool) -{ - return EINVAL; -} - size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id, void (*fn)(void *private_data), void *private_data) {