Skip to content

Commit

Permalink
Merge pull request #7 from NFIBrokerage/bump-extreme
Browse files Browse the repository at this point in the history
Bump extreme
  • Loading branch information
burmajam authored Nov 14, 2024
2 parents f9b4371 + bc243cc commit 39a2cf5
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 83 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to
[Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## 1.1.0 - 2024-06-18

- Use new `Extreme.ListenerWithBackPressure`

## 1.0.1 - 2024-01-31

- Bump all dependencies including bumping `:extreme` to v1.0.5 which fixes a
Expand Down
7 changes: 7 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
import Config

config :logger, :console,
format: "$time $metadata[$level] $message\n",
level: :debug,
metadata: [:pid, :module, :function]

config :ex_unit, capture_log: true

config :kelvin, ExtremeClient,
db_type: :node,
host: System.get_env("EVENTSTORE_HOST") || "localhost",
Expand Down
87 changes: 41 additions & 46 deletions lib/kelvin/in_order_subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule Kelvin.InOrderSubscription do

defstruct [
:config,
:subscription,
:extreme_listener,
:self,
:max_buffer_size,
demand: 0,
Expand All @@ -53,13 +53,36 @@ defmodule Kelvin.InOrderSubscription do
Keyword.get(
opts,
:catch_up_chunk_size,
Application.get_env(:kelvin, :catch_up_chunk_size, 256)
Application.get_env(:kelvin, :catch_up_chunk_size, 128)
)

connection = Keyword.fetch!(opts, :connection)
stream_name = Keyword.fetch!(opts, :stream_name)

listener_name =
opts
|> Keyword.get(:name, __MODULE__)
|> Module.concat(ExtremeListener)

{:ok, extreme_listener} =
Kelvin.Listener.start_link(connection, stream_name,
read_per_page: max_buffer_size,
auto_subscribe: false,
ack_timeout: :infinity,
name: listener_name,
producer: self(),
get_stream_position_fun: fn ->
opts
|> Keyword.fetch!(:restore_stream_position!)
|> _do_function()
end
)

state = %__MODULE__{
extreme_listener: extreme_listener,
config: Map.new(opts),
self: Keyword.get(opts, :name, self()),
max_buffer_size: max_buffer_size
max_buffer_size: max_buffer_size * 2
}

Process.send_after(
Expand All @@ -75,7 +98,7 @@ defmodule Kelvin.InOrderSubscription do
def handle_info(:check_auto_subscribe, state) do
identifier = "#{inspect(__MODULE__)} (#{inspect(state.self)})"

if do_function(state.config.subscribe_on_init?) do
if _do_function(state.config.subscribe_on_init?) do
Logger.info("#{identifier} subscribing to '#{state.config.stream_name}'")

GenStage.async_info(self(), :subscribe)
Expand All @@ -92,37 +115,21 @@ defmodule Kelvin.InOrderSubscription do
end

def handle_info(:subscribe, state) do
if state.subscription do
# coveralls-ignore-start
Logger.warn("#{inspect(__MODULE__)} is already subscribed.")
# coveralls-ignore-stop
else
case subscribe(state) do
{:ok, sub} ->
Process.link(sub)
{:noreply, [], put_in(state.subscription, sub)}

# coveralls-ignore-start
{:error, reason} ->
{:stop, reason, state}
Kelvin.Listener.subscribe(state.extreme_listener)

# coveralls-ignore-stop
end
end
{:noreply, [], state}
end

def handle_info(_info, state), do: {:noreply, [], state}

@impl GenStage
def handle_call({:on_event, event}, from, state) do
# when the current demand is 0, we should
case state do
%{demand: 0, buffer_size: size, max_buffer_size: max}
when size + 1 == max ->
{:noreply, [], enqueue(state, {event, from})}
{:noreply, [], _enqueue(state, {event, from})}

%{demand: 0} ->
{:reply, :ok, [], enqueue(state, event)}
{:reply, :ok, [], _enqueue(state, event)}

%{demand: demand} ->
{:reply, :ok, [{state.self, event}], put_in(state.demand, demand - 1)}
Expand All @@ -131,26 +138,26 @@ defmodule Kelvin.InOrderSubscription do

@impl GenStage
def handle_demand(demand, state) do
dequeue_events(state, demand, [])
_dequeue_events(state, demand, [])
end

defp dequeue_events(%{buffer_size: size} = state, demand, events)
defp _dequeue_events(%{buffer_size: size} = state, demand, events)
when size == 0 or demand == 0 do
{:noreply, :lists.reverse(events), put_in(state.demand, demand)}
end

defp dequeue_events(state, demand, events) do
case dequeue(state) do
defp _dequeue_events(state, demand, events) do
case _dequeue(state) do
{{:value, {event, from}}, state} ->
GenStage.reply(from, :ok)
dequeue_events(state, demand - 1, [{state.self, event} | events])
_dequeue_events(state, demand - 1, [{state.self, event} | events])

{{:value, event}, state} ->
dequeue_events(state, demand - 1, [{state.self, event} | events])
_dequeue_events(state, demand - 1, [{state.self, event} | events])
end
end

defp dequeue(state) do
defp _dequeue(state) do
case :queue.out(state.buffer) do
# coveralls-ignore-start
{:empty, buffer} ->
Expand All @@ -162,25 +169,13 @@ defmodule Kelvin.InOrderSubscription do
end
end

defp subscribe(state) do
state.config.connection
|> Extreme.RequestManager._name()
|> GenServer.call(
{:read_and_stay_subscribed, self(),
{state.config.stream_name,
do_function(state.config.restore_stream_position!) + 1,
state.max_buffer_size, true, false, :infinity}},
:infinity
)
end

defp do_function(func) when is_function(func, 0), do: func.()
defp _do_function(func) when is_function(func, 0), do: func.()

defp do_function({m, f, a}) when is_atom(m) and is_atom(f) and is_list(a) do
defp _do_function({m, f, a}) when is_atom(m) and is_atom(f) and is_list(a) do
apply(m, f, a)
end

defp enqueue(state, element) do
defp _enqueue(state, element) do
%{
state
| buffer: :queue.in(element, state.buffer),
Expand Down
22 changes: 22 additions & 0 deletions lib/kelvin/listener.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule Kelvin.Listener do
@moduledoc false
use Extreme.ListenerWithBackPressure

@impl Extreme.ListenerWithBackPressure
def on_init(opts) do
state = %{
producer: Keyword.fetch!(opts, :producer),
get_stream_position_fun: Keyword.fetch!(opts, :get_stream_position_fun)
}

{:ok, state}
end

@impl Extreme.ListenerWithBackPressure
def get_last_event(_stream_name, %{} = state),
do: state.get_stream_position_fun.()

@impl Extreme.ListenerWithBackPressure
def process_push(push, _stream_name, %{} = state),
do: GenServer.call(state.producer, {:on_event, push})
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ defmodule Kelvin.MixProject do
defp deps do
[
{:gen_stage, "~> 1.0"},
{:extreme, "~> 1.0 and >= 1.0.5"},
{:extreme, "~> 1.1.0"},
# docs
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
# test
Expand Down
8 changes: 4 additions & 4 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
"ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"},
"excoveralls": {:hex, :excoveralls, "0.18.0", "b92497e69465dc51bc37a6422226ee690ab437e4c06877e836f1c18daeb35da9", [:mix], [{:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "1109bb911f3cb583401760be49c02cbbd16aed66ea9509fc5479335d284da60b"},
"exprotobuf": {:hex, :exprotobuf, "1.2.17", "3003937da617f588a8fb63ebdd7b127a18d78d6502623c272076fd54c07c4de1", [:mix], [{:gpb, "~> 4.0", [hex: :gpb, repo: "hexpm", optional: false]}], "hexpm", "e07ec1e5ae6f8c1c8521450d5f6b658c8c700b1f34c70356e91ece0766f4361a"},
"extreme": {:hex, :extreme, "1.0.5", "fafb04fb514ed63667cdd9385b313d7c67aa10887a9f8f1f290cb721d1ee0e48", [:mix], [{:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:exprotobuf, "~> 1.2.9", [hex: :exprotobuf, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "84588993fcb8f410a3b90a2defaf966d50763a1f9db665a83e0546a34335fa45"},
"extreme": {:hex, :extreme, "1.1.0", "ca31dd0e983e888659f8984e1bac73744f35f98064c1f0743426863a8293e2f7", [:mix], [{:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:exprotobuf, "~> 1.2.9", [hex: :exprotobuf, repo: "hexpm", optional: false]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "96332ba5ab11dbfb0c4d75cd6be8ad5a7f6f3133a1dc80c6850b86d60b3a2e6a"},
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
"gen_stage": {:hex, :gen_stage, "1.2.1", "19d8b5e9a5996d813b8245338a28246307fd8b9c99d1237de199d21efc4c76a1", [:mix], [], "hexpm", "83e8be657fa05b992ffa6ac1e3af6d57aa50aace8f691fcf696ff02f8335b001"},
"gpb": {:hex, :gpb, "4.21.0", "7a2eb8dd0f3032b7b46b04dcdb490ffabe43eab3a9a1f905bd03c9ec35babb0f", [:make, :rebar3], [], "hexpm", "da45984d26048d8d508d3bbffa6f4a5a5163841cefbf40809622bf92b4640de4"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"gpb": {:hex, :gpb, "4.21.1", "72e229c242d252d690addcfd04a6416c26c4d4d2c3521e05570a7a78b48d3bd1", [:make, :rebar3], [], "hexpm", "c05c9aea9e25bd341367a43b3d3eb68e951563911072259c5ec4cb6642f4ef22"},
"jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"},
"makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.3", "d684f4bac8690e70b06eb52dad65d26de2eefa44cd19d64a8095e1417df7c8fd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "b78dc853d2e670ff6390b605d807263bf606da3c82be37f9d7f68635bd886fc9"},
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"},
}
71 changes: 39 additions & 32 deletions test/kelvin/in_order_subscription_test.exs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
defmodule Kelvin.InOrderSubscriptionTest do
use ExUnit.Case, async: true

@moduletag :capture_log

alias Extreme.Messages

setup do
Expand All @@ -14,7 +12,7 @@ defmodule Kelvin.InOrderSubscriptionTest do

describe "given events have been written to a stream" do
setup c do
write_events(0..100, c.stream_name)
_write_events(0..100, c.stream_name)
:ok
end

Expand All @@ -33,7 +31,7 @@ defmodule Kelvin.InOrderSubscriptionTest do
assert event.event.data == to_string(n)
end

write_events(101..200, c.stream_name)
_write_events(101..200, c.stream_name)

for n <- 101..200 do
assert_receive {:events, [event]}, 1_000
Expand Down Expand Up @@ -65,15 +63,7 @@ defmodule Kelvin.InOrderSubscriptionTest do

assert_receive {:DOWN, ^monitor_ref, _, _, _}

# we're hardcoding the restore_stream_position! function so this will
# restart from 0 instead of the current stream position as would be the
# case in a real-life system
for n <- 0..100 do
assert_receive {:events, [event]}, 10_000
assert event.event.data == to_string(n)
end

write_events(101..200, c.stream_name)
_write_events(101..200, c.stream_name)

for n <- 101..200 do
assert_receive {:events, [event]}, 1_000
Expand All @@ -83,28 +73,41 @@ defmodule Kelvin.InOrderSubscriptionTest do
end

describe "given only a few events have been written to a stream" do
setup c do
write_events(0..10, c.stream_name)
:ok
end

test "a slow subscription catches up", c do
total_events = 100

opts = [
producer_name: c.producer_name,
stream_name: c.stream_name,
restore_stream_position!: &restore_stream_position!/0,
test_proc: self(),
# note how we add an artificial bottleneck to the consumer here
sleep_time: 100,
sleep_time: 10,
# and tune down the catch-up (and therefore max buffer queue size)
catch_up_chunk_size: 1
catch_up_chunk_size: 1,
subscribe_after: 1
# in order to simulate a consumer which is slow and get coverage
# on the supply-buffering we do with the queue
]

start_supervised!({MyInOrderSupervisor, opts})

for n <- 0..10 do
spawn(fn ->
Process.sleep(200)
_write_events(0..total_events, c.stream_name)
end)

for n <- 0..total_events do
assert_receive {:events, [event]}, 6_000
assert event.event.data == to_string(n)
end

spawn(fn ->
Process.sleep(200)
_write_events(0..total_events, c.stream_name)
end)

for n <- 0..total_events do
assert_receive {:events, [event]}, 6_000
assert event.event.data == to_string(n)
end
Expand All @@ -113,19 +116,23 @@ defmodule Kelvin.InOrderSubscriptionTest do

defp restore_stream_position!, do: -1

defp write_events(range, stream) do
defp _write_events(range, stream) do
range
|> Enum.map(fn n ->
Messages.NewEvent.new(
event_id: Extreme.Tools.generate_uuid(),
event_type: "kelvin_test_event",
data_content_type: 1,
metadata_content_type: 1,
# valid JSON
data: to_string(n),
metadata: "{}"
)
Process.sleep(5)

[
Messages.NewEvent.new(
event_id: Extreme.Tools.generate_uuid(),
event_type: "kelvin_test_event",
data_content_type: 1,
metadata_content_type: 1,
# valid JSON
data: to_string(n),
metadata: "{}"
)
]
|> ExtremeClient.append_events(stream)
end)
|> ExtremeClient.append_events(stream)
end
end

0 comments on commit 39a2cf5

Please sign in to comment.