
Commit

Merge remote-tracking branch 'origin/develop' into daniyar/lake-indexer
itegulov committed Nov 14, 2023
2 parents 2c8c948 + 175a7b8 commit a952acd
Showing 9 changed files with 487 additions and 343 deletions.
692 changes: 422 additions & 270 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions contract/.cargo/config.toml
@@ -0,0 +1,2 @@
[build]
target = "wasm32-unknown-unknown"
36 changes: 6 additions & 30 deletions contract/src/lib.rs
@@ -1,6 +1,6 @@
use near_sdk::borsh::{self, BorshDeserialize, BorshSerialize};
use near_sdk::serde::{Deserialize, Serialize};
use near_sdk::{env, near_bindgen, AccountId, PublicKey};
use near_sdk::{env, near_bindgen, AccountId, PanicOnDefault, PublicKey};
use std::collections::{HashMap, HashSet};

type ParticipantId = u32;
@@ -25,7 +25,7 @@ pub struct ParticipantInfo {
}

#[derive(BorshDeserialize, BorshSerialize, Serialize, Deserialize, Debug)]
pub struct InitializedContractState {
pub struct InitializingContractState {
pub participants: HashMap<AccountId, ParticipantInfo>,
pub threshold: usize,
pub pk_votes: HashMap<PublicKey, HashSet<ParticipantId>>,
@@ -55,32 +55,23 @@ pub struct ResharingContractState {

#[derive(BorshDeserialize, BorshSerialize, Serialize, Deserialize, Debug)]
pub enum ProtocolContractState {
NonInitialized,
Initialized(InitializedContractState),
Initializing(InitializingContractState),
Running(RunningContractState),
Resharing(ResharingContractState),
}

#[near_bindgen]
#[derive(BorshDeserialize, BorshSerialize)]
#[derive(BorshDeserialize, BorshSerialize, PanicOnDefault)]
pub struct MpcContract {
protocol_state: ProtocolContractState,
}

impl Default for MpcContract {
fn default() -> Self {
Self {
protocol_state: ProtocolContractState::NonInitialized,
}
}
}

#[near_bindgen]
impl MpcContract {
#[init]
pub fn init(threshold: usize, participants: HashMap<AccountId, ParticipantInfo>) -> Self {
MpcContract {
protocol_state: ProtocolContractState::Initialized(InitializedContractState {
protocol_state: ProtocolContractState::Initializing(InitializingContractState {
participants,
threshold,
pk_votes: HashMap::new(),
@@ -112,9 +103,6 @@ impl MpcContract {
},
);
}
ProtocolContractState::NonInitialized => {
env::panic_str("protocol state hasn't been initialized yet")
}
_ => env::panic_str("protocol state can't accept new participants right now"),
}
}
@@ -157,9 +145,6 @@ impl MpcContract {
false
}
}
ProtocolContractState::NonInitialized => {
env::panic_str("protocol state hasn't been initialized yet")
}
_ => env::panic_str("protocol state can't accept new participants right now"),
}
}
@@ -202,16 +187,13 @@ impl MpcContract {
false
}
}
ProtocolContractState::NonInitialized => {
env::panic_str("protocol state hasn't been initialized yet")
}
_ => env::panic_str("protocol state can't kick participants right now"),
}
}

pub fn vote_pk(&mut self, public_key: PublicKey) -> bool {
match &mut self.protocol_state {
ProtocolContractState::Initialized(InitializedContractState {
ProtocolContractState::Initializing(InitializingContractState {
participants,
threshold,
pk_votes,
@@ -238,9 +220,6 @@
false
}
}
ProtocolContractState::NonInitialized => {
env::panic_str("protocol state hasn't been initialized yet")
}
ProtocolContractState::Running(state) if state.public_key == public_key => true,
ProtocolContractState::Resharing(state) if state.public_key == public_key => true,
_ => env::panic_str("can't change public key anymore"),
@@ -281,9 +260,6 @@ impl MpcContract {
false
}
}
ProtocolContractState::NonInitialized => {
env::panic_str("protocol state hasn't been initialized yet")
}
ProtocolContractState::Running(state) => {
if state.epoch == epoch {
true
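Taken together, the contract changes drop the `NonInitialized` variant and the hand-written `Default` impl in favour of near-sdk's `PanicOnDefault`, and rename the pre-keygen state from `Initialized` to `Initializing`. Below is a minimal sketch of the resulting shape; the payload structs are abbreviated, and nothing beyond what the diff shows should be read as the contract's actual code.

```rust
use near_sdk::borsh::{self, BorshDeserialize, BorshSerialize};
use near_sdk::serde::{Deserialize, Serialize};
use near_sdk::{near_bindgen, AccountId, PanicOnDefault, PublicKey};
use std::collections::{HashMap, HashSet};

type ParticipantId = u32;

// Payload structs abbreviated here; their fields are unchanged by this commit.
#[derive(BorshDeserialize, BorshSerialize, Serialize, Deserialize, Debug)]
pub struct ParticipantInfo {/* as in contract/src/lib.rs */}
#[derive(BorshDeserialize, BorshSerialize, Serialize, Deserialize, Debug)]
pub struct RunningContractState {/* as in contract/src/lib.rs */}
#[derive(BorshDeserialize, BorshSerialize, Serialize, Deserialize, Debug)]
pub struct ResharingContractState {/* as in contract/src/lib.rs */}

#[derive(BorshDeserialize, BorshSerialize, Serialize, Deserialize, Debug)]
pub struct InitializingContractState {
    pub participants: HashMap<AccountId, ParticipantInfo>,
    pub threshold: usize,
    pub pk_votes: HashMap<PublicKey, HashSet<ParticipantId>>,
}

// No NonInitialized variant any more: an uninitialized contract panics in
// Default (via PanicOnDefault) instead of carrying a dedicated state.
#[derive(BorshDeserialize, BorshSerialize, Serialize, Deserialize, Debug)]
pub enum ProtocolContractState {
    Initializing(InitializingContractState),
    Running(RunningContractState),
    Resharing(ResharingContractState),
}

#[near_bindgen]
#[derive(BorshDeserialize, BorshSerialize, PanicOnDefault)]
pub struct MpcContract {
    protocol_state: ProtocolContractState,
}

#[near_bindgen]
impl MpcContract {
    #[init]
    pub fn init(threshold: usize, participants: HashMap<AccountId, ParticipantInfo>) -> Self {
        // The contract starts in Initializing and only moves to Running once
        // enough vote_pk votes agree on a single public key.
        MpcContract {
            protocol_state: ProtocolContractState::Initializing(InitializingContractState {
                participants,
                threshold,
                pk_votes: HashMap::new(),
            }),
        }
    }
}
```

With `PanicOnDefault` the uninitialized case is handled once, inside `Default`, which is why each method's dedicated `NonInitialized` panic arm above could be deleted in favour of its existing catch-all.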
15 changes: 7 additions & 8 deletions integration-tests/src/env/local.rs
@@ -1,14 +1,13 @@
use crate::env::{LeaderNodeApi, SignerNodeApi};
use crate::mpc::{self, NodeProcess};
use crate::util;
use aes_gcm::aead::consts::U32;
use aes_gcm::aead::generic_array::GenericArray;
use mpc_recovery::firewall::allowed::DelegateActionRelayer;
use mpc_recovery::logging;
use mpc_recovery::relayer::NearRpcAndRelayerClient;
use multi_party_eddsa::protocols::ExpandedKeyPair;

use crate::env::{LeaderNodeApi, SignerNodeApi};
use crate::mpc::{self, NodeProcess};
use crate::util;

pub struct SignerNode {
pub address: String,
env: String,
@@ -30,7 +29,7 @@ impl SignerNode {
cipher_key: &GenericArray<u8, U32>,
) -> anyhow::Result<Self> {
let web_port = util::pick_unused_port().await?;
let args = mpc_recovery::Cli::StartSign {
let cli = mpc_recovery::Cli::StartSign {
env: ctx.env.clone(),
node_id,
web_port,
@@ -43,7 +42,7 @@
};

let sign_node_id = format!("sign-{node_id}");
let process = mpc::spawn(ctx.release, &sign_node_id, args).await?;
let process = mpc::spawn(ctx.release, &sign_node_id, cli).await?;
let address = format!("http://127.0.0.1:{web_port}");
tracing::info!("Signer node is starting at {}", address);
util::ping_until_ok(&address, 60).await?;
@@ -88,7 +87,7 @@ impl LeaderNode {
tracing::info!("Running leader node...");
let account_creator = &ctx.relayer_ctx.creator_account;
let web_port = util::pick_unused_port().await?;
let args = mpc_recovery::Cli::StartLeader {
let cli = mpc_recovery::Cli::StartLeader {
env: ctx.env.clone(),
web_port,
sign_nodes,
@@ -123,7 +122,7 @@ impl LeaderNode {
logging_options: logging::Options::default(),
};

let process = mpc::spawn(ctx.release, "leader", args).await?;
let process = mpc::spawn(ctx.release, "leader", cli).await?;
let address = format!("http://127.0.0.1:{web_port}");
tracing::info!("Leader node container is starting at {}", address);
util::ping_until_ok(&address, 60).await?;
1 change: 1 addition & 0 deletions mpc-recovery/Cargo.toml
@@ -15,6 +15,7 @@ async-trait = "0.1"
atty = "0.2"
axum = "0.6.19"
axum-extra = "0.7"
axum-tracing-opentelemetry = "0.14.1"
base64 = "0.21"
borsh = "0.10.3"
chrono = "0.4.24"
11 changes: 7 additions & 4 deletions mpc-recovery/src/leader_node/mod.rs
@@ -26,15 +26,14 @@ use axum::{
Extension, Json, Router,
};
use axum_extra::extract::WithRejection;
use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer};
use borsh::BorshDeserialize;
use curv::elliptic::curves::{Ed25519, Point};
use prometheus::{Encoder, TextEncoder};

use near_fetch::signer::KeyRotatingSigner;
use near_primitives::delegate_action::{DelegateAction, NonDelegateAction};
use near_primitives::transaction::{Action, DeleteKeyAction};
use near_primitives::types::AccountId;

use prometheus::{Encoder, TextEncoder};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
@@ -116,7 +115,11 @@ pub async fn run(config: Config) {
.route("/metrics", get(metrics))
.route_layer(middleware::from_fn(track_metrics))
.layer(Extension(state))
.layer(cors_layer);
.layer(cors_layer)
// Include trace context as header into the response
.layer(OtelInResponseLayer)
// Start OpenTelemetry trace on incoming request
.layer(OtelAxumLayer::default());

let addr = SocketAddr::from(([0, 0, 0, 0], port));
tracing::debug!(?addr, "starting http server");
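For context on the two new layers: axum-tracing-opentelemetry's `OtelAxumLayer` opens an OpenTelemetry span for each incoming request, while `OtelInResponseLayer` copies the current trace context into the response headers. A minimal, self-contained sketch of the same wiring follows; the handler, port, and `main` are placeholders, and it assumes a tracing-opentelemetry subscriber (such as the one configured in logging.rs below) has been installed.

```rust
use axum::{routing::get, Router};
use axum_tracing_opentelemetry::middleware::{OtelAxumLayer, OtelInResponseLayer};
use std::net::SocketAddr;

#[tokio::main]
async fn main() {
    // Hypothetical router standing in for the leader node's routes.
    let app = Router::new()
        .route("/", get(|| async { "ok" }))
        // Include trace context as a header in the response.
        .layer(OtelInResponseLayer)
        // Start an OpenTelemetry trace on each incoming request.
        .layer(OtelAxumLayer::default());

    // axum 0.6-style server, matching the version pinned in mpc-recovery/Cargo.toml.
    let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();
}
```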
38 changes: 25 additions & 13 deletions mpc-recovery/src/logging.rs
@@ -1,3 +1,4 @@
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::trace::{self, RandomIdGenerator, Sampler, Tracer};
use opentelemetry::sdk::Resource;
use opentelemetry::KeyValue;
@@ -8,13 +9,13 @@ use std::sync::OnceLock;
use tracing::subscriber::DefaultGuard;
use tracing_appender::non_blocking::NonBlocking;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::filter::{Filtered, LevelFilter};
use tracing_subscriber::filter::Filtered;
use tracing_subscriber::layer::{Layered, SubscriberExt};
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::{fmt, reload, EnvFilter, Layer, Registry};

static LOG_LAYER_RELOAD_HANDLE: OnceLock<reload::Handle<EnvFilter, Registry>> = OnceLock::new();
static OTLP_LAYER_RELOAD_HANDLE: OnceLock<reload::Handle<LevelFilter, LogLayer<Registry>>> =
static OTLP_LAYER_RELOAD_HANDLE: OnceLock<reload::Handle<EnvFilter, LogLayer<Registry>>> =
OnceLock::new();

type LogLayer<Inner> = Layered<
@@ -27,7 +28,7 @@ type LogLayer<Inner> = Layered<
>;

type TracingLayer<Inner> = Layered<
Filtered<OpenTelemetryLayer<Inner, Tracer>, reload::Layer<LevelFilter, Inner>, Inner>,
Filtered<OpenTelemetryLayer<Inner, Tracer>, reload::Layer<EnvFilter, Inner>, Inner>,
Inner,
>;

@@ -88,24 +89,24 @@ pub struct Options {
value_enum,
default_value = "off"
)]
opentelemetry_level: OpenTelemetryLevel,
pub opentelemetry_level: OpenTelemetryLevel,

/// Opentelemetry gRPC collector endpoint.
#[clap(
long,
env("MPC_RECOVERY_OTLP_ENDPOINT"),
default_value = "http://localhost:4317"
)]
otlp_endpoint: String,
pub otlp_endpoint: String,

/// Whether the log needs to be colored.
#[clap(long, value_enum, default_value = "auto")]
color: ColorOutput,
pub color: ColorOutput,

/// Enable logging of spans. For instance, this prints timestamps of entering and exiting a span,
/// together with the span duration and used/idle CPU time.
#[clap(long)]
log_span_events: bool,
pub log_span_events: bool,
}

impl Default for Options {
@@ -191,17 +192,22 @@ async fn add_opentelemetry_layer<S>(
env: String,
node_id: String,
subscriber: S,
) -> (TracingLayer<S>, reload::Handle<LevelFilter, S>)
) -> (TracingLayer<S>, reload::Handle<EnvFilter, S>)
where
S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
{
let filter = match opentelemetry_level {
OpenTelemetryLevel::OFF => LevelFilter::OFF,
OpenTelemetryLevel::INFO => LevelFilter::INFO,
OpenTelemetryLevel::DEBUG => LevelFilter::DEBUG,
OpenTelemetryLevel::TRACE => LevelFilter::TRACE,
OpenTelemetryLevel::OFF => EnvFilter::new("off"),
OpenTelemetryLevel::INFO => EnvFilter::new("info"),
OpenTelemetryLevel::DEBUG => EnvFilter::new("debug"),
OpenTelemetryLevel::TRACE => EnvFilter::new("trace"),
};
let (filter, handle) = reload::Layer::<LevelFilter, S>::new(filter);
    // `otel::tracing` must be enabled for the OpenTelemetry layer to emit traces & spans
    // `otel::setup` at debug logs detected resources and inferred configuration
let filter = filter
.add_directive("otel::tracing=trace".parse().unwrap())
.add_directive("otel=debug".parse().unwrap());
let (filter, handle) = reload::Layer::<EnvFilter, S>::new(filter);

let resource = vec![
KeyValue::new(SERVICE_NAME, format!("mpc:{}:{}", env, node_id)),
@@ -224,12 +230,18 @@ where
)
.install_batch(opentelemetry::runtime::Tokio)
.unwrap();

init_propagator();
let layer = tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter);
(subscriber.with(layer), handle)
}

pub fn init_propagator() {
opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
}

fn set_default_otlp_level(options: &Options) {
// Record the initial tracing level specified as a command-line flag. Use this recorded value to
// reset opentelemetry filter when the LogConfig file gets deleted.
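The move from `LevelFilter` to `EnvFilter` is what makes the new per-target directives possible: a plain `LevelFilter` carries only a single maximum level, whereas an `EnvFilter` can combine the CLI-selected OpenTelemetry level with target-specific directives such as `otel::tracing=trace`. A rough sketch of just the filter construction, factored into a standalone function purely for illustration (the function name is not from the diff):

```rust
use tracing_subscriber::EnvFilter;

// Sketch of the filter now built inside add_opentelemetry_layer: start from
// the CLI-chosen OpenTelemetry level, then append per-target directives that
// a plain LevelFilter could not express.
fn otlp_filter(chosen_level: &str) -> EnvFilter {
    EnvFilter::new(chosen_level) // "off" | "info" | "debug" | "trace"
        // `otel::tracing` must be enabled for the layer to emit traces/spans.
        .add_directive("otel::tracing=trace".parse().unwrap())
        // `otel` at debug logs detected resources and inferred configuration.
        .add_directive("otel=debug".parse().unwrap())
}

fn main() {
    // Example: the INFO variant of OpenTelemetryLevel.
    let filter = otlp_filter("info");
    println!("{filter}");
}
```

The new `init_propagator` call complements this by installing the W3C `TraceContextPropagator` as the global text-map propagator, which is what allows the `OtelAxumLayer` in the leader node to propagate trace context across HTTP requests.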
14 changes: 7 additions & 7 deletions node/src/protocol/consensus.rs
@@ -66,7 +66,7 @@ impl ConsensusProtocol for StartedState {
private_share,
public_key,
}) => match contract_state {
ProtocolState::Initialized(_) => Err(ConsensusError::ContractStateRollback),
ProtocolState::Initializing(_) => Err(ConsensusError::ContractStateRollback),
ProtocolState::Running(contract_state) => {
if contract_state.public_key != public_key {
return Err(ConsensusError::MismatchedPublicKey);
@@ -131,7 +131,7 @@
}
},
None => match contract_state {
ProtocolState::Initialized(contract_state) => {
ProtocolState::Initializing(contract_state) => {
if contract_state.participants.contains_key(&ctx.me()) {
tracing::info!("starting key generation as a part of the participant set");
let participants = contract_state.participants;
@@ -169,7 +169,7 @@ impl ConsensusProtocol for GeneratingState {
contract_state: ProtocolState,
) -> Result<NodeState, ConsensusError> {
match contract_state {
ProtocolState::Initialized(_) => {
ProtocolState::Initializing(_) => {
tracing::debug!("continuing generation, contract state has not been finalized yet");
Ok(NodeState::Generating(self))
}
@@ -217,7 +217,7 @@ impl ConsensusProtocol for WaitingForConsensusState {
contract_state: ProtocolState,
) -> Result<NodeState, ConsensusError> {
match contract_state {
ProtocolState::Initialized(contract_state) => {
ProtocolState::Initializing(contract_state) => {
tracing::debug!("waiting for consensus, contract state has not been finalized yet");
let public_key = self.public_key.into_near_public_key();
let has_voted = contract_state
@@ -338,7 +338,7 @@ impl ConsensusProtocol for RunningState {
contract_state: ProtocolState,
) -> Result<NodeState, ConsensusError> {
match contract_state {
ProtocolState::Initialized(_) => Err(ConsensusError::ContractStateRollback),
ProtocolState::Initializing(_) => Err(ConsensusError::ContractStateRollback),
ProtocolState::Running(contract_state) => match contract_state.epoch.cmp(&self.epoch) {
Ordering::Greater => {
tracing::warn!(
@@ -404,7 +404,7 @@ impl ConsensusProtocol for ResharingState {
contract_state: ProtocolState,
) -> Result<NodeState, ConsensusError> {
match contract_state {
ProtocolState::Initialized(_) => Err(ConsensusError::ContractStateRollback),
ProtocolState::Initializing(_) => Err(ConsensusError::ContractStateRollback),
ProtocolState::Running(contract_state) => {
match contract_state.epoch.cmp(&(self.old_epoch + 1)) {
Ordering::Greater => {
@@ -476,7 +476,7 @@ impl ConsensusProtocol for JoiningState {
contract_state: ProtocolState,
) -> Result<NodeState, ConsensusError> {
match contract_state {
ProtocolState::Initialized(_) => Err(ConsensusError::ContractStateRollback),
ProtocolState::Initializing(_) => Err(ConsensusError::ContractStateRollback),
ProtocolState::Running(contract_state) => {
if contract_state.candidates.contains_key(&ctx.me()) {
let voted = contract_state
