1
0
mirror of https://github.com/dkmstr/openuds.git synced 2024-12-23 17:34:17 +03:00

adding more tests

This commit is contained in:
Adolfo Gómez García 2022-08-28 01:31:09 +02:00
parent 1567996ebc
commit 7de259af87
22 changed files with 822 additions and 149 deletions

View File

@ -37,15 +37,6 @@ DATABASES = {
# 'CONN_MAX_AGE': 600, # Enable DB Pooling, 10 minutes max connection duration
}
}
# If testing, with test or with pytest, use the local test database
if any(arg.endswith('test') for arg in sys.argv):
DATABASES['default'] = {
'ENGINE': 'django.db.backends.sqlite3',
'OPTIONS': {
'timeout': 20,
},
'NAME': ':memory:',
}
ALLOWED_HOSTS = ['*']
@ -149,6 +140,19 @@ CACHES = {
},
}
# Update DB and CACHE y debug
if any(arg.endswith('test') or '/pytest' in arg for arg in sys.argv) or 'PYTEST_XDIST_WORKER' in os.environ:
DATABASES['default'] = {
'ENGINE': 'django.db.backends.sqlite3',
'OPTIONS': {
'timeout': 20,
},
'NAME': ':memory:',
}
CACHES['memory'] = {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
}
# Related to file uploading
FILE_UPLOAD_PERMISSIONS = 0o640
FILE_UPLOAD_DIRECTORY_PERMISSIONS = 0o750

View File

@ -28,5 +28,3 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from . import actor

View File

@ -29,6 +29,3 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from . import test_login_logout
from . import test_register
from . import test_initialize

View File

@ -28,7 +28,3 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from . import core
from . import web
from . import messaging
from . import REST

View File

@ -28,5 +28,3 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from . import util
from . import managers

View File

@ -28,5 +28,3 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from . import test_cache
from . import test_stats_counters

View File

