pve_tests/main.py

213 lines
6.1 KiB
Python

#!/usr/bin/python3
import logging
import os
import sys
from time import sleep
from typing import Callable
import urllib3
import yaml
from dotenv import load_dotenv
from proxmoxer import ProxmoxAPI
from functions import clone_template, delete_vm, get_vm_ip
# Silence urllib3 warnings process-wide. Presumably this targets the
# insecure-request warnings caused by verify_ssl=False in main() -- confirm.
urllib3.disable_warnings()
# Log record layout: timestamp, logger name, level, message.
FORMAT = "%(asctime)s %(name)s %(levelname)s %(message)s"
logging.basicConfig(format=FORMAT)
logger = logging.getLogger(__name__)
# This module's logger emits everything from DEBUG upwards.
logger.setLevel(logging.DEBUG)
# Must run before the os.environ reads below so that values supplied through
# python-dotenv (typically a .env file) are visible to them.
load_dotenv()
# Connection settings read from the environment; each is None when the
# variable is not set. main() checks all four before doing any work.
# PROXMOX_USER is used as the prefix of the generated VM names, while
# PROXMOX_USER_FULL is the login handed to ProxmoxAPI.
PROXMOX_HOST: str | None = os.environ.get("PROXMOX_HOST")
PROXMOX_USER: str | None = os.environ.get("PROXMOX_USER")
PROXMOX_USER_FULL: str | None = os.environ.get("PROXMOX_USER_FULL")
PROXMOX_PASSWORD: str | None = os.environ.get("PROXMOX_PASSWORD")
def wait_status(
funcs: list[Callable[[], dict[str, str]]], status: str, sleep_time: float = 60
) -> None:
statuses = [False] * len(funcs)
while not all(statuses):
logger.debug(statuses)
sleep(sleep_time)
for i in range(len(funcs)):
statuses[i] = funcs[i]()["status"] == status
def main() -> None:
    """Provision the three-VM Kubernetes test cluster on Proxmox.

    Workflow:

    1. Validate the environment, the branch argument and ``DELETE_VMS``.
    2. Connect to the Proxmox API.
    3. When ``DELETE_VMS`` is ``"1"``, stop and delete the existing VMs.
    4. Clone the branch's template into a master and two worker VMs and
       wait for the clone tasks to finish.
    5. Start the VMs, wait until they are running and read their IPs.
    6. Write ``./tmp/vm_ids``, ``./tmp/hosts`` and
       ``./tmp/generated_inventory.yaml``.

    Raises:
        RuntimeError: A ``PROXMOX_*`` setting is missing, or a clone came
            back with a VM id other than the requested one.
        SystemExit: No branch was given on the command line.
        ValueError: The branch is unknown, or ``DELETE_VMS`` is not ``"0"``
            or ``"1"`` (including when it is unset).
    """
    # Explicit checks instead of ``assert``: assertions are stripped under
    # ``python -O`` and would let invalid input through.
    if (
        PROXMOX_HOST is None
        or PROXMOX_USER is None
        or PROXMOX_USER_FULL is None
        or PROXMOX_PASSWORD is None
    ):
        raise RuntimeError(
            "PROXMOX_HOST, PROXMOX_USER, PROXMOX_USER_FULL and "
            "PROXMOX_PASSWORD must all be set in the environment"
        )

    if len(sys.argv) < 2:
        raise SystemExit(f"usage: {sys.argv[0]} <branch>")
    branch: str = sys.argv[1]
    # NOTE(review): "c10f2" maps to the same template as "sisyphus" --
    # confirm this is intentional.
    template_ids: dict[str, int] = {
        "sisyphus": 374,
        "p10": 520,
        "c10f2": 374,
    }
    if branch not in template_ids:
        raise ValueError(
            f"unknown branch {branch!r}; expected one of {sorted(template_ids)}"
        )
    template_id: int = template_ids[branch]

    delete = os.environ.get("DELETE_VMS")
    if delete not in ("0", "1"):
        raise ValueError(f'DELETE_VMS must be "0" or "1", got {delete!r}')

    # All inputs are valid; only now open the API connection.
    proxmox = ProxmoxAPI(
        PROXMOX_HOST,
        user=PROXMOX_USER_FULL,
        password=PROXMOX_PASSWORD,
        verify_ssl=False,
    )
    nodes: list[dict[str, str | int]] = proxmox.nodes.get()  # pyright: ignore
    nodes = [entry for entry in nodes if entry["status"] == "online"]
    logger.debug("Online nodes: %s", [entry.get("node") for entry in nodes])
    # TODO: choose the target from the online nodes; it is pinned for now.
    node: str = "pve05"

    # Insertion order (master, worker1, worker2) drives every loop below.
    roles: tuple[str, ...] = ("master", "worker1", "worker2")
    prefix: str = PROXMOX_USER + "-test-k8s"
    vm_names: dict[str, str] = {
        "master": f"{prefix}-master",
        "worker1": f"{prefix}-node1",
        "worker2": f"{prefix}-node2",
    }
    vm_ids: dict[str, int] = {
        "master": 510,
        "worker1": 511,
        "worker2": 512,
    }

    if delete == "1":
        logger.info("Stopping VMs %s...", vm_ids)
        for role in roles:
            proxmox.nodes(node).qemu(vm_ids[role]).status.stop.post()
        # Fixed pause; presumably lets the stop requests take effect -- confirm.
        sleep(15)
        logger.info("Deleting VMs %s...", vm_ids)
        for role in roles:
            delete_vm(proxmox, node, vm_ids[role])
        sleep(15)

    # Issue all clone requests first, then wait for the tasks together.
    clone_upids = []
    for role in roles:
        cloned_id, upid = clone_template(
            proxmox,
            node,
            template_id,
            newid=vm_ids[role],
            name=vm_names[role],
        )
        if cloned_id != vm_ids[role]:
            raise RuntimeError(
                f"clone for {role} returned VM id {cloned_id}, "
                f"expected {vm_ids[role]}"
            )
        clone_upids.append(upid)

    logger.info("Waiting for cloning to complete...")
    wait_status(
        [
            proxmox.nodes(node).tasks(upid).status.get for upid in clone_upids
        ],  # pyright: ignore
        "stopped",
        sleep_time=40,
    )
    logger.info("Cloning completed!")

    logger.info("Starting VMs...")
    for role in roles:
        proxmox.nodes(node).qemu(vm_ids[role]).status.start.post()
    wait_status(
        [
            proxmox.nodes(node).qemu(vm_ids[role]).status.current.get
            for role in roles
        ],  # pyright: ignore
        "running",
        sleep_time=10,
    )
    # Fixed pause after the VMs report "running"; presumably gives the guests
    # time to boot far enough for get_vm_ip to succeed -- confirm.
    sleep(40)
    logger.info("VMs are running!")

    vm_ips: dict[str, str] = {
        role: get_vm_ip(proxmox, node, vm_ids[role]) for role in roles
    }
    proxmox.logout()

    tmp_path: str = "./tmp"
    # exist_ok avoids the check-then-create race of os.path.exists().
    os.makedirs(tmp_path, exist_ok=True)

    # "<vm id>\t<vm name>" per line.
    vm_ids_file = "".join(
        f"{vm_ids[role]}\t{vm_names[role]}\n" for role in roles
    )
    with open(f"{tmp_path}/vm_ids", "w", encoding="utf-8") as ofile:
        logger.info(vm_ids_file)
        ofile.write(vm_ids_file)

    # "<ip>\t<vm name>" per line.
    hosts = "".join(f"{vm_ips[role]}\t{vm_names[role]}\n" for role in roles)
    with open(f"{tmp_path}/hosts", "w", encoding="utf-8") as ofile:
        logger.info(hosts)
        ofile.write(hosts)

    # Ansible inventory: "all_vms" holds the master directly and pulls in
    # the "workers" group as a child.
    inventory = {
        "workers": {
            "hosts": {
                "worker1": {"ansible_host": vm_ips["worker1"]},
                "worker2": {"ansible_host": vm_ips["worker2"]},
            },
        },
        "all_vms": {
            "hosts": {
                "master": {"ansible_host": vm_ips["master"]},
            },
            "children": {"workers": None},
            "vars": {"ansible_user": "root"},
        },
    }
    with open(
        f"{tmp_path}/generated_inventory.yaml", "w", encoding="utf-8"
    ) as ofile:
        yaml.dump(inventory, ofile)
# Run the provisioning workflow only when executed as a script, not on import.
if __name__ == "__main__":
    main()