diff --git a/lib/nsq/connection/initializer.ex b/lib/nsq/connection/initializer.ex index ca0c6f2..2f61297 100644 --- a/lib/nsq/connection/initializer.ex +++ b/lib/nsq/connection/initializer.ex @@ -7,55 +7,67 @@ defmodule NSQ.Connection.Initializer do @socket_opts [as: :binary, mode: :passive, packet: :raw] - @project ElixirNsq.Mixfile.project + @project ElixirNsq.Mixfile.project() @user_agent "#{@project[:app]}/#{@project[:version]}" - @ssl_versions [:sslv3, :tlsv1, :"tlsv1.1", :"tlsv1.2"] |> Enum.with_index + @ssl_versions [:sslv3, :tlsv1, :"tlsv1.1", :"tlsv1.2"] |> Enum.with_index() - - @spec connect(%{nsqd: C.host_with_port}) :: {:ok, C.state} | {:error, String.t} + @spec connect(%{nsqd: C.host_with_port()}) :: {:ok, C.state()} | {:error, String.t()} def connect(%{nsqd: {host, port}} = state) do if should_connect?(state) do - socket_opts = @socket_opts |> Keyword.merge( - send: [{:timeout, state.config.write_timeout}], - timeout: state.config.dial_timeout - ) + socket_opts = + @socket_opts + |> Keyword.merge( + send: [{:timeout, state.config.write_timeout}], + timeout: state.config.dial_timeout + ) case Socket.TCP.connect(host, port, socket_opts) do {:ok, socket} -> state.reader |> Buffer.setup_socket(socket, state.config.read_timeout) state.writer |> Buffer.setup_socket(socket, state.config.read_timeout) + state = %{state | socket: socket} |> do_handshake! |> start_receiving_messages! |> reset_connects + {:ok, %{state | connected: true}} + {:error, reason} -> if length(state.config.nsqlookupds) > 0 do - NSQ.Logger.warn "(#{inspect self()}) connect failed; #{reason}; discovery loop should respawn" + NSQ.Logger.warn( + "(#{inspect(self())}) connect failed; #{reason}; discovery loop should respawn" + ) + {{:error, reason}, %{state | connect_attempts: state.connect_attempts + 1}} else if state.config.max_reconnect_attempts > 0 do - NSQ.Logger.warn "(#{inspect self()}) connect failed; #{reason}; discovery loop should respawn" + NSQ.Logger.warn( + "(#{inspect(self())}) connect failed; #{reason}; discovery loop should respawn" + ) + {{:error, reason}, %{state | connect_attempts: state.connect_attempts + 1}} else - NSQ.Logger.error "(#{inspect self()}) connect failed; #{reason}; reconnect turned off; terminating connection" + NSQ.Logger.error( + "(#{inspect(self())}) connect failed; #{reason}; reconnect turned off; terminating connection" + ) + Process.exit(self(), :connect_failed) end end end else - NSQ.Logger.error "#{inspect self()}: Failed to connect; terminating connection" + NSQ.Logger.error("#{inspect(self())}: Failed to connect; terminating connection") Process.exit(self(), :connect_failed) end end - @doc """ Immediately after connecting to the NSQ socket, both consumers and producers follow this protocol. """ - @spec do_handshake(C.state) :: {:ok, C.state} + @spec do_handshake(C.state()) :: {:ok, C.state()} def do_handshake(conn_state) do conn_state |> send_magic_v2 @@ -68,34 +80,32 @@ defmodule NSQ.Connection.Initializer do {:ok, conn_state} end + def do_handshake!(conn_state) do {:ok, conn_state} = do_handshake(conn_state) conn_state end - - @spec send_magic_v2(C.state) :: :ok + @spec send_magic_v2(C.state()) :: :ok defp send_magic_v2(conn_state) do - NSQ.Logger.debug("(#{inspect self()}) sending magic v2...") + NSQ.Logger.debug("(#{inspect(self())}) sending magic v2...") conn_state |> Buffer.send!(encode(:magic_v2)) end - - @spec identify(C.state) :: {:ok, binary} + @spec identify(C.state()) :: {:ok, binary} defp identify(conn_state) do - NSQ.Logger.debug("(#{inspect self()}) identifying...") + NSQ.Logger.debug("(#{inspect(self())}) identifying...") identify_obj = encode({:identify, identify_props(conn_state)}) conn_state |> Buffer.send!(identify_obj) {:response, json} = recv_nsq_response(conn_state) {:ok, _conn_state} = update_from_identify_response(conn_state, json) end - - @spec identify_props(C.state) :: map + @spec identify_props(C.state()) :: map defp identify_props(%{nsqd: {host, port}, config: config} = conn_state) do %{ - client_id: "#{host}:#{port} (#{inspect conn_state.parent})", - hostname: to_string(:net_adm.localhost), + client_id: "#{host}:#{port} (#{inspect(conn_state.parent)})", + hostname: to_string(:net_adm.localhost()), feature_negotiation: true, heartbeat_interval: config.heartbeat_interval, output_buffer: config.output_buffer_size, @@ -110,26 +120,24 @@ defmodule NSQ.Connection.Initializer do } end - def inflate(data) do - z = :zlib.open + z = :zlib.open() :ok = z |> :zlib.inflateInit(-15) inflated = z |> :zlib.inflateChunk(data) - NSQ.Logger.warn "inflated chunk?" - NSQ.Logger.warn inspect inflated - :ok = z |> :zlib.inflateEnd - :ok = z |> :zlib.close + NSQ.Logger.warn("inflated chunk?") + NSQ.Logger.warn(inspect(inflated)) + :ok = z |> :zlib.inflateEnd() + :ok = z |> :zlib.close() inflated end - - @spec update_from_identify_response(C.state, binary) :: {:ok, C.state} + @spec update_from_identify_response(C.state(), binary) :: {:ok, C.state()} defp update_from_identify_response(conn_state, json) do - {:ok, parsed} = Poison.decode(json) + {:ok, parsed} = Jason.decode(json) # respect negotiated max_rdy_count if parsed["max_rdy_count"] do - ConnInfo.update conn_state, %{max_rdy: parsed["max_rdy_count"]} + ConnInfo.update(conn_state, %{max_rdy: parsed["max_rdy_count"]}) end # respect negotiated msg_timeout @@ -139,13 +147,16 @@ defmodule NSQ.Connection.Initializer do # wrap our socket with SSL if TLS is enabled conn_state = if parsed["tls_v1"] == true do - NSQ.Logger.debug "Upgrading to TLS..." - socket = Socket.SSL.connect! conn_state.socket, [ - cacertfile: conn_state.config.tls_cert, - keyfile: conn_state.config.tls_key, - versions: ssl_versions(conn_state.config.tls_min_version), - verify: ssl_verify_atom(conn_state.config), - ] + NSQ.Logger.debug("Upgrading to TLS...") + + socket = + Socket.SSL.connect!(conn_state.socket, + cacertfile: conn_state.config.tls_cert, + keyfile: conn_state.config.tls_key, + versions: ssl_versions(conn_state.config.tls_min_version), + verify: ssl_verify_atom(conn_state.config) + ) + conn_state = %{conn_state | socket: socket} conn_state.reader |> Buffer.setup_socket(socket, conn_state.config.read_timeout) conn_state.writer |> Buffer.setup_socket(socket, conn_state.config.read_timeout) @@ -159,12 +170,13 @@ defmodule NSQ.Connection.Initializer do # immediately. conn_state.reader |> Buffer.setup_compression(parsed, conn_state.config) conn_state.writer |> Buffer.setup_compression(parsed, conn_state.config) + if parsed["deflate"] == true || parsed["snappy"] == true do conn_state |> wait_for_ok! end if parsed["auth_required"] == true do - NSQ.Logger.debug "sending AUTH" + NSQ.Logger.debug("sending AUTH") auth_cmd = encode({:auth, conn_state.config.auth_secret}) conn_state |> Buffer.send!(auth_cmd) {:response, json} = recv_nsq_response(conn_state) @@ -174,7 +186,6 @@ defmodule NSQ.Connection.Initializer do {:ok, conn_state} end - defp ssl_verify_atom(config) do if config.tls_insecure_skip_verify == true do :verify_none @@ -183,67 +194,62 @@ defmodule NSQ.Connection.Initializer do end end - - @spec subscribe(C.state) :: {:ok, binary} + @spec subscribe(C.state()) :: {:ok, binary} defp subscribe(%{topic: topic, channel: channel} = conn_state) do - NSQ.Logger.debug "(#{inspect self()}) subscribe to #{topic} #{channel}" + NSQ.Logger.debug("(#{inspect(self())}) subscribe to #{topic} #{channel}") conn_state |> Buffer.send!(encode({:sub, topic, channel})) - NSQ.Logger.debug "(#{inspect self()}) wait for subscription acknowledgment" + NSQ.Logger.debug("(#{inspect(self())}) wait for subscription acknowledgment") conn_state |> wait_for_ok! end - - @spec recv_nsq_response(C.state) :: {:response, binary} + @spec recv_nsq_response(C.state()) :: {:response, binary} defp recv_nsq_response(conn_state) do - <> = conn_state |> Buffer.recv!(4) + <> = conn_state |> Buffer.recv!(4) raw_msg_data = conn_state |> Buffer.recv!(msg_size) {:response, _response} = decode(raw_msg_data) end - defp wait_for_ok!(state) do expected = ok_msg() ^expected = state |> Buffer.recv!(byte_size(expected)) end - - @spec ssl_versions(NSQ.Config.t) :: [atom] + @spec ssl_versions(NSQ.Config.t()) :: [atom] def ssl_versions(tls_min_version) do if tls_min_version do min_index = @ssl_versions[tls_min_version] + @ssl_versions - |> Enum.drop_while(fn({_, index}) -> index < min_index end) - |> Enum.map(fn({version, _}) -> version end) - |> Enum.reverse + |> Enum.drop_while(fn {_, index} -> index < min_index end) + |> Enum.map(fn {version, _} -> version end) + |> Enum.reverse() else @ssl_versions - |> Enum.map(fn({version, _}) -> version end) - |> Enum.reverse + |> Enum.map(fn {version, _} -> version end) + |> Enum.reverse() end end - - @spec should_connect?(C.state) :: boolean + @spec should_connect?(C.state()) :: boolean defp should_connect?(state) do state.connect_attempts == 0 || state.connect_attempts <= state.config.max_reconnect_attempts end - - @spec start_receiving_messages(C.state) :: {:ok, C.state} + @spec start_receiving_messages(C.state()) :: {:ok, C.state()} defp start_receiving_messages(state) do reader_pid = spawn_link(MessageHandling, :recv_nsq_messages, [state, self()]) state = %{state | reader_pid: reader_pid} GenServer.cast(self(), :flush_cmd_queue) {:ok, state} end + defp start_receiving_messages!(state) do {:ok, state} = start_receiving_messages(state) state end - - @spec reset_connects(C.state) :: C.state + @spec reset_connects(C.state()) :: C.state() defp reset_connects(state), do: %{state | connect_attempts: 0} end diff --git a/lib/nsq/lookupd.ex b/lib/nsq/lookupd.ex index a7fc970..c238cd7 100644 --- a/lib/nsq/lookupd.ex +++ b/lib/nsq/lookupd.ex @@ -62,7 +62,8 @@ defmodule NSQ.Lookupd do body = if body == nil || body == "", do: "{}", else: body if headers[:"X-Nsq-Content-Type"] == "nsq; version=1.0" do - Poison.decode!(body) + body + |> Jason.decode!() |> normalize_response else %{status_code: 200, status_txt: "OK", data: body} diff --git a/lib/nsq/protocol.ex b/lib/nsq/protocol.ex index 51f2c10..5d18ed6 100644 --- a/lib/nsq/protocol.ex +++ b/lib/nsq/protocol.ex @@ -1,103 +1,122 @@ defmodule NSQ.Protocol do @valid_topic_channel_name_regex ~r/^[\.a-zA-Z0-9_-]+(#ephemeral)?$/ - @frame_type_response <<0 :: size(32)>> - @frame_type_error <<1 :: size(32)>> - @frame_type_message <<2 :: size(32)>> - + @frame_type_response <<0::size(32)>> + @frame_type_error <<1::size(32)>> + @frame_type_message <<2::size(32)>> @doc """ Refer to http://nsq.io/clients/tcp_protocol_spec.html. """ def encode(cmd) do case cmd do - :magic_v2 -> " V2" - :noop -> "NOP\n" + :magic_v2 -> + " V2" + + :noop -> + "NOP\n" + {:identify, options} -> - json = Poison.encode!(options) - "IDENTIFY\n" <> <> <> json + json = Jason.encode!(options) + "IDENTIFY\n" <> <> <> json + {:auth, secret_key} -> - "AUTH\n" <> <> <> secret_key + "AUTH\n" <> <> <> secret_key + {:pub, topic, data} -> - "PUB #{topic}\n" <> << byte_size(data) :: size(32) >> <> data + "PUB #{topic}\n" <> <> <> data + {:mpub, topic, data} -> - {msgs, bytes, count} = Enum.reduce data, {[], 0, 0}, &mpub_info_acc/2 - "MPUB #{topic}\n" - <> <> <> <> - <> Enum.join(msgs, "") - {:sub, topic, channel} -> "SUB #{topic} #{channel}\n" - {:fin, msg_id} -> "FIN #{msg_id}\n" - {:req, msg_id, delay} -> "REQ #{msg_id} #{delay}\n" - {:rdy, count} -> "RDY #{count}\n" - {:touch, msg_id} -> "TOUCH #{msg_id}\n" - :cls -> "CLS\n" + {msgs, bytes, count} = Enum.reduce(data, {[], 0, 0}, &mpub_info_acc/2) + + "MPUB #{topic}\n" <> + <> <> + <> <> + Enum.join(msgs, "") + + {:sub, topic, channel} -> + "SUB #{topic} #{channel}\n" + + {:fin, msg_id} -> + "FIN #{msg_id}\n" + + {:req, msg_id, delay} -> + "REQ #{msg_id} #{delay}\n" + + {:rdy, count} -> + "RDY #{count}\n" + + {:touch, msg_id} -> + "TOUCH #{msg_id}\n" + + :cls -> + "CLS\n" end end - def decode(msg) do case msg do <<0, 0, 0, 6, @frame_type_response, "OK">> -> {:response, "OK"} + <<@frame_type_response, data::binary>> -> {:response, data} + <<@frame_type_error, data::binary>> -> {:error, data} + <<@frame_type_message, data::binary>> -> {:message, data} - <> -> + + <> -> {:error, "Unknown frame type #{frame_type}", data} end end - - @spec decode_as_message(binary) :: {atom, Map.t} | {atom, String.t} + @spec decode_as_message(binary) :: {atom, Map.t()} | {atom, String.t()} def decode_as_message(data) do case data do - <> -> - {:ok, %{ - id: <>, - timestamp: timestamp, - attempts: attempts, - body: rest - }} + <> -> + {:ok, + %{ + id: <>, + timestamp: timestamp, + attempts: attempts, + body: rest + }} + _else -> {:error, "Data did not match expected message format"} end end - def response_msg(body) do data = @frame_type_response <> body - <> <> @frame_type_response <> body + <> <> @frame_type_response <> body end - def ok_msg do response_msg("OK") end - def is_valid_topic_name?(topic) do is_valid_name?(topic) end - def is_valid_channel_name?(topic) do is_valid_name?(topic) end - defp is_valid_name?(name) do len = String.length(name) + len > 0 && len <= 64 && Regex.match?(@valid_topic_channel_name_regex, name) end - # Get all our mpub info in one pass. Expect this to be called via # Enum.reduce. defp mpub_info_acc(msg, {msgs, bytes, count}) do - encoded_msg = <> <> msg + encoded_msg = <> <> msg {[encoded_msg | msgs], bytes + byte_size(encoded_msg), count + 1} end end diff --git a/mix.exs b/mix.exs index 0c7c012..e5f14ff 100644 --- a/mix.exs +++ b/mix.exs @@ -2,14 +2,16 @@ defmodule ElixirNsq.Mixfile do use Mix.Project def project do - [app: :elixir_nsq, - version: "1.1.0", - elixir: "~> 1.1", - description: description(), - package: package(), - build_embedded: Mix.env == :prod, - start_permanent: Mix.env == :prod, - deps: deps()] + [ + app: :elixir_nsq, + version: "1.1.0", + elixir: "~> 1.1", + description: description(), + package: package(), + build_embedded: Mix.env() == :prod, + start_permanent: Mix.env() == :prod, + deps: deps() + ] end # Configuration for the OTP application @@ -17,7 +19,7 @@ defmodule ElixirNsq.Mixfile do # Type "mix help compile.app" for more information def application do [ - applications: [:logger, :httpotion, :poison, :socket2, :elixir_uuid], + applications: [:logger, :httpotion, :socket2, :elixir_uuid, :jason], extra_applications: extra_applications(Mix.env()) ] end @@ -36,17 +38,16 @@ defmodule ElixirNsq.Mixfile do # Type "mix help deps" for more examples and options defp deps do [ - {:poison, "~> 4.0"}, {:httpotion, "~> 3.2"}, {:elixir_uuid, "~> 1.2"}, {:socket2, "~> 2.1"}, + {:jason, "~> 1.4"}, # testing {:secure_random, "~> 0.5", only: :test}, {:plug_cowboy, "~> 2.0", only: :test}, - {:plug, "~> 1.15", only: :test }, - - {:ex_doc, ">= 0.0.0", only: :dev}, + {:plug, "~> 1.15", only: :test}, + {:ex_doc, ">= 0.0.0", only: :dev} ] end @@ -64,7 +65,7 @@ defmodule ElixirNsq.Mixfile do licenses: ["MIT"], links: %{ "GitHub" => "https://github.com/wistia/elixir_nsq" - }, + } ] end end diff --git a/mix.lock b/mix.lock index 823aa62..3087b61 100644 --- a/mix.lock +++ b/mix.lock @@ -1,17 +1,14 @@ %{ - "bandit": {:hex, :bandit, "1.1.1", "7158770ed1584c12964902ebce91c649b2c37a56e8ff36cc4f394baeca0876dc", [:mix], [{:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "d521272da28186412d4f8b480e17009f0bafb844dc7ac7e7a30d79eaf1f141fd"}, "certifi": {:hex, :certifi, "2.12.0", "2d1cca2ec95f59643862af91f001478c9863c2ac9cb6e2f89780bfd8de987329", [:rebar3], [], "hexpm", "ee68d85df22e554040cdb4be100f33873ac6051387baf6a8f6ce82272340ff1c"}, "cowboy": {:hex, :cowboy, "2.10.0", "ff9ffeff91dae4ae270dd975642997afe2a1179d94b1887863e43f681a203e26", [:make, :rebar3], [{:cowlib, "2.12.1", [hex: :cowlib, repo: "hexpm", optional: false]}, {:ranch, "1.8.0", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "3afdccb7183cc6f143cb14d3cf51fa00e53db9ec80cdcd525482f5e99bc41d6b"}, "cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"}, "cowlib": {:hex, :cowlib, "2.12.1", "a9fa9a625f1d2025fe6b462cb865881329b5caff8f1854d1cbc9f9533f00e1e1", [:make, :rebar3], [], "hexpm", "163b73f6367a7341b33c794c4e88e7dbfe6498ac42dcd69ef44c5bc5507c8db0"}, - "earmark": {:hex, :earmark, "1.4.0", "397e750b879df18198afc66505ca87ecf6a96645545585899f6185178433cc09", [:mix], [], "hexpm", "4bedcec35de03b5f559fd2386be24d08f7637c374d3a85d3fe0911eecdae838a"}, "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, "ex_doc": {:hex, :ex_doc, "0.30.9", "d691453495c47434c0f2052b08dd91cc32bc4e1a218f86884563448ee2502dd2", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d7aaaf21e95dc5cddabf89063327e96867d00013963eadf2c6ad135506a8bc10"}, - "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, - "http_server": {:git, "https://github.com/parroty/http_server.git", "922d10420836a51289ed04f0bb5022bf695da1ab", []}, "httpotion": {:hex, :httpotion, "3.2.0", "007c81c3a15b4860c893dea858eab2ce859a260b47071e85dcf9611a4226324e", [:mix], [{:ibrowse, "4.4.0", [hex: :ibrowse, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}], "hexpm", "726b3fdfc47d7f15302dac5f6a4152a5002fe8230dee8bdd65b8d154b573580b"}, "ibrowse": {:hex, :ibrowse, "4.4.0", "2d923325efe0d2cb09b9c6a047b2835a5eda69d8a47ed6ff8bc03628b764e991", [:rebar3], [], "hexpm", "6a8e5988872086f0506bef68311493551ac5beae7c06ba2a00d5e9f97a60f1c2"}, + "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, "makeup_erlang": {:hex, :makeup_erlang, "0.1.3", "d684f4bac8690e70b06eb52dad65d26de2eefa44cd19d64a8095e1417df7c8fd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "b78dc853d2e670ff6390b605d807263bf606da3c82be37f9d7f68635bd886fc9"}, @@ -20,15 +17,9 @@ "plug": {:hex, :plug, "1.15.2", "94cf1fa375526f30ff8770837cb804798e0045fd97185f0bb9e5fcd858c792a3", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "02731fa0c2dcb03d8d21a1d941bdbbe99c2946c0db098eee31008e04c6283615"}, "plug_cowboy": {:hex, :plug_cowboy, "2.6.1", "9a3bbfceeb65eff5f39dab529e5cd79137ac36e913c02067dba3963a26efe9b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "de36e1a21f451a18b790f37765db198075c25875c64834bcc82d90b309eb6613"}, "plug_crypto": {:hex, :plug_crypto, "2.0.0", "77515cc10af06645abbfb5e6ad7a3e9714f805ae118fa1a70205f80d2d70fe73", [:mix], [], "hexpm", "53695bae57cc4e54566d993eb01074e4d894b65a3766f1c43e2c61a1b0f45ea9"}, - "poison": {:hex, :poison, "4.0.1", "bcb755a16fac91cad79bfe9fc3585bb07b9331e50cfe3420a24bcc2d735709ae", [:mix], [], "hexpm", "ba8836feea4b394bb718a161fc59a288fe0109b5006d6bdf97b6badfcf6f0f25"}, "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, "secure_random": {:hex, :secure_random, "0.5.1", "c5532b37c89d175c328f5196a0c2a5680b15ebce3e654da37129a9fe40ebf51b", [:mix], [], "hexpm", "1b9754f15e3940a143baafd19da12293f100044df69ea12db5d72878312ae6ab"}, - "socket": {:hex, :socket, "0.3.13", "98a2ab20ce17f95fb512c5cadddba32b57273e0d2dba2d2e5f976c5969d0c632", [:mix], [], "hexpm", "f82ea9833ef49dde272e6568ab8aac657a636acb4cf44a7de8a935acb8957c2e"}, "socket2": {:hex, :socket2, "2.1.1", "850a8e90963358e1e9ba53efe4e91272f2b39489e3f2a82d291010b20539e8c3", [:mix], [{:certifi, "~> 2.12", [hex: :certifi, repo: "hexpm", optional: false]}], "hexpm", "37face03ee9c98e100084fa202a5291c2200a9b4e661dad7b163c9efdb0a2865"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, - "test_server": {:hex, :test_server, "0.1.14", "c3cdf0b6c1be691ae50a14ee3ea4bd026250c321c2012f5dfaed336d8702a562", [:mix], [{:bandit, ">= 0.7.6", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, ">= 2.0.0", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:x509, "~> 0.6", [hex: :x509, repo: "hexpm", optional: false]}], "hexpm", "5af2f98a22765ff3cc66f09d20c88b754586cf0c45cf0d2e0068e2a47f5041a7"}, - "thousand_island": {:hex, :thousand_island, "1.2.0", "4f548ae771ab5f96bc7e199f9824c0c2ce6d365f8c93f5f64dbbb33988e484bf", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "521671fea179672addb6af46455fc2a77be1edda4c0ed351633e0ef37a4b3584"}, - "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, - "x509": {:hex, :x509, "0.8.8", "aaf5e58b19a36a8e2c5c5cff0ad30f64eef5d9225f0fd98fb07912ee23f7aba3", [:mix], [], "hexpm", "ccc3bff61406e5bb6a63f06d549f3dba3a1bbb456d84517efaaa210d8a33750f"}, } diff --git a/test/nsq/test/auth_server.ex b/test/nsq/test/auth_server.ex index 2696000..37bcff0 100644 --- a/test/nsq/test/auth_server.ex +++ b/test/nsq/test/auth_server.ex @@ -2,8 +2,8 @@ defmodule NSQ.Test.AuthServer do defmodule NSQ.Test.Router do use Plug.Router - plug :match - plug :dispatch + plug(:match) + plug(:dispatch) get "/auth" do json_response = %{ @@ -19,9 +19,7 @@ defmodule NSQ.Test.AuthServer do ] } - - - send_resp(conn, 200, Poison.encode!(json_response)) + send_resp(conn, 200, Jason.encode!(json_response)) end match _ do @@ -31,6 +29,6 @@ defmodule NSQ.Test.AuthServer do def start(port) do [:telemetry] |> Enum.each(&Application.start/1) - Plug.Cowboy.http NSQ.Test.Router, [], port: port + Plug.Cowboy.http(NSQ.Test.Router, [], port: port) end end