Skip to content

Commit

Permalink
format codebase
Browse files Browse the repository at this point in the history
  • Loading branch information
benonymus committed Dec 13, 2023
1 parent 78556cc commit c683a60
Show file tree
Hide file tree
Showing 16 changed files with 832 additions and 718 deletions.
4 changes: 4 additions & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
49 changes: 26 additions & 23 deletions lib/nsq/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule NSQ.Config do
# Duration between polling lookupd for new producers, and fractional jitter
# to add to the lookupd pool loop. this helps evenly distribute requests
# even if multiple consumers restart at the same time
#
#
# NOTE: when not using nsqlookupd, lookupd_poll_interval represents the
# duration of time between reconnection attempts
lookupd_poll_interval: 1 * @minutes,
Expand Down Expand Up @@ -135,7 +135,6 @@ defmodule NSQ.Config do

defstruct Enum.into(@default_config, [])


@doc """
Given a config, tell us what's wrong with it. If nothing is wrong, we'll
return `{:ok, config}`.
Expand All @@ -153,18 +152,24 @@ defmodule NSQ.Config do

%NSQ.Config{} = config

errors = errors ++ Enum.map @valid_ranges, fn({name, {min, max}}) ->
case range_error(Map.get(config, name), min, max) do
{:error, reason} -> "#{name}: #{reason}"
:ok -> nil
end
end

errors = [no_match_error(
config.backoff_strategy, [:exponential, :test]
) | errors]

errors = Enum.reject(errors, fn(v) -> v == nil end)
errors =
errors ++
Enum.map(@valid_ranges, fn {name, {min, max}} ->
case range_error(Map.get(config, name), min, max) do
{:error, reason} -> "#{name}: #{reason}"
:ok -> nil
end
end)

errors = [
no_match_error(
config.backoff_strategy,
[:exponential, :test]
)
| errors
]

errors = Enum.reject(errors, fn v -> v == nil end)

if length(errors) > 0 do
{:error, errors}
Expand All @@ -173,32 +178,32 @@ defmodule NSQ.Config do
end
end


def normalize(config) do
config = %NSQ.Config{config | nsqds: normalize_hosts(config.nsqds)}
config = %NSQ.Config{config | nsqlookupds: normalize_hosts(config.nsqlookupds)}
{:ok, config}
end


def normalize_hosts(hosts) do
Enum.map hosts, fn (host_with_port) ->
Enum.map(hosts, fn host_with_port ->
cond do
is_tuple(host_with_port) ->
{_host, _port} = host_with_port

is_binary(host_with_port) ->
[host, port] = host_with_port |> String.split(":")
{port, _} = Integer.parse(port)
{host, port}

is_list(host_with_port) ->
{_host, _port} = List.to_tuple(host_with_port)

true ->
raise "Invalid host definition #{inspect host_with_port}"
raise "Invalid host definition #{inspect(host_with_port)}"
end
end
end)
end


defp range_error(val, min, max) do
cond do
val == nil -> :ok
Expand All @@ -208,12 +213,10 @@ defmodule NSQ.Config do
end
end


defp matches_any?(val, candidates) do
Enum.any?(candidates, fn(candidate) -> candidate == val end)
Enum.any?(candidates, fn candidate -> candidate == val end)
end


defp no_match_error(val, candidates) do
if matches_any?(val, candidates) do
nil
Expand Down
105 changes: 54 additions & 51 deletions lib/nsq/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ defmodule NSQ.Connection do
Sets up a TCP connection to NSQD. Both consumers and producers use this.
"""


# ------------------------------------------------------- #
# Directives #
# ------------------------------------------------------- #
Expand All @@ -12,27 +11,25 @@ defmodule NSQ.Connection do
alias NSQ.Connection.MessageHandling
alias NSQ.ConnInfo


# ------------------------------------------------------- #
# Type Definitions #
# ------------------------------------------------------- #
@typedoc """
A tuple with a host and a port.
"""
@type host_with_port :: {String.t, integer}
@type host_with_port :: {String.t(), integer}

@typedoc """
A tuple with a string ID (used to target the connection in
NSQ.Connection.Supervisor) and a PID of the connection.
"""
@type connection :: {String.t, pid}
@type connection :: {String.t(), pid}

