From 61e0f8e3acc5b5f510b1d3848b7ed12df1ffc05b Mon Sep 17 00:00:00 2001 From: hunjixin <1084400399@qq.com> Date: Tue, 6 Aug 2024 15:01:47 +0000 Subject: [PATCH] feat: add main database, job model, basic api implementation --- Cargo.toml | 2 + .../src/bin/compute_unit_runner.rs | 11 +- crates/dp_runner/src/main.rs | 11 +- nodes/jz_reader/src/main.rs | 4 +- src/api/client/job.rs | 134 ++++++++++++++++++ src/api/client/mod.rs | 1 + src/api/job_api.rs | 70 ++++++--- src/api/mod.rs | 1 + src/api/server.rs | 27 ++-- src/bin/jz-flow/global.rs | 10 ++ src/bin/jz-flow/job.rs | 16 +++ src/bin/jz-flow/main.rs | 83 +++++++++++ src/bin/{main.rs => jz-flow/run.rs} | 54 +++---- src/core/db_config.rs | 3 - src/core/main_db_models.rs | 12 +- src/core/mod.rs | 2 - src/dbrepo/job_db_mongo.rs | 49 ++++--- src/dbrepo/main_db_mongo.rs | 83 ++++++++--- src/dbrepo/mod.rs | 2 - src/dbrepo/mongo_config.rs | 20 --- src/driver/driver.rs | 2 +- src/driver/kube.rs | 53 +++---- src/driver/kubetpl/channel_statefulset.tpl | 3 +- src/driver/kubetpl/statefulset.tpl | 3 +- src/job/job_mgr.rs | 69 +++++++-- src/utils/utils.rs | 4 +- 26 files changed, 526 insertions(+), 203 deletions(-) create mode 100644 src/api/client/job.rs create mode 100644 src/api/client/mod.rs create mode 100644 src/bin/jz-flow/global.rs create mode 100644 src/bin/jz-flow/job.rs create mode 100644 src/bin/jz-flow/main.rs rename src/bin/{main.rs => jz-flow/run.rs} (63%) delete mode 100644 src/core/db_config.rs delete mode 100644 src/dbrepo/mongo_config.rs diff --git a/Cargo.toml b/Cargo.toml index 34e166a..e452ae2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ async-trait = "0.1.81" futures = "0.3.30" clap = {version="4.5.7", features=["derive"]} actix-web = "4.8.0" +awc="3.5.0" [package] name = "jz_action" @@ -56,6 +57,7 @@ tokio-stream = {workspace = true} tokio-util= {workspace = true} actix-web = {workspace = true} +awc= {workspace = true} clap = {workspace = true} uuid = {workspace = true} anyhow = {workspace = true} diff --git a/crates/compute_unit_runner/src/bin/compute_unit_runner.rs b/crates/compute_unit_runner/src/bin/compute_unit_runner.rs index ec958ea..e7b15b0 100644 --- a/crates/compute_unit_runner/src/bin/compute_unit_runner.rs +++ b/crates/compute_unit_runner/src/bin/compute_unit_runner.rs @@ -8,10 +8,7 @@ use compute_unit_runner::{ state_controller::StateController, }; use jz_action::{ - dbrepo::{ - MongoConfig, - MongoRunDbRepo, - }, + dbrepo::MongoRunDbRepo, utils::StdIntoAnyhowResult, }; @@ -62,9 +59,6 @@ struct Args { #[arg(short, long)] mongo_url: String, - #[arg(short, long)] - database: String, - #[arg(short, long, default_value = "/unix_socket/compute_unit_runner_d")] unix_socket_addr: String, } @@ -84,8 +78,7 @@ async fn main() -> Result<()> { args.tmp_path.expect("compute node only support disk cache"), )); - let db_repo = - MongoRunDbRepo::new(MongoConfig::new(args.mongo_url.clone()), &args.database).await?; + let db_repo = MongoRunDbRepo::new(&args.mongo_url).await?; let program = MediaDataTracker::new(db_repo.clone(), &args.node_name, fs_cache, args.buf_size); diff --git a/crates/dp_runner/src/main.rs b/crates/dp_runner/src/main.rs index 2d266bb..854830a 100644 --- a/crates/dp_runner/src/main.rs +++ b/crates/dp_runner/src/main.rs @@ -3,10 +3,7 @@ mod state_controller; mod stream; use jz_action::{ - dbrepo::{ - MongoConfig, - MongoRunDbRepo, - }, + dbrepo::MongoRunDbRepo, network::datatransfer::data_stream_server::DataStreamServer, utils::StdIntoAnyhowResult, }; @@ -58,9 +55,6 @@ struct Args { #[arg(short, long)] mongo_url: String, - #[arg(short, long)] - database: String, - #[arg(short, long, default_value = "30")] buf_size: usize, @@ -79,8 +73,7 @@ async fn main() -> Result<()> { let mut join_set = JoinSet::new(); let token = CancellationToken::new(); - let db_repo = - MongoRunDbRepo::new(MongoConfig::new(args.mongo_url.clone()), &args.database).await?; + let db_repo = MongoRunDbRepo::new(&args.mongo_url).await?; let fs_cache: Arc = match args.tmp_path { Some(path) => Arc::new(FSCache::new(path)), diff --git a/nodes/jz_reader/src/main.rs b/nodes/jz_reader/src/main.rs index cbc94b0..4c61502 100644 --- a/nodes/jz_reader/src/main.rs +++ b/nodes/jz_reader/src/main.rs @@ -91,8 +91,8 @@ async fn main() -> Result<()> { { let shutdown_tx = shutdown_tx.clone(); let _ = tokio::spawn(async move { - if let Err(e) = read_jz_fs(args).await { - let _ = shutdown_tx.send(Err(anyhow!("read jz fs {e}"))).await; + if let Err(err) = read_jz_fs(args).await { + let _ = shutdown_tx.send(Err(anyhow!("read jz fs {err}"))).await; } }); } diff --git a/src/api/client/job.rs b/src/api/client/job.rs new file mode 100644 index 0000000..c07ec6c --- /dev/null +++ b/src/api/client/job.rs @@ -0,0 +1,134 @@ +use crate::{ + core::db::{Job, JobUpdateInfo}, + utils::StdIntoAnyhowResult, +}; +use anyhow::{ + anyhow, + Result, +}; +use awc::{ + http::StatusCode, + Client, +}; +use mongodb::bson::oid::ObjectId; +#[derive(Clone)] +pub struct JzFlowClient { + client: Client, + base_uri: String, +} + +impl JzFlowClient { + pub fn new(base_uri: &str) -> Result { + let client = Client::builder() + .add_default_header(("Content-Type", "application/json")) + .finish(); + let base_uri = base_uri.to_string() + "/api/v1"; + Ok(JzFlowClient { client, base_uri }) + } + + fn job(&self) -> JobClient { + JobClient { + client: self.client.clone(), + base_uri: self.base_uri.clone(), + } + } +} + +pub struct JobClient { + client: Client, + base_uri: String, +} + +impl JobClient { + pub async fn create(&self, job: &Job) -> Result { + let mut resp = self + .client + .post(self.base_uri.clone() + "/job") + .send_json(&job) + .await + .anyhow()?; + + if !resp.status().is_success() { + let err_msg = resp + .body() + .await + .anyhow() + .and_then(|body| String::from_utf8(body.into()).anyhow())?; + return Err(anyhow!("request job {} reason {err_msg}", resp.status())); + } + + resp.body() + .await + .anyhow() + .and_then(|body| serde_json::from_slice(&body).anyhow()) + .anyhow() + } + + pub async fn get(&self, job_id: &ObjectId) -> Result> { + let mut resp = self + .client + .get(self.base_uri.clone() + "/job/" + job_id.to_hex().as_str()) + .send() + .await + .anyhow()?; + + if resp.status() == StatusCode::NOT_FOUND { + return Ok(None); + } + + if !resp.status().is_success() { + let err_msg = resp + .body() + .await + .anyhow() + .and_then(|body| String::from_utf8(body.into()).anyhow())?; + return Err(anyhow!("request job {} reason {err_msg}", resp.status())); + } + + resp.body() + .await + .anyhow() + .and_then(|body| serde_json::from_slice(&body).anyhow()) + .anyhow() + } + + pub async fn delete(&self, job_id: &ObjectId) -> Result<()> { + let mut resp = self + .client + .delete(self.base_uri.clone() + "/job/" + job_id.to_hex().as_str()) + .send() + .await + .anyhow()?; + + if !resp.status().is_success() { + let err_msg = resp + .body() + .await + .anyhow() + .and_then(|body| String::from_utf8(body.into()).anyhow())?; + return Err(anyhow!("request job {} reason {err_msg}", resp.status())); + } + + Ok(()) + } + + pub async fn update(&self, job_id: &ObjectId, update_info: &JobUpdateInfo) -> Result<()> { + let mut resp = self + .client + .post(self.base_uri.clone() + "/job/" + job_id.to_hex().as_str()) + .send_json(update_info) + .await + .anyhow()?; + + if !resp.status().is_success() { + let err_msg = resp + .body() + .await + .anyhow() + .and_then(|body| String::from_utf8(body.into()).anyhow())?; + return Err(anyhow!("request job {} reason {err_msg}", resp.status())); + } + + Ok(()) + } +} diff --git a/src/api/client/mod.rs b/src/api/client/mod.rs new file mode 100644 index 0000000..80daa3e --- /dev/null +++ b/src/api/client/mod.rs @@ -0,0 +1 @@ +pub mod job; diff --git a/src/api/job_api.rs b/src/api/job_api.rs index 3d0e7bb..711b397 100644 --- a/src/api/job_api.rs +++ b/src/api/job_api.rs @@ -1,43 +1,58 @@ -use crate::core::db::{ - DBConfig, Job, JobDbRepo, JobRepo, MainDbRepo +use crate::{ + core::db::{ + Job, + JobDbRepo, + JobRepo, + JobState, + JobUpdateInfo, + MainDbRepo, + }, + driver::Driver, }; use actix_web::{ web, HttpResponse, }; use mongodb::bson::oid::ObjectId; -use serde::Serialize; +use serde::{ + Deserialize, + Serialize, +}; //TODO change to use route macro after https://github.com/actix/actix-web/issues/2866 resolved -async fn create<'reg, MAINR, JOBR, DBC>(db_repo: web::Data, data: web::Json) -> HttpResponse +async fn create(db_repo: web::Data, data: web::Json) -> HttpResponse where + D: Driver, MAINR: MainDbRepo, JOBR: JobDbRepo, - DBC: Clone + Serialize + Send + Sync + DBConfig + 'static, { match db_repo.insert(&data.0).await { - Ok(_) => HttpResponse::Ok().finish(), + Ok(inserted_result) => HttpResponse::Ok().json(&inserted_result), Err(err) => HttpResponse::InternalServerError().body(err.to_string()), } } -async fn get<'reg, MAINR, JOBR, DBC>(db_repo: web::Data, path: web::Path) -> HttpResponse +async fn get(db_repo: web::Data, path: web::Path) -> HttpResponse where + D: Driver, MAINR: MainDbRepo, JOBR: JobDbRepo, - DBC: Clone + Serialize + Send + Sync + DBConfig + 'static, { match db_repo.get(&path.into_inner()).await { - Ok(_) => HttpResponse::Ok().finish(), + Ok(Some(inserted_result)) => HttpResponse::Ok().json(&inserted_result), + Ok(None) => HttpResponse::NotFound().finish(), Err(err) => HttpResponse::InternalServerError().body(err.to_string()), } } -async fn delete<'reg, MAINR, JOBR, DBC>(db_repo: web::Data, path: web::Path) -> HttpResponse +async fn delete( + db_repo: web::Data, + path: web::Path, +) -> HttpResponse where + D: Driver, MAINR: MainDbRepo, JOBR: JobDbRepo, - DBC: Clone + Serialize + Send + Sync + DBConfig + 'static, { match db_repo.delete(&path.into_inner()).await { Ok(_) => HttpResponse::Ok().finish(), @@ -45,16 +60,39 @@ where } } -pub(super) fn job_route_config<'reg, MAINR, JOBR, DBC>(cfg: &mut web::ServiceConfig) +async fn update( + db_repo: web::Data, + path: web::Path, + query: web::Query, +) -> HttpResponse +where + D: Driver, + MAINR: MainDbRepo, + JOBR: JobDbRepo, +{ + match db_repo + .update(&path.into_inner(), &query.into_inner()) + .await + { + Ok(_) => HttpResponse::Ok().finish(), + Err(err) => HttpResponse::InternalServerError().body(err.to_string()), + } +} + +pub(super) fn job_route_config(cfg: &mut web::ServiceConfig) where + D: Driver, MAINR: MainDbRepo, JOBR: JobDbRepo, - DBC: Clone + Serialize + Send + Sync + DBConfig + 'static, { cfg.service( web::resource("/job") - .route(web::post().to(create::)) - .route(web::delete().to(delete::)), + .route(web::post().to(create::)) + .route(web::delete().to(delete::)), ) - .service(web::resource("/job/{id}").route(web::get().to(get::))); + .service( + web::resource("/job/{id}") + .route(web::get().to(get::)) + .route(web::post().to(update::)), + ); } diff --git a/src/api/mod.rs b/src/api/mod.rs index b2974e6..641547a 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -1,3 +1,4 @@ +pub mod client; pub mod server; mod job_api; diff --git a/src/api/server.rs b/src/api/server.rs index 33f2182..5d77f24 100644 --- a/src/api/server.rs +++ b/src/api/server.rs @@ -17,40 +17,43 @@ use serde::Serialize; use crate::{ core::db::{ - DBConfig, DataRepo, JobDbRepo, MainDbRepo + DataRepo, + JobDbRepo, + MainDbRepo, }, + driver::Driver, job::job_mgr::JobManager, }; use super::job_api::job_route_config; -fn v1_route<'reg, MAINR, JOBR, DBC>(cfg: &mut web::ServiceConfig) +fn v1_route(cfg: &mut web::ServiceConfig) where -MAINR: MainDbRepo, + D: Driver, + MAINR: MainDbRepo, JOBR: JobDbRepo, - DBC: Clone + Serialize + Send + Sync + DBConfig + 'static, { - cfg.service(web::scope("/job").configure(job_route_config::)); + cfg.service(web::scope("/job").configure(job_route_config::)); } -fn config<'reg, MAINR, JOBR, DBC>(cfg: &mut web::ServiceConfig) +fn config(cfg: &mut web::ServiceConfig) where + D: Driver, MAINR: MainDbRepo, JOBR: JobDbRepo, - DBC: Clone + Serialize + Send + Sync + DBConfig + 'static, { - cfg.service(web::scope("/api/v1").configure(v1_route::)); + cfg.service(web::scope("/api/v1").configure(v1_route::)); } -pub fn start_rpc_server<'reg, MAINR, JOBR, DBC>( +pub fn start_rpc_server( addr: &str, db_repo: MAINR, - job_manager: JobManager<'reg, JOBR, DBC>, + job_manager: JobManager, ) -> Result where + D: Driver, MAINR: MainDbRepo, JOBR: JobDbRepo, - DBC: Clone + Serialize + Send + Sync + DBConfig + 'static, { let db_repo = db_repo; let server = HttpServer::new(move || { @@ -71,7 +74,7 @@ where App::new() .wrap(middleware::Logger::default()) .app_data(db_repo.clone()) - .configure(config::) + .configure(config::) .app_data(web::JsonConfig::default().error_handler(json_error_handler)) }) .disable_signals() diff --git a/src/bin/jz-flow/global.rs b/src/bin/jz-flow/global.rs new file mode 100644 index 0000000..bc83c0d --- /dev/null +++ b/src/bin/jz-flow/global.rs @@ -0,0 +1,10 @@ +use clap::Args; + +#[derive(Debug, Args)] +pub(super) struct GlobalOptions { + #[arg(short, long, default_value = "INFO")] + pub(super) log_level: String, + + #[arg(short, long, default_value = "localhost:45131")] + pub(super) listen: String, +} diff --git a/src/bin/jz-flow/job.rs b/src/bin/jz-flow/job.rs new file mode 100644 index 0000000..92fd1a7 --- /dev/null +++ b/src/bin/jz-flow/job.rs @@ -0,0 +1,16 @@ +use crate::global::GlobalOptions; +use anyhow::Result; +use clap::Args; + +#[derive(Debug, Args)] +pub(super) struct JobCreateArgs { + #[arg(long)] + pub(super) name: String, + + #[arg(long, help = "dag pipline definition")] + pub(super) path: String, +} + +pub(super) async fn run_backend(global_opts: GlobalOptions, args: JobCreateArgs) -> Result<()> { + Ok(()) +} diff --git a/src/bin/jz-flow/main.rs b/src/bin/jz-flow/main.rs new file mode 100644 index 0000000..674d0df --- /dev/null +++ b/src/bin/jz-flow/main.rs @@ -0,0 +1,83 @@ +mod global; +mod job; +mod run; + +use anyhow::Result; +use clap::{ + Args, + Parser, + Subcommand, +}; + +use global::GlobalOptions; +use jz_action::{ + api::{ + self, + server::start_rpc_server, + }, + core::db::MainDbRepo, + dbrepo::{ + MongoMainDbRepo, + MongoRunDbRepo, + }, + driver::kube::KubeDriver, + utils::StdIntoAnyhowResult, +}; +use kube::Client; +use run::{ + run_backend, + RunArgs, +}; +use std::{ + path::Path, + str::FromStr, +}; +use tokio::{ + fs, + io::AsyncWriteExt, + select, + signal::unix::{ + signal, + SignalKind, + }, + task::JoinSet, + time::Instant, +}; +use tokio_util::sync::CancellationToken; +use tracing::{ + error, + info, + Level, +}; + +use jz_action::job::job_mgr::JobManager; + +#[derive(Debug, Parser)] +#[command(name = "jz-action-backend", author = "Author Name ", version, about= "jz-action backend", long_about = None, disable_version_flag = true)] +struct Cli { + #[clap(flatten)] + global_opts: GlobalOptions, + + #[command(subcommand)] + command: Commands, +} + +#[derive(Debug, Subcommand)] +enum Commands { + /// Adds files to myapp + Run(RunArgs), +} + +#[tokio::main(flavor = "multi_thread")] +async fn main() -> Result<()> { + let args = Cli::parse(); + + tracing_subscriber::fmt() + .with_max_level(Level::from_str(&args.global_opts.log_level)?) + .try_init() + .anyhow()?; + + match args.command { + Commands::Run(run_args) => run_backend(args.global_opts, run_args).await, + } +} diff --git a/src/bin/main.rs b/src/bin/jz-flow/run.rs similarity index 63% rename from src/bin/main.rs rename to src/bin/jz-flow/run.rs index 9a052d8..06bcb20 100644 --- a/src/bin/main.rs +++ b/src/bin/jz-flow/run.rs @@ -1,13 +1,17 @@ use anyhow::Result; -use clap::Parser; +use clap::{ + Args, + Parser, + Subcommand, +}; use jz_action::{ api::{ self, server::start_rpc_server, }, + core::db::MainDbRepo, dbrepo::{ - MongoConfig, MongoMainDbRepo, MongoRunDbRepo, }, @@ -39,51 +43,39 @@ use tracing::{ use jz_action::job::job_mgr::JobManager; -#[derive(Debug, Parser)] -#[command( - name = "jz-action-backend", - version = "0.0.1", - author = "Author Name ", - about = "jz-action backend" -)] - -struct Args { - #[arg(short, long, default_value = "INFO")] - log_level: String, - - #[arg(short, long, default_value = "localhost:45131")] - listen: String, +use crate::global::GlobalOptions; +#[derive(Debug, Args)] +pub(super) struct RunArgs { #[arg(short, long)] mongo_url: String, - - #[arg(short, long, default_value = "/app/tmp")] - tmp_path: String, } -#[tokio::main(flavor = "multi_thread")] -async fn main() -> Result<()> { - let args = Args::parse(); - tracing_subscriber::fmt() - .with_max_level(Level::from_str(&args.log_level)?) - .try_init() - .anyhow()?; +pub(super) async fn run_backend(global_opts: GlobalOptions, args: RunArgs) -> Result<()> { let mut join_set: JoinSet> = JoinSet::new(); let token = CancellationToken::new(); - let mongo_cfg = MongoConfig::new(args.mongo_url.clone()); + let db_url = args.mongo_url.to_string() + "jz_action"; + let db_repo = MongoMainDbRepo::new(db_url.as_str()).await?; let client = Client::try_default().await.unwrap(); + + let driver = KubeDriver::new(client.clone(), db_url.as_str()).await?; let job_manager = - JobManager::::new(client, mongo_cfg.clone()).await?; - let db_repo = MongoMainDbRepo::new(mongo_cfg, "backend").await?; - let server = start_rpc_server(&args.listen, db_repo, job_manager).unwrap(); + JobManager::, MongoMainDbRepo, MongoRunDbRepo>::new( + client, + driver, + db_repo.clone(), + &args.mongo_url, + ) + .await?; + let server = start_rpc_server(&global_opts.listen, db_repo, job_manager).unwrap(); let handler = server.handle(); { //listen unix socket let token = token.clone(); let handler = handler.clone(); join_set.spawn(async move { - info!("start ipc server {}", &args.listen); + info!("start ipc server {}", &global_opts.listen); tokio::spawn(server); select! { _ = token.cancelled() => { diff --git a/src/core/db_config.rs b/src/core/db_config.rs deleted file mode 100644 index 81e66d0..0000000 --- a/src/core/db_config.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub trait DBConfig { - fn connection_string(&self) -> &str; -} diff --git a/src/core/main_db_models.rs b/src/core/main_db_models.rs index f01b282..97d588e 100644 --- a/src/core/main_db_models.rs +++ b/src/core/main_db_models.rs @@ -16,6 +16,7 @@ pub enum JobState { #[derive(Serialize, Deserialize, Debug)] pub struct Job { pub id: ObjectId, + pub name: String, pub graph_json: String, pub state: JobState, pub retry_number: u32, @@ -23,8 +24,13 @@ pub struct Job { pub updated_at: i64, } +#[derive(Serialize, Deserialize, Debug)] +pub struct JobUpdateInfo { + pub state: Option, +} + pub trait JobRepo { - fn insert(&self, job: &Job) -> impl std::future::Future> + Send; + fn insert(&self, job: &Job) -> impl std::future::Future> + Send; fn get(&self, id: &ObjectId) -> impl std::future::Future>> + Send; @@ -32,10 +38,10 @@ pub trait JobRepo { fn get_job_for_running(&self) -> impl std::future::Future>> + Send; - fn update_job( + fn update( &self, id: &ObjectId, - state: &JobState, + info: &JobUpdateInfo, ) -> impl std::future::Future> + Send; fn list_jobs(&self) -> impl std::future::Future>> + Send; diff --git a/src/core/mod.rs b/src/core/mod.rs index 9c54402..ea3b129 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -1,7 +1,6 @@ mod cnode; mod spec; -mod db_config; mod job_db_models; mod main_db_models; @@ -10,7 +9,6 @@ pub use spec::*; pub mod db { pub use super::{ - db_config::*, job_db_models::*, main_db_models::*, }; diff --git a/src/dbrepo/job_db_mongo.rs b/src/dbrepo/job_db_mongo.rs index 7bf702f..d82157c 100644 --- a/src/dbrepo/job_db_mongo.rs +++ b/src/dbrepo/job_db_mongo.rs @@ -1,6 +1,5 @@ use crate::{ core::db::{ - DBConfig, DataRecord, DataRepo, DataState, @@ -24,7 +23,10 @@ use futures::TryStreamExt; use mongodb::{ bson::doc, error::ErrorKind, - options::IndexOptions, + options::{ + ClientOptions, + IndexOptions, + }, Client, Collection, IndexModel, @@ -43,12 +45,15 @@ pub struct MongoRunDbRepo { } impl MongoRunDbRepo { - pub async fn new(config: DBC, db_name: &str) -> Result - where - DBC: DBConfig, - { - let client = Client::with_uri_str(config.connection_string()).await?; - let database = client.database(db_name); + pub async fn new(connectstring: &str) -> Result { + let options = ClientOptions::parse(connectstring).await?; + let database = options + .default_database + .as_ref() + .expect("set db name in url") + .clone(); + let client = Client::with_options(options)?; + let database = client.database(database.as_str()); let graph_col: Collection = database.collection(&GRAPH_COL_NAME); let node_col: Collection = database.collection(&NODE_COL_NAME); let data_col: Collection = database.collection(&DATA_COL_NAME); @@ -65,11 +70,11 @@ impl MongoRunDbRepo { .options(idx_opts) .build(); - if let Err(e) = node_col.create_index(index).await { - match *e.kind { + if let Err(err) = node_col.create_index(index).await { + match *err.kind { ErrorKind::Command(ref command_error) if command_error.code == 85 => {} - e => { - return Err(anyhow!("create index error {}", e)); + err => { + return Err(anyhow!("create index error {err}")); } } } @@ -86,11 +91,11 @@ impl MongoRunDbRepo { .options(idx_opts) .build(); - if let Err(e) = data_col.create_index(index).await { - match *e.kind { + if let Err(err) = data_col.create_index(index).await { + match *err.kind { ErrorKind::Command(ref command_error) if command_error.code == 85 => {} - e => { - return Err(anyhow!("create index error {}", e)); + err => { + return Err(anyhow!("create index error {err}")); } } } @@ -107,11 +112,11 @@ impl MongoRunDbRepo { .options(idx_opts) .build(); - if let Err(e) = data_col.create_index(index).await { - match *e.kind { + if let Err(err) = data_col.create_index(index).await { + match *err.kind { ErrorKind::Command(ref command_error) if command_error.code == 85 => {} - e => { - return Err(anyhow!("create index error {}", e)); + err => { + return Err(anyhow!("create index error {err}")); } } } @@ -134,7 +139,7 @@ impl GraphRepo for MongoRunDbRepo { match self.graph_col.find_one(doc! {}).await { Ok(None) => Err(anyhow!("global state not exit")), Ok(Some(val)) => Ok(val), - Err(e) => Err(e.into()), + Err(err) => Err(err.into()), } } } @@ -148,7 +153,7 @@ impl NodeRepo for MongoRunDbRepo { match self.node_col.find_one(doc! {"node_name":name}).await { Ok(None) => Err(anyhow!("node not exit")), Ok(Some(val)) => Ok(val), - Err(e) => Err(e.into()), + Err(err) => Err(err.into()), } } } diff --git a/src/dbrepo/main_db_mongo.rs b/src/dbrepo/main_db_mongo.rs index 9e73439..372e4e3 100644 --- a/src/dbrepo/main_db_mongo.rs +++ b/src/dbrepo/main_db_mongo.rs @@ -1,11 +1,14 @@ use crate::{ core::db::{ - DBConfig, Job, JobRepo, JobState, + JobUpdateInfo, + }, + utils::{ + IntoAnyhowResult, + StdIntoAnyhowResult, }, - utils::StdIntoAnyhowResult, }; use anyhow::{ @@ -25,7 +28,10 @@ use mongodb::{ oid::ObjectId, }, error::ErrorKind, - options::IndexOptions, + options::{ + ClientOptions, + IndexOptions, + }, Client, Collection, IndexModel, @@ -41,14 +47,39 @@ pub struct MongoMainDbRepo { } impl MongoMainDbRepo { - pub async fn new(config: DBC, db_name: &str) -> Result - where - DBC: DBConfig, - { - let client = Client::with_uri_str(config.connection_string()).await?; - let database = client.database(db_name); + pub async fn new(connectstring: &str) -> Result { + let options = ClientOptions::parse(connectstring).await?; + let database = options + .default_database + .as_ref() + .expect("set db name in url") + .clone(); + let client = Client::with_options(options)?; + let database = client.database(database.as_str()); let job_col: Collection = database.collection(&JOB_COL_NAME); + { + //create index for jobs + let idx_opts: IndexOptions = IndexOptions::builder() + .unique(true) + .name("idx_name".to_owned()) + .build(); + + let index = IndexModel::builder() + .keys(doc! { "name": 1 }) + .options(idx_opts) + .build(); + + if let Err(err) = job_col.create_index(index).await { + match *err.kind { + ErrorKind::Command(ref command_error) if command_error.code == 85 => {} + err => { + return Err(anyhow!("create job state index error {err}")); + } + } + } + } + { //create index for jobs let idx_opts: IndexOptions = @@ -59,11 +90,11 @@ impl MongoMainDbRepo { .options(idx_opts) .build(); - if let Err(e) = job_col.create_index(index).await { - match *e.kind { + if let Err(err) = job_col.create_index(index).await { + match *err.kind { ErrorKind::Command(ref command_error) if command_error.code == 85 => {} - e => { - return Err(anyhow!("create job state index error {}", e)); + err => { + return Err(anyhow!("create job state index error {err}")); } } } @@ -73,8 +104,14 @@ impl MongoMainDbRepo { } impl JobRepo for MongoMainDbRepo { - async fn insert(&self, job: &Job) -> Result<()> { - self.job_col.insert_one(job).await.map(|_| ()).anyhow() + async fn insert(&self, job: &Job) -> Result { + let inserted_id = self.job_col.insert_one(job).await?.inserted_id; + + self.job_col + .find_one(doc! {"_id": inserted_id}) + .await + .anyhow() + .and_then(|r| r.anyhow("insert job not found")) } async fn get_job_for_running(&self) -> Result> { @@ -83,6 +120,9 @@ impl JobRepo for MongoMainDbRepo { "state": to_variant_name(&JobState::Running)?, "updated_at":Utc::now().timestamp(), }, + "$inc":{ + "retry_number":1 + } }; self.job_col @@ -91,14 +131,15 @@ impl JobRepo for MongoMainDbRepo { .anyhow() } - async fn update_job(&self, id: &ObjectId, state: &JobState) -> Result<()> { - let update = doc! { - "$set": { - "state": to_variant_name(state)?, - "updated_at":Utc::now().timestamp(), - } + async fn update(&self, id: &ObjectId, info: &JobUpdateInfo) -> Result<()> { + let mut update_fields = doc! { + "updated_at":Utc::now().timestamp() }; + if let Some(state) = info.state.as_ref() { + update_fields.insert("state", to_variant_name(state)?); + } + let update = doc! {"$set": update_fields}; let query = doc! { "_id": id, }; diff --git a/src/dbrepo/mod.rs b/src/dbrepo/mod.rs index 58a0b9c..930c075 100644 --- a/src/dbrepo/mod.rs +++ b/src/dbrepo/mod.rs @@ -1,7 +1,5 @@ mod job_db_mongo; mod main_db_mongo; -mod mongo_config; pub use job_db_mongo::*; pub use main_db_mongo::*; -pub use mongo_config::*; diff --git a/src/dbrepo/mongo_config.rs b/src/dbrepo/mongo_config.rs deleted file mode 100644 index e99d1ad..0000000 --- a/src/dbrepo/mongo_config.rs +++ /dev/null @@ -1,20 +0,0 @@ -use serde::Serialize; - -use crate::core::db::DBConfig; - -#[derive(Clone, Serialize)] -pub struct MongoConfig { - pub mongo_url: String, -} - -impl MongoConfig { - pub fn new(mongo_url: String) -> Self { - MongoConfig { mongo_url } - } -} - -impl DBConfig for MongoConfig { - fn connection_string(&self) -> &str { - return &self.mongo_url; - } -} diff --git a/src/driver/driver.rs b/src/driver/driver.rs index 1fa442a..a5822ca 100644 --- a/src/driver/driver.rs +++ b/src/driver/driver.rs @@ -46,7 +46,7 @@ pub trait PipelineController { ) -> impl std::future::Future> + Send; } -pub trait Driver { +pub trait Driver: 'static + Clone + Send + Sync { //deploy graph to cluster fn deploy( &self, diff --git a/src/driver/kube.rs b/src/driver/kube.rs index b3bc422..9da97d6 100644 --- a/src/driver/kube.rs +++ b/src/driver/kube.rs @@ -7,12 +7,10 @@ use super::{ use crate::{ core::{ db::{ - DBConfig, Graph, GraphRepo, JobDbRepo, Node, - NodeRepo, NodeType, TrackerState, }, @@ -48,6 +46,7 @@ use kube::{ DeleteParams, PostParams, }, + runtime::controller::RunnerError, Api, Client, }; @@ -185,23 +184,22 @@ fn join_array( } } -pub struct KubeDriver<'reg, R, DBC> +#[derive(Clone)] +pub struct KubeDriver where R: JobDbRepo, - DBC: Clone + Serialize + Send + Sync + DBConfig + 'static, { - reg: Handlebars<'reg>, + reg: Handlebars<'static>, client: Client, - db_config: DBC, + db_url: String, _phantom_data: PhantomData, } -impl<'reg, R, DBC> KubeDriver<'reg, R, DBC> +impl KubeDriver where R: JobDbRepo, - DBC: Clone + Serialize + Send + Sync + DBConfig, { - pub async fn new(client: Client, db_config: DBC) -> Result> { + pub async fn new(client: Client, db_url: &str) -> Result> { let mut reg = Handlebars::new(); reg.register_template_string("claim", include_str!("kubetpl/claim.tpl"))?; @@ -219,7 +217,7 @@ where Ok(KubeDriver { reg, client, - db_config, + db_url: db_url.to_string(), _phantom_data: PhantomData, }) } @@ -245,8 +243,8 @@ where let _ = Retry::spawn(retry_strategy, || async { match namespaces.get(ns).await { Ok(_) => Err(anyhow!("expect deleted")), - Err(e) => { - if e.to_string().contains("not found") { + Err(err) => { + if err.to_string().contains("not found") { Ok(()) } else { Err(anyhow!("retry")) @@ -270,20 +268,16 @@ struct ClaimRenderParams { } #[derive(Serialize)] -struct NodeRenderParams<'a, DBC> -where - DBC: Sized + Serialize + Send + Sync + DBConfig + 'static, -{ +struct NodeRenderParams<'a> { node: &'a ComputeUnit, log_level: &'a str, - db: DBC, + db_url: &'a str, run_id: &'a str, } -impl Driver for KubeDriver<'_, R, DBC> +impl Driver for KubeDriver where R: JobDbRepo, - DBC: Clone + Serialize + Send + Sync + DBConfig + 'static, { #[allow(refining_impl_trait)] async fn deploy( @@ -293,7 +287,8 @@ where ) -> Result> { Self::ensure_namespace_exit_and_clean(&self.client, run_id).await?; - let repo = MongoRunDbRepo::new(self.db_config.clone(), run_id).await?; + let db_url = self.db_url.to_string() + "/" + run_id; + let repo = MongoRunDbRepo::new(db_url.as_str()).await?; let statefulset_api: Api = Api::namespaced(self.client.clone(), run_id); let claim_api: Api = Api::namespaced(self.client.clone(), run_id); let service_api: Api = Api::namespaced(self.client.clone(), run_id); @@ -315,7 +310,7 @@ where let data_unit_render_args = NodeRenderParams { node, - db: self.db_config.clone(), + db_url: db_url.as_str(), log_level: "debug", run_id, }; @@ -503,10 +498,7 @@ where mod tests { use std::env; - use crate::dbrepo::{ - MongoConfig, - MongoRunDbRepo, - }; + use crate::dbrepo::MongoRunDbRepo; use super::*; @@ -565,17 +557,12 @@ mod tests { "#; let dag = Dag::from_json(json_str).unwrap(); - let mongo_cfg = MongoConfig { - mongo_url: "mongodb://192.168.3.163:27017".to_string(), - }; - - let client = MongoClient::with_uri_str(mongo_cfg.connection_string()) - .await - .unwrap(); + let db_url = "mongodb://192.168.3.163:27017/ntest"; + let client = MongoClient::with_uri_str(db_url).await.unwrap(); client.database("ntest").drop().await.unwrap(); let client = Client::try_default().await.unwrap(); - let kube_driver = KubeDriver::::new(client, mongo_cfg) + let kube_driver = KubeDriver::::new(client, db_url) .await .unwrap(); kube_driver.deploy("ntest", &dag).await.unwrap(); diff --git a/src/driver/kubetpl/channel_statefulset.tpl b/src/driver/kubetpl/channel_statefulset.tpl index 242cacb..6b0098a 100644 --- a/src/driver/kubetpl/channel_statefulset.tpl +++ b/src/driver/kubetpl/channel_statefulset.tpl @@ -34,8 +34,7 @@ "args": [ "--node-name={{{node.name}}}-channel", "--log-level={{{log_level}}}", - "--mongo-url={{{db.mongo_url}}}", - "--database={{{run_id}}}" + "--mongo-url={{{db_url}}}", {{#if (eq node.channel.cache_type "Disk") }},"--tmp-path=/app/tmp"{{/if}} ], "ports": [ diff --git a/src/driver/kubetpl/statefulset.tpl b/src/driver/kubetpl/statefulset.tpl index 473fe48..abcbce2 100644 --- a/src/driver/kubetpl/statefulset.tpl +++ b/src/driver/kubetpl/statefulset.tpl @@ -32,8 +32,7 @@ "args": [ "--node-name={{{node.name}}}", "--log-level={{{log_level}}}", - "--mongo-url={{{db.mongo_url}}}", - "--database={{{run_id}}}" + "--mongo-url={{{db_url}}}" {{#if (eq node.spec.cache_type "Disk") }},"--tmp-path=/app/tmp"{{/if}} ], "imagePullPolicy": "IfNotPresent", diff --git a/src/job/job_mgr.rs b/src/job/job_mgr.rs index b7440be..d3b0fbe 100644 --- a/src/job/job_mgr.rs +++ b/src/job/job_mgr.rs @@ -1,29 +1,76 @@ use crate::{ core::db::{ - DBConfig, + Job, JobDbRepo, + MainDbRepo, + }, + dag::Dag, + driver::{ + kube::KubeDriver, + Driver, }, - driver::kube::KubeDriver, }; use anyhow::Result; +use futures::future::join; use kube::Client; -use serde::Serialize; +use std::marker::PhantomData; +use tokio::task::JoinSet; +use tokio_util::sync::CancellationToken; +use tracing::error; +pub struct JobManager +where + D: Driver, + JOBR: JobDbRepo, + MAINR: MainDbRepo, +{ + driver: D, + db: MAINR, + _phantom_data: PhantomData, +} -pub struct JobManager<'reg, JOBR, DBC> +impl JobManager where + D: Driver, JOBR: JobDbRepo, - DBC: Clone + Serialize + Send + Sync + DBConfig + 'static, + MAINR: MainDbRepo, { - driver: KubeDriver<'reg, JOBR, DBC>, + pub async fn new(client: Client, driver: D, db: MAINR, db_url: &str) -> Result { + Ok(JobManager { + db, + driver, + _phantom_data: PhantomData, + }) + } } -impl<'reg, JOBR, DBC> JobManager<'reg, JOBR, DBC> +impl JobManager where + D: Driver, JOBR: JobDbRepo, - DBC: Clone + Serialize + Send + Sync + DBConfig, + MAINR: MainDbRepo, { - pub async fn new(client: Client, db_config: DBC) -> Result> { - let driver = KubeDriver::new(client, db_config).await?; - Ok(JobManager { driver }) + pub async fn run_backend(&self, token: CancellationToken) -> Result>> { + let mut join_set = JoinSet::new(); + { + let db = self.db.clone(); + let driver = self.driver.clone(); + join_set.spawn(async move { + loop { + if token.is_cancelled() { + return Ok(()); + } + + while let Some(job) = db.get_job_for_running().await? { + let dag = Dag::from_json(job.graph_json.as_str())?; + let namespace = format!("{}-{}", job.name, job.retry_number); + if let Err(err) = driver.deploy(namespace.as_str(), &dag).await { + error!("run job {} {err}", job.name); + }; + } + } + }); + } + + Ok(join_set) } } diff --git a/src/utils/utils.rs b/src/utils/utils.rs index fb9ca9a..15a8429 100644 --- a/src/utils/utils.rs +++ b/src/utils/utils.rs @@ -33,7 +33,7 @@ where fn anyhow(self) -> Result { match self { Ok(v) => Ok(v), - Err(e) => Err(anyhow!("{}", e)), + Err(err) => Err(anyhow!("{err}")), } } } @@ -46,7 +46,7 @@ impl AnyhowToGrpc for Result { fn to_rpc(self, code: Code) -> std::result::Result { match self { Ok(v) => Ok(v), - Err(e) => Err(Status::new(code, e.to_string())), + Err(err) => Err(Status::new(code, err.to_string())), } } }