Skip to content

Commit

Permalink
String.t() -> String.t
Browse files Browse the repository at this point in the history
  • Loading branch information
Kevin C. Baird committed Aug 19, 2015
1 parent 0e441bd commit 3b2f9e6
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 41 deletions.
8 changes: 4 additions & 4 deletions lib/fleet_manager/configuration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule OpenAperture.FleetManager.Configuration do
The exchange identifier
"""
@spec get_current_exchange_id() :: String.t()
@spec get_current_exchange_id() :: String.t
def get_current_exchange_id do
get_config("EXCHANGE_ID", :openaperture_fleet_manager, :exchange_id)
end
Expand All @@ -25,7 +25,7 @@ defmodule OpenAperture.FleetManager.Configuration do
The exchange identifier
"""
@spec get_current_broker_id() :: String.t()
@spec get_current_broker_id() :: String.t
def get_current_broker_id do
get_config("BROKER_ID", :openaperture_fleet_manager, :broker_id)
end
Expand All @@ -39,7 +39,7 @@ defmodule OpenAperture.FleetManager.Configuration do
The exchange identifier
"""
@spec get_current_queue_name() :: String.t()
@spec get_current_queue_name() :: String.t
def get_current_queue_name do
get_config("QUEUE_NAME", :openaperture_fleet_manager, :queue_name)
end
Expand All @@ -59,7 +59,7 @@ defmodule OpenAperture.FleetManager.Configuration do
#
# Value
#
@spec get_config(String.t(), term, term) :: String.t()
@spec get_config(String.t, term, term) :: String.t
defp get_config(env_name, application_config, config_name) do
System.get_env(env_name) || Application.get_env(application_config, config_name)
end
Expand Down
60 changes: 30 additions & 30 deletions lib/fleet_manager/dispatcher.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule OpenAperture.FleetManager.Dispatcher do
use GenServer

alias OpenAperture.Messaging.AMQP.QueueBuilder
alias OpenAperture.Messaging.AMQP.SubscriptionHandler
alias OpenAperture.Messaging.RpcRequest
Expand All @@ -14,11 +14,11 @@ defmodule OpenAperture.FleetManager.Dispatcher do
alias OpenAperture.ManagerApi.SystemEvent

@moduledoc """
This module contains the logic to dispatch Builder messsages to the appropriate GenServer(s)
"""
This module contains the logic to dispatch Builder messsages to the appropriate GenServer(s)
"""

@connection_options nil
use OpenAperture.Messaging
use OpenAperture.Messaging

