Skip to content

Commit fa82ec9

Browse files
committed
fix: update info
Signed-off-by: Eray Ates <[email protected]>
1 parent 8a1ec38 commit fa82ec9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

74 files changed

+1181
-633
lines changed

client.go

+91-35
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"slices"
78
"sync"
89
"time"
910

@@ -28,17 +29,21 @@ type Client struct {
2829
consumerMutex sync.RWMutex
2930
logger Logger
3031

31-
dlqRecord *kgo.Record
32-
hook *hooker
33-
cancel context.CancelFunc
34-
Trigger []func()
32+
dlqRecord *kgo.Record
33+
dlqCheckTrigger func()
34+
dlqRetryAt time.Time
35+
hook *hooker
36+
cancel context.CancelFunc
37+
trigger []func(context.Context)
3538

39+
topicsCheck []string
40+
appName string
3641
// log purpose
3742

38-
Brokers []string
39-
DLQTopics []string
40-
Topics []string
41-
Meter Meter
43+
brokers []string
44+
dlqTopics []string
45+
topics []string
46+
meter Meter
4247
}
4348

4449
func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) {
@@ -76,8 +81,10 @@ func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) {
7681
consumerConfig: o.ConsumerConfig,
7782
logger: o.Logger,
7883
clientID: []byte(o.ClientID),
79-
Meter: o.Meter,
80-
hook: &hooker{},
84+
meter: o.Meter,
85+
hook: &hooker{
86+
ctx: context.Background(),
87+
},
8188
}
8289

8390
kgoClient, err := newClient(c, cfg, &o, false)
@@ -123,13 +130,16 @@ func New(ctx context.Context, cfg Config, opts ...Option) (*Client, error) {
123130
}
124131
}
125132

126-
c.Brokers = cfg.Brokers
133+
c.brokers = cfg.Brokers
127134

128135
ctx, cancel := context.WithCancel(ctx)
129136
c.cancel = cancel
130137

138+
c.topicsCheck = slices.Concat(c.topics, c.dlqTopics)
139+
c.appName = o.AppName
140+
131141
for name, p := range o.Plugin.holder {
132-
if err := p(ctx, c, cfg.Plugin[name]); err != nil {
142+
if err := p(ctx, c, cfg.Plugins[name]); err != nil {
133143
return nil, fmt.Errorf("plugin %s: %w", name, err)
134144
}
135145
}
@@ -223,10 +233,10 @@ func newClient(c *Client, cfg Config, o *options, isDLQ bool) (*kgo.Client, erro
223233
}
224234

225235
kgoOpt = append(kgoOpt, kgo.ConsumeTopics(topics...))
226-
c.DLQTopics = topics
236+
c.dlqTopics = topics
227237
} else {
228238
kgoOpt = append(kgoOpt, kgo.ConsumeTopics(o.ConsumerConfig.Topics...))
229-
c.Topics = o.ConsumerConfig.Topics
239+
c.topics = o.ConsumerConfig.Topics
230240
}
231241
}
232242

@@ -278,7 +288,7 @@ func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...Opt
278288
o := optionConsumer{
279289
Client: c,
280290
ConsumerConfig: c.consumerConfig,
281-
Meter: c.Meter,
291+
Meter: c.meter,
282292
}
283293

