Skip to content

Commit

Permalink
chain/ethereum: Handle subgraph datasource triggers also in `DecoderH…
Browse files Browse the repository at this point in the history
…ook::after_decode` and refactor it
  • Loading branch information
incrypto32 authored and zorancv committed Dec 18, 2024
1 parent 84e9b8a commit 4dfa1af
Showing 1 changed file with 119 additions and 88 deletions.
207 changes: 119 additions & 88 deletions chain/ethereum/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use graph::components::trigger_processor::RunnableTriggers;
use graph::data_source::common::{
CallDecls, DeclaredCall, FindMappingABI, MappingABI, UnresolvedMappingABI,
};
use graph::data_source::CausalityRegion;
use graph::data_source::{CausalityRegion, MappingTrigger as MappingTriggerType};
use graph::env::ENV_VARS;
use graph::futures03::future::try_join;
use graph::futures03::stream::FuturesOrdered;
Expand Down Expand Up @@ -1034,6 +1034,115 @@ impl DecoderHook {
.collect();
Ok(labels)
}

fn collect_declared_calls<'a>(
&self,
runnables: &Vec<RunnableTriggers<'a, Chain>>,
) -> Vec<(Arc<HostMetrics>, DeclaredCall)> {
// Extract all hosted triggers from runnables
let all_triggers = runnables
.iter()
.flat_map(|runnable| &runnable.hosted_triggers);

// Collect calls from both onchain and subgraph triggers
let mut all_calls = Vec::new();

for trigger in all_triggers {
let host_metrics = trigger.host.host_metrics();

match &trigger.mapping_trigger.trigger {
MappingTriggerType::Onchain(t) => {
if let MappingTrigger::Log { calls, .. } = t {
for call in calls.clone() {
all_calls.push((host_metrics.cheap_clone(), call));
}
}
}
MappingTriggerType::Subgraph(t) => {
for call in t.calls.clone() {
// Convert subgraph call to the expected DeclaredCall type if needed
// or handle differently based on the types
all_calls.push((host_metrics.cheap_clone(), call));
}
}
MappingTriggerType::Offchain(_) => {}
}
}

all_calls
}

/// Deduplicate calls. Unfortunately, we can't get `DeclaredCall` to
/// implement `Hash` or `Ord` easily, so we can only deduplicate by
/// comparing the whole call not with a `HashSet` or `BTreeSet`.
/// Since that can be inefficient, we don't deduplicate if we have an
/// enormous amount of calls; in that case though, things will likely
/// blow up because of the amount of I/O that many calls cause.
/// Cutting off at 1000 is fairly arbitrary
fn deduplicate_calls(
&self,
calls: Vec<(Arc<HostMetrics>, DeclaredCall)>,
) -> Vec<(Arc<HostMetrics>, DeclaredCall)> {
if calls.len() >= 1000 {
return calls;
}

let mut uniq_calls = Vec::new();
for (metrics, call) in calls {
if !uniq_calls.iter().any(|(_, c)| c == &call) {
uniq_calls.push((metrics, call));
}
}
uniq_calls
}

/// Log information about failed eth calls. 'Failure' here simply
/// means that the call was reverted; outright errors lead to a real
/// error. For reverted calls, `self.eth_calls` returns the label
/// from the manifest for that call.
///
/// One reason why declared calls can fail is if they are attached
/// to the wrong handler, or if arguments are specified incorrectly.
/// Calls that revert every once in a while might be ok and what the
/// user intended, but we want to clearly log so that users can spot
/// mistakes in their manifest, which will lead to unnecessary eth
/// calls
fn log_declared_call_results(
logger: &Logger,
failures: &[String],
calls_count: usize,
trigger_count: usize,
elapsed: Duration,
) {
let fail_count = failures.len();

if fail_count > 0 {
let mut counts: Vec<_> = failures.iter().counts().into_iter().collect();
counts.sort_by_key(|(label, _)| *label);

let failure_summary = counts
.into_iter()
.map(|(label, count)| {
let times = if count == 1 { "time" } else { "times" };
format!("{label} ({count} {times})")
})
.join(", ");

error!(logger, "Declared calls failed";
"triggers" => trigger_count,
"calls_count" => calls_count,
"fail_count" => fail_count,
"calls_ms" => elapsed.as_millis(),
"failures" => format!("[{}]", failure_summary)
);
} else {
debug!(logger, "Declared calls";
"triggers" => trigger_count,
"calls_count" => calls_count,
"calls_ms" => elapsed.as_millis()
);
}
}
}

