1
0
mirror of https://github.com/ansible/awx.git synced 2024-11-01 08:21:15 +03:00

Merge pull request #1243 from anoek/rbac

Multi-org elimination for projects; Various test case cleanup for org users/admins removal
This commit is contained in:
Wayne Witzel III 2016-03-15 09:05:10 -04:00
commit 86afc7b24d
18 changed files with 220 additions and 78 deletions

View File

@ -924,7 +924,6 @@ class ProjectSerializer(UnifiedJobTemplateSerializer, ProjectOptionsSerializer):
def get_related(self, obj):
res = super(ProjectSerializer, self).get_related(obj)
res.update(dict(
organizations = reverse('api:project_organizations_list', args=(obj.pk,)),
teams = reverse('api:project_teams_list', args=(obj.pk,)),
playbooks = reverse('api:project_playbooks', args=(obj.pk,)),
update = reverse('api:project_update_view', args=(obj.pk,)),
@ -936,6 +935,9 @@ class ProjectSerializer(UnifiedJobTemplateSerializer, ProjectOptionsSerializer):
notifiers_error = reverse('api:project_notifiers_error_list', args=(obj.pk,)),
access_list = reverse('api:project_access_list', args=(obj.pk,)),
))
if obj.organization:
res['organization'] = reverse('api:organization_detail',
args=(obj.organization.pk,))
# Backwards compatibility.
if obj.current_update:
res['current_update'] = reverse('api:project_update_detail',

View File

@ -44,7 +44,6 @@ project_urls = patterns('awx.api.views',
url(r'^$', 'project_list'),
url(r'^(?P<pk>[0-9]+)/$', 'project_detail'),
url(r'^(?P<pk>[0-9]+)/playbooks/$', 'project_playbooks'),
url(r'^(?P<pk>[0-9]+)/organizations/$', 'project_organizations_list'),
url(r'^(?P<pk>[0-9]+)/teams/$', 'project_teams_list'),
url(r'^(?P<pk>[0-9]+)/update/$', 'project_update_view'),
url(r'^(?P<pk>[0-9]+)/project_updates/$', 'project_updates_list'),

View File

@ -829,13 +829,6 @@ class ProjectPlaybooks(RetrieveAPIView):
model = Project
serializer_class = ProjectPlaybooksSerializer
class ProjectOrganizationsList(SubListCreateAttachDetachAPIView):
model = Organization
serializer_class = OrganizationSerializer
parent_model = Project
relationship = 'organizations'
class ProjectTeamsList(SubListCreateAttachDetachAPIView):
model = Team

View File

@ -1046,7 +1046,7 @@ class JobHostSummaryAccess(BaseAccess):
model = JobHostSummary
def get_queryset(self):
qs = self.model.accessible_objects(self.user, {'read':True})
qs = self.model.objects
qs = qs.select_related('job', 'job__job_template', 'host')
if self.user.is_superuser:
return qs
@ -1071,7 +1071,7 @@ class JobEventAccess(BaseAccess):
model = JobEvent
def get_queryset(self):
qs = self.model.accessible_objects(self.user, {'read':True})
qs = self.model.objects
qs = qs.select_related('job', 'job__job_template', 'host', 'parent')
qs = qs.prefetch_related('hosts', 'children')
@ -1108,7 +1108,7 @@ class UnifiedJobTemplateAccess(BaseAccess):
model = UnifiedJobTemplate
def get_queryset(self):
qs = self.model.accessible_objects(self.user, {'read':True})
qs = self.model.objects
project_qs = self.user.get_queryset(Project).filter(scm_type__in=[s[0] for s in Project.SCM_TYPE_CHOICES])
inventory_source_qs = self.user.get_queryset(InventorySource).filter(source__in=CLOUD_INVENTORY_SOURCES)
job_template_qs = self.user.get_queryset(JobTemplate)
@ -1140,7 +1140,7 @@ class UnifiedJobAccess(BaseAccess):
model = UnifiedJob
def get_queryset(self):
qs = self.model.accessible_objects(self.user, {'read':True})
qs = self.model.objects
project_update_qs = self.user.get_queryset(ProjectUpdate)
inventory_update_qs = self.user.get_queryset(InventoryUpdate).filter(source__in=CLOUD_INVENTORY_SOURCES)
job_qs = self.user.get_queryset(Job)
@ -1261,7 +1261,7 @@ class ActivityStreamAccess(BaseAccess):
model = ActivityStream
def get_queryset(self):
qs = self.model.accessible_objects(self.user, {'read':True})
qs = self.model.objects
qs = qs.select_related('actor')
qs = qs.prefetch_related('organization', 'user', 'inventory', 'host', 'group', 'inventory_source',
'inventory_update', 'credential', 'team', 'project', 'project_update',

View File

@ -200,4 +200,19 @@ class Migration(migrations.Migration):
name='rolepermission',
index_together=set([('content_type', 'object_id')]),
),
migrations.RenameField(
model_name='organization',
old_name='projects',
new_name='deprecated_projects',
),
migrations.AlterField(
model_name='organization',
name='deprecated_projects',
field=models.ManyToManyField(related_name='deprecated_organizations', to='main.Project', blank=True),
),
migrations.AddField(
model_name='project',
name='organization',
field=models.ForeignKey(related_name='projects', to='main.Organization', blank=True, null=True),
),
]

View File

@ -694,9 +694,9 @@ class ProjectAccess(BaseAccess):
if self.user.is_superuser:
return qs
team_ids = set(Team.objects.filter(deprecated_users__in=[self.user]).values_list('id', flat=True))
qs = qs.filter(Q(created_by=self.user, organizations__isnull=True) |
Q(organizations__deprecated_admins__in=[self.user], organizations__active=True) |
Q(organizations__deprecated_users__in=[self.user], organizations__active=True) |
qs = qs.filter(Q(created_by=self.user, deprecated_organizations__isnull=True) |
Q(deprecated_organizations__deprecated_admins__in=[self.user], deprecated_organizations__active=True) |
Q(deprecated_organizations__deprecated_users__in=[self.user], deprecated_organizations__active=True) |
Q(teams__in=team_ids))
allowed_deploy = [PERM_JOBTEMPLATE_CREATE, PERM_INVENTORY_DEPLOY]
allowed_check = [PERM_JOBTEMPLATE_CREATE, PERM_INVENTORY_DEPLOY, PERM_INVENTORY_CHECK]
@ -726,9 +726,9 @@ class ProjectAccess(BaseAccess):
def can_change(self, obj, data):
if self.user.is_superuser:
return True
if obj.created_by == self.user and not obj.organizations.filter(active=True).count():
if obj.created_by == self.user and not obj.deprecated_organizations.filter(active=True).count():
return True
if obj.organizations.filter(active=True, deprecated_admins__in=[self.user]).exists():
if obj.deprecated_organizations.filter(active=True, deprecated_admins__in=[self.user]).exists():
return True
return False
@ -880,7 +880,7 @@ class JobTemplateAccess(BaseAccess):
Q(cloud_credential_id__in=credential_ids) | Q(cloud_credential__isnull=True),
)
org_admin_ids = base_qs.filter(
Q(project__organizations__deprecated_admins__in=[self.user]) |
Q(project__deprecated_organizations__deprecated_admins__in=[self.user]) |
(Q(project__isnull=True) & Q(job_type=PERM_INVENTORY_SCAN) & Q(inventory__organization__deprecated_admins__in=[self.user]))
)
@ -1097,7 +1097,7 @@ class JobAccess(BaseAccess):
credential_id__in=credential_ids,
)
org_admin_ids = base_qs.filter(
Q(project__organizations__deprecated_admins__in=[self.user]) |
Q(project__deprecated_organizations__deprecated_admins__in=[self.user]) |
(Q(project__isnull=True) & Q(job_type=PERM_INVENTORY_SCAN) & Q(inventory__organization__deprecated_admins__in=[self.user]))
)

View File

@ -131,9 +131,49 @@ def migrate_projects(apps, schema_editor):
Project = apps.get_model('main', 'Project')
Permission = apps.get_model('main', 'Permission')
JobTemplate = apps.get_model('main', 'JobTemplate')
for project in Project.objects.all():
if project.organizations.count() == 0 and project.created_by is not None:
# Migrate projects to single organizations, duplicating as necessary
for project in [p for p in Project.objects.all()]:
original_project_name = project.name
project_orgs = project.deprecated_organizations.distinct().all()
if project_orgs.count() > 1:
first_org = None
for org in project_orgs:
if first_org is None:
# For the first org, re-use our existing Project object, so don't do the below duplication effort
first_org = org
project.name = first_org.name + ' - ' + original_project_name
project.organization = first_org
project.save()
else:
new_prj = Project.objects.create(
created = project.created,
description = project.description,
name = org.name + ' - ' + original_project_name,
old_pk = project.old_pk,
created_by_id = project.created_by_id,
scm_type = project.scm_type,
scm_url = project.scm_url,
scm_branch = project.scm_branch,
scm_clean = project.scm_clean,
scm_delete_on_update = project.scm_delete_on_update,
scm_delete_on_next_update = project.scm_delete_on_next_update,
scm_update_on_launch = project.scm_update_on_launch,
scm_update_cache_timeout = project.scm_update_cache_timeout,
credential = project.credential,
organization = org
)
migrations[original_project_name]['projects'].add(new_prj)
job_templates = JobTemplate.objects.filter(inventory__organization=org).all()
for jt in job_templates:
jt.project = new_prj
jt.save()
# Migrate permissions
for project in [p for p in Project.objects.all()]:
if project.organization is None and project.created_by is not None:
project.admin_role.members.add(project.created_by)
migrations[project.name]['users'].add(project.created_by)
@ -141,11 +181,10 @@ def migrate_projects(apps, schema_editor):
team.member_role.children.add(project.member_role)
migrations[project.name]['teams'].add(team)
if project.organizations.count() > 0:
for org in project.organizations.all():
for user in org.deprecated_users.all():
project.member_role.members.add(user)
migrations[project.name]['users'].add(user)
if project.organization is not None:
for user in project.organization.deprecated_users.all():
project.member_role.members.add(user)
migrations[project.name]['users'].add(user)
for perm in Permission.objects.filter(project=project, active=True):
# All perms at this level just imply a user or team can read

