Skip to content

Commit 4f567ec

Browse files
committed
feat(broker): allow to set queue name dynamically when kicking on redis-cluster broker
1 parent 0d62e3b commit 4f567ec

File tree

1 file changed

+4
-2
lines changed

1 file changed

+4
-2
lines changed

taskiq_redis/redis_cluster_broker.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ async def kick(self, message: BrokerMessage) -> None:
5555
5656
:param message: message to append.
5757
"""
58-
await self.redis.lpush(self.queue_name, message.message) # type: ignore
58+
queue_name = message.labels.get("queue_name") or self.queue_name
59+
await self.redis.lpush(queue_name, message.message) # type: ignore
5960

6061
async def listen(self) -> AsyncGenerator[bytes, None]:
6162
"""
@@ -162,8 +163,9 @@ async def kick(self, message: BrokerMessage) -> None:
162163
163164
:param message: message to append.
164165
"""
166+
queue_name = message.labels.get("queue_name") or self.queue_name
165167
await self.redis.xadd(
166-
self.queue_name,
168+
queue_name,
167169
{b"data": message.message},
168170
maxlen=self.maxlen,
169171
approximate=self.approximate,

0 commit comments

Comments
 (0)