Use generic library instead of a copy

This commit is contained in:
Arjan Molenaar 2019-11-09 09:57:22 +01:00
parent 7d7312497a
commit 032fef4b43
15 changed files with 18 additions and 990 deletions

View File

@ -12,10 +12,10 @@ from gaphor.UML.properties import association, redefine, relation
from gaphas.canvas import Connection
from gaphas.connector import Handle, Port
from generic.multidispatch import multidispatch, FunctionDispatcher
from gaphor import UML
from gaphor.diagram.presentation import ElementPresentation, LinePresentation
from gaphor.misc.generic.multidispatch import multidispatch, FunctionDispatcher
T = TypeVar("T", bound=UML.Element)

View File

@ -20,7 +20,7 @@ from __future__ import annotations
from typing import Type, TYPE_CHECKING
import abc
from gaphor.misc.generic.multidispatch import multidispatch, FunctionDispatcher
from generic.multidispatch import multidispatch, FunctionDispatcher
if TYPE_CHECKING:
from gaphor import UML

View File

@ -1,4 +0,0 @@
Copyright (c) 2010 Andrey Popp, All Rights Reserved
Registry implementation and tests for it (generic.registry).
Copyright (c) 2009-2010 Christopher Michael Rossi, All Rights Reserved

View File

@ -1,39 +0,0 @@
A copyright notice accompanies this license document that identifies
the copyright holders.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
1. Redistributions in source code must retain the accompanying
copyright notice, this list of conditions, and the following
disclaimer.
2. Redistributions in binary form must reproduce the accompanying
copyright notice, this list of conditions, and the following
disclaimer in the documentation and/or other materials provided
with the distribution.
3. Names of the copyright holders must not be used to endorse or
promote products derived from this software without prior
written permission from the copyright holders.
4. If any files are modified, you must cause the modified files to
carry prominent notices stating that you changed the files and
the date of any change.
Disclaimer
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND
ANY EXPRESSED OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
HOLDERS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.

View File

@ -1,94 +0,0 @@
""" Event management system.
This module provides API for event management. There are two APIs provided:
* Global event management API: subscribe, unsubscribe, handle.
* Local event management API: Manager
If you run only one instance of your application per Python
interpreter you can stick with global API, but if you want to have
more than one application instances running inside one interpreter and
to have different configurations for them -- you should use local API
and have one instance of Manager object per application instance.
"""
from typing import Callable, Set, Type
from gaphor.misc.generic.registry import Registry, TypeAxis
__all__ = "Manager"
Event = object
Handler = Callable[[object], None]
HandlerSet = Set[Handler]
class Manager:
""" Event manager
Provides API for subscribing for and firing events. There's also global
event manager instantiated at module level with functions
:func:`.subscribe`, :func:`.handle` and decorator :func:`.subscriber` aliased
to corresponding methods of class.
"""
registry: Registry[HandlerSet]
def __init__(self) -> None:
axes = (("event_type", TypeAxis()),)
self.registry = Registry(*axes)
def subscribe(self, handler: Handler, event_type: Type[Event]) -> None:
""" Subscribe ``handler`` to specified ``event_type``"""
handler_set = self.registry.get_registration(event_type)
if handler_set is None:
handler_set = self._register_handler_set(event_type)
handler_set.add(handler)
def unsubscribe(self, handler: Handler, event_type: Type[Event]) -> None:
""" Unsubscribe ``handler`` from ``event_type``"""
handler_set = self.registry.get_registration(event_type)
if handler_set and handler in handler_set:
handler_set.remove(handler)
def handle(self, event: Event) -> None:
""" Fire ``event``
All subscribers will be executed with no determined order.
"""
handler_sets = self.registry.query(event)
for handler_set in handler_sets:
if handler_set:
for handler in set(handler_set):
handler(event)
def _register_handler_set(self, event_type: Type[Event]) -> HandlerSet:
""" Register new handler set for ``event_type``.
"""
handler_set: HandlerSet = set()
self.registry.register(handler_set, event_type)
return handler_set
def subscriber(self, event_type: Type[Event]) -> Callable[[Handler], Handler]:
""" Decorator for subscribing handlers
Works like this:
>>> mymanager = Manager()
>>> class MyEvent():
... pass
>>> @mymanager.subscriber(MyEvent)
... def mysubscriber(evt):
... # handle event
... return
>>> mymanager.handle(MyEvent())
"""
def registrator(func: Handler) -> Handler:
self.subscribe(func, event_type)
return func
return registrator

