diff --git a/actor/src/actor_client.py b/actor/src/actor_client.py index 9edbe857..b797adba 100755 --- a/actor/src/actor_client.py +++ b/actor/src/actor_client.py @@ -29,33 +29,32 @@ ''' @author: Adolfo Gómez, dkmaster at dkmon dot com ''' -# pylint: disable=invalid-name import sys import os -import PyQt5 # pylint: disable=unused-import +import PyQt5 # noqa from PyQt5.QtCore import QTimer from PyQt5.QtWidgets import QMainWindow from udsactor.log import logger, INFO from udsactor.client import UDSClientQApp -from udsactor.platform import operations +from udsactor import platform if __name__ == "__main__": logger.setLevel(INFO) # Ensure idle operations is initialized on start - operations.initIdleDuration(0) + platform.operations.initIdleDuration(0) - if 'linux' in sys.platform: + if platform.is_linux: os.environ['QT_X11_NO_MITSHM'] = '1' UDSClientQApp.setQuitOnLastWindowClosed(False) qApp = UDSClientQApp(sys.argv) - if 'win' in sys.platform: - # The "hidden window" is only needed to process events on Windows + if platform.is_windows or platform.is_mac: + # The "hidden window" is not needed on linux # Not needed on Linux mw = QMainWindow() mw.showMinimized() # Start minimized, will be hidden (not destroyed) as soon as qApp.init is invoked diff --git a/actor/src/actor_service.py b/actor/src/actor_service.py index 9aa91b95..eb0982ae 100755 --- a/actor/src/actor_service.py +++ b/actor/src/actor_service.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- # -# Copyright (c) 2020 Virtual Cable S.L. +# Copyright (c) 2020-2022 Virtual Cable S.L.U. # All rights reserved. # # Redistribution and use in source and binary forms, with or without modification, @@ -12,7 +12,7 @@ # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. -# * Neither the name of Virtual Cable S.L. nor the names of its contributors +# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors # may be used to endorse or promote products derived from this software # without specific prior written permission. # @@ -29,12 +29,8 @@ ''' @author: Adolfo Gómez, dkmaster at dkmon dot com ''' -import sys +from udsactor import platform -if sys.platform == 'win32': - from udsactor.windows import runner -else: - from udsactor.linux import runner if __name__ == "__main__": - runner.run() + platform.runner.run() diff --git a/actor/src/udsactor/linux/runner.py b/actor/src/udsactor/linux/runner.py index 2e4ff3f7..1ef15fb4 100644 --- a/actor/src/udsactor/linux/runner.py +++ b/actor/src/udsactor/linux/runner.py @@ -50,7 +50,7 @@ def run() -> None: r = client.login(sys.argv[2], platform.operations.getSessionType()) print('{},{},{},{}\n'.format(r.ip, r.hostname, r.max_idle, r.dead_line or '')) elif sys.argv[1] == 'logout': - client.logout(sys.argv[2]) + client.logout(sys.argv[2], platform.operations.getSessionType()) except Exception as e: logger.exception() logger.error('Got exception while processing command: %s', e) diff --git a/actor/src/udsactor/macos/operations.py b/actor/src/udsactor/macos/operations.py index d083a6ab..2244297e 100644 --- a/actor/src/udsactor/macos/operations.py +++ b/actor/src/udsactor/macos/operations.py @@ -40,13 +40,17 @@ import typing import psutil -from .. import types +from udsactor import types, tools -MACVER_RE = re.compile(r"ProductVersion\s*(.*)", re.MULTILINE) +MACVER_RE = re.compile( + r"ProductVersion\s*(.*)", re.MULTILINE +) MACVER_FILE = '/System/Library/CoreServices/SystemVersion.plist' + def checkPermissions() -> bool: - return os.getuid() == 0 + return os.getuid() == 0 + def getComputerName() -> str: ''' @@ -54,6 +58,7 @@ def getComputerName() -> str: ''' return socket.gethostname().split('.')[0] + def getNetworkInfo() -> typing.Iterator[types.InterfaceInfoType]: ifdata: typing.List['psutil._common.snicaddr'] for ifname, ifdata in psutil.net_if_addrs().items(): @@ -65,17 +70,23 @@ def getNetworkInfo() -> typing.Iterator[types.InterfaceInfoType]: name = ifname elif row.family == socket.AF_LINK: mac = row.address - + # if all data is available, stop iterating if ip and name and mac: - if mac != '00:00:00:00:00:00' and mac and ip and ip.startswith('169.254') is False: # Skips local interfaces & interfaces with no dhcp IPs + if ( + mac != '00:00:00:00:00:00' + and mac + and ip + and ip.startswith('169.254') is False + ): # Skips local interfaces & interfaces with no dhcp IPs yield types.InterfaceInfoType(name=name, ip=ip, mac=mac) break - - + + def getDomainName() -> str: return '' + def getMacOs() -> str: try: with open(MACVER_FILE, 'r') as f: @@ -85,10 +96,11 @@ def getMacOs() -> str: return m.group(1) except Exception: # nosec: B110: ignore exception because we are not interested in it pass - + return 'unknown' -def reboot(flags: int = 0): + +def reboot(flags: int = 0) -> None: ''' Simple reboot using os command ''' @@ -99,7 +111,9 @@ def loggoff() -> None: ''' Right now restarts the machine... ''' - subprocess.run("/bin/launchctl bootout gui/$(id -u $USER)", shell=True) # nosec: Command line is fixed + subprocess.run( + "/bin/launchctl bootout gui/$(id -u $USER)", shell=True + ) # nosec: Command line is fixed # Ignores output, as it may fail if user is not logged in @@ -107,11 +121,14 @@ def renameComputer(newName: str) -> bool: ''' Changes the computer name Returns True if reboot needed + Note: For macOS, no configuration is supported, only "unmanaged" actor ''' return False -def joinDomain(domain: str, ou: str, account: str, password: str, executeInOneStep: bool = False): +def joinDomain( + domain: str, ou: str, account: str, password: str, executeInOneStep: bool = False +): pass @@ -122,11 +139,25 @@ def changeUserPassword(user: str, oldPassword: str, newPassword: str) -> None: def initIdleDuration(atLeastSeconds: int) -> None: pass - +# se we cache for 20 seconds the result, that is enough for our needs +# and we avoid calling a system command every time we need it +@tools.cache(20) def getIdleDuration() -> float: # Execute: try: - return int(next(filter(lambda x: b"HIDIdleTime" in x, subprocess.check_output(["ioreg", "-c", "IOHIDSystem"]).split(b"\n"))).split(b"=")[1]) / 1000000000 + return ( + int( + next( + filter( + lambda x: b"HIDIdleTime" in x, + subprocess.check_output( + ["/usr/sbin/ioreg", "-c", "IOHIDSystem"] + ).split(b"\n"), + ) + ).split(b"=")[1] + ) + / 1000000000 + ) # nosec: Command line is fixed except Exception: # nosec: B110: ignore exception because we are not interested in it return 0 @@ -137,14 +168,13 @@ def getCurrentUser() -> str: ''' return os.getlogin() + def getSessionType() -> str: ''' - Known values: - * Unknown -> No XDG_SESSION_TYPE environment variable - * xrdp --> xrdp session - * other types + Returns the session type. Currently, only "macos" (console) is supported ''' return 'macos' + def forceTimeSync() -> None: return diff --git a/actor/src/udsactor/macos/runner.py b/actor/src/udsactor/macos/runner.py index e201f479..f16647ae 100644 --- a/actor/src/udsactor/macos/runner.py +++ b/actor/src/udsactor/macos/runner.py @@ -29,13 +29,14 @@ @author: Adolfo Gómez, dkmaster at dkmon dot com ''' import sys +import typing from .. import rest from .. import platform from ..log import logger from .service import UDSActorSvc -def usage(): +def usage() -> typing.NoReturn: sys.stderr.write('usage: udsactor start|login "username"|logout "username"\n') sys.exit(2) @@ -50,7 +51,7 @@ def run() -> None: r = client.login(sys.argv[2], platform.operations.getSessionType()) print('{},{},{},{}\n'.format(r.ip, r.hostname, r.max_idle, r.dead_line or '')) elif sys.argv[1] == 'logout': - client.logout(sys.argv[2]) + client.logout(sys.argv[2], platform.operations.getSessionType()) except Exception as e: logger.exception() logger.error('Got exception while processing command: %s', e) diff --git a/actor/src/udsactor/platform.py b/actor/src/udsactor/platform.py index 2c8591bf..576ce3a7 100644 --- a/actor/src/udsactor/platform.py +++ b/actor/src/udsactor/platform.py @@ -31,9 +31,15 @@ import sys name = sys.platform +is_windows = is_linux = is_mac = False if sys.platform == 'win32': - from .windows import operations, store + from .windows import operations, store, runner + is_windows = True elif sys.platform == 'darwin': - from .macos import operations, store + from .macos import operations, store, runner + is_mac = True +elif sys.platform == 'linux': + from .linux import operations, store, runner + is_linux = True else: - from .linux import operations, store + raise Exception('Unsupported platform: {0}'.format(sys.platform)) diff --git a/actor/src/udsactor/tools.py b/actor/src/udsactor/tools.py index 397b0943..c33b618c 100644 --- a/actor/src/udsactor/tools.py +++ b/actor/src/udsactor/tools.py @@ -30,14 +30,43 @@ ''' import threading import ipaddress +import time import typing +import functools if typing.TYPE_CHECKING: from udsactor.types import InterfaceInfoType +# Simple cache for n seconds (default = 30) decorator +def cache(seconds: int = 30) -> typing.Callable: + ''' + Simple cache for n seconds (default = 30) decorator + ''' + def decorator(func) -> typing.Callable: + @functools.wraps(func) + def wrapper(*args, **kwargs) -> typing.Any: + if not hasattr(wrapper, 'cache'): + wrapper.cache = {} # type: ignore + cache = wrapper.cache # type: ignore + # Compose a key for the cache + key = '{}:{}'.format(args, kwargs) + if key in cache: + if time.time() - cache[key][0] < seconds: + return cache[key][1] + + # Call the function + result = func(*args, **kwargs) + cache[key] = (time.time(), result) + return result + + return wrapper + + return decorator + + +# Simple sub-script exectution thread class ScriptExecutorThread(threading.Thread): - def __init__(self, script: str) -> None: super(ScriptExecutorThread, self).__init__() self.script = script @@ -47,22 +76,26 @@ class ScriptExecutorThread(threading.Thread): try: logger.debug('Executing script: {}'.format(self.script)) - exec(self.script, globals(), None) # nosec: exec is fine, it's a "trusted" script + exec( + self.script, globals(), None + ) # nosec: exec is fine, it's a "trusted" script except Exception as e: logger.error('Error executing script: {}'.format(e)) logger.exception() + class Singleton(type): ''' Metaclass for singleton pattern Usage: - + class MyClass(metaclass=Singleton): ... ''' + _instance: typing.Optional[typing.Any] - # We use __init__ so we customise the created class from this metaclass + # We use __init__ so we customise the created class from this metaclass def __init__(self, *args, **kwargs) -> None: self._instance = None super().__init__(*args, **kwargs) @@ -74,7 +107,9 @@ class Singleton(type): # Convert "X.X.X.X/X" to ipaddress.IPv4Network -def strToNoIPV4Network(net: typing.Optional[str]) -> typing.Optional[ipaddress.IPv4Network]: +def strToNoIPV4Network( + net: typing.Optional[str], +) -> typing.Optional[ipaddress.IPv4Network]: if not net: # Empty or None return None try: