All checks were successful
CI / build (push) Successful in 1m8s
Added ability to ignore directories by adding them to .expignore file; Updated way to check the working directory for log - fuzzer_stats.
283 lines
9.8 KiB
Python
283 lines
9.8 KiB
Python
import dataclasses
|
|
import os
|
|
import re
|
|
import socket
|
|
import select
|
|
import threading
|
|
import time
|
|
import logging
|
|
import sys
|
|
import signal
|
|
import pathlib
|
|
|
|
EXPORTER_LOG = "/exporter-log"
|
|
|
|
# set format for logs
|
|
if os.environ.get("DEBUG"):
|
|
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.DEBUG,
|
|
handlers=[
|
|
logging.FileHandler(EXPORTER_LOG), logging.StreamHandler(sys.stdout)
|
|
])
|
|
else:
|
|
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO,
|
|
handlers=[
|
|
logging.FileHandler(EXPORTER_LOG), logging.StreamHandler(sys.stdout)
|
|
])
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class Environment:
|
|
FUZZ_REGEXP: str
|
|
FUZZ_WORKDIR: str
|
|
FUZZ_LOG_NAME: str
|
|
HOST: str
|
|
PORT: int
|
|
CHECK_INTERVAL: float
|
|
WAIT_LOG: bool
|
|
TIMEOUT_WAIT_LOG: float
|
|
EXPORTER_IGNORE_FILE: str
|
|
|
|
def __init__(self):
|
|
self.FUZZ_REGEXP = "([a-z_]*)\s*:\s*([-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?)"
|
|
self.FUZZ_WORKDIR = "/workdir"
|
|
self.FUZZ_LOG_NAME = "fuzzer_stats"
|
|
self.HOST = "0.0.0.0"
|
|
self.PORT = 9550
|
|
self.CHECK_INTERVAL = 1.0
|
|
self.WAIT_LOG = False
|
|
self.TIMEOUT_WAIT_LOG = 3.0
|
|
self.EXPORTER_IGNORE_FILE = "/workdir/.expignore"
|
|
|
|
def check_system_env(self):
|
|
if os.environ.get("FUZZ_REGEXP"):
|
|
self.FUZZ_REGEXP = os.environ.get("FUZZ_REGEXP")
|
|
logging.debug("FUZZ_REGEXP set from system env.")
|
|
|
|
if os.environ.get("FUZZ_WORKDIR"):
|
|
self.FUZZ_WORKDIR = os.environ.get("FUZZ_WORKDIR")
|
|
logging.debug("FUZZ_WORKDIR set from system env.")
|
|
|
|
if os.environ.get("FUZZ_LOG_NAME"):
|
|
self.FUZZ_LOG_NAME = os.environ.get("FUZZ_LOG_NAME")
|
|
logging.debug("FUZZ_LOG_NAME set from system env.")
|
|
|
|
if os.environ.get("WAIT_LOG"):
|
|
self.WAIT_LOG = bool(os.environ.get("WAIT_LOG"))
|
|
logging.debug("WAIT_LOG set from system env.")
|
|
|
|
if os.environ.get("CHECK_INTERVAL"):
|
|
try:
|
|
self.CHECK_INTERVAL = float(os.environ.get("CHECK_INTERVAL"))
|
|
logging.debug("CHECK_INTERVAL set from system env.")
|
|
except Exception:
|
|
logging.error("Failed to set CHECK_INTERVAL from system env.")
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class Exporter:
|
|
LISTEN_HOST: str
|
|
LISTEN_PORT: int
|
|
LIST_METRICS: list # In formant - [["metric_name", "metric_value"], ...]
|
|
# ---
|
|
PREPARE_METRICS: str
|
|
|
|
def __init__(self, env: Environment):
|
|
self.LISTEN_HOST = env.HOST
|
|
self.LISTEN_PORT = env.PORT
|
|
self.LIST_METRICS = list()
|
|
self.PREPARE_METRICS = str()
|
|
|
|
def update_metric_list(self, metrics_list: list):
|
|
# generating output data for Prometheus in the correct web format
|
|
# without specifying the header, Prometheus cannot receive data
|
|
s = 'HTTP/1.1 200 OK\n\n'
|
|
for m in metrics_list:
|
|
# specifying help info, "." for all metrics
|
|
# metric type
|
|
# metric_name metric_value
|
|
s += f"# HELP {m[0]} .\n"
|
|
s += f"# TYPE {m[0]} gauge\n"
|
|
s += f"{m[0]} {m[1]}\n"
|
|
|
|
self.PREPARE_METRICS = s
|
|
logging.debug("Convert metrics to Prometheus format")
|
|
|
|
def listen(self):
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
|
|
|
logging.debug("Socket created")
|
|
|
|
try:
|
|
sock.bind((self.LISTEN_HOST, self.LISTEN_PORT))
|
|
except Exporter:
|
|
logging.critical("Failed to bind socket")
|
|
exit(1)
|
|
|
|
logging.debug("Socket bound")
|
|
if not self.LISTEN_PORT:
|
|
logging.debug("Listen port - "+str(sock.getsockname()[1]))
|
|
# Check env_net file. If exist write port value inside.
|
|
if os.path.exists("/env_net"):
|
|
with open("/env_net", "w") as env_net:
|
|
env_net.write("EXPORTER_PORT="+str(sock.getsockname()[1]))
|
|
logging.debug("File env_net exist. Write port value inside")
|
|
sock.listen(10)
|
|
logging.debug('Socket listening')
|
|
inputs = [sock]
|
|
outputs = [sock]
|
|
while True:
|
|
logging.debug("Waiting for clients...")
|
|
reads, writes, excepts = select.select(inputs, outputs, inputs)
|
|
for conn in reads:
|
|
_conn, client_addr = conn.accept()
|
|
with _conn as client_conn:
|
|
ip_addr, client_port = client_addr
|
|
logging.debug(f'Client connected - {ip_addr}:{client_port}')
|
|
data = client_conn.recv(1024)
|
|
logging.debug("Send data")
|
|
client_conn.send(
|
|
bytes(self.PREPARE_METRICS, 'utf-8')) # sending a response prepared for Prometheus
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class Parser:
|
|
PATH_TO_WORKDIR: str
|
|
LOG_NAME: str
|
|
REGEXP: str
|
|
WAIT_LOG: bool
|
|
TIMEOUT_WAIT_LOG = float
|
|
IGNORE_LIST: list
|
|
|
|
def __init__(self, env: Environment):
|
|
self.PATH_TO_WORKDIR = env.FUZZ_WORKDIR
|
|
self.LOG_NAME = env.FUZZ_LOG_NAME
|
|
self.REGEXP = env.FUZZ_REGEXP
|
|
self.WAIT_LOG = env.WAIT_LOG
|
|
self.TIMEOUT_WAIT_LOG = env.TIMEOUT_WAIT_LOG
|
|
self.IGNORE_LIST = list()
|
|
|
|
if os.path.exists(self.PATH_TO_WORKDIR+"/"+'.expignore'):
|
|
with open(self.PATH_TO_WORKDIR+"/"+'.expignore') as f:
|
|
self.IGNORE_LIST = f.read().splitlines()
|
|
logging.info(f"Ignored - {self.IGNORE_LIST}")
|
|
|
|
def __check_exists(self, rootPath: str, searchFile: str) -> bool:
|
|
for root, dirs, files in os.walk(rootPath):
|
|
if os.path.exists(os.path.join(root, searchFile)):
|
|
return True
|
|
return False
|
|
|
|
def __check_valid_dir(self):
|
|
while True:
|
|
if os.path.exists(self.PATH_TO_WORKDIR):
|
|
if not self.__check_exists(rootPath=self.PATH_TO_WORKDIR, searchFile=self.LOG_NAME):
|
|
if self.WAIT_LOG:
|
|
logging.warning("Not found fuzzer_stats files. Waiting to be created")
|
|
time.sleep(self.TIMEOUT_WAIT_LOG)
|
|
continue
|
|
else:
|
|
logging.critical("Not AFL dir")
|
|
exit(1)
|
|
else:
|
|
logging.debug("Check valid dir passed")
|
|
break
|
|
else:
|
|
if self.WAIT_LOG:
|
|
logging.warning("Dir not exists. Waiting for directory to be created")
|
|
time.sleep(self.TIMEOUT_WAIT_LOG)
|
|
continue
|
|
else:
|
|
logging.critical("Dir not exists")
|
|
exit(1)
|
|
|
|
def __check_ignore(self, currentPath: str) -> bool:
|
|
for ignore in self.IGNORE_LIST:
|
|
if pathlib.Path(ignore) == pathlib.Path(currentPath):
|
|
return True
|
|
return False
|
|
|
|
def __search_fuzz_stats(self) -> list:
|
|
fuzz_stats = list()
|
|
for root, dirs, files in os.walk(self.PATH_TO_WORKDIR):
|
|
if self.LOG_NAME in files:
|
|
if not self.__check_ignore(currentPath=root[len(self.PATH_TO_WORKDIR) + 1:]):
|
|
fuzz_stats.append([root[len(self.PATH_TO_WORKDIR) + 1:], os.path.join(root, self.LOG_NAME)])
|
|
logging.debug("Get fuzz path - " + str(fuzz_stats))
|
|
return fuzz_stats
|
|
|
|
def __get_log(self, path_2log) -> list:
|
|
with open(path_2log, 'r') as file:
|
|
return file.readlines()
|
|
|
|
def __re_log(self, search_list: list, sfx: str) -> list: # search by regular expression in list
|
|
l = list()
|
|
for search_string in search_list:
|
|
res = re.findall(self.REGEXP, search_string)
|
|
if res:
|
|
for i in res:
|
|
# creating a nested list with the name of metric [0] and its value [1]
|
|
# in the name of the metric replace all characters with - "_" for correct display in Prometheus
|
|
if not i[0] or not i[1]:
|
|
continue
|
|
l.append([re.sub("\W", "_", i[0]+"_"+sfx), i[1]])
|
|
return l
|
|
|
|
def get_metrics(self):
|
|
metrics = list()
|
|
|
|
self.__check_valid_dir()
|
|
|
|
for i in self.__search_fuzz_stats():
|
|
for j in self.__re_log(self.__get_log(path_2log=i[1]), i[0]):
|
|
metrics.append(j)
|
|
|
|
logging.debug("Metrics updated")
|
|
return metrics
|
|
|
|
|
|
@dataclasses.dataclass
|
|
class GracefulKiller:
|
|
kill_now: bool
|
|
|
|
def __init__(self):
|
|
self.kill_now = False
|
|
signal.signal(signal.SIGINT, self.exit_gracefully)
|
|
signal.signal(signal.SIGTERM, self.exit_gracefully)
|
|
|
|
def exit_gracefully(self, *args):
|
|
self.kill_now = True
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# set env
|
|
logging.info("Init environments")
|
|
env1 = Environment()
|
|
env1.check_system_env()
|
|
|
|
# init killer by SIGTERM
|
|
killer = GracefulKiller()
|
|
|
|
# Start listen exporter
|
|
exporter1 = Exporter(env1)
|
|
exporter_listen_t = threading.Thread(target=exporter1.listen)
|
|
exporter_listen_t.setDaemon(True)
|
|
exporter_listen_t.start()
|
|
logging.info("Exporter started")
|
|
|
|
# Start update metrics
|
|
parser1 = Parser(env1)
|
|
logging.info("Start update metrics")
|
|
while not killer.kill_now:
|
|
try:
|
|
exporter1.update_metric_list(parser1.get_metrics())
|
|
time.sleep(env1.CHECK_INTERVAL)
|
|
|
|
# Check exporter thread
|
|
if not exporter_listen_t.is_alive():
|
|
logging.error("Exporter thread is down")
|
|
exit(1)
|
|
|
|
except Exception as e:
|
|
logging.warning(f"Stop running by {e}")
|
|
exit(1)
|