From 8e8b108740c10e2100a48a5a0ece25076783ddc0 Mon Sep 17 00:00:00 2001 From: Adam Wierzbicki Date: Wed, 8 May 2024 20:53:09 +0200 Subject: [PATCH] Fix for concurrent logs storage init + transport update (#7) --- Cargo.lock | 14 ++++++++------ Cargo.toml | 2 +- src/logs_storage/mod.rs | 21 +++++++++++---------- src/transport/p2p.rs | 4 ++-- 4 files changed, 22 insertions(+), 19 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1506c38..ac9f9ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5182,7 +5182,7 @@ checksum = "80b776a1b2dc779f5ee0641f8ade0125bc1298dd41a9a0c16d8bd57b42d222b1" dependencies = [ "bytes", "heck 0.5.0", - "itertools 0.12.1", + "itertools 0.10.5", "log", "multimap", "once_cell", @@ -5202,7 +5202,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19de2de2a00075bf566bee3bd4db014b11587e84184d3f7a791bc17f1a8e9e48" dependencies = [ "anyhow", - "itertools 0.12.1", + "itertools 0.10.5", "proc-macro2", "quote", "syn 2.0.59", @@ -6393,13 +6393,14 @@ dependencies = [ [[package]] name = "subsquid-messages" version = "0.2.0" -source = "git+ssh://git@github.com/subsquid/subsquid-network.git#16acbbf1e689e0cad45ad7fbe53de9d35545b186" +source = "git+ssh://git@github.com/subsquid/subsquid-network.git#d7797694ea17eabaa72df78b1388125adb6153a5" dependencies = [ "anyhow", "hex", "libp2p", "prost", "prost-build", + "semver", "serde", "sha3", ] @@ -6431,8 +6432,8 @@ dependencies = [ [[package]] name = "subsquid-network-transport" -version = "0.4.0" -source = "git+ssh://git@github.com/subsquid/subsquid-network.git#16acbbf1e689e0cad45ad7fbe53de9d35545b186" +version = "0.4.1" +source = "git+ssh://git@github.com/subsquid/subsquid-network.git#d7797694ea17eabaa72df78b1388125adb6153a5" dependencies = [ "anyhow", "async-trait", @@ -6450,6 +6451,7 @@ dependencies = [ "log", "prometheus-client", "prost", + "semver", "serde", "subsquid-messages", "thiserror", @@ -7616,7 +7618,7 @@ dependencies = [ "serde_json", "sha3", "subsquid-messages", - "subsquid-network-transport 0.4.0", + "subsquid-network-transport 0.4.1", "thiserror", "tokio", "tokio-rusqlite", diff --git a/Cargo.toml b/Cargo.toml index da3b902..de67a4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,7 @@ serde-rename-rule = "0.2.2" serde_json = { version = "1.0.111", features = ["preserve_order"] } sha3 = "0.10.8" subsquid-messages = { git = "ssh://git@github.com/subsquid/subsquid-network.git", version = "0.2" } -subsquid-network-transport = { git = "ssh://git@github.com/subsquid/subsquid-network.git", version = "0.4", features = ["worker", "metrics"] } +subsquid-network-transport = { git = "ssh://git@github.com/subsquid/subsquid-network.git", version = "0.4.1", features = ["worker", "metrics"] } thiserror = "1.0.57" tokio = { version = "1.35.1", features = ["full", "tracing"] } tokio-rusqlite = "0.5.1" diff --git a/src/logs_storage/mod.rs b/src/logs_storage/mod.rs index c6b03a3..8c593c6 100644 --- a/src/logs_storage/mod.rs +++ b/src/logs_storage/mod.rs @@ -1,13 +1,14 @@ -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use anyhow::Result; use prost::Message; use subsquid_messages::QueryExecuted; +use tokio::sync::RwLock; use tokio_rusqlite::Connection; pub struct LogsStorage { db: Connection, - has_next_seq_no: AtomicBool, + has_next_seq_no: Arc>, } impl LogsStorage { @@ -29,16 +30,16 @@ impl LogsStorage { Ok(Self { db, - has_next_seq_no: AtomicBool::new(has_next_seq_no), + has_next_seq_no: Arc::new(RwLock::new(has_next_seq_no)), }) } - pub fn is_initialized(&self) -> bool { - self.has_next_seq_no.load(Ordering::SeqCst) + pub async fn is_initialized(&self) -> bool { + *self.has_next_seq_no.read().await } pub async fn save_log(&self, mut log: QueryExecuted) -> Result<()> { - assert!(self.is_initialized()); + assert!(self.is_initialized().await); self.db .call(move |db| { let tx = db.transaction()?; @@ -64,7 +65,9 @@ impl LogsStorage { last_collected_seq_no.unwrap_or(0) ); let next_seq_no = last_collected_seq_no.map(|x| x + 1).unwrap_or(0); - if self.is_initialized() { + let mut is_init_guard = self.has_next_seq_no.write().await; + if *is_init_guard { + drop(is_init_guard); self.db .call_unwrap(move |db| { db.prepare_cached("DELETE FROM query_logs WHERE seq_no < ?") @@ -80,9 +83,7 @@ impl LogsStorage { }) .await .expect("Couldn't initialize logs storage"); - if self.has_next_seq_no.swap(true, Ordering::SeqCst) { - panic!("Tried to initialize logs storage twice"); - } + *is_init_guard = true; tracing::info!("Initialized logs storage"); } } diff --git a/src/transport/p2p.rs b/src/transport/p2p.rs index 9e6065d..74f6067 100644 --- a/src/transport/p2p.rs +++ b/src/transport/p2p.rs @@ -108,7 +108,7 @@ impl> P2PTransport { }, ); - if !self.logs_storage.is_initialized() { + if !self.logs_storage.is_initialized().await { continue; } let logs = match self.logs_storage.get_logs().await { @@ -164,7 +164,7 @@ impl> P2PTransport { async fn handle_query(&self, peer_id: PeerId, query: Query) { let query_id = query.query_id.clone().expect("got query without query_id"); - if !self.logs_storage.is_initialized() { + if !self.logs_storage.is_initialized().await { warn!("Logs storage not initialized. Cannot execute queries yet."); return; }