1
0
mirror of https://github.com/altlinux/gpupdate.git synced 2025-03-20 18:50:17 +03:00

Merge pull request #59 from altlinux/share_automount

Autofs/CIFS applier for mounting Samba shares
This commit is contained in:
Evgeny Sinelnikov 2020-05-14 23:27:02 +04:00 committed by GitHub
commit 9d40910890
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 399 additions and 23 deletions

View File

@ -0,0 +1,142 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import fileinput
import jinja2
import os
import subprocess
from pathlib import Path
from .applier_frontend import applier_frontend
from gpt.drives import json2drive
from util.util import get_homedir
def storage_get_drives(storage, sid):
drives = storage.get_drives(sid)
drive_list = list()
for drv_obj in drives:
drive_list.append(drv_obj)
return drive_list
def add_line_if_missing(filename, ins_line):
with open(filename, 'r+') as f:
for line in f:
if ins_line == line.strip():
break
else:
f.write(ins_line + '\n')
f.flush()
class cifs_applier(applier_frontend):
def __init__(self, storage):
pass
def apply(self):
pass
class cifs_applier_user(applier_frontend):
__auto_file = '/etc/auto.master'
__auto_dir = '/etc/auto.master.gpupdate.d'
__template_path = '/usr/share/gpupdate/templates'
__template_mountpoints = 'autofs_mountpoints.j2'
__template_identity = 'autofs_identity.j2'
__template_auto = 'autofs_auto.j2'
def __init__(self, storage, sid, username):
self.storage = storage
self.sid = sid
self.username = username
self.home = get_homedir(username)
conf_file = '{}.conf'.format(sid)
autofs_file = '{}.autofs'.format(sid)
cred_file = '{}.creds'.format(sid)
self.auto_master_d = Path(self.__auto_dir)
self.user_config = self.auto_master_d / conf_file
if os.path.exists(self.user_config.resolve()):
self.user_config.unlink()
self.user_autofs = self.auto_master_d / autofs_file
if os.path.exists(self.user_autofs.resolve()):
self.user_autofs.unlink()
self.user_creds = self.auto_master_d / cred_file
self.mount_dir = Path(os.path.join(self.home, 'net'))
self.drives = storage_get_drives(self.storage, self.sid)
self.template_loader = jinja2.FileSystemLoader(searchpath=self.__template_path)
self.template_env = jinja2.Environment(loader=self.template_loader)
self.template_mountpoints = self.template_env.get_template(self.__template_mountpoints)
self.template_indentity = self.template_env.get_template(self.__template_identity)
self.template_auto = self.template_env.get_template(self.__template_auto)
def user_context_apply(self):
'''
Nothing to implement.
'''
pass
def admin_context_apply(self):
# Create /etc/auto.master.gpupdate.d directory
self.auto_master_d.mkdir(parents=True, exist_ok=True)
# Create user's destination mount directory
self.mount_dir.mkdir(parents=True, exist_ok=True)
# Add pointer to /etc/auto.master.gpiupdate.d in /etc/auto.master
auto_destdir = '+dir:{}'.format(self.__auto_dir)
add_line_if_missing(self.__auto_file, auto_destdir)
# Collect data for drive settings
drive_list = list()
for drv in self.drives:
drive_settings = dict()
drive_settings['dir'] = drv.dir
drive_settings['login'] = drv.login
drive_settings['password'] = drv.password
drive_settings['path'] = drv.path.replace('\\', '/')
drive_list.append(drive_settings)
if len(drive_list) > 0:
mount_settings = dict()
mount_settings['drives'] = drive_list
mount_text = self.template_mountpoints.render(**mount_settings)
with open(self.user_config.resolve(), 'w') as f:
f.truncate()
f.write(mount_text)
f.flush()
autofs_settings = dict()
autofs_settings['home_dir'] = self.home
autofs_settings['mount_file'] = self.user_config.resolve()
autofs_text = self.template_auto.render(**autofs_settings)
with open(self.user_autofs.resolve(), 'w') as f:
f.truncate()
f.write(autofs_text)
f.flush()
subprocess.check_call(['/bin/systemctl', 'restart', 'autofs'])

View File

@ -33,6 +33,7 @@ from .gsettings_applier import (
gsettings_applier,
gsettings_applier_user
)
from .cifs_applier import cifs_applier_user
from util.windows import get_sid
from util.users import (
is_root,
@ -94,7 +95,8 @@ class frontend_manager:
# files and settings, mostly in $HOME.
self.user_appliers = dict({
'shortcuts': shortcut_applier_user(self.storage, self.sid, self.username),
'gsettings': gsettings_applier_user(self.storage, self.sid, self.username)
'gsettings': gsettings_applier_user(self.storage, self.sid, self.username),
'cifs': cifs_applier_user(self.storage, self.sid, self.username)
})
def machine_apply(self):
@ -123,6 +125,7 @@ class frontend_manager:
logging.debug(slogm('Running user appliers from administrator context'))
self.user_appliers['shortcuts'].admin_context_apply()
self.user_appliers['gsettings'].admin_context_apply()
self.user_appliers['cifs'].admin_context_apply()
logging.debug(slogm('Running user appliers for user context'))
with_privileges(self.username, self.user_appliers['shortcuts'].user_context_apply)

View File

@ -22,24 +22,13 @@ from Crypto.Cipher import AES
from util.xml import get_xml_root
def read_drives(drives_file):
drives = list()
for drive in get_xml_root(drives_file):
drive_obj = drivemap()
props = drive.find('Properties')
drive_obj.set_login(props.get('username'))
drive_obj.set_pass(props.get('cpassword'))
drives.append(drive_obj)
return drives
def decrypt_pass(cpassword):
'''
AES key for cpassword decryption: http://msdn.microsoft.com/en-us/library/2c15cbf0-f086-4c74-8b70-1f2fa45dd4be%28v=PROT.13%29#endNote2
'''
if not cpassword:
return cpassword
key = (
b'\x4e\x99\x06\xe8'
b'\xfc\xb6\x6c\xc9'
@ -53,23 +42,76 @@ def decrypt_pass(cpassword):
cpass_len = len(cpassword)
padded_pass = (cpassword + "=" * ((4 - cpass_len % 4) % 4))
password = b64decode(padded_pass)
decrypter = AES(key, AES.MODE_CBC, '\x00' * 16)
decrypter = AES.new(key, AES.MODE_CBC, '\x00' * 16)
return decrypter.decrypt(password)
# decrypt() returns byte array which is immutable and we need to
# strip padding, then convert UTF-16LE to UTF-8
binstr = decrypter.decrypt(password)
by = list()
for item in binstr:
if item != 16:
by.append(item)
utf16str = bytes(by).decode('utf-16', 'ignore')
utf8str = utf16str.encode('utf8')
return utf8str.decode()
def read_drives(drives_file):
drives = list()
for drive in get_xml_root(drives_file):
drive_obj = drivemap()
props = drive.find('Properties')
drive_obj.set_login(props.get('username'))
drive_obj.set_pass(decrypt_pass(props.get('cpassword')))
drive_obj.set_dir(props.get('letter'))
drive_obj.set_path(props.get('path'))
drives.append(drive_obj)
return drives
def json2drive(json_str):
json_obj = json.loads(json_str)
drive_obj = drivemap()
drive_obj.set_login(json_obj['login'])
drive_obj.set_pass(json_obj['password'])
drive_obj.set_dir(json_obj['dir'])
drive_obj.set_path(json_obj['path'])
return drive_obj
class drivemap:
def __init__(self):
self.login = None
self.password = None
self.dir = None
self.path = None
def set_login(self, username):
self.login = username
if not username:
self.login = ''
def set_pass(self, password):
self.password = password
if not password:
self.password = ''
def set_dir(self, path):
self.dir = path
def set_path(self, path):
self.path = path
def to_json(self):
drive = dict()
drive['login'] = self.login
drive['password'] = self.password
drive['dir'] = self.dir
drive['path'] = self.path
contents = dict()
contents['drive'] = drive

View File

@ -92,10 +92,12 @@ class gpt:
def _find_user(self):
self._user_regpol = self._find_regpol('user')
self._user_shortcuts = self._find_shortcuts('user')
self._user_drives = self._find_drives('user')
def _find_machine(self):
self._machine_regpol = self._find_regpol('machine')
self._machine_shortcuts = self._find_shortcuts('machine')
self._machine_drives = self._find_drives('machine')
def _find_regpol(self, part):
'''
@ -138,13 +140,14 @@ class gpt:
'''
Find Drives.xml files.
'''
search_path = os.path.join(self._machine_path, 'Preferences', 'Drives')
if 'user' == part:
search_path = os.path.join(self._user_path, 'Preferences', 'Drives')
if not search_path:
return None
drives_dir = find_dir(self._machine_prefs, 'Drives')
drives_file = find_file(drives_dir, 'drives.xml')
return find_file(search_path, 'drives.xml')
if 'user' == part:
drives_dir = find_dir(self._user_prefs, 'Drives')
drives_file = find_file(drives_dir, 'drives.xml')
return drives_file
def _find_printers(self, part):
'''
@ -169,6 +172,17 @@ class gpt:
for sc in shortcuts:
self.storage.add_shortcut(self.sid, sc)
def _merge_drives(self):
drives = list()
if self.sid == self.storage.get_info('machine_sid'):
drives = read_drives(self._machine_drives)
else:
drives = read_drives(self._user_drives)
for drv in drives:
self.storage.add_drive(self.sid, drv)
def merge(self):
'''
Merge machine and user (if sid provided) settings to storage.
@ -184,6 +198,10 @@ class gpt:
if self._machine_shortcuts:
logging.debug(slogm('Merging machine shortcuts from {}'.format(self._machine_shortcuts)))
self._merge_shortcuts()
if self._machine_drives:
logging.debug(slogm('Merging machine drives from {}'.format(self._machine_drives)))
self._merge_drives()
else:
# Merge user settings if UserPolicyMode set accordingly
# and user settings (for HKCU) are exist.
@ -195,6 +213,9 @@ class gpt:
if self._user_shortcuts:
logging.debug(slogm('Merging user shortcuts from {} for {}'.format(self._user_shortcuts, self.sid)))
self._merge_shortcuts()
if self._user_drives:
logging.debug(slogm('Merging user drives from {} for {}'.format(self._user_drives, self.sid)))
self._merge_drives()
def __str__(self):
template = '''

View File

@ -57,3 +57,15 @@ class printer_entry(object):
self.sid = sid
self.name = pobj.name
self.printer = pobj.to_json()
class drive_entry(object):
'''
Object mapping representing Samba share bound to drive letter
'''
def __init__(self, sid, dobj):
self.sid = sid
self.login = dobj.login
self.password = dobj.password
self.dir = dobj.dir
self.path = dobj.path

View File

@ -42,6 +42,7 @@ from .record_types import (
, ad_shortcut
, info_entry
, printer_entry
, drive_entry
)
class sqlite_registry(registry):
@ -96,6 +97,17 @@ class sqlite_registry(registry):
Column('printer', String),
UniqueConstraint('sid', 'name')
)
self.__drives = Table(
'Drives',
self.__metadata,
Column('id', Integer, primary_key=True),
Column('sid', String),
Column('login', String),
Column('password', String),
Column('dir', String),
Column('path', String),
UniqueConstraint('sid', 'dir')
)
self.__metadata.create_all(self.db_cnt)
Session = sessionmaker(bind=self.db_cnt)
self.db_session = Session()
@ -105,6 +117,7 @@ class sqlite_registry(registry):
mapper(samba_hkcu_preg, self.__hkcu)
mapper(ad_shortcut, self.__shortcuts)
mapper(printer_entry, self.__printers)
mapper(drive_entry, self.__drives)
except:
pass
#logging.error('Error creating mapper')
@ -179,6 +192,19 @@ class sqlite_registry(registry):
.update(update_obj))
self.db_session.commit()
def _drive_upsert(self, row):
try:
self._add(row)
except:
update_obj = dict({ 'dir': row.dir, 'path': row.path, 'login': row.login, 'password': row.password })
(self
.db_session
.query(drive_entry)
.filter(drive_entry.sid == row.sid)
.filter(drive_entry.dir == row.dir)
.update(update_obj))
self.db_session.commit()
def set_info(self, name, value):
ientry = info_entry(name, value)
logging.debug(slogm('Setting info {}:{}'.format(name, value)))
@ -221,6 +247,11 @@ class sqlite_registry(registry):
logging.debug(slogm('Saving info about printer {} for {}'.format(prn_entry.name, sid)))
self._printer_upsert(prn_entry)
def add_drive(self, sid, dobj):
drv_entry = drive_entry(sid, dobj)
logging.debug(slogm('Saving info about drive mapping {} for {}'.format(drv_entry.path, sid)))
self._drive_upsert(drv_entry)
def get_shortcuts(self, sid):
res = (self
.db_session
@ -237,6 +268,14 @@ class sqlite_registry(registry):
.all())
return res
def get_drives(self, sid):
res = (self
.db_session
.query(drive_entry)
.filter(drive_entry.sid == sid)
.all())
return res
def get_hkcu_entry(self, sid, hive_key):
res = (self
.db_session
@ -302,6 +341,14 @@ class sqlite_registry(registry):
.delete())
self.db_session.commit()
def wipe_drives(self, sid):
(self
.db_session
.query(drive_entry)
.filter(drive_entry.sid == sid)
.delete())
self.db_session.commit()
def wipe_hkcu(self, sid):
(self
.db_session

View File

@ -0,0 +1,20 @@
{#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#}
{{ home_dir }}/net {{ mount_file }} -t 120

View File

@ -0,0 +1,25 @@
{#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#}
{% if login %}
username={{ login }}
{% endif %}
{% if password %}
password={{ password }}
{% endif %}

View File

@ -0,0 +1,21 @@
{#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#}
{%- for drv in drives %}
{{ drv.dir }} -fstype=cifs,cruid=$AUTOFS_UID,sec=krb5 :{{ drv.path }}
{% endfor %}

View File

@ -0,0 +1,42 @@
#
# GPOA - GPO Applier for Linux
#
# Copyright (C) 2019-2020 BaseALT Ltd.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import unittest
import unittest.mock
import os
import util.paths
import json
class GptDrivesTestCase(unittest.TestCase):
@unittest.mock.patch('util.paths.cache_dir')
def test_drive_reader(self, cdir_mock):
'''
Test functionality to read objects from Shortcuts.xml
'''
cdir_mock.return_value = '/var/cache/gpupdate'
import gpt.drives
testdata_path = '{}/test/gpt/data/Drives.xml'.format(os.getcwd())
drvs = gpt.drives.read_drives(testdata_path)
json_obj = json.loads(drvs[0].to_json())
self.assertIsNotNone(json_obj['drive'])

View File

@ -20,6 +20,7 @@ Requires: oddjob-%name >= 0.2.0
Requires: libnss-role >= 0.5.0
Requires: local-policy >= 0.3.0
Requires: pam-config >= 1.8
Requires: autofs
# This is needed by shortcuts_applier
Requires: desktop-file-utils