forked from shaba/openuds
Changed UDSClient to remove QApp Network access and used urllib instead
This commit is contained in:
parent
21f811d995
commit
3ed3f03d25
@ -35,11 +35,15 @@ import webbrowser
|
||||
import json
|
||||
import base64, bz2
|
||||
|
||||
from PyQt5 import QtCore, QtGui, QtWidgets # @UnresolvedImport
|
||||
from PyQt5 import QtCore,QtWidgets
|
||||
from PyQt5.QtCore import QSettings
|
||||
|
||||
from uds.rest import RestRequest
|
||||
from uds.forward import forward # pylint: disable=unused-import
|
||||
from uds.tunnel import forward as f2 # pylint: disable=unused-import
|
||||
|
||||
# Just to ensure there are available on runtime
|
||||
from uds.forward import forward # type: ignore
|
||||
from uds.tunnel import forward as f2 # type: ignore
|
||||
|
||||
from uds.log import logger
|
||||
from uds import tools
|
||||
from uds import VERSION
|
||||
@ -139,10 +143,9 @@ class UDSClient(QtWidgets.QMainWindow):
|
||||
self.animTimer.stop()
|
||||
|
||||
def getVersion(self):
|
||||
self.req = RestRequest('', self, self.version)
|
||||
self.req.get()
|
||||
req = RestRequest('', msgFunction=self._sslError)
|
||||
data = req.get()
|
||||
|
||||
def version(self, data):
|
||||
try:
|
||||
self.processError(data)
|
||||
self.ui.info.setText('Processing...')
|
||||
@ -159,6 +162,7 @@ class UDSClient(QtWidgets.QMainWindow):
|
||||
return
|
||||
|
||||
self.serverVersion = data['result']['requiredVersion']
|
||||
# Now load transport data...
|
||||
self.getTransportData()
|
||||
|
||||
except RetryException as e:
|
||||
@ -170,18 +174,16 @@ class UDSClient(QtWidgets.QMainWindow):
|
||||
|
||||
def getTransportData(self):
|
||||
try:
|
||||
self.req = RestRequest(
|
||||
req = RestRequest(
|
||||
'/{}/{}'.format(self.ticket, self.scrambler),
|
||||
self,
|
||||
self.transportDataReceived,
|
||||
msgFunction=self._sslError,
|
||||
params={'hostname': tools.getHostName(), 'version': VERSION},
|
||||
)
|
||||
self.req.get()
|
||||
data = req.get()
|
||||
except Exception as e:
|
||||
logger.exception('Got exception on getTransportData')
|
||||
raise e
|
||||
|
||||
def transportDataReceived(self, data):
|
||||
logger.debug('Transport data received')
|
||||
try:
|
||||
self.processError(data)
|
||||
@ -258,6 +260,20 @@ class UDSClient(QtWidgets.QMainWindow):
|
||||
self.ui.info.setText('Initializing...')
|
||||
QtCore.QTimer.singleShot(100, self.getVersion)
|
||||
|
||||
def _sslError(self, hostname, serial):
|
||||
settings = QSettings()
|
||||
settings.beginGroup('ssl')
|
||||
|
||||
approved = settings.value(serial, False)
|
||||
|
||||
if approved or QMessageBox.warning(self, 'SSL Warning', errorString, QMessageBox.Yes | QMessageBox.No) == QMessageBox.Yes: # type: ignore
|
||||
approved = True
|
||||
settings.setValue(serial, True)
|
||||
|
||||
settings.endGroup()
|
||||
return approved
|
||||
|
||||
|
||||
|
||||
def done(data) -> None:
|
||||
QtWidgets.QMessageBox.critical(None, 'Notice', str(data.data), QtWidgets.QMessageBox.Ok) # type: ignore
|
||||
|
@ -32,93 +32,109 @@
|
||||
# pylint: disable=c-extension-no-member,no-name-in-module
|
||||
|
||||
import json
|
||||
import os
|
||||
import urllib
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
import ssl
|
||||
import socket
|
||||
import typing
|
||||
|
||||
from PyQt5.QtCore import pyqtSignal
|
||||
from PyQt5.QtCore import QObject, QUrl, QSettings
|
||||
from PyQt5.QtCore import Qt
|
||||
from PyQt5.QtNetwork import QNetworkAccessManager, QNetworkRequest, QNetworkReply, QSslCertificate
|
||||
from PyQt5.QtWidgets import QMessageBox
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
|
||||
from . import osDetector
|
||||
|
||||
from . import VERSION
|
||||
|
||||
# Callback for error on cert
|
||||
# parameters are hostname, serial
|
||||
# If returns True, ignores error
|
||||
CertCallbackType = typing.Callable[[str, str], bool]
|
||||
|
||||
|
||||
class RestRequest(QObject):
|
||||
def _open(
|
||||
url: str, certErrorCallback: typing.Optional[CertCallbackType] = None
|
||||
) -> typing.Any:
|
||||
ctx = ssl.create_default_context()
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
hostname = urllib.parse.urlparse(url)[1]
|
||||
serial = ''
|
||||
|
||||
restApiUrl = '' #
|
||||
if url.startswith('https'):
|
||||
with ctx.wrap_socket(
|
||||
socket.socket(socket.AF_INET, socket.SOCK_STREAM), server_hostname=hostname
|
||||
) as s:
|
||||
s.connect((hostname, 443))
|
||||
# Get binary certificate
|
||||
binCert = s.getpeercert(True)
|
||||
if binCert:
|
||||
cert = x509.load_der_x509_certificate(binCert, default_backend())
|
||||
else:
|
||||
raise Exception('Certificate not found!')
|
||||
|
||||
done = pyqtSignal(dict, name='done')
|
||||
serial = hex(cert.serial_number)[2:]
|
||||
|
||||
response = None
|
||||
ctx.check_hostname = True
|
||||
ctx.verify_mode = ssl.CERT_REQUIRED
|
||||
|
||||
def urlopen(url: str):
|
||||
# Generate the request with the headers
|
||||
req = urllib.request.Request(url, headers={
|
||||
'User-Agent': osDetector.getOs() + " - UDS Connector " + VERSION
|
||||
})
|
||||
return urllib.request.urlopen(req, context=ctx)
|
||||
|
||||
def __init__(self, url, parentWindow, done, params=None): # parent not used
|
||||
super(RestRequest, self).__init__()
|
||||
# private
|
||||
self._manager = QNetworkAccessManager()
|
||||
try:
|
||||
if os.path.exists('/etc/ssl/certs/ca-certificates.crt'):
|
||||
pass
|
||||
# os.environ['REQUESTS_CA_BUNDLE'] = '/etc/ssl/certs/ca-certificates.crt'
|
||||
except Exception:
|
||||
pass
|
||||
response = urlopen(url)
|
||||
except urllib.error.URLError as e:
|
||||
if isinstance(e.reason, ssl.SSLCertVerificationError):
|
||||
# Ask about invalid certificate
|
||||
if certErrorCallback:
|
||||
if certErrorCallback(hostname, serial):
|
||||
ctx.check_hostname = False
|
||||
ctx.verify_mode = ssl.CERT_NONE
|
||||
response = urlopen(url)
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
raise
|
||||
|
||||
return response
|
||||
|
||||
|
||||
if params is not None:
|
||||
url += '?' + '&'.join('{}={}'.format(k, urllib.parse.quote(str(v).encode('utf8'))) for k, v in params.items())
|
||||
def getUrl(
|
||||
url: str, certErrorCallback: typing.Optional[CertCallbackType] = None
|
||||
) -> bytes:
|
||||
with _open(url, certErrorCallback) as response:
|
||||
resp = response.read()
|
||||
|
||||
self.url = QUrl(RestRequest.restApiUrl + url)
|
||||
return resp
|
||||
|
||||
# connect asynchronous result, when a request finishes
|
||||
self._manager.finished.connect(self._finished)
|
||||
self._manager.sslErrors.connect(self._sslError)
|
||||
self._parentWindow = parentWindow
|
||||
|
||||
self.done.connect(done, Qt.QueuedConnection) # type: ignore
|
||||
class RestRequest:
|
||||
|
||||
def _finished(self, reply):
|
||||
'''
|
||||
Handle signal 'finished'. A network request has finished.
|
||||
'''
|
||||
try:
|
||||
if reply.error() != QNetworkReply.NoError:
|
||||
raise Exception(reply.errorString())
|
||||
data = bytes(reply.readAll())
|
||||
data = json.loads(data)
|
||||
except Exception as e:
|
||||
data = {
|
||||
'result': None,
|
||||
'error': str(e)
|
||||
}
|
||||
restApiUrl: typing.ClassVar[str] = '' # base Rest API URL
|
||||
_msgFunction: typing.Optional[CertCallbackType]
|
||||
_url: str
|
||||
|
||||
self.done.emit(data) # type: ignore
|
||||
def __init__(
|
||||
self,
|
||||
url,
|
||||
msgFunction: typing.Optional[CertCallbackType] = None,
|
||||
params: typing.Optional[typing.Mapping[str, str]] = None,
|
||||
) -> None: # parent not used
|
||||
self._msgFunction = msgFunction
|
||||
|
||||
reply.deleteLater() # schedule for delete from main event loop
|
||||
if params:
|
||||
url += '?' + '&'.join(
|
||||
'{}={}'.format(k, urllib.parse.quote(str(v).encode('utf8')))
|
||||
for k, v in params.items()
|
||||
)
|
||||
|
||||
def _sslError(self, reply, errors):
|
||||
settings = QSettings()
|
||||
settings.beginGroup('ssl')
|
||||
cert = errors[0].certificate()
|
||||
digest = str(cert.digest().toHex())
|
||||
self._url = RestRequest.restApiUrl + url
|
||||
|
||||
approved = settings.value(digest, False)
|
||||
def get(self) -> typing.Any:
|
||||
return json.loads(getUrl(self._url, self._msgFunction))
|
||||
|
||||
errorString = '<p>The certificate for <b>{}</b> has the following errors:</p><ul>'.format(cert.subjectInfo(QSslCertificate.CommonName))
|
||||
|
||||
for err in errors:
|
||||
errorString += '<li>' + err.errorString() + '</li>'
|
||||
|
||||
errorString += '</ul>'
|
||||
|
||||
if approved or QMessageBox.warning(self._parentWindow, 'SSL Warning', errorString, QMessageBox.Yes | QMessageBox.No) == QMessageBox.Yes: # type: ignore
|
||||
settings.setValue(digest, True)
|
||||
reply.ignoreSslErrors()
|
||||
|
||||
settings.endGroup()
|
||||
|
||||
def get(self):
|
||||
request = QNetworkRequest(self.url)
|
||||
request.setRawHeader(b'User-Agent', osDetector.getOs().encode('utf-8') + b" - UDS Connector " + VERSION.encode('utf-8'))
|
||||
self._manager.get(request)
|
||||
|
@ -401,6 +401,10 @@ class gui:
|
||||
multiline = 8
|
||||
self._data['multiline'] = multiline
|
||||
|
||||
def cleanStr(self):
|
||||
return str(self.value).strip()
|
||||
|
||||
|
||||
class NumericField(InputField):
|
||||
"""
|
||||
This represents a numeric field. It apears with an spin up/down button.
|
||||
@ -535,6 +539,9 @@ class gui:
|
||||
super().__init__(**options)
|
||||
self._type(gui.InputField.PASSWORD_TYPE)
|
||||
|
||||
def cleanStr(self):
|
||||
return str(self.value).strip()
|
||||
|
||||
class HiddenField(InputField):
|
||||
"""
|
||||
This represents a hidden field. It is not displayed to the user. It use
|
||||
|
Loading…
Reference in New Issue
Block a user