Skip to content

Commit

Permalink
Specify properties in pub command (#101)
Browse files Browse the repository at this point in the history
new pub option --property
  • Loading branch information
jandelgado authored Sep 5, 2024
1 parent b125757 commit 2244b55
Show file tree
Hide file tree
Showing 14 changed files with 393 additions and 63 deletions.
9 changes: 6 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog for rabtap

## v1.42 (2024-09-04)

* new: `--property KEY=VALUE` option to specify message properties like e.g.
`Expiration`, `ContentType` etc. in the `pub` command. Run
`rabtap help properties` to list the available message properties.

## v1.41 (2024-08-27)

* new: `--filter=FILTER` option for `tap` and `sub` commands to filter output
Expand Down Expand Up @@ -263,6 +269,3 @@
### Changed

* minor changes to output of `info` command (i.e. some values are now quoted)



76 changes: 46 additions & 30 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,25 +140,25 @@ compile from source.
## Usage

```
rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap
Usage:
rabtap -h|--help
rabtap info [--api=APIURI] [--consumers] [--stats] [--filter=EXPR] [--omit-empty]
[--show-default] [--mode=MODE] [--format=FORMAT] [-kncv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap tap EXCHANGES [--uri=URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM]
[--idle-timeout=DURATION] [--filter=EXPR] [-jkncsv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
[--idle-timeout=DURATION] [--filter=EXPR] [-jkncsv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap (tap --uri=URI EXCHANGES)... [--saveto=DIR] [--format=FORMAT] [--limit=NUM]
[--idle-timeout=DURATION] [--filter=EXPR] [-jkncsv]
[--idle-timeout=DURATION] [--filter=EXPR] [-jkncsv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM]
[--offset=OFFSET] [--args=KV]... [(--reject [--requeue])] [-jkcsvn]
[--filter=EXPR] [--idle-timeout=DURATION]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT]
[--routingkey=KEY | (--header=KV)...]
[--routingkey=KEY | (--header=KV)...] [ (--property=KV)... ]
[--confirms] [--mandatory] [--delay=DELAY | --speed=FACTOR] [-jkv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap exchange create EXCHANGE [--uri=URI] [--type=TYPE] [--args=KV]... [-kv]
Expand All @@ -185,6 +185,7 @@ Usage:
rabtap conn close CONNECTION [--api=APIURI] [--reason=REASON] [-kv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap --version
rabtap (-h | --help | help) [properties]
Arguments and options:
EXCHANGES comma-separated list of exchanges and optional binding keys,
Expand All @@ -209,7 +210,7 @@ Arguments and options:
--consumers include consumers and connections in output of info command.
--delay=DELAY Time to wait between sending messages during publish.
If not set then messages will be delayed as recorded.
The value must be suffixed with a time unit, e.g. ms, s etc.
The value must be suffixed with a time unit, e.g. ms, s etc.
-d, --durable create durable exchange/queue.
--exchange=EXCHANGE Optional exchange to publish to. If omitted, exchange will
be taken from message being published (see JSON message format).
Expand All @@ -219,12 +220,12 @@ Arguments and options:
Valid options are: "raw", "json", "json-nopp". Default: raw
* for info command: controls generated output format. Valid
options are: "text", "dot". Default: text
-h, --help print this help.
-h, --help print this help
--header=KV A key value pair in the form of "key=value" used as a
routing- or binding-key. Can occur multiple times.
--idle-timeout=DURATION end reading messages when no new message was received
for the given duration. The value must be suffixed with
a time unit, e.g. ms, s etc.
a time unit, e.g. ms, s etc.
-j, --json deprecated. Use "--format json" instead.
-k, --insecure allow insecure TLS connections (no certificate check).
--lazy create a lazy queue.
Expand All @@ -237,21 +238,23 @@ Arguments and options:
--omit-empty don't show echanges without bindings in info command.
--offset=OFFSET Offset when reading from a stream. Can be 'first', 'last',
'next', a duration like '10m', a RFC3339-Timestamp or
an integer index value. Basically it is an alias for
'--args=x-stream-offset=OFFSET'.
an integer index value. Basically it is an alias for
'--args=x-stream-offset=OFFSET'.
--property=KV A key value pair in the form of "key=value" to specify
message properties like e.g. the content-type.
--queue-type=TYPE type of queue [default: classic].
--reason=REASON reason why the connection was closed [default: closed by rabtap].
--reject Reject messages. Default behaviour is to acknowledge messages.
--requeue Instruct broker to requeue rejected message
-r, --routingkey=KEY routing key to use in publish mode. If omitted, routing key
will be taken from message being published (see JSON
message format).
message format).
--saveto=DIR also save messages and metadata to DIR.
--show-default include default exchange in output info command.
-s, --silent suppress message output to stdout.
--speed=FACTOR Speed factor to use during publish [default: 1.0].
--stats include statistics in output of info command.
-t, --type=TYPE type of exchange [default: fanout].
-t, --type=TYPE type of exchange [default: fanout].
--tls-cert-file=CERTFILE A Cert file to use for client authentication.
--tls-key-file=KEYFILE A Key file to use for client authentication.
--tls-ca-file=CAFILE A CA Cert file to use with TLS.
Expand All @@ -270,14 +273,17 @@ Examples:
export RABTAP_AMQPURI=amqp://guest:guest@localhost:5672/
rabtap queue create JDQ
rabtap queue bind JDQ to amq.topic --bindingkey=key
echo "Hello" | rabtap pub --exchange amq.topic --routingkey "key"
echo "Hello"| gzip | rabtap pub --exchange amq.topic --routingkey "key" --property ContentType=gzip
rabtap sub JDQ
# print only messages that have ".Name == 'JAN'" in their JSON payload
rabtap sub JDQ --filter="let b=fromJSON(r.toStr(r.body(r.msg))); b.Name == 'JAN'"
rabtap queue rm JDQ
# use RABTAP_APIURI environment variable to specify mgmt api uri instead of --api
export RABTAP_APIURI=http://guest:guest@localhost:15672/api
rabtap info
rabtap info --filter "binding.Source == 'amq.topic'" --omit-empty
rabtap info --filter "r.binding.Source == 'amq.topic'" --omit-empty
rabtap conn close "172.17.0.1:40874 -> 172.17.0.2:5672"
# use RABTAP_TLS_CERTFILE | RABTAP_TLS_KEYFILE | RABTAP_TLS_CAFILE environments variables
Expand Down Expand Up @@ -592,28 +598,32 @@ Examples:

#### Publish messages

The `pub` command is used to publish messages to an exchange. The messages to
be published are either read from a file, or from a directory which contains
previously recorded messages (e.g. using the `--saveto` option of the `tap`
command). The general form of the `pub` command is:
The `pub` command is used to publish messages to an exchange. The general
form of the `pub` command is:

```text
rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--format=FORMAT]
[--routingkey=KEY | (--header=HEADERKV)...] [--confirms] [--mandatory]
[--delay=DELAY | --speed=FACTOR] [-jkv]
[--routingkey=KEY | (--header=KV)...] [ (--property=KV)... ]
[--confirms] [--mandatory] [--delay=DELAY | --speed=FACTOR] [-jkv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
```

The `SOURCE` parameter specifies the messages to be published. These are either
read from a file, or from a directory which contains previously recorded
messages (e.g. using the `--saveto` option of the `tap` command). If `SOURCE`
is omitted, `stdin` is used.

Message routing is either specified with a routing key and the `--routingkey`
option or, when header based routing should be used, by specifying the headers
with the `--header` option. Each header is specified in the form `KEY=VALUE`.
Multiple headers can be specified by specifying multiple `--header` options.

Messages can be published either in raw format, in which they are sent as-is,
or in [JSON-format, as described here](#json-message-format), which includes
message metadata and the body in a single JSON document. When multiple messages
are published with metadata, rabtap will calculate the time elapsed of
consecutive recorded messages using the metadata, and delay publishing
accordingly.
or in [JSON-format, as described here](#json-message-format) (`--format=json`),
which includes message metadata and the body in a single JSON document. When
multiple messages are published with metadata, rabtap will calculate the time
elapsed of consecutive recorded messages using the metadata, and delay
publishing accordingly.

To set the publishing delay to a fix value, use the `--delay` option. To
publish without delays, use `--delay=0s`. To modify publishing speed use the
Expand All @@ -630,26 +640,32 @@ When the `--mandatory` option is set, rabtap publishes message in mandatory
mode. If set and a message can not be delivered to a queue, the server returns
the message and rabtap will log an error.

Use the `--property` option to set message properties like `ContentType` etc.
Multiple properties can be specified by specifying multiple `--property` options.
Run `rabtap help properties` to see the list of available properties.

Examples:

* `$ echo hello | rabtap pub --exchange amq.fanout` - publish "hello" to
* `echo hello | rabtap pub --exchange amq.fanout` - publish "hello" to
exchange amqp.fanout
* `echo "hello" | rabtap pub --exchange amq.header --header KEY=VAL --header X=Y` -
publish `hello` to exchange `amq.header` and set given message headers
* `$ rabtap pub messages.json --format=json` - messages are read from file
* `rabtap pub messages.json --format=json` - messages are read from file
`messages.json` in [rabtap JSON format](#json-message-format). Target
exchange and routing keys are read from the messages meta data. The
`messages.json` file can contain multiple JSON documents as it is treated as
a JSON stream. Rabtap will honor the `XRabtapReceived` timestamps of the
messages and by default will delay the messages as they were recorded. This
behaviour can be overridden by the `--delay` and `--speed` options
* `$ rabtap pub --exchange amq.direct -r myKey --format=json messages.json --delay=0s` - as
* `rabtap pub --exchange amq.direct -r myKey --format=json messages.json --delay=0s` - as
before, but publish messages always to exchange `amq.direct` with routing key
`myKey` and without any delays
* `$ rabtap pub --exchange amq.direct -r myKey --format=raw somedir --delay=0s` - as
* `rabtap pub --exchange amq.direct -r myKey --format=raw somedir --delay=0s` - as
before, but assuming that `somedir` is a directory, the messages are read
from message files previously recorded to this directory and replayed in the
order they were recorded
* `echo hello | rabtap pub --exchange amq.fanout --property Expiration=1000` -
publish "hello" to exchange amqp.fanout and set the message expiration to 1000ms.

#### Poor mans shovel

Expand Down Expand Up @@ -882,7 +898,7 @@ broker to be used, e.g. `http://guest:guest@localhost:15672/api`).
all queues with at least one consumer
* `rabtap info --mode=byConnection --filter="r.channel.PrefetchCount > 1` - list
all connection with channel that have a prefetch-count > 1
* `rabtap info --mode=byConnection --filter="r.connection.PeerCertSubject matches '.*CN=guest.*'"` -
* `rabtap info --mode=byConnection --filter="r.connection.PeerCertSubject matches '.*CN=guest.*'"` -
list all connection that were authenticated using mTLS and which certificates
subject contains `CN=guest`
* `rabtap sub JDQ --filter="r.msg.RoutingKey == 'test'"` - print only messages that
Expand Down
4 changes: 3 additions & 1 deletion cmd/rabtap/cmd_publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,16 @@ func TestCmdPublishARawFileWithExchangeAndRoutingKey(t *testing.T) {
"--uri", testcommon.IntegrationURIFromEnv().String(),
"--exchange=exchange",
tmpfile.Name(),
"--routingkey", routingKey}
"--routingkey", routingKey,
"--property=ContentType=text/plain"}

main()

select {
case message := <-deliveries:
assert.Equal(t, "exchange", message.Exchange)
assert.Equal(t, routingKey, message.RoutingKey)
assert.Equal(t, "text/plain", message.ContentType)
assert.Equal(t, "hello", string(message.Body))
case <-time.After(time.Second * 2):
assert.Fail(t, "did not receive message within expected time")
Expand Down
Loading

0 comments on commit 2244b55

Please sign in to comment.