Skip to content

Commit

Permalink
feat: add job cli
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Aug 7, 2024
1 parent 690b13d commit 88d2276
Show file tree
Hide file tree
Showing 13 changed files with 156 additions and 41 deletions.
1 change: 1 addition & 0 deletions crates/compute_unit_runner/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ pub trait IPCClient {

pub struct IPCClientImpl {
unix_socket_addr: String,
//TODO change reqwest https://github.com/seanmonstar/reqwest/pull/1623
client: Client<UnixConnector, Full<Bytes>>,
}

Expand Down
4 changes: 4 additions & 0 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ $(OUTPUT):
mkdir -p $(OUTPUT)

################### build crates
build-jz: $(OUTPUT)
cargo build --release --bin jz-flow
cp target/release/jz-flow $(OUTPUT)/jz-flow

build-cd: $(OUTPUT)
cargo build -p compute_unit_runner --release --bin compute_unit_runner
cp target/release/compute_unit_runner $(OUTPUT)/compute_unit_runner
Expand Down
52 changes: 52 additions & 0 deletions script/example_dag.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
{
"name": "example",
"version": "v1",
"dag": [
{
"name": "dummy-in",
"spec": {
"image": "jz-action/dummy_in:latest",
"command": "/dummy_in",
"args": [
"--log-level=debug"
]
}
},
{
"name": "copy-in-place",
"node_type": "ComputeUnit",
"dependency": [
"dummy-in"
],
"spec": {
"image": "jz-action/copy_in_place:latest",
"command": "/copy_in_place",
"replicas": 3,
"args": [
"--log-level=debug"
]
},
"channel": {
"cache_type": "Memory"
}
},
{
"name": "dummy-out",
"node_type": "ComputeUnit",
"dependency": [
"copy-in-place"
],
"spec": {
"image": "jz-action/dummy_out:latest",
"command": "/dummy_out",
"replicas": 3,
"args": [
"--log-level=debug"
]
},
"channel": {
"cache_type": "Memory"
}
}
]
}
25 changes: 2 additions & 23 deletions src/api/client/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,11 @@ use awc::{
Client,
};
use mongodb::bson::oid::ObjectId;
#[derive(Clone)]
pub struct JzFlowClient {
client: Client,
base_uri: String,
}

impl JzFlowClient {
pub fn new(base_uri: &str) -> Result<Self> {
let client = Client::builder()
.add_default_header(("Content-Type", "application/json"))
.finish();
let base_uri = base_uri.to_string() + "/api/v1";
Ok(JzFlowClient { client, base_uri })
}

fn job(&self) -> JobClient {
JobClient {
client: self.client.clone(),
base_uri: self.base_uri.clone(),
}
}
}

pub struct JobClient {
client: Client,
base_uri: String,
pub(crate) client: Client,
pub(crate) base_uri: String,
}

impl JobClient {
Expand Down
29 changes: 28 additions & 1 deletion src/api/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,28 @@
pub mod job;
mod job;

use awc::Client;
use anyhow::Result;
use job::JobClient;

#[derive(Clone)]
pub struct JzFlowClient {
client: Client,
base_uri: String,
}

impl JzFlowClient {
pub fn new(base_uri: &str) -> Result<Self> {
let client = Client::builder()
.add_default_header(("Content-Type", "application/json"))
.finish();
let base_uri = base_uri.to_string() + "/api/v1";
Ok(JzFlowClient { client, base_uri })
}

pub fn job(&self) -> JobClient {
JobClient {
client: self.client.clone(),
base_uri: self.base_uri.clone(),
}
}
}
9 changes: 7 additions & 2 deletions src/api/server.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@

use std::fmt::format;

use actix_web::{
dev::Server,
error,
Expand All @@ -12,14 +14,15 @@ use actix_web::{
HttpServer,
};
use anyhow::Result;
use awc::http::Uri;

use crate::{
core::db::{
JobDbRepo,
MainDbRepo,
},
driver::Driver,
job::job_mgr::JobManager,
job::job_mgr::JobManager, utils::IntoAnyhowResult,
};

use super::job_api::job_route_config;
Expand Down Expand Up @@ -53,6 +56,8 @@ where
JOBR: JobDbRepo,
{
let db_repo = db_repo;
let uri = Uri::try_from(addr)?;
let host_port= format!("{}:{}", uri.host().anyhow("host not found")?, uri.port().map(|v|v.as_u16()).unwrap_or_else(||80));
let server = HttpServer::new(move || {
fn json_error_handler(err: error::JsonPayloadError, _req: &HttpRequest) -> error::Error {
use actix_web::error::JsonPayloadError;
Expand All @@ -75,7 +80,7 @@ where
.app_data(web::JsonConfig::default().error_handler(json_error_handler))
})
.disable_signals()
.bind(addr)?
.bind(host_port)?
.run();

Ok(server)
Expand Down
4 changes: 2 additions & 2 deletions src/bin/jz-flow/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use clap::Args;

#[derive(Debug, Args)]
pub(super) struct GlobalOptions {
#[arg(short, long, default_value = "INFO")]
#[arg(long, default_value = "INFO", help="set log level(TRACE, DEBUG, INFO, WARN, ERROR)")]
pub(super) log_level: String,

#[arg(short, long, default_value = "localhost:45131")]
#[arg(long, default_value = "http://localhost:45131", help="set api address")]
pub(super) listen: String,
}
38 changes: 35 additions & 3 deletions src/bin/jz-flow/job.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,48 @@
use crate::global::GlobalOptions;
use anyhow::Result;
use clap::Args;
use chrono::Utc;
use clap::{Args, Parser, Subcommand};
use tokio::fs;
use jz_action::{api::client::JzFlowClient, core::db::Job, dag::Dag};

#[derive(Debug, Parser)]
pub(super) enum JobCommands {
/// Adds files to myapp
Create(JobCreateArgs),
}

pub(super) async fn run_job_subcommand(global_opts: GlobalOptions ,command: JobCommands) ->Result<()> {
match command {
JobCommands::Create(args) => create_job(global_opts, args).await,
}
}


#[derive(Debug, Args)]
pub(super) struct JobCreateArgs {
#[arg(long)]
#[arg(long, help="job name, must be unique")]
pub(super) name: String,

#[arg(long, help = "dag pipline definition")]
pub(super) path: String,
}

pub(super) async fn run_backend(global_opts: GlobalOptions, args: JobCreateArgs) -> Result<()> {
pub(super) async fn create_job(global_opts: GlobalOptions, args: JobCreateArgs) -> Result<()> {
let client = JzFlowClient::new(&global_opts.listen)?.job();
let dag_config = fs::read_to_string(&args.path).await?;
let _ = Dag::from_json(dag_config.as_str())?;
let tm = Utc::now().timestamp();
let job = Job{
name: args.name.clone(),
graph_json: dag_config,
created_at: tm,
updated_at: tm,
..Default::default()
};

println!("aaaaad");
let created_job = client.create(&job).await?;

println!("Create job successfully, job ID: {}", created_job.id.to_string());
Ok(())
}
5 changes: 5 additions & 0 deletions src/bin/jz-flow/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use clap::{
};

use global::GlobalOptions;
use job::{run_job_subcommand, JobCommands};
use jz_action::{
core::db::MainDbRepo,
utils::StdIntoAnyhowResult,
Expand All @@ -35,6 +36,9 @@ struct Cli {
enum Commands {
/// Adds files to myapp
Run(RunArgs),

#[command(subcommand)]
Job(JobCommands)
}

#[tokio::main(flavor = "multi_thread")]
Expand All @@ -48,5 +52,6 @@ async fn main() -> Result<()> {

match args.command {
Commands::Run(run_args) => run_backend(args.global_opts, run_args).await,
Commands::Job(job_commands) => run_job_subcommand(args.global_opts, job_commands).await,
}
}
6 changes: 3 additions & 3 deletions src/bin/jz-flow/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ use crate::global::GlobalOptions;

#[derive(Debug, Args)]
pub(super) struct RunArgs {
#[arg(short, long)]
#[arg(long, default_value="mongodb://192.168.3.163:27017", help="mongo connection string")]
mongo_url: String,
}

pub(super) async fn run_backend(global_opts: GlobalOptions, args: RunArgs) -> Result<()> {
let mut join_set: JoinSet<Result<()>> = JoinSet::new();
let token = CancellationToken::new();

let db_url = args.mongo_url.to_string() + "jz_action";
let db_url = args.mongo_url.to_string() + "/jz_action";
let db_repo = MongoMainDbRepo::new(db_url.as_str()).await?;
let client = Client::try_default().await.unwrap();

Expand All @@ -54,7 +54,7 @@ pub(super) async fn run_backend(global_opts: GlobalOptions, args: RunArgs) -> Re
&args.mongo_url,
)
.await?;
let server = start_rpc_server(&global_opts.listen, db_repo, job_manager).unwrap();
let server = start_rpc_server(&global_opts.listen, db_repo, job_manager)?;
let handler = server.handle();
{
//listen unix socket
Expand Down
7 changes: 6 additions & 1 deletion src/core/main_db_models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ pub enum JobState {
Finish,
}

#[derive(Serialize, Deserialize, Debug)]
impl Default for JobState {
fn default() -> Self {
JobState::Created
}
}
#[derive(Default, Serialize, Deserialize, Debug)]
pub struct Job {
pub id: ObjectId,
pub name: String,
Expand Down
6 changes: 3 additions & 3 deletions src/driver/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
Mutex,
},
};
pub trait UnitHandler {
pub trait UnitHandler:Send {
//pause graph running for now
fn pause(&mut self) -> impl Future<Output = Result<()>> + Send;

Expand All @@ -23,7 +23,7 @@ pub trait UnitHandler {
) -> impl Future<Output = Result<Option<Arc<Mutex<impl ChannelHandler>>>>> + Send;
}

pub trait ChannelHandler {
pub trait ChannelHandler:Send {
//pause graph running for now
fn pause(&mut self) -> impl Future<Output = Result<()>> + Send;

Expand All @@ -34,7 +34,7 @@ pub trait ChannelHandler {
fn stop(&mut self) -> impl Future<Output = Result<()>> + Send;
}

pub trait PipelineController {
pub trait PipelineController:Send {
fn get_node<'a>(
&'a self,
id: &'a String,
Expand Down
11 changes: 8 additions & 3 deletions src/job/job_mgr.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{
core::db::{
JobDbRepo,
MainDbRepo,
JobDbRepo, JobState, JobUpdateInfo, MainDbRepo
},
dag::Dag,
driver::Driver,
Expand Down Expand Up @@ -59,7 +58,13 @@ where
let dag = Dag::from_json(job.graph_json.as_str())?;
let namespace = format!("{}-{}", job.name, job.retry_number);
if let Err(err) = driver.deploy(namespace.as_str(), &dag).await {
error!("run job {} {err}", job.name);
error!("run job {} {err}, start cleaning", job.name);
if let Err(err) = driver.clean(namespace.as_str()).await {
error!("clean job resource {err}");
}
if let Err(err) = db.update(&job.id, &JobUpdateInfo{state: Some(JobState::Error)}).await{
error!("set job to error state {err}");
}
};
}
}
Expand Down

0 comments on commit 88d2276

Please sign in to comment.