1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-10 01:17:59 +03:00

More linting (a lot more :) )

This commit is contained in:
Adolfo Gómez García 2023-04-15 16:33:53 +02:00
parent ed90cd3995
commit a058b61276
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
42 changed files with 368 additions and 299 deletions

View File

@ -11,7 +11,7 @@ clear-cache-post-run=no
# Load and enable all available extensions. Use --list-extensions to see a list
# all available extensions.
#enable-all-extensions=
enable-all-extensions=yes
# In error mode, messages with a category besides ERROR or FATAL are
# suppressed, and no reports are done by default. Error mode is compatible with
@ -278,34 +278,34 @@ exclude-too-few-public-methods=
ignored-parents=
# Maximum number of arguments for function / method.
max-args=5
max-args=10
# Maximum number of attributes for a class (see R0902).
max-attributes=7
max-attributes=12
# Maximum number of boolean expressions in an if statement (see R0916).
max-bool-expr=5
# Maximum number of branch for function / method body.
max-branches=20
max-branches=24
# Maximum number of locals for function / method body.
max-locals=15
max-locals=24
# Maximum number of parents for a class (see R0901).
max-parents=7
# Maximum number of public methods for a class (see R0904).
max-public-methods=20
max-public-methods=32
# Maximum number of return / yield for function / method body.
max-returns=6
# Maximum number of statements in function / method body.
max-statements=50
max-statements=64
# Minimum number of public methods for a class (see R0903).
min-public-methods=2
min-public-methods=1
[EXCEPTIONS]
@ -428,7 +428,8 @@ disable=raw-checker-failed,
broad-except,
no-name-in-module, # Too many false positives... :(
import-error,
too-many-lines
too-many-lines,
redefined-builtin,
# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option

View File

@ -93,7 +93,7 @@ class Client(Handler):
# error += ' (code {0:04X})'.format(errorCode)
error = _(
'Your service is being created. Please, wait while we complete it'
) + ' ({}%)'.format(int(errorCode * 25))
) + f' ({int(errorCode)*25}%)'
res['error'] = error
res['retryable'] = '1' if retryable else '0'

View File

@ -459,7 +459,7 @@ def webPassword(request: HttpRequest) -> str:
return cryptoManager().symDecrpyt(
passkey, getUDSCookie(request)
) # recover as original unicode string
else: # No session, get from _session instead, this is an "client" REST request
# No session, get from _session instead, this is an "client" REST request
return cryptoManager().symDecrpyt(
getattr(request, '_cryptedpass'), getattr(request, '_scrambler')
)

