diff --git a/client-py3/full/src/UDSClient.py b/client-py3/full/src/UDSClient.py index 2a98a22b..270f6258 100755 --- a/client-py3/full/src/UDSClient.py +++ b/client-py3/full/src/UDSClient.py @@ -32,13 +32,13 @@ ''' import sys import webbrowser -import json -import base64, bz2 +import typing +import threading -from PyQt5 import QtCore,QtWidgets +from PyQt5 import QtCore, QtWidgets from PyQt5.QtCore import QSettings -from uds.rest import RestRequest +from uds.rest import RestApi, RetryException, InvalidVersion, UDSException # Just to ensure there are available on runtime from uds.forward import forward # type: ignore @@ -50,27 +50,21 @@ from uds import VERSION from UDSWindow import Ui_MainWindow -# Server before this version uses "unsigned" scripts -OLD_METHOD_VERSION = '2.4.0' - - -class RetryException(Exception): - pass class UDSClient(QtWidgets.QMainWindow): - ticket = None - scrambler = None + ticket: str = '' + scrambler: str = '' withError = False - animTimer = None - anim = 0 - animInverted = False - serverVersion = 'X.Y.Z' # Will be overwriten on getVersion - req = None + animTimer: typing.Optional[QtCore.QTimer] = None + anim: int = 0 + animInverted: bool = False + api: RestApi - def __init__(self): + def __init__(self, api: RestApi): QtWidgets.QMainWindow.__init__(self) + self.api = api self.setWindowFlags(QtCore.Qt.FramelessWindowHint | QtCore.Qt.WindowStaysOnTopHint) # type: ignore self.ui = Ui_MainWindow() @@ -98,17 +92,6 @@ class UDSClient(QtWidgets.QMainWindow): def closeWindow(self): self.close() - def processError(self, data): - if 'error' in data: - # QtWidgets.QMessageBox.critical(self, 'Request error {}'.format(data.get('retryable', '0')), data['error'], QtWidgets.QMessageBox.Ok) - if data.get('retryable', '0') == '1': - raise RetryException(data['error']) - - raise Exception(data['error']) - # QtWidgets.QMessageBox.critical(self, 'Request error', rest.data['error'], QtWidgets.QMessageBox.Ok) - # self.closeWindow() - # return - def showError(self, error): logger.error('got error: %s', error) self.stopAnim() @@ -143,115 +126,40 @@ class UDSClient(QtWidgets.QMainWindow): self.animTimer.stop() def getVersion(self): - req = RestRequest('', msgFunction=self._sslError) - data = req.get() - try: - self.processError(data) - self.ui.info.setText('Processing...') - - if data['result']['requiredVersion'] > VERSION: - QtWidgets.QMessageBox.critical( - self, - 'Upgrade required', - 'A newer connector version is required.\nA browser will be opened to download it.', - QtWidgets.QMessageBox.Ok, - ) - webbrowser.open(data['result']['downloadUrl']) - self.closeWindow() - return - - self.serverVersion = data['result']['requiredVersion'] - # Now load transport data... - self.getTransportData() - - except RetryException as e: - self.ui.info.setText(str(e)) - QtCore.QTimer.singleShot(1000, self.getVersion) - + self.api.getVersion() + except InvalidVersion as e: + webbrowser.open(e.downloadUrl) + self.closeWindow() + return except Exception as e: self.showError(e) + self.getTransportData() + def getTransportData(self): try: - req = RestRequest( - '/{}/{}'.format(self.ticket, self.scrambler), - msgFunction=self._sslError, - params={'hostname': tools.getHostName(), 'version': VERSION}, - ) - data = req.get() - except Exception as e: - logger.exception('Got exception on getTransportData') - raise e - - logger.debug('Transport data received') - try: - self.processError(data) - - params = None - - if self.serverVersion <= OLD_METHOD_VERSION: - script = bz2.decompress(base64.b64decode(data['result'])) - # This fixes uds 2.2 "write" string on binary streams on some transport - script = script.replace(b'stdin.write("', b'stdin.write(b"') - script = script.replace(b'version)', b'version.decode("utf-8"))') - else: - res = data['result'] - # We have three elements on result: - # * Script - # * Signature - # * Script data - # We test that the Script has correct signature, and them execute it with the parameters - # script, signature, params = res['script'].decode('base64').decode('bz2'), res['signature'], json.loads(res['params'].decode('base64').decode('bz2')) - script, signature, params = ( - bz2.decompress(base64.b64decode(res['script'])), - res['signature'], - json.loads(bz2.decompress(base64.b64decode(res['params']))), - ) - if tools.verifySignature(script, signature) is False: - logger.error('Signature is invalid') - - raise Exception( - 'Invalid UDS code signature. Please, report to administrator' - ) - + script, params = self.api.getScriptAndParams(self.ticket, self.scrambler) self.stopAnim() if 'darwin' in sys.platform: self.showMinimized() - QtCore.QTimer.singleShot(3000, self.endScript) - self.hide() + # Execute the waiting task... + threading.Thread(target=endScript).start() - exec(script.decode("utf-8"), globals(), {'parent': self, 'sp': params}) + # QtCore.QTimer.singleShot(3000, self.endScript) + # self.hide() + self.closeWindow() + exec(script, globals(), {'parent': self, 'sp': params}) except RetryException as e: self.ui.info.setText(str(e) + ', retrying access...') # Retry operation in ten seconds QtCore.QTimer.singleShot(10000, self.getTransportData) - except Exception as e: - # logger.exception('Got exception executing script:') - self.showError(e) - - def endScript(self): - # After running script, wait for stuff - try: - tools.waitForTasks() - except Exception: - pass - - try: - tools.unlinkFiles() - except Exception: - pass - - try: - tools.execBeforeExit() - except Exception: - pass - - self.closeWindow() + logger.exception('Got exception on getTransportData') + raise e def start(self): """ @@ -260,19 +168,22 @@ class UDSClient(QtWidgets.QMainWindow): self.ui.info.setText('Initializing...') QtCore.QTimer.singleShot(100, self.getVersion) - def _sslError(self, hostname, serial): - settings = QSettings() - settings.beginGroup('ssl') +def endScript(): + # After running script, wait for stuff + try: + tools.waitForTasks() + except Exception: + pass - approved = settings.value(serial, False) - - if approved or QtWidgets.QMessageBox.warning(self, 'SSL Warning', errorString, QMessageBox.Yes | QMessageBox.No) == QMessageBox.Yes: # type: ignore - approved = True - settings.setValue(serial, True) - - settings.endGroup() - return approved + try: + tools.unlinkFiles() + except Exception: + pass + try: + tools.execBeforeExit() + except Exception: + pass def done(data) -> None: @@ -281,7 +192,7 @@ def done(data) -> None: # Ask user to approve endpoint -def approveHost(hostName, parentWindow=None): +def approveHost(hostName: str): settings = QtCore.QSettings() settings.beginGroup('endpoints') @@ -293,9 +204,37 @@ def approveHost(hostName, parentWindow=None): '

Only approve UDS servers that you trust to avoid security issues.

' ) - if approved or QtWidgets.QMessageBox.warning(parentWindow, 'ACCESS Warning', errorString, QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No) == QtWidgets.QMessageBox.Yes: # type: ignore - settings.setValue(hostName, True) + if not approved: + if QtWidgets.QMessageBox.warning( + None, # type: ignore + 'ACCESS Warning', + errorString, + QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No # type: ignore + ) == QtWidgets.QMessageBox.Yes: + settings.setValue(hostName, True) + approved = True + + settings.endGroup() + return approved + +def sslError(hostname: str, serial): + settings = QSettings() + settings.beginGroup('ssl') + + approved = settings.value(serial, False) + + if ( + approved + or QtWidgets.QMessageBox.warning( + None, # type: ignore + 'SSL Warning', + f'Could not check sll certificate for {hostname}', + QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No, # type: ignore + ) + == QtWidgets.QMessageBox.Yes + ): approved = True + settings.setValue(serial, True) settings.endGroup() return approved @@ -346,9 +285,9 @@ if __name__ == "__main__": sys.exit(1) # Setup REST api endpoint - RestRequest.restApiUrl = '{}://{}/uds/rest/client'.format(['http', 'https'][ssl], host) - logger.debug('Setting request URL to %s', RestRequest.restApiUrl) - # RestRequest.restApiUrl = 'https://172.27.0.1/rest/client' + api = RestApi('{}://{}/uds/rest/client'.format( + ['http', 'https'][ssl], host + ), sslError) try: logger.debug('Starting execution') @@ -357,7 +296,7 @@ if __name__ == "__main__": if approveHost(host) is False: raise Exception('Host {} was not approved'.format(host)) - win = UDSClient() + win = UDSClient(api) win.show() win.start() diff --git a/client-py3/full/src/uds/rest.py b/client-py3/full/src/uds/rest.py index 7535ca3e..1f8b7701 100644 --- a/client-py3/full/src/uds/rest.py +++ b/client-py3/full/src/uds/rest.py @@ -32,6 +32,8 @@ # pylint: disable=c-extension-no-member,no-name-in-module import json +import bz2 +import base64 import urllib import urllib.parse import urllib.request @@ -40,101 +42,196 @@ import ssl import socket import typing +import certifi from cryptography import x509 from cryptography.hazmat.backends import default_backend from . import osDetector +from . import tools from . import VERSION +from .log import logger + +# Server before this version uses "unsigned" scripts +OLD_METHOD_VERSION = '2.4.0' # Callback for error on cert # parameters are hostname, serial # If returns True, ignores error CertCallbackType = typing.Callable[[str, str], bool] +# Exceptions +class UDSException(Exception): + pass -def _open( - url: str, certErrorCallback: typing.Optional[CertCallbackType] = None -) -> typing.Any: - ctx = ssl.create_default_context() - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE - hostname = urllib.parse.urlparse(url)[1] - serial = '' +class RetryException(UDSException): + pass - if url.startswith('https'): - with ctx.wrap_socket( - socket.socket(socket.AF_INET, socket.SOCK_STREAM), server_hostname=hostname - ) as s: - s.connect((hostname, 443)) - # Get binary certificate - binCert = s.getpeercert(True) - if binCert: - cert = x509.load_der_x509_certificate(binCert, default_backend()) - else: - raise Exception('Certificate not found!') +class InvalidVersion(UDSException): + downloadUrl: str - serial = hex(cert.serial_number)[2:] + def __init__(self, downloadUrl: str) -> None: + super().__init__(downloadUrl) + self.downloadUrl = downloadUrl - response = None - ctx.check_hostname = True - ctx.verify_mode = ssl.CERT_REQUIRED +class RestApi: - def urlopen(url: str): - # Generate the request with the headers - req = urllib.request.Request(url, headers={ - 'User-Agent': osDetector.getOs() + " - UDS Connector " + VERSION - }) - return urllib.request.urlopen(req, context=ctx) - - try: - response = urlopen(url) - except urllib.error.URLError as e: - if isinstance(e.reason, ssl.SSLCertVerificationError): - # Ask about invalid certificate - if certErrorCallback: - if certErrorCallback(hostname, serial): - ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE - response = urlopen(url) - else: - raise - else: - raise - - return response - - -def getUrl( - url: str, certErrorCallback: typing.Optional[CertCallbackType] = None -) -> bytes: - with _open(url, certErrorCallback) as response: - resp = response.read() - - return resp - - -class RestRequest: - - restApiUrl: typing.ClassVar[str] = '' # base Rest API URL - _msgFunction: typing.Optional[CertCallbackType] - _url: str + _restApiUrl: str # base Rest API URL + _callbackInvalidCert: typing.Optional[CertCallbackType] + _serverVersion: str def __init__( self, - url, - msgFunction: typing.Optional[CertCallbackType] = None, - params: typing.Optional[typing.Mapping[str, str]] = None, + restApiUrl, + callbackInvalidCert: typing.Optional[CertCallbackType] = None, ) -> None: # parent not used - self._msgFunction = msgFunction + logger.debug('Setting request URL to %s', restApiUrl) + self._restApiUrl = restApiUrl + self._callbackInvalidCert = callbackInvalidCert + self._serverVersion = '' + + def get(self, url: str, params: typing.Optional[typing.Mapping[str, str]] = None) -> typing.Any: if params: url += '?' + '&'.join( '{}={}'.format(k, urllib.parse.quote(str(v).encode('utf8'))) for k, v in params.items() ) - self._url = RestRequest.restApiUrl + url + return json.loads(RestApi.getUrl(self._restApiUrl + url, self._callbackInvalidCert)) - def get(self) -> typing.Any: - return json.loads(getUrl(self._url, self._msgFunction)) + def processError(self, data: typing.Any) -> None: + if 'error' in data: + if data.get('retryable', '0') == '1': + raise RetryException(data['error']) + raise UDSException(data['error']) + + + def getVersion(self) -> str: + '''Gets and stores the serverVersion. + Also checks that the version is valid for us. If not, + will raise an "InvalidVersion' exception''' + + downloadUrl = '' + if not self._serverVersion: + data = self.get('') + self.processError(data) + self._serverVersion = data['result']['requiredVersion'] + downloadUrl = data['result']['downloadUrl'] + + try: + if self._serverVersion > VERSION: + raise InvalidVersion(downloadUrl) + + return self._serverVersion + except Exception as e: + raise UDSException(e) + + def getScriptAndParams(self, ticket: str, scrambler: str) -> typing.Tuple[str, typing.Any]: + '''Gets the transport script, validates it if necesary + and returns it''' + try: + data = self.get( + '/{}/{}'.format(ticket, scrambler), + params={'hostname': tools.getHostName(), 'version': VERSION}, + ) + except Exception as e: + logger.exception('Got exception on getTransportData') + raise e + + logger.debug('Transport data received') + self.processError(data) + + params = None + + if self._serverVersion <= OLD_METHOD_VERSION: + script = bz2.decompress(base64.b64decode(data['result'])) + # This fixes uds 2.2 "write" string on binary streams on some transport + script = script.replace(b'stdin.write("', b'stdin.write(b"') + script = script.replace(b'version)', b'version.decode("utf-8"))') + else: + res = data['result'] + # We have three elements on result: + # * Script + # * Signature + # * Script data + # We test that the Script has correct signature, and them execute it with the parameters + # script, signature, params = res['script'].decode('base64').decode('bz2'), res['signature'], json.loads(res['params'].decode('base64').decode('bz2')) + script, signature, params = ( + bz2.decompress(base64.b64decode(res['script'])), + res['signature'], + json.loads(bz2.decompress(base64.b64decode(res['params']))), + ) + if tools.verifySignature(script, signature) is False: + logger.error('Signature is invalid') + + raise Exception( + 'Invalid UDS code signature. Please, report to administrator' + ) + + return script.decode(), params + + # exec(script.decode("utf-8"), globals(), {'parent': self, 'sp': params}) + + + @staticmethod + def _open( + url: str, certErrorCallback: typing.Optional[CertCallbackType] = None + ) -> typing.Any: + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + ctx.load_verify_locations(certifi.where()) + hostname = urllib.parse.urlparse(url)[1] + serial = '' + + if url.startswith('https'): + with ctx.wrap_socket( + socket.socket(socket.AF_INET, socket.SOCK_STREAM), server_hostname=hostname + ) as s: + s.connect((hostname, 443)) + # Get binary certificate + binCert = s.getpeercert(True) + if binCert: + cert = x509.load_der_x509_certificate(binCert, default_backend()) + else: + raise Exception('Certificate not found!') + + serial = hex(cert.serial_number)[2:] + + response = None + ctx.check_hostname = True + ctx.verify_mode = ssl.CERT_REQUIRED + + def urlopen(url: str): + # Generate the request with the headers + req = urllib.request.Request(url, headers={ + 'User-Agent': osDetector.getOs() + " - UDS Connector " + VERSION + }) + return urllib.request.urlopen(req, context=ctx) + + try: + response = urlopen(url) + except urllib.error.URLError as e: + if isinstance(e.reason, ssl.SSLCertVerificationError): + # Ask about invalid certificate + if certErrorCallback: + if certErrorCallback(hostname, serial): + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + response = urlopen(url) + else: + raise + else: + raise + + return response + + @staticmethod + def getUrl( + url: str, certErrorCallback: typing.Optional[CertCallbackType] = None + ) -> bytes: + with RestApi._open(url, certErrorCallback) as response: + resp = response.read() + + return resp