1
0
mirror of https://github.com/altlinux/gpupdate.git synced 2025-03-21 18:50:38 +03:00

The way to find the desired DC is supplemented

This commit is contained in:
Valery Sinelnikov 2024-02-21 11:49:02 +04:00
parent 5d2fb3f719
commit 47dc1df796

View File

@ -43,7 +43,7 @@ import optparse
import ldb
import ipaddress
import netifaces
import random
class smbcreds (smbopts):
@ -52,8 +52,10 @@ class smbcreds (smbopts):
self.credopts = options.CredentialsOptions(self.parser)
self.creds = self.credopts.get_credentials(self.lp, fallback_machine=True)
self.set_dc(dc_fqdn)
self.dc_site_servers = SiteDomainScanner(self.creds, self.lp, self.selected_dc).select_servers()
self.sDomain = SiteDomainScanner(self.creds, self.lp, self.selected_dc)
self.dc_site_servers = self.sDomain.select_site_servers()
self.all_servers = self.sDomain.select_all_servers()
self.pdc_emulator_server = self.sDomain.select_pdc_emulator_server()
def get_dc(self):
return self.selected_dc
@ -125,14 +127,17 @@ class smbcreds (smbopts):
return gpos
def update_gpos(self, username):
gpos = self.get_gpos(username)
list_selected_dc = set()
if len(self.dc_site_servers) > 1:
list_selected_dc.add(self.dc_site_servers.pop(0))
if self.dc_site_servers:
self.selected_dc = self.dc_site_servers.pop()
else:
list_selected_dc.add(self.selected_dc)
self.selected_dc = self.all_servers.pop()
list_selected_dc.add(self.selected_dc)
gpos = self.get_gpos(username)
while list_selected_dc:
logdata = dict()
@ -146,27 +151,27 @@ class smbcreds (smbopts):
except NTSTATUSError as smb_exc:
logdata['smb_exc'] = str(smb_exc)
if not check_scroll_enabled():
if self.dc_site_servers:
self.selected_dc = self.dc_site_servers.pop()
if self.pdc_emulator_server:
self.selected_dc = self.pdc_emulator_server
else:
log('F1', logdata)
raise smb_exc
else:
if len(self.dc_site_servers) > 1:
self.selected_dc = self.dc_site_servers.pop(0)
if self.dc_site_servers:
self.selected_dc = self.dc_site_servers.pop()
elif self.all_servers:
self.selected_dc = self.all_servers.pop()
else:
self.selected_dc = self.pdc_emulator_server
if self.selected_dc not in list_selected_dc:
logdata['action'] = 'Search another dc'
log('W11', logdata)
list_selected_dc.add(self.selected_dc)
else:
self.selected_dc = get_dc_hostname(self.creds, self.lp)
if self.selected_dc not in list_selected_dc:
logdata['action'] = 'Search another dc'
log('W11', logdata)
list_selected_dc.add(self.selected_dc)
else:
if self.dc_site_servers:
self.selected_dc = self.dc_site_servers.pop()
else:
log('F1', logdata)
raise smb_exc
log('F1', logdata)
raise smb_exc
except Exception as exc:
logdata['exc'] = str(exc)
log('F1', logdata)
@ -177,6 +182,7 @@ class smbcreds (smbopts):
class SiteDomainScanner:
def __init__(self, smbcreds, lp, dc):
self.samdb = SamDB(url='ldap://{}'.format(dc), session_info=system_session(), credentials=smbcreds, lp=lp)
self.pdc_emulator = self._search_pdc_emulator()
@staticmethod
def _get_ldb_single_message_attr(ldb_message, attr_name, encoding='utf8'):
@ -228,33 +234,48 @@ class SiteDomainScanner:
servers_dn.add_base(site_dn)
res = self.samdb.search(servers_dn, ldb.SCOPE_ONELEVEL, expression='objectClass=server', attrs=['dNSHostName'])
servers = [self._get_ldb_single_message_attr(msg, 'dNSHostName') for msg in res]
random.shuffle(servers)
return servers
def get_ad_all_servers(self):
sites_dn = ldb.Dn(self.samdb, "CN=Sites")
config_dn = self.samdb.get_config_basedn()
sites_dn.add_base(config_dn)
res = self.samdb.search(sites_dn, ldb.SCOPE_SUBTREE, expression='objectClass=server', attrs=['dNSHostName'])
servers = [self._get_ldb_single_message_attr(msg, 'dNSHostName') for msg in res]
random.shuffle(servers)
return servers
def check_ip_in_subnets(self, ip_addresses, subnets_sites):
return next((subnets_sites[subnet] for subnet in subnets_sites.keys()
if any(ip_address in subnet for ip_address in ip_addresses)), None)
def select_servers(self):
def select_site_servers(self):
try:
ip_addresses = self.get_ip_addresses()
subnets_sites = self.get_ad_subnets_sites()
our_site = self.check_ip_in_subnets(ip_addresses, subnets_sites)
pdc_emulator = self._search_pdc_emulator()
servers = []
if our_site:
servers = self.get_ad_site_servers(our_site)
if pdc_emulator in servers:
pdc_index = servers.index(pdc_emulator)
servers.insert(0, servers.pop(pdc_index))
else:
servers.append(pdc_emulator)
return servers
else:
return [pdc_emulator]
random.shuffle(servers)
return servers
except Exception as e:
return None
def select_all_servers(self):
try:
servers = self.get_ad_all_servers()
random.shuffle(servers)
return servers
except Exception as e:
return None
def select_pdc_emulator_server(self):
return self.pdc_emulator
def expand_windows_var(text, username=None):
'''
Scan the line for percent-encoded variables and expand them.