forked from saratov/infra
update tester role with abstract exploit executor
This commit is contained in:
parent
2e2fba457f
commit
ef92e28885
@ -4,11 +4,13 @@ tester_required_vars:
|
||||
- tester_cve_repo
|
||||
|
||||
tester_packages:
|
||||
- pytest
|
||||
- python-module-pytest
|
||||
- python-module-pytest-bdd
|
||||
- python-module-paramiko
|
||||
- git
|
||||
- curl
|
||||
- gcc
|
||||
|
||||
tester_username: abuser
|
||||
tester_username: abuser_sudo
|
||||
tester_username_sudo: abuser_sudo
|
||||
|
@ -16,13 +16,13 @@
|
||||
|
||||
- name: "ensure that the {{ tester_username_sudo }} exists"
|
||||
user:
|
||||
name: "{{ tester_username }}"
|
||||
name: "{{ tester_username_sudo }}"
|
||||
groups: wheel
|
||||
append: true
|
||||
|
||||
- name: fetch CVE repository
|
||||
git:
|
||||
repo: "{{ tester_cve_repo }}"
|
||||
dist: "/{{ tester_username }}/cve"
|
||||
dest: "/home/{{ tester_username_sudo }}/cve"
|
||||
become: yes
|
||||
become_user: "{{ tester_username }}"
|
||||
become_user: "{{ tester_username_sudo }}"
|
||||
|
0
roles/tester/tests/__init__.py
Normal file
0
roles/tester/tests/__init__.py
Normal file
108
roles/tester/tests/conftest.py
Normal file
108
roles/tester/tests/conftest.py
Normal file
@ -0,0 +1,108 @@
|
||||
"""Configuration for pytest runner."""
|
||||
|
||||
from pytest_bdd import given, when
|
||||
import pytest
|
||||
import paramiko
|
||||
from paramiko.config import SSHConfig
|
||||
from os.path import expanduser
|
||||
import sys
|
||||
import os
|
||||
import inspect
|
||||
|
||||
pytest_plugins = "pytester"
|
||||
cvet = {}
|
||||
|
||||
SSH_USERNAME = os.getenv("SSH_USERNAME", "root")
|
||||
|
||||
class Target:
|
||||
def __init__(self, host):
|
||||
self.host = host
|
||||
config_file = open(expanduser('.tmp/ssh_config'))
|
||||
config = SSHConfig()
|
||||
config.parse(config_file)
|
||||
ip = config.lookup(host).get('hostname', None)
|
||||
port = config.lookup(host).get('port', 22)
|
||||
pk = config.lookup(host).get('identityfile', None)
|
||||
|
||||
self.ssh = paramiko.SSHClient()
|
||||
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
self.ssh.connect(hostname=ip, port=int(port), username=SSH_USERNAME, key_filename=pk)
|
||||
s = self.ssh.get_transport().open_session()
|
||||
paramiko.agent.AgentRequestHandler(s)
|
||||
def __exit__(self):
|
||||
self.ssh.close()
|
||||
|
||||
def exec_command(self, cmd):
|
||||
ssh_stdin, ssh_stdout, ssh_stderr = self.ssh.exec_command(cmd)
|
||||
self.res = {'rc': ssh_stdout.channel.recv_exit_status(),
|
||||
'cmd': cmd,
|
||||
'host': self.host,
|
||||
'stdout': ssh_stdout.read(),
|
||||
'stderr': ssh_stderr.read()}
|
||||
return self.res
|
||||
|
||||
|
||||
@given("I have a root fixture")
|
||||
def root():
|
||||
return "root"
|
||||
|
||||
|
||||
@when("I use a when step from the parent conftest")
|
||||
def global_when():
|
||||
pass
|
||||
|
||||
|
||||
def assert_cmd(expect, res):
|
||||
assert expect(res), "execution of '{}' failed on '{}' with '{}'; lambda is: {}".format(res['cmd'], res['host'], res['stdout'] + res['stderr'], inspect.getsource(expect))
|
||||
|
||||
|
||||
def read_env_vars():
|
||||
req_vars = ['CVET_ABUSER_NODE',
|
||||
'CVET_ABUSER_USER',
|
||||
'CVET_ABUSER_USER_WITH_SUDO',
|
||||
'CVET_VICTIM_NODE',
|
||||
'CVET_VICTIM_NODE_SHORT',
|
||||
'CVET_VICTIM_ADDRESS',
|
||||
'CVET_ABUSERS',
|
||||
'CVET_VICTIMS',
|
||||
'CVET_SRC_NIC',
|
||||
'CVET_CVE',
|
||||
'CVET_TTL',
|
||||
'CVET_SRC_START_ADDRESS_RANGE',
|
||||
'CVET_SRC_END_ADDRESS_RANGE'
|
||||
]
|
||||
list_vars = ['CVET_ABUSERS'
|
||||
'CVET_VICTIMS'
|
||||
]
|
||||
|
||||
for v in req_vars:
|
||||
if v not in os.environ:
|
||||
print('{} required but is not set'.format(v))
|
||||
sys.exit(1)
|
||||
cvet[v.split('_',1)[1].lower()] = os.environ[v] if v not in list_vars else filter(None, os.environ[v].split(' '))
|
||||
cvet['all_hosts'] = cvet['abusers'] + cvet['victims']
|
||||
read_env_vars()
|
||||
|
||||
@pytest.fixture(scope='session', params=cvet['all_hosts'])
|
||||
def ssh_all(request):
|
||||
return Target(request.param)
|
||||
|
||||
@pytest.fixture(scope='session', params=[cvet['abuser_node']])
|
||||
def ssh_abuser(request):
|
||||
return Target(request.param)
|
||||
|
||||
@pytest.fixture(scope='session', params=[cvet['victim_node']])
|
||||
def ssh_victim(request):
|
||||
return Target(request.param)
|
||||
|
||||
@pytest.fixture(scope='session', params=cvet['abusers'])
|
||||
def ssh_abusers(request):
|
||||
return Target(request.param)
|
||||
|
||||
@pytest.fixture(scope='session', params=cvet['victims'])
|
||||
def ssh_victims(request):
|
||||
return Target(request.param)
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def cvet_params():
|
||||
return cvet
|
48
roles/tester/tests/cve/test_cve_exploit.py
Normal file
48
roles/tester/tests/cve/test_cve_exploit.py
Normal file
@ -0,0 +1,48 @@
|
||||
# coding=utf-8
|
||||
"""Perform exploit from <cve> feature tests."""
|
||||
|
||||
from pytest_bdd import (
|
||||
given,
|
||||
scenario,
|
||||
then,
|
||||
when,
|
||||
)
|
||||
|
||||
|
||||
@scenario('../features/cve.feature', 'Victim node should pass all checks after performing exploit')
|
||||
def test_victim_node_should_pass_all_checks_after_performing_exploit(ssh_abuser, ssh_victim):
|
||||
"""Victim node should pass all checks after performing exploit."""
|
||||
|
||||
|
||||
@given('prepared <abuser> node')
|
||||
def prepared_abuser_node(ssh_abuser, cvet_params):
|
||||
"""prepared <abuser> node."""
|
||||
res = ssh_abuser.exec_command('for s in ~abuser/cve/{cve}/abuser-side-*.sh; do $s; done'.format(cve=cvet_params['cve']))
|
||||
assert res['rc'] == 0, "STDERR: {}\nSTDOUT: {}".format(res['stderr'], res['stdout'])
|
||||
|
||||
|
||||
@given('prepared <victim> node')
|
||||
def prepared_victim_node(ssh_abuser, ssh_victim, cvet_params):
|
||||
"""prepared <victim> node."""
|
||||
res = ssh_abuser.exec_command('for s in ~abuser/cve/{cve}/victim-side-*.sh; do scp $s {victim}:/tmp/$(basename $s); done'.format(cve=cvet_params['cve'], victim=cvet_params['victim_node_short']))
|
||||
assert res['rc'] == 0, "STDERR: {}\nSTDOUT: {}".format(res['stderr'], res['stdout'])
|
||||
res = ssh_victim.exec_command('for s in /tmp/victim-side-*.sh; do $s; done')
|
||||
assert res['rc'] == 0, "STDERR: {}\nSTDOUT: {}".format(res['stderr'], res['stdout'])
|
||||
|
||||
|
||||
@when('exploit is finished')
|
||||
def exploit_is_finished(ssh_abuser, cvet_params):
|
||||
"""exploit is finished."""
|
||||
env=' '.join(['{}="{}"'.format(k,v) for k,v in cvet_params.items()])
|
||||
res = ssh_abuser.exec_command('pushd ~abuser/cve/{cve}/; {env_vars} timeout --kill-after=10 --signal=9 -v {ttl} ./perform-sploit.sh'.format(cve=cvet_params['cve'], ttl=cvet_params['ttl'], env_vars=env))
|
||||
assert res['rc'] == (128 + 9), "STDERR: {}\nSTDOUT: {}".format(res['stderr'], res['stdout'])
|
||||
|
||||
|
||||
@then('all checks against <victim> are passed')
|
||||
def all_checks_against_victim_are_passed(ssh_abuser, ssh_victim, cvet_params):
|
||||
"""all checks against <victim> are passed."""
|
||||
env=' '.join(['{}="{}"'.format(k,v) for k,v in cvet_params.items()])
|
||||
res = ssh_abuser.exec_command('for s in ~abuser/cve/{cve}/check-victim-*.sh; do scp $s {victim}:/tmp/$(basename $s); done'.format(cve=cvet_params['cve'], victim=cvet_params['victim_node_short']))
|
||||
res = ssh_victim.exec_command('{env_vars} /tmp/check-victim-*.sh'.format(env_vars=env))
|
||||
assert res['rc'] == 0, "STDERR: {}\nSTDOUT: {}".format(res['stderr'], res['stdout'])
|
||||
|
8
roles/tester/tests/features/cve.feature
Normal file
8
roles/tester/tests/features/cve.feature
Normal file
@ -0,0 +1,8 @@
|
||||
Feature: Perform exploit from <cve>
|
||||
Environment should not be affectable by <cve> exploit
|
||||
|
||||
Scenario Outline: Victim node should pass all checks after performing exploit
|
||||
Given prepared <abuser> node
|
||||
And prepared <victim> node
|
||||
When exploit is finished
|
||||
Then all checks against <victim> are passed
|
Loading…
Reference in New Issue
Block a user