# spn management # # Copyright Matthieu Patou mat@samba.org 2010 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import samba.getopt as options import ldb from samba.samdb import SamDB from samba.auth import system_session from samba.netcmd.common import _get_user_realm_domain from samba.netcmd import ( Command, CommandError, SuperCommand, Option ) class cmd_spn_list(Command): """List spns of a given user.""" synopsis = "%prog [options]" takes_optiongroups = { "sambaopts": options.SambaOptions, "credopts": options.CredentialsOptions, "versionopts": options.VersionOptions, } takes_options = [ Option("-H", "--URL", help="LDB URL for database or target server", type=str, metavar="URL", dest="H"), ] takes_args = ["user"] def run(self, user, H=None, credopts=None, sambaopts=None, versionopts=None): lp = sambaopts.get_loadparm() creds = credopts.get_credentials(lp) sam = SamDB(H, session_info=system_session(), credentials=creds, lp=lp) # TODO once I understand how, use the domain info to naildown # to the correct domain (cleaneduser, realm, domain) = _get_user_realm_domain(user, sam) self.outf.write(cleaneduser + "\n") res = sam.search( expression="samaccountname=%s" % ldb.binary_encode(cleaneduser), scope=ldb.SCOPE_SUBTREE, attrs=["servicePrincipalName"]) if len(res) > 0: spns = res[0].get("servicePrincipalName") if spns is not None: self.outf.write( "User %s has the following servicePrincipalName: \n" % res[0].dn) for e in spns: self.outf.write("\t %s\n" % e) else: self.outf.write("User %s has no servicePrincipalName\n" % res[0].dn) else: raise CommandError("User %s not found" % user) class cmd_spn_add(Command): """Create a new spn.""" synopsis = "%prog [options]" takes_optiongroups = { "sambaopts": options.SambaOptions, "credopts": options.CredentialsOptions, "versionopts": options.VersionOptions, } takes_options = [ Option("-H", "--URL", help="LDB URL for database or target server", type=str, metavar="URL", dest="H"), ] takes_args = ["name", "user"] def run(self, name, user, H=None, credopts=None, sambaopts=None, versionopts=None): lp = sambaopts.get_loadparm() creds = credopts.get_credentials(lp) sam = SamDB(H, session_info=system_session(), credentials=creds, lp=lp) res = sam.search( expression="servicePrincipalName=%s" % ldb.binary_encode(name), scope=ldb.SCOPE_SUBTREE) if len(res) != 0: raise CommandError("Service principal %s already" " affected to another user" % name) (cleaneduser, realm, domain) = _get_user_realm_domain(user, sam) res = sam.search( expression="samaccountname=%s" % ldb.binary_encode(cleaneduser), scope=ldb.SCOPE_SUBTREE, attrs=["servicePrincipalName"]) if len(res) > 0: res[0].dn msg = ldb.Message() spns = res[0].get("servicePrincipalName") tab = [] found = False flag = ldb.FLAG_MOD_ADD if spns is not None: for e in spns: if str(e) == name: found = True tab.append(str(e)) flag = ldb.FLAG_MOD_REPLACE tab.append(name) msg.dn = res[0].dn msg["servicePrincipalName"] = ldb.MessageElement(tab, flag, "servicePrincipalName") if not found: sam.modify(msg) else: raise CommandError("Service principal %s already" " affected to %s" % (name, user)) else: raise CommandError("User %s not found" % user) class cmd_spn_delete(Command): """Delete a spn.""" synopsis = "%prog [user] [options]" takes_optiongroups = { "sambaopts": options.SambaOptions, "credopts": options.CredentialsOptions, "versionopts": options.VersionOptions, } takes_options = [ Option("-H", "--URL", help="LDB URL for database or target server", type=str, metavar="URL", dest="H"), ] takes_args = ["name", "user?"] def run(self, name, user=None, H=None, credopts=None, sambaopts=None, versionopts=None): lp = sambaopts.get_loadparm() creds = credopts.get_credentials(lp) sam = SamDB(H, session_info=system_session(), credentials=creds, lp=lp) res = sam.search( expression="servicePrincipalName=%s" % ldb.binary_encode(name), scope=ldb.SCOPE_SUBTREE, attrs=["servicePrincipalName", "samAccountName"]) if len(res) > 0: result = None if user is not None: (cleaneduser, realm, domain) = _get_user_realm_domain(user) for elem in res: if str(elem["samAccountName"]).lower() == cleaneduser: result = elem if result is None: raise CommandError("Unable to find user %s with" " spn %s" % (user, name)) else: if len(res) != 1: listUser = "" for r in res: listUser = "%s\n%s" % (listUser, str(r.dn)) raise CommandError("More than one user has the spn %s " "and no specific user was specified, list of users" " with this spn:%s" % (name, listUser)) else: result = res[0] msg = ldb.Message() spns = result.get("servicePrincipalName") tab = [] if spns is not None: for e in spns: if str(e) != name: tab.append(str(e)) flag = ldb.FLAG_MOD_REPLACE msg.dn = result.dn msg["servicePrincipalName"] = ldb.MessageElement(tab, flag, "servicePrincipalName") sam.modify(msg) else: raise CommandError("Service principal %s not affected" % name) class cmd_spn(SuperCommand): """Service Principal Name (SPN) management.""" subcommands = {} subcommands["add"] = cmd_spn_add() subcommands["list"] = cmd_spn_list() subcommands["delete"] = cmd_spn_delete()