diff --git a/python/samba/netcmd/domain/auth/silo/__init__.py b/python/samba/netcmd/domain/auth/silo/__init__.py index 16672eb8416..64485f87c43 100644 --- a/python/samba/netcmd/domain/auth/silo/__init__.py +++ b/python/samba/netcmd/domain/auth/silo/__init__.py @@ -20,371 +20,16 @@ # along with this program. If not, see . # -import samba.getopt as options -from samba.domain.models import AuthenticationPolicy, AuthenticationSilo -from samba.domain.models.exceptions import ModelError -from samba.netcmd import Command, CommandError, Option, SuperCommand +from samba.netcmd import SuperCommand from .member import cmd_domain_auth_silo_member - - -class cmd_domain_auth_silo_list(Command): - """List authentication silos on the domain.""" - - synopsis = "%prog -H [options]" - - takes_optiongroups = { - "sambaopts": options.SambaOptions, - "credopts": options.CredentialsOptions, - "hostopts": options.HostOptions, - } - - takes_options = [ - Option("--json", help="Output results in JSON format.", - dest="output_format", action="store_const", const="json"), - ] - - def run(self, hostopts=None, sambaopts=None, credopts=None, - output_format=None): - - ldb = self.ldb_connect(hostopts, sambaopts, credopts) - - try: - silos = AuthenticationSilo.query(ldb) - except ModelError as e: - raise CommandError(e) - - # Using json output format gives more detail. - if output_format == "json": - self.print_json({silo.name: silo for silo in silos}) - else: - for silo in silos: - print(silo.name, file=self.outf) - - -class cmd_domain_auth_silo_view(Command): - """View an authentication silo on the domain.""" - - synopsis = "%prog -H [options]" - - takes_optiongroups = { - "sambaopts": options.SambaOptions, - "credopts": options.CredentialsOptions, - "hostopts": options.HostOptions, - } - - takes_options = [ - Option("--name", - help="Name of authentication silo to view (required).", - dest="name", action="store", type=str, required=True), - ] - - def run(self, hostopts=None, sambaopts=None, credopts=None, name=None): - - ldb = self.ldb_connect(hostopts, sambaopts, credopts) - - try: - silo = AuthenticationSilo.get(ldb, cn=name) - except ModelError as e: - raise CommandError(e) - - # Check if silo exists first. - if silo is None: - raise CommandError(f"Authentication silo {name} not found.") - - # Display silo as JSON. - self.print_json(silo.as_dict()) - - -class cmd_domain_auth_silo_create(Command): - """Create a new authentication silo on the domain.""" - - synopsis = "%prog -H [options]" - - takes_optiongroups = { - "sambaopts": options.SambaOptions, - "credopts": options.CredentialsOptions, - "hostopts": options.HostOptions, - } - - takes_options = [ - Option("--name", help="Name of authentication silo (required).", - dest="name", action="store", type=str, required=True), - Option("--description", - help="Optional description for authentication silo.", - dest="description", action="store", type=str), - Option("--user-authentication-policy", - help="User account authentication policy.", - dest="user_authentication_policy", action="store", type=str, - metavar="USER_POLICY"), - Option("--service-authentication-policy", - help="Managed service account authentication policy.", - dest="service_authentication_policy", action="store", type=str, - metavar="SERVICE_POLICY"), - Option("--computer-authentication-policy", - help="Computer authentication policy.", - dest="computer_authentication_policy", action="store", type=str, - metavar="COMPUTER_POLICY"), - Option("--protect", - help="Protect authentication silo from accidental deletion.", - dest="protect", action="store_true"), - Option("--unprotect", - help="Unprotect authentication silo from accidental deletion.", - dest="unprotect", action="store_true"), - Option("--audit", - help="Only audit silo policies.", - dest="audit", action="store_true"), - Option("--enforce", - help="Enforce silo policies.", - dest="enforce", action="store_true") - ] - - @staticmethod - def get_policy(ldb, name): - """Helper function to fetch auth policy or raise CommandError. - - :param ldb: Ldb connection - :param name: Either the DN or name of authentication policy - """ - try: - return AuthenticationPolicy.find(ldb, name) - except (ModelError, ValueError) as e: - raise CommandError(e) - - def run(self, hostopts=None, sambaopts=None, credopts=None, - name=None, description=None, - user_authentication_policy=None, - service_authentication_policy=None, - computer_authentication_policy=None, - protect=None, unprotect=None, - audit=None, enforce=None): - - if protect and unprotect: - raise CommandError("--protect and --unprotect cannot be used together.") - if audit and enforce: - raise CommandError("--audit and --enforce cannot be used together.") - - ldb = self.ldb_connect(hostopts, sambaopts, credopts) - - try: - silo = AuthenticationSilo.get(ldb, cn=name) - except ModelError as e: - raise CommandError(e) - - # Make sure silo doesn't already exist. - if silo is not None: - raise CommandError(f"Authentication silo {name} already exists.") - - # New silo object. - silo = AuthenticationSilo(cn=name, description=description) - - # Set user policy - if user_authentication_policy: - silo.user_authentication_policy = \ - self.get_policy(ldb, user_authentication_policy).dn - - # Set service policy - if service_authentication_policy: - silo.service_authentication_policy = \ - self.get_policy(ldb, service_authentication_policy).dn - - # Set computer policy - if computer_authentication_policy: - silo.computer_authentication_policy = \ - self.get_policy(ldb, computer_authentication_policy).dn - - # Either --enforce will be set or --audit but never both. - # The default if both are missing is enforce=True. - if enforce is not None: - silo.enforced = enforce - else: - silo.enforced = not audit - - # Create silo - try: - silo.save(ldb) - - if protect: - silo.protect(ldb) - except ModelError as e: - raise CommandError(e) - - # Authentication silo created successfully. - print(f"Created authentication silo: {name}", file=self.outf) - - -class cmd_domain_auth_silo_modify(Command): - """Modify an authentication silo on the domain.""" - - synopsis = "%prog -H [options]" - - takes_optiongroups = { - "sambaopts": options.SambaOptions, - "credopts": options.CredentialsOptions, - "hostopts": options.HostOptions, - } - - takes_options = [ - Option("--name", help="Name of authentication silo (required).", - dest="name", action="store", type=str, required=True), - Option("--description", - help="Optional description for authentication silo.", - dest="description", action="store", type=str), - Option("--user-authentication-policy", - help="User account authentication policy.", - dest="user_authentication_policy", action="store", type=str, - metavar="USER_POLICY"), - Option("--service-authentication-policy", - help="Managed service account authentication policy.", - dest="service_authentication_policy", action="store", type=str, - metavar="SERVICE_POLICY"), - Option("--computer-authentication-policy", - help="Computer authentication policy.", - dest="computer_authentication_policy", action="store", type=str, - metavar="COMPUTER_POLICY"), - Option("--protect", - help="Protect authentication silo from accidental deletion.", - dest="protect", action="store_true"), - Option("--unprotect", - help="Unprotect authentication silo from accidental deletion.", - dest="unprotect", action="store_true"), - Option("--audit", - help="Only audit silo policies.", - dest="audit", action="store_true"), - Option("--enforce", - help="Enforce silo policies.", - dest="enforce", action="store_true") - ] - - @staticmethod - def get_policy(ldb, name): - """Helper function to fetch auth policy or raise CommandError. - - :param ldb: Ldb connection - :param name: Either the DN or name of authentication policy - """ - try: - return AuthenticationPolicy.find(ldb, name) - except (ModelError, ValueError) as e: - raise CommandError(e) - - def run(self, hostopts=None, sambaopts=None, credopts=None, - name=None, description=None, - user_authentication_policy=None, - service_authentication_policy=None, - computer_authentication_policy=None, - protect=None, unprotect=None, - audit=None, enforce=None): - - if audit and enforce: - raise CommandError("--audit and --enforce cannot be used together.") - if protect and unprotect: - raise CommandError("--protect and --unprotect cannot be used together.") - - ldb = self.ldb_connect(hostopts, sambaopts, credopts) - - try: - silo = AuthenticationSilo.get(ldb, cn=name) - except ModelError as e: - raise CommandError(e) - - # Check if silo exists first. - if silo is None: - raise CommandError(f"Authentication silo {name} not found.") - - # Either --enforce will be set or --audit but never both. - if enforce: - silo.enforced = True - elif audit: - silo.enforced = False - - # Update the description. - if description is not None: - silo.description = description - - # Set or unset user policy. - if user_authentication_policy == "": - silo.user_authentication_policy = None - elif user_authentication_policy: - silo.user_authentication_policy = \ - self.get_policy(ldb, user_authentication_policy).dn - - # Set or unset service policy. - if service_authentication_policy == "": - silo.service_authentication_policy = None - elif service_authentication_policy: - silo.service_authentication_policy = \ - self.get_policy(ldb, service_authentication_policy).dn - - # Set or unset computer policy. - if computer_authentication_policy == "": - silo.computer_authentication_policy = None - elif computer_authentication_policy: - silo.computer_authentication_policy = \ - self.get_policy(ldb, computer_authentication_policy).dn - - # Update silo - try: - silo.save(ldb) - - if protect: - silo.protect(ldb) - elif unprotect: - silo.unprotect(ldb) - except ModelError as e: - raise CommandError(e) - - # Silo updated successfully. - print(f"Updated authentication silo: {name}", file=self.outf) - - -class cmd_domain_auth_silo_delete(Command): - """Delete an authentication silo on the domain.""" - - synopsis = "%prog -H [options]" - - takes_optiongroups = { - "sambaopts": options.SambaOptions, - "credopts": options.CredentialsOptions, - "hostopts": options.HostOptions, - } - - takes_options = [ - Option("--name", help="Name of authentication silo (required).", - dest="name", action="store", type=str, required=True), - Option("--force", help="Force delete protected authentication silo.", - dest="force", action="store_true") - ] - - def run(self, hostopts=None, sambaopts=None, credopts=None, name=None, - force=None): - - ldb = self.ldb_connect(hostopts, sambaopts, credopts) - - try: - silo = AuthenticationSilo.get(ldb, cn=name) - except ModelError as e: - raise CommandError(e) - - # Check if silo exists first. - if silo is None: - raise CommandError(f"Authentication silo {name} not found.") - - # Delete silo - try: - if force: - silo.unprotect(ldb) - - silo.delete(ldb) - except ModelError as e: - if not force: - raise CommandError( - f"{e}\nTry --force to delete protected authentication silos.") - else: - raise CommandError(e) - - # Authentication silo deleted successfully. - print(f"Deleted authentication silo: {name}", file=self.outf) +from .silo import ( + cmd_domain_auth_silo_create, + cmd_domain_auth_silo_delete, + cmd_domain_auth_silo_list, + cmd_domain_auth_silo_modify, + cmd_domain_auth_silo_view, +) class cmd_domain_auth_silo(SuperCommand): diff --git a/python/samba/netcmd/domain/auth/silo/silo.py b/python/samba/netcmd/domain/auth/silo/silo.py new file mode 100644 index 00000000000..028581a61c7 --- /dev/null +++ b/python/samba/netcmd/domain/auth/silo/silo.py @@ -0,0 +1,385 @@ +# Unix SMB/CIFS implementation. +# +# authentication silos - authentication silo management +# +# Copyright (C) Catalyst.Net Ltd. 2023 +# +# Written by Rob van der Linde +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +import samba.getopt as options +from samba.domain.models import AuthenticationPolicy, AuthenticationSilo +from samba.domain.models.exceptions import ModelError +from samba.netcmd import Command, CommandError, Option + + +class cmd_domain_auth_silo_list(Command): + """List authentication silos on the domain.""" + + synopsis = "%prog -H [options]" + + takes_optiongroups = { + "sambaopts": options.SambaOptions, + "credopts": options.CredentialsOptions, + "hostopts": options.HostOptions, + } + + takes_options = [ + Option("--json", help="Output results in JSON format.", + dest="output_format", action="store_const", const="json"), + ] + + def run(self, hostopts=None, sambaopts=None, credopts=None, + output_format=None): + + ldb = self.ldb_connect(hostopts, sambaopts, credopts) + + try: + silos = AuthenticationSilo.query(ldb) + except ModelError as e: + raise CommandError(e) + + # Using json output format gives more detail. + if output_format == "json": + self.print_json({silo.name: silo for silo in silos}) + else: + for silo in silos: + print(silo.name, file=self.outf) + + +class cmd_domain_auth_silo_view(Command): + """View an authentication silo on the domain.""" + + synopsis = "%prog -H [options]" + + takes_optiongroups = { + "sambaopts": options.SambaOptions, + "credopts": options.CredentialsOptions, + "hostopts": options.HostOptions, + } + + takes_options = [ + Option("--name", + help="Name of authentication silo to view (required).", + dest="name", action="store", type=str, required=True), + ] + + def run(self, hostopts=None, sambaopts=None, credopts=None, name=None): + + ldb = self.ldb_connect(hostopts, sambaopts, credopts) + + try: + silo = AuthenticationSilo.get(ldb, cn=name) + except ModelError as e: + raise CommandError(e) + + # Check if silo exists first. + if silo is None: + raise CommandError(f"Authentication silo {name} not found.") + + # Display silo as JSON. + self.print_json(silo.as_dict()) + + +class cmd_domain_auth_silo_create(Command): + """Create a new authentication silo on the domain.""" + + synopsis = "%prog -H [options]" + + takes_optiongroups = { + "sambaopts": options.SambaOptions, + "credopts": options.CredentialsOptions, + "hostopts": options.HostOptions, + } + + takes_options = [ + Option("--name", help="Name of authentication silo (required).", + dest="name", action="store", type=str, required=True), + Option("--description", + help="Optional description for authentication silo.", + dest="description", action="store", type=str), + Option("--user-authentication-policy", + help="User account authentication policy.", + dest="user_authentication_policy", action="store", type=str, + metavar="USER_POLICY"), + Option("--service-authentication-policy", + help="Managed service account authentication policy.", + dest="service_authentication_policy", action="store", type=str, + metavar="SERVICE_POLICY"), + Option("--computer-authentication-policy", + help="Computer authentication policy.", + dest="computer_authentication_policy", action="store", type=str, + metavar="COMPUTER_POLICY"), + Option("--protect", + help="Protect authentication silo from accidental deletion.", + dest="protect", action="store_true"), + Option("--unprotect", + help="Unprotect authentication silo from accidental deletion.", + dest="unprotect", action="store_true"), + Option("--audit", + help="Only audit silo policies.", + dest="audit", action="store_true"), + Option("--enforce", + help="Enforce silo policies.", + dest="enforce", action="store_true") + ] + + @staticmethod + def get_policy(ldb, name): + """Helper function to fetch auth policy or raise CommandError. + + :param ldb: Ldb connection + :param name: Either the DN or name of authentication policy + """ + try: + return AuthenticationPolicy.find(ldb, name) + except (ModelError, ValueError) as e: + raise CommandError(e) + + def run(self, hostopts=None, sambaopts=None, credopts=None, + name=None, description=None, + user_authentication_policy=None, + service_authentication_policy=None, + computer_authentication_policy=None, + protect=None, unprotect=None, + audit=None, enforce=None): + + if protect and unprotect: + raise CommandError("--protect and --unprotect cannot be used together.") + if audit and enforce: + raise CommandError("--audit and --enforce cannot be used together.") + + ldb = self.ldb_connect(hostopts, sambaopts, credopts) + + try: + silo = AuthenticationSilo.get(ldb, cn=name) + except ModelError as e: + raise CommandError(e) + + # Make sure silo doesn't already exist. + if silo is not None: + raise CommandError(f"Authentication silo {name} already exists.") + + # New silo object. + silo = AuthenticationSilo(cn=name, description=description) + + # Set user policy + if user_authentication_policy: + silo.user_authentication_policy = \ + self.get_policy(ldb, user_authentication_policy).dn + + # Set service policy + if service_authentication_policy: + silo.service_authentication_policy = \ + self.get_policy(ldb, service_authentication_policy).dn + + # Set computer policy + if computer_authentication_policy: + silo.computer_authentication_policy = \ + self.get_policy(ldb, computer_authentication_policy).dn + + # Either --enforce will be set or --audit but never both. + # The default if both are missing is enforce=True. + if enforce is not None: + silo.enforced = enforce + else: + silo.enforced = not audit + + # Create silo + try: + silo.save(ldb) + + if protect: + silo.protect(ldb) + except ModelError as e: + raise CommandError(e) + + # Authentication silo created successfully. + print(f"Created authentication silo: {name}", file=self.outf) + + +class cmd_domain_auth_silo_modify(Command): + """Modify an authentication silo on the domain.""" + + synopsis = "%prog -H [options]" + + takes_optiongroups = { + "sambaopts": options.SambaOptions, + "credopts": options.CredentialsOptions, + "hostopts": options.HostOptions, + } + + takes_options = [ + Option("--name", help="Name of authentication silo (required).", + dest="name", action="store", type=str, required=True), + Option("--description", + help="Optional description for authentication silo.", + dest="description", action="store", type=str), + Option("--user-authentication-policy", + help="User account authentication policy.", + dest="user_authentication_policy", action="store", type=str, + metavar="USER_POLICY"), + Option("--service-authentication-policy", + help="Managed service account authentication policy.", + dest="service_authentication_policy", action="store", type=str, + metavar="SERVICE_POLICY"), + Option("--computer-authentication-policy", + help="Computer authentication policy.", + dest="computer_authentication_policy", action="store", type=str, + metavar="COMPUTER_POLICY"), + Option("--protect", + help="Protect authentication silo from accidental deletion.", + dest="protect", action="store_true"), + Option("--unprotect", + help="Unprotect authentication silo from accidental deletion.", + dest="unprotect", action="store_true"), + Option("--audit", + help="Only audit silo policies.", + dest="audit", action="store_true"), + Option("--enforce", + help="Enforce silo policies.", + dest="enforce", action="store_true") + ] + + @staticmethod + def get_policy(ldb, name): + """Helper function to fetch auth policy or raise CommandError. + + :param ldb: Ldb connection + :param name: Either the DN or name of authentication policy + """ + try: + return AuthenticationPolicy.find(ldb, name) + except (ModelError, ValueError) as e: + raise CommandError(e) + + def run(self, hostopts=None, sambaopts=None, credopts=None, + name=None, description=None, + user_authentication_policy=None, + service_authentication_policy=None, + computer_authentication_policy=None, + protect=None, unprotect=None, + audit=None, enforce=None): + + if audit and enforce: + raise CommandError("--audit and --enforce cannot be used together.") + if protect and unprotect: + raise CommandError("--protect and --unprotect cannot be used together.") + + ldb = self.ldb_connect(hostopts, sambaopts, credopts) + + try: + silo = AuthenticationSilo.get(ldb, cn=name) + except ModelError as e: + raise CommandError(e) + + # Check if silo exists first. + if silo is None: + raise CommandError(f"Authentication silo {name} not found.") + + # Either --enforce will be set or --audit but never both. + if enforce: + silo.enforced = True + elif audit: + silo.enforced = False + + # Update the description. + if description is not None: + silo.description = description + + # Set or unset user policy. + if user_authentication_policy == "": + silo.user_authentication_policy = None + elif user_authentication_policy: + silo.user_authentication_policy = \ + self.get_policy(ldb, user_authentication_policy).dn + + # Set or unset service policy. + if service_authentication_policy == "": + silo.service_authentication_policy = None + elif service_authentication_policy: + silo.service_authentication_policy = \ + self.get_policy(ldb, service_authentication_policy).dn + + # Set or unset computer policy. + if computer_authentication_policy == "": + silo.computer_authentication_policy = None + elif computer_authentication_policy: + silo.computer_authentication_policy = \ + self.get_policy(ldb, computer_authentication_policy).dn + + # Update silo + try: + silo.save(ldb) + + if protect: + silo.protect(ldb) + elif unprotect: + silo.unprotect(ldb) + except ModelError as e: + raise CommandError(e) + + # Silo updated successfully. + print(f"Updated authentication silo: {name}", file=self.outf) + + +class cmd_domain_auth_silo_delete(Command): + """Delete an authentication silo on the domain.""" + + synopsis = "%prog -H [options]" + + takes_optiongroups = { + "sambaopts": options.SambaOptions, + "credopts": options.CredentialsOptions, + "hostopts": options.HostOptions, + } + + takes_options = [ + Option("--name", help="Name of authentication silo (required).", + dest="name", action="store", type=str, required=True), + Option("--force", help="Force delete protected authentication silo.", + dest="force", action="store_true") + ] + + def run(self, hostopts=None, sambaopts=None, credopts=None, name=None, + force=None): + + ldb = self.ldb_connect(hostopts, sambaopts, credopts) + + try: + silo = AuthenticationSilo.get(ldb, cn=name) + except ModelError as e: + raise CommandError(e) + + # Check if silo exists first. + if silo is None: + raise CommandError(f"Authentication silo {name} not found.") + + # Delete silo + try: + if force: + silo.unprotect(ldb) + + silo.delete(ldb) + except ModelError as e: + if not force: + raise CommandError( + f"{e}\nTry --force to delete protected authentication silos.") + else: + raise CommandError(e) + + # Authentication silo deleted successfully. + print(f"Deleted authentication silo: {name}", file=self.outf)