diff --git a/README.md b/README.md index a6222644..610e3f9d 100644 --- a/README.md +++ b/README.md @@ -113,6 +113,7 @@ The built-in blocks provided by Protoflow are listed below: | Block | Description | |:------------------|:-------------------------------------------------------------------------------------------------------------------------------| | [`Buffer`] | Stores all messages it receives. | +| [`Clock`] | Periodically sends current timestamp. | | [`ConcatStrings`] | Concatenates the received string messages, with an optional delimiter string inserted between each message. | | [`Const`] | Sends a constant value. | | [`Count`] | Counts the number of messages it receives, while optionally passing them through. | @@ -159,6 +160,38 @@ block-beta protoflow execute Buffer ``` +#### [`Clock`] + +A block that periodically sends current timestamp. + +```mermaid +block-beta + columns 4 + Clock space:2 Sink + Clock-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Clock block + class Sink hidden +``` + +```bash +protoflow execute Clock fixed=2 +``` + +```bash +protoflow execute Clock fixed=0.5 +``` + +```bash +protoflow execute Clock random=1..5 +``` + +```bash +protoflow execute Clock random=0.5..1.5 +``` + #### [`ConcatStrings`] A block for concatenating all string messages it receives, with an optional delimiter string inserted between each message @@ -795,6 +828,7 @@ To add a new block type implementation, make sure to examine and amend: [`examples`]: lib/protoflow/examples [`Buffer`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Buffer.html +[`Clock`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Clock.html [`ConcatStrings`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ConcatStrings.html [`Const`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Const.html [`Count`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Count.html diff --git a/lib/protoflow-blocks/Cargo.toml b/lib/protoflow-blocks/Cargo.toml index 52c54740..02786628 100644 --- a/lib/protoflow-blocks/Cargo.toml +++ b/lib/protoflow-blocks/Cargo.toml @@ -25,8 +25,9 @@ hash-sha2 = ["dep:sha2"] rand = ["protoflow-core/rand"] std = [ - "blake3?/std", "protoflow-core/std", + "chrono/std", + "blake3?/std", "serde?/std", "sysml-model?/std", "tracing?/std", @@ -67,6 +68,7 @@ struson = "0.5" sysml-model = { version = "=0.2.3", default-features = false, optional = true } ubyte = { version = "0.10", default-features = false } csv = "1.3.1" +chrono = { version = "0.4.39", default-features = false, features = ["now"] } [dev-dependencies] bytes = "1.8.0" diff --git a/lib/protoflow-blocks/doc/core/clock.mmd b/lib/protoflow-blocks/doc/core/clock.mmd new file mode 100644 index 00000000..37a38b9c --- /dev/null +++ b/lib/protoflow-blocks/doc/core/clock.mmd @@ -0,0 +1,9 @@ +block-beta + columns 4 + Clock space:2 Sink + Clock-- "output" -->Sink + + classDef block height:48px,padding:8px; + classDef hidden visibility:none; + class Clock block + class Sink hidden diff --git a/lib/protoflow-blocks/doc/core/clock.seq.mmd b/lib/protoflow-blocks/doc/core/clock.seq.mmd new file mode 100644 index 00000000..6a6be6ab --- /dev/null +++ b/lib/protoflow-blocks/doc/core/clock.seq.mmd @@ -0,0 +1,12 @@ +sequenceDiagram + autonumber + participant Clock as Clock block + participant Clock.output as Clock.output port + participant BlockA as Another block + + Clock-->>BlockA: Connect + + Clock->>BlockA: Message + + Clock-->>Clock.output: Close + Clock-->>BlockA: Disconnect diff --git a/lib/protoflow-blocks/src/blocks/core.rs b/lib/protoflow-blocks/src/blocks/core.rs index f6f4e760..64d5b9e6 100644 --- a/lib/protoflow-blocks/src/blocks/core.rs +++ b/lib/protoflow-blocks/src/blocks/core.rs @@ -14,6 +14,16 @@ pub mod core { pub trait CoreBlocks { fn buffer + 'static>(&mut self) -> Buffer; + fn clock(&mut self, delay: DelayType) -> Clock; + + fn clock_fixed(&mut self, delay: Duration) -> Clock { + self.clock(DelayType::Fixed(delay)) + } + + fn clock_random(&mut self, delay: Range) -> Clock { + self.clock(DelayType::Random(delay)) + } + fn const_bytes>(&mut self, value: T) -> Const; fn const_string(&mut self, value: impl ToString) -> Const; @@ -43,6 +53,7 @@ pub mod core { #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub enum CoreBlockTag { Buffer, + Clock, Const, Count, Delay, @@ -57,6 +68,11 @@ pub mod core { input: InputPortName, }, + Clock { + output: OutputPortName, + delay: DelayType, + }, + Const { output: OutputPortName, value: String, @@ -89,6 +105,7 @@ pub mod core { use CoreBlockConfig::*; Cow::Borrowed(match self { Buffer { .. } => "Buffer", + Clock { .. } => "Clock", Const { .. } => "Const", Count { .. } => "Count", Delay { .. } => "Delay", @@ -103,6 +120,7 @@ pub mod core { use CoreBlockConfig::*; match self { Buffer { .. } => vec![], + Clock { output, .. } => vec![("output", Some(output.clone()))], Const { output, .. } => vec![("output", Some(output.clone()))], Count { output, count, .. } => { vec![("output", output.clone()), ("count", Some(count.clone()))] @@ -120,6 +138,7 @@ pub mod core { use CoreBlockConfig::*; match self { Buffer { .. } => Box::new(super::Buffer::new(system.input_any())), // TODO: Buffer::with_system(system) + Clock { delay, .. } => Box::new(super::Clock::with_system(system, delay.clone())), Const { value, .. } => Box::new(super::Const::with_system(system, value.clone())), Count { .. } => Box::new(super::Count::new( system.input_any(), @@ -146,6 +165,9 @@ pub mod core { mod buffer; pub use buffer::*; + mod clock; + pub use clock::*; + mod r#const; pub use r#const::*; diff --git a/lib/protoflow-blocks/src/blocks/core/clock.rs b/lib/protoflow-blocks/src/blocks/core/clock.rs new file mode 100644 index 00000000..e9aae696 --- /dev/null +++ b/lib/protoflow-blocks/src/blocks/core/clock.rs @@ -0,0 +1,162 @@ +// This is free and unencumbered software released into the public domain. + +use crate::{prelude::String, types::DelayType, StdioConfig, StdioError, StdioSystem, System}; +use protoflow_core::{Block, BlockResult, BlockRuntime, OutputPort}; +use protoflow_derive::Block; +use simple_mermaid::mermaid; + +/// A block that periodically sends current timestamp. +/// +/// This block sends current timestamp on its output port, with interval specified by the parameter. +/// +/// The timestamp is a Unix UTC timestamp in microseconds passed as a [`i64`] value. +/// +/// The block waits for the output port to be connected before sending the value. +/// +/// The block does not have any input ports nor state. +/// +/// # Block Diagram +#[doc = mermaid!("../../../doc/core/clock.mmd")] +/// +/// # Sequence Diagram +#[doc = mermaid!("../../../doc/core/clock.seq.mmd" framed)] +/// +/// # Examples +/// +/// ## Using the block in a system +/// +/// ```rust +/// # use protoflow_blocks::*; +/// # use std::time::Duration; +/// # fn main() { +/// System::build(|s| { +/// let stdin = s.clock_fixed(Duration::from_secs(1)); +/// let encode_lines = s.encode_lines(); +/// let stdout = s.write_stdout(); +/// s.connect(&stdin.output, &encode_lines.input); +/// s.connect(&encode_lines.output, &stdout.input); +/// }); +/// # } +/// ``` +/// +/// ## Running the block via the CLI +/// +/// ```console +/// $ protoflow execute Clock fixed=2 +/// ``` +/// +/// ```console +/// $ protoflow execute Clock fixed=0.5 +/// ``` +/// +/// ```console +/// $ protoflow execute Clock random=1..5 +/// ``` +/// +/// ```console +/// $ protoflow execute Clock random=0.5..1.5 +/// ``` +/// +#[derive(Block, Clone)] +pub struct Clock { + /// The port to send the timestamp on. + #[output] + pub output: OutputPort, + + /// A delay between outputs. + #[parameter] + pub delay: DelayType, +} + +impl Clock { + pub fn new(output: OutputPort, delay: DelayType) -> Self { + Self::with_params(output, delay) + } +} + +impl Clock { + pub fn with_params(output: OutputPort, delay: DelayType) -> Self { + Self { output, delay } + } +} + +impl Clock { + pub fn with_system(system: &System, delay: DelayType) -> Self { + use crate::SystemBuilding; + Self::with_params(system.output(), delay) + } +} + +impl Block for Clock { + fn execute(&mut self, runtime: &dyn BlockRuntime) -> BlockResult { + runtime.wait_for(&self.output)?; + + loop { + let now = chrono::Utc::now().timestamp_micros(); + self.output.send(&now)?; + + let duration = match self.delay { + DelayType::Fixed(duration) => duration, + DelayType::Random(ref range) => runtime.random_duration(range.clone()), + }; + runtime.sleep_for(duration)?; + } + } +} + +fn parse_range(range_str: &String) -> Option<(f64, f64)> { + if let Some(range_str) = range_str.split_once("..") { + match (range_str.0.parse::(), range_str.1.parse::()) { + (Ok(range0), Ok(range1)) => Some((range0, range1)), + _ => None, + } + } else { + None + } +} + +#[cfg(feature = "std")] +impl StdioSystem for Clock { + fn build_system(config: StdioConfig) -> Result { + use crate::{prelude::Duration, CoreBlocks, IoBlocks, SysBlocks, SystemBuilding}; + + let delay_type = if let Some(delay) = config.get_opt::("fixed")? { + DelayType::Fixed(Duration::from_secs_f64(delay)) + } else if let Some(delay) = config.get_opt::("random")? { + if let Some(range) = parse_range(&delay) { + DelayType::Random( + Duration::from_secs_f64(range.0)..Duration::from_secs_f64(range.1), + ) + } else { + return Err(StdioError::InvalidParameter("random")); + } + } else { + return Err(StdioError::MissingParameter("fixed or random")); + }; + + Ok(System::build(|s| { + let stdin = s.clock(delay_type); + let encode_lines = s.encode_lines(); + let stdout = s.write_stdout(); + s.connect(&stdin.output, &encode_lines.input); + s.connect(&encode_lines.output, &stdout.input); + })) + } +} + +#[cfg(test)] +mod tests { + use super::Clock; + use crate::{prelude::Duration, DelayType, System, SystemBuilding}; + + #[test] + fn instantiate_block() { + // Check that the block is constructible: + let _ = System::build(|s| { + let _ = s.block(Clock::with_params( + s.output(), + DelayType::Fixed(Duration::from_secs(1)), + )); + }); + } +} diff --git a/lib/protoflow-blocks/src/lib.rs b/lib/protoflow-blocks/src/lib.rs index 41d1decd..7544e137 100644 --- a/lib/protoflow-blocks/src/lib.rs +++ b/lib/protoflow-blocks/src/lib.rs @@ -54,6 +54,7 @@ pub fn build_stdio_system( Ok(match system_name.as_ref() { // CoreBlocks "Buffer" => Buffer::::build_system(config)?, + "Clock" => Clock::build_system(config)?, "Const" => Const::::build_system(config)?, "Count" => Count::::build_system(config)?, "Delay" => Delay::::build_system(config)?, diff --git a/lib/protoflow-blocks/src/system.rs b/lib/protoflow-blocks/src/system.rs index 1ac658c0..e8999257 100644 --- a/lib/protoflow-blocks/src/system.rs +++ b/lib/protoflow-blocks/src/system.rs @@ -5,10 +5,10 @@ use crate::{ prelude::{fmt, Arc, Box, Bytes, FromStr, Rc, String, ToString}, types::{DelayType, Encoding}, - AllBlocks, Buffer, ConcatStrings, Const, CoreBlocks, Count, Decode, DecodeCsv, DecodeHex, - DecodeJson, Delay, Drop, Encode, EncodeCsv, EncodeHex, EncodeJson, FlowBlocks, HashBlocks, - IoBlocks, MathBlocks, Random, ReadDir, ReadEnv, ReadFile, ReadStdin, SplitString, SysBlocks, - TextBlocks, WriteFile, WriteStderr, WriteStdout, + AllBlocks, Buffer, Clock, ConcatStrings, Const, CoreBlocks, Count, Decode, DecodeCsv, + DecodeHex, DecodeJson, Delay, Drop, Encode, EncodeCsv, EncodeHex, EncodeJson, FlowBlocks, + HashBlocks, IoBlocks, MathBlocks, Random, ReadDir, ReadEnv, ReadFile, ReadStdin, SplitString, + SysBlocks, TextBlocks, WriteFile, WriteStderr, WriteStdout, }; #[cfg(all(feature = "std", feature = "serde"))] use crate::{ReadSocket, WriteSocket}; @@ -135,6 +135,10 @@ impl CoreBlocks for System { self.0.block(Buffer::::with_system(self)) } + fn clock(&mut self, delay: DelayType) -> Clock { + self.0.block(Clock::with_system(self, delay)) + } + fn const_bytes>(&mut self, value: T) -> Const { self.0 .block(Const::::with_system(self, value.into()))