1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-11 05:18:09 +03:00

ldb: <= and >= indexed searching

Full implementation of <= and >= indexed searching using iterate_range
backend operation.  Adds index_format_fn to ldb_schema_syntax so
requires an ABI version bump.  The function must be provided for any
type for which <= and >= indexing is required, and must return a
lexicographically ordered canonicalization of a value.  This causes
index entries to be written in correct order to the database, so
iterate_range on the index DNs can be used.

ldb_kv_index_key is modified to return an index DN with attribute name
but without value if an empty value is provided.  This is needed for
constructing keys that match the beginning or end of an index DN range.

Pair-programmed-with: Garming Sam <garming@catalyst.net.nz>

Signed-off-by: Aaron Haslett <aaronhaslett@catalyst.net.nz>
Signed-off-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
Aaron Haslett 2019-03-04 19:06:31 +13:00 committed by Andrew Bartlett
parent 1b5df44331
commit 9b3021b8d6
2 changed files with 326 additions and 17 deletions

View File

@ -376,7 +376,10 @@ typedef int (*ldb_attr_operator_t)(struct ldb_context *, enum ldb_parse_op opera
ldif_read_fn -> convert from ldif to binary format
ldif_write_fn -> convert from binary to ldif format
canonicalise_fn -> canonicalise a value, for use by indexing and dn construction
index_form_fn -> get lexicographically sorted format for index
comparison_fn -> compare two values
operator_fn -> override function for optimizing out unnecessary
calls to canonicalise_fn and comparison_fn
*/
struct ldb_schema_syntax {
@ -384,6 +387,7 @@ struct ldb_schema_syntax {
ldb_attr_handler_t ldif_read_fn;
ldb_attr_handler_t ldif_write_fn;
ldb_attr_handler_t canonicalise_fn;
ldb_attr_handler_t index_format_fn;
ldb_attr_comparison_t comparison_fn;
ldb_attr_operator_t operator_fn;
};

View File

@ -884,6 +884,19 @@ static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
const size_t min_data = 1;
const size_t min_key_length = additional_key_length
+ indx_len + num_separators + min_data;
struct ldb_val empty;
/*
* Accept a NULL value as a request for a key with no value. This is
* different from passing an empty value, which might be given
* significance by some canonicalise functions.
*/
bool empty_val = value == NULL;
if (empty_val) {
empty.length = 0;
empty.data = discard_const_p(unsigned char, "");
value = &empty;
}
if (attr[0] == '@') {
attr_for_dn = attr;
@ -903,21 +916,33 @@ static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
if (ap) {
*ap = a;
}
r = a->syntax->canonicalise_fn(ldb, ldb, value, &v);
if (r != LDB_SUCCESS) {
const char *errstr = ldb_errstring(ldb);
/* canonicalisation can be refused. For
example, a attribute that takes wildcards
will refuse to canonicalise if the value
contains a wildcard */
ldb_asprintf_errstring(ldb,
"Failed to create index "
"key for attribute '%s':%s%s%s",
attr, ldb_strerror(r),
(errstr?":":""),
(errstr?errstr:""));
talloc_free(attr_folded);
return NULL;
if (empty_val) {
v = *value;
} else {
ldb_attr_handler_t fn;
if (a->syntax->index_format_fn) {
fn = a->syntax->index_format_fn;
} else {
fn = a->syntax->canonicalise_fn;
}
r = fn(ldb, ldb, value, &v);
if (r != LDB_SUCCESS) {
const char *errstr = ldb_errstring(ldb);
/* canonicalisation can be refused. For
example, a attribute that takes wildcards
will refuse to canonicalise if the value
contains a wildcard */
ldb_asprintf_errstring(ldb,
"Failed to create "
"index key for "
"attribute '%s':%s%s%s",
attr, ldb_strerror(r),
(errstr?":":""),
(errstr?errstr:""));
talloc_free(attr_folded);
return NULL;
}
}
}
attr_len = strlen(attr_for_dn);
@ -1034,7 +1059,7 @@ static struct ldb_dn *ldb_kv_index_key(struct ldb_context *ldb,
}
}
if (v.data != value->data) {
if (v.data != value->data && !empty_val) {
talloc_free(v.data);
}
talloc_free(attr_folded);
@ -1615,6 +1640,280 @@ static int ldb_kv_index_dn_and(struct ldb_module *module,
return LDB_SUCCESS;
}
struct ldb_kv_ordered_index_context {
struct ldb_module *module;
int error;
struct dn_list *dn_list;
};
static int traverse_range_index(struct ldb_kv_private *ldb_kv,
struct ldb_val key,
struct ldb_val data,
void *state)
{
struct ldb_context *ldb;
struct ldb_kv_ordered_index_context *ctx =
(struct ldb_kv_ordered_index_context *)state;
struct ldb_module *module = ctx->module;
struct ldb_message_element *el = NULL;
struct ldb_message *msg = NULL;
int version;
size_t dn_array_size, additional_length;
unsigned int i;
ldb = ldb_module_get_ctx(module);
msg = ldb_msg_new(module);
ctx->error = ldb_unpack_data_only_attr_list_flags(ldb, &data,
msg,
NULL, 0,
LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC |
LDB_UNPACK_DATA_FLAG_NO_DN,
NULL);
if (ctx->error != LDB_SUCCESS) {
talloc_free(msg);
return ctx->error;
}
el = ldb_msg_find_element(msg, LDB_KV_IDX);
if (!el) {
talloc_free(msg);
return LDB_SUCCESS;
}
version = ldb_msg_find_attr_as_int(msg, LDB_KV_IDXVERSION, 0);
/*
* we avoid copying the strings by stealing the list. We have
* to steal msg onto el->values (which looks odd) because we
* asked for the memory to be allocated on msg, not on each
* value with LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC above
*/
if (version != LDB_KV_GUID_INDEXING_VERSION) {
/* This is quite likely during the DB startup
on first upgrade to using a GUID index */
ldb_debug_set(ldb_module_get_ctx(module),
LDB_DEBUG_ERROR, __location__
": Wrong GUID index version %d expected %d",
version, LDB_KV_GUID_INDEXING_VERSION);
talloc_free(msg);
ctx->error = LDB_ERR_OPERATIONS_ERROR;
return ctx->error;
}
if (el->num_values == 0) {
talloc_free(msg);
ctx->error = LDB_ERR_OPERATIONS_ERROR;
return ctx->error;
}
if ((el->values[0].length % LDB_KV_GUID_SIZE) != 0
|| el->values[0].length == 0) {
talloc_free(msg);
ctx->error = LDB_ERR_OPERATIONS_ERROR;
return ctx->error;
}
dn_array_size = talloc_array_length(ctx->dn_list->dn);
additional_length = el->values[0].length / LDB_KV_GUID_SIZE;
if (ctx->dn_list->count + additional_length < ctx->dn_list->count) {
talloc_free(msg);
ctx->error = LDB_ERR_OPERATIONS_ERROR;
return ctx->error;
}
if ((ctx->dn_list->count + additional_length) >= dn_array_size) {
size_t new_array_length;
if (dn_array_size * 2 < dn_array_size) {
talloc_free(msg);
ctx->error = LDB_ERR_OPERATIONS_ERROR;
return ctx->error;
}
new_array_length = MAX(ctx->dn_list->count + additional_length,
dn_array_size * 2);
ctx->dn_list->dn = talloc_realloc(ctx->dn_list,
ctx->dn_list->dn,
struct ldb_val,
new_array_length);
}
if (ctx->dn_list->dn == NULL) {
talloc_free(msg);
ctx->error = LDB_ERR_OPERATIONS_ERROR;
return ctx->error;
}
/*
* The actual data is on msg, due to
* LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC
*/
talloc_steal(ctx->dn_list->dn, msg);
for (i = 0; i < additional_length; i++) {
ctx->dn_list->dn[i + ctx->dn_list->count].data
= &el->values[0].data[i * LDB_KV_GUID_SIZE];
ctx->dn_list->dn[i + ctx->dn_list->count].length = LDB_KV_GUID_SIZE;
}
ctx->dn_list->count += additional_length;
talloc_free(msg->elements);
return LDB_SUCCESS;
}
static int ldb_kv_index_dn_ordered(struct ldb_module *module,
struct ldb_kv_private *ldb_kv,
const struct ldb_parse_tree *tree,
struct dn_list *list, bool ascending)
{
enum key_truncation truncation = KEY_NOT_TRUNCATED;
struct ldb_context *ldb = ldb_module_get_ctx(module);
struct ldb_val ldb_key = { 0 }, ldb_key2 = { 0 };
struct ldb_val start_key, end_key;
struct ldb_dn *key_dn = NULL;
const struct ldb_schema_attribute *a = NULL;
struct ldb_kv_ordered_index_context ctx;
int ret;
TALLOC_CTX *tmp_ctx = talloc_new(NULL);
if (!ldb_kv_is_indexed(module, ldb_kv, tree->u.comparison.attr)) {
return LDB_ERR_OPERATIONS_ERROR;
}
if (ldb_kv->cache->GUID_index_attribute == NULL) {
return LDB_ERR_OPERATIONS_ERROR;
}
/* bail out if we're in a transaction, full search instead. */
if (ldb_kv->kv_ops->transaction_active(ldb_kv)) {
return LDB_ERR_OPERATIONS_ERROR;
}
if (ldb_kv->disallow_dn_filter &&
(ldb_attr_cmp(tree->u.comparison.attr, "dn") == 0)) {
/* in AD mode we do not support "(dn=...)" search filters */
list->dn = NULL;
list->count = 0;
return LDB_SUCCESS;
}
if (tree->u.comparison.attr[0] == '@') {
/* Do not allow a indexed search against an @ */
list->dn = NULL;
list->count = 0;
return LDB_SUCCESS;
}
a = ldb_schema_attribute_by_name(ldb, tree->u.comparison.attr);
/*
* If there's no index format function defined for this attr, then
* the lexicographic order in the database doesn't correspond to the
* attr's ordering, so we can't use the iterate_range op.
*/
if (a->syntax->index_format_fn == NULL) {
return LDB_ERR_OPERATIONS_ERROR;
}
key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
&tree->u.comparison.value,
NULL, &truncation);
if (!key_dn) {
return LDB_ERR_OPERATIONS_ERROR;
} else if (truncation == KEY_TRUNCATED) {
ldb_debug(ldb, LDB_DEBUG_WARNING,
__location__
": ordered index violation: key dn truncated: %s\n",
ldb_dn_get_linearized(key_dn));
return LDB_ERR_OPERATIONS_ERROR;
}
ldb_key = ldb_kv_key_dn(module, tmp_ctx, key_dn);
talloc_free(key_dn);
if (ldb_key.data == NULL) {
return LDB_ERR_OPERATIONS_ERROR;
}
key_dn = ldb_kv_index_key(ldb, ldb_kv, tree->u.comparison.attr,
NULL, NULL, &truncation);
if (!key_dn) {
return LDB_ERR_OPERATIONS_ERROR;
} else if (truncation == KEY_TRUNCATED) {
ldb_debug(ldb, LDB_DEBUG_WARNING,
__location__
": ordered index violation: key dn truncated: %s\n",
ldb_dn_get_linearized(key_dn));
return LDB_ERR_OPERATIONS_ERROR;
}
ldb_key2 = ldb_kv_key_dn(module, tmp_ctx, key_dn);
talloc_free(key_dn);
if (ldb_key2.data == NULL) {
return LDB_ERR_OPERATIONS_ERROR;
}
if (ascending) {
ldb_key2.data[ldb_key2.length-2]++;
start_key = ldb_key;
end_key = ldb_key2;
} else {
start_key = ldb_key2;
end_key = ldb_key;
}
ctx.module = module;
ctx.error = 0;
ctx.dn_list = list;
ctx.dn_list->count = 0;
ctx.dn_list->dn = talloc_zero_array(ctx.dn_list, struct ldb_val, 2);
ret = ldb_kv->kv_ops->iterate_range(ldb_kv, start_key, end_key,
traverse_range_index, &ctx);
if (ret != LDB_SUCCESS || ctx.error != LDB_SUCCESS) {
return LDB_ERR_OPERATIONS_ERROR;
}
TYPESAFE_QSORT(ctx.dn_list->dn, ctx.dn_list->count,
ldb_val_equal_exact_for_qsort);
talloc_free(tmp_ctx);
return LDB_SUCCESS;
}
static int ldb_kv_index_dn_greater(struct ldb_module *module,
struct ldb_kv_private *ldb_kv,
const struct ldb_parse_tree *tree,
struct dn_list *list)
{
return ldb_kv_index_dn_ordered(module,
ldb_kv,
tree,
list, true);
}
static int ldb_kv_index_dn_less(struct ldb_module *module,
struct ldb_kv_private *ldb_kv,
const struct ldb_parse_tree *tree,
struct dn_list *list)
{
return ldb_kv_index_dn_ordered(module,
ldb_kv,
tree,
list, false);
}
/*
return a list of matching objects using a one-level index
*/
@ -1760,9 +2059,15 @@ static int ldb_kv_index_dn(struct ldb_module *module,
ret = ldb_kv_index_dn_leaf(module, ldb_kv, tree, list);
break;
case LDB_OP_SUBSTRING:
case LDB_OP_GREATER:
ret = ldb_kv_index_dn_greater(module, ldb_kv, tree, list);
break;
case LDB_OP_LESS:
ret = ldb_kv_index_dn_less(module, ldb_kv, tree, list);
break;
case LDB_OP_SUBSTRING:
case LDB_OP_PRESENT:
case LDB_OP_APPROX:
case LDB_OP_EXTENDED: