mirror of
https://github.com/samba-team/samba.git
synced 2025-01-14 19:24:43 +03:00
9d8e7666c0
Without the deactivated links control, we assert certain conditions over DRS instead. Signed-off-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz> Reviewed-by: Garming Sam <garming@catalyst.net.nz> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
213 lines
7.0 KiB
Python
213 lines
7.0 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
# Originally based on ./sam.py
|
|
import sys
|
|
import os
|
|
import base64
|
|
import random
|
|
import re
|
|
|
|
sys.path.insert(0, "bin/python")
|
|
import samba
|
|
from samba.tests.subunitrun import SubunitOptions, TestProgram
|
|
|
|
import samba.getopt as options
|
|
|
|
from samba.auth import system_session
|
|
import ldb
|
|
from samba.samdb import SamDB
|
|
from samba.dcerpc import misc
|
|
|
|
from samba.dcerpc import drsuapi, misc, drsblobs
|
|
from samba.drs_utils import drs_DsBind
|
|
from samba.ndr import ndr_unpack, ndr_pack
|
|
|
|
import drs_base
|
|
|
|
import time
|
|
|
|
|
|
class LATestException(Exception):
    """Exception type declared for the linked-attribute (LA) tests.

    It adds nothing to ``Exception``; no code visible in this module
    raises or catches it.
    """
|
|
|
|
class ExopBaseTest:
    """Mixin providing helpers for DRSUAPI extended-operation requests.

    ``_ds_bind`` calls ``self.get_loadparm()`` and
    ``self.get_credentials()``, so the class this is mixed into has to
    supply both.
    """

    def _exop_req8(self, dest_dsa, invocation_id, nc_dn_str, exop,
                   replica_flags=0, max_objects=0):
        """Build a ``DsGetNCChangesRequest8`` for an extended operation.

        :param dest_dsa: value passed to ``misc.GUID`` for the destination
            DSA; any false value yields a default-constructed GUID
        :param invocation_id: value passed to ``misc.GUID`` for the source
            DSA invocation id
        :param nc_dn_str: DN stored (via ``unicode()``) in the request's
            naming context
        :param exop: value stored in the request's ``extended_op`` field
        :param replica_flags: value stored in ``replica_flags``
        :param max_objects: value stored in ``max_object_count``
        :return: the populated request object
        """
        if dest_dsa:
            destination_guid = misc.GUID(dest_dsa)
        else:
            destination_guid = misc.GUID()

        request = drsuapi.DsGetNCChangesRequest8()
        request.destination_dsa_guid = destination_guid
        request.source_dsa_invocation_id = misc.GUID(invocation_id)

        request.naming_context = drsuapi.DsReplicaObjectIdentifier()
        request.naming_context.dn = unicode(nc_dn_str)

        # Start from an all-zero high water mark and no up-to-dateness
        # vector.
        request.highwatermark = drsuapi.DsReplicaHighWaterMark()
        for usn_field in ('tmp_highest_usn', 'reserved_usn', 'highest_usn'):
            setattr(request.highwatermark, usn_field, 0)
        request.uptodateness_vector = None

        request.replica_flags = replica_flags
        request.max_object_count = max_objects
        request.max_ndr_size = 402116
        request.extended_op = exop
        request.fsmo_info = 0

        # No partial attribute sets and an empty prefix mapping container.
        request.partial_attribute_set = None
        request.partial_attribute_set_ex = None
        request.mapping_ctr.num_mappings = 0
        request.mapping_ctr.mappings = None

        return request

    def _ds_bind(self, server_name):
        """Open a sealed ncacn_ip_tcp DRSUAPI connection and bind to it.

        :param server_name: host interpolated into the binding string
        :return: ``(connection, bind_handle)`` tuple
        """
        binding = "ncacn_ip_tcp:%s[seal]" % server_name
        connection = drsuapi.drsuapi(binding,
                                     self.get_loadparm(),
                                     self.get_credentials())
        # drs_DsBind also returns the supported extensions; they are not
        # needed here.
        bind_handle, _extensions = drs_DsBind(connection)
        return (connection, bind_handle)