@ -0,0 +1,258 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from ...utils.test import UDSTransactionTestCase
from ...fixtures.calendars import createCalendars
from uds.core.util import calendar
from uds.models import Calendar
import datetime
class CalendarTests(UDSTransactionTestCase):
def setUp(self) -> None:
createCalendars()
def test_calendar_dayly(self):
cal = Calendar.objects.get(uuid='2cf6846b-d889-57ce-bb35-e647040a95b6')
chk = calendar.CalendarChecker(cal)
calendar.CalendarChecker.updates = 0
# Rule with end
# update 1
self.assertFalse(chk.check(datetime.datetime(2014, 9, 1, 21, 0, 0)))
# update 2
self.assertFalse(chk.check(datetime.datetime(2015, 9, 1, 20, 59, 0)))
self.assertTrue(chk.check(datetime.datetime(2015, 9, 1, 21, 0, 0)))
# update 3
self.assertTrue(chk.check(datetime.datetime(2015, 10, 1, 0, 0, 0)))
self.assertTrue(chk.check(datetime.datetime(2015, 10, 1, 1, 59, 0)))
self.assertFalse(chk.check(datetime.datetime(2015, 10, 1, 2, 0, 0)))
self.assertTrue(chk.check(datetime.datetime(2015, 10, 1, 21, 0, 0)))
# update 4
self.assertFalse(chk.check(datetime.datetime(2015, 10, 2, 21, 0, 0)))
# Rule without end, but with beginning
# update 5
self.assertFalse(chk.check(datetime.datetime(2014, 9, 1, 8, 0, 0)))
# update 6
self.assertFalse(chk.check(datetime.datetime(2015, 9, 1, 7, 59, 0)))
self.assertTrue(chk.check(datetime.datetime(2015, 9, 1, 8, 0, 0)))
# updates... (total is 366, because previous updates has been cached)
for day in range(365):
date = datetime.date(2015, 1, 1) + datetime.timedelta(days=day)
self.assertFalse(
chk.check(datetime.datetime.combine(date, datetime.time(7, 59, 0)))
)
fnc = (
self.assertTrue
if date >= datetime.date(2015, 9, 1)
else self.assertFalse
)
fnc(chk.check(datetime.datetime.combine(date, datetime.time(8, 0, 0))))
fnc(chk.check(datetime.datetime.combine(date, datetime.time(19, 59, 0))))
self.assertFalse(
chk.check(datetime.datetime.combine(date, datetime.time(20, 0, 0)))
)
self.assertEqual(chk.updates, 366)
def test_calendar_weekly(self):
cal = Calendar.objects.get(uuid='c1221a6d-3848-5fa3-ae98-172662c0f554')
chk = calendar.CalendarChecker(cal)
calendar.CalendarChecker.updates = 0
valid_days = [1, 8, 15, 22, 29]
# Rule with end
for day in range(30):
date = datetime.date(2015, 9, day + 1)
fnc = self.assertTrue if (day + 1) in valid_days else self.assertFalse
fnc(chk.check(datetime.datetime.combine(date, datetime.time(10, 0, 0))))
fnc(chk.check(datetime.datetime.combine(date, datetime.time(11, 59, 0))))
self.assertFalse(
chk.check(datetime.datetime.combine(date, datetime.time(9, 59, 0)))
)
self.assertFalse(
chk.check(datetime.datetime.combine(date, datetime.time(12, 0, 0)))
)
# update 31
self.assertFalse(chk.check(datetime.datetime(2015, 8, 25, 10, 0, 0)))
# update 32
self.assertFalse(chk.check(datetime.datetime(2015, 10, 6, 10, 0, 0)))
# Rule without end
# updates... (total is 365, because previous updates has been cached)
for day in range(365):
date = datetime.date(2015, 1, 1) + datetime.timedelta(days=day)
fnc = (
self.assertTrue
if date >= datetime.date(2015, 9, 1) and date.isoweekday() == 2
else self.assertFalse
)
fnc(chk.check(datetime.datetime.combine(date, datetime.time(7, 0, 0))))
fnc(chk.check(datetime.datetime.combine(date, datetime.time(8, 59, 0))))
self.assertFalse(
chk.check(datetime.datetime.combine(date, datetime.time(6, 59, 0)))
)
self.assertFalse(
chk.check(datetime.datetime.combine(date, datetime.time(9, 0, 0)))
)
self.assertEqual(chk.updates, 365)
def test_calendar_monthly(self):
cal = Calendar.objects.get(uuid='353c4cb8-e02d-5387-a18f-f634729fde81')
chk = calendar.CalendarChecker(cal)
calendar.CalendarChecker.updates = 0
# Updates 1..730
for day in range(730):
date = datetime.date(2015, 1, 1) + datetime.timedelta(days=day)
fnc = (
self.assertTrue
if date.day == 1
and datetime.date(2015, 9, 1) <= date <= datetime.date(2015, 11, 1)
else self.assertFalse
)
fnc2 = (
self.assertTrue
if date.day == 1 and date >= datetime.date(2015, 9, 1)
else self.assertFalse
)
fnc(chk.check(datetime.datetime.combine(date, datetime.time(10, 0, 0))))
fnc(chk.check(datetime.datetime.combine(date, datetime.time(11, 59, 0))))
self.assertFalse(
chk.check(datetime.datetime.combine(date, datetime.time(9, 59, 0)))
)
self.assertFalse(
chk.check(datetime.datetime.combine(date, datetime.time(12, 0, 0)))
)
fnc2(chk.check(datetime.datetime.combine(date, datetime.time(7, 0, 0))))
fnc2(chk.check(datetime.datetime.combine(date, datetime.time(8, 59, 0))))
self.assertFalse(
chk.check(datetime.datetime.combine(date, datetime.time(6, 59, 0)))
)
self.assertFalse(
chk.check(datetime.datetime.combine(date, datetime.time(9, 0, 0)))
)
self.assertEqual(chk.updates, 730)
def test_calendar_weekdays(self):
cal = Calendar.objects.get(uuid='bccfd011-605b-565f-a08e-80bf75114dce')
chk = calendar.CalendarChecker(cal)
calendar.CalendarChecker.updates = 0
valid_days = [1, 3, 5]
for day in range(730):
date = datetime.date(2015, 1, 1) + datetime.timedelta(days=day)
fnc = (
self.assertTrue
if date.isoweekday() in valid_days
and datetime.date(2015, 9, 1) <= date <= datetime.date(2015, 10, 1)
else self.assertFalse
)
fnc2 = (
self.assertTrue
if date.isoweekday() in valid_days and date >= datetime.date(2015, 9, 1)
else self.assertFalse
)
fnc(chk.check(datetime.datetime.combine(date, datetime.time(10, 0, 0))))
fnc(chk.check(datetime.datetime.combine(date, datetime.time(11, 59, 0))))
self.assertFalse(
chk.check(datetime.datetime.combine(date, datetime.time(9, 59, 0)))
)
self.assertFalse(
chk.check(datetime.datetime.combine(date, datetime.time(12, 0, 0)))
)
fnc2(chk.check(datetime.datetime.combine(date, datetime.time(7, 0, 0))))
fnc2(chk.check(datetime.datetime.combine(date, datetime.time(8, 59, 0))))
self.assertFalse(
chk.check(datetime.datetime.combine(date, datetime.time(6, 59, 0)))
)
self.assertFalse(
chk.check(datetime.datetime.combine(date, datetime.time(9, 0, 0)))
)
self.assertEqual(chk.updates, 730)
def test_calendar_durations(self):
cal = Calendar.objects.get(uuid='60160f94-c8fe-5fdc-bbbe-325010980106')
chk = calendar.CalendarChecker(cal)
# Minutes
self.assertFalse(chk.check(datetime.datetime(2014, 12, 31, 23, 59, 59)))
self.assertTrue(chk.check(datetime.datetime(2015, 1, 1, 0, 0, 0)))
self.assertTrue(chk.check(datetime.datetime(2015, 1, 1, 0, 1, 59)))
self.assertFalse(chk.check(datetime.datetime(2015, 1, 1, 0, 2, 0)))
# Hours
self.assertFalse(chk.check(datetime.datetime(2015, 1, 31, 23, 59, 59)))
self.assertTrue(chk.check(datetime.datetime(2015, 2, 1, 0, 0, 0)))
self.assertTrue(chk.check(datetime.datetime(2015, 2, 1, 1, 59, 59)))
self.assertFalse(chk.check(datetime.datetime(2015, 2, 1, 2, 0, 0)))
# Days
self.assertFalse(chk.check(datetime.datetime(2015, 2, 28, 23, 59, 59)))
self.assertTrue(chk.check(datetime.datetime(2015, 3, 1, 0, 0, 0)))
self.assertTrue(chk.check(datetime.datetime(2015, 3, 2, 23, 59, 59)))
self.assertFalse(chk.check(datetime.datetime(2015, 3, 3, 0, 0, 0)))
# Weeks
self.assertFalse(chk.check(datetime.datetime(2015, 3, 31, 23, 59, 59)))
self.assertTrue(chk.check(datetime.datetime(2015, 4, 1, 8, 0, 0)))
self.assertTrue(chk.check(datetime.datetime(2015, 4, 15, 7, 59, 59)))
self.assertFalse(chk.check(datetime.datetime(2015, 4, 15, 8, 0, 0)))

View File

@ -0,0 +1,87 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
import logging
from ...utils.test import UDSTransactionTestCase
from uds.core.util import net
logger = logging.getLogger(__name__)
class NetTestCase(UDSTransactionTestCase):
def testNetworkFromString(self):
for n in (
('*', 0, 4294967295),
('192.168.0.1', 3232235521, 3232235521),
('192.168.0.*', 3232235520, 3232235775),
('192.168.*.*', 3232235520, 3232301055),
('192.168.*', 3232235520, 3232301055),
('192.*.*.*', 3221225472, 3238002687),
('192.*.*', 3221225472, 3238002687),
('192.*', 3221225472, 3238002687),
('192.168.0.1 netmask 255.255.255.0', 3232235520, 3232235775),
('192.168.0.1/8', 3221225472, 3238002687),
('192.168.0.1/28', 3232235520, 3232235535),
('192.168.0.1-192.168.0.87', 3232235521, 3232235607),
('192.168.0.1 netmask 255.255.255.0', 3232235520, 3232235775),
):
try:
multiple_net: typing.List[net.NetworkType] = net.networksFromString(n[0])
self.assertEqual(len(multiple_net), 1, 'Incorrect number of network returned from {0}'.format(n[0]))
self.assertEqual(multiple_net[0][0], n[1], 'Incorrect network start value for {0}'.format(n[0]))
self.assertEqual(multiple_net[0][1], n[2], 'Incorrect network end value for {0}'.format(n[0]))
single_net: net.NetworkType = net.networkFromString(n[0])
self.assertEqual(len(single_net), 2, 'Incorrect number of network returned from {0}'.format(n[0]))
self.assertEqual(single_net[0], n[1], 'Incorrect network start value for {0}'.format(n[0]))
self.assertEqual(single_net[1], n[2], 'Incorrect network end value for {0}'.format(n[0]))
except Exception as e:
logger.exception('Running test')
raise Exception('Value Error: {}. Input string: {}'.format(e, n[0]))
for n in ('192.168.0', '192.168.0.5-192.168.0.3', 'no net'):
with self.assertRaises(ValueError):
net.networksFromString(n)
self.assertEqual(net.ipToLong('192.168.0.5'), 3232235525)
self.assertEqual(net.longToIp(3232235525), '192.168.0.5')
for n in range(0, 255):
self.assertTrue(net.ipInNetwork('192.168.0.{}'.format(n), '192.168.0.0/24'))
for n in range(4294):
self.assertTrue(net.ipInNetwork(n*1000, [net.NetworkType(0, 4294967295)]))
self.assertTrue(net.ipInNetwork(n*1000, net.NetworkType(0, 4294967295)))

View File

@ -0,0 +1,199 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2014 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import typing
from uds.core.util import permissions
from uds.core.util import ot
from uds import models
from ...utils.test import UDSTransactionTestCase
from ...fixtures import authenticators as authenticators_fixtures, services as services_fixtures
class PermissionsTests(UDSTransactionTestCase):
authenticator: models.Authenticator
groups: typing.List[models.Group]
users: typing.List[models.User]
admins: typing.List[models.User]
staffs: typing.List[models.User]
userService: models.UserService
def setUp(self) -> None:
self.authenticator = authenticators_fixtures.createAuthenticator()
self.groups = authenticators_fixtures.createGroups(self.authenticator)
self.users = authenticators_fixtures.createUsers(self.authenticator, groups=self.groups)
self.admins = authenticators_fixtures.createUsers(self.authenticator, is_admin=True, groups=self.groups)
self.staffs = authenticators_fixtures.createUsers(self.authenticator, is_staff=True, groups=self.groups)
self.userService = services_fixtures.createSingleTestingUserServiceStructure(services_fixtures.createProvider(), self.users[0], list(self.users[0].groups.all()), 'managed')
def doTestUserPermissions(self, obj, user: models.User):
permissions.addUserPermission(user, obj, permissions.PERMISSION_NONE)
self.assertEquals(models.Permissions.objects.count(), 1)
perm = models.Permissions.objects.all()[0]
self.assertEquals(perm.object_type, ot.getObjectType(obj))
self.assertEquals(perm.object_id, obj.pk)
self.assertEquals(perm.permission, permissions.PERMISSION_NONE)
self.assertTrue(permissions.checkPermissions(user, obj, permissions.PERMISSION_NONE))
self.assertEqual(permissions.checkPermissions(user, obj, permissions.PERMISSION_READ), user.is_admin)
self.assertEqual(permissions.checkPermissions(user, obj, permissions.PERMISSION_MANAGEMENT), user.is_admin)
self.assertEqual(permissions.checkPermissions(user, obj, permissions.PERMISSION_ALL), user.is_admin)
# Add a new permission, must overwrite the previous one
permissions.addUserPermission(user, obj, permissions.PERMISSION_ALL)
self.assertEquals(models.Permissions.objects.count(), 1)
perm = models.Permissions.objects.all()[0]
self.assertEquals(perm.object_type, ot.getObjectType(obj))
self.assertEquals(perm.object_id, obj.pk)
self.assertEquals(perm.permission, permissions.PERMISSION_ALL)
self.assertTrue(permissions.checkPermissions(user, obj, permissions.PERMISSION_NONE))
self.assertTrue(permissions.checkPermissions(user, obj, permissions.PERMISSION_READ))
self.assertTrue(permissions.checkPermissions(user, obj, permissions.PERMISSION_MANAGEMENT))
self.assertTrue(permissions.checkPermissions(user, obj, permissions.PERMISSION_ALL))
# Again, with read
permissions.addUserPermission(user, obj, permissions.PERMISSION_READ)
self.assertEquals(models.Permissions.objects.count(), 1)
perm = models.Permissions.objects.all()[0]
self.assertEquals(perm.object_type, ot.getObjectType(obj))
self.assertEquals(perm.object_id, obj.pk)
self.assertEquals(perm.permission, permissions.PERMISSION_READ)
self.assertTrue(permissions.checkPermissions(user, obj, permissions.PERMISSION_NONE))
self.assertTrue(permissions.checkPermissions(user, obj, permissions.PERMISSION_READ))
self.assertEqual(permissions.checkPermissions(user, obj, permissions.PERMISSION_MANAGEMENT), user.is_admin)
self.assertEqual(permissions.checkPermissions(user, obj, permissions.PERMISSION_ALL), user.is_admin)
# Remove obj, permissions must have gone away
obj.delete()
self.assertEquals(models.Permissions.objects.count(), 0)
def doTestGroupPermissions(self, obj, user: models.User):
group = user.groups.all()[0]
permissions.addGroupPermission(group, obj, permissions.PERMISSION_NONE)
self.assertEquals(models.Permissions.objects.count(), 1)
perm = models.Permissions.objects.all()[0]
self.assertEquals(perm.object_type, ot.getObjectType(obj))
self.assertEquals(perm.object_id, obj.pk)
self.assertEquals(perm.permission, permissions.PERMISSION_NONE)
self.assertTrue(permissions.checkPermissions(user, obj, permissions.PERMISSION_NONE))
# Admins has all permissions ALWAYS
self.assertEqual(permissions.checkPermissions(user, obj, permissions.PERMISSION_READ), user.is_admin)
self.assertEqual(permissions.checkPermissions(user, obj, permissions.PERMISSION_ALL), user.is_admin)
permissions.addGroupPermission(group, obj, permissions.PERMISSION_ALL)
self.assertEquals(models.Permissions.objects.count(), 1)
perm = models.Permissions.objects.all()[0]
self.assertEquals(perm.object_type, ot.getObjectType(obj))
self.assertEquals(perm.object_id, obj.pk)
self.assertEquals(perm.permission, permissions.PERMISSION_ALL)
self.assertTrue(permissions.checkPermissions(user, obj, permissions.PERMISSION_NONE))
self.assertTrue(permissions.checkPermissions(user, obj, permissions.PERMISSION_READ))
self.assertTrue(permissions.checkPermissions(user, obj, permissions.PERMISSION_ALL))
# Add user permission, DB must contain both an return ALL
permissions.addUserPermission(user, obj, permissions.PERMISSION_READ)
self.assertEquals(models.Permissions.objects.count(), 2)
perm = models.Permissions.objects.all()[0]
self.assertEquals(perm.object_type, ot.getObjectType(obj))
self.assertEquals(perm.object_id, obj.pk)
self.assertEquals(perm.permission, permissions.PERMISSION_ALL)
self.assertTrue(permissions.checkPermissions(user, obj, permissions.PERMISSION_NONE))
self.assertTrue(permissions.checkPermissions(user, obj, permissions.PERMISSION_READ))
self.assertTrue(permissions.checkPermissions(user, obj, permissions.PERMISSION_ALL))
# Remove obj, permissions must have gone away
obj.delete()
self.assertEquals(models.Permissions.objects.count(), 0)
# Every tests reverses the DB and recalls setUp
def test_user_auth_permissions_user(self):
self.doTestUserPermissions(self.authenticator, self.users[0])
def test_user_auth_permissions_admin(self):
self.doTestUserPermissions(self.authenticator, self.admins[0])
def test_user_auth_permissions_staff(self):
self.doTestUserPermissions(self.authenticator, self.staffs[0])
def test_group_auth_permissions_user(self):
self.doTestGroupPermissions(self.authenticator, self.users[0])
def test_group_auth_permissions_admin(self):
self.doTestGroupPermissions(self.authenticator, self.admins[0])
def test_group_auth_permissions_staff(self):
self.doTestGroupPermissions(self.authenticator, self.staffs[0])
def test_transport_permissions_user(self):
self.doTestUserPermissions(self.userService.deployed_service.transports.first(), self.users[0])
def test_transport_permissions_admin(self):
self.doTestUserPermissions(self.userService.deployed_service.transports.first(), self.admins[0])
def test_transport_permissions_staff(self):
self.doTestUserPermissions(self.userService.deployed_service.transports.first(), self.staffs[0])
'''def test_user_transport_permissions(self):
self.doTestUserPermissions(Transport.objects.all()[0])
def test_group_transport_permissions(self):
self.doTestGroupPermissions(Transport.objects.all()[0])
def test_user_network_permissions(self):
self.doTestUserPermissions(Network.objects.all()[0])
def test_group_network_permissions(self):
self.doTestGroupPermissions(Network.objects.all()[0])
def test_user_provider_permissions(self):
self.doTestUserPermissions(Provider.objects.all()[0])
def test_group_provider_permissions(self):
self.doTestGroupPermissions(Provider.objects.all()[0])
def test_user_service_permissions(self):
self.doTestUserPermissions(Service.objects.all()[0])
def test_group_service_permissions(self):
self.doTestGroupPermissions(Service.objects.all()[0])
def test_user_pool_permissions(self):
self.doTestUserPermissions(ServicePool.objects.all()[0])
def test_group_pool_permissions(self):
self.doTestGroupPermissions(ServicePool.objects.all()[0])
'''

