1
0
mirror of https://github.com/ansible/awx.git synced 2024-10-27 00:55:06 +03:00

reorganzing tests folder

make tests a module

refacotring to local imports for tests

fixing test import of tasks

fixing test import of tasks

more testing fixups
This commit is contained in:
Wayne Witzel III 2016-01-27 11:38:56 -05:00
parent 7b07bb7d93
commit 5d6ea242c0
44 changed files with 205 additions and 81 deletions

View File

@ -362,6 +362,9 @@ check: flake8 pep8 # pyflakes pylint
test:
$(PYTHON) manage.py test -v2 awx.main.tests
test_unit:
$(PYTHON) -m py.test awx/main/tests/unit
# Run all API unit tests with coverage enabled.
test_coverage:
coverage run manage.py test -v2 awx.main.tests

View File

@ -1,15 +1,16 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved
from django.test import TestCase
# AWX
from awx.main.tests.base import BaseTest
from awx.fact.models.fact import * # noqa
from awx.fact.utils.dbtransform import KeyTransform
#__all__ = ['DBTransformTest', 'KeyTransformUnitTest']
__all__ = ['KeyTransformUnitTest']
class KeyTransformUnitTest(BaseTest):
class KeyTransformUnitTest(TestCase):
def setUp(self):
super(KeyTransformUnitTest, self).setUp()
self.key_transform = KeyTransform([('.', '\uff0E'), ('$', '\uff04')])
@ -82,7 +83,7 @@ class KeyTransformUnitTest(BaseTest):
data = self.key_transform.transform_incoming(value, None)
self.assertEqual(data, value_transformed)
data = self.key_transform.transform_outgoing(value_transformed, None)
self.assertEqual(data, value)
@ -103,7 +104,7 @@ class KeyTransformUnitTest(BaseTest):
}
data = self.key_transform.transform_incoming(value, None)
self.assertEqual(data, value_transformed)
data = self.key_transform.transform_outgoing(value_transformed, None)
self.assertEqual(data, value)

View File

@ -1,10 +1,6 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
# Python
import base64
import re
# Django
from django.db import models
from django.utils.translation import ugettext_lazy as _

View File

@ -1,22 +0,0 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
from awx.main.tests.organizations import * # noqa
from awx.main.tests.users import * # noqa
from awx.main.tests.inventory import * # noqa
from awx.main.tests.projects import ProjectsTest, ProjectUpdatesTest # noqa
from awx.main.tests.commands import * # noqa
from awx.main.tests.scripts import * # noqa
from awx.main.tests.tasks import RunJobTest # noqa
from awx.main.tests.ad_hoc import * # noqa
from awx.main.tests.licenses import LicenseTests # noqa
from awx.main.tests.jobs import * # noqa
from awx.main.tests.activity_stream import * # noqa
from awx.main.tests.schedules import * # noqa
from awx.main.tests.redact import * # noqa
from awx.main.tests.views import * # noqa
from awx.main.tests.commands import * # noqa
from awx.main.tests.fact import * # noqa
from awx.main.tests.unified_jobs import * # noqa
from awx.main.tests.ha import * # noqa
from awx.main.tests.settings import * # noqa

View File

@ -0,0 +1,24 @@
# Copyright (c) 2015 Ansible, Inc.
# All Rights Reserved.
import logging
logging.disable(logging.CRITICAL)
from .organizations import * # noqa
from .users import * # noqa
from .inventory import * # noqa
from .projects import ProjectsTest, ProjectUpdatesTest # noqa
from .commands import * # noqa
from .scripts import * # noqa
from .tasks import RunJobTest # noqa
from .ad_hoc import * # noqa
from .licenses import LicenseTests # noqa
from .jobs import * # noqa
from .activity_stream import * # noqa
from .schedules import * # noqa
from .redact import * # noqa
from .views import * # noqa
from .commands import * # noqa
from .fact import * # noqa
from .unified_jobs import * # noqa
from .ha import * # noqa
from .settings import * # noqa

View File

@ -6,7 +6,7 @@ from django.core.urlresolvers import reverse
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest
from .base import BaseTest
class ActivityStreamTest(BaseTest):

View File

