Skip to content

Commit

Permalink
feat: add more cli to control job & add status report
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Aug 9, 2024
1 parent a291633 commit 7fb4352
Show file tree
Hide file tree
Showing 23 changed files with 568 additions and 137 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,14 @@ version = "0.1.0"
edition = "2021"

[dependencies]

bimap = "0.6.3"
kube = { version = "0.93.1", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.22.0", features = ["latest"] }
handlebars = "6.0.0"
prost = "0.13.1"
serde_variant = "0.1.3"
uri="0.4.0"
comfy-table = "7.1.1"
prettytable-rs = "^0.10"
k8s-metrics = "0.16.0"

tokio = { workspace = true, features = ["macros", "rt-multi-thread", "fs"] }
tokio-retry = {workspace = true}
Expand All @@ -72,6 +71,7 @@ chrono = {workspace = true}
futures = {workspace = true}
async-trait = {workspace = true}
reqwest = {workspace = true}
ratatui = "0.28.0"

[build-dependencies]
tonic-build = "0.12.1"
Expand Down
2 changes: 1 addition & 1 deletion crates/compute_unit_runner/src/bin/compute_unit_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ async fn main() -> Result<()> {
info!("ipc server stopped");
}
};
Ok::<(), anyhow::Error>(())
Ok::<(), anyhow::Error>(())
});
}
{
Expand Down
8 changes: 4 additions & 4 deletions crates/compute_unit_runner/src/media_data_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ where
&DataState::PartialSent
];

if db_repo.count(&node_name, running_state.as_slice(), &Direction::Out).await? == 0 &&
db_repo.count(&node_name, &[&DataState::Received,&DataState::Assigned], &Direction::In).await? == 0 {
if db_repo.count(&node_name, running_state.as_slice(), Some(&Direction::Out)).await? == 0 &&
db_repo.count(&node_name, &[&DataState::Received,&DataState::Assigned], Some(&Direction::In)).await? == 0 {
db_repo.update_node_by_name(&node_name, TrackerState::Finish).await.map_err(|err|anyhow!("update node data {err}"))?;
info!("node was finished, not need to run backend");
return anyhow::Ok(());
Expand Down Expand Up @@ -333,7 +333,7 @@ where
}

loop {
if let Err(err) = db_repo.count(&node_name, &[&DataState::Received, &DataState::PartialSent], &Direction::Out).await.and_then(|count|{
if let Err(err) = db_repo.count(&node_name, &[&DataState::Received, &DataState::PartialSent], Some(&Direction::Out)).await.and_then(|count|{
if count > buf_size {
Err(anyhow!("has reach limit current:{count} limit:{buf_size}"))
} else {
Expand Down Expand Up @@ -542,7 +542,7 @@ where
&DataState::SelectForSend,
&DataState::PartialSent,
];
match db_repo.count(&node_name, running_state.as_slice(), &Direction::Out).await {
match db_repo.count(&node_name, running_state.as_slice(), Some(&Direction::Out)).await {
Ok(count) => {
if count ==0 {
break;
Expand Down
8 changes: 4 additions & 4 deletions crates/dp_runner/src/channel_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use std::{
use tokio::{
select,
sync::mpsc::{
self,
},
self,
},
task::JoinSet,
time::{
self,
Expand Down Expand Up @@ -148,7 +148,7 @@ where
&DataState::SelectForSend,
&DataState::EndRecieved
];
if db_repo.count(&node_name, running_state.as_slice(), &Direction::Out).await? == 0 {
if db_repo.count(&node_name, running_state.as_slice(), Some(&Direction::Out)).await? == 0 {
db_repo.update_node_by_name(&node_name, TrackerState::Finish).await.map_err(|err|anyhow!("update node data {err}"))?;
info!("node was finished, not need to run backend");
return anyhow::Ok(());
Expand Down Expand Up @@ -255,7 +255,7 @@ where
let id = data_batch.id.clone();
let size = data_batch.size;
//check limit
if let Err(err) = db_repo.count(&node_name,&[&DataState::Received], &Direction::In).await.and_then(|count|{
if let Err(err) = db_repo.count(&node_name,&[&DataState::Received], Some(&Direction::In)).await.and_then(|count|{
if count > buf_size {
Err(anyhow!("has reach limit current:{count} limit:{buf_size}"))
} else {
Expand Down
4 changes: 3 additions & 1 deletion crates/dp_runner/src/state_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@ where
.get_node_by_name(name)
.await{
Ok(record)=> {
debug!("{} fetch state from db", record.node_name);
debug!("{} fetch state {:?} from db", record.node_name, record.state);
let mut program_guard = program.write().await;
if program_guard.local_state == record.state {
println!("xcxxxxxx {:?}",program_guard.local_state);
continue
}
println!("xxxxxxxxxxxx");
let old_local_state = program_guard.local_state.clone();
program_guard.local_state = record.state.clone();
info!("update state {:?} -> {:?}", &old_local_state, &record.state);
Expand Down
Binary file added jz-flow
Binary file not shown.
2 changes: 1 addition & 1 deletion nodes/dummy_in/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ async fn dummy_in(token: CancellationToken, args: Args) -> Result<()> {
};

loop {
if args.total_count > 0 && count > args.total_count {
if args.total_count > 0 && count >= args.total_count {
info!("exit pod because work has done");
if client.status().await.unwrap().state == TrackerState::Finish {
return Ok(());
Expand Down
3 changes: 2 additions & 1 deletion script/example_dag.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"image": "gitdatateam/dummy_in:latest",
"command": "/dummy_in",
"args": [
"--log-level=debug"
"--log-level=debug",
"--total-count=100"
]
}
},
Expand Down
59 changes: 56 additions & 3 deletions src/api/client/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl JobClient {
.await
.anyhow()
.and_then(|body| String::from_utf8(body.into()).anyhow())?;
return Err(anyhow!("get job {code} reason {err_msg}"));
return Err(anyhow!("list job {code} reason {err_msg}"));
}

resp.bytes()
Expand Down Expand Up @@ -156,7 +156,7 @@ impl JobClient {
.await
.anyhow()
.and_then(|body| String::from_utf8(body.into()).anyhow())?;
return Err(anyhow!("request job {code} reason {err_msg}"));
return Err(anyhow!("request update job {code} reason {err_msg}"));
}

Ok(())
Expand All @@ -183,7 +183,7 @@ impl JobClient {
.await
.anyhow()
.and_then(|body| String::from_utf8(body.into()).anyhow())?;
return Err(anyhow!("request job {code} reason {err_msg}"));
return Err(anyhow!("request job detail {code} reason {err_msg}"));
}

resp.bytes()
Expand All @@ -192,4 +192,57 @@ impl JobClient {
.and_then(|body| serde_json::from_slice(&body).anyhow())
.anyhow()
}

/// Asks the server to start the job identified by `job_id`.
///
/// Issues a `POST` to `job/run/<hex id>` resolved against `base_uri`.
///
/// # Errors
///
/// Fails when the endpoint URL cannot be built, when the request cannot be
/// sent, when the server answers with a non-success status (the response
/// body is included as the reason), or when that error body is not valid
/// UTF-8.
pub async fn run_job(&self, job_id: &ObjectId) -> Result<()> {
    let endpoint = self
        .base_uri
        .clone()
        .join("job/")?
        .join("run/")?
        .join(job_id.to_hex().as_str())?;

    let resp = self.client.post(endpoint).send().await.anyhow()?;

    // Happy path first: nothing to decode when the server accepted the request.
    let code = resp.status();
    if code.is_success() {
        return Ok(());
    }

    // On failure the body carries the server-side reason; surface it verbatim.
    let raw_body = resp.bytes().await.anyhow()?;
    let err_msg = String::from_utf8(raw_body.into()).anyhow()?;
    Err(anyhow!("request start job {code} reason {err_msg}"))
}

/// Asks the server to clean up the job identified by `job_id`.
///
/// Issues a `DELETE` to `job/<hex id>` resolved against `base_uri`.
///
/// # Errors
///
/// Fails when the endpoint URL cannot be built, when the request cannot be
/// sent, when the server answers with a non-success status (the response
/// body is included as the reason), or when that error body is not valid
/// UTF-8.
pub async fn clean_job(&self, job_id: &ObjectId) -> Result<()> {
    let resp = self
        .client
        .delete(
            self.base_uri
                .clone()
                .join("job/")?
                .join(job_id.to_hex().as_str())?,
        )
        .send()
        .await
        .anyhow()?;

    if !resp.status().is_success() {
        let code = resp.status();
        let err_msg = resp
            .bytes()
            .await
            .anyhow()
            .and_then(|body| String::from_utf8(body.into()).anyhow())?;
        // Name the operation that actually failed; the previous text was
        // copied from `run_job` and misreported this as a start failure.
        return Err(anyhow!("request clean job {code} reason {err_msg}"));
    }

    Ok(())
}
}
54 changes: 39 additions & 15 deletions src/api/job_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
Job,
JobDbRepo,
JobUpdateInfo,
ListJobParams,
MainDbRepo,
},
driver::Driver,
Expand Down Expand Up @@ -42,17 +43,24 @@ async fn list<MAINR>(db_repo: web::Data<MAINR>) -> HttpResponse
where
MAINR: MainDbRepo,
{
match db_repo.list_jobs().await {
let list_job_params = &ListJobParams { state: None };
match db_repo.list_jobs(list_job_params).await {
Ok(jobs) => HttpResponse::Ok().json(&jobs),
Err(err) => HttpResponse::InternalServerError().body(err.to_string()),
}
}

async fn delete<MAINR>(db_repo: web::Data<MAINR>, path: web::Path<ObjectId>) -> HttpResponse
async fn clean_job<D, MAINR, JOBR>(
job_manager: web::Data<JobManager<D, MAINR, JOBR>>,
path: web::Path<String>,
) -> HttpResponse
where
D: Driver,
MAINR: MainDbRepo,
JOBR: JobDbRepo,
{
match db_repo.delete(&path.into_inner()).await {
let id = ObjectId::from_str(&path.into_inner()).unwrap();
match job_manager.clean_job(&id).await {
Ok(_) => HttpResponse::Ok().finish(),
Err(err) => HttpResponse::InternalServerError().body(err.to_string()),
}
Expand Down Expand Up @@ -91,22 +99,38 @@ where
}
}

/// HTTP handler that starts the job whose id is given in the request path.
///
/// The path segment is parsed as an `ObjectId`. A malformed id yields
/// `400 Bad Request` instead of panicking the worker. On success the value
/// returned by `JobManager::start_job` is serialized as the JSON body of a
/// `200 OK`; any error from `start_job` is reported as
/// `500 Internal Server Error` with the error text as the body.
async fn run_job<D, MAINR, JOBR>(
    job_manager: web::Data<JobManager<D, MAINR, JOBR>>,
    path: web::Path<String>,
) -> HttpResponse
where
    D: Driver,
    MAINR: MainDbRepo,
    JOBR: JobDbRepo,
{
    // The id comes straight from the client, so a parse failure is a caller
    // error, not an internal invariant violation.
    let id = match ObjectId::from_str(&path.into_inner()) {
        Ok(id) => id,
        Err(err) => {
            return HttpResponse::BadRequest().body(format!("invalid job id: {err}"));
        }
    };

    match job_manager.start_job(&id).await {
        Ok(detail) => HttpResponse::Ok().json(detail),
        Err(err) => HttpResponse::InternalServerError().body(err.to_string()),
    }
}

pub(super) fn job_route_config<D, MAINR, JOBR>(cfg: &mut web::ServiceConfig)
where
D: Driver,
MAINR: MainDbRepo,
JOBR: JobDbRepo,
{
cfg.service(
web::resource("/job")
.route(web::post().to(create::<MAINR>))
.route(web::delete().to(delete::<MAINR>)),
)
.service(
web::resource("/job/{id}")
.route(web::get().to(get::<MAINR>))
.route(web::post().to(update::<MAINR>)),
)
.service(web::resource("/jobs").route(web::get().to(list::<MAINR>)))
.service(web::resource("/job/detail/{id}").route(web::get().to(job_details::<D, MAINR, JOBR>)));
cfg.service(web::resource("/job").route(web::post().to(create::<MAINR>)))
.service(
web::resource("/job/{id}")
.route(web::get().to(get::<MAINR>))
.route(web::post().to(update::<MAINR>))
.route(web::delete().to(clean_job::<D, MAINR, JOBR>)),
)
.service(web::resource("/jobs").route(web::get().to(list::<MAINR>)))
.service(
web::resource("/job/detail/{id}").route(web::get().to(job_details::<D, MAINR, JOBR>)),
)
.service(web::resource("/job/run/{id}").route(web::post().to(run_job::<D, MAINR, JOBR>)));
}
Loading

0 comments on commit 7fb4352

Please sign in to comment.