Skip to content

Commit

Permalink
Partially refactor input port opening
Browse files Browse the repository at this point in the history
  • Loading branch information
SamuelSarle committed Nov 29, 2024
1 parent 33431fd commit 0f5c529
Showing 1 changed file with 81 additions and 17 deletions.
98 changes: 81 additions & 17 deletions lib/protoflow-zeromq/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub use protoflow_core::prelude;
extern crate std;

use protoflow_core::{
prelude::{Arc, BTreeMap, Bytes, String, ToString, Vec},
prelude::{vec, Arc, BTreeMap, Bytes, String, ToString, Vec},
InputPortID, OutputPortID, PortError, PortResult, PortState, Transport,
};

Expand Down Expand Up @@ -74,7 +74,7 @@ enum ZmqInputPortState {
Open(
// TODO: hide these
Arc<SyncSender<ZmqTransportEvent>>,
Arc<Mutex<Receiver<ZmqInputPortEvent>>>,
Arc<Mutex<Receiver<ZmqTransportEvent>>>,
),
Connected(
// channels for requests from public close
Expand All @@ -85,7 +85,7 @@ enum ZmqInputPortState {
Arc<Mutex<Receiver<ZmqInputPortEvent>>>,
// internal channels for events
Arc<SyncSender<ZmqTransportEvent>>,
Arc<Mutex<Receiver<ZmqInputPortEvent>>>,
Arc<Mutex<Receiver<ZmqTransportEvent>>>,
// vec of the connected port ids
Vec<OutputPortID>,
),
Expand Down Expand Up @@ -405,6 +405,83 @@ impl ZmqTransport {

Ok((to_worker_send, from_worker_recv))
}

fn start_input_worker(&self, input_port_id: InputPortID) -> Result<(), PortError> {
let topic = format!("{}:", input_port_id);

let (to_worker_send, to_worker_recv) = sync_channel(1);
let to_worker_send = Arc::new(to_worker_send);
let to_worker_recv = Arc::new(Mutex::new(to_worker_recv));

{
let mut inputs = self.inputs.write();
let state = ZmqInputPortState::Open(to_worker_send.clone(), to_worker_recv.clone());
let state = RwLock::new(state);
inputs.insert(input_port_id, state);
}

let inputs = self.inputs.clone();
tokio::task::spawn(async move {
let input = &to_worker_recv;

let inputs = inputs;

loop {
let event: ZmqTransportEvent = input.lock().recv().expect("input worker recv");
use ZmqTransportEvent::*;
match event {
Connect(output_port_id, input_port_id) => {
let inputs = inputs.read();
let Some(input_state) = inputs.get(&input_port_id) else {
todo!();
};
let input_state = input_state.write();

use ZmqInputPortState::*;
match &*input_state {
Open(_, _) => {
let (req_send, req_recv) = sync_channel(1);
let req_send = Arc::new(req_send);
let req_recv = Arc::new(Mutex::new(req_recv));

let (msgs_send, msgs_recv) = sync_channel(1);

let msgs_send = Arc::new(msgs_send);
let msgs_recv = Arc::new(Mutex::new(msgs_recv));

let mut input_state = input_state;

*input_state = ZmqInputPortState::Connected(
req_send,
req_recv,
msgs_send,
msgs_recv,
to_worker_send.clone(),
input.clone(),
vec![output_port_id],
);
}
Connected(_, _, _, _, _, _, _) => todo!(),
Closed => todo!(),
}
}
AckConnection(output_port_id, input_port_id) => todo!(),
Message(output_port_id, input_port_id, _, bytes) => todo!(),
AckMessage(output_port_id, input_port_id, _) => todo!(),
CloseOutput(output_port_id, input_port_id) => todo!(),
CloseInput(input_port_id) => todo!(),
};
}
});

// send sub request
self.tokio
.block_on(
self.sub_queue
.send(ZmqSubscriptionRequest::Subscribe(topic)),
)
.map_err(|e| PortError::Other(e.to_string()))
}
}

impl Transport for ZmqTransport {
Expand All @@ -430,20 +507,7 @@ impl Transport for ZmqTransport {
let new_id = InputPortID::try_from(-(inputs.len() as isize + 1))
.map_err(|e| PortError::Other(e.to_string()))?;

let (_, receiver) = self
.subscribe_for_input_port(new_id)
.map_err(|e| PortError::Other(e.to_string()))?;

loop {
let msg = receiver
.lock()
.recv()
.map_err(|e| PortError::Other(e.to_string()))?;
match msg {
ZmqInputPortEvent::Opened => break Ok(new_id),
_ => continue, // TODO
}
}
self.start_input_worker(new_id).map(|_| new_id)
}

fn open_output(&self) -> PortResult<OutputPortID> {
Expand Down

0 comments on commit 0f5c529

Please sign in to comment.