"""Prometheus exporter for fuzzer statistics files.

The script walks a fuzzer working directory, extracts ``name : number``
pairs from every statistics file it finds, and serves them in the
Prometheus text format over a plain TCP socket.
"""
import dataclasses
import logging
import os
import pathlib
import re
import select
import signal
import socket
import sys
import threading
import time

EXPORTER_LOG = "/exporter-log"

_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'

# Values of a boolean environment variable that mean "disabled".
_FALSE_STRINGS = frozenset({"0", "false", "no", "off"})


def _configure_logging():
    """Set up root logging to stdout and, when possible, to EXPORTER_LOG.

    The level is DEBUG when the DEBUG environment variable is non-empty,
    INFO otherwise.  A log file that cannot be opened is skipped instead of
    aborting the import of this module.
    """
    handlers = [logging.StreamHandler(sys.stdout)]
    file_error = None
    try:
        handlers.insert(0, logging.FileHandler(EXPORTER_LOG))
    except OSError as err:
        file_error = err

    level = logging.DEBUG if os.environ.get("DEBUG") else logging.INFO
    logging.basicConfig(format=_LOG_FORMAT, level=level, handlers=handlers)
    if file_error is not None:
        logging.warning("Log file %s is not writable: %s",
                        EXPORTER_LOG, file_error)


# set format for logs
_configure_logging()


@dataclasses.dataclass
class Environment:
    """Exporter settings: built-in defaults, optionally overridden by env vars."""

    FUZZ_REGEXP: str
    FUZZ_WORKDIR: str
    FUZZ_LOG_NAME: str
    HOST: str
    PORT: int
    CHECK_INTERVAL: float
    WAIT_LOG: bool
    TIMEOUT_WAIT_LOG: float
    EXPORTER_IGNORE_FILE: str

    def __init__(self):
        """Populate every setting with its default value."""
        # Group 1 is the metric name, group 2 its numeric value.
        self.FUZZ_REGEXP = r"([a-z_]*)\s*:\s*([-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?)"
        self.FUZZ_WORKDIR = "/workdir"
        self.FUZZ_LOG_NAME = "fuzzer_stats"
        self.HOST = "0.0.0.0"
        self.PORT = 9550
        self.CHECK_INTERVAL = 1.0
        self.WAIT_LOG = False
        self.TIMEOUT_WAIT_LOG = 3.0
        self.EXPORTER_IGNORE_FILE = "/workdir/.expignore"

    def check_system_env(self):
        """Override settings from non-empty environment variables.

        Recognised variables: FUZZ_REGEXP, FUZZ_WORKDIR, FUZZ_LOG_NAME,
        WAIT_LOG and CHECK_INTERVAL.  WAIT_LOG is False for "0", "false",
        "no" or "off" (case-insensitive) and True for any other non-empty
        value.  An unparsable or negative CHECK_INTERVAL is logged and the
        current value is kept.
        """
        for name in ("FUZZ_REGEXP", "FUZZ_WORKDIR", "FUZZ_LOG_NAME"):
            value = os.environ.get(name)
            if value:
                setattr(self, name, value)
                logging.debug("%s set from system env.", name)

        wait_log = os.environ.get("WAIT_LOG")
        if wait_log:
            # bool("false") is True, so the text has to be interpreted.
            self.WAIT_LOG = wait_log.strip().lower() not in _FALSE_STRINGS
            logging.debug("WAIT_LOG set from system env.")

        raw_interval = os.environ.get("CHECK_INTERVAL")
        if raw_interval:
            try:
                interval = float(raw_interval)
                if interval < 0:
                    # time.sleep() rejects negative values later on.
                    raise ValueError("CHECK_INTERVAL must not be negative")
            except ValueError:
                logging.error("Failed to set CHECK_INTERVAL from system env.")
            else:
                self.CHECK_INTERVAL = interval
                logging.debug("CHECK_INTERVAL set from system env.")