View File

@ -362,9 +362,9 @@ class JobTemplate(UnifiedJobTemplate, JobOptions, ResourceMixin):
success_notifiers = list(base_notifiers.filter(unifiedjobtemplate_notifiers_for_success__in=[self, self.project]))
any_notifiers = list(base_notifiers.filter(unifiedjobtemplate_notifiers_for_any__in=[self, self.project]))
# Get Organization Notifiers
error_notifiers = set(error_notifiers + list(base_notifiers.filter(organization_notifiers_for_errors__in=self.project.organizations.all())))
success_notifiers = set(success_notifiers + list(base_notifiers.filter(organization_notifiers_for_success__in=self.project.organizations.all())))
any_notifiers = set(any_notifiers + list(base_notifiers.filter(organization_notifiers_for_any__in=self.project.organizations.all())))
error_notifiers = set(error_notifiers + list(base_notifiers.filter(organization_notifiers_for_errors=self.project.organization)))
success_notifiers = set(success_notifiers + list(base_notifiers.filter(organization_notifiers_for_success=self.project.organization)))
any_notifiers = set(any_notifiers + list(base_notifiers.filter(organization_notifiers_for_any=self.project.organization)))
return dict(error=list(error_notifiers), success=list(success_notifiers), any=list(any_notifiers))
class Job(UnifiedJob, JobOptions):

View File

