added cache to checkIdle on macos

This commit is contained in:
Adolfo Gómez García 2022-08-12 22:49:19 +02:00
parent cb4b2184a4
commit 54ea57f330
7 changed files with 110 additions and 43 deletions

View File

@ -29,33 +29,32 @@
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
'''
# pylint: disable=invalid-name
import sys
import os
import PyQt5 # pylint: disable=unused-import
import PyQt5 # noqa
from PyQt5.QtCore import QTimer
from PyQt5.QtWidgets import QMainWindow
from udsactor.log import logger, INFO
from udsactor.client import UDSClientQApp
from udsactor.platform import operations
from udsactor import platform
if __name__ == "__main__":
logger.setLevel(INFO)
# Ensure idle operations is initialized on start
operations.initIdleDuration(0)
platform.operations.initIdleDuration(0)
if 'linux' in sys.platform:
if platform.is_linux:
os.environ['QT_X11_NO_MITSHM'] = '1'
UDSClientQApp.setQuitOnLastWindowClosed(False)
qApp = UDSClientQApp(sys.argv)
if 'win' in sys.platform:
# The "hidden window" is only needed to process events on Windows
if platform.is_windows or platform.is_mac:
# The "hidden window" is not needed on linux
# Not needed on Linux
mw = QMainWindow()
mw.showMinimized() # Start minimized, will be hidden (not destroyed) as soon as qApp.init is invoked

View File

@ -1,7 +1,7 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2020 Virtual Cable S.L.
# Copyright (c) 2020-2022 Virtual Cable S.L.U.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
@ -12,7 +12,7 @@
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# * Neither the name of Virtual Cable S.L.U. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
@ -29,12 +29,8 @@
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import sys
from udsactor import platform
if sys.platform == 'win32':
from udsactor.windows import runner
else:
from udsactor.linux import runner
if __name__ == "__main__":
runner.run()
platform.runner.run()

View File

@ -50,7 +50,7 @@ def run() -> None:
r = client.login(sys.argv[2], platform.operations.getSessionType())
print('{},{},{},{}\n'.format(r.ip, r.hostname, r.max_idle, r.dead_line or ''))
elif sys.argv[1] == 'logout':
client.logout(sys.argv[2])
client.logout(sys.argv[2], platform.operations.getSessionType())
except Exception as e:
logger.exception()
logger.error('Got exception while processing command: %s', e)

View File

@ -40,13 +40,17 @@ import typing
import psutil
from .. import types
from udsactor import types, tools
MACVER_RE = re.compile(r"<key>ProductVersion</key>\s*<string>(.*)</string>", re.MULTILINE)
MACVER_RE = re.compile(
r"<key>ProductVersion</key>\s*<string>(.*)</string>", re.MULTILINE
)
MACVER_FILE = '/System/Library/CoreServices/SystemVersion.plist'
def checkPermissions() -> bool:
return os.getuid() == 0
return os.getuid() == 0
def getComputerName() -> str:
'''
@ -54,6 +58,7 @@ def getComputerName() -> str:
'''
return socket.gethostname().split('.')[0]
def getNetworkInfo() -> typing.Iterator[types.InterfaceInfoType]:
ifdata: typing.List['psutil._common.snicaddr']
for ifname, ifdata in psutil.net_if_addrs().items():
@ -65,17 +70,23 @@ def getNetworkInfo() -> typing.Iterator[types.InterfaceInfoType]:
name = ifname
elif row.family == socket.AF_LINK:
mac = row.address
# if all data is available, stop iterating
if ip and name and mac:
if mac != '00:00:00:00:00:00' and mac and ip and ip.startswith('169.254') is False: # Skips local interfaces & interfaces with no dhcp IPs
if (
mac != '00:00:00:00:00:00'
and mac
and ip
and ip.startswith('169.254') is False
): # Skips local interfaces & interfaces with no dhcp IPs
yield types.InterfaceInfoType(name=name, ip=ip, mac=mac)
break
def getDomainName() -> str:
return ''
def getMacOs() -> str:
try:
with open(MACVER_FILE, 'r') as f:
@ -85,10 +96,11 @@ def getMacOs() -> str:
return m.group(1)
except Exception: # nosec: B110: ignore exception because we are not interested in it
pass
return 'unknown'
def reboot(flags: int = 0):
def reboot(flags: int = 0) -> None:
'''
Simple reboot using os command
'''
@ -99,7 +111,9 @@ def loggoff() -> None:
'''
Right now restarts the machine...
'''
subprocess.run("/bin/launchctl bootout gui/$(id -u $USER)", shell=True) # nosec: Command line is fixed
subprocess.run(
"/bin/launchctl bootout gui/$(id -u $USER)", shell=True
) # nosec: Command line is fixed
# Ignores output, as it may fail if user is not logged in
@ -107,11 +121,14 @@ def renameComputer(newName: str) -> bool:
'''
Changes the computer name
Returns True if reboot needed
Note: For macOS, no configuration is supported, only "unmanaged" actor
'''
return False
def joinDomain(domain: str, ou: str, account: str, password: str, executeInOneStep: bool = False):
def joinDomain(
domain: str, ou: str, account: str, password: str, executeInOneStep: bool = False
):
pass
@ -122,11 +139,25 @@ def changeUserPassword(user: str, oldPassword: str, newPassword: str) -> None:
def initIdleDuration(atLeastSeconds: int) -> None:
pass
# se we cache for 20 seconds the result, that is enough for our needs
# and we avoid calling a system command every time we need it
@tools.cache(20)
def getIdleDuration() -> float:
# Execute:
try:
return int(next(filter(lambda x: b"HIDIdleTime" in x, subprocess.check_output(["ioreg", "-c", "IOHIDSystem"]).split(b"\n"))).split(b"=")[1]) / 1000000000
return (
int(
next(
filter(
lambda x: b"HIDIdleTime" in x,
subprocess.check_output(
["/usr/sbin/ioreg", "-c", "IOHIDSystem"]
).split(b"\n"),
)
).split(b"=")[1]
)
/ 1000000000
) # nosec: Command line is fixed
except Exception: # nosec: B110: ignore exception because we are not interested in it
return 0
@ -137,14 +168,13 @@ def getCurrentUser() -> str:
'''
return os.getlogin()
def getSessionType() -> str:
'''
Known values:
* Unknown -> No XDG_SESSION_TYPE environment variable
* xrdp --> xrdp session
* other types
Returns the session type. Currently, only "macos" (console) is supported
'''
return 'macos'
def forceTimeSync() -> None:
return

