Skip to content

Commit

Permalink
fix doc warnings
Browse files Browse the repository at this point in the history
benonymus committed Dec 13, 2023
1 parent 4035cd4 commit e119db2
Showing 4 changed files with 84 additions and 159 deletions.
72 changes: 22 additions & 50 deletions lib/nsq/conn_info.ex
Original file line number Diff line number Diff line change
@@ -4,142 +4,115 @@ defmodule NSQ.ConnInfo do
connection ID.
"""
def conn_id(parent, {host, port} = _nsqd) do
"parent:#{inspect parent}:conn:#{host}:#{port}"
"parent:#{inspect(parent)}:conn:#{host}:#{port}"
end


@doc """
Given a `conn` object return by `Consumer.get_connections`, return the
Given a `conn` object return by `Consumer.get_connections` or connection state object, return the
connection ID.
"""
def conn_id({conn_id, conn_pid} = _conn) when is_pid(conn_pid) do
conn_id
end


@doc """
Given a connection state object, return the connection ID.
"""
def conn_id(%{parent: parent, nsqd: {host, port}} = _conn_state) do
conn_id(parent, {host, port})
end


@doc """
Get info for all connections in a map like `%{conn_id: %{... data ...}}`.
"""
def all(agent_pid) when is_pid(agent_pid) do
Agent.get(agent_pid, fn(data) -> data end)
Agent.get(agent_pid, fn data -> data end)
end


@doc """
`func` is passed `conn_info` for each connection.
"""
def reduce(agent_pid, start_acc, func) do
Agent.get agent_pid, fn(all_conn_info) ->
Agent.get(agent_pid, fn all_conn_info ->
Enum.reduce(all_conn_info, start_acc, func)
end
end)
end


@doc """
Get info for the connection matching `conn_id`.
"""
def fetch(agent_pid, conn_id) when is_pid(agent_pid) do
Agent.get(agent_pid, fn(data) -> data[conn_id] || %{} end)
Agent.get(agent_pid, fn data -> data[conn_id] || %{} end)
end


@doc false
def fetch(%{conn_info_pid: agent_pid}, conn_id) do
fetch(agent_pid, conn_id)
end


@doc """
Get specific data for the connection, e.g.:
[rdy_count, last_rdy] = fetch(pid, "conn_id", [:rdy_count, :last_rdy])
rdy_count = fetch(pid, "conn_id", :rdy_count)
"""
def fetch(agent_pid, conn_id, keys) when is_pid(agent_pid) do
Agent.get agent_pid, fn(data) ->
Agent.get(agent_pid, fn data ->
conn_map = data[conn_id] || %{}

if is_list(keys) do
Enum.map keys, &Map.get(conn_map, &1)
Enum.map(keys, &Map.get(conn_map, &1))
else
Map.get(conn_map, keys)
end
end
end)
end


@doc false
def fetch(%{conn_info_pid: agent_pid} = _state, {conn_id, _conn_pid} = _conn, keys) do
fetch(agent_pid, conn_id, keys)
end


@doc false
def fetch(%{conn_info_pid: agent_pid} = _state, conn_id, keys) do
fetch(agent_pid, conn_id, keys)
end


@doc """
Update the info for a specific connection matching `conn_id`. `conn_info` is
passed to `func`, and the result of `func` is saved as the new `conn_info`.
Update the info for a specific connection matching `conn_id`.
If a function is supplied `conn_info`is passed to `func`, and the result of `func` is saved as the new `conn_info`.
if a map is supplied, the map is merged into the existing conn_info.
"""
def update(agent_pid, conn_id, func) when is_pid(agent_pid) and is_function(func) do
Agent.update agent_pid, fn(data) ->
Agent.update(agent_pid, fn data ->
Map.put(data, conn_id, func.(data[conn_id] || %{}))
end
end)
end


@doc """
Update the info for a specific connection matching `conn_id`. The map is
merged into the existing conn_info.
"""
def update(agent_pid, conn_id, map) when is_pid(agent_pid) and is_map(map) do
Agent.update agent_pid, fn(data) ->
Agent.update(agent_pid, fn data ->
new_conn_data = Map.merge(data[conn_id] || %{}, map)
Map.put(data, conn_id, new_conn_data)
end
end)
end


@doc false
def update(%{conn_info_pid: agent_pid}, conn_id, func) do
update(agent_pid, conn_id, func)
end


@doc false
def update(%{conn_info_pid: agent_pid, parent: parent, nsqd: nsqd}, func) do
update(agent_pid, conn_id(parent, nsqd), func)
end


@doc """
Delete connection info matching `conn_id`. This should be called when a
connection is terminated.
"""
def delete(agent_pid, conn_id) when is_pid(agent_pid) do
Agent.update(agent_pid, fn(data) -> Map.delete(data, conn_id) end)
Agent.update(agent_pid, fn data -> Map.delete(data, conn_id) end)
end


@doc false
def delete(%{conn_info_pid: agent_pid}, conn_id) do
delete(agent_pid, conn_id)
end


@spec init(map) :: any
def init(state) do
update state, %{
update(state, %{
max_rdy: state.max_rdy,
rdy_count: 0,
last_rdy: 0,
@@ -149,14 +122,13 @@ defmodule NSQ.ConnInfo do
finished_count: 0,
requeued_count: 0,
failed_count: 0,
backoff_count: 0,
}
backoff_count: 0
})
end


@spec now :: integer
defp now do
{megasec, sec, microsec} = :os.timestamp
{megasec, sec, microsec} = :os.timestamp()
1_000_000 * megasec + sec + microsec / 1_000_000
end
end
131 changes: 45 additions & 86 deletions lib/nsq/consumer.ex
Original file line number Diff line number Diff line change
@@ -74,7 +74,6 @@ defmodule NSQ.Consumer do
it can fully rebalance.
"""


# ------------------------------------------------------- #
# Directives #
# ------------------------------------------------------- #
@@ -86,7 +85,6 @@ defmodule NSQ.Consumer do
alias NSQ.Consumer.RDY
alias NSQ.ConnInfo, as: ConnInfo


# ------------------------------------------------------- #
# Module Attributes #
# ------------------------------------------------------- #
@@ -103,62 +101,61 @@ defmodule NSQ.Consumer do
stop_flag: false,
backoff_counter: 0,
backoff_duration: 0,
distribution_counter: 0,
distribution_counter: 0
}


# ------------------------------------------------------- #
# Type Definitions #
# ------------------------------------------------------- #
@typedoc """
A tuple with a host and a port.
"""
@type host_with_port :: {String.t, integer}
@type host_with_port :: {String.t(), integer}

@typedoc """
A tuple with a string ID (used to target the connection in
NSQ.Connection.Supervisor) and a PID of the connection.
"""
@type connection :: {String.t, pid}
@type connection :: {String.t(), pid}

@typedoc """
A map, but we can be more specific by asserting some entries that should be
set for a connection's state map.
"""
@type cons_state :: %{conn_sup_pid: pid, config: NSQ.Config.t, conn_info_pid: pid}
@type state :: %{conn_sup_pid: pid, config: NSQ.Config.t, conn_info_pid: pid}

@type cons_state :: %{conn_sup_pid: pid, config: NSQ.Config.t(), conn_info_pid: pid}
@type state :: %{conn_sup_pid: pid, config: NSQ.Config.t(), conn_info_pid: pid}

# ------------------------------------------------------- #
# Behaviour Implementation #
# ------------------------------------------------------- #
@doc """
Starts a Consumer process, called via the supervisor.
"""
@spec start_link(String.t, String.t, NSQ.Config.t, list) :: {:ok, pid}
@spec start_link(String.t(), String.t(), NSQ.Config.t(), list) :: {:ok, pid}
def start_link(topic, channel, config, opts \\ []) do
{:ok, config} = NSQ.Config.validate(config)
{:ok, config} = NSQ.Config.normalize(config)
unless is_valid_topic_name?(topic), do: raise "Invalid topic name #{topic}"
unless is_valid_channel_name?(channel), do: raise "Invalid channel name #{channel}"