View File

@ -1,132 +0,0 @@
""" Multidispatch for functions and methods.
This code is a Python 3, slimmed down version of the
generic package by Andrey Popp.
Only the generic function code is left in tact -- no generic methods.
The interface has been made in line with `functools.singledispatch`.
Note that this module does not support annotated functions.
"""
from __future__ import annotations
from typing import cast, Any, Callable, Generic, TypeVar, Union
import functools
import inspect
from gaphor.misc.generic.registry import Registry, TypeAxis
__all__ = "multidispatch"
T = TypeVar("T", bound=Union[Callable[..., Any], type])
KeyType = Union[type, None]
def multidispatch(*argtypes: KeyType) -> Callable[[T], FunctionDispatcher[T]]:
""" Declare function as multidispatch
This decorator takes ``argtypes`` argument types and replace decorated
function with :class:`.FunctionDispatcher` object, which is responsible for
multiple dispatch feature.
"""
def _replace_with_dispatcher(func: T) -> FunctionDispatcher[T]:
nonlocal argtypes
argspec = inspect.getfullargspec(func)
if not argtypes:
arity = _arity(argspec)
if isinstance(func, type):
# It's a class we deal with:
arity -= 1
argtypes = (object,) * arity
dispatcher = cast(
FunctionDispatcher[T],
functools.update_wrapper(FunctionDispatcher(argspec, len(argtypes)), func),
)
dispatcher.register_rule(func, *argtypes)
return dispatcher
return _replace_with_dispatcher
class FunctionDispatcher(Generic[T]):
""" Multidispatcher for functions
This object dispatch calls to function by its argument types. Usually it is
produced by :func:`.multidispatch` decorator.
You should not manually create objects of this type.
"""
registry: Registry[T]
def __init__(self, argspec: inspect.FullArgSpec, params_arity: int) -> None:
""" Initialize dispatcher with ``argspec`` of type
:class:`inspect.ArgSpec` and ``params_arity`` that represent number
params."""
# Check if we have enough positional arguments for number of type params
if _arity(argspec) < params_arity:
raise TypeError(
"Not enough positional arguments "
"for number of type parameters provided."
)
self.argspec = argspec
self.params_arity = params_arity
axis = [(f"arg_{n:d}", TypeAxis()) for n in range(params_arity)]
self.registry = Registry(*axis)
def check_rule(self, rule: T, *argtypes: KeyType) -> None:
# Check if we have the right number of parametrized types
if len(argtypes) != self.params_arity:
raise TypeError(
f"Wrong number of type parameters: have {len(argtypes)}, expected {self.params_arity}."
)
# Check if we have the same argspec (by number of args)
rule_argspec = inspect.getfullargspec(rule)
left_spec = tuple(x and len(x) or 0 for x in rule_argspec[:4])
right_spec = tuple(x and len(x) or 0 for x in self.argspec[:4])
if left_spec != right_spec:
raise TypeError(
f"Rule does not conform to previous implementations: {left_spec} != {right_spec}."
)
def register_rule(self, rule: T, *argtypes: KeyType) -> None:
""" Register new ``rule`` for ``argtypes``."""
self.check_rule(rule, *argtypes)
self.registry.register(rule, *argtypes)
def register(self, *argtypes: KeyType) -> Callable[[T], T]:
""" Decorator for registering new case for multidispatch
New case will be registered for types identified by ``argtypes``. The
length of ``argtypes`` should be equal to the length of ``argtypes``
argument were passed corresponding :func:`.multidispatch` call, which
also indicated the number of arguments multidispatch dispatches on.
"""
def register_rule(func: T) -> T:
self.register_rule(func, *argtypes)
return func
return register_rule
def __call__(self, *args: Any, **kwargs: Any) -> Any:
""" Dispatch call to appropriate rule."""
trimmed_args = args[: self.params_arity]
rule = self.registry.lookup(*trimmed_args)
if not rule:
raise TypeError(f"No available rule found for {trimmed_args!r}")
return rule(*args, **kwargs)
def _arity(argspec: inspect.FullArgSpec) -> int:
""" Determinal positional arity of argspec."""
args = argspec.args if argspec.args else []
defaults = argspec.defaults if argspec.defaults else []
return len(args) - len(defaults)

View File

