mirror of
https://github.com/dkmstr/openuds.git
synced 2025-01-13 13:17:54 +03:00
reformated storage source
This commit is contained in:
parent
c698300096
commit
06b0f1396f
@ -1,7 +1,6 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
#
|
||||
# Copyright (c) 2012-2019 Virtual Cable S.L.
|
||||
# Copyright (c) 2012-2020 Virtual Cable S.L.U.
|
||||
# All rights reserved.
|
||||
#
|
||||
# Redistribution and use in source and binary forms, with or without modification,
|
||||
@ -45,6 +44,7 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
MARK = '_mgb_'
|
||||
|
||||
|
||||
def _calcKey(owner: bytes, key: bytes, extra: typing.Optional[bytes] = None) -> str:
|
||||
h = hashlib.md5()
|
||||
h.update(owner)
|
||||
@ -53,13 +53,17 @@ def _calcKey(owner: bytes, key: bytes, extra: typing.Optional[bytes] = None) ->
|
||||
h.update(extra)
|
||||
return h.hexdigest()
|
||||
|
||||
|
||||
def _encodeValue(key: str, value: typing.Any, compat: bool = False) -> str:
|
||||
if not compat:
|
||||
return base64.b64encode(pickle.dumps((MARK, key, value))).decode()
|
||||
# Compatibility save
|
||||
return base64.b64encode(pickle.dumps(value)).decode()
|
||||
|
||||
def _decodeValue(dbk: str, value: typing.Optional[str]) -> typing.Tuple[str, typing.Any]:
|
||||
|
||||
def _decodeValue(
|
||||
dbk: str, value: typing.Optional[str]
|
||||
) -> typing.Tuple[str, typing.Any]:
|
||||
if value:
|
||||
try:
|
||||
v = pickle.loads(base64.b64decode(value.encode()))
|
||||
@ -76,17 +80,25 @@ def _decodeValue(dbk: str, value: typing.Optional[str]) -> typing.Tuple[str, typ
|
||||
|
||||
|
||||
class StorageAsDict(MutableMapping):
|
||||
'''
|
||||
"""
|
||||
Accesses storage as dictionary. Much more convenient that old method
|
||||
'''
|
||||
def __init__(self, owner: str, group: typing.Optional[str], atomic: bool = False, compat: bool = False) -> None:
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
owner: str,
|
||||
group: typing.Optional[str],
|
||||
atomic: bool = False,
|
||||
compat: bool = False,
|
||||
) -> None:
|
||||
"""Initializes an storage as dict accesor
|
||||
|
||||
Args:
|
||||
owner (str): owner of the storage
|
||||
group (typing.Optional[str]): group for this dict
|
||||
atomic (bool, optional): [description]. if True, operations with DB will be atomic
|
||||
compat (bool, optional): [description]. if True, keys will be generated "old way" (ignoring group)
|
||||
atomic (bool, optional): if True, operations with DB will be atomic
|
||||
compat (bool, optional): if True, keys will be SAVED with old format
|
||||
(that is, without the key) so it can be read by old api
|
||||
"""
|
||||
self._group = group or ''
|
||||
self._owner = owner
|
||||
@ -132,7 +144,9 @@ class StorageAsDict(MutableMapping):
|
||||
dbk = self._key(key)
|
||||
logger.debug('Setitem: %s = %s', dbk, value)
|
||||
data = _encodeValue(key, value, self._compat)
|
||||
c, created = DBStorage.objects.update_or_create(key=dbk, defaults={'data': data, 'attr1': self._group, 'owner': self._owner})
|
||||
c, created = DBStorage.objects.update_or_create(
|
||||
key=dbk, defaults={'data': data, 'attr1': self._group, 'owner': self._owner}
|
||||
)
|
||||
|
||||
def __delitem__(self, key: str) -> None:
|
||||
dbk = self._key(key)
|
||||
@ -140,9 +154,9 @@ class StorageAsDict(MutableMapping):
|
||||
DBStorage.objects.filter(key=dbk).delete()
|
||||
|
||||
def __iter__(self):
|
||||
'''
|
||||
"""
|
||||
Iterates through keys
|
||||
'''
|
||||
"""
|
||||
return iter(_decodeValue(i.key, i.data)[0] for i in self._filtered)
|
||||
|
||||
def __contains__(self, key: object) -> bool:
|
||||
@ -170,11 +184,19 @@ class StorageAsDict(MutableMapping):
|
||||
def group(self, value: str) -> None:
|
||||
self._group = value or ''
|
||||
|
||||
|
||||
class StorageAccess:
|
||||
'''
|
||||
"""
|
||||
Allows the access to the storage as a dict, with atomic transaction if requested
|
||||
'''
|
||||
def __init__(self, owner: str, group: typing.Optional[str] = None, atomic: bool = False, compat: bool = False):
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
owner: str,
|
||||
group: typing.Optional[str] = None,
|
||||
atomic: bool = False,
|
||||
compat: bool = False,
|
||||
):
|
||||
self._owner = owner
|
||||
self._group = group
|
||||
self._atomic = transaction.atomic() if atomic else None
|
||||
@ -183,12 +205,18 @@ class StorageAccess:
|
||||
def __enter__(self):
|
||||
if self._atomic:
|
||||
self._atomic.__enter__()
|
||||
return StorageAsDict(owner=self._owner, group=self._group, atomic=bool(self._atomic), compat=self._compat)
|
||||
return StorageAsDict(
|
||||
owner=self._owner,
|
||||
group=self._group,
|
||||
atomic=bool(self._atomic),
|
||||
compat=self._compat,
|
||||
)
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
if self._atomic:
|
||||
self._atomic.__exit__(exc_type, exc_value, traceback)
|
||||
|
||||
|
||||
class Storage:
|
||||
_owner: str
|
||||
_bownwer: bytes
|
||||
@ -198,9 +226,16 @@ class Storage:
|
||||
self._bowner = self._owner.encode('utf8')
|
||||
|
||||
def getKey(self, key: typing.Union[str, bytes]) -> str:
|
||||
return _calcKey(self._bowner, key.encode('utf8') if isinstance(key, str) else key)
|
||||
return _calcKey(
|
||||
self._bowner, key.encode('utf8') if isinstance(key, str) else key
|
||||
)
|
||||
|
||||
def saveData(self, skey: typing.Union[str, bytes], data: typing.Any, attr1: typing.Optional[str] = None) -> None:
|
||||
def saveData(
|
||||
self,
|
||||
skey: typing.Union[str, bytes],
|
||||
data: typing.Any,
|
||||
attr1: typing.Optional[str] = None,
|
||||
) -> None:
|
||||
# If None is to be saved, remove
|
||||
if not data:
|
||||
self.remove(skey)
|
||||
@ -212,22 +247,40 @@ class Storage:
|
||||
dataStr = codecs.encode(data, 'base64').decode()
|
||||
attr1 = attr1 or ''
|
||||
try:
|
||||
DBStorage.objects.create(owner=self._owner, key=key, data=dataStr, attr1=attr1)
|
||||
DBStorage.objects.create(
|
||||
owner=self._owner, key=key, data=dataStr, attr1=attr1
|
||||
)
|
||||
except Exception:
|
||||
with transaction.atomic():
|
||||
DBStorage.objects.filter(key=key).select_for_update().update(owner=self._owner, data=dataStr, attr1=attr1) # @UndefinedVariable
|
||||
DBStorage.objects.filter(key=key).select_for_update().update(
|
||||
owner=self._owner, data=dataStr, attr1=attr1
|
||||
) # @UndefinedVariable
|
||||
# logger.debug('Key saved')
|
||||
|
||||
def put(self, skey: typing.Union[str, bytes], data: typing.Any) -> None:
|
||||
return self.saveData(skey, data)
|
||||
|
||||
def putPickle(self, skey: typing.Union[str, bytes], data: typing.Any, attr1: typing.Optional[str] = None) -> None:
|
||||
return self.saveData(skey, pickle.dumps(data), attr1) # Protocol 2 is compatible with python 2.7. This will be unnecesary when fully migrated
|
||||
def putPickle(
|
||||
self,
|
||||
skey: typing.Union[str, bytes],
|
||||
data: typing.Any,
|
||||
attr1: typing.Optional[str] = None,
|
||||
) -> None:
|
||||
return self.saveData(
|
||||
skey, pickle.dumps(data), attr1
|
||||
) # Protocol 2 is compatible with python 2.7. This will be unnecesary when fully migrated
|
||||
|
||||
def updateData(self, skey: typing.Union[str, bytes], data: typing.Any, attr1: typing.Optional[str] = None) -> None:
|
||||
def updateData(
|
||||
self,
|
||||
skey: typing.Union[str, bytes],
|
||||
data: typing.Any,
|
||||
attr1: typing.Optional[str] = None,
|
||||
) -> None:
|
||||
self.saveData(skey, data, attr1)
|
||||
|
||||
def readData(self, skey: typing.Union[str, bytes], fromPickle: bool = False) -> typing.Optional[typing.Union[str, bytes]]:
|
||||
def readData(
|
||||
self, skey: typing.Union[str, bytes], fromPickle: bool = False
|
||||
) -> typing.Optional[typing.Union[str, bytes]]:
|
||||
try:
|
||||
key = self.getKey(skey)
|
||||
logger.debug('Accesing to %s %s', skey, key)
|
||||
@ -245,7 +298,9 @@ class Storage:
|
||||
logger.debug('key not found')
|
||||
return None
|
||||
|
||||
def get(self, skey: typing.Union[str, bytes]) -> typing.Optional[typing.Union[str, bytes]]:
|
||||
def get(
|
||||
self, skey: typing.Union[str, bytes]
|
||||
) -> typing.Optional[typing.Union[str, bytes]]:
|
||||
return self.readData(skey)
|
||||
|
||||
def getPickle(self, skey: typing.Union[str, bytes]) -> typing.Any:
|
||||
@ -259,12 +314,18 @@ class Storage:
|
||||
query = DBStorage.objects.filter(owner=self._owner, attr1=attr1)
|
||||
if forUpdate:
|
||||
query = query.select_for_update()
|
||||
return pickle.loads(codecs.decode(query[0].data.encode(), 'base64')) # @UndefinedVariable
|
||||
return pickle.loads(
|
||||
codecs.decode(query[0].data.encode(), 'base64')
|
||||
) # @UndefinedVariable
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def remove(self, skey: typing.Union[typing.Iterable[typing.Union[str, bytes]], str, bytes]) -> None:
|
||||
keys: typing.Iterable[typing.Union[str, bytes]] = [skey] if isinstance(skey, (str, bytes)) else skey
|
||||
def remove(
|
||||
self, skey: typing.Union[typing.Iterable[typing.Union[str, bytes]], str, bytes]
|
||||
) -> None:
|
||||
keys: typing.Iterable[typing.Union[str, bytes]] = (
|
||||
[skey] if isinstance(skey, (str, bytes)) else skey
|
||||
)
|
||||
try:
|
||||
# Process several keys at once
|
||||
DBStorage.objects.filter(key__in=[self.getKey(k) for k in keys]).delete()
|
||||
@ -283,23 +344,38 @@ class Storage:
|
||||
"""
|
||||
# dbStorage.objects.unlock() # @UndefinedVariable
|
||||
|
||||
def map(self, group: typing.Optional[str] = None, atomic: bool = False, compat: bool = False) -> StorageAccess:
|
||||
def map(
|
||||
self,
|
||||
group: typing.Optional[str] = None,
|
||||
atomic: bool = False,
|
||||
compat: bool = False,
|
||||
) -> StorageAccess:
|
||||
return StorageAccess(self._owner, group=group, atomic=atomic, compat=compat)
|
||||
|
||||
def locateByAttr1(self, attr1: typing.Union[typing.Iterable[str], str]) -> typing.Iterable[bytes]:
|
||||
def locateByAttr1(
|
||||
self, attr1: typing.Union[typing.Iterable[str], str]
|
||||
) -> typing.Iterable[bytes]:
|
||||
if isinstance(attr1, str):
|
||||
query = DBStorage.objects.filter(owner=self._owner, attr1=attr1) # @UndefinedVariable
|
||||
query = DBStorage.objects.filter(
|
||||
owner=self._owner, attr1=attr1
|
||||
) # @UndefinedVariable
|
||||
else:
|
||||
query = DBStorage.objects.filter(owner=self._owner, attr1__in=attr1) # @UndefinedVariable
|
||||
query = DBStorage.objects.filter(
|
||||
owner=self._owner, attr1__in=attr1
|
||||
) # @UndefinedVariable
|
||||
|
||||
for v in query:
|
||||
yield codecs.decode(v.data.encode(), 'base64')
|
||||
|
||||
def filter(self, attr1: typing.Optional[str] = None, forUpdate: bool = False) -> typing.Iterable[typing.Tuple[str, bytes, str]]:
|
||||
def filter(
|
||||
self, attr1: typing.Optional[str] = None, forUpdate: bool = False
|
||||
) -> typing.Iterable[typing.Tuple[str, bytes, str]]:
|
||||
if attr1 is None:
|
||||
query = DBStorage.objects.filter(owner=self._owner) # @UndefinedVariable
|
||||
else:
|
||||
query = DBStorage.objects.filter(owner=self._owner, attr1=attr1) # @UndefinedVariable
|
||||
query = DBStorage.objects.filter(
|
||||
owner=self._owner, attr1=attr1
|
||||
) # @UndefinedVariable
|
||||
|
||||
if forUpdate:
|
||||
query = query.select_for_update()
|
||||
@ -307,7 +383,9 @@ class Storage:
|
||||
for v in query: # @UndefinedVariable
|
||||
yield (v.key, codecs.decode(v.data.encode(), 'base64'), v.attr1)
|
||||
|
||||
def filterPickle(self, attr1: typing.Optional[str] = None, forUpdate: bool = False) -> typing.Iterable[typing.Tuple[str, typing.Any, str]]:
|
||||
def filterPickle(
|
||||
self, attr1: typing.Optional[str] = None, forUpdate: bool = False
|
||||
) -> typing.Iterable[typing.Tuple[str, typing.Any, str]]:
|
||||
for v in self.filter(attr1, forUpdate):
|
||||
yield (v[0], pickle.loads(v[1]), v[2])
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user