diff --git a/Cargo.toml b/Cargo.toml index 0514e4b93..e974a14ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ members = [ "crates/madara/node", "crates/madara/primitives/block", "crates/madara/primitives/bloom_filter", + "crates/madara/primitives/bloom_filter", "crates/madara/primitives/convert", "crates/madara/primitives/transactions", "crates/madara/primitives/class", @@ -48,6 +49,7 @@ default-members = [ "crates/madara/node", "crates/madara/primitives/block", "crates/madara/primitives/bloom_filter", + "crates/madara/primitives/bloom_filter", "crates/madara/primitives/convert", "crates/madara/primitives/transactions", "crates/madara/primitives/class", diff --git a/crates/madara/client/rpc/src/versions/user/v0_7_1/methods/read/get_events.rs b/crates/madara/client/rpc/src/versions/user/v0_7_1/methods/read/get_events.rs index 6310add5b..1625cc2cd 100644 --- a/crates/madara/client/rpc/src/versions/user/v0_7_1/methods/read/get_events.rs +++ b/crates/madara/client/rpc/src/versions/user/v0_7_1/methods/read/get_events.rs @@ -41,7 +41,7 @@ pub async fn get_events( let chunk_size = filter.chunk_size; if let Some(keys) = &keys { - if keys.len() > MAX_EVENTS_KEYS { + if keys.iter().flatten().count() > MAX_EVENTS_KEYS { return Err(StarknetRpcApiError::TooManyKeysInFilter); } } @@ -50,7 +50,7 @@ pub async fn get_events( } // Get the block numbers for the requested range - let (from_block, to_block, latest_block) = block_range(starknet, filter.from_block, filter.to_block)?; + let (from_block, to_block, _) = block_range(starknet, filter.from_block, filter.to_block)?; let continuation_token = match filter.continuation_token { Some(token) => ContinuationToken::parse(token).map_err(|_| StarknetRpcApiError::InvalidContinuationToken)?, @@ -63,63 +63,73 @@ pub async fn get_events( } let from_block = continuation_token.block_n; - let mut filtered_events: Vec> = Vec::new(); + let mut events_chunk: Vec> = Vec::with_capacity(chunk_size as usize); - let iter_filter = starknet + let key_filter = EventBloomSearcher::new(from_address.as_ref(), keys.as_deref()); + + let filter_event_stream = starknet .backend .get_event_filter_stream(from_block) .or_internal_server_error("Error getting event filter stream")?; - let key_filter = EventBloomSearcher::new(from_address.as_ref(), keys.as_deref()); - - for filter_block in iter_filter { - let (current_block, filter) = filter_block.or_internal_server_error("Error getting next filter block")?; + for filter_block in filter_event_stream { + // Attempt to retrieve the next block and its bloom filter. + // Only blocks with events have a bloom filter. + let (current_block, bloom_filter) = filter_block.or_internal_server_error("Error getting next filter block")?; - if current_block > latest_block { + // Stop processing if the current block exceeds the requested range. + // - `latest_block`: Ensures we do not process beyond the latest finalized block. + // - `to_block`: Ensures we do not go beyond the user-specified range. + if current_block > to_block { break; } - if !key_filter.search(&filter) { + // Use the bloom filter to quickly check if the block might contain relevant events. + // - This avoids unnecessary block retrieval if no matching events exist. + if !key_filter.search(&bloom_filter) { continue; } + // Retrieve the full block data since we now suspect it contains relevant events. let block = starknet.get_block(&BlockId::Number(current_block)).or_internal_server_error("Error getting block")?; - // TODO: take only the events to fill the chunk not all the events - let block_filtered_events: Vec> = drain_block_events(block) - .filter(|event| event_match_filter(&event.event, from_address.as_ref(), keys.as_deref())) - .collect(); - - if current_block == from_block && (block_filtered_events.len() as u64) < continuation_token.event_n { - return Err(StarknetRpcApiError::InvalidContinuationToken); - } + let mut last_event_index = 0; - #[allow(clippy::iter_skip_zero)] - let block_filtered_reduced_events: Vec> = block_filtered_events - .into_iter() + for (idx, event) in drain_block_events(block) + .enumerate() + // Skip events that have already been processed if we are resuming from a continuation token. + // Otherwise, start from the beginning of the block. .skip(if current_block == from_block { continuation_token.event_n as usize } else { 0 }) - .take(chunk_size as usize - filtered_events.len()) - .collect(); - let num_events = block_filtered_reduced_events.len(); - - filtered_events.extend(block_filtered_reduced_events); - - if filtered_events.len() == chunk_size as usize { - let event_n = - if current_block == from_block { continuation_token.event_n + chunk_size } else { num_events as u64 }; - let token = Some(ContinuationToken { block_n: current_block, event_n }.to_string()); - - return Ok(EventsChunk { events: filtered_events, continuation_token: token }); - } - if current_block == to_block { - break; + // Filter events based on the given event filter criteria (address, keys). + .filter(|(_, event)| event_match_filter(&event.event, from_address.as_ref(), keys.as_deref())) + + // Take exactly enough events to fill the requested chunk size, plus one extra event. + // The extra event is used to determine if the block has more matching events. + // - If an extra event is found, it means there are still unprocessed events in this block. + // -> The continuation token should point to this block and the next event index. + // - If no extra event is found, it means all matching events in this block have been retrieved. + // -> The continuation token should move to the next block. + .take(chunk_size as usize - events_chunk.len() + 1) + { + if events_chunk.len() < chunk_size as usize { + events_chunk.push(event); + last_event_index = idx; + } else { + // If a new event was found, it means there are more events that match the filter. + return Ok(EventsChunk { + events: events_chunk, + continuation_token: Some( + ContinuationToken { block_n: current_block, event_n: (last_event_index + 1) as u64 } + .to_string(), + ), + }); + } } } - // TODO: handle the case where 'to_block' is pending - Ok(EventsChunk { events: filtered_events, continuation_token: None }) + Ok(EventsChunk { events: events_chunk, continuation_token: None }) } fn block_range(