mirror of
https://github.com/samba-team/samba.git
synced 2024-12-22 13:34:15 +03:00
fb6b6cf3e4
Pair-Programmed-With: Stefan Metzmacher <metze@samba.org> Signed-off-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Ralph Boehme <slow@samba.org>
1399 lines
37 KiB
C
1399 lines
37 KiB
C
/*
|
|
* Unix SMB/CIFS implementation.
|
|
* cmocka tests for thread pool implementation
|
|
* Copyright (C) Christof Schmitt 2017
|
|
*
|
|
* This program is free software; you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation; either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
*/
|
|
|
|
#include "config.h"
|
|
#include <errno.h>
|
|
#include <pthread.h>
|
|
#include <setjmp.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <limits.h>
|
|
#include <unistd.h>
|
|
#include <sys/types.h>
|
|
#include <sys/socket.h>
|
|
|
|
#include <talloc.h>
|
|
#include <tevent.h>
|
|
#include <pthreadpool_tevent.h>
|
|
|
|
#include <cmocka.h>
|
|
#include <poll.h>
|
|
|
|
#ifdef HAVE_VALGRIND_HELGRIND_H
|
|
#include <valgrind/helgrind.h>
|
|
#endif
|
|
#ifndef ANNOTATE_BENIGN_RACE_SIZED
|
|
#define ANNOTATE_BENIGN_RACE_SIZED(address, size, describtion)
|
|
#endif
|
|
|
|
struct pthreadpool_tevent_test {
|
|
struct tevent_context *ev;
|
|
struct pthreadpool_tevent *upool;
|
|
struct pthreadpool_tevent *spool;
|
|
struct pthreadpool_tevent *opool;
|
|
};
|
|
|
|
static int setup_pthreadpool_tevent(void **state)
|
|
{
|
|
struct pthreadpool_tevent_test *t;
|
|
int ret;
|
|
size_t max_threads;
|
|
|
|
t = talloc_zero(NULL, struct pthreadpool_tevent_test);
|
|
assert_non_null(t);
|
|
|
|
t->ev = tevent_context_init(t);
|
|
assert_non_null(t->ev);
|
|
|
|
ret = pthreadpool_tevent_init(t->ev, UINT_MAX, &t->upool);
|
|
assert_int_equal(ret, 0);
|
|
|
|
max_threads = pthreadpool_tevent_max_threads(t->upool);
|
|
assert_int_equal(max_threads, UINT_MAX);
|
|
|
|
ret = pthreadpool_tevent_init(t->ev, 1, &t->opool);
|
|
assert_int_equal(ret, 0);
|
|
|
|
max_threads = pthreadpool_tevent_max_threads(t->opool);
|
|
assert_int_equal(max_threads, 1);
|
|
|
|
ret = pthreadpool_tevent_init(t->ev, 0, &t->spool);
|
|
assert_int_equal(ret, 0);
|
|
|
|
max_threads = pthreadpool_tevent_max_threads(t->spool);
|
|
assert_int_equal(max_threads, 0);
|
|
|
|
*state = t;
|
|
|
|
return 0;
|
|
}
|
|
|
|
static int teardown_pthreadpool_tevent(void **state)
|
|
{
|
|
struct pthreadpool_tevent_test *t = *state;
|
|
|
|
TALLOC_FREE(t);
|
|
|
|
return 0;
|
|
}
|
|
|
|
int __wrap_pthread_create(pthread_t *thread, const pthread_attr_t *attr,
|
|
void *(*start_routine) (void *), void *arg);
|
|
int __real_pthread_create(pthread_t *thread, const pthread_attr_t *attr,
|
|
void *(*start_routine) (void *), void *arg);
|
|
|
|
int __wrap_pthread_create(pthread_t *thread, const pthread_attr_t *attr,
|
|
void *(*start_routine) (void *), void *arg)
|
|
{
|
|
int error;
|
|
|
|
error = mock_type(int);
|
|
if (error != 0) {
|
|
return error;
|
|
}
|
|
|
|
return __real_pthread_create(thread, attr, start_routine, arg);
|
|
}
|
|
|
|
static void test_job_threadid(void *ptr)
|
|
{
|
|
pthread_t *threadid = ptr;
|
|
|
|
*threadid = pthread_self();
|
|
}
|
|
|
|
static int test_create_do(struct tevent_context *ev,
|
|
struct pthreadpool_tevent *pool,
|
|
bool *executed,
|
|
bool *in_main_thread)
|
|
{
|
|
struct tevent_req *req;
|
|
pthread_t zero_thread;
|
|
pthread_t main_thread;
|
|
pthread_t worker_thread;
|
|
bool ok;
|
|
int ret;
|
|
|
|
*executed = false;
|
|
*in_main_thread = false;
|
|
|
|
memset(&zero_thread, 0, sizeof(zero_thread));
|
|
main_thread = pthread_self();
|
|
worker_thread = zero_thread;
|
|
|
|
req = pthreadpool_tevent_job_send(
|
|
ev, ev, pool, test_job_threadid, &worker_thread);
|
|
if (req == NULL) {
|
|
fprintf(stderr, "pthreadpool_tevent_job_send failed\n");
|
|
return ENOMEM;
|
|
}
|
|
|
|
ok = tevent_req_poll(req, ev);
|
|
if (!ok) {
|
|
ret = errno;
|
|
fprintf(stderr, "tevent_req_poll failed: %s\n",
|
|
strerror(ret));
|
|
*executed = !pthread_equal(worker_thread, zero_thread);
|
|
*in_main_thread = pthread_equal(worker_thread, main_thread);
|
|
return ret;
|
|
}
|
|
|
|
|
|
ret = pthreadpool_tevent_job_recv(req);
|
|
TALLOC_FREE(req);
|
|
*executed = !pthread_equal(worker_thread, zero_thread);
|
|
*in_main_thread = pthread_equal(worker_thread, main_thread);
|
|
if (ret != 0) {
|
|
fprintf(stderr, "tevent_req_recv failed: %s\n",
|
|
strerror(ret));
|
|
return ret;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static void test_create(void **state)
|
|
{
|
|
struct pthreadpool_tevent_test *t = *state;
|
|
bool executed;
|
|
bool in_main_thread;
|
|
int ret;
|
|
|
|
/*
|
|
* When pthreadpool cannot create the first worker thread,
|
|
* this job will run in the sync fallback in the main thread.
|
|
*/
|
|
will_return(__wrap_pthread_create, EAGAIN);
|
|
ret = test_create_do(t->ev, t->upool, &executed, &in_main_thread);
|
|
assert_int_equal(ret, EAGAIN);
|
|
assert_false(executed);
|
|
assert_false(in_main_thread);
|
|
|
|
/*
|
|
* The sync pool won't trigger pthread_create()
|
|
* It will be triggered by the one pool.
|
|
*/
|
|
will_return(__wrap_pthread_create, EAGAIN);
|
|
|
|
ret = test_create_do(t->ev, t->spool, &executed, &in_main_thread);
|
|
assert_int_equal(ret, 0);
|
|
assert_true(executed);
|
|
assert_true(in_main_thread);
|
|
|
|
ret = test_create_do(t->ev, t->opool, &executed, &in_main_thread);
|
|
assert_int_equal(ret, EAGAIN);
|
|
assert_false(executed);
|
|
assert_false(in_main_thread);
|
|
|
|
/*
|
|
* When a thread can be created, the job will run in the worker thread.
|
|
*/
|
|
will_return(__wrap_pthread_create, 0);
|
|
ret = test_create_do(t->ev, t->upool, &executed, &in_main_thread);
|
|
assert_int_equal(ret, 0);
|
|
assert_true(executed);
|
|
assert_false(in_main_thread);
|
|
|
|
poll(NULL, 0, 10);
|
|
|
|
/*
|
|
* Workerthread will still be active for a second; immediately
|
|
* running another job will also use the worker thread, even
|
|
* if a new thread cannot be created.
|
|
*/
|
|
ret = test_create_do(t->ev, t->upool, &executed, &in_main_thread);
|
|
assert_int_equal(ret, 0);
|
|
assert_true(executed);
|
|
assert_false(in_main_thread);
|
|
|
|
/*
|
|
* When a thread can be created, the job will run in the worker thread.
|
|
*/
|
|
will_return(__wrap_pthread_create, 0);
|
|
ret = test_create_do(t->ev, t->opool, &executed, &in_main_thread);
|
|
assert_int_equal(ret, 0);
|
|
assert_true(executed);
|
|
assert_false(in_main_thread);
|
|
|
|
poll(NULL, 0, 10);
|
|
|
|
/*
|
|
* Workerthread will still be active for a second; immediately
|
|
* running another job will also use the worker thread, even
|
|
* if a new thread cannot be created.
|
|
*/
|
|
ret = test_create_do(t->ev, t->opool, &executed, &in_main_thread);
|
|
assert_int_equal(ret, 0);
|
|
assert_true(executed);
|
|
assert_false(in_main_thread);
|
|
}
|
|
|
|
static void test_per_thread_cwd_job(void *ptr)
|
|
{
|
|
const bool *per_thread_cwd_ptr = ptr;
|
|
bool per_thread_cwd;
|
|
char cwdbuf[PATH_MAX] = {0,};
|
|
char *cwdstr = NULL;
|
|
int ret;
|
|
|
|
/*
|
|
* This needs to be consistent.
|
|
*/
|
|
per_thread_cwd = pthreadpool_tevent_current_job_per_thread_cwd();
|
|
assert_int_equal(per_thread_cwd, *per_thread_cwd_ptr);
|
|
|
|
if (!per_thread_cwd) {
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* Check we're not already in "/".
|
|
*/
|
|
cwdstr = getcwd(cwdbuf, sizeof(cwdbuf));
|
|
assert_non_null(cwdstr);
|
|
assert_string_not_equal(cwdstr, "/");
|
|
|
|
ret = chdir("/");
|
|
assert_int_equal(ret, 0);
|
|
|
|
/*
|
|
* Check we're in "/" now.
|
|
*/
|
|
cwdstr = getcwd(cwdbuf, sizeof(cwdbuf));
|
|
assert_non_null(cwdstr);
|
|
assert_string_equal(cwdstr, "/");
|
|
}
|
|
|
|
static int test_per_thread_cwd_do(struct tevent_context *ev,
|
|
struct pthreadpool_tevent *pool)
|
|
{
|
|
struct tevent_req *req;
|
|
bool per_thread_cwd;
|
|
bool ok;
|
|
int ret;
|
|
per_thread_cwd = pthreadpool_tevent_per_thread_cwd(pool);
|
|
|
|
req = pthreadpool_tevent_job_send(
|
|
ev, ev, pool, test_per_thread_cwd_job, &per_thread_cwd);
|
|
if (req == NULL) {
|
|
fprintf(stderr, "pthreadpool_tevent_job_send failed\n");
|
|
return ENOMEM;
|
|
}
|
|
|
|
ok = tevent_req_poll(req, ev);
|
|
if (!ok) {
|
|
ret = errno;
|
|
fprintf(stderr, "tevent_req_poll failed: %s\n",
|
|
strerror(ret));
|
|
return ret;
|
|
}
|
|
|
|
ret = pthreadpool_tevent_job_recv(req);
|
|
TALLOC_FREE(req);
|
|
if (ret != 0) {
|
|
fprintf(stderr, "tevent_req_recv failed: %s\n",
|
|
strerror(ret));
|
|
return ret;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static void test_per_thread_cwd(void **state)
|
|
{
|
|
struct pthreadpool_tevent_test *t = *state;
|
|
int ret;
|
|
bool per_thread_cwd_u;
|
|
bool per_thread_cwd_o;
|
|
bool per_thread_cwd_s;
|
|
char cwdbuf1[PATH_MAX] = {0,};
|
|
char *cwdstr1 = NULL;
|
|
char cwdbuf2[PATH_MAX] = {0,};
|
|
char *cwdstr2 = NULL;
|
|
|
|
/*
|
|
* The unlimited and one pools
|
|
* should be consistent.
|
|
*
|
|
* We can't enforce this as some constraint
|
|
* container environments disable unshare()
|
|
* completely, even just with CLONE_FS.
|
|
*/
|
|
per_thread_cwd_u = pthreadpool_tevent_per_thread_cwd(t->upool);
|
|
per_thread_cwd_o = pthreadpool_tevent_per_thread_cwd(t->opool);
|
|
assert_int_equal(per_thread_cwd_u, per_thread_cwd_o);
|
|
|
|
/*
|
|
* The sync pool should never support this.
|
|
*/
|
|
per_thread_cwd_s = pthreadpool_tevent_per_thread_cwd(t->spool);
|
|
assert_false(per_thread_cwd_s);
|
|
|
|
/*
|
|
* Check we're not already in "/".
|
|
*/
|
|
cwdstr1 = getcwd(cwdbuf1, sizeof(cwdbuf1));
|
|
assert_non_null(cwdstr1);
|
|
assert_string_not_equal(cwdstr1, "/");
|
|
|
|
will_return(__wrap_pthread_create, 0);
|
|
ret = test_per_thread_cwd_do(t->ev, t->upool);
|
|
assert_int_equal(ret, 0);
|
|
|
|
/*
|
|
* Check we're still in the same directory.
|
|
*/
|
|
cwdstr2 = getcwd(cwdbuf2, sizeof(cwdbuf2));
|
|
assert_non_null(cwdstr2);
|
|
assert_string_equal(cwdstr2, cwdstr1);
|
|
|
|
will_return(__wrap_pthread_create, 0);
|
|
ret = test_per_thread_cwd_do(t->ev, t->opool);
|
|
assert_int_equal(ret, 0);
|
|
|
|
/*
|
|
* Check we're still in the same directory.
|
|
*/
|
|
cwdstr2 = getcwd(cwdbuf2, sizeof(cwdbuf2));
|
|
assert_non_null(cwdstr2);
|
|
assert_string_equal(cwdstr2, cwdstr1);
|
|
|
|
ret = test_per_thread_cwd_do(t->ev, t->spool);
|
|
assert_int_equal(ret, 0);
|
|
|
|
/*
|
|
* Check we're still in the same directory.
|
|
*/
|
|
cwdstr2 = getcwd(cwdbuf2, sizeof(cwdbuf2));
|
|
assert_non_null(cwdstr2);
|
|
assert_string_equal(cwdstr2, cwdstr1);
|
|
}
|
|
|
|
struct test_cancel_job {
|
|
int fdm; /* the main end of socketpair */
|
|
int fdj; /* the job end of socketpair */
|
|
bool started;
|
|
bool canceled;
|
|
bool orphaned;
|
|
bool finished;
|
|
size_t polls;
|
|
size_t timeouts;
|
|
int sleep_msec;
|
|
struct tevent_req *req;
|
|
bool completed;
|
|
int ret;
|
|
};
|
|
|
|
static void test_cancel_job_done(struct tevent_req *req);
|
|
|
|
static int test_cancel_job_destructor(struct test_cancel_job *job)
|
|
{
|
|
ANNOTATE_BENIGN_RACE_SIZED(&job->started,
|
|
sizeof(job->started),
|
|
"protected by pthreadpool_tevent code");
|
|
if (job->started) {
|
|
ANNOTATE_BENIGN_RACE_SIZED(&job->finished,
|
|
sizeof(job->finished),
|
|
"protected by pthreadpool_tevent code");
|
|
assert_true(job->finished);
|
|
}
|
|
|
|
ANNOTATE_BENIGN_RACE_SIZED(&job->fdj,
|
|
sizeof(job->fdj),
|
|
"protected by pthreadpool_tevent code");
|
|
|
|
if (job->fdm != -1) {
|
|
close(job->fdm);
|
|
job->fdm = -1;
|
|
}
|
|
if (job->fdj != -1) {
|
|
close(job->fdj);
|
|
job->fdj = -1;
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
static struct test_cancel_job *test_cancel_job_create(TALLOC_CTX *mem_ctx)
|
|
{
|
|
struct test_cancel_job *job = NULL;
|
|
|
|
job = talloc(mem_ctx, struct test_cancel_job);
|
|
if (job == NULL) {
|
|
return NULL;
|
|
}
|
|
*job = (struct test_cancel_job) {
|
|
.fdm = -1,
|
|
.fdj = -1,
|
|
.sleep_msec = 50,
|
|
};
|
|
|
|
talloc_set_destructor(job, test_cancel_job_destructor);
|
|
return job;
|
|
}
|
|
|
|
static void test_cancel_job_fn(void *ptr)
|
|
{
|
|
struct test_cancel_job *job = (struct test_cancel_job *)ptr;
|
|
int fdj = -1;
|
|
char c = 0;
|
|
int ret;
|
|
|
|
assert_non_null(job); /* make sure we abort without a job pointer */
|
|
|
|
job->started = true;
|
|
fdj = job->fdj;
|
|
job->fdj = -1;
|
|
|
|
if (!pthreadpool_tevent_current_job_continue()) {
|
|
job->canceled = pthreadpool_tevent_current_job_canceled();
|
|
job->orphaned = pthreadpool_tevent_current_job_orphaned();
|
|
job->finished = true;
|
|
close(fdj);
|
|
return;
|
|
}
|
|
|
|
/*
|
|
* Notify that we main thread
|
|
*
|
|
* write of 1 byte should always work!
|
|
*/
|
|
ret = write(fdj, &c, 1);
|
|
assert_int_equal(ret, 1);
|
|
|
|
/*
|
|
* loop until the job was tried to
|
|
* be canceled or becomes orphaned.
|
|
*
|
|
* If there's some activity on the fd
|
|
* we directly finish.
|
|
*/
|
|
do {
|
|
struct pollfd pfd = {
|
|
.fd = fdj,
|
|
.events = POLLIN,
|
|
};
|
|
|
|
job->polls += 1;
|
|
|
|
ret = poll(&pfd, 1, job->sleep_msec);
|
|
if (ret == 1) {
|
|
job->finished = true;
|
|
close(fdj);
|
|
return;
|
|
}
|
|
assert_int_equal(ret, 0);
|
|
|
|
job->timeouts += 1;
|
|
|
|
} while (pthreadpool_tevent_current_job_continue());
|
|
|
|
job->canceled = pthreadpool_tevent_current_job_canceled();
|
|
job->orphaned = pthreadpool_tevent_current_job_orphaned();
|
|
job->finished = true;
|
|
close(fdj);
|
|
}
|
|
|
|
static void test_cancel_job_done(struct tevent_req *req)
|
|
{
|
|
struct test_cancel_job *job =
|
|
tevent_req_callback_data(req,
|
|
struct test_cancel_job);
|
|
|
|
job->ret = pthreadpool_tevent_job_recv(job->req);
|
|
TALLOC_FREE(job->req);
|
|
job->completed = true;
|
|
}
|
|
|
|
static void test_cancel_job_wait(struct test_cancel_job *job,
|
|
struct tevent_context *ev)
|
|
{
|
|
/*
|
|
* We have to keep looping until
|
|
* test_cancel_job_done was triggered
|
|
*/
|
|
while (!job->completed) {
|
|
int ret;
|
|
|
|
ret = tevent_loop_once(ev);
|
|
assert_int_equal(ret, 0);
|
|
}
|
|
}
|
|
|
|
struct test_cancel_state {
|
|
struct test_cancel_job *job1;
|
|
struct test_cancel_job *job2;
|
|
struct test_cancel_job *job3;
|
|
struct test_cancel_job *job4;
|
|
struct test_cancel_job *job5;
|
|
struct test_cancel_job *job6;
|
|
struct test_cancel_job *job7;
|
|
struct test_cancel_job *job8;
|
|
struct test_cancel_job *job9;
|
|
};
|
|
|
|
static void test_cancel_job(void **private_data)
|
|
{
|
|
struct pthreadpool_tevent_test *t = *private_data;
|
|
struct tevent_context *ev = t->ev;
|
|
struct pthreadpool_tevent *pool = t->opool;
|
|
struct test_cancel_state *state = NULL;
|
|
int ret;
|
|
bool ok;
|
|
int fdpair[2] = { -1, -1 };
|
|
char c = 0;
|
|
|
|
state = talloc_zero(t, struct test_cancel_state);
|
|
assert_non_null(state);
|
|
state->job1 = test_cancel_job_create(state);
|
|
assert_non_null(state->job1);
|
|
state->job2 = test_cancel_job_create(state);
|
|
assert_non_null(state->job2);
|
|
state->job3 = test_cancel_job_create(state);
|
|
assert_non_null(state->job3);
|
|
|
|
ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
|
|
assert_int_equal(ret, 0);
|
|
|
|
state->job1->fdm = fdpair[0];
|
|
state->job1->fdj = fdpair[1];
|
|
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
|
|
|
|
will_return(__wrap_pthread_create, 0);
|
|
state->job1->req = pthreadpool_tevent_job_send(
|
|
state->job1, ev, pool, test_cancel_job_fn, state->job1);
|
|
assert_non_null(state->job1->req);
|
|
tevent_req_set_callback(state->job1->req,
|
|
test_cancel_job_done,
|
|
state->job1);
|
|
|
|
state->job2->req = pthreadpool_tevent_job_send(
|
|
state->job2, ev, pool, test_cancel_job_fn, NULL);
|
|
assert_non_null(state->job2->req);
|
|
tevent_req_set_callback(state->job2->req,
|
|
test_cancel_job_done,
|
|
state->job2);
|
|
|
|
state->job3->req = pthreadpool_tevent_job_send(
|
|
state->job3, ev, pool, test_cancel_job_fn, NULL);
|
|
assert_non_null(state->job3->req);
|
|
tevent_req_set_callback(state->job3->req,
|
|
test_cancel_job_done,
|
|
state->job3);
|
|
|
|
/*
|
|
* Wait for the job 1 to start.
|
|
*/
|
|
ret = read(state->job1->fdm, &c, 1);
|
|
assert_int_equal(ret, 1);
|
|
|
|
/*
|
|
* We cancel job 3 and destroy job2.
|
|
* Both should never be executed.
|
|
*/
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 2);
|
|
TALLOC_FREE(state->job2->req);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 1);
|
|
ok = tevent_req_cancel(state->job3->req);
|
|
assert_true(ok);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
|
|
|
|
/*
|
|
* Job 3 should complete as canceled, while
|
|
* job 1 is still running.
|
|
*/
|
|
test_cancel_job_wait(state->job3, ev);
|
|
assert_int_equal(state->job3->ret, ECANCELED);
|
|
assert_null(state->job3->req);
|
|
assert_false(state->job3->started);
|
|
|
|
/*
|
|
* Now job1 is canceled while it's running,
|
|
* this should let it stop it's loop.
|
|
*/
|
|
ok = tevent_req_cancel(state->job1->req);
|
|
assert_false(ok);
|
|
|
|
/*
|
|
* Job 1 completes, It got at least one sleep
|
|
* timeout loop and has state->job1->canceled set.
|
|
*/
|
|
test_cancel_job_wait(state->job1, ev);
|
|
assert_int_equal(state->job1->ret, 0);
|
|
assert_null(state->job1->req);
|
|
assert_true(state->job1->started);
|
|
assert_true(state->job1->finished);
|
|
assert_true(state->job1->canceled);
|
|
assert_false(state->job1->orphaned);
|
|
assert_in_range(state->job1->polls, 1, 100);
|
|
assert_int_equal(state->job1->timeouts, state->job1->polls);
|
|
|
|
/*
|
|
* Now we create jobs 4 and 5
|
|
* Both should execute.
|
|
* Job 4 is orphaned while running by a TALLOC_FREE()
|
|
* This should stop job 4 and let job 5 start.
|
|
* We do a "normal" exit in job 5 by creating some activity
|
|
* on the socketpair.
|
|
*/
|
|
|
|
state->job4 = test_cancel_job_create(state);
|
|
assert_non_null(state->job4);
|
|
|
|
ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
|
|
assert_int_equal(ret, 0);
|
|
|
|
state->job4->fdm = fdpair[0];
|
|
state->job4->fdj = fdpair[1];
|
|
|
|
state->job4->req = pthreadpool_tevent_job_send(
|
|
state->job4, ev, pool, test_cancel_job_fn, state->job4);
|
|
assert_non_null(state->job4->req);
|
|
tevent_req_set_callback(state->job4->req,
|
|
test_cancel_job_done,
|
|
state->job4);
|
|
|
|
state->job5 = test_cancel_job_create(state);
|
|
assert_non_null(state->job5);
|
|
|
|
ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
|
|
assert_int_equal(ret, 0);
|
|
|
|
state->job5->fdm = fdpair[0];
|
|
state->job5->fdj = fdpair[1];
|
|
|
|
state->job5->req = pthreadpool_tevent_job_send(
|
|
state->job5, ev, pool, test_cancel_job_fn, state->job5);
|
|
assert_non_null(state->job5->req);
|
|
tevent_req_set_callback(state->job5->req,
|
|
test_cancel_job_done,
|
|
state->job5);
|
|
|
|
/*
|
|
* Make sure job 5 can exit as soon as possible.
|
|
* It will never get a sleep/poll timeout.
|
|
*/
|
|
ret = write(state->job5->fdm, &c, 1);
|
|
assert_int_equal(ret, 1);
|
|
|
|
/*
|
|
* Wait for the job 4 to start
|
|
*/
|
|
ret = read(state->job4->fdm, &c, 1);
|
|
assert_int_equal(ret, 1);
|
|
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 1);
|
|
|
|
/*
|
|
* destroy the request so that it's marked
|
|
* as orphaned.
|
|
*/
|
|
TALLOC_FREE(state->job4->req);
|
|
|
|
/*
|
|
* Job 5 completes, It got no sleep timeout loop.
|
|
*/
|
|
test_cancel_job_wait(state->job5, ev);
|
|
assert_int_equal(state->job5->ret, 0);
|
|
assert_null(state->job5->req);
|
|
assert_true(state->job5->started);
|
|
assert_true(state->job5->finished);
|
|
assert_false(state->job5->canceled);
|
|
assert_false(state->job5->orphaned);
|
|
assert_int_equal(state->job5->polls, 1);
|
|
assert_int_equal(state->job5->timeouts, 0);
|
|
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
|
|
|
|
/*
|
|
* Job 2 is still not executed as we did a TALLOC_FREE()
|
|
* before is was scheduled.
|
|
*/
|
|
assert_false(state->job2->completed);
|
|
assert_false(state->job2->started);
|
|
|
|
/*
|
|
* Job 4 is still wasn't completed as we did a TALLOC_FREE()
|
|
* while it is was running. but it was started and has
|
|
* orphaned set
|
|
*/
|
|
assert_false(state->job4->completed);
|
|
assert_true(state->job4->started);
|
|
assert_true(state->job4->finished);
|
|
assert_false(state->job4->canceled);
|
|
assert_true(state->job4->orphaned);
|
|
assert_in_range(state->job4->polls, 1, 100);
|
|
assert_int_equal(state->job4->timeouts, state->job4->polls);
|
|
|
|
/*
|
|
* Now we create jobs 6
|
|
* We destroy the pool while it's executing.
|
|
*/
|
|
|
|
state->job6 = test_cancel_job_create(state);
|
|
assert_non_null(state->job6);
|
|
|
|
ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
|
|
assert_int_equal(ret, 0);
|
|
|
|
state->job6->fdm = fdpair[0];
|
|
state->job6->fdj = fdpair[1];
|
|
|
|
state->job6->req = pthreadpool_tevent_job_send(
|
|
state->job6, ev, pool, test_cancel_job_fn, state->job6);
|
|
assert_non_null(state->job6->req);
|
|
tevent_req_set_callback(state->job6->req,
|
|
test_cancel_job_done,
|
|
state->job6);
|
|
|
|
/*
|
|
* Wait for the job 6 to start
|
|
*/
|
|
ret = read(state->job6->fdm, &c, 1);
|
|
assert_int_equal(ret, 1);
|
|
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
|
|
|
|
/*
|
|
* destroy the request so that it's marked
|
|
* as orphaned.
|
|
*/
|
|
pool = NULL;
|
|
TALLOC_FREE(t->opool);
|
|
|
|
/*
|
|
* Wait until the job finished.
|
|
*/
|
|
ret = read(state->job6->fdm, &c, 1);
|
|
assert_int_equal(ret, 0);
|
|
|
|
/*
|
|
* Job 6 is still dangling arround.
|
|
*
|
|
* We need to convince valgrind --tool={drd,helgrind}
|
|
* that the read above is good enough to be
|
|
* sure the job is finished and closed the other end of
|
|
* the socketpair.
|
|
*/
|
|
ANNOTATE_BENIGN_RACE_SIZED(state->job6,
|
|
sizeof(*state->job6),
|
|
"protected by thread fence");
|
|
assert_non_null(state->job6->req);
|
|
assert_true(tevent_req_is_in_progress(state->job6->req));
|
|
assert_false(state->job6->completed);
|
|
assert_true(state->job6->started);
|
|
assert_true(state->job6->finished);
|
|
assert_false(state->job6->canceled);
|
|
assert_true(state->job6->orphaned);
|
|
assert_in_range(state->job6->polls, 1, 100);
|
|
assert_int_equal(state->job6->timeouts, state->job4->polls);
|
|
|
|
TALLOC_FREE(state);
|
|
}
|
|
|
|
struct test_pthreadpool_tevent_wrap_tp_stats {
|
|
unsigned num_before;
|
|
unsigned num_after;
|
|
bool destroyed;
|
|
};
|
|
|
|
struct test_pthreadpool_tevent_wrap_tp_state {
|
|
struct test_pthreadpool_tevent_wrap_tp_state **selfptr;
|
|
struct test_pthreadpool_tevent_wrap_tp_stats *stats;
|
|
};
|
|
|
|
static int test_pthreadpool_tevent_wrap_tp_state_destructor(
|
|
struct test_pthreadpool_tevent_wrap_tp_state *state)
|
|
{
|
|
state->stats->destroyed = true;
|
|
*state->selfptr = NULL;
|
|
|
|
return 0;
|
|
}
|
|
|
|
static bool test_pthreadpool_tevent_tp_before(struct pthreadpool_tevent *wrap,
|
|
void *private_data,
|
|
struct pthreadpool_tevent *main,
|
|
const char *location)
|
|
{
|
|
struct test_pthreadpool_tevent_wrap_tp_state *state =
|
|
talloc_get_type_abort(private_data,
|
|
struct test_pthreadpool_tevent_wrap_tp_state);
|
|
|
|
state->stats->num_before++;
|
|
|
|
return true;
|
|
}
|
|
|
|
static bool test_pthreadpool_tevent_tp_after(struct pthreadpool_tevent *wrap,
|
|
void *private_data,
|
|
struct pthreadpool_tevent *main,
|
|
const char *location)
|
|
{
|
|
struct test_pthreadpool_tevent_wrap_tp_state *state =
|
|
talloc_get_type_abort(private_data,
|
|
struct test_pthreadpool_tevent_wrap_tp_state);
|
|
|
|
state->stats->num_after++;
|
|
|
|
return true;
|
|
}
|
|
|
|
static const struct pthreadpool_tevent_wrapper_ops test_tp_ops = {
|
|
.name = "test_pthreadpool_tevent_tp",
|
|
.before_job = test_pthreadpool_tevent_tp_before,
|
|
.after_job = test_pthreadpool_tevent_tp_after,
|
|
};
|
|
|
|
static void test_wrap_cancel_job(void **private_data)
|
|
{
|
|
struct pthreadpool_tevent_test *t = *private_data;
|
|
struct tevent_context *ev = t->ev;
|
|
struct pthreadpool_tevent *poolw1 = NULL;
|
|
struct test_pthreadpool_tevent_wrap_tp_state *poolw1_state = NULL;
|
|
struct test_pthreadpool_tevent_wrap_tp_stats poolw1_stats = {
|
|
.destroyed = false,
|
|
};
|
|
struct pthreadpool_tevent *poolw2 = NULL;
|
|
struct test_pthreadpool_tevent_wrap_tp_state *poolw2_state = NULL;
|
|
struct test_pthreadpool_tevent_wrap_tp_stats poolw2_stats = {
|
|
.destroyed = false,
|
|
};
|
|
size_t max_threads_o;
|
|
size_t max_threads_w1;
|
|
size_t max_threads_w2;
|
|
bool per_thread_cwd_o;
|
|
bool per_thread_cwd_w1;
|
|
bool per_thread_cwd_w2;
|
|
struct test_cancel_state *state = NULL;
|
|
int ret;
|
|
bool ok;
|
|
int fdpair[2] = { -1, -1 };
|
|
char c = 0;
|
|
|
|
max_threads_o = pthreadpool_tevent_max_threads(t->opool);
|
|
per_thread_cwd_o = pthreadpool_tevent_per_thread_cwd(t->opool);
|
|
|
|
poolw1 = pthreadpool_tevent_wrapper_create(
|
|
t->opool, t, &test_tp_ops, &poolw1_state,
|
|
struct test_pthreadpool_tevent_wrap_tp_state);
|
|
assert_non_null(poolw1);
|
|
poolw1_state->selfptr = &poolw1_state;
|
|
ANNOTATE_BENIGN_RACE_SIZED(&poolw1_stats,
|
|
sizeof(poolw1_stats),
|
|
"protected by pthreadpool_tevent code");
|
|
poolw1_state->stats = &poolw1_stats;
|
|
talloc_set_destructor(poolw1_state,
|
|
test_pthreadpool_tevent_wrap_tp_state_destructor);
|
|
|
|
poolw2 = pthreadpool_tevent_wrapper_create(
|
|
t->opool, t, &test_tp_ops, &poolw2_state,
|
|
struct test_pthreadpool_tevent_wrap_tp_state);
|
|
assert_non_null(poolw2);
|
|
poolw2_state->selfptr = &poolw2_state;
|
|
ANNOTATE_BENIGN_RACE_SIZED(&poolw2_stats,
|
|
sizeof(poolw2_stats),
|
|
"protected by pthreadpool_tevent code");
|
|
poolw2_state->stats = &poolw2_stats;
|
|
talloc_set_destructor(poolw2_state,
|
|
test_pthreadpool_tevent_wrap_tp_state_destructor);
|
|
|
|
assert_false(poolw1_stats.destroyed);
|
|
assert_int_equal(poolw1_stats.num_before, 0);
|
|
assert_int_equal(poolw1_stats.num_after, 0);
|
|
max_threads_w1 = pthreadpool_tevent_max_threads(poolw1);
|
|
assert_int_equal(max_threads_w1, max_threads_o);
|
|
per_thread_cwd_w1 = pthreadpool_tevent_per_thread_cwd(poolw1);
|
|
assert_int_equal(per_thread_cwd_w1, per_thread_cwd_o);
|
|
|
|
assert_false(poolw2_stats.destroyed);
|
|
assert_int_equal(poolw2_stats.num_before, 0);
|
|
assert_int_equal(poolw2_stats.num_after, 0);
|
|
max_threads_w2 = pthreadpool_tevent_max_threads(poolw2);
|
|
assert_int_equal(max_threads_w2, max_threads_o);
|
|
per_thread_cwd_w2 = pthreadpool_tevent_per_thread_cwd(poolw2);
|
|
assert_int_equal(per_thread_cwd_w2, per_thread_cwd_o);
|
|
|
|
state = talloc_zero(t, struct test_cancel_state);
|
|
assert_non_null(state);
|
|
state->job1 = test_cancel_job_create(state);
|
|
assert_non_null(state->job1);
|
|
state->job2 = test_cancel_job_create(state);
|
|
assert_non_null(state->job2);
|
|
state->job3 = test_cancel_job_create(state);
|
|
assert_non_null(state->job3);
|
|
|
|
ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
|
|
assert_int_equal(ret, 0);
|
|
|
|
state->job1->fdm = fdpair[0];
|
|
state->job1->fdj = fdpair[1];
|
|
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
|
|
|
|
will_return(__wrap_pthread_create, 0);
|
|
state->job1->req = pthreadpool_tevent_job_send(
|
|
state->job1, ev, poolw1, test_cancel_job_fn, state->job1);
|
|
assert_non_null(state->job1->req);
|
|
tevent_req_set_callback(state->job1->req,
|
|
test_cancel_job_done,
|
|
state->job1);
|
|
|
|
state->job2->req = pthreadpool_tevent_job_send(
|
|
state->job2, ev, poolw1, test_cancel_job_fn, NULL);
|
|
assert_non_null(state->job2->req);
|
|
tevent_req_set_callback(state->job2->req,
|
|
test_cancel_job_done,
|
|
state->job2);
|
|
|
|
state->job3->req = pthreadpool_tevent_job_send(
|
|
state->job3, ev, poolw1, test_cancel_job_fn, NULL);
|
|
assert_non_null(state->job3->req);
|
|
tevent_req_set_callback(state->job3->req,
|
|
test_cancel_job_done,
|
|
state->job3);
|
|
|
|
/*
|
|
* Wait for the job 1 to start.
|
|
*/
|
|
ret = read(state->job1->fdm, &c, 1);
|
|
assert_int_equal(ret, 1);
|
|
|
|
/*
|
|
* We cancel job 3 and destroy job2.
|
|
* Both should never be executed.
|
|
*/
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 2);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 2);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 2);
|
|
TALLOC_FREE(state->job2->req);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 1);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 1);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 1);
|
|
ok = tevent_req_cancel(state->job3->req);
|
|
assert_true(ok);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
|
|
|
|
/*
|
|
* Job 3 should complete as canceled, while
|
|
* job 1 is still running.
|
|
*/
|
|
test_cancel_job_wait(state->job3, ev);
|
|
assert_int_equal(state->job3->ret, ECANCELED);
|
|
assert_null(state->job3->req);
|
|
assert_false(state->job3->started);
|
|
|
|
/*
|
|
* Now job1 is canceled while it's running,
|
|
* this should let it stop it's loop.
|
|
*/
|
|
ok = tevent_req_cancel(state->job1->req);
|
|
assert_false(ok);
|
|
|
|
/*
|
|
* Job 1 completes, It got at least one sleep
|
|
* timeout loop and has state->job1->canceled set.
|
|
*/
|
|
test_cancel_job_wait(state->job1, ev);
|
|
assert_int_equal(state->job1->ret, 0);
|
|
assert_null(state->job1->req);
|
|
assert_true(state->job1->started);
|
|
assert_true(state->job1->finished);
|
|
assert_true(state->job1->canceled);
|
|
assert_false(state->job1->orphaned);
|
|
assert_in_range(state->job1->polls, 1, 100);
|
|
assert_int_equal(state->job1->timeouts, state->job1->polls);
|
|
|
|
assert_false(poolw1_stats.destroyed);
|
|
assert_int_equal(poolw1_stats.num_before, 1);
|
|
assert_int_equal(poolw1_stats.num_after, 1);
|
|
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
|
|
|
|
/*
|
|
* Now we create jobs 4 and 5
|
|
* Both should execute.
|
|
* Job 4 is orphaned while running by a TALLOC_FREE()
|
|
* This should stop job 4 and let job 5 start.
|
|
* We do a "normal" exit in job 5 by creating some activity
|
|
* on the socketpair.
|
|
*/
|
|
|
|
state->job4 = test_cancel_job_create(state);
|
|
assert_non_null(state->job4);
|
|
|
|
ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
|
|
assert_int_equal(ret, 0);
|
|
|
|
state->job4->fdm = fdpair[0];
|
|
state->job4->fdj = fdpair[1];
|
|
|
|
state->job4->req = pthreadpool_tevent_job_send(
|
|
state->job4, ev, poolw1, test_cancel_job_fn, state->job4);
|
|
assert_non_null(state->job4->req);
|
|
tevent_req_set_callback(state->job4->req,
|
|
test_cancel_job_done,
|
|
state->job4);
|
|
|
|
state->job5 = test_cancel_job_create(state);
|
|
assert_non_null(state->job5);
|
|
|
|
ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
|
|
assert_int_equal(ret, 0);
|
|
|
|
state->job5->fdm = fdpair[0];
|
|
state->job5->fdj = fdpair[1];
|
|
|
|
state->job5->req = pthreadpool_tevent_job_send(
|
|
state->job5, ev, poolw1, test_cancel_job_fn, state->job5);
|
|
assert_non_null(state->job5->req);
|
|
tevent_req_set_callback(state->job5->req,
|
|
test_cancel_job_done,
|
|
state->job5);
|
|
|
|
/*
|
|
* Make sure job 5 can exit as soon as possible.
|
|
* It will never get a sleep/poll timeout.
|
|
*/
|
|
ret = write(state->job5->fdm, &c, 1);
|
|
assert_int_equal(ret, 1);
|
|
|
|
/*
|
|
* Wait for the job 4 to start
|
|
*/
|
|
ret = read(state->job4->fdm, &c, 1);
|
|
assert_int_equal(ret, 1);
|
|
|
|
assert_false(poolw1_stats.destroyed);
|
|
assert_int_equal(poolw1_stats.num_before, 2);
|
|
assert_int_equal(poolw1_stats.num_after, 1);
|
|
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 1);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 1);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 1);
|
|
|
|
/*
|
|
* destroy the request so that it's marked
|
|
* as orphaned.
|
|
*
|
|
* As we're in the wrapper mode, the
|
|
* job thread will exit and try to create
|
|
* a new thread.
|
|
*/
|
|
TALLOC_FREE(state->job4->req);
|
|
|
|
/*
|
|
* Job 5 completes, It got no sleep timeout loop.
|
|
*/
|
|
will_return(__wrap_pthread_create, 0);
|
|
test_cancel_job_wait(state->job5, ev);
|
|
assert_int_equal(state->job5->ret, 0);
|
|
assert_null(state->job5->req);
|
|
assert_true(state->job5->started);
|
|
assert_true(state->job5->finished);
|
|
assert_false(state->job5->canceled);
|
|
assert_false(state->job5->orphaned);
|
|
assert_int_equal(state->job5->polls, 1);
|
|
assert_int_equal(state->job5->timeouts, 0);
|
|
|
|
assert_false(poolw1_stats.destroyed);
|
|
assert_int_equal(poolw1_stats.num_before, 3);
|
|
assert_int_equal(poolw1_stats.num_after, 2);
|
|
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
|
|
|
|
/*
|
|
* Job 2 is still not executed as we did a TALLOC_FREE()
|
|
* before is was scheduled.
|
|
*/
|
|
assert_false(state->job2->completed);
|
|
assert_false(state->job2->started);
|
|
|
|
/*
|
|
* Job 4 is still wasn't completed as we did a TALLOC_FREE()
|
|
* while it is was running. but it was started and has
|
|
* orphaned set
|
|
*/
|
|
assert_false(state->job4->completed);
|
|
assert_true(state->job4->started);
|
|
assert_true(state->job4->finished);
|
|
assert_false(state->job4->canceled);
|
|
assert_true(state->job4->orphaned);
|
|
assert_in_range(state->job4->polls, 1, 100);
|
|
assert_int_equal(state->job4->timeouts, state->job4->polls);
|
|
|
|
assert_false(poolw1_stats.destroyed);
|
|
assert_int_equal(poolw1_stats.num_before, 3);
|
|
assert_int_equal(poolw1_stats.num_after, 2);
|
|
|
|
/*
|
|
* Now we create jobs 6 and 7
|
|
* Both should execute.
|
|
* We destroy the pool wrapper (1) while job 6 is executing.
|
|
* This should stop job 6 and let job 7 start.
|
|
* Job 7 runs on the pool wrapper (2).
|
|
* We do a "normal" exit in job 7 by creating some activity
|
|
* on the socketpair.
|
|
*/
|
|
|
|
state->job6 = test_cancel_job_create(state);
|
|
assert_non_null(state->job6);
|
|
|
|
ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
|
|
assert_int_equal(ret, 0);
|
|
|
|
state->job6->fdm = fdpair[0];
|
|
state->job6->fdj = fdpair[1];
|
|
|
|
state->job6->req = pthreadpool_tevent_job_send(
|
|
state->job6, ev, poolw1, test_cancel_job_fn, state->job6);
|
|
assert_non_null(state->job6->req);
|
|
tevent_req_set_callback(state->job6->req,
|
|
test_cancel_job_done,
|
|
state->job6);
|
|
|
|
state->job7 = test_cancel_job_create(state);
|
|
assert_non_null(state->job7);
|
|
|
|
ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
|
|
assert_int_equal(ret, 0);
|
|
|
|
state->job7->fdm = fdpair[0];
|
|
state->job7->fdj = fdpair[1];
|
|
|
|
state->job7->req = pthreadpool_tevent_job_send(
|
|
state->job7, ev, poolw2, test_cancel_job_fn, state->job7);
|
|
assert_non_null(state->job7->req);
|
|
tevent_req_set_callback(state->job7->req,
|
|
test_cancel_job_done,
|
|
state->job7);
|
|
|
|
/*
|
|
* Make sure job 7 can exit as soon as possible.
|
|
* It will never get a sleep/poll timeout.
|
|
*/
|
|
ret = write(state->job7->fdm, &c, 1);
|
|
assert_int_equal(ret, 1);
|
|
|
|
/*
|
|
* Wait for the job 6 to start
|
|
*/
|
|
ret = read(state->job6->fdm, &c, 1);
|
|
assert_int_equal(ret, 1);
|
|
|
|
assert_non_null(poolw1_state);
|
|
assert_false(poolw1_stats.destroyed);
|
|
assert_int_equal(poolw1_stats.num_before, 4);
|
|
assert_int_equal(poolw1_stats.num_after, 2);
|
|
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 1);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 1);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 1);
|
|
|
|
/*
|
|
* destroy the request so that it's marked
|
|
* as orphaned.
|
|
*/
|
|
TALLOC_FREE(poolw1);
|
|
|
|
/*
|
|
* Wait until the job finished.
|
|
*/
|
|
ret = read(state->job6->fdm, &c, 1);
|
|
assert_int_equal(ret, 0);
|
|
|
|
assert_null(poolw1_state);
|
|
assert_true(poolw1_stats.destroyed);
|
|
assert_int_equal(poolw1_stats.num_before, 4);
|
|
assert_int_equal(poolw1_stats.num_after, 2);
|
|
|
|
/*
|
|
* Job 6 is still dangling arround.
|
|
*
|
|
* We need to convince valgrind --tool={drd,helgrind}
|
|
* that the read above is good enough to be
|
|
* sure the job is finished and closed the other end of
|
|
* the socketpair.
|
|
*/
|
|
ANNOTATE_BENIGN_RACE_SIZED(state->job6,
|
|
sizeof(*state->job6),
|
|
"protected by thread fence");
|
|
assert_non_null(state->job6->req);
|
|
assert_true(tevent_req_is_in_progress(state->job6->req));
|
|
assert_false(state->job6->completed);
|
|
assert_true(state->job6->started);
|
|
assert_true(state->job6->finished);
|
|
assert_false(state->job6->canceled);
|
|
assert_true(state->job6->orphaned);
|
|
assert_in_range(state->job6->polls, 1, 100);
|
|
assert_int_equal(state->job6->timeouts, state->job4->polls);
|
|
|
|
/*
|
|
* Job 7 completes, It got no sleep timeout loop.
|
|
*/
|
|
will_return(__wrap_pthread_create, 0);
|
|
test_cancel_job_wait(state->job7, ev);
|
|
assert_int_equal(state->job7->ret, 0);
|
|
assert_null(state->job7->req);
|
|
assert_true(state->job7->started);
|
|
assert_true(state->job7->finished);
|
|
assert_false(state->job7->canceled);
|
|
assert_false(state->job7->orphaned);
|
|
assert_int_equal(state->job7->polls, 1);
|
|
assert_int_equal(state->job7->timeouts, 0);
|
|
|
|
assert_non_null(poolw2_state);
|
|
assert_false(poolw2_stats.destroyed);
|
|
assert_int_equal(poolw2_stats.num_before, 1);
|
|
assert_int_equal(poolw2_stats.num_after, 1);
|
|
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
|
|
assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
|
|
|
|
/*
|
|
* Now we create jobs 8
|
|
* On a new wrapper pool
|
|
* We destroy the main pool while it's executing.
|
|
*/
|
|
|
|
state->job8 = test_cancel_job_create(state);
|
|
assert_non_null(state->job8);
|
|
|
|
ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
|
|
assert_int_equal(ret, 0);
|
|
|
|
state->job8->fdm = fdpair[0];
|
|
state->job8->fdj = fdpair[1];
|
|
|
|
state->job8->req = pthreadpool_tevent_job_send(
|
|
state->job8, ev, poolw2, test_cancel_job_fn, state->job8);
|
|
assert_non_null(state->job8->req);
|
|
tevent_req_set_callback(state->job8->req,
|
|
test_cancel_job_done,
|
|
state->job8);
|
|
|
|
/*
|
|
* Wait for the job 8 to start
|
|
*/
|
|
ret = read(state->job8->fdm, &c, 1);
|
|
assert_int_equal(ret, 1);
|
|
|
|
assert_false(poolw2_stats.destroyed);
|
|
assert_int_equal(poolw2_stats.num_before, 2);
|
|
assert_int_equal(poolw2_stats.num_after, 1);
|
|
|
|
/*
|
|
* destroy the request so that it's marked
|
|
* as orphaned.
|
|
*/
|
|
TALLOC_FREE(t->opool);
|
|
|
|
/*
|
|
* Wait until the job finished.
|
|
*/
|
|
ret = read(state->job8->fdm, &c, 1);
|
|
assert_int_equal(ret, 0);
|
|
|
|
assert_null(poolw2_state);
|
|
assert_true(poolw2_stats.destroyed);
|
|
assert_int_equal(poolw2_stats.num_before, 2);
|
|
assert_int_equal(poolw2_stats.num_after, 1);
|
|
|
|
/*
|
|
* Job 8 is still dangling arround.
|
|
*
|
|
* We need to convince valgrind --tool={drd,helgrind}
|
|
* that the read above is good enough to be
|
|
* sure the job is finished and closed the other end of
|
|
* the socketpair.
|
|
*/
|
|
ANNOTATE_BENIGN_RACE_SIZED(state->job8,
|
|
sizeof(*state->job8),
|
|
"protected by thread fence");
|
|
assert_non_null(state->job8->req);
|
|
assert_true(tevent_req_is_in_progress(state->job8->req));
|
|
assert_false(state->job8->completed);
|
|
assert_true(state->job8->started);
|
|
assert_true(state->job8->finished);
|
|
assert_false(state->job8->canceled);
|
|
assert_true(state->job8->orphaned);
|
|
assert_in_range(state->job8->polls, 1, 100);
|
|
assert_int_equal(state->job8->timeouts, state->job4->polls);
|
|
|
|
/*
|
|
* Now check that adding a new job to the dangling
|
|
* wrapper gives an error.
|
|
*/
|
|
state->job9 = test_cancel_job_create(state);
|
|
assert_non_null(state->job9);
|
|
|
|
state->job9->req = pthreadpool_tevent_job_send(
|
|
state->job9, ev, poolw2, test_cancel_job_fn, state->job9);
|
|
assert_non_null(state->job9->req);
|
|
tevent_req_set_callback(state->job9->req,
|
|
test_cancel_job_done,
|
|
state->job9);
|
|
|
|
/*
|
|
* Job 9 completes, But with a failure.
|
|
*/
|
|
test_cancel_job_wait(state->job9, ev);
|
|
assert_int_equal(state->job9->ret, EINVAL);
|
|
assert_null(state->job9->req);
|
|
assert_false(state->job9->started);
|
|
assert_false(state->job9->finished);
|
|
assert_false(state->job9->canceled);
|
|
assert_false(state->job9->orphaned);
|
|
assert_int_equal(state->job9->polls, 0);
|
|
assert_int_equal(state->job9->timeouts, 0);
|
|
|
|
TALLOC_FREE(state);
|
|
}
|
|
|
|
int main(int argc, char **argv)
|
|
{
|
|
const struct CMUnitTest tests[] = {
|
|
cmocka_unit_test_setup_teardown(test_create,
|
|
setup_pthreadpool_tevent,
|
|
teardown_pthreadpool_tevent),
|
|
cmocka_unit_test_setup_teardown(test_per_thread_cwd,
|
|
setup_pthreadpool_tevent,
|
|
teardown_pthreadpool_tevent),
|
|
cmocka_unit_test_setup_teardown(test_cancel_job,
|
|
setup_pthreadpool_tevent,
|
|
teardown_pthreadpool_tevent),
|
|
cmocka_unit_test_setup_teardown(test_wrap_cancel_job,
|
|
setup_pthreadpool_tevent,
|
|
teardown_pthreadpool_tevent),
|
|
};
|
|
|
|
cmocka_set_message_output(CM_OUTPUT_SUBUNIT);
|
|
|
|
return cmocka_run_group_tests(tests, NULL, NULL);
|
|
}
|