diff --git a/tunnel-server/src/uds_tunnel/tunnel.py b/tunnel-server/src/uds_tunnel/tunnel.py index ad014a07f..903a773d1 100644 --- a/tunnel-server/src/uds_tunnel/tunnel.py +++ b/tunnel-server/src/uds_tunnel/tunnel.py @@ -175,8 +175,6 @@ class TunnelProtocol(asyncio.Protocol): else: raise Exception('Invalid command') except Exception: - if consts.DEBUG: - logger.exception('COMMAND') logger.error('ERROR from %s', self.pretty_source()) self.transport.write(b'ERROR_COMMAND') self.close_connection() diff --git a/tunnel-server/src/udstunnel.conf b/tunnel-server/src/udstunnel.conf index 60b632843..723128c41 100644 --- a/tunnel-server/src/udstunnel.conf +++ b/tunnel-server/src/udstunnel.conf @@ -22,7 +22,7 @@ lognumber = 3 address = 0.0.0.0 # Number of workers. Defaults to 0 (means "as much as cores") -workers = 1 +workers = 2 # Listening port port = 7777 diff --git a/tunnel-server/src/udstunnel.py b/tunnel-server/src/udstunnel.py index 91de6ce44..5d5b494f4 100755 --- a/tunnel-server/src/udstunnel.py +++ b/tunnel-server/src/udstunnel.py @@ -38,6 +38,7 @@ import signal import ssl import socket import logging +import threading import typing import setproctitle @@ -103,32 +104,13 @@ async def tunnel_proc_async( def get_socket() -> typing.Optional[socket.socket]: try: - data: bytes = b'' while True: # Clear back event, for next data msg: typing.Optional[ typing.Tuple[socket.socket, typing.Tuple[str, int]] ] = pipe.recv() if msg: - # Connection done, check for handshake - source: socket.socket - source, address = msg - - try: - # First, ensure handshake (simple handshake) and command - data = source.recv(len(consts.HANDSHAKE_V1)) - - if data != consts.HANDSHAKE_V1: - raise Exception() # Invalid handshake - except Exception: - if consts.DEBUG: - logger.exception('HANDSHAKE') - logger.error('HANDSHAKE invalid from %s (%s)', address, data.hex()) - # Close Source and continue - source.close() - continue - - return source + return msg[0] except Exception: logger.exception('Receiving data from parent process') return None @@ -168,15 +150,35 @@ async def tunnel_proc_async( del tasks[:tasks_number] +def process_connection( + client: socket.socket, addr: typing.Tuple[str, str], conn: 'Connection' +) -> None: + data: bytes = b'' + try: + # First, ensure handshake (simple handshake) and command + data = client.recv(len(consts.HANDSHAKE_V1)) + + if data != consts.HANDSHAKE_V1: + raise Exception() # Invalid handshake + conn.send((client, addr)) + del client # Ensure socket is controlled on child process + except Exception: + logger.error('HANDSHAKE invalid from %s (%s)', addr, data.hex()) + # Close Source and continue + client.close() + + def tunnel_main(): cfg = config.read() # Try to bind to port as running user # Wait for socket incoming connections and spread them + socket.setdefaulttimeout( + 3.0 + ) # So we can check for stop from time to time and not block forever sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - sock.settimeout(3.0) # So we can check for stop from time to time # We will not reuse port, we only want a UDS tunnel server running on a port # but this may change on future... # try: @@ -223,11 +225,14 @@ def tunnel_main(): while not do_stop: try: client, addr = sock.accept() - client.settimeout(3.0) logger.info('CONNECTION from %s', addr) - # Select BEST process for sending this new connection - prcs.best_child().send((client, addr)) - del client # Ensure socket is controlled on child process + + # Check if we have reached the max number of connections + # First part is checked on a thread, if HANDSHAKE is valid + # we will send socket to process pool + threading.Thread( + target=process_connection, args=(client, addr, prcs.best_child()) + ).start() except socket.timeout: pass # Continue and retry except Exception as e: