diff --git a/flowd-rs/src/main.rs b/flowd-rs/src/main.rs index d8c8262..7c58073 100644 --- a/flowd-rs/src/main.rs +++ b/flowd-rs/src/main.rs @@ -1886,11 +1886,29 @@ impl RuntimeRuntimePayload { Ok(String::from("")) //TODO how to indicate "empty"? Does it maybe require at least "[]" or "{}"? } - fn packet(&mut self, payload: &RuntimePacketRequestPayload) -> std::result::Result<(), std::io::Error> { + fn packet(&mut self, payload: &RuntimePacketRequestPayload, graph_inout: &mut GraphInportOutportHolder) -> std::result::Result<(), std::io::Error> { //TODO check if graph exists and if that port actually exists + //TODO check payload datatype, schema, event (?) etc. //TODO implement and deliver to destination process - info!("runtime: got a packet: {:?}", payload); - Ok(()) + info!("runtime: got a packet for port {}: {:?}", payload.port, payload.payload); + // deliver to destination process + if let Some(inports) = graph_inout.inports.as_mut() { + if let Some(inport) = inports.get_mut(payload.port.as_str()) { + while inport.sink.is_full() { + // wait until non-full + //TODO optimize + inport.wakeup.as_ref().unwrap().unpark(); //TODO optimize + thread::yield_now(); + } + inport.sink.push(payload.payload.clone().into()).expect("push packet from graph inport into component failed"); + inport.wakeup.as_ref().unwrap().unpark(); //TODO optimize + return Ok(()); + } else { + return Err(std::io::Error::new(std::io::ErrorKind::NotFound, String::from("graph inport with that name not found"))); + } + } else { + return Err(std::io::Error::new(std::io::ErrorKind::NotFound, String::from("no graph inports exist"))); + } } //TODO the payload has unusual type -> can we really re-use it? Unify these three: RuntimePacketRequestPayload, RuntimePacketResponsePayload, RuntimePacketsentResponsePayload?