From a5a95b33c5dd1a239c5171610bec0f11f6137f80 Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Fri, 13 Sep 2024 10:43:47 +0200 Subject: [PATCH] Implement heartbeat check-ins Add an `Appsignal.CheckIn.heartbeat` helper that emits a single heartbeat for the check-in identifier given. When called with `continuous: true` as the second argument, it starts and links a separate Elixir process that emits a heartbeat every thirty seconds. Unlike the equivalent functionality in the Ruby integration, which spawns a thread that will stay alive for the lifetime of the Ruby process, the Elixir process is linked to the process that spawned it, meaning it will be shut down when its parent process is shut down. This allows it to be used to track the lifetime of individual Elixir processes. Additionally, it is also possible to add `Appsignal.CheckIn.Heartbeat` as a child process to a supervisor, meaning its lifetime will be tied to that of the other processes supervised by it. Finally, the functionality seen in the Ruby integration could also be achieved by manually calling `Appsignal.CheckIn.Heartbeat.start/1`, keeping the process unlinked and therefore alive for the entirety of the Elixir node's lifetime, though this is unlikely to be useful in the Elixir process model. 
--- .changesets/add-heartbeat-check-ins.md | 39 ++++++++ config/config.exs | 2 + lib/appsignal/check_in/check_in.ex | 21 +++++ lib/appsignal/check_in/cron.ex | 35 +------ lib/appsignal/check_in/event.ex | 92 ++++++++++++++++++ lib/appsignal/check_in/heartbeat.ex | 39 ++++++++ lib/appsignal/check_in/scheduler.ex | 59 ++++-------- test/appsignal/check_in/check_in_test.exs | 84 ++++++++++++++++- test/appsignal/check_in/event_test.exs | 104 +++++++++++++++++++++ test/appsignal/check_in/scheduler_test.exs | 15 +-- test/support/appsignal/fake_scheduler.ex | 8 ++ 11 files changed, 415 insertions(+), 83 deletions(-) create mode 100644 .changesets/add-heartbeat-check-ins.md create mode 100644 lib/appsignal/check_in/event.ex create mode 100644 lib/appsignal/check_in/heartbeat.ex create mode 100644 test/appsignal/check_in/event_test.exs diff --git a/.changesets/add-heartbeat-check-ins.md b/.changesets/add-heartbeat-check-ins.md new file mode 100644 index 00000000..cd7e42d2 --- /dev/null +++ b/.changesets/add-heartbeat-check-ins.md @@ -0,0 +1,39 @@ +--- +bump: minor +type: add +--- + +Add support for heartbeat check-ins. + +Use the `Appsignal.CheckIn.heartbeat` function to send a single heartbeat check-in event from your application. This can be used, for example, in a `GenServer`'s callback: + +```elixir +@impl true +def handle_cast({:process_job, job}, jobs) do + Appsignal.CheckIn.heartbeat("job_processor") + {:noreply, [job | jobs], {:continue, :process_job}} +end +``` + +Heartbeats are deduplicated and sent asynchronously, without blocking the current process. Regardless of how often the `.heartbeat` function is called, at most one heartbeat with the same identifier will be sent every ten seconds. + +Pass `continuous: true` as the second argument to send heartbeats continuously during the entire lifetime of the current process. 
This can be used, for example, during a `GenServer`'s initialisation: + +```elixir +@impl true +def init(_arg) do + Appsignal.CheckIn.heartbeat("my_genserver", continuous: true) + {:ok, nil} +end +``` + +You can also use `Appsignal.CheckIn.Heartbeat` as a supervisor's child process, in order for heartbeats to be sent continuously during the lifetime of the supervisor. This can be used, for example, during an `Application`'s start: + +```elixir +@impl true +def start(_type, _args) do + Supervisor.start_link([ + {Appsignal.CheckIn.Heartbeat, "my_application"} + ], strategy: :one_for_one, name: MyApplication.Supervisor) +end +``` diff --git a/config/config.exs b/config/config.exs index 983bbadb..ab8ee058 100644 --- a/config/config.exs +++ b/config/config.exs @@ -20,7 +20,9 @@ if Mix.env() in [:bench, :test, :test_no_nif] do config :appsignal, appsignal_span: Appsignal.Test.Span config :appsignal, appsignal_tracer: Appsignal.Test.Tracer config :appsignal, appsignal_tracer_nif: Appsignal.Test.Nif + config :appsignal, deletion_delay: 100 + config :appsignal, appsignal_checkin_forever_interval_milliseconds: 10 config :appsignal, :config, otp_app: :appsignal, diff --git a/lib/appsignal/check_in/check_in.ex b/lib/appsignal/check_in/check_in.ex index 324ca0b3..abe7813d 100644 --- a/lib/appsignal/check_in/check_in.ex +++ b/lib/appsignal/check_in/check_in.ex @@ -1,5 +1,12 @@ defmodule Appsignal.CheckIn do alias Appsignal.CheckIn.Cron + alias Appsignal.CheckIn.Event + + @scheduler Application.compile_env( + :appsignal, + :appsignal_checkin_scheduler, + Appsignal.CheckIn.Scheduler + ) @spec cron(String.t()) :: :ok def cron(identifier) do @@ -16,4 +23,18 @@ defmodule Appsignal.CheckIn do output end + + @spec heartbeat(String.t()) :: :ok + @spec heartbeat(String.t(), continuous: boolean) :: :ok + def heartbeat(identifier) do + @scheduler.schedule(Event.heartbeat(identifier)) + :ok + end + + def heartbeat(identifier, continuous: true) do + 
Appsignal.CheckIn.Heartbeat.start_link(identifier) + :ok + end + + def heartbeat(identifier, _), do: heartbeat(identifier) end diff --git a/lib/appsignal/check_in/cron.ex b/lib/appsignal/check_in/cron.ex index 825f679d..b3bacaac 100644 --- a/lib/appsignal/check_in/cron.ex +++ b/lib/appsignal/check_in/cron.ex @@ -1,6 +1,6 @@ defmodule Appsignal.CheckIn.Cron do alias __MODULE__ - alias Appsignal.CheckIn.Cron.Event + alias Appsignal.CheckIn.Event @scheduler Application.compile_env( :appsignal, @@ -25,40 +25,11 @@ defmodule Appsignal.CheckIn.Cron do @spec start(Cron.t()) :: :ok def start(cron) do - @scheduler.schedule(Event.new(cron, :start)) + @scheduler.schedule(Event.cron(cron, :start)) end @spec finish(Cron.t()) :: :ok def finish(cron) do - @scheduler.schedule(Event.new(cron, :finish)) - end -end - -defmodule Appsignal.CheckIn.Cron.Event do - alias __MODULE__ - alias Appsignal.CheckIn.Cron - - @derive Jason.Encoder - - @type kind :: :start | :finish - @type t :: %Event{ - identifier: String.t(), - digest: String.t(), - kind: kind, - timestamp: integer, - check_in_type: :cron - } - - defstruct [:identifier, :digest, :kind, :timestamp, :check_in_type] - - @spec new(Cron.t(), kind) :: t - def new(%Cron{identifier: identifier, digest: digest}, kind) do - %Event{ - identifier: identifier, - digest: digest, - kind: kind, - timestamp: System.system_time(:second), - check_in_type: :cron - } + @scheduler.schedule(Event.cron(cron, :finish)) end end diff --git a/lib/appsignal/check_in/event.ex b/lib/appsignal/check_in/event.ex new file mode 100644 index 00000000..989e4fc8 --- /dev/null +++ b/lib/appsignal/check_in/event.ex @@ -0,0 +1,92 @@ +defmodule Appsignal.CheckIn.Event do + alias __MODULE__ + alias Appsignal.CheckIn.Cron + + @type kind :: :start | :finish + @type check_in_type :: :cron | :heartbeat + @type t :: %Event{ + identifier: String.t(), + digest: String.t() | nil, + kind: kind | nil, + timestamp: integer, + check_in_type: check_in_type + } + + defstruct 
[:identifier, :digest, :kind, :timestamp, :check_in_type] + + @spec cron(Cron.t(), kind) :: t + def cron(%Cron{identifier: identifier, digest: digest}, kind) do + %Event{ + identifier: identifier, + digest: digest, + kind: kind, + timestamp: System.system_time(:second), + check_in_type: :cron + } + end + + @spec heartbeat(String.t()) :: t + def heartbeat(identifier) do + %Event{ + identifier: identifier, + timestamp: System.system_time(:second), + check_in_type: :heartbeat + } + end + + @spec describe([t]) :: String.t() + def describe([]) do + # This shouldn't happen. + "no check-in events" + end + + def describe([%Event{check_in_type: :cron} = event]) do + "cron check-in `#{event.identifier || "unknown"}` " <> + "#{event.kind || "unknown"} event (digest #{event.digest || "unknown"})" + end + + def describe([%Event{check_in_type: :heartbeat} = event]) do + "heartbeat check-in `#{event.identifier || "unknown"}` event" + end + + def describe([_event]) do + # This shouldn't happen. + "unknown check-in event" + end + + def describe(events) do + "#{Enum.count(events)} check-in events" + end + + @spec redundant?(t, t) :: boolean + def redundant?( + %Event{check_in_type: :cron} = event, + %Event{check_in_type: :cron} = new_event + ) do + # Consider any existing cron check-in event redundant if it has the + # same identifier, digest and kind as the one we're adding. + event.identifier == new_event.identifier && + event.kind == new_event.kind && + event.digest == new_event.digest + end + + def redundant?( + %Event{check_in_type: :heartbeat} = event, + %Event{check_in_type: :heartbeat} = new_event + ) do + # Consider any existing heartbeat check-in event redundant if it has + # the same identifier as the one we're adding. 
+ event.identifier == new_event.identifier + end + + def redundant?(_event, _new_event), do: false +end + +defimpl Jason.Encoder, for: Appsignal.CheckIn.Event do + def encode(%Appsignal.CheckIn.Event{} = event, opts) do + event + |> Map.from_struct() + |> Map.reject(fn {_key, value} -> is_nil(value) end) + |> Jason.Encode.map(opts) + end +end diff --git a/lib/appsignal/check_in/heartbeat.ex b/lib/appsignal/check_in/heartbeat.ex new file mode 100644 index 00000000..c8db5822 --- /dev/null +++ b/lib/appsignal/check_in/heartbeat.ex @@ -0,0 +1,39 @@ +defmodule Appsignal.CheckIn.Heartbeat do + use GenServer, shutdown: :brutal_kill + + @interval_milliseconds Application.compile_env( + :appsignal, + :appsignal_checkin_forever_interval_milliseconds, + 30_000 + ) + + @impl true + def init(identifier) do + {:ok, identifier, {:continue, :heartbeat}} + end + + def start(identifier) do + GenServer.start(__MODULE__, identifier) + end + + def start_link(identifier) do + GenServer.start_link(__MODULE__, identifier) + end + + def heartbeat(identifier) do + GenServer.cast(__MODULE__, {:heartbeat, identifier}) + :ok + end + + @impl true + def handle_continue(:heartbeat, identifier) do + Appsignal.CheckIn.heartbeat(identifier) + Process.send_after(self(), :heartbeat, @interval_milliseconds) + {:noreply, identifier} + end + + @impl true + def handle_info(:heartbeat, identifier) do + {:noreply, identifier, {:continue, :heartbeat}} + end +end diff --git a/lib/appsignal/check_in/scheduler.ex b/lib/appsignal/check_in/scheduler.ex index 412143c8..dc7bb55a 100644 --- a/lib/appsignal/check_in/scheduler.ex +++ b/lib/appsignal/check_in/scheduler.ex @@ -26,7 +26,7 @@ end defmodule Appsignal.CheckIn.Scheduler do use GenServer - alias Appsignal.CheckIn.Cron + alias Appsignal.CheckIn.Event @debounce Application.compile_env( :appsignal, @@ -68,9 +68,7 @@ defmodule Appsignal.CheckIn.Scheduler do if Appsignal.Config.active?() do GenServer.cast(__MODULE__, {:schedule, event}) else - @integration_logger.debug( - "AppSignal not 
active, not scheduling #{describe_events([event])}" - ) + @integration_logger.debug("AppSignal not active, not scheduling #{Event.describe([event])}") end :ok @@ -78,7 +76,7 @@ defmodule Appsignal.CheckIn.Scheduler do @impl true def handle_cast({:schedule, event}, state) do - @integration_logger.trace("Scheduling #{describe_events([event])} to be transmitted") + @integration_logger.trace("Scheduling #{Event.describe([event])} to be transmitted") schedule_transmission(state) @@ -95,7 +93,7 @@ defmodule Appsignal.CheckIn.Scheduler do @impl true def handle_continue({:transmit, events}, state) do - description = describe_events(events) + description = Event.describe(events) config = Appsignal.Config.config() endpoint = "#{config[:logging_endpoint]}/check_ins/json" @@ -150,42 +148,17 @@ defmodule Appsignal.CheckIn.Scheduler do defp add_event(events, event) do # Remove redundant events, keeping the newly added one, which # should be the one with the most recent timestamp. - [event | Enum.reject(events, &redundant_event?(&1, event))] - end - - defp redundant_event?(%Cron.Event{} = event, %Cron.Event{} = new_event) do - # Consider any existing cron check-in event redundant if it has the - # same identifier, digest and kind as the one we're adding. - is_redundant = - event.identifier == new_event.identifier && - event.kind == new_event.kind && - event.digest == new_event.digest - - if is_redundant do - @integration_logger.debug("Replacing previously scheduled #{describe_events([event])}") - end - - is_redundant - end - - defp redundant_event?(_event, _new_event), do: false - - defp describe_events([]) do - # This shouldn't happen. 
- "no check-in events" - end - - defp describe_events(events) when length(events) > 1 do - "#{Enum.count(events)} check-in events" - end - - defp describe_events([%Cron.Event{} = event]) do - "cron check-in `#{event.identifier || "unknown"}` " <> - "#{event.kind || "unknown"} event (digest #{event.digest || "unknown"})" - end - - defp describe_events([_event]) do - # This shouldn't happen. - "unknown check-in event" + [ + event + | Enum.reject(events, fn existing_event -> + is_redundant = Event.redundant?(existing_event, event) + + if is_redundant do + @integration_logger.debug("Replacing previously scheduled #{Event.describe([event])}") + end + + is_redundant + end) + ] end end diff --git a/test/appsignal/check_in/check_in_test.exs b/test/appsignal/check_in/check_in_test.exs index 756eabd7..8a289417 100644 --- a/test/appsignal/check_in/check_in_test.exs +++ b/test/appsignal/check_in/check_in_test.exs @@ -2,11 +2,21 @@ defmodule Appsignal.CheckInTest do use ExUnit.Case alias Appsignal.CheckIn alias Appsignal.CheckIn.Cron - alias Appsignal.CheckIn.Cron.Event + alias Appsignal.CheckIn.Event + alias Appsignal.CheckIn.Forever alias Appsignal.FakeScheduler + import AppsignalTest.Utils, only: [until: 1] setup do start_supervised!(FakeScheduler) + + on_exit(fn -> + # Restart the forever scheduler in between tests to clear + # any `Process.send_after/3` messages queued. 
+ Supervisor.terminate_child(Appsignal.Supervisor, Forever) + Supervisor.restart_child(Appsignal.Supervisor, Forever) + end) + :ok end @@ -65,6 +75,78 @@ defmodule Appsignal.CheckInTest do end end + describe "heartbeat/1" do + test "transmits a heartbeat event" do + CheckIn.heartbeat("heartbeat-name") + + assert [ + %Event{identifier: "heartbeat-name", check_in_type: :heartbeat} + ] = FakeScheduler.scheduled() + end + end + + describe "heartbeat/2, with continuous: true" do + test "continuously transmits heartbeat events" do + CheckIn.heartbeat("heartbeat-name", continuous: true) + + until(fn -> + assert [ + %Event{identifier: "heartbeat-name", check_in_type: :heartbeat} + ] = FakeScheduler.scheduled() + end) + + until(fn -> + assert [ + %Event{identifier: "heartbeat-name", check_in_type: :heartbeat}, + %Event{identifier: "heartbeat-name", check_in_type: :heartbeat} + ] = FakeScheduler.scheduled() + end) + end + + test "is linked to the caller process" do + CheckIn.heartbeat("timer", continuous: true) + + {:ok, agent} = + Agent.start(fn -> + CheckIn.heartbeat("agent", continuous: true) + end) + + until(fn -> + assert %{"timer" => 2, "agent" => 2} = FakeScheduler.identifier_count() + end) + + Process.exit(agent, :kill) + + until(fn -> + assert %{"timer" => 4, "agent" => 2} = FakeScheduler.identifier_count() + end) + end + end + + describe "Appsignal.CheckIn.Heartbeat" do + test "can be added to a supervisor" do + CheckIn.heartbeat("timer", continuous: true) + + {:ok, supervisor} = + Supervisor.start_link( + [ + {Appsignal.CheckIn.Heartbeat, "supervisor"} + ], + strategy: :one_for_one + ) + + until(fn -> + assert %{"timer" => 2, "supervisor" => 2} = FakeScheduler.identifier_count() + end) + + Supervisor.stop(supervisor) + + until(fn -> + assert %{"timer" => 4, "supervisor" => 2} = FakeScheduler.identifier_count() + end) + end + end + describe "deprecated heartbeat functions" do test "forwards heartbeat/1 to CheckIn.cron/1" do Appsignal.heartbeat("heartbeat-name") 
diff --git a/test/appsignal/check_in/event_test.exs b/test/appsignal/check_in/event_test.exs new file mode 100644 index 00000000..4ba774b0 --- /dev/null +++ b/test/appsignal/check_in/event_test.exs @@ -0,0 +1,104 @@ +defmodule Appsignal.CheckInEventTest do + use ExUnit.Case + alias Appsignal.CheckIn.Event + alias Appsignal.CheckIn.Cron + + describe "describe/1" do + test "describes a list of many events" do + events = [ + Event.cron(%Cron{identifier: "cron-checkin-name", digest: "digest"}, :start), + Event.cron(%Cron{identifier: "cron-checkin-name", digest: "digest"}, :finish), + Event.heartbeat("heartbeat-checkin-name") + ] + + assert "3 check-in events" == Event.describe(events) + end + + test "describes one cron check-in event" do + event = Event.cron(%Cron{identifier: "cron-checkin-name", digest: "some-digest"}, :start) + + assert "cron check-in `cron-checkin-name` start event (digest some-digest)" == + Event.describe([event]) + end + + test "describes one heartbeat check-in event" do + event = Event.heartbeat("heartbeat-checkin-name") + + assert "heartbeat check-in `heartbeat-checkin-name` event" == Event.describe([event]) + end + + test "describes one unknown check-in event" do + event = %Event{} + + assert "unknown check-in event" == Event.describe([event]) + end + + test "describes an empty list of events" do + assert "no check-in events" == Event.describe([]) + end + end + + describe "redundant?/2" do + test "returns false if the events are of different types" do + event1 = Event.heartbeat("checkin-name") + event2 = Event.heartbeat("checkin-name") + + event2 = Map.put(event2, :check_in_type, :cron) + + assert false == Event.redundant?(event1, event2) + end + + test "returns false if the events are of unknown type" do + event1 = %Event{} + event2 = %Event{} + + assert false == Event.redundant?(event1, event2) + end + + test "returns false if heartbeat events have different identifiers" do + event1 = Event.heartbeat("heartbeat-checkin-name") + event2 = 
Event.heartbeat("another-heartbeat-checkin-name") + + assert false == Event.redundant?(event1, event2) + end + + test "returns true if heartbeat events have the same identifier" do + event1 = Event.heartbeat("heartbeat-checkin-name") + event2 = Event.heartbeat("heartbeat-checkin-name") + + assert true == Event.redundant?(event1, event2) + end + + test "returns false if cron events have different identifiers" do + event1 = Event.cron(%Cron{identifier: "cron-checkin-name", digest: "digest"}, :start) + + event2 = + Event.cron(%Cron{identifier: "another-cron-checkin-name", digest: "digest"}, :start) + + assert false == Event.redundant?(event1, event2) + end + + test "returns false if cron events have different kinds" do + event1 = Event.cron(%Cron{identifier: "cron-checkin-name", digest: "digest"}, :start) + event2 = Event.cron(%Cron{identifier: "cron-checkin-name", digest: "digest"}, :finish) + + assert false == Event.redundant?(event1, event2) + end + + test "returns false if cron events have different digests" do + event1 = Event.cron(%Cron{identifier: "cron-checkin-name", digest: "digest"}, :start) + event2 = Event.cron(%Cron{identifier: "cron-checkin-name", digest: "other-digest"}, :start) + + assert false == Event.redundant?(event1, event2) + end + + test "returns true if cron events have the same identifier, kind and digest" do + cron = %Cron{identifier: "cron-checkin-name"} + + event1 = Event.cron(cron, :start) + event2 = Event.cron(cron, :start) + + assert true == Event.redundant?(event1, event2) + end + end +end diff --git a/test/appsignal/check_in/scheduler_test.exs b/test/appsignal/check_in/scheduler_test.exs index 3789c866..618d9eb6 100644 --- a/test/appsignal/check_in/scheduler_test.exs +++ b/test/appsignal/check_in/scheduler_test.exs @@ -1,6 +1,7 @@ defmodule Appsignal.CheckInSchedulerTest do use ExUnit.Case alias Appsignal.CheckIn.Cron + alias Appsignal.CheckIn.Event alias Appsignal.CheckIn.Scheduler alias Appsignal.FakeDebounce alias 
Appsignal.FakeIntegrationLogger @@ -72,7 +73,7 @@ defmodule Appsignal.CheckInSchedulerTest do until_all_messages_processed(Scheduler) assert [ - %Cron.Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron} + %Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron} ] = scheduler_events() assert FakeIntegrationLogger.logged?( @@ -93,7 +94,7 @@ defmodule Appsignal.CheckInSchedulerTest do until_all_messages_processed(Scheduler) assert [ - %Cron.Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron} + %Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron} ] = scheduler_events() assert FakeIntegrationLogger.logged?( @@ -123,7 +124,7 @@ defmodule Appsignal.CheckInSchedulerTest do assert [ {[ - %Cron.Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron} + %Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron} ], :ndjson} ] = FakeTransmitter.transmitted_payloads() @@ -147,12 +148,12 @@ defmodule Appsignal.CheckInSchedulerTest do assert [ {[ - %Cron.Event{ + %Event{ identifier: "cron-checkin-name", kind: :start, check_in_type: :cron }, - %Cron.Event{ + %Event{ identifier: "cron-checkin-name", kind: :finish, check_in_type: :cron @@ -227,7 +228,7 @@ defmodule Appsignal.CheckInSchedulerTest do assert [ {[ - %Cron.Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron} + %Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron} ], :ndjson} ] = FakeTransmitter.transmitted_payloads() @@ -257,7 +258,7 @@ defmodule Appsignal.CheckInSchedulerTest do assert [ {[ - %Cron.Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron} + %Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron} ], :ndjson} ] = FakeTransmitter.transmitted_payloads() end diff --git a/test/support/appsignal/fake_scheduler.ex b/test/support/appsignal/fake_scheduler.ex index 0ff94b90..cb2f7d51 100644 --- 
a/test/support/appsignal/fake_scheduler.ex +++ b/test/support/appsignal/fake_scheduler.ex @@ -34,6 +34,14 @@ defmodule Appsignal.FakeScheduler do Agent.get(__MODULE__, &Enum.reverse(&1.scheduled)) end + def identifier_count do + Agent.get(__MODULE__, fn %{scheduled: scheduled} -> + Enum.reduce(scheduled, %{}, fn event, acc -> + Map.update(acc, event.identifier, 1, &(&1 + 1)) + end) + end) + end + defp proxy? do Agent.get(__MODULE__, & &1.is_proxy) end