From 9bcfd0bd5cee07fb2c6c68a4bce52146d7072b4b Mon Sep 17 00:00:00 2001 From: Ernst Rohlicek Date: Fri, 2 Sep 2022 00:28:29 +0200 Subject: [PATCH] flowd-rs: Add runtime.packet() delivery into graph inports (#207) --- flowd-rs/src/main.rs | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/flowd-rs/src/main.rs b/flowd-rs/src/main.rs index d8c8262..7c58073 100644 --- a/flowd-rs/src/main.rs +++ b/flowd-rs/src/main.rs @@ -1886,11 +1886,29 @@ impl RuntimeRuntimePayload { Ok(String::from("")) //TODO how to indicate "empty"? Does it maybe require at least "[]" or "{}"? } - fn packet(&mut self, payload: &RuntimePacketRequestPayload) -> std::result::Result<(), std::io::Error> { + fn packet(&mut self, payload: &RuntimePacketRequestPayload, graph_inout: &mut GraphInportOutportHolder) -> std::result::Result<(), std::io::Error> { //TODO check if graph exists and if that port actually exists + //TODO check payload datatype, schema, event (?) etc. //TODO implement and deliver to destination process - info!("runtime: got a packet: {:?}", payload); - Ok(()) + info!("runtime: got a packet for port {}: {:?}", payload.port, payload.payload); + // deliver to destination process + if let Some(inports) = graph_inout.inports.as_mut() { + if let Some(inport) = inports.get_mut(payload.port.as_str()) { + while inport.sink.is_full() { + // wait until non-full + //TODO optimize + inport.wakeup.as_ref().unwrap().unpark(); //TODO optimize + thread::yield_now(); + } + inport.sink.push(payload.payload.clone().into()).expect("push packet from graph inport into component failed"); + inport.wakeup.as_ref().unwrap().unpark(); //TODO optimize + return Ok(()); + } else { + return Err(std::io::Error::new(std::io::ErrorKind::NotFound, String::from("graph inport with that name not found"))); + } + } else { + return Err(std::io::Error::new(std::io::ErrorKind::NotFound, String::from("no graph inports exist"))); + } } //TODO the payload has unusual type -> can we really re-use it? Unify these three: RuntimePacketRequestPayload, RuntimePacketResponsePayload, RuntimePacketsentResponsePayload?