From 2c9cf6f78d9bb2f5baec44860ce4e29df34a36ec Mon Sep 17 00:00:00 2001 From: sgdxbc Date: Sun, 7 Jul 2024 11:04:18 +0800 Subject: [PATCH] Revise context design --- src/bin/workload/clients.rs | 8 +-- src/bin/workload/servers.rs | 4 +- src/codec.rs | 8 +-- src/event.rs | 14 ++++++ src/event/task.rs | 18 +++++-- src/model.rs | 87 ++++++++++++++++++-------------- src/net.rs | 8 +-- src/net/combinators.rs | 20 ++++---- src/net/task/udp.rs | 6 +-- src/pbft/client.rs | 13 +++-- src/pbft/replica.rs | 4 +- src/unreplicated.rs | 98 ++++++++++++++++--------------------- 12 files changed, 161 insertions(+), 127 deletions(-) diff --git a/src/bin/workload/clients.rs b/src/bin/workload/clients.rs index 8d4c77bb..4c5aad88 100644 --- a/src/bin/workload/clients.rs +++ b/src/bin/workload/clients.rs @@ -3,7 +3,7 @@ use std::{future::Future, net::SocketAddr, sync::Arc}; use bytes::Bytes; use neatworks::{ event::{ - task::{run_with_schedule, ContextOf, ScheduleState}, + task::{run_with_schedule, Context, ScheduleState}, Erase, SendEvent, Untyped, }, net::{ @@ -43,13 +43,14 @@ pub async fn unreplicated(invoke_task: impl InvokeTask) -> anyhow::Result<()> { unreplicated::codec::client_decode(Erase::new(sender.clone())), ); - let mut context = unreplicated::context::Client::, _, _> { + let mut context = unreplicated::context::Client:: { net: unreplicated::codec::client_encode(Forward( ([127, 0, 0, 1], 3000).into(), socket.clone(), )), upcall: upcall_sender, schedule: Erase::new(ScheduleState::new(schedule_sender)), + _m: Default::default(), }; let client_task = run_with_schedule( Untyped::new(unreplicated::ClientState::new(random(), addr)), @@ -87,7 +88,7 @@ pub async fn pbft( pbft::messages::codec::to_client_decode(Erase::new(sender.clone())), ); - let mut context = pbft::client::context::Context::, _, _> { + let mut context = pbft::client::context::Context:: { net: pbft::messages::codec::to_replica_encode(IndexNet::new( replica_addrs, None, @@ -95,6 +96,7 @@ pub async fn pbft( )), upcall: upcall_sender, schedule: Erase::new(ScheduleState::new(schedule_sender)), + _m: Default::default(), }; let client_task = run_with_schedule( Untyped::new(pbft::client::State::new(random(), addr, config)), diff --git a/src/bin/workload/servers.rs b/src/bin/workload/servers.rs index 703de709..913d4ff2 100644 --- a/src/bin/workload/servers.rs +++ b/src/bin/workload/servers.rs @@ -3,7 +3,7 @@ use std::{net::SocketAddr, sync::Arc}; use neatworks::{ crypto::{Crypto, CryptoFlavor}, event::{ - task::{run, run_with_schedule, run_worker, ContextOf, ScheduleState}, + task::{run, run_with_schedule, run_worker, Context, ScheduleState}, Erase, Untyped, }, net::{combinators::IndexNet, task::udp}, @@ -46,7 +46,7 @@ pub async fn pbft( let (schedule_sender, mut schedule_receiver) = unbounded_channel(); let (sender, mut receiver) = unbounded_channel(); - let mut context = pbft::replica::context::Context::, _, _, _> { + let mut context = pbft::replica::context::Context:: { peer_net: pbft::messages::codec::to_replica_encode(IndexNet::new( addrs, index, diff --git a/src/codec.rs b/src/codec.rs index 1bdaea7d..7d98830f 100644 --- a/src/codec.rs +++ b/src/codec.rs @@ -5,16 +5,16 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use crate::{ event::SendEvent, - net::events::Send, + net::events::Cast, workload::{events::InvokeOk, Typed}, }; pub struct Encode(fn(&M) -> anyhow::Result, pub T); -impl, L, N: SendEvent>, A> SendEvent> for Encode { - fn send(&mut self, Send(remote, message): Send) -> anyhow::Result<()> { +impl, L, N: SendEvent>, A> SendEvent> for Encode { + fn send(&mut self, Cast(remote, message): Cast) -> anyhow::Result<()> { let encoded = (self.0)(&message.into())?; - self.1.send(Send(remote, encoded)) + self.1.send(Cast(remote, encoded)) } } diff --git a/src/event.rs b/src/event.rs index 34c6ada1..135aaade 100644 --- a/src/event.rs +++ b/src/event.rs @@ -46,6 +46,20 @@ pub trait ScheduleEvent { fn unset(&mut self, id: TimerId) -> anyhow::Result<()>; } +impl, M> ScheduleEvent for &mut T { + fn set( + &mut self, + period: Duration, + event: impl FnMut() -> M + Send + 'static, + ) -> anyhow::Result { + T::set(self, period, event) + } + + fn unset(&mut self, id: TimerId) -> anyhow::Result<()> { + T::unset(self, id) + } +} + #[derive_where(Debug; S)] #[derive(Deref, DerefMut)] pub struct Untyped( diff --git a/src/event/task.rs b/src/event/task.rs index 9f312dc3..bfafd359 100644 --- a/src/event/task.rs +++ b/src/event/task.rs @@ -21,14 +21,21 @@ pub mod erase { // a "stub", or marker type that indicates the task based context is being used // in a context type that refers to itself (on type level, not memory reference) // -// the private PhantomData<_> prevents it from being constructed anywhere +// the private PhantomData<()> prevents it from being constructed anywhere // outside. and it indeed should not be ever constructed; it only shows up in // type annotations, as a "placeholder" to take place for the actual generics // that would refer the context's own type and cannot be written out directly // anywhere outside the context definition // // the marker type seems to have no implementation. ideally it should have -// several implementation "blanket over" the generic state e.g. for schedule +// several implementation "blanket over" a generic state e.g. for schedule +// +// struct ContextOf(PhantomData) // the desired form of `Context` +// +// trait On { +// type Out; +// } +// // impl On for ContextOf // where // /* whatever bounds the state and context */ @@ -55,7 +62,12 @@ pub mod erase { // as the result, the `impl`s of `ContextOf<_>` all lives in the use sites and // are for some specialization. but in sprite those `impl`s are together // recovering the necessary part of the blanket above -pub struct ContextOf(PhantomData); +// +// since the `impl`s are becoming specialized, it is unnecessary for this marker +// to be generic over state (or anything). the (becoming unnecessary) +// PhantomData<_> is saved to continue preventing `Context` value being +// constructed, which is still desired despiting the workaround +pub struct Context(PhantomData<()>); impl, N> SendEvent for UnboundedSender { fn send(&mut self, event: M) -> anyhow::Result<()> { diff --git a/src/model.rs b/src/model.rs index 664152d2..3d8b80c5 100644 --- a/src/model.rs +++ b/src/model.rs @@ -1,8 +1,10 @@ use std::{collections::BTreeMap, fmt::Debug, time::Duration}; +use derive_where::derive_where; + use crate::{ - event::{SendEvent, TimerId}, - net::events::Send, + event::{ScheduleEvent, SendEvent, TimerId}, + net::events::Cast, }; pub trait State { @@ -17,55 +19,66 @@ pub trait State { } } -#[derive(Debug, PartialEq, Eq, Hash, Default)] -pub struct Network { - pub messages: BTreeMap>, - pub timers: BTreeMap>, // TODO - pub timer_count: u32, - pub now: Duration, +#[derive(Debug, PartialEq, Eq, Hash)] +#[derive_where(Default)] +pub struct TimerState { + envelops: Vec>, + count: u32, } -#[derive(Debug)] -pub struct NetworkContext<'a, A, M, T> { - state: &'a mut Network, - pub addr: A, +#[derive_where(Debug, PartialEq, Eq, Hash; M)] +struct TimerEnvelop { + id: u32, + #[derive_where(skip)] + generate: Box M + Send>, + period: Duration, + event: M, } -impl Network { - pub fn context(&mut self, addr: A) -> NetworkContext<'_, A, M, T> { - NetworkContext { state: self, addr } +impl TimerState { + pub fn new() -> Self { + Self::default() } } -impl, N, T> SendEvent> for NetworkContext<'_, A, N, T> { - fn send(&mut self, Send(remote, message): Send) -> anyhow::Result<()> { - let Some(inbox) = self.state.messages.get_mut(&remote) else { - anyhow::bail!("missing inbox for addr {remote:?}") +impl, N> ScheduleEvent for TimerState { + fn set( + &mut self, + period: Duration, + mut event: impl FnMut() -> M + Send + 'static, + ) -> anyhow::Result { + self.count += 1; + let id = self.count; + let envelop = TimerEnvelop { + id, + event: event().into(), + generate: Box::new(move || event().into()), + period, }; - inbox.push(message.into()); - Ok(()) + self.envelops.push(envelop); + Ok(TimerId(id)) } -} -impl NetworkContext<'_, A, M, T> { - pub fn set(&mut self, period: Duration, event: T) -> anyhow::Result { - let Some(inbox) = self.state.timers.get_mut(&self.addr) else { - anyhow::bail!("missing inbox for addr {:?}", self.addr) + fn unset(&mut self, TimerId(id): TimerId) -> anyhow::Result<()> { + let Some(pos) = self.envelops.iter().position(|envelop| envelop.id == id) else { + anyhow::bail!("missing timer of {:?}", TimerId(id)) }; - self.state.timer_count += 1; - let id = self.state.timer_count; - inbox.push((id, self.state.now + period, event)); - Ok(TimerId(id)) + self.envelops.remove(pos); + Ok(()) } +} - pub fn unset(&mut self, TimerId(id): TimerId) -> anyhow::Result<()> { - let Some(inbox) = self.state.timers.get_mut(&self.addr) else { - anyhow::bail!("missing inbox for addr {:?}", self.addr) - }; - let Some(pos) = inbox.iter().position(|(other_id, _, _)| *other_id == id) else { - anyhow::bail!("missing timer {:?}", TimerId(id)) +#[derive(Debug, PartialEq, Eq, Hash, Default)] +pub struct NetworkState { + pub messages: BTreeMap>, +} + +impl, N> SendEvent> for NetworkState { + fn send(&mut self, Cast(remote, message): Cast) -> anyhow::Result<()> { + let Some(inbox) = self.messages.get_mut(&remote) else { + anyhow::bail!("missing inbox for addr {remote:?}") }; - inbox.remove(pos); + inbox.push(message.into()); Ok(()) } } diff --git a/src/net.rs b/src/net.rs index ed0ceea4..8d801382 100644 --- a/src/net.rs +++ b/src/net.rs @@ -11,7 +11,9 @@ pub mod task { } pub mod events { - pub struct Send(pub A, pub M); + // probably called `Send` in any sane codebase, but that terribly conflicts with + // std::marker::Send + pub struct Cast(pub A, pub M); pub struct Recv(pub M); } @@ -20,9 +22,9 @@ pub trait SendMessage { fn send(&mut self, remote: A, message: M) -> anyhow::Result<()>; } -impl>, A, M> SendMessage for E { +impl>, A, M> SendMessage for E { fn send(&mut self, remote: A, message: M) -> anyhow::Result<()> { - SendEvent::send(self, events::Send(remote, message)) + SendEvent::send(self, events::Cast(remote, message)) } } diff --git a/src/net/combinators.rs b/src/net/combinators.rs index 96a6dcc2..ca1ad973 100644 --- a/src/net/combinators.rs +++ b/src/net/combinators.rs @@ -2,14 +2,14 @@ use bytes::Bytes; use crate::event::SendEvent; -use super::{events::Send, Addr}; +use super::{events::Cast, Addr}; #[derive(Debug)] pub struct Forward(pub A, pub N); -impl>, M> SendEvent> for Forward { - fn send(&mut self, Send((), message): Send<(), M>) -> anyhow::Result<()> { - self.1.send(Send(self.0.clone(), message)) +impl>, M> SendEvent> for Forward { + fn send(&mut self, Cast((), message): Cast<(), M>) -> anyhow::Result<()> { + self.1.send(Cast(self.0.clone(), message)) } } @@ -33,26 +33,26 @@ impl IndexNet { } } -impl>, M, I: Into> SendEvent> +impl>, M, I: Into> SendEvent> for IndexNet { - fn send(&mut self, Send(index, message): Send) -> anyhow::Result<()> { + fn send(&mut self, Cast(index, message): Cast) -> anyhow::Result<()> { let index = index.into(); let addr = self .addrs .get(index) .ok_or(anyhow::format_err!("missing address of index {index}"))?; - self.inner.send(Send(addr.clone(), message)) + self.inner.send(Cast(addr.clone(), message)) } } -impl>> SendEvent> for IndexNet { - fn send(&mut self, Send(All, message): Send) -> anyhow::Result<()> { +impl>> SendEvent> for IndexNet { + fn send(&mut self, Cast(All, message): Cast) -> anyhow::Result<()> { for (index, addr) in self.addrs.iter().enumerate() { if Some(index) == self.all_except { continue; } - self.inner.send(Send(addr.clone(), message.clone()))? + self.inner.send(Cast(addr.clone(), message.clone()))? } Ok(()) } diff --git a/src/net/task/udp.rs b/src/net/task/udp.rs index a0b98444..12beef06 100644 --- a/src/net/task/udp.rs +++ b/src/net/task/udp.rs @@ -3,10 +3,10 @@ use std::{net::SocketAddr, sync::Arc}; use bytes::Bytes; use tokio::{net::UdpSocket, spawn}; -use crate::{event::SendEvent, net::events::Send}; +use crate::{event::SendEvent, net::events::Cast}; -impl SendEvent> for Arc { - fn send(&mut self, Send(remote, message): Send) -> anyhow::Result<()> { +impl SendEvent> for Arc { + fn send(&mut self, Cast(remote, message): Cast) -> anyhow::Result<()> { let socket = self.clone(); spawn(async move { if socket.send_to(&message, remote).await.is_err() { diff --git a/src/pbft/client.rs b/src/pbft/client.rs index 4e42c353..be9f5de9 100644 --- a/src/pbft/client.rs +++ b/src/pbft/client.rs @@ -131,19 +131,22 @@ impl State { } pub mod context { + use std::marker::PhantomData; + use super::*; - pub struct Context, N, U> { + pub struct Context, N, U, A> { pub net: N, pub upcall: U, pub schedule: O::Schedule, + pub _m: PhantomData, } pub trait On { type Schedule: ScheduleEvent; } - impl, N, U, A> super::Context for Context + impl, N, U, A> super::Context for Context where N: SendMessage> + SendMessage>, U: SendEvent>, @@ -163,16 +166,16 @@ pub mod context { } mod task { - use crate::event::task::{erase::ScheduleState, ContextOf}; + use crate::event::task::{erase::ScheduleState, Context as Task}; use super::*; - impl On> for ContextOf> + impl On> for Task where N: SendMessage> + SendMessage>, U: SendEvent>, { - type Schedule = ScheduleState, Context>; + type Schedule = ScheduleState, Context>; } } } diff --git a/src/pbft/replica.rs b/src/pbft/replica.rs index 872667e7..f7ff4ac9 100644 --- a/src/pbft/replica.rs +++ b/src/pbft/replica.rs @@ -1217,7 +1217,7 @@ pub mod context { use crate::event::{ task::{ erase::{ScheduleState, Sender}, - ContextOf, + Context as Task, }, UntypedEvent, }; @@ -1230,7 +1230,7 @@ pub mod context { // all (allowed) combinations of different runtimes // i don't see any possibility for now to run the protocol with any setup other than fully // supported by tasks, so this is good enough - impl On>, State> for ContextOf> + impl On>, State> for Task where PN: SendMessage> + SendMessage, Vec>)> diff --git a/src/unreplicated.rs b/src/unreplicated.rs index f4bcc799..b2dfd274 100644 --- a/src/unreplicated.rs +++ b/src/unreplicated.rs @@ -7,7 +7,7 @@ use crate::{ codec::Payload, event::{OnErasedEvent, ScheduleEvent, SendEvent, TimerId}, net::{ - events::{Recv, Send}, + events::{Cast, Recv}, Addr, }, workload::{ @@ -61,7 +61,7 @@ pub mod client { } pub trait ClientContext { - type Net: SendEvent>>; + type Net: SendEvent>>; type Upcall: SendEvent>; type Schedule: ScheduleEvent; fn net(&mut self) -> &mut Self::Net; @@ -96,7 +96,7 @@ impl ClientState { .op .clone(), }; - context.net().send(Send((), request)) + context.net().send(Cast((), request)) } } @@ -137,7 +137,7 @@ impl ServerState { } pub trait ServerContext { - type Net: SendEvent>; + type Net: SendEvent>; fn net(&mut self) -> &mut Self::Net; } @@ -146,7 +146,7 @@ impl> OnErasedEvent>, C> for Serv match self.replies.get(&request.client_id) { Some(reply) if reply.seq > request.seq => return Ok(()), Some(reply) if reply.seq == request.seq => { - return context.net().send(Send(request.client_addr, reply.clone())) + return context.net().send(Cast(request.client_addr, reply.clone())) } _ => {} } @@ -155,26 +155,29 @@ impl> OnErasedEvent>, C> for Serv result: Payload(self.app.execute(&request.op)?), }; self.replies.insert(request.client_id, reply.clone()); - context.net().send(Send(request.client_addr, reply)) + context.net().send(Cast(request.client_addr, reply)) } } pub mod context { + use std::marker::PhantomData; + use super::*; - pub struct Client, N, U> { + pub struct Client, N, U, A> { pub net: N, pub upcall: U, pub schedule: O::Schedule, + pub _m: PhantomData, } pub trait On { type Schedule: ScheduleEvent; } - impl, N, U, A> super::ClientContext for Client + impl, N, U, A> super::ClientContext for Client where - N: SendEvent>>, + N: SendEvent>>, U: SendEvent>, { type Net = N; @@ -197,7 +200,7 @@ pub mod context { impl ServerContext for Server where - N: SendEvent>, + N: SendEvent>, { type Net = N; fn net(&mut self) -> &mut Self::Net { @@ -206,16 +209,16 @@ pub mod context { } mod task { - use crate::event::task::{erase::ScheduleState, ContextOf}; + use crate::event::task::{erase::ScheduleState, Context}; use super::*; - impl On> for ContextOf> + impl On> for Context where - N: SendEvent>>, + N: SendEvent>>, U: SendEvent>, { - type Schedule = ScheduleState, Client>; + type Schedule = ScheduleState, Client>; } } } @@ -251,7 +254,7 @@ pub mod model { use crate::{ event::combinators::Transient, - model::{Network, NetworkContext}, + model::{NetworkState, TimerState}, workload::app::kvstore::KVStore, }; @@ -276,59 +279,44 @@ pub mod model { ClientResend, } + impl From for Timer { + fn from(client::Resend: client::Resend) -> Self { + Self::ClientResend + } + } + pub struct State { clients: Vec>, - client_upcalls: Vec>>, server: ServerState, - network: Network, + client_contexts: Vec, + network: NetworkState, } - impl super::ClientContext for NetworkContext<'_, Addr, Message, Timer> { - type Net = Self; - type Schedule = Self; - type Upcall = Self; - fn net(&mut self) -> &mut Self::Net { - self - } - fn schedule(&mut self) -> &mut Self::Schedule { - self - } - fn upcall(&mut self) -> &mut Self::Upcall { - self - } - } - - impl SendEvent>> for NetworkContext<'_, Addr, Message, Timer> { - fn send(&mut self, Send((), message): Send<(), Request>) -> anyhow::Result<()> { - self.send(Send(Addr::Server, message)) - } + struct ClientLocalContext { + upcall: Transient>, + schedule: TimerState, } - impl ScheduleEvent for NetworkContext<'_, Addr, Message, Timer> { - fn set( - &mut self, - period: Duration, - _: impl FnMut() -> super::client::Resend + std::marker::Send + 'static, - ) -> anyhow::Result { - self.set(period, Timer::ClientResend) - } + struct ClientContextCarrier; - fn unset(&mut self, id: TimerId) -> anyhow::Result<()> { - NetworkContext::unset(self, id) - } + impl<'a> super::context::On> for ClientContextCarrier { + type Schedule = &'a mut TimerState; } - #[allow(unused)] - impl SendEvent> for NetworkContext<'_, Addr, Message, Timer> { - fn send(&mut self, event: InvokeOk) -> anyhow::Result<()> { - let Addr::Client(index) = self.addr else { - anyhow::bail!("unimplemented") - }; - Ok(()) + type ClientContext<'a> = super::context::Client< + ClientContextCarrier, + &'a mut NetworkState, + &'a mut Transient>, + Addr, + >; + + impl SendEvent>> for NetworkState { + fn send(&mut self, Cast((), message): Cast<(), Request>) -> anyhow::Result<()> { + self.send(Cast(Addr::Server, message)) } } - impl super::ServerContext for NetworkContext<'_, Addr, Message, Timer> { + impl ServerContext for NetworkState { type Net = Self; fn net(&mut self) -> &mut Self::Net { self