mirror of
https://github.com/altlinux/gpupdate.git
synced 2025-03-21 18:50:38 +03:00
Merge pull request #183 from altlinux/addition_to_files
Improve file copy applier support
This commit is contained in:
commit
601e8b1072
@ -28,6 +28,8 @@ from pathlib import Path
|
||||
from util.windows import expand_windows_var
|
||||
from util.util import get_homedir
|
||||
from util.exceptions import NotUNCPathError
|
||||
from util.paths import UNCPath
|
||||
import fnmatch
|
||||
|
||||
class Files_cp:
|
||||
def __init__(self, file_obj, file_cache ,username=None):
|
||||
@ -38,78 +40,122 @@ class Files_cp:
|
||||
return
|
||||
self.fromPath = (expand_windows_var(file_obj.fromPath, username).replace('\\', '/')
|
||||
if file_obj.fromPath else None)
|
||||
self.isTargetPathDirectory = False
|
||||
self.action = action_letter2enum(file_obj.action)
|
||||
self.readOnly = str2bool(file_obj.readOnly)
|
||||
self.archive = str2bool(file_obj.archive)
|
||||
self.hidden = str2bool(file_obj.hidden)
|
||||
self.suppress = str2bool(file_obj.suppress)
|
||||
self.username = username
|
||||
self.fromPathFiles = self.get_list_files()
|
||||
self.fromPathFiles = list()
|
||||
if self.fromPath:
|
||||
if targetPath[-1] == '/' or self.is_pattern(Path(self.fromPath).name):
|
||||
self.isTargetPathDirectory = True
|
||||
self.get_list_files()
|
||||
self.act()
|
||||
|
||||
def get_target_file(self, targetPath, fromPath):
|
||||
def get_target_file(self, targetPath:Path, fromFile:str) -> Path:
|
||||
try:
|
||||
if fromPath and targetPath.is_dir():
|
||||
if self.hidden:
|
||||
return targetPath.joinpath('.' + fromPath.name)
|
||||
if fromFile:
|
||||
fromFileName = Path(fromFile).name
|
||||
if self.isTargetPathDirectory:
|
||||
targetPath.mkdir(parents = True, exist_ok = True)
|
||||
else:
|
||||
return targetPath.joinpath(fromPath.name)
|
||||
targetPath.parent.mkdir(parents = True, exist_ok = True)
|
||||
targetPath = targetPath.parent
|
||||
fromFileName = self.targetPath.name
|
||||
if self.hidden:
|
||||
return targetPath.joinpath('.' + fromFileName)
|
||||
else:
|
||||
return targetPath.joinpath(fromFileName)
|
||||
|
||||
else:
|
||||
if not self.hidden:
|
||||
return targetPath
|
||||
else:
|
||||
return targetPath.parent.joinpath('.' + targetPath.name)
|
||||
|
||||
except Exception as exc:
|
||||
logdata = dict({'exc': exc})
|
||||
logdata = dict()
|
||||
logdata['targetPath'] = targetPath
|
||||
logdata['fromFile'] = fromFile
|
||||
logdata['exc'] = exc
|
||||
log('D163', logdata)
|
||||
|
||||
return None
|
||||
|
||||
def copy_target_file(self, targetFile:Path, fromFile:str):
|
||||
try:
|
||||
uri_path = UNCPath(fromFile)
|
||||
self.file_cache.store(fromFile, targetFile)
|
||||
except NotUNCPathError as exc:
|
||||
fromFilePath = Path(fromFile)
|
||||
if fromFilePath.exists():
|
||||
targetFile.write_bytes(fromFilePath.read_bytes())
|
||||
except Exception as exc:
|
||||
logdata = dict()
|
||||
logdata['targetFile'] = targetFile
|
||||
logdata['fromFile'] = fromFile
|
||||
logdata['exc'] = exc
|
||||
log('W15', logdata)
|
||||
|
||||
def set_read_only(self, targetFile):
|
||||
if self.readOnly:
|
||||
shutil.os.chmod(targetFile, int('444', base = 8))
|
||||
shutil.os.chmod(targetFile, 0o444)
|
||||
else:
|
||||
shutil.os.chmod(targetFile, int('664', base = 8))
|
||||
shutil.os.chmod(targetFile, 0o664)
|
||||
|
||||
def _create_action(self):
|
||||
for fromPath in self.fromPathFiles:
|
||||
logdata = dict()
|
||||
for fromFile in self.fromPathFiles:
|
||||
targetFile = None
|
||||
|
||||
try:
|
||||
targetFile = self.get_target_file(self.targetPath, fromPath)
|
||||
if not targetFile.exists():
|
||||
targetFile.write_bytes(fromPath.read_bytes())
|
||||
targetFile = self.get_target_file(self.targetPath, fromFile)
|
||||
if not targetFile and not targetFile.exists():
|
||||
self.copy_target_file(targetFile, fromFile)
|
||||
if self.username:
|
||||
shutil.chown(targetFile, self.username)
|
||||
self.set_read_only(targetFile)
|
||||
logdata['File'] = targetFile
|
||||
log('D191', logdata)
|
||||
except Exception as exc:
|
||||
logdata = dict()
|
||||
logdata['exc'] = exc
|
||||
logdata['fromPath'] = fromPath
|
||||
logdata['fromPath'] = fromFile
|
||||
logdata['targetPath'] = self.targetPath
|
||||
logdata['targetFile'] = targetFile
|
||||
log('D164', logdata)
|
||||
|
||||
def _delete_action(self):
|
||||
targetFile = Path(self.targetPath)
|
||||
try:
|
||||
if targetFile.exists():
|
||||
targetFile.unlink()
|
||||
except Exception as exc:
|
||||
logdata = dict()
|
||||
logdata['exc'] = exc
|
||||
logdata['targetPath'] = self.targetPath
|
||||
logdata['targetFile'] = targetFile
|
||||
log('D165', logdata)
|
||||
list_target = [self.targetPath.name]
|
||||
if self.is_pattern(self.targetPath.name):
|
||||
list_target = fnmatch.filter([str(x.name) for x in self.targetPath.parent.iterdir() if x.is_file()], self.targetPath.name)
|
||||
logdata = dict()
|
||||
for targetFile in list_target:
|
||||
targetFile = self.targetPath.parent.joinpath(targetFile)
|
||||
try:
|
||||
if targetFile.exists():
|
||||
targetFile.unlink()
|
||||
logdata['File'] = targetFile
|
||||
log('D192', logdata)
|
||||
|
||||
except Exception as exc:
|
||||
logdata['exc'] = exc
|
||||
logdata['targetPath'] = self.targetPath
|
||||
logdata['targetFile'] = targetFile
|
||||
log('D165', logdata)
|
||||
|
||||
def _update_action(self):
|
||||
for fromPath in self.fromPathFiles:
|
||||
targetFile = self.get_target_file(self.targetPath, fromPath)
|
||||
logdata = dict()
|
||||
for fromFile in self.fromPathFiles:
|
||||
targetFile = self.get_target_file(self.targetPath, fromFile)
|
||||
try:
|
||||
targetFile.write_bytes(fromPath.read_bytes())
|
||||
self.copy_target_file(targetFile, fromFile)
|
||||
if self.username:
|
||||
shutil.chown(self.targetPath, self.username)
|
||||
self.set_read_only(targetFile)
|
||||
logdata['File'] = targetFile
|
||||
log('D192', logdata)
|
||||
except Exception as exc:
|
||||
logdata = dict()
|
||||
logdata['exc'] = exc
|
||||
logdata['fromPath'] = self.fromPath
|
||||
logdata['targetPath'] = self.targetPath
|
||||
@ -127,70 +173,52 @@ class Files_cp:
|
||||
self._delete_action()
|
||||
self._create_action()
|
||||
|
||||
def is_pattern(self, name):
|
||||
if name.find('*') != -1 or name.find('?') != -1:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def get_list_files(self):
|
||||
ls_all_files = list()
|
||||
logdata = dict()
|
||||
logdata['targetPath'] = self.targetPath
|
||||
if self.fromPath and self.fromPath.split('/')[-1] != '*':
|
||||
logdata['targetPath'] = str(self.targetPath)
|
||||
fromFilePath = Path(self.fromPath)
|
||||
if not self.is_pattern(fromFilePath.name):
|
||||
self.fromPathFiles.append(self.fromPath)
|
||||
else:
|
||||
fromPathDir = self.fromPath[:self.fromPath.rfind('/')]
|
||||
|
||||
try:
|
||||
self.file_cache.store(self.fromPath)
|
||||
fromPath = Path(self.file_cache.get(self.fromPath))
|
||||
ls_all_files.append(fromPath)
|
||||
uri_path = UNCPath(fromPathDir)
|
||||
ls_files = self.file_cache.get_ls_smbdir(fromPathDir)
|
||||
if ls_files:
|
||||
filtered_ls_files = fnmatch.filter(ls_files, fromFilePath.name)
|
||||
if filtered_ls_files:
|
||||
self.fromPathFiles = [fromPathDir + '/' + file_s for file_s in filtered_ls_files]
|
||||
except NotUNCPathError as exc:
|
||||
fromPath = Path(self.fromPath)
|
||||
if fromPath.exists():
|
||||
ls_all_files.append(fromPath)
|
||||
except Exception as exc:
|
||||
logdata['fromPath'] = self.fromPath
|
||||
logdata['exc'] = exc
|
||||
log('W13', logdata)
|
||||
elif self.fromPath and len(self.fromPath.split('/')) > 2:
|
||||
ls_files = self.file_cache.get_ls_smbdir(self.fromPath[:-1])
|
||||
if ls_files:
|
||||
ls_from_paths = [self.fromPath[:-1] + file_s for file_s in ls_files]
|
||||
for from_path in ls_from_paths:
|
||||
try:
|
||||
self.file_cache.store(from_path)
|
||||
fromPath = Path(self.file_cache.get(from_path))
|
||||
ls_all_files.append(fromPath)
|
||||
except Exception as exc:
|
||||
logdata['fromPath'] = self.fromPath
|
||||
logdata['exc'] = exc
|
||||
log('W13', logdata)
|
||||
else:
|
||||
try:
|
||||
fromLocalPath = Path(self.fromPath[:-1])
|
||||
if fromLocalPath.is_dir():
|
||||
ls = [fromFile for fromFile in fromLocalPath.iterdir() if fromFile.is_file()]
|
||||
for fromPath in ls:
|
||||
ls_all_files.append(fromPath)
|
||||
exact_path = Path(fromPathDir)
|
||||
if exact_path.is_dir():
|
||||
self.fromPathFiles = [str(fromFile) for fromFile in exact_path.iterdir() if fromFile.is_file()]
|
||||
except Exception as exc:
|
||||
logdata['fromPath'] = self.fromPath
|
||||
logdata['exc'] = exc
|
||||
log('W13', logdata)
|
||||
else:
|
||||
fromPath = Path(self.fromPath) if self.fromPath else None
|
||||
ls_all_files.append(fromPath)
|
||||
return ls_all_files
|
||||
log('W3316', logdata)
|
||||
except Exception as exc:
|
||||
logdata['fromPath'] = self.fromPath
|
||||
logdata['exc'] = exc
|
||||
log('W3317', logdata)
|
||||
|
||||
def check_target_path(path_to_check, username = None):
|
||||
'''
|
||||
Function for checking the correctness of the path
|
||||
'''
|
||||
if not path_to_check:
|
||||
return None
|
||||
|
||||
checking = Path(path_to_check)
|
||||
if checking.is_dir():
|
||||
if username and path_to_check == '/':
|
||||
return Path(get_homedir(username))
|
||||
return checking
|
||||
#Check for path directory without '/something' suffix
|
||||
elif (len(path_to_check.split('/')) > 2
|
||||
and Path(path_to_check.replace(path_to_check.split('/')[-1], '')).is_dir()):
|
||||
return checking
|
||||
elif username:
|
||||
target_path = Path(get_homedir(username))
|
||||
res = target_path.joinpath(path_to_check
|
||||
if path_to_check[0] != '/'
|
||||
else path_to_check[1:])
|
||||
return res
|
||||
else:
|
||||
return False
|
||||
rootpath = Path('/')
|
||||
if username:
|
||||
rootpath = Path(get_homedir(username))
|
||||
|
||||
return rootpath.joinpath(checking)
|
||||
|
@ -771,12 +771,21 @@ msgstr "Запись настройки Яндекс Браузера в"
|
||||
msgid "Running networkshare applier for user"
|
||||
msgstr "Запуск применение настроек сетевых каталогов для пользователя"
|
||||
|
||||
msgid "File copy"
|
||||
msgstr "Копирование файла"
|
||||
|
||||
msgid "Running networkshare applier for user will not be started"
|
||||
msgstr "Применение настроек сетевых каталогов для пользователя не будет запущено"
|
||||
|
||||
msgid "File update"
|
||||
msgstr "Обновление файла"
|
||||
|
||||
msgid "Applying settings for network share"
|
||||
msgstr "Применение настроек для сетевой папки"
|
||||
|
||||
msgid "Deleting a file"
|
||||
msgstr "Удаление файла"
|
||||
|
||||
# Debug_end
|
||||
|
||||
# Warning
|
||||
@ -823,6 +832,9 @@ msgstr "Не удалось кэшировать файл"
|
||||
msgid "Could not create a valid list of keys"
|
||||
msgstr "Не удалось создать допустимый список ключей"
|
||||
|
||||
msgid "Failed to copy file"
|
||||
msgstr "Не удалось скопировать файл"
|
||||
|
||||
# Fatal
|
||||
msgid "Unable to refresh GPO list"
|
||||
msgstr "Невозможно обновить список объектов групповых политик"
|
||||
|
@ -295,6 +295,9 @@ def debug_code(code):
|
||||
debug_ids[188] = 'Running networkshare applier for user'
|
||||
debug_ids[189] = 'Running networkshare applier for user will not be started'
|
||||
debug_ids[190] = 'Applying settings for network share'
|
||||
debug_ids[191] = 'File copy'
|
||||
debug_ids[192] = 'File update'
|
||||
debug_ids[193] = 'Deleting a file'
|
||||
|
||||
return debug_ids.get(code, 'Unknown debug code')
|
||||
|
||||
@ -320,6 +323,7 @@ def warning_code(code):
|
||||
warning_ids[12] = 'Failed to read the list of files'
|
||||
warning_ids[13] = 'Failed to caching the file'
|
||||
warning_ids[14] = 'Could not create a valid list of keys'
|
||||
warning_ids[15] = 'Failed to copy file'
|
||||
|
||||
return warning_ids.get(code, 'Unknown warning code')
|
||||
|
||||
|
@ -39,15 +39,17 @@ class fs_file_cache:
|
||||
self.samba_context = smbc.Context(use_kerberos=1)
|
||||
#, debug=10)
|
||||
|
||||
def store(self, uri):
|
||||
destdir = uri
|
||||
def store(self, uri, destfile = None):
|
||||
try:
|
||||
uri_path = UNCPath(uri)
|
||||
file_name = os.path.basename(uri_path.get_path())
|
||||
file_path = os.path.dirname(uri_path.get_path())
|
||||
destdir = Path('{}/{}/{}'.format(self.storage_uri,
|
||||
uri_path.get_domain(),
|
||||
file_path))
|
||||
if not destfile:
|
||||
file_name = os.path.basename(uri_path.get_path())
|
||||
file_path = os.path.dirname(uri_path.get_path())
|
||||
destdir = Path('{}/{}/{}'.format(self.storage_uri,
|
||||
uri_path.get_domain(),
|
||||
file_path))
|
||||
else:
|
||||
destdir = destfile.parent
|
||||
except Exception as exc:
|
||||
logdata = dict({'exception': str(exc)})
|
||||
log('D144', logdata)
|
||||
@ -56,9 +58,10 @@ class fs_file_cache:
|
||||
if not destdir.exists():
|
||||
destdir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
destfile = Path('{}/{}/{}'.format(self.storage_uri,
|
||||
uri_path.get_domain(),
|
||||
uri_path.get_path()))
|
||||
if not destfile:
|
||||
destfile = Path('{}/{}/{}'.format(self.storage_uri,
|
||||
uri_path.get_domain(),
|
||||
uri_path.get_path()))
|
||||
|
||||
with open(destfile, 'wb') as df:
|
||||
df.truncate()
|
||||
|
Loading…
x
Reference in New Issue
Block a user