diff --git a/runner/compute_data_runner/src/program.rs b/runner/compute_data_runner/src/program.rs index f6ff1ec..5f7a81b 100644 --- a/runner/compute_data_runner/src/program.rs +++ b/runner/compute_data_runner/src/program.rs @@ -130,7 +130,7 @@ impl BatchProgram { } new_batch.size = entry_count; - //write another + //write outgoing if new_batch.size >0 { if let Err(e) = out_going_tx.send(new_batch) { error!("send data {}", e); @@ -272,7 +272,7 @@ impl BatchProgram { } new_batch.size = entry_count; - //write another + //write outgoing if new_batch.size >0 { if let Err(e) = out_going_tx.send(new_batch) { error!("send data {}", e); @@ -295,6 +295,7 @@ impl BatchProgram { }); Ok(()) } + pub(crate) async fn track_output_data(&mut self) ->Result<()> { let upstreams = self.upstreams.as_ref().expect("input output node must have incoming nodes");