Skip to content

Commit

Permalink
Merge pull request eqlabs#2442 from eqlabs/sistemd/optimize-aggregate-event-filters
Browse files Browse the repository at this point in the history

feat(bloom): aggregate event filter caching
  • Loading branch information
sistemd authored Dec 13, 2024
2 parents 9c57e5b + cbd71d9 commit 20b7d64
Show file tree
Hide file tree
Showing 19 changed files with 514 additions and 266 deletions.
47 changes: 27 additions & 20 deletions crates/pathfinder/src/bin/pathfinder/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,33 +256,39 @@ This should only be enabled for debugging purposes as it adds substantial proces
feeder_gateway_fetch_concurrency: std::num::NonZeroUsize,

#[arg(
long = "storage.event-bloom-filter-cache-size",
long_help = "The number of blocks whose event bloom filters are cached in memory. This \
cache speeds up event related RPC queries at the cost of using extra memory. \
Each cached filter takes 2 KiB of memory.",
env = "PATHFINDER_STORAGE_BLOOM_FILTER_CACHE_SIZE",
default_value = "524288"
long = "storage.event-filter-cache-size",
long_help = format!(
"The number of aggregate event bloom filters to cache in memory. Each filter covers a {} block range.
This cache speeds up event related RPC queries at the cost of using extra memory.
Each cached filter takes 16 MiB of memory.",
pathfinder_storage::BLOCK_RANGE_LEN
),
env = "PATHFINDER_STORAGE_EVENT_FILTER_CACHE_SIZE",
default_value = "64"
)]
event_bloom_filter_cache_size: std::num::NonZeroUsize,
event_filter_cache_size: std::num::NonZeroUsize,

#[arg(
long = "rpc.get-events-max-blocks-to-scan",
long_help = "The number of blocks to scan for events when querying for events. This limit \
is used to prevent queries from taking too long.",
long_help = "The number of blocks to scan when querying for events. This limit is used to \
prevent queries from taking too long.",
env = "PATHFINDER_RPC_GET_EVENTS_MAX_BLOCKS_TO_SCAN",
default_value = "500"
)]
get_events_max_blocks_to_scan: std::num::NonZeroUsize,

#[arg(
long = "rpc.get-events-max-event-filters-to-load",
long_help = format!("The number of aggregate Bloom filters to load for events when querying for events. \
Each filter covers a {} block range. \
This limit is used to prevent queries from taking too long.", pathfinder_storage::BLOCK_RANGE_LEN),
env = "PATHFINDER_RPC_GET_EVENTS_MAX_EVENT_FILTERS_TO_LOAD",
default_value = "3"
long = "rpc.get-events-max-uncached-event-filters-to-load",
long_help = format!(
"The number of uncached aggregate Bloom filters to load when querying for events.
Each filter covers a {} block range.
This limit is used to prevent queries from taking too long.",
pathfinder_storage::BLOCK_RANGE_LEN
),
env = "PATHFINDER_RPC_GET_EVENTS_MAX_UNCACHED_EVENT_FILTERS_TO_LOAD",
default_value = "12"
)]
get_events_max_event_filters_to_load: std::num::NonZeroUsize,
get_events_max_uncached_event_filters_to_load: std::num::NonZeroUsize,

