From 95a129238d19ec8899e5554b5695d277e288c91c Mon Sep 17 00:00:00 2001 From: Chanhee Lee Date: Thu, 7 Mar 2024 14:26:46 -0700 Subject: [PATCH] Add five more federated examples - LoopDistributedDecentralized.lf, LoopDistributedDouble.lf, DistributedStop.lf, DistributedStopDecentralized.lf, and DistributedToken.lf --- test/RustRti/src/federated/DistributedStop.lf | 118 ++++++++++++++++++ .../federated/DistributedStopDecentralized.lf | 17 +++ .../RustRti/src/federated/DistributedToken.lf | 105 ++++++++++++++++ .../federated/LoopDistributedDecentralized.lf | 94 ++++++++++++++ .../src/federated/LoopDistributedDouble.lf | 98 +++++++++++++++ 5 files changed, 432 insertions(+) create mode 100644 test/RustRti/src/federated/DistributedStop.lf create mode 100644 test/RustRti/src/federated/DistributedStopDecentralized.lf create mode 100644 test/RustRti/src/federated/DistributedToken.lf create mode 100644 test/RustRti/src/federated/LoopDistributedDecentralized.lf create mode 100644 test/RustRti/src/federated/LoopDistributedDouble.lf diff --git a/test/RustRti/src/federated/DistributedStop.lf b/test/RustRti/src/federated/DistributedStop.lf new file mode 100644 index 0000000000..6e8796d90b --- /dev/null +++ b/test/RustRti/src/federated/DistributedStop.lf @@ -0,0 +1,118 @@ +/** + * Test for lf_request_stop() in federated execution with centralized coordination. + * + * @author Soroush Bateni + */ +target C + +reactor Sender { + output out: int + timer t(0, 1 usec) + logical action act + state reaction_invoked_correctly: bool = false + + reaction(t, act) -> out, act {= + lf_print("Sending 42 at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + lf_set(out, 42); + if (lf_tag().microstep == 0) { + // Instead of having a separate reaction + // for 'act' like Stop.lf, we trigger the + // same reaction to test lf_request_stop() being + // called multiple times + lf_schedule(act, 0); + } + if (lf_time_logical_elapsed() == USEC(1)) { + // Call lf_request_stop() both at (1 usec, 0) and + // (1 usec, 1) + lf_print("Requesting stop at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + lf_request_stop(); + } + + tag_t _1usec1 = (tag_t) { .time = USEC(1) + lf_time_start(), .microstep = 1u }; + if (lf_tag_compare(lf_tag(), _1usec1) == 0) { + // The reaction was invoked at (1 usec, 1) as expected + self->reaction_invoked_correctly = true; + } else if (lf_tag_compare(lf_tag(), _1usec1) > 0) { + // The reaction should not have been invoked at tags larger than (1 usec, 1) + lf_print_error_and_exit("ERROR: Invoked reaction(t, act) at tag bigger than shutdown."); + } + =} + + reaction(shutdown) {= + if (lf_time_logical_elapsed() != USEC(1) || + lf_tag().microstep != 1) { + lf_print_error_and_exit("ERROR: Sender failed to stop the federation in time. " + "Stopping at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + } else if (self->reaction_invoked_correctly == false) { + lf_print_error_and_exit("ERROR: Sender reaction(t, act) was not invoked at (1 usec, 1). " + "Stopping at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + } + lf_print("SUCCESS: Successfully stopped the federation at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + =} +} + +reactor Receiver( + // Used in the decentralized variant of the test + stp_offset: time = 10 msec) { + input in: int + state reaction_invoked_correctly: bool = false + + reaction(in) {= + lf_print("Received %d at " PRINTF_TAG ".", + in->value, + lf_time_logical_elapsed(), + lf_tag().microstep); + if (lf_time_logical_elapsed() == USEC(1)) { + lf_print("Requesting stop at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + lf_request_stop(); + // The receiver should receive a message at tag + // (1 usec, 1) and trigger this reaction + self->reaction_invoked_correctly = true; + } + + tag_t _1usec1 = (tag_t) { .time = USEC(1) + lf_time_start(), .microstep = 1u }; + if (lf_tag_compare(lf_tag(), _1usec1) > 0) { + self->reaction_invoked_correctly = false; + } + =} + + reaction(shutdown) {= + // Sender should have requested stop earlier than the receiver. + // Therefore, the shutdown events must occur at (1000, 0) on the + // receiver. + if (lf_time_logical_elapsed() != USEC(1) || + lf_tag().microstep != 1) { + lf_print_error_and_exit("Receiver failed to stop the federation at the right time. " + "Stopping at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + } else if (self->reaction_invoked_correctly == false) { + lf_print_error_and_exit("Receiver reaction(in) was not invoked the correct number of times. " + "Stopping at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + } + lf_print("SUCCESS: Successfully stopped the federation at " PRINTF_TAG ".", + lf_time_logical_elapsed(), + lf_tag().microstep); + =} +} + +federated reactor DistributedStop { + sender = new Sender() + receiver = new Receiver() + + sender.out -> receiver.in +} diff --git a/test/RustRti/src/federated/DistributedStopDecentralized.lf b/test/RustRti/src/federated/DistributedStopDecentralized.lf new file mode 100644 index 0000000000..c54bedd38c --- /dev/null +++ b/test/RustRti/src/federated/DistributedStopDecentralized.lf @@ -0,0 +1,17 @@ +/** + * Test for lf_request_stop() in federated execution with decentralized coordination. + * + * @author Soroush Bateni + */ +target C { + coordination: decentralized +} + +import Sender, Receiver from "DistributedStop.lf" + +federated reactor DistributedStopDecentralized { + sender = new Sender() + receiver = new Receiver() + + sender.out -> receiver.in +} diff --git a/test/RustRti/src/federated/DistributedToken.lf b/test/RustRti/src/federated/DistributedToken.lf new file mode 100644 index 0000000000..bd7d7b9810 --- /dev/null +++ b/test/RustRti/src/federated/DistributedToken.lf @@ -0,0 +1,105 @@ +/** + * Distributed LF program where a MessageGenerator creates a string message that is sent via the RTI + * (runtime infrastructure) to a receiver that prints the message. The type is char*, so this tests + * the transport of token-encapsulated messages. Three executable programs are generated, + * Distributed, Distributed_Sender, and Distributed_Receiver. The RTI is realized in the first of + * these and is identified as a "launcher," so it launches the other two programs. + * + * This program uses a 'logical' connection -> with a STP violation handler, decentralized + * coordination, and an 'after' that is sufficiently large to get deterministic timestamps. Hence, + * it realizes a 'poor man's Ptides' that does not require clock synchronization nor HLA-style + * centralized control over the advancement of time. + * + * @author Edward A. Lee + */ +target C { + timeout: 5 secs, + coordination: decentralized +} + +/** + * Reactor that generates a sequence of messages, one per second. The message will be a string + * consisting of a root string followed by a count. + * @param root The root string. + * @output message The message. + */ +reactor MessageGenerator(root: string = "") { + // Output type char* instead of string is used for dynamically allocated character arrays (as + // opposed to static constant strings). + output message: char* + state count: int = 1 + // Send first message after 1 sec so that the startup reactions do not factor into the transport + // time measurement on the first message. + timer t(1 sec, 1 sec) + + reaction(t) -> message {= + // With NULL, 0 arguments, snprintf tells us how many bytes are needed. + // Add one for the null terminator. + int length = snprintf(NULL, 0, "%s %d", self->root, self->count) + 1; + + // Dynamically allocate memory for the output. + char* array = (char*)malloc(length * sizeof(char)); + + // Populate the output string and increment the count. + snprintf(array, length, "%s %d", self->root, self->count++); + printf("MessageGenerator: At time " PRINTF_TIME ", send message: %s\n", + lf_time_logical_elapsed(), + array + ); + + // Set the output. + lf_set_array(message, array, length); + =} +} + +/** + * Reactor that prints an incoming string. + * @param prefix A prefix for the message. + * @input message The message. + */ +reactor PrintMessage { + input message: char* + state count: int = 0 + + reaction(message) {= + printf("PrintMessage: At (elapsed) logical time " PRINTF_TIME ", receiver receives: %s\n", + lf_time_logical_elapsed(), + message->value + ); + // Check the trailing number only of the message. + self->count++; + int trailing_number = atoi(&message->value[12]); + if (trailing_number != self->count) { + printf("ERROR: Expected message to be 'Hello World %d'.\n", self->count); + exit(1); + } + =} STP(0) {= + printf("PrintMessage: At (elapsed) tag " PRINTF_TAG ", receiver receives: %s\n" + "Original intended tag was " PRINTF_TAG ".\n", + lf_time_logical_elapsed(), + lf_tag().microstep, + message->value, + message->intended_tag.time - lf_time_start(), + message->intended_tag.microstep); + // Check the trailing number only of the message. + self->count++; + int trailing_number = atoi(&message->value[12]); + if (trailing_number != self->count) { + printf("ERROR: Expected message to be 'Hello World %d'.\n", self->count); + exit(1); + } + =} + + reaction(shutdown) {= + if (self->count == 0) { + printf("ERROR: No messages received.\n"); + exit(2); + } + =} +} + +federated reactor DistributedToken { + msg = new MessageGenerator(root = "Hello World") + dsp = new PrintMessage() + msg.message -> dsp.message after 40 msec +} diff --git a/test/RustRti/src/federated/LoopDistributedDecentralized.lf b/test/RustRti/src/federated/LoopDistributedDecentralized.lf new file mode 100644 index 0000000000..8814976c8d --- /dev/null +++ b/test/RustRti/src/federated/LoopDistributedDecentralized.lf @@ -0,0 +1,94 @@ +/** + * This tests a feedback loop with physical actions and decentralized coordination. + * + * @author Edward A. Lee + */ +target C { + coordination: decentralized, + logging: LOG, + timeout: 5 sec +} + +preamble {= + #include // Defines sleep() + extern bool stop; + void* ping(void* actionref); +=} + +reactor Looper(incr: int = 1, delay: time = 0 msec, stp_offset: time = 0) { + preamble {= + bool stop = false; + int count = 0; + // Thread to trigger an action once every second. + void* ping(void* actionref) { + while(!stop && count++ < 30) { + lf_print("Scheduling action."); + lf_schedule(actionref, 0); + sleep(1); + } + if (count >= 30) { + // shutdown reaction failed to be invoked. + lf_print_error_and_exit("Shutdown failed to be invoked in time."); + } + return NULL; + } + =} + input in: int + output out: int + physical action a(stp_offset) + state count: int = 0 + state inputs: int = 0 + + reaction(startup) -> a {= + // Start the thread that listens for Enter or Return. + lf_thread_t thread_id; + lf_print("Starting thread."); + lf_thread_create(&thread_id, &ping, a); + =} + + reaction(a) -> out {= + lf_print("Setting out."); + lf_set(out, self->count); + self->count += self->incr; + =} + + reaction(in) {= + self->inputs++; + instant_t time_lag = lf_time_physical() - lf_time_logical(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} STP(stp_offset) {= + self->inputs++; + instant_t time_lag = lf_time_physical() - lf_time_logical(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + lf_print("STP offset was violated. Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} deadline(10 msec) {= + // Note: Could conceivably get both an STP violation and a deadline violation, + // which will result in double counting this input. This is OK, though. + self->inputs++; + instant_t time_lag = lf_time_physical() - lf_time_logical(); + char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, time_lag); + lf_print("Deadline miss. Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); + =} + + reaction(shutdown) {= + lf_print("******* Shutdown invoked."); + // Stop the thread that is scheduling actions. + stop = true; + // Don't require receiving all five inputs because that creates flakiness. + // Requiring two leaves a lot of headroom. + if (self->inputs < 2) { + lf_print_error_and_exit("Received only %d inputs.", self->count); + } + =} +} + +federated reactor LoopDistributedDecentralized(delay: time = 0) { + left = new Looper(stp_offset = 900 usec) + right = new Looper(incr=-1, stp_offset = 2400 usec) + left.out -> right.in + right.out -> left.in +} diff --git a/test/RustRti/src/federated/LoopDistributedDouble.lf b/test/RustRti/src/federated/LoopDistributedDouble.lf new file mode 100644 index 0000000000..dcbc5751b9 --- /dev/null +++ b/test/RustRti/src/federated/LoopDistributedDouble.lf @@ -0,0 +1,98 @@ +/** + * This tests a feedback loop with physical actions and centralized coordination. + * + * @author Edward A. Lee + */ +target C { + coordination: centralized, + coordination-options: { + advance-message-interval: 100 msec + }, + timeout: 5 sec, + logging: warn +} + +preamble {= + #include // Defines sleep() + extern bool stop; + void* ping(void* actionref); +=} + +reactor Looper(incr: int = 1, delay: time = 0 msec) { + preamble {= + bool stop = false; + // Thread to trigger an action once every second. + void* ping(void* actionref) { + while(!stop) { + lf_print("Scheduling action."); + lf_schedule(actionref, 0); + sleep(1); + } + return NULL; + } + =} + input in: int + input in2: int + output out: int + output out2: int + physical action a(delay) + state count: int = 0 + timer t(0, 1 sec) + + reaction(startup) -> a {= + // Start the thread that listens for Enter or Return. + lf_thread_t thread_id; + lf_print("Starting thread."); + lf_thread_create(&thread_id, &ping, a); + =} + + reaction(a) -> out, out2 {= + if (self->count%2 == 0) { + lf_set(out, self->count); + } else { + lf_set(out2, self->count); + } + self->count += self->incr; + =} + + reaction(in) {= + tag_t current_tag = lf_tag(); + lf_print("Received %d at logical time " PRINTF_TAG ".", + in->value, + current_tag.time - lf_time_start(), current_tag.microstep + ); + =} + + reaction(in2) {= + tag_t current_tag = lf_tag(); + lf_print("Received %d on in2 at logical time " PRINTF_TAG ".", + in2->value, + current_tag.time - lf_time_start(), current_tag.microstep + ); + =} + + reaction(t) {= + tag_t current_tag = lf_tag(); + lf_print("Timer triggered at logical time " PRINTF_TAG ".", + current_tag.time - lf_time_start(), current_tag.microstep + ); + =} + + reaction(shutdown) {= + lf_print("******* Shutdown invoked."); + // Stop the thread that is scheduling actions. + stop = true; + if (self->count != 5 * self->incr) { + lf_print_error_and_exit("Failed to receive all five expected inputs."); + } + =} +} + +federated reactor(delay: time = 0) { + left = new Looper() + right = new Looper(incr=-1) + left.out -> right.in + right.out -> left.in + right.out2 -> left.in2 + left.out2 -> right.in2 +}