Skip to content

Commit

Permalink
feat: add annotations to subscriptions table
Browse files Browse the repository at this point in the history
  • Loading branch information
yordis committed Jan 24, 2025
1 parent 362f087 commit 2a0715c
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 13 deletions.
16 changes: 15 additions & 1 deletion lib/event_store/sql/init.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,24 @@ defmodule EventStore.Sql.Init do
create_subscription_index(),
create_snapshots_table(column_data_type),
create_schema_migrations_table(),
record_event_store_schema_version()
record_event_store_schema_version(),
add_annotations_to_subscriptions(),
create_subscription_annotations_index()
]
end

defp add_annotations_to_subscriptions do
"""
ALTER TABLE subscriptions ADD COLUMN annotations jsonb;
"""
end

defp create_subscription_annotations_index do
"""
CREATE INDEX subscriptions_annotations_idx ON subscriptions USING gin (annotations);
"""
end

defp create_streams_table do
"""
CREATE TABLE streams
Expand Down
7 changes: 4 additions & 3 deletions lib/event_store/sql/statements/insert_subscription.sql.eex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ INSERT INTO "<%= schema %>".subscriptions
(
stream_uuid,
subscription_name,
last_seen
last_seen,
annotations
)
VALUES ($1, $2, $3)
RETURNING subscription_id, stream_uuid, subscription_name, last_seen, created_at;
VALUES ($1, $2, $3, $4)
RETURNING subscription_id, stream_uuid, subscription_name, last_seen, created_at, annotations;
53 changes: 47 additions & 6 deletions lib/event_store/storage/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,40 @@ defmodule EventStore.Storage.Subscription do
QuerySubscription
}

@typedoc """
A subscription to an event stream.
* `subscription_id` - Unique identifier for the subscription
* `stream_uuid` - The stream being subscribed to
* `subscription_name` - Name of the subscription
* `last_seen` - Last event seen by this subscription
* `created_at` - When the subscription was created
* `annotations` - Arbitrary non-identifying metadata attached to the
subscription, inspired by Kubernetes annotations. These are key-value pairs
that can be used to store auxiliary information about a subscription that
is not directly part of its core functionality. For example:
* Build/release information (team owner, git sha, etc.)
* Client-specific configuration
* Debugging info
* Tool information
"""
@type t :: %EventStore.Storage.Subscription{
subscription_id: non_neg_integer(),
stream_uuid: String.t(),
subscription_name: String.t(),
last_seen: non_neg_integer() | nil,
created_at: DateTime.t()
created_at: DateTime.t(),
annotations: map()
}

defstruct [:subscription_id, :stream_uuid, :subscription_name, :last_seen, :created_at]
defstruct [
:subscription_id,
:stream_uuid,
:subscription_name,
:last_seen,
:created_at,
:annotations
]

defdelegate subscriptions(conn, opts), to: QueryAllSubscriptions, as: :execute

Expand Down Expand Up @@ -49,7 +74,17 @@ defmodule EventStore.Storage.Subscription do
do: Subscription.Delete.execute(conn, stream_uuid, subscription_name, opts)

defp create_subscription(conn, stream_uuid, subscription_name, start_from, opts) do
case CreateSubscription.execute(conn, stream_uuid, subscription_name, start_from, opts) do
{annotations, opts} = Keyword.split(opts, [:annotations])
annotations = annotations || %{}

case CreateSubscription.execute(
conn,
stream_uuid,
subscription_name,
start_from,
annotations,
opts
) do
{:ok, %Subscription{}} = reply ->
reply

Expand Down Expand Up @@ -96,7 +131,7 @@ defmodule EventStore.Storage.Subscription do
defmodule CreateSubscription do
@moduledoc false

def execute(conn, stream_uuid, subscription_name, start_from, opts) do
def execute(conn, stream_uuid, subscription_name, start_from, annotations, opts) do
Logger.debug(
"Attempting to create subscription on stream " <>
inspect(stream_uuid) <>
Expand All @@ -107,7 +142,12 @@ defmodule EventStore.Storage.Subscription do

query = Statements.insert_subscription(schema)

case Postgrex.query(conn, query, [stream_uuid, subscription_name, start_from], opts) do
case Postgrex.query(
conn,
query,
[stream_uuid, subscription_name, start_from, annotations],
opts
) do
{:ok, %Postgrex.Result{rows: rows}} ->
Logger.debug(
"Created subscription on stream \"#{stream_uuid}\" named \"#{subscription_name}\""
Expand Down Expand Up @@ -208,7 +248,8 @@ defmodule EventStore.Storage.Subscription do
stream_uuid: stream_uuid,
subscription_name: subscription_name,
last_seen: last_seen,
created_at: created_at
created_at: created_at,
annotations: annotations || %{}

Check warning on line 252 in lib/event_store/storage/subscription.ex

View workflow job for this annotation

GitHub Actions / Build and test (1.14.x, 26, ignore)

variable "annotations" does not exist and is being expanded to "annotations()", please use parentheses to remove the ambiguity or change the variable name

Check failure on line 252 in lib/event_store/storage/subscription.ex

View workflow job for this annotation

GitHub Actions / Build and test (1.14.x, 26, ignore)

** (CompileError) lib/event_store/storage/subscription.ex:252: undefined function annotations/0 (expected EventStore.Storage.Subscription.Adapter to define such a function or for it to be imported, but none are available)

Check warning on line 252 in lib/event_store/storage/subscription.ex

View workflow job for this annotation

GitHub Actions / Build and test (1.13.x, 25, ignore)

variable "annotations" does not exist and is being expanded to "annotations()", please use parentheses to remove the ambiguity or change the variable name

Check failure on line 252 in lib/event_store/storage/subscription.ex

View workflow job for this annotation

GitHub Actions / Build and test (1.13.x, 25, ignore)

** (CompileError) lib/event_store/storage/subscription.ex:252: undefined function annotations/0 (expected EventStore.Storage.Subscription.Adapter to define such a function or for it to be imported, but none are available)
}
end
end
Expand Down
7 changes: 7 additions & 0 deletions priv/event_store/migrations/v1.4.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Add annotations field to subscriptions table

ALTER TABLE subscriptions
ADD COLUMN annotations jsonb DEFAULT '{}'::jsonb NOT NULL;

-- Add index on annotations for better query performance
CREATE INDEX subscriptions_annotations_idx ON subscriptions USING gin (annotations);
45 changes: 43 additions & 2 deletions test/storage/subscription_persistence_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,57 @@ defmodule EventStore.Storage.SubscriptionPersistenceTest do
verify_subscription(subscription, 1)
end

test "create subscription with annotations", context do
annotations = %{"key" => "value", "nested" => %{"data" => 123}}
{:ok, subscription} = subscribe_to_stream(context, annotations: annotations)

verify_subscription(subscription)
assert subscription.annotations == annotations
end

test "create subscription with default empty annotations", context do
{:ok, subscription} = subscribe_to_stream(context)

verify_subscription(subscription)
assert subscription.annotations == %{}
end

test "create subscription when already exists preserves annotations", context do
# First create with annotations
initial_annotations = %{"key" => "value", "metadata" => %{"version" => 1}}
{:ok, subscription1} = subscribe_to_stream(context, annotations: initial_annotations)

# Then try different scenarios that should all preserve the original annotations
# No annotations provided
{:ok, subscription2} = subscribe_to_stream(context)
# Explicit nil
{:ok, subscription3} = subscribe_to_stream(context, annotations: nil)
# Empty map
{:ok, subscription4} = subscribe_to_stream(context, annotations: %{})

# Verify all subscriptions are the same
assert subscription1.subscription_id == subscription2.subscription_id
assert subscription2.subscription_id == subscription3.subscription_id
assert subscription3.subscription_id == subscription4.subscription_id

# Verify annotations are preserved in all cases
assert subscription1.annotations == initial_annotations
assert subscription2.annotations == initial_annotations
assert subscription3.annotations == initial_annotations
assert subscription4.annotations == initial_annotations
end

def ack_last_seen_event(context, last_seen) do
%{conn: conn, schema: schema} = context

Storage.ack_last_seen_event(conn, @all_stream, @subscription_name, last_seen, schema: schema)
end

defp subscribe_to_stream(context) do
defp subscribe_to_stream(context, opts \\ []) do
%{conn: conn, schema: schema} = context
opts = Keyword.merge([schema: schema], opts)

Storage.subscribe_to_stream(conn, @all_stream, @subscription_name, schema: schema)
Storage.subscribe_to_stream(conn, @all_stream, @subscription_name, opts)
end

defp delete_subscription(context) do
Expand Down
3 changes: 2 additions & 1 deletion test/subscriptions/subscription_recovery_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ defmodule EventStore.Subscriptions.SubscriptionRecoveryTest do
defp subscribe_to_all_streams(subscription_name, subscriber, opts) do
{:ok, subscription} = EventStore.subscribe_to_all_streams(subscription_name, subscriber, opts)

assert_receive {:subscribed, ^subscription}
# Increase timeout to 5 seconds to account for potential database operations
assert_receive {:subscribed, ^subscription}, 5_000

{:ok, subscription}
end
Expand Down

0 comments on commit 2a0715c

Please sign in to comment.