mirror of
https://github.com/altlinux/gpupdate.git
synced 2025-03-21 18:50:38 +03:00
Added password change logic
This commit is contained in:
parent
3e889622b1
commit
1f6776912d
@ -30,6 +30,7 @@ import subprocess
|
||||
import ldb
|
||||
import string
|
||||
import secrets
|
||||
from passlib.hash import bcrypt
|
||||
|
||||
class laps_applier(applier_frontend):
|
||||
__epoch_timestamp = 11644473600 # January 1, 1970 as MS file time
|
||||
@ -41,6 +42,7 @@ class laps_applier(applier_frontend):
|
||||
__registry_branch = 'Software/BaseALT/Policies/Laps'
|
||||
__attr_EncryptedPassword = 'msLAPS-EncryptedPassword'
|
||||
__attr_PasswordExpirationTime = 'msLAPS-PasswordExpirationTime'
|
||||
__key_passwordLastModified = '/Software/BaseALT/Policies/Laps/passwordLastModified'
|
||||
|
||||
|
||||
def __init__(self, storage):
|
||||
@ -49,20 +51,22 @@ class laps_applier(applier_frontend):
|
||||
self.all_keys = remove_prefix_from_keys(storage.filter_entries(self.__all_win_registry), self.__all_win_registry)
|
||||
self.all_keys.update(all_alt_keys)
|
||||
|
||||
backup_directory = self.all_keys.get('BackupDirectory', None)
|
||||
if backup_directory != 2:
|
||||
self.backup_directory = self.all_keys.get('BackupDirectory', None)
|
||||
if self.backup_directory != 2:
|
||||
self.__module_enabled = False
|
||||
print('backup_directory', backup_directory)
|
||||
print('backup_directory', self.backup_directory)
|
||||
return
|
||||
self.samdb = storage.get_info('samdb')
|
||||
domain_sid = self.samdb.get_domain_sid()
|
||||
self.domain_dn = self.samdb.domain_dn()
|
||||
self.computer_dn = self.get_computer_dn()
|
||||
self.admin_group_sid = f'{domain_sid}-{WellKnown21RID.DOMAIN_ADMINS.value}'
|
||||
self.expiration_date = self.get_expiration_date()
|
||||
self.expiration_date_int = self.get_int_time(self.expiration_date)
|
||||
self.__password = self.get_password()
|
||||
self.dt_now_int = self.get_int_time(datetime.now())
|
||||
self.hash_psw = None
|
||||
self.target_user = self.get_target_user()
|
||||
self.encryption_principal = self.get_encryption_principal()
|
||||
self.backup_directory = self.all_keys.get('BackupDirectory', None)
|
||||
|
||||
|
||||
self.__module_enabled = check_enabled(
|
||||
@ -74,6 +78,9 @@ class laps_applier(applier_frontend):
|
||||
return self.all_keys.get('AdministratorAccountName', 'root')
|
||||
|
||||
|
||||
def get_hash_password(self, password):
|
||||
return bcrypt.hash(password)
|
||||
|
||||
def wbinfo_check_encryption_principal(self, encryption_principal):
|
||||
try:
|
||||
domain = self.storage.get_info('domain')
|
||||
@ -88,7 +95,7 @@ class laps_applier(applier_frontend):
|
||||
output = subprocess.check_output(wbinfo_cmd)
|
||||
return encryption_principal
|
||||
except:
|
||||
return None
|
||||
return self.admin_group_sid
|
||||
|
||||
|
||||
def get_password(self):
|
||||
@ -143,18 +150,33 @@ class laps_applier(applier_frontend):
|
||||
|
||||
except Exception as e:
|
||||
print('Dlog', e)
|
||||
return
|
||||
return None
|
||||
|
||||
def __change_root_password(self):
|
||||
...
|
||||
try:
|
||||
if self.hash_psw:
|
||||
subprocess.check_output(
|
||||
['usermod', '-p', self.hash_psw, self.target_user],
|
||||
stderr=subprocess.STDOUT, text=True)
|
||||
else:
|
||||
print('Dlog')
|
||||
except subprocess.CalledProcessError as e:
|
||||
print('Dlog', e.output)
|
||||
except Exception as e:
|
||||
print('Dlog')
|
||||
|
||||
def get_encryption_principal(self):
|
||||
encryption_principal = self.all_keys.get('ADPasswordEncryptionPrincipal', None)
|
||||
sid = self.wbinfo_check_encryption_principal(encryption_principal) if encryption_principal else None
|
||||
return sid if sid else self.admin_group_sid
|
||||
sid = (self.wbinfo_check_encryption_principal(encryption_principal)
|
||||
if encryption_principal
|
||||
else self.admin_group_sid)
|
||||
return sid
|
||||
|
||||
def get_json_pass(self):
|
||||
password = self.__password.encode("utf-16-le") + b"\x00\x00"
|
||||
psw = self.get_password()
|
||||
self.hash_psw = self.get_hash_password(psw)
|
||||
password = self.psw.encode("utf-16-le") + b"\x00\x00"
|
||||
|
||||
return f'{{"n":{self.target_user},"t":{self.expiration_date_int},"p":{password}}}'
|
||||
|
||||
def get_expiration_date(self):
|
||||
@ -168,34 +190,64 @@ class laps_applier(applier_frontend):
|
||||
return int(new_dt.timestamp() * self.__hundreds_of_nanoseconds)
|
||||
|
||||
def get_full_blob(self, dpapi_ng_blob):
|
||||
dt = self.get_int_time(datetime.now())
|
||||
left,right = struct.unpack('<LL',struct.pack('Q',dt))
|
||||
left,right = struct.unpack('<LL',struct.pack('Q',self.dt_now_int))
|
||||
packed = struct.pack('<LL',right,left)
|
||||
prefix = packed + struct.pack('<i', len(dpapi_ng_blob)) + b'\x00\x00\x00\x00'
|
||||
full_blob = prefix + dpapi_ng_blob
|
||||
return full_blob
|
||||
|
||||
def get_computer_dn(self, machine_name):
|
||||
def get_computer_dn(self):
|
||||
machine_name = self.storage.get_info('machine_name')
|
||||
search_filter = f'(sAMAccountName={machine_name})'
|
||||
results = self.samdb.search(base=self.samdb.domain_dn(), expression=search_filter, attrs=['dn'])
|
||||
results = self.samdb.search(base=self.domain_dn, expression=search_filter, attrs=['dn'])
|
||||
return results[0]['dn']
|
||||
|
||||
def get_expiration_time_attr(self):
|
||||
try:
|
||||
res = self.samdb.search(base=self.computer_dn,
|
||||
scope=ldb.SCOPE_BASE,
|
||||
expression="(objectClass=*)",
|
||||
attrs=[self.__attr_PasswordExpirationTime])
|
||||
|
||||
int_data = int(res[0].get(self.__attr_PasswordExpirationTime, 0)[0])
|
||||
|
||||
except Exception as e:
|
||||
int_data = 0
|
||||
return int_data
|
||||
|
||||
|
||||
def write_dconf_pass_last_mod(self):
|
||||
try:
|
||||
LastModified = f'"{self.dt_now_int}"'
|
||||
subprocess.check_output(['dconf', 'write', self.__key_passwordLastModified, LastModified])
|
||||
except Exception as exc:
|
||||
print('Dlog', exc)
|
||||
|
||||
def read_dconf_pass_last_mod(self):
|
||||
LastModified = None
|
||||
try:
|
||||
LastModified = subprocess.check_output(['dconf', 'read', self.__key_passwordLastModified]).split("\n")[0]
|
||||
except Exception as exc:
|
||||
print('Dlog', exc)
|
||||
return LastModified
|
||||
|
||||
|
||||
def update_laps_password(self):
|
||||
try:
|
||||
psw_json = self.get_json_pass()
|
||||
machine_name = self.storage.get_info('machine_name')
|
||||
dpapi_ng_blob = dpapi_ng.ncrypt_protect_secret(psw_json, self.encryption_principal, auth_protocol='kerberos')
|
||||
full_blob = self.get_full_blob(dpapi_ng_blob)
|
||||
mod_msg = ldb.Message()
|
||||
mod_msg.dn = self.get_computer_dn(machine_name)
|
||||
mod_msg.dn = self.computer_dn
|
||||
mod_msg[self.__attr_EncryptedPassword] = (ldb.MessageElement
|
||||
(full_blob, ldb.FLAG_MOD_REPLACE, self.__attr_EncryptedPassword))
|
||||
mod_msg[self.__attr_PasswordExpirationTime] = (ldb.MessageElement
|
||||
(self.expiration_date_int, ldb.FLAG_MOD_REPLACE, self.__attr_PasswordExpirationTime))
|
||||
|
||||
self.samdb.modify(mod_msg)
|
||||
self.__change_root_password()
|
||||
print(f"Зашифрованный пароль для {machine_name} успешно обновлен")
|
||||
#self.__change_root_password()
|
||||
self.write_dconf_pass_last_mod()
|
||||
print(f"Пароль успешно обновлен")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Ошибка при работе с LDAP: {str(e)}")
|
||||
|
Loading…
x
Reference in New Issue
Block a user