Skip to content

Commit

Permalink
feat: support job name in cli
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Aug 13, 2024
1 parent b77b739 commit cd1a7fd
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 56 deletions.
2 changes: 1 addition & 1 deletion crates/compute_unit_runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ hyper-util = "0.1.6"
http-body-util = "0.1.2"
walkdir = "2.5.0"
rand = "0.8.5"
query-string-builder = "=0.2.0"
query-string-builder = "=0.6.0"
2 changes: 1 addition & 1 deletion crates/compute_unit_runner/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ impl IPCClient for IPCClientImpl {
&self,
id: Option<&str>,
) -> Result<Option<AvaiableDataResponse>, IPCError> {
let qs = QueryString::new().with_opt_value("id", id);
let qs = QueryString::dynamic().with_opt_value("id", id);
let url: hyper::Uri = Uri::new(
self.unix_socket_addr.clone(),
format!("/api/v1/data{qs}").as_str(),
Expand Down
46 changes: 43 additions & 3 deletions src/api/client/job.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{
core::db::{
Job,
JobUpdateInfo,
GetJobParams, Job, JobUpdateInfo
},
job::job_mgr::JobDetails,
utils::StdIntoAnyhowResult,
Expand Down Expand Up @@ -50,7 +49,7 @@ impl JobClient {
.anyhow()
}

pub async fn get(&self, job_id: &ObjectId) -> Result<Option<Job>> {
pub async fn get_by_id(&self, job_id: &ObjectId) -> Result<Option<Job>> {
let resp = self
.client
.get(
Expand Down Expand Up @@ -84,6 +83,47 @@ impl JobClient {
.anyhow()
}

pub async fn get(&self, get_job_params: &GetJobParams) -> Result<Option<Job>> {
let mut uri = self.base_uri
.clone()
.join("job")?;

if let Some(id) = get_job_params.id.as_ref() {
uri.query_pairs_mut().append_pair("id", id.to_string().as_str());
}

if let Some(name) = get_job_params.name.as_ref() {
uri.query_pairs_mut().append_pair("name", name.as_str());
}

let resp = self
.client
.get(uri)
.send()
.await
.anyhow()?;

if resp.status() == StatusCode::NOT_FOUND {
return Ok(None);
}

if !resp.status().is_success() {
let code = resp.status();
let err_msg = resp
.bytes()
.await
.anyhow()
.and_then(|body| String::from_utf8(body.into()).anyhow())?;
return Err(anyhow!("get job {code} reason {err_msg}"));
}

resp.bytes()
.await
.anyhow()
.and_then(|body| serde_json::from_slice(&body).anyhow())
.anyhow()
}

pub async fn list(&self) -> Result<Vec<Job>> {
let resp = self
.client
Expand Down
35 changes: 23 additions & 12 deletions src/api/job_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,7 @@ use std::str::FromStr;

use crate::{
core::db::{
Job,
JobDbRepo,
JobUpdateInfo,
ListJobParams,
MainDbRepo,
GetJobParams, Job, JobDbRepo, JobUpdateInfo, ListJobParams, MainDbRepo
},
driver::Driver,
job::job_mgr::JobManager,
Expand All @@ -28,11 +24,22 @@ where
}
}

async fn get<MAINR>(db_repo: web::Data<MAINR>, path: web::Path<ObjectId>) -> HttpResponse
async fn get_by_id<MAINR>(db_repo: web::Data<MAINR>, path: web::Path<ObjectId>) -> HttpResponse
where
MAINR: MainDbRepo,
{
match db_repo.get(&path.into_inner()).await {
match db_repo.get(&GetJobParams::new().set_id(path.into_inner())).await {
Ok(Some(inserted_result)) => HttpResponse::Ok().json(&inserted_result),
Ok(None) => HttpResponse::NotFound().finish(),
Err(err) => HttpResponse::InternalServerError().body(err.to_string()),
}
}

async fn get<MAINR>(db_repo: web::Data<MAINR>, query: web::Query<GetJobParams>) -> HttpResponse
where
MAINR: MainDbRepo,
{
match db_repo.get(&query.into_inner()).await {
Ok(Some(inserted_result)) => HttpResponse::Ok().json(&inserted_result),
Ok(None) => HttpResponse::NotFound().finish(),
Err(err) => HttpResponse::InternalServerError().body(err.to_string()),
Expand Down Expand Up @@ -60,7 +67,7 @@ where
JOBR: JobDbRepo,
{
let id = ObjectId::from_str(&path.into_inner()).unwrap();
match job_manager.clean_job(&id).await {
match job_manager.clean_job(&GetJobParams::new().set_id(id)).await {
Ok(_) => HttpResponse::Ok().finish(),
Err(err) => HttpResponse::InternalServerError().body(err.to_string()),
}
Expand Down Expand Up @@ -93,7 +100,7 @@ where
JOBR: JobDbRepo,
{
let id = ObjectId::from_str(&path.into_inner()).unwrap();
match job_manager.get_job_details(&id).await {
match job_manager.get_job_details(&GetJobParams::new().set_id(id)).await {
Ok(detail) => HttpResponse::Ok().json(detail),
Err(err) => HttpResponse::InternalServerError().body(err.to_string()),
}
Expand All @@ -109,7 +116,7 @@ where
JOBR: JobDbRepo,
{
let id = ObjectId::from_str(&path.into_inner()).unwrap();
match job_manager.start_job(&id).await {
match job_manager.start_job(&GetJobParams::new().set_id(id)).await {
Ok(detail) => HttpResponse::Ok().json(detail),
Err(err) => HttpResponse::InternalServerError().body(err.to_string()),
}
Expand All @@ -121,10 +128,14 @@ where
MAINR: MainDbRepo,
JOBR: JobDbRepo,
{
cfg.service(web::resource("/job").route(web::post().to(create::<MAINR>)))
cfg.service(
web::resource("/job")
.route(web::post().to(create::<MAINR>))
.route(web::get().to(get::<MAINR>))
)
.service(
web::resource("/job/{id}")
.route(web::get().to(get::<MAINR>))
.route(web::get().to(get_by_id::<MAINR>))
.route(web::post().to(update::<MAINR>))
.route(web::delete().to(clean_job::<D, MAINR, JOBR>)),
)
Expand Down
49 changes: 33 additions & 16 deletions src/bin/jz-flow/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use clap::{
};
use jiaoziflow::{
api::client::JzFlowClient,
core::db::Job,
core::db::{GetJobParams, Job},
dag::Dag,
utils::sizefmt::SmartSize,
utils::{sizefmt::SmartSize, IntoAnyhowResult},
};
use mongodb::bson::oid::ObjectId;
use prettytable::{
Expand Down Expand Up @@ -117,17 +117,22 @@ pub(super) async fn list_job(global_opts: GlobalOptions, args: ListJobArgs) -> R

#[derive(Debug, Args)]
pub(super) struct JobDetailArgs {
#[arg(index = 1, help = "job name, must be unique")]
pub(super) id: String,
#[arg(index = 1, help = "job name or id")]
pub(super) name_or_id: String,

#[arg(long, default_value = "table", help = "format json/table")]
pub(super) format: String,
}

pub(super) async fn get_job_details(global_opts: GlobalOptions, args: JobDetailArgs) -> Result<()> {
let client = JzFlowClient::new(&global_opts.listen)?.job();
let id: ObjectId = ObjectId::from_str(&args.id)?;
let job_detail = client.get_job_detail(&id).await?;
let get_job_params = match ObjectId::from_str(&args.name_or_id){
Ok(id)=> GetJobParams::new().set_id(id),
Err(_)=> GetJobParams::new().set_name(args.name_or_id)
};

let job: Job = client.get(&get_job_params).await?.anyhow("job not exit")?;
let job_detail = client.get_job_detail(&job.id).await?;

if args.format == "json" {
println!("{}", serde_json::to_string_pretty(&job_detail)?);
Expand Down Expand Up @@ -194,30 +199,42 @@ pub(super) async fn get_job_details(global_opts: GlobalOptions, args: JobDetailA

#[derive(Debug, Args)]
pub(super) struct RunJobArgs {
#[arg(index = 1, help = "job name, must be unique")]
pub(super) id: String,
#[arg(index = 1, help = "job name or id")]
pub(super) name_or_id: String,
}

pub(super) async fn run_job(global_opts: GlobalOptions, args: RunJobArgs) -> Result<()> {
let client = JzFlowClient::new(&global_opts.listen)?.job();
let id: ObjectId = ObjectId::from_str(&args.id)?;
client.run_job(&id).await?;

println!("Run job successfully, job ID: {}", args.id);
let get_job_params = match ObjectId::from_str(&args.name_or_id){
Ok(id)=> GetJobParams::new().set_id(id),
Err(_)=> GetJobParams::new().set_name(args.name_or_id)
};
let job = client.get(&get_job_params).await?.anyhow("job not exit")?;

client.run_job(&job.id).await?;

println!("Run job successfully, job ID: {}", job.id);
Ok(())
}

#[derive(Debug, Args)]
pub(super) struct CleanJobArgs {
#[arg(index = 1, help = "job name, must be unique")]
pub(super) id: String,
#[arg(index = 1, help = "job name or id")]
pub(super) name_or_id: String,
}

pub(super) async fn clean_job(global_opts: GlobalOptions, args: CleanJobArgs) -> Result<()> {
let client = JzFlowClient::new(&global_opts.listen)?.job();
let id: ObjectId = ObjectId::from_str(&args.id)?;
client.clean_job(&id).await?;

println!("Clean job successfully, job ID: {}", args.id);
let get_job_params = match ObjectId::from_str(&args.name_or_id){
Ok(id)=> GetJobParams::new().set_id(id),
Err(_)=> GetJobParams::new().set_name(args.name_or_id)
};
let job = client.get(&get_job_params).await?.anyhow("job not exit")?;

client.clean_job(&job.id).await?;

println!("Clean job successfully, job ID: {}", job.id);
Ok(())
}
34 changes: 33 additions & 1 deletion src/core/main_db_models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,42 @@ pub struct JobUpdateInfo {
pub struct ListJobParams {
pub state: Option<JobState>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct GetJobParams {
pub name: Option<String>,
pub id: Option<ObjectId>,
}

impl Default for GetJobParams {
fn default() -> Self {
Self::new()
}
}

impl GetJobParams {
pub fn new() ->Self {
GetJobParams{
name:None,
id:None,
}
}

pub fn set_id(mut self, id: ObjectId) -> Self {
self.id = Some(id);
self
}

pub fn set_name(mut self, name: String) -> Self {
self.name = Some(name);
self
}
}

pub trait JobRepo {
fn insert(&self, job: &Job) -> impl std::future::Future<Output = Result<Job>> + Send;

fn get(&self, id: &ObjectId) -> impl std::future::Future<Output = Result<Option<Job>>> + Send;
fn get(&self, id: &GetJobParams) -> impl std::future::Future<Output = Result<Option<Job>>> + Send;

fn delete(&self, id: &ObjectId) -> impl std::future::Future<Output = Result<()>> + Send;

Expand Down
22 changes: 15 additions & 7 deletions src/dbrepo/main_db_mongo.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use crate::{
core::db::{
Job,
JobRepo,
JobState,
JobUpdateInfo,
ListJobParams,
GetJobParams, Job, JobRepo, JobState, JobUpdateInfo, ListJobParams
},
utils::{
IntoAnyhowResult,
Expand Down Expand Up @@ -155,8 +151,20 @@ impl JobRepo for MongoMainDbRepo {
self.job_col.find(query).await?.try_collect().await.anyhow()
}

async fn get(&self, id: &ObjectId) -> Result<Option<Job>> {
self.job_col.find_one(doc! {"_id": id}).await.anyhow()
async fn get(&self, get_params: &GetJobParams) -> Result<Option<Job>> {
let mut query = doc! {};
if let Some(id) = get_params.id.as_ref() {
query.insert("_id", id);
}
if let Some(name) = get_params.name.as_ref() {
query.insert("name", name);
}

if query.is_empty() {
return Ok(None);
}

self.job_col.find_one(query).await.anyhow()
}

async fn delete(&self, id: &ObjectId) -> Result<()> {
Expand Down
Loading

0 comments on commit cd1a7fd

Please sign in to comment.