Refactoring UDS Client to allow more possibilities

This commit is contained in:
Adolfo Gómez García 2021-06-19 14:45:51 +02:00
parent eed4bc5fb7
commit 58cfa779d1
2 changed files with 243 additions and 207 deletions

View File

@ -32,13 +32,13 @@
'''
import sys
import webbrowser
import json
import base64, bz2
import typing
import threading
from PyQt5 import QtCore,QtWidgets
from PyQt5 import QtCore, QtWidgets
from PyQt5.QtCore import QSettings
from uds.rest import RestRequest
from uds.rest import RestApi, RetryException, InvalidVersion, UDSException
# Just to ensure there are available on runtime
from uds.forward import forward # type: ignore
@ -50,27 +50,21 @@ from uds import VERSION
from UDSWindow import Ui_MainWindow
# Server before this version uses "unsigned" scripts
OLD_METHOD_VERSION = '2.4.0'
class RetryException(Exception):
pass
class UDSClient(QtWidgets.QMainWindow):
ticket = None
scrambler = None
ticket: str = ''
scrambler: str = ''
withError = False
animTimer = None
anim = 0
animInverted = False
serverVersion = 'X.Y.Z' # Will be overwriten on getVersion
req = None
animTimer: typing.Optional[QtCore.QTimer] = None
anim: int = 0
animInverted: bool = False
api: RestApi
def __init__(self):
def __init__(self, api: RestApi):
QtWidgets.QMainWindow.__init__(self)
self.api = api
self.setWindowFlags(QtCore.Qt.FramelessWindowHint | QtCore.Qt.WindowStaysOnTopHint) # type: ignore
self.ui = Ui_MainWindow()
@ -98,17 +92,6 @@ class UDSClient(QtWidgets.QMainWindow):
def closeWindow(self):
self.close()
def processError(self, data):
if 'error' in data:
# QtWidgets.QMessageBox.critical(self, 'Request error {}'.format(data.get('retryable', '0')), data['error'], QtWidgets.QMessageBox.Ok)
if data.get('retryable', '0') == '1':
raise RetryException(data['error'])
raise Exception(data['error'])
# QtWidgets.QMessageBox.critical(self, 'Request error', rest.data['error'], QtWidgets.QMessageBox.Ok)
# self.closeWindow()
# return
def showError(self, error):
logger.error('got error: %s', error)
self.stopAnim()
@ -143,115 +126,40 @@ class UDSClient(QtWidgets.QMainWindow):
self.animTimer.stop()
def getVersion(self):
req = RestRequest('', msgFunction=self._sslError)
data = req.get()
try:
self.processError(data)
self.ui.info.setText('Processing...')
if data['result']['requiredVersion'] > VERSION:
QtWidgets.QMessageBox.critical(
self,
'Upgrade required',
'A newer connector version is required.\nA browser will be opened to download it.',
QtWidgets.QMessageBox.Ok,
)
webbrowser.open(data['result']['downloadUrl'])
self.closeWindow()
return
self.serverVersion = data['result']['requiredVersion']
# Now load transport data...
self.getTransportData()
except RetryException as e:
self.ui.info.setText(str(e))
QtCore.QTimer.singleShot(1000, self.getVersion)
self.api.getVersion()
except InvalidVersion as e:
webbrowser.open(e.downloadUrl)
self.closeWindow()
return
except Exception as e:
self.showError(e)
self.getTransportData()
def getTransportData(self):
try:
req = RestRequest(
'/{}/{}'.format(self.ticket, self.scrambler),
msgFunction=self._sslError,
params={'hostname': tools.getHostName(), 'version': VERSION},
)
data = req.get()
except Exception as e:
logger.exception('Got exception on getTransportData')
raise e
logger.debug('Transport data received')
try:
self.processError(data)
params = None
if self.serverVersion <= OLD_METHOD_VERSION:
script = bz2.decompress(base64.b64decode(data['result']))
# This fixes uds 2.2 "write" string on binary streams on some transport
script = script.replace(b'stdin.write("', b'stdin.write(b"')
script = script.replace(b'version)', b'version.decode("utf-8"))')
else:
res = data['result']
# We have three elements on result:
# * Script
# * Signature
# * Script data
# We test that the Script has correct signature, and them execute it with the parameters
# script, signature, params = res['script'].decode('base64').decode('bz2'), res['signature'], json.loads(res['params'].decode('base64').decode('bz2'))
script, signature, params = (
bz2.decompress(base64.b64decode(res['script'])),
res['signature'],
json.loads(bz2.decompress(base64.b64decode(res['params']))),
)
if tools.verifySignature(script, signature) is False:
logger.error('Signature is invalid')
raise Exception(
'Invalid UDS code signature. Please, report to administrator'
)
script, params = self.api.getScriptAndParams(self.ticket, self.scrambler)
self.stopAnim()
if 'darwin' in sys.platform:
self.showMinimized()
QtCore.QTimer.singleShot(3000, self.endScript)
self.hide()
# Execute the waiting task...
threading.Thread(target=endScript).start()
exec(script.decode("utf-8"), globals(), {'parent': self, 'sp': params})
# QtCore.QTimer.singleShot(3000, self.endScript)
# self.hide()
self.closeWindow()
exec(script, globals(), {'parent': self, 'sp': params})
except RetryException as e:
self.ui.info.setText(str(e) + ', retrying access...')
# Retry operation in ten seconds
QtCore.QTimer.singleShot(10000, self.getTransportData)
except Exception as e:
# logger.exception('Got exception executing script:')
self.showError(e)
def endScript(self):
# After running script, wait for stuff
try:
tools.waitForTasks()
except Exception:
pass
try:
tools.unlinkFiles()
except Exception:
pass
try:
tools.execBeforeExit()
except Exception:
pass
self.closeWindow()
logger.exception('Got exception on getTransportData')
raise e
def start(self):
"""
@ -260,19 +168,22 @@ class UDSClient(QtWidgets.QMainWindow):
self.ui.info.setText('Initializing...')
QtCore.QTimer.singleShot(100, self.getVersion)
def _sslError(self, hostname, serial):
settings = QSettings()
settings.beginGroup('ssl')
def endScript():
# After running script, wait for stuff
try:
tools.waitForTasks()
except Exception:
pass
approved = settings.value(serial, False)
if approved or QtWidgets.QMessageBox.warning(self, 'SSL Warning', errorString, QMessageBox.Yes | QMessageBox.No) == QMessageBox.Yes: # type: ignore
approved = True
settings.setValue(serial, True)
settings.endGroup()
return approved
try:
tools.unlinkFiles()
except Exception:
pass
try:
tools.execBeforeExit()
except Exception:
pass
def done(data) -> None:
@ -281,7 +192,7 @@ def done(data) -> None:
# Ask user to approve endpoint
def approveHost(hostName, parentWindow=None):
def approveHost(hostName: str):
settings = QtCore.QSettings()
settings.beginGroup('endpoints')
@ -293,9 +204,37 @@ def approveHost(hostName, parentWindow=None):
'<p>Only approve UDS servers that you trust to avoid security issues.</p>'
)
if approved or QtWidgets.QMessageBox.warning(parentWindow, 'ACCESS Warning', errorString, QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No) == QtWidgets.QMessageBox.Yes: # type: ignore
settings.setValue(hostName, True)
if not approved:
if QtWidgets.QMessageBox.warning(
None, # type: ignore
'ACCESS Warning',
errorString,
QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No # type: ignore
) == QtWidgets.QMessageBox.Yes:
settings.setValue(hostName, True)
approved = True
settings.endGroup()
return approved
def sslError(hostname: str, serial):
settings = QSettings()
settings.beginGroup('ssl')
approved = settings.value(serial, False)
if (
approved
or QtWidgets.QMessageBox.warning(
None, # type: ignore
'SSL Warning',
f'Could not check sll certificate for {hostname}',
QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No, # type: ignore
)
== QtWidgets.QMessageBox.Yes
):
approved = True
settings.setValue(serial, True)
settings.endGroup()
return approved
@ -346,9 +285,9 @@ if __name__ == "__main__":
sys.exit(1)
# Setup REST api endpoint
RestRequest.restApiUrl = '{}://{}/uds/rest/client'.format(['http', 'https'][ssl], host)
logger.debug('Setting request URL to %s', RestRequest.restApiUrl)
# RestRequest.restApiUrl = 'https://172.27.0.1/rest/client'
api = RestApi('{}://{}/uds/rest/client'.format(
['http', 'https'][ssl], host
), sslError)
try:
logger.debug('Starting execution')
@ -357,7 +296,7 @@ if __name__ == "__main__":
if approveHost(host) is False:
raise Exception('Host {} was not approved'.format(host))
win = UDSClient()
win = UDSClient(api)
win.show()
win.start()

View File

@ -32,6 +32,8 @@
# pylint: disable=c-extension-no-member,no-name-in-module
import json
import bz2
import base64
import urllib
import urllib.parse
import urllib.request
@ -40,101 +42,196 @@ import ssl
import socket
import typing
import certifi
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from . import osDetector
from . import tools
from . import VERSION
from .log import logger
# Server before this version uses "unsigned" scripts
OLD_METHOD_VERSION = '2.4.0'
# Callback for error on cert
# parameters are hostname, serial
# If returns True, ignores error
CertCallbackType = typing.Callable[[str, str], bool]
# Exceptions
class UDSException(Exception):
pass
def _open(
url: str, certErrorCallback: typing.Optional[CertCallbackType] = None
) -> typing.Any:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
hostname = urllib.parse.urlparse(url)[1]
serial = ''
class RetryException(UDSException):
pass
if url.startswith('https'):
with ctx.wrap_socket(
socket.socket(socket.AF_INET, socket.SOCK_STREAM), server_hostname=hostname
) as s:
s.connect((hostname, 443))
# Get binary certificate
binCert = s.getpeercert(True)
if binCert:
cert = x509.load_der_x509_certificate(binCert, default_backend())
else:
raise Exception('Certificate not found!')
class InvalidVersion(UDSException):
downloadUrl: str
serial = hex(cert.serial_number)[2:]
def __init__(self, downloadUrl: str) -> None:
super().__init__(downloadUrl)
self.downloadUrl = downloadUrl
response = None
ctx.check_hostname = True
ctx.verify_mode = ssl.CERT_REQUIRED
class RestApi:
def urlopen(url: str):
# Generate the request with the headers
req = urllib.request.Request(url, headers={
'User-Agent': osDetector.getOs() + " - UDS Connector " + VERSION
})
return urllib.request.urlopen(req, context=ctx)
try:
response = urlopen(url)
except urllib.error.URLError as e:
if isinstance(e.reason, ssl.SSLCertVerificationError):
# Ask about invalid certificate
if certErrorCallback:
if certErrorCallback(hostname, serial):
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
response = urlopen(url)
else:
raise
else:
raise
return response
def getUrl(
url: str, certErrorCallback: typing.Optional[CertCallbackType] = None
) -> bytes:
with _open(url, certErrorCallback) as response:
resp = response.read()
return resp
class RestRequest:
restApiUrl: typing.ClassVar[str] = '' # base Rest API URL
_msgFunction: typing.Optional[CertCallbackType]
_url: str
_restApiUrl: str # base Rest API URL
_callbackInvalidCert: typing.Optional[CertCallbackType]
_serverVersion: str
def __init__(
self,
url,
msgFunction: typing.Optional[CertCallbackType] = None,
params: typing.Optional[typing.Mapping[str, str]] = None,
restApiUrl,
callbackInvalidCert: typing.Optional[CertCallbackType] = None,
) -> None: # parent not used
self._msgFunction = msgFunction
logger.debug('Setting request URL to %s', restApiUrl)
self._restApiUrl = restApiUrl
self._callbackInvalidCert = callbackInvalidCert
self._serverVersion = ''
def get(self, url: str, params: typing.Optional[typing.Mapping[str, str]] = None) -> typing.Any:
if params:
url += '?' + '&'.join(
'{}={}'.format(k, urllib.parse.quote(str(v).encode('utf8')))
for k, v in params.items()
)
self._url = RestRequest.restApiUrl + url
return json.loads(RestApi.getUrl(self._restApiUrl + url, self._callbackInvalidCert))
def get(self) -> typing.Any:
return json.loads(getUrl(self._url, self._msgFunction))
def processError(self, data: typing.Any) -> None:
if 'error' in data:
if data.get('retryable', '0') == '1':
raise RetryException(data['error'])
raise UDSException(data['error'])
def getVersion(self) -> str:
'''Gets and stores the serverVersion.
Also checks that the version is valid for us. If not,
will raise an "InvalidVersion' exception'''
downloadUrl = ''
if not self._serverVersion:
data = self.get('')
self.processError(data)
self._serverVersion = data['result']['requiredVersion']
downloadUrl = data['result']['downloadUrl']
try:
if self._serverVersion > VERSION:
raise InvalidVersion(downloadUrl)
return self._serverVersion
except Exception as e:
raise UDSException(e)
def getScriptAndParams(self, ticket: str, scrambler: str) -> typing.Tuple[str, typing.Any]:
'''Gets the transport script, validates it if necesary
and returns it'''
try:
data = self.get(
'/{}/{}'.format(ticket, scrambler),
params={'hostname': tools.getHostName(), 'version': VERSION},
)
except Exception as e:
logger.exception('Got exception on getTransportData')
raise e
logger.debug('Transport data received')
self.processError(data)
params = None
if self._serverVersion <= OLD_METHOD_VERSION:
script = bz2.decompress(base64.b64decode(data['result']))
# This fixes uds 2.2 "write" string on binary streams on some transport
script = script.replace(b'stdin.write("', b'stdin.write(b"')
script = script.replace(b'version)', b'version.decode("utf-8"))')
else:
res = data['result']
# We have three elements on result:
# * Script
# * Signature
# * Script data
# We test that the Script has correct signature, and them execute it with the parameters
# script, signature, params = res['script'].decode('base64').decode('bz2'), res['signature'], json.loads(res['params'].decode('base64').decode('bz2'))
script, signature, params = (
bz2.decompress(base64.b64decode(res['script'])),
res['signature'],
json.loads(bz2.decompress(base64.b64decode(res['params']))),
)
if tools.verifySignature(script, signature) is False:
logger.error('Signature is invalid')
raise Exception(
'Invalid UDS code signature. Please, report to administrator'
)
return script.decode(), params
# exec(script.decode("utf-8"), globals(), {'parent': self, 'sp': params})
@staticmethod
def _open(
url: str, certErrorCallback: typing.Optional[CertCallbackType] = None
) -> typing.Any:
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
ctx.load_verify_locations(certifi.where())
hostname = urllib.parse.urlparse(url)[1]
serial = ''
if url.startswith('https'):
with ctx.wrap_socket(
socket.socket(socket.AF_INET, socket.SOCK_STREAM), server_hostname=hostname
) as s:
s.connect((hostname, 443))
# Get binary certificate
binCert = s.getpeercert(True)
if binCert:
cert = x509.load_der_x509_certificate(binCert, default_backend())
else:
raise Exception('Certificate not found!')
serial = hex(cert.serial_number)[2:]
response = None
ctx.check_hostname = True
ctx.verify_mode = ssl.CERT_REQUIRED
def urlopen(url: str):
# Generate the request with the headers
req = urllib.request.Request(url, headers={
'User-Agent': osDetector.getOs() + " - UDS Connector " + VERSION
})
return urllib.request.urlopen(req, context=ctx)
try:
response = urlopen(url)
except urllib.error.URLError as e:
if isinstance(e.reason, ssl.SSLCertVerificationError):
# Ask about invalid certificate
if certErrorCallback:
if certErrorCallback(hostname, serial):
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
response = urlopen(url)
else:
raise
else:
raise
return response
@staticmethod
def getUrl(
url: str, certErrorCallback: typing.Optional[CertCallbackType] = None
) -> bytes:
with RestApi._open(url, certErrorCallback) as response:
resp = response.read()
return resp