fix: losing data in stream
hunjixin committed Aug 2, 2024
1 parent 5ba0fe9 commit a6ed928
Showing 14 changed files with 219 additions and 139 deletions.
3 changes: 2 additions & 1 deletion crates/compute_unit_runner/Cargo.toml
@@ -25,4 +25,5 @@ hyperlocal = "0.9.1"
hyper = "1.4.1"
hyper-util = "0.1.6"
http-body-util = "0.1.2"
walkdir = "2.5.0"
walkdir = "2.5.0"
rand = "0.8.5"
2 changes: 2 additions & 0 deletions crates/compute_unit_runner/src/lib.rs
@@ -1,3 +1,5 @@
pub mod ipc;
pub mod media_data_tracker;
mod mprc;
mod multi_sender;
pub mod stream;
34 changes: 15 additions & 19 deletions crates/compute_unit_runner/src/media_data_tracker.rs
@@ -1,4 +1,5 @@
use crate::ipc::{AvaiableDataResponse, CompleteDataReq, SubmitOuputDataReq};
use crate::mprc::Mprs;
use anyhow::{Ok, Result};
use jz_action::core::models::{DataRecord, DataState, DbRepo, Direction, NodeRepo, TrackerState};
use jz_action::network::common::Empty;
@@ -9,18 +10,19 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::error::TrySendError;
use tonic::Status;
use tracing::{debug, error, info, warn};
use walkdir::WalkDir;

use crate::multi_sender::MultiSender;
use tokio::fs;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use tokio::sync::{broadcast, oneshot};
use tokio::time::{self, sleep};
use tokio::time::{self, sleep, Instant};
use tokio_stream::StreamExt;


pub struct MediaDataTracker<R>
where
R: DbRepo,
@@ -48,16 +50,12 @@ where
// channel for submit output data
pub(crate) ipc_process_submit_output_tx:
Option<mpsc::Sender<(SubmitOuputDataReq, oneshot::Sender<Result<()>>)>>,

pub(crate) out_going_tx: broadcast::Sender<MediaDataBatchResponse>, //receive data from upstream and send it to program with this
}
impl<R> MediaDataTracker<R>
where
R: DbRepo,
{
pub fn new(repo: R, name: &str, tmp_store: PathBuf) -> Self {
let out_going_tx = broadcast::Sender::new(128);

MediaDataTracker {
tmp_store,
name: name.to_string(),
@@ -68,7 +66,6 @@ where
ipc_process_submit_output_tx: None,
ipc_process_completed_data_tx: None,
ipc_process_data_req_tx: None,
out_going_tx: out_going_tx,
}
}
}
@@ -93,7 +90,6 @@ where
//process outgoing data
let db_repo = self.repo.clone();
let tmp_store = self.tmp_store.clone();
let out_going_tx = self.out_going_tx.clone();
let downstreams = self.downstreams.clone(); //make dynamic downstreams?
let node_name = self.name.clone();
let new_data_tx = new_data_tx.clone();
@@ -114,14 +110,15 @@
}
});

let mut multi_sender = MultiSender::new(downstreams.clone());
tokio::spawn(async move {
loop {
tokio::select! {
Some(()) = new_data_rx.recv() => {
println!("start to send data");
//reconstruct batch
//TODO combine multiple batch
loop {
let now = Instant::now();
match db_repo.find_and_sent_output_data(&node_name).await {
std::result::Result::Ok(Some(req)) =>{
let tmp_out_path = tmp_store.join(&req.id);
@@ -154,25 +151,22 @@
}
}
new_batch.size = req.size;
new_batch.id = req.id.clone();

println!("start to send data2 ");
//write outgoing
if new_batch.size >0 && downstreams.len()>0 {
//TODO change a more predicatable and reliable way to broadcase data message.
//must ensure every downstream node receive this message?
println!("start to send data3");
if let Err(_) = out_going_tx.send(new_batch) {
//no receiver
warn!("broadcast data fail due to no receiver, revert this data state to Processed");
if let Err(err) = db_repo.update_state(&node_name, &req.id, DataState::Received).await{
info!("start to send data {} {:?}", &req.id, now.elapsed());
if let Err(sent_nodes) = multi_sender.send(new_batch).await {
if let Err(err) = db_repo.mark_partial_sent(&node_name, &req.id, sent_nodes.iter().map(|key|key.as_str()).collect()).await{
error!("revert data state fail {err}");
}
warn!("send data to partial downnstream {:?}",sent_nodes);
break;
}
info!("send data to downnstream successfully");

info!("send data to downnstream successfully {} {:?}", &req.id, now.elapsed());
}

println!("start to send data4");
match db_repo.update_state(&node_name, &req.id, DataState::Sent).await{
std::result::Result::Ok(_) =>{
//remove input data
Expand All @@ -185,6 +179,7 @@ where
error!("update state to process fail {} {}", &req.id, err)
}
}
info!("fetch and send a batch {:?}", now.elapsed());
},
std::result::Result::Ok(None)=>{
break;
@@ -258,6 +253,7 @@ where
node_name:node_name.clone(),
id:req.id.clone(),
size: req.size,
sent: vec![],
state: DataState::Received,
direction: Direction::Out,
}).await{
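Taken together, the hunks above replace the broadcast-based fan-out with a per-downstream push. A condensed sketch of the new outgoing path follows (not part of the commit; the DbRepo method signatures, field names, and error types are inferred from how they are called in the diff, and the batch payload built from tmp_store is omitted):

```rust
use anyhow::Result;
use jz_action::core::models::{DataState, DbRepo};
use jz_action::network::datatransfer::MediaDataBatchResponse;

use crate::multi_sender::MultiSender;

// Drain pending output records and push each batch to every downstream.
async fn flush_outputs<R: DbRepo>(
    db_repo: &R,
    node_name: &str,
    multi_sender: &mut MultiSender,
) -> Result<()> {
    while let Some(req) = db_repo.find_and_sent_output_data(node_name).await? {
        // Field assignments mirror the diff (new_batch.id / new_batch.size);
        // reading the cell payloads from tmp_store is left out of this sketch.
        let batch = MediaDataBatchResponse {
            id: req.id.clone(),
            size: req.size,
            ..Default::default()
        };
        match multi_sender.send(batch).await {
            // Every downstream received the batch: mark it Sent so it is not resent.
            Ok(()) => db_repo.update_state(node_name, &req.id, DataState::Sent).await?,
            // Only some downstreams received it: record them so a later retry
            // does not drop the batch (the core of the "losing data" fix).
            Err(sent_nodes) => {
                let sent: Vec<&str> = sent_nodes.iter().map(|s| s.as_str()).collect();
                db_repo.mark_partial_sent(node_name, &req.id, sent).await?;
                break;
            }
        }
    }
    Ok(())
}
```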
File renamed without changes.
58 changes: 58 additions & 0 deletions crates/compute_unit_runner/src/multi_sender.rs
@@ -0,0 +1,58 @@
use jz_action::network::datatransfer::{
data_stream_client::DataStreamClient, MediaDataBatchResponse,
};
use tokio::time::Instant;
use std::collections::{hash_map::Entry, HashMap};
use tonic::transport::Channel;
use tracing::error;

pub struct MultiSender {
streams: Vec<String>,

connects: Vec<Option<DataStreamClient<Channel>>>,
}

impl MultiSender {
pub fn new(streams: Vec<String>) -> Self {
let connects = streams.iter().map(|_| None).collect();
MultiSender {
streams,
connects,
}
}
}

impl MultiSender {
pub async fn send(&mut self, val: MediaDataBatchResponse) -> Result<(), Vec<String>> {
let mut sent = vec![];
for (index, stream) in self.connects.iter_mut().enumerate() {
let url = self.streams[index].clone();
if stream.is_none() {
match DataStreamClient::connect(url.clone()).await {
Ok(client) => {
*stream = Some(client);
}
Err(err) => {
error!("connect data streams {url} {err}");
continue;
}
}
}

let client = stream.as_mut().unwrap();
let now = Instant::now();
if let Err(err) = client.transfer_media_data(val.clone()).await {
error!("send reqeust will try next time {url} {err}");
continue;
}
println!("send one success {:?}", now.elapsed());
sent.push(url);
}

if sent.len() == self.streams.len() {
Ok(())
} else {
Err(sent)
}
}
}
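For reference, a minimal sketch of how the new MultiSender is meant to be driven (the downstream URLs are hypothetical; the module is crate-private, so this would live inside compute_unit_runner):

```rust
use jz_action::network::datatransfer::MediaDataBatchResponse;

use crate::multi_sender::MultiSender;

async fn demo() {
    // Hypothetical downstream endpoints; in the runner these come from self.downstreams.
    let downstreams = vec![
        "http://downstream-a:80".to_string(),
        "http://downstream-b:80".to_string(),
    ];
    let mut sender = MultiSender::new(downstreams);

    let batch = MediaDataBatchResponse::default();
    match sender.send(batch).await {
        // All downstreams acknowledged the batch.
        Ok(()) => println!("sent to all downstreams"),
        // Err carries the subset that did receive it; the tracker persists this
        // via DbRepo::mark_partial_sent before giving up on the batch.
        Err(sent) => println!("partial send, delivered only to {:?}", sent),
    }
}
```

Connections are opened lazily on first use and cached per downstream, so repeated sends reuse the same channel.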
41 changes: 10 additions & 31 deletions crates/compute_unit_runner/src/stream.rs
@@ -2,7 +2,7 @@ use anyhow::Result;
use jz_action::core::models::DbRepo;
use jz_action::network::datatransfer::data_stream_server::DataStream;
use jz_action::network::datatransfer::{MediaDataBatchResponse, TabularDataBatchResponse};
use jz_action::utils::AnyhowToGrpc;
use jz_action::utils::{AnyhowToGrpc, IntoAnyhowResult};
use jz_action::{network::common::Empty, utils::StdIntoAnyhowResult};
use std::marker::PhantomData;
use std::sync::Arc;
@@ -40,39 +40,18 @@ impl<R> DataStream for UnitDataStream<R>
where
R: DbRepo + Clone + Send + Sync + 'static,
{
type subscribeMediaDataStream = ReceiverStream<Result<MediaDataBatchResponse, Status>>;
async fn subscribe_media_data(
async fn transfer_media_data(
&self,
request: Request<Empty>,
) -> Result<Response<Self::subscribeMediaDataStream>, Status> {
info!("receive media subscribe request {:?}", request);

let (tx, rx) = mpsc::channel(4);
let program_guard = self.program.lock().await;
let mut data_rx = program_guard.out_going_tx.subscribe();
tokio::spawn(async move {
loop {
select! {
data_batch = data_rx.recv() => {
if let Err(err) = tx.send(data_batch.anyhow().to_rpc(Code::Internal)).await {
//todo handle disconnect
error!("unable to send data to downstream {}", err);
return
}
},
}
}
});

Ok(Response::new(ReceiverStream::new(rx)))
req: Request<MediaDataBatchResponse>,
) -> Result<Response<Empty>, Status> {
// channel and compute unit share a PV, so the network is not used to move data between the channel and the compute unit.
// if separate networks are to be supported, implement this
todo!()
}

type subscribeTabularDataStream = ReceiverStream<Result<TabularDataBatchResponse, Status>>;

async fn subscribe_tabular_data(
async fn transfer_tabular_data(
&self,
_request: Request<Empty>,
) -> Result<Response<Self::subscribeTabularDataStream>, Status> {
req: Request<TabularDataBatchResponse>,
) -> Result<Response<Empty>, tonic::Status> {
todo!()
}
}
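The old subscribe_* server-streaming methods are replaced by push-style RPCs. On the sending side (this is what MultiSender::send does per downstream), a call would look roughly like the sketch below; the endpoint URL and the empty batch are placeholders:

```rust
use anyhow::Result;
use jz_action::network::datatransfer::{
    data_stream_client::DataStreamClient, MediaDataBatchResponse,
};

async fn push_one_batch() -> Result<()> {
    // Hypothetical downstream endpoint.
    let mut client = DataStreamClient::connect("http://downstream-a:80").await?;
    // The server answers with Empty; the handler above is still todo!() because
    // channel and compute unit currently share a PV instead of using the network.
    client.transfer_media_data(MediaDataBatchResponse::default()).await?;
    Ok(())
}
```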