@ -1,164 +0,0 @@
""" Registry.
This implementation was borrowed from happy[1] project by Chris Rossi.
[1]: http://bitbucket.org/chrisrossi/happy
"""
from __future__ import annotations
__all__ = ("Registry", "SimpleAxis", "TypeAxis")
from typing import (
Any,
Dict,
Generic,
KeysView,
List,
Generator,
Optional,
Sequence,
Tuple,
TypeVar,
Union,
)
K = TypeVar("K")
S = TypeVar("S")
T = TypeVar("T")
V = TypeVar("V")
Axis = Union["SimpleAxis", "TypeAxis"]
class Registry(Generic[T]):
""" Registry implementation."""
def __init__(self, *axes: Tuple[str, Axis]):
self._tree: _TreeNode[T] = _TreeNode()
self._axes = [axis for name, axis in axes]
self._axes_dict = {name: (i, axis) for i, (name, axis) in enumerate(axes)}
def register(self, target: T, *arg_keys: K, **kw_keys: K) -> None:
tree_node = self._tree
for key in self._align_with_axes(arg_keys, kw_keys):
tree_node = tree_node.setdefault(key, _TreeNode[T]())
if not tree_node.target is None:
raise ValueError(
f"Registration for {target} conflicts with existing registration {tree_node.target}."
)
tree_node.target = target
def get_registration(self, *arg_keys: K, **kw_keys: K) -> Optional[T]:
tree_node = self._tree
for key in self._align_with_axes(arg_keys, kw_keys):
if not key in tree_node:
return None
tree_node = tree_node[key]
return tree_node.target
def lookup(self, *arg_objs: V, **kw_objs: V) -> Optional[T]:
return next(self.query(*arg_objs, **kw_objs), None)
def query(self, *arg_objs: V, **kw_objs: V) -> Generator[Optional[T], None, None]:
objs = self._align_with_axes(arg_objs, kw_objs)
axes = self._axes
return self._query(self._tree, objs, axes)
def _query(
self, tree_node: _TreeNode[T], objs: Sequence[Optional[V]], axes: Sequence[Axis]
) -> Generator[Optional[T], None, None]:
""" Recursively traverse registration tree, from left to right, most
specific to least specific, returning the first target found on a
matching node. """
if not objs:
yield tree_node.target
else:
obj = objs[0]
# Skip non-participating nodes
if obj is None:
next_node: Optional[_TreeNode[T]] = tree_node.get(None, None)
if next_node is not None:
yield from self._query(next_node, objs[1:], axes[1:])
else:
# Get matches on this axis and iterate from most to least specific
axis = axes[0]
for match_key in axis.matches(obj, tree_node.keys()):
yield from self._query(tree_node[match_key], objs[1:], axes[1:])
def _align_with_axes(
self, args: Sequence[S], kw: Dict[str, S]
) -> Sequence[Optional[S]]:
""" Create a list matching up all args and kwargs with their
corresponding axes, in order, using ``None`` as a placeholder for
skipped axes. """
axes_dict = self._axes_dict
aligned: List[Optional[S]] = [None for i in range(len(axes_dict))]
args_len = len(args)
if args_len + len(kw) > len(aligned):
raise ValueError("Cannot have more arguments than axes.")
for i, arg in enumerate(args):
aligned[i] = arg
for k, v in kw.items():
i_axis = axes_dict.get(k, None)
if i_axis is None:
raise ValueError(f"No axis with name: {k}")
i, axis = i_axis
if aligned[i] is not None:
raise ValueError(
"Axis defined twice between positional and " "keyword arguments"
)
aligned[i] = v
# Trim empty tail nodes for faster look ups
while aligned and aligned[-1] is None:
del aligned[-1]
return aligned
class _TreeNode(Generic[T], Dict[Any, Any]):
target: Optional[T] = None
def __str__(self) -> str:
return f"<TreeNode {self.target} {dict.__str__(self)}>"
class SimpleAxis:
""" A simple axis where the key into the axis is the same as the object to
be matched (aka the identity axis). This axis behaves just like a
dictionary. You might use this axis if you are interested in registering
something by name, where you're registering an object with the string that
is the name and then using the name to look it up again later.
Subclasses can override the ``get_keys`` method for implementing arbitrary
axes.
"""
def matches(
self, obj: object, keys: KeysView[Optional[object]]
) -> Generator[object, None, None]:
for key in [obj]:
if key in keys:
yield obj
class TypeAxis:
""" An axis which matches the class and super classes of an object in
method resolution order.
"""
def matches(
self, obj: object, keys: KeysView[Optional[type]]
) -> Generator[type, None, None]:
for key in type(obj).mro():
if key in keys:
yield key

