1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-22 13:34:15 +03:00

ldb_mdb: Wrap mdb_env_open

Wrap mdb_env_open to ensure that we only have one MDB_env opened per
database in each process

Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
This commit is contained in:
Gary Lockyer 2018-03-07 12:05:34 +13:00 committed by Andrew Bartlett
parent f9a12b6433
commit eb1bc2ec09

View File

@ -649,47 +649,81 @@ static int lmdb_pvt_destructor(struct lmdb_private *lmdb)
trans_finished(lmdb, ltx);
ltx = lmdb_private_trans_head(lmdb);
}
mdb_env_close(lmdb->env);
lmdb->env = NULL;
return 0;
}
static int lmdb_pvt_open(TALLOC_CTX *mem_ctx,
struct mdb_env_wrap {
struct mdb_env_wrap *next, *prev;
dev_t device;
ino_t inode;
MDB_env *env;
int pid;
};
static struct mdb_env_wrap *mdb_list;
/* destroy the last connection to an mdb */
static int mdb_env_wrap_destructor(struct mdb_env_wrap *w)
{
mdb_env_close(w->env);
DLIST_REMOVE(mdb_list, w);
return 0;
}
static int lmdb_open_env(TALLOC_CTX *mem_ctx,
MDB_env **env,
struct ldb_context *ldb,
const char *path,
unsigned int flags,
struct lmdb_private *lmdb)
unsigned int flags)
{
int ret;
unsigned int mdb_flags;
const size_t mmap_size = 8LL * GIGABYTE;
int lmdb_max_key_length;
unsigned int mdb_flags = MDB_NOSUBDIR|MDB_NOTLS;
/*
* MDB_NOSUBDIR implies there is a separate file called path and a
* separate lockfile called path-lock
*/
if (flags & LDB_FLG_DONT_CREATE_DB) {
struct stat st;
struct mdb_env_wrap *w;
struct stat st;
if (stat(path, &st) != 0) {
return LDB_ERR_UNAVAILABLE;
if (stat(path, &st) == 0) {
for (w=mdb_list;w;w=w->next) {
if (st.st_dev == w->device && st.st_ino == w->inode) {
/*
* We must have only one MDB_env per process
*/
if (!talloc_reference(mem_ctx, w)) {
return ldb_oom(ldb);
}
*env = w->env;
return LDB_SUCCESS;
}
}
}
ret = mdb_env_create(&lmdb->env);
w = talloc(mem_ctx, struct mdb_env_wrap);
if (w == NULL) {
return ldb_oom(ldb);
}
ret = mdb_env_create(env);
if (ret != 0) {
ldb_asprintf_errstring(
ldb,
"Could not create MDB environment %s: %s\n",
path,
mdb_strerror(ret));
return LDB_ERR_OPERATIONS_ERROR;
return ldb_mdb_err_map(ret);
}
/* Close when lmdb is released */
talloc_set_destructor(lmdb, lmdb_pvt_destructor);
ret = mdb_env_set_mapsize(lmdb->env,
mmap_size);
/*
* Currently we set a 8Gb maximum database size
* via the constant mmap_size above
*/
ret = mdb_env_set_mapsize(*env, mmap_size);
if (ret != 0) {
ldb_asprintf_errstring(
ldb,
@ -697,29 +731,71 @@ static int lmdb_pvt_open(TALLOC_CTX *mem_ctx,
(unsigned long long)(mmap_size),
path,
mdb_strerror(ret));
TALLOC_FREE(w);
return ldb_mdb_err_map(ret);
}
mdb_env_set_maxreaders(lmdb->env, 100000);
/* MDB_NOSUBDIR implies there is a separate file called path and a
* separate lockfile called path-lock
mdb_env_set_maxreaders(*env, 100000);
/*
* As we ensure that there is only one MDB_env open per database per
* process. We can not use the MDB_RDONLY flag, as another ldb may be
* opened in read write mode
*/
mdb_flags = MDB_NOSUBDIR|MDB_NOTLS;
if (flags & LDB_FLG_RDONLY) {
mdb_flags |= MDB_RDONLY;
}
if (flags & LDB_FLG_NOSYNC) {
mdb_flags |= MDB_NOSYNC;
}
ret = mdb_env_open(lmdb->env, path, mdb_flags, 0644);
ret = mdb_env_open(*env, path, mdb_flags, 0644);
if (ret != 0) {
ldb_asprintf_errstring(ldb,
"Could not open DB %s: %s\n",
path, mdb_strerror(ret));
talloc_free(lmdb);
TALLOC_FREE(w);
return ldb_mdb_err_map(ret);
}
if (stat(path, &st) != 0) {
ldb_asprintf_errstring(
ldb,
"Could not stat %s:\n",
path);
TALLOC_FREE(w);
return LDB_ERR_OPERATIONS_ERROR;
}
w->env = *env;
w->device = st.st_dev;
w->inode = st.st_ino;
talloc_set_destructor(w, mdb_env_wrap_destructor);
DLIST_ADD(mdb_list, w);
return LDB_SUCCESS;
}
static int lmdb_pvt_open(struct lmdb_private *lmdb,
struct ldb_context *ldb,
const char *path,
unsigned int flags)
{
int ret;
int lmdb_max_key_length;
if (flags & LDB_FLG_DONT_CREATE_DB) {
struct stat st;
if (stat(path, &st) != 0) {
return LDB_ERR_UNAVAILABLE;
}
}
ret = lmdb_open_env(lmdb, &lmdb->env, ldb, path, flags);
if (ret != 0) {
return ret;
}
/* Close when lmdb is released */
talloc_set_destructor(lmdb, lmdb_pvt_destructor);
/* Store the original pid during the LMDB open */
lmdb->pid = getpid();
@ -727,7 +803,6 @@ static int lmdb_pvt_open(TALLOC_CTX *mem_ctx,
/* This will never happen, but if it does make sure to freak out */
if (lmdb_max_key_length < LDB_MDB_MAX_KEY_LENGTH) {
talloc_free(lmdb);
return ldb_operr(ldb);
}
@ -763,17 +838,17 @@ int lmdb_connect(struct ldb_context *ldb,
return LDB_ERR_OPERATIONS_ERROR;
}
lmdb = talloc_zero(ldb, struct lmdb_private);
lmdb = talloc_zero(ltdb, struct lmdb_private);
if (lmdb == NULL) {
TALLOC_FREE(ltdb);
ldb_oom(ldb);
return LDB_ERR_OPERATIONS_ERROR;
return ldb_oom(ldb);
}
lmdb->ldb = ldb;
ltdb->kv_ops = &lmdb_key_value_ops;
ret = lmdb_pvt_open(ldb, ldb, path, flags, lmdb);
ret = lmdb_pvt_open(lmdb, ldb, path, flags);
if (ret != LDB_SUCCESS) {
TALLOC_FREE(ltdb);
return ret;
}