diff --git a/rust/processor/migrations/2024-01-11-224315_update_process_status/down.sql b/rust/processor/migrations/2024-01-11-224315_update_process_status/down.sql new file mode 100644 index 00000000..7d28c69a --- /dev/null +++ b/rust/processor/migrations/2024-01-11-224315_update_process_status/down.sql @@ -0,0 +1,2 @@ +-- This file should undo anything in `up.sql` +ALTER TABLE processor_status DROP COLUMN last_transaction_timestamp; \ No newline at end of file diff --git a/rust/processor/migrations/2024-01-11-224315_update_process_status/up.sql b/rust/processor/migrations/2024-01-11-224315_update_process_status/up.sql new file mode 100644 index 00000000..3cadf67b --- /dev/null +++ b/rust/processor/migrations/2024-01-11-224315_update_process_status/up.sql @@ -0,0 +1,3 @@ +-- Your SQL goes here +ALTER TABLE processor_status +ADD COLUMN last_transaction_timestamp TIMESTAMP; \ No newline at end of file diff --git a/rust/processor/src/models/default_models/move_modules.rs b/rust/processor/src/models/default_models/move_modules.rs index 52089690..fa39238e 100644 --- a/rust/processor/src/models/default_models/move_modules.rs +++ b/rust/processor/src/models/default_models/move_modules.rs @@ -29,6 +29,7 @@ pub struct MoveModule { pub is_deleted: bool, } +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct MoveModuleByteCodeParsed { pub address: String, pub name: String, @@ -50,15 +51,23 @@ impl MoveModule { transaction_version, transaction_block_height, write_set_change_index, + // TODO: remove the useless_asref lint when new clippy nightly is released. + #[allow(clippy::useless_asref)] name: parsed_data .as_ref() .map(|d| d.name.clone()) .unwrap_or_default(), address: standardize_address(&write_module.address.to_string()), + // TODO: remove the useless_asref lint when new clippy nightly is released.
+ #[allow(clippy::useless_asref)] bytecode: parsed_data.as_ref().map(|d| d.bytecode.clone()), + // TODO: remove the useless_asref lint when new clippy nightly is released. + #[allow(clippy::useless_asref)] exposed_functions: parsed_data.as_ref().map(|d| d.exposed_functions.clone()), + // TODO: remove the useless_asref lint when new clippy nightly is released. + #[allow(clippy::useless_asref)] friends: parsed_data.as_ref().map(|d| d.friends.clone()), - structs: parsed_data.as_ref().map(|d| d.structs.clone()), + structs: parsed_data.map(|d| d.structs), is_deleted: false, } } @@ -73,6 +82,8 @@ impl MoveModule { transaction_version, transaction_block_height, write_set_change_index, + // TODO: remove the useless_asref lint when new clippy nightly is released. + #[allow(clippy::useless_asref)] name: delete_module .module .as_ref() diff --git a/rust/processor/src/models/processor_status.rs b/rust/processor/src/models/processor_status.rs index 89ec3479..7b89ec57 100644 --- a/rust/processor/src/models/processor_status.rs +++ b/rust/processor/src/models/processor_status.rs @@ -12,6 +12,7 @@ use diesel_async::RunQueryDsl; pub struct ProcessorStatus { pub processor: String, pub last_success_version: i64, + pub last_transaction_timestamp: Option<chrono::NaiveDateTime>, } #[derive(AsChangeset, Debug, Queryable)] @@ -21,6 +22,7 @@ pub struct ProcessorStatusQuery { pub processor: String, pub last_success_version: i64, pub last_updated: chrono::NaiveDateTime, + pub last_transaction_timestamp: Option<chrono::NaiveDateTime>, } impl ProcessorStatusQuery { diff --git a/rust/processor/src/models/token_v2_models/v2_token_datas.rs b/rust/processor/src/models/token_v2_models/v2_token_datas.rs index 4d31f3f3..9714a0ef 100644 --- a/rust/processor/src/models/token_v2_models/v2_token_datas.rs +++ b/rust/processor/src/models/token_v2_models/v2_token_datas.rs @@ -76,6 +76,8 @@ pub struct TokenDataIdFromTable { } impl TokenDataV2 { + // TODO: remove the useless_asref lint when new clippy nightly is released.
+ #[allow(clippy::useless_asref)] pub fn get_v2_from_write_resource( write_resource: &WriteResource, txn_version: i64, diff --git a/rust/processor/src/processors/mod.rs b/rust/processor/src/processors/mod.rs index c55184ba..da61b871 100644 --- a/rust/processor/src/processors/mod.rs +++ b/rust/processor/src/processors/mod.rs @@ -38,6 +38,7 @@ use crate::{ utils::{ counters::{GOT_CONNECTION_COUNT, UNABLE_TO_GET_CONNECTION_COUNT}, database::{execute_with_better_error, PgDbPool, PgPoolConnection}, + util::parse_timestamp, }, }; use aptos_protos::transaction::v1::Transaction as ProtoTransaction; @@ -104,11 +105,17 @@ pub trait ProcessorTrait: Send + Sync + Debug { /// Store last processed version from database. We can assume that all previously processed /// versions are successful because any gap would cause the processor to panic - async fn update_last_processed_version(&self, version: u64) -> anyhow::Result<()> { + async fn update_last_processed_version( + &self, + version: u64, + last_transaction_timestamp: Option<aptos_protos::util::timestamp::Timestamp>, + ) -> anyhow::Result<()> { let mut conn = self.get_conn().await; + let timestamp = last_transaction_timestamp.map(|t| parse_timestamp(&t, version as i64)); let status = ProcessorStatus { processor: self.name().to_string(), last_success_version: version as i64, + last_transaction_timestamp: timestamp, }; execute_with_better_error( &mut conn, @@ -120,6 +127,8 @@ processor_status::last_success_version .eq(excluded(processor_status::last_success_version)), processor_status::last_updated.eq(excluded(processor_status::last_updated)), + processor_status::last_transaction_timestamp + .eq(excluded(processor_status::last_transaction_timestamp)), )), Some(" WHERE processor_status.last_success_version <= EXCLUDED.last_success_version "), ) diff --git a/rust/processor/src/schema.rs b/rust/processor/src/schema.rs index ace2892c..18b40fd3 100644 --- a/rust/processor/src/schema.rs +++ b/rust/processor/src/schema.rs @@
-870,6 +870,7 @@ diesel::table! { processor -> Varchar, last_success_version -> Int8, last_updated -> Timestamp, + last_transaction_timestamp -> Nullable<Timestamp>, } } diff --git a/rust/processor/src/worker.rs b/rust/processor/src/worker.rs index 02e4cc31..7fa1ba45 100644 --- a/rust/processor/src/worker.rs +++ b/rust/processor/src/worker.rs @@ -616,7 +616,7 @@ impl Worker { batch_start_version = batch_end + 1; processor - .update_last_processed_version(batch_end) + .update_last_processed_version(batch_end, batch_end_txn_timestamp.clone()) .await .unwrap();