From fb20b4fefe9ef6a1e2fd80207d91353f9f03702a Mon Sep 17 00:00:00 2001 From: sgdxbc Date: Sat, 6 Jul 2024 11:50:29 +0800 Subject: [PATCH] Add workload binaries and fix context --- examples/unreplicated-run-client.rs | 4 +- src/bin/workload-standalone.rs | 4 ++ src/bin/workload.rs | 8 +++ src/bin/workload/servers.rs | 106 ++++++++++++++++++++++++++++ src/codec.rs | 4 +- src/event.rs | 2 +- src/event/task.rs | 4 +- src/pbft/messages.rs | 2 +- src/pbft/replica.rs | 60 +++++++++++----- src/unreplicated.rs | 18 ++--- 10 files changed, 180 insertions(+), 32 deletions(-) create mode 100644 src/bin/workload-standalone.rs create mode 100644 src/bin/workload.rs create mode 100644 src/bin/workload/servers.rs diff --git a/examples/unreplicated-run-client.rs b/examples/unreplicated-run-client.rs index a37815aa..c81a52f0 100644 --- a/examples/unreplicated-run-client.rs +++ b/examples/unreplicated-run-client.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use neatworks::{ codec::Payload, event::{ - task::{erase::ScheduleOf, run_with_schedule, ScheduleState}, + task::{erase::Of, run_with_schedule, ScheduleState}, Erase, SendEvent, Untyped, }, net::{combinators::Forward, task::udp}, @@ -28,7 +28,7 @@ async fn main() -> anyhow::Result<()> { unreplicated::codec::client_decode(Erase::new(sender.clone())), ); - let mut context = unreplicated::context::Client::<_, _, ScheduleOf<_>> { + let mut context = unreplicated::context::Client::<_, _, Of<_>> { net: unreplicated::codec::client_encode(Forward( ([127, 0, 0, 1], 3000).into(), socket.clone(), diff --git a/src/bin/workload-standalone.rs b/src/bin/workload-standalone.rs new file mode 100644 index 00000000..664ebfbc --- /dev/null +++ b/src/bin/workload-standalone.rs @@ -0,0 +1,4 @@ +#[tokio::main(flavor = "current_thread")] +async fn main() -> anyhow::Result<()> { + Ok(()) +} diff --git a/src/bin/workload.rs b/src/bin/workload.rs new file mode 100644 index 00000000..89076b0d --- /dev/null +++ b/src/bin/workload.rs @@ -0,0 +1,8 @@ +mod workload { + mod servers; +} + +#[tokio::main(flavor = "current_thread")] +async fn main() -> anyhow::Result<()> { + Ok(()) +} diff --git a/src/bin/workload/servers.rs b/src/bin/workload/servers.rs new file mode 100644 index 00000000..09b9d5a7 --- /dev/null +++ b/src/bin/workload/servers.rs @@ -0,0 +1,106 @@ +use std::{future::Future, sync::Arc}; + +use neatworks::{ + crypto::{Crypto, CryptoFlavor}, + event::{ + task::{erase::Of, run, run_with_schedule, run_worker, ScheduleState}, + Erase, Untyped, + }, + net::{combinators::IndexNet, task::udp}, + pbft, unreplicated, + workload::Null, +}; +use tokio::{net::UdpSocket, select, signal::ctrl_c, sync::mpsc::unbounded_channel}; + +pub async fn unreplicated() -> anyhow::Result<()> { + let socket = Arc::new(UdpSocket::bind("localhost:3000").await?); + let (sender, mut receiver) = unbounded_channel(); + + let net_task = udp::run( + &socket, + unreplicated::codec::server_decode(Erase::new(sender)), + ); + let mut context = unreplicated::context::Server { + net: unreplicated::codec::server_encode(socket.clone()), + }; + let server_task = run( + Untyped::new(unreplicated::ServerState::new(Null)), + &mut context, + &mut receiver, + ); + run_until_interrupted(async { + select! { + result = net_task => result, + result = server_task => result, + } + }) + .await +} + +pub async fn pbft(config: pbft::PublicParameters, index: usize) -> anyhow::Result<()> { + let socket = Arc::new(UdpSocket::bind("localhost:3000").await?); + + let (crypto_sender, mut crypto_receiver) = unbounded_channel(); + let (schedule_sender, mut schedule_receiver) = unbounded_channel(); + let (sender, mut receiver) = unbounded_channel(); + + let mut context = pbft::replica::context::Context::<_, _, Of<_>, _> { + // TODO + peer_net: pbft::messages::codec::to_replica_encode(IndexNet::new( + vec![], + index, + socket.clone(), + )), + downlink_net: pbft::messages::codec::to_client_encode(socket.clone()), + crypto_worker: crypto_sender, + schedule: Erase::new(ScheduleState::new(schedule_sender)), + _m: Default::default(), + }; + let server_task = run_with_schedule( + Untyped::new(pbft::replica::State::new(index as _, Null, config.clone())), + &mut context, + &mut receiver, + &mut schedule_receiver, + |context| &mut context.schedule, + ); + let net_task = udp::run( + &socket, + pbft::messages::codec::to_replica_decode(Erase::new(sender.clone())), + ); + let crypto = Crypto::new_hardcoded(config.num_replica, index, CryptoFlavor::Schnorrkel)?; + let crypto_task = run_worker(crypto, Erase::new(sender), &mut crypto_receiver); + + run_until_interrupted(async { + select! { + result = server_task => result, + result = net_task => result, + result = crypto_task => result, + } + }) + .await +} + +async fn run_until( + task: impl Future>, + background_task: impl Future>, +) -> anyhow::Result<()> { + select! { + result = background_task => result?, + result = task => return result, + } + anyhow::bail!("unexpected termination of forever task") +} + +async fn run_until_interrupted( + task: impl Future>, +) -> anyhow::Result<()> { + run_until( + async { + ctrl_c().await?; + println!(); + anyhow::Ok(()) + }, + task, + ) + .await +} diff --git a/src/codec.rs b/src/codec.rs index fabb3196..562ed928 100644 --- a/src/codec.rs +++ b/src/codec.rs @@ -11,9 +11,9 @@ use crate::{ pub struct Encode(fn(&M) -> anyhow::Result, pub T); -impl>, A> SendEvent> for Encode { +impl, L, N: SendEvent>, A> SendEvent> for Encode { fn send(&mut self, Send(remote, message): Send) -> anyhow::Result<()> { - let encoded = (self.0)(&message)?; + let encoded = (self.0)(&message.into())?; self.1.send(Send(remote, encoded)) } } diff --git a/src/event.rs b/src/event.rs index f2b5a68c..87c5b943 100644 --- a/src/event.rs +++ b/src/event.rs @@ -75,7 +75,7 @@ pub trait OnErasedEvent { fn on_event(&mut self, event: M, context: &mut C) -> anyhow::Result<()>; } -#[derive_where(Debug; E)] +#[derive_where(Debug, Clone; E)] #[derive(Deref, DerefMut)] pub struct Erase( #[deref] diff --git a/src/event/task.rs b/src/event/task.rs index 968490cb..e3a7dd9d 100644 --- a/src/event/task.rs +++ b/src/event/task.rs @@ -13,7 +13,9 @@ use super::{work, OnEvent, ScheduleEvent, SendEvent, TimerId}; pub mod erase { use crate::event::{Erase, ErasedEvent}; - pub struct ScheduleOf(std::marker::PhantomData); + pub struct Of(std::marker::PhantomData); + + pub type Sender = Erase>>; pub type ScheduleState = Erase>>; } diff --git a/src/pbft/messages.rs b/src/pbft/messages.rs index 359fa4bf..ffa4aa20 100644 --- a/src/pbft/messages.rs +++ b/src/pbft/messages.rs @@ -114,7 +114,7 @@ pub mod codec { Encode::bincode(net) } - pub fn to_server_decode<'a, A: Addr>( + pub fn to_replica_decode<'a, A: Addr>( mut sender: impl SendEvent>> + SendEvent, Vec>)>> + SendEvent>> diff --git a/src/pbft/replica.rs b/src/pbft/replica.rs index 68b4e4d5..a901bcac 100644 --- a/src/pbft/replica.rs +++ b/src/pbft/replica.rs @@ -1162,29 +1162,28 @@ impl> OnErasedEvent, C> f } pub mod context { + use std::marker::PhantomData; + use super::*; - pub struct Context, T: ScheduleOn> { + pub struct Context, S> { pub peer_net: PN, pub downlink_net: DN, - pub crypto_worker: CW::Out, - pub schedule: T::Out, - } - - pub trait CryptoWorkerOn { - type Out: Submit; - type Context; + pub crypto_worker: O::CryptoWorker, + pub schedule: O::Schedule, + pub _m: PhantomData, } - pub trait ScheduleOn { - type Out: ScheduleEvent + pub trait On { + type CryptoWorker: Submit; + type CryptoContext: work::Upcall; + type Schedule: ScheduleEvent + ScheduleEvent + ScheduleEvent + ScheduleEvent; } - impl, T: ScheduleOn, S, A> super::Context - for Context + impl, S, A> super::Context for Context where PN: SendMessage> + SendMessage, Vec>)> @@ -1195,13 +1194,12 @@ pub mod context { + SendMessage + SendMessage>, DN: SendMessage, - CW::Context: work::Upcall, { type PeerNet = PN; type DownlinkNet = DN; - type CryptoWorker = CW::Out; - type CryptoContext = CW::Context; - type Schedule = T::Out; + type CryptoWorker = O::CryptoWorker; + type CryptoContext = O::CryptoContext; + type Schedule = O::Schedule; fn peer_net(&mut self) -> &mut Self::PeerNet { &mut self.peer_net } @@ -1215,4 +1213,34 @@ pub mod context { &mut self.schedule } } + + mod task { + use tokio::sync::mpsc::UnboundedSender; + + use crate::event::{ + task::erase::{Of, ScheduleState, Sender}, + ErasedEvent, + }; + + use super::*; + + impl On>, State> for Of> + where + PN: SendMessage> + + SendMessage, Vec>)> + + SendMessage> + + SendMessage> + + SendMessage> + + SendMessage> + + SendMessage + + SendMessage>, + DN: SendMessage, + S: App, + A: Addr, + { + type CryptoWorker = UnboundedSender>; + type CryptoContext = Sender, Context>>; + type Schedule = ScheduleState, Context>>; + } + } } diff --git a/src/unreplicated.rs b/src/unreplicated.rs index 18412bc6..cc49c3bf 100644 --- a/src/unreplicated.rs +++ b/src/unreplicated.rs @@ -160,24 +160,24 @@ impl> OnErasedEvent>, C> for Serv pub mod context { use super::*; - pub struct Client> { + pub struct Client> { pub net: N, pub upcall: U, - pub schedule: T::Out, + pub schedule: O::Schedule, } - pub trait ClientScheduleOn { - type Out: ScheduleEvent; + pub trait On { + type Schedule: ScheduleEvent; } - impl, A> ClientContext for Client + impl, A> ClientContext for Client where N: SendEvent>>, U: SendEvent>, { type Net = N; type Upcall = U; - type Schedule = >::Out; + type Schedule = O::Schedule; fn net(&mut self) -> &mut Self::Net { &mut self.net } @@ -204,16 +204,16 @@ pub mod context { } mod task { - use crate::event::task::erase::{ScheduleOf, ScheduleState}; + use crate::event::task::erase::{Of, ScheduleState}; use super::*; - impl ClientScheduleOn> for ScheduleOf> + impl On> for Of> where N: SendEvent>>, U: SendEvent>, { - type Out = ScheduleState, Client>; + type Schedule = ScheduleState, Client>; } } }