140 lines
3.5 KiB
Python
140 lines
3.5 KiB
Python
import logging
|
|
import time
|
|
|
|
from proxmoxer import ProxmoxAPI
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
# Threshold for this logger is INFO, so logger.debug(...) calls made through
# it (e.g. the UPID debug line in clone_template) are not emitted.
logger.setLevel(logging.INFO)
|
|
|
|
|
|
def create_vm(
    proxmox: ProxmoxAPI,
    node: str | None = None,
    vmid: int | None = None,
    name: str | None = None,
    prefix: str = "stepchenkoas",
    cores: int = 2,
    memory: int = 2048,
    disk_size: int = 16,
    start: int = 1,
) -> None:
    """Create a QEMU VM through the Proxmox VE API.

    Besides the configurable arguments below, the request always sends a
    fixed set of options: ``cpu="host"``, one socket, a virtio NIC on
    ``vmbr0`` with VLAN tag 103 and the firewall flag set, ``ostype="l26"``,
    the ``Virt_LAB`` pool, the ALT Server 10.1-rc3 ISO attached as a CD-ROM on
    ``sata2``, and a ``virtio-scsi-pci`` controller.

    Args:
        proxmox: Connected API client.
        node: Node to create the VM on. When ``None``, the online node with
            the smallest reported ``disk`` value is picked.
            NOTE(review): ``disk`` is presumably used disk space - confirm
            against the ``/nodes`` API schema.
        vmid: ID for the new VM. When ``None``, the cluster's ``nextid`` is
            requested and converted to ``int``.
        name: VM name. Defaults to ``"{prefix}-{vmid}"``.
        prefix: Name prefix used only when ``name`` is ``None``.
        cores: Value sent as the ``cores`` option.
        memory: Value sent as the ``memory`` option (presumably MiB - confirm).
        disk_size: Size put into ``scsi0`` as ``rbd-storage:{disk_size}``
            (presumably GiB - confirm).
        start: Value sent as the ``start`` option (presumably 1 means "start
            the VM after creation" - confirm).

    Raises:
        ValueError: If ``node`` is ``None`` and no node reports the status
            ``"online"``.
    """
    if node is None:
        online_nodes = [
            candidate
            for candidate in proxmox.nodes.get()  # pyright: ignore
            if candidate["status"] == "online"
        ]
        if not online_nodes:
            # min() on an empty list raises a ValueError with an unhelpful
            # message; keep the exception type but say what actually happened.
            raise ValueError("No online Proxmox nodes are available to host the VM")
        node = min(online_nodes, key=lambda candidate: candidate["disk"])["node"]

    if vmid is None:
        # Converted to int for consistency with clone_template and with the
        # declared type of ``vmid``.
        vmid = int(proxmox.cluster.get("nextid"))  # pyright: ignore

    if name is None:
        name = f"{prefix}-{vmid}"

    proxmox.nodes(node).qemu.post(
        node=node,
        vmid=vmid,
        description="This VM was created automatically through Proxmox VE API",
        cores=cores,
        cpu="host",
        memory=memory,
        name=name,
        net0="virtio,bridge=vmbr0,tag=103,firewall=1",
        ostype="l26",
        pool="Virt_LAB",
        # alt-server-v-10.1-rc3-x86_64.iso
        sata2="templates:iso/alt-server-v-10.1-rc3-x86_64.iso,media=cdrom",
        scsi0=f"rbd-storage:{disk_size},discard=on",
        scsihw="virtio-scsi-pci",
        sockets=1,
        start=start,
    )
    # The value returned by the POST is not inspected here; this message only
    # reflects that the request itself did not raise.
    logger.info("VM %s was created successfully!", vmid)
|
|
|
|
|
|
def delete_vm(
    proxmox: ProxmoxAPI,
    node: str,
    vmid: int,
) -> str:
    """Issue a delete request for VM ``vmid`` on ``node``.

    Args:
        proxmox: Connected API client.
        node: Name of the node that hosts the VM.
        vmid: ID of the VM to delete.

    Returns:
        Whatever the DELETE call returns; the code treats it as the task
        UPID string.

    Note:
        The log line is written as soon as the call returns. This function
        does not check the status of the resulting task.
    """
    vm_resource = proxmox.nodes(node).qemu(vmid)
    task_id: str = vm_resource.delete(node=node, vmid=vmid)
    logger.info("VM %s was deleted successfully!", vmid)
    return task_id
|
|
|
|
|
|
def clone_template(
    proxmox: ProxmoxAPI,
    node: str,
    vmid: int,
    newid: int | None = None,
    name: str | None = None,
    prefix: str = "stepchenkoas",
) -> tuple[int, str]:
    """Start a full clone of VM/template ``vmid`` on ``node``.

    The clone request is sent with ``format="raw"``, ``full=1``, the
    ``Virt_LAB`` pool, ``rbd-storage`` as target storage and ``node`` as the
    target node.

    Args:
        proxmox: Connected API client.
        node: Node hosting the source VM; also used as the clone target.
        vmid: ID of the VM/template to clone.
        newid: ID for the clone. When ``None``, the cluster's ``nextid`` is
            requested and converted to ``int``.
        name: Name for the clone. Defaults to ``"{prefix}-{newid}"``.
        prefix: Name prefix used only when ``name`` is ``None``.

    Returns:
        ``(newid, upid)`` - the ID of the clone and the value returned by the
        clone POST, which the code treats as the task UPID.

    Note:
        Only the request is issued; the task status is not checked.
    """
    if newid is None:
        newid = int(proxmox.cluster.get("nextid"))  # pyright: ignore
    if name is None:
        name = f"{prefix}-{newid}"

    # Sanity checks that also narrow the Optional types for the type checker.
    assert type(newid) is int
    assert type(name) is str

    source_vm = proxmox.nodes(node).qemu(vmid)
    clone_options = {
        "newid": newid,
        "node": node,
        "vmid": vmid,
        "format": "raw",
        "full": 1,
        "name": name,
        "pool": "Virt_LAB",
        "storage": "rbd-storage",
        "target": node,
    }
    task_id: str = source_vm.clone.post(**clone_options)  # pyright: ignore

    # TODO: check task status
    logger.debug("UPID '%s'", task_id)
    logger.info("Cloning of VM %s to %s was initiated successfully!", vmid, newid)

    return (newid, task_id)
|
|
|
|
|
|
def get_vm_ip(
    proxmox: ProxmoxAPI,
    node: str,
    vmid: int,
    timeout: float | None = None,
    poll_interval: float = 1.0,
) -> str:
    """Poll the guest agent until the VM reports an IPv4 address.

    Each attempt calls the agent's ``network-get-interfaces`` command and
    looks at the interfaces named ``eth0`` or ``ens19``. If several IPv4
    addresses match, the last one in the reply is returned.

    Args:
        proxmox: Connected API client.
        node: Name of the node that hosts the VM.
        vmid: ID of the VM to query.
        timeout: Maximum number of seconds to keep polling. ``None`` (the
            default) polls indefinitely, as the function did before this
            parameter existed.
        poll_interval: Seconds to sleep between attempts.

    Returns:
        The IPv4 address string reported by the guest agent.

    Raises:
        RuntimeError: If the API call or reply handling raises anything other
            than ``KeyError`` (chained to the original exception), or if
            ``timeout`` elapses without an address being found.
    """
    error_message = f"Could not get the ip address of VM {vmid} on node {node}"
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        vm_ip_address = None
        try:
            res: dict = (
                proxmox.nodes(node).qemu(vmid).agent("network-get-interfaces").get()
            )  # pyright: ignore
            for net in res["result"]:
                if net["name"] in ("eth0", "ens19"):
                    for ip in net["ip-addresses"]:
                        if ip["ip-address-type"] == "ipv4":
                            vm_ip_address = ip["ip-address"]
        except KeyError:
            # A key is missing from the reply (e.g. an interface without
            # "ip-addresses"). Discard any partial result and retry.
            vm_ip_address = None
        except Exception as e:
            raise RuntimeError(error_message) from e

        if vm_ip_address is not None:
            return vm_ip_address

        if deadline is not None and time.monotonic() >= deadline:
            raise RuntimeError(error_message)

        # Sleep on every retry path. Previously a well-formed reply without a
        # matching IPv4 address caused an immediate re-poll (busy loop).
        time.sleep(poll_interval)
|