Skip to content

Commit

Permalink
runtime-sdk: Add support for incoming messages
Browse files Browse the repository at this point in the history
  • Loading branch information
kostko committed Sep 26, 2022
1 parent c2e3d56 commit b475b3b
Show file tree
Hide file tree
Showing 24 changed files with 902 additions and 28 deletions.
101 changes: 89 additions & 12 deletions runtime-sdk/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::{
error::{Error as _, RuntimeError},
event::IntoTags,
keymanager::{KeyManagerClient, KeyManagerError},
module::{self, BlockHandler, MethodHandler, TransactionHandler},
module::{self, BlockHandler, InMsgHandler, InMsgResult, MethodHandler, TransactionHandler},
modules,
modules::core::API as _,
runtime::Runtime,
Expand Down Expand Up @@ -537,7 +537,7 @@ impl<R: Runtime> Dispatcher<R> {
messages,
block_tags: block_tags.into_tags(),
tx_reject_hashes: vec![],
in_msgs_count: 0, // TODO: Support processing incoming messages.
in_msgs_count: 0,
})
}
}
Expand All @@ -547,17 +547,63 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
&self,
rt_ctx: transaction::Context<'_>,
batch: &TxnBatch,
_in_msgs: &[roothash::IncomingMessage],
in_msgs: &[roothash::IncomingMessage],
) -> Result<ExecuteBatchResult, RuntimeError> {
self.execute_batch_common(
let mut in_msgs_count = 0;

let mut result = self.execute_batch_common(
rt_ctx,
|ctx| -> Result<Vec<ExecuteTxResult>, RuntimeError> {
// If prefetch limit is set enable prefetch.
let prefetch_enabled = R::PREFETCH_LIMIT > 0;
let mut results = Vec::with_capacity(batch.len());

// Process incoming messages first.
let mut batch_it = batch.iter();
'inmsg: for in_msg in in_msgs {
match R::IncomingMessagesHandler::process_in_msg(ctx, &in_msg) {
InMsgResult::Skip => {
// Skip, but treat as processed.
in_msgs_count += 1;
}
InMsgResult::Execute(raw_tx, tx) => {
// Verify that the transaction has been included in the batch.
match batch_it.next() {
None => {
// Nothing in the batch when there should be an incoming message.
return Err(Error::MalformedTransactionInBatch(anyhow!(
"missing incoming message"
))
.into());
}
Some(batch_tx) if batch_tx != raw_tx => {
// Incoming message does not match what is in the batch.
return Err(Error::MalformedTransactionInBatch(anyhow!(
"mismatched incoming message"
))
.into());
}
_ => {
// Everything is ok.
}
}

// Further execute the inner transaction. The transaction has already
// passed checks so it is ok to include in a block.
let tx_size = raw_tx.len().try_into().unwrap();
let index = results.len();
results.push(Self::execute_tx(ctx, tx_size, tx, index)?);

in_msgs_count += 1;
}
InMsgResult::Stop => break 'inmsg,
}
}

let inmsg_txs = results.len();
let mut txs = Vec::with_capacity(batch.len());
let mut prefixes: BTreeSet<Prefix> = BTreeSet::new();
for tx in batch.iter() {
for tx in batch.iter().skip(inmsg_txs) {
let tx_size = tx.len().try_into().map_err(|_| {
Error::MalformedTransactionInBatch(anyhow!("transaction too large"))
})?;
Expand All @@ -580,24 +626,29 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
}

// Execute the batch.
let mut results = Vec::with_capacity(batch.len());
for (index, (tx_size, tx)) in txs.into_iter().enumerate() {
for (index, (tx_size, tx)) in txs.into_iter().skip(inmsg_txs).enumerate() {
results.push(Self::execute_tx(ctx, tx_size, tx, index)?);
}

Ok(results)
},
)
)?;

// Include number of processed incoming messages in the final result.
result.in_msgs_count = in_msgs_count;

Ok(result)
}

fn schedule_and_execute_batch(
&self,
rt_ctx: transaction::Context<'_>,
batch: &mut TxnBatch,
_in_msgs: &[roothash::IncomingMessage],
in_msgs: &[roothash::IncomingMessage],
) -> Result<ExecuteBatchResult, RuntimeError> {
let cfg = R::SCHEDULE_CONTROL;
let mut tx_reject_hashes = Vec::new();
let mut in_msgs_count = 0;

let mut result = self.execute_batch_common(
rt_ctx,
Expand All @@ -607,13 +658,35 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
// The idea is to keep scheduling transactions as long as we have some space
// available in the block as determined by gas use.
let mut new_batch = Vec::new();
let mut results = Vec::with_capacity(batch.len());
let mut results = Vec::with_capacity(in_msgs.len() + batch.len());
let mut requested_batch_len = cfg.initial_batch_size;

// Process incoming messages first.
'inmsg: for in_msg in in_msgs {
match R::IncomingMessagesHandler::process_in_msg(ctx, &in_msg) {
InMsgResult::Skip => {
// Skip, but treat as processed.
in_msgs_count += 1;
}
InMsgResult::Execute(raw_tx, tx) => {
// Further execute the inner transaction. The transaction has already
// passed checks so it is ok to include in a block.
let tx_size = raw_tx.len().try_into().unwrap();
let index = new_batch.len();
new_batch.push(raw_tx.to_owned());
results.push(Self::execute_tx(ctx, tx_size, tx, index)?);

in_msgs_count += 1;
}
InMsgResult::Stop => break 'inmsg,
}
}

// Process regular transactions.
'batch: loop {
// Remember length of last batch.
let last_batch_len = batch.len();
let last_batch_tx_hash = batch.last().map(|raw_tx| Hash::digest_bytes(raw_tx));

for raw_tx in batch.drain(..) {
// If we don't have enough gas for processing even the cheapest transaction
// we are done. Same if we reached the runtime-imposed maximum tx count.
Expand Down Expand Up @@ -689,8 +762,10 @@ impl<R: Runtime + Send + Sync> transaction::dispatcher::Dispatcher for Dispatche
},
)?;

// Include rejected transaction hashes in the final result.
// Include rejected transaction hashes and number of processed incoming messages in the
// final result.
result.tx_reject_hashes = tx_reject_hashes;
result.in_msgs_count = in_msgs_count;

Ok(result)
}
Expand Down Expand Up @@ -877,6 +952,7 @@ mod test {
core::Genesis {
parameters: core::Parameters {
max_batch_gas: u64::MAX,
max_inmsg_gas: 0,
max_tx_size: 32 * 1024,
max_tx_signers: 1,
max_multisig_signers: 8,
Expand All @@ -885,6 +961,7 @@ mod test {
auth_signature: 0,
auth_multisig_signer: 0,
callformat_x25519_deoxysii: 0,
inmsg_base: 0,
},
min_gas_price: BTreeMap::from([(token::Denomination::NATIVE, 0)]),
},
Expand Down
55 changes: 55 additions & 0 deletions runtime-sdk/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//! Error types for runtimes.
use std::fmt::Display;

pub use oasis_core_runtime::types::Error as RuntimeError;

use crate::{dispatcher, module::CallResult};
Expand Down Expand Up @@ -56,6 +58,18 @@ pub trait Error: std::error::Error {
{
Err(self)
}

/// Converts the error into a serializable error.
fn into_serializable(self) -> SerializableError
where
Self: Sized,
{
SerializableError {
module: self.module_name().to_owned(),
code: self.code(),
message: self.to_string(),
}
}
}

impl Error for std::convert::Infallible {
Expand All @@ -68,6 +82,47 @@ impl Error for std::convert::Infallible {
}
}

/// A standardized serialized implementation for an error.
#[derive(Debug, Default, Clone, thiserror::Error, cbor::Encode, cbor::Decode)]
pub struct SerializableError {
pub module: String,
pub code: u32,
pub message: String,
}

impl Display for SerializableError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.message)
}
}

