From 130ea840d154def50ff12abe642daa7f8555cbc4 Mon Sep 17 00:00:00 2001 From: Chanhee Lee Date: Thu, 7 Mar 2024 14:26:46 -0700 Subject: [PATCH] Add four more federated examples - InheritanceFederatedImport.lf, LevelPattern.lf, LoopDistributedDecentralized.lf, and LoopDistributedDouble.lf --- .../federated/InheritanceFederatedImport.lf | 17 ++++ test/RustRti/src/federated/LevelPattern.lf | 54 ++++++++++ .../federated/LoopDistributedDecentralized.lf | 94 ++++++++++++++++++ .../src/federated/LoopDistributedDouble.lf | 98 +++++++++++++++++++ 4 files changed, 263 insertions(+) create mode 100644 test/RustRti/src/federated/InheritanceFederatedImport.lf create mode 100644 test/RustRti/src/federated/LevelPattern.lf create mode 100644 test/RustRti/src/federated/LoopDistributedDecentralized.lf create mode 100644 test/RustRti/src/federated/LoopDistributedDouble.lf diff --git a/test/RustRti/src/federated/InheritanceFederatedImport.lf b/test/RustRti/src/federated/InheritanceFederatedImport.lf new file mode 100644 index 0000000000..37da485956 --- /dev/null +++ b/test/RustRti/src/federated/InheritanceFederatedImport.lf @@ -0,0 +1,17 @@ +// Test for inheritance in a federated program where the superclass is imported from a different file. +// Compilation without errors is success. +target C { + timeout: 1 ms +} + +import HelloWorld2 from "../HelloWorld.lf" + +reactor Print extends HelloWorld2 { + reaction(startup) {= + printf("Foo\n"); + =} +} + +federated reactor { + print = new Print() +} diff --git a/test/RustRti/src/federated/LevelPattern.lf b/test/RustRti/src/federated/LevelPattern.lf new file mode 100644 index 0000000000..1721861b14 --- /dev/null +++ b/test/RustRti/src/federated/LevelPattern.lf @@ -0,0 +1,54 @@ +/** + * This test verifies that the artificial dependencies introduced by level-based scheduling do not, + * by themselves, introduce deadlocks in federated execution. + * + * @author Edward A. Lee + */ +target C { + timeout: 1 s +} + +import Count from "../lib/Count.lf" +import TestCount from "../lib/TestCount.lf" + +reactor Through { + input in: int + output out: int + + reaction(in) -> out {= + lf_set(out, in->value); + =} +} + +reactor A { + input in1: int + input in2: int + output out1: int + output out2: int + + i1 = new Through() + i1.out -> out1 + + i2 = new Through() + i2.out -> out2 + + reaction(in1) -> i1.in {= + lf_set(i1.in, in1->value); + =} + + reaction(in2) -> i2.in {= + lf_set(i2.in, in2->value); + =} +} + +federated reactor { + c = new Count() + test = new TestCount(num_inputs=2) + b = new A() + t = new Through() + + c.out -> b.in1 + b.out1 -> t.in + t.out -> b.in2 + b.out2 -> test.in +} diff --git a/test/RustRti/src/federated/LoopDistributedDecentralized.lf b/test/RustRti/src/federated/LoopDistributedDecentralized.lf new file mode 100644 index 0000000000..8814976c8d --- /dev/null +++ b/test/RustRti/src/federated/LoopDistributedDecentralized.lf @@ -0,0 +1,94 @@ +/** + * This tests a feedback loop with physical actions and decentralized coordination. + * + * @author Edward A. Lee + */ +target C { + coordination: decentralized, + logging: LOG, + timeout: 5 sec +} + +preamble {= + #include // Defines sleep() + extern bool stop; + void* ping(void* actionref); +=} + +reactor Looper(incr: int = 1, delay: time = 0 msec, stp_offset: time = 0) { + preamble {= + bool stop = false; + int count = 0; + // Thread to trigger an action once every second. + void* ping(void* actionref) { + while(!stop && count++ < 30) { + lf_print("Scheduling action."); + lf_schedule(actionref, 0); + sleep(1); + } + if (count >= 30) { + // shutdown reaction failed to be invoked. + lf_print_error_and_exit("Shutdown failed to be invoked in time."); + } + return NULL; + } + =} + input in: int + output out: int + physical action a(stp_offset) + state count: int = 0 + state inputs: int = 0 + + reaction(startup) -> a {= + // Start the thread that listens for Enter or Return. + lf_thread_t thread_id; + lf_print("Starting thread."); + lf_thread_create(&thread_id, &ping, a); + =} + + reaction(a) -> out {= + lf_print("Setting out."); + lf_set(out, self->count); + self->count += self->incr; + =} + + reaction(in) {= + self->inputs++; + instant_t time_lag = lf_time_physical() - lf_time_logical(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} STP(stp_offset) {= + self->inputs++; + instant_t time_lag = lf_time_physical() - lf_time_logical(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + lf_print("STP offset was violated. Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} deadline(10 msec) {= + // Note: Could conceivably get both an STP violation and a deadline violation, + // which will result in double counting this input. This is OK, though. + self->inputs++; + instant_t time_lag = lf_time_physical() - lf_time_logical(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + lf_print("Deadline miss. Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} + + reaction(shutdown) {= + lf_print("******* Shutdown invoked."); + // Stop the thread that is scheduling actions. + stop = true; + // Don't require receiving all five inputs because that creates flakiness. + // Requiring two leaves a lot of headroom. + if (self->inputs < 2) { + lf_print_error_and_exit("Received only %d inputs.", self->count); + } + =} +} + +federated reactor LoopDistributedDecentralized(delay: time = 0) { + left = new Looper(stp_offset = 900 usec) + right = new Looper(incr=-1, stp_offset = 2400 usec) + left.out -> right.in + right.out -> left.in +} diff --git a/test/RustRti/src/federated/LoopDistributedDouble.lf b/test/RustRti/src/federated/LoopDistributedDouble.lf new file mode 100644 index 0000000000..dcbc5751b9 --- /dev/null +++ b/test/RustRti/src/federated/LoopDistributedDouble.lf @@ -0,0 +1,98 @@ +/** + * This tests a feedback loop with physical actions and centralized coordination. + * + * @author Edward A. Lee + */ +target C { + coordination: centralized, + coordination-options: { + advance-message-interval: 100 msec + }, + timeout: 5 sec, + logging: warn +} + +preamble {= + #include // Defines sleep() + extern bool stop; + void* ping(void* actionref); +=} + +reactor Looper(incr: int = 1, delay: time = 0 msec) { + preamble {= + bool stop = false; + // Thread to trigger an action once every second. + void* ping(void* actionref) { + while(!stop) { + lf_print("Scheduling action."); + lf_schedule(actionref, 0); + sleep(1); + } + return NULL; + } + =} + input in: int + input in2: int + output out: int + output out2: int + physical action a(delay) + state count: int = 0 + timer t(0, 1 sec) + + reaction(startup) -> a {= + // Start the thread that listens for Enter or Return. + lf_thread_t thread_id; + lf_print("Starting thread."); + lf_thread_create(&thread_id, &ping, a); + =} + + reaction(a) -> out, out2 {= + if (self->count%2 == 0) { + lf_set(out, self->count); + } else { + lf_set(out2, self->count); + } + self->count += self->incr; + =} + + reaction(in) {= + tag_t current_tag = lf_tag(); + lf_print("Received %d at logical time " PRINTF_TAG ".", + in->value, + current_tag.time - lf_time_start(), current_tag.microstep + ); + =} + + reaction(in2) {= + tag_t current_tag = lf_tag(); + lf_print("Received %d on in2 at logical time " PRINTF_TAG ".", + in2->value, + current_tag.time - lf_time_start(), current_tag.microstep + ); + =} + + reaction(t) {= + tag_t current_tag = lf_tag(); + lf_print("Timer triggered at logical time " PRINTF_TAG ".", + current_tag.time - lf_time_start(), current_tag.microstep + ); + =} + + reaction(shutdown) {= + lf_print("******* Shutdown invoked."); + // Stop the thread that is scheduling actions. + stop = true; + if (self->count != 5 * self->incr) { + lf_print_error_and_exit("Failed to receive all five expected inputs."); + } + =} +} + +federated reactor(delay: time = 0) { + left = new Looper() + right = new Looper(incr=-1) + left.out -> right.in + right.out -> left.in + right.out2 -> left.in2 + left.out2 -> right.in2 +}