View File

@ -1,181 +0,0 @@
""" Tests for :module:`gaphor.misc.generic.event`."""
from __future__ import annotations
from typing import Callable, List
from gaphor.misc.generic.event import Manager
def make_handler(effect: object) -> Callable[[Event], None]:
return lambda e: e.effects.append(effect)
def create_manager():
return Manager()
def test_subscribe_single_event():
events = create_manager()
events.subscribe(make_handler("handler1"), EventA)
e = EventA()
events.handle(e)
assert len(e.effects) == 1
assert "handler1" in e.effects
def test_subscribe_via_decorator():
events = create_manager()
events.subscriber(EventA)(make_handler("handler1"))
e = EventA()
events.handle(e)
assert len(e.effects) == 1
assert "handler1" in e.effects
def test_subscribe_event_inheritance():
events = create_manager()
events.subscribe(make_handler("handler1"), EventA)
events.subscribe(make_handler("handler2"), EventB)
ea = EventA()
events.handle(ea)
assert len(ea.effects) == 1
assert "handler1" in ea.effects
eb = EventB()
events.handle(eb)
assert len(eb.effects) == 2
assert "handler1" in eb.effects
assert "handler2" in eb.effects
def test_subscribe_event_multiple_inheritance():
events = create_manager()
events.subscribe(make_handler("handler1"), EventA)
events.subscribe(make_handler("handler2"), EventC)
events.subscribe(make_handler("handler3"), EventD)
ea = EventA()
events.handle(ea)
assert len(ea.effects) == 1
assert "handler1" in ea.effects
ec = EventC()
events.handle(ec)
assert len(ec.effects) == 1
assert "handler2" in ec.effects
ed = EventD()
events.handle(ed)
assert len(ed.effects) == 3
assert "handler1" in ed.effects
assert "handler2" in ed.effects
assert "handler3" in ed.effects
def test_subscribe_no_events():
events = create_manager()
ea = EventA()
events.handle(ea)
assert len(ea.effects) == 0
def test_subscribe_base_event():
events = create_manager()
events.subscribe(make_handler("handler1"), EventA)
ea = EventB()
events.handle(ea)
assert len(ea.effects) == 1
assert "handler1" in ea.effects
def test_subscribe_event_malformed_multiple_inheritance():
events = create_manager()
events.subscribe(make_handler("handler1"), EventA)
events.subscribe(make_handler("handler2"), EventD)
events.subscribe(make_handler("handler3"), EventE)
ea = EventA()
events.handle(ea)
assert len(ea.effects) == 1
assert "handler1" in ea.effects
ed = EventD()
events.handle(ed)
assert len(ed.effects) == 2
assert "handler1" in ed.effects
assert "handler2" in ed.effects
ee = EventE()
events.handle(ee)
assert len(ee.effects) == 3
assert "handler1" in ee.effects
assert "handler2" in ee.effects
assert "handler3" in ee.effects
def test_subscribe_event_with_no_subscribers_in_the_middle_of_mro():
events = create_manager()
events.subscribe(make_handler("handler1"), Event)
events.subscribe(make_handler("handler2"), EventB)
eb = EventB()
events.handle(eb)
assert len(eb.effects) == 2
assert "handler1" in eb.effects
assert "handler2" in eb.effects
def test_unsubscribe_single_event():
events = create_manager()
handler = make_handler("handler1")
events.subscribe(handler, EventA)
events.unsubscribe(handler, EventA)
e = EventA()
events.handle(e)
assert len(e.effects) == 0
def test_unsubscribe_event_inheritance():
events = create_manager()
handler1 = make_handler("handler1")
handler2 = make_handler("handler2")
events.subscribe(handler1, EventA)
events.subscribe(handler2, EventB)
events.unsubscribe(handler1, EventA)
ea = EventA()
events.handle(ea)
assert len(ea.effects) == 0
eb = EventB()
events.handle(eb)
assert len(eb.effects) == 1
assert "handler2" in eb.effects
class Event:
def __init__(self) -> None:
self.effects: List[object] = []
class EventA(Event):
pass
class EventB(EventA):
pass
class EventC(Event):
pass
class EventD(EventA, EventC):
pass
class EventE(EventD, EventA):
pass

