From 1be49a6e0e03582e7a45db8723b190e85ed1b437 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adolfo=20G=C3=B3mez=20Garc=C3=ADa?= Date: Thu, 5 Aug 2021 12:53:44 +0200 Subject: [PATCH] Separated processes manager from main uds_tunnel --- tunnel-server/src/uds_tunnel/processes.py | 99 +++++++++++++++++++++++ tunnel-server/src/udstunnel.py | 68 +--------------- 2 files changed, 103 insertions(+), 64 deletions(-) create mode 100644 tunnel-server/src/uds_tunnel/processes.py diff --git a/tunnel-server/src/uds_tunnel/processes.py b/tunnel-server/src/uds_tunnel/processes.py new file mode 100644 index 00000000..d2db355e --- /dev/null +++ b/tunnel-server/src/uds_tunnel/processes.py @@ -0,0 +1,99 @@ +import multiprocessing +import logging +import typing + +import curio +import psutil + +from . import config + +if typing.TYPE_CHECKING: + from multiprocessing.connection import Connection + from multiprocessing.managers import Namespace + +logger = logging.getLogger(__name__) + +class Processes: + """ + This class is used to store the processes that are used by the tunnel. + """ + + children: typing.List[ + typing.Tuple['Connection', multiprocessing.Process, psutil.Process] + ] + process: typing.Callable + cfg: config.ConfigurationType + ns: 'Namespace' + + def __init__(self, process: typing.Callable, cfg: config.ConfigurationType, ns: 'Namespace') -> None: + self.children = [] + self.process = process # type: ignore + self.cfg = cfg + self.ns = ns + + for i in range(cfg.workers): + self.add_child_pid() + + def add_child_pid(self): + own_conn, child_conn = multiprocessing.Pipe() + task = multiprocessing.Process( + target=curio.run, + args=(self.process, child_conn, self.cfg, self.ns) + ) + task.start() + logger.debug('ADD CHILD PID: %s', task.pid) + self.children.append((own_conn, task, psutil.Process(task.pid))) + + def best_child(self) -> 'Connection': + best: typing.Tuple[float, 'Connection'] = (1000.0, self.children[0][0]) + missingProcesses: typing.List[int] = [] + for i, c in enumerate(self.children): + try: + if c[2].status() == 'zombie': # Bad kill!! + raise psutil.ZombieProcess(c[2].pid) + percent = c[2].cpu_percent() + except (psutil.ZombieProcess, psutil.NoSuchProcess) as e: + # Process is missing... + logger.warning('Missing process found: %s', e.pid) + try: + c[0].close() # Close pipe to missing process + except Exception: + logger.debug('Could not close handle for %s', e.pid) + try: + c[1].kill() + c[1].close() + except Exception: + logger.debug('Could not close process %s', e.pid) + + missingProcesses.append(i) + continue + + logger.debug('PID %s has %s', c[2].pid, percent) + + if percent < best[0]: + best = (percent, c[0]) + + # If we have a missing process, try to add it back + if missingProcesses: + logger.debug('Regenerating missing processes: %s', len(missingProcesses)) + # Regenerate childs and recreate new proceeses for requests... + tmpChilds = [ + self.children[i] + for i in range(len(self.children)) + if i not in missingProcesses + ] + self.children[:] = tmpChilds + # Now add new children + for i in range(len(missingProcesses)): + self.add_child_pid() + + return best[1] + + def stop(self): + # Try to stop running childs + for i in self.children: + try: + i[2].kill() + except Exception as e: + logger.info('KILLING child %s: %s', i[2], e) + \ No newline at end of file diff --git a/tunnel-server/src/udstunnel.py b/tunnel-server/src/udstunnel.py index 505a1752..47a032dc 100755 --- a/tunnel-server/src/udstunnel.py +++ b/tunnel-server/src/udstunnel.py @@ -48,6 +48,7 @@ from uds_tunnel import proxy from uds_tunnel import consts from uds_tunnel import message from uds_tunnel import stats +from uds_tunnel import processes if typing.TYPE_CHECKING: from multiprocessing.connection import Connection @@ -217,72 +218,16 @@ def tunnel_main(): signal.signal(signal.SIGINT, stop_signal) signal.signal(signal.SIGTERM, stop_signal) - # Creates as many processes and pipes as required - child: typing.List[ - typing.Tuple['Connection', multiprocessing.Process, psutil.Process] - ] = [] - stats_collector = stats.GlobalStats() - def add_child_pid(): - own_conn, child_conn = multiprocessing.Pipe() - task = multiprocessing.Process( - target=curio.run, - args=(tunnel_proc_async, child_conn, cfg, stats_collector.ns), - ) - task.start() - logger.debug('ADD CHILD PID: %s', task.pid) - child.append((own_conn, task, psutil.Process(task.pid))) - - for i in range(cfg.workers): - add_child_pid() - - def best_child() -> 'Connection': - best: typing.Tuple[float, 'Connection'] = (1000.0, child[0][0]) - missingProcesses = [] - for i, c in enumerate(child): - try: - if c[2].status() == 'zombie': # Bad kill!! - raise psutil.ZombieProcess(c[2].pid) - percent = c[2].cpu_percent() - except (psutil.ZombieProcess, psutil.NoSuchProcess) as e: - # Process is missing... - logger.warning('Missing process found: %s', e.pid) - try: - c[0].close() # Close pipe to missing process - except Exception: - logger.debug('Could not close handle for %s', e.pid) - try: - c[1].kill() - c[1].close() - except Exception: - logger.debug('Could not close process %s', e.pid) - - missingProcesses.append(i) - continue - - logger.debug('PID %s has %s', c[2].pid, percent) - - if percent < best[0]: - best = (percent, c[0]) - - if missingProcesses: - logger.debug('Regenerating missing processes: %s', len(missingProcesses)) - # Regenerate childs and recreate new proceeses to process requests... - tmpChilds = [child[i] for i in range(len(child)) if i not in missingProcesses] - child[:] = tmpChilds - # Now add new children - for i in range(len(missingProcesses)): - add_child_pid() - - return best[1] + prcs = processes.Processes(tunnel_proc_async, cfg, stats_collector.ns) try: while not do_stop: try: client, addr = sock.accept() # Select BEST process for sending this new connection - best_child().send( + prcs.best_child().send( message.Message(message.Command.TUNNEL, (client, addr)) ) del client # Ensure socket is controlled on child process @@ -297,12 +242,7 @@ def tunnel_main(): if sock: sock.close() - # Try to stop running childs - for i in child: - try: - i[2].kill() - except Exception as e: - logger.info('KILLING child %s: %s', i[2], e) + prcs.stop() try: if cfg.pidfile: