mirror of
https://github.com/altlinux/gpupdate.git
synced 2025-03-21 18:50:38 +03:00
appliers/file_cp.py: fixes for refactored copy files algorithms
This commit is contained in:
parent
87d873862a
commit
d9191e47fa
@ -49,22 +49,25 @@ class Files_cp:
|
||||
self.hidden = str2bool(file_obj.hidden)
|
||||
self.suppress = str2bool(file_obj.suppress)
|
||||
self.username = username
|
||||
self.fromPathFiles = list()
|
||||
if self.fromPath:
|
||||
self.fromPathFiles = self.get_list_files()
|
||||
self.get_list_files()
|
||||
self.act()
|
||||
|
||||
def get_target_file(self, targetPath:Path, fromFile:str) -> Path:
|
||||
try:
|
||||
if fromFile:
|
||||
fromFileName = Path(fromFile).name
|
||||
if self.isTargetPathDirectory:
|
||||
targetPath.mkdir(parents = True, exist_ok = True)
|
||||
else:
|
||||
targetPath.parent.mkdir(parents = True, exist_ok = True)
|
||||
targetPath = targetPath.parent
|
||||
fromFileName = self.targetPath.name
|
||||
if self.hidden:
|
||||
return targetPath.joinpath('.' + Path(fromFile).name)
|
||||
return targetPath.joinpath('.' + fromFileName)
|
||||
else:
|
||||
return targetPath.joinpath(Path(fromFile).name)
|
||||
return targetPath.joinpath(fromFileName)
|
||||
|
||||
else:
|
||||
if not self.hidden:
|
||||
@ -72,21 +75,25 @@ class Files_cp:
|
||||
else:
|
||||
return targetPath.parent.joinpath('.' + targetPath.name)
|
||||
except Exception as exc:
|
||||
logdata = dict()
|
||||
logdata['targetPath'] = targetPath
|
||||
logdata['fromFile'] = fromFile
|
||||
logdata['exc'] = exc
|
||||
log('W3314', logdata)
|
||||
|
||||
return None
|
||||
|
||||
def copy_target_file(self, targetFile:Path, fromFile:str):
|
||||
try:
|
||||
uri_path = UNCPath(self.fromPath)
|
||||
self.file_cache.store(fromFile, targetPath)
|
||||
uri_path = UNCPath(fromFile)
|
||||
self.file_cache.store(fromFile, targetFile)
|
||||
except NotUNCPathError as exc:
|
||||
fromFilePath = Path(fromFile)
|
||||
if fromFilePath.exists():
|
||||
targetFile.write_bytes(fromFilePath.read_bytes())
|
||||
except Exception as exc:
|
||||
logdata['targetPath'] = targetPath
|
||||
logdata = dict()
|
||||
logdata['targetFile'] = targetFile
|
||||
logdata['fromFile'] = fromFile
|
||||
logdata['exc'] = exc
|
||||
log('W3315', logdata)
|
||||
@ -100,9 +107,10 @@ class Files_cp:
|
||||
|
||||
def _create_action(self):
|
||||
for fromFile in self.fromPathFiles:
|
||||
targetFile = None
|
||||
try:
|
||||
targetFile = self.get_target_file(self.targetPath, fromFile)
|
||||
if not targetFile.exists():
|
||||
if not targetFile and not targetFile.exists():
|
||||
self.copy_target_file(targetFile, fromFile)
|
||||
if self.username:
|
||||
shutil.chown(targetFile, self.username)
|
||||
@ -166,27 +174,34 @@ class Files_cp:
|
||||
return False
|
||||
|
||||
def get_list_files(self):
|
||||
ls_all_files = list()
|
||||
logdata = dict()
|
||||
logdata['targetPath'] = str(self.targetPath)
|
||||
fromPathSplit = self.fromPath.split('/')
|
||||
if not self.is_pattern(self.fromPath.name):
|
||||
ls_all_files.append(self.fromPath)
|
||||
fromFilePath = Path(self.fromPath)
|
||||
if not self.is_pattern(fromFilePath.name):
|
||||
self.fromPathFiles.append(self.fromPath)
|
||||
else:
|
||||
exact_path = self.fromPath.parent
|
||||
ls_files = self.file_cache.get_ls_smbdir(exact_path)
|
||||
filtered_ls_files = fnmatch.filter(ls_files, self.fromPath.name)
|
||||
if filtered_ls_files:
|
||||
ls_all_files = [exact_path.joinpath(file_s) for file_s in filtered_ls_files]
|
||||
else:
|
||||
fromPathDir = self.fromPath[:self.fromPath.rfind('/')]
|
||||
|
||||
try:
|
||||
uri_path = UNCPath(fromPathDir)
|
||||
ls_files = self.file_cache.get_ls_smbdir(fromPathDir)
|
||||
if ls_files:
|
||||
filtered_ls_files = fnmatch.filter(ls_files, fromFilePath.name)
|
||||
if filtered_ls_files:
|
||||
self.fromPathFiles = [fromPathDir + '/' + file_s for file_s in filtered_ls_files]
|
||||
except NotUNCPathError as exc:
|
||||
try:
|
||||
exact_path = Path(fromPathDir)
|
||||
if exact_path.is_dir():
|
||||
ls_all_files = [fromFile for fromFile in exact_path.iterdir() if fromFile.is_file()]
|
||||
self.fromPathFiles = [str(fromFile) for fromFile in exact_path.iterdir() if fromFile.is_file()]
|
||||
except Exception as exc:
|
||||
logdata['fromPath'] = self.fromPath
|
||||
logdata['exc'] = exc
|
||||
log('W13', logdata)
|
||||
return ls_all_files
|
||||
log('W3316', logdata)
|
||||
except Exception as exc:
|
||||
logdata['fromPath'] = self.fromPath
|
||||
logdata['exc'] = exc
|
||||
log('W3317', logdata)
|
||||
|
||||
def check_target_path(path_to_check, username = None):
|
||||
'''
|
||||
@ -200,4 +215,4 @@ def check_target_path(path_to_check, username = None):
|
||||
if username:
|
||||
rootpath = Path(get_homedir(username))
|
||||
|
||||
return checking.joinpath(rootpath)
|
||||
return rootpath.joinpath(checking)
|
||||
|
@ -49,7 +49,7 @@ class fs_file_cache:
|
||||
uri_path.get_domain(),
|
||||
file_path))
|
||||
else:
|
||||
destdir = destfile.parent()
|
||||
destdir = destfile.parent
|
||||
except Exception as exc:
|
||||
logdata = dict({'exception': str(exc)})
|
||||
log('D144', logdata)
|
||||
|
Loading…
x
Reference in New Issue
Block a user