1
0
mirror of https://github.com/altlinux/gpupdate.git synced 2025-02-04 05:47:00 +03:00

Merge pull request #181 from altlinux/fix_Mapped_Drive

Fix mapped drive for user and machine
This commit is contained in:
Evgeny Sinelnikov 2022-12-04 03:58:14 +04:00 committed by GitHub
commit ce54bae087
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 245 additions and 17 deletions

View File

@ -1,7 +1,7 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
# Copyright (C) 2019-2022 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -16,20 +16,18 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import fileinput
import jinja2
import os
import subprocess
import logging
from pathlib import Path
import string
from .applier_frontend import (
applier_frontend
, check_enabled
)
from gpt.drives import json2drive
from util.util import get_homedir
from util.logging import slogm, log
from util.logging import log
def storage_get_drives(storage, sid):
drives = storage.get_drives(sid)
@ -50,12 +48,84 @@ def add_line_if_missing(filename, ins_line):
f.write(ins_line + '\n')
f.flush()
class Drive_list:
__alphabet = string.ascii_uppercase
def __init__(self):
self.dict_drives = dict()
def __get_letter(self, letter):
slice_letters = set(self.__alphabet[self.__alphabet.find(letter) + 1:]) - set(self.dict_drives.keys())
free_letters = sorted(slice_letters)
if free_letters:
return free_letters[0]
else:
return None
def append(self, drive:dict):
cur_dir = drive['dir']
if cur_dir not in set(self.dict_drives.keys()):
if drive['action'] == 'D':
return
self.dict_drives[cur_dir] = drive
return
else:
if drive['action'] == 'C':
if drive['useLetter'] == '1':
return
else:
new_dir = self.__get_letter(cur_dir)
if not new_dir:
return
drive['dir'] = new_dir
self.dict_drives[new_dir] = drive
return
if drive['action'] == 'U':
self.dict_drives[cur_dir]['thisDrive'] = drive['thisDrive']
self.dict_drives[cur_dir]['allDrives'] = drive['allDrives']
self.dict_drives[cur_dir]['label'] = drive['label']
self.dict_drives[cur_dir]['persistent'] = drive['persistent']
self.dict_drives[cur_dir]['useLetter'] = drive['useLetter']
return
if drive['action'] == 'R':
self.dict_drives[cur_dir] = drive
return
if drive['action'] == 'D':
if drive['useLetter'] == '1':
self.dict_drives.pop(cur_dir, None)
else:
keys_set = set(self.dict_drives.keys())
slice_letters = set(self.__alphabet[self.__alphabet.find(cur_dir):])
for letter_dir in (keys_set & slice_letters):
self.dict_drives.pop(letter_dir, None)
def __call__(self):
return list(self.dict_drives.values())
def len(self):
return len(self.dict_drives)
class cifs_applier(applier_frontend):
def __init__(self, storage):
pass
__module_name = 'CIFSApplier'
__module_enabled = False
__module_experimental = True
def __init__(self, storage, sid):
self.applier_cifs = cifs_applier_user(storage, sid, None)
self.__module_enabled = check_enabled(
storage
, self.__module_name
, self.__module_experimental
)
def apply(self):
pass
if self.__module_enabled:
log('D179')
self.applier_cifs._admin_context_apply()
else:
log('D180')
class cifs_applier_user(applier_frontend):
__module_name = 'CIFSApplierUser'
@ -67,28 +137,44 @@ class cifs_applier_user(applier_frontend):
__template_mountpoints = 'autofs_mountpoints.j2'
__template_identity = 'autofs_identity.j2'
__template_auto = 'autofs_auto.j2'
__template_mountpoints_hide = 'autofs_mountpoints_hide.j2'
__template_auto_hide = 'autofs_auto_hide.j2'
def __init__(self, storage, sid, username):
self.storage = storage
self.sid = sid
self.username = username
self.home = get_homedir(username)
if username:
self.home = '/run/media/' + username
else:
self.home = '/media/gpupdate'
conf_file = '{}.conf'.format(sid)
conf_hide_file = '{}_hide.conf'.format(sid)
autofs_file = '{}.autofs'.format(sid)
autofs_hide_file = '{}_hide.autofs'.format(sid)
cred_file = '{}.creds'.format(sid)
self.auto_master_d = Path(self.__auto_dir)
self.user_config = self.auto_master_d / conf_file
self.user_config_hide = self.auto_master_d / conf_hide_file
if os.path.exists(self.user_config.resolve()):
self.user_config.unlink()
if os.path.exists(self.user_config_hide.resolve()):
self.user_config_hide.unlink()
self.user_autofs = self.auto_master_d / autofs_file
self.user_autofs_hide = self.auto_master_d / autofs_hide_file
if os.path.exists(self.user_autofs.resolve()):
self.user_autofs.unlink()
if os.path.exists(self.user_autofs_hide.resolve()):
self.user_autofs_hide.unlink()
self.user_creds = self.auto_master_d / cred_file
self.mount_dir = Path(os.path.join(self.home, 'net'))
if username:
self.mount_dir = Path(os.path.join(self.home, 'UserDrive'))
else:
self.mount_dir = Path(os.path.join(self.home, 'Drive'))
self.drives = storage_get_drives(self.storage, self.sid)
self.template_loader = jinja2.FileSystemLoader(searchpath=self.__template_path)
@ -98,6 +184,9 @@ class cifs_applier_user(applier_frontend):
self.template_indentity = self.template_env.get_template(self.__template_identity)
self.template_auto = self.template_env.get_template(self.__template_auto)
self.template_mountpoints_hide = self.template_env.get_template(self.__template_mountpoints_hide)
self.template_auto_hide = self.template_env.get_template(self.__template_auto_hide)
self.__module_enabled = check_enabled(
self.storage
, self.__module_name
@ -111,7 +200,7 @@ class cifs_applier_user(applier_frontend):
'''
pass
def __admin_context_apply(self):
def _admin_context_apply(self):
# Create /etc/auto.master.gpupdate.d directory
self.auto_master_d.mkdir(parents=True, exist_ok=True)
# Create user's destination mount directory
@ -122,26 +211,39 @@ class cifs_applier_user(applier_frontend):
add_line_if_missing(self.__auto_file, auto_destdir)
# Collect data for drive settings
drive_list = list()
drive_list = Drive_list()
for drv in self.drives:
drive_settings = dict()
drive_settings['dir'] = drv.dir
drive_settings['login'] = drv.login
drive_settings['password'] = drv.password
drive_settings['path'] = drv.path.replace('\\', '/')
drive_settings['action'] = drv.action
drive_settings['thisDrive'] = drv.thisDrive
drive_settings['allDrives'] = drv.allDrives
drive_settings['label'] = drv.label
drive_settings['persistent'] = drv.persistent
drive_settings['useLetter'] = drv.useLetter
drive_list.append(drive_settings)
if len(drive_list) > 0:
if drive_list.len() > 0:
mount_settings = dict()
mount_settings['drives'] = drive_list
mount_settings['drives'] = drive_list()
mount_text = self.template_mountpoints.render(**mount_settings)
mount_text_hide = self.template_mountpoints_hide.render(**mount_settings)
with open(self.user_config.resolve(), 'w') as f:
f.truncate()
f.write(mount_text)
f.flush()
with open(self.user_config_hide.resolve(), 'w') as f:
f.truncate()
f.write(mount_text_hide)
f.flush()
autofs_settings = dict()
autofs_settings['home_dir'] = self.home
autofs_settings['mount_file'] = self.user_config.resolve()
@ -152,13 +254,20 @@ class cifs_applier_user(applier_frontend):
f.write(autofs_text)
f.flush()
autofs_settings['mount_file'] = self.user_config_hide.resolve()
autofs_text = self.template_auto_hide.render(**autofs_settings)
with open(self.user_autofs_hide.resolve(), 'w') as f:
f.truncate()
f.write(autofs_text)
f.flush()
subprocess.check_call(['/bin/systemctl', 'restart', 'autofs'])
def admin_context_apply(self):
if self.__module_enabled:
log('D146')
self.__admin_context_apply()
self._admin_context_apply()
else:
log('D147')

View File

@ -45,7 +45,9 @@ from .folder_applier import (
folder_applier
, folder_applier_user
)
from .cifs_applier import cifs_applier_user
from .cifs_applier import (
cifs_applier_user
, cifs_applier)
from .ntp_applier import ntp_applier
from .envvar_applier import (
envvar_applier
@ -140,6 +142,13 @@ class frontend_manager:
self.machine_appliers['chromium'] = chromium_applier(self.storage, self.sid, self.username)
self.machine_appliers['shortcuts'] = shortcut_applier(self.storage)
self.machine_appliers['gsettings'] = gsettings_applier(self.storage, self.file_cache)
try:
self.machine_appliers['cifs'] = cifs_applier(self.storage, self.sid)
except Exception as exc:
logdata = dict()
logdata['applier_name'] = 'cifs'
logdata['msg'] = str(exc)
log('E24', logdata)
self.machine_appliers['cups'] = cups_applier(self.storage)
self.machine_appliers['firewall'] = firewall_applier(self.storage)
self.machine_appliers['folders'] = folder_applier(self.storage, self.sid)

View File

@ -67,6 +67,12 @@ def read_drives(drives_file):
drive_obj.set_pass(decrypt_pass(props.get('cpassword')))
drive_obj.set_dir(props.get('letter'))
drive_obj.set_path(props.get('path'))
drive_obj.set_action(props.get('action'))
drive_obj.set_thisDrive(props.get('thisDrive'))
drive_obj.set_allDrives(props.get('allDrives'))
drive_obj.set_label(props.get('label'))
drive_obj.set_persistent(props.get('persistent'))
drive_obj.set_useLetter(props.get('useLetter'))
drives.append(drive_obj)
@ -93,6 +99,12 @@ class drivemap:
self.password = None
self.dir = None
self.path = None
self.action = None
self.thisDrive = None
self.allDrives = None
self.label = None
self.persistent = None
self.useLetter = None
def set_login(self, username):
self.login = username
@ -110,6 +122,24 @@ class drivemap:
def set_path(self, path):
self.path = path
def set_action(self, action):
self.action = action
def set_thisDrive(self, thisDrive):
self.thisDrive = thisDrive
def set_allDrives(self, allDrives):
self.allDrives = allDrives
def set_label(self, label):
self.label = label
def set_persistent(self, persistent):
self.persistent = persistent
def set_useLetter(self, useLetter):
self.useLetter = useLetter
def to_json(self):
drive = dict()
drive['login'] = self.login

View File

@ -741,6 +741,12 @@ msgstr "Сохранение информации об ini-файле"
msgid "Dictionary key generation failed"
msgstr "Формирования ключа словаря не удалось"
msgid "Running CIFS applier for machine"
msgstr "Запуск применение настроек CIFS для машины"
msgid "CIFS applier for machine will not be started"
msgstr "Применение настроек CIFS для машины не будет запущено"
# Debug_end
# Warning

View File

@ -283,6 +283,8 @@ def debug_code(code):
debug_ids[176] = 'Ini-file is not readable'
debug_ids[177] = 'Saving information about ini-file'
debug_ids[178] = 'Dictionary key generation failed'
debug_ids[179] = 'Running CIFS applier for machine'
debug_ids[180] = 'CIFS applier for machine will not be started'
return debug_ids.get(code, 'Unknown debug code')

View File

@ -115,6 +115,13 @@ class drive_entry(object):
self.password = dobj.password
self.dir = dobj.dir
self.path = dobj.path
self.action = dobj.action
self.thisDrive = dobj.thisDrive
self.allDrives = dobj.allDrives
self.label = dobj.label
self.persistent = dobj.persistent
self.useLetter = dobj.useLetter
def update_fields(self):
fields = dict()
@ -123,6 +130,12 @@ class drive_entry(object):
fields['password'] = self.password
fields['dir'] = self.dir
fields['path'] = self.path
fields['action'] = self.action
fields['thisDrive'] = self.thisDrive
fields['allDrives'] = self.allDrives
fields['label'] = self.label
fields['persistent'] = self.persistent
fields['useLetter'] = self.useLetter
return fields

View File

@ -120,6 +120,12 @@ class sqlite_registry(registry):
, Column('dir', String)
, Column('policy_name', String)
, Column('path', String)
, Column('action', String)
, Column('thisDrive', String)
, Column('allDrives', String)
, Column('label', String)
, Column('persistent', String)
, Column('useLetter', String)
, UniqueConstraint('sid', 'dir')
)
self.__folders = Table(

View File

@ -0,0 +1,20 @@
{#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2022 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#}
{{ home_dir }}/.net {{ mount_file }} -t 120

View File

@ -1,7 +1,7 @@
{#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
# Copyright (C) 2019-2022 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -17,5 +17,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#}
{%- for drv in drives %}
{% if (drv.thisDrive != 'HIDE') %}
{% if drv.label %}
{{ drv.label }} -fstype=cifs,cruid=$USER,sec=krb5,noperm :{{ drv.path }}
{% else %}
{{ drv.dir }} -fstype=cifs,cruid=$USER,sec=krb5,noperm :{{ drv.path }}
{% endif %}
{% endif %}
{% endfor %}

View File

@ -0,0 +1,27 @@
{#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2022 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#}
{%- for drv in drives %}
{% if (drv.thisDrive == 'HIDE') %}
{% if drv.label %}
{{ drv.label }} -fstype=cifs,cruid=$USER,sec=krb5,noperm :{{ drv.path }}
{% else %}
{{ drv.dir }} -fstype=cifs,cruid=$USER,sec=krb5,noperm :{{ drv.path }}
{% endif %}
{% endif %}
{% endfor %}