Separated processes manager from main uds_tunnel

This commit is contained in:
Adolfo Gómez García 2021-08-05 12:53:44 +02:00
parent c21c0b44ce
commit 1be49a6e0e
2 changed files with 103 additions and 64 deletions

View File

@ -0,0 +1,99 @@
import multiprocessing
import logging
import typing
import curio
import psutil
from . import config
if typing.TYPE_CHECKING:
from multiprocessing.connection import Connection
from multiprocessing.managers import Namespace
logger = logging.getLogger(__name__)
class Processes:
"""
This class is used to store the processes that are used by the tunnel.
"""
children: typing.List[
typing.Tuple['Connection', multiprocessing.Process, psutil.Process]
]
process: typing.Callable
cfg: config.ConfigurationType
ns: 'Namespace'
def __init__(self, process: typing.Callable, cfg: config.ConfigurationType, ns: 'Namespace') -> None:
self.children = []
self.process = process # type: ignore
self.cfg = cfg
self.ns = ns
for i in range(cfg.workers):
self.add_child_pid()
def add_child_pid(self):
own_conn, child_conn = multiprocessing.Pipe()
task = multiprocessing.Process(
target=curio.run,
args=(self.process, child_conn, self.cfg, self.ns)
)
task.start()
logger.debug('ADD CHILD PID: %s', task.pid)
self.children.append((own_conn, task, psutil.Process(task.pid)))
def best_child(self) -> 'Connection':
best: typing.Tuple[float, 'Connection'] = (1000.0, self.children[0][0])
missingProcesses: typing.List[int] = []
for i, c in enumerate(self.children):
try:
if c[2].status() == 'zombie': # Bad kill!!
raise psutil.ZombieProcess(c[2].pid)
percent = c[2].cpu_percent()
except (psutil.ZombieProcess, psutil.NoSuchProcess) as e:
# Process is missing...
logger.warning('Missing process found: %s', e.pid)
try:
c[0].close() # Close pipe to missing process
except Exception:
logger.debug('Could not close handle for %s', e.pid)
try:
c[1].kill()
c[1].close()
except Exception:
logger.debug('Could not close process %s', e.pid)
missingProcesses.append(i)
continue
logger.debug('PID %s has %s', c[2].pid, percent)
if percent < best[0]:
best = (percent, c[0])
# If we have a missing process, try to add it back
if missingProcesses:
logger.debug('Regenerating missing processes: %s', len(missingProcesses))
# Regenerate childs and recreate new proceeses for requests...
tmpChilds = [
self.children[i]
for i in range(len(self.children))
if i not in missingProcesses
]
self.children[:] = tmpChilds
# Now add new children
for i in range(len(missingProcesses)):
self.add_child_pid()
return best[1]
def stop(self):
# Try to stop running childs
for i in self.children:
try:
i[2].kill()
except Exception as e:
logger.info('KILLING child %s: %s', i[2], e)

View File

@ -48,6 +48,7 @@ from uds_tunnel import proxy
from uds_tunnel import consts from uds_tunnel import consts
from uds_tunnel import message from uds_tunnel import message
from uds_tunnel import stats from uds_tunnel import stats
from uds_tunnel import processes
if typing.TYPE_CHECKING: if typing.TYPE_CHECKING:
from multiprocessing.connection import Connection from multiprocessing.connection import Connection
@ -217,72 +218,16 @@ def tunnel_main():
signal.signal(signal.SIGINT, stop_signal) signal.signal(signal.SIGINT, stop_signal)
signal.signal(signal.SIGTERM, stop_signal) signal.signal(signal.SIGTERM, stop_signal)
# Creates as many processes and pipes as required
child: typing.List[
typing.Tuple['Connection', multiprocessing.Process, psutil.Process]
] = []
stats_collector = stats.GlobalStats() stats_collector = stats.GlobalStats()
def add_child_pid(): prcs = processes.Processes(tunnel_proc_async, cfg, stats_collector.ns)
own_conn, child_conn = multiprocessing.Pipe()
task = multiprocessing.Process(
target=curio.run,
args=(tunnel_proc_async, child_conn, cfg, stats_collector.ns),
)
task.start()
logger.debug('ADD CHILD PID: %s', task.pid)
child.append((own_conn, task, psutil.Process(task.pid)))
for i in range(cfg.workers):
add_child_pid()
def best_child() -> 'Connection':
best: typing.Tuple[float, 'Connection'] = (1000.0, child[0][0])
missingProcesses = []
for i, c in enumerate(child):
try:
if c[2].status() == 'zombie': # Bad kill!!
raise psutil.ZombieProcess(c[2].pid)
percent = c[2].cpu_percent()
except (psutil.ZombieProcess, psutil.NoSuchProcess) as e:
# Process is missing...
logger.warning('Missing process found: %s', e.pid)
try:
c[0].close() # Close pipe to missing process
except Exception:
logger.debug('Could not close handle for %s', e.pid)
try:
c[1].kill()
c[1].close()
except Exception:
logger.debug('Could not close process %s', e.pid)
missingProcesses.append(i)
continue
logger.debug('PID %s has %s', c[2].pid, percent)
if percent < best[0]:
best = (percent, c[0])
if missingProcesses:
logger.debug('Regenerating missing processes: %s', len(missingProcesses))
# Regenerate childs and recreate new proceeses to process requests...
tmpChilds = [child[i] for i in range(len(child)) if i not in missingProcesses]
child[:] = tmpChilds
# Now add new children
for i in range(len(missingProcesses)):
add_child_pid()
return best[1]
try: try:
while not do_stop: while not do_stop:
try: try:
client, addr = sock.accept() client, addr = sock.accept()
# Select BEST process for sending this new connection # Select BEST process for sending this new connection
best_child().send( prcs.best_child().send(
message.Message(message.Command.TUNNEL, (client, addr)) message.Message(message.Command.TUNNEL, (client, addr))
) )
del client # Ensure socket is controlled on child process del client # Ensure socket is controlled on child process
@ -297,12 +242,7 @@ def tunnel_main():
if sock: if sock:
sock.close() sock.close()
# Try to stop running childs prcs.stop()
for i in child:
try:
i[2].kill()
except Exception as e:
logger.info('KILLING child %s: %s', i[2], e)
try: try:
if cfg.pidfile: if cfg.pidfile: