1
0
mirror of https://github.com/altlinux/gpupdate.git synced 2025-03-21 18:50:38 +03:00

appliers/file_cp.py: refactor copy files algorithms

This commit is contained in:
Evgeny Sinelnikov 2022-12-11 11:14:05 +04:00 committed by Valery Sinelnikov
parent 9dc833a970
commit 87d873862a
2 changed files with 56 additions and 71 deletions

View File

@ -28,6 +28,7 @@ from pathlib import Path
from util.windows import expand_windows_var
from util.util import get_homedir
from util.exceptions import NotUNCPathError
from util.paths import UNCPath
import fnmatch
class Files_cp:
@ -40,7 +41,7 @@ class Files_cp:
self.fromPath = (expand_windows_var(file_obj.fromPath, username).replace('\\', '/')
if file_obj.fromPath else None)
self.isTargetPathDirectory = False
if self.targetPath[-1] == '/' or self.is_pattern(Path(self.fromPath).name):
if targetPath[-1] == '/' or self.is_pattern(Path(self.fromPath).name):
self.isTargetPathDirectory = True
self.action = action_letter2enum(file_obj.action)
self.readOnly = str2bool(file_obj.readOnly)
@ -52,7 +53,7 @@ class Files_cp:
self.fromPathFiles = self.get_list_files()
self.act()
def get_target_file(self, targetPath:Path, fromFile:Path) -> Path:
def get_target_file(self, targetPath:Path, fromFile:str) -> Path:
try:
if fromFile:
if self.isTargetPathDirectory:
@ -61,19 +62,35 @@ class Files_cp:
targetPath.parent.mkdir(parents = True, exist_ok = True)
targetPath = targetPath.parent
if self.hidden:
return targetPath.joinpath('.' + fromFile.name)
return targetPath.joinpath('.' + Path(fromFile).name)
else:
return targetPath.joinpath(fromFile.name)
return targetPath.joinpath(Path(fromFile).name)
else:
if not self.hidden:
return targetPath
else:
return targetPath.parent.joinpath('.' + targetPath.name)
except Exception as exc:
logdata = dict({'exc': exc})
log('D163', logdata)
logdata['targetPath'] = targetPath
logdata['fromFile'] = fromFile
logdata['exc'] = exc
log('W3314', logdata)
def copy_target_file(self, targetFile:Path, fromFile:str):
try:
uri_path = UNCPath(self.fromPath)
self.file_cache.store(fromFile, targetPath)
except NotUNCPathError as exc:
fromFilePath = Path(fromFile)
if fromFilePath.exists():
targetFile.write_bytes(fromFilePath.read_bytes())
except Exception as exc:
logdata['targetPath'] = targetPath
logdata['fromFile'] = fromFile
logdata['exc'] = exc
log('W3315', logdata)
# log('D163', logdata)
def set_read_only(self, targetFile):
if self.readOnly:
@ -86,7 +103,7 @@ class Files_cp:
try:
targetFile = self.get_target_file(self.targetPath, fromFile)
if not targetFile.exists():
targetFile.write_bytes(fromFile.read_bytes())
self.copy_target_file(targetFile, fromFile)
if self.username:
shutil.chown(targetFile, self.username)
self.set_read_only(targetFile)
@ -119,7 +136,7 @@ class Files_cp:
for fromPath in self.fromPathFiles:
targetFile = self.get_target_file(self.targetPath, fromPath)
try:
targetFile.write_bytes(fromPath.read_bytes())
self.copy_target_file(targetFile, fromFile)
if self.username:
shutil.chown(self.targetPath, self.username)
self.set_read_only(targetFile)
@ -151,71 +168,36 @@ class Files_cp:
def get_list_files(self):
ls_all_files = list()
logdata = dict()
logdata['targetPath'] = self.targetPath
logdata['targetPath'] = str(self.targetPath)
fromPathSplit = self.fromPath.split('/')
pattern = self.is_pattern(fromPathSplit[-1])
if self.fromPath and not pattern:
try:
self.file_cache.store(self.fromPath)
fromPath = Path(self.file_cache.get(self.fromPath))
ls_all_files.append(fromPath)
except NotUNCPathError as exc:
fromPath = Path(self.fromPath)
if fromPath.exists():
ls_all_files.append(fromPath)
except Exception as exc:
logdata['fromPath'] = self.fromPath
logdata['exc'] = exc
log('W13', logdata)
elif self.fromPath and len(fromPathSplit) > 2:
exact_path = '/'.join(fromPathSplit[:-1])
if not self.is_pattern(self.fromPath.name):
ls_all_files.append(self.fromPath)
else:
exact_path = self.fromPath.parent
ls_files = self.file_cache.get_ls_smbdir(exact_path)
filtered_ls_files = fnmatch.filter(ls_files, fromPathSplit[-1])
filtered_ls_files = fnmatch.filter(ls_files, self.fromPath.name)
if filtered_ls_files:
ls_from_paths = [exact_path +'/'+ file_s for file_s in filtered_ls_files]
for from_path in ls_from_paths:
try:
self.file_cache.store(from_path)
fromPath = Path(self.file_cache.get(from_path))
ls_all_files.append(fromPath)
except Exception as exc:
logdata['fromPath'] = self.fromPath
logdata['exc'] = exc
log('W13', logdata)
ls_all_files = [exact_path.joinpath(file_s) for file_s in filtered_ls_files]
else:
try:
fromLocalPath = Path(exact_path)
if fromLocalPath.is_dir():
ls = [fromFile for fromFile in fromLocalPath.iterdir() if fromFile.is_file()]
for fromPath in ls:
ls_all_files.append(fromPath)
if exact_path.is_dir():
ls_all_files = [fromFile for fromFile in exact_path.iterdir() if fromFile.is_file()]
except Exception as exc:
logdata['fromPath'] = self.fromPath
logdata['exc'] = exc
log('W13', logdata)
else:
fromPath = Path(self.fromPath) if self.fromPath else None
ls_all_files.append(fromPath)
return ls_all_files
def check_target_path(path_to_check, username = None):
'''
Function for checking the correctness of the path
'''
if not path_to_check:
return None
checking = Path(path_to_check)
if checking.is_dir():
if username and path_to_check == '/':
return Path(get_homedir(username))
return checking
#Check for path directory without '/something' suffix
elif (len(path_to_check.split('/')) > 2
and Path(path_to_check.replace(path_to_check.split('/')[-1], '')).is_dir()):
return checking
elif username:
target_path = Path(get_homedir(username))
res = target_path.joinpath(path_to_check
if path_to_check[0] != '/'
else path_to_check[1:])
return res
else:
return False
rootpath = Path('/')
if username:
rootpath = Path(get_homedir(username))
return checking.joinpath(rootpath)

