Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update avail-core dep #243

Merged
merged 10 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,523 changes: 2,436 additions & 1,087 deletions Cargo.lock

Large diffs are not rendered by default.

60 changes: 33 additions & 27 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,55 +7,61 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
avail-subxt = { git = "https://github.com/availproject/avail.git", tag = "v1.6.1-rc3" }
chrono = "0.4.19"
# Internal deps
avail-core = { version = "0.5", git = "https://github.com/availproject/avail-core", tag = "avail-core/v0.5.0" }
avail-subxt = { version = "0.3", git = "https://github.com/availproject/avail.git", tag = "v1.6.3" }
dusk-plonk = { git = "https://github.com/availproject/plonk.git", tag = "v0.12.0-polygon-2" }
futures = { version = "0.3.15", default-features = false, features = ["std", "thread-pool"] }
futures-util = "0.3.17"
hyper = { version = "0.14.23", features = ["full", "http1"] }
hyper-tls = "0.5.0"
kate-recovery = { version = "0.8", git = "https://github.com/availproject/avail-core", tag = "da-primitives/v0.4.6" }
num = "0.4.0"
rand = "0.8.4"
regex = "1.5"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.68"
tokio = { version = "1.21.2", features = ["full"] }
tokio-stream = "0.1.11"
url = "2.2.2"
kate-recovery = { version = "0.9", git = "https://github.com/availproject/avail-core", tag = "avail-core/v0.5.0" }

# Substrate
codec = { package = "parity-scale-codec", version = "3", default-features = false, features = ["derive", "full", "bit-vec"] }
scale-info = { version = "2", features = ["bit-vec"] }
sp-arithmetic = { version = "*" }
sp-core = { version = "*" }

