Skip to content

Commit e00041e

Browse files
Calyhreleibale
andauthored
Fix: XAUTOCLAIM after a TRIM with pending messages returns nil (#2565)
* fix(client): XCLAIM & XAUTOCLAIM after a TRIM might return nils * fix(client): Fix race condition in specs * revert test utils changes * make tests faster --------- Co-authored-by: Leibale Eidelman <[email protected]>
1 parent 4ec97be commit e00041e

File tree

6 files changed

+154
-27
lines changed

6 files changed

+154
-27
lines changed

packages/client/lib/commands/XAUTOCLAIM.spec.ts

+68-12
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,76 @@ describe('XAUTOCLAIM', () => {
2323
});
2424
});
2525

26-
testUtils.testWithClient('client.xAutoClaim', async client => {
27-
await Promise.all([
28-
client.xGroupCreate('key', 'group', '$', {
29-
MKSTREAM: true
30-
}),
26+
testUtils.testWithClient('client.xAutoClaim without messages', async client => {
27+
const [,, reply] = await Promise.all([
28+
client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }),
3129
client.xGroupCreateConsumer('key', 'group', 'consumer'),
30+
client.xAutoClaim('key', 'group', 'consumer', 1, '0-0')
3231
]);
3332

34-
assert.deepEqual(
35-
await client.xAutoClaim('key', 'group', 'consumer', 1, '0-0'),
36-
{
37-
nextId: '0-0',
38-
messages: []
39-
}
40-
);
33+
assert.deepEqual(reply, {
34+
nextId: '0-0',
35+
messages: []
36+
});
37+
}, GLOBAL.SERVERS.OPEN);
38+
39+
testUtils.testWithClient('client.xAutoClaim with messages', async client => {
40+
const [,, id,, reply] = await Promise.all([
41+
client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }),
42+
client.xGroupCreateConsumer('key', 'group', 'consumer'),
43+
client.xAdd('key', '*', { foo: 'bar' }),
44+
client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }),
45+
client.xAutoClaim('key', 'group', 'consumer', 0, '0-0')
46+
]);
47+
48+
assert.deepEqual(reply, {
49+
nextId: '0-0',
50+
messages: [{
51+
id,
52+
message: Object.create(null, {
53+
foo: {
54+
value: 'bar',
55+
configurable: true,
56+
enumerable: true
57+
}
58+
})
59+
}]
60+
});
61+
}, GLOBAL.SERVERS.OPEN);
62+
63+
testUtils.testWithClient('client.xAutoClaim with trimmed messages', async client => {
64+
const [,,,,, id,, reply] = await Promise.all([
65+
client.xGroupCreate('key', 'group', '$', { MKSTREAM: true }),
66+
client.xGroupCreateConsumer('key', 'group', 'consumer'),
67+
client.xAdd('key', '*', { foo: 'bar' }),
68+
client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }),
69+
client.xTrim('key', 'MAXLEN', 0),
70+
client.xAdd('key', '*', { bar: 'baz' }),
71+
client.xReadGroup('group', 'consumer', { key: 'key', id: '>' }),
72+
client.xAutoClaim('key', 'group', 'consumer', 0, '0-0')
73+
]);
74+
75+
assert.deepEqual(reply, {
76+
nextId: '0-0',
77+
messages: testUtils.isVersionGreaterThan([7, 0]) ? [{
78+
id,
79+
message: Object.create(null, {
80+
bar: {
81+
value: 'baz',
82+
configurable: true,
83+
enumerable: true
84+
}
85+
})
86+
}] : [null, {
87+
id,
88+
message: Object.create(null, {
89+
bar: {
90+
value: 'baz',
91+
configurable: true,
92+
enumerable: true
93+
}
94+
})
95+
}]
96+
});
4197
}, GLOBAL.SERVERS.OPEN);
4298
});

packages/client/lib/commands/XAUTOCLAIM.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { RedisCommandArgument, RedisCommandArguments } from '.';
2-
import { StreamMessagesReply, transformStreamMessagesReply } from './generic-transformers';
2+
import { StreamMessagesNullReply, transformStreamMessagesNullReply } from './generic-transformers';
33

44
export const FIRST_KEY_INDEX = 1;
55

@@ -28,12 +28,12 @@ type XAutoClaimRawReply = [RedisCommandArgument, Array<any>];
2828

2929
interface XAutoClaimReply {
3030
nextId: RedisCommandArgument;
31-
messages: StreamMessagesReply;
31+
messages: StreamMessagesNullReply;
3232
}
3333

3434
export function transformReply(reply: XAutoClaimRawReply): XAutoClaimReply {
3535
return {
3636
nextId: reply[0],
37-
messages: transformStreamMessagesReply(reply[1])
37+
messages: transformStreamMessagesNullReply(reply[1])
3838
};
3939
}

packages/client/lib/commands/XCLAIM.spec.ts

+31-1
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,38 @@ describe('XCLAIM', () => {
8383
});
8484

