From eca10b8c5ca8a173a822644a79757f5d29d684d3 Mon Sep 17 00:00:00 2001 From: aleksei-okatiev <64004628+aleksei-okatiev@users.noreply.github.com> Date: Tue, 12 Nov 2024 20:15:35 +0100 Subject: [PATCH 1/2] added loop to the listen method --- Gemfile.lock | 2 +- lib/redis_stream/subscriber.rb | 16 +++++++++------- lib/redis_stream/version.rb | 2 +- spec/redis_stream/subscriber_spec.rb | 20 ++++++++++++++++---- 4 files changed, 27 insertions(+), 13 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index efce514..4db03eb 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - redis_stream (0.1.5) + redis_stream (0.2.0) GEM remote: https://rubygems.org/ diff --git a/lib/redis_stream/subscriber.rb b/lib/redis_stream/subscriber.rb index 633c078..d0a4d61 100644 --- a/lib/redis_stream/subscriber.rb +++ b/lib/redis_stream/subscriber.rb @@ -13,13 +13,15 @@ def self.listen(streams:, group: nil, consumer: nil, &block) create_group(stream_key, group) end - # listen for up to 10 messages forever - ids = Array.new(streams.length, ">") - messages = RedisStream.client.xreadgroup(group, consumer, streams, ids, count: 1, block: 0, noack: true) - - messages.each do |stream, stream_messages| - stream_messages.each do |message_id, message_hash| - yield(stream, message_id, message_hash["name"], JSON.parse(message_hash["json"])) + loop do + # listen for up to 10 messages forever + ids = Array.new(streams.length, ">") + messages = RedisStream.client.xreadgroup(group, consumer, streams, ids, count: 1, block: 0, noack: true) + + messages.each do |stream, stream_messages| + stream_messages.each do |message_id, message_hash| + yield(stream, message_id, message_hash["name"], JSON.parse(message_hash["json"])) + end end end end diff --git a/lib/redis_stream/version.rb b/lib/redis_stream/version.rb index 125fd8d..90f5d77 100644 --- a/lib/redis_stream/version.rb +++ b/lib/redis_stream/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module RedisStream - VERSION = "0.1.5" + VERSION = "0.2.0" end diff --git a/spec/redis_stream/subscriber_spec.rb b/spec/redis_stream/subscriber_spec.rb index 4a192a2..ab32cdd 100644 --- a/spec/redis_stream/subscriber_spec.rb +++ b/spec/redis_stream/subscriber_spec.rb @@ -9,12 +9,24 @@ describe ".listen" do it "subscribes to the stream" do - described_class.listen(streams: "stream_test") do |message| - expect(message).to eq("test") + allow(described_class).to receive(:loop).and_yield.twice + + allow(RedisStream.client).to receive(:xreadgroup).and_return([ + ["stream_test", [["message_id", {"name" => "test", "json" => "{}"}]]] + ]) + + described_class.listen(streams: "stream_test") do |stream, message_id, name, json| + expect(stream).to eq("stream_test") + expect(message_id).to eq("message_id") + expect(name).to eq("test") + expect(json).to eq({}) end - described_class.listen(streams: ["stream_test"]) do |message| - expect(message).to eq("test") + described_class.listen(streams: ["stream_test"]) do |stream, message_id, name, json| + expect(stream).to eq("stream_test") + expect(message_id).to eq("message_id") + expect(name).to eq("test") + expect(json).to eq({}) end end end From 48e3c8843217bef1f93ae19b2b578c04aa160a1c Mon Sep 17 00:00:00 2001 From: aleksei-okatiev <64004628+aleksei-okatiev@users.noreply.github.com> Date: Tue, 12 Nov 2024 20:26:03 +0100 Subject: [PATCH 2/2] standard fix --- lib/redis_stream/subscriber.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/redis_stream/subscriber.rb b/lib/redis_stream/subscriber.rb index d0a4d61..3de2c2e 100644 --- a/lib/redis_stream/subscriber.rb +++ b/lib/redis_stream/subscriber.rb @@ -17,7 +17,7 @@ def self.listen(streams:, group: nil, consumer: nil, &block) # listen for up to 10 messages forever ids = Array.new(streams.length, ">") messages = RedisStream.client.xreadgroup(group, consumer, streams, ids, count: 1, block: 0, noack: true) - + messages.each do |stream, stream_messages| stream_messages.each do |message_id, message_hash| yield(stream, message_id, message_hash["name"], JSON.parse(message_hash["json"]))