diff --git a/Cargo.toml b/Cargo.toml index 1a42fbc..4063f6f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ authors = ["omnect@conplement.de>"] edition = "2021" name = "azure-iot-sdk" repository = "git@github.com:omnect/azure-iot-sdk.git" -version = "0.12.0" +version = "0.12.1" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -19,6 +19,7 @@ url = "2.4" [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } +azure-iot-sdk = { path = ".", features = ["module_client"] } [features] # select either "module_client", "edge_client" or "device_client" functionality diff --git a/src/client/mod.rs b/src/client/mod.rs index cd0b4cb..44f612f 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -93,6 +93,11 @@ pub enum TwinUpdateState { Partial = 1, } +pub struct TwinUpdate { + pub state: TwinUpdateState, + pub value: serde_json::Value, +} + /// Reason for unauthenticated connection result #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum UnauthenticatedReason { @@ -218,7 +223,7 @@ struct RetrySetting { #[derive(Debug, Default)] pub struct IotHubClientBuilder { tx_connection_status: Option>, - tx_twin_desired: Option>, + tx_twin_desired: Option>, tx_direct_method: Option, tx_incoming_message: Option, model_id: Option<&'static str>, @@ -488,7 +493,7 @@ impl IotHubClientBuilder { /// ``` pub fn observe_desired_properties( mut self, - tx_twin_desired: mpsc::Sender<(TwinUpdateState, serde_json::Value)>, + tx_twin_desired: mpsc::Sender, ) -> Self { self.tx_twin_desired = Some(tx_twin_desired); self @@ -693,7 +698,7 @@ impl IotHubClientBuilder { pub struct IotHubClient { twin: Box, tx_connection_status: Option>, - tx_twin_desired: Option>, + tx_twin_desired: Option>, tx_direct_method: Option, tx_incoming_message: Option, model_id: Option<&'static str>, @@ -1008,27 +1013,32 @@ impl IotHubClient { } fn set_callbacks(&mut self) -> Result<()> { - let context = self as *mut IotHubClient as *mut c_void; - - if self.tx_connection_status.is_some() { + if let Some(tx) = &self.tx_connection_status { self.twin.set_connection_status_callback( Some(IotHubClient::c_connection_status_callback), - context, + Box::into_raw(Box::new(tx.clone())) as *mut c_void, )?; } - if self.tx_incoming_message.is_some() { - self.twin - .set_input_message_callback(Some(IotHubClient::c_c2d_message_callback), context)?; + if let Some(tx) = &self.tx_incoming_message { + self.twin.set_input_message_callback( + Some(IotHubClient::c_c2d_message_callback), + Box::into_raw(Box::new(tx.clone())) as *mut c_void, + )?; } - if self.tx_twin_desired.is_some() { - self.twin - .set_twin_callback(Some(IotHubClient::c_twin_callback), context)?; + if let Some(tx) = &self.tx_twin_desired { + self.twin.set_twin_callback( + Some(IotHubClient::c_twin_callback), + Box::into_raw(Box::new(tx.clone())) as *mut c_void, + )?; } - if self.tx_direct_method.is_some() { - self.twin - .set_method_callback(Some(IotHubClient::c_direct_method_callback), context)?; + + if let Some(tx) = &self.tx_direct_method { + self.twin.set_method_callback( + Some(IotHubClient::c_direct_method_callback), + Box::into_raw(Box::new(tx.clone())) as *mut c_void, + )?; } Ok(()) @@ -1089,7 +1099,7 @@ impl IotHubClient { status_reason: IOTHUB_CLIENT_CONNECTION_STATUS_REASON, context: *mut ::std::os::raw::c_void, ) { - let client = &mut *(context as *mut IotHubClient); + let tx = Box::from_raw(context as *mut tokio::sync::mpsc::Sender); let status = match connection_status { IOTHUB_CLIENT_CONNECTION_STATUS_TAG_IOTHUB_CLIENT_CONNECTION_AUTHENTICATED => { @@ -1133,81 +1143,70 @@ impl IotHubClient { debug!("Received connection status: {status:?}"); - if let Some(tx) = &client.tx_connection_status { - tx.blocking_send(status) - .expect("c_connection_status_callback: cannot blocking_send"); - } + tx.blocking_send(status) + .expect("c_connection_status_callback: cannot blocking_send"); } unsafe extern "C" fn c_c2d_message_callback( handle: *mut IOTHUB_MESSAGE_HANDLE_DATA_TAG, context: *mut ::std::os::raw::c_void, ) -> IOTHUBMESSAGE_DISPOSITION_RESULT { - let client = &mut *(context as *mut IotHubClient); + let observer = Box::from_raw(context as *mut IncomingMessageObserver); + let mut property_keys: Vec = vec![]; - if let Some(observer) = &client.tx_incoming_message { - let mut property_keys: Vec = vec![]; - for property in &observer.properties { - match CString::new(property.clone()) { - Ok(p) => property_keys.push(p), - Err(e) => { - error!( - "invalid property in c2d message received. payload: {property}, error: {e}" - ); - return IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED; - } + for property in &observer.properties { + match CString::new(property.clone()) { + Ok(p) => property_keys.push(p), + Err(e) => { + error!( + "invalid property in c2d message received. payload: {property}, error: {e}" + ); + return IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED; } } + } - match IotMessage::from_incoming_handle(handle, property_keys) { - Ok(msg) => { - debug!("Received message from iothub: {msg:?}"); + match IotMessage::from_incoming_handle(handle, property_keys) { + Ok(msg) => { + debug!("Received message from iothub: {msg:?}"); - let (tx_result, rx_result) = oneshot::channel::>(); + let (tx_result, rx_result) = oneshot::channel::>(); - observer - .observer - .blocking_send(IncomingIotMessage { - inner: msg, - responder: tx_result, - }) - .expect("c_c2d_message_callback: cannot blocking_send"); + observer + .observer + .blocking_send(IncomingIotMessage { + inner: msg, + responder: tx_result, + }) + .expect("c_c2d_message_callback: cannot blocking_send"); - match rx_result.blocking_recv() { - Ok(Ok(DispositionResult::Accepted)) => { - IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_ACCEPTED - } - Ok(Ok(DispositionResult::Rejected)) => { - IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED - } - Ok(Ok(DispositionResult::Abandoned)) => { - IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_ABANDONED - } - Ok(Ok(DispositionResult::AsyncAck)) => { - IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_ASYNC_ACK - } - Ok(Err(e)) => { - error!("cannot handle c2d message: {e}"); - IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED - } - Err(e) => { - error!("channel unexpectedly closed: {e}"); - IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED - } + match rx_result.blocking_recv() { + Ok(Ok(DispositionResult::Accepted)) => { + IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_ACCEPTED + } + Ok(Ok(DispositionResult::Rejected)) => { + IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED + } + Ok(Ok(DispositionResult::Abandoned)) => { + IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_ABANDONED + } + Ok(Ok(DispositionResult::AsyncAck)) => { + IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_ASYNC_ACK + } + Ok(Err(e)) => { + error!("cannot handle c2d message: {e}"); + IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED + } + Err(e) => { + error!("channel unexpectedly closed: {e}"); + IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED } - } - Err(e) => { - error!("cannot create IotMessage from incomming handle: {e}"); - IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED } } - } else { - match IotMessage::from_incoming_handle(handle, vec![]) { - Ok(msg) => debug!("Received message from iothub: {msg:?}"), - Err(e) => error!("cannot create IotMessage from incomming handle: {e}"), + Err(e) => { + error!("cannot create IotMessage from incomming handle: {e}"); + IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED } - - IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED } } @@ -1217,21 +1216,25 @@ impl IotHubClient { size: usize, context: *mut ::std::os::raw::c_void, ) { + let tx = Box::from_raw( + context as *mut tokio::sync::mpsc::Sender, + ); + match String::from_utf8(slice::from_raw_parts(payload, size).to_vec()) { Ok(desired_string) => { match serde_json::from_str::(&desired_string) { Ok(desired_json) => { - let client = &mut *(context as *mut IotHubClient); let desired_state: TwinUpdateState = mem::transmute(state as i8); debug!( "Twin callback. state: {desired_state:?} size: {size} payload: {desired_json}" ); - if let Some(tx) = &client.tx_twin_desired { - tx.blocking_send((desired_state, desired_json)) - .expect("c_twin_callback: cannot blocking_send"); - } + tx.blocking_send(TwinUpdate { + state: desired_state, + value: desired_json, + }) + .expect("c_twin_callback: cannot blocking_send"); } Err(e) => error!( "desired twin cannot be parsed. payload: {desired_string} error: {e}" @@ -1248,8 +1251,7 @@ impl IotHubClient { ) { trace!("SendReportedTwin result: {status_code}"); - let result: Box> = - Box::from_raw(context as *mut oneshot::Sender); + let result = Box::from_raw(context as *mut oneshot::Sender); result .send(status_code == 204) @@ -1267,6 +1269,9 @@ impl IotHubClient { const METHOD_RESPONSE_SUCCESS: i32 = 200; const METHOD_RESPONSE_ERROR: i32 = 401; + let tx_direct_method = + Box::from_raw(context as *mut tokio::sync::mpsc::Sender); + let empty_result: CString = CString::from_vec_unchecked(b"{ }".to_vec()); *response_size = empty_result.as_bytes().len(); *response = empty_result.into_raw() as *mut u8; @@ -1296,54 +1301,50 @@ impl IotHubClient { debug!("Received direct method call: {method_name:?} with payload: {payload}"); - let client = &mut *(context as *mut IotHubClient); + let (tx_result, rx_result) = oneshot::channel::>>(); - if let Some(tx_direct_method) = &client.tx_direct_method { - let (tx_result, rx_result) = oneshot::channel::>>(); + tx_direct_method + .blocking_send(DirectMethod { + name: method_name.to_string(), + payload, + responder: tx_result, + }) + .expect("c_direct_method_callback: cannot blocking_send"); - tx_direct_method - .blocking_send(DirectMethod { - name: method_name.to_string(), - payload, - responder: tx_result, - }) - .expect("c_direct_method_callback: cannot blocking_send"); - - match rx_result.blocking_recv() { - Ok(Ok(None)) => { - debug!("direct method has no result"); - return METHOD_RESPONSE_SUCCESS; - } - Ok(Ok(Some(result))) => { - debug!("direct method result: {result:?}"); - - match CString::new(result.to_string()) { - Ok(r) => { - *response_size = r.as_bytes().len(); - *response = r.into_raw() as *mut u8; - return METHOD_RESPONSE_SUCCESS; - } - Err(e) => { - error!("cannot parse direct method result: {e}"); - } + match rx_result.blocking_recv() { + Ok(Ok(None)) => { + debug!("direct method has no result"); + return METHOD_RESPONSE_SUCCESS; + } + Ok(Ok(Some(result))) => { + debug!("direct method result: {result:?}"); + + match CString::new(result.to_string()) { + Ok(r) => { + *response_size = r.as_bytes().len(); + *response = r.into_raw() as *mut u8; + return METHOD_RESPONSE_SUCCESS; + } + Err(e) => { + error!("cannot parse direct method result: {e}"); } } - Ok(Err(e)) => { - error!("direct method error: {e:?}"); + } + Ok(Err(e)) => { + error!("direct method error: {e:?}"); - match CString::new(json!(e.to_string()).to_string()) { - Ok(r) => { - *response_size = r.as_bytes().len(); - *response = r.into_raw() as *mut u8; - } - Err(e) => { - error!("cannot parse direct method result: {e}"); - } + match CString::new(json!(e.to_string()).to_string()) { + Ok(r) => { + *response_size = r.as_bytes().len(); + *response = r.into_raw() as *mut u8; + } + Err(e) => { + error!("cannot parse direct method result: {e}"); } } - Err(e) => { - error!("channel unexpectedly closed: {e}"); - } + } + Err(e) => { + error!("channel unexpectedly closed: {e}"); } } @@ -1354,8 +1355,7 @@ impl IotHubClient { status: IOTHUB_CLIENT_CONFIRMATION_RESULT, context: *mut std::ffi::c_void, ) { - let result: Box> = - Box::from_raw(context as *mut oneshot::Sender); + let result = Box::from_raw(context as *mut oneshot::Sender); let mut succeeded = false; match status {