1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-27 14:04:05 +03:00

ldb: Lock the whole backend database for the duration of a search

We must hold locks not just for the duration of each search, but for the whole search
as our module stack may make multiple search requests to build up the whole result.

This is explains a number of replication and read corruption issues in Samba

Pair-Programmed-With: Stefan Metzmacher <metze@samba.org>

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Signed-off-by: Stefan Metzmacher <metze@samba.org>
This commit is contained in:
Andrew Bartlett 2017-06-15 13:56:46 +12:00 committed by Stefan Metzmacher
parent d3c0df35c9
commit a8958515f0
2 changed files with 160 additions and 9 deletions

View File

@ -966,6 +966,146 @@ static int ldb_msg_check_element_flags(struct ldb_context *ldb,
return LDB_SUCCESS;
}
/*
* This context allows us to make the unlock be a talloc destructor
*
* This ensures that a request started, but not waited on, will still
* unlock.
*/
struct ldb_db_lock_context {
struct ldb_request *req;
struct ldb_context *ldb;
};
/*
* We have to have a the unlock on a destructor so that we unlock the
* DB if a caller calls talloc_free(req). We trust that the ldb
* context has not already gone away.
*/
static int ldb_db_lock_destructor(struct ldb_db_lock_context *lock_context)
{
int ret;
struct ldb_module *next_module;
FIRST_OP_NOERR(lock_context->ldb, read_unlock);
if (next_module != NULL) {
ret = next_module->ops->read_unlock(next_module);
} else {
ret = LDB_SUCCESS;
}
if (ret != LDB_SUCCESS) {
ldb_debug(lock_context->ldb,
LDB_DEBUG_FATAL,
"Failed to unlock db: %s / %s",
ldb_errstring(lock_context->ldb),
ldb_strerror(ret));
}
return 0;
}
static int ldb_lock_backend_callback(struct ldb_request *req,
struct ldb_reply *ares)
{
struct ldb_db_lock_context *lock_context;
int ret;
lock_context = talloc_get_type(req->context,
struct ldb_db_lock_context);
if (!ares) {
return ldb_module_done(lock_context->req, NULL, NULL,
LDB_ERR_OPERATIONS_ERROR);
}
if (ares->error != LDB_SUCCESS || ares->type == LDB_REPLY_DONE) {
ret = ldb_module_done(lock_context->req, ares->controls,
ares->response, ares->error);
/*
* If this is a LDB_REPLY_DONE or an error, unlock the
* DB by calling the destructor on this context
*/
talloc_free(lock_context);
return ret;
}
/* Otherwise pass on the callback */
switch (ares->type) {
case LDB_REPLY_ENTRY:
return ldb_module_send_entry(lock_context->req, ares->message,
ares->controls);
case LDB_REPLY_REFERRAL:
return ldb_module_send_referral(lock_context->req,
ares->referral);
default:
/* Can't happen */
return LDB_ERR_OPERATIONS_ERROR;
}
}
/*
* Do an ldb_search() with a lock held, but release it if the request
* is freed with talloc_free()
*/
static int lock_search(struct ldb_module *lock_module, struct ldb_request *req)
{
/* Used in FIRST_OP_NOERR to find where to send the lock request */
struct ldb_module *next_module = NULL;
struct ldb_request *down_req = NULL;
struct ldb_db_lock_context *lock_context;
struct ldb_context *ldb = ldb_module_get_ctx(lock_module);
int ret;
lock_context = talloc(req, struct ldb_db_lock_context);
if (lock_context == NULL) {
return ldb_oom(ldb);
}
lock_context->ldb = ldb;
lock_context->req = req;
ret = ldb_build_search_req_ex(&down_req, ldb, req,
req->op.search.base,
req->op.search.scope,
req->op.search.tree,
req->op.search.attrs,
req->controls,
lock_context,
ldb_lock_backend_callback,
req);
LDB_REQ_SET_LOCATION(down_req);
if (ret != LDB_SUCCESS) {
return ret;
}
/* call DB lock */
FIRST_OP_NOERR(ldb, read_lock);
if (next_module != NULL) {
ret = next_module->ops->read_lock(next_module);
} else {
ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
}
if (ret == LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION) {
/* We might be talking LDAP */
ldb_reset_err_string(ldb);
ret = 0;
TALLOC_FREE(lock_context);
return ldb_next_request(lock_module, req);
} else if ((ret != LDB_SUCCESS) && (ldb->err_string == NULL)) {
/* if no error string was setup by the backend */
ldb_asprintf_errstring(ldb, "Failed to get DB lock: %s (%d)",
ldb_strerror(ret), ret);
} else {
talloc_set_destructor(lock_context, ldb_db_lock_destructor);
}
if (ret != LDB_SUCCESS) {
return ret;
}
return ldb_next_request(lock_module, down_req);
}
/*
start an ldb request
@ -991,15 +1131,32 @@ int ldb_request(struct ldb_context *ldb, struct ldb_request *req)
/* call the first module in the chain */
switch (req->operation) {
case LDB_SEARCH:
{
/*
* A fake module to allow ldb_next_request() to be
* re-used and to keep the locking out of this function.
*/
static const struct ldb_module_ops lock_module_ops = {
.name = "lock_searches",
.search = lock_search
};
struct ldb_module lock_module = {
.ldb = ldb,
.next = ldb->modules,
.ops = &lock_module_ops
};
next_module = &lock_module;
/* due to "ldb_build_search_req" base DN always != NULL */
if (!ldb_dn_validate(req->op.search.base)) {
ldb_asprintf_errstring(ldb, "ldb_search: invalid basedn '%s'",
ldb_dn_get_linearized(req->op.search.base));
return LDB_ERR_INVALID_DN_SYNTAX;
}
FIRST_OP(ldb, search);
ret = next_module->ops->search(next_module, req);
break;
}
case LDB_ADD:
if (!ldb_dn_validate(req->op.add.message->dn)) {
ldb_asprintf_errstring(ldb, "ldb_add: invalid dn '%s'",

View File

@ -1991,14 +1991,8 @@ static int test_ldb_modify_during_whole_search_callback1(struct ldb_request *req
ldb_request_done(req, LDB_SUCCESS);
assert_int_equal(ret, 0);
/*
* We got the result because of this
* tests wants to demonstrate.
*
* Once the bug is fixed, it should
* change to assert_int_equal(res_count, 0);
*/
assert_int_equal(res_count, 1);
/* We should not have got the result */
assert_int_equal(res_count, 0);
return ret;
}