1
0
mirror of https://github.com/samba-team/samba.git synced 2025-08-03 04:22:09 +03:00

pthreadpool: Allow multiple jobs to be received

This can avoid syscalls when multiple jobs are finished simultaneously

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
This commit is contained in:
Volker Lendecke
2014-03-24 10:39:56 +00:00
committed by Jeremy Allison
parent 84aa2ddd86
commit c5d07df6ab
8 changed files with 49 additions and 37 deletions

View File

@ -295,9 +295,9 @@ int asys_result(struct asys_context *ctx, ssize_t *pret, int *perrno,
struct asys_job *job; struct asys_job *job;
int ret, jobid; int ret, jobid;
ret = pthreadpool_finished_job(ctx->pool, &jobid); ret = pthreadpool_finished_jobs(ctx->pool, &jobid, 1);
if (ret != 0) { if (ret < 0) {
return ret; return -ret;
} }
if ((jobid < 0) || (jobid >= ctx->num_jobs)) { if ((jobid < 0) || (jobid >= ctx->num_jobs)) {
return EIO; return EIO;

View File

@ -287,7 +287,7 @@ static void fncall_handler(struct tevent_context *ev, struct tevent_fd *fde,
int i, num_pending; int i, num_pending;
int job_id; int job_id;
if (pthreadpool_finished_job(ctx->pool, &job_id) != 0) { if (pthreadpool_finished_jobs(ctx->pool, &job_id, 1) < 0) {
return; return;
} }

View File

@ -288,25 +288,26 @@ static void pthreadpool_join_children(struct pthreadpool *pool)
* Fetch a finished job number from the signal pipe * Fetch a finished job number from the signal pipe
*/ */
int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid) int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
unsigned num_jobids)
{ {
int ret_jobid; ssize_t to_read, nread;
ssize_t nread;
nread = -1; nread = -1;
errno = EINTR; errno = EINTR;
to_read = sizeof(int) * num_jobids;
while ((nread == -1) && (errno == EINTR)) { while ((nread == -1) && (errno == EINTR)) {
nread = read(pool->sig_pipe[0], &ret_jobid, sizeof(int)); nread = read(pool->sig_pipe[0], jobids, to_read);
} }
if (nread == -1) { if (nread == -1) {
return errno; return -errno;
} }
if (nread != sizeof(int)) { if ((nread % sizeof(int)) != 0) {
return EINVAL; return -EINVAL;
} }
*jobid = ret_jobid; return nread / sizeof(int);
return 0;
} }
/* /*

View File

@ -61,7 +61,7 @@ int pthreadpool_destroy(struct pthreadpool *pool);
* *
* This adds a job to a pthreadpool. The job can be identified by * This adds a job to a pthreadpool. The job can be identified by
* job_id. This integer will be returned from * job_id. This integer will be returned from
* pthreadpool_finished_job() then the job is completed. * pthreadpool_finished_jobs() then the job is completed.
* *
* @param[in] pool The pool to run the job on * @param[in] pool The pool to run the job on
* @param[in] job_id A custom identifier * @param[in] job_id A custom identifier
@ -84,15 +84,18 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
int pthreadpool_signal_fd(struct pthreadpool *pool); int pthreadpool_signal_fd(struct pthreadpool *pool);
/** /**
* @brief Get the job_id of a finished job * @brief Get the job_ids of finished jobs
* *
* This blocks until a job has finished unless the fd returned by * This blocks until a job has finished unless the fd returned by
* pthreadpool_signal_fd() is readable. * pthreadpool_signal_fd() is readable.
* *
* @param[in] pool The pool to query for finished jobs * @param[in] pool The pool to query for finished jobs
* @param[out] pjobid The job_id of the finished job * @param[out] jobids The job_ids of the finished job
* @return success: 0, failure: errno * @param[int] num_jobids The job_ids array size
* @return success: >=0, number of finished jobs
* failure: -errno
*/ */
int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid); int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
unsigned num_jobids);
#endif #endif

View File

@ -133,27 +133,35 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
} }
int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid) int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
unsigned num_jobids)
{ {
int ret_jobid; ssize_t to_read, nread;
ssize_t nread; int ret;
nread = -1; nread = -1;
errno = EINTR; errno = EINTR;
to_read = sizeof(int) * num_jobids;
while ((nread == -1) && (errno == EINTR)) { while ((nread == -1) && (errno == EINTR)) {
nread = read(pool->sig_pipe[0], &ret_jobid, sizeof(int)); nread = read(pool->sig_pipe[0], jobids, to_read);
} }
if (nread == -1) { if (nread == -1) {
return errno; return -errno;
} }
if (nread != sizeof(int)) { if ((nread % sizeof(int)) != 0) {
return EINVAL; return -EINVAL;
} }
*jobid = ret_jobid;
pool->pipe_busy = 0; pool->pipe_busy = 0;
return pthreadpool_write_to_pipe(pool);
ret = pthreadpool_write_to_pipe(pool);
if (ret != 0) {
return -ret;
}
return nread / sizeof(int);
} }
int pthreadpool_destroy(struct pthreadpool *pool) int pthreadpool_destroy(struct pthreadpool *pool)

View File

@ -71,8 +71,8 @@ static int test_jobs(int num_threads, int num_jobs)
for (i=0; i<num_jobs; i++) { for (i=0; i<num_jobs; i++) {
int jobid = -1; int jobid = -1;
ret = pthreadpool_finished_job(p, &jobid); ret = pthreadpool_finished_jobs(p, &jobid, 1);
if ((ret != 0) || (jobid >= num_jobs)) { if ((ret != 1) || (jobid >= num_jobs)) {
fprintf(stderr, "invalid job number %d\n", jobid); fprintf(stderr, "invalid job number %d\n", jobid);
return -1; return -1;
} }
@ -284,8 +284,8 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
continue; continue;
} }
ret = pthreadpool_finished_job(pools[j], &jobid); ret = pthreadpool_finished_jobs(pools[j], &jobid, 1);
if ((ret != 0) || (jobid >= num_jobs * num_threads)) { if ((ret != 1) || (jobid >= num_jobs * num_threads)) {
fprintf(stderr, "invalid job number %d\n", fprintf(stderr, "invalid job number %d\n",
jobid); jobid);
return -1; return -1;

View File

@ -166,8 +166,8 @@ static void aio_open_handle_completion(struct tevent_context *event_ctx,
return; return;
} }
ret = pthreadpool_finished_job(open_pool, &jobid); ret = pthreadpool_finished_jobs(open_pool, &jobid, 1);
if (ret) { if (ret != 1) {
smb_panic("aio_open_handle_completion"); smb_panic("aio_open_handle_completion");
/* notreached. */ /* notreached. */
return; return;

View File

@ -50,15 +50,15 @@ bool run_bench_pthreadpool(int dummy)
strerror(ret)); strerror(ret));
break; break;
} }
ret = pthreadpool_finished_job(pool, &jobid); ret = pthreadpool_finished_jobs(pool, &jobid, 1);
if (ret != 0) { if (ret < 0) {
d_fprintf(stderr, "pthreadpool_finished_job failed: %s\n", d_fprintf(stderr, "pthreadpool_finished_job failed: %s\n",
strerror(ret)); strerror(-ret));
break; break;
} }
} }
pthreadpool_destroy(pool); pthreadpool_destroy(pool);
return (ret == 0); return (ret == 1);
} }