diff --git a/src/creation_macros.rs b/src/creation_macros.rs index 0ecf151..517ebcc 100644 --- a/src/creation_macros.rs +++ b/src/creation_macros.rs @@ -7,10 +7,12 @@ /// /// ```ignore /// evident::create_static_publisher!( -/// pub , +/// , /// , -/// , -/// , +/// , +/// , +/// filter_type = , +/// filter = , /// CAPTURE_CHANNEL_BOUND = <`usize` literal for the channel bound used to capture events>, /// SUBSCRIPTION_CHANNEL_BOUND = <`usize` literal for the channel bound used per subscription>, /// non_blocking = <`bool` literal defining if event finalizing should be non-blocking (`true`), or block the thread (`false`)> @@ -31,47 +33,73 @@ /// ); /// ``` /// +/// **Example with filter:** +/// +/// ```ignore +/// evident::create_static_publisher!( +/// pub MY_PUBLISHER, +/// MyId, +/// MyEventEntry, +/// MyIntermEvent, +/// filter_type = MyFilter, +/// filter = MyFilter::default(), +/// CAPTURE_CHANNEL_BOUND = 100, +/// SUBSCRIPTION_CHANNEL_BOUND = 50, +/// non_blocking = true +/// ); +/// ``` +/// #[macro_export] macro_rules! create_static_publisher { ($publisher_name:ident, $id_t:ty, $entry_t:ty, $interm_event_t:ty, + $(filter_type=$filter_t:ty,)? + $(filter=$filter:expr,)? CAPTURE_CHANNEL_BOUND = $cap_channel_bound:expr, SUBSCRIPTION_CHANNEL_BOUND = $sub_channel_bound:expr, non_blocking = $try_capture:literal ) => { - $crate::z__create_static_publisher!($publisher_name, + $crate::z__setup_static_publisher!( + $publisher_name, $id_t, $entry_t, $interm_event_t, $cap_channel_bound, $sub_channel_bound, $try_capture + $(, filter_type=$filter_t)? + $(, filter=$filter)? ); }; ($visibility:vis $publisher_name:ident, $id_t:ty, $entry_t:ty, $interm_event_t:ty, + $(filter_type=$filter_t:ty,)? + $(filter=$filter:expr,)? CAPTURE_CHANNEL_BOUND = $cap_channel_bound:expr, SUBSCRIPTION_CHANNEL_BOUND = $sub_channel_bound:expr, non_blocking = $try_capture:literal ) => { - $crate::z__create_static_publisher!($publisher_name, + $crate::z__setup_static_publisher!( + $publisher_name, $id_t, $entry_t, $interm_event_t, $cap_channel_bound, $sub_channel_bound, - $try_capture - scope=$visibility + $try_capture, + scope = $visibility + $(, filter_type=$filter_t)? + $(, filter=$filter)? ); }; } #[macro_export] -macro_rules! z__create_static_publisher { +macro_rules! z__setup_static_publisher { ($publisher_name:ident, $id_t:ty, $entry_t:ty, @@ -79,18 +107,23 @@ macro_rules! z__create_static_publisher { $cap_channel_bound:expr, $sub_channel_bound:expr, $try_capture:literal - $(scope=$visibility:vis)? + $(, scope=$visibility:vis)? + $(, filter_type=$filter_t:ty)? + $(, filter=$filter:expr)? ) => { - $($visibility)? static $publisher_name: $crate::once_cell::sync::Lazy< - $crate::publisher::EvidentPublisher<$id_t, $entry_t>, - > = $crate::once_cell::sync::Lazy::new(|| { - $crate::publisher::EvidentPublisher::< - $id_t, - $entry_t, - >::new(|event| { - $publisher_name.on_event(event); - }, $cap_channel_bound, $sub_channel_bound) - }); + + $crate::z__create_static_publisher!( + $publisher_name, + $id_t, + $entry_t, + $interm_event_t, + $(filter_type=$filter_t,)? + $(filter=$filter,)? + $cap_channel_bound, + $sub_channel_bound, + $try_capture + $(, scope=$visibility)? + ); impl Drop for $interm_event_t { fn drop(&mut self) { @@ -133,6 +166,56 @@ macro_rules! z__create_static_publisher { }; } +#[macro_export] +macro_rules! z__create_static_publisher { + ($publisher_name:ident, + $id_t:ty, + $entry_t:ty, + $interm_event_t:ty, + filter_type=$filter_t:ty, + filter=$filter:expr, + $cap_channel_bound:expr, + $sub_channel_bound:expr, + $try_capture:literal + $(, scope=$visibility:vis)? + ) => { + $($visibility)? static $publisher_name: $crate::once_cell::sync::Lazy< + $crate::publisher::EvidentPublisher<$id_t, $entry_t, $filter_t>, + > = $crate::once_cell::sync::Lazy::new(|| { + $crate::publisher::EvidentPublisher::< + $id_t, + $entry_t, + $filter_t + >::with(|event| { + $publisher_name.on_event(event); + }, $filter, $cap_channel_bound, $sub_channel_bound) + }); + }; + ($publisher_name:ident, + $id_t:ty, + $entry_t:ty, + $interm_event_t:ty, + $cap_channel_bound:expr, + $sub_channel_bound:expr, + $try_capture:literal + $(, scope=$visibility:vis)? + ) => { + type DummyFilter = $crate::event::filter::DummyFilter<$id_t, $entry_t>; + + $($visibility)? static $publisher_name: $crate::once_cell::sync::Lazy< + $crate::publisher::EvidentPublisher<$id_t, $entry_t, DummyFilter>, + > = $crate::once_cell::sync::Lazy::new(|| { + $crate::publisher::EvidentPublisher::< + $id_t, + $entry_t, + DummyFilter + >::new(|event| { + $publisher_name.on_event(event); + }, $cap_channel_bound, $sub_channel_bound) + }); + } +} + /// Macro to create the `set_event!()` macro for a concrete implementation. /// /// ## Usage diff --git a/src/event/filter.rs b/src/event/filter.rs new file mode 100644 index 0000000..eb785b1 --- /dev/null +++ b/src/event/filter.rs @@ -0,0 +1,34 @@ +use std::marker::PhantomData; + +use crate::publisher::{Id, StopCapturing}; + +use super::{entry::EventEntry, intermediary::IntermediaryEvent}; + +pub trait Filter +where + K: Id + StopCapturing, + T: EventEntry, +{ + /// Return `true` if the event should be captured. + fn allow_event(&self, event: &mut impl IntermediaryEvent) -> bool; +} + +#[derive(Default, Debug)] +pub struct DummyFilter +where + K: Id + StopCapturing, + T: EventEntry, +{ + v1: PhantomData, + v2: PhantomData, +} + +impl Filter for DummyFilter +where + K: Id + StopCapturing, + T: EventEntry, +{ + fn allow_event(&self, _event: &mut impl IntermediaryEvent) -> bool { + true + } +} diff --git a/src/event/mod.rs b/src/event/mod.rs index a7215c2..6a4cc32 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -5,6 +5,7 @@ use crate::publisher::Id; use self::{entry::EventEntry, intermediary::IntermediaryEvent, origin::Origin}; pub mod entry; +pub mod filter; pub mod intermediary; pub mod origin; diff --git a/src/publisher.rs b/src/publisher.rs index 8153038..effefac 100644 --- a/src/publisher.rs +++ b/src/publisher.rs @@ -9,8 +9,8 @@ use std::{ }; use crate::{ - event::{entry::EventEntry, intermediary::IntermediaryEvent, Event}, - subscription::{Subscription, SubscriptionErr, SubscriptionSender}, + event::{entry::EventEntry, filter::Filter, intermediary::IntermediaryEvent, Event}, + subscription::{Subscription, SubscriptionError, SubscriptionSender}, }; pub trait Id: @@ -37,23 +37,26 @@ pub trait StopCapturing { type Subscriber = HashMap>; type Capturer = Option>>; -pub struct EvidentPublisher +pub struct EvidentPublisher where K: Id + StopCapturing, T: EventEntry, + F: Filter, SyncSender>: Clone, { pub(crate) subscriptions: Arc>>>, pub(crate) any_event: Arc>>, pub(crate) capturer: Arc>>, + filter: Option, capture_channel_bound: usize, subscription_channel_bound: usize, } -impl EvidentPublisher +impl EvidentPublisher where K: Id + StopCapturing, T: EventEntry, + F: Filter, SyncSender>: Clone, { pub fn new( @@ -67,6 +70,7 @@ where subscriptions: Arc::new(RwLock::new(HashMap::new())), any_event: Arc::new(RwLock::new(HashMap::new())), capturer: Arc::new(RwLock::new(Some(send))), + filter: None, capture_channel_bound, subscription_channel_bound, }; @@ -92,8 +96,56 @@ where publisher } + pub fn with( + mut on_event: impl FnMut(Event) + std::marker::Send + 'static, + filter: F, + capture_channel_bound: usize, + subscription_channel_bound: usize, + ) -> Self { + let (send, recv): (SyncSender>, _) = mpsc::sync_channel(capture_channel_bound); + + let publisher = EvidentPublisher { + subscriptions: Arc::new(RwLock::new(HashMap::new())), + any_event: Arc::new(RwLock::new(HashMap::new())), + capturer: Arc::new(RwLock::new(Some(send))), + filter: Some(filter), + capture_channel_bound, + subscription_channel_bound, + }; + let capturer = publisher.capturer.clone(); + + thread::spawn(move || { + while let Ok(event) = recv.recv() { + let id = event.get_id().clone(); + + on_event(event); + + // Note: `on_event` must still be called to notify all listeners to stop aswell + if StopCapturing::stop_capturing(&id) { + break; + } + } + + if let Ok(mut locked_cap) = capturer.write() { + *locked_cap = None; + } + }); + + publisher + } + + pub fn get_filter(&self) -> &Option { + &self.filter + } + pub fn capture>(&self, interm_event: &mut I) { - if let Ok(locked_cap) = self.capturer.try_read() { + if let Some(filter) = &self.filter { + if !filter.allow_event(interm_event) { + return; + } + } + + if let Ok(locked_cap) = self.capturer.read() { if locked_cap.is_some() { let _ = locked_cap .as_ref() @@ -104,6 +156,12 @@ where } pub fn try_capture>(&self, interm_event: &mut I) { + if let Some(filter) = &self.filter { + if !filter.allow_event(interm_event) { + return; + } + } + if let Ok(locked_cap) = self.capturer.try_read() { if locked_cap.is_some() { let _ = locked_cap @@ -114,11 +172,14 @@ where } } - pub fn subscribe(&self, id: K) -> Result, SubscriptionErr> { + pub fn subscribe(&self, id: K) -> Result, SubscriptionError> { self.subscribe_to_many(vec![id]) } - pub fn subscribe_to_many(&self, ids: Vec) -> Result, SubscriptionErr> { + pub fn subscribe_to_many( + &self, + ids: Vec, + ) -> Result, SubscriptionError> { // Note: Number of ids to listen to most likely affects the number of received events => number is added to channel bound // Addition instead of multiplikation, because even distribution accross events is highly unlikely. let (sender, receiver) = mpsc::sync_channel(ids.len() + self.subscription_channel_bound); @@ -141,7 +202,7 @@ where } } None => { - return Err(SubscriptionErr::CouldNotAccessPublisher); + return Err(SubscriptionError::CouldNotAccessPublisher); } } @@ -154,7 +215,7 @@ where }) } - pub fn subscribe_to_all_events(&self) -> Result, SubscriptionErr> { + pub fn subscribe_to_all_events(&self) -> Result, SubscriptionError> { let (sender, receiver) = mpsc::sync_channel(self.capture_channel_bound); let channel_id = crate::uuid::Uuid::new_v4(); @@ -163,7 +224,7 @@ where locked_vec.insert(channel_id, SubscriptionSender { channel_id, sender }); } None => { - return Err(SubscriptionErr::CouldNotAccessPublisher); + return Err(SubscriptionError::CouldNotAccessPublisher); } } diff --git a/src/subscription.rs b/src/subscription.rs index 26972f4..d6ec2ec 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -5,26 +5,28 @@ use std::{ }; use crate::{ - event::{entry::EventEntry, Event}, + event::{entry::EventEntry, filter::Filter, Event}, publisher::{EvidentPublisher, Id, StopCapturing}, }; -pub struct Subscription<'p, K, T> +pub struct Subscription<'p, K, T, F> where K: Id + StopCapturing, T: EventEntry, + F: Filter, { pub(crate) channel_id: crate::uuid::Uuid, pub(crate) receiver: Receiver>, pub(crate) sub_to_all: bool, pub(crate) subscriptions: Option>, - pub(crate) publisher: &'p EvidentPublisher, + pub(crate) publisher: &'p EvidentPublisher, } -impl<'p, K, T> Subscription<'p, K, T> +impl<'p, K, T, F> Subscription<'p, K, T, F> where K: Id + StopCapturing, T: EventEntry, + F: Filter, { pub fn get_receiver(&self) -> &Receiver> { &self.receiver @@ -34,24 +36,24 @@ where drop(self) } - pub fn unsubscribe_id(&mut self, id: K) -> Result<(), SubscriptionErr> { + pub fn unsubscribe_id(&mut self, id: K) -> Result<(), SubscriptionError> { self.unsubscribe_many(vec![id]) } - pub fn unsubscribe_many(&mut self, ids: Vec) -> Result<(), SubscriptionErr> { + pub fn unsubscribe_many(&mut self, ids: Vec) -> Result<(), SubscriptionError> { if self.sub_to_all || self.subscriptions.is_none() { - return Err(SubscriptionErr::AllEventsSubscriptionNotModifiable); + return Err(SubscriptionError::AllEventsSubscriptionNotModifiable); } let subs = self.subscriptions.as_mut().unwrap(); if ids.len() >= subs.len() { - return Err(SubscriptionErr::UnsubscribeWouldDeleteSubscription); + return Err(SubscriptionError::UnsubscribeWouldDeleteSubscription); } for id in ids.clone() { if !subs.contains(&id) { - return Err(SubscriptionErr::IdNotSubscribed(id)); + return Err(SubscriptionError::IdNotSubscribed(id)); } } @@ -66,30 +68,30 @@ where Ok(()) } - Err(_) => Err(SubscriptionErr::CouldNotAccessPublisher), + Err(_) => Err(SubscriptionError::CouldNotAccessPublisher), } } - pub fn subscribe_id(&mut self, id: K) -> Result<(), SubscriptionErr> { + pub fn subscribe_id(&mut self, id: K) -> Result<(), SubscriptionError> { self.subscribe_many(vec![id]) } - pub fn subscribe_many(&mut self, ids: Vec) -> Result<(), SubscriptionErr> { + pub fn subscribe_many(&mut self, ids: Vec) -> Result<(), SubscriptionError> { if self.sub_to_all || self.subscriptions.is_none() { - return Err(SubscriptionErr::AllEventsSubscriptionNotModifiable); + return Err(SubscriptionError::AllEventsSubscriptionNotModifiable); } let subs = self.subscriptions.as_mut().unwrap(); for id in ids.clone() { if subs.contains(&id) { - return Err(SubscriptionErr::IdAlreadySubscribed(id)); + return Err(SubscriptionError::IdAlreadySubscribed(id)); } } let any_sub_id = match subs.iter().next() { Some(id) => id, None => { - return Err(SubscriptionErr::NoSubscriptionChannelAvailable); + return Err(SubscriptionError::NoSubscriptionChannelAvailable); } }; @@ -98,15 +100,15 @@ where Some(id_subs) => match id_subs.get(&self.channel_id) { Some(sub_sender) => sub_sender.clone(), None => { - return Err(SubscriptionErr::NoSubscriptionChannelAvailable); + return Err(SubscriptionError::NoSubscriptionChannelAvailable); } }, None => { - return Err(SubscriptionErr::NoSubscriptionChannelAvailable); + return Err(SubscriptionError::NoSubscriptionChannelAvailable); } }, Err(_) => { - return Err(SubscriptionErr::CouldNotAccessPublisher); + return Err(SubscriptionError::CouldNotAccessPublisher); } }; @@ -129,15 +131,16 @@ where Ok(()) } - Err(_) => Err(SubscriptionErr::CouldNotAccessPublisher), + Err(_) => Err(SubscriptionError::CouldNotAccessPublisher), } } } -impl<'p, K, T> Drop for Subscription<'p, K, T> +impl<'p, K, T, F> Drop for Subscription<'p, K, T, F> where K: Id + StopCapturing, T: EventEntry, + F: Filter, { fn drop(&mut self) { // Note: We do not want to block the current thread for *unsubscribing*, since publisher also maintains dead channels. @@ -157,27 +160,30 @@ where } } -impl<'p, K, T> PartialEq for Subscription<'p, K, T> +impl<'p, K, T, F> PartialEq for Subscription<'p, K, T, F> where K: Id + StopCapturing, T: EventEntry, + F: Filter, { fn eq(&self, other: &Self) -> bool { self.channel_id == other.channel_id } } -impl<'p, K, T> Eq for Subscription<'p, K, T> +impl<'p, K, T, F> Eq for Subscription<'p, K, T, F> where K: Id + StopCapturing, T: EventEntry, + F: Filter, { } -impl<'p, K, T> Hash for Subscription<'p, K, T> +impl<'p, K, T, F> Hash for Subscription<'p, K, T, F> where K: Id + StopCapturing, T: EventEntry, + F: Filter, { fn hash(&self, state: &mut H) { self.channel_id.hash(state); @@ -185,7 +191,7 @@ where } #[derive(Debug, Clone)] -pub enum SubscriptionErr { +pub enum SubscriptionError { AllEventsSubscriptionNotModifiable, IdNotSubscribed(K), IdAlreadySubscribed(K), diff --git a/tests/min_filter/entry.rs b/tests/min_filter/entry.rs new file mode 100644 index 0000000..24a56c9 --- /dev/null +++ b/tests/min_filter/entry.rs @@ -0,0 +1,48 @@ +use evident::event::{entry::EventEntry, origin::Origin}; + +use super::id::MinId; + +#[derive(Default, Clone)] +pub struct MinEventEntry { + event_id: MinId, + msg: String, + + entry_id: evident::uuid::Uuid, + origin: Origin, +} + +impl EventEntry for MinEventEntry { + fn new(event_id: MinId, msg: &str, origin: Origin) -> Self { + MinEventEntry { + event_id, + msg: msg.to_string(), + + entry_id: evident::uuid::Uuid::new_v4(), + origin, + } + } + + fn get_event_id(&self) -> &MinId { + &self.event_id + } + + fn into_event_id(self) -> MinId { + self.event_id + } + + fn get_entry_id(&self) -> evident::uuid::Uuid { + self.entry_id + } + + fn get_msg(&self) -> &str { + &self.msg + } + + fn get_crate_name(&self) -> &'static str { + &self.origin.crate_name + } + + fn get_origin(&self) -> &evident::event::origin::Origin { + &self.origin + } +} diff --git a/tests/min_filter/filter.rs b/tests/min_filter/filter.rs new file mode 100644 index 0000000..8e0abe3 --- /dev/null +++ b/tests/min_filter/filter.rs @@ -0,0 +1,18 @@ +use evident::event::filter::Filter; + +use super::{entry::MinEventEntry, id::MinId}; + +#[derive(Default)] +pub struct MinFilter {} + +impl Filter for MinFilter { + fn allow_event( + &self, + event: &mut impl evident::event::intermediary::IntermediaryEvent, + ) -> bool { + if event.get_event_id().id % 2 == 0 { + return true; + } + false + } +} diff --git a/tests/min_filter/id.rs b/tests/min_filter/id.rs new file mode 100644 index 0000000..a4ec165 --- /dev/null +++ b/tests/min_filter/id.rs @@ -0,0 +1,18 @@ +#[derive(Debug, Default, Clone, Hash, PartialEq, Eq, Copy)] +pub struct MinId { + pub id: isize, +} + +impl evident::publisher::Id for MinId {} + +const STOP_CAPTURING: MinId = MinId { id: 0 }; + +impl evident::publisher::StopCapturing for MinId { + fn stop_capturing(id: &Self) -> bool { + if id == &STOP_CAPTURING { + return true; + } + + false + } +} diff --git a/tests/min_filter/interim_event.rs b/tests/min_filter/interim_event.rs new file mode 100644 index 0000000..caba0be --- /dev/null +++ b/tests/min_filter/interim_event.rs @@ -0,0 +1,23 @@ +use evident::event::{entry::EventEntry, intermediary::IntermediaryEvent, origin::Origin}; + +use super::{entry::MinEventEntry, id::MinId}; + +pub struct MinInterimEvent { + entry: MinEventEntry, +} + +impl IntermediaryEvent for MinInterimEvent { + fn new(event_id: MinId, msg: &str, origin: Origin) -> Self { + MinInterimEvent { + entry: MinEventEntry::new(event_id, msg, origin), + } + } + + fn get_entry(&self) -> &MinEventEntry { + &self.entry + } + + fn take_entry(&mut self) -> MinEventEntry { + std::mem::take(&mut self.entry) + } +} diff --git a/tests/min_filter/mod.rs b/tests/min_filter/mod.rs new file mode 100644 index 0000000..359d670 --- /dev/null +++ b/tests/min_filter/mod.rs @@ -0,0 +1,50 @@ +use self::{entry::MinEventEntry, filter::MinFilter, id::MinId, interim_event::MinInterimEvent}; + +mod entry; +mod filter; +mod id; +mod interim_event; + +evident::create_static_publisher!( + PUBLISHER, + MinId, + MinEventEntry, + MinInterimEvent, + filter_type = MinFilter, + filter = MinFilter::default(), + CAPTURE_CHANNEL_BOUND = 1, + SUBSCRIPTION_CHANNEL_BOUND = 1, + non_blocking = true +); + +// Note: **no_export** to prevent the macro from adding `#[macro_export]`. +evident::create_set_event_macro!( + no_export + MinId, + MinEventEntry, + MinInterimEvent +); + +#[test] +fn setup_minimal_filtered_publisher() { + let allowed_id = MinId { id: 2 }; + let filtered_id = MinId { id: 3 }; + let msg = "Some msg"; + + let sub = PUBLISHER + .subscribe_to_many(vec![filtered_id, allowed_id]) + .unwrap(); + + // This event is not captured + set_event!(filtered_id, msg).finalize(); + + // This event is captured + set_event!(allowed_id, msg).finalize(); + + let event = sub + .get_receiver() + .recv_timeout(std::time::Duration::from_millis(100)) + .unwrap(); + + assert_eq!(event.get_id(), &allowed_id, "Allowed Id was not captured."); +} diff --git a/tests/tests.rs b/tests/tests.rs index 57215f1..bbd812c 100644 --- a/tests/tests.rs +++ b/tests/tests.rs @@ -1,3 +1,4 @@ pub mod min_concretise; +pub mod min_filter; pub mod pub_sub; pub mod public_concretise;