From 963a10c10ad9c175037bc1e4c48d9132865a1fd7 Mon Sep 17 00:00:00 2001 From: Jan Zachmann <50990105+JanZachmann@users.noreply.github.com> Date: Mon, 22 Apr 2024 18:48:45 +0200 Subject: [PATCH] fixed sender --- src/client/mod.rs | 67 ++++++++++++++++++++++------------------------- 1 file changed, 32 insertions(+), 35 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index 44f612f..163283f 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -93,8 +93,11 @@ pub enum TwinUpdateState { Partial = 1, } +/// Used to update [desired properties](https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-module-twins#back-end-operations) to the client pub struct TwinUpdate { + /// type of update [`TwinUpdateState`] pub state: TwinUpdateState, + /// value pub value: serde_json::Value, } @@ -153,15 +156,15 @@ pub type IotMessageSender = mpsc::Sender; /// Provides a channel and a property array to receive incoming cloud to device messages #[derive(Clone, Debug)] pub struct IncomingMessageObserver { - observer: IotMessageSender, + responder: IotMessageSender, properties: Vec, } impl IncomingMessageObserver { /// Creates a new instance of [`IncomingMessageObserver`] - pub fn new(observer: IotMessageSender, properties: Vec) -> Self { + pub fn new(responder: IotMessageSender, properties: Vec) -> Self { IncomingMessageObserver { - observer, + responder, properties, } } @@ -222,10 +225,10 @@ struct RetrySetting { /// ``` #[derive(Debug, Default)] pub struct IotHubClientBuilder { - tx_connection_status: Option>, - tx_twin_desired: Option>, - tx_direct_method: Option, - tx_incoming_message: Option, + tx_connection_status: Option>>, + tx_twin_desired: Option>>, + tx_direct_method: Option>, + tx_incoming_message: Option>, model_id: Option<&'static str>, retry_setting: Option, } @@ -460,7 +463,7 @@ impl IotHubClientBuilder { mut self, tx_connection_status: mpsc::Sender, ) -> Self { - self.tx_connection_status = Some(tx_connection_status); + self.tx_connection_status = Some(Box::new(tx_connection_status)); self } @@ -491,11 +494,8 @@ impl IotHubClientBuilder { /// } /// } /// ``` - pub fn observe_desired_properties( - mut self, - tx_twin_desired: mpsc::Sender, - ) -> Self { - self.tx_twin_desired = Some(tx_twin_desired); + pub fn observe_desired_properties(mut self, tx_twin_desired: mpsc::Sender) -> Self { + self.tx_twin_desired = Some(Box::new(tx_twin_desired)); self } @@ -536,7 +536,7 @@ impl IotHubClientBuilder { /// } /// ``` pub fn observe_direct_methods(mut self, tx_direct_method: DirectMethodSender) -> Self { - self.tx_direct_method = Some(tx_direct_method); + self.tx_direct_method = Some(Box::new(tx_direct_method)); self } @@ -580,7 +580,7 @@ impl IotHubClientBuilder { mut self, tx_incoming_message: IncomingMessageObserver, ) -> Self { - self.tx_incoming_message = Some(tx_incoming_message); + self.tx_incoming_message = Some(Box::new(tx_incoming_message)); self } @@ -697,10 +697,10 @@ impl IotHubClientBuilder { /// ``` pub struct IotHubClient { twin: Box, - tx_connection_status: Option>, - tx_twin_desired: Option>, - tx_direct_method: Option, - tx_incoming_message: Option, + tx_connection_status: Option>>, + tx_twin_desired: Option>>, + tx_direct_method: Option>, + tx_incoming_message: Option>, model_id: Option<&'static str>, retry_setting: Option, confirmation_set: JoinSet<()>, @@ -1013,31 +1013,31 @@ impl IotHubClient { } fn set_callbacks(&mut self) -> Result<()> { - if let Some(tx) = &self.tx_connection_status { + if let Some(tx) = self.tx_connection_status.as_deref_mut() { self.twin.set_connection_status_callback( Some(IotHubClient::c_connection_status_callback), - Box::into_raw(Box::new(tx.clone())) as *mut c_void, + tx as *mut mpsc::Sender as *mut c_void, )?; } - if let Some(tx) = &self.tx_incoming_message { + if let Some(tx) = self.tx_incoming_message.as_deref_mut() { self.twin.set_input_message_callback( Some(IotHubClient::c_c2d_message_callback), - Box::into_raw(Box::new(tx.clone())) as *mut c_void, + tx as *mut IncomingMessageObserver as *mut c_void, )?; } - if let Some(tx) = &self.tx_twin_desired { + if let Some(tx) = self.tx_twin_desired.as_deref_mut() { self.twin.set_twin_callback( Some(IotHubClient::c_twin_callback), - Box::into_raw(Box::new(tx.clone())) as *mut c_void, + tx as *mut mpsc::Sender as *mut c_void, )?; } - if let Some(tx) = &self.tx_direct_method { + if let Some(tx) = self.tx_direct_method.as_deref_mut() { self.twin.set_method_callback( Some(IotHubClient::c_direct_method_callback), - Box::into_raw(Box::new(tx.clone())) as *mut c_void, + tx as *mut DirectMethodSender as *mut c_void, )?; } @@ -1099,7 +1099,7 @@ impl IotHubClient { status_reason: IOTHUB_CLIENT_CONNECTION_STATUS_REASON, context: *mut ::std::os::raw::c_void, ) { - let tx = Box::from_raw(context as *mut tokio::sync::mpsc::Sender); + let tx = &mut *(context as *mut mpsc::Sender); let status = match connection_status { IOTHUB_CLIENT_CONNECTION_STATUS_TAG_IOTHUB_CLIENT_CONNECTION_AUTHENTICATED => { @@ -1151,7 +1151,7 @@ impl IotHubClient { handle: *mut IOTHUB_MESSAGE_HANDLE_DATA_TAG, context: *mut ::std::os::raw::c_void, ) -> IOTHUBMESSAGE_DISPOSITION_RESULT { - let observer = Box::from_raw(context as *mut IncomingMessageObserver); + let observer = &mut *(context as *mut IncomingMessageObserver); let mut property_keys: Vec = vec![]; for property in &observer.properties { @@ -1173,7 +1173,7 @@ impl IotHubClient { let (tx_result, rx_result) = oneshot::channel::>(); observer - .observer + .responder .blocking_send(IncomingIotMessage { inner: msg, responder: tx_result, @@ -1216,9 +1216,7 @@ impl IotHubClient { size: usize, context: *mut ::std::os::raw::c_void, ) { - let tx = Box::from_raw( - context as *mut tokio::sync::mpsc::Sender, - ); + let tx = &mut *(context as *mut mpsc::Sender); match String::from_utf8(slice::from_raw_parts(payload, size).to_vec()) { Ok(desired_string) => { @@ -1269,8 +1267,7 @@ impl IotHubClient { const METHOD_RESPONSE_SUCCESS: i32 = 200; const METHOD_RESPONSE_ERROR: i32 = 401; - let tx_direct_method = - Box::from_raw(context as *mut tokio::sync::mpsc::Sender); + let tx_direct_method = &mut *(context as *mut DirectMethodSender); let empty_result: CString = CString::from_vec_unchecked(b"{ }".to_vec()); *response_size = empty_result.as_bytes().len();