From 198471f9edfb9da2ee5b54e60a46d208f58ca2e4 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet@samba.org>
Date: Fri, 26 Aug 2016 09:58:38 +1200
Subject: [PATCH] ldb: Avoid cost of talloc_free() for unmatched messages

Instead, we pay the cost of allocating a copy of the whole message once
and we pay the cost of allocating a "struct ldb_val" that will not be used
for each element in that message.

This differes from the approach of ldb_unpack_data_only_attr_list()
in that we need not allocate each value for a message that we do not
return, so is more efficient for large multi-valued attributes and
un-indexed or poorly indexed searches

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
---
 lib/ldb/ldb_tdb/ldb_index.c  |  10 ++-
 lib/ldb/ldb_tdb/ldb_search.c | 164 +++++++++++++++++++++++++----------
 lib/ldb/ldb_tdb/ldb_tdb.h    |   4 +-
 3 files changed, 125 insertions(+), 53 deletions(-)

diff --git a/lib/ldb/ldb_tdb/ldb_index.c b/lib/ldb/ldb_tdb/ldb_index.c
index 392b4a71d6c..6c238562970 100644
--- a/lib/ldb/ldb_tdb/ldb_index.c
+++ b/lib/ldb/ldb_tdb/ldb_index.c
@@ -931,6 +931,7 @@ static int ltdb_index_filter(const struct dn_list *dn_list,
 {
 	struct ldb_context *ldb;
 	struct ldb_message *msg;
+	struct ldb_message *filtered_msg;
 	unsigned int i;
 
 	ldb = ldb_module_get_ctx(ac->module);
@@ -951,7 +952,7 @@ static int ltdb_index_filter(const struct dn_list *dn_list,
 			return LDB_ERR_OPERATIONS_ERROR;
 		}
 
-		ret = ltdb_search_dn1(ac->module, dn, msg, 0);
+		ret = ltdb_search_dn1(ac->module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
 		talloc_free(dn);
 		if (ret == LDB_ERR_NO_SUCH_OBJECT) {
 			/* the record has disappeared? yes, this can happen */
@@ -977,14 +978,15 @@ static int ltdb_index_filter(const struct dn_list *dn_list,
 		}
 
 		/* filter the attributes that the user wants */
-		ret = ltdb_filter_attrs(msg, ac->attrs);
+		ret = ltdb_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
+
+		talloc_free(msg);
 
 		if (ret == -1) {
-			talloc_free(msg);
 			return LDB_ERR_OPERATIONS_ERROR;
 		}
 
-		ret = ldb_module_send_entry(ac->req, msg, NULL);
+		ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
 		if (ret != LDB_SUCCESS) {
 			/* Regardless of success or failure, the msg
 			 * is the callbacks responsiblity, and should
diff --git a/lib/ldb/ldb_tdb/ldb_search.c b/lib/ldb/ldb_tdb/ldb_search.c
index 133e5d43f62..8dc93d68ace 100644
--- a/lib/ldb/ldb_tdb/ldb_search.c
+++ b/lib/ldb/ldb_tdb/ldb_search.c
@@ -385,81 +385,144 @@ int ltdb_add_attr_results(struct ldb_module *module,
 
 /*
   filter the specified list of attributes from a message
-  removing not requested attrs.
+  removing not requested attrs from the new message constructed.
+
+  The reason this makes a new message is that the old one may not be
+  individually allocated, which is what our callers expect.
+
  */
-int ltdb_filter_attrs(struct ldb_message *msg, const char * const *attrs)
+int ltdb_filter_attrs(TALLOC_CTX *mem_ctx,
+		      const struct ldb_message *msg, const char * const *attrs,
+		      struct ldb_message **filtered_msg)
 {
 	unsigned int i;
-	int keep_all = 0;
-	struct ldb_message_element *el2;
+	bool keep_all = false;
+	bool add_dn = false;
 	uint32_t num_elements;
+	uint32_t elements_size;
+	struct ldb_message *msg2;
+
+	msg2 = ldb_msg_new(mem_ctx);
+	if (msg2 == NULL) {
+		goto failed;
+	}
+
+	msg2->dn = ldb_dn_copy(msg2, msg->dn);
+	if (msg2->dn == NULL) {
+		goto failed;
+	}
 
 	if (attrs) {
 		/* check for special attrs */
 		for (i = 0; attrs[i]; i++) {
-			if (strcmp(attrs[i], "*") == 0) {
-				keep_all = 1;
+			int cmp = strcmp(attrs[i], "*");
+			if (cmp == 0) {
+				keep_all = true;
 				break;
 			}
-
-			if (ldb_attr_cmp(attrs[i], "distinguishedName") == 0) {
-				if (msg_add_distinguished_name(msg) != 0) {
-					return -1;
-				}
+			cmp = ldb_attr_cmp(attrs[i], "distinguishedName");
+			if (cmp == 0) {
+				add_dn = true;
 			}
 		}
 	} else {
-		keep_all = 1;
-	}
-	
-	if (keep_all) {
-		if (msg_add_distinguished_name(msg) != 0) {
-			return -1;
-		}
-		return 0;
+		keep_all = true;
 	}
 
-	el2 = talloc_array(msg, struct ldb_message_element, msg->num_elements);
-	if (el2 == NULL) {
-		return -1;
+	if (keep_all) {
+		add_dn = true;
+		elements_size = msg->num_elements + 1;
+
+	/* Shortcuts for the simple cases */
+	} else if (add_dn && i == 1) {
+		if (msg_add_distinguished_name(msg2) != 0) {
+			return -1;
+		}
+		*filtered_msg = msg2;
+		return 0;
+	} else if (i == 0) {
+		*filtered_msg = msg2;
+		return 0;
+
+	/* Otherwise we are copying at most as many element as we have attributes */
+	} else {
+		elements_size = i;
 	}
+
+	msg2->elements = talloc_array(msg2, struct ldb_message_element,
+				      elements_size);
+	if (msg2->elements == NULL) goto failed;
+
 	num_elements = 0;
 
 	for (i = 0; i < msg->num_elements; i++) {
+		struct ldb_message_element *el = &msg->elements[i];
+		struct ldb_message_element *el2 = &msg2->elements[num_elements];
 		unsigned int j;
-		int found = 0;
-		
-		for (j = 0; attrs[j]; j++) {
-			if (ldb_attr_cmp(msg->elements[i].name, attrs[j]) == 0) {
-				found = 1;
-				break;
+
+		if (keep_all == false) {
+			bool found = false;
+			for (j = 0; attrs[j]; j++) {
+				int cmp = ldb_attr_cmp(el->name, attrs[j]);
+				if (cmp == 0) {
+					found = true;
+					break;
+				}
+			}
+			if (found == false) {
+				continue;
 			}
 		}
+		*el2 = *el;
+		el2->name = talloc_strdup(msg2->elements, el->name);
+		if (el2->name == NULL) {
+			goto failed;
+		}
+		el2->values = talloc_array(msg2->elements, struct ldb_val, el->num_values);
+		if (el2->values == NULL) {
+			goto failed;
+		}
+		for (j=0;j<el->num_values;j++) {
+			el2->values[j] = ldb_val_dup(el2->values, &el->values[j]);
+			if (el2->values[j].data == NULL && el->values[j].length != 0) {
+				goto failed;
+			}
+		}
+		num_elements++;
 
-		if (found) {
-			el2[num_elements] = msg->elements[i];
-			talloc_steal(el2, el2[num_elements].name);
-			talloc_steal(el2, el2[num_elements].values);
-			num_elements++;
+		/* Pidginhole principle: we can't have more elements
+		 * than the number of attributes if they are unique in
+		 * the DB */
+		if (num_elements > elements_size) {
+			goto failed;
 		}
 	}
 
-	talloc_free(msg->elements);
+	msg2->num_elements = num_elements;
 
-	if (num_elements > 0) {
-		msg->elements = talloc_realloc(msg, el2, struct ldb_message_element,
-					       num_elements);
+	if (add_dn) {
+		if (msg_add_distinguished_name(msg2) != 0) {
+			return -1;
+		}
+	}
+
+	if (msg2->num_elements > 0) {
+		msg2->elements = talloc_realloc(msg2, msg2->elements,
+						struct ldb_message_element,
+						msg2->num_elements);
+		if (msg2->elements == NULL) {
+			return -1;
+		}
 	} else {
-		msg->elements = talloc_array(msg, struct ldb_message_element, 0);
-		talloc_free(el2);
-	}
-	if (msg->elements == NULL) {
-		return -1;
+		talloc_free(msg2->elements);
+		msg2->elements = NULL;
 	}
 
-	msg->num_elements = num_elements;
+	*filtered_msg = msg2;
 
 	return 0;
+failed:
+	return -1;
 }
 
 /*
@@ -469,13 +532,14 @@ static int search_func(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, voi
 {
 	struct ldb_context *ldb;
 	struct ltdb_context *ac;
-	struct ldb_message *msg;
+	struct ldb_message *msg, *filtered_msg;
 	const struct ldb_val val = {
 		.data = data.dptr,
 		.length = data.dsize,
 	};
 	int ret;
 	bool matched;
+	unsigned int nb_elements_in_db;
 
 	ac = talloc_get_type(state, struct ltdb_context);
 	ldb = ldb_module_get_ctx(ac->module);
@@ -492,7 +556,11 @@ static int search_func(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, voi
 	}
 
 	/* unpack the record */
-	ret = ldb_unpack_data(ldb, &val, msg);
+	ret = ldb_unpack_data_only_attr_list_flags(ldb, &val,
+						   msg,
+						   NULL, 0,
+						   LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC,
+						   &nb_elements_in_db);
 	if (ret == -1) {
 		talloc_free(msg);
 		ac->error = LDB_ERR_OPERATIONS_ERROR;
@@ -523,15 +591,15 @@ static int search_func(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, voi
 	}
 
 	/* filter the attributes that the user wants */
-	ret = ltdb_filter_attrs(msg, ac->attrs);
+	ret = ltdb_filter_attrs(ac, msg, ac->attrs, &filtered_msg);
+	talloc_free(msg);
 
 	if (ret == -1) {
-		talloc_free(msg);
 		ac->error = LDB_ERR_OPERATIONS_ERROR;
 		return -1;
 	}
 
-	ret = ldb_module_send_entry(ac->req, msg, NULL);
+	ret = ldb_module_send_entry(ac->req, filtered_msg, NULL);
 	if (ret != LDB_SUCCESS) {
 		ac->request_terminated = true;
 		/* the callback failed, abort the operation */
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.h b/lib/ldb/ldb_tdb/ldb_tdb.h
index 7390a047b20..b5b78a99903 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.h
+++ b/lib/ldb/ldb_tdb/ldb_tdb.h
@@ -109,7 +109,9 @@ int ltdb_add_attr_results(struct ldb_module *module,
 			  const char * const attrs[], 
 			  unsigned int *count, 
 			  struct ldb_message ***res);
-int ltdb_filter_attrs(struct ldb_message *msg, const char * const *attrs);
+int ltdb_filter_attrs(TALLOC_CTX *mem_ctx,
+		      const struct ldb_message *msg, const char * const *attrs,
+		      struct ldb_message **filtered_msg);
 int ltdb_search(struct ltdb_context *ctx);
 
 /* The following definitions come from lib/ldb/ldb_tdb/ldb_tdb.c  */