#!/usr/bin/python3
"""Clone, start and inventory three Proxmox VMs (one master, two workers).

The script optionally stops and deletes the previous VMs, clones a template
three times, starts the clones, looks up their IP addresses and writes three
files under ``./tmp``: ``vm_ids``, ``hosts`` and ``generated_inventory.yaml``.

Configuration is read from the environment (a ``.env`` file is loaded first):
``PROXMOX_HOST``, ``PROXMOX_USER``, ``PROXMOX_USER_FULL``,
``PROXMOX_PASSWORD`` and ``DELETE_VMS`` (``'0'`` or ``'1'``).
"""
import os
import logging
from typing import Callable
from time import monotonic, sleep

import yaml
import urllib3
from dotenv import load_dotenv
from proxmoxer import ProxmoxAPI

from functions import clone_template, delete_vm, get_vm_ip

# The API client below is created with verify_ssl=False; silence the
# warnings urllib3 would otherwise emit.
urllib3.disable_warnings()

FORMAT = "%(asctime)s %(name)s %(levelname)s %(message)s"
logging.basicConfig(format=FORMAT)
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

load_dotenv()
PROXMOX_HOST: str | None = os.environ.get('PROXMOX_HOST')
PROXMOX_USER: str | None = os.environ.get('PROXMOX_USER')
PROXMOX_USER_FULL: str | None = os.environ.get('PROXMOX_USER_FULL')
PROXMOX_PASSWORD: str | None = os.environ.get('PROXMOX_PASSWORD')


def wait_status(
    funcs: list[Callable[[], dict[str, str]]],
    status: str,
    sleep_time: float = 60,
    timeout: float | None = None,
) -> None:
    """Block until every callable in *funcs* reports *status*.

    Each callable is invoked with no arguments and must return a mapping
    with a ``'status'`` key. All callables are polled on every round, and
    the function returns once a single round sees *status* from all of them.
    An empty *funcs* returns immediately.

    Args:
        funcs: Zero-argument callables returning a dict with a ``'status'``
            entry.
        status: The value every ``'status'`` entry must equal.
        sleep_time: Seconds to sleep between polling rounds.
        timeout: Optional upper bound, in seconds, on the total wait.
            ``None`` (the default) waits indefinitely.

    Raises:
        TimeoutError: If *timeout* is given and elapses before all callables
            report *status*.
    """
    deadline = None if timeout is None else monotonic() + timeout
    while True:
        # Poll before sleeping so an already-satisfied condition does not
        # cost a full sleep interval.
        statuses = [func()['status'] == status for func in funcs]
        logger.debug(statuses)
        if all(statuses):
            return
        if deadline is not None and monotonic() >= deadline:
            raise TimeoutError(
                f'Timed out after {timeout} s waiting for status {status!r}'
            )
        sleep(sleep_time)


def _require_setting(name: str, value: str | None) -> str:
    """Return *value*, raising ``RuntimeError`` if the setting is unset."""
    if value is None:
        raise RuntimeError(f'Environment variable {name} is not set')
    return value


def _write_outputs(
    tmp_path: str,
    vm_ids: dict[str, int],
    vm_names: dict[str, str],
    vm_ips: dict[str, str],
) -> None:
    """Write the ``vm_ids``, ``hosts`` and inventory files into *tmp_path*.

    All three dicts are keyed by role (``'master'``, ``'worker1'``, ...);
    rows are emitted in the iteration order of *vm_ids*.
    """
    # exist_ok avoids the check-then-create race of os.path.exists().
    os.makedirs(tmp_path, exist_ok=True)

    vm_ids_file = ''.join(
        f'{vm_ids[role]}\t{vm_names[role]}\n' for role in vm_ids
    )
    logger.info(vm_ids_file)
    with open(f'{tmp_path}/vm_ids', 'w', encoding='utf-8') as ofile:
        ofile.write(vm_ids_file)

    hosts = ''.join(
        f'{vm_ips[role]}\t{vm_names[role]}\n' for role in vm_ids
    )
    logger.info(hosts)
    with open(f'{tmp_path}/hosts', 'w', encoding='utf-8') as ofile:
        ofile.write(hosts)

    # Inventory layout (uses Ansible-style ``ansible_host`` /
    # ``ansible_user`` keys): every non-master role goes into the
    # ``workers`` group, which is a child of ``all_vms``; the master is a
    # direct host of ``all_vms``.
    inventory = {
        'workers': {
            'hosts': {
                role: {
                    'ansible_host': vm_ips[role],
                    'ansible_user': 'root',
                }
                for role in vm_ids
                if role != 'master'
            },
        },
        'all_vms': {
            'hosts': {
                'master': {
                    'ansible_host': vm_ips['master'],
                    'ansible_user': 'root',
                },
            },
            'children': {
                'workers': None
            },
        },
    }
    with open(
        f'{tmp_path}/generated_inventory.yaml', 'w', encoding='utf-8'
    ) as ofile:
        yaml.dump(inventory, ofile)


