1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-05 09:17:54 +03:00

Fixing Config parameters

This commit is contained in:
Adolfo Gómez 2019-11-22 10:56:05 +01:00
parent bef9776a6b
commit d6691a4098
3 changed files with 48 additions and 64 deletions

View File

@ -75,9 +75,9 @@ class UDSConfigDialog(QDialog):
return udsactor.rest.REST(self.ui.host.text(), self.ui.validateCertificate.currentIndex() == 1)
def browse(self, lineEdit: 'QLineEdit', caption: str) -> None:
name = QFileDialog.getOpenFileName(parent=self, caption=caption)[0] # Returns tuple (filename, filter)
name = QFileDialog.getOpenFileName(parent=self, caption=caption, directory=os.path.dirname(lineEdit.text()))[0]
if name:
lineEdit.setText(name)
lineEdit.setText(os.path.normpath(name))
def browsePreconnect(self) -> None:
self.browse(self.ui.preCommand, 'Select Preconnect command')

View File

@ -25,22 +25,22 @@
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
'''
# pylint: disable=invalid-name
import socket
import platform
import fcntl
import os
import ctypes # @UnusedImport
import ctypes
import ctypes.util
import subprocess
import struct
import array
import typing
from udsactor import types
from .. import types
from .renamer import rename
@ -123,7 +123,6 @@ def getNetworkInfo() -> typing.Iterable[types.InterfaceInfoType]:
def getDomainName() -> str:
return ''
def getLinuxVersion() -> str:
import distro

View File

@ -25,34 +25,33 @@
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
'''
@author: Adolfo Gómez, dkmaster at dkmon dot com
'''
# pylint: disable=invalid-name
import os
import ctypes
from ctypes.wintypes import DWORD, LPCWSTR
import typing
import win32com.client
import win32net
import win32security
import win32api
import win32con
import ctypes
from ctypes.wintypes import DWORD, LPCWSTR
import os
from udsactor import utils
from udsactor.log import logger
from .. import types
from ..log import logger
def getErrorMessage(res=0):
def getErrorMessage(resultCode: int = 0) -> str:
# sys_fs_enc = sys.getfilesystemencoding() or 'mbcs'
msg = win32api.FormatMessage(res)
return msg.decode('windows-1250', 'ignore')
msg = win32api.FormatMessage(resultCode)
return msg
def getComputerName():
def getComputerName() -> str:
return win32api.GetComputerNameEx(win32con.ComputerNamePhysicalDnsHostname)
def getNetworkInfo():
def getNetworkInfo() -> typing.Iterable[types.InterfaceInfoType]:
obj = win32com.client.Dispatch("WbemScripting.SWbemLocator")
wmobj = obj.ConnectServer("localhost", "root\\cimv2")
adapters = wmobj.ExecQuery("Select * from Win32_NetworkAdapterConfiguration where IpEnabled=True")
@ -63,13 +62,11 @@ def getNetworkInfo():
continue
if ip is None or ip == '' or ip.startswith('169.254') or ip.startswith('0.'): # If single link ip, or no ip
continue
# logger.debug('Net config found: {}=({}, {})'.format(obj.Caption, obj.MACAddress, ip))
yield utils.Bunch(name=obj.Caption, mac=obj.MACAddress, ip=ip)
yield types.InterfaceInfoType(name=obj.Caption, mac=obj.MACAddress, ip=ip)
except Exception:
return
def getDomainName():
def getDomainName() -> str:
'''
Will return the domain name if we belong a domain, else None
(if part of a network group, will also return None)
@ -85,11 +82,9 @@ def getDomainName():
return domain
def getWindowsVersion():
def getWindowsVersion() -> str:
return win32api.GetVersionEx()
EWX_LOGOFF = 0x00000000
EWX_SHUTDOWN = 0x00000001
EWX_REBOOT = 0x00000002
@ -97,20 +92,17 @@ EWX_FORCE = 0x00000004
EWX_POWEROFF = 0x00000008
EWX_FORCEIFHUNG = 0x00000010
def reboot(flags=EWX_FORCEIFHUNG | EWX_REBOOT):
def reboot(flags: int = EWX_FORCEIFHUNG | EWX_REBOOT) -> None:
hproc = win32api.GetCurrentProcess()
htok = win32security.OpenProcessToken(hproc, win32security.TOKEN_ADJUST_PRIVILEGES | win32security.TOKEN_QUERY)
privs = ((win32security.LookupPrivilegeValue(None, win32security.SE_SHUTDOWN_NAME), win32security.SE_PRIVILEGE_ENABLED),)
win32security.AdjustTokenPrivileges(htok, 0, privs)
win32api.ExitWindowsEx(flags, 0)
def loggoff():
def loggoff() -> None:
win32api.ExitWindowsEx(EWX_LOGOFF)
def renameComputer(newName):
def renameComputer(newName: str) -> None:
# Needs admin privileges to work
if ctypes.windll.kernel32.SetComputerNameExW(DWORD(win32con.ComputerNamePhysicalDnsHostname), LPCWSTR(newName)) == 0: # @UndefinedVariable
# win32api.FormatMessage -> returns error string
@ -120,7 +112,6 @@ def renameComputer(newName):
computerName = win32api.GetComputerNameEx(win32con.ComputerNamePhysicalDnsHostname)
raise Exception('Error renaming computer from {} to {}: {}'.format(computerName, newName, error))
NETSETUP_JOIN_DOMAIN = 0x00000001
NETSETUP_ACCT_CREATE = 0x00000002
NETSETUP_ACCT_DELETE = 0x00000004
@ -131,8 +122,7 @@ NETSETUP_MACHINE_PWD_PASSED = 0x00000080
NETSETUP_JOIN_WITH_NEW_NAME = 0x00000400
NETSETUP_DEFER_SPN_SET = 0x1000000
def joinDomain(domain, ou, account, password, executeInOneStep=False):
def joinDomain(domain: str, ou: str, account: str, password: str, executeInOneStep: bool = False) -> None:
'''
Joins machine to a windows domain
:param domain: Domain to join to
@ -149,26 +139,26 @@ def joinDomain(domain, ou, account, password, executeInOneStep=False):
account = domain + '\\' + account
# Do log
flags = NETSETUP_ACCT_CREATE | NETSETUP_DOMAIN_JOIN_IF_JOINED | NETSETUP_JOIN_DOMAIN
flags: typing.Any = NETSETUP_ACCT_CREATE | NETSETUP_DOMAIN_JOIN_IF_JOINED | NETSETUP_JOIN_DOMAIN
if executeInOneStep:
flags |= NETSETUP_JOIN_WITH_NEW_NAME
flags = DWORD(flags)
domain = LPCWSTR(domain)
lpDomain = LPCWSTR(domain)
# Must be in format "ou=.., ..., dc=...,"
ou = LPCWSTR(ou) if ou is not None and ou != '' else None
account = LPCWSTR(account)
password = LPCWSTR(password)
lpOu = LPCWSTR(ou) if ou is not None and ou != '' else None
lpAccount = LPCWSTR(account)
lpPassword = LPCWSTR(password)
res = ctypes.windll.netapi32.NetJoinDomain(None, domain, ou, account, password, flags)
res = ctypes.windll.netapi32.NetJoinDomain(None, lpDomain, lpOu, lpAccount, lpPassword, flags)
# Machine found in another ou, use it and warn this on log
if res == 2224:
flags = DWORD(NETSETUP_DOMAIN_JOIN_IF_JOINED | NETSETUP_JOIN_DOMAIN)
res = ctypes.windll.netapi32.NetJoinDomain(None, domain, None, account, password, flags)
if res != 0:
res = ctypes.windll.netapi32.NetJoinDomain(None, lpDomain, None, lpAccount, lpPassword, flags)
if res:
# Log the error
error = getErrorMessage(res)
if res == 1355:
@ -176,39 +166,34 @@ def joinDomain(domain, ou, account, password, executeInOneStep=False):
logger.error('Error joining domain: {}, {}'.format(error, res))
raise Exception('Error joining domain {}, with credentials {}/*****{}: {}, {}'.format(domain, account, ', under OU {}'.format(ou) if ou is not None else '', res, error))
def changeUserPassword(user: str, oldPassword: str, newPassword: str) -> None:
lpUser = LPCWSTR(user)
lpOldPassword = LPCWSTR(oldPassword)
lpNewPassword = LPCWSTR(newPassword)
def changeUserPassword(user, oldPassword, newPassword):
computerName = LPCWSTR(getComputerName())
user = LPCWSTR(user)
oldPassword = LPCWSTR(oldPassword)
newPassword = LPCWSTR(newPassword)
res = ctypes.windll.netapi32.NetUserChangePassword(computerName, user, oldPassword, newPassword)
res = ctypes.windll.netapi32.NetUserChangePassword(None, lpUser, lpOldPassword, lpNewPassword)
if res != 0:
# Log the error, and raise exception to parent
error = getErrorMessage(res)
raise Exception('Error changing password for user {}: {} {}'.format(user.value, res, error))
raise Exception('Error changing password for user {}: {} {}'.format(lpUser.value, res, error))
class LASTINPUTINFO(ctypes.Structure):
class LASTINPUTINFO(ctypes.Structure): # pylint: disable=too-few-public-methods
_fields_ = [
('cbSize', ctypes.c_uint),
('dwTime', ctypes.c_uint),
]
def initIdleDuration(atLeastSeconds):
def initIdleDuration(atLeastSeconds: int = 0): # pylint: disable=unused-argument
'''
In windows, there is no need to set screensaver
'''
pass
return
def getIdleDuration():
def getIdleDuration() -> float:
try:
lastInputInfo = LASTINPUTINFO()
lastInputInfo.cbSize = ctypes.sizeof(lastInputInfo)
lastInputInfo.cbSize = ctypes.sizeof(lastInputInfo) # pylint: disable=attribute-defined-outside-init
if ctypes.windll.user32.GetLastInputInfo(ctypes.byref(lastInputInfo)) == 0:
return 0
# if lastInputInfo.dwTime > 1000000000: # Value toooo high, nonsense...
@ -222,13 +207,13 @@ def getIdleDuration():
return 0
def getCurrentUser():
def getCurrentUser() -> str:
'''
Returns current logged in username
'''
return os.environ['USERNAME']
def writeToPipe(pipeName, bytesPayload, waitForResponse):
def writeToPipe(pipeName: str, bytesPayload: bytes, waitForResponse: bool) -> typing.Optional[bytes]:
# (str, bytes, bool) -> Optional[bytes]
try:
with open(pipeName, 'r+b', 0) as f:
@ -237,5 +222,5 @@ def writeToPipe(pipeName, bytesPayload, waitForResponse):
if waitForResponse:
return f.read()
return b'ok'
except Exception as e:
None
except Exception:
return None