Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
chore: backport fix of 0.0.11 (#453)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
Co-authored-by: David Palm <[email protected]>
  • Loading branch information
Freyskeyd and dvdplm authored Feb 19, 2024
1 parent b0e88dc commit 53328ac
Show file tree
Hide file tree
Showing 26 changed files with 295 additions and 59 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ strip = true
[workspace.dependencies]
topos-core = { path = "./crates/topos-core", default-features = false }
topos-crypto = { path = "./crates/topos-crypto", default-features = false }
topos-metrics = { path = "./crates/topos-metrics/", default-features = false }

# Various utility crates
clap = { version = "4.0", features = ["derive", "env", "string"] }
Expand Down
6 changes: 6 additions & 0 deletions crates/topos-config/src/tce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ use topos_p2p::{Multiaddr, PeerId};

use self::broadcast::ReliableBroadcastParams;
use self::p2p::P2PConfig;
use self::synchronization::SynchronizationConfig;

pub mod broadcast;
pub mod p2p;
pub mod synchronization;

const DEFAULT_IP: std::net::Ipv4Addr = std::net::Ipv4Addr::new(0, 0, 0, 0);

Expand Down Expand Up @@ -68,6 +70,10 @@ pub struct TceConfig {
#[serde(default)]
pub p2p: P2PConfig,

/// Synchronization configuration
#[serde(default)]
pub synchronization: SynchronizationConfig,

/// gRPC API Addr
#[serde(default = "default_grpc_api_addr")]
pub grpc_api_addr: SocketAddr,
Expand Down
36 changes: 36 additions & 0 deletions crates/topos-config/src/tce/synchronization.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use serde::{Deserialize, Serialize};

/// Configuration for the TCE synchronization
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "kebab-case")]
pub struct SynchronizationConfig {
/// Interval in seconds to synchronize the TCE
#[serde(default = "SynchronizationConfig::default_interval_seconds")]
pub interval_seconds: u64,

/// Maximum number of Proof of delivery per query per subnet
#[serde(default = "SynchronizationConfig::default_limit_per_subnet")]
pub limit_per_subnet: usize,
}

impl Default for SynchronizationConfig {
fn default() -> Self {
Self {
interval_seconds: SynchronizationConfig::INTERVAL_SECONDS,
limit_per_subnet: SynchronizationConfig::LIMIT_PER_SUBNET,
}
}
}

impl SynchronizationConfig {
pub const INTERVAL_SECONDS: u64 = 10;
pub const LIMIT_PER_SUBNET: usize = 100;

const fn default_interval_seconds() -> u64 {
Self::INTERVAL_SECONDS
}

const fn default_limit_per_subnet() -> usize {
Self::LIMIT_PER_SUBNET
}
}
2 changes: 2 additions & 0 deletions crates/topos-core/proto/topos/tce/v1/synchronization.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ message CheckpointRequest {
topos.shared.v1.UUID request_id = 1;

repeated ProofOfDelivery checkpoint = 2;

uint64 limit_per_subnet = 3;
}

message CheckpointResponse {
Expand Down
Binary file modified crates/topos-core/src/api/grpc/generated/topos.bin
Binary file not shown.
2 changes: 2 additions & 0 deletions crates/topos-core/src/api/grpc/generated/topos.tce.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ pub struct CheckpointRequest {
pub request_id: ::core::option::Option<super::super::shared::v1::Uuid>,
#[prost(message, repeated, tag = "2")]
pub checkpoint: ::prost::alloc::vec::Vec<ProofOfDelivery>,
#[prost(uint64, tag = "3")]
pub limit_per_subnet: u64,
}
#[derive(serde::Deserialize, serde::Serialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down
16 changes: 14 additions & 2 deletions crates/topos-metrics/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use prometheus::{
self, register_histogram_with_registry, register_int_counter_with_registry, Histogram,
IntCounter,
self, register_histogram_with_registry, register_int_counter_with_registry,
register_int_gauge_with_registry, Histogram, IntCounter, IntGauge,
};

use lazy_static::lazy_static;
Expand Down Expand Up @@ -31,4 +31,16 @@ lazy_static! {
TOPOS_METRIC_REGISTRY
)
.unwrap();
pub static ref STORAGE_PENDING_POOL_COUNT: IntGauge = register_int_gauge_with_registry!(
"storage_pending_pool_count",
"Number of certificates in the pending pool.",
TOPOS_METRIC_REGISTRY
)
.unwrap();
pub static ref STORAGE_PRECEDENCE_POOL_COUNT: IntGauge = register_int_gauge_with_registry!(
"storage_precedence_pool_count",
"Number of certificates in the precedence pool.",
TOPOS_METRIC_REGISTRY
)
.unwrap();
}
19 changes: 10 additions & 9 deletions crates/topos-sequencer-subnet-runtime/tests/subnet_contract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,18 +321,19 @@ async fn deploy_test_token(
"Deploying new token {} with symbol {}",
token_name, token_symbol
);

