Small spelling fixes

This commit is contained in:
Adolfo Gómez García 2021-07-19 01:16:18 +02:00
parent 91f90c8630
commit 51407b54ee
2 changed files with 99 additions and 74 deletions

View File

@ -43,35 +43,45 @@ from .version import VERSION
LISTEN_PORT = 43910
# Default timeout
TIMEOUT = 5 # 5 seconds is more than enought
TIMEOUT = 5 # 5 seconds is more than enought
# Constants
UNKNOWN = 'unknown'
class RESTError(Exception):
ERRCODE = 0
class RESTConnectionError(RESTError):
ERRCODE = -1
# Errors ""raised"" from broker
class RESTInvalidKeyError(RESTError):
ERRCODE = 1
class RESTUnmanagedHostError(RESTError):
ERRCODE = 2
class RESTUserServiceNotFoundError(RESTError):
ERRCODE = 3
class RESTOsManagerError(RESTError):
ERRCODE = 4
# For avoid proxy on localhost connections
NO_PROXY = {
'http': None,
'https': None,
}
UDS_BASE_URL = 'https://{}/uds/rest/'
#
# Basic UDS Api
#
@ -79,6 +89,7 @@ class UDSApi: # pylint: disable=too-few-public-methods
"""
Base for remote api accesses
"""
_host: str
_validateCert: bool
_url: str
@ -86,12 +97,12 @@ class UDSApi: # pylint: disable=too-few-public-methods
def __init__(self, host: str, validateCert: bool) -> None:
self._host = host
self._validateCert = validateCert
self._url = "https://{}/uds/rest/".format(self._host)
self._url = UDS_BASE_URL.format(self._host)
# Disable logging requests messages except for errors, ...
logging.getLogger("requests").setLevel(logging.CRITICAL)
logging.getLogger("urllib3").setLevel(logging.ERROR)
logging.getLogger('request').setLevel(logging.CRITICAL)
logging.getLogger('urllib3').setLevel(logging.ERROR)
try:
warnings.simplefilter("ignore") # Disables all warnings
warnings.simplefilter('ignore') # Disables all warnings
except Exception:
pass
@ -99,19 +110,19 @@ class UDSApi: # pylint: disable=too-few-public-methods
def _headers(self) -> typing.MutableMapping[str, str]:
return {
'Content-Type': 'application/json',
'User-Agent': 'UDS Actor v{}'.format(VERSION)
'User-Agent': 'UDS Actor v{}'.format(VERSION),
}
def _apiURL(self, method: str) -> str:
raise NotImplementedError
def _doPost(
self,
method: str, # i.e. 'initialize', 'ready', ....
payLoad: typing.MutableMapping[str, typing.Any],
headers: typing.Optional[typing.MutableMapping[str, str]] = None,
disableProxy: bool = False
) -> typing.Any:
self,
method: str, # i.e. 'initialize', 'ready', ....
payLoad: typing.MutableMapping[str, typing.Any],
headers: typing.Optional[typing.MutableMapping[str, str]] = None,
disableProxy: bool = False,
) -> typing.Any:
headers = headers or self._headers
try:
result = requests.post(
@ -120,7 +131,9 @@ class UDSApi: # pylint: disable=too-few-public-methods
headers=headers,
verify=self._validateCert,
timeout=TIMEOUT,
proxies=NO_PROXY if disableProxy else None # if not proxies wanted, enforce it
proxies=NO_PROXY
if disableProxy
else None, # if not proxies wanted, enforce it
)
if result.ok:
@ -139,6 +152,7 @@ class UDSApi: # pylint: disable=too-few-public-methods
raise RESTError(data)
#
# UDS Broker API access
#
@ -148,7 +162,12 @@ class UDSServerApi(UDSApi):
def enumerateAuthenticators(self) -> typing.Iterable[types.AuthenticatorType]:
try:
result = requests.get(self._url + 'auth/auths', headers=self._headers, verify=self._validateCert, timeout=4)
result = requests.get(
self._url + 'auth/auths',
headers=self._headers,
verify=self._validateCert,
timeout=4,
)
if result.ok:
for v in sorted(result.json(), key=lambda x: x['priority']):
yield types.AuthenticatorType(
@ -157,7 +176,7 @@ class UDSServerApi(UDSApi):
auth=v['auth'],
type=v['type'],
priority=v['priority'],
isCustom=v['isCustom']
isCustom=v['isCustom'],
)
except Exception:
pass
@ -173,7 +192,7 @@ class UDSServerApi(UDSApi):
preCommand: str,
runOnceCommand: str,
postCommand: str,
logLevel: int
logLevel: int,
) -> str:
"""
Raises an exception if could not register, or registers and returns the "authorization token"
@ -186,7 +205,7 @@ class UDSServerApi(UDSApi):
'pre_command': preCommand,
'run_once_command': runOnceCommand,
'post_command': postCommand,
'log_level': logLevel
'log_level': logLevel,
}
# First, try to login to REST api
@ -194,13 +213,23 @@ class UDSServerApi(UDSApi):
# First, try to login
authInfo = {'auth': auth, 'username': username, 'password': password}
headers = self._headers
result = requests.post(self._url + 'auth/login', data=json.dumps(authInfo), headers=headers, verify=self._validateCert)
result = requests.post(
self._url + 'auth/login',
data=json.dumps(authInfo),
headers=headers,
verify=self._validateCert,
)
if not result.ok or result.json()['result'] == 'error':
raise Exception() # Invalid credentials
headers['X-Auth-Token'] = result.json()['token']
result = requests.post(self._apiURL('register'), data=json.dumps(data), headers=headers, verify=self._validateCert)
result = requests.post(
self._apiURL('register'),
data=json.dumps(data),
headers=headers,
verify=self._validateCert,
)
if result.ok:
return result.json()['result']
except requests.ConnectionError as e:
@ -212,13 +241,18 @@ class UDSServerApi(UDSApi):
raise RESTError(result.content.decode())
def initialize(self, token: str, interfaces: typing.Iterable[types.InterfaceInfoType], actor_type: typing.Optional[str]) -> types.InitializationResultType:
def initialize(
self,
token: str,
interfaces: typing.Iterable[types.InterfaceInfoType],
actor_type: typing.Optional[str],
) -> types.InitializationResultType:
# Generate id list from netork cards
payload = {
'type': actor_type or types.MANAGED,
'token': token,
'version': VERSION,
'id': [{'mac': i.mac, 'ip': i.ip} for i in interfaces]
'id': [{'mac': i.mac, 'ip': i.ip} for i in interfaces],
}
r = self._doPost('initialize', payload)
os = r['os']
@ -232,53 +266,55 @@ class UDSServerApi(UDSApi):
password=os.get('password'),
new_password=os.get('new_password'),
ad=os.get('ad'),
ou=os.get('ou')
) if r['os'] else None
ou=os.get('ou'),
)
if r['os']
else None,
)
def ready(self, own_token: str, secret: str, ip: str, port: int) -> types.CertificateInfoType:
payload = {
'token': own_token,
'secret': secret,
'ip': ip,
'port': port
}
def ready(
self, own_token: str, secret: str, ip: str, port: int
) -> types.CertificateInfoType:
payload = {'token': own_token, 'secret': secret, 'ip': ip, 'port': port}
result = self._doPost('ready', payload)
return types.CertificateInfoType(
private_key=result['private_key'],
server_certificate=result['server_certificate'],
password=result['password']
password=result['password'],
)
def notifyIpChange(self, own_token: str, secret: str, ip: str, port: int) -> types.CertificateInfoType:
payload = {
'token': own_token,
'secret': secret,
'ip': ip,
'port': port
}
def notifyIpChange(
self, own_token: str, secret: str, ip: str, port: int
) -> types.CertificateInfoType:
payload = {'token': own_token, 'secret': secret, 'ip': ip, 'port': port}
result = self._doPost('ipchange', payload)
return types.CertificateInfoType(
private_key=result['private_key'],
server_certificate=result['server_certificate'],
password=result['password']
password=result['password'],
)
def notifyUnmanagedCallback(self, master_token: str, secret: str, interfaces: typing.Iterable[types.InterfaceInfoType], port: int) -> types.CertificateInfoType:
def notifyUnmanagedCallback(
self,
master_token: str,
secret: str,
interfaces: typing.Iterable[types.InterfaceInfoType],
port: int,
) -> types.CertificateInfoType:
payload = {
'id': [{'mac': i.mac, 'ip': i.ip} for i in interfaces],
'token': master_token,
'secret': secret,
'port': port
'port': port,
}
result = self._doPost('unmanaged', payload)
return types.CertificateInfoType(
private_key=result['private_key'],
server_certificate=result['server_certificate'],
password=result['password']
password=result['password'],
)
def login(
@ -288,14 +324,11 @@ class UDSServerApi(UDSApi):
username: str,
sessionType: str,
interfaces: typing.Iterable[types.InterfaceInfoType],
secret: typing.Optional[str]
secret: typing.Optional[str],
) -> types.LoginResultInfoType:
if not token:
return types.LoginResultInfoType(
ip='0.0.0.0',
hostname=UNKNOWN,
dead_line=None,
max_idle=None
ip='0.0.0.0', hostname=UNKNOWN, dead_line=None, max_idle=None
)
payload = {
'type': actor_type or types.MANAGED,
@ -310,7 +343,7 @@ class UDSServerApi(UDSApi):
ip=result['ip'],
hostname=result['hostname'],
dead_line=result['dead_line'],
max_idle=result['max_idle']
max_idle=result['max_idle'],
)
def logout(
@ -319,7 +352,7 @@ class UDSServerApi(UDSApi):
token: str,
username: str,
interfaces: typing.Iterable[types.InterfaceInfoType],
secret: typing.Optional[str]
secret: typing.Optional[str],
) -> None:
if not token:
return
@ -328,19 +361,14 @@ class UDSServerApi(UDSApi):
'id': [{'mac': i.mac, 'ip': i.ip} for i in interfaces],
'token': token,
'username': username,
'secret': secret or ''
'secret': secret or '',
}
self._doPost('logout', payload)
def log(self, own_token: str, level: int, message: str) -> None:
if not own_token:
return
payLoad = {
'token': own_token,
'level': level,
'message': message
}
payLoad = {'token': own_token, 'level': level, 'message': message}
self._doPost('log', payLoad) # Ignores result...
def test(self, master_token: str, actorType: typing.Optional[str]) -> bool:
@ -359,26 +387,25 @@ class UDSClientApi(UDSApi):
def _apiURL(self, method: str) -> str:
return self._url + method
def post(
self,
method: str, # i.e. 'initialize', 'ready', ....
payLoad: typing.MutableMapping[str, typing.Any]
) -> typing.Any:
self,
method: str, # i.e. 'initialize', 'ready', ....
payLoad: typing.MutableMapping[str, typing.Any],
) -> typing.Any:
return self._doPost(method=method, payLoad=payLoad, disableProxy=True)
def register(self, callbackUrl: str) -> None:
payLoad = {
'callback_url': callbackUrl
}
payLoad = {'callback_url': callbackUrl}
self.post('register', payLoad)
def unregister(self, callbackUrl: str) -> None:
payLoad = {
'callback_url': callbackUrl
}
payLoad = {'callback_url': callbackUrl}
self.post('unregister', payLoad)
def login(self, username: str, sessionType: typing.Optional[str] = None) -> types.LoginResultInfoType:
def login(
self, username: str, sessionType: typing.Optional[str] = None
) -> types.LoginResultInfoType:
payLoad = {
'username': username,
'session_type': sessionType or UNKNOWN,
@ -388,13 +415,11 @@ class UDSClientApi(UDSApi):
ip=result['ip'],
hostname=result['hostname'],
dead_line=result['dead_line'],
max_idle=result['max_idle']
max_idle=result['max_idle'],
)
def logout(self, username: str) -> None:
payLoad = {
'username': username
}
payLoad = {'username': username}
self.post('logout', payLoad)
def ping(self) -> bool:

View File

@ -32,7 +32,7 @@ var config struct {
IgnoreCertificates bool // If true, will ignore certificates (when requesting)
}
func validOrigin(w http.ResponseWriter, r *http.Request) error {
func validateOrigin(w http.ResponseWriter, r *http.Request) error {
ip := strings.Split(r.RemoteAddr, ":")[0]
for _, v := range config.AllowFrom {
if v == ip {
@ -46,7 +46,7 @@ func validOrigin(w http.ResponseWriter, r *http.Request) error {
// Test service
func testService(w http.ResponseWriter, r *http.Request) {
if validOrigin(w, r) != nil {
if validateOrigin(w, r) != nil {
return
}
@ -77,7 +77,7 @@ func testService(w http.ResponseWriter, r *http.Request) {
}
func proxyRequest(w http.ResponseWriter, r *http.Request) {
if validOrigin(w, r) != nil {
if validateOrigin(w, r) != nil {
return
}
log.Print("Proxy Request from ", r.RemoteAddr)