Skip to content

Commit

Permalink
Add five more federated examples
Browse files Browse the repository at this point in the history
- LoopDistributedDecentralized.lf, LoopDistributedDouble.lf,
  DistributedStop.lf, DistributedStopDecentralized.lf,
  and DistributedToken.lf
  • Loading branch information
chanijjani committed Mar 8, 2024
1 parent 088869b commit 95a1292
Show file tree
Hide file tree
Showing 5 changed files with 432 additions and 0 deletions.
118 changes: 118 additions & 0 deletions test/RustRti/src/federated/DistributedStop.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/**
* Test for lf_request_stop() in federated execution with centralized coordination.
*
* @author Soroush Bateni
*/
target C

reactor Sender {
output out: int
timer t(0, 1 usec)
logical action act
state reaction_invoked_correctly: bool = false

reaction(t, act) -> out, act {=
lf_print("Sending 42 at " PRINTF_TAG ".",
lf_time_logical_elapsed(),
lf_tag().microstep);
lf_set(out, 42);
if (lf_tag().microstep == 0) {
// Instead of having a separate reaction
// for 'act' like Stop.lf, we trigger the
// same reaction to test lf_request_stop() being
// called multiple times
lf_schedule(act, 0);
}
if (lf_time_logical_elapsed() == USEC(1)) {
// Call lf_request_stop() both at (1 usec, 0) and
// (1 usec, 1)
lf_print("Requesting stop at " PRINTF_TAG ".",
lf_time_logical_elapsed(),
lf_tag().microstep);
lf_request_stop();
}

tag_t _1usec1 = (tag_t) { .time = USEC(1) + lf_time_start(), .microstep = 1u };
if (lf_tag_compare(lf_tag(), _1usec1) == 0) {
// The reaction was invoked at (1 usec, 1) as expected
self->reaction_invoked_correctly = true;
} else if (lf_tag_compare(lf_tag(), _1usec1) > 0) {
// The reaction should not have been invoked at tags larger than (1 usec, 1)
lf_print_error_and_exit("ERROR: Invoked reaction(t, act) at tag bigger than shutdown.");
}
=}

reaction(shutdown) {=
if (lf_time_logical_elapsed() != USEC(1) ||
lf_tag().microstep != 1) {
lf_print_error_and_exit("ERROR: Sender failed to stop the federation in time. "
"Stopping at " PRINTF_TAG ".",
lf_time_logical_elapsed(),
lf_tag().microstep);
} else if (self->reaction_invoked_correctly == false) {
lf_print_error_and_exit("ERROR: Sender reaction(t, act) was not invoked at (1 usec, 1). "
"Stopping at " PRINTF_TAG ".",
lf_time_logical_elapsed(),
lf_tag().microstep);
}
lf_print("SUCCESS: Successfully stopped the federation at " PRINTF_TAG ".",
lf_time_logical_elapsed(),
lf_tag().microstep);
=}
}

reactor Receiver(
// Used in the decentralized variant of the test
stp_offset: time = 10 msec) {
input in: int
state reaction_invoked_correctly: bool = false

reaction(in) {=
lf_print("Received %d at " PRINTF_TAG ".",
in->value,
lf_time_logical_elapsed(),
lf_tag().microstep);
if (lf_time_logical_elapsed() == USEC(1)) {
lf_print("Requesting stop at " PRINTF_TAG ".",
lf_time_logical_elapsed(),
lf_tag().microstep);
lf_request_stop();
// The receiver should receive a message at tag
// (1 usec, 1) and trigger this reaction
self->reaction_invoked_correctly = true;
}

tag_t _1usec1 = (tag_t) { .time = USEC(1) + lf_time_start(), .microstep = 1u };
if (lf_tag_compare(lf_tag(), _1usec1) > 0) {
self->reaction_invoked_correctly = false;
}
=}

reaction(shutdown) {=
// Sender should have requested stop earlier than the receiver.
// Therefore, the shutdown events must occur at (1000, 0) on the
// receiver.
if (lf_time_logical_elapsed() != USEC(1) ||
lf_tag().microstep != 1) {
lf_print_error_and_exit("Receiver failed to stop the federation at the right time. "
"Stopping at " PRINTF_TAG ".",
lf_time_logical_elapsed(),
lf_tag().microstep);
} else if (self->reaction_invoked_correctly == false) {
lf_print_error_and_exit("Receiver reaction(in) was not invoked the correct number of times. "
"Stopping at " PRINTF_TAG ".",
lf_time_logical_elapsed(),
lf_tag().microstep);
}
lf_print("SUCCESS: Successfully stopped the federation at " PRINTF_TAG ".",
lf_time_logical_elapsed(),
lf_tag().microstep);
=}
}

