Skip to content

Commit

Permalink
Add support for pluggable OutputHandlers via an appropriate interface…
Browse files Browse the repository at this point in the history
… trait
  • Loading branch information
twright committed Dec 4, 2024
1 parent a7b87eb commit 6355417
Show file tree
Hide file tree
Showing 15 changed files with 631 additions and 276 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ serde_json = "1.0.133"
# mqtt client
# Disable the default dependency on SSL to avoid a build dependency on OpenSSL
paho-mqtt = {version = "0.12.5", features=["bundled"]}
async-trait = "0.1.51"

[features]
ros = ["dep:r2r"]
Expand Down
88 changes: 34 additions & 54 deletions benches/simple_add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ use criterion::BenchmarkId;
use criterion::Criterion;
use criterion::SamplingMode;
use criterion::{criterion_group, criterion_main};
use futures::{
stream::{self, BoxStream},
StreamExt,
};
use futures::stream::{self, BoxStream};
use trustworthiness_checker::null_output_handler::NullOutputHandler;
use trustworthiness_checker::type_checking::type_check;
use trustworthiness_checker::OutputStream;
use trustworthiness_checker::{Value, Monitor, VarName};
use trustworthiness_checker::{Monitor, Value, VarName};

pub fn spec_simple_add_monitor() -> &'static str {
"in x\n\
Expand All @@ -27,22 +25,18 @@ pub fn spec_simple_add_monitor_typed() -> &'static str {
z = x + y"
}

pub fn input_streams_concrete(
size: usize,
) -> BTreeMap<VarName, BoxStream<'static, Value>> {
pub fn input_streams_concrete(size: usize) -> BTreeMap<VarName, BoxStream<'static, Value>> {
let size = size as i64;
let mut input_streams = BTreeMap::new();
input_streams.insert(
VarName("x".into()),
Box::pin(stream::iter(
(0..size).map(|x| Value::Int(2 * x)),
)) as Pin<Box<dyn futures::Stream<Item = Value> + std::marker::Send>>,
Box::pin(stream::iter((0..size).map(|x| Value::Int(2 * x))))
as Pin<Box<dyn futures::Stream<Item = Value> + std::marker::Send>>,
);
input_streams.insert(
VarName("y".into()),
Box::pin(stream::iter(
(0..size).map(|y| Value::Int(2 * y + 1)),
)) as Pin<Box<dyn futures::Stream<Item = Value> + std::marker::Send>>,
Box::pin(stream::iter((0..size).map(|y| Value::Int(2 * y + 1))))
as Pin<Box<dyn futures::Stream<Item = Value> + std::marker::Send>>,
);
input_streams
}
Expand All @@ -52,100 +46,86 @@ pub fn input_streams_typed(size: usize) -> BTreeMap<VarName, OutputStream<Value>
let size = size as i64;
input_streams.insert(
VarName("x".into()),
Box::pin(stream::iter(
(0..size).map(|x| Value::Int(2 * x)),
)) as OutputStream<Value>,
Box::pin(stream::iter((0..size).map(|x| Value::Int(2 * x)))) as OutputStream<Value>,
);
input_streams.insert(
VarName("y".into()),
Box::pin(stream::iter(
(0..size).map(|y| Value::Int(2 * y + 1)),
)),
Box::pin(stream::iter((0..size).map(|y| Value::Int(2 * y + 1)))),
);
input_streams
}

async fn monitor_outputs_untyped_constraints(num_outputs: usize) {
let mut input_streams = input_streams_concrete(num_outputs);
let spec = trustworthiness_checker::lola_specification(&mut spec_simple_add_monitor()).unwrap();
let mut async_monitor =
let output_handler = NullOutputHandler::new(spec.output_vars.clone());
let async_monitor =
trustworthiness_checker::constraint_based_runtime::ConstraintBasedMonitor::new(
spec,
&mut input_streams,
output_handler,
);
let _outputs: Vec<BTreeMap<VarName, Value>> = async_monitor
.monitor_outputs()
.take(num_outputs)
.collect()
.await;
async_monitor.run().await;
}

async fn monitor_outputs_untyped_async(num_outputs: usize) {
let mut input_streams = input_streams_concrete(num_outputs);
let spec = trustworthiness_checker::lola_specification(&mut spec_simple_add_monitor()).unwrap();
let mut async_monitor = trustworthiness_checker::async_runtime::AsyncMonitorRunner::<
let output_handler = NullOutputHandler::new(spec.output_vars.clone());
let async_monitor = trustworthiness_checker::async_runtime::AsyncMonitorRunner::<
_,
_,
trustworthiness_checker::UntimedLolaSemantics,
trustworthiness_checker::LOLASpecification,
>::new(spec, &mut input_streams);
let _outputs: Vec<BTreeMap<VarName, Value>> = async_monitor
.monitor_outputs()
.take(num_outputs)
.collect()
.await;
_,
>::new(spec, &mut input_streams, output_handler);
async_monitor.run().await;
}

async fn monitor_outputs_typed_async(num_outputs: usize) {
let mut input_streams = input_streams_typed(num_outputs);
let spec =
trustworthiness_checker::lola_specification(&mut spec_simple_add_monitor_typed()).unwrap();
let spec = type_check(spec).expect("Type check failed");
let mut async_monitor = trustworthiness_checker::async_runtime::AsyncMonitorRunner::<
let output_handler = NullOutputHandler::new(spec.output_vars.clone());
let async_monitor = trustworthiness_checker::async_runtime::AsyncMonitorRunner::<
_,
_,
trustworthiness_checker::TypedUntimedLolaSemantics,
_,
>::new(spec, &mut input_streams);
let _outputs: Vec<BTreeMap<VarName, _>> = async_monitor
.monitor_outputs()
.take(num_outputs)
.collect()
.await;
_,
>::new(spec, &mut input_streams, output_handler);
async_monitor.run().await;
}

async fn monitor_outputs_untyped_queuing(num_outputs: usize) {
let mut input_streams = input_streams_concrete(num_outputs);
let spec = trustworthiness_checker::lola_specification(&mut spec_simple_add_monitor()).unwrap();
let mut async_monitor = trustworthiness_checker::queuing_runtime::QueuingMonitorRunner::<
let output_handler = NullOutputHandler::new(spec.output_vars.clone());
let async_monitor = trustworthiness_checker::queuing_runtime::QueuingMonitorRunner::<
_,
_,
trustworthiness_checker::UntimedLolaSemantics,
trustworthiness_checker::LOLASpecification,
>::new(spec, &mut input_streams);
let _outputs: Vec<BTreeMap<VarName, Value>> = async_monitor
.monitor_outputs()
.take(num_outputs)
.collect()
.await;
_,
>::new(spec, &mut input_streams, output_handler);
async_monitor.run().await;
}

async fn monitor_outputs_typed_queuing(num_outputs: usize) {
let mut input_streams = input_streams_typed(num_outputs);
let spec =
trustworthiness_checker::lola_specification(&mut spec_simple_add_monitor_typed()).unwrap();
let spec = type_check(spec).expect("Type check failed");
let mut async_monitor = trustworthiness_checker::queuing_runtime::QueuingMonitorRunner::<
let output_handler = NullOutputHandler::new(spec.output_vars.clone());
let async_monitor = trustworthiness_checker::queuing_runtime::QueuingMonitorRunner::<
_,
_,
trustworthiness_checker::TypedUntimedLolaSemantics,
_,
>::new(spec, &mut input_streams);
let _outputs: Vec<BTreeMap<VarName, _>> = async_monitor
.monitor_outputs()
// .take(num_outputs)
.collect()
.await;
_,
>::new(spec, &mut input_streams, output_handler);
async_monitor.run().await;
}

fn from_elem(c: &mut Criterion) {
Expand Down
52 changes: 17 additions & 35 deletions src/async_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::mem;
use std::sync::Arc;

use async_trait::async_trait;
use futures::future::join_all;
use futures::stream;
use futures::stream::BoxStream;
use futures::StreamExt;
use tokio::select;
use tokio::sync::broadcast;
Expand All @@ -20,6 +18,7 @@ use tokio_util::sync::DropGuard;
use crate::core::InputProvider;
use crate::core::Monitor;
use crate::core::MonitoringSemantics;
use crate::core::OutputHandler;
use crate::core::Specification;
use crate::core::{OutputStream, StreamContext, StreamData, VarName};
use crate::stream_utils::{drop_guard_stream, oneshot_to_stream};
Expand Down Expand Up @@ -416,12 +415,16 @@ impl<Val: StreamData> StreamContext<Val> for SubMonitor<Val> {
* expressions as streams.
* - The M type parameter is the model/specification being monitored.
*/
pub struct AsyncMonitorRunner<Expr, Val, S, M>
pub struct AsyncMonitorRunner<Expr, Val, S, M, H>
where
Val: StreamData,
S: MonitoringSemantics<Expr, Val>,
M: Specification<Expr>,
H: OutputHandler<Val> + Send,
Expr: Sync + Send
{
model: M,
output_handler: H,
output_streams: BTreeMap<VarName, OutputStream<Val>>,
#[allow(dead_code)]
// This is used for RAII to cancel background tasks when the async var
Expand All @@ -431,13 +434,15 @@ where
semantics_t: PhantomData<S>,
}

impl<Expr, Val, S, M> Monitor<M, Val> for AsyncMonitorRunner<Expr, Val, S, M>
#[async_trait]
impl<Expr: Sync + Send, Val, S, M, H> Monitor<M, Val, H> for AsyncMonitorRunner<Expr, Val, S, M, H>
where
Val: StreamData,
S: MonitoringSemantics<Expr, Val>,
M: Specification<Expr>,
H: OutputHandler<Val> + Send + Sync,
{
fn new(model: M, input_streams: &mut dyn InputProvider<Val>) -> Self {
fn new(model: M, input_streams: &mut dyn InputProvider<Val>, output: H) -> Self {
let cancellation_token = CancellationToken::new();
let cancellation_guard = Arc::new(cancellation_token.clone().drop_guard());

Expand Down Expand Up @@ -525,39 +530,16 @@ where
semantics_t: PhantomData,
cancellation_guard,
expr_t: PhantomData,
output_handler: output,
}
}

fn spec(&self) -> &M {
&self.model
}

fn monitor_outputs(&mut self) -> BoxStream<'static, BTreeMap<VarName, Val>> {
let output_streams = mem::take(&mut self.output_streams);
let mut outputs = self.model.output_vars();
outputs.sort();

Box::pin(stream::unfold(
(output_streams, outputs),
|(mut output_streams, outputs)| async move {
let mut futures = vec![];
for (_, stream) in output_streams.iter_mut() {
futures.push(stream.next());
}

let next_vals = join_all(futures).await;
let mut res: BTreeMap<VarName, Val> = BTreeMap::new();
for (var, val) in outputs.clone().iter().zip(next_vals) {
res.insert(
var.clone(),
match val {
Some(val) => val,
None => return None,
},
);
}
return Some((res, (output_streams, outputs)));
},
)) as BoxStream<'static, BTreeMap<VarName, Val>>

async fn run(mut self) {
self.output_handler.provide_streams(self.output_streams);
self.output_handler.run().await;
}
}
}
Loading

0 comments on commit 6355417

Please sign in to comment.