From aa4cebbdd91d2ac1dfb660b564d95b43df207d93 Mon Sep 17 00:00:00 2001 From: Kayla Brady <31781298+KaylaBrady@users.noreply.github.com> Date: Thu, 21 Nov 2024 14:18:54 -0500 Subject: [PATCH] refactor: common PubSub functions + macro (#244) * refactor: common PubSub functions + macro * cleanup: remove unused broadcast_message_name opt * refactor: broadcast_interval_ms as module attribute --- lib/mobile_app_backend/alerts/pub_sub.ex | 116 +++-------------- lib/mobile_app_backend/predictions/pub_sub.ex | 111 +++------------- lib/mobile_app_backend/pub_sub.ex | 119 ++++++++++++++++++ lib/mobile_app_backend/vehicles/pub_sub.ex | 115 +++-------------- test/mobile_app_backend/pub_sub_test.exs | 61 +++++++++ 5 files changed, 230 insertions(+), 292 deletions(-) create mode 100644 lib/mobile_app_backend/pub_sub.ex create mode 100644 test/mobile_app_backend/pub_sub_test.exs diff --git a/lib/mobile_app_backend/alerts/pub_sub.ex b/lib/mobile_app_backend/alerts/pub_sub.ex index a848d632..e68749ec 100644 --- a/lib/mobile_app_backend/alerts/pub_sub.ex +++ b/lib/mobile_app_backend/alerts/pub_sub.ex @@ -16,22 +16,17 @@ defmodule MobileAppBackend.Alerts.PubSub do 1. Regularly scheduled interval - configured by `:alerts_broadcast_interval_ms` 2. When there is a reset event of the underlying alert stream. """ - use GenServer + use MobileAppBackend.PubSub, + broadcast_interval_ms: + Application.compile_env(:mobile_app_backend, :alerts_broadcast_interval_ms, 500) + alias MBTAV3API.{JsonApi, Store, Stream} alias MobileAppBackend.Alerts.PubSub @behaviour PubSub.Behaviour - require Logger - @fetch_registry_key :fetch_registry_key - @typedoc """ - tuple {fetch_keys, format_fn} where format_fn transforms the data returned - into the format expected by subscribers. - """ - @type registry_value :: {Store.fetch_keys(), function()} - @type state :: %{last_dispatched_table_name: atom()} @spec start_link(Keyword.t()) :: GenServer.on_start() @@ -66,7 +61,6 @@ defmodule MobileAppBackend.Alerts.PubSub do @impl GenServer def init(opts \\ []) do Stream.StaticInstance.subscribe("alerts:to_store") - broadcast_timer(50) create_table_fn = @@ -78,102 +72,24 @@ defmodule MobileAppBackend.Alerts.PubSub do create_table_fn.() end - @impl true - # Any time there is a reset_event, broadcast so that subscribers are immediately - # notified of the changes. This way, when the stream first starts, - # consumers don't have to wait `:alerts_broadcast_interval_ms` to receive their first message. - def handle_info(:reset_event, state) do - send(self(), :broadcast) - {:noreply, state, :hibernate} - end - - def handle_info(:timed_broadcast, state) do - send(self(), :broadcast) - broadcast_timer() - {:noreply, state, :hibernate} - end - @impl GenServer def handle_info(:broadcast, %{last_dispatched_table_name: last_dispatched} = state) do Registry.dispatch(MobileAppBackend.Alerts.Registry, @fetch_registry_key, fn entries -> - Enum.group_by( - entries, - fn {_, {fetch_keys, format_fn}} -> {fetch_keys, format_fn} end, - fn {pid, _} -> pid end - ) - |> Enum.each(fn {registry_value, pids} -> - broadcast_new_alerts(registry_value, pids, last_dispatched) + entries + |> MobileAppBackend.PubSub.group_pids_by_target_data() + |> Enum.each(fn {{fetch_keys, format_fn} = registry_value, pids} -> + fetch_keys + |> Store.Alerts.fetch() + |> format_fn.() + |> MobileAppBackend.PubSub.broadcast_latest_data( + :new_alerts, + registry_value, + pids, + last_dispatched + ) end) end) {:noreply, state, :hibernate} end - - defp broadcast_new_alerts( - {fetch_keys, format_fn} = registry_value, - pids, - last_dispatched_table_name - ) do - latest_data = - fetch_keys - |> Store.Alerts.fetch() - |> format_fn.() - - last_dispatched_entry = :ets.lookup(last_dispatched_table_name, registry_value) - - if !already_broadcast(last_dispatched_entry, latest_data) do - broadcast(pids, latest_data, registry_value, last_dispatched_table_name) - end - end - - defp broadcast( - pids, - data, - {fetch_keys, _format_fn} = registry_value, - last_dispatched_table_name - ) do - Logger.info("#{__MODULE__} broadcasting to pids len=#{length(pids)}") - - {time_micros, _result} = - :timer.tc(__MODULE__, :broadcast_to_pids, [ - pids, - data - ]) - - Logger.info( - "#{__MODULE__} broadcast_to_pids fetch_keys=#{inspect(fetch_keys)} duration=#{time_micros / 1000}" - ) - - :ets.insert(last_dispatched_table_name, {registry_value, data}) - end - - defp already_broadcast([], _latest_data) do - # Nothing has been broadcast yet - false - end - - defp already_broadcast([{_registry_key, old_data}], latest_data) do - old_data == latest_data - end - - def broadcast_to_pids(pids, data) do - Enum.each( - pids, - &send( - &1, - {:new_alerts, data} - ) - ) - end - - defp broadcast_timer do - interval = - Application.get_env(:mobile_app_backend, :alerts_broadcast_interval_ms, 500) - - broadcast_timer(interval) - end - - defp broadcast_timer(interval) do - Process.send_after(self(), :timed_broadcast, interval) - end end diff --git a/lib/mobile_app_backend/predictions/pub_sub.ex b/lib/mobile_app_backend/predictions/pub_sub.ex index a3130906..c05a7e41 100644 --- a/lib/mobile_app_backend/predictions/pub_sub.ex +++ b/lib/mobile_app_backend/predictions/pub_sub.ex @@ -45,22 +45,20 @@ defmodule MobileAppBackend.Predictions.PubSub do Based on https://github.com/mbta/dotcom/blob/main/lib/predictions/pub_sub.ex """ - use GenServer + use MobileAppBackend.PubSub, + broadcast_interval_ms: + Application.compile_env(:mobile_app_backend, :predictions_broadcast_interval_ms, 10_000) + alias MBTAV3API.{Prediction, Stop, Store, Stream, Trip, Vehicle} alias MobileAppBackend.GlobalDataCache alias MobileAppBackend.Predictions.PubSub - @behaviour PubSub.Behaviour - require Logger + @behaviour PubSub.Behaviour + @fetch_registry_key :fetch_registry_key - @typedoc """ - tuple {fetch_keys, format_fn} where format_fn transforms the data returned - from fetching predictions from the store into the format expected by subscribers. - """ - @type registry_value :: {Store.fetch_keys(), function()} @type broadcast_message :: {:new_predictions, %{ @@ -260,97 +258,24 @@ defmodule MobileAppBackend.Predictions.PubSub do create_table_fn.() end - @impl true - # Any time there is a reset_event, broadcast so that subscribers are immediately - # notified of the changes. This way, when a prediction stream first starts, - # consumers don't have to wait `:predictions_broadcast_interval_ms` to receive their first message. - def handle_info(:reset_event, state) do - send(self(), :broadcast) - {:noreply, state, :hibernate} - end - - def handle_info(:timed_broadcast, state) do - send(self(), :broadcast) - broadcast_timer() - {:noreply, state, :hibernate} - end - @impl GenServer def handle_info(:broadcast, %{last_dispatched_table_name: last_dispatched} = state) do Registry.dispatch(MobileAppBackend.Predictions.Registry, @fetch_registry_key, fn entries -> - Enum.group_by( - entries, - fn {_, {fetch_keys, format_fn}} -> {fetch_keys, format_fn} end, - fn {pid, {_, _}} -> pid end - ) - |> Enum.each(fn {registry_value, pids} -> - broadcast_new_predictions(registry_value, pids, last_dispatched) + entries + |> MobileAppBackend.PubSub.group_pids_by_target_data() + |> Enum.each(fn {{fetch_keys, format_fn} = registry_value, pids} -> + fetch_keys + |> Store.Predictions.fetch_with_associations() + |> format_fn.() + |> MobileAppBackend.PubSub.broadcast_latest_data( + :new_predictions, + registry_value, + pids, + last_dispatched + ) end) end) {:noreply, state, :hibernate} end - - defp broadcast_new_predictions( - {fetch_keys, format_fn} = registry_value, - pids, - last_dispatched_table_name - ) do - new_predictions = - fetch_keys - |> Store.Predictions.fetch_with_associations() - |> format_fn.() - - last_dispatched_entry = :ets.lookup(last_dispatched_table_name, registry_value) - - if !predictions_already_broadcast(last_dispatched_entry, new_predictions) do - broadcast_predictions(pids, new_predictions, registry_value, last_dispatched_table_name) - end - end - - defp broadcast_predictions(pids, predictions, registry_value, last_dispatched_table_name) do - Logger.info("#{__MODULE__} broadcasting to pids len=#{length(pids)}") - - {time_micros, _result} = - :timer.tc(__MODULE__, :broadcast_to_pids, [ - pids, - predictions - ]) - - Logger.info( - "#{__MODULE__} broadcast_to_pids fetch_keys=#{inspect(elem(registry_value, 0))}duration=#{time_micros / 1000}" - ) - - :ets.insert(last_dispatched_table_name, {registry_value, predictions}) - end - - defp predictions_already_broadcast([], _new_preidctions) do - # Nothing has been broadcast yet - false - end - - defp predictions_already_broadcast([{_registry_key, last_predictions}], new_predictions) do - last_predictions == new_predictions - end - - def broadcast_to_pids(pids, predictions) do - Enum.each( - pids, - &send( - &1, - {:new_predictions, predictions} - ) - ) - end - - defp broadcast_timer do - interval = - Application.get_env(:mobile_app_backend, :predictions_broadcast_interval_ms, 10_000) - - broadcast_timer(interval) - end - - defp broadcast_timer(interval) do - Process.send_after(self(), :timed_broadcast, interval) - end end diff --git a/lib/mobile_app_backend/pub_sub.ex b/lib/mobile_app_backend/pub_sub.ex new file mode 100644 index 00000000..62b4a96c --- /dev/null +++ b/lib/mobile_app_backend/pub_sub.ex @@ -0,0 +1,119 @@ +defmodule MobileAppBackend.PubSub do + @moduledoc """ + Common functions for broadcasting the latest state from realtime MBTAV3API.Stores + to subscriber processes if the data has changed. + """ + alias MBTAV3API.Store + require Logger + + @typedoc """ + tuple {fetch_keys, format_fn} where format_fn transforms the data returned + into the format expected by subscribers. + """ + @type registry_value :: {Store.fetch_keys(), function()} + + @doc """ + Group registered pids by the data they are subscribed to. + """ + @spec group_pids_by_target_data([{pid(), registry_value()}]) :: %{registry_value() => [pid()]} + def group_pids_by_target_data(registry_entries) do + Enum.group_by( + registry_entries, + fn {_, {fetch_keys, format_fn}} -> {fetch_keys, format_fn} end, + fn {pid, _} -> pid end + ) + end + + @doc """ + Broadcast the latest data to subscriber pids if the data has changed since they last received it. + """ + @spec broadcast_latest_data(any(), atom(), registry_value(), [pid()], atom()) :: any() + def broadcast_latest_data( + latest_data, + broadcast_message_name, + registry_value, + pids, + last_dispatched_table_name + ) do + last_dispatched_entry = :ets.lookup(last_dispatched_table_name, registry_value) + + if !already_broadcast(last_dispatched_entry, latest_data) do + broadcast( + pids, + latest_data, + broadcast_message_name, + registry_value, + last_dispatched_table_name + ) + end + end + + defp already_broadcast([], _latest_data) do + # Nothing has been broadcast yet + false + end + + defp already_broadcast([{_registry_key, old_data}], latest_data) do + old_data == latest_data + end + + defp broadcast( + pids, + data, + broadcast_message_name, + {fetch_keys, _format_fn} = registry_value, + last_dispatched_table_name + ) do + Logger.info("#{__MODULE__} broadcasting to pids len=#{length(pids)}") + + {time_micros, _result} = + :timer.tc(__MODULE__, :broadcast_to_pids, [ + pids, + data, + broadcast_message_name + ]) + + Logger.info( + "#{__MODULE__} broadcast_to_pids fetch_keys=#{inspect(fetch_keys)} duration=#{time_micros / 1000}" + ) + + :ets.insert(last_dispatched_table_name, {registry_value, data}) + end + + def broadcast_to_pids(pids, data, broadcast_message_name) do + Enum.each( + pids, + &send( + &1, + {broadcast_message_name, data} + ) + ) + end + + defmacro __using__(opts) do + quote location: :keep, bind_quoted: [opts: opts] do + use GenServer + + @broadcast_interval_ms Keyword.fetch!(opts, :broadcast_interval_ms) + + # Any time there is a reset_event, broadcast so that subscribers are immediately + # notified of the changes. This way, when the stream first starts, + # consumers don't have to wait `:broadcast_interval_ms` to receive their first message. + @impl true + def handle_info(:reset_event, state) do + send(self(), :broadcast) + {:noreply, state, :hibernate} + end + + def handle_info(:timed_broadcast, state) do + send(self(), :broadcast) + broadcast_timer(@broadcast_interval_ms) + {:noreply, state, :hibernate} + end + + defp broadcast_timer(interval) do + Process.send_after(self(), :timed_broadcast, interval) + end + end + end +end diff --git a/lib/mobile_app_backend/vehicles/pub_sub.ex b/lib/mobile_app_backend/vehicles/pub_sub.ex index 71dc02e0..95c4318b 100644 --- a/lib/mobile_app_backend/vehicles/pub_sub.ex +++ b/lib/mobile_app_backend/vehicles/pub_sub.ex @@ -22,22 +22,17 @@ defmodule MobileAppBackend.Vehicles.PubSub do 1. Regularly scheduled interval - configured by `:vehicles_broadcast_interval_ms` 2. When there is a reset event of the underlying vehicle stream. """ - use GenServer + use MobileAppBackend.PubSub, + broadcast_interval_ms: + Application.compile_env(:mobile_app_backend, :vehicles_broadcast_interval_ms, 500) + alias MBTAV3API.{Store, Stream} alias MobileAppBackend.Vehicles.PubSub @behaviour PubSub.Behaviour - require Logger - @fetch_registry_key :fetch_registry_key - @typedoc """ - tuple {fetch_keys, format_fn} where format_fn transforms the data returned - into the format expected by subscribers. - """ - @type registry_value :: {Store.fetch_keys(), function()} - @type state :: %{last_dispatched_table_name: atom()} @spec start_link(Keyword.t()) :: GenServer.on_start() @@ -102,102 +97,24 @@ defmodule MobileAppBackend.Vehicles.PubSub do create_table_fn.() end - @impl true - # Any time there is a reset_event, broadcast so that subscribers are immediately - # notified of the changes. This way, when the vehicle stream first starts, - # consumers don't have to wait `:vehicles_broadcast_interval_ms` to receive their first message. - def handle_info(:reset_event, state) do - send(self(), :broadcast) - {:noreply, state, :hibernate} - end - - def handle_info(:timed_broadcast, state) do - send(self(), :broadcast) - broadcast_timer() - {:noreply, state, :hibernate} - end - @impl GenServer def handle_info(:broadcast, %{last_dispatched_table_name: last_dispatched} = state) do Registry.dispatch(MobileAppBackend.Vehicles.Registry, @fetch_registry_key, fn entries -> - Enum.group_by( - entries, - fn {_, {fetch_keys, format_fn}} -> {fetch_keys, format_fn} end, - fn {pid, _} -> pid end - ) - |> Enum.each(fn {registry_value, pids} -> - broadcast_new_vehicles(registry_value, pids, last_dispatched) + entries + |> MobileAppBackend.PubSub.group_pids_by_target_data() + |> Enum.each(fn {{fetch_keys, format_fn} = registry_value, pids} -> + fetch_keys + |> Store.Vehicles.fetch() + |> format_fn.() + |> MobileAppBackend.PubSub.broadcast_latest_data( + :new_vehicles, + registry_value, + pids, + last_dispatched + ) end) end) {:noreply, state, :hibernate} end - - defp broadcast_new_vehicles( - {fetch_keys, format_fn} = registry_value, - pids, - last_dispatched_table_name - ) do - new_vehicles = - fetch_keys - |> Store.Vehicles.fetch() - |> format_fn.() - - last_dispatched_entry = :ets.lookup(last_dispatched_table_name, registry_value) - - if !vehicles_already_broadcast(last_dispatched_entry, new_vehicles) do - broadcast_vehicles(pids, new_vehicles, registry_value, last_dispatched_table_name) - end - end - - defp broadcast_vehicles( - pids, - vehicles, - {fetch_keys, _format_fn} = registry_value, - last_dispatched_table_name - ) do - Logger.info("#{__MODULE__} broadcasting to pids len=#{length(pids)}") - - {time_micros, _result} = - :timer.tc(__MODULE__, :broadcast_to_pids, [ - pids, - vehicles - ]) - - Logger.info( - "#{__MODULE__} broadcast_to_pids fetch_keys=#{inspect(fetch_keys)} duration=#{time_micros / 1000}" - ) - - :ets.insert(last_dispatched_table_name, {registry_value, vehicles}) - end - - defp vehicles_already_broadcast([], _new_vehicles) do - # Nothing has been broadcast yet - false - end - - defp vehicles_already_broadcast([{_registry_key, last_vehicles}], new_vehicles) do - last_vehicles == new_vehicles - end - - def broadcast_to_pids(pids, vehicles) do - Enum.each( - pids, - &send( - &1, - {:new_vehicles, vehicles} - ) - ) - end - - defp broadcast_timer do - interval = - Application.get_env(:mobile_app_backend, :vehicles_broadcast_interval_ms, 500) - - broadcast_timer(interval) - end - - defp broadcast_timer(interval) do - Process.send_after(self(), :timed_broadcast, interval) - end end diff --git a/test/mobile_app_backend/pub_sub_test.exs b/test/mobile_app_backend/pub_sub_test.exs new file mode 100644 index 00000000..adbdfee7 --- /dev/null +++ b/test/mobile_app_backend/pub_sub_test.exs @@ -0,0 +1,61 @@ +defmodule MobileAppBackend.PubSubTests do + use ExUnit.Case + + alias MobileAppBackend.PubSub + + setup do + _dispatched_table = :ets.new(:fake_last_dispatched, [:set, :named_table]) + :ok + end + + describe "group_pids_by_target_data/1" do + test "groups by registy value" do + assert %{ + {:fetch_1, :format_1} => [:pid_1, :pid_2], + {:fetch_1, :format_2} => [:pid_3], + {:fetch_2, :format_1} => [:pid_4] + } == + PubSub.group_pids_by_target_data([ + {:pid_1, {:fetch_1, :format_1}}, + {:pid_2, {:fetch_1, :format_1}}, + {:pid_3, {:fetch_1, :format_2}}, + {:pid_4, {:fetch_2, :format_1}} + ]) + end + end + + describe "broadcast_latest_data/5" do + test "broadcast latest data only broadcasts when data has changed" do + PubSub.broadcast_latest_data( + "latest_data", + :new_data, + {:fetch_keys, :format_fn}, + [self()], + :fake_last_dispatched + ) + + assert_receive {:new_data, "latest_data"} + + # Doesn't re-send the same alerts that have already been seen + PubSub.broadcast_latest_data( + "latest_data", + :new_data, + {:fetch_keys, :format_fn}, + [self()], + :fake_last_dispatched + ) + + refute_receive _ + + PubSub.broadcast_latest_data( + "even_newer_data", + :new_data, + {:fetch_keys, :format_fn}, + [self()], + :fake_last_dispatched + ) + + assert_receive {:new_data, "even_newer_data"} + end + end +end