mirror of
https://github.com/samba-team/samba.git
synced 2024-12-23 17:34:34 +03:00
pthreadpool_pipe: Implement EBUSY for _destroy
Restore EBUSY on pthreadpool_pipe_destroy. We need to count jobs in pthreadpool_pipe so that pthreadpool can exit with active jobs. Unfortunately this makes pthreadpool_pipe_add_job non-threadsafe. We could add mutexes around "num_jobs", but this would mean another set of pthread_atfork functions. As we don't use threaded pthreadpool_pipe_add_job except in the tests, just remove the tests... Signed-off-by: Volker Lendecke <vl@samba.org> Reviewed-by: Jeremy Allison <jra@samba.org>
This commit is contained in:
parent
77b7dea2d9
commit
1a3d081c99
@ -24,6 +24,7 @@
|
||||
|
||||
struct pthreadpool_pipe {
|
||||
struct pthreadpool *pool;
|
||||
int num_jobs;
|
||||
pid_t pid;
|
||||
int pipe_fds[2];
|
||||
};
|
||||
@ -39,7 +40,7 @@ int pthreadpool_pipe_init(unsigned max_threads,
|
||||
struct pthreadpool_pipe *pool;
|
||||
int ret;
|
||||
|
||||
pool = malloc(sizeof(struct pthreadpool_pipe));
|
||||
pool = calloc(1, sizeof(struct pthreadpool_pipe));
|
||||
if (pool == NULL) {
|
||||
return ENOMEM;
|
||||
}
|
||||
@ -88,6 +89,10 @@ int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool)
|
||||
{
|
||||
int ret;
|
||||
|
||||
if (pool->num_jobs != 0) {
|
||||
return EBUSY;
|
||||
}
|
||||
|
||||
ret = pthreadpool_destroy(pool->pool);
|
||||
if (ret != 0) {
|
||||
return ret;
|
||||
@ -132,6 +137,7 @@ static int pthreadpool_pipe_reinit(struct pthreadpool_pipe *pool)
|
||||
}
|
||||
|
||||
pool->pipe_fds[0] = signal_fd;
|
||||
pool->num_jobs = 0;
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -148,7 +154,13 @@ int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id,
|
||||
}
|
||||
|
||||
ret = pthreadpool_add_job(pool->pool, job_id, fn, private_data);
|
||||
return ret;
|
||||
if (ret != 0) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
pool->num_jobs += 1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
|
||||
@ -159,7 +171,7 @@ int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
|
||||
int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
|
||||
unsigned num_jobids)
|
||||
{
|
||||
ssize_t to_read, nread;
|
||||
ssize_t to_read, nread, num_jobs;
|
||||
pid_t pid = getpid();
|
||||
|
||||
if (pool->pid != pid) {
|
||||
@ -178,5 +190,13 @@ int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
|
||||
if ((nread % sizeof(int)) != 0) {
|
||||
return -EINVAL;
|
||||
}
|
||||
return nread / sizeof(int);
|
||||
|
||||
num_jobs = nread / sizeof(int);
|
||||
|
||||
if (num_jobs > pool->num_jobs) {
|
||||
return -EINVAL;
|
||||
}
|
||||
pool->num_jobs -= num_jobs;
|
||||
|
||||
return num_jobs;
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ static int test_init(void)
|
||||
}
|
||||
ret = pthreadpool_pipe_destroy(p);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
|
||||
fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
|
||||
strerror(ret));
|
||||
return -1;
|
||||
}
|
||||
@ -72,6 +72,11 @@ static int test_jobs(int num_threads, int num_jobs)
|
||||
for (i=0; i<num_jobs; i++) {
|
||||
int jobid = -1;
|
||||
ret = pthreadpool_pipe_finished_jobs(p, &jobid, 1);
|
||||
if (ret < 0) {
|
||||
fprintf(stderr, "pthreadpool_pipe_finished_jobs "
|
||||
"failed: %s\n", strerror(-ret));
|
||||
return -1;
|
||||
}
|
||||
if ((ret != 1) || (jobid >= num_jobs)) {
|
||||
fprintf(stderr, "invalid job number %d\n", jobid);
|
||||
return -1;
|
||||
@ -103,7 +108,7 @@ static int test_busydestroy(void)
|
||||
struct pthreadpool_pipe *p;
|
||||
int timeout = 50;
|
||||
struct pollfd pfd;
|
||||
int ret;
|
||||
int ret, jobid;
|
||||
|
||||
ret = pthreadpool_pipe_init(1, &p);
|
||||
if (ret != 0) {
|
||||
@ -128,6 +133,13 @@ static int test_busydestroy(void)
|
||||
|
||||
poll(&pfd, 1, -1);
|
||||
|
||||
ret = pthreadpool_pipe_finished_jobs(p, &jobid, 1);
|
||||
if (ret < 0) {
|
||||
fprintf(stderr, "pthreadpool_pipe_finished_jobs failed: %s\n",
|
||||
strerror(-ret));
|
||||
return -1;
|
||||
}
|
||||
|
||||
ret = pthreadpool_pipe_destroy(p);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
|
||||
@ -137,191 +149,6 @@ static int test_busydestroy(void)
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct threaded_state {
|
||||
pthread_t tid;
|
||||
struct pthreadpool_pipe *p;
|
||||
int start_job;
|
||||
int num_jobs;
|
||||
int timeout;
|
||||
};
|
||||
|
||||
static void *test_threaded_worker(void *p)
|
||||
{
|
||||
struct threaded_state *state = (struct threaded_state *)p;
|
||||
int i;
|
||||
|
||||
for (i=0; i<state->num_jobs; i++) {
|
||||
int ret = pthreadpool_pipe_add_job(
|
||||
state->p, state->start_job + i,
|
||||
test_sleep, &state->timeout);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "pthreadpool_pipe_add_job failed: "
|
||||
"%s\n", strerror(ret));
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
|
||||
int num_jobs)
|
||||
{
|
||||
struct pthreadpool_pipe **pools;
|
||||
struct threaded_state *states;
|
||||
struct threaded_state *state;
|
||||
struct pollfd *pfds;
|
||||
char *finished;
|
||||
pid_t child;
|
||||
int i, ret, poolnum;
|
||||
int received;
|
||||
|
||||
states = calloc(num_threads, sizeof(struct threaded_state));
|
||||
if (states == NULL) {
|
||||
fprintf(stderr, "calloc failed\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
finished = calloc(num_threads * num_jobs, 1);
|
||||
if (finished == NULL) {
|
||||
fprintf(stderr, "calloc failed\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
pools = calloc(num_pools, sizeof(struct pthreadpool_pipe *));
|
||||
if (pools == NULL) {
|
||||
fprintf(stderr, "calloc failed\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
pfds = calloc(num_pools, sizeof(struct pollfd));
|
||||
if (pfds == NULL) {
|
||||
fprintf(stderr, "calloc failed\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (i=0; i<num_pools; i++) {
|
||||
ret = pthreadpool_pipe_init(poolsize, &pools[i]);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
|
||||
strerror(ret));
|
||||
return -1;
|
||||
}
|
||||
pfds[i].fd = pthreadpool_pipe_signal_fd(pools[i]);
|
||||
pfds[i].events = POLLIN|POLLHUP;
|
||||
}
|
||||
|
||||
poolnum = 0;
|
||||
|
||||
for (i=0; i<num_threads; i++) {
|
||||
state = &states[i];
|
||||
|
||||
state->p = pools[poolnum];
|
||||
poolnum = (poolnum + 1) % num_pools;
|
||||
|
||||
state->num_jobs = num_jobs;
|
||||
state->timeout = 1;
|
||||
state->start_job = i * num_jobs;
|
||||
|
||||
ret = pthread_create(&state->tid, NULL, test_threaded_worker,
|
||||
state);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "pthread_create failed: %s\n",
|
||||
strerror(ret));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (random() % 1) {
|
||||
poll(NULL, 0, 1);
|
||||
}
|
||||
|
||||
child = fork();
|
||||
if (child < 0) {
|
||||
fprintf(stderr, "fork failed: %s\n", strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
if (child == 0) {
|
||||
for (i=0; i<num_pools; i++) {
|
||||
ret = pthreadpool_pipe_destroy(pools[i]);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "pthreadpool_pipe_destroy "
|
||||
"failed: %s\n", strerror(ret));
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
/* child */
|
||||
exit(0);
|
||||
}
|
||||
|
||||
for (i=0; i<num_threads; i++) {
|
||||
ret = pthread_join(states[i].tid, NULL);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "pthread_join(%d) failed: %s\n",
|
||||
i, strerror(ret));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
received = 0;
|
||||
|
||||
while (received < num_threads*num_jobs) {
|
||||
int j;
|
||||
|
||||
ret = poll(pfds, num_pools, 1000);
|
||||
if (ret == -1) {
|
||||
fprintf(stderr, "poll failed: %s\n",
|
||||
strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
if (ret == 0) {
|
||||
fprintf(stderr, "\npoll timed out\n");
|
||||
break;
|
||||
}
|
||||
|
||||
for (j=0; j<num_pools; j++) {
|
||||
int jobid = -1;
|
||||
|
||||
if ((pfds[j].revents & (POLLIN|POLLHUP)) == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ret = pthreadpool_pipe_finished_jobs(
|
||||
pools[j], &jobid, 1);
|
||||
if ((ret != 1) || (jobid >= num_jobs * num_threads)) {
|
||||
fprintf(stderr, "invalid job number %d\n",
|
||||
jobid);
|
||||
return -1;
|
||||
}
|
||||
finished[jobid] += 1;
|
||||
received += 1;
|
||||
}
|
||||
}
|
||||
|
||||
for (i=0; i<num_threads*num_jobs; i++) {
|
||||
if (finished[i] != 1) {
|
||||
fprintf(stderr, "finished[%d] = %d\n",
|
||||
i, finished[i]);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
for (i=0; i<num_pools; i++) {
|
||||
ret = pthreadpool_pipe_destroy(pools[i]);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "pthreadpool_pipe_destroy failed: "
|
||||
"%s\n", strerror(ret));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
free(pfds);
|
||||
free(pools);
|
||||
free(states);
|
||||
free(finished);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int test_fork(void)
|
||||
{
|
||||
struct pthreadpool_pipe *p;
|
||||
@ -390,25 +217,6 @@ int main(void)
|
||||
return 1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Test 10 threads adding jobs on a single pool
|
||||
*/
|
||||
ret = test_threaded_addjob(1, 10, 5, 5000);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "test_jobs failed\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Test 10 threads on 3 pools to verify our fork handling
|
||||
* works right.
|
||||
*/
|
||||
ret = test_threaded_addjob(3, 10, 5, 5000);
|
||||
if (ret != 0) {
|
||||
fprintf(stderr, "test_jobs failed\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
printf("success\n");
|
||||
return 0;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user