federated reactor DistributedStop {
sender = new Sender()
receiver = new Receiver()

sender.out -> receiver.in
}
17 changes: 17 additions & 0 deletions test/RustRti/src/federated/DistributedStopDecentralized.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/**
* Test for lf_request_stop() in federated execution with decentralized coordination.
*
* @author Soroush Bateni
*/
target C {
coordination: decentralized
}

import Sender, Receiver from "DistributedStop.lf"

federated reactor DistributedStopDecentralized {
sender = new Sender()
receiver = new Receiver()

sender.out -> receiver.in
}
105 changes: 105 additions & 0 deletions test/RustRti/src/federated/DistributedToken.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Distributed LF program where a MessageGenerator creates a string message that is sent via the RTI
* (runtime infrastructure) to a receiver that prints the message. The type is char*, so this tests
* the transport of token-encapsulated messages. Three executable programs are generated,
* Distributed, Distributed_Sender, and Distributed_Receiver. The RTI is realized in the first of
* these and is identified as a "launcher," so it launches the other two programs.
*
* This program uses a 'logical' connection -> with a STP violation handler, decentralized
* coordination, and an 'after' that is sufficiently large to get deterministic timestamps. Hence,
* it realizes a 'poor man's Ptides' that does not require clock synchronization nor HLA-style
* centralized control over the advancement of time.
*
* @author Edward A. Lee
*/
target C {
timeout: 5 secs,
coordination: decentralized
}

/**
* Reactor that generates a sequence of messages, one per second. The message will be a string
* consisting of a root string followed by a count.
* @param root The root string.
* @output message The message.
*/
reactor MessageGenerator(root: string = "") {
// Output type char* instead of string is used for dynamically allocated character arrays (as
// opposed to static constant strings).
output message: char*
state count: int = 1
// Send first message after 1 sec so that the startup reactions do not factor into the transport
// time measurement on the first message.
timer t(1 sec, 1 sec)

reaction(t) -> message {=
// With NULL, 0 arguments, snprintf tells us how many bytes are needed.
// Add one for the null terminator.
int length = snprintf(NULL, 0, "%s %d", self->root, self->count) + 1;

// Dynamically allocate memory for the output.
char* array = (char*)malloc(length * sizeof(char));

// Populate the output string and increment the count.
snprintf(array, length, "%s %d", self->root, self->count++);
printf("MessageGenerator: At time " PRINTF_TIME ", send message: %s\n",
lf_time_logical_elapsed(),
array
);

// Set the output.
lf_set_array(message, array, length);
=}
}

