Experimenting with use for adding an "UDS FS" so we can explore several UDS contained data easily

This commit is contained in:
Adolfo Gómez García 2021-11-01 00:40:54 +01:00
parent 937465b2f2
commit a957e368e7
3 changed files with 1607 additions and 2 deletions

File diff suppressed because it is too large Load Diff

View File

@ -64,7 +64,7 @@ if typing.TYPE_CHECKING:
ET_OSMANAGER_INIT, ET_OSMANAGER_INIT,
ET_OSMANAGER_READY, ET_OSMANAGER_READY,
ET_OSMANAGER_RELEASE ET_OSMANAGER_RELEASE
) = range(8) ) = range(11)
# Events names # Events names
EVENT_NAMES = { EVENT_NAMES = {
@ -87,7 +87,7 @@ EVENT_NAMES = {
OT_DEPLOYED, OT_DEPLOYED,
OT_AUTHENTICATOR, OT_AUTHENTICATOR,
OT_OSMANAGER OT_OSMANAGER
) = range(4) ) = range(5)
__transDict: typing.Mapping[typing.Type['models.Model'], int] = { __transDict: typing.Mapping[typing.Type['models.Model'], int] = {
ServicePool: OT_DEPLOYED, ServicePool: OT_DEPLOYED,

View File

@ -0,0 +1,173 @@
#!/usr/bin/env python
import logging
import typing
from collections import defaultdict
from errno import ENOENT
from stat import S_IFDIR, S_IFLNK, S_IFREG
from time import time
from django.core.management.base import BaseCommand
from uds import models
from uds.core.util.fuse import FUSE, FuseOSError, Operations
logger = logging.getLogger(__name__)
class UDSFS(Operations):
'Example memory filesystem. Supports only one level of files.'
def __init__(self):
self.files = {}
self.data = defaultdict(bytes)
self.fd = 0
now = time()
self.files['/'] = dict(
st_mode=(S_IFDIR | 0o755),
st_ctime=now,
st_mtime=now,
st_atime=now,
st_nlink=2,
)
def chmod(self, path: str, mode: int) -> int:
logger.debug('CHMOD: %s %s', path, mode)
self.files[path]['st_mode'] &= 0o770000
self.files[path]['st_mode'] |= mode
return 0
def chown(self, path: str, uid: int, gid: int) -> None:
self.files[path]['st_uid'] = uid
self.files[path]['st_gid'] = gid
def create(self, path: str, mode: int, fi: typing.Any = None) -> int:
self.files[path] = dict(
st_mode=(S_IFREG | mode),
st_nlink=1,
st_size=0,
st_ctime=time(),
st_mtime=time(),
st_atime=time(),
)
self.fd += 1
return self.fd
def getattr(self, path: str, fh: typing.Any=None) -> typing.Dict[str, typing.Union[int, float]]:
if path not in self.files:
raise FuseOSError(ENOENT)
return self.files[path]
def getxattr(self, path, name, position=0):
attrs = self.files[path].get('attrs', {})
try:
return attrs[name]
except KeyError:
return '' # Should return ENOATTR
def listxattr(self, path):
attrs = self.files[path].get('attrs', {})
return attrs.keys()
def mkdir(self, path, mode):
self.files[path] = dict(
st_mode=(S_IFDIR | mode),
st_nlink=2,
st_size=0,
st_ctime=time(),
st_mtime=time(),
st_atime=time(),
)
self.files['/']['st_nlink'] += 1
def open(self, path, flags):
self.fd += 1
return self.fd
def read(self, path, size, offset, fh):
return self.data[path][offset : offset + size]
def readdir(self, path, fh):
return ['.', '..'] + [x[1:] for x in self.files if x != '/']
def readlink(self, path):
return self.data[path]
def removexattr(self, path, name):
attrs = self.files[path].get('attrs', {})
try:
del attrs[name]
except KeyError:
pass # Should return ENOATTR
def rename(self, old, new):
self.data[new] = self.data.pop(old)
self.files[new] = self.files.pop(old)
def rmdir(self, path):
# with multiple level support, need to raise ENOTEMPTY if contains any files
self.files.pop(path)
self.files['/']['st_nlink'] -= 1
def setxattr(self, path, name, value, options, position=0):
# Ignore options
attrs = self.files[path].setdefault('attrs', {})
attrs[name] = value
def statfs(self, path):
return dict(f_bsize=512, f_blocks=4096, f_bavail=2048)
def symlink(self, target, source):
self.files[target] = dict(
st_mode=(S_IFLNK | 0o777), st_nlink=1, st_size=len(source)
)
self.data[target] = source
def truncate(self, path, length, fh=None):
# make sure extending the file fills in zero bytes
self.data[path] = self.data[path][:length].ljust(length, b'\x00')
self.files[path]['st_size'] = length
def unlink(self, path):
self.data.pop(path)
self.files.pop(path)
def utimens(self, path, times=None):
now = time()
atime, mtime = times if times else (now, now)
self.files[path]['st_atime'] = atime
self.files[path]['st_mtime'] = mtime
def write(self, path, data, offset, fh):
self.data[path] = (
# make sure the data gets inserted at the right offset
self.data[path][:offset].ljust(offset, b'\x00')
+ data
# and only overwrites the bytes that data is replacing
+ self.data[path][offset + len(data) :]
)
self.files[path]['st_size'] = len(self.data[path])
return len(data)
class Command(BaseCommand):
args = "<mod.name=value mod.name=value mod.name=value...>"
help = "Updates configuration values. If mod is omitted, UDS will be used. Omit whitespaces betwen name, =, and value (they must be a single param)"
def add_arguments(self, parser):
parser.add_argument(
'mount_point', type=str, help='Mount point for the FUSE filesystem'
)
# parser.add_argument('-d', '--debug', action='store_true', help='Enable debug logging')
def handle(self, *args, **options):
logger.debug("Handling UDS FS")
fuse = FUSE(UDSFS(), options['mount_point'], foreground=True, allow_other=True)