1
0
mirror of https://github.com/samba-team/samba.git synced 2025-03-01 04:58:35 +03:00

pthreadpool: Avoid a malloc/free per job

pthreadpool_add_job is in our hottest code path for r/w intensive workloads, so
we should avoid anything CPU-intensive. pthreadpool used to malloc each job and
free it in the worker thread. This patch adds a FIFO queue for jobs that helper
threads copy from, avoiding constant malloc/free. This cuts user space
CPU in the local-bench-pthreadpool benchmark by roughly 10% on my system.

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
This commit is contained in:
Volker Lendecke 2014-03-21 17:53:26 +01:00 committed by Jeremy Allison
parent 17a60b98db
commit 84aa2ddd86

View File

@ -34,7 +34,6 @@
#include "lib/util/dlinklist.h"
struct pthreadpool_job {
struct pthreadpool_job *next;
int id;
void (*fn)(void *private_data);
void *private_data;
@ -57,9 +56,13 @@ struct pthreadpool {
pthread_cond_t condvar;
/*
* List of work jobs
* Array of jobs
*/
struct pthreadpool_job *jobs, *last_job;
size_t jobs_array_len;
struct pthreadpool_job *jobs;
size_t head;
size_t num_jobs;
/*
* pipe for signalling
@ -113,9 +116,21 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
return ENOMEM;
}
pool->jobs_array_len = 4;
pool->jobs = calloc(
pool->jobs_array_len, sizeof(struct pthreadpool_job));
if (pool->jobs == NULL) {
free(pool);
return ENOMEM;
}
pool->head = pool->num_jobs = 0;
ret = pipe(pool->sig_pipe);
if (ret == -1) {
int err = errno;
free(pool->jobs);
free(pool);
return err;
}
@ -124,6 +139,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
if (ret != 0) {
close(pool->sig_pipe[0]);
close(pool->sig_pipe[1]);
free(pool->jobs);
free(pool);
return ret;
}
@ -133,12 +149,12 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
pthread_mutex_destroy(&pool->mutex);
close(pool->sig_pipe[0]);
close(pool->sig_pipe[1]);
free(pool->jobs);
free(pool);
return ret;
}
pool->shutdown = 0;
pool->jobs = pool->last_job = NULL;
pool->num_threads = 0;
pool->num_exited = 0;
pool->exited = NULL;
@ -151,6 +167,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
pthread_mutex_destroy(&pool->mutex);
close(pool->sig_pipe[0]);
close(pool->sig_pipe[1]);
free(pool->jobs);
free(pool);
return ret;
}
@ -221,14 +238,8 @@ static void pthreadpool_child(void)
pool->exited = NULL;
pool->num_idle = 0;
while (pool->jobs != NULL) {
struct pthreadpool_job *job;
job = pool->jobs;
pool->jobs = job->next;
free(job);
}
pool->last_job = NULL;
pool->head = 0;
pool->num_jobs = 0;
ret = pthread_mutex_unlock(&pool->mutex);
assert(ret == 0);
@ -311,7 +322,7 @@ int pthreadpool_destroy(struct pthreadpool *pool)
return ret;
}
if ((pool->jobs != NULL) || pool->shutdown) {
if ((pool->num_jobs != 0) || pool->shutdown) {
ret = pthread_mutex_unlock(&pool->mutex);
assert(ret == 0);
return EBUSY;
@ -383,6 +394,7 @@ int pthreadpool_destroy(struct pthreadpool *pool)
pool->sig_pipe[1] = -1;
free(pool->exited);
free(pool->jobs);
free(pool);
return 0;
@ -410,6 +422,61 @@ static void pthreadpool_server_exit(struct pthreadpool *pool)
pool->num_exited += 1;
}
static bool pthreadpool_get_job(struct pthreadpool *p,
struct pthreadpool_job *job)
{
if (p->num_jobs == 0) {
return false;
}
*job = p->jobs[p->head];
p->head = (p->head+1) % p->jobs_array_len;
p->num_jobs -= 1;
return true;
}
static bool pthreadpool_put_job(struct pthreadpool *p,
int id,
void (*fn)(void *private_data),
void *private_data)
{
struct pthreadpool_job *job;
if (p->num_jobs == p->jobs_array_len) {
struct pthreadpool_job *tmp;
size_t new_len = p->jobs_array_len * 2;
tmp = realloc(
p->jobs, sizeof(struct pthreadpool_job) * new_len);
if (tmp == NULL) {
return false;
}
p->jobs = tmp;
/*
* We just doubled the jobs array. The array implements a FIFO
* queue with a modulo-based wraparound, so we have to memcpy
* the jobs that are logically at the queue end but physically
* before the queue head into the reallocated area. The new
* space starts at the current jobs_array_len, and we have to
* copy everything before the current head job into the new
* area.
*/
memcpy(&p->jobs[p->jobs_array_len], p->jobs,
sizeof(struct pthreadpool_job) * p->head);
p->jobs_array_len = new_len;
}
job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
job->id = id;
job->fn = fn;
job->private_data = private_data;
p->num_jobs += 1;
return true;
}
static void *pthreadpool_server(void *arg)
{
struct pthreadpool *pool = (struct pthreadpool *)arg;
@ -422,7 +489,7 @@ static void *pthreadpool_server(void *arg)
while (1) {
struct timespec ts;
struct pthreadpool_job *job;
struct pthreadpool_job job;
/*
* idle-wait at most 1 second. If nothing happens in that
@ -432,7 +499,7 @@ static void *pthreadpool_server(void *arg)
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += 1;
while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
while ((pool->num_jobs == 0) && (pool->shutdown == 0)) {
pool->num_idle += 1;
res = pthread_cond_timedwait(
@ -441,7 +508,7 @@ static void *pthreadpool_server(void *arg)
if (res == ETIMEDOUT) {
if (pool->jobs == NULL) {
if (pool->num_jobs == 0) {
/*
* we timed out and still no work for
* us. Exit.
@ -456,19 +523,9 @@ static void *pthreadpool_server(void *arg)
assert(res == 0);
}
job = pool->jobs;
if (job != NULL) {
if (pthreadpool_get_job(pool, &job)) {
ssize_t written;
/*
* Ok, there's work for us to do, remove the job from
* the pthreadpool list
*/
pool->jobs = job->next;
if (pool->last_job == job) {
pool->last_job = NULL;
}
int sig_pipe = pool->sig_pipe[1];
/*
* Do the work with the mutex unlocked
@ -477,12 +534,8 @@ static void *pthreadpool_server(void *arg)
res = pthread_mutex_unlock(&pool->mutex);
assert(res == 0);
job->fn(job->private_data);
written = write(pool->sig_pipe[1], &job->id,
sizeof(int));
free(job);
job.fn(job.private_data);
written = write(sig_pipe, &job.id, sizeof(job.id));
res = pthread_mutex_lock(&pool->mutex);
assert(res == 0);
@ -494,7 +547,7 @@ static void *pthreadpool_server(void *arg)
}
}
if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
if ((pool->num_jobs == 0) && (pool->shutdown != 0)) {
/*
* No more work to do and we're asked to shut down, so
* exit
@ -518,24 +571,12 @@ static void *pthreadpool_server(void *arg)
int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data)
{
struct pthreadpool_job *job;
pthread_t thread_id;
int res;
sigset_t mask, omask;
job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));
if (job == NULL) {
return ENOMEM;
}
job->fn = fn;
job->private_data = private_data;
job->id = job_id;
job->next = NULL;
res = pthread_mutex_lock(&pool->mutex);
if (res != 0) {
free(job);
return res;
}
@ -546,7 +587,6 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
*/
res = pthread_mutex_unlock(&pool->mutex);
assert(res == 0);
free(job);
return EINVAL;
}
@ -558,13 +598,10 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
/*
* Add job to the end of the queue
*/
if (pool->jobs == NULL) {
pool->jobs = job;
if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
pthread_mutex_unlock(&pool->mutex);
return ENOMEM;
}
else {
pool->last_job->next = job;
}
pool->last_job = job;
if (pool->num_idle > 0) {
/*