mirror of
https://github.com/altlinux/gpupdate.git
synced 2025-03-21 18:50:38 +03:00
Added part of laps implementation
This commit is contained in:
parent
ce660afcbd
commit
1c827d4533
@ -25,8 +25,11 @@ import struct
|
||||
from datetime import datetime, timedelta
|
||||
import dpapi_ng
|
||||
from util.util import remove_prefix_from_keys
|
||||
from util.sid import wbinfo_getsid, WellKnown21RID
|
||||
from util.sid import WellKnown21RID
|
||||
import subprocess
|
||||
import ldb
|
||||
import string
|
||||
import secrets
|
||||
|
||||
class laps_applier(applier_frontend):
|
||||
__epoch_timestamp = 11644473600 # January 1, 1970 as MS file time
|
||||
@ -42,17 +45,25 @@ class laps_applier(applier_frontend):
|
||||
|
||||
def __init__(self, storage):
|
||||
self.storage = storage
|
||||
all_alt_keys = storage.filter_entries(self.__registry_branch)
|
||||
self.all_keys = storage.filter_entries(self.__all_win_registry)
|
||||
(remove_prefix_from_keys(self.all_keys, self.__all_win_registry).update(
|
||||
remove_prefix_from_keys(all_alt_keys, self.__registry_branch)))
|
||||
all_alt_keys = remove_prefix_from_keys(storage.filter_entries(self.__registry_branch), self.__registry_branch)
|
||||
self.all_keys = remove_prefix_from_keys(storage.filter_entries(self.__all_win_registry), self.__all_win_registry)
|
||||
self.all_keys.update(all_alt_keys)
|
||||
|
||||
backup_directory = self.all_keys.get('BackupDirectory', None)
|
||||
if backup_directory != 2:
|
||||
self.__module_enabled = False
|
||||
print('backup_directory', backup_directory)
|
||||
return
|
||||
self.samdb = storage.get_info('samdb')
|
||||
domain_sid = self.samdb.get_domain_sid()
|
||||
self.admin_group_sid = f'{domain_sid}-{WellKnown21RID.DOMAIN_ADMINS.value}'
|
||||
self.expiration_date = self.get_expiration_date()
|
||||
self.expiration_date_int = self.get_int_time(self.expiration_date)
|
||||
self.__password = self.get_password()
|
||||
self.target_user = self.get_target_user()
|
||||
self.encryption_principal = self.get_encryption_principal()
|
||||
self.backup_directory = self.all_keys.get('BackupDirectory', None)
|
||||
|
||||
|
||||
self.__module_enabled = check_enabled(
|
||||
self.storage
|
||||
@ -60,15 +71,67 @@ class laps_applier(applier_frontend):
|
||||
, self.__module_experimental
|
||||
)
|
||||
def get_target_user(self):
|
||||
...
|
||||
return self.all_keys.get('AdministratorAccountName', 'root')
|
||||
|
||||
|
||||
def wbinfo_check_encryption_principal(self, encryption_principal):
|
||||
try:
|
||||
domain = self.storage.get_info('domain')
|
||||
username = f'{domain}\\{encryption_principal}'
|
||||
wbinfo_cmd = ['wbinfo', '-n', username]
|
||||
output = subprocess.check_output(wbinfo_cmd)
|
||||
sid = output.split()[0].decode('utf-8')
|
||||
return sid
|
||||
except subprocess.CalledProcessError:
|
||||
wbinfo_cmd = ['wbinfo', '-s', encryption_principal]
|
||||
try:
|
||||
output = subprocess.check_output(wbinfo_cmd)
|
||||
return encryption_principal
|
||||
except:
|
||||
return None
|
||||
|
||||
|
||||
def get_password(self):
|
||||
password_length = self.all_keys.get('PasswordLength', 14)
|
||||
if not isinstance(password_length, int) or not (8 <= password_length <= 64):
|
||||
password_length = 14
|
||||
|
||||
password_complexity = self.all_keys.get('PasswordComplexity', 4)
|
||||
if not isinstance(password_complexity, int) or not (1 <= password_complexity <= 4):
|
||||
password_complexity = 4
|
||||
|
||||
complexity_options = [
|
||||
string.ascii_letters + string.digits + string.punctuation,
|
||||
string.ascii_uppercase,
|
||||
string.ascii_lowercase + string.ascii_uppercase,
|
||||
string.ascii_letters + string.digits,
|
||||
string.ascii_letters + string.digits + string.punctuation
|
||||
]
|
||||
|
||||
chars = complexity_options[password_complexity]
|
||||
|
||||
|
||||
password = ''.join(secrets.choice(chars) for _ in range(password_length))
|
||||
return password
|
||||
|
||||
|
||||
|
||||
def __change_root_password(self):
|
||||
...
|
||||
|
||||
def get_encryption_principal(self):
|
||||
encryption_principal = self.all_keys.get('ADPasswordEncryptionPrincipal', None)
|
||||
sid = self.wbinfo_check_encryption_principal(encryption_principal) if encryption_principal else None
|
||||
return sid if sid else self.admin_group_sid
|
||||
|
||||
def get_json_pass(self):
|
||||
password = self.__password.encode("utf-16-le") + b"\x00\x00"
|
||||
return f'{{"n":{self.target_user},"t":{self.expiration_date_int},"p":{password}}}'
|
||||
|
||||
def get_expiration_date(self):
|
||||
# something...
|
||||
tmp_time = datetime.now().replace(month=12)
|
||||
return tmp_time
|
||||
password_age_days = self.all_keys.get('PasswordAgeDays', 0)
|
||||
return (datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
|
||||
+ timedelta(days=int(password_age_days)))
|
||||
|
||||
def get_int_time(self, datetime):
|
||||
epoch_timedelta = timedelta(seconds=self.__epoch_timestamp)
|
||||
@ -90,18 +153,19 @@ class laps_applier(applier_frontend):
|
||||
|
||||
def update_laps_password(self):
|
||||
try:
|
||||
self.__password = self.__password.encode("utf-16-le") + b"\x00\x00"
|
||||
date_int = str(self.get_int_time(self.expiration_date))
|
||||
psw_json = '{{"n":{},"t":{},"p":{}}}'.format(self.target_user, date_int, self.__password)
|
||||
psw_json = self.get_json_pass()
|
||||
machine_name = self.storage.get_info('machine_name')
|
||||
dpapi_ng_blob = dpapi_ng.ncrypt_protect_secret(psw_json, self.admin_group_sid, auth_protocol='kerberos')
|
||||
dpapi_ng_blob = dpapi_ng.ncrypt_protect_secret(psw_json, self.encryption_principal, auth_protocol='kerberos')
|
||||
full_blob = self.get_full_blob(dpapi_ng_blob)
|
||||
mod_msg = ldb.Message()
|
||||
mod_msg.dn = self.get_computer_dn(machine_name)
|
||||
mod_msg[self.__attr_EncryptedPassword] = ldb.MessageElement(full_blob, ldb.FLAG_MOD_REPLACE, self.__attr_EncryptedPassword)
|
||||
mod_msg[self.__attr_PasswordExpirationTime] = ldb.MessageElement(date_int, ldb.FLAG_MOD_REPLACE, self.__attr_PasswordExpirationTime)
|
||||
mod_msg[self.__attr_EncryptedPassword] = (ldb.MessageElement
|
||||
(full_blob, ldb.FLAG_MOD_REPLACE, self.__attr_EncryptedPassword))
|
||||
mod_msg[self.__attr_PasswordExpirationTime] = (ldb.MessageElement
|
||||
(self.expiration_date_int, ldb.FLAG_MOD_REPLACE, self.__attr_PasswordExpirationTime))
|
||||
|
||||
self.samdb.modify(mod_msg)
|
||||
self.__change_root_password()
|
||||
print(f"Зашифрованный пароль для {machine_name} успешно обновлен")
|
||||
|
||||
except Exception as e:
|
||||
|
Loading…
x
Reference in New Issue
Block a user