mirror of
https://github.com/dkmstr/openuds.git
synced 2024-12-21 09:34:08 +03:00
Adding messaging
This commit is contained in:
parent
5ce7d008e6
commit
3405797bb8
3
.gitignore
vendored
3
.gitignore
vendored
@ -64,8 +64,6 @@
|
||||
|
||||
# /server/
|
||||
*_enterprise
|
||||
/server/openuds.sublime-project
|
||||
/server/openuds.sublime-workspace
|
||||
|
||||
# /server/src/
|
||||
/server/src/taskmanager.pid
|
||||
@ -88,7 +86,6 @@
|
||||
# /server/src/uds/
|
||||
/server/src/uds/*_enterprise.py
|
||||
/server/src/uds/fixtures
|
||||
/server/src/uds/tests
|
||||
|
||||
# /server/src/uds/auths/
|
||||
/server/src/uds/auths/*-enterprise
|
||||
|
@ -38,7 +38,7 @@ from uds.models import Notifier
|
||||
from uds.core import messaging
|
||||
from uds.core.ui import gui
|
||||
from uds.core.util import permissions
|
||||
from uds.core.util import os_detector as OsDetector
|
||||
from uds.core.managers import notifications
|
||||
|
||||
from uds.REST.model import ModelHandler
|
||||
|
||||
@ -55,7 +55,6 @@ class Notifiers(ModelHandler):
|
||||
'name',
|
||||
'comments',
|
||||
'tags',
|
||||
'label',
|
||||
]
|
||||
|
||||
table_title = _('Notifiers')
|
||||
@ -92,4 +91,3 @@ class Notifiers(ModelHandler):
|
||||
'type_name': type_.name(),
|
||||
'permission': permissions.getEffectivePermission(self._user, item),
|
||||
}
|
||||
|
||||
|
@ -262,7 +262,7 @@ class SAMLAuthenticator(auths.Authenticator):
|
||||
gettext('Invalid private key. ') + str(e)
|
||||
)
|
||||
|
||||
request = values['_request']
|
||||
request: 'ExtendedHttpRequest' = values['_request']
|
||||
|
||||
if self.entityID.value == '':
|
||||
self.entityID.value = request.build_absolute_uri(self.infoUrl())
|
||||
|
@ -69,13 +69,13 @@ class NotificationsManager(metaclass=singleton.Singleton):
|
||||
def registerGroup(self, group: str) -> None:
|
||||
"""
|
||||
Registers a new group.
|
||||
This is used to group notifications, so that we can send them to a group of users.
|
||||
This is used to group notifications
|
||||
"""
|
||||
pass
|
||||
|
||||
def registerIdentificator(self, group: str, identificator: str) -> None:
|
||||
"""
|
||||
Registers a new identificator.
|
||||
This is used to group notifications, so that we can send them to a group of users.
|
||||
This is used to identify notifications
|
||||
"""
|
||||
pass
|
@ -29,7 +29,7 @@
|
||||
"""
|
||||
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
from .provider import Notifier
|
||||
from .provider import Notifier, NotificationLevel
|
||||
from .factory import NotifierFactory
|
||||
|
||||
def factory() -> NotifierFactory:
|
||||
|
@ -111,6 +111,22 @@ class Notifier(ManagedObjectModel, TaggingMixin):
|
||||
db_table = 'uds_notify_prov'
|
||||
app_label = 'uds'
|
||||
|
||||
def getType(self) -> typing.Type['NotificationProviderModule']:
|
||||
"""
|
||||
Get the type of the object this record represents.
|
||||
|
||||
The type is Python type, it obtains this type from ServiceProviderFactory and associated record field.
|
||||
|
||||
Returns:
|
||||
The python type for this record object
|
||||
"""
|
||||
from uds.core import messaging # pylint: disable=redefined-outer-name
|
||||
|
||||
kind_ = messaging.factory().lookup(self.data_type)
|
||||
if kind_ is None:
|
||||
raise Exception('Notifier type not found: {0}'.format(self.data_type))
|
||||
return kind_
|
||||
|
||||
def getInstance(
|
||||
self, values: typing.Optional[typing.Dict[str, str]] = None
|
||||
) -> 'NotificationProviderModule':
|
||||
|
@ -30,13 +30,15 @@
|
||||
"""
|
||||
.. moduleauthor:: Adolfo Gómez, dkmaster at dkmon dot com
|
||||
"""
|
||||
from cProfile import label
|
||||
import logging
|
||||
import smtplib, ssl
|
||||
from email.mime.text import MIMEText
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
import typing
|
||||
|
||||
from django.utils.translation import gettext_noop as _
|
||||
|
||||
from uds.core.messaging import Notifier
|
||||
from uds.core import messaging
|
||||
from uds.core.ui import gui
|
||||
from uds.core.util import validators
|
||||
|
||||
@ -47,7 +49,7 @@ if typing.TYPE_CHECKING:
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class EmailNotifier(Notifier):
|
||||
class EmailNotifier(messaging.Notifier):
|
||||
"""
|
||||
Email notifier
|
||||
"""
|
||||
@ -101,7 +103,7 @@ class EmailNotifier(Notifier):
|
||||
label=_('Username'),
|
||||
order=9,
|
||||
tooltip=_('User with access to SMTP server'),
|
||||
required=True,
|
||||
required=False,
|
||||
defvalue='',
|
||||
tab=_('SMTP Server'),
|
||||
)
|
||||
@ -110,7 +112,7 @@ class EmailNotifier(Notifier):
|
||||
label=_('Password'),
|
||||
order=10,
|
||||
tooltip=_('Password of the user with access to SMTP server'),
|
||||
required=True,
|
||||
required=False,
|
||||
defvalue='',
|
||||
tab=_('SMTP Server'),
|
||||
)
|
||||
@ -153,7 +155,7 @@ class EmailNotifier(Notifier):
|
||||
# if hostname is not valid, we will raise an exception
|
||||
hostname = self.hostname.cleanStr()
|
||||
if not hostname:
|
||||
raise Notifier.ValidationException(_('Invalid SMTP hostname'))
|
||||
raise messaging.Notifier.ValidationException(_('Invalid SMTP hostname'))
|
||||
|
||||
# Now check is valid format
|
||||
if ':' in hostname:
|
||||
@ -169,3 +171,63 @@ class EmailNotifier(Notifier):
|
||||
|
||||
# Done
|
||||
|
||||
def notify(self, group: str, identificator: str, level: messaging.NotificationLevel, message: str) -> None:
|
||||
# Send and email with the notification
|
||||
with self.login() as smtp:
|
||||
try:
|
||||
# Create message container
|
||||
msg = MIMEMultipart('alternative')
|
||||
msg['Subject'] = '{} - {}'.format(group, identificator)
|
||||
msg['From'] = self.fromEmail.value
|
||||
msg['To'] = self.toEmail.value
|
||||
|
||||
part1 = MIMEText(message, 'plain')
|
||||
part2 = MIMEText(message, 'html')
|
||||
|
||||
msg.attach(part1)
|
||||
|
||||
if self.enableHTML.value:
|
||||
msg.attach(part2)
|
||||
|
||||
smtp.sendmail(self.fromEmail.value, self.toEmail.value, msg.as_string())
|
||||
except smtplib.SMTPException as e:
|
||||
logger.error('Error sending email: {}'.format(e))
|
||||
|
||||
|
||||
|
||||
def login(self) -> smtplib.SMTP:
|
||||
"""
|
||||
Login to SMTP server
|
||||
"""
|
||||
host = self.hostname.cleanStr()
|
||||
if ':' in host:
|
||||
host, ports = host.split(':')
|
||||
port = int(ports)
|
||||
else:
|
||||
port = None
|
||||
|
||||
if self.security.value in ('tls', 'ssl'):
|
||||
context = ssl.create_default_context()
|
||||
context.check_hostname = False
|
||||
context.verify_mode = ssl.CERT_NONE
|
||||
if self.security.value == 'tls':
|
||||
if port:
|
||||
smtp = smtplib.SMTP(host, port,)
|
||||
else:
|
||||
smtp = smtplib.SMTP(host)
|
||||
smtp.starttls(context=context)
|
||||
else:
|
||||
if port:
|
||||
smtp = smtplib.SMTP_SSL(host, port, context=context)
|
||||
else:
|
||||
smtp = smtplib.SMTP_SSL(host, context=context)
|
||||
else:
|
||||
if port:
|
||||
smtp = smtplib.SMTP(host, port)
|
||||
else:
|
||||
smtp = smtplib.SMTP(host)
|
||||
|
||||
if self.username.value and self.password.value:
|
||||
smtp.login(self.username.value, self.password.value)
|
||||
|
||||
return smtp
|
||||
|
@ -1 +0,0 @@
|
||||
from .login import *
|
1
server/src/uds/tests/REST/__init__.py
Normal file
1
server/src/uds/tests/REST/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .login_logout import *
|
72
server/src/uds/tests/REST/login_logout.py
Normal file
72
server/src/uds/tests/REST/login_logout.py
Normal file
@ -0,0 +1,72 @@
|
||||
import random
|
||||
import typing
|
||||
|
||||
|
||||
from django.test import TestCase
|
||||
from django.test.client import Client
|
||||
from django.conf import settings
|
||||
|
||||
from uds.tests import fixtures, tools
|
||||
|
||||
|
||||
class RESTLoginLogoutCase(TestCase):
|
||||
"""
|
||||
Test login and logout
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
self.client = tools.getClient()
|
||||
|
||||
def test_login_logout(self):
|
||||
"""
|
||||
Test login and logout
|
||||
"""
|
||||
auth = fixtures.authenticators.createAuthenticator()
|
||||
# Create some ramdom users
|
||||
admins = fixtures.authenticators.createUsers(
|
||||
auth, number_of_users=8, is_admin=True
|
||||
)
|
||||
stafs = fixtures.authenticators.createUsers(
|
||||
auth, number_of_users=8, is_staff=True
|
||||
)
|
||||
users = fixtures.authenticators.createUsers(auth, number_of_users=8)
|
||||
|
||||
# Create some groups
|
||||
groups = fixtures.authenticators.createGroups(auth, number_of_groups=32)
|
||||
|
||||
# Add users to some groups, ramdomly
|
||||
for user in users + admins + stafs:
|
||||
for group in random.sample(groups, random.randint(1, len(groups))):
|
||||
user.groups.add(group)
|
||||
|
||||
# All users, admin and staff must be able to login
|
||||
for user in users + admins + stafs:
|
||||
response = self.invokeLogin(auth.uuid, user.name, user.name, 200, 'user')
|
||||
self.assertEqual(
|
||||
response['result'], 'ok', 'Login user {}'.format(user.name)
|
||||
)
|
||||
self.assertIsNotNone(response['token'], 'Login user {}'.format(user.name))
|
||||
self.assertIsNotNone(response['version'], 'Login user {}'.format(user.name))
|
||||
self.assertIsNotNone(
|
||||
response['scrambler'], 'Login user {}'.format(user.name)
|
||||
)
|
||||
|
||||
def invokeLogin(
|
||||
self, auth_id: str, username: str, password: str, expectedResponse, what: str
|
||||
) -> typing.Mapping[str, typing.Any]:
|
||||
response = self.client.post(
|
||||
'/uds/rest/auth/login',
|
||||
{
|
||||
'auth_id': auth_id,
|
||||
'username': username,
|
||||
'password': password,
|
||||
},
|
||||
content_type='application/json',
|
||||
)
|
||||
self.assertEqual(
|
||||
response.status_code, expectedResponse, 'Login {}'.format(what)
|
||||
)
|
||||
if response.status_code == 200:
|
||||
return response.json()
|
||||
|
||||
return {}
|
2
server/src/uds/tests/__init__.py
Normal file
2
server/src/uds/tests/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
from .REST import *
|
||||
from .messaging import *
|
3
server/src/uds/tests/fixtures/__init__.py
vendored
Normal file
3
server/src/uds/tests/fixtures/__init__.py
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
from uds.models import notifications
|
||||
from . import authenticators
|
||||
from . import notifiers
|
98
server/src/uds/tests/fixtures/authenticators.py
vendored
Normal file
98
server/src/uds/tests/fixtures/authenticators.py
vendored
Normal file
@ -0,0 +1,98 @@
|
||||
import typing
|
||||
|
||||
from uds import models
|
||||
from uds.core.util import states
|
||||
from uds.core.managers.crypto import CryptoManager
|
||||
|
||||
# Counters so we can reinvoke the same method and generate new data
|
||||
glob = {
|
||||
'user_id': 0,
|
||||
'group_id': 0
|
||||
}
|
||||
|
||||
def createAuthenticator(
|
||||
authenticator: typing.Optional[models.Authenticator] = None,
|
||||
) -> models.Authenticator:
|
||||
"""
|
||||
Creates a sample authenticator
|
||||
"""
|
||||
if authenticator is None:
|
||||
from uds.auths.InternalDB.authenticator import InternalDBAuth
|
||||
|
||||
authenticator = models.Authenticator()
|
||||
authenticator.name = 'Testing authenticator'
|
||||
authenticator.comments = 'Tesging authenticator'
|
||||
authenticator.data_type = InternalDBAuth.typeType
|
||||
authenticator.data = authenticator.getInstance().serialize()
|
||||
authenticator.save()
|
||||
|
||||
return authenticator
|
||||
|
||||
|
||||
def createUsers(
|
||||
authenticator: models.Authenticator,
|
||||
number_of_users: int = 1,
|
||||
is_staff: bool = False,
|
||||
is_admin: bool = False,
|
||||
enabled: bool = True,
|
||||
) -> typing.List[models.User]:
|
||||
"""
|
||||
Creates some ramdon users
|
||||
password is same as username
|
||||
"""
|
||||
users = [
|
||||
authenticator.users.create(
|
||||
name='user{}'.format(i),
|
||||
password=CryptoManager().hash('user{}'.format(i)),
|
||||
real_name='Real name {}'.format(i),
|
||||
comments='User {}'.format(i),
|
||||
staff_member=is_staff or is_admin,
|
||||
is_admin=is_admin,
|
||||
state=states.common.ACTIVE if enabled else states.common.BLOCKED,
|
||||
)
|
||||
for i in range(glob['user_id'], glob['user_id'] + number_of_users)
|
||||
]
|
||||
glob['user_id'] += number_of_users
|
||||
|
||||
return users
|
||||
|
||||
|
||||
def createGroups(
|
||||
authenticator: models.Authenticator, number_of_groups: int = 1
|
||||
) -> typing.List[models.Group]:
|
||||
"""
|
||||
Creates a sample authenticator
|
||||
"""
|
||||
groups = [
|
||||
authenticator.groups.create(
|
||||
name='Group {}'.format(i),
|
||||
comments='Group {}'.format(i),
|
||||
is_meta=False,
|
||||
)
|
||||
for i in range(glob['group_id'], glob['group_id'] + number_of_groups)
|
||||
]
|
||||
|
||||
glob['group_id'] += number_of_groups
|
||||
|
||||
return groups
|
||||
|
||||
|
||||
def createMetaGroups(
|
||||
authenticator: models.Authenticator, number_of_meta: int = 1
|
||||
) -> typing.List[models.Group]:
|
||||
"""
|
||||
Creates a sample authenticator
|
||||
"""
|
||||
meta_groups = [
|
||||
authenticator.groups.create(
|
||||
name='Meta group {}'.format(i),
|
||||
comments='Meta group {}'.format(i),
|
||||
is_meta=True,
|
||||
meta_if_any=i % 2 == 0,
|
||||
)
|
||||
for i in range(glob['group_id'], glob['group_id'] + number_of_meta)
|
||||
]
|
||||
|
||||
glob['group_id'] += number_of_meta
|
||||
|
||||
return meta_groups
|
42
server/src/uds/tests/fixtures/notifiers.py
vendored
Normal file
42
server/src/uds/tests/fixtures/notifiers.py
vendored
Normal file
@ -0,0 +1,42 @@
|
||||
import typing
|
||||
|
||||
from uds import models
|
||||
from uds.core.util import states
|
||||
from uds.core.managers.crypto import CryptoManager
|
||||
|
||||
# Counters so we can reinvoke the same method and generate new data
|
||||
glob = {'user_id': 0, 'group_id': 0}
|
||||
|
||||
|
||||
def createEmailNotifier(
|
||||
host: typing.Optional[str] = None,
|
||||
port: int = 0,
|
||||
username: typing.Optional[str] = None,
|
||||
password: typing.Optional[str] = None,
|
||||
fromEmail: typing.Optional[str] = None,
|
||||
toEmail: typing.Optional[str] = None,
|
||||
enableHtml: bool = False,
|
||||
security: typing.Optional[str] = None,
|
||||
) -> models.Notifier:
|
||||
from uds.notifiers.email.notifier import EmailNotifier
|
||||
|
||||
notifier = models.Notifier()
|
||||
notifier.name = 'Testing email notifier'
|
||||
notifier.comments = 'Testing email notifier'
|
||||
notifier.data_type = EmailNotifier.typeType
|
||||
instance: EmailNotifier = typing.cast(EmailNotifier, notifier.getInstance())
|
||||
# Fill up fields
|
||||
instance.hostname.value = (host or 'localhost') + (
|
||||
'' if port == 0 else ':' + str(port)
|
||||
)
|
||||
instance.username.value = username or ''
|
||||
instance.password.value = password or ''
|
||||
instance.fromEmail.value = fromEmail or 'from@email.com'
|
||||
instance.toEmail.value = toEmail or 'to@email.com'
|
||||
instance.enableHTML.value = enableHtml
|
||||
instance.security.value = security or 'none'
|
||||
# Save
|
||||
notifier.data = instance.serialize()
|
||||
notifier.save()
|
||||
|
||||
return notifier
|
1
server/src/uds/tests/messaging/__init__.py
Normal file
1
server/src/uds/tests/messaging/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .notifier import *
|
49
server/src/uds/tests/messaging/notifier.py
Normal file
49
server/src/uds/tests/messaging/notifier.py
Normal file
@ -0,0 +1,49 @@
|
||||
import random
|
||||
import typing
|
||||
|
||||
|
||||
from django.test import TestCase
|
||||
from django.test.client import Client
|
||||
from django.conf import settings
|
||||
|
||||
from uds.tests import fixtures, tools
|
||||
from uds.core import messaging
|
||||
|
||||
class TestEmailNotifier(TestCase):
|
||||
"""
|
||||
Test Email Notifier
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
# Setup smtp server
|
||||
from aiosmtpd.controller import Controller
|
||||
from aiosmtpd.handlers import Debugging
|
||||
|
||||
self.smtp_server = Controller(
|
||||
handler=Debugging(),
|
||||
hostname='localhost',
|
||||
port=1025,
|
||||
)
|
||||
self.smtp_server.start()
|
||||
|
||||
def tearDown(self):
|
||||
# Stop smtp debug server
|
||||
self.smtp_server.stop()
|
||||
|
||||
def test_email_notifier(self):
|
||||
"""
|
||||
Test email notifier
|
||||
"""
|
||||
notifier = fixtures.notifiers.createEmailNotifier(
|
||||
host='localhost',
|
||||
port=self.smtp_server.port,
|
||||
enableHtml=False
|
||||
)
|
||||
|
||||
notifier.getInstance().notify(
|
||||
'Group',
|
||||
'Identificator',
|
||||
messaging.NotificationLevel.CRITICAL,
|
||||
'Test message cañón',
|
||||
)
|
||||
|
36
server/src/uds/tests/tools.py
Normal file
36
server/src/uds/tests/tools.py
Normal file
@ -0,0 +1,36 @@
|
||||
import logging
|
||||
import typing
|
||||
|
||||
from django.test.client import Client
|
||||
from django.conf import settings
|
||||
|
||||
from uds.core.managers.crypto import CryptoManager
|
||||
|
||||
|
||||
def getClient() -> Client:
|
||||
# Ensure enterprise middleware is not enabled if it exists...
|
||||
settings.MIDDLEWARE = [
|
||||
'django.contrib.sessions.middleware.SessionMiddleware',
|
||||
'django.middleware.locale.LocaleMiddleware',
|
||||
'django.middleware.common.CommonMiddleware',
|
||||
'django.middleware.csrf.CsrfViewMiddleware',
|
||||
'django.contrib.messages.middleware.MessageMiddleware',
|
||||
'uds.core.util.middleware.request.GlobalRequestMiddleware',
|
||||
]
|
||||
|
||||
client = Client()
|
||||
client.cookies['uds'] = CryptoManager().randomString(48)
|
||||
|
||||
# Patch the client to include
|
||||
# HTTP_USER_AGENT='Testing user agent',
|
||||
# GET, POST, PUT, DELETE methods
|
||||
_oldRequest = client.request
|
||||
|
||||
def _request(**kwargs):
|
||||
if 'HTTP_USER_AGENT' not in kwargs:
|
||||
kwargs['HTTP_USER_AGENT'] = 'Testing user agent'
|
||||
return _oldRequest(**kwargs) # type: ignore
|
||||
|
||||
client.request = _request
|
||||
|
||||
return client
|
1
server/src/uds/tests/web/__init__.py
Normal file
1
server/src/uds/tests/web/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .user import *
|
1
server/src/uds/tests/web/user/__init__.py
Normal file
1
server/src/uds/tests/web/user/__init__.py
Normal file
@ -0,0 +1 @@
|
||||
from .login_logout import *
|
138
server/src/uds/tests/web/user/login_logout.py
Normal file
138
server/src/uds/tests/web/user/login_logout.py
Normal file
@ -0,0 +1,138 @@
|
||||
import random
|
||||
import typing
|
||||
|
||||
|
||||
from django.test import TestCase, TransactionTestCase
|
||||
from django.db import transaction
|
||||
|
||||
from uds.tests import fixtures, tools
|
||||
|
||||
if typing.TYPE_CHECKING:
|
||||
from django.http import HttpResponse
|
||||
|
||||
from uds import models
|
||||
|
||||
class WebLoginLogoutCase(TransactionTestCase):
|
||||
"""
|
||||
Test login and logout
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
self.client = tools.getClient()
|
||||
|
||||
def assertInvalidLogin(self, response: 'HttpResponse') -> None:
|
||||
# Returns login page with a message on uds.js
|
||||
self.assertContains(response, '<svg', status_code=200)
|
||||
# Fetch uds.js
|
||||
response = typing.cast('HttpResponse', self.client.get('/uds/utility/uds.js'))
|
||||
self.assertContains(response, '"errors": ["Access denied"]', status_code=200)
|
||||
|
||||
def do_login(self, username: str, password: str, authid: str) -> 'HttpResponse':
|
||||
return typing.cast(
|
||||
'HttpResponse',
|
||||
self.client.post(
|
||||
'/uds/page/login',
|
||||
{
|
||||
'user': username,
|
||||
'password': password,
|
||||
'authenticator': authid,
|
||||
},
|
||||
),
|
||||
)
|
||||
|
||||
def test_login_logout_success(self):
|
||||
"""
|
||||
Test login and logout
|
||||
"""
|
||||
auth = fixtures.authenticators.createAuthenticator()
|
||||
# Create some ramdom users
|
||||
admins = fixtures.authenticators.createUsers(
|
||||
auth, number_of_users=8, is_admin=True
|
||||
)
|
||||
stafs = fixtures.authenticators.createUsers(
|
||||
auth, number_of_users=8, is_staff=True
|
||||
)
|
||||
users = fixtures.authenticators.createUsers(auth, number_of_users=8)
|
||||
|
||||
# Create some groups
|
||||
groups = fixtures.authenticators.createGroups(auth, number_of_groups=32)
|
||||
|
||||
# Add users to some groups, ramdomly
|
||||
for user in users + admins + stafs:
|
||||
for group in random.sample(groups, random.randint(1, len(groups))):
|
||||
user.groups.add(group)
|
||||
|
||||
# All users, admin and staff must be able to login
|
||||
for num, user in enumerate(users + admins + stafs, start=1):
|
||||
response = self.do_login(user.name, user.name, auth.uuid)
|
||||
self.assertRedirects(response, '/uds/page/services', status_code=302)
|
||||
# Now invoke logout
|
||||
response = typing.cast('HttpResponse', self.client.get('/uds/page/logout'))
|
||||
self.assertRedirects(
|
||||
response, 'http://testserver/uds/page/login', status_code=302
|
||||
)
|
||||
# Ensures a couple of logs are created for every operation
|
||||
self.assertEqual(models.Log.objects.count(), num*4)
|
||||
|
||||
|
||||
def test_login_valid_user_no_group(self):
|
||||
user = fixtures.authenticators.createUsers(
|
||||
fixtures.authenticators.createAuthenticator(),
|
||||
)[0]
|
||||
|
||||
response = self.do_login(user.name, user.name, user.manager.uuid)
|
||||
self.assertInvalidLogin(response)
|
||||
|
||||
self.assertEqual(models.Log.objects.count(), 2)
|
||||
|
||||
user = fixtures.authenticators.createUsers(
|
||||
fixtures.authenticators.createAuthenticator(),
|
||||
is_staff=True,
|
||||
)[0]
|
||||
|
||||
response = self.do_login(user.name, user.name, user.manager.uuid)
|
||||
self.assertInvalidLogin(response)
|
||||
|
||||
self.assertEqual(models.Log.objects.count(), 4)
|
||||
|
||||
user = fixtures.authenticators.createUsers(
|
||||
fixtures.authenticators.createAuthenticator(),
|
||||
is_admin=True,
|
||||
)[0]
|
||||
|
||||
response = self.do_login(user.name, user.name, user.manager.uuid)
|
||||
self.assertInvalidLogin(response)
|
||||
|
||||
self.assertEqual(models.Log.objects.count(), 6)
|
||||
|
||||
|
||||
def test_login_invalid_user(self):
|
||||
user = fixtures.authenticators.createUsers(
|
||||
fixtures.authenticators.createAuthenticator(),
|
||||
)[0]
|
||||
|
||||
response = self.do_login(user.name, 'wrong password', user.manager.uuid)
|
||||
self.assertInvalidLogin(response)
|
||||
|
||||
# Invalid password log & access denied, in auth and user log
|
||||
self.assertEqual(models.Log.objects.count(), 4)
|
||||
|
||||
user = fixtures.authenticators.createUsers(
|
||||
fixtures.authenticators.createAuthenticator(),
|
||||
is_staff=True,
|
||||
)[0]
|
||||
|
||||
response = self.do_login(user.name, 'wrong password', user.manager.uuid)
|
||||
self.assertInvalidLogin(response)
|
||||
|
||||
self.assertEqual(models.Log.objects.count(), 8)
|
||||
|
||||
user = fixtures.authenticators.createUsers(
|
||||
fixtures.authenticators.createAuthenticator(),
|
||||
is_admin=True,
|
||||
)[0]
|
||||
|
||||
response = self.do_login(user.name, 'wrong password', user.manager.uuid)
|
||||
self.assertInvalidLogin(response)
|
||||
|
||||
self.assertEqual(models.Log.objects.count(), 12)
|
Loading…
Reference in New Issue
Block a user