diff --git a/.github/workflows/create_release.yml b/.github/workflows/create_release.yml index e474528..4179668 100644 --- a/.github/workflows/create_release.yml +++ b/.github/workflows/create_release.yml @@ -12,7 +12,7 @@ jobs: - name: Install Go uses: actions/setup-go@v5 with: - go-version: 1.22.4 + go-version: 1.23.1 - name: Checkout code uses: actions/checkout@v4 - name: Run GoReleaser diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6b43557..9d058aa 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,7 +8,7 @@ on: - master env: - GO_VERSION: "1.22.5" + GO_VERSION: "1.23.1" GO_LANG_CI_LINT_VERSION: "v1.60.1" name: run tests diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ac0160..2a9cf72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog for rabtap +## v1.43 (2024-09-22) + +* new: decode compressed message bodies in `tap` and `sub` command, when the + messages are printed to the console. The encoding is taken from the + `ContentEncoding` message property. `gzip`, `deflate`, `zstd` and `bzip2` + are supported. + ## v1.42 (2024-09-04) * new: `--property KEY=VALUE` option to specify message properties like e.g. diff --git a/Makefile b/Makefile index 9180f29..1d459aa 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ TOXICMD:=docker compose exec toxiproxy /go/bin/toxiproxy-cli .PHONY: phony build: phony - CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags \ + CGO_ENABLED=0 go build -ldflags \ "-s -w -X main.version=$(VERSION)" -o ./bin/rabtap ./cmd/rabtap wasm-build: phony CGO_ENABLED=1 GOOS=wasip1 GOARCH=wasm go build -o ./bin/rabtap-wasm ./cmd/rabtap diff --git a/README.md b/README.md index 07c5d0d..3165d68 100644 --- a/README.md +++ b/README.md @@ -139,18 +139,17 @@ compile from source. ## Usage -``` - +```text rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap Usage: rabtap info [--api=APIURI] [--consumers] [--stats] [--filter=EXPR] [--omit-empty] [--show-default] [--mode=MODE] [--format=FORMAT] [-kncv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] - rabtap tap EXCHANGES [--uri=URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM] + rabtap tap EXCHANGES [--uri=URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM] [--idle-timeout=DURATION] [--filter=EXPR] [-jkncsv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] - rabtap (tap --uri=URI EXCHANGES)... [--saveto=DIR] [--format=FORMAT] [--limit=NUM] + rabtap (tap --uri=URI EXCHANGES)... [--saveto=DIR] [--format=FORMAT] [--limit=NUM] [--idle-timeout=DURATION] [--filter=EXPR] [-jkncsv] [(--tls-cert-file=CERTFILE --tls-key-file=KEYFILE)] [--tls-ca-file=CAFILE] rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--format=FORMAT] [--limit=NUM] @@ -210,7 +209,7 @@ Arguments and options: --consumers include consumers and connections in output of info command. --delay=DELAY Time to wait between sending messages during publish. If not set then messages will be delayed as recorded. - The value must be suffixed with a time unit, e.g. ms, s etc. + The value must be suffixed with a time unit, e.g. ms, s etc. -d, --durable create durable exchange/queue. --exchange=EXCHANGE Optional exchange to publish to. If omitted, exchange will be taken from message being published (see JSON message format). @@ -224,8 +223,8 @@ Arguments and options: --header=KV A key value pair in the form of "key=value" used as a routing- or binding-key. Can occur multiple times. --idle-timeout=DURATION end reading messages when no new message was received - for the given duration. The value must be suffixed with - a time unit, e.g. ms, s etc. + for the given duration. The value must be suffixed with + a time unit, e.g. ms, s etc. -j, --json deprecated. Use "--format json" instead. -k, --insecure allow insecure TLS connections (no certificate check). --lazy create a lazy queue. @@ -238,9 +237,9 @@ Arguments and options: --omit-empty don't show echanges without bindings in info command. --offset=OFFSET Offset when reading from a stream. Can be 'first', 'last', 'next', a duration like '10m', a RFC3339-Timestamp or - an integer index value. Basically it is an alias for - '--args=x-stream-offset=OFFSET'. - --property=KV A key value pair in the form of "key=value" to specify + an integer index value. Basically it is an alias for + '--args=x-stream-offset=OFFSET'. + --property=KV A key value pair in the form of "key=value" to specify message properties like e.g. the content-type. --queue-type=TYPE type of queue [default: classic]. --reason=REASON reason why the connection was closed [default: closed by rabtap]. @@ -248,13 +247,13 @@ Arguments and options: --requeue Instruct broker to requeue rejected message -r, --routingkey=KEY routing key to use in publish mode. If omitted, routing key will be taken from message being published (see JSON - message format). + message format). --saveto=DIR also save messages and metadata to DIR. --show-default include default exchange in output info command. -s, --silent suppress message output to stdout. --speed=FACTOR Speed factor to use during publish [default: 1.0]. --stats include statistics in output of info command. - -t, --type=TYPE type of exchange [default: fanout]. + -t, --type=TYPE type of exchange [default: fanout]. --tls-cert-file=CERTFILE A Cert file to use for client authentication. --tls-key-file=KEYFILE A Key file to use for client authentication. --tls-ca-file=CAFILE A CA Cert file to use with TLS. @@ -277,7 +276,7 @@ Examples: rabtap sub JDQ # print only messages that have ".Name == 'JAN'" in their JSON payload - rabtap sub JDQ --filter="let b=fromJSON(r.toStr(r.body(r.msg))); b.Name == 'JAN'" + rabtap sub JDQ --filter="let b=fromJSON(r.toStr(r.body(r.msg))); b.Name == 'JAN'" rabtap queue rm JDQ # use RABTAP_APIURI environment variable to specify mgmt api uri instead of --api @@ -642,7 +641,22 @@ the message and rabtap will log an error. Use the `--property` option to set message properties like `ContentType` etc. Multiple properties can be specified by specifying multiple `--property` options. -Run `rabtap help properties` to see the list of available properties. +Run `rabtap help properties` to see the list of available properties: + +```text +DeliveryMode - delivery mode: 'transient' or 'persistent' +Priority - message priority for priority queues +Expiration - message TTL (ms) +ContentType - application use - MIME content type +ContentEncoding - application use - MIME content encoding +CorrelationId - application use - correlation identifier +ReplyTo - application use - address to reply to +MessageId - application use - message identifier +Timestamp - application use - RFC3339 message timestamp +Type - application use - message type name +AppId - application use - creating application id +UserId - user id, validated if set +``` Examples: @@ -665,7 +679,10 @@ Examples: from message files previously recorded to this directory and replayed in the order they were recorded * `echo hello | rabtap pub --exchange amq.fanout --property Expiration=1000` - - publish "hello" to exchange amqp.fanout and set the message expiration to 1000ms. + publish `hello` to exchange `amq.fanout` and set the message expiration to 1000ms. +* `echo hello | gzip | rabtap pub --exchange amq.fanout --property ContentEncoding=gzip` - + publish gzip compressed `hello` to exchange `amq.fanout` and set the `ContentEncoding` + message property accordingly. #### Poor mans shovel @@ -795,6 +812,9 @@ Notes: * the `--json` option is now deprecated. Use `--format=json` instead * `nopp` stands for `no pretty-print` +* When the message body is output on the console in `raw` format, Rabtap takes the + `ContentEncoding` property into account and decompresses the body if necessary. + Currently supported encodings are gzip, deflate, zstd, and bzip2. ### JSON message format diff --git a/cmd/rabtap/body.go b/cmd/rabtap/body.go new file mode 100644 index 0000000..f9ce792 --- /dev/null +++ b/cmd/rabtap/body.go @@ -0,0 +1,21 @@ +package main + +import ( + "bytes" + "fmt" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// Body returns the message Body, uncompressing if necessary +func Body(m *amqp.Delivery) ([]byte, error) { + // currently we only expect a single encoding in the header + if enc := m.ContentEncoding; enc != "" { + dec, err := NewDecompressor(enc) + if err != nil { + return nil, fmt.Errorf("decompress: %w", err) + } + return dec(bytes.NewReader(m.Body)) + } + return m.Body, nil +} diff --git a/cmd/rabtap/body_test.go b/cmd/rabtap/body_test.go new file mode 100644 index 0000000..f410b4e --- /dev/null +++ b/cmd/rabtap/body_test.go @@ -0,0 +1,47 @@ +package main + +import ( + "encoding/hex" + "testing" + + amqp "github.com/rabbitmq/amqp091-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBodyDecompressesACompressedBody(t *testing.T) { + // given + buf, err := hex.DecodeString("f372f4e30200") // JAN\n + require.NoError(t, err) + d := amqp.Delivery{Body: buf, ContentEncoding: "deflate"} + + // when + buf, err = Body(&d) + + // then + require.NoError(t, err) + assert.Equal(t, "JAN\n", string(buf)) +} + +func TestBodyFailsWithUnknownEncoding(t *testing.T) { + // given + d := amqp.Delivery{ContentEncoding: "invalid"} + + // when + _, err := Body(&d) + + // then + assert.ErrorContains(t, err, "decompress: unsupported encoding") +} + +func TestBodyReturnsTheBodyAsIsWithoutAnEncoding(t *testing.T) { + // given + d := amqp.Delivery{Body: []byte("JAN")} + + // when + buf, err := Body(&d) + + // then + require.NoError(t, err) + assert.Equal(t, "JAN", string(buf)) +} diff --git a/cmd/rabtap/cmd_publish.go b/cmd/rabtap/cmd_publish.go index 55474b7..0c8c323 100644 --- a/cmd/rabtap/cmd_publish.go +++ b/cmd/rabtap/cmd_publish.go @@ -18,16 +18,16 @@ import ( // CmdPublishArg contains arguments for the publish command type CmdPublishArg struct { - amqpURL *url.URL - tlsConfig *tls.Config - exchange *string - routingKey *string - headers rabtap.KeyValueMap - providerFunc MessageProviderFunc - speed float64 - fixedDelay *time.Duration - confirms bool - mandatory bool + amqpURL *url.URL + tlsConfig *tls.Config + exchange *string + routingKey *string + headers rabtap.KeyValueMap + source MessageSource + speed float64 + fixedDelay *time.Duration + confirms bool + mandatory bool } type DelayFunc func(first, second *RabtapPersistentMessage) @@ -88,7 +88,7 @@ func publishMessageStream(publishCh rabtap.PublishChannel, optExchange *string, optRoutingKey *string, headers rabtap.KeyValueMap, - readNextMessageFunc MessageProviderFunc, + source MessageSource, delayFunc DelayFunc) error { defer func() { @@ -97,7 +97,7 @@ func publishMessageStream(publishCh rabtap.PublishChannel, var lastMsg *RabtapPersistentMessage for { - msg, err := readNextMessageFunc() + msg, err := source() switch err { case io.EOF: // if errors.Is(err, io.EOF) return nil @@ -150,13 +150,13 @@ func cmdPublish(ctx context.Context, cmd CmdPublishArg) error { } go func() { - // runs as long as providerFunc returns messages. Unfortunately, we + // runs as long as source returns messages. Unfortunately, we // can not stop a blocking read on a file like we do with channels // and select. So we don't put the goroutine in the error group to // avoid blocking when e.g. the user presses CTRL+S and then CTRL+C. // TODO find better solution resultCh <- publishMessageStream(publishCh, cmd.exchange, - cmd.routingKey, cmd.headers, cmd.providerFunc, delayFunc) + cmd.routingKey, cmd.headers, cmd.source, delayFunc) }() g.Go(func() error { diff --git a/cmd/rabtap/cmd_subscribe.go b/cmd/rabtap/cmd_subscribe.go index f598abd..f2b3912 100644 --- a/cmd/rabtap/cmd_subscribe.go +++ b/cmd/rabtap/cmd_subscribe.go @@ -16,16 +16,16 @@ import ( // CmdSubscribeArg contains arguments for the subscribe command type CmdSubscribeArg struct { - amqpURL *url.URL - queue string - tlsConfig *tls.Config - messageReceiveFunc MessageReceiveFunc - termPred Predicate - filterPred Predicate - reject bool - requeue bool - args rabtap.KeyValueMap - timeout time.Duration + amqpURL *url.URL + queue string + tlsConfig *tls.Config + messageSink MessageSink + termPred Predicate + filterPred Predicate + reject bool + requeue bool + args rabtap.KeyValueMap + timeout time.Duration } // cmdSub subscribes to messages from the given queue @@ -45,11 +45,11 @@ func cmdSubscribe(ctx context.Context, cmd CmdSubscribeArg) error { g.Go(func() error { return subscriber.EstablishSubscription(ctx, cmd.queue, messageChannel, errorChannel) }) g.Go(func() error { - acknowledger := createAcknowledgeFunc(cmd.reject, cmd.requeue) - err := messageReceiveLoop(ctx, + acknowledger := CreateAcknowledgeFunc(cmd.reject, cmd.requeue) + err := MessageReceiveLoop(ctx, messageChannel, errorChannel, - cmd.messageReceiveFunc, + cmd.messageSink, cmd.filterPred, cmd.termPred, acknowledger, diff --git a/cmd/rabtap/cmd_subscribe_test.go b/cmd/rabtap/cmd_subscribe_test.go index ae79289..3195adb 100644 --- a/cmd/rabtap/cmd_subscribe_test.go +++ b/cmd/rabtap/cmd_subscribe_test.go @@ -32,12 +32,12 @@ func TestCmdSubFailsEarlyWhenBrokerIsNotAvailable(t *testing.T) { go func() { // we expect cmdSubscribe to return cmdSubscribe(ctx, CmdSubscribeArg{ - amqpURL: amqpURL, - queue: "queue", - tlsConfig: &tls.Config{}, - messageReceiveFunc: func(rabtap.TapMessage) error { return nil }, - termPred: &constantPred{false}, - timeout: time.Second * 10, + amqpURL: amqpURL, + queue: "queue", + tlsConfig: &tls.Config{}, + messageSink: func(rabtap.TapMessage) error { return nil }, + termPred: &constantPred{false}, + timeout: time.Second * 10, }) done <- true }() @@ -78,13 +78,13 @@ func TestCmdSub(t *testing.T) { // subscribe to testQueue go cmdSubscribe(ctx, CmdSubscribeArg{ - amqpURL: amqpURL, - queue: testQueue, - tlsConfig: tlsConfig, - messageReceiveFunc: receiveFunc, - filterPred: constantPred{true}, - termPred: constantPred{false}, - timeout: time.Second * 10, + amqpURL: amqpURL, + queue: testQueue, + tlsConfig: tlsConfig, + messageSink: receiveFunc, + filterPred: constantPred{true}, + termPred: constantPred{false}, + timeout: time.Second * 10, }) time.Sleep(time.Second * 1) @@ -100,7 +100,7 @@ func TestCmdSub(t *testing.T) { routingKey: &testKey, headers: rabtap.KeyValueMap{}, tlsConfig: tlsConfig, - providerFunc: func() (RabtapPersistentMessage, error) { + source: func() (RabtapPersistentMessage, error) { // provide exactly one message if messageCount > 0 { return RabtapPersistentMessage{}, io.EOF diff --git a/cmd/rabtap/cmd_tap.go b/cmd/rabtap/cmd_tap.go index 5a4fd38..8a9f466 100644 --- a/cmd/rabtap/cmd_tap.go +++ b/cmd/rabtap/cmd_tap.go @@ -14,12 +14,12 @@ import ( ) type CmdTapArg struct { - tapConfig []rabtap.TapConfiguration - tlsConfig *tls.Config - messageReceiveFunc MessageReceiveFunc - termPred Predicate - filterPred Predicate - timeout time.Duration + tapConfig []rabtap.TapConfiguration + tlsConfig *tls.Config + messageSink MessageSink + termPred Predicate + filterPred Predicate + timeout time.Duration } // cmdTap taps to the given exchanges and displays or saves the received @@ -43,11 +43,11 @@ func cmdTap( }) } g.Go(func() error { - acknowledger := createAcknowledgeFunc(false, false) // ACK - err := messageReceiveLoop(ctx, + acknowledger := CreateAcknowledgeFunc(false, false) // ACK + err := MessageReceiveLoop(ctx, tapMessageChannel, errorChannel, - cmd.messageReceiveFunc, + cmd.messageSink, cmd.filterPred, cmd.termPred, acknowledger, diff --git a/cmd/rabtap/cmd_tap_test.go b/cmd/rabtap/cmd_tap_test.go index 5afdd91..6b59959 100644 --- a/cmd/rabtap/cmd_tap_test.go +++ b/cmd/rabtap/cmd_tap_test.go @@ -47,7 +47,7 @@ func TestCmdTap(t *testing.T) { // when go cmdTap(ctx, CmdTapArg{tapConfig: tapConfig, tlsConfig: &tls.Config{}, - messageReceiveFunc: receiveFunc, + messageSink: receiveFunc, filterPred: constantPred{true}, termPred: constantPred{false}, timeout: time.Second * 10}) diff --git a/cmd/rabtap/command_line.go b/cmd/rabtap/command_line.go index bc6136a..da491f2 100644 --- a/cmd/rabtap/command_line.go +++ b/cmd/rabtap/command_line.go @@ -96,7 +96,7 @@ Arguments and options: --consumers include consumers and connections in output of info command. --delay=DELAY Time to wait between sending messages during publish. If not set then messages will be delayed as recorded. - The value must be suffixed with a time unit, e.g. ms, s etc. + The value must be suffixed with a time unit, e.g. ms, s etc. -d, --durable create durable exchange/queue. --exchange=EXCHANGE Optional exchange to publish to. If omitted, exchange will be taken from message being published (see JSON message format). @@ -111,7 +111,7 @@ Arguments and options: routing- or binding-key. Can occur multiple times. --idle-timeout=DURATION end reading messages when no new message was received for the given duration. The value must be suffixed with - a time unit, e.g. ms, s etc. + a time unit, e.g. ms, s etc. -j, --json deprecated. Use "--format json" instead. -k, --insecure allow insecure TLS connections (no certificate check). --lazy create a lazy queue. @@ -124,8 +124,8 @@ Arguments and options: --omit-empty don't show echanges without bindings in info command. --offset=OFFSET Offset when reading from a stream. Can be 'first', 'last', 'next', a duration like '10m', a RFC3339-Timestamp or - an integer index value. Basically it is an alias for - '--args=x-stream-offset=OFFSET'. + an integer index value. Basically it is an alias for + '--args=x-stream-offset=OFFSET'. --property=KV A key value pair in the form of "key=value" to specify message properties like e.g. the content-type. --queue-type=TYPE type of queue [default: classic]. @@ -134,13 +134,13 @@ Arguments and options: --requeue Instruct broker to requeue rejected message -r, --routingkey=KEY routing key to use in publish mode. If omitted, routing key will be taken from message being published (see JSON - message format). + message format). --saveto=DIR also save messages and metadata to DIR. --show-default include default exchange in output info command. -s, --silent suppress message output to stdout. --speed=FACTOR Speed factor to use during publish [default: 1.0]. --stats include statistics in output of info command. - -t, --type=TYPE type of exchange [default: fanout]. + -t, --type=TYPE type of exchange [default: fanout]. --tls-cert-file=CERTFILE A Cert file to use for client authentication. --tls-key-file=KEYFILE A Key file to use for client authentication. --tls-ca-file=CAFILE A CA Cert file to use with TLS. diff --git a/cmd/rabtap/compression.go b/cmd/rabtap/compression.go deleted file mode 100644 index a95bd9b..0000000 --- a/cmd/rabtap/compression.go +++ /dev/null @@ -1,26 +0,0 @@ -package main - -import ( - "bytes" - "compress/gzip" - "io" - - amqp "github.com/rabbitmq/amqp091-go" -) - -// body returns the message body, uncompressing if necessary -func body(m *amqp.Delivery) ([]byte, error) { - if m.ContentEncoding == "gzip" { - return gunzip(bytes.NewReader(m.Body)) - } - return m.Body, nil -} - -func gunzip(r io.Reader) ([]byte, error) { - zr, err := gzip.NewReader(r) - if err != nil { - return nil, err - } - defer zr.Close() - return io.ReadAll(zr) -} diff --git a/cmd/rabtap/compression_test.go b/cmd/rabtap/compression_test.go deleted file mode 100644 index 96e1916..0000000 --- a/cmd/rabtap/compression_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package main - -import ( - "bytes" - "encoding/hex" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestGunzipDecompressesValidBuffer(t *testing.T) { - // echo "JAN" | gzip | hexdump -X - buf, err := hex.DecodeString("1f8b0800000000000003f372f4e30200270b9a2a04000000") - require.NoError(t, err) - u, err := gunzip(bytes.NewReader(buf)) - require.NoError(t, err) - assert.Equal(t, "JAN\n", string(u)) -} -func TestGunzipFailsWithInvalidData(t *testing.T) { - buf := []byte{1, 2, 3} - _, err := gunzip(bytes.NewReader(buf)) - require.Error(t, err) -} diff --git a/cmd/rabtap/decompression.go b/cmd/rabtap/decompression.go new file mode 100644 index 0000000..59fc7ef --- /dev/null +++ b/cmd/rabtap/decompression.go @@ -0,0 +1,64 @@ +package main + +import ( + "compress/bzip2" + "compress/flate" + "compress/gzip" + "fmt" + "io" + "strings" + + "github.com/klauspost/compress/zstd" +) + +type DecompressionFunc func(r io.Reader) ([]byte, error) + +// NewDecompressor returns a decompression function according to the given +// algorithmn +func NewDecompressor(alg string) (DecompressionFunc, error) { + switch strings.ToLower(alg) { + case "gzip": + return decompressGunzip, nil + case "zstd": + return decompressZstd, nil + case "bzip2": + return decompressBunzip2, nil + case "deflate": + return decompressDeflate, nil + case "identity": + return func(r io.Reader) ([]byte, error) { return io.ReadAll(r) }, nil + default: + return nil, fmt.Errorf("unsupported encoding: %s", alg) + } +} + +func decompressZstd(r io.Reader) ([]byte, error) { + cr, err := zstd.NewReader(r) + if err != nil { + return nil, err + } + defer cr.Close() + return io.ReadAll(cr) +} + +func decompressDeflate(r io.Reader) ([]byte, error) { + // compress/zlib is for zlib-formatted data (DEFLATE data with zlib header). + // compress/flate is for raw DEFLATE data without any headers. + cr := flate.NewReader(r) + defer cr.Close() + return io.ReadAll(cr) +} + +func decompressBunzip2(r io.Reader) ([]byte, error) { + cr := bzip2.NewReader(r) + return io.ReadAll(cr) +} + +func decompressGunzip(r io.Reader) ([]byte, error) { + cr, err := gzip.NewReader(r) + if err != nil { + return nil, err + } + defer cr.Close() + return io.ReadAll(cr) +} diff --git a/cmd/rabtap/decompression_test.go b/cmd/rabtap/decompression_test.go new file mode 100644 index 0000000..9acf1c9 --- /dev/null +++ b/cmd/rabtap/decompression_test.go @@ -0,0 +1,63 @@ +package main + +import ( + "bytes" + "encoding/hex" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDecompressFailsWithInvalidData(t *testing.T) { + testcases := []struct { + name string + }{ + {"zstd"}, + {"deflate"}, + {"gzip"}, + {"bzip2"}, + } + for _, tc := range testcases { + t.Run(fmt.Sprintf("algorihmn %s", tc.name), func(t *testing.T) { + }) + dec, err := NewDecompressor(tc.name) + require.NoError(t, err) + + buf := []byte{1, 2, 3} + _, err = dec(bytes.NewReader(buf)) + require.Error(t, err) + } +} + +func TestDecompressesValidData(t *testing.T) { + testcases := []struct { + alg string + probe string + expected string + }{ + // echo "JAN"|zstd|xxd -p -c0 + {"zstd", "28b52ffd04582100004a414e0a0a21908f", "JAN\n"}, + // echo "JAN"|python3 -c "import sys, zlib; compressor = zlib.compressobj(wbits=-zlib.MAX_WBITS); sys.stdout.buffer.write(compressor.compress(sys.stdin.buffer.read()) + compressor.flush())" | xxd -p -c0 + {"deflate", "f372f4e30200", "JAN\n"}, + // echo "JAN"|gzip|xxd -p -c0 + {"gzip", "1f8b0800000000000003f372f4e30200270b9a2a04000000", "JAN\n"}, + // echo "JAN"|bzip2|xxd -p -c0 + {"bzip2", "425a6839314159265359dab9c92b0000014400001020112000219a68334d173c5dc914e142436ae724ac", "JAN\n"}, + {"identity", "4a414e", "JAN"}, + } + for _, tc := range testcases { + t.Run(fmt.Sprintf("algorithmn %s", tc.alg), func(t *testing.T) { + dec, err := NewDecompressor(tc.alg) + require.NoError(t, err) + + buf, err := hex.DecodeString(tc.probe) + require.NoError(t, err) + + u, err := dec(bytes.NewReader(buf)) + require.NoError(t, err) + assert.Equal(t, tc.expected, string(u)) + }) + } +} diff --git a/cmd/rabtap/default_message_formatter.go b/cmd/rabtap/default_message_formatter.go index c14b48a..d2f885b 100644 --- a/cmd/rabtap/default_message_formatter.go +++ b/cmd/rabtap/default_message_formatter.go @@ -2,12 +2,10 @@ package main -import rabtap "github.com/jandelgado/rabtap/pkg" - // DefaultMessageFormatter is the standard message. type DefaultMessageFormatter struct{} // Format just returns the message body as string, no formatting applied. -func (s DefaultMessageFormatter) Format(message rabtap.TapMessage) string { - return string(message.AmqpMessage.Body) +func (s DefaultMessageFormatter) Format(body []byte) string { + return string(body) } diff --git a/cmd/rabtap/json_message_formatter.go b/cmd/rabtap/json_message_formatter.go index 2b8e099..1de07e5 100644 --- a/cmd/rabtap/json_message_formatter.go +++ b/cmd/rabtap/json_message_formatter.go @@ -5,8 +5,6 @@ package main import ( "encoding/json" "strings" - - rabtap "github.com/jandelgado/rabtap/pkg" ) // JSONMessageFormatter pretty prints JSON formatted messages. @@ -22,24 +20,24 @@ var ( // Format tries to format a message in JSON format. The body can be a simple // JSON object or an array of JSON objects. If the message is not valid JSON, // it will be returned unformatted as-is. -func (s JSONMessageFormatter) Format(message rabtap.TapMessage) string { +func (s JSONMessageFormatter) Format(body []byte) string { var formatted []byte - originalMessage := strings.TrimSpace(string(message.AmqpMessage.Body)) + originalMessage := strings.TrimSpace(string(body)) if len(originalMessage) == 0 { - return string(message.AmqpMessage.Body) + return string(body) } if originalMessage[0] == '[' { // try to unmarshal array to JSON objects var arrayJSONObj []map[string]interface{} err := json.Unmarshal([]byte(originalMessage), &arrayJSONObj) if err != nil { - return string(message.AmqpMessage.Body) + return string(body) } // pretty print JSON formatted, err = json.MarshalIndent(arrayJSONObj, "", " ") if err != nil { - return string(message.AmqpMessage.Body) + return string(body) } } else { @@ -48,12 +46,12 @@ func (s JSONMessageFormatter) Format(message rabtap.TapMessage) string { err := json.Unmarshal([]byte(originalMessage), &simpleJSONObj) if err != nil { - return string(message.AmqpMessage.Body) + return string(body) } formatted, err = json.MarshalIndent(simpleJSONObj, "", " ") if err != nil { - return string(message.AmqpMessage.Body) + return string(body) } } return string(formatted) diff --git a/cmd/rabtap/json_message_formatter_test.go b/cmd/rabtap/json_message_formatter_test.go index b5f2440..34b502b 100644 --- a/cmd/rabtap/json_message_formatter_test.go +++ b/cmd/rabtap/json_message_formatter_test.go @@ -4,57 +4,44 @@ package main import ( "testing" - "time" - rabtap "github.com/jandelgado/rabtap/pkg" - amqp "github.com/rabbitmq/amqp091-go" "github.com/stretchr/testify/assert" ) func TestJSONFormatterInvalidArray(t *testing.T) { - message := amqp.Delivery{ - Body: []byte("[ {\"a\":1} "), - } - formattedMessage := JSONMessageFormatter{}.Format(rabtap.NewTapMessage(&message, time.Now())) + body := []byte("[ {\"a\":1} ") + formattedMessage := JSONMessageFormatter{}.Format(body) // message is expected to be returned untouched assert.Equal(t, "[ {\"a\":1} ", formattedMessage) } func TestJSONFormatterValidArray(t *testing.T) { - message := amqp.Delivery{ - Body: []byte(" [ {\"a\":1} ] "), - } - formattedMessage := JSONMessageFormatter{}.Format(rabtap.NewTapMessage(&message, time.Now())) + body := []byte(" [ {\"a\":1} ] ") + formattedMessage := JSONMessageFormatter{}.Format(body) assert.Equal(t, "[\n {\n \"a\": 1\n }\n]", formattedMessage) } func TestJSONFormatterInvalidObject(t *testing.T) { - message := amqp.Delivery{ - Body: []byte("[ {\"a\":1 "), - } - formattedMessage := JSONMessageFormatter{}.Format(rabtap.NewTapMessage(&message, time.Now())) + body := []byte("[ {\"a\":1 ") + formattedMessage := JSONMessageFormatter{}.Format(body) // message is expected to be returned untouched assert.Equal(t, "[ {\"a\":1 ", formattedMessage) } func TestJSONFormatterValidObject(t *testing.T) { - message := amqp.Delivery{ - Body: []byte(" {\"a\":1} "), - } - formattedMessage := JSONMessageFormatter{}.Format(rabtap.NewTapMessage(&message, time.Now())) + body := []byte(" {\"a\":1} ") + formattedMessage := JSONMessageFormatter{}.Format(body) assert.Equal(t, "{\n \"a\": 1\n}", formattedMessage) } func TestJSONFormatterEmptyValue(t *testing.T) { // An empty buffer effectively should be returned unmodified - message := amqp.Delivery{ - Body: []byte(""), - } - formattedMessage := JSONMessageFormatter{}.Format(rabtap.NewTapMessage(&message, time.Now())) + body := []byte("") + formattedMessage := JSONMessageFormatter{}.Format(body) // message is expected to be returned untouched assert.Equal(t, "", formattedMessage) } diff --git a/cmd/rabtap/main.go b/cmd/rabtap/main.go index 644f19b..2b1bb8d 100644 --- a/cmd/rabtap/main.go +++ b/cmd/rabtap/main.go @@ -61,7 +61,7 @@ func getTLSConfig(insecureTLS bool, certFile string, keyFile string, caFile stri } if caFile != "" { - caCert, err := ioutil.ReadFile(caFile) + caCert, err := os.ReadFile(caFile) failOnError(err, "invalid tls ca file", os.Exit) caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) @@ -90,12 +90,12 @@ func startCmdInfo(ctx context.Context, args CommandLineArgs, titleURL *url.URL) out: NewColorableWriter(os.Stdout)}) } -// createMessageReaderForPublish returns a MessageReaderFunc that reads +// createMessageReaderForPublish returns a message source that reads // messages from the given source in the specified format. The source can // be either empty (=stdin), a filename or a directory name -func createMessageReaderForPublishFunc(source *string, format string) (MessageProviderFunc, error) { +func newPublishMessageSource(source *string, format string) (MessageSource, error) { if source == nil { - return CreateMessageReaderFunc(format, os.Stdin) + return NewReaderMessageSource(format, os.Stdin) } fi, err := os.Stat(*source) @@ -109,7 +109,7 @@ func createMessageReaderForPublishFunc(source *string, format string) (MessagePr return nil, err } // TODO close file - return CreateMessageReaderFunc(format, file) + return NewReaderMessageSource(format, file) } else { metadataFiles, err := LoadMetadataFilesFromDir(*source, ioutil.ReadDir, NewRabtapFileInfoPredicate()) @@ -122,7 +122,7 @@ func createMessageReaderForPublishFunc(source *string, format string) (MessagePr metadataFiles[j].metadata.XRabtapReceivedTimestamp) }) - return CreateMessageFromDirReaderFunc(format, metadataFiles) + return NewReadFilesFromDirMessageSource(format, metadataFiles) } } @@ -130,35 +130,35 @@ func startCmdPublish(ctx context.Context, args CommandLineArgs) { if args.Format == "raw" && args.PubExchange == nil && args.PubRoutingKey == nil { fmt.Fprint(os.Stderr, "Warning: using raw message format but neither exchange or routing key are set.\n") } - provider, err := createMessageReaderForPublishFunc(args.Source, args.Format) - provider = NewTransformingMessageProvider(provider, + source, err := newPublishMessageSource(args.Source, args.Format) + failOnError(err, "message-reader", os.Exit) + source = NewTransformingMessageSource(source, FireHoseTransformer, NewPropertiesTransformer(args.Properties)) - failOnError(err, "message-reader", os.Exit) err = cmdPublish(ctx, CmdPublishArg{ - amqpURL: args.AMQPURL, - exchange: args.PubExchange, - routingKey: args.PubRoutingKey, - headers: args.Args, - fixedDelay: args.Delay, - speed: args.Speed, - tlsConfig: getTLSConfig(args.InsecureTLS, args.TLSCertFile, args.TLSKeyFile, args.TLSCaFile), - mandatory: args.Mandatory, - confirms: args.Confirms, - providerFunc: provider}) + amqpURL: args.AMQPURL, + exchange: args.PubExchange, + routingKey: args.PubRoutingKey, + headers: args.Args, + fixedDelay: args.Delay, + speed: args.Speed, + tlsConfig: getTLSConfig(args.InsecureTLS, args.TLSCertFile, args.TLSKeyFile, args.TLSCaFile), + mandatory: args.Mandatory, + confirms: args.Confirms, + source: source}) failOnError(err, "publish", os.Exit) } func startCmdSubscribe(ctx context.Context, args CommandLineArgs) { - opts := MessageReceiveFuncOptions{ + opts := MessageSinkOptions{ out: NewColorableWriter(os.Stdout), format: args.Format, silent: args.Silent, optSaveDir: args.SaveDir, filenameProvider: defaultFilenameProvider, } - messageReceiveFunc, err := createMessageReceiveFunc(opts) + messageSink, err := NewMessageSink(opts) failOnError(err, "options", os.Exit) termPred, err := NewLoopCountPred(args.Limit) @@ -167,29 +167,29 @@ func startCmdSubscribe(ctx context.Context, args CommandLineArgs) { failOnError(err, fmt.Sprintf("invalid message filter predicate '%s'", args.Filter), os.Exit) err = cmdSubscribe(ctx, CmdSubscribeArg{ - amqpURL: args.AMQPURL, - queue: args.QueueName, - requeue: args.Requeue, - reject: args.Reject, - tlsConfig: getTLSConfig(args.InsecureTLS, args.TLSCertFile, args.TLSKeyFile, args.TLSCaFile), - messageReceiveFunc: messageReceiveFunc, - filterPred: filterPred, - termPred: termPred, - args: args.Args, - timeout: args.IdleTimeout, + amqpURL: args.AMQPURL, + queue: args.QueueName, + requeue: args.Requeue, + reject: args.Reject, + tlsConfig: getTLSConfig(args.InsecureTLS, args.TLSCertFile, args.TLSKeyFile, args.TLSCaFile), + messageSink: messageSink, + filterPred: filterPred, + termPred: termPred, + args: args.Args, + timeout: args.IdleTimeout, }) failOnError(err, "error subscribing messages", os.Exit) } func startCmdTap(ctx context.Context, args CommandLineArgs) { - opts := MessageReceiveFuncOptions{ + opts := MessageSinkOptions{ out: NewColorableWriter(os.Stdout), format: args.Format, silent: args.Silent, optSaveDir: args.SaveDir, filenameProvider: defaultFilenameProvider, } - messageReceiveFunc, err := createMessageReceiveFunc(opts) + messageSink, err := NewMessageSink(opts) failOnError(err, "options", os.Exit) termPred, err := NewLoopCountPred(args.Limit) failOnError(err, "invalid message limit predicate", os.Exit) @@ -198,12 +198,12 @@ func startCmdTap(ctx context.Context, args CommandLineArgs) { cmdTap(ctx, CmdTapArg{ - tapConfig: args.TapConfig, - tlsConfig: getTLSConfig(args.InsecureTLS, args.TLSCertFile, args.TLSKeyFile, args.TLSCaFile), - messageReceiveFunc: messageReceiveFunc, - filterPred: filterPred, - termPred: termPred, - timeout: args.IdleTimeout, + tapConfig: args.TapConfig, + tlsConfig: getTLSConfig(args.InsecureTLS, args.TLSCertFile, args.TLSKeyFile, args.TLSCaFile), + messageSink: messageSink, + filterPred: filterPred, + termPred: termPred, + timeout: args.IdleTimeout, }) } diff --git a/cmd/rabtap/message_printer.go b/cmd/rabtap/message_printer.go index 80d64db..6a7aaa0 100644 --- a/cmd/rabtap/message_printer.go +++ b/cmd/rabtap/message_printer.go @@ -22,37 +22,40 @@ exchange.......: {{ ExchangeColor .Message.AmqpMessage.Exchange }} {{end}}{{if not .Message.AmqpMessage.Timestamp.IsZero}}app-timestamp..: {{ .Message.AmqpMessage.Timestamp }} {{end}}{{with .Message.AmqpMessage.Type}}app-type.......: {{.}} {{end}}{{with .Message.AmqpMessage.CorrelationId}}app-corr-id....: {{.}} +{{end}}{{with .Message.AmqpMessage.ReplyTo}}reply-to.......: {{.}} +{{end}}{{with .Message.AmqpMessage.AppId}}app-id.........: {{.}} +{{end}}{{with .Message.AmqpMessage.UserId}}user-id........: {{.}} {{end}}{{with .Message.AmqpMessage.Headers}}app-headers....: {{.}} {{end -}} -{{ MessageColor .Body }} +{{ MessageColor (call .Body) }} ` -// PrintMessageInfo holds info for template -type PrintMessageInfo struct { +// PrintMessageEnv holds info for template +type PrintMessageEnv struct { // Message receveived Message rabtap.TapMessage // formatted body - Body string + Body func() string } -// MessageFormatter formats the body of tapped message -type MessageFormatter interface { - Format(message rabtap.TapMessage) string +// MessageBodyFormatter formats the body of a message +type MessageBodyFormatter interface { + Format(body []byte) string } // Registry of available message formatters. Key is contentType -var messageFormatters = map[string]MessageFormatter{} +var messageFormatters = map[string]MessageBodyFormatter{} // RegisterMessageFormatter registers a new message formatter by its // content type. -func RegisterMessageFormatter(contentType string, formatter MessageFormatter) { +func RegisterMessageFormatter(contentType string, formatter MessageBodyFormatter) { messageFormatters[contentType] = formatter } // NewMessageFormatter return a message formatter suitable the given // contentType. -func NewMessageFormatter(contentType string) MessageFormatter { +func NewMessageFormatter(contentType string) MessageBodyFormatter { if formatter, ok := messageFormatters[contentType]; ok { return formatter } @@ -62,15 +65,22 @@ func NewMessageFormatter(contentType string) MessageFormatter { // PrettyPrintMessage formats and prints a tapped message func PrettyPrintMessage(out io.Writer, message rabtap.TapMessage) error { - colorizer := NewColorPrinter() - formatter := NewMessageFormatter(message.AmqpMessage.ContentType) - printStruct := PrintMessageInfo{ + printEnv := PrintMessageEnv{ Message: message, - Body: formatter.Format(message), + Body: func() string { + if b, err := Body(message.AmqpMessage); err != nil { + log.Warnf("decoding failed, printing body as-is: %s", err) + return formatter.Format(message.AmqpMessage.Body) + } else { + return formatter.Format(b) + } + }, } + + colorizer := NewColorPrinter() t := template.Must(template.New("message"). Funcs(colorizer.GetFuncMap()).Parse(messageTemplate)) - return t.Execute(out, printStruct) + return t.Execute(out, printEnv) } diff --git a/cmd/rabtap/message_printer_test.go b/cmd/rabtap/message_printer_test.go index bd29c39..9617d87 100644 --- a/cmd/rabtap/message_printer_test.go +++ b/cmd/rabtap/message_printer_test.go @@ -29,12 +29,15 @@ func ExamplePrettyPrintMessage() { Priority: 99, Expiration: "2017-05-22 17:00:00", ContentType: "plain/text", - ContentEncoding: "utf-8", + ContentEncoding: "identity", MessageId: "4711", Timestamp: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), Type: "some type", CorrelationId: "4712", Headers: amqp.Table{"header": "value"}, + UserId: "jan", + AppId: "rabtap", + ReplyTo: "message123", Body: []byte("simple test message"), } @@ -49,11 +52,14 @@ func ExamplePrettyPrintMessage() { // priority.......: 99 // expiration.....: 2017-05-22 17:00:00 // content-type...: plain/text - // content-enc....: utf-8 + // content-enc....: identity // app-message-id.: 4711 // app-timestamp..: 2009-11-10 23:00:00 +0000 UTC // app-type.......: some type // app-corr-id....: 4712 + // reply-to.......: message123 + // app-id.........: rabtap + // user-id........: jan // app-headers....: map[header:value] // simple test message // diff --git a/cmd/rabtap/message_provider.go b/cmd/rabtap/message_provider.go deleted file mode 100644 index 9e3cedc..0000000 --- a/cmd/rabtap/message_provider.go +++ /dev/null @@ -1,7 +0,0 @@ -// provide RabtapPersistentMessages to be published -package main - -// MessageProviderFunc provides messages that can be published. -// returns the message to be published, xor an error. When no more -// messages are available, io.EOF must be returned. -type MessageProviderFunc func() (RabtapPersistentMessage, error) diff --git a/cmd/rabtap/message_reader_dir.go b/cmd/rabtap/message_reader_dir.go index 0cad16f..20bc206 100644 --- a/cmd/rabtap/message_reader_dir.go +++ b/cmd/rabtap/message_reader_dir.go @@ -1,11 +1,9 @@ -// read persisted metadata and messages from a directory // Copyright (C) 2019-2022 Jan Delgado package main import ( "fmt" "io" - "io/ioutil" "os" "path" "regexp" @@ -26,7 +24,7 @@ func filenameWithoutExtension(fn string) string { return strings.TrimSuffix(fn, path.Ext(fn)) } -// newRabtapFileInfoPredicate returns a FileInfoPredicate that matches +// NewRabtapFileInfoPredicate returns a FileInfoPredicate that matches // rabtap metadata files func NewRabtapFileInfoPredicate() FileInfoPredicate { filenameRe := regexp.MustCompile(metadataFilePattern) @@ -99,9 +97,9 @@ func LoadMetadataFilesFromDir(dirname string, dirReader DirReader, pred FileInfo return readMetadataOfFiles(dirname, filenames) } -// createMessageFromDirReaderFunc returns a MessageProvicerFunc that reads +// NewReadFilesFromDirMessageSource returns a MessageProvicerFunc that reads // messages from the given list of filenames in the given format. -func CreateMessageFromDirReaderFunc(format string, files []FilenameWithMetadata) (MessageProviderFunc, error) { +func NewReadFilesFromDirMessageSource(format string, files []FilenameWithMetadata) (MessageSource, error) { curfile := 0 @@ -125,7 +123,7 @@ func CreateMessageFromDirReaderFunc(format string, files []FilenameWithMetadata) return message, io.EOF } rawFile := filenameWithoutExtension(files[curfile].filename) + ".dat" - body, err := ioutil.ReadFile(rawFile) + body, err := os.ReadFile(rawFile) message = files[curfile].metadata message.Body = body curfile++ diff --git a/cmd/rabtap/message_reader_dir_test.go b/cmd/rabtap/message_reader_dir_test.go index d6d980c..2a47136 100644 --- a/cmd/rabtap/message_reader_dir_test.go +++ b/cmd/rabtap/message_reader_dir_test.go @@ -250,7 +250,7 @@ func TestReadMetadataOfFilesReturnsExpectedMetadata(t *testing.T) { } func TestCreateMessageFromDirReaderFuncReturnsErrorForUnknownFormat(t *testing.T) { - _, err := CreateMessageFromDirReaderFunc("invalid", []FilenameWithMetadata{}) + _, err := NewReadFilesFromDirMessageSource("invalid", []FilenameWithMetadata{}) assert.NotNil(t, err) } @@ -259,7 +259,7 @@ func TestCreateMessageFromDirReaderFuncReturnsCorrectReaderForJSONFormat(t *test formats := []string{"json", "json-nopp"} for _, format := range formats { - reader, err := CreateMessageFromDirReaderFunc(format, []FilenameWithMetadata{}) + reader, err := NewReadFilesFromDirMessageSource(format, []FilenameWithMetadata{}) assert.Nil(t, err) assert.NotNil(t, reader) @@ -272,7 +272,7 @@ func TestCreateMessageFromDirReaderFuncReturnsCorrectReaderForJSONFormat(t *test func TestCreateMessageFromDirReaderFuncReturnsCorrectReaderForRawFormat(t *testing.T) { // TODO complete test - reader, err := CreateMessageFromDirReaderFunc("raw", []FilenameWithMetadata{}) + reader, err := NewReadFilesFromDirMessageSource("raw", []FilenameWithMetadata{}) assert.Nil(t, err) assert.NotNil(t, reader) diff --git a/cmd/rabtap/message_reader_file.go b/cmd/rabtap/message_reader_file.go index 9edc1d8..27f3d5b 100644 --- a/cmd/rabtap/message_reader_file.go +++ b/cmd/rabtap/message_reader_file.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" ) func readMessageFromJSON(reader io.Reader) (RabtapPersistentMessage, error) { @@ -25,9 +24,9 @@ func readMessageFromJSONStream(decoder *json.Decoder) (RabtapPersistentMessage, return message, err } -// CreateMessageReaderFunc returns a MessageProviderFunc that reads messages from +// NewReaderMessageSource returns a MessageSource that reads messages from // the the given reader in the provided format -func CreateMessageReaderFunc(format string, reader io.ReadCloser) (MessageProviderFunc, error) { +func NewReaderMessageSource(format string, reader io.ReadCloser) (MessageSource, error) { switch format { case "json-nopp": fallthrough @@ -43,7 +42,7 @@ func CreateMessageReaderFunc(format string, reader io.ReadCloser) (MessageProvid if read { return RabtapPersistentMessage{}, io.EOF } - buf, err := ioutil.ReadAll(reader) // note: does not return EOF + buf, err := io.ReadAll(reader) // note: does not return EOF read = true return RabtapPersistentMessage{Body: buf}, err }, nil diff --git a/cmd/rabtap/message_reader_file_test.go b/cmd/rabtap/message_reader_file_test.go index d08221d..132631a 100644 --- a/cmd/rabtap/message_reader_file_test.go +++ b/cmd/rabtap/message_reader_file_test.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/json" "io" - "io/ioutil" "testing" "github.com/stretchr/testify/assert" @@ -85,39 +84,39 @@ func TestReadMessageFromJSONStreamReturnsOneMessagePerCall(t *testing.T) { } func TestCreateMessageReaderFuncReturnsErrorForUnknownFormat(t *testing.T) { - reader := ioutil.NopCloser(bytes.NewReader([]byte(""))) - _, err := CreateMessageReaderFunc("invalid", reader) + reader := io.NopCloser(bytes.NewReader([]byte(""))) + _, err := NewReaderMessageSource("invalid", reader) assert.NotNil(t, err) } func TestCreateMessageReaderFuncReturnsJSONReaderForJSONFormats(t *testing.T) { for _, format := range []string{"json", "json-nopp"} { - reader := ioutil.NopCloser(bytes.NewReader([]byte(`{"Body": "aGVsbG8="}`))) + reader := io.NopCloser(bytes.NewReader([]byte(`{"Body": "aGVsbG8="}`))) - readFunc, err := CreateMessageReaderFunc(format, reader) + source, err := NewReaderMessageSource(format, reader) assert.Nil(t, err) - msg, err := readFunc() + msg, err := source() assert.NoError(t, err) assert.Equal(t, []byte("hello"), msg.Body) - msg, err = readFunc() + msg, err = source() assert.Equal(t, io.EOF, err) } } func TestCreateMessageReaderFuncReturnsRawFileReaderForRawFormats(t *testing.T) { - reader := ioutil.NopCloser(bytes.NewReader([]byte("hello"))) + reader := io.NopCloser(bytes.NewReader([]byte("hello"))) - readFunc, err := CreateMessageReaderFunc("raw", reader) + source, err := NewReaderMessageSource("raw", reader) assert.Nil(t, err) - msg, err := readFunc() + msg, err := source() assert.NoError(t, err) assert.Equal(t, []byte("hello"), msg.Body) - msg, err = readFunc() + msg, err = source() assert.Equal(t, io.EOF, err) } diff --git a/cmd/rabtap/message_source.go b/cmd/rabtap/message_source.go new file mode 100644 index 0000000..13d7555 --- /dev/null +++ b/cmd/rabtap/message_source.go @@ -0,0 +1,6 @@ +package main + +// MessageSource provides messages that can be published. +// returns the message to be published, xor an error. When no more +// messages are available, io.EOF must be returned. +type MessageSource func() (RabtapPersistentMessage, error) diff --git a/cmd/rabtap/message_transformer.go b/cmd/rabtap/message_transformer.go index f1fe788..d02cacd 100644 --- a/cmd/rabtap/message_transformer.go +++ b/cmd/rabtap/message_transformer.go @@ -1,12 +1,11 @@ -// compose message providers package main // MessageTransformer transforms the given message type MessageTransformer func(m RabtapPersistentMessage) (RabtapPersistentMessage, error) -// NewTransformingMessageProvider returns a new message provider that computes +// NewTransformingMessageSource returns a new message source that computes // m = tn(...t1(f()), i.e. that applies the transformer to the message provided by f. -func NewTransformingMessageProvider(f MessageProviderFunc, transformer ...MessageTransformer) MessageProviderFunc { +func NewTransformingMessageSource(f MessageSource, transformer ...MessageTransformer) MessageSource { return func() (RabtapPersistentMessage, error) { m, err := f() if err != nil { diff --git a/cmd/rabtap/message_transformer_test.go b/cmd/rabtap/message_transformer_test.go index b84fb8a..c48aeba 100644 --- a/cmd/rabtap/message_transformer_test.go +++ b/cmd/rabtap/message_transformer_test.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestNewMessageTransformerProvidesAMessageProviderThatTransformsAMessageOnSuccess(t *testing.T) { +func TestNewMessageTransformerProvidesAMessageSourceThatTransformsAMessageOnSuccess(t *testing.T) { // given provider := func() (RabtapPersistentMessage, error) { @@ -26,7 +26,7 @@ func TestNewMessageTransformerProvidesAMessageProviderThatTransformsAMessageOnSu } // when - provider = NewTransformingMessageProvider(provider, transformer1, transformer2) + provider = NewTransformingMessageSource(provider, transformer1, transformer2) msg, err := provider() // then @@ -36,7 +36,7 @@ func TestNewMessageTransformerProvidesAMessageProviderThatTransformsAMessageOnSu assert.Equal(t, "123", msg.MessageID) } -func TestNewMessageTransformerProvidesAMessageProviderThatPropagtesErrors(t *testing.T) { +func TestNewMessageTransformerProvidesAMessageSourceThatPropagtesErrors(t *testing.T) { // given provider := func() (RabtapPersistentMessage, error) { @@ -49,7 +49,7 @@ func TestNewMessageTransformerProvidesAMessageProviderThatPropagtesErrors(t *tes } // when - provider = NewTransformingMessageProvider(provider, transformer) + provider = NewTransformingMessageSource(provider, transformer) _, err := provider() // then diff --git a/cmd/rabtap/message_writer_test.go b/cmd/rabtap/message_writer_test.go index 1123f8c..c362b17 100644 --- a/cmd/rabtap/message_writer_test.go +++ b/cmd/rabtap/message_writer_test.go @@ -65,13 +65,13 @@ func TestSaveMessageToRawFile(t *testing.T) { // check contents of message body .dat file datFilename := basename + ".dat" - contentsBody, err := ioutil.ReadFile(datFilename) + contentsBody, err := os.ReadFile(datFilename) assert.Nil(t, err) assert.Equal(t, []byte("simple test message."), contentsBody) // check contents of metadata file metaFilename := basename + ".json" - contentsMeta, err := ioutil.ReadFile(metaFilename) + contentsMeta, err := os.ReadFile(metaFilename) assert.Nil(t, err) // deserialize from .json file var jsonMetaActual RabtapPersistentMessage @@ -96,7 +96,7 @@ func TestSaveMessageToFilesToInvalidDir(t *testing.T) { // TestSaveMessageToFile tests the SaveMessagesToFile() function by // writing to and reading a temporary files. func TestSaveMessageToJSONFile(t *testing.T) { - testdir, err := ioutil.TempDir("", "") + testdir, err := os.MkdirTemp("", "") assert.Nil(t, err) defer os.RemoveAll(testdir) @@ -105,7 +105,7 @@ func TestSaveMessageToJSONFile(t *testing.T) { err = SaveMessageToJSONFile(filename, rabtap.NewTapMessage(testMessage, createdTs), JSONMarshalIndent) assert.Nil(t, err) - contents, err := ioutil.ReadFile(filename) + contents, err := os.ReadFile(filename) assert.Nil(t, err) // deserialize from .json file diff --git a/cmd/rabtap/subscribe.go b/cmd/rabtap/subscribe.go index 6223e5b..25e4a5f 100644 --- a/cmd/rabtap/subscribe.go +++ b/cmd/rabtap/subscribe.go @@ -24,7 +24,7 @@ type FilenameProvider func() string type AcknowledgeFunc func(rabtap.TapMessage) error -type MessageReceiveFuncOptions struct { +type MessageSinkOptions struct { out io.Writer format string // currently: raw, json, json-nopp silent bool @@ -32,8 +32,8 @@ type MessageReceiveFuncOptions struct { filenameProvider FilenameProvider } -// MessageReceiveFunc processes receiced messages from a tap. -type MessageReceiveFunc func(rabtap.TapMessage) error +// MessageSink processes received messages +type MessageSink func(rabtap.TapMessage) error // var ErrMessageLoopEnded = errors.New("message loop ended") @@ -43,10 +43,10 @@ func createMessagePredEnv(msg rabtap.TapMessage, count int64) map[string]interfa "count": count, "toStr": func(b []byte) string { return string(b) }, "gunzip": func(b []byte) ([]byte, error) { - return gunzip(bytes.NewReader(b)) + return decompressGunzip(bytes.NewReader(b)) }, "body": func(m *amqp.Delivery) ([]byte, error) { - return body(m) + return Body(m) }, } } @@ -72,10 +72,10 @@ func NewLoopCountPred(limit int64) (*LoopCountPred, error) { return &LoopCountPred{limit}, nil } -// createAcknowledgeFunc returns the function used to acknowledge received +// CreateAcknowledgeFunc returns the function used to acknowledge received // functions, wich will either be ACKed or REJECTED with optional REQUEUE // flag set. -func createAcknowledgeFunc(reject, requeue bool) AcknowledgeFunc { +func CreateAcknowledgeFunc(reject, requeue bool) AcknowledgeFunc { return func(message rabtap.TapMessage) error { if reject { if err := message.AmqpMessage.Reject(requeue); err != nil { @@ -90,17 +90,17 @@ func createAcknowledgeFunc(reject, requeue bool) AcknowledgeFunc { } } -// messageReceiveLoop passes received AMQP messages to messageReceiveFunc and +// MessageReceiveLoop passes received AMQP messages to the messageSink and // handles errors received on the errorChan. AMQP messages are ascknowledged by // the provides acknowleder function. Each message is passed to the predicate // termPred function. If true is returned, processing is ended. Timeout // specifies an idle timeout, which will end processing when for the given // duration no new messages are received on messageChan. // TODO pass in struct, limit number of arguments -func messageReceiveLoop(ctx context.Context, +func MessageReceiveLoop(ctx context.Context, messageChan rabtap.TapChannel, errorChan rabtap.SubscribeErrorChannel, - messageReceiveFunc MessageReceiveFunc, + messageSink MessageSink, filterPred Predicate, termPred Predicate, acknowledger AcknowledgeFunc, @@ -147,7 +147,7 @@ func messageReceiveLoop(ctx context.Context, } count += 1 - if err := messageReceiveFunc(message); err != nil { + if err := messageSink(message); err != nil { log.Error(err) } @@ -166,13 +166,11 @@ func messageReceiveLoop(ctx context.Context, } } -// NullMessageReceiveFunc is used a sentinel to terminate a chain of -// MessageReceiveFuncs -func NullMessageReceiveFunc(rabtap.TapMessage) error { +func nopMessageSink(rabtap.TapMessage) error { return nil } -func chainedMessageReceiveFunc(first, second MessageReceiveFunc) MessageReceiveFunc { +func messageSinkTee(first, second MessageSink) MessageSink { return func(message rabtap.TapMessage) error { if err := first(message); err != nil { return err @@ -181,86 +179,84 @@ func chainedMessageReceiveFunc(first, second MessageReceiveFunc) MessageReceiveF } } -// createMessageReceiveFuncWriteToJSONFile return receive func that writes the -// message and metadata to separate files in the provided directory using the -// provided marshaller. -func createMessageReceiveFuncWriteToRawFiles(dir string, marshaller marshalFunc, filenameProvider FilenameProvider) MessageReceiveFunc { +// newWriteToRawFileMessageSink returns a message sink that writes the message +// and metadata to separate files in the provided directory using the provided +// marshaller. +func newWriteToRawFileMessageSink(dir string, marshaller marshalFunc, filenameProvider FilenameProvider) MessageSink { return func(message rabtap.TapMessage) error { basename := path.Join(dir, filenameProvider()) return SaveMessageToRawFiles(basename, message, marshaller) } } -// createMessageReceiveFuncWriteToJSONFile return receive func that writes the +// creatmMessageReceiveFuncWriteToJSONFile return receive func that writes the // message to a file in the provided directory using the provided marshaller. -func createMessageReceiveFuncWriteToJSONFile(dir string, marshaller marshalFunc, filenameProvider FilenameProvider) MessageReceiveFunc { +func newWriteToJSONFileMessageSink(dir string, marshaller marshalFunc, filenameProvider FilenameProvider) MessageSink { return func(message rabtap.TapMessage) error { filename := path.Join(dir, filenameProvider()+".json") return SaveMessageToJSONFile(filename, message, marshaller) } } -// createMessageReceiveFuncPrintJSON returns a function that prints messages as -// JSON to the provided writer -// messages as JSON messages -func createMessageReceiveFuncPrintJSON(out io.Writer, marshaller marshalFunc) MessageReceiveFunc { +// newPrintJSONMessageSink returns a function that prints messages as JSON to +// the provided writer +func newPrintJSONMessageSink(out io.Writer, marshaller marshalFunc) MessageSink { return func(message rabtap.TapMessage) error { return WriteMessage(out, message, marshaller) } } -// createMessageReceiveFuncPrintPretty returns a function that pretty prints -// received messaged to the provided writer -func createMessageReceiveFuncPrintPretty(out io.Writer) MessageReceiveFunc { +// newPrettyPrintJSONMessageSink returns a function that pretty prints received +// messaged to the provided writer +func newPrettyPrintJSONMessageSink(out io.Writer) MessageSink { return func(message rabtap.TapMessage) error { return PrettyPrintMessage(out, message) } } -func createMessageReceivePrintFunc(format string, out io.Writer, silent bool) (MessageReceiveFunc, error) { +func newPrintMessageMessageSink(format string, out io.Writer, silent bool) (MessageSink, error) { if silent { - return NullMessageReceiveFunc, nil + return nopMessageSink, nil } switch format { case "json-nopp": - return createMessageReceiveFuncPrintJSON(out, JSONMarshal), nil + return newPrintJSONMessageSink(out, JSONMarshal), nil case "json": - return createMessageReceiveFuncPrintJSON(out, JSONMarshalIndent), nil + return newPrintJSONMessageSink(out, JSONMarshalIndent), nil case "raw": - return createMessageReceiveFuncPrintPretty(out), nil + return newPrettyPrintJSONMessageSink(out), nil default: return nil, fmt.Errorf("invalid format %s", format) } } -func createMessageReceiveSaveFunc(format string, optSaveDir *string, filenameProvider FilenameProvider) (MessageReceiveFunc, error) { +func newSaveFileMessageSink(format string, optSaveDir *string, filenameProvider FilenameProvider) (MessageSink, error) { if optSaveDir == nil { - return NullMessageReceiveFunc, nil + return nopMessageSink, nil } switch format { case "json-nopp": fallthrough case "json": - return createMessageReceiveFuncWriteToJSONFile(*optSaveDir, JSONMarshalIndent, filenameProvider), nil + return newWriteToJSONFileMessageSink(*optSaveDir, JSONMarshalIndent, filenameProvider), nil case "raw": - return createMessageReceiveFuncWriteToRawFiles(*optSaveDir, JSONMarshalIndent, filenameProvider), nil + return newWriteToRawFileMessageSink(*optSaveDir, JSONMarshalIndent, filenameProvider), nil default: return nil, fmt.Errorf("invalid format %s", format) } } -// createMessageReceiveFunc returns a MessageReceiveFunc which is invoked on -// receival of a message during tap and subscribe. Depending on the options -// set, function that optionally prints to the proviced io.Writer and -// optionally to the provided directory is returned. -func createMessageReceiveFunc(opts MessageReceiveFuncOptions) (MessageReceiveFunc, error) { - - printFunc, err := createMessageReceivePrintFunc(opts.format, opts.out, opts.silent) +// NewMessageSink returns a message sink which is invoked on receival of a +// message during tap and subscribe. Depending on the options set, function +// that optionally prints to the proviced io.Writer and optionally to the +// provided directory is returned. +func NewMessageSink(opts MessageSinkOptions) (MessageSink, error) { + printFunc, err := newPrintMessageMessageSink(opts.format, opts.out, opts.silent) if err != nil { return printFunc, err } - saveFunc, err := createMessageReceiveSaveFunc(opts.format, opts.optSaveDir, opts.filenameProvider) - return chainedMessageReceiveFunc(printFunc, saveFunc), err + saveFunc, err := newSaveFileMessageSink(opts.format, opts.optSaveDir, opts.filenameProvider) + return messageSinkTee(printFunc, saveFunc), err } diff --git a/cmd/rabtap/subscribe_test.go b/cmd/rabtap/subscribe_test.go index bf0f0f9..7500c5f 100644 --- a/cmd/rabtap/subscribe_test.go +++ b/cmd/rabtap/subscribe_test.go @@ -83,7 +83,7 @@ func TestCreateAcknowledgeFuncReturnedFuncCorreclyAcknowledgesTheMessage(t *test // given info := fmt.Sprintf("testcase %d, %+v", i, tc) mock := NewMockAcknowledger() - ackFunc := createAcknowledgeFunc(tc.reject, tc.requeue) + ackFunc := CreateAcknowledgeFunc(tc.reject, tc.requeue) msg := rabtap.TapMessage{AmqpMessage: &amqp.Delivery{Acknowledger: mock}} // when @@ -126,13 +126,13 @@ func TestCreateCountingMessageReceivePredReturnsTrueOnWhenLimitIsReached(t *test } } -func TestChainMessageReceiveFuncCallsBothFunctions(t *testing.T) { +func TestChainMessageSinkCallsBothFunctions(t *testing.T) { firstCalled := false secondCalled := false first := func(_ rabtap.TapMessage) error { firstCalled = true; return nil } second := func(_ rabtap.TapMessage) error { secondCalled = true; return nil } - chained := chainedMessageReceiveFunc(first, second) + chained := messageSinkTee(first, second) err := chained(rabtap.TapMessage{}) assert.Nil(t, err) @@ -140,14 +140,14 @@ func TestChainMessageReceiveFuncCallsBothFunctions(t *testing.T) { assert.True(t, secondCalled) } -func TestChainMessageReceiveFuncDoesNotCallSecondOnErrorOnFirst(t *testing.T) { +func TestChainMessageSinkDoesNotCallSecondOnErrorOnFirst(t *testing.T) { firstCalled := false secondCalled := false expectedErr := errors.New("first failed") first := func(_ rabtap.TapMessage) error { firstCalled = true; return expectedErr } second := func(_ rabtap.TapMessage) error { secondCalled = true; return nil } - chained := chainedMessageReceiveFunc(first, second) + chained := messageSinkTee(first, second) err := chained(rabtap.TapMessage{}) assert.Equal(t, expectedErr, err) @@ -155,28 +155,28 @@ func TestChainMessageReceiveFuncDoesNotCallSecondOnErrorOnFirst(t *testing.T) { assert.False(t, secondCalled) } -func TestCreateMessageReceiveFuncReturnsErrorWithInvalidFormat(t *testing.T) { - opts := MessageReceiveFuncOptions{ +func TestCreateMessageSinkReturnsErrorWithInvalidFormat(t *testing.T) { + opts := MessageSinkOptions{ format: "invalid", } - _, err := createMessageReceiveFunc(opts) + _, err := NewMessageSink(opts) assert.NotNil(t, err) } -func TestCreateMessageReceiveFuncRawToFile(t *testing.T) { +func TestCreateMessageSinkRawToFile(t *testing.T) { testDir, err := ioutil.TempDir("", "") require.Nil(t, err) defer os.RemoveAll(testDir) var b bytes.Buffer - opts := MessageReceiveFuncOptions{ + opts := MessageSinkOptions{ out: &b, format: "raw", optSaveDir: &testDir, silent: false, filenameProvider: func() string { return "tapfilename" }, } - rcvFunc, err := createMessageReceiveFunc(opts) + rcvFunc, err := NewMessageSink(opts) assert.Nil(t, err) message := rabtap.NewTapMessage(&amqp.Delivery{Body: []byte("Testmessage")}, time.Now()) @@ -186,27 +186,27 @@ func TestCreateMessageReceiveFuncRawToFile(t *testing.T) { assert.True(t, strings.Contains(b.String(), "Testmessage")) // check contents of written file(s) - contents, err := ioutil.ReadFile(path.Join(testDir, "tapfilename.dat")) + contents, err := os.ReadFile(path.Join(testDir, "tapfilename.dat")) assert.Nil(t, err) assert.Equal(t, "Testmessage", string(contents)) // TODO check contents of JSON metadata "tapfilename.json" - contents, err = ioutil.ReadFile(path.Join(testDir, "tapfilename.json")) + contents, err = os.ReadFile(path.Join(testDir, "tapfilename.json")) assert.Nil(t, err) var metadata map[string]interface{} err = json.Unmarshal(contents, &metadata) assert.Nil(t, err) } -func TestCreateMessageReceiveFuncPrintsNothingWhenSilentOptionIsSet(t *testing.T) { +func TestCreateMessageSinkPrintsNothingWhenSilentOptionIsSet(t *testing.T) { var b bytes.Buffer - opts := MessageReceiveFuncOptions{ + opts := MessageSinkOptions{ out: &b, format: "raw", optSaveDir: nil, silent: true, } - rcvFunc, err := createMessageReceiveFunc(opts) + rcvFunc, err := NewMessageSink(opts) assert.Nil(t, err) message := rabtap.NewTapMessage(&amqp.Delivery{Body: []byte("Testmessage")}, time.Now()) @@ -216,15 +216,15 @@ func TestCreateMessageReceiveFuncPrintsNothingWhenSilentOptionIsSet(t *testing.T assert.Equal(t, b.String(), "") } -func TestCreateMessageReceiveFuncJSON(t *testing.T) { +func TestCreateMessageSinkJSON(t *testing.T) { var b bytes.Buffer - opts := MessageReceiveFuncOptions{ + opts := MessageSinkOptions{ out: &b, format: "json", optSaveDir: nil, filenameProvider: func() string { return "tapfilename" }, } - rcvFunc, err := createMessageReceiveFunc(opts) + rcvFunc, err := NewMessageSink(opts) assert.Nil(t, err) message := rabtap.NewTapMessage(&amqp.Delivery{Body: []byte("Testmessage")}, time.Now()) @@ -235,22 +235,22 @@ func TestCreateMessageReceiveFuncJSON(t *testing.T) { assert.True(t, strings.Contains(b.String(), "\"Body\": \"VGVzdG1lc3NhZ2U=\"")) } -func TestCreateMessageReceiveFuncJSONNoPPToFile(t *testing.T) { +func TestCreateMessageSinkJSONNoPPToFile(t *testing.T) { // message is written as json (no pretty print) to writer and // as json to file. - testDir, err := ioutil.TempDir("", "") + testDir, err := os.MkdirTemp("", "") require.Nil(t, err) defer os.RemoveAll(testDir) var b bytes.Buffer - opts := MessageReceiveFuncOptions{ + opts := MessageSinkOptions{ out: &b, format: "json-nopp", optSaveDir: &testDir, filenameProvider: func() string { return "tapfilename" }, } - rcvFunc, err := createMessageReceiveFunc(opts) + rcvFunc, err := NewMessageSink(opts) assert.Nil(t, err) message := rabtap.NewTapMessage(&amqp.Delivery{Body: []byte("Testmessage")}, time.Now()) @@ -260,7 +260,7 @@ func TestCreateMessageReceiveFuncJSONNoPPToFile(t *testing.T) { assert.Equal(t, 1, strings.Count(b.String(), "\n")) assert.True(t, strings.Contains(b.String(), ",\"Body\":\"VGVzdG1lc3NhZ2U=\"")) - contents, err := ioutil.ReadFile(path.Join(testDir, "tapfilename.json")) + contents, err := os.ReadFile(path.Join(testDir, "tapfilename.json")) assert.Nil(t, err) assert.True(t, strings.Count(string(contents), "\n") > 1) assert.True(t, strings.Contains(string(contents), "\"Body\": \"VGVzdG1lc3NhZ2U=\"")) @@ -274,7 +274,7 @@ func TestMessageReceiveLoopForwardsMessagesOnChannel(t *testing.T) { done := make(chan bool) received := 0 - receiveFunc := func(rabtap.TapMessage) error { + sink := func(rabtap.TapMessage) error { received++ done <- true return nil @@ -283,7 +283,7 @@ func TestMessageReceiveLoopForwardsMessagesOnChannel(t *testing.T) { passPred := constantPred{val: true} acknowledger := func(rabtap.TapMessage) error { return nil } go func() { - _ = messageReceiveLoop(ctx, messageChan, errorChan, receiveFunc, passPred, termPred, acknowledger, time.Second*10) + _ = MessageReceiveLoop(ctx, messageChan, errorChan, sink, passPred, termPred, acknowledger, time.Second*10) }() messageChan <- rabtap.TapMessage{} @@ -301,7 +301,7 @@ func TestMessageReceiveLoopExitsOnChannelClose(t *testing.T) { close(messageChan) acknowledger := func(rabtap.TapMessage) error { return nil } - err := messageReceiveLoop(ctx, messageChan, errorChan, NullMessageReceiveFunc, passPred, termPred, acknowledger, time.Second*10) + err := MessageReceiveLoop(ctx, messageChan, errorChan, nopMessageSink, passPred, termPred, acknowledger, time.Second*10) assert.Nil(t, err) } @@ -315,7 +315,7 @@ func TestMessageReceiveLoopExitsWhenTermPredReturnsTrue(t *testing.T) { messageChan <- rabtap.TapMessage{} acknowledger := func(rabtap.TapMessage) error { return nil } - err := messageReceiveLoop(ctx, messageChan, errorChan, NullMessageReceiveFunc, passPred, termPred, acknowledger, time.Second*10) + err := MessageReceiveLoop(ctx, messageChan, errorChan, nopMessageSink, passPred, termPred, acknowledger, time.Second*10) assert.Nil(t, err) } @@ -326,7 +326,7 @@ func TestMessageReceiveLoopIgnoresFilteredMessages(t *testing.T) { errorChan := make(rabtap.SubscribeErrorChannel) received := 0 - receiveFunc := func(rabtap.TapMessage) error { + sink := func(rabtap.TapMessage) error { received++ return nil } @@ -345,7 +345,7 @@ func TestMessageReceiveLoopIgnoresFilteredMessages(t *testing.T) { messageChan <- rabtap.TapMessage{AmqpMessage: &amqp.Delivery{MessageId: "test"}} messageChan <- rabtap.TapMessage{AmqpMessage: &amqp.Delivery{MessageId: ""}} - _ = messageReceiveLoop(ctx, messageChan, errorChan, receiveFunc, + _ = MessageReceiveLoop(ctx, messageChan, errorChan, sink, filterPred, termPred, acknowledger, time.Second*1) // we expect 2 of them to be filtered out @@ -363,7 +363,7 @@ func TestMessageReceiveLoopExitsWithErrorWhenIdle(t *testing.T) { acknowledger := func(rabtap.TapMessage) error { return nil } // when - err := messageReceiveLoop(ctx, messageChan, errorChan, NullMessageReceiveFunc, passPred, termPred, acknowledger, time.Second*1) + err := MessageReceiveLoop(ctx, messageChan, errorChan, nopMessageSink, passPred, termPred, acknowledger, time.Second*1) // Then assert.Equal(t, ErrIdleTimeout, err) diff --git a/go.mod b/go.mod index 12741cf..daddb89 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( require ( github.com/expr-lang/expr v1.16.9 + github.com/klauspost/compress v1.17.9 github.com/stealthrocket/net v0.2.1 ) diff --git a/go.sum b/go.sum index c60b055..cf57f7a 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,8 @@ github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4= github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= +github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= diff --git a/pkg/dial_tls_test.go b/pkg/dial_tls_test.go index dcc0231..5e6d6cc 100644 --- a/pkg/dial_tls_test.go +++ b/pkg/dial_tls_test.go @@ -8,7 +8,7 @@ package rabtap import ( "crypto/tls" "crypto/x509" - "io/ioutil" + "os" "testing" amqp "github.com/rabbitmq/amqp091-go" @@ -56,7 +56,7 @@ func TestDialTLSConnectsToTLSEndpoint(t *testing.T) { for _, tc := range testcases { // given tlsConfig := &tls.Config{} - caCert, err := ioutil.ReadFile(certDir + "ca.crt") + caCert, err := os.ReadFile(certDir + "ca.crt") assert.NoError(t, err) caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert)