add the post processors.
larry-aptos committed Jan 12, 2024
1 parent 4801aca commit 80ce363
Showing 8 changed files with 142 additions and 6 deletions.
94 changes: 90 additions & 4 deletions rust/indexer-metrics/src/main.rs
@@ -4,7 +4,11 @@
use anyhow::Result;
use clap::Parser;
use indexer_metrics::{
-    metrics::{PFN_LEDGER_TIMESTAMP, PFN_LEDGER_VERSION, TASK_FAILURE_COUNT},
metrics::{
HASURA_API_LATEST_TRANSACTION_TIMESTAMP, HASURA_API_LATEST_VERSION,
HASURA_API_LATEST_VERSION_TIMESTAMP, INDEXER_PROCESSED_LATENCY, PFN_LEDGER_TIMESTAMP,
PFN_LEDGER_VERSION, TASK_FAILURE_COUNT,
},
util::{deserialize_from_string, fetch_url_with_timeout},
};
use serde::{Deserialize, Serialize};
@@ -23,6 +27,21 @@ struct FullnodeResponse {
ledger_timestamp: u64,
}

#[derive(Debug, Deserialize, Serialize)]
struct HasuraResponse {
processor_status: Vec<ProcessorStatus>,
}

#[derive(Debug, Deserialize, Serialize)]
struct ProcessorStatus {
processor: String,
last_success_version: i64,
#[serde(deserialize_with = "deserialize_from_string")]
last_updated: chrono::NaiveDateTime,
#[serde(deserialize_with = "deserialize_from_string")]
last_transaction_timestamp: chrono::NaiveDateTime,
}

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct PostProcessorConfig {
@@ -35,15 +54,18 @@ pub struct PostProcessorConfig {
impl RunnableConfig for PostProcessorConfig {
async fn run(&self) -> Result<()> {
let mut tasks = vec![];
-        let _hasura_rest_api_endpoint = self.hasura_rest_api_endpoint.clone();
let hasura_rest_api_endpoint = self.hasura_rest_api_endpoint.clone();
let fullnode_rest_api_endpoint = self.fullnode_rest_api_endpoint.clone();
let chain_name = self.chain_name.clone();

-        // if let Some(hasura) = hasura_rest_api_endpoint {}
if let Some(fullnode) = fullnode_rest_api_endpoint {
-            tasks.push(tokio::spawn(start_fn_fetch(fullnode, chain_name)));
tasks.push(tokio::spawn(start_fn_fetch(fullnode, chain_name.clone())));
}

if let Some(hasura) = hasura_rest_api_endpoint {
tasks.push(tokio::spawn(start_hasura_fetch(hasura, chain_name)));
}
let _ = futures::future::join_all(tasks).await;
unreachable!("All tasks should run forever");
}
@@ -55,11 +77,75 @@ impl RunnableConfig for PostProcessorConfig {

#[tokio::main]
async fn main() -> Result<()> {
-    let args = ServerArgs::parse();
let args: ServerArgs = ServerArgs::parse();
args.run::<PostProcessorConfig>(tokio::runtime::Handle::current())
.await
}

async fn start_hasura_fetch(url: String, chain_name: String) {
loop {
let result = fetch_url_with_timeout(&url, QUERY_TIMEOUT_MS).await;
let time_now = tokio::time::Instant::now();

// Handle the result
match result {
Ok(Ok(response)) => match response.json::<HasuraResponse>().await {
Ok(resp) => {
tracing::info!(url = &url, response = ?resp, "Request succeeded");
for processor in resp.processor_status {
let processor_name = processor.processor;
                        HASURA_API_LATEST_VERSION
                            .with_label_values(&[&processor_name, &chain_name])
                            .set(processor.last_success_version);
                        // timestamp_millis() yields milliseconds, so divide by
                        // 1,000 (not 1,000,000) to report the seconds the
                        // `_secs` metric names promise.
                        HASURA_API_LATEST_VERSION_TIMESTAMP
                            .with_label_values(&[&processor_name, &chain_name])
                            .set(processor.last_updated.timestamp_millis() as f64 / 1000.0);
                        HASURA_API_LATEST_TRANSACTION_TIMESTAMP
                            .with_label_values(&[&processor_name, &chain_name])
                            .set(
                                processor.last_transaction_timestamp.timestamp_millis() as f64
                                    / 1000.0,
                            );
                        INDEXER_PROCESSED_LATENCY
                            .with_label_values(&[&processor_name, &chain_name])
                            .set(
                                (processor.last_updated.timestamp_millis()
                                    - processor.last_transaction_timestamp.timestamp_millis())
                                    as f64
                                    / 1000.0,
                            );
}
},
Err(err) => {
tracing::error!(url = &url, error = ?err, "Parsing error");
TASK_FAILURE_COUNT
.with_label_values(&["hasura", &chain_name])
.inc();
},
},
Ok(Err(err)) => {
// Request encountered an error within the timeout
tracing::error!(url = &url, error = ?err, "Request error");
TASK_FAILURE_COUNT
.with_label_values(&["hasura", &chain_name])
.inc();
},
Err(_) => {
// Request timed out
tracing::error!(url = &url, "Request timed out");
TASK_FAILURE_COUNT
.with_label_values(&["hasura", &chain_name])
.inc();
},
}
let elapsed = time_now.elapsed().as_millis() as u64;
        // Enforce a minimum spacing between queries: sleep off whatever
        // remains of MIN_TIME_QUERIES_MS since the last response.
if elapsed < MIN_TIME_QUERIES_MS {
tokio::time::sleep(Duration::from_millis(MIN_TIME_QUERIES_MS - elapsed)).await;
}
}
}

async fn start_fn_fetch(url: String, chain_name: String) {
loop {
let result = fetch_url_with_timeout(&url, QUERY_TIMEOUT_MS).await;
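For reference, the Hasura payload that start_hasura_fetch expects looks roughly like the sketch below. This is a standalone illustration, not part of the commit: the JSON shape is inferred from the HasuraResponse/ProcessorStatus structs above, and deserialize_from_string is approximated by a local helper because its implementation (and the exact timestamp format) is not shown in this diff.

use chrono::NaiveDateTime;
use serde::{de, Deserialize, Deserializer};

// Stand-in for the crate's `deserialize_from_string` util; the timestamp
// format is an assumption.
fn from_string<'de, D>(d: D) -> Result<NaiveDateTime, D::Error>
where
    D: Deserializer<'de>,
{
    let s = String::deserialize(d)?;
    NaiveDateTime::parse_from_str(&s, "%Y-%m-%dT%H:%M:%S%.f").map_err(de::Error::custom)
}

#[derive(Debug, Deserialize)]
struct ProcessorStatus {
    processor: String,
    last_success_version: i64,
    #[serde(deserialize_with = "from_string")]
    last_updated: NaiveDateTime,
    #[serde(deserialize_with = "from_string")]
    last_transaction_timestamp: NaiveDateTime,
}

#[derive(Debug, Deserialize)]
struct HasuraResponse {
    processor_status: Vec<ProcessorStatus>,
}

fn main() {
    let payload = r#"{"processor_status": [{
        "processor": "default_processor",
        "last_success_version": 123,
        "last_updated": "2024-01-12T00:00:01.000000",
        "last_transaction_timestamp": "2024-01-12T00:00:00.250000"
    }]}"#;
    let resp: HasuraResponse = serde_json::from_str(payload).unwrap();
    assert_eq!(resp.processor_status[0].last_success_version, 123);
}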
18 changes: 18 additions & 0 deletions rust/indexer-metrics/src/metrics.rs
@@ -17,6 +17,15 @@ pub static TASK_FAILURE_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

pub static HASURA_API_LATEST_TRANSACTION_TIMESTAMP: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
"indexer_metrics_hasura_latest_transaction_timestamp_secs",
"Processor latest transaction timestamp (unix timestamp) measured from indexer metrics service",
&["processor_name", "chain_name"],
)
.unwrap()
});

pub static HASURA_API_LATEST_VERSION: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"indexer_metrics_hasura_latest_version",
@@ -35,6 +44,15 @@ pub static HASURA_API_LATEST_VERSION_TIMESTAMP: Lazy<GaugeVec> = Lazy::new(|| {
.unwrap()
});

pub static INDEXER_PROCESSED_LATENCY: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
"indexer_metrics_processed_latency_secs",
"Latency between transaction timestamp and indexer processed timestamp (unix timestamp) measured from indexer metrics service",
&["processor_name", "chain_name"],
)
.unwrap()
});

pub static PFN_LEDGER_VERSION: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"indexer_metrics_pfn_ledger_version",
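The register_gauge_vec!/register_int_gauge_vec! macros register against the prometheus crate's default registry, so any handler that gathers that registry picks these metrics up. A minimal rendering sketch follows; how this service actually serves its metrics endpoint is outside this diff.

use prometheus::{Encoder, TextEncoder};

fn render_metrics() -> String {
    // Collect every metric registered through the macros above.
    let families = prometheus::gather();
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&families, &mut buf)
        .expect("encoding the default registry should not fail");
    String::from_utf8(buf).expect("Prometheus text format is valid UTF-8")
}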
2 changes: 2 additions & 0 deletions (new migration down.sql; directory name not shown)
@@ -0,0 +1,2 @@
-- This file should undo anything in `up.sql`
ALTER TABLE processor_status DROP COLUMN last_transaction_timestamp;
3 changes: 3 additions & 0 deletions (new migration up.sql; directory name not shown)
@@ -0,0 +1,3 @@
-- Your SQL goes here
ALTER TABLE processor_status
ADD COLUMN last_transaction_timestamp TIMESTAMP;
2 changes: 2 additions & 0 deletions rust/processor/src/models/processor_status.rs
@@ -12,6 +12,7 @@ use diesel_async::RunQueryDsl;
pub struct ProcessorStatus {
pub processor: String,
pub last_success_version: i64,
pub last_transaction_timestamp: Option<chrono::NaiveDateTime>,
}

#[derive(AsChangeset, Debug, Queryable)]
@@ -21,6 +22,7 @@ pub struct ProcessorStatusQuery {
pub processor: String,
pub last_success_version: i64,
pub last_updated: chrono::NaiveDateTime,
pub last_transaction_timestamp: Option<chrono::NaiveDateTime>,
}

impl ProcessorStatusQuery {
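The migration adds the column as nullable and nothing backfills it, so rows written before this change read back as None. A hypothetical caller would handle both cases:

// Sketch only; ProcessorStatusQuery is the model defined above.
fn describe(status: &ProcessorStatusQuery) -> String {
    match status.last_transaction_timestamp {
        Some(ts) => format!("{} last saw a transaction at {}", status.processor, ts),
        None => format!("{} has no transaction timestamp recorded yet", status.processor),
    }
}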
9 changes: 8 additions & 1 deletion rust/processor/src/processors/mod.rs
@@ -104,11 +104,16 @@ pub trait ProcessorTrait: Send + Sync + Debug {

/// Store last processed version from database. We can assume that all previously processed
/// versions are successful because any gap would cause the processor to panic
-    async fn update_last_processed_version(&self, version: u64) -> anyhow::Result<()> {
async fn update_last_processed_version(
&self,
version: u64,
last_transaction_timestamp: Option<chrono::NaiveDateTime>,
) -> anyhow::Result<()> {
let mut conn = self.get_conn().await;
let status = ProcessorStatus {
processor: self.name().to_string(),
last_success_version: version as i64,
last_transaction_timestamp,
};
execute_with_better_error(
&mut conn,
@@ -120,6 +125,8 @@ pub trait ProcessorTrait: Send + Sync + Debug {
processor_status::last_success_version
.eq(excluded(processor_status::last_success_version)),
processor_status::last_updated.eq(excluded(processor_status::last_updated)),
processor_status::last_transaction_timestamp
.eq(excluded(processor_status::last_transaction_timestamp)),
)),
Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "),
)
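The conflict target sits above this hunk, but the trailing WHERE guard is the important part: it makes the upsert monotonic, so a replayed or out-of-order batch can never move last_success_version (or the new timestamp) backwards. Below is a hypothetical way to inspect the SQL Diesel builds for an upsert like this one; it assumes this crate's schema module, the Postgres backend, and a `processor` conflict column, and it does not reproduce execute_with_better_error, the helper that appends the WHERE guard.

use diesel::{debug_query, pg::Pg, prelude::*, upsert::excluded};
use processor::schema::processor_status;

fn print_upsert_sql() {
    let query = diesel::insert_into(processor_status::table)
        .values((
            processor_status::processor.eq("default_processor"),
            processor_status::last_success_version.eq(100_i64),
        ))
        .on_conflict(processor_status::processor)
        .do_update()
        .set(
            processor_status::last_success_version
                .eq(excluded(processor_status::last_success_version)),
        );
    // Prints roughly: INSERT INTO "processor_status" ... ON CONFLICT ("processor")
    // DO UPDATE SET "last_success_version" = excluded."last_success_version"
    println!("{}", debug_query::<Pg, _>(&query));
}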
5 changes: 5 additions & 0 deletions rust/processor/src/schema.rs
@@ -870,6 +870,7 @@ diesel::table! {
processor -> Varchar,
last_success_version -> Int8,
last_updated -> Timestamp,
last_transaction_timestamp -> Nullable<Timestamp>,
}
}

@@ -1197,6 +1198,10 @@ diesel::table! {
}

diesel::joinable!(block_metadata_transactions -> transactions (version));
diesel::joinable!(move_modules -> transactions (transaction_version));
diesel::joinable!(move_resources -> transactions (transaction_version));
diesel::joinable!(table_items -> transactions (transaction_version));
diesel::joinable!(write_set_changes -> transactions (transaction_version));

diesel::allow_tables_to_appear_in_same_query!(
account_transactions,
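The new joinable! declarations record foreign-key relationships, which is what lets Diesel type-check an inner_join between these tables (they are already listed in allow_tables_to_appear_in_same_query!). A hypothetical query sketch: the crate name, the synchronous PgConnection (the repo itself uses diesel_async), and the selected column are assumptions.

use diesel::prelude::*;
use processor::schema::{move_resources, transactions};

fn resource_versions(conn: &mut PgConnection) -> QueryResult<Vec<i64>> {
    move_resources::table
        .inner_join(transactions::table)
        .select(move_resources::transaction_version)
        .load(conn)
}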
15 changes: 14 additions & 1 deletion rust/processor/src/worker.rs
@@ -356,6 +356,19 @@ impl Worker {
"[Parser] Successfully fetched transaction batches from channel."
);

let last_transaction_pb_timestamp = transactions_batches
.last()
.unwrap()
.transactions
.as_slice()
.last()
.unwrap()
.timestamp
.clone();
let last_transaction_timestamp = last_transaction_pb_timestamp.map(|t| {
chrono::NaiveDateTime::from_timestamp_opt(t.seconds, t.nanos as u32).unwrap()
});

// Process the transactions in parallel
let mut tasks = vec![];
for transactions_pb in transactions_batches {
@@ -616,7 +629,7 @@ impl Worker {
batch_start_version = batch_end + 1;

processor
-                .update_last_processed_version(batch_end)
.update_last_processed_version(batch_end, last_transaction_timestamp)
.await
.unwrap();

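The block at the top of this file takes the timestamp of the last transaction in the last batch and converts the protobuf seconds/nanos pair with chrono; the unwrap()s assume batches are never empty. The same conversion written defensively, as a self-contained sketch (the struct names are stand-ins, not the crate's real types):

use chrono::NaiveDateTime;

struct PbTimestamp { seconds: i64, nanos: i32 }
struct Tx { timestamp: Option<PbTimestamp> }
struct Batch { transactions: Vec<Tx> }

fn last_transaction_timestamp(batches: &[Batch]) -> Option<NaiveDateTime> {
    let ts = batches.last()?.transactions.last()?.timestamp.as_ref()?;
    // Protobuf nanos is an i32 in 0..=999_999_999; out-of-range values make
    // from_timestamp_opt return None instead of panicking.
    NaiveDateTime::from_timestamp_opt(ts.seconds, ts.nanos as u32)
}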
