Skip to content

Commit

Permalink
feat: add event filter
Browse files Browse the repository at this point in the history
  • Loading branch information
mhatzl committed Jun 23, 2023
1 parent 287b802 commit e052054
Show file tree
Hide file tree
Showing 11 changed files with 396 additions and 53 deletions.
121 changes: 102 additions & 19 deletions src/creation_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
///
/// ```ignore
/// evident::create_static_publisher!(
/// pub <Name for the publisher>,
/// <visibility specifier> <Name for the publisher>,
/// <Struct implementing `evident::publisher::Id`>,
/// <Struct implementing `evident::event::EventEntry`>,
/// <Struct implementing `evident::event::IntermediaryEvent`>,
/// <Struct implementing `evident::event::entry::EventEntry`>,
/// <Struct implementing `evident::event::intermediary::IntermediaryEvent`>,
/// filter_type = <Optional Struct implementing `evident::event::filter::Filter`>,
/// filter = <Optional instance of the filter. Must be set if filter type is set>,
/// CAPTURE_CHANNEL_BOUND = <`usize` literal for the channel bound used to capture events>,
/// SUBSCRIPTION_CHANNEL_BOUND = <`usize` literal for the channel bound used per subscription>,
/// non_blocking = <`bool` literal defining if event finalizing should be non-blocking (`true`), or block the thread (`false`)>
Expand All @@ -31,66 +33,97 @@
/// );
/// ```
///
/// **Example with filter:**
///
/// ```ignore
/// evident::create_static_publisher!(
/// pub MY_PUBLISHER,
/// MyId,
/// MyEventEntry,
/// MyIntermEvent,
/// filter_type = MyFilter,
/// filter = MyFilter::default(),
/// CAPTURE_CHANNEL_BOUND = 100,
/// SUBSCRIPTION_CHANNEL_BOUND = 50,
/// non_blocking = true
/// );
/// ```
///
#[macro_export]
macro_rules! create_static_publisher {
($publisher_name:ident,
$id_t:ty,
$entry_t:ty,
$interm_event_t:ty,
$(filter_type=$filter_t:ty,)?
$(filter=$filter:expr,)?
CAPTURE_CHANNEL_BOUND = $cap_channel_bound:expr,
SUBSCRIPTION_CHANNEL_BOUND = $sub_channel_bound:expr,
non_blocking = $try_capture:literal
) => {
$crate::z__create_static_publisher!($publisher_name,
$crate::z__setup_static_publisher!(
$publisher_name,
$id_t,
$entry_t,
$interm_event_t,
$cap_channel_bound,
$sub_channel_bound,
$try_capture
$(, filter_type=$filter_t)?
$(, filter=$filter)?
);
};
($visibility:vis $publisher_name:ident,
$id_t:ty,
$entry_t:ty,
$interm_event_t:ty,
$(filter_type=$filter_t:ty,)?
$(filter=$filter:expr,)?
CAPTURE_CHANNEL_BOUND = $cap_channel_bound:expr,
SUBSCRIPTION_CHANNEL_BOUND = $sub_channel_bound:expr,
non_blocking = $try_capture:literal
) => {
$crate::z__create_static_publisher!($publisher_name,
$crate::z__setup_static_publisher!(
$publisher_name,
$id_t,
$entry_t,
$interm_event_t,
$cap_channel_bound,
$sub_channel_bound,
$try_capture
scope=$visibility
$try_capture,
scope = $visibility
$(, filter_type=$filter_t)?
$(, filter=$filter)?
);
};
}

#[macro_export]
macro_rules! z__create_static_publisher {
macro_rules! z__setup_static_publisher {
($publisher_name:ident,
$id_t:ty,
$entry_t:ty,
$interm_event_t:ty,
$cap_channel_bound:expr,
$sub_channel_bound:expr,
$try_capture:literal
$(scope=$visibility:vis)?
$(, scope=$visibility:vis)?
$(, filter_type=$filter_t:ty)?
$(, filter=$filter:expr)?
) => {
$($visibility)? static $publisher_name: $crate::once_cell::sync::Lazy<
$crate::publisher::EvidentPublisher<$id_t, $entry_t>,
> = $crate::once_cell::sync::Lazy::new(|| {
$crate::publisher::EvidentPublisher::<
$id_t,
$entry_t,
>::new(|event| {
$publisher_name.on_event(event);
}, $cap_channel_bound, $sub_channel_bound)
});

$crate::z__create_static_publisher!(
$publisher_name,
$id_t,
$entry_t,
$interm_event_t,
$(filter_type=$filter_t,)?
$(filter=$filter,)?
$cap_channel_bound,
$sub_channel_bound,
$try_capture
$(, scope=$visibility)?
);

impl Drop for $interm_event_t {
fn drop(&mut self) {
Expand Down Expand Up @@ -133,6 +166,56 @@ macro_rules! z__create_static_publisher {
};
}

#[macro_export]
macro_rules! z__create_static_publisher {
($publisher_name:ident,
$id_t:ty,
$entry_t:ty,
$interm_event_t:ty,
filter_type=$filter_t:ty,
filter=$filter:expr,
$cap_channel_bound:expr,
$sub_channel_bound:expr,
$try_capture:literal
$(, scope=$visibility:vis)?
) => {
$($visibility)? static $publisher_name: $crate::once_cell::sync::Lazy<
$crate::publisher::EvidentPublisher<$id_t, $entry_t, $filter_t>,
> = $crate::once_cell::sync::Lazy::new(|| {
$crate::publisher::EvidentPublisher::<
$id_t,
$entry_t,
$filter_t
>::with(|event| {
$publisher_name.on_event(event);
}, $filter, $cap_channel_bound, $sub_channel_bound)
});
};
($publisher_name:ident,
$id_t:ty,
$entry_t:ty,
$interm_event_t:ty,
$cap_channel_bound:expr,
$sub_channel_bound:expr,
$try_capture:literal
$(, scope=$visibility:vis)?
) => {
type DummyFilter = $crate::event::filter::DummyFilter<$id_t, $entry_t>;

$($visibility)? static $publisher_name: $crate::once_cell::sync::Lazy<
$crate::publisher::EvidentPublisher<$id_t, $entry_t, DummyFilter>,
> = $crate::once_cell::sync::Lazy::new(|| {
$crate::publisher::EvidentPublisher::<
$id_t,
$entry_t,
DummyFilter
>::new(|event| {
$publisher_name.on_event(event);
}, $cap_channel_bound, $sub_channel_bound)
});
}
}

/// Macro to create the `set_event!()` macro for a concrete implementation.
///
/// ## Usage
Expand Down
34 changes: 34 additions & 0 deletions src/event/filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::marker::PhantomData;

use crate::publisher::{Id, StopCapturing};

use super::{entry::EventEntry, intermediary::IntermediaryEvent};

pub trait Filter<K, T>
where
K: Id + StopCapturing,
T: EventEntry<K>,
{
/// Return `true` if the event should be captured.
fn allow_event(&self, event: &mut impl IntermediaryEvent<K, T>) -> bool;
}

#[derive(Default, Debug)]
pub struct DummyFilter<K, T>
where
K: Id + StopCapturing,
T: EventEntry<K>,
{
v1: PhantomData<K>,
v2: PhantomData<T>,
}

impl<K, T> Filter<K, T> for DummyFilter<K, T>
where
K: Id + StopCapturing,
T: EventEntry<K>,
{
fn allow_event(&self, _event: &mut impl IntermediaryEvent<K, T>) -> bool {
true
}
}
1 change: 1 addition & 0 deletions src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::publisher::Id;
use self::{entry::EventEntry, intermediary::IntermediaryEvent, origin::Origin};

pub mod entry;
pub mod filter;
pub mod intermediary;
pub mod origin;

Expand Down
81 changes: 71 additions & 10 deletions src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use std::{
};

use crate::{
event::{entry::EventEntry, intermediary::IntermediaryEvent, Event},
subscription::{Subscription, SubscriptionErr, SubscriptionSender},
event::{entry::EventEntry, filter::Filter, intermediary::IntermediaryEvent, Event},
subscription::{Subscription, SubscriptionError, SubscriptionSender},
};

pub trait Id:
Expand All @@ -37,23 +37,26 @@ pub trait StopCapturing {
type Subscriber<K, T> = HashMap<crate::uuid::Uuid, SubscriptionSender<K, T>>;
type Capturer<K, T> = Option<SyncSender<Event<K, T>>>;

pub struct EvidentPublisher<K, T>
pub struct EvidentPublisher<K, T, F>
where
K: Id + StopCapturing,
T: EventEntry<K>,
F: Filter<K, T>,
SyncSender<Event<K, T>>: Clone,
{
pub(crate) subscriptions: Arc<RwLock<HashMap<K, Subscriber<K, T>>>>,
pub(crate) any_event: Arc<RwLock<Subscriber<K, T>>>,
pub(crate) capturer: Arc<RwLock<Capturer<K, T>>>,
filter: Option<F>,
capture_channel_bound: usize,
subscription_channel_bound: usize,
}

impl<K, T> EvidentPublisher<K, T>
impl<K, T, F> EvidentPublisher<K, T, F>
where
K: Id + StopCapturing,
T: EventEntry<K>,
F: Filter<K, T>,
SyncSender<Event<K, T>>: Clone,
{
pub fn new(
Expand All @@ -67,6 +70,7 @@ where
subscriptions: Arc::new(RwLock::new(HashMap::new())),
any_event: Arc::new(RwLock::new(HashMap::new())),
capturer: Arc::new(RwLock::new(Some(send))),
filter: None,
capture_channel_bound,
subscription_channel_bound,
};
Expand All @@ -92,8 +96,56 @@ where
publisher
}

pub fn with(
mut on_event: impl FnMut(Event<K, T>) + std::marker::Send + 'static,
filter: F,
capture_channel_bound: usize,
subscription_channel_bound: usize,
) -> Self {
let (send, recv): (SyncSender<Event<K, T>>, _) = mpsc::sync_channel(capture_channel_bound);

let publisher = EvidentPublisher {
subscriptions: Arc::new(RwLock::new(HashMap::new())),
any_event: Arc::new(RwLock::new(HashMap::new())),
capturer: Arc::new(RwLock::new(Some(send))),
filter: Some(filter),
capture_channel_bound,
subscription_channel_bound,
};
let capturer = publisher.capturer.clone();

thread::spawn(move || {
while let Ok(event) = recv.recv() {
let id = event.get_id().clone();

on_event(event);

// Note: `on_event` must still be called to notify all listeners to stop aswell
if StopCapturing::stop_capturing(&id) {
break;
}
}

if let Ok(mut locked_cap) = capturer.write() {
*locked_cap = None;
}
});

publisher
}

pub fn get_filter(&self) -> &Option<F> {
&self.filter
}

pub fn capture<I: IntermediaryEvent<K, T>>(&self, interm_event: &mut I) {
if let Ok(locked_cap) = self.capturer.try_read() {
if let Some(filter) = &self.filter {
if !filter.allow_event(interm_event) {
return;
}
}

if let Ok(locked_cap) = self.capturer.read() {
if locked_cap.is_some() {
let _ = locked_cap
.as_ref()
Expand All @@ -104,6 +156,12 @@ where
}

pub fn try_capture<I: IntermediaryEvent<K, T>>(&self, interm_event: &mut I) {
if let Some(filter) = &self.filter {
if !filter.allow_event(interm_event) {
return;
}
}

if let Ok(locked_cap) = self.capturer.try_read() {
if locked_cap.is_some() {
let _ = locked_cap
Expand All @@ -114,11 +172,14 @@ where
}
}

pub fn subscribe(&self, id: K) -> Result<Subscription<K, T>, SubscriptionErr<K>> {
pub fn subscribe(&self, id: K) -> Result<Subscription<K, T, F>, SubscriptionError<K>> {
self.subscribe_to_many(vec![id])
}

pub fn subscribe_to_many(&self, ids: Vec<K>) -> Result<Subscription<K, T>, SubscriptionErr<K>> {
pub fn subscribe_to_many(
&self,
ids: Vec<K>,
) -> Result<Subscription<K, T, F>, SubscriptionError<K>> {
// Note: Number of ids to listen to most likely affects the number of received events => number is added to channel bound
// Addition instead of multiplikation, because even distribution accross events is highly unlikely.
let (sender, receiver) = mpsc::sync_channel(ids.len() + self.subscription_channel_bound);
Expand All @@ -141,7 +202,7 @@ where
}
}
None => {
return Err(SubscriptionErr::CouldNotAccessPublisher);
return Err(SubscriptionError::CouldNotAccessPublisher);
}
}

Expand All @@ -154,7 +215,7 @@ where
})
}

pub fn subscribe_to_all_events(&self) -> Result<Subscription<K, T>, SubscriptionErr<K>> {
pub fn subscribe_to_all_events(&self) -> Result<Subscription<K, T, F>, SubscriptionError<K>> {
let (sender, receiver) = mpsc::sync_channel(self.capture_channel_bound);
let channel_id = crate::uuid::Uuid::new_v4();

Expand All @@ -163,7 +224,7 @@ where
locked_vec.insert(channel_id, SubscriptionSender { channel_id, sender });
}
None => {
return Err(SubscriptionErr::CouldNotAccessPublisher);
return Err(SubscriptionError::CouldNotAccessPublisher);
}
}

Expand Down
Loading

0 comments on commit e052054

Please sign in to comment.