Skip to content

Commit

Permalink
Add RedisMessenger::add_stream_with_buffer_size
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Nov 15, 2023
1 parent a6508b4 commit 0fa1e9c
Showing 1 changed file with 28 additions and 0 deletions.
28 changes: 28 additions & 0 deletions plerkle_messenger/src/redis_messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,34 @@ impl Messenger for RedisMessenger {
}
}

async fn add_stream_with_buffer_size(
&mut self,
stream_key: &'static str,
max_buffer_size: usize,
) -> Result<(), MessengerError> {
// Add to streams hashmap.
let _result = self.streams.insert(
stream_key,
RedisMessengerStream {
max_len: Some(StreamMaxlen::Approx(max_buffer_size)),
local_buffer: LinkedList::new(),
local_buffer_total: 0,
local_buffer_last_flush: Instant::now(),
},
);

// Add stream to Redis.
let result: RedisResult<()> = self
.connection
.xgroup_create_mkstream(stream_key, self.consumer_group_name.as_str(), "$")
.await;

if let Err(e) = result {
info!("Group already exists: {:?}", e)
}
Ok(())
}

async fn add_stream(&mut self, stream_key: &'static str) -> Result<(), MessengerError> {
// Add to streams hashmap.
let _result = self.streams.insert(
Expand Down

0 comments on commit 0fa1e9c

Please sign in to comment.