Client is advancing

This commit is contained in:
Adolfo Gómez García 2019-12-10 12:31:58 +01:00
parent b78a41f0d3
commit ff5601ce12
14 changed files with 149 additions and 56 deletions

View File

@ -35,10 +35,41 @@ import os
import PyQt5 # pylint: disable=unused-import
from PyQt5.QtWidgets import QApplication
from PyQt5.QtCore import QTimer
from udsactor.log import logger, DEBUG
from udsactor.client import UDSActorClient
class UDSClientQApp(QApplication):
_app: UDSActorClient
_initialized: bool
def __init__(self, args) -> None:
super().__init__(args)
# This will be invoked on session close
self.commitDataRequest.connect(self.end) # Will be invoked on session close, to gracely close app
# Execute backgroup thread for actions
self._app = UDSActorClient(self)
def init(self) -> None:
# Notify loging and mark it
logger.debug('Starting APP')
self._app.start()
self._initialized = True
def end(self, sessionManager=None) -> None:
if not self._initialized:
return
self._initialized = False
logger.debug('Stopping app thread')
self._app.stop()
self._app.join()
if __name__ == "__main__":
logger.setLevel(DEBUG)
@ -49,12 +80,16 @@ if __name__ == "__main__":
QApplication.setQuitOnLastWindowClosed(False)
qApp = QApplication(sys.argv)
qApp = UDSClientQApp(sys.argv)
# Execute backgroup thread for actions
app = UDSActorClient(qApp)
# Crate a timer, so we can check signals from time to time by executing python interpreter
# Note: Signals are only checked on python code execution, so we create a
timer = QTimer(qApp)
timer.start(1000)
timer.timeout.connect(lambda *a: None)
app.start()
qApp.init()
qApp.exec_()
qApp.end()
app.join()
logger.debug('Exiting...')

View File

@ -31,6 +31,7 @@
import threading
import time
import typing
import signal
from PyQt5.QtWidgets import QApplication, QMessageBox
@ -40,6 +41,8 @@ from . import platform
from .log import logger
from .http import client
# Not imported at runtime, just for type checking
if typing.TYPE_CHECKING:
from . import types
@ -48,28 +51,44 @@ class UDSActorClient(threading.Thread):
_running: bool
_forceLogoff: bool
_qApp: QApplication
_api: rest.UDSClientApi
api: rest.UDSClientApi
_listener: client.HTTPServerThread
def __init__(self, qApp: QApplication):
super().__init__()
self._api = rest.UDSClientApi() # Self initialized
self.api = rest.UDSClientApi() # Self initialized
self._qApp = qApp
self._running = False
self._forceLogoff = False
self._listener = client.HTTPServerThread(self)
# Capture stop signals..
logger.debug('Setting signals...')
signal.signal(signal.SIGINT, self.stopSignal)
signal.signal(signal.SIGTERM, self.stopSignal)
def stopSignal(self, signum, frame) -> None: # pylint: disable=unused-argument
logger.info('Stop signal received')
self.stop()
def run(self):
logger.debug('UDS Actor thread')
self._listener.start() # async listener for service
self._running = True
time.sleep(0.4) # Wait a bit before sending login
# Notify loging and mark it
self._api.login(platform.operations.getCurrentUser())
self.api.login(platform.operations.getCurrentUser())
while self._running:
time.sleep(1.1) # Sleep between loop iteration
time.sleep(1.1) # Sleeps between loop iterations
self._api.logout(platform.operations.getCurrentUser())
self.api.logout(platform.operations.getCurrentUser())
# Notify Qapllication to exit
self._listener.stop() # async listener for service
# Notify exit to qt
QApplication.quit()
if self._forceLogoff:

View File

