Skip to content

Commit

Permalink
Merge branch 'feature/async-shadows' into fix/async-shadow
Browse files Browse the repository at this point in the history
  • Loading branch information
KennethKnudsen97 committed Jul 18, 2024
2 parents ed82293 + 1e414a8 commit fb190ca
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 224 deletions.
4 changes: 1 addition & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ serde_cbor = { version = "0.11", default-features = false, optional = true }
serde-json-core = { version = "0.5" }
shadow-derive = { path = "shadow_derive", version = "0.2.1" }
embedded-storage-async = "0.4"
embedded-mqtt = { git = "ssh://[email protected]/BlackbirdHQ/embedded-mqtt/", rev = "dbf8af0", features = [

] }
embedded-mqtt = { git = "ssh://[email protected]/BlackbirdHQ/embedded-mqtt/", rev = "d766137" }

futures = { version = "0.3.28", default-features = false }

Expand Down
6 changes: 3 additions & 3 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[toolchain]
channel = "nightly-2024-06-17"
components = ["rust-src", "rustfmt", "llvm-tools", "clippy"]
channel = "1.79"
components = ["rust-src", "rustfmt", "llvm-tools"]
targets = [
"x86_64-unknown-linux-gnu",
"thumbv7em-none-eabihf",
"thumbv6m-none-eabi",
"thumbv7em-none-eabihf",
]
85 changes: 56 additions & 29 deletions src/fmt.rs
Original file line number Diff line number Diff line change
@@ -1,31 +1,12 @@
// MIT License

// Copyright (c) 2020 Dario Nieuwenhuis <[email protected]>

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

#![macro_use]
#![allow(unused_macros)]
#![allow(unused)]

use core::fmt::{Debug, Display, LowerHex};

#[cfg(all(feature = "defmt", feature = "log"))]
compile_error!("You may not enable both `defmt` and `log` features.");

#[collapse_debuginfo(yes)]
macro_rules! assert {
($($x:tt)*) => {
{
Expand All @@ -37,6 +18,7 @@ macro_rules! assert {
};
}

#[collapse_debuginfo(yes)]
macro_rules! assert_eq {
($($x:tt)*) => {
{
Expand All @@ -48,6 +30,7 @@ macro_rules! assert_eq {
};
}

#[collapse_debuginfo(yes)]
macro_rules! assert_ne {
($($x:tt)*) => {
{
Expand All @@ -59,6 +42,7 @@ macro_rules! assert_ne {
};
}

#[collapse_debuginfo(yes)]
macro_rules! debug_assert {
($($x:tt)*) => {
{
Expand All @@ -70,6 +54,7 @@ macro_rules! debug_assert {
};
}

#[collapse_debuginfo(yes)]
macro_rules! debug_assert_eq {
($($x:tt)*) => {
{
Expand All @@ -81,6 +66,7 @@ macro_rules! debug_assert_eq {
};
}

#[collapse_debuginfo(yes)]
macro_rules! debug_assert_ne {
($($x:tt)*) => {
{
Expand All @@ -92,6 +78,7 @@ macro_rules! debug_assert_ne {
};
}

#[collapse_debuginfo(yes)]
macro_rules! todo {
($($x:tt)*) => {
{
Expand All @@ -103,17 +90,23 @@ macro_rules! todo {
};
}

#[cfg(not(feature = "defmt"))]
#[collapse_debuginfo(yes)]
macro_rules! unreachable {
($($x:tt)*) => {
{
#[cfg(not(feature = "defmt"))]
::core::unreachable!($($x)*);
#[cfg(feature = "defmt")]
::defmt::unreachable!($($x)*);
}
::core::unreachable!($($x)*)
};
}

#[cfg(feature = "defmt")]
#[collapse_debuginfo(yes)]
macro_rules! unreachable {
($($x:tt)*) => {
::defmt::unreachable!($($x)*)
};
}

#[collapse_debuginfo(yes)]
macro_rules! panic {
($($x:tt)*) => {
{
Expand All @@ -125,6 +118,7 @@ macro_rules! panic {
};
}

#[collapse_debuginfo(yes)]
macro_rules! trace {
($s:literal $(, $x:expr)* $(,)?) => {
{
Expand All @@ -138,6 +132,7 @@ macro_rules! trace {
};
}

#[collapse_debuginfo(yes)]
macro_rules! debug {
($s:literal $(, $x:expr)* $(,)?) => {
{
Expand All @@ -151,6 +146,7 @@ macro_rules! debug {
};
}

#[collapse_debuginfo(yes)]
macro_rules! info {
($s:literal $(, $x:expr)* $(,)?) => {
{
Expand All @@ -164,6 +160,7 @@ macro_rules! info {
};
}

#[collapse_debuginfo(yes)]
macro_rules! warn {
($s:literal $(, $x:expr)* $(,)?) => {
{
Expand All @@ -177,6 +174,7 @@ macro_rules! warn {
};
}

#[collapse_debuginfo(yes)]
macro_rules! error {
($s:literal $(, $x:expr)* $(,)?) => {
{
Expand All @@ -191,13 +189,15 @@ macro_rules! error {
}

#[cfg(feature = "defmt")]
#[collapse_debuginfo(yes)]
macro_rules! unwrap {
($($x:tt)*) => {
::defmt::unwrap!($($x)*)
};
}

#[cfg(not(feature = "defmt"))]
#[collapse_debuginfo(yes)]
macro_rules! unwrap {
($arg:expr) => {
match $crate::fmt::Try::into_result($arg) {
Expand Down Expand Up @@ -245,3 +245,30 @@ impl<T, E> Try for Result<T, E> {
self
}
}

pub(crate) struct Bytes<'a>(pub &'a [u8]);

impl<'a> Debug for Bytes<'a> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{:#02x?}", self.0)
}
}

impl<'a> Display for Bytes<'a> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{:#02x?}", self.0)
}
}

impl<'a> LowerHex for Bytes<'a> {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "{:#02x?}", self.0)
}
}

#[cfg(feature = "defmt")]
impl<'a> defmt::Format for Bytes<'a> {
fn format(&self, fmt: defmt::Formatter) {
defmt::write!(fmt, "{:02x}", self.0)
}
}
4 changes: 2 additions & 2 deletions src/jobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl Jobs {
Describe::new()
}

pub fn update(job_id: &str, status: JobStatus) -> Update {
Update::new(job_id, status)
pub fn update<'a>(status: JobStatus) -> Update<'a> {
Update::new(status)
}
}
36 changes: 11 additions & 25 deletions src/jobs/update.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use serde::Serialize;

use crate::jobs::{
data_types::JobStatus, JobTopic, MAX_CLIENT_TOKEN_LEN, MAX_JOB_ID_LEN, MAX_THING_NAME_LEN,
};
use crate::jobs::{data_types::JobStatus, MAX_CLIENT_TOKEN_LEN};

use super::{JobError, StatusDetailsOwned};

Expand Down Expand Up @@ -69,7 +67,6 @@ pub struct UpdateJobExecutionRequest<'a> {
}

pub struct Update<'a> {
job_id: &'a str,
status: JobStatus,
client_token: Option<&'a str>,
status_details: Option<&'a StatusDetailsOwned>,
Expand All @@ -81,11 +78,8 @@ pub struct Update<'a> {
}

impl<'a> Update<'a> {
pub fn new(job_id: &'a str, status: JobStatus) -> Self {
assert!(job_id.len() < MAX_JOB_ID_LEN);

pub fn new(status: JobStatus) -> Self {
Self {
job_id,
status,
status_details: None,
include_job_document: false,
Expand Down Expand Up @@ -148,17 +142,7 @@ impl<'a> Update<'a> {
}
}

pub fn topic_payload(
self,
client_id: &str,
buf: &mut [u8],
) -> Result<
(
heapless::String<{ MAX_THING_NAME_LEN + MAX_JOB_ID_LEN + 25 }>,
usize,
),
JobError,
> {
pub fn payload(self, buf: &mut [u8]) -> Result<usize, JobError> {
let payload_len = serde_json_core::to_slice(
&UpdateJobExecutionRequest {
execution_number: self.execution_number,
Expand All @@ -174,15 +158,14 @@ impl<'a> Update<'a> {
)
.map_err(|_| JobError::Encoding)?;

Ok((
JobTopic::Update(self.job_id).format(client_id)?,
payload_len,
))
Ok(payload_len)
}
}

#[cfg(test)]
mod test {
use crate::jobs::JobTopic;

use super::*;
use serde_json_core::to_string;

Expand All @@ -207,14 +190,17 @@ mod test {
#[test]
fn topic_payload() {
let mut buf = [0u8; 512];
let (topic, payload_len) = Update::new("test_job_id", JobStatus::Failed)
let topic = JobTopic::Update("test_job_id")
.format::<64>("test_client")
.unwrap();
let payload_len = Update::new(JobStatus::Failed)
.client_token("test_client:token_update")
.step_timeout_in_minutes(50)
.execution_number(5)
.expected_version(2)
.include_job_document()
.include_job_execution_state()
.topic_payload("test_client", &mut buf)
.payload(&mut buf)
.unwrap();

assert_eq!(&buf[..payload_len], br#"{"executionNumber":5,"expectedVersion":2,"includeJobDocument":true,"includeJobExecutionState":true,"status":"FAILED","stepTimeoutInMinutes":50,"clientToken":"test_client:token_update"}"#);
Expand Down
22 changes: 14 additions & 8 deletions src/ota/control_interface/mqtt.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use core::fmt::Write;

use embassy_sync::blocking_mutex::raw::RawMutex;
use embedded_mqtt::{Publish, QoS};
use embedded_mqtt::{DeferredPayload, EncodingError, Publish, QoS};

use super::ControlInterface;
use crate::jobs::data_types::JobStatus;
use crate::jobs::Jobs;
use crate::jobs::{JobTopic, Jobs, MAX_JOB_ID_LEN, MAX_THING_NAME_LEN};
use crate::ota::config::Config;
use crate::ota::encoding::json::JobStatusReason;
use crate::ota::encoding::FileContext;
Expand Down Expand Up @@ -93,19 +93,25 @@ impl<'a, M: RawMutex, const SUBS: usize> ControlInterface
}
}

// FIXME: Serialize directly into the publish payload through `DeferredPublish` API
let mut buf = [0u8; 512];
let (topic, payload_len) = Jobs::update(file_ctx.job_name.as_str(), status)
.status_details(&file_ctx.status_details)
.topic_payload(self.client_id(), &mut buf)?;
let topic = JobTopic::Update(file_ctx.job_name.as_str())
.format::<{ MAX_THING_NAME_LEN + MAX_JOB_ID_LEN + 25 }>(self.client_id())?;
let payload = DeferredPayload::new(
|buf| {
Jobs::update(status)
.status_details(&file_ctx.status_details)
.payload(buf)
.map_err(|_| EncodingError::BufferSize)
},
512,
);

self.publish(Publish {
dup: false,
qos,
retain: false,
pid: None,
topic_name: &topic,
payload: &buf[..payload_len],
payload,
properties: embedded_mqtt::Properties::Slice(&[]),
})
.await?;
Expand Down
2 changes: 1 addition & 1 deletion src/ota/data_interface/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl<'a> FileBlock<'a> {
}

pub trait BlockTransfer {
async fn next_block(&mut self) -> Result<impl DerefMut<Target = [u8]>, OtaError>;
async fn next_block(&mut self) -> Result<Option<impl DerefMut<Target = [u8]>>, OtaError>;
}

pub trait DataInterface {
Expand Down
6 changes: 3 additions & 3 deletions src/ota/data_interface/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ impl<'a> OtaTopic<'a> {
}

impl<'a, 'b, M: RawMutex, const SUBS: usize> BlockTransfer for Subscription<'a, 'b, M, SUBS, 1> {
async fn next_block(&mut self) -> Result<impl DerefMut<Target = [u8]>, OtaError> {
Ok(self.next().await.ok_or(OtaError::Encoding)?)
async fn next_block(&mut self) -> Result<Option<impl DerefMut<Target = [u8]>>, OtaError> {
Ok(self.next().await)
}
}

Expand Down Expand Up @@ -179,7 +179,7 @@ impl<'a, M: RawMutex, const SUBS: usize> DataInterface for MqttClient<'a, M, SUB
},
buf,
)
.map_err(|e| EncodingError::BufferSize)
.map_err(|_| EncodingError::BufferSize)
},
32,
);
Expand Down
Loading

0 comments on commit fb190ca

Please sign in to comment.