Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New encoding/decoding options for sqs, kafka and kafacgo #235

Merged
merged 9 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.84.2
1.85.0
8 changes: 5 additions & 3 deletions examples/service/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ toolchain go1.22.2
replace github.com/Vonage/gosrvlib => ../..

require (
github.com/Vonage/gosrvlib v1.84.2
github.com/Vonage/gosrvlib v1.85.0
github.com/golang/mock v1.6.0
github.com/jstemmer/go-junit-report v1.0.0
github.com/prometheus/client_golang v1.19.0
Expand All @@ -20,6 +20,8 @@ require (

require (
cloud.google.com/go v0.112.2 // indirect
cloud.google.com/go/auth v0.2.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.0 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/firestore v1.15.0 // indirect
cloud.google.com/go/longrunning v0.5.6 // indirect
Expand Down Expand Up @@ -96,7 +98,7 @@ require (
go.opentelemetry.io/otel/trace v1.25.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/oauth2 v0.19.0 // indirect
Expand All @@ -105,7 +107,7 @@ require (
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.20.0 // indirect
google.golang.org/api v0.172.0 // indirect
google.golang.org/api v0.174.0 // indirect
google.golang.org/genproto v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect
Expand Down
16 changes: 10 additions & 6 deletions examples/service/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ cloud.google.com/go/asset v1.18.1 h1:+NpxL5L53VY91EoJTHeGGXSWEUllf2hhXpCyTnSrd3Q
cloud.google.com/go/asset v1.18.1/go.mod h1:QXivw0mVqwrhZyuX6iqFbyfCdzYE9AFCJVG47Eh5dMM=
cloud.google.com/go/assuredworkloads v1.11.6 h1:3NlUes0xLN2kcSU24qQADFYsOaetCPg0HSA302AyV5s=
cloud.google.com/go/assuredworkloads v1.11.6/go.mod h1:1dlhWKocQorGYkspt+scx11kQCI9qVHOi1Au6Rw9srg=
cloud.google.com/go/auth v0.2.0 h1:y6oTcpMSbOcXbwYgUUrvI+mrQ2xbrcdpPgtVbCGTLTk=
cloud.google.com/go/auth v0.2.0/go.mod h1:+yb+oy3/P0geX6DLKlqiGHARGR6EX2GRtYCzWOCQSbU=
cloud.google.com/go/auth/oauth2adapt v0.2.0 h1:FR8zevgQwu+8CqiOT5r6xCmJa3pJC/wdXEEPF1OkNhA=
cloud.google.com/go/auth/oauth2adapt v0.2.0/go.mod h1:AfqujpDAlTfLfeCIl/HJZZlIxD8+nJoZ5e0x1IxGq5k=
cloud.google.com/go/automl v1.13.6 h1:NHBO5cjo2IgwaJ5qlez/iA35XI1db87PPlOB0Kjt5RM=
cloud.google.com/go/automl v1.13.6/go.mod h1:/0VtkKis6KhFJuPzi45e0E+e9AdQE09SNieChjJqU18=
cloud.google.com/go/baremetalsolution v1.2.5 h1:jCR4rnVsG6ocK6ngFr2Z6ugKZfTENmMZkiV6Ma2tEeE=
Expand Down Expand Up @@ -722,8 +726,8 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 h1:ESSUROHIBHg7USnszlcdmjBEwdMj9VUvU+OPk4yl2mc=
golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI=
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f h1:99ci1mjWVBWwJiEKYY6jWa4d2nTQVIEhZIptnrVb1XY=
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 h1:XQyxROzUlZH+WIQwySDgnISgOivlhjIEwaQaJEJrrN0=
Expand Down Expand Up @@ -821,8 +825,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
google.golang.org/api v0.172.0 h1:/1OcMZGPmW1rX2LCu2CmGUD1KXK1+pfzxotxyRUCCdk=
google.golang.org/api v0.172.0/go.mod h1:+fJZq6QXWfa9pXhnIzsjx4yI22d4aI9ZpLb58gvXjis=
google.golang.org/api v0.174.0 h1:zB1BWl7ocxfTea2aQ9mgdzXjnfPySllpPOskdnO+q34=
google.golang.org/api v0.174.0/go.mod h1:aC7tB6j0HR1Nl0ni5ghpx6iLasmAX78Zkh/wgxAAjLg=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM=
Expand All @@ -834,8 +838,8 @@ google.golang.org/genproto v0.0.0-20240415180920-8c6c420018be h1:g4aX8SUFA8V5F4L
google.golang.org/genproto v0.0.0-20240415180920-8c6c420018be/go.mod h1:FeSdT5fk+lkxatqJP38MsUicGqHax5cLtmy/6TAuxO4=
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be h1:Zz7rLWqp0ApfsR/l7+zSHhY3PMiH2xqgxlfYfAfNpoU=
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be/go.mod h1:dvdCTIoAGbkWbcIKBniID56/7XHTt6WfxXNMxuziJ+w=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240318140521-94a12d6c2237 h1:BGtl5+MtFriTFllRl3QPEPWZrD8nVhSTONzTkSin3+c=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240318140521-94a12d6c2237/go.mod h1:IN9OQUXZ0xT+26MDwZL8fJcYw+y99b0eYPA2U15Jt8o=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240325203815-454cdb8f5daa h1:wBkzraZsSqhj1M4L/nMrljUU6XasJkgHvUsq8oRGwF0=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240325203815-454cdb8f5daa/go.mod h1:IN9OQUXZ0xT+26MDwZL8fJcYw+y99b0eYPA2U15Jt8o=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be h1:LG9vZxsWGOmUKieR8wPAUR3u3MpnYFQZROPIMaXh7/A=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ require (

require (
cloud.google.com/go v0.112.2 // indirect
cloud.google.com/go/auth v0.2.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.0 // indirect
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/firestore v1.15.0 // indirect
cloud.google.com/go/longrunning v0.5.6 // indirect
Expand Down Expand Up @@ -122,14 +124,14 @@ require (
go.opentelemetry.io/otel v1.25.0 // indirect
go.opentelemetry.io/otel/metric v1.25.0 // indirect
go.opentelemetry.io/otel/trace v1.25.0 // indirect
golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/oauth2 v0.19.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.20.0 // indirect
google.golang.org/api v0.172.0 // indirect
google.golang.org/api v0.174.0 // indirect
google.golang.org/genproto v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect
Expand Down
16 changes: 10 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ cloud.google.com/go/asset v1.18.1 h1:+NpxL5L53VY91EoJTHeGGXSWEUllf2hhXpCyTnSrd3Q
cloud.google.com/go/asset v1.18.1/go.mod h1:QXivw0mVqwrhZyuX6iqFbyfCdzYE9AFCJVG47Eh5dMM=
cloud.google.com/go/assuredworkloads v1.11.6 h1:3NlUes0xLN2kcSU24qQADFYsOaetCPg0HSA302AyV5s=
cloud.google.com/go/assuredworkloads v1.11.6/go.mod h1:1dlhWKocQorGYkspt+scx11kQCI9qVHOi1Au6Rw9srg=
cloud.google.com/go/auth v0.2.0 h1:y6oTcpMSbOcXbwYgUUrvI+mrQ2xbrcdpPgtVbCGTLTk=
cloud.google.com/go/auth v0.2.0/go.mod h1:+yb+oy3/P0geX6DLKlqiGHARGR6EX2GRtYCzWOCQSbU=
cloud.google.com/go/auth/oauth2adapt v0.2.0 h1:FR8zevgQwu+8CqiOT5r6xCmJa3pJC/wdXEEPF1OkNhA=
cloud.google.com/go/auth/oauth2adapt v0.2.0/go.mod h1:AfqujpDAlTfLfeCIl/HJZZlIxD8+nJoZ5e0x1IxGq5k=
cloud.google.com/go/automl v1.13.6 h1:NHBO5cjo2IgwaJ5qlez/iA35XI1db87PPlOB0Kjt5RM=
cloud.google.com/go/automl v1.13.6/go.mod h1:/0VtkKis6KhFJuPzi45e0E+e9AdQE09SNieChjJqU18=
cloud.google.com/go/baremetalsolution v1.2.5 h1:jCR4rnVsG6ocK6ngFr2Z6ugKZfTENmMZkiV6Ma2tEeE=
Expand Down Expand Up @@ -820,8 +824,8 @@ golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 h1:ESSUROHIBHg7USnszlcdmjBEwdMj9VUvU+OPk4yl2mc=
golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI=
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f h1:99ci1mjWVBWwJiEKYY6jWa4d2nTQVIEhZIptnrVb1XY=
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 h1:XQyxROzUlZH+WIQwySDgnISgOivlhjIEwaQaJEJrrN0=
Expand Down Expand Up @@ -954,8 +958,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
google.golang.org/api v0.172.0 h1:/1OcMZGPmW1rX2LCu2CmGUD1KXK1+pfzxotxyRUCCdk=
google.golang.org/api v0.172.0/go.mod h1:+fJZq6QXWfa9pXhnIzsjx4yI22d4aI9ZpLb58gvXjis=
google.golang.org/api v0.174.0 h1:zB1BWl7ocxfTea2aQ9mgdzXjnfPySllpPOskdnO+q34=
google.golang.org/api v0.174.0/go.mod h1:aC7tB6j0HR1Nl0ni5ghpx6iLasmAX78Zkh/wgxAAjLg=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM=
Expand All @@ -969,8 +973,8 @@ google.golang.org/genproto v0.0.0-20240415180920-8c6c420018be h1:g4aX8SUFA8V5F4L
google.golang.org/genproto v0.0.0-20240415180920-8c6c420018be/go.mod h1:FeSdT5fk+lkxatqJP38MsUicGqHax5cLtmy/6TAuxO4=
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be h1:Zz7rLWqp0ApfsR/l7+zSHhY3PMiH2xqgxlfYfAfNpoU=
google.golang.org/genproto/googleapis/api v0.0.0-20240415180920-8c6c420018be/go.mod h1:dvdCTIoAGbkWbcIKBniID56/7XHTt6WfxXNMxuziJ+w=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240318140521-94a12d6c2237 h1:BGtl5+MtFriTFllRl3QPEPWZrD8nVhSTONzTkSin3+c=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240318140521-94a12d6c2237/go.mod h1:IN9OQUXZ0xT+26MDwZL8fJcYw+y99b0eYPA2U15Jt8o=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240325203815-454cdb8f5daa h1:wBkzraZsSqhj1M4L/nMrljUU6XasJkgHvUsq8oRGwF0=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240325203815-454cdb8f5daa/go.mod h1:IN9OQUXZ0xT+26MDwZL8fJcYw+y99b0eYPA2U15Jt8o=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be h1:LG9vZxsWGOmUKieR8wPAUR3u3MpnYFQZROPIMaXh7/A=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
Expand Down
12 changes: 8 additions & 4 deletions pkg/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ const (
)

type config struct {
sessionTimeout time.Duration
startOffset int64
sessionTimeout time.Duration
startOffset int64
messageEncodeFunc TEncodeFunc
messageDecodeFunc TDecodeFunc
}

func defaultConfig() *config {
return &config{
sessionTimeout: defaultSessionTimeout,
startOffset: kafka.LastOffset,
sessionTimeout: defaultSessionTimeout,
startOffset: kafka.LastOffset,
messageEncodeFunc: DefaultMessageEncodeFunc,
messageDecodeFunc: DefaultMessageDecodeFunc,
}
}
2 changes: 2 additions & 0 deletions pkg/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ func Test_defaultConfig(t *testing.T) {
require.NotNil(t, cfg)
require.NotEmpty(t, cfg.sessionTimeout)
require.Equal(t, int64(-1), cfg.startOffset)
require.NotNil(t, cfg.messageEncodeFunc)
require.NotNil(t, cfg.messageDecodeFunc)
}
25 changes: 25 additions & 0 deletions pkg/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package kafka

import (
"context"
"errors"
"fmt"

"github.com/Vonage/gosrvlib/pkg/typeutil"
"github.com/segmentio/kafka-go"
"go.uber.org/multierr"
)
Expand All @@ -12,6 +14,9 @@ const (
network = "tcp"
)

// TDecodeFunc is the type of function used to replace the default message decoding function used by ReceiveData().
type TDecodeFunc func(ctx context.Context, msg []byte, data any) error

type consumerClient interface {
ReadMessage(ctx context.Context) (kafka.Message, error)
Close() error
Expand All @@ -34,6 +39,10 @@ func NewConsumer(brokers []string, topic, groupID string, opts ...Option) (*Cons
applyOpt(cfg)
}

if cfg.messageDecodeFunc == nil {
return nil, errors.New("missing message decoding function")
}

params := kafka.ReaderConfig{
Brokers: brokers,
Topic: topic,
Expand Down Expand Up @@ -96,3 +105,19 @@ func (c *Consumer) HealthCheck(ctx context.Context) error {

return fmt.Errorf("unable to connect to Kafka: %w", errors)
}

// DefaultMessageDecodeFunc is the default function to decode a message for ReceiveData().
// The value underlying data must be a pointer to the correct type for the next data item received.
func DefaultMessageDecodeFunc(_ context.Context, msg []byte, data any) error {
return typeutil.ByteDecode(msg, data) //nolint:wrapcheck
}

// ReceiveData retrieves a message from the queue and extract its content in the data.
func (c *Consumer) ReceiveData(ctx context.Context, data any) error {
message, err := c.Receive(ctx)
if err != nil {
return err
}

return c.cfg.messageDecodeFunc(ctx, message, data)
}
109 changes: 109 additions & 0 deletions pkg/kafka/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ func Test_NewConsumer(t *testing.T) {
groupID: "three",
wantErr: true,
},
{
name: "missing decoding function",
brokers: []string{"url1", "url2"},
topic: "topic1",
groupID: "one",
options: []Option{
WithMessageDecodeFunc(nil),
},
wantErr: true,
},
}

for _, tt := range testCases {
Expand Down Expand Up @@ -144,3 +154,102 @@ func Test_Consumer_HealthCheck(t *testing.T) {
err = consumer.HealthCheck(ctx)
require.NoError(t, err)
}

type consumerMock struct {
readMessage func(ctx context.Context) (kafka.Message, error)
close func() error
}

func (c consumerMock) ReadMessage(ctx context.Context) (kafka.Message, error) {
return c.readMessage(ctx)
}

func (c consumerMock) Close() error {
return c.close()
}

func TestReceiveData(t *testing.T) {
t.Parallel()

type TestData struct {
Alpha string
Beta int
}

tests := []struct {
name string
mock consumerClient
data TestData
wantErr bool
}{
{
name: "success",
mock: consumerMock{
readMessage: func(_ context.Context) (kafka.Message, error) {
return kafka.Message{
Value: []byte("Kf+BAwEBCFRlc3REYXRhAf+CAAECAQVBbHBoYQEMAAEEQmV0YQEEAAAAD/+CAQZhYmMxMjMB/gLtAA=="),
}, nil
},
close: func() error { return nil },
},
data: TestData{Alpha: "abc123", Beta: -375},
wantErr: false,
},
{
name: "empty",
mock: consumerMock{
readMessage: func(_ context.Context) (kafka.Message, error) { return kafka.Message{Value: []byte{}}, nil },
close: func() error { return nil },
},
wantErr: true,
},
{
name: "error",
mock: consumerMock{
readMessage: func(_ context.Context) (kafka.Message, error) { return kafka.Message{}, errors.New("error") },
close: func() error { return nil },
},
wantErr: true,
},
{
name: "invalid message",
mock: consumerMock{
readMessage: func(_ context.Context) (kafka.Message, error) {
return kafka.Message{
Value: []byte("你好世界"),
}, nil
},
close: func() error { return nil },
},
wantErr: true,
},
}

for _, tt := range tests {
tt := tt

t.Run(tt.name, func(t *testing.T) {
t.Parallel()

ctx := context.TODO()
cli, err := NewConsumer([]string{"url1", "url2"}, "topic", "groupID")
require.NoError(t, err)
require.NotNil(t, cli)

cli.client = tt.mock

var data TestData

err = cli.ReceiveData(ctx, &data)
if tt.wantErr {
require.Error(t, err)

return
}

require.NoError(t, err)
require.Equal(t, tt.data.Alpha, data.Alpha)
require.Equal(t, tt.data.Beta, data.Beta)
})
}
}
17 changes: 17 additions & 0 deletions pkg/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,20 @@ func WithFirstOffset() Option {
c.startOffset = kafka.FirstOffset
}
}

// WithMessageEncodeFunc allow to replace DefaultMessageEncodeFunc.
// This function used by SendData() to encode the input data.
func WithMessageEncodeFunc(f TEncodeFunc) Option {
return func(c *config) {
c.messageEncodeFunc = f
}
}

// WithMessageDecodeFunc allow to replace DefaultMessageDecodeFunc().
// This function used by ReceiveData() to decode a message encoded with messageEncodeFunc to the provided data object.
// The value underlying data must be a pointer to the correct type for the next data item received.
func WithMessageDecodeFunc(f TDecodeFunc) Option {
return func(c *config) {
c.messageDecodeFunc = f
}
}
Loading