Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: construct Processor with Config and Builder #454

Merged
merged 6 commits into from
Jul 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

35 changes: 32 additions & 3 deletions core/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub struct SessionManagerBuilder {
/// To verify the session, use `verify_self()` method of [Session].
/// To verify a message, use `verify(msg, sig)` method of [Session].
#[wasm_export]
#[derive(Debug)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SessionManager {
/// Session
session: Session,
Expand Down Expand Up @@ -118,6 +118,19 @@ impl TryFrom<(String, String)> for Authorizer {
}
}

// A SessionManager can be converted to a string using JSON and then encoded with base58.
// To load the SessionManager from a string, use `SessionManager::from_str`.
impl FromStr for SessionManager {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should rename SessionManager to a better name, Session Manager is not a Menager, it's actualy a wrapper of generated key and it's session.

DelegationToken ? DelegatedKey? and can be abbr. as DTOKEN / DKEY

Copy link
Member Author

@Ma233 Ma233 Jul 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about SessionKeypair ? Better to do it in a separated PR.

Copy link
Member

@RyanKung RyanKung Jul 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's wrap of key, not wrap of session. The field session is used as a proof of the sk.
So, SessionizeKey if you want to undeline it can be verified by session. (I dont think it's a good idea, but it's ok)
Or, Delegatedkey if you want to underline how and what it's desined for.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DelegatedKey sounds more understandable. 😆

type Err = Error;

fn from_str(s: &str) -> Result<Self> {
let s = base58_monero::decode_check(s).map_err(|_| Error::Decode)?;
let session_manager: SessionManager =
serde_json::from_slice(&s).map_err(Error::Deserialize)?;
Ok(session_manager)
}
}