def main() -> None:
    """Recreate the three test VMs and write their IDs, hosts and inventory.

    Raises:
        RuntimeError: If a required ``PROXMOX_*`` variable is unset, or if
            ``clone_template`` returns a VM ID other than the requested one.
        ValueError: If ``DELETE_VMS`` is missing or not ``'0'``/``'1'``.
    """
    # Validate configuration explicitly: ``assert`` is stripped under
    # ``python -O`` and must not guard external input.
    host = _require_setting('PROXMOX_HOST', PROXMOX_HOST)
    user = _require_setting('PROXMOX_USER', PROXMOX_USER)
    user_full = _require_setting('PROXMOX_USER_FULL', PROXMOX_USER_FULL)
    password = _require_setting('PROXMOX_PASSWORD', PROXMOX_PASSWORD)

    # Checked before connecting so a bad flag fails fast.
    delete = os.environ.get('DELETE_VMS')
    if delete not in ('0', '1'):
        raise ValueError(
            f"DELETE_VMS must be set to '0' or '1', got {delete!r}"
        )

    proxmox = ProxmoxAPI(
        host,
        user=user_full,
        password=password,
        verify_ssl=False
    )

    # The target node is pinned. (Picking the online node with the smallest
    # 'disk' value was considered as an alternative.)
    node: str = 'pve05'
    online_nodes = [
        entry.get('node')
        for entry in proxmox.nodes.get()  # pyright: ignore
        if entry.get('status') == 'online'
    ]
    if node not in online_nodes:
        logger.warning(
            'Target node %s is not reported online (online nodes: %s)',
            node,
            online_nodes,
        )

    template_id: int = 374
    prefix: str = user + '-test-k8s'
    vm_names: dict[str, str] = {
        'master': f'{prefix}-master',
        'worker1': f'{prefix}-node1',
        'worker2': f'{prefix}-node2',
    }
    # Alternative ID range kept for reference: 500 / 501 / 502.
    vm_ids: dict[str, int] = {
        'master': 510,
        'worker1': 511,
        'worker2': 512,
    }

    if delete == '1':
        logger.info('Stopping VMs %s...', vm_ids)
        for vm_id in vm_ids.values():
            proxmox.nodes(node).qemu(vm_id).status.stop.post()
        sleep(15)
        logger.info('Deleting VMs %s...', vm_ids)
        for vm_id in vm_ids.values():
            delete_vm(proxmox, node, vm_id)
        sleep(15)

    # Kick off all clones first, then wait for them together.
    upids = []
    for role, vm_id in vm_ids.items():
        new_id, upid = clone_template(
            proxmox,
            node,
            template_id,
            newid=vm_id,
            name=vm_names[role],
        )
        if new_id != vm_id:
            raise RuntimeError(
                f'Clone for {role} returned VM ID {new_id}, '
                f'expected {vm_id}'
            )
        upids.append(upid)

    logger.info('Waiting for cloning to complete...')
    # NOTE(review): this only waits for the tasks to reach 'stopped'; it
    # does not inspect whether they succeeded - confirm whether a result
    # check is wanted here.
    wait_status(
        [
            proxmox.nodes(node).tasks(upid).status.get
            for upid in upids
        ],  # pyright: ignore
        'stopped',
        sleep_time=60,
    )
    logger.info('Cloning completed!')

    logger.info('Starting VMs...')
    for vm_id in vm_ids.values():
        proxmox.nodes(node).qemu(vm_id).status.start.post()
    wait_status(
        [
            proxmox.nodes(node).qemu(vm_id).status.current.get
            for vm_id in vm_ids.values()
        ],  # pyright: ignore
        'running',
        sleep_time=10,
    )
    # Fixed grace period after the VMs report 'running' and before their
    # IPs are queried (presumably to let the guests finish booting).
    sleep(40)
    logger.info('VMs are running!')

    vm_ips: dict[str, str] = {
        role: get_vm_ip(proxmox, node, vm_id)
        for role, vm_id in vm_ids.items()
    }

    proxmox.logout()

    _write_outputs('./tmp', vm_ids, vm_names, vm_ips)


if __name__ == '__main__':
    main()