Skip to content

Commit

Permalink
Decode message body before printing on console (tap,sub) (#103)
Browse files Browse the repository at this point in the history
  • Loading branch information
jandelgado committed Sep 22, 2024
1 parent f887e97 commit 4fad502
Show file tree
Hide file tree
Showing 37 changed files with 502 additions and 338 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/create_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v5
with:
go-version: 1.22.4
go-version: 1.23.1
- name: Checkout code
uses: actions/checkout@v4
- name: Run GoReleaser
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:
- master

env:
GO_VERSION: "1.22.5"
GO_VERSION: "1.23.1"
GO_LANG_CI_LINT_VERSION: "v1.60.1"

name: run tests
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog for rabtap

## v1.43 (2024-09-22)

* new: decode compressed message bodies in `tap` and `sub` command, when the
messages are printed to the console. The encoding is taken from the
`ContentEncoding` message property. `gzip`, `deflate`, `zstd` and `bzip2`
are supported.

## v1.42 (2024-09-04)

* new: `--property KEY=VALUE` option to specify message properties like e.g.
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ TOXICMD:=docker compose exec toxiproxy /go/bin/toxiproxy-cli
.PHONY: phony

build: phony
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags \
CGO_ENABLED=0 go build -ldflags \
"-s -w -X main.version=$(VERSION)" -o ./bin/rabtap ./cmd/rabtap
wasm-build: phony
CGO_ENABLED=1 GOOS=wasip1 GOARCH=wasm go build -o ./bin/rabtap-wasm ./cmd/rabtap
Expand Down
50 changes: 35 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,18 +139,17 @@ compile from source.

## Usage

```
```text
rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap
Usage:
rabtap info [--api=APIURI] [--consumers] [--stats] [--filter=EXPR] [--omit-empty]
[--show-default] [--mode=MODE] [--format=FORMAT] [-kncv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap tap EXCHANGES [--uri=URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM]
rabtap tap EXCHANGES [--uri=URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM]
[--idle-timeout=DURATION] [--filter=EXPR] [-jkncsv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap (tap --uri=URI EXCHANGES)... [--saveto=DIR] [--format=FORMAT] [--limit=NUM]
rabtap (tap --uri=URI EXCHANGES)... [--saveto=DIR] [--format=FORMAT] [--limit=NUM]
[--idle-timeout=DURATION] [--filter=EXPR] [-jkncsv]
[(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE]
rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM]
Expand Down Expand Up @@ -210,7 +209,7 @@ Arguments and options:
--consumers include consumers and connections in output of info command.
--delay=DELAY Time to wait between sending messages during publish.
If not set then messages will be delayed as recorded.
The value must be suffixed with a time unit, e.g. ms, s etc.
The value must be suffixed with a time unit, e.g. ms, s etc.
-d, --durable create durable exchange/queue.
--exchange=EXCHANGE Optional exchange to publish to. If omitted, exchange will
be taken from message being published (see JSON message format).
Expand All @@ -224,8 +223,8 @@ Arguments and options:
--header=KV A key value pair in the form of "key=value" used as a
routing- or binding-key. Can occur multiple times.
--idle-timeout=DURATION end reading messages when no new message was received
for the given duration. The value must be suffixed with
a time unit, e.g. ms, s etc.
for the given duration. The value must be suffixed with
a time unit, e.g. ms, s etc.
-j, --json deprecated. Use "--format json" instead.
-k, --insecure allow insecure TLS connections (no certificate check).
--lazy create a lazy queue.
Expand All @@ -238,23 +237,23 @@ Arguments and options:
--omit-empty don't show echanges without bindings in info command.
--offset=OFFSET Offset when reading from a stream. Can be 'first', 'last',
'next', a duration like '10m', a RFC3339-Timestamp or
an integer index value. Basically it is an alias for
'--args=x-stream-offset=OFFSET'.
--property=KV A key value pair in the form of "key=value" to specify
an integer index value. Basically it is an alias for
'--args=x-stream-offset=OFFSET'.
--property=KV A key value pair in the form of "key=value" to specify
message properties like e.g. the content-type.
--queue-type=TYPE type of queue [default: classic].
--reason=REASON reason why the connection was closed [default: closed by rabtap].
--reject Reject messages. Default behaviour is to acknowledge messages.
--requeue Instruct broker to requeue rejected message
-r, --routingkey=KEY routing key to use in publish mode. If omitted, routing key
will be taken from message being published (see JSON
message format).
message format).
--saveto=DIR also save messages and metadata to DIR.
--show-default include default exchange in output info command.
-s, --silent suppress message output to stdout.
--speed=FACTOR Speed factor to use during publish [default: 1.0].
--stats include statistics in output of info command.
-t, --type=TYPE type of exchange [default: fanout].
-t, --type=TYPE type of exchange [default: fanout].
--tls-cert-file=CERTFILE A Cert file to use for client authentication.
--tls-key-file=KEYFILE A Key file to use for client authentication.
--tls-ca-file=CAFILE A CA Cert file to use with TLS.
Expand All @@ -277,7 +276,7 @@ Examples:
rabtap sub JDQ
# print only messages that have ".Name == 'JAN'" in their JSON payload
rabtap sub JDQ --filter="let b=fromJSON(r.toStr(r.body(r.msg))); b.Name == 'JAN'"
rabtap sub JDQ --filter="let b=fromJSON(r.toStr(r.body(r.msg))); b.Name == 'JAN'"
rabtap queue rm JDQ
# use RABTAP_APIURI environment variable to specify mgmt api uri instead of --api
Expand Down Expand Up @@ -642,7 +641,22 @@ the message and rabtap will log an error.

Use the `--property` option to set message properties like `ContentType` etc.
Multiple properties can be specified by specifying multiple `--property` options.
Run `rabtap help properties` to see the list of available properties.
Run `rabtap help properties` to see the list of available properties:

```text
DeliveryMode - delivery mode: 'transient' or 'persistent'
Priority - message priority for priority queues
Expiration - message TTL (ms)
ContentType - application use - MIME content type
ContentEncoding - application use - MIME content encoding
CorrelationId - application use - correlation identifier
ReplyTo - application use - address to reply to
MessageId - application use - message identifier
Timestamp - application use - RFC3339 message timestamp
Type - application use - message type name
AppId - application use - creating application id
UserId - user id, validated if set
```

Examples:

Expand All @@ -665,7 +679,10 @@ Examples:
from message files previously recorded to this directory and replayed in the
order they were recorded
* `echo hello | rabtap pub --exchange amq.fanout --property Expiration=1000` -
publish "hello" to exchange amqp.fanout and set the message expiration to 1000ms.
publish `hello` to exchange `amq.fanout` and set the message expiration to 1000ms.
* `echo hello | gzip | rabtap pub --exchange amq.fanout --property ContentEncoding=gzip` -
publish gzip compressed `hello` to exchange `amq.fanout` and set the `ContentEncoding`
message property accordingly.

#### Poor mans shovel

Expand Down Expand Up @@ -795,6 +812,9 @@ Notes:

* the `--json` option is now deprecated. Use `--format=json` instead
* `nopp` stands for `no pretty-print`
* When the message body is output on the console in `raw` format, Rabtap takes the
`ContentEncoding` property into account and decompresses the body if necessary.
Currently supported encodings are gzip, deflate, zstd, and bzip2.

### JSON message format

Expand Down
21 changes: 21 additions & 0 deletions cmd/rabtap/body.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package main

import (
"bytes"
"fmt"

amqp "github.com/rabbitmq/amqp091-go"
)

// Body returns the message Body, uncompressing if necessary
func Body(m *amqp.Delivery) ([]byte, error) {
// currently we only expect a single encoding in the header
if enc := m.ContentEncoding; enc != "" {
dec, err := NewDecompressor(enc)
if err != nil {
return nil, fmt.Errorf("decompress: %w", err)
}
return dec(bytes.NewReader(m.Body))
}
return m.Body, nil
}
47 changes: 47 additions & 0 deletions cmd/rabtap/body_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package main

import (
"encoding/hex"
"testing"

amqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestBodyDecompressesACompressedBody(t *testing.T) {
// given
buf, err := hex.DecodeString("f372f4e30200") // JAN\n
require.NoError(t, err)
d := amqp.Delivery{Body: buf, ContentEncoding: "deflate"}

// when
buf, err = Body(&d)

// then
require.NoError(t, err)
assert.Equal(t, "JAN\n", string(buf))
}

func TestBodyFailsWithUnknownEncoding(t *testing.T) {
// given
d := amqp.Delivery{ContentEncoding: "invalid"}

// when
_, err := Body(&d)

// then
assert.ErrorContains(t, err, "decompress: unsupported encoding")
}

func TestBodyReturnsTheBodyAsIsWithoutAnEncoding(t *testing.T) {
// given
d := amqp.Delivery{Body: []byte("JAN")}

// when
buf, err := Body(&d)

// then
require.NoError(t, err)
assert.Equal(t, "JAN", string(buf))
}
28 changes: 14 additions & 14 deletions cmd/rabtap/cmd_publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ import (

// CmdPublishArg contains arguments for the publish command
type CmdPublishArg struct {
amqpURL *url.URL
tlsConfig *tls.Config
exchange *string
routingKey *string
headers rabtap.KeyValueMap
providerFunc MessageProviderFunc
speed float64
fixedDelay *time.Duration
confirms bool
mandatory bool
amqpURL *url.URL
tlsConfig *tls.Config
exchange *string
routingKey *string
headers rabtap.KeyValueMap
source MessageSource
speed float64
fixedDelay *time.Duration
confirms bool
mandatory bool
}

type DelayFunc func(first, second *RabtapPersistentMessage)
Expand Down Expand Up @@ -88,7 +88,7 @@ func publishMessageStream(publishCh rabtap.PublishChannel,
optExchange *string,
optRoutingKey *string,
headers rabtap.KeyValueMap,
readNextMessageFunc MessageProviderFunc,
source MessageSource,
delayFunc DelayFunc) error {

defer func() {
Expand All @@ -97,7 +97,7 @@ func publishMessageStream(publishCh rabtap.PublishChannel,

var lastMsg *RabtapPersistentMessage
for {
msg, err := readNextMessageFunc()
msg, err := source()
switch err {
case io.EOF: // if errors.Is(err, io.EOF)
return nil
Expand Down Expand Up @@ -150,13 +150,13 @@ func cmdPublish(ctx context.Context, cmd CmdPublishArg) error {
}

go func() {
// runs as long as providerFunc returns messages. Unfortunately, we
// runs as long as source returns messages. Unfortunately, we
// can not stop a blocking read on a file like we do with channels
// and select. So we don't put the goroutine in the error group to
// avoid blocking when e.g. the user presses CTRL+S and then CTRL+C.
// TODO find better solution
resultCh <- publishMessageStream(publishCh, cmd.exchange,
cmd.routingKey, cmd.headers, cmd.providerFunc, delayFunc)
cmd.routingKey, cmd.headers, cmd.source, delayFunc)
}()

g.Go(func() error {
Expand Down
26 changes: 13 additions & 13 deletions cmd/rabtap/cmd_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ import (

// CmdSubscribeArg contains arguments for the subscribe command
type CmdSubscribeArg struct {
amqpURL *url.URL
queue string
tlsConfig *tls.Config
messageReceiveFunc MessageReceiveFunc
termPred Predicate
filterPred Predicate
reject bool
requeue bool
args rabtap.KeyValueMap
timeout time.Duration
amqpURL *url.URL
queue string
tlsConfig *tls.Config
messageSink MessageSink
termPred Predicate
filterPred Predicate
reject bool
requeue bool
args rabtap.KeyValueMap
timeout time.Duration
}

// cmdSub subscribes to messages from the given queue
Expand All @@ -45,11 +45,11 @@ func cmdSubscribe(ctx context.Context, cmd CmdSubscribeArg) error {
g.Go(func() error { return subscriber.EstablishSubscription(ctx, cmd.queue, messageChannel, errorChannel) })
g.Go(func() error {

acknowledger := createAcknowledgeFunc(cmd.reject, cmd.requeue)
err := messageReceiveLoop(ctx,
acknowledger := CreateAcknowledgeFunc(cmd.reject, cmd.requeue)
err := MessageReceiveLoop(ctx,
messageChannel,
errorChannel,
cmd.messageReceiveFunc,
cmd.messageSink,
cmd.filterPred,
cmd.termPred,
acknowledger,
Expand Down
28 changes: 14 additions & 14 deletions cmd/rabtap/cmd_subscribe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ func TestCmdSubFailsEarlyWhenBrokerIsNotAvailable(t *testing.T) {
go func() {
// we expect cmdSubscribe to return
cmdSubscribe(ctx, CmdSubscribeArg{
amqpURL: amqpURL,
queue: "queue",
tlsConfig: &tls.Config{},
messageReceiveFunc: func(rabtap.TapMessage) error { return nil },
termPred: &constantPred{false},
timeout: time.Second * 10,
amqpURL: amqpURL,
queue: "queue",
tlsConfig: &tls.Config{},
messageSink: func(rabtap.TapMessage) error { return nil },
termPred: &constantPred{false},
timeout: time.Second * 10,
})
done <- true
}()
Expand Down Expand Up @@ -78,13 +78,13 @@ func TestCmdSub(t *testing.T) {

// subscribe to testQueue
go cmdSubscribe(ctx, CmdSubscribeArg{
amqpURL: amqpURL,
queue: testQueue,
tlsConfig: tlsConfig,
messageReceiveFunc: receiveFunc,
filterPred: constantPred{true},
termPred: constantPred{false},
timeout: time.Second * 10,
amqpURL: amqpURL,
queue: testQueue,
tlsConfig: tlsConfig,
messageSink: receiveFunc,
filterPred: constantPred{true},
termPred: constantPred{false},
timeout: time.Second * 10,
})

time.Sleep(time.Second * 1)
Expand All @@ -100,7 +100,7 @@ func TestCmdSub(t *testing.T) {
routingKey: &testKey,
headers: rabtap.KeyValueMap{},
tlsConfig: tlsConfig,
providerFunc: func() (RabtapPersistentMessage, error) {
source: func() (RabtapPersistentMessage, error) {
// provide exactly one message
if messageCount > 0 {
return RabtapPersistentMessage{}, io.EOF
Expand Down
Loading

0 comments on commit 4fad502

Please sign in to comment.