Added tunnel info to normalize return values & log values

This commit is contained in:
Adolfo Gómez García 2021-07-10 13:19:45 +02:00
parent 99ee0b00fc
commit aaa909fff0
2 changed files with 63 additions and 23 deletions

View File

@ -54,24 +54,37 @@ class Proxy:
self.ns = ns
@staticmethod
def _getUdsUrl(cfg: config.ConfigurationType, ticket: bytes, msg: str, queryParams: typing.Mapping[str, str] = None) -> typing.MutableMapping[str, typing.Any]:
def _getUdsUrl(
cfg: config.ConfigurationType,
ticket: bytes,
msg: str,
queryParams: typing.Mapping[str, str] = None,
) -> typing.MutableMapping[str, typing.Any]:
try:
url = cfg.uds_server + '/' + ticket.decode() + '/' + msg + '/' + cfg.uds_token
url = (
cfg.uds_server + '/' + ticket.decode() + '/' + msg + '/' + cfg.uds_token
)
if queryParams:
url += '?' + '&'. join([f'{key}={value}' for key, value in queryParams.items()])
r = requests.get(url, headers={'content-type': 'application/json', 'User-Agent': f'UDSTunnel-{consts.VERSION}'})
url += '?' + '&'.join(
[f'{key}={value}' for key, value in queryParams.items()]
)
r = requests.get(
url,
headers={
'content-type': 'application/json',
'User-Agent': f'UDSTunnel-{consts.VERSION}',
},
)
if not r.ok:
raise Exception(r.content)
return r.json()
except Exception as e:
raise Exception(f'TICKET COMMS ERROR: {ticket.decode()} {msg} {e!s}')
@staticmethod
def getFromUds(
cfg: config.ConfigurationType, ticket: bytes,
address: typing.Tuple[str, int]
cfg: config.ConfigurationType, ticket: bytes, address: typing.Tuple[str, int]
) -> typing.MutableMapping[str, typing.Any]:
# Sanity checks
if len(ticket) != consts.TICKET_LENGTH:
@ -89,11 +102,19 @@ class Proxy:
return Proxy._getUdsUrl(cfg, ticket, address[0])
@staticmethod
def notifyEndToUds(cfg: config.ConfigurationType, ticket: bytes, counter: stats.Stats) -> None:
Proxy._getUdsUrl(cfg, ticket, 'stop', {'sent': str(counter.sent), 'recv': str(counter.recv)}) # Ignore results
def notifyEndToUds(
cfg: config.ConfigurationType, ticket: bytes, counter: stats.Stats
) -> None:
Proxy._getUdsUrl(
cfg, ticket, 'stop', {'sent': str(counter.sent), 'recv': str(counter.recv)}
) # Ignore results
@staticmethod
async def doProxy(source: 'curio.io.Socket', destination: 'curio.io.Socket', counter: stats.StatsSingleCounter) -> None:
async def doProxy(
source: 'curio.io.Socket',
destination: 'curio.io.Socket',
counter: stats.StatsSingleCounter,
) -> None:
try:
while True:
data = await source.recv(consts.BUFFER_SIZE)
@ -102,13 +123,12 @@ class Proxy:
await destination.sendall(data)
counter.add(len(data))
except Exception:
# Connection broken, same result as closed for us
# Connection broken, same result as closed for us
# We must notice that i'ts easy that when closing one part of the tunnel,
# the other can break (due to some internal data), that's why even log is removed
# logger.info('CONNECTION LOST FROM %s to %s', source.getsockname(), destination.getpeername())
pass
# Method responsible of proxying requests
async def __call__(self, source, address: typing.Tuple[str, int]) -> None:
await self.proxy(source, address)
@ -134,21 +154,22 @@ class Proxy:
await source.sendall(v.encode() + b'\n')
async def proxy(self, source, address: typing.Tuple[str, int]) -> None:
pretty_adress = address[0] # Get only source IP
logger.info('CONNECT FROM %s', pretty_adress)
prettySource = address[0] # Get only source IP
prettyDest = ''
logger.info('CONNECT FROM %s', prettySource)
try:
# First, ensure handshake (simple handshake) and command
data: bytes = await source.recv(len(consts.HANDSHAKE_V1))
if data != consts.HANDSHAKE_V1:
logger.error('INVALID HANDSHAKE %s', data)
raise Exception()
except Exception:
if consts.DEBUG:
logger.exception('HANDSHAKE')
logger.error('HANDSHAKE from %s', address)
await source.sendall(b'HANDSHAKE_ERROR')
await source.sendall(b'ERROR_HANDSHAKE')
# Closes connection now
return
@ -158,6 +179,7 @@ class Proxy:
if command == consts.COMMAND_TEST:
logger.info('COMMAND: TEST')
await source.sendall(b'OK')
logger.info('TERMINATED %s', prettySource)
return
if command in (consts.COMMAND_STAT, consts.COMMAND_INFO):
@ -166,6 +188,7 @@ class Proxy:
await self.stats(
full=command == consts.COMMAND_STAT, source=source, address=address
)
logger.info('TERMINATED %s', prettySource)
return
if command != consts.COMMAND_OPEN:
@ -177,19 +200,22 @@ class Proxy:
# Ticket received, now process it with UDS
try:
result = await curio.run_in_thread(Proxy.getFromUds, self.cfg, ticket, address)
result = await curio.run_in_thread(
Proxy.getFromUds, self.cfg, ticket, address
)
except Exception as e:
logger.error('ERROR %s', e.args[0] if e.args else e)
await source.sendall(b'ERROR INVALID TICKET')
await source.sendall(b'ERROR_TICKET')
return
logger.info('OPEN TUNNEL FROM %s to %s:%s', pretty_adress, result['host'], result['port'])
prettyDest = result['host'] + ':' + result['port']
logger.info('OPEN TUNNEL FROM %s to %s', prettySource, prettyDest)
except Exception:
if consts.DEBUG:
logger.exception('COMMAND')
logger.error('ERROR from %s', address)
await source.sendall(b'COMMAND_ERROR')
logger.error('ERROR from %s', prettySource)
await source.sendall(b'ERROR_COMMAND')
return
# Communicate source OPEN is ok
@ -213,7 +239,9 @@ class Proxy:
logger.debug('PROXIES READY')
logger.debug('Proxies finalized: %s', grp.exceptions)
await curio.run_in_thread(Proxy.notifyEndToUds, self.cfg, result['notify'].encode(), counter)
await curio.run_in_thread(
Proxy.notifyEndToUds, self.cfg, result['notify'].encode(), counter
)
except Exception as e:
if consts.DEBUG:
@ -223,4 +251,11 @@ class Proxy:
finally:
counter.close() # So we ensure stats are correctly updated on ns
logger.info('TERMINATED %s, s:%s, r: %s', ':'.join(str(i) for i in address), counter.sent, counter.recv)
logger.info(
'TERMINATED %s to %s, s:%s, r:%s, t:%s',
prettySource,
prettyDest,
counter.sent,
counter.recv,
int(counter.end-counter.start)
)

View File

@ -69,6 +69,8 @@ class Stats:
last_recv: int
recv: int
last: float
start: float # timestamp
end: float
def __init__(self, ns: 'Namespace'):
self.ns = ns
@ -77,6 +79,8 @@ class Stats:
self.sent = self.last_sent = 0
self.recv = self.last_recv = 0
self.last = time.monotonic()
self.start = time.monotonic()
self.end = self.start
def update(self, force: bool = False):
now = time.monotonic()
@ -104,6 +108,7 @@ class Stats:
def close(self):
self.update(True)
self.ns.current -= 1
self.end = time.monotonic()
# Stats collector thread
class GlobalStats: