Skip to content

Commit

Permalink
filter erc transfers and traces by from/to addresses (#123)
Browse files Browse the repository at this point in the history
* filter erc transfers and traces by from/to addresses

* review: move reward address to proper filter

* review: filter by rpc parameter

* fmt
  • Loading branch information
cool-mestorf authored Nov 15, 2023
1 parent fd8a752 commit 6e9ec88
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 10 deletions.
15 changes: 13 additions & 2 deletions crates/freeze/src/datasets/erc20_transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct Erc20Transfers {
#[async_trait::async_trait]
impl Dataset for Erc20Transfers {
fn optional_parameters() -> Vec<Dim> {
vec![Dim::Contract, Dim::Topic0, Dim::Topic1, Dim::Topic2]
vec![Dim::Contract, Dim::Topic0, Dim::Topic1, Dim::Topic2, Dim::FromAddress, Dim::ToAddress]
}

fn use_block_ranges() -> bool {
Expand All @@ -34,9 +34,20 @@ impl CollectByBlock for Erc20Transfers {
type Response = Vec<Log>;

async fn extract(request: Params, source: Arc<Source>, _: Arc<Query>) -> R<Self::Response> {
let topics = [Some(ValueOrArray::Value(Some(*EVENT_ERC20_TRANSFER))), None, None, None];
let mut topics = [Some(ValueOrArray::Value(Some(*EVENT_ERC20_TRANSFER))), None, None, None];
if let Some(from_address) = &request.from_address {
let mut v = vec![0u8; 12];
v.append(&mut from_address.to_owned());
topics[1] = Some(ValueOrArray::Value(Some(H256::from_slice(&v[..]))));
}
if let Some(to_address) = &request.to_address {
let mut v = vec![0u8; 12];
v.append(&mut to_address.to_owned());
topics[2] = Some(ValueOrArray::Value(Some(H256::from_slice(&v[..]))));
}
let filter = Filter { topics, ..request.ethers_log_filter()? };
let logs = source.fetcher.get_logs(&filter).await?;

Ok(logs.into_iter().filter(|x| x.topics.len() == 3 && x.data.len() == 32).collect())
}

Expand Down
16 changes: 14 additions & 2 deletions crates/freeze/src/datasets/erc721_transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct Erc721Transfers {
#[async_trait::async_trait]
impl Dataset for Erc721Transfers {
fn optional_parameters() -> Vec<Dim> {
vec![Dim::Contract]
vec![Dim::Contract, Dim::FromAddress, Dim::ToAddress]
}

fn use_block_ranges() -> bool {
Expand All @@ -34,9 +34,21 @@ impl CollectByBlock for Erc721Transfers {
type Response = Vec<Log>;

async fn extract(request: Params, source: Arc<Source>, _: Arc<Query>) -> R<Self::Response> {
let topics = [Some(ValueOrArray::Value(Some(*EVENT_ERC721_TRANSFER))), None, None, None];
let mut topics =
[Some(ValueOrArray::Value(Some(*EVENT_ERC721_TRANSFER))), None, None, None];
if let Some(from_address) = &request.from_address {
let mut v = vec![0u8; 12];
v.append(&mut from_address.to_owned());
topics[1] = Some(ValueOrArray::Value(Some(H256::from_slice(&v[..]))));
}
if let Some(to_address) = &request.to_address {
let mut v = vec![0u8; 12];
v.append(&mut to_address.to_owned());
topics[2] = Some(ValueOrArray::Value(Some(H256::from_slice(&v[..]))));
}
let filter = Filter { topics, ..request.ethers_log_filter()? };
let logs = source.fetcher.get_logs(&filter).await?;

Ok(logs.into_iter().filter(|x| x.topics.len() == 4 && x.data.len() == 0).collect())
}

Expand Down
12 changes: 9 additions & 3 deletions crates/freeze/src/datasets/native_transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@ pub struct NativeTransfers {
}

#[async_trait::async_trait]
impl Dataset for NativeTransfers {}
impl Dataset for NativeTransfers {
fn optional_parameters() -> Vec<Dim> {
vec![Dim::FromAddress, Dim::ToAddress]
}
}

#[async_trait::async_trait]
impl CollectByBlock for NativeTransfers {
type Response = Vec<Trace>;

async fn extract(request: Params, source: Arc<Source>, _: Arc<Query>) -> R<Self::Response> {
source.fetcher.trace_block(request.block_number()?.into()).await
let traces = source.fetcher.trace_block(request.block_number()?.into()).await?;
Ok(filter_traces_by_from_to_addresses(traces, &request.from_address, &request.to_address))
}

fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
Expand All @@ -40,7 +45,8 @@ impl CollectByTransaction for NativeTransfers {
type Response = Vec<Trace>;

async fn extract(request: Params, source: Arc<Source>, _: Arc<Query>) -> R<Self::Response> {
source.fetcher.trace_transaction(request.ethers_transaction_hash()?).await
let traces = source.fetcher.trace_transaction(request.ethers_transaction_hash()?).await?;
Ok(filter_traces_by_from_to_addresses(traces, &request.from_address, &request.to_address))
}

fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
Expand Down
49 changes: 46 additions & 3 deletions crates/freeze/src/datasets/traces.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,19 @@ pub struct Traces {
}

#[async_trait::async_trait]
impl Dataset for Traces {}
impl Dataset for Traces {
fn optional_parameters() -> Vec<Dim> {
vec![Dim::FromAddress, Dim::ToAddress]
}
}

#[async_trait::async_trait]
impl CollectByBlock for Traces {
type Response = Vec<Trace>;

async fn extract(request: Params, source: Arc<Source>, _: Arc<Query>) -> R<Self::Response> {
source.fetcher.trace_block(request.block_number()?.into()).await
let traces = source.fetcher.trace_block(request.block_number()?.into()).await?;
Ok(filter_traces_by_from_to_addresses(traces, &request.from_address, &request.to_address))
}

fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
Expand All @@ -53,7 +58,8 @@ impl CollectByTransaction for Traces {
type Response = Vec<Trace>;

async fn extract(request: Params, source: Arc<Source>, _: Arc<Query>) -> R<Self::Response> {
source.fetcher.trace_transaction(request.ethers_transaction_hash()?).await
let traces = source.fetcher.trace_transaction(request.ethers_transaction_hash()?).await?;
Ok(filter_traces_by_from_to_addresses(traces, &request.from_address, &request.to_address))
}

fn transform(response: Self::Response, columns: &mut Self, query: &Arc<Query>) -> R<()> {
Expand All @@ -62,6 +68,43 @@ impl CollectByTransaction for Traces {
process_traces(&traces, columns, &query.schemas)
}
}

pub(crate) fn filter_traces_by_from_to_addresses(
traces: Vec<Trace>,
from_address: &Option<Vec<u8>>,
to_address: &Option<Vec<u8>>,
) -> Vec<Trace> {
// filter by from_address
let from_filter: Box<dyn Fn(&Trace) -> bool + Send> = if let Some(from_address) = from_address {
Box::new(move |trace| {
let from = match &trace.action {
Action::Call(action) => action.from,
Action::Create(action) => action.from,
Action::Suicide(action) => action.address,
_ => return false,
};
from.as_bytes() == from_address
})
} else {
Box::new(|_| true)
};
// filter by to_address
let to_filter: Box<dyn Fn(&Trace) -> bool + Send> = if let Some(to_address) = to_address {
Box::new(move |trace| {
let to = match &trace.action {
Action::Call(action) => action.to,
Action::Suicide(action) => action.refund_address,
Action::Reward(action) => action.author,
_ => return false,
};
to.as_bytes() == to_address
})
} else {
Box::new(|_| true)
};
traces.into_iter().filter(from_filter).filter(to_filter).collect()
}

/// process block into columns
pub(crate) fn process_traces(traces: &[Trace], columns: &mut Traces, schemas: &Schemas) -> R<()> {
let schema = schemas.get(&Datatype::Traces).ok_or(err("schema not provided"))?;
Expand Down

0 comments on commit 6e9ec88

Please sign in to comment.