Skip to content

Commit

Permalink
fixed sender
Browse files Browse the repository at this point in the history
  • Loading branch information
JanZachmann committed Apr 22, 2024
1 parent 8be8b54 commit 963a10c
Showing 1 changed file with 32 additions and 35 deletions.
67 changes: 32 additions & 35 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,11 @@ pub enum TwinUpdateState {
Partial = 1,
}

/// Used to update [desired properties](https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-devguide-module-twins#back-end-operations) to the client
pub struct TwinUpdate {
/// type of update [`TwinUpdateState`]
pub state: TwinUpdateState,
/// value
pub value: serde_json::Value,
}

Expand Down Expand Up @@ -153,15 +156,15 @@ pub type IotMessageSender = mpsc::Sender<IncomingIotMessage>;
/// Provides a channel and a property array to receive incoming cloud to device messages
#[derive(Clone, Debug)]
pub struct IncomingMessageObserver {
observer: IotMessageSender,
responder: IotMessageSender,
properties: Vec<String>,
}

impl IncomingMessageObserver {
/// Creates a new instance of [`IncomingMessageObserver`]
pub fn new(observer: IotMessageSender, properties: Vec<String>) -> Self {
pub fn new(responder: IotMessageSender, properties: Vec<String>) -> Self {
IncomingMessageObserver {
observer,
responder,
properties,
}
}
Expand Down Expand Up @@ -222,10 +225,10 @@ struct RetrySetting {
/// ```
#[derive(Debug, Default)]
pub struct IotHubClientBuilder {
tx_connection_status: Option<mpsc::Sender<AuthenticationStatus>>,
tx_twin_desired: Option<mpsc::Sender<TwinUpdate>>,
tx_direct_method: Option<DirectMethodSender>,
tx_incoming_message: Option<IncomingMessageObserver>,
tx_connection_status: Option<Box<mpsc::Sender<AuthenticationStatus>>>,
tx_twin_desired: Option<Box<mpsc::Sender<TwinUpdate>>>,
tx_direct_method: Option<Box<DirectMethodSender>>,
tx_incoming_message: Option<Box<IncomingMessageObserver>>,
model_id: Option<&'static str>,
retry_setting: Option<RetrySetting>,
}
Expand Down Expand Up @@ -460,7 +463,7 @@ impl IotHubClientBuilder {
mut self,
tx_connection_status: mpsc::Sender<AuthenticationStatus>,
) -> Self {
self.tx_connection_status = Some(tx_connection_status);
self.tx_connection_status = Some(Box::new(tx_connection_status));
self
}

Expand Down Expand Up @@ -491,11 +494,8 @@ impl IotHubClientBuilder {
/// }
/// }
/// ```
pub fn observe_desired_properties(
mut self,
tx_twin_desired: mpsc::Sender<TwinUpdate>,
) -> Self {
self.tx_twin_desired = Some(tx_twin_desired);
pub fn observe_desired_properties(mut self, tx_twin_desired: mpsc::Sender<TwinUpdate>) -> Self {
self.tx_twin_desired = Some(Box::new(tx_twin_desired));
self
}

Expand Down Expand Up @@ -536,7 +536,7 @@ impl IotHubClientBuilder {
/// }
/// ```
pub fn observe_direct_methods(mut self, tx_direct_method: DirectMethodSender) -> Self {
self.tx_direct_method = Some(tx_direct_method);
self.tx_direct_method = Some(Box::new(tx_direct_method));
self
}

Expand Down Expand Up @@ -580,7 +580,7 @@ impl IotHubClientBuilder {
mut self,
tx_incoming_message: IncomingMessageObserver,
) -> Self {
self.tx_incoming_message = Some(tx_incoming_message);
self.tx_incoming_message = Some(Box::new(tx_incoming_message));
self
}

Expand Down Expand Up @@ -697,10 +697,10 @@ impl IotHubClientBuilder {
/// ```
pub struct IotHubClient {
twin: Box<dyn Twin>,
tx_connection_status: Option<mpsc::Sender<AuthenticationStatus>>,
tx_twin_desired: Option<mpsc::Sender<TwinUpdate>>,
tx_direct_method: Option<DirectMethodSender>,
tx_incoming_message: Option<IncomingMessageObserver>,
tx_connection_status: Option<Box<mpsc::Sender<AuthenticationStatus>>>,
tx_twin_desired: Option<Box<mpsc::Sender<TwinUpdate>>>,
tx_direct_method: Option<Box<DirectMethodSender>>,
tx_incoming_message: Option<Box<IncomingMessageObserver>>,
model_id: Option<&'static str>,
retry_setting: Option<RetrySetting>,
confirmation_set: JoinSet<()>,
Expand Down Expand Up @@ -1013,31 +1013,31 @@ impl IotHubClient {
}

fn set_callbacks(&mut self) -> Result<()> {
if let Some(tx) = &self.tx_connection_status {
if let Some(tx) = self.tx_connection_status.as_deref_mut() {
self.twin.set_connection_status_callback(
Some(IotHubClient::c_connection_status_callback),
Box::into_raw(Box::new(tx.clone())) as *mut c_void,
tx as *mut mpsc::Sender<AuthenticationStatus> as *mut c_void,
)?;
}

if let Some(tx) = &self.tx_incoming_message {
if let Some(tx) = self.tx_incoming_message.as_deref_mut() {
self.twin.set_input_message_callback(
Some(IotHubClient::c_c2d_message_callback),
Box::into_raw(Box::new(tx.clone())) as *mut c_void,
tx as *mut IncomingMessageObserver as *mut c_void,
)?;
}

if let Some(tx) = &self.tx_twin_desired {
if let Some(tx) = self.tx_twin_desired.as_deref_mut() {
self.twin.set_twin_callback(
Some(IotHubClient::c_twin_callback),
Box::into_raw(Box::new(tx.clone())) as *mut c_void,
tx as *mut mpsc::Sender<TwinUpdate> as *mut c_void,
)?;
}

if let Some(tx) = &self.tx_direct_method {
if let Some(tx) = self.tx_direct_method.as_deref_mut() {
self.twin.set_method_callback(
Some(IotHubClient::c_direct_method_callback),
Box::into_raw(Box::new(tx.clone())) as *mut c_void,
tx as *mut DirectMethodSender as *mut c_void,
)?;
}

Expand Down Expand Up @@ -1099,7 +1099,7 @@ impl IotHubClient {
status_reason: IOTHUB_CLIENT_CONNECTION_STATUS_REASON,
context: *mut ::std::os::raw::c_void,
) {
let tx = Box::from_raw(context as *mut tokio::sync::mpsc::Sender<AuthenticationStatus>);
let tx = &mut *(context as *mut mpsc::Sender<AuthenticationStatus>);

let status = match connection_status {
IOTHUB_CLIENT_CONNECTION_STATUS_TAG_IOTHUB_CLIENT_CONNECTION_AUTHENTICATED => {
Expand Down Expand Up @@ -1151,7 +1151,7 @@ impl IotHubClient {
handle: *mut IOTHUB_MESSAGE_HANDLE_DATA_TAG,
context: *mut ::std::os::raw::c_void,
) -> IOTHUBMESSAGE_DISPOSITION_RESULT {
let observer = Box::from_raw(context as *mut IncomingMessageObserver);
let observer = &mut *(context as *mut IncomingMessageObserver);
let mut property_keys: Vec<CString> = vec![];

for property in &observer.properties {
Expand All @@ -1173,7 +1173,7 @@ impl IotHubClient {
let (tx_result, rx_result) = oneshot::channel::<Result<DispositionResult>>();

observer
.observer
.responder
.blocking_send(IncomingIotMessage {
inner: msg,
responder: tx_result,
Expand Down Expand Up @@ -1216,9 +1216,7 @@ impl IotHubClient {
size: usize,
context: *mut ::std::os::raw::c_void,
) {
let tx = Box::from_raw(
context as *mut tokio::sync::mpsc::Sender<TwinUpdate>,
);
let tx = &mut *(context as *mut mpsc::Sender<TwinUpdate>);

match String::from_utf8(slice::from_raw_parts(payload, size).to_vec()) {
Ok(desired_string) => {
Expand Down Expand Up @@ -1269,8 +1267,7 @@ impl IotHubClient {
const METHOD_RESPONSE_SUCCESS: i32 = 200;
const METHOD_RESPONSE_ERROR: i32 = 401;

let tx_direct_method =
Box::from_raw(context as *mut tokio::sync::mpsc::Sender<DirectMethod>);
let tx_direct_method = &mut *(context as *mut DirectMethodSender);

let empty_result: CString = CString::from_vec_unchecked(b"{ }".to_vec());
*response_size = empty_result.as_bytes().len();
Expand Down

0 comments on commit 963a10c

Please sign in to comment.