View File

@ -1,224 +0,0 @@
""" Tests for :module:`gaphor.misc.generic.multidispatch`."""
import pytest
from inspect import FullArgSpec
from gaphor.misc.generic.multidispatch import multidispatch, FunctionDispatcher
def create_dispatcher(
params_arity, args=None, varargs=None, keywords=None, defaults=None
) -> FunctionDispatcher:
return FunctionDispatcher(
FullArgSpec(
args=args,
varargs=varargs,
varkw=keywords,
defaults=defaults,
kwonlyargs=[],
kwonlydefaults={},
annotations={},
),
params_arity,
)
def test_one_argument():
dispatcher = create_dispatcher(1, args=["x"])
dispatcher.register_rule(lambda x: x + 1, int)
assert dispatcher(1) == 2
with pytest.raises(TypeError):
dispatcher("s")
dispatcher.register_rule(lambda x: x + "1", str)
assert dispatcher(1) == 2
assert dispatcher("1") == "11"
with pytest.raises(TypeError):
dispatcher(tuple())
def test_two_arguments():
dispatcher = create_dispatcher(2, args=["x", "y"])
dispatcher.register_rule(lambda x, y: x + y + 1, int, int)
assert dispatcher(1, 2) == 4
with pytest.raises(TypeError):
dispatcher("s", "ss")
with pytest.raises(TypeError):
dispatcher(1, "ss")
with pytest.raises(TypeError):
dispatcher("s", 2)
dispatcher.register_rule(lambda x, y: x + y + "1", str, str)
assert dispatcher(1, 2) == 4
assert dispatcher("1", "2") == "121"
with pytest.raises(TypeError):
dispatcher("1", 1)
with pytest.raises(TypeError):
dispatcher(1, "1")
dispatcher.register_rule(lambda x, y: str(x) + y + "1", int, str)
assert dispatcher(1, 2) == 4
assert dispatcher("1", "2") == "121"
assert dispatcher(1, "2") == "121"
with pytest.raises(TypeError):
dispatcher("1", 1)
def test_bottom_rule():
dispatcher = create_dispatcher(1, args=["x"])
dispatcher.register_rule(lambda x: x, object)
assert dispatcher(1) == 1
assert dispatcher("1") == "1"
assert dispatcher([1]) == [1]
assert dispatcher((1,)) == (1,)
def test_subtype_evaluation():
class Super:
pass
class Sub(Super):
pass
dispatcher = create_dispatcher(1, args=["x"])
dispatcher.register_rule(lambda x: x, Super)
o_super = Super()
assert dispatcher(o_super) == o_super
o_sub = Sub()
assert dispatcher(o_sub) == o_sub
with pytest.raises(TypeError):
dispatcher(object())
dispatcher.register_rule(lambda x: (x, x), Sub)
o_super = Super()
assert dispatcher(o_super) == o_super
o_sub = Sub()
assert dispatcher(o_sub) == (o_sub, o_sub)
def test_register_rule_with_wrong_arity():
dispatcher = create_dispatcher(1, args=["x"])
dispatcher.register_rule(lambda x: x, int)
with pytest.raises(TypeError):
dispatcher.register_rule(lambda x, y: x, str)
def test_register_rule_with_different_arg_names():
dispatcher = create_dispatcher(1, args=["x"])
dispatcher.register_rule(lambda y: y, int)
assert dispatcher(1) == 1
def test_dispatching_with_varargs():
dispatcher = create_dispatcher(1, args=["x"], varargs="va")
dispatcher.register_rule(lambda x, *va: x, int)
assert dispatcher(1) == 1
with pytest.raises(TypeError):
dispatcher("1", 2, 3)
def test_dispatching_with_varkw():
dispatcher = create_dispatcher(1, args=["x"], keywords="vk")
dispatcher.register_rule(lambda x, **vk: x, int)
assert dispatcher(1) == 1
with pytest.raises(TypeError):
dispatcher("1", a=1, b=2)
def test_dispatching_with_kw():
dispatcher = create_dispatcher(1, args=["x", "y"], defaults=["vk"])
dispatcher.register_rule(lambda x, y=1: x, int)
assert dispatcher(1) == 1
with pytest.raises(TypeError):
dispatcher("1", k=1)
def test_create_dispatcher_with_pos_args_less_multi_arity():
with pytest.raises(TypeError):
create_dispatcher(2, args=["x"])
with pytest.raises(TypeError):
create_dispatcher(2, args=["x", "y"], defaults=["x"])
def test_register_rule_with_wrong_number_types_parameters():
dispatcher = create_dispatcher(1, args=["x", "y"])
with pytest.raises(TypeError):
dispatcher.register_rule(lambda x, y: x, int, str)
def test_register_rule_with_partial_dispatching():
dispatcher = create_dispatcher(1, args=["x", "y"])
dispatcher.register_rule(lambda x, y: x, int)
assert dispatcher(1, 2) == 1
assert dispatcher(1, "2") == 1
with pytest.raises(TypeError):
dispatcher("2", 1)
dispatcher.register_rule(lambda x, y: x, str)
assert dispatcher(1, 2) == 1
assert dispatcher(1, "2") == 1
assert dispatcher("1", "2") == "1"
assert dispatcher("1", 2) == "1"
def test_default_dispatcher():
@multidispatch(int, str)
def func(x, y):
return str(x) + y
assert func(1, "2") == "12"
with pytest.raises(TypeError):
func(1, 2)
with pytest.raises(TypeError):
func("1", 2)
with pytest.raises(TypeError):
func("1", "2")
def test_multiple_functions():
@multidispatch(int, str)
def func(x, y):
return str(x) + y
@func.register(str, str)
def _(x, y):
return x + y
assert func(1, "2") == "12"
assert func("1", "2") == "12"
with pytest.raises(TypeError):
func(1, 2)
with pytest.raises(TypeError):
func("1", 2)
def test_default():
@multidispatch()
def func(x, y):
return x + y
@func.register(str, str)
def _(x, y):
return y + x
assert func(1, 1) == 2
assert func("1", "2") == "21"
def test_on_classes():
@multidispatch()
class A:
def __init__(self, a, b):
self.v = a + b
@A.register(str, str)
class B:
def __init__(self, a, b):
self.v = b + a
assert A(1, 1).v == 2
assert A("1", "2").v == "21"

