Skip to content

Commit

Permalink
flowd-rs: Add runtime.packet() delivery into graph inports (#207)
Browse files Browse the repository at this point in the history
  • Loading branch information
ERnsTL committed Sep 1, 2022
1 parent 0afb62b commit 9bcfd0b
Showing 1 changed file with 21 additions and 3 deletions.
24 changes: 21 additions & 3 deletions flowd-rs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1886,11 +1886,29 @@ impl RuntimeRuntimePayload {
Ok(String::from("")) //TODO how to indicate "empty"? Does it maybe require at least "[]" or "{}"?
}

fn packet(&mut self, payload: &RuntimePacketRequestPayload) -> std::result::Result<(), std::io::Error> {
fn packet(&mut self, payload: &RuntimePacketRequestPayload, graph_inout: &mut GraphInportOutportHolder) -> std::result::Result<(), std::io::Error> {
//TODO check if graph exists and if that port actually exists
//TODO check payload datatype, schema, event (?) etc.
//TODO implement and deliver to destination process
info!("runtime: got a packet: {:?}", payload);
Ok(())
info!("runtime: got a packet for port {}: {:?}", payload.port, payload.payload);
// deliver to destination process
if let Some(inports) = graph_inout.inports.as_mut() {
if let Some(inport) = inports.get_mut(payload.port.as_str()) {
while inport.sink.is_full() {
// wait until non-full
//TODO optimize
inport.wakeup.as_ref().unwrap().unpark(); //TODO optimize
thread::yield_now();
}
inport.sink.push(payload.payload.clone().into()).expect("push packet from graph inport into component failed");
inport.wakeup.as_ref().unwrap().unpark(); //TODO optimize
return Ok(());
} else {
return Err(std::io::Error::new(std::io::ErrorKind::NotFound, String::from("graph inport with that name not found")));
}
} else {
return Err(std::io::Error::new(std::io::ErrorKind::NotFound, String::from("no graph inports exist")));
}
}

//TODO the payload has unusual type -> can we really re-use it? Unify these three: RuntimePacketRequestPayload, RuntimePacketResponsePayload, RuntimePacketsentResponsePayload?
Expand Down

0 comments on commit 9bcfd0b

Please sign in to comment.