1
0
mirror of https://github.com/dkmstr/openuds.git synced 2025-01-12 09:17:56 +03:00

Adding type checking on fuse before proceding to use it

This commit is contained in:
Adolfo Gómez García 2021-11-01 13:49:02 +01:00
parent a957e368e7
commit 398837c20f
6 changed files with 520 additions and 408 deletions

View File

@ -13,6 +13,8 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# Modified to add type checking, fix bugs, etc.. by dkmaster@dkmon.com
import sys
import os
import ctypes
@ -498,7 +500,7 @@ else:
]
class fuse_context(ctypes.Structure):
class FuseContext(ctypes.Structure):
_fields_ = [
('fuse', ctypes.c_voidp), # type: ignore
('uid', c_uid_t),
@ -508,10 +510,10 @@ class fuse_context(ctypes.Structure):
]
_libfuse.fuse_get_context.restype = ctypes.POINTER(fuse_context)
_libfuse.fuse_get_context.restype = ctypes.POINTER(FuseContext)
class fuse_operations(ctypes.Structure):
class FuseOperations(ctypes.Structure):
_fields_ = [
(
'getattr',
@ -710,25 +712,18 @@ class fuse_operations(ctypes.Structure):
]
def time_of_timespec(ts, use_ns=False):
if use_ns:
return ts.tv_sec * 10 ** 9 + ts.tv_nsec
else:
return ts.tv_sec + ts.tv_nsec / 1e9
def time_of_timespec(ts: c_timespec, use_ns=False):
return ts.tv_sec * 10 ** 9 + ts.tv_nsec
def set_st_attrs(st, attrs, use_ns=False):
def set_st_attrs(st: c_stat, attrs: typing.Mapping[str, int]) -> None:
for key, val in attrs.items():
if key in ('st_atime', 'st_mtime', 'st_ctime', 'st_birthtime'):
timespec = getattr(st, key + 'spec', None)
timespec: typing.Optional[c_timespec] = getattr(st, key + 'spec', None)
if timespec is None:
continue
if use_ns:
timespec.tv_sec, timespec.tv_nsec = divmod(int(val), 10 ** 9)
else:
timespec.tv_sec = int(val)
timespec.tv_nsec = int((val - timespec.tv_sec) * 1e9)
timespec.tv_sec, timespec.tv_nsec = divmod(int(val), 10 ** 9)
elif hasattr(st, key):
setattr(st, key, val)
@ -773,8 +768,14 @@ class FUSE:
)
def __init__(
self, operations, mountpoint, raw_fi=False, encoding='utf-8', **kwargs
):
self,
operations: 'Operations',
mountpoint: str,
*,
raw_fi: bool = False,
encoding: typing.Optional[str] = None,
**kwargs,
) -> None:
'''
Setting raw_fi to True will cause FUSE to pass the fuse_file_info
@ -782,54 +783,43 @@ class FUSE:
This gives you access to direct_io, keep_cache, etc.
'''
encoding = encoding or 'utf-8'
self.operations = operations
self.raw_fi = raw_fi
self.encoding = encoding
self.__critical_exception: BaseException = Exception()
self.use_ns = getattr(operations, 'use_ns', False)
if not self.use_ns:
warnings.warn(
'Time as floating point seconds for utimens is deprecated!\n'
'To enable time as nanoseconds set the property "use_ns" to '
'True in your operations class or set your fusepy '
'requirements to <4.',
DeprecationWarning,
)
# string arguments
sargs: typing.List[str] = ['fuse']
args = ['fuse']
args.extend(flag for arg, flag in self.OPTIONS if kwargs.pop(arg, False))
# Convert options to fuse flags
sargs.extend(flag for arg, flag in self.OPTIONS if kwargs.pop(arg, False))
kwargs.setdefault('fsname', operations.__class__.__name__)
args.append('-o')
args.append(','.join(self._normalize_fuse_options(**kwargs)))
args.append(mountpoint)
sargs.append('-o')
sargs.append(','.join(FUSE._normalize_fuse_options(**kwargs)))
sargs.append(mountpoint)
args = [arg.encode(encoding) for arg in args]
args: typing.List[bytes] = [arg.encode(encoding) for arg in sargs]
argv = (ctypes.c_char_p * len(args))(*args)
fuse_ops = fuse_operations()
for ent in fuse_operations._fields_:
fuse_ops = FuseOperations()
for ent in FuseOperations._fields_:
name, prototype = ent[:2]
check_name = name
# ftruncate()/fgetattr() are implemented in terms of their
# non-f-prefixed versions in the operations object
if check_name in ["ftruncate", "fgetattr"]:
if check_name in ['ftruncate', 'fgetattr']:
check_name = check_name[1:]
val = getattr(operations, check_name, None)
if val is None:
continue
# Function pointer members are tested for using the
# getattr(operations, name) above but are dynamically
# invoked using self.operations(name)
if hasattr(prototype, 'argtypes'):
val = prototype(partial(self._wrapper, getattr(self, name)))
val = prototype(partial(FUSE._wrapper, getattr(self, name)))
setattr(fuse_ops, name, val)
@ -855,16 +845,16 @@ class FUSE:
raise RuntimeError(err)
@staticmethod
def _normalize_fuse_options(**kargs):
def _normalize_fuse_options(**kargs) -> typing.Generator[str, None, None]:
for key, value in kargs.items():
if isinstance(value, bool):
if value is True:
yield key
else:
yield '%s=%s' % (key, value)
yield f'{key}={value}'
@staticmethod
def _wrapper(func, *args, **kwargs):
def _wrapper(func: typing.Callable, *args, **kwargs) -> int:
'Decorator for the methods that follow'
try:
@ -884,7 +874,6 @@ class FUSE:
func.__name__,
type(e),
e.errno,
exc_info=True,
)
return -e.errno
else:
@ -920,106 +909,114 @@ class FUSE:
fuse_exit()
return -errno.EFAULT
def _decode_optional_path(self, path):
def _decode_optional_path(
self, path: typing.Optional[bytes]
) -> typing.Optional[str]:
# NB: this method is intended for fuse operations that
# allow the path argument to be NULL,
# *not* as a generic path decoding method
if path is None:
return None
return path.decode(self.encoding)
return path.decode(self.encoding) if path else None
def getattr(self, path, buf):
def getattr(self, path: bytes, buf: typing.Any) -> None:
return self.fgetattr(path, buf, None)
def readlink(self, path, buf, bufsize):
ret = self.operations('readlink', path.decode(self.encoding)).encode(
self.encoding
)
def readlink(self, path: bytes, buf: typing.Any, bufsize: int) -> None:
ret = self.operations.readlink(path.decode(self.encoding)).encode(self.encoding)
# copies a string into the given buffer
# (null terminated and truncated if necessary)
data = ctypes.create_string_buffer(ret[: bufsize - 1])
ctypes.memmove(buf, data, len(data))
return 0
def mknod(self, path, mode, dev):
return self.operations('mknod', path.decode(self.encoding), mode, dev)
def mknod(self, path: bytes, mode: int, dev: typing.Any) -> int:
return self.operations.mknod(path.decode(self.encoding), mode, dev)
def mkdir(self, path, mode):
return self.operations('mkdir', path.decode(self.encoding), mode)
def mkdir(self, path: bytes, mode: int) -> None:
return self.operations.mkdir(path.decode(self.encoding), mode)
def unlink(self, path):
return self.operations('unlink', path.decode(self.encoding))
def unlink(self, path: bytes) -> None:
return self.operations.unlink(path.decode(self.encoding))
def rmdir(self, path):
return self.operations('rmdir', path.decode(self.encoding))
def rmdir(self, path: bytes) -> None:
return self.operations.rmdir(path.decode(self.encoding))
def symlink(self, source, target):
def symlink(self, source: bytes, target: bytes) -> None:
'creates a symlink `target -> source` (e.g. ln -s source target)'
return self.operations(
'symlink', target.decode(self.encoding), source.decode(self.encoding)
return self.operations.symlink(
target.decode(self.encoding), source.decode(self.encoding)
)
def rename(self, old, new):
return self.operations(
'rename', old.decode(self.encoding), new.decode(self.encoding)
def rename(self, old: bytes, new: bytes) -> None:
return self.operations.rename(
old.decode(self.encoding), new.decode(self.encoding)
)
def link(self, source, target):
def link(self, source: bytes, target: bytes) -> None:
'creates a hard link `target -> source` (e.g. ln source target)'
return self.operations(
'link', target.decode(self.encoding), source.decode(self.encoding)
return self.operations.link(
target.decode(self.encoding), source.decode(self.encoding)
)
def chmod(self, path, mode):
return self.operations('chmod', path.decode(self.encoding), mode)
def chmod(self, path: bytes, mode: int):
return self.operations.chmod(path.decode(self.encoding), mode)
def chown(self, path, uid, gid):
def chown(self, path: bytes, uid: int, gid: int) -> None:
# Check if any of the arguments is a -1 that has overflowed
if c_uid_t(uid + 1).value == 0:
uid = -1
if c_gid_t(gid + 1).value == 0:
gid = -1
return self.operations('chown', path.decode(self.encoding), uid, gid)
return self.operations.chown(path.decode(self.encoding), uid, gid)
def truncate(self, path, length):
return self.operations('truncate', path.decode(self.encoding), length)
def truncate(self, path: bytes, length: int) -> None:
return self.operations.truncate(path.decode(self.encoding), length)
def open(self, path, fip):
def open(self, path: bytes, fip: typing.Any) -> None:
fi = fip.contents
if self.raw_fi:
return self.operations('open', path.decode(self.encoding), fi)
self.operations.open(path.decode(self.encoding), fi)
else:
fi.fh = self.operations('open', path.decode(self.encoding), fi.flags)
fi.fh = self.operations.open(path.decode(self.encoding), fi.flags)
return 0
def read(self, path, buf, size, offset, fip):
def read(
self, path: bytes, buf: typing.Any, size: int, offset: int, fip: typing.Any
):
# fip is a pointer to a struct fuse_file_info
if self.raw_fi:
fh = fip.contents
else:
fh = fip.contents.fh
ret = self.operations(
'read', self._decode_optional_path(path), size, offset, fh
)
ret = self.operations.read(self._decode_optional_path(path), size, offset, fh)
if not ret:
return 0
retsize = len(ret)
assert retsize <= size, 'actual amount read %d greater than expected %d' % (
retsize,
size,
)
if retsize > size:
raise RuntimeError(
"read too much data ({} bytes, expected {})".format(
retsize, size
)
)
ctypes.memmove(buf, ret, retsize)
return retsize
def write(self, path, buf, size, offset, fip):
def write(
self,
path: typing.Optional[bytes],
buf: typing.Any,
size: int,
offset: int,
fip: typing.Any,
) -> int:
# fip is a pointer to a struct fuse_file_info
# buf is a char*
data = ctypes.string_at(buf, size)
if self.raw_fi:
@ -1027,57 +1024,62 @@ class FUSE:
else:
fh = fip.contents.fh
return self.operations(
'write', self._decode_optional_path(path), data, offset, fh
)
return self.operations.write(self._decode_optional_path(path), data, offset, fh)
def statfs(self, path, buf):
def statfs(self, path: bytes, buf: typing.Any) -> None:
stv = buf.contents
attrs = self.operations('statfs', path.decode(self.encoding))
attrs = self.operations.statfs(path.decode(self.encoding))
for key, val in attrs.items():
if hasattr(stv, key):
setattr(stv, key, val)
return 0
def flush(self, path, fip):
def flush(self, path: typing.Optional[bytes], fip: typing.Any) -> None:
if self.raw_fi:
fh = fip.contents
else:
fh = fip.contents.fh
return self.operations('flush', self._decode_optional_path(path), fh)
self.operations.flush(self._decode_optional_path(path), fh)
def release(self, path, fip):
def release(self, path: typing.Optional[bytes], fip: typing.Any) -> None:
if self.raw_fi:
fh = fip.contents
else:
fh = fip.contents.fh
return self.operations('release', self._decode_optional_path(path), fh)
return self.operations.release(self._decode_optional_path(path), fh)
def fsync(self, path, datasync, fip):
def fsync(self, path: typing.Optional[bytes], datasync: bool, fip: typing.Any):
if self.raw_fi:
fh = fip.contents
else:
fh = fip.contents.fh
return self.operations('fsync', self._decode_optional_path(path), datasync, fh)
return self.operations.fsync(self._decode_optional_path(path), datasync, fh)
def setxattr(self, path, name, value, size, options, *args):
return self.operations(
'setxattr',
def setxattr(
self,
path: bytes,
name: bytes,
value: typing.Any,
size: int,
options: int,
*args,
) -> None:
return self.operations.setxattr(
path.decode(self.encoding),
name.decode(self.encoding),
ctypes.string_at(value, size),
ctypes.string_at(value, size).decode(self.encoding),
options,
*args
*args,
)
def getxattr(self, path, name, value, size, *args):
ret = self.operations(
'getxattr', path.decode(self.encoding), name.decode(self.encoding), *args
)
def getxattr(
self, path: bytes, name: bytes, value: typing.Any, size: int, *args
) -> int:
ret = self.operations.getxattr(
path.decode(self.encoding), name.decode(self.encoding), *args
).encode(self.encoding)
retsize = len(ret)
# allow size queries
@ -1094,8 +1096,8 @@ class FUSE:
return retsize
def listxattr(self, path, namebuf, size):
attrs = self.operations('listxattr', path.decode(self.encoding)) or ''
def listxattr(self, path: bytes, namebuf: typing.Any, size: int) -> int:
attrs = self.operations.listxattr(path.decode(self.encoding)) or []
ret = '\x00'.join(attrs).encode(self.encoding)
if len(ret) > 0:
ret += '\x00'.encode(self.encoding)
@ -1114,81 +1116,82 @@ class FUSE:
return retsize
def removexattr(self, path, name):
return self.operations(
'removexattr', path.decode(self.encoding), name.decode(self.encoding)
def removexattr(self, path: bytes, name: bytes) -> None:
return self.operations.removexattr(
path.decode(self.encoding), name.decode(self.encoding)
)
def opendir(self, path, fip):
def opendir(self, path: bytes, fip: typing.Any) -> None:
# Ignore raw_fi
fip.contents.fh = self.operations('opendir', path.decode(self.encoding))
fip.contents.fh = self.operations.opendir(path.decode(self.encoding))
return 0
def readdir(self, path, buf, filler, offset, fip):
def readdir(
self,
path: bytes,
buf: typing.Any,
filler: typing.Any,
offset: int,
fip: typing.Any,
) -> None:
# Ignore raw_fi
for item in self.operations(
'readdir', self._decode_optional_path(path), fip.contents.fh
for item in self.operations.readdir(
path.decode(self.encoding), fip.contents.fh
):
if isinstance(item, str):
name, st, offset = item, None, 0
else:
name, attrs, offset = item
if attrs:
st = c_stat()
set_st_attrs(st, attrs, use_ns=self.use_ns)
set_st_attrs(st, attrs)
else:
st = None
if filler(buf, name.encode(self.encoding), st, offset) != 0:
break
return 0
def releasedir(self, path, fip):
def releasedir(self, path: typing.Optional[bytes], fip: typing.Any):
# Ignore raw_fi
return self.operations(
'releasedir', self._decode_optional_path(path), fip.contents.fh
)
return self.operations.releasedir(self._decode_optional_path(path), fip.contents.fh)
def fsyncdir(self, path, datasync, fip):
def fsyncdir(self, path: typing.Optional[bytes], datasync: bool, fip: typing.Any):
# Ignore raw_fi
return self.operations(
'fsyncdir', self._decode_optional_path(path), datasync, fip.contents.fh
)
return self.operations.fsyncdir(self._decode_optional_path(path), datasync, fip.contents.fh)
def init(self, conn):
return self.operations('init', '/')
def init(self, conn: typing.Any) -> None:
return self.operations.init('/')
def destroy(self, private_data):
return self.operations('destroy', '/')
def destroy(self, private_data: typing.Any) -> None:
return self.operations.destroy('/')
def access(self, path, amode):
return self.operations('access', path.decode(self.encoding), amode)
def access(self, path:bytes, amode: int) -> None:
return self.operations.access(path.decode(self.encoding), amode)
def create(self, path, mode, fip):
def create(self, path: bytes, mode: int, fip: typing.Any) -> None:
fi = fip.contents
path = path.decode(self.encoding)
if self.raw_fi:
return self.operations('create', path, mode, fi)
self.operations.create(path.decode(self.encoding), mode, fi)
else:
fi.fh = self.operations('create', path, mode)
return 0
fi.fh = self.operations.create(path.decode(self.encoding), mode, None)
def ftruncate(self, path, length, fip):
def ftruncate(
self, path: typing.Optional[bytes], length: int, fip: typing.Any
) -> int:
if self.raw_fi:
fh = fip.contents
else:
fh = fip.contents.fh
return self.operations('truncate', self._decode_optional_path(path), length, fh)
self.operations.truncate(self._decode_optional_path(path), length, fh)
return 0
def fgetattr(self, path, buf, fip):
def fgetattr(
self, path: typing.Optional[bytes], buf: typing.Any, fip: typing.Any
) -> None:
ctypes.memset(buf, 0, ctypes.sizeof(c_stat))
st = buf.contents
st: c_stat = buf.contents
if not fip:
fh = fip
elif self.raw_fi:
@ -1196,39 +1199,55 @@ class FUSE:
else:
fh = fip.contents.fh
attrs = self.operations('getattr', self._decode_optional_path(path), fh)
set_st_attrs(st, attrs, use_ns=self.use_ns)
return 0
attrs = self.operations.getattr(self._decode_optional_path(path), fh)
set_st_attrs(st, attrs)
def lock(self, path, fip, cmd, lock):
def lock(
self,
path: typing.Optional[bytes],
fip: typing.Any,
cmd: bytes,
lock: typing.Any,
):
if self.raw_fi:
fh = fip.contents
else:
fh = fip.contents.fh
return self.operations('lock', self._decode_optional_path(path), fh, cmd, lock)
return self.operations.lock(self._decode_optional_path(path), fh, cmd, lock)
def utimens(self, path, buf):
if buf:
atime = time_of_timespec(buf.contents.actime, use_ns=self.use_ns)
mtime = time_of_timespec(buf.contents.modtime, use_ns=self.use_ns)
times = (atime, mtime)
else:
times = None
def utimens(self, path: bytes, buf: typing.Any) -> int:
times: typing.Optional[typing.Tuple[int, int]] = None
times = (
(
time_of_timespec(buf.contents.actime),
time_of_timespec(buf.contents.modtime),
)
if buf
else None
)
return self.operations('utimens', path.decode(self.encoding), times)
return self.operations.utimens(path.decode(self.encoding), times)
def bmap(self, path: bytes, blocksize: int, idx: int):
return self.operations('bmap', path.decode(self.encoding), blocksize, idx)
def bmap(self, path: bytes, blocksize: int, idx: int) -> int:
return self.operations.bmap(path.decode(self.encoding), blocksize, idx)
def ioctl(self, path, cmd, arg, fip, flags, data):
def ioctl(
self,
path: bytes,
cmd: bytes,
arg: bytes,
fip: typing.Any,
flags: int,
data: bytes,
):
if self.raw_fi:
fh = fip.contents
else:
fh = fip.contents.fh
return self.operations(
'ioctl', path.decode(self.encoding), cmd, arg, fh, flags, data
return self.operations.ioctl(
path.decode(self.encoding), cmd, arg, fh, flags, data
)
@ -1242,56 +1261,56 @@ class Operations:
or the corresponding system call man page.
'''
def __call__(self, op, *args) -> typing.Any:
if not hasattr(self, op):
def __call__(self, op: str, *args) -> typing.Any:
try:
return getattr(self, op)(*args)
except AttributeError:
raise FuseOSError(errno.EFAULT)
return getattr(self, op)(*args)
def access(self, path: str, amode: int) -> int:
def access(self, path: str, amode: int) -> None:
return
def bmap(self, path: str, blocksize: int, idx: int) -> int:
return 0
bmap = None
def chmod(self, path: str, mode: int) -> int:
def chmod(self, path: str, mode: int) -> None:
raise FuseOSError(errno.EROFS)
def chown(self, path: str, uid: int, gid: int) -> None:
raise FuseOSError(errno.EROFS)
def create(self, path: str, mode: int, fi: typing.Any = None):
def create(self, path: str, mode: int, fi: typing.Any) -> int:
'''
When raw_fi is False (default case), fi is None and create should
return a numerical file handle.
When raw_fi is True the file handle should be set directly by create
and return 0.
and return value is not used.
'''
raise FuseOSError(errno.EROFS)
def destroy(self, path: str) -> None:
'Called on filesystem destruction. Path is always /'
pass
def flush(self, path: str, fh: typing.Any) -> int:
return 0
def flush(self, path: typing.Optional[str], fh: typing.Any) -> None:
pass
def fsync(self, path: str, datasync: int, fh: typing.Any) -> int:
return 0
def fsync(self, path: typing.Optional[str], datasync: bool, fh: typing.Any) -> None:
pass
def fsyncdir(self, path: str, datasync: int, fh: typing.Any) -> int:
return 0
def fsyncdir(
self, path: typing.Optional[str], datasync: bool, fh: typing.Any
) -> None:
pass
def getattr(
self, path: str, fh: typing.Any = None
) -> typing.Dict[str, typing.Union[int, float]]:
self, path: typing.Optional[str], fh: typing.Any = None
) -> typing.Dict[str, int]:
'''
Returns a dictionary with keys identical to the stat C structure of
stat(2).
st_atime, st_mtime and st_ctime should be floats.
NOTE: There is an incompatibility between Linux and Mac OS X
concerning st_nlink of directories. Mac OS X counts all files inside
the directory, while Linux counts only the subdirectories.
@ -1301,9 +1320,7 @@ class Operations:
raise FuseOSError(errno.ENOENT)
return dict(st_mode=(S_IFDIR | 0o755), st_nlink=2)
def getxattr(
self, path: str, name: str, position: int = 0
) -> typing.Dict[str, typing.Union[int, float]]:
def getxattr(self, path: str, name: str, position: int = 0) -> str:
raise FuseOSError(ENOTSUP)
def init(self, path: str) -> None:
@ -1314,10 +1331,12 @@ class Operations:
'''
pass
def ioctl(self, path: str, cmd: str, arg: str, fip: int, flags: int, data: bytes) -> int:
def ioctl(
self, path: str, cmd: bytes, arg: bytes, fip: int, flags: int, data: bytes
) -> int:
raise FuseOSError(errno.ENOTTY)
def link(self, target: str, source: str):
def link(self, target: str, source: str) -> None:
'creates a hard link `target -> source` (e.g. ln source target)'
raise FuseOSError(errno.EROFS)
@ -1325,12 +1344,15 @@ class Operations:
def listxattr(self, path: str) -> typing.List[str]:
return []
lock = None
def lock(
self, path: typing.Optional[str], fh: typing.Any, cmd: bytes, lock: typing.Any
) -> int:
return 0
def mkdir(self, path: str, mode: int) -> None:
raise FuseOSError(errno.EROFS)
def mknod(self, path: str, mode: int, dev: int) -> None:
def mknod(self, path: str, mode: int, dev: int) -> int:
raise FuseOSError(errno.EROFS)
def open(self, path: str, flags: int) -> int:
@ -1341,22 +1363,26 @@ class Operations:
When raw_fi is True the signature of open becomes:
open(self, path, fi)
and the file handle should be set directly.
and the file handle should be set directly. In this case, the return value is ignored.
'''
return 0
def opendir(self, path: str) -> int:
'Returns a numerical file handle.'
return 0
def read(self, path: str, size: int, offset: int, fh: int) -> bytes:
def read(
self, path: typing.Optional[str], size: int, offset: int, fh: typing.Any
) -> bytes:
'Returns a string containing the data requested.'
raise FuseOSError(errno.EIO)
def readdir(self, path, fh):
def readdir(
self, path: str, fh: typing.Any
) -> typing.Union[
typing.List[str], typing.List[typing.Tuple[str, typing.Dict[str, int], int]]
]:
'''
Can return either a list of names, or a list of (name, attrs, offset)
tuples. attrs is a dict as in getattr.
@ -1364,28 +1390,30 @@ class Operations:
return ['.', '..']
def readlink(self, path):
def readlink(self, path: str) -> str:
raise FuseOSError(errno.ENOENT)
def release(self, path, fh):
return 0
def release(self, path: typing.Optional[str], fh: typing.Any) -> None:
pass
def releasedir(self, path, fh):
return 0
def releasedir(self, path: typing.Optional[str], fh: typing.Any) -> None:
pass
def removexattr(self, path, name):
def removexattr(self, path: str, name: str) -> None:
raise FuseOSError(ENOTSUP)
def rename(self, old, new):
def rename(self, old: str, new: str) -> None:
raise FuseOSError(errno.EROFS)
def rmdir(self, path):
def rmdir(self, path: str) -> None:
raise FuseOSError(errno.EROFS)
def setxattr(self, path, name, value, options, position=0):
def setxattr(
self, path: str, name: str, value: str, options: int, position: int = 0
) -> None:
raise FuseOSError(ENOTSUP)
def statfs(self, path):
def statfs(self, path: str) -> typing.Dict[str, typing.Union[int, float]]:
'''
Returns a dictionary with keys identical to the statvfs C structure of
statvfs(3).
@ -1396,37 +1424,27 @@ class Operations:
return {}
def symlink(self, target, source):
def symlink(self, target: str, source: str) -> None:
'creates a symlink `target -> source` (e.g. ln -s source target)'
raise FuseOSError(errno.EROFS)
def truncate(self, path, length, fh: typing.Any=None):
def truncate(
self, path: typing.Optional[str], length: int, fh: typing.Any = None
) -> None:
raise FuseOSError(errno.EROFS)
def unlink(self, path):
def unlink(self, path: str) -> None:
raise FuseOSError(errno.EROFS)
def utimens(self, path, times=None):
def utimens(
self, path, times: typing.Optional[typing.Tuple[float, float]] = None
) -> int:
'Times is a (atime, mtime) tuple. If None use current time.'
return 0
def write(self, path, data, offset, fh):
def write(
self, path: typing.Optional[str], data: bytes, offset: int, fh: typing.Any
) -> int:
raise FuseOSError(errno.EROFS)
class LoggingMixIn:
log = logging.getLogger('fuse.log-mixin')
def __call__(self, op, path, *args):
self.log.debug('-> %s %s %s', op, path, repr(args))
ret = '[Unhandled Exception]'
try:
ret = getattr(self, op)(path, *args)
return ret
except OSError as e:
ret = str(e)
raise
finally:
self.log.debug('<- %s %s', op, repr(ret))

View File

@ -146,6 +146,22 @@ __transDict: typing.Mapping[typing.Type['models.Model'], int] = {
# OT_OSMANAGER_RELEASE: -> On OsManager
# (servicepool_uuid, '', userservice_uuid)
# Helpers
# get owner by type and id
def getOwner(ownerType: int, ownerId: int) -> typing.Optional['models.Model']:
if ownerType == OT_PROVIDER:
return Provider.objects.get(pk=ownerId)
elif ownerType == OT_SERVICE:
return Service.objects.get(pk=ownerId)
elif ownerType == OT_DEPLOYED:
return ServicePool.objects.get(pk=ownerId)
elif ownerType == OT_AUTHENTICATOR:
return Authenticator.objects.get(pk=ownerId)
elif ownerType == OT_OSMANAGER:
return OSManager.objects.get(pk=ownerId)
else:
return None
class EventTupleType(typing.NamedTuple):
stamp: datetime.datetime
fld1: str

View File

@ -1,173 +1,2 @@
#!/usr/bin/env python
import logging
import typing
from collections import defaultdict
from errno import ENOENT
from stat import S_IFDIR, S_IFLNK, S_IFREG
from time import time
from django.core.management.base import BaseCommand
from uds import models
from uds.core.util.fuse import FUSE, FuseOSError, Operations
logger = logging.getLogger(__name__)
class UDSFS(Operations):
'Example memory filesystem. Supports only one level of files.'
def __init__(self):
self.files = {}
self.data = defaultdict(bytes)
self.fd = 0
now = time()
self.files['/'] = dict(
st_mode=(S_IFDIR | 0o755),
st_ctime=now,
st_mtime=now,
st_atime=now,
st_nlink=2,
)
def chmod(self, path: str, mode: int) -> int:
logger.debug('CHMOD: %s %s', path, mode)
self.files[path]['st_mode'] &= 0o770000
self.files[path]['st_mode'] |= mode
return 0
def chown(self, path: str, uid: int, gid: int) -> None:
self.files[path]['st_uid'] = uid
self.files[path]['st_gid'] = gid
def create(self, path: str, mode: int, fi: typing.Any = None) -> int:
self.files[path] = dict(
st_mode=(S_IFREG | mode),
st_nlink=1,
st_size=0,
st_ctime=time(),
st_mtime=time(),
st_atime=time(),
)
self.fd += 1
return self.fd
def getattr(self, path: str, fh: typing.Any=None) -> typing.Dict[str, typing.Union[int, float]]:
if path not in self.files:
raise FuseOSError(ENOENT)
return self.files[path]
def getxattr(self, path, name, position=0):
attrs = self.files[path].get('attrs', {})
try:
return attrs[name]
except KeyError:
return '' # Should return ENOATTR
def listxattr(self, path):
attrs = self.files[path].get('attrs', {})
return attrs.keys()
def mkdir(self, path, mode):
self.files[path] = dict(
st_mode=(S_IFDIR | mode),
st_nlink=2,
st_size=0,
st_ctime=time(),
st_mtime=time(),
st_atime=time(),
)
self.files['/']['st_nlink'] += 1
def open(self, path, flags):
self.fd += 1
return self.fd
def read(self, path, size, offset, fh):
return self.data[path][offset : offset + size]
def readdir(self, path, fh):
return ['.', '..'] + [x[1:] for x in self.files if x != '/']
def readlink(self, path):
return self.data[path]
def removexattr(self, path, name):
attrs = self.files[path].get('attrs', {})
try:
del attrs[name]
except KeyError:
pass # Should return ENOATTR
def rename(self, old, new):
self.data[new] = self.data.pop(old)
self.files[new] = self.files.pop(old)
def rmdir(self, path):
# with multiple level support, need to raise ENOTEMPTY if contains any files
self.files.pop(path)
self.files['/']['st_nlink'] -= 1
def setxattr(self, path, name, value, options, position=0):
# Ignore options
attrs = self.files[path].setdefault('attrs', {})
attrs[name] = value
def statfs(self, path):
return dict(f_bsize=512, f_blocks=4096, f_bavail=2048)
def symlink(self, target, source):
self.files[target] = dict(
st_mode=(S_IFLNK | 0o777), st_nlink=1, st_size=len(source)
)
self.data[target] = source
def truncate(self, path, length, fh=None):
# make sure extending the file fills in zero bytes
self.data[path] = self.data[path][:length].ljust(length, b'\x00')
self.files[path]['st_size'] = length
def unlink(self, path):
self.data.pop(path)
self.files.pop(path)
def utimens(self, path, times=None):
now = time()
atime, mtime = times if times else (now, now)
self.files[path]['st_atime'] = atime
self.files[path]['st_mtime'] = mtime
def write(self, path, data, offset, fh):
self.data[path] = (
# make sure the data gets inserted at the right offset
self.data[path][:offset].ljust(offset, b'\x00')
+ data
# and only overwrites the bytes that data is replacing
+ self.data[path][offset + len(data) :]
)
self.files[path]['st_size'] = len(self.data[path])
return len(data)
class Command(BaseCommand):
args = "<mod.name=value mod.name=value mod.name=value...>"
help = "Updates configuration values. If mod is omitted, UDS will be used. Omit whitespaces betwen name, =, and value (they must be a single param)"
def add_arguments(self, parser):
parser.add_argument(
'mount_point', type=str, help='Mount point for the FUSE filesystem'
)
# parser.add_argument('-d', '--debug', action='store_true', help='Enable debug logging')
def handle(self, *args, **options):
logger.debug("Handling UDS FS")
fuse = FUSE(UDSFS(), options['mount_point'], foreground=True, allow_other=True)
# Placeholder, import the command from udsfs
from .udsfs import Command

View File

@ -0,0 +1,87 @@
import errno
import stat
import os.path
import logging
import typing
from django.core.management.base import BaseCommand
from uds import models
from uds.core.util.fuse import FUSE, FuseOSError, Operations
from . import types
from . import events
logger = logging.getLogger(__name__)
class UDSFS(Operations):
dispatchers: typing.ClassVar[typing.Dict[str, types.UDSFSInterface]] = {
'events': events.EventFS()
}
# Own stats are the service creation date and 2 hardlinks because of the root folder
_own_stats = types.StatType(st_mode=(stat.S_IFDIR | 0o755), st_nlink=2 + len(dispatchers))
def __init__(self):
pass
def _dispatch(self, path: typing.Optional[str], operation: str, *args, **kwargs) -> typing.Any:
try:
if path:
path_parts = path.split('/')
logger.debug('Dispatching %s for %s', operation, path_parts)
if path_parts[1] in self.dispatchers:
return getattr(self.dispatchers[path_parts[1]], operation)(path_parts[2:], *args, **kwargs)
except Exception as e:
logger.error('Error while dispatching %s for %s: %s', operation, path, e)
raise FuseOSError(errno.ENOENT)
def getattr(self, path: typing.Optional[str], fh: typing.Any = None) -> typing.Dict[str, int]:
# If root folder, return service creation date
if path == '/':
return self._own_stats.as_dict()
# If not root folder, split path to locate dispatcher and call it with the rest of the path
attrs = typing.cast(types.StatType, self._dispatch(path, 'getattr')).as_dict()
logger.debug('Attrs for %s: %s', path, attrs)
return attrs
def getxattr(self, path: str, name: str, position: int = 0) -> str:
'''
Get extended attribute for the given path. Right now, always returns an "empty" string
'''
logger.debug('Getting attr %s from %s (%s)', name, path, position)
return ''
def readdir(self, path: str, fh: typing.Any) -> typing.List[str]:
'''
Read directory, that is composed of the dispatcher names and the "dot" entries
in case of the root folder, otherwise call the dispatcher with the rest of the path
'''
if path == '/':
return ['.', '..'] + list(self.dispatchers.keys())
return typing.cast(typing.List[str], self._dispatch(path, 'readdir'))
def read(self, path: typing.Optional[str], size: int, offset: int, fh: typing.Any) -> bytes:
'''
Reads the content of the "virtual" file
'''
return typing.cast(bytes, self._dispatch(path, 'read', size, offset))
class Command(BaseCommand):
args = "<mod.name=value mod.name=value mod.name=value...>"
help = "Updates configuration values. If mod is omitted, UDS will be used. Omit whitespaces betwen name, =, and value (they must be a single param)"
def add_arguments(self, parser):
parser.add_argument(
'mount_point', type=str, help='Mount point for the FUSE filesystem'
)
# parser.add_argument('-d', '--debug', action='store_true', help='Enable debug logging')
def handle(self, *args, **options):
logger.debug("Handling UDS FS")
fuse = FUSE(UDSFS(), options['mount_point'], foreground=True, allow_other=True)

View File

@ -0,0 +1,111 @@
import stat
import calendar
import datetime
import typing
import logging
from django.db.models import QuerySet
from uds.models import StatsEvents
from uds.core.util.stats.events import EVENT_NAMES, getOwner
from . import types
logger = logging.getLogger(__name__)
LINELEN = 160
# Helper, returns a "pretty print" of an event
def pretty_print(event: StatsEvents) -> str:
# convert unix timestamp to human readable
dt = datetime.datetime.fromtimestamp(event.stamp)
# Get owner, if it already exists
owner = getOwner(event.owner_type, event.owner_id)
name = getattr(owner, 'name', '') if hasattr(owner, 'name') else '[*Deleted*]'
# Get event name
event_name = EVENT_NAMES[event.event_type]
# Get event description
return f'{dt} - {event_name} {name} - {event.fld1}|{event.fld2}|{event.fld3}|{event.fld3}'
class EventFS(types.UDSFSInterface):
"""
Class to handle events fs in UDS.
"""
_directory_stats: typing.ClassVar[types.StatType] = types.StatType(st_mode=(stat.S_IFDIR | 0o755), st_nlink=1)
_months: typing.ClassVar[typing.List[str]] = ['01', '02', '03', '04', '05', '06', '07', '08', '09', '10', '11', '12']
def __init__(self):
pass
def getattr(self, path: typing.List[str]) -> types.StatType:
if len(path) < 1:
return EventFS._directory_stats
years = EventFS.last_years()
if len(path) >= 1 and path[0] not in years:
raise FileNotFoundError('No such file or directory')
if len(path) == 1:
return EventFS._directory_stats # Directory
if len(path) >= 2 and path[1] in EventFS._months:
if len(path) == 2:
return EventFS._directory_stats
if len(path) == 3 and int(path[2]) in range(1, EventFS.number_of_days(int(path[0]), int(path[1])) + 1):
# TODO: calculate size of file
size = LINELEN * EventFS.get_events(int(path[0]), int(path[1]), int(path[2]), 0).count()
return types.StatType(st_mode=stat.S_IFREG | 0o444, st_size=size)
raise FileNotFoundError('No such file or directory')
def readdir(self, path: typing.List[str]) -> typing.List[str]:
if len(path) == 0:
# List ., .. and last 4 years as folders
return ['.', '..'] + EventFS.last_years()
last_years = EventFS.last_years()
if len(path) >= 1 and path[0] in last_years:
year = int(path[0])
# Return months as folders
if len(path) == 1:
return ['.', '..'] + EventFS._months
if len(path) == 2 and path[1] in EventFS._months: # Return days of month as indicated on path
month = int(path[1])
return ['.', '..'] + ['{:02d}'.format(x) for x in range(1, EventFS.number_of_days(year, month) + 1)]
raise FileNotFoundError('No such file or directory')
def read(self, path: typing.List[str], size: int, offset: int) -> bytes:
logger.debug('Reading events for %s: offset: %s, size: %s', path, offset, size)
# Compose
# Everly line is 256, calculate skip
skip = offset // LINELEN
# Calculate number of lines to read
lines = size // LINELEN + 1
# Read lines from get_events
year, month, day = int(path[0]), int(path[1]), int(path[2])
logger.debug('Reading %a lines, skiping %s for day %s/%s/%s', lines, skip, year, month, day)
events = EventFS.get_events(year, month, day, skip)
# Compose lines, adjsting each line length to LINELEN
theLines = [pretty_print(x).encode('utf-8') for x in events[:lines]]
# Adjust each line length to LINELEN
theLines = [x + b' ' * (LINELEN - len(x) - 1) + b'\n' for x in theLines]
# Return lines
return b''.join(theLines)[offset:offset+size]
@staticmethod
def last_years() -> typing.List[str]:
return [str(x) for x in range(datetime.datetime.now().year - 4, datetime.datetime.now().year + 1)]
@staticmethod
def number_of_days(year: int, month: int) -> int:
return calendar.monthrange(year, month)[1]
# retrieve Events from a year as a list of events
@staticmethod
def get_events(year: int, month: int, day: int, skip: int = 0) -> QuerySet[StatsEvents]:
# Calculate starting and ending stamp as unix timestamp from year and month
start = calendar.timegm((year, month, day, 0, 0, 0, 0, 0, 0))
end = calendar.timegm((year, month, day, 23, 59, 59, 0, 0, 0))
logger.debug('Reading stats events from %s to %s, skiping %s first', start, end, skip)
return StatsEvents.objects.filter(stamp__gte=start, stamp__lte=end).order_by('stamp')[skip:]

View File

@ -0,0 +1,51 @@
import stat
import time
import logging
import typing
logger = logging.getLogger(__name__)
class StatType(typing.NamedTuple):
st_mode: int = stat.S_IFREG
st_size: int = -1
st_ctime: int = time.time_ns()
st_mtime: int = time.time_ns()
st_atime: int = time.time_ns()
st_nlink: int = 1
def as_dict(self) -> typing.Dict[str, int]:
rst = {
'st_mode': self.st_mode,
'st_ctime': self.st_ctime,
'st_mtime': self.st_mtime,
'st_atime': self.st_atime,
'st_nlink': self.st_nlink
}
# Append optional fields
if self.st_size != -1:
rst['st_size'] = self.st_size
return rst
class UDSFSInterface:
"""
Base Class for UDS Info File system
"""
def getattr(self, path: typing.List[str]) -> StatType:
"""
Get file attributes. Path is the full path to the file, already splitted.
"""
raise NotImplementedError
def readdir(self, path: typing.List[str]) -> typing.List[str]:
"""
Get a list of files in the directory. Path is the full path to the directory, already splitted.
"""
raise NotImplementedError
def read(self, path: typing.List[str], size: int, offset: int) -> bytes:
"""
Read a file. Path is the full path to the file, already splitted.
"""
raise NotImplementedError