From 291b7ec748194ed24154e33c50115f80128773ee Mon Sep 17 00:00:00 2001 From: Chanhee Lee Date: Tue, 5 Mar 2024 20:24:40 -0700 Subject: [PATCH] Add four more federated tests - PingPongDistributed.lf, PingPongDistributedPhysical.lf, STPParameter.lf, and SmallDelayDecentralized.lf --- .../src/federated/PingPongDistributed.lf | 28 +++++++ .../federated/PingPongDistributedPhysical.lf | 77 +++++++++++++++++++ test/RustRti/src/federated/STPParameter.lf | 21 +++++ .../src/federated/SmallDelayDecentralized.lf | 69 +++++++++++++++++ 4 files changed, 195 insertions(+) create mode 100644 test/RustRti/src/federated/PingPongDistributed.lf create mode 100644 test/RustRti/src/federated/PingPongDistributedPhysical.lf create mode 100644 test/RustRti/src/federated/STPParameter.lf create mode 100644 test/RustRti/src/federated/SmallDelayDecentralized.lf diff --git a/test/RustRti/src/federated/PingPongDistributed.lf b/test/RustRti/src/federated/PingPongDistributed.lf new file mode 100644 index 0000000000..fa89e6f306 --- /dev/null +++ b/test/RustRti/src/federated/PingPongDistributed.lf @@ -0,0 +1,28 @@ +/** + * Basic benchmark from the Savina benchmark suite that is intended to measure message-passing + * overhead. + * + * This version is distributed, communicating using logical connections over sockets. + * + * See [Benchmarks wiki page](https://github.com/icyphy/lingua-franca/wiki/Benchmarks). + * + * This is based on https://www.scala-lang.org/old/node/54 See + * https://shamsimam.github.io/papers/2014-agere-savina.pdf. + * + * This is a distributed version, where Ping and Pong run in separate programs and can be run on + * different machines. + * + * There is no parallelism in this application, so it does not benefit from being being distributed. + * + * @author Edward A. Lee + */ +target C + +import Ping, Pong from "PingPongDistributedPhysical.lf" + +federated reactor(count: int = 10) { + ping = new Ping(count=count) + pong = new Pong(expected=count) + ping.send -> pong.receive + pong.send -> ping.receive +} diff --git a/test/RustRti/src/federated/PingPongDistributedPhysical.lf b/test/RustRti/src/federated/PingPongDistributedPhysical.lf new file mode 100644 index 0000000000..5253d66c4e --- /dev/null +++ b/test/RustRti/src/federated/PingPongDistributedPhysical.lf @@ -0,0 +1,77 @@ +/** + * Basic benchmark from the Savina benchmark suite that is intended to measure message-passing + * overhead. + * + * This version is distributed, communicating using physical connections over sockets. + * + * This is based on https://www.scala-lang.org/old/node/54 See + * https://shamsimam.github.io/papers/2014-agere-savina.pdf. + * + * This is a distributed version, where Ping and Pong run in separate programs and can be run on + * different machines. + * + * To get a sense, some (informal) results for 1,000,000 ping-pongs on my Mac: + * - Unthreaded: 97 msec + * - Threaded: 265 msec + * - Distributed: 53 seconds + * + * There is no parallelism in this application, so it does not benefit from being being distributed. + * + * These measurements are total execution time, including startup and shutdown, of all three + * programs. + * + * @author Edward A. Lee + */ +target C + +reactor Ping(count: int = 10) { + input receive: int + output send: int + state pingsLeft: int = count + logical action serve + + reaction(startup, serve) -> send {= + printf("At logical time " PRINTF_TIME ", Ping sending %d.\n", lf_time_logical_elapsed(), self->pingsLeft); + lf_set(send, self->pingsLeft--); + =} + + reaction(receive) -> serve {= + if (self->pingsLeft > 0) { + lf_schedule(serve, 0); + } else { + lf_request_stop(); + } + =} +} + +reactor Pong(expected: int = 10) { + input receive: int + output send: int + state count: int = 0 + + reaction(receive) -> send {= + self->count++; + printf("At logical time " PRINTF_TIME ", Pong received %d.\n", lf_time_logical_elapsed(), receive->value); + lf_set(send, receive->value); + if (self->count == self->expected) { + lf_request_stop(); + } + =} + + reaction(shutdown) {= + if (self->count != self->expected) { + fprintf(stderr, "Pong expected to receive %d inputs, but it received %d.\n", + self->expected, self->count + ); + exit(1); + } + printf("Pong received %d pings.\n", self->count); + =} +} + +federated reactor(count: int = 10) { + ping = new Ping(count=count) + pong = new Pong(expected=count) + ping.send ~> pong.receive + pong.send ~> ping.receive +} diff --git a/test/RustRti/src/federated/STPParameter.lf b/test/RustRti/src/federated/STPParameter.lf new file mode 100644 index 0000000000..6faded8841 --- /dev/null +++ b/test/RustRti/src/federated/STPParameter.lf @@ -0,0 +1,21 @@ +target C { + timeout: 5 sec, + coordination: decentralized +} + +import Count from "../lib/Count.lf" +import TestCount from "../lib/TestCount.lf" + +reactor PrintTimer(STP_offset: time = 500 msec) extends TestCount { + timer t(0, 1 sec) + + reaction(t) {= + lf_print("Timer ticked at " PRINTF_TAG ".", lf_time_logical_elapsed(), lf_tag().microstep ); + =} +} + +federated reactor { + c = new Count() + p = new PrintTimer(num_inputs=6) + c.out -> p.in +} diff --git a/test/RustRti/src/federated/SmallDelayDecentralized.lf b/test/RustRti/src/federated/SmallDelayDecentralized.lf new file mode 100644 index 0000000000..ce03140742 --- /dev/null +++ b/test/RustRti/src/federated/SmallDelayDecentralized.lf @@ -0,0 +1,69 @@ +target C { + timeout: 1 sec, + coordination: decentralized +} + +preamble {= + #include "platform.h" +=} + +reactor Count { + state count: int = 1 + output out: int + logical action loop + + reaction(startup) -> loop {= + lf_schedule(loop, 0); + =} + + reaction(loop) -> out {= + if (self->count < 6) { + lf_sleep(MSEC(50)); + lf_set(out, self->count++); + lf_schedule(loop, 0); + } + =} +} + +reactor Print { + input in: int + state c: int = 1 + state checks: int = 0 + + logical action loop + + reaction(startup) -> loop {= + lf_schedule(loop, 0); + =} + + reaction(in) {= + interval_t elapsed_time = lf_time_logical_elapsed(); + lf_print("++++++++ At tag " PRINTF_TAG ", received %d", elapsed_time, lf_tag().microstep, in->value); + if (in->value != self->c) { + lf_print_error_and_exit("Expected to receive %d.", self->c); + } + self->c++; + =} STP(1 sec) {= + lf_print_warning("STP violation at tag " PRINTF_TAG + ". This should not happen because the STP offset is large. Checking value anyway.", + lf_tag().time - lf_time_start(), lf_tag().microstep); + lf_print("-------- Received %d", in->value); + if (in->value != self->c) { + lf_print_error_and_exit("Expected to receive %d.", self->c); + } + self->c++; + =} + + reaction(loop) -> loop {= + lf_print("checking self->checks, which is now %d...", self->checks); + if (self->checks++ <= 3) { + lf_schedule(loop, 0); + } + =} +} + +federated reactor { + c = new Count() + p = new Print() + c.out -> p.in after 0 +}