Merge commit 'HEAD@{5}' into feat/bloom-filter
jbcaron committed Feb 3, 2025
2 parents f5ef319 + 3320801 commit cff833c
Showing 2 changed files with 50 additions and 38 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
@@ -16,6 +16,7 @@ members = [
     "crates/madara/node",
     "crates/madara/primitives/block",
     "crates/madara/primitives/bloom_filter",
+    "crates/madara/primitives/bloom_filter",
     "crates/madara/primitives/convert",
     "crates/madara/primitives/transactions",
     "crates/madara/primitives/class",
@@ -48,6 +49,7 @@ default-members = [
     "crates/madara/node",
     "crates/madara/primitives/block",
     "crates/madara/primitives/bloom_filter",
+    "crates/madara/primitives/bloom_filter",
     "crates/madara/primitives/convert",
     "crates/madara/primitives/transactions",
     "crates/madara/primitives/class",
86 changes: 48 additions & 38 deletions
@@ -41,7 +41,7 @@ pub async fn get_events(
     let chunk_size = filter.chunk_size;

     if let Some(keys) = &keys {
-        if keys.len() > MAX_EVENTS_KEYS {
+        if keys.iter().flatten().count() > MAX_EVENTS_KEYS {
             return Err(StarknetRpcApiError::TooManyKeysInFilter);
         }
     }
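A note on the change in this hunk: in the `getEvents` filter, `keys` is a list of key lists (one inner list per key slot), so `keys.len()` only counts slots while `keys.iter().flatten().count()` counts every key across all slots. Below is a minimal standalone sketch of the difference, using plain integers in place of `Felt` values and an illustrative limit rather than the real `MAX_EVENTS_KEYS` constant:

```rust
fn main() {
    // Illustrative limit only; the real MAX_EVENTS_KEYS lives in the RPC crate.
    const MAX_EVENTS_KEYS: usize = 4;

    // One inner Vec per key slot; each inner Vec lists the accepted values for that slot.
    // Plain integers stand in for `Felt` field elements here.
    let keys: Vec<Vec<u64>> = vec![vec![1, 2, 3], vec![], vec![4, 5]];

    // Old check: number of key slots (3 here), so this filter would have been accepted.
    assert_eq!(keys.len(), 3);
    assert!(keys.len() <= MAX_EVENTS_KEYS);

    // New check: total number of keys across all slots (5 here), so it is now rejected.
    assert_eq!(keys.iter().flatten().count(), 5);
    assert!(keys.iter().flatten().count() > MAX_EVENTS_KEYS);
}
```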
@@ -50,7 +50,7 @@ pub async fn get_events(
     }

     // Get the block numbers for the requested range
-    let (from_block, to_block, latest_block) = block_range(starknet, filter.from_block, filter.to_block)?;
+    let (from_block, to_block, _) = block_range(starknet, filter.from_block, filter.to_block)?;

     let continuation_token = match filter.continuation_token {
         Some(token) => ContinuationToken::parse(token).map_err(|_| StarknetRpcApiError::InvalidContinuationToken)?,
@@ -63,63 +63,73 @@ pub async fn get_events(
     }

     let from_block = continuation_token.block_n;
-    let mut filtered_events: Vec<EmittedEvent<Felt>> = Vec::new();
+    let mut events_chunk: Vec<EmittedEvent<Felt>> = Vec::with_capacity(chunk_size as usize);

-    let iter_filter = starknet
+    let key_filter = EventBloomSearcher::new(from_address.as_ref(), keys.as_deref());
+
+    let filter_event_stream = starknet
         .backend
         .get_event_filter_stream(from_block)
         .or_internal_server_error("Error getting event filter stream")?;

-    let key_filter = EventBloomSearcher::new(from_address.as_ref(), keys.as_deref());
-
-    for filter_block in iter_filter {
-        let (current_block, filter) = filter_block.or_internal_server_error("Error getting next filter block")?;
+    for filter_block in filter_event_stream {
+        // Attempt to retrieve the next block and its bloom filter.
+        // Only blocks with events have a bloom filter.
+        let (current_block, bloom_filter) = filter_block.or_internal_server_error("Error getting next filter block")?;

-        if current_block > latest_block {
+        // Stop processing if the current block exceeds the requested range.
+        // - `latest_block`: Ensures we do not process beyond the latest finalized block.
+        // - `to_block`: Ensures we do not go beyond the user-specified range.
+        if current_block > to_block {
             break;
         }

-        if !key_filter.search(&filter) {
+        // Use the bloom filter to quickly check if the block might contain relevant events.
+        // - This avoids unnecessary block retrieval if no matching events exist.
+        if !key_filter.search(&bloom_filter) {
             continue;
         }

+        // Retrieve the full block data since we now suspect it contains relevant events.
         let block =
             starknet.get_block(&BlockId::Number(current_block)).or_internal_server_error("Error getting block")?;

-        // TODO: take only the events to fill the chunk not all the events
-        let block_filtered_events: Vec<EmittedEvent<Felt>> = drain_block_events(block)
-            .filter(|event| event_match_filter(&event.event, from_address.as_ref(), keys.as_deref()))
-            .collect();
-
-        if current_block == from_block && (block_filtered_events.len() as u64) < continuation_token.event_n {
-            return Err(StarknetRpcApiError::InvalidContinuationToken);
-        }
+        let mut last_event_index = 0;

-        #[allow(clippy::iter_skip_zero)]
-        let block_filtered_reduced_events: Vec<EmittedEvent<Felt>> = block_filtered_events
-            .into_iter()
+        for (idx, event) in drain_block_events(block)
             .enumerate()
+            // Skip events that have already been processed if we are resuming from a continuation token.
+            // Otherwise, start from the beginning of the block.
             .skip(if current_block == from_block { continuation_token.event_n as usize } else { 0 })
-            .take(chunk_size as usize - filtered_events.len())
-            .collect();
-
-        let num_events = block_filtered_reduced_events.len();
-
-        filtered_events.extend(block_filtered_reduced_events);
-
-        if filtered_events.len() == chunk_size as usize {
-            let event_n =
-                if current_block == from_block { continuation_token.event_n + chunk_size } else { num_events as u64 };
-            let token = Some(ContinuationToken { block_n: current_block, event_n }.to_string());
-
-            return Ok(EventsChunk { events: filtered_events, continuation_token: token });
-        }
-        if current_block == to_block {
-            break;
+            // Filter events based on the given event filter criteria (address, keys).
+            .filter(|(_, event)| event_match_filter(&event.event, from_address.as_ref(), keys.as_deref()))
+
+            // Take exactly enough events to fill the requested chunk size, plus one extra event.
+            // The extra event is used to determine if the block has more matching events.
+            // - If an extra event is found, it means there are still unprocessed events in this block.
+            //   -> The continuation token should point to this block and the next event index.
+            // - If no extra event is found, it means all matching events in this block have been retrieved.
+            //   -> The continuation token should move to the next block.
+            .take(chunk_size as usize - events_chunk.len() + 1)
+        {
+            if events_chunk.len() < chunk_size as usize {
+                events_chunk.push(event);
+                last_event_index = idx;
+            } else {
+                // If a new event was found, it means there are more events that match the filter.
+                return Ok(EventsChunk {
+                    events: events_chunk,
+                    continuation_token: Some(
+                        ContinuationToken { block_n: current_block, event_n: (last_event_index + 1) as u64 }
+                            .to_string(),
+                    ),
+                });
+            }
         }
     }
+    // TODO: handle the case where 'to_block' is pending

-    Ok(EventsChunk { events: filtered_events, continuation_token: None })
+    Ok(EventsChunk { events: events_chunk, continuation_token: None })
 }

 fn block_range(
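The loop in the last hunk consults a per-block bloom filter (via `EventBloomSearcher`) to skip blocks that cannot contain matching events before any block data is loaded from the backend. The sketch below is only a toy illustration of that idea; the `BloomFilter` type, probe count, and keys are made up and are not the actual `EventBloomSearcher` API:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Toy bloom filter: a fixed number of hash probes over a fixed-size bit vector.
struct BloomFilter {
    bits: Vec<bool>,
}

impl BloomFilter {
    const PROBES: u64 = 3;

    fn new(size: usize) -> Self {
        Self { bits: vec![false; size] }
    }

    fn probe<T: Hash>(&self, item: &T, i: u64) -> usize {
        let mut hasher = DefaultHasher::new();
        i.hash(&mut hasher);
        item.hash(&mut hasher);
        (hasher.finish() as usize) % self.bits.len()
    }

    fn insert<T: Hash>(&mut self, item: &T) {
        for i in 0..Self::PROBES {
            let idx = self.probe(item, i);
            self.bits[idx] = true;
        }
    }

    /// May return false positives, never false negatives.
    fn might_contain<T: Hash>(&self, item: &T) -> bool {
        (0..Self::PROBES).all(|i| self.bits[self.probe(item, i)])
    }
}

fn main() {
    // Build the filter once per block from the keys and addresses of its events...
    let mut block_filter = BloomFilter::new(1024);
    block_filter.insert(&"event_key_a");
    block_filter.insert(&"emitting_address_1");

    // ...then, while serving getEvents, decide cheaply whether the block is worth loading.
    assert!(block_filter.might_contain(&"event_key_a"));
    if !block_filter.might_contain(&"event_key_b") {
        // Cheap negative: skip this block without retrieving it from the backend.
        println!("block skipped");
    }
}
```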

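The `.take(chunk_size as usize - events_chunk.len() + 1)` call in the new code requests one extra matching event purely to decide whether a continuation token pointing back into the current block is needed. Here is a simplified, self-contained model of that rule, with integers standing in for events, a hypothetical `fill_chunk` helper, and a `(block_n, event_n)` tuple standing in for `ContinuationToken`:

```rust
/// Simplified model of the chunking rule: fill `chunk` up to `chunk_size`, and
/// look at one extra matching event only to decide whether a continuation token
/// (pointing back into this same block) is needed.
/// The `(u64, u64)` return value stands in for `ContinuationToken { block_n, event_n }`.
fn fill_chunk(
    block_n: u64,
    block_events: &[u64], // stand-in for a block's matching events
    skip: usize,          // events of this block already returned on a previous page
    chunk_size: usize,
    chunk: &mut Vec<u64>,
) -> Option<(u64, u64)> {
    let mut last_event_index = 0;

    for (idx, event) in block_events
        .iter()
        .enumerate()
        .skip(skip)
        .take(chunk_size - chunk.len() + 1)
    {
        if chunk.len() < chunk_size {
            chunk.push(*event);
            last_event_index = idx;
        } else {
            // The "+ 1" extra event exists, so this block still has matches:
            // resume from the next unreturned index within the same block.
            return Some((block_n, (last_event_index + 1) as u64));
        }
    }
    None // block exhausted (or chunk not yet full): move on to the next block
}

fn main() {
    let mut chunk = Vec::new();
    // Block 10 has 5 matching events, but the client asked for chunks of 3.
    let token = fill_chunk(10, &[100, 101, 102, 103, 104], 0, 3, &mut chunk);
    assert_eq!(chunk, vec![100, 101, 102]);
    assert_eq!(token, Some((10, 3))); // next call resumes at event index 3 of block 10
}
```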