1
0
mirror of https://github.com/altlinux/gpupdate.git synced 2025-11-05 20:23:57 +03:00

Compare commits

...

6 Commits

Author SHA1 Message Date
Danila Skachedubov
bbbc0b8289 refactor: optimize FreeIPA backend and fix configuration
- Improve GPO downloading with batch processing and better error handling
    - Fix FreeIPA API calls and server discovery logic
    - Add Samba configuration for FreeIPA integration
    - Clean up imports and code formatting
    - Update package dependencies for FreeIPA support
2025-10-28 18:42:20 +04:00
Danila Skachedubov
d975cd2f10 refactor: optimize GPO downloading and error handling
- Implement batch downloading of GPOs instead of single downloads
    - Improve caching mechanism with separate handling for cached/downloaded GPOs
2025-10-10 10:06:33 +04:00
Danila Skachedubov
cb9c70d6c1 feat: add FreeIPA backend configuration and authentication
- Extend backend options to include FreeIPA in CLI tools
    - Add FreeIPA-Samba auto-configuration during setup
2025-10-09 14:05:59 +04:00
Danila Skachedubov
99feb569a2 feat: add FreeIPA credentials and localization
- Implement ipacreds class for FreeIPA GPO management
    - Add FreeIPA API error handling and localization
    - Add freeipa_backend with GPO download and processing
    - Support FreeIPA in backend factory and setup
2025-10-09 14:00:36 +04:00
Danila Skachedubov
cd1a2fc042 feat: add FreeIPA configuration utility
- Implement ipaopts class for FreeIPA configuration management
    - Add methods to retrieve realm, domain, and host from IPA config
2025-10-09 12:26:18 +04:00
Danila Skachedubov
5e918900c6 feat(backend): integrate FreeIPA backend factory
- Add freeipa_backend to backend factory selection
    - Implement ipacreds initialization
2025-10-08 16:48:21 +04:00
10 changed files with 473 additions and 15 deletions

View File

@@ -24,6 +24,8 @@ from util.logging import log
from util.config import GPConfig
from util.util import get_uid_by_username, touch_file
from util.paths import get_dconf_config_file
from util.ipacreds import ipacreds
from .freeipa_backend import freeipa_backend
from storage.dconf_registry import Dconf_registry, create_dconf_ini_file, add_preferences_to_global_registry_dict
def backend_factory(dc, username, is_machine, no_domain = False):
@@ -52,6 +54,20 @@ def backend_factory(dc, username, is_machine, no_domain = False):
logdata = dict({'error': str(exc)})
log('E7', logdata)
if config.get_backend() == 'freeipa' and not no_domain:
try:
if not dc:
dc = config.get_dc()
if dc:
ld = {'dc': dc}
log('D52', ld)
ipac = ipacreds()
domain = ipac.get_domain()
back = freeipa_backend(ipac, username, domain, is_machine)
except Exception as exc:
logdata = {'error': str(exc)}
log('E77', logdata)
if config.get_backend() == 'local' or no_domain:
log('D8')
try:

View File