@ -48,10 +48,10 @@ class Organization(CommonModel, NotificationFieldsModel, ResourceMixin):
blank=True,
related_name='admin_of_organizations',
)
projects = models.ManyToManyField(
deprecated_projects = models.ManyToManyField(
'Project',
blank=True,
related_name='organizations',
related_name='deprecated_organizations',
)
admin_role = ImplicitRoleField(
role_name='Organization Administrator',

View File

@ -198,6 +198,13 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin):
app_label = 'main'
ordering = ('id',)
organization = models.ForeignKey(
'Organization',
blank=True,
null=True,
on_delete=models.CASCADE,
related_name='projects',
)
scm_delete_on_next_update = models.BooleanField(
default=False,
editable=False,
@ -212,13 +219,13 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin):
admin_role = ImplicitRoleField(
role_name='Project Administrator',
role_description='May manage this project',
parent_role='organizations.admin_role',
parent_role='organization.admin_role',
permissions = {'all': True}
)
auditor_role = ImplicitRoleField(
role_name='Project Auditor',
role_description='May read all settings associated with this project',
parent_role='organizations.auditor_role',
parent_role='organization.auditor_role',
permissions = {'read': True}
)
member_role = ImplicitRoleField(
@ -343,9 +350,9 @@ class Project(UnifiedJobTemplate, ProjectOptions, ResourceMixin):
success_notifiers = list(base_notifiers.filter(unifiedjobtemplate_notifiers_for_success=self))
any_notifiers = list(base_notifiers.filter(unifiedjobtemplate_notifiers_for_any=self))
# Get Organization Notifiers
error_notifiers = set(error_notifiers + list(base_notifiers.filter(organization_notifiers_for_errors__in=self.organizations.all())))
success_notifiers = set(success_notifiers + list(base_notifiers.filter(organization_notifiers_for_success__in=self.organizations.all())))
any_notifiers = set(any_notifiers + list(base_notifiers.filter(organization_notifiers_for_any__in=self.organizations.all())))
error_notifiers = set(error_notifiers + list(base_notifiers.filter(organization_notifiers_for_errors=self.organization)))
success_notifiers = set(success_notifiers + list(base_notifiers.filter(organization_notifiers_for_success=self.organization)))
any_notifiers = set(any_notifiers + list(base_notifiers.filter(organization_notifiers_for_any=self.organization)))
return dict(error=list(error_notifiers), success=list(success_notifiers), any=list(any_notifiers))
def get_absolute_url(self):

View File

@ -92,7 +92,7 @@ def test_basic_fields(hosts, fact_scans, get, user):
}
(host, response) = setup_common(hosts, fact_scans, get, user, epoch=epoch, get_params=search)
results = response.data['results']
assert 'related' in results[0]
assert 'timestamp' in results[0]
@ -118,12 +118,12 @@ def test_basic_options_fields(hosts, fact_scans, options, user):
@pytest.mark.django_db
def test_related_fact_view(hosts, fact_scans, get, user):
epoch = timezone.now()
(host, response) = setup_common(hosts, fact_scans, get, user, epoch=epoch)
facts_known = Fact.get_timeline(host.id)
assert 9 == len(facts_known)
assert 9 == len(response.data['results'])
for i, fact_known in enumerate(facts_known):
check_url(response.data['results'][i]['related']['fact_view'], fact_known, fact_known.module)
@ -131,12 +131,12 @@ def test_related_fact_view(hosts, fact_scans, get, user):
@pytest.mark.django_db
def test_multiple_hosts(hosts, fact_scans, get, user):
epoch = timezone.now()
(host, response) = setup_common(hosts, fact_scans, get, user, epoch=epoch, host_count=3)
facts_known = Fact.get_timeline(host.id)
assert 9 == len(facts_known)
assert 9 == len(response.data['results'])
for i, fact_known in enumerate(facts_known):
check_url(response.data['results'][i]['related']['fact_view'], fact_known, fact_known.module)
@ -153,7 +153,7 @@ def test_param_to_from(hosts, fact_scans, get, user):
facts_known = Fact.get_timeline(host.id, ts_from=search['from'], ts_to=search['to'])
assert 9 == len(facts_known)
assert 9 == len(response.data['results'])
check_response_facts(facts_known, response)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@ -168,7 +168,7 @@ def test_param_module(hosts, fact_scans, get, user):
facts_known = Fact.get_timeline(host.id, module=search['module'])
assert 3 == len(facts_known)
assert 3 == len(response.data['results'])
check_response_facts(facts_known, response)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@ -183,7 +183,7 @@ def test_param_from(hosts, fact_scans, get, user):
facts_known = Fact.get_timeline(host.id, ts_from=search['from'])
assert 3 == len(facts_known)
assert 3 == len(response.data['results'])
check_response_facts(facts_known, response)
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@ -198,14 +198,14 @@ def test_param_to(hosts, fact_scans, get, user):
facts_known = Fact.get_timeline(host.id, ts_to=search['to'])
assert 6 == len(facts_known)
assert 6 == len(response.data['results'])
check_response_facts(facts_known, response)
def _test_user_access_control(hosts, fact_scans, get, user_obj, team_obj):
hosts = hosts(host_count=1)
fact_scans(fact_scans=1)
team_obj.users.add(user_obj)
team_obj.member_role.members.add(user_obj)
url = reverse('api:host_fact_versions_list', args=(hosts[0].pk,))
response = get(url, user_obj)
@ -235,7 +235,7 @@ def test_super_user_ok(hosts, fact_scans, get, user, team):
@pytest.mark.django_db
def test_user_admin_ok(organization, hosts, fact_scans, get, user, team):
user_admin = user('johnson', False)
organization.admins.add(user_admin)
organization.admin_role.members.add(user_admin)
response = _test_user_access_control(hosts, fact_scans, get, user_admin, team)
@ -247,7 +247,7 @@ def test_user_admin_ok(organization, hosts, fact_scans, get, user, team):
def test_user_admin_403(organization, organizations, hosts, fact_scans, get, user, team):
user_admin = user('johnson', False)
org2 = organizations(1)
org2[0].admins.add(user_admin)
org2[0].admin_role.members.add(user_admin)
response = _test_user_access_control(hosts, fact_scans, get, user_admin, team)

View File

@ -87,7 +87,7 @@ def test_basic_fields(hosts, fact_scans, get, user):
assert 'description' in response.data['summary_fields']['host']
assert 'host' in response.data['related']
assert reverse('api:host_detail', args=(hosts[0].pk,)) == response.data['related']['host']
@mock.patch('awx.api.views.feature_enabled', new=mock_feature_enabled)
@pytest.mark.django_db
def test_content(hosts, fact_scans, get, user, fact_ansible_json):
@ -103,7 +103,7 @@ def _test_search_by_module(hosts, fact_scans, get, user, fact_json, module_name)
'module': module_name
}
(fact_known, response) = setup_common(hosts, fact_scans, get, user, module_name=module_name, get_params=params)
assert fact_json == json.loads(response.data['facts'])
assert timestamp_apiformat(fact_known.timestamp) == response.data['timestamp']
assert module_name == response.data['module']
@ -132,7 +132,7 @@ def _test_user_access_control(hosts, fact_scans, get, user_obj, team_obj):
hosts = hosts(host_count=1)
fact_scans(fact_scans=1)
team_obj.users.add(user_obj)
team_obj.member_role.members.add(user_obj)
url = reverse('api:host_fact_compare_view', args=(hosts[0].pk,))
response = get(url, user_obj)
@ -162,7 +162,7 @@ def test_super_user_ok(hosts, fact_scans, get, user, team):
@pytest.mark.django_db
def test_user_admin_ok(organization, hosts, fact_scans, get, user, team):
user_admin = user('johnson', False)
organization.admins.add(user_admin)
organization.admin_role.members.add(user_admin)
response = _test_user_access_control(hosts, fact_scans, get, user_admin, team)
@ -174,7 +174,7 @@ def test_user_admin_ok(organization, hosts, fact_scans, get, user, team):
def test_user_admin_403(organization, organizations, hosts, fact_scans, get, user, team):
user_admin = user('johnson', False)
org2 = organizations(1)
org2[0].admins.add(user_admin)
org2[0].admin_role.members.add(user_admin)
response = _test_user_access_control(hosts, fact_scans, get, user_admin, team)

