1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-10 01:17:59 +03:00

Added typing correction on 3.6 version

This commit is contained in:
Adolfo Gómez García 2023-03-25 13:19:28 +01:00
parent e2814f2674
commit 562e9201c8
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
30 changed files with 73 additions and 77 deletions

View File

@ -141,7 +141,7 @@ class Actor(Handler):
except Exception:
return Actor.result({})
def get(self): # pylint: disable=too-many-return-statements
def get(self) -> typing.Any: # pylint: disable=too-many-return-statements
"""
Processes get requests
"""
@ -186,7 +186,7 @@ class Actor(Handler):
raise RequestError('Invalid request')
# Must be invoked as '/rest/actor/UUID/[message], with message data in post body
def post(self): # pylint: disable=too-many-branches
def post(self) -> typing.Any: # pylint: disable=too-many-branches
"""
Processes post requests
"""

View File

@ -126,7 +126,7 @@ class Authenticators(ModelHandler):
'values': [gui.choiceItem('', _('None'))]
+ gui.sortedChoices(
[
gui.choiceItem(v.uuid, v.name)
gui.choiceItem(v.uuid, v.name) # type: ignore
for v in MFA.objects.all()
]
),

View File

@ -105,7 +105,7 @@ class Client(Handler):
"""
return Client.result(_('Correct'))
def get(self): # pylint: disable=too-many-locals
def get(self) -> typing.Any: # pylint: disable=too-many-locals
"""
Processes get requests
"""

View File

@ -43,7 +43,7 @@ logger = logging.getLogger(__name__)
class Config(Handler):
needs_admin = True # By default, staff is lower level needed
def get(self):
def get(self) -> typing.Any:
cfg: CfgConfig.Value
return CfgConfig.getConfigValues(self.is_admin())

View File

@ -65,9 +65,9 @@ class Login(Handler):
@staticmethod
def result(
result: str = 'error',
token: str = None,
scrambler: str = None,
error: str = None,
token: typing.Optional[str] = None,
scrambler: typing.Optional[str] = None,
error: typing.Optional[str] = None,
) -> typing.MutableMapping[str, typing.Any]:
res = {
'result': result,
@ -229,7 +229,7 @@ class Auths(Handler):
path = 'auth'
authenticated = False # By default, all handlers needs authentication
def auths(self):
def auths(self) -> typing.Iterator[typing.Dict[str, typing.Any]]:
paramAll: bool = self._params.get('all', 'false') == 'true'
auth: Authenticator
for auth in Authenticator.objects.all():

View File

@ -160,7 +160,7 @@ class MetaPools(ModelHandler):
'values': [gui.choiceImage(-1, '--------', DEFAULT_THUMB_BASE64)]
+ gui.sortedChoices(
[
gui.choiceImage(v.uuid, v.name, v.thumb64)
gui.choiceImage(v.uuid, v.name, v.thumb64) # type: ignore
for v in Image.objects.all()
]
),
@ -175,7 +175,7 @@ class MetaPools(ModelHandler):
'values': [gui.choiceImage(-1, _('Default'), DEFAULT_THUMB_BASE64)]
+ gui.sortedChoices(
[
gui.choiceImage(v.uuid, v.name, v.thumb64)
gui.choiceImage(v.uuid, v.name, v.thumb64) # type: ignore
for v in ServicePoolGroup.objects.all()
]
),

View File

@ -256,7 +256,7 @@ class MetaAssignedService(DetailHandler):
user: User = User.objects.get(uuid=processUuid(fields['user_id']))
logStr = 'Changing ownership of service from {} to {} by {}'.format(
service.user.pretty_name, user.pretty_name, self._user.pretty_name
service.user.pretty_name, user.pretty_name, self._user.pretty_name # type: ignore
)
# If there is another service that has this same owner, raise an exception

View File

@ -92,10 +92,10 @@ class Permissions(Handler):
{
'id': perm.uuid,
'type': kind,
'auth': entity.manager.uuid,
'auth_name': entity.manager.name,
'entity_id': entity.uuid,
'entity_name': entity.name,
'auth': entity.manager.uuid, # type: ignore
'auth_name': entity.manager.name, # type: ignore
'entity_id': entity.uuid, # type: ignore
'entity_name': entity.name, # type: ignore
'perm': perm.permission,
'perm_name': perm.permission_as_string,
}
@ -103,7 +103,7 @@ class Permissions(Handler):
return sorted(res, key=lambda v: v['auth_name'] + v['entity_name'])
def get(self):
def get(self) -> typing.Any:
"""
Processes get requests
"""

View File

@ -308,7 +308,7 @@ class Services(DetailHandler): # pylint: disable=too-many-public-methods
'values': [gui.choiceItem(-1, '')]
+ gui.sortedChoices(
[
gui.choiceItem(v.uuid, v.name)
gui.choiceItem(v.uuid, v.name) # type: ignore
for v in models.Proxy.objects.all()
]
),

View File

@ -93,7 +93,7 @@ class ServicesPoolGroups(ModelHandler):
'values': [gui.choiceImage(-1, '--------', DEFAULT_THUMB_BASE64)]
+ gui.sortedChoices(
[
gui.choiceImage(v.uuid, v.name, v.thumb64)
gui.choiceImage(v.uuid, v.name, v.thumb64) # type: ignore
for v in Image.objects.all()
]
),

View File

@ -233,8 +233,8 @@ class ServicesPools(ModelHandler):
'name': item.name,
'short_name': item.short_name,
'tags': [tag.tag for tag in item.tags.all()],
'parent': item.service.name,
'parent_type': item.service.data_type,
'parent': item.service.name, # type: ignore
'parent_type': item.service.data_type, # type: ignore
'comments': item.comments,
'state': state,
'thumb': item.image.thumb64
@ -242,8 +242,8 @@ class ServicesPools(ModelHandler):
else DEFAULT_THUMB_BASE64,
'account': item.account.name if item.account is not None else '',
'account_id': item.account.uuid if item.account is not None else None,
'service_id': item.service.uuid,
'provider_id': item.service.provider.uuid,
'service_id': item.service.uuid, # type: ignore
'provider_id': item.service.provider.uuid, # type: ignore
'image_id': item.image.uuid if item.image is not None else None,
'initial_srvs': item.initial_srvs,
'cache_l1_srvs': item.cache_l1_srvs,
@ -297,7 +297,7 @@ class ServicesPools(ModelHandler):
val['tags'] = [tag.tag for tag in item.tags.all()]
val['restrained'] = restrained
val['permission'] = permissions.getEffectivePermission(self._user, item)
val['info'] = Services.serviceInfo(item.service)
val['info'] = Services.serviceInfo(item.service) # type: ignore
val['pool_group_id'] = poolGroupId
val['pool_group_name'] = poolGroupName
val['pool_group_thumb'] = poolGroupThumb
@ -325,7 +325,7 @@ class ServicesPools(ModelHandler):
'values': [gui.choiceItem('', '')]
+ gui.sortedChoices(
[
gui.choiceItem(v.uuid, v.provider.name + '\\' + v.name)
gui.choiceItem(v.uuid, v.provider.name + '\\' + v.name) # type: ignore
for v in Service.objects.all()
]
),
@ -339,7 +339,7 @@ class ServicesPools(ModelHandler):
'name': 'osmanager_id',
'values': [gui.choiceItem(-1, '')]
+ gui.sortedChoices(
[gui.choiceItem(v.uuid, v.name) for v in OSManager.objects.all()]
[gui.choiceItem(v.uuid, v.name) for v in OSManager.objects.all()] # type: ignore
),
'label': ugettext('OS Manager'),
'tooltip': ugettext('OS Manager used as base of this service pool'),
@ -394,7 +394,7 @@ class ServicesPools(ModelHandler):
'values': [gui.choiceImage(-1, '--------', DEFAULT_THUMB_BASE64)]
+ gui.sortedChoices(
[
gui.choiceImage(v.uuid, v.name, v.thumb64)
gui.choiceImage(v.uuid, v.name, v.thumb64) # type: ignore
for v in Image.objects.all()
]
),
@ -409,7 +409,7 @@ class ServicesPools(ModelHandler):
'values': [gui.choiceImage(-1, _('Default'), DEFAULT_THUMB_BASE64)]
+ gui.sortedChoices(
[
gui.choiceImage(v.uuid, v.name, v.thumb64)
gui.choiceImage(v.uuid, v.name, v.thumb64) # type: ignore
for v in ServicePoolGroup.objects.all()
]
),
@ -493,7 +493,7 @@ class ServicesPools(ModelHandler):
'name': 'account_id',
'values': [gui.choiceItem(-1, '')]
+ gui.sortedChoices(
[gui.choiceItem(v.uuid, v.name) for v in Account.objects.all()]
[gui.choiceItem(v.uuid, v.name) for v in Account.objects.all()] # type: ignore
),
'label': ugettext('Accounting'),
'tooltip': ugettext('Account associated to this service pool'),
@ -659,7 +659,7 @@ class ServicesPools(ModelHandler):
# Returns the action list based on current element, for calendar
def actionsList(self, item: ServicePool) -> typing.Any:
validActions: typing.Tuple[typing.Dict, ...] = ()
itemInfo = item.service.getType()
itemInfo = item.service.getType() # type: ignore
if itemInfo.usesCache is True:
validActions += (
CALENDAR_ACTION_INITIAL,
@ -691,7 +691,7 @@ class ServicesPools(ModelHandler):
return validActions
def listAssignables(self, item: ServicePool) -> typing.Any:
service = item.service.getInstance()
service = item.service.getInstance() # type: ignore
return [gui.choiceItem(i[0], i[1]) for i in service.listAssignables()]
def createFromAssignable(self, item: ServicePool) -> typing.Any:

View File

@ -77,8 +77,8 @@ class ServicesUsage(DetailHandler):
'friendly_name': item.friendly_name,
'owner': owner,
'owner_info': owner_info,
'service': item.deployed_service.service.name,
'service_id': item.deployed_service.service.uuid,
'service': item.deployed_service.service.name, # type: ignore
'service_id': item.deployed_service.service.uuid, # type: ignore
'pool': item.deployed_service.name,
'pool_id': item.deployed_service.uuid,
'ip': props.get('ip', _('unknown')),

View File

@ -112,7 +112,7 @@ class System(Handler):
needs_admin = False
needs_staff = True
def get(self):
def get(self) -> typing.Any:
logger.debug('args: %s', self._args)
# Only allow admin user for global stats
if len(self._args) == 1:

View File

@ -127,7 +127,7 @@ class Tickets(Handler):
# Must be invoked as '/rest/ticket/create, with "username", ("authId" or ("authSmallName" or "authTag"), "groups" (array) and optionally "time" (in seconds) as paramteres
def put(
self,
): # pylint: disable=too-many-locals,too-many-branches,too-many-statements
) -> typing.Any:
"""
Processes put requests, currently only under "create"
"""
@ -170,7 +170,7 @@ class Tickets(Handler):
groupIds: typing.List[str] = []
for groupName in tools.asList(self._params['groups']):
try:
groupIds.append(auth.groups.get(name=groupName).uuid)
groupIds.append(auth.groups.get(name=groupName).uuid or '')
except Exception:
logger.info(
'Group %s from ticket does not exists on auth %s, forced creation: %s',
@ -183,7 +183,7 @@ class Tickets(Handler):
auth.groups.create(
name=groupName,
comments='Autocreated form ticket by using force paratemeter',
).uuid
).uuid or ''
)
if not groupIds: # No valid group in groups names
@ -224,7 +224,7 @@ class Tickets(Handler):
# For metapool, transport is ignored..
servicePoolId = 'M' + pool.uuid
servicePoolId = 'M' + pool.uuid # type: ignore
transportId = 'meta'
except models.MetaPool.DoesNotExist:
@ -240,7 +240,7 @@ class Tickets(Handler):
):
pool.assignedGroups.add(auth.groups.get(uuid=addGrp))
servicePoolId = 'F' + pool.uuid
servicePoolId = 'F' + pool.uuid # type: ignore
except models.Authenticator.DoesNotExist:
return Tickets.result(error='Authenticator does not exists')

View File

@ -110,7 +110,7 @@ class Transports(ModelHandler):
'value': [],
'values': sorted(
[{'id': x.uuid, 'text': x.name} for x in Network.objects.all()],
key=lambda x: x['text'].lower(),
key=lambda x: x['text'].lower(), # type: ignore
),
'label': ugettext('Networks'),
'tooltip': ugettext(
@ -148,7 +148,7 @@ class Transports(ModelHandler):
'values': [
{'id': x.uuid, 'text': x.name}
for x in ServicePool.objects.all().order_by('name')
if transport.protocol in x.service.getType().allowedProtocols
if transport.protocol in x.service.getType().allowedProtocols # type: ignore
],
'label': ugettext('Service Pools'),
'tooltip': ugettext('Currently assigned services pools'),

View File

@ -208,7 +208,7 @@ class AssignedService(DetailHandler):
user = models.User.objects.get(uuid=processUuid(fields['user_id']))
logStr = 'Changing ownership of service from {} to {} by {}'.format(
userService.user.pretty_name, user.pretty_name, self._user.pretty_name
userService.user.pretty_name, user.pretty_name, self._user.pretty_name # type: ignore
)
# If there is another service that has this same owner, raise an exception

View File

@ -87,7 +87,7 @@ def getUDSCookie(
if 'uds' not in request.COOKIES:
cookie = cryptoManager().randomString(UDS_COOKIE_LENGTH)
if response is not None:
response.set_cookie('uds', cookie, samesite='Lax')
response.set_cookie('uds', cookie, samesite='Lax', httponly=GlobalConfig.ENHANCED_SECURITY.getBool())
request.COOKIES['uds'] = cookie
else:
cookie = request.COOKIES['uds'][:UDS_COOKIE_LENGTH]

View File

@ -43,7 +43,7 @@ class ScheduledAction(Job):
frecuency = 29 # Frecuncy for this job
friendly_name = 'Scheduled action runner'
def run(self):
def run(self) -> None:
configuredAction: CalendarAction
for configuredAction in CalendarAction.objects.filter(
service_pool__service__provider__maintenance_mode=False, # Avoid maintenance

View File

@ -62,7 +62,7 @@ class AccountUsage(UUIDModel):
pool_uuid = models.CharField(max_length=50, db_index=True, default='')
start = models.DateTimeField(default=NEVER)
end = models.DateTimeField(default=NEVER)
user_service: 'models.OneToOneField[AccountUsage, UserService]' = (
user_service: 'models.OneToOneField[UserService|None]' = (
models.OneToOneField(
UserService,
null=True,
@ -71,7 +71,7 @@ class AccountUsage(UUIDModel):
on_delete=models.SET_NULL,
)
)
account: 'models.ForeignKey[AccountUsage, Account]' = models.ForeignKey(
account: 'models.ForeignKey[Account]' = models.ForeignKey(
Account, related_name='usages', on_delete=models.CASCADE
)
@ -87,7 +87,7 @@ class AccountUsage(UUIDModel):
def elapsed_seconds(self) -> int:
if NEVER in (self.end, self.start):
return 0
return (self.end - self.start).total_seconds()
return int((self.end - self.start).total_seconds())
@property
def elapsed_seconds_timemark(self) -> int:
@ -101,7 +101,7 @@ class AccountUsage(UUIDModel):
if end < start:
return 0
return (end - start).total_seconds()
return int((end - start).total_seconds())
@property
def elapsed(self) -> str:

View File

@ -162,7 +162,7 @@ class Authenticator(ManagedObjectModel, TaggingMixin):
if (
user.real_name.strip() == '' or user.name.strip() == user.real_name.strip()
) and realName != user.real_name:
user.real_name = realName
user.real_name = realName or ''
user.save(update_fields=['real_name'])
return user

View File

@ -65,12 +65,9 @@ class Calendar(UUIDModel, TaggingMixin):
db_table = 'uds_calendar'
app_label = 'uds'
# Override default save to add uuid
def save(
self,
force_insert: bool = False,
force_update: bool = False,
using: bool = None,
update_fields: bool = None,
self, force_insert=False, force_update=False, using=None, update_fields=None
):
logger.debug('Saving calendar')

View File

@ -47,10 +47,10 @@ logger = logging.getLogger(__name__)
class CalendarAccess(UUIDModel):
calendar: 'models.ForeignKey[CalendarAccess, Calendar]' = models.ForeignKey(
calendar: 'models.ForeignKey[Calendar]' = models.ForeignKey(
Calendar, on_delete=models.CASCADE
)
service_pool: 'models.ForeignKey[CalendarAccess, ServicePool]' = models.ForeignKey(
service_pool: 'models.ForeignKey[ServicePool]' = models.ForeignKey(
ServicePool, related_name='calendarAccess', on_delete=models.CASCADE
)
access = models.CharField(max_length=8, default=states.action.DENY)

View File

@ -215,10 +215,10 @@ CALENDAR_ACTION_DICT: typing.Dict[str, typing.Dict] = {
class CalendarAction(UUIDModel):
calendar: 'models.ForeignKey[CalendarAction, Calendar]' = models.ForeignKey(
calendar: 'models.ForeignKey[Calendar]' = models.ForeignKey(
Calendar, on_delete=models.CASCADE
)
service_pool: 'models.ForeignKey[CalendarAction, ServicePool]' = models.ForeignKey(
service_pool: 'models.ForeignKey[ServicePool]' = models.ForeignKey(
ServicePool, on_delete=models.CASCADE
)
action = models.CharField(max_length=64, default='')

View File

@ -266,10 +266,10 @@ signals.pre_delete.connect(MetaPool.beforeDelete, sender=MetaPool)
class MetaPoolMember(UUIDModel):
pool: 'models.ForeignKey["MetaPoolMember", ServicePool]' = models.ForeignKey(
pool: 'models.ForeignKey[ServicePool]' = models.ForeignKey(
ServicePool, related_name='memberOfMeta', on_delete=models.CASCADE
)
meta_pool: 'models.ForeignKey["MetaPoolMember", MetaPool]' = models.ForeignKey(
meta_pool: 'models.ForeignKey[MetaPool]' = models.ForeignKey(
MetaPool, related_name='members', on_delete=models.CASCADE
)
priority = models.PositiveIntegerField(default=0)

View File

@ -290,7 +290,7 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
return self.service.isInMaintenance() if self.service else True
def isVisible(self) -> bool:
return self.visible
return self.visible # type: ignore
def isUsable(self) -> bool:
return (
@ -497,7 +497,7 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
"""
if (
self.activePublication() is None
and self.service.getType().publicationType is not None
and self.service.getType().publicationType is not None # type: ignore
):
raise InvalidServiceException()
@ -602,7 +602,7 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
for servicePool in query:
if (
typing.cast(typing.Any, servicePool).pubs_active
or servicePool.service.data_type in servicesNotNeedingPub
or servicePool.service.data_type in servicesNotNeedingPub # type: ignore
):
yield servicePool
@ -662,7 +662,7 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
"""
maxs = self.max_srvs
if maxs == 0:
maxs = self.service.getInstance().maxDeployed
maxs = self.service.getInstance().maxDeployed # type: ignore
if maxs <= 0:
return 0
@ -677,12 +677,12 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
return 100 * cachedValue // maxs
def testServer(self, host, port, timeout=4) -> bool:
return self.service.testServer(host, port, timeout)
return self.service.testServer(host, port, timeout) # type: ignore
# parent accessors
@property
def proxy(self) -> typing.Optional['Proxy']:
return self.service.proxy
return self.service.proxy # type: ignore
# Utility for logging
def log(self, message: str, level: int = log.INFO) -> None:

View File

@ -54,7 +54,7 @@ class ServicePoolGroup(UUIDModel):
name = models.CharField(max_length=128, default='', db_index=True, unique=True)
comments = models.CharField(max_length=256, default='')
priority = models.IntegerField(default=0, db_index=True)
image: 'models.ForeignKey[ServicePoolGroup, Image]' = models.ForeignKey(
image: 'models.ForeignKey[Image|None]' = models.ForeignKey(
Image,
null=True,
blank=True,
@ -75,7 +75,7 @@ class ServicePoolGroup(UUIDModel):
def __str__(self) -> str:
return 'Service Pool group {}({}): {}'.format(
self.name, self.comments, self.image.name
self.name, self.comments, self.image.name # type: ignore
)
@property

View File

@ -55,7 +55,7 @@ logger = logging.getLogger(__name__)
class ServicePoolPublicationChangelog(models.Model):
# This should be "servicePool"
publication: 'models.ForeignKey[ServicePoolPublicationChangelog, ServicePool]' = (
publication: 'models.ForeignKey[ServicePool]' = (
models.ForeignKey(
ServicePool, on_delete=models.CASCADE, related_name='changelog'
)
@ -86,7 +86,7 @@ class ServicePoolPublication(UUIDModel):
A deployed service publication keep track of data needed by services that needs "preparation". (i.e. Virtual machine --> base machine --> children of base machines)
"""
deployed_service: 'models.ForeignKey[ServicePoolPublication, ServicePool]' = (
deployed_service: 'models.ForeignKey[ServicePool]' = (
models.ForeignKey(
ServicePool, on_delete=models.CASCADE, related_name='publications'
)
@ -139,7 +139,7 @@ class ServicePoolPublication(UUIDModel):
Raises:
"""
serviceInstance = self.deployed_service.service.getInstance()
serviceInstance = self.deployed_service.service.getInstance() # type: ignore
osManager = self.deployed_service.osmanager
osManagerInstance = osManager.getInstance() if osManager else None

View File

@ -62,5 +62,4 @@ class Storage(models.Model):
def __str__(self) -> str:
return '{} {} > str= {}, {}'.format(
self.owner, self.key, self.data, '/'.join([self.attr1])
)
self.owner, self.key, self.data, self.attr1)

View File

@ -183,7 +183,7 @@ class UserService(UUIDModel): # pylint: disable=too-many-public-methods
"""
# We get the service instance, publication instance and osmanager instance
servicePool = self.deployed_service
serviceInstance = servicePool.service.getInstance()
serviceInstance = servicePool.service.getInstance() # type: ignore
if serviceInstance.needsManager is False or not servicePool.osmanager:
osmanagerInstance = None
else:
@ -360,7 +360,7 @@ class UserService(UUIDModel): # pylint: disable=too-many-public-methods
:note: This method MUST be invoked by transport before using credentials passed to getJavascript.
"""
servicePool = self.deployed_service
serviceInstance = servicePool.service.getInstance()
serviceInstance = servicePool.service.getInstance() # type: ignore
if serviceInstance.needsManager is False or not servicePool.osmanager:
return (username, password)
@ -565,7 +565,7 @@ class UserService(UUIDModel): # pylint: disable=too-many-public-methods
Returns True if this user service does not needs an publication, or if this deployed service publication is the current one
"""
return (
self.deployed_service.service.getType().publicationType is None
self.deployed_service.service.getType().publicationType is None # type: ignore
or self.publication == self.deployed_service.activePublication()
)

View File

@ -73,7 +73,7 @@ def getSqlDatetime() -> datetime:
else 'SELECT CURRENT_TIMESTAMP'
)
cursor.execute(sentence)
date = cursor.fetchone()[0]
date = (cursor.fetchone() or (datetime.now(),))[0]
else:
date = (
datetime.now()