View File

@ -39,15 +39,17 @@ class fs_file_cache:
self.samba_context = smbc.Context(use_kerberos=1)
#, debug=10)
def store(self, uri):
destdir = uri
def store(self, uri, destfile = None):
try:
uri_path = UNCPath(uri)
file_name = os.path.basename(uri_path.get_path())
file_path = os.path.dirname(uri_path.get_path())
destdir = Path('{}/{}/{}'.format(self.storage_uri,
uri_path.get_domain(),
file_path))
if not destfile:
file_name = os.path.basename(uri_path.get_path())
file_path = os.path.dirname(uri_path.get_path())
destdir = Path('{}/{}/{}'.format(self.storage_uri,
uri_path.get_domain(),
file_path))
else:
destdir = destfile.parent()
except Exception as exc:
logdata = dict({'exception': str(exc)})
log('D144', logdata)
@ -56,9 +58,10 @@ class fs_file_cache:
if not destdir.exists():
destdir.mkdir(parents=True, exist_ok=True)
destfile = Path('{}/{}/{}'.format(self.storage_uri,
uri_path.get_domain(),
uri_path.get_path()))
if not destfile:
destfile = Path('{}/{}/{}'.format(self.storage_uri,
uri_path.get_domain(),
uri_path.get_path()))
with open(destfile, 'wb') as df:
df.truncate()