@ -18,8 +18,8 @@ from crum import impersonate
# AWX
from awx.main.utils import * # noqa
from awx.main.models import * # noqa
from awx.main.tests.base import BaseJobExecutionTest
from awx.main.tests.tasks import TEST_SSH_KEY_DATA, TEST_SSH_KEY_DATA_LOCKED, TEST_SSH_KEY_DATA_UNLOCK
from .base import BaseJobExecutionTest
from .tasks import TEST_SSH_KEY_DATA, TEST_SSH_KEY_DATA_LOCKED, TEST_SSH_KEY_DATA_UNLOCK
__all__ = ['RunAdHocCommandTest', 'AdHocCommandApiTest']

View File

@ -2,8 +2,8 @@
# All Rights Reserved
# AWX
from awx.main.tests.base import BaseTest
from awx.main.tests.commands.base import BaseCommandMixin
from ..base import BaseTest
from .base import BaseCommandMixin
__all__ = ['AgeDeletedCommandFunctionalTest']

View File

@ -11,7 +11,7 @@ from django.core.management import call_command
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTestMixin
from ..base import BaseTestMixin
class BaseCommandMixin(BaseTestMixin):
def create_test_inventories(self):

View File

@ -10,9 +10,9 @@ import mock
from django.core.management.base import CommandError
# AWX
from awx.main.tests.base import BaseTest
from ..base import BaseTest
from awx.fact.tests.base import MongoDBRequired, FactScanBuilder, TEST_FACT_PACKAGES, TEST_FACT_ANSIBLE, TEST_FACT_SERVICES
from awx.main.tests.commands.base import BaseCommandMixin
from .base import BaseCommandMixin
from awx.main.management.commands.cleanup_facts import Command, CleanupFacts
from awx.fact.models.fact import * # noqa
@ -45,7 +45,7 @@ class CleanupFactsCommandFunctionalTest(BaseCommandMixin, BaseTest, MongoDBRequi
def test_invoke_params_required(self):
result, stdout, stderr = self.run_command('cleanup_facts')
self.assertIsInstance(result, CommandError)
self.assertEqual(str(result), 'Both --granularity and --older_than are required.')
self.assertEqual(str(result), 'Both --granularity and --older_than are required.')
def test_module(self):
self.builder.add_fact('packages', TEST_FACT_PACKAGES)

View File

@ -24,7 +24,7 @@ from django.test.utils import override_settings
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest, BaseLiveServerTest
from ..base import BaseTest, BaseLiveServerTest
__all__ = ['CreateDefaultOrgTest', 'DumpDataTest', 'CleanupDeletedTest',
'CleanupJobsTest', 'CleanupActivityStreamTest',
@ -471,7 +471,7 @@ class CleanupJobsTest(BaseCommandMixin, BaseLiveServerTest):
self.assertTrue(ad_hoc_command.signal_start())
ad_hoc_command = AdHocCommand.objects.get(pk=ad_hoc_command.pk)
self.assertEqual(ad_hoc_command.status, 'successful')
# With days=1, no jobs will be deleted.
jobs_before = Job.objects.all().count()
self.assertTrue(jobs_before)
@ -690,7 +690,7 @@ class InventoryImportTest(BaseCommandMixin, BaseLiveServerTest):
self.assertTrue('required' in str(result))
# Inventory ID, with invalid source.
invalid_source = ''.join([os.path.splitext(self.ini_path)[0] + '-invalid',
os.path.splitext(self.ini_path)[1]])
os.path.splitext(self.ini_path)[1]])
result, stdout, stderr = self.run_command('inventory_import',
inventory_id=inventory_id,
source=invalid_source)
@ -766,7 +766,7 @@ class InventoryImportTest(BaseCommandMixin, BaseLiveServerTest):
self.assertEqual(group.children.count(), 0)
hosts = set(group.hosts.values_list('name', flat=True))
host_names = set(['db1.example.com','db2.example.com'])
self.assertEqual(hosts, host_names)
self.assertEqual(hosts, host_names)
elif group.name == 'webservers':
self.assertEqual(group.variables_dict, {'webvar': 'blah'})
self.assertEqual(group.children.count(), 0)
@ -860,7 +860,7 @@ class InventoryImportTest(BaseCommandMixin, BaseLiveServerTest):
self.assertEqual(group.children.filter(active=True).count(), 0)
hosts = set(group.hosts.filter(active=True).values_list('name', flat=True))
host_names = set(['db1.example.com','db2.example.com'])
self.assertEqual(hosts, host_names)
self.assertEqual(hosts, host_names)
elif group.name == 'webservers':
self.assertEqual(group.variables_dict, {'webvar': 'blah'})
self.assertEqual(group.children.filter(active=True).count(), 0)
@ -1013,7 +1013,7 @@ class InventoryImportTest(BaseCommandMixin, BaseLiveServerTest):
rest_api_url = urlparse.urlunsplit([parts.scheme, netloc, parts.path,
parts.query, parts.fragment])
os.environ.setdefault('REST_API_URL', rest_api_url)
os.environ['INVENTORY_ID'] = str(old_inv.pk)
os.environ['INVENTORY_ID'] = str(old_inv.pk)
source = os.path.join(os.path.dirname(__file__), '..', '..', '..', 'plugins',
'inventory', 'awxrest.py')
result, stdout, stderr = self.run_command('inventory_import',

View File

@ -5,8 +5,8 @@
import uuid
# AWX
from awx.main.tests.base import BaseTest
from awx.main.tests.commands.base import BaseCommandMixin
from ..base import BaseTest
from .base import BaseCommandMixin
from awx.main.models import * # noqa
__all__ = ['RemoveInstanceCommandFunctionalTest']

View File

@ -10,9 +10,9 @@ from copy import deepcopy
from mock import MagicMock
# AWX
from awx.main.tests.base import BaseTest
from ..base import BaseTest
from awx.fact.tests.base import MongoDBRequired
from awx.main.tests.commands.base import BaseCommandMixin
from .base import BaseCommandMixin
from awx.main.management.commands.run_fact_cache_receiver import FactCacheReceiver
from awx.fact.models.fact import * # noqa

View File

@ -2,8 +2,8 @@
# All Rights Reserved
# AWX
from awx.main.tests.base import BaseTest
from awx.main.tests.commands.base import BaseCommandMixin
from ..base import BaseTest
from .base import BaseCommandMixin
# Django
from django.core.management.base import CommandError
@ -20,16 +20,16 @@ class UpdatePasswordCommandFunctionalTest(BaseCommandMixin, BaseTest):
def test_updated_ok(self):
result, stdout, stderr = self.run_command('update_password', username='admin', password='dingleberry')
self.assertEqual(stdout, 'Password updated\n')
def test_same_password(self):
result, stdout, stderr = self.run_command('update_password', username='admin', password='admin')
self.assertEqual(stdout, 'Password not updated\n')
def test_error_username_required(self):
result, stdout, stderr = self.run_command('update_password', password='foo')
self.assertIsInstance(result, CommandError)
self.assertEqual(str(result), 'username required')
def test_error_password_required(self):
result, stdout, stderr = self.run_command('update_password', username='admin')
self.assertIsInstance(result, CommandError)

View File

@ -10,7 +10,7 @@ from django.core.urlresolvers import reverse
# AWX
from awx.main.utils import timestamp_apiformat
from awx.main.models import * # noqa
from awx.main.tests.base import BaseLiveServerTest
from ..base import BaseLiveServerTest
from awx.fact.models import * # noqa
from awx.fact.tests.base import BaseFactTestMixin, FactScanBuilder, TEST_FACT_ANSIBLE, TEST_FACT_PACKAGES, TEST_FACT_SERVICES
from awx.main.utils import build_url
@ -181,7 +181,7 @@ class FactViewApiTest(FactApiBaseTest):
def test_view_time_filter(self):
self.setup_facts(6)
ts = self.builder.get_timestamp(3)
self.get_fact(Fact.objects.filter(host=self.fact_host, module='ansible', timestamp__lte=ts).order_by('-timestamp')[0],
self.get_fact(Fact.objects.filter(host=self.fact_host, module='ansible', timestamp__lte=ts).order_by('-timestamp')[0],
dict(datetime=ts))

View File

@ -18,7 +18,7 @@ from django.utils.timezone import now
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest, BaseTransactionTest
from .base import BaseTest, BaseTransactionTest
__all__ = ['InventoryTest', 'InventoryUpdatesTest', 'InventoryCredentialTest']

View File

@ -3,7 +3,7 @@ import uuid
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTestMixin
from ..base import BaseTestMixin
TEST_PLAYBOOK = '''- hosts: all
gather_facts: false
@ -128,7 +128,7 @@ class BaseJobTestMixin(BaseTestMixin):
# He works with Randall
self.user_billybob = self.make_user('billybob')
self.org_ops.users.add(self.user_billybob)
# Jim is the newest intern. He can login, but can't do anything quite yet
# except make everyone else fresh coffee.
self.user_jim = self.make_user('jim')
@ -256,7 +256,7 @@ class BaseJobTestMixin(BaseTestMixin):
self.team_ops_testers.users.add(self.user_billybob)
# Each user has his/her own set of credentials.
from awx.main.tests.tasks import (TEST_SSH_KEY_DATA,
from ..tasks import (TEST_SSH_KEY_DATA,
TEST_SSH_KEY_DATA_LOCKED,
TEST_SSH_KEY_DATA_UNLOCK)
self.cred_sue = self.user_sue.credentials.create(
@ -429,7 +429,7 @@ class BaseJobTestMixin(BaseTestMixin):
#self.permission1 = Permission.objects.create(
# inventory = self.inventory,
# project = self.project,
# team = self.team,
# team = self.team,
# permission_type = PERM_INVENTORY_DEPLOY,
# created_by = self.normal_django_user
#)
@ -441,7 +441,7 @@ class BaseJobTestMixin(BaseTestMixin):
# permission_type = PERM_INVENTORY_CHECK,
# created_by = self.normal_django_user
#)
# Engineering has job templates to check/run the dev project onto
# their own inventory.
self.jt_eng_check = JobTemplate.objects.create(

View File

@ -10,7 +10,7 @@ from django.core.urlresolvers import reverse
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseLiveServerTest
from ..base import BaseLiveServerTest
from .base import BaseJobTestMixin
__all__ = ['JobRelaunchTest',]
@ -63,7 +63,7 @@ class JobRelaunchTest(BaseJobTestMixin, BaseLiveServerTest):
self.jt_ops_east_run.extra_vars = jt_extra_vars
self.jt_ops_east_run.save()
response = self.post(url, {}, expect=201)
j = Job.objects.get(pk=response['job'])
self.assertTrue(j.status == 'successful')

View File

@ -10,7 +10,7 @@ from django.conf import settings
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseLiveServerTest
from ..base import BaseLiveServerTest
from .base import BaseJobTestMixin
__all__ = ['JobStartCancelTest',]
@ -92,7 +92,7 @@ class JobStartCancelTest(BaseJobTestMixin, BaseLiveServerTest):
# self.assertEqual(job.status, 'failed')
# Test with a job that prompts for SSH unlock key, given the right key.
from awx.main.tests.tasks import TEST_SSH_KEY_DATA_UNLOCK
from ..tasks import TEST_SSH_KEY_DATA_UNLOCK
# job = self.jt_ops_west_run.create_job(
# credential=self.cred_greg,
# created_by=self.user_sue,

View File

@ -6,7 +6,7 @@ from django.core.urlresolvers import reverse
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest, QueueStartStopTestMixin
from ..base import BaseTest, QueueStartStopTestMixin
__all__ = ['SurveyPasswordRedactedTest']
@ -87,7 +87,7 @@ TEST_COMPLEX_SURVEY = '''
TEST_SINGLE_PASSWORDS = [
{
{
'description': 'Single instance with a . after',
'text' : 'See spot. See spot run. See spot run %s. That is a fast run.' % PASSWORD,
'passwords': [PASSWORD],
@ -233,5 +233,5 @@ class SurveyPasswordRedactedTest(SurveyPasswordBaseTest):
# should redact values in extra_vars
def test_redact_job_extra_vars(self):
for test in self.tests['simple']:
response = self._get_url_job_details(test['job'])
response = self._get_url_job_details(test['job'])
self.check_extra_vars_redacted(test, response)

View File

@ -6,7 +6,7 @@ import os
import tempfile
from awx.main.models import Host, Inventory, Organization
from awx.main.tests.base import BaseTest
from .base import BaseTest
import awx.main.task_engine
from awx.main.task_engine import * # noqa

View File

@ -12,7 +12,7 @@ from django.utils.timezone import now as tz_now
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest
from .base import BaseTest
__all__ = ['AuthTokenLimitUnitTest', 'OrganizationsTest']

View File

@ -20,8 +20,8 @@ from django.utils.timezone import now
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTransactionTest
from awx.main.tests.tasks import TEST_SSH_KEY_DATA, TEST_SSH_KEY_DATA_LOCKED, TEST_SSH_KEY_DATA_UNLOCK, TEST_OPENSSH_KEY_DATA, TEST_OPENSSH_KEY_DATA_LOCKED
from .base import BaseTransactionTest
from .tasks import TEST_SSH_KEY_DATA, TEST_SSH_KEY_DATA_LOCKED, TEST_SSH_KEY_DATA_UNLOCK, TEST_OPENSSH_KEY_DATA, TEST_OPENSSH_KEY_DATA_LOCKED
from awx.main.utils import decrypt_field, update_scm_url
TEST_PLAYBOOK = '''- hosts: mygroup

View File

@ -3,7 +3,7 @@ import textwrap
# AWX
from awx.main.redact import UriCleaner
from awx.main.tests.base import BaseTest, URI
from .base import BaseTest, URI
__all__ = ['UriCleanTests']

View File

@ -10,7 +10,7 @@ from django.utils.timezone import now
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest
from .base import BaseTest
__all__ = ['ScheduleTest']

View File

@ -10,7 +10,7 @@ import urlparse
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseLiveServerTest
from .base import BaseLiveServerTest
__all__ = ['InventoryScriptTest']

View File

@ -1,7 +1,7 @@
# Copyright (c) 2016 Ansible, Inc.
# All Rights Reserved.
from awx.main.tests.base import BaseTest
from .base import BaseTest
from awx.main.models import * # noqa
from django.core.urlresolvers import reverse

View File

@ -21,7 +21,7 @@ from crum import impersonate
# AWX
from awx.main.utils import * # noqa
from awx.main.models import * # noqa
from awx.main.tests.base import BaseJobExecutionTest
from .base import BaseJobExecutionTest
TEST_PLAYBOOK = u'''
- name: test success

View File

@ -15,7 +15,7 @@ from django.test.utils import override_settings
# AWX
from awx.main.models import * # noqa
from awx.main.tests.base import BaseTest
from .base import BaseTest
from awx.main.conf import tower_settings
__all__ = ['AuthTokenTimeoutTest', 'AuthTokenLimitTest', 'AuthTokenProxyTest', 'UsersTest', 'LdapTest']

View File

@ -2,8 +2,8 @@
from django.core.urlresolvers import reverse
# Reuse Test code
from awx.main.tests.base import BaseLiveServerTest, QueueStartStopTestMixin
from awx.main.tests.base import URI
from .base import BaseLiveServerTest, QueueStartStopTestMixin
from .base import URI
from awx.main.models.projects import * # noqa
__all__ = ['UnifiedJobStdoutRedactedTests']

View File

View File

@ -20,6 +20,7 @@ import tempfile
from rest_framework.exceptions import ParseError, PermissionDenied
from django.utils.encoding import smart_str
from django.core.urlresolvers import reverse
from django.core.exceptions import ValidationError
# PyCrypto
from Crypto.Cipher import AES
@ -522,4 +523,125 @@ def timedelta_total_seconds(timedelta):
timedelta.microseconds + 0.0 +
(timedelta.seconds + timedelta.days * 24 * 3600) * 10 ** 6) / 10 ** 6
def validate_ssh_private_key(data):
"""Validate that the given SSH private key or certificate is,
in fact, valid.
"""
# Map the X in BEGIN X PRIVATE KEY to the key type (ssh-keygen -t).
# Tower jobs using OPENSSH format private keys may still fail if the
# system SSH implementation lacks support for this format.
key_types = {
'RSA': 'rsa',
'DSA': 'dsa',
'EC': 'ecdsa',
'OPENSSH': 'ed25519',
'': 'rsa1',
}
# Key properties to return if valid.
key_data = {
'key_type': None, # Key type (from above mapping).
'key_seg': '', # Key segment (all text including begin/end).
'key_b64': '', # Key data as base64.
'key_bin': '', # Key data as binary.
'key_enc': None, # Boolean, whether key is encrypted.
'cert_seg': '', # Cert segment (all text including begin/end).
'cert_b64': '', # Cert data as base64.
'cert_bin': '', # Cert data as binary.
}
data = data.strip()
validation_error = ValidationError('Invalid private key')
# Sanity check: We may potentially receive a full PEM certificate,
# and we want to accept these.
cert_begin_re = r'(-{4,})\s*BEGIN\s+CERTIFICATE\s*(-{4,})'
cert_end_re = r'(-{4,})\s*END\s+CERTIFICATE\s*(-{4,})'
cert_begin_match = re.search(cert_begin_re, data)
cert_end_match = re.search(cert_end_re, data)
if cert_begin_match and not cert_end_match:
raise validation_error
elif not cert_begin_match and cert_end_match:
raise validation_error
elif cert_begin_match and cert_end_match:
cert_dashes = set([cert_begin_match.groups()[0], cert_begin_match.groups()[1],
cert_end_match.groups()[0], cert_end_match.groups()[1]])
if len(cert_dashes) != 1:
raise validation_error
key_data['cert_seg'] = data[cert_begin_match.start():cert_end_match.end()]
# Find the private key, and also ensure that it internally matches
# itself.
# Set up the valid private key header and footer.
begin_re = r'(-{4,})\s*BEGIN\s+([A-Z0-9]+)?\s*PRIVATE\sKEY\s*(-{4,})'
end_re = r'(-{4,})\s*END\s+([A-Z0-9]+)?\s*PRIVATE\sKEY\s*(-{4,})'
begin_match = re.search(begin_re, data)
end_match = re.search(end_re, data)
if not begin_match or not end_match:
raise validation_error
# Ensure that everything, such as dash counts and key type, lines up,
# and raise an error if it does not.
dashes = set([begin_match.groups()[0], begin_match.groups()[2],
end_match.groups()[0], end_match.groups()[2]])
if len(dashes) != 1:
raise validation_error
if begin_match.groups()[1] != end_match.groups()[1]:
raise validation_error
key_type = begin_match.groups()[1] or ''
try:
key_data['key_type'] = key_types[key_type]
except KeyError:
raise ValidationError('Invalid private key: unsupported type %s' % key_type)
# The private key data begins and ends with the private key.
key_data['key_seg'] = data[begin_match.start():end_match.end()]
# Establish that we are able to base64 decode the private key;
# if we can't, then it's not a valid key.
#
# If we got a certificate, validate that also, in the same way.
header_re = re.compile(r'^(.+?):\s*?(.+?)(\\??)$')
for segment_name in ('cert', 'key'):
segment_to_validate = key_data['%s_seg' % segment_name]
# If we have nothing; skip this one.
# We've already validated that we have a private key above,
# so we don't need to do it again.
if not segment_to_validate:
continue
# Ensure that this segment is valid base64 data.
base64_data = ''
line_continues = False
lines = segment_to_validate.splitlines()
for line in lines[1:-1]:
line = line.strip()
if not line:
continue
if line_continues:
line_continues = line.endswith('\\')
continue
line_match = header_re.match(line)
if line_match:
line_continues = line.endswith('\\')
continue
base64_data += line
try:
decoded_data = base64.b64decode(base64_data)
if not decoded_data:
raise validation_error
key_data['%s_b64' % segment_name] = base64_data
key_data['%s_bin' % segment_name] = decoded_data
except TypeError:
raise validation_error
# Determine if key is encrypted.
if key_data['key_type'] == 'ed25519':
# See https://github.com/openssh/openssh-portable/blob/master/sshkey.c#L3218
# Decoded key data starts with magic string (null-terminated), four byte
# length field, followed by the ciphername -- if ciphername is anything
# other than 'none' the key is encrypted.
key_data['key_enc'] = not bool(key_data['key_bin'].startswith('openssh-key-v1\x00\x00\x00\x00\x04none'))
else:
key_data['key_enc'] = bool('ENCRYPTED' in key_data['key_seg'])
return key_data