Basic event system.

This commit is contained in:
Andrey Popp 2010-07-20 17:42:44 +04:00
parent cdae0d8805
commit 71447a813d
2 changed files with 216 additions and 0 deletions

72
generic/event.py Normal file
View File

@ -0,0 +1,72 @@
""" Event management system."""
from collections import namedtuple
from generic.registry import Registry
from generic.registry import TypeAxis
__all__ = ["Manager"]
class HandlerSet(namedtuple("HandlerSet", ["parents", "handlers"])):
""" Set of handlers for specific type of event."""
@property
def all_handlers(self):
seen = set()
for handler in self.handlers:
seen.add(handler)
yield handler
for parent in self.parents:
for handler in parent.all_handlers:
if not handler in seen:
seen.add(handler)
yield handler
class Manager(object):
""" Event manager."""
def __init__(self):
axes = (("event_type", TypeAxis()),)
self.registry = Registry(*axes)
def subscribe(self, handler, event_type):
""" Subscribe ``handler`` to specified ``event_type``."""
handler_set = self.registry.get_registration(event_type)
if not handler_set:
handler_set = self._register_handlers(event_type)
handler_set.handlers.add(handler)
def unsubscribe(self, handler, event_type):
""" Unsubscribe ``handler`` from ``event_type``."""
handler_set = self.registry.get_registration(event_type)
if handler_set and handler in handler_set.handlers:
handler_set.handlers.remove(handler)
def fire(self, event):
""" Fire event instance."""
handler_set = self.registry.lookup(event)
for handler in handler_set.all_handlers:
handler(event)
def _register_handlers(self, event_type):
""" Derive handlers by copying them for event's subclasses."""
handlers = set()
parents = event_type.__bases__
parent_handler_sets = []
for parent in parents:
parent_handlers = self.registry.get_registration(parent)
if parent_handlers is None:
parent_handlers = self._register_handlers(parent)
parent_handler_sets.append(parent_handlers)
handler_set = HandlerSet(parents=parent_handler_sets, handlers=handlers)
self.registry.register(handler_set, event_type)
return handler_set
_global_manager = Manager()
subscribe = _global_manager.subscribe
unsubscribe = _global_manager.unsubscribe
fire = _global_manager.fire

144
generic/tests/test_event.py Normal file
View File

@ -0,0 +1,144 @@
""" Tests for :module:`generic.event`."""
import unittest
__all__ = ["ManagerTests"]
class ManagerTests(unittest.TestCase):
def makeHandler(self, effect):
return lambda e: e.effects.append(effect)
def createManager(self):
from generic.event import Manager
return Manager()
def test_subscribe_single_event(self):
events = self.createManager()
events.subscribe(self.makeHandler("handler1"), EventA)
e = EventA()
events.fire(e)
self.assertEqual(len(e.effects), 1)
self.assertTrue("handler1" in e.effects)
def test_subscribe_event_inheritance(self):
events = self.createManager()
events.subscribe(self.makeHandler("handler1"), EventA)
events.subscribe(self.makeHandler("handler2"), EventB)
ea = EventA()
events.fire(ea)
self.assertEqual(len(ea.effects), 1)
self.assertTrue("handler1" in ea.effects)
eb = EventB()
events.fire(eb)
self.assertEqual(len(eb.effects), 2)
self.assertTrue("handler1" in eb.effects)
self.assertTrue("handler2" in eb.effects)
def test_subscribe_event_multiple_inheritance(self):
events = self.createManager()
events.subscribe(self.makeHandler("handler1"), EventA)
events.subscribe(self.makeHandler("handler2"), EventC)
events.subscribe(self.makeHandler("handler3"), EventD)
ea = EventA()
events.fire(ea)
self.assertEqual(len(ea.effects), 1)
self.assertTrue("handler1" in ea.effects)
ec = EventC()
events.fire(ec)
self.assertEqual(len(ec.effects), 1)
self.assertTrue("handler2" in ec.effects)
ed = EventD()
events.fire(ed)
self.assertEqual(len(ed.effects), 3)
self.assertTrue("handler1" in ed.effects)
self.assertTrue("handler2" in ed.effects)
self.assertTrue("handler3" in ed.effects)
def test_subscribe_event_malformed_multiple_inheritance(self):
events = self.createManager()
events.subscribe(self.makeHandler("handler1"), EventA)
events.subscribe(self.makeHandler("handler2"), EventD)
events.subscribe(self.makeHandler("handler3"), EventE)
ea = EventA()
events.fire(ea)
self.assertEqual(len(ea.effects), 1)
self.assertTrue("handler1" in ea.effects)
ed = EventD()
events.fire(ed)
self.assertEqual(len(ed.effects), 2)
self.assertTrue("handler1" in ed.effects)
self.assertTrue("handler2" in ed.effects)
ee = EventE()
events.fire(ee)
self.assertEqual(len(ee.effects), 3)
self.assertTrue("handler1" in ee.effects)
self.assertTrue("handler2" in ee.effects)
self.assertTrue("handler3" in ee.effects)
def test_subscribe_event_with_no_subscribers_in_the_middle_of_mro(self):
events = self.createManager()
events.subscribe(self.makeHandler("handler1"), Event)
events.subscribe(self.makeHandler("handler2"), EventB)
eb = EventB()
events.fire(eb)
self.assertEqual(len(eb.effects), 2)
self.assertTrue("handler1" in eb.effects)
self.assertTrue("handler2" in eb.effects)
def test_unsubscribe_single_event(self):
events = self.createManager()
handler = self.makeHandler("handler1")
events.subscribe(handler, EventA)
events.unsubscribe(handler, EventA)
e = EventA()
events.fire(e)
self.assertEqual(len(e.effects), 0)
def test_unsubscribe_event_inheritance(self):
events = self.createManager()
handler1 = self.makeHandler("handler1")
handler2 = self.makeHandler("handler2")
events.subscribe(handler1, EventA)
events.subscribe(handler2, EventB)
events.unsubscribe(handler1, EventA)
ea = EventA()
events.fire(ea)
self.assertEqual(len(ea.effects), 0)
eb = EventB()
events.fire(eb)
self.assertEqual(len(eb.effects), 1)
self.assertTrue("handler2" in eb.effects)
class Event(object):
def __init__(self):
self.effects = []
class EventA(Event):
pass
class EventB(EventA):
pass
class EventC(Event):
pass
class EventD(EventA, EventC):
pass
class EventE(EventD, EventA):
pass