1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-08 21:18:00 +03:00

more typing checking

This commit is contained in:
Adolfo Gómez García 2019-06-14 14:08:36 +02:00
parent a5929a0af3
commit 70ec9ab3ea
8 changed files with 125 additions and 111 deletions

View File

@ -166,7 +166,7 @@ class RegexLdap(auths.Authenticator):
if pattern.find('(') == -1:
pattern = '(' + pattern + ')'
val = attributes.get(attr, [])
if type(val) is not list: # May we have a single value
if not isinstance(val, list): # May we have a single value
val = [val]
logger.debug('Pattern: %s', pattern)
@ -183,7 +183,7 @@ class RegexLdap(auths.Authenticator):
logger.debug('Res: %s', res)
return res
def valuesDict(self):
def valuesDict(self) -> gui.ValuesDictType:
return {
'host': self._host, 'port': self._port, 'ssl': gui.boolToStr(self._ssl),
'username': self._username, 'password': self._password, 'timeout': self._timeout,
@ -198,7 +198,7 @@ class RegexLdap(auths.Authenticator):
self._userNameAttr, self._altClass
)
def marshal(self):
def marshal(self) -> bytes:
return '\t'.join([
'v3',
self._host, self._port, gui.boolToStr(self._ssl), self._username, self._password,
@ -206,27 +206,27 @@ class RegexLdap(auths.Authenticator):
self._groupNameAttr, self._userNameAttr, self._altClass
]).encode('utf8')
def unmarshal(self, val):
data = val.decode('utf8').split('\t')
if data[0] == 'v1':
logger.debug("Data: {0}".format(data[1:]))
def unmarshal(self, data: bytes) -> None:
vals = data.decode('utf8').split('\t')
if vals[0] == 'v1':
logger.debug("Data: %s", vals[1:])
self._host, self._port, self._ssl, self._username, self._password, \
self._timeout, self._ldapBase, self._userClass, self._userIdAttr, \
self._groupNameAttr, _regex, self._userNameAttr = data[1:]
self._groupNameAttr, _regex, self._userNameAttr = vals[1:]
self._ssl = gui.strToBool(self._ssl)
self._groupNameAttr = self._groupNameAttr + '=' + _regex
self._userNameAttr = '\n'.join(self._userNameAttr.split(','))
elif data[0] == 'v2':
logger.debug("Data v2: {0}".format(data[1:]))
elif vals[0] == 'v2':
logger.debug("Data v2: %s", vals[1:])
self._host, self._port, self._ssl, self._username, self._password, \
self._timeout, self._ldapBase, self._userClass, self._userIdAttr, \
self._groupNameAttr, self._userNameAttr = data[1:]
self._groupNameAttr, self._userNameAttr = vals[1:]
self._ssl = gui.strToBool(self._ssl)
elif data[0] == 'v3':
logger.debug("Data v3: {0}".format(data[1:]))
elif vals[0] == 'v3':
logger.debug("Data v3: %s", vals[1:])
self._host, self._port, self._ssl, self._username, self._password, \
self._timeout, self._ldapBase, self._userClass, self._userIdAttr, \
self._groupNameAttr, self._userNameAttr, self._altClass = data[1:]
self._groupNameAttr, self._userNameAttr, self._altClass = vals[1:]
self._ssl = gui.strToBool(self._ssl)
def __connection(self):
@ -240,7 +240,7 @@ class RegexLdap(auths.Authenticator):
return self._connection
def __connectAs(self, username, password):
def __connectAs(self, username: str, password: str):
return ldaputil.connection(username, password, self._host, ssl=self._ssl, timeout=self._timeout, debug=False)
def __getUser(self, username):
@ -338,7 +338,6 @@ class RegexLdap(auths.Authenticator):
@params groupData: a dict that has, at least, name, comments and active
@return: Raises an exception it things doesn't go fine
"""
pass
def getGroups(self, username, groupsManager):
"""
@ -356,13 +355,13 @@ class RegexLdap(auths.Authenticator):
try:
res = []
for r in ldaputil.getAsDict(
con=self.__connection(),
base=self._ldapBase,
ldapFilter='(&(&(objectClass={})({}={}*)))'.format(self._userClass, self._userIdAttr, ldaputil.escape(pattern)),
attrList=None, # All attrs
sizeLimit=LDAP_RESULT_LIMIT
):
logger.debug('R: {0}'.format(r))
con=self.__connection(),
base=self._ldapBase,
ldapFilter='(&(&(objectClass={})({}={}*)))'.format(self._userClass, self._userIdAttr, ldaputil.escape(pattern)),
attrList=None, # All attrs
sizeLimit=LDAP_RESULT_LIMIT
):
logger.debug('Result: %s', r)
res.append({
'id': r.get(self._userIdAttr.lower(), '')[0],
'name': self.__getUserRealName(r)
@ -379,7 +378,7 @@ class RegexLdap(auths.Authenticator):
auth = RegexLdap(None, env, data)
return auth.testConnection()
except Exception as e:
logger.error("Exception found testing Simple LDAP auth {0}: {1}".format(e.__class__, e))
logger.error('Exception found testing Simple LDAP auth %s: %s', e.__class__, e)
return [False, "Error testing connection"]
def testConnection(self):

View File

@ -249,12 +249,12 @@ class Module(UserInterface, Environmentable, Serializable):
"""
return self.serializeForm()
def unmarshal(self, str_):
def unmarshal(self, data: bytes) -> None:
"""
By default and if not overriden by descendants, this method recovers
data serialized using serializeForm
"""
self.unserializeForm(str_)
self.unserializeForm(data)
def check(self) -> str:
"""

View File

@ -235,7 +235,7 @@ class Authenticator(Module): # pylint: disable=too-many-public-methods
"""
return cls.authenticate != Authenticator.authenticate
def searchUsers(self, pattern) -> typing.Iterable[typing.Dict[str, str]]:
def searchUsers(self, pattern: str) -> typing.Iterable[typing.Dict[str, str]]:
"""
If you provide this method, the user will be allowed to search users,
that is, the search button at administration interface, at user form,
@ -255,7 +255,7 @@ class Authenticator(Module): # pylint: disable=too-many-public-methods
"""
return []
def searchGroups(self, pattern) -> typing.Iterable[typing.Dict[str, str]]:
def searchGroups(self, pattern: str) -> typing.Iterable[typing.Dict[str, str]]:
"""
Returns an array of groups that match the supplied pattern
If none found, returns empty array. Items returned are BaseGroups (or derived)

View File

@ -30,17 +30,15 @@
"""
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
"""
from __future__ import unicode_literals
import logging
import typing
from uds.core import Module
from uds.core.util.Config import GlobalConfig
from uds.core.ui.UserInterface import gui
import logging
logger = logging.getLogger(__name__)
__updated__ = '2018-06-07'
class ServiceProvider(Module):
"""
@ -75,11 +73,12 @@ class ServiceProvider(Module):
only need data that is keeped at form fields, marshal and unmarshal and in fact
not needed.
"""
from .BaseService import Service
# : Services that we offers. Here is a list of service types (python types) that
# : this class will provide. This types are the python clases, derived from
# : Service, that are childs of this provider
offers = []
offers: typing.List[typing.Type['Service']] = []
# : Name of type, used at administration interface to identify this
# : provider (i.e. Xen server, oVirt Server, ...)
@ -108,20 +107,20 @@ class ServiceProvider(Module):
# : This defines the maximum number of concurrent services that should be in state "in preparation" for this provider
# : Default is return the GlobalConfig value of GlobalConfig.MAX_PREPARING_SERVICES
# : Note: this variable can be either a fixed value (integer, string) or a Gui text field (with a .value)
maxPreparingServices = None
maxPreparingServices: typing.Any = None
# : This defines the maximum number of concurrent services that should be in state "removing" for this provider
# : Default is return the GlobalConfig value of GlobalConfig.MAX_REMOVING_SERVICES
# : Note: this variable can be either a fixed value (integer, string) or a Gui text field (with a .value)
maxRemovingServices = None
maxRemovingServices: typing.Any = None
# : This defines if the limits (max.. vars) should be taken into accout or simply ignored
# : Default is return the GlobalConfig value of GlobalConfig.IGNORE_LIMITS
# : Note: this variable can be either a fixed value (integer, string) or a Gui text field (with a .value)
ignoreLimits = None
ignoreLimits: typing.Any = None
@classmethod
def getServicesTypes(cls):
def getServicesTypes(cls) -> typing.List[typing.Type['Service']]:
"""
Returns what type of services this provider offers
"""

View File

@ -51,7 +51,7 @@ class ServiceProviderFactory:
"""
Initializes internal dictionary for service providers registration
"""
self._providers: typing.Dict[str, ServiceProvider] = {}
self._providers: typing.Dict[str, typing.Type[ServiceProvider]] = {}
@staticmethod
def factory() -> 'ServiceProviderFactory':
@ -62,13 +62,13 @@ class ServiceProviderFactory:
ServiceProviderFactory._factory = ServiceProviderFactory()
return ServiceProviderFactory._factory
def providers(self) -> typing.Dict[str, ServiceProvider]:
def providers(self) -> typing.Dict[str, typing.Type[ServiceProvider]]:
"""
Returns the list of service providers already registered.
"""
return self._providers
def insert(self, type_: ServiceProvider) -> None:
def insert(self, type_: typing.Type[ServiceProvider]) -> None:
"""
Inserts type_ as a service provider
"""
@ -98,14 +98,14 @@ class ServiceProviderFactory:
self._providers[typeName] = type_
def lookup(self, typeName):
def lookup(self, typeName) -> typing.Optional[typing.Type[ServiceProvider]]:
"""
Tries to locate a server provider and by its name, and, if
not found, returns None
"""
return self._providers.get(typeName.lower(), None)
def servicesThatDoNotNeedPublication(self):
def servicesThatDoNotNeedPublication(self) -> typing.Iterable[typing.Type[ServiceProvider]]:
"""
Returns a list of all service providers registered that do not need
to be published

View File

@ -30,14 +30,13 @@
"""
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pylint: disable=too-many-lines
import datetime
import typing
import time
import pickle
import logging
# import six
from django.utils.translation import get_language, ugettext as _, ugettext_noop
from uds.core.util import encoders
@ -79,25 +78,27 @@ class gui:
can access this form to let users
create new instances of this module.
"""
# Values dict type
ValuesDictType = typing.Dict[str, typing.Union[str, typing.List[str], typing.List[typing.Dict[str, str]]]]
# : True string value
TRUE = 'true'
TRUE: typing.ClassVar[str] = 'true'
# : False string value
FALSE = 'false'
FALSE: typing.ClassVar[str] = 'false'
# : String for advanced tabs
ADVANCED_TAB = ugettext_noop('Advanced')
PARAMETERS_TAB = ugettext_noop('Parameters')
CREDENTIALS_TAB = ugettext_noop('Credentials')
TUNNEL_TAB = ugettext_noop('Tunnel')
DISPLAY_TAB = ugettext_noop('Display')
ADVANCED_TAB: typing.ClassVar[str] = ugettext_noop('Advanced')
PARAMETERS_TAB: typing.ClassVar[str] = ugettext_noop('Parameters')
CREDENTIALS_TAB: typing.ClassVar[str] = ugettext_noop('Credentials')
TUNNEL_TAB: typing.ClassVar[str] = ugettext_noop('Tunnel')
DISPLAY_TAB: typing.ClassVar[str] = ugettext_noop('Display')
# : Static Callbacks simple registry
callbacks: typing.Dict[str, typing.Callable] = {}
# Helpers
@staticmethod
def convertToChoices(vals):
def convertToChoices(vals: typing.Iterable[str]) -> typing.List[typing.Dict[str, str]]:
"""
Helper to convert from array of strings to the same dict used in choice,
multichoice, ..
@ -109,13 +110,13 @@ class gui:
return res
@staticmethod
def convertToList(vals):
if vals is not None:
def convertToList(vals: typing.Iterable[str]) -> typing.List[str]:
if vals:
return [str(v) for v in vals]
return []
@staticmethod
def choiceItem(id_, text):
def choiceItem(id_: str, text: str) -> typing.Dict[str, str]:
"""
Helper method to create a single choice item.
@ -134,15 +135,15 @@ class gui:
return {'id': str(id_), 'text': str(text)}
@staticmethod
def choiceImage(id_, text, img):
return {'id': str(id_), 'text': str(text), 'img': img }
def choiceImage(id_: str, text: str, img: str) -> typing.Dict[str, str]:
return {'id': str(id_), 'text': str(text), 'img': img}
@staticmethod
def sortedChoices(choices):
return sorted(choices, key=lambda item: item['text'].lower())
@staticmethod
def strToBool(str_):
def strToBool(str_: typing.Union[str, bytes, bool]) -> bool:
"""
Converts the string "true" (case insensitive) to True (boolean).
Anything else is converted to false
@ -160,7 +161,7 @@ class gui:
return False
@staticmethod
def boolToStr(bol):
def boolToStr(bol: bool) -> str:
"""
Converts a boolean to the string representation. True is converted to
"true", False to "false".
@ -177,7 +178,7 @@ class gui:
# Classes
class InputField(object):
class InputField:
"""
Class representing an simple input field.
This class is not directly usable, must be used by any inherited class
@ -214,20 +215,22 @@ class gui:
so if you use both, the used one will be "value". This is valid for
all form fields.
"""
TEXT_TYPE = 'text'
TEXTBOX_TYPE = 'textbox'
NUMERIC_TYPE = 'numeric'
PASSWORD_TYPE = 'password'
HIDDEN_TYPE = 'hidden'
CHOICE_TYPE = 'choice'
MULTI_CHOICE_TYPE = 'multichoice'
EDITABLE_LIST = 'editlist'
CHECKBOX_TYPE = 'checkbox'
IMAGECHOICE_TYPE = 'imgchoice'
DATE_TYPE = 'date'
INFO_TYPE = 'dummy'
TEXT_TYPE: typing.ClassVar[str] = 'text'
TEXTBOX_TYPE: typing.ClassVar[str] = 'textbox'
NUMERIC_TYPE: typing.ClassVar[str] = 'numeric'
PASSWORD_TYPE: typing.ClassVar[str] = 'password'
HIDDEN_TYPE: typing.ClassVar[str] = 'hidden'
CHOICE_TYPE: typing.ClassVar[str] = 'choice'
MULTI_CHOICE_TYPE: typing.ClassVar[str] = 'multichoice'
EDITABLE_LIST: typing.ClassVar[str] = 'editlist'
CHECKBOX_TYPE: typing.ClassVar[str] = 'checkbox'
IMAGECHOICE_TYPE: typing.ClassVar[str] = 'imgchoice'
DATE_TYPE: typing.ClassVar[str] = 'date'
INFO_TYPE: typing.ClassVar[str] = 'dummy'
DEFAULT_LENTGH = 32 # : If length of some fields are not especified, this value is used as default
DEFAULT_LENTGH: typing.ClassVar[int] = 32 # : If length of some fields are not especified, this value is used as default
_data: typing.Dict[str, typing.Any]
def __init__(self, **options):
self._data = {
@ -244,7 +247,7 @@ class gui:
if 'tab' in options:
self._data['tab'] = options.get('tab')
def _type(self, type_):
def _type(self, type_: str):
"""
Sets the type of this field.
@ -253,12 +256,21 @@ class gui:
"""
self._data['type'] = type_
def isType(self, type_):
def isType(self, type_: str) -> bool:
"""
Returns true if this field is of specified type
"""
return self._data['type'] == type_
def isSerializable(self):
return True
def num(self) -> int:
return -1
def isTrue(self) -> bool:
return False
@property
def value(self):
"""
@ -270,13 +282,13 @@ class gui:
return self._data['value'] if self._data['value'] is not None else self.defValue
@value.setter
def value(self, value):
def value(self, value: typing.Any):
"""
Stores new value (not the default one)
"""
self._setValue(value)
def _setValue(self, value):
def _setValue(self, value: typing.Any):
"""
So we can override value setting at descendants
"""
@ -290,24 +302,24 @@ class gui:
alter original values.
"""
data = self._data.copy()
data['label'] = data['label'] != '' and _(data['label']) or ''
data['tooltip'] = data['tooltip'] != '' and _(data['tooltip']) or ''
data['label'] = _(data['label']) if data['label'] else ''
data['tooltip'] = _(data['tooltip']) if data['tooltip'] else ''
if 'tab' in data:
data['tab'] = _(data['tab'])
return data
@property
def defValue(self):
def defValue(self) -> typing.Any:
"""
Returns the default value for this field
"""
return self._data['defvalue']
@defValue.setter
def defValue(self, defValue):
def defValue(self, defValue: typing.Any):
self.setDefValue(defValue)
def setDefValue(self, defValue):
def setDefValue(self, defValue: typing.Any):
"""
Sets the default value of the field·
@ -317,7 +329,7 @@ class gui:
self._data['defvalue'] = defValue
@property
def label(self):
def label(self) -> str:
return self._data['label']
class TextField(InputField):
@ -383,14 +395,12 @@ class gui:
def __init__(self, **options):
super().__init__(**options)
minValue = options.get('minValue', '987654321')
maxValue = options.get('maxValue', '987654321')
self._data['minValue'] = int(minValue)
self._data['maxValue'] = int(maxValue)
self._data['minValue'] = int(options.get('minValue', '987654321'))
self._data['maxValue'] = int(options.get('maxValue', '987654321'))
self._type(gui.InputField.NUMERIC_TYPE)
def num(self):
def num(self) -> int:
"""
Return value as integer
"""
@ -420,7 +430,7 @@ class gui:
"""
def processValue(self, valueName, options):
def processValue(self, valueName: str, options: typing.Dict[str, typing.Any]) -> None:
val = options.get(valueName, '')
if val == '' and valueName == 'defvalue':
@ -508,10 +518,10 @@ class gui:
def __init__(self, **options):
super().__init__(**options)
self._isSerializable = options.get('serializable', '') != ''
self._isSerializable: bool = options.get('serializable', '') != ''
self._type(gui.InputField.HIDDEN_TYPE)
def isSerializable(self):
def isSerializable(self) -> bool:
return self._isSerializable
class CheckBoxField(InputField):
@ -538,10 +548,10 @@ class gui:
self._type(gui.InputField.CHECKBOX_TYPE)
@staticmethod
def _checkTrue(val):
def _checkTrue(val: typing.Union[str, bytes, bool]) -> bool:
return val in (True, 'true', 'True', b'true', b'True')
def _setValue(self, value):
def _setValue(self, value: typing.Union[str, bytes, bool]):
"""
Override to set value to True or False (bool)
"""
@ -791,10 +801,9 @@ class UserInterfaceType(type):
Metaclass definition for moving the user interface descriptions to a usable
better place
"""
def __new__(cls, classname, bases, classDict): # pylint: disable=bad-mcs-classmethod-argument
newClassDict = {}
_gui = {}
_gui: typing.Dict[str, gui.InputField] = {}
# We will keep a reference to gui elements also at _gui so we can access them easily
for attrName, attr in classDict.items():
if isinstance(attr, gui.InputField):
@ -816,7 +825,10 @@ class UserInterface(metaclass=UserInterfaceType):
the gui form fields values.
"""
def __init__(self, values=None):
_gui: typing.Dict[str, gui.InputField]
def __init__(self, values: typing.Optional[typing.Dict[str, str]] = None):
import copy
# : If there is an array of elements to initialize, simply try to store values on form fields
# Generate a deep copy of inherited Gui, so each User Interface instance has its own "field" set, and do not share the "fielset" with others, what can be really dangerous
@ -832,7 +844,7 @@ class UserInterface(metaclass=UserInterfaceType):
else:
logger.warning('Field %s not found', k)
def initGui(self):
def initGui(self) -> None:
"""
This method gives the oportunity to initialize gui fields before they
are send to administration client.
@ -852,7 +864,7 @@ class UserInterface(metaclass=UserInterfaceType):
of this posibility in a near version...
"""
def valuesDict(self):
def valuesDict(self) -> gui.ValuesDictType:
"""
Returns own data needed for user interaction as a dict of key-names ->
values. The values returned must be strings.
@ -879,7 +891,7 @@ class UserInterface(metaclass=UserInterfaceType):
extracted from form fields
"""
dic = {}
dic: gui.ValuesDictType = {}
for k, v in self._gui.items():
if v.isType(gui.InputField.EDITABLE_LIST):
dic[k] = gui.convertToList(v.value)
@ -894,7 +906,7 @@ class UserInterface(metaclass=UserInterfaceType):
logger.debug('Values Dict: %s', dic)
return dic
def serializeForm(self):
def serializeForm(self) -> bytes:
"""
All values stored at form fields are serialized and returned as a single
string
@ -910,6 +922,7 @@ class UserInterface(metaclass=UserInterfaceType):
# logger.debug('Caller is : {}'.format(inspect.stack()))
arr = []
val: typing.Any
for k, v in self._gui.items():
logger.debug('serializing Key: %s/%s', k, v.value)
if v.isType(gui.InputField.HIDDEN_TYPE) and v.isSerializable() is False:
@ -934,9 +947,10 @@ class UserInterface(metaclass=UserInterfaceType):
arr.append(k.encode('utf8') + b'\003' + val)
logger.debug('Arr, >>%s<<', arr)
return encoders.encode(b'\002'.join(arr), 'zip')
def unserializeForm(self, values):
return typing.cast(bytes, encoders.encode(b'\002'.join(arr), 'zip'))
def unserializeForm(self, values: bytes):
"""
This method unserializes the values previously obtained using
:py:meth:`serializeForm`, and stores
@ -953,13 +967,13 @@ class UserInterface(metaclass=UserInterfaceType):
continue
self._gui[k].value = self._gui[k].defValue
values = encoders.decode(values, 'zip')
values = typing.cast(bytes, encoders.decode(values, 'zip'))
if values == b'': # Has nothing
return
for txt in values.split(b'\002'):
k, v = txt.split(b'\003')
k = k.decode('utf8') # Convert name to unicode
kb, v = txt.split(b'\003')
k = kb.decode('utf8') # Convert name to unicode
if k in self._gui:
try:
if v[0] == 1:
@ -980,7 +994,7 @@ class UserInterface(metaclass=UserInterfaceType):
# logger.info('Invalid serialization data on {0} {1}'.format(self, values.encode('hex')))
@classmethod
def guiDescription(cls, obj=None):
def guiDescription(cls, obj=None) -> typing.List[typing.Dict[str, str]]:
"""
This simple method generates the theGui description needed by the
administration client, so it can

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012 Virtual Cable S.L.
# Copyright (c) 2012-2019 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,

View File

@ -31,9 +31,11 @@
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
"""
import logging
import typing
from django.db import models
from uds.core.Environment import Environment
from uds.core import Module
from uds.models.UUIDModel import UUIDModel
@ -50,7 +52,7 @@ class ManagedObjectModel(UUIDModel):
data = models.TextField(default='')
comments = models.CharField(max_length=256)
_cachedInstance = None
_cachedInstance: typing.Optional[Module] = None
class Meta(UUIDModel.Meta):
"""
@ -75,7 +77,7 @@ class ManagedObjectModel(UUIDModel):
self._cachedInstance = None # Ensures returns correct value on getInstance
def getInstance(self, values=None):
def getInstance(self, values: typing.Optional[typing.Dict[str, str]] = None) -> Module:
"""
Instantiates the object this record contains.
@ -103,14 +105,14 @@ class ManagedObjectModel(UUIDModel):
return obj
def getType(self):
def getType(self) -> typing.Type[Module]:
"""
Returns the type of self (as python type)
Must be overriden!!!
"""
raise NotImplementedError('getType has not been implemented for {}'.format(self.__class__))
def isOfType(self, type_):
def isOfType(self, type_: str) -> bool:
"""
return True if self if of the requested type, else returns False
"""