mirror of
https://github.com/samba-team/samba.git
synced 2025-03-01 04:58:35 +03:00
pthreadpool: test cancelling and freeing pending pthreadpool_tevent jobs/pools
Pair-Programmed-With: Stefan Metzmacher <metze@samba.org> Signed-off-by: Ralph Boehme <slow@samba.org> Signed-off-by: Stefan Metzmacher <metze@samba.org>
This commit is contained in:
parent
f23cac39b3
commit
40d15260d2
@ -17,12 +17,16 @@
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "config.h"
|
||||
#include <errno.h>
|
||||
#include <pthread.h>
|
||||
#include <setjmp.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <limits.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/socket.h>
|
||||
|
||||
#include <talloc.h>
|
||||
#include <tevent.h>
|
||||
@ -31,6 +35,13 @@
|
||||
#include <cmocka.h>
|
||||
#include <poll.h>
|
||||
|
||||
#ifdef HAVE_VALGRIND_HELGRIND_H
|
||||
#include <valgrind/helgrind.h>
|
||||
#endif
|
||||
#ifndef ANNOTATE_BENIGN_RACE_SIZED
|
||||
#define ANNOTATE_BENIGN_RACE_SIZED(address, size, describtion)
|
||||
#endif
|
||||
|
||||
struct pthreadpool_tevent_test {
|
||||
struct tevent_context *ev;
|
||||
struct pthreadpool_tevent *upool;
|
||||
@ -233,12 +244,435 @@ static void test_create(void **state)
|
||||
assert_false(in_main_thread);
|
||||
}
|
||||
|
||||
struct test_cancel_job {
|
||||
int fdm; /* the main end of socketpair */
|
||||
int fdj; /* the job end of socketpair */
|
||||
bool started;
|
||||
bool canceled;
|
||||
bool orphaned;
|
||||
bool finished;
|
||||
size_t polls;
|
||||
size_t timeouts;
|
||||
int sleep_msec;
|
||||
struct tevent_req *req;
|
||||
bool completed;
|
||||
int ret;
|
||||
};
|
||||
|
||||
static void test_cancel_job_done(struct tevent_req *req);
|
||||
|
||||
static int test_cancel_job_destructor(struct test_cancel_job *job)
|
||||
{
|
||||
ANNOTATE_BENIGN_RACE_SIZED(&job->started,
|
||||
sizeof(job->started),
|
||||
"protected by pthreadpool_tevent code");
|
||||
if (job->started) {
|
||||
ANNOTATE_BENIGN_RACE_SIZED(&job->finished,
|
||||
sizeof(job->finished),
|
||||
"protected by pthreadpool_tevent code");
|
||||
assert_true(job->finished);
|
||||
}
|
||||
|
||||
ANNOTATE_BENIGN_RACE_SIZED(&job->fdj,
|
||||
sizeof(job->fdj),
|
||||
"protected by pthreadpool_tevent code");
|
||||
|
||||
if (job->fdm != -1) {
|
||||
close(job->fdm);
|
||||
job->fdm = -1;
|
||||
}
|
||||
if (job->fdj != -1) {
|
||||
close(job->fdj);
|
||||
job->fdj = -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static struct test_cancel_job *test_cancel_job_create(TALLOC_CTX *mem_ctx)
|
||||
{
|
||||
struct test_cancel_job *job = NULL;
|
||||
|
||||
job = talloc(mem_ctx, struct test_cancel_job);
|
||||
if (job == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
*job = (struct test_cancel_job) {
|
||||
.fdm = -1,
|
||||
.fdj = -1,
|
||||
.sleep_msec = 50,
|
||||
};
|
||||
|
||||
talloc_set_destructor(job, test_cancel_job_destructor);
|
||||
return job;
|
||||
}
|
||||
|
||||
static void test_cancel_job_fn(void *ptr)
|
||||
{
|
||||
struct test_cancel_job *job = (struct test_cancel_job *)ptr;
|
||||
int fdj = -1;
|
||||
char c = 0;
|
||||
int ret;
|
||||
|
||||
assert_non_null(job); /* make sure we abort without a job pointer */
|
||||
|
||||
job->started = true;
|
||||
fdj = job->fdj;
|
||||
job->fdj = -1;
|
||||
|
||||
if (!pthreadpool_tevent_current_job_continue()) {
|
||||
job->canceled = pthreadpool_tevent_current_job_canceled();
|
||||
job->orphaned = pthreadpool_tevent_current_job_orphaned();
|
||||
job->finished = true;
|
||||
close(fdj);
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Notify that we main thread
|
||||
*
|
||||
* write of 1 byte should always work!
|
||||
*/
|
||||
ret = write(fdj, &c, 1);
|
||||
assert_int_equal(ret, 1);
|
||||
|
||||
/*
|
||||
* loop until the job was tried to
|
||||
* be canceled or becomes orphaned.
|
||||
*
|
||||
* If there's some activity on the fd
|
||||
* we directly finish.
|
||||
*/
|
||||
do {
|
||||
struct pollfd pfd = {
|
||||
.fd = fdj,
|
||||
.events = POLLIN,
|
||||
};
|
||||
|
||||
job->polls += 1;
|
||||
|
||||
ret = poll(&pfd, 1, job->sleep_msec);
|
||||
if (ret == 1) {
|
||||
job->finished = true;
|
||||
close(fdj);
|
||||
return;
|
||||
}
|
||||
assert_int_equal(ret, 0);
|
||||
|
||||
job->timeouts += 1;
|
||||
|
||||
} while (pthreadpool_tevent_current_job_continue());
|
||||
|
||||
job->canceled = pthreadpool_tevent_current_job_canceled();
|
||||
job->orphaned = pthreadpool_tevent_current_job_orphaned();
|
||||
job->finished = true;
|
||||
close(fdj);
|
||||
}
|
||||
|
||||
static void test_cancel_job_done(struct tevent_req *req)
|
||||
{
|
||||
struct test_cancel_job *job =
|
||||
tevent_req_callback_data(req,
|
||||
struct test_cancel_job);
|
||||
|
||||
job->ret = pthreadpool_tevent_job_recv(job->req);
|
||||
TALLOC_FREE(job->req);
|
||||
job->completed = true;
|
||||
}
|
||||
|
||||
static void test_cancel_job_wait(struct test_cancel_job *job,
|
||||
struct tevent_context *ev)
|
||||
{
|
||||
/*
|
||||
* We have to keep looping until
|
||||
* test_cancel_job_done was triggered
|
||||
*/
|
||||
while (!job->completed) {
|
||||
int ret;
|
||||
|
||||
ret = tevent_loop_once(ev);
|
||||
assert_int_equal(ret, 0);
|
||||
}
|
||||
}
|
||||
|
||||
struct test_cancel_state {
|
||||
struct test_cancel_job *job1;
|
||||
struct test_cancel_job *job2;
|
||||
struct test_cancel_job *job3;
|
||||
struct test_cancel_job *job4;
|
||||
struct test_cancel_job *job5;
|
||||
struct test_cancel_job *job6;
|
||||
};
|
||||
|
||||
static void test_cancel_job(void **private_data)
|
||||
{
|
||||
struct pthreadpool_tevent_test *t = *private_data;
|
||||
struct tevent_context *ev = t->ev;
|
||||
struct pthreadpool_tevent *pool = t->opool;
|
||||
struct test_cancel_state *state = NULL;
|
||||
int ret;
|
||||
bool ok;
|
||||
int fdpair[2] = { -1, -1 };
|
||||
char c = 0;
|
||||
|
||||
state = talloc_zero(t, struct test_cancel_state);
|
||||
assert_non_null(state);
|
||||
state->job1 = test_cancel_job_create(state);
|
||||
assert_non_null(state->job1);
|
||||
state->job2 = test_cancel_job_create(state);
|
||||
assert_non_null(state->job2);
|
||||
state->job3 = test_cancel_job_create(state);
|
||||
assert_non_null(state->job3);
|
||||
|
||||
ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
|
||||
assert_int_equal(ret, 0);
|
||||
|
||||
state->job1->fdm = fdpair[0];
|
||||
state->job1->fdj = fdpair[1];
|
||||
|
||||
assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
|
||||
|
||||
will_return(__wrap_pthread_create, 0);
|
||||
state->job1->req = pthreadpool_tevent_job_send(
|
||||
state->job1, ev, pool, test_cancel_job_fn, state->job1);
|
||||
assert_non_null(state->job1->req);
|
||||
tevent_req_set_callback(state->job1->req,
|
||||
test_cancel_job_done,
|
||||
state->job1);
|
||||
|
||||
state->job2->req = pthreadpool_tevent_job_send(
|
||||
state->job2, ev, pool, test_cancel_job_fn, NULL);
|
||||
assert_non_null(state->job2->req);
|
||||
tevent_req_set_callback(state->job2->req,
|
||||
test_cancel_job_done,
|
||||
state->job2);
|
||||
|
||||
state->job3->req = pthreadpool_tevent_job_send(
|
||||
state->job3, ev, pool, test_cancel_job_fn, NULL);
|
||||
assert_non_null(state->job3->req);
|
||||
tevent_req_set_callback(state->job3->req,
|
||||
test_cancel_job_done,
|
||||
state->job3);
|
||||
|
||||
/*
|
||||
* Wait for the job 1 to start.
|
||||
*/
|
||||
ret = read(state->job1->fdm, &c, 1);
|
||||
assert_int_equal(ret, 1);
|
||||
|
||||
/*
|
||||
* We cancel job 3 and destroy job2.
|
||||
* Both should never be executed.
|
||||
*/
|
||||
assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 2);
|
||||
TALLOC_FREE(state->job2->req);
|
||||
assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 1);
|
||||
ok = tevent_req_cancel(state->job3->req);
|
||||
assert_true(ok);
|
||||
assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
|
||||
|
||||
/*
|
||||
* Job 3 should complete as canceled, while
|
||||
* job 1 is still running.
|
||||
*/
|
||||
test_cancel_job_wait(state->job3, ev);
|
||||
assert_int_equal(state->job3->ret, ECANCELED);
|
||||
assert_null(state->job3->req);
|
||||
assert_false(state->job3->started);
|
||||
|
||||
/*
|
||||
* Now job1 is canceled while it's running,
|
||||
* this should let it stop it's loop.
|
||||
*/
|
||||
ok = tevent_req_cancel(state->job1->req);
|
||||
assert_false(ok);
|
||||
|
||||
/*
|
||||
* Job 1 completes, It got at least one sleep
|
||||
* timeout loop and has state->job1->canceled set.
|
||||
*/
|
||||
test_cancel_job_wait(state->job1, ev);
|
||||
assert_int_equal(state->job1->ret, 0);
|
||||
assert_null(state->job1->req);
|
||||
assert_true(state->job1->started);
|
||||
assert_true(state->job1->finished);
|
||||
assert_true(state->job1->canceled);
|
||||
assert_false(state->job1->orphaned);
|
||||
assert_in_range(state->job1->polls, 1, 100);
|
||||
assert_int_equal(state->job1->timeouts, state->job1->polls);
|
||||
|
||||
/*
|
||||
* Now we create jobs 4 and 5
|
||||
* Both should execute.
|
||||
* Job 4 is orphaned while running by a TALLOC_FREE()
|
||||
* This should stop job 4 and let job 5 start.
|
||||
* We do a "normal" exit in job 5 by creating some activity
|
||||
* on the socketpair.
|
||||
*/
|
||||
|
||||
state->job4 = test_cancel_job_create(state);
|
||||
assert_non_null(state->job4);
|
||||
|
||||
ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
|
||||
assert_int_equal(ret, 0);
|
||||
|
||||
state->job4->fdm = fdpair[0];
|
||||
state->job4->fdj = fdpair[1];
|
||||
|
||||
state->job4->req = pthreadpool_tevent_job_send(
|
||||
state->job4, ev, pool, test_cancel_job_fn, state->job4);
|
||||
assert_non_null(state->job4->req);
|
||||
tevent_req_set_callback(state->job4->req,
|
||||
test_cancel_job_done,
|
||||
state->job4);
|
||||
|
||||
state->job5 = test_cancel_job_create(state);
|
||||
assert_non_null(state->job5);
|
||||
|
||||
ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
|
||||
assert_int_equal(ret, 0);
|
||||
|
||||
state->job5->fdm = fdpair[0];
|
||||
state->job5->fdj = fdpair[1];
|
||||
|
||||
state->job5->req = pthreadpool_tevent_job_send(
|
||||
state->job5, ev, pool, test_cancel_job_fn, state->job5);
|
||||
assert_non_null(state->job5->req);
|
||||
tevent_req_set_callback(state->job5->req,
|
||||
test_cancel_job_done,
|
||||
state->job5);
|
||||
|
||||
/*
|
||||
* Make sure job 5 can exit as soon as possible.
|
||||
* It will never get a sleep/poll timeout.
|
||||
*/
|
||||
ret = write(state->job5->fdm, &c, 1);
|
||||
assert_int_equal(ret, 1);
|
||||
|
||||
/*
|
||||
* Wait for the job 4 to start
|
||||
*/
|
||||
ret = read(state->job4->fdm, &c, 1);
|
||||
assert_int_equal(ret, 1);
|
||||
|
||||
assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 1);
|
||||
|
||||
/*
|
||||
* destroy the request so that it's marked
|
||||
* as orphaned.
|
||||
*/
|
||||
TALLOC_FREE(state->job4->req);
|
||||
|
||||
/*
|
||||
* Job 5 completes, It got no sleep timeout loop.
|
||||
*/
|
||||
test_cancel_job_wait(state->job5, ev);
|
||||
assert_int_equal(state->job5->ret, 0);
|
||||
assert_null(state->job5->req);
|
||||
assert_true(state->job5->started);
|
||||
assert_true(state->job5->finished);
|
||||
assert_false(state->job5->canceled);
|
||||
assert_false(state->job5->orphaned);
|
||||
assert_int_equal(state->job5->polls, 1);
|
||||
assert_int_equal(state->job5->timeouts, 0);
|
||||
|
||||
assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
|
||||
|
||||
/*
|
||||
* Job 2 is still not executed as we did a TALLOC_FREE()
|
||||
* before is was scheduled.
|
||||
*/
|
||||
assert_false(state->job2->completed);
|
||||
assert_false(state->job2->started);
|
||||
|
||||
/*
|
||||
* Job 4 is still wasn't completed as we did a TALLOC_FREE()
|
||||
* while it is was running. but it was started and has
|
||||
* orphaned set
|
||||
*/
|
||||
assert_false(state->job4->completed);
|
||||
assert_true(state->job4->started);
|
||||
assert_true(state->job4->finished);
|
||||
assert_false(state->job4->canceled);
|
||||
assert_true(state->job4->orphaned);
|
||||
assert_in_range(state->job4->polls, 1, 100);
|
||||
assert_int_equal(state->job4->timeouts, state->job4->polls);
|
||||
|
||||
/*
|
||||
* Now we create jobs 6
|
||||
* We destroy the pool while it's executing.
|
||||
*/
|
||||
|
||||
state->job6 = test_cancel_job_create(state);
|
||||
assert_non_null(state->job6);
|
||||
|
||||
ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
|
||||
assert_int_equal(ret, 0);
|
||||
|
||||
state->job6->fdm = fdpair[0];
|
||||
state->job6->fdj = fdpair[1];
|
||||
|
||||
state->job6->req = pthreadpool_tevent_job_send(
|
||||
state->job6, ev, pool, test_cancel_job_fn, state->job6);
|
||||
assert_non_null(state->job6->req);
|
||||
tevent_req_set_callback(state->job6->req,
|
||||
test_cancel_job_done,
|
||||
state->job6);
|
||||
|
||||
/*
|
||||
* Wait for the job 6 to start
|
||||
*/
|
||||
ret = read(state->job6->fdm, &c, 1);
|
||||
assert_int_equal(ret, 1);
|
||||
|
||||
assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
|
||||
|
||||
/*
|
||||
* destroy the request so that it's marked
|
||||
* as orphaned.
|
||||
*/
|
||||
pool = NULL;
|
||||
TALLOC_FREE(t->opool);
|
||||
|
||||
/*
|
||||
* Wait until the job finished.
|
||||
*/
|
||||
ret = read(state->job6->fdm, &c, 1);
|
||||
assert_int_equal(ret, 0);
|
||||
|
||||
/*
|
||||
* Job 6 is still dangling arround.
|
||||
*
|
||||
* We need to convince valgrind --tool={drd,helgrind}
|
||||
* that the read above is good enough to be
|
||||
* sure the job is finished and closed the other end of
|
||||
* the socketpair.
|
||||
*/
|
||||
ANNOTATE_BENIGN_RACE_SIZED(state->job6,
|
||||
sizeof(*state->job6),
|
||||
"protected by thread fence");
|
||||
assert_non_null(state->job6->req);
|
||||
assert_true(tevent_req_is_in_progress(state->job6->req));
|
||||
assert_false(state->job6->completed);
|
||||
assert_true(state->job6->started);
|
||||
assert_true(state->job6->finished);
|
||||
assert_false(state->job6->canceled);
|
||||
assert_true(state->job6->orphaned);
|
||||
assert_in_range(state->job6->polls, 1, 100);
|
||||
assert_int_equal(state->job6->timeouts, state->job4->polls);
|
||||
|
||||
TALLOC_FREE(state);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
const struct CMUnitTest tests[] = {
|
||||
cmocka_unit_test_setup_teardown(test_create,
|
||||
setup_pthreadpool_tevent,
|
||||
teardown_pthreadpool_tevent),
|
||||
cmocka_unit_test_setup_teardown(test_cancel_job,
|
||||
setup_pthreadpool_tevent,
|
||||
teardown_pthreadpool_tevent),
|
||||
};
|
||||
|
||||
cmocka_set_message_output(CM_OUTPUT_SUBUNIT);
|
||||
|
Loading…
x
Reference in New Issue
Block a user