Skip to content

Commit

Permalink
Fix for concurrent logs storage init + transport update (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
Wiezzel authored May 8, 2024
1 parent 3e5d305 commit 8e8b108
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 19 deletions.
14 changes: 8 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ serde-rename-rule = "0.2.2"
serde_json = { version = "1.0.111", features = ["preserve_order"] }
sha3 = "0.10.8"
subsquid-messages = { git = "ssh://[email protected]/subsquid/subsquid-network.git", version = "0.2" }
subsquid-network-transport = { git = "ssh://[email protected]/subsquid/subsquid-network.git", version = "0.4", features = ["worker", "metrics"] }
subsquid-network-transport = { git = "ssh://[email protected]/subsquid/subsquid-network.git", version = "0.4.1", features = ["worker", "metrics"] }
thiserror = "1.0.57"
tokio = { version = "1.35.1", features = ["full", "tracing"] }
tokio-rusqlite = "0.5.1"
Expand Down
21 changes: 11 additions & 10 deletions src/logs_storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use anyhow::Result;
use prost::Message;
use subsquid_messages::QueryExecuted;
use tokio::sync::RwLock;
use tokio_rusqlite::Connection;

pub struct LogsStorage {
db: Connection,
has_next_seq_no: AtomicBool,
has_next_seq_no: Arc<RwLock<bool>>,
}

impl LogsStorage {
Expand All @@ -29,16 +30,16 @@ impl LogsStorage {

Ok(Self {
db,
has_next_seq_no: AtomicBool::new(has_next_seq_no),
has_next_seq_no: Arc::new(RwLock::new(has_next_seq_no)),
})
}

pub fn is_initialized(&self) -> bool {
self.has_next_seq_no.load(Ordering::SeqCst)
pub async fn is_initialized(&self) -> bool {
*self.has_next_seq_no.read().await
}

pub async fn save_log(&self, mut log: QueryExecuted) -> Result<()> {
assert!(self.is_initialized());
assert!(self.is_initialized().await);
self.db
.call(move |db| {
let tx = db.transaction()?;
Expand All @@ -64,7 +65,9 @@ impl LogsStorage {
last_collected_seq_no.unwrap_or(0)
);
let next_seq_no = last_collected_seq_no.map(|x| x + 1).unwrap_or(0);
if self.is_initialized() {
let mut is_init_guard = self.has_next_seq_no.write().await;
if *is_init_guard {
drop(is_init_guard);
self.db
.call_unwrap(move |db| {
db.prepare_cached("DELETE FROM query_logs WHERE seq_no < ?")
Expand All @@ -80,9 +83,7 @@ impl LogsStorage {
})
.await
.expect("Couldn't initialize logs storage");
if self.has_next_seq_no.swap(true, Ordering::SeqCst) {
panic!("Tried to initialize logs storage twice");
}
*is_init_guard = true;
tracing::info!("Initialized logs storage");
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/transport/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl<EventStream: Stream<Item = WorkerEvent>> P2PTransport<EventStream> {
},
);

if !self.logs_storage.is_initialized() {
if !self.logs_storage.is_initialized().await {
continue;
}
let logs = match self.logs_storage.get_logs().await {
Expand Down Expand Up @@ -164,7 +164,7 @@ impl<EventStream: Stream<Item = WorkerEvent>> P2PTransport<EventStream> {
async fn handle_query(&self, peer_id: PeerId, query: Query) {
let query_id = query.query_id.clone().expect("got query without query_id");

if !self.logs_storage.is_initialized() {
if !self.logs_storage.is_initialized().await {
warn!("Logs storage not initialized. Cannot execute queries yet.");
return;
}
Expand Down

0 comments on commit 8e8b108

Please sign in to comment.