@ -55,7 +55,10 @@ class HTTPServerHandler(http.server.BaseHTTPRequestHandler):
self.send_header('Content-Length', str(len(data)))
self.send_header('Server: ', self.server_version)
self.end_headers()
self.wfile.write(data.encode())
try:
self.wfile.write(data.encode())
except Exception:
pass # Evict "broken pipe" when sending errors
def do_POST(self) -> None:
# Only allows requests from localhost
@ -70,7 +73,7 @@ class HTTPServerHandler(http.server.BaseHTTPRequestHandler):
try:
length = int(str(self.headers.get('content-length', '0')))
content = self.rfile.read(length)
params: typing.MutableMapping[str, str] = json.loads(content)
params: typing.MutableMapping[str, str] = json.loads(content or '{}')
except Exception as e:
logger.error('Got exception executing POST {}: {}'.format(self.path, str(e)))
self.sendJsonResponse(error='Invalid request', code=400)
@ -108,10 +111,10 @@ class HTTPServerHandler(http.server.BaseHTTPRequestHandler):
self.sendJsonResponse(error='Forbidden', code=403)
def log_error(self, format: str, *args): # pylint: disable=redefined-builtin
logger.error('ERROR ' + format, *args)
logger.error(format, *args)
def log_message(self, format: str, *args): # pylint: disable=redefined-builtin
logger.info('INFO ' + format, *args)
logger.debug(format, *args)
class HTTPServerThread(threading.Thread):
_server: typing.Optional[http.server.HTTPServer]
@ -129,9 +132,14 @@ class HTTPServerThread(threading.Thread):
self.port = -1
self.id = secrets.token_urlsafe(16)
@property
def url(self) -> str:
return 'http://127.0.0.1:{}/{}'.format(self.port, self.id)
def stop(self) -> None:
logger.debug('Stopping Http-client Service')
if self._server:
logger.debug('Stopping Http-client Service')
self._app.api.unregister(self.url)
self._server.shutdown()
self._server = None
@ -139,8 +147,12 @@ class HTTPServerThread(threading.Thread):
HTTPServerHandler._app = self._app # pylint: disable=protected-access
HTTPServerHandler._id = self.id # pylint: disable=protected-access
self._server = http.server.HTTPServer(('0.0.0.0', 0), HTTPServerHandler)
self._server = http.server.HTTPServer(('127.0.0.1', 0), HTTPServerHandler)
self.port = self._server.socket.getsockname()[1]
# Register using app api
logger.debug('Registered %s', self.url)
self._app.api.register(self.url)
self._server.serve_forever()

View File

@ -60,7 +60,7 @@ class PublicProvider(handler.Handler):
raise Exception('Invalid script parameters')
if 'user' in self._params:
logger.debug('Sending SCRIPT to client')
self._service._clientsPool.message(self._params['scripts']) # pylint: disable=protected-access
self._service._clientsPool.executeScript(self._params['script']) # pylint: disable=protected-access
else:
# Execute script at server space, that is, here
# as a parallel thread

View File

@ -71,6 +71,8 @@ class HTTPServerHandler(http.server.BaseHTTPRequestHandler):
# Very simple path & params splitter
path = self.path.split('?')[0][1:].split('/')
logger.debug('Path: %s, params: %s', path, params)
handlerType: typing.Optional[typing.Type['Handler']] = None
if len(path) == 3 and path[0] == 'actor' and path[1] == self._service._secret: # pylint: disable=protected-access
@ -118,10 +120,10 @@ class HTTPServerHandler(http.server.BaseHTTPRequestHandler):
self.process('post', params)
def log_error(self, format, *args): # pylint: disable=redefined-builtin
logger.error('ERROR ' + format % args)
logger.error(format, *args)
def log_message(self, format, *args): # pylint: disable=redefined-builtin
logger.info('INFO ' + format % args)
logger.debug(format, *args)
class HTTPServerThread(threading.Thread):
_server: typing.Optional[http.server.HTTPServer]

View File

