1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-13 13:18:06 +03:00

r26587: Fix reading Samba 3 WINS database and initial work on group db, aliases and secrets.

This commit is contained in:
Jelmer Vernooij 2007-12-24 13:04:33 -06:00 committed by Stefan Metzmacher
parent fadab7c60b
commit c7c4cf258a
3 changed files with 196 additions and 40 deletions

View File

@ -73,6 +73,9 @@ class PolicyDatabase:
# FIXME: Read privileges as well
def close(self):
self.tdb.close()
GROUPDB_DATABASE_VERSION_V1 = 1 # native byte format.
GROUPDB_DATABASE_VERSION_V2 = 2 # le format.
@ -88,6 +91,20 @@ MEMBEROF_PREFIX = "MEMBEROF/"
class GroupMappingDatabase:
def __init__(self, file):
self.tdb = tdb.Tdb(file, flags=os.O_RDONLY)
assert self.tdb.fetch_int32("INFO/version\x00") in (GROUPDB_DATABASE_VERSION_V1, GROUPDB_DATABASE_VERSION_V2)
def groupsids(self):
for k in self.tdb.keys():
if k.startswith(GROUP_PREFIX):
yield k[len(GROUP_PREFIX):].rstrip("\0")
def aliases(self):
for k in self.tdb.keys():
if k.startswith(MEMBEROF_PREFIX):
yield k[len(MEMBEROF_PREFIX):].rstrip("\0")
def close(self):
self.tdb.close()
# High water mark keys
@ -102,40 +119,56 @@ class IdmapDatabase:
self.tdb = tdb.Tdb(file, flags=os.O_RDONLY)
assert self.tdb.fetch_int32("IDMAP_VERSION") == IDMAP_VERSION
def close(self):
self.tdb.close()
class SecretsDatabase:
def __init__(self, file):
self.tdb = tdb.Tdb(file, flags=os.O_RDONLY)
self.domains = {}
for k, v in self.tdb.items():
if k == "SECRETS/AUTH_PASSWORD":
self.auth_password = v
elif k == "SECRETS/AUTH_DOMAIN":
self.auth_domain = v
elif k == "SECRETS/AUTH_USER":
self.auth_user = v
elif k.startswith("SECRETS/SID/"):
pass # FIXME
elif k.startswith("SECRETS/DOMGUID/"):
pass # FIXME
elif k.startswith("SECRETS/LDAP_BIND_PW/"):
pass # FIXME
elif k.startswith("SECRETS/AFS_KEYFILE/"):
pass # FIXME
elif k.startswith("SECRETS/MACHINE_SEC_CHANNEL_TYPE/"):
pass # FIXME
elif k.startswith("SECRETS/MACHINE_LAST_CHANGE_TIME/"):
pass # FIXME
elif k.startswith("SECRETS/MACHINE_PASSWORD/"):
pass # FIXME
elif k.startswith("SECRETS/$MACHINE.ACC/"):
pass # FIXME
elif k.startswith("SECRETS/$DOMTRUST.ACC/"):
pass # FIXME
elif k == "INFO/random_seed":
self.random_seed = v
else:
raise "Unknown key %s in secrets database" % k
def get_auth_password(self):
return self.tdb.get("SECRETS/AUTH_PASSWORD")
def get_auth_domain(self):
return self.tdb.get("SECRETS/AUTH_DOMAIN")
def get_auth_user(self):
return self.tdb.get("SECRETS/AUTH_USER")
def get_dom_guid(self, host):
return self.tdb.get("SECRETS/DOMGUID/%s" % host)
def get_ldap_bind_pw(self, host):
return self.tdb.get("SECRETS/LDAP_BIND_PW/%s" % host)
def get_afs_keyfile(self, host):
return self.tdb.get("SECRETS/AFS_KEYFILE/%s" % host)
def get_machine_sec_channel_type(self, host):
return self.tdb.get("SECRETS/MACHINE_SEC_CHANNEL_TYPE/%s" % host)
def get_machine_last_change_time(self, host):
return self.tdb.get("SECRETS/MACHINE_LAST_CHANGE_TIME/%s" % host)
def get_machine_password(self, host):
return self.tdb.get("SECRETS/MACHINE_PASSWORD/%s" % host)
def get_machine_acc(self, host):
return self.tdb.get("SECRETS/$MACHINE.ACC/%s" % host)
def get_domtrust_acc(self, host):
return self.tdb.get("SECRETS/$DOMTRUST.ACC/%s" % host)
def get_random_seed(self):
return self.tdb.get("INFO/random_seed")
def get_sid(self, host):
return self.tdb.get("SECRETS/SID/%s" % host.upper())
def close(self):
self.tdb.close()
SHARE_DATABASE_VERSION_V1 = 1
SHARE_DATABASE_VERSION_V2 = 2
@ -149,6 +182,8 @@ class ShareInfoDatabase:
secdesc = self.tdb.get("SECDESC/%s" % name)
# FIXME: Run ndr_pull_security_descriptor
def close(self):
self.tdb.close()
ACB_DISABLED = 0x00000001
ACB_HOMDIRREQ = 0x00000002
@ -190,16 +225,79 @@ class Smbpasswd:
def __init__(self, file):
pass
TDBSAM_FORMAT_STRING_V0 = "ddddddBBBBBBBBBBBBddBBwdwdBwwd"
TDBSAM_FORMAT_STRING_V1 = "dddddddBBBBBBBBBBBBddBBwdwdBwwd"
TDBSAM_FORMAT_STRING_V2 = "dddddddBBBBBBBBBBBBddBBBwwdBwwd"
TDBSAM_USER_PREFIX = "USER_"
class TdbSam:
def __init__(self, file):
self.tdb = tdb.Tdb(file, flags=os.O_RDONLY)
self.version = self.tdb.fetch_uint32("INFO/version") or 0
assert self.version in (0, 1, 2)
def usernames(self):
for k in self.tdb.keys():
if k.startswith(TDBSAM_USER_PREFIX):
yield k[len(TDBSAM_USER_PREFIX):].rstrip("\0")
def close(self):
self.tdb.close()
def shellsplit(text):
"""Very simple shell-like line splitting.
:param text: Text to split.
:return: List with parts of the line as strings.
"""
ret = list()
inquotes = False
current = ""
for c in text:
if c == "\"":
inquotes = not inquotes
elif c in ("\t", "\n", " ") and not inquotes:
ret.append(current)
current = ""
else:
current += c
if current != "":
ret.append(current)
return ret
class WinsDatabase:
def __init__(self, file):
pass
self.entries = {}
f = open(file, 'r')
assert f.readline().rstrip("\n") == "VERSION 1 0"
for l in f.readlines():
if l[0] == "#": # skip comments
continue
entries = shellsplit(l.rstrip("\n"))
print entries
name = entries[0]
ttl = int(entries[1])
i = 2
ips = []
while "." in entries[i]:
ips.append(entries[i])
i+=1
nb_flags = entries[i]
assert not name in self.entries, "Name %s exists twice" % name
self.entries[name] = (ttl, ips, nb_flags)
f.close()
def __getitem__(self, name):
return self.entries[name]
def __len__(self):
return len(self.entries)
def close(self): # for consistency
pass
class Samba3:
def __init__(self, smbconfpath, libdir):

