Skip to content

Commit

Permalink
Rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
MathiasKoch committed Jul 19, 2024
2 parents 687290a + f424608 commit fb8f981
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 45 deletions.
12 changes: 10 additions & 2 deletions src/provisioning/error.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#[derive(Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]

pub enum Error {
Overflow,
InvalidPayload,
InvalidState,
Mqtt,
DeserializeJson(serde_json_core::de::Error),
Mqtt(embedded_mqtt::Error),
DeserializeJson(#[cfg_attr(feature = "defmt", defmt(Debug2Format))] serde_json_core::de::Error),
DeserializeCbor,
CertificateStorage,
Response(u16),
Expand All @@ -27,3 +29,9 @@ impl From<serde_cbor::Error> for Error {
Self::DeserializeCbor
}
}

impl From<embedded_mqtt::Error> for Error {
fn from(e: embedded_mqtt::Error) -> Self {
Self::Mqtt(e)
}
}
94 changes: 51 additions & 43 deletions src/provisioning/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ use embedded_mqtt::{
SubscribeTopic, Subscription,
};
use futures::StreamExt;
use serde::Serialize;
use serde::{de::DeserializeOwned, Deserialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};

pub use error::Error;

use self::data_types::CreateCertificateFromCsrRequest;
use self::{
data_types::{
CreateCertificateFromCsrRequest, CreateCertificateFromCsrResponse,
CreateKeysAndCertificateResponse, ErrorResponse, RegisterThingRequest,
RegisterThingResponse,
},
Expand Down Expand Up @@ -137,7 +136,6 @@ impl FleetProvisioner {
where
C: DeserializeOwned,
{
use crate::provisioning::data_types::CreateCertificateFromCsrResponse;
let mut create_subscription = Self::begin(mqtt, csr, payload_format).await?;
let mut message = create_subscription
.next()
Expand Down Expand Up @@ -230,11 +228,7 @@ impl FleetProvisioner {
retain_as_published: false,
retain_handling: RetainHandling::SendAtSubscribeTime,
}]))
.await
.map_err(|e| {
error!("Failed subscription to RegisterThingAny! {:?}", e);
Error::Mqtt
})?;
.await?;

mqtt.publish(Publish {
dup: false,
Expand All @@ -247,11 +241,7 @@ impl FleetProvisioner {
payload,
properties: embedded_mqtt::Properties::Slice(&[]),
})
.await
.map_err(|e| {
error!("Failed publish to RegisterThing! {:?}", e);
Error::Mqtt
})?;
.await?;

drop(message);
drop(create_subscription);
Expand Down Expand Up @@ -286,20 +276,30 @@ impl FleetProvisioner {
mqtt: &'b embedded_mqtt::MqttClient<'a, M, SUBS>,
csr: Option<&str>,
payload_format: PayloadFormat,
) -> Result<Subscription<'a, 'b, M, SUBS, 1>, Error> {
) -> Result<Subscription<'a, 'b, M, SUBS, 2>, Error> {
if let Some(csr) = csr {
let subscription = mqtt
.subscribe::<1>(Subscribe::new(&[SubscribeTopic {
topic_path: Topic::CreateCertificateFromCsrAccepted(payload_format)
.format::<47>()?
.as_str(),
maximum_qos: QoS::AtLeastOnce,
no_local: false,
retain_as_published: false,
retain_handling: RetainHandling::SendAtSubscribeTime,
}]))
.await
.map_err(|_| Error::Mqtt)?;
.subscribe(Subscribe::new(&[
SubscribeTopic {
topic_path: Topic::CreateCertificateFromCsrRejected(payload_format)
.format::<47>()?
.as_str(),
maximum_qos: QoS::AtLeastOnce,
no_local: false,
retain_as_published: false,
retain_handling: RetainHandling::SendAtSubscribeTime,
},
SubscribeTopic {
topic_path: Topic::CreateCertificateFromCsrAccepted(payload_format)
.format::<47>()?
.as_str(),
maximum_qos: QoS::AtLeastOnce,
no_local: false,
retain_as_published: false,
retain_handling: RetainHandling::SendAtSubscribeTime,
},
]))
.await?;

let request = CreateCertificateFromCsrRequest {
certificate_signing_request: csr,
Expand All @@ -322,7 +322,7 @@ impl FleetProvisioner {
.map_err(|_| EncodingError::BufferSize)?,
})
},
1024,
csr.len() + 32,
);

mqtt.publish(Publish {
Expand All @@ -331,28 +331,37 @@ impl FleetProvisioner {
retain: false,
pid: None,
topic_name: Topic::CreateCertificateFromCsr(payload_format)
.format::<38>()?
.format::<40>()?
.as_str(),
payload,
properties: embedded_mqtt::Properties::Slice(&[]),
})
.await
.map_err(|_| Error::Mqtt)?;
.await?;

Ok(subscription)
} else {
let subscription = mqtt
.subscribe::<1>(Subscribe::new(&[SubscribeTopic {
topic_path: Topic::CreateKeysAndCertificateAny(payload_format)
.format::<31>()?
.as_str(),
maximum_qos: QoS::AtLeastOnce,
no_local: false,
retain_as_published: false,
retain_handling: RetainHandling::SendAtSubscribeTime,
}]))
.await
.map_err(|_| Error::Mqtt)?;
.subscribe(Subscribe::new(&[
SubscribeTopic {
topic_path: Topic::CreateKeysAndCertificateAccepted(payload_format)
.format::<38>()?
.as_str(),
maximum_qos: QoS::AtLeastOnce,
no_local: false,
retain_as_published: false,
retain_handling: RetainHandling::SendAtSubscribeTime,
},
SubscribeTopic {
topic_path: Topic::CreateKeysAndCertificateRejected(payload_format)
.format::<38>()?
.as_str(),
maximum_qos: QoS::AtLeastOnce,
no_local: false,
retain_as_published: false,
retain_handling: RetainHandling::SendAtSubscribeTime,
},
]))
.await?;

mqtt.publish(Publish {
dup: false,
Expand All @@ -365,8 +374,7 @@ impl FleetProvisioner {
payload: b"",
properties: embedded_mqtt::Properties::Slice(&[]),
})
.await
.map_err(|_| Error::Mqtt)?;
.await?;

Ok(subscription)
}
Expand Down

0 comments on commit fb8f981

Please sign in to comment.