Skip to content

Commit c237306

Browse files
committed
Updates
1 parent 9bc1100 commit c237306

17 files changed

+65
-41
lines changed

.sqlx/query-02638977689e77d619be18b7361d24006788ddfe96aa974c8d8e4a4eece728ac.json

+19
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-20813cb4d560d8cf0249c75937a203e20f6c309debea40fcd0b59a9cd9b60bf4.json

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-3edeaae05a38b76e0bbb075d5a7274d6725dd8f415a2f8ec616c4ec8120f7d69.json

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-3f2360a30d1497437a073242c1ca24a216b2f447c5c20d1e7dd998bb17c2392a.json

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-87f4e011d12f2e13397657a52c81aac9fe0a9c2ede2130cf7c8f157f0e226720.json

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-8e3c032a1f69ca82474c7deee6fa77d7353ef5b5fd79e15129f011b51a855ac5.json

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-8ec4b70a13e815516ab5e426d75cd9133825f4afbf434fdc241ef70b39b9da49.json

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-f24bb7a8912cfbc48d3e6c8587408c2a563969fbe33f01f93bffd613ec9cf7e0.json

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.sqlx/query-f4043180ead6f8da4b5611a427884d9c262ef44c29c042799c99358dd1d9e226.json

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.lock

+12
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ redis = { version = "0.23", features = ["tokio-comp"], default-features = false
2525
reqwest = { version = "0.11", features = ["json", "rustls-tls"], default-features = false }
2626
http-body-util = "0.1"
2727
tower-http = { version = "0.5", features = ["cors"] }
28+
tokio-retry = "0.3"
2829

2930
[lib]
3031
name = "arch_indexer"

init.sql

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ CREATE TABLE IF NOT EXISTS transactions (
99
txid text PRIMARY KEY,
1010
block_height bigint NOT NULL REFERENCES blocks(height),
1111
data jsonb NOT NULL,
12-
status smallint NOT NULL,
12+
status jsonb NOT NULL, -- changed from smallint: store the full status object as JSON
1313
bitcoin_txids text[],
1414
created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP
1515
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ALTER TABLE transactions
2+
ALTER COLUMN status TYPE JSONB USING to_jsonb(status::text);

src/db/models.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use chrono::{DateTime, Utc};
22
use serde::{Deserialize, Serialize};
33
use sqlx::types::JsonValue;
4-
4+
use serde_json::Value;
55
#[derive(Debug, Serialize, Deserialize)]
66
pub struct Block {
77
pub height: i64,
@@ -15,7 +15,7 @@ pub struct Transaction {
1515
pub txid: String,
1616
pub block_height: i64,
1717
pub data: JsonValue,
18-
pub status: i32,
18+
pub status: Value,
1919
pub bitcoin_txids: Option<Vec<String>>,
2020
pub created_at: chrono::NaiveDateTime,
2121
}

src/indexer/block_processor.rs

+2-8
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,12 @@ use chrono::Utc;
44
use dashmap::DashMap;
55
use futures::stream;
66
use futures::StreamExt;
7-
use sqlx::Postgres;
8-
use sqlx::QueryBuilder;
97
use std::fmt::Write;
108
use std::sync::Arc;
119
use tracing::error;
12-
use crate::arch_rpc::Block as ArchBlock;
1310
use crate::arch_rpc::ArchRpcClient;
1411
use crate::arch_rpc::Block;
1512
use crate::db::models::Transaction;
16-
use chrono::NaiveDateTime;
1713
use sqlx::PgPool;
1814
use std::sync::atomic::{AtomicU64, AtomicI64};
1915
use std::time::{SystemTime, UNIX_EPOCH};
@@ -146,8 +142,6 @@ impl BlockProcessor {
146142
.expect("Failed to serialize transaction data");
147143

148144
let bitcoin_txids: Option<&[String]> = transaction.bitcoin_txids.as_deref();
149-
150-
// Convert
151145

152146
sqlx::query!(
153147
r#"
@@ -157,7 +151,7 @@ impl BlockProcessor {
157151
transaction.txid,
158152
transaction.block_height,
159153
serde_json::Value::String(data_json),
160-
transaction.status,
154+
serde_json::Value::String(transaction.status.to_string()),
161155
bitcoin_txids,
162156
transaction.created_at
163157
)
@@ -201,7 +195,7 @@ impl BlockProcessor {
201195
txid: txid_clone,
202196
block_height: height,
203197
data: tx.runtime_transaction,
204-
status: if tx.status == "Processing" { 0 } else { 1 },
198+
status: tx.status,
205199
bitcoin_txids: tx.bitcoin_txids,
206200
created_at: chrono::Utc::now().naive_utc(),
207201
})

src/indexer/sync.rs

+15-17
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ use std::sync::Arc;
55
use tokio::time::{Duration, sleep};
66
use tracing::{info, warn, error};
77

8+
use tokio_retry::strategy::{ExponentialBackoff, jitter};
9+
use tokio_retry::Retry;
10+
811
use super::block_processor::BlockProcessor;
912

1013
pub struct ChainSync {
@@ -42,7 +45,8 @@ impl ChainSync {
4245
}
4346

4447
if current > target_height {
45-
break; // Exit the loop if we're already ahead of the target height
48+
// If current is greater than target, keep polling for new blocks
// NOTE(review): this `continue` has no delay in this hunk — confirm a sleep/backoff exists before re-polling to avoid a busy loop
49+
continue;
4650
}
4751

4852
let batch_starts: Vec<_> = (0..self.concurrent_batches)
@@ -56,27 +60,23 @@ impl ChainSync {
5660
let end = (start + self.batch_size as i64 - 1).min(target_height);
5761
let heights: Vec<_> = (start..=end).collect();
5862
let processor = Arc::clone(&self.processor);
59-
63+
6064
async move {
61-
match processor.process_blocks_batch(heights).await {
62-
Ok(blocks) => {
63-
// for block in &blocks {
64-
// info!("Processed block {}", block.height);
65-
// }
66-
Ok(blocks)
67-
}
68-
Err(e) => {
69-
error!("Failed to process batch starting at {}: {:?}", start, e);
70-
Err(e)
71-
}
72-
}
65+
let retry_strategy = ExponentialBackoff::from_millis(10)
66+
.map(jitter) // add jitter to delays
67+
.take(5); // retry up to 5 times
68+
69+
Retry::spawn(retry_strategy, || async {
70+
processor.process_blocks_batch(heights.clone()).await
71+
})
72+
.await
7373
}
7474
})
7575
.collect();
7676

7777
// Process batches concurrently
7878
let results = futures::future::join_all(batch_futures).await;
79-
79+
8080
// Update progress
8181
for result in results {
8282
if let Ok(blocks) = result {
@@ -88,8 +88,6 @@ impl ChainSync {
8888

8989
current = self.current_height.load(Ordering::Relaxed);
9090
}
91-
92-
Ok(())
9391
}
9492

9593
async fn sync_blocks(&self) -> Result<()> {

src/main.rs

-2
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,6 @@ async fn main() -> Result<()> {
179179
}
180180
};
181181

182-
let current_height = 97935;
183-
184182
// Start the chain sync process
185183
let sync = ChainSync::new(
186184
Arc::clone(&processor),

0 commit comments

Comments
 (0)