Error when using ActiveMQ Artemis (MQTT protocol) #2

Open
sleipnir opened this issue Oct 17, 2020 · 4 comments

@sleipnir

Hello, I am trying to use this library with ActiveMQ Artemis. This broker supports several protocols, including [MQTT 3.1.1](https://activemq.apache.org/components/artemis/documentation/latest/mqtt.html). However, when I try to connect, I get the error below:

2020-10-16 23:01:29.861 [nonode@nohost]:[pid=<0.299.0> file=gen_server.erl ]:[error]:GenServer {Tortoise.Registry, {Tortoise.Connection, "elixir_client_id_3"}} terminating
** (ArgumentError) argument error
    (kernel 7.0) gen_tcp.erl:170: :gen_tcp.connect/4
    (tortoise 0.9.4) lib/tortoise/connection.ex:583: Tortoise.Connection.do_connect/2
    (tortoise 0.9.4) lib/tortoise/connection.ex:385: Tortoise.Connection.handle_info/2
    (stdlib 3.13) gen_server.erl:680: :gen_server.try_dispatch/4
    (stdlib 3.13) gen_server.erl:756: :gen_server.handle_msg/6
    (stdlib 3.13) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: :connect
State: %Tortoise.Connection{backoff: %Tortoise.Connection.Backoff{max_interval: 30000, min_interval: 100, value: nil}, client_id: "elixir_client_id_3", connect: %Tortoise.Package.Connect{__META__: %Tortoise.Package.Meta{flags: 0, opcode: 1}, clean_session: true, client_id: "elixir_client_id_3", keep_alive: 60, password: nil, protocol: "MQTT", protocol_version: 4, user_name: nil, will: nil}, keep_alive: nil, opts: [client_id: "elixir_client_id_3", handler: {OffBroadway.MQTT.Handler, [client_id: "elixir_client_id_3", queue: {:via, Registry, {OffBroadway.MQTT.QueueRegistry, "tags.00666"}}, sub_ack: nil, meta: %{client_id: "elixir_client_id_3", host: "localhost", password: "******", port: 1883, qos: 0, topic_filter: "tags.00666", username: "admin"}, config: %OffBroadway.MQTT.Config{acknowledger: OffBroadway.MQTT.Acknowledger, client: OffBroadway.MQTT.Client, client_id_prefix: "elixir_client_id", dequeue_interval: 5000, handler: OffBroadway.MQTT.Handler, producer: OffBroadway.MQTT.Producer, queue: OffBroadway.MQTT.Queue, queue_registry: OffBroadway.MQTT.QueueRegistry, queue_supervisor: OffBroadway.MQTT.QueueSupervisor, server: {:tcp, [host: "localhost", port: 1883, username: "admin", password: "admin"]}, telemetry_prefix: :off_broadway_mqtt}]}], server: %Tortoise.Transport{host: 'localhost', opts: [:binary, {:packet, :raw}, {:active, false}, {:username, "admin"}, {:password, "admin"}], port: 1883, type: Tortoise.Transport.Tcp}, status: :down, subscriptions: %Tortoise.Package.Subscribe{__META__: %Tortoise.Package.Meta{flags: 2, opcode: 8}, identifier: nil, topics: [{"tags.00666", 0}]}}
2020-10-16 23:01:29.880 [nonode@nohost]:[pid=<0.309.0> file=gen_server.erl ]:[error]:GenServer {Tortoise.Registry, {Tortoise.Connection, "elixir_client_id_3"}} terminating
** (ArgumentError) argument error
    (kernel 7.0) gen_tcp.erl:170: :gen_tcp.connect/4
    (tortoise 0.9.4) lib/tortoise/connection.ex:583: Tortoise.Connection.do_connect/2
    (tortoise 0.9.4) lib/tortoise/connection.ex:385: Tortoise.Connection.handle_info/2
    (stdlib 3.13) gen_server.erl:680: :gen_server.try_dispatch/4
    (stdlib 3.13) gen_server.erl:756: :gen_server.handle_msg/6
    (stdlib 3.13) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: :connect
State: %Tortoise.Connection{backoff: %Tortoise.Connection.Backoff{max_interval: 30000, min_interval: 100, value: nil}, client_id: "elixir_client_id_3", connect: %Tortoise.Package.Connect{__META__: %Tortoise.Package.Meta{flags: 0, opcode: 1}, clean_session: true, client_id: "elixir_client_id_3", keep_alive: 60, password: nil, protocol: "MQTT", protocol_version: 4, user_name: nil, will: nil}, keep_alive: nil, opts: [client_id: "elixir_client_id_3", handler: {OffBroadway.MQTT.Handler, [client_id: "elixir_client_id_3", queue: {:via, Registry, {OffBroadway.MQTT.QueueRegistry, "tags.00666"}}, sub_ack: nil, meta: %{client_id: "elixir_client_id_3", host: "localhost", password: "******", port: 1883, qos: 0, topic_filter: "tags.00666", username: "admin"}, config: %OffBroadway.MQTT.Config{acknowledger: OffBroadway.MQTT.Acknowledger, client: OffBroadway.MQTT.Client, client_id_prefix: "elixir_client_id", dequeue_interval: 5000, handler: OffBroadway.MQTT.Handler, producer: OffBroadway.MQTT.Producer, queue: OffBroadway.MQTT.Queue, queue_registry: OffBroadway.MQTT.QueueRegistry, queue_supervisor: OffBroadway.MQTT.QueueSupervisor, server: {:tcp, [host: "localhost", port: 1883, username: "admin", password: "admin"]}, telemetry_prefix: :off_broadway_mqtt}]}], server: %Tortoise.Transport{host: 'localhost', opts: [:binary, {:packet, :raw}, {:active, false}, {:username, "admin"}, {:password, "admin"}], port: 1883, type: Tortoise.Transport.Tcp}, status: :down, subscriptions: %Tortoise.Package.Subscribe{__META__: %Tortoise.Package.Meta{flags: 2, opcode: 8}, identifier: nil, topics: [{"tags.00666", 0}]}}
{:ok, #PID<0.294.0>}
iex(4)> 2020-10-16 23:01:29.882 [nonode@nohost]:[pid=<0.310.0> file=gen_server.erl ]:[error]:GenServer {Tortoise.Registry, {Tortoise.Connection, "elixir_client_id_3"}} terminating
** (ArgumentError) argument error
    (kernel 7.0) gen_tcp.erl:170: :gen_tcp.connect/4
    (tortoise 0.9.4) lib/tortoise/connection.ex:583: Tortoise.Connection.do_connect/2
    (tortoise 0.9.4) lib/tortoise/connection.ex:385: Tortoise.Connection.handle_info/2
    (stdlib 3.13) gen_server.erl:680: :gen_server.try_dispatch/4
    (stdlib 3.13) gen_server.erl:756: :gen_server.handle_msg/6
    (stdlib 3.13) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: :connect
State: %Tortoise.Connection{backoff: %Tortoise.Connection.Backoff{max_interval: 30000, min_interval: 100, value: nil}, client_id: "elixir_client_id_3", connect: %Tortoise.Package.Connect{__META__: %Tortoise.Package.Meta{flags: 0, opcode: 1}, clean_session: true, client_id: "elixir_client_id_3", keep_alive: 60, password: nil, protocol: "MQTT", protocol_version: 4, user_name: nil, will: nil}, keep_alive: nil, opts: [client_id: "elixir_client_id_3", handler: {OffBroadway.MQTT.Handler, [client_id: "elixir_client_id_3", queue: {:via, Registry, {OffBroadway.MQTT.QueueRegistry, "tags.00666"}}, sub_ack: nil, meta: %{client_id: "elixir_client_id_3", host: "localhost", password: "******", port: 1883, qos: 0, topic_filter: "tags.00666", username: "admin"}, config: %OffBroadway.MQTT.Config{acknowledger: OffBroadway.MQTT.Acknowledger, client: OffBroadway.MQTT.Client, client_id_prefix: "elixir_client_id", dequeue_interval: 5000, handler: OffBroadway.MQTT.Handler, producer: OffBroadway.MQTT.Producer, queue: OffBroadway.MQTT.Queue, queue_registry: OffBroadway.MQTT.QueueRegistry, queue_supervisor: OffBroadway.MQTT.QueueSupervisor, server: {:tcp, [host: "localhost", port: 1883, username: "admin", password: "admin"]}, telemetry_prefix: :off_broadway_mqtt}]}], server: %Tortoise.Transport{host: 'localhost', opts: [:binary, {:packet, :raw}, {:active, false}, {:username, "admin"}, {:password, "admin"}], port: 1883, type: Tortoise.Transport.Tcp}, status: :down, subscriptions: %Tortoise.Package.Subscribe{__META__: %Tortoise.Package.Meta{flags: 2, opcode: 8}, identifier: nil, topics: [{"tags.00666", 0}]}}
2020-10-16 23:01:29.883 [nonode@nohost]:[pid=<0.313.0> file=gen_server.erl ]:[error]:GenServer {Tortoise.Registry, {Tortoise.Connection, "elixir_client_id_3"}} terminating
** (ArgumentError) argument error
    (kernel 7.0) gen_tcp.erl:170: :gen_tcp.connect/4
    (tortoise 0.9.4) lib/tortoise/connection.ex:583: Tortoise.Connection.do_connect/2
    (tortoise 0.9.4) lib/tortoise/connection.ex:385: Tortoise.Connection.handle_info/2
    (stdlib 3.13) gen_server.erl:680: :gen_server.try_dispatch/4
    (stdlib 3.13) gen_server.erl:756: :gen_server.handle_msg/6
    (stdlib 3.13) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message: :connect
State: %Tortoise.Connection{backoff: %Tortoise.Connection.Backoff{max_interval: 30000, min_interval: 100, value: nil}, client_id: "elixir_client_id_3", connect: %Tortoise.Package.Connect{__META__: %Tortoise.Package.Meta{flags: 0, opcode: 1}, clean_session: true, client_id: "elixir_client_id_3", keep_alive: 60, password: nil, protocol: "MQTT", protocol_version: 4, user_name: nil, will: nil}, keep_alive: nil, opts: [client_id: "elixir_client_id_3", handler: {OffBroadway.MQTT.Handler, [client_id: "elixir_client_id_3", queue: {:via, Registry, {OffBroadway.MQTT.QueueRegistry, "tags.00666"}}, sub_ack: nil, meta: %{client_id: "elixir_client_id_3", host: "localhost", password: "******", port: 1883, qos: 0, topic_filter: "tags.00666", username: "admin"}, config: %OffBroadway.MQTT.Config{acknowledger: OffBroadway.MQTT.Acknowledger, client: OffBroadway.MQTT.Client, client_id_prefix: "elixir_client_id", dequeue_interval: 5000, handler: OffBroadway.MQTT.Handler, producer: OffBroadway.MQTT.Producer, queue: OffBroadway.MQTT.Queue, queue_registry: OffBroadway.MQTT.QueueRegistry, queue_supervisor: OffBroadway.MQTT.QueueSupervisor, server: {:tcp, [host: "localhost", port: 1883, username: "admin", password: "admin"]}, telemetry_prefix: :off_broadway_mqtt}]}], server: %Tortoise.Transport{host: 'localhost', opts: [:binary, {:packet, :raw}, {:active, false}, {:username, "admin"}, {:password, "admin"}], port: 1883, type: Tortoise.Transport.Tcp}, status: :down, subscriptions: %Tortoise.Package.Subscribe{__META__: %Tortoise.Package.Meta{flags: 2, opcode: 8}, identifier: nil, topics: [{"tags.00666", 0}]}}

With Eclipse Mosquitto the connection works, but with ActiveMQ Artemis and MQTT enabled it does not.

Below are excerpts of the code used:

defmodule Mercury.Flow do
  use OffBroadway.MQTT

  defmodule FlowException do
    defexception ack: :ignore, message: nil

    def message(e) do
      "message is probably coming from a nincompoop: " <> e.message
    end
  end

  def start_link(config, topic) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producers: [
        default: [
          module: {Producer, [config, subscription: {topic, 0}]},
          stages: 1
        ]
      ],
      processors: [default: [stages: 1]],
      batchers: [
        default: [stages: 1, batch_size: 10]
      ]
    )
  end

  @impl true
  def handle_message(_processor_name, message, _context) do
    message
    |> Message.update_data(&process_data/1)
  rescue
    e ->
      Message.failed(message, e)
  end

  defp process_data(%OffBroadway.MQTT.Data{acc: msg} = data) do
    msg
    |> String.downcase()
    |> String.contains?("great again")
    |> case do
      true -> raise FlowException, "contains \"great again\""
      false -> data
    end
  end

  @impl true
  def handle_batch(_, messages, _batch_info, _context) do
    # ...
    messages
  end
end

config =
  OffBroadway.MQTT.Config.new(
    client_id_prefix: "elixir_client_id",
    server_opts: [
      host: "localhost",
      port: "1883",
      transport: :tcp,
      username: "admin",
      password: "admin"
    ]
  )

Mercury.Flow.start_link(config, "tags.00666")

Do you have any idea what may be happening?

@sleipnir

I found the reason: authentication does not seem to work. When I run ActiveMQ Artemis with security disabled, the connection is established as expected.

docker run -it --rm -e DISABLE_SECURITY=true -p 8161:8161 -p 61616:61616 -p 61613:61613 -p 1883:1883 vromero/activemq-artemis:latest-alpine
[root@sleipnir mercury]# iex -S mix
Erlang/OTP 23 [erts-11.0.2] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe]

Interactive Elixir (1.11.0-dev) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> config =
...(1)>   OffBroadway.MQTT.Config.new(
...(1)>     client_id_prefix: "elixir_client_id",
...(1)>     server_opts: [
...(1)>       host: "localhost",
...(1)>       port: "1883",
...(1)>       transport: :tcp
...(1)>     ]
...(1)>   )
%OffBroadway.MQTT.Config{
  acknowledger: OffBroadway.MQTT.Acknowledger,
  client: OffBroadway.MQTT.Client,
  client_id_prefix: "elixir_client_id",
  dequeue_interval: 5000,
  handler: OffBroadway.MQTT.Handler,
  producer: OffBroadway.MQTT.Producer,
  queue: OffBroadway.MQTT.Queue,
  queue_registry: OffBroadway.MQTT.QueueRegistry,
  queue_supervisor: OffBroadway.MQTT.QueueSupervisor,
  server: {:tcp, [host: "localhost", port: 1883]},
  telemetry_prefix: :off_broadway_mqtt
}
iex(2)> Mercury.Flow.start_link(config, "tags.00666")
{:ok, #PID<0.291.0>}
iex(3)> 2020-10-16 23:35:29.397 [nonode@nohost]:[pid=<0.311.0> file=lib/off_broadway_mqtt/handler.ex ]:[debug]:initializing client
2020-10-16 23:35:29.397 [nonode@nohost]:[pid=<0.311.0> file=lib/off_broadway_mqtt/handler.ex ]:[debug]:client attempting to connect
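
A possible explanation, based only on the State dump in the first comment: the credentials end up in the transport options (`opts: [:binary, {:packet, :raw}, {:active, false}, {:username, "admin"}, {:password, "admin"}]`), while the connect package still has `user_name: nil` and `password: nil`. `:gen_tcp.connect/4` does not know `:username` or `:password` as socket options, which would be consistent with the argument error. The sketch below shows one way the two kinds of options could be separated before they reach Tortoise. `AuthOptsSketch` and `split/1` are hypothetical names and are not part of off_broadway_mqtt or Tortoise.

```elixir
defmodule AuthOptsSketch do
  # Hypothetical helper for illustration only; not part of any library.
  # Keys that describe MQTT credentials rather than socket settings.
  @auth_keys [:username, :password]

  # Splits server options into {transport_opts, connection_opts}.
  # transport_opts keeps only socket-level settings such as host and port;
  # connection_opts carries the credentials under the names used by the
  # connect package in the State dump (user_name, password).
  def split(server_opts) do
    {auth, transport_opts} = Keyword.split(server_opts, @auth_keys)

    connection_opts =
      [user_name: auth[:username], password: auth[:password]]
      |> Enum.reject(fn {_key, value} -> is_nil(value) end)

    {transport_opts, connection_opts}
  end
end

{transport_opts, connection_opts} =
  AuthOptsSketch.split(host: "localhost", port: 1883, username: "admin", password: "admin")

IO.inspect(transport_opts, label: "transport")
IO.inspect(connection_opts, label: "connection")
```

With a split like this, only host and port would be handed to the TCP transport, and the credentials would travel in the MQTT CONNECT package instead.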

@sleipnir

Hello

@sleipnir

ping

@sleipnir

After investigating the Tortoise library code, I noticed that:

  1. This library does not correctly initialize the connection when authentication is required.
  2. I could not find a call to Tortoise.Connection.subscribe anywhere in this library. In my tests, the subscription was only created on the ActiveMQ server after an explicit call to this function. I still need to investigate further to understand why.
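
To check the broker independently of this library, both points can be tried against Tortoise directly. This is a minimal sketch, assuming Tortoise 0.9.x as in the stack trace, the client id, host, port, topic and credentials from the report above, and the logger handler that ships with Tortoise. `DirectTortoiseSketch` is a hypothetical module name. Only `connection_opts/0` is exercised here; `connect/0` and `subscribe/1` need the `tortoise` dependency and a running broker.

```elixir
defmodule DirectTortoiseSketch do
  # Hypothetical module for illustration; values are copied from the report.
  @client_id "elixir_client_id_3"
  @topic "tags.00666"

  # Credentials are passed as connection-level options (user_name, password),
  # not inside the transport tuple, so the TCP socket only sees host and port.
  def connection_opts do
    [
      client_id: @client_id,
      handler: {Tortoise.Handler.Logger, []},
      server: {Tortoise.Transport.Tcp, host: ~c"localhost", port: 1883},
      user_name: "admin",
      password: "admin",
      subscriptions: [{@topic, 0}]
    ]
  end

  # Starts a supervised Tortoise connection with the options above.
  def connect, do: Tortoise.Supervisor.start_child(connection_opts())

  # Explicit subscription at QoS 0, as described in point 2.
  def subscribe(client_id \\ @client_id) do
    Tortoise.Connection.subscribe(client_id, [{@topic, 0}])
  end
end
```

If the connection succeeds this way with security enabled on Artemis, that would suggest the broker's MQTT authentication is fine and the problem lies in how the options are forwarded.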
