aflplusplus-exporter/aflplusplus-exporter.py
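"""Prometheus exporter for AFL++ fuzzer statistics.

Walks FUZZ_WORKDIR for fuzzer_stats files, parses each "key : value" line
with FUZZ_REGEXP, and serves the values as Prometheus gauges over a plain
TCP socket on HOST:PORT. Directories listed in .expignore are skipped.
"""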
import dataclasses
import os
import re
import socket
import select
import threading
import time
import logging
import sys
import signal
import pathlib
EXPORTER_LOG = "/exporter-log"
# log to both the exporter log file and stdout; the DEBUG env var enables verbose output
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG if os.environ.get("DEBUG") else logging.INFO,
    handlers=[
        logging.FileHandler(EXPORTER_LOG), logging.StreamHandler(sys.stdout)
    ])
@dataclasses.dataclass
class Environment:
FUZZ_REGEXP: str
FUZZ_WORKDIR: str
FUZZ_LOG_NAME: str
HOST: str
PORT: int
CHECK_INTERVAL: float
WAIT_LOG: bool
TIMEOUT_WAIT_LOG: float
EXPORTER_IGNORE_FILE: str
def __init__(self):
self.FUZZ_REGEXP = "([a-z_]*)\s*:\s*([-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?)"
self.FUZZ_WORKDIR = "/workdir"
self.FUZZ_LOG_NAME = "fuzzer_stats"
self.HOST = "0.0.0.0"
self.PORT = 9550
self.CHECK_INTERVAL = 1.0
self.WAIT_LOG = False
self.TIMEOUT_WAIT_LOG = 3.0
self.EXPORTER_IGNORE_FILE = "/workdir/.expignore"
def check_system_env(self):
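        # overrides come from same-named environment variables, e.g.
        # (illustrative): FUZZ_WORKDIR=/data/afl CHECK_INTERVAL=5 ./aflplusplus-exporter.py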
if os.environ.get("FUZZ_REGEXP"):
self.FUZZ_REGEXP = os.environ.get("FUZZ_REGEXP")
logging.debug("FUZZ_REGEXP set from system env.")
if os.environ.get("FUZZ_WORKDIR"):
self.FUZZ_WORKDIR = os.environ.get("FUZZ_WORKDIR")
logging.debug("FUZZ_WORKDIR set from system env.")
if os.environ.get("FUZZ_LOG_NAME"):
self.FUZZ_LOG_NAME = os.environ.get("FUZZ_LOG_NAME")
logging.debug("FUZZ_LOG_NAME set from system env.")
if os.environ.get("WAIT_LOG"):
self.WAIT_LOG = bool(os.environ.get("WAIT_LOG"))
logging.debug("WAIT_LOG set from system env.")
if os.environ.get("CHECK_INTERVAL"):
try:
self.CHECK_INTERVAL = float(os.environ.get("CHECK_INTERVAL"))
logging.debug("CHECK_INTERVAL set from system env.")
except Exception:
logging.error("Failed to set CHECK_INTERVAL from system env.")
@dataclasses.dataclass
class Exporter:
LISTEN_HOST: str
LISTEN_PORT: int
    LIST_METRICS: list  # In format - [["metric_name", "metric_value"], ...]
# ---
PREPARE_METRICS: str
def __init__(self, env: Environment):
self.LISTEN_HOST = env.HOST
self.LISTEN_PORT = env.PORT
self.LIST_METRICS = list()
self.PREPARE_METRICS = str()
def update_metric_list(self, metrics_list: list):
        # generate output for Prometheus in the text exposition format;
        # without the HTTP status line Prometheus cannot receive the data
        s = 'HTTP/1.1 200 OK\r\n\r\n'
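        # each metric becomes three lines, e.g. (illustrative):
        #   # HELP execs_per_sec_fuzzer01 .
        #   # TYPE execs_per_sec_fuzzer01 gauge
        #   execs_per_sec_fuzzer01 1234.56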
for m in metrics_list:
            # emit a placeholder HELP line ("." for every metric), a TYPE line,
            # and the sample itself as "metric_name metric_value"
s += f"# HELP {m[0]} .\n"
s += f"# TYPE {m[0]} gauge\n"
s += f"{m[0]} {m[1]}\n"
self.PREPARE_METRICS = s
logging.debug("Convert metrics to Prometheus format")
def listen(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
logging.debug("Socket created")
            try:
                sock.bind((self.LISTEN_HOST, self.LISTEN_PORT))
            except OSError:
                # bind failures (port already in use, bad address) raise OSError
                logging.critical("Failed to bind socket")
                exit(1)
logging.debug("Socket bound")
            if not self.LISTEN_PORT:
                # PORT=0 lets the OS pick a free port; report the one actually bound
                logging.debug("Listen port - " + str(sock.getsockname()[1]))
                # If the env_net file exists, record the assigned port in it
                if os.path.exists("/env_net"):
                    with open("/env_net", "w") as env_net:
                        env_net.write("EXPORTER_PORT=" + str(sock.getsockname()[1]))
                    logging.debug("File env_net exists. Wrote port value inside")
sock.listen(10)
logging.debug('Socket listening')
            inputs = [sock]
            while True:
                logging.debug("Waiting for clients...")
                # only the listening socket needs monitoring; select blocks until it is readable
                reads, _, _ = select.select(inputs, [], inputs)
                for conn in reads:
                    _conn, client_addr = conn.accept()
                    with _conn as client_conn:
                        ip_addr, client_port = client_addr
                        logging.debug(f'Client connected - {ip_addr}:{client_port}')
                        # drain the HTTP request; the response is the same for any path
                        client_conn.recv(1024)
                        logging.debug("Sending data")
                        # send the response prepared for Prometheus
                        client_conn.sendall(bytes(self.PREPARE_METRICS, 'utf-8'))
@dataclasses.dataclass
class Parser:
PATH_TO_WORKDIR: str
LOG_NAME: str
REGEXP: str
WAIT_LOG: bool
    TIMEOUT_WAIT_LOG: float
IGNORE_LIST: list
def __init__(self, env: Environment):
self.PATH_TO_WORKDIR = env.FUZZ_WORKDIR
self.LOG_NAME = env.FUZZ_LOG_NAME
self.REGEXP = env.FUZZ_REGEXP
self.WAIT_LOG = env.WAIT_LOG
self.TIMEOUT_WAIT_LOG = env.TIMEOUT_WAIT_LOG
self.IGNORE_LIST = list()
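        # .expignore lists one directory per line, relative to the workdir,
        # e.g. (illustrative): "fuzzer01" to skip /workdir/fuzzer01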
        ignore_file = os.path.join(self.PATH_TO_WORKDIR, '.expignore')
        if os.path.exists(ignore_file):
            with open(ignore_file) as f:
                self.IGNORE_LIST = f.read().splitlines()
            logging.info(f"Ignored - {self.IGNORE_LIST}")
def __check_exists(self, rootPath: str, searchFile: str) -> bool:
for root, dirs, files in os.walk(rootPath):
            if searchFile in files:
return True
return False
def __check_valid_dir(self):
while True:
if os.path.exists(self.PATH_TO_WORKDIR):
if not self.__check_exists(rootPath=self.PATH_TO_WORKDIR, searchFile=self.LOG_NAME):
if self.WAIT_LOG:
logging.warning("Not found fuzzer_stats files. Waiting to be created")
time.sleep(self.TIMEOUT_WAIT_LOG)
continue
else:
logging.critical("Not AFL dir")
exit(1)
else:
logging.debug("Check valid dir passed")
break
else:
if self.WAIT_LOG:
logging.warning("Dir not exists. Waiting for directory to be created")
time.sleep(self.TIMEOUT_WAIT_LOG)
continue
else:
logging.critical("Dir not exists")
exit(1)
def __check_ignore(self, currentPath: str) -> bool:
for ignore in self.IGNORE_LIST:
if pathlib.Path(ignore) == pathlib.Path(currentPath):
return True
return False
def __search_fuzz_stats(self) -> list:
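        # returns [[relative_fuzzer_dir, absolute_stats_path], ...], e.g.
        # [["fuzzer01", "/workdir/fuzzer01/fuzzer_stats"]] (illustrative)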
fuzz_stats = list()
for root, dirs, files in os.walk(self.PATH_TO_WORKDIR):
            if self.LOG_NAME in files:
                rel_dir = root[len(self.PATH_TO_WORKDIR) + 1:]  # path relative to the workdir
                if not self.__check_ignore(currentPath=rel_dir):
                    fuzz_stats.append([rel_dir, os.path.join(root, self.LOG_NAME)])
logging.debug("Get fuzz path - " + str(fuzz_stats))
return fuzz_stats
def __get_log(self, path_2log) -> list:
with open(path_2log, 'r') as file:
return file.readlines()
    def __re_log(self, search_list: list, sfx: str) -> list:  # search by regular expression in the list
        results = list()
        for search_string in search_list:
            res = re.findall(self.REGEXP, search_string)
            if res:
                for i in res:
                    # build a nested list of metric name [0] and value [1];
                    # non-word characters become "_" so Prometheus accepts the name,
                    # and the directory suffix keeps names unique across fuzzers
                    if not i[0] or not i[1]:
                        continue
                    results.append([re.sub(r"\W", "_", i[0] + "_" + sfx), i[1]])
        return results
def get_metrics(self):
metrics = list()
self.__check_valid_dir()
for i in self.__search_fuzz_stats():
for j in self.__re_log(self.__get_log(path_2log=i[1]), i[0]):
metrics.append(j)
logging.debug("Metrics updated")
return metrics
@dataclasses.dataclass
class GracefulKiller:
kill_now: bool
def __init__(self):
self.kill_now = False
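        # SIGINT/SIGTERM set kill_now so the main loop can finish cleanly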
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)
def exit_gracefully(self, *args):
self.kill_now = True
if __name__ == "__main__":
# set env
logging.info("Init environments")
env1 = Environment()
env1.check_system_env()
# init killer by SIGTERM
killer = GracefulKiller()
# Start listen exporter
exporter1 = Exporter(env1)
exporter_listen_t = threading.Thread(target=exporter1.listen)
    exporter_listen_t.daemon = True  # setDaemon() is deprecated; daemon thread exits with the main process
exporter_listen_t.start()
logging.info("Exporter started")
# Start update metrics
parser1 = Parser(env1)
logging.info("Start update metrics")
while not killer.kill_now:
try:
exporter1.update_metric_list(parser1.get_metrics())
time.sleep(env1.CHECK_INTERVAL)
# Check exporter thread
if not exporter_listen_t.is_alive():
logging.error("Exporter thread is down")
exit(1)
        except Exception as e:
            logging.warning(f"Stopped by exception: {e}")
            exit(1)
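
# Smoke test (illustrative): with a fuzzer writing /workdir/<name>/fuzzer_stats,
#   curl http://localhost:9550/
# should print gauge samples such as "execs_per_sec_<name> 1234.56".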