diff --git a/lib/fleet_manager/configuration.ex b/lib/fleet_manager/configuration.ex index d329242..b82d861 100644 --- a/lib/fleet_manager/configuration.ex +++ b/lib/fleet_manager/configuration.ex @@ -11,7 +11,7 @@ defmodule OpenAperture.FleetManager.Configuration do The exchange identifier """ - @spec get_current_exchange_id() :: String.t() + @spec get_current_exchange_id() :: String.t def get_current_exchange_id do get_config("EXCHANGE_ID", :openaperture_fleet_manager, :exchange_id) end @@ -25,7 +25,7 @@ defmodule OpenAperture.FleetManager.Configuration do The exchange identifier """ - @spec get_current_broker_id() :: String.t() + @spec get_current_broker_id() :: String.t def get_current_broker_id do get_config("BROKER_ID", :openaperture_fleet_manager, :broker_id) end @@ -39,7 +39,7 @@ defmodule OpenAperture.FleetManager.Configuration do The exchange identifier """ - @spec get_current_queue_name() :: String.t() + @spec get_current_queue_name() :: String.t def get_current_queue_name do get_config("QUEUE_NAME", :openaperture_fleet_manager, :queue_name) end @@ -59,7 +59,7 @@ defmodule OpenAperture.FleetManager.Configuration do # # Value # - @spec get_config(String.t(), term, term) :: String.t() + @spec get_config(String.t, term, term) :: String.t defp get_config(env_name, application_config, config_name) do System.get_env(env_name) || Application.get_env(application_config, config_name) end diff --git a/lib/fleet_manager/dispatcher.ex b/lib/fleet_manager/dispatcher.ex index 16d24d9..12cbde7 100644 --- a/lib/fleet_manager/dispatcher.ex +++ b/lib/fleet_manager/dispatcher.ex @@ -1,6 +1,6 @@ defmodule OpenAperture.FleetManager.Dispatcher do use GenServer - + alias OpenAperture.Messaging.AMQP.QueueBuilder alias OpenAperture.Messaging.AMQP.SubscriptionHandler alias OpenAperture.Messaging.RpcRequest @@ -14,11 +14,11 @@ defmodule OpenAperture.FleetManager.Dispatcher do alias OpenAperture.ManagerApi.SystemEvent @moduledoc """ - This module contains the logic to dispatch Builder messsages to the appropriate GenServer(s) - """ + This module contains the logic to dispatch Builder messsages to the appropriate GenServer(s) + """ @connection_options nil - use OpenAperture.Messaging + use OpenAperture.Messaging @doc """ Specific start_link implementation (required by the supervisor) @@ -29,10 +29,10 @@ defmodule OpenAperture.FleetManager.Dispatcher do {:ok, pid} | {:error, reason} """ - @spec start_link() :: {:ok, pid} | {:error, String.t()} + @spec start_link() :: {:ok, pid} | {:error, String.t} def start_link do case GenServer.start_link(__MODULE__, %{}, name: __MODULE__) do - {:error, reason} -> + {:error, reason} -> Logger.error("[Dispatcher] Failed to start OpenAperture FleetManager: #{inspect reason}") {:error, reason} {:ok, pid} -> @@ -40,10 +40,10 @@ defmodule OpenAperture.FleetManager.Dispatcher do if Application.get_env(:autostart, :register_queues, false) do case register_queues do {:ok, _} -> {:ok, pid} - {:error, reason} -> + {:error, reason} -> Logger.error("[Dispatcher] Failed to register FleetManager queues: #{inspect reason}") {:ok, pid} - end + end else {:ok, pid} end @@ -61,13 +61,13 @@ defmodule OpenAperture.FleetManager.Dispatcher do :ok | {:error, reason} """ - @spec register_queues() :: :ok | {:error, String.t()} + @spec register_queues() :: :ok | {:error, String.t} def register_queues do Logger.debug("[Dispatcher] Registering FleetManager queues...") fleet_manager_queue = QueueBuilder.build(ManagerApi.get_api, Configuration.get_current_queue_name, Configuration.get_current_exchange_id) options = OpenAperture.Messaging.ConnectionOptionsResolver.get_for_broker(ManagerApi.get_api, Configuration.get_current_broker_id) - subscribe(options, fleet_manager_queue, fn(payload, _meta, %{delivery_tag: delivery_tag} = async_info) -> + subscribe(options, fleet_manager_queue, fn(payload, _meta, %{delivery_tag: delivery_tag} = async_info) -> MessageManager.track(async_info) process_request(delivery_tag, payload) end) @@ -79,16 +79,16 @@ defmodule OpenAperture.FleetManager.Dispatcher do fleet_request = FleetRequest.from_payload(request.request_body) - request = %{request | + request = %{request | status: :error, response_body: %{errors: [error_msg]} - } - acknowledge(delivery_tag, request) + } + acknowledge(delivery_tag, request) event = %{ unique: true, - type: :unhandled_exception, - severity: :error, + type: :unhandled_exception, + severity: :error, data: %{ component: :fleet_manager, exchange_id: Configuration.get_current_exchange_id, @@ -96,13 +96,13 @@ defmodule OpenAperture.FleetManager.Dispatcher do fleet_request_action: fleet_request.action }, message: error_msg - } - SystemEvent.create_system_event!(ManagerApi.get_api, event) + } + SystemEvent.create_system_event!(ManagerApi.get_api, event) end @doc """ Method to process FleetManager requests for a defined period of time - + ## Options The `delivery_tag` option is the unique identifier of the message @@ -110,13 +110,13 @@ defmodule OpenAperture.FleetManager.Dispatcher do The `payload` defines the Messaging payload """ - @spec process_request(String.t(), Map) :: term + @spec process_request(String.t, Map) :: term def process_request(delivery_tag, payload) do request = RpcRequest.from_payload(payload) task = Task.async(fn -> process_request_internal(request, delivery_tag) - end) + end) #attempt to execute the request for 5 minutes before failing it try do @@ -126,12 +126,12 @@ defmodule OpenAperture.FleetManager.Dispatcher do :exit, code -> process_request_failure("Exited with code #{inspect code}", delivery_tag, request) :throw, value -> process_request_failure("Throw called with #{inspect value}", delivery_tag, request) what, value -> process_request_failure("Caught #{inspect what} with #{inspect value}", delivery_tag, request) - end + end end @doc """ Method to execute a FleetManager request - + ## Options The `request` option is the parsed RpcRequest request @@ -147,13 +147,13 @@ defmodule OpenAperture.FleetManager.Dispatcher do request = case FleetActions.execute(fleet_request) do {:ok, response} -> Logger.debug("[Dispatcher][Request][#{delivery_tag}] Completed successfully") - %{request | + %{request | status: :completed, response_body: response, } {:error, reason} -> Logger.debug("[Dispatcher][Request][#{delivery_tag}] Failed: #{inspect reason}") - %{request | + %{request | status: :error, response_body: %{errors: ["#{inspect reason}"]}, } @@ -164,7 +164,7 @@ defmodule OpenAperture.FleetManager.Dispatcher do catch :exit, code -> process_request_failure("Exited with code #{inspect code}", delivery_tag, request) :throw, value -> process_request_failure("Throw called with #{inspect value}", delivery_tag, request) - what, value -> process_request_failure("Caught #{inspect what} with #{inspect value}", delivery_tag, request) + what, value -> process_request_failure("Caught #{inspect what} with #{inspect value}", delivery_tag, request) end end @@ -178,7 +178,7 @@ defmodule OpenAperture.FleetManager.Dispatcher do The `request` option is the RpcRequest """ - @spec acknowledge(String.t(), RpcRequest.t) :: term + @spec acknowledge(String.t, RpcRequest.t) :: term def acknowledge(delivery_tag, request) do message = MessageManager.remove(delivery_tag) unless message == nil do @@ -200,14 +200,14 @@ defmodule OpenAperture.FleetManager.Dispatcher do The `redeliver` option can be used to requeue a message """ - @spec reject(String.t(), RpcRequest.t, term) :: term + @spec reject(String.t, RpcRequest.t, term) :: term def reject(delivery_tag, request, redeliver \\ false) do message = MessageManager.remove(delivery_tag) unless message == nil do SubscriptionHandler.reject_rpc(message[:subscription_handler], delivery_tag, ManagerApi.get_api, request, redeliver) - Logger.debug("[Dispatcher][Request][#{delivery_tag}] Rejected message") + Logger.debug("[Dispatcher][Request][#{delivery_tag}] Rejected message") else - Logger.error("[Dispatcher][Request][#{delivery_tag}] Unable to reject message, MessageManager does not have a record!") + Logger.error("[Dispatcher][Request][#{delivery_tag}] Unable to reject message, MessageManager does not have a record!") end end -end \ No newline at end of file +end diff --git a/lib/fleet_manager/message_manager.ex b/lib/fleet_manager/message_manager.ex index d07713b..e7ca245 100644 --- a/lib/fleet_manager/message_manager.ex +++ b/lib/fleet_manager/message_manager.ex @@ -11,15 +11,15 @@ defmodule OpenAperture.FleetManager.MessageManager do @moduledoc """ This module contains the logic for associating message references with their subscription handlers - """ + """ @doc """ Creates a `GenServer` representing Docker host cluster. ## Return values - {:ok, pid} | {:error, String.t()} + {:ok, pid} | {:error, String.t} """ - @spec start_link() :: {:ok, pid} | {:error, String.t()} + @spec start_link() :: {:ok, pid} | {:error, String.t} def start_link() do Logger.debug("#{@logprefix} Starting...") Agent.start_link(fn -> %{} end, name: __MODULE__) @@ -39,13 +39,13 @@ defmodule OpenAperture.FleetManager.MessageManager do Logger.debug("#{@logprefix} Tracking message #{delivery_tag}...") new_message = %{ process: self(), - subscription_handler: subscription_handler, + subscription_handler: subscription_handler, delivery_tag: delivery_tag, start_time: :calendar.universal_time } Agent.update(__MODULE__, fn messages -> Map.put(messages, delivery_tag, new_message) end) - + new_message end @@ -60,7 +60,7 @@ defmodule OpenAperture.FleetManager.MessageManager do Map containing the subscription_handler and delivery_tag """ - @spec remove(String.t()) :: Map + @spec remove(String.t) :: Map def remove(delivery_tag) do Logger.debug("#{@logprefix} Finished tracking message #{delivery_tag}...") @@ -69,4 +69,4 @@ defmodule OpenAperture.FleetManager.MessageManager do {message, Map.delete(messages, delivery_tag)} end) end -end \ No newline at end of file +end