1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-13 13:18:06 +03:00
samba-mirror/lib/ldb/ldb_mdb/ldb_mdb.c
Gary Lockyer 322e42818b ldb_mdb: prevent MDB_env reuse across forks
MDB_env's may not be reused accross forks.  Check the pid that the lmdb
structure was created by, and return an error if it is being used by a
different process.

Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
2018-05-23 02:27:11 +02:00

900 lines
20 KiB
C

/*
ldb database library using mdb back end
Copyright (C) Jakub Hrozek 2014
Copyright (C) Catalyst.Net Ltd 2017
** NOTE! The following LGPL license applies to the ldb
** library. This does NOT imply that all of Samba is released
** under the LGPL
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 3 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/
#include "ldb_mdb.h"
#include "../ldb_tdb/ldb_tdb.h"
#include "include/dlinklist.h"
#define MDB_URL_PREFIX "mdb://"
#define MDB_URL_PREFIX_SIZE (sizeof(MDB_URL_PREFIX)-1)
#define LDB_MDB_MAX_KEY_LENGTH 511
#define GIGABYTE (1024*1024*1024)
int ldb_mdb_err_map(int lmdb_err)
{
switch (lmdb_err) {
case MDB_SUCCESS:
return LDB_SUCCESS;
case EIO:
return LDB_ERR_OPERATIONS_ERROR;
case EBADE:
case MDB_INCOMPATIBLE:
case MDB_CORRUPTED:
case MDB_INVALID:
return LDB_ERR_UNAVAILABLE;
case MDB_BAD_TXN:
case MDB_BAD_VALSIZE:
#ifdef MDB_BAD_DBI
case MDB_BAD_DBI:
#endif
case MDB_PANIC:
case EINVAL:
return LDB_ERR_PROTOCOL_ERROR;
case MDB_MAP_FULL:
case MDB_DBS_FULL:
case MDB_READERS_FULL:
case MDB_TLS_FULL:
case MDB_TXN_FULL:
case EAGAIN:
return LDB_ERR_BUSY;
case MDB_KEYEXIST:
return LDB_ERR_ENTRY_ALREADY_EXISTS;
case MDB_NOTFOUND:
case ENOENT:
return LDB_ERR_NO_SUCH_OBJECT;
case EACCES:
return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
default:
break;
}
return LDB_ERR_OTHER;
}
#define ldb_mdb_error(ldb, ecode) lmdb_error_at(ldb, ecode, __FILE__, __LINE__)
static int lmdb_error_at(struct ldb_context *ldb,
int ecode,
const char *file,
int line)
{
int ldb_err = ldb_mdb_err_map(ecode);
char *reason = mdb_strerror(ecode);
ldb_asprintf_errstring(ldb,
"(%d) - %s at %s:%d",
ecode,
reason,
file,
line);
return ldb_err;
}
static bool lmdb_transaction_active(struct ltdb_private *ltdb)
{
return ltdb->lmdb_private->txlist != NULL;
}
static MDB_txn *lmdb_trans_get_tx(struct lmdb_trans *ltx)
{
if (ltx == NULL) {
return NULL;
}
return ltx->tx;
}
static void trans_push(struct lmdb_private *lmdb, struct lmdb_trans *ltx)
{
if (lmdb->txlist) {
talloc_steal(lmdb->txlist, ltx);
}
DLIST_ADD(lmdb->txlist, ltx);
}
static void trans_finished(struct lmdb_private *lmdb, struct lmdb_trans *ltx)
{
DLIST_REMOVE(lmdb->txlist, ltx);
talloc_free(ltx);
}
static struct lmdb_trans *lmdb_private_trans_head(struct lmdb_private *lmdb)
{
struct lmdb_trans *ltx;
ltx = lmdb->txlist;
return ltx;
}
static MDB_txn *get_current_txn(struct lmdb_private *lmdb)
{
MDB_txn *txn = NULL;
if (lmdb->read_txn != NULL) {
return lmdb->read_txn;
}
txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
if (txn == NULL) {
int ret;
ret = mdb_txn_begin(lmdb->env, NULL, MDB_RDONLY, &txn);
if (ret != 0) {
lmdb->error = ret;
ldb_asprintf_errstring(lmdb->ldb,
"%s failed: %s\n", __FUNCTION__,
mdb_strerror(ret));
return NULL;
}
lmdb->read_txn = txn;
}
return txn;
}
static int lmdb_store(struct ltdb_private *ltdb,
struct ldb_val key,
struct ldb_val data, int flags)
{
struct lmdb_private *lmdb = ltdb->lmdb_private;
MDB_val mdb_key;
MDB_val mdb_data;
int mdb_flags;
MDB_txn *txn = NULL;
MDB_dbi dbi = 0;
if (ltdb->read_only) {
return LDB_ERR_UNWILLING_TO_PERFORM;
}
txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
if (txn == NULL) {
ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
lmdb->error = MDB_PANIC;
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
mdb_key.mv_size = key.length;
mdb_key.mv_data = key.data;
mdb_data.mv_size = data.length;
mdb_data.mv_data = data.data;
if (flags == TDB_INSERT) {
mdb_flags = MDB_NOOVERWRITE;
} else if ((flags == TDB_MODIFY)) {
/*
* Modifying a record, ensure that it exists.
* This mimics the TDB semantics
*/
MDB_val value;
lmdb->error = mdb_get(txn, dbi, &mdb_key, &value);
if (lmdb->error != MDB_SUCCESS) {
if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
mdb_txn_commit(lmdb->read_txn);
lmdb->read_txn = NULL;
}
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
mdb_flags = 0;
} else {
mdb_flags = 0;
}
lmdb->error = mdb_put(txn, dbi, &mdb_key, &mdb_data, mdb_flags);
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
return ldb_mdb_err_map(lmdb->error);
}
static int lmdb_delete(struct ltdb_private *ltdb, struct ldb_val key)
{
struct lmdb_private *lmdb = ltdb->lmdb_private;
MDB_val mdb_key;
MDB_txn *txn = NULL;
MDB_dbi dbi = 0;
if (ltdb->read_only) {
return LDB_ERR_UNWILLING_TO_PERFORM;
}
txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
if (txn == NULL) {
ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
lmdb->error = MDB_PANIC;
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
mdb_key.mv_size = key.length;
mdb_key.mv_data = key.data;
lmdb->error = mdb_del(txn, dbi, &mdb_key, NULL);
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
return ldb_mdb_err_map(lmdb->error);
}
static int lmdb_traverse_fn(struct ltdb_private *ltdb,
ldb_kv_traverse_fn fn,
void *ctx)
{
struct lmdb_private *lmdb = ltdb->lmdb_private;
MDB_val mdb_key;
MDB_val mdb_data;
MDB_txn *txn = NULL;
MDB_dbi dbi = 0;
MDB_cursor *cursor = NULL;
int ret;
txn = get_current_txn(lmdb);
if (txn == NULL) {
ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
lmdb->error = MDB_PANIC;
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
lmdb->error = mdb_cursor_open(txn, dbi, &cursor);
if (lmdb->error != MDB_SUCCESS) {
goto done;
}
while ((lmdb->error = mdb_cursor_get(
cursor, &mdb_key,
&mdb_data, MDB_NEXT)) == MDB_SUCCESS) {
struct ldb_val key = {
.length = mdb_key.mv_size,
.data = mdb_key.mv_data,
};
struct ldb_val data = {
.length = mdb_data.mv_size,
.data = mdb_data.mv_data,
};
ret = fn(ltdb, key, data, ctx);
if (ret != 0) {
goto done;
}
}
if (lmdb->error == MDB_NOTFOUND) {
lmdb->error = MDB_SUCCESS;
}
done:
if (cursor != NULL) {
mdb_cursor_close(cursor);
}
if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
mdb_txn_commit(lmdb->read_txn);
lmdb->read_txn = NULL;
}
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
return ldb_mdb_err_map(lmdb->error);
}
static int lmdb_update_in_iterate(struct ltdb_private *ltdb,
struct ldb_val key,
struct ldb_val key2,
struct ldb_val data,
void *state)
{
struct lmdb_private *lmdb = ltdb->lmdb_private;
struct ldb_val copy;
int ret = LDB_SUCCESS;
/*
* Need to take a copy of the data as the delete operation alters the
* data, as it is in private lmdb memory.
*/
copy.length = data.length;
copy.data = talloc_memdup(ltdb, data.data, data.length);
if (copy.data == NULL) {
lmdb->error = MDB_PANIC;
return ldb_oom(lmdb->ldb);
}
lmdb->error = lmdb_delete(ltdb, key);
if (lmdb->error != MDB_SUCCESS) {
ldb_debug(
lmdb->ldb,
LDB_DEBUG_ERROR,
"Failed to delete %*.*s "
"for rekey as %*.*s: %s",
(int)key.length, (int)key.length,
(const char *)key.data,
(int)key2.length, (int)key2.length,
(const char *)key.data,
mdb_strerror(lmdb->error));
ret = ldb_mdb_error(lmdb->ldb, lmdb->error);
goto done;
}
lmdb->error = lmdb_store(ltdb, key2, copy, 0);
if (lmdb->error != MDB_SUCCESS) {
ldb_debug(
lmdb->ldb,
LDB_DEBUG_ERROR,
"Failed to rekey %*.*s as %*.*s: %s",
(int)key.length, (int)key.length,
(const char *)key.data,
(int)key2.length, (int)key2.length,
(const char *)key.data,
mdb_strerror(lmdb->error));
ret = ldb_mdb_error(lmdb->ldb, lmdb->error);
goto done;
}
done:
if (copy.data != NULL) {
TALLOC_FREE(copy.data);
copy.length = 0;
}
/*
* Explicity invalidate the data, as the delete has done this
*/
data.length = 0;
data.data = NULL;
return ret;
}
/* Handles only a single record */
static int lmdb_parse_record(struct ltdb_private *ltdb, struct ldb_val key,
int (*parser)(struct ldb_val key, struct ldb_val data,
void *private_data),
void *ctx)
{
struct lmdb_private *lmdb = ltdb->lmdb_private;
MDB_val mdb_key;
MDB_val mdb_data;
MDB_txn *txn = NULL;
MDB_dbi dbi;
struct ldb_val data;
txn = get_current_txn(lmdb);
if (txn == NULL) {
ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction active");
lmdb->error = MDB_PANIC;
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
mdb_key.mv_size = key.length;
mdb_key.mv_data = key.data;
lmdb->error = mdb_get(txn, dbi, &mdb_key, &mdb_data);
if (lmdb->error != MDB_SUCCESS) {
/* TODO closing a handle should not even be necessary */
mdb_dbi_close(lmdb->env, dbi);
if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
mdb_txn_commit(lmdb->read_txn);
lmdb->read_txn = NULL;
}
if (lmdb->error == MDB_NOTFOUND) {
return LDB_ERR_NO_SUCH_OBJECT;
}
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
data.data = mdb_data.mv_data;
data.length = mdb_data.mv_size;
/* TODO closing a handle should not even be necessary */
mdb_dbi_close(lmdb->env, dbi);
/* We created a read transaction, commit it */
if (ltdb->read_lock_count == 0 && lmdb->read_txn != NULL) {
mdb_txn_commit(lmdb->read_txn);
lmdb->read_txn = NULL;
}
return parser(key, data, ctx);
}
static int lmdb_lock_read(struct ldb_module *module)
{
void *data = ldb_module_get_private(module);
struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
struct lmdb_private *lmdb = ltdb->lmdb_private;
pid_t pid = getpid();
if (pid != lmdb->pid) {
ldb_asprintf_errstring(
lmdb->ldb,
__location__": Reusing ldb opened by pid %d in "
"process %d\n",
lmdb->pid,
pid);
lmdb->error = MDB_BAD_TXN;
return LDB_ERR_PROTOCOL_ERROR;
}
lmdb->error = MDB_SUCCESS;
if (lmdb_transaction_active(ltdb) == false &&
ltdb->read_lock_count == 0) {
lmdb->error = mdb_txn_begin(lmdb->env,
NULL,
MDB_RDONLY,
&lmdb->read_txn);
}
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
ltdb->read_lock_count++;
return ldb_mdb_err_map(lmdb->error);
}
static int lmdb_unlock_read(struct ldb_module *module)
{
void *data = ldb_module_get_private(module);
struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
if (lmdb_transaction_active(ltdb) == false && ltdb->read_lock_count == 1) {
struct lmdb_private *lmdb = ltdb->lmdb_private;
mdb_txn_commit(lmdb->read_txn);
lmdb->read_txn = NULL;
ltdb->read_lock_count--;
return LDB_SUCCESS;
}
ltdb->read_lock_count--;
return LDB_SUCCESS;
}
static int lmdb_transaction_start(struct ltdb_private *ltdb)
{
struct lmdb_private *lmdb = ltdb->lmdb_private;
struct lmdb_trans *ltx;
struct lmdb_trans *ltx_head;
MDB_txn *tx_parent;
pid_t pid = getpid();
/* Do not take out the transaction lock on a read-only DB */
if (ltdb->read_only) {
return LDB_ERR_UNWILLING_TO_PERFORM;
}
ltx = talloc_zero(lmdb, struct lmdb_trans);
if (ltx == NULL) {
return ldb_oom(lmdb->ldb);
}
if (pid != lmdb->pid) {
ldb_asprintf_errstring(
lmdb->ldb,
__location__": Reusing ldb opened by pid %d in "
"process %d\n",
lmdb->pid,
pid);
lmdb->error = MDB_BAD_TXN;
return LDB_ERR_PROTOCOL_ERROR;
}
ltx_head = lmdb_private_trans_head(lmdb);
tx_parent = lmdb_trans_get_tx(ltx_head);
lmdb->error = mdb_txn_begin(lmdb->env, tx_parent, 0, &ltx->tx);
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
trans_push(lmdb, ltx);
return ldb_mdb_err_map(lmdb->error);
}
static int lmdb_transaction_cancel(struct ltdb_private *ltdb)
{
struct lmdb_trans *ltx;
struct lmdb_private *lmdb = ltdb->lmdb_private;
ltx = lmdb_private_trans_head(lmdb);
if (ltx == NULL) {
return LDB_ERR_OPERATIONS_ERROR;
}
mdb_txn_abort(ltx->tx);
trans_finished(lmdb, ltx);
return LDB_SUCCESS;
}
static int lmdb_transaction_prepare_commit(struct ltdb_private *ltdb)
{
/* No need to prepare a commit */
return LDB_SUCCESS;
}
static int lmdb_transaction_commit(struct ltdb_private *ltdb)
{
struct lmdb_trans *ltx;
struct lmdb_private *lmdb = ltdb->lmdb_private;
ltx = lmdb_private_trans_head(lmdb);
if (ltx == NULL) {
return LDB_ERR_OPERATIONS_ERROR;
}
lmdb->error = mdb_txn_commit(ltx->tx);
trans_finished(lmdb, ltx);
return lmdb->error;
}
static int lmdb_error(struct ltdb_private *ltdb)
{
return ldb_mdb_err_map(ltdb->lmdb_private->error);
}
static const char *lmdb_errorstr(struct ltdb_private *ltdb)
{
return mdb_strerror(ltdb->lmdb_private->error);
}
static const char * lmdb_name(struct ltdb_private *ltdb)
{
return "lmdb";
}
static bool lmdb_changed(struct ltdb_private *ltdb)
{
/*
* lmdb does no provide a quick way to determine if the database
* has changed. This function always returns true.
*
* Note that tdb uses a sequence number that allows this function
* to be implemented efficiently.
*/
return true;
}
static struct kv_db_ops lmdb_key_value_ops = {
.store = lmdb_store,
.delete = lmdb_delete,
.iterate = lmdb_traverse_fn,
.update_in_iterate = lmdb_update_in_iterate,
.fetch_and_parse = lmdb_parse_record,
.lock_read = lmdb_lock_read,
.unlock_read = lmdb_unlock_read,
.begin_write = lmdb_transaction_start,
.prepare_write = lmdb_transaction_prepare_commit,
.finish_write = lmdb_transaction_commit,
.abort_write = lmdb_transaction_cancel,
.error = lmdb_error,
.errorstr = lmdb_errorstr,
.name = lmdb_name,
.has_changed = lmdb_changed,
.transaction_active = lmdb_transaction_active,
};
static const char *lmdb_get_path(const char *url)
{
const char *path;
/* parse the url */
if (strchr(url, ':')) {
if (strncmp(url, MDB_URL_PREFIX, MDB_URL_PREFIX_SIZE) != 0) {
return NULL;
}
path = url + MDB_URL_PREFIX_SIZE;
} else {
path = url;
}
return path;
}
static int lmdb_pvt_destructor(struct lmdb_private *lmdb)
{
struct lmdb_trans *ltx = NULL;
/* Check if this is a forked child */
if (getpid() != lmdb->pid) {
int fd = 0;
/*
* We cannot call mdb_env_close or commit any transactions,
* otherwise they might appear finished in the parent.
*
*/
if (mdb_env_get_fd(lmdb->env, &fd) == 0) {
close(fd);
}
/* Remove the pointer, so that no access should occur */
lmdb->env = NULL;
return 0;
}
/*
* Close the read transaction if it's open
*/
if (lmdb->read_txn != NULL) {
mdb_txn_abort(lmdb->read_txn);
}
if (lmdb->env == NULL) {
return 0;
}
/*
* Abort any currently active transactions
*/
ltx = lmdb_private_trans_head(lmdb);
while (ltx != NULL) {
mdb_txn_abort(ltx->tx);
trans_finished(lmdb, ltx);
ltx = lmdb_private_trans_head(lmdb);
}
lmdb->env = NULL;
return 0;
}
struct mdb_env_wrap {
struct mdb_env_wrap *next, *prev;
dev_t device;
ino_t inode;
MDB_env *env;
pid_t pid;
};
static struct mdb_env_wrap *mdb_list;
/* destroy the last connection to an mdb */
static int mdb_env_wrap_destructor(struct mdb_env_wrap *w)
{
mdb_env_close(w->env);
DLIST_REMOVE(mdb_list, w);
return 0;
}
static int lmdb_open_env(TALLOC_CTX *mem_ctx,
MDB_env **env,
struct ldb_context *ldb,
const char *path,
unsigned int flags)
{
int ret;
const size_t mmap_size = 8LL * GIGABYTE;
unsigned int mdb_flags = MDB_NOSUBDIR|MDB_NOTLS;
/*
* MDB_NOSUBDIR implies there is a separate file called path and a
* separate lockfile called path-lock
*/
struct mdb_env_wrap *w;
struct stat st;
pid_t pid = getpid();
if (stat(path, &st) == 0) {
for (w=mdb_list;w;w=w->next) {
if (st.st_dev == w->device &&
st.st_ino == w->inode &&
pid == w->pid) {
/*
* We must have only one MDB_env per process
*/
if (!talloc_reference(mem_ctx, w)) {
return ldb_oom(ldb);
}
*env = w->env;
return LDB_SUCCESS;
}
}
}
w = talloc(mem_ctx, struct mdb_env_wrap);
if (w == NULL) {
return ldb_oom(ldb);
}
ret = mdb_env_create(env);
if (ret != 0) {
ldb_asprintf_errstring(
ldb,
"Could not create MDB environment %s: %s\n",
path,
mdb_strerror(ret));
return ldb_mdb_err_map(ret);
}
/*
* Currently we set a 8Gb maximum database size
* via the constant mmap_size above
*/
ret = mdb_env_set_mapsize(*env, mmap_size);
if (ret != 0) {
ldb_asprintf_errstring(
ldb,
"Could not set MDB mmap() size to %llu on %s: %s\n",
(unsigned long long)(mmap_size),
path,
mdb_strerror(ret));
TALLOC_FREE(w);
return ldb_mdb_err_map(ret);
}
mdb_env_set_maxreaders(*env, 100000);
/*
* As we ensure that there is only one MDB_env open per database per
* process. We can not use the MDB_RDONLY flag, as another ldb may be
* opened in read write mode
*/
if (flags & LDB_FLG_NOSYNC) {
mdb_flags |= MDB_NOSYNC;
}
ret = mdb_env_open(*env, path, mdb_flags, 0644);
if (ret != 0) {
ldb_asprintf_errstring(ldb,
"Could not open DB %s: %s\n",
path, mdb_strerror(ret));
TALLOC_FREE(w);
return ldb_mdb_err_map(ret);
}
if (stat(path, &st) != 0) {
ldb_asprintf_errstring(
ldb,
"Could not stat %s:\n",
path);
TALLOC_FREE(w);
return LDB_ERR_OPERATIONS_ERROR;
}
w->env = *env;
w->device = st.st_dev;
w->inode = st.st_ino;
w->pid = pid;
talloc_set_destructor(w, mdb_env_wrap_destructor);
DLIST_ADD(mdb_list, w);
return LDB_SUCCESS;
}
static int lmdb_pvt_open(struct lmdb_private *lmdb,
struct ldb_context *ldb,
const char *path,
unsigned int flags)
{
int ret;
int lmdb_max_key_length;
if (flags & LDB_FLG_DONT_CREATE_DB) {
struct stat st;
if (stat(path, &st) != 0) {
return LDB_ERR_UNAVAILABLE;
}
}
ret = lmdb_open_env(lmdb, &lmdb->env, ldb, path, flags);
if (ret != 0) {
return ret;
}
/* Close when lmdb is released */
talloc_set_destructor(lmdb, lmdb_pvt_destructor);
/* Store the original pid during the LMDB open */
lmdb->pid = getpid();
lmdb_max_key_length = mdb_env_get_maxkeysize(lmdb->env);
/* This will never happen, but if it does make sure to freak out */
if (lmdb_max_key_length < LDB_MDB_MAX_KEY_LENGTH) {
return ldb_operr(ldb);
}
return LDB_SUCCESS;
}
int lmdb_connect(struct ldb_context *ldb,
const char *url,
unsigned int flags,
const char *options[],
struct ldb_module **_module)
{
const char *path = NULL;
struct lmdb_private *lmdb = NULL;
struct ltdb_private *ltdb = NULL;
int ret;
/*
* We hold locks, so we must use a private event context
* on each returned handle
*/
ldb_set_require_private_event_context(ldb);
path = lmdb_get_path(url);
if (path == NULL) {
ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid mdb URL '%s'", url);
return LDB_ERR_OPERATIONS_ERROR;
}
ltdb = talloc_zero(ldb, struct ltdb_private);
if (!ltdb) {
ldb_oom(ldb);
return LDB_ERR_OPERATIONS_ERROR;
}
lmdb = talloc_zero(ltdb, struct lmdb_private);
if (lmdb == NULL) {
TALLOC_FREE(ltdb);
return ldb_oom(ldb);
}
lmdb->ldb = ldb;
ltdb->kv_ops = &lmdb_key_value_ops;
ret = lmdb_pvt_open(lmdb, ldb, path, flags);
if (ret != LDB_SUCCESS) {
TALLOC_FREE(ltdb);
return ret;
}
ltdb->lmdb_private = lmdb;
if (flags & LDB_FLG_RDONLY) {
ltdb->read_only = true;
}
/*
* This maximum length becomes encoded in the index values so
* must never change even if LMDB starts to allow longer keys.
* The override option is max_key_len_for_self_test, and is
* used for testing only.
*/
ltdb->max_key_length = LDB_MDB_MAX_KEY_LENGTH;
return init_store(ltdb, "ldb_mdb backend", ldb, options, _module);
}