1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-12 09:18:10 +03:00

ldb_tdb: Cope with key truncation

Modify the indexing code to handle a maximum key length, index keys
greater than the maximum length will be truncated to the maximum length.
And the unuque index code has been altered to handle multiple records
for the same index key.

Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
Gary Lockyer 2018-02-21 15:18:11 +13:00 committed by Andrew Bartlett
parent 5c1504b94d
commit 0918068997

View File

@ -165,13 +165,19 @@ struct ltdb_idxptr {
int error;
};
enum key_truncation {
KEY_NOT_TRUNCATED,
KEY_TRUNCATED,
};
static int ltdb_write_index_dn_guid(struct ldb_module *module,
const struct ldb_message *msg,
int add);
static int ltdb_index_dn_base_dn(struct ldb_module *module,
struct ltdb_private *ltdb,
struct ldb_dn *base_dn,
struct dn_list *dn_list);
struct dn_list *dn_list,
enum key_truncation *truncation);
static void ltdb_dn_list_sort(struct ltdb_private *ltdb,
struct dn_list *list);
@ -183,6 +189,13 @@ static void ltdb_dn_list_sort(struct ltdb_private *ltdb,
#define LTDB_GUID_INDEXING_VERSION 3
static unsigned ltdb_max_key_length(struct ltdb_private *ltdb) {
if (ltdb->max_key_length == 0){
return UINT_MAX;
}
return ltdb->max_key_length;
}
/* enable the idxptr mode when transactions start */
int ltdb_index_transaction_start(struct ldb_module *module)
{
@ -423,7 +436,7 @@ normal_index:
return LDB_ERR_OPERATIONS_ERROR;
}
if (el->num_values != 1) {
if (el->num_values == 0) {
return LDB_ERR_OPERATIONS_ERROR;
}
@ -459,13 +472,16 @@ int ltdb_key_dn_from_idx(struct ldb_module *module,
{
struct ldb_context *ldb = ldb_module_get_ctx(module);
int ret;
int index = 0;
enum key_truncation truncation = KEY_NOT_TRUNCATED;
struct dn_list *list = talloc(mem_ctx, struct dn_list);
if (list == NULL) {
ldb_oom(ldb);
return LDB_ERR_OPERATIONS_ERROR;
}
ret = ltdb_index_dn_base_dn(module, ltdb, dn, list);
ret = ltdb_index_dn_base_dn(module, ltdb, dn, list, &truncation);
if (ret != LDB_SUCCESS) {
TALLOC_FREE(list);
return ret;
@ -475,7 +491,8 @@ int ltdb_key_dn_from_idx(struct ldb_module *module,
TALLOC_FREE(list);
return LDB_ERR_NO_SUCH_OBJECT;
}
if (list->count > 1) {
if (list->count > 1 && truncation == KEY_NOT_TRUNCATED) {
const char *dn_str = ldb_dn_get_linearized(dn);
ldb_asprintf_errstring(ldb_module_get_ctx(module),
__location__
@ -488,9 +505,81 @@ int ltdb_key_dn_from_idx(struct ldb_module *module,
return LDB_ERR_CONSTRAINT_VIOLATION;
}
if (list->count > 0 && truncation == KEY_TRUNCATED) {
/*
* DN key has been truncated, need to inspect the actual
* records to locate the actual DN
*/
int i;
index = -1;
for (i=0; i < list->count; i++) {
uint8_t guid_key[LTDB_GUID_KEY_SIZE];
TDB_DATA key = {
.dptr = guid_key,
.dsize = sizeof(guid_key)
};
const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
struct ldb_message *rec = ldb_msg_new(ldb);
if (rec == NULL) {
return LDB_ERR_OPERATIONS_ERROR;
}
ret = ltdb_idx_to_key(module, ltdb,
ldb, &list->dn[i],
&key);
if (ret != LDB_SUCCESS) {
TALLOC_FREE(list);
TALLOC_FREE(rec);
return ret;
}
ret = ltdb_search_key(module, ltdb, key,
rec, flags);
if (key.dptr != guid_key) {
TALLOC_FREE(key.dptr);
}
if (ret == LDB_ERR_NO_SUCH_OBJECT) {
/*
* the record has disappeared?
* yes, this can happen
*/
TALLOC_FREE(rec);
continue;
}
if (ret != LDB_SUCCESS) {
/* an internal error */
TALLOC_FREE(rec);
TALLOC_FREE(list);
return LDB_ERR_OPERATIONS_ERROR;
}
/*
* We found the actual DN that we wanted from in the
* multiple values that matched the index
* (due to truncation), so return that.
*
*/
if (ldb_dn_compare(dn, rec->dn) == 0) {
index = i;
TALLOC_FREE(rec);
break;
}
}
/*
* We matched the index but the actual DN we wanted
* was not here.
*/
if (index == -1) {
TALLOC_FREE(list);
return LDB_ERR_NO_SUCH_OBJECT;
}
}
/* The tdb_key memory is allocated by the caller */
ret = ltdb_guid_to_key(module, ltdb,
&list->dn[0], tdb_key);
&list->dn[index], tdb_key);
TALLOC_FREE(list);
if (ret != LDB_SUCCESS) {
@ -743,7 +832,8 @@ int ltdb_index_transaction_cancel(struct ldb_module *module)
static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
struct ltdb_private *ltdb,
const char *attr, const struct ldb_val *value,
const struct ldb_schema_attribute **ap)
const struct ldb_schema_attribute **ap,
enum key_truncation *truncation)
{
struct ldb_dn *ret;
struct ldb_val v;
@ -752,6 +842,11 @@ static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
const char *attr_for_dn = NULL;
int r;
bool should_b64_encode;
unsigned max_key_length = ltdb_max_key_length(ltdb);
unsigned key_len = 0;
unsigned attr_len = 0;
unsigned indx_len = 0;
unsigned frmt_len = 0;
if (attr[0] == '@') {
attr_for_dn = attr;
@ -788,6 +883,8 @@ static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
return NULL;
}
}
attr_len = strlen(attr_for_dn);
indx_len = strlen(LTDB_INDEX);
/*
* We do not base 64 encode a DN in a key, it has already been
@ -812,18 +909,59 @@ static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
}
if (should_b64_encode) {
unsigned vstr_len = 0;
char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
if (!vstr) {
talloc_free(attr_folded);
return NULL;
}
ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%s", LTDB_INDEX,
attr_for_dn, vstr);
vstr_len = strlen(vstr);
key_len = 3 + indx_len + attr_len + vstr_len;
if (key_len > max_key_length) {
unsigned excess = key_len - max_key_length;
frmt_len = vstr_len - excess;
*truncation = KEY_TRUNCATED;
/*
* Truncated keys are placed in a separate key space
* from the non truncated keys
* Note: the double hash "##" is not a typo and
* indicates that the following value is base64 encoded
*/
ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s##%.*s",
LTDB_INDEX, attr_for_dn,
frmt_len, vstr);
} else {
frmt_len = vstr_len;
*truncation = KEY_NOT_TRUNCATED;
/*
* Note: the double colon "::" is not a typo and
* indicates that the following value is base64 encoded
*/
ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s::%.*s",
LTDB_INDEX, attr_for_dn,
frmt_len, vstr);
}
talloc_free(vstr);
} else {
ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s", LTDB_INDEX,
attr_for_dn,
(int)v.length, (char *)v.data);
key_len = 2 + indx_len + attr_len + (int)v.length;
if (key_len > max_key_length) {
unsigned excess = key_len - max_key_length;
frmt_len = v.length - excess;
*truncation = KEY_TRUNCATED;
/*
* Truncated keys are placed in a separate key space
* from the non truncated keys
*/
ret = ldb_dn_new_fmt(ldb, ldb, "%s#%s#%.*s",
LTDB_INDEX, attr_for_dn,
frmt_len, (char *)v.data);
} else {
frmt_len = v.length;
*truncation = KEY_NOT_TRUNCATED;
ret = ldb_dn_new_fmt(ldb, ldb, "%s:%s:%.*s",
LTDB_INDEX, attr_for_dn,
frmt_len, (char *)v.data);
}
}
if (v.data != value->data) {
@ -908,6 +1046,7 @@ static int ltdb_index_dn_simple(struct ldb_module *module,
struct ldb_context *ldb;
struct ldb_dn *dn;
int ret;
enum key_truncation truncation = KEY_NOT_TRUNCATED;
ldb = ldb_module_get_ctx(module);
@ -924,7 +1063,12 @@ static int ltdb_index_dn_simple(struct ldb_module *module,
search criterion */
dn = ltdb_index_key(ldb, ltdb,
tree->u.equality.attr,
&tree->u.equality.value, NULL);
&tree->u.equality.value, NULL, &truncation);
/*
* We ignore truncation here and allow multi-valued matches
* as ltdb_search_indexed will filter out the wrong one in
* ltdb_index_filter() which calls ldb_match_message().
*/
if (!dn) return LDB_ERR_OPERATIONS_ERROR;
ret = ltdb_dn_list_load(module, ltdb, dn, list);
@ -959,6 +1103,7 @@ static int ltdb_index_dn_leaf(struct ldb_module *module,
return LDB_SUCCESS;
}
if (ldb_attr_dn(tree->u.equality.attr) == 0) {
enum key_truncation truncation = KEY_NOT_TRUNCATED;
struct ldb_dn *dn
= ldb_dn_from_ldb_val(list,
ldb_module_get_ctx(module),
@ -977,7 +1122,13 @@ static int ltdb_index_dn_leaf(struct ldb_module *module,
* We can't call TALLOC_FREE(dn) as this must belong
* to list for the memory to remain valid.
*/
return ltdb_index_dn_base_dn(module, ltdb, dn, list);
return ltdb_index_dn_base_dn(module, ltdb, dn, list,
&truncation);
/*
* We ignore truncation here and allow multi-valued matches
* as ltdb_search_indexed will filter out the wrong one in
* ltdb_index_filter() which calls ldb_match_message().
*/
} else if ((ltdb->cache->GUID_index_attribute != NULL) &&
(ldb_attr_cmp(tree->u.equality.attr,
@ -1380,7 +1531,8 @@ static int ltdb_index_dn_attr(struct ldb_module *module,
struct ltdb_private *ltdb,
const char *attr,
struct ldb_dn *dn,
struct dn_list *list)
struct dn_list *list,
enum key_truncation *truncation)
{
struct ldb_context *ldb;
struct ldb_dn *key;
@ -1392,7 +1544,7 @@ static int ltdb_index_dn_attr(struct ldb_module *module,
/* work out the index key from the parent DN */
val.data = (uint8_t *)((uintptr_t)ldb_dn_get_casefold(dn));
val.length = strlen((char *)val.data);
key = ltdb_index_key(ldb, ltdb, attr, &val, NULL);
key = ltdb_index_key(ldb, ltdb, attr, &val, NULL, truncation);
if (!key) {
ldb_oom(ldb);
return LDB_ERR_OPERATIONS_ERROR;
@ -1417,12 +1569,14 @@ static int ltdb_index_dn_attr(struct ldb_module *module,
static int ltdb_index_dn_one(struct ldb_module *module,
struct ltdb_private *ltdb,
struct ldb_dn *parent_dn,
struct dn_list *list)
struct dn_list *list,
enum key_truncation *truncation)
{
/* Ensure we do not shortcut on intersection for this list */
list->strict = true;
return ltdb_index_dn_attr(module, ltdb,
LTDB_IDXONE, parent_dn, list);
LTDB_IDXONE, parent_dn, list, truncation);
}
/*
@ -1431,7 +1585,8 @@ static int ltdb_index_dn_one(struct ldb_module *module,
static int ltdb_index_dn_base_dn(struct ldb_module *module,
struct ltdb_private *ltdb,
struct ldb_dn *base_dn,
struct dn_list *dn_list)
struct dn_list *dn_list,
enum key_truncation *truncation)
{
const struct ldb_val *guid_val = NULL;
if (ltdb->cache->GUID_index_attribute == NULL) {
@ -1468,7 +1623,7 @@ static int ltdb_index_dn_base_dn(struct ldb_module *module,
}
return ltdb_index_dn_attr(module, ltdb,
LTDB_IDXDN, base_dn, dn_list);
LTDB_IDXDN, base_dn, dn_list, truncation);
}
/*
@ -1520,7 +1675,8 @@ static int ltdb_index_dn(struct ldb_module *module,
static int ltdb_index_filter(struct ltdb_private *ltdb,
const struct dn_list *dn_list,
struct ltdb_context *ac,
uint32_t *match_count)
uint32_t *match_count,
enum key_truncation scope_one_truncation)
{
struct ldb_context *ldb;
struct ldb_message *msg;
@ -1592,10 +1748,15 @@ static int ltdb_index_filter(struct ltdb_private *ltdb,
return LDB_ERR_OPERATIONS_ERROR;
}
/* We trust the index for SCOPE_ONELEVEL and SCOPE_BASE */
if ((ac->scope == LDB_SCOPE_ONELEVEL
&& ltdb->cache->one_level_indexes)
|| ac->scope == LDB_SCOPE_BASE) {
/*
* We trust the index for LDB_SCOPE_ONELEVEL
* unless the index key has been truncated.
*
* LDB_SCOPE_BASE is not passed in by our only caller.
*/
if (ac->scope == LDB_SCOPE_ONELEVEL
&& ltdb->cache->one_level_indexes
&& scope_one_truncation == KEY_NOT_TRUNCATED) {
ret = ldb_match_message(ldb, msg, ac->tree,
ac->scope, &matched);
} else {
@ -1668,6 +1829,7 @@ int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
struct dn_list *dn_list;
int ret;
enum ldb_scope index_scope;
enum key_truncation scope_one_truncation = KEY_NOT_TRUNCATED;
/* see if indexing is enabled */
if (!ltdb->cache->attribute_indexes &&
@ -1697,17 +1859,10 @@ int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
switch (index_scope) {
case LDB_SCOPE_BASE:
/*
* If we ever start to also load the index values for
* the tree, we must ensure we strictly intersect with
* this list, as we trust the BASE index
* The only caller will have filtered the operation out
* so we should never get here
*/
ret = ltdb_index_dn_base_dn(ac->module, ltdb,
ac->base, dn_list);
if (ret != LDB_SUCCESS) {
talloc_free(dn_list);
return ret;
}
break;
ldb_operr(ldb);
case LDB_SCOPE_ONELEVEL:
/*
@ -1715,7 +1870,8 @@ int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
* the tree, we must ensure we strictly intersect with
* this list, as we trust the ONELEVEL index
*/
ret = ltdb_index_dn_one(ac->module, ltdb, ac->base, dn_list);
ret = ltdb_index_dn_one(ac->module, ltdb, ac->base, dn_list,
&scope_one_truncation);
if (ret != LDB_SUCCESS) {
talloc_free(dn_list);
return ret;
@ -1784,7 +1940,19 @@ int ltdb_search_indexed(struct ltdb_context *ac, uint32_t *match_count)
break;
}
ret = ltdb_index_filter(ltdb, dn_list, ac, match_count);
/*
* It is critical that this function do the re-filter even
* on things found by the index as the index can over-match
* in cases of truncation (as well as when it decides it is
* not worth further filtering)
*
* If this changes, then the index code above would need to
* pass up a flag to say if any index was truncated during
* processing as the truncation here refers only to the
* SCOPE_ONELEVEL index.
*/
ret = ltdb_index_filter(ltdb, dn_list, ac, match_count,
scope_one_truncation);
talloc_free(dn_list);
return ret;
}
@ -1820,6 +1988,8 @@ static int ltdb_index_add1(struct ldb_module *module,
const struct ldb_schema_attribute *a;
struct dn_list *list;
unsigned alloc_len;
enum key_truncation truncation = KEY_TRUNCATED;
ldb = ldb_module_get_ctx(module);
@ -1829,7 +1999,7 @@ static int ltdb_index_add1(struct ldb_module *module,
}
dn_key = ltdb_index_key(ldb, ltdb,
el->name, &el->values[v_idx], &a);
el->name, &el->values[v_idx], &a, &truncation);
if (!dn_key) {
talloc_free(list);
return LDB_ERR_OPERATIONS_ERROR;
@ -1850,13 +2020,81 @@ static int ltdb_index_add1(struct ldb_module *module,
* messages.
*/
if (list->count > 0 &&
ldb_attr_cmp(el->name, LTDB_IDXDN) == 0) {
ldb_attr_cmp(el->name, LTDB_IDXDN) == 0 &&
truncation == KEY_NOT_TRUNCATED) {
talloc_free(list);
return LDB_ERR_CONSTRAINT_VIOLATION;
} else if (list->count > 0
&& ldb_attr_cmp(el->name, LTDB_IDXDN) == 0) {
/*
* At least one existing entry in the DN->GUID index, which
* arises when the DN indexes have been truncated
*
* So need to pull the DN's to check if it's really a duplicate
*/
int i;
for (i=0; i < list->count; i++) {
uint8_t guid_key[LTDB_GUID_KEY_SIZE];
TDB_DATA key = {
.dptr = guid_key,
.dsize = sizeof(guid_key)
};
const int flags = LDB_UNPACK_DATA_FLAG_NO_ATTRS;
struct ldb_message *rec = ldb_msg_new(ldb);
if (rec == NULL) {
return LDB_ERR_OPERATIONS_ERROR;
}
ret = ltdb_idx_to_key(module, ltdb,
ldb, &list->dn[i],
&key);
if (ret != LDB_SUCCESS) {
TALLOC_FREE(list);
TALLOC_FREE(rec);
return ret;
}
ret = ltdb_search_key(module, ltdb, key,
rec, flags);
if (key.dptr != guid_key) {
TALLOC_FREE(key.dptr);
}
if (ret == LDB_ERR_NO_SUCH_OBJECT) {
/*
* the record has disappeared?
* yes, this can happen
*/
talloc_free(rec);
continue;
}
if (ret != LDB_SUCCESS) {
/* an internal error */
TALLOC_FREE(rec);
TALLOC_FREE(list);
return LDB_ERR_OPERATIONS_ERROR;
}
/*
* The DN we are trying to add to the DB and index
* is already here, so we must deny the addition
*/
if (ldb_dn_compare(msg->dn, rec->dn) == 0) {
TALLOC_FREE(rec);
TALLOC_FREE(list);
return LDB_ERR_CONSTRAINT_VIOLATION;
}
}
}
/*
* Check for duplicates in unique indexes
*
* We don't need to do a loop test like the @IDXDN case
* above as we have a ban on long unique index values
* at the start of this function.
*/
if (list->count > 0 &&
((a != NULL
@ -2253,6 +2491,7 @@ int ltdb_index_del_value(struct ldb_module *module,
unsigned int j;
struct dn_list *list;
struct ldb_dn *dn = msg->dn;
enum key_truncation truncation = KEY_NOT_TRUNCATED;
ldb = ldb_module_get_ctx(module);
@ -2266,7 +2505,14 @@ int ltdb_index_del_value(struct ldb_module *module,
}
dn_key = ltdb_index_key(ldb, ltdb,
el->name, &el->values[v_idx], NULL);
el->name, &el->values[v_idx],
NULL, &truncation);
/*
* We ignore key truncation in ltdb_index_add1() so
* match that by ignoring it here as well
*
* Multiple values are legitimate and accepted
*/
if (!dn_key) {
return LDB_ERR_OPERATIONS_ERROR;
}
@ -2290,6 +2536,9 @@ int ltdb_index_del_value(struct ldb_module *module,
return ret;
}
/*
* Find one of the values matching this message to remove
*/
i = ltdb_dn_list_find_msg(ltdb, list, msg);
if (i == -1) {
/* nothing to delete */