1
0
mirror of https://github.com/dkmstr/openuds.git synced 2024-12-22 13:34:04 +03:00

Migrated all os managers to "modern" serialization model

This commit is contained in:
Adolfo Gómez García 2024-01-27 19:29:49 +01:00
parent e88c3f2fff
commit 7cfa9ca9f4
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
21 changed files with 564 additions and 363 deletions

View File

@ -162,7 +162,7 @@ class UpdateFromPreparing(StateUpdater):
# By default, if not valid publication, service will be marked for removal on preparation finished
state = State.REMOVABLE
if self.user_service.check_publication_validity():
if self.user_service.is_publication_valid():
logger.debug('Publication is valid for %s', self.user_service.friendly_name)
state = self.check_os_manager_related()

View File

@ -99,7 +99,6 @@ class Module(abc.ABC, UserInterface, Environmentable, Serializable):
module.
"""
__slots__ = ['_uuid']
# Import variable indicating this module is a base class not a real module
# Note that Module is not a real module, but a base class for all modules so is_base is not used on this class
# This means that is_base is set as a default here, but not checked for Module NEVER (but for subclasses)

View File

@ -182,14 +182,14 @@ class OSManager(Module):
return False
@classmethod
def transforms_user_or_password_for_service(cls: type['OSManager']) -> bool:
def is_credentials_modified_for_service(cls: type['OSManager']) -> bool:
"""
Helper method that informs if the os manager transforms the username and/or the password.
This is used from ServicePool
"""
return cls.process_user_password != OSManager.process_user_password
return cls.update_credentials != OSManager.update_credentials
def process_user_password(
def update_credentials(
self,
userservice: 'UserService', # pylint: disable=unused-argument
username: str,

View File

@ -812,7 +812,10 @@ class gui:
super()._set_value(gui.as_str(value))
def as_str(self):
"""Returns the password as string (stripped)"""
return gui.as_str(self.value).strip()
as_clean_str = as_str # Alias in facet, for coherence with other string fields
def __str__(self):
return '********'

View File

@ -33,7 +33,7 @@ import logging
import typing
import collections.abc
from django.utils.translation import gettext as _
from django.utils.translation import gettext_lazy as _
from django.db.models import Q
from cryptography.x509 import load_pem_x509_certificate
@ -214,7 +214,7 @@ def verify_ssl_field(
order: int = 92,
tab: 'types.ui.Tab|str|None|bool' = None,
old_field_name: typing.Optional[str] = None,
) -> ui.gui.CheckBoxField:
) -> ui.gui.CheckBoxField:
return ui.gui.CheckBoxField(
label=_('Verify SSL'),
default=default,
@ -362,3 +362,28 @@ def mfa_attr_field(order: int = 20, tab: 'types.ui.Tab|str|None|bool' = None) ->
tab=None if tab is False else None if tab is None else types.ui.Tab.MFA,
old_field_name='mfaAttr',
)
def on_logout_field(order: int = 10, tab: 'types.ui.Tab|str|None|bool' = False) -> ui.gui.ChoiceField:
return ui.gui.ChoiceField(
label=_('Logout Action'),
order=10,
readonly=True,
tooltip=_('What to do when user logs out from service'),
choices=[
ui.gui.choice_item('keep', _('Keep service assigned')),
ui.gui.choice_item('remove', _('Remove service')),
ui.gui.choice_item('keep-always', _('Keep service assigned even on new publication')),
],
tab=None if tab is False else None if tab is None else types.ui.Tab.ADVANCED,
default='keep',
)
def onlogout_field_is_persistent(fld: ui.gui.ChoiceField) -> bool:
return fld.value == 'keep-always'
def onlogout_field_is_removable(fld: ui.gui.ChoiceField) -> bool:
return fld.value == 'remove'
def onlogout_field_is_keep(fld: ui.gui.ChoiceField) -> bool:
return fld.value == 'keep'

View File

@ -183,7 +183,7 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
def transforms_user_or_password_for_service(self) -> bool:
if self.osmanager:
return self.osmanager.get_type().transforms_user_or_password_for_service()
return self.osmanager.get_type().is_credentials_modified_for_service()
return False
def process_user_password(self, username: str, password: str) -> tuple[str, str]:

View File

@ -402,7 +402,7 @@ class UserService(UUIDModel, properties.PropertiesMixin):
if serviceInstance.needs_manager is False or not servicePool.osmanager:
return (username, password)
return servicePool.osmanager.get_instance().process_user_password(self, username, password)
return servicePool.osmanager.get_instance().update_credentials(self, username, password)
def set_state(self, state: str) -> None:
"""
@ -615,13 +615,14 @@ class UserService(UUIDModel, properties.PropertiesMixin):
def actor_version(self, version: str) -> None:
self.properties['actor_version'] = version
def check_publication_validity(self) -> bool:
def is_publication_valid(self) -> bool:
"""
Returns True if this user service does not needs an publication, or if this deployed service publication is the current one
"""
return (
self.deployed_service.service and self.deployed_service.service.get_type().publication_type is None
) or self.publication == self.deployed_service.active_publication()
self.deployed_service.service.get_type().publication_type is None
or self.publication == self.deployed_service.active_publication()
)
# Utility for logging
def log(self, message: str, level: log.LogLevel = log.LogLevel.INFO) -> None:

View File

