Skip to content

Commit

Permalink
Merge pull request #47 from benonymus/tweak_child_specs
Browse files Browse the repository at this point in the history
Tweak child_specs
  • Loading branch information
chen-anders authored Dec 18, 2023
2 parents efe4ee3 + f5bbde3 commit e251bf4
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 29 deletions.
19 changes: 8 additions & 11 deletions lib/nsq/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule NSQ.Connection do
# ------------------------------------------------------- #
# Directives #
# ------------------------------------------------------- #
use GenServer
alias NSQ.Connection.Command
alias NSQ.Connection.Initializer
alias NSQ.Connection.MessageHandling
Expand Down Expand Up @@ -62,18 +63,14 @@ defmodule NSQ.Connection do
# ------------------------------------------------------- #
# Behaviour Implementation #
# ------------------------------------------------------- #
@spec start_link(pid, host_with_port, NSQ.Config.t(), String.t(), String.t(), pid, list) ::
@spec start_link({pid | host_with_port | NSQ.Config.t() | String.t() | String.t() | pid}) ::
{:ok, pid}
def start_link(
parent,
nsqd,
config,
topic,
channel,
conn_info_pid,
event_manager_pid,
opts \\ []
) do
def start_link({parent, nsqd, config, topic, channel, conn_info_pid, event_manager_pid}),
do: start_link({parent, nsqd, config, topic, channel, conn_info_pid, event_manager_pid, []})

@spec start_link({pid | host_with_port | NSQ.Config.t() | String.t() | String.t() | pid | list}) ::
{:ok, pid}
def start_link({parent, nsqd, config, topic, channel, conn_info_pid, event_manager_pid, opts}) do
state = %{
@initial_state
| parent: parent,
Expand Down
9 changes: 4 additions & 5 deletions lib/nsq/connection/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,23 @@ defmodule NSQ.Connection.Supervisor do
parent_state = parent_state || GenServer.call(parent, :state)
conn_sup_pid = parent_state.conn_sup_pid

args = [
args = {
parent,
nsqd,
parent_state.config,
parent_state.topic,
parent_state.channel,
parent_state.conn_info_pid,
parent_state.event_manager_pid
]
}

conn_id = ConnInfo.conn_id(parent, nsqd)

# When using nsqlookupd, we expect connections will be naturally
# rediscovered if they fail.
config =
[id: conn_id, start: {NSQ.Connection, :start_link, args}, restart: :temporary] ++ opts
opts = [id: conn_id, restart: :temporary] ++ opts

child = Map.new(config)
child = Supervisor.child_spec({NSQ.Connection, args}, opts)

Supervisor.start_child(conn_sup_pid, child)
end
Expand Down
7 changes: 5 additions & 2 deletions lib/nsq/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,11 @@ defmodule NSQ.Consumer do
@doc """
Starts a Consumer process, called via the supervisor.
"""
@spec start_link(String.t(), String.t(), NSQ.Config.t(), list) :: {:ok, pid}
def start_link(topic, channel, config, opts \\ []) do
@spec start_link({String.t() | String.t() | NSQ.Config.t()}) :: {:ok, pid}
def start_link({topic, channel, config}), do: start_link({topic, channel, config, []})

@spec start_link({String.t() | String.t() | NSQ.Config.t() | list}) :: {:ok, pid}
def start_link({topic, channel, config, opts}) do
{:ok, config} = NSQ.Config.validate(config)
{:ok, config} = NSQ.Config.normalize(config)
unless is_valid_topic_name?(topic), do: raise("Invalid topic name #{topic}")
Expand Down
12 changes: 6 additions & 6 deletions lib/nsq/consumer/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,17 @@ defmodule NSQ.Consumer.Supervisor do
rdy_loop_id = String.to_atom("#{consumer_name}_rdy_loop")

children = [
%{
id: NSQ.Consumer,
start: {NSQ.Consumer, :start_link, [topic, channel, config, [name: consumer_name]]}
},
{NSQ.Consumer, {topic, channel, config, [name: consumer_name]}},
# Tasks have temporary restart policy by default
Supervisor.child_spec(
{Task, fn -> NSQ.Consumer.Connections.discovery_loop(consumer_name) end},
id: discovery_loop_id
id: discovery_loop_id,
restart: :permanent
),
Supervisor.child_spec(
{Task, fn -> NSQ.Consumer.RDY.redistribute_loop(consumer_name) end},
id: rdy_loop_id
id: rdy_loop_id,
restart: :permanent
)
]

Expand Down
4 changes: 2 additions & 2 deletions lib/nsq/message/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ defmodule NSQ.Message.Supervisor do
def start_child(msg_sup_pid, message, opts \\ []) do
# If a message fails, NSQ will handle requeueing.
id = message.id <> "-" <> UUID.uuid4(:hex)
config = [id: id, start: {NSQ.Message, :start_link, [message]}, restart: :temporary] ++ opts
child = Map.new(config)
opts = [id: id, restart: :temporary] ++ opts
child = Supervisor.child_spec({NSQ.Message, message}, opts)
Supervisor.start_child(msg_sup_pid, child)
end

Expand Down
7 changes: 5 additions & 2 deletions lib/nsq/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,11 @@ defmodule NSQ.Producer do
# ------------------------------------------------------- #
# API Definitions #
# ------------------------------------------------------- #
@spec start_link(binary, NSQ.Config.t(), GenServer.options()) :: {:ok, pid}
def start_link(topic, config, genserver_options \\ []) do
@spec start_link({binary(), NSQ.Config.t()}) :: {:ok, pid}
def start_link({topic, config}), do: start_link({topic, config, []})

@spec start_link({binary, NSQ.Config.t(), GenServer.options()}) :: {:ok, pid}
def start_link({topic, config, genserver_options}) do
{:ok, config} = NSQ.Config.validate(config || %NSQ.Config{})
{:ok, config} = NSQ.Config.normalize(config)
unless is_valid_topic_name?(topic), do: raise("Invalid topic name #{topic}")
Expand Down
2 changes: 1 addition & 1 deletion lib/nsq/producer/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule NSQ.Producer.Supervisor do

@impl true
def init({topic, config}) do
children = [%{id: NSQ.Producer, start: {NSQ.Producer, :start_link, [topic, config]}}]
children = [{NSQ.Producer, {topic, config}}]

Supervisor.init(children, strategy: :one_for_one)
end
Expand Down

0 comments on commit e251bf4

Please sign in to comment.