Skip to content

Commit

Permalink
Multiple vars working as intended
Browse files Browse the repository at this point in the history
  • Loading branch information
mortenhaahr committed Dec 13, 2024
1 parent d9088d9 commit 1db899f
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 12 deletions.
36 changes: 25 additions & 11 deletions src/constraint_based_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
use async_trait::async_trait;
use futures::future::join_all;
use futures::stream;
use futures::stream::BoxStream;
use futures::StreamExt;
use tokio::sync::broadcast;
use std::collections::BTreeMap;
use std::iter::zip;
use std::mem;
use async_stream::stream;

use crate::ast::LOLASpecification;
Expand Down Expand Up @@ -145,19 +141,20 @@ struct InputProducer {
}

impl InputProducer {
pub fn new(stream_collection : ValStreamCollection) -> Self {
pub fn new() -> Self {
let (sender, _) = broadcast::channel(10);
let task_sender = sender.clone();
Self { sender }
}
pub fn run(&self, stream_collection : ValStreamCollection) {
let task_sender = self.sender.clone();
tokio::spawn(async move {
let mut inputs_stream = stream_collection.into_stream();
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
while let Some(inputs) = inputs_stream.next().await {
let data = ProducerMessage::Data(inputs);
task_sender.send(data).unwrap();
}
task_sender.send(ProducerMessage::Done).unwrap();
});
Self { sender }
}

pub fn subscribe(&self) -> broadcast::Receiver<ProducerMessage<BTreeMap<VarName, Value>>> {
Expand All @@ -167,8 +164,10 @@ impl InputProducer {

pub struct ConstraintBasedMonitor {
input_producer: InputProducer,
stream_collection: ValStreamCollection,
model: LOLASpecification,
output_handler: Box<dyn OutputHandler<Value>>,
has_inputs: bool,
}

#[async_trait]
Expand All @@ -182,13 +181,16 @@ impl Monitor<LOLASpecification, Value> for ConstraintBasedMonitor {
(var.clone(), stream.unwrap())
})
.collect::<BTreeMap<_, _>>();
let has_inputs = !input_streams.is_empty();
let stream_collection = ValStreamCollection(input_streams);
let input_producer = InputProducer::new(stream_collection);
let input_producer = InputProducer::new();

ConstraintBasedMonitor {
input_producer,
stream_collection,
model,
output_handler: output,
has_inputs,
}
}

Expand All @@ -204,6 +206,9 @@ impl Monitor<LOLASpecification, Value> for ConstraintBasedMonitor {
.map(|var| (var.clone(), self.output_stream(&var)))
.collect();
self.output_handler.provide_streams(outputs);
if self.has_inputs {
self.input_producer.run(self.stream_collection);
}
self.output_handler.run().await;
}
}
Expand All @@ -213,9 +218,11 @@ impl ConstraintBasedMonitor {
let input_receiver= self.input_producer.subscribe();
let mut runtime_initial = ConstraintBasedRuntime::default();
runtime_initial.store = model_constraints(self.model.clone());
let has_inputs = self.has_inputs.clone();
Box::pin(stream!(
let mut runtime = runtime_initial;
if has_inputs {
let mut input_receiver = input_receiver;
let mut runtime = runtime_initial;
while let Ok(inputs) = input_receiver.recv().await {
match inputs {
ProducerMessage::Data(inputs) => {
Expand All @@ -227,7 +234,14 @@ impl ConstraintBasedMonitor {
}
}
}
))
}
else {
loop {
runtime.step(&BTreeMap::new());
yield runtime.store.clone();
}
}
))
}

fn output_stream(&mut self, var: &VarName) -> BoxStream<'static, Value> {
Expand Down
1 change: 0 additions & 1 deletion tests/constraint_based_lola.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ async fn test_var() {
}

#[tokio::test]
#[ignore = "Tmp ignored"]
async fn test_literal_expression() {
let mut input_streams = input_streams1();
let mut spec = "out z\nz =42";
Expand Down

0 comments on commit 1db899f

Please sign in to comment.