@doc """
Specific start_link implementation (required by the supervisor)
Expand All @@ -29,21 +29,21 @@ defmodule OpenAperture.FleetManager.Dispatcher do
{:ok, pid} | {:error, reason}
"""
@spec start_link() :: {:ok, pid} | {:error, String.t()}
@spec start_link() :: {:ok, pid} | {:error, String.t}
def start_link do
case GenServer.start_link(__MODULE__, %{}, name: __MODULE__) do
{:error, reason} ->
{:error, reason} ->
Logger.error("[Dispatcher] Failed to start OpenAperture FleetManager: #{inspect reason}")
{:error, reason}
{:ok, pid} ->
try do
if Application.get_env(:autostart, :register_queues, false) do
case register_queues do
{:ok, _} -> {:ok, pid}
{:error, reason} ->
{:error, reason} ->
Logger.error("[Dispatcher] Failed to register FleetManager queues: #{inspect reason}")
{:ok, pid}
end
end
else
{:ok, pid}
end
Expand All @@ -61,13 +61,13 @@ defmodule OpenAperture.FleetManager.Dispatcher do
:ok | {:error, reason}
"""
@spec register_queues() :: :ok | {:error, String.t()}
@spec register_queues() :: :ok | {:error, String.t}
def register_queues do
Logger.debug("[Dispatcher] Registering FleetManager queues...")
fleet_manager_queue = QueueBuilder.build(ManagerApi.get_api, Configuration.get_current_queue_name, Configuration.get_current_exchange_id)

options = OpenAperture.Messaging.ConnectionOptionsResolver.get_for_broker(ManagerApi.get_api, Configuration.get_current_broker_id)
subscribe(options, fleet_manager_queue, fn(payload, _meta, %{delivery_tag: delivery_tag} = async_info) ->
subscribe(options, fleet_manager_queue, fn(payload, _meta, %{delivery_tag: delivery_tag} = async_info) ->
MessageManager.track(async_info)
process_request(delivery_tag, payload)
end)
Expand All @@ -79,44 +79,44 @@ defmodule OpenAperture.FleetManager.Dispatcher do

fleet_request = FleetRequest.from_payload(request.request_body)

request = %{request |
request = %{request |
status: :error,
response_body: %{errors: [error_msg]}
}
acknowledge(delivery_tag, request)
}
acknowledge(delivery_tag, request)

event = %{
unique: true,
type: :unhandled_exception,
severity: :error,
type: :unhandled_exception,
severity: :error,
data: %{
component: :fleet_manager,
exchange_id: Configuration.get_current_exchange_id,
hostname: System.get_env("HOSTNAME"),
fleet_request_action: fleet_request.action
},
message: error_msg
}
SystemEvent.create_system_event!(ManagerApi.get_api, event)
}
SystemEvent.create_system_event!(ManagerApi.get_api, event)
end

@doc """
Method to process FleetManager requests for a defined period of time
## Options
The `delivery_tag` option is the unique identifier of the message
The `payload` defines the Messaging payload
"""
@spec process_request(String.t(), Map) :: term
@spec process_request(String.t, Map) :: term
def process_request(delivery_tag, payload) do
request = RpcRequest.from_payload(payload)

task = Task.async(fn ->
process_request_internal(request, delivery_tag)
end)
end)

#attempt to execute the request for 5 minutes before failing it
try do
Expand All @@ -126,12 +126,12 @@ defmodule OpenAperture.FleetManager.Dispatcher do
:exit, code -> process_request_failure("Exited with code #{inspect code}", delivery_tag, request)
:throw, value -> process_request_failure("Throw called with #{inspect value}", delivery_tag, request)
what, value -> process_request_failure("Caught #{inspect what} with #{inspect value}", delivery_tag, request)
end
end
end

@doc """
Method to execute a FleetManager request
## Options
The `request` option is the parsed RpcRequest request
Expand All @@ -147,13 +147,13 @@ defmodule OpenAperture.FleetManager.Dispatcher do
request = case FleetActions.execute(fleet_request) do
{:ok, response} ->
Logger.debug("[Dispatcher][Request][#{delivery_tag}] Completed successfully")
%{request |
%{request |
status: :completed,
response_body: response,
}
{:error, reason} ->
Logger.debug("[Dispatcher][Request][#{delivery_tag}] Failed: #{inspect reason}")
%{request |
%{request |
status: :error,
response_body: %{errors: ["#{inspect reason}"]},
}
Expand All @@ -164,7 +164,7 @@ defmodule OpenAperture.FleetManager.Dispatcher do
catch
:exit, code -> process_request_failure("Exited with code #{inspect code}", delivery_tag, request)
:throw, value -> process_request_failure("Throw called with #{inspect value}", delivery_tag, request)
what, value -> process_request_failure("Caught #{inspect what} with #{inspect value}", delivery_tag, request)
what, value -> process_request_failure("Caught #{inspect what} with #{inspect value}", delivery_tag, request)
end
end

Expand All @@ -178,7 +178,7 @@ defmodule OpenAperture.FleetManager.Dispatcher do
The `request` option is the RpcRequest
"""
@spec acknowledge(String.t(), RpcRequest.t) :: term
@spec acknowledge(String.t, RpcRequest.t) :: term
def acknowledge(delivery_tag, request) do
message = MessageManager.remove(delivery_tag)
unless message == nil do
Expand All @@ -200,14 +200,14 @@ defmodule OpenAperture.FleetManager.Dispatcher do
The `redeliver` option can be used to requeue a message
"""
@spec reject(String.t(), RpcRequest.t, term) :: term
@spec reject(String.t, RpcRequest.t, term) :: term
def reject(delivery_tag, request, redeliver \\ false) do
message = MessageManager.remove(delivery_tag)
unless message == nil do
SubscriptionHandler.reject_rpc(message[:subscription_handler], delivery_tag, ManagerApi.get_api, request, redeliver)
Logger.debug("[Dispatcher][Request][#{delivery_tag}] Rejected message")
Logger.debug("[Dispatcher][Request][#{delivery_tag}] Rejected message")
else
Logger.error("[Dispatcher][Request][#{delivery_tag}] Unable to reject message, MessageManager does not have a record!")
Logger.error("[Dispatcher][Request][#{delivery_tag}] Unable to reject message, MessageManager does not have a record!")
end
end
end
end
14 changes: 7 additions & 7 deletions lib/fleet_manager/message_manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ defmodule OpenAperture.FleetManager.MessageManager do

@moduledoc """
This module contains the logic for associating message references with their subscription handlers
"""
"""

@doc """
Creates a `GenServer` representing Docker host cluster.
## Return values
{:ok, pid} | {:error, String.t()}
{:ok, pid} | {:error, String.t}
"""
@spec start_link() :: {:ok, pid} | {:error, String.t()}
@spec start_link() :: {:ok, pid} | {:error, String.t}
def start_link() do
Logger.debug("#{@logprefix} Starting...")
Agent.start_link(fn -> %{} end, name: __MODULE__)
Expand All @@ -39,13 +39,13 @@ defmodule OpenAperture.FleetManager.MessageManager do
Logger.debug("#{@logprefix} Tracking message #{delivery_tag}...")
new_message = %{
process: self(),
subscription_handler: subscription_handler,
subscription_handler: subscription_handler,
delivery_tag: delivery_tag,
start_time: :calendar.universal_time
}

Agent.update(__MODULE__, fn messages -> Map.put(messages, delivery_tag, new_message) end)

new_message
end

Expand All @@ -60,7 +60,7 @@ defmodule OpenAperture.FleetManager.MessageManager do
Map containing the subscription_handler and delivery_tag
"""
@spec remove(String.t()) :: Map
@spec remove(String.t) :: Map
def remove(delivery_tag) do
Logger.debug("#{@logprefix} Finished tracking message #{delivery_tag}...")

Expand All @@ -69,4 +69,4 @@ defmodule OpenAperture.FleetManager.MessageManager do
{message, Map.delete(messages, delivery_tag)}
end)
end
end
end

0 comments on commit 3b2f9e6

Please sign in to comment.