1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-08 21:18:16 +03:00
samba-mirror/lib/ldb/ldb_mdb/ldb_mdb.c
Gary Lockyer 9df4d6ab49 Fix clang 9 parentheses-equality warnings
Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andreas Schneider <asn@samba.org>
2020-05-08 09:31:31 +00:00

1144 lines
26 KiB
C

/*
ldb database library using mdb back end
Copyright (C) Jakub Hrozek 2014
Copyright (C) Catalyst.Net Ltd 2017
** NOTE! The following LGPL license applies to the ldb
** library. This does NOT imply that all of Samba is released
** under the LGPL
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 3 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/
#include "ldb_mdb.h"
#include "../ldb_key_value/ldb_kv.h"
#include "include/dlinklist.h"
#define MDB_URL_PREFIX "mdb://"
#define MDB_URL_PREFIX_SIZE (sizeof(MDB_URL_PREFIX)-1)
#define LDB_MDB_MAX_KEY_LENGTH 511
#define GIGABYTE (1024*1024*1024)
int ldb_mdb_err_map(int lmdb_err)
{
switch (lmdb_err) {
case MDB_SUCCESS:
return LDB_SUCCESS;
case EIO:
return LDB_ERR_OPERATIONS_ERROR;
#ifdef EBADE
case EBADE:
#endif
case MDB_INCOMPATIBLE:
case MDB_CORRUPTED:
case MDB_INVALID:
return LDB_ERR_UNAVAILABLE;
case MDB_BAD_TXN:
case MDB_BAD_VALSIZE:
#ifdef MDB_BAD_DBI
case MDB_BAD_DBI:
#endif
case MDB_PANIC:
case EINVAL:
return LDB_ERR_PROTOCOL_ERROR;
case MDB_MAP_FULL:
case MDB_DBS_FULL:
case MDB_READERS_FULL:
case MDB_TLS_FULL:
case MDB_TXN_FULL:
case EAGAIN:
return LDB_ERR_BUSY;
case MDB_KEYEXIST:
return LDB_ERR_ENTRY_ALREADY_EXISTS;
case MDB_NOTFOUND:
case ENOENT:
return LDB_ERR_NO_SUCH_OBJECT;
case EACCES:
return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
default:
break;
}
return LDB_ERR_OTHER;
}
#define ldb_mdb_error(ldb, ecode) lmdb_error_at(ldb, ecode, __FILE__, __LINE__)
static int lmdb_error_at(struct ldb_context *ldb,
int ecode,
const char *file,
int line)
{
int ldb_err = ldb_mdb_err_map(ecode);
char *reason = mdb_strerror(ecode);
ldb_asprintf_errstring(ldb,
"(%d) - %s at %s:%d",
ecode,
reason,
file,
line);
return ldb_err;
}
static bool lmdb_transaction_active(struct ldb_kv_private *ldb_kv)
{
return ldb_kv->lmdb_private->txlist != NULL;
}
static MDB_txn *lmdb_trans_get_tx(struct lmdb_trans *ltx)
{
if (ltx == NULL) {
return NULL;
}
return ltx->tx;
}
static void trans_push(struct lmdb_private *lmdb, struct lmdb_trans *ltx)
{
if (lmdb->txlist) {
talloc_steal(lmdb->txlist, ltx);
}
DLIST_ADD(lmdb->txlist, ltx);
}
static void trans_finished(struct lmdb_private *lmdb, struct lmdb_trans *ltx)
{
DLIST_REMOVE(lmdb->txlist, ltx);
talloc_free(ltx);
}
static struct lmdb_trans *lmdb_private_trans_head(struct lmdb_private *lmdb)
{
struct lmdb_trans *ltx;
ltx = lmdb->txlist;
return ltx;
}
static MDB_txn *get_current_txn(struct lmdb_private *lmdb)
{
MDB_txn *txn = NULL;
txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
if (txn != NULL) {
return txn;
}
if (lmdb->read_txn != NULL) {
return lmdb->read_txn;
}
lmdb->error = MDB_BAD_TXN;
ldb_set_errstring(lmdb->ldb, __location__":No active transaction\n");
return NULL;
}
static int lmdb_store(struct ldb_kv_private *ldb_kv,
struct ldb_val key,
struct ldb_val data,
int flags)
{
struct lmdb_private *lmdb = ldb_kv->lmdb_private;
MDB_val mdb_key;
MDB_val mdb_data;
int mdb_flags;
MDB_txn *txn = NULL;
MDB_dbi dbi = 0;
if (ldb_kv->read_only) {
return LDB_ERR_UNWILLING_TO_PERFORM;
}
txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
if (txn == NULL) {
ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
lmdb->error = MDB_PANIC;
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
mdb_key.mv_size = key.length;
mdb_key.mv_data = key.data;
mdb_data.mv_size = data.length;
mdb_data.mv_data = data.data;
if (flags == TDB_INSERT) {
mdb_flags = MDB_NOOVERWRITE;
} else if (flags == TDB_MODIFY) {
/*
* Modifying a record, ensure that it exists.
* This mimics the TDB semantics
*/
MDB_val value;
lmdb->error = mdb_get(txn, dbi, &mdb_key, &value);
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
mdb_flags = 0;
} else {
mdb_flags = 0;
}
lmdb->error = mdb_put(txn, dbi, &mdb_key, &mdb_data, mdb_flags);
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
return ldb_mdb_err_map(lmdb->error);
}
static int lmdb_delete(struct ldb_kv_private *ldb_kv, struct ldb_val key)
{
struct lmdb_private *lmdb = ldb_kv->lmdb_private;
MDB_val mdb_key;
MDB_txn *txn = NULL;
MDB_dbi dbi = 0;
if (ldb_kv->read_only) {
return LDB_ERR_UNWILLING_TO_PERFORM;
}
txn = lmdb_trans_get_tx(lmdb_private_trans_head(lmdb));
if (txn == NULL) {
ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
lmdb->error = MDB_PANIC;
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
mdb_key.mv_size = key.length;
mdb_key.mv_data = key.data;
lmdb->error = mdb_del(txn, dbi, &mdb_key, NULL);
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
return ldb_mdb_err_map(lmdb->error);
}
static int lmdb_traverse_fn(struct ldb_kv_private *ldb_kv,
ldb_kv_traverse_fn fn,
void *ctx)
{
struct lmdb_private *lmdb = ldb_kv->lmdb_private;
MDB_val mdb_key;
MDB_val mdb_data;
MDB_txn *txn = NULL;
MDB_dbi dbi = 0;
MDB_cursor *cursor = NULL;
int ret;
txn = get_current_txn(lmdb);
if (txn == NULL) {
ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
lmdb->error = MDB_PANIC;
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
lmdb->error = mdb_cursor_open(txn, dbi, &cursor);
if (lmdb->error != MDB_SUCCESS) {
goto done;
}
while ((lmdb->error = mdb_cursor_get(
cursor, &mdb_key,
&mdb_data, MDB_NEXT)) == MDB_SUCCESS) {
struct ldb_val key = {
.length = mdb_key.mv_size,
.data = mdb_key.mv_data,
};
struct ldb_val data = {
.length = mdb_data.mv_size,
.data = mdb_data.mv_data,
};
ret = fn(ldb_kv, key, data, ctx);
if (ret != 0) {
/*
* NOTE: This DOES NOT set lmdb->error!
*
* This means that the caller will get success.
* This matches TDB traverse behaviour, where callbacks
* may terminate the traverse, but do not change the
* return code from success.
*
* Callers SHOULD store their own error codes.
*/
goto done;
}
}
if (lmdb->error == MDB_NOTFOUND) {
lmdb->error = MDB_SUCCESS;
}
done:
if (cursor != NULL) {
mdb_cursor_close(cursor);
}
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
return ldb_mdb_err_map(lmdb->error);
}
static int lmdb_update_in_iterate(struct ldb_kv_private *ldb_kv,
struct ldb_val key,
struct ldb_val key2,
struct ldb_val data,
void *state)
{
struct lmdb_private *lmdb = ldb_kv->lmdb_private;
struct ldb_val copy;
int ret = LDB_SUCCESS;
/*
* Need to take a copy of the data as the delete operation alters the
* data, as it is in private lmdb memory.
*/
copy.length = data.length;
copy.data = talloc_memdup(ldb_kv, data.data, data.length);
if (copy.data == NULL) {
lmdb->error = MDB_PANIC;
return ldb_oom(lmdb->ldb);
}
lmdb->error = lmdb_delete(ldb_kv, key);
if (lmdb->error != MDB_SUCCESS) {
ldb_debug(
lmdb->ldb,
LDB_DEBUG_ERROR,
"Failed to delete %*.*s "
"for rekey as %*.*s: %s",
(int)key.length, (int)key.length,
(const char *)key.data,
(int)key2.length, (int)key2.length,
(const char *)key.data,
mdb_strerror(lmdb->error));
ret = ldb_mdb_error(lmdb->ldb, lmdb->error);
goto done;
}
lmdb->error = lmdb_store(ldb_kv, key2, copy, 0);
if (lmdb->error != MDB_SUCCESS) {
ldb_debug(
lmdb->ldb,
LDB_DEBUG_ERROR,
"Failed to rekey %*.*s as %*.*s: %s",
(int)key.length, (int)key.length,
(const char *)key.data,
(int)key2.length, (int)key2.length,
(const char *)key.data,
mdb_strerror(lmdb->error));
ret = ldb_mdb_error(lmdb->ldb, lmdb->error);
goto done;
}
done:
if (copy.data != NULL) {
TALLOC_FREE(copy.data);
copy.length = 0;
}
/*
* Explicity invalidate the data, as the delete has done this
*/
data.length = 0;
data.data = NULL;
return ret;
}
/* Handles only a single record */
static int lmdb_parse_record(struct ldb_kv_private *ldb_kv,
struct ldb_val key,
int (*parser)(struct ldb_val key,
struct ldb_val data,
void *private_data),
void *ctx)
{
struct lmdb_private *lmdb = ldb_kv->lmdb_private;
MDB_val mdb_key;
MDB_val mdb_data;
MDB_txn *txn = NULL;
MDB_dbi dbi;
struct ldb_val data;
txn = get_current_txn(lmdb);
if (txn == NULL) {
ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction active");
lmdb->error = MDB_PANIC;
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
mdb_key.mv_size = key.length;
mdb_key.mv_data = key.data;
lmdb->error = mdb_get(txn, dbi, &mdb_key, &mdb_data);
if (lmdb->error != MDB_SUCCESS) {
/* TODO closing a handle should not even be necessary */
mdb_dbi_close(lmdb->env, dbi);
if (lmdb->error == MDB_NOTFOUND) {
return LDB_ERR_NO_SUCH_OBJECT;
}
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
data.data = mdb_data.mv_data;
data.length = mdb_data.mv_size;
/* TODO closing a handle should not even be necessary */
mdb_dbi_close(lmdb->env, dbi);
return parser(key, data, ctx);
}
/*
* Exactly the same as iterate, except we have a start key and an end key
* (which are both included in the results if present).
*
* If start > end, return MDB_PANIC.
*/
static int lmdb_iterate_range(struct ldb_kv_private *ldb_kv,
struct ldb_val start_key,
struct ldb_val end_key,
ldb_kv_traverse_fn fn,
void *ctx)
{
struct lmdb_private *lmdb = ldb_kv->lmdb_private;
MDB_val mdb_key;
MDB_val mdb_data;
MDB_txn *txn = NULL;
MDB_dbi dbi = 0;
MDB_cursor *cursor = NULL;
int ret;
MDB_val mdb_s_key;
MDB_val mdb_e_key;
txn = get_current_txn(lmdb);
if (txn == NULL) {
ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
lmdb->error = MDB_PANIC;
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
mdb_s_key.mv_size = start_key.length;
mdb_s_key.mv_data = start_key.data;
mdb_e_key.mv_size = end_key.length;
mdb_e_key.mv_data = end_key.data;
if (mdb_cmp(txn, dbi, &mdb_s_key, &mdb_e_key) > 0) {
lmdb->error = MDB_PANIC;
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
lmdb->error = mdb_cursor_open(txn, dbi, &cursor);
if (lmdb->error != MDB_SUCCESS) {
goto done;
}
lmdb->error = mdb_cursor_get(cursor, &mdb_s_key, &mdb_data, MDB_SET_RANGE);
if (lmdb->error != MDB_SUCCESS) {
if (lmdb->error == MDB_NOTFOUND) {
lmdb->error = MDB_SUCCESS;
}
goto done;
} else {
struct ldb_val key = {
.length = mdb_s_key.mv_size,
.data = mdb_s_key.mv_data,
};
struct ldb_val data = {
.length = mdb_data.mv_size,
.data = mdb_data.mv_data,
};
if (mdb_cmp(txn, dbi, &mdb_s_key, &mdb_e_key) > 0) {
goto done;
}
ret = fn(ldb_kv, key, data, ctx);
if (ret != 0) {
/*
* NOTE: This DOES NOT set lmdb->error!
*
* This means that the caller will get success.
* This matches TDB traverse behaviour, where callbacks
* may terminate the traverse, but do not change the
* return code from success.
*
* Callers SHOULD store their own error codes.
*/
goto done;
}
}
while ((lmdb->error = mdb_cursor_get(
cursor, &mdb_key,
&mdb_data, MDB_NEXT)) == MDB_SUCCESS) {
struct ldb_val key = {
.length = mdb_key.mv_size,
.data = mdb_key.mv_data,
};
struct ldb_val data = {
.length = mdb_data.mv_size,
.data = mdb_data.mv_data,
};
if (mdb_cmp(txn, dbi, &mdb_key, &mdb_e_key) > 0) {
goto done;
}
ret = fn(ldb_kv, key, data, ctx);
if (ret != 0) {
/*
* NOTE: This DOES NOT set lmdb->error!
*
* This means that the caller will get success.
* This matches TDB traverse behaviour, where callbacks
* may terminate the traverse, but do not change the
* return code from success.
*
* Callers SHOULD store their own error codes.
*/
goto done;
}
}
if (lmdb->error == MDB_NOTFOUND) {
lmdb->error = MDB_SUCCESS;
}
done:
if (cursor != NULL) {
mdb_cursor_close(cursor);
}
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
return ldb_mdb_err_map(lmdb->error);
}
static int lmdb_lock_read(struct ldb_module *module)
{
void *data = ldb_module_get_private(module);
struct ldb_kv_private *ldb_kv =
talloc_get_type(data, struct ldb_kv_private);
struct lmdb_private *lmdb = ldb_kv->lmdb_private;
pid_t pid = getpid();
if (pid != lmdb->pid) {
ldb_asprintf_errstring(
lmdb->ldb,
__location__": Reusing ldb opened by pid %d in "
"process %d\n",
lmdb->pid,
pid);
lmdb->error = MDB_BAD_TXN;
return LDB_ERR_PROTOCOL_ERROR;
}
lmdb->error = MDB_SUCCESS;
if (lmdb_transaction_active(ldb_kv) == false &&
ldb_kv->read_lock_count == 0) {
lmdb->error = mdb_txn_begin(lmdb->env,
NULL,
MDB_RDONLY,
&lmdb->read_txn);
}
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
ldb_kv->read_lock_count++;
return ldb_mdb_err_map(lmdb->error);
}
static int lmdb_unlock_read(struct ldb_module *module)
{
void *data = ldb_module_get_private(module);
struct ldb_kv_private *ldb_kv =
talloc_get_type(data, struct ldb_kv_private);
if (lmdb_transaction_active(ldb_kv) == false &&
ldb_kv->read_lock_count == 1) {
struct lmdb_private *lmdb = ldb_kv->lmdb_private;
mdb_txn_commit(lmdb->read_txn);
lmdb->read_txn = NULL;
ldb_kv->read_lock_count--;
return LDB_SUCCESS;
}
ldb_kv->read_lock_count--;
return LDB_SUCCESS;
}
static int lmdb_transaction_start(struct ldb_kv_private *ldb_kv)
{
struct lmdb_private *lmdb = ldb_kv->lmdb_private;
struct lmdb_trans *ltx;
struct lmdb_trans *ltx_head;
MDB_txn *tx_parent;
pid_t pid = getpid();
/* Do not take out the transaction lock on a read-only DB */
if (ldb_kv->read_only) {
return LDB_ERR_UNWILLING_TO_PERFORM;
}
ltx = talloc_zero(lmdb, struct lmdb_trans);
if (ltx == NULL) {
return ldb_oom(lmdb->ldb);
}
if (pid != lmdb->pid) {
ldb_asprintf_errstring(
lmdb->ldb,
__location__": Reusing ldb opened by pid %d in "
"process %d\n",
lmdb->pid,
pid);
lmdb->error = MDB_BAD_TXN;
return LDB_ERR_PROTOCOL_ERROR;
}
/*
* Clear out any stale readers
*/
{
int stale = 0;
mdb_reader_check(lmdb->env, &stale);
if (stale > 0) {
ldb_debug(
lmdb->ldb,
LDB_DEBUG_ERROR,
"LMDB Stale readers, deleted (%d)",
stale);
}
}
ltx_head = lmdb_private_trans_head(lmdb);
tx_parent = lmdb_trans_get_tx(ltx_head);
lmdb->error = mdb_txn_begin(lmdb->env, tx_parent, 0, &ltx->tx);
if (lmdb->error != MDB_SUCCESS) {
return ldb_mdb_error(lmdb->ldb, lmdb->error);
}
trans_push(lmdb, ltx);
return ldb_mdb_err_map(lmdb->error);
}
static int lmdb_transaction_cancel(struct ldb_kv_private *ldb_kv)
{
struct lmdb_trans *ltx;
struct lmdb_private *lmdb = ldb_kv->lmdb_private;
ltx = lmdb_private_trans_head(lmdb);
if (ltx == NULL) {
return LDB_ERR_OPERATIONS_ERROR;
}
mdb_txn_abort(ltx->tx);
trans_finished(lmdb, ltx);
return LDB_SUCCESS;
}
static int lmdb_transaction_prepare_commit(struct ldb_kv_private *ldb_kv)
{
/* No need to prepare a commit */
return LDB_SUCCESS;
}
static int lmdb_transaction_commit(struct ldb_kv_private *ldb_kv)
{
struct lmdb_trans *ltx;
struct lmdb_private *lmdb = ldb_kv->lmdb_private;
ltx = lmdb_private_trans_head(lmdb);
if (ltx == NULL) {
return LDB_ERR_OPERATIONS_ERROR;
}
lmdb->error = mdb_txn_commit(ltx->tx);
trans_finished(lmdb, ltx);
return lmdb->error;
}
static int lmdb_error(struct ldb_kv_private *ldb_kv)
{
return ldb_mdb_err_map(ldb_kv->lmdb_private->error);
}
static const char *lmdb_errorstr(struct ldb_kv_private *ldb_kv)
{
return mdb_strerror(ldb_kv->lmdb_private->error);
}
static const char *lmdb_name(struct ldb_kv_private *ldb_kv)
{
return "lmdb";
}
static bool lmdb_changed(struct ldb_kv_private *ldb_kv)
{
/*
* lmdb does no provide a quick way to determine if the database
* has changed. This function always returns true.
*
* Note that tdb uses a sequence number that allows this function
* to be implemented efficiently.
*/
return true;
}
/*
* Get the number of records in the database.
*
* The mdb_env_stat call returns an accurate count, so we return the actual
* number of records in the database rather than an estimate.
*/
static size_t lmdb_get_size(struct ldb_kv_private *ldb_kv)
{
struct MDB_stat stats = {0};
struct lmdb_private *lmdb = ldb_kv->lmdb_private;
int ret = 0;
ret = mdb_env_stat(lmdb->env, &stats);
if (ret != 0) {
return 0;
}
return stats.ms_entries;
}
/*
* Start a sub transaction
* As lmdb supports nested transactions we can start a new transaction
*/
static int lmdb_nested_transaction_start(struct ldb_kv_private *ldb_kv)
{
int ret = lmdb_transaction_start(ldb_kv);
return ret;
}
/*
* Commit a sub transaction
* As lmdb supports nested transactions we can commit the nested transaction
*/
static int lmdb_nested_transaction_commit(struct ldb_kv_private *ldb_kv)
{
int ret = lmdb_transaction_commit(ldb_kv);
return ret;
}
/*
* Cancel a sub transaction
* As lmdb supports nested transactions we can cancel the nested transaction
*/
static int lmdb_nested_transaction_cancel(struct ldb_kv_private *ldb_kv)
{
int ret = lmdb_transaction_cancel(ldb_kv);
return ret;
}
static struct kv_db_ops lmdb_key_value_ops = {
.options = LDB_KV_OPTION_STABLE_READ_LOCK,
.store = lmdb_store,
.delete = lmdb_delete,
.iterate = lmdb_traverse_fn,
.update_in_iterate = lmdb_update_in_iterate,
.fetch_and_parse = lmdb_parse_record,
.iterate_range = lmdb_iterate_range,
.lock_read = lmdb_lock_read,
.unlock_read = lmdb_unlock_read,
.begin_write = lmdb_transaction_start,
.prepare_write = lmdb_transaction_prepare_commit,
.finish_write = lmdb_transaction_commit,
.abort_write = lmdb_transaction_cancel,
.error = lmdb_error,
.errorstr = lmdb_errorstr,
.name = lmdb_name,
.has_changed = lmdb_changed,
.transaction_active = lmdb_transaction_active,
.get_size = lmdb_get_size,
.begin_nested_write = lmdb_nested_transaction_start,
.finish_nested_write = lmdb_nested_transaction_commit,
.abort_nested_write = lmdb_nested_transaction_cancel,
};
static const char *lmdb_get_path(const char *url)
{
const char *path;
/* parse the url */
if (strchr(url, ':')) {
if (strncmp(url, MDB_URL_PREFIX, MDB_URL_PREFIX_SIZE) != 0) {
return NULL;
}
path = url + MDB_URL_PREFIX_SIZE;
} else {
path = url;
}
return path;
}
static int lmdb_pvt_destructor(struct lmdb_private *lmdb)
{
struct lmdb_trans *ltx = NULL;
/* Check if this is a forked child */
if (getpid() != lmdb->pid) {
int fd = 0;
/*
* We cannot call mdb_env_close or commit any transactions,
* otherwise they might appear finished in the parent.
*
*/
if (mdb_env_get_fd(lmdb->env, &fd) == 0) {
close(fd);
}
/* Remove the pointer, so that no access should occur */
lmdb->env = NULL;
return 0;
}
/*
* Close the read transaction if it's open
*/
if (lmdb->read_txn != NULL) {
mdb_txn_abort(lmdb->read_txn);
}
if (lmdb->env == NULL) {
return 0;
}
/*
* Abort any currently active transactions
*/
ltx = lmdb_private_trans_head(lmdb);
while (ltx != NULL) {
mdb_txn_abort(ltx->tx);
trans_finished(lmdb, ltx);
ltx = lmdb_private_trans_head(lmdb);
}
lmdb->env = NULL;
return 0;
}
struct mdb_env_wrap {
struct mdb_env_wrap *next, *prev;
dev_t device;
ino_t inode;
MDB_env *env;
pid_t pid;
};
static struct mdb_env_wrap *mdb_list;
/* destroy the last connection to an mdb */
static int mdb_env_wrap_destructor(struct mdb_env_wrap *w)
{
mdb_env_close(w->env);
DLIST_REMOVE(mdb_list, w);
return 0;
}
static int lmdb_open_env(TALLOC_CTX *mem_ctx,
MDB_env **env,
struct ldb_context *ldb,
const char *path,
const size_t env_map_size,
unsigned int flags)
{
int ret;
unsigned int mdb_flags = MDB_NOSUBDIR|MDB_NOTLS;
/*
* MDB_NOSUBDIR implies there is a separate file called path and a
* separate lockfile called path-lock
*/
struct mdb_env_wrap *w;
struct stat st;
pid_t pid = getpid();
int fd = 0;
unsigned v;
if (stat(path, &st) == 0) {
for (w=mdb_list;w;w=w->next) {
if (st.st_dev == w->device &&
st.st_ino == w->inode &&
pid == w->pid) {
/*
* We must have only one MDB_env per process
*/
if (!talloc_reference(mem_ctx, w)) {
return ldb_oom(ldb);
}
*env = w->env;
return LDB_SUCCESS;
}
}
}
w = talloc(mem_ctx, struct mdb_env_wrap);
if (w == NULL) {
return ldb_oom(ldb);
}
ret = mdb_env_create(env);
if (ret != 0) {
ldb_asprintf_errstring(
ldb,
"Could not create MDB environment %s: %s\n",
path,
mdb_strerror(ret));
return ldb_mdb_err_map(ret);
}
if (env_map_size > 0) {
ret = mdb_env_set_mapsize(*env, env_map_size);
if (ret != 0) {
ldb_asprintf_errstring(
ldb,
"Could not set MDB mmap() size to %llu "
"on %s: %s\n",
(unsigned long long)(env_map_size),
path,
mdb_strerror(ret));
TALLOC_FREE(w);
return ldb_mdb_err_map(ret);
}
}
mdb_env_set_maxreaders(*env, 100000);
/*
* As we ensure that there is only one MDB_env open per database per
* process. We can not use the MDB_RDONLY flag, as another ldb may be
* opened in read write mode
*/
if (flags & LDB_FLG_NOSYNC) {
mdb_flags |= MDB_NOSYNC;
}
ret = mdb_env_open(*env, path, mdb_flags, 0644);
if (ret != 0) {
ldb_asprintf_errstring(ldb,
"Could not open DB %s: %s\n",
path, mdb_strerror(ret));
TALLOC_FREE(w);
return ldb_mdb_err_map(ret);
}
{
MDB_envinfo stat = {0};
ret = mdb_env_info (*env, &stat);
if (ret != 0) {
ldb_asprintf_errstring(
ldb,
"Could not get MDB environment stats %s: %s\n",
path,
mdb_strerror(ret));
return ldb_mdb_err_map(ret);
}
}
ret = mdb_env_get_fd(*env, &fd);
if (ret != 0) {
ldb_asprintf_errstring(ldb,
"Could not obtain DB FD %s: %s\n",
path, mdb_strerror(ret));
TALLOC_FREE(w);
return ldb_mdb_err_map(ret);
}
/* Just as for TDB: on exec, don't inherit the fd */
v = fcntl(fd, F_GETFD, 0);
if (v == -1) {
TALLOC_FREE(w);
return LDB_ERR_OPERATIONS_ERROR;
}
ret = fcntl(fd, F_SETFD, v | FD_CLOEXEC);
if (ret == -1) {
TALLOC_FREE(w);
return LDB_ERR_OPERATIONS_ERROR;
}
if (fstat(fd, &st) != 0) {
ldb_asprintf_errstring(
ldb,
"Could not stat %s:\n",
path);
TALLOC_FREE(w);
return LDB_ERR_OPERATIONS_ERROR;
}
w->env = *env;
w->device = st.st_dev;
w->inode = st.st_ino;
w->pid = pid;
talloc_set_destructor(w, mdb_env_wrap_destructor);
DLIST_ADD(mdb_list, w);
return LDB_SUCCESS;
}
static int lmdb_pvt_open(struct lmdb_private *lmdb,
struct ldb_context *ldb,
const char *path,
const size_t env_map_size,
unsigned int flags)
{
int ret;
int lmdb_max_key_length;
if (flags & LDB_FLG_DONT_CREATE_DB) {
struct stat st;
if (stat(path, &st) != 0) {
return LDB_ERR_UNAVAILABLE;
}
}
ret = lmdb_open_env(lmdb, &lmdb->env, ldb, path, env_map_size, flags);
if (ret != 0) {
return ret;
}
/* Close when lmdb is released */
talloc_set_destructor(lmdb, lmdb_pvt_destructor);
/* Store the original pid during the LMDB open */
lmdb->pid = getpid();
lmdb_max_key_length = mdb_env_get_maxkeysize(lmdb->env);
/* This will never happen, but if it does make sure to freak out */
if (lmdb_max_key_length < LDB_MDB_MAX_KEY_LENGTH) {
return ldb_operr(ldb);
}
return LDB_SUCCESS;
}
int lmdb_connect(struct ldb_context *ldb,
const char *url,
unsigned int flags,
const char *options[],
struct ldb_module **_module)
{
const char *path = NULL;
struct lmdb_private *lmdb = NULL;
struct ldb_kv_private *ldb_kv = NULL;
int ret;
size_t env_map_size = 0;
/*
* We hold locks, so we must use a private event context
* on each returned handle
*/
ldb_set_require_private_event_context(ldb);
path = lmdb_get_path(url);
if (path == NULL) {
ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid mdb URL '%s'", url);
return LDB_ERR_OPERATIONS_ERROR;
}
ldb_kv = talloc_zero(ldb, struct ldb_kv_private);
if (!ldb_kv) {
ldb_oom(ldb);
return LDB_ERR_OPERATIONS_ERROR;
}
lmdb = talloc_zero(ldb_kv, struct lmdb_private);
if (lmdb == NULL) {
TALLOC_FREE(ldb_kv);
return ldb_oom(ldb);
}
lmdb->ldb = ldb;
ldb_kv->kv_ops = &lmdb_key_value_ops;
{
const char *size = ldb_options_find(
ldb, ldb->options, "lmdb_env_size");
if (size != NULL) {
env_map_size = strtoull(size, NULL, 0);
}
}
ret = lmdb_pvt_open(lmdb, ldb, path, env_map_size, flags);
if (ret != LDB_SUCCESS) {
TALLOC_FREE(ldb_kv);
return ret;
}
ldb_kv->lmdb_private = lmdb;
if (flags & LDB_FLG_RDONLY) {
ldb_kv->read_only = true;
}
/*
* This maximum length becomes encoded in the index values so
* must never change even if LMDB starts to allow longer keys.
* The override option is max_key_len_for_self_test, and is
* used for testing only.
*/
ldb_kv->max_key_length = LDB_MDB_MAX_KEY_LENGTH;
return ldb_kv_init_store(
ldb_kv, "ldb_mdb backend", ldb, options, _module);
}