Skip to content

Commit

Permalink
chore: WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek committed Dec 20, 2024
1 parent 068a6fb commit 3f3f76d
Show file tree
Hide file tree
Showing 23 changed files with 645 additions and 104 deletions.
304 changes: 282 additions & 22 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ members = [
"packages/simple-signer",
"packages/rs-json-schema-compatibility-validator",
"packages/check-features",
"packages/wallet-utils-contract",
"packages/wallet-utils-contract", "packages/wasm-sdk",
]
[workspace.package]

Expand Down
28 changes: 23 additions & 5 deletions packages/rs-dapi-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ version = "1.7.0"
edition = "2021"

[features]
default = ["mocks", "offline-testing"]
# TODO we should not enable deps features in default
default = ["mocks", "offline-testing", "dapi-grpc/client", "backon/tokio-sleep"]
mocks = [
"dep:sha2",
"dep:hex",
Expand All @@ -17,17 +18,34 @@ mocks = [
dump = ["mocks"]
# skip tests that require connection to the platform; enabled by default
offline-testing = []
wasm = [
"dapi-grpc/wasm",
"dapi-grpc/platform",
"dapi-grpc/core",
# "backon/gloo-timers-sleep",
# "backon/std-blocking-sleep",
"dep:async-std",
"dep:tonic-web-wasm-client",
"dep:http",
"getrandom/js",
]

[dependencies]
backon = { version = "1.2", features = ["tokio-sleep"] }
backon = { version = "1.3", default-features = false }

dapi-grpc = { path = "../dapi-grpc", features = [
"core",
"platform",
"client",
], default-features = false }
futures = "0.3.28"
http = { version = "1.1.0", default-features = false, optional = true }
http-serde = { version = "2.1", optional = true }
rand = { version = "0.8.5", features = ["small_rng"] }
getrandom = { version = "0.2", optional = true }
tonic-web-wasm-client = { version = "0.6.0", optional = true }
rand = { version = "0.8.5", features = [
"small_rng",
"getrandom",
], default-features = false }
thiserror = "1.0.64"
tracing = "0.1.40"
tokio = { version = "1.40", default-features = false }
Expand All @@ -37,6 +55,6 @@ lru = { version = "0.12.3" }
serde = { version = "1.0.197", optional = true, features = ["derive"] }
serde_json = { version = "1.0.120", optional = true }
chrono = { version = "0.4.38", features = ["serde"] }

async-std = { version = "1.13", optional = true }
[dev-dependencies]
tokio = { version = "1.40", features = ["macros"] }
2 changes: 1 addition & 1 deletion packages/rs-dapi-client/src/address_list.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Subsystem to manage DAPI nodes.
use crate::Uri;
use chrono::Utc;
use dapi_grpc::tonic::transport::Uri;
use rand::{rngs::SmallRng, seq::IteratorRandom, SeedableRng};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
Expand Down
2 changes: 1 addition & 1 deletion packages/rs-dapi-client/src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use std::{
sync::{Arc, Mutex},
};

use dapi_grpc::tonic::transport::Uri;
use lru::LruCache;

use crate::{
request_settings::AppliedRequestSettings,
transport::{CoreGrpcClient, PlatformGrpcClient},
Uri,
};

/// ConnectionPool represents pool of connections to DAPI nodes.
Expand Down
14 changes: 11 additions & 3 deletions packages/rs-dapi-client/src/dapi_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use backon::{ConstantBuilder, Retryable};
use dapi_grpc::mock::Mockable;
use dapi_grpc::tonic::async_trait;
#[cfg(not(feature = "wasm"))]
use dapi_grpc::tonic::transport::Certificate;
use std::fmt::{Debug, Display};
use std::sync::atomic::AtomicUsize;
Expand All @@ -13,7 +14,7 @@ use tracing::Instrument;
use crate::address_list::AddressListError;
use crate::connection_pool::ConnectionPool;
use crate::request_settings::AppliedRequestSettings;
use crate::transport::TransportError;
use crate::transport::{self, TransportError};
use crate::{
transport::{TransportClient, TransportRequest},
AddressList, CanRetry, DapiRequestExecutor, ExecutionError, ExecutionResponse, ExecutionResult,
Expand Down Expand Up @@ -77,6 +78,7 @@ pub struct DapiClient {
address_list: AddressList,
settings: RequestSettings,
pool: ConnectionPool,
#[cfg(not(feature = "wasm"))]
/// Certificate Authority certificate to use for verifying the server's certificate.
pub ca_certificate: Option<Certificate>,
#[cfg(feature = "dump")]
Expand All @@ -95,6 +97,7 @@ impl DapiClient {
pool: ConnectionPool::new(address_count),
#[cfg(feature = "dump")]
dump_dir: None,
#[cfg(not(feature = "wasm"))]
ca_certificate: None,
}
}
Expand All @@ -107,6 +110,7 @@ impl DapiClient {
///
/// # Returns
/// [DapiClient] with CA certificate set.
#[cfg(not(feature = "wasm"))]
pub fn with_ca_certificate(mut self, ca_cert: Certificate) -> Self {
self.ca_certificate = Some(ca_cert);

Expand Down Expand Up @@ -200,8 +204,9 @@ impl DapiRequestExecutor for DapiClient {
.settings
.override_by(R::SETTINGS_OVERRIDES)
.override_by(settings)
.finalize()
.with_ca_certificate(self.ca_certificate.clone());
.finalize();
#[cfg(not(feature = "wasm"))]
let applied_settings = applied_settings.with_ca_certificate(self.ca_certificate.clone());

// Setup retry policy:
let retry_settings = ConstantBuilder::default()
Expand Down Expand Up @@ -310,10 +315,13 @@ impl DapiRequestExecutor for DapiClient {
}
};

let sleeper = transport::channel_impl::BackonSleeper::default();

// Start the routine with retry policy applied:
// We allow let_and_return because `result` is used later if dump feature is enabled
let result = routine
.retry(retry_settings)
.sleep(sleeper)
.notify(|error, duration| {
let retries_counter = Arc::clone(&retries_counter_arc);
retries_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Expand Down
8 changes: 7 additions & 1 deletion packages/rs-dapi-client/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub trait InnerInto<T> {
}

/// Error happened during request execution.
#[derive(Debug, Clone, thiserror::Error, Eq, PartialEq)]
#[derive(Debug, Clone, thiserror::Error, Eq)]
#[error("{inner}")]
pub struct ExecutionError<E> {
/// The cause of error
Expand All @@ -45,6 +45,12 @@ pub struct ExecutionError<E> {
pub address: Option<Address>,
}

impl<E: PartialEq> PartialEq for ExecutionError<E> {
fn eq(&self, other: &Self) -> bool {
self.inner == other.inner && self.retries == other.retries && self.address == other.address
}
}

impl<F, T> InnerInto<ExecutionError<T>> for ExecutionError<F>
where
F: Into<T>,
Expand Down
7 changes: 7 additions & 0 deletions packages/rs-dapi-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,20 @@ pub use address_list::AddressListError;
pub use address_list::AddressStatus;
pub use connection_pool::ConnectionPool;
pub use dapi_client::{update_address_ban_status, DapiClient, DapiClientError};
#[cfg(not(feature = "wasm"))]
pub use dapi_grpc::tonic::transport::http;
#[cfg(feature = "dump")]
pub use dump::DumpData;
pub use executor::{
DapiRequestExecutor, ExecutionError, ExecutionResponse, ExecutionResult, InnerInto, IntoInner,
WrapToExecutionResult,
};
use futures::{future::BoxFuture, FutureExt};
#[cfg(feature = "wasm")]
pub use http::Uri;
#[cfg(not(feature = "wasm"))]
pub use http_serde::http::Uri;

pub use request_settings::RequestSettings;

/// A DAPI request could be executed with an initialized [DapiClient].
Expand Down
4 changes: 4 additions & 0 deletions packages/rs-dapi-client/src/request_settings.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! DAPI client request settings processing.
#[cfg(not(feature = "wasm"))]
use dapi_grpc::tonic::transport::Certificate;
use std::time::Duration;

Expand Down Expand Up @@ -65,6 +66,7 @@ impl RequestSettings {
ban_failed_address: self
.ban_failed_address
.unwrap_or(DEFAULT_BAN_FAILED_ADDRESS),
#[cfg(not(feature = "wasm"))]
ca_certificate: None,
}
}
Expand All @@ -82,12 +84,14 @@ pub struct AppliedRequestSettings {
/// Ban DAPI address if node not responded or responded with error.
pub ban_failed_address: bool,
/// Certificate Authority certificate to use for verifying the server's certificate.
#[cfg(not(feature = "wasm"))]
pub ca_certificate: Option<Certificate>,
}
impl AppliedRequestSettings {
/// Use provided CA certificate for verifying the server's certificate.
///
/// If set to None, the system's default CA certificates will be used.
#[cfg(not(feature = "wasm"))]
pub fn with_ca_certificate(mut self, ca_cert: Option<Certificate>) -> Self {
self.ca_certificate = ca_cert;
self
Expand Down
13 changes: 10 additions & 3 deletions packages/rs-dapi-client/src/transport.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
//! Transport options that DAPI requests use under the hood.
pub(crate) mod grpc;
#[cfg(not(feature = "wasm"))]
pub(crate) mod tonic_channel;
#[cfg(feature = "wasm")]
pub(crate) mod wasm_channel;

use crate::connection_pool::ConnectionPool;
pub use crate::request_settings::AppliedRequestSettings;
use crate::{CanRetry, RequestSettings};
use crate::{CanRetry, RequestSettings, Uri};
pub use channel_impl::{BackonSleeper, CoreGrpcClient, PlatformGrpcClient};
use dapi_grpc::mock::Mockable;
use dapi_grpc::tonic::transport::Uri;
pub use futures::future::BoxFuture;
pub use grpc::{CoreGrpcClient, PlatformGrpcClient};
use std::any;
use std::fmt::Debug;
#[cfg(not(feature = "wasm"))]
pub(crate) use tonic_channel as channel_impl;
#[cfg(feature = "wasm")]
pub(crate) use wasm_channel as channel_impl;

/// Generic transport layer request.
/// Requires [Clone] as could be retried and a client in general consumes a request.
Expand Down
49 changes: 8 additions & 41 deletions packages/rs-dapi-client/src/transport/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,52 +2,19 @@
use std::time::Duration;

#[cfg(not(feature = "wasm"))]
use super::tonic_channel::create_channel;
#[cfg(feature = "wasm")]
use super::wasm_channel::create_channel;
use super::{CanRetry, TransportClient, TransportError, TransportRequest};
use super::{CoreGrpcClient, PlatformGrpcClient};
use crate::connection_pool::{ConnectionPool, PoolPrefix};
use crate::{request_settings::AppliedRequestSettings, RequestSettings};
use dapi_grpc::core::v0::core_client::CoreClient;
use crate::{request_settings::AppliedRequestSettings, RequestSettings, Uri};
use dapi_grpc::core::v0::{self as core_proto};
use dapi_grpc::platform::v0::{self as platform_proto, platform_client::PlatformClient};
use dapi_grpc::tonic::transport::{Certificate, ClientTlsConfig, Uri};
use dapi_grpc::tonic::Streaming;
use dapi_grpc::tonic::{transport::Channel, IntoRequest};
use dapi_grpc::platform::v0::{self as platform_proto};
use dapi_grpc::tonic::{IntoRequest, Streaming};
use futures::{future::BoxFuture, FutureExt, TryFutureExt};

/// Platform Client using gRPC transport.
pub type PlatformGrpcClient = PlatformClient<Channel>;
/// Core Client using gRPC transport.
pub type CoreGrpcClient = CoreClient<Channel>;

fn create_channel(
uri: Uri,
settings: Option<&AppliedRequestSettings>,
) -> Result<Channel, dapi_grpc::tonic::transport::Error> {
let host = uri.host().expect("Failed to get host from URI").to_string();

let mut builder = Channel::builder(uri);
let mut tls_config = ClientTlsConfig::new()
.with_native_roots()
.with_webpki_roots()
.assume_http2(true);

if let Some(settings) = settings {
if let Some(timeout) = settings.connect_timeout {
builder = builder.connect_timeout(timeout);
}

if let Some(pem) = settings.ca_certificate.as_ref() {
let cert = Certificate::from_pem(pem);
tls_config = tls_config.ca_certificate(cert).domain_name(host);
};
}

builder = builder
.tls_config(tls_config)
.expect("Failed to set TLS config");

Ok(builder.connect_lazy())
}

impl TransportClient for PlatformGrpcClient {
fn with_uri(uri: Uri, pool: &ConnectionPool) -> Result<Self, TransportError> {
Ok(pool
Expand Down
57 changes: 57 additions & 0 deletions packages/rs-dapi-client/src/transport/tonic_channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use std::time::Duration;

use super::{CanRetry, TransportClient, TransportError, TransportRequest};
use crate::connection_pool::{ConnectionPool, PoolPrefix};
use crate::{request_settings::AppliedRequestSettings, RequestSettings, Uri};
use dapi_grpc::core::v0::core_client::CoreClient;
use dapi_grpc::core::v0::{self as core_proto};
use dapi_grpc::platform::v0::{self as platform_proto, platform_client::PlatformClient};
use dapi_grpc::tonic::transport::{Certificate, Channel, ClientTlsConfig, Uri};
use dapi_grpc::tonic::{IntoRequest, Streaming};
use futures::{future::BoxFuture, FutureExt, TryFutureExt};

/// Platform Client using gRPC transport.
pub type PlatformGrpcClient = PlatformClient<Channel>;
/// Core Client using gRPC transport.
pub type CoreGrpcClient = CoreClient<Channel>;

/// backon::Sleeper
#[derive(Default, Clone, Debug)]
pub(crate) struct Sleeper(backon::TokioSleeper);

impl backon::Sleeper for Sleeper {
type Sleep = backon::TokioSleeper::Sleep;
fn sleep(&self, dur: Duration) -> Self::Sleep {
self.0.sleep(dur)
}
}

fn create_channel(
uri: Uri,
settings: Option<&AppliedRequestSettings>,
) -> Result<Channel, TransportError> {
let host = uri.host().expect("Failed to get host from URI").to_string();

let mut builder = Channel::builder(uri);
let mut tls_config = ClientTlsConfig::new()
.with_native_roots()
.with_webpki_roots()
.assume_http2(true);

if let Some(settings) = settings {
if let Some(timeout) = settings.connect_timeout {
builder = builder.connect_timeout(timeout);
}

if let Some(pem) = settings.ca_certificate.as_ref() {
let cert = Certificate::from_pem(pem);
tls_config = tls_config.ca_certificate(cert).domain_name(host);
};
}

builder = builder
.tls_config(tls_config)
.expect("Failed to set TLS config");

Ok(builder.connect_lazy())
}
Loading

0 comments on commit 3f3f76d

Please sign in to comment.