View File

@ -29,13 +29,14 @@
@author: Adolfo Gómez, dkmaster at dkmon dot com
'''
import sys
import typing
from .. import rest
from .. import platform
from ..log import logger
from .service import UDSActorSvc
def usage():
def usage() -> typing.NoReturn:
sys.stderr.write('usage: udsactor start|login "username"|logout "username"\n')
sys.exit(2)
@ -50,7 +51,7 @@ def run() -> None:
r = client.login(sys.argv[2], platform.operations.getSessionType())
print('{},{},{},{}\n'.format(r.ip, r.hostname, r.max_idle, r.dead_line or ''))
elif sys.argv[1] == 'logout':
client.logout(sys.argv[2])
client.logout(sys.argv[2], platform.operations.getSessionType())
except Exception as e:
logger.exception()
logger.error('Got exception while processing command: %s', e)

View File

@ -31,9 +31,15 @@
import sys
name = sys.platform
is_windows = is_linux = is_mac = False
if sys.platform == 'win32':
from .windows import operations, store
from .windows import operations, store, runner
is_windows = True
elif sys.platform == 'darwin':
from .macos import operations, store
from .macos import operations, store, runner
is_mac = True
elif sys.platform == 'linux':
from .linux import operations, store, runner
is_linux = True
else:
from .linux import operations, store
raise Exception('Unsupported platform: {0}'.format(sys.platform))

View File

@ -30,14 +30,43 @@
'''
import threading
import ipaddress
import time
import typing
import functools
if typing.TYPE_CHECKING:
from udsactor.types import InterfaceInfoType
# Simple cache for n seconds (default = 30) decorator
def cache(seconds: int = 30) -> typing.Callable:
'''
Simple cache for n seconds (default = 30) decorator
'''
def decorator(func) -> typing.Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> typing.Any:
if not hasattr(wrapper, 'cache'):
wrapper.cache = {} # type: ignore
cache = wrapper.cache # type: ignore
# Compose a key for the cache
key = '{}:{}'.format(args, kwargs)
if key in cache:
if time.time() - cache[key][0] < seconds:
return cache[key][1]
# Call the function
result = func(*args, **kwargs)
cache[key] = (time.time(), result)
return result
return wrapper
return decorator
# Simple sub-script exectution thread
class ScriptExecutorThread(threading.Thread):
def __init__(self, script: str) -> None:
super(ScriptExecutorThread, self).__init__()
self.script = script
@ -47,22 +76,26 @@ class ScriptExecutorThread(threading.Thread):
try:
logger.debug('Executing script: {}'.format(self.script))
exec(self.script, globals(), None) # nosec: exec is fine, it's a "trusted" script
exec(
self.script, globals(), None
) # nosec: exec is fine, it's a "trusted" script
except Exception as e:
logger.error('Error executing script: {}'.format(e))
logger.exception()
class Singleton(type):
'''
Metaclass for singleton pattern
Usage:
class MyClass(metaclass=Singleton):
...
'''
_instance: typing.Optional[typing.Any]
# We use __init__ so we customise the created class from this metaclass
# We use __init__ so we customise the created class from this metaclass
def __init__(self, *args, **kwargs) -> None:
self._instance = None
super().__init__(*args, **kwargs)
@ -74,7 +107,9 @@ class Singleton(type):
# Convert "X.X.X.X/X" to ipaddress.IPv4Network
def strToNoIPV4Network(net: typing.Optional[str]) -> typing.Optional[ipaddress.IPv4Network]:
def strToNoIPV4Network(
net: typing.Optional[str],
) -> typing.Optional[ipaddress.IPv4Network]:
if not net: # Empty or None
return None
try: