Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(f3): implement Finalize for EC backend #4897

Merged
merged 9 commits into from
Oct 16, 2024
2 changes: 2 additions & 0 deletions .config/forest.dic
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ Enum
EOF
Ethereum
exa
F3
FFI
FIL
Filecoin/M
Filops
Expand Down
8 changes: 5 additions & 3 deletions f3-sidecar/ffi_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import (

func init() {
setGoDebugEnv()
logging.SetAllLoggers(logging.LevelWarn)
err := logging.SetLogLevel("f3/sidecar", "info")
logging.SetAllLoggers(logging.LevelInfo)
err := logging.SetLogLevel("dht", "error")
checkError(err)
err = logging.SetLogLevel("f3", "info")
err = logging.SetLogLevel("net/identify", "error")
checkError(err)
err = logging.SetLogLevel("f3/sidecar", "debug")
checkError(err)
GoF3NodeImpl = &f3Impl{ctx: context.Background()}
}
Expand Down
9 changes: 6 additions & 3 deletions f3-sidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ import (
var logger = logging.Logger("f3/sidecar")

func main() {
logging.SetAllLoggers(logging.LevelError)
if err := logging.SetLogLevel("f3/sidecar", "debug"); err != nil {
logging.SetAllLoggers(logging.LevelInfo)
if err := logging.SetLogLevel("dht", "error"); err != nil {
panic(err)
}
if err := logging.SetLogLevel("net/identify", "error"); err != nil {
panic(err)
}
if err := logging.SetLogLevel("f3", "debug"); err != nil {
if err := logging.SetLogLevel("f3/sidecar", "debug"); err != nil {
panic(err)
}

Expand Down
3 changes: 1 addition & 2 deletions f3-sidecar/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ func run(ctx context.Context, rpcEndpoint string, f3RpcEndpoint string, initialP
} else {
m.BootstrapEpoch = bootstrapEpoch
}
m.CommitteeLookback = 5
// m.Pause = true
m.CommitteeLookback = manifest.DefaultCommitteeLookback

var manifestProvider manifest.ManifestProvider
switch manifestServerID, err := peer.Decode(manifestServer); {
Expand Down
3 changes: 2 additions & 1 deletion src/blocks/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ pub struct CachingBlockHeader {

impl PartialEq for CachingBlockHeader {
fn eq(&self, other: &Self) -> bool {
self.cid() == other.cid()
// Epoch check is redundant but cheap.
self.uncached.epoch == other.uncached.epoch && self.cid() == other.cid()
}
}

Expand Down
5 changes: 5 additions & 0 deletions src/chain_sync/chain_muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,11 @@ where
})
}

/// Returns a clone of the inner [`SyncNetworkContext`]
pub fn sync_network_context(&self) -> SyncNetworkContext<DB> {
self.network.clone()
}

/// Returns a clone of the bad blocks cache to be used outside of chain
/// sync.
pub fn bad_blocks_cloned(&self) -> Arc<BadBlockCache> {
Expand Down
2 changes: 1 addition & 1 deletion src/chain_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod bad_block_cache;
mod chain_muxer;
pub mod consensus;
mod metrics;
mod network_context;
pub mod network_context;
mod sync_state;
mod tipset_syncer;
mod validation;
Expand Down
8 changes: 6 additions & 2 deletions src/chain_sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,9 @@ const MAX_CONCURRENT_CHAIN_EXCHANGE_REQUESTS: usize = 2;
/// Context used in chain sync to handle network requests.
/// This contains the peer manager, P2P service interface, and [`Blockstore`]
/// required to make network requests.
pub(in crate::chain_sync) struct SyncNetworkContext<DB> {
pub struct SyncNetworkContext<DB> {
/// Channel to send network messages through P2P service
network_send: flume::Sender<NetworkMessage>,

/// Manages peers to send requests to and updates request stats for the
/// respective peers.
peer_manager: Arc<PeerManager>,
Expand Down Expand Up @@ -141,6 +140,11 @@ where
self.peer_manager.as_ref()
}

/// Returns a reference to the channel for sending network messages through P2P service.
pub fn network_send(&self) -> &flume::Sender<NetworkMessage> {
&self.network_send
}

/// Send a `chain_exchange` request for only block headers (ignore
/// messages). If `peer_id` is `None`, requests will be sent to a set of
/// shuffled peers.
Expand Down
5 changes: 4 additions & 1 deletion src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ pub(super) async fn start(
)?;
let bad_blocks = chain_muxer.bad_blocks_cloned();
let sync_state = chain_muxer.sync_state_cloned();
let sync_network_context = chain_muxer.sync_network_context();
services.spawn(async { Err(anyhow::anyhow!("{}", chain_muxer.await)) });

if config.client.enable_health_check {
Expand Down Expand Up @@ -402,7 +403,7 @@ pub(super) async fn start(
bad_blocks,
sync_state,
eth_event_handler: Arc::new(EthEventHandler::new()),
network_send,
sync_network_context,
network_name,
start_time,
shutdown: shutdown_send,
Expand All @@ -414,6 +415,7 @@ pub(super) async fn start(
});

services.spawn_blocking({
let chain_config = chain_config.clone();
let default_f3_root = config.client.data_dir.join(format!("f3/{}", config.chain));
let crate::f3::F3Options {
chain_finality,
Expand All @@ -423,6 +425,7 @@ pub(super) async fn start(
} = crate::f3::get_f3_sidecar_params(&chain_config);
move || {
crate::f3::run_f3_sidecar_if_enabled(
&chain_config,
format!("http://{rpc_address}/rpc/v1"),
crate::rpc::f3::get_f3_rpc_endpoint().to_string(),
initial_power_table.to_string(),
Expand Down
15 changes: 9 additions & 6 deletions src/f3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use go_ffi::*;
use cid::Cid;
use libp2p::PeerId;

use crate::{networks::ChainConfig, utils::misc::env::is_env_truthy};
use crate::{networks::ChainConfig, utils::misc::env::is_env_set_and_truthy};

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct F3Options {
Expand Down Expand Up @@ -84,7 +84,9 @@ pub fn get_f3_sidecar_params(chain_config: &ChainConfig) -> F3Options {
}
}

#[allow(clippy::too_many_arguments)]
pub fn run_f3_sidecar_if_enabled(
chain_config: &ChainConfig,
_rpc_endpoint: String,
_f3_rpc_endpoint: String,
_initial_power_table: String,
Expand All @@ -93,7 +95,7 @@ pub fn run_f3_sidecar_if_enabled(
_f3_root: String,
_manifest_server: String,
) {
if is_sidecar_ffi_enabled() {
if is_sidecar_ffi_enabled(chain_config) {
#[cfg(all(f3sidecar, not(feature = "no-f3-sidecar")))]
{
GoF3NodeImpl::run(
Expand All @@ -109,10 +111,11 @@ pub fn run_f3_sidecar_if_enabled(
}
}

// Use opt-in mode for now. Consider switching to opt-out mode once F3 is shipped.
fn is_sidecar_ffi_enabled() -> bool {
// Opt-out building the F3 sidecar staticlib
let enabled = is_env_truthy("FOREST_F3_SIDECAR_FFI_ENABLED");
/// Whether F3 sidecar via FFI is enabled.
fn is_sidecar_ffi_enabled(chain_config: &ChainConfig) -> bool {
// Respect the environment variable when set, and fallback to chain config when not set.
let enabled =
is_env_set_and_truthy("FOREST_F3_SIDECAR_FFI_ENABLED").unwrap_or(chain_config.f3_enabled);
cfg_if::cfg_if! {
if #[cfg(all(f3sidecar, not(feature = "no-f3-sidecar")))] {
enabled
Expand Down
15 changes: 14 additions & 1 deletion src/networks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ pub struct ChainConfig {
pub breeze_gas_tamping_duration: i64,
// FIP0081 gradually comes into effect over this many epochs.
pub fip0081_ramp_duration_epochs: u64,
pub f3_enabled: bool,
// F3Consensus set whether F3 should checkpoint tipsets finalized by F3. This flag has no effect if F3 is not enabled.
pub f3_consensus: bool,
pub f3_bootstrap_epoch: i64,
pub f3_initial_power_table: Cid,
// This will likely be deprecated once F3 is fully bootstrapped to avoid single point network dependencies.
Expand All @@ -254,6 +257,8 @@ impl ChainConfig {
breeze_gas_tamping_duration: BREEZE_GAS_TAMPING_DURATION,
// 1 year on mainnet
fip0081_ramp_duration_epochs: 365 * EPOCHS_IN_DAY as u64,
f3_enabled: false,
f3_consensus: false,
f3_bootstrap_epoch: -1,
f3_initial_power_table: Default::default(),
f3_manifest_server: Some(
Expand Down Expand Up @@ -282,6 +287,10 @@ impl ChainConfig {
breeze_gas_tamping_duration: BREEZE_GAS_TAMPING_DURATION,
// 3 days on calibnet
fip0081_ramp_duration_epochs: 3 * EPOCHS_IN_DAY as u64,
// Enable after `f3_initial_power_table` is determined and set to avoid GC hell
// (state tree of epoch 2_081_674 - 900 has to be present in the database if `f3_initial_power_table` is not set)
f3_enabled: false,
f3_consensus: true,
// 2024-10-24T13:30:00Z
f3_bootstrap_epoch: 2_081_674,
f3_initial_power_table: Default::default(),
Expand All @@ -308,6 +317,8 @@ impl ChainConfig {
breeze_gas_tamping_duration: BREEZE_GAS_TAMPING_DURATION,
// Devnet ramp is 200 epochs in Lotus (subject to change).
fip0081_ramp_duration_epochs: env_or_default(ENV_PLEDGE_RULE_RAMP, 200),
f3_enabled: false,
f3_consensus: false,
f3_bootstrap_epoch: -1,
f3_initial_power_table: Default::default(),
f3_manifest_server: None,
Expand Down Expand Up @@ -335,7 +346,9 @@ impl ChainConfig {
ENV_PLEDGE_RULE_RAMP,
365 * EPOCHS_IN_DAY as u64,
),
f3_bootstrap_epoch: -1,
f3_enabled: true,
f3_consensus: true,
f3_bootstrap_epoch: 2760,
f3_initial_power_table: Default::default(),
f3_manifest_server: Some(
"12D3KooWJr9jy4ngtJNR7JC1xgLFra3DjEtyxskRYWvBK9TC3Yn6"
Expand Down
1 change: 1 addition & 0 deletions src/rpc/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ macro_rules! from2internal {
// TODO(forest): https://github.com/ChainSafe/forest/issues/3965
// Just mapping everything to an internal error is not appropriate
from2internal! {
String,
anyhow::Error,
base64::DecodeError,
cid::multibase::Error,
Expand Down
47 changes: 42 additions & 5 deletions src/rpc/methods/f3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ use crate::{
clock::ChainEpoch,
crypto::Signature,
},
utils::misc::env::is_env_set_and_truthy,
};
use ahash::{HashMap, HashSet};
use anyhow::Context as _;
use fil_actor_interface::{
convert::{
from_policy_v13_to_v10, from_policy_v13_to_v11, from_policy_v13_to_v12,
Expand All @@ -37,7 +39,7 @@ use libp2p::PeerId;
use num::Signed as _;
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use std::{borrow::Cow, fmt::Display, str::FromStr as _, sync::Arc};
use std::{borrow::Cow, fmt::Display, num::NonZeroU64, str::FromStr as _, sync::Arc};

static F3_LEASE_MANAGER: Lazy<F3LeaseManager> = Lazy::new(Default::default);

Expand Down Expand Up @@ -423,7 +425,7 @@ impl RpcMethod<1> for ProtectPeer {
) -> Result<Self::Ok, ServerError> {
let peer_id = PeerId::from_str(&peer_id)?;
let (tx, rx) = flume::bounded(1);
ctx.network_send
ctx.network_send()
.send_async(NetworkMessage::JSONRPCRequest {
method: NetRPCMethods::ProtectPeer(tx, std::iter::once(peer_id).collect()),
})
Expand Down Expand Up @@ -463,10 +465,45 @@ impl RpcMethod<1> for Finalize {
type Ok = ();

async fn handle(
_: Ctx<impl Blockstore>,
(_tsk,): Self::Params,
ctx: Ctx<impl Blockstore>,
(f3_tsk,): Self::Params,
) -> Result<Self::Ok, ServerError> {
// TODO(hanabi1224): https://github.com/ChainSafe/forest/issues/4775
// Respect the environment variable when set, and fallback to chain config when not set.
let enabled = is_env_set_and_truthy("FOREST_F3_CONSENSUS_ENABLED")
.unwrap_or(ctx.chain_config().f3_consensus);
if !enabled {
return Ok(());
}

let tsk = f3_tsk.try_into()?;
let finalized_ts = match ctx.chain_index().load_tipset(&tsk)? {
Some(ts) => ts,
None => ctx
.sync_network_context
.chain_exchange_headers(None, &tsk, NonZeroU64::new(1).expect("Infallible"))
.await?
.first()
.cloned()
.with_context(|| format!("failed to get tipset via chain exchange. tsk: {tsk}"))?,
};
tracing::info!(
"F3 finalized tsk {} at epoch {}",
finalized_ts.key(),
finalized_ts.epoch()
);
let head = ctx.chain_store().heaviest_tipset();
if !head
.chain_arc(ctx.store())
.take_while(|ts| ts.epoch() >= finalized_ts.epoch())
.any(|ts| ts == finalized_ts)
{
tracing::info!(
"F3 reset chain head to tsk {} at epoch {}",
finalized_ts.key(),
finalized_ts.epoch()
);
ctx.chain_store().set_heaviest_tipset(finalized_ts)?;
}
Ok(())
}
}
Expand Down
Loading
Loading