284294
opts = append([]OptionConsumer{OptionConsumer(callback)}, opts...)
@@ -295,9 +305,9 @@ func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...Opt
295305
if c.KafkaDLQ == nil {
296306
c.hook.setCtx(ctx)
297307

298-
c.logger.Info("wkafka start consuming", "topics", c.Topics)
308+
c.logger.Info("wkafka start consuming", "topics", c.topics)
299309
if err := o.Consumer.Consume(ctx, c.Kafka); err != nil {
300-
return fmt.Errorf("failed to consume %v: %w", c.Topics, err)
310+
return fmt.Errorf("failed to consume %v: %w", c.topics, err)
301311
}
302312

303313
return nil
@@ -311,18 +321,18 @@ func (c *Client) Consume(ctx context.Context, callback CallBackFunc, opts ...Opt
311321
c.hook.setCtx(ctx)
312322

313323
g.Go(func() error {
314-
c.logger.Info("wkafka start consuming", "topics", c.Topics)
324+
c.logger.Info("wkafka start consuming", "topics", c.topics)
315325
if err := o.Consumer.Consume(ctx, c.Kafka); err != nil {
316-
return fmt.Errorf("failed to consume %v: %w", c.Topics, err)
326+
return fmt.Errorf("failed to consume %v: %w", c.topics, err)
317327
}
318328

319329
return nil
320330
})
321331

322332
g.Go(func() error {
323-
c.logger.Info("wkafka start consuming DLQ", "topics", c.DLQTopics)
333+
c.logger.Info("wkafka start consuming DLQ", "topics", c.dlqTopics)
324334
if err := o.ConsumerDLQ.Consume(ctx, c.KafkaDLQ); err != nil {
325-
return fmt.Errorf("failed to consume DLQ %v: %w", c.Topics, err)
335+
return fmt.Errorf("failed to consume DLQ %v: %w", c.topics, err)
326336
}
327337

328338
return nil
@@ -351,22 +361,38 @@ func (c *Client) Admin() *kadm.Client {
351361
return kadm.NewClient(c.Kafka)
352362
}
353363

354-
// Skip for modifying skip configuration in runtime.
355-
// - Useful for DLQ topic.
356-
// - Don't wait inside the modify function.
357-
func (c *Client) Skip(modify func(SkipMap) SkipMap) {
364+
func (c *Client) modifySkip(modify func(SkipMap) SkipMap) {
358365
c.consumerMutex.Lock()
359366
defer c.consumerMutex.Unlock()
360367

368+
newSkip := modify(cloneSkip(c.consumerConfig.Skip))
369+
370+
// eliminate not related topics
371+
for topic := range newSkip {
372+
if !slices.Contains(c.topicsCheck, topic) {
373+
delete(newSkip, topic)
374+
}
375+
}
376+
377+
c.consumerConfig.Skip = newSkip
378+
}
379+
380+
// Skip for modifying skip configuration in runtime.
381+
// - Useful for DLQ topic.
382+
// - Don't wait inside the modify function.
383+
func (c *Client) Skip(ctx context.Context, modify func(SkipMap) SkipMap) {
361384
if modify == nil {
362385
return
363386
}
364387

365-
c.consumerConfig.Skip = modify(c.consumerConfig.Skip)
388+
c.modifySkip(modify)
366389

367-
c.callTrigger()
390+
c.callTrigger(ctx)
391+
if c.dlqCheckTrigger != nil {
392+
c.dlqCheckTrigger()
393+
}
368394

369-
c.logger.Debug("wkafka skip modified", "skip", c.consumerConfig.Skip)
395+
c.logger.Info("wkafka skip modified", "skip", c.consumerConfig.Skip)
370396
}
371397

372398
// SkipCheck returns skip configuration's deep clone.
@@ -381,27 +407,57 @@ func (c *Client) ClientID() []byte {
381407
return c.clientID
382408
}
383409

384-
// SetDLQRecord to set stucked DLQRecord.
410+
// setDLQRecord to set stucked DLQRecord.
385411
// - Using in DLQ iteration.
386-
func (c *Client) setDLQRecord(r *kgo.Record) {
412+
func (c *Client) setDLQRecord(r *kgo.Record, t time.Time) {
387413
c.dlqRecord = r
414+
c.dlqRetryAt = t
415+
}
388416

389-
c.callTrigger()
417+
// DLQRetryAt returns stucked DLQRecord's retry time.
418+
// - Using in DLQ iteration only if DLQRecord is not nil.
419+
func (c *Client) DLQRetryAt() time.Time {
420+
return c.dlqRetryAt
390421
}
391422

392423
// DLQRecord returns stucked DLQRecord if exists.
424+
// - Warning: return pointer and not modify it.
393425
func (c *Client) DLQRecord() *kgo.Record {
394426
return c.dlqRecord
395427
}
396428

397-
func (c *Client) callTrigger() {
429+
func (c *Client) callTrigger(ctx context.Context) {
398430
go func() {
399-
for _, t := range c.Trigger {
400-
t()
431+
for _, t := range c.trigger {
432+
if ctx.Err() != nil {
433+
return
434+
}
435+
436+
t(ctx)
401437
}
402438
}()
403439
}
404440

405-
func (c *Client) GetLogger() Logger {
441+
func (c *Client) AddTrigger(t func(context.Context)) {
442+
c.trigger = append(c.trigger, t)
443+
}
444+
445+
func (c *Client) Logger() Logger {
406446
return c.logger
407447
}
448+
449+
func (c *Client) Topics() []string {
450+
return c.topics
451+
}
452+
453+
func (c *Client) Brokers() []string {
454+
return c.brokers
455+
}
456+
457+
func (c *Client) DLQTopics() []string {
458+
return c.dlqTopics
459+
}
460+
461+
func (c *Client) AppName() string {
462+
return c.appName
463+
}

clientoptions.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,8 @@ func WithPingBackoff(b backoff.BackOff) Option {
151151
}
152152
}
153153

154-
func WithPlugin(name string, fn PluginFunc) Option {
154+
func WithPlugin[T any](name string, fn PluginFunc[T]) Option {
155155
return func(o *options) {
156-
o.Plugin.Add(name, fn)
156+
o.Plugin.Add(name, pluginConvert(fn))
157157
}
158158
}

config.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ type Config struct {
3030
// Consumer is a pre configuration for consumer and validation.
3131
Consumer ConsumerPreConfig `cfg:"consumer" json:"consumer"`
3232

33-
// Plugin add custom plugins to the client like handler.
34-
Plugin map[string]interface{} `cfg:"plugin" json:"plugin"`
33+
// Plugins add custom plugins to the client like handler.
34+
Plugins map[string]interface{} `cfg:"plugins" json:"plugins"`
3535
}
3636

3737
type ConsumerPreConfig struct {

consumer.go

+26-11
Original file line numberDiff line numberDiff line change
@@ -168,17 +168,22 @@ func WithCallbackBatch[T any](fn func(ctx context.Context, msg []T) error) CallB
168168
return nil
169169
}
170170

171+
processDLQ := newDLQProcess(
172+
&customer,
173+
o.Client.partitionHandlerDLQ.IsRevokedRecord,
174+
o.Client.setDLQRecord,
175+
o.Client.callTrigger,
176+
dlqProcessBatch(fn))
177+
171178
o.ConsumerDLQ = &consumerBatch[T]{
172179
customer: &customer,
173180
IsDLQ: true,
174181
PartitionHandler: o.Client.partitionHandlerDLQ,
175-
DLQProcess: newDLQProcess(
176-
&customer,
177-
o.Client.partitionHandlerDLQ.IsRevokedRecord,
178-
o.Client.setDLQRecord,
179-
dlqProcessBatch(fn)),
182+
DLQProcess: processDLQ,
180183
}
181184

185+
o.Client.dlqCheckTrigger = processDLQ.Trigger
186+
182187
return nil
183188
}
184189
}
@@ -209,17 +214,22 @@ func WithCallback[T any](fn func(ctx context.Context, msg T) error) CallBackFunc
209214
return nil
210215
}
211216

217+
processDLQ := newDLQProcess(
218+
&customer,
219+
o.Client.partitionHandlerDLQ.IsRevokedRecord,
220+
o.Client.setDLQRecord,
221+
o.Client.callTrigger,
222+
fn)
223+
212224
o.ConsumerDLQ = &consumerSingle[T]{
213225
customer: &customer,
214226
PartitionHandler: o.Client.partitionHandlerDLQ,
215227
IsDLQ: true,
216-
DLQProcess: newDLQProcess(
217-
&customer,
218-
o.Client.partitionHandlerDLQ.IsRevokedRecord,
219-
o.Client.setDLQRecord,
220-
fn),
228+
DLQProcess: processDLQ,
221229
}
222230

231+
o.Client.dlqCheckTrigger = processDLQ.Trigger
232+
223233
return nil
224234
}
225235
}
@@ -309,9 +319,14 @@ func cloneSkip(skip SkipMap) SkipMap {
309319
offsets := make([]int64, len(offset.Offsets))
310320
copy(offsets, offset.Offsets)
311321

322+
var before *int64
323+
if offset.Before != nil {
324+
before = ToPtr(*offset.Before)
325+
}
326+
312327
skipNew[topic][partition] = OffsetConfig{
313328
Offsets: offsets,
314-
Before: offset.Before,
329+
Before: before,
315330
}
316331
}
317332
}

0 commit comments

Comments
 (0)