Skip to content

Commit

Permalink
Update boson artifact with untrusted cops session
Browse files Browse the repository at this point in the history
  • Loading branch information
sgdxbc committed Apr 17, 2024
1 parent d18307e commit 8639eb8
Show file tree
Hide file tree
Showing 5 changed files with 329 additions and 61 deletions.
4 changes: 2 additions & 2 deletions src/bin/boson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ async fn cops_start_client(
let cancel = CancellationToken::new();
use boson_control_messages::CopsVariant::*;
let handle = match &config.variant {
Untrusted => todo!(),
Untrusted => tokio::spawn(boson_cops::untrusted_client_session(config, upcall_sender)),
Replicated(_) => tokio::spawn(boson_cops::pbft_client_session(config, upcall_sender)),
};
*session = Some(AppSession {
Expand Down Expand Up @@ -241,7 +241,7 @@ async fn cops_start_server(
let cancel = CancellationToken::new();
use boson_control_messages::CopsVariant::*;
let handle = match &config.variant {
Untrusted => todo!(),
Untrusted => tokio::spawn(boson_cops::untrusted_server_session(config, cancel.clone())),
Replicated(_) => tokio::spawn(boson_cops::pbft_server_session(config, cancel.clone())),
};
*session = Some(AppSession {
Expand Down
193 changes: 181 additions & 12 deletions src/bin/boson_cops/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::{mem::take, net::SocketAddr, time::Duration};
use std::{mem::take, net::SocketAddr, ops::Range, time::Duration};

use augustus::{
app::{self, ycsb, App},
cops::{self, DefaultVersion, DefaultVersionService},
crypto::{Crypto, CryptoFlavor},
event::{
self,
Expand All @@ -23,6 +24,26 @@ use rand_distr::Uniform;
use tokio::{net::TcpListener, task::JoinSet, time::sleep};
use tokio_util::sync::CancellationToken;

fn create_workload<R>(
rng: R,
do_put: bool,
record_count: usize,
put_range: Range<usize>,
) -> anyhow::Result<ycsb::Workload<R>> {
let mut workload = ycsb::Workload::new(
rng,
if do_put {
ycsb::WorkloadSettings::new_a
} else {
ycsb::WorkloadSettings::new_c
}(record_count),
)?;
if do_put {
workload.key_num = ycsb::Gen::Uniform(Uniform::from(put_range))
}
Ok(workload)
}

pub async fn pbft_client_session(
config: CopsClient,
upcall: impl Clone + SendEvent<(f32, Duration)> + Send + Sync + 'static,
Expand All @@ -35,6 +56,7 @@ pub async fn pbft_client_session(
record_count,
put_range,
variant: CopsVariant::Replicated(config),
..
} = config
else {
anyhow::bail!("unimplemented")
Expand All @@ -46,19 +68,12 @@ pub async fn pbft_client_session(
for index in 0..num_concurrent {
let tcp_listener = TcpListener::bind((ip, 0)).await?;
let addr = tcp_listener.local_addr()?;
let do_put = index < num_concurrent_put;
// caution: non shared workload only for without insertion
let mut workload = ycsb::Workload::new(
let workload = create_workload(
StdRng::seed_from_u64(117418 + index as u64),
if do_put {
ycsb::WorkloadSettings::new_a
} else {
ycsb::WorkloadSettings::new_c
}(record_count),
index < num_concurrent_put,
record_count,
put_range.clone(),
)?;
if do_put {
workload.key_num = ycsb::Gen::Uniform(Uniform::from(put_range.clone()))
}

let mut dispatch_session = event::Session::new();
let mut client_session = Session::new();
Expand Down Expand Up @@ -195,4 +210,158 @@ pub async fn pbft_server_session(
anyhow::bail!("unreachable")
}

pub async fn untrusted_client_session(
config: CopsClient,
upcall: impl Clone + SendEvent<(f32, Duration)> + Send + Sync + 'static,
) -> anyhow::Result<()> {
let CopsClient {
addrs,
ip,
index,
num_concurrent,
num_concurrent_put,
record_count,
put_range,
variant: CopsVariant::Untrusted,
} = config
else {
anyhow::bail!("unimplemented")
};
let replica_addr = addrs[index];

let mut sessions = JoinSet::new();
for index in 0..num_concurrent {
let tcp_listener = TcpListener::bind((ip, 0)).await?;
let addr = tcp_listener.local_addr()?;
let workload = create_workload(
StdRng::seed_from_u64(117418 + index as u64),
index < num_concurrent_put,
record_count,
put_range.clone(),
)?;

let mut dispatch_session = event::Session::new();
let mut client_session = Session::new();
let mut close_loop_session = Session::new();

let mut dispatch = event::Unify(event::Buffered::from(Dispatch::new(
Tcp::new(addr)?,
{
let mut sender = Sender::from(client_session.sender());
move |buf: &_| cops::to_client_on_buf(buf, &mut sender)
},
Once(dispatch_session.sender()),
)?));
let mut client = Blanket(Buffered::from(
cops::Client::<_, _, DefaultVersion, _>::new(
addr,
replica_addr,
cops::ToReplicaMessageNet::new(dispatch::Net::from(dispatch_session.sender())),
Box::new(Sender::from(close_loop_session.sender()))
as Box<dyn cops::Upcall + Send + Sync>,
),
));
let mut close_loop = Blanket(Unify(CloseLoop::new(
Sender::from(client_session.sender()),
workload,
)));

let mut upcall = upcall.clone();
sessions.spawn(async move {
let dispatch_session = dispatch_session.run(&mut dispatch);
let client_session = client_session.run(&mut client);
let close_loop_session = async move {
Sender::from(close_loop_session.sender()).send(Init)?;
tokio::select! {
() = sleep(Duration::from_millis(5000)) => {}
result = close_loop_session.run(&mut close_loop) => result?,
}
close_loop.workload.as_mut().clear();
tokio::select! {
() = sleep(Duration::from_millis(10000)) => {}
result = close_loop_session.run(&mut close_loop) => result?,
}
let latencies = take(close_loop.workload.as_mut());
tokio::select! {
() = sleep(Duration::from_millis(5000)) => {}
result = close_loop_session.run(&mut close_loop) => result?,
}
upcall.send((
latencies.len() as f32 / 10.,
latencies.iter().sum::<Duration>() / latencies.len().max(1) as u32,
))
};
tokio::select! {
result = dispatch_session => result?,
result = client_session => result?,
result = close_loop_session => return result,
}
anyhow::bail!("unreachable")
});
}

Ok(())
}

pub async fn untrusted_server_session(
config: CopsServer,
cancel: CancellationToken,
) -> anyhow::Result<()> {
let CopsServer {
addrs,
id,
record_count,
variant: CopsVariant::Untrusted,
} = config
else {
anyhow::bail!("unimplemented")
};
let addr = addrs[id as usize];

let tcp_listener = TcpListener::bind(addr).await?;
let mut dispatch_session = event::Session::new();
let mut replica_session = Session::new();

let mut dispatch = event::Unify(event::Buffered::from(Dispatch::new(
Tcp::new(addr)?,
{
let mut sender = Sender::from(replica_session.sender());
move |buf: &_| cops::to_replica_on_buf(buf, &mut sender)
},
Once(dispatch_session.sender()),
)?));
let mut replica = Blanket(Buffered::from(
cops::Replica::<_, _, _, _, SocketAddr>::new(
DefaultVersion::default(),
cops::ToReplicaMessageNet::<_, _, SocketAddr>::new(IndexNet::new(
dispatch::Net::from(dispatch_session.sender()),
addrs,
id as usize,
)),
cops::ToClientMessageNet::new(dispatch::Net::from(dispatch_session.sender())),
DefaultVersionService(Box::new(Sender::from(replica_session.sender()))
as Box<dyn SendEvent<cops::events::UpdateOk<DefaultVersion>> + Send + Sync>),
),
));
{
let mut workload =
ycsb::Workload::new(thread_rng(), ycsb::WorkloadSettings::new(record_count))?;
let mut workload = Iter::<_, _, ()>::from(workload.startup_ops());
while let Some((op, ())) = workload.next_op()? {
replica.startup_insert(op)?
}
}

let tcp_accept_session = tcp::accept_session(tcp_listener, dispatch_session.sender());
let dispatch_session = dispatch_session.run(&mut dispatch);
let replica_session = replica_session.run(&mut replica);
tokio::select! {
() = cancel.cancelled() => return Ok(()),
result = tcp_accept_session => result?,
result = dispatch_session => result?,
result = replica_session => result?,
}
anyhow::bail!("unreachable")
}

// cSpell:words pbft upcall
Loading

0 comments on commit 8639eb8

Please sign in to comment.