1
0
mirror of https://github.com/samba-team/samba.git synced 2025-02-02 09:47:23 +03:00

lib ldb key value: add nested transaction support.

Use the nested transaction support added to the key value back ends to
make key value operations atomic. This will ensure that rename
operation failures, which delete the original record and add a new
record, leave the database in a consistent state.

Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
Gary Lockyer 2019-03-07 10:37:18 +13:00 committed by Andrew Bartlett
parent d7dc4fde40
commit 565341baf5
2 changed files with 179 additions and 16 deletions

View File

@ -469,6 +469,81 @@ static bool ldb_kv_single_valued(const struct ldb_schema_attribute *a,
return false;
}
/*
* Place holder actual implementation to be added in subsequent commits
*/
int ldb_kv_index_sub_transaction_start(struct ldb_kv_private *ldb_kv)
{
return LDB_SUCCESS;
}
/*
* Starts a sub transaction if they are supported by the backend
*/
static int ldb_kv_sub_transaction_start(struct ldb_kv_private *ldb_kv)
{
int ret = LDB_SUCCESS;
ret = ldb_kv->kv_ops->begin_nested_write(ldb_kv);
if (ret == LDB_SUCCESS) {
ret = ldb_kv_index_sub_transaction_start(ldb_kv);
}
return ret;
}
/*
* Place holder actual implementation to be added in subsequent commits
*/
int ldb_kv_index_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
{
return LDB_SUCCESS;
}
/*
* Commits a sub transaction if they are supported by the backend
*/
static int ldb_kv_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
{
int ret = LDB_SUCCESS;
ret = ldb_kv_index_sub_transaction_commit(ldb_kv);
if (ret != LDB_SUCCESS) {
return ret;
}
ret = ldb_kv->kv_ops->finish_nested_write(ldb_kv);
return ret;
}
/*
* Place holder actual implementation to be added in subsequent commits
*/
int ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
{
return LDB_SUCCESS;
}
/*
* Cancels a sub transaction if they are supported by the backend
*/
static int ldb_kv_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
{
int ret = LDB_SUCCESS;
ret = ldb_kv_index_sub_transaction_cancel(ldb_kv);
if (ret != LDB_SUCCESS) {
struct ldb_context *ldb = ldb_module_get_ctx(ldb_kv->module);
/*
* In the event of a failure we log the failure and continue
* as we need to cancel the database transaction.
*/
ldb_debug(ldb,
LDB_DEBUG_ERROR,
__location__": ldb_kv_index_sub_transaction_cancel "
"failed: %s",
ldb_errstring(ldb));
}
ret = ldb_kv->kv_ops->abort_nested_write(ldb_kv);
return ret;
}
static int ldb_kv_add_internal(struct ldb_module *module,
struct ldb_kv_private *ldb_kv,
const struct ldb_message *msg,
@ -615,7 +690,23 @@ static int ldb_kv_add(struct ldb_kv_context *ctx)
return LDB_ERR_OPERATIONS_ERROR;
}
ret = ldb_kv_sub_transaction_start(ldb_kv);
if (ret != LDB_SUCCESS) {
return ret;
}
ret = ldb_kv_add_internal(module, ldb_kv, req->op.add.message, true);
if (ret != LDB_SUCCESS) {
int r = ldb_kv_sub_transaction_cancel(ldb_kv);
if (r != LDB_SUCCESS) {
ldb_debug(
ldb_module_get_ctx(module),
LDB_DEBUG_FATAL,
__location__
": Unable to roll back sub transaction");
}
return ret;
}
ret = ldb_kv_sub_transaction_commit(ldb_kv);
return ret;
}
@ -705,6 +796,9 @@ static int ldb_kv_delete(struct ldb_kv_context *ctx)
{
struct ldb_module *module = ctx->module;
struct ldb_request *req = ctx->req;
void *data = ldb_module_get_private(module);
struct ldb_kv_private *ldb_kv =
talloc_get_type(data, struct ldb_kv_private);
int ret = LDB_SUCCESS;
ldb_request_set_state(req, LDB_ASYNC_PENDING);
@ -713,7 +807,23 @@ static int ldb_kv_delete(struct ldb_kv_context *ctx)
return LDB_ERR_OPERATIONS_ERROR;
}
ret = ldb_kv_sub_transaction_start(ldb_kv);
if (ret != LDB_SUCCESS) {
return ret;
}
ret = ldb_kv_delete_internal(module, req->op.del.dn);
if (ret != LDB_SUCCESS) {
int r = ldb_kv_sub_transaction_cancel(ldb_kv);
if (r != LDB_SUCCESS) {
ldb_debug(
ldb_module_get_ctx(module),
LDB_DEBUG_FATAL,
__location__
": Unable to roll back sub transaction");
}
return ret;
}
ret = ldb_kv_sub_transaction_commit(ldb_kv);
return ret;
}
@ -1237,6 +1347,9 @@ static int ldb_kv_modify(struct ldb_kv_context *ctx)
{
struct ldb_module *module = ctx->module;
struct ldb_request *req = ctx->req;
void *data = ldb_module_get_private(module);
struct ldb_kv_private *ldb_kv =
talloc_get_type(data, struct ldb_kv_private);
int ret = LDB_SUCCESS;
ret = ldb_kv_check_special_dn(module, req->op.mod.message);
@ -1250,7 +1363,56 @@ static int ldb_kv_modify(struct ldb_kv_context *ctx)
return LDB_ERR_OPERATIONS_ERROR;
}
ret = ldb_kv_sub_transaction_start(ldb_kv);
if (ret != LDB_SUCCESS) {
return ret;
}
ret = ldb_kv_modify_internal(module, req->op.mod.message, req);
if (ret != LDB_SUCCESS) {
int r = ldb_kv_sub_transaction_cancel(ldb_kv);
if (r != LDB_SUCCESS) {
ldb_debug(
ldb_module_get_ctx(module),
LDB_DEBUG_FATAL,
__location__
": Unable to roll back sub transaction");
}
return ret;
}
ret = ldb_kv_sub_transaction_commit(ldb_kv);
return ret;
}
static int ldb_kv_rename_internal(struct ldb_module *module,
struct ldb_request *req,
struct ldb_message *msg)
{
void *data = ldb_module_get_private(module);
struct ldb_kv_private *ldb_kv =
talloc_get_type(data, struct ldb_kv_private);
int ret = LDB_SUCCESS;
/* Always delete first then add, to avoid conflicts with
* unique indexes. We rely on the transaction to make this
* atomic
*/
ret = ldb_kv_delete_internal(module, msg->dn);
if (ret != LDB_SUCCESS) {
return ret;
}
msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
if (msg->dn == NULL) {
return LDB_ERR_OPERATIONS_ERROR;
}
/* We don't check single value as we can have more than 1 with
* deleted attributes. We could go through all elements but that's
* maybe not the most efficient way
*/
ret = ldb_kv_add_internal(module, ldb_kv, msg, false);
return ret;
}
@ -1355,28 +1517,26 @@ static int ldb_kv_rename(struct ldb_kv_context *ctx)
talloc_free(key_old.data);
talloc_free(key.data);
/* Always delete first then add, to avoid conflicts with
* unique indexes. We rely on the transaction to make this
* atomic
*/
ret = ldb_kv_delete_internal(module, msg->dn);
ret = ldb_kv_sub_transaction_start(ldb_kv);
if (ret != LDB_SUCCESS) {
talloc_free(msg);
return ret;
}
msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
if (msg->dn == NULL) {
ret = ldb_kv_rename_internal(module, req, msg);
if (ret != LDB_SUCCESS) {
int r = ldb_kv_sub_transaction_cancel(ldb_kv);
if (r != LDB_SUCCESS) {
ldb_debug(
ldb_module_get_ctx(module),
LDB_DEBUG_FATAL,
__location__
": Unable to roll back sub transaction");
}
talloc_free(msg);
return LDB_ERR_OPERATIONS_ERROR;
return ret;
}
/* We don't check single value as we can have more than 1 with
* deleted attributes. We could go through all elements but that's
* maybe not the most efficient way
*/
ret = ldb_kv_add_internal(module, ldb_kv, msg, false);
ret = ldb_kv_sub_transaction_commit(ldb_kv);
talloc_free(msg);
return ret;

View File

@ -304,4 +304,7 @@ int ldb_kv_init_store(struct ldb_kv_private *ldb_kv,
struct ldb_context *ldb,
const char *options[],
struct ldb_module **_module);
int ldb_kv_index_sub_transaction_start(struct ldb_kv_private *ldb_kv);
int ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private *ldb_kv);
int ldb_kv_index_sub_transaction_commit(struct ldb_kv_private *ldb_kv);
#endif /* __LDB_KV_H__ */