forked from lf-lang/lingua-franca
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- LoopDistributedDecentralized.lf and LoopDistributedDouble.lf, DistributedStop.lf, DistributedStopDecentralized.lf, and DistributedToken.lf
- Loading branch information
1 parent
088869b
commit 7ac46cf
Showing
5 changed files
with
432 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
/** | ||
* Test for lf_request_stop() in federated execution with centralized coordination. | ||
* | ||
* @author Soroush Bateni | ||
*/ | ||
target C | ||
|
||
reactor Sender { | ||
output out: int | ||
timer t(0, 1 usec) | ||
logical action act | ||
state reaction_invoked_correctly: bool = false | ||
|
||
reaction(t, act) -> out, act {= | ||
lf_print("Sending 42 at " PRINTF_TAG ".", | ||
lf_time_logical_elapsed(), | ||
lf_tag().microstep); | ||
lf_set(out, 42); | ||
if (lf_tag().microstep == 0) { | ||
// Instead of having a separate reaction | ||
// for 'act' like Stop.lf, we trigger the | ||
// same reaction to test lf_request_stop() being | ||
// called multiple times | ||
lf_schedule(act, 0); | ||
} | ||
if (lf_time_logical_elapsed() == USEC(1)) { | ||
// Call lf_request_stop() both at (1 usec, 0) and | ||
// (1 usec, 1) | ||
lf_print("Requesting stop at " PRINTF_TAG ".", | ||
lf_time_logical_elapsed(), | ||
lf_tag().microstep); | ||
lf_request_stop(); | ||
} | ||
|
||
tag_t _1usec1 = (tag_t) { .time = USEC(1) + lf_time_start(), .microstep = 1u }; | ||
if (lf_tag_compare(lf_tag(), _1usec1) == 0) { | ||
// The reaction was invoked at (1 usec, 1) as expected | ||
self->reaction_invoked_correctly = true; | ||
} else if (lf_tag_compare(lf_tag(), _1usec1) > 0) { | ||
// The reaction should not have been invoked at tags larger than (1 usec, 1) | ||
lf_print_error_and_exit("ERROR: Invoked reaction(t, act) at tag bigger than shutdown."); | ||
} | ||
=} | ||
|
||
reaction(shutdown) {= | ||
if (lf_time_logical_elapsed() != USEC(1) || | ||
lf_tag().microstep != 1) { | ||
lf_print_error_and_exit("ERROR: Sender failed to stop the federation in time. " | ||
"Stopping at " PRINTF_TAG ".", | ||
lf_time_logical_elapsed(), | ||
lf_tag().microstep); | ||
} else if (self->reaction_invoked_correctly == false) { | ||
lf_print_error_and_exit("ERROR: Sender reaction(t, act) was not invoked at (1 usec, 1). " | ||
"Stopping at " PRINTF_TAG ".", | ||
lf_time_logical_elapsed(), | ||
lf_tag().microstep); | ||
} | ||
lf_print("SUCCESS: Successfully stopped the federation at " PRINTF_TAG ".", | ||
lf_time_logical_elapsed(), | ||
lf_tag().microstep); | ||
=} | ||
} | ||
|
||
reactor Receiver( | ||
// Used in the decentralized variant of the test | ||
stp_offset: time = 10 msec) { | ||
input in: int | ||
state reaction_invoked_correctly: bool = false | ||
|
||
reaction(in) {= | ||
lf_print("Received %d at " PRINTF_TAG ".", | ||
in->value, | ||
lf_time_logical_elapsed(), | ||
lf_tag().microstep); | ||
if (lf_time_logical_elapsed() == USEC(1)) { | ||
lf_print("Requesting stop at " PRINTF_TAG ".", | ||
lf_time_logical_elapsed(), | ||
lf_tag().microstep); | ||
lf_request_stop(); | ||
// The receiver should receive a message at tag | ||
// (1 usec, 1) and trigger this reaction | ||
self->reaction_invoked_correctly = true; | ||
} | ||
|
||
tag_t _1usec1 = (tag_t) { .time = USEC(1) + lf_time_start(), .microstep = 1u }; | ||
if (lf_tag_compare(lf_tag(), _1usec1) > 0) { | ||
self->reaction_invoked_correctly = false; | ||
} | ||
=} | ||
|
||
reaction(shutdown) {= | ||
// Sender should have requested stop earlier than the receiver. | ||
// Therefore, the shutdown events must occur at (1000, 0) on the | ||
// receiver. | ||
if (lf_time_logical_elapsed() != USEC(1) || | ||
lf_tag().microstep != 1) { | ||
lf_print_error_and_exit("Receiver failed to stop the federation at the right time. " | ||
"Stopping at " PRINTF_TAG ".", | ||
lf_time_logical_elapsed(), | ||
lf_tag().microstep); | ||
} else if (self->reaction_invoked_correctly == false) { | ||
lf_print_error_and_exit("Receiver reaction(in) was not invoked the correct number of times. " | ||
"Stopping at " PRINTF_TAG ".", | ||
lf_time_logical_elapsed(), | ||
lf_tag().microstep); | ||
} | ||
lf_print("SUCCESS: Successfully stopped the federation at " PRINTF_TAG ".", | ||
lf_time_logical_elapsed(), | ||
lf_tag().microstep); | ||
=} | ||
} | ||
|
||
federated reactor DistributedStop { | ||
sender = new Sender() | ||
receiver = new Receiver() | ||
|
||
sender.out -> receiver.in | ||
} |
17 changes: 17 additions & 0 deletions
17
test/RustRti/src/federated/DistributedStopDecentralized.lf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
/** | ||
* Test for lf_request_stop() in federated execution with decentralized coordination. | ||
* | ||
* @author Soroush Bateni | ||
*/ | ||
target C { | ||
coordination: decentralized | ||
} | ||
|
||
import Sender, Receiver from "DistributedStop.lf" | ||
|
||
federated reactor DistributedStopDecentralized { | ||
sender = new Sender() | ||
receiver = new Receiver() | ||
|
||
sender.out -> receiver.in | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
/** | ||
* Distributed LF program where a MessageGenerator creates a string message that is sent via the RTI | ||
* (runtime infrastructure) to a receiver that prints the message. The type is char*, so this tests | ||
* the transport of token-encapsulated messages. Three executable programs are generated, | ||
* Distributed, Distributed_Sender, and Distributed_Receiver. The RTI is realized in the first of | ||
* these and is identified as a "launcher," so it launches the other two programs. | ||
* | ||
* This program uses a 'logical' connection -> with a STP violation handler, decentralized | ||
* coordination, and an 'after' that is sufficiently large to get deterministic timestamps. Hence, | ||
* it realizes a 'poor man's Ptides' that does not require clock synchronization nor HLA-style | ||
* centralized control over the advancement of time. | ||
* | ||
* @author Edward A. Lee | ||
*/ | ||
target C { | ||
timeout: 5 secs, | ||
coordination: decentralized | ||
} | ||
|
||
/** | ||
* Reactor that generates a sequence of messages, one per second. The message will be a string | ||
* consisting of a root string followed by a count. | ||
* @param root The root string. | ||
* @output message The message. | ||
*/ | ||
reactor MessageGenerator(root: string = "") { | ||
// Output type char* instead of string is used for dynamically allocated character arrays (as | ||
// opposed to static constant strings). | ||
output message: char* | ||
state count: int = 1 | ||
// Send first message after 1 sec so that the startup reactions do not factor into the transport | ||
// time measurement on the first message. | ||
timer t(1 sec, 1 sec) | ||
|
||
reaction(t) -> message {= | ||
// With NULL, 0 arguments, snprintf tells us how many bytes are needed. | ||
// Add one for the null terminator. | ||
int length = snprintf(NULL, 0, "%s %d", self->root, self->count) + 1; | ||
|
||
// Dynamically allocate memory for the output. | ||
char* array = (char*)malloc(length * sizeof(char)); | ||
|
||
// Populate the output string and increment the count. | ||
snprintf(array, length, "%s %d", self->root, self->count++); | ||
printf("MessageGenerator: At time " PRINTF_TIME ", send message: %s\n", | ||
lf_time_logical_elapsed(), | ||
array | ||
); | ||
|
||
// Set the output. | ||
lf_set_array(message, array, length); | ||
=} | ||
} | ||
|
||
/** | ||
* Reactor that prints an incoming string. | ||
* @param prefix A prefix for the message. | ||
* @input message The message. | ||
*/ | ||
reactor PrintMessage { | ||
input message: char* | ||
state count: int = 0 | ||
|
||
reaction(message) {= | ||
printf("PrintMessage: At (elapsed) logical time " PRINTF_TIME ", receiver receives: %s\n", | ||
lf_time_logical_elapsed(), | ||
message->value | ||
); | ||
// Check the trailing number only of the message. | ||
self->count++; | ||
int trailing_number = atoi(&message->value[12]); | ||
if (trailing_number != self->count) { | ||
printf("ERROR: Expected message to be 'Hello World %d'.\n", self->count); | ||
exit(1); | ||
} | ||
=} STP(0) {= | ||
printf("PrintMessage: At (elapsed) tag " PRINTF_TAG ", receiver receives: %s\n" | ||
"Original intended tag was " PRINTF_TAG ".\n", | ||
lf_time_logical_elapsed(), | ||
lf_tag().microstep, | ||
message->value, | ||
message->intended_tag.time - lf_time_start(), | ||
message->intended_tag.microstep); | ||
// Check the trailing number only of the message. | ||
self->count++; | ||
int trailing_number = atoi(&message->value[12]); | ||
if (trailing_number != self->count) { | ||
printf("ERROR: Expected message to be 'Hello World %d'.\n", self->count); | ||
exit(1); | ||
} | ||
=} | ||
|
||
reaction(shutdown) {= | ||
if (self->count == 0) { | ||
printf("ERROR: No messages received.\n"); | ||
exit(2); | ||
} | ||
=} | ||
} | ||
|
||
federated reactor DistributedToken { | ||
msg = new MessageGenerator(root = "Hello World") | ||
dsp = new PrintMessage() | ||
msg.message -> dsp.message after 40 msec | ||
} |
94 changes: 94 additions & 0 deletions
94
test/RustRti/src/federated/LoopDistributedDecentralized.lf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
/** | ||
* This tests a feedback loop with physical actions and decentralized coordination. | ||
* | ||
* @author Edward A. Lee | ||
*/ | ||
target C { | ||
coordination: decentralized, | ||
logging: LOG, | ||
timeout: 5 sec | ||
} | ||
|
||
preamble {= | ||
#include <unistd.h> // Defines sleep() | ||
extern bool stop; | ||
void* ping(void* actionref); | ||
=} | ||
|
||
reactor Looper(incr: int = 1, delay: time = 0 msec, stp_offset: time = 0) { | ||
preamble {= | ||
bool stop = false; | ||
int count = 0; | ||
// Thread to trigger an action once every second. | ||
void* ping(void* actionref) { | ||
while(!stop && count++ < 30) { | ||
lf_print("Scheduling action."); | ||
lf_schedule(actionref, 0); | ||
sleep(1); | ||
} | ||
if (count >= 30) { | ||
// shutdown reaction failed to be invoked. | ||
lf_print_error_and_exit("Shutdown failed to be invoked in time."); | ||
} | ||
return NULL; | ||
} | ||
=} | ||
input in: int | ||
output out: int | ||
physical action a(stp_offset) | ||
state count: int = 0 | ||
state inputs: int = 0 | ||
|
||
reaction(startup) -> a {= | ||
// Start the thread that listens for Enter or Return. | ||
lf_thread_t thread_id; | ||
lf_print("Starting thread."); | ||
lf_thread_create(&thread_id, &ping, a); | ||
=} | ||
|
||
reaction(a) -> out {= | ||
lf_print("Setting out."); | ||
lf_set(out, self->count); | ||
self->count += self->incr; | ||
=} | ||
|
||
reaction(in) {= | ||
self->inputs++; | ||
instant_t time_lag = lf_time_physical() - lf_time_logical(); | ||
char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 | ||
lf_comma_separated_time(time_buffer, time_lag); | ||
lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); | ||
=} STP(stp_offset) {= | ||
self->inputs++; | ||
instant_t time_lag = lf_time_physical() - lf_time_logical(); | ||
char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 | ||
lf_comma_separated_time(time_buffer, time_lag); | ||
lf_print("STP offset was violated. Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); | ||
=} deadline(10 msec) {= | ||
// Note: Could conceivably get both an STP violation and a deadline violation, | ||
// which will result in double counting this input. This is OK, though. | ||
self->inputs++; | ||
instant_t time_lag = lf_time_physical() - lf_time_logical(); | ||
char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 | ||
lf_comma_separated_time(time_buffer, time_lag); | ||
lf_print("Deadline miss. Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer); | ||
=} | ||
|
||
reaction(shutdown) {= | ||
lf_print("******* Shutdown invoked."); | ||
// Stop the thread that is scheduling actions. | ||
stop = true; | ||
// Don't require receiving all five inputs because that creates flakiness. | ||
// Requiring two leaves a lot of headroom. | ||
if (self->inputs < 2) { | ||
lf_print_error_and_exit("Received only %d inputs.", self->count); | ||
} | ||
=} | ||
} | ||
|
||
federated reactor LoopDistributedDecentralized(delay: time = 0) { | ||
left = new Looper(stp_offset = 900 usec) | ||
right = new Looper(incr=-1, stp_offset = 2400 usec) | ||
left.out -> right.in | ||
right.out -> left.in | ||
} |
Oops, something went wrong.