View File

@ -1,147 +0,0 @@
""" Tests for :module:`gaphor.misc.generic.registry`."""
import pytest
from typing import Union
from gaphor.misc.generic.registry import Registry, SimpleAxis, TypeAxis
class DummyA:
pass
class DummyB(DummyA):
pass
def test_one_axis_no_specificity():
registry: Registry[object] = Registry(("foo", SimpleAxis()))
a = object()
b = object()
registry.register(a)
registry.register(b, "foo")
assert registry.lookup() == a
assert registry.lookup("foo") == b
assert registry.lookup("bar") is None
def test_subtyping_on_axes():
registry: Registry[str] = Registry(("type", TypeAxis()))
target1 = "one"
registry.register(target1, object)
target2 = "two"
registry.register(target2, DummyA)
target3 = "three"
registry.register(target3, DummyB)
assert registry.lookup(object()) == target1
assert registry.lookup(DummyA()) == target2
assert registry.lookup(DummyB()) == target3
def test_query_subtyping_on_axes():
registry: Registry[str] = Registry(("type", TypeAxis()))
target1 = "one"
registry.register(target1, object)
target2 = "two"
registry.register(target2, DummyA)
target3 = "three"
registry.register(target3, DummyB)
target4 = "four"
registry.register(target4, int)
assert list(registry.query(object())) == [target1]
assert list(registry.query(DummyA())) == [target2, target1]
assert list(registry.query(DummyB())) == [target3, target2, target1]
assert list(registry.query(3)) == [target4, target1]
def test_two_axes():
registry: Registry[Union[str, object]] = Registry(
("type", TypeAxis()), ("name", SimpleAxis())
)
target1 = "one"
registry.register(target1, object)
target2 = "two"
registry.register(target2, DummyA)
target3 = "three"
registry.register(target3, DummyA, "foo")
context1 = object()
assert registry.lookup(context1) == target1
context2 = DummyB()
assert registry.lookup(context2) == target2
assert registry.lookup(context2, "foo") == target3
target4 = object()
registry.register(target4, DummyB)
assert registry.lookup(context2) == target4
assert registry.lookup(context2, "foo") == target3
def test_get_registration():
registry: Registry[str] = Registry(("type", TypeAxis()), ("name", SimpleAxis()))
registry.register("one", object)
registry.register("two", DummyA, "foo")
assert registry.get_registration(object) == "one"
assert registry.get_registration(DummyA, "foo") == "two"
assert registry.get_registration(object, "foo") is None
assert registry.get_registration(DummyA) is None
def test_register_too_many_keys():
registry: Registry[type] = Registry(("name", SimpleAxis()))
with pytest.raises(ValueError):
registry.register(object, "one", "two")
def test_lookup_too_many_keys():
registry: Registry[object] = Registry(("name", SimpleAxis()))
with pytest.raises(ValueError):
registry.register(registry.lookup("one", "two"))
def test_conflict_error():
registry: Registry[Union[object, type]] = Registry(("name", SimpleAxis()))
registry.register(object(), name="foo")
with pytest.raises(ValueError):
registry.register(object, "foo")
def test_skip_nodes():
registry: Registry[str] = Registry(
("one", SimpleAxis()), ("two", SimpleAxis()), ("three", SimpleAxis())
)
registry.register("foo", one=1, three=3)
assert registry.lookup(1, three=3) == "foo"
def test_miss():
registry: Registry[str] = Registry(
("one", SimpleAxis()), ("two", SimpleAxis()), ("three", SimpleAxis())
)
registry.register("foo", 1, 2)
assert registry.lookup(one=1, three=3) is None
def test_bad_lookup():
registry: Registry[int] = Registry(("name", SimpleAxis()), ("grade", SimpleAxis()))
with pytest.raises(ValueError):
registry.register(1, foo=1)
with pytest.raises(ValueError):
registry.lookup(foo=1)
with pytest.raises(ValueError):
registry.register(1, "foo", name="foo")