@ -30,6 +30,7 @@
'''
import sys
from .. import rest
from ..log import logger
from .service import UDSActorSvc
@ -42,19 +43,18 @@ def run() -> None:
if len(sys.argv) == 3 and sys.argv[1] in ('login', 'logout'):
logger.debug('Running client udsactor')
# client = None
# try:
# client = ipc.ClientIPC(IPC_PORT)
# if 'login' == sys.argv[1]:
# client.sendLogin(sys.argv[2])
# sys.exit(0)
# elif 'logout' == sys.argv[1]:
# client.sendLogout(sys.argv[2])
# sys.exit(0)
# else:
# usage()
# except Exception as e:
# logger.error(e)
try:
client: rest.UDSClientApi = rest.UDSClientApi()
if sys.argv[1] == 'login':
r = client.login(sys.argv[2])
print('{},{},{},{}\n'.format(r.ip, r.hostname, r.max_idle, r.dead_line or ''))
elif sys.argv[1] == 'logout':
client.logout(sys.argv[2])
else:
usage()
except Exception as e:
logger.exception()
logger.error('Got exception while processing command: %s', e)
sys.exit(0)
elif len(sys.argv) != 2:
usage()

View File

@ -77,14 +77,15 @@ class Logger:
if level < self.logLevel: # Skip not wanted messages
return
msg = message % args
# If remote logger is available, notify message to it
try:
if self.remoteLogger:
self.remoteLogger.log(self.own_token, level, message % args)
self.remoteLogger.log(self.own_token, level, msg)
except Exception as e:
self.localLogger.log(FATAL, 'Error notifying log to broker: {}'.format(e))
self.localLogger.log(level, message)
self.localLogger.log(level, msg)
def debug(self, message: str, *args) -> None:
self.log(DEBUG, message, *args)

View File

@ -88,6 +88,9 @@ class UDSApi: # pylint: disable=too-few-public-methods
def _headers(self) -> typing.MutableMapping[str, str]:
return {'content-type': 'application/json'}
def _apiURL(self, method: str) -> str:
raise NotImplementedError
def _doPost(
self,
method: str, # i.e. 'initialize', 'ready', ....
@ -96,7 +99,7 @@ class UDSApi: # pylint: disable=too-few-public-methods
) -> typing.Any:
headers = headers or self._headers
try:
result = requests.post(self._url + 'actor/v2/' + method, data=json.dumps(payLoad), headers=headers, verify=self._validateCert)
result = requests.post(self._apiURL(method), data=json.dumps(payLoad), headers=headers, verify=self._validateCert)
if result.ok:
return result.json()['result']
except requests.ConnectionError as e:
@ -104,13 +107,21 @@ class UDSApi: # pylint: disable=too-few-public-methods
except Exception as e:
pass
raise RESTError(result.content)
try:
data = result.json()
except Exception:
data = result.content.decode()
raise RESTError(data)
#
# UDS Broker API access
#
class UDSServerApi(UDSApi):
def _apiURL(self, method: str) -> str:
return self._url + 'actor/v3/' + method
def enumerateAuthenticators(self) -> typing.Iterable[types.AuthenticatorType]:
try:
result = requests.get(self._url + 'auth/auths', headers=self._headers, verify=self._validateCert, timeout=4)
@ -165,7 +176,7 @@ class UDSServerApi(UDSApi):
headers['X-Auth-Token'] = result.json()['token']
result = requests.post(self._url + 'actor/v2/register', data=json.dumps(data), headers=headers, verify=self._validateCert)
result = requests.post(self._apiURL('register'), data=json.dumps(data), headers=headers, verify=self._validateCert)
if result.ok:
return result.json()['result']
except requests.ConnectionError as e:
@ -263,6 +274,11 @@ class UDSServerApi(UDSApi):
class UDSClientApi(UDSApi):
def __init__(self) -> None:
super().__init__('127.0.0.1:{}'.format(LISTEN_PORT), False)
# Override base url
self._url = "https://{}/ui/".format(self._host)
def _apiURL(self, method: str) -> str:
return self._url + method
def register(self, callbackUrl: str) -> None:
payLoad = {

View File

@ -138,7 +138,7 @@ class CommonService: # pylint: disable=too-many-instance-attributes
while self._isAlive:
counter -= 1
try:
self._certificate = self._api.ready(self._cfg.own_token, self._secret, srvInterface.ip, self._cfg.port)
self._certificate = self._api.ready(self._cfg.own_token, self._secret, srvInterface.ip, rest.LISTEN_PORT)
except rest.RESTConnectionError as e:
logger.info('Error connecting with UDS Broker: %s', e)
self.doWait(5000)
@ -252,13 +252,13 @@ class CommonService: # pylint: disable=too-many-instance-attributes
return self.configureMachine()
def finish(self):
def finish(self) -> None:
if self._http:
self._http.stop()
self.notifyStop()
def checkIpsChanged(self):
def checkIpsChanged(self) -> None:
try:
if not self._cfg.own_token or not self._cfg.config or not self._cfg.config.unique_id:
# Not enouth data do check
@ -269,7 +269,7 @@ class CommonService: # pylint: disable=too-many-instance-attributes
if not new or not old:
raise Exception('No ip currently available for {}'.format(self._cfg.config.unique_id))
if old.ip != new.ip:
self._certificate = self._api.notifyIpChange(self._cfg.own_token, self._secret, new.ip, self._cfg.port)
self._certificate = self._api.notifyIpChange(self._cfg.own_token, self._secret, new.ip, rest.LISTEN_PORT)
# Now store new addresses & interfaces...
self._interfaces = currentInterfaces
logger.info('Ip changed from {} to {}. Notified to UDS'.format(old.ip, new.ip))

View File

@ -63,20 +63,20 @@ class BlockAccess(Exception):
# Helpers
def checkBlockedIp(ip: str)-> None:
cache = Cache('actorv2')
cache = Cache('actorv3')
fails = cache.get(ip) or 0
if fails > ALLOWED_FAILS:
logger.info('Access to actor from %s is blocked for %s seconds since last fail', ip, GlobalConfig.LOGIN_BLOCK.getInt())
raise BlockAccess()
def incFailedIp(ip: str) -> None:
cache = Cache('actorv2')
cache = Cache('actorv3')
fails = (cache.get(ip) or 0) + 1
cache.put(ip, fails, GlobalConfig.LOGIN_BLOCK.getInt())
class ActorV3Action(Handler):
authenticated = False # Actor requests are not authenticated normally
path = 'actor/v2'
path = 'actor/v3'
@staticmethod
def actorResult(result: typing.Any = None, error: typing.Optional[str] = None) -> typing.MutableMapping[str, typing.Any]:
@ -299,7 +299,6 @@ class Ready(ChangeIp):
return result
class Login(ActorV3Action):
"""
Notifies user logged id
@ -307,7 +306,7 @@ class Login(ActorV3Action):
name = 'login'
def action(self) -> typing.MutableMapping[str, typing.Any]:
logger.debug('Args: %s, Params: %s', self._args, self._params)
logger.debug('Login Args: %s, Params: %s', self._args, self._params)
userService = self.getUserService()
osManager = userService.getOsManagerInstance()
if osManager:
@ -336,7 +335,9 @@ class Logout(ActorV3Action):
osManager = userService.getOsManagerInstance()
if osManager:
osManager.loggedOut(userService, self._params.get('username') or '')
osManager.processUnused(userService)
if osManager.isRemovableOnLogout(userService):
logger.debug('Removable on logout: %s', osManager)
userService.remove()
return ActorV3Action.actorResult('ok')

View File

@ -157,6 +157,13 @@ class OSManager(Module):
This function can update userService values. Normal operation will be remove machines if this state is not valid
"""
def isRemovableOnLogout(self, userService: 'UserService') -> bool:
"""
If returns true, when actor notifies "logout", UDS will mark service for removal
can be overriden
"""
return True
def maxIdle(self) -> typing.Optional[int]:
"""
If os manager request "max idle", this method will return a value different to None so actors will get informed on Connection

View File

@ -67,17 +67,17 @@ class Cache:
def get(self, skey: typing.Union[str, bytes], defValue: typing.Any = None) -> typing.Any:
now: datetime = typing.cast(datetime, getSqlDatetime())
logger.debug('Requesting key "%s" for cache "%s"', skey, self._owner)
# logger.debug('Requesting key "%s" for cache "%s"', skey, self._owner)
try:
key = self.__getKey(skey)
logger.debug('Key: %s', key)
# logger.debug('Key: %s', key)
c: DBCache = DBCache.objects.get(pk=key) # @UndefinedVariable
# If expired
if now > c.created + timedelta(seconds=c.validity):
return defValue
try:
logger.debug('value: %s', c.value)
# logger.debug('value: %s', c.value)
val = pickle.loads(typing.cast(bytes, encoders.decode(c.value, 'base64')))
except Exception: # If invalid, simple do no tuse it
logger.exception('Invalid pickle from cache. Removing it.')
@ -88,11 +88,11 @@ class Cache:
return val
except DBCache.DoesNotExist: # @UndefinedVariable
Cache.misses += 1
logger.debug('key not found: %s', skey)
# logger.debug('key not found: %s', skey)
return defValue
except Exception as e:
except Exception:
Cache.misses += 1
logger.debug('Cache inaccesible: %s:%s', skey, e)
# logger.debug('Cache inaccesible: %s:%s', skey, e)
return defValue
def remove(self, skey: typing.Union[str, bytes]) -> bool:

View File

@ -94,7 +94,7 @@ class LinuxOsManager(osmanagers.OSManager):
def release(self, userService: 'UserService') -> None:
pass
def isRemovableOnLogout(self, userService: 'UserService'):
def isRemovableOnLogout(self, userService: 'UserService') -> bool:
'''
Says if a machine is removable on logout
'''

View File

@ -95,7 +95,7 @@ class WindowsOsManager(osmanagers.OSManager):
self.__setProcessUnusedMachines()
def isRemovableOnLogout(self, userService):
def isRemovableOnLogout(self, userService: 'UserService') -> bool:
'''
Says if a machine is removable on logout
'''