Added log-waiting functionality
All checks were successful
CI / build (push) Successful in 10s

When set system variable WAIT_LOG, the process of checking for logs goes into an infinite repetition until the log files are found
This commit is contained in:
Даниил Ефремов 2023-10-17 14:07:34 +03:00
parent af76857c0f
commit 2dd8a6077f

View File

@ -31,6 +31,9 @@ class Environment:
FUZZ_LOG_NAME: str FUZZ_LOG_NAME: str
HOST: str HOST: str
PORT: int PORT: int
CHECK_INTERVAL: float
WAIT_LOG: bool
TIMEOUT_WAIT_LOG: float
def __init__(self): def __init__(self):
self.FUZZ_REGEXP = "([a-z_]*)\s*:\s*([-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?)" self.FUZZ_REGEXP = "([a-z_]*)\s*:\s*([-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?)"
@ -38,6 +41,9 @@ class Environment:
self.FUZZ_LOG_NAME = "fuzzer_stats" self.FUZZ_LOG_NAME = "fuzzer_stats"
self.HOST = "0.0.0.0" self.HOST = "0.0.0.0"
self.PORT = 9550 self.PORT = 9550
self.CHECK_INTERVAL = 1.0
self.WAIT_LOG = False
self.TIMEOUT_WAIT_LOG = 3.0
def check_system_env(self): def check_system_env(self):
if os.environ.get("FUZZ_REGEXP"): if os.environ.get("FUZZ_REGEXP"):
@ -52,6 +58,17 @@ class Environment:
self.FUZZ_LOG_NAME = os.environ.get("FUZZ_LOG_NAME") self.FUZZ_LOG_NAME = os.environ.get("FUZZ_LOG_NAME")
logging.debug("FUZZ_LOG_NAME set from system env.") logging.debug("FUZZ_LOG_NAME set from system env.")
if os.environ.get("WAIT_LOG"):
self.WAIT_LOG = bool(os.environ.get("WAIT_LOG"))
logging.debug("WAIT_LOG set from system env.")
if os.environ.get("CHECK_INTERVAL"):
try:
self.CHECK_INTERVAL = float(os.environ.get("CHECK_INTERVAL"))
logging.debug("CHECK_INTERVAL set from system env.")
except Exception:
logging.error("Failed to set CHECK_INTERVAL from system env.")
@dataclasses.dataclass @dataclasses.dataclass
class Exporter: class Exporter:
@ -124,25 +141,39 @@ class Parser:
PATH_TO_WORKDIR: str PATH_TO_WORKDIR: str
LOG_NAME: str LOG_NAME: str
REGEXP: str REGEXP: str
WAIT_LOG: bool
TIMEOUT_WAIT_LOG = float
def __init__(self, env: Environment): def __init__(self, env: Environment):
self.PATH_TO_WORKDIR = env.FUZZ_WORKDIR self.PATH_TO_WORKDIR = env.FUZZ_WORKDIR
self.LOG_NAME = env.FUZZ_LOG_NAME self.LOG_NAME = env.FUZZ_LOG_NAME
self.REGEXP = env.FUZZ_REGEXP self.REGEXP = env.FUZZ_REGEXP
self.WAIT_LOG = env.WAIT_LOG
self.TIMEOUT_WAIT_LOG = env.TIMEOUT_WAIT_LOG
def __check_valid_dir(self): def __check_valid_dir(self):
while True:
if os.path.exists(self.PATH_TO_WORKDIR): if os.path.exists(self.PATH_TO_WORKDIR):
test = os.walk(self.PATH_TO_WORKDIR) test = os.walk(self.PATH_TO_WORKDIR)
path, dirs, files = next(test) path, dirs, files = next(test)
if (not os.path.exists(path + "/" + self.LOG_NAME) and not dirs or if (not os.path.exists(path + "/" + self.LOG_NAME) and not dirs or
"fuzzer_stats" not in files and not os.path.exists(path + "/" + dirs[0] + "/" + self.LOG_NAME)): "fuzzer_stats" not in files and not os.path.exists(path + "/" + dirs[0] + "/" + self.LOG_NAME)):
if self.WAIT_LOG:
logging.warning("Not found fuzzer_stats files. Waiting to be created")
else:
logging.critical("Not AFL dir") logging.critical("Not AFL dir")
exit(1) exit(1)
else: else:
logging.debug("Valid check passed") logging.debug("Check valid dir passed")
break
else: else:
logging.critical("Not AFL dir") if self.WAIT_LOG:
logging.warning("Dir not exists. Waiting for directory to be created")
time.sleep(self.TIMEOUT_WAIT_LOG)
continue
else:
logging.critical("Dir not exists")
exit(1) exit(1)
def __search_fuzz_stats(self) -> list: def __search_fuzz_stats(self) -> list:
@ -218,7 +249,7 @@ if __name__ == "__main__":
while not killer.kill_now: while not killer.kill_now:
try: try:
exporter1.update_metric_list(parser1.get_metrics()) exporter1.update_metric_list(parser1.get_metrics())
time.sleep(3.0) time.sleep(env1.CHECK_INTERVAL)
# Check exporter thread # Check exporter thread
if not exporter_listen_t.is_alive(): if not exporter_listen_t.is_alive():