Skip to content

Commit

Permalink
fix: fixed usage of tx channels in callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
JanZachmann committed Apr 22, 2024
1 parent cc98463 commit 8be8b54
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 126 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ authors = ["[email protected]>"]
edition = "2021"
name = "azure-iot-sdk"
repository = "[email protected]:omnect/azure-iot-sdk.git"
version = "0.12.0"
version = "0.12.1"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand All @@ -19,6 +19,7 @@ url = "2.4"

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
azure-iot-sdk = { path = ".", features = ["module_client"] }

[features]
# select either "module_client", "edge_client" or "device_client" functionality
Expand Down
250 changes: 125 additions & 125 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ pub enum TwinUpdateState {
Partial = 1,
}

pub struct TwinUpdate {
pub state: TwinUpdateState,
pub value: serde_json::Value,
}

/// Reason for unauthenticated connection result
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum UnauthenticatedReason {
Expand Down Expand Up @@ -218,7 +223,7 @@ struct RetrySetting {
#[derive(Debug, Default)]
pub struct IotHubClientBuilder {
tx_connection_status: Option<mpsc::Sender<AuthenticationStatus>>,
tx_twin_desired: Option<mpsc::Sender<(TwinUpdateState, serde_json::Value)>>,
tx_twin_desired: Option<mpsc::Sender<TwinUpdate>>,
tx_direct_method: Option<DirectMethodSender>,
tx_incoming_message: Option<IncomingMessageObserver>,
model_id: Option<&'static str>,
Expand Down Expand Up @@ -488,7 +493,7 @@ impl IotHubClientBuilder {
/// ```
pub fn observe_desired_properties(
mut self,
tx_twin_desired: mpsc::Sender<(TwinUpdateState, serde_json::Value)>,
tx_twin_desired: mpsc::Sender<TwinUpdate>,
) -> Self {
self.tx_twin_desired = Some(tx_twin_desired);
self
Expand Down Expand Up @@ -693,7 +698,7 @@ impl IotHubClientBuilder {
pub struct IotHubClient {
twin: Box<dyn Twin>,
tx_connection_status: Option<mpsc::Sender<AuthenticationStatus>>,
tx_twin_desired: Option<mpsc::Sender<(TwinUpdateState, serde_json::Value)>>,
tx_twin_desired: Option<mpsc::Sender<TwinUpdate>>,
tx_direct_method: Option<DirectMethodSender>,
tx_incoming_message: Option<IncomingMessageObserver>,
model_id: Option<&'static str>,
Expand Down Expand Up @@ -1008,27 +1013,32 @@ impl IotHubClient {
}

fn set_callbacks(&mut self) -> Result<()> {
let context = self as *mut IotHubClient as *mut c_void;

if self.tx_connection_status.is_some() {
if let Some(tx) = &self.tx_connection_status {
self.twin.set_connection_status_callback(
Some(IotHubClient::c_connection_status_callback),
context,
Box::into_raw(Box::new(tx.clone())) as *mut c_void,
)?;
}

if self.tx_incoming_message.is_some() {
self.twin
.set_input_message_callback(Some(IotHubClient::c_c2d_message_callback), context)?;
if let Some(tx) = &self.tx_incoming_message {
self.twin.set_input_message_callback(
Some(IotHubClient::c_c2d_message_callback),
Box::into_raw(Box::new(tx.clone())) as *mut c_void,
)?;
}

if self.tx_twin_desired.is_some() {
self.twin
.set_twin_callback(Some(IotHubClient::c_twin_callback), context)?;
if let Some(tx) = &self.tx_twin_desired {
self.twin.set_twin_callback(
Some(IotHubClient::c_twin_callback),
Box::into_raw(Box::new(tx.clone())) as *mut c_void,
)?;
}
if self.tx_direct_method.is_some() {
self.twin
.set_method_callback(Some(IotHubClient::c_direct_method_callback), context)?;

if let Some(tx) = &self.tx_direct_method {
self.twin.set_method_callback(
Some(IotHubClient::c_direct_method_callback),
Box::into_raw(Box::new(tx.clone())) as *mut c_void,
)?;
}

Ok(())
Expand Down Expand Up @@ -1089,7 +1099,7 @@ impl IotHubClient {
status_reason: IOTHUB_CLIENT_CONNECTION_STATUS_REASON,
context: *mut ::std::os::raw::c_void,
) {
let client = &mut *(context as *mut IotHubClient);
let tx = Box::from_raw(context as *mut tokio::sync::mpsc::Sender<AuthenticationStatus>);

let status = match connection_status {
IOTHUB_CLIENT_CONNECTION_STATUS_TAG_IOTHUB_CLIENT_CONNECTION_AUTHENTICATED => {
Expand Down Expand Up @@ -1133,81 +1143,70 @@ impl IotHubClient {

debug!("Received connection status: {status:?}");

if let Some(tx) = &client.tx_connection_status {
tx.blocking_send(status)
.expect("c_connection_status_callback: cannot blocking_send");
}
tx.blocking_send(status)
.expect("c_connection_status_callback: cannot blocking_send");
}

unsafe extern "C" fn c_c2d_message_callback(
handle: *mut IOTHUB_MESSAGE_HANDLE_DATA_TAG,
context: *mut ::std::os::raw::c_void,
) -> IOTHUBMESSAGE_DISPOSITION_RESULT {
let client = &mut *(context as *mut IotHubClient);
let observer = Box::from_raw(context as *mut IncomingMessageObserver);
let mut property_keys: Vec<CString> = vec![];

if let Some(observer) = &client.tx_incoming_message {
let mut property_keys: Vec<CString> = vec![];
for property in &observer.properties {
match CString::new(property.clone()) {
Ok(p) => property_keys.push(p),
Err(e) => {
error!(
"invalid property in c2d message received. payload: {property}, error: {e}"
);
return IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED;
}
for property in &observer.properties {
match CString::new(property.clone()) {
Ok(p) => property_keys.push(p),
Err(e) => {
error!(
"invalid property in c2d message received. payload: {property}, error: {e}"
);
return IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED;
}
}
}

match IotMessage::from_incoming_handle(handle, property_keys) {
Ok(msg) => {
debug!("Received message from iothub: {msg:?}");
match IotMessage::from_incoming_handle(handle, property_keys) {
Ok(msg) => {
debug!("Received message from iothub: {msg:?}");

let (tx_result, rx_result) = oneshot::channel::<Result<DispositionResult>>();
let (tx_result, rx_result) = oneshot::channel::<Result<DispositionResult>>();

observer
.observer
.blocking_send(IncomingIotMessage {
inner: msg,
responder: tx_result,
})
.expect("c_c2d_message_callback: cannot blocking_send");
observer
.observer
.blocking_send(IncomingIotMessage {
inner: msg,
responder: tx_result,
})
.expect("c_c2d_message_callback: cannot blocking_send");

match rx_result.blocking_recv() {
Ok(Ok(DispositionResult::Accepted)) => {
IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_ACCEPTED
}
Ok(Ok(DispositionResult::Rejected)) => {
IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED
}
Ok(Ok(DispositionResult::Abandoned)) => {
IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_ABANDONED
}
Ok(Ok(DispositionResult::AsyncAck)) => {
IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_ASYNC_ACK
}
Ok(Err(e)) => {
error!("cannot handle c2d message: {e}");
IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED
}
Err(e) => {
error!("channel unexpectedly closed: {e}");
IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED
}
match rx_result.blocking_recv() {
Ok(Ok(DispositionResult::Accepted)) => {
IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_ACCEPTED
}
Ok(Ok(DispositionResult::Rejected)) => {
IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED
}
Ok(Ok(DispositionResult::Abandoned)) => {
IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_ABANDONED
}
Ok(Ok(DispositionResult::AsyncAck)) => {
IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_ASYNC_ACK
}
Ok(Err(e)) => {
error!("cannot handle c2d message: {e}");
IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED
}
Err(e) => {
error!("channel unexpectedly closed: {e}");
IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED
}
}
Err(e) => {
error!("cannot create IotMessage from incomming handle: {e}");
IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED
}
}
} else {
match IotMessage::from_incoming_handle(handle, vec![]) {
Ok(msg) => debug!("Received message from iothub: {msg:?}"),
Err(e) => error!("cannot create IotMessage from incomming handle: {e}"),
Err(e) => {
error!("cannot create IotMessage from incomming handle: {e}");
IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED
}

IOTHUBMESSAGE_DISPOSITION_RESULT_TAG_IOTHUBMESSAGE_REJECTED
}
}

Expand All @@ -1217,21 +1216,25 @@ impl IotHubClient {
size: usize,
context: *mut ::std::os::raw::c_void,
) {
let tx = Box::from_raw(
context as *mut tokio::sync::mpsc::Sender<TwinUpdate>,
);

match String::from_utf8(slice::from_raw_parts(payload, size).to_vec()) {
Ok(desired_string) => {
match serde_json::from_str::<serde_json::Value>(&desired_string) {
Ok(desired_json) => {
let client = &mut *(context as *mut IotHubClient);
let desired_state: TwinUpdateState = mem::transmute(state as i8);

debug!(
"Twin callback. state: {desired_state:?} size: {size} payload: {desired_json}"
);

if let Some(tx) = &client.tx_twin_desired {
tx.blocking_send((desired_state, desired_json))
.expect("c_twin_callback: cannot blocking_send");
}
tx.blocking_send(TwinUpdate {
state: desired_state,
value: desired_json,
})
.expect("c_twin_callback: cannot blocking_send");
}
Err(e) => error!(
"desired twin cannot be parsed. payload: {desired_string} error: {e}"
Expand All @@ -1248,8 +1251,7 @@ impl IotHubClient {
) {
trace!("SendReportedTwin result: {status_code}");

let result: Box<oneshot::Sender<bool>> =
Box::from_raw(context as *mut oneshot::Sender<bool>);
let result = Box::from_raw(context as *mut oneshot::Sender<bool>);

result
.send(status_code == 204)
Expand All @@ -1267,6 +1269,9 @@ impl IotHubClient {
const METHOD_RESPONSE_SUCCESS: i32 = 200;
const METHOD_RESPONSE_ERROR: i32 = 401;

let tx_direct_method =
Box::from_raw(context as *mut tokio::sync::mpsc::Sender<DirectMethod>);

let empty_result: CString = CString::from_vec_unchecked(b"{ }".to_vec());
*response_size = empty_result.as_bytes().len();
*response = empty_result.into_raw() as *mut u8;
Expand Down Expand Up @@ -1296,54 +1301,50 @@ impl IotHubClient {

debug!("Received direct method call: {method_name:?} with payload: {payload}");

let client = &mut *(context as *mut IotHubClient);
let (tx_result, rx_result) = oneshot::channel::<Result<Option<serde_json::Value>>>();

if let Some(tx_direct_method) = &client.tx_direct_method {
let (tx_result, rx_result) = oneshot::channel::<Result<Option<serde_json::Value>>>();
tx_direct_method
.blocking_send(DirectMethod {
name: method_name.to_string(),
payload,
responder: tx_result,
})
.expect("c_direct_method_callback: cannot blocking_send");

tx_direct_method
.blocking_send(DirectMethod {
name: method_name.to_string(),
payload,
responder: tx_result,
})
.expect("c_direct_method_callback: cannot blocking_send");

match rx_result.blocking_recv() {
Ok(Ok(None)) => {
debug!("direct method has no result");
return METHOD_RESPONSE_SUCCESS;
}
Ok(Ok(Some(result))) => {
debug!("direct method result: {result:?}");

match CString::new(result.to_string()) {
Ok(r) => {
*response_size = r.as_bytes().len();
*response = r.into_raw() as *mut u8;
return METHOD_RESPONSE_SUCCESS;
}
Err(e) => {
error!("cannot parse direct method result: {e}");
}
match rx_result.blocking_recv() {
Ok(Ok(None)) => {
debug!("direct method has no result");
return METHOD_RESPONSE_SUCCESS;
}
Ok(Ok(Some(result))) => {
debug!("direct method result: {result:?}");

match CString::new(result.to_string()) {
Ok(r) => {
*response_size = r.as_bytes().len();
*response = r.into_raw() as *mut u8;
return METHOD_RESPONSE_SUCCESS;
}
Err(e) => {
error!("cannot parse direct method result: {e}");
}
}
Ok(Err(e)) => {
error!("direct method error: {e:?}");
}
Ok(Err(e)) => {
error!("direct method error: {e:?}");

match CString::new(json!(e.to_string()).to_string()) {
Ok(r) => {
*response_size = r.as_bytes().len();
*response = r.into_raw() as *mut u8;
}
Err(e) => {
error!("cannot parse direct method result: {e}");
}
match CString::new(json!(e.to_string()).to_string()) {
Ok(r) => {
*response_size = r.as_bytes().len();
*response = r.into_raw() as *mut u8;
}
Err(e) => {
error!("cannot parse direct method result: {e}");
}
}
Err(e) => {
error!("channel unexpectedly closed: {e}");
}
}
Err(e) => {
error!("channel unexpectedly closed: {e}");
}
}

Expand All @@ -1354,8 +1355,7 @@ impl IotHubClient {
status: IOTHUB_CLIENT_CONFIRMATION_RESULT,
context: *mut std::ffi::c_void,
) {
let result: Box<oneshot::Sender<bool>> =
Box::from_raw(context as *mut oneshot::Sender<bool>);
let result = Box::from_raw(context as *mut oneshot::Sender<bool>);
let mut succeeded = false;

match status {
Expand Down

0 comments on commit 8be8b54

Please sign in to comment.