impl Error for SerializableError {
fn module_name(&self) -> &str {
&self.module
}

fn code(&self) -> u32 {
self.code
}
}

impl From<CallResult> for SerializableError {
fn from(result: CallResult) -> Self {
match result {
CallResult::Failed {
module,
code,
message,
} => Self {
module,
code,
message,
},
_ => Default::default(),
}
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
1 change: 1 addition & 0 deletions runtime-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#![deny(rust_2018_idioms, unreachable_pub)]
#![forbid(unsafe_code)]
#![feature(int_log)]
#![feature(associated_type_defaults)]

pub mod callformat;
pub mod config;
Expand Down
39 changes: 39 additions & 0 deletions runtime-sdk/src/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use impl_trait_for_tuples::impl_for_tuples;

use crate::{
context::{Context, TxContext},
core::consensus::roothash,
dispatcher, error,
error::Error as _,
event, modules,
Expand Down Expand Up @@ -565,6 +566,39 @@ impl ModuleInfoHandler for Tuple {
}
}

/// Incoming message handler.
pub trait InMsgHandler {
/// Process an incoming message.
fn process_in_msg<'a, C: Context>(
ctx: &mut C,
in_msg: &'a roothash::IncomingMessage,
) -> InMsgResult<'a>;
}

/// Result of processing an incoming message.
#[derive(Debug)]
pub enum InMsgResult<'a> {
/// Skip to next incoming message, but count as processed.
Skip,
/// Add to batch/verify inclusion and execute.
Execute(&'a [u8], Transaction),
/// Stop processing incoming messages.
Stop,
}

/// An incoming message handler which discards all incoming messages.
pub struct InMsgDiscard;

impl InMsgHandler for InMsgDiscard {
fn process_in_msg<'a, C: Context>(
_ctx: &mut C,
_in_msg: &'a roothash::IncomingMessage,
) -> InMsgResult<'a> {
// Just skip all messages without doing anything.
InMsgResult::Skip
}
}

