Skip to content

Commit

Permalink
feat!: construct Processor with Config and Builder (#454)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 authored Jul 15, 2023
1 parent d1b9d1e commit 449adae
Show file tree
Hide file tree
Showing 27 changed files with 397 additions and 340 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 32 additions & 3 deletions core/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub struct SessionManagerBuilder {
/// To verify the session, use `verify_self()` method of [Session].
/// To verify a message, use `verify(msg, sig)` method of [Session].
#[wasm_export]
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SessionManager {
/// Session
session: Session,
Expand Down Expand Up @@ -118,6 +118,19 @@ impl TryFrom<(String, String)> for Authorizer {
}
}

// A SessionManager can be converted to a string using JSON and then encoded with base58.
// To load the SessionManager from a string, use `SessionManager::from_str`.
impl FromStr for SessionManager {
type Err = Error;

fn from_str(s: &str) -> Result<Self> {
let s = base58_monero::decode_check(s).map_err(|_| Error::Decode)?;
let session_manager: SessionManager =
serde_json::from_slice(&s).map_err(Error::Deserialize)?;
Ok(session_manager)
}
}

#[wasm_export]
impl SessionManagerBuilder {
/// Create a new SessionManagerBuilder.
Expand Down Expand Up @@ -157,8 +170,8 @@ impl SessionManagerBuilder {
}

/// Set the lifetime of session.
pub fn ttl(mut self, ttl_ms: Option<usize>) -> Self {
self.ttl_ms = ttl_ms.unwrap_or(DEFAULT_SESSION_TTL_MS);
pub fn ttl(mut self, ttl_ms: usize) -> Self {
self.ttl_ms = ttl_ms;
self
}

Expand Down Expand Up @@ -279,6 +292,13 @@ impl SessionManager {
pub fn authorizer_did(&self) -> Did {
self.session.authorizer_did()
}

/// Dump session_manager to string, allowing user to save it in a config file.
/// It can be restored using `SessionManager::from_str`.
pub fn dump(&self) -> Result<String> {
let s = serde_json::to_string(&self).map_err(|_| Error::SerializeError)?;
base58_monero::encode_check(s.as_bytes()).map_err(|_| Error::Encode)
}
}

#[cfg(test)]
Expand All @@ -301,4 +321,13 @@ mod test {
let pubkey = session.authorizer_pubkey().unwrap();
assert_eq!(key.pubkey(), pubkey);
}

#[test]
pub fn test_dump_restore() {
let key = SecretKey::random();
let sm = SessionManager::new_with_seckey(&key).unwrap();
let dump = sm.dump().unwrap();
let sm2 = SessionManager::from_str(&dump).unwrap();
assert_eq!(sm, sm2);
}
}
8 changes: 4 additions & 4 deletions core/src/swarm/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ impl SwarmBuilder {

/// Sets up the external address for swarm transport.
/// This will be used to configure the transport to listen for WebRTC connections in "HOST" mode.
pub fn external_address(mut self, external_address: Option<String>) -> Self {
self.external_address = external_address;
pub fn external_address(mut self, external_address: String) -> Self {
self.external_address = Some(external_address);
self
}

Expand All @@ -87,8 +87,8 @@ impl SwarmBuilder {
}

/// Bind message callback function for Swarm.
pub fn message_callback(mut self, callback: Option<CallbackFn>) -> Self {
self.message_callback = callback;
pub fn message_callback(mut self, callback: CallbackFn) -> Self {
self.message_callback = Some(callback);
self
}

Expand Down
13 changes: 8 additions & 5 deletions core/src/tests/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ pub async fn prepare_node_with_callback(
.unwrap();

let session_manager = SessionManager::new_with_seckey(&key).unwrap();
let swarm = Arc::new(
SwarmBuilder::new(stun, storage, session_manager)
.message_callback(message_callback)
.build(),
);

let mut swarm_builder = SwarmBuilder::new(stun, storage, session_manager);

if let Some(callback) = message_callback {
swarm_builder = swarm_builder.message_callback(callback);
}

let swarm = Arc::new(swarm_builder.build());

println!("key: {:?}", key.to_string());
println!("did: {:?}", swarm.did());
Expand Down
5 changes: 3 additions & 2 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ node = [
"opentelemetry",
"opentelemetry-jaeger",
"backtrace",
"serde_yaml",
"lazy_static",
"axum/ws",
"axum/headers",
Expand All @@ -51,6 +50,7 @@ browser = [
"serde-wasm-bindgen",
"wasmer/js-default",
"lazy_static",
"wasm-bindgen",
]
browser_chrome_test = ["browser"]

Expand All @@ -74,6 +74,7 @@ rings-derive = { workspace = true, optional = true }
rings-rpc = { workspace = true, optional = true }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.70"
serde_yaml = "0.9.17"
thiserror = "1"
tracing = "0.1.37"
tracing-log = "0.1.3"
Expand All @@ -92,7 +93,6 @@ opentelemetry = { version = "0.18.0", default-features = false, features = ["tra
opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio"], optional = true }
pin-project = { version = "1", optional = true }
reqwest = { version = "0.11", features = ["json", "rustls-tls"], optional = true, default-features = false }
serde_yaml = { version = "0.9.17", optional = true }
tokio = { version = "1.13.0", features = ["full"], optional = true }
tower-http = { version = "0.3.4", features = ["cors"], optional = true }

Expand All @@ -101,6 +101,7 @@ console_error_panic_hook = { version = "0.1.1", optional = true }
reqwest-wasm = { version = "0.11", features = ["json", "rustls-tls"], optional = true, default-features = false }
serde-wasm-bindgen = { version = "0.5.0", optional = true }
tracing-wasm = { version = "0.2.1", optional = true }
wasm-bindgen = { workspace = true, features = ["serde-serialize"], optional = true }

[dev-dependencies]
fluvio-wasm-timer = "0.2.5"
Expand Down
71 changes: 29 additions & 42 deletions node/bin/rings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,19 @@ use futures::select;
use futures::StreamExt;
use futures_timer::Delay;
use rings_node::backend::service::Backend;
use rings_node::cli::Client;
use rings_node::config;
use rings_node::endpoint::run_http_api;
use rings_node::logging::init_logging;
use rings_node::logging::LogLevel;
use rings_node::measure::PeriodicMeasure;
use rings_node::native::cli::Client;
use rings_node::native::config;
use rings_node::native::endpoint::run_http_api;
use rings_node::prelude::http;
use rings_node::prelude::rings_core::dht::Did;
use rings_node::prelude::rings_core::dht::Stabilization;
use rings_node::prelude::rings_core::ecc::SecretKey;
use rings_node::prelude::PersistenceStorage;
use rings_node::prelude::SessionManager;
use rings_node::prelude::SwarmBuilder;
use rings_node::processor::Processor;
use rings_node::processor::ProcessorBuilder;
use rings_node::processor::ProcessorConfig;
use tokio::io;
use tokio::io::AsyncBufReadExt;

Expand Down Expand Up @@ -176,12 +175,8 @@ impl ClientArgs {
let c = config::Config::read_fs(self.config_args.config.as_str())?;

let endpoint_url = self.endpoint_url.as_ref().unwrap_or(&c.endpoint_url);
let ecdsa_key = self.ecdsa_key.unwrap_or(c.ecdsa_key);

Client::new(
endpoint_url.as_str(),
Processor::generate_signature(&ecdsa_key).as_str(),
)
let session_manager = SessionManager::from_str(&c.session_manager)?;
Client::new(endpoint_url.as_str(), session_manager)
}
}

Expand Down Expand Up @@ -367,19 +362,24 @@ struct InspectCommand {
client_args: ClientArgs,
}

fn get_value<V>(value: Option<V>, default_value: V) -> V {
value.unwrap_or(default_value)
}

#[allow(clippy::too_many_arguments)]
async fn daemon_run(args: RunCommand) -> anyhow::Result<()> {
let c = config::Config::read_fs(args.config_args.config)?;
let mut c = config::Config::read_fs(args.config_args.config)?;

let key = get_value(args.ecdsa_key, c.ecdsa_key);
let did: Did = key.address().into();
println!("Did: {}", did);
if let Some(ice_servers) = args.ice_servers {
c.ice_servers = ice_servers;
}
if let Some(external_ip) = args.external_ip {
c.external_ip = Some(external_ip);
}
if let Some(stabilize_timeout) = args.stabilize_timeout {
c.stabilize_timeout = stabilize_timeout;
}
if let Some(http_addr) = args.http_addr {
c.http_addr = http_addr;
}

let session_manager = SessionManager::new_with_seckey(&key)?;
let pc = ProcessorConfig::from(&c);

let (data_storage, measure_storage) = if let Some(storage_path) = args.storage_path {
let storage_path = Path::new(&storage_path);
Expand All @@ -404,38 +404,25 @@ async fn daemon_run(args: RunCommand) -> anyhow::Result<()> {

let measure = PeriodicMeasure::new(per_measure_storage);

let stuns = get_value(args.ice_servers, c.ice_servers);

let external_ip = args.external_ip.map(Some).unwrap_or(c.external_ip);

let (sender, receiver) = tokio::sync::broadcast::channel(1024);
let backend_config = (c.backend, c.extension).into();
let backend = Backend::new(backend_config, sender).await?;
let backend_service_names = backend.service_names();

let swarm = Arc::new(
SwarmBuilder::new(stuns.as_str(), per_data_storage, session_manager)
.external_address(external_ip)
.measure(Box::new(measure))
.message_callback(Some(Box::new(backend)))
.build(),
let processor = Arc::new(
ProcessorBuilder::from_config(serde_yaml::to_string(&pc)?)?
.storage(per_data_storage)
.measure(measure)
.message_callback(Box::new(backend))
.build()?,
);
println!("Did: {}", processor.swarm.did());

let stabilize_timeout = get_value(args.stabilize_timeout, c.stabilize_timeout);
let stabilize = Arc::new(Stabilization::new(swarm.clone(), stabilize_timeout));

let processor = Arc::new(Processor::from((swarm, stabilize)));
let processor_clone = processor.clone();

let pubkey = Arc::new(key.pubkey());
println!("Signature: {}", Processor::generate_signature(&key));

let bind_addr = get_value(args.http_addr, c.http_addr);

let _ = futures::join!(
processor.listen(),
service_loop_register(&processor, backend_service_names),
run_http_api(bind_addr, processor_clone, pubkey, receiver,),
run_http_api(c.http_addr, processor_clone, receiver),
);

Ok(())
Expand Down
Loading

0 comments on commit 449adae

Please sign in to comment.