state = %{@initial_state |
topic: topic,
channel: channel,
config: config,
max_in_flight: config.max_in_flight
unless is_valid_topic_name?(topic), do: raise("Invalid topic name #{topic}")
unless is_valid_channel_name?(channel), do: raise("Invalid channel name #{channel}")

state = %{
@initial_state
| topic: topic,
channel: channel,
config: config,
max_in_flight: config.max_in_flight
}

GenServer.start_link(__MODULE__, state, opts)
end


@doc """
On init, we create a connection for each NSQD instance discovered, and set
up loops for discovery and RDY redistribution.
"""
@spec init(map) :: {:ok, cons_state}
def init(cons_state) do
{:ok, conn_sup_pid} = NSQ.Connection.Supervisor.start_link
{:ok, conn_sup_pid} = NSQ.Connection.Supervisor.start_link()
cons_state = %{cons_state | conn_sup_pid: conn_sup_pid}

{:ok, conn_info_pid} = Agent.start_link(fn -> %{} end)
@@ -168,143 +165,111 @@ defmodule NSQ.Consumer do
if cons_state.config.event_manager do
cons_state.config.event_manager
else
{:ok, manager} = :gen_event.start_link
{:ok, manager} = :gen_event.start_link()
manager
end

cons_state = %{cons_state | event_manager_pid: manager}

cons_state = %{cons_state | max_in_flight: cons_state.config.max_in_flight}

{:ok, _cons_state} = Connections.discover_nsqds_and_connect(self(), cons_state)
end


@doc """
The RDY loop periodically calls this to make sure RDY is balanced among our
connections.
"""
# The RDY loop periodically calls this to make sure RDY is balanced among our
# connections.
@spec handle_call(:redistribute_rdy, {reference, pid}, cons_state) ::
{:reply, :ok, cons_state}
{:reply, :ok, cons_state}
def handle_call(:redistribute_rdy, _from, cons_state) do
{:reply, :ok, RDY.redistribute!(self(), cons_state)}
end


@doc """
The discovery loop calls this periodically to add/remove active nsqd
connections. Called from Consumer.Supervisor.
"""
# The discovery loop calls this periodically to add/remove active nsqd
# connections. Called from Consumer.Supervisor.
@spec handle_call(:discover_nsqds, {reference, pid}, cons_state) ::
{:reply, :ok, cons_state}
{:reply, :ok, cons_state}
def handle_call(:discover_nsqds, _from, cons_state) do
{:reply, :ok, Connections.refresh!(cons_state)}
end


@doc """
Only used for specs.
"""
# Only used for specs.
@spec handle_call(:delete_dead_connections, {reference, pid}, cons_state) ::
{:reply, :ok, cons_state}
{:reply, :ok, cons_state}
def handle_call(:delete_dead_connections, _from, cons_state) do
{:reply, :ok, Connections.delete_dead!(cons_state)}
end


@doc """
Called from `NSQ.Message.fin/1`. Not for external use.
"""
# Called from `NSQ.Message.fin/1`. Not for external use.
@spec handle_call({:start_stop_continue_backoff, atom}, {reference, pid}, cons_state) ::
{:reply, :ok, cons_state}
{:reply, :ok, cons_state}
def handle_call({:start_stop_continue_backoff, backoff_flag}, _from, cons_state) do
{:reply, :ok, Backoff.start_stop_continue!(self(), backoff_flag, cons_state)}
end


@spec handle_call({:update_rdy, connection, integer}, {reference, pid}, cons_state) ::
{:reply, :ok, cons_state}
{:reply, :ok, cons_state}
def handle_call({:update_rdy, conn, count}, _from, cons_state) do
{:reply, :ok, RDY.update!(self(), conn, count, cons_state)}
end


@doc """
Called from tests to assert correct consumer state. Not for external use.
"""
# Called from tests to assert correct consumer state. Not for external use.
@spec handle_call(:state, {reference, pid}, cons_state) ::
{:reply, cons_state, cons_state}
{:reply, cons_state, cons_state}
def handle_call(:state, _from, state) do
{:reply, state, state}
end


def handle_call(:starved, _from, cons_state) do
is_starved =
ConnInfo.all(cons_state.conn_info_pid)
|> Enum.any?(fn({_conn_id, info}) ->
|> Enum.any?(fn {_conn_id, info} ->
info.messages_in_flight > 0 &&
info.messages_in_flight >= info.last_rdy * 0.85
end)

{:reply, is_starved, cons_state}
end


@doc """
Called from `NSQ.Consumer.change_max_in_flight(consumer, max_in_flight)`. Not
for external use.
"""
# Called from `NSQ.Consumer.change_max_in_flight(consumer, max_in_flight)`. Not
# for external use.
@spec handle_call({:max_in_flight, integer}, {reference, pid}, cons_state) ::
{:reply, :ok, cons_state}
{:reply, :ok, cons_state}
def handle_call({:max_in_flight, new_max_in_flight}, _from, state) do
{:reply, :ok, %{state | max_in_flight: new_max_in_flight}}
end


def handle_call(:close, _, cons_state) do
{:reply, :ok, Connections.close!(cons_state)}
end


@doc """
Called from NSQ.Consume.event_manager.
"""
# Called from NSQ.Consume.event_manager.
@spec handle_call(:event_manager, any, cons_state) ::
{:reply, pid, cons_state}
{:reply, pid, cons_state}
def handle_call(:event_manager, _from, state) do
{:reply, state.event_manager_pid, state}
end


@doc """
Called to observe all connection stats. For debugging or reporting purposes.
"""
# Called to observe all connection stats. For debugging or reporting purposes.
@spec handle_call(:conn_info, any, cons_state) :: {:reply, map, cons_state}
def handle_call(:conn_info, _from, state) do
{:reply, ConnInfo.all(state.conn_info_pid), state}
end


@doc """
Called from `Backoff.resume_later/3`. Not for external use.
"""
# Called from `Backoff.resume_later/3`. Not for external use.
@spec handle_cast(:resume, cons_state) :: {:noreply, cons_state}
def handle_cast(:resume, state) do
{:noreply, Backoff.resume!(self(), state)}
end


@doc """
Called from `NSQ.Connection.handle_cast({:nsq_msg, _}, _)` after each message
is received. Not for external use.
"""
# Called from `NSQ.Connection.handle_cast({:nsq_msg, _}, _)` after each message
# is received. Not for external use.
@spec handle_cast({:maybe_update_rdy, host_with_port}, cons_state) ::
{:noreply, cons_state}
{:noreply, cons_state}
def handle_cast({:maybe_update_rdy, {_host, _port} = nsqd}, cons_state) do
conn = conn_from_nsqd(self(), nsqd, cons_state)
{:noreply, RDY.maybe_update!(self(), conn, cons_state)}
end


# ------------------------------------------------------- #
# API Definitions #
# ------------------------------------------------------- #
@@ -313,13 +278,11 @@ defmodule NSQ.Consumer do
GenServer.call(cons, :starved)
end


def close(sup_pid) do
cons = get(sup_pid)
GenServer.call(cons, :close)
end


@doc """
Called from tests to assert correct consumer state. Not for external use.
"""
@@ -328,7 +291,6 @@ defmodule NSQ.Consumer do
GenServer.call(cons, :state)
end


@doc """
Public function to change `max_in_flight` for a consumer. The new value will
be balanced across connections.
@@ -339,7 +301,6 @@ defmodule NSQ.Consumer do
GenServer.call(cons, {:max_in_flight, new_max_in_flight})
end


@doc """
If the event manager is not defined in NSQ.Config, it will be generated. So
if you want to attach event handlers on the fly, you can use a syntax like
@@ -350,13 +311,11 @@ defmodule NSQ.Consumer do
GenServer.call(cons, :event_manager)
end


def conn_info(sup_pid) do
cons = get(sup_pid)
GenServer.call(cons, :conn_info)
end


@doc """
NSQ.Consumer.Supervisor.start_link returns the supervisor pid so that we can
effectively recover from consumer crashes. This function takes the supervisor
@@ -368,7 +327,7 @@ defmodule NSQ.Consumer do
@spec get(pid) :: pid
def get(sup_pid) do
children = Supervisor.which_children(sup_pid)
child = Enum.find(children, fn({kind, _, _, _}) -> kind == NSQ.Consumer end)
child = Enum.find(children, fn {kind, _, _, _} -> kind == NSQ.Consumer end)
{_, pid, _, _} = child
pid
end
29 changes: 12 additions & 17 deletions lib/nsq/consumer/helpers.ex
Original file line number Diff line number Diff line change
@@ -3,7 +3,6 @@ defmodule NSQ.Consumer.Helpers do
alias NSQ.Consumer.Connections
alias NSQ.ConnInfo


@doc """
Each connection is responsible for maintaining its own rdy_count in ConnInfo.
This function sums all the values of rdy_count for each connection, which
@@ -12,51 +11,47 @@ defmodule NSQ.Consumer.Helpers do
"""
@spec total_rdy_count(pid) :: integer
def total_rdy_count(agent_pid) when is_pid(agent_pid) do
ConnInfo.reduce agent_pid, 0, fn({_, conn_info}, acc) ->
ConnInfo.reduce(agent_pid, 0, fn {_, conn_info}, acc ->
acc + conn_info.rdy_count
end
end)
end


@doc """
Convenience function; uses the consumer state to get the conn info pid. Not
for external use.
"""
@spec total_rdy_count(C.state) :: integer
# Convenience function; uses the consumer state to get the conn info pid.
# Not for external use.
@spec total_rdy_count(C.state()) :: integer
def total_rdy_count(%{conn_info_pid: agent_pid} = _cons_state) do
total_rdy_count(agent_pid)
end


@doc """
Returns how much `max_in_flight` should be distributed to each connection.
If `max_in_flight` is less than the number of connections, then this always
returns 1 and they are randomly distributed via `redistribute_rdy`. Not for
external use.
"""
@spec per_conn_max_in_flight(C.state) :: integer
@spec per_conn_max_in_flight(C.state()) :: integer
def per_conn_max_in_flight(cons_state) do
max_in_flight = cons_state.max_in_flight
conn_count = Connections.count(cons_state)

if conn_count == 0 do
0
else
min(max(1, max_in_flight / conn_count), max_in_flight) |> round
end
end


@spec now() :: integer
def now do
:calendar.datetime_to_gregorian_seconds(:calendar.universal_time)
:calendar.datetime_to_gregorian_seconds(:calendar.universal_time())
end


@spec conn_from_nsqd(pid, C.host_with_port, C.state) :: C.connection
@spec conn_from_nsqd(pid, C.host_with_port(), C.state()) :: C.connection()
def conn_from_nsqd(cons, nsqd, cons_state) do
needle = ConnInfo.conn_id(cons, nsqd)
Enum.find Connections.get(cons_state), fn({conn_id, _}) ->

Enum.find(Connections.get(cons_state), fn {conn_id, _} ->
needle == conn_id
end
end)
end
end
11 changes: 5 additions & 6 deletions lib/nsq/logger.ex
Original file line number Diff line number Diff line change
@@ -5,17 +5,16 @@ defmodule NSQ.Logger do
defdelegate info(message), to: Logger
defdelegate error(message), to: Logger

def warn(message) do
case Version.compare(System.version(), "1.11.0") do
:lt -> Logger.warn(message)
_ -> Logger.warning(message)
end
case Version.compare(System.version(), "1.11.0") do
:lt -> defdelegate warn(message), to: Logger
_ -> defdelegate warn(message), to: Logger, as: :warning
end

def configure(opts) do
case {opts, Version.compare(System.version(), "1.11.0")} do
{[level: :warn], :gt} ->
Logger.configure(Keyword.merge(opts, [level: :warning]))
Logger.configure(Keyword.merge(opts, level: :warning))

_ ->
Logger.configure(opts)
end

0 comments on commit e119db2

Please sign in to comment.