From d8312478eb11126aa0d2018f54ff1dfbec25e6e9 Mon Sep 17 00:00:00 2001 From: Noemi <45180344+unflxw@users.noreply.github.com> Date: Thu, 29 Aug 2024 16:18:41 +0200 Subject: [PATCH] Implement check-in scheduler Implement a check-in event scheduler, which schedules check-in events to be transmitted in a separate Elixir process, with a minimum wait period of ten seconds between requests, and a wait period of a tenth of a second before the first request, referred to as "debounce" periods. This is a relatively minor improvement for the existing cron check-ins, but it is a requirement for the heartbeat check-ins, both to avoid slowing down customers' applications with blocking requests, and to prevent misuse of the feature from spamming our servers. The scheduler also acts as a deduplicator, removing "similar enough" check-in events -- again, not particularly interesting for cron check-ins, but a requirement to minimise damage when heartbeat check-ins are misused. Use the previously implemented support for NDJSON payloads in the transmitter in order to send all the check-ins in a single request. Similar to the probes, an Elixir process is started by the AppSignal application supervisor. When the process receives an event to be transmitted, it stores it and schedules a message, which will trigger the transmit, to be sent to itself at a later time. When this Elixir process receives a shutdown signal, it attempts to transmit all scheduled events before it shuts down. 
--- config/config.exs | 3 + lib/appsignal.ex | 3 +- lib/appsignal/check_in.ex | 115 ------- lib/appsignal/check_in/check_in.ex | 19 ++ lib/appsignal/check_in/cron.ex | 64 ++++ lib/appsignal/check_in/scheduler.ex | 191 +++++++++++ mix_helpers.exs | 2 +- test/appsignal/check_in/check_in_test.exs | 99 ++++++ test/appsignal/check_in/scheduler_test.exs | 310 ++++++++++++++++++ test/appsignal/check_in_test.exs | 119 ------- test/appsignal/ecto_repo_test.exs | 6 +- test/mix/helpers_test.exs | 2 +- test/support/appsignal/fake_debounce.ex | 46 +++ .../appsignal/fake_integration_logger.ex | 14 +- test/support/appsignal/fake_scheduler.ex | 40 +++ test/support/appsignal/fake_transmitter.ex | 12 +- test/support/fake_system.ex | 4 + test/support/utils.ex | 16 + 18 files changed, 819 insertions(+), 246 deletions(-) delete mode 100644 lib/appsignal/check_in.ex create mode 100644 lib/appsignal/check_in/check_in.ex create mode 100644 lib/appsignal/check_in/cron.ex create mode 100644 lib/appsignal/check_in/scheduler.ex create mode 100644 test/appsignal/check_in/check_in_test.exs create mode 100644 test/appsignal/check_in/scheduler_test.exs delete mode 100644 test/appsignal/check_in_test.exs create mode 100644 test/support/appsignal/fake_debounce.ex create mode 100644 test/support/appsignal/fake_scheduler.ex diff --git a/config/config.exs b/config/config.exs index c91f60786..983bbadb5 100644 --- a/config/config.exs +++ b/config/config.exs @@ -7,7 +7,10 @@ if Mix.env() in [:bench, :test, :test_no_nif] do config :appsignal, appsignal: Appsignal.FakeAppsignal config :appsignal, appsignal_integration_logger: Appsignal.FakeIntegrationLogger config :appsignal, appsignal_transmitter: Appsignal.FakeTransmitter + config :appsignal, appsignal_checkin_scheduler: Appsignal.FakeScheduler + config :appsignal, appsignal_checkin_debounce: Appsignal.FakeDebounce config :appsignal, inet: FakeInet + config :appsignal, system: FakeSystem config :appsignal, io: FakeIO config :appsignal, file: FakeFile 
config :appsignal, os_internal: FakeOS diff --git a/lib/appsignal.ex b/lib/appsignal.ex index 55a8c4ff3..79dff003c 100644 --- a/lib/appsignal.ex +++ b/lib/appsignal.ex @@ -47,7 +47,8 @@ defmodule Appsignal do children = [ {Appsignal.Tracer, []}, {Appsignal.Monitor, []}, - {Appsignal.Probes, []} + {Appsignal.Probes, []}, + {Appsignal.CheckIn.Scheduler, []} ] result = Supervisor.start_link(children, strategy: :one_for_one, name: Appsignal.Supervisor) diff --git a/lib/appsignal/check_in.ex b/lib/appsignal/check_in.ex deleted file mode 100644 index 36409ac91..000000000 --- a/lib/appsignal/check_in.ex +++ /dev/null @@ -1,115 +0,0 @@ -defmodule Appsignal.CheckIn do - alias Appsignal.CheckIn.Cron - - @spec cron(String.t()) :: :ok - def cron(identifier) do - Cron.finish(Cron.new(identifier)) - end - - @spec cron(String.t(), (-> out)) :: out when out: var - def cron(identifier, fun) do - cron = Cron.new(identifier) - - Cron.start(cron) - output = fun.() - Cron.finish(cron) - - output - end -end - -defmodule Appsignal.CheckIn.Cron do - alias __MODULE__ - alias Appsignal.CheckIn.Cron.Event - - @transmitter Application.compile_env( - :appsignal, - :appsignal_transmitter, - Appsignal.Transmitter - ) - @type t :: %Cron{identifier: String.t(), digest: String.t()} - - defstruct [:identifier, :digest] - - @spec new(String.t()) :: t - def new(identifier) do - %Cron{ - identifier: identifier, - digest: random_digest() - } - end - - defp random_digest do - Base.encode16(:crypto.strong_rand_bytes(8), case: :lower) - end - - @spec start(Cron.t()) :: :ok - def start(cron) do - transmit(Event.new(cron, :start)) - end - - @spec finish(Cron.t()) :: :ok - def finish(cron) do - transmit(Event.new(cron, :finish)) - end - - @spec transmit(Event.t()) :: :ok - defp transmit(event) do - if Appsignal.Config.active?() do - config = Appsignal.Config.config() - endpoint = "#{config[:logging_endpoint]}/check_ins/json" - - case @transmitter.transmit(endpoint, {event, :json}, config) do - {:ok, 
status_code, _, _} when status_code in 200..299 -> - Appsignal.IntegrationLogger.trace( - "Transmitted cron check-in `#{event.identifier}` (#{event.digest}) #{event.kind} event" - ) - - {:ok, status_code, _, _} -> - Appsignal.IntegrationLogger.error( - "Failed to transmit cron check-in #{event.kind} event: status code was #{status_code}" - ) - - {:error, reason} -> - Appsignal.IntegrationLogger.error( - "Failed to transmit cron check-in #{event.kind} event: #{reason}" - ) - end - else - Appsignal.IntegrationLogger.debug( - "AppSignal not active, not transmitting cron check-in event" - ) - end - - :ok - end -end - -defmodule Appsignal.CheckIn.Cron.Event do - alias __MODULE__ - alias Appsignal.CheckIn.Cron - - @derive Jason.Encoder - - @type kind :: :start | :finish - @type t :: %Event{ - identifier: String.t(), - digest: String.t(), - kind: kind, - timestamp: integer, - check_in_type: :cron - } - - defstruct [:identifier, :digest, :kind, :timestamp, :check_in_type] - - @spec new(Cron.t(), kind) :: t - def new(%Cron{identifier: identifier, digest: digest}, kind) do - %Event{ - identifier: identifier, - digest: digest, - kind: kind, - timestamp: System.system_time(:second), - check_in_type: :cron - } - end -end diff --git a/lib/appsignal/check_in/check_in.ex b/lib/appsignal/check_in/check_in.ex new file mode 100644 index 000000000..324ca0b32 --- /dev/null +++ b/lib/appsignal/check_in/check_in.ex @@ -0,0 +1,19 @@ +defmodule Appsignal.CheckIn do + alias Appsignal.CheckIn.Cron + + @spec cron(String.t()) :: :ok + def cron(identifier) do + Cron.finish(Cron.new(identifier)) + end + + @spec cron(String.t(), (-> out)) :: out when out: var + def cron(identifier, fun) do + cron = Cron.new(identifier) + + Cron.start(cron) + output = fun.() + Cron.finish(cron) + + output + end +end diff --git a/lib/appsignal/check_in/cron.ex b/lib/appsignal/check_in/cron.ex new file mode 100644 index 000000000..825f679d6 --- /dev/null +++ b/lib/appsignal/check_in/cron.ex @@ -0,0 +1,64 @@ 
+defmodule Appsignal.CheckIn.Cron do + alias __MODULE__ + alias Appsignal.CheckIn.Cron.Event + + @scheduler Application.compile_env( + :appsignal, + :appsignal_checkin_scheduler, + Appsignal.CheckIn.Scheduler + ) + @type t :: %Cron{identifier: String.t(), digest: String.t()} + + defstruct [:identifier, :digest] + + @spec new(String.t()) :: t + def new(identifier) do + %Cron{ + identifier: identifier, + digest: random_digest() + } + end + + defp random_digest do + Base.encode16(:crypto.strong_rand_bytes(8), case: :lower) + end + + @spec start(Cron.t()) :: :ok + def start(cron) do + @scheduler.schedule(Event.new(cron, :start)) + end + + @spec finish(Cron.t()) :: :ok + def finish(cron) do + @scheduler.schedule(Event.new(cron, :finish)) + end +end + +defmodule Appsignal.CheckIn.Cron.Event do + alias __MODULE__ + alias Appsignal.CheckIn.Cron + + @derive Jason.Encoder + + @type kind :: :start | :finish + @type t :: %Event{ + identifier: String.t(), + digest: String.t(), + kind: kind, + timestamp: integer, + check_in_type: :cron + } + + defstruct [:identifier, :digest, :kind, :timestamp, :check_in_type] + + @spec new(Cron.t(), kind) :: t + def new(%Cron{identifier: identifier, digest: digest}, kind) do + %Event{ + identifier: identifier, + digest: digest, + kind: kind, + timestamp: System.system_time(:second), + check_in_type: :cron + } + end +end diff --git a/lib/appsignal/check_in/scheduler.ex b/lib/appsignal/check_in/scheduler.ex new file mode 100644 index 000000000..0036bbf6b --- /dev/null +++ b/lib/appsignal/check_in/scheduler.ex @@ -0,0 +1,191 @@ +defmodule Appsignal.CheckIn.Scheduler.Debounce do + @initial_debounce_milliseconds 100 + @between_transmissions_debounce_milliseconds 10_000 + + @system Application.compile_env( + :appsignal, + :system, + System + ) + + def milliseconds_until_next_transmission(nil), do: @initial_debounce_milliseconds + + def milliseconds_until_next_transmission(last_transmission_milliseconds) do + max( + @initial_debounce_milliseconds, + 
@between_transmissions_debounce_milliseconds - + milliseconds_since(last_transmission_milliseconds) + ) + end + + defp milliseconds_since(timestamp) do + @system.system_time(:millisecond) - timestamp + end +end + +defmodule Appsignal.CheckIn.Scheduler do + use GenServer + + alias Appsignal.CheckIn.Cron + + @debounce Application.compile_env( + :appsignal, + :appsignal_checkin_debounce, + Appsignal.CheckIn.Scheduler.Debounce + ) + + @transmitter Application.compile_env( + :appsignal, + :appsignal_transmitter, + Appsignal.Transmitter + ) + + @integration_logger Application.compile_env( + :appsignal, + :appsignal_integration_logger, + Appsignal.IntegrationLogger + ) + + @system Application.compile_env( + :appsignal, + :system, + System + ) + + @impl true + def init(_init_arg) do + # Ensure that the GenServer traps exits so that we can attempt to + # transmit any remaining events before terminating. + Process.flag(:trap_exit, true) + {:ok, initial_state()} + end + + def start_link(_init_arg) do + GenServer.start_link(__MODULE__, [], name: __MODULE__) + end + + def schedule(event) do + if Appsignal.Config.active?() do + GenServer.cast(__MODULE__, {:schedule, event}) + else + @integration_logger.debug( + "AppSignal not active, not scheduling #{describe_events([event])}" + ) + end + + :ok + end + + @impl true + def handle_cast({:schedule, event}, state) do + @integration_logger.trace("Scheduling #{describe_events([event])} to be transmitted") + + schedule_transmission(state) + + {:noreply, %{state | events: add_event(state.events, event)}} + end + + @impl true + def handle_info(:transmit, %{events: events}) do + # Remove the stored events from the state before transmitting them, + # to avoid transmitting them twice if the process receives a shutdown + # signal during the transmission. 
+ {:noreply, initial_state(), {:continue, {:transmit, events}}} + end + + @impl true + def handle_continue({:transmit, events}, state) do + description = describe_events(events) + + config = Appsignal.Config.config() + endpoint = "#{config[:logging_endpoint]}/check_ins/json" + + case @transmitter.transmit(endpoint, {Enum.reverse(events), :ndjson}, config) do + {:ok, status_code, _, _} when status_code in 200..299 -> + @integration_logger.trace("Transmitted #{description}") + + {:ok, status_code, _, _} -> + @integration_logger.error( + "Failed to transmit #{description}: status code was #{status_code}" + ) + + {:error, reason} -> + @integration_logger.error("Failed to transmit #{description}: #{reason}") + end + + { + :noreply, + %{state | last_transmission_milliseconds: @system.system_time(:millisecond)}, + :hibernate + } + end + + @impl true + def terminate(_reason, %{events: events}) when length(events) > 0 do + # If any events are stored, attempt to transmit them before the + # process is terminated. + handle_continue({:transmit, events}, initial_state()) + end + + def terminate(_reason, state), do: nil + + defp initial_state() do + %{events: [], last_transmission_milliseconds: nil} + end + + defp schedule_transmission(%{events: []} = state) do + Process.send_after( + self(), + :transmit, + @debounce.milliseconds_until_next_transmission(state.last_transmission_milliseconds) + ) + end + + defp schedule_transmission(_state) do + # The transmission should only be scheduled when the first event is + # being added, so we don't need to schedule it again. + nil + end + + defp add_event(events, event) do + # Remove redundant events, keeping the newly added one, which + # should be the one with the most recent timestamp. 
+ [event | Enum.reject(events, &redundant_event?(&1, event))] + end + + defp redundant_event?(%Cron.Event{} = event, %Cron.Event{} = new_event) do + # Consider any existing cron check-in event redundant if it has the + # same identifier, digest and kind as the one we're adding. + is_redundant = + event.identifier == new_event.identifier && + event.kind == new_event.kind && + event.digest == new_event.digest + + if is_redundant do + @integration_logger.debug("Replacing previously scheduled #{describe_events([event])}") + end + + is_redundant + end + + defp redundant_event?(_event, _new_event), do: false + + defp describe_events([]) do + # This shouldn't happen. + "no check-in events" + end + + defp describe_events(events) when length(events) > 1 do + "#{Enum.count(events)} check-in events" + end + + defp describe_events([%Cron.Event{} = event]) do + "cron check-in `#{event.identifier || "unknown"}` " <> + "#{event.kind || "unknown"} event (digest #{event.digest || "unknown"})" + end + + defp describe_events([_event]) do + # This shouldn't happen. 
+ "unknown check-in event" + end +end diff --git a/mix_helpers.exs b/mix_helpers.exs index 6e5def647..bbd751b00 100644 --- a/mix_helpers.exs +++ b/mix_helpers.exs @@ -9,7 +9,7 @@ defmodule Mix.Appsignal.Helper do @erlang Application.compile_env(:appsignal, :erlang, :erlang) @os Application.compile_env(:appsignal, :os, :os) - @system Application.compile_env(:appsignal, :mix_system, System) + @system Application.compile_env(:appsignal, :system, System) @proxy_env_vars [ "APPSIGNAL_HTTP_PROXY", diff --git a/test/appsignal/check_in/check_in_test.exs b/test/appsignal/check_in/check_in_test.exs new file mode 100644 index 000000000..756eabd76 --- /dev/null +++ b/test/appsignal/check_in/check_in_test.exs @@ -0,0 +1,99 @@ +defmodule Appsignal.CheckInTest do + use ExUnit.Case + alias Appsignal.CheckIn + alias Appsignal.CheckIn.Cron + alias Appsignal.CheckIn.Cron.Event + alias Appsignal.FakeScheduler + + setup do + start_supervised!(FakeScheduler) + :ok + end + + describe "start/1" do + test "transmits a start event for the cron check-in" do + cron = Cron.new("cron-checkin-name") + Cron.start(cron) + + assert [ + %Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron} + ] = FakeScheduler.scheduled() + end + end + + describe "finish/1" do + test "transmits a finish event for the cron check-in" do + cron = Cron.new("cron-checkin-name") + Cron.finish(cron) + + assert [ + %Event{identifier: "cron-checkin-name", kind: :finish, check_in_type: :cron} + ] = FakeScheduler.scheduled() + end + end + + describe "cron/2" do + test "transmits a start and finish event for the cron check-in" do + output = CheckIn.cron("cron-checkin-name", fn -> "output" end) + + assert [ + %Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron}, + %Event{identifier: "cron-checkin-name", kind: :finish, check_in_type: :cron} + ] = FakeScheduler.scheduled() + + assert "output" == output + end + + test "does not transmit a finish event when the function throws an error" do 
+ assert_raise RuntimeError, fn -> + CheckIn.cron("cron-checkin-name", fn -> raise "error" end) + end + + assert [ + %Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron} + ] = FakeScheduler.scheduled() + end + end + + describe "cron/1" do + test "transmits a finish event for the cron check-in" do + CheckIn.cron("cron-checkin-name") + + assert [ + %Event{identifier: "cron-checkin-name", kind: :finish, check_in_type: :cron} + ] = FakeScheduler.scheduled() + end + end + + describe "deprecated heartbeat functions" do + test "forwards heartbeat/1 to CheckIn.cron/1" do + Appsignal.heartbeat("heartbeat-name") + + assert [ + %Event{identifier: "heartbeat-name", kind: :finish, check_in_type: :cron} + ] = FakeScheduler.scheduled() + end + + test "forwards heartbeat/2 to CheckIn.cron/2" do + output = Appsignal.heartbeat("heartbeat-name", fn -> "output" end) + + assert [ + %Event{identifier: "heartbeat-name", kind: :start, check_in_type: :cron}, + %Event{identifier: "heartbeat-name", kind: :finish, check_in_type: :cron} + ] = FakeScheduler.scheduled() + + assert "output" == output + end + + test "forwards new/1, start/1 and finish/1 to the CheckIn.Cron module" do + heartbeat = Appsignal.Heartbeat.new("heartbeat-name") + Appsignal.Heartbeat.start(heartbeat) + Appsignal.Heartbeat.finish(heartbeat) + + assert [ + %Event{identifier: "heartbeat-name", kind: :start, check_in_type: :cron}, + %Event{identifier: "heartbeat-name", kind: :finish, check_in_type: :cron} + ] = FakeScheduler.scheduled() + end + end +end diff --git a/test/appsignal/check_in/scheduler_test.exs b/test/appsignal/check_in/scheduler_test.exs new file mode 100644 index 000000000..8a5df704f --- /dev/null +++ b/test/appsignal/check_in/scheduler_test.exs @@ -0,0 +1,310 @@ +defmodule Appsignal.CheckInSchedulerTest do + use ExUnit.Case + alias Appsignal.CheckIn.Cron + alias Appsignal.CheckIn.Scheduler + alias Appsignal.FakeScheduler + alias Appsignal.FakeDebounce + alias 
Appsignal.FakeIntegrationLogger + alias Appsignal.FakeTransmitter + import AppsignalTest.Utils, only: [with_config: 2, until: 1, until_all_messages_processed: 1] + + def scheduler_events() do + # The events are stored in reverse order. + Enum.reverse(:sys.get_state(Scheduler)[:events]) + end + + setup do + # Start the fake scheduler in proxy mode, so that calls to + # the check-in helpers are forwarded to the real scheduler. + start_supervised!({FakeScheduler, [:proxy]}) + start_supervised!(FakeIntegrationLogger) + start_supervised!(FakeTransmitter) + start_supervised!(FakeDebounce) + start_supervised!(FakeSystem) + + on_exit(fn -> + # Restart the check-in scheduler in between tests to clear + # the stored events and scheduled transmissions. + Supervisor.terminate_child(Appsignal.Supervisor, Scheduler) + Supervisor.restart_child(Appsignal.Supervisor, Scheduler) + end) + + :ok + end + + describe "start/1 and finish/1, when AppSignal is not active" do + test "it does not transmit any events" do + cron = Cron.new("cron-checkin-name") + + with_config(%{active: false}, fn -> + Cron.start(cron) + Cron.finish(cron) + + until_all_messages_processed(Scheduler) + end) + + assert [] = scheduler_events() + + assert FakeIntegrationLogger.logged?( + :debug, + &String.starts_with?( + &1, + "AppSignal not active, not scheduling cron check-in `cron-checkin-name` start event" + ) + ) + + assert FakeIntegrationLogger.logged?( + :debug, + &String.starts_with?( + &1, + "AppSignal not active, not scheduling cron check-in `cron-checkin-name` finish event" + ) + ) + end + end + + describe "schedule/1" do + test "it stores a cron check-in event to be transmitted" do + cron = Cron.new("cron-checkin-name") + + Cron.start(cron) + + until_all_messages_processed(Scheduler) + + assert [ + %Cron.Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron} + ] = scheduler_events() + + assert FakeIntegrationLogger.logged?( + :trace, + &String.starts_with?( + &1, + "Scheduling cron 
check-in `cron-checkin-name` start event" + ) + ) + end + + test "it does not store redundant events" do + cron = Cron.new("cron-checkin-name") + + Cron.start(cron) + Cron.start(cron) + + until_all_messages_processed(Scheduler) + + assert [ + %Cron.Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron} + ] = scheduler_events() + + assert FakeIntegrationLogger.logged?( + :trace, + &String.starts_with?( + &1, + "Scheduling cron check-in `cron-checkin-name` start event" + ) + ) + + assert FakeIntegrationLogger.logged?( + :debug, + &String.starts_with?( + &1, + "Replacing previously scheduled cron check-in `cron-checkin-name` start event" + ) + ) + end + + test "it transmits the stored event" do + cron = Cron.new("cron-checkin-name") + + Cron.start(cron) + + assert [] = FakeTransmitter.transmitted() + until(fn -> assert [] != FakeTransmitter.transmitted() end) + + assert [ + {[ + %Cron.Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron} + ], :ndjson} + ] = FakeTransmitter.transmitted_payloads() + + assert FakeIntegrationLogger.logged?( + :trace, + &String.starts_with?( + &1, + "Transmitted cron check-in `cron-checkin-name` start event" + ) + ) + end + + test "it transmits many stored events in a single request" do + cron = Cron.new("cron-checkin-name") + + Cron.start(cron) + Cron.finish(cron) + + assert [] = FakeTransmitter.transmitted() + until(fn -> assert [] != FakeTransmitter.transmitted() end) + + assert [ + {[ + %Cron.Event{ + identifier: "cron-checkin-name", + kind: :start, + check_in_type: :cron + }, + %Cron.Event{ + identifier: "cron-checkin-name", + kind: :finish, + check_in_type: :cron + } + ], :ndjson} + ] = FakeTransmitter.transmitted_payloads() + + until(fn -> + assert FakeIntegrationLogger.logged?(:trace, "Transmitted 2 check-in events") + end) + end + + test "it logs an error when it receives a non-2xx response" do + FakeTransmitter.set_response({:ok, 500, :fake, :fake}) + + cron = Cron.new("cron-checkin-name") 
+ + Cron.start(cron) + + assert [] = FakeTransmitter.transmitted() + until(fn -> assert [] != FakeTransmitter.transmitted() end) + + until(fn -> + assert FakeIntegrationLogger.logged?( + :error, + &(String.starts_with?( + &1, + "Failed to transmit cron check-in `cron-checkin-name` start event" + ) && + String.ends_with?(&1, ": status code was 500")) + ) + end) + end + + test "it logs an error when the request errors" do + FakeTransmitter.set_response({:error, "fake error"}) + + cron = Cron.new("cron-checkin-name") + + Cron.start(cron) + + assert [] = FakeTransmitter.transmitted() + until(fn -> assert [] != FakeTransmitter.transmitted() end) + + until(fn -> + assert FakeIntegrationLogger.logged?( + :error, + &(String.starts_with?( + &1, + "Failed to transmit cron check-in `cron-checkin-name` start event" + ) && + String.ends_with?(&1, ": fake error")) + ) + end) + end + + test "it transmits the stored events when it receives a shutdown signal" do + # Set a really long debounce time to ensure that the transmission that + # is taking place is the one triggered by the shutdown signal. + # The `until/1` call below will time out waiting for this debounce. 
+ FakeDebounce.set_debounce(10_000) + + cron = Cron.new("cron-checkin-name") + + Cron.start(cron) + + until_all_messages_processed(Scheduler) + + GenServer.stop(Scheduler) + + until(fn -> assert [] != FakeTransmitter.transmitted() end) + + assert [ + {[ + %Cron.Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron} + ], :ndjson} + ] = FakeTransmitter.transmitted_payloads() + + assert FakeIntegrationLogger.logged?( + :trace, + &String.starts_with?( + &1, + "Transmitted cron check-in `cron-checkin-name` start event" + ) + ) + end + + test "it does not transmit the events twice when it receives a shutdown signal during a transmission" do + # Cause the transmission to raise an exception, triggering the process + # to shut down, and `terminate/2` to be called, without the current + # callback in the process updating the state. + FakeTransmitter.set_response(fn -> raise "something went wrong" end) + + cron = Cron.new("cron-checkin-name") + + Cron.start(cron) + + # Wait for the process to attempt to transmit, crash, and call + # `terminate/2` to shut itself down. 
+ current_pid = Process.whereis(Scheduler) + until(fn -> assert !Process.alive?(current_pid) end) + + assert [ + {[ + %Cron.Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron} + ], :ndjson} + ] = FakeTransmitter.transmitted_payloads() + end + + test "it uses the last transmission time to debounce the next scheduled transmission" do + cron = Cron.new("cron-checkin-name") + + Cron.start(cron) + + assert [] = FakeTransmitter.transmitted() + until(fn -> assert [] != FakeTransmitter.transmitted() end) + + assert FakeDebounce.last_transmission_milliseconds() == nil + + Cron.start(cron) + + until_all_messages_processed(Scheduler) + + assert FakeDebounce.last_transmission_milliseconds() == FakeSystem.system_time(:millisecond) + end + end + + describe "milliseconds_until_next_transmission/1" do + test "returns a short debounce period when no last transmission is given" do + assert 100 == Scheduler.Debounce.milliseconds_until_next_transmission(nil) + end + + test "returns a short debounce period when the last transmission was a long time ago" do + epoch_milliseconds = 0 + assert 100 == Scheduler.Debounce.milliseconds_until_next_transmission(epoch_milliseconds) + end + + test "returns a long debounce period when the last transmission was now" do + current_milliseconds = FakeSystem.system_time(:millisecond) + + assert 10000 == + Scheduler.Debounce.milliseconds_until_next_transmission(current_milliseconds) + end + + test "subtracts the time since the last transmission from the long debounce" do + current_milliseconds = FakeSystem.system_time(:millisecond) + last_transmission_milliseconds = current_milliseconds - 1000 + + assert 9000 == + Scheduler.Debounce.milliseconds_until_next_transmission( + last_transmission_milliseconds + ) + end + end +end diff --git a/test/appsignal/check_in_test.exs b/test/appsignal/check_in_test.exs deleted file mode 100644 index 447abf62c..000000000 --- a/test/appsignal/check_in_test.exs +++ /dev/null @@ -1,119 +0,0 @@ 
-defmodule Appsignal.CheckInTest do - use ExUnit.Case - alias Appsignal.CheckIn - alias Appsignal.CheckIn.Cron - alias Appsignal.CheckIn.Cron.Event - alias Appsignal.FakeTransmitter - import AppsignalTest.Utils, only: [with_config: 2] - - setup do - start_supervised!(FakeTransmitter) - :ok - end - - describe "start/1 and finish/1, when AppSignal is not active" do - test "it does not transmit any events" do - cron = Cron.new("cron-checkin-name") - - with_config(%{active: false}, fn -> - Cron.start(cron) - Cron.finish(cron) - end) - - assert [] = FakeTransmitter.transmitted_payloads() - end - end - - describe "start/1" do - test "transmits a start event for the cron check-in" do - cron = Cron.new("cron-checkin-name") - Cron.start(cron) - - assert [ - {%Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron}, - :json} - ] = FakeTransmitter.transmitted_payloads() - end - end - - describe "finish/1" do - test "transmits a finish event for the cron check-in" do - cron = Cron.new("cron-checkin-name") - Cron.finish(cron) - - assert [ - {%Event{identifier: "cron-checkin-name", kind: :finish, check_in_type: :cron}, - :json} - ] = FakeTransmitter.transmitted_payloads() - end - end - - describe "cron/2" do - test "transmits a start and finish event for the cron check-in" do - output = CheckIn.cron("cron-checkin-name", fn -> "output" end) - - assert [ - {%Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron}, - :json}, - {%Event{identifier: "cron-checkin-name", kind: :finish, check_in_type: :cron}, - :json} - ] = FakeTransmitter.transmitted_payloads() - - assert "output" == output - end - - test "does not transmit a finish event when the function throws an error" do - assert_raise RuntimeError, fn -> - CheckIn.cron("cron-checkin-name", fn -> raise "error" end) - end - - assert [ - {%Event{identifier: "cron-checkin-name", kind: :start, check_in_type: :cron}, - :json} - ] = FakeTransmitter.transmitted_payloads() - end - end - - describe 
"cron/1" do - test "transmits a finish event for the cron check-in" do - CheckIn.cron("cron-checkin-name") - - assert [ - {%Event{identifier: "cron-checkin-name", kind: :finish, check_in_type: :cron}, - :json} - ] = FakeTransmitter.transmitted_payloads() - end - end - - describe "deprecated heartbeat functions" do - test "forwards heartbeat/1 to CheckIn.cron/1" do - Appsignal.heartbeat("heartbeat-name") - - assert [ - {%Event{identifier: "heartbeat-name", kind: :finish, check_in_type: :cron}, :json} - ] = FakeTransmitter.transmitted_payloads() - end - - test "forwards heartbeat/2 to CheckIn.cron/2" do - output = Appsignal.heartbeat("heartbeat-name", fn -> "output" end) - - assert [ - {%Event{identifier: "heartbeat-name", kind: :start, check_in_type: :cron}, :json}, - {%Event{identifier: "heartbeat-name", kind: :finish, check_in_type: :cron}, :json} - ] = FakeTransmitter.transmitted_payloads() - - assert "output" == output - end - - test "forwards new/1, start/1 and finish/1 to the CheckIn.Cron module" do - heartbeat = Appsignal.Heartbeat.new("heartbeat-name") - Appsignal.Heartbeat.start(heartbeat) - Appsignal.Heartbeat.finish(heartbeat) - - assert [ - {%Event{identifier: "heartbeat-name", kind: :start, check_in_type: :cron}, :json}, - {%Event{identifier: "heartbeat-name", kind: :finish, check_in_type: :cron}, :json} - ] = FakeTransmitter.transmitted_payloads() - end - end -end diff --git a/test/appsignal/ecto_repo_test.exs b/test/appsignal/ecto_repo_test.exs index c25765f07..47bdf4051 100644 --- a/test/appsignal/ecto_repo_test.exs +++ b/test/appsignal/ecto_repo_test.exs @@ -28,9 +28,9 @@ defmodule Appsignal.EctoRepoTest do test "use Appsignal.Ecto.Repo passes through options to Ecto.Repo" do assert Appsignal.TestEctoRepo.get_received_opts() == [ - otp_app: :plug_example, - adapter: Ecto.Adapters.Postgres - ] + otp_app: :plug_example, + adapter: Ecto.Adapters.Postgres + ] end test "use Appsignal.Ecto.Repo can have overriden default options" do diff --git 
a/test/mix/helpers_test.exs b/test/mix/helpers_test.exs index aa04ae652..44fc88b6a 100644 --- a/test/mix/helpers_test.exs +++ b/test/mix/helpers_test.exs @@ -1,7 +1,7 @@ :code.delete(Mix.Appsignal.Helper) Application.put_env(:appsignal, :erlang, FakeErlang) Application.put_env(:appsignal, :os, FakeOS) -Application.put_env(:appsignal, :mix_system, FakeSystem) +Application.put_env(:appsignal, :system, FakeSystem) {_, _} = Code.eval_file("mix_helpers.exs") defmodule Mix.Appsignal.HelperTest do diff --git a/test/support/appsignal/fake_debounce.ex b/test/support/appsignal/fake_debounce.ex new file mode 100644 index 000000000..65ff73f0c --- /dev/null +++ b/test/support/appsignal/fake_debounce.ex @@ -0,0 +1,46 @@ +defmodule Appsignal.FakeDebounce do + use Agent + + def start_link(opts \\ nil) do + Agent.start_link( + # Since `nil` is a valid value for `last_transmission_milliseconds`, + # we use `:never_called` to differentiate when the function has + # never been called. + fn -> + %{ + last_transmission_milliseconds: :never_called, + debounce: 20 + } + end, + name: __MODULE__ + ) + end + + def child_spec(opts \\ []) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, opts}, + type: :worker, + restart: :permanent, + shutdown: 500 + } + end + + def milliseconds_until_next_transmission(last_transmission_milliseconds) do + Agent.update(__MODULE__, fn state -> + %{state | last_transmission_milliseconds: last_transmission_milliseconds} + end) + + Agent.get(__MODULE__, & &1.debounce) + end + + def last_transmission_milliseconds do + Agent.get(__MODULE__, & &1.last_transmission_milliseconds) + end + + def set_debounce(debounce) do + Agent.update(__MODULE__, fn state -> + %{state | debounce: debounce} + end) + end +end diff --git a/test/support/appsignal/fake_integration_logger.ex b/test/support/appsignal/fake_integration_logger.ex index 996ec3c35..bdf5da81e 100644 --- a/test/support/appsignal/fake_integration_logger.ex +++ 
b/test/support/appsignal/fake_integration_logger.ex @@ -23,13 +23,19 @@ defmodule Appsignal.FakeIntegrationLogger do add(:logs, [:error, message]) end - def logged?(pid_or_module, type, message) do - Enum.any?(get(pid_or_module, :logs), fn element -> - match?([^type, ^message], element) + def logged?(pid_or_module \\ __MODULE__, type, message) + + def logged?(pid_or_module, type, message) when is_binary(message) do + logged?(pid_or_module, type, &(&1 == message)) + end + + def logged?(pid_or_module, type, matcher) when is_function(matcher) do + Enum.any?(get(pid_or_module, :logs), fn [logged_type, logged_message] -> + type == logged_type && matcher.(logged_message) end) end - def get_logs(pid_or_module, type) do + def get_logs(pid_or_module \\ __MODULE__, type) do Enum.filter(get(pid_or_module, :logs), fn element -> match?([^type, _], element) end) diff --git a/test/support/appsignal/fake_scheduler.ex b/test/support/appsignal/fake_scheduler.ex new file mode 100644 index 000000000..449582a4e --- /dev/null +++ b/test/support/appsignal/fake_scheduler.ex @@ -0,0 +1,40 @@ +defmodule Appsignal.FakeScheduler do + use Agent + + def start_link(opts \\ nil) do + Agent.start_link( + fn -> %{is_proxy: opts == :proxy, scheduled: []} end, + name: __MODULE__ + ) + end + + def child_spec(opts \\ []) do + %{ + id: __MODULE__, + start: {__MODULE__, :start_link, opts}, + type: :worker, + restart: :permanent, + shutdown: 500 + } + end + + def schedule(event) do + if is_proxy?() do + Appsignal.CheckIn.Scheduler.schedule(event) + else + Agent.update(__MODULE__, fn state -> + %{state | scheduled: [event | state.scheduled]} + end) + + :ok + end + end + + def scheduled do + Agent.get(__MODULE__, &Enum.reverse(&1.scheduled)) + end + + defp is_proxy? 
do + Agent.get(__MODULE__, & &1.is_proxy) + end +end diff --git a/test/support/appsignal/fake_transmitter.ex b/test/support/appsignal/fake_transmitter.ex index d6bd06584..81e334d87 100644 --- a/test/support/appsignal/fake_transmitter.ex +++ b/test/support/appsignal/fake_transmitter.ex @@ -6,7 +6,7 @@ defmodule Appsignal.FakeTransmitter do fn -> %{ transmitted: [], - response: {:ok, 200, :fake, :fake} + response: fn -> {:ok, 200, :fake, :fake} end } end, name: __MODULE__ @@ -28,7 +28,7 @@ defmodule Appsignal.FakeTransmitter do Map.update!(state, :transmitted, &[{url, payload, config} | &1]) end) - Agent.get(__MODULE__, & &1[:response]) + Agent.get(__MODULE__, & &1[:response]).() end def transmitted do @@ -38,4 +38,12 @@ defmodule Appsignal.FakeTransmitter do def transmitted_payloads do Enum.map(transmitted(), fn {_url, payload, _config} -> payload end) end + + def set_response(response) when is_function(response, 0) do + Agent.update(__MODULE__, fn state -> + %{state | response: response} + end) + end + + def set_response(response), do: set_response(fn -> response end) end diff --git a/test/support/fake_system.ex b/test/support/fake_system.ex index 7fbe84c07..3b5bd03cc 100644 --- a/test/support/fake_system.ex +++ b/test/support/fake_system.ex @@ -4,4 +4,8 @@ defmodule FakeSystem do def cmd(command, args, opts \\ []) do get(__MODULE__, :cmd).(command, args, opts) end + + def system_time(_atom) do + 1_000_000_000 + end end diff --git a/test/support/utils.ex b/test/support/utils.ex index 49d66cd49..78cd9230f 100644 --- a/test/support/utils.ex +++ b/test/support/utils.ex @@ -1,4 +1,6 @@ defmodule AppsignalTest.Utils do + require ExUnit.Assertions + # Remove loaded from the app so the module is recompiled when called. Do this # if the module is already loaded before the test env, such as in `mix.exs`. 
def purge(mod) do @@ -118,6 +120,20 @@ defmodule AppsignalTest.Utils do end end + def until_all_messages_processed(name_or_pid) do + until_messages_queued(name_or_pid, 0) + end + + def until_messages_queued(name, count) when is_atom(name) do + until_messages_queued(Process.whereis(name), count) + end + + def until_messages_queued(pid, count) when is_pid(pid) do + until(fn -> + ExUnit.Assertions.assert({_, ^count} = :erlang.process_info(pid, :message_queue_len)) + end) + end + def repeatedly(assertion) do repeatedly(assertion, 10) end