A HiveMQ extension that allows to query retained messages via HTTP instead of using MQTT subscriptions.
HiveMQ is needed to run and test the extension. An evaluation version is available here.
The extension was developed using IntelliJ IDEA. The repo contains the project file: hivemq-retained-message-query-extension.iml
.
It can also be built on the command line using Maven:
$ mvn package
Maven is configured to use the HiveMQ Maven Plugin that will automatically start the plugin when using the RunWithHiveMQ
profile:
$ mvn package -PRunWithHiveMQ
There is a test suite in test/test.js
. Tests are written in JavaScript and run using mocha on Node.js:
$ npm install # install test dependencies
$ npm test # run tests against localhost
To run the tests against a different HiveMQ instance, you can set the BROKER
environment variable:
BROKER=broker.example.com npm test
The extension provides an HTTP API to query retained messages without using the MQTT protocol. The API uses JSON to define the query and represent the results.
Retained message payloads can be queried by sending an HTTP POST request to the /query
endpoint. Using jQuery, you can send a query like this:
$.post(httpBrokerUri + "/query", JSON.stringify(query));
In the request body we pass a JSON-encoded query object containing the details of our query:
{
"topic": "path/of/the/topic",
"depth": 0,
"flatten": false
}
topic
(String) is mandatory. It specifies the path of the topic we want to query.depth
(Number) is optional, defaults to0
. It specifies how many levels of child topics should be included in the response.flatten
(Boolean) is optional, defaults tofalse
. When set totrue
, the query will return an array of result objects instead of a single result objects. The array contains the result object for the queried topic as well as the result objects for all of its children, as far as they are included by thedepth
parameter.
The result object that we get back from the broker is also a JSON object:
{
"topic": "path/of/the/topic",
"payload": "23",
"children": [
{
"topic": "path/of/child/topic",
"payload": "\"foo\""
}
]
}
topic
(String) is always present. This is the path of the topic we queried.payload
(String) is optional. It will be included if there is a retained payload for the given topic. The payload is always a string. For topics following our convention to use JSON in payloads, that means we have toJSON.parse()
the payload on the client side.children
(Array) is optional. It will be included when there are child topics and thedepth
parameter in the query is set to include them, unless theflatten
parameter is set. The items of the array are result objects themselves, includingtopic
, an optionalpayload
and optionalchildren
.
The query API has limited support for wildcard queries: The topic in the query object may contain one or more +
wildcards. Instead of a single result object, the broker will return an array of result objects.
While the #
wildcard is not supported, a similar result can be achieved using the depth
parameter.
It is also possible to query multiple topics at once. When the request body contains an array of query objects instead of a single query object, the broker will return an array with a result object for each query.
Topic | Payload |
---|---|
foo/bar1 |
"hello" |
foo/bar1/baz1 |
["night", "day"] |
foo/bar2 |
13 |
foo/bar2/baz1 |
true |
foo/bar2/baz2 |
false |
Let's assume we have the above retained messages published to the broker. We can now issue some queries:
// Query
{ "topic": "foo/bar1" }
// Result
{ "topic": "foo/bar1", "payload": "\"hello\"" }
// Query
{ "topic": "foo", "depth": 1 }
// Result
{
"topic": "foo",
"children": [
{ "topic": "foo/bar1", "payload": "\"hello\"" },
{ "topic": "foo/bar2", "payload": "13" }
]
}
// Query
{ "topic": "foo", "depth": 2 }
// Result
{
"topic": "foo",
"children": [
{
"topic": "foo/bar1",
"payload": "\"hello\"",
"children": [
{ "topic": "foo/bar1/baz1", "payload": "[\"night\", \"day\"]" }
]
},
{
"topic": "foo/bar2",
"payload": "13",
"children": [
{ "topic": "foo/bar2/baz1", "payload": "true" },
{ "topic": "foo/bar2/baz2", "payload": "false" }
]
}
]
}
// Query
{ "topic": "foo", "depth": 2, "flatten": true }
// Result
[
{ "topic": "foo" },
{ "topic": "foo/bar1", "payload": "\"hello\"" },
{ "topic": "foo/bar1/baz1", "payload": "[\"night\", \"day\"]" }
{ "topic": "foo/bar2", "payload": "13" },
{ "topic": "foo/bar2/baz1", "payload": "true" },
{ "topic": "foo/bar2/baz2", "payload": "false" }
]
// Query
[{ "topic": "foo/bar1" }, { "topic": "foo/bar2", "depth": 1 }]
// Result
[
{
"topic": "foo/bar1",
"payload": "\"hello\""
},
{
"topic": "foo/bar2",
"payload": "13",
"children": [
{ "topic": "foo/bar2/baz1", "payload": "true" },
{ "topic": "foo/bar2/baz2", "payload": "false" }
]
}
]
// Query
{ "topic": "foo/+/baz1" }
// Result
[
{ "topic": "foo/bar1/baz1", "payload": "[\"night\", \"day\"]" },
{ "topic": "foo/bar2/baz1", "payload": "true" }
]
It can be necessary to send CORS headers along with the response e.g. if there is no upstream server which handles it. The internal HTTP Server can be configured to provide these CORS headers by adding the following section to conf/config.xml
:
<retained-message-query-extension>
<cors-header>true</cors-header>
</retained-message-query-extension>
Or by setting the environment variable QUERY_PLUGIN_CORS
to true
.
The default is false
to avoid duplicate CORS header errors.
The default port for the HTTP API is 8080
. It can be changed by setting the environment variable QUERY_PLUGIN_PORT
to the desired port.
The extension can be configured to disconnect all clients when the extension is initialized. This is needed on startup when the extension is used with the HiveMQ CE broker. The extension will not add a PublishInboundInterceptor to clients connected before the extension is initialized. To disconnect all clients on startup, set the environment variable QUERY_PLUGIN_DISCONNECT_CLIENTS
to true
.