Skip to content

Commit

Permalink
Update boson cops control
Browse files Browse the repository at this point in the history
  • Loading branch information
sgdxbc committed Apr 17, 2024
1 parent 6439a3c commit d18307e
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 45 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 2 additions & 9 deletions src/app/ycsb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ use rand_distr::{WeightedAliasIndex, Zeta, Zipf};
use rustc_hash::FxHasher;
use serde::{Deserialize, Serialize};

use crate::workload;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Op {
Read(String),
Expand Down Expand Up @@ -177,7 +175,7 @@ impl WorkloadSettings {
}
}

#[derive(Clone)]
#[derive(Clone, derive_more::AsMut)]
pub struct Workload<R> {
rng: R,
settings: WorkloadSettings,
Expand All @@ -192,6 +190,7 @@ pub struct Workload<R> {

transaction_count: usize,
rmw_update: Option<Op>,
#[as_mut]
pub latencies: Vec<Duration>,
start: Option<Instant>,
}
Expand Down Expand Up @@ -461,12 +460,6 @@ impl<R: Rng> crate::workload::Workload for Workload<R> {
}
}

impl<R> From<workload::Json<Workload<R>>> for Vec<Duration> {
fn from(value: workload::Json<Workload<R>>) -> Self {
value.0.latencies
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;
Expand Down
59 changes: 47 additions & 12 deletions src/bin/boson.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
mod boson_cops;
mod boson_mutex;

use std::{backtrace::BacktraceStatus, sync::Arc};
use std::{backtrace::BacktraceStatus, sync::Arc, time::Duration};

use axum::{
extract::State,
Expand All @@ -28,10 +28,12 @@ async fn main() -> anyhow::Result<()> {
let app = Router::new()
.route("/ok", get(ok))
.route("/mutex/start", post(mutex_start))
.route("/mutex/stop", post(mutex_stop))
.route("/mutex/stop", post(stop))
.route("/mutex/request", post(mutex_request))
.route("/cops/client/start", post(cops_client_start))
.route("/cops/server/start", post(cops_server_start))
.route("/cops/start-client", post(cops_start_client))
.route("/cops/poll-results", post(cops_poll_results))
.route("/cops/start-server", post(cops_start_server))
.route("/cops/stop-server", post(stop))
.with_state(AppState {
session: Default::default(),
});
Expand All @@ -54,7 +56,13 @@ struct AppSession {
handle: JoinHandle<anyhow::Result<()>>,
cancel: CancellationToken,
event_sender: UnboundedSender<boson_mutex::Event>,
upcall: UnboundedReceiver<augustus::lamport_mutex::events::RequestOk>,
upcall: UnboundedReceiver<Upcall>,
}

#[derive(derive_more::From)]
enum Upcall {
RequestOk(augustus::lamport_mutex::events::RequestOk),
ThroughputLatency(f32, Duration),
}

fn log_exit(err: anyhow::Error) -> StatusCode {
Expand Down Expand Up @@ -127,7 +135,7 @@ async fn mutex_start(
}
}

async fn mutex_stop(State(state): State<AppState>) -> StatusCode {
async fn stop(State(state): State<AppState>) -> StatusCode {
if let Err(err) = async {
let mut session = state.session.lock().await;
let Some(session) = session.take() else {
Expand All @@ -153,7 +161,8 @@ async fn mutex_request(State(state): State<AppState>) -> Response {
let start = Instant::now();
session.event_sender.send(boson_mutex::Event::Request)?;
// TODO timeout
session.upcall.recv().await;
let result = session.upcall.recv().await;
anyhow::ensure!(matches!(result, Some(Upcall::RequestOk(_))));
session.event_sender.send(boson_mutex::Event::Release)?;
Ok(Json(start.elapsed()))
};
Expand All @@ -163,20 +172,20 @@ async fn mutex_request(State(state): State<AppState>) -> Response {
}
}

async fn cops_client_start(
async fn cops_start_client(
State(state): State<AppState>,
Json(config): Json<boson_control_messages::CopsClient>,
) -> StatusCode {
if let Err(err) = async {
let mut session = state.session.lock().await;
anyhow::ensure!(session.is_none());
let (event_sender, _) = unbounded_channel();
let (_, upcall_receiver) = unbounded_channel();
let (upcall_sender, upcall_receiver) = unbounded_channel();
let cancel = CancellationToken::new();
use boson_control_messages::CopsVariant::*;
let handle = match &config.variant {
Untrusted => todo!(),
Replicated(_) => tokio::spawn(boson_cops::pbft_client_session(config)),
Replicated(_) => tokio::spawn(boson_cops::pbft_client_session(config, upcall_sender)),
};
*session = Some(AppSession {
handle,
Expand All @@ -194,7 +203,33 @@ async fn cops_client_start(
}
}

async fn cops_server_start(
async fn cops_poll_results(State(state): State<AppState>) -> Response {
let task = async {
let mut state_session = state.session.lock().await;
let Some(session) = state_session.as_mut() else {
anyhow::bail!("unimplemented")
};
if !session.upcall.is_closed() {
Ok(None)
} else {
let mut results = Vec::new();
while let Ok(result) = session.upcall.try_recv() {
let Upcall::ThroughputLatency(throughput, latency) = result else {
anyhow::bail!("unimplemented")
};
results.push((throughput, latency))
}
state_session.take().unwrap().handle.await??;
Ok(Some(results))
}
};
match task.await {
Ok(results) => Json(results).into_response(),
Err(err) => log_exit(err).into_response(),
}
}

async fn cops_start_server(
State(state): State<AppState>,
Json(config): Json<boson_control_messages::CopsServer>,
) -> StatusCode {
Expand Down Expand Up @@ -225,4 +260,4 @@ async fn cops_server_start(
}
}

// cSpell:words upcall lamport
// cSpell:words upcall lamport pbft
41 changes: 32 additions & 9 deletions src/bin/boson_cops/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::net::SocketAddr;
use std::{mem::take, net::SocketAddr, time::Duration};

use augustus::{
app::{self, ycsb, App},
crypto::{Crypto, CryptoFlavor},
event::{
self,
erased::{session::Sender, Blanket, Buffered, Session, Unify},
Once,
erased::{events::Init, session::Sender, Blanket, Buffered, Session, Unify},
Once, SendEvent,
},
net::{
dispatch,
Expand All @@ -20,10 +20,13 @@ use augustus::{
use boson_control_messages::{CopsClient, CopsServer, CopsVariant};
use rand::{rngs::StdRng, thread_rng, SeedableRng};
use rand_distr::Uniform;
use tokio::{net::TcpListener, task::JoinSet};
use tokio::{net::TcpListener, task::JoinSet, time::sleep};
use tokio_util::sync::CancellationToken;

pub async fn pbft_client_session(config: CopsClient) -> anyhow::Result<()> {
pub async fn pbft_client_session(
config: CopsClient,
upcall: impl Clone + SendEvent<(f32, Duration)> + Send + Sync + 'static,
) -> anyhow::Result<()> {
let CopsClient {
addrs,
ip,
Expand Down Expand Up @@ -86,13 +89,30 @@ pub async fn pbft_client_session(config: CopsClient) -> anyhow::Result<()> {
Json(workload),
)));

let mut upcall = upcall.clone();
sessions.spawn(async move {
let dispatch_session = dispatch_session.run(&mut dispatch);
let client_session = client_session.run(&mut client);
let close_loop_session = async move {
//
close_loop_session.run(&mut close_loop).await?;
Ok(())
Sender::from(close_loop_session.sender()).send(Init)?;
tokio::select! {
() = sleep(Duration::from_millis(5000)) => {}
result = close_loop_session.run(&mut close_loop) => result?,
}
close_loop.workload.as_mut().clear();
tokio::select! {
() = sleep(Duration::from_millis(10000)) => {}
result = close_loop_session.run(&mut close_loop) => result?,
}
let latencies = take(close_loop.workload.as_mut());
tokio::select! {
() = sleep(Duration::from_millis(5000)) => {}
result = close_loop_session.run(&mut close_loop) => result?,
}
upcall.send((
latencies.len() as f32 / 10.,
latencies.iter().sum::<Duration>() / latencies.len().max(1) as u32,
))
};
tokio::select! {
result = dispatch_session => result?,
Expand All @@ -102,6 +122,9 @@ pub async fn pbft_client_session(config: CopsClient) -> anyhow::Result<()> {
anyhow::bail!("unreachable")
});
}
while let Some(result) = sessions.join_next().await {
result??
}
Ok(())
}

Expand Down Expand Up @@ -172,4 +195,4 @@ pub async fn pbft_server_session(
anyhow::bail!("unreachable")
}

// cSpell:words pbft
// cSpell:words pbft upcall
9 changes: 3 additions & 6 deletions src/bin/boson_mutex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ use augustus::{
workload::{events::InvokeOk, Queue},
};
use boson_control_messages::{MutexReplicated, MutexUntrusted};
use tokio::{
net::TcpListener,
sync::mpsc::{UnboundedReceiver, UnboundedSender},
};
use tokio::{net::TcpListener, sync::mpsc::UnboundedReceiver};
use tokio_util::sync::CancellationToken;

pub enum Event {
Expand All @@ -36,7 +33,7 @@ pub enum Event {
pub async fn untrusted_session(
config: MutexUntrusted,
mut events: UnboundedReceiver<Event>,
upcall: UnboundedSender<RequestOk>,
upcall: impl SendEvent<RequestOk> + Send + Sync + 'static,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let id = config.id;
Expand Down Expand Up @@ -107,7 +104,7 @@ pub async fn untrusted_session(
pub async fn replicated_session(
config: MutexReplicated,
mut events: UnboundedReceiver<Event>,
upcall: UnboundedSender<RequestOk>,
upcall: impl SendEvent<RequestOk> + Send + Sync + 'static,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let id = config.id;
Expand Down
5 changes: 3 additions & 2 deletions src/bin/replication.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
future::{pending, Future},
iter::repeat_with,
mem::take,
net::SocketAddr,
sync::{Arc, Mutex},
time::Duration,
Expand Down Expand Up @@ -237,7 +238,7 @@ async fn spawn_client_sessions<
+ Send
+ Sync
+ 'static,
W: Workload<Op = Payload, Result = Payload> + Into<Vec<Duration>> + Send + Sync + 'static,
W: Workload<Op = Payload, Result = Payload> + AsMut<Vec<Duration>> + Send + Sync + 'static,
>(
sessions: &mut JoinSet<anyhow::Result<()>>,
config: ClientConfig,
Expand Down Expand Up @@ -293,7 +294,7 @@ where
latencies
.lock()
.unwrap()
.extend(close_loop.0 .0.workload.into());
.extend(take(close_loop.workload.as_mut()));
barrier.wait().await;
Ok(())
});
Expand Down
11 changes: 9 additions & 2 deletions src/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ impl<T: Iterator<Item = O>, O, R> Workload for Iter<T, O, R> {
// generally speaking, there should be a concept of "transaction" that composed from one or more
// ops, and latency is mean to be measured against transactions
// currently the transaction concept is skipped, maybe revisit the design later
#[derive(Debug, derive_more::Deref)]
#[derive(Debug, derive_more::Deref, derive_more::AsMut)]
pub struct OpLatency<W> {
#[deref]
inner: W,
#[as_mut]
pub latencies: Vec<Duration>,
}

Expand Down Expand Up @@ -247,9 +248,15 @@ impl<I: Iterator<Item = (O, R)>, O, R: Debug + Eq + Send + Sync + 'static> Workl
}
}

#[derive(Debug, Clone, derive_more::Deref)]
#[derive(Debug, Clone, derive_more::Deref, derive_more::DerefMut)]
pub struct Json<W>(pub W);

impl<W: AsMut<T>, T> AsMut<T> for Json<W> {
fn as_mut(&mut self) -> &mut T {
self.0.as_mut()
}
}

impl<W: Workload> Workload for Json<W>
where
W::Op: Serialize,
Expand Down
4 changes: 2 additions & 2 deletions tools/boson-control-messages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ pub struct CopsClient {
pub variant: CopsVariant,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CopsVariant {
Untrusted,
Replicated(CopsReplicated),
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CopsReplicated {
pub num_faulty: usize,
}
Loading

0 comments on commit d18307e

Please sign in to comment.