@@ -1,7 +1,7 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
# Copyright (C) 2019-2025 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -16,10 +16,231 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import smbc
import re
from .applier_backend import applier_backend
from pathlib import Path
from gpt.gpt import gpt, get_local_gpt
from gpt.gpo_dconf_mapping import GpoInfoDconf
from storage import registry_factory
from storage.dconf_registry import Dconf_registry, extract_display_name_version
from storage.fs_file_cache import fs_file_cache
from util.logging import log
from util.util import get_uid_by_username
from util.kerberos import (
machine_kinit
, machine_kdestroy
)
class freeipa_backend(applier_backend):
def __init__(self):
pass
def __init__(self, ipacreds, username, domain, is_machine):
self.ipacreds = ipacreds
self.cache_path = '/var/cache/gpupdate/creds/krb5cc_{}'.format(os.getpid())
self.__kinit_successful = machine_kinit(self.cache_path, "freeipa")
if not self.__kinit_successful:
raise Exception('kinit is not successful')
self.storage = registry_factory()
self.storage.set_info('domain', domain)
machine_name = self.ipacreds.get_machine_name()
self.storage.set_info('machine_name', machine_name)
self.username = machine_name if is_machine else username
self._is_machine_username = is_machine
self.cache_dir = self.ipacreds.get_cache_dir()
self.gpo_cache_part = 'gpo_cache'
self.gpo_cache_dir = os.path.join(self.cache_dir, self.gpo_cache_part)
self.storage.set_info('cache_dir', self.gpo_cache_dir)
self.file_cache = fs_file_cache("freeipa_gpo", username)
logdata = {'cachedir': self.cache_dir}
log('D7', logdata)
def __del__(self):
if self.__kinit_successful:
machine_kdestroy()
def retrieve_and_store(self):
'''
Retrieve settings and store it in a database - FreeIPA version
'''
try:
if self._is_machine_username:
dconf_dict = Dconf_registry.get_dictionary_from_dconf_file_db(save_dconf_db=True)
else:
uid = get_uid_by_username(self.username)
dconf_dict = Dconf_registry.get_dictionary_from_dconf_file_db(uid, save_dconf_db=True)
except Exception as e:
logdata = {'msg': str(e)}
log('E72', logdata)
if self._is_machine_username:
machine_gpts = []
try:
machine_name = self.storage.get_info('machine_name')
machine_gpts = self._get_gpts(machine_name)
machine_gpts.reverse()
except Exception as exc:
logdata = {'msg': str(exc)}
log('E17', logdata)
for i, gptobj in enumerate(machine_gpts):
try:
gptobj.merge_machine()
except Exception as exc:
logdata = {'msg': str(exc)}
log('E26', logdata)
else:
user_gpts = []
try:
user_gpts = self._get_gpts(self.username)
user_gpts.reverse()
except Exception as exc:
logdata = {'msg': str(exc)}
log('E17', logdata)
for i, gptobj in enumerate(user_gpts):
try:
gptobj.merge_user()
except Exception as exc:
logdata = {'msg': str(exc)}
log('E27', logdata)
def _get_gpts(self, username):
gpts = []
gpos, server = self.ipacreds.update_gpos(username)
if not gpos:
return gpts
if not server:
return gpts
cached_gpos = []
download_gpos = []
for i, gpo in enumerate(gpos):
if gpo.file_sys_path.startswith('/'):
if os.path.exists(gpo.file_sys_path):
logdata = {'gpo_name': gpo.display_name, 'path': gpo.file_sys_path}
log('D11', logdata)
cached_gpos.append(gpo)
else:
download_gpos.append(gpo)
else:
if self._check_sysvol_present(gpo):
download_gpos.append(gpo)
else:
logdata = {'gpo_name': gpo.display_name}
log('W4', logdata)
if download_gpos:
try:
self._download_gpos(download_gpos, server)
logdata = {'count': len(download_gpos)}
log('D50', logdata)
except Exception as e:
logdata = {'msg': str(e), 'count': len(download_gpos)}
log('E35', logdata)
else:
log('D211', {})
all_gpos = cached_gpos + download_gpos
for gpo in all_gpos:
gpt_abspath = gpo.file_sys_path
if not os.path.exists(gpt_abspath):
logdata = {'path': gpt_abspath, 'gpo_name': gpo.display_name}
log('W12', logdata)
continue
if self._is_machine_username:
obj = gpt(gpt_abspath, None, GpoInfoDconf(gpo))
else:
obj = gpt(gpt_abspath, self.username, GpoInfoDconf(gpo))
obj.set_name(gpo.display_name)
gpts.append(obj)
local_gpt = get_local_gpt()
gpts.append(local_gpt)
logdata = {'total_count': len(gpts), 'downloaded_count': len(download_gpos)}
log('I2', logdata)
return gpts
def _check_sysvol_present(self, gpo):
if not gpo.file_sys_path:
if getattr(gpo, 'name', '') != 'Local Policy':
logdata = {'gponame': getattr(gpo, 'name', 'Unknown')}
log('W4', logdata)
return False
if gpo.file_sys_path.startswith('\\\\'):
return True
elif gpo.file_sys_path.startswith('/'):
if os.path.exists(gpo.file_sys_path):
return True
else:
return False
else:
return False
def _download_gpos(self, gpos, server):
cache_dir = self.ipacreds.get_cache_dir()
domain = self.ipacreds.get_domain().upper()
gpo_cache_dir = os.path.join(cache_dir, domain, 'POLICIES')
os.makedirs(gpo_cache_dir, exist_ok=True)
for gpo in gpos:
if not gpo.file_sys_path:
continue
smb_remote_path = None
try:
smb_remote_path = self._convert_to_smb_path(gpo.file_sys_path, server)
local_gpo_path = os.path.join(gpo_cache_dir, gpo.name)
self._download_gpo_directory(smb_remote_path, local_gpo_path)
gpo.file_sys_path = local_gpo_path
except Exception as e:
logdata = {
'msg': str(e),
'gpo_name': gpo.display_name,
'smb_path': smb_remote_path,
}
log('E38', logdata)
def _convert_to_smb_path(self, windows_path, server):
match = re.search(r'\\\\[^\\]+\\(.+)', windows_path)
if not match:
raise Exception(f"Invalid Windows path format: {windows_path}")
relative_path = match.group(1).replace('\\', '/').lower()
smb_url = f"smb://{server}/{relative_path}"
return smb_url
def _download_gpo_directory(self, remote_smb_path, local_path):
os.makedirs(local_path, exist_ok=True)
try:
entries = self.file_cache.samba_context.opendir(remote_smb_path).getdents()
for entry in entries:
if entry.name in [".", ".."]:
continue
remote_entry_path = f"{remote_smb_path}/{entry.name}"
local_entry_path = os.path.join(local_path, entry.name)
if entry.smbc_type == smbc.DIR:
self._download_gpo_directory(remote_entry_path, local_entry_path)
elif entry.smbc_type == smbc.FILE:
try:
os.makedirs(os.path.dirname(local_entry_path), exist_ok=True)
self.file_cache.store(remote_entry_path, Path(local_entry_path))
except Exception as e:
logdata = {'exception': str(e), 'file': entry.name}
log('W30', logdata)
except Exception as e:
logdata = {'exception': str(e), 'remote_folder_path': remote_smb_path}
log('W31', logdata)

