Skip to content

Commit

Permalink
fix: partial sent
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Aug 3, 2024
1 parent 45d8048 commit b67e591
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 30 deletions.
11 changes: 7 additions & 4 deletions crates/compute_unit_runner/src/media_data_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ where
Err(err) => {
warn!("Failed to read batch: {}", err);
//todo how to handle missing data
if let Err(err) = db_repo.update_state(&node_name, &req.id, Direction::Out, DataState::Error, None).await {
error!("mark data {} to fail {}", &req.id, err);
}
break;
}
};
Expand All @@ -141,7 +144,7 @@ where
info!("start to send data {} {:?}", &req.id, now.elapsed());
let sent_nodes: Vec<_>= req.sent.iter().map(|v|v.as_str()).collect();
if let Err(sent_nodes) = multi_sender.send(new_batch, &sent_nodes).await {
if let Err(err) = db_repo.mark_partial_sent(&node_name, &req.id, sent_nodes.iter().map(|key|key.as_str()).collect()).await{
if let Err(err) = db_repo.update_state(&node_name, &req.id, Direction::Out, DataState::PartialSent, Some(sent_nodes.iter().map(|key|key.as_str()).collect())).await{
error!("revert data state fail {err}");
}
warn!("send data to partial downnstream {:?}",sent_nodes);
Expand All @@ -151,7 +154,7 @@ where
info!("send data to downnstream successfully {} {:?}", &req.id, now.elapsed());
}

match db_repo.update_state(&node_name, &req.id, Direction::Out,DataState::Sent).await{
match db_repo.update_state(&node_name, &req.id, Direction::Out,DataState::Sent, Some(downstreams.iter().map(|key|key.as_str()).collect())).await{
std::result::Result::Ok(_) =>{
//remove input data
if let Err(err) = fs_cache.remove(&req.id).await {
Expand Down Expand Up @@ -300,7 +303,7 @@ where
}

//insert a new incoming data record
if let Err(err) = db_repo.update_state(&(node_name.clone() +"-channel"), &res_data.id, Direction::In, DataState::EndRecieved).await{
if let Err(err) = db_repo.update_state(&(node_name.clone() +"-channel"), &res_data.id, Direction::In, DataState::EndRecieved,None).await{
error!("mark data as client receive {err}");
resp.send(Err(anyhow!("mark data as client receive {err}"))).expect("channel only read once");
continue;
Expand Down Expand Up @@ -332,7 +335,7 @@ where
}
},
Some((req, resp)) = ipc_process_completed_data_rx.recv() => {
match db_repo.update_state(&node_name, &req.id, Direction::In, DataState::Processed).await{
match db_repo.update_state(&node_name, &req.id, Direction::In, DataState::Processed, None).await{
std::result::Result::Ok(_) =>{
// respose with nothing
resp.send(Ok(())).expect("channel only read once");
Expand Down
6 changes: 5 additions & 1 deletion crates/compute_unit_runner/src/multi_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ impl MultiSender {
}

impl MultiSender {
pub async fn send(&mut self, val: MediaDataBatchResponse, sent_nodes: &[&str]) -> Result<(), Vec<String>> {
pub async fn send(
&mut self,
val: MediaDataBatchResponse,
sent_nodes: &[&str],
) -> Result<(), Vec<String>> {
let mut sent = vec![];
for (index, stream) in self.connects.iter_mut().enumerate() {
let url = &self.streams[index];
Expand Down
2 changes: 1 addition & 1 deletion crates/dp_runner/src/channel_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ where
for data in datas {
match fs_cache.remove(&data.id).await {
Ok(_)=>{
if let Err(err) = db_repo.update_state(&node_name, &data.id, Direction::In, DataState::Clean).await{
if let Err(err) = db_repo.update_state(&node_name, &data.id, Direction::In, DataState::Clean, None).await{
error!("mark data as client receive {err}");
continue;
}
Expand Down
8 changes: 1 addition & 7 deletions src/core/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,6 @@ pub trait DataRepo {
direction: Direction,
) -> impl std::future::Future<Output = Result<usize>> + Send;

fn mark_partial_sent(
&self,
node_name: &str,
id: &str,
sent: Vec<&str>,
) -> impl std::future::Future<Output = Result<()>> + Send;

fn insert_new_path(
&self,
record: &DataRecord,
Expand All @@ -135,6 +128,7 @@ pub trait DataRepo {
id: &str,
direction: Direction,
state: DataState,
sent: Option<Vec<&str>>,
) -> impl std::future::Future<Output = Result<()>> + Send;
}

Expand Down
22 changes: 6 additions & 16 deletions src/dbrepo/mongo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,33 +206,23 @@ impl DataRepo for MongoRepo {
id: &str,
direction: Direction,
state: DataState,
sent: Option<Vec<&str>>,
) -> Result<()> {
let update = doc! {
"$set": { "state": to_variant_name(&state)? },
};
let mut update_fields = doc! {"state": to_variant_name(&state)?};
if let Some(sent) = sent {
update_fields.insert("sent", sent);
}

self.data_col
.find_one_and_update(
doc! {"node_name":node_name,"id": id, "direction": to_variant_name(&direction)?},
update,
doc! {"$set": update_fields},
)
.await
.map(|_| ())
.anyhow()
}

async fn mark_partial_sent(&self, node_name: &str, id: &str, sent: Vec<&str>) -> Result<()> {
let update = doc! {
"$set": { "sent": sent },
};

self.data_col
.update_one(doc! {"node_name":node_name, "id":id}, update)
.await
.map(|_| ())
.anyhow()
}

async fn insert_new_path(&self, record: &DataRecord) -> Result<()> {
self.data_col.insert_one(record).await.map(|_| ()).anyhow()
}
Expand Down
1 change: 0 additions & 1 deletion src/driver/kube.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,6 @@ where
)
})
.collect::<Vec<_>>();

let channel_record = Node {
node_name: node.name.clone() + "-channel",
state: TrackerState::Init,
Expand Down

0 comments on commit b67e591

Please sign in to comment.