@dataclasses.dataclass
class Exporter:
    """Serves the most recently prepared metrics to every TCP client."""

    LISTEN_HOST: str
    LISTEN_PORT: int
    LIST_METRICS: list  # Format - [["metric_name", "metric_value"], ...]
    PREPARE_METRICS: str  # Complete response text sent to each client

    def __init__(self, env: Environment):
        """Take the listen address from *env*; start with no metrics."""
        self.LISTEN_HOST = env.HOST
        self.LISTEN_PORT = env.PORT
        self.LIST_METRICS = []
        self.PREPARE_METRICS = ""

    def update_metric_list(self, metrics_list: list):
        """Render *metrics_list* into the response stored in PREPARE_METRICS.

        Each item supplies the metric name at index 0 and its value at
        index 1.
        """
        # generating output data for Prometheus in the correct web format
        # without specifying the header, Prometheus cannot receive data
        parts = ['HTTP/1.1 200 OK\n\n']
        for metric in metrics_list:
            name, value = metric[0], metric[1]
            # help info ("." for all metrics), metric type, then the sample
            parts.append(f"# HELP {name} .\n")
            parts.append(f"# TYPE {name} gauge\n")
            parts.append(f"{name} {value}\n")
        self.PREPARE_METRICS = "".join(parts)
        logging.debug("Convert metrics to Prometheus format")

    def listen(self):
        """Bind the listening socket and answer clients until the process ends.

        Exits the calling thread with status 1 when the socket cannot be
        bound.
        """
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            logging.debug("Socket created")
            try:
                sock.bind((self.LISTEN_HOST, self.LISTEN_PORT))
            except OSError:
                logging.critical("Failed to bind socket")
                sys.exit(1)
            logging.debug("Socket bound")

            if not self.LISTEN_PORT:
                # Port 0 lets the OS choose; report the port actually bound.
                bound_port = sock.getsockname()[1]
                logging.debug("Listen port - " + str(bound_port))
                # Check env_net file. If it exists, write the port value inside.
                # NOTE(review): nesting under the port-0 branch is assumed;
                # the original indentation was lost - confirm.
                if os.path.exists("/env_net"):
                    with open("/env_net", "w") as env_net:
                        env_net.write("EXPORTER_PORT=" + str(bound_port))
                    logging.debug("File env_net exist. Write port value inside")

            sock.listen(10)
            logging.debug('Socket listening')

            while True:
                logging.debug("Waiting for clients...")
                # Only readability matters: it signals a pending connection.
                readable, _, _ = select.select([sock], [], [sock])
                for listener in readable:
                    self._serve_client(listener)

    def _serve_client(self, listener):
        """Accept one connection on *listener* and send it the metrics."""
        try:
            client_conn, client_addr = listener.accept()
        except OSError as err:
            logging.warning("Failed to accept client: %s", err)
            return

        with client_conn:
            ip_addr, client_port = client_addr
            logging.debug(f'Client connected - {ip_addr}:{client_port}')
            try:
                # The request content is irrelevant; read it, then answer.
                client_conn.recv(1024)
                logging.debug("Send data")
                # sendall() retries until the whole payload is written.
                client_conn.sendall(bytes(self.PREPARE_METRICS, 'utf-8'))
            except OSError as err:
                # One broken client must not stop the listener thread.
                logging.warning("Failed to serve client %s:%s: %s",
                                ip_addr, client_port, err)


