Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Clock block #34

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ The built-in blocks provided by Protoflow are listed below:
| Block | Description |
|:------------------|:-------------------------------------------------------------------------------------------------------------------------------|
| [`Buffer`] | Stores all messages it receives. |
| [`Clock`] | Periodically sends current timestamp. |
| [`ConcatStrings`] | Concatenates the received string messages, with an optional delimiter string inserted between each message. |
| [`Const`] | Sends a constant value. |
| [`Count`] | Counts the number of messages it receives, while optionally passing them through. |
Expand Down Expand Up @@ -159,6 +160,38 @@ block-beta
protoflow execute Buffer
```

#### [`Clock`]

A block that periodically sends current timestamp.

```mermaid
block-beta
columns 4
Clock space:2 Sink
Clock-- "output" -->Sink

classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class Clock block
class Sink hidden
```

```bash
protoflow execute Clock fixed=2
```

```bash
protoflow execute Clock fixed=0.5
```

```bash
protoflow execute Clock random=1..5
```

```bash
protoflow execute Clock random=0.5..1.5
```

#### [`ConcatStrings`]

A block for concatenating all string messages it receives, with an optional delimiter string inserted between each message
Expand Down Expand Up @@ -795,6 +828,7 @@ To add a new block type implementation, make sure to examine and amend:
[`examples`]: lib/protoflow/examples

[`Buffer`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Buffer.html
[`Clock`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Clock.html
[`ConcatStrings`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.ConcatStrings.html
[`Const`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Const.html
[`Count`]: https://docs.rs/protoflow-blocks/latest/protoflow_blocks/struct.Count.html
Expand Down
4 changes: 3 additions & 1 deletion lib/protoflow-blocks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ hash-sha2 = ["dep:sha2"]

rand = ["protoflow-core/rand"]
std = [
"blake3?/std",
"protoflow-core/std",
"chrono/std",
"blake3?/std",
"serde?/std",
"sysml-model?/std",
"tracing?/std",
Expand Down Expand Up @@ -67,6 +68,7 @@ struson = "0.5"
sysml-model = { version = "=0.2.3", default-features = false, optional = true }
ubyte = { version = "0.10", default-features = false }
csv = "1.3.1"
chrono = { version = "0.4.39", default-features = false, features = ["now"] }

[dev-dependencies]
bytes = "1.8.0"
Expand Down
9 changes: 9 additions & 0 deletions lib/protoflow-blocks/doc/core/clock.mmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
block-beta
columns 4
Clock space:2 Sink
Clock-- "output" -->Sink

classDef block height:48px,padding:8px;
classDef hidden visibility:none;
class Clock block
class Sink hidden
12 changes: 12 additions & 0 deletions lib/protoflow-blocks/doc/core/clock.seq.mmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
sequenceDiagram
autonumber
participant Clock as Clock block
participant Clock.output as Clock.output port
participant BlockA as Another block

Clock-->>BlockA: Connect

Clock->>BlockA: Message

Clock-->>Clock.output: Close
Clock-->>BlockA: Disconnect
22 changes: 22 additions & 0 deletions lib/protoflow-blocks/src/blocks/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ pub mod core {
pub trait CoreBlocks {
fn buffer<T: Message + Into<T> + 'static>(&mut self) -> Buffer<T>;

fn clock(&mut self, delay: DelayType) -> Clock;

fn clock_fixed(&mut self, delay: Duration) -> Clock {
self.clock(DelayType::Fixed(delay))
}

fn clock_random(&mut self, delay: Range<Duration>) -> Clock {
self.clock(DelayType::Random(delay))
}

fn const_bytes<T: Into<Bytes>>(&mut self, value: T) -> Const<Bytes>;

fn const_string(&mut self, value: impl ToString) -> Const<String>;
Expand Down Expand Up @@ -43,6 +53,7 @@ pub mod core {
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum CoreBlockTag {
Buffer,
Clock,
Const,
Count,
Delay,
Expand All @@ -57,6 +68,11 @@ pub mod core {
input: InputPortName,
},

Clock {
output: OutputPortName,
delay: DelayType,
},

Const {
output: OutputPortName,
value: String,
Expand Down Expand Up @@ -89,6 +105,7 @@ pub mod core {
use CoreBlockConfig::*;
Cow::Borrowed(match self {
Buffer { .. } => "Buffer",
Clock { .. } => "Clock",
Const { .. } => "Const",
Count { .. } => "Count",
Delay { .. } => "Delay",
Expand All @@ -103,6 +120,7 @@ pub mod core {
use CoreBlockConfig::*;
match self {
Buffer { .. } => vec![],
Clock { output, .. } => vec![("output", Some(output.clone()))],
Const { output, .. } => vec![("output", Some(output.clone()))],
Count { output, count, .. } => {
vec![("output", output.clone()), ("count", Some(count.clone()))]
Expand All @@ -120,6 +138,7 @@ pub mod core {
use CoreBlockConfig::*;
match self {
Buffer { .. } => Box::new(super::Buffer::new(system.input_any())), // TODO: Buffer::with_system(system)
Clock { delay, .. } => Box::new(super::Clock::with_system(system, delay.clone())),
Const { value, .. } => Box::new(super::Const::with_system(system, value.clone())),
Count { .. } => Box::new(super::Count::new(
system.input_any(),
Expand All @@ -146,6 +165,9 @@ pub mod core {
mod buffer;
pub use buffer::*;

mod clock;
pub use clock::*;

mod r#const;
pub use r#const::*;

Expand Down
162 changes: 162 additions & 0 deletions lib/protoflow-blocks/src/blocks/core/clock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// This is free and unencumbered software released into the public domain.

use crate::{prelude::String, types::DelayType, StdioConfig, StdioError, StdioSystem, System};
use protoflow_core::{Block, BlockResult, BlockRuntime, OutputPort};
use protoflow_derive::Block;
use simple_mermaid::mermaid;

/// A block that periodically sends current timestamp.
///
/// This block sends current timestamp on its output port, with interval specified by the parameter.
///
/// The timestamp is a Unix UTC timestamp in microseconds passed as a [`i64`] value.
///
/// The block waits for the output port to be connected before sending the value.
///
/// The block does not have any input ports nor state.
///
/// # Block Diagram
#[doc = mermaid!("../../../doc/core/clock.mmd")]
///
/// # Sequence Diagram
#[doc = mermaid!("../../../doc/core/clock.seq.mmd" framed)]
///
/// # Examples
///
/// ## Using the block in a system
///
/// ```rust
/// # use protoflow_blocks::*;
/// # use std::time::Duration;
/// # fn main() {
/// System::build(|s| {
/// let stdin = s.clock_fixed(Duration::from_secs(1));
/// let encode_lines = s.encode_lines();
/// let stdout = s.write_stdout();
/// s.connect(&stdin.output, &encode_lines.input);
/// s.connect(&encode_lines.output, &stdout.input);
/// });
/// # }
/// ```
///
/// ## Running the block via the CLI
///
/// ```console
/// $ protoflow execute Clock fixed=2
/// ```
///
/// ```console
/// $ protoflow execute Clock fixed=0.5
/// ```
///
/// ```console
/// $ protoflow execute Clock random=1..5
/// ```
///
/// ```console
/// $ protoflow execute Clock random=0.5..1.5
/// ```
///
#[derive(Block, Clone)]
pub struct Clock {
/// The port to send the timestamp on.
#[output]
pub output: OutputPort<i64>,

/// A delay between outputs.
#[parameter]
pub delay: DelayType,
}

