diff --git a/crates/compute_unit_runner/Cargo.toml b/crates/compute_unit_runner/Cargo.toml index 0d00eec..b505ed7 100644 --- a/crates/compute_unit_runner/Cargo.toml +++ b/crates/compute_unit_runner/Cargo.toml @@ -33,4 +33,4 @@ hyper-util = "0.1.6" http-body-util = "0.1.2" walkdir = "2.5.0" rand = "0.8.5" -query-string-builder = "=0.2.0" \ No newline at end of file +query-string-builder = "=0.6.0" \ No newline at end of file diff --git a/crates/compute_unit_runner/src/ipc.rs b/crates/compute_unit_runner/src/ipc.rs index 85cd2d5..049dc63 100644 --- a/crates/compute_unit_runner/src/ipc.rs +++ b/crates/compute_unit_runner/src/ipc.rs @@ -631,7 +631,7 @@ impl IPCClient for IPCClientImpl { &self, id: Option<&str>, ) -> Result, IPCError> { - let qs = QueryString::new().with_opt_value("id", id); + let qs = QueryString::dynamic().with_opt_value("id", id); let url: hyper::Uri = Uri::new( self.unix_socket_addr.clone(), format!("/api/v1/data{qs}").as_str(), diff --git a/src/api/client/job.rs b/src/api/client/job.rs index fd07927..d6eeb2b 100644 --- a/src/api/client/job.rs +++ b/src/api/client/job.rs @@ -1,7 +1,6 @@ use crate::{ core::db::{ - Job, - JobUpdateInfo, + GetJobParams, Job, JobUpdateInfo }, job::job_mgr::JobDetails, utils::StdIntoAnyhowResult, @@ -50,7 +49,7 @@ impl JobClient { .anyhow() } - pub async fn get(&self, job_id: &ObjectId) -> Result> { + pub async fn get_by_id(&self, job_id: &ObjectId) -> Result> { let resp = self .client .get( @@ -84,6 +83,47 @@ impl JobClient { .anyhow() } + pub async fn get(&self, get_job_params: &GetJobParams) -> Result> { + let mut uri = self.base_uri + .clone() + .join("job")?; + + if let Some(id) = get_job_params.id.as_ref() { + uri.query_pairs_mut().append_pair("id", id.to_string().as_str()); + } + + if let Some(name) = get_job_params.name.as_ref() { + uri.query_pairs_mut().append_pair("name", name.as_str()); + } + + let resp = self + .client + .get(uri) + .send() + .await + .anyhow()?; + + if resp.status() == StatusCode::NOT_FOUND { + return Ok(None); + } + + if !resp.status().is_success() { + let code = resp.status(); + let err_msg = resp + .bytes() + .await + .anyhow() + .and_then(|body| String::from_utf8(body.into()).anyhow())?; + return Err(anyhow!("get job {code} reason {err_msg}")); + } + + resp.bytes() + .await + .anyhow() + .and_then(|body| serde_json::from_slice(&body).anyhow()) + .anyhow() + } + pub async fn list(&self) -> Result> { let resp = self .client diff --git a/src/api/job_api.rs b/src/api/job_api.rs index 72ec88c..b8b94d9 100644 --- a/src/api/job_api.rs +++ b/src/api/job_api.rs @@ -2,11 +2,7 @@ use std::str::FromStr; use crate::{ core::db::{ - Job, - JobDbRepo, - JobUpdateInfo, - ListJobParams, - MainDbRepo, + GetJobParams, Job, JobDbRepo, JobUpdateInfo, ListJobParams, MainDbRepo }, driver::Driver, job::job_mgr::JobManager, @@ -28,11 +24,22 @@ where } } -async fn get(db_repo: web::Data, path: web::Path) -> HttpResponse +async fn get_by_id(db_repo: web::Data, path: web::Path) -> HttpResponse where MAINR: MainDbRepo, { - match db_repo.get(&path.into_inner()).await { + match db_repo.get(&GetJobParams::new().set_id(path.into_inner())).await { + Ok(Some(inserted_result)) => HttpResponse::Ok().json(&inserted_result), + Ok(None) => HttpResponse::NotFound().finish(), + Err(err) => HttpResponse::InternalServerError().body(err.to_string()), + } +} + +async fn get(db_repo: web::Data, query: web::Query) -> HttpResponse +where + MAINR: MainDbRepo, +{ + match db_repo.get(&query.into_inner()).await { Ok(Some(inserted_result)) => HttpResponse::Ok().json(&inserted_result), Ok(None) => HttpResponse::NotFound().finish(), Err(err) => HttpResponse::InternalServerError().body(err.to_string()), @@ -60,7 +67,7 @@ where JOBR: JobDbRepo, { let id = ObjectId::from_str(&path.into_inner()).unwrap(); - match job_manager.clean_job(&id).await { + match job_manager.clean_job(&GetJobParams::new().set_id(id)).await { Ok(_) => HttpResponse::Ok().finish(), Err(err) => HttpResponse::InternalServerError().body(err.to_string()), } @@ -93,7 +100,7 @@ where JOBR: JobDbRepo, { let id = ObjectId::from_str(&path.into_inner()).unwrap(); - match job_manager.get_job_details(&id).await { + match job_manager.get_job_details(&GetJobParams::new().set_id(id)).await { Ok(detail) => HttpResponse::Ok().json(detail), Err(err) => HttpResponse::InternalServerError().body(err.to_string()), } @@ -109,7 +116,7 @@ where JOBR: JobDbRepo, { let id = ObjectId::from_str(&path.into_inner()).unwrap(); - match job_manager.start_job(&id).await { + match job_manager.start_job(&GetJobParams::new().set_id(id)).await { Ok(detail) => HttpResponse::Ok().json(detail), Err(err) => HttpResponse::InternalServerError().body(err.to_string()), } @@ -121,10 +128,14 @@ where MAINR: MainDbRepo, JOBR: JobDbRepo, { - cfg.service(web::resource("/job").route(web::post().to(create::))) + cfg.service( + web::resource("/job") + .route(web::post().to(create::)) + .route(web::get().to(get::)) + ) .service( web::resource("/job/{id}") - .route(web::get().to(get::)) + .route(web::get().to(get_by_id::)) .route(web::post().to(update::)) .route(web::delete().to(clean_job::)), ) diff --git a/src/bin/jz-flow/job.rs b/src/bin/jz-flow/job.rs index ff829a3..606f58d 100644 --- a/src/bin/jz-flow/job.rs +++ b/src/bin/jz-flow/job.rs @@ -12,9 +12,9 @@ use clap::{ }; use jiaoziflow::{ api::client::JzFlowClient, - core::db::Job, + core::db::{GetJobParams, Job}, dag::Dag, - utils::sizefmt::SmartSize, + utils::{sizefmt::SmartSize, IntoAnyhowResult}, }; use mongodb::bson::oid::ObjectId; use prettytable::{ @@ -117,8 +117,8 @@ pub(super) async fn list_job(global_opts: GlobalOptions, args: ListJobArgs) -> R #[derive(Debug, Args)] pub(super) struct JobDetailArgs { - #[arg(index = 1, help = "job name, must be unique")] - pub(super) id: String, + #[arg(index = 1, help = "job name or id")] + pub(super) name_or_id: String, #[arg(long, default_value = "table", help = "format json/table")] pub(super) format: String, @@ -126,8 +126,13 @@ pub(super) struct JobDetailArgs { pub(super) async fn get_job_details(global_opts: GlobalOptions, args: JobDetailArgs) -> Result<()> { let client = JzFlowClient::new(&global_opts.listen)?.job(); - let id: ObjectId = ObjectId::from_str(&args.id)?; - let job_detail = client.get_job_detail(&id).await?; + let get_job_params = match ObjectId::from_str(&args.name_or_id){ + Ok(id)=> GetJobParams::new().set_id(id), + Err(_)=> GetJobParams::new().set_name(args.name_or_id) + }; + + let job: Job = client.get(&get_job_params).await?.anyhow("job not exit")?; + let job_detail = client.get_job_detail(&job.id).await?; if args.format == "json" { println!("{}", serde_json::to_string_pretty(&job_detail)?); @@ -194,30 +199,42 @@ pub(super) async fn get_job_details(global_opts: GlobalOptions, args: JobDetailA #[derive(Debug, Args)] pub(super) struct RunJobArgs { - #[arg(index = 1, help = "job name, must be unique")] - pub(super) id: String, + #[arg(index = 1, help = "job name or id")] + pub(super) name_or_id: String, } pub(super) async fn run_job(global_opts: GlobalOptions, args: RunJobArgs) -> Result<()> { let client = JzFlowClient::new(&global_opts.listen)?.job(); - let id: ObjectId = ObjectId::from_str(&args.id)?; - client.run_job(&id).await?; - println!("Run job successfully, job ID: {}", args.id); + let get_job_params = match ObjectId::from_str(&args.name_or_id){ + Ok(id)=> GetJobParams::new().set_id(id), + Err(_)=> GetJobParams::new().set_name(args.name_or_id) + }; + let job = client.get(&get_job_params).await?.anyhow("job not exit")?; + + client.run_job(&job.id).await?; + + println!("Run job successfully, job ID: {}", job.id); Ok(()) } #[derive(Debug, Args)] pub(super) struct CleanJobArgs { - #[arg(index = 1, help = "job name, must be unique")] - pub(super) id: String, + #[arg(index = 1, help = "job name or id")] + pub(super) name_or_id: String, } pub(super) async fn clean_job(global_opts: GlobalOptions, args: CleanJobArgs) -> Result<()> { let client = JzFlowClient::new(&global_opts.listen)?.job(); - let id: ObjectId = ObjectId::from_str(&args.id)?; - client.clean_job(&id).await?; - println!("Clean job successfully, job ID: {}", args.id); + let get_job_params = match ObjectId::from_str(&args.name_or_id){ + Ok(id)=> GetJobParams::new().set_id(id), + Err(_)=> GetJobParams::new().set_name(args.name_or_id) + }; + let job = client.get(&get_job_params).await?.anyhow("job not exit")?; + + client.clean_job(&job.id).await?; + + println!("Clean job successfully, job ID: {}", job.id); Ok(()) } diff --git a/src/core/main_db_models.rs b/src/core/main_db_models.rs index 4671f60..aa48b2f 100644 --- a/src/core/main_db_models.rs +++ b/src/core/main_db_models.rs @@ -37,10 +37,42 @@ pub struct JobUpdateInfo { pub struct ListJobParams { pub state: Option, } + +#[derive(Serialize, Deserialize, Debug)] +pub struct GetJobParams { + pub name: Option, + pub id: Option, +} + +impl Default for GetJobParams { + fn default() -> Self { + Self::new() + } +} + +impl GetJobParams { + pub fn new() ->Self { + GetJobParams{ + name:None, + id:None, + } + } + + pub fn set_id(mut self, id: ObjectId) -> Self { + self.id = Some(id); + self + } + + pub fn set_name(mut self, name: String) -> Self { + self.name = Some(name); + self + } +} + pub trait JobRepo { fn insert(&self, job: &Job) -> impl std::future::Future> + Send; - fn get(&self, id: &ObjectId) -> impl std::future::Future>> + Send; + fn get(&self, id: &GetJobParams) -> impl std::future::Future>> + Send; fn delete(&self, id: &ObjectId) -> impl std::future::Future> + Send; diff --git a/src/dbrepo/main_db_mongo.rs b/src/dbrepo/main_db_mongo.rs index 37403c6..38b37d9 100644 --- a/src/dbrepo/main_db_mongo.rs +++ b/src/dbrepo/main_db_mongo.rs @@ -1,10 +1,6 @@ use crate::{ core::db::{ - Job, - JobRepo, - JobState, - JobUpdateInfo, - ListJobParams, + GetJobParams, Job, JobRepo, JobState, JobUpdateInfo, ListJobParams }, utils::{ IntoAnyhowResult, @@ -155,8 +151,20 @@ impl JobRepo for MongoMainDbRepo { self.job_col.find(query).await?.try_collect().await.anyhow() } - async fn get(&self, id: &ObjectId) -> Result> { - self.job_col.find_one(doc! {"_id": id}).await.anyhow() + async fn get(&self, get_params: &GetJobParams) -> Result> { + let mut query = doc! {}; + if let Some(id) = get_params.id.as_ref() { + query.insert("_id", id); + } + if let Some(name) = get_params.name.as_ref() { + query.insert("name", name); + } + + if query.is_empty() { + return Ok(None); + } + + self.job_col.find_one(query).await.anyhow() } async fn delete(&self, id: &ObjectId) -> Result<()> { diff --git a/src/job/job_mgr.rs b/src/job/job_mgr.rs index 1003006..554c958 100644 --- a/src/job/job_mgr.rs +++ b/src/job/job_mgr.rs @@ -1,11 +1,6 @@ use crate::{ core::db::{ - Job, - JobDbRepo, - JobState, - JobUpdateInfo, - ListJobParams, - MainDbRepo, + GetJobParams, Job, JobDbRepo, JobState, JobUpdateInfo, ListJobParams, MainDbRepo }, dag::Dag, dbrepo::MongoRunDbRepo, @@ -26,7 +21,6 @@ use anyhow::{ }; use futures::future::try_join_all; use kube::Client; -use mongodb::bson::oid::ObjectId; use serde::{ Deserialize, Serialize, @@ -202,8 +196,8 @@ where Ok(()) } - pub async fn get_job_details(&self, id: &ObjectId) -> Result { - let job = self.db.get(id).await?.anyhow("job not found")?; + pub async fn get_job_details(&self, params: &GetJobParams) -> Result { + let job = self.db.get(params).await?.anyhow("job not found")?; let node_status = if job.state == JobState::Running { let dag = Dag::from_json(job.graph_json.as_str())?; @@ -225,8 +219,8 @@ where Ok(JobDetails { job, node_status }) } - pub async fn start_job(&self, id: &ObjectId) -> Result<()> { - let job = self.db.get(id).await?.anyhow("job not found")?; + pub async fn start_job(&self, params: &GetJobParams) -> Result<()> { + let job = self.db.get(params).await?.anyhow("job not found")?; if job.state != JobState::Deployed { return Err(anyhow!("only can run a deployed job")); } @@ -248,12 +242,12 @@ where }) } - pub async fn clean_job(&self, id: &ObjectId) -> Result<()> { - let job = self.db.get(id).await?.anyhow("job not found")?; + pub async fn clean_job(&self, params: &GetJobParams) -> Result<()> { + let job = self.db.get(params).await?.anyhow("job not found")?; //clean k8s self.db .update( - id, + &job.id, &JobUpdateInfo { state: Some(JobState::Finish), }, @@ -265,7 +259,7 @@ where MongoRunDbRepo::drop(&db_url).await?; self.db .update( - id, + &job.id, &JobUpdateInfo { state: Some(JobState::Clean), },