#[async_trait]
Expand All @@ -1045,102 +1154,24 @@ impl blockchain::DecoderHook<Chain> for DecoderHook {
runnables: Vec<RunnableTriggers<'a, Chain>>,
metrics: &Arc<SubgraphInstanceMetrics>,
) -> Result<Vec<RunnableTriggers<'a, Chain>>, MappingError> {
/// Log information about failed eth calls. 'Failure' here simply
/// means that the call was reverted; outright errors lead to a real
/// error. For reverted calls, `self.eth_calls` returns the label
/// from the manifest for that call.
///
/// One reason why declared calls can fail is if they are attached
/// to the wrong handler, or if arguments are specified incorrectly.
/// Calls that revert every once in a while might be ok and what the
/// user intended, but we want to clearly log so that users can spot
/// mistakes in their manifest, which will lead to unnecessary eth
/// calls
fn log_results(
logger: &Logger,
failures: &[String],
calls_count: usize,
trigger_count: usize,
elapsed: Duration,
) {
let fail_count = failures.len();

if fail_count > 0 {
let mut counts: Vec<_> = failures.iter().counts().into_iter().collect();
counts.sort_by_key(|(label, _)| *label);
let counts = counts
.into_iter()
.map(|(label, count)| {
let times = if count == 1 { "time" } else { "times" };
format!("{label} ({count} {times})")
})
.join(", ");
error!(logger, "Declared calls failed";
"triggers" => trigger_count,
"calls_count" => calls_count,
"fail_count" => fail_count,
"calls_ms" => elapsed.as_millis(),
"failures" => format!("[{}]", counts));
} else {
debug!(logger, "Declared calls";
"triggers" => trigger_count,
"calls_count" => calls_count,
"calls_ms" => elapsed.as_millis());
}
}

if ENV_VARS.mappings.disable_declared_calls {
return Ok(runnables);
}

let _section = metrics.stopwatch.start_section("declared_ethereum_call");

let start = Instant::now();
let calls: Vec<_> = runnables
.iter()
.map(|r| &r.hosted_triggers)
.flatten()
.filter_map(|trigger| {
trigger
.mapping_trigger
.trigger
.as_onchain()
.map(|t| (trigger.host.host_metrics(), t))
})
.filter_map(|(metrics, trigger)| match trigger {
MappingTrigger::Log { calls, .. } => Some(
calls
.clone()
.into_iter()
.map(move |call| (metrics.cheap_clone(), call)),
),
MappingTrigger::Block { .. } | MappingTrigger::Call { .. } => None,
})
.flatten()
.collect();
// Collect and process declared calls
let calls = self.collect_declared_calls(&runnables);
let deduplicated_calls = self.deduplicate_calls(calls);

// Deduplicate calls. Unfortunately, we can't get `DeclaredCall` to
// implement `Hash` or `Ord` easily, so we can only deduplicate by
// comparing the whole call not with a `HashSet` or `BTreeSet`.
// Since that can be inefficient, we don't deduplicate if we have an
// enormous amount of calls; in that case though, things will likely
// blow up because of the amount of I/O that many calls cause.
// Cutting off at 1000 is fairly arbitrary
let calls = if calls.len() < 1000 {
let mut uniq_calls = Vec::new();
for (metrics, call) in calls {
if !uniq_calls.iter().any(|(_, c)| c == &call) {
uniq_calls.push((metrics, call));
}
}
uniq_calls
} else {
calls
};
// Execute calls and log results
let calls_count = deduplicated_calls.len();
let results = self
.eth_calls(logger, block_ptr, deduplicated_calls)
.await?;

let calls_count = calls.len();
let results = self.eth_calls(logger, block_ptr, calls).await?;
log_results(
Self::log_declared_call_results(
logger,
&results,
calls_count,
Expand Down

0 comments on commit 4dfa1af

Please sign in to comment.