|
|
|
|
|
|
class LATests(drs_base.DrsBaseTestCase, ExopBaseTest):
    """Linked-attribute tests that check link state over DRS.

    Each test creates users and groups under a scratch OU
    (``OU=la,<domain dn>``), manipulates ``member`` links, and then
    inspects the linked attributes returned by a ``DsGetNCChanges``
    REPL_OBJ extended operation, including each link's ACTIVE flag.
    """

    def _delete_test_ou(self):
        """Tree-delete the scratch OU, ignoring any LDB error.

        This is deliberately best-effort: callers use it for cleanup and
        the OU may not exist.
        """
        try:
            self.samdb.delete(self.ou, ['tree_delete:1'])
        except ldb.LdbError:
            pass

    def _modify_linked_attribute(self, src, dest, attr, flag):
        """Apply a single-value modification of *attr* on *src*.

        :param src: DN string of the object to modify
        :param dest: value to add or delete
        :param attr: attribute name
        :param flag: ``ldb.FLAG_MOD_*`` constant selecting the operation
        """
        m = ldb.Message()
        m.dn = ldb.Dn(self.samdb, src)
        m[attr] = ldb.MessageElement(dest, flag, attr)
        self.samdb.modify(m)

    def setUp(self):
        """Create a fresh scratch OU and bind to DC1 over DRSUAPI."""
        super(LATests, self).setUp()
        # DrsBaseTestCase sets up self.ldb_dc1, self.ldb_dc2
        # we're only using one
        self.samdb = self.ldb_dc1

        self.base_dn = self.samdb.domain_dn()
        self.ou = "OU=la,%s" % self.base_dn

        # Clear out anything left behind under the OU, then recreate it.
        self._delete_test_ou()
        self.samdb.add({'objectclass': 'organizationalUnit',
                        'dn': self.ou})

        self.dc_guid = self.samdb.get_invocation_id()
        self.drs, self.drs_handle = self._ds_bind(self.dnsname_dc1)

    def tearDown(self):
        """Run the base-class teardown, then remove the scratch OU."""
        super(LATests, self).tearDown()
        self._delete_test_ou()

    def delete_user(self, user):
        """Delete ``user['dn']`` and drop *user* from ``self.users``.

        NOTE(review): ``self.users`` is not assigned anywhere in this
        class; confirm it is provided elsewhere before relying on this
        helper.
        """
        self.samdb.delete(user['dn'])
        self.users.remove(user)

    def add_object(self, cn, objectclass):
        """Add an object named *cn* under the scratch OU.

        :return: the DN string of the new object
        """
        dn = "CN=%s,%s" % (cn, self.ou)
        self.samdb.add({'cn': cn,
                        'objectclass': objectclass,
                        'dn': dn})

        return dn

    def add_objects(self, n, objectclass, prefix=None):
        """Add *n* objects named ``<prefix>1`` .. ``<prefix>n``.

        :param prefix: name prefix; defaults to *objectclass*
        :return: list of the new DN strings, in creation order
        """
        if prefix is None:
            prefix = objectclass
        return [self.add_object("%s%d" % (prefix, i + 1), objectclass)
                for i in range(n)]

    def add_linked_attribute(self, src, dest, attr='member'):
        """Add *dest* as a value of *attr* on *src*."""
        self._modify_linked_attribute(src, dest, attr, ldb.FLAG_MOD_ADD)

    def remove_linked_attribute(self, src, dest, attr='member'):
        """Delete *dest* from the values of *attr* on *src*."""
        self._modify_linked_attribute(src, dest, attr, ldb.FLAG_MOD_DELETE)

    def attr_search(self, obj, expected, attr, scope=ldb.SCOPE_BASE):
        """Fetch the *attr* links of *obj* via a REPL_OBJ extended op.

        :param obj: DN of the object to request
        :param expected: unused; kept so existing callers keep working
        :param attr: attribute name; resolved to the constant
            ``drsuapi.DRSUAPI_ATTID_<attr>``
        :param scope: unused; kept so existing callers keep working
        :return: list of ``(target_dn_string, is_active)`` tuples, one per
            returned linked attribute whose attid matches
        """
        req8 = self._exop_req8(dest_dsa=None,
                               invocation_id=self.dc_guid,
                               nc_dn_str=obj,
                               exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ)

        _level, ctr = self.drs.DsGetNCChanges(self.drs_handle, 8, req8)
        expected_attid = getattr(drsuapi, 'DRSUAPI_ATTID_' + attr)
        active_flag = drsuapi.DRSUAPI_DS_LINKED_ATTRIBUTE_FLAG_ACTIVE

        links = []
        for link in ctr.linked_attributes:
            if link.attid != expected_attid:
                continue
            unpacked = ndr_unpack(drsuapi.DsReplicaObjectIdentifier3,
                                  link.value.blob)
            links.append((str(unpacked.dn), bool(link.flags & active_flag)))

        return links

    def assert_forward_links(self, obj, expected, attr='member'):
        """Assert that the *attr* links of *obj* match *expected*.

        :param expected: dict mapping target DN string to the expected
            state of the link's ACTIVE flag
        """
        results = self.attr_search(obj, expected, attr)
        self.assertEqual(len(results), len(expected),
                         "expected %d links on %s, got %d" %
                         (len(expected), obj, len(results)))

        for k, v in results:
            self.assertIn(k, expected)
            self.assertEqual(expected[k], v,
                             "%s active flag should be %d, not %d" %
                             (k, expected[k], v))

    def get_object_guid(self, dn):
        """Return the objectGUID of *dn* as a string."""
        res = self.samdb.search(dn,
                                scope=ldb.SCOPE_BASE,
                                attrs=['objectGUID'])
        return str(misc.GUID(res[0]['objectGUID'][0]))

    def test_links_all_delete_group(self):
        """Deleting a group should leave it with no member links.

        The deleted group is looked up again by GUID with the
        ``show_deleted`` control; the other group's link must be
        unaffected.
        """
        u1, u2 = self.add_objects(2, 'user', 'u_all_del_group')
        g1, g2 = self.add_objects(2, 'group', 'g_all_del_group')
        g2guid = self.get_object_guid(g2)

        self.add_linked_attribute(g1, u1)
        self.add_linked_attribute(g2, u1)
        self.add_linked_attribute(g2, u2)

        self.samdb.delete(g2)
        self.assert_forward_links(g1, {u1: True})
        res = self.samdb.search('<GUID=%s>' % g2guid,
                                scope=ldb.SCOPE_BASE,
                                controls=['show_deleted:1'])
        new_dn = res[0].dn
        self.assert_forward_links(new_dn, {})

    def test_la_links_delete_link(self):
        """Removed links should still be reported, flagged inactive.

        Re-adding a removed link is expected to make it active again.
        """
        u1, u2 = self.add_objects(2, 'user', 'u_del_link')
        g1, g2 = self.add_objects(2, 'group', 'g_del_link')

        self.add_linked_attribute(g1, u1)
        self.add_linked_attribute(g2, u1)
        self.add_linked_attribute(g2, u2)

        self.remove_linked_attribute(g2, u1)

        self.assert_forward_links(g1, {u1: True})
        self.assert_forward_links(g2, {u1: False, u2: True})

        self.add_linked_attribute(g2, u1)
        self.remove_linked_attribute(g2, u2)
        self.assert_forward_links(g2, {u1: True, u2: False})
        self.remove_linked_attribute(g2, u1)
        self.assert_forward_links(g2, {u1: False, u2: False})

    def test_la_links_delete_user(self):
        """Deleting a user should drop its links from every group."""
        u1, u2 = self.add_objects(2, 'user', 'u_del_user')
        g1, g2 = self.add_objects(2, 'group', 'g_del_user')

        self.add_linked_attribute(g1, u1)
        self.add_linked_attribute(g2, u1)
        self.add_linked_attribute(g2, u2)

        self.samdb.delete(u1)

        self.assert_forward_links(g1, {})
        self.assert_forward_links(g2, {u2: True})
|