1
0
mirror of https://github.com/dkmstr/openuds.git synced 2024-12-23 17:34:17 +03:00

More updates to type annotations

This commit is contained in:
Adolfo Gómez García 2023-12-04 01:07:56 +01:00
parent 695a91eb38
commit a6e65b62b0
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
118 changed files with 275 additions and 275 deletions

View File

@ -81,14 +81,14 @@ class Handler:
# For implementing help
# A list of pairs of (path, help) for subpaths on this handler
help_paths: typing.ClassVar[list[typing.Tuple[str, str]]] = []
help_paths: typing.ClassVar[list[tuple[str, str]]] = []
help_text: typing.ClassVar[str] = 'No help available'
_request: 'ExtendedHttpRequestWithUser' # It's a modified HttpRequest
_path: str
_operation: str
_params: dict[str, typing.Any] # This is a deserliazied object from request. Can be anything as 'a' or {'a': 1} or ....
_args: typing.Tuple[
_args: tuple[
str, ...
] # This are the "path" split by /, that is, the REST invocation arguments
_kwargs: dict
@ -199,7 +199,7 @@ class Handler:
return self._params
@property
def args(self) -> typing.Tuple[str, ...]:
def args(self) -> tuple[str, ...]:
"""
Returns the args object
"""

View File

@ -81,7 +81,7 @@ class Authenticators(ModelHandler):
{'tags': {'title': _('tags'), 'visible': False}},
]
def enum_types(self) -> typing.Iterable[type[auths.Authenticator]]:
def enum_types(self) -> collections.abc.Iterable[type[auths.Authenticator]]:
return auths.factory().providers().values()
def typeInfo(self, type_: type['Module']) -> dict[str, typing.Any]:

View File

@ -227,7 +227,7 @@ class Auths(Handler):
path = 'auth'
authenticated = False # By default, all handlers needs authentication
def auths(self) -> typing.Iterable[dict[str, typing.Any]]:
def auths(self) -> collections.abc.Iterable[dict[str, typing.Any]]:
paramAll: bool = self._params.get('all', 'false').lower() == 'true'
auth: Authenticator
for auth in Authenticator.objects.all():

View File

@ -166,7 +166,7 @@ class MetaAssignedService(DetailHandler):
parent = ensure.is_instance(parent, models.MetaPool)
def assignedUserServicesForPools() -> (
typing.Generator[
typing.Tuple[models.UserService, typing.Optional[dict[str, typing.Any]]], None, None
tuple[models.UserService, typing.Optional[dict[str, typing.Any]]], None, None
]
):
for m in parent.members.filter(enabled=True):

View File

@ -63,7 +63,7 @@ class MFA(ModelHandler):
{'tags': {'title': _('tags'), 'visible': False}},
]
def enum_types(self) -> typing.Iterable[type[mfas.MFA]]:
def enum_types(self) -> collections.abc.Iterable[type[mfas.MFA]]:
return mfas.factory().providers().values()
def getGui(self, type_: str) -> list[typing.Any]:

View File

@ -75,7 +75,7 @@ class Notifiers(ModelHandler):
{'tags': {'title': _('tags'), 'visible': False}},
]
def enum_types(self) -> typing.Iterable[type[messaging.Notifier]]:
def enum_types(self) -> collections.abc.Iterable[type[messaging.Notifier]]:
return messaging.factory().providers().values()
def getGui(self, type_: str) -> list[typing.Any]:

View File

@ -91,7 +91,7 @@ class OsManagers(ModelHandler):
)
# Types related
def enum_types(self) -> typing.Iterable[type[osmanagers.OSManager]]:
def enum_types(self) -> collections.abc.Iterable[type[osmanagers.OSManager]]:
return osmanagers.factory().providers().values()
# Gui related

View File

@ -78,7 +78,7 @@ class Permissions(Handler):
@staticmethod
def permsToDict(
perms: typing.Iterable[models.Permissions],
perms: collections.abc.Iterable[models.Permissions],
) -> list[dict[str, str]]:
res = []
entity: typing.Optional[typing.Union[models.User, models.Group]]

View File

@ -118,7 +118,7 @@ class Providers(ModelHandler):
raise RequestError(gettext('Can\'t delete providers with services'))
# Types related
def enum_types(self) -> typing.Iterable[type[services.ServiceProvider]]:
def enum_types(self) -> collections.abc.Iterable[type[services.ServiceProvider]]:
return services.factory().providers().values()
# Gui related

View File

@ -257,7 +257,7 @@ class Services(DetailHandler): # pylint: disable=too-many-public-methods
def getTypes(
self, parent: 'Model', forType: typing.Optional[str]
) -> typing.Iterable[dict[str, typing.Any]]:
) -> collections.abc.Iterable[dict[str, typing.Any]]:
parent = ensure.is_instance(parent, models.Provider)
logger.debug('getTypes parameters: %s, %s', parent, forType)
offers: list[dict[str, typing.Any]] = []
@ -288,7 +288,7 @@ class Services(DetailHandler): # pylint: disable=too-many-public-methods
return offers # Default is that details do not have types
def getGui(self, parent: 'Model', forType: str) -> typing.Iterable[typing.Any]:
def getGui(self, parent: 'Model', forType: str) -> collections.abc.Iterable[typing.Any]:
parent = ensure.is_instance(parent, models.Provider)
try:
logger.debug('getGui parameters: %s, %s', parent, forType)

View File

@ -625,7 +625,7 @@ class ServicesPools(ModelHandler):
# Returns the action list based on current element, for calendar
def actionsList(self, item: 'Model') -> typing.Any:
item = ensure.is_instance(item, ServicePool)
validActions: typing.Tuple[dict, ...] = ()
validActions: tuple[dict, ...] = ()
itemInfo = item.service.getType() # type: ignore
if itemInfo.usesCache is True:
validActions += (

View File

@ -81,7 +81,7 @@ class Transports(ModelHandler):
{'tags': {'title': _('tags'), 'visible': False}},
]
def enum_types(self) -> typing.Iterable[type[transports.Transport]]:
def enum_types(self) -> collections.abc.Iterable[type[transports.Transport]]:
return transports.factory().providers().values()
def getGui(self, type_: str) -> list[typing.Any]:

View File

@ -65,7 +65,7 @@ if typing.TYPE_CHECKING:
from uds.models import UserService
def getGroupsFromMeta(groups) -> typing.Iterable[Group]:
def getGroupsFromMeta(groups) -> collections.abc.Iterable[Group]:
for g in groups:
if g.is_meta:
for x in g.groups.all():
@ -86,7 +86,7 @@ class Users(DetailHandler):
parent = ensure.is_instance(parent, Authenticator)
# processes item to change uuid key for id
def uuid_to_id(iterable: typing.Iterable[typing.Any]): # will get values from a queryset
def uuid_to_id(iterable: collections.abc.Iterable[typing.Any]): # will get values from a queryset
for v in iterable:
v['id'] = v['uuid']
del v['uuid']

View File

@ -428,7 +428,7 @@ class DetailHandler(BaseModelHandler):
_parent: typing.Optional['ModelHandler']
_path: str
_params: typing.Any # _params is deserialized object from request
_args: typing.Tuple[str, ...]
_args: tuple[str, ...]
_kwargs: dict[str, typing.Any]
_user: 'User'
@ -654,7 +654,7 @@ class DetailHandler(BaseModelHandler):
"""
return {}
def getGui(self, parent: models.Model, forType: str) -> typing.Iterable[typing.Any]:
def getGui(self, parent: models.Model, forType: str) -> collections.abc.Iterable[typing.Any]:
"""
Gets the gui that is needed in order to "edit/add" new items on this detail
If not overriden, means that the detail has no edit/new Gui
@ -667,7 +667,7 @@ class DetailHandler(BaseModelHandler):
def getTypes(
self, parent: models.Model, forType: typing.Optional[str]
) -> typing.Iterable[dict[str, typing.Any]]:
) -> collections.abc.Iterable[dict[str, typing.Any]]:
"""
The default is that detail element will not have any types (they are "homogeneous")
but we provided this method, that can be overridden, in case one detail needs it
@ -721,7 +721,7 @@ class ModelHandler(BaseModelHandler):
# For example ('services', True) -- > .../id_parent/services
# ('services', False) --> ..../services
custom_methods: typing.ClassVar[
list[typing.Tuple[str, bool]]
list[tuple[str, bool]]
] = [] # If this model respond to "custom" methods, we will declare them here
# If this model has details, which ones
detail: typing.ClassVar[
@ -759,7 +759,7 @@ class ModelHandler(BaseModelHandler):
return self.item_as_dict(item)
# types related
def enum_types(self) -> typing.Iterable[type['Module']]: # override this
def enum_types(self) -> collections.abc.Iterable[type['Module']]: # override this
"""
Must be overriden by desdencents if they support types
Excpetcs the list of types that the handler supports

View File

@ -61,7 +61,7 @@ class ContentProcessor:
"""
mime_type: typing.ClassVar[str] = ''
extensions: typing.ClassVar[typing.Iterable[str]] = []
extensions: typing.ClassVar[collections.abc.Iterable[str]] = []
_request: 'HttpRequest'

View File

@ -234,11 +234,11 @@ class OAuth2Authenticator(auths.Authenticator):
return [cert.public_key() for cert in fields.getCertificatesFromField(self.publicKey)]
def _codeVerifierAndChallenge(self) -> typing.Tuple[str, str]:
def _codeVerifierAndChallenge(self) -> tuple[str, str]:
"""Generate a code verifier and a code challenge for PKCE
Returns:
typing.Tuple[str, str]: Code verifier and code challenge
tuple[str, str]: Code verifier and code challenge
"""
codeVerifier = ''.join(secrets.choice(PKCE_ALPHABET) for _ in range(128))
codeChallenge = (

View File

@ -132,7 +132,7 @@ class RadiusClient:
# Second element of return value is the mfa code from field
def authenticate(
self, username: str, password: str, mfaField: str = ''
) -> typing.Tuple[list[str], str, bytes]:
) -> tuple[list[str], str, bytes]:
reply = self.sendAccessRequest(username, password)
if reply.code not in (pyrad.packet.AccessAccept, pyrad.packet.AccessChallenge):
@ -145,7 +145,7 @@ class RadiusClient:
if 'Class' in reply:
groups = [
i[groupClassPrefixLen:].decode()
for i in typing.cast(typing.Iterable[bytes], reply['Class'])
for i in typing.cast(collections.abc.Iterable[bytes], reply['Class'])
if i.startswith(groupClassPrefix)
]
else:
@ -157,7 +157,7 @@ class RadiusClient:
if mfaField and mfaField in reply:
mfaCode = ''.join(
i[groupClassPrefixLen:].decode()
for i in typing.cast(typing.Iterable[bytes], reply['Class'])
for i in typing.cast(collections.abc.Iterable[bytes], reply['Class'])
if i.startswith(groupClassPrefix)
)
return (groups, mfaCode, typing.cast(list[bytes], reply.get('State') or [b''])[0])

View File

@ -564,7 +564,7 @@ class RegexLdap(auths.Authenticator):
groups = self.__getGroups(user)
groupsManager.validate(groups)
def searchUsers(self, pattern: str) -> typing.Iterable[dict[str, str]]:
def searchUsers(self, pattern: str) -> collections.abc.Iterable[dict[str, str]]:
try:
res = []
for r in ldaputil.getAsDict(

View File

@ -546,7 +546,7 @@ class SAMLAuthenticator(auths.Authenticator):
def getInfo(
self, parameters: collections.abc.Mapping[str, str]
) -> typing.Optional[typing.Tuple[str, typing.Optional[str]]]:
) -> typing.Optional[tuple[str, typing.Optional[str]]]:
"""
Althought this is mainly a get info callback, this can be used for any other purpuse we like.
In this case, we use it to provide logout callback also

View File

@ -134,7 +134,7 @@ class SampleAuth(auths.Authenticator):
if values and len(self.groups.value) < 2:
raise exceptions.validation.ValidationError(_('We need more than two groups!'))
def searchUsers(self, pattern: str) -> typing.Iterable[dict[str, str]]:
def searchUsers(self, pattern: str) -> collections.abc.Iterable[dict[str, str]]:
"""
Here we will receive a pattern for searching users.
@ -152,7 +152,7 @@ class SampleAuth(auths.Authenticator):
for a in range(1, 10)
]
def searchGroups(self, pattern: str) -> typing.Iterable[dict[str, str]]:
def searchGroups(self, pattern: str) -> collections.abc.Iterable[dict[str, str]]:
"""
Here we we will receive a patter for searching groups.

View File

@ -525,7 +525,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
raise exceptions.auth.AuthenticatorException(_('Username not found'))
groupsManager.validate(self.__getGroups(user))
def searchUsers(self, pattern: str) -> typing.Iterable[dict[str, str]]:
def searchUsers(self, pattern: str) -> collections.abc.Iterable[dict[str, str]]:
try:
res = []
for r in ldaputil.getAsDict(
@ -547,7 +547,7 @@ class SimpleLDAPAuthenticator(auths.Authenticator):
logger.exception("Exception: ")
raise exceptions.auth.AuthenticatorException(_('Too many results, be more specific')) from e
def searchGroups(self, pattern: str) -> typing.Iterable[dict[str, str]]:
def searchGroups(self, pattern: str) -> collections.abc.Iterable[dict[str, str]]:
try:
res = []
for r in ldaputil.getAsDict(

View File

@ -261,7 +261,7 @@ class Authenticator(Module):
"""
return cls.authenticate is not Authenticator.authenticate
def searchUsers(self, pattern: str) -> typing.Iterable[dict[str, str]]:
def searchUsers(self, pattern: str) -> collections.abc.Iterable[dict[str, str]]:
"""
If you provide this method, the user will be allowed to search users,
that is, the search button at administration interface, at user form,
@ -281,7 +281,7 @@ class Authenticator(Module):
"""
return []
def searchGroups(self, pattern: str) -> typing.Iterable[dict[str, str]]:
def searchGroups(self, pattern: str) -> collections.abc.Iterable[dict[str, str]]:
"""
Returns an array of groups that match the supplied pattern
If none found, returns empty array. Items returned are BaseGroups (or derived)
@ -572,7 +572,7 @@ class Authenticator(Module):
def getInfo(
self, parameters: collections.abc.Mapping[str, str]
) -> typing.Optional[typing.Tuple[str, typing.Optional[str]]]:
) -> typing.Optional[tuple[str, typing.Optional[str]]]:
"""
This method is invoked whenever the authinfo url is invoked, with the name of the authenticator
If this is implemented, information returned by this will be shown via web.

View File

@ -175,7 +175,7 @@ class GroupsManager:
return None
def validate(self, groupName: typing.Union[str, typing.Iterable[str]]) -> None:
def validate(self, groupName: typing.Union[str, collections.abc.Iterable[str]]) -> None:
"""Validates that the group (or groups) groupName passed in is valid for this group manager.
It check that the group specified is known by this group manager.

View File

@ -37,17 +37,17 @@ import collections.abc
from uds.core import types
KNOWN_OS_LIST: typing.Final[typing.Tuple[types.os.KnownOS, ...]] = tuple(
KNOWN_OS_LIST: typing.Final[tuple[types.os.KnownOS, ...]] = tuple(
os for os in types.os.KnownOS if os != types.os.KnownOS.UNKNOWN
)
ALL_OS_LIST: typing.Final[typing.Tuple[types.os.KnownOS, ...]] = KNOWN_OS_LIST + (types.os.KnownOS.UNKNOWN,)
desktopOss: typing.Final[typing.Tuple[types.os.KnownOS, ...]] = (
ALL_OS_LIST: typing.Final[tuple[types.os.KnownOS, ...]] = KNOWN_OS_LIST + (types.os.KnownOS.UNKNOWN,)
desktopOss: typing.Final[tuple[types.os.KnownOS, ...]] = (
types.os.KnownOS.LINUX,
types.os.KnownOS.WINDOWS,
types.os.KnownOS.MAC_OS,
)
MOBILE_OS_LIST: typing.Final[typing.Tuple[types.os.KnownOS, ...]] = tuple(set(ALL_OS_LIST) - set(desktopOss))
MOBILE_OS_LIST: typing.Final[tuple[types.os.KnownOS, ...]] = tuple(set(ALL_OS_LIST) - set(desktopOss))
DEFAULT_OS: typing.Final[types.os.KnownOS] = types.os.KnownOS.WINDOWS
@ -55,7 +55,7 @@ DEFAULT_OS: typing.Final[types.os.KnownOS] = types.os.KnownOS.WINDOWS
knownBrowsers = tuple(types.os.KnownBrowser)
browsersREs: dict[types.os.KnownBrowser, typing.Tuple] = {
browsersREs: dict[types.os.KnownBrowser, tuple] = {
types.os.KnownBrowser.FIREFOX: (re.compile(r'Firefox/([0-9.]+)'),),
types.os.KnownBrowser.SEAMONKEY: (re.compile(r'Seamonkey/([0-9.]+)'),),
types.os.KnownBrowser.CHROME: (re.compile(r'Chrome/([0-9.]+)'),),
@ -72,7 +72,7 @@ browsersREs: dict[types.os.KnownBrowser, typing.Tuple] = {
types.os.KnownBrowser.EDGE: (re.compile(r'Edg/([0-9.]+)'),),
}
browserRules: dict[types.os.KnownBrowser, typing.Tuple] = {
browserRules: dict[types.os.KnownBrowser, tuple] = {
types.os.KnownBrowser.EDGE: (types.os.KnownBrowser.EDGE, ()),
types.os.KnownBrowser.CHROME: (
types.os.KnownBrowser.CHROME,

View File

@ -107,13 +107,13 @@ class ServerManager(metaclass=singleton.Singleton):
def getServerStats(
self, serversFltr: 'QuerySet[models.Server]'
) -> list[typing.Tuple[typing.Optional['types.servers.ServerStats'], 'models.Server']]:
) -> list[tuple[typing.Optional['types.servers.ServerStats'], 'models.Server']]:
"""
Returns a list of stats for a list of servers
"""
# Paralelize stats retrieval
retrievedStats: list[
typing.Tuple[typing.Optional['types.servers.ServerStats'], 'models.Server']
tuple[typing.Optional['types.servers.ServerStats'], 'models.Server']
] = []
def _retrieveStats(server: 'models.Server') -> None:
@ -140,11 +140,11 @@ class ServerManager(metaclass=singleton.Singleton):
now: datetime.datetime,
minMemoryMB: int = 0,
excludeServersUUids: typing.Optional[typing.Set[str]] = None,
) -> typing.Tuple['models.Server', 'types.servers.ServerStats']:
) -> tuple['models.Server', 'types.servers.ServerStats']:
"""
Finds the best server for a service
"""
best: typing.Optional[typing.Tuple['models.Server', 'types.servers.ServerStats']] = None
best: typing.Optional[tuple['models.Server', 'types.servers.ServerStats']] = None
unmanaged_list: list['models.Server'] = []
fltrs = serverGroup.servers.filter(maintenance_mode=False)
fltrs = fltrs.filter(Q(locked_until=None) | Q(locked_until__lte=now)) # Only unlocked servers

View File

@ -45,7 +45,7 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
FLDS_EQUIV: collections.abc.Mapping[str, typing.Iterable[str]] = {
FLDS_EQUIV: collections.abc.Mapping[str, collections.abc.Iterable[str]] = {
'fld1': ('username', 'platform', 'duration'),
'fld2': ('source', 'srcip', 'browser', 'sent'),
'fld3': ('destination', 'dstip', 'received'),
@ -136,14 +136,14 @@ class StatsManager(metaclass=singleton.Singleton):
self,
ownerType: int,
counterType: int,
ownerIds: typing.Union[typing.Iterable[int], int, None],
ownerIds: typing.Union[collections.abc.Iterable[int], int, None],
since: datetime.datetime,
to: datetime.datetime,
interval: typing.Optional[int],
max_intervals: typing.Optional[int],
limit: typing.Optional[int],
use_max: bool = False,
) -> typing.Iterable:
) -> collections.abc.Iterable:
"""
Retrieves counters from item
@ -297,8 +297,8 @@ class StatsManager(metaclass=singleton.Singleton):
def getEvents(
self,
ownerType: typing.Union[int, typing.Iterable[int]],
eventType: typing.Union[int, typing.Iterable[int]],
ownerType: typing.Union[int, collections.abc.Iterable[int]],
eventType: typing.Union[int, collections.abc.Iterable[int]],
**kwargs
) -> 'models.QuerySet[StatsEvents]':
"""

View File

@ -716,7 +716,7 @@ class UserServiceManager(metaclass=singleton.Singleton):
idTransport: typing.Optional[str],
doTest: bool = True,
clientHostname: typing.Optional[str] = None,
) -> typing.Tuple[
) -> tuple[
typing.Optional[str],
UserService,
typing.Optional['services.UserService'],
@ -895,7 +895,7 @@ class UserServiceManager(metaclass=singleton.Singleton):
idMetaPool: str,
idTransport: str,
clientHostName: typing.Optional[str] = None,
) -> typing.Tuple[
) -> tuple[
typing.Optional[str],
UserService,
typing.Optional['services.UserService'],
@ -914,7 +914,7 @@ class UserServiceManager(metaclass=singleton.Singleton):
# Get pool members. Just pools "visible" and "usable"
poolMembers = [p for p in meta.members.all() if p.pool.isVisible() and p.pool.isUsable()]
# Sort pools array. List of tuples with (priority, pool)
sortPools: list[typing.Tuple[int, ServicePool]]
sortPools: list[tuple[int, ServicePool]]
# Sort pools based on meta selection
if meta.policy == types.pools.LoadBalancingPolicy.PRIORITY:
sortPools = [(p.priority, p.pool) for p in poolMembers]
@ -947,12 +947,12 @@ class UserServiceManager(metaclass=singleton.Singleton):
logger.debug('Pools: %s/%s', pools, poolsFull)
usable: typing.Optional[typing.Tuple[ServicePool, Transport]] = None
usable: typing.Optional[tuple[ServicePool, Transport]] = None
# Now, Lets find first if there is one assigned in ANY pool
def ensureTransport(
pool: ServicePool,
) -> typing.Optional[typing.Tuple[ServicePool, Transport]]:
) -> typing.Optional[tuple[ServicePool, Transport]]:
found = None
t: Transport
if idTransport == 'meta': # Autoselected:

View File

@ -51,7 +51,7 @@ class MessageProcessorThread(BaseThread):
keepRunning: bool = True
_cached_providers: typing.Optional[
list[typing.Tuple[int, NotificationProviderModule]]
list[tuple[int, NotificationProviderModule]]
]
_cached_stamp: float
@ -62,7 +62,7 @@ class MessageProcessorThread(BaseThread):
self._cached_stamp = 0.0
@property
def providers(self) -> list[typing.Tuple[int, NotificationProviderModule]]:
def providers(self) -> list[tuple[int, NotificationProviderModule]]:
# If _cached_providers is invalid or _cached_time is older than CACHE_TIMEOUT,
# we need to refresh it
if (

View File

@ -66,7 +66,7 @@ class LoginAllowed(enum.StrEnum):
def checkAction(
action: 'LoginAllowed|str',
request: 'ExtendedHttpRequest',
networks: typing.Optional[typing.Iterable[str]] = None,
networks: typing.Optional[collections.abc.Iterable[str]] = None,
) -> bool:
def checkIp() -> bool:
if networks is None:
@ -210,7 +210,7 @@ class MFA(Module):
def _getData(
self, request: 'ExtendedHttpRequest', userId: str
) -> typing.Optional[typing.Tuple[datetime.datetime, str]]:
) -> typing.Optional[tuple[datetime.datetime, str]]:
"""
Internal method to get the data from storage
"""

View File

@ -193,7 +193,7 @@ class OSManager(Module):
userService: 'UserService', # pylint: disable=unused-argument
username: str,
password: str,
) -> typing.Tuple[str, str]:
) -> tuple[str, str]:
"""
This will be invoked prior to passsing username/password to Transport.

View File

@ -49,7 +49,7 @@ logger = logging.getLogger(__name__)
def barChart(
size: typing.Tuple[float, float, int],
size: tuple[float, float, int],
data: collections.abc.Mapping[str, typing.Any],
output: io.BytesIO,
) -> None:
@ -108,7 +108,7 @@ def barChart(
def lineChart(
size: typing.Tuple[float, float, int],
size: tuple[float, float, int],
data: collections.abc.Mapping[str, typing.Any],
output: io.BytesIO,
) -> None:
@ -165,7 +165,7 @@ def lineChart(
def surfaceChart(
size: typing.Tuple[float, float, int],
size: tuple[float, float, int],
data: collections.abc.Mapping[str, typing.Any],
output: io.BytesIO,
) -> None:

View File

@ -88,7 +88,7 @@ class ServiceProviderFactory(factory.ModuleFactory[ServiceProvider]):
super().insert(type_)
def servicesThatDoNotNeedPublication(self) -> typing.Iterable[type[Service]]:
def servicesThatDoNotNeedPublication(self) -> collections.abc.Iterable[type[Service]]:
"""
Returns a list of all service providers registered that do not need
to be published

View File

@ -194,7 +194,7 @@ class Service(Module):
# : Restricted transports
# : If this list contains anything else but emtpy, the only allowed protocol for transports
# : will be the ones listed here (on implementation, ofc)
allowedProtocols: typing.Iterable = protocols.GENERIC_VDI
allowedProtocols: collections.abc.Iterable = protocols.GENERIC_VDI
# : If this services "spawns" a new copy on every execution (that is, does not "reuse" the previous opened session)
# : Default behavior is False (and most common), but some services may need to respawn a new "copy" on every launch
@ -301,7 +301,7 @@ class Service(Module):
# Keep untouched if maxServices is not present
def requestServicesForAssignation(self, **kwargs) -> typing.Iterable['UserService']:
def requestServicesForAssignation(self, **kwargs) -> collections.abc.Iterable['UserService']:
"""
override this if mustAssignManualy is True
@params kwargs: Named arguments
@ -329,13 +329,13 @@ class Service(Module):
"""
return typing.cast('UniqueNameGenerator', self.idGenerators('name'))
def listAssignables(self) -> typing.Iterable[typing.Tuple[str, str]]:
def listAssignables(self) -> collections.abc.Iterable[tuple[str, str]]:
"""
If overrided, will provide list of assignables elements, so we can "add" an element manually to the list of assigned user services
If not overriden, means that it cannot assign manually
Returns:
list[typing.Tuple[str, str]] -- List of asignables services, first element is id, second is name of the element
list[tuple[str, str]] -- List of asignables services, first element is id, second is name of the element
"""
return []
@ -368,23 +368,23 @@ class Service(Module):
"""
return None
def getVappLauncher(self, userService: 'models.UserService') -> typing.Optional[typing.Tuple[str, str]]:
def getVappLauncher(self, userService: 'models.UserService') -> typing.Optional[tuple[str, str]]:
"""Returns the vapp launcher for this service, if any
Args:
userService (UserService): User service to get the vapp launcher from
Returns:
typing.Optional[typing.Tuple[str, str]]: A tuple with the vapp launcher name and the vapp launcher path on server
typing.Optional[tuple[str, str]]: A tuple with the vapp launcher name and the vapp launcher path on server
"""
return None
def getValidId(self, idsList: typing.Iterable[str]) -> typing.Optional[str]:
def getValidId(self, idsList: collections.abc.Iterable[str]) -> typing.Optional[str]:
"""
Looks for an "owned" id in the provided list. If found, returns it, else return None
Args:
idsList (typing.Iterable[str]): List of IPs and MACs that acts as
idsList (collections.abc.Iterable[str]): List of IPs and MACs that acts as
Returns:
typing.Optional[str]: [description]

View File

@ -584,7 +584,7 @@ class UserService(Environmentable, Serializable):
base method does nothing
"""
def getConnectionData(self) -> typing.Optional[typing.Tuple[str, str, str]]:
def getConnectionData(self) -> typing.Optional[tuple[str, str, str]]:
"""
This method is only invoked on some user deployments that needs to provide
Credentials based on deployment itself

View File

@ -90,7 +90,7 @@ class Transport(Module):
# Windows
# Macintosh
# Linux
supportedOss: typing.Tuple = consts.os.desktopOss # Supported operating systems
supportedOss: tuple = consts.os.desktopOss # Supported operating systems
# If this transport is visible via Web, via Thin Client or both
webTransport: bool = False
@ -154,7 +154,7 @@ class Transport(Module):
return f'Not accessible (using service ip {ip})'
@classmethod
def supportsProtocol(cls, protocol: typing.Union[typing.Iterable, str]):
def supportsProtocol(cls, protocol: typing.Union[collections.abc.Iterable, str]):
if isinstance(protocol, str):
return protocol.lower() == cls.protocol.lower()
# Not string group of strings

View File

@ -45,7 +45,7 @@ class LoadBalancingPolicy(enum.IntEnum):
return self.name.lower()
@staticmethod
def enumerate() -> list[typing.Tuple[int, str]]:
def enumerate() -> list[tuple[int, str]]:
return [
(LoadBalancingPolicy.ROUND_ROBIN, _('Evenly distributed')),
(LoadBalancingPolicy.PRIORITY, _('Priority')),
@ -62,7 +62,7 @@ class TransportSelectionPolicy(enum.IntEnum):
return self.name.lower()
@staticmethod
def enumerate() -> list[typing.Tuple[int, str]]:
def enumerate() -> list[tuple[int, str]]:
return [
(TransportSelectionPolicy.AUTO, _('Automatic selection')),
(TransportSelectionPolicy.COMMON, _('Use only common transports')),
@ -78,7 +78,7 @@ class HighAvailabilityPolicy(enum.IntEnum):
return str(self)
@staticmethod
def enumerate() -> list[typing.Tuple[int, str]]:
def enumerate() -> list[tuple[int, str]]:
return [
(HighAvailabilityPolicy.DISABLED, _('Disabled')),
(HighAvailabilityPolicy.ENABLED, _('Enabled')),

View File

@ -53,7 +53,7 @@ class CommonPrefs:
BYPASS_PREF = 'bypassPluginDetection'
@staticmethod
def getWidthHeight(size: str) -> typing.Tuple[int, int]:
def getWidthHeight(size: str) -> tuple[int, int]:
"""
Get width based on screenSizePref value
"""

View File

@ -58,7 +58,7 @@ class ServerType(enum.IntEnum):
}[self]
@staticmethod
def enumerate() -> list[typing.Tuple[int, str]]:
def enumerate() -> list[tuple[int, str]]:
return [
(ServerType.TUNNEL, _('Tunnel')),
(ServerType.ACTOR, _('Actor')),
@ -75,7 +75,7 @@ class ServerSubtype(metaclass=singleton.Singleton):
managed: bool
icon: str
registered: dict[typing.Tuple[ServerType, str], Info]
registered: dict[tuple[ServerType, str], Info]
def __init__(self) -> None:
self.registered = {}
@ -98,7 +98,7 @@ class ServerSubtype(metaclass=singleton.Singleton):
type=type, subtype=subtype, description=description, managed=managed, icon=icon
)
def enum(self) -> typing.Iterable[Info]:
def enum(self) -> collections.abc.Iterable[Info]:
return self.registered.values()
def get(self, type: ServerType, subtype: str) -> typing.Optional[Info]:
@ -118,7 +118,7 @@ class ServerStats(typing.NamedTuple):
memtotal: int = 0 # In bytes
cpuused: float = 0 # 0-1 (cpu usage)
uptime: int = 0 # In seconds
disks: list[typing.Tuple[str, int, int]] = [] # List of tuples (mountpoint, used, total)
disks: list[tuple[str, int, int]] = [] # List of tuples (mountpoint, used, total)
connections: int = 0 # Number of connections
current_users: int = 0 # Number of current users
stamp: float = 0 # Timestamp of this stats
@ -192,7 +192,7 @@ class ServerStats(typing.NamedTuple):
dct = {k: v for k, v in data.items()} # Make a copy
dct.update(kwargs) # and update with kwargs
disks: list[typing.Tuple[str, int, int]] = []
disks: list[tuple[str, int, int]] = []
for disk in dct.get('disks', []):
disks.append((disk['mountpoint'], disk['used'], disk['total']))
return ServerStats(
@ -226,7 +226,7 @@ class ServerCounter(typing.NamedTuple):
counter: int
@staticmethod
def fromIterable(data: typing.Optional[typing.Iterable]) -> typing.Optional['ServerCounter']:
def fromIterable(data: typing.Optional[collections.abc.Iterable]) -> typing.Optional['ServerCounter']:
if data is None:
return None
return ServerCounter(*data)

View File

@ -115,7 +115,7 @@ class ChoiceItem(typing.TypedDict):
text: str
ChoicesType = typing.Union[collections.abc.Callable[[], typing.Iterable[ChoiceItem]], typing.Iterable[ChoiceItem]]
ChoicesType = typing.Union[collections.abc.Callable[[], collections.abc.Iterable[ChoiceItem]], collections.abc.Iterable[ChoiceItem]]
@dataclasses.dataclass

View File

@ -144,7 +144,7 @@ class gui:
def convertToChoices(
vals: typing.Union[
collections.abc.Callable[[], list['types.ui.ChoiceItem']],
typing.Iterable[typing.Union[str, types.ui.ChoiceItem]],
collections.abc.Iterable[typing.Union[str, types.ui.ChoiceItem]],
dict[str, str],
None,
]
@ -191,7 +191,7 @@ class gui:
return {'id': str(id_), 'text': str(text), 'img': img}
@staticmethod
def sortedChoices(choices: typing.Iterable):
def sortedChoices(choices: collections.abc.Iterable):
return sorted(choices, key=lambda item: item['text'].lower())
@staticmethod
@ -579,7 +579,7 @@ class gui:
value: typing.Optional[str] = None,
choices: typing.Union[
collections.abc.Callable[[], list['types.ui.ChoiceItem']],
typing.Iterable[typing.Union[str, types.ui.ChoiceItem]],
collections.abc.Iterable[typing.Union[str, types.ui.ChoiceItem]],
dict[str, str],
None,
] = None,
@ -599,7 +599,7 @@ class gui:
self.type = types.ui.FieldType.TEXT_AUTOCOMPLETE
self._fieldsInfo.choices = gui.convertToChoices(choices or [])
def setChoices(self, values: typing.Iterable[typing.Union[str, types.ui.ChoiceItem]]):
def setChoices(self, values: collections.abc.Iterable[typing.Union[str, types.ui.ChoiceItem]]):
"""
Set the values for this choice field
"""
@ -1028,7 +1028,7 @@ class gui:
required: typing.Optional[bool] = None,
choices: typing.Union[
collections.abc.Callable[[], list['types.ui.ChoiceItem']],
typing.Iterable[typing.Union[str, types.ui.ChoiceItem]],
collections.abc.Iterable[typing.Union[str, types.ui.ChoiceItem]],
dict[str, str],
None,
] = None,
@ -1061,7 +1061,7 @@ class gui:
if fills['callbackName'] not in gui.callbacks:
gui.callbacks[fills['callbackName']] = fnc
def setChoices(self, values: typing.Iterable[typing.Union[str, types.ui.ChoiceItem]]):
def setChoices(self, values: collections.abc.Iterable[typing.Union[str, types.ui.ChoiceItem]]):
"""
Set the values for this choice field
"""
@ -1077,7 +1077,7 @@ class gui:
required: typing.Optional[bool] = None,
choices: typing.Union[
collections.abc.Callable[[], list['types.ui.ChoiceItem']],
typing.Iterable[typing.Union[str, types.ui.ChoiceItem]],
collections.abc.Iterable[typing.Union[str, types.ui.ChoiceItem]],
dict[str, str],
None,
] = None,
@ -1099,7 +1099,7 @@ class gui:
self._fieldsInfo.choices = gui.convertToChoices(choices or [])
def setChoices(self, values: typing.Iterable[typing.Union[str, types.ui.ChoiceItem]]):
def setChoices(self, values: collections.abc.Iterable[typing.Union[str, types.ui.ChoiceItem]]):
"""
Set the values for this choice field
"""
@ -1149,7 +1149,7 @@ class gui:
required: typing.Optional[bool] = None,
choices: typing.Union[
collections.abc.Callable[[], list['types.ui.ChoiceItem']],
typing.Iterable[typing.Union[str, types.ui.ChoiceItem]],
collections.abc.Iterable[typing.Union[str, types.ui.ChoiceItem]],
dict[str, str],
None,
] = None,
@ -1157,7 +1157,7 @@ class gui:
default: typing.Union[
collections.abc.Callable[[], str], collections.abc.Callable[[], list[str]], list[str], str, None
] = None,
value: typing.Optional[typing.Iterable[str]] = None,
value: typing.Optional[collections.abc.Iterable[str]] = None,
):
super().__init__(
label=label,
@ -1174,7 +1174,7 @@ class gui:
self._fieldsInfo.rows = rows
self._fieldsInfo.choices = gui.convertToChoices(choices or [])
def setChoices(self, choices: typing.Iterable[typing.Union[str, types.ui.ChoiceItem]]):
def setChoices(self, choices: collections.abc.Iterable[typing.Union[str, types.ui.ChoiceItem]]):
"""
Set the values for this choice field
"""
@ -1218,7 +1218,7 @@ class gui:
default: typing.Union[
collections.abc.Callable[[], str], collections.abc.Callable[[], list[str]], list[str], str, None
] = None,
value: typing.Optional[typing.Iterable[str]] = None,
value: typing.Optional[collections.abc.Iterable[str]] = None,
) -> None:
super().__init__(
label=label,
@ -1261,7 +1261,7 @@ class UserInterfaceType(type):
def __new__(
mcs: type['UserInterfaceType'],
classname: str,
bases: typing.Tuple[type, ...],
bases: tuple[type, ...],
namespace: dict[str, typing.Any],
) -> 'UserInterfaceType':
newClassDict = {}

View File

@ -42,7 +42,7 @@ from uds.core.managers.crypto import CryptoManager
logger = logging.getLogger(__name__)
_saveLater: list[typing.Tuple['Config.Value', typing.Any]] = []
_saveLater: list[tuple['Config.Value', typing.Any]] = []
_getLater: list['Config.Value'] = []
# For custom params (for choices mainly)
@ -92,7 +92,7 @@ class Config:
return Config.SectionType(Config.SectionType.OTHER)
@staticmethod
def values() -> typing.Iterable['Config.SectionType']:
def values() -> collections.abc.Iterable['Config.SectionType']:
return Config.SectionType
class Section:
@ -296,7 +296,7 @@ class Config:
return Config.Value(section, key, default, crypt, longText, **kwargs)
@staticmethod
def enumerate() -> typing.Iterable['Config.Value']:
def enumerate() -> collections.abc.Iterable['Config.Value']:
GlobalConfig.initialize() # Ensures DB contains all values
for cfg in DBConfig.objects.all().order_by('key'): # @UndefinedVariable
# Skip sections with name starting with "__" (not to be editted on configuration)

View File

@ -186,8 +186,8 @@ def ensureConnected(func: collections.abc.Callable[..., RT]) -> collections.abc.
def cached(
cachePrefix: str,
cacheTimeout: typing.Union[collections.abc.Callable[[], int], int] = -1,
cachingArgs: typing.Optional[typing.Union[typing.Iterable[int], int]] = None,
cachingKWArgs: typing.Optional[typing.Union[typing.Iterable[str], str]] = None,
cachingArgs: typing.Optional[typing.Union[collections.abc.Iterable[int], int]] = None,
cachingKWArgs: typing.Optional[typing.Union[collections.abc.Iterable[str], str]] = None,
cachingKeyFnc: typing.Optional[collections.abc.Callable[[typing.Any], str]] = None,
) -> collections.abc.Callable[[collections.abc.Callable[..., RT]], collections.abc.Callable[..., RT]]:
"""Decorator that give us a "quick& clean" caching feature on db.

View File

@ -49,7 +49,7 @@ def is_iterable(obj: typing.Any) -> typing.Generator[typing.Any, None, None]:
"""Returns an iterable object from a single object or a list of objects
Args:
obj (typing.Union[T, typing.Iterable[T]]): object to be converted to iterable
obj (typing.Union[T, collections.abc.Iterable[T]]): object to be converted to iterable
Returns:
typing.Generator[T, None, None]: Iterable object
@ -74,7 +74,7 @@ def is_instance(obj: typing.Any, cls: type[T]) -> T:
"""Checks if an object is an instance of a class or a list of instances of a class
Args:
obj (typing.Union[T, typing.Iterable[T]]): object to be checked
obj (typing.Union[T, collections.abc.Iterable[T]]): object to be checked
cls (type[T]): Class to check
Returns:

View File

@ -52,7 +52,7 @@ logger = logging.getLogger(__name__)
def _serverGroupValues(
types_: typing.Iterable[types.servers.ServerType], subtype: typing.Optional[str] = None
types_: collections.abc.Iterable[types.servers.ServerType], subtype: typing.Optional[str] = None
) -> list[types.ui.ChoiceItem]:
fltr = models.ServerGroup.objects.filter(
functools.reduce(lambda x, y: x | y, [Q(type=type_) for type_ in types_])

View File

@ -1218,7 +1218,7 @@ class FUSE:
return self.operations.lock(self._decode_optional_path(path), fh, cmd, lock)
def utimens(self, path: bytes, buf: typing.Any) -> int:
times: typing.Optional[typing.Tuple[int, int]] = None
times: typing.Optional[tuple[int, int]] = None
times = (
(
time_of_timespec(buf.contents.actime),
@ -1380,7 +1380,7 @@ class Operations:
def readdir(
self, path: str, fh: typing.Any
) -> typing.Union[
list[str], list[typing.Tuple[str, dict[str, int], int]]
list[str], list[tuple[str, dict[str, int], int]]
]:
'''
Can return either a list of names, or a list of (name, attrs, offset)
@ -1437,7 +1437,7 @@ class Operations:
raise FuseOSError(errno.EROFS)
def utimens(
self, path, times: typing.Optional[typing.Tuple[float, float]] = None
self, path, times: typing.Optional[tuple[float, float]] = None
) -> int:
'Times is a (atime, mtime) tuple. If None use current time.'

View File

@ -197,7 +197,7 @@ def getAsDict(
con: 'LDAPObject',
base: str,
ldapFilter: str,
attrList: typing.Optional[typing.Iterable[str]] = None,
attrList: typing.Optional[collections.abc.Iterable[str]] = None,
sizeLimit: int = 100,
scope: typing.Any = SCOPE_SUBTREE,
) -> typing.Generator[LDAPResultType, None, None]:
@ -255,7 +255,7 @@ def getFirst(
objectClass: str,
field: str,
value: str,
attributes: typing.Optional[typing.Iterable[str]] = None,
attributes: typing.Optional[collections.abc.Iterable[str]] = None,
sizeLimit: int = 50,
) -> typing.Optional[LDAPResultType]:
"""

View File

@ -109,12 +109,12 @@ class LogLevel(enum.IntEnum):
# Return all Log levels as tuples of (level value, level name)
@staticmethod
def all() -> list[typing.Tuple[int, str]]:
def all() -> list[tuple[int, str]]:
return [(level.value, level.name) for level in LogLevel]
# Rteturns "interesting" log levels
@staticmethod
def interesting() -> list[typing.Tuple[int, str]]:
def interesting() -> list[tuple[int, str]]:
return [(level.value, level.name) for level in LogLevel if level.value > LogLevel.INFO.value]

View File

@ -147,7 +147,7 @@ def dynamicLoadAndRegisterPackages(
checkFnc = checker or (lambda x: True)
def process(classes: typing.Iterable[typing.Type]) -> None:
def process(classes: collections.abc.Iterable[typing.Type]) -> None:
cls: type[V]
for cls in classes:
clsSubCls = cls.__subclasses__()

View File

@ -110,7 +110,7 @@ class PropertyAccessor:
def values(self) -> typing.Iterator[typing.Any]:
return iter(self._filter().values_list('value', flat=True))
def items(self) -> typing.Iterator[typing.Tuple[str, typing.Any]]:
def items(self) -> typing.Iterator[tuple[str, typing.Any]]:
return iter(self._filter().values_list('key', 'value'))
def clear(self) -> None:
@ -137,12 +137,12 @@ class PropertyAccessor:
class PropertiesMixin:
"""Mixin to add properties to a model"""
def ownerIdAndType(self) -> typing.Tuple[str, str]:
def ownerIdAndType(self) -> tuple[str, str]:
"""Returns the owner id and type of this object
The owner id and type is used to identify the owner in the properties table
Returns:
typing.Tuple[str, str]: Owner id and type
tuple[str, str]: Owner id and type
"""
# Default implementation does not provide any owner id or type
return '', self.__class__.__name__

View File

@ -49,9 +49,9 @@ T = typing.TypeVar('T', bound=typing.Any)
# callback(sample, arg_2, argument)
# And the literals will be ignored
def match(
arg_list: typing.Iterable[str],
arg_list: collections.abc.Iterable[str],
error: collections.abc.Callable[..., typing.Any],
*args: typing.Tuple[typing.Tuple[str, ...], collections.abc.Callable[..., T]],
*args: tuple[tuple[str, ...], collections.abc.Callable[..., T]],
) -> typing.Any:
"""
Matches a list of arguments against a list of matchers.

View File

@ -35,7 +35,7 @@ except Exception: # nosec: simple check for disabling warnings,
pass
def selfSignedCert(ip: str) -> typing.Tuple[str, str, str]:
def selfSignedCert(ip: str) -> tuple[str, str, str]:
"""
Generates a self signed certificate for the given ip.
This method is mainly intended to be used for generating/saving Actor certificates.

View File

@ -74,16 +74,16 @@ def _get_Id(obj):
return obj.id if obj.id != -1 else None
def _get_P_S_Ids(provider) -> typing.Tuple:
def _get_P_S_Ids(provider) -> tuple:
return tuple(i.id for i in provider.services.all())
def _get_S_DS_Ids(service) -> typing.Tuple:
def _get_S_DS_Ids(service) -> tuple:
return tuple(i.id for i in service.deployedServices.all())
def _get_P_S_DS_Ids(provider) -> typing.Tuple:
res: typing.Tuple = ()
def _get_P_S_DS_Ids(provider) -> tuple:
res: tuple = ()
for i in provider.services.all():
res += _get_S_DS_Ids(i)
return res
@ -111,7 +111,7 @@ idRetriever: collections.abc.Mapping[
},
}
counterTypes: collections.abc.Mapping[int, typing.Tuple[type[Model], ...]] = {
counterTypes: collections.abc.Mapping[int, tuple[type[Model], ...]] = {
CT_LOAD: (Provider,),
CT_STORAGE: (Service,),
CT_ASSIGNED: (ServicePool,),
@ -174,7 +174,7 @@ def addCounter(
def getCounters(
obj: CounterClass, counterType: int, **kwargs
) -> typing.Generator[typing.Tuple[datetime.datetime, int], None, None]:
) -> typing.Generator[tuple[datetime.datetime, int], None, None]:
"""
Get counters

View File

@ -41,7 +41,7 @@ from uds.models import Provider, Service, ServicePool, Authenticator, OSManager
logger = logging.getLogger(__name__)
# EventTupleType = typing.Tuple[datetime.datetime, str, str, str, str, int]
# EventTupleType = tuple[datetime.datetime, str, str, str, str, int]
if typing.TYPE_CHECKING:
from django.db import models

View File

@ -62,12 +62,12 @@ def _encodeValue(key: str, value: typing.Any, compat: bool = False) -> str:
return base64.b64encode(pickle.dumps(value)).decode()
def _decodeValue(dbk: str, value: typing.Optional[str]) -> typing.Tuple[str, typing.Any]:
def _decodeValue(dbk: str, value: typing.Optional[str]) -> tuple[str, typing.Any]:
if value:
try:
v = pickle.loads(base64.b64decode(value.encode())) # nosec: This is e controled pickle loading
if isinstance(v, tuple) and v[0] == MARK:
return typing.cast(typing.Tuple[str, typing.Any], v[1:])
return typing.cast(tuple[str, typing.Any], v[1:])
# Fix value so it contains also the "key" (in this case, the original key is lost, we have only the hash value...)
return ('#' + dbk, v)
except Exception:
@ -312,8 +312,8 @@ class Storage:
except Exception:
return None
def remove(self, skey: typing.Union[typing.Iterable[typing.Union[str, bytes]], str, bytes]) -> None:
keys: typing.Iterable[typing.Union[str, bytes]] = [skey] if isinstance(skey, (str, bytes)) else skey
def remove(self, skey: typing.Union[collections.abc.Iterable[typing.Union[str, bytes]], str, bytes]) -> None:
keys: collections.abc.Iterable[typing.Union[str, bytes]] = [skey] if isinstance(skey, (str, bytes)) else skey
try:
# Process several keys at once
DBStorage.objects.filter(key__in=[self.getKey(k) for k in keys]).delete()
@ -342,7 +342,7 @@ class Storage:
) -> StorageAccess:
return StorageAccess(self._owner, group=group, atomic=atomic, compat=compat)
def locateByAttr1(self, attr1: typing.Union[typing.Iterable[str], str]) -> typing.Iterable[bytes]:
def locateByAttr1(self, attr1: typing.Union[collections.abc.Iterable[str], str]) -> collections.abc.Iterable[bytes]:
if isinstance(attr1, str):
query = DBStorage.objects.filter(owner=self._owner, attr1=attr1) # @UndefinedVariable
else:
@ -353,7 +353,7 @@ class Storage:
def filter(
self, attr1: typing.Optional[str] = None, forUpdate: bool = False
) -> typing.Iterable[typing.Tuple[str, bytes, 'str|None']]:
) -> collections.abc.Iterable[tuple[str, bytes, 'str|None']]:
if attr1 is None:
query = DBStorage.objects.filter(owner=self._owner) # @UndefinedVariable
else:
@ -367,7 +367,7 @@ class Storage:
def filterPickle(
self, attr1: typing.Optional[str] = None, forUpdate: bool = False
) -> typing.Iterable[typing.Tuple[str, typing.Any, 'str|None']]:
) -> collections.abc.Iterable[tuple[str, typing.Any, 'str|None']]:
for v in self.filter(attr1, forUpdate):
yield (v[0], pickle.loads(v[1]), v[2]) # nosec: secure pickle load

View File

@ -236,7 +236,7 @@ def validateHost(host: str) -> str:
return validateFqdn(host)
def validateHostPortPair(hostPortPair: str) -> typing.Tuple[str, int]:
def validateHostPortPair(hostPortPair: str) -> tuple[str, int]:
"""
Validates that a host:port pair is valid
:param hostPortPair: host:port pair to validate

View File

@ -69,7 +69,7 @@ class PublicationCleaner(Job):
friendly_name = 'Publication Cleaner'
def run(self) -> None:
removables: typing.Iterable[
removables: collections.abc.Iterable[
ServicePoolPublication
] = ServicePoolPublication.objects.filter(
state=State.REMOVABLE,

View File

@ -84,7 +84,7 @@ class DeployedServiceRemover(Job):
for pub in publishing:
pub.cancel()
# Now all publishments are canceling, let's try to cancel cache and assigned
uServices: typing.Iterable[UserService] = servicePool.userServices.filter(
uServices: collections.abc.Iterable[UserService] = servicePool.userServices.filter(
state=State.PREPARING
)
for userService in uServices:
@ -109,7 +109,7 @@ class DeployedServiceRemover(Job):
try:
# Now all publications are canceling, let's try to cancel cache and assigned also
uServices: typing.Iterable[UserService] = servicePool.userServices.filter(
uServices: collections.abc.Iterable[UserService] = servicePool.userServices.filter(
state=State.PREPARING
)
for userService in uServices:
@ -168,7 +168,7 @@ class DeployedServiceRemover(Job):
def run(self) -> None:
# First check if there is someone in "removable" estate
removableServicePools: typing.Iterable[
removableServicePools: collections.abc.Iterable[
ServicePool
] = ServicePool.objects.filter(state=State.REMOVABLE).order_by('state_date')[
:10
@ -186,7 +186,7 @@ class DeployedServiceRemover(Job):
except Exception as e2:
logger.error('Could not delete %s', e2)
removingServicePools: typing.Iterable[ServicePool] = ServicePool.objects.filter(
removingServicePools: collections.abc.Iterable[ServicePool] = ServicePool.objects.filter(
state=State.REMOVING
).order_by('state_date')[:10]
# Check if they have been removing for a long time.

View File

@ -73,11 +73,11 @@ class ServiceCacheUpdater(Job):
def servicesPoolsNeedingCacheUpdate(
self,
) -> list[typing.Tuple[ServicePool, int, int, int]]:
) -> list[tuple[ServicePool, int, int, int]]:
# State filter for cached and inAssigned objects
# First we get all deployed services that could need cache generation
# We start filtering out the deployed services that do not need caching at all.
servicePoolsNeedingCaching: typing.Iterable[ServicePool] = (
servicePoolsNeedingCaching: collections.abc.Iterable[ServicePool] = (
ServicePool.objects.filter(Q(initial_srvs__gte=0) | Q(cache_l1_srvs__gte=0))
.filter(
max_srvs__gt=0,
@ -88,7 +88,7 @@ class ServiceCacheUpdater(Job):
)
# We will get the one that proportionally needs more cache
servicesPools: list[typing.Tuple[ServicePool, int, int, int]] = []
servicesPools: list[tuple[ServicePool, int, int, int]] = []
for servicePool in servicePoolsNeedingCaching:
servicePool.userServices.update() # Cleans cached queries
# If this deployedService don't have a publication active and needs it, ignore it

View File

@ -56,7 +56,7 @@ class DeployedServiceStatsCollector(Job):
def run(self) -> None:
logger.debug('Starting Deployed service stats collector')
servicePoolsToCheck: typing.Iterable[
servicePoolsToCheck: collections.abc.Iterable[
models.ServicePool
] = models.ServicePool.objects.filter(state=State.ACTIVE).iterator()
stamp = model.getSqlDatetime()

View File

@ -83,7 +83,7 @@ class StuckCleaner(Job):
)
# Info states are removed on UserServiceCleaner and VALID_STATES are ok, or if "hanged", checked on "HangedCleaner"
def stuckUserServices(servicePool: ServicePool) -> typing.Iterable[UserService]:
def stuckUserServices(servicePool: ServicePool) -> collections.abc.Iterable[UserService]:
q = servicePool.userServices.filter(state_date__lt=since_state)
# Get all that are not in valid or info states, AND the ones that are "PREPARING" with
# "destroy_after" property set (exists) (that means that are waiting to be destroyed after initializations)

View File

@ -85,7 +85,7 @@ class UserServiceRemover(Job):
removeFrom = getSqlDatetime() - timedelta(
seconds=10
) # We keep at least 10 seconds the machine before removing it, so we avoid connections errors
removableUserServices: typing.Iterable[
removableUserServices: collections.abc.Iterable[
UserService
] = UserService.objects.filter(
state=State.REMOVABLE,

View File

@ -252,7 +252,7 @@ class Command(BaseCommand):
VALID_ENTITIES: collections.abc.Mapping[str, collections.abc.Callable[[], str]]
verbose: bool = True
filter_args: list[typing.Tuple[str, str]] = []
filter_args: list[tuple[str, str]] = []
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@ -357,7 +357,7 @@ class Command(BaseCommand):
)
return typing.cast('typing.Iterator[ModelType]', model.objects.all().iterator()) # type: ignore
def output_count(self, message: str, iterable: typing.Iterable[T]) -> typing.Iterable[T]:
def output_count(self, message: str, iterable: collections.abc.Iterable[T]) -> collections.abc.Iterable[T]:
"""
Outputs the count of an iterable
"""

View File

@ -205,7 +205,7 @@ class Command(BaseCommand):
}
# get calendar access
calendarAccess = {}
calendarAccess: dict[str, typing.Any] = {}
for ca in models.CalendarAccess.objects.filter(service_pool=servicePool):
calendarAccess[ca.calendar.name] = ca.access

View File

@ -82,7 +82,7 @@ class StatsFS(types.UDSFSInterface):
)
# Dictionary containing a mapping between a relative day and the corresponding
# today start timestamp + first element of tuple, today start timestamp + second element of tuple
_interval: typing.ClassVar[collections.abc.Mapping[str, typing.Tuple[datetime.timedelta, datetime.timedelta]]] = {
_interval: typing.ClassVar[collections.abc.Mapping[str, tuple[datetime.timedelta, datetime.timedelta]]] = {
'today': (
datetime.timedelta(days=0),
datetime.timedelta(days=1),
@ -101,7 +101,7 @@ class StatsFS(types.UDSFSInterface):
),
}
_dispatchers: collections.abc.Mapping[str, typing.Tuple[DispatcherType, bool]]
_dispatchers: collections.abc.Mapping[str, tuple[DispatcherType, bool]]
_cache: typing.ClassVar[Cache] = Cache('fsevents')
def __init__(self) -> None:
@ -115,7 +115,7 @@ class StatsFS(types.UDSFSInterface):
# Splits the filename and returns a tuple with "dispatcher", "interval", "extension"
def getFilenameComponents(
self, filename: list[str]
) -> typing.Tuple[DispatcherType, StatInterval, str]:
) -> tuple[DispatcherType, StatInterval, str]:
if len(filename) != 1:
raise FileNotFoundError()

View File

@ -124,17 +124,17 @@ class TOTP_MFA(mfas.MFA):
def label(self) -> str:
return gettext('Authentication Code')
def _userData(self, userId: str) -> typing.Tuple[str, bool]:
def _userData(self, userId: str) -> tuple[str, bool]:
# Get data from storage related to this user
# Data contains the secret and if the user has already logged in already some time
# so we show the QR code only once
data: typing.Optional[typing.Tuple[str, bool]] = self.storage.getPickle(userId)
data: typing.Optional[tuple[str, bool]] = self.storage.getPickle(userId)
if data is None:
data = (pyotp.random_base32(), False)
self._saveUserData(userId, data)
return data
def _saveUserData(self, userId: str, data: typing.Tuple[str, bool]) -> None:
def _saveUserData(self, userId: str, data: tuple[str, bool]) -> None:
self.storage.putPickle(userId, data)
def _removeUserData(self, userId: str) -> None:

View File

@ -69,7 +69,7 @@ def migrate(
# Clean up servers, removing empty ones
servers = [s.strip() for s in servers if s.strip()]
# Try dns lookup if servers contains hostnames
server_ip_hostname: list[typing.Tuple[str, str]] = []
server_ip_hostname: list[tuple[str, str]] = []
for server in servers:
try:
validators.validateIpv4OrIpv6(server)

View File

@ -240,7 +240,7 @@ class Authenticator(ManagedObjectModel, TaggingMixin):
return Authenticator.objects.all().order_by('priority')
@staticmethod
def getByTag(tag=None) -> typing.Iterable['Authenticator']:
def getByTag(tag=None) -> collections.abc.Iterable['Authenticator']:
"""
Gets authenticator by tag name.
Special tag name "disabled" is used to exclude customAuth

View File

@ -388,7 +388,7 @@ class CalendarAction(UUIDModel):
except Exception:
self.service_pool.log('Scheduled action not executed because group is not available anymore')
actions: collections.abc.Mapping[str, typing.Tuple[collections.abc.Callable[[], None], bool]] = {
actions: collections.abc.Mapping[str, tuple[collections.abc.Callable[[], None], bool]] = {
# Id, actions (lambda), saveServicePool (bool)
CALENDAR_ACTION_CACHE_L1['id']: (set_l1_cache, True),
CALENDAR_ACTION_CACHE_L2['id']: (set_l2_cache, True),

View File

@ -51,7 +51,7 @@ WEEKDAYS: typing.Final[str] = 'WEEKDAYS'
NEVER: typing.Final[str] = 'NEVER'
# Frequencies
freqs: typing.Tuple[typing.Tuple[str, str], ...] = (
freqs: tuple[tuple[str, str], ...] = (
('YEARLY', typing.cast(str, _('Yearly'))),
('MONTHLY', typing.cast(str, _('Monthly'))),
('WEEKLY', typing.cast(str, _('Weekly'))),
@ -76,7 +76,7 @@ frq_to_mins: collections.abc.Mapping[str, int] = {
'NEVER': 1000* 1000 * 24 * 60,
}
dunits: typing.Tuple[typing.Tuple[str, str], ...] = (
dunits: tuple[tuple[str, str], ...] = (
('MINUTES', _('Minutes')),
('HOURS', _('Hours')),
('DAYS', _('Days')),
@ -90,7 +90,7 @@ dunit_to_mins: collections.abc.Mapping[str, int] = {
'WEEKS': 60 * 24 * 7,
}
weekdays: typing.Tuple[rules.weekday, ...] = (
weekdays: tuple[rules.weekday, ...] = (
rules.SU,
rules.MO,
rules.TU,

View File

@ -89,7 +89,7 @@ class Image(UUIDModel):
app_label = 'uds'
@staticmethod
def resizeAndConvert(image: PIL.Image.Image, size: typing.Tuple[int, int]) -> typing.Tuple[int, int, bytes]:
def resizeAndConvert(image: PIL.Image.Image, size: tuple[int, int]) -> tuple[int, int, bytes]:
"""
Resizes an image to the given size
"""
@ -99,7 +99,7 @@ class Image(UUIDModel):
return (image.width, image.height, output.getvalue())
@staticmethod
def prepareForDb(data: bytes) -> typing.Tuple[int, int, bytes]:
def prepareForDb(data: bytes) -> tuple[int, int, bytes]:
try:
stream = io.BytesIO(data)
image = PIL.Image.open(stream)
@ -178,7 +178,7 @@ class Image(UUIDModel):
self.thumb = consts.images.DEFAULT_THUMB
@property
def size(self) -> typing.Tuple[int, int]:
def size(self) -> tuple[int, int]:
"""
Returns the image size
"""

View File

@ -211,7 +211,7 @@ class MetaPool(UUIDModel, TaggingMixin): # type: ignore
@staticmethod
def getForGroups(
groups: typing.Iterable['Group'], user: typing.Optional['User'] = None
groups: collections.abc.Iterable['Group'], user: typing.Optional['User'] = None
) -> 'QuerySet[MetaPool]':
"""
Return deployed services with publications for the groups requested.

View File

@ -99,7 +99,7 @@ class Network(UUIDModel, TaggingMixin): # type: ignore
return int(number, 16)
@staticmethod
def networksFor(ip: str) -> typing.Iterable['Network']:
def networksFor(ip: str) -> collections.abc.Iterable['Network']:
"""
Returns the networks that are valid for specified ip in dotted quad (xxx.xxx.xxx.xxx)
"""

View File

@ -142,7 +142,7 @@ class Permissions(UUIDModel):
object_type: 'objtype.ObjectType',
object_id: typing.Optional[int] = None,
user: typing.Optional['User'] = None,
groups: typing.Optional[typing.Iterable['Group']] = None,
groups: typing.Optional[collections.abc.Iterable['Group']] = None,
) -> PermissionType:
"""
Retrieves the permission for a given object

View File

@ -95,7 +95,7 @@ class Provider(ManagedObjectModel, TaggingMixin): # type: ignore
return self.maintenance_mode
@staticmethod
def typeFilter(type_: str) -> typing.Iterable['Provider']:
def typeFilter(type_: str) -> collections.abc.Iterable['Provider']:
for i in Provider.objects.filter(data_type=type):
yield i

View File

@ -83,7 +83,7 @@ class ServerGroup(UUIDModel, TaggingMixin, properties.PropertiesMixin):
servers: 'models.manager.RelatedManager[Server]'
# For properties
def ownerIdAndType(self) -> typing.Tuple[str, str]:
def ownerIdAndType(self) -> tuple[str, str]:
return self.uuid, 'servergroup'
class Meta:
@ -209,7 +209,7 @@ class Server(UUIDModel, TaggingMixin, properties.PropertiesMixin):
app_label = 'uds'
# For properties
def ownerIdAndType(self) -> typing.Tuple[str, str]:
def ownerIdAndType(self) -> tuple[str, str]:
return self.uuid, 'server'
@property
@ -314,7 +314,7 @@ class Server(UUIDModel, TaggingMixin, properties.PropertiesMixin):
@staticmethod
def validateToken(
token: str,
serverType: typing.Union[typing.Iterable[types.servers.ServerType], types.servers.ServerType],
serverType: typing.Union[collections.abc.Iterable[types.servers.ServerType], types.servers.ServerType],
request: typing.Optional[ExtendedHttpRequest] = None,
) -> bool:
"""Ensures that a token is valid for a server type

View File

@ -185,7 +185,7 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
return self.osmanager.getType().transformsUserOrPasswordForService()
return False
def processUserPassword(self, username: str, password: str) -> typing.Tuple[str, str]:
def processUserPassword(self, username: str, password: str) -> tuple[str, str]:
"""
This method is provided for consistency between UserService and ServicePool
There is no posibility to check the username and password that a user will use to
@ -454,7 +454,7 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
cache_level=0, state=states.userService.USABLE, in_use=False
).update(state=states.userService.REMOVABLE, state_date=now)
def validateGroups(self, groups: typing.Iterable['Group']) -> None:
def validateGroups(self, groups: collections.abc.Iterable['Group']) -> None:
"""
Ensures that at least a group of groups (database groups) has access to this Service Pool
raise an InvalidUserException if fails check
@ -507,8 +507,8 @@ class ServicePool(UUIDModel, TaggingMixin): # type: ignore
@staticmethod
def getDeployedServicesForGroups(
groups: typing.Iterable['Group'], user: typing.Optional['User'] = None
) -> typing.Iterable['ServicePool']:
groups: collections.abc.Iterable['Group'], user: typing.Optional['User'] = None
) -> collections.abc.Iterable['ServicePool']:
"""
Return deployed services with publications for the groups requested.

View File

@ -64,8 +64,8 @@ class StatsCounters(models.Model):
@staticmethod
def get_grouped(
owner_type: typing.Union[int, typing.Iterable[int]], counter_type: int, **kwargs
) -> typing.Generator[typing.Tuple[int, int], None, None]:
owner_type: typing.Union[int, collections.abc.Iterable[int]], counter_type: int, **kwargs
) -> typing.Generator[tuple[int, int], None, None]:
"""
Returns a QuerySet of counters grouped by owner_type and counter_type
"""

View File

@ -69,8 +69,8 @@ class StatsEvents(models.Model):
@staticmethod
def get_stats(
owner_type: typing.Union[int, typing.Iterable[int]],
event_type: typing.Union[int, typing.Iterable[int]],
owner_type: typing.Union[int, collections.abc.Iterable[int]],
event_type: typing.Union[int, collections.abc.Iterable[int]],
**kwargs,
) -> 'models.QuerySet[StatsEvents]':
"""

View File

@ -249,7 +249,7 @@ class TicketStore(UUIDModel):
@staticmethod
def get_for_tunnel(
ticket: str,
) -> typing.Tuple[
) -> tuple[
'User',
'UserService',
typing.Optional[str],

View File

@ -100,7 +100,7 @@ class User(UUIDModel, properties.PropertiesMixin):
]
# For properties
def ownerIdAndType(self) -> typing.Tuple[str, str]:
def ownerIdAndType(self) -> tuple[str, str]:
return self.uuid, 'user'
def getUsernameForAuth(self) -> str:

View File

@ -125,7 +125,7 @@ class UserService(UUIDModel, properties.PropertiesMixin):
]
# For properties
def ownerIdAndType(self) -> typing.Tuple[str, str]:
def ownerIdAndType(self) -> tuple[str, str]:
return self.uuid, 'userservice'
@property
@ -358,7 +358,7 @@ class UserService(UUIDModel, properties.PropertiesMixin):
"""
return self.deployed_service.transformsUserOrPasswordForService()
def processUserPassword(self, username: str, password: str) -> typing.Tuple[str, str]:
def processUserPassword(self, username: str, password: str) -> tuple[str, str]:
"""
Before accessing a service by a transport, we can request
the service to "transform" the username & password that the transport

View File

@ -82,7 +82,7 @@ class Telegram:
def sendMessage(self, chat_id: int, text: str) -> dict[str, typing.Any]:
return self.request('sendMessage', {'chat_id': chat_id, 'text': text})
def getUpdates(self, offset: int = 0, timeout: int = 0) -> typing.Iterable[Message]:
def getUpdates(self, offset: int = 0, timeout: int = 0) -> collections.abc.Iterable[Message]:
self.lastOffset = offset or self.lastOffset
res = self.request('getUpdates', {'offset': self.lastOffset, 'timeout': timeout}, stream=True)
if res['ok'] and res['result']: # if ok and there are results

View File

@ -87,7 +87,7 @@ class LinuxRandomPassManager(LinuxOsManager):
def processUserPassword(
self, userService: 'UserService', username: str, password: str
) -> typing.Tuple[str, str]:
) -> tuple[str, str]:
if username == self._userAccount:
return (username, userService.recoverValue('linOsRandomPass'))
return username, password

View File

@ -167,7 +167,7 @@ class WindowsOsManager(osmanagers.OSManager):
def processUserPassword(
self, userService: 'UserService', username: str, password: str
) -> typing.Tuple[str, str]:
) -> tuple[str, str]:
if userService.properties.get('sso_available') == '1':
# Generate a ticket, store it and return username with no password
domain = ''

View File

@ -176,7 +176,7 @@ class WinDomainOsManager(WindowsOsManager):
if self._ou.lower().find(lpath) == -1:
self._ou += ',' + lpath
def __getServerList(self) -> typing.Iterable[typing.Tuple[str, int]]:
def __getServerList(self) -> collections.abc.Iterable[tuple[str, int]]:
if self._serverHint != '':
yield (self._serverHint, 389)
@ -194,7 +194,7 @@ class WinDomainOsManager(WindowsOsManager):
yield (str(server.target)[:-1], server.port)
def __connectLdap(
self, servers: typing.Optional[typing.Iterable[typing.Tuple[str, int]]] = None
self, servers: typing.Optional[collections.abc.Iterable[tuple[str, int]]] = None
) -> typing.Any:
"""
Tries to connect to LDAP

View File

@ -98,7 +98,7 @@ class WinRandomPassManager(WindowsOsManager):
def processUserPassword(
self, userService: 'UserService', username: str, password: str
) -> typing.Tuple[str, str]:
) -> tuple[str, str]:
if username == self._userAccount:
password = userService.recoverValue('winOsRandomPass')

View File

@ -119,7 +119,7 @@ class ReportAuto(Report, metaclass=ReportAutoType):
fields.source_field_data(self.getModel(), self.source)
logger.debug('Source field data: %s', self.source)
def getModelItems(self) -> typing.Iterable[ReportAutoModel]:
def getModelItems(self) -> collections.abc.Iterable[ReportAutoModel]:
model = self.getModel()
filters = [self.source.value] if isinstance(self.source, gui.ChoiceField) else self.source.value
@ -134,8 +134,8 @@ class ReportAuto(Report, metaclass=ReportAutoType):
def getIntervalInHours(self):
return {'hour': 1, 'day': 24, 'week': 24 * 7, 'month': 24 * 30}[self.interval.value]
def getIntervalsList(self) -> list[typing.Tuple[datetime.datetime, datetime.datetime]]:
intervals: list[typing.Tuple[datetime.datetime, datetime.datetime]] = []
def getIntervalsList(self) -> list[tuple[datetime.datetime, datetime.datetime]]:
intervals: list[tuple[datetime.datetime, datetime.datetime]] = []
# Convert start and end dates to datetime objects from date objects
start = datetime.datetime.combine(self.startingDate(), datetime.time.min)
to = datetime.datetime.combine(self.endingDate(), datetime.time.max)

View File

@ -78,7 +78,7 @@ class ListReportAuditCSV(ListReport):
uuid = 'b5f5ebc8-44e9-11ed-97a9-efa619da6a49'
# Generator of data
def genData(self) -> typing.Generator[typing.Tuple, None, None]:
def genData(self) -> typing.Generator[tuple, None, None]:
# Xtract user method, response_code and request from data
# the format is "user: [method/response_code] request"
rx = re.compile(

View File

@ -86,7 +86,7 @@ class UsageSummaryByUsersPool(StatsReport):
def getPoolData(
self, pool
) -> typing.Tuple[list[dict[str, typing.Any]], str]:
) -> tuple[list[dict[str, typing.Any]], str]:
start = self.startDate.stamp()
end = self.endDate.stamp()
logger.debug(self.pool.value)
@ -139,7 +139,7 @@ class UsageSummaryByUsersPool(StatsReport):
return data, pool.name
def getData(self) -> typing.Tuple[list[dict[str, typing.Any]], str]:
def getData(self) -> tuple[list[dict[str, typing.Any]], str]:
return self.getPoolData(ServicePool.objects.get(uuid=self.pool.value))
def generate(self) -> bytes:

View File

@ -103,13 +103,13 @@ class PoolPerformanceReport(StatsReport):
]
self.pools.setChoices(vals)
def getPools(self) -> typing.Iterable[typing.Tuple[str, str]]:
def getPools(self) -> collections.abc.Iterable[tuple[str, str]]:
for p in ServicePool.objects.filter(uuid__in=self.pools.value):
yield (str(p.id), p.name)
def getRangeData(
self,
) -> typing.Tuple[str, list, list]: # pylint: disable=too-many-locals
) -> tuple[str, list, list]: # pylint: disable=too-many-locals
start = self.startDate.stamp()
end = self.endDate.stamp()
if self.samplingPoints.num() < 2:
@ -125,7 +125,7 @@ class PoolPerformanceReport(StatsReport):
else:
xLabelFormat = 'SHORT_DATETIME_FORMAT'
samplingIntervals: list[typing.Tuple[int, int]] = []
samplingIntervals: list[tuple[int, int]] = []
samplingIntervalSeconds = (end - start) / samplingPoints
for i in range(samplingPoints):
samplingIntervals.append((int(start + i * samplingIntervalSeconds), int(start + (i + 1) * samplingIntervalSeconds)))

View File

@ -58,7 +58,7 @@ class PoolsUsageSummary(UsageByPool):
def processedData(
self,
) -> typing.Tuple[
) -> tuple[
typing.ValuesView[collections.abc.MutableMapping[str, typing.Any]], int, int, int
]:
orig, poolNames = super().getData() # pylint: disable=unused-variable # Keep name for reference

View File

@ -88,7 +88,7 @@ class UsageByPool(StatsReport):
]
self.pool.setChoices(vals)
def getData(self) -> typing.Tuple[list[dict[str, typing.Any]], str]:
def getData(self) -> tuple[list[dict[str, typing.Any]], str]:
# Generate the sampling intervals and get dataUsers from db
start = self.startDate.stamp()
end = self.endDate.stamp()

View File

@ -93,7 +93,7 @@ class StatsReportLogin(StatsReport):
def initGui(self):
pass
def getRangeData(self) -> typing.Tuple[str, list, list]:
def getRangeData(self) -> tuple[str, list, list]:
start = self.startDate.stamp()
end = self.endDate.stamp()
if self.samplingPoints.num() < 2:
@ -109,7 +109,7 @@ class StatsReportLogin(StatsReport):
else:
xLabelFormat = 'SHORT_DATETIME_FORMAT'
samplingIntervals: list[typing.Tuple[int, int]] = []
samplingIntervals: list[tuple[int, int]] = []
samplingIntervalSeconds = (end - start) / samplingPoints
for i in range(samplingPoints):
samplingIntervals.append(

View File

@ -149,7 +149,7 @@ class Client:
finally:
lock.release()
def isFullyFunctionalVersion(self) -> typing.Tuple[bool, str]:
def isFullyFunctionalVersion(self) -> tuple[bool, str]:
"""
'4.0 version is always functional (right now...)
"""
@ -183,7 +183,7 @@ class Client:
api = self.__getApi()
vms: typing.Iterable[typing.Any] = api.system_service().vms_service().list() # type: ignore
vms: collections.abc.Iterable[typing.Any] = api.system_service().vms_service().list() # type: ignore
logger.debug('oVirt VMS: %s', vms)
@ -353,7 +353,7 @@ class Client:
d: typing.Any = datacenter_service.get() # type: ignore
storage = []
for dd in typing.cast(typing.Iterable, datacenter_service.storage_domains_service().list()): # type: ignore
for dd in typing.cast(collections.abc.Iterable, datacenter_service.storage_domains_service().list()): # type: ignore
try:
active = dd.status.value
except Exception:
@ -835,10 +835,10 @@ class Client:
cert_subject = display.certificate.subject
else:
for i in typing.cast(
typing.Iterable, api.system_service().hosts_service().list()
collections.abc.Iterable, api.system_service().hosts_service().list()
):
for k in typing.cast(
typing.Iterable,
collections.abc.Iterable,
api.system_service()
.hosts_service()
.service(i.id)

View File

@ -285,7 +285,7 @@ if sys.platform == 'win32':
self._queue = [opCreate, opChangeMac, opStart, opWait, opSuspend, opFinish]
def __checkMachineState(
self, chkState: typing.Union[list[str], typing.Tuple[str, ...], str]
self, chkState: typing.Union[list[str], tuple[str, ...], str]
) -> str:
logger.debug(
'Checking that state of machine %s (%s) is %s',

View File

@ -67,11 +67,11 @@ def checkResultRaw(lst: typing.Any) -> str:
return str(lst[1])
def checkResult(lst: typing.Any) -> typing.Tuple[collections.abc.Mapping[str, typing.Any], str]:
def checkResult(lst: typing.Any) -> tuple[collections.abc.Mapping[str, typing.Any], str]:
return xml2dict.parse(checkResultRaw(lst)), lst[1]
def asIterable(element: RT) -> typing.Iterable[RT]:
def asIterable(element: RT) -> collections.abc.Iterable[RT]:
if isinstance(element, (tuple, list)):
return element
return (element,)
@ -112,7 +112,7 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
self.connection = xmlrpc.client.ServerProxy(self.endpoint)
@ensureConnected
def enumStorage(self, storageType: int = 0) -> typing.Iterable[types.StorageType]:
def enumStorage(self, storageType: int = 0) -> collections.abc.Iterable[types.StorageType]:
sstorageType = str(storageType) # Ensure it is an string
# Invoke datastore pools info, no parameters except connection string
result, _ = checkResult(
@ -125,7 +125,7 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
)
@ensureConnected
def enumTemplates(self) -> typing.Iterable[types.TemplateType]:
def enumTemplates(self) -> collections.abc.Iterable[types.TemplateType]:
"""
Invoke templates pools info, with this parameters:
1.- Session string
@ -145,7 +145,7 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
pass
@ensureConnected
def enumImages(self) -> typing.Iterable[types.ImageType]:
def enumImages(self) -> collections.abc.Iterable[types.ImageType]:
"""
Invoke images pools info, with this parameters:
1.- Session string
@ -316,7 +316,7 @@ class OpenNebulaClient: # pylint: disable=too-many-public-methods
)
@ensureConnected
def enumVMs(self) -> typing.Iterable[types.VirtualMachineType]:
def enumVMs(self) -> collections.abc.Iterable[types.VirtualMachineType]:
"""
Invoke vm pools info, with this parameters:
1.- Session string

View File

@ -43,7 +43,7 @@ logger = logging.getLogger(__name__)
def enumerateDatastores(
api: 'client.OpenNebulaClient', datastoreType: int = 0
) -> typing.Iterable['types.StorageType']:
) -> collections.abc.Iterable['types.StorageType']:
"""
0 seems to be images datastore
"""

View File

@ -49,7 +49,7 @@ logger = logging.getLogger(__name__)
def getTemplates(
api: 'client.OpenNebulaClient', force: bool = False
) -> typing.Iterable[types.TemplateType]:
) -> collections.abc.Iterable[types.TemplateType]:
for t in api.enumTemplates():
if t.name[:4] != 'UDSP':
yield t

View File

@ -179,7 +179,7 @@ def removeMachine(api: 'client.OpenNebulaClient', machineId: str) -> None:
def enumerateMachines(
api: 'client.OpenNebulaClient',
) -> typing.Iterable[types.VirtualMachineType]:
) -> collections.abc.Iterable[types.VirtualMachineType]:
'''
Obtains the list of machines inside OpenNebula.
Machines starting with UDS are filtered out
@ -201,7 +201,7 @@ def getNetInfo(
api: 'client.OpenNebulaClient',
machineId: str,
networkId: typing.Optional[str] = None,
) -> typing.Tuple[str, str]:
) -> tuple[str, str]:
'''
Get the MAC and the IP for the network and machine. If network is None, for the first network
'''

View File

@ -204,12 +204,12 @@ class OpenNebulaProvider(ServiceProvider): # pylint: disable=too-many-public-me
def getDatastores(
self, datastoreType: int = 0
) -> typing.Iterable[on.types.StorageType]:
) -> collections.abc.Iterable[on.types.StorageType]:
yield from on.storage.enumerateDatastores(self.api, datastoreType)
def getTemplates(
self, force: bool = False
) -> typing.Iterable[on.types.TemplateType]:
) -> collections.abc.Iterable[on.types.TemplateType]:
yield from on.template.getTemplates(self.api, force)
def makeTemplate(self, fromTemplateId: str, name, toDataStore: str) -> str:
@ -308,7 +308,7 @@ class OpenNebulaProvider(ServiceProvider): # pylint: disable=too-many-public-me
def getNetInfo(
self, machineId: str, networkId: typing.Optional[str] = None
) -> typing.Tuple[str, str]:
) -> tuple[str, str]:
'''
Changes the mac address of first nic of the machine to the one specified
'''

Some files were not shown because too many files have changed in this diff Show More