Skip to content

Commit

Permalink
Add four more federated examples
Browse files Browse the repository at this point in the history
- InheritanceFederatedImport.lf, LevelPattern.lf,
  LoopDistributedDecentralized.lf, and LoopDistributedDouble.lf
  • Loading branch information
chanijjani committed Mar 7, 2024
1 parent 088869b commit 130ea84
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 0 deletions.
17 changes: 17 additions & 0 deletions test/RustRti/src/federated/InheritanceFederatedImport.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Test for inheritance in a federated program where the superclass is imported from a different file.
// Compilation without errors is success.
target C {
timeout: 1 ms
}

import HelloWorld2 from "../HelloWorld.lf"

reactor Print extends HelloWorld2 {
reaction(startup) {=
printf("Foo\n");
=}
}

federated reactor {
print = new Print()
}
54 changes: 54 additions & 0 deletions test/RustRti/src/federated/LevelPattern.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* This test verifies that the artificial dependencies introduced by level-based scheduling do not,
* by themselves, introduce deadlocks in federated execution.
*
* @author Edward A. Lee
*/
target C {
timeout: 1 s
}

import Count from "../lib/Count.lf"
import TestCount from "../lib/TestCount.lf"

reactor Through {
input in: int
output out: int

reaction(in) -> out {=
lf_set(out, in->value);
=}
}

reactor A {
input in1: int
input in2: int
output out1: int
output out2: int

i1 = new Through()
i1.out -> out1

i2 = new Through()
i2.out -> out2

reaction(in1) -> i1.in {=
lf_set(i1.in, in1->value);
=}

reaction(in2) -> i2.in {=
lf_set(i2.in, in2->value);
=}
}

federated reactor {
c = new Count()
test = new TestCount(num_inputs=2)
b = new A()
t = new Through()

c.out -> b.in1
b.out1 -> t.in
t.out -> b.in2
b.out2 -> test.in
}
94 changes: 94 additions & 0 deletions test/RustRti/src/federated/LoopDistributedDecentralized.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* This tests a feedback loop with physical actions and decentralized coordination.
*
* @author Edward A. Lee
*/
target C {
coordination: decentralized,
logging: LOG,
timeout: 5 sec
}

preamble {=
#include <unistd.h> // Defines sleep()
extern bool stop;
void* ping(void* actionref);
=}

reactor Looper(incr: int = 1, delay: time = 0 msec, stp_offset: time = 0) {
preamble {=
bool stop = false;
int count = 0;
// Thread to trigger an action once every second.
void* ping(void* actionref) {
while(!stop && count++ < 30) {
lf_print("Scheduling action.");
lf_schedule(actionref, 0);
sleep(1);
}
if (count >= 30) {
// shutdown reaction failed to be invoked.
lf_print_error_and_exit("Shutdown failed to be invoked in time.");
}
return NULL;
}
=}
input in: int
output out: int
physical action a(stp_offset)
state count: int = 0
state inputs: int = 0

reaction(startup) -> a {=
// Start the thread that listens for Enter or Return.
lf_thread_t thread_id;
lf_print("Starting thread.");
lf_thread_create(&thread_id, &ping, a);
=}

reaction(a) -> out {=
lf_print("Setting out.");
lf_set(out, self->count);
self->count += self->incr;
=}

reaction(in) {=
self->inputs++;
instant_t time_lag = lf_time_physical() - lf_time_logical();
char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807
lf_comma_separated_time(time_buffer, time_lag);
lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer);
=} STP(stp_offset) {=
self->inputs++;
instant_t time_lag = lf_time_physical() - lf_time_logical();
char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807
lf_comma_separated_time(time_buffer, time_lag);
lf_print("STP offset was violated. Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer);
=} deadline(10 msec) {=
// Note: Could conceivably get both an STP violation and a deadline violation,
// which will result in double counting this input. This is OK, though.
self->inputs++;
instant_t time_lag = lf_time_physical() - lf_time_logical();
char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807
lf_comma_separated_time(time_buffer, time_lag);
lf_print("Deadline miss. Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer);
=}

reaction(shutdown) {=
lf_print("******* Shutdown invoked.");
// Stop the thread that is scheduling actions.
stop = true;
// Don't require receiving all five inputs because that creates flakiness.
// Requiring two leaves a lot of headroom.
if (self->inputs < 2) {
lf_print_error_and_exit("Received only %d inputs.", self->count);
}
=}
}

federated reactor LoopDistributedDecentralized(delay: time = 0) {
left = new Looper(stp_offset = 900 usec)
right = new Looper(incr=-1, stp_offset = 2400 usec)
left.out -> right.in
right.out -> left.in
}
98 changes: 98 additions & 0 deletions test/RustRti/src/federated/LoopDistributedDouble.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* This tests a feedback loop with physical actions and centralized coordination.
*
* @author Edward A. Lee
*/
target C {
coordination: centralized,
coordination-options: {
advance-message-interval: 100 msec
},
timeout: 5 sec,
logging: warn
}

preamble {=
#include <unistd.h> // Defines sleep()
extern bool stop;
void* ping(void* actionref);
=}

reactor Looper(incr: int = 1, delay: time = 0 msec) {
preamble {=
bool stop = false;
// Thread to trigger an action once every second.
void* ping(void* actionref) {
while(!stop) {
lf_print("Scheduling action.");
lf_schedule(actionref, 0);
sleep(1);
}
return NULL;
}
=}
input in: int
input in2: int
output out: int
output out2: int
physical action a(delay)
state count: int = 0
timer t(0, 1 sec)

reaction(startup) -> a {=
// Start the thread that listens for Enter or Return.
lf_thread_t thread_id;
lf_print("Starting thread.");
lf_thread_create(&thread_id, &ping, a);
=}

reaction(a) -> out, out2 {=
if (self->count%2 == 0) {
lf_set(out, self->count);
} else {
lf_set(out2, self->count);
}
self->count += self->incr;
=}

reaction(in) {=
tag_t current_tag = lf_tag();
lf_print("Received %d at logical time " PRINTF_TAG ".",
in->value,
current_tag.time - lf_time_start(), current_tag.microstep
);
=}

reaction(in2) {=
tag_t current_tag = lf_tag();
lf_print("Received %d on in2 at logical time " PRINTF_TAG ".",
in2->value,
current_tag.time - lf_time_start(), current_tag.microstep
);
=}

reaction(t) {=
tag_t current_tag = lf_tag();
lf_print("Timer triggered at logical time " PRINTF_TAG ".",
current_tag.time - lf_time_start(), current_tag.microstep
);
=}

reaction(shutdown) {=
lf_print("******* Shutdown invoked.");
// Stop the thread that is scheduling actions.
stop = true;
if (self->count != 5 * self->incr) {
lf_print_error_and_exit("Failed to receive all five expected inputs.");
}
=}
}

federated reactor(delay: time = 0) {
left = new Looper()
right = new Looper(incr=-1)
left.out -> right.in
right.out -> left.in
right.out2 -> left.in2
left.out2 -> right.in2
}

0 comments on commit 130ea84

Please sign in to comment.