pve_tests/main.py

112 lines
3.0 KiB
Python

import logging
import urllib3
from typing import Callable, List
from time import sleep
from pathlib import Path
from proxmoxer import ProxmoxAPI
from functions import clone_template, get_vm_ip
urllib3.disable_warnings()
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
PROXMOX_HOST = 'pve.office.basealt.ru'
PROXMOX_USER = 'stepchenkoas@BaseALT'
PROXMOX_PASSWORD = Path('pve_secret').read_text().strip()
def wait_status(funcs: List[Callable], status: str):
statuses = [False] * len(funcs)
while not all(statuses):
logger.debug(statuses)
sleep(5)
for i in range(len(funcs)):
statuses[i] = funcs[i]()['status'] == status
def main():
proxmox = ProxmoxAPI(
PROXMOX_HOST, user=PROXMOX_USER, password=PROXMOX_PASSWORD, verify_ssl=False
)
nodes = proxmox.nodes.get()
nodes = list(filter(lambda node: node['status'] == 'online', nodes))
node = min(nodes, key=lambda node: node['disk'])['node']
template_id = 374
prefix = 'stepchenkaos-test-k8s'
vm_id_master, upid_master = clone_template(
proxmox,
node,
template_id,
name=f'{prefix}-master',
)
vm_id_worker1, upid_worker1 = clone_template(
proxmox,
node,
template_id,
name=f'{prefix}-node1',
)
vm_id_worker2, upid_worker2 = clone_template(
proxmox,
node,
template_id,
name=f'{prefix}-node2',
)
logger.info('Waiting for clone tasks to complete...')
wait_status(
[
proxmox.nodes(node).tasks(upid_master).status.get,
proxmox.nodes(node).tasks(upid_worker1).status.get,
proxmox.nodes(node).tasks(upid_worker2).status.get,
],
'stopped'
)
logger.info('Tasks completed!')
logger.info('Starting VMs...')
proxmox.nodes(node).qemu(vm_id_master).status.start.post()
proxmox.nodes(node).qemu(vm_id_worker1).status.start.post()
proxmox.nodes(node).qemu(vm_id_worker2).status.start.post()
wait_status(
[
proxmox.nodes(node).qemu(vm_id_master).status.current.get,
proxmox.nodes(node).qemu(vm_id_worker1).status.current.get,
proxmox.nodes(node).qemu(vm_id_worker2).status.current.get,
],
'running'
)
sleep(40)
logger.info('VMs are running!')
vm_ip_master = get_vm_ip(proxmox, node, vm_id_master)
vm_ip_worker1 = get_vm_ip(proxmox, node, vm_id_worker1)
vm_ip_worker2 = get_vm_ip(proxmox, node, vm_id_worker2)
with open('vm_ids' ,'w') as ofile:
vm_ids = f'{vm_id_master}\tmaster\n'
vm_ids += f'{vm_id_worker1}\tworker1\n'
vm_ids += f'{vm_id_worker2}\tworker2\n'
logger.info(vm_ids)
ofile.write(vm_ids)
with open('hosts' ,'w') as ofile:
hosts = f'{vm_ip_master}\tmaster\n'
hosts += f'{vm_ip_worker1}\tworker1\n'
hosts += f'{vm_ip_worker2}\tworker2\n'
logger.info(hosts)
ofile.write(hosts)
proxmox.logout()
if __name__ == '__main__':
main()