diff --git a/lib/projections/ecto.ex b/lib/projections/ecto.ex index 6caa822..afdd30d 100644 --- a/lib/projections/ecto.ex +++ b/lib/projections/ecto.ex @@ -59,37 +59,40 @@ defmodule Commanded.Projections.Ecto do projection_name = Map.fetch!(metadata, :handler_name) event_number = Map.fetch!(metadata, :event_number) - changeset = - %ProjectionVersion{projection_name: projection_name} - |> ProjectionVersion.changeset(%{last_seen_event_number: event_number}) + projection_version = %ProjectionVersion{ + projection_name: projection_name, + last_seen_event_number: event_number + } prefix = schema_prefix(event, metadata) + # Query to update an existing projection version with the last seen event number with + # a check to ensure that the event has not already been projected. + update_projection_version = + from(pv in ProjectionVersion, + where: + pv.projection_name == ^projection_name and pv.last_seen_event_number < ^event_number, + update: [set: [last_seen_event_number: ^event_number]] + ) + multi = Ecto.Multi.new() - |> Ecto.Multi.run(:verify_projection_version, fn repo, _changes -> - version = - case repo.get(ProjectionVersion, projection_name, prefix: prefix) do - nil -> - repo.insert!( - %ProjectionVersion{ - projection_name: projection_name, - last_seen_event_number: 0 - }, - prefix: prefix - ) - - version -> - version - end - - if version.last_seen_event_number < event_number do - {:ok, %{version: version}} - else - {:error, :already_seen_event} + |> Ecto.Multi.run(:track_projection_version, fn repo, _changes -> + try do + repo.insert(projection_version, + prefix: prefix, + on_conflict: update_projection_version, + conflict_target: [:projection_name] + ) + rescue + exception in Ecto.StaleEntryError -> + # Attempted to insert a projection version for an already seen event + {:error, :already_seen_event} + + exception -> + reraise exception, __STACKTRACE__ end end) - |> Ecto.Multi.update(:projection_version, changeset, prefix: prefix) with %Ecto.Multi{} = multi <- apply(multi_fn, [multi]), {:ok, changes} <- transaction(multi) do @@ -99,7 +102,7 @@ defmodule Commanded.Projections.Ecto do :ok end else - {:error, :verify_projection_version, :already_seen_event, _changes} -> :ok + {:error, :track_projection_version, :already_seen_event, _changes} -> :ok {:error, _stage, error, _changes} -> {:error, error} {:error, _error} = reply -> reply end diff --git a/test/projections/ecto_projection_test.exs b/test/projections/ecto_projection_test.exs index a7211ed..eb1e325 100644 --- a/test/projections/ecto_projection_test.exs +++ b/test/projections/ecto_projection_test.exs @@ -68,6 +68,27 @@ defmodule Commanded.Projections.EctoProjectionTest do assert_seen_event("Projector", 3) end + test "should prevent an event being projected more than once" do + Projector.handle(%AnEvent{name: "Event1"}, %{handler_name: "Projector", event_number: 1}) + Projector.handle(%AnEvent{name: "Event2"}, %{handler_name: "Projector", event_number: 2}) + + tasks = + Enum.map(1..3, fn _index -> + Task.async(fn -> + :timer.sleep(:rand.uniform(10)) + + Projector.handle(%AnEvent{name: "Event3"}, %{handler_name: "Projector", event_number: 3}) + end) + end) + + results = Task.await_many(tasks) + + assert Enum.uniq(results) == [:ok] + + assert_projections(Projection, ["Event1", "Event2", "Event3"]) + assert_seen_event("Projector", 3) + end + test "should return an error on failure" do assert {:error, :failure} == Projector.handle(%ErrorEvent{}, %{handler_name: "Projector", event_number: 1})