let deploy_query = ierc20_messaging
let deploy_outcome = ierc20_messaging
.deploy_token(token_encoded_params)
.legacy()
.gas(DEFAULT_GAS);

let deploy_result = deploy_query.send().await.map_err(|e| {
error!("Unable deploy token: {e}");
e
})?;
.gas(DEFAULT_GAS)
.send()
.await
.map_err(|e| {
error!("Unable deploy token: {e}");
e
})?
.await;

match deploy_result.await {
match deploy_outcome {
Ok(r) => {
info!("Token deployed: {:?}", r);
}
Expand Down
16 changes: 16 additions & 0 deletions crates/topos-tce-api/src/graphql/query.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::sync::Arc;

use async_graphql::{Context, EmptyMutation, Object, Schema, Subscription};
Expand All @@ -13,6 +14,7 @@ use topos_core::api::graphql::{
query::CertificateQuery,
};
use topos_core::types::stream::CertificateSourceStreamPosition;
use topos_metrics::{STORAGE_PENDING_POOL_COUNT, STORAGE_PRECEDENCE_POOL_COUNT};
use topos_tce_storage::fullnode::FullNodeStore;
use topos_tce_storage::store::ReadStore;

Expand Down Expand Up @@ -114,6 +116,20 @@ impl QueryRoot {
Self::certificate_by_id(ctx, certificate_id).await
}

/// This endpoint is used to get the current storage pool stats.
/// It returns the number of certificates in the pending and precedence pools.
/// The values are estimated as having a precise count is costly.
async fn get_storage_pool_stats(
&self,
_ctx: &Context<'_>,
) -> Result<HashMap<&str, i64>, GraphQLServerError> {
let mut stats = HashMap::new();
stats.insert("pending_pool", STORAGE_PENDING_POOL_COUNT.get());
stats.insert("precedence_pool", STORAGE_PRECEDENCE_POOL_COUNT.get());

Ok(stats)
}

/// This endpoint is used to get the current checkpoint of the source streams.
/// The checkpoint is the position of the last certificate delivered for each source stream.
async fn get_checkpoint(
Expand Down
71 changes: 71 additions & 0 deletions crates/topos-tce-api/tests/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use topos_core::{
},
uci::Certificate,
};
use topos_metrics::{STORAGE_PENDING_POOL_COUNT, STORAGE_PRECEDENCE_POOL_COUNT};
use topos_tce_api::{Runtime, RuntimeEvent};
use topos_tce_storage::types::CertificateDeliveredWithPositions;
use topos_tce_storage::StorageClient;
Expand Down Expand Up @@ -621,3 +622,73 @@ async fn can_query_graphql_endpoint_for_certificates(
graphql_certificate.source_subnet_id
);
}

#[rstest]
#[timeout(Duration::from_secs(4))]
#[test(tokio::test)]
async fn check_storage_pool_stats(
broadcast_stream: broadcast::Receiver<CertificateDeliveredWithPositions>,
) {
let addr = get_available_addr();
let graphql_addr = get_available_addr();
let metrics_addr = get_available_addr();

let fullnode_store = create_fullnode_store::default().await;

let store =
create_validator_store(vec![], futures::future::ready(fullnode_store.clone())).await;
STORAGE_PENDING_POOL_COUNT.set(10);
STORAGE_PRECEDENCE_POOL_COUNT.set(200);

let storage_client = StorageClient::new(store.clone());

let (_runtime_client, _launcher, _ctx) = Runtime::builder()
.with_broadcast_stream(broadcast_stream)
.storage(storage_client)
.store(store)
.serve_grpc_addr(addr)
.serve_graphql_addr(graphql_addr)
.serve_metrics_addr(metrics_addr)
.build_and_launch()
.await;

// Wait for server to boot
tokio::time::sleep(Duration::from_millis(100)).await;

let query = "query {getStoragePoolStats}";

#[derive(Debug, Deserialize)]
struct Response {
// data: HashMap<String, serde_json::Value>,
data: Stats,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Stats {
get_storage_pool_stats: PoolStats,
}

#[derive(Debug, Deserialize)]
struct PoolStats {
pending_pool: u64,
precedence_pool: u64,
}

let client = reqwest::Client::new();

let response = client
.post(format!("http://{}", graphql_addr))
.json(&serde_json::json!({
"query": query,
}))
.send()
.await
.unwrap()
.json::<Response>()
.await
.unwrap();

assert_eq!(response.data.get_storage_pool_stats.pending_pool, 10);
assert_eq!(response.data.get_storage_pool_stats.precedence_pool, 200);
}
1 change: 1 addition & 0 deletions crates/topos-tce-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ workspace = true

[dependencies]
topos-core = { workspace = true, features = ["uci", "api"] }
topos-metrics = { workspace = true }

async-stream.workspace = true
async-trait.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions crates/topos-tce-storage/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub enum InternalStorageError {
#[error("Invalid query argument: {0}")]
InvalidQueryArgument(&'static str),

#[error("Unexpected DB state: {0}")]
UnexpectedDBState(&'static str),

#[error(transparent)]
Bincode(#[from] Box<bincode::ErrorKind>),

Expand Down
8 changes: 6 additions & 2 deletions crates/topos-tce-storage/src/fullnode/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{collections::HashMap, sync::Arc};
use arc_swap::ArcSwap;
use async_trait::async_trait;

use rocksdb::properties::ESTIMATE_NUM_KEYS;
use topos_core::{
types::{
stream::{CertificateSourceStreamPosition, CertificateTargetStreamPosition, Position},
Expand Down Expand Up @@ -233,8 +234,11 @@ impl WriteStore for FullNodeStore {
}

impl ReadStore for FullNodeStore {
fn count_certificates_delivered(&self) -> Result<usize, StorageError> {
Ok(self.perpetual_tables.certificates.iter()?.count())
fn count_certificates_delivered(&self) -> Result<u64, StorageError> {
Ok(self
.perpetual_tables
.certificates
.property_int_value(ESTIMATE_NUM_KEYS)?)
}

fn get_source_head(&self, subnet_id: &SubnetId) -> Result<Option<SourceHead>, StorageError> {
Expand Down
17 changes: 14 additions & 3 deletions crates/topos-tce-storage/src/rocks/db_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::path::Path;
#[cfg(test)]
use rocksdb::ColumnFamilyDescriptor;
use rocksdb::{
BoundColumnFamily, DBRawIteratorWithThreadMode, DBWithThreadMode, Direction, IteratorMode,
MultiThreaded, ReadOptions, WriteBatch,
BoundColumnFamily, CStrLike, DBRawIteratorWithThreadMode, DBWithThreadMode, Direction,
IteratorMode, MultiThreaded, ReadOptions, WriteBatch,
};

use bincode::Options;
Expand Down Expand Up @@ -66,7 +66,7 @@ impl<K, V> DBColumn<K, V> {
}

/// Returns the CF of the DBColumn, used to build queries.
fn cf(&self) -> Result<Arc<BoundColumnFamily<'_>>, InternalStorageError> {
pub(crate) fn cf(&self) -> Result<Arc<BoundColumnFamily<'_>>, InternalStorageError> {
self.rocksdb
.cf_handle(self.cf)
.ok_or(InternalStorageError::InvalidColumnFamily(self.cf))
Expand All @@ -78,6 +78,17 @@ where
K: DeserializeOwned + Serialize + std::fmt::Debug,
V: DeserializeOwned + Serialize + std::fmt::Debug,
{
pub(crate) fn property_int_value(
&self,
property: impl CStrLike,
) -> Result<u64, InternalStorageError> {
self.rocksdb
.property_int_value_cf(&self.cf()?, property)?
.ok_or(InternalStorageError::UnexpectedDBState(
"Property not found",
))
}

/// Insert a record into the storage by passing a Key and a Value.
///
/// Key are fixed length bincode serialized.
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-tce-storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub trait WriteStore: Send {
/// [`FullNodeStore`](struct@super::fullnode::FullNodeStore) to read data.
pub trait ReadStore: Send {
/// Returns the number of certificates delivered
fn count_certificates_delivered(&self) -> Result<usize, StorageError>;
fn count_certificates_delivered(&self) -> Result<u64, StorageError>;

/// Try to get a SourceHead of a subnet
///
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce-storage/src/tests/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn get_checkpoint_diff_with_no_input(store: Arc<ValidatorStore>) {
}

let checkpoint = store
.get_checkpoint_diff(&[])
.get_checkpoint_diff(&[], 100)
.unwrap()
.into_iter()
.map(|(subnet, proofs)| {
Expand Down Expand Up @@ -97,7 +97,7 @@ async fn get_checkpoint_diff_with_input(store: Arc<ValidatorStore>) {
}

let checkpoint = store
.get_checkpoint_diff(&[checkpoint])
.get_checkpoint_diff(&[checkpoint], 100)
.unwrap()
.into_iter()
.map(|(subnet, proofs)| {
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce-storage/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,15 +485,15 @@ async fn get_source_head_for_subnet(store: Arc<ValidatorStore>) {
create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_2], 10);

store
.insert_certificates_delivered(&expected_certificates_for_source_subnet_1)
.insert_certificates_delivered(&expected_certificates_for_source_subnet_1[..])
.await
.unwrap();

let expected_certificates_for_source_subnet_2 =
create_certificate_chain(SOURCE_SUBNET_ID_2, &[TARGET_SUBNET_ID_2], 10);

store
.insert_certificates_delivered(&expected_certificates_for_source_subnet_2)
.insert_certificates_delivered(&expected_certificates_for_source_subnet_2[..])
.await
.unwrap();

Expand Down
Loading

0 comments on commit 53328ac

Please sign in to comment.