@typedoc """
A map, but we can be more specific by asserting some entries that should be
set for a connection's state map.
"""
@type state :: %{parent: pid, config: NSQ.Config.t, nsqd: host_with_port}

@type state :: %{parent: pid, config: NSQ.Config.t(), nsqd: host_with_port}

# ------------------------------------------------------- #
# Module Attributes #
Expand All @@ -43,8 +40,8 @@ defmodule NSQ.Connection do
reader: nil,
writer: nil,
connected: nil,
cmd_resp_queue: :queue.new,
cmd_queue: :queue.new,
cmd_resp_queue: :queue.new(),
cmd_queue: :queue.new(),
config: %{},
reader_pid: nil,
msg_sup_pid: nil,
Expand All @@ -59,29 +56,38 @@ defmodule NSQ.Connection do
connect_attempts: 0,
stop_flag: false,
conn_info_pid: nil,
msg_timeout: nil,
msg_timeout: nil
}


# ------------------------------------------------------- #
# Behaviour Implementation #
# ------------------------------------------------------- #
@spec start_link(pid, host_with_port, NSQ.Config.t, String.t, String.t, pid, list) ::
{:ok, pid}
def start_link(parent, nsqd, config, topic, channel, conn_info_pid, event_manager_pid, opts \\ []) do
state = %{@initial_state |
parent: parent,
nsqd: nsqd,
config: config,
topic: topic,
channel: channel,
conn_info_pid: conn_info_pid,
event_manager_pid: event_manager_pid
@spec start_link(pid, host_with_port, NSQ.Config.t(), String.t(), String.t(), pid, list) ::
{:ok, pid}
def start_link(
parent,
nsqd,
config,
topic,
channel,
conn_info_pid,
event_manager_pid,
opts \\ []
) do
state = %{
@initial_state
| parent: parent,
nsqd: nsqd,
config: config,
topic: topic,
channel: channel,
conn_info_pid: conn_info_pid,
event_manager_pid: event_manager_pid
}

{:ok, _pid} = GenServer.start_link(__MODULE__, state, opts)
end


@spec init(state) :: {:ok, state}
def init(conn_state) do
{:ok, reader} = NSQ.Connection.Buffer.start_link(:reader)
Expand All @@ -90,78 +96,71 @@ defmodule NSQ.Connection do
{:ok, writer} = NSQ.Connection.Buffer.start_link(:writer)
conn_state = %{conn_state | writer: writer}

{:ok, msg_sup_pid} = NSQ.Message.Supervisor.start_link
{:ok, msg_sup_pid} = NSQ.Message.Supervisor.start_link()
conn_state = %{conn_state | msg_sup_pid: msg_sup_pid}

conn_state |> ConnInfo.init
conn_state |> ConnInfo.init()

case conn_state |> Initializer.connect do
case conn_state |> Initializer.connect() do
{:ok, state} -> {:ok, state}
{{:error, _reason}, state} -> {:ok, state}
end
end


def terminate(_reason, _state) do
:ok
end


@spec handle_call({:cmd, tuple, atom}, {pid, reference}, state) ::
{:reply, {:ok, reference}, state} |
{:reply, {:queued, :nosocket}, state}
{:reply, {:ok, reference}, state}
| {:reply, {:queued, :nosocket}, state}
def handle_call({:cmd, cmd, kind}, from, state) do
{reply, state} = state |> Command.exec(cmd, kind, from)
{:reply, reply, state}
end


@spec handle_call(:stop, {pid, reference}, state) :: {:stop, :normal, state}
def handle_call(:stop, _from, state) do
{:stop, :normal, state}
end


@spec handle_call(:state, {pid, reference}, state) :: {:reply, state, state}
def handle_call(:state, _from, state) do
{:reply, state, state}
end


@spec handle_call({:nsq_msg, binary}, {pid, reference}, state) :: {:reply, :ok, state}
def handle_call({:nsq_msg, msg}, _from, state) do
{:ok, state} = MessageHandling.handle_nsq_message(msg, state)
{:reply, :ok, state}
end


@spec handle_cast(:flush_cmd_queue, state) :: {:noreply, state}
def handle_cast(:flush_cmd_queue, state) do
{:noreply, Command.flush_cmd_queue!(state)}
end


@spec handle_cast(:reconnect, state) :: {:noreply, state}
def handle_cast(:reconnect, %{connect_attempts: connect_attempts} = conn_state) when connect_attempts > 0 do
def handle_cast(:reconnect, %{connect_attempts: connect_attempts} = conn_state)
when connect_attempts > 0 do
{_, conn_state} = Initializer.connect(conn_state)
{:noreply, conn_state}
end

def handle_cast(:reconnect, conn_state) do
{:noreply, conn_state}
end


# When a task is done, it automatically messages the return value to the
# calling process. we can use that opportunity to update the messages in
# flight.
@spec handle_info({reference, {:message_done, NSQ.Message.t, any}}, state) ::
{:noreply, T.conn_state}
@spec handle_info({reference, {:message_done, NSQ.Message.t(), any}}, state) ::
{:noreply, T.conn_state()}
def handle_info({:message_done, _msg, ret_val}, state) do
state |> MessageHandling.update_conn_stats_on_message_done(ret_val)
{:noreply, state}
end


# ------------------------------------------------------- #
# API Definitions #
# ------------------------------------------------------- #
Expand All @@ -175,28 +174,31 @@ defmodule NSQ.Connection do
get_state(pid)
end


@spec close(pid, state) :: any
def close(conn, conn_state \\ nil) do
NSQ.Logger.debug "Closing connection #{inspect conn}"
NSQ.Logger.debug("Closing connection #{inspect(conn)}")
conn_state = conn_state || get_state(conn)

# send a CLS command and expect CLOSE_WAIT in response
{:ok, "CLOSE_WAIT"} = cmd(conn, :cls)

# grace period: poll once per second until zero are in flight
result = wait_for_zero_in_flight_with_timeout(
conn_state.conn_info_pid,
ConnInfo.conn_id(conn_state),
conn_state.msg_timeout
)
result =
wait_for_zero_in_flight_with_timeout(
conn_state.conn_info_pid,
ConnInfo.conn_id(conn_state),
conn_state.msg_timeout
)

# either way, we're exiting
case result do
:ok ->
NSQ.Logger.warn "#{inspect conn}: No more messages in flight. Exiting."
NSQ.Logger.warn("#{inspect(conn)}: No more messages in flight. Exiting.")

:timeout ->
NSQ.Logger.error "#{inspect conn}: Timed out waiting for messages to finish. Exiting anyway."
NSQ.Logger.error(
"#{inspect(conn)}: Timed out waiting for messages to finish. Exiting anyway."
)
end

Process.exit(self(), :normal)
Expand All @@ -206,9 +208,10 @@ defmodule NSQ.Connection do
Calls the command and waits for a response. If a command shouldn't have a
response, use cmd_noresponse.
"""
@spec cmd(pid, tuple, integer) :: {:ok, binary} | {:error, String.t}
@spec cmd(pid, tuple, integer) :: {:ok, binary} | {:error, String.t()}
def cmd(conn_pid, cmd, timeout \\ 5000) do
{:ok, ref} = GenServer.call(conn_pid, {:cmd, cmd, :reply})

receive do
{^ref, data} ->
{:ok, data}
Expand All @@ -235,7 +238,8 @@ defmodule NSQ.Connection do
@spec wait_for_zero_in_flight(pid, binary) :: any
defp wait_for_zero_in_flight(agent_pid, conn_id) do
[in_flight] = ConnInfo.fetch(agent_pid, conn_id, [:messages_in_flight])
NSQ.Logger.debug("Conn #{inspect conn_id}: #{in_flight} still in flight")
NSQ.Logger.debug("Conn #{inspect(conn_id)}: #{in_flight} still in flight")

if in_flight <= 0 do
:ok
else
Expand All @@ -244,7 +248,6 @@ defmodule NSQ.Connection do
end
end


@spec wait_for_zero_in_flight_with_timeout(pid, binary, integer) :: any
defp wait_for_zero_in_flight_with_timeout(agent_pid, conn_id, timeout) do
try do
Expand Down
Loading

0 comments on commit c683a60

Please sign in to comment.