View File

@@ -30,6 +30,7 @@ from util.util import (
)
from util.config import GPConfig
from util.paths import get_custom_policy_dir
from frontend.appliers.ini_file import Ini_file
class Runner:
@@ -77,7 +78,7 @@ def parse_arguments():
type=str,
nargs='?',
const='backend',
choices=['local', 'samba'],
choices=['local', 'samba', 'freeipa'],
help='Backend (source of settings) name')
parser_write.add_argument('status',
@@ -92,7 +93,7 @@ def parse_arguments():
type=str,
nargs='?',
const='backend',
choices=['local', 'samba'],
choices=['local', 'samba', 'freeipa'],
help='Backend (source of settings) name')
parser_enable.add_argument('--local-policy',
@@ -101,7 +102,7 @@ def parse_arguments():
parser_enable.add_argument('--backend',
default='samba',
type=str,
choices=['local', 'samba'],
choices=['local', 'samba', 'freeipa'],
help='Backend (source of settings) name')
parser_update.add_argument('--local-policy',
@@ -110,7 +111,7 @@ def parse_arguments():
parser_update.add_argument('--backend',
default='samba',
type=str,
choices=['local', 'samba'],
choices=['local', 'samba', 'freeipa'],
help='Backend (source of settings) name')
@@ -221,6 +222,8 @@ def enable_gp(policy_name, backend_type):
cmd_enable_gpupdate_user_timer = ['/bin/systemctl', '--global', 'enable', 'gpupdate-user.timer']
cmd_enable_gpupdate_scripts_service = ['/bin/systemctl', 'enable', 'gpupdate-scripts-run.service']
cmd_enable_gpupdate_user_scripts_service = ['/bin/systemctl', '--global', 'enable', 'gpupdate-scripts-run-user.service']
cmd_ipa_client_samba = ['/usr/sbin/ipa-client-samba', '--unattended']
config = GPConfig()
@@ -271,6 +274,28 @@ def enable_gp(policy_name, backend_type):
if not is_unit_enabled('gpupdate.timer'):
disable_gp()
return
if backend_type == 'freeipa':
result = runcmd(cmd_ipa_client_samba)
if result[0] != 0:
if "already configured" in str(result[1]) or "already exists" in str(result[1]):
print("FreeIPA is already configured")
else:
print(str(result))
return
else:
print(str(result))
ini_obj = type("ini", (), {})()
ini_obj.path = "/etc/samba/smb.conf"
ini_obj.section = "global"
ini_obj.action = "UPDATE"
ini_obj.property = "log level"
ini_obj.value = "0"
Ini_file(ini_obj)
# Enable gpupdate-setup.timer for all users
if not rollback_on_error(cmd_enable_gpupdate_user_timer):
return

View File

@@ -268,6 +268,12 @@ msgstr "Не удалось обновить LDAP новыми данными п
msgid "Failed to change local user password"
msgstr "Не удалось изменить пароль локального пользователя"
msgid "Unable to initialize Freeipa backend"
msgstr "Невозможно инициализировать бэкэнд Freeipa"
msgid "FreeIPA API Error"
msgstr "Ошибка API FreeIPA"
# Error_end
# Debug
@@ -961,6 +967,9 @@ msgstr "Не найдены входы после изменения парол
msgid "Unknown message type, no message assigned"
msgstr "Неизвестный тип сообщения"
msgid "Failed to load cached versions"
msgstr "Не удалось загрузить кешированные версии"
# Debug_end
# Warning

View File

@@ -112,6 +112,8 @@ def error_code(code):
error_ids[74] = 'Autofs restart failed'
error_ids[75] = 'Failed to update LDAP with new password data'
error_ids[76] = 'Failed to change local user password'
error_ids[77] = 'Unable to initialize Freeipa backend'
error_ids[78] = 'FreeIPA API error'
return error_ids.get(code, 'Unknown error code')
def debug_code(code):
@@ -349,6 +351,7 @@ def debug_code(code):
debug_ids[232] = 'No user login records found'
debug_ids[233] = 'Calculating time since the first user login after their password change'
debug_ids[234] = 'No logins found after password change'
debug_ids[235] = 'Failed to load cached versions'
return debug_ids.get(code, 'Unknown debug code')

68
gpoa/util/ipa.py Normal file
View File

@@ -0,0 +1,68 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import configparser
import os
from ipalib import api
class ipaopts:
def __init__(self):
"""Initialize the class and load the FreeIPA config file."""
self.config_file = "/etc/ipa/default.conf"
self.config = configparser.ConfigParser()
if not os.path.exists(self.config_file):
raise FileNotFoundError(f"Config file for Freeipa{self.config_file} not found.")
self.config.read(self.config_file)
def get_realm(self):
"""Return the Kerberos realm from the config."""
try:
return self.config.get('global', 'realm')
except (configparser.NoSectionError, configparser.NoOptionError):
raise ValueError("Realm not found in config file.")
def get_domain(self):
"""Return the domain from the config."""
try:
return self.config.get('global', 'domain')
except (configparser.NoSectionError, configparser.NoOptionError):
raise ValueError("Domain not found in config file.")
def get_server(self):
"""
Return the FreeIPA PDC Emulator server from API.
"""
try:
result = api.Command.gpmaster_show_pdc()
pdc_server = result['result']['pdc_emulator']
return pdc_server
except Exception as e:
pass
def get_machine_name(self):
"""Return the host from the config."""
try:
return self.config.get('global', 'host')
except (configparser.NoSectionError, configparser.NoOptionError):
raise ValueError("Host not found in config file.")
def get_cache_dir(self):
"""Return the cache directory path."""
return "/var/cache/freeipa/gpo_cache"

102
gpoa/util/ipacreds.py Normal file
View File

@@ -0,0 +1,102 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2025 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>
import smbc
import os
import re
from ipalib import api
from pathlib import Path
from storage.dconf_registry import Dconf_registry, extract_display_name_version
from util.util import get_uid_by_username
from .ipa import ipaopts
from util.logging import log
class ipacreds(ipaopts):
def __init__(self):
super().__init__()
self.smb_context = smbc.Context(use_kerberos=True)
self.gpo_list = []
def update_gpos(self, username):
gpos = []
try:
if not api.isdone('bootstrap'):
api.bootstrap(context='cli')
if not api.isdone('finalize'):
api.finalize()
api.Backend.rpcclient.connect()
try:
server = self.get_server()
is_machine = (username == self.get_machine_name())
if is_machine:
result = api.Command.chain_resolve_for_host(username)
else:
result = api.Command.chain_resolve_for_user(username)
policies_list = result["result"]
try:
if is_machine:
dconf_dict = Dconf_registry.get_dictionary_from_dconf_file_db(save_dconf_db=True)
else:
uid = get_uid_by_username(username)
dconf_dict = Dconf_registry.get_dictionary_from_dconf_file_db(uid, save_dconf_db=True)
dict_gpo_name_version = extract_display_name_version(dconf_dict, username)
except Exception as exc:
logdata = {'exc': str(exc)}
log('D235', logdata)
dict_gpo_name_version = {}
for policy in policies_list:
class SimpleGPO:
def __init__(self, policy_data):
self.display_name = policy_data.get('name', 'Unknown')
self.file_sys_path = policy_data.get('file_system_path', '')
self.version = int(policy_data.get('version', 0))
self.flags = int(policy_data.get('flags', 0))
self.link = policy_data.get('link', 'Unknown')
guid_match = re.search(r'\{[^}]+\}', self.file_sys_path)
self.name = guid_match.group(0) if guid_match else f"policy_{id(self)}"
gpo = SimpleGPO(policy)
if (gpo.display_name in dict_gpo_name_version.keys() and
dict_gpo_name_version.get(gpo.display_name, {}).get('version') == str(gpo.version)):
cached_path = dict_gpo_name_version.get(gpo.display_name, {}).get('correct_path')
if cached_path and Path(cached_path).exists():
gpo.file_sys_path = cached_path
ldata = {'gpo_name': gpo.display_name, 'gpo_uuid': gpo.name, 'file_sys_path_cache': True}
else:
ldata = {'gpo_name': gpo.display_name, 'gpo_uuid': gpo.name, 'file_sys_path': gpo.file_sys_path}
else:
ldata = {'gpo_name': gpo.display_name, 'gpo_uuid': gpo.name, 'file_sys_path': gpo.file_sys_path}
gpos.append(gpo)
finally:
api.Backend.rpcclient.disconnect()
except Exception as exc:
logdata = {'exc': str(exc)}
log('E78', logdata)
return gpos, server
def get_domain(self):
return super().get_domain()
def get_server(self):
return super().get_server()
def get_cache_dir(self):
return super().get_cache_dir()

View File

@@ -22,20 +22,31 @@ import subprocess
from .util import get_machine_name
from .logging import log
from .samba import smbopts
from .ipa import ipaopts
def machine_kinit(cache_name=None):
def machine_kinit(cache_name=None, backend_type=None):
'''
Perform kinit with machine credentials
'''
opts = smbopts()
host = get_machine_name()
realm = opts.get_realm()
with_realm = '{}@{}'.format(host, realm)
os.environ['KRB5CCNAME'] = 'FILE:{}'.format(cache_name)
kinit_cmd = ['kinit', '-k', with_realm]
if backend_type == 'freeipa':
keytab_path = '/etc/samba/samba.keytab'
opts = ipaopts()
host = "cifs/" + opts.get_machine_name()
realm = opts.get_realm()
with_realm = '{}@{}'.format(host, realm)
kinit_cmd = ['kinit', '-kt', keytab_path, with_realm]
else:
opts = smbopts()
host = get_machine_name()
realm = opts.get_realm()
with_realm = '{}@{}'.format(host, realm)
kinit_cmd = ['kinit', '-k', with_realm]
if cache_name:
os.environ['KRB5CCNAME'] = 'FILE:{}'.format(cache_name)
kinit_cmd.extend(['-c', cache_name])
proc = subprocess.Popen(kinit_cmd)
proc.wait()

View File

@@ -106,7 +106,7 @@ def get_backends():
'''
Get the list of backends supported by GPOA
'''
return ['local', 'samba']
return ['local', 'samba', 'freeipa']
def get_default_policy_name():
'''

View File

@@ -34,6 +34,8 @@
%add_python3_req_skip util.windows
%add_python3_req_skip util.xml
%add_python3_req_skip util.gpoa_ini_parsing
%add_python3_req_skip util.ipacreds
%add_python3_req_skip frontend.appliers.ini_file
Name: gpupdate
Version: 0.13.4
@@ -63,6 +65,7 @@ Requires: dconf-profile
Requires: packagekit
Requires: dconf
Requires: libgvdb-gir
Requires: freeipa-client-samba
# This is needed by shortcuts_applier
Requires: desktop-file-utils
# This is needed for smb file cache support