1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-12 09:18:10 +03:00

s4-repl: keep a @REPLCHANGED object on each partition

This object tracks the highest uSN in each partition. It will be used
to allow us to efficiently detect changes in a partition for sending
DsReplicaSync messages to our replication partners.
This commit is contained in:
Andrew Tridgell 2009-09-13 14:24:08 +10:00
parent c3da2056ec
commit 73e380deec

View File

@ -48,6 +48,13 @@
struct replmd_private {
struct la_entry *la_list;
uint32_t num_ncs;
struct nc_entry {
struct ldb_dn *dn;
struct GUID guid;
uint64_t mod_usn;
struct dsdb_control_current_partition *p_ctrl;
} *ncs;
};
struct la_entry {
@ -71,6 +78,249 @@ struct replmd_replicated_request {
struct ldb_message *search_msg;
};
/*
initialise the module
allocate the private structure and build the list
of partition DNs for use by replmd_notify()
*/
static int replmd_init(struct ldb_module *module)
{
struct replmd_private *replmd_private;
struct ldb_context *ldb = ldb_module_get_ctx(module);
replmd_private = talloc_zero(module, struct replmd_private);
if (replmd_private == NULL) {
ldb_oom(ldb);
return LDB_ERR_OPERATIONS_ERROR;
}
ldb_module_set_private(module, replmd_private);
return ldb_next_init(module);
}
static int nc_compare(struct nc_entry *n1, struct nc_entry *n2)
{
return ldb_dn_compare(n1->dn, n2->dn);
}
/*
build the list of partition DNs for use by replmd_notify()
*/
static int replmd_load_NCs(struct ldb_module *module)
{
const char *attrs[] = { "namingContexts", NULL };
struct ldb_result *res = NULL;
int i, ret;
TALLOC_CTX *tmp_ctx;
struct ldb_context *ldb;
struct ldb_message_element *el;
struct replmd_private *replmd_private =
talloc_get_type(ldb_module_get_private(module), struct replmd_private);
if (replmd_private->ncs != NULL) {
return LDB_SUCCESS;
}
ldb = ldb_module_get_ctx(module);
tmp_ctx = talloc_new(module);
/* load the list of naming contexts */
ret = ldb_search(ldb, tmp_ctx, &res, ldb_dn_new(tmp_ctx, ldb, ""),
LDB_SCOPE_BASE, attrs, NULL);
if (ret != LDB_SUCCESS ||
res->count != 1) {
DEBUG(0,(__location__ ": Failed to load rootDSE\n"));
return LDB_ERR_OPERATIONS_ERROR;
}
el = ldb_msg_find_element(res->msgs[0], "namingContexts");
if (el == NULL) {
DEBUG(0,(__location__ ": Failed to load namingContexts\n"));
return LDB_ERR_OPERATIONS_ERROR;
}
replmd_private->num_ncs = el->num_values;
replmd_private->ncs = talloc_array(replmd_private, struct nc_entry,
replmd_private->num_ncs);
if (replmd_private->ncs == NULL) {
ldb_oom(ldb);
return LDB_ERR_OPERATIONS_ERROR;
}
for (i=0; i<replmd_private->num_ncs; i++) {
replmd_private->ncs[i].dn =
ldb_dn_from_ldb_val(replmd_private->ncs,
ldb, &el->values[i]);
replmd_private->ncs[i].mod_usn = 0;
}
talloc_free(res);
/* now find the GUIDs of each of those DNs */
for (i=0; i<replmd_private->num_ncs; i++) {
const char *attrs2[] = { "objectGUID", NULL };
ret = ldb_search(ldb, tmp_ctx, &res, replmd_private->ncs[i].dn,
LDB_SCOPE_BASE, attrs2, NULL);
if (ret != LDB_SUCCESS ||
res->count != 1) {
DEBUG(0,(__location__ ": Failed to load GUID for %s\n",
ldb_dn_get_linearized(replmd_private->ncs[i].dn)));
return LDB_ERR_OPERATIONS_ERROR;
}
replmd_private->ncs[i].guid =
samdb_result_guid(res->msgs[0], "objectGUID");
talloc_free(res);
}
/* sort the NCs into order, most to least specific */
qsort(replmd_private->ncs, replmd_private->num_ncs,
sizeof(replmd_private->ncs[0]), QSORT_CAST nc_compare);
/* pre-create the partition control used in
replmd_notify_store() */
for (i=0; i<replmd_private->num_ncs; i++) {
replmd_private->ncs[i].p_ctrl = talloc(replmd_private->ncs,
struct dsdb_control_current_partition);
if (replmd_private->ncs[i].p_ctrl == NULL) {
ldb_oom(ldb);
return LDB_ERR_OPERATIONS_ERROR;
}
replmd_private->ncs[i].p_ctrl->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
replmd_private->ncs[i].p_ctrl->dn = replmd_private->ncs[i].dn;
}
talloc_free(tmp_ctx);
return LDB_SUCCESS;
}
/*
* notify the repl task that a object has changed. The notifies are
* gathered up in the replmd_private structure then written to the
* @REPLCHANGED object in each partition during the prepare_commit
*/
static int replmd_notify(struct ldb_module *module, struct ldb_dn *dn, uint64_t uSN)
{
int ret, i;
struct replmd_private *replmd_private =
talloc_get_type(ldb_module_get_private(module), struct replmd_private);
ret = replmd_load_NCs(module);
if (ret != LDB_SUCCESS) {
return ret;
}
for (i=0; i<replmd_private->num_ncs; i++) {
if (ldb_dn_compare_base(replmd_private->ncs[i].dn, dn) == 0) {
break;
}
}
if (i == replmd_private->num_ncs) {
DEBUG(0,(__location__ ": DN not within known NCs '%s'\n",
ldb_dn_get_linearized(dn)));
return LDB_ERR_OPERATIONS_ERROR;
}
if (uSN > replmd_private->ncs[i].mod_usn) {
replmd_private->ncs[i].mod_usn = uSN;
}
return LDB_SUCCESS;
}
/*
* update a @REPLCHANGED record in each partition if there have been
* any writes of replicated data in the partition
*/
static int replmd_notify_store(struct ldb_module *module)
{
int ret, i;
struct replmd_private *replmd_private =
talloc_get_type(ldb_module_get_private(module), struct replmd_private);
struct ldb_context *ldb = ldb_module_get_ctx(module);
for (i=0; i<replmd_private->num_ncs; i++) {
struct ldb_message *msg;
struct ldb_request *req;
if (replmd_private->ncs[i].mod_usn == 0) {
/* this partition has not changed in this
transaction */
continue;
}
msg = ldb_msg_new(module);
if (msg == NULL) {
ldb_oom(ldb);
return LDB_ERR_OPERATIONS_ERROR;
}
msg->dn = ldb_dn_new(msg, ldb, "@REPLCHANGED");
if (msg->dn == NULL) {
ldb_oom(ldb);
talloc_free(msg);
return LDB_ERR_OPERATIONS_ERROR;
}
ret = ldb_msg_add_fmt(msg, "uSNHighest", "%llu",
(unsigned long long)replmd_private->ncs[i].mod_usn);
if (ret != LDB_SUCCESS) {
talloc_free(msg);
return ret;
}
msg->elements[0].flags = LDB_FLAG_MOD_REPLACE;
ret = ldb_build_mod_req(&req, ldb, msg,
msg,
NULL,
NULL, ldb_op_default_callback,
NULL);
again:
if (ret != LDB_SUCCESS) {
talloc_free(msg);
return ret;
}
ret = ldb_request_add_control(req,
DSDB_CONTROL_CURRENT_PARTITION_OID,
false, replmd_private->ncs[i].p_ctrl);
if (ret != LDB_SUCCESS) {
talloc_free(msg);
return ret;
}
/* Run the new request */
ret = ldb_next_request(module, req);
if (ret == LDB_SUCCESS) {
ret = ldb_wait(req->handle, LDB_WAIT_ALL);
}
if (ret == LDB_ERR_NO_SUCH_OBJECT) {
ret = ldb_build_add_req(&req, ldb, msg,
msg,
NULL,
NULL, ldb_op_default_callback,
NULL);
goto again;
}
talloc_free(msg);
if (ret != LDB_SUCCESS) {
return ret;
}
}
return LDB_SUCCESS;
}
/*
created a replmd_replicated_request context
*/
@ -458,6 +708,11 @@ static int replmd_add(struct ldb_module *module, struct ldb_request *req)
return ret;
}
ret = replmd_notify(module, msg->dn, seq_num);
if (ret != LDB_SUCCESS) {
return ret;
}
/* go on with the call chain */
return ldb_next_request(module, down_req);
}
@ -535,8 +790,8 @@ static int replmd_update_rpmd_element(struct ldb_context *ldb,
* object. This is needed for DRS replication, as the merge on the
* client is based on this object
*/
static int replmd_update_rpmd(struct ldb_context *ldb, struct ldb_message *msg,
uint64_t *seq_num)
static int replmd_update_rpmd(struct ldb_module *module,
struct ldb_message *msg, uint64_t *seq_num)
{
const struct ldb_val *omd_value;
enum ndr_err_code ndr_err;
@ -549,6 +804,9 @@ static int replmd_update_rpmd(struct ldb_context *ldb, struct ldb_message *msg,
int ret;
const char *attrs[] = { "replPropertyMetaData" , NULL };
struct ldb_result *res;
struct ldb_context *ldb;
ldb = ldb_module_get_ctx(module);
our_invocation_id = samdb_ntds_invocation_id(ldb);
if (!our_invocation_id) {
@ -632,6 +890,11 @@ static int replmd_update_rpmd(struct ldb_context *ldb, struct ldb_message *msg,
return ret;
}
ret = replmd_notify(module, msg->dn, *seq_num);
if (ret != LDB_SUCCESS) {
return ret;
}
el->num_values = 1;
el->values = md_value;
}
@ -692,7 +955,7 @@ static int replmd_modify(struct ldb_module *module, struct ldb_request *req)
* attribute was changed
*/
ret = replmd_update_rpmd(ldb, msg, &seq_num);
ret = replmd_update_rpmd(module, msg, &seq_num);
if (ret != LDB_SUCCESS) {
return ret;
}
@ -830,6 +1093,11 @@ static int replmd_replicated_apply_add(struct replmd_replicated_request *ar)
return replmd_replicated_request_error(ar, ret);
}
ret = replmd_notify(ar->module, msg->dn, seq_num);
if (ret != LDB_SUCCESS) {
return replmd_replicated_request_error(ar, ret);
}
/*
* the meta data array is already sorted by the caller
*/
@ -1106,6 +1374,11 @@ static int replmd_replicated_apply_merge(struct replmd_replicated_request *ar)
msg->elements[i].flags = LDB_FLAG_MOD_REPLACE;
}
ret = replmd_notify(ar->module, msg->dn, seq_num);
if (ret != LDB_SUCCESS) {
return replmd_replicated_request_error(ar, ret);
}
if (DEBUGLVL(4)) {
char *s = ldb_ldif_message_string(ldb, ar, LDB_CHANGETYPE_MODIFY, msg);
DEBUG(4, ("DRS replication modify message:\n%s\n", s));
@ -1685,18 +1958,20 @@ static int replmd_extended_replicated_objects(struct ldb_module *module, struct
for (i=0; i<ar->objs->linked_attributes_count; i++) {
struct la_entry *la_entry;
if (replmd_private == NULL) {
DEBUG(0,(__location__ ": repl_meta_data not called from within a transaction\n"));
return LDB_ERR_OPERATIONS_ERROR;
if (replmd_private->la_list) {
la_entry = talloc(replmd_private->la_list,
struct la_entry);
} else {
la_entry = talloc(replmd_private,
struct la_entry);
}
la_entry = talloc(replmd_private, struct la_entry);
if (la_entry == NULL) {
ldb_oom(ldb);
return LDB_ERR_OPERATIONS_ERROR;
}
la_entry->la = talloc(la_entry, struct drsuapi_DsReplicaLinkedAttribute);
if (la_entry->la == NULL) {
talloc_free(la_entry);
ldb_oom(ldb);
return LDB_ERR_OPERATIONS_ERROR;
}
@ -1892,15 +2167,16 @@ static int replmd_extended(struct ldb_module *module, struct ldb_request *req)
static int replmd_start_transaction(struct ldb_module *module)
{
/* create our private structure for this transaction */
int i;
struct replmd_private *replmd_private = talloc_get_type(ldb_module_get_private(module),
struct replmd_private);
talloc_free(replmd_private);
replmd_private = talloc(module, struct replmd_private);
if (replmd_private == NULL) {
return LDB_ERR_OPERATIONS_ERROR;
}
talloc_free(replmd_private->la_list);
replmd_private->la_list = NULL;
ldb_module_set_private(module, replmd_private);
for (i=0; i<replmd_private->num_ncs; i++) {
replmd_private->ncs[i].mod_usn = 0;
}
return ldb_next_start_trans(module);
}
@ -1912,23 +2188,32 @@ static int replmd_prepare_commit(struct ldb_module *module)
{
struct replmd_private *replmd_private =
talloc_get_type(ldb_module_get_private(module), struct replmd_private);
struct la_entry *la;
struct la_entry *la, *prev;
int ret;
/* walk the list backwards, to do the first entry first, as we
* added the entries with DLIST_ADD() which puts them at the
* start of the list */
for (la = replmd_private->la_list; la && la->next; la=la->next) ;
for (; la; la=la->prev) {
int ret;
for (; la; la=prev) {
prev = la->prev;
DLIST_REMOVE(replmd_private->la_list, la);
ret = replmd_process_linked_attribute(module, la);
talloc_free(la);
if (ret != LDB_SUCCESS) {
return ret;
}
}
talloc_free(replmd_private);
ldb_module_set_private(module, NULL);
talloc_free(replmd_private->la_list);
replmd_private->la_list = NULL;
/* possibly change @REPLCHANGED */
ret = replmd_notify_store(module);
if (ret != LDB_SUCCESS) {
return ret;
}
return ldb_next_prepare_commit(module);
}
@ -1937,17 +2222,18 @@ static int replmd_del_transaction(struct ldb_module *module)
{
struct replmd_private *replmd_private =
talloc_get_type(ldb_module_get_private(module), struct replmd_private);
talloc_free(replmd_private);
ldb_module_set_private(module, NULL);
talloc_free(replmd_private->la_list);
replmd_private->la_list = NULL;
return ldb_next_del_trans(module);
}
_PUBLIC_ const struct ldb_module_ops ldb_repl_meta_data_module_ops = {
.name = "repl_meta_data",
.add = replmd_add,
.modify = replmd_modify,
.extended = replmd_extended,
.init_context = replmd_init,
.add = replmd_add,
.modify = replmd_modify,
.extended = replmd_extended,
.start_transaction = replmd_start_transaction,
.prepare_commit = replmd_prepare_commit,
.del_transaction = replmd_del_transaction,