1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-03 01:17:56 +03:00

Very big refactoring to adapt to strict checking mode

This commit is contained in:
Adolfo Gómez García 2024-02-23 18:01:02 +01:00
parent 975dd80c5d
commit 4ad32931ee
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
40 changed files with 200 additions and 207 deletions

View File

@ -147,7 +147,7 @@ class Environment:
if id_generator_types is None:
id_generator_types = {}
name = 't-' + table_name + '-' + record_id
id_generators = {}
id_generators: dict[str, typing.Any] = {}
for k, v in id_generator_types.items():
id_generators[k] = v(name)
return Environment(name, id_generators)

View File

@ -46,28 +46,28 @@ if typing.TYPE_CHECKING:
def task_manager() -> 'TaskManager':
from .task import TaskManager # pylint: disable=import-outside-toplevel
from .task import TaskManager
return TaskManager()
def downloads_manager() -> 'DownloadsManager':
from .downloads import DownloadsManager # pylint: disable=import-outside-toplevel
from .downloads import DownloadsManager
return DownloadsManager()
def log_manager() -> 'LogManager':
from .log import LogManager # pylint: disable=import-outside-toplevel
from .log import LogManager
return LogManager()
def publication_manager() -> 'PublicationManager':
from .publication import PublicationManager # pylint: disable=import-outside-toplevel
from .publication import PublicationManager
return PublicationManager()
def notifications_manager() -> 'NotificationsManager':
from .notifications import NotificationsManager # pylint: disable=import-outside-toplevel
from .notifications import NotificationsManager
return NotificationsManager()

View File

@ -65,7 +65,9 @@ if typing.TYPE_CHECKING:
from cryptography.hazmat.primitives.asymmetric.dh import DHPrivateKey
# Note the REAL BIG importance of the SECRET_KEY. if lost, all encripted stored data (almost all fields) will be lost...
UDSK: typing.Final[bytes] = settings.SECRET_KEY[8:24].encode() # UDS key, new, for AES256, so it's 16 bytes length
UDSK: typing.Final[bytes] = settings.SECRET_KEY[
8:24
].encode() # UDS key, new, for AES256, so it's 16 bytes length
class CryptoManager(metaclass=singleton.Singleton):
@ -84,15 +86,27 @@ class CryptoManager(metaclass=singleton.Singleton):
@staticmethod
def aes_key(key: typing.Union[str, bytes], length: int) -> bytes:
if isinstance(key, str):
bkey = key.encode('utf8')
else:
bkey = key
"""
Generate an AES key of the specified length using the provided key.
while len(key) < length:
key += key # type: ignore # Pylance complains about types??
This method is used to generate an AES key of the specified length using the provided key.
kl: list[int] = list(key) # type: ignore # Pylance complains about types??
Args:
key (Union[str, bytes]): The key used to generate the AES key. It can be either a string or bytes.
length (int): The desired length of the AES key.
Returns:
bytes: The generated AES key.
Raises:
ValueError: If the length is not a positive integer.
"""
bkey: bytes = key.encode() if isinstance(key, str) else key
while len(bkey) < length:
bkey += bkey
kl: list[int] = list(bkey)
pos = 0
while len(kl) > length:
kl[pos] ^= kl[length]
@ -184,7 +198,7 @@ class CryptoManager(metaclass=singleton.Singleton):
mult = len(value) // len(key) + 1
value_array = array.array('B', value)
# Ensure key array is at least as long as value_array
key_array = array.array('B', key * mult) # type: ignore # Pylance complains about types??
key_array = array.array('B', key * mult)
# We must return binary in xor, because result is in fact binary
return array.array('B', (value_array[i] ^ key_array[i] for i in range(len(value_array)))).tobytes()
@ -272,7 +286,7 @@ class CryptoManager(metaclass=singleton.Singleton):
try:
ph.verify(hashValue[8:], value)
return True
except Exception as e:
except Exception:
return False # Verify will raise an exception if not valid
# Old sha1

View File

@ -37,7 +37,7 @@ import typing
import collections.abc
from wsgiref.util import FileWrapper
from django.http import HttpResponse, Http404
from django.http import HttpResponse, Http404, HttpRequest
from uds.core.managers.crypto import CryptoManager
from uds.core.util import singleton
@ -59,7 +59,8 @@ class DownloadsManager(metaclass=singleton.Singleton):
_downloadables: dict[str, dict[str, str]] = {}
def __init__(self):
def __init__(self) -> None:
super().__init__()
self._downloadables = {}
@staticmethod
@ -67,7 +68,7 @@ class DownloadsManager(metaclass=singleton.Singleton):
# Singleton pattern will return always the same instance
return DownloadsManager()
def register(self, name: str, comment: str, path: str, mime: str = 'application/octet-stream'):
def register(self, name: str, comment: str, path: str, mime: str = 'application/octet-stream') -> None:
"""
Registers a downloadable file.
@param name: name shown
@ -85,7 +86,7 @@ class DownloadsManager(metaclass=singleton.Singleton):
def downloadables(self) -> dict[str, dict[str, str]]:
return self._downloadables
def send(self, request, _id) -> HttpResponse:
def send(self, request: 'HttpRequest', _id: str) -> HttpResponse:
if _id not in self._downloadables:
logger.error('Downloadable id %s not found in %s!!!', _id, self._downloadables)
raise Http404
@ -96,7 +97,7 @@ class DownloadsManager(metaclass=singleton.Singleton):
self._downloadables[_id]['mime'],
)
def _send_file(self, _, name, filename, mime) -> HttpResponse:
def _send_file(self, request: 'HttpRequest', name: str, filename: str, mime: str) -> HttpResponse:
"""
Send a file through Django without loading the whole file into
memory at once. The FileWrapper will turn the file object into an

View File

@ -30,6 +30,7 @@
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnknownMemberType=false
import logging
import io
@ -38,7 +39,7 @@ import collections.abc
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
from matplotlib.figure import Figure
from matplotlib import cm
from matplotlib import colormaps
# This must be imported to allow 3d projections
from mpl_toolkits.mplot3d.axes3d import Axes3D # pylint: disable=unused-import
@ -81,10 +82,10 @@ def bar_chart(
ys = data['y']
width = 0.60
fig: Figure = Figure(figsize=(size[0], size[1]), dpi=size[2]) # type: ignore
fig: Figure = Figure(figsize=(size[0], size[1]), dpi=size[2])
FigureCanvas(fig) # Stores canvas on fig.canvas
axis = fig.add_subplot(1, 1, 1) # type: ignore
axis = fig.add_subplot(1, 1, 1)
axis.grid(color='r', linestyle='dotted', linewidth=0.1, alpha=0.5)
bottom = np.zeros(len(ys[0]['data']))
@ -138,10 +139,10 @@ def line_chart(
x = data['x']
y = data['y']
fig: Figure = Figure(figsize=(size[0], size[1]), dpi=size[2]) # type: ignore
fig: Figure = Figure(figsize=(size[0], size[1]), dpi=size[2])
FigureCanvas(fig) # Stores canvas on fig.canvas
axis = fig.add_subplot(111) # type: ignore
axis = fig.add_subplot(111)
axis.grid(color='r', linestyle='dotted', linewidth=0.1, alpha=0.5)
for i in y:
@ -209,25 +210,18 @@ def surface_chart(
logger.debug('Y\': %s', y)
logger.debug('Z\': %s', z)
fig: Figure = Figure(figsize=(size[0], size[1]), dpi=size[2]) # type: ignore
fig: Figure = Figure(figsize=(size[0], size[1]), dpi=size[2])
FigureCanvas(fig) # Stores canvas on fig.canvas
axis: typing.Any = fig.add_subplot(1, 1, 1, projection='3d') # type: ignore
axis: typing.Any = fig.add_subplot(1, 1, 1, projection='3d')
# axis.grid(color='r', linestyle='dotted', linewidth=0.1, alpha=0.5)
cmap = colormaps['coolwarm']
if data.get('wireframe', False):
axis.plot_wireframe(
x,
y,
z,
rstride=1,
cstride=1,
cmap=cm.coolwarm, # type: ignore # it's there, but maybe it's created dynamically
)
axis.plot_wireframe(x, y, z, rstride=1, cstride=1, cmap=cmap)
else:
axis.plot_surface(
x, y, z, rstride=1, cstride=1, cmap=cm.coolwarm # type: ignore # pylint: disable=no-member
)
axis.plot_surface(x, y, z, rstride=1, cstride=1, cmap=cmap)
axis.set_title(data.get('title', ''))
axis.set_xlabel(data['xlabel'])

View File

@ -44,9 +44,9 @@ class ExtendedHttpRequest(HttpRequest):
ip_version: int
ip_proxy: str
os: 'types.os.DetectedOsInfo'
user: typing.Optional['User'] # type: ignore # Override base user to be optional
user: typing.Optional['User'] # pyright: ignore[reportIncompatibleVariableOverride]
authorized: bool
class ExtendedHttpRequestWithUser(ExtendedHttpRequest):
user: 'User' # type: ignore # Has the user always
user: 'User' # pyright: ignore[reportIncompatibleVariableOverride]

View File

@ -1285,7 +1285,7 @@ class gui:
if not isinstance(value, collections.abc.Iterable):
value = [gui.as_str(value)]
else: # Is an iterable
value = [gui.as_str(i) for i in typing.cast(collections.abc.Iterable[typing.Any], value)]
value = [gui.as_str(i) for i in value] # pyright: ignore[reportUnknownVariableType]
super()._set_value(value)
def as_list(self) -> list[str]:
@ -1368,7 +1368,7 @@ class gui:
if not isinstance(value, collections.abc.Iterable):
value = [gui.as_str(value)]
else:
value = [gui.as_str(i) for i in typing.cast(collections.abc.Iterable[typing.Any], value)]
value = [gui.as_str(i) for i in value] # pyright: ignore[reportUnknownVariableType]
super()._set_value(value)
def as_list(self) -> list[str]:

View File

@ -250,7 +250,7 @@ class _SerializableField(typing.Generic[T]):
"""
if typing.cast(typing.Type[typing.Any], self.obj_type) in (str, int, float):
tp: typing.Type[T] = self.obj_type
self.__set__(instance, tp(data.decode()))
self.__set__(instance, tp(data.decode())) # type: ignore # mypy complains about calling tp(...)
return
raise TypeError(f"Field {self.name} cannot be unmarshalled (type {self.obj_type})")

View File

@ -52,19 +52,21 @@ class NetworkType(typing.NamedTuple):
logger = logging.getLogger(__name__)
# Test patters for networks IPv4
RECIDRIPV4: typing.Final[re.Pattern] = re.compile(
RECIDRIPV4: typing.Final[re.Pattern[str]] = re.compile(
r'^([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})/([0-9]{1,2})$'
)
REMASKIPV4: typing.Final[re.Pattern] = re.compile(
REMASKIPV4: typing.Final[re.Pattern[str]] = re.compile(
r'^([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})netmask([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})$'
)
RE1ASTERISKIPV4: typing.Final[re.Pattern] = re.compile(r'^([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})\.\*$')
RE2ASTERISKIPV4: typing.Final[re.Pattern] = re.compile(r'^([0-9]{1,3})\.([0-9]{1,3})\.\*\.?\*?$')
RE3ASTERISKIPV4: typing.Final[re.Pattern] = re.compile(r'^([0-9]{1,3})\.\*\.?\*?\.?\*?$')
RERANGEIPV4: typing.Final[re.Pattern] = re.compile(
RE1ASTERISKIPV4: typing.Final[re.Pattern[str]] = re.compile(r'^([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})\.\*$')
RE2ASTERISKIPV4: typing.Final[re.Pattern[str]] = re.compile(r'^([0-9]{1,3})\.([0-9]{1,3})\.\*\.?\*?$')
RE3ASTERISKIPV4: typing.Final[re.Pattern[str]] = re.compile(r'^([0-9]{1,3})\.\*\.?\*?\.?\*?$')
RERANGEIPV4: typing.Final[re.Pattern[str]] = re.compile(
r'^([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})-([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})$'
)
RESINGLEIPV4: typing.Final[re.Pattern] = re.compile(r'^([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})$')
RESINGLEIPV4: typing.Final[re.Pattern[str]] = re.compile(
r'^([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})\.([0-9]{1,3})$'
)
def ip_to_long(ip: str) -> IpType:
@ -110,12 +112,12 @@ def network_from_str_ipv4(nets_string: str) -> NetworkType:
input_string = nets_string
logger.debug('Getting network from %s', nets_string)
def check(*args) -> None:
def check(*args: str) -> None:
for n in args:
if int(n) < 0 or int(n) > 255:
raise Exception()
def to_num(*args) -> int:
def to_num(*args: str) -> int:
start = 256 * 256 * 256
val = 0
for n in args:
@ -230,11 +232,7 @@ def networks_from_str(
If allowMultipleNetworks is True, it allows ',' and ';' separators (and, ofc, more than 1 network)
Returns a list of networks tuples in the form [(start1, end1), (start2, end2) ...]
"""
res = []
for strNet in re.split('[;,]', nets):
if strNet:
res.append(network_from_str(strNet, version))
return res
return [network_from_str(str_net, version) for str_net in re.split('[;,]', nets) if str_net]
def contains(
@ -277,20 +275,16 @@ def is_valid_fqdn(value: str) -> bool:
)
def is_valid_host(value: str):
def is_valid_host(value: str) -> bool:
return is_valid_ip(value) or is_valid_fqdn(value)
def test_connectivity(host: str, port: int, timeout: float = 4) -> bool:
try:
logger.debug(
'Checking connection to %s:%s with %s seconds timeout', host, port, timeout
)
logger.debug('Checking connection to %s:%s with %s seconds timeout', host, port, timeout)
sock = socket.create_connection((host, port), timeout)
sock.close()
except Exception as e:
logger.debug(
'Exception checking %s:%s with %s timeout: %s', host, port, timeout, e
)
logger.debug('Exception checking %s:%s with %s timeout: %s', host, port, timeout, e)
return False
return True

View File

@ -39,7 +39,7 @@ class SessionSerializer:
"""
Serializer for django sessions.
"""
def dumps(self, data) -> bytes:
def dumps(self, data: typing.Any) -> bytes:
"""
Serialize data for storage in a session.
"""

View File

@ -13,12 +13,15 @@ class Singleton(type):
_instance: typing.Optional[typing.Any]
# We use __init__ so we customise the created class from this metaclass
def __init__(cls, *args, **kwargs) -> None:
# Ensure "_instance" is not inherited
def __init__(cls: 'Singleton', *args: typing.Any, **kwargs: typing.Any) -> None:
"""
Initialize the Singleton metaclass for each class that uses it
"""
cls._instance = None
super().__init__(*args, **kwargs)
def __call__(cls, *args, **kwargs) -> typing.Any:
def __call__(cls: 'Singleton', *args: typing.Any, **kwargs: typing.Any) -> typing.Any:
if cls._instance is None:
cls._instance = super().__call__(*args, **kwargs)
return cls._instance

View File

@ -38,11 +38,11 @@ class StateQueue:
_queue: list[typing.Any]
_current: typing.Optional[typing.Any]
def __init__(self):
def __init__(self) -> None:
self._queue = []
self._current = None
def __str__(self):
def __str__(self) -> str:
return f'<StateQueue Current: {self._current}, Queue: ({",".join(state for state in self._queue)})>'
def clear(self) -> None:
@ -80,7 +80,7 @@ class StateQueue:
return self._queue.pop(0)
return None
def remove(self, state: typing.Any):
def remove(self, state: typing.Any) -> None:
try:
self._queue.remove(state)
except Exception: # nosec: Fine to ignore exception here

View File

@ -26,21 +26,25 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import collections.abc
import logging
import typing
from django.http import HttpRequest
logger = logging.getLogger(__name__)
class RequestDebug:
class RequestDebugMiddleware:
"""
Used for logging some request data on develeopment
"""
def __init__(self, get_response):
def __init__(self, get_response: collections.abc.Callable[[typing.Any], typing.Any]):
self.get_response = get_response
def __call__(self, request):
logger.info('Request lang: %s', request.LANGUAGE_CODE)
def __call__(self, request: HttpRequest) -> typing.Any:
logger.info('Request: %s', request)
response = self.get_response(request)

View File

@ -30,7 +30,9 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import argparse
import logging
import typing
from django.core.management.base import BaseCommand
from uds.core.util.config import Config, GlobalConfig
@ -42,14 +44,12 @@ class Command(BaseCommand):
args = "<mod.name=value mod.name=value mod.name=value...>"
help = "Updates configuration values. If mod is omitted, UDS will be used. Omit whitespaces betwen name, =, and value (they must be a single param)"
def add_arguments(self, parser) -> None:
def add_arguments(self, parser: argparse.ArgumentParser) -> None:
parser.add_argument('name_value', nargs='+', type=str)
# If set as "password field"
parser.add_argument('--password', action='store_true', default=False, help='Set as password field')
def handle(self, *args, **options) -> None:
def handle(self, *args: typing.Any, **options: typing.Any) -> None:
logger.debug("Handling settings")
GlobalConfig.initialize()
try:

View File

@ -30,6 +30,7 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import argparse
import logging
import typing
import collections.abc
@ -45,7 +46,7 @@ logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = "Show current PUBLIC configuration of UDS broker (passwords are not shown)"
def add_arguments(self, parser) -> None:
def add_arguments(self, parser: argparse.ArgumentParser) -> None:
parser.add_argument(
'--csv',
action='store_true',
@ -61,7 +62,7 @@ class Command(BaseCommand):
help='Shows configuration in YAML format',
)
def handle(self, *args, **options) -> None:
def handle(self, *args: typing.Any, **options: typing.Any) -> None:
logger.debug("Show settings")
config.GlobalConfig.initialize()
try:

View File

@ -54,7 +54,7 @@ from uds.core.util import modfinder
logger = logging.getLogger(__name__)
def __loadModules():
def __loadModules() -> None:
"""
This imports all packages that are descendant of this package, and, after that,
it register all subclases of mfas.MFA

View File

@ -88,12 +88,12 @@ class Account(UUIDModel, TaggingMixin):
return None
tmp = userService.accounting
tmp.user_service = None # type: ignore
tmp.user_service = None
tmp.end = sql_datetime()
tmp.save()
return tmp
class Meta: # pylint: disable=too-few-public-methods
class Meta: # pyright: ignore
"""
Meta class to declare the name of the table at database
"""
@ -101,5 +101,5 @@ class Account(UUIDModel, TaggingMixin):
db_table = 'uds_accounts'
app_label = 'uds'
def __str__(self):
def __str__(self) -> str:
return f'Account id {self.id}, name {self.name}'

View File

@ -75,7 +75,7 @@ class Cache(models.Model):
if now > v.created + timedelta(seconds=v.validity):
v.delete()
def __str__(self):
def __str__(self) -> str:
if sql_datetime() > (self.created + timedelta(seconds=self.validity)):
expired = "Expired"
else:

View File

@ -57,7 +57,7 @@ class Calendar(UUIDModel, TaggingMixin):
calendaraction_set: 'models.manager.RelatedManager[CalendarAction]'
calendaraccess_set: 'models.manager.RelatedManager[CalendarAccess]'
class Meta: # pylint: disable=too-few-public-methods
class Meta: # pyright: ignore
"""
Meta class to declare db table
"""
@ -66,10 +66,10 @@ class Calendar(UUIDModel, TaggingMixin):
app_label = 'uds'
# Override default save to add uuid
def save(self, *args, **kwargs):
def save(self, *args: typing.Any, **kwargs: typing.Any) -> None:
logger.debug('Saving calendar')
res = UUIDModel.save(self, *args, **kwargs)
UUIDModel.save(self, *args, **kwargs)
# Basically, recalculates all related actions next execution time...
try:
@ -80,7 +80,5 @@ class Calendar(UUIDModel, TaggingMixin):
): # nosec: catch all, we don't want to fail here (if one action cannot be saved, we don't want to fail all)
pass
return res
def __str__(self):
def __str__(self) -> str:
return f'Calendar "{self.name}" modified on {self.modified}, rules: {self.rules.count()}'

View File

@ -80,7 +80,7 @@ class Image(UUIDModel):
metaPools: 'models.manager.RelatedManager[MetaPool]'
servicesPoolsGroup: 'models.manager.RelatedManager[ServicePoolGroup]'
class Meta: # pylint: disable=too-few-public-methods
class Meta: # pyright: ignore
"""
Meta class to declare the name of the table at database
"""
@ -114,7 +114,7 @@ class Image(UUIDModel):
"""
Returns the value of the image (data) as a base 64 encoded string
"""
return base64.b64encode(typing.cast(bytes, self.data)).decode()
return base64.b64encode(self.data).decode()
@property
def thumb64(self) -> str:
@ -157,12 +157,10 @@ class Image(UUIDModel):
data = value
elif isinstance(value, str):
data = base64.b64decode(value)
elif isinstance(value, PIL.Image.Image):
else:
with io.BytesIO() as output:
value.save(output, format='PNG')
data = output.getvalue()
else:
raise ValueError('Invalid image type')
self.width, self.height, self.data = Image.prepare_for_db(data)
@ -197,9 +195,9 @@ class Image(UUIDModel):
def thumbnail_as_response(self) -> HttpResponse:
return HttpResponse(self.thumb, content_type='image/png')
def save(self, *args, **kwargs):
def save(self, *args: typing.Any, **kwargs: typing.Any) -> None:
self.stamp = sql_datetime()
return super().save(*args, **kwargs)
def __str__(self):
def __str__(self) -> str:
return f'Image Id: {self.id}, Name: {self.name}, Size: {self.size}, Length: {len(self.data)} bytes, Thumb length: {len(self.thumb)} bytes'

View File

@ -69,9 +69,9 @@ class ManagedObjectModel(UUIDModel):
"""
Returns an environment valid for the record this object represents
"""
return Environment.environment_for_table_record(self._meta.verbose_name, self.id) # type: ignore # pylint: disable=no-member
return Environment.environment_for_table_record(self._meta.verbose_name or self._meta.db_table, self.id)
def deserialize(self, obj: Module, values: typing.Optional[collections.abc.Mapping[str, str]]):
def deserialize(self, obj: Module, values: typing.Optional[collections.abc.Mapping[str, str]]) -> None:
"""
Conditionally deserializes obj if not initialized via user interface and data holds something
"""

View File

@ -60,7 +60,7 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
class MetaPool(UUIDModel, TaggingMixin): # type: ignore
class MetaPool(UUIDModel, TaggingMixin):
"""
A meta pool is a pool that has pool members
"""
@ -83,7 +83,9 @@ class MetaPool(UUIDModel, TaggingMixin): # type: ignore
related_name='metaPools',
on_delete=models.SET_NULL,
)
assignedGroups = models.ManyToManyField(Group, related_name='metaPools', db_table='uds__meta_grps')
assignedGroups: 'models.ManyToManyField[Group, MetaPool]' = models.ManyToManyField(
Group, related_name='metaPools', db_table='uds__meta_grps'
)
# Message if access denied
calendar_message = models.CharField(default='', max_length=256)
@ -102,7 +104,7 @@ class MetaPool(UUIDModel, TaggingMixin): # type: ignore
calendarAccess: 'models.manager.RelatedManager[CalendarAccessMeta]'
members: 'models.manager.RelatedManager["MetaPoolMember"]'
class Meta(UUIDModel.Meta): # pylint: disable=too-few-public-methods
class Meta(UUIDModel.Meta): # pyright: ignore
"""
Meta class to declare the name of the table at database
"""
@ -156,7 +158,7 @@ class MetaPool(UUIDModel, TaggingMixin): # type: ignore
return access == types.states.State.ALLOW
def usage(self, cachedValue=-1) -> types.pools.UsageInfo:
def usage(self, cached_value: int = -1) -> types.pools.UsageInfo:
"""
Returns the % used services, then count and the max related to "maximum" user services
If no "maximum" number of services, will return 0% ofc
@ -192,7 +194,7 @@ class MetaPool(UUIDModel, TaggingMixin): # type: ignore
usage_count = 0
max_count = 0
for pool in query:
poolInfo = pool.usage(pool.usage_count) # type:ignore # Anotated field
poolInfo = pool.usage(typing.cast(typing.Any, pool).usage_count) # usage_count is anottated value, integer
usage_count += poolInfo.used
# If any of the pools has no max, then max is -1
if max_count == consts.UNLIMITED or poolInfo.total == consts.UNLIMITED:
@ -207,7 +209,7 @@ class MetaPool(UUIDModel, TaggingMixin): # type: ignore
@property
def visual_name(self) -> str:
logger.debug('SHORT: %s %s %s', self.short_name, self.short_name is not None, self.name)
logger.debug('SHORT: %s %s %s', self.short_name, bool(self.short_name), self.name)
sn = str(self.short_name).strip()
return sn if sn else self.name
@ -261,7 +263,7 @@ class MetaPool(UUIDModel, TaggingMixin): # type: ignore
return meta
@staticmethod
def pre_delete(sender, **kwargs) -> None:
def pre_delete(sender: typing.Any, **kwargs: typing.Any) -> None:
"""
Used to invoke the Service class "Destroy" before deleting it from database.
@ -272,15 +274,15 @@ class MetaPool(UUIDModel, TaggingMixin): # type: ignore
"""
from uds.core.util.permissions import clean # pylint: disable=import-outside-toplevel
toDelete: 'MetaPool' = kwargs['instance']
to_delete: 'MetaPool' = kwargs['instance']
# Clears related logs
log.clear_logs(toDelete)
log.clear_logs(to_delete)
# Clears related permissions
clean(toDelete)
clean(to_delete)
def __str__(self):
def __str__(self) -> str:
return f'Meta pool: {self.name}, no. pools: {self.members.all().count()}, visible: {self.visible}, policy: {self.policy}'

View File

@ -86,7 +86,7 @@ class MFA(ManagedObjectModel, TaggingMixin): # type: ignore
return f'MFA {self.name} of type {self.data_type} (id:{self.id})'
@staticmethod
def pre_delete(sender, **kwargs) -> None: # pylint: disable=unused-argument
def pre_delete(sender: typing.Any, **kwargs: typing.Any) -> None: # pylint: disable=unused-argument
"""
Used to invoke the Service class "Destroy" before deleting it from database.

View File

@ -49,7 +49,7 @@ if typing.TYPE_CHECKING:
from .authenticator import Authenticator
class Network(UUIDModel, TaggingMixin): # type: ignore
class Network(UUIDModel, TaggingMixin):
"""
This model is used for keeping information of networks associated with transports (right now, just transports..)
"""

View File

@ -113,9 +113,9 @@ class Permissions(UUIDModel):
q = Q(group=group)
try:
existing: Permissions = Permissions.objects.filter(q, object_type=object_type.type, object_id=object_id)[
0 # type: ignore # Slicing is not supported by pylance right now
]
existing: Permissions = Permissions.objects.filter(
q, object_type=object_type.type, object_id=object_id
)[0]
existing.permission = permission
existing.save()
return existing
@ -158,9 +158,7 @@ class Permissions(UUIDModel):
Q(object_type=object_type.type),
Q(object_id=None) | Q(object_id=object_id),
q,
).order_by('-permission')[
0 # type: ignore # Slicing is not supported by pylance right now
]
).order_by('-permission')[0]
logger.debug('Got permission %s', perm)
return PermissionType(perm.permission)
except Exception: # DoesNotExists

View File

@ -49,7 +49,7 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
class Provider(ManagedObjectModel, TaggingMixin): # type: ignore
class Provider(ManagedObjectModel, TaggingMixin):
"""
A Provider represents the Service provider itself, (i.e. a KVM Server or a Terminal Server)
"""
@ -60,7 +60,7 @@ class Provider(ManagedObjectModel, TaggingMixin): # type: ignore
# objects: 'models.manager.Manager[Provider]'
services: 'models.manager.RelatedManager[Service]'
class Meta(ManagedObjectModel.Meta): # pylint: disable=too-few-public-methods
class Meta(ManagedObjectModel.Meta): # pyright: ignore
"""
Meta class to declare default order
"""
@ -103,7 +103,7 @@ class Provider(ManagedObjectModel, TaggingMixin): # type: ignore
return f'Provider {self.name} of type {self.data_type} (id:{self.id})'
@staticmethod
def pre_delete(sender, **kwargs) -> None: # pylint: disable=unused-argument
def pre_delete(sender: typing.Any, **kwargs: typing.Any) -> None: # pylint: disable=unused-argument
"""
Used to invoke the Provider class "Destroy" before deleting it from database.

View File

@ -87,7 +87,7 @@ class Scheduler(models.Model):
"""
Returns an environment valid for the record this object represents
"""
return Environment.environment_for_table_record(self._meta.verbose_name, self.id) # type: ignore # pylint: disable=no-member
return Environment.environment_for_table_record(self._meta.verbose_name or self._meta.db_table, self.id)
def get_instance(self) -> typing.Optional[jobs.Job]:
"""
@ -102,13 +102,13 @@ class Scheduler(models.Model):
return None
@staticmethod
def pre_delete(sender, **kwargs) -> None: # pylint: disable=unused-argument
def pre_delete(sender: typing.Any, **kwargs: typing.Any) -> None: # pylint: disable=unused-argument
"""
Used to remove environment for sheduled task
"""
toDelete: 'Scheduler' = kwargs['instance']
logger.debug('Deleting sheduled task %s', toDelete)
toDelete.get_environment().clean_related_data()
to_delete: 'Scheduler' = kwargs['instance']
logger.debug('Deleting sheduled task %s', to_delete)
to_delete.get_environment().clean_related_data()
def __str__(self) -> str:
return f'Scheduled task {self.name}, every {self.frecuency}, last execution at {self.last_execution}, state = {self.state}'

View File

@ -61,16 +61,20 @@ class ServiceTokenAlias(models.Model):
This model stores the alias for a service token.
"""
service = models.ForeignKey('Service', on_delete=models.CASCADE, related_name='aliases')
service: 'models.ForeignKey[Service]' = models.ForeignKey(
'Service', on_delete=models.CASCADE, related_name='aliases'
)
alias = models.CharField(max_length=64, unique=True)
unique_id = models.CharField(max_length=128, default='', db_index=True) # Used to locate an already created alias for a userService and service
unique_id = models.CharField(
max_length=128, default='', db_index=True
) # Used to locate an already created alias for a userService and service
def __str__(self) -> str:
return str(self.alias) # pylint complains about CharField
return str(self.alias)
# pylint: disable=no-member
class Service(ManagedObjectModel, TaggingMixin): # type: ignore
class Service(ManagedObjectModel, TaggingMixin):
"""
A Service represents an specidied type of service offered to final users,
with it configuration (i.e. a KVM Base Machine for cloning or a Terminal
@ -83,14 +87,12 @@ class Service(ManagedObjectModel, TaggingMixin): # type: ignore
max_services_count_type = models.PositiveIntegerField(default=ServicesCountingType.STANDARD)
_cached_instance: typing.Optional['services.Service'] = None
# "fake" declarations for type checking
# objects: 'models.manager.Manager["Service"]'
deployedServices: 'models.manager.RelatedManager[ServicePool]'
aliases: 'models.manager.RelatedManager[ServiceTokenAlias]'
class Meta(ManagedObjectModel.Meta): # pylint: disable=too-few-public-methods
class Meta(ManagedObjectModel.Meta): # pyright: ignore
"""
Meta class to declare default order and unique multiple field index
"""
@ -104,7 +106,7 @@ class Service(ManagedObjectModel, TaggingMixin): # type: ignore
Returns an environment valid for the record this object represents
"""
return Environment.environment_for_table_record(
self._meta.verbose_name, # type: ignore
self._meta.verbose_name or self._meta.db_table,
self.id,
{
'mac': unique.UniqueMacGenerator,
@ -130,7 +132,7 @@ class Service(ManagedObjectModel, TaggingMixin): # type: ignore
"""
if self._cached_instance and values is None:
# logger.debug('Got cached instance instead of deserializing a new one for {}'.format(self.name))
return self._cached_instance
return typing.cast('services.Service', self._cached_instance)
prov: 'services.ServiceProvider' = self.provider.get_instance()
sType = prov.get_service_by_type(self.data_type)
@ -202,7 +204,7 @@ class Service(ManagedObjectModel, TaggingMixin): # type: ignore
return f'{self.name} of type {self.data_type} (id:{self.id})'
@staticmethod
def pre_delete(sender, **kwargs) -> None: # pylint: disable=unused-argument
def pre_delete(sender: typing.Any, **kwargs: typing.Any) -> None: # pylint: disable=unused-argument
"""
Used to invoke the Service class "Destroy" before deleting it from database.

View File

@ -71,7 +71,7 @@ logger = logging.getLogger(__name__)
# pylint: disable=too-many-public-methods
class ServicePool(UUIDModel, TaggingMixin): # type: ignore
class ServicePool(UUIDModel, TaggingMixin):
"""
A deployed service is the Service produced element that is assigned finally to an user (i.e. a Virtual Machine, etc..)
"""
@ -176,7 +176,7 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
"""
Returns an environment valid for the record this object represents
"""
return Environment.environment_for_table_record(self._meta.verbose_name, self.id) # type: ignore
return Environment.environment_for_table_record(self._meta.verbose_name or self._meta.db_table, self.id)
def active_publication(self) -> typing.Optional['ServicePoolPublication']:
"""
@ -303,12 +303,9 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
# Return the date
try:
found = typing.cast(
'UserService',
self.assigned_user_services().filter(user=forUser, state__in=types.states.State.VALID_STATES)[
0
], # type: ignore # Slicing is not supported by pylance right now
)
found = self.assigned_user_services().filter(
user=forUser, state__in=types.states.State.VALID_STATES
)[0] # Raises exception if at least one is not found
if activePub and found.publication and activePub.id != found.publication.id:
ret = self.get_value('toBeReplacedIn')
if ret:
@ -370,7 +367,7 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
return int((deadline - check_datetime).total_seconds())
def set_value(self, name: str, value: typing.Any):
def set_value(self, name: str, value: typing.Any) -> None:
"""
Stores a value inside custom storage
@ -431,7 +428,7 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
self,
activePub: typing.Optional['ServicePoolPublication'],
skipAssigned: bool = False,
):
) -> None:
"""
Used when a new publication is finished.
@ -719,7 +716,7 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
]
)
def __str__(self):
def __str__(self) -> str:
return (
f'Service pool {self.name}({self.id}) with {self.initial_srvs}'
f' as initial, {self.cache_l1_srvs} as L1 cache, {self.cache_l2_srvs}'

View File

@ -113,7 +113,7 @@ class ServicePoolPublication(UUIDModel):
"""
Returns an environment valid for the record this object represents
"""
return Environment.environment_for_table_record(self._meta.verbose_name, self.id) # type: ignore
return Environment.environment_for_table_record(self._meta.verbose_name or self._meta.db_table, self.id)
def get_instance(self) -> 'services.Publication':
"""
@ -196,7 +196,7 @@ class ServicePoolPublication(UUIDModel):
publication_manager().unpublish(self)
def cancel(self):
def cancel(self) -> None:
"""
Invoques the cancelation of this publication
"""
@ -226,9 +226,7 @@ class ServicePoolPublication(UUIDModel):
logger.debug('Deleted publication %s', to_delete)
def __str__(self) -> str:
return (
f'Publication {self.deployed_service.name}, rev {self.revision}, state {State.from_str(self.state).localized}'
)
return f'Publication {self.deployed_service.name}, rev {self.revision}, state {State.from_str(self.state).localized}'
# Connects a pre deletion signal to Authenticator

View File

@ -83,7 +83,7 @@ class Transport(ManagedObjectModel, TaggingMixin):
def service_pools(self) -> 'models.manager.RelatedManager[ServicePool]':
return self.deployedServices
class Meta(ManagedObjectModel.Meta): # pylint: disable=too-few-public-methods
class Meta(ManagedObjectModel.Meta): # pyright: ignore
"""
Meta class to declare default order
"""
@ -157,7 +157,7 @@ class Transport(ManagedObjectModel, TaggingMixin):
return f'{self.name} of type {self.data_type} (id:{self.id})'
@staticmethod
def pre_delete(sender, **kwargs) -> None: # pylint: disable=unused-argument
def pre_delete(sender: typing.Any, **kwargs: typing.Any) -> None: # pylint: disable=unused-argument
"""
Used to invoke the Service class "Destroy" before deleting it from database.
@ -168,17 +168,17 @@ class Transport(ManagedObjectModel, TaggingMixin):
"""
from uds.core.util.permissions import clean # pylint: disable=import-outside-toplevel
toDelete: 'Transport' = kwargs['instance']
to_delete: 'Transport' = kwargs['instance']
logger.debug('Before delete transport %s', toDelete)
logger.debug('Before delete transport %s', to_delete)
# Only tries to get instance if data is not empty
if toDelete.data != '':
s = toDelete.get_instance()
if to_delete.data != '':
s = to_delete.get_instance()
s.destroy()
s.env.clean_related_data()
# Clears related permissions
clean(toDelete)
clean(to_delete)
# : Connects a pre deletion signal to OS Manager

View File

@ -48,7 +48,7 @@ from .uuid_model import UUIDModel
if typing.TYPE_CHECKING:
from uds.models import Group, UserService, Permissions
from uds.core.types.requests import ExtendedHttpRequest
from django.db.models.manager import RelatedManager # type: ignore # MyPy complains because of django-stubs
from django.db.models.manager import RelatedManager # type: ignore # MyPy complains because of django-stubs
logger = logging.getLogger(__name__)
@ -60,9 +60,7 @@ class User(UUIDModel, properties.PropertiesMixin):
This class represents a single user, associated with one authenticator
"""
manager = models.ForeignKey(
Authenticator, on_delete=models.CASCADE, related_name='users'
)
manager = models.ForeignKey(Authenticator, on_delete=models.CASCADE, related_name='users')
name = models.CharField(max_length=128, db_index=True)
real_name = models.CharField(max_length=128)
comments = models.CharField(max_length=256)
@ -70,12 +68,8 @@ class User(UUIDModel, properties.PropertiesMixin):
password = models.CharField(
max_length=128, default=''
) # Only used on "internal" sources or sources that "needs password"
mfa_data = models.CharField(
max_length=128, default=''
) # Only used on "internal" sources
staff_member = models.BooleanField(
default=False
) # Staff members can login to admin
mfa_data = models.CharField(max_length=128, default='') # Only used on "internal" sources
staff_member = models.BooleanField(default=False) # Staff members can login to admin
is_admin = models.BooleanField(default=False) # is true, this is a super-admin
last_access = models.DateTimeField(default=NEVER)
parent = models.CharField(max_length=50, default=None, null=True)
@ -94,11 +88,7 @@ class User(UUIDModel, properties.PropertiesMixin):
ordering = ('name',)
app_label = 'uds'
constraints = [
models.UniqueConstraint(
fields=['manager', 'name'], name='u_usr_manager_name'
)
]
constraints = [models.UniqueConstraint(fields=['manager', 'name'], name='u_usr_manager_name')]
# For properties
def get_owner_id_and_type(self) -> tuple[str, str]:
@ -155,9 +145,7 @@ class User(UUIDModel, properties.PropertiesMixin):
"""
if self.parent:
try:
usr = User.objects.prefetch_related('authenticator', 'groups').get(
uuid=self.parent
)
usr = User.objects.prefetch_related('authenticator', 'groups').get(uuid=self.parent)
except Exception: # If parent do not exists
usr = self
else:
@ -177,22 +165,22 @@ class User(UUIDModel, properties.PropertiesMixin):
number_belongs_meta=Count('groups', filter=Q(groups__id__in=grps))
) # g.groups.filter(id__in=grps).count()
):
numberGroupsBelongingInMeta: int = g.number_belongs_meta # type: ignore # anottation
numberGroupsBelongingInMeta: int = typing.cast(typing.Any, g).number_belongs_meta # Anotated field
logger.debug('gn = %s', numberGroupsBelongingInMeta)
logger.debug('groups count: %s', g.number_groups) # type: ignore # anottation
logger.debug('groups count: %s', typing.cast(typing.Any, g).number_groups) # Anotated field
if g.meta_if_any is True and numberGroupsBelongingInMeta > 0:
numberGroupsBelongingInMeta = g.number_groups # type: ignore # anottation
numberGroupsBelongingInMeta = typing.cast(typing.Any, g).number_groups # Anotated field
logger.debug('gn after = %s', numberGroupsBelongingInMeta)
# If a meta group is empty, all users belongs to it. we can use gn != 0 to check that if it is empty, is not valid
if numberGroupsBelongingInMeta == g.number_groups: # type: ignore # anottation
if numberGroupsBelongingInMeta == typing.cast(typing.Any, g).number_groups:
# This group matches
yield g
def __str__(self):
def __str__(self) -> str:
return f'{self.pretty_name} (id:{self.id})'
def clean_related_data(self) -> None:
@ -204,7 +192,7 @@ class User(UUIDModel, properties.PropertiesMixin):
self.manager.mfa.get_instance().reset_data(mfas.MFA.get_user_id(self))
@staticmethod
def pre_delete(sender, **kwargs) -> None: # pylint: disable=unused-argument
def pre_delete(sender: typing.Any, **kwargs: typing.Any) -> None: # pylint: disable=unused-argument
"""
Used to invoke the Service class "Destroy" before deleting it from database.
@ -240,6 +228,7 @@ class User(UUIDModel, properties.PropertiesMixin):
logger.debug('Deleted user %s', to_delete)
# Connect to pre delete signal
signals.pre_delete.connect(User.pre_delete, sender=User)

View File

@ -178,7 +178,7 @@ class UserService(UUIDModel, properties.PropertiesMixin):
(see related classes uds.core.util.unique_name_generator and uds.core.util.unique_mac_generator)
"""
return Environment.environment_for_table_record(
self._meta.verbose_name, # type: ignore # pylint: disable=no-member
self._meta.verbose_name or self._meta.model_name or '',
self.id,
{
'mac': unique.UniqueMacGenerator,
@ -218,8 +218,8 @@ class UserService(UUIDModel, properties.PropertiesMixin):
if self.publication is not None:
publication_instance = self.publication.get_instance()
except Exception:
# The publication to witch this item points to, does not exists
self.publication = None # type: ignore
# The publication to which this item points to, does not exists
self.publication = None
logger.exception(
'Got exception at get_instance of an userService %s (seems that publication does not exists!)',
self,
@ -256,7 +256,7 @@ class UserService(UUIDModel, properties.PropertiesMixin):
)
return us
def update_data(self, userservice_instance: 'services.UserService'):
def update_data(self, userservice_instance: 'services.UserService') -> None:
"""
Updates the data field with the serialized :py:class:uds.core.services.UserDeployment
@ -370,7 +370,7 @@ class UserService(UUIDModel, properties.PropertiesMixin):
"""
return bool(self.get_osmanager())
def transforms_user_or_password_for_service(self):
def transforms_user_or_password_for_service(self) -> bool:
"""
If the os manager changes the username or the password, this will return True
"""

View File

@ -48,7 +48,7 @@ from uds.core.util import modfinder
logger = logging.getLogger(__name__)
def __loadModules():
def __loadModules() -> None:
"""
Loads all notifiers modules
"""

View File

@ -60,7 +60,7 @@ class Telegram:
def request(
self,
method,
method: str,
params: typing.Optional[dict[str, typing.Any]] = None,
*,
stream: bool = False,

View File

@ -34,7 +34,7 @@ from uds.core.util import modfinder
logger = logging.getLogger(__name__)
def __load_plugins():
def __load_plugins() -> None:
"""
This imports all packages that are descendant of this package, and, after that,
it register all subclases of service provider as

View File

@ -12,13 +12,13 @@ CONVERSORS: typing.Final[dict[typing.Any, collections.abc.Callable[[typing.Type[
typing.Optional[str]: lambda x: str(x) if x is not None else None, # pyright: ignore
bool: lambda x: bool(x),
typing.Optional[bool]: lambda x: bool(x) if x is not None else None, # pyright: ignore
int: lambda x: int(x or '0'), # pyright: ignore
typing.Optional[int]: lambda x: int(x or '0') if x is not None else None, # pyright: ignore
float: lambda x: float(x or '0'), # pyright: ignore
typing.Optional[float]: lambda x: float(x or '0') if x is not None else None, # pyright: ignore
datetime.datetime: lambda x: datetime.datetime.fromtimestamp(int(x)), # pyright: ignore
int: lambda x: int(x or '0'), # type: ignore
typing.Optional[int]: lambda x: int(x or '0') if x is not None else None, # type: ignore
float: lambda x: float(x or '0'), # type: ignore
typing.Optional[float]: lambda x: float(x or '0') if x is not None else None, # type: ignore
datetime.datetime: lambda x: datetime.datetime.fromtimestamp(int(x)), # type: ignore
typing.Optional[datetime.datetime]: lambda x: (
datetime.datetime.fromtimestamp(int(x)) if x is not None else None # pyright: ignore
datetime.datetime.fromtimestamp(int(x)) if x is not None else None # type: ignore
),
}
@ -31,7 +31,7 @@ def _from_dict(
extra = extra or {}
return type(
**{
k: CONVERSORS.get(type.__annotations__.get(k, str), lambda x: x)(
k: typing.cast(typing.Callable[..., typing.Any], CONVERSORS.get(type.__annotations__.get(k, str), lambda x: x))(
dictionary.get(k, extra.get(k, None))
)
for k in type._fields # type: ignore

View File

@ -48,7 +48,7 @@ from uds.core.util import modfinder
logger = logging.getLogger(__name__)
def __loadModules():
def __loadModules() -> None:
"""
This imports all packages that are descendant of this package, and, after that,
it register all subclases of service provider as

View File

@ -44,7 +44,7 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com
from uds.core.util import modfinder
def __init__():
def __load_modules__() -> None:
"""
This imports all packages that are descendant of this package, and, after that,
it register all subclases of service provider as
@ -57,4 +57,4 @@ def __init__():
)
__init__()
__load_modules__()

View File

@ -29,14 +29,14 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from django.http import HttpResponse
from django.http import HttpRequest, HttpResponse
# from django.views.decorators.cache import cache_page
from uds.core.util.config import GlobalConfig
# @cache_page(3600, key_prefix='custom', cache='memory')
def custom(request, component):
def custom(request: 'HttpRequest', component: str) -> 'HttpResponse':
content_type = 'text/plain'
value = ''