/**
* Reactor that prints an incoming string.
* @param prefix A prefix for the message.
* @input message The message.
*/
reactor PrintMessage {
input message: char*
state count: int = 0

reaction(message) {=
printf("PrintMessage: At (elapsed) logical time " PRINTF_TIME ", receiver receives: %s\n",
lf_time_logical_elapsed(),
message->value
);
// Check the trailing number only of the message.
self->count++;
int trailing_number = atoi(&message->value[12]);
if (trailing_number != self->count) {
printf("ERROR: Expected message to be 'Hello World %d'.\n", self->count);
exit(1);
}
=} STP(0) {=
printf("PrintMessage: At (elapsed) tag " PRINTF_TAG ", receiver receives: %s\n"
"Original intended tag was " PRINTF_TAG ".\n",
lf_time_logical_elapsed(),
lf_tag().microstep,
message->value,
message->intended_tag.time - lf_time_start(),
message->intended_tag.microstep);
// Check the trailing number only of the message.
self->count++;
int trailing_number = atoi(&message->value[12]);
if (trailing_number != self->count) {
printf("ERROR: Expected message to be 'Hello World %d'.\n", self->count);
exit(1);
}
=}

reaction(shutdown) {=
if (self->count == 0) {
printf("ERROR: No messages received.\n");
exit(2);
}
=}
}

federated reactor DistributedToken {
msg = new MessageGenerator(root = "Hello World")
dsp = new PrintMessage()
msg.message -> dsp.message after 40 msec
}
94 changes: 94 additions & 0 deletions test/RustRti/src/federated/LoopDistributedDecentralized.lf
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* This tests a feedback loop with physical actions and decentralized coordination.
*
* @author Edward A. Lee
*/
target C {
coordination: decentralized,
logging: LOG,
timeout: 5 sec
}

preamble {=
#include <unistd.h> // Defines sleep()
extern bool stop;
void* ping(void* actionref);
=}

reactor Looper(incr: int = 1, delay: time = 0 msec, stp_offset: time = 0) {
preamble {=
bool stop = false;
int count = 0;
// Thread to trigger an action once every second.
void* ping(void* actionref) {
while(!stop && count++ < 30) {
lf_print("Scheduling action.");
lf_schedule(actionref, 0);
sleep(1);
}
if (count >= 30) {
// shutdown reaction failed to be invoked.
lf_print_error_and_exit("Shutdown failed to be invoked in time.");
}
return NULL;
}
=}
input in: int
output out: int
physical action a(stp_offset)
state count: int = 0
state inputs: int = 0

reaction(startup) -> a {=
// Start the thread that listens for Enter or Return.
lf_thread_t thread_id;
lf_print("Starting thread.");
lf_thread_create(&thread_id, &ping, a);
=}

reaction(a) -> out {=
lf_print("Setting out.");
lf_set(out, self->count);
self->count += self->incr;
=}

reaction(in) {=
self->inputs++;
instant_t time_lag = lf_time_physical() - lf_time_logical();
char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807
lf_comma_separated_time(time_buffer, time_lag);
lf_print("Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer);
=} STP(stp_offset) {=
self->inputs++;
instant_t time_lag = lf_time_physical() - lf_time_logical();
char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807
lf_comma_separated_time(time_buffer, time_lag);
lf_print("STP offset was violated. Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer);
=} deadline(10 msec) {=
// Note: Could conceivably get both an STP violation and a deadline violation,
// which will result in double counting this input. This is OK, though.
self->inputs++;
instant_t time_lag = lf_time_physical() - lf_time_logical();
char time_buffer[28]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807
lf_comma_separated_time(time_buffer, time_lag);
lf_print("Deadline miss. Received %d. Logical time is behind physical time by %s nsec.", in->value, time_buffer);
=}

reaction(shutdown) {=
lf_print("******* Shutdown invoked.");
// Stop the thread that is scheduling actions.
stop = true;
// Don't require receiving all five inputs because that creates flakiness.
// Requiring two leaves a lot of headroom.
if (self->inputs < 2) {
lf_print_error_and_exit("Received only %d inputs.", self->count);
}
=}
}

federated reactor LoopDistributedDecentralized(delay: time = 0) {
left = new Looper(stp_offset = 900 usec)
right = new Looper(incr=-1, stp_offset = 2400 usec)
left.out -> right.in
right.out -> left.in
}
Loading

0 comments on commit 95a1292

Please sign in to comment.