View File

@ -97,8 +97,9 @@ def project(instance, organization):
prj = Project.objects.create(name="test-proj",
description="test-proj-desc",
scm_type="git",
scm_url="https://github.com/jlaska/ansible-playbooks")
prj.organizations.add(organization)
scm_url="https://github.com/jlaska/ansible-playbooks",
organization=organization
)
return prj
@pytest.fixture

View File

@ -69,7 +69,7 @@ def test_encrypted_subfields(get, post, user, organization):
assert response.data['notification_configuration']['account_token'] == "$encrypted$"
with mock.patch.object(notifier_actual.notification_class, "send_messages", assert_send):
notifier_actual.send("Test", {'body': "Test"})
@pytest.mark.django_db
def test_inherited_notifiers(get, post, user, organization, project):
u = user('admin-poster', True)
@ -86,7 +86,6 @@ def test_inherited_notifiers(get, post, user, organization, project):
u)
assert response.status_code == 201
notifiers.append(response.data['id'])
organization.projects.add(project)
i = Inventory.objects.create(name='test', organization=organization)
i.save()
g = Group.objects.create(name='test', inventory=i)
@ -109,7 +108,6 @@ def test_inherited_notifiers(get, post, user, organization, project):
@pytest.mark.django_db
def test_notifier_merging(get, post, user, organization, project, notifier):
user('admin-poster', True)
organization.projects.add(project)
organization.notifiers_any.add(notifier)
project.notifiers_any.add(notifier)
assert len(project.notifiers['any']) == 1

