Skip to content

Commit

Permalink
Add RedisMessenger::add_stream_with_buffer_size
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid committed Nov 15, 2023
1 parent a6508b4 commit fc3df6b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
5 changes: 5 additions & 0 deletions plerkle_messenger/src/plerkle_messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ pub trait Messenger: Sync + Send {
where
Self: Sized;
fn messenger_type(&self) -> MessengerType;
async fn add_stream_with_buffer_size(
&mut self,
stream_key: &'static str,
max_buffer_size: usize,
);
async fn add_stream(&mut self, stream_key: &'static str) -> Result<(), MessengerError>;
async fn set_buffer_size(&mut self, stream_key: &'static str, max_buffer_size: usize);
async fn send(&mut self, stream_key: &'static str, bytes: &[u8]) -> Result<(), MessengerError>;
Expand Down
27 changes: 27 additions & 0 deletions plerkle_messenger/src/redis_messenger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,33 @@ impl Messenger for RedisMessenger {
}
}

async fn add_stream_with_buffer_size(
&mut self,
stream_key: &'static str,
max_buffer_size: usize,
) {
// Add to streams hashmap.
let _result = self.streams.insert(
stream_key,
RedisMessengerStream {
max_len: Some(StreamMaxlen::Approx(max_buffer_size)),
local_buffer: LinkedList::new(),
local_buffer_total: 0,
local_buffer_last_flush: Instant::now(),
},
);

// Add stream to Redis.
let result: RedisResult<()> = self
.connection
.xgroup_create_mkstream(stream_key, self.consumer_group_name.as_str(), "$")
.await;

if let Err(e) = result {
info!("Group already exists: {:?}", e)
}
}

async fn add_stream(&mut self, stream_key: &'static str) -> Result<(), MessengerError> {
// Add to streams hashmap.
let _result = self.streams.insert(
Expand Down

0 comments on commit fc3df6b

Please sign in to comment.