View File

@ -28,7 +28,3 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from uds.models import notifications
from . import authenticators
from . import notifiers
from . import services

View File

@ -28,6 +28,7 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
import copy
import typing
import datetime
import random
@ -35,59 +36,199 @@ import random
from uds import models
from uds.models.calendar_rule import freqs, dunits
# Counters so we can reinvoke the same method and generate new data
glob = {
'calendar_id': 1,
'calendar_rule_id': 1,
# fixtures for calendars and calendar rules
CALENDAR_DATA: typing.Mapping[str, typing.List[typing.Dict[str, typing.Union[str,int,None]]]] = {
'calendars': [
{
"modified": "2015-09-18T00:04:31.792",
"uuid": "2cf6846b-d889-57ce-bb35-e647040a95b6",
"comments": "Calendar by days",
"name": "Calendar Dayly",
},
{
"modified": "2015-09-18T00:05:44.386",
"uuid": "c1221a6d-3848-5fa3-ae98-172662c0f554",
"comments": "Calendar by weeks",
"name": "Calendar Weekly",
},
{
"modified": "2015-09-18T00:03:47.362",
"uuid": "353c4cb8-e02d-5387-a18f-f634729fde81",
"comments": "Calendar by months",
"name": "Calendar Monthly",
},
{
"modified": "2015-09-18T00:05:33.958",
"uuid": "bccfd011-605b-565f-a08e-80bf75114dce",
"comments": "Calendar by day of week",
"name": "Calendar Weekdays",
},
{
"modified": "2015-09-18T00:19:51.131",
"uuid": "60160f94-c8fe-5fdc-bbbe-325010980106",
"comments": "Calendar tests for durations",
"name": "Calendar xDurations",
},
],
'rules': [
{
"end": None,
"uuid": "42846f5f-6a61-5257-beb5-67beb179d6b1",
"duration_unit": "HOURS",
"interval": 1,
"comments": "Rule with 1 day interval, no end",
"start": "2015-09-01T08:00:00",
"frequency": "DAILY",
"duration": 12,
"calendar": 0,
"name": "Rule interval 1 day, no end",
},
{
"end": "2015-10-01",
"uuid": "f20d8841-72d2-5054-b590-37dc19729b80",
"duration_unit": "MINUTES",
"interval": 1,
"comments": "Rule with 1 day interval, with an end",
"start": "2015-09-01T21:00:00",
"frequency": "DAILY",
"duration": 300,
"calendar": 0,
"name": "Rule interval 1 day, end",
},
{
"end": None,
"uuid": "935cceba-4384-50ba-a125-ea40727f0609",
"duration_unit": "HOURS",
"interval": 1,
"comments": "Rule with 1 week interval, no end.",
"start": "2015-09-01T07:00:00",
"frequency": "WEEKLY",
"duration": 2,
"calendar": 1,
"name": "Rule with 1 week interval, no end",
},
{
"end": "2015-10-01",
"uuid": "53c94b8a-6ab4-5c06-b863-083e88bd8469",
"duration_unit": "MINUTES",
"interval": 1,
"comments": "Rule with 1 week interval, with end",
"start": "2015-09-01T10:00:00",
"frequency": "WEEKLY",
"duration": 120,
"calendar": 1,
"name": "Rule with 1 week interval, with end",
},
{
"end": None,
"uuid": "ff8168a4-0c0c-5a48-acee-f8b3f04d52b8",
"duration_unit": "HOURS",
"interval": 1,
"comments": "Rule with 1 month interval, no end",
"start": "2015-09-01T07:00:00",
"frequency": "MONTHLY",
"duration": 2,
"calendar": 2,
"name": "Rule with 1 month interval, no end",
},
{
"end": "2015-11-01",
"uuid": "0c4e2086-f807-5801-889c-3d568e42033f",
"duration_unit": "MINUTES",
"interval": 1,
"comments": "Rule with 1 month interval, with end",
"start": "2015-09-01T10:00:00",
"frequency": "MONTHLY",
"duration": 120,
"calendar": 2,
"name": "Rule with 1 month interval, with end",
},
{
"end": None,
"uuid": "3227f381-c017-5d5c-b2ca-863e5ac643ce",
"duration_unit": "HOURS",
"interval": 42,
"comments": "Rule for Mon, Wed & Fri, no end",
"start": "2015-09-01T07:00:00",
"frequency": "WEEKDAYS",
"duration": 2,
"calendar": 3,
"name": "Rule for Mon, Wed & Fri, no end",
},
{
"end": "2015-10-01",
"uuid": "2c16056e-97b1-5a4e-ae34-ab99c73ddd8f",
"duration_unit": "MINUTES",
"interval": 42,
"comments": "Rule for Mon, Wed & Fri, with end",
"start": "2015-09-01T10:00:00",
"frequency": "WEEKDAYS",
"duration": 120,
"calendar": 3,
"name": "Rule for Mon, Wed & Fri, with end",
},
{
"end": "2015-01-01",
"uuid": "a4dd4e82-65bd-5824-bd78-95eafb40abf5",
"duration_unit": "MINUTES",
"interval": 1,
"comments": "For testing minutes",
"start": "2015-01-01T00:00:00",
"frequency": "DAILY",
"duration": 2,
"calendar": 4,
"name": "Test Minutes",
},
{
"end": "2015-02-01",
"uuid": "9194d314-a6b0-5d7f-a3e3-08ff475e271c",
"duration_unit": "HOURS",
"interval": 1,
"comments": "for testing hours",
"start": "2015-02-01T00:00:00",
"frequency": "DAILY",
"duration": 2,
"calendar": 4,
"name": "Test Hours",
},
{
"end": "2015-03-01",
"uuid": "bffb7290-16eb-5be0-adb2-6b454d6d7b49",
"duration_unit": "DAYS",
"interval": 1,
"comments": "For testing days",
"start": "2015-03-01T00:00:00",
"frequency": "DAILY",
"duration": 2,
"calendar": 4,
"name": "Test Days",
},
{
"end": "2015-04-01",
"uuid": "dc8e30e9-2bf2-5008-a46b-2457376fb2e0",
"duration_unit": "WEEKS",
"interval": 1,
"comments": "for testing weeks",
"start": "2015-04-01T08:00:00",
"frequency": "DAILY",
"duration": 2,
"calendar": 4,
"name": "Test Weeks",
},
],
}
def createCalendars(number: int) -> typing.List[models.Calendar]:
"""
Creates some testing calendars
"""
def createCalendars() -> typing.Tuple[typing.List[models.Calendar], typing.List[models.CalendarRule]]:
calendars: typing.List[models.Calendar] = []
for i in range(number):
calendar = models.Calendar.objects.create(
name='Calendar {}'.format(glob['calendar_id']),
comments='Calendar {} comments'.format(glob['calendar_id']),
)
calendars.append(calendar)
glob['calendar_id'] += 1
return calendars
def createRules(number: int, calendar: models.Calendar) -> typing.List[models.CalendarRule]:
"""
Creates some testing rules associated to a calendar
"""
rules: typing.List[models.CalendarRule] = []
rnd = random.Random() # nosec: testing purposes
for i in range(number):
# All rules will start now
# Rules duration will be a random between 1 and 10 days
# freqs a random value from freqs
# interval is a random value between 1 and 10
# duration is a random value between 0 and 24
# duration_unit is a random from dunits
start = datetime.datetime.now()
end = start + datetime.timedelta(days=rnd.randint(1, 10))
freq = rnd.choice(freqs)
interval = rnd.randint(1, 10)
duration = rnd.randint(0, 24)
duration_unit = rnd.choice(dunits)
for calendar in CALENDAR_DATA["calendars"]:
calendars.append(models.Calendar.objects.create(**calendar))
for r in CALENDAR_DATA["rules"]:
# Extract parent calendar
rule = r.copy()
parent = calendars[typing.cast(int, rule['calendar'])]
del rule['calendar']
rules.append(parent.rules.create(**rule))
rule = models.CalendarRule.objects.create(
calendar=calendar,
name='Rule {}'.format(glob['calendar_rule_id']),
comments='Rule {} comments'.format(glob['calendar_rule_id']),
start=start,
end=end,
freq=freq,
interval=interval,
duration=duration,
duration_unit=duration_unit,
)
rules.append(rule)
glob['calendar_rule_id'] += 1
return rules
return calendars, rules

View File

@ -47,8 +47,10 @@ glob = {
'user_service_id': 1,
}
def createProvider() -> models.Provider:
from uds.services.Test.provider import TestProvider
provider = models.Provider()
provider.name = 'Testing provider {}'.format(glob['provider_id'])
provider.comments = 'Tesging provider comment {}'.format(glob['provider_id'])
@ -62,7 +64,8 @@ def createProvider() -> models.Provider:
def createSingleTestingUserServiceStructure(
provider: 'models.Provider',
user: 'models.User', groups: typing.List['models.Group'],
user: 'models.User',
groups: typing.List['models.Group'],
type_: typing.Union[typing.Literal['managed'], typing.Literal['unmanaged']],
) -> 'models.UserService':
@ -100,20 +103,6 @@ def createSingleTestingUserServiceStructure(
)
glob['osmanager_id'] += 1
values = {
'testURL': 'http://www.udsenterprise.com',
'forceNewWindow': True,
}
transport: 'models.Transport' = models.Transport.objects.create(
name='Transport %d' % (glob['transport_id']),
comments='Comment for Trnasport %d' % (glob['transport_id']),
data_type=TestTransport.typeType,
data=TestTransport(
environment.Environment(str(glob['transport_id'])), values
).serialize(),
)
glob['transport_id'] += 1
service_pool: 'models.ServicePool' = service.deployedServices.create(
name='Service pool %d' % (glob['service_pool_id']),
short_name='pool%d' % (glob['service_pool_id']),
@ -129,11 +118,27 @@ def createSingleTestingUserServiceStructure(
)
glob['service_pool_id'] += 1
values = {
'testURL': 'http://www.udsenterprise.com',
'forceNewWindow': True,
}
transport: 'models.Transport' = models.Transport.objects.create(
name='Transport %d' % (glob['transport_id']),
comments='Comment for Trnasport %d' % (glob['transport_id']),
data_type=TestTransport.typeType,
data=TestTransport(
environment.Environment(str(glob['transport_id'])), values
).serialize(),
)
glob['transport_id'] += 1
service_pool.publications.add(publication)
for g in groups:
service_pool.assignedGroups.add(g)
service_pool.transports.add(transport)
service_pool.transports.add(transport)
user_service: 'models.UserService' = service_pool.userServices.create(
friendly_name='user-service-{}'.format(glob['user_service_id']),
publication=publication,
@ -147,4 +152,4 @@ def createSingleTestingUserServiceStructure(
src_ip=generators.random_ip(),
)
return user_service
return user_service

View File

@ -28,4 +28,3 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from . import test_notifier

View File

@ -30,7 +30,7 @@
"""
from django.test import TestCase
from .. import fixtures
from ..fixtures import notifiers as notifiers_fixtures
from uds.core import messaging
class TestEmailNotifier(TestCase):
@ -58,7 +58,7 @@ class TestEmailNotifier(TestCase):
"""
Test email notifier
"""
notifier = fixtures.notifiers.createEmailNotifier(
notifier = notifiers_fixtures.createEmailNotifier(
host='localhost',
port=self.smtp_server.port,
enableHtml=False

View File

@ -35,7 +35,7 @@ import typing
from uds import models
from .. import test, generators, constants
from ... import fixtures
from ...fixtures import authenticators as authenticators_fixtures, services as service_fixtures
from uds.REST.handlers import AUTH_TOKEN_HEADER
@ -53,28 +53,28 @@ class WEBTestCase(test.UDSTransactionTestCase):
def setUp(self) -> None:
# Set up data for REST Test cases
# First, the authenticator related
self.auth = fixtures.authenticators.createAuthenticator()
self.groups = fixtures.authenticators.createGroups(
self.auth = authenticators_fixtures.createAuthenticator()
self.groups = authenticators_fixtures.createGroups(
self.auth, NUMBER_OF_ITEMS_TO_CREATE
)
# Create some users, one admin, one staff and one user
self.admins = fixtures.authenticators.createUsers(
self.admins = authenticators_fixtures.createUsers(
self.auth,
number_of_users=NUMBER_OF_ITEMS_TO_CREATE,
is_admin=True,
groups=self.groups,
)
self.staffs = fixtures.authenticators.createUsers(
self.staffs = authenticators_fixtures.createUsers(
self.auth,
number_of_users=NUMBER_OF_ITEMS_TO_CREATE,
is_staff=True,
groups=self.groups,
)
self.plain_users = fixtures.authenticators.createUsers(
self.plain_users = authenticators_fixtures.createUsers(
self.auth, number_of_users=NUMBER_OF_ITEMS_TO_CREATE, groups=self.groups
)
self.provider = fixtures.services.createProvider()
self.provider = service_fixtures.createProvider()
def do_login(self, username: str, password: str, authid: str, check: bool = False) -> 'test.UDSHttpResponse':
response = typing.cast(

View File

@ -28,4 +28,3 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from . import user

View File

@ -28,4 +28,3 @@
"""
@author: Adolfo Gómez, dkmaster at dkmon dot com
"""
from . import test_login_logout

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2019 Virtual Cable S.L.
# Copyright (c) 2012-2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -12,7 +12,7 @@
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
@ -148,12 +148,12 @@ class IPAuth(auths.Authenticator):
self.getGroups(ip, gm)
if gm.hasValidGroups() and self.dbAuthenticator().isValidUser(ip, True):
return '''function setVal(element, value) {{
document.getElementById(element).value = value;
}}
setVal("id_user", "{ip}");
setVal("id_password", "{passwd}");
document.getElementById("loginform").submit();'''.format(
return ('function setVal(element, value) {{\n' # nosec: no user input, password is always EMPTY
' document.getElementById(element).value = value;\n'
'}}\n'
'setVal("id_user", "{ip}");\n'
'setVal("id_password", "{passwd}");\n'
'document.getElementById("loginform").submit();\n').format(
ip=ip, passwd=''
)

View File

@ -69,7 +69,6 @@ class CalendarChecker:
def _updateData(self, dtime: datetime.datetime) -> bitarray.bitarray:
logger.debug('Updating %s', dtime)
# Else, update the array
CalendarChecker.updates += 1
data = bitarray.bitarray(60 * 24) # Granurality is minute

View File

@ -35,8 +35,9 @@ import socket
import logging
import typing
NetworkType = typing.Tuple[int, int]
NetworklistType = typing.List[NetworkType]
class NetworkType(typing.NamedTuple):
start: int
end: int
logger = logging.getLogger(__name__)
@ -83,17 +84,11 @@ def longToIp(n: int) -> str:
return '.'.join(q)
except Exception:
return '0.0.0.0' # Invalid values will map to "0.0.0.0"
return '0.0.0.0' # nosec: Invalid values will map to "0.0.0.0"
def networkFromString(strNets: str) -> NetworkType:
return typing.cast(NetworkType, networksFromString(strNets, False))
def networksFromString(
strNets: str, allowMultipleNetworks: bool = True
) -> typing.Union[NetworklistType, NetworkType]:
"""
'''
Parses the network from strings in this forms:
- A.* (or A.*.* or A.*.*.*)
- A.B.* (or A.B.*.* )
@ -102,10 +97,9 @@ def networksFromString(
- A.B.C.D netmask X.X.X.X (i.e. 192.168.0.0 netmask 255.255.255.0)
- A.B.C.D - E.F.G.D (i.e. 192-168.0.0-192.168.0.255)
- A.B.C.D
If allowMultipleNetworks is True, it allows ',' and ';' separators (and, ofc, more than 1 network)
Returns a list of networks tuples in the form [(start1, end1), (start2, end2) ...]
"""
returns a named tuple with networks start and network end
'''
inputString = strNets
logger.debug('Getting network from %s', strNets)
@ -128,17 +122,10 @@ def networksFromString(
v |= 1 << (31 - n)
return v
if allowMultipleNetworks:
res = []
for strNet in re.split('[;,]', strNets):
if strNet != '':
res.append(typing.cast(NetworkType, networksFromString(strNet, False)))
return res
strNets = strNets.replace(' ', '')
if strNets == '*':
return 0, 4294967295
return NetworkType(0, 4294967295)
try:
# Test patterns
@ -152,7 +139,7 @@ def networksFromString(
val = toNum(*m.groups())
bits = maskFromBits(bits)
noBits = ~bits & 0xFFFFFFFF
return val & bits, val | noBits
return NetworkType(val & bits, val | noBits)
m = reMask.match(strNets)
if m is not None:
@ -161,7 +148,7 @@ def networksFromString(
val = toNum(*(m.groups()[0:4]))
bits = toNum(*(m.groups()[4:8]))
noBits = ~bits & 0xFFFFFFFF
return val & bits, val | noBits
return NetworkType(val & bits, val | noBits)
m = reRange.match(strNets)
if m is not None:
@ -171,14 +158,14 @@ def networksFromString(
val2 = toNum(*(m.groups()[4:8]))
if val2 < val:
raise Exception()
return val, val2
return NetworkType(val, val2)
m = reHost.match(strNets)
if m is not None:
logger.debug('Format is a single host')
check(*m.groups())
val = toNum(*m.groups())
return val, val
return NetworkType(val, val)
for v in ((re1Asterisk, 3), (re2Asterisk, 2), (re3Asterisk, 1)):
m = v[0].match(strNets)
@ -187,7 +174,7 @@ def networksFromString(
val = toNum(*(m.groups()[0 : v[1] + 1]))
bits = maskFromBits(v[1] * 8)
noBits = ~bits & 0xFFFFFFFF
return val & bits, val | noBits
return NetworkType(val & bits, val | noBits)
# No pattern recognized, invalid network
raise Exception()
@ -196,15 +183,31 @@ def networksFromString(
raise ValueError(inputString)
def networksFromString(
strNets: str
) -> typing.List[NetworkType]:
"""
If allowMultipleNetworks is True, it allows ',' and ';' separators (and, ofc, more than 1 network)
Returns a list of networks tuples in the form [(start1, end1), (start2, end2) ...]
"""
res = []
for strNet in re.split('[;,]', strNets):
if strNet != '':
res.append(typing.cast(NetworkType, networkFromString(strNet)))
return res
def ipInNetwork(
ip: typing.Union[str, int], network: typing.Union[str, NetworklistType]
ip: typing.Union[str, int], network: typing.Union[str, NetworkType, typing.List[NetworkType]]
) -> bool:
if isinstance(ip, str):
ip = ipToLong(ip)
if isinstance(network, str):
network = typing.cast(NetworklistType, networksFromString(network))
network = [networkFromString(network)]
elif isinstance(network, NetworkType):
network = [network]
for net in typing.cast(NetworklistType, network):
for net in network:
if net[0] <= ip <= net[1]:
return True
return False

View File

@ -68,9 +68,6 @@ def getEffectivePermission(
if user.is_admin:
return PERMISSION_ALL
if not user.staff_member:
return PERMISSION_NONE
# Just check permissions for staff members
# root means for "object type" not for an object
if root is False:

View File

@ -52,7 +52,7 @@ class Calendar(UUIDModel, TaggingMixin):
modified = models.DateTimeField(auto_now=True)
# "fake" declarations for type checking
objects: 'models.manager.Manager[Calendar]'
objects: 'models.manager.Manager["Calendar"]'
rules: 'models.manager.RelatedManager[CalendarRule]'
calendaraction_set: 'models.manager.RelatedManager[CalendarAction]'
calendaraccess_set: 'models.manager.RelatedManager[CalendarAccess]'
@ -78,7 +78,7 @@ class Calendar(UUIDModel, TaggingMixin):
try:
for v in self.calendaraction_set.all():
v.save()
except Exception:
except Exception: # nosec: catch all, we don't want to fail here (if one action cannot be saved, we don't want to fail all)
pass
return res