diff --git a/actor/src/UDSActorConfig.py b/actor/src/UDSActorConfig.py index af6a8a509..56fb6ccb4 100644 --- a/actor/src/UDSActorConfig.py +++ b/actor/src/UDSActorConfig.py @@ -75,9 +75,9 @@ class UDSConfigDialog(QDialog): return udsactor.rest.REST(self.ui.host.text(), self.ui.validateCertificate.currentIndex() == 1) def browse(self, lineEdit: 'QLineEdit', caption: str) -> None: - name = QFileDialog.getOpenFileName(parent=self, caption=caption)[0] # Returns tuple (filename, filter) + name = QFileDialog.getOpenFileName(parent=self, caption=caption, directory=os.path.dirname(lineEdit.text()))[0] if name: - lineEdit.setText(name) + lineEdit.setText(os.path.normpath(name)) def browsePreconnect(self) -> None: self.browse(self.ui.preCommand, 'Select Preconnect command') diff --git a/actor/src/udsactor/linux/operations.py b/actor/src/udsactor/linux/operations.py index c826e92c9..3c7931eac 100644 --- a/actor/src/udsactor/linux/operations.py +++ b/actor/src/udsactor/linux/operations.py @@ -25,22 +25,22 @@ # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - ''' @author: Adolfo Gómez, dkmaster at dkmon dot com ''' +# pylint: disable=invalid-name import socket import platform import fcntl import os -import ctypes # @UnusedImport +import ctypes import ctypes.util import subprocess import struct import array import typing -from udsactor import types +from .. import types from .renamer import rename @@ -123,7 +123,6 @@ def getNetworkInfo() -> typing.Iterable[types.InterfaceInfoType]: def getDomainName() -> str: return '' - def getLinuxVersion() -> str: import distro diff --git a/actor/src/udsactor/windows/operations.py b/actor/src/udsactor/windows/operations.py index 18d5fdae3..9ec0d5d3e 100644 --- a/actor/src/udsactor/windows/operations.py +++ b/actor/src/udsactor/windows/operations.py @@ -25,34 +25,33 @@ # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - ''' @author: Adolfo Gómez, dkmaster at dkmon dot com ''' +# pylint: disable=invalid-name +import os +import ctypes +from ctypes.wintypes import DWORD, LPCWSTR +import typing + import win32com.client import win32net import win32security import win32api import win32con -import ctypes -from ctypes.wintypes import DWORD, LPCWSTR -import os -from udsactor import utils -from udsactor.log import logger +from .. import types +from ..log import logger - -def getErrorMessage(res=0): +def getErrorMessage(resultCode: int = 0) -> str: # sys_fs_enc = sys.getfilesystemencoding() or 'mbcs' - msg = win32api.FormatMessage(res) - return msg.decode('windows-1250', 'ignore') + msg = win32api.FormatMessage(resultCode) + return msg - -def getComputerName(): +def getComputerName() -> str: return win32api.GetComputerNameEx(win32con.ComputerNamePhysicalDnsHostname) - -def getNetworkInfo(): +def getNetworkInfo() -> typing.Iterable[types.InterfaceInfoType]: obj = win32com.client.Dispatch("WbemScripting.SWbemLocator") wmobj = obj.ConnectServer("localhost", "root\\cimv2") adapters = wmobj.ExecQuery("Select * from Win32_NetworkAdapterConfiguration where IpEnabled=True") @@ -63,13 +62,11 @@ def getNetworkInfo(): continue if ip is None or ip == '' or ip.startswith('169.254') or ip.startswith('0.'): # If single link ip, or no ip continue - # logger.debug('Net config found: {}=({}, {})'.format(obj.Caption, obj.MACAddress, ip)) - yield utils.Bunch(name=obj.Caption, mac=obj.MACAddress, ip=ip) + yield types.InterfaceInfoType(name=obj.Caption, mac=obj.MACAddress, ip=ip) except Exception: return - -def getDomainName(): +def getDomainName() -> str: ''' Will return the domain name if we belong a domain, else None (if part of a network group, will also return None) @@ -85,11 +82,9 @@ def getDomainName(): return domain - -def getWindowsVersion(): +def getWindowsVersion() -> str: return win32api.GetVersionEx() - EWX_LOGOFF = 0x00000000 EWX_SHUTDOWN = 0x00000001 EWX_REBOOT = 0x00000002 @@ -97,20 +92,17 @@ EWX_FORCE = 0x00000004 EWX_POWEROFF = 0x00000008 EWX_FORCEIFHUNG = 0x00000010 - -def reboot(flags=EWX_FORCEIFHUNG | EWX_REBOOT): +def reboot(flags: int = EWX_FORCEIFHUNG | EWX_REBOOT) -> None: hproc = win32api.GetCurrentProcess() htok = win32security.OpenProcessToken(hproc, win32security.TOKEN_ADJUST_PRIVILEGES | win32security.TOKEN_QUERY) privs = ((win32security.LookupPrivilegeValue(None, win32security.SE_SHUTDOWN_NAME), win32security.SE_PRIVILEGE_ENABLED),) win32security.AdjustTokenPrivileges(htok, 0, privs) win32api.ExitWindowsEx(flags, 0) - -def loggoff(): +def loggoff() -> None: win32api.ExitWindowsEx(EWX_LOGOFF) - -def renameComputer(newName): +def renameComputer(newName: str) -> None: # Needs admin privileges to work if ctypes.windll.kernel32.SetComputerNameExW(DWORD(win32con.ComputerNamePhysicalDnsHostname), LPCWSTR(newName)) == 0: # @UndefinedVariable # win32api.FormatMessage -> returns error string @@ -120,7 +112,6 @@ def renameComputer(newName): computerName = win32api.GetComputerNameEx(win32con.ComputerNamePhysicalDnsHostname) raise Exception('Error renaming computer from {} to {}: {}'.format(computerName, newName, error)) - NETSETUP_JOIN_DOMAIN = 0x00000001 NETSETUP_ACCT_CREATE = 0x00000002 NETSETUP_ACCT_DELETE = 0x00000004 @@ -131,8 +122,7 @@ NETSETUP_MACHINE_PWD_PASSED = 0x00000080 NETSETUP_JOIN_WITH_NEW_NAME = 0x00000400 NETSETUP_DEFER_SPN_SET = 0x1000000 - -def joinDomain(domain, ou, account, password, executeInOneStep=False): +def joinDomain(domain: str, ou: str, account: str, password: str, executeInOneStep: bool = False) -> None: ''' Joins machine to a windows domain :param domain: Domain to join to @@ -149,26 +139,26 @@ def joinDomain(domain, ou, account, password, executeInOneStep=False): account = domain + '\\' + account # Do log - flags = NETSETUP_ACCT_CREATE | NETSETUP_DOMAIN_JOIN_IF_JOINED | NETSETUP_JOIN_DOMAIN + flags: typing.Any = NETSETUP_ACCT_CREATE | NETSETUP_DOMAIN_JOIN_IF_JOINED | NETSETUP_JOIN_DOMAIN if executeInOneStep: flags |= NETSETUP_JOIN_WITH_NEW_NAME flags = DWORD(flags) - domain = LPCWSTR(domain) + lpDomain = LPCWSTR(domain) # Must be in format "ou=.., ..., dc=...," - ou = LPCWSTR(ou) if ou is not None and ou != '' else None - account = LPCWSTR(account) - password = LPCWSTR(password) + lpOu = LPCWSTR(ou) if ou is not None and ou != '' else None + lpAccount = LPCWSTR(account) + lpPassword = LPCWSTR(password) - res = ctypes.windll.netapi32.NetJoinDomain(None, domain, ou, account, password, flags) + res = ctypes.windll.netapi32.NetJoinDomain(None, lpDomain, lpOu, lpAccount, lpPassword, flags) # Machine found in another ou, use it and warn this on log if res == 2224: flags = DWORD(NETSETUP_DOMAIN_JOIN_IF_JOINED | NETSETUP_JOIN_DOMAIN) - res = ctypes.windll.netapi32.NetJoinDomain(None, domain, None, account, password, flags) - if res != 0: + res = ctypes.windll.netapi32.NetJoinDomain(None, lpDomain, None, lpAccount, lpPassword, flags) + if res: # Log the error error = getErrorMessage(res) if res == 1355: @@ -176,39 +166,34 @@ def joinDomain(domain, ou, account, password, executeInOneStep=False): logger.error('Error joining domain: {}, {}'.format(error, res)) raise Exception('Error joining domain {}, with credentials {}/*****{}: {}, {}'.format(domain, account, ', under OU {}'.format(ou) if ou is not None else '', res, error)) +def changeUserPassword(user: str, oldPassword: str, newPassword: str) -> None: + lpUser = LPCWSTR(user) + lpOldPassword = LPCWSTR(oldPassword) + lpNewPassword = LPCWSTR(newPassword) -def changeUserPassword(user, oldPassword, newPassword): - computerName = LPCWSTR(getComputerName()) - user = LPCWSTR(user) - oldPassword = LPCWSTR(oldPassword) - newPassword = LPCWSTR(newPassword) - - res = ctypes.windll.netapi32.NetUserChangePassword(computerName, user, oldPassword, newPassword) + res = ctypes.windll.netapi32.NetUserChangePassword(None, lpUser, lpOldPassword, lpNewPassword) if res != 0: # Log the error, and raise exception to parent error = getErrorMessage(res) - raise Exception('Error changing password for user {}: {} {}'.format(user.value, res, error)) + raise Exception('Error changing password for user {}: {} {}'.format(lpUser.value, res, error)) - -class LASTINPUTINFO(ctypes.Structure): +class LASTINPUTINFO(ctypes.Structure): # pylint: disable=too-few-public-methods _fields_ = [ ('cbSize', ctypes.c_uint), ('dwTime', ctypes.c_uint), ] - -def initIdleDuration(atLeastSeconds): +def initIdleDuration(atLeastSeconds: int = 0): # pylint: disable=unused-argument ''' In windows, there is no need to set screensaver ''' - pass + return - -def getIdleDuration(): +def getIdleDuration() -> float: try: lastInputInfo = LASTINPUTINFO() - lastInputInfo.cbSize = ctypes.sizeof(lastInputInfo) + lastInputInfo.cbSize = ctypes.sizeof(lastInputInfo) # pylint: disable=attribute-defined-outside-init if ctypes.windll.user32.GetLastInputInfo(ctypes.byref(lastInputInfo)) == 0: return 0 # if lastInputInfo.dwTime > 1000000000: # Value toooo high, nonsense... @@ -222,13 +207,13 @@ def getIdleDuration(): return 0 -def getCurrentUser(): +def getCurrentUser() -> str: ''' Returns current logged in username ''' return os.environ['USERNAME'] -def writeToPipe(pipeName, bytesPayload, waitForResponse): +def writeToPipe(pipeName: str, bytesPayload: bytes, waitForResponse: bool) -> typing.Optional[bytes]: # (str, bytes, bool) -> Optional[bytes] try: with open(pipeName, 'r+b', 0) as f: @@ -237,5 +222,5 @@ def writeToPipe(pipeName, bytesPayload, waitForResponse): if waitForResponse: return f.read() return b'ok' - except Exception as e: - None + except Exception: + return None