1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-11-10 08:24:12 +03:00

Refactoring for mypy complains... most of them are weird :S

This commit is contained in:
Adolfo Gómez García
2024-10-12 03:17:54 +02:00
parent dd71811045
commit c041284588
11 changed files with 27 additions and 25 deletions

View File

@@ -107,22 +107,22 @@ async def request_pools(session: aiohttp.ClientSession) -> list[collections.abc.
async def request_ticket( async def request_ticket(
session: aiohttp.ClientSession, session: aiohttp.ClientSession,
username: str, username: str,
authSmallName: str, auth_label: str,
groups: typing.Union[list[str], str], groups: typing.Union[list[str], str],
servicePool: str, servicepool: str,
realName: typing.Optional[str] = None, real_name: typing.Optional[str] = None,
transport: typing.Optional[str] = None, transport: typing.Optional[str] = None,
force: bool = False force: bool = False
) -> collections.abc.MutableMapping[str, typing.Any]: ) -> collections.abc.MutableMapping[str, typing.Any]:
data = { data = {
'username': username, 'username': username,
'authSmallName': authSmallName, 'authSmallName': auth_label,
'groups': groups, 'groups': groups,
'servicePool': servicePool, 'servicePool': servicepool,
'force': 'true' if force else 'false' 'force': 'true' if force else 'false'
} }
if realName: if real_name:
data['realname'] = realName data['realname'] = real_name
if transport: if transport:
data['transport'] = transport data['transport'] = transport
response = await session.put( response = await session.put(
@@ -145,10 +145,10 @@ async def main():
ticket = await request_ticket( ticket = await request_ticket(
session=session, session=session,
username='adolfo', username='adolfo',
authSmallName='172.27.0.1:8000', auth_label='172.27.0.1:8000',
groups=['adolfo', 'dkmaster'], groups=['adolfo', 'dkmaster'],
servicePool='6201b357-c4cd-5463-891e-71441a25faee', servicepool='6201b357-c4cd-5463-891e-71441a25faee',
realName='Adolfo Gómez', real_name='Adolfo Gómez',
force=True force=True
) )
print(ticket) print(ticket)

View File

@@ -32,7 +32,6 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com Author: Adolfo Gómez, dkmaster at dkmon dot com
''' '''
import typing import typing
import collections.abc
import requests import requests
import argparse import argparse
import socket import socket
@@ -52,7 +51,7 @@ class LogoutException(RESTException):
pass pass
def registerWithBroker( def register_with_broker(
auth_uuid: str, auth_uuid: str,
username: str, username: str,
password: str, password: str,
@@ -151,7 +150,7 @@ def main():
args = parser.parse_args() args = parser.parse_args()
try: try:
token = registerWithBroker( token = register_with_broker(
auth_uuid=args.auth_uuid, auth_uuid=args.auth_uuid,
username=args.username, username=args.username,
password=args.password, password=args.password,

View File

@@ -806,7 +806,7 @@ class Unmanaged(ActorV3Action):
# Retrieve real service from token alias # Retrieve real service from token alias
dbservice = ServiceTokenAlias.objects.get(alias=token).service dbservice = ServiceTokenAlias.objects.get(alias=token).service
else: else:
dbservice: Service = Service.objects.get(token=token) dbservice = Service.objects.get(token=token)
service: 'services.Service' = dbservice.get_instance() service: 'services.Service' = dbservice.get_instance()
except Exception: except Exception:
logger.exception('Unmanaged host request: %s', self._params) logger.exception('Unmanaged host request: %s', self._params)

View File

@@ -195,9 +195,9 @@ class AssignedService(DetailHandler):
def get_logs(self, parent: 'Model', item: str) -> list[typing.Any]: def get_logs(self, parent: 'Model', item: str) -> list[typing.Any]:
parent = ensure.is_instance(parent, models.ServicePool) parent = ensure.is_instance(parent, models.ServicePool)
try: try:
userService: models.UserService = parent.assigned_user_services().get(uuid=process_uuid(item)) user_service: models.UserService = parent.assigned_user_services().get(uuid=process_uuid(item))
logger.debug('Getting logs for %s', userService) logger.debug('Getting logs for %s', user_service)
return log.get_logs(userService) return log.get_logs(user_service)
except Exception as e: except Exception as e:
raise self.invalid_item_response() from e raise self.invalid_item_response() from e
@@ -206,9 +206,9 @@ class AssignedService(DetailHandler):
parent = ensure.is_instance(parent, models.ServicePool) parent = ensure.is_instance(parent, models.ServicePool)
try: try:
if cache: if cache:
userservice: models.UserService = parent.cached_users_services().get(uuid=process_uuid(item)) userservice = parent.cached_users_services().get(uuid=process_uuid(item))
else: else:
userservice: models.UserService = parent.assigned_user_services().get(uuid=process_uuid(item)) userservice = parent.assigned_user_services().get(uuid=process_uuid(item))
except Exception as e: except Exception as e:
logger.exception('delete_item') logger.exception('delete_item')
raise self.invalid_item_response() from e raise self.invalid_item_response() from e

View File

@@ -201,7 +201,7 @@ class RadiusAuth(auths.Authenticator):
def test(env: 'environment.Environment', data: 'types.core.ValuesType') -> 'types.core.TestResult': def test(env: 'environment.Environment', data: 'types.core.ValuesType') -> 'types.core.TestResult':
"""Test the connection to the server .""" """Test the connection to the server ."""
try: try:
auth = RadiusAuth(env, data) # type: ignore auth = RadiusAuth(env, data)
return auth.test_connection() return auth.test_connection()
except Exception as e: except Exception as e:
logger.error("Exception found testing Radius auth %s: %s", e.__class__, e) logger.error("Exception found testing Radius auth %s: %s", e.__class__, e)

View File

@@ -451,7 +451,7 @@ class RegexLdap(auths.Authenticator):
@staticmethod @staticmethod
def test(env: 'environment.Environment', data: 'types.core.ValuesType') -> 'types.core.TestResult': def test(env: 'environment.Environment', data: 'types.core.ValuesType') -> 'types.core.TestResult':
try: try:
auth = RegexLdap(env, data) # type: ignore # Regexldap does not use "dbAuth", so it's safe... auth = RegexLdap(env, data)
return auth.test_connection() return auth.test_connection()
except Exception as e: except Exception as e:
logger.error('Exception found testing Simple LDAP auth %s: %s', e.__class__, e) logger.error('Exception found testing Simple LDAP auth %s: %s', e.__class__, e)

View File

@@ -469,7 +469,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
@staticmethod @staticmethod
def test(env: 'environment.Environment', data: 'types.core.ValuesType') -> 'types.core.TestResult': def test(env: 'environment.Environment', data: 'types.core.ValuesType') -> 'types.core.TestResult':
try: try:
auth = SimpleLDAPAuthenticator(env, data) # type: ignore auth = SimpleLDAPAuthenticator(env, data)
return auth.test_connection() return auth.test_connection()
except Exception as e: except Exception as e:
logger.error("Exception found testing Simple LDAP auth: %s", e) logger.error("Exception found testing Simple LDAP auth: %s", e)

View File

@@ -99,6 +99,7 @@ class Image(UUIDModel):
@staticmethod @staticmethod
def prepare_for_db(data: bytes) -> tuple[int, int, bytes]: def prepare_for_db(data: bytes) -> tuple[int, int, bytes]:
image: PIL.Image.Image
try: try:
stream = io.BytesIO(data) stream = io.BytesIO(data)
image = PIL.Image.open(stream) image = PIL.Image.open(stream)

View File

@@ -68,6 +68,7 @@ class StatsCountersAccum(models.Model):
raise ValueError('No previous interval for HOUR') raise ValueError('No previous interval for HOUR')
case self.DAY: case self.DAY:
return StatsCountersAccum.IntervalType.HOUR return StatsCountersAccum.IntervalType.HOUR
raise ValueError('Invalid interval type')
def is_base_interval(self) -> bool: def is_base_interval(self) -> bool:
"""Returns if this is the base interval""" """Returns if this is the base interval"""

View File

@@ -44,7 +44,7 @@ from .linux_osmanager import LinuxOsManager
from .linux_randompass_osmanager import LinuxRandomPassManager from .linux_randompass_osmanager import LinuxRandomPassManager
from .linux_ad_osmanager import LinuxOsADManager from .linux_ad_osmanager import LinuxOsADManager
_mypath = os.path.dirname(__spec__.origin) # type: ignore[name-defined] # mypy incorrectly report __spec__ as not beind defined _mypath: typing.Final[str] = os.path.dirname(__spec__.origin) # type: ignore[type-var, assignment] # mypy has some problem with dirname??
# Old version, using spec is better, but we can use __package__ as well # Old version, using spec is better, but we can use __package__ as well
#_mypath = os.path.dirname(typing.cast(str, sys.modules[__package__].__file__)) # pyright: ignore #_mypath = os.path.dirname(typing.cast(str, sys.modules[__package__].__file__)) # pyright: ignore

View File

@@ -32,6 +32,7 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com Author: Adolfo Gómez, dkmaster at dkmon dot com
""" """
# pyright: reportUnusedImport=false # pyright: reportUnusedImport=false
import typing
import os.path import os.path
from django.utils.translation import gettext_noop as _ from django.utils.translation import gettext_noop as _
@@ -42,9 +43,9 @@ from .windows import WindowsOsManager
from .windows_domain import WinDomainOsManager from .windows_domain import WinDomainOsManager
from .windows_random import WinRandomPassManager from .windows_random import WinRandomPassManager
_mypath = os.path.dirname(__spec__.origin) # type: ignore[name-defined] # mypy incorrectly report __spec__ as not beind defined _mypath: typing.Final[str] = os.path.dirname(__spec__.origin) # type: ignore[type-var, assignment] # mypy has some problem with dirname??
# Old version, using spec is better, but we can use __package__ as well # Old version, using spec is better, but we can use __package__ as well
#_mypath = os.path.dirname(typing.cast(str, sys.modules[__package__].__file__)) # pyright: ignore # _mypath = os.path.dirname(typing.cast(str, sys.modules[__package__].__file__)) # pyright: ignore
managers.downloads_manager().register( managers.downloads_manager().register(