View File

@ -4,6 +4,7 @@ from awx.main.models import (
Role,
RolePermission,
Organization,
Group,
)
@ -97,20 +98,24 @@ def test_team_symantics(organization, team, alice):
assert organization.accessible_by(alice, {'read': True}) is False
@pytest.mark.django_db
def test_auto_m2m_adjuments(organization, project, alice):
def test_auto_m2m_adjuments(organization, inventory, group, alice):
'Ensures the auto role reparenting is working correctly through m2m maps'
organization.admin_role.members.add(alice)
assert project.accessible_by(alice, {'read': True}) is True
g1 = group(name='g1')
g1.admin_role.members.add(alice)
assert g1.accessible_by(alice, {'read': True}) is True
g2 = group(name='g2')
assert g2.accessible_by(alice, {'read': True}) is False
project.organizations.remove(organization)
assert project.accessible_by(alice, {'read': True}) is False
project.organizations.add(organization)
assert project.accessible_by(alice, {'read': True}) is True
g2.parents.add(g1)
assert g2.accessible_by(alice, {'read': True}) is True
g2.parents.remove(g1)
assert g2.accessible_by(alice, {'read': True}) is False
g1.children.add(g2)
assert g2.accessible_by(alice, {'read': True}) is True
g1.children.remove(g2)
assert g2.accessible_by(alice, {'read': True}) is False
organization.projects.remove(project)
assert project.accessible_by(alice, {'read': True}) is False
organization.projects.add(project)
assert project.accessible_by(alice, {'read': True}) is True
@pytest.mark.django_db
def test_auto_field_adjuments(organization, inventory, team, alice):

View File