#[arg(
long = "storage.state-tries",
Expand Down Expand Up @@ -711,9 +717,9 @@ pub struct Config {
pub is_rpc_enabled: bool,
pub gateway_api_key: Option<String>,
pub gateway_timeout: Duration,
pub event_bloom_filter_cache_size: NonZeroUsize,
pub event_filter_cache_size: NonZeroUsize,
pub get_events_max_blocks_to_scan: NonZeroUsize,
pub get_events_max_event_filters_to_load: NonZeroUsize,
pub get_events_max_uncached_event_filters_to_load: NonZeroUsize,
pub state_tries: Option<StateTries>,
pub custom_versioned_constants: Option<VersionedConstants>,
pub feeder_gateway_fetch_concurrency: NonZeroUsize,
Expand Down Expand Up @@ -1001,9 +1007,10 @@ impl Config {
is_sync_enabled: cli.is_sync_enabled,
is_rpc_enabled: cli.is_rpc_enabled,
gateway_api_key: cli.gateway_api_key,
event_bloom_filter_cache_size: cli.event_bloom_filter_cache_size,
event_filter_cache_size: cli.event_filter_cache_size,
get_events_max_blocks_to_scan: cli.get_events_max_blocks_to_scan,
get_events_max_event_filters_to_load: cli.get_events_max_event_filters_to_load,
get_events_max_uncached_event_filters_to_load: cli
.get_events_max_uncached_event_filters_to_load,
gateway_timeout: Duration::from_secs(cli.gateway_timeout.get()),
feeder_gateway_fetch_concurrency: cli.feeder_gateway_fetch_concurrency,
state_tries: cli.state_tries,
Expand Down
5 changes: 3 additions & 2 deletions crates/pathfinder/src/bin/pathfinder/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ async fn async_main() -> anyhow::Result<()> {
let storage_manager =
pathfinder_storage::StorageBuilder::file(pathfinder_context.database.clone())
.journal_mode(config.sqlite_wal)
.bloom_filter_cache_size(config.event_bloom_filter_cache_size.get())
.event_filter_cache_size(config.event_filter_cache_size.get())
.trie_prune_mode(match config.state_tries {
Some(StateTries::Pruned(num_blocks_kept)) => {
Some(pathfinder_storage::TriePruneMode::Prune { num_blocks_kept })
Expand Down Expand Up @@ -217,7 +217,8 @@ Hint: This is usually caused by exceeding the file descriptor limit of your syst
let rpc_config = pathfinder_rpc::context::RpcConfig {
batch_concurrency_limit: config.rpc_batch_concurrency_limit,
get_events_max_blocks_to_scan: config.get_events_max_blocks_to_scan,
get_events_max_event_filters_to_load: config.get_events_max_event_filters_to_load,
get_events_max_uncached_event_filters_to_load: config
.get_events_max_uncached_event_filters_to_load,
custom_versioned_constants: config.custom_versioned_constants.take(),
};

Expand Down
4 changes: 2 additions & 2 deletions crates/pathfinder/src/state/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1081,8 +1081,8 @@ async fn l2_reorg(
}

transaction
.reconstruct_running_event_filter()
.context("Reconstructing running event filter after purge")?;
.reset()
.context("Resetting local DB state after reorg")?;

// Track combined L1 and L2 state.
let l1_l2_head = transaction.l1_l2_pointer().context("Query L1-L2 head")?;
Expand Down
4 changes: 2 additions & 2 deletions crates/pathfinder/src/sync/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -668,8 +668,8 @@ async fn rollback_to_anchor(
}

transaction
.reconstruct_running_event_filter()
.context("Reconstructing running event filter after purge")?;
.reset()
.context("Resetting local DB state after reorg")?;

Ok(())
})
Expand Down
2 changes: 0 additions & 2 deletions crates/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ license = { workspace = true }
rust-version = { workspace = true }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[features]

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions crates/rpc/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio::sync::watch as tokio_watch;
pub struct RpcConfig {
pub batch_concurrency_limit: NonZeroUsize,
pub get_events_max_blocks_to_scan: NonZeroUsize,
pub get_events_max_event_filters_to_load: NonZeroUsize,
pub get_events_max_uncached_event_filters_to_load: NonZeroUsize,
pub custom_versioned_constants: Option<VersionedConstants>,
}

Expand Down Expand Up @@ -120,7 +120,7 @@ impl RpcContext {
let config = RpcConfig {
batch_concurrency_limit: NonZeroUsize::new(8).unwrap(),
get_events_max_blocks_to_scan: NonZeroUsize::new(1000).unwrap(),
get_events_max_event_filters_to_load: NonZeroUsize::new(1000).unwrap(),
get_events_max_uncached_event_filters_to_load: NonZeroUsize::new(1000).unwrap(),
custom_versioned_constants: None,
};

Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/src/jsonrpc/router/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,7 @@ mod tests {
config: RpcConfig {
batch_concurrency_limit: 1.try_into().unwrap(),
get_events_max_blocks_to_scan: 1.try_into().unwrap(),
get_events_max_event_filters_to_load: 1.try_into().unwrap(),
get_events_max_uncached_event_filters_to_load: 1.try_into().unwrap(),
custom_versioned_constants: None,
},
};
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/src/method/get_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ pub async fn get_events(
.events(
&constraints,
context.config.get_events_max_blocks_to_scan,
context.config.get_events_max_event_filters_to_load,
context.config.get_events_max_uncached_event_filters_to_load,
)
.map_err(|e| match e {
EventFilterError::Internal(e) => GetEventsError::Internal(e),
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/src/method/subscribe_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,7 @@ mod tests {
config: RpcConfig {
batch_concurrency_limit: 64.try_into().unwrap(),
get_events_max_blocks_to_scan: 1024.try_into().unwrap(),
get_events_max_event_filters_to_load: 1.try_into().unwrap(),
get_events_max_uncached_event_filters_to_load: 1.try_into().unwrap(),
custom_versioned_constants: None,
},
};
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/src/method/subscribe_new_heads.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ mod tests {
config: RpcConfig {
batch_concurrency_limit: 1.try_into().unwrap(),
get_events_max_blocks_to_scan: 1.try_into().unwrap(),
get_events_max_event_filters_to_load: 1.try_into().unwrap(),
get_events_max_uncached_event_filters_to_load: 1.try_into().unwrap(),
custom_versioned_constants: None,
},
};
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/src/method/subscribe_pending_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ mod tests {
config: RpcConfig {
batch_concurrency_limit: 1.try_into().unwrap(),
get_events_max_blocks_to_scan: 1.try_into().unwrap(),
get_events_max_event_filters_to_load: 1.try_into().unwrap(),
get_events_max_uncached_event_filters_to_load: 1.try_into().unwrap(),
custom_versioned_constants: None,
},
};
Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/src/method/subscribe_transaction_status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ mod tests {
config: RpcConfig {
batch_concurrency_limit: 1.try_into().unwrap(),
get_events_max_blocks_to_scan: 1.try_into().unwrap(),
get_events_max_event_filters_to_load: 1.try_into().unwrap(),
get_events_max_uncached_event_filters_to_load: 1.try_into().unwrap(),
custom_versioned_constants: None,
},
};
Expand Down
7 changes: 6 additions & 1 deletion crates/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ primitive-types = { workspace = true }
r2d2 = { workspace = true }
r2d2_sqlite = { workspace = true }
rand = { workspace = true }
rusqlite = { workspace = true, features = ["bundled", "functions"] }
rusqlite = { workspace = true, features = [
"bundled",
"functions",
"vtab",
"array",
] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true, features = [
"arbitrary_precision",
Expand Down
Loading

0 comments on commit 20b7d64

Please sign in to comment.