@ -40,7 +40,7 @@ from django.utils.translation import gettext_noop as _
from uds.core import osmanagers, types, consts
from uds.core.types.services import ServiceType as serviceTypes
from uds.core.ui import gui
from uds.core.util import log
from uds.core.util import fields, log
from uds.core.types.states import State
from uds.core.workers import initialize
@ -60,18 +60,7 @@ class LinuxOsManager(osmanagers.OSManager):
servicesType = serviceTypes.VDI
on_logout = gui.ChoiceField(
label=_('Logout Action'),
order=10,
readonly=True,
tooltip=_('What to do when user logs out from service'),
choices=[
gui.choice_item('keep', gettext_lazy('Keep service assigned')),
gui.choice_item('remove', gettext_lazy('Remove service')),
gui.choice_item('keep-always', gettext_lazy('Keep service assigned even on new publication')),
],
default='keep',
)
on_logout = fields.on_logout_field()
idle = gui.NumericField(
label=_("Max.Idle time"),
@ -94,7 +83,7 @@ class LinuxOsManager(osmanagers.OSManager):
)
def _flag_processes_unused_machines(self) -> None:
self.handles_unused_userservices = self.on_logout.value == 'remove'
self.handles_unused_userservices = fields.onlogout_field_is_removable(self.on_logout)
def initialize(self, values: 'Module.ValuesType') -> None:
self._flag_processes_unused_machines()
@ -105,13 +94,14 @@ class LinuxOsManager(osmanagers.OSManager):
def ignore_deadline(self) -> bool:
return not self.deadline.as_bool()
def is_removable_on_logout(self, userservice: 'UserService') -> bool:
'''
Says if a machine is removable on logout
'''
"""
if a machine is removable on logout
"""
if not userservice.in_use:
if (self.on_logout.as_str() == 'remove') or (
not userservice.check_publication_validity() and self.on_logout.as_str() == 'keep'
if fields.onlogout_field_is_removable(self.on_logout) or(
not userservice.is_publication_valid() and fields.onlogout_field_is_keep(self.on_logout)
):
return True
@ -154,17 +144,14 @@ class LinuxOsManager(osmanagers.OSManager):
userservice.remove()
def is_persistent(self) -> bool:
return self.on_logout.as_str() == 'keep-always'
return fields.onlogout_field_is_persistent(self.on_logout)
def check_state(self, userService: 'UserService') -> str:
logger.debug('Checking state for service %s', userService)
return State.RUNNING
def max_idle(self) -> typing.Optional[int]:
"""
On production environments, will return no idle for non removable machines
"""
if self.idle.as_int() <= 0: # or (settings.DEBUG is False and self._on_logout != 'remove'):
if self.idle.as_int() <= 0:
return None
return self.idle.as_int()

View File

@ -78,7 +78,7 @@ class LinuxRandomPassManager(LinuxOsManager):
if values['user_account'] == '':
raise exceptions.ui.ValidationError(_('Must provide an user account!!!'))
def process_user_password(
def update_credentials(
self, userservice: 'UserService', username: str, password: str
) -> tuple[str, str]:
if username == self.user_account.as_str():

View File

@ -97,7 +97,7 @@ class TestOSManager(osmanagers.OSManager):
'''
if not userService.in_use:
if (self.on_logout.value == 'remove') or (
not userService.check_publication_validity() and self.on_logout.value == 'keep'
not userService.is_publication_valid() and self.on_logout.value == 'keep'
):
return True

View File

@ -19,30 +19,18 @@ from django.utils.translation import gettext_noop as _
from uds.core import exceptions, osmanagers, types, consts
from uds.core.types.services import ServiceType as serviceTypes
from uds.core.ui import gui
from uds.core.util import log
from uds.core.util import log, fields
from uds.core.types.states import State
from uds.models import TicketStore
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from uds.core.module import Module
from uds.models import UserService
logger = logging.getLogger(__name__)
def scrambleMsg(msg: str) -> str:
"""
Simple scrambler so password are not seen at source page
"""
data = msg.encode('utf8')
res = b''
n = 0x32
for c in data[::-1]:
res += bytes([c ^ n])
n = (n + c) & 0xFF
return codecs.encode(res, 'hex').decode()
class WindowsOsManager(osmanagers.OSManager):
type_name = _('Windows Basic OS Manager')
type_type = 'WindowsManager'
@ -50,21 +38,7 @@ class WindowsOsManager(osmanagers.OSManager):
icon_file = 'wosmanager.png'
servicesType = serviceTypes.VDI
on_logout = gui.ChoiceField(
label=_('Logout Action'),
order=10,
readonly=True,
tooltip=_('What to do when user logs out from service'),
choices=[
{'id': 'keep', 'text': typing.cast(str, gettext_lazy('Keep service assigned'))},
{'id': 'remove', 'text': typing.cast(str, gettext_lazy('Remove service'))},
{
'id': 'keep-always',
'text': typing.cast(str, gettext_lazy('Keep service assigned even on new publication')),
},
],
default='keep',
)
on_logout = fields.on_logout_field()
idle = gui.NumericField(
label=_("Max.Idle time"),
@ -81,91 +55,57 @@ class WindowsOsManager(osmanagers.OSManager):
deadline = gui.CheckBoxField(
label=_('Calendar logout'),
order=90,
tooltip=_(
'If checked, UDS will try to logout user when the calendar for his current access expires'
),
tooltip=_('If checked, UDS will try to logout user when the calendar for his current access expires'),
tab=types.ui.Tab.ADVANCED,
default=True,
)
_on_logout: str
_idle: int
_deadline: bool
def _flag_processes_unused_machines(self):
self.handles_unused_userservices = fields.onlogout_field_is_removable(self.on_logout)
@staticmethod
def validateLen(length):
try:
length = int(length)
except Exception:
raise exceptions.ui.ValidationError(
_('Length must be numeric!!')
) from None
if length > 6 or length < 1:
raise exceptions.ui.ValidationError(
_('Length must be betwen 1 and 6')
)
return length
def validate(self, values: 'Module.ValuesType') -> None:
self._flag_processes_unused_machines()
def _set_handles_unused(self):
self.handles_unused_userservices = self._on_logout == 'remove'
def __init__(self, environment, values):
super().__init__(environment, values)
if values is not None:
self._on_logout = values['on_logout']
self._idle = int(values['idle'])
self._deadline = gui.as_bool(values['deadline'])
else:
self._on_logout = ''
self._idle = -1
self._deadline = True
self._set_handles_unused()
def is_removable_on_logout(self, userService: 'UserService') -> bool:
def is_removable_on_logout(self, userservice: 'UserService') -> bool:
"""
Says if a machine is removable on logout
"""
if not userService.in_use:
if (self._on_logout == 'remove') or (
not userService.check_publication_validity() and self._on_logout == 'keep'
if not userservice.in_use:
if fields.onlogout_field_is_removable(self.on_logout) or(
not userservice.is_publication_valid() and fields.onlogout_field_is_keep(self.on_logout)
):
return True
return False
def release(self, userService: 'UserService') -> None:
def release(self, userservice: 'UserService') -> None:
pass
def ignore_deadline(self) -> bool:
return not self._deadline
return not self.deadline.as_bool()
def get_name(self, userService: 'UserService') -> str:
return userService.get_name()
def get_name(self, userservice: 'UserService') -> str:
return userservice.get_name()
def do_log(self, userService: 'UserService', data: str, origin=log.LogSource.OSMANAGER):
def do_log(self, userservice: 'UserService', data: str, origin=log.LogSource.OSMANAGER):
# Stores a log associated with this service
try:
msg, levelStr = data.split('\t')
msg, level_str = data.split('\t')
try:
level = log.LogLevel.from_str(levelStr)
level = log.LogLevel.from_str(level_str)
except Exception:
logger.debug('Do not understand level %s', levelStr)
logger.debug('Do not understand level %s', level_str)
level = log.LogLevel.INFO
log.log(userService, level, msg, origin)
log.log(userservice, level, msg, origin)
except Exception:
logger.exception('WindowsOs Manager message log: ')
log.log(
userService, log.LogLevel.ERROR, f'do not understand {data}', origin
)
log.log(userservice, log.LogLevel.ERROR, f'do not understand {data}', origin)
def actor_data(
self, userService: 'UserService'
) -> collections.abc.MutableMapping[str, typing.Any]:
return {'action': 'rename', 'name': userService.get_name()} # No custom data
def actor_data(self, userservice: 'UserService') -> collections.abc.MutableMapping[str, typing.Any]:
return {'action': 'rename', 'name': userservice.get_name()} # No custom data
def process_user_password(
def update_credentials(
self, userService: 'UserService', username: str, password: str
) -> tuple[str, str]:
if userService.properties.get('sso_available') == '1':
@ -177,14 +117,10 @@ class WindowsOsManager(osmanagers.OSManager):
username, domain = username.split('\\')
creds = {'username': username, 'password': password, 'domain': domain}
ticket = TicketStore.create(
creds,validity=300
) # , owner=SECURE_OWNER, secure=True)
ticket = TicketStore.create(creds, validity=300) # , owner=SECURE_OWNER, secure=True)
return ticket, ''
return osmanagers.OSManager.process_user_password(
self, userService, username, password
)
return super().update_credentials(userService, username, password)
def handle_unused(self, userservice: 'UserService') -> None:
"""
@ -201,57 +137,37 @@ class WindowsOsManager(osmanagers.OSManager):
userservice.remove()
def is_persistent(self):
return self._on_logout == 'keep-always'
return fields.onlogout_field_is_persistent(self.on_logout)
def check_state(self, userservice: 'UserService') -> str:
# will alway return true, because the check is done by an actor callback
logger.debug('Checking state for service %s', userservice)
return State.RUNNING
def max_idle(self):
"""
On production environments, will return no idle for non removable machines
"""
if (
self._idle <= 0
): # or (settings.DEBUG is False and self._on_logout != 'remove'):
def max_idle(self) -> typing.Optional[int]:
if self.idle.as_int() <= 0:
return None
return self._idle
def marshal(self) -> bytes:
"""
Serializes the os manager data so we can store it in database
"""
return '\t'.join(
['v3', self._on_logout, str(self._idle), gui.bool_as_str(self._deadline)]
).encode('utf8')
return self.idle.as_int()
def unmarshal(self, data: bytes) -> None:
vals = data.decode('utf8').split('\t')
self._idle = -1
self._deadline = True
try:
if vals[0] == 'v1':
self._on_logout = vals[1]
elif vals[0] == 'v2':
self._on_logout, self._idle = vals[1], int(vals[2])
elif vals[0] == 'v3':
self._on_logout, self._idle, self._deadline = (
vals[1],
int(vals[2]),
gui.as_bool(vals[3]),
)
except Exception:
logger.exception(
'Exception unmarshalling. Some values left as default ones'
if not data.startswith(b'v'):
return super().unmarshal(data)
values = data.decode('utf8').split('\t')
self.idle.value = -1
self.deadline.value = True
if values[0] == 'v1':
self.on_logout.value = values[1]
elif values[0] == 'v2':
self.on_logout.value, self.idle.value = values[1], int(values[2])
elif values[0] == 'v3':
self.on_logout.value, self.idle.value, self.deadline.value = (
values[1],
int(values[2]),
gui.as_bool(values[3]),
)
self._set_handles_unused()
def get_fields_as_dict(self) -> 'gui.ValuesDictType':
return {
'on_logout': self._on_logout,
'idle': str(self._idle),
'deadline': gui.bool_as_str(self._deadline),
}
self._flag_processes_unused_machines()
# Flag that we need an upgrade (remarshal and save)
self.flag_for_upgrade()

View File

@ -42,8 +42,8 @@ import ldap
from django.utils.translation import gettext_noop as _
from uds.core.ui import gui
from uds.core.managers.crypto import CryptoManager
from uds.core import exceptions, consts
from uds.core.util import log
from uds.core import exceptions, consts, types
from uds.core.util import fields, log
from uds.core.util import ldaputil
from .windows import WindowsOsManager
@ -99,7 +99,7 @@ class WinDomainOsManager(WindowsOsManager):
label=_('Machine Group'),
order=7,
tooltip=_('Group to which add machines on creation. If empty, no group will be used.'),
tab=_('Advanced'),
tab=types.ui.Tab.ADVANCED,
)
remove_on_exit = gui.CheckBoxField(
label=_('Machine clean'),
@ -107,78 +107,58 @@ class WinDomainOsManager(WindowsOsManager):
tooltip=_(
'If checked, UDS will try to remove the machine from the domain USING the provided credentials'
),
tab=_('Advanced'),
tab=types.ui.Tab.ADVANCED,
default=True,
)
server_hint = gui.TextField(
length=64,
label=_('Server Hint'),
order=9,
tooltip=_('In case of several AD servers, which one is preferred (only used for group and account removal operations)'),
tab=_('Advanced'),
tooltip=_(
'In case of several AD servers, which one is preferred (only used for group and account removal operations)'
),
tab=types.ui.Tab.ADVANCED,
)
ssl = gui.CheckBoxField(
use_ssl = gui.CheckBoxField(
label=_('Use SSL'),
order=10,
tooltip=_('If checked, a ssl connection to Active Directory will be used'),
tab=_('Advanced'),
tab=types.ui.Tab.ADVANCED,
default=True,
old_field_name='ssl',
)
timeout = fields.timeout_field(order=11, default=10, tab=types.ui.Tab.ADVANCED)
# Inherits base "on_logout"
on_logout = WindowsOsManager.on_logout
idle = WindowsOsManager.idle
deadline = WindowsOsManager.deadline
_domain: str
_ou: str
_account: str
_pasword: str
_group: str
_server_hint: str
_remove_on_exit: str
_ssl: str
def __init__(self, environment: 'Environment', values: 'Module.ValuesType'):
super().__init__(environment, values)
def initialize(self, values: 'Module.ValuesType') -> None:
if values:
if values['domain'] == '':
raise exceptions.ui.ValidationError(_('Must provide a domain!'))
# if values['domain'].find('.') == -1:
# raise exceptions.ValidationException(_('Must provide domain in FQDN'))
if values['account'] == '':
raise exceptions.ui.ValidationError(_('Must provide an account to add machines to domain!'))
if values['account'].find('\\') != -1:
raise exceptions.ui.ValidationError(_('DOM\\USER form is not allowed!'))
if values['password'] == '':
raise exceptions.ui.ValidationError(_('Must provide a password for the account!'))
self._domain = values['domain']
self._ou = values['ou'].strip()
self._account = values['account']
self._password = values['password']
self._group = values['grp'].strip()
self._server_hint = values['server_hint'].strip()
self._ssl = 'y' if values['ssl'] else 'n'
self._remove_on_exit = 'y' if values['remove_on_exit'] else 'n'
else:
self._domain = ''
self._ou = ''
self._account = ''
self._password = '' # nosec: no encoded password
self._group = ''
self._server_hint = ''
self._remove_on_exit = 'n'
self._ssl = 'n'
# Some cleaning of input data (remove spaces, etc..)
for fld in (self.domain, self.account, self.ou, self.grp, self.server_hint):
fld.value = fld.as_clean_str().replace(' ', '')
# self._ou = self._ou.replace(' ', ''), do not remove spaces
if self._domain != '' and self._ou != '':
lpath = 'dc=' + ',dc='.join((s.lower() for s in self._domain.split('.')))
if self._ou.lower().find(lpath) == -1:
self._ou += ',' + lpath
if self.domain.as_str() == '':
raise exceptions.ui.ValidationError(_('Must provide a domain!'))
if self.account.as_str() == '':
raise exceptions.ui.ValidationError(_('Must provide an account to add machines to domain!'))
if self.account.as_str().find('\\') != -1:
raise exceptions.ui.ValidationError(_('DOM\\USER form is not allowed! for account'))
if self.password.as_str() == '':
raise exceptions.ui.ValidationError(_('Must provide a password for the account!'))
# Fix ou based on domain if needed
if self.domain.as_str() and self.ou.as_str():
lpath = 'dc=' + ',dc='.join((s.lower() for s in self.domain.as_str().split('.')))
if lpath not in self.ou.as_str().lower(): # If not in ou, add it
self.ou.value = self.ou.as_str() + ',' + lpath
def _get_server_list(self) -> collections.abc.Iterable[tuple[str, int]]:
if self._server_hint != '':
yield (self._server_hint, 389)
if self.server_hint.as_str() != '':
yield (self.server_hint.as_str(), 389)
server: typing.Any
@ -187,7 +167,7 @@ class WinDomainOsManager(WindowsOsManager):
for server in reversed(
sorted(
iter(dns.resolver.resolve('_ldap._tcp.' + self._domain, 'SRV')),
iter(dns.resolver.resolve('_ldap._tcp.' + self.domain.as_str(), 'SRV')),
key=key,
)
):
@ -205,33 +185,33 @@ class WinDomainOsManager(WindowsOsManager):
if servers is None:
servers = self._get_server_list()
account = self._account
account = self.account.as_str()
if account.find('@') == -1:
account += '@' + self._domain
account += '@' + self.domain.as_str()
_str = "No servers found"
_error_string = "No servers found"
# And if not possible, try using NON-SSL
for server in servers:
ssl = self._ssl == 'y'
ssl = self.use_ssl.as_bool() == 'y'
port = server[1] if not ssl else -1
try:
return ldaputil.connection(
account,
self._password,
self.account.as_str(),
server[0],
port=port,
ssl=ssl,
timeout=10,
timeout=self.timeout.as_int(),
debug=False,
)
except Exception as e:
_str = f'Error: {e}'
_error_string = f'Error: {e}'
raise ldaputil.LDAPError(_str)
raise ldaputil.LDAPError(_error_string)
def _get_group(self, ldapConnection: 'ldaputil.LDAPObject') -> typing.Optional[str]:
base = ','.join(['DC=' + i for i in self._domain.split('.')])
group = ldaputil.escape(self._group)
base = ','.join(['DC=' + i for i in self.domain.as_str().split('.')])
group = ldaputil.escape(self.grp.as_str())
obj: typing.Optional[collections.abc.MutableMapping[str, typing.Any]]
try:
obj = next(
@ -252,10 +232,10 @@ class WinDomainOsManager(WindowsOsManager):
return obj['dn'] # Returns the DN
def _get_machine(self, ldap_connection: 'ldaputil.LDAPObject', machine_name: str) -> typing.Optional[str]:
# if self._ou:
# base = self._ou
# if self.ou.as_str():
# base = self.ou.as_str()
# else:
base = ','.join(['DC=' + i for i in self._domain.split('.')])
base = ','.join(['DC=' + i for i in self.domain.as_str().split('.')])
fltr = f'(&(objectClass=computer)(sAMAccountName={ldaputil.escape(machine_name)}$))'
obj: typing.Optional[collections.abc.MutableMapping[str, typing.Any]]
@ -271,10 +251,10 @@ class WinDomainOsManager(WindowsOsManager):
def ready_notified(self, userservice: 'UserService') -> None:
# No group to add
if self._group == '':
if self.grp.as_str() == '':
return
if '.' not in self._domain:
if '.' not in self.domain.as_str():
logger.info('Adding to a group for a non FQDN domain is not supported')
return
@ -295,11 +275,11 @@ class WinDomainOsManager(WindowsOsManager):
error = None
break
except dns.resolver.NXDOMAIN: # No domain found, log it and pass
logger.warning('Could not find _ldap._tcp.%s', self._domain)
logger.warning('Could not find _ldap._tcp.%s', self.domain.as_str())
log.log(
userservice,
log.LogLevel.WARNING,
f'Could not remove machine from domain (_ldap._tcp.{self._domain} not found)',
f'Could not remove machine from domain (_ldap._tcp.{self.domain.as_str()} not found)',
log.LogSource.OSMANAGER,
)
except ldap.ALREADY_EXISTS: # type: ignore # (valid)
@ -308,9 +288,9 @@ class WinDomainOsManager(WindowsOsManager):
break
except ldaputil.LDAPError:
logger.exception('Ldap Exception caught')
error = f'Could not add machine (invalid credentials? for {self._account})'
error = f'Could not add machine (invalid credentials? for {self.account.as_str()})'
except Exception as e:
error = f'Could not add machine {userservice.friendly_name} to group {self._group}: {e}'
error = f'Could not add machine {userservice.friendly_name} to group {self.grp.as_str()}: {e}'
# logger.exception('Ldap Exception caught')
if error:
@ -321,10 +301,10 @@ class WinDomainOsManager(WindowsOsManager):
super().release(userservice)
# If no removal requested, just return
if self._remove_on_exit != 'y':
if self.remove_on_exit.as_bool() is False:
return
if '.' not in self._domain:
if '.' not in self.domain.as_str():
# logger.info('Releasing from a not FQDN domain is not supported')
log.log(
userservice,
@ -337,11 +317,11 @@ class WinDomainOsManager(WindowsOsManager):
try:
ldap_connection = self._connect_ldap()
except dns.resolver.NXDOMAIN: # No domain found, log it and pass
logger.warning('Could not find _ldap._tcp.%s', self._domain)
logger.warning('Could not find _ldap._tcp.%s', self.domain.as_str())
log.log(
userservice,
log.LogLevel.WARNING,
f'Could not remove machine from domain (_ldap._tcp.{self._domain} not found)',
f'Could not remove machine from domain (_ldap._tcp.{self.domain.as_str()} not found)',
log.LogSource.OSMANAGER,
)
return
@ -370,7 +350,7 @@ class WinDomainOsManager(WindowsOsManager):
raise Exception(f'Machine {userservice.friendly_name} not found on AD (permissions?)')
ldaputil.recursive_delete(ldap_connection, res)
except IndexError:
logger.error('Error deleting %s from BASE %s', userservice.friendly_name, self._ou)
logger.error('Error deleting %s from BASE %s', userservice.friendly_name, self.ou.as_str())
except Exception:
logger.exception('Deleting from AD: ')
@ -381,21 +361,23 @@ class WinDomainOsManager(WindowsOsManager):
return _('Check error: {}').format(e)
except dns.resolver.NXDOMAIN:
return _('Could not find server parameters (_ldap._tcp.{0} can\'t be resolved)').format(
self._domain
self.domain.as_str()
)
except Exception as e:
logger.exception('Exception ')
return str(e)
try:
ldap_connection.search_st(self._ou, ldap.SCOPE_BASE) # type: ignore # (valid)
ldap_connection.search_st(self.ou.as_str(), ldap.SCOPE_BASE) # type: ignore # (valid)
except ldaputil.LDAPError as e:
return _('Check error: {}').format(e)
# Group
if self._group != '':
if self.grp.as_str() != '':
if self._get_group(ldap_connection) is None:
return _('Check Error: group "{}" not found (using "cn" to locate it)').format(self._group)
return _('Check Error: group "{}" not found (using "cn" to locate it)').format(
self.grp.as_str()
)
return _('Server check was successful')
@ -411,25 +393,25 @@ class WinDomainOsManager(WindowsOsManager):
except ldaputil.LDAPError as e:
return [False, _('Could not access AD using LDAP ({0})').format(e)]
ou = wd._ou
ou = wd.ou.as_str()
if ou == '':
ou = 'cn=Computers,dc=' + ',dc='.join(wd._domain.split('.'))
ou = 'cn=Computers,dc=' + ',dc='.join(wd.domain.as_str().split('.'))
logger.info('Checking %s with ou %s', wd._domain, ou)
logger.info('Checking %s with ou %s', wd.domain.as_str(), ou)
r = ldap_connection.search_st(ou, ldap.SCOPE_BASE) # type: ignore # (valid)
logger.info('Result of search: %s', r)
except ldaputil.LDAPError:
if wd and not wd._ou:
if wd and not wd.ou.as_str():
return [
False,
_('The default path {0} for computers was not found!!!').format(wd._ou),
_('The default path {0} for computers was not found!!!').format(wd.ou.as_str()),
]
return [False, _('The ou path {0} was not found!!!').format(wd._ou)]
return [False, _('The ou path {0} was not found!!!').format(wd.ou.as_str())]
except dns.resolver.NXDOMAIN:
return [
True,
_('Could not check parameters (_ldap._tcp.{0} can\'r be resolved)').format(wd._domain),
_('Could not check parameters (_ldap._tcp.{0} can\'r be resolved)').format(wd.domain.as_str()),
]
except Exception as e:
logger.exception('Exception ')
@ -441,76 +423,47 @@ class WinDomainOsManager(WindowsOsManager):
return {
'action': 'rename_ad',
'name': userservice.get_name(),
# Repeat data, to keep compat with old versions of Actor
# Will be removed in a couple of versions
'ad': self._domain,
'ou': self._ou,
'username': self._account,
'password': self._password,
'ad': self.domain.as_str(),
'ou': self.ou.as_str(),
'username': self.account.as_str(),
'password': self.account.as_str(),
'custom': {
'domain': self._domain,
'ou': self._ou,
'username': self._account,
'password': self._password,
'domain': self.domain.as_str(),
'ou': self.ou.as_str(),
'username': self.account.as_str(),
'password': self.account.as_str(),
},
}
def marshal(self) -> bytes:
"""
Serializes the os manager data so we can store it in database
"""
base = codecs.encode(super().marshal(), 'hex').decode()
return '\t'.join(
[
'v4',
self._domain,
self._ou,
self._account,
CryptoManager().encrypt(self._password),
base,
self._group,
self._server_hint,
self._ssl,
self._remove_on_exit,
]
).encode('utf8')
def unmarshal(self, data: bytes) -> None:
if not data.startswith(b'v'):
return super().unmarshal(data)
values = data.decode('utf8').split('\t')
if values[0] in ('v1', 'v2', 'v3', 'v4'):
self._domain = values[1]
self._ou = values[2]
self._account = values[3]
self._password = CryptoManager().decrypt(values[4])
self.domain.value = values[1]
self.ou.value = values[2]
self.account.value = values[3]
self.password.value = CryptoManager().decrypt(values[4])
if values[0] in ('v2', 'v3', 'v4'):
self._group = values[6]
self.grp.value = values[6]
else:
self._group = ''
self.grp.value = ''
if values[0] in ('v3', 'v4'):
self._server_hint = values[7]
self.server_hint.value = values[7]
else:
self._server_hint = ''
self.server_hint.value = ''
if values[0] == 'v4':
self._ssl = values[8]
self._remove_on_exit = values[9]
self.use_ssl.value = values[8] == 'y'
self.remove_on_exit.value = values[9] == 'y'
else:
self._ssl = 'n'
self._remove_on_exit = 'y'
self.use_ssl.value = False
self.remove_on_exit.value = True
super().unmarshal(codecs.decode(values[5].encode(), 'hex'))
def get_fields_as_dict(self) -> gui.ValuesDictType:
dct = super().get_fields_as_dict()
dct['domain'] = self._domain
dct['ou'] = self._ou
dct['account'] = self._account
dct['password'] = self._password
dct['grp'] = self._group
dct['server_hint'] = self._server_hint
dct['ssl'] = self._ssl == 'y'
dct['remove_on_exit'] = self._remove_on_exit == 'y'
return dct
self.flag_for_upgrade() # Force upgrade to new format

View File

@ -39,6 +39,7 @@ import typing
import collections.abc
from django.utils.translation import gettext_noop as _
from uds.core.module import Module
from uds.core.ui import gui
from uds.core.managers.crypto import CryptoManager
from uds.core import exceptions
@ -83,32 +84,23 @@ class WinRandomPassManager(WindowsOsManager):
idle = WindowsOsManager.idle
dead_line = WindowsOsManager.deadline
_user_account: str
_password: str
def __init__(self, environment: 'Environment', values: 'Module.ValuesType'):
super().__init__(environment, values)
def validate(self, values: 'Module.ValuesType') -> None:
if values:
if values['userAccount'] == '':
self.user_account.value = self.user_account.as_clean_str()
if self.user_account.as_str() == '':
raise exceptions.ui.ValidationError(_('Must provide an user account!!!'))
if values['password'] == '':
if self.password.as_str() == '':
raise exceptions.ui.ValidationError(_('Must provide a password for the account!!!'))
self._user_account = values['userAccount']
self._password = values['password']
else:
self._user_account = ''
self._password = '' # nosec: not a password (empty)
def process_user_password(
self, userService: 'UserService', username: str, password: str
) -> tuple[str, str]:
if username == self._user_account:
password = userService.recover_value('winOsRandomPass')
def update_credentials(self, userservice: 'UserService', username: str, password: str) -> tuple[str, str]:
if username == self.user_account.as_clean_str():
password = userservice.recover_value('winOsRandomPass')
return WindowsOsManager.process_user_password(self, userService, username, password)
return WindowsOsManager.update_credentials(self, userservice, username, password)
def gen_random_password(self, userService: 'UserService'):
randomPass = userService.recover_value('winOsRandomPass')
def gen_random_password(self, userservice: 'UserService'):
randomPass = userservice.recover_value('winOsRandomPass')
if not randomPass:
# Generates a password that conforms to complexity
rnd = random.SystemRandom()
@ -118,49 +110,39 @@ class WinRandomPassManager(WindowsOsManager):
randomPass = ''.join(rnd.choice(string.ascii_letters + string.digits) for _ in range(12))
pos = rnd.randrange(0, len(randomPass))
randomPass = randomPass[:pos] + base + randomPass[pos:]
userService.store_value('winOsRandomPass', randomPass)
userservice.store_value('winOsRandomPass', randomPass)
log.log(
userService,
userservice,
log.LogLevel.INFO,
f'Password set to "{randomPass}"',
log.LogSource.OSMANAGER,
)
return randomPass
def actor_data(self, userService: 'UserService') -> collections.abc.MutableMapping[str, typing.Any]:
def actor_data(self, userservice: 'UserService') -> collections.abc.MutableMapping[str, typing.Any]:
return {
'action': 'rename',
'name': userService.get_name(),
# Repeat data, to keep compat with old versions of Actor
# Will be removed in a couple of versions
'username': self._user_account,
'password': self._password,
'new_password': self.gen_random_password(userService),
'name': userservice.get_name(),
# Repeat data, to keep compat with old versions of Actor (the part outside "custom")
# Will be removed in a couple of versions (maybe 6.0? :D), maybe before (But not before 5.0)
'username': self.user_account.as_clean_str(),
'password': self.password.as_str(),
'new_password': self.gen_random_password(userservice),
'custom': {
'username': self._user_account,
'password': self._password,
'new_password': self.gen_random_password(userService),
'username': self.user_account.as_clean_str(),
'password': self.password.as_str(),
'new_password': self.gen_random_password(userservice),
},
}
def marshal(self) -> bytes:
'''
Serializes the os manager data so we can store it in database
'''
base = codecs.encode(super().marshal(), 'hex').decode()
return '\t'.join(['v1', self._user_account, CryptoManager().encrypt(self._password), base]).encode(
'utf8'
)
def unmarshal(self, data: bytes) -> None:
if not data.startswith(b'v'):
return super().unmarshal(data)
values = data.decode('utf8').split('\t')
if values[0] == 'v1':
self._user_account = values[1]
self._password = CryptoManager().decrypt(values[2])
self.user_account.value = values[1]
self.password.value = CryptoManager().decrypt(values[2])
super().unmarshal(codecs.decode(values[3].encode(), 'hex'))
def get_fields_as_dict(self) -> gui.ValuesDictType:
dic = super().get_fields_as_dict()
dic['user_account'] = self._user_account
dic['password'] = self._password
return dic
self.flag_for_upgrade() # Force upgrade to new format

View File

@ -52,7 +52,7 @@ USE_SSL: typing.Final[bool] = True
AUTOMATIC_ID_MAPPING: typing.Final[bool] = True
class LinuxAdOsManagerTest(UDSTestCase):
class LinuxAdOsManagerSerialTest(UDSTestCase):
def test_marshaling(self) -> None:
instance = osmanager.LinuxOsADManager(environment=Environment.get_temporary_environment())
instance.domain.value = DOMAIN

View File

@ -60,7 +60,7 @@ SERIALIZED_OSMANAGER_DATA: typing.Final[typing.Mapping[str, bytes]] = {
}
class LinuxOsManagerTest(UDSTestCase):
class LinuxOsManagerSerialTest(UDSTestCase):
def check(self, version: str, instance: 'osmanager.LinuxRandomPassManager') -> None:
self.assertEqual(instance.user_account.value, 'prueba')
self.assertEqual(instance.on_logout.value, 'keep')

View File

@ -0,0 +1,132 @@
# pylint: disable=no-member # ldap module gives errors to pylint
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
Author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import codecs
import typing
from tests.utils.test import UDSTestCase
from uds.core.environment import Environment
from uds.core.managers.crypto import CryptoManager
from django.conf import settings
from uds.osmanagers.WindowsOsManager import windows_domain as osmanager
PASSWD: typing.Final[str] = 'PASSWD'
CRYPTED_PASSWD: typing.Final[str] = CryptoManager().encrypt(PASSWD)
# values = data.decode('utf8').split('\t')
# if values[0] in ('v1', 'v2', 'v3', 'v4'):
# self.domain.value = values[1]
# self.ou.value = values[2]
# self.account.value = values[3]
# self.password.value = CryptoManager().decrypt(values[4])
# if values[0] in ('v2', 'v3', 'v4'):
# self.grp.value = values[6]
# else:
# self.grp.value = ''
# if values[0] in ('v3', 'v4'):
# self.server_hint.value = values[7]
# else:
# self.server_hint.value = ''
# if values[0] == 'v4':
# self.use_ssl.value = values[8] == 'y'
# self.remove_on_exit.value = values[9] == 'y'
# else:
# self.use_ssl.value = False
# self.remove_on_exit.value = True
# super().unmarshal(codecs.decode(values[5].encode(), 'hex'))
# self.flag_for_upgrade() # Force upgrade to new format
SERIALIZED_OSMANAGER_DATA: typing.Final[typing.Mapping[str, bytes]] = {
'v1': b'v1\tDOMAIN\tOU\tACCOUNT\t' + CRYPTED_PASSWD.encode() + b'\t' + codecs.encode(b'v3\tkeep\t30\ttrue', 'hex'),
'v2': b'v2\tDOMAIN\tOU\tACCOUNT\t' + CRYPTED_PASSWD.encode() + b'\t' + codecs.encode(b'v3\tkeep\t30\ttrue', 'hex') + b'\tGRP',
'v3': b'v3\tDOMAIN\tOU\tACCOUNT\t' + CRYPTED_PASSWD.encode() + b'\t' + codecs.encode(b'v3\tkeep\t30\ttrue', 'hex') + b'\tGRP\tSERVER_HINT',
'v4': b'v4\tDOMAIN\tOU\tACCOUNT\t' + CRYPTED_PASSWD.encode() + b'\t' + codecs.encode(b'v3\tkeep\t30\ttrue', 'hex') + b'\tGRP\tSERVER_HINT\ty\ty',
}
class WindowsOsManagerSerialTest(UDSTestCase):
def check(self, version: str, instance: 'osmanager.WinDomainOsManager') -> None:
self.assertEqual(instance.on_logout.value, 'keep')
self.assertEqual(instance.idle.value, 30)
self.assertEqual(instance.deadline.value, True)
self.assertEqual(instance.domain.value, 'DOMAIN')
self.assertEqual(instance.ou.value, 'OU')
self.assertEqual(instance.account.value, 'ACCOUNT')
self.assertEqual(instance.password.value, PASSWD)
if version in ('v2', 'v3', 'v4'):
self.assertEqual(instance.grp.value, 'GRP')
if version in ('v3', 'v4'):
self.assertEqual(instance.server_hint.value, 'SERVER_HINT')
if version == 'v4':
self.assertEqual(instance.use_ssl.value, True)
self.assertEqual(instance.remove_on_exit.value, True)
def test_unmarshall_all_versions(self) -> None:
for v in range(1, len(SERIALIZED_OSMANAGER_DATA) + 1):
instance = osmanager.WinDomainOsManager(environment=Environment.get_temporary_environment())
instance.unmarshal(SERIALIZED_OSMANAGER_DATA['v{}'.format(v)])
self.check(f'v{v}', instance)
def test_marshaling(self) -> None:
# Unmarshall last version, remarshall and check that is marshalled using new marshalling format
LAST_VERSION = 'v{}'.format(len(SERIALIZED_OSMANAGER_DATA))
instance = osmanager.WinDomainOsManager(
environment=Environment.get_temporary_environment()
)
instance.unmarshal(SERIALIZED_OSMANAGER_DATA[LAST_VERSION])
marshaled_data = instance.marshal()
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.flag_for_upgrade(False) # reset flag
# Ensure fields has been marshalled using new format
self.assertFalse(marshaled_data.startswith(b'v'))
# Reunmarshall again and check that remarshalled flag is not set
instance = osmanager.WinDomainOsManager(
environment=Environment.get_temporary_environment()
)
instance.unmarshal(marshaled_data)
self.assertFalse(instance.needs_upgrade())
# Check that data is correct
self.check(LAST_VERSION, instance)

View File

@ -0,0 +1,106 @@
# pylint: disable=no-member # ldap module gives errors to pylint
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
Author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import typing
from tests.utils.test import UDSTestCase
from uds.core.environment import Environment
from django.conf import settings
from uds.osmanagers.WindowsOsManager import windows as osmanager
PASSWD: typing.Final[str] = 'PASSWD'
# values = data.decode('utf8').split('\t')
# self.idle.value = -1
# self.deadline.value = True
# if values[0] == 'v1':
# self.on_logout.value = values[1]
# elif values[0] == 'v2':
# self.on_logout.value, self.idle.value = values[1], int(values[2])
# elif values[0] == 'v3':
# self.on_logout.value, self.idle.value, self.deadline.value = (
# values[1],
# int(values[2]),
# gui.as_bool(values[3]),
# )
SERIALIZED_OSMANAGER_DATA: typing.Final[typing.Mapping[str, bytes]] = {
'v1': b'v1\tkeep',
'v2': b'v2\tkeep\t999',
'v3': b'v3\tkeep\t999\tFALSE',
}
class WindowsOsManagerSerialTest(UDSTestCase):
def check(self, version: str, instance: 'osmanager.WindowsOsManager') -> None:
self.assertEqual(instance.on_logout.value, 'keep')
if version == 'v1':
self.assertEqual(instance.idle.value, -1)
self.assertEqual(instance.deadline.value, True)
elif version == 'v2':
self.assertEqual(instance.idle.value, 999)
self.assertEqual(instance.deadline.value, True)
elif version == 'v3':
self.assertEqual(instance.idle.value, 999)
self.assertEqual(instance.deadline.value, False)
def test_unmarshall_all_versions(self) -> None:
for v in range(1, len(SERIALIZED_OSMANAGER_DATA) + 1):
instance = osmanager.WindowsOsManager(environment=Environment.get_temporary_environment())
instance.unmarshal(SERIALIZED_OSMANAGER_DATA['v{}'.format(v)])
self.check(f'v{v}', instance)
def test_marshaling(self) -> None:
# Unmarshall last version, remarshall and check that is marshalled using new marshalling format
LAST_VERSION = 'v{}'.format(len(SERIALIZED_OSMANAGER_DATA))
instance = osmanager.WindowsOsManager(
environment=Environment.get_temporary_environment()
)
instance.unmarshal(SERIALIZED_OSMANAGER_DATA[LAST_VERSION])
marshaled_data = instance.marshal()
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.flag_for_upgrade(False) # reset flag
# Ensure fields has been marshalled using new format
self.assertFalse(marshaled_data.startswith(b'v'))
# Reunmarshall again and check that remarshalled flag is not set
instance = osmanager.WindowsOsManager(
environment=Environment.get_temporary_environment()
)
instance.unmarshal(marshaled_data)
self.assertFalse(instance.needs_upgrade())
# Check that data is correct
self.check(LAST_VERSION, instance)

View File

@ -0,0 +1,97 @@
# pylint: disable=no-member # ldap module gives errors to pylint
#
# Copyright (c) 2024 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
Author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import codecs
import typing
from tests.utils.test import UDSTestCase
from uds.core.environment import Environment
from uds.core.managers.crypto import CryptoManager
from django.conf import settings
from uds.osmanagers.WindowsOsManager import windows_random as osmanager
PASSWD: typing.Final[str] = 'PASSWD'
CRYPTED_PASSWD: typing.Final[str] = CryptoManager().encrypt(PASSWD)
# if data.startswith(b'v'):
# return super().unmarshal(data)
# values = data.decode('utf8').split('\t')
# if values[0] == 'v1':
# self._user_account = values[1]
# self._password = CryptoManager().decrypt(values[2])
# super().unmarshal(codecs.decode(values[3].encode(), 'hex'))
# self.flag_for_upgrade() # Force upgrade to new format
SERIALIZED_OSMANAGER_DATA: typing.Final[typing.Mapping[str, bytes]] = {
'v1': b'v1\tUSER_ACCOUNT\t' + CRYPTED_PASSWD.encode() + b'\t' + codecs.encode(b'v3\tkeep\t30\ttrue', 'hex'),
}
class WindowsOsManagerSerialTest(UDSTestCase):
def check(self, version: str, instance: 'osmanager.WinRandomPassManager') -> None:
self.assertEqual(instance.on_logout.value, 'keep')
self.assertEqual(instance.idle.value, 30)
self.assertEqual(instance.deadline.value, True)
self.assertEqual(instance.user_account.value, 'USER_ACCOUNT')
self.assertEqual(instance.password.value, PASSWD)
def test_unmarshall_all_versions(self) -> None:
for v in range(1, len(SERIALIZED_OSMANAGER_DATA) + 1):
instance = osmanager.WinRandomPassManager(environment=Environment.get_temporary_environment())
instance.unmarshal(SERIALIZED_OSMANAGER_DATA['v{}'.format(v)])
self.check(f'v{v}', instance)
def test_marshaling(self) -> None:
# Unmarshall last version, remarshall and check that is marshalled using new marshalling format
LAST_VERSION = 'v{}'.format(len(SERIALIZED_OSMANAGER_DATA))
instance = osmanager.WinRandomPassManager(environment=Environment.get_temporary_environment())
instance.unmarshal(SERIALIZED_OSMANAGER_DATA[LAST_VERSION])
marshaled_data = instance.marshal()
# Ensure remarshalled flag is set
self.assertTrue(instance.needs_upgrade())
instance.flag_for_upgrade(False) # reset flag
# Ensure fields has been marshalled using new format
self.assertFalse(marshaled_data.startswith(b'v'))
# Reunmarshall again and check that remarshalled flag is not set
instance = osmanager.WinRandomPassManager(environment=Environment.get_temporary_environment())
instance.unmarshal(marshaled_data)
self.assertFalse(instance.needs_upgrade())
# Check that data is correct
self.check(LAST_VERSION, instance)