/// A runtime module.
pub trait Module {
/// Module name.
Expand All @@ -591,6 +625,11 @@ pub trait Module {

/// Set the module's parameters.
fn set_params<S: Store>(store: S, params: Self::Parameters) {
params
.validate_basic()
.map_err(|_| ())
.expect("module parameters are invalid");

let store = storage::PrefixStore::new(store, &Self::NAME);
let mut store = storage::TypedStore::new(store);
store.insert(Self::Parameters::STORE_KEY, params);
Expand Down
4 changes: 1 addition & 3 deletions runtime-sdk/src/modules/consensus/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
//! Consensus module.
//!
//! Low level consensus module for communicating with the consensus layer.
use std::str::FromStr;

use thiserror::Error;

use oasis_core_runtime::{
Expand Down Expand Up @@ -44,7 +42,7 @@ pub struct Parameters {
impl Default for Parameters {
fn default() -> Self {
Self {
consensus_denomination: token::Denomination::from_str("TEST").unwrap(),
consensus_denomination: "TEST".parse().unwrap(),
consensus_scaling_factor: 1,
}
}
Expand Down
15 changes: 15 additions & 0 deletions runtime-sdk/src/modules/consensus_inmsg/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use crate::modules;

/// Incoming message handler configuration.
pub trait Config: 'static {
/// The accounts module to use.
type Accounts: modules::accounts::API;
/// The consensus module to use.
type Consensus: modules::consensus::API;

/// Maximum number of outgoing consensus message slots that an incoming message can claim.
///
/// When this is configured to be greater than zero it allows incoming messages to also emit
/// consensus messages as a result of executing a transaction.
const MAX_CONSENSUS_MSG_SLOTS_PER_TX: u32 = 1;
}
16 changes: 16 additions & 0 deletions runtime-sdk/src/modules/consensus_inmsg/events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use super::MODULE_NAME;
use crate::error;

/// Events emitted by the consensus incoming message handler module.
#[derive(Debug, cbor::Encode, oasis_runtime_sdk_macros::Event)]
#[cbor(untagged)]
pub enum Event {
#[sdk_event(code = 1)]
Processed {
id: u64,
#[cbor(optional)]
tag: u64,
#[cbor(optional)]
error: Option<error::SerializableError>,
},
}
Loading

0 comments on commit b475b3b

Please sign in to comment.