diff --git a/.formatter.exs b/.formatter.exs new file mode 100644 index 0000000..d2cda26 --- /dev/null +++ b/.formatter.exs @@ -0,0 +1,4 @@ +# Used by "mix format" +[ + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/lib/nsq/config.ex b/lib/nsq/config.ex index b20a2f2..8556bb6 100644 --- a/lib/nsq/config.ex +++ b/lib/nsq/config.ex @@ -17,7 +17,7 @@ defmodule NSQ.Config do # Duration between polling lookupd for new producers, and fractional jitter # to add to the lookupd pool loop. this helps evenly distribute requests # even if multiple consumers restart at the same time - # + # # NOTE: when not using nsqlookupd, lookupd_poll_interval represents the # duration of time between reconnection attempts lookupd_poll_interval: 1 * @minutes, @@ -135,7 +135,6 @@ defmodule NSQ.Config do defstruct Enum.into(@default_config, []) - @doc """ Given a config, tell us what's wrong with it. If nothing is wrong, we'll return `{:ok, config}`. @@ -153,18 +152,24 @@ defmodule NSQ.Config do %NSQ.Config{} = config - errors = errors ++ Enum.map @valid_ranges, fn({name, {min, max}}) -> - case range_error(Map.get(config, name), min, max) do - {:error, reason} -> "#{name}: #{reason}" - :ok -> nil - end - end - - errors = [no_match_error( - config.backoff_strategy, [:exponential, :test] - ) | errors] - - errors = Enum.reject(errors, fn(v) -> v == nil end) + errors = + errors ++ + Enum.map(@valid_ranges, fn {name, {min, max}} -> + case range_error(Map.get(config, name), min, max) do + {:error, reason} -> "#{name}: #{reason}" + :ok -> nil + end + end) + + errors = [ + no_match_error( + config.backoff_strategy, + [:exponential, :test] + ) + | errors + ] + + errors = Enum.reject(errors, fn v -> v == nil end) if length(errors) > 0 do {:error, errors} @@ -173,32 +178,32 @@ defmodule NSQ.Config do end end - def normalize(config) do config = %NSQ.Config{config | nsqds: normalize_hosts(config.nsqds)} config = %NSQ.Config{config | nsqlookupds: normalize_hosts(config.nsqlookupds)} {:ok, config} end - def normalize_hosts(hosts) do - Enum.map hosts, fn (host_with_port) -> + Enum.map(hosts, fn host_with_port -> cond do is_tuple(host_with_port) -> {_host, _port} = host_with_port + is_binary(host_with_port) -> [host, port] = host_with_port |> String.split(":") {port, _} = Integer.parse(port) {host, port} + is_list(host_with_port) -> {_host, _port} = List.to_tuple(host_with_port) + true -> - raise "Invalid host definition #{inspect host_with_port}" + raise "Invalid host definition #{inspect(host_with_port)}" end - end + end) end - defp range_error(val, min, max) do cond do val == nil -> :ok @@ -208,12 +213,10 @@ defmodule NSQ.Config do end end - defp matches_any?(val, candidates) do - Enum.any?(candidates, fn(candidate) -> candidate == val end) + Enum.any?(candidates, fn candidate -> candidate == val end) end - defp no_match_error(val, candidates) do if matches_any?(val, candidates) do nil diff --git a/lib/nsq/connection.ex b/lib/nsq/connection.ex index 29f657b..75736fe 100644 --- a/lib/nsq/connection.ex +++ b/lib/nsq/connection.ex @@ -3,7 +3,6 @@ defmodule NSQ.Connection do Sets up a TCP connection to NSQD. Both consumers and producers use this. """ - # ------------------------------------------------------- # # Directives # # ------------------------------------------------------- # @@ -12,27 +11,25 @@ defmodule NSQ.Connection do alias NSQ.Connection.MessageHandling alias NSQ.ConnInfo - # ------------------------------------------------------- # # Type Definitions # # ------------------------------------------------------- # @typedoc """ A tuple with a host and a port. """ - @type host_with_port :: {String.t, integer} + @type host_with_port :: {String.t(), integer} @typedoc """ A tuple with a string ID (used to target the connection in NSQ.Connection.Supervisor) and a PID of the connection. """ - @type connection :: {String.t, pid} + @type connection :: {String.t(), pid} @typedoc """ A map, but we can be more specific by asserting some entries that should be set for a connection's state map. """ - @type state :: %{parent: pid, config: NSQ.Config.t, nsqd: host_with_port} - + @type state :: %{parent: pid, config: NSQ.Config.t(), nsqd: host_with_port} # ------------------------------------------------------- # # Module Attributes # @@ -43,8 +40,8 @@ defmodule NSQ.Connection do reader: nil, writer: nil, connected: nil, - cmd_resp_queue: :queue.new, - cmd_queue: :queue.new, + cmd_resp_queue: :queue.new(), + cmd_queue: :queue.new(), config: %{}, reader_pid: nil, msg_sup_pid: nil, @@ -59,29 +56,38 @@ defmodule NSQ.Connection do connect_attempts: 0, stop_flag: false, conn_info_pid: nil, - msg_timeout: nil, + msg_timeout: nil } - # ------------------------------------------------------- # # Behaviour Implementation # # ------------------------------------------------------- # - @spec start_link(pid, host_with_port, NSQ.Config.t, String.t, String.t, pid, list) :: - {:ok, pid} - def start_link(parent, nsqd, config, topic, channel, conn_info_pid, event_manager_pid, opts \\ []) do - state = %{@initial_state | - parent: parent, - nsqd: nsqd, - config: config, - topic: topic, - channel: channel, - conn_info_pid: conn_info_pid, - event_manager_pid: event_manager_pid + @spec start_link(pid, host_with_port, NSQ.Config.t(), String.t(), String.t(), pid, list) :: + {:ok, pid} + def start_link( + parent, + nsqd, + config, + topic, + channel, + conn_info_pid, + event_manager_pid, + opts \\ [] + ) do + state = %{ + @initial_state + | parent: parent, + nsqd: nsqd, + config: config, + topic: topic, + channel: channel, + conn_info_pid: conn_info_pid, + event_manager_pid: event_manager_pid } + {:ok, _pid} = GenServer.start_link(__MODULE__, state, opts) end - @spec init(state) :: {:ok, state} def init(conn_state) do {:ok, reader} = NSQ.Connection.Buffer.start_link(:reader) @@ -90,78 +96,71 @@ defmodule NSQ.Connection do {:ok, writer} = NSQ.Connection.Buffer.start_link(:writer) conn_state = %{conn_state | writer: writer} - {:ok, msg_sup_pid} = NSQ.Message.Supervisor.start_link + {:ok, msg_sup_pid} = NSQ.Message.Supervisor.start_link() conn_state = %{conn_state | msg_sup_pid: msg_sup_pid} - conn_state |> ConnInfo.init + conn_state |> ConnInfo.init() - case conn_state |> Initializer.connect do + case conn_state |> Initializer.connect() do {:ok, state} -> {:ok, state} {{:error, _reason}, state} -> {:ok, state} end end - def terminate(_reason, _state) do :ok end - @spec handle_call({:cmd, tuple, atom}, {pid, reference}, state) :: - {:reply, {:ok, reference}, state} | - {:reply, {:queued, :nosocket}, state} + {:reply, {:ok, reference}, state} + | {:reply, {:queued, :nosocket}, state} def handle_call({:cmd, cmd, kind}, from, state) do {reply, state} = state |> Command.exec(cmd, kind, from) {:reply, reply, state} end - @spec handle_call(:stop, {pid, reference}, state) :: {:stop, :normal, state} def handle_call(:stop, _from, state) do {:stop, :normal, state} end - @spec handle_call(:state, {pid, reference}, state) :: {:reply, state, state} def handle_call(:state, _from, state) do {:reply, state, state} end - @spec handle_call({:nsq_msg, binary}, {pid, reference}, state) :: {:reply, :ok, state} def handle_call({:nsq_msg, msg}, _from, state) do {:ok, state} = MessageHandling.handle_nsq_message(msg, state) {:reply, :ok, state} end - @spec handle_cast(:flush_cmd_queue, state) :: {:noreply, state} def handle_cast(:flush_cmd_queue, state) do {:noreply, Command.flush_cmd_queue!(state)} end - @spec handle_cast(:reconnect, state) :: {:noreply, state} - def handle_cast(:reconnect, %{connect_attempts: connect_attempts} = conn_state) when connect_attempts > 0 do + def handle_cast(:reconnect, %{connect_attempts: connect_attempts} = conn_state) + when connect_attempts > 0 do {_, conn_state} = Initializer.connect(conn_state) {:noreply, conn_state} end + def handle_cast(:reconnect, conn_state) do {:noreply, conn_state} end - # When a task is done, it automatically messages the return value to the # calling process. we can use that opportunity to update the messages in # flight. - @spec handle_info({reference, {:message_done, NSQ.Message.t, any}}, state) :: - {:noreply, T.conn_state} + @spec handle_info({reference, {:message_done, NSQ.Message.t(), any}}, state) :: + {:noreply, T.conn_state()} def handle_info({:message_done, _msg, ret_val}, state) do state |> MessageHandling.update_conn_stats_on_message_done(ret_val) {:noreply, state} end - # ------------------------------------------------------- # # API Definitions # # ------------------------------------------------------- # @@ -175,28 +174,31 @@ defmodule NSQ.Connection do get_state(pid) end - @spec close(pid, state) :: any def close(conn, conn_state \\ nil) do - NSQ.Logger.debug "Closing connection #{inspect conn}" + NSQ.Logger.debug("Closing connection #{inspect(conn)}") conn_state = conn_state || get_state(conn) # send a CLS command and expect CLOSE_WAIT in response {:ok, "CLOSE_WAIT"} = cmd(conn, :cls) # grace period: poll once per second until zero are in flight - result = wait_for_zero_in_flight_with_timeout( - conn_state.conn_info_pid, - ConnInfo.conn_id(conn_state), - conn_state.msg_timeout - ) + result = + wait_for_zero_in_flight_with_timeout( + conn_state.conn_info_pid, + ConnInfo.conn_id(conn_state), + conn_state.msg_timeout + ) # either way, we're exiting case result do :ok -> - NSQ.Logger.warn "#{inspect conn}: No more messages in flight. Exiting." + NSQ.Logger.warn("#{inspect(conn)}: No more messages in flight. Exiting.") + :timeout -> - NSQ.Logger.error "#{inspect conn}: Timed out waiting for messages to finish. Exiting anyway." + NSQ.Logger.error( + "#{inspect(conn)}: Timed out waiting for messages to finish. Exiting anyway." + ) end Process.exit(self(), :normal) @@ -206,9 +208,10 @@ defmodule NSQ.Connection do Calls the command and waits for a response. If a command shouldn't have a response, use cmd_noresponse. """ - @spec cmd(pid, tuple, integer) :: {:ok, binary} | {:error, String.t} + @spec cmd(pid, tuple, integer) :: {:ok, binary} | {:error, String.t()} def cmd(conn_pid, cmd, timeout \\ 5000) do {:ok, ref} = GenServer.call(conn_pid, {:cmd, cmd, :reply}) + receive do {^ref, data} -> {:ok, data} @@ -235,7 +238,8 @@ defmodule NSQ.Connection do @spec wait_for_zero_in_flight(pid, binary) :: any defp wait_for_zero_in_flight(agent_pid, conn_id) do [in_flight] = ConnInfo.fetch(agent_pid, conn_id, [:messages_in_flight]) - NSQ.Logger.debug("Conn #{inspect conn_id}: #{in_flight} still in flight") + NSQ.Logger.debug("Conn #{inspect(conn_id)}: #{in_flight} still in flight") + if in_flight <= 0 do :ok else @@ -244,7 +248,6 @@ defmodule NSQ.Connection do end end - @spec wait_for_zero_in_flight_with_timeout(pid, binary, integer) :: any defp wait_for_zero_in_flight_with_timeout(agent_pid, conn_id, timeout) do try do diff --git a/lib/nsq/connection/buffer.ex b/lib/nsq/connection/buffer.ex index cb00e99..c2d6e47 100644 --- a/lib/nsq/connection/buffer.ex +++ b/lib/nsq/connection/buffer.ex @@ -10,39 +10,40 @@ defmodule NSQ.Connection.Buffer do timeout: nil, type: nil, zin: nil, - zout: nil, + zout: nil } def start_link(type, opts \\ []) do {:ok, _pid} = GenServer.start_link(__MODULE__, type, opts) end - def init(type) do {:ok, %{@initial_state | type: type}} end - def handle_call({:setup_socket, socket, timeout}, _from, state) do {:reply, :ok, %{state | socket: socket, timeout: timeout}} end - def handle_call({:setup_compression, compression}, _from, state) do state = case compression do :plaintext -> NSQ.Logger.debug("Not compressing or decompressing data") %{state | compression: :plaintext} + {:deflate, level} -> NSQ.Logger.debug("Using DEFLATE level #{level} to compress and decompress data") state = %{state | compression: :deflate} + case state.type do :reader -> %{state | zin: open_zin!()} |> convert_plaintext_buffer(:deflate) + :writer -> %{state | zout: open_zout!(level)} end + :snappy -> raise "snappy isn't implemented yet!" end @@ -50,57 +51,55 @@ defmodule NSQ.Connection.Buffer do {:reply, :ok, state} end - def handle_call({:recv, size}, _from, state) do {result, state} = state |> do_recv(size) {:reply, result, state} end - def handle_call({:send, data}, _from, state) do {:reply, state |> do_send(data), state} end - def setup_socket(buffer, socket, timeout) do :ok = buffer |> GenServer.call({:setup_socket, socket, timeout}) end - def setup_compression(buffer, identify_response, config) do case compression_from_identify_response(identify_response, config) do {:ok, {:deflate, level}} -> :ok = buffer |> GenServer.call({:setup_compression, {:deflate, level}}) + {:ok, :plaintext} -> :ok = buffer |> GenServer.call({:setup_compression, :plaintext}) end end - def recv(%{reader: buffer, config: %NSQ.Config{read_timeout: timeout}}, size) do buffer |> recv(size, timeout) end + def recv(buffer, size, timeout) do buffer |> GenServer.call({:recv, size}, timeout) end + def recv!(state, size) do {:ok, data} = state |> recv(size) data end + def recv!(buffer, size, timeout) do {:ok, data} = buffer |> recv(size, timeout) data end - def send!(%{writer: buffer}, data) do buffer |> send!(data) end + def send!(buffer, data) do :ok = buffer |> GenServer.call({:send, data}) end - defp do_send(state, data) do case state.compression do :plaintext -> @@ -108,29 +107,24 @@ defmodule NSQ.Connection.Buffer do :deflate -> compressed = - state.zout - |> :zlib.deflate(data, :sync) - |> List.flatten |> Enum.join("") + state.zout |> :zlib.deflate(data, :sync) |> List.flatten() |> Enum.join("") state.socket |> Socket.Stream.send!(compressed) end end - defp open_zin! do - z = :zlib.open + z = :zlib.open() :ok = z |> :zlib.inflateInit(-15) z end - defp open_zout!(level) do - z = :zlib.open + z = :zlib.open() :ok = z |> :zlib.deflateInit(level, :deflated, -15, 8, :default) z end - # This tries to copy the semantics of a standard socket recv. That is, if you # give it a size, it will block until we have enough data then return it. If # you pass 0 as the size, we'll block until there's X (i.e. an undefined @@ -139,11 +133,9 @@ defmodule NSQ.Connection.Buffer do cond do state.buffered_data_size > 0 && state.buffered_data_size >= size -> <> <> leftover = state.buffered_data - state = %{state | - buffered_data: leftover, - buffered_data_size: byte_size(leftover) - } + state = %{state | buffered_data: leftover, buffered_data_size: byte_size(leftover)} {{:ok, taken}, state} + true -> case state |> buffer do {:error, error} -> {{:error, error}, state} @@ -152,7 +144,6 @@ defmodule NSQ.Connection.Buffer do end end - # Grabs as much data from the socket as is available, combines it with any # compressed data from previous buffering, decompresses it, and stores the # output in several state properties. @@ -167,7 +158,6 @@ defmodule NSQ.Connection.Buffer do end end - # During initialization, it's highly possible that NSQ sends us compressed # messages that are buffered before we start decompressing on the fly. This # assumes all messages in the buffer are compressed and decompresses them. @@ -175,20 +165,20 @@ defmodule NSQ.Connection.Buffer do case compressor do :deflate -> plaintext_data = state.buffered_data + state |> reset_buffer! |> add_raw_chunk_to_buffer!(plaintext_data) + :snappy -> raise "Snappy isn't implemented yet!" end end - defp reset_buffer!(state) do %{state | buffered_data: "", buffered_data_size: 0, compressed_data: ""} end - # Given a chunk of data, decompress as much as we can and store it in its # appropriate place in the buffer. defp add_raw_chunk_to_buffer!(state, raw_chunk) do @@ -198,10 +188,12 @@ defmodule NSQ.Connection.Buffer do case state.compression do :plaintext -> {raw_chunk, ""} + :deflate -> case state.zin |> :zlib.inflateChunk(raw_chunk) do {more, decompressed} -> {decompressed, more} + decompressed -> {decompressed, ""} end @@ -209,27 +201,29 @@ defmodule NSQ.Connection.Buffer do decompressed = if is_list(decompressed) do - decompressed |> List.flatten |> Enum.join("") + decompressed |> List.flatten() |> Enum.join("") else decompressed end combined_buffer = state.buffered_data <> decompressed - %{state | - buffered_data: combined_buffer, - buffered_data_size: byte_size(combined_buffer), - compressed_data: compressed + %{ + state + | buffered_data: combined_buffer, + buffered_data_size: byte_size(combined_buffer), + compressed_data: compressed } end - defp compression_from_identify_response(response, config) do cond do response["snappy"] == true -> {:error, "snappy isn't implemented yet!"} + response["deflate"] == true -> {:ok, {:deflate, config.deflate_level}} + true -> {:ok, :plaintext} end diff --git a/lib/nsq/connection/command.ex b/lib/nsq/connection/command.ex index bad3e33..b1b21db 100644 --- a/lib/nsq/connection/command.ex +++ b/lib/nsq/connection/command.ex @@ -5,13 +5,11 @@ defmodule NSQ.Connection.Command do of the trickier command queueing, flushing, etc. """ - alias NSQ.Connection, as: C alias NSQ.Connection.Buffer alias NSQ.ConnInfo import NSQ.Protocol - def exec(state, cmd, kind, {_, ref} = from) do if state.connected do state = send_data_and_queue_resp(state, cmd, from, kind) @@ -25,40 +23,42 @@ defmodule NSQ.Connection.Command do end end - - @spec send_data_and_queue_resp(C.state, tuple, {reference, pid}, atom) :: C.state + @spec send_data_and_queue_resp(C.state(), tuple, {reference, pid}, atom) :: C.state() def send_data_and_queue_resp(state, cmd, from, kind) do state |> Buffer.send!(encode(cmd)) + if kind == :noresponse do state else - %{state | - cmd_resp_queue: :queue.in({cmd, from, kind}, state.cmd_resp_queue) - } + %{state | cmd_resp_queue: :queue.in({cmd, from, kind}, state.cmd_resp_queue)} end end - - @spec send_response_to_caller(C.state, binary) :: {:ok, C.state} + @spec send_response_to_caller(C.state(), binary) :: {:ok, C.state()} def send_response_to_caller(state, data) do :gen_event.notify(state.event_manager_pid, {:response, data}) {item, cmd_resp_queue} = :queue.out(state.cmd_resp_queue) + case item do {:value, {_cmd, {pid, ref}, :reply}} -> send(pid, {ref, data}) - :empty -> :ok + + :empty -> + :ok end + {:ok, %{state | cmd_resp_queue: cmd_resp_queue}} end - - @spec flush_cmd_queue(C.state) :: C.state + @spec flush_cmd_queue(C.state()) :: C.state() def flush_cmd_queue(state) do {item, new_queue} = :queue.out(state.cmd_queue) + case item do {:value, {cmd, from, kind}} -> state = send_data_and_queue_resp(state, cmd, from, kind) flush_cmd_queue(%{state | cmd_queue: new_queue}) + :empty -> {:ok, %{state | cmd_queue: new_queue}} end @@ -69,14 +69,15 @@ defmodule NSQ.Connection.Command do state end - - @spec update_state_from_cmd(tuple, C.state) :: C.state + @spec update_state_from_cmd(tuple, C.state()) :: C.state() def update_state_from_cmd(cmd, state) do case cmd do {:rdy, count} -> ConnInfo.update(state, %{rdy_count: count, last_rdy: count}) state - _any -> state + + _any -> + state end end end diff --git a/lib/nsq/connection/message_handling.ex b/lib/nsq/connection/message_handling.ex index 25bc7dc..6023888 100644 --- a/lib/nsq/connection/message_handling.ex +++ b/lib/nsq/connection/message_handling.ex @@ -5,7 +5,6 @@ defmodule NSQ.Connection.MessageHandling do alias NSQ.Connection.Command import NSQ.Protocol - @doc """ This is the recv loop that we kick off in a separate process immediately after the handshake. We send each incoming NSQ message as an erlang message @@ -17,7 +16,8 @@ defmodule NSQ.Connection.MessageHandling do # If publishing is quiet, we won't receive any messages in the timeout. # This is fine. Let's just try again! conn_state |> recv_nsq_messages(conn) - {:ok, <>} -> + + {:ok, <>} -> # Got a message! Decode it and let the connection know. We just # received data on the socket to get the size of this message, so if we # timeout in here, that's probably indicative of a problem. @@ -51,92 +51,94 @@ defmodule NSQ.Connection.MessageHandling do state |> kick_off_message_processing(data) end - - @spec update_conn_stats_on_message_done(C.state, any) :: any + @spec update_conn_stats_on_message_done(C.state(), any) :: any def update_conn_stats_on_message_done(state, ret_val) do - ConnInfo.update state, fn(info) -> + ConnInfo.update(state, fn info -> info |> update_stats_from_ret_val(ret_val) - end + end) end - @spec update_stats_from_ret_val(map, any) :: map defp update_stats_from_ret_val(info, ret_val) do info = %{info | messages_in_flight: info.messages_in_flight - 1} + case ret_val do :ok -> %{info | finished_count: info.finished_count + 1} + :fail -> %{info | failed_count: info.failed_count + 1} + :req -> %{info | requeued_count: info.requeued_count + 1} + {:req, _} -> %{info | requeued_count: info.requeued_count + 1} + {:req, _, true} -> - %{info | - requeued_count: info.requeued_count + 1, - backoff_count: info.backoff_count + 1 - } + %{info | requeued_count: info.requeued_count + 1, backoff_count: info.backoff_count + 1} + {:req, _, _} -> %{info | requeued_count: info.requeued_count + 1} end end - - @spec respond_to_heartbeat(C.state) :: :ok + @spec respond_to_heartbeat(C.state()) :: :ok defp respond_to_heartbeat(state) do :gen_event.notify(state.event_manager_pid, :heartbeat) state |> Buffer.send!(encode(:noop)) end - - @spec log_error(C.state, binary, binary) :: any + @spec log_error(C.state(), binary, binary) :: any defp log_error(state, reason, data) do :gen_event.notify(state.event_manager_pid, {:error, reason, data}) + if reason do - NSQ.Logger.error "error: #{reason}\n#{inspect data}" + NSQ.Logger.error("error: #{reason}\n#{inspect(data)}") else - NSQ.Logger.error "error: #{inspect data}" + NSQ.Logger.error("error: #{inspect(data)}") end end - - @spec kick_off_message_processing(C.state, binary) :: {:ok, C.state} + @spec kick_off_message_processing(C.state(), binary) :: {:ok, C.state()} defp kick_off_message_processing(state, data) do message = NSQ.Message.from_data(data) state = received_message(state) - message = %NSQ.Message{message | - connection: self(), - consumer: state.parent, - reader: state.reader, - writer: state.writer, - config: state.config, - msg_timeout: state.msg_timeout, - event_manager_pid: state.event_manager_pid + + message = %NSQ.Message{ + message + | connection: self(), + consumer: state.parent, + reader: state.reader, + writer: state.writer, + config: state.config, + msg_timeout: state.msg_timeout, + event_manager_pid: state.event_manager_pid } + :gen_event.notify(state.event_manager_pid, {:message, message}) GenServer.cast(state.parent, {:maybe_update_rdy, state.nsqd}) NSQ.Message.Supervisor.start_child(state.msg_sup_pid, message) {:ok, state} end - - @spec received_message(C.state) :: C.state + @spec received_message(C.state()) :: C.state() defp received_message(state) do - ConnInfo.update state, fn(info) -> - %{info | - rdy_count: info.rdy_count - 1, - messages_in_flight: info.messages_in_flight + 1, - last_msg_timestamp: now() + ConnInfo.update(state, fn info -> + %{ + info + | rdy_count: info.rdy_count - 1, + messages_in_flight: info.messages_in_flight + 1, + last_msg_timestamp: now() } - end + end) + state end - @spec now :: integer defp now do - {megasec, sec, microsec} = :os.timestamp + {megasec, sec, microsec} = :os.timestamp() 1_000_000 * megasec + sec + microsec / 1_000_000 end end diff --git a/lib/nsq/consumer/backoff.ex b/lib/nsq/consumer/backoff.ex index 5170344..4ebcada 100644 --- a/lib/nsq/consumer/backoff.ex +++ b/lib/nsq/consumer/backoff.ex @@ -3,17 +3,15 @@ defmodule NSQ.Consumer.Backoff do When messages fail unexpectedly hard, we go into "backoff mode". """ - alias NSQ.Consumer, as: C alias NSQ.Consumer.Connections alias NSQ.Consumer.RDY import NSQ.Consumer.Helpers - @doc """ Decision point about whether to continue/end/ignore backoff. """ - @spec start_stop_continue(pid, atom, C.state) :: {:ok, C.state} + @spec start_stop_continue(pid, atom, C.state()) :: {:ok, C.state()} def start_stop_continue(cons, backoff_signal, cons_state) do {backoff_updated, cons_state} = cons_state |> update_backoff_counter(backoff_signal) @@ -22,10 +20,13 @@ defmodule NSQ.Consumer.Backoff do cons_state.config.max_backoff_duration <= 0 -> # Never backoff if max_backoff_duration is <= 0 {:ok, cons_state} + cons_state.backoff_counter == 0 && backoff_updated -> {:ok, _state} = exit_backoff(cons, cons_state) + cons_state.backoff_counter > 0 -> {:ok, _state} = backoff(cons, cons_state, backoff_signal) + true -> {:ok, cons_state} end @@ -36,14 +37,13 @@ defmodule NSQ.Consumer.Backoff do cons_state end - @doc """ This function is called asynchronously from `resume_later`. It will cause one connection to have RDY 1. We only resume after this if messages succeed a number of times == backoff_counter. (That logic is in start_stop_continue.) """ - @spec resume(pid, C.state) :: {:ok, C.state} + @spec resume(pid, C.state()) :: {:ok, C.state()} def resume(_cons, %{backoff_duration: 0, backoff_counter: 0} = cons_state), # looks like we successfully left backoff mode already do: {:ok, cons_state} @@ -62,7 +62,7 @@ defmodule NSQ.Consumer.Backoff do else # pick a random connection to test the waters conn = random_connection_for_backoff(cons_state) - NSQ.Logger.warn("(#{inspect conn}) backoff timeout expired, sending RDY 1") + NSQ.Logger.warn("(#{inspect(conn)}) backoff timeout expired, sending RDY 1") # while in backoff only ever let 1 message at a time through RDY.update(cons, conn, 1, cons_state) @@ -76,85 +76,93 @@ defmodule NSQ.Consumer.Backoff do cons_state end - - @spec backoff(pid, C.state, boolean) :: {:ok, C.state} + @spec backoff(pid, C.state(), boolean) :: {:ok, C.state()} defp backoff(cons, cons_state, backoff_signal) do backoff_duration = calculate_backoff(cons_state) - NSQ.Logger.warn "backing off for #{backoff_duration / 1000} seconds (backoff level #{cons_state.backoff_counter}), setting all to RDY 0" + + NSQ.Logger.warn( + "backing off for #{backoff_duration / 1000} seconds (backoff level #{cons_state.backoff_counter}), setting all to RDY 0" + ) + # send RDY 0 immediately (to *all* connections) - cons_state = Enum.reduce Connections.get(cons_state), cons_state, fn(conn, last_state) -> - {:ok, new_state} = RDY.update(cons, conn, 0, last_state) - new_state - end + cons_state = + Enum.reduce(Connections.get(cons_state), cons_state, fn conn, last_state -> + {:ok, new_state} = RDY.update(cons, conn, 0, last_state) + new_state + end) + :gen_event.notify(cons_state.event_manager_pid, backoff_signal) {:ok, _cons_state} = resume_later(cons, backoff_duration, cons_state) end - - @spec update_backoff_counter(C.state, atom) :: {boolean, C.state} + @spec update_backoff_counter(C.state(), atom) :: {boolean, C.state()} defp update_backoff_counter(cons_state, backoff_signal) do - {backoff_updated, backoff_counter} = cond do - backoff_signal == :resume -> - if cons_state.backoff_counter <= 0 do + {backoff_updated, backoff_counter} = + cond do + backoff_signal == :resume -> + if cons_state.backoff_counter <= 0 do + {false, cons_state.backoff_counter} + else + {true, cons_state.backoff_counter - 1} + end + + backoff_signal == :backoff -> + {true, cons_state.backoff_counter + 1} + + true -> {false, cons_state.backoff_counter} - else - {true, cons_state.backoff_counter - 1} - end - backoff_signal == :backoff -> - {true, cons_state.backoff_counter + 1} - true -> - {false, cons_state.backoff_counter} - end + end + cons_state = %{cons_state | backoff_counter: backoff_counter} {backoff_updated, cons_state} end - - @spec exit_backoff(pid, C.state) :: {:ok, C.state} + @spec exit_backoff(pid, C.state()) :: {:ok, C.state()} defp exit_backoff(cons, cons_state) do count = per_conn_max_in_flight(cons_state) - NSQ.Logger.warn "exiting backoff, returning all to RDY #{count}" - cons_state = Enum.reduce Connections.get(cons_state), cons_state, fn(conn, last_state) -> - {:ok, new_state} = RDY.update(cons, conn, count, last_state) - new_state - end + NSQ.Logger.warn("exiting backoff, returning all to RDY #{count}") + + cons_state = + Enum.reduce(Connections.get(cons_state), cons_state, fn conn, last_state -> + {:ok, new_state} = RDY.update(cons, conn, count, last_state) + new_state + end) + :gen_event.notify(cons_state.event_manager_pid, :resume) {:ok, cons_state} end - # Try resuming from backoff in a few seconds. - @spec resume_later(pid, integer, C.state) :: - {:ok, C.state} + @spec resume_later(pid, integer, C.state()) :: + {:ok, C.state()} defp resume_later(cons, duration, cons_state) do - Task.start_link fn -> + Task.start_link(fn -> :timer.sleep(duration) GenServer.cast(cons, :resume) - end + end) + cons_state = %{cons_state | backoff_duration: duration} {:ok, cons_state} end - - @spec random_connection_for_backoff(C.state) :: C.connection + @spec random_connection_for_backoff(C.state()) :: C.connection() defp random_connection_for_backoff(cons_state) do if cons_state.config.backoff_strategy == :test do # When testing, we're only sending 1 message at a time to a single # nsqd. In this mode, instead of a random connection, always use the # first one that was defined, which ends up being the last one in our # list. - cons_state |> Connections.get |> List.last + cons_state |> Connections.get() |> List.last() else - cons_state |> Connections.get |> Enum.random + cons_state |> Connections.get() |> Enum.random() end end - # Returns the backoff duration in milliseconds. Different strategies can # technically be used, but currently there is only `:exponential` in # production mode and `:test` for tests. Not for external use. - @spec calculate_backoff(C.state) :: integer + @spec calculate_backoff(C.state()) :: integer defp calculate_backoff(cons_state) do case cons_state.config.backoff_strategy do :exponential -> exponential_backoff(cons_state) @@ -162,17 +170,18 @@ defmodule NSQ.Consumer.Backoff do end end - # Used to calculate backoff in milliseconds in production. We include jitter # so that, if we have many consumers in a cluster, we avoid the thundering # herd problem when they attempt to resume. Not for external use. - @spec exponential_backoff(C.state) :: integer + @spec exponential_backoff(C.state()) :: integer defp exponential_backoff(cons_state) do attempts = cons_state.backoff_counter mult = cons_state.config.backoff_multiplier + min( mult * :math.pow(2, attempts), cons_state.config.max_backoff_duration - ) |> round + ) + |> round end end diff --git a/lib/nsq/consumer/connections.ex b/lib/nsq/consumer/connections.ex index 0261104..e913dde 100644 --- a/lib/nsq/consumer/connections.ex +++ b/lib/nsq/consumer/connections.ex @@ -3,13 +3,11 @@ defmodule NSQ.Consumer.Connections do Functions for connecting, disconnecting, managing connections, etc. """ - import NSQ.Consumer.Helpers alias NSQ.ConnInfo alias NSQ.Consumer, as: C alias NSQ.Consumer.RDY - @doc """ Initialized from NSQ.Consumer.Supervisor, sends the consumer a message on a fixed interval. @@ -17,24 +15,27 @@ defmodule NSQ.Consumer.Connections do @spec discovery_loop(pid) :: any def discovery_loop(cons) do cons_state = C.get_state(cons) + %NSQ.Config{ lookupd_poll_interval: poll_interval, lookupd_poll_jitter: poll_jitter } = cons_state.config - delay = poll_interval + round(poll_interval * poll_jitter * :rand.uniform) + + delay = poll_interval + round(poll_interval * poll_jitter * :rand.uniform()) :timer.sleep(delay) GenServer.call(cons, :discover_nsqds) discovery_loop(cons) end - def close(cons_state) do - NSQ.Logger.info "Closing connections for consumer #{inspect self()}" + NSQ.Logger.info("Closing connections for consumer #{inspect(self())}") connections = get(cons_state) - Enum.each connections, fn({_, conn_pid}) -> + + Enum.each(connections, fn {_, conn_pid} -> Task.start_link(NSQ.Connection, :close, [conn_pid]) - end + end) + {:ok, %{cons_state | stop_flag: true}} end @@ -43,7 +44,6 @@ defmodule NSQ.Consumer.Connections do cons_state end - def refresh(cons_state) do {:ok, cons_state} = delete_dead(cons_state) {:ok, cons_state} = reconnect_failed(cons_state) @@ -56,40 +56,44 @@ defmodule NSQ.Consumer.Connections do cons_state end - @doc """ Finds and updates list of live NSQDs using either NSQ.Config.nsqlookupds or NSQ.Config.nsqds, depending on what's configured. Preference is given to nsqlookupd. Not for external use. """ - @spec discover_nsqds_and_connect(pid, C.state) :: {:ok, C.state} + @spec discover_nsqds_and_connect(pid, C.state()) :: {:ok, C.state()} def discover_nsqds_and_connect(cons, cons_state) do - nsqds = cond do - length(cons_state.config.nsqlookupds) > 0 -> - NSQ.Logger.debug "(#{inspect self()}) Discovering nsqds via nsqlookupds #{inspect cons_state.config.nsqlookupds}" - cons_state.config.nsqlookupds - |> NSQ.Lookupd.nsqds_with_topic(cons_state.topic) - - length(cons_state.config.nsqds) > 0 -> - NSQ.Logger.debug "(#{inspect self()}) Using configured nsqds #{inspect cons_state.config.nsqds}" - cons_state.config.nsqds - - true -> - raise "No nsqds or nsqlookupds are configured" - end + nsqds = + cond do + length(cons_state.config.nsqlookupds) > 0 -> + NSQ.Logger.debug( + "(#{inspect(self())}) Discovering nsqds via nsqlookupds #{inspect(cons_state.config.nsqlookupds)}" + ) + + cons_state.config.nsqlookupds + |> NSQ.Lookupd.nsqds_with_topic(cons_state.topic) + + length(cons_state.config.nsqds) > 0 -> + NSQ.Logger.debug( + "(#{inspect(self())}) Using configured nsqds #{inspect(cons_state.config.nsqds)}" + ) + + cons_state.config.nsqds + + true -> + raise "No nsqds or nsqlookupds are configured" + end {:ok, _cons_state} = update(nsqds, cons, cons_state) end - @doc """ Any inactive connections will be killed and any newly discovered connections will be added. Existing connections with no change are left alone. Not for external use. """ - @spec update([C.host_with_port], pid, C.state) :: {:ok, C.state} + @spec update([C.host_with_port()], pid, C.state()) :: {:ok, C.state()} def update(discovered_nsqds, cons, cons_state) do - dead_conns = dead_connections(discovered_nsqds, cons, cons_state) {:ok, cons_state} = stop_connections(dead_conns, cons, cons_state) @@ -99,40 +103,46 @@ defmodule NSQ.Consumer.Connections do {:ok, cons_state} end - @doc """ Given a list of NSQD hosts, open a connection for each. """ - @spec connect_to_nsqds([C.host_with_port], pid, C.state) :: {:ok, C.state} + @spec connect_to_nsqds([C.host_with_port()], pid, C.state()) :: {:ok, C.state()} def connect_to_nsqds(nsqds, cons, cons_state \\ nil) do if length(nsqds) > 0 do - NSQ.Logger.info "Connecting to nsqds #{inspect nsqds}" - end - cons_state = Enum.reduce nsqds, cons_state, fn(nsqd, last_state) -> - {:ok, new_state} = connect_to_nsqd(nsqd, cons, last_state) - new_state + NSQ.Logger.info("Connecting to nsqds #{inspect(nsqds)}") end + + cons_state = + Enum.reduce(nsqds, cons_state, fn nsqd, last_state -> + {:ok, new_state} = connect_to_nsqd(nsqd, cons, last_state) + new_state + end) + {:ok, cons_state} end - @doc """ Create a connection to NSQD and add it to the consumer's supervised list. Not for external use. """ - @spec connect_to_nsqd(C.host_with_port, pid, C.state) :: {:ok, C.state} + @spec connect_to_nsqd(C.host_with_port(), pid, C.state()) :: {:ok, C.state()} def connect_to_nsqd(nsqd, cons, cons_state) do Process.flag(:trap_exit, true) + try do - {:ok, _pid} = NSQ.Connection.Supervisor.start_child( - cons, nsqd, cons_state - ) + {:ok, _pid} = + NSQ.Connection.Supervisor.start_child( + cons, + nsqd, + cons_state + ) # We normally set RDY to 1, but if we're spawning more connections than # max_in_flight, we don't want to break our contract. In that case, the # `RDY.redistribute` loop will take care of getting this connection some # messages later. remaining_rdy = cons_state.max_in_flight - total_rdy_count(cons_state) + cons_state = if remaining_rdy > 0 do conn = conn_from_nsqd(cons, nsqd, cons_state) @@ -145,7 +155,7 @@ defmodule NSQ.Consumer.Connections do {:ok, cons_state} catch :error, _ -> - NSQ.Logger.error "#{inspect cons}: Error connecting to #{inspect nsqd}" + NSQ.Logger.error("#{inspect(cons)}: Error connecting to #{inspect(nsqd)}") conn_id = ConnInfo.conn_id(cons, nsqd) ConnInfo.delete(cons_state, conn_id) {:ok, cons_state} @@ -154,32 +164,31 @@ defmodule NSQ.Consumer.Connections do end end - @doc """ Given a list of connections, force them to stop. Return the new state without those connections. """ - @spec stop_connections([C.connection], pid, C.state) :: {:ok, C.state} + @spec stop_connections([C.connection()], pid, C.state()) :: {:ok, C.state()} def stop_connections(dead_conns, cons, cons_state) do if length(dead_conns) > 0 do - NSQ.Logger.info "Stopping connections #{inspect dead_conns}" + NSQ.Logger.info("Stopping connections #{inspect(dead_conns)}") end - cons_state = Enum.reduce dead_conns, cons_state, fn({nsqd, _pid}, last_state) -> - {:ok, new_state} = stop_connection(cons, nsqd, last_state) - new_state - end + cons_state = + Enum.reduce(dead_conns, cons_state, fn {nsqd, _pid}, last_state -> + {:ok, new_state} = stop_connection(cons, nsqd, last_state) + new_state + end) {:ok, cons_state} end - @doc """ Given a single connection, immediately terminate its process (and all descendant processes, such as message handlers) and remove its info from the ConnInfo agent. Not for external use. """ - @spec stop_connection(pid, C.host_with_port, C.state) :: {:ok, C.state} + @spec stop_connection(pid, C.host_with_port(), C.state()) :: {:ok, C.state()} def stop_connection(cons, conn_id, cons_state) do # Terminate the connection for real. # TODO: Change this method to `kill_connection` and make `stop_connection` @@ -190,7 +199,6 @@ defmodule NSQ.Consumer.Connections do {:ok, cons_state} end - @doc """ When a connection is terminated or dies, we must do some extra cleanup. First, a terminated process isn't necessarily removed from the supervisor's @@ -198,7 +206,7 @@ defmodule NSQ.Consumer.Connections do connection like RDY must be removed so it doesn't contribute to `total_rdy`. Not for external use. """ - @spec cleanup_connection(pid, C.host_with_port, C.state) :: {:ok, C.state} + @spec cleanup_connection(pid, C.host_with_port(), C.state()) :: {:ok, C.state()} def cleanup_connection(_cons, conn_id, cons_state) do # If a connection is terminated normally or non-normally, it will still be # listed in the supervision tree. Let's remove it when we clean up. @@ -211,33 +219,30 @@ defmodule NSQ.Consumer.Connections do {:ok, cons_state} end - @doc """ We may have open connections which nsqlookupd stops reporting. This function tells us which connections we have stored in state but not in nsqlookupd. Not for external use. """ - @spec dead_connections([C.host_with_port], pid, C.state) :: [C.connection] + @spec dead_connections([C.host_with_port()], pid, C.state()) :: [C.connection()] def dead_connections(discovered_nsqds, cons, cons_state) do - Enum.reject get(cons_state), fn(conn) -> + Enum.reject(get(cons_state), fn conn -> conn_already_discovered?(cons, conn, discovered_nsqds) - end + end) end - @doc """ When nsqlookupd reports available producers, there are some that may not already be in our connection list. This function reports which ones are new so we can connect to them. """ - @spec new_nsqds([C.host_with_port], pid, C.state) :: [C.host_with_port] + @spec new_nsqds([C.host_with_port()], pid, C.state()) :: [C.host_with_port()] def new_nsqds(discovered_nsqds, cons, cons_state) do - Enum.reject discovered_nsqds, fn(nsqd) -> + Enum.reject(discovered_nsqds, fn nsqd -> nsqd_already_has_connection?(nsqd, cons, cons_state) - end + end) end - @doc """ Frequently, when testing, we publish a message then immediately want a consumer to process it, but this doesn't work if the consumer doesn't @@ -250,20 +255,20 @@ defmodule NSQ.Consumer.Connections do :ok end - @doc """ Iterate over all listed connections and delete the ones that are dead. This exists because it is difficult to reliably clean up a connection immediately after it is terminated (it might still be running). This function runs in the discovery loop to provide consistency. """ - @spec delete_dead(C.state) :: {:ok, C.state} + @spec delete_dead(C.state()) :: {:ok, C.state()} def delete_dead(state) do - Enum.each get(state), fn({conn_id, pid}) -> + Enum.each(get(state), fn {conn_id, pid} -> unless Process.alive?(pid) do Supervisor.delete_child(state.conn_sup_pid, conn_id) end - end + end) + {:ok, state} end @@ -272,84 +277,89 @@ defmodule NSQ.Consumer.Connections do state end - def reconnect_failed(state) do - Enum.each get(state), fn({_, pid}) -> + Enum.each(get(state), fn {_, pid} -> if Process.alive?(pid), do: GenServer.cast(pid, :reconnect) - end + end) + {:ok, state} end - - @spec count(C.state) :: integer + @spec count(C.state()) :: integer def count(cons_state) do %{active: active} = Supervisor.count_children(cons_state.conn_sup_pid) + if is_integer(active) do active else - NSQ.Logger.warn "(#{inspect self()}) non-integer #{inspect active} returned counting connections, returning 0 instead" + NSQ.Logger.warn( + "(#{inspect(self())}) non-integer #{inspect(active)} returned counting connections, returning 0 instead" + ) + 0 end end - @doc """ Returns all live connections for a consumer. This function, which takes a consumer's entire state as an argument, is for convenience. Not for external use. """ - @spec get(C.state) :: [C.connection] + @spec get(C.state()) :: [C.connection()] def get(%{conn_sup_pid: conn_sup_pid}) do children = Supervisor.which_children(conn_sup_pid) - Enum.map children, fn({child_id, pid, _, _}) -> {child_id, pid} end + Enum.map(children, fn {child_id, pid, _, _} -> {child_id, pid} end) end - @doc """ Returns all live connections for a consumer. Used in tests. Not for external use. """ - @spec get(pid, C.state) :: [C.connection] + @spec get(pid, C.state()) :: [C.connection()] def get(cons, cons_state \\ nil) when is_pid(cons) do cons_state = cons_state || C.get_state(cons) children = Supervisor.which_children(cons_state.conn_sup_pid) - Enum.map children, fn({child_id, pid, _, _}) -> {child_id, pid} end + Enum.map(children, fn {child_id, pid, _, _} -> {child_id, pid} end) end - - @spec idle_with_rdy(C.state) :: [C.connection] + @spec idle_with_rdy(C.state()) :: [C.connection()] def idle_with_rdy(cons_state) do conns = get(cons_state) - Enum.filter conns, fn(conn) -> + + Enum.filter(conns, fn conn -> conn_id = ConnInfo.conn_id(conn) - [last_msg_t, rdy_count] = ConnInfo.fetch( - cons_state, conn_id, [:last_msg_timestamp, :rdy_count] - ) + + [last_msg_t, rdy_count] = + ConnInfo.fetch( + cons_state, + conn_id, + [:last_msg_timestamp, :rdy_count] + ) + sec_since_last_msg = now() - last_msg_t ms_since_last_msg = sec_since_last_msg * 1000 NSQ.Logger.debug( - "(#{inspect conn}) rdy: #{rdy_count} (last message received #{sec_since_last_msg} seconds ago)" + "(#{inspect(conn)}) rdy: #{rdy_count} (last message received #{sec_since_last_msg} seconds ago)" ) ms_since_last_msg > cons_state.config.low_rdy_idle_timeout && rdy_count > 0 - end + end) end - - @spec conn_already_discovered?(pid, C.connection, [C.host_with_port]) :: boolean + @spec conn_already_discovered?(pid, C.connection(), [C.host_with_port()]) :: boolean defp conn_already_discovered?(cons, {conn_id, _}, discovered_nsqds) do - Enum.any? discovered_nsqds, fn(nsqd) -> + Enum.any?(discovered_nsqds, fn nsqd -> ConnInfo.conn_id(cons, nsqd) == conn_id - end + end) end - - @spec nsqd_already_has_connection?(C.host_with_port, pid, C.state) :: boolean + @spec nsqd_already_has_connection?(C.host_with_port(), pid, C.state()) :: boolean defp nsqd_already_has_connection?(nsqd, cons, cons_state) do needle = ConnInfo.conn_id(cons, nsqd) - Enum.any? get(cons_state), fn({conn_id, _}) -> + + Enum.any?(get(cons_state), fn {conn_id, _} -> conn_id == needle - end + end) end end diff --git a/lib/nsq/consumer/rdy.ex b/lib/nsq/consumer/rdy.ex index 1f6a388..f972772 100644 --- a/lib/nsq/consumer/rdy.ex +++ b/lib/nsq/consumer/rdy.ex @@ -4,13 +4,11 @@ defmodule NSQ.Consumer.RDY do is where that goes! """ - alias NSQ.ConnInfo alias NSQ.Consumer, as: C alias NSQ.Consumer.Connections import NSQ.Consumer.Helpers - @doc """ Initialized from NSQ.Consumer.Supervisor, sends the consumer a message on a fixed interval. @@ -24,37 +22,43 @@ defmodule NSQ.Consumer.RDY do redistribute_loop(cons) end - @doc """ If we're not in backoff mode and we've hit a "trigger point" to update RDY, then go ahead and update RDY. Not for external use. """ - @spec maybe_update(pid, C.connection, C.state) :: {:ok, C.state} + @spec maybe_update(pid, C.connection(), C.state()) :: {:ok, C.state()} def maybe_update(cons, conn, cons_state) do if cons_state.backoff_counter > 0 || cons_state.backoff_duration > 0 do # In backoff mode, we only let `start_stop_continue_backoff/3` handle # this case. - NSQ.Logger.debug """ - (#{inspect conn}) skip sending RDY in_backoff:#{cons_state.backoff_counter} || in_backoff_timeout:#{cons_state.backoff_duration} - """ + NSQ.Logger.debug(""" + (#{inspect(conn)}) skip sending RDY in_backoff:#{cons_state.backoff_counter} || in_backoff_timeout:#{cons_state.backoff_duration} + """) + {:ok, cons_state} else - [remain, last_rdy] = ConnInfo.fetch( - cons_state, ConnInfo.conn_id(conn), [:rdy_count, :last_rdy] - ) + [remain, last_rdy] = + ConnInfo.fetch( + cons_state, + ConnInfo.conn_id(conn), + [:rdy_count, :last_rdy] + ) + desired_rdy = per_conn_max_in_flight(cons_state) - if remain <= 1 || remain < (last_rdy / 4) || (desired_rdy > 0 && desired_rdy < remain) do - NSQ.Logger.debug """ - (#{inspect conn}) sending RDY #{desired_rdy} \ + if remain <= 1 || remain < last_rdy / 4 || (desired_rdy > 0 && desired_rdy < remain) do + NSQ.Logger.debug(""" + (#{inspect(conn)}) sending RDY #{desired_rdy} \ (#{remain} remain from last RDY #{last_rdy}) - """ + """) + {:ok, _cons_state} = update(cons, conn, desired_rdy, cons_state) else - NSQ.Logger.debug """ - (#{inspect conn}) skip sending RDY #{desired_rdy} \ + NSQ.Logger.debug(""" + (#{inspect(conn)}) skip sending RDY #{desired_rdy} \ (#{remain} remain out of last RDY #{last_rdy}) - """ + """) + {:ok, cons_state} end end @@ -65,28 +69,29 @@ defmodule NSQ.Consumer.RDY do cons_state end - @doc """ Try to update RDY for a given connection, taking configuration and the current state into account. Not for external use. """ - @spec update(pid, C.connection, integer, C.state) :: {:ok, C.state} + @spec update(pid, C.connection(), integer, C.state()) :: {:ok, C.state()} def update(cons, conn, new_rdy, cons_state) do conn_info = ConnInfo.fetch(cons_state, ConnInfo.conn_id(conn)) cancel_outstanding_retry(cons_state, conn) # Cap the given RDY based on the connection config. - new_rdy = [new_rdy, conn_info.max_rdy] |> Enum.min |> round + new_rdy = [new_rdy, conn_info.max_rdy] |> Enum.min() |> round # Cap the given RDY based on how much we can actually assign. Unless it's # 0, in which case we'll be retrying. max_possible_rdy = calc_max_possible(cons_state, conn_info) - new_rdy = if max_possible_rdy > 0 do - [new_rdy, max_possible_rdy] |> Enum.min |> round - else - new_rdy - end + + new_rdy = + if max_possible_rdy > 0 do + [new_rdy, max_possible_rdy] |> Enum.min() |> round + else + new_rdy + end {:ok, cons_state} = if max_possible_rdy <= 0 && new_rdy > 0 do @@ -100,6 +105,7 @@ defmodule NSQ.Consumer.RDY do else transmit(conn, new_rdy, cons_state) end + {:ok, cons_state} end @@ -108,29 +114,29 @@ defmodule NSQ.Consumer.RDY do cons_state end - @doc """ Delay for a configured interval, then call RDY.update. """ - @spec retry(pid, C.connection, integer, C.state) :: {:ok, C.state} + @spec retry(pid, C.connection(), integer, C.state()) :: {:ok, C.state()} def retry(cons, conn, count, cons_state) do delay = cons_state.config.rdy_retry_delay - NSQ.Logger.debug("(#{inspect conn}) retry RDY in #{delay / 1000} seconds") + NSQ.Logger.debug("(#{inspect(conn)}) retry RDY in #{delay / 1000} seconds") + + {:ok, retry_pid} = + Task.start_link(fn -> + :timer.sleep(delay) + GenServer.call(cons, {:update_rdy, conn, count}) + end) - {:ok, retry_pid} = Task.start_link fn -> - :timer.sleep(delay) - GenServer.call(cons, {:update_rdy, conn, count}) - end ConnInfo.update(cons_state, ConnInfo.conn_id(conn), %{retry_rdy_pid: retry_pid}) {:ok, cons_state} end - @doc """ Send a RDY command for the given connection. """ - @spec transmit(C.connection, integer, C.state) :: {:ok, C.state} + @spec transmit(C.connection(), integer, C.state()) :: {:ok, C.state()} def transmit({_id, pid} = conn, count, cons_state) do [last_rdy] = ConnInfo.fetch(cons_state, ConnInfo.conn_id(conn), [:last_rdy]) @@ -145,35 +151,35 @@ defmodule NSQ.Consumer.RDY do end end - @doc """ This will only be triggered in odd cases where we're in backoff or when there are more connections than max in flight. It will randomly change RDY on some connections to 0 and 1 so that they're all guaranteed to eventually process messages. """ - @spec redistribute(pid, C.state) :: {:ok, C.state} + @spec redistribute(pid, C.state()) :: {:ok, C.state()} def redistribute(cons, cons_state) do if should_redistribute?(cons_state) do conns = Connections.get(cons_state) conn_count = length(conns) if conn_count > cons_state.max_in_flight do - NSQ.Logger.debug """ + NSQ.Logger.debug(""" redistributing RDY state (#{conn_count} conns > #{cons_state.max_in_flight} max_in_flight) - """ + """) end if cons_state.backoff_counter > 0 && conn_count > 1 do - NSQ.Logger.debug """ + NSQ.Logger.debug(""" redistributing RDY state (in backoff and #{conn_count} conns > 1) - """ + """) end # Free up any connections that are RDY but not processing messages. - Connections.idle_with_rdy(cons_state) |> Enum.map(fn(conn) -> - NSQ.Logger.debug("(#{inspect conn}) idle connection, giving up RDY") + Connections.idle_with_rdy(cons_state) + |> Enum.map(fn conn -> + NSQ.Logger.debug("(#{inspect(conn)}) idle connection, giving up RDY") {:ok, _cons_state} = update(cons, conn, 0, cons_state) end) @@ -195,29 +201,27 @@ defmodule NSQ.Consumer.RDY do cons_state end - # Helper for redistribute; we set RDY to 1 for _some_ connections that # were halted, until there's no more RDY left to assign. We assume that the # list of connections has already been sorted in the order that we should # distribute. - @spec distribute(pid, [C.connection], integer, C.state) :: - {:ok, C.state} + @spec distribute(pid, [C.connection()], integer, C.state()) :: + {:ok, C.state()} defp distribute(cons, possible_conns, available_max_in_flight, cons_state) do if length(possible_conns) == 0 || available_max_in_flight <= 0 do {:ok, cons_state} else - [conn|rest] = possible_conns - NSQ.Logger.debug("(#{inspect conn}) redistributing RDY") + [conn | rest] = possible_conns + NSQ.Logger.debug("(#{inspect(conn)}) redistributing RDY") {:ok, cons_state} = update(cons, conn, 1, cons_state) distribute(cons, rest, available_max_in_flight - 1, cons_state) end end - - @spec sort_conns_for_round_robin([C.connection], C.state) :: {[C.connection], C.state} + @spec sort_conns_for_round_robin([C.connection()], C.state()) :: {[C.connection()], C.state()} defp sort_conns_for_round_robin(conns, cons_state) do # We sort to ensure consistency of start_index across runs. - sorted_conns = conns |> Enum.sort_by(fn({conn_id, _}) -> conn_id end) + sorted_conns = conns |> Enum.sort_by(fn {conn_id, _} -> conn_id end) start_index = rem(cons_state.distribution_counter, length(sorted_conns)) # We want to start distributing from a specific index. This reorders the @@ -227,9 +231,9 @@ defmodule NSQ.Consumer.RDY do sorted_conns = sorted_conns |> Enum.split(start_index) - |> Tuple.to_list - |> Enum.reverse - |> Enum.concat + |> Tuple.to_list() + |> Enum.reverse() + |> Enum.concat() # By increasing distribution count, we ensure that the start_index will be # different next time this runs. If the number of connections does not @@ -240,15 +244,17 @@ defmodule NSQ.Consumer.RDY do } end - - @spec cancel_outstanding_retry(C.state, C.connection) :: any + @spec cancel_outstanding_retry(C.state(), C.connection()) :: any defp cancel_outstanding_retry(cons_state, conn) do conn_info = ConnInfo.fetch(cons_state, ConnInfo.conn_id(conn)) # If this is for a connection that's retrying, kill the timer and clean up. if retry_pid = conn_info.retry_rdy_pid do if Process.alive?(retry_pid) do - NSQ.Logger.debug("(#{inspect conn}) rdy retry pid #{inspect retry_pid} detected, killing") + NSQ.Logger.debug( + "(#{inspect(conn)}) rdy retry pid #{inspect(retry_pid)} detected, killing" + ) + Process.exit(retry_pid, :normal) end @@ -256,8 +262,7 @@ defmodule NSQ.Consumer.RDY do end end - - @spec calc_max_possible(C.state, map) :: integer + @spec calc_max_possible(C.state(), map) :: integer defp calc_max_possible(cons_state, conn_info) do rdy_count = conn_info.rdy_count max_in_flight = cons_state.max_in_flight @@ -265,26 +270,23 @@ defmodule NSQ.Consumer.RDY do max_in_flight - total_rdy + rdy_count end - - @spec should_redistribute?(C.state) :: boolean + @spec should_redistribute?(C.state()) :: boolean defp should_redistribute?(cons_state) do conn_count = Connections.count(cons_state) in_backoff = cons_state.backoff_counter > 0 in_backoff_timeout = cons_state.backoff_duration > 0 - !in_backoff_timeout - && conn_count > 0 - && ( - conn_count > cons_state.max_in_flight - || (in_backoff && conn_count > 1) - || cons_state.need_rdy_redistributed - ) + !in_backoff_timeout && + conn_count > 0 && + (conn_count > cons_state.max_in_flight || + (in_backoff && conn_count > 1) || + cons_state.need_rdy_redistributed) end - # Cap available max in flight based on current RDY/backoff status. defp get_available_max_in_flight(cons_state) do total_rdy = total_rdy_count(cons_state) + if cons_state.backoff_counter > 0 do # In backoff mode, we only ever want RDY=1 for the whole consumer. This # makes sure that available is only 1 if total_rdy is 0. diff --git a/lib/nsq/message.ex b/lib/nsq/message.ex index 0afa716..71dbf19 100644 --- a/lib/nsq/message.ex +++ b/lib/nsq/message.ex @@ -6,7 +6,6 @@ defmodule NSQ.Message do alias NSQ.Connection.Buffer use GenServer - # ------------------------------------------------------- # # Struct Definition # # ------------------------------------------------------- # @@ -23,7 +22,7 @@ defmodule NSQ.Message do :parent, :processing_pid, :event_manager_pid, - :msg_timeout, + :msg_timeout ] # ------------------------------------------------------- # @@ -33,14 +32,12 @@ defmodule NSQ.Message do {:ok, _pid} = GenServer.start_link(__MODULE__, message, opts) end - def init(message) do # Process the message asynchronously after init. GenServer.cast(self(), :process) {:ok, message} end - def handle_cast(:process, message) do # `process/1` will send the return value to `message.connection` as part of # its standard procedure. @@ -48,7 +45,6 @@ defmodule NSQ.Message do {:noreply, message} end - # ------------------------------------------------------- # # API Definitions # # ------------------------------------------------------- # @@ -61,7 +57,6 @@ defmodule NSQ.Message do Map.merge(%NSQ.Message{}, message) end - @doc """ This is the main entry point when processing a message. It starts the message GenServer and immediately kicks of a processing call. @@ -70,9 +65,12 @@ defmodule NSQ.Message do # Kick off processing in a separate process, so we can kill it if it takes # too long. message = %{message | parent: self()} - {:ok, pid} = Task.start_link fn -> - process_without_timeout(message) - end + + {:ok, pid} = + Task.start_link(fn -> + process_without_timeout(message) + end) + message = %{message | processing_pid: pid} {:ok, ret_val} = wait_for_msg_done(message) @@ -87,20 +85,18 @@ defmodule NSQ.Message do Process.exit(self(), :normal) end - @doc """ Tells NSQD that we're done processing this message. This is called automatically when the handler returns successfully, or when all retries have been exhausted. """ def fin(message) do - NSQ.Logger.debug("(#{inspect message.connection}) fin msg ID #{message.id}") + NSQ.Logger.debug("(#{inspect(message.connection)}) fin msg ID #{message.id}") message |> Buffer.send!(encode({:fin, message.id})) :gen_event.notify(message.event_manager_pid, {:message_finished, message}) GenServer.call(message.consumer, {:start_stop_continue_backoff, :resume}) end - @doc """ Tells NSQD to requeue the message, with delay and backoff. According to the go-nsq client (but doc'ed nowhere), a delay of -1 is a special value that @@ -111,19 +107,31 @@ defmodule NSQ.Message do def req(message, delay \\ -1, backoff \\ false) do delay = if delay == -1 do - delay = calculate_delay( - message.attempts, message.config.max_requeue_delay + delay = + calculate_delay( + message.attempts, + message.config.max_requeue_delay + ) + + NSQ.Logger.debug( + "(#{inspect(message.connection)}) requeue msg ID #{message.id}, delay #{delay} (auto-calculated with attempts #{message.attempts}), backoff #{backoff}" ) - NSQ.Logger.debug("(#{inspect message.connection}) requeue msg ID #{message.id}, delay #{delay} (auto-calculated with attempts #{message.attempts}), backoff #{backoff}") + delay else - NSQ.Logger.debug("(#{inspect message.connection}) requeue msg ID #{message.id}, delay #{delay}, backoff #{backoff}") + NSQ.Logger.debug( + "(#{inspect(message.connection)}) requeue msg ID #{message.id}, delay #{delay}, backoff #{backoff}" + ) + delay end delay = if delay > message.config.max_requeue_delay do - NSQ.Logger.warn "Invalid requeue delay #{delay}. Must be between 0 and #{message.config.max_requeue_delay}. Sending with max delay #{message.config.max_requeue_delay} instead." + NSQ.Logger.warn( + "Invalid requeue delay #{delay}. Must be between 0 and #{message.config.max_requeue_delay}. Sending with max delay #{message.config.max_requeue_delay} instead." + ) + message.config.max_requeue_delay else delay @@ -139,19 +147,17 @@ defmodule NSQ.Message do :gen_event.notify(message.event_manager_pid, {:message_requeued, message}) end - @doc """ This function is intended to be used by the handler for long-running functions. They can set up a separate process that periodically touches the message until the process finishes. """ def touch(message) do - NSQ.Logger.debug("(#{inspect message.connection}) touch msg ID #{message.id}") + NSQ.Logger.debug("(#{inspect(message.connection)}) touch msg ID #{message.id}") message |> Buffer.send!(encode({:touch, message.id})) send(message.parent, {:message_touch, message}) end - # ------------------------------------------------------- # # Private Functions # # ------------------------------------------------------- # @@ -163,16 +169,14 @@ defmodule NSQ.Message do run_handler_safely(message) |> respond_to_nsq(message) end - send message.parent, {:message_done, message, ret_val} + send(message.parent, {:message_done, message, ret_val}) end - defp should_fail_message?(message) do message.config.max_attempts > 0 && message.attempts > message.config.max_attempts end - # Handler can be either an anonymous function or a module that implements the # `handle_message\2` function. defp run_handler(handler, message) do @@ -183,48 +187,60 @@ defmodule NSQ.Message do end end - defp run_handler_safely(message) do Process.flag(:trap_exit, true) + try do result = run_handler(message.config.message_handler, message) Process.flag(:trap_exit, false) result rescue e -> - NSQ.Logger.error "Error running message handler: #{inspect e}" - NSQ.Logger.error inspect __STACKTRACE__ + NSQ.Logger.error("Error running message handler: #{inspect(e)}") + NSQ.Logger.error(inspect(__STACKTRACE__)) {:req, -1, true} catch :exit, b -> - NSQ.Logger.error "Caught exit running message handler: :exit, #{inspect b}" - NSQ.Logger.error inspect __STACKTRACE__ + NSQ.Logger.error("Caught exit running message handler: :exit, #{inspect(b)}") + NSQ.Logger.error(inspect(__STACKTRACE__)) {:req, -1, true} + a, b -> - NSQ.Logger.error "Caught exception running message handler: #{inspect a}, #{inspect b}" - NSQ.Logger.error inspect __STACKTRACE__ + NSQ.Logger.error("Caught exception running message handler: #{inspect(a)}, #{inspect(b)}") + NSQ.Logger.error(inspect(__STACKTRACE__)) {:req, -1, true} end end - defp respond_to_nsq(ret_val, message) do case ret_val do - :ok -> fin(message) + :ok -> + fin(message) + :fail -> NSQ.Logger.warn("msg #{message.id} attempted #{message.attempts} times, giving up") fin(message) - :req -> req(message) - {:req, delay} -> req(message, delay) - {:req, delay, backoff} -> req(message, delay, backoff) + + :req -> + req(message) + + {:req, delay} -> + req(message, delay) + + {:req, delay, backoff} -> + req(message, delay, backoff) + _ -> - NSQ.Logger.error "Unexpected handler result #{inspect ret_val}, requeueing message #{message.id}" + NSQ.Logger.error( + "Unexpected handler result #{inspect(ret_val)}, requeueing message #{message.id}" + ) + req(message) end + ret_val end - # We expect our function will send us a message when it's done. Block until # that happens. If it takes too long, requeue the message and cancel # processing. @@ -232,8 +248,9 @@ defmodule NSQ.Message do receive do {:message_done, _msg, ret_val} -> {:ok, ret_val} + {:message_touch, _msg} -> - NSQ.Logger.debug "Msg #{message.id} received TOUCH, starting a new wait..." + NSQ.Logger.debug("Msg #{message.id} received TOUCH, starting a new wait...") # If NSQ.Message.touch(msg) is called, we will send TOUCH msg_id to # NSQD, but we also need to reset our timeout on the client to avoid # processes that hang forever. @@ -242,26 +259,29 @@ defmodule NSQ.Message do message.msg_timeout -> # If we've waited this long, we can assume NSQD will requeue the # message on its own. - NSQ.Logger.warn "Msg #{message.id} timed out, quit processing it and expect nsqd to requeue" + NSQ.Logger.warn( + "Msg #{message.id} timed out, quit processing it and expect nsqd to requeue" + ) + :gen_event.notify(message.event_manager_pid, {:message_requeued, message}) unlink_and_exit(message.parent) {:ok, :req} end end - defp unlink_and_exit(pid) do Process.unlink(pid) Process.exit(pid, :kill) end - defp calculate_delay(attempts, max_requeue_delay) do exponential_backoff = :math.pow(2, attempts) * 1000 - jitter = round(0.3 * :rand.uniform * exponential_backoff) + jitter = round(0.3 * :rand.uniform() * exponential_backoff) + min( exponential_backoff + jitter, max_requeue_delay - ) |> round + ) + |> round end end diff --git a/lib/nsq/producer.ex b/lib/nsq/producer.ex index 783a918..14f6716 100644 --- a/lib/nsq/producer.ex +++ b/lib/nsq/producer.ex @@ -46,7 +46,6 @@ defmodule NSQ.Producer do import NSQ.Protocol use GenServer - # ------------------------------------------------------- # # Module Attributes # # ------------------------------------------------------- # @@ -59,34 +58,32 @@ defmodule NSQ.Producer do conn_info_pid: nil } - # ------------------------------------------------------- # # Type Definitions # # ------------------------------------------------------- # @typedoc """ A tuple with a host and a port. """ - @type host_with_port :: {String.t, integer} + @type host_with_port :: {String.t(), integer} @typedoc """ A tuple with a string ID (used to target the connection in NSQ.Connection.Supervisor) and a PID of the connection. """ - @type connection :: {String.t, pid} + @type connection :: {String.t(), pid} @typedoc """ A map, but we can be more specific by asserting some entries that should be set for a connection's state map. """ - @type pro_state :: %{conn_sup_pid: pid, config: NSQ.Config.t} - + @type pro_state :: %{conn_sup_pid: pid, config: NSQ.Config.t()} # ------------------------------------------------------- # # Behaviour Implementation # # ------------------------------------------------------- # @spec init(pro_state) :: {:ok, pro_state} def init(pro_state) do - {:ok, conn_sup_pid} = NSQ.Connection.Supervisor.start_link + {:ok, conn_sup_pid} = NSQ.Connection.Supervisor.start_link() pro_state = %{pro_state | conn_sup_pid: conn_sup_pid} {:ok, conn_info_pid} = Agent.start_link(fn -> %{} end) @@ -96,97 +93,92 @@ defmodule NSQ.Producer do if pro_state.config.event_manager do pro_state.config.event_manager else - {:ok, manager} = :gen_event.start_link + {:ok, manager} = :gen_event.start_link() manager end + pro_state = %{pro_state | event_manager_pid: manager} {:ok, _pro_state} = connect_to_nsqds(pro_state.config.nsqds, self(), pro_state) end - @spec handle_call({:pub, binary}, any, pro_state) :: - {:reply, {:ok, binary}, pro_state} + {:reply, {:ok, binary}, pro_state} def handle_call({:pub, data}, _from, pro_state) do do_pub(pro_state.topic, data, pro_state) end - @spec handle_call({:pub, binary, binary}, any, pro_state) :: - {:reply, {:ok, binary}, pro_state} + {:reply, {:ok, binary}, pro_state} def handle_call({:pub, topic, data}, _from, pro_state) do do_pub(topic, data, pro_state) end - @spec handle_call({:mpub, binary}, any, pro_state) :: - {:reply, {:ok, binary}, pro_state} + {:reply, {:ok, binary}, pro_state} def handle_call({:mpub, data}, _from, pro_state) do do_mpub(pro_state.topic, data, pro_state) end - @spec handle_call({:mpub, binary, binary}, any, pro_state) :: - {:reply, {:ok, binary}, pro_state} + {:reply, {:ok, binary}, pro_state} def handle_call({:mpub, topic, data}, _from, pro_state) do do_mpub(topic, data, pro_state) end - @spec handle_call(:state, any, pro_state) :: {:reply, pro_state, pro_state} def handle_call(:state, _from, state) do {:reply, state, state} end - # ------------------------------------------------------- # # API Definitions # # ------------------------------------------------------- # - @spec start_link(binary, NSQ.Config.t, GenServer.options) :: {:ok, pid} + @spec start_link(binary, NSQ.Config.t(), GenServer.options()) :: {:ok, pid} def start_link(topic, config, genserver_options \\ []) do {:ok, config} = NSQ.Config.validate(config || %NSQ.Config{}) {:ok, config} = NSQ.Config.normalize(config) - unless is_valid_topic_name?(topic), do: raise "Invalid topic name #{topic}" + unless is_valid_topic_name?(topic), do: raise("Invalid topic name #{topic}") state = %{@initial_state | topic: topic, config: config} GenServer.start_link(__MODULE__, state, genserver_options) end - @spec get_connections(pro_state) :: [connection] def get_connections(pro_state) when is_map(pro_state) do children = Supervisor.which_children(pro_state.conn_sup_pid) - Enum.map children, fn({child_id, pid, _, _}) -> {child_id, pid} end + Enum.map(children, fn {child_id, pid, _, _} -> {child_id, pid} end) end - @spec get_connections(pid, pro_state) :: [connection] def get_connections(pro, pro_state \\ nil) when is_pid(pro) do pro_state = pro_state || get_state(pro) get_connections(pro_state) end - @spec random_connection_pid(pro_state) :: pid def random_connection_pid(pro_state) do {_child_id, pid} = Enum.random(get_connections(pro_state)) pid end - @doc """ Create supervised connections to NSQD. """ @spec connect_to_nsqds([host_with_port], pid, pro_state) :: {:ok, pro_state} def connect_to_nsqds(nsqds, pro, pro_state) do - Enum.map nsqds, fn(nsqd) -> - {:ok, _conn} = NSQ.Connection.Supervisor.start_child( - pro, nsqd, pro_state, [restart: :permanent] - ) - end + Enum.map(nsqds, fn nsqd -> + {:ok, _conn} = + NSQ.Connection.Supervisor.start_child( + pro, + nsqd, + pro_state, + restart: :permanent + ) + end) + {:ok, pro_state} end - @doc """ Get the current state of a producer. Used in tests. Not for external use. """ @@ -195,7 +187,6 @@ defmodule NSQ.Producer do GenServer.call(producer, :state) end - @doc """ Publish data to whatever topic is the default. """ @@ -204,7 +195,6 @@ defmodule NSQ.Producer do {:ok, _resp} = GenServer.call(get(sup_pid), {:pub, data}) end - @doc """ Publish data to a specific topic. """ @@ -213,7 +203,6 @@ defmodule NSQ.Producer do {:ok, _resp} = GenServer.call(get(sup_pid), {:pub, topic, data}) end - @doc """ Publish data to whatever topic is the default. """ @@ -222,7 +211,6 @@ defmodule NSQ.Producer do GenServer.call(get(sup_pid), {:mpub, data}) end - @doc """ Publish data to a specific topic. """ @@ -231,37 +219,40 @@ defmodule NSQ.Producer do {:ok, _resp} = GenServer.call(get(sup_pid), {:mpub, topic, data}) end - @doc """ The end-user will be targeting the supervisor, but it's the producer that can actually handle the command. """ @spec get(pid) :: pid def get(sup_pid) do - child = Supervisor.which_children(sup_pid) - |> Enum.find(fn({kind, _, _, _}) -> kind == NSQ.Producer end) + child = + Supervisor.which_children(sup_pid) + |> Enum.find(fn {kind, _, _, _} -> kind == NSQ.Producer end) + {_, pid, _, _} = child pid end - # ------------------------------------------------------- # # Private Functions # # ------------------------------------------------------- # # Used to DRY up handle_call({:pub, ...). @spec do_pub(binary, binary, pro_state) :: {:reply, {:ok, binary}, pro_state} defp do_pub(topic, data, pro_state) do - {:ok, resp} = random_connection_pid(pro_state) + {:ok, resp} = + random_connection_pid(pro_state) |> NSQ.Connection.cmd({:pub, topic, data}) + {:reply, {:ok, resp}, pro_state} end - # Used to DRY up handle_call({:mpub, ...). @spec do_mpub(binary, binary, pro_state) :: {:reply, {:ok, binary}, pro_state} defp do_mpub(topic, data, pro_state) do - {:ok, resp} = random_connection_pid(pro_state) + {:ok, resp} = + random_connection_pid(pro_state) |> NSQ.Connection.cmd({:mpub, topic, data}) + {:reply, {:ok, resp}, pro_state} end end diff --git a/lib/nsq/protocol.ex b/lib/nsq/protocol.ex index 5d18ed6..21e476a 100644 --- a/lib/nsq/protocol.ex +++ b/lib/nsq/protocol.ex @@ -17,6 +17,7 @@ defmodule NSQ.Protocol do {:identify, options} -> json = Jason.encode!(options) + "IDENTIFY\n" <> <> <> json {:auth, secret_key} -> diff --git a/test/consumer_test.exs b/test/consumer_test.exs index b4ab055..d3757df 100644 --- a/test/consumer_test.exs +++ b/test/consumer_test.exs @@ -7,7 +7,7 @@ defmodule NSQ.ConsumerTest do end def handle_event(event, parent) do - send parent, event + send(parent, event) {:ok, parent} end @@ -39,50 +39,56 @@ defmodule NSQ.ConsumerTest do test "msg_timeout" do test_pid = self() - {:ok, consumer} = NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ - nsqds: [{"127.0.0.1", 6750}], - msg_timeout: 1000, - message_handler: fn(body, _msg) -> - if body == "too_slow" do - :timer.sleep(1500) - send(test_pid, :handled) - else - send(test_pid, :handled) + + {:ok, consumer} = + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ + nsqds: [{"127.0.0.1", 6750}], + msg_timeout: 1000, + message_handler: fn body, _msg -> + if body == "too_slow" do + :timer.sleep(1500) + send(test_pid, :handled) + else + send(test_pid, :handled) + end + + :ok end - :ok - end - }) + }) NSQ.Consumer.event_manager(consumer) - |> :gen_event.add_handler(NSQ.ConsumerTest.EventForwarder, self()) + |> :gen_event.add_handler(NSQ.ConsumerTest.EventForwarder, self()) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "hello"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "hello") assert_receive {:message_finished, _}, 2000 - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "too_slow"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "too_slow") assert_receive {:message_requeued, _}, 2000 end test "NSQ.Message.touch extends timeout" do test_pid = self() - {:ok, consumer} = NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ - nsqds: [{"127.0.0.1", 6750}], - msg_timeout: 1000, - message_handler: fn(_body, msg) -> - Task.start_link fn -> - :timer.sleep 900 - NSQ.Message.touch(msg) + + {:ok, consumer} = + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ + nsqds: [{"127.0.0.1", 6750}], + msg_timeout: 1000, + message_handler: fn _body, msg -> + Task.start_link(fn -> + :timer.sleep(900) + NSQ.Message.touch(msg) + end) + + :timer.sleep(1500) + send(test_pid, :handled) + :ok end - :timer.sleep(1500) - send(test_pid, :handled) - :ok - end - }) + }) NSQ.Consumer.event_manager(consumer) - |> :gen_event.add_handler(NSQ.ConsumerTest.EventForwarder, self()) + |> :gen_event.add_handler(NSQ.ConsumerTest.EventForwarder, self()) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "hello"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "hello") # Without touch, this message would fail after 1 second. So we test that # it takes longer than 1 second but succeeds. @@ -93,25 +99,27 @@ defmodule NSQ.ConsumerTest do test "we don't go over max_in_flight, and keep processing after saturation" do test_pid = self() - {:ok, consumer} = NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ - nsqds: [{"127.0.0.1", 6750}, {"127.0.0.1", 6760}], - max_in_flight: 4, - message_handler: fn(_body, _msg) -> - send(test_pid, :handled) - :timer.sleep(300) - :ok - end - }) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "hello"]) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "hello"]) - HTTP.post("http://127.0.0.1:6761/pub?topic=#{@test_topic}", [body: "hello"]) - HTTP.post("http://127.0.0.1:6761/pub?topic=#{@test_topic}", [body: "hello"]) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "hello"]) - HTTP.post("http://127.0.0.1:6761/pub?topic=#{@test_topic}", [body: "hello"]) + {:ok, consumer} = + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ + nsqds: [{"127.0.0.1", 6750}, {"127.0.0.1", 6760}], + max_in_flight: 4, + message_handler: fn _body, _msg -> + send(test_pid, :handled) + :timer.sleep(300) + :ok + end + }) + + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "hello") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "hello") + HTTP.post("http://127.0.0.1:6761/pub?topic=#{@test_topic}", body: "hello") + HTTP.post("http://127.0.0.1:6761/pub?topic=#{@test_topic}", body: "hello") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "hello") + HTTP.post("http://127.0.0.1:6761/pub?topic=#{@test_topic}", body: "hello") :timer.sleep(100) - [info1, info2] = NSQ.Consumer.conn_info(consumer) |> Map.values + [info1, info2] = NSQ.Consumer.conn_info(consumer) |> Map.values() assert info1.messages_in_flight + info2.messages_in_flight == 4 assert_receive :handled, 2000 @@ -122,51 +130,58 @@ defmodule NSQ.ConsumerTest do assert_receive :handled, 2000 :timer.sleep(1000) - [_info1, _info2] = NSQ.Consumer.conn_info(consumer) |> Map.values + [_info1, _info2] = NSQ.Consumer.conn_info(consumer) |> Map.values() end test "closing the connection waits for outstanding messages and cleanly exits" do test_pid = self() - {:ok, consumer} = NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ - nsqds: [{"127.0.0.1", 6750}], - message_handler: fn(body, _msg) -> - case body do - "slow" -> - :timer.sleep(1000) - "medium" -> - :timer.sleep(500) - "fast" -> - send(test_pid, :handled) + + {:ok, consumer} = + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ + nsqds: [{"127.0.0.1", 6750}], + message_handler: fn body, _msg -> + case body do + "slow" -> + :timer.sleep(1000) + + "medium" -> + :timer.sleep(500) + + "fast" -> + send(test_pid, :handled) + end + + :ok end - :ok - end - }) + }) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "fast"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "fast") assert_receive(:handled, 2000) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "slow"]) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "medium"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "slow") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "medium") NSQ.Consumer.close(consumer) :timer.sleep(50) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "fast"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "fast") refute_receive(:handled, 2000) end test "notifies the event manager of relevant events" do test_pid = self() - {:ok, consumer} = NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ - nsqds: [{"127.0.0.1", 6750}], - message_handler: fn(_body, _msg) -> - send(test_pid, :handled) - :ok - end - }) + + {:ok, consumer} = + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ + nsqds: [{"127.0.0.1", 6750}], + message_handler: fn _body, _msg -> + send(test_pid, :handled) + :ok + end + }) NSQ.Consumer.event_manager(consumer) - |> :gen_event.add_handler(NSQ.ConsumerTest.EventForwarder, self()) + |> :gen_event.add_handler(NSQ.ConsumerTest.EventForwarder, self()) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive(:handled, 2000) assert_receive({:message, %NSQ.Message{}}, 2000) @@ -174,40 +189,42 @@ defmodule NSQ.ConsumerTest do end test "updating connection stats" do - {:ok, consumer} = NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ - lookupd_poll_interval: 500, - nsqds: ["127.0.0.1:6750"], - message_handler: fn(body, _msg) -> - :timer.sleep(1000) - case body do - "ok" -> :ok - "req" -> :req - "req2000" -> {:req, 2000} - "backoff" -> {:req, 2000, true} - "fail" -> :fail + {:ok, consumer} = + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ + lookupd_poll_interval: 500, + nsqds: ["127.0.0.1:6750"], + message_handler: fn body, _msg -> + :timer.sleep(1000) + + case body do + "ok" -> :ok + "req" -> :req + "req2000" -> {:req, 2000} + "backoff" -> {:req, 2000, true} + "fail" -> :fail + end end - end - }) + }) NSQ.Consumer.event_manager(consumer) - |> :gen_event.add_handler(NSQ.ConsumerTest.EventForwarder, self()) + |> :gen_event.add_handler(NSQ.ConsumerTest.EventForwarder, self()) - [info] = NSQ.Consumer.conn_info(consumer) |> Map.values + [info] = NSQ.Consumer.conn_info(consumer) |> Map.values() previous_timestamp = info.last_msg_timestamp :timer.sleep(1000) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "ok"]) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "req"]) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "req2000"]) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "fail"]) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "backoff"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "ok") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "req") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "req2000") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "fail") + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "backoff") assert_receive({:message, _}, 2000) assert_receive({:message, _}, 2000) assert_receive({:message, _}, 2000) assert_receive({:message, _}, 2000) assert_receive({:message, _}, 2000) - [info] = NSQ.Consumer.conn_info(consumer) |> Map.values + [info] = NSQ.Consumer.conn_info(consumer) |> Map.values() assert info.messages_in_flight == 5 assert_receive({:message_finished, _}, 2000) @@ -217,7 +234,7 @@ defmodule NSQ.ConsumerTest do assert_receive({:message_requeued, _}, 2000) :timer.sleep(50) - [info] = NSQ.Consumer.conn_info(consumer) |> Map.values + [info] = NSQ.Consumer.conn_info(consumer) |> Map.values() assert info.messages_in_flight == 0 assert info.requeued_count == 3 assert info.finished_count == 1 @@ -227,17 +244,19 @@ defmodule NSQ.ConsumerTest do test "a connection is terminated, cleaned up, and restarted when the tcp connection closes" do test_pid = self() - {:ok, cons_sup_pid} = NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ - lookupd_poll_interval: 500, - nsqds: [{"127.0.0.1", 6750}], - message_handler: fn(_body, _msg) -> - send(test_pid, :handled) - :ok - end - }) + + {:ok, cons_sup_pid} = + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ + lookupd_poll_interval: 500, + nsqds: [{"127.0.0.1", 6750}], + message_handler: fn _body, _msg -> + send(test_pid, :handled) + :ok + end + }) # Send a message so we can be sure the connection is up and working first. - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive(:handled, 2000) # Abruptly close the connection @@ -245,7 +264,7 @@ defmodule NSQ.ConsumerTest do [conn1] = Connections.get(cons) conn_state = Conn.get_state(conn1) - NSQ.Logger.warn "Closing socket as part of test..." + NSQ.Logger.warn("Closing socket as part of test...") Socket.Stream.close(conn_state.socket) # Normally dead connections hang around until the next discovery loop run, @@ -262,15 +281,16 @@ defmodule NSQ.ConsumerTest do assert conn1 != conn2 # Send another message so we can verify the new connection is working. - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive(:handled, 2000) end test "establishes a connection to NSQ and processes messages" do test_pid = self() + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ nsqds: [{"127.0.0.1", 6750}], - message_handler: fn(body, msg) -> + message_handler: fn body, msg -> assert body == "HTTP message" assert msg.attempts == 1 send(test_pid, :handled) @@ -278,28 +298,30 @@ defmodule NSQ.ConsumerTest do end }) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive(:handled, 2000) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive(:handled, 2000) end test "discovery via nsqlookupd" do test_pid = self() - {:ok, _} = NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ - lookupd_poll_interval: 500, - nsqlookupds: ["127.0.0.1:6771", "127.0.0.1:6781"], - message_handler: fn(body, msg) -> - assert body == "HTTP message" - assert msg.attempts == 1 - send(test_pid, :handled) - :ok - end - }) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "HTTP message"]) - HTTP.post("http://127.0.0.1:6761/pub?topic=#{@test_topic}", [body: "HTTP message"]) + {:ok, _} = + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ + lookupd_poll_interval: 500, + nsqlookupds: ["127.0.0.1:6771", "127.0.0.1:6781"], + message_handler: fn body, msg -> + assert body == "HTTP message" + assert msg.attempts == 1 + send(test_pid, :handled) + :ok + end + }) + + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + HTTP.post("http://127.0.0.1:6761/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive(:handled, 2000) assert_receive(:handled, 2000) @@ -308,34 +330,39 @@ defmodule NSQ.ConsumerTest do test "#start_link lives when given a bad address and not able to reconnect" do test_pid = self() Process.flag(:trap_exit, true) - {:ok, consumer} = NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ - nsqds: [{"127.0.0.1", 7777}], - max_reconnect_attempts: 0, - message_handler: fn(body, msg) -> - assert body == "HTTP message" - assert msg.attempts == 1 - send(test_pid, :handled) - :ok - end - }) + + {:ok, consumer} = + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ + nsqds: [{"127.0.0.1", 7777}], + max_reconnect_attempts: 0, + message_handler: fn body, msg -> + assert body == "HTTP message" + assert msg.attempts == 1 + send(test_pid, :handled) + :ok + end + }) + GenServer.call(NSQ.Consumer.get(consumer), :delete_dead_connections) - [] = NSQ.Consumer.conn_info(consumer) |> Map.values + [] = NSQ.Consumer.conn_info(consumer) |> Map.values() end - test "#start_link lives when given a bad address but able to reconnect" do test_pid = self() - {:ok, consumer} = NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ - nsqds: [{"127.0.0.1", 7777}], - max_reconnect_attempts: 2, - lookupd_poll_interval: 500, - message_handler: fn(body, msg) -> - assert body == "HTTP message" - assert msg.attempts == 1 - send(test_pid, :handled) - :ok - end - }) + + {:ok, consumer} = + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ + nsqds: [{"127.0.0.1", 7777}], + max_reconnect_attempts: 2, + lookupd_poll_interval: 500, + message_handler: fn body, msg -> + assert body == "HTTP message" + assert msg.attempts == 1 + send(test_pid, :handled) + :ok + end + }) + [conn] = Connections.get(NSQ.Consumer.get(consumer)) conn_state = NSQ.Connection.get_state(conn) assert conn_state.connect_attempts == 1 @@ -344,19 +371,22 @@ defmodule NSQ.ConsumerTest do assert conn_state.connect_attempts == 2 end - test "receives messages from mpub" do test_pid = self() + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ nsqds: [{"127.0.0.1", 6750}], - message_handler: fn(body, _msg) -> + message_handler: fn body, _msg -> assert body == "mpubtest" send(test_pid, :handled) :ok end }) - HTTP.post("http://127.0.0.1:6751/mpub?topic=#{@test_topic}", [body: "mpubtest\nmpubtest\nmpubtest"]) + HTTP.post("http://127.0.0.1:6751/mpub?topic=#{@test_topic}", + body: "mpubtest\nmpubtest\nmpubtest" + ) + assert_receive(:handled, 2000) assert_receive(:handled, 2000) assert_receive(:handled, 2000) @@ -371,18 +401,19 @@ defmodule NSQ.ConsumerTest do test "processes many messages concurrently" do test_pid = self() + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ nsqds: [{"127.0.0.1", 6750}], - message_handler: fn(_body, _msg) -> + message_handler: fn _body, _msg -> :timer.sleep(1000) send(test_pid, :handled) :ok end }) - Enum.map 1..1000, fn(_i) -> - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "HTTP message"]) - end + Enum.map(1..1000, fn _i -> + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") + end) assert_receive_n_times(:handled, 1000, 2000) end @@ -392,30 +423,36 @@ defmodule NSQ.ConsumerTest do # chance to measure the RDY count for each connection. test "when a message raises an exception, goes through the backoff process" do {:ok, run_counter} = Agent.start_link(fn -> 0 end) - {:ok, sup_pid} = NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ - backoff_strategy: :test, # fixed 200ms for testing - max_in_flight: 100, - nsqds: [{"127.0.0.1", 6750}, {"127.0.0.1", 6760}], - - # This handler should always requeue with backoff twice, then succeed. - # This will let us test the full backoff flow: start, continue, and - # resume. - message_handler: fn(_body, _msg) -> - Agent.update(run_counter, fn(count) -> count + 1 end) - run_count = Agent.get(run_counter, fn(count) -> count end) - cond do - run_count == 1 -> - :ok - run_count < 4 -> - {:req, 500, true} - true -> - :ok + + {:ok, sup_pid} = + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ + # fixed 200ms for testing + backoff_strategy: :test, + max_in_flight: 100, + nsqds: [{"127.0.0.1", 6750}, {"127.0.0.1", 6760}], + + # This handler should always requeue with backoff twice, then succeed. + # This will let us test the full backoff flow: start, continue, and + # resume. + message_handler: fn _body, _msg -> + Agent.update(run_counter, fn count -> count + 1 end) + run_count = Agent.get(run_counter, fn count -> count end) + + cond do + run_count == 1 -> + :ok + + run_count < 4 -> + {:req, 500, true} + + true -> + :ok + end end - end - }) + }) NSQ.Consumer.event_manager(sup_pid) - |> :gen_event.add_handler(NSQ.ConsumerTest.EventForwarder, self()) + |> :gen_event.add_handler(NSQ.ConsumerTest.EventForwarder, self()) consumer = Cons.get(sup_pid) cons_state = Cons.get_state(consumer) @@ -431,11 +468,11 @@ defmodule NSQ.ConsumerTest do # Send one successful message through so our subsequent timing is more # predictable. - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive({:message_finished, _}, 5000) # Our message handler enters into backoff mode and requeues the message. - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive({:message_requeued, _}, 2000) assert_receive(:backoff, 1000) @@ -452,10 +489,15 @@ defmodule NSQ.ConsumerTest do # the message we requeued again. :timer.sleep(250) cons_state = Cons.get_state(consumer) - assert 1 == ConnInfo.fetch(cons_state, conn1, :rdy_count) + - ConnInfo.fetch(cons_state, conn2, :rdy_count) - assert 1 == ConnInfo.fetch(cons_state, conn1, :last_rdy) + - ConnInfo.fetch(cons_state, conn2, :last_rdy) + + assert 1 == + ConnInfo.fetch(cons_state, conn1, :rdy_count) + + ConnInfo.fetch(cons_state, conn2, :rdy_count) + + assert 1 == + ConnInfo.fetch(cons_state, conn1, :last_rdy) + + ConnInfo.fetch(cons_state, conn2, :last_rdy) + assert H.total_rdy_count(cons_state) == 1 assert_receive({:message_requeued, _}, 5000) assert_receive(:backoff, 100) @@ -469,10 +511,15 @@ defmodule NSQ.ConsumerTest do # Then we'll go into "test the waters mode" again in 200ms. :timer.sleep(250) cons_state = Cons.get_state(consumer) - assert 1 == ConnInfo.fetch(cons_state, conn1, :rdy_count) + - ConnInfo.fetch(cons_state, conn2, :rdy_count) - assert 1 == ConnInfo.fetch(cons_state, conn1, :last_rdy) + - ConnInfo.fetch(cons_state, conn2, :last_rdy) + + assert 1 == + ConnInfo.fetch(cons_state, conn1, :rdy_count) + + ConnInfo.fetch(cons_state, conn2, :rdy_count) + + assert 1 == + ConnInfo.fetch(cons_state, conn1, :last_rdy) + + ConnInfo.fetch(cons_state, conn2, :last_rdy) + assert H.total_rdy_count(cons_state) == 1 # After the message handler runs successfully, it decrements the @@ -481,7 +528,7 @@ defmodule NSQ.ConsumerTest do assert_receive({:message_finished, _}, 2000) # Send a successful message and leave backoff mode! (I hope!) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive({:message_finished, _}, 2000) assert_receive(:resume, 100) cons_state = Cons.get_state(consumer) @@ -496,19 +543,22 @@ defmodule NSQ.ConsumerTest do # max_in_flight so it succeeds next time. test_pid = self() - {:ok, cons_sup_pid} = NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ - nsqds: [{"127.0.0.1", 6750}], - max_in_flight: 0, - rdy_retry_delay: 300, - message_handler: fn(_body, _msg) -> - send(test_pid, :handled) - :ok - end - }) + + {:ok, cons_sup_pid} = + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ + nsqds: [{"127.0.0.1", 6750}], + max_in_flight: 0, + rdy_retry_delay: 300, + message_handler: fn _body, _msg -> + send(test_pid, :handled) + :ok + end + }) + cons = Cons.get(cons_sup_pid) [conn] = Connections.get(cons) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") refute_receive :handled, 500 cons_state = Cons.get_state(cons) assert ConnInfo.fetch(cons_state, conn, :retry_rdy_pid) == nil @@ -526,42 +576,53 @@ defmodule NSQ.ConsumerTest do test "rdy redistribution when number of connections > max in flight" do test_pid = self() - {:ok, cons_sup_pid} = NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ - nsqds: [{"127.0.0.1", 6750}, {"127.0.0.1", 6760}], - rdy_redistribute_interval: 100, - low_rdy_idle_timeout: 1000, - max_in_flight: 1, - message_handler: fn(_body, _msg) -> - send(test_pid, :handled) - :ok - end - }) + + {:ok, cons_sup_pid} = + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ + nsqds: [{"127.0.0.1", 6750}, {"127.0.0.1", 6760}], + rdy_redistribute_interval: 100, + low_rdy_idle_timeout: 1000, + max_in_flight: 1, + message_handler: fn _body, _msg -> + send(test_pid, :handled) + :ok + end + }) + cons = Cons.get(cons_sup_pid) cons_state = Cons.get_state(cons) [conn1, conn2] = Connections.get(cons) assert H.total_rdy_count(cons_state) == 1 - assert 1 == ConnInfo.fetch(cons_state, conn1, :rdy_count) + - ConnInfo.fetch(cons_state, conn2, :rdy_count) + + assert 1 == + ConnInfo.fetch(cons_state, conn1, :rdy_count) + + ConnInfo.fetch(cons_state, conn2, :rdy_count) + :timer.sleep(1500) - IO.puts "Letting RDY redistribute 10 times..." - {conn1_rdy, conn2_rdy} = Enum.reduce 1..10, {0, 0}, fn(i, {rdy1, rdy2}) -> - :timer.sleep(100) - result = { - rdy1 + ConnInfo.fetch(cons_state, conn1, :rdy_count), - rdy2 + ConnInfo.fetch(cons_state, conn2, :rdy_count) - } - IO.puts "#{i}: #{inspect result}" - result - end + IO.puts("Letting RDY redistribute 10 times...") + + {conn1_rdy, conn2_rdy} = + Enum.reduce(1..10, {0, 0}, fn i, {rdy1, rdy2} -> + :timer.sleep(100) - IO.puts "Distribution: #{conn1_rdy} : #{conn2_rdy}" + result = { + rdy1 + ConnInfo.fetch(cons_state, conn1, :rdy_count), + rdy2 + ConnInfo.fetch(cons_state, conn2, :rdy_count) + } + + IO.puts("#{i}: #{inspect(result)}") + result + end) + + IO.puts("Distribution: #{conn1_rdy} : #{conn2_rdy}") assert conn1_rdy == 5 assert conn2_rdy == 5 end test "works with tls" do test_pid = self() + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ nsqds: [{"127.0.0.1", 6750}], tls_v1: true, @@ -570,7 +631,7 @@ defmodule NSQ.ConsumerTest do tls_key: "#{__DIR__}/ssl_keys/elixir_nsq.key", tls_min_version: :tlsv1, max_reconnect_attempts: 0, - message_handler: fn(body, msg) -> + message_handler: fn body, msg -> assert body == "HTTP message" assert msg.attempts == 1 send(test_pid, :handled) @@ -578,16 +639,17 @@ defmodule NSQ.ConsumerTest do end }) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive(:handled, 2000) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive(:handled, 2000) end unless System.get_env("CI") == "true" do test "fails when tls_insecure_skip_verify is false" do test_pid = self() + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ nsqds: [{"127.0.0.1", 6750}], tls_v1: true, @@ -595,7 +657,7 @@ defmodule NSQ.ConsumerTest do tls_cert: "#{__DIR__}/ssl_keys/elixir_nsq.crt", tls_key: "#{__DIR__}/ssl_keys/elixir_nsq.key", max_reconnect_attempts: 0, - message_handler: fn(body, msg) -> + message_handler: fn body, msg -> assert body == "HTTP message" assert msg.attempts == 1 send(test_pid, :handled) @@ -603,50 +665,52 @@ defmodule NSQ.ConsumerTest do end }) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") refute_receive(:handled, 2000) end end test "starved" do - {:ok, consumer} = NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ - nsqds: [{"127.0.0.1", 6750}], - max_in_flight: 2, - message_handler: fn(_, _) -> - :timer.sleep(1000) - :ok - end - }) + {:ok, consumer} = + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ + nsqds: [{"127.0.0.1", 6750}], + max_in_flight: 2, + message_handler: fn _, _ -> + :timer.sleep(1000) + :ok + end + }) NSQ.Consumer.event_manager(consumer) - |> :gen_event.add_handler(NSQ.ConsumerTest.EventForwarder, self()) + |> :gen_event.add_handler(NSQ.ConsumerTest.EventForwarder, self()) # Nothing is in flight, not starved assert NSQ.Consumer.starved?(consumer) == false # One message in flight, 50% of last_rdy, not starved - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive({:message, _}, 2000) assert NSQ.Consumer.starved?(consumer) == false # Two messages in flight, 100% of last_rdy, __starved__ - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive({:message, _}, 2000) assert NSQ.Consumer.starved?(consumer) == true # Messages are done, back to 0 in flight, not starved assert_receive({:message_finished, _}, 2000) assert_receive({:message_finished, _}, 2000) - :timer.sleep 100 + :timer.sleep(100) assert NSQ.Consumer.starved?(consumer) == false end test "auth" do test_pid = self() + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ nsqds: [{"127.0.0.1", 6765}], auth_secret: "abc", - message_handler: fn(body, msg) -> + message_handler: fn body, msg -> assert body == "HTTP message" assert msg.attempts == 1 send(test_pid, :handled) @@ -654,19 +718,20 @@ defmodule NSQ.ConsumerTest do end }) - HTTP.post("http://127.0.0.1:6766/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6766/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive(:handled, 2000) - HTTP.post("http://127.0.0.1:6766/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6766/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive(:handled, 2000) end test "deflate" do test_pid = self() + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ nsqds: [{"127.0.0.1", 6750}], deflate: true, - message_handler: fn(body, msg) -> + message_handler: fn body, msg -> assert body == "HTTP message" assert msg.attempts == 1 send(test_pid, :handled) @@ -674,15 +739,16 @@ defmodule NSQ.ConsumerTest do end }) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive(:handled, 2000) - HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6751/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive(:handled, 2000) end test "deflate + tls + auth" do test_pid = self() + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ nsqds: [{"127.0.0.1", 6765}], deflate: true, @@ -693,7 +759,7 @@ defmodule NSQ.ConsumerTest do tls_min_version: :tlsv1, auth_secret: "abc", max_reconnect_attempts: 0, - message_handler: fn(body, msg) -> + message_handler: fn body, msg -> assert body == "HTTP message" assert msg.attempts == 1 send(test_pid, :handled) @@ -701,20 +767,20 @@ defmodule NSQ.ConsumerTest do end }) - HTTP.post("http://127.0.0.1:6766/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6766/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive(:handled, 2000) - HTTP.post("http://127.0.0.1:6766/pub?topic=#{@test_topic}", [body: "HTTP message"]) + HTTP.post("http://127.0.0.1:6766/pub?topic=#{@test_topic}", body: "HTTP message") assert_receive(:handled, 2000) end - test "stop_connections actually stops connections, without throwing an error" do - {:ok, cons_sup_pid} = NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ - msg_timeout: 1000, - nsqds: [{"127.0.0.1", 6750}], - message_handler: fn(_body, _msg) -> :ok end - }) + {:ok, cons_sup_pid} = + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ + msg_timeout: 1000, + nsqds: [{"127.0.0.1", 6750}], + message_handler: fn _body, _msg -> :ok end + }) consumer = NSQ.Consumer.get(cons_sup_pid) state = NSQ.Consumer.get_state(consumer) diff --git a/test/message_test.exs b/test/message_test.exs index 55edc2f..d8ed686 100644 --- a/test/message_test.exs +++ b/test/message_test.exs @@ -3,7 +3,7 @@ defmodule NSQ.MessageTest do doctest NSQ.Message def now do - :calendar.datetime_to_gregorian_seconds(:calendar.universal_time) + :calendar.datetime_to_gregorian_seconds(:calendar.universal_time()) end def build_raw_nsq_data(attrs \\ %{}) do @@ -12,10 +12,10 @@ defmodule NSQ.MessageTest do msg_id = attrs[:id] || String.pad_trailing(SecureRandom.hex(8), 16, "0") data = attrs[:body] || "test data" - <> - <> <> - <> msg_id - <> data + <> <> + <> <> + msg_id <> + data end test "#from_data given raw data, returns an instance of %NSQ.Message" do diff --git a/test/producer_test.exs b/test/producer_test.exs index e7cb232..97ce803 100644 --- a/test/producer_test.exs +++ b/test/producer_test.exs @@ -16,15 +16,18 @@ defmodule NSQ.ProducerTest do end test "#new starts a new producer, discoverable via nsqlookupd" do - {:ok, producer} = NSQ.Producer.Supervisor.start_link( - @test_topic, %NSQ.Config{nsqds: @configured_nsqds} - ) + {:ok, producer} = + NSQ.Producer.Supervisor.start_link( + @test_topic, + %NSQ.Config{nsqds: @configured_nsqds} + ) # Produce a ton of messages so we're "guaranteed" both our nsqds have # messages and are therefore discoverable. - Enum.map 0..100, fn(_i) -> NSQ.Producer.pub(producer, "test 1") end + Enum.map(0..100, fn _i -> NSQ.Producer.pub(producer, "test 1") end) lookupds = [{"127.0.0.1", 6771}, {"127.0.0.1", 6781}] + discovered_nsqds = lookupds |> NSQ.Lookupd.nsqds_with_topic("__nsq_producer_test_topic__") @@ -36,14 +39,17 @@ defmodule NSQ.ProducerTest do end test "messages added via pub are handled by a consumer" do - {:ok, producer} = NSQ.Producer.Supervisor.start_link( - @test_topic, %NSQ.Config{nsqds: @configured_nsqds} - ) + {:ok, producer} = + NSQ.Producer.Supervisor.start_link( + @test_topic, + %NSQ.Config{nsqds: @configured_nsqds} + ) test_pid = self() + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ nsqds: @configured_nsqds, - message_handler: fn(body, msg) -> + message_handler: fn body, msg -> assert body == "test abc" assert msg.attempts == 1 send(test_pid, :handled) @@ -56,16 +62,19 @@ defmodule NSQ.ProducerTest do end test "messages added via mpub are handled by a consumer" do - {:ok, producer} = NSQ.Producer.Supervisor.start_link( - @test_topic, %NSQ.Config{nsqds: @configured_nsqds} - ) + {:ok, producer} = + NSQ.Producer.Supervisor.start_link( + @test_topic, + %NSQ.Config{nsqds: @configured_nsqds} + ) test_pid = self() {:ok, bodies} = Agent.start_link(fn -> [] end) + NSQ.Consumer.Supervisor.start_link(@test_topic, @test_channel1, %NSQ.Config{ nsqds: @configured_nsqds, - message_handler: fn(body, _msg) -> - Agent.update bodies, fn(list) -> [body|list] end + message_handler: fn body, _msg -> + Agent.update(bodies, fn list -> [body | list] end) send(test_pid, :handled) :ok end @@ -74,6 +83,6 @@ defmodule NSQ.ProducerTest do NSQ.Producer.mpub(producer, ["def", "ghi"]) assert_receive(:handled, 2000) assert_receive(:handled, 2000) - assert Agent.get(bodies, fn(list) -> list end) == ["def", "ghi"] + assert Agent.get(bodies, fn list -> list end) == ["def", "ghi"] end end diff --git a/test/protocol_test.exs b/test/protocol_test.exs index 2df3f35..d50d883 100644 --- a/test/protocol_test.exs +++ b/test/protocol_test.exs @@ -2,4 +2,3 @@ defmodule NSQ.ProtocolTest do use ExUnit.Case, async: false doctest NSQ.Protocol end -