diff --git a/chain/ethereum/src/runtime/runtime_adapter.rs b/chain/ethereum/src/runtime/runtime_adapter.rs index 951958d786b..5ebb8a62570 100644 --- a/chain/ethereum/src/runtime/runtime_adapter.rs +++ b/chain/ethereum/src/runtime/runtime_adapter.rs @@ -11,7 +11,7 @@ use graph::blockchain::ChainIdentifier; use graph::components::subgraph::HostMetrics; use graph::data::store::ethereum::call; use graph::data::store::scalar::BigInt; -use graph::data::subgraph::API_VERSION_0_0_9; +use graph::data::subgraph::{API_VERSION_0_0_4, API_VERSION_0_0_9}; use graph::data_source; use graph::data_source::common::{ContractCall, MappingABI}; use graph::prelude::web3::types::H160; @@ -26,7 +26,6 @@ use graph::{ EthereumCallCache, }, runtime::{asc_get, asc_new, AscPtr, HostExportError}, - semver::Version, slog::Logger, }; use graph_runtime_wasm::asc_abi::class::{AscBigInt, AscEnumArray, AscWrapped, EthereumValueKind}; @@ -185,7 +184,7 @@ fn ethereum_call( // For apiVersion >= 0.0.4 the call passed from the mapping includes the // function signature; subgraphs using an apiVersion < 0.0.4 don't pass // the signature along with the call. - let call: UnresolvedContractCall = if ctx.heap.api_version() >= Version::new(0, 0, 4) { + let call: UnresolvedContractCall = if ctx.heap.api_version() >= &API_VERSION_0_0_4 { asc_get::<_, AscUnresolvedContractCall_0_0_4, _>(ctx.heap, wasm_ptr.into(), &ctx.gas, 0)? } else { asc_get::<_, AscUnresolvedContractCall, _>(ctx.heap, wasm_ptr.into(), &ctx.gas, 0)? @@ -215,7 +214,7 @@ fn eth_get_balance( ctx.gas .consume_host_fn_with_metrics(ETH_GET_BALANCE, "eth_get_balance")?; - if ctx.heap.api_version() < API_VERSION_0_0_9 { + if ctx.heap.api_version() < &API_VERSION_0_0_9 { return Err(HostExportError::Deterministic(anyhow!( "ethereum.getBalance call is not supported before API version 0.0.9" ))); @@ -249,7 +248,7 @@ fn eth_has_code( ctx.gas .consume_host_fn_with_metrics(ETH_HAS_CODE, "eth_has_code")?; - if ctx.heap.api_version() < API_VERSION_0_0_9 { + if ctx.heap.api_version() < &API_VERSION_0_0_9 { return Err(HostExportError::Deterministic(anyhow!( "ethereum.hasCode call is not supported before API version 0.0.9" ))); diff --git a/chain/ethereum/src/trigger.rs b/chain/ethereum/src/trigger.rs index fa59438f244..c87a0211ae9 100644 --- a/chain/ethereum/src/trigger.rs +++ b/chain/ethereum/src/trigger.rs @@ -151,7 +151,7 @@ impl ToAscPtr for MappingTrigger { log.as_ref(), ¶ms, ); - if api_version >= API_VERSION_0_0_7 { + if api_version >= &API_VERSION_0_0_7 { asc_new::< AscEthereumEvent_0_0_7< AscEthereumTransaction_0_0_6, @@ -161,14 +161,14 @@ impl ToAscPtr for MappingTrigger { _, >(heap, &(ethereum_event_data, receipt.as_deref()), gas)? .erase() - } else if api_version >= API_VERSION_0_0_6 { + } else if api_version >= &API_VERSION_0_0_6 { asc_new::< AscEthereumEvent, _, _, >(heap, ðereum_event_data, gas)? .erase() - } else if api_version >= API_VERSION_0_0_2 { + } else if api_version >= &API_VERSION_0_0_2 { asc_new::< AscEthereumEvent, _, @@ -192,14 +192,14 @@ impl ToAscPtr for MappingTrigger { outputs, } => { let call = EthereumCallData::new(&block, &transaction, &call, &inputs, &outputs); - if heap.api_version() >= Version::new(0, 0, 6) { + if heap.api_version() >= &Version::new(0, 0, 6) { asc_new::< AscEthereumCall_0_0_3, _, _, >(heap, &call, gas)? .erase() - } else if heap.api_version() >= Version::new(0, 0, 3) { + } else if heap.api_version() >= &Version::new(0, 0, 3) { asc_new::< AscEthereumCall_0_0_3, _, @@ -212,7 +212,7 @@ impl ToAscPtr for MappingTrigger { } MappingTrigger::Block { block } => { let block = EthereumBlockData::from(block.as_ref()); - if heap.api_version() >= Version::new(0, 0, 6) { + if heap.api_version() >= &Version::new(0, 0, 6) { asc_new::(heap, &block, gas)?.erase() } else { asc_new::(heap, &block, gas)?.erase() diff --git a/chain/near/src/trigger.rs b/chain/near/src/trigger.rs index 364b9061038..c48668cdd8e 100644 --- a/chain/near/src/trigger.rs +++ b/chain/near/src/trigger.rs @@ -497,8 +497,8 @@ mod tests { Ok(init_slice(src, buffer)) } - fn api_version(&self) -> graph::semver::Version { - self.api_version.clone() + fn api_version(&self) -> &graph::semver::Version { + &self.api_version } fn asc_type_id( diff --git a/graph/src/data/subgraph/api_version.rs b/graph/src/data/subgraph/api_version.rs index fbda95b2792..d0b4133abb1 100644 --- a/graph/src/data/subgraph/api_version.rs +++ b/graph/src/data/subgraph/api_version.rs @@ -5,6 +5,9 @@ use thiserror::Error; pub const API_VERSION_0_0_2: Version = Version::new(0, 0, 2); +/// Changed calling convention for `ethereum.call` +pub const API_VERSION_0_0_4: Version = Version::new(0, 0, 4); + /// This version adds a new subgraph validation step that rejects manifests whose mappings have /// different API versions if at least one of them is equal to or higher than `0.0.5`. pub const API_VERSION_0_0_5: Version = Version::new(0, 0, 5); diff --git a/graph/src/runtime/asc_heap.rs b/graph/src/runtime/asc_heap.rs index bf31f7dc3f2..abd86d9dbf1 100644 --- a/graph/src/runtime/asc_heap.rs +++ b/graph/src/runtime/asc_heap.rs @@ -28,7 +28,7 @@ pub trait AscHeap { fn read_u32(&self, offset: u32, gas: &GasCounter) -> Result; - fn api_version(&self) -> Version; + fn api_version(&self) -> &Version; fn asc_type_id(&mut self, type_id_index: IndexForAscTypeId) -> Result; } diff --git a/graph/src/runtime/asc_ptr.rs b/graph/src/runtime/asc_ptr.rs index 890bde20e07..cc2815bbcd9 100644 --- a/graph/src/runtime/asc_ptr.rs +++ b/graph/src/runtime/asc_ptr.rs @@ -1,3 +1,5 @@ +use crate::data::subgraph::API_VERSION_0_0_4; + use super::gas::GasCounter; use super::{padding_to_16, DeterministicHostError, HostExportError}; @@ -61,7 +63,7 @@ impl AscPtr { let len = match heap.api_version() { // TODO: The version check here conflicts with the comment on C::asc_size, // which states "Only used for version <= 0.0.3." - version if version <= Version::new(0, 0, 4) => C::asc_size(self, heap, gas), + version if version <= &API_VERSION_0_0_4 => C::asc_size(self, heap, gas), _ => self.read_len(heap, gas), }?; @@ -91,7 +93,7 @@ impl AscPtr { C: AscIndexId, { match heap.api_version() { - version if version <= Version::new(0, 0, 4) => { + version if version <= &API_VERSION_0_0_4 => { let heap_ptr = heap.raw_new(&asc_obj.to_asc_bytes()?, gas)?; Ok(AscPtr::new(heap_ptr)) } diff --git a/runtime/wasm/src/asc_abi/class.rs b/runtime/wasm/src/asc_abi/class.rs index 1fae1ad9ce0..210752582bf 100644 --- a/runtime/wasm/src/asc_abi/class.rs +++ b/runtime/wasm/src/asc_abi/class.rs @@ -1,7 +1,10 @@ use ethabi; use graph::{ - data::store::{self, scalar::Timestamp}, + data::{ + store::{self, scalar::Timestamp}, + subgraph::API_VERSION_0_0_4, + }, runtime::{ gas::GasCounter, AscHeap, AscIndexId, AscType, AscValue, HostExportError, IndexForAscTypeId, ToAscObj, @@ -27,10 +30,10 @@ pub enum ArrayBuffer { impl ArrayBuffer { pub(crate) fn new( values: &[T], - api_version: Version, + api_version: &Version, ) -> Result { match api_version { - version if version <= Version::new(0, 0, 4) => { + version if version <= &API_VERSION_0_0_4 => { Ok(Self::ApiVersion0_0_4(v0_0_4::ArrayBuffer::new(values)?)) } _ => Ok(Self::ApiVersion0_0_5(v0_0_5::ArrayBuffer::new(values)?)), @@ -95,7 +98,7 @@ impl TypedArray { gas: &GasCounter, ) -> Result { match heap.api_version() { - version if version <= Version::new(0, 0, 4) => Ok(Self::ApiVersion0_0_4( + version if version <= &API_VERSION_0_0_4 => Ok(Self::ApiVersion0_0_4( v0_0_4::TypedArray::new(content, heap, gas)?, )), _ => Ok(Self::ApiVersion0_0_5(v0_0_5::TypedArray::new( @@ -201,9 +204,9 @@ pub enum AscString { } impl AscString { - pub fn new(content: &[u16], api_version: Version) -> Result { + pub fn new(content: &[u16], api_version: &Version) -> Result { match api_version { - version if version <= Version::new(0, 0, 4) => { + version if version <= &API_VERSION_0_0_4 => { Ok(Self::ApiVersion0_0_4(v0_0_4::AscString::new(content)?)) } _ => Ok(Self::ApiVersion0_0_5(v0_0_5::AscString::new(content)?)), @@ -275,7 +278,7 @@ impl Array { gas: &GasCounter, ) -> Result { match heap.api_version() { - version if version <= Version::new(0, 0, 4) => Ok(Self::ApiVersion0_0_4( + version if version <= &API_VERSION_0_0_4 => Ok(Self::ApiVersion0_0_4( v0_0_4::Array::new(content, heap, gas)?, )), _ => Ok(Self::ApiVersion0_0_5(v0_0_5::Array::new( diff --git a/runtime/wasm/src/asc_abi/v0_0_4.rs b/runtime/wasm/src/asc_abi/v0_0_4.rs index 39123f96efd..4fabbc425e7 100644 --- a/runtime/wasm/src/asc_abi/v0_0_4.rs +++ b/runtime/wasm/src/asc_abi/v0_0_4.rs @@ -54,7 +54,7 @@ impl ArrayBuffer { &self, byte_offset: u32, length: u32, - api_version: Version, + api_version: &Version, ) -> Result, DeterministicHostError> { let length = length as usize; let byte_offset = byte_offset as usize; diff --git a/runtime/wasm/src/asc_abi/v0_0_5.rs b/runtime/wasm/src/asc_abi/v0_0_5.rs index 4052796f819..71333eb91f1 100644 --- a/runtime/wasm/src/asc_abi/v0_0_5.rs +++ b/runtime/wasm/src/asc_abi/v0_0_5.rs @@ -52,7 +52,7 @@ impl ArrayBuffer { &self, byte_offset: u32, length: u32, - api_version: Version, + api_version: &Version, ) -> Result, DeterministicHostError> { let length = length as usize; let byte_offset = byte_offset as usize; @@ -60,7 +60,7 @@ impl ArrayBuffer { self.content[byte_offset..] .chunks(size_of::()) .take(length) - .map(|asc_obj| T::from_asc_bytes(asc_obj, &api_version)) + .map(|asc_obj| T::from_asc_bytes(asc_obj, api_version)) .collect() } } diff --git a/runtime/wasm/src/module/context.rs b/runtime/wasm/src/module/context.rs index 03cbf244c23..2badd3cd3c1 100644 --- a/runtime/wasm/src/module/context.rs +++ b/runtime/wasm/src/module/context.rs @@ -48,12 +48,8 @@ impl WasmInstanceContext<'_> { self.inner.data_mut() } - pub fn asc_heap_ref(&self) -> &AscHeapCtx { - self.as_ref().asc_heap_ref() - } - - pub fn asc_heap_mut(&mut self) -> &mut AscHeapCtx { - self.as_mut().asc_heap_mut() + pub fn asc_heap(&self) -> &Arc { + self.as_ref().asc_heap() } pub fn suspend_timeout(&mut self) { @@ -96,7 +92,7 @@ pub struct WasmInstanceData { // This option is needed to break the cyclic dependency between, instance, store, and context. // during execution it should always be populated. - asc_heap: Option, + asc_heap: Option>, } impl WasmInstanceData { @@ -117,15 +113,12 @@ impl WasmInstanceData { } } - pub fn set_asc_heap(&mut self, asc_heap: AscHeapCtx) { + pub fn set_asc_heap(&mut self, asc_heap: Arc) { self.asc_heap = Some(asc_heap); } - pub fn asc_heap_ref(&self) -> &AscHeapCtx { - self.asc_heap.as_ref().unwrap() - } - pub fn asc_heap_mut(&mut self) -> &mut AscHeapCtx { - self.asc_heap.as_mut().unwrap() + pub fn asc_heap(&self) -> &Arc { + self.asc_heap.as_ref().expect("asc_heap not set") } pub fn take_state(mut self) -> BlockState { diff --git a/runtime/wasm/src/module/mod.rs b/runtime/wasm/src/module/mod.rs index 3e047690285..1c6e1d5fad3 100644 --- a/runtime/wasm/src/module/mod.rs +++ b/runtime/wasm/src/module/mod.rs @@ -1,5 +1,6 @@ use std::convert::TryFrom; use std::mem::MaybeUninit; +use std::sync::Mutex; use anyhow::anyhow; use anyhow::Error; @@ -134,6 +135,19 @@ fn is_trap_deterministic(trap: &Error) -> bool { } } +struct Arena { + // First free byte in the current arena. Set on the first call to `raw_new`. + start: i32, + // Number of free bytes starting from `arena_start_ptr`. + size: i32, +} + +impl Arena { + fn new() -> Self { + Self { start: 0, size: 0 } + } +} + #[derive(Copy, Clone)] pub struct ExperimentalFeatures { pub allow_non_deterministic_ipfs: bool, @@ -154,11 +168,7 @@ pub struct AscHeapCtx { // is zeroed when initialized or grown. memory: Memory, - // First free byte in the current arena. Set on the first call to `raw_new`. - arena_start_ptr: i32, - - // Number of free bytes starting from `arena_start_ptr`. - arena_free_size: i32, + arena: Mutex, } impl AscHeapCtx { @@ -166,7 +176,7 @@ impl AscHeapCtx { instance: &wasmtime::Instance, ctx: &mut WasmInstanceContext<'_>, api_version: Version, - ) -> anyhow::Result { + ) -> anyhow::Result> { // Provide access to the WASM runtime linear memory let memory = instance .get_memory(ctx.as_context_mut(), "memory") @@ -194,14 +204,33 @@ impl AscHeapCtx { ), }; - Ok(AscHeapCtx { + Ok(Arc::new(AscHeapCtx { memory_allocate, memory, - arena_start_ptr: 0, - arena_free_size: 0, + arena: Mutex::new(Arena::new()), api_version, id_of_type, - }) + })) + } + + fn arena_start_ptr(&self) -> i32 { + self.arena.lock().unwrap().start + } + + fn arena_free_size(&self) -> i32 { + self.arena.lock().unwrap().size + } + + fn set_arena(&self, start_ptr: i32, size: i32) { + let mut arena = self.arena.lock().unwrap(); + arena.start = start_ptr; + arena.size = size; + } + + fn allocated(&self, size: i32) { + let mut arena = self.arena.lock().unwrap(); + arena.start += size; + arena.size -= size; } } @@ -229,21 +258,20 @@ impl AscHeap for WasmInstanceContext<'_> { static MIN_ARENA_SIZE: i32 = 10_000; let size = i32::try_from(bytes.len()).unwrap(); - if size > self.asc_heap_ref().arena_free_size { + if size > self.asc_heap().arena_free_size() { // Allocate a new arena. Any free space left in the previous arena is left unused. This // causes at most half of memory to be wasted, which is acceptable. - let arena_size = size.max(MIN_ARENA_SIZE); + let mut arena_size = size.max(MIN_ARENA_SIZE); // Unwrap: This may panic if more memory needs to be requested from the OS and that // fails. This error is not deterministic since it depends on the operating conditions // of the node. - let memory_allocate = self.asc_heap_ref().memory_allocate.clone(); - self.asc_heap_mut().arena_start_ptr = memory_allocate + let memory_allocate = &self.asc_heap().cheap_clone().memory_allocate; + let mut start_ptr = memory_allocate .call(self.as_context_mut(), arena_size) .unwrap(); - self.asc_heap_mut().arena_free_size = arena_size; - match &self.asc_heap_ref().api_version { + match &self.asc_heap().api_version { version if *version <= Version::new(0, 0, 4) => {} _ => { // This arithmetic is done because when you call AssemblyScripts's `__alloc` @@ -252,19 +280,19 @@ impl AscHeap for WasmInstanceContext<'_> { // `mmInfo` has size of 4, and everything allocated on AssemblyScript memory // should have alignment of 16, this means we need to do a 12 offset on these // big chunks of untyped allocation. - self.asc_heap_mut().arena_start_ptr += 12; - self.asc_heap_mut().arena_free_size -= 12; + start_ptr += 12; + arena_size -= 12; } }; + self.asc_heap().set_arena(start_ptr, arena_size); }; - let ptr = self.asc_heap_ref().arena_start_ptr as usize; + let ptr = self.asc_heap().arena_start_ptr() as usize; // Unwrap: We have just allocated enough space for `bytes`. - let memory = self.asc_heap_ref().memory; + let memory = self.asc_heap().memory; memory.write(self.as_context_mut(), ptr, bytes).unwrap(); - self.asc_heap_mut().arena_start_ptr += size; - self.asc_heap_mut().arena_free_size -= size; + self.asc_heap().allocated(size); Ok(ptr as u32) } @@ -272,7 +300,7 @@ impl AscHeap for WasmInstanceContext<'_> { fn read_u32(&self, offset: u32, gas: &GasCounter) -> Result { gas.consume_host_fn_with_metrics(Gas::new(GAS_COST_LOAD as u64 * 4), "read_u32")?; let mut bytes = [0; 4]; - self.asc_heap_ref() + self.asc_heap() .memory .read(self, offset as usize, &mut bytes) .map_err(|_| { @@ -302,7 +330,7 @@ impl AscHeap for WasmInstanceContext<'_> { // TODO: Do we still need this? Can we use read directly? let src = self - .asc_heap_ref() + .asc_heap() .memory .data(self) .get(offset..) @@ -316,12 +344,13 @@ impl AscHeap for WasmInstanceContext<'_> { Ok(init_slice(src, buffer)) } - fn api_version(&self) -> Version { - self.asc_heap_ref().api_version.clone() + fn api_version(&self) -> &Version { + &self.asc_heap().api_version } fn asc_type_id(&mut self, type_id_index: IndexForAscTypeId) -> Result { - let func = self.asc_heap_ref().id_of_type.clone().unwrap(); + let asc_heap = self.asc_heap().cheap_clone(); + let func = asc_heap.id_of_type.as_ref().unwrap(); // Unwrap ok because it's only called on correct apiVersion, look for AscPtr::generate_header func.call(self.as_context_mut(), type_id_index as u32)