Skip to content

Commit

Permalink
Add workload binaries and fix context
Browse files Browse the repository at this point in the history
  • Loading branch information
sgdxbc committed Jul 6, 2024
1 parent 87e60ce commit fb20b4f
Show file tree
Hide file tree
Showing 10 changed files with 180 additions and 32 deletions.
4 changes: 2 additions & 2 deletions examples/unreplicated-run-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use neatworks::{
codec::Payload,
event::{
task::{erase::ScheduleOf, run_with_schedule, ScheduleState},
task::{erase::Of, run_with_schedule, ScheduleState},
Erase, SendEvent, Untyped,
},
net::{combinators::Forward, task::udp},
Expand All @@ -28,7 +28,7 @@ async fn main() -> anyhow::Result<()> {
unreplicated::codec::client_decode(Erase::new(sender.clone())),
);

let mut context = unreplicated::context::Client::<_, _, ScheduleOf<_>> {
let mut context = unreplicated::context::Client::<_, _, Of<_>> {
net: unreplicated::codec::client_encode(Forward(
([127, 0, 0, 1], 3000).into(),
socket.clone(),
Expand Down
4 changes: 4 additions & 0 deletions src/bin/workload-standalone.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
Ok(())
}
8 changes: 8 additions & 0 deletions src/bin/workload.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
mod workload {
mod servers;
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
Ok(())
}
106 changes: 106 additions & 0 deletions src/bin/workload/servers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
use std::{future::Future, sync::Arc};

use neatworks::{
crypto::{Crypto, CryptoFlavor},
event::{
task::{erase::Of, run, run_with_schedule, run_worker, ScheduleState},
Erase, Untyped,
},
net::{combinators::IndexNet, task::udp},
pbft, unreplicated,
workload::Null,
};
use tokio::{net::UdpSocket, select, signal::ctrl_c, sync::mpsc::unbounded_channel};

pub async fn unreplicated() -> anyhow::Result<()> {
let socket = Arc::new(UdpSocket::bind("localhost:3000").await?);
let (sender, mut receiver) = unbounded_channel();

let net_task = udp::run(
&socket,
unreplicated::codec::server_decode(Erase::new(sender)),
);
let mut context = unreplicated::context::Server {
net: unreplicated::codec::server_encode(socket.clone()),
};
let server_task = run(
Untyped::new(unreplicated::ServerState::new(Null)),
&mut context,
&mut receiver,
);
run_until_interrupted(async {
select! {
result = net_task => result,
result = server_task => result,
}
})
.await
}

pub async fn pbft(config: pbft::PublicParameters, index: usize) -> anyhow::Result<()> {
let socket = Arc::new(UdpSocket::bind("localhost:3000").await?);

let (crypto_sender, mut crypto_receiver) = unbounded_channel();
let (schedule_sender, mut schedule_receiver) = unbounded_channel();
let (sender, mut receiver) = unbounded_channel();

let mut context = pbft::replica::context::Context::<_, _, Of<_>, _> {
// TODO
peer_net: pbft::messages::codec::to_replica_encode(IndexNet::new(
vec![],
index,
socket.clone(),
)),
downlink_net: pbft::messages::codec::to_client_encode(socket.clone()),
crypto_worker: crypto_sender,
schedule: Erase::new(ScheduleState::new(schedule_sender)),
_m: Default::default(),
};
let server_task = run_with_schedule(
Untyped::new(pbft::replica::State::new(index as _, Null, config.clone())),
&mut context,
&mut receiver,
&mut schedule_receiver,
|context| &mut context.schedule,
);
let net_task = udp::run(
&socket,
pbft::messages::codec::to_replica_decode(Erase::new(sender.clone())),
);
let crypto = Crypto::new_hardcoded(config.num_replica, index, CryptoFlavor::Schnorrkel)?;
let crypto_task = run_worker(crypto, Erase::new(sender), &mut crypto_receiver);

run_until_interrupted(async {
select! {
result = server_task => result,
result = net_task => result,
result = crypto_task => result,
}
})
.await
}

async fn run_until(
task: impl Future<Output = anyhow::Result<()>>,
background_task: impl Future<Output = anyhow::Result<()>>,
) -> anyhow::Result<()> {
select! {
result = background_task => result?,
result = task => return result,
}
anyhow::bail!("unexpected termination of forever task")
}

async fn run_until_interrupted(
task: impl Future<Output = anyhow::Result<()>>,
) -> anyhow::Result<()> {
run_until(
async {
ctrl_c().await?;
println!();
anyhow::Ok(())
},
task,
)
.await
}
4 changes: 2 additions & 2 deletions src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use crate::{

pub struct Encode<M, T>(fn(&M) -> anyhow::Result<Bytes>, pub T);

impl<M, N: SendEvent<Send<A, Bytes>>, A> SendEvent<Send<A, M>> for Encode<M, N> {
impl<M: Into<L>, L, N: SendEvent<Send<A, Bytes>>, A> SendEvent<Send<A, M>> for Encode<L, N> {
fn send(&mut self, Send(remote, message): Send<A, M>) -> anyhow::Result<()> {
let encoded = (self.0)(&message)?;
let encoded = (self.0)(&message.into())?;
self.1.send(Send(remote, encoded))
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub trait OnErasedEvent<M, C> {
fn on_event(&mut self, event: M, context: &mut C) -> anyhow::Result<()>;
}

#[derive_where(Debug; E)]
#[derive_where(Debug, Clone; E)]
#[derive(Deref, DerefMut)]
pub struct Erase<S, C, E>(
#[deref]
Expand Down
4 changes: 3 additions & 1 deletion src/event/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use super::{work, OnEvent, ScheduleEvent, SendEvent, TimerId};
pub mod erase {
use crate::event::{Erase, ErasedEvent};

pub struct ScheduleOf<S>(std::marker::PhantomData<S>);
pub struct Of<S>(std::marker::PhantomData<S>);

pub type Sender<S, C> = Erase<S, C, super::UnboundedSender<ErasedEvent<S, C>>>;

pub type ScheduleState<S, C> = Erase<S, C, super::ScheduleState<ErasedEvent<S, C>>>;
}
Expand Down
2 changes: 1 addition & 1 deletion src/pbft/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ pub mod codec {
Encode::bincode(net)
}

pub fn to_server_decode<'a, A: Addr>(
pub fn to_replica_decode<'a, A: Addr>(
mut sender: impl SendEvent<Recv<Request<A>>>
+ SendEvent<Recv<(Verifiable<PrePrepare>, Vec<Request<A>>)>>
+ SendEvent<Recv<Verifiable<Prepare>>>
Expand Down
60 changes: 44 additions & 16 deletions src/pbft/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1162,29 +1162,28 @@ impl<S: App, A: Addr, C: Context<Self, A>> OnErasedEvent<Verified<NewView>, C> f
}

pub mod context {
use std::marker::PhantomData;

use super::*;

pub struct Context<PN, DN, CW: CryptoWorkerOn<Self>, T: ScheduleOn<Self>> {
pub struct Context<PN, DN, O: On<Self, S>, S> {
pub peer_net: PN,
pub downlink_net: DN,
pub crypto_worker: CW::Out,
pub schedule: T::Out,
}

pub trait CryptoWorkerOn<C> {
type Out: Submit<Crypto, Self::Context>;
type Context;
pub crypto_worker: O::CryptoWorker,
pub schedule: O::Schedule,
pub _m: PhantomData<S>,
}

pub trait ScheduleOn<C> {
type Out: ScheduleEvent<events::DoViewChange>
pub trait On<C, S> {
type CryptoWorker: Submit<Crypto, Self::CryptoContext>;
type CryptoContext: work::Upcall<S, C>;
type Schedule: ScheduleEvent<events::DoViewChange>
+ ScheduleEvent<events::ProgressPrepare>
+ ScheduleEvent<events::ProgressViewChange>
+ ScheduleEvent<events::StateTransfer>;
}

impl<PN, DN, CW: CryptoWorkerOn<Self>, T: ScheduleOn<Self>, S, A> super::Context<S, A>
for Context<PN, DN, CW, T>
impl<PN, DN, O: On<Self, S>, S, A> super::Context<S, A> for Context<PN, DN, O, S>
where
PN: SendMessage<u8, Request<A>>
+ SendMessage<All, (Verifiable<PrePrepare>, Vec<Request<A>>)>
Expand All @@ -1195,13 +1194,12 @@ pub mod context {
+ SendMessage<u8, QueryNewView>
+ SendMessage<u8, Verifiable<NewView>>,
DN: SendMessage<A, Reply>,
CW::Context: work::Upcall<S, Self>,
{
type PeerNet = PN;
type DownlinkNet = DN;
type CryptoWorker = CW::Out;
type CryptoContext = CW::Context;
type Schedule = T::Out;
type CryptoWorker = O::CryptoWorker;
type CryptoContext = O::CryptoContext;
type Schedule = O::Schedule;
fn peer_net(&mut self) -> &mut Self::PeerNet {
&mut self.peer_net
}
Expand All @@ -1215,4 +1213,34 @@ pub mod context {
&mut self.schedule
}
}

mod task {
use tokio::sync::mpsc::UnboundedSender;

use crate::event::{
task::erase::{Of, ScheduleState, Sender},
ErasedEvent,
};

use super::*;

impl<PN, DN, S, A> On<Context<PN, DN, Self, State<S, A>>, State<S, A>> for Of<State<S, A>>
where
PN: SendMessage<u8, Request<A>>
+ SendMessage<All, (Verifiable<PrePrepare>, Vec<Request<A>>)>
+ SendMessage<All, Verifiable<Prepare>>
+ SendMessage<All, Verifiable<Commit>>
+ SendMessage<All, Verifiable<ViewChange>>
+ SendMessage<All, Verifiable<NewView>>
+ SendMessage<u8, QueryNewView>
+ SendMessage<u8, Verifiable<NewView>>,
DN: SendMessage<A, Reply>,
S: App,
A: Addr,
{
type CryptoWorker = UnboundedSender<ErasedEvent<Crypto, Self::CryptoContext>>;
type CryptoContext = Sender<State<S, A>, Context<PN, DN, Self, State<S, A>>>;
type Schedule = ScheduleState<State<S, A>, Context<PN, DN, Self, State<S, A>>>;
}
}
}
18 changes: 9 additions & 9 deletions src/unreplicated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,24 +160,24 @@ impl<S: App, A, C: ServerContext<A>> OnErasedEvent<Recv<Request<A>>, C> for Serv
pub mod context {
use super::*;

pub struct Client<N, U, T: ClientScheduleOn<Self>> {
pub struct Client<N, U, O: On<Self>> {
pub net: N,
pub upcall: U,
pub schedule: T::Out,
pub schedule: O::Schedule,
}

pub trait ClientScheduleOn<C> {
type Out: ScheduleEvent<client::Resend>;
pub trait On<C> {
type Schedule: ScheduleEvent<client::Resend>;
}

impl<N, U, T: ClientScheduleOn<Self>, A> ClientContext<A> for Client<N, U, T>
impl<N, U, O: On<Self>, A> ClientContext<A> for Client<N, U, O>
where
N: SendEvent<Send<(), Request<A>>>,
U: SendEvent<InvokeOk<Payload>>,
{
type Net = N;
type Upcall = U;
type Schedule = <T as ClientScheduleOn<Self>>::Out;
type Schedule = O::Schedule;
fn net(&mut self) -> &mut Self::Net {
&mut self.net
}
Expand All @@ -204,16 +204,16 @@ pub mod context {
}

mod task {
use crate::event::task::erase::{ScheduleOf, ScheduleState};
use crate::event::task::erase::{Of, ScheduleState};

use super::*;

impl<N, U, A: Addr> ClientScheduleOn<Client<N, U, Self>> for ScheduleOf<ClientState<A>>
impl<N, U, A: Addr> On<Client<N, U, Self>> for Of<ClientState<A>>
where
N: SendEvent<Send<(), Request<A>>>,
U: SendEvent<InvokeOk<Payload>>,
{
type Out = ScheduleState<ClientState<A>, Client<N, U, Self>>;
type Schedule = ScheduleState<ClientState<A>, Client<N, U, Self>>;
}
}
}
Expand Down

0 comments on commit fb20b4f

Please sign in to comment.