geeknam/esser

Latest commit: 1669ef8 · Apr 16, 2017 (49 commits)

esser - [E]vent [S]ourcing [Ser]verlessly

  • Serverless + Pay-As-You-Go
  • Aggregates
  • Snapshots
  • Projections

Architectural Design

[Esser Diagram]

Features

  • Command validation
  • Datastore-agnostic read layer
  • Push-based messaging via DynamoDB Streams
  • Built-in snapshotting
  • Publish / subscribe style signalling
  • Generated CloudFormation templates (Infrastructure as Code)
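
As a rough illustration of the snapshotting idea, the sketch below rebuilds an aggregate from a snapshot plus only the events recorded after it. It is a minimal sketch, not esser's implementation: the dict-based event and snapshot shapes and the `apply_event` and `load` helpers are assumptions made for this example.

```python
def apply_event(state, event):
    # Hypothetical fold step: merge the event's data into a copy of the state.
    new_state = dict(state)
    new_state.update(event['data'])
    return new_state


def load(events, snapshot=None):
    """Rebuild state, replaying only events newer than the snapshot."""
    if snapshot is None:
        state, version = {}, 0
    else:
        state, version = dict(snapshot['state']), snapshot['version']
    for event in events:
        if event['version'] > version:
            state = apply_event(state, event)
            version = event['version']
    return state, version


# Hypothetical event stream and a snapshot taken at version 2.
events = [
    {'version': 1, 'data': {'name': 'Book', 'price': 10.0}},
    {'version': 2, 'data': {'price': 12.5}},
    {'version': 3, 'data': {'price': 15.0}},
]
snapshot = {'version': 2, 'state': {'name': 'Book', 'price': 12.5}}

full_replay = load(events)
from_snapshot = load(events, snapshot)
```

Both calls end in the same state; the snapshot only shortens the replay.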

Components

  • Runtime: AWS Lambda (Python)
  • Append Only Event Store: DynamoDB
  • Event Source Triggers: DynamoDB Streams
  • Read / Query Store: PostgreSQL / Elasticsearch (via contrib)

Example Usage

Add your first entity

items/aggregate.py

from esser.entities import Entity
from esser.registry import register
from items import commands
from items import receivers
from items.event_handler import ItemEventHandler


@register
class Item(Entity):

    # Event handler that folds the event stream into the aggregate state
    event_handler = ItemEventHandler()

    # Commands that can be issued against this aggregate
    created = commands.CreateItem()
    price_updated = commands.UpdatePrice()

Add commands that can be issued

items/commands.py

from esser.commands import BaseCommand, CreateCommand


class CreateItem(CreateCommand):

    event_name = 'ItemCreated'
    schema = {
        'name': {'type': 'string'},
        'price': {'type': 'float'}
    }


class UpdatePrice(BaseCommand):

    event_name = 'PriceUpdated'
    schema = {
        'price': {'type': 'float', 'diff': True}
    }
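
To show what checking a payload against such a schema might look like, here is a minimal standalone sketch. It does not use esser or any validation library. The `TYPES` mapping and the `validate` helper are hypothetical, and the `diff` rule is left out because its behaviour is not described here.

```python
# Assumed mapping from schema type names to Python types (sketch only).
TYPES = {'string': str, 'float': float}


def validate(schema, payload):
    """Return a list of error strings; an empty list means the payload passed."""
    errors = []
    for field, value in payload.items():
        rules = schema.get(field)
        if rules is None:
            errors.append('%s: unknown field' % field)
        elif not isinstance(value, TYPES[rules['type']]):
            errors.append('%s: must be of %s type' % (field, rules['type']))
    return errors


create_item_schema = {
    'name': {'type': 'string'},
    'price': {'type': 'float'},
}

ok = validate(create_item_schema, {'name': 'Book', 'price': 9.99})
bad = validate(create_item_schema, {'price': 'cheap'})
```

Here `ok` is an empty list, while `bad` contains one error for the `price` field.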

Add an event handler to fold the event stream

items/event_handler.py

from esser.event_handler import BaseEventHandler


class ItemEventHandler(BaseEventHandler):

    def on_item_created(self, aggregate, next_event):
        return self.on_created(aggregate, next_event)

    def on_price_updated(self, aggregate, next_event):
        aggregate['price'] = next_event.event_data['price']
        return aggregate
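
The folding idea can be sketched without esser itself. The example below assumes that an event named `PriceUpdated` is routed to a method named `on_price_updated`, which matches the naming in the handler above but is not confirmed as the library's actual dispatch logic. `Event`, `handler_name`, `SketchItemHandler`, and `fold` are all hypothetical names for this sketch.

```python
import re
from collections import namedtuple

# Hypothetical stand-in for a stored event; esser's real event objects may differ.
Event = namedtuple('Event', ['event_name', 'event_data'])


def handler_name(event_name):
    # 'PriceUpdated' -> 'on_price_updated'
    snake = re.sub(r'(?<!^)(?=[A-Z])', '_', event_name).lower()
    return 'on_' + snake


class SketchItemHandler(object):

    def on_item_created(self, aggregate, next_event):
        # The creation event seeds the aggregate state.
        return dict(next_event.event_data)

    def on_price_updated(self, aggregate, next_event):
        aggregate['price'] = next_event.event_data['price']
        return aggregate


def fold(handler, events, initial=None):
    # Apply each event in order, threading the aggregate through the handlers.
    aggregate = dict(initial or {})
    for event in events:
        method = getattr(handler, handler_name(event.event_name))
        aggregate = method(aggregate, event)
    return aggregate


events = [
    Event('ItemCreated', {'name': 'Book', 'price': 10.0}),
    Event('PriceUpdated', {'price': 12.5}),
]
state = fold(SketchItemHandler(), events)
```

After the fold, `state` holds the item's name from the creation event and the price from the latest update.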

Subscribe to events

items/receivers.py

from esser.signals.decorators import receiver
from esser.signals import event_pre_save, event_received, event_post_save
from esser.handlers import LambdaHandler
from items.commands import UpdatePrice


@receiver(event_pre_save, sender=UpdatePrice)
def presave_price_updated(sender, **kwargs):
    # Do something before saving the event
    pass


@receiver(event_received, sender=LambdaHandler)
def received_command(sender, **kwargs):
    # Do something when the command is received
    pass


@receiver(event_post_save)
def handle_event_saved(sender, **kwargs):
    # Do something after the event has been saved
    pass
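
For readers unfamiliar with this style of signalling, the following standalone sketch shows one way a `receiver` decorator with an optional `sender` filter could work. It is an assumption-laden toy, not esser's signal implementation: the `Signal` class, its `connect` and `send` methods, and the placeholder command classes are invented for the example.

```python
class Signal(object):
    """Toy publish / subscribe signal (hypothetical, for illustration only)."""

    def __init__(self):
        self._receivers = []

    def connect(self, func, sender=None):
        self._receivers.append((func, sender))

    def send(self, sender, **kwargs):
        # Call receivers registered for this sender, or for any sender (None).
        return [
            func(sender, **kwargs)
            for func, wanted in self._receivers
            if wanted is None or wanted is sender
        ]


def receiver(signal, sender=None):
    def decorator(func):
        signal.connect(func, sender=sender)
        return func
    return decorator


event_post_save = Signal()
calls = []


class UpdatePrice(object):
    pass


class CreateItem(object):
    pass


@receiver(event_post_save, sender=UpdatePrice)
def on_price_saved(sender, **kwargs):
    calls.append(('price', kwargs['event_name']))


@receiver(event_post_save)
def on_any_saved(sender, **kwargs):
    calls.append(('any', kwargs['event_name']))


event_post_save.send(UpdatePrice, event_name='PriceUpdated')
event_post_save.send(CreateItem, event_name='ItemCreated')
```

The receiver bound to `UpdatePrice` fires only for that sender, while the unfiltered receiver fires for both sends.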