Skip to content

Commit 8dde48b

Browse files
committed
fix: retry DLQ
Signed-off-by: Eray Ates <[email protected]>
1 parent 68592cf commit 8dde48b

18 files changed

+362
-157
lines changed

client.go

+27-18
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@ type Client struct {
2929
consumerMutex sync.RWMutex
3030
logger Logger
3131

32-
dlqRecord *kgo.Record
33-
dlqCheckTrigger func()
34-
dlqRetryAt time.Time
35-
hook *hooker
36-
cancel context.CancelFunc
37-
trigger []func(context.Context)
32+
dlqRecord DLQRecord
33+
dlqRetryTrigger func(opts []OptionDLQTriggerFn)
34+
dlqMutex sync.RWMutex
35+
36+
hook *hooker
37+
cancel context.CancelFunc
38+
trigger []func(context.Context)
3839

3940
topicsCheck []string
4041
appName string
@@ -388,8 +389,8 @@ func (c *Client) Skip(ctx context.Context, modify func(SkipMap) SkipMap) {
388389
c.modifySkip(modify)
389390

390391
c.callTrigger(ctx)
391-
if c.dlqCheckTrigger != nil {
392-
c.dlqCheckTrigger()
392+
if c.dlqRetryTrigger != nil {
393+
c.dlqRetryTrigger(nil)
393394
}
394395

395396
c.logger.Info("wkafka skip modified", "skip", c.consumerConfig.Skip)
@@ -409,23 +410,31 @@ func (c *Client) ClientID() []byte {
409410

410411
// setDLQRecord to set stucked DLQRecord.
411412
// - Using in DLQ iteration.
412-
func (c *Client) setDLQRecord(r *kgo.Record, t time.Time) {
413-
c.dlqRecord = r
414-
c.dlqRetryAt = t
415-
}
413+
func (c *Client) setDLQRecord(r *kgo.Record, t time.Time, err error) {
414+
c.dlqMutex.Lock()
415+
defer c.dlqMutex.Unlock()
416416

417-
// DLQRetryAt returns stucked DLQRecord's retry time.
418-
// - Using in DLQ iteration only if DLQRecord is not nil.
419-
func (c *Client) DLQRetryAt() time.Time {
420-
return c.dlqRetryAt
417+
c.dlqRecord.Err = err
418+
c.dlqRecord.Record = r
419+
c.dlqRecord.RetryAt = t
421420
}
422421

423422
// DLQRecord returns stucked DLQRecord if exists.
424-
// - Warning: return pointer and not modify it.
425-
func (c *Client) DLQRecord() *kgo.Record {
423+
// - Warning: return pointer record and not modify it.
424+
func (c *Client) DLQRecord() DLQRecord {
425+
c.dlqMutex.RLock()
426+
defer c.dlqMutex.RUnlock()
427+
426428
return c.dlqRecord
427429
}
428430

431+
// DLQRetry to trigger DLQ retry and not wait sleep.
432+
func (c *Client) DLQRetry(opts ...OptionDLQTriggerFn) {
433+
if c.dlqRetryTrigger != nil {
434+
c.dlqRetryTrigger(opts)
435+
}
436+
}
437+
429438
func (c *Client) callTrigger(ctx context.Context) {
430439
go func() {
431440
for _, t := range c.trigger {

consumer.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func WithCallbackBatch[T any](fn func(ctx context.Context, msg []T) error) CallB
182182
DLQProcess: processDLQ,
183183
}
184184

185-
o.Client.dlqCheckTrigger = processDLQ.Trigger
185+
o.Client.dlqRetryTrigger = processDLQ.Trigger
186186

187187
return nil
188188
}
@@ -228,7 +228,7 @@ func WithCallback[T any](fn func(ctx context.Context, msg T) error) CallBackFunc
228228
DLQProcess: processDLQ,
229229
}
230230

231-
o.Client.dlqCheckTrigger = processDLQ.Trigger
231+
o.Client.dlqRetryTrigger = processDLQ.Trigger
232232

233233
return nil
234234
}

dlq.go

+66-9
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,25 @@ import (
1313
type DLQRecord struct {
1414
Record *kgo.Record
1515
RetryAt time.Time
16+
Err error
1617
}
1718

1819
type dlqProcess[T any] struct {
1920
customer *customer[T]
2021

2122
isRevokedRecord func(r *kgo.Record) bool
22-
setDLQRecord func(r *kgo.Record, t time.Time)
23+
setDLQRecord func(r *kgo.Record, t time.Time, err error)
2324
callTrigger func(ctx context.Context)
2425
processDLQ func(ctx context.Context, msg T) error
2526

26-
checkFunc func()
27+
checkFunc func(opts []OptionDLQTriggerFn)
2728
checkFuncMutex sync.Mutex
2829
}
2930

3031
func newDLQProcess[T any](
3132
c *customer[T],
3233
isRevokedRecord func(r *kgo.Record) bool,
33-
dlqRecord func(r *kgo.Record, t time.Time),
34+
dlqRecord func(r *kgo.Record, t time.Time, err error),
3435
callTrigger func(ctx context.Context),
3536
processDLQ func(ctx context.Context, msg T) error,
3637
) *dlqProcess[T] {
@@ -86,7 +87,7 @@ func (d *dlqProcess[T]) Iteration(ctx context.Context, r *kgo.Record) error {
8687

8788
firstIteration := true
8889
defer func() {
89-
d.setDLQRecord(nil, time.Time{})
90+
d.setDLQRecord(nil, time.Time{}, nil)
9091
d.setCheckFunc(nil)
9192
d.callTrigger(ctx)
9293
}()
@@ -105,18 +106,40 @@ func (d *dlqProcess[T]) Iteration(ctx context.Context, r *kgo.Record) error {
105106
if err := d.iterationRecordDLQ(ctx, r); err != nil {
106107
errOrg, ok := IsDQLError(err)
107108
var errWrapped error
109+
var errOrgDefault error
108110
if ok {
111+
errOrgDefault = errOrg.Err
109112
errWrapped = wrapErr(r, errOrg.Err, true)
110113
} else {
114+
errOrgDefault = err
111115
errWrapped = wrapErr(r, err, true)
112116
}
113117

114118
d.customer.Logger.Error("DLQ process failed", "error", errWrapped, "retry_interval", wait.CurrentInterval().Truncate(time.Second).String())
115119

116-
d.setDLQRecord(r, time.Now().Add(wait.CurrentInterval()))
120+
d.setDLQRecord(r, time.Now().Add(wait.CurrentInterval()), errOrgDefault)
117121

118122
if firstIteration {
119-
d.setCheckFunc(func() {
123+
d.setCheckFunc(func(opts []OptionDLQTriggerFn) {
124+
o := &OptionDLQTrigger{}
125+
for _, opt := range opts {
126+
opt(o)
127+
}
128+
129+
if o.Force {
130+
wait.Trigger()
131+
132+
return
133+
}
134+
135+
if o.Specs != nil {
136+
if r.Topic == o.Specs.Topic && r.Partition == o.Specs.Partition && r.Offset == o.Specs.Offset {
137+
wait.Trigger()
138+
139+
return
140+
}
141+
}
142+
120143
if d.customer.Skip(d.customer.Cfg, r) {
121144
wait.Trigger()
122145
}
@@ -140,18 +163,52 @@ func (d *dlqProcess[T]) Iteration(ctx context.Context, r *kgo.Record) error {
140163
return nil
141164
}
142165

143-
func (d *dlqProcess[T]) Trigger() {
166+
func (d *dlqProcess[T]) Trigger(opts []OptionDLQTriggerFn) {
144167
d.checkFuncMutex.Lock()
145168
defer d.checkFuncMutex.Unlock()
146169

147170
if d.checkFunc != nil {
148-
d.checkFunc()
171+
d.checkFunc(opts)
149172
}
150173
}
151174

152-
func (d *dlqProcess[T]) setCheckFunc(fn func()) {
175+
func (d *dlqProcess[T]) setCheckFunc(fn func(opts []OptionDLQTriggerFn)) {
153176
d.checkFuncMutex.Lock()
154177
defer d.checkFuncMutex.Unlock()
155178

156179
d.checkFunc = fn
157180
}
181+
182+
// ////////////////////////////////////////////////////////////////////////////
183+
184+
type DLQTriggerSpecs struct {
185+
Topic string `cfg:"topic" json:"topic"`
186+
Partition int32 `cfg:"partition" json:"partition"`
187+
Offset int64 `cfg:"offset" json:"offset"`
188+
}
189+
190+
type OptionDLQTrigger struct {
191+
Force bool `cfg:"force" json:"force"`
192+
Specs *DLQTriggerSpecs `cfg:"specs" json:"specs"`
193+
}
194+
195+
func (o *OptionDLQTrigger) ToOption() OptionDLQTriggerFn {
196+
return func(opt *OptionDLQTrigger) {
197+
opt.Force = o.Force
198+
opt.Specs = o.Specs
199+
}
200+
}
201+
202+
type OptionDLQTriggerFn func(*OptionDLQTrigger)
203+
204+
func WithForceDLQTrigger() OptionDLQTriggerFn {
205+
return func(o *OptionDLQTrigger) {
206+
o.Force = true
207+
}
208+
}
209+
210+
func WithDLQTriggerSpecs(specs *DLQTriggerSpecs) OptionDLQTriggerFn {
211+
return func(o *OptionDLQTrigger) {
212+
o.Specs = specs
213+
}
214+
}

plugins/handler/_ui/dist/assets/index-CnR-J8Dz.js plugins/handler/_ui/dist/assets/index-Bv-H8zf2.js

+105-105
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

plugins/handler/_ui/dist/assets/index-COF3pdLZ.css

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

plugins/handler/_ui/dist/assets/index-Cd4AzzrC.css

-1
This file was deleted.

plugins/handler/_ui/dist/index.html

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
<!doctype html><html lang="en"><head><meta charset="utf-8"/><meta name="viewport" content="width=device-width,initial-scale=1"/><link rel="icon" type="image/svg+xml" href="./logo.svg"/><title>wkafka</title><script type="module" crossorigin src="./assets/index-CnR-J8Dz.js"></script><link rel="stylesheet" crossorigin href="./assets/index-Cd4AzzrC.css"></head><body></body></html>
1+
<!doctype html><html lang="en"><head><meta charset="utf-8"/><meta name="viewport" content="width=device-width,initial-scale=1"/><link rel="icon" type="image/svg+xml" href="./logo.svg"/><title>wkafka</title><script type="module" crossorigin src="./assets/index-Bv-H8zf2.js"></script><link rel="stylesheet" crossorigin href="./assets/index-COF3pdLZ.css"></head><body></body></html>

plugins/handler/_ui/src/components/Skip.svelte plugins/handler/_ui/src/components/Action.svelte

+12-1
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
<script lang="ts">
2-
import { skip } from "@/helper/api";
2+
import { retry, skip } from "@/helper/api";
33
44
const skipFunc = () => {
55
skip(topic, partition, offset);
66
};
77
8+
const retryFunc = () => {
9+
retry(topic, partition, offset);
10+
};
11+
812
export let topic: string;
913
export let partition: number;
1014
export let offset: number;
@@ -16,3 +20,10 @@
1620
>
1721
Skip
1822
</button>
23+
24+
<button
25+
class="bg-gray-600 hover:bg-yellow-500 text-white font-bold px-10 border-t border-b border-black"
26+
on:click|preventDefault|stopPropagation={retryFunc}
27+
>
28+
Retry
29+
</button>

plugins/handler/_ui/src/components/Counter.svelte

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
});
1313
1414
const startTimer = (dt: string) => {
15+
clearInterval(timer);
16+
1517
let end = Date.parse(dt);
1618
1719
timer = countDownTimer(end, (v: string) => {

plugins/handler/_ui/src/components/Info.svelte

+5-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<script lang="ts">
22
import { storeInfo } from "@/store/store";
33
import TreeView from "svelte-tree-view";
4-
import Skip from "./Skip.svelte";
4+
import Action from "./Action.svelte";
55
import { getField, getFieldWithDecode } from "@/helper/codec";
66
import View from "./View.svelte";
77
import type { Info } from "@/store/model";
@@ -66,13 +66,15 @@
6666
<View value64={value.dlq_record.value} title="Record" />
6767
<View
6868
value64={getField("error", value.dlq_record.headers)}
69-
title="Error"
69+
title="Record Error"
7070
/>
71+
<hr class="my-2" />
72+
<View value={value.error} title="Process Error" />
7173
</div>
7274
</div>
7375
<div class="flex flex-row justify-between items-baseline">
7476
<div class="mt-2">
75-
<Skip
77+
<Action
7678
topic={value.dlq_record.topic}
7779
partition={value.dlq_record.partition}
7880
offset={value.dlq_record.offset}

plugins/handler/_ui/src/components/Record.svelte

-11
This file was deleted.

plugins/handler/_ui/src/components/View.svelte

+5-2
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,19 @@
55
66
export let title = "";
77
export let value64 = "";
8+
export let value = "";
89
910
let toggle = false;
10-
let value = "";
1111
let jsonValue: unknown;
1212
let className = "";
1313
1414
export { className as class };
1515
1616
onMount(() => {
17-
value = base64ToStr(value64);
17+
if (value64 != "") {
18+
value = base64ToStr(value64);
19+
}
20+
1821
jsonValue = parseJSON(value);
1922
2023
if (!!jsonValue) {

plugins/handler/_ui/src/helper/api.ts

+23
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import axios from 'axios';
66
const endpoints = {
77
info: '../v1/info',
88
skip: '../v1/skip',
9+
retryDLQ: '../v1/retry-dlq',
910
event: '../v1/event',
1011
}
1112

@@ -59,3 +60,25 @@ export const skip = async (topic: string, partition: number, offset: number) =>
5960
addToast('skip request failed', 'alert');
6061
}
6162
}
63+
64+
65+
export const retry = async (topic: string, partition: number, offset: number) => {
66+
try {
67+
await axios.post(endpoints.retryDLQ, {
68+
"specs": {
69+
"topic": topic,
70+
"partition": partition,
71+
"offset": offset
72+
}
73+
}, {
74+
headers: {
75+
'Content-Type': 'application/json'
76+
}
77+
});
78+
79+
addToast('retry request sent');
80+
} catch (error) {
81+
console.error(error);
82+
addToast('retry request failed', 'alert');
83+
}
84+
}

plugins/handler/_ui/src/store/model.ts

+1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export interface Info {
55
skip?: Map<string, Map<number, OffsetConfig>>;
66
dlq_record?: DlqRecord;
77
retry_at?: string;
8+
error?: string;
89

910
updated_at: number;
1011
}

0 commit comments

Comments
 (0)