View File

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
# pylint: disable=unused-argument # this has a lot of "default" methods, so we need to ignore unused arguments most of the time
#
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
# All rights reserved.
@ -33,7 +34,6 @@ Base module for all authenticators
"""
import enum
import logging
from re import A
import typing
from django.utils.translation import gettext_noop as _
@ -65,6 +65,7 @@ class AuthenticationSuccess(enum.IntEnum):
OK = 1
REDIRECT = 2
class AuthenticationInternalUrl(enum.Enum):
"""
Enumeration for authentication success
@ -78,6 +79,7 @@ class AuthenticationInternalUrl(enum.Enum):
"""
return reverse(self.value)
class AuthenticationResult(typing.NamedTuple):
success: AuthenticationSuccess
url: typing.Optional[str] = None
@ -186,8 +188,8 @@ class Authenticator(Module):
# : If this authenticators casues a temporal block of an user on repeated login failures
blockUserOnLoginFailures: typing.ClassVar[bool] = True
from .user import User
from .group import Group
from .user import User # pylint: disable=import-outside-toplevel
from .group import Group # pylint: disable=import-outside-toplevel
# : The type of user provided, normally standard user will be enough.
# : This is here so if we need it in some case, we can write our own
@ -213,10 +215,12 @@ class Authenticator(Module):
@param environment: Environment for the authenticator
@param values: Values passed to element
"""
from uds.models import Authenticator as AuthenticatorModel
from uds.models import ( # pylint: disable=import-outside-toplevel
Authenticator as AuthenticatorModel,
)
self._dbAuth = dbAuth or AuthenticatorModel() # Fake dbAuth if not provided
super(Authenticator, self).__init__(environment, values)
super().__init__(environment, values)
self.initialize(values)
def initialize(self, values: typing.Optional[typing.Dict[str, typing.Any]]) -> None:
@ -249,9 +253,9 @@ class Authenticator(Module):
user param is a database user object
"""
from uds.core.auths.groups_manager import (
from uds.core.auths.groups_manager import ( # pylint: disable=import-outside-toplevel
GroupsManager,
) # pylint: disable=redefined-outer-name
)
if self.isExternalSource:
groupsManager = GroupsManager(self._dbAuth)
@ -268,7 +272,7 @@ class Authenticator(Module):
This method will allow us to know where to do redirection in case
we need to use callback for authentication
"""
from .auth import authCallbackUrl
from .auth import authCallbackUrl # pylint: disable=import-outside-toplevel
return authCallbackUrl(self.dbAuthenticator())
@ -276,7 +280,7 @@ class Authenticator(Module):
"""
Helper method to return info url for this authenticator
"""
from .auth import authInfoUrl
from .auth import authInfoUrl # pylint: disable=import-outside-toplevel
return authInfoUrl(self.dbAuthenticator())
@ -394,18 +398,26 @@ class Authenticator(Module):
"""
return FAILED_AUTH
def isAccesibleFrom(self, request: 'HttpRequest'):
def isAccesibleFrom(self, request: 'HttpRequest') -> bool:
"""
Used by the login interface to determine if the authenticator is visible on the login page.
"""
from uds.core.util.request import ExtendedHttpRequest
from uds.models import Authenticator as dbAuth
from uds.core.util.request import ( # pylint: disable=import-outside-toplevel
ExtendedHttpRequest,
)
from uds.models import ( # pylint: disable=import-outside-toplevel
Authenticator as dbAuth,
)
return self._dbAuth.state != dbAuth.DISABLED and self._dbAuth.validForIp(
typing.cast('ExtendedHttpRequest', request).ip
)
def transformUsername(self, username: str, request: 'ExtendedHttpRequest') -> str:
def transformUsername(
self,
username: str,
request: 'ExtendedHttpRequest',
) -> str:
"""
On login, this method get called so we can "transform" provided user name.
@ -462,7 +474,11 @@ class Authenticator(Module):
"""
return self.authenticate(username, credentials, groupsManager, request)
def logout(self, request: 'ExtendedHttpRequest', username: str) -> AuthenticationResult:
def logout(
self,
request: 'ExtendedHttpRequest',
username: str,
) -> AuthenticationResult:
"""
Invoked whenever an user logs out.
@ -491,7 +507,10 @@ class Authenticator(Module):
return SUCCESS_AUTH
def webLogoutHook(
self, username: str, request: 'HttpRequest', response: 'HttpResponse'
self,
username: str,
request: 'HttpRequest',
response: 'HttpResponse',
) -> None:
'''
Invoked on web logout of an user

View File

@ -64,7 +64,7 @@ class JobThread(threading.Thread):
_freq: int
def __init__(self, jobInstance: 'Job', dbJob: DBScheduler) -> None:
super(JobThread, self).__init__()
super().__init__()
self._jobInstance = jobInstance
self._dbJobId = dbJob.id
self._freq = dbJob.frecuency

View File

@ -85,7 +85,7 @@ class CryptoManager(metaclass=singleton.Singleton):
while len(key) < length:
key += key # Dup key
kl: typing.List[int] = [v for v in key]
kl: typing.List[int] = list(key)
pos = 0
while len(kl) > length:
kl[pos] ^= kl[length]
@ -279,14 +279,15 @@ class CryptoManager(metaclass=singleton.Singleton):
return secrets.compare_digest(
hashlib.sha3_256(value).hexdigest(), hashValue[8:]
)
elif hashValue[:12] == '{SHA256SALT}':
if hashValue[:12] == '{SHA256SALT}':
# Extract 16 chars salt and hash
salt = hashValue[12:28].encode()
value = salt + value
return secrets.compare_digest(
hashlib.sha3_256(value).hexdigest(), hashValue[28:]
)
else: # Old sha1
# Old sha1
return secrets.compare_digest(
hashValue, str(hashlib.sha1(value).hexdigest()) # nosec: Old compatibility SHA1, not used anymore but need to be supported
) # nosec: Old compatibility SHA1, not used anymore but need to be supported

View File

@ -106,7 +106,7 @@ class DownloadsManager(metaclass=singleton.Singleton):
memory at once. The FileWrapper will turn the file object into an
iterator for chunks of 8KB.
"""
wrapper = FileWrapper(open(filename, 'rb'))
wrapper = FileWrapper(open(filename, 'rb')) # pylint: disable=consider-using-with
response = HttpResponse(wrapper, content_type=mime)
response['Content-Length'] = os.path.getsize(filename)
response['Content-Disposition'] = 'attachment; filename=' + name

View File

@ -149,7 +149,7 @@ class PublicationFinishChecker(DelayedTask):
"""
def __init__(self, publication: ServicePoolPublication) -> None:
super(PublicationFinishChecker, self).__init__()
super().__init__()
self._publishId = publication.id
self._state = publication.state

View File

@ -1021,12 +1021,15 @@ class UserServiceManager(metaclass=singleton.Singleton):
sortedPools = sorted(
sortPools, key=operator.itemgetter(0)
) # sort by priority (first element)
pools: typing.List[ServicePool] = [
p[1] for p in sortedPools if p[1].usage() < 100 and p[1].isUsable()
]
poolsFull: typing.List[ServicePool] = [
p[1] for p in sortedPools if p[1].usage() == 100 and p[1].isUsable()
]
pools: typing.List[ServicePool] = []
poolsFull: typing.List[ServicePool] = []
for p in sortedPools:
if not p[1].isUsable():
continue
if p[1].usage() == 100:
poolsFull.append(p[1])
else:
pools.append(p[1])
logger.debug('Pools: %s/%s', pools, poolsFull)

View File

@ -1,4 +1,4 @@
# -*- coding: utf-8 -*-
# pylint: disable=unused-argument # this has a lot of "default" methods, so we need to ignore unused arguments most of the time
#
# Copyright (c) 2022 Virtual Cable S.L.U.
@ -300,7 +300,6 @@ class MFA(Module):
This method allows to reset the MFA state of an user.
Normally, this will do nothing, but for persistent MFA data (as Google Authenticator), this will remove the data.
"""
pass
@staticmethod
def getUserId(user: 'User') -> str:

View File

@ -216,11 +216,16 @@ def surfaceChart(
if data.get('wireframe', False):
axis.plot_wireframe(
x, y, z, rstride=1, cstride=1, cmap=cm.coolwarm # type: ignore
x,
y,
z,
rstride=1,
cstride=1,
cmap=cm.coolwarm, # pylint: disable=no-member # type: ignore
)
else:
axis.plot_surface(
x, y, z, rstride=1, cstride=1, cmap=cm.coolwarm # type: ignore
x, y, z, rstride=1, cstride=1, cmap=cm.coolwarm # type: ignore # pylint: disable=no-member
)
axis.set_title(data.get('title', ''))

View File

@ -48,14 +48,20 @@ logger = logging.getLogger(__name__)
class Report(UserInterface):
mime_type: typing.ClassVar[str] = 'application/pdf' # Report returns pdfs by default, but could be anything else
mime_type: typing.ClassVar[
str
] = 'application/pdf' # Report returns pdfs by default, but could be anything else
name: typing.ClassVar[str] = _('Base Report') # Report name
group: typing.ClassVar[str] = '' # So we can "group" reports by kind?
encoded: typing.ClassVar[bool] = True # If the report is mean to be encoded (binary reports as PDFs == True, text reports must be False so utf-8 is correctly threated
encoded: typing.ClassVar[
bool
] = True # If the report is mean to be encoded (binary reports as PDFs == True, text reports must be False so utf-8 is correctly threated
uuid: typing.ClassVar[str] = ''
description: str = _('Base report') # Report description
filename: str = 'file.pdf' # Filename that will be returned as 'hint' on rest report request
filename: str = (
'file.pdf' # Filename that will be returned as 'hint' on rest report request
)
@classmethod
def translated_name(cls):
@ -81,35 +87,42 @@ class Report(UserInterface):
@classmethod
def getUuid(cls):
if cls.uuid is None:
raise Exception('Class does not includes an uuid!!!: {}'.format(cls))
raise Exception(f'Class does not includes an uuid!!!: {cls}')
return cls.uuid
@staticmethod
def asPDF(html: str, header: typing.Optional[str] = None, water: typing.Optional[str] = None, images: typing.Optional[typing.Dict[str, bytes]] = None) -> bytes:
def asPDF(
html: str,
header: typing.Optional[str] = None,
water: typing.Optional[str] = None,
images: typing.Optional[typing.Dict[str, bytes]] = None,
) -> bytes:
"""
Renders an html as PDF.
Uses the "report.css" as stylesheet
"""
# url fetcher for weasyprint
def report_fetcher(url: str, timeout=10, ssl_context=None) -> typing.Dict:
def report_fetcher(
url: str, timeout=10, ssl_context=None # pylint: disable=unused-argument
) -> typing.Dict:
logger.debug('Getting url for weasyprint %s', url)
if url.startswith('stock://'):
imagePath = stock.getStockImagePath(url[8:])
with open(imagePath, 'rb') as f:
image = f.read()
return dict(string=image, mime_type='image/png')
return {'string': image, 'mime_type': 'image/png'}
if url.startswith('image://'):
img: typing.Optional[bytes] = b'' # Empty image
if images:
img = images.get(url[8:])
logger.debug('Getting image %s? %s', url[8:], img is not None)
return dict(string=img, mime_type='image/png')
return {'string': img, 'mime_type': 'image/png'}
return default_url_fetcher(url)
with open(stock.getStockCssPath('report.css'), 'r') as f:
with open(stock.getStockCssPath('report.css'), 'r', encoding='utf-8') as f:
css = f.read()
css = (
@ -117,13 +130,20 @@ class Report(UserInterface):
.replace('{page}', _('Page'))
.replace('{of}', _('of'))
.replace('{water}', water or 'UDS Report')
.replace('{printed}', _('Printed in {now:%Y, %b %d} at {now:%H:%M}').format(now=datetime.datetime.now()))
.replace(
'{printed}',
_('Printed in {now:%Y, %b %d} at {now:%H:%M}').format(
now=datetime.datetime.now()
),
)
)
h = HTML(string=html, url_fetcher=report_fetcher)
c = CSS(string=css)
return typing.cast(bytes, h.write_pdf(stylesheets=[c])) # Return a new bytes object
return typing.cast(
bytes, h.write_pdf(stylesheets=[c])
) # Return a new bytes object
@staticmethod
def templateAsPDF(templateName, dct, header=None, water=None, images=None) -> bytes:
@ -184,4 +204,4 @@ class Report(UserInterface):
return typing.cast(str, data)
def __str__(self):
return 'Report {} with uuid {}'.format(self.name, self.uuid)
return f'Report {self.name} with uuid {self.uuid}'

View File

@ -32,13 +32,16 @@
"""
import typing
from uds.core.exceptions import UDSException
if typing.TYPE_CHECKING:
from uds.models import UserService, Transport
class ServiceException(Exception):
def __init__(self, *args, **kwargs):
super(ServiceException, self).__init__(*args)
class ServiceException(UDSException):
"""
Base class for all service exceptions
"""
class UnsupportedException(ServiceException):

View File

@ -37,7 +37,6 @@ from uds.core.module import Module
from uds.core.environment import Environment
from uds.core.util import log
from uds.core.util.config import GlobalConfig
from uds.core.ui import gui
# Not imported at runtime, just for type checking
@ -213,7 +212,7 @@ class ServiceProvider(Module):
"""
Logs a message with requested level associated with this service
"""
from uds.models import Provider as DBProvider
from uds.models import Provider as DBProvider # pylint: disable=import-outside-toplevel
if self.getUuid():
log.doLog(

View File

@ -33,10 +33,11 @@
import typing
import logging
from uds.core.util import factory
from .provider import ServiceProvider
from .service import Service
from uds.core.util import factory
logger = logging.getLogger(__name__)

View File

@ -30,6 +30,7 @@
"""
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
"""
import abc
import typing
from uds.core.environment import Environmentable
@ -158,6 +159,7 @@ class Publication(Environmentable, Serializable):
def getUuid(self) -> str:
return self._uuid
@abc.abstractmethod
def publish(self) -> str:
"""
This method is invoked whenever the administrator requests a new publication.
@ -192,11 +194,10 @@ class Publication(Environmentable, Serializable):
this method.
"""
raise NotImplementedError(
'publish method for class {0} not provided! '.format(
self.__class__.__name__
)
f'publish method for class {self.__class__.__name__} not provided! '
)
@abc.abstractmethod
def checkState(self) -> str:
"""
This is a task method. As that, the expected return values are
@ -222,9 +223,7 @@ class Publication(Environmentable, Serializable):
this method.
"""
raise NotImplementedError(
'checkState method for class {0} not provided!!!'.format(
self.__class__.__name__
)
f'checkState method for class {self.__class__.__name__} not provided!!!'
)
def finish(self) -> None:
@ -251,6 +250,7 @@ class Publication(Environmentable, Serializable):
"""
return 'unknown'
@abc.abstractmethod
def destroy(self) -> str:
"""
This is a task method. As that, the expected return values are
@ -270,9 +270,10 @@ class Publication(Environmentable, Serializable):
this method.
"""
raise NotImplementedError(
'destroy method for class {0} not provided!'.format(self.__class__.__name__)
f'destroy method for class {self.__class__.__name__} not provided!'
)
@abc.abstractmethod
def cancel(self) -> str:
"""
This is a task method. As that, the expected return values are
@ -292,11 +293,11 @@ class Publication(Environmentable, Serializable):
this method.
"""
raise NotImplementedError(
'cancel method for class {0} not provided!'.format(self.__class__.__name__)
f'cancel method for class {self.__class__.__name__} not provided!'
)
def __str__(self):
"""
String method, mainly used for debugging purposes
"""
return "Base Publication"
return 'Base Publication'

View File

@ -1,4 +1,4 @@
# -*- coding: utf-8 -*-
# pylint: disable=unused-argument # this has a lot of "default" methods, so we need to ignore unused arguments most of the time
#
# Copyright (c) 2012-2021 Virtual Cable S.L.U.
@ -281,9 +281,7 @@ class Service(Module):
name returned is unique :-)
"""
raise Exception(
'The class {0} has been marked as manually asignable but no requestServicesForAssignetion provided!!!'.format(
self.__class__.__name__
)
f'The class {self.__class__.__name__} has been marked as manually asignable but no requestServicesForAssignetion provided!!!'
)
def macGenerator(self) -> typing.Optional['UniqueMacGenerator']:
@ -313,7 +311,9 @@ class Service(Module):
return []
def assignFromAssignables(
self, assignableId: str, user: 'models.User', userDeployment: UserDeployment
self, assignableId: str,
user: 'models.User',
userDeployment: UserDeployment
) -> str:
"""
Assigns from it internal assignable list to an user
@ -398,7 +398,7 @@ class Service(Module):
"""
Logs a message with requested level associated with this service
"""
from uds.models import Service as DBService
from uds.models import Service as DBService # pylint: disable=import-outside-toplevel
if self.getUuid():
log.doLog(

View File

@ -271,7 +271,9 @@ class UserDeployment(Environmentable, Serializable):
"""
raise NotImplementedError('Base getUniqueId for User Deployment called!!!')
def notifyReadyFromOsManager(self, data: typing.Any) -> str:
def notifyReadyFromOsManager(
self, data: typing.Any # pylint: disable=unused-argument
) -> str:
"""
This is a task method. As that, the excepted return values are
State values RUNNING, FINISHED or ERROR.
@ -388,9 +390,7 @@ class UserDeployment(Environmentable, Serializable):
this method.
"""
raise Exception(
'Base deploy for cache invoked! for class {0}'.format(
self.__class__.__name__
)
f'Base deploy for cache invoked! for class {self.__class__.__name__}'
)
def deployForUser(self, user: 'models.User') -> str:
@ -427,9 +427,7 @@ class UserDeployment(Environmentable, Serializable):
this method.
"""
raise NotImplementedError(
'Base deploy for user invoked! for class {0}'.format(
self.__class__.__name__
)
f'Base deploy for user invoked! for class {self.__class__.__name__}'
)
def checkState(self) -> str:
@ -456,7 +454,7 @@ class UserDeployment(Environmentable, Serializable):
this method.
"""
raise NotImplementedError(
'Base check state invoked! for class {0}'.format(self.__class__.__name__)
f'Base check state invoked! for class {self.__class__.__name__}'
)
def finish(self) -> None:
@ -473,7 +471,7 @@ class UserDeployment(Environmentable, Serializable):
nothing)
"""
def moveToCache(self, newLevel: int) -> str:
def moveToCache(self, newLevel: int) -> str: # pylint: disable=unused-argument
"""
This method is invoked whenever the core needs to move from the current
cache level to a new cache level an user deployment.
@ -564,7 +562,7 @@ class UserDeployment(Environmentable, Serializable):
this method.
"""
raise NotImplementedError(
'destroy method for class {0} not provided!'.format(self.__class__.__name__)
f'destroy method for class {self.__class__.__name__} not provided!'
)
def cancel(self) -> str:
@ -631,6 +629,7 @@ class UserDeployment(Environmentable, Serializable):
monitors: number of monitors to use
"""
return None
def __str__(self):
"""
Mainly used for debugging purposses

View File

@ -1,4 +1,4 @@
# -*- coding: utf-8 -*-
# pylint: disable=unused-argument # this has a lot of "default" methods, so we need to ignore unused arguments most of the time
#
# Copyright (c) 2012-2022 Virtual Cable S.L.U.
@ -30,7 +30,6 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import os
import sys
import codecs
import logging
@ -156,7 +155,7 @@ class Transport(Module):
Returns a customized error message, that will be used when a service fails to check "isAvailableFor"
Override this in yours transports if needed
"""
return "Not accessible (using service ip {})".format(ip)
return f'Not accessible (using service ip {ip})'
@classmethod
def supportsProtocol(cls, protocol: typing.Union[typing.Iterable, str]):
@ -229,7 +228,7 @@ class Transport(Module):
userService: 'models.UserService',
transport: 'models.Transport',
ip: str,
os: 'DetectedOsInfo',
os: 'DetectedOsInfo', # pylint: disable=redefined-outer-name
user: 'models.User',
password: str,
request: 'ExtendedHttpRequestWithUser',
@ -263,7 +262,7 @@ class Transport(Module):
userService: 'models.UserService',
transport: 'models.Transport',
ip: str,
os: 'DetectedOsInfo',
os: 'DetectedOsInfo', # pylint: disable=redefined-outer-name
user: 'models.User',
password: str,
request: 'ExtendedHttpRequestWithUser',
@ -296,12 +295,16 @@ class Transport(Module):
params: Parameters for the return tuple
"""
# Reads script and signature
import os # pylint: disable=import-outside-toplevel
basePath = os.path.dirname(
sys.modules[self.__module__].__file__ or 'not_found'
) # Will raise if not found
script = open(os.path.join(basePath, scriptName), 'r').read()
signature = open(os.path.join(basePath, scriptName + '.signature'), 'r').read()
with open(os.path.join(basePath, scriptName), 'r', encoding='utf8') as scriptFile:
script = scriptFile.read()
with open(os.path.join(basePath, scriptName + '.signature'), 'r', encoding='utf8') as signatureFile:
signature = signatureFile.read()
return TransportScript(
script=script,
@ -320,7 +323,7 @@ class Transport(Module):
Returns a script for the given os and type
"""
return self.getRelativeScript(
'scripts/{os}/{type}.py'.format(os=osName, type=type), params
f'scripts/{osName}/{type}.py', params
)
def getLink(
@ -328,7 +331,7 @@ class Transport(Module):
userService: 'models.UserService',
transport: 'models.Transport',
ip: str,
os: 'DetectedOsInfo',
os: 'DetectedOsInfo', # pylint: disable=redefined-outer-name
user: 'models.User',
password: str,
request: 'ExtendedHttpRequestWithUser',

View File

@ -2,4 +2,5 @@
# Allow old serialized autoattributes to load correctly (register path...)
# pylint: disable=unused-wildcard-import,wildcard-import
from .auto_attributes import *

View File

@ -105,7 +105,9 @@ class AutoAttributes(Serializable):
return
# We keep original data (maybe incomplete)
if data[:2] == b'v1':
self.attrs = pickle.loads(data[2:]) # nosec: pickle is used to load data from trusted source
self.attrs = pickle.loads(
data[2:]
) # nosec: pickle is used to load data from trusted source
return
# We try to load as v0
try:
@ -117,16 +119,18 @@ class AutoAttributes(Serializable):
k, v = pair.split(b'\1')
# logger.debug('k: %s --- v: %s', k, v)
try:
self.attrs[k.decode()] = pickle.loads(v) # nosec: pickle is used to load data from trusted source
self.attrs[k.decode()] = pickle.loads(
v
) # nosec: pickle is used to load data from trusted source
except Exception: # Old encoding on python2, set encoding for loading
self.attrs[k.decode()] = pickle.loads(v, encoding='utf8') # nosec: pickle is used to load data from trusted source
self.attrs[k.decode()] = pickle.loads(
v, encoding='utf8'
) # nosec: pickle is used to load data from trusted source
def __repr__(self) -> str:
return (
'AutoAttributes('
+ ', '.join(
'{}={}'.format(k, v.getType().__name__) for k, v in self.attrs.items()
)
+ ', '.join(f'{k}={v.getType().__name__}' for k, v in self.attrs.items())
+ ')'
)
@ -134,7 +138,7 @@ class AutoAttributes(Serializable):
return (
'<AutoAttribute '
+ ','.join(
'{} ({}) = {}'.format(k, v.getType(), v.getStrValue())
f'{k} ({v.getType()}) = {v.getStrValue()}'
for k, v in self.attrs.items()
)
+ '>'

View File

@ -59,6 +59,7 @@ from cryptography import fernet
from django.conf import settings
# pylint: disable=too-few-public-methods
class _Unassigned:
pass
@ -97,7 +98,7 @@ def fernet_key(crypt_key: bytes) -> str:
# First, with seed + password, generate a 32 bytes key based on seed + password
return base64.b64encode(hashlib.sha256(crypt_key).digest()).decode()
# pylint: disable=unnecessary-dunder-call
class _SerializableField(typing.Generic[T]):
name: str
type: typing.Type[T]
@ -110,9 +111,8 @@ class _SerializableField(typing.Generic[T]):
def _default(self) -> T:
if isinstance(self.default, _Unassigned):
return self.type()
elif callable(self.default):
if callable(self.default):
return self.default()
else:
return self.default
def __get__(
@ -309,13 +309,12 @@ class PasswordField(StringField):
class _FieldNameSetter(type):
"""Simply adds the name of the field in the class to the field object"""
def __new__(cls, name, bases, attrs):
fields = dict()
def __new__(mcs, name, bases, attrs):
for k, v in attrs.items():
if isinstance(v, _SerializableField):
v.name = k
return super().__new__(cls, name, bases, attrs)
return super().__new__(mcs, name, bases, attrs)
class AutoSerializable(metaclass=_FieldNameSetter):
@ -346,8 +345,6 @@ class AutoSerializable(metaclass=_FieldNameSetter):
"""
return bytes(a ^ b for a, b in zip(data, itertools.cycle(header)))
return data
def unprocess_data(self, header: bytes, data: bytes) -> bytes:
"""Process data after unmarshalling
@ -367,7 +364,7 @@ class AutoSerializable(metaclass=_FieldNameSetter):
def marshal(self) -> bytes:
# Iterate over own members and extract fields
fields = {}
for k, v in self.__class__.__dict__.items():
for _, v in self.__class__.__dict__.items():
if isinstance(v, _SerializableField):
fields[v.name] = (str(v.__class__.__name__), v.marshal(self))
@ -435,16 +432,17 @@ class AutoSerializable(metaclass=_FieldNameSetter):
# Remove from data
data = data[8 + name_len + type_name_len + data_len :]
for k, v in self.__class__.__dict__.items():
for _, v in self.__class__.__dict__.items():
if isinstance(v, _SerializableField):
if v.name in fields and fields[v.name][0] == str(v.__class__.__name__):
v.unmarshal(self, fields[v.name][1])
else:
if not v.name in fields:
logger.warning(f"Field {v.name} not found in unmarshalled data")
logger.warning('Field %s not found in unmarshalled data', v.name)
else:
logger.warning(
f"Field {v.name} has wrong type in unmarshalled data (should be {fields[v.name][0]} and is {v.__class__.name}"
'Field %s has wrong type in unmarshalled data (should be %s and is %s',
v.name, fields[v.name][0], v.__class__.__name__,
)
def __eq__(self, other: typing.Any) -> bool:

View File

@ -107,10 +107,9 @@ class Cache:
Cache.misses += 1
# logger.debug('key not found: %s', skey)
return defValue
except Exception as e:
except Exception:
logger.exception('Error getting cache key: %s', skey)
Cache.misses += 1
# logger.debug('Cache inaccesible: %s:%s', skey, e)
return defValue
def __getitem__(self, key: typing.Union[str, bytes]) -> typing.Any:

View File

@ -225,13 +225,12 @@ class CalendarChecker:
return next_event
def debug(self) -> str:
return "Calendar checker for {}".format(self.calendar)
return f'Calendar checker for {self.calendar}'
@staticmethod
def _cacheKey(key: str) -> str:
# Returns a valid cache key for all caching backends (memcached, redis, or whatever)
# Simple, fastest algorihm is to use md5
h = hashlib.md5() # nosec Thisis just a simpele cache key, no need to use a more secure algorithm
h.update(key.encode('utf-8'))
return h.hexdigest()
return hashlib.md5(
key.encode('utf-8')
).hexdigest() # nosec simple fast algorithm for cache keys

View File

@ -150,10 +150,10 @@ class Config:
self._data = readed.value
self._crypt = readed.crypt or self._crypt
self._longText = readed.long
if self._type != -1 and self._type != readed.field_type:
if self._type not in (-1, readed.field_type):
readed.field_type = self._type
readed.save(update_fields=['field_type'])
if self._help != '' and self._help != readed.help:
if self._help not in ('', readed.help):
readed.help = self._help
readed.save(
update_fields=['help']
@ -169,9 +169,7 @@ class Config:
self._data = self._default
except Exception as e:
logger.info(
'Error accessing db config {0}.{1}'.format(
self._section.name(), self._key
)
'Error accessing db config %s.%s', self._section.name(), self._key
)
logger.exception(e)
self._data = self._default
@ -273,7 +271,7 @@ class Config:
self._data = value
def __str__(self) -> str:
return '{}.{}'.format(self._section.name(), self._key)
return f'{self._section.name()}.{self._key}'
@staticmethod
def section(sectionName):
@ -314,7 +312,10 @@ class Config:
cfg: DBConfig = DBConfig.objects.filter(section=section, key=key)[
0 # type: ignore # Slicing is not supported by pylance right now
]
if checkType and cfg.field_type in (Config.FieldType.READ, Config.FieldType.HIDDEN):
if checkType and cfg.field_type in (
Config.FieldType.READ,
Config.FieldType.HIDDEN,
):
return False # Skip non writable elements
if cfg.crypt:
@ -565,14 +566,18 @@ class GlobalConfig:
'statsAccumFrequency',
'14400',
type=Config.FieldType.NUMERIC,
help=_('Frequency of stats collection in seconds. Default is 4 hours (14400 seconds)'),
help=_(
'Frequency of stats collection in seconds. Default is 4 hours (14400 seconds)'
),
)
# Statisctis accumulation chunk size, in days
STATS_ACCUM_MAX_CHUNK_TIME = Config.section(GLOBAL_SECTION).value(
'statsAccumMaxChunkTime',
'7',
type=Config.FieldType.NUMERIC,
help=_('Maximum number of time to accumulate on one run. Default is 7 (1 week)'),
help=_(
'Maximum number of time to accumulate on one run. Default is 7 (1 week)'
),
)
# If disallow login showing authenticatiors
@ -716,7 +721,10 @@ class GlobalConfig:
help=_('Custom CSS styles applied to the user accesible site'),
)
SITE_INFO: Config.Value = Config.section(CUSTOM_SECTION).value(
'Site information', '', type=Config.FieldType.LONGTEXT, help=_('Site information')
'Site information',
'',
type=Config.FieldType.LONGTEXT,
help=_('Site information'),
)
SITE_FILTER_ONTOP: Config.Value = Config.section(CUSTOM_SECTION).value(
'Show Filter on Top',
@ -781,7 +789,9 @@ class GlobalConfig:
for v in GlobalConfig.__dict__.values():
if isinstance(v, Config.Value):
v.get()
logger.debug('Initialized global config value %s=%s', v.key(), v.get())
logger.debug(
'Initialized global config value %s=%s', v.key(), v.get()
)
for c in _getLater:
logger.debug('Get later: %s', c)

View File

@ -30,11 +30,7 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import logging
import socket
logger = logging.getLogger(__name__)
# import Alias
# Import alias
# pylint: disable=unused-import
from .net import testConnection as testServer

View File

@ -247,7 +247,7 @@ def allowCache(
keyFnc(args[0] if len(args) > 0 else fnc.__name__).encode('utf-8')
)
# compute cache key
cacheKey = '{}-{}'.format(cachePrefix, keyHash.hexdigest())
cacheKey = f'{cachePrefix}-{keyHash.hexdigest()}'
cache = getattr(args[0], 'cache', None) or Cache('functionCache')

View File

@ -41,7 +41,7 @@ def ensure_list(obj: typing.Union[T, typing.Iterable[T]]) -> typing.List[T]:
return [typing.cast(T, obj)]
try:
return [_ for _ in typing.cast(typing.List[T], obj)]
return list(typing.cast(typing.List[T], obj))
except Exception: # Not iterable
return [typing.cast(T, obj)]

View File

@ -1,19 +1,21 @@
import typing
import logging
from uds.core.util import singleton
from uds.core import module
import logging
logger = logging.getLogger(__name__)
T = typing.TypeVar('T', bound=module.Module)
V = typing.TypeVar('V')
class Factory(typing.Generic[V], metaclass=singleton.Singleton):
'''
Generic factory class.
'''
_objects: typing.MutableMapping[str, typing.Type[V]]
def __init__(self) -> None:
@ -46,11 +48,11 @@ class Factory(typing.Generic[V], metaclass=singleton.Singleton):
__getitem__ = get
class ModuleFactory(Factory[T]):
'''
Module Factory class.
'''
def providers(self) -> typing.Mapping[str, typing.Type[T]]:
'''
Returns all providers.

