Skip to content

Commit

Permalink
Switch from using :gen_tcp to Socket package.
Browse files Browse the repository at this point in the history
Precursor to using its ssl socket wrapper.
  • Loading branch information
MaxPower15 committed Jan 11, 2016
1 parent 6c7943b commit 02cb267
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 20 deletions.
45 changes: 28 additions & 17 deletions lib/nsq/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ defmodule NSQ.Connection do
# ------------------------------------------------------- #
@project ElixirNsq.Mixfile.project
@user_agent "#{@project[:app]}/#{@project[:version]}"
@socket_opts [:binary, active: false, deliver: :term, packet: :raw]
@socket_opts [as: :binary, active: false, deliver: :term, packet: :raw]
@initial_state %{
parent: nil,
socket: nil,
Expand Down Expand Up @@ -124,7 +124,7 @@ defmodule NSQ.Connection do
case msg do
{:response, "_heartbeat_"} ->
GenEvent.notify(state.event_manager_pid, :heartbeat)
:gen_tcp.send(socket, encode(:noop))
socket |> Socket.Stream.send!(encode(:noop))

{:response, data} ->
GenEvent.notify(state.event_manager_pid, {:response, data})
Expand Down Expand Up @@ -311,7 +311,7 @@ defmodule NSQ.Connection do
back to the connection for handling.
"""
def recv_nsq_messages(sock, conn, timeout) do
case :gen_tcp.recv(sock, 4, timeout) do
case sock |> Socket.Stream.recv(4, timeout: timeout) do
{:error, :timeout} ->
# If publishing is quiet, we won't receive any messages in the timeout.
# This is fine. Let's just try again!
Expand All @@ -320,7 +320,9 @@ defmodule NSQ.Connection do
# Got a message! Decode it and let the connection know. We just
# received data on the socket to get the size of this message, so if we
# timeout in here, that's probably indicative of a problem.
{:ok, raw_msg_data} = :gen_tcp.recv(sock, msg_size, timeout)

{:ok, raw_msg_data} =
sock |> Socket.Stream.recv(msg_size, timeout: timeout)
decoded = decode(raw_msg_data)
GenServer.cast(conn, {:nsq_msg, decoded})
recv_nsq_messages(sock, conn, timeout)
Expand Down Expand Up @@ -388,8 +390,12 @@ defmodule NSQ.Connection do
@spec connect(%{nsqd: host_with_port}) :: {:ok, conn_state} | {:error, String.t}
defp connect(%{nsqd: {host, port}} = state) do
if should_connect?(state) do
socket_opts = @socket_opts ++ [send_timeout: state.config.write_timeout]
case :gen_tcp.connect(to_char_list(host), port, socket_opts, state.config.dial_timeout) do
socket_opts =
@socket_opts
|> Keyword.put(:send_timeout, state.config.write_timeout)
|> Keyword.put(:timeout, state.config.dial_timeout)

case Socket.TCP.connect(host, port, socket_opts) do
{:ok, socket} ->
state = %{state | socket: socket}
{:ok, state} = do_handshake(state)
Expand Down Expand Up @@ -424,14 +430,14 @@ defmodule NSQ.Connection do
@spec send_magic_v2(pid) :: :ok
defp send_magic_v2(socket) do
Logger.debug("(#{inspect self}) sending magic v2...")
:ok = :gen_tcp.send(socket, encode(:magic_v2))
socket |> Socket.Stream.send!(encode(:magic_v2))
end

@spec identify(pid, conn_state) :: {:ok, binary}
defp identify(socket, conn_state) do
Logger.debug("(#{inspect self}) identifying...")
identify_obj = encode({:identify, identify_props(conn_state)})
:ok = :gen_tcp.send(socket, identify_obj)
socket |> Socket.Stream.send!(identify_obj)
{:response, json} = recv_nsq_response(socket, conn_state)
{:ok, _conn_state} = update_from_identify_response(conn_state, json)
end
Expand All @@ -457,23 +463,28 @@ defmodule NSQ.Connection do

@spec recv_nsq_response(pid, map) :: {:response, binary}
defp recv_nsq_response(socket, conn_state) do
{:ok, <<msg_size :: size(32)>>} = :gen_tcp.recv(
socket, 4, conn_state.config.read_timeout
)
{:ok, raw_msg_data} = :gen_tcp.recv(
socket, msg_size, conn_state.config.read_timeout
)
{:ok, <<msg_size :: size(32)>>} =
socket |>
Socket.Stream.recv(4, timeout: conn_state.config.read_timeout)

{:ok, raw_msg_data} =
socket |>
Socket.Stream.recv(msg_size, timeout: conn_state.config.read_timeout)

{:response, _response} = decode(raw_msg_data)
end

@spec subscribe(pid, conn_state) :: {:ok, binary}
defp subscribe(socket, %{topic: topic, channel: channel} = conn_state) do
Logger.debug "(#{inspect self}) subscribe to #{topic} #{channel}"
:gen_tcp.send(socket, encode({:sub, topic, channel}))
socket |>
Socket.Stream.send!(encode({:sub, topic, channel}))

Logger.debug "(#{inspect self}) wait for subscription acknowledgment"
expected = ok_msg
{:ok, ^expected} = :gen_tcp.recv(socket, 0, conn_state.config.read_timeout)
{:ok, ^expected} =
socket |>
Socket.Stream.recv(0, timeout: conn_state.config.read_timeout)
end

@spec identify_props(conn_state) :: conn_state
Expand Down Expand Up @@ -524,7 +535,7 @@ defmodule NSQ.Connection do
@spec send_data_and_queue_resp(conn_state, tuple, {reference, pid}, atom) ::
conn_state
defp send_data_and_queue_resp(state, cmd, from, kind) do
:gen_tcp.send(state.socket, encode(cmd))
state.socket |> Socket.Stream.send!(encode(cmd))
if kind == :noresponse do
state
else
Expand Down
6 changes: 3 additions & 3 deletions lib/nsq/message.ex
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ defmodule NSQ.Message do
"""
def fin(message) do
Logger.debug("(#{inspect message.connection}) fin msg ID #{message.id}")
:gen_tcp.send(message.socket, encode({:fin, message.id}))
message.socket |> Socket.Stream.send(encode({:fin, message.id}))
GenEvent.notify(message.event_manager_pid, {:message_finished, message})
GenServer.call(message.consumer, {:start_stop_continue_backoff, :resume})
GenEvent.notify(message.event_manager_pid, :resume)
Expand All @@ -135,7 +135,7 @@ defmodule NSQ.Message do
else
Logger.debug("(#{inspect message.connection}) requeue msg ID #{message.id}, delay #{delay}, backoff #{backoff}")
end
:gen_tcp.send(message.socket, encode({:req, message.id, delay}))
message.socket |> Socket.Stream.send(encode({:req, message.id, delay}))
GenEvent.notify(message.event_manager_pid, {:message_requeued, message})
if backoff do
GenServer.call(message.consumer, {:start_stop_continue_backoff, :backoff})
Expand All @@ -153,7 +153,7 @@ defmodule NSQ.Message do
"""
def touch(message) do
Logger.debug("(#{message.connection}) touch msg ID #{message.id}")
:gen_tcp.send(message.socket, encode({:touch, message.id}))
message.socket |> Socket.Stream.send!(encode({:touch, message.id}))
end

# ------------------------------------------------------- #
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ defmodule ElixirNsq.Mixfile do
{:ibrowse, github: "cmullaparthi/ibrowse", tag: "v4.1.2"},
{:httpotion, "~> 2.1.0"},
{:uuid, "~> 1.1.2"},
{:socket, "~> 0.3.1"},

# testing
{:secure_random, "~> 0.2", only: :test},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@
"ibrowse": {:git, "https://github.com/cmullaparthi/ibrowse.git", "ea3305d21f37eced4fac290f64b068e56df7de80", [tag: "v4.1.2"]},
"poison": {:hex, :poison, "1.5.0"},
"secure_random": {:hex, :secure_random, "0.2.0"},
"socket": {:hex, :socket, "0.3.1"},
"uuid": {:hex, :uuid, "1.1.2"}}
1 change: 1 addition & 0 deletions test/consumer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ defmodule NSQ.ConsumerTest do
conn_state = Conn.get_state(conn1)

Logger.warn "Closing socket as part of test..."
Socket.Stream.close(conn_state.socket)
:gen_tcp.close(conn_state.socket)

# Normally dead connections hang around until the next discovery loop run,
Expand Down

0 comments on commit 02cb267

Please sign in to comment.