ES/CQRS's Anatomy

Mix.install([
  {:kino_db, "~> 0.2.13"},
  {:postgrex, "~> 0.19.1"}
])

Introduction

What's Event Sourcing?

Event sourcing is like a historical timeline for your data. Instead of storing a snapshot of the current state, event sourcing keeps a record of every change that has happened to your data. This allows you to reconstruct the data's past state and track its evolution over time.

Think of it as a medical record for your data. Just as a doctor can review a patient's medical history to diagnose an illness, you can use event sourcing to understand how your data has changed and identify any anomalies.

This approach is especially useful for applications that need strong audit trails, complex state tracking, or flexible scalability. However, it also comes with higher storage requirements and computational complexity.
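To make the idea concrete, here is a minimal sketch of rebuilding state by replaying events. The module name, the event shapes (plain atoms and tuples) and the state map are assumptions made up for illustration; they are not part of the application used later in this notebook.

```elixir
defmodule TimelineSketch do
  # Hypothetical initial state, before any event has happened.
  def initial, do: %{status: :none, items: []}

  # Each clause folds one event into the current state.
  def apply_event(state, :order_created), do: %{state | status: :created}
  def apply_event(state, {:item_added, item}), do: %{state | items: state.items ++ [item]}
  def apply_event(state, :order_paid), do: %{state | status: :paid}
  def apply_event(state, :order_shipped), do: %{state | status: :shipped}

  # Replaying the recorded history rebuilds the state.
  def replay(events), do: Enum.reduce(events, initial(), &apply_event(&2, &1))
end

history = [
  :order_created,
  {:item_added, "item-1"},
  {:item_added, "item-2"},
  :order_paid,
  :order_shipped
]

# Current state: replay everything.
IO.inspect(TimelineSketch.replay(history))

# Past state: replay only a prefix of the history.
IO.inspect(TimelineSketch.replay(Enum.take(history, 3)))
```

Replaying a prefix of the history is what gives you the state "as it was" at an earlier point in time.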

graph LR;
  A(Order Created) -->B(Item 1 Added) -->C(Item 2 Added) --> D(Order Paid) --> E(Order Shipped);

What's CQRS?

CQRS (Command Query Responsibility Segregation) is like having two separate sets of instructions for your data: one for changing it (commands) and one for reading it (queries). This separation allows for specializing each set of instructions, making data manipulation and retrieval more efficient, especially for applications with high write or read loads.

Let's put it all together

graph LR;
  C(Command) -->|Invoked on| AGG([Aggregate]) -->|Generates|E(Event) -->|Stored to| ES[(Event Store)];
  PR(Projector) -->|Read from| ES;
  Q(Query) -->|Query to| PN([Projection]) -->|Retrieve from| RS[(Read Store)];
  PR -->|Write to| RS



A Command is routed to an Aggregate, which emits an Event. The Event is then stored in the Event Store. The Projector subscribes to some events and builds one or more tables in the Read Store. The Projection retrieves data from the Read Store based on a Query.
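The whole flow can be sketched in a few lines of plain Elixir. This is only an in-memory illustration under stated assumptions: the event store is a list, the read store is a map, and every name in `FlowSketch` is hypothetical. The real application uses Commanded and Postgres, as shown in the following sections.

```elixir
defmodule FlowSketch do
  # Write side: the aggregate decides which event (if any) a command produces.
  def execute(%{id: nil}, {:create_order, id, number}), do: {:ok, {:order_created, id, number}}
  def execute(_state, {:create_order, _id, _number}), do: {:error, :already_created}

  # The aggregate's state changes only by applying events.
  def apply_event(state, {:order_created, id, _number}), do: %{state | id: id}

  # Dispatch: run the command, then append the resulting event to the store.
  def dispatch(event_store, state, command) do
    with {:ok, event} <- execute(state, command) do
      {:ok, event_store ++ [event], apply_event(state, event)}
    end
  end

  # Read side: the projector folds stored events into a read model.
  def project(event_store) do
    Enum.reduce(event_store, %{}, fn {:order_created, id, number}, read_store ->
      Map.put(read_store, id, %{order_number: number})
    end)
  end

  # The projection answers queries from the read store only.
  def order_number(read_store, id), do: get_in(read_store, [id, :order_number])
end

{:ok, store, _state} = FlowSketch.dispatch([], %{id: nil}, {:create_order, "o-1", "A100"})
read_store = FlowSketch.project(store)
IO.inspect(FlowSketch.order_number(read_store, "o-1"))
```

Note that the query never touches the aggregate or the event list directly: it only reads what the projector has written.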

What's Event Storming?

Event Storming is a collaborative workshop technique for understanding and designing complex business processes or software systems. It involves a visual approach where participants use sticky notes to represent events, commands, aggregates, and other key elements on a large workspace. The goal is to foster communication, shared understanding, and uncover insights into the dynamics of the system or process being modeled. Event Storming is commonly used in agile development, domain-driven design, and other contexts where a shared understanding among team members is crucial.

[Image: the picture that explains everything]

Yet Another ERP

We put all the Domain Experts, Developers, Product Owners, etc. in the same room and start a big-picture Event Storming session.


Then at the end we have a clear idea of what the system should do (or at least we think so).


Play with Orders

Let's create an Order by invoking the corresponding command, and take the aggregate id from the command response for the next steps.

require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

aggregate_uuid =
  Kino.RPC.eval_string(
    node,
    ~S"""
    alias EsCqrsAnatomy.Order.Commands.CreateOrder
    alias EsCqrsAnatomy.Order.Commands.OrderItem

    %CreateOrder{
      id: Faker.UUID.v4(), 
      order_number: Faker.String.base64(5), 
      business_partner: Faker.Internet.email(),
      items: [
        %OrderItem{
          product_id: Faker.UUID.v4(),
          quantity: 1,
          uom: "KG"
        }
      ]
    } 
    |> EsCqrsAnatomy.App.dispatch(include_execution_result: true)
    |> elem(1)
    |> Map.get(:aggregate_uuid)
    """,
    file: __ENV__.file
  )

Create a connection to the Event Store

opts = [
  hostname: "postgres",
  port: 5432,
  username: "postgres",
  password: "postgres",
  database: "event_store"
]

{:ok, conn} = Kino.start_child({Postgrex, opts})

Fetch all the events of the Aggregate.
Here we cheat a bit, using the event payload to identify all the events related to our Aggregate.
We'll see the correct way to do it soon.

result_events =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT 
      event_id::text,
      event_type,
      data
    FROM events 
    WHERE encode(data, 'escape')::json ->> 'id' = $1
    """,
    [aggregate_uuid]
  )

Every Aggregate has its own Stream

result =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT 
      stream_id,
      stream_uuid,
      stream_version 
    FROM streams 
    WHERE stream_uuid = $1
    """,
    [aggregate_uuid]
  )

Let's invoke another command on the Aggregate

require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  alias EsCqrsAnatomy.Order.Commands.CompleteOrder

  %CompleteOrder{
    id: aggregate_uuid
  } 
  |> EsCqrsAnatomy.App.dispatch()
  """,
  file: __ENV__.file
)

Check the Stream now

stream =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT 
      stream_id,
      stream_uuid,
      stream_version 
    FROM streams 
    WHERE stream_uuid = $1
    """,
    [aggregate_uuid]
  )
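The `stream_version` column grows with every event appended to the Aggregate's stream. Here is a minimal in-memory sketch of why that number matters, assuming an append that checks the version the writer expects. `StreamSketch` and its data shape are made up for illustration and are not the EventStore schema; the error atom is modelled on the one EventStore uses.

```elixir
defmodule StreamSketch do
  # A stream is modelled as %{version: integer, events: list}.
  def new, do: %{version: 0, events: []}

  # Appending succeeds only when the caller's expected version matches the
  # stream's current version; this is how two writers racing on the same
  # aggregate are detected.
  def append(%{version: version} = stream, expected_version, new_events)
      when expected_version == version do
    {:ok,
     %{stream | version: version + length(new_events), events: stream.events ++ new_events}}
  end

  def append(_stream, _expected_version, _new_events), do: {:error, :wrong_expected_version}
end

{:ok, stream} = StreamSketch.append(StreamSketch.new(), 0, [:order_created])
{:ok, stream} = StreamSketch.append(stream, 1, [:order_completed])
IO.inspect(stream.version)

# A writer that still believes the stream is at version 1 is rejected.
IO.inspect(StreamSketch.append(stream, 1, [:order_deleted]))
```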

Take the Stream Id of our Aggregate

require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

stream_id =
  Kino.RPC.eval_string(
    node,
    ~S"""
    %{rows: [[stream_id | _]]} = stream
    stream_id
    """,
    file: __ENV__.file
  )

Check the events in that Stream

result4 =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT
      event_id::text,
      stream_id,
      stream_version
    FROM stream_events se
    WHERE stream_id = $1
    ORDER BY stream_version ASC
    """,
    [stream_id]
  )

Put it all together to see all the events for a given Aggregate

result3 =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT 
      e.event_id::text,
      event_type,
      data
    FROM events e
      JOIN stream_events se
        ON e.event_id = se.event_id
      JOIN streams s
        ON s.stream_id = se.stream_id
    WHERE s.stream_uuid = $1
    ORDER BY se.stream_version ASC
    """,
    [aggregate_uuid]
  )
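The query results come back as a list of column names plus a list of rows. When exploring them in the notebook it can be handy to turn each row into a map. The helper below is a small sketch: `ResultSketch` is a hypothetical name, and `sample` is a hand-written stand-in with made-up values that only mimics the `:columns` and `:rows` fields of a query result.

```elixir
defmodule ResultSketch do
  # Zips the column names with each row, so every row becomes a map.
  def to_maps(%{columns: columns, rows: rows}) do
    Enum.map(rows, fn row -> columns |> Enum.zip(row) |> Map.new() end)
  end
end

# Stand-in data, not real query output.
sample = %{
  columns: ["event_id", "event_type"],
  rows: [["e-1", "Order.OrderCreated"], ["e-2", "Order.OrderCompleted"]]
}

IO.inspect(ResultSketch.to_maps(sample))
```

In the notebook the same function could be called on `result3`, since it only relies on the `columns` and `rows` fields.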

The C stands for Command

How is a command routed to the right Aggregate?
With the Router, of course!

We define which fields are the key of the Aggregate and which Commands should be routed to which Aggregate.

Note! We have removed CreateOrder intentionally. Let's see what happens...

require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.Router do
    use Commanded.Commands.Router

    alias EsCqrsAnatomy.Order.Aggregate.Order

    alias EsCqrsAnatomy.Order.Commands.{CompleteOrder}
    
    identify(Order, by: :id)

    dispatch([CompleteOrder], to: Order)
    
  end
  """,
  file: __ENV__.file
)
require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

command_result =
  Kino.RPC.eval_string(
    node,
    ~S"""
    alias EsCqrsAnatomy.Order.Commands.CreateOrder
    alias EsCqrsAnatomy.Order.Commands.OrderItem

    %CreateOrder{
      id: Faker.UUID.v4(), 
      order_number: Faker.String.base64(5), 
      business_partner: Faker.Internet.email(),
      items: [
        %OrderItem{
          product_id: Faker.UUID.v4(),
          quantity: 1,
          uom: "KG"
        }
      ]
    } 
    |> EsCqrsAnatomy.App.dispatch()
    """,
    file: __ENV__.file
  )

Cool! The Command is not registered to any Aggregate, so the dispatch is rejected.
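Conceptually, the Router is a lookup table from command type to Aggregate, plus the field that identifies the Aggregate instance. The sketch below is a simplified stand-in, not how Commanded implements it: `RouterSketch`, the `:name` key and the routing table are all assumptions. The error tuple mirrors the one Commanded returns for a command with no route.

```elixir
defmodule RouterSketch do
  # Hypothetical routing table: command name => {aggregate, identity field}.
  @routes %{complete_order: {:order, :id}}

  def route(%{name: name} = command) do
    case Map.fetch(@routes, name) do
      {:ok, {aggregate, identity_field}} ->
        # The identity field tells us which aggregate instance gets the command.
        {:ok, aggregate, Map.fetch!(command, identity_field)}

      :error ->
        {:error, :unregistered_command}
    end
  end
end

IO.inspect(RouterSketch.route(%{name: :complete_order, id: "o-1"}))
IO.inspect(RouterSketch.route(%{name: :create_order, id: "o-2"}))
```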

Middlewares

Middlewares are components used to perform the following tasks:

  • Command Validation (static and dynamic)
  • Uniqueness Check
  • Command Enrichment

Note! This time we've added some Middlewares that run before the Command reaches the Aggregate.

require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.Router do
    use Commanded.Commands.Router

    alias EsCqrsAnatomy.Middleware.{Validate, Enrichment}
    alias Commanded.Middleware.Uniqueness
    alias EsCqrsAnatomy.Order.Aggregate.Order
    alias EsCqrsAnatomy.Order.Commands.{CreateOrder, CompleteOrder}

    middleware(Enrichment)
    middleware(Validate)
    middleware(Uniqueness)

    identify(Order, by: :id)

    dispatch([CreateOrder, CompleteOrder], to: Order)
  end
  """,
  file: __ENV__.file
)
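The order of the `middleware` lines matters: each one sees the command as the previous one left it, and any of them can stop the dispatch. Here is a minimal sketch of that idea using plain functions. Commanded's real middleware works on a pipeline struct with callbacks such as `before_dispatch/1`; `PipelineSketch` and the two stand-in functions below are assumptions for illustration only.

```elixir
defmodule PipelineSketch do
  # Runs each middleware in order; the first {:error, _} halts the pipeline,
  # so the command never reaches the aggregate.
  def run(command, middlewares) do
    Enum.reduce_while(middlewares, {:ok, command}, fn middleware, {:ok, current} ->
      case middleware.(current) do
        {:ok, next} -> {:cont, {:ok, next}}
        {:error, reason} -> {:halt, {:error, reason}}
      end
    end)
  end
end

# Hypothetical stand-ins for the Enrichment and Validate middlewares.
enrich = fn command -> {:ok, Map.put(command, :enriched, true)} end

validate = fn
  %{id: id} = command when is_binary(id) -> {:ok, command}
  _command -> {:error, :validation_failure}
end

IO.inspect(PipelineSketch.run(%{id: "o-1"}, [enrich, validate]))
IO.inspect(PipelineSketch.run(%{id: nil}, [enrich, validate]))
```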

Command Validation

We can check mandatory fields and their format.

require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.Order.Commands.CreateOrder do
    use TypedStruct
    use Vex.Struct
    use StructAccess

    alias EsCqrsAnatomy.Order.Commands.OrderItem

    @derive Jason.Encoder
    typedstruct enforce: true do
      field(:id, String.t())
      field(:order_number, String.t())
      field(:business_partner, String.t())
      field(:items, list(OrderItem.t()))
    end

    use ExConstructor

    validates(:id, presence: true, uuid: true)
    validates(:order_number, presence: true, string: true)
    validates(:business_partner, presence: true, email: true)
    validates(:items, presence: true, list_of_structs: true)
  end

  defmodule EsCqrsAnatomy.Order.Commands.OrderItem do
    use TypedStruct
    use Vex.Struct
    use StructAccess

    @derive Jason.Encoder
    typedstruct enforce: true do
      field(:product_id, String.t())
      field(:quantity, number())
      field(:uom, String.t())
    end

    use ExConstructor

    validates(:product_id, presence: true, uuid: true)
    validates(:quantity, presence: true, number: [greater_than: 0])
    validates(:uom, presence: true, inclusion: ["KG", "MT"])
  end
  """,
  file: __ENV__.file
)

We can validate standard formats like String, Number, etc., or create custom validators. In this example we have two custom validators, for UUID and Email.

require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.Validator.Email do
    use Vex.Validator

    def validate(value, _options) when is_binary(value) do
      if String.match?(value, regexp()) do
        :ok
      else
        {:error, "must be a valid email"}
      end
    end

    def validate(_, _options), do: {:error, "must be a valid email"}

    defp regexp(),
      do: ~r/^[_A-Za-z0-9-\+]+(\.[_A-Za-z0-9-]+)*@[A-Za-z0-9-]+(\.[A-Za-z0-9]+)*(\.[A-Za-z]{2,})$/
  end
  """,
  file: __ENV__.file
)
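The UUID validator is not shown in this notebook. A minimal sketch, assuming it follows the same `validate/2` shape as the Email one, could look like this. `UuidValidatorSketch` is a hypothetical name and the regular expression is an assumption; `use Vex.Validator` is left out so the snippet runs on its own.

```elixir
defmodule UuidValidatorSketch do
  # Same contract as the Email validator: :ok or {:error, message}.
  def validate(value, _options) when is_binary(value) do
    if String.match?(value, regexp()) do
      :ok
    else
      {:error, "must be a valid UUID"}
    end
  end

  # Anything that is not a string cannot be a UUID.
  def validate(_value, _options), do: {:error, "must be a valid UUID"}

  # 8-4-4-4-12 hexadecimal digits, case-insensitive.
  defp regexp,
    do: ~r/\A[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\z/i
end

IO.inspect(UuidValidatorSketch.validate("123e4567-e89b-12d3-a456-426614174000", []))
IO.inspect(UuidValidatorSketch.validate("Pen", []))
```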
require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  alias EsCqrsAnatomy.Order.Commands.CreateOrder
  alias EsCqrsAnatomy.Order.Commands.OrderItem

  %CreateOrder{
    id: Faker.UUID.v4(), 
    order_number: 12345, 
    business_partner: "i_wanna_be_an_email",
    items: [
      %OrderItem{
        product_id: "Pen",
        quantity: 0,
        uom: "LT"
      }
    ]
  } 
  |> EsCqrsAnatomy.App.dispatch()
  """,
  file: __ENV__.file
)

Uniqueness Check

The order id is what's called a technical key. What identifies an order from a business perspective (the business key) is the order number.
So we must avoid having two orders with the same order number.

How can we handle this cross-aggregate rule?

require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  alias EsCqrsAnatomy.Order.Projections.Order
  alias EsCqrsAnatomy.Order.Commands.CreateOrder

  defimpl Commanded.Middleware.Uniqueness.UniqueFields,
    for: EsCqrsAnatomy.Order.Commands.CreateOrder do
    def unique(%CreateOrder{order_number: order_number}),
      do: [
        {:order_number, "already exist", order_number,
         ignore_case: false, is_unique: &order_number_is_unique?/4, order_number: order_number}
      ]

    def order_number_is_unique?(_field, value, _owner, _opts),
      do: Order.order_number_is_unique?(value)
  end
  """,
  file: __ENV__.file
)

The first Command that arrives checks whether the order number already exists in the Projection. If the number is available, the middleware stores the value in a cache to keep it locked for a given time. This way, another Command with the same order number finds it in the cache and is rejected.

require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

already_existing_order_number =
  Kino.RPC.eval_string(
    node,
    ~S"""
    alias EsCqrsAnatomy.Order.Projections.Order
    alias EsCqrsAnatomy.Order.Commands.CreateOrder
    alias EsCqrsAnatomy.Order.Commands.OrderItem

    already_existing_order_number = Order.first_order_number() |> List.first()

    %CreateOrder{
      id: Faker.UUID.v4(), 
      order_number: already_existing_order_number, 
      business_partner: Faker.Internet.email(),
      items: [
        %OrderItem{
          product_id: Faker.UUID.v4(),
          quantity: 1,
          uom: "KG"
        }
      ]
    } 
    |> EsCqrsAnatomy.App.dispatch()
    """,
    file: __ENV__.file
  )
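The "check the projection, then lock the value for a while" logic can be sketched without any cache library. Everything here is an assumption made for illustration: `ClaimSketch`, the plain map used as a cache, the explicit `now` and `ttl` arguments (in milliseconds) and the error atoms. The real middleware keeps this state in its own cache.

```elixir
defmodule ClaimSketch do
  # `claims` maps a value to the time at which its lock expires.
  # `taken_fun` stands in for the lookup against the projection.
  def claim(claims, value, now, ttl, taken_fun) do
    cond do
      # Already persisted in the read model: reject.
      taken_fun.(value) -> {:error, :already_exists}
      # Another command claimed it recently and the lock has not expired: reject.
      Map.get(claims, value, 0) > now -> {:error, :already_claimed}
      # Free: lock the value until now + ttl.
      true -> {:ok, Map.put(claims, value, now + ttl)}
    end
  end
end

in_projection = fn value -> value in ["A100"] end

{:ok, claims} = ClaimSketch.claim(%{}, "B200", 0, 5_000, in_projection)
IO.inspect(ClaimSketch.claim(claims, "B200", 1_000, 5_000, in_projection))
IO.inspect(ClaimSketch.claim(claims, "A100", 1_000, 5_000, in_projection))
```

The lock covers the window between the command being accepted and the projection catching up with the new order.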

Command Enrichment

Let's try to add a constraint that prevents selling blocked products.

require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.Order.Commands.CompleteOrder do
    use TypedStruct
    use Vex.Struct

    @derive Jason.Encoder
    typedstruct enforce: true do
      field(:id, String.t())
      field(:blocked_product_ids, list(String.t()), enforce: false)
    end

    validates(:id, presence: true, uuid: true)
  end


  alias EsCqrsAnatomy.Middleware.Enrichment.EnrichmentProtocol
  alias EsCqrsAnatomy.Order.Commands.CompleteOrder
  alias EsCqrsAnatomy.Order.Constants
  alias EsCqrsAnatomy.Order.Projections.OrderItem

  defimpl EnrichmentProtocol, for: CompleteOrder do
    
    def enrich(%CompleteOrder{} = command) do
      %CompleteOrder{id: id} = command

      products_in_order = OrderItem.products_in_order(id)

      command = %CompleteOrder{
        command
        | blocked_product_ids: lookup_external_data(products_in_order)
      }

      {:ok, command}
    end

    defp lookup_external_data(products_in_order) do
      if Enum.member?(products_in_order, Constants.blocked_product_id()) do
        [Constants.blocked_product_id()]
      else
        []
      end
    end
  end


  """,
  file: __ENV__.file
)
require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

order_id_to_block =
  Kino.RPC.eval_string(
    node,
    ~S"""
    alias EsCqrsAnatomy.Order.Commands.{CreateOrder}
    alias EsCqrsAnatomy.Order.Commands.OrderItem
    alias EsCqrsAnatomy.Order.Constants

    order_id_to_block = 
      %CreateOrder{
        id: Faker.UUID.v4(), 
        order_number: Faker.String.base64(5), 
        business_partner: Faker.Internet.email(),
        items: [
          %OrderItem{
            product_id: Constants.blocked_product_id(),
            quantity: 1,
            uom: "KG"
          }
        ]
      } 
      |> EsCqrsAnatomy.App.dispatch(include_execution_result: true)
      |> elem(1)
      |> Map.get(:aggregate_uuid)

    """,
    file: __ENV__.file
  )
require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  alias EsCqrsAnatomy.Order.Commands.{CompleteOrder}

  %CompleteOrder{
    id: order_id_to_block
  } 
  |> EsCqrsAnatomy.App.dispatch()
  """,
  file: __ENV__.file
)
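The Enrichment middleware itself is not shown here. One way it could work, sketched under assumptions, is to call a protocol on every command and let commands without an implementation pass through unchanged. All names below (`EnrichableSketch`, `CompleteOrderSketch`) and the hard-coded lookup tables are hypothetical; whether the real `EnrichmentProtocol` falls back to `Any` is not stated in this notebook.

```elixir
defprotocol EnrichableSketch do
  # Commands without their own implementation use the Any fallback.
  @fallback_to_any true
  def enrich(command)
end

defimpl EnrichableSketch, for: Any do
  # Nothing to add: return the command as it is.
  def enrich(command), do: {:ok, command}
end

defmodule CompleteOrderSketch do
  defstruct [:id, blocked_product_ids: []]
end

defimpl EnrichableSketch, for: CompleteOrderSketch do
  # Hypothetical data: fixed tables replace the read-store query.
  @products_by_order %{"o-1" => ["p-ok", "p-blocked"], "o-2" => ["p-ok"]}
  @blocked ["p-blocked"]

  def enrich(command) do
    products = Map.get(@products_by_order, command.id, [])
    {:ok, %{command | blocked_product_ids: Enum.filter(products, &(&1 in @blocked))}}
  end
end

IO.inspect(EnrichableSketch.enrich(struct(CompleteOrderSketch, id: "o-1")))
IO.inspect(EnrichableSketch.enrich(%{name: :some_other_command}))
```

The point of enriching in a middleware is that the Aggregate stays free of lookups: it receives everything it needs inside the command.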

Finally the Aggregate

require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.Order.Aggregate.Order do
    use TypedStruct

    alias EsCqrsAnatomy.Order.Commands.{CreateOrder, CompleteOrder, DeleteOrder}
    alias EsCqrsAnatomy.Order.Events.{OrderCreated, OrderCompleted, OrderDeleted}
    alias EsCqrsAnatomy.Order.Aggregate.OrderStatus

    @derive Jason.Encoder
    typedstruct enforce: true do
      field(:id, String.t())
      field(:status, String.t())
    end

    def execute(%__MODULE__{id: nil}, %CreateOrder{} = command) do
      %OrderCreated{
        id: command.id,
        order_number: command.order_number,
        business_partner: command.business_partner,
        items: command.items
      }
    end

    def execute(%__MODULE__{}, %CreateOrder{}), do: {:error, "Order already created"}

    def execute(%__MODULE__{status: "COMPLETED"}, %CompleteOrder{}),
      do: {:error, "Order already completed"}

    def execute(%__MODULE__{}, %CompleteOrder{blocked_product_ids: blocked_product_ids})
        when length(blocked_product_ids) > 0 do
      {:error, "Order contains blocked products: #{inspect(blocked_product_ids)}"}
    end

    def execute(%__MODULE__{id: id}, %CompleteOrder{}), do: %OrderCompleted{id: id}

    def execute(%__MODULE__{id: id}, %DeleteOrder{}), do: %OrderDeleted{id: id}

    def apply(%__MODULE__{} = order, %OrderCreated{} = event) do
      %__MODULE__{
        order
        | id: event.id,
          status: OrderStatus.open()
      }
    end

    def apply(%__MODULE__{} = order, %OrderCompleted{} = event) do
      %__MODULE__{
        order
        | id: event.id,
          status: OrderStatus.completed()
      }
    end

    def apply(%__MODULE__{} = order, _), do: order
  end

  """,
  file: __ENV__.file
)

Event Handlers


require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.Router do
    use Commanded.Commands.Router

    alias EsCqrsAnatomy.Middleware.{Validate, Enrichment}
    alias Commanded.Middleware.Uniqueness
    alias EsCqrsAnatomy.Order.Aggregate.Order
    alias EsCqrsAnatomy.Order.Commands.{CreateOrder, CompleteOrder}
    alias EsCqrsAnatomy.Shipment.Aggregate.Shipment
    alias EsCqrsAnatomy.Shipment.Commands.{CreateShipment, CompleteShipment}

    middleware(Enrichment)
    middleware(Validate)
    middleware(Uniqueness)

    identify(Order, by: :id)
    identify(Shipment, by: :id)

    dispatch([CreateOrder, CompleteOrder], to: Order)
    dispatch([CreateShipment, CompleteShipment], to: Shipment)
  end
  """,
  file: __ENV__.file
)
require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.Shipment.Policies.Shipment do
    use Commanded.Event.Handler,
      application: EsCqrsAnatomy.App,
      name: "shipment",
      start_from: :current
      
    use EsCqrsAnatomy.Base.EventHandler
    
    alias EsCqrsAnatomy.Order.Events.OrderCompleted
    alias EsCqrsAnatomy.Shipment.Commands.CreateShipment

    def handle(%OrderCompleted{id: id}, %{
          event_id: causation_id,
          correlation_id: correlation_id
        }) do
      %CreateShipment{
        id: UUID.uuid4(),
        order_id: id
      }
      |> EsCqrsAnatomy.App.dispatch(causation_id: causation_id, correlation_id: correlation_id)
    end

  end
  """,
  file: __ENV__.file
)
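The handler passes `causation_id` and `correlation_id` along so that the new command can be traced back to the event that triggered it. A small sketch of what those two ids make possible, using made-up in-memory messages (`TraceSketch` and the sample data are assumptions, not the event store's metadata format):

```elixir
defmodule TraceSketch do
  # All messages that belong to the same business flow share a correlation id.
  def flow(messages, correlation_id),
    do: Enum.filter(messages, &(&1.correlation_id == correlation_id))

  # Walks the causation links backwards, from a message to the root of its flow.
  def causes(_messages, %{causation_id: nil}), do: []

  def causes(messages, %{causation_id: causation_id}) do
    case Enum.find(messages, &(&1.id == causation_id)) do
      nil -> []
      parent -> [parent.id | causes(messages, parent)]
    end
  end
end

# Made-up trace: a command, the event it produced, and the follow-up command.
messages = [
  %{id: "cmd-1", causation_id: nil, correlation_id: "c-1"},
  %{id: "evt-1", causation_id: "cmd-1", correlation_id: "c-1"},
  %{id: "cmd-2", causation_id: "evt-1", correlation_id: "c-1"},
  %{id: "cmd-9", causation_id: nil, correlation_id: "c-2"}
]

IO.inspect(TraceSketch.causes(messages, Enum.at(messages, 2)))
IO.inspect(length(TraceSketch.flow(messages, "c-1")))
```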
require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.EventHandlerSupervisor do
    use Supervisor

    def start_link(_args) do
      Supervisor.start_link(__MODULE__, [], name: __MODULE__)
    end

    def init(_args) do
      Supervisor.init(
        [
          EsCqrsAnatomy.Order.Projectors.Orders,
          EsCqrsAnatomy.Shipment.Policies.Shipment
        ],
        strategy: :one_for_one
      )
    end
  end

  """,
  file: __ENV__.file
)
require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"Process.whereis(EsCqrsAnatomy.EventHandlerSupervisor) |> Process.exit(:kill)",
  file: __ENV__.file
)
require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

order_id =
  Kino.RPC.eval_string(
    node,
    ~S"""
    alias EsCqrsAnatomy.Order.Commands.{CreateOrder, CompleteOrder}
    alias EsCqrsAnatomy.Order.Commands.OrderItem
    alias EsCqrsAnatomy.Order.Constants

    order_id = 
      %CreateOrder{
        id: Faker.UUID.v4(), 
        order_number: Faker.String.base64(5), 
        business_partner: Faker.Internet.email(),
        items: [
          %OrderItem{
            product_id: Faker.UUID.v4(),
            quantity: 1,
            uom: "KG"
          }
        ]
      } 
      |> EsCqrsAnatomy.App.dispatch(include_execution_result: true)
      |> elem(1)
      |> Map.get(:aggregate_uuid)

    %CompleteOrder{
      id: order_id
    } 
    |> EsCqrsAnatomy.App.dispatch()

    order_id
    """,
    file: __ENV__.file
  )
result5 =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT 
      event_id::text,
      event_type,
      data
    FROM events 
    WHERE event_type = 'Shipment.ShipmentCreated' 
    AND encode(data, 'escape')::json ->> 'order_id' = $1
    """,
    [order_id]
  )
result6 =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT 
    	subscription_name,
    	stream_uuid,
    	last_seen 
    FROM subscriptions
    WHERE subscription_name  = 'shipment'
    """,
    []
  )
result7 =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT 
      stream_id,
      stream_uuid,
      stream_version 
    FROM streams 
    ORDER BY stream_id ASC
    """,
    []
  )
result8 =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT  
    	e.event_type,
    	se.stream_id,
    	se.stream_version,
    	se.original_stream_id,
    	se.original_stream_version 
    FROM stream_events se
      JOIN events e
    		ON se.event_id = e.event_id
    ORDER BY 
    	se.original_stream_id ASC, 
    	se.original_stream_version ASC,
    	se.stream_version ASC, 
    	se.stream_id DESC
    """,
    []
  )

Projector

require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.Order.Projectors.Orders do
    use Commanded.Projections.Ecto,
      application: EsCqrsAnatomy.App,
      repo: EsCqrsAnatomy.Repo,
      name: "orders"

    use EsCqrsAnatomy.Base.EventHandler

    alias EsCqrsAnatomy.Order.Events.{
      OrderCreated,
      OrderCompleted,
      OrderDeleted
    }

    alias EsCqrsAnatomy.Order.Projections.{Order, OrderItem}
    alias EsCqrsAnatomy.Order.Aggregate.OrderStatus

    project(
      %OrderCreated{} = event,
      _metadata,
      fn multi ->
        multi
        |> Ecto.Multi.insert(
          :orders,
          %Order{
            id: event.id,
            order_number: event.order_number,
            business_partner: event.business_partner,
            status: OrderStatus.open(),
            items:
              event.items
              |> Enum.map(
                &%OrderItem{
                  id: UUID.uuid4(),
                  order_id: event.id,
                  product_id: &1.product_id,
                  quantity: &1.quantity,
                  uom: &1.uom
                }
              )
          }
        )
      end
    )

    project(
      %OrderCompleted{id: id},
      _metadata,
      fn multi ->
        multi
        |> Ecto.Multi.run(:order_to_update, fn repo, _changes ->
          {:ok, repo.get(Order, id)}
        end)
        |> Ecto.Multi.update(:order, fn %{order_to_update: order} ->
          Ecto.Changeset.change(order,
            status: OrderStatus.completed()
          )
        end)
      end
    )

    project(
      %OrderDeleted{id: id},
      _metadata,
      fn multi ->
        multi
        |> Ecto.Multi.run(:order_to_delete, fn repo, _changes ->
          {rows_deleted, _} = from(o in Order, where: o.id == ^id) |> repo.delete_all()
          {:ok, rows_deleted}
        end)
      end
    )

  end
  """,
  file: __ENV__.file
)
opts = [
  hostname: "postgres",
  port: 5432,
  username: "postgres",
  password: "postgres",
  database: "read_store"
]

{:ok, conn} = Kino.start_child({Postgrex, opts})

Remember to switch to conn_rs

result9 =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT
      o.order_number, 
      o.business_partner, 
      o.status, 
      oi.product_id, 
      oi.quantity, 
      oi.uom 
    FROM orders o 
    	JOIN order_items oi 
    		ON o.id = oi.order_id 
    ORDER BY 
      o.order_number, 
      oi.quantity 
    """,
    []
  )
projection_version =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT
      projection_name,
      last_seen_event_number
    FROM projection_versions
    WHERE projection_name = 'orders'
    """,
    []
  )
require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

last_seen_event_number =
  Kino.RPC.eval_string(
    node,
    ~S"""
    %{rows: [[_ , last_seen_event_number]]} = projection_version
    last_seen_event_number
    """,
    file: __ENV__.file
  )
opts = [
  hostname: "postgres",
  port: 5432,
  username: "postgres",
  password: "postgres",
  database: "event_store"
]

{:ok, conn} = Kino.start_child({Postgrex, opts})
result11 =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT 
      event_type,
      data
    FROM events e
      JOIN stream_events se
        ON e.event_id = se.event_id
    WHERE 
    	se.stream_id  = 0
    	AND se.stream_version = $1

    """,
    [last_seen_event_number]
  )
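The `last_seen_event_number` is what lets a projector skip events it has already handled, for example when they are redelivered after a restart. A minimal in-memory sketch of that bookkeeping follows. `ProjectionSketch` and the event maps are assumptions for illustration; in the Ecto-based projector the read model update and the version update happen in the database.

```elixir
defmodule ProjectionSketch do
  # State: the read model plus the number of the last event already projected.
  def new, do: %{last_seen: 0, orders: %{}}

  # Events at or below last_seen were already projected, so they are skipped.
  def project(%{last_seen: last_seen} = state, %{event_number: n}) when n <= last_seen,
    do: state

  def project(state, %{event_number: n, type: :order_created, id: id}),
    do: %{state | last_seen: n, orders: Map.put(state.orders, id, :open)}

  def project(state, %{event_number: n, type: :order_completed, id: id}),
    do: %{state | last_seen: n, orders: Map.put(state.orders, id, :completed)}
end

created = %{event_number: 1, type: :order_created, id: "o-1"}
completed = %{event_number: 2, type: :order_completed, id: "o-1"}

# The first event is delivered twice; the second delivery changes nothing.
state =
  Enum.reduce([created, completed, created], ProjectionSketch.new(), fn event, acc ->
    ProjectionSketch.project(acc, event)
  end)

IO.inspect(state)
```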

Performance

Stream Linking

require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.Policies.StreamLinker do
    use Commanded.Event.Handler,
      application: EsCqrsAnatomy.App,
      name: "stream_linker",
      start_from: :origin

    require Logger

    alias Commanded.Event.FailureContext
    alias EsCqrsAnatomy.Order.Events.{OrderCreated, OrderCompleted}

    def handle(%OrderCreated{}, %{event_id: event_id}) do
      orders_stream_id = UUID.uuid5(:oid, "orders")
      EsCqrsAnatomy.EventStore.link_to_stream(orders_stream_id, :any_version, [event_id])
    end

    def handle(%OrderCompleted{}, %{event_id: event_id}) do
      orders_stream_id = UUID.uuid5(:oid, "orders")
      EsCqrsAnatomy.EventStore.link_to_stream(orders_stream_id, :any_version, [event_id])
    end

    def error({:error, reason}, event, %FailureContext{context: context}) do
      context = record_failure(context)

      case Map.get(context, :failures) do
        too_many when too_many >= 3 ->
          # skip bad event after third failure
          Logger.error(
            "#{__MODULE__} Skipping bad event, too many failures: #{inspect(event)} for reason: #{inspect(reason)}"
          )

          :skip

        _ ->
          # retry event, failure count is included in context map
          {:retry, context}
      end
    end

    defp record_failure(context) do
      Map.update(context, :failures, 1, fn failures -> failures + 1 end)
    end
  end

  """,
  file: __ENV__.file
)
require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"""
  defmodule EsCqrsAnatomy.EventHandlerSupervisor do
    use Supervisor

    def start_link(_args) do
      Supervisor.start_link(__MODULE__, [], name: __MODULE__)
    end

    def init(_args) do
      Supervisor.init(
        [
          EsCqrsAnatomy.Order.Projectors.Orders,
          EsCqrsAnatomy.Shipment.Policies.Shipment,
          EsCqrsAnatomy.Policies.StreamLinker
        ],
        strategy: :one_for_one
      )
    end
  end
  """,
  file: __ENV__.file
)
require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

Kino.RPC.eval_string(
  node,
  ~S"Process.whereis(EsCqrsAnatomy.EventHandlerSupervisor) |> Process.exit(:kill)",
  file: __ENV__.file
)
require Kino.RPC
node = :"[email protected]"
Node.set_cookie(node, :secretcookie)

orders_stream_id =
  Kino.RPC.eval_string(
    node,
    ~S"""
    orders_stream_id = UUID.uuid5(:oid, "orders")
    """,
    file: __ENV__.file
  )
result13 =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT
      stream_uuid,
      stream_version
    FROM streams
    WHERE stream_uuid = $1
    """,
    [orders_stream_id]
  )
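Linking does not copy events: the "orders" stream only holds references to events that live in each Aggregate's own stream, and it gets its own version counter. A tiny in-memory sketch of that idea (`LinkSketch` and the map of streams are assumptions made for illustration, not the EventStore tables):

```elixir
defmodule LinkSketch do
  # `streams` maps a stream name to the ordered list of event ids it contains.
  # Linking appends a reference to an existing event; the event is not copied.
  def link(streams, target, event_id) do
    Map.update(streams, target, [event_id], fn ids -> ids ++ [event_id] end)
  end

  # The linked stream has its own version, independent of the source streams.
  def version(streams, name), do: streams |> Map.get(name, []) |> length()
end

streams = %{"order-1" => ["e-1", "e-2"], "order-2" => ["e-3"]}

streams =
  streams
  |> LinkSketch.link("orders", "e-1")
  |> LinkSketch.link("orders", "e-3")
  |> LinkSketch.link("orders", "e-2")

IO.inspect(LinkSketch.version(streams, "orders"))
IO.inspect(streams["orders"])
```

A subscriber interested only in orders can then read this one stream instead of filtering every event in the store.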

Aggregate Snapshot

config :es_cqrs_anatomy, EsCqrsAnatomy.App,
  snapshotting: %{
    EsCqrsAnatomy.Order.Aggregate.Order => [
      snapshot_every: 1,
      snapshot_version: 1
    ]
  }
result12 =
  Postgrex.query!(
    conn,
    ~S"""
    SELECT
    	s.source_version,
    	s.source_type,
      s.metadata,
    	s.data
    FROM snapshots s 
    """,
    []
  )
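With `snapshot_every: 1` a snapshot of the Aggregate state is taken after every event. The benefit is that rebuilding an Aggregate can start from the snapshot and replay only the events recorded after it. A minimal sketch of that idea, where `SnapshotSketch`, the event atoms and the state shape are all made up for illustration:

```elixir
defmodule SnapshotSketch do
  def apply_event(state, :order_created), do: %{state | status: :open}
  def apply_event(state, :order_completed), do: %{state | status: :completed}

  # A snapshot is the state plus the stream version it was taken at.
  def empty, do: %{version: 0, state: %{status: nil}}

  def snapshot(events), do: %{version: length(events), state: rebuild(empty(), events)}

  # Start from the snapshot and replay only the events after its version.
  def rebuild(snapshot, events) do
    events
    |> Enum.drop(snapshot.version)
    |> Enum.reduce(snapshot.state, &apply_event(&2, &1))
  end
end

events = [:order_created, :order_completed]

# Snapshot taken after the first event, then one more event arrives.
snap = SnapshotSketch.snapshot(Enum.take(events, 1))
IO.inspect(snap)
IO.inspect(SnapshotSketch.rebuild(snap, events))
```

Rebuilding from the snapshot gives the same state as replaying the full history, with fewer events to apply.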