@ -16,7 +16,7 @@ def test_job_template_migration_check(deploy_jobtemplate, check_jobtemplate, use
joe = user('joe')
check_jobtemplate.project.organizations.all()[0].deprecated_users.add(joe)
check_jobtemplate.project.organization.deprecated_users.add(joe)
Permission(user=joe, inventory=check_jobtemplate.inventory, permission_type='read').save()
Permission(user=joe, inventory=check_jobtemplate.inventory,
@ -45,7 +45,7 @@ def test_job_template_migration_deploy(deploy_jobtemplate, check_jobtemplate, us
joe = user('joe')
deploy_jobtemplate.project.organizations.all()[0].deprecated_users.add(joe)
deploy_jobtemplate.project.organization.deprecated_users.add(joe)
Permission(user=joe, inventory=deploy_jobtemplate.inventory, permission_type='read').save()
Permission(user=joe, inventory=deploy_jobtemplate.inventory,
@ -77,7 +77,7 @@ def test_job_template_team_migration_check(deploy_jobtemplate, check_jobtemplate
team.organization = organization
team.save()
check_jobtemplate.project.organizations.all()[0].deprecated_users.add(joe)
check_jobtemplate.project.organization.deprecated_users.add(joe)
Permission(team=team, inventory=check_jobtemplate.inventory, permission_type='read').save()
Permission(team=team, inventory=check_jobtemplate.inventory,
@ -112,7 +112,7 @@ def test_job_template_team_deploy_migration(deploy_jobtemplate, check_jobtemplat
team.organization = organization
team.save()
deploy_jobtemplate.project.organizations.all()[0].deprecated_users.add(joe)
deploy_jobtemplate.project.organization.deprecated_users.add(joe)
Permission(team=team, inventory=deploy_jobtemplate.inventory, permission_type='read').save()
Permission(team=team, inventory=deploy_jobtemplate.inventory,

View File

@ -1,12 +1,95 @@
import pytest
from awx.main.migrations import _rbac as rbac
from awx.main.models import Role
from awx.main.models.organization import Permission
from awx.main.models import Role, Permission, Project, Organization, Credential, JobTemplate, Inventory
from django.apps import apps
from awx.main.migrations import _old_access as old_access
@pytest.mark.django_db
def test_project_migration():
'''
o1 o2 o3 with o1 -- i1 o2 -- i2
\ | /
\ | /
c1 ---- p1
/ | \
/ | \
jt1 jt2 jt3
| | |
i1 i2 i1
goes to
o1
|
|
c1 ---- p1
/ |
/ |
jt1 jt3
| |
i1 i1
o2
|
|
c1 ---- p2
|
|
jt2
|
i2
o3
|
|
c1 ---- p3
'''
o1 = Organization.objects.create(name='o1')
o2 = Organization.objects.create(name='o2')
o3 = Organization.objects.create(name='o3')
c1 = Credential.objects.create(name='c1')
p1 = Project.objects.create(name='p1', credential=c1)
p1.deprecated_organizations.add(o1, o2, o3)
i1 = Inventory.objects.create(name='i1', organization=o1)
i2 = Inventory.objects.create(name='i2', organization=o2)
jt1 = JobTemplate.objects.create(name='jt1', project=p1, inventory=i1)
jt2 = JobTemplate.objects.create(name='jt2', project=p1, inventory=i2)
jt3 = JobTemplate.objects.create(name='jt3', project=p1, inventory=i1)
assert o1.projects.count() == 0
assert o2.projects.count() == 0
assert o3.projects.count() == 0
rbac.migrate_projects(apps, None)
jt1 = JobTemplate.objects.get(pk=jt1.pk)
jt2 = JobTemplate.objects.get(pk=jt2.pk)
jt3 = JobTemplate.objects.get(pk=jt3.pk)
assert jt1.project == jt3.project
assert jt1.project != jt2.project
assert o1.projects.count() == 1
assert o2.projects.count() == 1
assert o3.projects.count() == 1
assert o1.projects.all()[0].jobtemplates.count() == 2
assert o2.projects.all()[0].jobtemplates.count() == 1
assert o3.projects.all()[0].jobtemplates.count() == 0
@pytest.mark.django_db
def test_project_user_project(user_project, project, user):
u = user('owner')

View File

@ -39,7 +39,7 @@ class BaseAdHocCommandTest(BaseJobExecutionTest):
self.setup_instances()
self.setup_users()
self.organization = self.make_organizations(self.super_django_user, 1)[0]
self.organization.admins.add(self.normal_django_user)
self.organization.admin_role.members.add(self.normal_django_user)
self.inventory = self.organization.inventories.create(name='test-inventory', description='description for test-inventory')
self.host = self.inventory.hosts.create(name='host.example.com')
self.host2 = self.inventory.hosts.create(name='host2.example.com')