8585
assert.deepEqual(
86-
await client.xClaim('key', 'group', 'consumer', 1, '0-0'),
86+
await client.xClaim('key', 'group', 'consumer', 0, '0-0'),
8787
[]
8888
);
8989
}, GLOBAL.SERVERS.OPEN);
90+
91+
testUtils.testWithClient('client.xClaim with a message', async client => {
92+
await client.xGroupCreate('key', 'group', '$', { MKSTREAM: true });
93+
const id = await client.xAdd('key', '*', { foo: 'bar' });
94+
await client.xReadGroup('group', 'consumer', { key: 'key', id: '>' });
95+
96+
assert.deepEqual(
97+
await client.xClaim('key', 'group', 'consumer', 0, id),
98+
[{
99+
id,
100+
message: Object.create(null, { 'foo': {
101+
value: 'bar',
102+
configurable: true,
103+
enumerable: true
104+
} })
105+
}]
106+
);
107+
}, GLOBAL.SERVERS.OPEN);
108+
109+
testUtils.testWithClient('client.xClaim with a trimmed message', async client => {
110+
await client.xGroupCreate('key', 'group', '$', { MKSTREAM: true });
111+
const id = await client.xAdd('key', '*', { foo: 'bar' });
112+
await client.xReadGroup('group', 'consumer', { key: 'key', id: '>' });
113+
await client.xTrim('key', 'MAXLEN', 0);
114+
115+
assert.deepEqual(
116+
await client.xClaim('key', 'group', 'consumer', 0, id),
117+
testUtils.isVersionGreaterThan([7, 0]) ? []: [null]
118+
);
119+
}, GLOBAL.SERVERS.OPEN);
90120
});

packages/client/lib/commands/XCLAIM.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,4 @@ export function transformArguments(
4545
return args;
4646
}
4747

48-
export { transformStreamMessagesReply as transformReply } from './generic-transformers';
48+
export { transformStreamMessagesNullReply as transformReply } from './generic-transformers';

packages/client/lib/commands/generic-transformers.spec.ts

+33
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
transformStringNumberInfinityArgument,
1010
transformTuplesReply,
1111
transformStreamMessagesReply,
12+
transformStreamMessagesNullReply,
1213
transformStreamsMessagesReply,
1314
transformSortedSetWithScoresReply,
1415
pushGeoCountArgument,
@@ -219,6 +220,38 @@ describe('Generic Transformers', () => {
219220
);
220221
});
221222

223+
it('transformStreamMessagesNullReply', () => {
224+
assert.deepEqual(
225+
transformStreamMessagesNullReply([null, ['0-0', ['0key', '0value']]]),
226+
[null, {
227+
id: '0-0',
228+
message: Object.create(null, {
229+
'0key': {
230+
value: '0value',
231+
configurable: true,
232+
enumerable: true
233+
}
234+
})
235+
}]
236+
);
237+
});
238+
239+
it('transformStreamMessagesNullReply', () => {
240+
assert.deepEqual(
241+
transformStreamMessagesNullReply([null, ['0-1', ['11key', '11value']]]),
242+
[null, {
243+
id: '0-1',
244+
message: Object.create(null, {
245+
'11key': {
246+
value: '11value',
247+
configurable: true,
248+
enumerable: true
249+
}
250+
})
251+
}]
252+
);
253+
});
254+
222255
describe('transformStreamsMessagesReply', () => {
223256
it('null', () => {
224257
assert.equal(

packages/client/lib/commands/generic-transformers.ts

+18-10
Original file line numberDiff line numberDiff line change
@@ -92,19 +92,27 @@ export interface StreamMessageReply {
9292
message: Record<string, RedisCommandArgument>;
9393
}
9494

95-
export type StreamMessagesReply = Array<StreamMessageReply>;
95+
export function transformStreamMessageReply([id, message]: Array<any>): StreamMessageReply {
96+
return {
97+
id,
98+
message: transformTuplesReply(message)
99+
};
100+
}
96101

97-
export function transformStreamMessagesReply(reply: Array<any>): StreamMessagesReply {
98-
const messages = [];
102+
export function transformStreamMessageNullReply(reply: Array<any>): StreamMessageReply | null {
103+
if (reply === null) return null;
104+
return transformStreamMessageReply(reply);
105+
}
99106

100-
for (const [id, message] of reply) {
101-
messages.push({
102-
id,
103-
message: transformTuplesReply(message)
104-
});
105-
}
106107

107-
return messages;
108+
export type StreamMessagesReply = Array<StreamMessageReply>;
109+
export function transformStreamMessagesReply(reply: Array<any>): StreamMessagesReply {
110+
return reply.map(transformStreamMessageReply);
111+
}
112+
113+
export type StreamMessagesNullReply = Array<StreamMessageReply | null>;
114+
export function transformStreamMessagesNullReply(reply: Array<any>): StreamMessagesNullReply {
115+
return reply.map(transformStreamMessageNullReply);
108116
}
109117

110118
export type StreamsMessagesReply = Array<{

0 commit comments

Comments
 (0)