Skip to content

Commit

Permalink
merge request items
Browse files Browse the repository at this point in the history
  • Loading branch information
tmcgroul committed Dec 4, 2023
1 parent f1f2515 commit 09e7ffc
Showing 1 changed file with 61 additions and 27 deletions.
88 changes: 61 additions & 27 deletions src/firehose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::datasource::{
use crate::pbcodec;
use crate::pbfirehose::single_block_request::Reference;
use crate::pbfirehose::{ForkStep, Request, Response, SingleBlockRequest, SingleBlockResponse};
use crate::pbtransforms::CombinedFilter;
use crate::pbtransforms::{CombinedFilter, CallToFilter, LogFilter};
use anyhow::{format_err, Context};
use async_stream::try_stream;
use futures_core::stream::Stream;
Expand Down Expand Up @@ -83,35 +83,35 @@ impl Firehose {
let filter = CombinedFilter::decode(&transform.value[..])?;

for log_filter in filter.log_filters {
let log_request = LogRequest {
address: log_filter
.addresses
.into_iter()
.map(|address| prefix_hex::encode(address))
.collect(),
topic0: log_filter
.event_signatures
.into_iter()
.map(|signature| prefix_hex::encode(signature))
.collect(),
};
logs.push(log_request);
let mut log_request = LogRequest::from(log_filter);
log_request.topic0.sort();

let to_merge = logs.iter_mut().find(|log| log.topic0 == log_request.topic0);
if let Some(to_merge) = to_merge {
for address in log_request.address {
if !to_merge.address.contains(&address) {
to_merge.address.push(address);
}
}
} else {
logs.push(log_request);
}
}

for call_filter in filter.call_filters {
let trace_request = TraceRequest {
address: call_filter
.addresses
.into_iter()
.map(|address| prefix_hex::encode(address))
.collect(),
sighash: call_filter
.signatures
.into_iter()
.map(|signature| prefix_hex::encode(signature))
.collect(),
};
traces.push(trace_request);
let mut trace_request = TraceRequest::from(call_filter);
trace_request.sighash.sort();

let to_merge = traces.iter_mut().find(|trace| trace.sighash == trace_request.sighash);
if let Some(to_merge) = to_merge {
for address in trace_request.address {
if !to_merge.address.contains(&address) {
to_merge.address.push(address);
}
}
} else {
traces.push(trace_request);
}
}
}

Expand Down Expand Up @@ -302,6 +302,40 @@ impl Firehose {
}
}

impl From<LogFilter> for LogRequest {
fn from(value: LogFilter) -> Self {
LogRequest {
address: value
.addresses
.into_iter()
.map(|address| prefix_hex::encode(address))
.collect(),
topic0: value
.event_signatures
.into_iter()
.map(|signature| prefix_hex::encode(signature))
.collect(),
}
}
}

impl From<CallToFilter> for TraceRequest {
fn from(value: CallToFilter) -> Self {
TraceRequest {
address: value
.addresses
.into_iter()
.map(|address| prefix_hex::encode(address))
.collect(),
sighash: value
.signatures
.into_iter()
.map(|signature| prefix_hex::encode(signature))
.collect(),
}
}
}

impl TryFrom<BlockHeader> for pbcodec::BlockHeader {
type Error = anyhow::Error;

Expand Down

0 comments on commit 09e7ffc

Please sign in to comment.