@dataclasses.dataclass
class Parser:
    """Finds statistics files under the working directory and parses them."""

    PATH_TO_WORKDIR: str
    LOG_NAME: str
    REGEXP: str
    WAIT_LOG: bool
    TIMEOUT_WAIT_LOG: float
    IGNORE_LIST: list

    def __init__(self, env: Environment):
        """Copy settings from *env* and load ``.expignore`` if present."""
        self.PATH_TO_WORKDIR = env.FUZZ_WORKDIR
        self.LOG_NAME = env.FUZZ_LOG_NAME
        self.REGEXP = env.FUZZ_REGEXP
        self.WAIT_LOG = env.WAIT_LOG
        self.TIMEOUT_WAIT_LOG = env.TIMEOUT_WAIT_LOG
        self.IGNORE_LIST = []

        ignore_file = self.PATH_TO_WORKDIR + "/" + '.expignore'
        if os.path.exists(ignore_file):
            with open(ignore_file) as f:
                # A blank line compares equal to "." and would silently
                # ignore the working directory itself, so drop blanks.
                self.IGNORE_LIST = [
                    line for line in f.read().splitlines() if line.strip()
                ]
            # NOTE(review): logged only when the file exists; the original
            # indentation was lost - confirm.
            logging.info(f"Ignored - {self.IGNORE_LIST}")

    def __check_exists(self, rootPath: str, searchFile: str) -> bool:
        """Return True if *searchFile* exists in any directory under *rootPath*."""
        return any(
            os.path.exists(os.path.join(root, searchFile))
            for root, _dirs, _files in os.walk(rootPath)
        )

    def __check_valid_dir(self):
        """Block until the working directory contains a statistics file.

        When WAIT_LOG is false the first failed check logs a critical
        message and exits with status 1 instead of waiting.
        """
        while True:
            if not os.path.exists(self.PATH_TO_WORKDIR):
                waiting_msg = "Dir not exists. Waiting for directory to be created"
                fatal_msg = "Dir not exists"
            elif not self.__check_exists(rootPath=self.PATH_TO_WORKDIR,
                                         searchFile=self.LOG_NAME):
                waiting_msg = "Not found fuzzer_stats files. Waiting to be created"
                fatal_msg = "Not AFL dir"
            else:
                logging.debug("Check valid dir passed")
                return

            if not self.WAIT_LOG:
                logging.critical(fatal_msg)
                sys.exit(1)
            logging.warning(waiting_msg)
            time.sleep(self.TIMEOUT_WAIT_LOG)

    def __check_ignore(self, currentPath: str) -> bool:
        """Return True if *currentPath* equals an entry of IGNORE_LIST."""
        target = pathlib.Path(currentPath)
        return any(pathlib.Path(ignore) == target for ignore in self.IGNORE_LIST)

    def __search_fuzz_stats(self) -> list:
        """Return ``[relative_dir, stats_file_path]`` for every non-ignored file."""
        fuzz_stats = []
        for root, _dirs, files in os.walk(self.PATH_TO_WORKDIR):
            if self.LOG_NAME not in files:
                continue
            # relpath stays correct when the workdir has a trailing slash;
            # the workdir itself keeps an empty relative name.
            relative = os.path.relpath(root, self.PATH_TO_WORKDIR)
            if relative == os.curdir:
                relative = ""
            if not self.__check_ignore(currentPath=relative):
                fuzz_stats.append([relative, os.path.join(root, self.LOG_NAME)])
        logging.debug("Get fuzz path - " + str(fuzz_stats))
        return fuzz_stats

    def __get_log(self, path_2log) -> list:
        """Return the lines of the file at *path_2log*."""
        with open(path_2log, 'r') as file:
            return file.readlines()

    def __re_log(self, search_list: list, sfx: str) -> list:
        """Extract ``[metric_name, value]`` pairs from *search_list*.

        REGEXP is applied to every string.  Matches with an empty name or
        value are skipped.  The name is suffixed with ``_`` + *sfx* and
        every non-word character is replaced with ``_`` for correct display
        in Prometheus.
        """
        pattern = re.compile(self.REGEXP)
        found = []
        for search_string in search_list:
            for match in pattern.findall(search_string):
                if not match[0] or not match[1]:
                    continue
                metric_name = re.sub(r"\W", "_", match[0] + "_" + sfx)
                found.append([metric_name, match[1]])
        return found

    def get_metrics(self):
        """Return all ``[metric_name, value]`` pairs currently on disk."""
        metrics = []
        self.__check_valid_dir()
        for relative_dir, stats_path in self.__search_fuzz_stats():
            metrics.extend(
                self.__re_log(self.__get_log(path_2log=stats_path), relative_dir)
            )
        logging.debug("Metrics updated")
        return metrics


@dataclasses.dataclass
class GracefulKiller:
    """Sets ``kill_now`` when SIGINT or SIGTERM is received."""

    kill_now: bool

    def __init__(self):
        """Install the handlers for SIGINT and SIGTERM."""
        self.kill_now = False
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)

    def exit_gracefully(self, *args):
        """Signal handler: request that the main loop stops."""
        self.kill_now = True


def main():
    """Start the listener thread and refresh metrics until told to stop."""
    # set env
    logging.info("Init environments")
    env1 = Environment()
    env1.check_system_env()

    # init killer by SIGTERM
    killer = GracefulKiller()

    # Start listen exporter
    exporter1 = Exporter(env1)
    exporter_listen_t = threading.Thread(target=exporter1.listen, daemon=True)
    exporter_listen_t.start()
    logging.info("Exporter started")

    # Start update metrics
    parser1 = Parser(env1)
    logging.info("Start update metrics")
    while not killer.kill_now:
        try:
            exporter1.update_metric_list(parser1.get_metrics())
            time.sleep(env1.CHECK_INTERVAL)
            # Check exporter thread
            if not exporter_listen_t.is_alive():
                logging.error("Exporter thread is down")
                sys.exit(1)
        except Exception as e:
            logging.warning(f"Stop running by {e}")
            sys.exit(1)


if __name__ == "__main__":
    main()