View File

@ -18,7 +18,8 @@
#
import unittest
from samba.samba3 import GroupMappingDatabase, Registry, PolicyDatabase
from samba.samba3 import (GroupMappingDatabase, Registry, PolicyDatabase, SecretsDatabase, TdbSam,
WinsDatabase)
import os
DATADIR=os.path.join(os.path.dirname(__file__), "../../../../../testdata/samba3")
@ -59,3 +60,55 @@ class GroupsTestCase(unittest.TestCase):
def setUp(self):
self.groupdb = GroupMappingDatabase(os.path.join(DATADIR, "group_mapping.tdb"))
def tearDown(self):
self.groupdb.close()
def test_group_length(self):
self.assertEquals(13, len(list(self.groupdb.groupsids())))
def test_groupsids(self):
sids = list(self.groupdb.groupsids())
self.assertTrue("S-1-5-32-544" in sids)
def test_alias_length(self):
self.assertEquals(0, len(list(self.groupdb.aliases())))
class SecretsDbTestCase(unittest.TestCase):
def setUp(self):
self.secretsdb = SecretsDatabase(os.path.join(DATADIR, "secrets.tdb"))
def tearDown(self):
self.secretsdb.close()
def test_get_sid(self):
self.assertTrue(self.secretsdb.get_sid("BEDWYR") is not None)
class TdbSamTestCase(unittest.TestCase):
def setUp(self):
self.samdb = TdbSam(os.path.join(DATADIR, "passdb.tdb"))
def tearDown(self):
self.samdb.close()
def test_usernames(self):
self.assertEquals(3, len(list(self.samdb.usernames())))
class WinsDatabaseTestCase(unittest.TestCase):
def setUp(self):
self.winsdb = WinsDatabase(os.path.join(DATADIR, "wins.dat"))
def test_length(self):
self.assertEquals(22, len(self.winsdb))
def test_first_entry(self):
self.assertEqual((1124185120, ["192.168.1.5"], "64R"), self.winsdb["ADMINISTRATOR#03"])
def tearDown(self):
self.winsdb.close()
# FIXME: smbpasswd
# FIXME: idmapdb
# FIXME: Shares

View File

@ -530,14 +530,7 @@ data: %d
if ldapurl is not None:
message("Enabling Samba3 LDAP mappings for SAM database")
samdb.modify("""
dn: @MODULES
changetype: modify
replace: @LIST
@LIST: samldb,operational,objectguid,rdn_name,samba3sam
""")
samdb.add({"dn": "@MAP=samba3sam", "@MAP_URL": ldapurl})
enable_samba3sam(samdb)
return ret
@ -551,3 +544,15 @@ def upgrade_verify(subobj, samba3, paths, message):
assert(len(msg) >= 1)
# FIXME
def enable_samba3sam(samdb):
samdb.modify("""
dn: @MODULES
changetype: modify
replace: @LIST
@LIST: samldb,operational,objectguid,rdn_name,samba3sam
""")
samdb.add({"dn": "@MAP=samba3sam", "@MAP_URL": ldapurl})