1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-10 01:17:59 +03:00

Backported 4.0 tunnel server

This commit is contained in:
Adolfo Gómez García 2022-12-10 21:45:07 +01:00
parent d17dae8bdd
commit dba2526ffb
No known key found for this signature in database
GPG Key ID: DD1ABF20724CDA23
22 changed files with 1351 additions and 345 deletions

8
tunnel-server/pytest.ini Normal file
View File

@ -0,0 +1,8 @@
[pytest]
addopts = "-s"
pythonpath = ./src
python_files = tests.py test_*.py *_tests.py
log_format = "%(asctime)s %(levelname)s %(message)s"
log_date_format = "%Y-%m-%d %H:%M:%S"
log_cli = true
log_level = info

View File

@ -1,2 +1 @@
curio>=1.4
psutil>=5.7.3

View File

@ -0,0 +1,139 @@
#!/usr/bin/env python3
import asyncio
import ssl
import logging
import socket
import typing
if typing.TYPE_CHECKING:
import asyncio.transports
logger = logging.getLogger(__name__)
BACKLOG = 100
STATE_UNINITIALIZED = 0
STATE_COMMAND = 1
STATE_PROXY = 2
COMMAND_LENGTH = 4
TICKET_LENGTH = 48
# Protocol
class TunnelProtocol(asyncio.Protocol):
# future to mark eof
finished: asyncio.Future
transport: 'asyncio.transports.Transport'
other_side: 'TunnelProtocol'
state: int
cmd: bytes
def __init__(self, other_side: typing.Optional['TunnelProtocol'] = None) -> None:
# If no other side is given, we are the server part
super().__init__()
self.state = STATE_UNINITIALIZED
self.other_side = (
other_side if other_side else self
) # No other side, self is just a placeholder
# transport is undefined until conne
self.finished = asyncio.Future()
self.cmd = b''
def connection_made(self, transport: 'asyncio.transports.BaseTransport') -> None:
logger.debug('Connection made: %s', transport.get_extra_info('peername'))
# Update state based on if we are the client or server
self.state = STATE_COMMAND if self.other_side is self else STATE_PROXY
# We know for sure that the transport is a Transport.
self.transport = typing.cast('asyncio.transports.Transport', transport)
self.cmd = b''
def process_command(self, data: bytes) -> None:
self.cmd += data
if len(self.cmd) >= COMMAND_LENGTH:
logger.debug('Command received: %s', self.cmd)
if self.cmd[:4] == b'OPEN':
# Open Command has the ticket behind it
if len(self.cmd) < TICKET_LENGTH + COMMAND_LENGTH:
return # Wait for more data
# log the ticket
logger.debug('Ticket received: %s', self.cmd[4:4+TICKET_LENGTH])
loop = asyncio.get_event_loop()
async def open_other_side() -> None:
try:
(transport, protocol) = await loop.create_connection(
lambda: TunnelProtocol(self), 'www.google.com', 80
)
self.other_side = typing.cast('TunnelProtocol', protocol)
self.other_side.transport.write(
b'GET / HTTP/1.0\r\nHost: www.google.com\r\n\r\n'
)
except Exception as e:
logger.error('Error opening connection: %s', e)
# Send error to client
self.transport.write(b'ERR')
self.transport.close()
loop.create_task(open_other_side())
self.state = STATE_PROXY
def process_proxy(self, data: bytes) -> None:
logger.debug('Processing proxy: %s', len(data))
self.other_side.transport.write(data)
def data_received(self, data: bytes):
logger.debug('Data received: %s', len(data))
if self.state == STATE_COMMAND:
self.process_command(data)
elif self.state == STATE_PROXY:
self.process_proxy(data)
else:
logger.error('Invalid state reached!')
def connection_lost(self, exc: typing.Optional[Exception]) -> None:
logger.debug('Connection closed : %s', exc)
self.finished.set_result(True)
if self.other_side is not self:
self.other_side.transport.close()
async def main():
# Init logger
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
)
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain('tests/testing.pem', 'tests/testing.key')
# Create a server
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.settimeout(444.0) # So we can check for stop from time to time
sock.bind(('0.0.0.0', 7777))
sock.listen(BACKLOG)
loop = asyncio.get_running_loop()
# Accepts connections
client, addr = sock.accept()
logger.debug('Accepted connection')
data = client.recv(4)
print(data)
# Upgrade connection to SSL, and use asyncio to handle the rest
transport: 'asyncio.transports.Transport'
protocol: TunnelProtocol
(transport, protocol) = await loop.connect_accepted_socket( # type: ignore
lambda: TunnelProtocol(), client, ssl=context
)
await protocol.finished
if __name__ == '__main__':
asyncio.run(main())

View File

@ -0,0 +1,49 @@
#!/usr/bin/env python3
import socket
import ssl
import logging
import socket
import typing
logger = logging.getLogger(__name__)
def main():
# Init logger
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
)
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
# Open socket to localhost port 7777, TCP
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.settimeout(66.0)
sock.connect(('127.0.0.1', 7777))
sock.sendall(b'ABCD')
# Upgrade to ssl
ssl_sock = context.wrap_socket(sock, server_side=False)
# Write test OPEN, split on 2 sends
ssl_sock.sendall(b'OPEN')
ssl_sock.sendall(b'X'*48)
# Read response
while True:
data = ssl_sock.recv(16384)
if not data:
break
print(data)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,28 @@
#!/usr/bin/env python3
# -*- coding=utf-8 -*-
import sys
import asyncio
import logging
import typing
if typing.TYPE_CHECKING:
from asyncio.streams import StreamReader, StreamWriter
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection('127.0.0.1', 7777)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))

View File

