Refactorized groups manager and fixed ipInNetwork

Signed-off-by: Adolfo Gómez García <dkmaster@dkmon.com>
This commit is contained in:
Adolfo Gómez García 2022-09-09 02:13:20 +02:00
parent f6607d0416
commit 115d8562e1
2 changed files with 52 additions and 45 deletions

View File

@ -42,6 +42,17 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
class _LocalGrp(typing.NamedTuple):
name: str
group: 'Group'
is_valid: bool = False
is_pattern: bool = False
def matches(self, name: str) -> bool:
"""
Checks if this group name is equal to the provided name (case)
"""
return name.casefold() == self.name.casefold()
class GroupsManager:
"""
@ -63,7 +74,7 @@ class GroupsManager:
Managed groups names are compared using case insensitive comparison.
"""
_groups: typing.Dict[str, dict]
_groups: typing.List[_LocalGrp]
def __init__(self, dbAuthenticator: 'DBAuthenticator'):
"""
@ -72,48 +83,45 @@ class GroupsManager:
to which this groupsManager will be associated
"""
self._dbAuthenticator = dbAuthenticator
self._groups = (
{}
) # We just get active groups, inactive aren't visible to this class
# We just get active groups, inactive aren't visible to this class
self._groups = []
if dbAuthenticator.id: # If "fake" authenticator (that is, root user with no authenticator in fact)
for g in dbAuthenticator.groups.filter(state=State.ACTIVE, is_meta=False):
name = g.name.lower()
isPattern = name.find('pat:') == 0 # Is a pattern?
self._groups[name] = {
'name': g.name,
'group': Group(g),
'valid': False,
'pattern': isPattern,
}
self._groups.append(
_LocalGrp(
name=name[4:] if isPattern else name,
group=Group(g),
is_pattern=isPattern
)
)
def checkAllGroups(self, groupName: str):
def _checkAllGroups(self, groupName: str) -> typing.Iterable[int]:
"""
Returns true if this groups manager contains the specified group name (string)
"""
name = groupName.lower()
res = []
for gName, grp in self._groups.items():
if grp['pattern']:
for n, grp in enumerate(self._groups):
if grp.is_pattern:
logger.debug('Group is a pattern: %s', grp)
try:
logger.debug('Match: %s->%s', grp['name'][4:], name)
if re.search(grp['name'][4:], name, re.IGNORECASE) is not None:
res.append(grp) # Stop searching, one group at least matches
logger.debug('Match: %s->%s', grp.name, name)
if re.search(grp.name, name, re.IGNORECASE) is not None:
yield n
except Exception:
logger.exception('Exception in RE')
else:
logger.debug('Group: %s==%s', name, gName)
if name == gName:
res.append(grp)
return res
if grp.matches(name): # If group name matches
yield n
def getGroupsNames(self) -> typing.Iterable[str]:
"""
Return all groups names managed by this groups manager. The names are returned
as where inserted inside Database (most probably using administration interface)
"""
for g in self._groups.values():
yield g['group'].dbGroup().name
for g in self._groups:
yield g.group.dbGroup().name
def getValidGroups(self) -> typing.Iterable[Group]:
"""
@ -121,42 +129,41 @@ class GroupsManager:
"""
from uds.models import Group as DBGroup
lst: typing.List[str] = []
for g in self._groups.values():
if g['valid']:
lst += (g['group'].dbGroup().id,)
yield g['group']
valid_id_list: typing.List[int] = []
for group in self._groups:
if group.is_valid:
valid_id_list.append(group.group.dbGroup().id)
yield group.group
# Now, get metagroups and also return them
for g2 in DBGroup.objects.filter(
for db_group in DBGroup.objects.filter(
manager__id=self._dbAuthenticator.id, is_meta=True
): # @UndefinedVariable
gn = g2.groups.filter(id__in=lst, state=State.ACTIVE).count()
if g2.meta_if_any and gn > 0:
gn = g2.groups.count()
gn = db_group.groups.filter(id__in=valid_id_list, state=State.ACTIVE).count()
if db_group.meta_if_any and gn > 0:
gn = db_group.groups.count()
if (
gn == g2.groups.count()
gn == db_group.groups.count()
): # If a meta group is empty, all users belongs to it. we can use gn != 0 to check that if it is empty, is not valid
# This group matches
yield Group(g2)
yield Group(db_group)
def hasValidGroups(self):
"""
Checks if this groups manager has at least one group that has been
validated (using :py:meth:.validate)
"""
for g in self._groups.values():
if g['valid']:
return True
return False
return any(g.is_valid for g in self._groups)
def getGroup(self, groupName: str) -> typing.Optional[Group]:
"""
If this groups manager contains that group manager, it returns the
:py:class:uds.core.auths.group.Group representing that group name.
"""
if groupName.lower() in self._groups:
return self._groups[groupName.lower()]['group']
for group in self._groups:
if group.matches(groupName):
return group.group
return None
def validate(self, groupName: typing.Union[str, typing.Iterable]):
@ -175,16 +182,16 @@ class GroupsManager:
for n in groupName:
self.validate(n)
else:
for grp in self.checkAllGroups(groupName):
grp['valid'] = True
for n in self._checkAllGroups(groupName):
self._groups[n] = self._groups[n]._replace(is_valid=True)
def isValid(self, groupName: str) -> bool:
"""
Checks if this group name is marked as valid inside this groups manager.
Returns True if group name is marked as valid, False if it isn't.
"""
for grp in self.checkAllGroups(groupName):
if grp['valid']:
for n in self._checkAllGroups(groupName):
if self._groups[n].is_valid:
return True
return False

View File

@ -203,7 +203,7 @@ def ipInNetwork(
if isinstance(ip, str):
ip = ipToLong(ip)
if isinstance(network, str):
network = [networkFromString(network)]
network = networksFromString(network)
elif isinstance(network, NetworkType):
network = [network]