#[wasm_export]
impl SessionManagerBuilder {
/// Create a new SessionManagerBuilder.
Expand Down Expand Up @@ -157,8 +170,8 @@ impl SessionManagerBuilder {
}

/// Set the lifetime of session.
pub fn ttl(mut self, ttl_ms: Option<usize>) -> Self {
self.ttl_ms = ttl_ms.unwrap_or(DEFAULT_SESSION_TTL_MS);
pub fn ttl(mut self, ttl_ms: usize) -> Self {
self.ttl_ms = ttl_ms;
self
}

Expand Down Expand Up @@ -279,6 +292,13 @@ impl SessionManager {
pub fn authorizer_did(&self) -> Did {
self.session.authorizer_did()
}

/// Dump session_manager to string, allowing user to save it in a config file.
/// It can be restored using `SessionManager::from_str`.
pub fn dump(&self) -> Result<String> {
let s = serde_json::to_string(&self).map_err(|_| Error::SerializeError)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not bincode? It will be encoded into b58 finally, so readable is not necessary property.

Copy link
Member Author

@Ma233 Ma233 Jul 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Readable would be better for debug. You could simply decode b58 to see the content, because it's serialized json string after decoding.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I agree

base58_monero::encode_check(s.as_bytes()).map_err(|_| Error::Encode)
}
}

#[cfg(test)]
Expand All @@ -301,4 +321,13 @@ mod test {
let pubkey = session.authorizer_pubkey().unwrap();
assert_eq!(key.pubkey(), pubkey);
}

#[test]
pub fn test_dump_restore() {
let key = SecretKey::random();
let sm = SessionManager::new_with_seckey(&key).unwrap();
let dump = sm.dump().unwrap();
let sm2 = SessionManager::from_str(&dump).unwrap();
assert_eq!(sm, sm2);
}
}
8 changes: 4 additions & 4 deletions core/src/swarm/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ impl SwarmBuilder {

/// Sets up the external address for swarm transport.
/// This will be used to configure the transport to listen for WebRTC connections in "HOST" mode.
pub fn external_address(mut self, external_address: Option<String>) -> Self {
self.external_address = external_address;
pub fn external_address(mut self, external_address: String) -> Self {
self.external_address = Some(external_address);
self
}

Expand All @@ -87,8 +87,8 @@ impl SwarmBuilder {
}

/// Bind message callback function for Swarm.
pub fn message_callback(mut self, callback: Option<CallbackFn>) -> Self {
self.message_callback = callback;
pub fn message_callback(mut self, callback: CallbackFn) -> Self {
self.message_callback = Some(callback);
self
}

Expand Down
13 changes: 8 additions & 5 deletions core/src/tests/default/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ pub async fn prepare_node_with_callback(
.unwrap();

let session_manager = SessionManager::new_with_seckey(&key).unwrap();
let swarm = Arc::new(
SwarmBuilder::new(stun, storage, session_manager)
.message_callback(message_callback)
.build(),
);

let mut swarm_builder = SwarmBuilder::new(stun, storage, session_manager);

if let Some(callback) = message_callback {
swarm_builder = swarm_builder.message_callback(callback);
}

let swarm = Arc::new(swarm_builder.build());

println!("key: {:?}", key.to_string());
println!("did: {:?}", swarm.did());
Expand Down
5 changes: 3 additions & 2 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ node = [
"opentelemetry",
"opentelemetry-jaeger",
"backtrace",
"serde_yaml",
"lazy_static",
"axum/ws",
"axum/headers",
Expand All @@ -51,6 +50,7 @@ browser = [
"serde-wasm-bindgen",
"wasmer/js-default",
"lazy_static",
"wasm-bindgen",
]
browser_chrome_test = ["browser"]

Expand All @@ -74,6 +74,7 @@ rings-derive = { workspace = true, optional = true }
rings-rpc = { workspace = true, optional = true }
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.70"
serde_yaml = "0.9.17"
thiserror = "1"
tracing = "0.1.37"
tracing-log = "0.1.3"
Expand All @@ -92,7 +93,6 @@ opentelemetry = { version = "0.18.0", default-features = false, features = ["tra
opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio"], optional = true }
pin-project = { version = "1", optional = true }
reqwest = { version = "0.11", features = ["json", "rustls-tls"], optional = true, default-features = false }
serde_yaml = { version = "0.9.17", optional = true }
tokio = { version = "1.13.0", features = ["full"], optional = true }
tower-http = { version = "0.3.4", features = ["cors"], optional = true }

Expand All @@ -101,6 +101,7 @@ console_error_panic_hook = { version = "0.1.1", optional = true }
reqwest-wasm = { version = "0.11", features = ["json", "rustls-tls"], optional = true, default-features = false }
serde-wasm-bindgen = { version = "0.5.0", optional = true }
tracing-wasm = { version = "0.2.1", optional = true }
wasm-bindgen = { workspace = true, features = ["serde-serialize"], optional = true }

[dev-dependencies]
fluvio-wasm-timer = "0.2.5"
Expand Down
71 changes: 29 additions & 42 deletions node/bin/rings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,19 @@ use futures::select;
use futures::StreamExt;
use futures_timer::Delay;
use rings_node::backend::service::Backend;
use rings_node::cli::Client;
use rings_node::config;
use rings_node::endpoint::run_http_api;
use rings_node::logging::init_logging;
use rings_node::logging::LogLevel;
use rings_node::measure::PeriodicMeasure;
use rings_node::native::cli::Client;
use rings_node::native::config;
use rings_node::native::endpoint::run_http_api;
use rings_node::prelude::http;
use rings_node::prelude::rings_core::dht::Did;
use rings_node::prelude::rings_core::dht::Stabilization;
use rings_node::prelude::rings_core::ecc::SecretKey;
use rings_node::prelude::PersistenceStorage;
use rings_node::prelude::SessionManager;
use rings_node::prelude::SwarmBuilder;
use rings_node::processor::Processor;
use rings_node::processor::ProcessorBuilder;
use rings_node::processor::ProcessorConfig;
use tokio::io;
use tokio::io::AsyncBufReadExt;

Expand Down Expand Up @@ -176,12 +175,8 @@ impl ClientArgs {
let c = config::Config::read_fs(self.config_args.config.as_str())?;

let endpoint_url = self.endpoint_url.as_ref().unwrap_or(&c.endpoint_url);
let ecdsa_key = self.ecdsa_key.unwrap_or(c.ecdsa_key);

Client::new(
endpoint_url.as_str(),
Processor::generate_signature(&ecdsa_key).as_str(),
)
let session_manager = SessionManager::from_str(&c.session_manager)?;
Client::new(endpoint_url.as_str(), session_manager)
}
}

Expand Down Expand Up @@ -367,19 +362,24 @@ struct InspectCommand {
client_args: ClientArgs,
}

fn get_value<V>(value: Option<V>, default_value: V) -> V {
value.unwrap_or(default_value)
}

#[allow(clippy::too_many_arguments)]
async fn daemon_run(args: RunCommand) -> anyhow::Result<()> {
let c = config::Config::read_fs(args.config_args.config)?;
let mut c = config::Config::read_fs(args.config_args.config)?;

let key = get_value(args.ecdsa_key, c.ecdsa_key);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Accept a secret key directly, generated delegatedKey, then drop it."
-- I don't think the origin impl is that bad. It's fine if the sk is not holding by Client.
-- Just use & drop.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can support both way for Client initialization, by sk directly, and dumped DKEY(SessionManager)

Copy link
Member Author

@Ma233 Ma233 Jul 14, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The generation procedure is in init command.
Here is just a behavior that is easily misunderstood, it replace the key of config with the command line arguments, so that I removed this behavior.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get!

let did: Did = key.address().into();
println!("Did: {}", did);
if let Some(ice_servers) = args.ice_servers {
c.ice_servers = ice_servers;
}
if let Some(external_ip) = args.external_ip {
c.external_ip = Some(external_ip);
}
if let Some(stabilize_timeout) = args.stabilize_timeout {
c.stabilize_timeout = stabilize_timeout;
}
if let Some(http_addr) = args.http_addr {
c.http_addr = http_addr;
}

let session_manager = SessionManager::new_with_seckey(&key)?;
let pc = ProcessorConfig::from(&c);

let (data_storage, measure_storage) = if let Some(storage_path) = args.storage_path {
let storage_path = Path::new(&storage_path);
Expand All @@ -404,38 +404,25 @@ async fn daemon_run(args: RunCommand) -> anyhow::Result<()> {

let measure = PeriodicMeasure::new(per_measure_storage);

let stuns = get_value(args.ice_servers, c.ice_servers);

let external_ip = args.external_ip.map(Some).unwrap_or(c.external_ip);

let (sender, receiver) = tokio::sync::broadcast::channel(1024);
let backend_config = (c.backend, c.extension).into();
let backend = Backend::new(backend_config, sender).await?;
let backend_service_names = backend.service_names();

let swarm = Arc::new(
SwarmBuilder::new(stuns.as_str(), per_data_storage, session_manager)
.external_address(external_ip)
.measure(Box::new(measure))
.message_callback(Some(Box::new(backend)))
.build(),
let processor = Arc::new(
ProcessorBuilder::from_config(serde_yaml::to_string(&pc)?)?
.storage(per_data_storage)
.measure(measure)
.message_callback(Box::new(backend))
.build()?,
);
println!("Did: {}", processor.swarm.did());

let stabilize_timeout = get_value(args.stabilize_timeout, c.stabilize_timeout);
let stabilize = Arc::new(Stabilization::new(swarm.clone(), stabilize_timeout));

let processor = Arc::new(Processor::from((swarm, stabilize)));
let processor_clone = processor.clone();

let pubkey = Arc::new(key.pubkey());
println!("Signature: {}", Processor::generate_signature(&key));

let bind_addr = get_value(args.http_addr, c.http_addr);

let _ = futures::join!(
processor.listen(),
service_loop_register(&processor, backend_service_names),
run_http_api(bind_addr, processor_clone, pubkey, receiver,),
run_http_api(c.http_addr, processor_clone, receiver),
);

Ok(())
Expand Down
Loading