diff --git a/source3/lib/pthreadpool/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c
index b071e5393db..4c2858a0dee 100644
--- a/source3/lib/pthreadpool/pthreadpool.c
+++ b/source3/lib/pthreadpool/pthreadpool.c
@@ -19,7 +19,6 @@
#include "replace.h"
#include "system/time.h"
-#include "system/filesys.h"
#include "system/wait.h"
#include "system/threads.h"
#include "pthreadpool.h"
@@ -58,9 +57,10 @@ struct pthreadpool {
size_t num_jobs;
/*
- * pipe for signalling
+ * Indicate job completion
*/
- int sig_pipe[2];
+ int (*signal_fn)(int jobid, void *private_data);
+ void *signal_private_data;
/*
* indicator to worker threads that they should shut down
@@ -99,7 +99,9 @@ static void pthreadpool_prep_atfork(void);
* Initialize a thread pool
*/
-int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
+int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
+ int (*signal_fn)(int jobid, void *private_data),
+ void *signal_private_data)
{
struct pthreadpool *pool;
int ret;
@@ -108,6 +110,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
if (pool == NULL) {
return ENOMEM;
}
+ pool->signal_fn = signal_fn;
+ pool->signal_private_data = signal_private_data;
pool->jobs_array_len = 4;
pool->jobs = calloc(
@@ -120,18 +124,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
pool->head = pool->num_jobs = 0;
- ret = pipe(pool->sig_pipe);
- if (ret == -1) {
- int err = errno;
- free(pool->jobs);
- free(pool);
- return err;
- }
-
ret = pthread_mutex_init(&pool->mutex, NULL);
if (ret != 0) {
- close(pool->sig_pipe[0]);
- close(pool->sig_pipe[1]);
free(pool->jobs);
free(pool);
return ret;
@@ -140,8 +134,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
ret = pthread_cond_init(&pool->condvar, NULL);
if (ret != 0) {
pthread_mutex_destroy(&pool->mutex);
- close(pool->sig_pipe[0]);
- close(pool->sig_pipe[1]);
free(pool->jobs);
free(pool);
return ret;
@@ -158,8 +150,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
if (ret != 0) {
pthread_cond_destroy(&pool->condvar);
pthread_mutex_destroy(&pool->mutex);
- close(pool->sig_pipe[0]);
- close(pool->sig_pipe[1]);
free(pool->jobs);
free(pool);
return ret;
@@ -218,12 +208,6 @@ static void pthreadpool_child(void)
pool != NULL;
pool = DLIST_PREV(pool)) {
- close(pool->sig_pipe[0]);
- close(pool->sig_pipe[1]);
-
- ret = pipe(pool->sig_pipe);
- assert(ret == 0);
-
pool->num_threads = 0;
pool->num_exited = 0;
@@ -248,16 +232,6 @@ static void pthreadpool_prep_atfork(void)
pthreadpool_child);
}
-/*
- * Return the file descriptor which becomes readable when a job has
- * finished
- */
-
-int pthreadpool_signal_fd(struct pthreadpool *pool)
-{
- return pool->sig_pipe[0];
-}
-
/*
* Do a pthread_join() on all children that have exited, pool->mutex must be
* locked
@@ -286,32 +260,6 @@ static void pthreadpool_join_children(struct pthreadpool *pool)
*/
}
-/*
- * Fetch a finished job number from the signal pipe
- */
-
-int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
- unsigned num_jobids)
-{
- ssize_t to_read, nread;
-
- nread = -1;
- errno = EINTR;
-
- to_read = sizeof(int) * num_jobids;
-
- while ((nread == -1) && (errno == EINTR)) {
- nread = read(pool->sig_pipe[0], jobids, to_read);
- }
- if (nread == -1) {
- return -errno;
- }
- if ((nread % sizeof(int)) != 0) {
- return -EINVAL;
- }
- return nread / sizeof(int);
-}
-
/*
* Destroy a thread pool, finishing all threads working for it
*/
@@ -390,12 +338,6 @@ int pthreadpool_destroy(struct pthreadpool *pool)
ret = pthread_mutex_unlock(&pthreadpools_mutex);
assert(ret == 0);
- close(pool->sig_pipe[0]);
- pool->sig_pipe[0] = -1;
-
- close(pool->sig_pipe[1]);
- pool->sig_pipe[1] = -1;
-
free(pool->exited);
free(pool->jobs);
free(pool);
@@ -527,8 +469,7 @@ static void *pthreadpool_server(void *arg)
}
if (pthreadpool_get_job(pool, &job)) {
- ssize_t written;
- int sig_pipe = pool->sig_pipe[1];
+ int ret;
/*
* Do the work with the mutex unlocked
@@ -542,8 +483,9 @@ static void *pthreadpool_server(void *arg)
res = pthread_mutex_lock(&pool->mutex);
assert(res == 0);
- written = write(sig_pipe, &job.id, sizeof(job.id));
- if (written != sizeof(int)) {
+ ret = pool->signal_fn(job.id,
+ pool->signal_private_data);
+ if (ret != 0) {
pthreadpool_server_exit(pool);
pthread_mutex_unlock(&pool->mutex);
return NULL;
diff --git a/source3/lib/pthreadpool/pthreadpool.h b/source3/lib/pthreadpool/pthreadpool.h
index adb825a528a..0b8d6e590c8 100644
--- a/source3/lib/pthreadpool/pthreadpool.h
+++ b/source3/lib/pthreadpool/pthreadpool.h
@@ -43,7 +43,9 @@ struct pthreadpool;
* max_threads=0 means unlimited parallelism. The caller has to take
* care to not overload the system.
*/
-int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult);
+int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
+ int (*signal_fn)(int jobid, void *private_data),
+ void *signal_private_data);
/**
* @brief Destroy a pthreadpool
@@ -60,8 +62,8 @@ int pthreadpool_destroy(struct pthreadpool *pool);
* @brief Add a job to a pthreadpool
*
* This adds a job to a pthreadpool. The job can be identified by
- * job_id. This integer will be returned from
- * pthreadpool_finished_jobs() then the job is completed.
+ * job_id. This integer will be passed to signal_fn() when the
+ * job is completed.
*
* @param[in] pool The pool to run the job on
* @param[in] job_id A custom identifier
@@ -72,30 +74,4 @@ int pthreadpool_destroy(struct pthreadpool *pool);
int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data);
-/**
- * @brief Get the signalling fd from a pthreadpool
- *
- * Completion of a job is indicated by readability of the fd returned
- * by pthreadpool_signal_fd().
- *
- * @param[in] pool The pool in question
- * @return The fd to listen on for readability
- */
-int pthreadpool_signal_fd(struct pthreadpool *pool);
-
-/**
- * @brief Get the job_ids of finished jobs
- *
- * This blocks until a job has finished unless the fd returned by
- * pthreadpool_signal_fd() is readable.
- *
- * @param[in] pool The pool to query for finished jobs
- * @param[out] jobids The job_ids of the finished job
- * @param[int] num_jobids The job_ids array size
- * @return success: >=0, number of finished jobs
- * failure: -errno
- */
-int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
- unsigned num_jobids);
-
#endif
diff --git a/source3/lib/pthreadpool/pthreadpool_pipe.c b/source3/lib/pthreadpool/pthreadpool_pipe.c
index 76bafa2c3ff..3eaf5e39bd9 100644
--- a/source3/lib/pthreadpool/pthreadpool_pipe.c
+++ b/source3/lib/pthreadpool/pthreadpool_pipe.c
@@ -24,26 +24,57 @@
struct pthreadpool_pipe {
struct pthreadpool *pool;
+ pid_t pid;
+ int pipe_fds[2];
};
+static int pthreadpool_pipe_signal(int jobid, void *private_data);
+
int pthreadpool_pipe_init(unsigned max_threads,
struct pthreadpool_pipe **presult)
{
- struct pthreadpool_pipe *p;
+ struct pthreadpool_pipe *pool;
int ret;
- p = malloc(sizeof(struct pthreadpool_pipe));
- if (p == NULL) {
+ pool = malloc(sizeof(struct pthreadpool_pipe));
+ if (pool == NULL) {
return ENOMEM;
}
+ pool->pid = getpid();
- ret = pthreadpool_init(max_threads, &p->pool);
+ ret = pipe(pool->pipe_fds);
+ if (ret == -1) {
+ int err = errno;
+ free(pool);
+ return err;
+ }
+
+ ret = pthreadpool_init(max_threads, &pool->pool,
+ pthreadpool_pipe_signal, pool);
if (ret != 0) {
- free(p);
+ close(pool->pipe_fds[0]);
+ close(pool->pipe_fds[1]);
+ free(pool);
return ret;
}
- *presult = p;
+ *presult = pool;
+ return 0;
+}
+
+static int pthreadpool_pipe_signal(int jobid, void *private_data)
+{
+ struct pthreadpool_pipe *pool = private_data;
+ ssize_t written;
+
+ do {
+ written = write(pool->pipe_fds[1], &jobid, sizeof(jobid));
+ } while ((written == -1) && (errno == EINTR));
+
+ if (written != sizeof(jobid)) {
+ return errno;
+ }
+
return 0;
}
@@ -55,30 +86,91 @@ int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool)
if (ret != 0) {
return ret;
}
+
+ close(pool->pipe_fds[0]);
+ pool->pipe_fds[0] = -1;
+
+ close(pool->pipe_fds[1]);
+ pool->pipe_fds[1] = -1;
+
free(pool);
return 0;
}
+static int pthreadpool_pipe_reinit(struct pthreadpool_pipe *pool)
+{
+ pid_t pid = getpid();
+ int signal_fd;
+ int ret;
+
+ if (pid == pool->pid) {
+ return 0;
+ }
+
+ signal_fd = pool->pipe_fds[0];
+
+ close(pool->pipe_fds[0]);
+ pool->pipe_fds[0] = -1;
+
+ close(pool->pipe_fds[1]);
+ pool->pipe_fds[1] = -1;
+
+ ret = pipe(pool->pipe_fds);
+ if (ret != 0) {
+ return errno;
+ }
+
+ ret = dup2(pool->pipe_fds[0], signal_fd);
+ if (ret != 0) {
+ return errno;
+ }
+
+ pool->pipe_fds[0] = signal_fd;
+
+ return 0;
+}
+
int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id,
void (*fn)(void *private_data),
void *private_data)
{
int ret;
+
+ ret = pthreadpool_pipe_reinit(pool);
+ if (ret != 0) {
+ return ret;
+ }
+
ret = pthreadpool_add_job(pool->pool, job_id, fn, private_data);
return ret;
}
int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
{
- int fd;
- fd = pthreadpool_signal_fd(pool->pool);
- return fd;
+ return pool->pipe_fds[0];
}
int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
unsigned num_jobids)
{
- int ret;
- ret = pthreadpool_finished_jobs(pool->pool, jobids, num_jobids);
- return ret;
+ ssize_t to_read, nread;
+ pid_t pid = getpid();
+
+ if (pool->pid != pid) {
+ return EINVAL;
+ }
+
+ to_read = sizeof(int) * num_jobids;
+
+ do {
+ nread = read(pool->pipe_fds[0], jobids, to_read);
+ } while ((nread == -1) && (errno == EINTR));
+
+ if (nread == -1) {
+ return -errno;
+ }
+ if ((nread % sizeof(int)) != 0) {
+ return -EINVAL;
+ }
+ return nread / sizeof(int);
}
diff --git a/source3/lib/pthreadpool/pthreadpool_sync.c b/source3/lib/pthreadpool/pthreadpool_sync.c
index 5f06cae2f8c..3e78f467179 100644
--- a/source3/lib/pthreadpool/pthreadpool_sync.c
+++ b/source3/lib/pthreadpool/pthreadpool_sync.c
@@ -17,165 +17,47 @@
* along with this program. If not, see .
*/
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
+#include "replace.h"
#include "pthreadpool.h"
struct pthreadpool {
/*
- * pipe for signalling
+ * Indicate job completion
*/
- int sig_pipe[2];
-
- /*
- * Have we sent something into the pipe that has not been
- * retrieved yet?
- */
- int pipe_busy;
-
- /*
- * Jobids that we have not sent into the pipe yet
- */
- size_t num_ids;
- int *ids;
+ int (*signal_fn)(int jobid,
+ void *private_data);
+ void *signal_private_data;
};
-int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
+int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
+ int (*signal_fn)(int jobid,
+ void *private_data),
+ void *signal_private_data)
{
struct pthreadpool *pool;
- int ret;
pool = (struct pthreadpool *)calloc(1, sizeof(struct pthreadpool));
if (pool == NULL) {
return ENOMEM;
}
- ret = pipe(pool->sig_pipe);
- if (ret == -1) {
- int err = errno;
- free(pool);
- return err;
- }
+ pool->signal_fn = signal_fn;
+ pool->signal_private_data = signal_private_data;
+
*presult = pool;
return 0;
}
-int pthreadpool_signal_fd(struct pthreadpool *pool)
-{
- return pool->sig_pipe[0];
-}
-
-static int pthreadpool_write_to_pipe(struct pthreadpool *pool)
-{
- ssize_t written;
-
- if (pool->pipe_busy) {
- return 0;
- }
- if (pool->num_ids == 0) {
- return 0;
- }
-
- written = -1;
- errno = EINTR;
-
- while ((written == -1) && (errno == EINTR)) {
- written = write(pool->sig_pipe[1], &pool->ids[0], sizeof(int));
- }
- if (written == -1) {
- return errno;
- }
- if (written != sizeof(int)) {
- /*
- * If a single int only partially fits into the pipe,
- * we can assume ourselves pretty broken
- */
- close(pool->sig_pipe[1]);
- pool->sig_pipe[1] = -1;
- return EIO;
- }
-
- if (pool->num_ids > 1) {
- memmove(pool->ids, pool->ids+1, sizeof(int) * (pool->num_ids-1));
- }
- pool->num_ids -= 1;
- pool->pipe_busy = 1;
- return 0;
-}
-
int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data)
{
- int *tmp;
-
- if (pool->sig_pipe[1] == -1) {
- return EIO;
- }
-
fn(private_data);
- tmp = realloc(pool->ids, sizeof(int) * (pool->num_ids+1));
- if (tmp == NULL) {
- return ENOMEM;
- }
- pool->ids = tmp;
- pool->ids[pool->num_ids] = job_id;
- pool->num_ids += 1;
-
- return pthreadpool_write_to_pipe(pool);
-
-}
-
-int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
- unsigned num_jobids)
-{
- ssize_t to_read, nread;
- int ret;
-
- nread = -1;
- errno = EINTR;
-
- to_read = sizeof(int) * num_jobids;
-
- while ((nread == -1) && (errno == EINTR)) {
- nread = read(pool->sig_pipe[0], jobids, to_read);
- }
- if (nread == -1) {
- return -errno;
- }
- if ((nread % sizeof(int)) != 0) {
- return -EINVAL;
- }
-
- pool->pipe_busy = 0;
-
- ret = pthreadpool_write_to_pipe(pool);
- if (ret != 0) {
- return -ret;
- }
-
- return nread / sizeof(int);
+ return pool->signal_fn(job_id, pool->signal_private_data);
}
int pthreadpool_destroy(struct pthreadpool *pool)
{
- if (pool->sig_pipe[0] != -1) {
- close(pool->sig_pipe[0]);
- pool->sig_pipe[0] = -1;
- }
-
- if (pool->sig_pipe[1] != -1) {
- close(pool->sig_pipe[1]);
- pool->sig_pipe[1] = -1;
- }
- free(pool->ids);
free(pool);
return 0;
}