pve_tests/main.py

184 lines
4.7 KiB
Python

from time import sleep
from typing import Optional
from pathlib import Path
import urllib3
from proxmoxer import ProxmoxAPI
urllib3.disable_warnings()
PROXMOX_HOST = 'pve.office.basealt.ru'
PROXMOX_USER = 'stepchenkoas@BaseALT'
PROXMOX_PASSWORD = Path('pve_secret').read_text().strip()
def create_vm(
proxmox,
node: Optional[str] = None,
vmid: Optional[int] = None,
name: Optional[str] = None,
prefix: str = 'stepchenkoas',
cores: int = 2,
memory: int = 2048,
disk_size: int = 16,
start: int = 1,
):
if node is None:
nodes = proxmox.nodes.get()
nodes = list(filter(lambda node: node['status'] == 'online', nodes))
node = min(nodes, key=lambda node: node['disk'])['node']
if vmid is None:
vmid = proxmox.cluster.get('nextid')
if name is None:
name = f'{prefix}-{vmid}'
proxmox.nodes(node).qemu.post(
node=node,
vmid=vmid,
description='This VM was created automatically through Proxmox VE API',
cores=cores,
cpu='host',
memory=memory,
name=name,
net0='virtio,bridge=vmbr0,tag=103,firewall=1',
ostype='l26',
pool='Virt_LAB',
#alt-server-v-10.1-rc3-x86_64.iso
sata2='templates:iso/alt-server-v-10.1-rc3-x86_64.iso,media=cdrom',
scsi0=f'rbd-storage:{disk_size},discard=on',
scsihw='virtio-scsi-pci',
sockets=1,
start=start,
)
print(f'VM {vmid} was created successfully!')
def delete_vm(
proxmox,
node,
vmid,
):
proxmox.nodes(node).qemu(vmid).delete(
node=node,
vmid=vmid,
)
print(f'VM {vmid} was deleted successfully!')
def clone_template(
proxmox,
node: str,
vmid: int,
newid: Optional[int] = None,
name: Optional[str] = None,
prefix: str ='stepchenkoas',
) -> int:
if newid is None:
newid = proxmox.cluster.get('nextid')
if name is None:
name = f'{prefix}-{newid}'
res = proxmox.nodes(node).qemu(vmid).clone.post(
newid=newid,
node=node,
vmid=vmid,
format='raw',
full=1,
name=name,
pool='Virt_LAB',
storage='rbd-storage',
target=node,
)
print(res)
print(f'VM {vmid} was cloned to {newid} successfully!')
return newid
def get_vm_ip(
proxmox,
node: 'str',
vmid: int,
):
res = proxmox.nodes(node).qemu(vmid).agent('network-get-interfaces').get()
vm_ip_address = None
for net in res['result']:
if net['name'] == 'eth0':
for ip in net['ip-addresses']:
if ip['ip-address-type'] == 'ipv4':
vm_ip_address = ip['ip-address']
return vm_ip_address
def main():
proxmox = ProxmoxAPI(
PROXMOX_HOST, user=PROXMOX_USER, password=PROXMOX_PASSWORD, verify_ssl=False
)
nodes = proxmox.nodes.get()
nodes = list(filter(lambda node: node['status'] == 'online', nodes))
node = min(nodes, key=lambda node: node['disk'])['node']
template_id = 374
prefix = 'stepchenkaos-test-k8s'
vm_id_master = clone_template(
proxmox,
node,
template_id,
name=f'{prefix}-master',
)
proxmox.nodes(node).qemu(vm_id_master).status.start.post()
vm_id_worker1 = clone_template(
proxmox,
node,
template_id,
name=f'{prefix}-node1',
)
proxmox.nodes(node).qemu(vm_id_worker1).status.start.post()
vm_id_worker2 = clone_template(
proxmox,
node,
template_id,
name=f'{prefix}-node2',
)
proxmox.nodes(node).qemu(vm_id_worker2).status.start.post()
vms_statuses = [False, False, False]
print(vms_statuses)
while not all(vms_statuses):
sleep(5)
vms_statuses[0] = proxmox.nodes(node).qemu(vm_id_master).status.current.get()['status'] == 'running'
vms_statuses[1] = proxmox.nodes(node).qemu(vm_id_worker1).status.current.get()['status'] == 'running'
vms_statuses[2] = proxmox.nodes(node).qemu(vm_id_worker2).status.current.get()['status'] == 'running'
print(vms_statuses)
sleep(60)
vm_ip_master = get_vm_ip(proxmox, node, vm_id_master)
vm_ip_worker1 = get_vm_ip(proxmox, node, vm_id_worker1)
vm_ip_worker2 = get_vm_ip(proxmox, node, vm_id_worker2)
with open('vm_ids' ,'w') as ofile:
vm_ids = f'{vm_id_master}\tmaster\n'
vm_ids += f'{vm_id_worker1}\tworker1\n'
vm_ids += f'{vm_id_worker2}\tworker2\n'
ofile.write(vm_ids)
with open('hosts' ,'w') as ofile:
hosts = f'{vm_ip_master}\tmaster\n'
hosts += f'{vm_ip_worker1}\tworker1\n'
hosts += f'{vm_ip_worker2}\tworker2\n'
ofile.write(hosts)
proxmox.logout()
if __name__ == '__main__':
main()