mirror of
https://github.com/samba-team/samba.git
synced 2025-11-08 16:23:49 +03:00
r26068: Import improved Python bindings for LDB, including tests.
This commit is contained in:
committed by
Stefan Metzmacher
parent
11a2cbbac5
commit
fc3a8caef7
@@ -137,6 +137,7 @@ sub check_library($$$)
|
||||
unless(defined($lib->{INSTALLDIR})) {
|
||||
$lib->{INSTALLDIR} = "LIBDIR";
|
||||
}
|
||||
|
||||
add_libreplace($lib);
|
||||
}
|
||||
|
||||
|
||||
@@ -79,9 +79,126 @@ m4_include(lib/events/config.m4)
|
||||
|
||||
dnl m4_include(auth/kerberos/config.m4)
|
||||
|
||||
AC_ARG_VAR([PYTHON_VERSION],[The installed Python
|
||||
version to use, for example '2.3'. This string
|
||||
will be appended to the Python interpreter
|
||||
canonical name.])
|
||||
|
||||
AC_PATH_PROG([PYTHON],[python[$PYTHON_VERSION]])
|
||||
if test -z "$PYTHON"; then
|
||||
AC_MSG_ERROR([No python found])
|
||||
fi
|
||||
|
||||
AC_SUBST(PYTHON)
|
||||
|
||||
#
|
||||
# Check for a version of Python >= 2.1.0
|
||||
#
|
||||
AC_MSG_CHECKING([for a version of Python >= '2.1.0'])
|
||||
ac_supports_python_ver=`$PYTHON -c "import sys, string; \
|
||||
ver = string.split(sys.version)[[0]]; \
|
||||
print ver >= '2.1.0'"`
|
||||
if test "$ac_supports_python_ver" != "True"; then
|
||||
AC_MSG_RESULT([no])
|
||||
AC_MSG_ERROR([No recent version of python found])
|
||||
else
|
||||
AC_MSG_RESULT([yes])
|
||||
fi
|
||||
|
||||
#
|
||||
# Check if you have distutils, else fail
|
||||
#
|
||||
AC_MSG_CHECKING([for the distutils Python package])
|
||||
ac_distutils_result=`$PYTHON -c "import distutils" 2>&1`
|
||||
if test -z "$ac_distutils_result"; then
|
||||
AC_MSG_RESULT([yes])
|
||||
else
|
||||
AC_MSG_RESULT([no])
|
||||
AC_MSG_ERROR([distutils not available])
|
||||
fi
|
||||
|
||||
#
|
||||
# Check for Python include path
|
||||
#
|
||||
AC_MSG_CHECKING([for Python include path])
|
||||
if test -z "$PYTHON_CPPFLAGS"; then
|
||||
python_path=`$PYTHON -c "import distutils.sysconfig; \
|
||||
print distutils.sysconfig.get_python_inc();"`
|
||||
if test -n "${python_path}"; then
|
||||
python_path="-I$python_path"
|
||||
fi
|
||||
PYTHON_CPPFLAGS=$python_path
|
||||
fi
|
||||
AC_MSG_RESULT([$PYTHON_CPPFLAGS])
|
||||
AC_SUBST([PYTHON_CPPFLAGS])
|
||||
|
||||
#
|
||||
# Check for Python library path
|
||||
#
|
||||
AC_MSG_CHECKING([for Python library path])
|
||||
if test -z "$PYTHON_LDFLAGS"; then
|
||||
# (makes two attempts to ensure we've got a version number
|
||||
# from the interpreter)
|
||||
py_version=`$PYTHON -c "from distutils.sysconfig import *; \
|
||||
from string import join; \
|
||||
print join(get_config_vars('VERSION'))"`
|
||||
if test "$py_version" == "[None]"; then
|
||||
if test -n "$PYTHON_VERSION"; then
|
||||
py_version=$PYTHON_VERSION
|
||||
else
|
||||
py_version=`$PYTHON -c "import sys; \
|
||||
print sys.version[[:3]]"`
|
||||
fi
|
||||
fi
|
||||
|
||||
PYTHON_LDFLAGS=`$PYTHON -c "from distutils.sysconfig import *; \
|
||||
from string import join; \
|
||||
print '-L' + get_python_lib(0,1), \
|
||||
'-lpython';"`$py_version
|
||||
fi
|
||||
AC_MSG_RESULT([$PYTHON_LDFLAGS])
|
||||
AC_SUBST([PYTHON_LDFLAGS])
|
||||
|
||||
#
|
||||
# Check for site packages
|
||||
#
|
||||
AC_MSG_CHECKING([for Python site-packages path])
|
||||
if test -z "$PYTHON_SITE_PKG"; then
|
||||
PYTHON_SITE_PKG=`$PYTHON -c "import distutils.sysconfig; \
|
||||
print distutils.sysconfig.get_python_lib(0,0);"`
|
||||
fi
|
||||
AC_MSG_RESULT([$PYTHON_SITE_PKG])
|
||||
AC_SUBST([PYTHON_SITE_PKG])
|
||||
|
||||
#
|
||||
# libraries which must be linked in when embedding
|
||||
#
|
||||
AC_MSG_CHECKING(python extra libraries)
|
||||
if test -z "$PYTHON_EXTRA_LIBS"; then
|
||||
PYTHON_EXTRA_LIBS=`$PYTHON -c "import distutils.sysconfig; \
|
||||
conf = distutils.sysconfig.get_config_var; \
|
||||
print conf('LOCALMODLIBS'), conf('LIBS')"`
|
||||
fi
|
||||
AC_MSG_RESULT([$PYTHON_EXTRA_LIBS])
|
||||
AC_SUBST(PYTHON_EXTRA_LIBS)
|
||||
|
||||
#
|
||||
# linking flags needed when embedding
|
||||
#
|
||||
AC_MSG_CHECKING(python extra linking flags)
|
||||
if test -z "$PYTHON_EXTRA_LDFLAGS"; then
|
||||
PYTHON_EXTRA_LDFLAGS=`$PYTHON -c "import distutils.sysconfig; \
|
||||
conf = distutils.sysconfig.get_config_var; \
|
||||
print conf('LINKFORSHARED')"`
|
||||
fi
|
||||
AC_MSG_RESULT([$PYTHON_EXTRA_LDFLAGS])
|
||||
AC_SUBST(PYTHON_EXTRA_LDFLAGS)
|
||||
|
||||
SMB_EXT_LIB(LIBPYTHON, [$PYTHON_LDFLAGS], [$PYTHON_CPPFLAGS])
|
||||
SMB_ENABLE(LIBPYTHON)
|
||||
|
||||
m4_include(auth/gensec/config.m4)
|
||||
m4_include(smbd/process_model.m4)
|
||||
m4_include(scripting/swig/config.m4)
|
||||
m4_include(ntvfs/posix/config.m4)
|
||||
m4_include(ntvfs/unixuid/config.m4)
|
||||
m4_include(lib/socket_wrapper/config.m4)
|
||||
|
||||
@@ -194,10 +194,9 @@ PRIVATE_DEPENDENCIES = \
|
||||
|
||||
#######################
|
||||
# Start LIBRARY swig_ldb
|
||||
[LIBRARY::swig_ldb]
|
||||
PUBLIC_DEPENDENCIES = LIBLDB DYNCONFIG
|
||||
LIBRARY_REALNAME = swig/_ldb.$(SHLIBEXT)
|
||||
OBJ_FILES = swig/ldb_wrap.o
|
||||
[PYTHON::swig_ldb]
|
||||
PUBLIC_DEPENDENCIES = LIBLDB LIBPYTHON
|
||||
SWIG_FILE = ldb.i
|
||||
# End LIBRARY swig_ldb
|
||||
#######################
|
||||
|
||||
|
||||
@@ -5,35 +5,3 @@ SMB_ENABLE(ldb_sqlite3, NO)
|
||||
if test x"$with_sqlite3_support" = x"yes"; then
|
||||
SMB_ENABLE(ldb_sqlite3, YES)
|
||||
fi
|
||||
|
||||
AC_MSG_CHECKING([for Python])
|
||||
|
||||
PYTHON=
|
||||
|
||||
AC_ARG_WITH(python,
|
||||
[ --with-python=PYTHONNAME build Python libraries],
|
||||
[ case "${withval-python}" in
|
||||
yes)
|
||||
PYTHON=python
|
||||
;;
|
||||
no)
|
||||
PYTHON=
|
||||
;;
|
||||
*)
|
||||
PYTHON=${withval-python}
|
||||
;;
|
||||
esac ])
|
||||
|
||||
if test x"$PYTHON" != "x"; then
|
||||
incdir=`python -c 'import sys; print "%s/include/python%d.%d" % (sys.prefix, sys.version_info[[0]], sys.version_info[[1]])'`
|
||||
CPPFLAGS="$CPPFLAGS -I $incdir"
|
||||
fi
|
||||
|
||||
if test x"$PYTHON" != "x"; then
|
||||
AC_MSG_RESULT([${withval-python}])
|
||||
else
|
||||
AC_MSG_RESULT(no)
|
||||
SMB_ENABLE(swig_ldb, NO)
|
||||
fi
|
||||
|
||||
AC_SUBST(PYTHON)
|
||||
|
||||
@@ -3,6 +3,6 @@ from distutils.core import setup
|
||||
from distutils.extension import Extension
|
||||
setup(name='ldb',
|
||||
version='1.0',
|
||||
ext_modules=[Extension('_ldb', ['swig/ldb.i'], include_dirs=['include'],
|
||||
ext_modules=[Extension('_ldb', ['ldb.i'], include_dirs=['include'],
|
||||
libraries=['ldb','ldap'])],
|
||||
)
|
||||
|
||||
@@ -1,178 +0,0 @@
|
||||
"""Provide a more Pythonic and object-oriented interface to ldb."""
|
||||
|
||||
#
|
||||
# Swig interface to Samba
|
||||
#
|
||||
# Copyright (C) Tim Potter 2006
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, see <http://www.gnu.org/licenses/>.
|
||||
#
|
||||
|
||||
#
|
||||
# Interface notes:
|
||||
#
|
||||
# - should an empty dn be represented as None, or an empty string?
|
||||
#
|
||||
# - should single-valued attributes be a string, or a list with one
|
||||
# element?
|
||||
#
|
||||
|
||||
from ldb import *
|
||||
|
||||
# Global initialisation
|
||||
|
||||
result = ldb_global_init()
|
||||
|
||||
if result != 0:
|
||||
raise LdbError, (result, 'ldb_global_init failed')
|
||||
|
||||
# Ldb exceptions
|
||||
|
||||
class LdbError(Exception):
|
||||
"""An exception raised when a ldb error occurs.
|
||||
The exception data is a tuple consisting of the ldb number and a
|
||||
string description of the error."""
|
||||
pass
|
||||
|
||||
# Ldb classes
|
||||
|
||||
class LdbMessage:
|
||||
"""A class representing a ldb message as a Python dictionary."""
|
||||
|
||||
def __init__(self):
|
||||
self.mem_ctx = talloc_init(None)
|
||||
self.msg = ldb_msg_new(self.mem_ctx)
|
||||
|
||||
def __del__(self):
|
||||
if self.mem_ctx is not None:
|
||||
talloc_free(self.mem_ctx)
|
||||
self.mem_ctx = None
|
||||
self.msg = None
|
||||
|
||||
# Make the dn attribute of the object dynamic
|
||||
|
||||
def __getattr__(self, attr):
|
||||
if attr == 'dn':
|
||||
return ldb_dn_linearize(None, self.msg.dn)
|
||||
return self.__dict__[attr]
|
||||
|
||||
def __setattr__(self, attr, value):
|
||||
if attr == 'dn':
|
||||
self.msg.dn = ldb_dn_explode(self.msg, value)
|
||||
if self.msg.dn == None:
|
||||
err = LDB_ERR_INVALID_DN_SYNTAX
|
||||
raise LdbError(err, ldb_strerror(err))
|
||||
return
|
||||
self.__dict__[attr] = value
|
||||
|
||||
# Get and set individual elements
|
||||
|
||||
def __getitem__(self, key):
|
||||
|
||||
elt = ldb_msg_find_element(self.msg, key)
|
||||
|
||||
if elt is None:
|
||||
raise KeyError, "No such attribute '%s'" % key
|
||||
|
||||
return [ldb_val_array_getitem(elt.values, i)
|
||||
for i in range(elt.num_values)]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
ldb_msg_remove_attr(self.msg, key)
|
||||
if type(value) in (list, tuple):
|
||||
[ldb_msg_add_value(self.msg, key, v) for v in value]
|
||||
else:
|
||||
ldb_msg_add_value(self.msg, key, value)
|
||||
|
||||
# Dictionary interface
|
||||
# TODO: move to iterator based interface
|
||||
|
||||
def len(self):
|
||||
return self.msg.num_elements
|
||||
|
||||
def keys(self):
|
||||
return [ldb_message_element_array_getitem(self.msg.elements, i).name
|
||||
for i in range(self.msg.num_elements)]
|
||||
|
||||
def values(self):
|
||||
return [self[k] for k in self.keys()]
|
||||
|
||||
def items(self):
|
||||
return [(k, self[k]) for k in self.keys()]
|
||||
|
||||
# Misc stuff
|
||||
|
||||
def sanity_check(self):
|
||||
return ldb_msg_sanity_check(self.msg)
|
||||
|
||||
class Ldb:
|
||||
"""A class representing a binding to a ldb file."""
|
||||
|
||||
def __init__(self, url, flags = 0):
|
||||
"""Initialise underlying ldb."""
|
||||
|
||||
self.mem_ctx = talloc_init('mem_ctx for ldb 0x%x' % id(self))
|
||||
self.ldb_ctx = ldb_init(self.mem_ctx)
|
||||
|
||||
result = ldb_connect(self.ldb_ctx, url, flags, None)
|
||||
|
||||
if result != LDB_SUCCESS:
|
||||
raise LdbError, (result, ldb_strerror(result))
|
||||
|
||||
def __del__(self):
|
||||
"""Called when the object is to be garbage collected."""
|
||||
self.close()
|
||||
|
||||
def close(self):
|
||||
"""Close down a ldb."""
|
||||
if self.mem_ctx is not None:
|
||||
talloc_free(self.mem_ctx)
|
||||
self.mem_ctx = None
|
||||
self.ldb_ctx = None
|
||||
|
||||
def _ldb_call(self, fn, *args):
|
||||
"""Call a ldb function with args. Raise a LdbError exception
|
||||
if the function returns a non-zero return value."""
|
||||
|
||||
result = fn(*args)
|
||||
|
||||
if result != LDB_SUCCESS:
|
||||
raise LdbError, (result, ldb_strerror(result))
|
||||
|
||||
def search(self, expression):
|
||||
"""Search a ldb for a given expression."""
|
||||
|
||||
self._ldb_call(ldb_search, self.ldb_ctx, None, LDB_SCOPE_DEFAULT,
|
||||
expression, None);
|
||||
|
||||
return [LdbMessage(ldb_message_ptr_array_getitem(result.msgs, ndx))
|
||||
for ndx in range(result.count)]
|
||||
|
||||
def delete(self, dn):
|
||||
"""Delete a dn."""
|
||||
|
||||
_dn = ldb_dn_explode(self.ldb_ctx, dn)
|
||||
|
||||
self._ldb_call(ldb_delete, self.ldb_ctx, _dn)
|
||||
|
||||
def rename(self, olddn, newdn):
|
||||
"""Rename a dn."""
|
||||
|
||||
_olddn = ldb_dn_explode(self.ldb_ctx, olddn)
|
||||
_newdn = ldb_dn_explode(self.ldb_ctx, newdn)
|
||||
|
||||
self._ldb_call(ldb_rename, self.ldb_ctx, _olddn, _newdn)
|
||||
|
||||
def add(self, m):
|
||||
self._ldb_call(ldb_add, self.ldb_ctx, m.msg)
|
||||
353
source/lib/ldb/tests/python/api.py
Executable file
353
source/lib/ldb/tests/python/api.py
Executable file
@@ -0,0 +1,353 @@
|
||||
#!/usr/bin/python
|
||||
# Simple tests for the ldb python API
|
||||
# Copyright (C) 2007 Jelmer Vernooij <jelmer@samba.org>
|
||||
import sys
|
||||
import unittest
|
||||
sys.path.append("swig")
|
||||
sys.path.append("build/lib.linux-i686-2.4")
|
||||
|
||||
import ldb
|
||||
|
||||
class NoContextTests(unittest.TestCase):
|
||||
def test_valid_attr_name(self):
|
||||
self.assertTrue(ldb.valid_attr_name("foo"))
|
||||
self.assertFalse(ldb.valid_attr_name("24foo"))
|
||||
|
||||
def test_timestring(self):
|
||||
self.assertEquals("19700101000000.0Z", ldb.timestring(0))
|
||||
self.assertEquals("20071119191012.0Z", ldb.timestring(1195499412))
|
||||
|
||||
def test_string_to_time(self):
|
||||
self.assertEquals(0, ldb.string_to_time("19700101000000.0Z"))
|
||||
self.assertEquals(1195499412, ldb.string_to_time("20071119191012.0Z"))
|
||||
|
||||
|
||||
class SimpleLdb(unittest.TestCase):
|
||||
def test_connect(self):
|
||||
ldb.Ldb("foo.tdb")
|
||||
|
||||
def test_connect_none(self):
|
||||
ldb.Ldb()
|
||||
|
||||
def test_connect_later(self):
|
||||
x = ldb.Ldb()
|
||||
x.connect("foo.tdb")
|
||||
|
||||
def test_set_create_perms(self):
|
||||
x = ldb.Ldb()
|
||||
x.set_create_perms(0600)
|
||||
|
||||
def test_set_modules_dir(self):
|
||||
x = ldb.Ldb()
|
||||
x.set_modules_dir("/tmp")
|
||||
|
||||
def test_search(self):
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
self.assertEquals(len(l.search()), 1)
|
||||
|
||||
def test_search_attrs(self):
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
self.assertEquals(len(l.search(ldb.Dn(l, ""), ldb.SCOPE_SUBTREE, "(dc=*)", ["dc"])), 0)
|
||||
|
||||
def test_opaque(self):
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
l.set_opaque("my_opaque", l)
|
||||
self.assertTrue(l.get_opaque("my_opaque") is not None)
|
||||
self.assertEquals(None, l.get_opaque("unknown"))
|
||||
|
||||
def test_search_scope_base(self):
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
self.assertEquals(len(l.search(ldb.Dn(l, "dc=foo"),
|
||||
ldb.SCOPE_ONELEVEL)), 0)
|
||||
|
||||
def test_delete(self):
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
self.assertRaises(ldb.LdbError, lambda: l.delete(ldb.Dn(l, "dc=foo")))
|
||||
|
||||
def test_contains(self):
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
self.assertFalse(ldb.Dn(l, "dc=foo") in l)
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
m = ldb.Message()
|
||||
m.dn = ldb.Dn(l, "dc=foo")
|
||||
m["b"] = ["a"]
|
||||
l.add(m)
|
||||
try:
|
||||
self.assertTrue(ldb.Dn(l, "dc=foo") in l)
|
||||
finally:
|
||||
l.delete(m.dn)
|
||||
|
||||
def test_get_config_basedn(self):
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
self.assertEquals(None, l.get_config_basedn())
|
||||
|
||||
def test_get_root_basedn(self):
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
self.assertEquals(None, l.get_root_basedn())
|
||||
|
||||
def test_get_schema_basedn(self):
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
self.assertEquals(None, l.get_schema_basedn())
|
||||
|
||||
def test_get_default_basedn(self):
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
self.assertEquals(None, l.get_default_basedn())
|
||||
|
||||
def test_add(self):
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
m = ldb.Message()
|
||||
m.dn = ldb.Dn(l, "dc=foo")
|
||||
m["bla"] = "bla"
|
||||
self.assertEquals(len(l.search()), 1)
|
||||
l.add(m)
|
||||
try:
|
||||
self.assertEquals(len(l.search()), 2)
|
||||
finally:
|
||||
l.delete(ldb.Dn(l, "dc=foo"))
|
||||
|
||||
def test_add_dict(self):
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
m = {"dn": ldb.Dn(l, "dc=foo"),
|
||||
"bla": "bla"}
|
||||
self.assertEquals(len(l.search()), 1)
|
||||
l.add(m)
|
||||
try:
|
||||
self.assertEquals(len(l.search()), 2)
|
||||
finally:
|
||||
l.delete(ldb.Dn(l, "dc=foo"))
|
||||
|
||||
def test_rename(self):
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
m = ldb.Message()
|
||||
m.dn = ldb.Dn(l, "dc=foo")
|
||||
m["bla"] = "bla"
|
||||
self.assertEquals(len(l.search()), 1)
|
||||
l.add(m)
|
||||
try:
|
||||
l.rename(ldb.Dn(l, "dc=foo"), ldb.Dn(l, "dc=bar"))
|
||||
self.assertEquals(len(l.search()), 2)
|
||||
finally:
|
||||
l.delete(ldb.Dn(l, "dc=bar"))
|
||||
|
||||
def test_modify_delete(self):
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
m = ldb.Message()
|
||||
m.dn = ldb.Dn(l, "dc=modify")
|
||||
m["bla"] = ["1234"]
|
||||
l.add(m)
|
||||
rm = l.search(m.dn)[0]
|
||||
self.assertEquals(["1234"], list(rm["bla"]))
|
||||
try:
|
||||
m = ldb.Message()
|
||||
m.dn = ldb.Dn(l, "dc=modify")
|
||||
m["bla"] = ldb.MessageElement([], ldb.CHANGETYPE_DELETE, "bla")
|
||||
l.modify(m)
|
||||
rm = l.search(m.dn)[0]
|
||||
self.assertEquals(1, len(rm))
|
||||
finally:
|
||||
l.delete(ldb.Dn(l, "dc=modify"))
|
||||
|
||||
def test_modify_add(self):
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
m = ldb.Message()
|
||||
m.dn = ldb.Dn(l, "dc=modify")
|
||||
m["bla"] = ["1234"]
|
||||
l.add(m)
|
||||
try:
|
||||
m = ldb.Message()
|
||||
m.dn = ldb.Dn(l, "dc=modify")
|
||||
m["bla"] = ldb.MessageElement(["456"], ldb.CHANGETYPE_ADD, "bla")
|
||||
l.modify(m)
|
||||
rm = l.search(m.dn)[0]
|
||||
self.assertEquals(2, len(rm))
|
||||
self.assertEquals(["1234", "456"], list(rm["bla"]))
|
||||
finally:
|
||||
l.delete(ldb.Dn(l, "dc=modify"))
|
||||
|
||||
def test_modify_modify(self):
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
m = ldb.Message()
|
||||
m.dn = ldb.Dn(l, "dc=modify")
|
||||
m["bla"] = ["1234", "456"]
|
||||
l.add(m)
|
||||
try:
|
||||
m = ldb.Message()
|
||||
m.dn = ldb.Dn(l, "dc=modify")
|
||||
m["bla"] = ldb.MessageElement(["456"], ldb.CHANGETYPE_MODIFY, "bla")
|
||||
l.modify(m)
|
||||
rm = l.search(m.dn)[0]
|
||||
self.assertEquals(2, len(rm))
|
||||
self.assertEquals(["1234"], list(rm["bla"]))
|
||||
finally:
|
||||
l.delete(ldb.Dn(l, "dc=modify"))
|
||||
|
||||
def test_transaction_commit(self):
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
l.transaction_start()
|
||||
m = ldb.Message(ldb.Dn(l, "dc=foo"))
|
||||
m["foo"] = ["bar"]
|
||||
l.add(m)
|
||||
l.transaction_commit()
|
||||
l.delete(m.dn)
|
||||
|
||||
def test_transaction_cancel(self):
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
l.transaction_start()
|
||||
m = ldb.Message(ldb.Dn(l, "dc=foo"))
|
||||
m["foo"] = ["bar"]
|
||||
l.add(m)
|
||||
l.transaction_cancel()
|
||||
self.assertEquals(0, len(l.search(ldb.Dn(l, "dc=foo"))))
|
||||
|
||||
def test_set_debug(self):
|
||||
def my_report_fn(level, text):
|
||||
pass
|
||||
l = ldb.Ldb("foo.tdb")
|
||||
l.set_debug(my_report_fn)
|
||||
|
||||
|
||||
class DnTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.ldb = ldb.Ldb("foo.tdb")
|
||||
|
||||
def test_str(self):
|
||||
x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
|
||||
self.assertEquals(x.__str__(), "dc=foo,bar=bloe")
|
||||
|
||||
def test_get_casefold(self):
|
||||
x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
|
||||
self.assertEquals(x.get_casefold(), "DC=FOO,BAR=bloe")
|
||||
|
||||
def test_validate(self):
|
||||
x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
|
||||
self.assertTrue(x.validate())
|
||||
|
||||
def test_parent(self):
|
||||
x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
|
||||
self.assertEquals("bar=bloe", x.parent().__str__())
|
||||
|
||||
def test_compare(self):
|
||||
x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
|
||||
y = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
|
||||
self.assertEquals(x, y)
|
||||
z = ldb.Dn(self.ldb, "dc=foo,bar=blie")
|
||||
self.assertNotEquals(z, y)
|
||||
|
||||
def test_is_valid(self):
|
||||
x = ldb.Dn(self.ldb, "dc=foo,dc=bloe")
|
||||
self.assertTrue(x.is_valid())
|
||||
x = ldb.Dn(self.ldb, "")
|
||||
# is_valid()'s return values appears to be a side effect of
|
||||
# some other ldb functions. yuck.
|
||||
# self.assertFalse(x.is_valid())
|
||||
|
||||
def test_is_special(self):
|
||||
x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
|
||||
self.assertFalse(x.is_special())
|
||||
x = ldb.Dn(self.ldb, "@FOOBAR")
|
||||
self.assertTrue(x.is_special())
|
||||
|
||||
def test_check_special(self):
|
||||
x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
|
||||
self.assertFalse(x.check_special("FOOBAR"))
|
||||
x = ldb.Dn(self.ldb, "@FOOBAR")
|
||||
self.assertTrue(x.check_special("@FOOBAR"))
|
||||
|
||||
def test_len(self):
|
||||
x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
|
||||
self.assertEquals(2, len(x))
|
||||
x = ldb.Dn(self.ldb, "dc=foo")
|
||||
self.assertEquals(1, len(x))
|
||||
|
||||
def test_add_child(self):
|
||||
x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
|
||||
self.assertTrue(x.add_child(ldb.Dn(self.ldb, "bla=bloe")))
|
||||
self.assertEquals("bla=bloe,dc=foo,bar=bloe", x.__str__())
|
||||
|
||||
def test_add_base(self):
|
||||
x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
|
||||
self.assertTrue(x.add_base(ldb.Dn(self.ldb, "bla=bloe")))
|
||||
self.assertEquals("dc=foo,bar=bloe,bla=bloe", x.__str__())
|
||||
|
||||
def test_add(self):
|
||||
x = ldb.Dn(self.ldb, "dc=foo")
|
||||
y = ldb.Dn(self.ldb, "bar=bla")
|
||||
self.assertEquals("dc=foo,bar=bla", str(y + x))
|
||||
|
||||
def test_parse_ldif(self):
|
||||
msgs = self.ldb.parse_ldif("dn: foo=bar\n")
|
||||
msg = msgs.next()
|
||||
self.assertEquals("foo=bar", str(msg[1].dn))
|
||||
self.assertTrue(isinstance(msg[1], ldb.Message))
|
||||
|
||||
def test_parse_ldif_more(self):
|
||||
msgs = self.ldb.parse_ldif("dn: foo=bar\n\n\ndn: bar=bar")
|
||||
msg = msgs.next()
|
||||
self.assertEquals("foo=bar", str(msg[1].dn))
|
||||
msg = msgs.next()
|
||||
self.assertEquals("bar=bar", str(msg[1].dn))
|
||||
|
||||
def test_canonical_string(self):
|
||||
x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
|
||||
self.assertEquals("/bloe/foo", x.canonical_str())
|
||||
|
||||
def test_canonical_ex_string(self):
|
||||
x = ldb.Dn(self.ldb, "dc=foo,bar=bloe")
|
||||
self.assertEquals("/bloe\nfoo", x.canonical_ex_str())
|
||||
|
||||
|
||||
class LdbMsgTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.msg = ldb.Message()
|
||||
|
||||
def test_init_dn(self):
|
||||
self.msg = ldb.Message(ldb.Dn(ldb.Ldb(), "dc=foo"))
|
||||
self.assertEquals("dc=foo", str(self.msg.dn))
|
||||
|
||||
def test_len(self):
|
||||
self.assertEquals(0, len(self.msg))
|
||||
|
||||
def test_notpresent(self):
|
||||
self.assertRaises(KeyError, lambda: self.msg["foo"])
|
||||
|
||||
def test_del(self):
|
||||
del self.msg["foo"]
|
||||
|
||||
def test_add_value(self):
|
||||
self.assertEquals(0, len(self.msg))
|
||||
self.msg["foo"] = ["foo"]
|
||||
self.assertEquals(1, len(self.msg))
|
||||
|
||||
def test_add_value_multiple(self):
|
||||
self.assertEquals(0, len(self.msg))
|
||||
self.msg["foo"] = ["foo", "bla"]
|
||||
self.assertEquals(1, len(self.msg))
|
||||
self.assertEquals(["foo", "bla"], list(self.msg["foo"]))
|
||||
|
||||
def test_set_value(self):
|
||||
self.msg["foo"] = ["fool"]
|
||||
self.assertEquals(["fool"], list(self.msg["foo"]))
|
||||
self.msg["foo"] = ["bar"]
|
||||
self.assertEquals(["bar"], list(self.msg["foo"]))
|
||||
|
||||
def test_keys(self):
|
||||
self.msg["foo"] = ["bla"]
|
||||
self.msg["bar"] = ["bla"]
|
||||
self.assertEquals(["foo", "bar"], self.msg.keys())
|
||||
|
||||
def test_dn(self):
|
||||
self.msg.dn = ldb.Dn(ldb.Ldb("foo.tdb"), "@BASEINFO")
|
||||
self.assertEquals("@BASEINFO", self.msg.dn.__str__())
|
||||
|
||||
|
||||
class MessageElementTests(unittest.TestCase):
|
||||
def test_cmp_element(self):
|
||||
x = ldb.MessageElement(["foo"])
|
||||
y = ldb.MessageElement(["foo"])
|
||||
z = ldb.MessageElement(["bzr"])
|
||||
self.assertEquals(x, y)
|
||||
self.assertNotEquals(x, z)
|
||||
|
||||
def test_create_iterable(self):
|
||||
x = ldb.MessageElement(["foo"])
|
||||
self.assertEquals(["foo"], list(x))
|
||||
@@ -49,3 +49,4 @@ RPC-FRSAPI # Not provided by Samba 4
|
||||
^samba4.NET-API-BECOME-DC.*$ # Fails
|
||||
WINBIND # FIXME: This should not be skipped
|
||||
NSS-TEST # Fails
|
||||
samba4.ldb.python # Fails to link properly
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
#######################
|
||||
# Start LIBRARY swig_dcerpc
|
||||
[LIBRARY::swig_dcerpc]
|
||||
ENABLE = NO
|
||||
LIBRARY_REALNAME = _dcerpc.$(SHLIBEXT)
|
||||
PUBLIC_DEPENDENCIES = LIBCLI_SMB NDR_MISC LIBSAMBA-UTIL LIBSAMBA-CONFIG dcerpc_samr RPC_NDR_LSA DYNCONFIG
|
||||
OBJ_FILES = dcerpc_wrap.o
|
||||
|
||||
@@ -290,3 +290,9 @@ if test -f $samba4bindir/nsstest
|
||||
then
|
||||
plantest "NSS-TEST using winbind" member $VALGRIND $samba4bindir/nsstest $samba4bindir/shared/libnss_winbind.so
|
||||
fi
|
||||
|
||||
# if trial is available, run the python tests:
|
||||
if which trial 2>/dev/null >/dev/null
|
||||
then
|
||||
plantest "ldb.python" none PYTHONPATH=bin/python trial lib/ldb/tests/python/api.py
|
||||
fi
|
||||
|
||||
Reference in New Issue
Block a user