impl Clock {
pub fn new(output: OutputPort<i64>, delay: DelayType) -> Self {
Self::with_params(output, delay)
}
}

impl Clock {
pub fn with_params(output: OutputPort<i64>, delay: DelayType) -> Self {
Self { output, delay }
}
}

impl Clock {
pub fn with_system(system: &System, delay: DelayType) -> Self {
use crate::SystemBuilding;
Self::with_params(system.output(), delay)
}
}

impl Block for Clock {
fn execute(&mut self, runtime: &dyn BlockRuntime) -> BlockResult {
runtime.wait_for(&self.output)?;

loop {
let now = chrono::Utc::now().timestamp_micros();
self.output.send(&now)?;

let duration = match self.delay {
DelayType::Fixed(duration) => duration,
DelayType::Random(ref range) => runtime.random_duration(range.clone()),
};
runtime.sleep_for(duration)?;
}
}
}

fn parse_range(range_str: &String) -> Option<(f64, f64)> {
if let Some(range_str) = range_str.split_once("..") {
match (range_str.0.parse::<f64>(), range_str.1.parse::<f64>()) {
(Ok(range0), Ok(range1)) => Some((range0, range1)),
_ => None,
}
} else {
None
}
}

#[cfg(feature = "std")]
impl StdioSystem for Clock {
fn build_system(config: StdioConfig) -> Result<System, StdioError> {
use crate::{prelude::Duration, CoreBlocks, IoBlocks, SysBlocks, SystemBuilding};

let delay_type = if let Some(delay) = config.get_opt::<f64>("fixed")? {
DelayType::Fixed(Duration::from_secs_f64(delay))
} else if let Some(delay) = config.get_opt::<String>("random")? {
if let Some(range) = parse_range(&delay) {
DelayType::Random(
Duration::from_secs_f64(range.0)..Duration::from_secs_f64(range.1),
)
} else {
return Err(StdioError::InvalidParameter("random"));
}
} else {
return Err(StdioError::MissingParameter("fixed or random"));
};

Ok(System::build(|s| {
let stdin = s.clock(delay_type);
let encode_lines = s.encode_lines();
let stdout = s.write_stdout();
s.connect(&stdin.output, &encode_lines.input);
s.connect(&encode_lines.output, &stdout.input);
}))
}
}

#[cfg(test)]
mod tests {
use super::Clock;
use crate::{prelude::Duration, DelayType, System, SystemBuilding};

#[test]
fn instantiate_block() {
// Check that the block is constructible:
let _ = System::build(|s| {
let _ = s.block(Clock::with_params(
s.output(),
DelayType::Fixed(Duration::from_secs(1)),
));
});
}
}
1 change: 1 addition & 0 deletions lib/protoflow-blocks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub fn build_stdio_system(
Ok(match system_name.as_ref() {
// CoreBlocks
"Buffer" => Buffer::<String>::build_system(config)?,
"Clock" => Clock::build_system(config)?,
"Const" => Const::<String>::build_system(config)?,
"Count" => Count::<String>::build_system(config)?,
"Delay" => Delay::<String>::build_system(config)?,
Expand Down
12 changes: 8 additions & 4 deletions lib/protoflow-blocks/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
use crate::{
prelude::{fmt, Arc, Box, Bytes, FromStr, Rc, String, ToString},
types::{DelayType, Encoding},
AllBlocks, Buffer, ConcatStrings, Const, CoreBlocks, Count, Decode, DecodeCsv, DecodeHex,
DecodeJson, Delay, Drop, Encode, EncodeCsv, EncodeHex, EncodeJson, FlowBlocks, HashBlocks,
IoBlocks, MathBlocks, Random, ReadDir, ReadEnv, ReadFile, ReadStdin, SplitString, SysBlocks,
TextBlocks, WriteFile, WriteStderr, WriteStdout,
AllBlocks, Buffer, Clock, ConcatStrings, Const, CoreBlocks, Count, Decode, DecodeCsv,
DecodeHex, DecodeJson, Delay, Drop, Encode, EncodeCsv, EncodeHex, EncodeJson, FlowBlocks,
HashBlocks, IoBlocks, MathBlocks, Random, ReadDir, ReadEnv, ReadFile, ReadStdin, SplitString,
SysBlocks, TextBlocks, WriteFile, WriteStderr, WriteStdout,
};
#[cfg(all(feature = "std", feature = "serde"))]
use crate::{ReadSocket, WriteSocket};
Expand Down Expand Up @@ -135,6 +135,10 @@ impl CoreBlocks for System {
self.0.block(Buffer::<T>::with_system(self))
}

fn clock(&mut self, delay: DelayType) -> Clock {
self.0.block(Clock::with_system(self, delay))
}

fn const_bytes<T: Into<Bytes>>(&mut self, value: T) -> Const<Bytes> {
self.0
.block(Const::<Bytes>::with_system(self, value.into()))
Expand Down