diff --git a/crates/compute_unit_runner/src/bin/compute_unit_runner.rs b/crates/compute_unit_runner/src/bin/compute_unit_runner.rs
index baf6db1..b4a4e7c 100644
--- a/crates/compute_unit_runner/src/bin/compute_unit_runner.rs
+++ b/crates/compute_unit_runner/src/bin/compute_unit_runner.rs
@@ -31,6 +31,9 @@ struct Args {
     #[arg(short, long, default_value = "/app/tmp")]
     tmp_path: String,
 
+    #[arg(short, long, default_value = "30")]
+    buf_size: usize,
+
     #[arg(short, long)]
     node_name: String,
 
@@ -61,6 +64,7 @@ async fn main() -> Result<()> {
         db_repo.clone(),
         &args.node_name,
         PathBuf::from_str(args.tmp_path.as_str())?,
+        args.buf_size,
     );
 
     let program_safe = Arc::new(Mutex::new(program));
diff --git a/crates/compute_unit_runner/src/media_data_tracker.rs b/crates/compute_unit_runner/src/media_data_tracker.rs
index c421424..1b42dc2 100644
--- a/crates/compute_unit_runner/src/media_data_tracker.rs
+++ b/crates/compute_unit_runner/src/media_data_tracker.rs
@@ -1,6 +1,6 @@
 use crate::ipc::{AvaiableDataResponse, CompleteDataReq, SubmitOuputDataReq};
 use crate::mprc::Mprs;
-use anyhow::{Ok, Result};
+use anyhow::{anyhow, Ok, Result};
 use jz_action::core::models::{DataRecord, DataState, DbRepo, Direction, NodeRepo, TrackerState};
 use jz_action::network::common::Empty;
 use jz_action::network::datatransfer::data_stream_client::DataStreamClient;
@@ -29,6 +29,8 @@ where
 {
     pub(crate) name: String,
 
+    pub(crate) buf_size: usize,
+
     pub(crate) tmp_store: PathBuf,
 
     pub(crate) repo: R,
@@ -55,9 +57,10 @@ impl MediaDataTracker
 where
     R: DbRepo,
 {
-    pub fn new(repo: R, name: &str, tmp_store: PathBuf) -> Self {
+    pub fn new(repo: R, name: &str, tmp_store: PathBuf, buf_size: usize) -> Self {
         MediaDataTracker {
             tmp_store,
+            buf_size,
             name: name.to_string(),
             repo: repo,
             local_state: TrackerState::Init,
@@ -143,7 +146,7 @@ where
                             }
                             Err(err) => {
                                 error!("walk out dir({:?}) fail {err}", &req.id);
-                                if let Err(err) = db_repo.update_state(&node_name, &req.id, DataState::Error).await{
+                                if let Err(err) = db_repo.update_state(&node_name, &req.id, Direction::Out, DataState::Error).await{
                                     error!("mark data state error fail {err}");
                                 }
                                 break;
@@ -167,7 +170,7 @@ where
                             info!("send data to downstream successfully {} {:?}", &req.id, now.elapsed());
                         }
 
-                        match db_repo.update_state(&node_name, &req.id, DataState::Sent).await{
+                        match db_repo.update_state(&node_name, &req.id, Direction::Out, DataState::Sent).await{
                             std::result::Result::Ok(_) =>{
                                 //remove input data
                                 let tmp_path = tmp_store.join(&req.id);
@@ -197,6 +200,61 @@ where
             });
         }
 
+        {
+            //process submit data
+            let db_repo = self.repo.clone();
+            let node_name = self.name.clone();
+            let buf_size = self.buf_size;
+
+            tokio::spawn(async move {
+                loop {
+                    select! {
+                        Some((req, resp)) = ipc_process_submit_result_rx.recv() => {
+                            loop {
+                                if let Err(err) = db_repo.count_pending(&node_name, Direction::In).await.and_then(|count|{
+                                    if count > buf_size {
+                                        Err(anyhow!("has reached limit current:{count} limit:{buf_size}"))
+                                    } else {
+                                        Ok(())
+                                    }
+                                }){
+                                    warn!("buffer limit reached {err}");
+                                    sleep(Duration::from_secs(10)).await;
+                                    continue;
+                                }
+                                break;
+                            }
+
+                            //insert a new record; respond with nothing on success
+                            match db_repo.insert_new_path(&DataRecord{
+                                node_name: node_name.clone(),
+                                id: req.id.clone(),
+                                size: req.size,
+                                sent: vec![],
+                                state: DataState::Received,
+                                direction: Direction::Out,
+                            }).await{
+                                std::result::Result::Ok(_) =>{
+                                    //respond with nothing
+                                    resp.send(Ok(())).expect("channel only read once");
+                                    info!("inserted data record");
+                                    match new_data_tx.try_send(()) {
+                                        Err(TrySendError::Closed(_)) =>{
+                                            error!("new data channel has been closed")
+                                        }
+                                        _=>{}
+                                    }
+                                },
+                                Err(err) => {
+                                    resp.send(Err(err)).expect("channel only read once");
+                                }
+                            }
+                        },
+                    }
+                }
+            });
+        }
+
         //TODO: this turns an async flow into a synchronous one and hurts performance;
         //if this becomes a bottleneck, we should refactor it
         {
@@ -232,7 +290,7 @@ where
                         }
                     },
                     Some((req, resp)) = ipc_process_completed_data_rx.recv() => {
-                        match db_repo.update_state(&node_name, &req.id, DataState::Processed).await{
+                        match db_repo.update_state(&node_name, &req.id, Direction::In, DataState::Processed).await{
                             std::result::Result::Ok(_) =>{
                                 //respond with nothing
                                 resp.send(Ok(())).expect("channel only read once");
@@ -246,33 +304,7 @@ where
                                 resp.send(Err(err)).expect("channel only read once");
                             }
                         }
-                    },
-                    Some((req, resp)) = ipc_process_submit_result_rx.recv() => {
-                        // respose with nothing
-                        match db_repo.insert_new_path(&DataRecord{
-                            node_name:node_name.clone(),
-                            id:req.id.clone(),
-                            size: req.size,
-                            sent: vec![],
-                            state: DataState::Received,
-                            direction: Direction::Out,
-                        }).await{
-                            std::result::Result::Ok(_) =>{
-                                // respose with nothing
-                                resp.send(Ok(())).expect("channel only read once");
-                                println!("send signal");
-                                match new_data_tx.try_send(()) {
-                                    Err(TrySendError::Closed(_)) =>{
-                                        error!("new data channel has been closed")
-                                    }
-                                    _=>{}
-                                }
-                            },
-                            Err(err) => {
-                                resp.send(Err(err)).expect("channel only read once");
-                            }
-                        }
-                    },
+                    }
                 }
             }
         });
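
The new submit arm throttles producers by polling `count_pending` and sleeping while the pending count exceeds `buf_size`. Below is a minimal, self-contained sketch of that retry loop, assuming tokio and anyhow; `pending_count` is a hypothetical stand-in for `DbRepo::count_pending`, and the 10-second interval mirrors the arm above:

```rust
use std::time::Duration;
use tokio::time::sleep;

// Hypothetical stand-in for db_repo.count_pending(&node_name, Direction::In).
async fn pending_count() -> anyhow::Result<usize> {
    Ok(0)
}

// Block until the pending count is back under the limit, re-checking
// on the same 10s cadence as the select! arm in the diff.
async fn wait_for_capacity(buf_size: usize) -> anyhow::Result<()> {
    loop {
        if pending_count().await? <= buf_size {
            return Ok(());
        }
        sleep(Duration::from_secs(10)).await;
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    wait_for_capacity(30).await?;
    println!("under the buffer limit, safe to insert the record");
    Ok(())
}
```

One design note: the check-then-insert is not atomic, so the count can briefly overshoot `buf_size` under concurrent submitters; the loop bounds the queue only approximately.
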
diff --git a/crates/compute_unit_runner/src/multi_sender.rs b/crates/compute_unit_runner/src/multi_sender.rs
index 1478020..05119d7 100644
--- a/crates/compute_unit_runner/src/multi_sender.rs
+++ b/crates/compute_unit_runner/src/multi_sender.rs
@@ -1,8 +1,8 @@
 use jz_action::network::datatransfer::{
     data_stream_client::DataStreamClient, MediaDataBatchResponse,
 };
-use tokio::time::Instant;
 use std::collections::{hash_map::Entry, HashMap};
+use tokio::time::Instant;
 use tonic::transport::Channel;
 use tracing::error;
diff --git a/crates/dp_runner/src/channel_tracker.rs b/crates/dp_runner/src/channel_tracker.rs
index 2702aa5..31eac25 100644
--- a/crates/dp_runner/src/channel_tracker.rs
+++ b/crates/dp_runner/src/channel_tracker.rs
@@ -23,6 +23,8 @@ where
 
     pub(crate) name: String,
 
+    pub(crate) buf_size: usize,
+
     pub(crate) tmp_store: PathBuf,
 
     pub(crate) local_state: TrackerState,
@@ -38,11 +40,12 @@ impl ChannelTracker
 where
     R: DbRepo,
 {
-    pub(crate) fn new(repo: R, name: &str, tmp_store: PathBuf) -> Self {
+    pub(crate) fn new(repo: R, name: &str, buf_size: usize, tmp_store: PathBuf) -> Self {
         ChannelTracker {
             name: name.to_string(),
             repo: repo,
             tmp_store,
+            buf_size,
             local_state: TrackerState::Init,
             upstreams: vec![],
             downstreams: vec![],
@@ -61,15 +64,29 @@ where
         let tmp_store = self.tmp_store.clone();
         let db_repo = self.repo.clone();
         let node_name = self.name.clone();
+        let buf_size = self.buf_size;
 
         tokio::spawn(async move {
             loop {
                 select! {
-                    Some((data_batch, resp)) = rx.recv() => {
+                    Some((data_batch, resp)) = rx.recv() => { //TODO: make this a parameter
                         //save to fs
                         //create input directory
                         let now = Instant::now();
                         let id = data_batch.id;
                         //check limit
+                        if let Err(err) = db_repo.count_pending(&node_name, Direction::In).await.and_then(|count|{
+                            if count > buf_size {
+                                Err(anyhow!("has reached limit current:{count} limit:{buf_size}"))
+                            } else {
+                                Ok(())
+                            }
+                        }){
+                            resp.send(Err(anyhow!("cannot check buffer limit {err}"))).expect("request already listening on this channel");
+                            continue;
+                        }
+
+                        //processed before?
+                        match db_repo.find_by_node_id(&node_name,&id,Direction::In).await {
                             Ok(Some(_))=>{
                                 warn!("data {} processed before", &id);
                                 resp.send(Ok(())).expect("request already listening on this channel");
@@ -83,17 +100,18 @@ where
                             _=>{}
                         }
 
-                        let tmp_in_path = tmp_store.join(id.clone());
+                        //the code below could be moved to a separate coroutine
 
-                        debug!("try to create directory {:?}", tmp_in_path);
+                        //write batch files
+                        //writing files is slow; consider a memory cache
+                        let tmp_in_path = tmp_store.join(id.clone());
+                        debug!("try to create directory {:?} {:?}", tmp_in_path, now.elapsed());
                         if let Err(err) = fs::create_dir_all(&tmp_in_path).await {
                             error!("create input dir {:?} fail {}", tmp_in_path, err);
                             resp.send(Err(err.into())).expect("request already listening on this channel");
                             continue;
                         }
 
-                        //write batch files
-
                         let mut is_write_err = false;
                         for (entry_path, entry) in data_batch.cells.iter().map(|entry|(tmp_in_path.join(entry.path.clone()), entry)) {
                             if let Err(err) = fs::write(entry_path.clone(), &entry.data).await {
@@ -106,7 +124,6 @@ where
                             resp.send(Err(anyhow!("write file "))).expect("request already listening on this channel");
                             continue;
                         }
-
                         info!("write files to disk in {} {:?}", &id, now.elapsed());
 
                         //insert record to database
diff --git a/crates/dp_runner/src/main.rs b/crates/dp_runner/src/main.rs
index d7c1722..4540745 100644
--- a/crates/dp_runner/src/main.rs
+++ b/crates/dp_runner/src/main.rs
@@ -42,6 +42,9 @@ struct Args {
     #[arg(short, long)]
     database: String,
 
+    #[arg(short, long, default_value = "30")]
+    buf_size: usize,
+
     #[arg(long, default_value = "0.0.0.0:80")]
     host_port: String,
 }
@@ -60,6 +63,7 @@ async fn main() -> Result<()> {
     let program = ChannelTracker::new(
         db_repo.clone(),
         &args.node_name,
+        args.buf_size,
         PathBuf::from_str(args.tmp_path.as_str())?,
     );
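
Both binaries now expose the same `--buf-size` flag with a default of 30. A trimmed-down sketch of how the clap derive parses it (only `buf_size` and `node_name` are shown; the real `Args` structs carry more fields):

```rust
use clap::Parser;

#[derive(Parser, Debug)]
struct Args {
    // e.g. `--buf-size 50`; falls back to 30 when omitted
    #[arg(short, long, default_value = "30")]
    buf_size: usize,

    #[arg(short, long)]
    node_name: String,
}

fn main() {
    let args = Args::parse();
    println!("buf_size={} node_name={}", args.buf_size, args.node_name);
}
```
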
diff --git a/src/core/models.rs b/src/core/models.rs
index 0405762..22f4c3b 100644
--- a/src/core/models.rs
+++ b/src/core/models.rs
@@ -99,8 +99,15 @@ pub trait DataRepo {
         &self,
         node_name: &str,
         id: &str,
+        direction: Direction,
     ) -> impl std::future::Future<Output = Result<Option<DataRecord>>> + Send;
 
+    fn count_pending(
+        &self,
+        node_name: &str,
+        direction: Direction,
+    ) -> impl std::future::Future<Output = Result<usize>> + Send;
+
     fn mark_partial_sent(
         &self,
         node_name: &str,
@@ -117,6 +124,7 @@ pub trait DataRepo {
         &self,
         node_name: &str,
         id: &str,
+        direction: Direction,
         state: DataState,
     ) -> impl std::future::Future<Output = Result<()>> + Send;
 }
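
The trait declares its methods as `fn … -> impl Future<Output = …> + Send` rather than `async fn`, which keeps the `Send` bound explicit while still letting implementors write plain `async fn`s (stable since Rust 1.75). A hypothetical in-memory repo illustrating the shape of a `count_pending` implementation:

```rust
use std::future::Future;

#[derive(Clone, Copy)]
enum Direction {
    In,
    Out,
}

// Same return-position-impl-Future style as the trait in the diff.
trait DataRepo {
    fn count_pending(
        &self,
        node_name: &str,
        direction: Direction,
    ) -> impl Future<Output = anyhow::Result<usize>> + Send;
}

// Hypothetical in-memory repo; a plain `async fn` satisfies the
// `impl Future + Send` signature as long as its future is Send.
struct MemRepo {
    pending_in: usize,
    pending_out: usize,
}

impl DataRepo for MemRepo {
    async fn count_pending(
        &self,
        _node_name: &str,
        direction: Direction,
    ) -> anyhow::Result<usize> {
        Ok(match direction {
            Direction::In => self.pending_in,
            Direction::Out => self.pending_out,
        })
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let repo = MemRepo { pending_in: 3, pending_out: 0 };
    println!("pending: {}", repo.count_pending("node-a", Direction::In).await?);
    Ok(())
}
```
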
{"id": id,"node_name":node_name, "direction": to_variant_name(&direction)?}, + ) + .await + .anyhow() + } + + async fn count_pending(&self, node_name: &str, direction: Direction) -> Result { + self.data_col.count_documents(doc! {"node_name":node_name,"state": doc! {"$in": ["Received","PartialSent"]}, "direction": to_variant_name(&direction)?}).await.map(|count|count as usize).anyhow() } - async fn update_state(&self, node_name: &str, id: &str, state: DataState) -> Result<()> { + async fn update_state( + &self, + node_name: &str, + id: &str, + direction: Direction, + state: DataState, + ) -> Result<()> { let update = doc! { "$set": { "state": to_variant_name(&state)? }, }; self.data_col - .find_one_and_update(doc! {"node_name":node_name,"id": id}, update) + .find_one_and_update( + doc! {"node_name":node_name,"id": id, "direction": to_variant_name(&direction)?}, + update, + ) .await .map(|_| ()) .anyhow() @@ -162,12 +228,8 @@ impl DataRepo for MongoRepo { .anyhow() } - async fn find_by_node_id(&self, node_name: &str, id: &str) -> Result> { - let result = self - .data_col - .find_one(doc! {"id": id,"node_name":node_name}) - .await?; - Ok(result) + async fn insert_new_path(&self, record: &DataRecord) -> Result<()> { + self.data_col.insert_one(record).await.map(|_| ()).anyhow() } }