Skip to content

Commit 89b7be1

Browse files
committed
Add signalling with event_received, event_pre_save. Fixes #10
1 parent 80c9949 commit 89b7be1

File tree

7 files changed

+109
-27
lines changed

7 files changed

+109
-27
lines changed

esser/events.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from esser.validators import EsserValidator
2-
from esser.constants import AGGREGATE_KEY_DELIMITER
32
from esser.exceptions import EventValidationException
3+
from esser.signals import event_pre_save
44

55

66
class BaseEvent(object):
@@ -31,9 +31,19 @@ def attach_entity(self, entity):
3131
setattr(self.validator, 'aggregate', entity)
3232

3333
def persist(self, attrs):
34+
event_version = self.get_version()
35+
# dispatch the signal before persisting
36+
event_pre_save.send(
37+
sender=self.__class__,
38+
aggregate_name=self.entity.aggregate_name,
39+
aggregate_id=self.entity.aggregate_id,
40+
event_name=self.event_name,
41+
version=event_version,
42+
payload=attrs,
43+
)
3444
return self.entity.repository.persist(
3545
aggregate_id=self.entity.aggregate_id,
36-
version=self.get_version(),
46+
version=event_version,
3747
event_type=self.event_name,
3848
attrs=attrs
3949
)

esser/handlers/__init__.py

Lines changed: 38 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import importlib
22
from collections import defaultdict
3+
from esser.signals import event_received
34
from esser.registry import registry
45

56

@@ -14,27 +15,40 @@ def get_aggregate(aggregate_name, aggregate_id):
1415
return aggregate
1516

1617

17-
def handle_event(event, context):
18-
event_name = event['EventName']
19-
aggregate_id = event.get('AggregateId', None)
20-
aggregate = get_aggregate(event['AggregateName'], aggregate_id)
21-
event_class_attr = None
22-
for event_key, cls in aggregate.__class__.__dict__.items():
23-
if cls.__class__.__name__ == event_name:
24-
event_class_attr = event_key
25-
aggregate_event = getattr(aggregate, event_class_attr, None)
26-
if aggregate_event:
27-
return aggregate_event.save(attrs=event['Payload'])
28-
return
29-
30-
31-
def handle_stream(event, context):
32-
aggregates = defaultdict(dict)
33-
for record in event['Records']:
34-
keys = record['dynamodb']['Keys']
35-
aggregate_name = keys['aggregate_name']['S']
36-
aggregate_key = keys['aggregate_key']['S']
37-
aggregate_id = aggregate_key.split(':')[0]
38-
aggregate = get_aggregate(aggregate_name, aggregate_id)
39-
aggregates[aggregate_name][aggregate_id] = aggregate.current_state
40-
return aggregates
18+
class LambdaHandler(object):
19+
20+
def handle_event(self, event, context):
21+
event_name = event['EventName']
22+
aggregate_id = event.get('AggregateId', None)
23+
aggregate = get_aggregate(event['AggregateName'], aggregate_id)
24+
event_received.send(
25+
sender=self.__class__,
26+
aggregate_name=event['AggregateName'],
27+
aggregate_id=aggregate_id,
28+
payload=event['Payload']
29+
)
30+
event_class_attr = None
31+
for event_key, cls in aggregate.__class__.__dict__.items():
32+
if cls.__class__.__name__ == event_name:
33+
event_class_attr = event_key
34+
aggregate_event = getattr(aggregate, event_class_attr, None)
35+
if aggregate_event:
36+
return aggregate_event.save(attrs=event['Payload'])
37+
return
38+
39+
def handle_stream(self, event, context):
40+
aggregates = defaultdict(dict)
41+
for record in event['Records']:
42+
keys = record['dynamodb']['Keys']
43+
aggregate_name = keys['aggregate_name']['S']
44+
aggregate_key = keys['aggregate_key']['S']
45+
aggregate_id = aggregate_key.split(':')[0]
46+
aggregate = get_aggregate(aggregate_name, aggregate_id)
47+
aggregates[aggregate_name][aggregate_id] = aggregate.current_state
48+
return aggregates
49+
50+
51+
handler = LambdaHandler()
52+
53+
handle_event = handler.handle_event
54+
handle_stream = handler.handle_stream

esser/signals/__init__.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import dispatch
2+
3+
event_received = dispatch.Signal(
4+
providing_args=['aggregate_name', 'aggregate_id', 'payload']
5+
)
6+
7+
event_pre_save = dispatch.Signal(
8+
providing_args=[
9+
'aggregate_name', 'aggregate_id', 'payload',
10+
'event_name', 'version',
11+
]
12+
)
13+
14+
event_post_save = dispatch.Signal(
15+
providing_args=[
16+
'aggregate_name', 'aggregate_id', 'payload',
17+
'event_name', 'version',
18+
]
19+
)

esser/signals/decorators.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
2+
3+
def receiver(signal, **kwargs):
4+
"""
5+
A decorator for connecting receivers to signals. Used by passing in the
6+
signal (or list of signals) and keyword arguments to connect::
7+
@receiver(post_save, sender=Sender)
8+
def signal_receiver(sender, **kwargs):
9+
...
10+
@receiver([post_save, post_delete], sender=Sender)
11+
def signals_receiver(sender, **kwargs):
12+
...
13+
"""
14+
def _decorator(func):
15+
if isinstance(signal, (list, tuple)):
16+
for s in signal:
17+
s.connect(func, **kwargs)
18+
else:
19+
signal.connect(func, **kwargs)
20+
return func
21+
return _decorator

examples/items/aggregate.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
from esser.reducer import BaseReducer
33
from esser.registry import register
44
from items import events
5+
from items import receivers
56

67

78
class ItemReducer(BaseReducer):

examples/items/receivers.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from esser.signals.decorators import receiver
2+
from esser.signals import event_pre_save, event_received
3+
from esser.handlers import LambdaHandler
4+
from items.events import PriceUpdated
5+
6+
7+
@receiver(event_pre_save, sender=PriceUpdated)
8+
def check_price_update(sender, **kwargs):
9+
print sender
10+
print kwargs
11+
12+
13+
@receiver(event_received, sender=LambdaHandler)
14+
def do_something(sender, **kwargs):
15+
print sender
16+
print kwargs

requirements/base.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
pynamodb
2-
cerberus
2+
cerberus
3+
dispatcher

0 commit comments

Comments
 (0)