1
0
mirror of https://github.com/altlinux/gpupdate.git synced 2025-03-21 18:50:38 +03:00

Added a class for site domain search

This commit is contained in:
Valery Sinelnikov 2024-02-16 18:34:12 +04:00
parent 66bae5a1af
commit ac2190809a

View File

@ -30,8 +30,6 @@ except ImportError:
from samba.netcmd.common import netcmd_get_domain_infos_via_cldap
import samba.gpo
from storage import cache_factory
from messages import message_with_code
from .xdg import (
xdg_get_desktop
)
@ -39,6 +37,12 @@ from .util import get_homedir
from .logging import log
from .samba import smbopts
from gpoa.storage import registry_factory
from samba.samdb import SamDB
from samba.auth import system_session
import optparse
import ldb
import ipaddress
import netifaces
class smbcreds (smbopts):
@ -48,6 +52,9 @@ class smbcreds (smbopts):
self.credopts = options.CredentialsOptions(self.parser)
self.creds = self.credopts.get_credentials(self.lp, fallback_machine=True)
self.set_dc(dc_fqdn)
self.dc_site = SiteDomainScanner(self.creds, self.selected_dc).select_servers()
if self.dc_site is not None:
self.selected_dc = self.dc_site
def get_dc(self):
return self.selected_dc
@ -152,6 +159,61 @@ class smbcreds (smbopts):
raise exc
return gpos
class SiteDomainScanner:
def __init__(self, smbcreds, dc):
self.smbcreds = smbcreds
parser = optparse.OptionParser(None)
sambaopts = options.SambaOptions(parser)
lp = sambaopts.get_loadparm()
self.samdb = SamDB(url='ldap://{}'.format(dc), session_info=system_session(), credentials=smbcreds, lp=lp)
def get_ip_addresses(self):
interface_list = netifaces.interfaces()
addresses = []
for iface in interface_list:
address_entry = netifaces.ifaddresses(iface)
if netifaces.AF_INET in address_entry:
addresses.extend(ipaddress.ip_address(ipv4_address_entry['addr']) for ipv4_address_entry in address_entry[netifaces.AF_INET])
if netifaces.AF_INET6 in address_entry:
addresses.extend(ipaddress.ip_address(ipv6_address_entry['addr']) for ipv6_address_entry in address_entry[netifaces.AF_INET6])
return addresses
def get_ad_subnets_sites(self):
subnet_dn = ldb.Dn(self.samdb, "CN=Subnets,CN=Sites")
config_dn = self.samdb.get_config_basedn()
subnet_dn.add_base(config_dn)
res = self.samdb.search(subnet_dn, ldb.SCOPE_ONELEVEL, expression='objectClass=subnet', attrs=['cn', 'siteObject'])
subnets = {ipaddress.ip_network(msg['cn'][0].decode('utf8')): msg['siteObject'][0].decode('utf8') for msg in res}
return subnets
def get_ad_site_servers(self, site):
servers_dn = ldb.Dn(self.samdb, "CN=Servers")
site_dn = ldb.Dn(self.samdb, site)
servers_dn.add_base(site_dn)
res = self.samdb.search(servers_dn, ldb.SCOPE_ONELEVEL, expression='objectClass=server', attrs=['dNSHostName'])
servers = [msg['dNSHostName'][0].decode('utf8') for msg in res]
return servers[0] if servers else None
def check_ip_in_subnets(self, ip_addresses, subnets_sites):
return next((subnets_sites[subnet] for subnet in subnets_sites.keys()
if any(ip_address in subnet for ip_address in ip_addresses)), None)
def select_servers(self):
try:
ip_addresses = self.get_ip_addresses()
subnets_sites = self.get_ad_subnets_sites()
our_site = self.check_ip_in_subnets(ip_addresses, subnets_sites)
if our_site:
servers = self.get_ad_site_servers(our_site)
return servers
else:
return None
except Exception as e:
return None
def expand_windows_var(text, username=None):
'''
Scan the line for percent-encoded variables and expand them.