Skip to content

Commit

Permalink
feat: add limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Aug 2, 2024
1 parent a6ed928 commit 1a503cb
Show file tree
Hide file tree
Showing 7 changed files with 203 additions and 76 deletions.
4 changes: 4 additions & 0 deletions crates/compute_unit_runner/src/bin/compute_unit_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ struct Args {
#[arg(short, long, default_value = "/app/tmp")]
tmp_path: String,

#[arg(short, long, default_value = "30")]
buf_size: usize,

#[arg(short, long)]
node_name: String,

Expand Down Expand Up @@ -61,6 +64,7 @@ async fn main() -> Result<()> {
db_repo.clone(),
&args.node_name,
PathBuf::from_str(args.tmp_path.as_str())?,
args.buf_size,
);

let program_safe = Arc::new(Mutex::new(program));
Expand Down
96 changes: 64 additions & 32 deletions crates/compute_unit_runner/src/media_data_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::ipc::{AvaiableDataResponse, CompleteDataReq, SubmitOuputDataReq};
use crate::mprc::Mprs;
use anyhow::{Ok, Result};
use anyhow::{anyhow, Ok, Result};
use jz_action::core::models::{DataRecord, DataState, DbRepo, Direction, NodeRepo, TrackerState};
use jz_action::network::common::Empty;
use jz_action::network::datatransfer::data_stream_client::DataStreamClient;
Expand Down Expand Up @@ -29,6 +29,8 @@ where
{
pub(crate) name: String,

pub(crate) buf_size: usize,

pub(crate) tmp_store: PathBuf,

pub(crate) repo: R,
Expand All @@ -55,9 +57,10 @@ impl<R> MediaDataTracker<R>
where
R: DbRepo,
{
pub fn new(repo: R, name: &str, tmp_store: PathBuf) -> Self {
pub fn new(repo: R, name: &str, tmp_store: PathBuf, buf_size: usize) -> Self {
MediaDataTracker {
tmp_store,
buf_size,
name: name.to_string(),
repo: repo,
local_state: TrackerState::Init,
Expand Down Expand Up @@ -143,7 +146,7 @@ where
}
Err(err) => {
error!("walk out dir({:?}) fail {err}", &req.id);
if let Err(err) = db_repo.update_state(&node_name, &req.id, DataState::Error).await{
if let Err(err) = db_repo.update_state(&node_name, &req.id, Direction::Out, DataState::Error).await{
error!("mark data state error fail {err}");
}
break;
Expand All @@ -167,7 +170,7 @@ where
info!("send data to downnstream successfully {} {:?}", &req.id, now.elapsed());
}

match db_repo.update_state(&node_name, &req.id, DataState::Sent).await{
match db_repo.update_state(&node_name, &req.id, Direction::Out,DataState::Sent).await{
std::result::Result::Ok(_) =>{
//remove input data
let tmp_path = tmp_store.join(&req.id);
Expand Down Expand Up @@ -197,6 +200,61 @@ where
});
}

{
//process submit data
let db_repo = self.repo.clone();
let node_name = self.name.clone();
let buf_size = self.buf_size;

tokio::spawn(async move {
loop {
select! {
Some((req, resp)) = ipc_process_submit_result_rx.recv() => {
loop {
if let Err(err) = db_repo.count_pending(&node_name,Direction::In).await.and_then(|count|{
if count > buf_size {
Err(anyhow!("has reach limit current:{count} limit:{buf_size}"))
} else {
Ok(())
}
}){
warn!("fail with limit {err}");
sleep(Duration::from_secs(10)).await;
continue;
}
break;
}

// respond with nothing
match db_repo.insert_new_path(&DataRecord{
node_name:node_name.clone(),
id:req.id.clone(),
size: req.size,
sent: vec![],
state: DataState::Received,
direction: Direction::Out,
}).await{
std::result::Result::Ok(_) =>{
// respond with nothing
resp.send(Ok(())).expect("channel only read once");
info!("insert this data to path");
match new_data_tx.try_send(()) {
Err(TrySendError::Closed(_)) =>{
error!("new data channel has been closed")
}
_=>{}
}
},
Err(err) => {
resp.send(Err(err)).expect("channel only read once");
}
}
},
}
}
});
}

//TODO: this turns an async process into a synchronous one, which hurts performance;
//if a bottleneck shows up here, we should refactor this
{
Expand Down Expand Up @@ -232,7 +290,7 @@ where
}
},
Some((req, resp)) = ipc_process_completed_data_rx.recv() => {
match db_repo.update_state(&node_name, &req.id, DataState::Processed).await{
match db_repo.update_state(&node_name, &req.id, Direction::In, DataState::Processed).await{
std::result::Result::Ok(_) =>{
// respond with nothing
resp.send(Ok(())).expect("channel only read once");
Expand All @@ -246,33 +304,7 @@ where
resp.send(Err(err)).expect("channel only read once");
}
}
},
Some((req, resp)) = ipc_process_submit_result_rx.recv() => {
// respond with nothing
match db_repo.insert_new_path(&DataRecord{
node_name:node_name.clone(),
id:req.id.clone(),
size: req.size,
sent: vec![],
state: DataState::Received,
direction: Direction::Out,
}).await{
std::result::Result::Ok(_) =>{
// respond with nothing
resp.send(Ok(())).expect("channel only read once");
println!("send signal");
match new_data_tx.try_send(()) {
Err(TrySendError::Closed(_)) =>{
error!("new data channel has been closed")
}
_=>{}
}
},
Err(err) => {
resp.send(Err(err)).expect("channel only read once");
}
}
},
}
}
}
});
Expand Down
2 changes: 1 addition & 1 deletion crates/compute_unit_runner/src/multi_sender.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use jz_action::network::datatransfer::{
data_stream_client::DataStreamClient, MediaDataBatchResponse,
};
use tokio::time::Instant;
use std::collections::{hash_map::Entry, HashMap};
use tokio::time::Instant;
use tonic::transport::Channel;
use tracing::error;

Expand Down
33 changes: 25 additions & 8 deletions crates/dp_runner/src/channel_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ where

pub(crate) name: String,

pub(crate) buf_size: usize,

pub(crate) tmp_store: PathBuf,

pub(crate) local_state: TrackerState,
Expand All @@ -38,11 +40,12 @@ impl<R> ChannelTracker<R>
where
R: DbRepo,
{
pub(crate) fn new(repo: R, name: &str, tmp_store: PathBuf) -> Self {
pub(crate) fn new(repo: R, name: &str, buf_size: usize, tmp_store: PathBuf) -> Self {
ChannelTracker {
name: name.to_string(),
repo: repo,
tmp_store,
buf_size,
local_state: TrackerState::Init,
upstreams: vec![],
downstreams: vec![],
Expand All @@ -61,15 +64,29 @@ where
let tmp_store = self.tmp_store.clone();
let db_repo = self.repo.clone();
let node_name = self.name.clone();
let buf_size = self.buf_size;
tokio::spawn(async move {
loop {
select! {
Some((data_batch, resp)) = rx.recv() => {
Some((data_batch, resp)) = rx.recv() => { //TODO: make this a parameter
//save to fs
//create input directory
let now = Instant::now();
let id = data_batch.id;
match db_repo.find_by_node_id(&node_name,&id).await {
//check limit
if let Err(err) = db_repo.count_pending(&node_name,Direction::In).await.and_then(|count|{
if count > buf_size {
Err(anyhow!("has reach limit current:{count} limit:{buf_size}"))
} else {
Ok(())
}
}){
resp.send(Err(anyhow!("cannt query limit from mongo {err}"))).expect("request alread listen this channel");
continue;
}

// processed before
match db_repo.find_by_node_id(&node_name,&id,Direction::In).await {
Ok(Some(_))=>{
warn!("data {} processed before", &id);
resp.send(Ok(())).expect("request alread listen this channel");
Expand All @@ -83,17 +100,18 @@ where
_=>{}
}

let tmp_in_path = tmp_store.join(id.clone());
// the code below could be moved to another coroutine

debug!("try to create directory {:?}", tmp_in_path);
//write batch files
//writing files is too slow; consider using a memory cache
let tmp_in_path = tmp_store.join(id.clone());
debug!("try to create directory {:?} {:?}", tmp_in_path, now.elapsed());
if let Err(err) = fs::create_dir_all(&tmp_in_path).await {
error!("create input dir {:?} fail {}", tmp_in_path, err);
resp.send(Err(err.into())).expect("request alread listen this channel");
continue;
}

//write batch files

let mut is_write_err = false;
for (entry_path, entry) in data_batch.cells.iter().map(|entry|(tmp_in_path.join(entry.path.clone()), entry)) {
if let Err(err) = fs::write(entry_path.clone(), &entry.data).await {
Expand All @@ -106,7 +124,6 @@ where
resp.send(Err(anyhow!("write file "))).expect("request alread listen this channel");
continue;
}

info!("write files to disk in {} {:?}", &id, now.elapsed());

//insert record to database
Expand Down
4 changes: 4 additions & 0 deletions crates/dp_runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ struct Args {
#[arg(short, long)]
database: String,

#[arg(short, long, default_value = "30")]
buf_size: usize,

#[arg(long, default_value = "0.0.0.0:80")]
host_port: String,
}
Expand All @@ -60,6 +63,7 @@ async fn main() -> Result<()> {
let program = ChannelTracker::new(
db_repo.clone(),
&args.node_name,
args.buf_size,
PathBuf::from_str(args.tmp_path.as_str())?,
);

Expand Down
8 changes: 8 additions & 0 deletions src/core/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,15 @@ pub trait DataRepo {
&self,
node_name: &str,
id: &str,
direction: Direction,
) -> impl std::future::Future<Output = Result<Option<DataRecord>>> + Send;

/// Counts the records for `node_name` that are still pending in the given
/// `direction`; callers compare the result against a configured `buf_size`
/// to enforce the back-pressure limit before accepting more data.
/// NOTE(review): exactly which `DataState` values count as "pending" is
/// implementation-defined — confirm against the concrete `DbRepo` impl.
fn count_pending(
&self,
node_name: &str,
direction: Direction,
) -> impl std::future::Future<Output = Result<usize>> + Send;

fn mark_partial_sent(
&self,
node_name: &str,
Expand All @@ -117,6 +124,7 @@ pub trait DataRepo {
&self,
node_name: &str,
id: &str,
direction: Direction,
state: DataState,
) -> impl std::future::Future<Output = Result<()>> + Send;
}
Expand Down
Loading

0 comments on commit 1a503cb

Please sign in to comment.