View File

@ -5,9 +5,9 @@ Event Manager.
from typing import Optional, Sequence, Type
from generic.event import Manager as _Manager
from generic.event import Handler, Event
from gaphor.abc import Service
from gaphor.misc.generic.event import Manager as _Manager
from gaphor.misc.generic.event import Handler, Event
def event_handler(*event_types):

14
poetry.lock generated
View File

@ -181,6 +181,14 @@ PyGObject = ">=3.20.0,<4.0.0"
future = ">=0.17.0,<0.18.0"
pycairo = ">=1.13.0,<2.0.0"
[[package]]
category = "main"
description = "Generic programming library for Python"
name = "generic"
optional = false
python-versions = ">=3.6,<4.0"
version = "1.0.0b1"
[[package]]
category = "dev"
description = "File identification library for Python"
@ -712,7 +720,7 @@ docs = ["sphinx", "jaraco.packaging (>=3.2)", "rst.linker (>=1.9)"]
testing = ["pathlib2", "contextlib2", "unittest2"]
[metadata]
content-hash = "8c396e55551077f59a19af99ce2d424bc8b377d88a1906c1d7ca3162830ee4a1"
content-hash = "a09f15ea3ab25d85b7a4babb6a0f51efa0edb607326b7476bdba13a4ce32ce1b"
python-versions = "^3.7"
[metadata.files]
@ -818,6 +826,10 @@ gaphas = [
{file = "gaphas-1.1.2-py2.py3-none-any.whl", hash = "sha256:11d248941b3bd9890323a61c438f1945839bb3cd91bffa3583d4179091cfc8bd"},
{file = "gaphas-1.1.2.tar.gz", hash = "sha256:f1254f64b709da5722878e1b43a435bb199d12e836f255d517b42fe94ac45b26"},
]
generic = [
{file = "generic-1.0.0b1-py3-none-any.whl", hash = "sha256:c7c8778e6ede6aeb7b2525626584faea98aa141b11ea891d03061cf79313146e"},
{file = "generic-1.0.0b1.tar.gz", hash = "sha256:f883a065a02dc64b9584e8f9d7738a39159e138e90ef84755213eb5b4a235f02"},
]
identify = [
{file = "identify-1.4.7-py2.py3-none-any.whl", hash = "sha256:4f1fe9a59df4e80fcb0213086fcf502bc1765a01ea4fe8be48da3b65afd2a017"},
{file = "identify-1.4.7.tar.gz", hash = "sha256:d8919589bd2a5f99c66302fec0ef9027b12ae150b0b0213999ad3f695fc7296e"},

View File

@ -38,6 +38,7 @@ PyGObject = "^3.30"
gaphas = "^1.0.0"
importlib_metadata = "^0.23"
typing_extensions = "^3.7.4"
generic = {version = "^1.0.0-beta.1", allows-prereleases = true}
[tool.poetry.dev-dependencies]
tomlkit = "^0.5"