diff --git a/Cargo.lock b/Cargo.lock index 451c924040e36..154a7ee80c269 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2088,7 +2088,6 @@ dependencies = [ "backoff", "base64 0.13.1", "chrono", - "cloud-storage", "dashmap", "futures", "itertools 0.12.1", @@ -2288,7 +2287,7 @@ dependencies = [ "ed25519-dalek 1.0.1", "hex", "hmac 0.12.1", - "jsonwebtoken 8.3.0", + "jsonwebtoken", "once_cell", "rand 0.7.3", "regex", @@ -2333,7 +2332,7 @@ dependencies = [ "dashmap", "hex", "hyper", - "jsonwebtoken 8.3.0", + "jsonwebtoken", "jwt", "once_cell", "rand 0.7.3", @@ -3744,7 +3743,7 @@ dependencies = [ "futures", "gcp-bigquery-client", "httpmock", - "jsonwebtoken 8.3.0", + "jsonwebtoken", "once_cell", "prometheus", "rand 0.7.3", @@ -3981,7 +3980,7 @@ dependencies = [ "hashbrown 0.14.3", "hex", "itertools 0.12.1", - "jsonwebtoken 8.3.0", + "jsonwebtoken", "move-binary-format", "move-bytecode-verifier", "move-core-types", @@ -4972,12 +4971,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c7f02d4ea65f2c1853089ffd8d2787bdbc63de2f0d29dedbcf8ccdfa0ccd4cf" -[[package]] -name = "base64" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff" - [[package]] name = "base64" version = "0.13.1" @@ -5880,30 +5873,6 @@ dependencies = [ "cc", ] -[[package]] -name = "cloud-storage" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7602ac4363f68ac757d6b87dd5d850549a14d37489902ae639c06ecec06ad275" -dependencies = [ - "async-trait", - "base64 0.13.1", - "bytes", - "chrono", - "dotenv", - "futures-util", - "hex", - "jsonwebtoken 7.2.0", - "lazy_static", - "pem 0.8.3", - "percent-encoding", - "reqwest", - "ring 0.16.20", - "serde", - "serde_json", - "tokio", -] - [[package]] name = "codespan" version = "0.11.1" @@ -7039,12 +7008,6 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" -[[package]] -name = "dotenv" -version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" - [[package]] name = "downcast" version = "0.11.0" @@ -8221,7 +8184,7 @@ dependencies = [ "google-cloud-metadata", "google-cloud-token", "home", - "jsonwebtoken 8.3.0", + "jsonwebtoken", "reqwest", "serde", "serde_json", @@ -9323,20 +9286,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "jsonwebtoken" -version = "7.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afabcc15e437a6484fc4f12d0fd63068fe457bf93f1c148d3d9649c60b103f32" -dependencies = [ - "base64 0.12.3", - "pem 0.8.3", - "ring 0.16.20", - "serde", - "serde_json", - "simple_asn1 0.4.1", -] - [[package]] name = "jsonwebtoken" version = "8.3.0" @@ -9344,11 +9293,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6971da4d9c3aa03c3d8f3ff0f4155b534aad021292003895a469716b2a230378" dependencies = [ "base64 0.21.6", - "pem 1.1.1", + "pem", "ring 0.16.20", "serde", "serde_json", - "simple_asn1 0.6.2", + "simple_asn1", ] [[package]] @@ -9431,7 +9380,7 @@ dependencies = [ "jsonpath_lib", "k8s-openapi", "kube-core", - "pem 1.1.1", + "pem", "pin-project", "rustls 0.20.9", "rustls-pemfile 0.2.1", @@ -11317,17 +11266,6 @@ dependencies = [ "num-traits", ] -[[package]] -name = "num-bigint" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "090c7f9998ee0ff65aa5b723e4009f7b217707f1fb5ea551329cc4d6231fb304" -dependencies = [ - "autocfg", - "num-integer", - "num-traits", -] - [[package]] name = "num-bigint" version = "0.3.3" @@ -11949,17 +11887,6 @@ dependencies = [ "crypto-mac 0.8.0", ] -[[package]] -name = "pem" -version = "0.8.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd56cbd21fea48d0c440b41cd69c589faacade08c992d9a54e471b79d0fd13eb" -dependencies = [ - "base64 0.13.1", - "once_cell", - "regex", -] - [[package]] name = "pem" version = "1.1.1" @@ -14484,17 +14411,6 @@ dependencies = [ "similar", ] -[[package]] -name = "simple_asn1" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "692ca13de57ce0613a363c8c2f1de925adebc81b04c923ac60c5488bb44abe4b" -dependencies = [ - "chrono", - "num-bigint 0.2.6", - "num-traits", -] - [[package]] name = "simple_asn1" version = "0.6.2" diff --git a/Cargo.toml b/Cargo.toml index 19291d0b5075b..4b82ceb6f032f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -508,10 +508,6 @@ claims = "0.7" clap = { version = "4.3.9", features = ["derive", "env", "unstable-styles"] } clap-verbosity-flag = "2.1.1" clap_complete = "4.4.1" -cloud-storage = { version = "0.11.1", features = [ - "global-client", - "rustls-tls", -], default-features = false } codespan = "0.11.1" codespan-reporting = "0.11.1" colored = "2.0.0" diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-utils/Cargo.toml index 2c0516ddda804..0d8ebaa66e5cc 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-utils/Cargo.toml +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/Cargo.toml @@ -20,7 +20,6 @@ async-trait = { workspace = true } backoff = { workspace = true } base64 = { workspace = true } chrono = { workspace = true } -cloud-storage = { workspace = true } dashmap = { workspace = true } futures = { workspace = true } itertools = { workspace = true } diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/config.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/config.rs index 31f6b41941dd9..43ccdf0315e64 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/config.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/config.rs @@ -28,7 +28,6 @@ const fn default_enable_compression() -> bool { #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(tag = "file_store_type")] pub enum IndexerGrpcFileStoreConfig { - GcsFileStore(GcsFileStore), LocalFileStore(LocalFileStore), } @@ -44,15 +43,6 @@ impl Default for IndexerGrpcFileStoreConfig { impl IndexerGrpcFileStoreConfig { pub fn create(&self) -> Box { match self { - IndexerGrpcFileStoreConfig::GcsFileStore(gcs_file_store) => { - Box::new(crate::file_store_operator::gcs::GcsFileStoreOperator::new( - gcs_file_store.gcs_file_store_bucket_name.clone(), - gcs_file_store - .gcs_file_store_service_account_key_path - .clone(), - gcs_file_store.enable_compression, - )) - }, IndexerGrpcFileStoreConfig::LocalFileStore(local_file_store) => Box::new( crate::file_store_operator::local::LocalFileStoreOperator::new( local_file_store.local_file_store_path.clone(), diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator/gcs.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator/gcs.rs deleted file mode 100644 index 444791cc23c63..0000000000000 --- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator/gcs.rs +++ /dev/null @@ -1,208 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use crate::{ - compression_util::{FileEntry, FileStoreMetadata, StorageFormat, FILE_ENTRY_TRANSACTION_COUNT}, - counters::{log_grpc_step, IndexerGrpcStep}, - file_store_operator::{FileStoreOperator, METADATA_FILE_NAME}, -}; -use anyhow::bail; -use aptos_protos::transaction::v1::Transaction; -use cloud_storage::{Bucket, Object}; -use std::env; - -const JSON_FILE_TYPE: &str = "application/json"; -// The environment variable to set the service account path. -const SERVICE_ACCOUNT_ENV_VAR: &str = "SERVICE_ACCOUNT"; -const FILE_STORE_METADATA_TIMEOUT_MILLIS: u128 = 200; - -#[derive(Clone)] -pub struct GcsFileStoreOperator { - bucket_name: String, - file_store_metadata_last_updated: std::time::Instant, - storage_format: StorageFormat, -} - -impl GcsFileStoreOperator { - pub fn new( - bucket_name: String, - service_account_path: String, - enable_compression: bool, - ) -> Self { - env::set_var(SERVICE_ACCOUNT_ENV_VAR, service_account_path); - let storage_format = if enable_compression { - StorageFormat::Lz4CompressedProto - } else { - StorageFormat::JsonBase64UncompressedProto - }; - Self { - bucket_name, - file_store_metadata_last_updated: std::time::Instant::now(), - storage_format, - } - } -} - -#[async_trait::async_trait] -impl FileStoreOperator for GcsFileStoreOperator { - /// Bootstraps the file store operator. This is required before any other operations. - async fn verify_storage_bucket_existence(&self) { - tracing::info!( - bucket_name = self.bucket_name, - "Before file store operator starts, verify the bucket exists." - ); - // Verifies the bucket exists. - Bucket::read(&self.bucket_name) - .await - .expect("Failed to read bucket."); - } - - fn storage_format(&self) -> StorageFormat { - self.storage_format - } - - fn store_name(&self) -> &str { - "GCS" - } - - async fn get_raw_file(&self, version: u64) -> anyhow::Result> { - let file_entry_key = FileEntry::build_key(version, self.storage_format).to_string(); - match Object::download(&self.bucket_name, file_entry_key.as_str()).await { - Ok(file) => Ok(file), - Err(cloud_storage::Error::Other(err)) => { - if err.contains("No such object: ") { - anyhow::bail!("[Indexer File] Transactions file not found. Gap might happen between cache and file store. {}", err) - } else { - anyhow::bail!( - "[Indexer File] Error happens when downloading transaction file. {}", - err - ); - } - }, - Err(err) => { - anyhow::bail!( - "[Indexer File] Error happens when transaction file. {}", - err - ); - }, - } - } - - /// Gets the metadata from the file store. Operator will panic if error happens when accessing the metadata file(except not found). - async fn get_file_store_metadata(&self) -> Option { - match Object::download(&self.bucket_name, METADATA_FILE_NAME).await { - Ok(metadata) => { - let metadata: FileStoreMetadata = - serde_json::from_slice(&metadata).expect("Expected metadata to be valid JSON."); - Some(metadata) - }, - Err(cloud_storage::Error::Other(err)) => { - if err.contains("No such object: ") { - // Metadata is not found. - None - } else { - panic!( - "[Indexer File] Error happens when accessing metadata file. {}", - err - ); - } - }, - Err(e) => { - panic!( - "[Indexer File] Error happens when accessing metadata file. {}", - e - ); - }, - } - } - - /// If the file store is empty, the metadata will be created; otherwise, return the existing metadata. - async fn update_file_store_metadata_with_timeout( - &mut self, - expected_chain_id: u64, - version: u64, - ) -> anyhow::Result<()> { - if let Some(metadata) = self.get_file_store_metadata().await { - assert_eq!(metadata.chain_id, expected_chain_id, "Chain ID mismatch."); - assert_eq!( - metadata.storage_format, self.storage_format, - "Storage format mismatch." - ); - } - if self.file_store_metadata_last_updated.elapsed().as_millis() - < FILE_STORE_METADATA_TIMEOUT_MILLIS - { - bail!("File store metadata is updated too frequently.") - } - self.update_file_store_metadata_internal(expected_chain_id, version) - .await?; - Ok(()) - } - - /// Updates the file store metadata. This is only performed by the operator when new file transactions are uploaded. - async fn update_file_store_metadata_internal( - &mut self, - chain_id: u64, - version: u64, - ) -> anyhow::Result<()> { - let metadata = FileStoreMetadata::new(chain_id, version, self.storage_format); - // If the metadata is not updated, the indexer will be restarted. - Object::create( - self.bucket_name.as_str(), - serde_json::to_vec(&metadata).unwrap(), - METADATA_FILE_NAME, - JSON_FILE_TYPE, - ) - .await?; - self.file_store_metadata_last_updated = std::time::Instant::now(); - Ok(()) - } - - /// Uploads the transactions to the file store. The transactions are grouped into batches of BLOB_STORAGE_SIZE. - /// Updates the file store metadata after the upload. - async fn upload_transaction_batch( - &mut self, - _chain_id: u64, - transactions: Vec, - ) -> anyhow::Result<(u64, u64)> { - let start_version = transactions.first().unwrap().version; - let end_version = transactions.last().unwrap().version; - let batch_size = transactions.len(); - anyhow::ensure!( - start_version % FILE_ENTRY_TRANSACTION_COUNT == 0, - "Starting version has to be a multiple of BLOB_STORAGE_SIZE." - ); - anyhow::ensure!( - batch_size == FILE_ENTRY_TRANSACTION_COUNT as usize, - "The number of transactions to upload has to be multiplier of BLOB_STORAGE_SIZE." - ); - let start_time = std::time::Instant::now(); - let bucket_name = self.bucket_name.clone(); - let file_entry = FileEntry::from_transactions(transactions, self.storage_format); - let file_entry_key = FileEntry::build_key(start_version, self.storage_format).to_string(); - log_grpc_step( - "file_worker", - IndexerGrpcStep::FileStoreEncodedTxns, - Some(start_version as i64), - Some((start_version + FILE_ENTRY_TRANSACTION_COUNT - 1) as i64), - None, - None, - Some(start_time.elapsed().as_secs_f64()), - None, - Some(FILE_ENTRY_TRANSACTION_COUNT as i64), - None, - ); - Object::create( - bucket_name.clone().as_str(), - file_entry.into_inner(), - file_entry_key.as_str(), - JSON_FILE_TYPE, - ) - .await?; - Ok((start_version, end_version)) - } - - fn clone_box(&self) -> Box { - Box::new(self.clone()) - } -} diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator/mod.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator/mod.rs index e0bd22efa2407..c97cea0a2a831 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator/mod.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator/mod.rs @@ -7,8 +7,6 @@ use crate::compression_util::{ use anyhow::{Context, Result}; use aptos_protos::transaction::v1::Transaction; -pub mod gcs; -pub use gcs::*; pub mod local; use crate::counters::TRANSACTION_STORE_FETCH_RETRIES; pub use local::*;