[Indexer-Grpc-V2] Implement filters.
grao1991 committed Feb 4, 2025
1 parent ab9da6b commit 0473277
Showing 18 changed files with 1,138 additions and 898 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -651,6 +651,7 @@ lru = "0.7.5"
lz4 = "1.25.0"
maplit = "1.0.2"
merlin = "3"
memchr = "2.7.4"
memory-stats = "1.0.0"
mime = "0.3.16"
mini-moka = { version = "0.10.3", features = ["sync"] }
@@ -17,6 +17,7 @@ anyhow = { workspace = true }
aptos-indexer-grpc-server-framework = { workspace = true }
aptos-indexer-grpc-utils = { workspace = true }
aptos-protos = { workspace = true }
aptos-transaction-filter = { workspace = true }
async-trait = { workspace = true }
build_html = { workspace = true }
clap = { workspace = true }
@@ -4,6 +4,7 @@
use crate::{config::HistoricalDataServiceConfig, connection_manager::ConnectionManager};
use aptos_indexer_grpc_utils::file_store_operator_v2::file_store_reader::FileStoreReader;
use aptos_protos::indexer::v1::{GetTransactionsRequest, TransactionsResponse};
use aptos_transaction_filter::BooleanTransactionFilter;
use futures::executor::block_on;
use std::{
sync::Arc,
@@ -15,6 +16,7 @@ use tracing::info;
use uuid::Uuid;

const DEFAULT_MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 10000;
const MAX_FILTER_SIZE: usize = 1000;

pub struct HistoricalDataService {
chain_id: u64,
@@ -61,6 +63,25 @@ impl HistoricalDataService {
}
let starting_version = request.starting_version.unwrap();

let filter = if let Some(proto_filter) = request.transaction_filter {
match BooleanTransactionFilter::new_from_proto(
proto_filter,
Some(MAX_FILTER_SIZE),
) {
Ok(filter) => Some(filter),
Err(e) => {
let err = Err(Status::invalid_argument(format!(
"Invalid transaction_filter: {e:?}."
)));
info!("Client error: {err:?}.");
let _ = response_sender.blocking_send(err);
continue;
},
}
} else {
None
};

let max_num_transactions_per_batch = if let Some(batch_size) = request.batch_size {
batch_size as usize
} else {
@@ -77,6 +98,7 @@
starting_version,
ending_version,
max_num_transactions_per_batch,
filter,
response_sender,
)
.await
@@ -95,6 +117,7 @@
starting_version: u64,
ending_version: Option<u64>,
max_num_transactions_per_batch: usize,
filter: Option<BooleanTransactionFilter>,
response_sender: tokio::sync::mpsc::Sender<Result<TransactionsResponse, Status>>,
) {
info!(stream_id = id, "Start streaming, starting_version: {starting_version}, ending_version: {ending_version:?}.");
@@ -120,43 +143,49 @@
let (tx, mut rx) = channel(1);

let file_store_reader = self.file_store_reader.clone();
let filter = filter.clone();
tokio::spawn(async move {
file_store_reader
.get_transaction_batch(
next_version,
/*retries=*/ 3,
/*max_files=*/ None,
filter,
tx,
)
.await;
});

let mut close_to_latest = false;
while let Some((transactions, batch_size_bytes)) = rx.recv().await {
while let Some((transactions, batch_size_bytes, timestamp)) = rx.recv().await {
next_version += transactions.len() as u64;
size_bytes += batch_size_bytes as u64;
let timestamp = transactions.first().unwrap().timestamp.unwrap();
let timestamp_since_epoch =
Duration::new(timestamp.seconds as u64, timestamp.nanos as u32);
let now_since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let delta = now_since_epoch.saturating_sub(timestamp_since_epoch);

// TODO(grao): Double check if this threshold makes sense.
if delta < Duration::from_secs(60) {
close_to_latest = true;
}
let responses = transactions
.chunks(max_num_transactions_per_batch)
.map(|chunk| TransactionsResponse {
transactions: chunk.to_vec(),
chain_id: Some(self.chain_id),
});
for response in responses {
if response_sender.send(Ok(response)).await.is_err() {
// NOTE: We are not recalculating the version and size_bytes for the stream
                        // progress since nobody cares about accuracy if the client has dropped the
// connection.
info!(stream_id = id, "Client dropped.");
break 'out;

if !transactions.is_empty() {
let responses =
transactions
.chunks(max_num_transactions_per_batch)
.map(|chunk| TransactionsResponse {
transactions: chunk.to_vec(),
chain_id: Some(self.chain_id),
});
for response in responses {
if response_sender.send(Ok(response)).await.is_err() {
// NOTE: We are not recalculating the version and size_bytes for the stream
                            // progress since nobody cares about accuracy if the client has dropped the
// connection.
info!(stream_id = id, "Client dropped.");
break 'out;
}
}
}
}
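Note on the hunk above: the batch timestamp now arrives through the channel instead of being read via transactions.first().unwrap(), which would panic once filtering can produce an empty batch. A minimal sketch of the freshness check, distilled from the loop body (the 60-second threshold is the one flagged by the TODO):

use aptos_protos::util::timestamp::Timestamp;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Distilled from the streaming loop above: a batch counts as "close to
// latest" when its timestamp is within 60 seconds of wall-clock now.
fn is_close_to_latest(timestamp: &Timestamp) -> bool {
    let timestamp_since_epoch =
        Duration::new(timestamp.seconds as u64, timestamp.nanos as u32);
    let now_since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    now_since_epoch.saturating_sub(timestamp_since_epoch) < Duration::from_secs(60)
}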
@@ -6,6 +6,7 @@ use crate::{
live_data_service::{data_manager::DataManager, fetch_manager::FetchManager},
};
use aptos_protos::transaction::v1::Transaction;
use aptos_transaction_filter::{BooleanTransactionFilter, Filterable};
use prost::Message;
use std::sync::Arc;
use tokio::sync::RwLock;
@@ -41,6 +42,7 @@ impl<'a> InMemoryCache<'a> {
ending_version: u64,
max_num_transactions_per_batch: usize,
max_bytes_per_batch: usize,
filter: &Option<BooleanTransactionFilter>,
) -> Option<(Vec<Transaction>, usize)> {
while starting_version >= self.data_manager.read().await.end_version {
trace!("Reached head, wait...");
@@ -82,8 +84,10 @@
{
if let Some(transaction) = data_manager.get_data(version).as_ref() {
// NOTE: We allow 1 more txn beyond the size limit here, for simplicity.
total_bytes += transaction.encoded_len();
result.push(transaction.as_ref().clone());
if filter.is_none() || filter.as_ref().unwrap().matches(&transaction) {
total_bytes += transaction.encoded_len();
result.push(transaction.as_ref().clone());
}
version += 1;
} else {
break;
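In the cache-side check above, the version counter keeps advancing even when a transaction is filtered out, so the byte accounting covers only transactions that survive the filter. The is_none() || ... unwrap() condition is equivalent to this small sketch (an absent filter matches everything; matches is the Filterable method imported above):

use aptos_protos::transaction::v1::Transaction;
use aptos_transaction_filter::{BooleanTransactionFilter, Filterable};

// Behavior-equivalent to `filter.is_none() || filter.as_ref().unwrap().matches(&txn)`:
// no filter means every transaction passes.
fn passes(filter: &Option<BooleanTransactionFilter>, txn: &Transaction) -> bool {
    filter.as_ref().map_or(true, |f| f.matches(txn))
}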
@@ -11,13 +11,15 @@ use crate::{
live_data_service::in_memory_cache::InMemoryCache,
};
use aptos_protos::indexer::v1::{GetTransactionsRequest, TransactionsResponse};
use aptos_transaction_filter::BooleanTransactionFilter;
use std::{sync::Arc, time::Duration};
use tokio::sync::mpsc::{Receiver, Sender};
use tonic::{Request, Status};
use tracing::info;
use uuid::Uuid;

static MAX_BYTES_PER_BATCH: usize = 20 * (1 << 20);
const MAX_BYTES_PER_BATCH: usize = 20 * (1 << 20);
const MAX_FILTER_SIZE: usize = 1000;

pub struct LiveDataService<'a> {
chain_id: u64,
@@ -77,6 +79,25 @@
continue;
}

let filter = if let Some(proto_filter) = request.transaction_filter {
match BooleanTransactionFilter::new_from_proto(
proto_filter,
Some(MAX_FILTER_SIZE),
) {
Ok(filter) => Some(filter),
Err(e) => {
let err = Err(Status::invalid_argument(format!(
"Invalid transaction_filter: {e:?}."
)));
info!("Client error: {err:?}.");
let _ = response_sender.blocking_send(err);
continue;
},
}
} else {
None
};

let max_num_transactions_per_batch = if let Some(batch_size) = request.batch_size {
batch_size as usize
} else {
@@ -94,6 +115,7 @@
ending_version,
max_num_transactions_per_batch,
MAX_BYTES_PER_BATCH,
filter,
response_sender,
)
.await
@@ -117,6 +139,7 @@
ending_version: Option<u64>,
max_num_transactions_per_batch: usize,
max_bytes_per_batch: usize,
filter: Option<BooleanTransactionFilter>,
response_sender: tokio::sync::mpsc::Sender<Result<TransactionsResponse, Status>>,
) {
info!(stream_id = id, "Start streaming, starting_version: {starting_version}, ending_version: {ending_version:?}.");
@@ -145,6 +168,7 @@
ending_version,
max_num_transactions_per_batch,
max_bytes_per_batch,
&filter,
)
.await
{
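Both the historical and live data services repeat the same parse-and-reject sequence for request.transaction_filter. A hypothetical helper capturing that shared pattern (the function name and the proto type path aptos_protos::indexer::v1::BooleanTransactionFilter are assumptions; new_from_proto and the Some(MAX_FILTER_SIZE) cap are taken from the hunks above):

use aptos_transaction_filter::BooleanTransactionFilter;
use tonic::Status;

const MAX_FILTER_SIZE: usize = 1000;

// Hypothetical helper: an absent filter streams everything; an invalid or
// oversized filter becomes an INVALID_ARGUMENT status for the client.
fn parse_filter(
    proto_filter: Option<aptos_protos::indexer::v1::BooleanTransactionFilter>,
) -> Result<Option<BooleanTransactionFilter>, Status> {
    match proto_filter {
        None => Ok(None),
        Some(f) => BooleanTransactionFilter::new_from_proto(f, Some(MAX_FILTER_SIZE))
            .map(Some)
            .map_err(|e| Status::invalid_argument(format!("Invalid transaction_filter: {e:?}."))),
    }
}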
1 change: 1 addition & 0 deletions ecosystem/indexer-grpc/indexer-grpc-utils/Cargo.toml
@@ -16,6 +16,7 @@ rust-version = { workspace = true }
anyhow = { workspace = true }
aptos-metrics-core = { workspace = true }
aptos-protos = { workspace = true }
aptos-transaction-filter = { workspace = true }
async-trait = { workspace = true }
backoff = { workspace = true }
base64 = { workspace = true }
@@ -8,7 +8,8 @@ use crate::{
},
};
use anyhow::Result;
use aptos_protos::transaction::v1::Transaction;
use aptos_protos::{transaction::v1::Transaction, util::timestamp::Timestamp};
use aptos_transaction_filter::{BooleanTransactionFilter, Filterable};
use prost::Message;
use std::{
path::PathBuf,
@@ -74,7 +75,8 @@ impl FileStoreReader {
version: u64,
retries: u8,
max_files: Option<usize>,
tx: Sender<(Vec<Transaction>, usize)>,
filter: Option<BooleanTransactionFilter>,
tx: Sender<(Vec<Transaction>, usize, Timestamp)>,
) {
trace!(
"Getting transactions from file store, version: {version}, max_files: {max_files:?}."
@@ -107,23 +109,28 @@

for i in file_index..end_file_index {
let current_version = batch_metadata.files[i].first_version;
let mut size_bytes = batch_metadata.files[i].size_bytes;
let transactions = self
.get_transaction_file_at_version(current_version, retries)
.await;
if let Ok(mut transactions) = transactions {
let timestamp = transactions.last().unwrap().timestamp.unwrap();
let num_to_skip = version.saturating_sub(current_version) as usize;
let result = if num_to_skip > 0 {
let transactions_to_return = transactions.split_off(num_to_skip);
for transaction in transactions {
size_bytes -= transaction.encoded_len();
}
(transactions_to_return, size_bytes)
} else {
(transactions, size_bytes)
};
trace!("Got {} transactions from file store to send, size: {size_bytes}, first_version: {:?}", result.0.len(), result.0.first().map(|t| t.version));
if tx.send(result).await.is_err() {
if num_to_skip > 0 {
transactions = transactions.split_off(num_to_skip);
}
if let Some(ref filter) = filter {
transactions = transactions
.into_iter()
.filter(|t| filter.matches(&t))
.collect();
}
let size_bytes = transactions.iter().map(|t| t.encoded_len()).sum();
trace!("Got {} transactions from file store to send, size: {size_bytes}, first_version: {:?}", transactions.len(), transactions.first().map(|t| t.version));
if tx
.send((transactions, size_bytes, timestamp))
.await
.is_err()
{
break;
}
} else {
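With filtering, the reader can no longer derive the batch size from the file metadata minus the skipped prefix, so it re-derives it from the encoded length of what it will actually send. A standalone sketch of that accounting (retain here stands in for the into_iter().filter().collect() in the hunk):

use aptos_protos::transaction::v1::Transaction;
use aptos_transaction_filter::{BooleanTransactionFilter, Filterable};
use prost::Message;

// Skip versions the client already has, drop non-matching transactions,
// then size the batch by what actually goes on the wire.
fn prepare_batch(
    mut transactions: Vec<Transaction>,
    num_to_skip: usize,
    filter: &Option<BooleanTransactionFilter>,
) -> (Vec<Transaction>, usize) {
    if num_to_skip > 0 {
        transactions = transactions.split_off(num_to_skip);
    }
    if let Some(f) = filter {
        transactions.retain(|t| f.matches(t));
    }
    let size_bytes: usize = transactions.iter().map(|t| t.encoded_len()).sum();
    (transactions, size_bytes)
}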
7 changes: 3 additions & 4 deletions ecosystem/indexer-grpc/transaction-filter/Cargo.toml
@@ -15,15 +15,14 @@ rust-version = { workspace = true }
[dependencies]
anyhow = { workspace = true }
aptos-protos = { workspace = true }

derivative = { workspace = true }
derive_builder = { workspace = true }

memchr = { workspace = true }
once_cell = { workspace = true }
prost = { workspace = true }

serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }

thiserror = { workspace = true }

[dev-dependencies]
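End-to-end, a client opts into server-side filtering by populating the new request field consumed in the hunks above. A hypothetical client-side sketch (only starting_version, batch_size, and transaction_filter appear in this diff; the proto type path and the use of Default for the remaining fields are assumptions):

use aptos_protos::indexer::v1::GetTransactionsRequest;

// Hypothetical request: stream from version 1_000_000 in batches of 100,
// with `proto_filter` built elsewhere as the proto counterpart of a
// BooleanTransactionFilter.
fn build_filtered_request(
    proto_filter: aptos_protos::indexer::v1::BooleanTransactionFilter,
) -> GetTransactionsRequest {
    GetTransactionsRequest {
        starting_version: Some(1_000_000),
        batch_size: Some(100),
        transaction_filter: Some(proto_filter),
        ..Default::default()
    }
}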