1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-08 21:18:16 +03:00
samba-mirror/lib/ldb/ldb_tdb/ldb_tdb.c
Garming Sam a76d286537 ldb_kv: Avoid memdup of database records in the case of base searches
This makes LDAP bind significantly faster in the case of having many
members, due to large size of these records (with tens of thousands of
member links). During the nested group calculation, you are only
interested in memberOf not the member links.

(We add a bit-field to determine whether or not the backend actually
supports pointing into database memory. For some reason TDB pointers
aren't stable, so for now we set this option just on LMDB backends.)

Signed-off-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
2019-04-11 04:17:11 +00:00

557 lines
14 KiB
C

/*
ldb database library
Copyright (C) Andrew Tridgell 2004
Copyright (C) Stefan Metzmacher 2004
Copyright (C) Simo Sorce 2006-2008
Copyright (C) Matthias Dieter Wallnöfer 2009-2010
** NOTE! The following LGPL license applies to the ldb
** library. This does NOT imply that all of Samba is released
** under the LGPL
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 3 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/
/*
* Name: ldb_tdb
*
* Component: ldb tdb backend
*
* Description: core functions for tdb backend
*
* Author: Andrew Tridgell
* Author: Stefan Metzmacher
*
* Modifications:
*
* - description: make the module use asynchronous calls
* date: Feb 2006
* Author: Simo Sorce
*
* - description: make it possible to use event contexts
* date: Jan 2008
* Author: Simo Sorce
*
* - description: fix up memory leaks and small bugs
* date: Oct 2009
* Author: Matthias Dieter Wallnöfer
*/
#include "ldb_tdb.h"
#include "ldb_private.h"
#include "../ldb_key_value/ldb_kv.h"
#include <tdb.h>
/*
lock the database for read - use by ltdb_search and ltdb_sequence_number
*/
static int ltdb_lock_read(struct ldb_module *module)
{
void *data = ldb_module_get_private(module);
struct ldb_kv_private *ldb_kv =
talloc_get_type(data, struct ldb_kv_private);
int tdb_ret = 0;
int ret;
pid_t pid = getpid();
if (ldb_kv->pid != pid) {
ldb_asprintf_errstring(ldb_module_get_ctx(module),
__location__
": Reusing ldb opend by pid %d in "
"process %d\n",
ldb_kv->pid,
pid);
return LDB_ERR_PROTOCOL_ERROR;
}
if (tdb_transaction_active(ldb_kv->tdb) == false &&
ldb_kv->read_lock_count == 0) {
tdb_ret = tdb_lockall_read(ldb_kv->tdb);
}
if (tdb_ret == 0) {
ldb_kv->read_lock_count++;
return LDB_SUCCESS;
}
ret = ltdb_err_map(tdb_error(ldb_kv->tdb));
if (ret == LDB_SUCCESS) {
ret = LDB_ERR_OPERATIONS_ERROR;
}
ldb_debug_set(ldb_module_get_ctx(module),
LDB_DEBUG_FATAL,
"Failure during ltdb_lock_read(): %s -> %s",
tdb_errorstr(ldb_kv->tdb),
ldb_strerror(ret));
return ret;
}
/*
unlock the database after a ltdb_lock_read()
*/
static int ltdb_unlock_read(struct ldb_module *module)
{
void *data = ldb_module_get_private(module);
struct ldb_kv_private *ldb_kv =
talloc_get_type(data, struct ldb_kv_private);
pid_t pid = getpid();
if (ldb_kv->pid != pid) {
ldb_asprintf_errstring(ldb_module_get_ctx(module),
__location__
": Reusing ldb opend by pid %d in "
"process %d\n",
ldb_kv->pid,
pid);
return LDB_ERR_PROTOCOL_ERROR;
}
if (!tdb_transaction_active(ldb_kv->tdb) &&
ldb_kv->read_lock_count == 1) {
tdb_unlockall_read(ldb_kv->tdb);
ldb_kv->read_lock_count--;
return 0;
}
ldb_kv->read_lock_count--;
return 0;
}
static int ltdb_store(struct ldb_kv_private *ldb_kv,
struct ldb_val ldb_key,
struct ldb_val ldb_data,
int flags)
{
TDB_DATA key = {
.dptr = ldb_key.data,
.dsize = ldb_key.length
};
TDB_DATA data = {
.dptr = ldb_data.data,
.dsize = ldb_data.length
};
bool transaction_active = tdb_transaction_active(ldb_kv->tdb);
if (transaction_active == false){
return LDB_ERR_PROTOCOL_ERROR;
}
return tdb_store(ldb_kv->tdb, key, data, flags);
}
static int ltdb_error(struct ldb_kv_private *ldb_kv)
{
return ltdb_err_map(tdb_error(ldb_kv->tdb));
}
static const char *ltdb_errorstr(struct ldb_kv_private *ldb_kv)
{
return tdb_errorstr(ldb_kv->tdb);
}
static int ltdb_delete(struct ldb_kv_private *ldb_kv, struct ldb_val ldb_key)
{
TDB_DATA tdb_key = {
.dptr = ldb_key.data,
.dsize = ldb_key.length
};
bool transaction_active = tdb_transaction_active(ldb_kv->tdb);
if (transaction_active == false){
return LDB_ERR_PROTOCOL_ERROR;
}
return tdb_delete(ldb_kv->tdb, tdb_key);
}
static int ltdb_transaction_start(struct ldb_kv_private *ldb_kv)
{
pid_t pid = getpid();
if (ldb_kv->pid != pid) {
ldb_asprintf_errstring(ldb_module_get_ctx(ldb_kv->module),
__location__
": Reusing ldb opend by pid %d in "
"process %d\n",
ldb_kv->pid,
pid);
return LDB_ERR_PROTOCOL_ERROR;
}
return tdb_transaction_start(ldb_kv->tdb);
}
static int ltdb_transaction_cancel(struct ldb_kv_private *ldb_kv)
{
pid_t pid = getpid();
if (ldb_kv->pid != pid) {
ldb_asprintf_errstring(ldb_module_get_ctx(ldb_kv->module),
__location__
": Reusing ldb opend by pid %d in "
"process %d\n",
ldb_kv->pid,
pid);
return LDB_ERR_PROTOCOL_ERROR;
}
return tdb_transaction_cancel(ldb_kv->tdb);
}
static int ltdb_transaction_prepare_commit(struct ldb_kv_private *ldb_kv)
{
pid_t pid = getpid();
if (ldb_kv->pid != pid) {
ldb_asprintf_errstring(ldb_module_get_ctx(ldb_kv->module),
__location__
": Reusing ldb opend by pid %d in "
"process %d\n",
ldb_kv->pid,
pid);
return LDB_ERR_PROTOCOL_ERROR;
}
return tdb_transaction_prepare_commit(ldb_kv->tdb);
}
static int ltdb_transaction_commit(struct ldb_kv_private *ldb_kv)
{
pid_t pid = getpid();
if (ldb_kv->pid != pid) {
ldb_asprintf_errstring(ldb_module_get_ctx(ldb_kv->module),
__location__
": Reusing ldb opend by pid %d in "
"process %d\n",
ldb_kv->pid,
pid);
return LDB_ERR_PROTOCOL_ERROR;
}
return tdb_transaction_commit(ldb_kv->tdb);
}
struct kv_ctx {
ldb_kv_traverse_fn kv_traverse_fn;
void *ctx;
struct ldb_kv_private *ldb_kv;
int (*parser)(struct ldb_val key,
struct ldb_val data,
void *private_data);
};
static int ltdb_traverse_fn_wrapper(struct tdb_context *tdb,
TDB_DATA tdb_key,
TDB_DATA tdb_data,
void *ctx)
{
struct kv_ctx *kv_ctx = ctx;
struct ldb_val key = {
.length = tdb_key.dsize,
.data = tdb_key.dptr,
};
struct ldb_val data = {
.length = tdb_data.dsize,
.data = tdb_data.dptr,
};
return kv_ctx->kv_traverse_fn(kv_ctx->ldb_kv, key, data, kv_ctx->ctx);
}
static int ltdb_traverse_fn(struct ldb_kv_private *ldb_kv,
ldb_kv_traverse_fn fn,
void *ctx)
{
struct kv_ctx kv_ctx = {
.kv_traverse_fn = fn, .ctx = ctx, .ldb_kv = ldb_kv};
if (tdb_transaction_active(ldb_kv->tdb)) {
return tdb_traverse(
ldb_kv->tdb, ltdb_traverse_fn_wrapper, &kv_ctx);
} else {
return tdb_traverse_read(
ldb_kv->tdb, ltdb_traverse_fn_wrapper, &kv_ctx);
}
}
static int ltdb_update_in_iterate(struct ldb_kv_private *ldb_kv,
struct ldb_val ldb_key,
struct ldb_val ldb_key2,
struct ldb_val ldb_data,
void *state)
{
int tdb_ret;
struct ldb_context *ldb;
struct ldb_kv_reindex_context *ctx =
(struct ldb_kv_reindex_context *)state;
struct ldb_module *module = ctx->module;
TDB_DATA key = {
.dptr = ldb_key.data,
.dsize = ldb_key.length
};
TDB_DATA key2 = {
.dptr = ldb_key2.data,
.dsize = ldb_key2.length
};
TDB_DATA data = {
.dptr = ldb_data.data,
.dsize = ldb_data.length
};
ldb = ldb_module_get_ctx(module);
tdb_ret = tdb_delete(ldb_kv->tdb, key);
if (tdb_ret != 0) {
ldb_debug(ldb,
LDB_DEBUG_ERROR,
"Failed to delete %*.*s "
"for rekey as %*.*s: %s",
(int)key.dsize,
(int)key.dsize,
(const char *)key.dptr,
(int)key2.dsize,
(int)key2.dsize,
(const char *)key.dptr,
tdb_errorstr(ldb_kv->tdb));
ctx->error = ltdb_err_map(tdb_error(ldb_kv->tdb));
return -1;
}
tdb_ret = tdb_store(ldb_kv->tdb, key2, data, 0);
if (tdb_ret != 0) {
ldb_debug(ldb,
LDB_DEBUG_ERROR,
"Failed to rekey %*.*s as %*.*s: %s",
(int)key.dsize,
(int)key.dsize,
(const char *)key.dptr,
(int)key2.dsize,
(int)key2.dsize,
(const char *)key.dptr,
tdb_errorstr(ldb_kv->tdb));
ctx->error = ltdb_err_map(tdb_error(ldb_kv->tdb));
return -1;
}
return tdb_ret;
}
static int ltdb_parse_record_wrapper(TDB_DATA tdb_key,
TDB_DATA tdb_data,
void *ctx)
{
struct kv_ctx *kv_ctx = ctx;
struct ldb_val key = {
.length = tdb_key.dsize,
.data = tdb_key.dptr,
};
struct ldb_val data = {
.length = tdb_data.dsize,
.data = tdb_data.dptr,
};
return kv_ctx->parser(key, data, kv_ctx->ctx);
}
static int ltdb_parse_record(struct ldb_kv_private *ldb_kv,
struct ldb_val ldb_key,
int (*parser)(struct ldb_val key,
struct ldb_val data,
void *private_data),
void *ctx)
{
struct kv_ctx kv_ctx = {.parser = parser, .ctx = ctx, .ldb_kv = ldb_kv};
TDB_DATA key = {
.dptr = ldb_key.data,
.dsize = ldb_key.length
};
int ret;
if (tdb_transaction_active(ldb_kv->tdb) == false &&
ldb_kv->read_lock_count == 0) {
return LDB_ERR_PROTOCOL_ERROR;
}
ret = tdb_parse_record(
ldb_kv->tdb, key, ltdb_parse_record_wrapper, &kv_ctx);
if (ret == 0) {
return LDB_SUCCESS;
}
return ltdb_err_map(tdb_error(ldb_kv->tdb));
}
static int ltdb_iterate_range(struct ldb_kv_private *ldb_kv,
struct ldb_val start_key,
struct ldb_val end_key,
ldb_kv_traverse_fn fn,
void *ctx)
{
/*
* We do not implement this operation because we do not know how to
* iterate from one key to the next (in a sorted fashion).
*
* We could mimic it potentially, but it would violate boundaries of
* knowledge (data type representation).
*/
return LDB_ERR_OPERATIONS_ERROR;
}
static const char *ltdb_name(struct ldb_kv_private *ldb_kv)
{
return tdb_name(ldb_kv->tdb);
}
static bool ltdb_changed(struct ldb_kv_private *ldb_kv)
{
int seq = tdb_get_seqnum(ldb_kv->tdb);
bool has_changed = (seq != ldb_kv->tdb_seqnum);
ldb_kv->tdb_seqnum = seq;
return has_changed;
}
static bool ltdb_transaction_active(struct ldb_kv_private *ldb_kv)
{
return tdb_transaction_active(ldb_kv->tdb);
}
/*
* Get an estimate of the number of records in a tdb database.
*
* This implementation will overestimate the number of records in a sparsely
* populated database. The size estimate is only used for allocating
* an in memory tdb to cache index records during a reindex, overestimating
* the contents is acceptable, and preferable to underestimating
*/
#define RECORD_SIZE 500
static size_t ltdb_get_size(struct ldb_kv_private *ldb_kv)
{
size_t map_size = tdb_map_size(ldb_kv->tdb);
size_t size = map_size / RECORD_SIZE;
return size;
}
static const struct kv_db_ops key_value_ops = {
/* No support for any additional features */
.options = 0,
.store = ltdb_store,
.delete = ltdb_delete,
.iterate = ltdb_traverse_fn,
.update_in_iterate = ltdb_update_in_iterate,
.fetch_and_parse = ltdb_parse_record,
.iterate_range = ltdb_iterate_range,
.lock_read = ltdb_lock_read,
.unlock_read = ltdb_unlock_read,
.begin_write = ltdb_transaction_start,
.prepare_write = ltdb_transaction_prepare_commit,
.finish_write = ltdb_transaction_commit,
.abort_write = ltdb_transaction_cancel,
.error = ltdb_error,
.errorstr = ltdb_errorstr,
.name = ltdb_name,
.has_changed = ltdb_changed,
.transaction_active = ltdb_transaction_active,
.get_size = ltdb_get_size,
};
/*
connect to the database
*/
int ltdb_connect(struct ldb_context *ldb, const char *url,
unsigned int flags, const char *options[],
struct ldb_module **_module)
{
const char *path;
int tdb_flags, open_flags;
struct ldb_kv_private *ldb_kv;
/*
* We hold locks, so we must use a private event context
* on each returned handle
*/
ldb_set_require_private_event_context(ldb);
/* parse the url */
if (strchr(url, ':')) {
if (strncmp(url, "tdb://", 6) != 0) {
ldb_debug(ldb, LDB_DEBUG_ERROR,
"Invalid tdb URL '%s'", url);
return LDB_ERR_OPERATIONS_ERROR;
}
path = url+6;
} else {
path = url;
}
tdb_flags = TDB_DEFAULT | TDB_SEQNUM | TDB_DISALLOW_NESTING;
/* check for the 'nosync' option */
if (flags & LDB_FLG_NOSYNC) {
tdb_flags |= TDB_NOSYNC;
}
/* and nommap option */
if (flags & LDB_FLG_NOMMAP) {
tdb_flags |= TDB_NOMMAP;
}
ldb_kv = talloc_zero(ldb, struct ldb_kv_private);
if (!ldb_kv) {
ldb_oom(ldb);
return LDB_ERR_OPERATIONS_ERROR;
}
if (flags & LDB_FLG_RDONLY) {
/*
* This is weird, but because we can only have one tdb
* in this process, and the other one could be
* read-write, we can't use the tdb readonly. Plus a
* read only tdb prohibits the all-record lock.
*/
open_flags = O_RDWR;
ldb_kv->read_only = true;
} else if (flags & LDB_FLG_DONT_CREATE_DB) {
/*
* This is used by ldbsearch to prevent creation of the database
* if the name is wrong
*/
open_flags = O_RDWR;
} else {
/*
* This is the normal case
*/
open_flags = O_CREAT | O_RDWR;
}
ldb_kv->kv_ops = &key_value_ops;
errno = 0;
/* note that we use quite a large default hash size */
ldb_kv->tdb = ltdb_wrap_open(ldb_kv,
path,
10000,
tdb_flags,
open_flags,
ldb_get_create_perms(ldb),
ldb);
if (!ldb_kv->tdb) {
ldb_asprintf_errstring(ldb,
"Unable to open tdb '%s': %s", path, strerror(errno));
ldb_debug(ldb, LDB_DEBUG_ERROR,
"Unable to open tdb '%s': %s", path, strerror(errno));
talloc_free(ldb_kv);
if (errno == EACCES || errno == EPERM) {
return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
}
return LDB_ERR_OPERATIONS_ERROR;
}
return ldb_kv_init_store(
ldb_kv, "ldb_tdb backend", ldb, options, _module);
}