A library for event sourcing in Python.
Use pip to install the stable distribution from the Python Package Index.
$ pip install eventsourcing
You can use the library's Aggregate class to define event sourced aggregates.
The example below uses the library's declarative syntax to define an event sourced aggregate called World.
The World class uses the Aggregate class and the event decorator from the eventsourcing.domain module.
from eventsourcing.domain import Aggregate, event


class World(Aggregate):
    def __init__(self):
        self.history = []

    @event("SomethingHappened")
    def make_it_so(self, what):
        self.history.append(what)
The World aggregate has a command method make_it_so() which triggers an event called SomethingHappened that appends the given value of what to the history attribute of an instance of World.
It is generally recommended to use an imperative style when naming command methods, and to name event classes using past participles.
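To make the idea of a command method that "triggers an event" more concrete, here is a minimal standard-library sketch. It is not the library's implementation: the names recording_event, RecordedEvent, SketchWorld and pending_events are hypothetical, and the sketch assumes the command is called with keyword arguments only.

```python
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable


@dataclass
class RecordedEvent:
    # Hypothetical event record: the event name plus the command's arguments.
    name: str
    kwargs: dict


def recording_event(name: str) -> Callable:
    """Hypothetical decorator: record an event, then run the method body."""

    def decorator(method: Callable) -> Callable:
        @wraps(method)
        def wrapper(self, **kwargs: Any) -> None:
            # Remember what happened, so it could be stored later.
            self.pending_events.append(RecordedEvent(name, kwargs))
            # Apply the change to the aggregate's state.
            method(self, **kwargs)

        return wrapper

    return decorator


class SketchWorld:
    def __init__(self) -> None:
        self.history: list = []
        self.pending_events: list = []

    @recording_event("SomethingHappened")
    def make_it_so(self, what: str) -> None:
        self.history.append(what)


world = SketchWorld()
world.make_it_so(what="dinosaurs")
assert world.history == ["dinosaurs"]
assert world.pending_events[0].name == "SomethingHappened"
```

The imperative command name (make_it_so) and the past-participle event name (SomethingHappened) follow the naming recommendation above.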
You can use the library's Application class to define event sourced applications. The Application class brings together a domain model and persistence infrastructure.
The Worlds application in the example below uses the library's Application class and the World aggregate defined above.
from eventsourcing.application import Application


class Worlds(Application):
    def create_world(self):
        world = World()
        self.save(world)
        return world.id

    def make_it_so(self, world_id, what):
        world = self.repository.get(world_id)
        world.make_it_so(what)
        self.save(world)

    def get_history(self, world_id):
        world = self.repository.get(world_id)
        return world.history
The create_world() factory method creates and saves a new World aggregate instance, and returns the UUID of the new instance.
The make_it_so() command method uses the given world_id to retrieve a World aggregate instance from the application's repository (which reconstructs the aggregate from its stored events), then calls the aggregate's make_it_so() method, and then saves the aggregate (which collects and stores the new aggregate events).
The get_history() query method uses the given world_id to retrieve a World aggregate from the application's repository and then returns the aggregate's history.
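The save and retrieve cycle described above can be pictured with a small in-memory sketch. This is only an illustration under simplifying assumptions, not the library's code: SketchWorld, SketchWorlds, the pending list and the stored dictionary are hypothetical names, and events are reduced to (name, value) tuples.

```python
from uuid import UUID, uuid4


class SketchWorld:
    def __init__(self, world_id: UUID) -> None:
        self.id = world_id
        self.history: list = []
        # New events that have not been stored yet.
        self.pending: list = [("Created", None)]

    def make_it_so(self, what: str) -> None:
        self.history.append(what)
        self.pending.append(("SomethingHappened", what))


class SketchWorlds:
    def __init__(self) -> None:
        self.stored: dict = {}  # aggregate ID -> list of stored events

    def save(self, world: SketchWorld) -> None:
        # Collect the new events and append them to the stored sequence.
        self.stored.setdefault(world.id, []).extend(world.pending)
        world.pending = []

    def get(self, world_id: UUID) -> SketchWorld:
        # Reconstruct the aggregate by replaying its stored events.
        events = self.stored[world_id]  # raises KeyError if unknown
        world = SketchWorld(world_id)
        world.pending = []  # nothing new has happened yet
        for name, what in events:
            if name == "SomethingHappened":
                world.history.append(what)
        return world

    def create_world(self) -> UUID:
        world = SketchWorld(uuid4())
        self.save(world)
        return world.id

    def make_it_so(self, world_id: UUID, what: str) -> None:
        world = self.get(world_id)
        world.make_it_so(what)
        self.save(world)

    def get_history(self, world_id: UUID) -> list:
        return self.get(world_id).history


app = SketchWorlds()
world_id = app.create_world()
app.make_it_so(world_id, "dinosaurs")
assert app.get_history(world_id) == ["dinosaurs"]
```

Note that the aggregate's current state is never stored directly in this sketch; only the sequence of events is kept, and the state is derived from it on each retrieval.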
Domain models and applications — base classes for domain model aggregates and applications. Suggests how to structure an event-sourced application.
Flexible event store — flexible persistence of domain events. Combines an event mapper and an event recorder in ways that can be easily extended. Mapper uses a transcoder that can be easily extended to support custom model object types. Recorders supporting different databases can be easily substituted and configured with environment variables.
Application-level encryption and compression — encrypts and decrypts events inside the application. This means data will be encrypted in transit across a network ("on the wire") and at disk level including backups ("at rest"), which is a legal requirement in some jurisdictions when dealing with personally identifiable information (PII) for example the EU's GDPR. Compression reduces the size of stored domain events and snapshots, usually by around 25% to 50% of the original size. Compression reduces the size of data in the database and decreases transit time across a network.
Snapshotting — reduces access-time for aggregates with many domain events.
Versioning - allows domain model changes to be introduced after an application has been deployed. Both domain events and aggregate classes can be versioned. The recorded state of an older version can be upcast to be compatible with a new version. Stored events and snapshots are upcast from older versions to new versions before the event or aggregate object is reconstructed.
Optimistic concurrency control — ensures a distributed or horizontally scaled application doesn't become inconsistent due to concurrent method execution. Leverages optimistic concurrency controls in adapted database management systems.
Notifications and projections — reliable propagation of application events with pull-based notifications allows the application state to be projected accurately into replicas, indexes, view models, and other applications. Supports materialized views and CQRS.
Event-driven systems — reliable event processing. Event-driven systems can be defined independently of particular persistence infrastructure and mode of running.
Detailed documentation — documentation provides general overview, introduction of concepts, explanation of usage, and detailed descriptions of library classes.
Worked examples — includes examples showing how to develop aggregates, applications and systems.
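The versioning feature listed above, where stored state from an older version is upcast before an object is reconstructed, can be sketched roughly as follows. Everything here is an assumption made for illustration: the class_version key, the UPCASTERS table and the two schema changes are hypothetical and do not describe the library's actual mechanism.

```python
CURRENT_VERSION = 3


def upcast_v1_to_v2(state: dict) -> dict:
    # Hypothetical change: version 2 added a 'when' field with a default.
    state = dict(state)
    state["when"] = None
    return state


def upcast_v2_to_v3(state: dict) -> dict:
    # Hypothetical change: version 3 renamed 'what' to 'description'.
    state = dict(state)
    state["description"] = state.pop("what")
    return state


# Maps a stored version to the function that lifts it to the next version.
UPCASTERS = {1: upcast_v1_to_v2, 2: upcast_v2_to_v3}


def upcast(state: dict) -> dict:
    """Apply each upcaster in turn until the state is at the current version."""
    version = state.get("class_version", 1)
    while version < CURRENT_VERSION:
        state = UPCASTERS[version](state)
        version += 1
    state = dict(state)
    state["class_version"] = version
    return state


old_state = {"what": "dinosaurs"}  # stored before versioning was introduced
new_state = upcast(old_state)
assert new_state["description"] == "dinosaurs"
assert new_state["class_version"] == 3
```

Chaining one small function per version step means old records never need to be rewritten in the database; they are converted each time they are read.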
When the World aggregate class is called, a "created" event object is constructed and used to construct an instance of World.
When the command method make_it_so() is called on an instance of World, a SomethingHappened event is triggered with a value of what, which results in the what value being appended to the history of the World aggregate instance.
# Create a new aggregate.
world = World()
# Execute aggregate commands.
world.make_it_so('dinosaurs')
world.make_it_so('trucks')
world.make_it_so('internet')
# View aggregate state.
assert world.history[0] == 'dinosaurs'
assert world.history[1] == 'trucks'
assert world.history[2] == 'internet'
The resulting events can be collected using the collect_events() method. The collect_events() method is used by the Application save() method (see below).
# Collect events.
events = world.collect_events()
assert len(events) == 4
assert type(events[0]).__name__ == 'Created'
assert type(events[1]).__name__ == 'SomethingHappened'
assert type(events[2]).__name__ == 'SomethingHappened'
assert type(events[3]).__name__ == 'SomethingHappened'
The aggregate events can be used to reconstruct the state of the aggregate.
copy = None
for event in events:
    copy = event.mutate(copy)
assert copy.history == world.history
assert copy.id == world.id
assert copy.version == world.version
assert copy.created_on == world.created_on
assert copy.modified_on == world.modified_on
This example can be adjusted and extended for any event sourced domain model.
Let's define a test that uses the Worlds application. Below, we will run this test with different persistence infrastructure.
from eventsourcing.persistence import RecordConflictError
from eventsourcing.system import NotificationLogReader


def test(app: Worlds, expect_visible_in_db: bool):
    # Check app has zero event notifications.
    assert len(app.log['1,10'].items) == 0, len(app.log['1,10'].items)

    # Create a new aggregate.
    world_id = app.create_world()

    # Execute application commands.
    app.make_it_so(world_id, 'dinosaurs')
    app.make_it_so(world_id, 'trucks')

    # Check recorded state of the aggregate.
    assert app.get_history(world_id) == [
        'dinosaurs',
        'trucks'
    ]

    # Execute another command.
    app.make_it_so(world_id, 'internet')

    # Check recorded state of the aggregate.
    assert app.get_history(world_id) == [
        'dinosaurs',
        'trucks',
        'internet'
    ]

    # Check values are (or aren't) visible in the database.
    values = [b'dinosaurs', b'trucks', b'internet']
    if expect_visible_in_db:
        expected_num_visible = len(values)
    else:
        expected_num_visible = 0

    actual_num_visible = 0
    reader = NotificationLogReader(app.log)
    for notification in reader.read(start=1):
        for what in values:
            if what in notification.state:
                actual_num_visible += 1
                break
    assert expected_num_visible == actual_num_visible

    # Get historical state (at version from above).
    old = app.repository.get(world_id, version=3)
    assert old.history[-1] == 'trucks'  # internet not happened
    assert len(old.history) == 2

    # Check app has four event notifications.
    assert len(app.log['1,10'].items) == 4, len(app.log['1,10'].items)

    # Optimistic concurrency control (no branches).
    old.make_it_so('future')
    try:
        app.save(old)
    except RecordConflictError:
        pass
    else:
        raise Exception("Shouldn't get here")

    # Check app still has only four event notifications.
    assert len(app.log['1,10'].items) == 4, len(app.log['1,10'].items)

    # Read event notifications.
    reader = NotificationLogReader(app.log)
    notifications = list(reader.read(start=1))
    assert len(notifications) == 4

    # Create eight more aggregate events.
    world_id = app.create_world()
    app.make_it_so(world_id, 'plants')
    app.make_it_so(world_id, 'fish')
    app.make_it_so(world_id, 'mammals')

    world_id = app.create_world()
    app.make_it_so(world_id, 'morning')
    app.make_it_so(world_id, 'afternoon')
    app.make_it_so(world_id, 'evening')

    # Get the new event notifications from the reader.
    last_id = notifications[-1].id
    notifications = list(reader.read(start=last_id + 1))
    assert len(notifications) == 8

    # Get all the event notifications from the reader.
    last_id = notifications[-1].id
    notifications = list(reader.read(start=1))
    assert len(notifications) == 12
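The optimistic concurrency step in the test above, where saving a stale copy of the aggregate raises RecordConflictError, can be understood through the following sketch. It assumes a recorder that enforces uniqueness of (aggregate ID, version) pairs, much as a unique database constraint would. SketchRecorder and SketchConflictError are hypothetical names and this is not the library's recorder.

```python
class SketchConflictError(Exception):
    """Raised when an event's position in a sequence is already taken."""


class SketchRecorder:
    def __init__(self) -> None:
        # (aggregate_id, version) -> stored state
        self._records: dict = {}

    def insert(self, events: list) -> None:
        """Store (aggregate_id, version, state) tuples, all or nothing."""
        keys = [(aggregate_id, version) for aggregate_id, version, _ in events]
        # Check every position first, so that a conflict stores nothing.
        if len(set(keys)) != len(keys):
            raise SketchConflictError("duplicate position in batch")
        for key in keys:
            if key in self._records:
                raise SketchConflictError(f"position already taken: {key}")
        for key, (_, _, state) in zip(keys, events):
            self._records[key] = state

    def __len__(self) -> int:
        return len(self._records)


recorder = SketchRecorder()
recorder.insert([("world-1", 1, b"created"), ("world-1", 2, b"dinosaurs")])

# A writer holding a stale copy (at version 1) tries to write version 2 again.
try:
    recorder.insert([("world-1", 2, b"future")])
except SketchConflictError:
    pass
else:
    raise Exception("Shouldn't get here")

assert len(recorder) == 2
```

The approach is "optimistic" because no lock is held while the command runs; a conflict is only detected at the moment of writing, and the losing writer can reload the aggregate and retry.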
We can run the code in the default "development" environment using the default "Plain Old Python Object" infrastructure. The example below runs with no compression or encryption of the stored events.
# Construct an application object.
app = Worlds()
# Run the test.
test(app, expect_visible_in_db=True)
Configure "production" environment using SQLite infrastructure. The example below uses zlib and AES to compress and encrypt the stored events. An in-memory SQLite database is used, but it would work the same way if a file path were set as the SQLITE_DBNAME.
import os
from eventsourcing.cipher import AESCipher
# Generate a cipher key (keep this safe).
cipher_key = AESCipher.create_key(num_bytes=32)
# Cipher key.
os.environ['CIPHER_KEY'] = cipher_key
# Cipher topic.
os.environ['CIPHER_TOPIC'] = 'eventsourcing.cipher:AESCipher'
# Compressor topic.
os.environ['COMPRESSOR_TOPIC'] = 'eventsourcing.compressor:ZlibCompressor'
# Use SQLite infrastructure.
os.environ['INFRASTRUCTURE_FACTORY'] = 'eventsourcing.sqlite:Factory'
os.environ['SQLITE_DBNAME'] = ':memory:' # Or path to a file on disk.
Run the code in "production" SQLite environment.
# Construct an application object.
app = Worlds()
# Run the test.
test(app, expect_visible_in_db=False)
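The configuration values above that end in _TOPIC or _FACTORY use a "module:attribute" string to name a class. As a rough illustration of how such a string could be turned into a Python object, here is a standard-library sketch. The function resolve_topic is hypothetical and is not taken from the library; it is demonstrated on the standard zlib module rather than on the library's own classes.

```python
import importlib
from typing import Any


def resolve_topic(topic: str) -> Any:
    """Hypothetical helper: import 'package.module:Attr.path' and return it."""
    module_name, _, attr_path = topic.partition(":")
    obj = importlib.import_module(module_name)
    if attr_path:
        # Walk down dotted attribute names, e.g. 'Outer.Inner'.
        for name in attr_path.split("."):
            obj = getattr(obj, name)
    return obj


# Resolve two functions from the standard library by their topic strings.
compress = resolve_topic("zlib:compress")
decompress = resolve_topic("zlib:decompress")

data = b"dinosaurs" * 10
assert decompress(compress(data)) == data
```

Naming classes by string is what allows components such as the cipher, compressor and infrastructure factory to be swapped through environment variables without changing application code.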
Configure "production" environment using PostgreSQL infrastructure. The example below uses zlib and AES to compress and encrypt the stored events. It is assumed the database and database user have already been created, and the database server is running locally.
import os
from eventsourcing.cipher import AESCipher
# Generate a cipher key (keep this safe).
cipher_key = AESCipher.create_key(num_bytes=32)
# Cipher key.
os.environ['CIPHER_KEY'] = cipher_key
# Cipher topic.
os.environ['CIPHER_TOPIC'] = 'eventsourcing.cipher:AESCipher'
# Compressor topic.
os.environ['COMPRESSOR_TOPIC'] = 'eventsourcing.compressor:ZlibCompressor'
# Use Postgres infrastructure.
os.environ['INFRASTRUCTURE_FACTORY'] = 'eventsourcing.postgres:Factory'
os.environ['POSTGRES_DBNAME'] = 'eventsourcing'
os.environ['POSTGRES_HOST'] = '127.0.0.1'
os.environ['POSTGRES_USER'] = 'eventsourcing'
os.environ['POSTGRES_PASSWORD'] = 'eventsourcing'
Run the code in "production" PostgreSQL environment.
# Construct an application object.
app = Worlds()
# Run the test.
test(app, expect_visible_in_db=False)
This project is hosted on GitHub.
Please register questions, requests and issues on GitHub, or post in the project's Slack channel, which you are welcome to join.
Please refer to the documentation for installation and usage guides.