View File

@ -15,12 +15,13 @@
# Modified to add type checking, fix bugs, etc.. by dkmaster@dkmon.com
# pylint: disable=protected-access,too-few-public-methods,unused-argument
import sys
import os
import ctypes
import errno
import typing
import warnings
import logging
from ctypes.util import find_library
@ -44,29 +45,26 @@ if _system == 'Windows':
#
# We have to fix up c_long and c_ulong so that it matches the
# Cygwin (and UNIX) sizes when run on Windows.
import sys
if sys.maxsize > 0xFFFFFFFF:
c_win_long = ctypes.c_int64
c_win_ulong = ctypes.c_uint64
if _system == 'Windows' or _system.startswith('CYGWIN'):
class c_timespec(ctypes.Structure): # type: ignore
class c_timespec(ctypes.Structure): # type: ignore # pylint: disable=too-few-public-methods
_fields_ = [('tv_sec', c_win_long), ('tv_nsec', c_win_long)]
else:
class c_timespec(ctypes.Structure): # type: ignore
class c_timespec(ctypes.Structure): # type: ignore # pylint: disable=too-few-public-methods
_fields_ = [('tv_sec', ctypes.c_long), ('tv_nsec', ctypes.c_long)]
class c_utimbuf(ctypes.Structure):
class c_utimbuf(ctypes.Structure): # type: ignore # pylint: disable=too-few-public-methods
_fields_ = [('actime', c_timespec), ('modtime', c_timespec)]
class c_stat(ctypes.Structure):
class c_stat(ctypes.Structure): # type: ignore # pylint: disable=too-few-public-methods
pass # Platform dependent
@ -92,7 +90,7 @@ if not _libfuse_path:
rootkey, keyname, 0, reg.KEY_READ | reg.KEY_WOW64_32KEY # type: ignore
)
val = str(reg.QueryValueEx(key, valname)[0]) # type: ignore
except WindowsError: # type: ignore
except WindowsError: # type: ignore # pylint: disable=undefined-variable
pass
finally:
if key is not None:
@ -103,16 +101,19 @@ if not _libfuse_path:
reg.HKEY_LOCAL_MACHINE, r"SOFTWARE\WinFsp", r"InstallDir" # type: ignore
)
if _libfuse_path:
_libfuse_path += r"bin\winfsp-%s.dll" % (
_libfuse_path += (
r"bin\winfsp-%s.dll" # pylint: disable=consider-using-f-string
% ( # pylint: disable=consider-using-f-string
"x64" if sys.maxsize > 0xFFFFFFFF else "x86"
)
)
else:
_libfuse_path = find_library('fuse')
if not _libfuse_path:
raise EnvironmentError('Unable to find libfuse')
else:
_libfuse = ctypes.CDLL(_libfuse_path)
_libfuse = ctypes.CDLL(_libfuse_path)
if _system == 'Darwin' and hasattr(_libfuse, 'macfuse_version'):
_system = 'Darwin-MacFuse'
@ -131,7 +132,6 @@ getxattr_t: typing.Type
if _system in ('Darwin', 'Darwin-MacFuse', 'FreeBSD'):
ENOTSUP = 45
c_dev_t = ctypes.c_int32
c_fsblkcnt_t = ctypes.c_ulong
c_fsfilcnt_t = ctypes.c_ulong
@ -295,7 +295,7 @@ elif _system == 'Linux':
('st_mtimespec', c_timespec),
('st_ctimespec', c_timespec),
]
elif _machine == 'ppc64' or _machine == 'ppc64le':
elif _machine in ('ppc64', 'ppc64le'):
c_stat._fields_ = [
('st_dev', c_dev_t),
('st_ino', ctypes.c_ulong),
@ -392,7 +392,7 @@ elif _system == 'Windows' or _system.startswith('CYGWIN'):
('st_birthtimespec', c_timespec),
]
else:
raise NotImplementedError('%s is not supported.' % _system)
raise NotImplementedError(f'{_system} is not supported.')
if _system == 'FreeBSD':
@ -428,7 +428,6 @@ if _system == 'FreeBSD':
('f_frsize', ctypes.c_ulong),
]
elif _system == 'Windows' or _system.startswith('CYGWIN'):
class c_statvfs(ctypes.Structure): # type: ignore
@ -446,7 +445,6 @@ elif _system == 'Windows' or _system.startswith('CYGWIN'):
('f_namemax', c_win_ulong),
]
else:
class c_statvfs(ctypes.Structure): # type: ignore
@ -481,7 +479,6 @@ if _system == 'Windows' or _system.startswith('CYGWIN'):
('lock_owner', ctypes.c_uint64),
]
else:
class fuse_file_info(ctypes.Structure): # type: ignore
@ -713,7 +710,7 @@ class FuseOperations(ctypes.Structure):
def time_of_timespec(ts: c_timespec, use_ns=False):
return ts.tv_sec * 10 ** 9 + ts.tv_nsec # type: ignore
return ts.tv_sec * 10**9 + ts.tv_nsec # type: ignore
def set_st_attrs(st: c_stat, attrs: typing.Mapping[str, int]) -> None:
@ -723,7 +720,7 @@ def set_st_attrs(st: c_stat, attrs: typing.Mapping[str, int]) -> None:
if timespec is None:
continue
timespec.tv_sec, timespec.tv_nsec = divmod(int(val), 10 ** 9)
timespec.tv_sec, timespec.tv_nsec = divmod(int(val), 10**9)
elif hasattr(st, key):
setattr(st, key, val)
@ -749,10 +746,11 @@ def fuse_exit():
class FuseOSError(OSError):
def __init__(self, errno):
super(FuseOSError, self).__init__(errno, os.strerror(errno))
def __init__(self, errno): # pylint: disable=redefined-outer-name
super().__init__(errno, os.strerror(errno))
# pylint: disable=too-many-public-methods
class FUSE:
'''
This class is the lower level interface and should not be subclassed under
@ -776,7 +774,6 @@ class FUSE:
encoding: typing.Optional[str] = None,
**kwargs,
) -> None:
'''
Setting raw_fi to True will cause FUSE to pass the fuse_file_info
class as is to Operations, instead of just the fh field.
@ -839,8 +836,8 @@ class FUSE:
del self.operations # Invoke the destructor
if type(self.__critical_exception) is not Exception:
raise self.__critical_exception
if not isinstance(self.__critical_exception, Exception):
raise self.__critical_exception # type: ignore
if err:
raise RuntimeError(err)
@ -863,7 +860,6 @@ class FUSE:
# private_data field of struct fuse_context
return func(*args, **kwargs) or 0
else:
try:
return func(*args, **kwargs) or 0
@ -876,7 +872,6 @@ class FUSE:
e.errno,
)
return -e.errno
else:
logger.error(
"FUSE operation %s raised an OSError with negative "
"errno %s, returning errno.EINVAL.",
@ -1003,9 +998,7 @@ class FUSE:
logger.debug('Read operation on %s returned %d bytes', path, retsize)
if retsize > size:
raise RuntimeError(
"read too much data ({} bytes, expected {})".format(retsize, size)
)
raise RuntimeError(f'read too much data ({retsize} bytes, expected {size})')
ctypes.memmove(buf, ret, retsize)
return retsize
@ -1271,8 +1264,8 @@ class Operations:
def __call__(self, op: str, *args) -> typing.Any:
try:
return getattr(self, op)(*args)
except AttributeError:
raise FuseOSError(errno.EFAULT)
except AttributeError as e:
raise FuseOSError(errno.EFAULT) from e
def access(self, path: str, amode: int) -> None:
return
@ -1297,8 +1290,7 @@ class Operations:
raise FuseOSError(errno.EROFS)
def destroy(self, path: str) -> None:
'Called on filesystem destruction. Path is always /'
pass
'''Called on filesystem destruction. Path is always /'''
def flush(self, path: typing.Optional[str], fh: typing.Any) -> None:
pass
@ -1325,7 +1317,7 @@ class Operations:
if path != '/':
raise FuseOSError(errno.ENOENT)
return dict(st_mode=(S_IFDIR | 0o755), st_nlink=2)
return {'st_mode': (S_IFDIR | 0o755), 'st_nlink': 2}
def getxattr(self, path: str, name: str, position: int = 0) -> str:
raise FuseOSError(ENOTSUP)
@ -1336,7 +1328,6 @@ class Operations:
Use it instead of __init__ if you start threads on initialization.
'''
pass
def ioctl(
self, path: str, cmd: bytes, arg: bytes, fip: int, flags: int, data: bytes

View File

@ -50,6 +50,6 @@ def hash_key(key: typing.Union[str, bytes]) -> str:
Returns a hash of the given key
"""
if isinstance(key, str):
return hasher(key.encode('utf-8')).hexdigest()
key = key.encode('utf-8')
return hasher(key).hexdigest()

View File

@ -43,24 +43,23 @@ logger = logging.getLogger(__name__)
def udsLink(request: 'HttpRequest', ticket: str, scrambler: str) -> str:
if request.is_secure():
proto = 'udss'
else:
proto = 'uds'
return "{}://{}{}/{}".format(
proto, request.build_absolute_uri('/').split('//')[1], ticket, scrambler
)
return f'{proto}://{request.build_absolute_uri("/")}{ticket}/{scrambler}'
def udsAccessLink(
request: 'HttpRequest', serviceId: str, transportId: typing.Optional[str]
request: 'HttpRequest', # pylint: disable=unused-argument
serviceId: str,
transportId: typing.Optional[str],
) -> str:
'''
If transportId (uuid) is None, this will be a metaLink
'''
return 'udsa://{}/{}'.format(serviceId, transportId or 'meta')
return f'udsa://{serviceId}/{transportId or "meta"}'
def parseDate(dateToParse) -> datetime.date:
@ -97,4 +96,3 @@ def extractKey(dictionary: typing.Dict, key: typing.Any, **kwargs) -> str:
else:
value = default
return value

View File

@ -1,4 +1,4 @@
# -*- coding: utf-8 -*-
# pylint: disable=no-member
#
# Copyright (c) 2016-2021 Virtual Cable S.L.U.
@ -36,6 +36,13 @@ import tempfile
import os.path
import ldap.filter
from ldap import (
SCOPE_BASE, # type: ignore
SCOPE_SUBTREE, # type: ignore
SCOPE_ONELEVEL, # type: ignore
# SCOPE_SUBORDINATE, # type: ignore
)
from django.utils.translation import gettext as _
from uds.core.util import tools
@ -44,23 +51,16 @@ logger = logging.getLogger(__name__)
LDAPResultType = typing.MutableMapping[str, typing.Any]
from ldap import (
SCOPE_BASE, # type: ignore
SCOPE_SUBTREE, # type: ignore
SCOPE_ONELEVEL, # type: ignore
SCOPE_SUBORDINATE, # type: ignore
)
class LDAPError(Exception):
@staticmethod
def reraise(e: typing.Any):
_str = _('Connection error: ')
if hasattr(e, 'message') and isinstance(e.message, dict):
_str += '{}, {}'.format(e.message.get('info', ''), e.message.get('desc'))
_str += f'{e.message.get("info", "")}, {e.message.get("desc", "")}'
else:
_str += '{}'.format(e)
raise LDAPError(_str)
_str += str(e)
raise LDAPError(_str) from e
def escape(value: str):
@ -80,7 +80,9 @@ def connection(
timeout: int = 3,
debug: bool = False,
verify_ssl: bool = False,
certificate: typing.Optional[str] = None, # Content of the certificate, not the file itself
certificate: typing.Optional[
str
] = None, # Content of the certificate, not the file itself
) -> typing.Any:
"""
Tries to connect to ldap. If username is None, it tries to connect using user provided credentials.
@ -100,7 +102,7 @@ def connection(
schema = 'ldaps' if ssl else 'ldap'
if port == -1:
port = 636 if ssl else 389
uri = "{}://{}:{}".format(schema, host, port)
uri = f'{schema}://{host}:{port}'
logger.debug('Ldap uri: %s', uri)
l = ldap.initialize(uri=uri) # type: ignore
@ -112,11 +114,15 @@ def connection(
certificate = (certificate or '').strip()
if ssl:
if certificate and verify_ssl: # If not verify_ssl, we don't need the certificate
if (
certificate and verify_ssl
): # If not verify_ssl, we don't need the certificate
# Create a semi-temporary ca file, with the content of the certificate
# The name is from the host, so we can ovwerwrite it if needed
cert_filename = os.path.join(tempfile.gettempdir(), f'ldap-cert-{host}.pem')
with open(cert_filename, 'w') as f:
cert_filename = os.path.join(
tempfile.gettempdir(), f'ldap-cert-{host}.pem'
)
with open(cert_filename, 'w', encoding='utf8') as f:
f.write(certificate)
l.set_option(ldap.OPT_X_TLS_CACERTFILE, cert_filename) # type: ignore
@ -130,12 +136,12 @@ def connection(
l.simple_bind_s(who=username, cred=password)
except ldap.SERVER_DOWN as e: # type: ignore
raise LDAPError(_('Can\'t contact LDAP server') + ': {}'.format(e))
raise LDAPError(_('Can\'t contact LDAP server') + f': {e}') from e
except ldap.LDAPError as e: # type: ignore
LDAPError.reraise(e)
except Exception as e:
logger.exception('Exception connection:')
raise LDAPError('{}'.format(e))
raise LDAPError(str(e)) from e
logger.debug('Connection was successful')
return l
@ -145,8 +151,8 @@ def getAsDict(
con: typing.Any,
base: str,
ldapFilter: str,
attrList: typing.Optional[typing.Iterable[str]]=None,
sizeLimit: int=100,
attrList: typing.Optional[typing.Iterable[str]] = None,
sizeLimit: int = 100,
scope=SCOPE_SUBTREE,
) -> typing.Generator[LDAPResultType, None, None]:
"""
@ -156,7 +162,7 @@ def getAsDict(
logger.debug('Filter: %s, attr list: %s', ldapFilter, attrList)
if attrList:
attrList = [i for i in attrList] # Ensures iterable is a list
attrList = list(attrList) # Ensures iterable is a list
res = None
try:
@ -172,7 +178,7 @@ def getAsDict(
LDAPError.reraise(e)
except Exception as e:
logger.exception('Exception connection:')
raise LDAPError('{}'.format(e))
raise LDAPError(str(e)) from e
logger.debug('Result of search %s on %s: %s', ldapFilter, base, res)
@ -214,9 +220,9 @@ def getFirst(
"""
value = ldap.filter.escape_filter_chars(value)
attrList = [field] + [i for i in attributes] if attributes else []
attrList = [field] + list(attributes) if attributes else []
ldapFilter = '(&(objectClass={})({}={}))'.format(objectClass, field, value)
ldapFilter = f'(&(objectClass={objectClass})({field}={value}))'
try:
obj = next(getAsDict(con, base, ldapFilter, attrList, sizeLimit))
@ -246,9 +252,11 @@ def getRootDSE(con: typing.Any) -> typing.Optional[LDAPResultType]:
@param cont: Connection to LDAP server
@return: None if root DSE is not found, an dictionary of LDAP entry attributes if found (all in unicode on py2, str on py3).
"""
return next(getAsDict(
return next(
getAsDict(
con=con,
base='',
ldapFilter='(objectClass=*)',
scope=SCOPE_BASE,
))
)
)

View File

@ -37,6 +37,7 @@ logger = logging.getLogger(__name__)
T = typing.TypeVar('T', bound=typing.Any)
# We want to write something like this:
# (('<arg>', '<arg2>', 'literal', '<other_arg>', '<other_arg2>', 'literal2', ...), callback)
# Where callback is a function that will be called with the arguments in the order they are
@ -71,19 +72,26 @@ def match(
continue
# Check if all the arguments match
match = True
doMatch = True
for i, arg in enumerate(arg_list):
if matcher[0][i].startswith('<') and matcher[0][i].endswith('>'):
continue
if arg != matcher[0][i]:
match = False
doMatch = False
break
if match:
if doMatch:
# All the arguments match, call the callback
return matcher[1](*[arg for i, arg in enumerate(arg_list) if matcher[0][i].startswith('<') and matcher[0][i].endswith('>')])
return matcher[1](
*[
arg
for i, arg in enumerate(arg_list)
if matcher[0][i].startswith('<') and matcher[0][i].endswith('>')
]
)
logger.warning('No match found for %s with %s', arg_list, args)
# Invoke error callback
error()
return None # In fact, error is expected to raise an exception, so this is never reached

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2015-2019 Virtual Cable S.L.
# Copyright (c) 2015-2019 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -12,7 +12,7 @@
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
@ -31,7 +31,7 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from .common import (
from .common import ( # pylint: disable=unused-import
ACTIVE,
INACTIVE,
)

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2015-2019 Virtual Cable S.L.
# Copyright (c) 2015-2023 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -12,7 +12,7 @@
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
@ -28,7 +28,7 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pylint: disable=unused-import
from .common import ERROR, FINISHED, RUNNING # @UnusedImport
from .common import ERROR, FINISHED, RUNNING

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2015-2019 Virtual Cable S.L.
# Copyright (c) 2015-2023 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -12,7 +12,7 @@
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
@ -40,4 +40,4 @@ from .common import (
CANCELED,
LAUNCHING,
PREPARING,
) # @UnusedImport
)

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2015-2019 Virtual Cable S.L.
# Copyright (c) 2015-2023 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -12,7 +12,7 @@
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
@ -32,4 +32,4 @@
"""
# pylint: disable=unused-import
from .common import ACTIVE, REMOVABLE, REMOVING, REMOVED # @UnusedImport
from .common import ACTIVE, REMOVABLE, REMOVING, REMOVED

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2015-2021 Virtual Cable S.L.U.
# Copyright (c) 2015-2023 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -32,4 +32,4 @@
"""
# pylint: disable=unused-import
from .common import FOR_EXECUTE # @UnusedImport
from .common import FOR_EXECUTE

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2015-2021 Virtual Cable S.L.U.
# Copyright (c) 2015-2023 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -41,4 +41,4 @@ from .common import (
REMOVED,
INFO_STATES,
VALID_STATES,
) # @UnusedImport
)

View File

@ -67,6 +67,7 @@ CounterClass = typing.TypeVar(
OT_PROVIDER, OT_SERVICE, OT_SERVICEPOOL, OT_AUTHENTICATOR = range(4)
# Helpers
def _get_Id(obj):
return obj.id if obj.id != -1 else None

View File

@ -63,7 +63,7 @@ if typing.TYPE_CHECKING:
# Os Manager
ET_OSMANAGER_INIT,
ET_OSMANAGER_READY,
ET_OSMANAGER_RELEASE
ET_OSMANAGER_RELEASE,
) = range(11)
# Events names
@ -81,20 +81,14 @@ EVENT_NAMES: typing.Mapping[int, str] = {
ET_OSMANAGER_RELEASE: 'OS Manager release',
}
(
OT_PROVIDER,
OT_SERVICE,
OT_SERVICEPOOL,
OT_AUTHENTICATOR,
OT_OSMANAGER
) = range(5)
(OT_PROVIDER, OT_SERVICE, OT_SERVICEPOOL, OT_AUTHENTICATOR, OT_OSMANAGER) = range(5)
TYPES_NAMES: typing.Mapping[int, str] = {
OT_PROVIDER: 'Provider',
OT_SERVICE: 'Service',
OT_SERVICEPOOL: 'Deployed',
OT_AUTHENTICATOR: 'Authenticator',
OT_OSMANAGER: 'OS Manager'
OT_OSMANAGER: 'OS Manager',
}
MODEL_TO_EVENT: typing.Mapping[typing.Type['models.Model'], int] = {
@ -102,7 +96,7 @@ MODEL_TO_EVENT: typing.Mapping[typing.Type['models.Model'], int] = {
Service: OT_SERVICE,
Provider: OT_PROVIDER,
Authenticator: OT_AUTHENTICATOR,
OSManager: OT_OSMANAGER
OSManager: OT_OSMANAGER,
}
# Events data (fld1, fld2, fld3, fld4):
@ -154,22 +148,23 @@ MODEL_TO_EVENT: typing.Mapping[typing.Type['models.Model'], int] = {
# OT_OSMANAGER_RELEASE: -> On OsManager
# (servicepool_uuid, '', userservice_uuid)
# Helpers
# get owner by type and id
def getOwner(ownerType: int, ownerId: int) -> typing.Optional['models.Model']:
if ownerType == OT_PROVIDER:
return Provider.objects.get(pk=ownerId)
elif ownerType == OT_SERVICE:
if ownerType == OT_SERVICE:
return Service.objects.get(pk=ownerId)
elif ownerType == OT_SERVICEPOOL:
if ownerType == OT_SERVICEPOOL:
return ServicePool.objects.get(pk=ownerId)
elif ownerType == OT_AUTHENTICATOR:
if ownerType == OT_AUTHENTICATOR:
return Authenticator.objects.get(pk=ownerId)
elif ownerType == OT_OSMANAGER:
if ownerType == OT_OSMANAGER:
return OSManager.objects.get(pk=ownerId)
else:
return None
class EventTupleType(typing.NamedTuple):
stamp: datetime.datetime
fld1: str
@ -188,13 +183,14 @@ class EventTupleType(typing.NamedTuple):
def __str__(self) -> str:
# Convert Event type to string first
eventName = EVENT_NAMES[self.event_type]
return '{} {} {} {} {} {}'.format(self.stamp, eventName, self.fld1, self.fld2, self.fld3, self.fld4 )
return (
f'{self.stamp} {eventName} {self.fld1} {self.fld2} {self.fld3} {self.fld4}'
)
EventClass = typing.Union[Provider, Service, ServicePool, Authenticator]
def addEvent(obj: EventClass, eventType: int, **kwargs) -> bool:
"""
Adds a event stat to specified object
@ -237,7 +233,11 @@ def getEvents(
owner_id = obj.pk
for i in StatsManager.manager().getEvents(
MODEL_TO_EVENT[objType], eventType, owner_id=owner_id, since=kwargs.get('since'), to=kwargs.get('to')
MODEL_TO_EVENT[objType],
eventType,
owner_id=owner_id,
since=kwargs.get('since'),
to=kwargs.get('to'),
):
yield EventTupleType(
datetime.datetime.fromtimestamp(i.stamp),
@ -248,7 +248,8 @@ def getEvents(
i.event_type,
)
# tail the events table
# tail the events table
def tailEvents(sleepTime: int = 2) -> typing.Generator[EventTupleType, None, None]:
fromId = None
while True:
@ -263,4 +264,3 @@ def tailEvents(sleepTime: int = 2) -> typing.Generator[EventTupleType, None, Non
)
fromId = i.pk if i.pk > (fromId or 0) else fromId
time.sleep(sleepTime)