Skip to content

Commit

Permalink
refactor(rust): enable requests to be messages
Browse files Browse the repository at this point in the history
  • Loading branch information
etorreborre committed Feb 4, 2025
1 parent 8cf0036 commit 5dd9725
Show file tree
Hide file tree
Showing 111 changed files with 2,716 additions and 997 deletions.
31 changes: 26 additions & 5 deletions examples/rust/file_transfer/src/messages.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,39 @@
use ockam::Message;
use ockam::{deserialize, serialize, Decodable, Encodable, Encoded, Message};
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Message)]
pub struct FileDescription {
pub name: String,
pub size: usize,
}
impl Message for FileDescription {}

#[derive(Serialize, Deserialize)]
impl Encodable for FileDescription {
fn encode(self) -> ockam::Result<Encoded> {
serialize(self)
}
}

impl Decodable for FileDescription {
fn decode(v: &[u8]) -> ockam::Result<Self> {
deserialize(v)
}
}

#[derive(Serialize, Deserialize, Message)]
pub enum FileData {
Description(FileDescription),
Data(Vec<u8>),
Quit,
}

impl Message for FileData {}
impl Encodable for FileData {
fn encode(self) -> ockam::Result<Encoded> {
serialize(self)
}
}

impl Decodable for FileData {
fn decode(v: &[u8]) -> ockam::Result<Self> {
deserialize(v)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn main(ctx: Context) -> Result<()> {
// Send a message to the "echoer" worker on a different node, over a tcp transport.
// Wait to receive a reply and print it.
let r = route![connection_to_responder, "echoer"];
let reply = node.send_and_receive::<String>(r, "Hello Ockam!".to_string()).await?;
let reply: String = node.send_and_receive(r, "Hello Ockam!".to_string()).await?;

println!("App Received: {}", reply); // should print "Hello Ockam!"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn main(ctx: Context) -> Result<()> {
// Send a message to the "echoer" worker, on a different node, over two tcp hops.
// Wait to receive a reply and print it.
let r = route![connection_to_middle_node, "forward_to_responder", "echoer"];
let reply = node.send_and_receive::<String>(r, "Hello Ockam!".to_string()).await?;
let reply: String = node.send_and_receive(r, "Hello Ockam!".to_string()).await?;
println!("App Received: {}", reply); // should print "Hello Ockam!"

// Stop all workers, stop the node, cleanup and return.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main(ctx: Context) -> Result<()> {
// Send a message to the "echoer" worker on a different node, over a udp transport.
// Wait to receive a reply and print it.
let r = route![bind, (UDP, "localhost:4000"), "echoer"];
let reply = node.send_and_receive::<String>(r, "Hello Ockam!".to_string()).await?;
let reply: String = node.send_and_receive(r, "Hello Ockam!".to_string()).await?;

println!("App Received: {}", reply); // should print "Hello Ockam!"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ async fn main(ctx: Context) -> Result<()> {

// Send a message to the echoer worker via the channel.
// Wait to receive a reply and print it.
let reply = node
.send_and_receive::<String>(route![channel, "echoer"], "Hello Ockam!".to_string())
let reply: String = node
.send_and_receive(route![channel, "echoer"], "Hello Ockam!".to_string())
.await?;
println!("App Received: {}", reply); // should print "Hello Ockam!"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ async fn main(ctx: Context) -> Result<()> {

// Send a message to the echoer worker via the channel.
// Wait to receive a reply and print it.
let reply = node
.send_and_receive::<String>(route![channel, "echoer"], "Hello Ockam!".to_string())
let reply: String = node
.send_and_receive(route![channel, "echoer"], "Hello Ockam!".to_string())
.await?;
println!("App Received: {}", reply); // should print "Hello Ockam!"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ async fn main(ctx: Context) -> Result<()> {

// Send a message to the worker at address "echoer".
// Wait to receive a reply and print it.
let reply = node
.send_and_receive::<String>(
let reply: String = node
.send_and_receive(
route![channel, DefaultAddress::ECHO_SERVICE],
"Hello Ockam!".to_string(),
)
Expand Down
5 changes: 3 additions & 2 deletions implementations/rust/ockam/ockam/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ pub use ockam_core::processor;
/// may be changed in the future to a [`Worker`](crate::Worker)-specific macro.
pub use ockam_core::worker;
pub use ockam_core::{
allow, deny, errcode, route, Address, Any, Encoded, Error, LocalMessage, Mailbox, Mailboxes,
Message, Processor, ProtocolId, Result, Route, Routed, TransportMessage, TryClone, Worker,
allow, deny, deserialize, errcode, route, serialize, Address, Any, Decodable, Encodable,
Encoded, Error, LocalMessage, Mailbox, Mailboxes, Message, Processor, ProtocolId, Result,
Route, Routed, TransportMessage, TryClone, Worker,
};
pub use ockam_identity as identity;
// ---
Expand Down
14 changes: 13 additions & 1 deletion implementations/rust/ockam/ockam/src/remote/info.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::Message;
use ockam_core::compat::string::String;
use ockam_core::flow_control::FlowControlId;
use ockam_core::{Address, Route};
use ockam_core::{deserialize, serialize, Address, Decodable, Encodable, Encoded, Route};
use serde::{Deserialize, Serialize};

/// Information about a remotely forwarded worker.
Expand All @@ -13,6 +13,18 @@ pub struct RemoteRelayInfo {
flow_control_id: Option<FlowControlId>,
}

impl Encodable for RemoteRelayInfo {
fn encode(self) -> ockam_core::Result<Encoded> {
serialize(self)
}
}

impl Decodable for RemoteRelayInfo {
fn decode(v: &[u8]) -> ockam_core::Result<Self> {
deserialize(v)
}
}

impl RemoteRelayInfo {
/// Constructor
pub fn new(
Expand Down
24 changes: 23 additions & 1 deletion implementations/rust/ockam/ockam/tests/message/test.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,39 @@
use ockam::Message;
use ockam::*;
use serde::{Deserialize, Serialize};

#[derive(Message, Deserialize, Serialize)]
pub struct Tmp {
a: String,
}

impl Encodable for Tmp {
fn encode(self) -> Result<Encoded> {
serialize(self)
}
}
impl Decodable for Tmp {
fn decode(e: &[u8]) -> Result<Tmp> {
deserialize(e)
}
}

#[derive(Message, Deserialize, Serialize)]
pub struct Tmp1 {
a: Vec<u8>,
b: Vec<Tmp>,
}

impl Encodable for Tmp1 {
fn encode(self) -> Result<Encoded> {
serialize(self)
}
}
impl Decodable for Tmp1 {
fn decode(e: &[u8]) -> Result<Tmp1> {
deserialize(e)
}
}

fn assert_impl<T: Message>() {}
fn main() {
assert_impl::<String>();
Expand Down
12 changes: 6 additions & 6 deletions implementations/rust/ockam/ockam/tests/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ async fn test1(ctx: &mut Context) -> Result<()> {

let remote_info = RemoteRelay::create(ctx, route![], RemoteRelayOptions::new()).await?;

let resp = ctx
.send_and_receive::<String>(
let resp: String = ctx
.send_and_receive(
route![remote_info.remote_address(), "echoer"],
"Hello".to_string(),
)
Expand Down Expand Up @@ -62,8 +62,8 @@ async fn test2(ctx: &mut Context) -> Result<()> {
.connect(cloud_listener.socket_string(), TcpConnectionOptions::new())
.await?;

let resp = ctx
.send_and_receive::<String>(
let resp: String = ctx
.send_and_receive(
route![cloud_connection, remote_info.remote_address(), "echoer"],
"Hello".to_string(),
)
Expand Down Expand Up @@ -238,8 +238,8 @@ async fn test4(ctx: &mut Context) -> Result<()> {
)
.await?;

let resp = ctx
.send_and_receive::<String>(route![tunnel_channel, "echoer"], "Hello".to_string())
let resp: String = ctx
.send_and_receive(route![tunnel_channel, "echoer"], "Hello".to_string())
.await?;

assert_eq!(resp, "Hello");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use core::time::Duration;
use minicbor::Decoder;
use tracing::trace;

use crate::authenticator::credential_issuer::CredentialIssuer;
use crate::authenticator::direct::AccountAuthorityInfo;
use crate::authenticator::AuthorityMembersRepository;
use ockam::identity::{Credentials, Identifier, IdentitiesAttributes};
use ockam_core::api::{Method, RequestHeader, Response};
use ockam_core::api::{Method, Request, Response};
use ockam_core::compat::boxed::Box;
use ockam_core::compat::sync::Arc;
use ockam_core::compat::vec::Vec;
Expand Down Expand Up @@ -49,41 +48,43 @@ impl CredentialIssuerWorker {
#[ockam_core::worker]
impl Worker for CredentialIssuerWorker {
type Context = Context;
type Message = Vec<u8>;
type Message = Request<Vec<u8>>;

async fn handle_message(&mut self, c: &mut Context, m: Routed<Self::Message>) -> Result<()> {
let secure_channel_info = match SecureChannelLocalInfo::find_info(m.local_message()) {
Ok(secure_channel_info) => secure_channel_info,
Err(_e) => {
let resp = Response::bad_request_no_request("secure channel required").to_vec()?;
let resp =
Response::bad_request_no_request("secure channel required").into_vec()?;
c.send(m.return_route().clone(), resp).await?;
return Ok(());
}
};

let from = Identifier::from(secure_channel_info.their_identifier());
let return_route = m.return_route().clone();
let body = m.into_body()?;
let mut dec = Decoder::new(&body);
let req: RequestHeader = dec.decode()?;
let request = m.into_body()?;
let header = request.header();
trace! {
target: "credential_issuer",
from = %from,
id = %req.id(),
method = ?req.method(),
path = %req.path(),
body = %req.has_body(),
id = %header.id(),
method = ?header.method(),
path = %header.path(),
body = %header.has_body(),
"request"
}
let res = match (req.method(), req.path()) {
let res = match (header.method(), header.path()) {
(Some(Method::Post), "/") | (Some(Method::Post), "/credential") => {
match self.credential_issuer.issue_credential(&from).await {
Ok(Some(crd)) => Response::ok().with_headers(&req).body(crd).to_vec()?,
Ok(None) => Response::forbidden(&req, "unauthorized member").to_vec()?,
Err(error) => Response::internal_error(&req, &error.to_string()).to_vec()?,
Ok(Some(crd)) => Response::ok().with_headers(header).body(crd).into_vec()?,
Ok(None) => Response::forbidden(header, "unauthorized member").into_vec()?,
Err(error) => {
Response::internal_error(header, &error.to_string()).into_vec()?
}
}
}
_ => Response::unknown_path(&req).to_vec()?,
_ => Response::unknown_path(header).into_vec()?,
};

c.send(return_route, res).await
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use miette::IntoDiagnostic;
use std::collections::{BTreeMap, HashMap};

use ockam::identity::models::IdentifierList;
use ockam::identity::AttributesEntry;
use ockam::identity::Identifier;
use ockam_core::api::Request;
use ockam_core::async_trait;
use ockam_node::Context;

use crate::authenticator::direct::types::AddMember;
use crate::authenticator::direct::types::{AddMember, MemberList};
use crate::nodes::service::default_address::DefaultAddress;
use crate::orchestrator::{AuthorityNodeClient, HasSecureClient};

Expand Down Expand Up @@ -90,24 +91,28 @@ impl Members for AuthorityNodeClient {

async fn list_member_ids(&self, ctx: &Context) -> miette::Result<Vec<Identifier>> {
let req = Request::get("/member_ids");
self.get_secure_client()
let identifiers: IdentifierList = self
.get_secure_client()
.ask(ctx, DefaultAddress::DIRECT_AUTHENTICATOR, req)
.await
.into_diagnostic()?
.success()
.into_diagnostic()
.into_diagnostic()?;
Ok(identifiers.0)
}

async fn list_members(
&self,
ctx: &Context,
) -> miette::Result<HashMap<Identifier, AttributesEntry>> {
let req = Request::get("/");
self.get_secure_client()
let member_list: MemberList = self
.get_secure_client()
.ask(ctx, DefaultAddress::DIRECT_AUTHENTICATOR, req)
.await
.into_diagnostic()?
.success()
.into_diagnostic()
.into_diagnostic()?;
Ok(member_list.0)
}
}
Loading

0 comments on commit 5dd9725

Please sign in to comment.