From b75577f9b252bf2a1b636c3a93bda955666ef12f Mon Sep 17 00:00:00 2001 From: AurelienFT <32803821+AurelienFT@users.noreply.github.com> Date: Thu, 22 Feb 2024 14:56:39 +0100 Subject: [PATCH] Add gathering of transfers in the node + prep work for ABIs (#4646) * Add 2 RPC endpoint with fake data * Add stream grpc * Add initial impl of execution traces for abi calls * Link job on getting info with API. * Add draft gather infos * Cleanup execution trace * Cargo clippy pass * Update grpc public & stream api for execution traces * Execution trace parameters & return value are now ser in json format * Cargo clippy pass * Fix execution trace unit tests * Add serde_json as an optional dep * Update proto files * Fix finality filter and add placeholder for slot transfers endpoints * Format and add a start of logic for compiling results * Fix compil errors for basic feature * Fix clippy issues * Add JSON-RPC structure endpoint * Flatten code for new abi get_slot_transfers * Fix tu * Fix logic in the stream and add it in the gRPC endpoint and Json-RPC * Format and update massa-runtime * Revert config change on the client * Revert changes on initial rolls file * Fix final broadcast by default on API. * Fix initial rolls * Add transfer when calling an SC --------- Co-authored-by: sydhds --- Cargo.lock | 19 +- Cargo.toml | 4 +- massa-api-exports/src/execution.rs | 30 ++- massa-api/Cargo.toml | 1 + massa-api/src/lib.rs | 5 + massa-api/src/private.rs | 6 +- massa-api/src/public.rs | 79 +++++- massa-execution-exports/Cargo.toml | 1 + massa-execution-exports/src/channels.rs | 6 + .../src/controller_traits.rs | 16 ++ massa-execution-exports/src/lib.rs | 5 + massa-execution-exports/src/settings.rs | 6 + .../src/test_exports/config.rs | 3 + massa-execution-exports/src/types.rs | 140 +++++++++++ massa-execution-worker/Cargo.toml | 5 + massa-execution-worker/src/active_history.rs | 2 +- massa-execution-worker/src/context.rs | 4 +- massa-execution-worker/src/controller.rs | 30 +++ massa-execution-worker/src/execution.rs | 238 +++++++++++++++--- massa-execution-worker/src/lib.rs | 3 + .../src/tests/scenarios_mandatories.rs | 233 +++++++++++++++++ .../src/tests/tests_active_history.rs | 2 + massa-execution-worker/src/tests/universe.rs | 25 +- .../src/tests/wasm/et_deploy_sc.wasm | Bin 0 -> 5782 bytes .../src/tests/wasm/et_init_sc.wasm | Bin 0 -> 1083 bytes .../src/tests/wasm/execution_trace.wasm | Bin 0 -> 3593 bytes massa-execution-worker/src/trace_history.rs | 59 +++++ massa-grpc/Cargo.toml | 2 + massa-grpc/src/handler.rs | 108 ++++++++ massa-grpc/src/lib.rs | 3 + massa-grpc/src/public.rs | 220 +++++++++++++++- massa-grpc/src/stream/mod.rs | 4 + .../src/stream/new_slot_abi_call_stacks.rs | 139 ++++++++++ massa-grpc/src/stream/new_slot_transfers.rs | 166 ++++++++++++ massa-grpc/src/tests/mock.rs | 2 + massa-node/Cargo.toml | 1 + massa-node/base_config/config.toml | 4 + massa-node/base_config/openrpc.json | 72 +++++- massa-node/src/main.rs | 13 + massa-node/src/settings.rs | 3 + 40 files changed, 1602 insertions(+), 57 deletions(-) create mode 100644 massa-execution-worker/src/tests/wasm/et_deploy_sc.wasm create mode 100644 massa-execution-worker/src/tests/wasm/et_init_sc.wasm create mode 100644 massa-execution-worker/src/tests/wasm/execution_trace.wasm create mode 100644 massa-execution-worker/src/trace_history.rs create mode 100644 massa-grpc/src/stream/new_slot_abi_call_stacks.rs create mode 100644 massa-grpc/src/stream/new_slot_transfers.rs diff --git a/Cargo.lock b/Cargo.lock index 722c14159a9..842c5902d7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -156,9 +156,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.77" +version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9d19de80eff169429ac1e9f48fffb163916b448a44e8e046186232046d9e1f9" +checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca" [[package]] name = "arrayref" @@ -2566,7 +2566,7 @@ dependencies = [ [[package]] name = "massa-proto-rs" version = "0.1.0" -source = "git+https://github.com/massalabs/massa-proto-rs?rev=84678fb77cf6d06d85d55e2c20502908ff292e61#84678fb77cf6d06d85d55e2c20502908ff292e61" +source = "git+https://github.com/massalabs/massa-proto-rs?rev=25638b3b7d387afbca81afcb02bae1af03697f18#25638b3b7d387afbca81afcb02bae1af03697f18" dependencies = [ "glob", "prost", @@ -2579,7 +2579,7 @@ dependencies = [ [[package]] name = "massa-sc-runtime" version = "0.10.0" -source = "git+https://github.com/massalabs/massa-sc-runtime?rev=845dc733a2f66b1b219a75d71f30c8e2a7e8a310#845dc733a2f66b1b219a75d71f30c8e2a7e8a310" +source = "git+https://github.com/massalabs/massa-sc-runtime?rev=32102813c05559c17d8ac1e1ee673802c2ba2ea8#32102813c05559c17d8ac1e1ee673802c2ba2ea8" dependencies = [ "anyhow", "as-ffi-bindings", @@ -2596,6 +2596,7 @@ dependencies = [ "prost-types", "rand", "regex", + "rust_decimal", "serde", "serde_json", "serial_test", @@ -2914,6 +2915,7 @@ dependencies = [ "parking_lot", "rand", "rand_xoshiro", + "schnellru", "serde_json", "sha2 0.10.8", "sha3", @@ -3025,6 +3027,7 @@ dependencies = [ "num", "parking_lot", "serde", + "serde_json", "thiserror", "tokio", "tokio-stream", @@ -5324,18 +5327,18 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" [[package]] name = "thiserror" -version = "1.0.52" +version = "1.0.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83a48fd946b02c0a526b2e9481c8e2a17755e47039164a86c4070446e3a4614d" +checksum = "6e3de26b0965292219b4287ff031fcba86837900fe9cd2b34ea8ad893c0953d2" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.52" +version = "1.0.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7fbe9b594d6568a6a1443250a7e67d80b74e1e96f6d1715e1e21cc1888291d3" +checksum = "268026685b2be38d7103e9e507c938a1fcb3d7e6eb15e87870b617bf37b6d581" dependencies = [ "proc-macro2 1.0.71", "quote 1.0.33", diff --git a/Cargo.toml b/Cargo.toml index 8dd38255194..829f0eb81c9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -106,8 +106,8 @@ massa_versioning = { path = "./massa-versioning" } massa_wallet = { path = "./massa-wallet" } # Massa projects dependencies -massa-proto-rs = { git = "https://github.com/massalabs/massa-proto-rs", "rev" = "84678fb77cf6d06d85d55e2c20502908ff292e61" } -massa-sc-runtime = { git = "https://github.com/massalabs/massa-sc-runtime", "rev" = "845dc733a2f66b1b219a75d71f30c8e2a7e8a310" } +massa-proto-rs = { git = "https://github.com/massalabs/massa-proto-rs", "rev" = "25638b3b7d387afbca81afcb02bae1af03697f18" } +massa-sc-runtime = { git = "https://github.com/massalabs/massa-sc-runtime", "rev" = "32102813c05559c17d8ac1e1ee673802c2ba2ea8" } peernet = { git = "https://github.com/massalabs/PeerNet", "rev" = "04b05ddd320fbe76cc858115af7b5fc28bdb8310" } # Common dependencies diff --git a/massa-api-exports/src/execution.rs b/massa-api-exports/src/execution.rs index 57afd5b99cf..0c266341977 100644 --- a/massa-api-exports/src/execution.rs +++ b/massa-api-exports/src/execution.rs @@ -1,7 +1,10 @@ // Copyright (c) 2022 MASSA LABS use massa_final_state::StateChanges; -use massa_models::{address::Address, amount::Amount, output_event::SCOutputEvent, slot::Slot}; +use massa_models::{ + address::Address, amount::Amount, operation::OperationId, output_event::SCOutputEvent, + slot::Slot, +}; use serde::{Deserialize, Serialize}; use std::{collections::VecDeque, fmt::Display}; @@ -85,3 +88,28 @@ pub struct ReadOnlyCall { /// fee pub fee: Option, } + +/// Context of the transfer +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub enum TransferContext { + #[serde(rename = "operation_id")] + /// Transfer made in an operation + Operation(OperationId), + #[serde(rename = "asc_index")] + /// Transfer made in an asynchronous call + ASC(u64), +} + +/// Structure defining a transfer +#[derive(Clone, Debug, Deserialize, Serialize)] +pub struct Transfer { + /// The sender of the transfer + pub from: Address, + /// The receiver of the transfer + pub to: Address, + /// The amount of the transfer + pub amount: Amount, + /// Context + pub context: TransferContext, +} diff --git a/massa-api/Cargo.toml b/massa-api/Cargo.toml index ebfb3fc4081..d3f4c6e3608 100644 --- a/massa-api/Cargo.toml +++ b/massa-api/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [features] test-exports = ["dep:massa_channel", "dep:massa_grpc", "massa_grpc/test-exports"] +execution-trace = ["massa_execution_exports/execution-trace"] [dependencies] massa_api_exports = { workspace = true } diff --git a/massa-api/src/lib.rs b/massa-api/src/lib.rs index 8ec346f37a4..d13f96fbd67 100644 --- a/massa-api/src/lib.rs +++ b/massa-api/src/lib.rs @@ -11,6 +11,7 @@ use jsonrpsee::proc_macros::rpc; use jsonrpsee::server::middleware::HostFilterLayer; use jsonrpsee::server::{BatchRequestConfig, ServerBuilder, ServerHandle}; use jsonrpsee::RpcModule; +use massa_api_exports::execution::Transfer; use massa_api_exports::{ address::{AddressFilter, AddressInfo}, block::{BlockInfo, BlockSummary}, @@ -373,6 +374,10 @@ pub trait MassaRpc { #[method(name = "get_addresses_bytecode")] async fn get_addresses_bytecode(&self, args: Vec) -> RpcResult>>; + /// Get all the transfers for a slot + #[method(name = "get_slots_transfers")] + async fn get_slots_transfers(&self, arg: Vec) -> RpcResult>>; + /// Adds operations to pool. Returns operations that were ok and sent to pool. #[method(name = "send_operations")] async fn send_operations(&self, arg: Vec) -> RpcResult>; diff --git a/massa-api/src/private.rs b/massa-api/src/private.rs index b348b593c82..a22adc3f7c4 100644 --- a/massa-api/src/private.rs +++ b/massa-api/src/private.rs @@ -11,7 +11,7 @@ use massa_api_exports::{ datastore::{DatastoreEntryInput, DatastoreEntryOutput}, endorsement::EndorsementInfo, error::ApiError, - execution::{ExecuteReadOnlyResponse, ReadOnlyBytecodeExecution, ReadOnlyCall}, + execution::{ExecuteReadOnlyResponse, ReadOnlyBytecodeExecution, ReadOnlyCall, Transfer}, node::NodeStatus, operation::{OperationInfo, OperationInput}, page::{PageRequest, PagedVec}, @@ -195,6 +195,10 @@ impl MassaRpcServer for API { ); } + async fn get_slots_transfers(&self, _: Vec) -> RpcResult>> { + crate::wrong_api::>>() + } + async fn get_status(&self) -> RpcResult { crate::wrong_api::() } diff --git a/massa-api/src/public.rs b/massa-api/src/public.rs index bc2166c0bdd..99b800098c5 100644 --- a/massa-api/src/public.rs +++ b/massa-api/src/public.rs @@ -12,7 +12,9 @@ use massa_api_exports::{ datastore::{DatastoreEntryInput, DatastoreEntryOutput}, endorsement::EndorsementInfo, error::ApiError, - execution::{ExecuteReadOnlyResponse, ReadOnlyBytecodeExecution, ReadOnlyCall, ReadOnlyResult}, + execution::{ + ExecuteReadOnlyResponse, ReadOnlyBytecodeExecution, ReadOnlyCall, ReadOnlyResult, Transfer, + }, node::NodeStatus, operation::{OperationInfo, OperationInput}, page::{PageRequest, PagedVec}, @@ -121,6 +123,81 @@ impl MassaRpcServer for API { crate::wrong_api::<()>() } + #[cfg(feature = "execution-trace")] + async fn get_slots_transfers(&self, slots: Vec) -> RpcResult>> { + use massa_api_exports::execution::TransferContext; + use std::str::FromStr; + + let mut res: Vec> = Vec::with_capacity(slots.len()); + for slot in slots { + let mut transfers = Vec::new(); + let abi_calls = self + .0 + .execution_controller + .get_slot_abi_call_stack(slot.clone().into()); + if let Some(abi_calls) = abi_calls { + // flatten & filter transfer trace in asc_call_stacks + + let abi_transfer_1 = "assembly_script_transfer_coins".to_string(); + let abi_transfer_2 = "assembly_script_transfer_coins_for".to_string(); + let abi_transfer_3 = "abi_transfer_coins".to_string(); + let transfer_abi_names = vec![abi_transfer_1, abi_transfer_2, abi_transfer_3]; + for (i, asc_call_stack) in abi_calls.asc_call_stacks.iter().enumerate() { + for abi_trace in asc_call_stack { + let only_transfer = abi_trace.flatten_filter(&transfer_abi_names); + for transfer in only_transfer { + let (t_from, t_to, t_amount) = transfer.parse_transfer(); + transfers.push(Transfer { + from: Address::from_str(&t_from).unwrap(), + to: Address::from_str(&t_to).unwrap(), + amount: Amount::from_raw(t_amount), + context: TransferContext::ASC(i as u64), + }); + } + } + } + + for op_call_stack in abi_calls.operation_call_stacks { + let op_id = op_call_stack.0; + let op_call_stack = op_call_stack.1; + for abi_trace in op_call_stack { + let only_transfer = abi_trace.flatten_filter(&transfer_abi_names); + for transfer in only_transfer { + let (t_from, t_to, t_amount) = transfer.parse_transfer(); + transfers.push(Transfer { + from: Address::from_str(&t_from).unwrap(), + to: Address::from_str(&t_to).unwrap(), + amount: Amount::from_raw(t_amount), + context: TransferContext::Operation(op_id), + }); + } + } + } + } + let transfers_op: Vec = self + .0 + .execution_controller + .get_transfers_for_slot(slot) + .unwrap_or_default() + .iter() + .map(|t| Transfer { + from: t.from, + to: t.to, + amount: t.amount, + context: TransferContext::Operation(t.op_id), + }) + .collect(); + transfers.extend(transfers_op); + res.push(transfers); + } + Ok(res) + } + + #[cfg(not(feature = "execution-trace"))] + async fn get_slots_transfers(&self, _: Vec) -> RpcResult>> { + RpcResult::Err(ApiError::BadRequest("feature execution-trace is not enabled".into()).into()) + } + async fn execute_read_only_bytecode( &self, reqs: Vec, diff --git a/massa-execution-exports/Cargo.toml b/massa-execution-exports/Cargo.toml index 0937cef4cdd..d92947fcc02 100644 --- a/massa-execution-exports/Cargo.toml +++ b/massa-execution-exports/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [features] gas_calibration = ["tempfile"] test-exports = ["massa_models/test-exports", "tempfile", "mockall"] +execution-trace = ["massa-sc-runtime/execution-trace"] [dependencies] displaydoc = {workspace = true} diff --git a/massa-execution-exports/src/channels.rs b/massa-execution-exports/src/channels.rs index 86e050c411b..8dd11748dc2 100644 --- a/massa-execution-exports/src/channels.rs +++ b/massa-execution-exports/src/channels.rs @@ -2,9 +2,15 @@ use crate::types::SlotExecutionOutput; +#[cfg(feature = "execution-trace")] +use crate::types::SlotAbiCallStack; + /// channels used by the execution worker #[derive(Clone)] pub struct ExecutionChannels { /// Broadcast channel for new slot execution outputs pub slot_execution_output_sender: tokio::sync::broadcast::Sender, + #[cfg(feature = "execution-trace")] + /// Broadcast channel for execution traces (abi call stacks, boolean true if the slot is finalized, false otherwise) + pub slot_execution_traces_sender: tokio::sync::broadcast::Sender<(SlotAbiCallStack, bool)>, } diff --git a/massa-execution-exports/src/controller_traits.rs b/massa-execution-exports/src/controller_traits.rs index d64f8274f8b..ff51154f27a 100644 --- a/massa-execution-exports/src/controller_traits.rs +++ b/massa-execution-exports/src/controller_traits.rs @@ -5,6 +5,7 @@ use crate::types::{ ExecutionBlockMetadata, ExecutionQueryRequest, ExecutionQueryResponse, ReadOnlyExecutionRequest, }; + use crate::ExecutionError; use crate::{ExecutionAddressInfo, ReadOnlyExecutionOutput}; use massa_models::address::Address; @@ -20,6 +21,9 @@ use massa_models::stats::ExecutionStats; use std::collections::BTreeMap; use std::collections::HashMap; +#[cfg(feature = "execution-trace")] +use crate::types::{AbiTrace, SlotAbiCallStack, Transfer}; + #[cfg_attr(feature = "test-exports", mockall::automock)] /// interface that communicates with the execution worker thread pub trait ExecutionController: Send + Sync { @@ -111,6 +115,18 @@ pub trait ExecutionController: Send + Sync { /// Get execution statistics fn get_stats(&self) -> ExecutionStats; + #[cfg(feature = "execution-trace")] + /// Get the abi call stack for a given operation id + fn get_operation_abi_call_stack(&self, operation_id: OperationId) -> Option>; + + #[cfg(feature = "execution-trace")] + /// Get the abi call stack for a given slot + fn get_slot_abi_call_stack(&self, slot: Slot) -> Option; + + #[cfg(feature = "execution-trace")] + /// Get the all transfers of MAS for a given slot + fn get_transfers_for_slot(&self, slot: Slot) -> Option>; + /// Returns a boxed clone of self. /// Useful to allow cloning `Box`. fn clone_box(&self) -> Box; diff --git a/massa-execution-exports/src/lib.rs b/massa-execution-exports/src/lib.rs index a49d73f9982..b1cee610585 100644 --- a/massa-execution-exports/src/lib.rs +++ b/massa-execution-exports/src/lib.rs @@ -71,3 +71,8 @@ pub use types::{ #[cfg(any(feature = "test-exports", feature = "gas_calibration"))] pub mod test_exports; + +#[cfg(feature = "execution-trace")] +pub use types::{ + AbiTrace, SCRuntimeAbiTraceType, SCRuntimeAbiTraceValue, SlotAbiCallStack, Transfer, +}; diff --git a/massa-execution-exports/src/settings.rs b/massa-execution-exports/src/settings.rs index 0bda6e34ba4..2b47694e0dc 100644 --- a/massa-execution-exports/src/settings.rs +++ b/massa-execution-exports/src/settings.rs @@ -94,4 +94,10 @@ pub struct ExecutionConfig { pub max_event_size: usize, /// chain id pub chain_id: u64, + /// whether slot execution traces broadcast is enabled + pub broadcast_traces_enabled: bool, + /// slot execution traces channel capacity + pub broadcast_slot_execution_traces_channel_capacity: usize, + /// Max execution traces slot to keep in trace history cache + pub max_execution_traces_slot_limit: usize, } diff --git a/massa-execution-exports/src/test_exports/config.rs b/massa-execution-exports/src/test_exports/config.rs index 55b61e7272d..12cfbc95f9d 100644 --- a/massa-execution-exports/src/test_exports/config.rs +++ b/massa-execution-exports/src/test_exports/config.rs @@ -69,6 +69,9 @@ impl Default for ExecutionConfig { max_function_length: 1000, max_parameter_length: 1000, chain_id: *CHAINID, + broadcast_traces_enabled: true, + broadcast_slot_execution_traces_channel_capacity: 5000, + max_execution_traces_slot_limit: 5000, } } } diff --git a/massa-execution-exports/src/types.rs b/massa-execution-exports/src/types.rs index aa87139650b..431f7ca73aa 100644 --- a/massa-execution-exports/src/types.rs +++ b/massa-execution-exports/src/types.rs @@ -21,6 +21,17 @@ use massa_pos_exports::ProductionStats; use massa_storage::Storage; use std::collections::{BTreeMap, BTreeSet}; +#[cfg(feature = "execution-trace")] +use massa_models::prehash::PreHashMap; +#[cfg(feature = "execution-trace")] +pub use massa_sc_runtime::AbiTrace as SCRuntimeAbiTrace; +#[cfg(feature = "execution-trace")] +pub use massa_sc_runtime::AbiTraceType as SCRuntimeAbiTraceType; +#[cfg(feature = "execution-trace")] +pub use massa_sc_runtime::AbiTraceValue as SCRuntimeAbiTraceValue; +#[cfg(feature = "execution-trace")] +use std::collections::VecDeque; + /// Metadata needed to execute the block #[derive(Clone, Debug)] pub struct ExecutionBlockMetadata { @@ -203,6 +214,118 @@ pub struct ExecutionAddressInfo { pub cycle_infos: Vec, } +#[cfg(feature = "execution-trace")] +/// A trace of an abi call + its parameters + the result +#[derive(Debug, Clone)] +pub struct AbiTrace { + /// Abi name + pub name: String, + /// Abi parameters + pub parameters: Vec, + /// Abi return value + pub return_value: SCRuntimeAbiTraceType, + /// Abi sub calls + pub sub_calls: Option>, +} + +#[cfg(feature = "execution-trace")] +impl From for AbiTrace { + fn from(trace: SCRuntimeAbiTrace) -> Self { + Self { + name: trace.name, + parameters: trace.params, + return_value: trace.return_value, + sub_calls: trace.sub_calls.map(|sub_calls| { + sub_calls + .into_iter() + .map(|sub_call| sub_call.into()) + .collect() + }), + } + } +} + +#[cfg(feature = "execution-trace")] +impl AbiTrace { + /// Flatten and filter for abi names in an AbiTrace + pub fn flatten_filter(&self, abi_names: &Vec) -> Vec<&Self> { + let mut filtered: Vec<&Self> = Default::default(); + let mut to_process: VecDeque<&Self> = vec![self].into(); + + while !to_process.is_empty() { + let t = to_process.pop_front(); + if let Some(trace) = t { + if abi_names.iter().find(|t| *(*t) == trace.name).is_some() { + // filtered.extend(&trace) + filtered.push(trace); + } + + if let Some(sub_call) = &trace.sub_calls { + for sc in sub_call.iter().rev() { + to_process.push_front(sc); + } + } + } + } + + filtered + } + + /// This function assumes that the abi trace is a transfer. + /// Calling this function on a non-transfer abi trace will have undefined behavior. + pub fn parse_transfer(&self) -> (String, String, u64) { + let t_from = self + .parameters + .iter() + .find_map(|p| { + if p.name == "from_address" { + if let SCRuntimeAbiTraceType::String(v) = &p.value { + return Some(v.clone()); + } + } + None + }) + .unwrap_or_default(); + let t_to = self + .parameters + .iter() + .find_map(|p| { + if p.name == "to_address" { + if let SCRuntimeAbiTraceType::String(v) = &p.value { + return Some(v.clone()); + } + } + None + }) + .unwrap_or_default(); + let t_amount = self + .parameters + .iter() + .find_map(|p| { + if p.name == "raw_amount" { + if let SCRuntimeAbiTraceType::U64(v) = &p.value { + return Some(v.clone()); + } + } + None + }) + .unwrap_or_default(); + (t_from, t_to, t_amount) + } +} + +#[cfg(feature = "execution-trace")] +#[derive(Debug, Clone)] +/// Structure for all abi calls in a slot +pub struct SlotAbiCallStack { + /// Slot + pub slot: Slot, + /// asc call stacks + pub asc_call_stacks: Vec>, + /// operation call stacks + pub operation_call_stacks: PreHashMap>, +} + /// structure describing the output of the execution of a slot #[derive(Debug, Clone)] pub enum SlotExecutionOutput { @@ -235,6 +358,23 @@ pub struct ExecutionOutput { pub state_changes: StateChanges, /// events emitted by the execution step pub events: EventStore, + /// slot trace + #[cfg(feature = "execution-trace")] + pub slot_trace: Option<(SlotAbiCallStack, Vec)>, +} + +#[cfg(feature = "execution-trace")] +#[derive(Debug, Clone)] +/// structure describing a transfer +pub struct Transfer { + /// From + pub from: Address, + /// To + pub to: Address, + /// Amount + pub amount: Amount, + /// operation id + pub op_id: OperationId, } /// structure describing the output of a read only execution diff --git a/massa-execution-worker/Cargo.toml b/massa-execution-worker/Cargo.toml index f942cbbe152..43ad1d2d43c 100644 --- a/massa-execution-worker/Cargo.toml +++ b/massa-execution-worker/Cargo.toml @@ -39,6 +39,10 @@ benchmarking = [ "tempfile" ] metrics = [] +execution-trace = [ + "schnellru", + "massa_execution_exports/execution-trace", +] [dependencies] anyhow = { workspace = true } @@ -78,6 +82,7 @@ massa_db_worker = { workspace = true, optional = true } tempfile = { workspace = true, optional = true } massa_wallet = { workspace = true } massa-proto-rs = { workspace = true } +schnellru = { workspace = true, optional = true } [dev-dependencies] massa_storage = { workspace = true } diff --git a/massa-execution-worker/src/active_history.rs b/massa-execution-worker/src/active_history.rs index 22bac9efc99..1b4c41ebb6b 100644 --- a/massa-execution-worker/src/active_history.rs +++ b/massa-execution-worker/src/active_history.rs @@ -30,7 +30,7 @@ pub enum SlotIndexPosition { Past, /// out of bounds in the future Future, - /// found in history at a the given index + /// found in history at a given index Found(usize), /// history is empty NoHistory, diff --git a/massa-execution-worker/src/context.rs b/massa-execution-worker/src/context.rs index a3e84f8275d..5deb91dea63 100644 --- a/massa-execution-worker/src/context.rs +++ b/massa-execution-worker/src/context.rs @@ -911,6 +911,8 @@ impl ExecutionContext { block_info, state_changes, events: std::mem::take(&mut self.events), + #[cfg(feature = "execution-trace")] + slot_trace: None, } } @@ -948,7 +950,7 @@ impl ExecutionContext { } /// Creates a new event but does not emit it. - /// Note that this does not increments the context event counter. + /// Note that this does not increment the context event counter. /// /// # Arguments: /// data: the string data that is the payload of the event diff --git a/massa-execution-worker/src/controller.rs b/massa-execution-worker/src/controller.rs index f1381383366..9651b126868 100644 --- a/massa-execution-worker/src/controller.rs +++ b/massa-execution-worker/src/controller.rs @@ -25,6 +25,9 @@ use std::fmt::Display; use std::sync::Arc; use tracing::info; +#[cfg(feature = "execution-trace")] +use massa_execution_exports::{AbiTrace, SlotAbiCallStack, Transfer}; + /// structure used to communicate with execution thread pub(crate) struct ExecutionInputData { /// set stop to true to stop the thread @@ -466,6 +469,33 @@ impl ExecutionController for ExecutionControllerImpl { self.execution_state.read().get_stats() } + #[cfg(feature = "execution-trace")] + fn get_operation_abi_call_stack(&self, operation_id: OperationId) -> Option> { + self.execution_state + .read() + .trace_history + .read() + .fetch_traces_for_op(&operation_id) + } + + #[cfg(feature = "execution-trace")] + fn get_slot_abi_call_stack(&self, slot: Slot) -> Option { + self.execution_state + .read() + .trace_history + .read() + .fetch_traces_for_slot(&slot) + } + + #[cfg(feature = "execution-trace")] + fn get_transfers_for_slot(&self, slot: Slot) -> Option> { + self.execution_state + .read() + .trace_history + .read() + .fetch_transfers_for_slot(&slot) + } + /// Returns a boxed clone of self. /// Allows cloning `Box`, /// see `massa-execution-exports/controller_traits.rs` diff --git a/massa-execution-worker/src/execution.rs b/massa-execution-worker/src/execution.rs index e7a7e468130..fac0f34e891 100644 --- a/massa-execution-worker/src/execution.rs +++ b/massa-execution-worker/src/execution.rs @@ -48,6 +48,15 @@ use std::collections::{BTreeMap, BTreeSet}; use std::sync::Arc; use tracing::{debug, info, trace, warn}; +#[cfg(feature = "execution-trace")] +use crate::trace_history::TraceHistory; +#[cfg(feature = "execution-trace")] +use massa_execution_exports::{AbiTrace, SlotAbiCallStack, Transfer}; +#[cfg(feature = "execution-trace")] +use massa_models::config::MAX_OPERATIONS_PER_BLOCK; +#[cfg(feature = "execution-trace")] +use massa_models::prehash::PreHashMap; + /// Used to acquire a lock on the execution context macro_rules! context_guard { ($self:ident) => { @@ -55,6 +64,11 @@ macro_rules! context_guard { }; } +#[cfg(feature = "execution-trace")] +pub type ExecutionResult = Vec; +#[cfg(not(feature = "execution-trace"))] +pub type ExecutionResult = (); + /// Structure holding consistent speculative and final execution states, /// and allowing access to them. pub(crate) struct ExecutionState { @@ -92,6 +106,8 @@ pub(crate) struct ExecutionState { channels: ExecutionChannels, /// prometheus metrics massa_metrics: MassaMetrics, + #[cfg(feature = "execution-trace")] + pub(crate) trace_history: Arc>, } impl ExecutionState { @@ -165,12 +181,17 @@ impl ExecutionState { final_cursor: last_final_slot, stats_counter: ExecutionStatsCounter::new(config.stats_time_window_duration), module_cache, - config, mip_store, selector, channels, wallet, massa_metrics, + #[cfg(feature = "execution-trace")] + trace_history: Arc::new(RwLock::new(TraceHistory::new( + config.max_execution_traces_slot_limit as u32, + MAX_OPERATIONS_PER_BLOCK, + ))), + config, } } @@ -273,6 +294,25 @@ impl ExecutionState { ); } } + + #[cfg(feature = "execution-trace")] + { + if self.config.broadcast_traces_enabled { + if let Some((slot_trace, _)) = exec_out.slot_trace.clone() { + if let Err(err) = self + .channels + .slot_execution_traces_sender + .send((slot_trace, true)) + { + trace!( + "error, failed to broadcast abi trace for slot {} due to: {}", + exec_out.slot.clone(), + err + ); + } + } + } + } } /// Applies an execution output to the active (non-final) state @@ -372,7 +412,7 @@ impl ExecutionState { block_slot: Slot, remaining_block_gas: &mut u64, block_credits: &mut Amount, - ) -> Result<(), ExecutionError> { + ) -> Result { // check validity period if !(operation .get_validity_range(self.config.operation_validity_period) @@ -419,6 +459,11 @@ impl ExecutionState { // update block credits *block_credits = new_block_credits; + #[cfg(feature = "execution-trace")] + let res = vec![]; + #[allow(clippy::let_unit_value)] + #[cfg(not(feature = "execution-trace"))] + let res = (); // Call the execution process specific to the operation type. let mut execution_result = match &operation.content.op { OperationType::ExecuteSC { .. } => { @@ -427,15 +472,15 @@ impl ExecutionState { OperationType::CallSC { .. } => { self.execute_callsc_op(&operation.content.op, sender_addr) } - OperationType::RollBuy { .. } => { - self.execute_roll_buy_op(&operation.content.op, sender_addr) - } - OperationType::RollSell { .. } => { - self.execute_roll_sell_op(&operation.content.op, sender_addr) - } - OperationType::Transaction { .. } => { - self.execute_transaction_op(&operation.content.op, sender_addr) - } + OperationType::RollBuy { .. } => self + .execute_roll_buy_op(&operation.content.op, sender_addr) + .map(|_| res), + OperationType::RollSell { .. } => self + .execute_roll_sell_op(&operation.content.op, sender_addr) + .map(|_| res), + OperationType::Transaction { .. } => self + .execute_transaction_op(&operation.content.op, sender_addr) + .map(|_| res), }; { @@ -459,12 +504,20 @@ impl ExecutionState { // check execution results match execution_result { - Ok(_) => { + Ok(_value) => { context.insert_executed_op( operation_id, true, Slot::new(operation.content.expire_period, op_thread), ); + #[cfg(feature = "execution-trace")] + { + Ok(_value) + } + #[cfg(not(feature = "execution-trace"))] + { + Ok(()) + } } Err(err) => { // an error occurred: emit error event and reset context to snapshot @@ -480,12 +533,18 @@ impl ExecutionState { operation_id, false, Slot::new(operation.content.expire_period, op_thread), - ) + ); + #[cfg(feature = "execution-trace")] + { + Ok(vec![]) + } + #[cfg(not(feature = "execution-trace"))] + { + Ok(()) + } } } } - - Ok(()) } /// Execute a denunciation in the context of a block. @@ -782,7 +841,7 @@ impl ExecutionState { &self, operation: &OperationType, sender_addr: Address, - ) -> Result<(), ExecutionError> { + ) -> Result { // process ExecuteSC operations only let (bytecode, max_gas, datastore) = match &operation { OperationType::ExecuteSC { @@ -817,7 +876,7 @@ impl ExecutionState { .read() .load_tmp_module(bytecode, *max_gas)?; // run the VM - massa_sc_runtime::run_main( + let _res = massa_sc_runtime::run_main( &*self.execution_interface, module, *max_gas, @@ -828,7 +887,14 @@ impl ExecutionState { error, })?; - Ok(()) + #[cfg(feature = "execution-trace")] + { + Ok(_res.trace.into_iter().map(|t| t.into()).collect()) + } + #[cfg(not(feature = "execution-trace"))] + { + Ok(()) + } } /// Execute an operation of type `CallSC` @@ -843,7 +909,7 @@ impl ExecutionState { &self, operation: &OperationType, sender_addr: Address, - ) -> Result<(), ExecutionError> { + ) -> Result { // process CallSC operations only let (max_gas, target_addr, target_func, param, coins) = match &operation { OperationType::CallSC { @@ -896,7 +962,9 @@ impl ExecutionState { // quit if there is no function to be called if target_func.is_empty() { - return Ok(()); + return Err(ExecutionError::RuntimeError( + "no function to call in the CallSC operation".to_string(), + )); } // Load bytecode. Assume empty bytecode if not found. @@ -923,11 +991,18 @@ impl ExecutionState { } _ => (), } - response.map_err(|error| ExecutionError::VMError { + let _response = response.map_err(|error| ExecutionError::VMError { context: "CallSC".to_string(), error, })?; - Ok(()) + #[cfg(feature = "execution-trace")] + { + Ok(_response.trace.into_iter().map(|t| t.into()).collect()) + } + #[cfg(not(feature = "execution-trace"))] + { + Ok(()) + } } /// Tries to execute an asynchronous message @@ -940,7 +1015,7 @@ impl ExecutionState { &self, message: AsyncMessage, bytecode: Option, - ) -> Result<(), ExecutionError> { + ) -> Result { // prepare execution context let context_snapshot; let bytecode = { @@ -1013,11 +1088,18 @@ impl ExecutionState { self.config.gas_costs.clone(), ); match response { - Ok(Response { init_gas_cost, .. }) => { + Ok(res) => { self.module_cache .write() - .set_init_cost(&bytecode, init_gas_cost); - Ok(()) + .set_init_cost(&bytecode, res.init_gas_cost); + #[cfg(feature = "execution-trace")] + { + Ok(res.trace.into_iter().map(|t| t.into()).collect()) + } + #[cfg(not(feature = "execution-trace"))] + { + Ok(()) + } } Err(error) => { if let VMError::ExecutionError { init_gas_cost, .. } = error { @@ -1054,6 +1136,14 @@ impl ExecutionState { exec_target: Option<&(BlockId, ExecutionBlockMetadata)>, selector: Box, ) -> ExecutionOutput { + #[cfg(feature = "execution-trace")] + let mut slot_trace = SlotAbiCallStack { + slot: *slot, + operation_call_stacks: PreHashMap::default(), + asc_call_stacks: vec![], + }; + #[cfg(feature = "execution-trace")] + let mut transfers = vec![]; // Create a new execution context for the whole active slot let mut execution_context = ExecutionContext::active_slot( self.config.clone(), @@ -1077,8 +1167,16 @@ impl ExecutionState { // Try executing asynchronous messages. // Effects are cancelled on failure and the sender is reimbursed. for (opt_bytecode, message) in messages { - if let Err(err) = self.execute_async_message(message, opt_bytecode) { - debug!("failed executing async message: {}", err); + match self.execute_async_message(message, opt_bytecode) { + Ok(_message_return) => { + #[cfg(feature = "execution-trace")] + { + slot_trace.asc_call_stacks.push(_message_return); + } + } + Err(err) => { + debug!("failed executing async message: {}", err); + } } } @@ -1143,16 +1241,50 @@ impl ExecutionState { // Try executing the operations of this block in the order in which they appear in the block. // Errors are logged but do not interrupt the execution of the slot. for operation in operations.into_iter() { - if let Err(err) = self.execute_operation( + match self.execute_operation( &operation, stored_block.content.header.content.slot, &mut remaining_block_gas, &mut block_credits, ) { - debug!( - "failed executing operation {} in block {}: {}", - operation.id, block_id, err - ); + Ok(_op_return) => { + #[cfg(feature = "execution-trace")] + { + slot_trace + .operation_call_stacks + .insert(operation.id, _op_return); + match &operation.content.op { + OperationType::Transaction { + recipient_address, + amount, + } => { + transfers.push(Transfer { + from: operation.content_creator_address, + to: *recipient_address, + amount: *amount, + op_id: operation.id, + }); + } + OperationType::CallSC { + target_addr, coins, .. + } => { + transfers.push(Transfer { + from: operation.content_creator_address, + to: *target_addr, + amount: *coins, + op_id: operation.id, + }); + } + _ => {} + } + } + } + Err(err) => { + debug!( + "failed executing operation {} in block {}: {}", + operation.id, block_id, err + ); + } } } @@ -1239,9 +1371,25 @@ impl ExecutionState { context_guard!(self).update_production_stats(&producer_addr, *slot, None); } + #[cfg(feature = "execution-trace")] + self.trace_history + .write() + .save_traces_for_slot(*slot, slot_trace.clone()); + #[cfg(feature = "execution-trace")] + self.trace_history + .write() + .save_transfers_for_slot(*slot, transfers.clone()); // Finish slot + #[cfg(not(feature = "execution-trace"))] let exec_out = context_guard!(self).settle_slot(block_info); + #[cfg(feature = "execution-trace")] + let exec_out = { + let mut out = context_guard!(self).settle_slot(block_info); + out.slot_trace = Some((slot_trace, transfers)); + out + }; + // Broadcast a slot execution output to active channel subscribers. if self.config.broadcast_enabled { let slot_exec_out = SlotExecutionOutput::ExecutedSlot(exec_out.clone()); @@ -1297,8 +1445,28 @@ impl ExecutionState { } let exec_out = self.execute_slot(slot, exec_target, selector); + #[cfg(feature = "execution-trace")] + { + if self.config.broadcast_traces_enabled { + if let Some((slot_trace, _)) = exec_out.slot_trace.clone() { + if let Err(err) = self + .channels + .slot_execution_traces_sender + .send((slot_trace, false)) + { + trace!( + "error, failed to broadcast abi trace for slot {} due to: {}", + exec_out.slot.clone(), + err + ); + } + } + } + } + // apply execution output to active state self.apply_active_execution_output(exec_out); + debug!("execute_candidate_slot: execution finished & state applied"); } @@ -1325,6 +1493,7 @@ impl ExecutionState { // check if the final slot execution result is already cached at the front of the speculative execution history let first_exec_output = self.active_history.write().0.pop_front(); + if let Some(exec_out) = first_exec_output { if &exec_out.slot == slot && exec_out.block_info.as_ref().map(|i| i.block_id) == target_id @@ -1353,7 +1522,6 @@ impl ExecutionState { self.active_cursor = self.final_cursor; // execute slot - debug!("execute_final_slot: execution started"); let exec_out = self.execute_slot(slot, exec_target, selector); // apply execution output to final state @@ -1510,7 +1678,7 @@ impl ExecutionState { let execution_output = context_guard!(self).settle_slot(None); let exact_exec_cost = req.max_gas.saturating_sub(exec_response.remaining_gas); - // compute a gas cost, estimating the gas of the last SC call to be max_instance_cost + // compute a gas cost, estimating the gas of the last SC call to be max_instance_cost let corrected_cost = match (context_guard!(self)).gas_remaining_before_subexecution { Some(gas_remaining) => req .max_gas diff --git a/massa-execution-worker/src/lib.rs b/massa-execution-worker/src/lib.rs index fdf51d7059c..030bb929e5e 100644 --- a/massa-execution-worker/src/lib.rs +++ b/massa-execution-worker/src/lib.rs @@ -94,6 +94,9 @@ mod speculative_roll_state; mod stats; mod worker; +#[cfg(feature = "execution-trace")] +mod trace_history; + use massa_db_exports as _; pub use worker::start_execution_worker; diff --git a/massa-execution-worker/src/tests/scenarios_mandatories.rs b/massa-execution-worker/src/tests/scenarios_mandatories.rs index 67e12289c81..a1b883e45f2 100644 --- a/massa-execution-worker/src/tests/scenarios_mandatories.rs +++ b/massa-execution-worker/src/tests/scenarios_mandatories.rs @@ -39,6 +39,13 @@ use std::{cmp::Reverse, collections::BTreeMap, str::FromStr, time::Duration}; use super::universe::{ExecutionForeignControllers, ExecutionTestUniverse}; +#[cfg(feature = "execution-trace")] +use massa_execution_exports::{AbiTrace, SCRuntimeAbiTraceType, SCRuntimeAbiTraceValue}; +#[cfg(feature = "execution-trace")] +use massa_models::operation::OperationId; +#[cfg(feature = "execution-trace")] +use std::thread; + const TEST_SK_1: &str = "S18r2i8oJJyhF7Kprx98zwxAc3W4szf7RKuVMX6JydZz8zSxHeC"; const TEST_SK_2: &str = "S1FpYC4ugG9ivZZbLVrTwWtF9diSRiAwwrVX5Gx1ANSRLfouUjq"; const TEST_SK_3: &str = "S1LgXhWLEgAgCX3nm6y8PVPzpybmsYpi6yg6ZySwu5Z4ERnD7Bu"; @@ -2608,3 +2615,229 @@ fn chain_id() { assert_eq!(events.len(), 1); assert_eq!(events[0].data, format!("Chain id: {}", *CHAINID)); } + +#[cfg(feature = "execution-trace")] +#[test] +fn execution_trace() { + // setup the period duration + let mut exec_cfg = ExecutionConfig::default(); + // Make sure broadcast is enabled as we need it for this test + exec_cfg.broadcast_enabled = true; + + let finalized_waitpoint = WaitPoint::new(); + let mut foreign_controllers = ExecutionForeignControllers::new_with_mocks(); + let keypair = KeyPair::from_str(TEST_SK_1).unwrap(); + selector_boilerplate(&mut foreign_controllers.selector_controller); + expect_finalize_deploy_and_call_blocks( + Slot::new(1, 0), + None, + finalized_waitpoint.get_trigger_handle(), + &mut foreign_controllers.final_state, + ); + final_state_boilerplate( + &mut foreign_controllers.final_state, + foreign_controllers.db.clone(), + &foreign_controllers.selector_controller, + &mut foreign_controllers.ledger_controller, + None, + None, + None, + ); + + let mut universe = ExecutionTestUniverse::new(foreign_controllers, exec_cfg); + universe.deploy_bytecode_block( + &keypair, + Slot::new(1, 0), + include_bytes!("./wasm/execution_trace.wasm"), + //unused + include_bytes!("./wasm/execution_trace.wasm"), + ); + finalized_waitpoint.wait(); + + let mut receiver = universe.broadcast_traces_channel_receiver.take().unwrap(); + let join_handle = thread::spawn(move || { + let exec_traces = receiver.blocking_recv(); + /* + let exec_traces_2 = receiver.blocking_recv(); + return vec![ + exec_traces.expect("Execution output"), + exec_traces_2.expect("Final execution output"), + ]; + */ + return exec_traces; + }); + let broadcast_result_ = join_handle.join().expect("Nothing received from thread"); + let (broadcast_result, _) = broadcast_result_.unwrap(); + + let abi_name_1 = "assembly_script_generate_event"; + let traces_1: Vec<(OperationId, Vec)> = broadcast_result + .operation_call_stacks + .iter() + .filter_map(|(k, v)| { + Some(( + k.clone(), + v.iter() + .filter(|t| t.name == abi_name_1) + .cloned() + .collect::>(), + )) + }) + .collect(); + + assert_eq!(traces_1.len(), 1); // Only one op + assert_eq!(traces_1.first().unwrap().1.len(), 1); // Only one generate_event + assert_eq!(traces_1.first().unwrap().1.get(0).unwrap().name, abi_name_1); + + let abi_name_2 = "assembly_script_transfer_coins"; + let traces_2: Vec<(OperationId, Vec)> = broadcast_result + .operation_call_stacks + .iter() + .filter_map(|(k, v)| { + Some(( + k.clone(), + v.iter() + .filter(|t| t.name == abi_name_2) + .cloned() + .collect::>(), + )) + }) + .collect(); + + assert_eq!(traces_2.len(), 1); // Only one op + assert_eq!(traces_2.first().unwrap().1.len(), 1); // Only one transfer_coins + assert_eq!(traces_2.first().unwrap().1.get(0).unwrap().name, abi_name_2); + // println!( + // "params: {:?}", + // traces_2.first().unwrap().1.get(0).unwrap().parameters + // ); + assert_eq!( + traces_2.first().unwrap().1.get(0).unwrap().parameters, + vec![ + SCRuntimeAbiTraceValue { + name: "from_address".to_string(), + value: SCRuntimeAbiTraceType::String( + "AU1TyzwHarZMQSVJgxku8co7xjrRLnH74nFbNpoqNd98YhJkWgi".to_string() + ), + }, + SCRuntimeAbiTraceValue { + name: "to_address".to_string(), + value: SCRuntimeAbiTraceType::String( + "AU12E6N5BFAdC2wyiBV6VJjqkWhpz1kLVp2XpbRdSnL1mKjCWT6oR".to_string() + ), + }, + SCRuntimeAbiTraceValue { + name: "raw_amount".to_string(), + value: SCRuntimeAbiTraceType::U64(2000) + } + ] + ); +} + +#[cfg(feature = "execution-trace")] +#[test] +fn execution_trace_nested() { + // setup the period duration + let mut exec_cfg = ExecutionConfig::default(); + // Make sure broadcast is enabled as we need it for this test + exec_cfg.broadcast_enabled = true; + + let finalized_waitpoint = WaitPoint::new(); + let mut foreign_controllers = ExecutionForeignControllers::new_with_mocks(); + let keypair = KeyPair::from_str(TEST_SK_1).unwrap(); + selector_boilerplate(&mut foreign_controllers.selector_controller); + expect_finalize_deploy_and_call_blocks( + Slot::new(1, 0), + None, + finalized_waitpoint.get_trigger_handle(), + &mut foreign_controllers.final_state, + ); + final_state_boilerplate( + &mut foreign_controllers.final_state, + foreign_controllers.db.clone(), + &foreign_controllers.selector_controller, + &mut foreign_controllers.ledger_controller, + None, + None, + None, + ); + + // let rt = tokio::runtime::Runtime::new().unwrap(); + + let mut universe = ExecutionTestUniverse::new(foreign_controllers, exec_cfg); + universe.deploy_bytecode_block( + &keypair, + Slot::new(1, 0), + include_bytes!("./wasm/et_deploy_sc.wasm"), + include_bytes!("./wasm/et_init_sc.wasm"), + ); + finalized_waitpoint.wait(); + + let mut receiver = universe.broadcast_traces_channel_receiver.take().unwrap(); + let join_handle = thread::spawn(move || { + // Execution Output + let exec_traces = receiver.blocking_recv(); + return exec_traces; + }); + let broadcast_result_ = join_handle.join().expect("Nothing received from thread"); + + // println!("b r: {:?}", broadcast_result_); + let (broadcast_result, _) = broadcast_result_.unwrap(); + + let abi_name_1 = "assembly_script_call"; + let traces_1: Vec<(OperationId, Vec)> = broadcast_result + .operation_call_stacks + .iter() + .filter_map(|(k, v)| { + Some(( + k.clone(), + v.iter() + .filter(|t| t.name == abi_name_1) + .cloned() + .collect::>(), + )) + }) + .collect(); + + assert_eq!(traces_1.len(), 1); // Only one op + assert_eq!(traces_1.first().unwrap().1.len(), 1); // Only one transfer_coins + assert_eq!(traces_1.first().unwrap().1.get(0).unwrap().name, abi_name_1); + + // filter sub calls + let abi_name_2 = "assembly_script_transfer_coins"; + let sub_call: Vec = traces_1 + .first() + .unwrap() + .1 + .get(0) + .unwrap() + .sub_calls + .as_ref() + .unwrap() + .iter() + .filter(|a| a.name == abi_name_2) + .cloned() + .collect(); + + // println!("params: {:?}", sub_call.get(0).unwrap().parameters); + assert_eq!( + sub_call.get(0).unwrap().parameters, + vec![ + SCRuntimeAbiTraceValue { + name: "from_address".to_string(), + value: SCRuntimeAbiTraceType::String( + "AS1Bc3kZ6LhPLJvXV4vcVJLFRExRFbkPWD7rCg9aAdQ1NGzRwgnu".to_string() + ) + }, + SCRuntimeAbiTraceValue { + name: "to_address".to_string(), + value: SCRuntimeAbiTraceType::String( + "AU12E6N5BFAdC2wyiBV6VJjqkWhpz1kLVp2XpbRdSnL1mKjCWT6oR".to_string() + ) + }, + SCRuntimeAbiTraceValue { + name: "raw_amount".to_string(), + value: SCRuntimeAbiTraceType::U64(1425) + } + ] + ); +} diff --git a/massa-execution-worker/src/tests/tests_active_history.rs b/massa-execution-worker/src/tests/tests_active_history.rs index 72629dac12c..7e4d9c926e1 100644 --- a/massa-execution-worker/src/tests/tests_active_history.rs +++ b/massa-execution-worker/src/tests/tests_active_history.rs @@ -54,6 +54,8 @@ fn test_active_history_deferred_credits() { execution_trail_hash_change: Default::default(), }, events: Default::default(), + #[cfg(feature = "execution-trace")] + slot_trace: Default::default(), }; let active_history = ActiveHistory(VecDeque::from([exec_output_1])); diff --git a/massa-execution-worker/src/tests/universe.rs b/massa-execution-worker/src/tests/universe.rs index ce06cc6760d..56a66de5fa0 100644 --- a/massa-execution-worker/src/tests/universe.rs +++ b/massa-execution-worker/src/tests/universe.rs @@ -8,7 +8,7 @@ use massa_db_exports::{MassaDBConfig, MassaDBController, ShareableMassaDBControl use massa_db_worker::MassaDB; use massa_execution_exports::{ ExecutionBlockMetadata, ExecutionChannels, ExecutionConfig, ExecutionController, - ExecutionError, ExecutionManager, + ExecutionError, ExecutionManager, SlotExecutionOutput, }; use massa_final_state::{FinalStateController, MockFinalStateController}; use massa_ledger_exports::MockLedgerControllerWrapper; @@ -40,6 +40,9 @@ use tokio::sync::broadcast; use crate::start_execution_worker; +#[cfg(feature = "execution-trace")] +use massa_execution_exports::SlotAbiCallStack; + pub struct ExecutionForeignControllers { pub selector_controller: Box, pub final_state: Arc>, @@ -75,6 +78,10 @@ pub struct ExecutionTestUniverse { pub storage: Storage, pub final_state: Arc>, module_manager: Box, + pub broadcast_channel_receiver: Option>, + #[cfg(feature = "execution-trace")] + pub broadcast_traces_channel_receiver: + Option>, } impl TestUniverse for ExecutionTestUniverse { @@ -88,15 +95,20 @@ impl TestUniverse for ExecutionTestUniverse { warn_announced_version_ratio: Ratio::new_raw(30, 100), }; let mip_store = MipStore::try_from(([], mip_stats_config)).unwrap(); - let (tx, _) = broadcast::channel(16); + let (tx, rx) = broadcast::channel(16); + #[cfg(feature = "execution-trace")] + let (tx_traces, rx_traces) = broadcast::channel(16); + let exec_channels = ExecutionChannels { + slot_execution_output_sender: tx, + #[cfg(feature = "execution-trace")] + slot_execution_traces_sender: tx_traces, + }; let (module_manager, module_controller) = start_execution_worker( config.clone(), controllers.final_state.clone(), controllers.selector_controller, mip_store, - ExecutionChannels { - slot_execution_output_sender: tx, - }, + exec_channels, Arc::new(RwLock::new(create_test_wallet(Some(PreHashMap::default())))), MassaMetrics::new( false, @@ -112,6 +124,9 @@ impl TestUniverse for ExecutionTestUniverse { final_state: controllers.final_state, module_controller, module_manager, + broadcast_channel_receiver: Some(rx), + #[cfg(feature = "execution-trace")] + broadcast_traces_channel_receiver: Some(rx_traces), }; universe.initialize(); universe diff --git a/massa-execution-worker/src/tests/wasm/et_deploy_sc.wasm b/massa-execution-worker/src/tests/wasm/et_deploy_sc.wasm new file mode 100644 index 0000000000000000000000000000000000000000..1ff647f3d88a5b89e04e63fea4a4f79fb7a4ff30 GIT binary patch literal 5782 zcma)AU2I%O8J)R5yLWfi-s{G+Y8{8Uo3yxsBwpM5SBl6~DkLqH5d{fZNSn32PQA9* zj(6=esAMfjDMGF2Lmw)X2TMTPhgNNQs!ARzkdQ&B1Rh!>qN-3HQiwz)c*sLVIA`u| z?6^^7Uwu3G-uce=duHa!qSjiHLI^pUIw?D{bJFc}I^v|$;k^(i1qO*19$sS*@|?Xr zCp*qlPTwax|d9^JP{!*>gstw_%zBIjfw%VFmonL8JPu1Ji<(29}{cKBkpRO}o zYuAK`b?)44XJ)ltYu915r*UAn#;JO36g+A78(I-F?_QW;O%!eqbUq915Wh zikviFBj-S4U&E}NHr_%`G;;2jMb0x}+hqAN=L4uc?P`C)z)ZtzY&>z=gzGsUZ~cdc zjtL`S5_HimNwA@s2PD7zSp-Mi}H=x_1mPp0-1v9W9Ga z1_NG(+J&B)l5WR2#_OQ$cTrWLgfhbTVG7win>OO(G>n?SczA=*5JGnvMlq{|Moe^q z2<>2Xv{3ihkj>!2h!aktSg$E_DP`Uk=H56=EgOendc>-YI6+tEBV~Rm&EZ&Q1a@UZ zJNH5HLszCctip(sfv!f|Q74-HC}X}a&FvN9HZNdehvZsyI-VkB~ z9|LpPxO8?# zTKU_wA|DZLLc6*@R6?I^MKM5;DS+?{&{*IgCR ze^>MXhy-y&=#nkmExp^98FX^jYWb}&_tPzwgNNDfazFv%a%sB>Y=^Wn9*w$Xj28*_ z7`7en1lWLPh3E{IsoRKHP@8iE=5j%IxB|E)Ko=t(8e)!^(uw0kDstHZ>Imv2paY=} zZYanh!#?bZvJp8M2^Rb6Eh%NkMR4ZXw?(|U;ARC5?VDzV zsDZFMssu}2^AFJkU(M44jRM-p=auc}2lNlmmt0|uOH9!s@eX%n;`y?tkG_xzd<8!W zH|cHtu6FDx)?~J@ve2CxZG2JUjCeEQ_#NjQitB_fSmv1wGDlTl+idoQSj&K$OMM~M zv2!d;tNQ#KqL{U4jy3Jv)@Lj!Hd}GH=jwIV5JdOJf}`IFF?Q)m%7omI;GJj-Jj%OY~jYr~Onj+qNtcG_9T0WKF1kvQHbbdnq3CTHTk zkh%)SNP6u(gcLpUcQf0tDtP91j9r>Wu4AQev&V|@YyggVd6-Tloap3YqgONmb2v)u z_sb4*4$N_puy_v0+cRs~V2>CF3p8>;{%uX9gS8;rcAg8E^FCwnZ4AjB4tBthPIpT& z=T6&?7wk5nXT%{HrXy?;jQdhweCDLWjHrpWn8#fcX4)8+F)rfmsyHR;!u%n|{v}X%6MF!14)gP3 zS{#K=8^105uHpFz#umJ#l>7p`rPM(p_Crp<`Z{b9H*4+J@#Ylt9~0&x^by@wN*$*D zJvVuv?Knb4x{9kN&h|Wg4CH=cew0?y7v=&6JiU}wigrB6eYniU`27|JDa>UI@OCAA zD{tE#rjgMct~xw+^SYoc@+GA{Py2@;%xe}pk0YmLAli1E!!P4xhN$4>7vw;K|P$%n29rNb!%jP+84M?-fFRha?0 z$2it*-ZH2KzpNSgaR%?mPYpYD8xOg&XrqZ|$s8<#tNpg;*ZS2ydd)x>{W9=P%f+lZ zKmxiD{`EtsB$E6QVk3$5lC_b)kyQ({F$>{3c+}=dJEyQ>?uA`uJPXV0cDz2;#1dj^ zLQ~^z&F0aInUS}VN!@B_F=kP5e&hJ^GcSj>zi))MVj{W38XSdndZD!&OuPytn0_r( zDMm~g#kw0%KXFHKH!AXwF~~S%0#bk!Atgu|Qn4C&{N|yThh83fdFbV#mxo>+dU@#4 z3N^;;HICm2NC8rWlptkD#d@L@YK+@!0>1@F5mJJbAr%`9J<$p^ChS$fZxK?0lpz%x zF{7a;TA@b4UPb(tAZ17eC}zf}843-F=|FcZ?r@$b74?9`#o$+gX*b(o#Mu{*Dj5MXQSx- zC^$U|?u~*4qgX}k?_Ro>2R;vc9{4=)dEoQF@3pIAz>fhx2K*TCW5ACAPhRb%+vC8G z13wP@IPl}Zj{{Fm?u7*tz)t`_0sI8;6TnXZPd@L39R=VEz!!io0AB#U0Q_EAQv|*U zd=dB}@I~N@!0(k!CE!cImw+z;Ujn`a{9ajB2EGh@8Tc~rW#G%e@0EQO;48pafUf{w z0loq}Sw&{ByX+9F%t|p^Moh0n*6cNnU!J}^b9gGu!6I2nW|1B2FuTM`vs%oUQPXn` z!fOUH3#miq@I*$Fm1Gv#!49)atTd~|j2SgO6U}QDQisgpj;n!;CM(G-vV$FFmsn|5 ziy1R&dM26|&oiFtTuEFFWHebxW|1B2FuTM`vs%oUQPVTgy!c%ZuR^XQt_CuitR%C@ z4tAJbVx?IvX3VJRnP_uyzmng#~F^lgD~Lx^J4t8HLOxucnKi*3c^1x`*@7& z8)B{vt9y9%ITB(ZDSjo!e-Cs=jS=6s_lB+IwbhyWBefO$kMq>GzVWp?jvU3mhiB_6 Ti_83n@sTt5Z}gF++RFa{%HZ%1 literal 0 HcmV?d00001 diff --git a/massa-execution-worker/src/tests/wasm/et_init_sc.wasm b/massa-execution-worker/src/tests/wasm/et_init_sc.wasm new file mode 100644 index 0000000000000000000000000000000000000000..20f559b76ce58027e09758d60ac3ab2c17edd239 GIT binary patch literal 1083 zcmZ`(J#Q3O6g}_F=k8mS5mA%^Fq4H47m2MziVK7U7ZgOWL&3L1F`iuy&a8K5)-z*U zMu>4qK|wkSij?^glt^(wK^3LU4@f~tVac3lFcJmxbnl&a?>Xn*k=`4m^+W+sC;esB zVi{O=TRvFUZL3=PM_@s<`kv!t#io}I%Bq3(CTU%#Gj!QxFy2n;VUmd`1^F|S|3di(G7|xYm1xv=W$)ASH*&&OPoeLKJL>D$l`ls!T^Dz) z&%JjZeP5zRzOc!DK;77ny*OACb{Wae&ck(yHv0jy!PR3IAh<$E@ZklBBu}8F_w>6jf;PnNE?@%1K^@^ zT6x|(&CcQxs^n=`u-6476!(Bj<-@8b3VVj+Slc0ByEE94IPBlfQOJ|fsB@5Kp_$h_ zRI@a}n5)DPDH`N_D=5)0mW*R&E36`eyn4_5H#FZ`?hNe_>p2FPC#IpV={LBaW31V& zW7PNT)-k6n<2dbCs&7)$@>UM?GiFwa{~hvzcuu$5F+W)RnNK=c?e`PfNh_`t+lQU@ z$UF{t+%?LBJZIqUudXpx$EBU+k{8T#p53nb+U9vUsA+WyPJ5rD$0x>jQC)6S!|b-y4H WbMsk~+id2MX;e)n(Ahn-Sr)ynjK&e1= zj@G&7vDbc_z3)A-)CO%OBI>ThvdFR;%Ca-WpDbI$p&BlW+FrKz=iA&@L+gTL2c0dq zw$kl~V!7?wU{ISy7qnN_&sPWaezO-=>t|}sPPN$(XDo2^e+5nlouFR}gKDr9bV5;I zA{O>*oxy6*uhzTG&Ol;zB5u19htIC9M8&49p)#U}Vn&B`{NA|JhFyud)oLd=3**&l z5A%4uTHWk$l$feQ)%to+4`oVMtNk!+R#$3+z*ez#(C+rnTar-8X?0uj*NOP9Qn8^Y z=96pJuGuG@6J{r2K1i5%#2ihDXzZj~X~*mx+@3SHa`$#g+s0|7EzdDuZJFMNan{n9 zyWf>`%%nmy!^LzQdcBzEdhs;_D=jk`UDzn#Kc*DF~!m`l?cDXrJ(ux%3BFgt}Ci&85+6e{;rCS1*2N(ZnLT(bNBY&t5bMN{-(A^E_mA z6p^t~PC7cHluN^-(X%i9aymRLgRFm=d zpS=G5*OttoXyfYd|8?LO|f8V*u9zn*ne&4DUGiabfVNWY%5%%`(+%3PU6!p-(Z+D7TzxOU7+aUyu7 z;J!0Ha&vXov-gsM`tguJ%M(tZC56H_abyMAy_xkTP`2>)Ov=^{CfXJi;iR&M=GDV@ zh!Nv5iRMz$L`qD;I1!)axxZ`SDSH|UJd1A>%tjBsUo65KZWHs%gjU}c^Grg|TOx7{ zFtU!;mAcd^Ega<{=ZJAE4jF{5YNi zSew%7ZdjYrx07)McmUeBpp(2tq(8vQX~>@ua|QD7Zg)z5gYsWLV1c^F9@5i3T20Ph zx3mZ55iw6DwQ|KghX-aaC$**?2lytMxq|fz(4W+6TyFr3^AYrY_(MOA$zAY@x*J$I zhw(W6O=;r{I*{}*+KeLg5NUup`%uoJr-Ucjh}DVW%SjP-^*Ap5MR9F{+r)XygG(#a z*8eOnFd5gho}$M@2JubnU)dRqGU5YSAP3}u0#F1>KpCh+5NYN13nYB}_Ipa2ws5>N&zktJ%OL@sK1^aY>@lz=i&iM*jDYN13vY6bK~pahhG zO5`!Up(SdeL?LQL^d+DSRKQ|n^qL;g8Z{LFwh~YVDi|?(MnWLFHsn1sPd|f6Vd!&z=vy555n+555n+5B{WC&4Ql=KMQ^q{4Drc@L5-rWIG3b z4*VSWIq-Af=fG!8PErMV@blp3!Ow%A2R{!!>v@vuD1cu8zW{y#`~vs|@F%UBBKSq{ zi{KZ*FM?kLf6{6yfnNf@1bzwp68I(XC#|wF_+{|R;FrNKgI@-J((0>#Uje@Yeg*sr z_!aP3RjdqVml@(I^Q0IpJ*HJMYf)Q4&#!9(2!K^+WL2`VSRKqTv&56;X)$7YP0KZa ztqwGR09eJ070s$-WwAP#VP=UZ&C_DU^qQ8*W@`Weu!<4i239nyl9k2kV1}6`o-|L3 z5z}j0CYvn)R?+iK;@iNAW>vDXSRKqTv&56;X)$7YP0M7nt)h+JLcU3S8(7h-N>&!D zgBfO)c+xyAMoh11nQZgcO%)2+~MSc_RkG|mi%JPfIuNS{GJb%VcyQ{c`&frF9 z$r^5jHg1Kk^kf5f!uWRG!cB7yH{Xx&#hH!n*EeSME!2G@a1Xe(=q~fnH*o9WU#a&X zhg;-hJn-Mb9v-*_pS)RbkktZ=JccvM%DwRC`|xT(?!#IG+On{A9-0>9aqK-V4?=ze zp8WuJdho%=_!QQAkbDCDiaZS7Bk=kZ_S$k1^7lacQLww%<8PfqQ!wd$2J;M-co$m= z9sWGIJ2mKT_Upm-YCZh&IsMqfrw$*V56%Vk&9K?+RPoEI9vnZ5AArZ(wcdXL^j?eA literal 0 HcmV?d00001 diff --git a/massa-execution-worker/src/trace_history.rs b/massa-execution-worker/src/trace_history.rs new file mode 100644 index 00000000000..7a5c2576eab --- /dev/null +++ b/massa-execution-worker/src/trace_history.rs @@ -0,0 +1,59 @@ +use massa_execution_exports::{AbiTrace, SlotAbiCallStack, Transfer}; +use massa_models::{operation::OperationId, slot::Slot}; +use schnellru::{ByLength, LruMap}; + +/// Execution traces history +pub struct TraceHistory { + /// Execution traces history by slot + trace_per_slot: LruMap, + /// Transfer coins by slot + transfer_per_slot: LruMap>, + /// Execution op linked to slot + op_per_slot: LruMap, +} + +impl TraceHistory { + pub fn new(max_slot_size_cache: u32, op_per_slot: u32) -> Self { + Self { + trace_per_slot: LruMap::new(ByLength::new(max_slot_size_cache)), + op_per_slot: LruMap::new(ByLength::new(max_slot_size_cache * op_per_slot)), + transfer_per_slot: LruMap::new(ByLength::new(max_slot_size_cache * op_per_slot)), + } + } + + /// Fetch execution traces for a given slot + pub(crate) fn fetch_traces_for_slot(&self, slot: &Slot) -> Option { + self.trace_per_slot.peek(slot).cloned() + } + + /// Fetch slot for a given operation + pub(crate) fn fetch_traces_for_op(&self, op_id: &OperationId) -> Option> { + self.op_per_slot + .peek(op_id) + .and_then(|slot| { + self.trace_per_slot + .peek(slot) + .map(|trace| trace.operation_call_stacks.get(op_id).cloned()) + }) + // .flatten() + .flatten() + } + + /// Fetch transfer for a given slot + pub(crate) fn fetch_transfers_for_slot(&self, slot: &Slot) -> Option> { + self.transfer_per_slot.peek(slot).cloned() + } + + /// Save execution traces for a given slot + pub(crate) fn save_traces_for_slot(&mut self, slot: Slot, traces: SlotAbiCallStack) { + for (op_id, _) in traces.operation_call_stacks.iter() { + self.op_per_slot.insert(*op_id, slot); + } + self.trace_per_slot.insert(slot, traces); + } + + /// Save transfer for a given slot + pub(crate) fn save_transfers_for_slot(&mut self, slot: Slot, transfers: Vec) { + self.transfer_per_slot.insert(slot, transfers); + } +} diff --git a/massa-grpc/Cargo.toml b/massa-grpc/Cargo.toml index 08909e33acb..3b362a7fed5 100644 --- a/massa-grpc/Cargo.toml +++ b/massa-grpc/Cargo.toml @@ -8,6 +8,7 @@ homepage = "https://massa.net" documentation = "https://docs.massa.net/" [features] +execution-trace = ["serde_json"] test-exports = [] [dependencies] @@ -28,6 +29,7 @@ tracing = { workspace = true } parking_lot = { workspace = true, "features" = ["deadlock_detection"] } h2 = { workspace = true } itertools = { workspace = true } +serde_json = { workspace = true, optional = true } # test massa_consensus_exports = { workspace = true } diff --git a/massa-grpc/src/handler.rs b/massa-grpc/src/handler.rs index 211feeb4600..4c890fa0ff0 100644 --- a/massa-grpc/src/handler.rs +++ b/massa-grpc/src/handler.rs @@ -16,13 +16,21 @@ use crate::public::{ get_stakers, get_status, get_transactions_throughput, query_state, search_blocks, search_endorsements, search_operations, }; + +#[cfg(feature = "execution-trace")] +use crate::public::{get_operation_abi_call_stacks, get_slot_abi_call_stacks, get_slot_transfers}; +#[cfg(feature = "execution-trace")] +use crate::stream::new_slot_transfers::new_slot_transfers; + use crate::server::{MassaPrivateGrpc, MassaPublicGrpc}; use crate::stream::{ new_blocks::{new_blocks, NewBlocksStreamType}, new_endorsements::{new_endorsements, NewEndorsementsStreamType}, new_filled_blocks::{new_filled_blocks, NewFilledBlocksStreamType}, new_operations::{new_operations, NewOperationsStreamType}, + new_slot_abi_call_stacks::{new_slot_abi_call_stacks, NewSlotABICallStacksStreamType}, new_slot_execution_outputs::{new_slot_execution_outputs, NewSlotExecutionOutputsStreamType}, + new_slot_transfers::NewSlotTransfersStreamType, send_blocks::SendBlocksStreamType, send_endorsements::{send_endorsements, SendEndorsementsStreamType}, send_operations::{send_operations, SendOperationsStreamType}, @@ -40,6 +48,68 @@ impl grpc_api::public_service_server::PublicService for MassaPublicGrpc { Ok(tonic::Response::new(execute_read_only_call(self, request)?)) } + #[cfg(feature = "execution-trace")] + async fn get_operation_abi_call_stacks( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + Ok(tonic::Response::new(get_operation_abi_call_stacks( + self, request, + )?)) + } + + #[cfg(not(feature = "execution-trace"))] + async fn get_operation_abi_call_stacks( + &self, + _request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + Err(tonic::Status::unimplemented("feature not enabled")) + } + + #[cfg(feature = "execution-trace")] + async fn get_slot_abi_call_stacks( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + Ok(tonic::Response::new(get_slot_abi_call_stacks( + self, request, + )?)) + } + + #[cfg(not(feature = "execution-trace"))] + async fn get_slot_abi_call_stacks( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + Err(tonic::Status::unimplemented("feature not enabled")) + } + + #[cfg(feature = "execution-trace")] + async fn get_slot_transfers( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + Ok(tonic::Response::new(get_slot_transfers(self, request)?)) + } + + #[cfg(not(feature = "execution-trace"))] + async fn get_slot_transfers( + &self, + _request: tonic::Request, + ) -> std::result::Result, tonic::Status> + { + Err(tonic::Status::unimplemented("feature not enabled")) + } + /// handler for get blocks async fn get_blocks( &self, @@ -218,6 +288,44 @@ impl grpc_api::public_service_server::PublicService for MassaPublicGrpc { )) } + type NewSlotTransfersStream = NewSlotTransfersStreamType; + + #[cfg(feature = "execution-trace")] + /// handler for subscribe new slot transfers stream + async fn new_slot_transfers( + &self, + request: tonic::Request>, + ) -> Result, tonic::Status> { + Ok(tonic::Response::new( + new_slot_transfers(self, request).await?, + )) + } + + #[cfg(not(feature = "execution-trace"))] + /// handler for subscribe new slot transfers stream + async fn new_slot_transfers( + &self, + _request: tonic::Request>, + ) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("feature not enabled")) + } + + type NewSlotABICallStacksStream = NewSlotABICallStacksStreamType; + + /// handler for subscribe new slot abi call stacks stream + async fn new_slot_abi_call_stacks( + &self, + request: tonic::Request>, + ) -> Result, tonic::Status> { + if cfg!(feature = "execution-trace") { + Ok(tonic::Response::new( + new_slot_abi_call_stacks(self, request).await?, + )) + } else { + Err(tonic::Status::unimplemented("feature not enabled")) + } + } + type SendBlocksStream = SendBlocksStreamType; /// handler for send_blocks_stream diff --git a/massa-grpc/src/lib.rs b/massa-grpc/src/lib.rs index 477f055b357..73aabfd7c30 100644 --- a/massa-grpc/src/lib.rs +++ b/massa-grpc/src/lib.rs @@ -23,6 +23,9 @@ use tonic_health as _; use tonic_reflection as _; use tonic_web as _; +#[cfg(feature = "execution-trace")] +use serde_json as _; + /// gRPC configuration pub mod config; /// models error diff --git a/massa-grpc/src/public.rs b/massa-grpc/src/public.rs index 8f6b2bb65f0..09eff717b8a 100644 --- a/massa-grpc/src/public.rs +++ b/massa-grpc/src/public.rs @@ -22,7 +22,7 @@ use massa_models::operation::{OperationId, SecureShareOperation}; use massa_models::prehash::{PreHashMap, PreHashSet}; use massa_models::slot::Slot; use massa_models::timeslots::get_latest_block_slot_at_timestamp; -use massa_proto_rs::massa::api::v1 as grpc_api; +use massa_proto_rs::massa::api::v1::{self as grpc_api}; use massa_proto_rs::massa::model::v1::{self as grpc_model, read_only_execution_call}; use massa_serialization::{DeserializeError, Deserializer}; use massa_time::MassaTime; @@ -30,6 +30,17 @@ use massa_versioning::versioning_factory::{FactoryStrategy, VersioningFactory}; use std::collections::HashSet; use std::str::FromStr; +#[cfg(feature = "execution-trace")] +use massa_execution_exports::AbiTrace; +#[cfg(feature = "execution-trace")] +use massa_proto_rs::massa::api::v1::abi_call_stack_element_parent::CallStackElement; +#[cfg(feature = "execution-trace")] +use massa_proto_rs::massa::api::v1::{ + AbiCallStack, AbiCallStackElement, AbiCallStackElementCall, AbiCallStackElementParent, + AscabiCallStack, GetOperationAbiCallStacksResponse, GetSlotAbiCallStacksResponse, + OperationAbiCallStack, SlotAbiCallStacks, TransferInfo, TransferInfos, +}; + /// Execute read only call (function or bytecode) pub(crate) fn execute_read_only_call( grpc: &MassaPublicGrpc, @@ -458,6 +469,213 @@ pub(crate) fn get_stakers( Ok(grpc_api::GetStakersResponse { stakers }) } +#[cfg(feature = "execution-trace")] +/// recursive function to convert a AbiTrace struct +pub fn into_element(abi_trace: &AbiTrace) -> AbiCallStackElementParent { + if abi_trace.sub_calls.is_none() { + AbiCallStackElementParent { + call_stack_element: Some(CallStackElement::Element(AbiCallStackElement { + name: abi_trace.name.clone(), + parameters: abi_trace + .parameters + .iter() + .map(|p| serde_json::to_string(p).unwrap_or_default()) + .collect::>(), + return_value: serde_json::to_string(&abi_trace.return_value).unwrap_or_default(), + })), + } + } else { + AbiCallStackElementParent { + call_stack_element: Some(CallStackElement::ElementCall(AbiCallStackElementCall { + name: abi_trace.name.clone(), + parameters: abi_trace + .parameters + .iter() + .map(|p| serde_json::to_string(p).unwrap_or_default()) + .collect(), + return_value: serde_json::to_string(&abi_trace.return_value).unwrap_or_default(), + sub_calls: abi_trace + .sub_calls + .clone() + .unwrap_or_default() + .iter() + .map(into_element) + .collect(), + })), + } + } +} + +#[cfg(feature = "execution-trace")] +/// Get slot transfers +pub(crate) fn get_slot_transfers( + grpc: &MassaPublicGrpc, + request: tonic::Request, +) -> Result { + use massa_proto_rs::massa::api::v1::GetSlotTransfersResponse; + + let slots = request.into_inner().slots; + + let mut transfer_each_slot: Vec = vec![]; + for slot in slots { + let mut slot_transfers = TransferInfos { + slot: slot.clone().into(), + transfers: vec![], + }; + let abi_calls = grpc + .execution_controller + .get_slot_abi_call_stack(slot.clone().into()); + if let Some(abi_calls) = abi_calls { + // flatten & filter transfer trace in asc_call_stacks + + let abi_transfer_1 = "assembly_script_transfer_coins".to_string(); + let abi_transfer_2 = "assembly_script_transfer_coins_for".to_string(); + let abi_transfer_3 = "abi_transfer_coins".to_string(); + let transfer_abi_names = vec![abi_transfer_1, abi_transfer_2, abi_transfer_3]; + for (i, asc_call_stack) in abi_calls.asc_call_stacks.iter().enumerate() { + for abi_trace in asc_call_stack { + let only_transfer = abi_trace.flatten_filter(&transfer_abi_names); + for transfer in only_transfer { + let (t_from, t_to, t_amount) = transfer.parse_transfer(); + slot_transfers.transfers.push(TransferInfo { + from: t_from.clone(), + to: t_to.clone(), + amount: t_amount, + operation_id_or_asc_index: Some( + grpc_api::transfer_info::OperationIdOrAscIndex::AscIndex(i as u64), + ), + }); + } + } + } + + for op_call_stack in abi_calls.operation_call_stacks { + let op_id = op_call_stack.0; + let op_call_stack = op_call_stack.1; + for abi_trace in op_call_stack { + let only_transfer = abi_trace.flatten_filter(&transfer_abi_names); + for transfer in only_transfer { + let (t_from, t_to, t_amount) = transfer.parse_transfer(); + slot_transfers.transfers.push(TransferInfo { + from: t_from.clone(), + to: t_to.clone(), + amount: t_amount, + operation_id_or_asc_index: Some( + grpc_api::transfer_info::OperationIdOrAscIndex::OperationId( + op_id.to_string(), + ), + ), + }); + } + } + } + } + + let transfers = grpc + .execution_controller + .get_transfers_for_slot(slot.into()); + if let Some(transfers) = transfers { + for transfer in transfers { + slot_transfers.transfers.push(TransferInfo { + from: transfer.from.to_string(), + to: transfer.to.to_string(), + amount: transfer.amount.to_raw(), + operation_id_or_asc_index: Some( + grpc_api::transfer_info::OperationIdOrAscIndex::OperationId( + transfer.op_id.to_string(), + ), + ), + }); + } + } + + transfer_each_slot.push(slot_transfers); + } + + Ok(GetSlotTransfersResponse { transfer_each_slot }) +} + +#[cfg(feature = "execution-trace")] +/// Get operation ABI call stacks +pub(crate) fn get_operation_abi_call_stacks( + grpc: &MassaPublicGrpc, + request: tonic::Request, +) -> Result { + let op_ids_ = request.into_inner().operation_ids; + + let op_ids: Vec = op_ids_ + .iter() + .map(|o| OperationId::from_str(o)) + .collect::, _>>()?; + + let mut elements = vec![]; + for op_id in op_ids { + let abi_traces_ = grpc + .execution_controller + .get_operation_abi_call_stack(op_id); + if let Some(abi_traces) = abi_traces_ { + for abi_trace in abi_traces.iter() { + elements.push(into_element(abi_trace)); + } + } else { + elements.push(AbiCallStackElementParent { + call_stack_element: None, + }) + } + } + + let resp = GetOperationAbiCallStacksResponse { + call_stacks: vec![AbiCallStack { + call_stack: elements, + }], + }; + + Ok(resp) +} + +#[cfg(feature = "execution-trace")] +pub(crate) fn get_slot_abi_call_stacks( + grpc: &MassaPublicGrpc, + request: tonic::Request, +) -> Result { + let slots = request.into_inner().slots; + + let slot_elements = vec![]; + for slot in slots { + let call_stack_ = grpc + .execution_controller + .get_slot_abi_call_stack(slot.into()); + + let mut slot_abi_call_stacks = SlotAbiCallStacks { + asc_call_stacks: vec![], + operation_call_stacks: vec![], + }; + + if let Some(call_stack) = call_stack_ { + for (i, asc_call_stack) in call_stack.asc_call_stacks.into_iter().enumerate() { + slot_abi_call_stacks.asc_call_stacks.push(AscabiCallStack { + index: i as u64, + call_stack: asc_call_stack.iter().map(into_element).collect(), + }) + } + for (op_id, op_call_stack) in call_stack.operation_call_stacks { + slot_abi_call_stacks + .operation_call_stacks + .push(OperationAbiCallStack { + operation_id: op_id.to_string(), + call_stack: op_call_stack.iter().map(into_element).collect(), + }) + } + } + } + + let resp = GetSlotAbiCallStacksResponse { + slot_call_stacks: slot_elements, + }; + + Ok(resp) +} + /// Get next block best parents pub(crate) fn get_next_block_best_parents( grpc: &MassaPublicGrpc, diff --git a/massa-grpc/src/stream/mod.rs b/massa-grpc/src/stream/mod.rs index f6e91c4fb0d..8c212996e6d 100644 --- a/massa-grpc/src/stream/mod.rs +++ b/massa-grpc/src/stream/mod.rs @@ -8,8 +8,12 @@ pub mod new_endorsements; pub mod new_filled_blocks; /// subscribe new operations pub mod new_operations; +/// subscribe new slot abi call stacks +pub mod new_slot_abi_call_stacks; /// subscribe new slot execution outputs pub mod new_slot_execution_outputs; +/// subscribe new slot transfers +pub mod new_slot_transfers; /// send_blocks streaming pub mod send_blocks; /// send endorsements diff --git a/massa-grpc/src/stream/new_slot_abi_call_stacks.rs b/massa-grpc/src/stream/new_slot_abi_call_stacks.rs new file mode 100644 index 00000000000..673bdccdfde --- /dev/null +++ b/massa-grpc/src/stream/new_slot_abi_call_stacks.rs @@ -0,0 +1,139 @@ +use futures_util::StreamExt; +use massa_proto_rs::massa::api::v1::{self as grpc_api, FinalityLevel}; +use std::pin::Pin; +use tokio::select; +use tonic::{Request, Streaming}; +use tracing::{error, warn}; + +use crate::error::match_for_io_error; +use crate::{error::GrpcError, server::MassaPublicGrpc}; + +#[cfg(feature = "execution-trace")] +use crate::public::into_element; +#[cfg(not(feature = "execution-trace"))] +use massa_models::slot::Slot; +#[cfg(feature = "execution-trace")] +use massa_proto_rs::massa::api::v1::{AscabiCallStack, OperationAbiCallStack}; +#[cfg(not(feature = "execution-trace"))] +#[derive(Clone)] +struct SlotAbiCallStack { + /// Slot + pub slot: Slot, +} + +/// Type declaration for NewSlotExecutionOutputs +pub type NewSlotABICallStacksStreamType = Pin< + Box< + dyn futures_util::Stream< + Item = Result, + > + Send + + 'static, + >, +>; + +/// Creates a new stream of new slots abi call stacks +pub(crate) async fn new_slot_abi_call_stacks( + grpc: &MassaPublicGrpc, + request: Request>, +) -> Result { + let (tx, rx) = tokio::sync::mpsc::channel(grpc.grpc_config.max_channel_size); + // Extract the incoming stream of abi call stacks messages + let mut in_stream = request.into_inner(); + + // Subscribe to the new slot execution events channel + #[cfg(feature = "execution-trace")] + let mut subscriber = grpc + .execution_channels + .slot_execution_traces_sender + .subscribe(); + #[cfg(not(feature = "execution-trace"))] + let (mut subscriber, _receiver) = { + let (subscriber_, receiver) = + tokio::sync::broadcast::channel::<(SlotAbiCallStack, bool)>(0); + (subscriber_.subscribe(), receiver) + }; + + tokio::spawn(async move { + let mut finality = FinalityLevel::Unspecified; + loop { + select! { + // Receive a new slot execution traces from the subscriber + event = subscriber.recv() => { + match event { + Ok((massa_slot_execution_trace, received_finality)) => { + if (finality == FinalityLevel::Final && !received_finality) || + (finality == FinalityLevel::Candidate && received_finality) { + continue; + } + + #[allow(unused_mut)] + let mut ret = grpc_api::NewSlotAbiCallStacksResponse { + slot: Some(massa_slot_execution_trace.slot.into()), + asc_call_stacks: vec![], + operation_call_stacks: vec![] + }; + + #[cfg(feature = "execution-trace")] + { + for (i, asc_call_stack) in massa_slot_execution_trace.asc_call_stacks.iter().enumerate() { + ret.asc_call_stacks.push( + AscabiCallStack { + index: i as u64, + call_stack: asc_call_stack.iter().map(into_element).collect() + } + ) + } + for (op_id, op_call_stack) in massa_slot_execution_trace.operation_call_stacks.iter() { + ret.operation_call_stacks.push( + OperationAbiCallStack { + operation_id: op_id.to_string(), + call_stack: op_call_stack.iter().map(into_element).collect() + } + ) + } + } + + if let Err(e) = tx.send(Ok(ret)).await { + error!("failed to send new slot execution trace: {}", e); + break; + } + } + Err(e) => { + error!("error on receive new slot execution trace : {}", e) + } + } + } + // Receive a new message from the in_stream + res = in_stream.next() => { + match res { + Some(res) => { + match res { + Ok(message) => { + finality = message.finality_level(); + }, + Err(e) => { + // Any io error -> break + if let Some(io_err) = match_for_io_error(&e) { + warn!("client disconnected, broken pipe: {}", io_err); + break; + } + error!("{}", e); + if let Err(e2) = tx.send(Err(e)).await { + error!("failed to send back error response: {}", e2); + break; + } + } + } + } + None => { + // the client has disconnected + break; + } + } + } + } + } + }); + let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx); + Ok(Box::pin(out_stream) as NewSlotABICallStacksStreamType) +} diff --git a/massa-grpc/src/stream/new_slot_transfers.rs b/massa-grpc/src/stream/new_slot_transfers.rs new file mode 100644 index 00000000000..0837fd4cc62 --- /dev/null +++ b/massa-grpc/src/stream/new_slot_transfers.rs @@ -0,0 +1,166 @@ +use std::pin::Pin; + +#[cfg(feature = "execution-trace")] +use crate::{error::GrpcError, server::MassaPublicGrpc}; +#[cfg(feature = "execution-trace")] +use tonic::{Request, Streaming}; + +/// Type declaration for NewSlotTransfers +pub type NewSlotTransfersStreamType = Pin< + Box< + dyn futures_util::Stream< + Item = Result< + massa_proto_rs::massa::api::v1::NewSlotTransfersResponse, + tonic::Status, + >, + > + Send + + 'static, + >, +>; + +#[cfg(feature = "execution-trace")] +/// Creates a new stream of new slots transfers +pub(crate) async fn new_slot_transfers( + grpc: &MassaPublicGrpc, + request: Request>, +) -> Result { + use crate::error::match_for_io_error; + use futures_util::StreamExt; + use massa_proto_rs::massa::api::v1::{self as grpc_api, FinalityLevel, TransferInfo}; + use tokio::select; + use tracing::{error, warn}; + + let (tx, rx) = tokio::sync::mpsc::channel(grpc.grpc_config.max_channel_size); + // Extract the incoming stream of abi call stacks messages + let mut in_stream = request.into_inner(); + + // Subscribe to the new slot execution events channel + let mut subscriber = grpc + .execution_channels + .slot_execution_traces_sender + .subscribe(); + + tokio::spawn({ + let execution_controller = grpc.execution_controller.clone(); + async move { + let mut finality = FinalityLevel::Unspecified; + loop { + select! { + // Receive a new slot execution traces from the subscriber + event = subscriber.recv() => { + match event { + Ok((massa_slot_execution_trace, is_final)) => { + if (finality == FinalityLevel::Final && !is_final) || + (finality == FinalityLevel::Candidate && is_final) || + (finality == FinalityLevel::Unspecified && !is_final) { + continue; + } + let mut ret_transfers = Vec::new(); + // flatten & filter transfer trace in asc_call_stacks + + let abi_transfer_1 = "assembly_script_transfer_coins".to_string(); + let abi_transfer_2 = "assembly_script_transfer_coins_for".to_string(); + let abi_transfer_3 = "abi_transfer_coins".to_string(); + let transfer_abi_names = vec![abi_transfer_1, abi_transfer_2, abi_transfer_3]; + for (i, asc_call_stack) in massa_slot_execution_trace.asc_call_stacks.iter().enumerate() { + for abi_trace in asc_call_stack { + let only_transfer = abi_trace.flatten_filter(&transfer_abi_names); + for transfer in only_transfer { + let (t_from, t_to, t_amount) = transfer.parse_transfer(); + ret_transfers.push(TransferInfo { + from: t_from.clone(), + to: t_to.clone(), + amount: t_amount, + operation_id_or_asc_index: Some( + grpc_api::transfer_info::OperationIdOrAscIndex::AscIndex(i as u64), + ), + }); + } + } + } + + for op_call_stack in massa_slot_execution_trace.operation_call_stacks { + let op_id = op_call_stack.0; + let op_call_stack = op_call_stack.1; + for abi_trace in op_call_stack { + let only_transfer = abi_trace.flatten_filter(&transfer_abi_names); + for transfer in only_transfer { + let (t_from, t_to, t_amount) = transfer.parse_transfer(); + ret_transfers.push(TransferInfo { + from: t_from.clone(), + to: t_to.clone(), + amount: t_amount, + operation_id_or_asc_index: Some( + grpc_api::transfer_info::OperationIdOrAscIndex::OperationId(op_id.to_string()), + ), + }); + } + } + } + let transfers = + execution_controller + .get_transfers_for_slot(massa_slot_execution_trace.slot); + if let Some(transfers) = transfers { + for transfer in transfers { + ret_transfers.push(TransferInfo { + from: transfer.from.to_string(), + to: transfer.to.to_string(), + amount: transfer.amount.to_raw(), + operation_id_or_asc_index: Some( + grpc_api::transfer_info::OperationIdOrAscIndex::OperationId( + transfer.op_id.to_string(), + ), + ), + }); + } + } + let ret = grpc_api::NewSlotTransfersResponse { + slot: Some(massa_slot_execution_trace.slot.into()), + transfers: ret_transfers, + }; + + if let Err(e) = tx.send(Ok(ret)).await { + error!("failed to send new slot execution trace: {}", e); + break; + } + } + Err(e) => { + error!("error on receive new slot execution trace : {}", e) + } + } + } + // Receive a new message from the in_stream + res = in_stream.next() => { + match res { + Some(res) => { + match res { + Ok(message) => { + finality = message.finality_level(); + }, + Err(e) => { + // Any io error -> break + if let Some(io_err) = match_for_io_error(&e) { + warn!("client disconnected, broken pipe: {}", io_err); + break; + } + error!("{}", e); + if let Err(e2) = tx.send(Err(e)).await { + error!("failed to send back error response: {}", e2); + break; + } + } + } + } + None => { + // the client has disconnected + break; + } + } + } + } + } + } + }); + let out_stream = tokio_stream::wrappers::ReceiverStream::new(rx); + Ok(Box::pin(out_stream) as NewSlotTransfersStreamType) +} diff --git a/massa-grpc/src/tests/mock.rs b/massa-grpc/src/tests/mock.rs index 586cbc2aa01..eb0d95d6307 100644 --- a/massa-grpc/src/tests/mock.rs +++ b/massa-grpc/src/tests/mock.rs @@ -128,6 +128,8 @@ pub(crate) fn grpc_public_service(addr: &SocketAddr) -> MassaPublicGrpc { execution_controller: execution_ctrl, execution_channels: ExecutionChannels { slot_execution_output_sender, + #[cfg(feature = "execution-trace")] + slot_execution_traces_sender: tokio::sync::broadcast::channel(5000).0, }, pool_broadcasts: PoolBroadcasts { endorsement_sender, diff --git a/massa-node/Cargo.toml b/massa-node/Cargo.toml index e20fd14db53..017075ef76b 100644 --- a/massa-node/Cargo.toml +++ b/massa-node/Cargo.toml @@ -22,6 +22,7 @@ sandbox = [ "massa_models/sandbox", "massa_metrics/sandbox", ] +execution-trace = ["massa_grpc/execution-trace", "massa_api/execution-trace", "massa_execution_worker/execution-trace", "massa_execution_exports/execution-trace"] [dependencies] crossbeam-channel = { workspace = true } # BOM UPGRADE Revert to "0.5.6" if problem diff --git a/massa-node/base_config/config.toml b/massa-node/base_config/config.toml index a782dc985e8..1f92ec42396 100644 --- a/massa-node/base_config/config.toml +++ b/massa-node/base_config/config.toml @@ -215,6 +215,10 @@ snip_amount = 10 # slot execution outputs channel capacity broadcast_slot_execution_output_channel_capacity = 5000 + # slot execution traces channel capacity + broadcast_slot_execution_traces_channel_capacity = 5000 + # Max execution traces (group by block id) to keep in cache + execution_traces_limit = 5000 [ledger] # path to the initial ledger diff --git a/massa-node/base_config/openrpc.json b/massa-node/base_config/openrpc.json index f56b3ff63f9..3fd39938623 100644 --- a/massa-node/base_config/openrpc.json +++ b/massa-node/base_config/openrpc.json @@ -2,7 +2,7 @@ "openrpc": "1.2.4", "info": { "title": "Massa OpenRPC Specification", - "version": "MAIN.1.0", + "version": "MAIN.2.1", "description": "Massa OpenRPC Specification document. Find more information on https://docs.massa.net/docs/build/api/jsonrpc", "termsOfService": "https://open-rpc.org", "contact": { @@ -21,6 +21,11 @@ "name": "Massa public testnet API", "url": "https://test.massa.net/api/v2", "description": "Massa public testnet url" + }, + { + "name": "Massa public mainnet API", + "url": "https://mainnet.massa.net/api/v2", + "description": "Massa public mainnet url" } ], "methods": [ @@ -298,6 +303,42 @@ "summary": "Get datastore entry", "description": "Get datastore entry." }, + { + "tags": [ + { + "name": "public", + "description": "Massa public api" + } + ], + "params": [ + { + "name": "slots", + "description": "Precise the slots you want to gather the transfers from", + "schema": { + "type": "array", + "items": { + "$ref": "#/components/schemas/Slot" + } + }, + "required": true + } + ], + "result": { + "name": "SlotsTransfers", + "schema": { + "type": "array", + "items": { + "type": "array", + "items": { + "$ref": "#/components/schemas/Transfer" + } + } + } + }, + "name": "get_slots_transfers", + "summary": "Get transfers for specified slots", + "description": "Get transfers for specified slots" + }, { "tags": [ { @@ -3103,6 +3144,35 @@ }, "additionalProperties": false }, + "Transfer": { + "title": "Transfer", + "description": "Describe a transfer of MAS", + "required": [ + "from", + "to", + "amount", + "context" + ], + "type": "object", + "properties": { + "from": { + "description": "Address of the sender", + "type": "string" + }, + "to": { + "description": "Address of the receiver", + "type": "string" + }, + "amount": { + "description": "Amount transferred", + "type": "integer" + }, + "context": { + "description": "Context of the transfer : operation or asyncronous execution", + "type": "object" + } + } + }, "Version": { "description": "Application version, checked during handshakes", "type": "string" diff --git a/massa-node/src/main.rs b/massa-node/src/main.rs index b0b1f06da8d..31cf8b3e58f 100644 --- a/massa-node/src/main.rs +++ b/massa-node/src/main.rs @@ -470,6 +470,14 @@ async fn launch( max_function_length: MAX_FUNCTION_NAME_LENGTH, max_parameter_length: MAX_PARAMETERS_SIZE, chain_id: *CHAINID, + #[cfg(feature = "execution-trace")] + broadcast_traces_enabled: true, + #[cfg(not(feature = "execution-trace"))] + broadcast_traces_enabled: false, + broadcast_slot_execution_traces_channel_capacity: SETTINGS + .execution + .broadcast_slot_execution_traces_channel_capacity, + max_execution_traces_slot_limit: SETTINGS.execution.execution_traces_limit, }; let execution_channels = ExecutionChannels { @@ -477,6 +485,11 @@ async fn launch( execution_config.broadcast_slot_execution_output_channel_capacity, ) .0, + #[cfg(feature = "execution-trace")] + slot_execution_traces_sender: broadcast::channel( + execution_config.broadcast_slot_execution_traces_channel_capacity, + ) + .0, }; let (execution_manager, execution_controller) = start_execution_worker( diff --git a/massa-node/src/settings.rs b/massa-node/src/settings.rs index e35cee44aa5..acf5f937b0f 100644 --- a/massa-node/src/settings.rs +++ b/massa-node/src/settings.rs @@ -34,6 +34,9 @@ pub struct ExecutionSettings { pub snip_amount: usize, /// slot execution outputs channel capacity pub broadcast_slot_execution_output_channel_capacity: usize, + /// slot execution traces channel capacity + pub broadcast_slot_execution_traces_channel_capacity: usize, + pub execution_traces_limit: usize, } #[derive(Clone, Debug, Deserialize)]