@ -0,0 +1,41 @@
#!/usr/bin/env python3
# -*- coding=utf-8 -*-
from functools import WRAPPER_ASSIGNMENTS
import ssl
import asyncio
import logging
import typing
import certifi # In order to get valid ca certificates
if typing.TYPE_CHECKING:
from asyncio.streams import StreamReader, StreamWriter
async def tcp_echo_client():
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.load_verify_locations(certifi.where())
reader, writer = await asyncio.open_connection('fake.udsenterprise.com', 7777, ssl=context)
writer.write(b'\x5AMGB\xA5\x01\x00')
await writer.drain()
writer.write(b'OPEN' + b'1'*63 + b'\xA0')
await writer.drain()
writer.write(b'GET / HTTP/1.0\r\n\r\n')
while True:
data = await reader.read(1024)
if not data:
break
print(f'Received: {data!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client())

View File

@ -1,6 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 Virtual Cable S.L.U.
# Copyright (c) 2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -28,18 +29,22 @@
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import enum
import socket
import typing
class Command(enum.IntEnum):
TUNNEL = 0
STATS = 1
import curio
class Message:
command: Command
connection: typing.Optional[typing.Tuple[socket.socket, typing.Any]]
def __init__(self, command: Command, connection: typing.Optional[typing.Tuple[socket.socket, typing.Any]]):
self.command = command
self.connection = connection
BUFFER_SIZE = 1024
async def echo_server(client, address) -> None:
print(f'Connection fro {address!r}')
while True:
data = await client.recv(BUFFER_SIZE)
if not data:
break
print(f'received {data}')
await client.sendall(data)
print('Closed')
if __name__ == '__main__':
curio.run(curio.tcp_server, 'localhost', 7777, echo_server)

View File

@ -0,0 +1,117 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2020 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import multiprocessing
import typing
import socket
import curio
import curio.ssl
import curio.io
BUFFER_SIZE = 1024
if typing.TYPE_CHECKING:
from multiprocessing.connection import Connection
def get_socket(pipe: 'Connection') -> typing.Any:
sock, address = pipe.recv()
print(f'Sock: {sock}, f{address}')
return (sock, address)
async def echo_server_async(pipe: 'Connection'):
async def run_server(pipe: 'Connection', group: curio.TaskGroup) -> None:
while True:
sock, address = await curio.run_in_thread(get_socket, pipe)
await group.spawn(echo_server, sock, address)
del sock
async with curio.TaskGroup() as tg:
await tg.spawn(run_server, pipe, tg)
# Reap all of the children tasks as they complete
async for task in tg:
print(f'Deleting {task!r}')
task.joined = True
del task
async def echo_server(iclient, address) -> None:
print(f'Connection from {address!r}')
context = curio.ssl.SSLContext(curio.ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain('testing.pem', 'testing.key')
context.set_ciphers('ECDHE-RSA-AES256-GCM-SHA512:DHE-RSA-AES256-GCM-SHA512:ECDHE-RSA-AES256-GCM-SHA384:DHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-SHA384')
client = await context.wrap_socket(curio.io.Socket(iclient), server_side=True)
while True:
data = await client.recv(BUFFER_SIZE)
if not data:
break
print(f'received {data}')
await client.sendall(data)
print('Closed')
def main():
own_conn, child_conn = multiprocessing.Pipe()
task = multiprocessing.Process(target=curio.run, args=(echo_server_async, child_conn,))
task.start()
host, port = 'fake.udsenterprise.com', 7777
backlog = 100
sock = None
try:
# Wait for socket incoming connections and spread them
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, True)
except (AttributeError, OSError) as e:
# log.warning('reuse_port=True option failed', exc_info=True)
pass
sock.bind((host, port))
sock.listen(backlog)
while True:
print('Waiting...')
client, addr = sock.accept()
print('Sending...')
own_conn.send((client, addr))
except Exception:
pass
if sock:
sock.close()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,27 @@
#!/usr/bin/env python3
# -*- coding=utf-8 -*-
from socket import socket
import ssl
import typing
from multiprocessing import Process, Queue
def handle(q: 'Queue[socket]'):
qsock = q.get()
context = ssl.create_default_context()
ssock = context.wrap_socket(qsock, server_hostname='httpbin.org')
ssock.send(b'GET /get\r\n')
print('rest:', ssock.recv(10048))
if __name__ == '__main__':
sock = socket()
sock.connect(('httpbin.org', 443))
q: 'Queue[socket]' = Queue()
proc = Process(target=handle, args=(q,))
proc.start()
# use the function from above to serialize socket
q.put(sock)
proc.join()

View File

@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCnBY9+BzkfBb6P
zvW9OsL+sbO5nQBTijsiODPsF5awGwMsEbz5eXh8fmrIWJA2Su0zbjAZn0V7emdp
xh8YvmWd8O1fJWcYzVE+oGweoQEg3j3EaDGhkYHsKzQHV7vYA45Nst8oNBOYStDK
XBur3z+TfW/BH5oLBWiU9qHMcQNwCKTORKMOdk04/L6BVf8wMB6uMGntXrBhCeIG
Zcjjh9aWZlvHFY6Ija4zyEGIig7/r4gcCg5r5Y4Tnd6py5gFK7QkwUssVlWOv6BD
PwUGYMEcrvFnebHB3LyVkNgl/YLYXJlFAiCORI+7/GvQ7jbUgn5RDCJ9F1lYUXTy
LKbyDgJTAgMBAAECggEAI+/BWReKxc8BmoWoQCfljtGDXxuV12O1p346RcEpjgix
3QiQBYmKUBcPOL49gUEL2nYJ6WlolNHYcFzNkhBdYudU25T7os2Cfpwbx1dTG9RV
q+3TR/U75CnDjxTkeO0C8FrdQxQ5Zxop5HB/D7MARebGZeI7zcVDJFIQrCFg4PjS
B2LqQroK1sZI9FRzH1ooaIMFDd4djw/gom5iwrn6zT9BsGVpBrMIJFKwnIx3PyWn
m10O0BsOuuyx5hXtiJGECN2DAaK5X+aOE9CiOVDePgzpBXM8zrNDw0D0kGpkdMHm
Y7oUN9HDIvKbyAzlOavs+8Wc8CibN5BO4lyUuMZP2QKBgQDaZDviNPCB+XxlPXv5
xSj7KPLlbdIOzpF05Z9Sdw3Ba0UiS5bYZN9O14uefOm1z8sjON3OI0ZS25Oxx8hf
rDcerPahOBGHKe61dbkAXjuyhgMyQGedt+ROD8lfZzPH/mwQQQBVvWrWvgStRoG1
zbN4FANkBAInhry0qvQLKsVJlQKBgQDDyLGr4zGQVtiakxufypuFlIl3e7vYCHyI
5jCE5Bpqb+rC8DEcRqVoLVP8LkzuWnwnbup9zp/epfDaSGPrTtL8lHdaps6w+MdL
q3hwD6lSgcWiUPw3HXL8aR4zPf/CvvSGzruEKM+Aly4kDNEqw1xwGqyvxZO1Ss8S
+qLP7jmyRwKBgGlU+NEEiY+WvhmySu5P2pEw0d44Vp0PonZIHczPYRIN491DqfCa
zl2fdlatlqc7HpXRYqF+v/dMsnKHkiwaysb/00A047dWUSyyJ9V2ncJgAoClMZSP
Ug0Ybh6WjxIBsysvvrKb1kDWizjrjbobCVl8BZqimEtqH+/fmC8epOL1AoGAOPCB
W7Azlfrr++iUvCA8otjUMf+2Xdn5/gaUTdHZLONnr1ITtlmFeYrVRh5hGWEPgphr
cjNJo4M3TQSIqsK98d9r5t1kd2ui6orv+AdWAzzisZZEA/N4oZggxF5fp8/JZftx
5bnIv2k4bhucKYevtprLZkNb9fnPx4FFIJv0A7kCgYEAxqDXDZJnqdP5b93l1PH/
LdGJzrZoRuWQ3pPYeDtKxuhgfDWOjMl/nj1r4R12XhWBPSgAT3FVPHMoWbVuF9Mc
/rXqtOimNVuyMwnH/oNFb91V17jMKUYO2ULT3WPiFGCVdHmv7YjxyILmDfZWpgbY
TSM4J09MI1GOu8xHGGFqLVc=
-----END PRIVATE KEY-----

View File

@ -0,0 +1,129 @@
-----BEGIN CERTIFICATE-----
MIIFyDCCBLCgAwIBAgIQCZ+bJcbPk2YsooAFdD9PPjANBgkqhkiG9w0BAQsFADCB
jzELMAkGA1UEBhMCR0IxGzAZBgNVBAgTEkdyZWF0ZXIgTWFuY2hlc3RlcjEQMA4G
A1UEBxMHU2FsZm9yZDEYMBYGA1UEChMPU2VjdGlnbyBMaW1pdGVkMTcwNQYDVQQD
Ey5TZWN0aWdvIFJTQSBEb21haW4gVmFsaWRhdGlvbiBTZWN1cmUgU2VydmVyIENB
MB4XDTIwMTEzMDAwMDAwMFoXDTIxMTEzMDIzNTk1OVowHjEcMBoGA1UEAwwTKi51
ZHNlbnRlcnByaXNlLmNvbTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB
AKcFj34HOR8Fvo/O9b06wv6xs7mdAFOKOyI4M+wXlrAbAywRvPl5eHx+ashYkDZK
7TNuMBmfRXt6Z2nGHxi+ZZ3w7V8lZxjNUT6gbB6hASDePcRoMaGRgewrNAdXu9gD
jk2y3yg0E5hK0MpcG6vfP5N9b8EfmgsFaJT2ocxxA3AIpM5Eow52TTj8voFV/zAw
Hq4wae1esGEJ4gZlyOOH1pZmW8cVjoiNrjPIQYiKDv+viBwKDmvljhOd3qnLmAUr
tCTBSyxWVY6/oEM/BQZgwRyu8Wd5scHcvJWQ2CX9gthcmUUCII5Ej7v8a9DuNtSC
flEMIn0XWVhRdPIspvIOAlMCAwEAAaOCAo4wggKKMB8GA1UdIwQYMBaAFI2MXsRU
rYrhd+mb+ZsF4bgBjWHhMB0GA1UdDgQWBBTkpYJLdVjo/XqVtNNxXP3k+TbzHzAO
BgNVHQ8BAf8EBAMCBaAwDAYDVR0TAQH/BAIwADAdBgNVHSUEFjAUBggrBgEFBQcD
AQYIKwYBBQUHAwIwSQYDVR0gBEIwQDA0BgsrBgEEAbIxAQICBzAlMCMGCCsGAQUF
BwIBFhdodHRwczovL3NlY3RpZ28uY29tL0NQUzAIBgZngQwBAgEwgYQGCCsGAQUF
BwEBBHgwdjBPBggrBgEFBQcwAoZDaHR0cDovL2NydC5zZWN0aWdvLmNvbS9TZWN0
aWdvUlNBRG9tYWluVmFsaWRhdGlvblNlY3VyZVNlcnZlckNBLmNydDAjBggrBgEF
BQcwAYYXaHR0cDovL29jc3Auc2VjdGlnby5jb20wMQYDVR0RBCowKIITKi51ZHNl
bnRlcnByaXNlLmNvbYIRdWRzZW50ZXJwcmlzZS5jb20wggEEBgorBgEEAdZ5AgQC
BIH1BIHyAPAAdgB9PvL4j/+IVWgkwsDKnlKJeSvFDngJfy5ql2iZfiLw1wAAAXYY
yzKIAAAEAwBHMEUCIQDwBVF/I9Ut5lkgpt0KHVwAgQbH2YrTpPyX/zDoVm0rLwIg
dMS5/rHTZKlDUJlKHZw53RJPeoC4vHv5oBf4C5no//8AdgCUILwejtWNbIhzH4KL
IiwN0dpNXmxPlD1h204vWE2iwgAAAXYYyzKwAAAEAwBHMEUCIBNSxuL+PEvW7kwF
g5G4lNgCytcHDIUVjGZeiuhEBbMzAiEAjSkwqRia25veOppSkp43+iDa0mwEOPD/
2WqerPQWYq0wDQYJKoZIhvcNAQELBQADggEBAJOfjODCZziUPp/2hDIvfS4ZQci6
exjgsrEa/hWydvIxi1CyTxCdLKaWkUvUZFlHg6zxYFMqCg6jAWzo3rcR7b+bkGro
CD5yFaFJcRpkLAB3+1lS0laaovnuBh+vFkK/uT9qsnt4u8CJFIpDsO6YjGdELPUR
1rLnxgPlYI+kG8xEK1iHh53Q5ayKbsavzlQ4usxM+BEP0hjct/lomQN4WJG4ZXLp
HLYznI1ydppozicaMVl+/qn+NQcx1ULPEKC9SwVm72QQqFgTFUxUfM/gd+ROGDv1
jNIJixg1F4HxiJdFUuOtcRpRzal9VSLl/P7qSyHlptrJh1RCEmAw7Ld1zVY=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIENjCCAx6gAwIBAgIBATANBgkqhkiG9w0BAQUFADBvMQswCQYDVQQGEwJTRTEU
MBIGA1UEChMLQWRkVHJ1c3QgQUIxJjAkBgNVBAsTHUFkZFRydXN0IEV4dGVybmFs
IFRUUCBOZXR3b3JrMSIwIAYDVQQDExlBZGRUcnVzdCBFeHRlcm5hbCBDQSBSb290
MB4XDTAwMDUzMDEwNDgzOFoXDTIwMDUzMDEwNDgzOFowbzELMAkGA1UEBhMCU0Ux
FDASBgNVBAoTC0FkZFRydXN0IEFCMSYwJAYDVQQLEx1BZGRUcnVzdCBFeHRlcm5h
bCBUVFAgTmV0d29yazEiMCAGA1UEAxMZQWRkVHJ1c3QgRXh0ZXJuYWwgQ0EgUm9v
dDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALf3GjPm8gAELTngTlvt
H7xsD821+iO2zt6bETOXpClMfZOfvUq8k+0DGuOPz+VtUFrWlymUWoCwSXrbLpX9
uMq/NzgtHj6RQa1wVsfwTz/oMp50ysiQVOnGXw94nZpAPA6sYapeFI+eh6FqUNzX
mk6vBbOmcZSccbNQYArHE504B4YCqOmoaSYYkKtMsE8jqzpPhNjfzp/haW+710LX
a0Tkx63ubUFfclpxCDezeWWkWaCUN/cALw3CknLa0Dhy2xSoRcRdKn23tNbE7qzN
E0S3ySvdQwAl+mG5aWpYIxG3pzOPVnVZ9c0p10a3CitlttNCbxWyuHv77+ldU9U0
WicCAwEAAaOB3DCB2TAdBgNVHQ4EFgQUrb2YejS0Jvf6xCZU7wO94CTLVBowCwYD
VR0PBAQDAgEGMA8GA1UdEwEB/wQFMAMBAf8wgZkGA1UdIwSBkTCBjoAUrb2YejS0
Jvf6xCZU7wO94CTLVBqhc6RxMG8xCzAJBgNVBAYTAlNFMRQwEgYDVQQKEwtBZGRU
cnVzdCBBQjEmMCQGA1UECxMdQWRkVHJ1c3QgRXh0ZXJuYWwgVFRQIE5ldHdvcmsx
IjAgBgNVBAMTGUFkZFRydXN0IEV4dGVybmFsIENBIFJvb3SCAQEwDQYJKoZIhvcN
AQEFBQADggEBALCb4IUlwtYj4g+WBpKdQZic2YR5gdkeWxQHIzZlj7DYd7usQWxH
YINRsPkyPef89iYTx4AWpb9a/IfPeHmJIZriTAcKhjW88t5RxNKWt9x+Tu5w/Rw5
6wwCURQtjr0W4MHfRnXnJK3s9EK0hZNwEGe6nQY1ShjTK3rMUUKhemPR5ruhxSvC
Nr4TDea9Y355e6cJDUCrat2PisP29owaQgVR1EX1n6diIWgVIEM8med8vSTYqZEX
c4g/VhsxOBi0cQ+azcgOno4uG+GMmIPLHzHxREzGBHNJdmAPx/i9F4BrLunMTA5a
mnkPIAou1Z5jJh5VkpTYghdae9C8x49OhgQ=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFdzCCBF+gAwIBAgIQE+oocFv07O0MNmMJgGFDNjANBgkqhkiG9w0BAQwFADBv
MQswCQYDVQQGEwJTRTEUMBIGA1UEChMLQWRkVHJ1c3QgQUIxJjAkBgNVBAsTHUFk
ZFRydXN0IEV4dGVybmFsIFRUUCBOZXR3b3JrMSIwIAYDVQQDExlBZGRUcnVzdCBF
eHRlcm5hbCBDQSBSb290MB4XDTAwMDUzMDEwNDgzOFoXDTIwMDUzMDEwNDgzOFow
gYgxCzAJBgNVBAYTAlVTMRMwEQYDVQQIEwpOZXcgSmVyc2V5MRQwEgYDVQQHEwtK
ZXJzZXkgQ2l0eTEeMBwGA1UEChMVVGhlIFVTRVJUUlVTVCBOZXR3b3JrMS4wLAYD
VQQDEyVVU0VSVHJ1c3QgUlNBIENlcnRpZmljYXRpb24gQXV0aG9yaXR5MIICIjAN
BgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAgBJlFzYOw9sIs9CsVw127c0n00yt
UINh4qogTQktZAnczomfzD2p7PbPwdzx07HWezcoEStH2jnGvDoZtF+mvX2do2NC
tnbyqTsrkfjib9DsFiCQCT7i6HTJGLSR1GJk23+jBvGIGGqQIjy8/hPwhxR79uQf
jtTkUcYRZ0YIUcuGFFQ/vDP+fmyc/xadGL1RjjWmp2bIcmfbIWax1Jt4A8BQOujM
8Ny8nkz+rwWWNR9XWrf/zvk9tyy29lTdyOcSOk2uTIq3XJq0tyA9yn8iNK5+O2hm
AUTnAU5GU5szYPeUvlM3kHND8zLDU+/bqv50TmnHa4xgk97Exwzf4TKuzJM7UXiV
Z4vuPVb+DNBpDxsP8yUmazNt925H+nND5X4OpWaxKXwyhGNVicQNwZNUMBkTrNN9
N6frXTpsNVzbQdcS2qlJC9/YgIoJk2KOtWbPJYjNhLixP6Q5D9kCnusSTJV882sF
qV4Wg8y4Z+LoE53MW4LTTLPtW//e5XOsIzstAL81VXQJSdhJWBp/kjbmUZIO8yZ9
HE0XvMnsQybQv0FfQKlERPSZ51eHnlAfV1SoPv10Yy+xUGUJ5lhCLkMaTLTwJUdZ
+gQek9QmRkpQgbLevni3/GcV4clXhB4PY9bpYrrWX1Uu6lzGKAgEJTm4Diup8kyX
HAc/DVL17e8vgg8CAwEAAaOB9DCB8TAfBgNVHSMEGDAWgBStvZh6NLQm9/rEJlTv
A73gJMtUGjAdBgNVHQ4EFgQUU3m/WqorSs9UgOHYm8Cd8rIDZsswDgYDVR0PAQH/
BAQDAgGGMA8GA1UdEwEB/wQFMAMBAf8wEQYDVR0gBAowCDAGBgRVHSAAMEQGA1Ud
HwQ9MDswOaA3oDWGM2h0dHA6Ly9jcmwudXNlcnRydXN0LmNvbS9BZGRUcnVzdEV4
dGVybmFsQ0FSb290LmNybDA1BggrBgEFBQcBAQQpMCcwJQYIKwYBBQUHMAGGGWh0
dHA6Ly9vY3NwLnVzZXJ0cnVzdC5jb20wDQYJKoZIhvcNAQEMBQADggEBAJNl9jeD
lQ9ew4IcH9Z35zyKwKoJ8OkLJvHgwmp1ocd5yblSYMgpEg7wrQPWCcR23+WmgZWn
RtqCV6mVksW2jwMibDN3wXsyF24HzloUQToFJBv2FAY7qCUkDrvMKnXduXBBP3zQ
YzYhBx9G/2CkkeFnvN4ffhkUyWNnkepnB2u0j4vAbkN9w6GAbLIevFOFfdyQoaS8
Le9Gclc1Bb+7RrtubTeZtv8jkpHGbkD4jylW6l/VXxRTrPBPYer3IsynVgviuDQf
Jtl7GQVoP7o81DgGotPmjw7jtHFtQELFhLRAlSv0ZaBIefYdgWOWnU914Ph85I6p
0fKtirOMxyHNwu8=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIGEzCCA/ugAwIBAgIQfVtRJrR2uhHbdBYLvFMNpzANBgkqhkiG9w0BAQwFADCB
iDELMAkGA1UEBhMCVVMxEzARBgNVBAgTCk5ldyBKZXJzZXkxFDASBgNVBAcTC0pl
cnNleSBDaXR5MR4wHAYDVQQKExVUaGUgVVNFUlRSVVNUIE5ldHdvcmsxLjAsBgNV
BAMTJVVTRVJUcnVzdCBSU0EgQ2VydGlmaWNhdGlvbiBBdXRob3JpdHkwHhcNMTgx
MTAyMDAwMDAwWhcNMzAxMjMxMjM1OTU5WjCBjzELMAkGA1UEBhMCR0IxGzAZBgNV
BAgTEkdyZWF0ZXIgTWFuY2hlc3RlcjEQMA4GA1UEBxMHU2FsZm9yZDEYMBYGA1UE
ChMPU2VjdGlnbyBMaW1pdGVkMTcwNQYDVQQDEy5TZWN0aWdvIFJTQSBEb21haW4g
VmFsaWRhdGlvbiBTZWN1cmUgU2VydmVyIENBMIIBIjANBgkqhkiG9w0BAQEFAAOC
AQ8AMIIBCgKCAQEA1nMz1tc8INAA0hdFuNY+B6I/x0HuMjDJsGz99J/LEpgPLT+N
TQEMgg8Xf2Iu6bhIefsWg06t1zIlk7cHv7lQP6lMw0Aq6Tn/2YHKHxYyQdqAJrkj
eocgHuP/IJo8lURvh3UGkEC0MpMWCRAIIz7S3YcPb11RFGoKacVPAXJpz9OTTG0E
oKMbgn6xmrntxZ7FN3ifmgg0+1YuWMQJDgZkW7w33PGfKGioVrCSo1yfu4iYCBsk
Haswha6vsC6eep3BwEIc4gLw6uBK0u+QDrTBQBbwb4VCSmT3pDCg/r8uoydajotY
uK3DGReEY+1vVv2Dy2A0xHS+5p3b4eTlygxfFQIDAQABo4IBbjCCAWowHwYDVR0j
BBgwFoAUU3m/WqorSs9UgOHYm8Cd8rIDZsswHQYDVR0OBBYEFI2MXsRUrYrhd+mb
+ZsF4bgBjWHhMA4GA1UdDwEB/wQEAwIBhjASBgNVHRMBAf8ECDAGAQH/AgEAMB0G
A1UdJQQWMBQGCCsGAQUFBwMBBggrBgEFBQcDAjAbBgNVHSAEFDASMAYGBFUdIAAw
CAYGZ4EMAQIBMFAGA1UdHwRJMEcwRaBDoEGGP2h0dHA6Ly9jcmwudXNlcnRydXN0
LmNvbS9VU0VSVHJ1c3RSU0FDZXJ0aWZpY2F0aW9uQXV0aG9yaXR5LmNybDB2Bggr
BgEFBQcBAQRqMGgwPwYIKwYBBQUHMAKGM2h0dHA6Ly9jcnQudXNlcnRydXN0LmNv
bS9VU0VSVHJ1c3RSU0FBZGRUcnVzdENBLmNydDAlBggrBgEFBQcwAYYZaHR0cDov
L29jc3AudXNlcnRydXN0LmNvbTANBgkqhkiG9w0BAQwFAAOCAgEAMr9hvQ5Iw0/H
ukdN+Jx4GQHcEx2Ab/zDcLRSmjEzmldS+zGea6TvVKqJjUAXaPgREHzSyrHxVYbH
7rM2kYb2OVG/Rr8PoLq0935JxCo2F57kaDl6r5ROVm+yezu/Coa9zcV3HAO4OLGi
H19+24rcRki2aArPsrW04jTkZ6k4Zgle0rj8nSg6F0AnwnJOKf0hPHzPE/uWLMUx
RP0T7dWbqWlod3zu4f+k+TY4CFM5ooQ0nBnzvg6s1SQ36yOoeNDT5++SR2RiOSLv
xvcRviKFxmZEJCaOEDKNyJOuB56DPi/Z+fVGjmO+wea03KbNIaiGCpXZLoUmGv38
sbZXQm2V0TP2ORQGgkE49Y9Y3IBbpNV9lXj9p5v//cWoaasm56ekBYdbqbe4oyAL
l6lFhd2zi+WJN44pDfwGF/Y4QA5C5BIG+3vzxhFoYt/jmPQT2BVPi7Fp2RBgvGQq
6jG35LWjOhSbJuMLe/0CjraZwTiXWTb2qHSihrZe68Zk6s+go/lunrotEbaGmAhY
LcmsJWTyXnW0OMGuf1pGg+pRyrbxmRE1a6Vqe8YAsOf4vmSyrcjC8azjUeqkk+B5
yOGBQMkKW+ESPMFgKuOXwIlCypTPRpgSabuY0MLTDXJLR27lk8QyKGOHQ+SwMj4K
00u/I5sUKUErmgQfky3xxzlIPK1aEn8=
-----END CERTIFICATE-----

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2020 Virtual Cable S.L.U.
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -26,7 +26,7 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import hashlib
import multiprocessing
@ -38,6 +38,7 @@ from .consts import CONFIGFILE
logger = logging.getLogger(__name__)
class ConfigurationType(typing.NamedTuple):
pidfile: str
user: str
@ -51,7 +52,7 @@ class ConfigurationType(typing.NamedTuple):
listen_port: int
workers: int
ssl_certificate: str
ssl_certificate_key: str
ssl_ciphers: str
@ -62,11 +63,26 @@ class ConfigurationType(typing.NamedTuple):
secret: str
allow: typing.Set[str]
def read() -> ConfigurationType:
with open(CONFIGFILE, 'r') as f:
config_str = '[uds]\n' + f.read()
use_uvloop: bool
def read_config_file(
cfg_file: typing.Optional[typing.Union[typing.TextIO, str]] = None
) -> str:
if cfg_file is None:
cfg_file = CONFIGFILE
if isinstance(cfg_file, str):
with open(cfg_file, 'r') as f:
return '[uds]\n' + f.read()
# path is in fact a file-like object
return '[uds]\n' + cfg_file.read()
def read(
cfg_file: typing.Optional[typing.Union[typing.TextIO, str]] = None
) -> ConfigurationType:
config_str = read_config_file(cfg_file)
cfg = configparser.ConfigParser()
cfg.read_string(config_str)
@ -95,7 +111,7 @@ def read() -> ConfigurationType:
user=uds.get('user', ''),
log_level=uds.get('loglevel', 'ERROR'),
log_file=uds.get('logfile', ''),
log_size=int(logsize)*1024*1024,
log_size=int(logsize) * 1024 * 1024,
log_number=int(uds.get('lognumber', '3')),
listen_address=uds.get('address', '0.0.0.0'),
listen_port=int(uds.get('port', '443')),
@ -108,8 +124,13 @@ def read() -> ConfigurationType:
uds_token=uds.get('uds_token', 'unauthorized'),
secret=secret,
allow=set(uds.get('allow', '127.0.0.1').split(',')),
use_uvloop=uds.get('use_uvloop', 'true').lower() == 'true',
)
except ValueError as e:
raise Exception(f'Mandatory configuration file in incorrect format: {e.args[0]}. Please, revise {CONFIGFILE}')
raise Exception(
f'Mandatory configuration file in incorrect format: {e.args[0]}. Please, revise {CONFIGFILE}'
)
except KeyError as e:
raise Exception(f'Mandatory configuration parameter not found: {e.args[0]}. Please, revise {CONFIGFILE}')
raise Exception(
f'Mandatory configuration parameter not found: {e.args[0]}. Please, revise {CONFIGFILE}'
)

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 Virtual Cable S.L.U.
# Copyright (c) 2023 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -26,7 +26,7 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
'''
DEBUG = True
@ -51,10 +51,11 @@ BANDWIDTH_TIME = 10
# Commands LENGTH (all same length)
COMMAND_LENGTH = 4
VERSION = 'v1.0.0'
VERSION = 'v2.0.0'
# Valid commands
COMMAND_OPEN = b'OPEN'
COMMAND_TEST = b'TEST'
COMMAND_STAT = b'STAT' # full stats
COMMAND_INFO = b'INFO' # Basic stats, currently same as FULL

View File

@ -1,8 +1,39 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
Author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import multiprocessing
import asyncio
import sys
import logging
import typing
import curio
import psutil
from . import config
@ -13,6 +44,14 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
ProcessType = typing.Callable[
['Connection', config.ConfigurationType, 'Namespace'],
typing.Coroutine[typing.Any, None, None],
]
NO_CPU_PERCENT: float = 1000001.0
class Processes:
"""
This class is used to store the processes that are used by the tunnel.
@ -21,11 +60,13 @@ class Processes:
children: typing.List[
typing.Tuple['Connection', multiprocessing.Process, psutil.Process]
]
process: typing.Callable
process: ProcessType
cfg: config.ConfigurationType
ns: 'Namespace'
def __init__(self, process: typing.Callable, cfg: config.ConfigurationType, ns: 'Namespace') -> None:
def __init__(
self, process: ProcessType, cfg: config.ConfigurationType, ns: 'Namespace'
) -> None:
self.children = []
self.process = process # type: ignore
self.cfg = cfg
@ -37,15 +78,17 @@ class Processes:
def add_child_pid(self):
own_conn, child_conn = multiprocessing.Pipe()
task = multiprocessing.Process(
target=curio.run,
args=(self.process, child_conn, self.cfg, self.ns)
target=Processes.runner,
args=(self.process, child_conn, self.cfg, self.ns),
)
task.start()
logger.debug('ADD CHILD PID: %s', task.pid)
self.children.append((own_conn, task, psutil.Process(task.pid)))
self.children.append(
(typing.cast('Connection', own_conn), task, psutil.Process(task.pid))
)
def best_child(self) -> 'Connection':
best: typing.Tuple[float, 'Connection'] = (1000.0, self.children[0][0])
best: typing.Tuple[float, 'Connection'] = (NO_CPU_PERCENT, self.children[0][0])
missingProcesses: typing.List[int] = []
for i, c in enumerate(self.children):
try:
@ -77,23 +120,49 @@ class Processes:
if missingProcesses:
logger.debug('Regenerating missing processes: %s', len(missingProcesses))
# Regenerate childs and recreate new proceeses for requests...
tmpChilds = [
self.children[i]
for i in range(len(self.children))
# Remove missing processes
self.children[:] = [
child
for i, child in enumerate(self.children)
if i not in missingProcesses
]
self.children[:] = tmpChilds
# Now add new children
for i in range(len(missingProcesses)):
for (
_
) in (
missingProcesses
): # wee need to add as many as we removed, that is the len of missingProcesses
self.add_child_pid()
# Recheck best if all child were missing
if best[0] == NO_CPU_PERCENT:
return self.best_child()
return best[1]
def stop(self):
def stop(self) -> None:
# Try to stop running childs
for i in self.children:
try:
i[2].kill()
except Exception as e:
logger.info('KILLING child %s: %s', i[2], e)
@staticmethod
def runner(
proc: ProcessType,
conn: 'Connection',
cfg: config.ConfigurationType,
ns: 'Namespace',
) -> None:
if cfg.use_uvloop:
import uvloop
if sys.version_info >= (3, 11):
with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner:
runner.run(proc(conn, cfg, ns))
else:
uvloop.install()
asyncio.run(proc(conn, cfg, ns))
else:
asyncio.run(proc(conn, cfg, ns))

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2020 Virtual Cable S.L.U.
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -26,230 +26,54 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import asyncio
import socket
import logging
import typing
import curio
import requests
from . import config
from . import stats
from . import consts
from . import tunnel
if typing.TYPE_CHECKING:
from multiprocessing.managers import Namespace
import curio.io
import ssl
logger = logging.getLogger(__name__)
class Proxy:
cfg: config.ConfigurationType
cfg: 'config.ConfigurationType'
ns: 'Namespace'
def __init__(self, cfg: config.ConfigurationType, ns: 'Namespace') -> None:
def __init__(self, cfg: 'config.ConfigurationType', ns: 'Namespace') -> None:
self.cfg = cfg
self.ns = ns
@staticmethod
def _getUdsUrl(
cfg: config.ConfigurationType,
ticket: bytes,
msg: str,
queryParams: typing.Optional[typing.Mapping[str, str]] = None,
) -> typing.MutableMapping[str, typing.Any]:
try:
url = (
cfg.uds_server + '/' + ticket.decode() + '/' + msg + '/' + cfg.uds_token
)
if queryParams:
url += '?' + '&'.join(
[f'{key}={value}' for key, value in queryParams.items()]
)
r = requests.get(
url,
headers={
'content-type': 'application/json',
'User-Agent': f'UDSTunnel/{consts.VERSION}',
},
)
if not r.ok:
raise Exception(r.content or 'Invalid Ticket (timed out)')
return r.json()
except Exception as e:
raise Exception(f'TICKET COMMS ERROR: {ticket.decode()} {msg} {e!s}')
@staticmethod
def getFromUds(
cfg: config.ConfigurationType, ticket: bytes, address: typing.Tuple[str, int]
) -> typing.MutableMapping[str, typing.Any]:
# Sanity checks
if len(ticket) != consts.TICKET_LENGTH:
raise Exception(f'TICKET INVALID (len={len(ticket)})')
for n, i in enumerate(ticket.decode(errors='ignore')):
if (
(i >= 'a' and i <= 'z')
or (i >= '0' and i <= '9')
or (i >= 'A' and i <= 'Z')
):
continue # Correctus
raise Exception(f'TICKET INVALID (char {i} at pos {n})')
return Proxy._getUdsUrl(cfg, ticket, address[0])
@staticmethod
def notifyEndToUds(
cfg: config.ConfigurationType, ticket: bytes, counter: stats.Stats
) -> None:
Proxy._getUdsUrl(
cfg, ticket, 'stop', {'sent': str(counter.sent), 'recv': str(counter.recv)}
) # Ignore results
@staticmethod
async def doProxy(
source: 'curio.io.Socket',
destination: 'curio.io.Socket',
counter: stats.StatsSingleCounter,
) -> None:
try:
while True:
data = await source.recv(consts.BUFFER_SIZE)
if not data:
break
await destination.sendall(data)
counter.add(len(data))
except Exception:
# Connection broken, same result as closed for us
# We must notice that i'ts easy that when closing one part of the tunnel,
# the other can break (due to some internal data), that's why even log is removed
# logger.info('CONNECTION LOST FROM %s to %s', source.getsockname(), destination.getpeername())
pass
# Method responsible of proxying requests
async def __call__(self, source, address: typing.Tuple[str, int]) -> None:
async def __call__(self, source: socket.socket, context: 'ssl.SSLContext') -> None:
try:
await self.proxy(source, address)
await self.proxy(source, context)
except Exception as e:
logger.exception('Error procesing connection from %s: %s', address, e)
async def stats(self, full: bool, source, address: typing.Tuple[str, int]) -> None:
# Check valid source ip
if address[0] not in self.cfg.allow:
# Invalid source
await source.sendall(b'FORBIDDEN')
return
# Check password
passwd = await source.recv(consts.PASSWORD_LENGTH)
if passwd.decode(errors='ignore') != self.cfg.secret:
# Invalid password
await source.sendall(b'FORBIDDEN')
return
data = stats.GlobalStats.get_stats(self.ns)
for v in data:
logger.debug('SENDING %s', v)
await source.sendall(v.encode() + b'\n')
async def proxy(self, source, address: typing.Tuple[str, int]) -> None:
prettySource = address[0] # Get only source IP
prettyDest = ''
logger.info('CONNECT FROM %s', prettySource)
# Handshake correct in this point, start SSL connection
command: bytes = b''
try:
command = await source.recv(consts.COMMAND_LENGTH)
if command == consts.COMMAND_TEST:
logger.info('COMMAND: TEST')
await source.sendall(b'OK')
logger.info('TERMINATED %s', prettySource)
return
if command in (consts.COMMAND_STAT, consts.COMMAND_INFO):
logger.info('COMMAND: %s', command.decode())
# This is an stats requests
await self.stats(
full=command == consts.COMMAND_STAT, source=source, address=address
)
logger.info('TERMINATED %s', prettySource)
return
if command != consts.COMMAND_OPEN:
# Invalid command
raise Exception(command)
# Now, read a TICKET_LENGTH (64) bytes string, that must be [a-zA-Z0-9]{64}
ticket: bytes = await source.recv(consts.TICKET_LENGTH)
# Ticket received, now process it with UDS
# get source ip address
try:
result = await curio.run_in_thread(
Proxy.getFromUds, self.cfg, ticket, address
)
except Exception as e:
logger.error('ERROR %s', e.args[0] if e.args else e)
try:
await source.sendall(b'ERROR_TICKET')
except Exception:
pass # Ignore errors
return
prettyDest = f"{result['host']}:{result['port']}"
logger.info('OPEN TUNNEL FROM %s to %s', prettySource, prettyDest)
except Exception:
logger.error('ERROR from %s: COMMAND %s', prettySource, command)
try:
await source.sendall(b'ERROR_COMMAND')
addr = source.getpeername()
except Exception:
pass # Ignore errors
return
addr = 'Unknown'
logger.error('Proxy error from %s: %s', addr, e)
async def proxy(self, source: socket.socket, context: 'ssl.SSLContext') -> None:
loop = asyncio.get_event_loop()
# Handshake correct in this point, upgrade the connection to TSL and let
# the protocol controller do the rest
# Initialize own stats counter
counter = stats.Stats(self.ns)
try:
# Communicate source OPEN is ok
await source.sendall(b'OK')
# Open remote server connection
destination = await curio.open_connection(
result['host'], int(result['port'])
)
async with curio.TaskGroup(wait=any) as grp:
await grp.spawn(
Proxy.doProxy, source, destination, counter.as_sent_counter()
)
await grp.spawn(
Proxy.doProxy, destination, source, counter.as_recv_counter()
)
logger.debug('PROXIES READY')
logger.debug('Proxies finalized: %s', grp.exceptions)
await curio.run_in_thread(
Proxy.notifyEndToUds, self.cfg, result['notify'].encode(), counter
)
except Exception as e:
if consts.DEBUG:
logger.exception('OPEN REMOTE')
logger.error('REMOTE from %s: %s', address, e)
finally:
counter.close() # So we ensure stats are correctly updated on ns
logger.info(
'TERMINATED %s to %s, s:%s, r:%s, t:%s',
prettySource,
prettyDest,
counter.sent,
counter.recv,
int(counter.end - counter.start),
# Upgrade connection to SSL, and use asyncio to handle the rest
transport: 'asyncio.transports.Transport'
protocol: tunnel.TunnelProtocol
(transport, protocol) = await loop.connect_accepted_socket( # type: ignore
lambda: tunnel.TunnelProtocol(self), source, ssl=context
)
await protocol.finished
return

View File

@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2020 Virtual Cable S.L.U.
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -26,18 +26,19 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import multiprocessing
import socket
import time
import logging
import typing
import io
import asyncio
import ssl
import logging
import typing
import curio
from . import config
from . import consts
@ -142,19 +143,28 @@ async def getServerStats(detailed: bool = False) -> None:
# Context for local connection (ignores cert hostname)
context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE # For ServerStats, do not checks certificate
context.verify_mode = ssl.CERT_NONE # For ServerStats, does not checks certificate
try:
host = cfg.listen_address if cfg.listen_address != '0.0.0.0' else 'localhost'
sock = await curio.open_connection(
host, cfg.listen_port, ssl=context, server_hostname='localhost'
)
tmpdata = io.BytesIO()
cmd = consts.COMMAND_STAT if detailed else consts.COMMAND_INFO
async with sock:
await sock.sendall(consts.HANDSHAKE_V1 + cmd + cfg.secret.encode())
reader: asyncio.StreamReader
writer: asyncio.StreamWriter
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect((host, cfg.listen_port))
# Send HANDSHAKE
sock.sendall(consts.HANDSHAKE_V1)
# Ugrade connection to TLS
reader, writer = await asyncio.open_connection(sock=sock, ssl=context, server_hostname=host)
tmpdata = io.BytesIO()
cmd = consts.COMMAND_STAT if detailed else consts.COMMAND_INFO
writer.write(cmd + cfg.secret.encode())
await writer.drain()
while True:
chunk = await sock.recv(consts.BUFFER_SIZE)
chunk = await reader.read(consts.BUFFER_SIZE)
if not chunk:
break
tmpdata.write(chunk)

View File

@ -0,0 +1,330 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
Author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import asyncio
import typing
import logging
import aiohttp
from . import consts
from . import config
from . import stats
logger = logging.getLogger(__name__)
if typing.TYPE_CHECKING:
from . import proxy
# Protocol
class TunnelProtocol(asyncio.Protocol):
# future to mark eof
finished: asyncio.Future
# Transport and other side of tunnel
transport: 'asyncio.transports.Transport'
other_side: 'TunnelProtocol'
# Current state
runner: typing.Any # In fact, typing.Callable[[bytes], None], but mypy complains on its check
# Command buffer
cmd: bytes
# Ticket
notify_ticket: bytes
# owner Proxy class
owner: 'proxy.Proxy'
# source of connection
source: typing.Tuple[str, int]
destination: typing.Tuple[str, int]
# Counters & stats related
stats_manager: stats.Stats
# counter
counter: stats.StatsSingleCounter
def __init__(
self, owner: 'proxy.Proxy', other_side: typing.Optional['TunnelProtocol'] = None
) -> None:
# If no other side is given, we are the server part
super().__init__()
if other_side:
self.other_side = other_side
self.stats_manager = other_side.stats_manager
self.counter = self.stats_manager.as_recv_counter()
self.runner = self.do_proxy
else:
self.other_side = self
self.stats_manager = stats.Stats(owner.ns)
self.counter = self.stats_manager.as_sent_counter()
self.runner = self.do_command
# transport is undefined until connection_made is called
self.finished = asyncio.Future()
self.cmd = b''
self.notify_ticket = b''
self.owner = owner
self.source = ('', 0)
self.destination = ('', 0)
def process_open(self):
# Open Command has the ticket behind it
if len(self.cmd) < consts.TICKET_LENGTH + consts.COMMAND_LENGTH:
return # Wait for more data to complete OPEN command
# Ticket received, now process it with UDS
ticket = self.cmd[consts.COMMAND_LENGTH :]
# Stop reading from this side until open is done
self.transport.pause_reading()
# clean up the command
self.cmd = b''
loop = asyncio.get_event_loop()
async def open_other_side() -> None:
try:
result = await TunnelProtocol.getTicketFromUDS(
self.owner.cfg, ticket, self.source
)
except Exception as e:
logger.error('ERROR %s', e.args[0] if e.args else e)
self.transport.write(b'ERROR_TICKET')
self.transport.close() # And force close
return
# store for future use
self.destination = (result['host'], int(result['port']))
self.notify_ticket = result['notify'].encode()
logger.info(
'OPEN TUNNEL FROM %s to %s',
self.pretty_source(),
self.pretty_destination(),
)
try:
(_, protocol) = await loop.create_connection(
lambda: TunnelProtocol(self.owner, self),
self.destination[0],
self.destination[1],
)
self.other_side = typing.cast('TunnelProtocol', protocol)
# Resume reading
self.transport.resume_reading()
# send OK to client
self.transport.write(b'OK')
except Exception as e:
logger.error('Error opening connection: %s', e)
self.close_connection()
loop.create_task(open_other_side())
# From now, proxy connection
self.runner = self.do_proxy
def process_stats(self, full: bool) -> None:
# if pasword is not already received, wait for it
if len(self.cmd) < consts.PASSWORD_LENGTH + consts.COMMAND_LENGTH:
return
try:
logger.info('COMMAND: %s', self.cmd[: consts.COMMAND_LENGTH].decode())
# Check valid source ip
if self.transport.get_extra_info('peername')[0] not in self.owner.cfg.allow:
# Invalid source
self.transport.write(b'FORBIDDEN')
return
# Check password
passwd = self.cmd[consts.COMMAND_LENGTH :]
# Clean up the command, only keep base part
self.cmd = self.cmd[:4]
if passwd.decode(errors='ignore') != self.owner.cfg.secret:
# Invalid password
self.transport.write(b'FORBIDDEN')
return
data = stats.GlobalStats.get_stats(self.owner.ns)
for v in data:
logger.debug('SENDING %s', v)
self.transport.write(v.encode() + b'\n')
logger.info('TERMINATED %s', self.pretty_source())
finally:
self.close_connection()
def do_command(self, data: bytes) -> None:
self.cmd += data
if len(self.cmd) >= consts.COMMAND_LENGTH:
logger.info('CONNECT FROM %s', self.pretty_source())
command = self.cmd[: consts.COMMAND_LENGTH]
try:
if command == consts.COMMAND_OPEN:
self.process_open()
elif command == consts.COMMAND_TEST:
logger.info('COMMAND: TEST')
self.transport.write(b'OK')
self.close_connection()
return
elif command in (consts.COMMAND_STAT, consts.COMMAND_INFO):
# This is an stats requests
self.process_stats(full=command == consts.COMMAND_STAT)
return
else:
raise Exception('Invalid command')
except Exception:
logger.error('ERROR from %s', self.pretty_source())
self.transport.write(b'ERROR_COMMAND')
self.close_connection()
return
# if not enough data to process command, wait for more
def do_proxy(self, data: bytes) -> None:
self.counter.add(len(data))
logger.debug('Processing proxy: %s', len(data))
self.other_side.transport.write(data)
# inherited from asyncio.Protocol
def connection_made(self, transport: 'asyncio.transports.BaseTransport') -> None:
logger.debug('Connection made: %s', transport.get_extra_info('peername'))
# We know for sure that the transport is a Transport.
self.transport = typing.cast('asyncio.transports.Transport', transport)
self.cmd = b''
self.source = self.transport.get_extra_info('peername')
def data_received(self, data: bytes):
logger.debug('Data received: %s', len(data))
self.runner(data) # send data to current runner (command or proxy)
def notifyEnd(self):
if self.notify_ticket:
asyncio.get_event_loop().create_task(
TunnelProtocol.notifyEndToUds(
self.owner.cfg, self.notify_ticket, self.stats_manager
)
)
self.notify_ticket = b'' # Clean up so no more notifications
def connection_lost(self, exc: typing.Optional[Exception]) -> None:
logger.debug('Connection closed : %s', exc)
self.finished.set_result(True)
if self.other_side is not self:
self.other_side.transport.close()
else:
self.stats_manager.close()
self.notifyEnd()
# helpers
# source address, pretty format
def pretty_source(self) -> str:
return self.source[0] + ':' + str(self.source[1])
def pretty_destination(self) -> str:
return self.destination[0] + ':' + str(self.destination[1])
def close_connection(self):
self.transport.close()
# If destination is not set, it's a command processing (i.e. TEST or STAT)
if self.destination[0] != '':
logger.info(
'TERMINATED %s to %s, s:%s, r:%s, t:%s',
self.pretty_source(),
self.pretty_destination(),
self.stats_manager.sent,
self.stats_manager.recv,
int(self.stats_manager.end - self.stats_manager.start),
)
# Notify end to uds
self.notifyEnd()
else:
logger.info('TERMINATED %s', self.pretty_source())
@staticmethod
async def _getUdsUrl(
cfg: config.ConfigurationType,
ticket: bytes,
msg: str,
queryParams: typing.Optional[typing.Mapping[str, str]] = None,
) -> typing.MutableMapping[str, typing.Any]:
try:
url = (
cfg.uds_server + '/' + ticket.decode() + '/' + msg + '/' + cfg.uds_token
)
if queryParams:
url += '?' + '&'.join(
[f'{key}={value}' for key, value in queryParams.items()]
)
# Requests url with aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(url) as r:
if not r.ok:
raise Exception(await r.text())
return await r.json()
except Exception as e:
raise Exception(f'TICKET COMMS ERROR: {ticket.decode()} {msg} {e!s}')
@staticmethod
async def getTicketFromUDS(
cfg: config.ConfigurationType, ticket: bytes, address: typing.Tuple[str, int]
) -> typing.MutableMapping[str, typing.Any]:
# Sanity checks
if len(ticket) != consts.TICKET_LENGTH:
raise Exception(f'TICKET INVALID (len={len(ticket)})')
for n, i in enumerate(ticket.decode(errors='ignore')):
if (
(i >= 'a' and i <= 'z')
or (i >= '0' and i <= '9')
or (i >= 'A' and i <= 'Z')
):
continue # Correctus
raise Exception(f'TICKET INVALID (char {i} at pos {n})')
return await TunnelProtocol._getUdsUrl(cfg, ticket, address[0])
@staticmethod
async def notifyEndToUds(
cfg: config.ConfigurationType, ticket: bytes, counter: stats.Stats
) -> None:
await TunnelProtocol._getUdsUrl(
cfg,
ticket,
'stop',
{'sent': str(counter.sent), 'recv': str(counter.recv)},
)

View File

@ -3,7 +3,6 @@
# Pid file, optional
# pidfile = /tmp/udstunnel.pid
user = dkmaster
group = dkmaster
# Log level, valid are DEBUG, INFO, WARN, ERROR. Defaults to ERROR
loglevel = DEBUG

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 Virtual Cable S.L.U.
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -27,29 +27,33 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
Author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import os
import pwd
import sys
import asyncio
import argparse
import multiprocessing
import signal
import ssl
import socket
import logging
from concurrent.futures import ThreadPoolExecutor
import typing
import curio
import curio.io
import curio.errors
import setproctitle
try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
pass # no uvloop support
from uds_tunnel import config
from uds_tunnel import proxy
from uds_tunnel import consts
from uds_tunnel import message
from uds_tunnel import stats
from uds_tunnel import processes
try:
import setproctitle
except ImportError:
setproctitle = None # type: ignore
from uds_tunnel import config, proxy, consts, processes, stats
if typing.TYPE_CHECKING:
from multiprocessing.connection import Connection
@ -62,7 +66,7 @@ logger = logging.getLogger(__name__)
do_stop = False
def stop_signal(signum, frame):
def stop_signal(signum: int, frame: typing.Any) -> None:
global do_stop
do_stop = True
logger.debug('SIGNAL %s, frame: %s', signum, frame)
@ -90,7 +94,7 @@ def setup_log(cfg: config.ConfigurationType) -> None:
# Setup basic logging
log = logging.getLogger()
log.setLevel(cfg.log_level)
handler = logging.StreamHandler(sys.stdout)
handler = logging.StreamHandler(sys.stderr)
handler.setLevel(cfg.log_level)
formatter = logging.Formatter(
'%(levelname)s - %(message)s'
@ -102,26 +106,33 @@ def setup_log(cfg: config.ConfigurationType) -> None:
async def tunnel_proc_async(
pipe: 'Connection', cfg: config.ConfigurationType, ns: 'Namespace'
) -> None:
def get_socket(pipe: 'Connection') -> typing.Tuple[typing.Optional[socket.SocketType], typing.Any]:
try:
loop = asyncio.get_running_loop()
except RuntimeError: # older python versions
loop = asyncio.get_event_loop()
tasks: typing.List[asyncio.Task] = []
def get_socket() -> typing.Tuple[typing.Optional[socket.socket], typing.Optional[typing.Tuple[str, int]]]:
try:
while True:
try:
msg: message.Message = pipe.recv()
if msg.command == message.Command.TUNNEL and msg.connection:
return msg.connection
# Clear back event, for next data
msg: typing.Optional[
typing.Tuple[socket.socket, typing.Tuple[str, int]]
] = pipe.recv()
if msg:
return msg
except Exception:
logger.exception('Receiving data from parent process')
return None, None
# Process other messages, and retry
except Exception:
logger.exception('Receiving data from parent process')
return None, None
async def run_server(
pipe: 'Connection', cfg: config.ConfigurationType, group: curio.TaskGroup
) -> None:
async def run_server() -> None:
# Instantiate a proxy redirector for this process (we only need one per process!!)
tunneler = proxy.Proxy(cfg, ns)
# Generate SSL context
context = curio.ssl.SSLContext(curio.ssl.PROTOCOL_TLS_SERVER) # type: ignore
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain(cfg.ssl_certificate, cfg.ssl_certificate_key)
if cfg.ssl_ciphers:
@ -130,58 +141,62 @@ async def tunnel_proc_async(
if cfg.ssl_dhparam:
context.load_dh_params(cfg.ssl_dhparam)
async def processSocket(ssock: socket.socket, address: typing.Any) -> None:
sock = curio.io.Socket(ssock)
try:
# First, ensure handshake (simple handshake) and command
async with curio.timeout_after(3): # type: ignore
data = await sock.recv(len(consts.HANDSHAKE_V1))
if data != consts.HANDSHAKE_V1:
raise Exception(data) # Invalid handshake
except (curio.errors.CancelledError, Exception) as e:
logger.error('HANDSHAKE from %s (%s)', address, 'timeout' if isinstance(e, curio.errors.CancelledError) else e)
# Close Source and continue
await sock.close()
return
sslsock = await context.wrap_socket(
sock, server_side=True # type: ignore
)
await group.spawn(tunneler, sslsock, address)
del sslsock
while True:
address = ('', '')
address: typing.Optional[typing.Tuple[str, int]] = ('', 0)
try:
ssock, address = await curio.run_in_thread(get_socket, pipe)
if not ssock:
break
await group.spawn(processSocket, ssock, address)
(sock, address) = await loop.run_in_executor(None, get_socket)
if not sock:
break # No more sockets, exit
logger.debug(f'CONNECTION from {address!r} (pid: {os.getpid()})')
tasks.append(asyncio.create_task(tunneler(sock, context)))
except Exception:
logger.error('NEGOTIATION ERROR from %s', address[0])
logger.error('NEGOTIATION ERROR from %s', address[0] if address else 'unknown')
async with curio.TaskGroup() as tg:
await tg.spawn(run_server, pipe, cfg, tg)
await tg.join()
# Reap all of the children tasks as they complete
# async for task in tg:
# logger.debug(f'REMOVING async task {task!r}')
# task.joined = True
# del task
# create task for server
tasks.append(asyncio.create_task(run_server()))
while tasks:
to_wait = tasks[:] # Get a copy of the list, and clean the original
# Wait for tasks to finish
done, _ = await asyncio.wait(to_wait, return_when=asyncio.FIRST_COMPLETED)
# Remove finished tasks
for task in done:
tasks.remove(task)
if task.exception():
logger.exception('TUNNEL ERROR')
logger.info('PROCESS %s stopped', os.getpid())
def process_connection(
client: socket.socket, addr: typing.Tuple[str, str], conn: 'Connection'
) -> None:
data: bytes = b''
try:
# First, ensure handshake (simple handshake) and command
data = client.recv(len(consts.HANDSHAKE_V1))
def tunnel_main():
cfg = config.read()
if data != consts.HANDSHAKE_V1:
raise Exception() # Invalid handshake
conn.send((client, addr))
del client # Ensure socket is controlled on child process
except Exception:
logger.error('HANDSHAKE invalid from %s (%s)', addr, data.hex())
# Close Source and continue
client.close()
def tunnel_main(args: 'argparse.Namespace') -> None:
cfg = config.read(args.config)
# Try to bind to port as running user
# Wait for socket incoming connections and spread them
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.setdefaulttimeout(
3.0
) # So we can check for stop from time to time and not block forever
af_inet = socket.AF_INET6 if args.ipv6 or ':' in cfg.listen_address else socket.AF_INET
sock = socket.socket(af_inet, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.settimeout(3.0) # So we can check for stop from time to time
# We will not reuse port, we only want a UDS tunnel server running on a port
# but this may change on future...
# try:
@ -201,8 +216,11 @@ def tunnel_main():
setup_log(cfg)
logger.info('Starting tunnel server on %s:%s', cfg.listen_address, cfg.listen_port)
setproctitle.setproctitle(f'UDSTunnel {cfg.listen_address}:{cfg.listen_port}')
logger.info(
'Starting tunnel server on %s:%s', cfg.listen_address, cfg.listen_port
)
if setproctitle:
setproctitle.setproctitle(f'UDSTunnel {cfg.listen_address}:{cfg.listen_port}')
# Create pid file
if cfg.pidfile:
@ -214,7 +232,6 @@ def tunnel_main():
logger.error('MAIN: %s', e)
return
# Setup signal handlers
signal.signal(signal.SIGINT, stop_signal)
signal.signal(signal.SIGTERM, stop_signal)
@ -223,20 +240,27 @@ def tunnel_main():
prcs = processes.Processes(tunnel_proc_async, cfg, stats_collector.ns)
while not do_stop:
with ThreadPoolExecutor(max_workers=256) as executor:
try:
client, addr = sock.accept()
while not do_stop:
try:
client, addr = sock.accept()
logger.info('CONNECTION from %s', addr)
logger.info('CONNECTION from %s', addr[0])
# Select BEST process for sending this new connection
prcs.best_child().send(
message.Message(message.Command.TUNNEL, (client, addr))
)
del client # Ensure socket is controlled on child process
except socket.timeout:
pass # Continue and retry
# Check if we have reached the max number of connections
# First part is checked on a thread, if HANDSHAKE is valid
# we will send socket to process pool
# Note: We use a thread pool here because we want to
# ensure no denial of service is possible, or at least
# we try to limit it (if connection delays too long, we will close it on the thread)
executor.submit(process_connection, client, addr, prcs.best_child())
except socket.timeout:
pass # Continue and retry
except Exception as e:
logger.error('LOOP: %s', e)
except Exception as e:
logger.error('LOOP: %s', e)
sys.stderr.write(f'Error: {e}\n')
logger.error('MAIN: %s', e)
if sock:
sock.close()
@ -258,9 +282,7 @@ def main() -> None:
group.add_argument(
'-t', '--tunnel', help='Starts the tunnel server', action='store_true'
)
group.add_argument(
'-r', '--rdp', help='RDP Tunnel for traffic accounting'
)
group.add_argument('-r', '--rdp', help='RDP Tunnel for traffic accounting')
group.add_argument(
'-s',
'--stats',
@ -273,19 +295,33 @@ def main() -> None:
help='get current detailed stats from RUNNING tunnel',
action='store_true',
)
# Config file
parser.add_argument(
'-c',
'--config',
help=f'Config file to use (default: {consts.CONFIGFILE})',
default=consts.CONFIGFILE,
)
# If force ipv6
parser.add_argument(
'-6',
'--ipv6',
help='Force IPv6 for tunnel server',
action='store_true',
)
args = parser.parse_args()
if args.tunnel:
tunnel_main()
tunnel_main(args)
elif args.rdp:
pass
elif args.detailed_stats:
curio.run(stats.getServerStats, True)
asyncio.run(stats.getServerStats(True))
elif args.stats:
curio.run(stats.getServerStats, False)
asyncio.run(stats.getServerStats(False))
else:
parser.print_help()
if __name__ == "__main__":
main()
main()

View File

View File

@ -0,0 +1,54 @@
TEST_CONFIG='''# Sample UDS tunnel configuration
# Pid file, optional
pidfile = {pidfile}
user = {user}
# Log level, valid are DEBUG, INFO, WARN, ERROR. Defaults to ERROR
loglevel = {loglevel}
# Log file, Defaults to stdout
logfile = {logfile}
# Max log size before rotating it. Defaults to 32 MB.
# The value is in MB. You can include or not the M string at end.
logsize = {logsize}
# Number of backup logs to keep. Defaults to 3
lognumber = {lognumber}
# Listen address. Defaults to 0.0.0.0
address = {address}
# Number of workers. Defaults to 0 (means "as much as cores")
workers = {workers}
# Listening port
port = 7777
# SSL Related parameters.
ssl_certificate = {ssl_certificate}
ssl_certificate_key = {ssl_certificate_key}
# ssl_ciphers and ssl_dhparam are optional.
ssl_ciphers = {ssl_ciphers}
ssl_dhparam = {ssl_dhparam}
# UDS server location. https NEEDS valid certificate if https
# Must point to tunnel ticket dispatcher URL, that is under /uds/rest/tunnel/ on tunnel server
# Valid examples:
# http://www.example.com/uds/rest/tunnel/ticket
# https://www.example.com:14333/uds/rest/tunnel/ticket
uds_server = {uds_server}
uds_token = {uds_token}
# Secret to get access to admin commands (Currently only stats commands). No default for this.
# Admin commands and only allowed from "allow" ips
# So, in order to allow this commands, ensure listen address allows connections from localhost
secret = {secret}
# List of af allowed admin commands ips (Currently only stats commands).
# Only use IPs, no networks allowed
# defaults to localhost (change if listen address is different from 0.0.0.0)
allow = {allow}
'''

View File

@ -0,0 +1,92 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
Author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import typing
import hashlib
import string
import io
import random
from unittest import TestCase
from uds_tunnel import config
from . import fixtures
class TestConfigFile(TestCase):
def test_config_file(self) -> None:
# Test in-memory configuration files ramdomly created
for _ in range(100):
values: typing.Mapping[str, typing.Any] = {
'pidfile': f'/tmp/uds_tunnel_{random.randint(0, 100)}.pid', # Random pid file
'user': f'user{random.randint(0, 100)}', # Random user
'loglevel': random.choice(['DEBUG', 'INFO', 'WARNING', 'ERROR']), # Random log level
'logfile': f'/tmp/uds_tunnel_{random.randint(0, 100)}.log', # Random log file
'logsize': random.randint(0, 100), # Random log size
'lognumber': random.randint(0, 100), # Random log number
'address': f'{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}.{random.randint(0, 255)}', # Random address
'workers': random.randint(1, 100), # Random workers, 0 will return as many as cpu cores
'ssl_certificate': f'/tmp/uds_tunnel_{random.randint(0, 100)}.crt', # Random ssl certificate
'ssl_certificate_key': f'/tmp/uds_tunnel_{random.randint(0, 100)}.key', # Random ssl certificate key
'ssl_ciphers': f'ciphers{random.randint(0, 100)}', # Random ssl ciphers
'ssl_dhparam': f'/tmp/uds_tunnel_{random.randint(0, 100)}.dh', # Random ssl dhparam
'uds_server': f'https://uds_server{random.randint(0, 100)}/some_path', # Random uds server
'uds_token': f'uds_token{random.choices(string.ascii_uppercase + string.digits, k=32)}', # Random uds token
'secret': f'secret{random.randint(0, 100)}', # Random secret
'allow': f'{random.randint(0, 255)}.0.0.0', # Random allow
}
h = hashlib.sha256()
h.update(values.get('secret', '').encode())
secret = h.hexdigest()
# Generate an in-memory configuration file from fixtures.TEST_CONFIG
config_file = io.StringIO(fixtures.TEST_CONFIG.format(**values))
# Read it
cfg = config.read(config_file)
# Ensure data is correct
self.assertEqual(cfg.pidfile, values['pidfile'])
self.assertEqual(cfg.user, values['user'])
self.assertEqual(cfg.log_level, values['loglevel'])
self.assertEqual(cfg.log_file, values['logfile'])
self.assertEqual(cfg.log_size, values['logsize'] * 1024 * 1024) # Config file is in MB
self.assertEqual(cfg.log_number, values['lognumber'])
self.assertEqual(cfg.listen_address, values['address'])
self.assertEqual(cfg.workers, values['workers'])
self.assertEqual(cfg.ssl_certificate, values['ssl_certificate'])
self.assertEqual(cfg.ssl_certificate_key, values['ssl_certificate_key'])
self.assertEqual(cfg.ssl_ciphers, values['ssl_ciphers'])
self.assertEqual(cfg.ssl_dhparam, values['ssl_dhparam'])
self.assertEqual(cfg.uds_server, values['uds_server'])
self.assertEqual(cfg.uds_token, values['uds_token'])
self.assertEqual(cfg.secret, secret)
self.assertEqual(cfg.allow, {values['allow']})