1
0
mirror of https://github.com/dkmstr/openuds.git synced 2024-12-22 13:34:04 +03:00
This commit is contained in:
Adolfo Gómez García 2023-05-15 20:11:00 +02:00
parent 56ad195878
commit 004ea3e7df
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
23 changed files with 132 additions and 202 deletions

View File

@ -208,7 +208,7 @@ class ServiceProvider(Module):
val = getattr(val, 'value', val)
return val is True or val == gui.TRUE
def doLog(self, level: int, message: str) -> None:
def doLog(self, level: log.LogLevel, message: str) -> None:
"""
Logs a message with requested level associated with this service
"""

View File

@ -95,7 +95,6 @@ def fernet_key(crypt_key: bytes) -> str:
Note: if password is not set, this will raise an exception
"""
# Generate an URL-Safe base64 encoded 32 bytes key for Fernet
# First, with seed + password, generate a 32 bytes key based on seed + password
return base64.b64encode(hashlib.sha256(crypt_key).digest()).decode()
# pylint: disable=unnecessary-dunder-call

View File

@ -152,7 +152,7 @@ class Command(BaseCommand):
fltr = fltr.filter(state=State.ERROR)
for item in fltr[:max_items]: # at most max_items items
logs = [
f'{l["date"]}: {log.logStrFromLevel(l["level"])} [{l["source"]}] - {l["message"]}'
f'{l["date"]}: {log.LogLevel.fromStr(l["level"])} [{l["source"]}] - {l["message"]}'
for l in log.getLogs(item)
]
userServices[item.friendly_name] = {

View File

@ -68,4 +68,3 @@ class SampleMFA(mfas.MFA):
def sendCode(self, request: 'ExtendedHttpRequest', userId: str, username: str, identifier: str, code: str) -> mfas.MFA.RESULT:
logger.debug('Sending code: %s (from %s)', code, request.ip)
return mfas.MFA.RESULT.OK

View File

@ -43,60 +43,50 @@ from .linux_osmanager import LinuxOsManager
from .linux_randompass_osmanager import LinuxRandomPassManager
downloadsManager().registerDownloadable(
'udsactor_{version}_all.deb'.format(version=VERSION),
_(
'UDS Actor for Debian, Ubuntu, ... Linux machines <b>(Requires python >= 3.6)</b>'
),
os.path.dirname(typing.cast(str, sys.modules[__package__].__file__))
+ '/files/udsactor_{version}_all.deb'.format(version=VERSION),
f'udsactor_{VERSION}_all.deb',
_('UDS Actor for Debian, Ubuntu, ... Linux machines <b>(Requires python >= 3.6)</b>'),
os.path.dirname(typing.cast(str, sys.modules[__package__].__file__)) + f'/files/udsactor_{VERSION}_all.deb',
'application/x-debian-package',
)
downloadsManager().registerDownloadable(
'udsactor-{version}-1.noarch.rpm'.format(version=VERSION),
_(
'UDS Actor for Centos, Fedora, RH, Suse, ... Linux machines <b>(Requires python >= 3.6)</b>'
),
f'udsactor-{VERSION}-1.noarch.rpm',
_('UDS Actor for Centos, Fedora, RH, Suse, ... Linux machines <b>(Requires python >= 3.6)</b>'),
os.path.dirname(typing.cast(str, sys.modules[__package__].__file__))
+ '/files/udsactor-{version}-1.noarch.rpm'.format(version=VERSION),
+ f'/files/udsactor-{VERSION}-1.noarch.rpm',
'application/x-redhat-package-manager',
)
downloadsManager().registerDownloadable(
'udsactor-unmanaged_{version}_all.deb'.format(version=VERSION),
f'udsactor-unmanaged_{VERSION}_all.deb',
_(
'UDS Actor for Debian based Linux machines. Used ONLY for static machines. <b>(Requires python >= 3.6)</b>'
),
os.path.dirname(typing.cast(str, sys.modules[__package__].__file__))
+ '/files/udsactor-unmanaged_{version}_all.deb'.format(version=VERSION),
+ f'/files/udsactor-unmanaged_{VERSION}_all.deb',
'application/x-debian-package',
)
downloadsManager().registerDownloadable(
'udsactor-unmanaged-{version}-1.noarch.rpm'.format(version=VERSION),
f'udsactor-unmanaged-{VERSION}-1.noarch.rpm',
_(
'UDS Actor for Centos, Fedora, RH, Suse, ... Linux machines. Used ONLY for static machines. <b>(Requires python >= 3.6)</b>'
),
os.path.dirname(typing.cast(str, sys.modules[__package__].__file__))
+ '/files/udsactor-unmanaged-{version}-1.noarch.rpm'.format(version=VERSION),
+ f'/files/udsactor-unmanaged-{VERSION}-1.noarch.rpm',
'application/x-redhat-package-manager',
)
downloadsManager().registerDownloadable(
'udsactor_2.2.0_legacy.deb',
_(
'<b>Legacy</b> UDS Actor for Debian, Ubuntu, ... Linux machines <b>(Requires python 2.7)</b>'
),
os.path.dirname(typing.cast(str, sys.modules[__package__].__file__))
+ '/files/udsactor_2.2.0_legacy.deb',
_('<b>Legacy</b> UDS Actor for Debian, Ubuntu, ... Linux machines <b>(Requires python 2.7)</b>'),
os.path.dirname(typing.cast(str, sys.modules[__package__].__file__)) + '/files/udsactor_2.2.0_legacy.deb',
'application/x-debian-package',
)
downloadsManager().registerDownloadable(
'udsactor-legacy-2.2.1-1.noarch.rpm',
_(
'<b>Legacy</b> UDS Actor for Centos, Fedora, RH, ... Linux machines <b>(Requires python 2.7)</b>'
),
_('<b>Legacy</b> UDS Actor for Centos, Fedora, RH, ... Linux machines <b>(Requires python 2.7)</b>'),
os.path.dirname(typing.cast(str, sys.modules[__package__].__file__))
+ '/files/udsactor-legacy-2.2.1-1.noarch.rpm',
'application/x-redhat-package-manager',
@ -104,9 +94,7 @@ downloadsManager().registerDownloadable(
downloadsManager().registerDownloadable(
'udsactor-opensuse-legacy-2.2.1-1.noarch.rpm',
_(
'<b>Legacy</b> UDS Actor for OpenSUSE, ... Linux machines <b>(Requires python 2.7)</b>'
),
_('<b>Legacy</b> UDS Actor for OpenSUSE, ... Linux machines <b>(Requires python 2.7)</b>'),
os.path.dirname(typing.cast(str, sys.modules[__package__].__file__))
+ '/files/udsactor-opensuse-legacy-2.2.1-1.noarch.rpm',
'application/x-redhat-package-manager',

View File

@ -94,11 +94,11 @@ class LinuxOsManager(osmanagers.OSManager):
defvalue=gui.TRUE,
)
def __setProcessUnusedMachines(self):
def __setProcessUnusedMachines(self) -> None:
self.processUnusedMachines = self._onLogout == 'remove'
def __init__(self, environment, values):
super(LinuxOsManager, self).__init__(environment, values)
def __init__(self, environment, values) -> None:
super().__init__(environment, values)
if values is not None:
self._onLogout = values['onLogout']
self._idle = int(values['idle'])
@ -128,31 +128,31 @@ class LinuxOsManager(osmanagers.OSManager):
return False
def getName(self, service):
def getName(self, service: 'UserService') -> str:
"""
gets name from deployed
"""
return service.getName()
def doLog(self, service, data, origin=log.LogSource.OSMANAGER):
def doLog(self, service: 'UserService', data, origin=log.LogSource.OSMANAGER) -> None:
# Stores a log associated with this service
try:
msg, level = data.split('\t')
msg, slevel = data.split('\t')
try:
level = int(level)
level = log.LogLevel.fromStr(slevel)
except Exception:
logger.debug('Do not understand level %s', level)
logger.debug('Do not understand level %s', slevel)
level = log.LogLevel.INFO
log.doLog(service, level, msg, origin)
except Exception:
log.doLog(service, log.LogLevel.ERROR, "do not understand {0}".format(data), origin)
log.doLog(service, log.LogLevel.ERROR, f'do not understand {data}', origin)
def actorData(
self, userService: 'UserService'
) -> typing.MutableMapping[str, typing.Any]:
return {'action': 'rename', 'name': userService.getName()}
def processUnused(self, userService):
def processUnused(self, userService: 'UserService') -> None:
"""
This will be invoked for every assigned and unused user service that has been in this state at least 1/2 of Globalconfig.CHECK_UNUSED_TIME
This function can update userService values. Normal operation will be remove machines if this state is not valid
@ -166,14 +166,14 @@ class LinuxOsManager(osmanagers.OSManager):
)
userService.remove()
def isPersistent(self):
def isPersistent(self) -> bool:
return self._onLogout == 'keep-always'
def checkState(self, userService: 'UserService') -> str:
logger.debug('Checking state for service %s', userService)
return State.RUNNING
def maxIdle(self):
def maxIdle(self) -> typing.Optional[int]:
"""
On production environments, will return no idle for non removable machines
"""
@ -192,7 +192,7 @@ class LinuxOsManager(osmanagers.OSManager):
['v3', self._onLogout, str(self._idle), gui.fromBool(self._deadLine)]
).encode('utf8')
def unmarshal(self, data: bytes):
def unmarshal(self, data: bytes) -> None:
values = data.decode('utf8').split('\t')
self._idle = -1
self._deadLine = True

View File

@ -74,7 +74,7 @@ class LinuxRandomPassManager(LinuxOsManager):
_userAccount: str
def __init__(self, environment, values):
super(LinuxRandomPassManager, self).__init__(environment, values)
super().__init__(environment, values)
if values is not None:
if values['userAccount'] == '':
raise exceptions.ValidationError(
@ -91,7 +91,7 @@ class LinuxRandomPassManager(LinuxOsManager):
return (username, userService.recoverValue('linOsRandomPass'))
return username, password
def genPassword(self, service):
def genPassword(self, service: 'UserService') -> str:
randomPass = service.recoverValue('linOsRandomPass')
if randomPass is None:
randomPass = ''.join(
@ -102,7 +102,7 @@ class LinuxRandomPassManager(LinuxOsManager):
log.doLog(
service,
log.LogLevel.INFO,
"Password set to \"{}\"".format(randomPass),
f'Password set to "{randomPass}"',
log.LogSource.OSMANAGER,
)
@ -134,7 +134,7 @@ class LinuxRandomPassManager(LinuxOsManager):
self._userAccount = values[1].decode()
LinuxOsManager.unmarshal(self, codecs.decode(values[2], 'hex'))
def valuesDict(self):
def valuesDict(self) -> gui.ValuesDictType:
dic = LinuxOsManager.valuesDict(self)
dic['userAccount'] = self._userAccount
return dic

View File

@ -27,9 +27,6 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from django.utils.translation import gettext_noop as _
from .testing_osmanager import TestOSManager

View File

@ -108,18 +108,18 @@ class TestOSManager(osmanagers.OSManager):
"""
return userService.getName()
def doLog(self, service, data, origin=log.LogSource.OSMANAGER):
def doLog(self, service: 'UserService', data, origin=log.LogSource.OSMANAGER) -> None:
# Stores a log associated with this service
try:
msg, level = data.split('\t')
msg, slevel = data.split('\t')
try:
level = int(level)
level = log.LogLevel.fromStr(slevel)
except Exception:
logger.debug('Do not understand level %s', level)
logger.debug('Do not understand level %s', slevel)
level = log.LogLevel.INFO
log.doLog(service, level, msg, origin)
except Exception:
log.doLog(service, log.LogLevel.ERROR, "do not understand {0}".format(data), origin)
log.doLog(service, log.LogLevel.ERROR, f'do not understand {data}', origin)
def actorData(
self, userService: 'UserService'

View File

@ -45,17 +45,17 @@ from .windows_domain import WinDomainOsManager
from .windows_random import WinRandomPassManager
managers.downloadsManager().registerDownloadable(
'UDSActorSetup-{version}.exe'.format(version=VERSION),
f'UDSActorSetup-{VERSION}.exe',
_('UDS Actor for windows machines'),
os.path.dirname(typing.cast(str, sys.modules[__package__].__file__))
+ '/files/UDSActorSetup-{version}.exe'.format(version=VERSION),
+ f'/files/UDSActorSetup-{VERSION}.exe',
'application/x-msdos-program',
)
managers.downloadsManager().registerDownloadable(
'UDSActorUnmanagedSetup-{version}.exe'.format(version=VERSION),
f'UDSActorUnmanagedSetup-{VERSION}.exe',
_('UDS Actor for Unmanaged windows machines. Used ONLY for static machines.'),
os.path.dirname(typing.cast(str, sys.modules[__package__].__file__))
+ '/files/UDSActorUnmanagedSetup-{version}.exe'.format(version=VERSION),
+ f'/files/UDSActorUnmanagedSetup-{VERSION}.exe',
'application/x-msdos-program',
)

View File

@ -96,7 +96,7 @@ class WindowsOsManager(osmanagers.OSManager):
except Exception:
raise exceptions.ValidationError(
_('Length must be numeric!!')
)
) from None
if length > 6 or length < 1:
raise exceptions.ValidationError(
_('Length must be betwen 1 and 6')
@ -107,7 +107,7 @@ class WindowsOsManager(osmanagers.OSManager):
self.processUnusedMachines = self._onLogout == 'remove'
def __init__(self, environment, values):
super(WindowsOsManager, self).__init__(environment, values)
super().__init__(environment, values)
if values is not None:
self._onLogout = values['onLogout']
self._idle = int(values['idle'])
@ -145,7 +145,7 @@ class WindowsOsManager(osmanagers.OSManager):
try:
msg, levelStr = data.split('\t')
try:
level = int(levelStr)
level = log.LogLevel.fromStr(levelStr)
except Exception:
logger.debug('Do not understand level %s', levelStr)
level = log.LogLevel.INFO
@ -154,7 +154,7 @@ class WindowsOsManager(osmanagers.OSManager):
except Exception:
logger.exception('WindowsOs Manager message log: ')
log.doLog(
userService, log.LogLevel.ERROR, "do not understand {0}".format(data), origin
userService, log.LogLevel.ERROR, f'do not understand {data}', origin
)
def actorData(

View File

@ -68,9 +68,7 @@ class WinDomainOsManager(WindowsOsManager):
length=64,
label=_('Domain'),
order=1,
tooltip=_(
'Domain to join machines to (use FQDN form, Netbios name not supported for most operations)'
),
tooltip=_('Domain to join machines to (use FQDN form, Netbios name not supported for most operations)'),
required=True,
)
account = gui.TextField(
@ -99,9 +97,7 @@ class WinDomainOsManager(WindowsOsManager):
length=64,
label=_('Machine Group'),
order=7,
tooltip=_(
'Group to which add machines on creation. If empty, no group will be used.'
),
tooltip=_('Group to which add machines on creation. If empty, no group will be used.'),
tab=_('Advanced'),
)
removeOnExit = gui.CheckBoxField(
@ -150,17 +146,11 @@ class WinDomainOsManager(WindowsOsManager):
# if values['domain'].find('.') == -1:
# raise exceptions.ValidationException(_('Must provide domain in FQDN'))
if values['account'] == '':
raise exceptions.ValidationError(
_('Must provide an account to add machines to domain!')
)
raise exceptions.ValidationError(_('Must provide an account to add machines to domain!'))
if values['account'].find('\\') != -1:
raise exceptions.ValidationError(
_('DOM\\USER form is not allowed!')
)
raise exceptions.ValidationError(_('DOM\\USER form is not allowed!'))
if values['password'] == '':
raise exceptions.ValidationError(
_('Must provide a password for the account!')
)
raise exceptions.ValidationError(_('Must provide a password for the account!'))
self._domain = values['domain']
self._ou = values['ou'].strip()
self._account = values['account']
@ -269,9 +259,7 @@ class WinDomainOsManager(WindowsOsManager):
fltr = f'(&(objectClass=computer)(sAMAccountName={ldaputil.escape(machineName)}$))'
obj: typing.Optional[typing.MutableMapping[str, typing.Any]]
try:
obj = next(
ldaputil.getAsDict(ldapConnection, base, fltr, ['dn'], sizeLimit=50)
)
obj = next(ldaputil.getAsDict(ldapConnection, base, fltr, ['dn'], sizeLimit=50))
except StopIteration:
obj = None
@ -309,7 +297,7 @@ class WinDomainOsManager(WindowsOsManager):
logger.warning('Could not find _ldap._tcp.%s', self._domain)
log.doLog(
userService,
log.WARN,
log.LogLevel.WARNING,
f'Could not remove machine from domain (_ldap._tcp.{self._domain} not found)',
log.LogSource.OSMANAGER,
)
@ -325,7 +313,7 @@ class WinDomainOsManager(WindowsOsManager):
# logger.exception('Ldap Exception caught')
if error:
log.doLog(userService, log.WARN, error, log.LogSource.OSMANAGER)
log.doLog(userService, log.LogLevel.WARNING, error, log.LogSource.OSMANAGER)
logger.error(error)
def release(self, userService: 'UserService') -> None:
@ -351,7 +339,7 @@ class WinDomainOsManager(WindowsOsManager):
logger.warning('Could not find _ldap._tcp.%s', self._domain)
log.doLog(
userService,
log.WARN,
log.LogLevel.WARNING,
f'Could not remove machine from domain (_ldap._tcp.{self._domain} not found)',
log.LogSource.OSMANAGER,
)
@ -360,7 +348,7 @@ class WinDomainOsManager(WindowsOsManager):
# logger.exception('Ldap Exception caught')
log.doLog(
userService,
log.WARN,
log.LogLevel.WARNING,
f'Could not remove machine from domain ({e})',
log.LogSource.OSMANAGER,
)
@ -369,7 +357,7 @@ class WinDomainOsManager(WindowsOsManager):
# logger.exception('Exception caught')
log.doLog(
userService,
log.WARN,
log.LogLevel.WARNING,
f'Could not remove machine from domain ({e})',
log.LogSource.OSMANAGER,
)
@ -378,14 +366,10 @@ class WinDomainOsManager(WindowsOsManager):
try:
res = self.__getMachine(ldapConnection, userService.friendly_name)
if res is None:
raise Exception(
f'Machine {userService.friendly_name} not found on AD (permissions?)'
)
raise Exception(f'Machine {userService.friendly_name} not found on AD (permissions?)')
ldaputil.recursive_delete(ldapConnection, res)
except IndexError:
logger.error(
'Error deleting %s from BASE %s', userService.friendly_name, self._ou
)
logger.error('Error deleting %s from BASE %s', userService.friendly_name, self._ou)
except Exception:
logger.exception('Deleting from AD: ')
@ -395,9 +379,9 @@ class WinDomainOsManager(WindowsOsManager):
except ldaputil.LDAPError as e:
return _('Check error: {}').format(e)
except dns.resolver.NXDOMAIN:
return _(
'Could not find server parameters (_ldap._tcp.{0} can\'t be resolved)'
).format(self._domain)
return _('Could not find server parameters (_ldap._tcp.{0} can\'t be resolved)').format(
self._domain
)
except Exception as e:
logger.exception('Exception ')
return str(e)
@ -410,17 +394,13 @@ class WinDomainOsManager(WindowsOsManager):
# Group
if self._group != '':
if self.__getGroup(ldapConnection) is None:
return _(
'Check Error: group "{}" not found (using "cn" to locate it)'
).format(self._group)
return _('Check Error: group "{}" not found (using "cn" to locate it)').format(self._group)
return _('Server check was successful')
# pylint: disable=protected-access
@staticmethod
def test(
env: 'Environment', data: typing.Dict[str, str]
) -> typing.List[typing.Any]:
def test(env: 'Environment', data: typing.Dict[str, str]) -> typing.List[typing.Any]:
logger.debug('Test invoked')
wd = WinDomainOsManager(env, data)
logger.debug(wd)
@ -442,17 +422,13 @@ class WinDomainOsManager(WindowsOsManager):
if wd and not wd._ou:
return [
False,
_('The default path {0} for computers was not found!!!').format(
wd._ou
),
_('The default path {0} for computers was not found!!!').format(wd._ou),
]
return [False, _('The ou path {0} was not found!!!').format(wd._ou)]
except dns.resolver.NXDOMAIN:
return [
True,
_(
'Could not check parameters (_ldap._tcp.{0} can\'r be resolved)'
).format(wd._domain),
_('Could not check parameters (_ldap._tcp.{0} can\'r be resolved)').format(wd._domain),
]
except Exception as e:
logger.exception('Exception ')
@ -460,9 +436,7 @@ class WinDomainOsManager(WindowsOsManager):
return [True, _("All parameters seem to work fine.")]
def actorData(
self, userService: 'UserService'
) -> typing.MutableMapping[str, typing.Any]:
def actorData(self, userService: 'UserService') -> typing.MutableMapping[str, typing.Any]:
return {
'action': 'rename_ad',
'name': userService.getName(),

View File

@ -58,9 +58,7 @@ logger = logging.getLogger(__name__)
class WinRandomPassManager(WindowsOsManager):
typeName = _('Windows Random Password OS Manager')
typeType = 'WinRandomPasswordManager'
typeDescription = _(
'Os Manager to control windows machines, with user password set randomly.'
)
typeDescription = _('Os Manager to control windows machines, with user password set randomly.')
iconFile = 'wosmanager.png'
# Apart form data from windows os manager, we need also domain and credentials
@ -88,13 +86,9 @@ class WinRandomPassManager(WindowsOsManager):
super().__init__(environment, values)
if values:
if values['userAccount'] == '':
raise exceptions.ValidationError(
_('Must provide an user account!!!')
)
raise exceptions.ValidationError(_('Must provide an user account!!!'))
if values['password'] == '':
raise exceptions.ValidationError(
_('Must provide a password for the account!!!')
)
raise exceptions.ValidationError(_('Must provide a password for the account!!!'))
self._userAccount = values['userAccount']
self._password = values['password']
else:
@ -107,9 +101,7 @@ class WinRandomPassManager(WindowsOsManager):
if username == self._userAccount:
password = userService.recoverValue('winOsRandomPass')
return WindowsOsManager.processUserPassword(
self, userService, username, password
)
return WindowsOsManager.processUserPassword(self, userService, username, password)
def genPassword(self, userService: 'UserService'):
randomPass = userService.recoverValue('winOsRandomPass')
@ -117,12 +109,9 @@ class WinRandomPassManager(WindowsOsManager):
# Generates a password that conforms to complexity
rnd = random.SystemRandom()
base = ''.join(
rnd.choice(v)
for v in (string.ascii_lowercase, string.ascii_uppercase, string.digits)
rnd.choice(v) for v in (string.ascii_lowercase, string.ascii_uppercase, string.digits)
) + rnd.choice('.+-')
randomPass = ''.join(
rnd.choice(string.ascii_letters + string.digits) for _ in range(12)
)
randomPass = ''.join(rnd.choice(string.ascii_letters + string.digits) for _ in range(12))
pos = rnd.randrange(0, len(randomPass))
randomPass = randomPass[:pos] + base + randomPass[pos:]
userService.storeValue('winOsRandomPass', randomPass)
@ -134,9 +123,7 @@ class WinRandomPassManager(WindowsOsManager):
)
return randomPass
def actorData(
self, userService: 'UserService'
) -> typing.MutableMapping[str, typing.Any]:
def actorData(self, userService: 'UserService') -> typing.MutableMapping[str, typing.Any]:
return {
'action': 'rename',
'name': userService.getName(),
@ -150,9 +137,9 @@ class WinRandomPassManager(WindowsOsManager):
Serializes the os manager data so we can store it in database
'''
base = codecs.encode(super().marshal(), 'hex').decode()
return '\t'.join(
['v1', self._userAccount, CryptoManager().encrypt(self._password), base]
).encode('utf8')
return '\t'.join(['v1', self._userAccount, CryptoManager().encrypt(self._password), base]).encode(
'utf8'
)
def unmarshal(self, data: bytes) -> None:
values = data.decode('utf8').split('\t')

View File

@ -60,16 +60,13 @@ reportAutoModelDct: typing.Mapping[str, typing.Type[ReportAutoModel]] = { # typ
class ReportAutoType(UserInterfaceType):
def __new__(cls, name, bases, attrs) -> 'ReportAutoType':
def __new__(mcs, name, bases, attrs) -> 'ReportAutoType':
# Add gui for elements...
order = 1
# Check what source
if attrs.get('data_source'):
attrs['source'] = fields.source_field(
order, attrs['data_source'], attrs['multiple']
)
attrs['source'] = fields.source_field(order, attrs['data_source'], attrs['multiple'])
order += 1
# Check if date must be added
@ -88,11 +85,9 @@ class ReportAutoType(UserInterfaceType):
attrs['interval'] = fields.intervals_field(order)
order += 1
return typing.cast(
'ReportAutoType', UserInterfaceType.__new__(cls, name, bases, attrs)
)
return typing.cast('ReportAutoType', UserInterfaceType.__new__(mcs, name, bases, attrs))
# pylint: disable=abstract-method
class ReportAuto(Report, metaclass=ReportAutoType):
# Variables that will be overwriten on new class creation
source: typing.ClassVar[typing.Union[gui.MultiChoiceField, gui.ChoiceField]]
@ -115,23 +110,19 @@ class ReportAuto(Report, metaclass=ReportAutoType):
multiple: bool = False
def getModel(self) -> typing.Type[ReportAutoModel]:
data_source = self.data_source.split('.')[0]
data_source = self.data_source.split('.', maxsplit=1)[0]
return reportAutoModelDct[data_source]
def initGui(self):
# Fills datasource
fields.source_field_data(self.getModel(), self.data_source, self.source)
fields.source_field_data(self.getModel(), self.source)
logger.debug('Source field data: %s', self.source)
def getModelItems(self) -> typing.Iterable[ReportAutoModel]:
model = self.getModel()
filters = (
[self.source.value]
if isinstance(self.source, gui.ChoiceField)
else self.source.value
)
filters = [self.source.value] if isinstance(self.source, gui.ChoiceField) else self.source.value
if '0-0-0-0' in filters:
items = model.objects.all()
@ -141,9 +132,7 @@ class ReportAuto(Report, metaclass=ReportAutoType):
return items
def getIntervalInHours(self):
return {'hour': 1, 'day': 24, 'week': 24 * 7, 'month': 24 * 30}[
self.interval.value
]
return {'hour': 1, 'day': 24, 'week': 24 * 7, 'month': 24 * 30}[self.interval.value]
def getIntervalsList(self) -> typing.List[typing.Tuple[datetime.datetime, datetime.datetime]]:
intervals: typing.List[typing.Tuple[datetime.datetime, datetime.datetime]] = []
@ -164,33 +153,29 @@ class ReportAuto(Report, metaclass=ReportAutoType):
next = (start + datetime.timedelta(days=32)).replace(day=1)
intervals.append((start, next))
start = next
logger.info('Intervals: {0}'.format(intervals))
return intervals
logger.debug('Intervals: %s', intervals)
return intervals
def adjustDate(self, d: datetime.date, isEndingDate: bool) -> datetime.date:
if self.interval.value in ('hour', 'day'):
return d
elif self.interval.value == 'week':
if self.interval.value == 'week':
return (d - datetime.timedelta(days=d.weekday())).replace()
elif self.interval.value == 'month':
if self.interval.value == 'month':
if not isEndingDate:
return d.replace(day=1)
else:
return (d + datetime.timedelta(days=32)).replace(day=1) - datetime.timedelta(days=1)
else:
return d
return (d + datetime.timedelta(days=32)).replace(day=1) - datetime.timedelta(days=1)
return d
def formatDatetimeAsString(self, d: datetime.date) -> str:
if self.interval.value in ('hour', 'day'):
return d.strftime('%Y-%b-%d %H:%M:%S')
elif self.interval.value == 'week':
if self.interval.value == 'week':
return d.strftime('%Y-%b-%d')
elif self.interval.value == 'month':
if self.interval.value == 'month':
return d.strftime('%Y-%b')
else:
return d.strftime('%Y-%b-%d %H:%M:%S')
return d.strftime('%Y-%b-%d %H:%M:%S')
def startingDate(self) -> datetime.date:
return self.adjustDate(self.date_start.date(), False)

View File

@ -37,7 +37,6 @@ import typing
from django.utils.translation import gettext_noop as _
from uds.core.ui import gui
from uds import models
logger = logging.getLogger(__name__)
@ -57,7 +56,7 @@ def single_date_field(order: int) -> gui.DateField:
order=order,
label=_('Date'),
tooltip=_('Date for report'),
defvalue=lambda: datetime.date.today(),
defvalue=datetime.date.today,
required=True,
)
@ -67,7 +66,7 @@ def end_date_field(order: int) -> gui.DateField:
order=order,
label=_('Ending date'),
tooltip=_('ending date for report'),
defvalue=lambda: datetime.date.today()+datetime.timedelta(days=1),
defvalue=lambda: datetime.date.today() + datetime.timedelta(days=1),
required=True,
)
@ -113,7 +112,6 @@ def source_field(
def source_field_data(
model: typing.Any,
data_source: str,
field: typing.Union[gui.ChoiceField, gui.MultiChoiceField],
) -> None:
dataList: typing.List[gui.ChoiceType] = [

View File

@ -41,6 +41,7 @@ from django.utils.translation import gettext, gettext_lazy as _
from uds.core.ui import gui
from uds.core.util import log
from uds.core.managers.log.objects import LogObjectType
from uds.models import Log
from .base import ListReport
@ -49,7 +50,6 @@ from .base import ListReport
logger = logging.getLogger(__name__)
class ListReportAuditCSV(ListReport):
name = _('Audit Log list') # Report name
description = _('List administration audit logs') # Report description
@ -80,16 +80,21 @@ class ListReportAuditCSV(ListReport):
def genData(self) -> typing.Generator[typing.Tuple, None, None]:
# Xtract user method, response_code and request from data
# the format is "user: [method/response_code] request"
rx = re.compile(r'(?P<ip>[^ ]*) (?P<user>.*?): \[(?P<method>[^/]*)/(?P<response_code>[^\]]*)\] (?P<request>.*)')
rx = re.compile(
r'(?P<ip>[^ ]*) (?P<user>.*?): \[(?P<method>[^/]*)/(?P<response_code>[^\]]*)\] (?P<request>.*)'
)
start = self.startDate.datetime().replace(hour=0, minute=0, second=0, microsecond=0)
end = self.endDate.datetime().replace(hour=23, minute=59, second=59, microsecond=999999)
for i in Log.objects.filter(
created__gte=start, created__lte=end, owner_id=0, owner_type=log.OWNER_TYPE_AUDIT,
created__gte=start,
created__lte=end,
source=log.LogSource.REST,
owner_type=LogObjectType.SYSLOG,
).order_by('-created'):
# extract user, method, response_code and request from data field
m = rx.match(i.data)
if m is not None:
# Convert response code to an string if 200, else, to an error
response_code = {
@ -117,7 +122,15 @@ class ListReportAuditCSV(ListReport):
writer = csv.writer(output)
writer.writerow(
[gettext('Date'), gettext('Level'), gettext('IP'), gettext('User'), gettext('Method'), gettext('Response code'), gettext('Request')]
[
gettext('Date'),
gettext('Level'),
gettext('IP'),
gettext('User'),
gettext('Method'),
gettext('Response code'),
gettext('Request'),
]
)
for l in self.genData():

View File

@ -30,8 +30,6 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
from django.utils.translation import gettext_noop as _
from uds.core import reports

View File

@ -34,7 +34,6 @@ import typing
from django.utils.translation import gettext, gettext_lazy as _
from uds.core.ui import gui
from uds.core.util.stats import counters
from .base import StatsReportAuto
@ -68,8 +67,6 @@ class AuthenticatorsStats(StatsReportAuto):
# Will show a.name on every change...
stats.append({'date': a.name, 'users': None})
services = 0
userServices = 0
for i in self.getIntervalsList():
start = i[0]
end = i[1]

View File

@ -30,8 +30,6 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
from django.utils.translation import gettext_noop as _
from uds.core import reports
from ..auto import ReportAuto
@ -43,6 +41,6 @@ class StatsReport(reports.Report):
def generate(self) -> bytes:
raise NotImplementedError('StatsReport generate invoked and not implemented')
# pylint: disable=abstract-method
class StatsReportAuto(ReportAuto, StatsReport):
pass

View File

@ -92,7 +92,7 @@ class CountersPoolAssigned(StatsReport):
for poolUuid in self.pools.value:
try:
pool = ServicePool.objects.get(uuid=poolUuid)
except Exception:
except Exception: # nosec: If not found, simple ignore it and go for next
continue
hours = [0] * 24
@ -125,9 +125,7 @@ class CountersPoolAssigned(StatsReport):
d = {
'title': _('Services by hour'),
'x': X,
'xtickFnc': lambda xx: '{:02d}'.format(
xx
), # pylint: disable=unnecessary-lambda
'xtickFnc': '{:02d}'.format, # Two digits
'xlabel': _('Hour'),
'y': [
{'label': i['name'], 'data': [i['hours'][v] for v in X]} for i in items
@ -172,6 +170,6 @@ class CountersPoolAssignedCSV(CountersPoolAssigned):
for i in items:
for j in range(24):
writer.writerow([i['name'], '{:02d}'.format(j), i['hours'][j]])
writer.writerow([i['name'], f'{j:02d}', i['hours'][j]])
return output.getvalue()

View File

@ -60,7 +60,7 @@ class PoolsUsageSummary(UsageByPool):
) -> typing.Tuple[
typing.ValuesView[typing.MutableMapping[str, typing.Any]], int, int, int
]:
orig, poolNames = super().getData()
orig, poolNames = super().getData() # pylint: disable=unused-variable # Keep name for reference
pools: typing.Dict[str, typing.Dict] = {}
totalTime: int = 0
@ -88,8 +88,8 @@ class PoolsUsageSummary(UsageByPool):
logger.debug('Pools %s', pools)
# Remove unique users, and keep only counts...
for pn in pools:
pools[pn]['users'] = len(pools[pn]['users'])
for _, pn in pools.items():
pn['users'] = len(pn['users'])
return pools.values(), totalTime, totalCount or 1, len(uniqueUsers)

View File

@ -30,16 +30,15 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
""" from django.utils.translation import gettext_noop as _
# from django.utils.translation import gettext_noop as _
from .base import StatsReport
# from .base import StatsReport
class StatsReportUsage(StatsReport): # Disabled from being used
name = _('Usage stats') # Report name
description = _('Statistics of platform use') # Report description
uuid = '9ae54172-ed48-11e4-b8e1-10feed05884b'
# class StatsReportUsage(StatsReport): # Disabled from being used
# name = _('Usage stats') # Report name
# description = _('Statistics of platform use') # Report description
# uuid = '9ae54172-ed48-11e4-b8e1-10feed05884b'
def generate(self):
return ''
"""
# def generate(self):
# return ''

View File

@ -200,7 +200,7 @@ class StatsReportLogin(StatsReport):
_('Sunday'),
][l],
'xlabel': _('Day of week'),
'y': [{'label': 'Users', 'data': [v for v in dataWeek]}],
'y': [{'label': 'Users', 'data': list(dataWeek)}],
'ylabel': 'Users',
}
@ -211,7 +211,7 @@ class StatsReportLogin(StatsReport):
'title': _('Users Access (by hour)'),
'x': X,
'xlabel': _('Hour'),
'y': [{'label': 'Users', 'data': [v for v in dataHour]}],
'y': [{'label': 'Users', 'data': list(dataHour)}],
'ylabel': 'Users',
}