# 3rd-party
anyhow = "1.0.41"
async-std = { version = "1.12.0", features = ["attributes"] }
async-trait = "0.1.66"
base64 = "0.21.0"
chrono = "0.4.19"
clap = { version = "4.3.0", features = ["cargo"] }
codec = { package = "parity-scale-codec", version = "3.0.0", default-features = false, features = ["derive", "full", "bit-vec"] }
confy = "0.4.0"
ed25519-dalek = "1.0.1"
futures = { version = "0.3.15", default-features = false, features = ["std", "thread-pool"] }
futures-util = "0.3.17"
hex = "0.4"
hex-literal = "0.4.0"
hyper = { version = "0.14.23", features = ["full", "http1"] }
hyper-tls = "0.5.0"
itertools = "0.10.5"
libipld = { version = "0.12.0", default-features = false, features = ["dag-cbor"] }
libp2p = { version = "0.51.0", features = ["full"] }
mockall = "0.11.3"
multihash = { version = "0.14.0", default-features = false, features = ["blake3", "sha3"] }
num = "0.4.0"
num_cpus = "1.13.0"
openssl = { version = "0.10", features = ["vendored"] }
pcap = "1.1.0"
prometheus-client = "0.19.0"
rand = "0.8.4"
rand_chacha = "0.3"
regex = "1.5"
rocksdb = { version = "0.17.0", features = ["snappy", "multi-threaded-cf"] }
scale-info = { version = "2.0.0", features = ["bit-vec"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0.68"
smallvec = "1.6.1"
sp-core = "6.0.0"
tempdir = "0.3.7"
thiserror = "1.0.37"
threadpool = "1.8.1"
tokio = { version = "1.25", features = ["full"] }
tokio-stream = "0.1.12"
tracing = "0.1.35"
tracing-subscriber = { version = "0.3.15", features = ["json"] }
warp = "0.3.2"

async-trait = "0.1.66"
base64 = "0.21.0"
hex-literal = "0.4.0"
itertools = "0.10.5"
mockall = "0.11.3"
openssl = { version = "0.10", features = ["vendored"] }
pcap = "1.1.0"
url = "2.2.2"
uuid = { version = "1.3.4", features = ["v4", "fast-rng", "macro-diagnostics"] }
void = "1.0.2"
warp = "0.3.2"

[features]
network-analysis = []
Expand Down
69 changes: 32 additions & 37 deletions src/app_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,22 @@

use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use avail_core::AppId;
use avail_subxt::{avail, utils::H256};
use codec::Encode;
use dusk_plonk::commitment_scheme::kzg10::PublicParameters;
use kate_recovery::{
com::{app_specific_rows, columns_positions, decode_app_extrinsics, reconstruct_columns},
com::{
app_specific_rows, columns_positions, decode_app_extrinsics, reconstruct_columns, Percent,
},
commitments,
config::{self, CHUNK_SIZE},
data::{Cell, DataCell},
matrix::{Dimensions, Position},
};
use mockall::automock;
use rand::SeedableRng as _;
use rand_chacha::ChaChaRng;
use rocksdb::DB;
use std::{
collections::{HashMap, HashSet},
Expand All @@ -47,15 +52,15 @@ trait AppClient {
&self,
pp: PublicParameters,
block_number: u32,
dimensions: &Dimensions,
dimensions: Dimensions,
commitments: &[[u8; config::COMMITMENT_SIZE]],
missing_rows: &[u32],
) -> Result<Vec<(u32, Vec<u8>)>>;

async fn fetch_rows_from_dht(
&self,
block_number: u32,
dimensions: &Dimensions,
dimensions: Dimensions,
row_indexes: &[u32],
) -> Vec<Option<Vec<u8>>>;

Expand All @@ -64,7 +69,7 @@ trait AppClient {

fn store_encoded_data_in_db<T: Encode + 'static>(
&self,
app_id: u32,
app_id: AppId,
block_number: u32,
data: &T,
) -> Result<()>;
Expand All @@ -83,7 +88,7 @@ impl AppClient for AppClientImpl {
&self,
pp: PublicParameters,
block_number: u32,
dimensions: &Dimensions,
dimensions: Dimensions,
commitments: &[[u8; config::COMMITMENT_SIZE]],
missing_rows: &[u32],
) -> Result<Vec<(u32, Vec<u8>)>> {
Expand Down Expand Up @@ -114,7 +119,9 @@ impl AppClient for AppClientImpl {
unfetched.len()
);

let missing_cells = columns_positions(dimensions, &unfetched, 0.66);
let mut rng = ChaChaRng::from_seed(Default::default());
let missing_cells =
columns_positions(dimensions, &unfetched, Percent::from_percent(66), &mut rng);

let (missing_fetched, _) = fetch_verified(
&pp,
Expand Down Expand Up @@ -163,7 +170,7 @@ impl AppClient for AppClientImpl {
.flat_map(|cell| cell.data)
.collect::<Vec<_>>();

if data.len() != dimensions.cols() as usize * config::CHUNK_SIZE {
if data.len() != dimensions.width() * config::CHUNK_SIZE {
return Err(anyhow!("Row size is not valid after reconstruction"));
}

Expand All @@ -175,7 +182,7 @@ impl AppClient for AppClientImpl {
async fn fetch_rows_from_dht(
&self,
block_number: u32,
dimensions: &Dimensions,
dimensions: Dimensions,
row_indexes: &[u32],
) -> Vec<Option<Vec<u8>>> {
self.network_client
Expand All @@ -193,7 +200,7 @@ impl AppClient for AppClientImpl {

fn store_encoded_data_in_db<T: Encode + 'static>(
&self,
app_id: u32,
app_id: AppId,
block_number: u32,
data: &T,
) -> Result<()> {
Expand Down Expand Up @@ -249,7 +256,7 @@ async fn fetch_verified(
pp: &PublicParameters,
network_client: &Client,
block_number: u32,
dimensions: &Dimensions,
dimensions: Dimensions,
commitments: &[[u8; config::COMMITMENT_SIZE]],
positions: &[Position],
) -> Result<(Vec<Cell>, Vec<Position>)> {
Expand All @@ -271,13 +278,13 @@ async fn fetch_verified(
async fn process_block(
app_client: impl AppClient,
cfg: &AppClientConfig,
app_id: u32,
app_id: AppId,
block: &BlockVerified,
pp: PublicParameters,
) -> Result<()> {
let lookup = &block.lookup;
let block_number = block.block_num;
let dimensions = &block.dimensions;
let dimensions = block.dimensions;

let commitments = &block.commitments;

Expand Down Expand Up @@ -352,7 +359,7 @@ async fn process_block(
missing_rows.len()
);

if missing_rows.len() * dimensions.cols() as usize > cfg.threshold {
if missing_rows.len() * dimensions.width() > cfg.threshold {
return Err(anyhow::anyhow!("Too many cells are missing"));
}

Expand Down Expand Up @@ -410,7 +417,7 @@ pub async fn run(
db: Arc<DB>,
network_client: Client,
rpc_client: avail::Client,
app_id: u32,
app_id: AppId,
mut block_receive: Receiver<BlockVerified>,
pp: PublicParameters,
) {
Expand All @@ -422,18 +429,7 @@ pub async fn run(

info!(block_number, "Block available: {dimensions:?}");

if block.dimensions.cols() == 0 {
info!(block_number, "Skipping empty block");
continue;
}

if block
.lookup
.index
.iter()
.filter(|&(id, _)| id == &app_id)
.count() == 0
{
if block.lookup.range_of(app_id).is_none() {
info!(
block_number,
"Skipping block with no cells for app {app_id}"
Expand All @@ -459,8 +455,9 @@ pub async fn run(
mod tests {
use super::*;
use crate::types::{AppClientConfig, RuntimeConfig};
use avail_core::DataLookup;
use hex_literal::hex;
use kate_recovery::{index::AppDataIndex, matrix::Dimensions, testnet};
use kate_recovery::{matrix::Dimensions, testnet};

#[tokio::test]
async fn test_process_blocks_without_rpc() {
Expand All @@ -475,15 +472,14 @@ mod tests {
]
.to_vec();

let id_lens: Vec<(u32, usize)> = vec![(0, 1), (1, 69)];
let lookup = DataLookup::from_id_and_len_iter(id_lens.into_iter()).unwrap();
let block = BlockVerified {
header_hash: hex!("ec30fcc1f32db0f51ce6305c2601089741ea0b42853f402194b49b04bf936338")
.into(),
block_num: 270,
dimensions,
lookup: AppDataIndex {
size: 70,
index: [(1, 1)].to_vec(),
},
lookup,
commitments: [
[
171, 159, 250, 70, 135, 100, 125, 155, 243, 109, 243, 101, 158, 165, 185, 147,
Expand Down Expand Up @@ -513,7 +509,7 @@ mod tests {
mock_client
.expect_store_encoded_data_in_db()
.returning(|_, _, _: &Vec<Vec<u8>>| Ok(()));
process_block(mock_client, &cfg, 1, &block, pp)
process_block(mock_client, &cfg, AppId(1), &block, pp)
.await
.unwrap();
}
Expand All @@ -532,15 +528,14 @@ mod tests {
]
.to_vec();

let id_lens: Vec<(u32, usize)> = vec![(0, 1), (1, 11)];
let lookup = DataLookup::from_id_and_len_iter(id_lens.into_iter()).unwrap();
let block = BlockVerified {
header_hash: hex!("5bc959e1d05c68f7e1b5bc3a83cfba4efe636ce7f86102c30bcd6a2794e75afe")
.into(),
block_num: 288,
dimensions,
lookup: AppDataIndex {
size: 12,
index: [(1, 1)].to_vec(),
},
lookup,
commitments: [
[
165, 227, 207, 130, 59, 77, 78, 242, 184, 232, 114, 218, 145, 167, 149, 53, 89,
Expand Down Expand Up @@ -575,7 +570,7 @@ mod tests {
mock_client
.expect_store_encoded_data_in_db()
.returning(|_, _, _: &Vec<Vec<u8>>| Ok(()));
process_block(mock_client, &cfg, 1, &block, pp)
process_block(mock_client, &cfg, AppId(1), &block, pp)
.await
.unwrap();
}
Expand Down
7 changes: 4 additions & 3 deletions src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use std::sync::Arc;

use anyhow::{anyhow, Context, Result};
use avail_core::AppId;
use avail_subxt::primitives::Header as DaHeader;
use avail_subxt::utils::H256;
use codec::{Decode, Encode};
Expand Down Expand Up @@ -42,8 +43,8 @@ pub fn get_last_full_node_ws_from_db(db: Arc<DB>) -> Result<Option<String>> {
.map(Some)?)
}

fn store_data_in_db(db: Arc<DB>, app_id: u32, block_number: u32, data: &[u8]) -> Result<()> {
let key = format!("{app_id}:{block_number}");
fn store_data_in_db(db: Arc<DB>, app_id: AppId, block_number: u32, data: &[u8]) -> Result<()> {
let key = format!("{}:{block_number}", app_id.0);
let cf_handle = db
.cf_handle(APP_DATA_CF)
.context("Failed to get cf handle")?;
Expand All @@ -65,7 +66,7 @@ fn get_data_from_db(db: Arc<DB>, app_id: u32, block_number: u32) -> Result<Optio
/// Encodes and stores app data into database under the `app_id:block_number` key
pub fn store_encoded_data_in_db<T: Encode>(
db: Arc<DB>,
app_id: u32,
app_id: AppId,
block_number: u32,
data: &T,
) -> Result<()> {
Expand Down
Loading
Loading