Skip to content

Commit

Permalink
feat: add auto running
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Aug 13, 2024
1 parent cd1a7fd commit 8a5a30f
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 58 deletions.
5 changes: 1 addition & 4 deletions crates/compute_unit_runner/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@ where
}
}

async fn request_media_data(
&self,
_: Request<Empty>,
) -> Result<Response<DataBatch>, Status> {
async fn request_media_data(&self, _: Request<Empty>) -> Result<Response<DataBatch>, Status> {
todo!();
}
}
6 changes: 1 addition & 5 deletions crates/nodes_sdk/src/multi_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ impl MultiSender {
}

impl MultiSender {
pub async fn send(
&mut self,
val: DataBatch,
sent_nodes: &[&str],
) -> Result<(), Vec<String>> {
pub async fn send(&mut self, val: DataBatch, sent_nodes: &[&str]) -> Result<(), Vec<String>> {
let mut sent = vec![];
for (index, stream) in self.connects.iter_mut().enumerate() {
let url = &self.streams[index];
Expand Down
20 changes: 8 additions & 12 deletions src/api/client/job.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::{
core::db::{
GetJobParams, Job, JobUpdateInfo
GetJobParams,
Job,
JobUpdateInfo,
},
job::job_mgr::JobDetails,
utils::StdIntoAnyhowResult,
Expand Down Expand Up @@ -84,24 +86,18 @@ impl JobClient {
}

pub async fn get(&self, get_job_params: &GetJobParams) -> Result<Option<Job>> {
let mut uri = self.base_uri
.clone()
.join("job")?;

let mut uri = self.base_uri.clone().join("job")?;

if let Some(id) = get_job_params.id.as_ref() {
uri.query_pairs_mut().append_pair("id", id.to_string().as_str());
uri.query_pairs_mut()
.append_pair("id", id.to_string().as_str());
}

if let Some(name) = get_job_params.name.as_ref() {
uri.query_pairs_mut().append_pair("name", name.as_str());
}

let resp = self
.client
.get(uri)
.send()
.await
.anyhow()?;
let resp = self.client.get(uri).send().await.anyhow()?;

if resp.status() == StatusCode::NOT_FOUND {
return Ok(None);
Expand Down
45 changes: 27 additions & 18 deletions src/api/job_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ use std::str::FromStr;

use crate::{
core::db::{
GetJobParams, Job, JobDbRepo, JobUpdateInfo, ListJobParams, MainDbRepo
GetJobParams,
Job,
JobDbRepo,
JobUpdateInfo,
ListJobParams,
MainDbRepo,
},
driver::Driver,
job::job_mgr::JobManager,
Expand All @@ -28,7 +33,10 @@ async fn get_by_id<MAINR>(db_repo: web::Data<MAINR>, path: web::Path<ObjectId>)
where
MAINR: MainDbRepo,
{
match db_repo.get(&GetJobParams::new().set_id(path.into_inner())).await {
match db_repo
.get(&GetJobParams::new().set_id(path.into_inner()))
.await
{
Ok(Some(inserted_result)) => HttpResponse::Ok().json(&inserted_result),
Ok(None) => HttpResponse::NotFound().finish(),
Err(err) => HttpResponse::InternalServerError().body(err.to_string()),
Expand Down Expand Up @@ -100,7 +108,10 @@ where
JOBR: JobDbRepo,
{
let id = ObjectId::from_str(&path.into_inner()).unwrap();
match job_manager.get_job_details(&GetJobParams::new().set_id(id)).await {
match job_manager
.get_job_details(&GetJobParams::new().set_id(id))
.await
{
Ok(detail) => HttpResponse::Ok().json(detail),
Err(err) => HttpResponse::InternalServerError().body(err.to_string()),
}
Expand Down Expand Up @@ -129,19 +140,17 @@ where
JOBR: JobDbRepo,
{
cfg.service(
web::resource("/job")
.route(web::post().to(create::<MAINR>))
.route(web::get().to(get::<MAINR>))
)
.service(
web::resource("/job/{id}")
.route(web::get().to(get_by_id::<MAINR>))
.route(web::post().to(update::<MAINR>))
.route(web::delete().to(clean_job::<D, MAINR, JOBR>)),
)
.service(web::resource("/jobs").route(web::get().to(list::<MAINR>)))
.service(
web::resource("/job/detail/{id}").route(web::get().to(job_details::<D, MAINR, JOBR>)),
)
.service(web::resource("/job/run/{id}").route(web::post().to(run_job::<D, MAINR, JOBR>)));
web::resource("/job")
.route(web::post().to(create::<MAINR>))
.route(web::get().to(get::<MAINR>)),
)
.service(
web::resource("/job/{id}")
.route(web::get().to(get_by_id::<MAINR>))
.route(web::post().to(update::<MAINR>))
.route(web::delete().to(clean_job::<D, MAINR, JOBR>)),
)
.service(web::resource("/jobs").route(web::get().to(list::<MAINR>)))
.service(web::resource("/job/detail/{id}").route(web::get().to(job_details::<D, MAINR, JOBR>)))
.service(web::resource("/job/run/{id}").route(web::post().to(run_job::<D, MAINR, JOBR>)));
}
32 changes: 21 additions & 11 deletions src/bin/jz-flow/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,15 @@ use clap::{
};
use jiaoziflow::{
api::client::JzFlowClient,
core::db::{GetJobParams, Job},
core::db::{
GetJobParams,
Job,
},
dag::Dag,
utils::{sizefmt::SmartSize, IntoAnyhowResult},
utils::{
sizefmt::SmartSize,
IntoAnyhowResult,
},
};
use mongodb::bson::oid::ObjectId;
use prettytable::{
Expand Down Expand Up @@ -54,6 +60,9 @@ pub(super) struct JobCreateArgs {

#[arg(long, help = "dag pipline definition")]
pub(super) path: String,

#[arg(long, help = "only deploy not run immediately")]
pub(super) manual_run: bool,
}

pub(super) async fn create_job(global_opts: GlobalOptions, args: JobCreateArgs) -> Result<()> {
Expand All @@ -66,6 +75,7 @@ pub(super) async fn create_job(global_opts: GlobalOptions, args: JobCreateArgs)
graph_json: dag_config,
created_at: tm,
updated_at: tm,
manual_run: args.manual_run,
..Default::default()
};

Expand Down Expand Up @@ -126,9 +136,9 @@ pub(super) struct JobDetailArgs {

pub(super) async fn get_job_details(global_opts: GlobalOptions, args: JobDetailArgs) -> Result<()> {
let client = JzFlowClient::new(&global_opts.listen)?.job();
let get_job_params = match ObjectId::from_str(&args.name_or_id){
Ok(id)=> GetJobParams::new().set_id(id),
Err(_)=> GetJobParams::new().set_name(args.name_or_id)
let get_job_params = match ObjectId::from_str(&args.name_or_id) {
Ok(id) => GetJobParams::new().set_id(id),
Err(_) => GetJobParams::new().set_name(args.name_or_id),
};

let job: Job = client.get(&get_job_params).await?.anyhow("job not exit")?;
Expand Down Expand Up @@ -206,9 +216,9 @@ pub(super) struct RunJobArgs {
pub(super) async fn run_job(global_opts: GlobalOptions, args: RunJobArgs) -> Result<()> {
let client = JzFlowClient::new(&global_opts.listen)?.job();

let get_job_params = match ObjectId::from_str(&args.name_or_id){
Ok(id)=> GetJobParams::new().set_id(id),
Err(_)=> GetJobParams::new().set_name(args.name_or_id)
let get_job_params = match ObjectId::from_str(&args.name_or_id) {
Ok(id) => GetJobParams::new().set_id(id),
Err(_) => GetJobParams::new().set_name(args.name_or_id),
};
let job = client.get(&get_job_params).await?.anyhow("job not exit")?;

Expand All @@ -227,9 +237,9 @@ pub(super) struct CleanJobArgs {
pub(super) async fn clean_job(global_opts: GlobalOptions, args: CleanJobArgs) -> Result<()> {
let client = JzFlowClient::new(&global_opts.listen)?.job();

let get_job_params = match ObjectId::from_str(&args.name_or_id){
Ok(id)=> GetJobParams::new().set_id(id),
Err(_)=> GetJobParams::new().set_name(args.name_or_id)
let get_job_params = match ObjectId::from_str(&args.name_or_id) {
Ok(id) => GetJobParams::new().set_id(id),
Err(_) => GetJobParams::new().set_name(args.name_or_id),
};
let job = client.get(&get_job_params).await?.anyhow("job not exit")?;

Expand Down
14 changes: 9 additions & 5 deletions src/core/main_db_models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct Job {
pub name: String,
pub graph_json: String,
pub state: JobState,
pub manual_run: bool,
pub created_at: i64,
pub updated_at: i64,
}
Expand Down Expand Up @@ -51,10 +52,10 @@ impl Default for GetJobParams {
}

impl GetJobParams {
pub fn new() ->Self {
GetJobParams{
name:None,
id:None,
pub fn new() -> Self {
GetJobParams {
name: None,
id: None,
}
}

Expand All @@ -72,7 +73,10 @@ impl GetJobParams {
pub trait JobRepo {
fn insert(&self, job: &Job) -> impl std::future::Future<Output = Result<Job>> + Send;

fn get(&self, id: &GetJobParams) -> impl std::future::Future<Output = Result<Option<Job>>> + Send;
fn get(
&self,
id: &GetJobParams,
) -> impl std::future::Future<Output = Result<Option<Job>>> + Send;

fn delete(&self, id: &ObjectId) -> impl std::future::Future<Output = Result<()>> + Send;

Expand Down
7 changes: 6 additions & 1 deletion src/dbrepo/main_db_mongo.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
use crate::{
core::db::{
GetJobParams, Job, JobRepo, JobState, JobUpdateInfo, ListJobParams
GetJobParams,
Job,
JobRepo,
JobState,
JobUpdateInfo,
ListJobParams,
},
utils::{
IntoAnyhowResult,
Expand Down
25 changes: 23 additions & 2 deletions src/job/job_mgr.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
use crate::{
core::db::{
GetJobParams, Job, JobDbRepo, JobState, JobUpdateInfo, ListJobParams, MainDbRepo
GetJobParams,
Job,
JobDbRepo,
JobState,
JobUpdateInfo,
ListJobParams,
MainDbRepo,
},
dag::Dag,
dbrepo::MongoRunDbRepo,
Expand Down Expand Up @@ -107,7 +113,7 @@ where
while let Some(job) = db.get_job_for_running().await? {
let dag = Dag::from_json(job.graph_json.as_str())?;
match driver.deploy(job.name.as_str(), &dag).await {
Ok(_) => {
Ok(controller) => {
if let Err(err) = db
.update(
&job.id,
Expand All @@ -119,6 +125,21 @@ where
{
error!("set job to deploy state {err}");
}
if !job.manual_run {
if controller.start().await.is_ok() {
if let Err(err) = db
.update(
&job.id,
&JobUpdateInfo {
state: Some(JobState::Running),
},
)
.await
{
error!("start job {err}");
}
}
}
}
Err(err) => {
error!("run job {} {err}, start cleaning", job.name);
Expand Down

0 comments on commit 8a5a30f

Please sign in to comment.