1
0
mirror of https://github.com/altlinux/gpupdate.git synced 2025-03-21 18:50:38 +03:00

Added implementation of actions in case of used password

This commit is contained in:
Valery Sinelnikov 2025-02-27 11:46:51 +04:00
parent 156918ad3b
commit 5f94fad90b

View File

@ -30,8 +30,8 @@ import subprocess
import ldb
import string
import secrets
from passlib.hash import bcrypt
import os
import psutil
class laps_applier(applier_frontend):
__epoch_timestamp = 11644473600
@ -71,7 +71,7 @@ class laps_applier(applier_frontend):
self.dt_now_int = self.get_int_time(datetime.now())
self.expiration_time_attr = self.get_expiration_time_attr()
self.pass_last_mod_int = self.read_dconf_pass_last_mod()
self.post_authentication_actions = self.all_keys.get('PostAuthenticationActions', 1)
self.post_authentication_actions = self.all_keys.get('PostAuthenticationActions', 2)
self.post_authentication_reset_delay = self.all_keys.get('PostAuthenticationResetDelay', 24)
self.target_user = self.get_target_user()
self.encryption_principal = self.get_encryption_principal()
@ -87,9 +87,6 @@ class laps_applier(applier_frontend):
return self.all_keys.get('AdministratorAccountName', 'root')
def get_hash_password(self, password):
return bcrypt.hash(password)
def wbinfo_check_encryption_principal(self, encryption_principal):
try:
domain = self.storage.get_info('domain')
@ -251,6 +248,27 @@ class laps_applier(applier_frontend):
return last_modified
def terminate_user_sessions(self):
"""
Terminates all processes associated with the active sessions of the specified user.
Sessions are retrieved using psutil.users().
"""
# Get a list of active sessions for the given user
user_sessions = [user for user in psutil.users() if user.name == self.target_user]
if not user_sessions:
return
for session in user_sessions:
session_pid = session.pid # Get the PID of the session process
try:
proc = psutil.Process(session_pid)
proc.kill() # Send SIGKILL (kill -9)
print(f"Dlog Process {session_pid} ({ self.target_user}) terminated.")
except (psutil.NoSuchProcess, psutil.AccessDenied):
pass # Skip processes that are already terminated or inaccessible
def update_laps_password(self):
password_rotten = True
action = False
@ -267,8 +285,10 @@ class laps_applier(applier_frontend):
else:
if self.last_login_hours_ago < self.post_authentication_reset_delay:
return
elif self.post_authentication_actions == 0:
return
else:
action = True
action = True if self.post_authentication_actions > 1 else False
try:
@ -294,14 +314,10 @@ class laps_applier(applier_frontend):
self.run_action()
def run_action(self):
if self.post_authentication_actions == 0:
return
elif self.post_authentication_actions == 1:
subprocess.run(["reboot"])
elif self.post_authentication_actions == 2:
...
if self.post_authentication_actions == 2:
self.terminate_user_sessions()
elif self.post_authentication_actions == 3:
...
subprocess.run(["reboot"])
def apply(self):
if self.__module_enabled: