Adding type checking on fuse before proceding to use it

This commit is contained in:
Adolfo Gómez García 2021-11-01 13:49:02 +01:00
parent a957e368e7
commit a6f1e95cc0
5 changed files with 383 additions and 408 deletions

View File

@ -13,6 +13,8 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# Modified to add type checking, fix bugs, etc.. by dkmaster@dkmon.com
import sys
import os
import ctypes
@ -498,7 +500,7 @@ else:
]
class fuse_context(ctypes.Structure):
class FuseContext(ctypes.Structure):
_fields_ = [
('fuse', ctypes.c_voidp), # type: ignore
('uid', c_uid_t),
@ -508,10 +510,10 @@ class fuse_context(ctypes.Structure):
]
_libfuse.fuse_get_context.restype = ctypes.POINTER(fuse_context)
_libfuse.fuse_get_context.restype = ctypes.POINTER(FuseContext)
class fuse_operations(ctypes.Structure):
class FuseOperations(ctypes.Structure):
_fields_ = [
(
'getattr',
@ -710,25 +712,18 @@ class fuse_operations(ctypes.Structure):
]
def time_of_timespec(ts, use_ns=False):
if use_ns:
return ts.tv_sec * 10 ** 9 + ts.tv_nsec
else:
return ts.tv_sec + ts.tv_nsec / 1e9
def time_of_timespec(ts: c_timespec, use_ns=False):
return ts.tv_sec * 10 ** 9 + ts.tv_nsec
def set_st_attrs(st, attrs, use_ns=False):
def set_st_attrs(st: c_stat, attrs: typing.Mapping[str, int]) -> None:
for key, val in attrs.items():
if key in ('st_atime', 'st_mtime', 'st_ctime', 'st_birthtime'):
timespec = getattr(st, key + 'spec', None)
timespec: typing.Optional[c_timespec] = getattr(st, key + 'spec', None)
if timespec is None:
continue
if use_ns:
timespec.tv_sec, timespec.tv_nsec = divmod(int(val), 10 ** 9)
else:
timespec.tv_sec = int(val)
timespec.tv_nsec = int((val - timespec.tv_sec) * 1e9)
timespec.tv_sec, timespec.tv_nsec = divmod(int(val), 10 ** 9)
elif hasattr(st, key):
setattr(st, key, val)
@ -773,8 +768,14 @@ class FUSE:
)
def __init__(
self, operations, mountpoint, raw_fi=False, encoding='utf-8', **kwargs
):
self,
operations: 'Operations',
mountpoint: str,
*,
raw_fi: bool = False,
encoding: typing.Optional[str] = None,
**kwargs,
) -> None:
'''
Setting raw_fi to True will cause FUSE to pass the fuse_file_info
@ -782,54 +783,43 @@ class FUSE:
This gives you access to direct_io, keep_cache, etc.
'''
encoding = encoding or 'utf-8'
self.operations = operations
self.raw_fi = raw_fi
self.encoding = encoding
self.__critical_exception: BaseException = Exception()
self.use_ns = getattr(operations, 'use_ns', False)
if not self.use_ns:
warnings.warn(
'Time as floating point seconds for utimens is deprecated!\n'
'To enable time as nanoseconds set the property "use_ns" to '
'True in your operations class or set your fusepy '
'requirements to <4.',
DeprecationWarning,
)
# string arguments
sargs: typing.List[str] = ['fuse']
args = ['fuse']
args.extend(flag for arg, flag in self.OPTIONS if kwargs.pop(arg, False))
# Convert options to fuse flags
sargs.extend(flag for arg, flag in self.OPTIONS if kwargs.pop(arg, False))
kwargs.setdefault('fsname', operations.__class__.__name__)
args.append('-o')
args.append(','.join(self._normalize_fuse_options(**kwargs)))
args.append(mountpoint)
sargs.append('-o')
sargs.append(','.join(FUSE._normalize_fuse_options(**kwargs)))
sargs.append(mountpoint)
args = [arg.encode(encoding) for arg in args]
args: typing.List[bytes] = [arg.encode(encoding) for arg in sargs]
argv = (ctypes.c_char_p * len(args))(*args)
fuse_ops = fuse_operations()
for ent in fuse_operations._fields_:
fuse_ops = FuseOperations()
for ent in FuseOperations._fields_:
name, prototype = ent[:2]
check_name = name
# ftruncate()/fgetattr() are implemented in terms of their
# non-f-prefixed versions in the operations object
if check_name in ["ftruncate", "fgetattr"]:
if check_name in ['ftruncate', 'fgetattr']:
check_name = check_name[1:]
val = getattr(operations, check_name, None)
if val is None:
continue
# Function pointer members are tested for using the
# getattr(operations, name) above but are dynamically
# invoked using self.operations(name)
if hasattr(prototype, 'argtypes'):
val = prototype(partial(self._wrapper, getattr(self, name)))
val = prototype(partial(FUSE._wrapper, getattr(self, name)))
setattr(fuse_ops, name, val)
@ -855,16 +845,16 @@ class FUSE:
raise RuntimeError(err)
@staticmethod
def _normalize_fuse_options(**kargs):
def _normalize_fuse_options(**kargs) -> typing.Generator[str, None, None]:
for key, value in kargs.items():
if isinstance(value, bool):
if value is True:
yield key
else:
yield '%s=%s' % (key, value)
yield f'{key}={value}'
@staticmethod
def _wrapper(func, *args, **kwargs):
def _wrapper(func: typing.Callable, *args, **kwargs) -> int:
'Decorator for the methods that follow'
try:
@ -884,7 +874,6 @@ class FUSE:
func.__name__,
type(e),
e.errno,
exc_info=True,
)
return -e.errno
else:
@ -920,106 +909,114 @@ class FUSE:
fuse_exit()
return -errno.EFAULT
def _decode_optional_path(self, path):
def _decode_optional_path(
self, path: typing.Optional[bytes]
) -> typing.Optional[str]:
# NB: this method is intended for fuse operations that
# allow the path argument to be NULL,
# *not* as a generic path decoding method
if path is None:
return None
return path.decode(self.encoding)
return path.decode(self.encoding) if path else None
def getattr(self, path, buf):
def getattr(self, path: bytes, buf: typing.Any) -> None:
return self.fgetattr(path, buf, None)
def readlink(self, path, buf, bufsize):
ret = self.operations('readlink', path.decode(self.encoding)).encode(
self.encoding
)
def readlink(self, path: bytes, buf: typing.Any, bufsize: int) -> None:
ret = self.operations.readlink(path.decode(self.encoding)).encode(self.encoding)
# copies a string into the given buffer
# (null terminated and truncated if necessary)
data = ctypes.create_string_buffer(ret[: bufsize - 1])
ctypes.memmove(buf, data, len(data))
return 0
def mknod(self, path, mode, dev):
return self.operations('mknod', path.decode(self.encoding), mode, dev)
def mknod(self, path: bytes, mode: int, dev: typing.Any) -> int:
return self.operations.mknod(path.decode(self.encoding), mode, dev)
def mkdir(self, path, mode):
return self.operations('mkdir', path.decode(self.encoding), mode)
def mkdir(self, path: bytes, mode: int) -> None:
return self.operations.mkdir(path.decode(self.encoding), mode)
def unlink(self, path):
return self.operations('unlink', path.decode(self.encoding))
def unlink(self, path: bytes) -> None:
return self.operations.unlink(path.decode(self.encoding))
def rmdir(self, path):
return self.operations('rmdir', path.decode(self.encoding))
def rmdir(self, path: bytes) -> None:
return self.operations.rmdir(path.decode(self.encoding))
def symlink(self, source, target):
def symlink(self, source: bytes, target: bytes) -> None:
'creates a symlink `target -> source` (e.g. ln -s source target)'
return self.operations(
'symlink', target.decode(self.encoding), source.decode(self.encoding)
return self.operations.symlink(
target.decode(self.encoding), source.decode(self.encoding)
)
def rename(self, old, new):
return self.operations(
'rename', old.decode(self.encoding), new.decode(self.encoding)
def rename(self, old: bytes, new: bytes) -> None:
return self.operations.rename(
old.decode(self.encoding), new.decode(self.encoding)
)
def link(self, source, target):
def link(self, source: bytes, target: bytes) -> None:
'creates a hard link `target -> source` (e.g. ln source target)'
return self.operations(
'link', target.decode(self.encoding), source.decode(self.encoding)
return self.operations.link(
target.decode(self.encoding), source.decode(self.encoding)
)
def chmod(self, path, mode):
return self.operations('chmod', path.decode(self.encoding), mode)
def chmod(self, path: bytes, mode: int):
return self.operations.chmod(path.decode(self.encoding), mode)
def chown(self, path, uid, gid):
def chown(self, path: bytes, uid: int, gid: int) -> None:
# Check if any of the arguments is a -1 that has overflowed
if c_uid_t(uid + 1).value == 0:
uid = -1
if c_gid_t(gid + 1).value == 0:
gid = -1
return self.operations('chown', path.decode(self.encoding), uid, gid)
return self.operations.chown(path.decode(self.encoding), uid, gid)
def truncate(self, path, length):
return self.operations('truncate', path.decode(self.encoding), length)
def truncate(self, path: bytes, length: int) -> None:
return self.operations.truncate(path.decode(self.encoding), length)
def open(self, path, fip):
def open(self, path: bytes, fip: typing.Any) -> None:
fi = fip.contents
if self.raw_fi:
return self.operations('open', path.decode(self.encoding), fi)
self.operations.open(path.decode(self.encoding), fi)
else:
fi.fh = self.operations('open', path.decode(self.encoding), fi.flags)
fi.fh = self.operations.open(path.decode(self.encoding), fi.flags)
return 0
def read(self, path, buf, size, offset, fip):
def read(
self, path: bytes, buf: typing.Any, size: int, offset: int, fip: typing.Any
):
# fip is a pointer to a struct fuse_file_info
if self.raw_fi:
fh = fip.contents
else:
fh = fip.contents.fh
ret = self.operations(
'read', self._decode_optional_path(path), size, offset, fh
)
ret = self.operations.read(self._decode_optional_path(path), size, offset, fh)
if not ret:
return 0
retsize = len(ret)
assert retsize <= size, 'actual amount read %d greater than expected %d' % (
retsize,
size,
)
if retsize > size:
raise RuntimeError(
"read too much data ({} bytes, expected {})".format(
retsize, size
)
)
ctypes.memmove(buf, ret, retsize)
return retsize
def write(self, path, buf, size, offset, fip):
def write(
self,
path: typing.Optional[bytes],
buf: typing.Any,
size: int,
offset: int,
fip: typing.Any,
) -> int:
# fip is a pointer to a struct fuse_file_info
# buf is a char*
data = ctypes.string_at(buf, size)
if self.raw_fi:
@ -1027,57 +1024,62 @@ class FUSE:
else:
fh = fip.contents.fh
return self.operations(
'write', self._decode_optional_path(path), data, offset, fh
)
return self.operations.write(self._decode_optional_path(path), data, offset, fh)
def statfs(self, path, buf):
def statfs(self, path: bytes, buf: typing.Any) -> None:
stv = buf.contents
attrs = self.operations('statfs', path.decode(self.encoding))
attrs = self.operations.statfs(path.decode(self.encoding))
for key, val in attrs.items():
if hasattr(stv, key):
setattr(stv, key, val)
return 0
def flush(self, path, fip):
def flush(self, path: typing.Optional[bytes], fip: typing.Any) -> None:
if self.raw_fi:
fh = fip.contents
else:
fh = fip.contents.fh
return self.operations('flush', self._decode_optional_path(path), fh)
self.operations.flush(self._decode_optional_path(path), fh)
def release(self, path, fip):
def release(self, path: typing.Optional[bytes], fip: typing.Any) -> None:
if self.raw_fi:
fh = fip.contents
else:
fh = fip.contents.fh
return self.operations('release', self._decode_optional_path(path), fh)
return self.operations.release(self._decode_optional_path(path), fh)
def fsync(self, path, datasync, fip):
def fsync(self, path: typing.Optional[bytes], datasync: bool, fip: typing.Any):
if self.raw_fi:
fh = fip.contents
else:
fh = fip.contents.fh
return self.operations('fsync', self._decode_optional_path(path), datasync, fh)
return self.operations.fsync(self._decode_optional_path(path), datasync, fh)
def setxattr(self, path, name, value, size, options, *args):
return self.operations(
'setxattr',
def setxattr(
self,
path: bytes,
name: bytes,
value: typing.Any,
size: int,
options: int,
*args,
) -> None:
return self.operations.setxattr(
path.decode(self.encoding),
name.decode(self.encoding),
ctypes.string_at(value, size),
ctypes.string_at(value, size).decode(self.encoding),
options,
*args
*args,
)
def getxattr(self, path, name, value, size, *args):
ret = self.operations(
'getxattr', path.decode(self.encoding), name.decode(self.encoding), *args
)
def getxattr(
self, path: bytes, name: bytes, value: typing.Any, size: int, *args
) -> int:
ret = self.operations.getxattr(
path.decode(self.encoding), name.decode(self.encoding), *args
).encode(self.encoding)
retsize = len(ret)
# allow size queries
@ -1094,8 +1096,8 @@ class FUSE:
return retsize
def listxattr(self, path, namebuf, size):
attrs = self.operations('listxattr', path.decode(self.encoding)) or ''
def listxattr(self, path: bytes, namebuf: typing.Any, size: int) -> int:
attrs = self.operations.listxattr(path.decode(self.encoding)) or []
ret = '\x00'.join(attrs).encode(self.encoding)
if len(ret) > 0:
ret += '\x00'.encode(self.encoding)
@ -1114,81 +1116,82 @@ class FUSE:
return retsize
def removexattr(self, path, name):
return self.operations(
'removexattr', path.decode(self.encoding), name.decode(self.encoding)
def removexattr(self, path: bytes, name: bytes) -> None:
return self.operations.removexattr(
path.decode(self.encoding), name.decode(self.encoding)
)
def opendir(self, path, fip):
def opendir(self, path: bytes, fip: typing.Any) -> None:
# Ignore raw_fi
fip.contents.fh = self.operations('opendir', path.decode(self.encoding))
fip.contents.fh = self.operations.opendir(path.decode(self.encoding))
return 0
def readdir(self, path, buf, filler, offset, fip):
def readdir(
self,
path: bytes,
buf: typing.Any,
filler: typing.Any,
offset: int,
fip: typing.Any,
) -> None:
# Ignore raw_fi
for item in self.operations(
'readdir', self._decode_optional_path(path), fip.contents.fh
for item in self.operations.readdir(
path.decode(self.encoding), fip.contents.fh
):
if isinstance(item, str):
name, st, offset = item, None, 0
else:
name, attrs, offset = item
if attrs:
st = c_stat()
set_st_attrs(st, attrs, use_ns=self.use_ns)
set_st_attrs(st, attrs)
else:
st = None
if filler(buf, name.encode(self.encoding), st, offset) != 0:
break
return 0
def releasedir(self, path, fip):
def releasedir(self, path: typing.Optional[bytes], fip: typing.Any):
# Ignore raw_fi
return self.operations(
'releasedir', self._decode_optional_path(path), fip.contents.fh
)
return self.operations.releasedir(self._decode_optional_path(path), fip.contents.fh)
def fsyncdir(self, path, datasync, fip):
def fsyncdir(self, path: typing.Optional[bytes], datasync: bool, fip: typing.Any):
# Ignore raw_fi
return self.operations(
'fsyncdir', self._decode_optional_path(path), datasync, fip.contents.fh
)
return self.operations.fsyncdir(self._decode_optional_path(path), datasync, fip.contents.fh)
def init(self, conn):
return self.operations('init', '/')
def init(self, conn: typing.Any) -> None:
return self.operations.init('/')
def destroy(self, private_data):
return self.operations('destroy', '/')
def destroy(self, private_data: typing.Any) -> None:
return self.operations.destroy('/')
def access(self, path, amode):
return self.operations('access', path.decode(self.encoding), amode)
def access(self, path:bytes, amode: int) -> None:
return self.operations.access(path.decode(self.encoding), amode)
def create(self, path, mode, fip):
def create(self, path: bytes, mode: int, fip: typing.Any) -> None:
fi = fip.contents
path = path.decode(self.encoding)
if self.raw_fi:
return self.operations('create', path, mode, fi)
self.operations.create(path.decode(self.encoding), mode, fi)
else:
fi.fh = self.operations('create', path, mode)
return 0
fi.fh = self.operations.create(path.decode(self.encoding), mode, None)
def ftruncate(self, path, length, fip):
def ftruncate(
self, path: typing.Optional[bytes], length: int, fip: typing.Any
) -> int:
if self.raw_fi:
fh = fip.contents
else:
fh = fip.contents.fh
return self.operations('truncate', self._decode_optional_path(path), length, fh)
self.operations.truncate(self._decode_optional_path(path), length, fh)
return 0
def fgetattr(self, path, buf, fip):
def fgetattr(
self, path: typing.Optional[bytes], buf: typing.Any, fip: typing.Any
) -> None:
ctypes.memset(buf, 0, ctypes.sizeof(c_stat))
st = buf.contents
st: c_stat = buf.contents
if not fip:
fh = fip
elif self.raw_fi:
@ -1196,39 +1199,55 @@ class FUSE:
else:
fh = fip.contents.fh
attrs = self.operations('getattr', self._decode_optional_path(path), fh)
set_st_attrs(st, attrs, use_ns=self.use_ns)
return 0
attrs = self.operations.getattr(self._decode_optional_path(path), fh)
set_st_attrs(st, attrs)
def lock(self, path, fip, cmd, lock):
def lock(
self,
path: typing.Optional[bytes],
fip: typing.Any,
cmd: bytes,
lock: typing.Any,
):
if self.raw_fi:
fh = fip.contents
else:
fh = fip.contents.fh
return self.operations('lock', self._decode_optional_path(path), fh, cmd, lock)
return self.operations.lock(self._decode_optional_path(path), fh, cmd, lock)
def utimens(self, path, buf):
if buf:
atime = time_of_timespec(buf.contents.actime, use_ns=self.use_ns)
mtime = time_of_timespec(buf.contents.modtime, use_ns=self.use_ns)
times = (atime, mtime)
else:
times = None
def utimens(self, path: bytes, buf: typing.Any) -> int:
times: typing.Optional[typing.Tuple[int, int]] = None
times = (
(
time_of_timespec(buf.contents.actime),
time_of_timespec(buf.contents.modtime),
)
if buf
else None
)
return self.operations('utimens', path.decode(self.encoding), times)
return self.operations.utimens(path.decode(self.encoding), times)
def bmap(self, path: bytes, blocksize: int, idx: int):
return self.operations('bmap', path.decode(self.encoding), blocksize, idx)
def bmap(self, path: bytes, blocksize: int, idx: int) -> int:
return self.operations.bmap(path.decode(self.encoding), blocksize, idx)
def ioctl(self, path, cmd, arg, fip, flags, data):
def ioctl(
self,
path: bytes,
cmd: bytes,
arg: bytes,
fip: typing.Any,
flags: int,
data: bytes,
):
if self.raw_fi:
fh = fip.contents
else:
fh = fip.contents.fh
return self.operations(
'ioctl', path.decode(self.encoding), cmd, arg, fh, flags, data
return self.operations.ioctl(
path.decode(self.encoding), cmd, arg, fh, flags, data
)
@ -1242,56 +1261,56 @@ class Operations:
or the corresponding system call man page.
'''
def __call__(self, op, *args) -> typing.Any:
if not hasattr(self, op):
def __call__(self, op: str, *args) -> typing.Any:
try:
return getattr(self, op)(*args)
except AttributeError:
raise FuseOSError(errno.EFAULT)
return getattr(self, op)(*args)
def access(self, path: str, amode: int) -> int:
def access(self, path: str, amode: int) -> None:
return
def bmap(self, path: str, blocksize: int, idx: int) -> int:
return 0
bmap = None
def chmod(self, path: str, mode: int) -> int:
def chmod(self, path: str, mode: int) -> None:
raise FuseOSError(errno.EROFS)
def chown(self, path: str, uid: int, gid: int) -> None:
raise FuseOSError(errno.EROFS)
def create(self, path: str, mode: int, fi: typing.Any = None):
def create(self, path: str, mode: int, fi: typing.Any) -> int:
'''
When raw_fi is False (default case), fi is None and create should
return a numerical file handle.
When raw_fi is True the file handle should be set directly by create
and return 0.
and return value is not used.
'''
raise FuseOSError(errno.EROFS)
def destroy(self, path: str) -> None:
'Called on filesystem destruction. Path is always /'
pass
def flush(self, path: str, fh: typing.Any) -> int:
return 0
def flush(self, path: typing.Optional[str], fh: typing.Any) -> None:
pass
def fsync(self, path: str, datasync: int, fh: typing.Any) -> int:
return 0
def fsync(self, path: typing.Optional[str], datasync: bool, fh: typing.Any) -> None:
pass
def fsyncdir(self, path: str, datasync: int, fh: typing.Any) -> int:
return 0
def fsyncdir(
self, path: typing.Optional[str], datasync: bool, fh: typing.Any
) -> None:
pass
def getattr(
self, path: str, fh: typing.Any = None
) -> typing.Dict[str, typing.Union[int, float]]:
self, path: typing.Optional[str], fh: typing.Any = None
) -> typing.Dict[str, int]:
'''
Returns a dictionary with keys identical to the stat C structure of
stat(2).
st_atime, st_mtime and st_ctime should be floats.
NOTE: There is an incompatibility between Linux and Mac OS X
concerning st_nlink of directories. Mac OS X counts all files inside
the directory, while Linux counts only the subdirectories.
@ -1301,9 +1320,7 @@ class Operations:
raise FuseOSError(errno.ENOENT)
return dict(st_mode=(S_IFDIR | 0o755), st_nlink=2)
def getxattr(
self, path: str, name: str, position: int = 0
) -> typing.Dict[str, typing.Union[int, float]]:
def getxattr(self, path: str, name: str, position: int = 0) -> str:
raise FuseOSError(ENOTSUP)
def init(self, path: str) -> None:
@ -1314,10 +1331,12 @@ class Operations:
'''
pass
def ioctl(self, path: str, cmd: str, arg: str, fip: int, flags: int, data: bytes) -> int:
def ioctl(
self, path: str, cmd: bytes, arg: bytes, fip: int, flags: int, data: bytes
) -> int:
raise FuseOSError(errno.ENOTTY)
def link(self, target: str, source: str):
def link(self, target: str, source: str) -> None:
'creates a hard link `target -> source` (e.g. ln source target)'
raise FuseOSError(errno.EROFS)
@ -1325,12 +1344,15 @@ class Operations:
def listxattr(self, path: str) -> typing.List[str]:
return []
lock = None
def lock(
self, path: typing.Optional[str], fh: typing.Any, cmd: bytes, lock: typing.Any
) -> int:
return 0
def mkdir(self, path: str, mode: int) -> None:
raise FuseOSError(errno.EROFS)
def mknod(self, path: str, mode: int, dev: int) -> None:
def mknod(self, path: str, mode: int, dev: int) -> int:
raise FuseOSError(errno.EROFS)
def open(self, path: str, flags: int) -> int:
@ -1341,22 +1363,26 @@ class Operations:
When raw_fi is True the signature of open becomes:
open(self, path, fi)
and the file handle should be set directly.
and the file handle should be set directly. In this case, the return value is ignored.
'''
return 0
def opendir(self, path: str) -> int:
'Returns a numerical file handle.'
return 0
def read(self, path: str, size: int, offset: int, fh: int) -> bytes:
def read(
self, path: typing.Optional[str], size: int, offset: int, fh: typing.Any
) -> bytes:
'Returns a string containing the data requested.'
raise FuseOSError(errno.EIO)
def readdir(self, path, fh):
def readdir(
self, path: str, fh: typing.Any
) -> typing.Union[
typing.List[str], typing.List[typing.Tuple[str, typing.Dict[str, int], int]]
]:
'''
Can return either a list of names, or a list of (name, attrs, offset)
tuples. attrs is a dict as in getattr.
@ -1364,28 +1390,30 @@ class Operations:
return ['.', '..']
def readlink(self, path):
def readlink(self, path: str) -> str:
raise FuseOSError(errno.ENOENT)
def release(self, path, fh):
return 0
def release(self, path: typing.Optional[str], fh: typing.Any) -> None:
pass
def releasedir(self, path, fh):
return 0
def releasedir(self, path: typing.Optional[str], fh: typing.Any) -> None:
pass
def removexattr(self, path, name):
def removexattr(self, path: str, name: str) -> None:
raise FuseOSError(ENOTSUP)
def rename(self, old, new):
def rename(self, old: str, new: str) -> None:
raise FuseOSError(errno.EROFS)
def rmdir(self, path):
def rmdir(self, path: str) -> None:
raise FuseOSError(errno.EROFS)
def setxattr(self, path, name, value, options, position=0):
def setxattr(
self, path: str, name: str, value: str, options: int, position: int = 0
) -> None:
raise FuseOSError(ENOTSUP)
def statfs(self, path):
def statfs(self, path: str) -> typing.Dict[str, typing.Union[int, float]]:
'''
Returns a dictionary with keys identical to the statvfs C structure of
statvfs(3).
@ -1396,37 +1424,27 @@ class Operations:
return {}
def symlink(self, target, source):
def symlink(self, target: str, source: str) -> None:
'creates a symlink `target -> source` (e.g. ln -s source target)'
raise FuseOSError(errno.EROFS)
def truncate(self, path, length, fh: typing.Any=None):
def truncate(
self, path: typing.Optional[str], length: int, fh: typing.Any = None
) -> None:
raise FuseOSError(errno.EROFS)
def unlink(self, path):
def unlink(self, path: str) -> None:
raise FuseOSError(errno.EROFS)
def utimens(self, path, times=None):
def utimens(
self, path, times: typing.Optional[typing.Tuple[float, float]] = None
) -> int:
'Times is a (atime, mtime) tuple. If None use current time.'
return 0
def write(self, path, data, offset, fh):
def write(
self, path: typing.Optional[str], data: bytes, offset: int, fh: typing.Any
) -> int:
raise FuseOSError(errno.EROFS)
class LoggingMixIn:
log = logging.getLogger('fuse.log-mixin')
def __call__(self, op, path, *args):
self.log.debug('-> %s %s %s', op, path, repr(args))
ret = '[Unhandled Exception]'
try:
ret = getattr(self, op)(path, *args)
return ret
except OSError as e:
ret = str(e)
raise
finally:
self.log.debug('<- %s %s', op, repr(ret))

View File

@ -1,173 +1,2 @@
#!/usr/bin/env python
import logging
import typing
from collections import defaultdict
from errno import ENOENT
from stat import S_IFDIR, S_IFLNK, S_IFREG
from time import time
from django.core.management.base import BaseCommand
from uds import models
from uds.core.util.fuse import FUSE, FuseOSError, Operations
logger = logging.getLogger(__name__)
class UDSFS(Operations):
'Example memory filesystem. Supports only one level of files.'
def __init__(self):
self.files = {}
self.data = defaultdict(bytes)
self.fd = 0
now = time()
self.files['/'] = dict(
st_mode=(S_IFDIR | 0o755),
st_ctime=now,
st_mtime=now,
st_atime=now,
st_nlink=2,
)
def chmod(self, path: str, mode: int) -> int:
logger.debug('CHMOD: %s %s', path, mode)
self.files[path]['st_mode'] &= 0o770000
self.files[path]['st_mode'] |= mode
return 0
def chown(self, path: str, uid: int, gid: int) -> None:
self.files[path]['st_uid'] = uid
self.files[path]['st_gid'] = gid
def create(self, path: str, mode: int, fi: typing.Any = None) -> int:
self.files[path] = dict(
st_mode=(S_IFREG | mode),
st_nlink=1,
st_size=0,
st_ctime=time(),
st_mtime=time(),
st_atime=time(),
)
self.fd += 1
return self.fd
def getattr(self, path: str, fh: typing.Any=None) -> typing.Dict[str, typing.Union[int, float]]:
if path not in self.files:
raise FuseOSError(ENOENT)
return self.files[path]
def getxattr(self, path, name, position=0):
attrs = self.files[path].get('attrs', {})
try:
return attrs[name]
except KeyError:
return '' # Should return ENOATTR
def listxattr(self, path):
attrs = self.files[path].get('attrs', {})
return attrs.keys()
def mkdir(self, path, mode):
self.files[path] = dict(
st_mode=(S_IFDIR | mode),
st_nlink=2,
st_size=0,
st_ctime=time(),
st_mtime=time(),
st_atime=time(),
)
self.files['/']['st_nlink'] += 1
def open(self, path, flags):
self.fd += 1
return self.fd
def read(self, path, size, offset, fh):
return self.data[path][offset : offset + size]
def readdir(self, path, fh):
return ['.', '..'] + [x[1:] for x in self.files if x != '/']
def readlink(self, path):
return self.data[path]
def removexattr(self, path, name):
attrs = self.files[path].get('attrs', {})
try:
del attrs[name]
except KeyError:
pass # Should return ENOATTR
def rename(self, old, new):
self.data[new] = self.data.pop(old)
self.files[new] = self.files.pop(old)
def rmdir(self, path):
# with multiple level support, need to raise ENOTEMPTY if contains any files
self.files.pop(path)
self.files['/']['st_nlink'] -= 1
def setxattr(self, path, name, value, options, position=0):
# Ignore options
attrs = self.files[path].setdefault('attrs', {})
attrs[name] = value
def statfs(self, path):
return dict(f_bsize=512, f_blocks=4096, f_bavail=2048)
def symlink(self, target, source):
self.files[target] = dict(
st_mode=(S_IFLNK | 0o777), st_nlink=1, st_size=len(source)
)
self.data[target] = source
def truncate(self, path, length, fh=None):
# make sure extending the file fills in zero bytes
self.data[path] = self.data[path][:length].ljust(length, b'\x00')
self.files[path]['st_size'] = length
def unlink(self, path):
self.data.pop(path)
self.files.pop(path)
def utimens(self, path, times=None):
now = time()
atime, mtime = times if times else (now, now)
self.files[path]['st_atime'] = atime
self.files[path]['st_mtime'] = mtime
def write(self, path, data, offset, fh):
self.data[path] = (
# make sure the data gets inserted at the right offset
self.data[path][:offset].ljust(offset, b'\x00')
+ data
# and only overwrites the bytes that data is replacing
+ self.data[path][offset + len(data) :]
)
self.files[path]['st_size'] = len(self.data[path])
return len(data)
class Command(BaseCommand):
args = "<mod.name=value mod.name=value mod.name=value...>"
help = "Updates configuration values. If mod is omitted, UDS will be used. Omit whitespaces betwen name, =, and value (they must be a single param)"
def add_arguments(self, parser):
parser.add_argument(
'mount_point', type=str, help='Mount point for the FUSE filesystem'
)
# parser.add_argument('-d', '--debug', action='store_true', help='Enable debug logging')
def handle(self, *args, **options):
logger.debug("Handling UDS FS")
fuse = FUSE(UDSFS(), options['mount_point'], foreground=True, allow_other=True)
# Placeholder, import the command from udsfs
from .udsfs import Command

View File

@ -0,0 +1,62 @@
import errno
import stat
import os.path
import logging
import typing
from django.core.management.base import BaseCommand
from uds import models
from uds.core.util.fuse import FUSE, FuseOSError, Operations
from . import types
from . import events
logger = logging.getLogger(__name__)
class UDSFS(Operations):
dispatchers: typing.ClassVar[typing.Dict[str, types.UDSFSInterface]] = {
'events': events.EventFS()
}
_own_stats = types.StatType(st_mode=(stat.S_IFDIR | 0o755), st_nlink=2)
def __init__(self):
pass
def _dispatch(self, path: typing.Optional[str], operation: str, *args, **kwargs) -> typing.Any:
if path:
path_parts = os.path.split(path)
if path_parts[1] in self.dispatchers:
return getattr(self.dispatchers[path_parts[1]], operation)(path_parts[2:], *args, **kwargs)
raise FuseOSError(errno.ENOENT)
def getattr(self, path: typing.Optional[str], fh: typing.Any = None) -> typing.Dict[str, int]:
# If root folder, return service creation date
if path == '/':
return self._own_stats.as_dict()
# If not root folder, split path to locate dispatcher and call it with the rest of the path
return typing.cast(types.StatType, self._dispatch(path, 'getattr')).as_dict()
def readdir(self, path: str, fh: typing.Any) -> typing.List[str]:
if path == '/':
return ['.', '..'] + list(self.dispatchers.keys())
return typing.cast(typing.List[str], self._dispatch(path, 'readdir'))
class Command(BaseCommand):
args = "<mod.name=value mod.name=value mod.name=value...>"
help = "Updates configuration values. If mod is omitted, UDS will be used. Omit whitespaces betwen name, =, and value (they must be a single param)"
def add_arguments(self, parser):
parser.add_argument(
'mount_point', type=str, help='Mount point for the FUSE filesystem'
)
# parser.add_argument('-d', '--debug', action='store_true', help='Enable debug logging')
def handle(self, *args, **options):
logger.debug("Handling UDS FS")
fuse = FUSE(UDSFS(), options['mount_point'], foreground=True, allow_other=True)

View File

@ -0,0 +1,27 @@
import stat
import time
import typing
import logging
from . import types
logger = logging.getLogger(__name__)
class EventFS(types.UDSFSInterface):
"""
Class to handle events fs in UDS.
"""
_own_stats = types.StatType(st_mode=(stat.S_IFDIR | 0o755), st_nlink=1)
def __init__(self):
pass
def getattr(self, path: typing.List[str]) -> types.StatType:
if len(path) <= 1:
return self._own_stats
return types.StatType(st_mode=stat.S_IFREG | 0o444, st_nlink=1)
def readdir(self, path: typing.List[str]) -> typing.List[str]:
if len(path) <= 1:
return ['.', '..']
return []

View File

@ -0,0 +1,39 @@
import stat
import time
import logging
import typing
logger = logging.getLogger(__name__)
class StatType(typing.NamedTuple):
st_mode: int
st_ctime: int = -1
st_mtime: int = -1
st_atime: int = -1
st_nlink: int = 1
def as_dict(self) -> typing.Dict[str, int]:
return {
'st_mode': self.st_mode,
'st_ctime': self.st_ctime if self.st_ctime != -1 else int(time.time()),
'st_mtime': self.st_mtime if self.st_mtime != -1 else int(time.time()),
'st_atime': self.st_atime if self.st_atime != -1 else int(time.time()),
'st_nlink': self.st_nlink
}
class UDSFSInterface:
"""
Base Class for UDS Info File system
"""
def getattr(self, path: typing.List[str]) -> StatType:
"""
Get file attributes. Path is the full path to the file, already splitted.
"""
raise NotImplementedError
def readdir(self, path: typing.List[str]) -> typing.List[str]:
"""
Get a list of files in the directory. Path is the full path to the directory, already splitted.
"""
raise NotImplementedError